What is NSQ?

A realtime distributed messaging platform

Features:

  • Distributed

NSQ promotes distributed and decentralized topologies without single points of failure, enabling fault tolerance and high availability coupled with a reliable message delivery guarantee.

  • Scalable

NSQ scales horizontally, without any centralized brokers. Built-in discovery makes it easy to add nodes to a cluster. It supports both pub-sub and load-balanced message delivery. It’s also fast.

  • Ops-friendly

NSQ is easy to configure and deploy, and comes with an administration UI. Binaries have no runtime dependencies and we provide pre-compiled versions for Linux, Darwin, FreeBSD, and Windows, as well as official Docker images.

  • Integrated

Official Go and Python libraries are available, as well as community-supported libraries for most major languages (see client libraries).

Other features are listed below:

  • low-latency push-based message delivery (performance)
  • combination load-balanced and multicast style message routing
  • excels at both streaming (high-throughput) and job-oriented (low-throughput) workloads
  • primarily in-memory (messages beyond a high-water mark are transparently kept on disk)
  • runtime discovery service for consumers to find producers (nsqlookupd)
  • transport layer security (TLS)
  • data format agnostic
  • few dependencies (easy to deploy) and a sane, bounded, default configuration
  • simple TCP protocol supporting client libraries in any language
  • HTTP interface for stats, administrative actions, and producers (no client library needed to publish)
  • integrates with statsd for real-time instrumentation
  • robust cluster administration interface (nsqadmin)

Guarantees

As with any distributed system, you achieve your goals through smart trade-offs. By being transparent about the reality of these trade-offs, we want to set expectations for how NSQ will behave when deployed in production.

  • Messages are not durable (by default)

Although the system supports a --mem-queue-size option beyond which messages are transparently written to disk, NSQ is primarily an in-memory messaging platform.

You can set --mem-queue-size to 0 to ensure that all incoming messages are persisted to disk, which reduces your message-loss surface if a node fails.

There is no built-in replication. However, this trade-off can be managed in a number of ways, such as deployment topology and techniques that actively replicate topics and persist them to disk in a fault-tolerant fashion.

  • Messages are delivered at least once

Closely related to the above, this assumes that a given NSQD node will not fail.

This means that, for a variety of reasons (client times out, disconnects, re-queues, and so on), messages can be delivered multiple times. It is the client’s responsibility to perform idempotent operations or de-duplication (a sketch follows).
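For illustration, here is a minimal consumer-side de-duplication sketch using the official go-nsq client. The handler name, the process function, and the unbounded seen map are illustrative assumptions; keying on msg.ID only covers nsqd re-deliveries (producer retries would need an application-level ID), and production code would bound the map with an LRU or time-based expiry.

package main

import (
	"sync"

	"github.com/nsqio/go-nsq"
)

type dedupHandler struct {
	mu   sync.Mutex
	seen map[nsq.MessageID]bool
}

func (h *dedupHandler) HandleMessage(msg *nsq.Message) error {
	h.mu.Lock()
	dup := h.seen[msg.ID]
	h.seen[msg.ID] = true
	h.mu.Unlock()
	if dup {
		return nil // already handled: acknowledge and move on
	}
	return process(msg.Body) // the work itself should also be safe to retry
}

func process(body []byte) error { return nil } // placeholder for real work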

  • Messages received are unordered

You cannot rely on the order in which messages are delivered to consumers.

Similar to the delivery semantics above, this is a result of re-queues, the combination of in-memory and on-disk storage, and the fact that nsqd nodes share nothing.

Loose ordering (i.e. messages are ordered for a given consumer, but not across the cluster) is relatively easy to achieve by introducing a window of latency in your consumer to receive messages and order them before processing (see the sketch below). To maintain this invariant, messages outside of the window must be discarded.
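A minimal sketch of such a latency window, under the assumption that each message body carries an application-level timestamp to order by; the event type and the window mechanics are illustrative, not part of NSQ itself:

package main

import (
	"sort"
	"time"
)

type event struct {
	ts   time.Time // ordering key carried in the message body
	body []byte
}

type window struct {
	span    time.Duration
	buf     []event
	horizon time.Time // events older than this are discarded on arrival
}

func (w *window) add(e event) {
	if e.ts.Before(w.horizon) {
		return // outside the window: discard to keep the invariant
	}
	w.buf = append(w.buf, e)
}

// flush sorts the buffer and returns everything older than now-span,
// advancing the horizon so late arrivals are dropped by add.
func (w *window) flush(now time.Time) []event {
	cutoff := now.Add(-w.span)
	sort.Slice(w.buf, func(i, j int) bool { return w.buf[i].ts.Before(w.buf[j].ts) })
	var ready []event
	kept := w.buf[:0]
	for _, e := range w.buf {
		if e.ts.Before(cutoff) {
			ready = append(ready, e)
		} else {
			kept = append(kept, e)
		}
	}
	w.buf = kept
	w.horizon = cutoff
	return ready
}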

  • Consumers eventually find all topic producers

The discovery service (nsqlookupd) is designed to be eventually consistent. nsqlookupd nodes do not coordinate to maintain state or answer queries.

Network partitions do not affect availability in the sense that both sides of the partition can still answer queries. Deployment topology has the most significant role in mitigating these types of issues.

NSQ components

NSQ consists of three daemons:

  • nsqd is the daemon that receives, queues, and delivers messages to clients. It can run standalone but is normally configured in a cluster with nsqlookupd instance(s) (in which case it announces its topics and channels for discovery). It listens on two TCP ports, one for clients and one for the HTTP API, and can optionally listen on a third port for HTTPS.
  • nsqlookupd is the daemon that manages topology information and provides an eventually consistent discovery service. Clients query nsqlookupd to discover nsqd producers for a specific topic, and nsqd nodes broadcast topic and channel information. There are two interfaces: a TCP interface used by nsqd for broadcasts, and an HTTP interface for clients to perform discovery and administrative actions.
  • nsqadmin is a web UI to introspect the cluster in real time (and perform various administrative tasks).

Core concepts

Topics and Channels

Topics and channels, the core primitives of NSQ, best exemplify how the design of the system translates seamlessly into Go features. Go channels are a natural way to express queues, so an NSQ topic/channel is, at its core, just a buffered Go channel whose buffer size is equal to the --mem-queue-size configuration parameter.

After data is read off the wire, publishing a message to a topic involves:

  • instantiation of a Message struct (and allocation of the message body []byte)
  • a read-lock to get the Topic
  • a read-lock to check for the ability to publish
  • a send on a buffered go-chan

To get messages from a topic to its channels, the topic cannot rely on typical go-chan receive semantics: multiple goroutines receiving on the same go-chan would distribute the messages, while the desired end result is to copy each message to every channel (goroutine).

Instead, each topic maintains three primary goroutines. The first, called router, is responsible for reading newly published messages off the incoming go-chan and storing them in a queue (memory or disk).

The second, called messagePump, is responsible for copying messages and pushing them to channels as described above.

The third is responsible for DiskQueue IO and is discussed later.
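As an illustration, here is a condensed version of that copy-to-every-channel loop. Topic, Channel, and Message are simplified stand-ins for nsqd’s internal types, not the real definitions:

type Message struct {
	ID   [16]byte
	Body []byte
}

type Channel struct {
	memoryMsgChan chan *Message
}

type Topic struct {
	memoryMsgChan chan *Message
	channels      []*Channel
}

// messagePump copies each incoming message to every channel. Every channel
// after the first receives its own copy so that per-channel delivery state
// (attempts, timeouts) never interferes.
func (t *Topic) messagePump() {
	for msg := range t.memoryMsgChan {
		for i, ch := range t.channels {
			m := msg
			if i > 0 {
				m = &Message{ID: msg.ID, Body: msg.Body}
			}
			ch.memoryMsgChan <- m
		}
	}
}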

Channels are a little more complicated, but share the underlying goal of exposing a single input and a single output go-chan (to abstract away the fact that, internally, messages might be in memory or on disk):

Additionally, each channel maintains two time-ordered priority queues responsible for deferred and in-flight message timeouts (and two accompanying goroutines for monitoring them).

Parallelization is improved by managing a per-channel data structure, rather than relying on the Go runtime’s global timer scheduler.

Note: Internally, the Go runtime uses a single priority queue and a single goroutine to manage timers. This supports (but isn’t limited to) the entirety of the time package. It normally obviates the need for a user-land time-ordered priority queue, but it’s important to keep in mind that it’s a single data structure with a single lock, potentially impacting performance with GOMAXPROCS > 1. See runtime/time.go.

Backend and DiskQueue

One of the design goals of NSQ is to bound the number of messages kept in memory. It does this by transparently writing message overflow to disk via DiskQueue (which owns the third primary goroutine of a topic or channel).

Since the memory queue is just a go-chan, it’s trivial to route messages to memory first, if possible, and then fall back to disk:

for msg := range c.incomingMsgChan {
	select {
	case c.memoryMsgChan <- msg:
	default:
		err := WriteMessageToBackend(&msgBuf, msg, c.backend)
		if err != nil {
			// ... handle errors ...
		}
	}
}

Taking advantage of Go’s select statement allows this functionality to be expressed in just a few lines of code: the default case above executes only when memoryMsgChan is full.

NSQ also has the concept of ephemeral topics/channels. They discard message overflow (rather than writing it to disk) and disappear when they no longer have clients subscribed. This is a perfect use case for Go interfaces: the struct members of topics and channels are declared as a Backend interface rather than a concrete type. Normal topics and channels use a DiskQueue, while ephemeral ones stub in a DummyBackendQueue, which implements a no-op Backend.
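The shape of that interface, lightly abridged from nsqd’s source where it is called BackendQueue (the exact method set may vary across versions):

// Backend abstracts where overflow goes: DiskQueue is the real
// implementation, while ephemeral topics/channels use the no-op below.
type BackendQueue interface {
	Put([]byte) error
	ReadChan() chan []byte
	Close() error
	Depth() int64
	Empty() error
}

// dummyBackendQueue silently drops overflow instead of writing it to disk.
type dummyBackendQueue struct {
	readChan chan []byte
}

func (d *dummyBackendQueue) Put([]byte) error      { return nil } // drop
func (d *dummyBackendQueue) ReadChan() chan []byte { return d.readChan } // never yields
func (d *dummyBackendQueue) Close() error          { return nil }
func (d *dummyBackendQueue) Depth() int64          { return 0 }
func (d *dummyBackendQueue) Empty() error          { return nil }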

Design principles

Simplifying configuration and administration

A single NSQD instance is designed to process multiple data streams at once. Streams are called “topics” and a topic has one or more “channels.” Each channel receives a copy of all messages for a topic. In effect, channels map to downstream services that consume topics.

Topics and channels are not configured a priori. Topics are created on first use, either by publishing to the named topic or by subscribing to a channel on the named topic. Channels are created on first use by subscribing to the named channel.

Topics and channels all buffer data independently of each other, preventing a slow consumer from causing a backlog for other channels (the same applies at the topic level).

A channel can, and generally does, have multiple clients connected. Assuming all connected clients are in a state where they are ready to receive messages, each message will be delivered to a random client. For example:

To summarize, messages are multicast from topic -> channel (every channel receives a copy of all messages for that topic) but distributed evenly from channel -> consumers (each consumer receives a portion of the messages for that channel).

NSQ also includes a helper application, nsqlookupd, which provides a directory service where consumers can look up the addresses of nsqd instances that provide the topics they are interested in subscribing to. In terms of configuration, this decouples the consumers from the producers (they each individually only need to know where to contact common instances of nsqlookupd, never each other), reducing complexity and maintenance.

At a lower level, each nsqd has a long-lived TCP connection to nsqlookupd over which it periodically pushes its state. This data is used to inform nsqlookupd which nsqd addresses to give to consumers. For consumers, an HTTP /lookup endpoint is exposed for polling.
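A sketch of what that consumer-side poll looks like against nsqlookupd’s HTTP API. The response fields below follow nsqlookupd 1.x (older releases wrapped the body in a data envelope), so treat the struct as illustrative:

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

type lookupResponse struct {
	Producers []struct {
		BroadcastAddress string `json:"broadcast_address"`
		TCPPort          int    `json:"tcp_port"`
	} `json:"producers"`
}

func main() {
	resp, err := http.Get("http://127.0.0.1:4161/lookup?topic=test")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	var lr lookupResponse
	if err := json.NewDecoder(resp.Body).Decode(&lr); err != nil {
		log.Fatal(err)
	}
	for _, p := range lr.Producers {
		// each entry is an nsqd to connect to over TCP
		fmt.Printf("%s:%d\n", p.BroadcastAddress, p.TCPPort)
	}
}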

To introduce a new, distinct consumer of a topic, you simply start an NSQ client configured with the addresses of your nsqlookupd instances. No configuration changes are needed to add new consumers or new publishers, greatly reducing overhead and complexity.

Note: In future versions, the heuristic nsqlookupd uses to return addresses could be based on depth, the number of connected clients, or other “intelligent” strategies. The current implementation simply returns all. Ultimately, the goal is to ensure that all producers are being read from such that depth stays near zero.

It is important to note that the nsqd and nsqlookupd daemons are designed to operate independently, without communication or coordination between siblings.

We also think it is important to have a way to view, introspect, and manage the cluster in aggregate. We built nsqadmin to do this. It provides a web UI to browse the topic/channel/consumer hierarchy and inspect depth and other key statistics at each layer. Additionally, it supports administrative commands such as removing and emptying a channel (a useful tool when messages in a channel can be safely thrown away in order to bring depth back to 0).

Straightforward upgrade path

This was one of our highest priorities. Our production systems handle a large volume of traffic, all built upon our existing messaging tools, so we needed a way to slowly and methodically upgrade specific parts of the infrastructure with little impact.

First, on the message producer side, we built nsqd to match SimpleQueue. Specifically, nsqd exposes an HTTP /put endpoint, just like SimpleQueue, to POST binary data (with the one caveat that the endpoint takes an additional query parameter specifying the “topic”). Services that wanted to switch to start publishing to nsqd only needed minor code changes.

Second, we built libraries in both Python and Go that matched the functionality and idioms we were accustomed to in our existing libraries. This eased the transition for message consumers by limiting the code changes to bootstrapping. All business logic remained the same.

Finally, we built utilities that glue old and new components together. These can be found in the examples directory of the repository:

  • expose a pubsub-like HTTP interface to topics in an NSQ cluster
  • durably write all messages for a given topic to a file
  • perform HTTP requests for all messages in a topic to (multiple) endpoints

Eliminating SPOFs

NSQ is designed to be used in a distributed fashion. nsqd clients connect (over TCP) to all instances providing the specified topic. There are no middlemen, no message brokers, and no SPOFs:

This topology eliminates the need to chain a single, aggregated feed. Instead, you consume directly from all producers. Technically, it doesn’t matter which client connects to which NSQ instance; as long as there are enough clients connected to all producers to satisfy the volume of messages, you’re guaranteed that all will eventually be processed.

For nsqlookupd, high availability is achieved by running multiple instances. They don’t communicate directly with each other, and data is considered eventually consistent. Consumers poll all of their configured nsqlookupd instances and union the responses. Stale, inaccessible, or otherwise faulty nodes don’t grind the system to a halt.

Message delivery guarantees

NSQ guarantees that the message will be delivered at least once, although the message may be repeated. Consumers should expect this and do deduplication or idempotent operations.

This guarantee is enforced as part of the protocol and works as follows (assume the client has successfully connected and subscribed to a topic):

  • the client indicates it is ready to receive messages
  • NSQ sends a message and temporarily stores the data locally (in the event of a re-queue or timeout)
  • the client replies FIN (finish) or REQ (re-queue), indicating success or failure respectively; if the client doesn’t reply within a configurable interval, NSQ times out and automatically re-queues the message (see the sketch after this list)
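With the official go-nsq client, these replies are driven by the handler’s return value (assuming the default auto-response behavior); myHandler and doWork are illustrative names:

type myHandler struct{}

func (h *myHandler) HandleMessage(msg *nsq.Message) error {
	if err := doWork(msg.Body); err != nil {
		return err // the client replies REQ: nsqd re-queues the message
	}
	return nil // the client replies FIN: the message is finished
}

func doWork(body []byte) error { return nil } // placeholder for real processing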

This ensures that the only edge case that can cause message loss is an unclean shutdown of an nsqd process. In that case, any messages in memory (or any buffered writes not yet flushed to disk) are lost.

If preventing message loss is of the utmost importance, even this edge case can be mitigated. One solution is to stand up redundant nsqd copies (on separate hosts) that receive the same portion of messages. Because you’ve written your consumers to be idempotent, processing these messages twice has no downstream impact, and it allows the system to endure any single-node failure without losing messages.

The takeaway is that NSQ provides the building blocks to support a variety of production use cases and configurable degrees of durability.

Limited memory footprint

nsqd provides the --mem-queue-size configuration option, which determines the number of messages kept in memory for a given queue. If the depth of a queue exceeds this threshold, messages are transparently written to disk. This bounds the memory footprint of a given nsqd process to mem-queue-size * #_of_channels_and_topics:

Also, an astute observer may have noticed that setting this value to something low (such as 1, or even 0) is a convenient way to obtain a higher delivery guarantee, since nearly every message is then written to disk.

For the data protocol, we made a key design decision to maximize performance and throughput by pushing data to clients instead of waiting for them to pull. This concept, which we call RDY state, is essentially a form of client-side flow control.

When a client connects to nsqd and subscribes to a channel, it is placed in a RDY state of 0, meaning no messages will be sent to the client. When a client is ready to receive messages, it sends a command updating its RDY state to the number it is ready to process, say 100. Without any further commands, 100 messages will be pushed to the client as they become available (each one decrementing the server-side RDY count for that client).

Client libraries are designed to send a command to update the RDY count when it reaches about 25% of the configurable max-in-flight setting (properly accounting for connections to multiple nsqd instances, dividing the count appropriately).

This is an important performance knob, because some downstream systems can more easily batch-process messages and benefit greatly from a higher max-in-flight.
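With the official go-nsq client, this knob is exposed as the MaxInFlight config option. The numbers below are illustrative, and the handler reuses the illustrative myHandler from the earlier sketch:

cfg := nsq.NewConfig()
cfg.MaxInFlight = 250 // total RDY budget, divided across nsqd connections

consumer, err := nsq.NewConsumer("events", "archiver", cfg)
if err != nil {
	log.Fatal(err)
}
// several goroutines share the in-flight messages
consumer.AddConcurrentHandlers(&myHandler{}, 8)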

It is worth noting that, because NSQ is both buffered and push-based, and able to satisfy the need for independent copies of streams (channels), we’ve produced a daemon that behaves like SimpleQueue and PubSub combined. This is powerful in terms of simplifying the topology of systems we traditionally maintained with the older toolchain discussed above.

Go

We made a strategic decision early on to build the NSQ core in Go. We recently blogged about our use of Go and alluded to this very project; it might be helpful to browse through that post to get an understanding of how we think about the language.

Regarding NSQ, Go channels (not to be confused with NSQ channels) and the language’s built-in concurrency features are a natural fit for the internal workings of nsqd. We leverage buffered channels to manage our in-memory message queues and seamlessly write overflow to disk.

The standard library makes it easy to write the networking layer and client code. The built-in memory and CPU profiling hooks highlight opportunities for optimization and require very little effort to integrate. We also found it really easy to test components in isolation, mock types using interfaces, and iteratively build functionality.

Simple deployment

The following steps will run a small NSQ cluster on your local computer and step through the process of publishing, consuming, and archiving messages to disk.

  • 1. In one shell, start nsqlookupd:
$ nsqlookupd
  • 2. In another shell, start nsqd:
$ nsqd --lookupd-tcp-address=127.0.0.1:4160
  • 3. In another shell, start nsqadmin:
$ nsqadmin --lookupd-http-address=127.0.0.1:4161
  • 4. Publish an initial message (this also creates the topic in the cluster):
$ curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test'
  • 5. Finally, in another shell, start nsq_to_file:
$ nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
  • 6. Publish a couple more messages to nsqd:
$ curl -d 'hello world 2' 'http://127.0.0.1:4151/pub?topic=test'
$ curl -d 'hello world 3' 'http://127.0.0.1:4151/pub?topic=test'
  • 7. To verify that things worked as expected, open http://127.0.0.1:4171/ in a web browser to view the nsqadmin UI and message counts, and check the contents of the test.*.log files in the /tmp directory.

Deploy NSQ using Docker

This section details how to deploy and run NSQ binaries in Docker containers.

There is a minimal NSQ image that contains all of the NSQ binaries. Each binary can be run by specifying it as the command when running Docker. The basic format is:

docker run nsqio/nsq /<command>

Note the / before the command. For example:

docker run nsqio/nsq /nsq_to_file

Run nsqlookupd

docker pull nsqio/nsq
docker run --name lookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd

Run nsqd

First, get the IP of the Docker host:

ifconfig | grep addr

Second, run the nsqd container:

docker pull nsqio/nsq
docker run --name nsqd -p 4150:4150 -p 4151:4151 \
    nsqio/nsq /nsqd \
    --broadcast-address=<host> \
    --lookupd-tcp-address=<host>:<port>

Set the --lookupd-tcp-address flag to the IP and TCP port of the host that ran nsqlookupd earlier, i.e. dockerIP:4160.

Note: do not use 127.0.0.1 here when testing a local deployment.

For example, if the host IP address is 172.17.42.1:

docker run --name nsqd -p 4150:4150 -p 4151:4151 \
    nsqio/nsq /nsqd \
    --broadcast-address=172.17.42.1 \
    --lookupd-tcp-address=172.17.42.1:4160

Notice that this uses port 4160, the port we exposed when starting the nsqlookupd container (it is also nsqlookupd’s default TCP port).

To use a non-default port, change the -p parameter:

docker run --name nsqlookupd -p 5160:4160 -p 5161:4161 nsqio/nsq /nsqlookupd

This makes nsqlookupd available on the Docker host IP on ports 5160 and 5161.

Using TLS

To use TLS with the containerized NSQ binaries, you need to include the certificate file, the private key, and the root CA file. The Docker image defines a volume mounted at /etc/ssl/certs/ for this purpose. Mount a host directory containing the files into the volume, then specify the files on the command line as usual:

docker run -p 4150:4150 -p 4151:4151 -p 4152:4152 -v /home/docker/certs:/etc/ssl/certs \
    nsqio/nsq /nsqd \
    --tls-root-ca-file=/etc/ssl/certs/certs.crt \
    --tls-cert=/etc/ssl/certs/cert.pem \
    --tls-key=/etc/ssl/certs/key.pem \
    --tls-required=true \
    --tls-client-auth-policy=require-verify

This loads the certificates from /home/docker/certs into the Docker container for use at runtime.

Persist NSQ data

To store nsqd data on the host disk, use the /data volume as the data directory. It can be mounted from a data-only Docker container or from a host directory:

docker run nsqio/nsq /nsqd \
    --data-path=/data

Using docker-compose

To start nsqd, nsqlookupd, and nsqadmin together with docker-compose, first create a docker-compose.yml:

version: '3'
services:
  nsqlookupd:
    image: nsqio/nsq
    command: /nsqlookupd
    ports:
      - "4160"
      - "4161"
  nsqd:
    image: nsqio/nsq
    command: /nsqd --lookupd-tcp-address=nsqlookupd:4160
    depends_on:
      - nsqlookupd
    ports:
      - "4150"
      - "4151"
  nsqadmin:
    image: nsqio/nsq
    command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
    depends_on:
      - nsqlookupd  
    ports:
      - "4171"

Run the following command from the same directory as the docker-compose.yml created above:

docker-compose up -d

A private network is created, and the three containers are started on it. On the local host, each container has a random port mapped to the ports exposed in docker-compose.yml. To view the running containers’ status and mapped ports:

docker-compose ps

To view the logs of the running containers:

docker-compose logs

If nsqlookupd has host port 31001 mapped to container port 4161, you can use curl to perform a simple ping:

curl http://127.0.0.1:31001/ping

Go examples

Consumer example

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/nsqio/go-nsq"
)

func main() {
	err := initConsumer("test", "count", "127.0.0.1:4161")
	if err != nil {
		log.Fatal("init Consumer error")
	}
	err = initConsumer("test", "count2", "127.0.0.1:4161")
	if err != nil {
		log.Fatal("init Consumer error")
	}
	// block forever so the consumers keep running
	select {}
}

type nsqHandler struct {
	nsqConsumer      *nsq.Consumer
	messagesReceived int
}

// HandleMessage implements the nsq.Handler interface.
func (nh *nsqHandler) HandleMessage(msg *nsq.Message) error {
	nh.messagesReceived++
	fmt.Printf("receive ID:%s, addr:%s, message:%s\n", msg.ID, msg.NSQDAddress, string(msg.Body))
	return nil
}

func initConsumer(topic, channel, addr string) error {
	cfg := nsq.NewConfig()
	cfg.LookupdPollInterval = 3 * time.Second

	c, err := nsq.NewConsumer(topic, channel, cfg)
	if err != nil {
		log.Println("init Consumer NewConsumer error:", err)
		return err
	}

	handler := &nsqHandler{nsqConsumer: c}
	c.AddHandler(handler)

	err = c.ConnectToNSQLookupd(addr)
	if err != nil {
		log.Println("init Consumer ConnectToNSQLookupd error:", err)
		return err
	}
	return nil
}

Producer example

package main

import (
	"bufio"
	"fmt"
	"log"
	"os"

	"github.com/nsqio/go-nsq"
)

func main() {
	strIP1 := "127.0.0.1:4150"
	strIP2 := "127.0.0.1:4152"

	producer1, err := initProducer(strIP1)
	if err != nil {
		log.Fatal("init producer1 error:", err)
	}
	producer2, err := initProducer(strIP2)
	if err != nil {
		log.Fatal("init producer2 error:", err)
	}
	defer producer1.Stop()
	defer producer2.Stop()

	reader := bufio.NewReader(os.Stdin)
	count := 0
	for {
		fmt.Print("please say:")
		data, _, _ := reader.ReadLine()
		command := string(data)
		if command == "stop" {
			fmt.Println("stop producer!")
			return
		}
		// alternate messages between the two producers
		if count%2 == 0 {
			err := producer1.public("test1", command)
			if err != nil {
				log.Fatal("producer1 public error:", err)
			}
		} else {
			err := producer2.public("test2", command)
			if err != nil {
				log.Fatal("producer2 public error:", err)
			}
		}

		count++
	}
}

type nsqProducer struct {
	*nsq.Producer
}

// initProducer initializes a producer connected to the given nsqd address.
func initProducer(addr string) (*nsqProducer, error) {
	fmt.Println("init producer address:", addr)
	producer, err := nsq.NewProducer(addr, nsq.NewConfig())
	if err != nil {
		return nil, err
	}
	return &nsqProducer{producer}, nil
}

// public publishes a message to the given topic.
func (np *nsqProducer) public(topic, message string) error {
	err := np.Publish(topic, []byte(message))
	if err != nil {
		log.Println("nsq public error:", err)
		return err
	}
	return nil
}

When the consumers run, they print each message as it is consumed.