An overview of NSQ

NSQ is a real-time distributed messaging platform designed to operate at scale, processing billions of messages a day, and it is used by many Internet companies.

nsqd is the daemon that receives, queues, and delivers messages to clients. It can run standalone, but it is normally configured in a cluster with nsqlookupd instances (to which it announces its topics and channels so everyone can find them). It listens on two TCP ports, one for clients and one for the HTTP API, and it can optionally listen on a third port for HTTPS.

The modules

NSQ consists of three components: nsqd, nsqlookupd, and nsqadmin

nsqlookupd

nsqlookupd is the daemon that manages topology information. Clients query nsqlookupd to discover the producers of a given topic, and nsqd nodes broadcast their topic and channel information to it. It has the following characteristics:

  • Uniqueness: an nsq deployment normally runs a single nsqlookupd service (multiple nsqlookupd instances can also be deployed, but they do not coordinate with each other)
  • Decentralization: even if nsqlookupd crashes, running nsqd services are not affected
  • Acts as the middleware for information exchange between nsqd and nsqadmin
  • Provides an HTTP query service that clients use to periodically refresh their directory of nsqd addresses (see the lookup sketch after this list)
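
To make that last point concrete, here is a minimal sketch of a client asking nsqlookupd which nsqd nodes produce a given topic. It assumes a local nsqlookupd on its default HTTP port 4161 and an existing topic named "test"; the response struct below only models the fields we print (depending on the API version negotiated, the payload may also arrive wrapped in a status envelope).

package main

import (
  "encoding/json"
  "fmt"
  "log"
  "net/http"
)

// lookupResponse models only the producer fields we care about from
// nsqlookupd's /lookup endpoint (hypothetical local setup: default
// HTTP port 4161, topic "test" already registered by some nsqd).
type lookupResponse struct {
  Producers []struct {
    BroadcastAddress string `json:"broadcast_address"`
    TCPPort          int    `json:"tcp_port"`
  } `json:"producers"`
}

func main() {
  resp, err := http.Get("http://127.0.0.1:4161/lookup?topic=test")
  if err != nil {
    log.Fatal(err)
  }
  defer resp.Body.Close()

  var lr lookupResponse
  if err := json.NewDecoder(resp.Body).Decode(&lr); err != nil {
    log.Fatal(err)
  }
  // Print the nsqd address directory for this topic.
  for _, p := range lr.Producers {
    fmt.Printf("%s:%d\n", p.BroadcastAddress, p.TCPPort)
  }
}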

nsqd

NSQD is a daemon that receives, queues, and delivers messages to clients

  • Consumers subscribed to the same topic and the same channel receive messages via a load-balancing strategy (not round-robin)
  • As long as a channel exists, incoming messages are cached in its queue even if the channel currently has no consumers (beware of message expiry handling)
  • Guarantees that a queued message is consumed at least once; even if nsqd exits, queued messages are temporarily persisted to disk (except in unexpected circumstances such as the process being killed)
  • Limited memory footprint: the number of messages each channel caches in memory is configurable per nsqd; once the limit is exceeded, messages are written to disk
  • A topic or channel persists once created, so remove stale topics and channels promptly via the admin console or in code to avoid wasting resources (a small publish example follows this list)
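
To make the queueing behaviour concrete, here is a minimal publish sketch against nsqd's HTTP API. It assumes a local nsqd on its default HTTP port 4151; the /pub endpoint creates the topic on first use, and the message is then copied into every channel of that topic, where it waits for consumers.

package main

import (
  "bytes"
  "log"
  "net/http"
)

func main() {
  // Assumption: a local nsqd listening on its default HTTP port 4151.
  // /pub creates the topic "test" on first use; the message is then
  // copied into every channel of that topic and queued until consumed.
  body := bytes.NewBufferString("hello nsq")
  resp, err := http.Post("http://127.0.0.1:4151/pub?topic=test", "text/plain", body)
  if err != nil {
    log.Fatal(err)
  }
  defer resp.Body.Close()
  log.Printf("publish status: %s", resp.Status)
}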

nsqadmin

nsqadmin is a web UI that aggregates real-time statistics for the cluster and performs various administrative tasks

The official architecture diagram

Source code analysis

This article and the ones that follow are based on the 1.0.0 code. For readability I have placed comments above each function and tried to cover essentially all of them; I will not explain how to use NSQ itself in this article

nsqlookupd.go

package nsqlookupd

// lock
// configuration options
// tcpListener / httpListener: the TCP and HTTP port listeners mentioned above
// waitGroup: synchronizes the server goroutines
// DB: the registration "database"
type NSQLookupd struct {
  sync.RWMutex
  opts         *Options
  tcpListener  net.Listener
  httpListener net.Listener
  waitGroup    util.WaitGroupWrapper
  DB           *RegistrationDB
}

// If no Logger is specified, create a new one
// New builds the NSQLookupd; the only real initialization work is NewRegistrationDB
// Parse log level
func New(opts *Options) *NSQLookupd {
  if opts.Logger == nil {
    opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds)
  }
  n := &NSQLookupd{
    opts: opts,
    DB:   NewRegistrationDB(),
  }

  var err error
  opts.logLevel, err = lg.ParseLogLevel(opts.LogLevel, opts.Verbose)
  if err != nil {
    n.logf(LOG_FATAL, "%s", err)
    os.Exit(1)
  }

  n.logf(LOG_INFO, version.String("nsqlookupd"))
  return n
}

// Create a context; here ctx simply wraps the NSQLookupd (why not the standard library's context?)
// Create the tcpListener; the lock around the assignment hints at concurrent access
// Create a tcpServer around ctx
// Run the TCP server in a goroutine tracked by waitGroup
// Repeat the same steps for the HTTP server
func (l *NSQLookupd) Main() {
  ctx := &Context{l}

  tcpListener, err := net.Listen("tcp", l.opts.TCPAddress)
  if err != nil {
    l.logf(LOG_FATAL, "listen (%s) failed - %s", l.opts.TCPAddress, err)
    os.Exit(1)
  }
  l.Lock()
  l.tcpListener = tcpListener
  l.Unlock()
  tcpServer := &tcpServer{ctx: ctx}
  l.waitGroup.Wrap(func() {
    protocol.TCPServer(tcpListener, tcpServer, l.logf)
  })

  httpListener, err := net.Listen("tcp", l.opts.HTTPAddress)
  if err != nil {
    l.logf(LOG_FATAL, "listen (%s) failed - %s", l.opts.HTTPAddress, err)
    os.Exit(1)
  }
  l.Lock()
  l.httpListener = httpListener
  l.Unlock()
  httpServer := newHTTPServer(ctx)
  l.waitGroup.Wrap(func() {
    http_api.Serve(httpListener, httpServer, "HTTP", l.logf)
  })
}

// Obtain the actual TCP address; the read lock suggests the listener may be swapped concurrently
func (l *NSQLookupd) RealTCPAddr() *net.TCPAddr {
  l.RLock()
  defer l.RUnlock()
  return l.tcpListener.Addr().(*net.TCPAddr)
}

// Get the HTTP address
func (l *NSQLookupd) RealHTTPAddr() *net.TCPAddr {
  l.RLock()
  defer l.RUnlock()
  return l.httpListener.Addr().(*net.TCPAddr)
}

// Close both listeners and wait for the server goroutines to finish
func (l *NSQLookupd) Exit() {
  if l.tcpListener != nil {
    l.tcpListener.Close()
  }

  if l.httpListener != nil {
    l.httpListener.Close()
  }
  l.waitGroup.Wait()
}
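
The waitGroup field used throughout this file is NSQ's util.WaitGroupWrapper, a thin convenience around sync.WaitGroup. The sketch below shows what Wrap plausibly does (reconstructed from the upstream internal/util package, so treat it as illustrative rather than authoritative): it runs each server loop in its own goroutine while keeping the counter accurate, which is what lets Exit block in waitGroup.Wait() until both servers have returned.

package util

import "sync"

// WaitGroupWrapper embeds sync.WaitGroup and adds a helper that runs a
// callback in its own goroutine while tracking it in the wait group.
type WaitGroupWrapper struct {
  sync.WaitGroup
}

// Wrap increments the counter, runs cb concurrently, and decrements the
// counter when cb returns, so Wait() blocks until every wrapped callback
// (here: the TCP and HTTP server loops) has finished.
func (w *WaitGroupWrapper) Wrap(cb func()) {
  w.Add(1)
  go func() {
    cb()
    w.Done()
  }()
}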

If you want to see how all of this is wired up, have a look at the test file nsqlookupd_test.go 😂, or at the minimal embedding sketch below. The code above also introduced the DB field, which is what we dig into next.
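
For reference, here is a minimal embedding sketch built only from the New / Main / RealTCPAddr / Exit API shown above; the nsqlookupd.NewOptions constructor and the import path are assumptions based on the upstream repository layout.

package main

import (
  "log"

  "github.com/nsqio/nsq/nsqlookupd"
)

func main() {
  // Assumption: NewOptions() returns an Options value pre-filled with the
  // default listen addresses (TCP 4160, HTTP 4161).
  opts := nsqlookupd.NewOptions()
  daemon := nsqlookupd.New(opts)
  daemon.Main() // starts the TCP and HTTP servers in their own goroutines

  log.Printf("listening on %s (TCP) and %s (HTTP)",
    daemon.RealTCPAddr(), daemon.RealHTTPAddr())

  // ... run until shutdown is requested ...
  daemon.Exit() // closes both listeners and waits for the goroutines
}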

registrationdb.go

package nsqlookupd

// lock
// Store Producers with a Registration key
type RegistrationDB struct {
  sync.RWMutex
  registrationMap map[Registration]Producers
}

type Registration struct {
  Category string
  Key      string
  SubKey   string
}
type Registrations []Registration

// * Node information *
// Last updated
// identifier
// remote address
// host name
// broadcast address
// TCP port
// HTTP port
// version
type PeerInfo struct {
  lastUpdate       int64
  id               string
  RemoteAddress    string `json:"remote_address"`
  Hostname         string `json:"hostname"`
  BroadcastAddress string `json:"broadcast_address"`
  TCPPort          int    `json:"tcp_port"`
  HTTPPort         int    `json:"http_port"`
  Version          string `json:"version"`
}

// * producer *
// Node information
// Whether it has been tombstoned (marked for deletion)
// When it was tombstoned
type Producer struct {
  peerInfo     *PeerInfo
  tombstoned   bool
  tombstonedAt time.Time
}

type Producers []*Producer

// Convert to a string
func (p *Producer) String() string {
  return fmt.Sprintf("%s [%d, %d]", p.peerInfo.BroadcastAddress, p.peerInfo.TCPPort, p.peerInfo.HTTPPort)
}

// Tombstone: mark the producer as deleted
func (p *Producer) Tombstone() {
  p.tombstoned = true
  p.tombstonedAt = time.Now()
}

// Whether the producer is tombstoned and still within the tombstone lifetime
func (p *Producer) IsTombstoned(lifetime time.Duration) bool {
  return p.tombstoned && time.Now().Sub(p.tombstonedAt) < lifetime
}

// Create a RegistrationDB
func NewRegistrationDB() *RegistrationDB {
  return &RegistrationDB{
    registrationMap: make(map[Registration]Producers),
  }
}

// Add a registration key
func (r *RegistrationDB) AddRegistration(k Registration) {
  r.Lock()
  defer r.Unlock()
  _, ok := r.registrationMap[k]
  if !ok {
    r.registrationMap[k] = Producers{}
  }
}

// Add a producer to the registration
// Fetch the producers for the key and iterate over them;
// if the producer is not there yet, append it and return true,
// otherwise return false
func (r *RegistrationDB) AddProducer(k Registration, p *Producer) bool {
  r.Lock()
  defer r.Unlock()
  producers := r.registrationMap[k]
  found := false
  for _, producer := range producers {
    if producer.peerInfo.id == p.peerInfo.id {
      found = true
    }
  }
  if found == false {
    r.registrationMap[k] = append(producers, p)
  }
  return !found
}

// Delete a producer from the registration by id
// If the registration does not exist, return (false, 0)
// Otherwise build a new Producers slice from the old one, keeping every
// producer whose id differs; if some id matched, the removal succeeded
func (r *RegistrationDB) RemoveProducer(k Registration, id string) (bool, int) {
  r.Lock()
  defer r.Unlock()
  producers, ok := r.registrationMap[k]
  if !ok {
    return false, 0
  }
  removed := false
  cleaned := Producers{}
  for _, producer := range producers {
    if producer.peerInfo.id != id {
      cleaned = append(cleaned, producer)
    } else {
      removed = true
    }
  }
  // Note: this leaves keys in the DB even if they have empty lists
  r.registrationMap[k] = cleaned
  return removed, len(cleaned)
}

// Delete a registration
func (r *RegistrationDB) RemoveRegistration(k Registration) {
  r.Lock()
  defer r.Unlock()
  delete(r.registrationMap, k)
}

// Whether a wildcard filter is needed
func (r *RegistrationDB) needFilter(key string, subkey string) bool {
  return key == "*" || subkey == "*"
}

// Look up Registrations by category, key, and subkey
// If neither key nor subkey is '*', do an exact lookup and return at most one registration
// Otherwise iterate over registrationMap and return every registration that matches
func (r *RegistrationDB) FindRegistrations(category string, key string, subkey string) Registrations {
  r.RLock()
  defer r.RUnlock()
  if !r.needFilter(key, subkey) {
    k := Registration{category, key, subkey}
    if _, ok := r.registrationMap[k]; ok {
      return Registrations{k}
    }
    return Registrations{}
  }
  results := Registrations{}
  for k := range r.registrationMap {
    if !k.IsMatch(category, key, subkey) {
      continue
    }
    results = append(results, k)
  }
  return results
}

// Find Producers by category, key, and subkey
// Same pattern as above, except matching producers are de-duplicated by id
func (r *RegistrationDB) FindProducers(category string, key string, subkey string) Producers {
  r.RLock()
  defer r.RUnlock()
  if !r.needFilter(key, subkey) {
    k := Registration{category, key, subkey}
    return r.registrationMap[k]
  }

  results := Producers{}
  for k, producers := range r.registrationMap {
    if !k.IsMatch(category, key, subkey) {
      continue
    }
    for _, producer := range producers {
      found := false
      for _, p := range results {
        if producer.peerInfo.id == p.peerInfo.id {
          found = true
        }
      }
      if found == false {
        results = append(results, producer)
      }
    }
  }
  return results
}

// Look up Registrations by id
// A plain traversal of the whole map, nothing special
func (r *RegistrationDB) LookupRegistrations(id string) Registrations {
  r.RLock()
  defer r.RUnlock()
  results := Registrations{}
  for k, producers := range r.registrationMap {
    for _, p := range producers {
      if p.peerInfo.id == id {
        results = append(results, k)
        break
      }
    }
  }
  return results
}

// Whether it matches
func (k Registration) IsMatch(category string, key string, subkey string) bool {
  if category != k.Category {
    return false
  }
  if key != "*" && k.Key != key {
    return false
  }
  if subkey != "*" && k.SubKey != subkey {
    return false
  }
  return true
}

// Filter the registrations, keeping those that match
func (rr Registrations) Filter(category string, key string, subkey string) Registrations {
  output := Registrations{}
  for _, k := range rr {
    if k.IsMatch(category, key, subkey) {
      output = append(output, k)
    }
  }
  return output
}

// keys
func (rr Registrations) Keys() []string {
  keys := make([]string, len(rr))
  for i, k := range rr {
    keys[i] = k.Key
  }
  return keys
}

// subkeys
func (rr Registrations) SubKeys() []string {
  subkeys := make([]string, len(rr))
  for i, k := range rr {
    subkeys[i] = k.SubKey
  }
  return subkeys
}

// Filter out producers that are inactive or tombstoned
func (pp Producers) FilterByActive(inactivityTimeout time.Duration, tombstoneLifetime time.Duration) Producers {
  now := time.Now()
  results := Producers{}
  for _, p := range pp {
    cur := time.Unix(0, atomic.LoadInt64(&p.peerInfo.lastUpdate))
    if now.Sub(cur) > inactivityTimeout || p.IsTombstoned(tombstoneLifetime) {
      continue
    }
    results = append(results, p)
  }
  return results
}

// Node information
func (pp Producers) PeerInfo() []*PeerInfo {
  results := []*PeerInfo{}
  for _, p := range pp {
    results = append(results, p.peerInfo)
  }
  return results
}

Now you can see that RegistrationDB holds all of the node information in a map structure; despite being called a DB, it is really no more than an in-memory cache 😂. Clients query nsqlookupd to discover the producers of a given topic.
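
To tie the pieces together, here is a small illustrative walk-through of the RegistrationDB API quoted above: register a topic, attach a producer, then look both up again. The addresses are made up, and the code has to live inside the nsqlookupd package because peerInfo and id are unexported.

package nsqlookupd

import "fmt"

// exampleRegistrationDB is a hypothetical walk-through of the API above.
func exampleRegistrationDB() {
  db := NewRegistrationDB()

  // Register the topic "orders" and attach a single producer to it
  // (the peer info values are made up for illustration).
  topic := Registration{Category: "topic", Key: "orders", SubKey: ""}
  db.AddRegistration(topic)
  db.AddProducer(topic, &Producer{
    peerInfo: &PeerInfo{
      id:               "10.0.0.1:4150",
      BroadcastAddress: "10.0.0.1",
      TCPPort:          4150,
      HTTPPort:         4151,
    },
  })

  // Exact lookup: no wildcard, so at most one registration comes back.
  regs := db.FindRegistrations("topic", "orders", "")
  fmt.Println(regs)

  // Wildcard lookup: "*" walks the whole map and de-duplicates producers by id.
  producers := db.FindProducers("topic", "*", "")
  fmt.Println(producers)
}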

Well, that's it for the first chapter for now; stay tuned for the rest