The previous article walked through the code logic and flow chart of nsqd. This article analyzes another major module in NSQ, nsqlookupd, which maintains the topology information of nsqd nodes and provides decentralized service registration and discovery.

1. nsqlookupd entry point

The entry file is nsq/apps/nsqlookupd/main.go, as follows:


2. nsqlookupd main execution logic

The main flow is similar to the nsqd execution logic described in the previous article, but the specific tasks differ.

2.1 The third-party svc package manages the background process gracefully: svc.Run() -> Init() -> Start(), which starts the nsqlookupd instance;

func main() {
  prg := &program{}
  if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
    logFatal("%s", err)
  }
}

func (p *program) Init(env svc.Environment) error {
  if env.IsWindowsService() {
    dir := filepath.Dir(os.Args[0])
    return os.Chdir(dir)
  }
  return nil
}

func (p *program) Start() error {
  opts := nsqlookupd.NewOptions()

  flagSet := nsqlookupdFlagSet(opts)
  ...
}
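The Init -> Start -> wait for signal -> Stop lifecycle can be sketched with the standard library alone. This is a minimal illustration, not the svc package's implementation: the `service` interface, `run`, `recorder`, and `runWithFakeSignal` are hypothetical names, and the termination signal is simulated so the demo exits on its own.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// service is a hypothetical, simplified lifecycle interface.
type service interface {
	Init() error
	Start() error
	Stop() error
}

// run drives the lifecycle: Init, Start, block until a signal arrives, Stop.
func run(s service, sigCh <-chan os.Signal) error {
	if err := s.Init(); err != nil {
		return err
	}
	if err := s.Start(); err != nil {
		return err
	}
	<-sigCh
	return s.Stop()
}

// recorder is a toy service that records which lifecycle methods ran.
type recorder struct{ calls []string }

func (r *recorder) Init() error  { r.calls = append(r.calls, "init"); return nil }
func (r *recorder) Start() error { r.calls = append(r.calls, "start"); return nil }
func (r *recorder) Stop() error  { r.calls = append(r.calls, "stop"); return nil }

// runWithFakeSignal subscribes to SIGINT/SIGTERM and then injects a
// simulated SIGTERM into the channel so that run returns immediately.
func runWithFakeSignal(s service) error {
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigCh)
	sigCh <- syscall.SIGTERM
	return run(s, sigCh)
}

func main() {
	r := &recorder{}
	err := runWithFakeSignal(r)
	fmt.Println(err, r.calls)
}
```

In a real daemon the channel would only receive signals delivered by the operating system, and Start would launch the long-running work in a goroutine so that it returns promptly.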


2.2 Resolve the configuration parameters (priority: flagSet, the command-line flags > cfg, the config file > opts, the defaults), then start a goroutine that enters the nsqlookupd.Main() function;

  options.Resolve(opts, flagSet, cfg)
  nsqlookupd, err := nsqlookupd.New(opts)
  if err != nil {
    logFatal("failed to instantiate nsqlookupd", err)
  }
  p.nsqlookupd = nsqlookupd

  go func() {
    err := p.nsqlookupd.Main()
    if err != nil {
      p.Stop()
      os.Exit(1)
    }
  }()
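The priority order described in 2.2 can be illustrated with a small sketch. `resolveString` is a hypothetical helper, not the options.Resolve API; a nil pointer stands for "not provided at that level", and the addresses are made-up sample values.

```go
package main

import "fmt"

// resolveString picks a setting by priority:
// command-line flag > config file > built-in default.
// A nil pointer means the value was not provided at that level.
func resolveString(flagVal, cfgVal *string, def string) string {
	if flagVal != nil {
		return *flagVal
	}
	if cfgVal != nil {
		return *cfgVal
	}
	return def
}

func main() {
	def := "0.0.0.0:4160"      // sample default
	cfg := "0.0.0.0:5160"      // sample config-file value
	flagV := "127.0.0.1:6160"  // sample command-line value

	fmt.Println(resolveString(nil, nil, def))     // only the default is available
	fmt.Println(resolveString(nil, &cfg, def))    // config file overrides the default
	fmt.Println(resolveString(&flagV, &cfg, def)) // flag overrides both
}
```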


2.3 Start goroutines that run tcpServer and httpServer, which listen for requests from nsqd and nsqadmin clients respectively;

func (l *NSQLookupd) Main() error {
  ctx := &Context{l}

  exitCh := make(chan error)
  var once sync.Once
  exitFunc := func(err error) {
    once.Do(func() {
      if err != nil {
        l.logf(LOG_FATAL, "%s", err)
      }
      exitCh <- err
    })
  }

  tcpServer := &tcpServer{ctx: ctx}
  l.waitGroup.Wrap(func() {
    exitFunc(protocol.TCPServer(l.tcpListener, tcpServer, l.logf))
  })
  httpServer := newHTTPServer(ctx)
  l.waitGroup.Wrap(func() {
    exitFunc(http_api.Serve(l.httpListener, httpServer, "HTTP", l.logf))
  })

  err := <-exitCh
  return err
}
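The exit handling in Main() combines sync.Once with a channel so that only the first server to stop decides the return value. The sketch below isolates that pattern; `runUntilFirstExit` and `errListener` are hypothetical names, and unlike the code above it uses a buffered channel so the sender never blocks.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errListener is a made-up failure used by the demo.
var errListener = errors.New("listener failed")

// runUntilFirstExit starts every server function in its own goroutine and
// returns the result of whichever one finishes first. Later results are
// dropped because once.Do runs its function only once.
func runUntilFirstExit(servers ...func() error) error {
	exitCh := make(chan error, 1)
	var once sync.Once
	exitFunc := func(err error) {
		once.Do(func() { exitCh <- err })
	}
	for _, serve := range servers {
		serve := serve // capture the loop variable for the goroutine
		go func() { exitFunc(serve()) }()
	}
	return <-exitCh
}

func main() {
	block := make(chan struct{})
	err := runUntilFirstExit(
		func() error { <-block; return nil },  // keeps running
		func() error { return errListener },   // fails right away
	)
	close(block) // release the first goroutine
	fmt.Println(err)
}
```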


2.4 TCPServer accepts client connections in a loop. Each client keeps a long-lived connection, and a handler goroutine is started to process each client's conn.

func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
  logf(lg.INFO, "TCP: listening on %s", listener.Addr())

  for {
    clientConn, err := listener.Accept()
    if err != nil {
      if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
        logf(lg.WARN, "temporary Accept() failure - %s", err)
        runtime.Gosched()
        continue
      }
      // theres no direct way to detect this error because it is not exposed
      if !strings.Contains(err.Error(), "use of closed network connection") {
        return fmt.Errorf("listener.Accept() error - %s", err)
      }
      break
    }
    go handler.Handle(clientConn)
  }

  logf(lg.INFO, "TCP: closing %s", listener.Addr())

  return nil
}
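The accept loop can be tried out in isolation with a loopback listener. The sketch below is not NSQ code: `serve`, `echoOnce`, and `roundTrip` are hypothetical names, and it assumes Go 1.16 or later, where a closed listener can be detected with errors.Is(err, net.ErrClosed) instead of matching on the error string.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"net"
)

// serve accepts connections until the listener is closed and hands each
// connection to handle in its own goroutine.
func serve(l net.Listener, handle func(net.Conn)) error {
	for {
		conn, err := l.Accept()
		if err != nil {
			if errors.Is(err, net.ErrClosed) {
				return nil // normal shutdown
			}
			return fmt.Errorf("listener.Accept() error - %w", err)
		}
		go handle(conn)
	}
}

// echoOnce is a toy handler: it echoes one line back and closes the conn.
func echoOnce(c net.Conn) {
	defer c.Close()
	line, err := bufio.NewReader(c).ReadString('\n')
	if err == nil {
		fmt.Fprint(c, line)
	}
}

// roundTrip starts a server on a free loopback port, sends msg, closes the
// listener, and returns the reply read from the connection.
func roundTrip(msg string) (string, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	done := make(chan error, 1)
	go func() { done <- serve(l, echoOnce) }()

	c, err := net.Dial("tcp", l.Addr().String())
	if err != nil {
		l.Close()
		return "", err
	}
	defer c.Close()
	fmt.Fprint(c, msg)
	reply, err := bufio.NewReader(c).ReadString('\n')
	l.Close()
	if serveErr := <-done; serveErr != nil {
		return "", serveErr
	}
	return reply, err
}

func main() {
	reply, err := roundTrip("PING\n")
	fmt.Printf("%q %v\n", reply, err)
}
```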


2.5 httpServer uses http_api.Decorate to wrap each HTTP route handler with decorators, for example to add logging and to output responses in the unified V1 format.

func newHTTPServer(ctx *Context) *httpServer {
  log := http_api.Log(ctx.nsqlookupd.logf)

  router := httprouter.New()
  router.HandleMethodNotAllowed = true
  router.PanicHandler = http_api.LogPanicHandler(ctx.nsqlookupd.logf)
  router.NotFound = http_api.LogNotFoundHandler(ctx.nsqlookupd.logf)
  router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqlookupd.logf)
  s := &httpServer{
    ctx:    ctx,
    router: router,
  }

  router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
  router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1))

  // v1 negotiate
  router.Handle("GET", "/debug", http_api.Decorate(s.doDebug, log, http_api.V1))
  router.Handle("GET", "/lookup", http_api.Decorate(s.doLookup, log, http_api.V1))
  router.Handle("GET", "/topics", http_api.Decorate(s.doTopics, log, http_api.V1))
  router.Handle("GET", "/channels", http_api.Decorate(s.doChannels, log, http_api.V1))
  ...

  return s
}
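The decorator idea behind http_api.Decorate can be shown with plain functions. The types below are deliberately simplified stand-ins (a handler takes a path and returns a string), and `decorate`, `withLog`, and `upper` are hypothetical names rather than the http_api API.

```go
package main

import (
	"fmt"
	"strings"
)

// handler is a simplified stand-in for an HTTP API handler.
type handler func(path string) string

// decorator wraps a handler with extra behavior.
type decorator func(handler) handler

// decorate applies the decorators in order, so the first one listed ends up
// innermost and the last one listed runs first on each call.
func decorate(h handler, ds ...decorator) handler {
	for _, d := range ds {
		h = d(h)
	}
	return h
}

// withLog appends each request path to log, standing in for a real logger.
func withLog(log *[]string) decorator {
	return func(next handler) handler {
		return func(path string) string {
			*log = append(*log, path)
			return next(path)
		}
	}
}

// upper upper-cases the response, standing in for a response formatter.
func upper(next handler) handler {
	return func(path string) string { return strings.ToUpper(next(path)) }
}

func main() {
	var log []string
	ping := decorate(func(string) string { return "ok" }, withLog(&log), upper)
	fmt.Println(ping("/ping"), log)
}
```

Keeping cross-cutting behavior such as logging and response formatting in decorators lets each route handler contain only its own logic.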


2.6 The TCP handler checks the protocol magic for V1, then loops over client commands through prot.IOLoop(clientConn), which the internal protocol implements. The connection is closed once the loop ends.

  var prot protocol.Protocol
  switch protocolMagic {
  case "  V1":
    prot = &LookupProtocolV1{ctx: p.ctx}
  default:
    protocol.SendResponse(clientConn, []byte("E_BAD_PROTOCOL"))
    clientConn.Close()
    p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
      clientConn.RemoteAddr(), protocolMagic)
    return
  }

  err = prot.IOLoop(clientConn)
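Protocol negotiation amounts to reading a fixed 4-byte magic from the start of the connection and switching on it. The following is a minimal sketch under that assumption; `negotiate`, `negotiateString`, and `errBadProtocol` are hypothetical names, and a strings.Reader stands in for the network connection.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// errBadProtocol mirrors the E_BAD_PROTOCOL reply sent for an unknown magic.
var errBadProtocol = errors.New("E_BAD_PROTOCOL")

// negotiate reads the 4-byte magic from r and returns the protocol name.
// A short read is reported as the error from io.ReadFull.
func negotiate(r io.Reader) (string, error) {
	buf := make([]byte, 4)
	if _, err := io.ReadFull(r, buf); err != nil {
		return "", err
	}
	switch string(buf) {
	case "  V1": // two spaces followed by V1
		return "V1", nil
	default:
		return "", errBadProtocol
	}
}

// negotiateString is a convenience wrapper for trying out inputs.
func negotiateString(s string) (string, error) {
	return negotiate(strings.NewReader(s))
}

func main() {
	for _, in := range []string{"  V1PING\n", "  V9", "V1"} {
		name, err := negotiateString(in)
		fmt.Printf("%q -> %q, %v\n", in, name, err)
	}
}
```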


2.7 The internal protocol calls p.Exec (execute the command) and protocol.SendResponse (return the result). This lets each nsqd node register and unregister its services correctly and send heartbeats (PING) so that node availability can be checked, which keeps the list of nsqd nodes that clients fetch up to date.

  for {
    line, err = reader.ReadString('\n')
    if err != nil {
      break
    }

    line = strings.TrimSpace(line)
    params := strings.Split(line, " ")

    var response []byte
    response, err = p.Exec(client, reader, params)
    if err != nil {
      ctx := ""
      if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
        ctx = " - " + parentErr.Error()
      }
      _, sendErr := protocol.SendResponse(client, []byte(err.Error()))
      if sendErr != nil {
        p.ctx.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
        break
      }
      continue
    }

    if response != nil {
      _, err = protocol.SendResponse(client, response)
      if err != nil {
        break
      }
    }
  }

  conn.Close()
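The read, execute, respond cycle can be reproduced in a self-contained form. This is a sketch, not the nsqlookupd implementation: `exec` handles only a toy PING command, `ioLoop` and `run` are hypothetical names, and the response framing (a 4-byte big-endian length followed by the body) is my understanding of what protocol.SendResponse writes.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"strings"
)

// sendResponse frames data as a 4-byte big-endian length plus the body.
func sendResponse(w io.Writer, data []byte) error {
	if err := binary.Write(w, binary.BigEndian, int32(len(data))); err != nil {
		return err
	}
	_, err := w.Write(data)
	return err
}

// exec is a toy dispatcher that only understands PING.
func exec(params []string) ([]byte, error) {
	switch params[0] {
	case "PING":
		return []byte("OK"), nil
	default:
		return nil, fmt.Errorf("E_INVALID invalid command %s", params[0])
	}
}

// ioLoop reads newline-terminated commands from r and writes one framed
// response per command to w until the input ends.
func ioLoop(r io.Reader, w io.Writer) error {
	reader := bufio.NewReader(r)
	for {
		line, err := reader.ReadString('\n')
		if err != nil {
			if err == io.EOF {
				return nil
			}
			return err
		}
		params := strings.Split(strings.TrimSpace(line), " ")
		resp, err := exec(params)
		if err != nil {
			resp = []byte(err.Error()) // errors are sent back as the response body
		}
		if err := sendResponse(w, resp); err != nil {
			return err
		}
	}
}

// run feeds input through ioLoop and returns the raw bytes written.
func run(input string) []byte {
	var out bytes.Buffer
	_ = ioLoop(strings.NewReader(input), &out)
	return out.Bytes()
}

func main() {
	fmt.Printf("%q\n", run("PING\nFOO\n"))
}
```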


3. nsqlookupd flow chart summary

The above process is summarized as follows:


Summary: reading the source code shows that nsqlookupd manages the identification, registration, deregistration, and heartbeat detection of nsqd nodes, and dynamically maintains the latest list of available nsqd nodes in the distributed cluster for clients to query.

The source code makes heavy use of RWMutex read-write locks, interfaces for the shared protocol abstraction, and goroutines with channels for concurrent communication, all of which support high availability and high throughput.
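As an illustration of the RWMutex usage mentioned above, here is a minimal sketch of a registration table in which writes (register, unregister) take the exclusive lock and lookups take the shared read lock. The `registry` type and its methods are hypothetical and much simpler than nsqlookupd's actual data structure; the topic and addresses are sample values.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// registry maps a topic name to the set of producer addresses serving it.
type registry struct {
	mu     sync.RWMutex
	topics map[string]map[string]struct{}
}

func newRegistry() *registry {
	return &registry{topics: make(map[string]map[string]struct{})}
}

// register adds addr as a producer of topic under the write lock.
func (r *registry) register(topic, addr string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.topics[topic] == nil {
		r.topics[topic] = make(map[string]struct{})
	}
	r.topics[topic][addr] = struct{}{}
}

// unregister removes addr from topic under the write lock.
// Deleting from a missing topic is a no-op.
func (r *registry) unregister(topic, addr string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.topics[topic], addr)
}

// lookup returns the sorted producer addresses for topic under the read
// lock, so many lookups can proceed concurrently.
func (r *registry) lookup(topic string) []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	addrs := make([]string, 0, len(r.topics[topic]))
	for a := range r.topics[topic] {
		addrs = append(addrs, a)
	}
	sort.Strings(addrs)
	return addrs
}

func main() {
	reg := newRegistry()
	reg.register("orders", "10.0.0.2:4150")
	reg.register("orders", "10.0.0.1:4150")
	fmt.Println(reg.lookup("orders"))
	reg.unregister("orders", "10.0.0.2:4150")
	fmt.Println(reg.lookup("orders"))
}
```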