In the previous article, I walked through a demo of NSQ's three modules (nsqd, nsqlookupd, nsqadmin). This article starts from the source code and analyzes the execution flow and processing logic of nsqd step by step, studying the architecture of an excellent project so that I can apply what I learn.

1. nsqd execution entry point

In nsq/apps/nsqd/main.go, you can find the execution entry point, as follows:

2. nsqd main logic source code

2.1 The background process is managed gracefully through the third-party svc package: svc.Run() -> Init() -> Start() starts the nsqd instance.

func main(a) {
  prg := &program{}
  iferr := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err ! =nil {
    logFatal("%s", err)
  }
}

// Init changes the working directory to the directory of os.Args[0] when the
// process runs as a Windows service; in every other environment it is a no-op.
func (p *program) Init(env svc.Environment) error {
  if !env.IsWindowsService() {
    return nil
  }
  return os.Chdir(filepath.Dir(os.Args[0]))
}

func (p *program) Start(a) error {
  opts := nsqd.NewOptions()

  flagSet := nsqdFlagSet(opts)
  flagSet.Parse(os.Args[1:)... }Copy the code


2.2 Initialize the configuration (opts, cfg), load the historical metadata (nsqd.LoadMetadata), persist the latest metadata (nsqd.PersistMetadata), and then start a goroutine that runs the main function nsqd.Main().

// Merge the resolved settings into opts, then build the nsqd instance from them.
options.Resolve(opts, flagSet, cfg)
nsqd, err := nsqd.New(opts)
if err != nil {
  logFatal("failed to instantiate nsqd - %s", err)
}
p.nsqd = nsqd

// Load the stored metadata, then immediately write it back out.
err = p.nsqd.LoadMetadata()
if err != nil {
  logFatal("failed to load metadata - %s", err)
}
err = p.nsqd.PersistMetadata()
if err != nil {
  logFatal("failed to persist metadata - %s", err)
}

// Run Main in its own goroutine; if it returns an error, stop the program
// and exit with status 1.
go func() {
  err := p.nsqd.Main()
  if err != nil {
    p.Stop()
    os.Exit(1)
  }
}()


2.3 Initialize tcpServer, httpServer, and httpsServer, and then start the loops for queue scanning (n.queueScanLoop), lookup node management (n.lookupLoop), and statistics reporting (n.statsdLoop).

// Each server runs inside n.waitGroup.Wrap; whatever the serve call returns
// is passed to exitFunc.
tcpServer := &tcpServer{ctx: ctx}
n.waitGroup.Wrap(func() {
  exitFunc(protocol.TCPServer(n.tcpListener, tcpServer, n.logf))
})
httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
n.waitGroup.Wrap(func() {
  exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
})
// The HTTPS server is started only when a TLS config exists and an HTTPS
// address is configured.
if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
  httpsServer := newHTTPServer(ctx, true, true)
  n.waitGroup.Wrap(func() {
    exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
  })
}

n.waitGroup.Wrap(n.queueScanLoop)
n.waitGroup.Wrap(n.lookupLoop)
// The statsd loop is started only when a statsd address is configured.
if n.getOpts().StatsdAddress != "" {
  n.waitGroup.Wrap(n.statsdLoop)
}


2.4 TCP and HTTP requests are processed separately, and each accepted TCP connection is handed to its own handler goroutine for concurrent processing. newHTTPServer registers routes using the decorator pattern (explained later).

HTTP route registration:

// Build the router and attach the logging handlers for panics, unknown
// routes, and disallowed methods.
router := httprouter.New()
router.HandleMethodNotAllowed = true
router.PanicHandler = http_api.LogPanicHandler(ctx.nsqd.logf)
router.NotFound = http_api.LogNotFoundHandler(ctx.nsqd.logf)
router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqd.logf)
s := &httpServer{
  ctx:         ctx,
  tlsEnabled:  tlsEnabled,
  tlsRequired: tlsRequired,
  router:      router,
}

// Every route wraps its handler with http_api.Decorate plus the decorators
// listed after it.
router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1))

// v1 negotiate
router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1))
router.Handle("POST", "/mpub", http_api.Decorate(s.doMPUB, http_api.V1))
router.Handle("GET", "/stats", http_api.Decorate(s.doStats, log, http_api.V1))

// only v1
router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1))
router.Handle("POST", "/topic/delete", http_api.Decorate(s.doDeleteTopic, log, http_api.V1))


TCP handler processing:

// Accept connections until the listener fails; each accepted connection is
// handled in its own goroutine.
for {
  clientConn, err := listener.Accept()
  if err != nil {
    // Temporary errors are logged and retried after yielding the processor.
    if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
      logf(lg.WARN, "temporary Accept() failure - %s", err)
      runtime.Gosched()
      continue
    }
    // there's no direct way to detect this error because it is not exposed
    if !strings.Contains(err.Error(), "use of closed network connection") {
      return fmt.Errorf("listener.Accept() error - %s", err)
    }
    // A closed listener ends the loop without returning an error here.
    break
  }
  go handler.Handle(clientConn)
}


2.5 The TCP handler parses the protocol magic (V2) and then processes the connection with prot.IOLoop(clientConn), which is encapsulated by the internal protocol.

// Select the protocol implementation from the magic string sent by the client.
var prot protocol.Protocol
switch protocolMagic {
// The V2 magic is two spaces followed by "V2" (see the NSQ TCP protocol spec).
case "  V2":
  prot = &protocolV2{ctx: p.ctx}
default:
  // Unknown magic: send an error frame, close the connection, and log it.
  protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
  clientConn.Close()
  p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
    clientConn.RemoteAddr(), protocolMagic)
  return
}

// Hand the connection to the protocol's IOLoop; an error it returns is logged.
err = prot.IOLoop(clientConn)
if err != nil {
  p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
  return
}


2.6 p.Exec (command execution) and p.Send (result sending) are carried out through the internal protocol to ensure that each nsqd node can correctly produce and consume messages. Any error that occurs in this process is caught and handled to ensure the reliability of distributed delivery.

// Split the received line into command parameters.
params := bytes.Split(line, separatorBytes)

p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params)

var response []byte
response, err = p.Exec(client, params)
if err != nil {
  // Append the parent error, when there is one, to the log message.
  ctx := ""
  if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
    ctx = "-" + parentErr.Error()
  }
  p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx)

  // Report the error to the client; stop the loop if even that fails.
  sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
  if sendErr != nil {
    p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
    break
  }

  // errors of type FatalClientErr should forcibly close the connection
  if _, ok := err.(*protocol.FatalClientErr); ok {
    break
  }
  continue
}

if response != nil {
  err = p.Send(client, frameTypeResponse, response)
  if err != nil {
    err = fmt.Errorf("failed to send response - %s", err)
    break
  }
}

3. nsqd flow chart summary

The above process is summarized as follows:


[Summary] The source code shows that the logic is clear and that goroutines are used to process message production and consumption across distributed nsqd nodes efficiently and concurrently. Many details remain to be analyzed carefully in the next step, so that we can apply what we have learned.