Introduction to the

Characteristics of ActiveMQ

ActiveMQ is an open-source message broker from Apache that aims to provide efficient, scalable, stable, and secure enterprise-level messaging for applications. It is designed as standards-based, message-oriented middleware for integrating applications written in multiple languages. ActiveMQ implements JMS 1.1 and adds many features on top of it, such as JMX management, master-slave deployment, message groups, message priority, delayed delivery, virtual destinations, message persistence, and queue monitoring. Its main features are:

  1. Supports clients in Java, C, C++, C#, Ruby, Perl, Python, PHP, and other languages, over protocols including OpenWire, STOMP, AMQP, and MQTT.
  2. Provides advanced features such as message groups, message priority, delayed delivery, virtual destinations, and message persistence.
  3. Fully supports the JMS 1.1 and J2EE 1.4 specifications (including persistent, transactional, and distributed-transaction messaging).
  4. Supports the Spring framework, so ActiveMQ can easily be embedded into Spring applications through Spring configuration files.
  5. Has been tested against common J2EE servers such as TomEE, Geronimo, JBoss, GlassFish, and WebLogic.
  6. Offers a variety of connection modes, such as in-VM, TCP, SSL, NIO, UDP, multicast, JGroups, and JXTA.
  7. Supports fast message persistence using JDBC together with a journal.
  8. Is designed for high-performance clustering, client-server, and point-to-point communication scenarios.
  9. Provides a technology-neutral and language-neutral REST API.
  10. Supports calling ActiveMQ through Ajax.
  11. Integrates easily with web service technologies such as CXF and Axis to provide reliable messaging.
  12. Can be used as an in-memory JMS provider, which makes it well suited to unit testing JMS code.

Basic concepts

Because ActiveMQ fully supports JMS 1.1, its basic concepts match the JMS 1.1 specification from the point of view of a Java user.

Messaging model
  1. The point-to-point model uses a queue as the message carrier and follows the producer-consumer pattern. Each message is consumed by only one consumer, and unconsumed messages stay in the queue until they are consumed or expire.

  2. The publish-subscribe (Pub/Sub) model uses a topic as the message carrier and resembles broadcasting: a publisher publishes a message, and the topic delivers it to all current subscribers. A subscriber that subscribes after a message has been published does not receive that message.
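The difference between the two models can be sketched with a small in-memory model. This is not ActiveMQ or JMS code; the class names `MessagingModels`, `SimpleQueue`, and `SimpleTopic` are hypothetical and exist only to illustrate the delivery rules described above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory model of the two messaging models; not ActiveMQ code.
final class MessagingModels {

    /** Point-to-point: each message is handed to exactly one consumer. */
    static final class SimpleQueue {
        private final Deque<String> pending = new ArrayDeque<>();

        void send(String message) {
            pending.addLast(message); // kept until some consumer takes it
        }

        /** Returns the next unconsumed message, or null if the queue is empty. */
        String receive() {
            return pending.pollFirst();
        }
    }

    /** Publish-subscribe: each message is copied to every current subscriber. */
    static final class SimpleTopic {
        private final List<List<String>> subscribers = new ArrayList<>();

        /** Registers a subscriber and returns its inbox. */
        List<String> subscribe() {
            List<String> inbox = new ArrayList<>();
            subscribers.add(inbox);
            return inbox;
        }

        void publish(String message) {
            for (List<String> inbox : subscribers) {
                inbox.add(message); // subscribers added later never see this message
            }
        }
    }

    public static void main(String[] args) {
        SimpleQueue queue = new SimpleQueue();
        queue.send("order-1");
        System.out.println("consumer A got: " + queue.receive());
        System.out.println("consumer B got: " + queue.receive()); // already consumed

        SimpleTopic topic = new SimpleTopic();
        List<String> early = topic.subscribe();
        topic.publish("news-1");
        List<String> late = topic.subscribe();
        topic.publish("news-2");
        System.out.println("early subscriber: " + early);
        System.out.println("late subscriber: " + late);
    }
}
```

In this model the second `receive()` call returns null because the only message was already consumed, and the late subscriber's inbox contains only `news-2`.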

Basic components

The basic components used with ActiveMQ are the same as for JMS:

  1. Broker: the message server entity. It accepts client connections and provides the core messaging services.
  2. Producer: the originator of a business operation. It creates messages and sends them to the broker.
  3. Consumer: the business handler. It retrieves messages from the broker and runs the business logic on them.
  4. Topic: the common gathering point for messages in publish-subscribe mode. Producers send messages to a topic, and the broker distributes them to the subscribers, which amounts to broadcasting the message.
  5. Queue: the destination in point-to-point mode. A producer sends a message to a specific queue, and a consumer listens on that queue to receive the message and process it.
  6. Message: a data packet, encoded in the fixed format defined by the communication protocol in use, that encapsulates business data for transmission.

Since these concepts are covered by JMS itself, they are not discussed further here.

Connectors

The main job of an ActiveMQ broker is to give client applications a way to communicate. To that end, ActiveMQ provides a connection mechanism described by connectors. There are two types of connector: transport connectors, used for communication between clients and a broker, and network connectors, used for broker-to-broker communication. A connector is represented by a URI in the following format:

<scheme name>:<hierarchical part>[?<query>][#<fragment>]

The scheme name identifies the protocol. For example:

foo://username:password@example.com:8042/over/there/index.dtb?type=animal&name=narwhal#nose

Here the scheme name is foo, the hierarchical part is username:password@example.com:8042/over/there/index.dtb, the query is type=animal&name=narwhal, and the fragment is nose.
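To see how such a URI breaks down, it can be parsed with the standard `java.net.URI` class. The sketch below is only an illustration; the class name `ConnectorUriParts` and the helper `describe` are made up for this example, and ActiveMQ's own URI handling is not shown.

```java
import java.net.URI;

// Hypothetical helper that prints the parts of a connector-style URI.
final class ConnectorUriParts {

    /** Formats the components of a URI as a single readable line. */
    static String describe(String text) {
        URI uri = URI.create(text);
        return "scheme=" + uri.getScheme()
                + ", authority=" + uri.getAuthority()
                + ", path=" + uri.getPath()
                + ", query=" + uri.getQuery()
                + ", fragment=" + uri.getFragment();
    }

    public static void main(String[] args) {
        // The generic example URI.
        System.out.println(describe(
                "foo://username:password@example.com:8042/over/there/index.dtb?type=animal&name=narwhal#nose"));
        // A typical broker address: no path, query, or fragment.
        System.out.println(describe("tcp://localhost:61616"));
    }
}
```

For a broker address such as tcp://localhost:61616, the scheme is `tcp` and the authority carries the host and port, which `URI.getHost()` and `URI.getPort()` return separately.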

  1. Transport connectors. To exchange messages, both producers and consumers (collectively, clients) need to connect to a broker. This client-to-broker communication goes through transport connectors. Users often have different requirements when connecting to the broker: some care most about performance, others about security. ActiveMQ therefore provides a range of connection protocols to cover these scenarios. From the broker's point of view, a transport connector listens for and handles client connections. The ActiveMQ demo configuration file (/examples/conf/activemq-demo.xml) configures its transport connectors as follows:
        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
            <transportConnector name="ssl" uri="ssl://localhost:61617"/>
            <transportConnector name="stomp" uri="stomp://localhost:61613"/>
            <transportConnector name="ws" uri="ws://localhost:61614/" />
        </transportConnectors>

Transport connectors are defined inside the <transportConnectors> element, and each <transportConnector> element defines one specific connector. A connector must have its own unique name and a uri attribute; the discoveryUri attribute is optional. In ActiveMQ 5.15, the latest version at the time of writing, the commonly used transport connector protocols include VM, TCP, UDP, multicast, NIO, SSL, HTTP, HTTPS, WebSocket, AMQP, MQTT, and STOMP:

  • VM: lets clients and the broker communicate inside the same JVM through direct method calls instead of socket connections, which avoids network overhead. It only applies when the broker and the client run in the same JVM.
  • TCP: the client connects to a remote broker over TCP.
  • UDP: the client connects to a remote broker over UDP.
  • Multicast: the client connects to brokers using multicast transport.
  • NIO: serves the same purpose as TCP but uses the Java NIO package, which may perform better in some scenarios.
  • SSL: lets clients connect using SSL over TCP.
  • HTTP and HTTPS: let clients connect using REST or Ajax, which means messages can be sent to ActiveMQ directly from JavaScript.
  • WebSocket: lets clients connect to the broker through HTML5 WebSockets.
  • AMQP: supported as of version 5.8.
  • MQTT (supported as of version 5.6) and STOMP.

For the configuration details of each protocol, see the official site (http://activemq.apache.org/uri-protocols.html). Beyond the basic protocols above, ActiveMQ also supports some higher-level protocols that are configured through URIs, such as failover and fanout.

  • Failover is a reconnection mechanism that works on top of the connection protocols described above and is used to make transport reliable. Its configuration syntax accepts any number of composite URIs; it picks one of them and tries to connect, and if that connection fails it moves on to the others. For example: failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100
  • Fanout is a reconnection and replication mechanism that also works on top of other connections and replicates messages to multiple brokers. For example: fanout:(tcp://localhost:61629,tcp://localhost:61639,tcp://localhost:61649)
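The failover idea, trying the listed URIs until one connects, can be sketched in plain Java. This is a simplified illustration, not the ActiveMQ failover transport: the class `FailoverSketch` and its methods are hypothetical, the connection attempt is replaced by a caller-supplied predicate, and the real transport may choose candidates in a different order and also handles reconnect delays.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical sketch of failover-style URI selection; not ActiveMQ code.
final class FailoverSketch {

    /**
     * Extracts the comma-separated URIs between the outer parentheses.
     * Only flat lists are handled; nested composite URIs are out of scope here.
     */
    static List<String> innerUris(String compositeUri) {
        int open = compositeUri.indexOf('(');
        int close = compositeUri.lastIndexOf(')');
        if (open < 0 || close < open) {
            throw new IllegalArgumentException("not a composite URI: " + compositeUri);
        }
        List<String> uris = new ArrayList<>();
        for (String part : compositeUri.substring(open + 1, close).split(",")) {
            if (!part.trim().isEmpty()) {
                uris.add(part.trim());
            }
        }
        return uris;
    }

    /** Tries each candidate in order and returns the first one that connects. */
    static Optional<String> firstReachable(List<String> candidates, Predicate<String> canConnect) {
        for (String uri : candidates) {
            if (canConnect.test(uri)) {
                return Optional.of(uri);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<String> brokers = innerUris(
                "failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100");
        // Pretend the local broker is down and only the remote one answers.
        Optional<String> chosen = firstReachable(brokers, uri -> uri.contains("remotehost"));
        System.out.println(brokers + " -> " + chosen.orElse("none"));
    }
}
```

Passing the connection check in as a predicate keeps the sketch free of network code; a real client would attempt an actual connection at that point.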
  2. Network connectors. In many cases the volume of data to process is massive, more than a single server can handle, which calls for clustering. ActiveMQ therefore provides network connections between brokers, which raises the overall messaging capacity of the system. Broker instances connected this way share their queues and consumer lists, which gives the effect of distributed queues. Network connectors configure this communication between brokers.

As shown in the figure, servers S1 and S2 are connected by a network connector. Consumers C3 and C4 can receive messages sent by producer P1, and consumers C1 and C2 can receive messages sent by producer P3. To use a network connector, add the following configuration under the broker node in S1's activemq.xml (assuming 192.168.11.23:61617 is the address of S2):

<networkConnectors>
    <networkConnector uri="static:(tcp://192.168.11.23:61617)"/>
</networkConnectors>

With only this configuration, S1 can forward messages to S2, but the link is one-way: messages sent to S2 cannot yet reach S1. For S1 to also receive messages from S2, add the following configuration under the broker node in S2's activemq.xml (assuming 192.168.11.45:61617 is the address of S1):

<networkConnectors>
    <networkConnector uri="static:(tcp://192.168.11.45:61617)"/>
</networkConnectors>

S1 and S2 can now communicate in both directions. In ActiveMQ 5.15, the commonly used network connector protocols are static and multicast:

  • Static: creates a statically configured network of brokers. It supports composite URIs (URIs that contain other URIs), for example static://(tcp://ip:61616,tcp://ip2:61616)
  • Multicast: brokers advertise their own services over multicast and discover other brokers the same way. This suits dynamic discovery between brokers, as opposed to a static list of IP addresses.

For more detail, see the official documentation: http://activemq.apache.org/networks-of-brokers.html
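The one-way nature of a single network connector can be modelled as a directed link between brokers. The sketch below is a toy model under that assumption, not ActiveMQ code; `BrokerNetworkSketch` and its methods are hypothetical names, and S1 and S2 stand for the two servers in the example above.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model: each network connector is a one-way link between two brokers.
final class BrokerNetworkSketch {
    // For each broker, the set of brokers it has a network connector to.
    private final Map<String, Set<String>> connectors = new HashMap<>();

    /** Records a one-way network connector from one broker to another. */
    void addNetworkConnector(String from, String to) {
        connectors.computeIfAbsent(from, k -> new HashSet<>()).add(to);
    }

    /** True if messages produced on 'from' can be forwarded directly to 'to'. */
    boolean canForward(String from, String to) {
        return connectors.getOrDefault(from, Collections.emptySet()).contains(to);
    }

    public static void main(String[] args) {
        BrokerNetworkSketch network = new BrokerNetworkSketch();
        network.addNetworkConnector("S1", "S2"); // configured only in S1's activemq.xml
        System.out.println("S1 -> S2: " + network.canForward("S1", "S2"));
        System.out.println("S2 -> S1: " + network.canForward("S2", "S1"));

        network.addNetworkConnector("S2", "S1"); // the matching entry in S2's activemq.xml
        System.out.println("S2 -> S1 after second connector: " + network.canForward("S2", "S1"));
    }
}
```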

Message storage

The JMS specification defines two delivery modes: nonpersistent and persistent. For nonpersistent messages, the JMS provider must make a best effort to deliver them but does not persist them; persistent messages must be stored durably. Nonpersistent messages are typically used for notifications or real-time data, and they are a reasonable choice when performance matters and losing some messages does not hurt the business. A persistent message sent to the broker is stored if its consumer is not currently running, and it is not removed from the broker until a consumer has processed and acknowledged it.
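The practical difference between the two modes shows up when the broker restarts. The following toy model illustrates it; it is not JMS or ActiveMQ code, and `DeliveryModeSketch` and its methods are hypothetical. In real JMS code the mode is chosen with `MessageProducer.setDeliveryMode`, using `DeliveryMode.PERSISTENT` or `DeliveryMode.NON_PERSISTENT`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a broker's pending messages; not ActiveMQ code.
final class DeliveryModeSketch {

    private static final class Stored {
        final String body;
        final boolean persistent;

        Stored(String body, boolean persistent) {
            this.body = body;
            this.persistent = persistent;
        }
    }

    private final List<Stored> pending = new ArrayList<>();

    void send(String body, boolean persistent) {
        pending.add(new Stored(body, persistent));
    }

    /** Simulates a broker restart: only persistent messages survive. */
    void restart() {
        pending.removeIf(m -> !m.persistent);
    }

    /** Simulates a consumer acknowledging a message, which removes it from the broker. */
    boolean acknowledge(String body) {
        return pending.removeIf(m -> m.body.equals(body));
    }

    List<String> pendingBodies() {
        List<String> bodies = new ArrayList<>();
        for (Stored m : pending) {
            bodies.add(m.body);
        }
        return bodies;
    }

    public static void main(String[] args) {
        DeliveryModeSketch broker = new DeliveryModeSketch();
        broker.send("price-tick", false); // nonpersistent: best effort only
        broker.send("order-42", true);    // persistent: must survive a restart
        broker.restart();
        System.out.println("after restart: " + broker.pendingBodies());
        broker.acknowledge("order-42");
        System.out.println("after acknowledge: " + broker.pendingBodies());
    }
}
```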

ActiveMQ supports both modes, and it also supports message recovery by caching in-flight messages in memory. In summary, ActiveMQ can store messages in three ways: in memory, in files, and in a database. Concretely, ActiveMQ provides a pluggable message store mechanism, much as it offers a choice of transports. The main implementations are:

  • AMQ: the default message store in ActiveMQ 5.0 and earlier, a file-based store with transaction support. Messages themselves are persisted as log entries in data log files, and a reference index into the log is maintained so that messages can be retrieved quickly.
  • KahaDB: also a file-based store with transaction support. It has been the recommended store since 5.3 and offers better scalability and recoverability than the AMQ store.
  • JDBC: stores messages in a database through JDBC. This is relatively slow, so ActiveMQ recommends combining it with a journal.
  • Memory: keeps all messages that need persisting in memory. Because there is no dynamic caching, the JVM heap and the broker's memory limits need to be sized carefully.
  • LevelDB: as of version 5.9, LevelDB combined with ZooKeeper is offered as the preferred master-slave data replication scheme.
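The "log plus index" idea behind the file-based stores can be sketched in a few lines. This is only an in-memory illustration of the concept, not how AMQ or KahaDB are implemented: real stores write to files and reclaim old log space, and the class `JournalStoreSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of an append-only journal with a lookup index.
final class JournalStoreSketch {
    private final List<String> journal = new ArrayList<>();     // append-only log of message bodies
    private final Map<String, Integer> index = new HashMap<>(); // message id -> position in the journal

    /** Appends the message to the journal and records where it landed. */
    void append(String messageId, String body) {
        index.put(messageId, journal.size());
        journal.add(body);
    }

    /** Looks a message up through the index instead of scanning the journal. */
    String read(String messageId) {
        Integer position = index.get(messageId);
        return position == null ? null : journal.get(position);
    }

    /** Drops the index entry once the message is consumed; the log entry itself stays. */
    void remove(String messageId) {
        index.remove(messageId);
    }

    int journalSize() {
        return journal.size();
    }

    public static void main(String[] args) {
        JournalStoreSketch store = new JournalStoreSketch();
        store.append("msg-1", "hello");
        store.append("msg-2", "world");
        System.out.println("msg-2 = " + store.read("msg-2"));
        store.remove("msg-1");
        System.out.println("msg-1 after removal = " + store.read("msg-1"));
        System.out.println("journal entries still on the log: " + store.journalSize());
    }
}
```

Writes stay cheap because they only append, while the index keeps lookups from having to scan the whole log.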

Project examples

Accessing ActiveMQ from Java

The JMS specification delivers messages in two ways: through a queue in the point-to-point model and through a topic in the publish-subscribe model. The following Java example uses ActiveMQ to deliver messages through a topic.

Adding the dependency

The Java project needs the ActiveMQ dependency. The jar version should match the version of ActiveMQ you installed:

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.2</version>
</dependency>

Message producer
package org.study.mq.activeMQ;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicPublisher {

    /** Default user name */
    public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    /** Default password */
    public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /** Default connection address */
    public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        // Create the connection factory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);

        try {
            // Create the connection
            Connection connection = connectionFactory.createConnection();
            // Start the connection
            connection.start();
            // Create the session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Create the topic
            Topic myTestTopic = session.createTopic("activemq-topic-test1");
            // Create the message producer
            MessageProducer producer = session.createProducer(myTestTopic);

            for (int i = 1; i <= 3; i++) {
                TextMessage message = session.createTextMessage("Send message" + i);
                producer.send(myTestTopic, message);
            }

            // Close the resources
            session.close();
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

In topic mode, the message producer publishes messages. Most of the code is the same as in queue mode; the difference is that here the session creates a Topic, which serves as the destination from which consumers receive the messages.

Message consumer
package org.study.mq.activeMQ;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicSubscriber {

    /** Default user name */
    public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    /** Default password */
    public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /** Default connection address */
    public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        // Create the connection factory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);

        try {
            // Create the connection
            Connection connection = connectionFactory.createConnection();
            // Start the connection
            connection.start();
            // Create the session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Create the topic
            Topic myTestTopic = session.createTopic("activemq-topic-test1");

            MessageConsumer messageConsumer = session.createConsumer(myTestTopic);
            messageConsumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        System.out.println("Consumer 1 receives message:" + ((TextMessage) message).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });

            MessageConsumer messageConsumer2 = session.createConsumer(myTestTopic);
            messageConsumer2.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        System.out.println("Consumer 2 receives message:" + ((TextMessage) message).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });

            MessageConsumer messageConsumer3 = session.createConsumer(myTestTopic);
            messageConsumer3.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        System.out.println("Consumer 3 receives a message:" + ((TextMessage) message).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });

            // Keep the main thread alive so the listeners can keep receiving
            Thread.sleep(100 * 1000);

            // Close the resources
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

To demonstrate that a topic broadcasts each message to multiple subscribers, the code creates three consumers subscribed to the same topic. The unusual part is the long sleep at the end of the main thread: it keeps the consumers alive so that the console can print the messages they receive.
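Sleeping for a fixed time works for a demo, but one possible alternative is to wait until an expected number of messages has arrived. The sketch below shows that idea with `java.util.concurrent.CountDownLatch`; it is not part of the original example. The class `LatchKeepAliveSketch` is hypothetical, and a background thread stands in for the broker delivering messages. In the real subscriber, the `countDown()` call would go inside `onMessage`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: wait for N messages instead of sleeping for a fixed time.
final class LatchKeepAliveSketch {

    /**
     * Starts a background thread that hands each message to the listener,
     * then blocks until all messages were handled or the timeout expires.
     * Returns true if every message was handled in time.
     */
    static boolean awaitMessages(String[] messages, Consumer<String> listener, long timeoutMillis) {
        CountDownLatch remaining = new CountDownLatch(messages.length);
        Thread delivery = new Thread(() -> {
            for (String message : messages) {
                listener.accept(message);
                remaining.countDown(); // one fewer message to wait for
            }
        });
        delivery.setDaemon(true);
        delivery.start();
        try {
            return remaining.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        String[] messages = {"Send message1", "Send message2", "Send message3"};
        boolean allReceived = awaitMessages(messages,
                m -> System.out.println("received: " + m), 5000);
        System.out.println("all received: " + allReceived);
    }
}
```

The timeout still bounds how long the main thread waits, so the program cannot hang forever if fewer messages arrive than expected.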

Start the ActiveMQ server

To start ActiveMQ, run activemq start in the bin directory of the ActiveMQ installation.

Run TopicSubscriber

Run the main method of the TopicSubscriber class first, so that the subscribers are already listening when the publisher publishes. If the order is reversed, the messages are published while no subscriber is running, and they are never consumed.

Run TopicPublisher

Then run the main method of the TopicPublisher class to publish three messages to the topic. The received messages appear in the TopicSubscriber console:

Integrating ActiveMQ with Spring

In a real project, developing directly against the native ActiveMQ API is tedious. Code such as creating the connection factory and the connection can be factored out entirely and handled uniformly by a framework, and Spring does exactly that. ActiveMQ fully supports Spring-based configuration of JMS clients and servers. The following example shows how to deliver messages through a queue and a topic in Spring.

Adding the dependencies

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.2</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>4.3.10.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.15.0</version>
</dependency>

Besides the ActiveMQ package, the project needs Spring's JMS support package. Creating connections, sessions, and producers consumes considerable system resources, so a connection pool is used to reuse them, which is why the activemq-pool dependency is added.

Spring configuration file
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">
    <context:component-scan base-package="org.study.mq.activeMQ.spring"/>

    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL">
                    <value>tcp://localhost:61616</value>
                </property>
            </bean>
        </property>
        <property name="maxConnections" value="100"></property>
    </bean>
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="jmsFactory"/>
        <property name="sessionCacheSize" value="1"/>
    </bean>
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>

    <bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg name="name" value="spring-queue"/>
    </bean>
    <bean id="testTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg index="0" value="spring-topic"/>
    </bean>

    <bean id="queueListener" class="org.study.mq.activeMQ.spring.QueueListener"/>
    <bean id="topic1Listener" class="org.study.mq.activeMQ.spring.Topic1Listener"/>
    <bean id="topic2Listener" class="org.study.mq.activeMQ.spring.Topic2Listener"/>

    <bean id="queueContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <property name="destination" ref="testQueue"/>
        <property name="messageListener" ref="queueListener"/>
    </bean>
    <bean id="topic1Container"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <property name="destination" ref="testTopic"/>
        <property name="messageListener" ref="topic1Listener"/>
    </bean>
    <bean id="topic2Container"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <property name="destination" ref="testTopic"/>
        <property name="messageListener" ref="topic2Listener"/>
    </bean>

</beans>

The Java code in the project example below uses annotations, which is how many programmers write Spring code today. The configuration file therefore starts by defining the annotation scanning package path org.study.mq.activeMQ.spring; change the package name to suit your own project. All the Java code in this example lives under this package.

Next comes the JMS factory bean, jmsFactory, whose class is the pooled connection factory org.apache.activemq.pool.PooledConnectionFactory. It adds connection pooling on top of ActiveMQ's own connection factory: as its nested configuration shows, it wraps org.apache.activemq.ActiveMQConnectionFactory. ActiveMQConnectionFactory should look familiar, since it is the class the plain Java example above used to create its connection factory. Its brokerURL property sets the protocol and address for connecting to the broker. CachingConnectionFactory is a further layer of enhancement over the connection factory that caches connections for efficiency. It is commonly used in real projects, but whether to use it is up to you.

JmsTemplate is Spring's answer to the long, repetitive code that JMS access otherwise requires. Its two main properties are connectionFactory and messageConverter. The connectionFactory is used to obtain connections, sessions, and so on. The messageConverter configures a message converter, which is needed because messages usually require some processing before they are sent and after they are received. Application code sends and receives messages directly through jmsTemplate, and the Spring framework takes care of obtaining the connection and session each time a message is sent or received.
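The template idea, where the framework owns the open/send/close steps and the caller only supplies the message, can be sketched without Spring. The code below is a stand-alone illustration of that callback pattern, not Spring's implementation: `TemplateSketch` and `FakeSession` are hypothetical stand-ins, and the log list only records the order of the steps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of the template-callback pattern; not Spring code.
final class TemplateSketch {

    /** Stand-in for a JMS session; it only records what is asked of it. */
    static final class FakeSession {
        private final List<String> log;

        FakeSession(List<String> log) {
            this.log = log;
        }

        String createTextMessage(String text) {
            log.add("create:" + text);
            return text;
        }
    }

    private final List<String> log = new ArrayList<>();

    /** Opens a session, lets the callback build the message, sends it, and always closes. */
    void send(String destination, Function<FakeSession, String> messageCreator) {
        log.add("open");
        try {
            String message = messageCreator.apply(new FakeSession(log));
            log.add("send:" + destination + ":" + message);
        } finally {
            log.add("close"); // runs even if the callback throws
        }
    }

    List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        TemplateSketch template = new TemplateSketch();
        template.send("spring-queue", session -> session.createTextMessage("hello"));
        System.out.println(template.log());
    }
}
```

The caller's code shrinks to a single `send` call with a callback, which is the same shape as passing a message-creating callback to a template's send method.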

Besides the JMS template, the code needs to know the queue and topic to use as destinations, so testQueue and testTopic are defined next as examples of the two models. Receiving messages asynchronously requires an implementation of MessageListener, so queueListener is defined as the asynchronous listener for the queue, and topic1Listener and topic2Listener are defined as asynchronous listeners for the topic to demonstrate that multiple consumers receive each message. Finally, queueContainer, topic1Container, and topic2Container bind each message listener to its destination.

Message Service class

Below is a message service class that uses the JMS template to send messages:

package org.study.mq.activeMQ.spring;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.jms.*;

@Service
public class MessageService {

    @Resource(name = "jmsTemplate")
    private JmsTemplate jmsTemplate;

    @Resource(name = "testQueue")
    private Destination testQueue;

    @Resource(name = "testTopic")
    private Destination testTopic;

    // Send a message to the queue
    public void sendQueueMessage(String messageContent) {
        jmsTemplate.send(testQueue, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage msg = session.createTextMessage();
                // Set the message content
                msg.setText(messageContent);
                return msg;
            }
        });
    }

    // Send a message to the topic
    public void sendTopicMessage(String messageContent) {
        jmsTemplate.send(testTopic, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage msg = session.createTextMessage();
                // Set the message content
                msg.setText(messageContent);
                return msg;
            }
        });
    }
}

The @Service annotation declares this class as a service; much of the service code in real projects looks similar. The jmsTemplate defined in the configuration file above is injected into MessageService with the @Resource annotation and can then be used directly. testQueue and testTopic work the same way: the service class injects the queue and topic defined in the configuration file. The key part is the two sending methods, sendQueueMessage for the queue and sendTopicMessage for the topic, both of which call jmsTemplate's send method. The first parameter of send is of type javax.jms.Destination, which represents the message destination. Because javax.jms.Queue and javax.jms.Topic both extend the javax.jms.Destination interface, the method works for both the queue and the topic model. The second parameter of send is an org.springframework.jms.core.MessageCreator, created here as an anonymous inner class whose createMessage method builds a text message from the Session object it is given. As you can see, sending messages through Spring, whether to a queue or a topic, takes much less code than the plain Java example above.

Message listener class
package org.study.mq.activeMQ.spring;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class QueueListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                TextMessage txtMsg = (TextMessage) message;
                String messageStr = txtMsg.getText();
                System.out.println("Queue listener receives text message:" + messageStr);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        } else {
            throw new IllegalArgumentException("Only TextMessage messages are supported!");
        }
    }
}

The queue message listener checks whether a message is a text message type when received and prints out the content if it is.

package org.study.mq.activeMQ.spring;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class Topic1Listener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                TextMessage txtMsg = (TextMessage) message;
                String messageStr = txtMsg.getText();
                System.out.println("Topic listener 1 receives text message:" + messageStr);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        } else {
            throw new IllegalArgumentException("Only TextMessage messages are supported!");
        }
    }
}

package org.study.mq.activeMQ.spring;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class Topic2Listener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                TextMessage txtMsg = (TextMessage) message;
                String messageStr = txtMsg.getText();
                System.out.println("Topic listener 2 receives text message:" + messageStr);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        } else {
            throw new IllegalArgumentException("Only TextMessage messages are supported!");
        }
    }
}

The code for a topic listener is similar to that for a queue listener, except that a different string is printed to indicate that the message is currently received by a different listener.

Start the application

To run the example, I wrote a StartApplication class whose main method loads the Spring context, obtains the MessageService bean, and calls sendQueueMessage and sendTopicMessage to send messages.

package org.study.mq.activeMQ.spring;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class StartApplication {
    public static void main(String[] args) {
        ApplicationContext ctx = new ClassPathXmlApplicationContext("spring-context.xml");
        MessageService messageService = (MessageService) ctx.getBean("messageService");

        messageService.sendQueueMessage("My Test Message 1");
        messageService.sendTopicMessage("My Test Message 2");
        messageService.sendTopicMessage("My Test Message 3");
    }
}

After starting the ActiveMQ service, run the StartApplication class. The console shows the text messages received:

The queue listener receives one message, and each topic listener receives two.