Networks of brokers

instructions

A cluster of brokers. When a consumer of one broker fails to consume the messages, the Network of Brokers supported by ActiveMQ can forward the messages accumulated by that broker to other brokers with consumers

ActiveMQ cluster has several modes, among which network cluster mode is divided into static network and dynamic network. This paper mainly introduces the static network cluster mode of ActiveMQ.

  1. Static network:

A static network is configured in the Broker cluster configuration to set up the cluster when the service is started. A static network is configured based on networkConnector elements

  1. Dynamic network:

By configuring discovery addresses, nodes can be dynamically added after services are started, which is suitable for dynamic capacity expansion scenarios. Configuration is completed based on the transportConnector element

The cluster structures,

ActiveMQ’s networkConnector is one-way by default, with one Broker sending messages at one end and the other receiving messages at the other, known as “bridging”. ActiveMQ also supports bidirectional linking, creating a bidirectional channel for both brokers to not only send messages but also receive messages from the same channel, typically mapped as a Duplex Connector

The recommended static network application architecture is as follows:

A static network cluster is built according to the author’s environment. (Due to hardware resource limitations, I directly set up a cluster on a single machine)

1. Directory planning

/opt/server

Broker1, Broker2, broker3

2. Server and port planning:

The serial number The service name Server IP Service port Monitor the port
1 broker1 10.10.1.44 6161657621883 8161
2 broker2 10.10.1.44 6161757631884 8162
3 broker3 10.10.1.44 6161857641885 8163

For easy memory, I made the host mapping locally, server1.com, after accessing the server or console can directly use server1.com:8161 access

3. Set server parameters

For the above three nodes, static network configuration parameters need to be added to conf/ Activemq.xml of each node, as follows

<networkConnectors>
    <networkConnector name="bridge" uri="static://(tcp://localhost:61616,tcp://localhost:61617,tcp://localhost:61618)" dynamicOnly="false" duplex="true" />
</networkConnectors>
Copy the code

Some parameters are described in this article

  1. Name: default bridge
  2. DynamicOnly: the default is false. If true, the network persistence subscription is created only when the persistence subscription is activated. The default is activated at startup
  3. DecreaseNetworkConsumerPriority: the default is false. Set consumer priority. If true, the network’s consumer priority is reduced to -5. If false, the default is 0, just like the local consumer
  4. NetworkTTL: The default is 1, the number of brokers on the network for message and subscription consumption
  5. MessageTTL: The default is 1, the number of brokers on the network used for messages
  6. ConsumerTTL: The default is 1, the number of brokers on the network for consumption
  7. ConduitSubscriptions: default to true, whether to treat multiple consumers of the same broker as one (set to false when clustering if there are multiple consumers)
  8. DynamicallyIncludedDestinations: the default is empty, to include dynamic message address class is suitable for excludedDestinations;

4. See the example

The following are the available properties based on the example networkConnector configuration

Broker1

Broker2

Broker3

5. Verify

Complete the configuration and restart each Broker to verify that the configuration is successful

Console verification

Open the MQ WEB console directly, the default address http://10.10.1.44:8161/ to the network menu item to see the following effect

Then go to the other two consoles to check whether the following configurations are also available. If yes, the configuration is complete. If no, refer to the preceding steps.

Message bridge validation

The producer client sends messages to Broker1 and the Consumer client subscribes to messages from Broker3 or Broker2

Example code: Producer

public class MQProducer {
   public static void main(String[] args) throws JMSException {
      // Connect to ActiveMQ server
      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://server1.com:61616");
      Connection connection = factory.createConnection();
      connection.start();
      Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
      // Create a theme
      Topic topic = session.createTopic("VirtualTopic.TEST");
      MessageProducer producer = session.createProducer(topic);
      // NON_PERSISTENT PERSISTENT: PERSISTENT mode is used when sending messages
      producer.setDeliveryMode(DeliveryMode.PERSISTENT);
      TextMessage message = session.createTextMessage();
      for (int i = 0; i < 4; i++) {
         message.setText("Topic news."+i);
         message.setStringProperty("property"."Property");
         // Publish the topic message
         producer.send(message);
      }
      System.out.println("Message sent!); session.close(); connection.close(); }}Copy the code

Example code :Consume

public class MQConsumer {
   public static void main(String[] args) throws JMSException {
      // Connect to ActiveMQ server
      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://server1.com:61618");
      Connection connection = factory.createConnection();
      connection.start();
      Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
      // Create a theme
      Queue topicA = session.createQueue("Consumer.A.VirtualTopic.TEST");
      Queue topicB = session.createQueue("Consumer.B.VirtualTopic.TEST");
      // Create consumer groups 1 to 3 subscriptions
      String port = "61618 -";
      createConsumerQueue(port + "A1", topicA, session);
// createConsumerQueue(port + "A2", topicA, session);
      // createConsumerQueue("A3", topicA, session);
      // Create consumer B groups 1-3 subscriptions
      // createConsumerQueue("B1", topicB, session);
      // createConsumerQueue("B2", topicB, session);
      // createConsumerQueue("B3", topicB, session);
   }

   private static void createConsumerQueue(String name, Queue topic, Session session) throws JMSException {
      // Create A consumer group A subscription
      MessageConsumer consumerA1 = session.createConsumer(topic);
      consumerA1.setMessageListener(new MessageListener() {
         @Override
         public void onMessage(Message msg) {
            try {
               System.out.println(msg.getJMSType());
               if (msg instanceof TextMessage) {
                  TextMessage tm = (TextMessage) msg;
                  System.out.println("Received message " + name + ":" + tm.getText() + ":"
                        + tm.getStringProperty("property"));

               } else if (msg instanceof ActiveMQBytesMessage) {
                  ActiveMQBytesMessage bytesMessage = (ActiveMQBytesMessage) msg;
                  if(bytesMessage ! =null) {
                     byte[] bt = new byte[(int) bytesMessage.getBodyLength()];
                     bytesMessage.readBytes(bt);
                     System.out.println("Received message " + name + ":" +newString(bt)); }}}catch(JMSException e1) { e1.printStackTrace(); }}}); }}Copy the code

First, Consumer 61618 and 61617 are run, and then Producer is run. You can see that the message is consumed evenly by both clients

61617 the Broker 61618 the Broker

The preceding information shows that the cluster setup has taken effect

6. Application:

After the configuration is complete, the following problems may occur during the actual operation

Repeated message consumption: Sending multiple messages (say 10) to a virtual topic and then opening multiple clients, each connected to a different Broker. You can then observe that the message is being consumed repeatedly, and that each server receives the message and has the same.

Solution:

To discover this problem, add the following configuration parameters

<networkConnectors>
  <networkConnector uri="Static: (TCP: / / 127.0.0.1:61617, TCP: / / 127.0.0.1:61618)." duplex="true">
    <excludedDestinations>
        <topic physicalName="VirtualTopic.*" />
    </excludedDestinations>
  </networkConnector>
</networkConnectors>
Copy the code

This section provides supplementary information about how to configure networkConnector

  1. Name: default bridge
  2. DynamicOnly: the default is false. If true, the network persistence subscription is created only when the persistence subscription is activated. The default is activated at startup
  3. DecreaseNetworkConsumerPriority: the default is false. Set consumer priority. If true, the network’s consumer priority is reduced to -5. If false, the default is 0, just like the local consumer
  4. NetworkTTL: The default is 1, the number of brokers on the network for message and subscription consumption
  5. MessageTTL: The default is 1, the number of brokers on the network used for messages
  6. ConsumerTTL: The default is 1, the number of brokers on the network for consumption
  7. ConduitSubscriptions: default to true, whether to treat multiple consumers of the same broker as one (set to false when clustering if there are multiple consumers)
  8. DynamicallyIncludedDestinations: the default is empty, to include dynamic message address class is suitable for excludedDestinations, such as:
<dynamicallyIncludedDestinations>
  <queue physicalName="include.test.foo"/>
  <topic physicalName="include.test.bar"/>
</dynamicallyIncludedDestinations>
Copy the code
  1. StaticallyIncludedDestinations: the default is empty, to include the static message address. Similar to excludedDestinations, as:
<staticallyIncludedDestinations>
    <queue physicalName="always.include.queue"/>
</staticallyIncludedDestinations>
Copy the code
  1. ExcludedDestinations: Default is empty, specifying an excluded address as shown in the following example:
<networkConnectors>
  <networkConnector uri="static://(tcp://localhost:61617)" name="bridge" dynamicOnly="false" conduitSubscriptions="true" decreaseNetworkConsumerPriority="false">
  <excludedDestinations>
    <queue physicalName="exclude.test.foo">
    <topic physicalName="exclude.test.bar">
  </excludedDestinations>
  <dynamicallyIncludedDestinations>
    <queue physicalName="include.test.foo"/>
    <topic physicalName="include.test.bar"/>
  </dynamicallyIncludedDestinations>
  <staticallyIncludedDestinations>
    <queue physicalName="always.include.queue"/>
  </staticallyIncludedDestinations>
  </networkConnector>
</networkConnectors>
Copy the code
  1. PrefetchSize: The default is 1000, the maximum number of unacknowledged messages held, which must be greater than 0 because network consumers cannot poll for messages themselves
  2. SuppressDuplicateQueueSubscriptions: by default, false if it is true, repeating a subscription relationships is stop
  3. BridgeTempDestinations: Default true, whether to broadcast advisory Messages to create temporary destinations
  4. AlwaysSyncSend: false by default. If true, nonpersistent messages will also be sent to the remote broker using request/ Reply instead of Oneway
  5. StaticBridge: by default, false if it is true, only staticallyIncludedDestinations configured in destination can be processed