The previous articles introduced the core concepts of NSQ as well as its installation and use. Starting with this article, we walk through the implementation details of NSQ alongside the source code.

A single nsqd instance can host multiple topics, and each topic can have multiple channels. Every channel receives a copy of all messages published to its topic, which gives multicast delivery. Each message on a channel is then delivered to only one of that channel's subscribers, which spreads the load across them.
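The topic and channel relationship can be illustrated with a small sketch. The `Topic`, `Channel`, and `newChannel` names below are hypothetical and are not nsqd's own types, and the round-robin delivery is only an assumption that stands in for nsqd's real distribution mechanism.

```go
package main

import "fmt"

// Channel is a simplified stand-in for an NSQ channel: it hands each
// message to exactly one subscriber, rotating through them in turn.
type Channel struct {
	name        string
	subscribers []chan string
	next        int
}

// Deliver sends msg to the next subscriber in round-robin order.
func (c *Channel) Deliver(msg string) {
	if len(c.subscribers) == 0 {
		return
	}
	c.subscribers[c.next] <- msg
	c.next = (c.next + 1) % len(c.subscribers)
}

// Topic copies every published message to all of its channels.
type Topic struct {
	channels []*Channel
}

// Publish fans the message out: one copy per channel.
func (t *Topic) Publish(msg string) {
	for _, ch := range t.channels {
		ch.Deliver(msg)
	}
}

// newChannel creates a channel with n subscribers, each buffered to buf messages.
func newChannel(name string, n, buf int) *Channel {
	c := &Channel{name: name}
	for i := 0; i < n; i++ {
		c.subscribers = append(c.subscribers, make(chan string, buf))
	}
	return c
}

func main() {
	a := newChannel("a", 2, 8) // two subscribers share channel "a"
	b := newChannel("b", 1, 8) // one subscriber on channel "b"
	topic := &Topic{channels: []*Channel{a, b}}

	for i := 1; i <= 4; i++ {
		topic.Publish(fmt.Sprintf("msg-%d", i))
	}

	// Each subscriber of "a" holds half the messages; "b" holds all of them.
	fmt.Println(len(a.subscribers[0]), len(a.subscribers[1]), len(b.subscribers[0]))
}
```

Both channels see all four messages, but within channel "a" the two subscribers split them between themselves.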

The entry function

Let's first look at the entry function of nsqd:

    func main() {
        prg := &program{}
        if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
            logFatal("%s", err)
        }
    }

    func (p *program) Init(env svc.Environment) error {
        if env.IsWindowsService() {
            dir := filepath.Dir(os.Args[0])
            return os.Chdir(dir)
        }
        return nil
    }

    func (p *program) Start() error {
        opts := nsqd.NewOptions()

        flagSet := nsqdFlagSet(opts)
        flagSet.Parse(os.Args[1:])
        ...
    }

The third-party svc package manages the background process gracefully: svc.Run() calls the program's Init() and then Start(), which starts the nsqd instance.
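To make the Init, Start, and Stop sequence concrete, here is a minimal sketch of such a lifecycle runner. The `Service` interface, `run`, `recorder`, and `lifecycleOrder` are hypothetical names made up for illustration; they are not the svc package's actual API, and the shutdown signal is simulated so the demo exits immediately.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// Service is a hypothetical interface mirroring the Init/Start/Stop
// lifecycle; it is not the svc package's own definition.
type Service interface {
	Init() error
	Start() error
	Stop() error
}

// run drives a Service: Init, then Start, then block until a signal
// arrives on sigs, then Stop.
func run(s Service, sigs <-chan os.Signal) error {
	if err := s.Init(); err != nil {
		return err
	}
	if err := s.Start(); err != nil {
		return err
	}
	<-sigs
	return s.Stop()
}

// recorder is a toy Service that records the order of lifecycle calls.
type recorder struct{ calls []string }

func (r *recorder) Init() error  { r.calls = append(r.calls, "init"); return nil }
func (r *recorder) Start() error { r.calls = append(r.calls, "start"); return nil }
func (r *recorder) Stop() error  { r.calls = append(r.calls, "stop"); return nil }

// lifecycleOrder runs a recorder with a simulated SIGTERM and returns
// the order in which the lifecycle methods were called.
func lifecycleOrder() []string {
	sigs := make(chan os.Signal, 1)
	sigs <- syscall.SIGTERM // simulated shutdown request
	r := &recorder{}
	if err := run(r, sigs); err != nil {
		return nil
	}
	return r.calls
}

func main() {
	// A real daemon would subscribe to OS signals like this and pass
	// the channel to run instead of simulating the signal.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigs)

	fmt.Println(lifecycleOrder())
}
```

Keeping the signal handling in one runner means the program itself only has to implement the three lifecycle methods.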

Initializing the configuration

Start() initializes the configuration (opts, cfg), loads the previously saved metadata (nsqd.LoadMetadata), persists the current metadata (nsqd.PersistMetadata), and then starts a goroutine that enters the main function, nsqd.Main().

    options.Resolve(opts, flagSet, cfg)
    nsqd, err := nsqd.New(opts)
    if err != nil {
        logFatal("failed to instantiate nsqd - %s", err)
    }
    p.nsqd = nsqd

    err = p.nsqd.LoadMetadata()
    if err != nil {
        logFatal("failed to load metadata - %s", err)
    }
    err = p.nsqd.PersistMetadata()
    if err != nil {
        logFatal("failed to persist metadata - %s", err)
    }

    go func() {
        err := p.nsqd.Main()
        if err != nil {
            p.Stop()
            os.Exit(1)
        }
    }()

Main() then initializes tcpServer, httpServer, and httpsServer, and starts the background loops for queue scanning (n.queueScanLoop), lookup node management (n.lookupLoop), and statistics output (n.statsdLoop).

    n.waitGroup.Wrap(func() {
        exitFunc(protocol.TCPServer(n.tcpListener, tcpServer, n.logf))
    })
    httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
    n.waitGroup.Wrap(func() {
        exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
    })
    if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
        httpsServer := newHTTPServer(ctx, true, true)
        n.waitGroup.Wrap(func() {
            exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
        })
    }

    n.waitGroup.Wrap(n.queueScanLoop)
    n.waitGroup.Wrap(n.lookupLoop)
    if n.getOpts().StatsdAddress != "" {
        n.waitGroup.Wrap(n.statsdLoop)
    }
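The n.waitGroup.Wrap calls above run each server and loop in its own goroutine while tracking it in a wait group. A minimal sketch of that idea follows; the `WaitGroupWrapper` type is written here from scratch for illustration and may differ in detail from NSQ's own helper, and `runLoops` is a hypothetical demo function.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// WaitGroupWrapper embeds sync.WaitGroup and adds a Wrap helper.
type WaitGroupWrapper struct {
	sync.WaitGroup
}

// Wrap runs fn in a new goroutine and registers it with the wait
// group, so Wait() blocks until fn has returned.
func (w *WaitGroupWrapper) Wrap(fn func()) {
	w.Add(1)
	go func() {
		defer w.Done()
		fn()
	}()
}

// runLoops starts n background functions, waits for all of them, and
// returns how many completed.
func runLoops(n int) int64 {
	var wg WaitGroupWrapper
	var done int64
	for i := 0; i < n; i++ {
		wg.Wrap(func() { atomic.AddInt64(&done, 1) })
	}
	wg.Wait()
	return done
}

func main() {
	fmt.Println(runLoops(3)) // prints 3
}
```

With this helper, a shutdown path only needs a single Wait() call to be sure every server and loop has exited.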

Handling requests

TCP and HTTP requests are handled separately, and each handler runs in its own goroutine so that requests are processed concurrently. newHTTPServer registers its routes using the Decorator pattern (explained below).

    // in nsqd/http.go:44
    router := httprouter.New()
    router.HandleMethodNotAllowed = true
    router.PanicHandler = http_api.LogPanicHandler(ctx.nsqd.logf)
    router.NotFound = http_api.LogNotFoundHandler(ctx.nsqd.logf)
    router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqd.logf)
    s := &httpServer{
        ctx:         ctx,
        tlsEnabled:  tlsEnabled,
        tlsRequired: tlsRequired,
        router:      router,
    }

    router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
    router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1))

    // v1 negotiate
    router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1))
    router.Handle("POST", "/mpub", http_api.Decorate(s.doMPUB, http_api.V1))
    router.Handle("GET", "/stats", http_api.Decorate(s.doStats, log, http_api.V1))

    // only v1
    router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1))
    router.Handle("POST", "/topic/delete", http_api.Decorate(s.doDeleteTopic, log, http_api.V1))

The code above dispatches HTTP routes to their decorated handlers.

    // in ...go:22
    for {
        clientConn, err := listener.Accept()
        if err != nil {
            if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
                logf(lg.WARN, "temporary Accept() failure - %s", err)
                runtime.Gosched()
                continue
            }
            // theres no direct way to detect this error because it is not exposed
            if !strings.Contains(err.Error(), "use of closed network connection") {
                return fmt.Errorf("listener.Accept() error - %s", err)
            }
            break
        }
        go handler.Handle(clientConn)
    }

The loop above is the core of the TCP handler: it accepts connections and hands each one to its own goroutine.

Parsing the TCP protocol

The TCP handler checks the protocol magic sent by the client. For V2, it hands the connection to prot.IOLoop(clientConn), which is provided by the internal protocol implementation.

    var prot protocol.Protocol
    switch protocolMagic {
    case "  V2":
        prot = &protocolV2{ctx: p.ctx}
    default:
        protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
        clientConn.Close()
        p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
            clientConn.RemoteAddr(), protocolMagic)
        return
    }

    err = prot.IOLoop(clientConn)
    if err != nil {
        p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
        return
    }
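Before the switch above can run, the handler has to read the magic from the connection. The following sketch shows one way to do that, assuming the 4-byte magic (two spaces followed by V2) described in the NSQ TCP protocol specification. The `readMagic`, `negotiate`, and `negotiateString` helpers are hypothetical and simply return a protocol name instead of constructing a real protocol object.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// readMagic reads the 4-byte protocol magic that a client sends
// immediately after connecting.
func readMagic(r io.Reader) (string, error) {
	buf := make([]byte, 4)
	if _, err := io.ReadFull(r, buf); err != nil {
		return "", err
	}
	return string(buf), nil
}

// negotiate maps the magic to a protocol name, or returns an error
// for anything it does not recognize.
func negotiate(r io.Reader) (string, error) {
	magic, err := readMagic(r)
	if err != nil {
		return "", fmt.Errorf("failed to read protocol version - %w", err)
	}
	switch magic {
	case "  V2":
		return "protocolV2", nil
	default:
		return "", fmt.Errorf("bad protocol magic %q", magic)
	}
}

// negotiateString is a convenience wrapper for trying out inputs.
func negotiateString(s string) (string, error) {
	return negotiate(strings.NewReader(s))
}

func main() {
	name, err := negotiateString("  V2")
	fmt.Println(name, err)

	_, err = negotiateString("HTTP")
	fmt.Println(err)
}
```

Using io.ReadFull means a client that disconnects before sending all four bytes produces an error instead of a partial, misleading magic value.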

Producing and consuming messages

Inside the protocol loop, p.Exec (command execution) and p.Send (sending the result) ensure that each nsqd node produces and consumes messages correctly. Any error raised along the way is caught and handled, which keeps distributed delivery reliable.

    // in nsqd/protocol_v2.go:79
    params := bytes.Split(line, separatorBytes)

    p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params)

    var response []byte
    response, err = p.Exec(client, params)
    if err != nil {
        ctx := ""
        if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
            ctx = " - " + parentErr.Error()
        }
        p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx)

        sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
        if sendErr != nil {
            p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
            break
        }

        // errors of type FatalClientErr should forceably close the connection
        if _, ok := err.(*protocol.FatalClientErr); ok {
            break
        }
        continue
    }

    if response != nil {
        err = p.Send(client, frameTypeResponse, response)
        if err != nil {
            err = fmt.Errorf("failed to send response - %s", err)
            break
        }
    }
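The execute-then-send step can be sketched in isolation. The example below assumes the frame layout from the NSQ TCP protocol specification: a 4-byte big-endian size, a 4-byte big-endian frame type, then the data, where the size covers the frame type plus the data. The `exec`, `writeFrame`, `handleLine`, and `frameFor` functions are hypothetical toys that understand only two commands; they are not nsqd's implementation.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

const (
	frameTypeResponse int32 = 0
	frameTypeError    int32 = 1
)

// writeFrame writes one frame: size, frame type, then data. The size
// field counts the frame type (4 bytes) plus the data.
func writeFrame(w io.Writer, frameType int32, data []byte) (int, error) {
	header := make([]byte, 8)
	binary.BigEndian.PutUint32(header[:4], uint32(len(data))+4)
	binary.BigEndian.PutUint32(header[4:], uint32(frameType))
	n, err := w.Write(header)
	if err != nil {
		return n, err
	}
	m, err := w.Write(data)
	return n + m, err
}

// exec is a toy command executor that knows only NOP and CLS.
func exec(params [][]byte) ([]byte, error) {
	switch string(params[0]) {
	case "NOP":
		return nil, nil // no response frame for NOP
	case "CLS":
		return []byte("CLOSE_WAIT"), nil
	}
	return nil, fmt.Errorf("E_INVALID invalid command %s", params[0])
}

// handleLine splits one command line on spaces, executes it, and
// writes either a response frame or an error frame.
func handleLine(w io.Writer, line []byte) (int, error) {
	params := bytes.Split(line, []byte(" "))
	response, err := exec(params)
	if err != nil {
		return writeFrame(w, frameTypeError, []byte(err.Error()))
	}
	if response == nil {
		return 0, nil
	}
	return writeFrame(w, frameTypeResponse, response)
}

// frameFor returns the raw bytes written in reply to a command line.
func frameFor(line string) []byte {
	var buf bytes.Buffer
	handleLine(&buf, []byte(line))
	return buf.Bytes()
}

func main() {
	fmt.Printf("%q\n", frameFor("CLS"))
	fmt.Printf("%q\n", frameFor("BOGUS"))
}
```

Because errors travel in their own frame type, a client can tell a failed command apart from a normal response without parsing the payload.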

nsqd runs both a TCP service and an HTTP service. Producers can publish over either one, while consumers subscribe over TCP. The HTTP service also gives nsqadmin access to the topic and channel information local to that nsqd.

Summary

This article focused on nsqd. Overall, its implementation is not complicated: nsqd is the daemon that receives, queues, and delivers messages to clients. It can run standalone, but it is normally deployed in a cluster together with nsqlookupd instances.

In the next article, we will look at the details of the implementation of other modules in NSQ.

Recommended reading

NSQ Parsing – An overview of high-performance messaging middleware

High-performance messaging middleware NSQ Parsing – Application practices

In microservice architecture, ELK is used for log collection and unified processing

How do I handle Go error exceptions without a try-catch?
