preface

RabbitMQ :amqp-client:4.8.3 RabbitMQ :amqp-client:4.8.3 RabbitMQ :amqp-client:4.8.3 RabbitMQ :amqp-client:4.8.3 RabbitMQ :amqp-client:4.8.3

Warning: this is for those who have some knowledge of RabbitMQ

  • Channels

    www.rabbitmq.com/channels.ht…

RabbitMQ client Demo

Java Client Connecting to RabbitMQ Demo RabbitMQ Channel

ConnectionFactory factory = new ConnectionFactory();
// "guest"/"guest" by default, limited to localhost connections
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);

Connection conn = factory.newConnection();
//
Channel channel = connection.createChannel();

byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
channel.close();
connection.close();
Copy the code

AMQP protocol interaction — Channel

You can see that Channel is simply called Channel = connection.createchannel (); The client sends channel. Open, and the broker returns channel. OpenOk (the client creates the Channel; Broker received and created channel completed)

The RabbitMQ client cache mode is Channel

The RabbitMQ client cache mode is Channel: a Connection corresponds to multiple channels (2048 by default, one of which is special channel0).

  • Connection: used for AMQP protocol parsing and channel multiplexing
  • Channel: routing, security, and coordination
  • Queue: In-memory messages with a permanent Queue index (there is also an exchange between channels and queues, which is not explained here)

Channel source code analysis

The relationship between Channel Connection, Connection and Channel in the AMQP protocol interaction process is briefly introduced above

This article introduces channels and the source code related to Connection, starting with connection.createchannel

/** Public API - {@inheritDoc} * /
    @Override
    public Channel createChannel(a) throws IOException {
        // Verify that connection is open
        ensureIsOpen();
      	/ / channel management
        ChannelManager cm = _channelManager;
        if (cm == null) return null;
        // Create channel core method
        Channel channel = cm.createChannel(this);
        // For exposure indicators
        metricsCollector.newChannel(channel);
        return channel;
    }
Copy the code

As you can see, channels are called and managed by connection:

  • ensureIsOpen()— Confirm that connection is openshutdownCauseIf connection is closed,shutdownCauseWith instructions for closing)
  • channelManagerTuneOk (connection, TuneOk, TuneOk, TuneOk, TuneOk, TuneOk) The default ChannelMax is 2047 (2048-1, which corresponds to the special ChannelMaxchannel0 )

Focus on the channelManager. CreateChannel (this) logic

public ChannelN createChannel(AMQConnection connection) throws IOException {
        ChannelN ch;
  			// The monitor monitors _channelMap and channelNumberAllocator
        synchronized (this.monitor) {
          	// Get the number assigned by the channel
            int channelNumber = channelNumberAllocator.allocate();
            if (channelNumber == -1) {
                return null;
            } else {
                // Add a new channelch = addNewChannel(connection, channelNumber); }}// Open the new channel
        ch.open(); // now that it's been safely added
        return ch;
    }
Copy the code

The channelManager manages channel creation, connection release, and so on:

  • synchronized (this.monitor)Get channelManager firstmonitorLock to prevent concurrent multithreading operations
  • channelNumberAllocator.allocate— Gets unallocated items in the rangechannelNumberTo return to- 1It is considered that new channels cannot be redistributed, and the main internal logic is determined byBitSetRealized (interested can understand)

The subsequent analysis focuses on addNewChannel and open logic

private ChannelN addNewChannel(AMQConnection connection, int channelNumber) {
  			/ / to heavy
        if (_channelMap.containsKey(channelNumber)) {
            // That number's already allocated! Can't do it
            // This should never happen unless something has gone
            // badly wrong with our implementation.
            throw new IllegalStateException("We have attempted to "
                    + "create a channel with a number that is already in "
                    + "use. This should never happen. "
                    + "Please report this as a bug.");
        }
  			/ / build
        ChannelN ch = instantiateChannel(connection, channelNumber, this.workService);
  			// Add _channelMap for unified management
        _channelMap.put(ch.getChannelNumber(), ch);
        return ch;
    }


public ChannelN(AMQConnection connection, int channelNumber,
        ConsumerWorkService workService, MetricsCollector metricsCollector) {
        // AMQChannel constructor
        super(connection, channelNumber);
        // Build the consumption allocator
        this.dispatcher = new ConsumerDispatcher(connection, this, workService);
        this.metricsCollector = metricsCollector;
    }
Copy the code

InstantiateChannel builds and instantiates channels, including connections, channel numbers, timeouts, dispatchers, etc. Each channel has a dispatcher. However, the connection and thread pool are shared with the same connection

Finally get the newly created channel and open ch.open().

public void open(a) throws IOException {
        // Send channel. Open to rabbitmq broker and wait for the broker to return channel. OpenOk
        exnWrappingRpc(new Channel.Open(UNSPECIFIED_OUT_OF_BAND));
    }
    

public AMQCommand exnWrappingRpc(Method m)
        throws IOException
    {
        try {
        		// Make an RPC call against this method
            return privateRpc(m);
        } catch (AlreadyClosedException ace) {
            // Do not wrap it since it means that connection/channel
            // was closed in some action in the past
            throw ace;
        } catch (ShutdownSignalException ex) {
            throwwrap(ex); }}private AMQCommand privateRpc(Method m)
        throws IOException, ShutdownSignalException
    {
    		// Used to block waiting during RPC calls
        SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation(m);
        rpc(m, k);
        
        // Wait without timeout
        if(_rpcTimeout == NO_RPC_TIMEOUT) {
            return k.getReply();
        } else {
            try {
              	// Wait for timeout
                return k.getReply(_rpcTimeout);
            } catch (TimeoutException e) {
                throwwrapTimeoutException(m, e); }}}Copy the code

The logic to open a new channel is simple: make an RPC call to the RabbitMQ broker: The client sends channel. Open, and the broker returns channel. OpenOk

The last

The RabbitMQ Client and RabbitMQ Broker will interact with each other through the AMQP protocol. The RabbitMQ Client and RabbitMQ Broker will interact with each other through the AMQP protocol


My wechat official account: Java Architect advanced programming focus on sharing Java technology dry goods, looking forward to your attention!