0x00 Abstract

This article opens a series that introduces the Kombu messaging library, laying the groundwork for a later analysis of Celery.

Kombu is a messaging abstraction compatible with the AMQP protocol: a library that wraps messaging operations behind a unified interface. Its distinguishing feature is support for many different message queue systems. It supports not only native AMQP message queues such as RabbitMQ and Qpid, but also virtual message queues built on Redis, MongoDB, Beanstalk, CouchDB, in-memory storage, and so on.

In this series you will learn how Kombu implements the AMQP model. This first article introduces the related concepts and the overall logical architecture.

0x01 AMQP

AMQP is introduced first because Kombu is positioned as a messaging abstraction compatible with the AMQP protocol.

AMQP (Advanced Message Queuing Protocol) is a network protocol for passing asynchronous messages between processes.

1.1 Basic Concepts

The basic concepts of AMQP are as follows:

  • Producers and consumers: Producers create messages and publish them to the broker (the message server), which then delivers them to interested recipients. Consumers connect to the broker and subscribe to queues to receive messages.
  • Channel: A channel is a virtual connection inside a “real” TCP connection. AMQP commands are sent through channels, and multiple channels can be created on one TCP connection.
    • Some applications need multiple connections to an AMQP broker. Opening many TCP connections at once is undesirable because it consumes system resources and complicates firewall configuration. AMQP 0-9-1 provides channels for this purpose; they can be thought of as lightweight connections that share one TCP connection.
    • In multi-threaded or multi-process applications, it is common to open one channel per thread or process; channels should not be shared between them.
    • Traffic on one channel is completely isolated from traffic on other channels, so every AMQP method carries a channel number that tells the peer which channel it belongs to.
  • Queue: the place where messages are stored. Queues are bound to exchanges with routing keys, and producers send messages to queues through exchanges. An application registers a consumer on a queue, that is, it subscribes to the queue. A queue can have multiple consumers, or a single exclusive consumer (while the exclusive consumer exists, other consumers are excluded).
  • Exchange and binding: When a producer publishes a message, the message is first sent to an exchange, which forwards it to queues according to the binding rules between the exchange and the queues.
    • An exchange is the AMQP entity that messages are sent to. It takes a message and routes it to zero or more queues. The routing algorithm is determined by the exchange type and by the binding rules.
    • The exchange dispatches each received message to the queues bound to it according to these routing rules.
  • Exchange types: direct, fanout, topic
    • Direct exchange: delivers a message to the queues whose binding key equals the message's routing key. The default exchange is a direct exchange whose name is the empty string; every declared queue is automatically bound to it, with the queue name as the routing key.
    • Fanout exchange: broadcasts every incoming message to all queues bound to it.
    • Topic exchange: can deliver a message to multiple queues by matching the routing key against wildcard patterns in the binding keys.

1.2 Working Process

The working process is:

  • Publishers publish messages via exchanges. Messages are never sent directly to queues, and producers may not even know that queues exist. When sending a message to an exchange, the producer specifies the message's routing_key.
  • The exchange dispatches each received message to the queues bound to it according to the routing rules. Depending on its type, the exchange either sends the message to every bound queue (fanout) or compares the message's routing_key with the binding_key between each queue and the exchange; if they match, the message is sent to that queue.
  • Finally, the AMQP broker delivers the message to the consumers subscribed to the queue, or consumers fetch it from the queue themselves as needed.
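The three steps above can be sketched with a toy, in-memory broker. This is only an illustration of the routing idea under simplifying assumptions (no network, no acknowledgements, topic matching omitted); the class `ToyBroker` and the exchange and queue names are invented for the example and are not part of AMQP or Kombu.

```python
from collections import defaultdict, deque


class ToyBroker:
    """In-memory stand-in for an AMQP broker (illustration only)."""

    def __init__(self):
        self.exchanges = {}                # exchange name -> exchange type
        self.bindings = defaultdict(list)  # exchange name -> [(binding_key, queue)]
        self.queues = defaultdict(deque)   # queue name -> pending messages

    def declare_exchange(self, name, type_="direct"):
        self.exchanges[name] = type_

    def bind(self, queue, exchange, binding_key=""):
        self.bindings[exchange].append((binding_key, queue))

    def publish(self, exchange, routing_key, body):
        """Route one message; return the names of the queues that received it."""
        type_ = self.exchanges[exchange]
        delivered = []
        for binding_key, queue in self.bindings[exchange]:
            # fanout ignores the routing key; anything else is treated as
            # direct here and requires an exact key match.
            if type_ == "fanout" or binding_key == routing_key:
                self.queues[queue].append(body)
                delivered.append(queue)
        return delivered

    def get(self, queue):
        """Pull-style consumption: return the oldest message, or None."""
        return self.queues[queue].popleft() if self.queues[queue] else None


broker = ToyBroker()
broker.declare_exchange("orders", "direct")
broker.bind("billing", "orders", binding_key="paid")
broker.bind("shipping", "orders", binding_key="paid")
broker.bind("audit", "orders", binding_key="cancelled")

# The producer only names the exchange and the routing key, never a queue.
routed = broker.publish("orders", "paid", {"id": 1})
```

Here the producer never refers to a queue; the two queues bound with the key "paid" each receive a copy, and the "audit" queue receives nothing.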

The basic picture is as follows:

                  +----------------------------------------------+
                  |                  AMQP Entity                 |
                  |                                              |
                  |                                              |
                  |                                              |
+-----------+     |    +------------+   binding   +---------+    |       +------------+
|           |     |    |            |             |         |    |       |            |
| Publisher | +------> |  Exchange  | +---------> |  Queue  | +--------> |  Consumer  |
|           |     |    |            |             |         |    |       |            |
+-----------+     |    +------------+             +---------+    |       +------------+
                  |                                              |
                  |                                              |
                  +----------------------------------------------+

0x02 Poll series models

Kombu uses the poll family of models for I/O multiplexing, so they are worth describing here.

I/O multiplexing means that the kernel notifies a process as soon as one or more of the I/O conditions the process specified become ready (for example, a descriptor becomes readable). It is useful, for example, when a client handles several descriptors at once (typically interactive input and network sockets).

Compared with multi-process and multi-thread techniques, the biggest advantage of I/O multiplexing is low overhead: the system does not need to create additional processes or threads, nor maintain them, which greatly reduces system cost.

2.1 select

select monitors an array of file descriptors through the select() system call (on Linux everything is a file: block devices, socket connections, and so on).

When select() returns, the kernel has marked the ready file descriptors in the array, so the process can pick them out for subsequent reads and writes. (In other words, select keeps watching a set of file descriptors, such as those created for network connections, to see how many have become ready; once a descriptor is ready, the process can operate on it.)

2.2 poll

poll is essentially the same as select, except that poll has no limit on the maximum number of file descriptors.

A drawback shared by poll and select is that the array of file descriptors is copied as a whole between user space and kernel space on every call, regardless of whether the descriptors are ready, so the overhead grows linearly with the number of file descriptors.

select() and poll() report the ready file descriptors to the process; if the process does not perform I/O on them, the next call to select() or poll() reports them again, so readiness notifications are generally not lost. This behavior is called level-triggered.
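Level-triggered behavior is easy to observe with Python's standard `select` module. The snippet below is a small sketch that uses a local socket pair in place of a real network connection: the unread descriptor is reported on every call until its data is consumed.

```python
import select
import socket

# A connected pair of sockets stands in for a client/server connection.
a, b = socket.socketpair()
a.sendall(b"ping")

# b is readable, so select reports it.
first, _, _ = select.select([b], [], [], 1.0)

# We did not read anything, so the next call reports b again.
second, _, _ = select.select([b], [], [], 1.0)

# After the data is drained, a non-blocking check reports nothing.
data = b.recv(4)
third, _, _ = select.select([b], [], [], 0)

a.close()
b.close()
```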

2.3 epoll

epoll is supported directly by the kernel and can be either level-triggered or edge-triggered. In edge-triggered mode the process is told only which file descriptors have just become ready; it is told once, and it is not told again until new activity occurs, even if it takes no action. In theory, edge triggering performs better.

epoll likewise reports only ready file descriptors. A call to epoll_wait() returns the number of ready descriptors, and the process reads that many entries from the event array it passed to epoll_wait(). Because the set of monitored descriptors is registered with the kernel once rather than passed in on every call, the overhead of copying the whole descriptor set on each system call is avoided.

Another essential improvement is that epoll uses event-based readiness notification. With select/poll, the kernel scans all monitored file descriptors only when the call is made. With epoll, a file descriptor is registered in advance through epoll_ctl(); once it becomes ready, the kernel uses a callback-like mechanism to mark it as active, and the process is notified when it calls epoll_wait().
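The difference from level triggering can be seen with Python's `select.epoll` wrapper. This is a minimal sketch that assumes a Linux system (epoll does not exist elsewhere, so the helper returns None there); the function name `edge_triggered_demo` is made up for the example.

```python
import select
import socket


def edge_triggered_demo():
    """Return (first, second) poll results for one unread message.

    Returns None on platforms without epoll (it is Linux-only).
    """
    if not hasattr(select, "epoll"):
        return None
    a, b = socket.socketpair()
    ep = select.epoll()
    try:
        # EPOLLET switches this registration to edge-triggered mode.
        ep.register(b.fileno(), select.EPOLLIN | select.EPOLLET)
        a.sendall(b"ping")
        first = ep.poll(1.0)   # b just became readable: reported once
        second = ep.poll(0)    # data still unread, but no new edge: nothing
        return first, second
    finally:
        ep.close()
        a.close()
        b.close()


result = edge_triggered_demo()
```

With a level-triggered registration (without `EPOLLET`) the second call would report the descriptor again, as select and poll do.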

2.4 An Intuitive Explanation

2.4.1 Blocking I/O Mode

In blocking I/O mode, the kernel handles I/O events by blocking and waking up threads, so one thread can handle the I/O events of only one stream. To handle multiple streams at the same time you need multiple processes (fork) or multiple threads (pthread_create), and unfortunately neither is efficient.

2.4.2 Non-blocking mode

With non-blocking, busy-polling I/O, one thread can handle multiple streams: it just keeps iterating over all the streams from beginning to end, over and over. This works, but it is clearly not a good approach, because if none of the streams has data the CPU is simply wasted.

2.4.2.1 Agent mode

In non-blocking mode, I/O events can be handed over to another object (select or epoll) to handle, or even ignored.

To avoid spinning the CPU, we can introduce an agent (first one called select, later another called poll, though they are essentially the same). The agent can watch the I/O events of many streams at once. When idle, it blocks the current thread; when one or more streams have I/O events, it wakes up and our program polls all the streams once (so we can drop the word “busy”). The code looks like this:

 while true {
       select(streams[])
       for i in streams[] {
             if i has data {
                   read until unavailable
             }
       }
 }

Thus, if no I/O event is generated, our program will block at select. The problem is that we only know from select that an I/O event occurred, but we don’t know which streams it was (there could be one, more, or even all of them). We have to poll all streams indiscriminately to find the ones that can read or write data, and then operate on them.
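A runnable version of one pass of this loop, using Python's `select.select`, might look like the sketch below. Local socket pairs stand in for real streams, and a single iteration replaces the endless `while` loop. Note that Python's wrapper does return the ready objects; the scan over every stream is kept here only to mirror the pseudocode above.

```python
import select
import socket

# Three "streams"; only the second one will have data.
pairs = [socket.socketpair() for _ in range(3)]
writers = [w for w, _ in pairs]
streams = [r for _, r in pairs]
writers[1].sendall(b"hello")

received = {}
# Blocks (here for at most one second) until some stream has an I/O event.
readable, _, _ = select.select(streams, [], [], 1.0)
for index, stream in enumerate(streams):  # scan every stream
    if stream in readable:                # "if i has data"
        received[index] = stream.recv(1024)

for w, r in pairs:
    w.close()
    r.close()
```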

2.4.2.2 epoll

epoll can be understood as “event poll”. Unlike busy polling and indiscriminate polling, epoll tells us which I/O events occurred on which streams, so every operation we then perform on those streams is meaningful (finding a ready stream drops from O(n) to O(1)).

An epoll-based server works on the same principle as a select-based server: both check, by some mechanism, whether sockets can send or receive data. But epoll is more efficient and has no hard upper limit on the number of descriptors.

The checking done by select and poll is passive polling, whereas epoll uses active event notification: when a socket satisfies the condition being watched, the process is notified and can act on it. This mechanism is naturally more efficient.

Epoll also uses file descriptors, which are essentially numbers.

The main uses of epoll are:

epoll_list = epoll.poll()  # list of (fd, event) pairs that are ready

If some clients send data while the process is busy in the body of the while loop, the operating system records the file descriptors of those sockets in a ready list. The next time the process polls epoll, it gets that list, which tells it which sockets are ready to send or receive. Because epoll does not check every fd in turn but simply hands over the ones that are ready, it is efficient.
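As a portable sketch of this pattern, Python's standard `selectors` module picks the best available mechanism (epoll on Linux) behind one interface. The stream labels and the use of socket pairs instead of real client connections are assumptions made to keep the example self-contained.

```python
import selectors
import socket

# DefaultSelector uses epoll on Linux, kqueue on BSD/macOS, else poll/select.
sel = selectors.DefaultSelector()

a1, b1 = socket.socketpair()
a2, b2 = socket.socketpair()
sel.register(b1, selectors.EVENT_READ, data="stream-1")
sel.register(b2, selectors.EVENT_READ, data="stream-2")

a2.sendall(b"hi")  # only the second stream receives data

# select() returns only the ready registrations, so nothing is scanned here.
ready = sel.select(timeout=1.0)
results = [(key.data, key.fileobj.recv(1024)) for key, _events in ready]

sel.close()
for s in (a1, b1, a2, b2):
    s.close()
```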

0x03 Kombu Basic Concepts

The original implementation of Kombu was called Carrot, and it was refactored to become Kombu.

3.1 Uses

The main uses of Kombu are as follows:

  • Celery is the most popular asynchronous task queue framework in Python, supporting RabbitMQ, Redis, ZooKeeper and others as brokers; the abstraction over these message queues is implemented by Kombu:
    • Celery started out supporting RabbitMQ, that is, using the AMQP protocol. As more message brokers were added that do not speak AMQP, something was needed to handle all brokers uniformly, even to “masquerade as AMQP support” for them.
    • Kombu works with both AMQP transports and non-AMQP transports (Redis, Amazon SQS, ZooKeeper, etc.).
  • OpenStack uses Kombu by default to connect to the RabbitMQ server. It uses Kombu as its message queue client library instead of the widely used pika library for two reasons:
    • Kombu supports both native AMQP implementations and virtual AMQP implementations built on systems such as Redis, MongoDB and Beanstalk.
    • Kombu lets you choose the underlying library for AMQP connections, such as librabbitmq or py-amqp. The former is a Python binding to a C library; the latter is a pure Python implementation. With a pure Python AMQP library, the Eventlet framework can turn the network I/O parts into coroutines, improving overall network I/O performance. OpenStack, for example, uses Eventlet internally.

3.2 Terminology

Kombu has the following concepts (some of them similar to AMQP):

  • Message: the body that is sent and consumed; the basic unit of production and consumption.

  • Connection: an abstraction of the connection to the MQ. One Connection corresponds to one connection to the MQ; it encapsulates the AMQP notion of a connection.

  • Channel: similar to the AMQP concept; can be understood as one of several lightweight connections sharing a Connection. A Channel encapsulates the AMQP operations performed on the MQ.

  • Transport: Kombu supports plugging in different message middleware flexibly. The term Transport refers to one concrete message middleware and can be thought of as an abstraction of the broker:

    • Operations on the MQ necessarily need a connection, but Kombu does not let a Channel send and receive requests directly through the Connection. Instead it introduces a new abstraction, Transport, which is responsible for the concrete MQ operations; in other words, every Channel operation ends up in the Transport. This abstraction makes it easy to add transports for non-AMQP brokers later.
    • A Transport is the real MQ connection: the instance that actually connects to the MQ (Redis/RabbitMQ).
    • Kombu's built-in transports include Redis, Beanstalk, Amazon SQS, CouchDB, MongoDB, ZeroMQ, ZooKeeper, SoftLayer MQ and Pyro.
  • Producer: the abstract class for sending messages.

  • Consumer: the abstract class for receiving messages. A consumer declares a queue, binds the queue to a given Exchange, and then receives messages from the queue.

  • Exchange: the MQ routing entity, similar to the exchange in RabbitMQ, which supports several exchange types. The sender publishes a message to an Exchange, and the Exchange distributes it to the corresponding queues. Routing compares the routing key (carried by the message) with the binding key (supplied when a queue is bound to the exchange). To use an exchange you specify its name and type (direct, topic or fanout).

    The exchange forwards a message by matching the message's routing_key against the binding_key, which is the binding the consumer sets up between its queue and the exchange when declaring the queue.

  • Queue: the queue abstraction. It stores messages that are about to be consumed by the application. The Exchange distributes messages to Queues, and consumers receive messages from Queues.

  • Routing keys: every message is given a routing_key when it is sent. How the routing_key is interpreted depends on the exchange type. The AMQP standard defines four default exchange types, and vendors can define custom types. The most common exchange types are:

    • Direct exchange: if a message's routing_key equals the routing_key a queue was bound with, the message is delivered to that queue, and thus to the consumer listening on it.
    • Fanout exchange: broadcast mode. The exchange sends every incoming message to all queues bound to it.
    • Topic exchange: delivers a message to every queue whose binding pattern matches the message's routing_key. A routing_key is a series of words separated by “.”; in a pattern, “*” matches exactly one word and “#” matches zero or more words, loosely like a regular expression.
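The wildcard rules for topic exchanges can be made concrete with a small matcher. This is a minimal sketch of the matching semantics described above, not Kombu's own implementation; the function name `topic_matches` and the sample keys are made up for illustration.

```python
def topic_matches(pattern, routing_key):
    """Return True if routing_key matches a topic binding pattern.

    Words are separated by "."; "*" matches exactly one word and
    "#" matches zero or more words.
    """
    p = pattern.split(".")
    k = routing_key.split(".") if routing_key else []

    def match(i, j):
        if i == len(p):                 # pattern exhausted
            return j == len(k)          # match only if the key is too
        if p[i] == "#":                 # try consuming 0..n remaining words
            return any(match(i + 1, jj) for jj in range(j, len(k) + 1))
        if j == len(k):                 # key exhausted, pattern is not
            return False
        if p[i] == "*" or p[i] == k[j]:
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


one_word = topic_matches("stock.*", "stock.nyse")
too_many = topic_matches("stock.*", "stock.nyse.ibm")
zero_words = topic_matches("stock.#", "stock")
many_words = topic_matches("stock.#", "stock.nyse.ibm")
```

The recursion makes the difference between the two wildcards explicit: "*" always consumes one word, while "#" tries every possible number of words, including none.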

0x04 Concept Description

4.1 Overview

Taking Redis as the broker, here is a brief walkthrough:

  • The object that sends messages is called a Producer.
  • A Connection establishes the Redis connection; a Channel is a session on that connection.
  • The Exchange is responsible for routing messages. Messages are sent to the Exchange through a Channel; because the Exchange is bound to Queues by routing_key, each message is forwarded to the Queue in Redis whose binding matches its routing_key.
  • On the other side of the Queue, a Consumer keeps listening; when data arrives in the Queue, it calls the callback to process the message.

4.2 Connection

A Connection is an abstraction of an MQ connection; one Connection corresponds to one connection to the MQ. Here the connection being abstracted is ‘redis://localhost:6379’.

from kombu import Connection
conn = Connection('redis://localhost:6379')

As discussed above, a Connection is a connection to the broker. The code shows that Connection is closer to a logical concept: the concrete work is delegated to other objects.

The main member variables of Connection are:

  • _connection: of type kombu.transport.redis.Transport, responsible for the concrete MQ operations; this is where the Channel's operations actually land.
  • _transport: the broker abstraction mentioned above.
  • cycle: the scheduling policy for interacting with brokers (an iterator over the broker URLs to try).
  • failover_strategy: the strategy for selecting another host when the connection fails.
  • heartbeat: used to implement heartbeats.
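The relationship between failover_strategy and cycle can be illustrated with plain iterators. The sketch below only mirrors the idea of "an iterator that keeps yielding the next URL to try"; the function names and host names are hypothetical, and this is not Kombu's code.

```python
import itertools
import random


def round_robin(urls):
    """Yield the URLs in order, forever."""
    return itertools.cycle(urls)


def shuffle_cycle(urls, rng=random):
    """Yield the URLs forever, reshuffling before each full pass."""
    urls = list(urls)
    while True:
        rng.shuffle(urls)
        yield from urls


# Hypothetical broker URLs, for illustration only.
hosts = ["redis://a:6379", "redis://b:6379", "redis://c:6379"]

cycle = round_robin(hosts)
attempts = [next(cycle) for _ in range(4)]  # wraps around after the third host

one_pass = list(itertools.islice(shuffle_cycle(hosts), 3))
```

Whichever strategy is chosen, the code that reconnects only has to call `next()` on the resulting iterator.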

The simplified definition is as follows:

class Connection:
    """A connection to the broker"""

    port = None

    _connection = None
    _default_channel = None
    _transport = None

    #: Iterator returning the next broker URL to try in the event
    #: of connection failure (initialized by :attr:`failover_strategy`).
    cycle = None

    #: Additional transport specific options,
    #: passed on to the transport instance.
    transport_options = None

    #: Strategy used to select new hosts when reconnecting after connection
    #: failure. One of "round-robin", "shuffle" or any custom iterator
    #: constantly yielding new URLs to try.
    failover_strategy = 'round-robin'

    #: Heartbeat value, currently only supported by the py-amqp transport.
    heartbeat = None

    failover_strategies = failover_strategies

4.3 Channel

Channel: similar to the AMQP concept; can be understood as one of several lightweight connections sharing a Connection. In the Redis transport, the Channel is where the real connection is made.

  • A Connection encapsulates the AMQP notion of a connection;
  • A Channel encapsulates the AMQP operations performed on the MQ;

A Channel can be seen as an encapsulation of the Redis operations and the Redis connection. Each Channel can establish a connection to Redis and operate on Redis through it. Each connection has a socket, and each socket has a file descriptor that can be polled.

4.3.1 Definition

The simplified definition is as follows:

class Channel(virtual.Channel):
    """Redis Channel."""

    QoS = QoS

    _client = None
    _subclient = None
    keyprefix_queue = '{p}_kombu.binding.%s'.format(p=KEY_PREFIX)
    keyprefix_fanout = '/{db}.'
    sep = '\x06\x16'
    _fanout_queues = {}
    unacked_key = '{p}unacked'.format(p=KEY_PREFIX)
    unacked_index_key = '{p}unacked_index'.format(p=KEY_PREFIX)
    unacked_mutex_key = '{p}unacked_mutex'.format(p=KEY_PREFIX)
    unacked_mutex_expire = 300  # 5 minutes
    unacked_restore_limit = None
    visibility_timeout = 3600   # 1 hour
    max_connections = 10
    queue_order_strategy = 'round_robin'

    _async_pool = None
    _pool = None

    from_transport_options = (
        virtual.Channel.from_transport_options +
        ('sep',
         'ack_emulation',
         'unacked_key',
         'max_connections',
         'health_check_interval',
         'retry_on_timeout',
         'priority_steps')  # <-- do not add comma here!
    )

    connection_class = redis.Connection if redis else None

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Callbacks keyed by the kind of Redis event that became readable.
        self.handlers = {'BRPOP': self._brpop_read, 'LISTEN': self._receive}

4.3.2 Redis message callback function

The handlers member variable above needs some explanation. At runtime it looks like this:

 handlers = {dict: 2}
  {
    'BRPOP': <bound method Channel._brpop_read of <kombu.transport.redis.Channel object at 0x7fe61aa88cc0>>,
    'LISTEN': <bound method Channel._receive of <kombu.transport.redis.Channel object at 0x7fe61aa88cc0>>
  }

These are the callbacks invoked when Redis has a message, namely:

  • When BRPOP has a message, Channel._brpop_read is called;
  • When LISTEN has a message, Channel._receive is called.
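The handler table is an ordinary dispatch dictionary. The toy class below is a hedged sketch of that pattern only: `ToyChannel`, its `calls` list and its `on_readable` method are invented for the example, and the method bodies merely record which callback ran instead of talking to Redis.

```python
class ToyChannel:
    """Hypothetical stand-in for a channel that owns a handler table."""

    def __init__(self):
        self.calls = []
        # Same shape as the dict above: event type -> bound method.
        self.handlers = {"BRPOP": self._brpop_read, "LISTEN": self._receive}

    def _brpop_read(self):
        # In the real transport this would read the reply to a BRPOP command.
        self.calls.append("BRPOP -> _brpop_read")

    def _receive(self):
        # In the real transport this would read a pub/sub message.
        self.calls.append("LISTEN -> _receive")

    def on_readable(self, kind):
        # Look up the callback registered for this kind of event and call it.
        self.handlers[kind]()


channel = ToyChannel()
channel.on_readable("BRPOP")
channel.on_readable("LISTEN")
```

Because the values are bound methods, whoever holds the dictionary can trigger the right read on the right channel without knowing anything else about it.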

As follows:

            +---------------------------------------------------------------------------------------------------------------------------------------+
            |                                     +--------------+                                   6                       parse_response         |
            |                                +--> | Linux Kernel | +---+                                                                            |
            |                                |    +--------------+     |                                                                            |
            |                                |                         |                                                                            |
            |                                |                         |  event                                                                     |
            |                                |  1                      |                                                                            |
            |                                |                         |  2                                                                         |
            |                                |                         |                                                                            |
    +-------+---+    socket                  +                         |                                                                            |
    |   redis   | <------------> port +-->  fd +--->+                  v                                                                            |
    |           |                                   |           +------+--------+                                                                   |
    |           |    socket                         |           |  Hub          |                                                                   |
    |           | <------------> port +-->  fd +--->----------> |               |                                                                   |
    | port=6379 |                                   |           |               |                                                                   |
    |           |    socket                         |           |     readers +----->  Transport.on_readable                                        |
    |           | <------------> port +-->  fd +--->+           |               |                     +                                             |
    +-----------+                                               +---------------+                     |                                             |
                                                                                                      |                                             |
                                                        3                                             |                                             |
             +----------------------------------------------------------------------------------------+                                             |
             |                                                                                                                                      v
             |                                                                                                        _receive_callback
             |                                                                                                                            5    +-------------+                      +-----------+
+------------+------+                     +-------------------------+                                    'BRPOP' = Channel._brpop_read +-----> | Channel     | +------------------> | Consumer  |
|       Transport   |                     |  MultiChannelPoller     |      +------>  channel . handlers  'LISTEN' = Channel._receive           +-------------+                      +---+-------+
|                   |                     |                         |      |                                                                                           8                |
|                   | on_readable(fileno) |                         |      |                                                                         ^                                  |
|           cycle +---------------------> |          _fd_to_chan +---------------->  channel . handlers  'BRPOP' = Channel._brpop_read               |                                  |
|                   |        4            |                         |      |                             'LISTEN' = Channel._receive                 |                                  |
|  _callbacks[queue]|                     |                         |      |                                                                         |                            on_m  |  9
|          +        |                     +-------------------------+      +------>  channel . handlers  'BRPOP' = Channel._brpop_read               |                                  |
+-------------------+                                                                                    'LISTEN' = Channel._receive                 |                                  |
           |                                                                                                                                         |                                  v
           |                                                7_callback | +----------------------------------------------------------------------------------------------------------------------- ------------------+ User FunctionCopy the code

4.4 Transport

Transport: the real MQ connection, the instance that actually connects to the MQ (Redis/RabbitMQ). It is the entity that stores and sends messages, and it distinguishes whether the underlying message queue is implemented with AMQP, Redis, or something else.

Let’s walk through the logic:

  • A Connection encapsulates the AMQP notion of a connection;
  • A Channel encapsulates the AMQP operations performed on the MQ;
  • The two are related in that operations on the MQ always need a connection, but Kombu does not let a Channel send and receive requests directly through the Connection. Instead it introduces a new abstraction, Transport, which is responsible for the concrete MQ operations. That is, every Channel operation is carried out by the Transport;

In the Kombu architecture, transport abstracts all brokers to provide a consistent solution for different brokers. With Kombu, developers have the flexibility to choose or change brokers based on their needs.

Transport takes care of the concrete details, but much of the work is handed over to the event loop and the MultiChannelPoller.

Its main member variables are:

  • driver_type and driver_name: the driver type and name of this transport;
  • Channel: the corresponding Channel class;
  • cycle: a MultiChannelPoller, described in more detail later;

The key is the MultiChannelPoller. A Connection has a Transport, and a Transport has a MultiChannelPoller; poll operations are done by the MultiChannelPoller, and Redis operations are done by the Channel.

The definition is as follows:

class Transport(virtual.Transport):
    """Redis Transport."""

    Channel = Channel

    polling_interval = None  # disable sleep between unsuccessful polls.
    default_port = DEFAULT_PORT
    driver_type = 'redis'
    driver_name = 'redis'

    implements = virtual.Transport.implements.extend(
        asynchronous=True,
        exchange_type=frozenset(['direct', 'topic', 'fanout'])
    )

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # All channels share the same poller.
        self.cycle = MultiChannelPoller()

4.5 MultiChannelPoller

The MultiChannelPoller is the execution engine. Its responsibilities are:

  • Collect the channels;
  • Build the mapping from socket fd to channel;
  • Build the mapping from channel to socket;
  • Run poll over the registered sockets;

Logically, the MultiChannelPoller can be understood as follows:

  • The socket that belongs to each Channel is registered with the poller. On Linux a socket is a file, so it can be polled.
  • The fd registered with the poller is recorded in the MultiChannelPoller, which links the chain Channel -> socket -> poll -> fd -> read from Redis. When data arrives from Redis, poll notifies the MultiChannelPoller immediately, and it reads the data from Redis.
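The chain from channel to socket to fd and back can be sketched with the standard library. Everything here is a simplified, hypothetical model: `ToyChannel` and `ToyPoller` are invented names, a local socket pair stands in for the Redis connection, and the real MultiChannelPoller keeps more state than the two maps shown.

```python
import selectors
import socket


class ToyChannel:
    """Owns one socket and a handler table (stand-in for a Redis channel)."""

    def __init__(self, name):
        self.name = name
        # `remote` plays the role of the Redis server end of the connection.
        self.remote, self.sock = socket.socketpair()
        self.handlers = {"BRPOP": self._brpop_read}
        self.received = []

    def _brpop_read(self):
        self.received.append(self.sock.recv(1024))


class ToyPoller:
    """Maps fds to channels and dispatches readable events to them."""

    def __init__(self):
        self._channels = set()
        self._fd_to_chan = {}    # fd -> (channel, handler type)
        self._chan_to_sock = {}  # (channel, handler type) -> socket
        self.poller = selectors.DefaultSelector()

    def add(self, channel, kind):
        self._channels.add(channel)
        fd = channel.sock.fileno()
        self._fd_to_chan[fd] = (channel, kind)
        self._chan_to_sock[(channel, kind)] = channel.sock
        self.poller.register(fd, selectors.EVENT_READ)

    def on_readable(self, fileno):
        channel, kind = self._fd_to_chan[fileno]
        channel.handlers[kind]()

    def poll_once(self, timeout=1.0):
        for key, _events in self.poller.select(timeout):
            self.on_readable(key.fd)


poller = ToyPoller()
first, second = ToyChannel("first"), ToyChannel("second")
poller.add(first, "BRPOP")
poller.add(second, "BRPOP")

second.remote.sendall(b"task-42")  # "Redis" sends data on one connection
poller.poll_once()
```

Only the channel whose socket became readable has its handler invoked; the other channel is never touched.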

Specific definitions are as follows:

class MultiChannelPoller:
    """Async I/O poller for Redis transport."""

    eventflags = READ | ERR

    def __init__(self):
        # active channels
        self._channels = set()
        # file descriptor -> channel map.
        self._fd_to_chan = {}
        # channel -> socket map
        self._chan_to_sock = {}
        # poll implementation (epoll/kqueue/select)
        self.poller = poll()
        # one-shot callbacks called after reading from socket.
        self.after_read = set()

4.6 Consumer

The Consumer is the message receiver. The roles of the Consumer and its related components are as follows:

  • Exchange: the MQ routing entity. Message senders publish messages to the Exchange, which distributes them to queues.
  • Queue: the queue abstraction. It stores messages that are about to be consumed by the application. The Exchange distributes messages to Queues, and consumers receive messages from Queues.
  • Consumer: the abstract class for receiving messages. A consumer declares a queue, binds it to a given Exchange, and then receives messages from the queue. From the user’s point of view, knowing an Exchange is enough to read messages from it, while the messages themselves are read from a queue.

In the concrete implementation, a Consumer associates queues with a channel. Each Queue holds a channel, through which it accesses Redis, and an Exchange, which tells it which Redis key to access.

A Consumer consumes messages through a Queue, and the Queue in turn delegates to the Channel.

So the logic on the consuming side is roughly as follows:

  1. Establish a connection;
  2. Create an Exchange;
  3. Create a Queue and bind it to the Exchange with a routing_key;
  4. Create a Consumer that listens on the Queue;

Consumer is defined as follows:

class Consumer:
    """Message consumer.

    Arguments:
        channel (kombu.Connection, ChannelT): see :attr:`channel`.
        queues (Sequence[kombu.Queue]): see :attr:`queues`.
        no_ack (bool): see :attr:`no_ack`.
        auto_declare (bool): see :attr:`auto_declare`
        callbacks (Sequence[Callable]): see :attr:`callbacks`.
        on_message (Callable): See :attr:`on_message`
        on_decode_error (Callable): see :attr:`on_decode_error`.
        prefetch_count (int): see :attr:`prefetch_count`.
    """

    ContentDisallowed = ContentDisallowed

    #: The connection/channel to use for this consumer.
    channel = None

    #: A single :class:`~kombu.Queue`, or a list of queues to
    #: consume from.
    queues = None

    #: Flag for automatic message acknowledgment.
    no_ack = None

    #: By default all entities will be declared at instantiation, if you
    #: want to handle this manually you can set this to :const:`False`.
    auto_declare = True

    #: List of callbacks called in order when a message is received.
    callbacks = None

    #: Optional function called whenever a message is received.
    on_message = None

    #: Callback called when a message can't be decoded.
    on_decode_error = None

    #: List of accepted content-types.
    accept = None

    #: Initial prefetch count
    prefetch_count = None

    #: Mapping of queues we consume from.
    _queues = None

    _tags = count(1)   # global
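
The `_tags = count(1)` line is marked "global" because it is a class attribute, so every Consumer instance draws from the same counter. A minimal sketch of that effect follows; ToyConsumer and its consume method are hypothetical and model nothing except tag generation.

```python
from itertools import count


class ToyConsumer:
    """Hypothetical consumer that only models tag generation."""

    _tags = count(1)  # class attribute: one counter shared by all instances

    def __init__(self, queue_names):
        self.queue_names = list(queue_names)
        self.tags = {}

    def consume(self):
        # One tag per queue; tags never repeat across consumers in the process.
        for name in self.queue_names:
            self.tags[name] = str(next(self._tags))
        return self.tags


first = ToyConsumer(["video", "audio"]).consume()
second = ToyConsumer(["image"]).consume()
print(first, second)
```

Because the counter lives on the class, the second consumer continues numbering where the first one stopped instead of starting again at 1.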


The overall logic is as follows:

[Diagram: object references among Consumer, Channel, Connection, redis.Transport, MultiChannelPoller, Hub, Queue and Exchange]

  • Consumer.channel --> Channel
  • Consumer.queues --> Queue
  • Channel.client --> Redis<ConnectionPool<Connection<host=localhost,port=6379>
  • Channel.connection --> redis.Transport
  • Connection.transport --> redis.Transport
  • redis.Transport.cycle --> MultiChannelPoller (_channels, _fd_to_chan, _chan_to_sock, poller, after_read)
  • MultiChannelPoller._channels --> Channel
  • MultiChannelPoller.poller --> _poll
  • Hub.poller --> _poll, and _poll._poller --> poll
  • Queue._channel --> Channel
  • Queue.exchange --> Exchange (which holds its own channel)



4.7 Producer

A Producer is the message sender. Its main attributes are:

  • _channel: the channel used to send messages;
  • exchange: the default exchange.

class Producer:
    """Message Producer.

    Arguments:
        channel (kombu.Connection, ChannelT): Connection or channel.
        exchange (kombu.entity.Exchange, str): Optional default exchange.
        routing_key (str): Optional default routing key.
    """

    #: Default exchange
    exchange = None

    #: Default routing key.
    routing_key = ''

    #: Default serializer to use. Default is JSON.
    serializer = None

    #: Default compression method. Disabled by default.
    compression = None

    #: By default, if a default exchange is set,
    #: that exchange will be declared when publishing a message.
    auto_declare = True

    #: Basic return callback.
    on_return = None

    #: Set if channel argument was a Connection instance (using
    #: default_channel).
    __connection__ = None
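
The serializer and compression attributes describe what happens to a message body before it is published: it is serialized (JSON by default) and optionally compressed. The sketch below illustrates that pipeline with the standard library only. The helpers prepare and restore and the header names are hypothetical, not Kombu functions.

```python
import json
import zlib


def prepare(body, compression=None):
    """Serialize to JSON, then optionally compress (hypothetical helper)."""
    payload = json.dumps(body).encode("utf-8")
    headers = {"content_type": "application/json"}
    if compression == "zlib":
        payload = zlib.compress(payload)
        headers["compression"] = "zlib"
    return payload, headers


def restore(payload, headers):
    """Reverse of prepare(): decompress if flagged, then decode the JSON."""
    if headers.get("compression") == "zlib":
        payload = zlib.decompress(payload)
    return json.loads(payload.decode("utf-8"))


message = {"task": "add", "args": [2, 3]}
plain, plain_headers = prepare(message)
packed, packed_headers = prepare(message, compression="zlib")
print(restore(plain, plain_headers) == restore(packed, packed_headers))
```

The headers travel with the payload so that the receiving side knows which steps to undo, which is why compression can stay disabled by default without affecting consumers.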


The logic is shown as follows:

[Diagram: object references among Producer, Channel, Connection, redis.Transport, MultiChannelPoller, Hub, Queue and Exchange]

  • Producer.channel --> Channel
  • Producer.exchange --> Exchange (the publish path)
  • Producer.connection --> Connection
  • Channel.client --> Redis<ConnectionPool<Connection<host=localhost,port=6379>
  • Channel.connection --> redis.Transport
  • Connection.transport --> redis.Transport
  • redis.Transport.cycle --> MultiChannelPoller (_channels, _fd_to_chan, _chan_to_sock, poller, after_read)
  • MultiChannelPoller._channels --> Channel
  • MultiChannelPoller.poller --> _poll
  • Hub.poller --> _poll, and _poll._poller --> poll
  • Queue._channel --> Channel
  • Queue.exchange --> Exchange (which holds its own channel)


4.8 Hub

Users can read messages synchronously themselves. If they prefer not to, they can read them through the Hub, which provides its own asynchronous message engine.

4.8.1 Own Poller

The Hub is an event loop with its own poller.

As mentioned in the MultiChannelPoller section, the MultiChannelPoller creates a poller of its own internally. When the Transport registers with the event loop, however, it uses the Hub's poller rather than the one inside the MultiChannelPoller.

4.8.2 Connection

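A Connection registers itself with a Hub, and each Connection corresponds to one Hub.

from kombu import Connection
from kombu.asynchronous import Hub  # kombu.async before kombu 4.2

hub = Hub()
conn = Connection('redis://localhost:6379')
conn.register_with_event_loop(hub)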

4.8.3 Association

During registration, the Hub configures its own internal poller into the Transport. The MultiChannelPoller inside the Transport can then connect the Hub's poller with the sockets that belong to its Channels. On Linux a socket is a file descriptor, so it can be polled.

Therefore, as mentioned above, the MultiChannelPoller ties together the chain Channel --> socket --> poll --> fd --> read from Redis. As soon as data arrives on a socket, poll notifies the MultiChannelPoller, which then reads from Redis through the corresponding Channel.

# Method of kombu.Connection: registration is delegated to the Transport.
def register_with_event_loop(self, loop):
    self.transport.register_with_event_loop(self.connection, loop)
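
The statement that a socket is a file and can therefore be polled can be tried out with the standard library alone. In this sketch a socketpair stands in for the Redis client socket held by a Channel, and the string "channel-1" plays the role of the fd-to-channel lookup. Both are assumptions for illustration and none of this is Kombu code.

```python
import selectors
import socket

# A connected socket pair stands in for the client socket a Channel holds.
server_side, channel_sock = socket.socketpair()

poller = selectors.DefaultSelector()
# Register the file descriptor for read events and remember who owns it,
# similar in spirit to an fd -> channel lookup table.
poller.register(channel_sock.fileno(), selectors.EVENT_READ, data="channel-1")

idle = poller.select(timeout=0)      # nothing has been sent yet
print(idle)

server_side.sendall(b"message")      # the "server" pushes data
ready = [(key.data, key.fd == channel_sock.fileno())
         for key, _events in poller.select(timeout=1)]
print(ready)
payload = channel_sock.recv(1024)    # only now is the actual read performed

poller.close()
server_side.close()
channel_sock.close()
```

The poller only reports which descriptor became readable. Looking up the owner and performing the read are separate steps, which is the division of labour described above.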


4.8.4 Definition

The Hub is defined as follows:

# Abridged excerpt: timers, error handling, and helper attributes such as
# self.consolidate and the loop property are omitted.
class Hub:
    """Event loop object."""

    def __init__(self, timer=None):
        self.timer = timer if timer is not None else Timer()
        self.readers = {}   # fd -> (callback, args) for readable events
        self.writers = {}   # fd -> (callback, args) for writable events
        self.on_tick = set()
        self.on_close = set()
        self._ready = set()
        self._create_poller()

    @property
    def poller(self):
        if not self._poller:
            self._create_poller()
        return self._poller

    def _create_poller(self):
        self._poller = poll()
        self._register_fd = self._poller.register
        self._unregister_fd = self._poller.unregister

    def add(self, fd, callback, flags, args=(), consolidate=False):
        fd = fileno(fd)
        try:
            self.poller.register(fd, flags)
        except ValueError:
            self._remove_from_loop(fd)
            raise
        else:
            dest = self.readers if flags & READ else self.writers
            if consolidate:
                self.consolidate.add(fd)
                dest[fd] = None
            else:
                dest[fd] = callback, args

    def run_forever(self):
        self._running = True
        try:
            while 1:
                try:
                    self.run_once()
                except Stop:
                    break
        finally:
            self._running = False

    def run_once(self):
        # self.loop lazily wraps the generator returned by create_loop().
        try:
            next(self.loop)
        except StopIteration:
            self._loop = None

    def create_loop(self, ...):
        readers, writers = self.readers, self.writers
        poll = self.poller.poll

        while 1:
            # poll_timeout is derived from the timer queue (omitted here).
            if readers or writers:
                events = poll(poll_timeout)
                for fd, event in events or ():
                    cb, cbargs = readers[fd]
                    if isinstance(cb, generator):
                        next(cb)      # resume a generator-based callback
                    else:
                        cb(*cbargs)   # call a plain callback
            else:
                # no sockets yet, startup is probably not done.
                sleep(min(poll_timeout, 0.1))
            yield
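
The interplay of add, create_loop and run_once is easier to see in a reduced form. The sketch below is a hypothetical MiniHub built on the standard selectors module. It keeps only the readers table and the generator-driven loop and is not Kombu's implementation.

```python
import selectors
import socket


class MiniHub:
    """Hypothetical, heavily reduced event loop (not Kombu's Hub)."""

    def __init__(self):
        self.poller = selectors.DefaultSelector()
        self.readers = {}          # fd -> (callback, args)
        self._loop = None

    def add_reader(self, sock, callback, *args):
        fd = sock.fileno()
        self.poller.register(fd, selectors.EVENT_READ)
        self.readers[fd] = (callback, args)

    def create_loop(self):
        # Generator: each next() performs one poll-and-dispatch iteration.
        while True:
            for key, _events in self.poller.select(timeout=0.1):
                callback, args = self.readers[key.fd]
                callback(*args)
            yield

    def run_once(self):
        if self._loop is None:
            self._loop = self.create_loop()
        next(self._loop)


left, right = socket.socketpair()
received = []

hub = MiniHub()
hub.add_reader(right, lambda s: received.append(s.recv(1024)), right)

hub.run_once()              # nothing readable yet, so no callback fires
left.sendall(b"ping")
hub.run_once()              # right is readable now, so the callback runs
print(received)

hub.poller.close()
left.close()
right.close()
```

Writing the loop as a generator lets the caller decide when each iteration happens, which is what allows run_once and run_forever to share the same loop body.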


0x05 Summary

Let's summarize this article in words and diagrams.

5.1 Logic

  • Message: the body of a message that is sent and consumed.

  • Connection: the AMQP abstraction that wraps a connection to the message queue. No operation on MQ can be performed without a Connection.

  • Channel: the AMQP abstraction that wraps operations on MQ. Channels can be understood as multiple lightweight connections sharing one Connection.

    • The Channel records each Consumer's tag, the queues that Consumer consumes from, and the mapping from tag to queue, and then waits for the loop to invoke them.
    • Through the Transport, it also records the mapping between each queue and its list of callback functions.
    • Kombu polls all the queues that need to be listened on (_active_queues) in turn until the round is complete or a queue has a message available, then fetches the message and calls the callbacks registered for that queue.
    • Initializing a Channel is what establishes the connection.
  • Transport: rather than letting Channels use the Connection directly to send and receive requests, Kombu introduces a further abstraction, Transport, which carries out the concrete MQ operations. All Channel operations end up on the Transport. Through the Transport, the real Redis client represented by the Channel is associated with the poll in the Hub.

  • Queue: the message queue, which is the carrier of message content. It stores messages that are about to be consumed by applications. The Exchange distributes messages to a Queue, and consumers receive them from that Queue.

  • Exchange: the sender sends messages to the Exchange, which dispatches them to Queues.

    • Sending is delegated to the Exchange, but the Exchange only translates the routing_key into the name of the queue the message should go to. The actual send still goes through the Channel.
    • The Exchange holds the routing_key --> queue mapping, so from the routing_key we obtain the queue and thus know on which queue the Consumer and Producer exchange messages.
    • Each Transport has its own corresponding Channel. The producer sends a message to the Exchange, the Exchange routes it to a queue by matching the BindingKey against the RoutingKey in the message, and the queue delivers the message to the consumer.
  • Producer: the abstraction that sends messages. A Producer holds, among other things, an exchange, a routing_key, and a channel.

  • Consumer: the abstraction that receives messages. A Consumer declares a queue, binds it to a specified Exchange, and then receives messages from that queue.

    • A Consumer is bound to message-handling callbacks, and each Consumer is initialized with a Channel, so a Consumer contains Queues and is thereby associated with a Connection.
    • A Consumer consumes messages through its Queue, which delegates to the Channel, which in turn delegates to the Connection.
  • Users can read messages synchronously themselves. If they prefer not to, they can read them through the Hub, which provides its own asynchronous message engine.

  • Hub: an event loop. A Connection registers with a Hub, and each Connection corresponds to one Hub. During registration the Hub configures its own poller into the Transport, so the MultiChannelPoller inside the Transport can connect the Hub's poller with the sockets of its Channels. On Linux a socket is a file descriptor, so it can be polled.

  • MultiChannelPoller: the bridge between the Connection and the Hub. It is responsible for finding out which Channels are available, all of which come from the same Connection.
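
The Channel bookkeeping in the summary (tag to queue, queue to callbacks, and polling the active queues in turn) can be sketched as follows. ToyChannel, its attribute names and its method signatures are hypothetical and kept in memory; the point is only the round-robin drain that stops at the first queue holding a message.

```python
from collections import deque


class ToyChannel:
    """Hypothetical channel that polls its active queues in turn."""

    def __init__(self):
        self.storage = {}             # queue name -> pending messages
        self.tag_to_queue = {}        # consumer tag -> queue name
        self.callbacks = {}           # queue name -> list of callbacks
        self.active_queues = deque()  # rotation order for fair polling

    def basic_consume(self, queue, callback, consumer_tag):
        self.storage.setdefault(queue, deque())
        self.tag_to_queue[consumer_tag] = queue
        self.callbacks.setdefault(queue, []).append(callback)
        if queue not in self.active_queues:
            self.active_queues.append(queue)

    def put(self, queue, message):
        self.storage.setdefault(queue, deque()).append(message)

    def drain_one(self):
        # Try each active queue at most once; stop at the first message.
        for _ in range(len(self.active_queues)):
            queue = self.active_queues[0]
            self.active_queues.rotate(-1)   # the next call starts one queue later
            if self.storage[queue]:
                message = self.storage[queue].popleft()
                for callback in self.callbacks[queue]:
                    callback(queue, message)
                return True
        return False    # a full round found nothing


seen = []
channel = ToyChannel()
channel.basic_consume("video", lambda q, m: seen.append((q, m)), consumer_tag="1")
channel.basic_consume("audio", lambda q, m: seen.append((q, m)), consumer_tag="2")
channel.put("video", "v1")
channel.put("video", "v2")
channel.put("audio", "a1")

while channel.drain_one():
    pass
print(seen)
```

Rotating the queue order after every attempt keeps a busy queue from starving the others, so the audio message is delivered between the two video messages.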

5.2 Diagram

As the diagram shows:

  • So far, everything is centered on the Transport, which associates the real Redis client represented by the Channel with the poll in the Hub. How this wiring is actually used has not been shown yet.
  • The user takes Connection as the API entry point, and the Connection obtains the Transport.

+-------------------+
| Channel           |
|                   |        +-----------------------------------------------------------+
|    client  +-------------> | Redis<ConnectionPool<Connection<host=localhost,port=6379> |
|                   |        +-----------------------------------------------------------+
|                   |
|                   |        +-----------------------------------------------------+
|    pool  +-------------->  | ConnectionPool<Connection<host=localhost,port=6379> |
|                   |        +-----------------------------------------------------+
|                   |
|                   | <-----------------------------------------------------------------+
|                   |                                                                   |
|    connection +---------------+                                                       |
|                   |           |                                                       |
+-------------------+           |                                                       |
                                v                                                       |
+-------------------+       +---+-----------------+       +--------------------+        |
| Connection        |       | redis.Transport     |       | MultiChannelPoller |        |
|                   |       |                     |       |                    |        |
|                   |       |                     |       |     _channels +-------------+
|                   |       |        cycle +------------> |     _fd_to_chan    |
| transport +-------------> |                     |       |     _chan_to_sock  |
|                   |       |                     |  +<---+     poller         |
+-------------------+       +---------------------+  |    |     after_read     |
                                                     |    |                    |
+------------------+                  +--------------+    +--------------------+
| Hub              |                  |
|                  |                  v
|                  |           +------+------+
| poller +-------------------> | _poll       |
|                  |           |             |    +-------+
|                  |           | _poller +------> | poll  |
+------------------+           |             |    +-------+
                               +-------------+

+----------------+         +-------------------+
| Exchange       |         | Queue             |
|                |         |                   |
| channel        | <------------+ exchange     |
|                |         |                   |
+----------------+         +-------------------+

In the next article, we will use an example to walk through Kombu's startup process.

Because this article is an overview, some of the concept explanations given here will reappear in the later articles.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

If you would like timely updates on my articles, or want to see the technical material I recommend, please follow the account.
