In the previous articles of this series, we introduced the redesign process and several basic features of NSQ. In this article, we continue with several advanced features and their application scenarios, all of which are important features distilled from Youzan's business scenarios.

Design of the extended NSQ message format

The Youzan middleware team introduced an extended message format into NSQ so that messages can carry extension content. Letting the business side define additional data outside the message body extends what applications can do and supports more scenarios.

Compared with message-oriented middleware such as Kafka, the NSQ message format is simpler in both content and number of fields. Apart from basic metadata, a message contains only the message body. The metadata mainly consists of the timestamp at which the message was created on the server, the number of times the server has delivered the message, and the message ID. Some of the metadata in the Kafka message formats (record batch, control record, record), such as the compression format (Snappy), is negotiated in NSQ through IDENTIFY when the client connects. Other metadata, such as the CRC and transaction attributes, has no corresponding implementation in NSQ.

The relatively simple message format makes it more efficient for NSQ to transmit message content and makes NSQ clients easier to write. The disadvantage is that NSQ messages cannot carry much additional information beyond the message body itself. When data that could be decoupled from the business process has to be passed along, the existing business message format still has to be modified, and each business team that needs to pass extension data has to reinvent its own message format, with no reuse.

To make NSQ support more scenarios, the Youzan middleware team improved the original NSQ message format and designed and implemented a message format that supports extension.

Extended message format, fields in wire order:

    Field                   Type           Size
    nanosecond timestamp    int64          8 bytes
    attempts                uint16         2 bytes
    message ID              binary string  16 bytes
    version of extension    uint8          1 byte
    length of extension     uint16         2 bytes
    extension data          ext string     extN bytes
    message body            binary         N bytes

The new message format adds three fields to the existing message format (highlighted in green in the original figure):

  • Version of extension content: 1 byte, used to distinguish the type and format of the extension content. For example, 0x01 is a JSON extension.
  • Length of extension content: 2 bytes, giving the length of the extension content.
  • Extension content: variable length, a byte array holding the extension data.
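The field layout above can be sketched as a small encoder and decoder. This is a minimal illustration rather than the actual server code: the `ExtMessage` type, the `Encode` and `Decode` helpers, and the choice of big-endian byte order for the new fields are assumptions made for this example.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// ExtMessage mirrors the field order of the extended format described above.
// The type and its field names are hypothetical.
type ExtMessage struct {
	Timestamp int64    // nanosecond timestamp, 8 bytes
	Attempts  uint16   // delivery attempts, 2 bytes
	ID        [16]byte // message ID, 16 bytes
	ExtVer    uint8    // version of the extension content, 1 byte
	Ext       []byte   // extension content; its length is stored as a uint16
	Body      []byte   // message body, the remaining bytes
}

// headerLen is the size of all fixed-width fields before the extension content.
const headerLen = 8 + 2 + 16 + 1 + 2

// Encode serializes m. Big-endian byte order is an assumption of this sketch.
func Encode(m ExtMessage) ([]byte, error) {
	if len(m.Ext) > 0xFFFF {
		return nil, errors.New("extension content exceeds 65535 bytes")
	}
	buf := make([]byte, headerLen, headerLen+len(m.Ext)+len(m.Body))
	binary.BigEndian.PutUint64(buf[0:8], uint64(m.Timestamp))
	binary.BigEndian.PutUint16(buf[8:10], m.Attempts)
	copy(buf[10:26], m.ID[:])
	buf[26] = m.ExtVer
	binary.BigEndian.PutUint16(buf[27:29], uint16(len(m.Ext)))
	buf = append(buf, m.Ext...)
	buf = append(buf, m.Body...)
	return buf, nil
}

// Decode parses a buffer produced by Encode.
func Decode(b []byte) (ExtMessage, error) {
	var m ExtMessage
	if len(b) < headerLen {
		return m, errors.New("message shorter than fixed header")
	}
	m.Timestamp = int64(binary.BigEndian.Uint64(b[0:8]))
	m.Attempts = binary.BigEndian.Uint16(b[8:10])
	copy(m.ID[:], b[10:26])
	m.ExtVer = b[26]
	extLen := int(binary.BigEndian.Uint16(b[27:29]))
	if len(b) < headerLen+extLen {
		return m, errors.New("extension content truncated")
	}
	m.Ext = b[headerLen : headerLen+extLen]
	m.Body = b[headerLen+extLen:]
	return m, nil
}

func main() {
	msg := ExtMessage{Timestamp: 1, Attempts: 1, ExtVer: 0x01,
		Ext: []byte(`{"k":"v"}`), Body: []byte("hello")}
	copy(msg.ID[:], "0123456789abcdef")
	raw, err := Encode(msg)
	if err != nil {
		panic(err)
	}
	got, err := Decode(raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("ext=%s body=%s\n", got.Ext, got.Body)
}
```

Because the extension length is stored explicitly, a reader can skip extension content whose version it does not understand and still reach the message body.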

By introducing these extra fields, NSQ can carry additional information during message transmission without the business side modifying its own message format. The business side or an application framework can support new scenarios and new features through the extension content. Below we describe the use of extended messages in detail through several typical scenarios from Youzan's business.

Full-link stress testing is a typical scenario in a production environment. The stress-testing tool generates a large volume of stress-test data online in a short period of time to measure the performance and availability of the online links. For the message middleware on a stress-tested link, the extended message design lets the middleware provide the following capabilities.

When a producer application processes a stress-test message, it marks the message as a stress-test message in the extended message header. NSQ delivers online messages and stress-test messages to downstream consumers in the same way. The downstream consumer determines whether a message is stress-test traffic by checking the stress-test field in the extended header, and the application framework decides, based on the header content, whether to pass the message to the application or to intercept it.

The advantage of this solution is that applications do not need to change the production or consumption configuration of their existing NSQ topics. In the new version of NSQ, an existing topic can be upgraded to support the extended message format, so the business side only needs to focus on processing the stress-test messages. The disadvantage is that online messages and stress-test messages share one topic and are not isolated. When the producer mishandles stress-test messages, or when a downstream consumer is overloaded, the new NSQ can use its backtracking consumption feature to "wash out" the stress-test messages, but isolating the stress-test data is complicated and requires code changes on the business side.
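The consumer-side check described above might look like the following sketch. It assumes a JSON extension (version 0x01) whose values are strings; the header key `stress_test` and the `IsStressTest` and `Dispatch` helpers are hypothetical names chosen for illustration, not part of NSQ or any Youzan client.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// extVerJSON marks a JSON extension, as in the format description.
const extVerJSON = 0x01

// stressKey is a hypothetical header key; the article does not name the real one.
const stressKey = "stress_test"

// IsStressTest reports whether the extension header marks the message as
// stress-test traffic. Headers that are not a JSON object of string values
// are treated as ordinary traffic in this sketch.
func IsStressTest(extVer uint8, ext []byte) bool {
	if extVer != extVerJSON || len(ext) == 0 {
		return false
	}
	var header map[string]string
	if err := json.Unmarshal(ext, &header); err != nil {
		return false
	}
	return header[stressKey] == "true"
}

// Dispatch sketches the framework decision: hand the body to the application,
// or intercept stress-test messages when the application does not accept them.
// It returns true if the handler was invoked.
func Dispatch(extVer uint8, ext, body []byte, acceptStress bool, handle func([]byte)) bool {
	if IsStressTest(extVer, ext) && !acceptStress {
		return false // intercepted by the framework
	}
	handle(body)
	return true
}

func main() {
	ext := []byte(`{"stress_test":"true"}`)
	handled := Dispatch(extVerJSON, ext, []byte("order-created"), false, func(b []byte) {
		fmt.Printf("handling %s\n", b)
	})
	fmt.Println("handled:", handled)
}
```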

Another scenario for extended messages is application link isolation. The scenario is as follows. There are always two types of applications in the QA environment: the stable version of an application, and the version used to develop and verify new features. Applications in the QA environment are decoupled through NSQ, and the new version adds message-processing logic for messages that the stable QA version does not support. Before NSQ supported link isolation, developers had to:

  1. Stop consumption in the QA stable environment and start consumption in the new-feature verification environment.
  2. Verify the new feature over NSQ.
  3. Stop consumption in the new-feature verification environment and restore consumption in the QA stable environment.
  4. Repeat the steps above until the new version replaces the original QA stable version.

The new NSQ meets the business side's need for link isolation by implementing delivery priorities on the NSQ server based on the content of the extended header.

Messages used to verify a new feature are marked with additional information in the extended message header, and on delivery the NSQ server routes each message according to the delivery information (tag) in the header:

  1. If no consumer has the same tag as the message, the message is delivered to the consumers in the QA stable environment.
  2. If a consumer has the same tag as the message header, the message is delivered to that consumer.
  3. If the message carries no tag, the message is delivered to the consumers in the QA stable environment.
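The three routing rules can be expressed as one selection function. This is a sketch under simplifying assumptions: the `Consumer` type and `Candidates` function are hypothetical, and a consumer with an empty tag stands for a consumer in the QA stable environment.

```go
package main

import "fmt"

// Consumer is a simplified stand-in for a client connected to a channel.
// An empty Tag means the consumer belongs to the QA stable environment.
type Consumer struct {
	Name string
	Tag  string
}

// Candidates returns the consumers eligible to receive a message carrying
// msgTag, following the three rules above.
func Candidates(msgTag string, consumers []Consumer) []Consumer {
	var tagged, stable []Consumer
	for _, c := range consumers {
		switch {
		case c.Tag == "":
			stable = append(stable, c)
		case c.Tag == msgTag:
			tagged = append(tagged, c)
		}
	}
	// Rule 2: a consumer with the same tag takes priority.
	if msgTag != "" && len(tagged) > 0 {
		return tagged
	}
	// Rules 1 and 3: no matching consumer, or an untagged message.
	return stable
}

func main() {
	consumers := []Consumer{
		{Name: "qa-stable-1"},
		{Name: "qa-feature-x", Tag: "feature-x"},
	}
	for _, tag := range []string{"feature-x", "feature-y", ""} {
		fmt.Printf("tag=%q -> %v\n", tag, Candidates(tag, consumers))
	}
}
```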

With these rules, the new NSQ lets business teams isolate links between environments.

In NSQ's consumption model, a message is multicast to every channel of a topic, and the clients (consumers) within a channel compete for each message. Following an idea similar to link isolation, the new NSQ supports message filtering within a channel by matching specified values in the message extension header.

Consumers subscribing to the same channel use the same extension-header filter key (filter_key). When NSQ delivers a message:

  1. If the message carries no identifier, or the identifier is empty, it is delivered only to channels with no filter_key or an empty filter_key.
  2. A message with a filter identifier is delivered to the channels whose filter_key matches it. It is also delivered to channels without a filter.
  3. If a message does not match a channel's filter, the server treats the message as already consumed on that channel and does not deliver it there.
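A compact way to read the filtering rules is as a per-channel predicate applied while the message fans out to channels. The sketch below is only an interpretation of the rules: the `Channel` type, its `Filter` field (standing in for filter_key), and the `ShouldDeliver` and `FanOut` helpers are hypothetical.

```go
package main

import "fmt"

// Channel is a simplified stand-in for an NSQ channel. Filter plays the role
// of filter_key; an empty Filter means the channel has no filter.
type Channel struct {
	Name   string
	Filter string
}

// ShouldDeliver applies the rules above to one channel.
func ShouldDeliver(msgIdent string, ch Channel) bool {
	if ch.Filter == "" {
		return true // channels without a filter receive every message
	}
	return msgIdent == ch.Filter
}

// FanOut splits channels into those that receive the message and those on
// which the message is treated as already consumed.
func FanOut(msgIdent string, channels []Channel) (delivered, skipped []string) {
	for _, ch := range channels {
		if ShouldDeliver(msgIdent, ch) {
			delivered = append(delivered, ch.Name)
		} else {
			skipped = append(skipped, ch.Name)
		}
	}
	return delivered, skipped
}

func main() {
	channels := []Channel{
		{Name: "all"},
		{Name: "vip", Filter: "vip"},
		{Name: "trial", Filter: "trial"},
	}
	delivered, skipped := FanOut("vip", channels)
	fmt.Println("delivered:", delivered, "skipped:", skipped)
}
```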

This functionality is based on message extension headers and can be implemented on the server side, on the client side alone, or jointly by the server and the client.

NSQ Migrate Proxy

For users of the open source version of NSQ, NSQ Migrate Proxy provides a way to migrate from open source NSQ to Youzan's self-developed version of NSQ. With this tool, topics can be migrated without users noticing. During migration, NSQ Migrate Proxy acts as a proxy in front of both the open source NSQ and the self-developed NSQ. Depending on the migration phase, it forwards lookup requests to the open source NSQ and to the self-developed NSQ, merges the nsqlookupd results, and returns them to the client. Using the migration proxy requires the connecting clients to implement the read/write policy: the proxy distinguishes producers from consumers based on the read (R) and write (W) parameters.

1. In phase 1, the proxy returns to the client a lookup result containing the node information of both NSQ clusters. Consumers establish connections to both clusters. Producers continue to publish to the open source NSQ.

2. In phase 2, the proxy returns only the lookup result of the migration target cluster for a producer's lookup request. From this point on, message production goes to the target NSQ cluster. Consumers continue to consume from both clusters.

3. After confirming that all messages in the open source NSQ cluster have been consumed, the migration enters the final phase. The proxy returns only the target NSQ node information for a consumer's lookup request, and the connections between consumers and the open source NSQ are closed. At this point, both production and consumption have been migrated to the self-developed NSQ cluster, and the migration is complete.
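The three phases amount to a small decision table keyed by migration phase and by whether the lookup comes from a producer or a consumer. The sketch below illustrates that table only. The `Phase` and `Access` types and the `Lookup` function are hypothetical, and returning only the open source nodes to producers in phase 1 is an assumption drawn from "producers continue to publish to the open source NSQ".

```go
package main

import "fmt"

// Phase is the migration phase of a topic (hypothetical type).
type Phase int

const (
	PhaseDualConsume   Phase = iota + 1 // phase 1: consume from both, produce to old
	PhaseSwitchProduce                  // phase 2: produce to new, consume from both
	PhaseDone                           // final phase: everything on the new cluster
)

// Access distinguishes consumers (read) from producers (write).
type Access string

const (
	Read  Access = "r"
	Write Access = "w"
)

// Lookup returns the nodes the proxy would hand back to a client.
func Lookup(phase Phase, access Access, oldNodes, newNodes []string) []string {
	switch access {
	case Write:
		if phase == PhaseDualConsume {
			return oldNodes // assumption: producers stay on the open source cluster
		}
		return newNodes
	case Read:
		if phase == PhaseDone {
			return newNodes
		}
		// Merge both lookup results so consumers connect to both clusters.
		merged := make([]string, 0, len(oldNodes)+len(newNodes))
		merged = append(merged, oldNodes...)
		return append(merged, newNodes...)
	}
	return nil
}

func main() {
	oldNodes := []string{"old-nsqd-1:4150"}
	newNodes := []string{"new-nsqd-1:4150"}
	for p := PhaseDualConsume; p <= PhaseDone; p++ {
		fmt.Printf("phase %d: producers=%v consumers=%v\n", p,
			Lookup(p, Write, oldNodes, newNodes), Lookup(p, Read, oldNodes, newNodes))
	}
}
```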

Future plans

To support more diverse business requirements, Youzan NSQ will keep improving and adding new features, both in NSQ itself and in external extension systems built on top of NSQ. In the coming years, we plan to add the following key features.

Flow control

At present, a large number of our topics are deployed in one large cluster. Thanks to Golang's goroutine model, each topic is processed largely independently and has little direct influence on the others. However, topics with a large amount of data can still affect other topics, especially topics with very heavy network traffic. To reduce this impact and keep the whole cluster stable, we need to cap the traffic of some topics. For the design, we plan to use the token bucket scheme commonly used in the industry.
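For readers unfamiliar with the token bucket scheme, the following is a minimal sketch of the general technique, not the planned NSQ implementation. Tokens represent bytes, the bucket refills at a fixed rate up to a burst capacity, and a publish is admitted only if enough tokens remain. The `TokenBucket` type and its methods are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// TokenBucket limits traffic to rate tokens per second, with bursts of up to
// capacity tokens. Here one token stands for one byte of topic traffic.
type TokenBucket struct {
	mu       sync.Mutex
	rate     float64 // tokens added per second
	capacity float64 // maximum tokens the bucket can hold
	tokens   float64 // tokens currently available
	lastNano int64   // time of the last refill, in nanoseconds
}

// NewTokenBucket creates a full bucket whose clock starts at nowNano.
func NewTokenBucket(rate, capacity float64, nowNano int64) *TokenBucket {
	return &TokenBucket{rate: rate, capacity: capacity, tokens: capacity, lastNano: nowNano}
}

// AllowAt reports whether n tokens may be spent at time nowNano, and spends
// them if so. Taking the time as a parameter keeps the logic easy to test.
func (b *TokenBucket) AllowAt(n float64, nowNano int64) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if nowNano > b.lastNano {
		// Refill in proportion to the elapsed time, capped at capacity.
		b.tokens += float64(nowNano-b.lastNano) / 1e9 * b.rate
		if b.tokens > b.capacity {
			b.tokens = b.capacity
		}
		b.lastNano = nowNano
	}
	if n > b.tokens {
		return false
	}
	b.tokens -= n
	return true
}

// Allow is AllowAt evaluated at the current wall-clock time.
func (b *TokenBucket) Allow(n float64) bool {
	return b.AllowAt(n, time.Now().UnixNano())
}

func main() {
	// Hypothetical limit for one topic: 1 MiB/s sustained, 2 MiB burst.
	limiter := NewTokenBucket(1<<20, 2<<20, time.Now().UnixNano())
	fmt.Println("512 KiB publish allowed:", limiter.Allow(512<<10))
	fmt.Println("4 MiB publish allowed:", limiter.Allow(4<<20))
}
```

A rejected publish could be delayed or refused by the server; which of the two fits better depends on the topic and is left open here.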

Bulk subscription

At present, NSQ still uses per-message ACK in order to stay compatible. Although the performance meets current and near-future business requirements, there is still room for improvement. Especially in scenarios with high network latency, bulk subscription can greatly improve throughput. Bulk subscription will allow a consumer to receive a batch of messages at a time and ACK a batch of messages at a time, reducing network overhead.

Enrich security audit functions

The original NSQ already supports some security features, including secure connections and authentication servers. In the future, we will provide an independent security verification service for publishing to topics and for some channel operations, and keep thorough audit logs to prevent security problems. In addition, nsqadmin will be integrated with our internal unified login authentication so that dangerous operations by the business side can be restricted in a targeted way.

Distributed transaction coordinator

A major pain point of splitting a system into microservices is keeping multiple systems consistent, so a unified framework is urgently needed to solve this problem. The distributed transaction coordinator will be an important product built on NSQ, making full use of NSQ's features to address this business pain point.

Consumption filtering based on message content

Although preliminary filtering based on the message extension header is already supported, some business requirements are customized and need more complex filtering rules. To keep such requirements from affecting the NSQ core code, we plan to build a more complex filtering system on top of NSQ that handles the business-specific logic, so that business coupling is not injected into NSQ itself.

This article first presented the extended message format that the Youzan middleware team introduced into NSQ and showed how the new format is used through three business scenarios. It then described the extension tools developed around Youzan's own version of NSQ, including the proxy for migration and the extensions that integrate NSQ with Spark and Flume. Finally, it introduced the future plans and previewed some of the planned features.

References

  • [1] NSQ Spark consumer: https://github.com/youzan/spark-nsq-consumer
  • [2] NSQ Flume sink: https://github.com/DoraALin/flume-nsq-sink

All articles in the “How we redesigned the NSQ” series: https://tech.youzan.com/tag/paas/