1. Introduction

Recently, I have been studying several message middleware systems, such as RabbitMQ, ActiveMQ, and Kafka. NSQ is a distributed real-time messaging platform written in Go. It is released under the MIT license and is an easy-to-use message middleware open-sourced by Bitly. The NSQ project and third parties have developed a number of client libraries for it: for example, the HTTP interface built into nsqd, the Go client go-nsq, the Python client pynsq, the Node.js-based JavaScript client nsqjs, the C client libnsq, the Java client nsq-java, and many third-party client libraries for other languages.

1.1 Features

1). Distributed: NSQ provides a distributed, decentralized topology with no single point of failure, reliable message delivery guarantees, and high fault tolerance and high availability (HA).
2). Scalable: NSQ supports horizontal scaling without centralized brokers. The built-in discovery service simplifies adding nodes to a cluster. Both pub-sub and load-balanced message distribution are supported.
3). Ops Friendly: NSQ is very easy to configure and deploy, and comes bundled with an administrative interface. Binary packages have no runtime dependencies, and an official Docker image is provided.
4). Integrated: Official Go and Python libraries are available, and libraries also exist for most other languages.

1.2 Components

  • Topic: A topic is the logical key under which an application publishes messages. It is created the first time the application publishes a message to it.
  • Channel: A channel is tied to consumers and load-balances among them. In a sense, a channel is a “queue”. Every time a publisher sends a message to a topic, the message is copied to all channels of that topic, and each consumer reads messages through its particular channel. A channel is created the first time a consumer subscribes to it. Channels queue messages: if no consumer reads a message, it is first queued in memory and written to disk when the queue grows too large.
  • Message: Messages form the backbone of the data flow. A consumer can finish a message to indicate that it was processed normally, or requeue it for later processing. Each message carries its number of delivery attempts; when a message exceeds a certain threshold of attempts, the consumer should either discard it or handle it separately.
  • nsqd: nsqd is the daemon that receives, queues, and delivers messages to clients. It can run standalone, but is typically configured in a cluster with nsqlookupd instances (in which case it announces its topics and channels so that everyone can find them).
  • nsqlookupd: nsqlookupd is the daemon that manages topology information. Clients find the producers of a given topic by querying nsqlookupd, and nsqd nodes broadcast their topic and channel information to it. It has two interfaces: a TCP interface, which nsqd uses for broadcasts, and an HTTP interface, which clients use for discovery and administration.
  • nsqadmin: nsqadmin is a web UI that aggregates real-time statistics for a cluster and performs various administrative tasks.
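As a rough illustration of the delivery-attempts threshold described for messages, a consumer-side handler might choose between finishing, requeueing, and giving up as sketched below. The names `MAX_ATTEMPTS`, `handle`, and `process` are hypothetical and do not belong to any NSQ client library; the threshold value is an assumption.

```python
# Minimal sketch with hypothetical names: decide what to do with a message
# based on how many times it has already been delivered.
MAX_ATTEMPTS = 5  # assumed threshold, tune per application


def handle(body, attempts, process):
    """Return "FIN", "REQ", or "DROP" for one delivery of a message."""
    if attempts > MAX_ATTEMPTS:
        # Too many failed deliveries: stop retrying (log it or divert it).
        return "DROP"
    try:
        process(body)
    except Exception:
        # Processing failed: ask for the message to be requeued.
        return "REQ"
    # Processing succeeded: finish the message.
    return "FIN"


def always_fails(body):
    raise ValueError("cannot process")


first = handle(b"ok", 1, lambda body: None)
second = handle(b"bad", 2, always_fails)
third = handle(b"bad", 6, always_fails)
```

In a real consumer, a dropped message would still have to be finished on the nsqd side so that it is not delivered again.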

Common tools:

  • nsq_to_file: consumes the specified topic/channel and writes the messages to a file, optionally rolling and/or compressing the file.
  • nsq_to_http: consumes the specified topic/channel and performs HTTP requests (GET/POST) against the specified endpoint.
  • nsq_to_nsq: consumes the specified topic/channel and republishes the messages to a destination nsqd over TCP.

1.3 Topology

NSQ recommends co-locating publishers with their corresponding nsqd instances. This means messages are kept locally until a consumer reads them, even in the face of network partitions. More importantly, publishers do not have to discover other nsqd nodes; they can always publish to the local instance.

[Figure: NSQ architecture diagram]

First, a publisher sends a message to its local nsqd. To do this, it opens a connection and then sends a publish command containing the topic and the message body. In this case, we publish messages to the event topic so that they are spread across our different workers.
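To make the publish step concrete, the sketch below builds the bytes of a PUB command as I understand the NSQ TCP protocol: a command line, a 4-byte big-endian body size, and the body, preceded once per connection by the `  V2` magic. `build_pub` is a hypothetical helper and the topic name is only an example; a real application would normally rely on a client library such as pynsq or go-nsq.

```python
import struct

MAGIC = b"  V2"  # sent once right after connecting to select protocol V2


def build_pub(topic, body):
    """Encode a PUB command for one message body (bytes)."""
    # Command line, then a 4-byte big-endian body size, then the body itself.
    header = b"PUB " + topic.encode("ascii") + b"\n"
    return header + struct.pack(">i", len(body)) + body


# The bytes a publisher would write to its local nsqd socket.
payload = MAGIC + build_pub("event", b"hello")
```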

The event topic copies each message and queues it on every channel attached to the topic. In our case there are three channels, one of which is the archive channel; its consumer takes the messages and uploads them to S3.

[Figure: nsqd]

Messages for each channel are queued until a worker consumes them. If the queue exceeds the memory limit, messages are written to disk. nsqd nodes first broadcast their location to nsqlookupd. Once they are registered, workers discover all nsqd nodes that hold the event topic by querying the nsqlookupd servers.
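The memory-then-disk behaviour can be pictured with a toy model. This is only a sketch under simplifying assumptions: the "disk" is a plain in-process list, the class and method names are hypothetical, and `mem_queue_size` is loosely named after nsqd's `--mem-queue-size` option. The real nsqd does not promise the read order used here.

```python
from collections import deque


class Channel:
    """Toy channel: bounded in-memory queue with overflow to a 'disk' list."""

    def __init__(self, mem_queue_size):
        self.mem_queue_size = mem_queue_size
        self.memory = deque()
        self.disk = deque()  # stand-in for an on-disk queue

    def put(self, msg):
        # Keep the message in memory while there is room, else spill it.
        if len(self.memory) < self.mem_queue_size:
            self.memory.append(msg)
        else:
            self.disk.append(msg)

    def get(self):
        # Simplification: serve from memory first, then from the backlog.
        if self.memory:
            return self.memory.popleft()
        if self.disk:
            return self.disk.popleft()
        return None

    def depth(self):
        return len(self.memory) + len(self.disk)


channel = Channel(mem_queue_size=2)
for msg in ("a", "b", "c"):
    channel.put(msg)
```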

[Figure: nsqlookupd]

Each worker then subscribes to each nsqd host and indicates that it is ready to receive messages. We do not need a fully connected graph here, but we must ensure that every nsqd instance has enough consumers to drain its messages; otherwise its channels will back up.

2. Internals

2.1 Messaging guarantee

NSQ guarantees that a message is delivered at least once, although it may be delivered more than once. Consumers should expect this and either deduplicate or make their operations idempotent. This guarantee is part of the protocol and workflow and works as follows (assuming the client has successfully connected and subscribed to a topic):

1) The client indicates that it is ready to receive messages.
2) NSQ sends a message and temporarily stores the data locally (in case of a requeue or timeout).
3) The client replies with FIN (finish) or REQ (requeue), indicating success or failure respectively. If the client does not reply, NSQ times out after a configured duration and automatically requeues the message.

This ensures that the only case in which messages can be lost is an abnormal termination of the nsqd process. In that case, any messages that were in memory (or in a buffer not yet flushed to disk) are lost. If preventing message loss is of the utmost importance, even this edge case can be mitigated. One solution is to stand up redundant nsqd pairs (on separate hosts) that receive copies of the same portion of messages. Because your consumers are idempotent, processing these messages twice has no downstream impact, and the system can withstand any single-node failure without losing messages.
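The three steps above can be sketched as a small in-memory model of the server side. This is an illustration only, not nsqd's implementation: `InFlightQueue` and its methods are hypothetical names, and time is passed in explicitly so the behaviour is easy to follow.

```python
class InFlightQueue:
    """Toy model of at-least-once delivery with FIN, REQ, and timeouts."""

    def __init__(self, msg_timeout):
        self.msg_timeout = msg_timeout
        self.ready = []       # message IDs waiting for delivery
        self.in_flight = {}   # message ID -> deadline
        self.attempts = {}    # message ID -> delivery count

    def publish(self, msg_id):
        self.ready.append(msg_id)
        self.attempts[msg_id] = 0

    def deliver(self, now):
        """Hand out the next message and remember it until FIN/REQ/timeout."""
        if not self.ready:
            return None
        msg_id = self.ready.pop(0)
        self.attempts[msg_id] += 1
        self.in_flight[msg_id] = now + self.msg_timeout
        return msg_id

    def fin(self, msg_id):
        # Success: forget the in-flight message.
        self.in_flight.pop(msg_id, None)

    def req(self, msg_id):
        # Failure: put the message back for another attempt.
        if self.in_flight.pop(msg_id, None) is not None:
            self.ready.append(msg_id)

    def expire(self, now):
        """Requeue every in-flight message whose deadline has passed."""
        for msg_id, deadline in list(self.in_flight.items()):
            if deadline <= now:
                self.req(msg_id)
```

A message that is never answered is delivered again after the timeout, which is exactly why the consumer may see it twice.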

2.2 Simplifying configuration and management

A single nsqd instance is designed to handle multiple streams of data at once. Streams are called “topics”, and a topic has one or more “channels”. Each channel receives a copy of all messages for a topic. In practice, a channel maps to a downstream service that consumes a topic. Topics and channels are not configured in advance. A topic is created on first use, either by publishing to the named topic or by subscribing to a channel on it. A channel is created by the first subscription to the named channel. Topics and channels all buffer data independently of each other, which prevents a slow consumer from causing a backlog on other channels (the same applies at the topic level). A channel typically has multiple clients connected. Assuming all connected clients are ready to receive messages, each message is delivered to a random client.

NSQ also includes nsqlookupd, which provides a directory service where consumers can look up the addresses of the nsqd instances that provide the topics they want to subscribe to. In terms of configuration, this decouples consumers from producers (each side only needs to know where to reach the shared nsqlookupd instances, never each other), reducing complexity and maintenance.

At a lower level, each nsqd holds a long-lived TCP connection to nsqlookupd over which it periodically pushes its state. nsqlookupd uses this data to decide which nsqd addresses to hand to consumers. For consumers, an HTTP /lookup endpoint is exposed for polling. To introduce a new consumer of a topic, simply start an NSQ client configured with the addresses of your nsqlookupd instances. No configuration changes are needed to add new consumers or producers, which greatly reduces overhead and complexity.
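The topic and channel semantics described above (one copy per channel, one random client per copy) can be sketched in a few lines. `Topic`, `subscribe`, and `publish` here are hypothetical names for a toy in-process model, not the nsqd API.

```python
import random


class Topic:
    """Toy topic: every channel gets its own copy of each message."""

    def __init__(self):
        self.channels = {}  # channel name -> list of per-client inboxes

    def subscribe(self, channel):
        """Create the channel on first use and attach a new client inbox."""
        inbox = []
        self.channels.setdefault(channel, []).append(inbox)
        return inbox

    def publish(self, msg, rng=random):
        for clients in self.channels.values():
            # One copy per channel, delivered to one randomly chosen client.
            rng.choice(clients).append(msg)


topic = Topic()
archive = topic.subscribe("archive")    # single consumer on this channel
metrics_1 = topic.subscribe("metrics")  # two consumers share this channel
metrics_2 = topic.subscribe("metrics")
for i in range(10):
    topic.publish(i)
```

Here `archive` sees every message, while `metrics_1` and `metrics_2` split the same ten messages between them.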

2.3 Eliminating single points of failure

NSQ is designed to be used in a distributed manner. nsqd clients connect (over TCP) to all instances that provide a given topic. There are no middlemen, no message brokers, and no single points of failure. This topology removes the need to chain single, aggregated feeds; instead, consumers read directly from all producers. Technically, it does not matter which client connects to which nsqd: as long as enough consumers are connected to all producers to handle the message volume, everything is guaranteed to be processed eventually.

For nsqlookupd, high availability is achieved by running multiple instances. They do not communicate directly with each other, and their data is considered eventually consistent. Consumers poll all configured nsqlookupd instances and merge the responses. Stale, inaccessible, or otherwise faulty nodes do not bring the system to a standstill.
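The merge step a consumer performs across several nsqlookupd instances might look roughly like the following. Each callable stands in for one HTTP lookup request; the function name, the address tuples, and the choice to skip unreachable instances by catching `OSError` are assumptions made for this sketch.

```python
def union_producers(lookup_fns):
    """Query every lookup source and merge the nsqd addresses they return.

    Each element of lookup_fns is a callable standing in for one HTTP
    lookup request; it returns a list of (host, port) pairs or raises.
    """
    producers = set()
    for lookup in lookup_fns:
        try:
            producers.update(lookup())
        except OSError:
            # An unreachable lookup instance is skipped, not fatal.
            continue
    return sorted(producers)


def lookup_a():
    return [("10.0.0.1", 4150), ("10.0.0.2", 4150)]


def lookup_b():
    return [("10.0.0.2", 4150), ("10.0.0.3", 4150)]


def lookup_down():
    raise ConnectionError("lookup instance unreachable")


nodes = union_producers([lookup_a, lookup_down, lookup_b])
```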

2.4 Efficiency

For the data protocol, NSQ maximizes performance and throughput by pushing data to clients rather than waiting for clients to pull it. This concept, called RDY state, is essentially a form of client-side flow control.

When a client connects to nsqd and subscribes to a channel, it is placed in an RDY state of 0, which means no messages are sent to it yet. When the client is ready to receive messages, it sends a command that updates its RDY state to the number of messages it is prepared to handle, for example 100. Without any further commands, 100 messages are pushed to the client as they become available (the server decrements the RDY count for that client each time). Client libraries are designed to send a command that updates the RDY count when it reaches 25% of the configured max-in-flight setting (and to divide it appropriately when connected to multiple nsqd instances).
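A toy model of this flow control is sketched below. It only tracks counters: `Connection` and its methods are hypothetical names, and the top-up rule (refill to max-in-flight once RDY falls to 25% of it) is a simplified reading of the behaviour described above.

```python
class Connection:
    """Toy model of RDY-based flow control on one client connection."""

    def __init__(self, max_in_flight):
        self.max_in_flight = max_in_flight
        self.rdy = 0            # server-side view: messages it may still push
        self.rdy_updates = 0    # how many RDY commands the client has sent

    def send_rdy(self, count):
        self.rdy = count
        self.rdy_updates += 1

    def server_push(self):
        """Server pushes one message only if the RDY count allows it."""
        if self.rdy == 0:
            return False
        self.rdy -= 1
        return True

    def client_after_message(self):
        """Client tops RDY back up once it falls to 25% of max-in-flight."""
        if self.rdy <= self.max_in_flight * 0.25:
            self.send_rdy(self.max_in_flight)


conn = Connection(max_in_flight=100)
pushed_before_rdy = conn.server_push()  # RDY is still 0 at this point
conn.send_rdy(100)
delivered = 0
for _ in range(80):
    if conn.server_push():
        delivered += 1
        conn.client_after_message()
```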

[Figure: efficiency]

2.5 Heartbeats and timeouts

NSQ’s TCP protocol is push oriented. After connection, handshake, and subscription, the consumer is placed in an RDY state of 0. When the consumer is ready to receive messages, it updates the RDY state to the number of messages it is willing to accept. NSQ client libraries manage this continually behind the scenes, which results in a flow-controlled stream of messages.

Periodically, nsqd sends a heartbeat over the connection. The client can configure the interval between heartbeats, but nsqd expects a response before it sends the next one. Combining application-level heartbeats with RDY state avoids head-of-line blocking, which could otherwise render heartbeats useless (for example, if a consumer falls behind in processing the message flow, the operating system’s receive buffer fills up and blocks heartbeats). To guarantee progress, all network IO is bound by deadlines relative to the configured heartbeat interval. This means you can literally unplug the network connection between nsqd and a consumer, and the error will be detected and handled correctly.

When a fatal error is detected, the client connection is forcibly closed. In-flight messages time out and are requeued for delivery to another consumer. Finally, the error is logged and various internal metrics are incremented.
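On the wire, as I understand the NSQ TCP protocol, a heartbeat arrives as a response frame whose body is `_heartbeat_`, and the client answers with a NOP command. The sketch below parses one frame (4-byte size, 4-byte frame type, data, all big-endian) and picks the reply; `read_frame` and `reply_for` are hypothetical helpers, not functions from a client library.

```python
import struct

FRAME_TYPE_RESPONSE = 0
FRAME_TYPE_ERROR = 1
FRAME_TYPE_MESSAGE = 2


def read_frame(buf):
    """Split one frame off the front of buf.

    Returns (frame_type, data, rest), or None if buf is incomplete.
    """
    if len(buf) < 4:
        return None
    (size,) = struct.unpack(">i", buf[:4])  # size covers frame type + data
    if len(buf) < 4 + size:
        return None
    (frame_type,) = struct.unpack(">i", buf[4:8])
    return frame_type, buf[8:4 + size], buf[4 + size:]


def reply_for(frame_type, data):
    """Return the bytes a client should send back, if any."""
    if frame_type == FRAME_TYPE_RESPONSE and data == b"_heartbeat_":
        return b"NOP\n"
    return None


# A heartbeat frame as it would appear in the receive buffer.
heartbeat = struct.pack(">ii", 4 + len(b"_heartbeat_"), FRAME_TYPE_RESPONSE) + b"_heartbeat_"
```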

2.6 Distributed

Because NSQ does not share information between daemons, it was built for distributed operation from the start. Individual machines can go down and come back up at will without affecting the rest of the system, and publishers can publish locally even in the face of network partitions. This “distributed first” design philosophy means NSQ can essentially scale forever. Need higher throughput? Add more nsqd instances. The only shared state is kept on the lookup nodes, and even they do not require a global view: configuring certain nsqd instances to register with certain lookup nodes is simple. The only key point is that consumers can retrieve the complete set of nodes from the lookup nodes.

Clear failure events: NSQ establishes a clear set of trade-offs around components that might fail and what that means for both delivery and recovery. While it may not offer the same level of guarantees as a system like Kafka, the operational simplicity of NSQ makes failure conditions very obvious.

2.7 No replication

Unlike other queue systems, NSQ does not provide any form of replication or clustering. This is what makes it so easy to run, but it also means it cannot offer strong enough guarantees for messages that require highly reliable publishing. We can partly work around this by lowering the file sync time (configurable with a flag) and by backing our queues with EBS. There is still a case, however, where a node dies immediately after a message is published and before the message has effectively been written.

2.8 No strict ordering

While Kafka is structured as an ordered log, NSQ is not. Messages can enter a queue at any time and in any order. For our use case this usually does not matter because all the data is time-stamped, but NSQ is not appropriate for cases that require strict ordering.

2.9 No deduplication

Because NSQ is a timeout-based system, it uses a heartbeat mechanism to detect whether consumers are alive or dead. There are many reasons why a consumer might fail its heartbeat check, so the consumer needs a separate step of its own to ensure idempotency.
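One common way to add that separate step is to remember which message IDs have already been applied. The sketch below is an assumption-laden illustration: `IdempotentConsumer` is a hypothetical class, the IDs are treated as opaque keys, and the in-memory set would have to be durable, shared storage in a real deployment.

```python
class IdempotentConsumer:
    """Apply each message at most once, even if it is delivered repeatedly."""

    def __init__(self, apply):
        self.apply = apply
        self.seen = set()  # in production this would be durable storage

    def on_message(self, msg_id, body):
        if msg_id in self.seen:
            return "FIN"  # duplicate: acknowledge without reprocessing
        self.apply(body)
        self.seen.add(msg_id)
        return "FIN"


applied = []
consumer = IdempotentConsumer(applied.append)
consumer.on_message("a", 1)
consumer.on_message("a", 1)  # redelivery of the same message
consumer.on_message("b", 2)
```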

3. Installation and practice

This article omits the detailed installation process for an NSQ cluster; it is relatively simple, and you can refer to the official website. This section introduces the topology used in the author’s experiment and the information shown by nsqadmin.

3.1 Topology

[Figure: topology]

The experiment used three nsqd services and two nsqlookupd services, five machines in total. It followed the officially recommended topology, in which the message publishing service runs on the same host as nsqd. NSQ has essentially no configuration files; parameters are specified on the command line. The main commands are as follows.

nsqlookupd command:

bin/nsqlookupd

nsqd command:

bin/nsqd --lookupd-tcp-address=172.16.30.254:4160 --broadcast-address=172.16.30.254

nsqadmin command:

bin/nsqadmin --lookupd-http-address=172.16.30.254:4161

Utility that consumes messages and stores them in a local file:

bin/nsq_to_file --topic=newtest --channel=test --output-dir=/tmp --lookupd-http-address=172.16.30.254:4161

Publish a message:

curl -d 'hello world 5' 'http://172.16.30.254:4151/put?topic=test'

3.2 nsqadmin

View stream details, including nsqd nodes, specific channels, the number of messages in the queue, the number of connections, and so on.

[Figure: nsqadmin]

[Figure: nsqadmin channel]

List all nsqd nodes:

[Figure: nodes]

Message statistics:

[Figure: msgs]

Lookup host list:

[Figure: hosts]

4. Summary

At its core, NSQ is simple. It is a simple queue, which means failures are easy to reason about and bugs are easy to find. Consumers can handle failure events without affecting the rest of the system.

In fact, simplicity was the primary factor in our decision to use NSQ. Like much of our other software, it is easy to maintain, and introducing queues let us perform very well and even increase throughput by several orders of magnitude. However, more and more consumers need strict reliability and ordering guarantees, which go beyond the simple functionality NSQ provides.

In our own business system, the invoice messages we need to transmit are relatively sensitive. We cannot tolerate an nsqd crash or a disk failure that leaves the messages accumulated on that node unrecoverable. This is the main reason we did not choose this message middleware. Simplicity and reliability do not seem to be fully achievable together. Compared with NSQ, Kafka places a heavier operational burden on Ops; on the other hand, its replicated, ordered log serves us better. For the consumers that NSQ does suit, however, it has served us quite well, and we look forward to continuing to build on its solid foundation.


PS: This article was first published on my CSDN blog.
