1. Background

The company's main development language is Java, while the algorithm department mainly uses Python. Algorithm applications often need to subscribe to various messages produced by the business systems, but the message queue used on the business side is the open-source QMQ, which does not ship with a Python client. We therefore needed to write a Python QMQ client ourselves. Although this article is based on QMQ, most other MQ clients follow the same pattern, so the discussion should be broadly applicable.

2. Requirements analysis

From a functional perspective, the client can be divided into two modules: Consumer and Producer. Since the algorithm team mainly needs to subscribe to messages, we chose to implement the Consumer module first, followed by the Producer module.

From the perspective of responsibilities, the client can be divided into the following four parts:

  1. Network
  2. Serialization and deserialization
  3. Thread control
  4. Interaction logic

This article walks through each of these parts in turn, explaining the fundamental knowledge needed to implement the network layer, serialization and deserialization, thread control, and interaction logic, the problems you are likely to run into, and how to solve them, so that you can build your own client more easily.

3. Network

This part explains network handling, mainly covering packet framing, response callbacks, connection management, and framework selection.

3.1 Packet Processing

When handling packets in network programming, we mainly face two core problems:

Determining which request a response corresponds to

The client sends a request to the server and the server replies with a response. In practice, however, the client sends many requests and the server sends many responses back, and we need to match each response the server sends to the request the client sent. There are two ways to solve this problem:

The first is to send requests and receive responses serially, so that the correspondence between request and response is determined by order. Redis is a typical example: if the client sends GET KEY1, GET KEY2, GET KEY3, the Redis server replies with the value of key1, the value of key2, and the value of key3, in that order.

The second is to map by request ID. In this case, a field must be defined in the packet to carry the ID of the request, and the response packet the server sends back carries the same ID. This lets us determine the request-response correspondence: an ID is generated when a request is sent, and a map maintains the ID <-> request relationship; when a response is received, the ID is extracted from it and used to look up the request in the map. QMQ uses the second approach. Neither method is absolutely superior; they are simply different trade-offs.

Sticky packets / half packets / packet splitting

Strictly speaking, the terms "sticky packet" and "half packet" are not accurate, because TCP is stream-oriented. What does stream-oriented mean? Think of a river: the water flows endlessly and has no clear boundaries. But when an upper-layer application sends a piece of data, there is always a boundary. In our MQ case, for example, the unit is normally a message: the sender sends two messages, and we want to read exactly two messages out of the TCP "river".

There are several ways to deal with this problem:

  • Fixed-length messages: every packet has the same size, and messages shorter than that are padded to fill it.
  • Special delimiters: for example, the FTP protocol and the Redis packet protocol use carriage return + line feed as the packet delimiter.
  • A length field in the header: include a field carrying the total length of the message (or the length of the message body), and solve the sticky/half-packet problem by parsing that length out of the buffered data.

With these general solutions in mind, how does QMQ solve the two problems? QMQ uses a custom message protocol for network transport. A QMQ packet consists of a header and a body; the encoding and decoding of the body differ depending on the packet's purpose. The QMQ packet format is as follows:

(Figure: QMQ packet format)

The first four bytes are the total length of the packet, and the next two bytes are the header length. (In fact, the header size of a QMQ packet is fixed, so a separate header-size field is not strictly necessary; it is probably a leftover from the early protocol design, when a variable-length header was being considered.) The header also carries a request ID field, which QMQ uses to map responses back to requests, keeping the request context in a map: before a request is sent, its packet ID, generated from an atomically incremented integer, is used as the key, and an object that can be triggered asynchronously is used as the value, for example a Future in Java, or a Future/Deferred in Python. When a response arrives, its ID is parsed out and looked up in the map. If no entry exists for that ID, it is treated as an exception or IO error; otherwise, the value object for that key is fired.
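
As a rough illustration, a minimal sketch of that ID-to-future map built on Twisted's Deferred could look like the following; this is not QMQ's actual code, and the class and attribute names are made up:

import itertools
import threading

from twisted.internet.defer import Deferred


class PendingRequests(object):
    """Hypothetical sketch: correlate responses with requests by packet ID."""

    def __init__(self):
        self._ids = itertools.count(1)    # monotonically increasing packet IDs
        self._pending = {}                # packet ID -> Deferred
        self._lock = threading.Lock()

    def register(self):
        """Allocate an ID and a Deferred before sending a request."""
        with self._lock:
            opaque = next(self._ids)
            d = Deferred()
            self._pending[opaque] = d
        return opaque, d

    def fire(self, opaque, response):
        """Fire the Deferred of a received response; unknown IDs are treated as errors."""
        with self._lock:
            d = self._pending.pop(opaque, None)
        if d is None:
            raise IOError('no pending request for packet ID %d' % opaque)
        d.callback(response)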

We can also see from the packet protocol that QMQ relies on the total-size field to split packets and handle the sticky/half-packet problem. When TCP data is received, we first check whether the buffered data is at least four bytes long, because the total-size field occupies four bytes. If it is, we read the total-size value and check whether the buffered length has reached it. If it has not, the received data is not yet a complete packet (the so-called half packet), so we keep all of the current bytes in the buffer and wait for the next round. Otherwise, we cut a byte slice of that length out of the buffer and pass it to the upper layer (in our case, wrapping it as a Datagram object).
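
The splitting itself can be written as a small, framework-independent helper over the accumulated buffer. The sketch below assumes a four-byte big-endian total-size field that does not count the four length bytes themselves; adjust the offsets if the real protocol counts them:

import struct

def split_frames(buf):
    """Sketch: split a TCP byte buffer into complete frames.

    Assumes the first four bytes of each frame are a big-endian total size
    that does NOT include the four length bytes themselves (verify against
    the real protocol). Returns (frames, remaining_bytes).
    """
    frames = []
    while True:
        if len(buf) < 4:                      # not even the length field yet
            break
        (total,) = struct.unpack('>i', buf[:4])
        if len(buf) - 4 < total:              # half packet: wait for more data
            break
        frames.append(buf[4:4 + total])       # one complete frame, length field stripped
        buf = buf[4 + total:]
    return frames, buf

The caller keeps the returned remainder and prepends it to the next chunk of received bytes before calling again.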

3.2 Connection Management

The main purpose of connection management is to reuse long-lived TCP connections and avoid re-establishing a connection every time a packet is sent. In terms of implementation, we can keep TCP connections in a map, keyed by a string made of the destination IP and port; before sending, we check whether a connection to that address already exists. Since this map is accessed from multiple threads, thread safety has to be considered. Another thing to note is that although we cache connections in the map and take them out when needed, we must only use valid connections. What counts as a valid connection? In fact there is no application-layer check that can directly tell whether a cached connection is still valid; the only way is to send a packet and see whether a response comes back. If a response is received, the connection is valid. This is why applications that use long-lived connections usually send periodic heartbeat packets to probe connection health.

A simple implementation of the connection manager class is as follows:

from threading import RLock

from twisted.internet import reactor
from twisted.internet.endpoints import HostnameEndpoint


class ClientManager(object):
    """Caches one long-lived channel per remote address (ip:port string)."""

    def __init__(self):
        self.channels = {}                # remoteAddr -> _ServerClient
        self.opaque = AtomicCounter()     # packet ID generator
        self.channelsLock = RLock()

    def getOrCreateChannel(self, remoteAddr):
        tmpChannel = self.channels.get(remoteAddr)
        if tmpChannel is not None:
            return tmpChannel
        # Double-checked locking: only one thread creates the connection,
        # later callers reuse it from the map.
        with self.channelsLock:
            existing = self.channels.get(remoteAddr)
            if existing is not None:
                return existing
            serverClient = _ServerClient(reactor, HostnameEndpoint, remoteAddr,
                                         ClientIdProvider.get(), _DEFAULT_RETRY_POLICY)
            serverClient.clientManager = self
            self.channels[remoteAddr] = serverClient
            return serverClient


clientManager = ClientManager()
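
Since, as noted above, the only reliable liveness check is sending a packet and seeing whether a reply comes back, a simple heartbeat loop can be layered on top of the cached channels. A minimal sketch using Twisted's LoopingCall; the send_heartbeat method is hypothetical:

from twisted.internet.task import LoopingCall

def start_heartbeat(channel, interval=30.0):
    """Sketch: periodically probe a cached channel so dead connections are noticed.

    `channel.send_heartbeat` is a hypothetical method standing in for
    "send a small probe packet and check that a response comes back".
    """
    loop = LoopingCall(channel.send_heartbeat)
    loop.start(interval, now=False)   # fire every `interval` seconds, not immediately
    return loop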

3.3 IO Framework Selection

Sharpening the axe does not delay cutting the firewood: choosing the right IO framework helps us build a simple, fast, and efficient network layer, so picking the framework that best fits our requirements became the immediate goal. Commonly used Python networking frameworks that support custom protocols include Tornado, Twisted, asyncio, and so on. We mainly considered three criteria in the selection:

  • Ability to fulfill the requirements (stream-oriented TCP handling)
  • Complete documentation, an active community, and a large user base, so that help is easy to find when problems arise
  • High performance and low resource usage

Since we started out on Python 2.7 and asyncio only became available in Python 3.4 (and is really recommended from Python 3.7), we focused on Tornado and Twisted. Both met our requirements, and their documentation and communities are comparable, so the main consideration was performance. In a simple send/receive test, Tornado took less time to process the same number of packets (about 6% shorter on average) but used about 40% more CPU than Twisted (the test was quick and rough, so the numbers may not be accurate). Because we did not need extreme throughput at the cost of high resource usage, we chose Twisted as our network-layer framework. To use it, we first implement the abstract Protocol class, which handles the connection and the sending and receiving of messages, and then implement a ClientFactory class, which manages the creation and life cycle of our custom Protocol objects.
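
A minimal skeleton of that Protocol/ClientFactory pairing, reusing the split_frames helper sketched earlier, might look like the following; everything other than Twisted's own classes and callbacks is a placeholder name:

from twisted.internet import reactor
from twisted.internet.protocol import ClientFactory, Protocol

class QmqLikeProtocol(Protocol):
    """Sketch: buffer incoming bytes and hand complete frames upward."""

    def __init__(self):
        self._buffer = b''

    def connectionMade(self):
        print('connected to', self.transport.getPeer())

    def dataReceived(self, data):
        self._buffer += data
        frames, self._buffer = split_frames(self._buffer)   # from the earlier sketch
        for frame in frames:
            self.frameReceived(frame)

    def frameReceived(self, frame):
        # Decode the frame and fire the matching pending Deferred here.
        pass

class QmqLikeClientFactory(ClientFactory):
    """Sketch: creates protocol instances and tracks connection life cycle."""
    protocol = QmqLikeProtocol

    def clientConnectionFailed(self, connector, reason):
        print('connection failed:', reason.getErrorMessage())

    def clientConnectionLost(self, connector, reason):
        print('connection lost:', reason.getErrorMessage())

# Usage sketch: reactor.connectTCP(host, port, QmqLikeClientFactory())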

4. Serialization and deserialization

The main job of serialization and deserialization is to turn binary network messages into the program's internal objects (or to serialize the program's internal request objects into binary network messages). For fixed-format headers, we can use Python's struct module to parse them. For example:

struct.pack('>hihhiih', headersize, -557774114, code, version, opaque, flag, requestcode)

For serialization and deserialization of the header, we mainly need to pay attention to byte order (big-endian vs little-endian).
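
As a quick illustration, the '>' prefix in the format string selects big-endian (network byte order), which is what the examples here assume:

import struct

# Big-endian vs little-endian encoding of the same 32-bit integer.
assert struct.pack('>i', 1) == b'\x00\x00\x00\x01'   # network byte order
assert struct.pack('<i', 1) == b'\x01\x00\x00\x00'   # little-endian
(value,) = struct.unpack('>i', b'\x00\x00\x00\x01')
assert value == 1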

For the body, serialization and deserialization of common primitive types can likewise be done with the struct APIs. Note the handling of strings (str in Python): you need to pay special attention to encoding when serializing them. The following is an example:

def write_short_text(s):
    if s is None:
        return _NULL_SHORT_STRING
    if not isinstance(s, str):
        raise TypeError('{!r} is not text'.format(s))
    return write_short_bytes(s.encode('utf-8'))

def write_short_bytes(b):
    if b is None:
        return _NULL_SHORT_STRING
    if not isinstance(b, bytes):
        raise TypeError('{!r} is not bytes'.format(b))
    elif len(b) > 32767:
        raise struct.error(len(b))
    else:
        return struct.pack('>h', len(b)) + b

5. Thread control

This section focuses on how the threads inside the QMQ client are designed. They can be roughly divided into IO threads and business threads. First, let's see how to separate the IO thread from the business threads. Make one thing clear up front: tasks with uncertain or long execution times must never be put on the IO thread! In Twisted, the reactor thread is the thread in which reactor.run() is called, so to keep the IO thread separate from user threads we create a dedicated thread to execute reactor.run(). One small detail is to pass installSignalHandlers=0, as follows:

import threading

from twisted.internet import reactor

def run():
    """
    Since this thread is not the main user thread, it cannot receive signals by default.
    """
    reactor.run(installSignalHandlers=0)

_started.inc(1)
_iothread = threading.Thread(target=run, name="IOThread")
_iothread.start()

Second, we run into the hand-off problem between the IO thread and other threads: how do we deliver the request that a business thread wants to send over to the IO thread? This is an inter-thread communication problem with many possible solutions; in Twisted we can use reactor.callFromThread() to hand work to the IO thread. Furthermore, how do we prevent careless use from straining system resources? Or, put differently, how do we control client traffic gracefully? The pull method was originally designed to be non-blocking and to immediately return a Deferred object (similar to a Future), but this caused problems when a user called the method in a multi-threaded loop, leading to abnormal client traffic and increased resource usage. After guidance from my TL, the interface was redesigned to block, so that traffic adapts to the caller automatically (slow business code pulls slowly, fast business code pulls quickly), which makes the program more robust.
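
A hedged sketch of that blocking hand-off, using Twisted's blockingCallFromThread helper (which itself schedules work onto the reactor via callFromThread and waits for the returned Deferred); the send_pull_request callable is hypothetical:

from twisted.internet import reactor
from twisted.internet.threads import blockingCallFromThread

def pull(consumer, batch_size, timeout_ms):
    """Blocking pull as seen from a business thread (sketch).

    `consumer.send_pull_request` is a hypothetical function that must run on
    the IO thread and return a Deferred; blockingCallFromThread runs it there
    and blocks the calling business thread until the Deferred fires.
    """
    return blockingCallFromThread(
        reactor, consumer.send_pull_request, batch_size, timeout_ms)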

Finally, how do we reduce thread-safety bugs in multithreaded code? Thread-safety issues are hard to reason about, easy to get wrong, and hard to troubleshoot, so we should minimize the amount of code that has to be thread-safe while still meeting the requirements. Since the user-facing API may inevitably be called from multiple threads, we can put a queue in between: an internal thread listens on the queue, and each user call is converted into an element placed on the queue. This turns multi-threaded user calls into single-threaded internal execution, and since the internal execution is single-threaded, there is no need to worry about thread safety there.
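
A minimal sketch of that funnel, in Python 3 syntax with placeholder names:

import queue
import threading

_calls = queue.Queue()

def _worker():
    """The single internal thread: the only place where queued calls are executed."""
    while True:
        func, args = _calls.get()    # blocks until a user thread submits work
        func(*args)

def submit(func, *args):
    """Callable from any user thread; it only enqueues, so there is no shared state to protect."""
    _calls.put((func, args))

_t = threading.Thread(target=_worker, name="InternalWorker", daemon=True)
_t.start()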

6. Interaction logic

With those pieces of the puzzle in place, we can now assemble them into the Consumer client for QMQ.

This part covers the following topics:

  • Interaction logic analysis

  • Consumer interaction logic

    • Pull logic
    • Listener logic
    • Ack logic

6.1 Interaction Logic Analysis

Three roles are involved in QMQ's consumption interaction: the Metaserver, the Server, and the Consumer. The Metaserver is the control center of the whole interaction, while the Server actually stores and forwards messages. A Consumer makes two kinds of requests when consuming messages:

  • 1. Request the Metaserver for the set of server addresses; the Metaserver records which servers the messages of each topic land on.
  • 2. Send pull-message requests to the server addresses obtained in step 1 (a rough sketch of both steps follows this list).
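
In pseudo-flow form, under the assumption that the two helper functions are hypothetical stand-ins for real requests built on the networking layer described above:

def consume_once(topic, consumer_group):
    """Sketch of one consumption round: Metaserver lookup, then pulls."""
    # Step 1: ask the Metaserver which servers hold messages for this topic.
    servers = request_broker_list(topic, consumer_group)          # hypothetical
    messages = []
    # Step 2: send a pull request to each server address obtained in step 1.
    for server in servers:
        messages.extend(pull_messages(server, topic, consumer_group))  # hypothetical
    return messages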

6.2 Consumer Interaction Logic

From the perspective of the user-facing API, message consumption is generally divided into pull mode and push mode.

Pull mode

In pull mode, the user actively invokes the Consumer's pull API to fetch messages: the user starts a loop in which a message is pulled, processed, then the next one is pulled, and so on.

The logic of pull mode is as follows:

  • A. getOrCreatePullConsumer() obtains a pullConsumer instance; at this point the pullConsumer's run method is submitted to a thread pool to execute.
  • B. The pull() method creates a future object, puts it into the pullConsumer's requestQueue, and returns future.get() as the result.
  • C. The pullConsumer's run method is a while loop that keeps taking elements from the blocking queue requestQueue and then issues the pull requests (see the sketch below).
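
A stripped-down sketch of that producer/consumer structure between the user thread and the pull loop; all names are placeholders, error handling is omitted, and it is not QMQ's actual code:

import queue
import threading
from concurrent.futures import Future

class PullConsumer(object):
    """Sketch: user threads enqueue futures, one internal loop serves them."""

    def __init__(self):
        self.requestQueue = queue.Queue()
        threading.Thread(target=self.run, name="PullLoop", daemon=True).start()

    def pull(self, batch_size):
        """Called by the user thread; blocks until the pull completes."""
        future = Future()
        self.requestQueue.put((future, batch_size))
        return future.result()           # the future.get() in the description above

    def run(self):
        while True:
            future, batch_size = self.requestQueue.get()   # blocking queue
            messages = self._do_pull(batch_size)           # hypothetical network pull
            future.set_result(messages)

    def _do_pull(self, batch_size):
        return []                                          # placeholder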

Push mode

Push mode means the user calls the Consumer API to register a listener for consuming messages. In this mode the Consumer holds the initiative: it triggers the listener after pulling messages. The listener runs business-system code and is generally executed in a dedicated thread pool. Although the API level is divided into pull mode and push mode, the underlying implementation is the same: the Consumer issues pull requests to the Server.

The top-level relationships in the Listener design are as follows:

Each listener is registered through the PullRegister. The PullRegister object is globally unique within the client and holds a pullEntryMap attribute. Depending on the subject (topic) a listener subscribes to, different ParallelPullEntry objects are created; each ParallelPullEntry contains multiple DefaultPullEntry objects, and each DefaultPullEntry is responsible for pulling one subject from one server. Because a subject may be spread across multiple servers on different machines, one DefaultPullEntry handles that subject on one machine.
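
Roughly, the structure looks like the sketch below; the class names follow the description above, but the fields and methods are a simplified guess, not QMQ's actual code:

class DefaultPullEntry(object):
    """Pulls one subject from one server."""
    def __init__(self, subject, server_addr):
        self.subject = subject
        self.server_addr = server_addr

class ParallelPullEntry(object):
    """One per subject; fans out to one DefaultPullEntry per server."""
    def __init__(self, subject, server_addrs):
        self.entries = [DefaultPullEntry(subject, addr) for addr in server_addrs]

class PullRegister(object):
    """Globally unique; maps each subscribed subject to its pull entry."""
    def __init__(self):
        self.pullEntryMap = {}           # subject -> ParallelPullEntry

    def register(self, subject, listener, server_addrs):
        self.pullEntryMap[subject] = ParallelPullEntry(subject, server_addrs)
        # The listener would be attached here and invoked after each pull.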

Ack logic

After a message has been processed successfully, the Consumer needs to ACK it to indicate that it has been consumed and does not need to be delivered again. To improve efficiency, QMQ does not record the consumption state of every individual message; instead it uses an offset. For example, if a Consumer pulls messages 1-100 and the first 50 are consumed successfully, it only needs to record the number 50 to express that. Message consumption, however, is not necessarily sequential: if among the first 50 messages the 30th has not yet been consumed successfully while 31-50 have, we can only ACK 1-29. The ACK mechanism therefore behaves like a sliding window: when the user calls the ACK method to mark a message as consumed, we try to move the window to the right; if the successes are not contiguous, the window cannot move.
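
A toy sliding-window tracker illustrating that rule (not QMQ's implementation):

class AckWindow(object):
    """Toy sketch: track the highest offset up to which ACKs are contiguous."""

    def __init__(self, start_offset=0):
        self.acked_up_to = start_offset   # every message <= this offset is acked
        self._pending = set()             # acked offsets beyond the contiguous edge

    def ack(self, offset):
        self._pending.add(offset)
        # Slide the window right while the next offset has already been acked.
        while self.acked_up_to + 1 in self._pending:
            self.acked_up_to += 1
            self._pending.remove(self.acked_up_to)
        return self.acked_up_to

# Example: acking 31-50 before 30 does not move the window past 29.
w = AckWindow()
for i in list(range(1, 30)) + list(range(31, 51)):
    w.ack(i)
assert w.acked_up_to == 29
w.ack(30)
assert w.acked_up_to == 50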

7. Summary

Starting from the background and requirements and moving through networking, serialization and deserialization, thread control, and interaction logic, we have covered the basics of implementing an MQ client, along with the points to watch out for and the surrounding background knowledge.

About the author

I am a junior intern in the infrastructure group at Convenience Bee (Bianlifeng). In a short time I independently analyzed the requirements, familiarized myself with the QMQ Java client code, completed the design and implementation of the Python client, and finally summarized some basic points to note in network programming.

If you are interested in these technologies and are committed to improving R&D efficiency, you are welcome to join us.

  • Email address: [email protected]
  • Email subject: Basic Components, Production and Research Platform Department

Recruitment website

bianlifeng.gllue.me/portal/home…
