P.S. This article was found in my drafts folder. It was written on 2018-09-01, so some of the content is a little dated; please keep that in mind while reading.

Introduction to Apache ActiveMQ

Apache ActiveMQ is an open source, highly extensible, Java-based message-oriented middleware. It supports clients written in many languages, such as Java, C, C++, C#, Ruby, Perl, Python, and PHP, and multiple protocols such as AMQP, MQTT, OpenWire, and STOMP, as well as the JMS 1.1 and J2EE 1.4 specifications. This makes it well suited to communication between heterogeneous systems.

Introduction to JMS

The JMS 1.1 specification defines:

JMS provides a common way for Java programs to create, send, receive and read an enterprise messaging system's messages.

Wikipedia definition:

The Java Message Service (JMS) application programming interface is an API for message-oriented middleware (MOM) on the Java platform, used to send messages for asynchronous communication between two applications or within distributed systems.

As the definitions show, JMS is an API specification. On the Java platform it standardizes how a client interacts with message middleware (creating, sending, receiving, and reading messages), providing a common interface for asynchronous messaging between two applications or within a distributed system. ActiveMQ is one implementation (provider) of the JMS specification, and spring-jms is Spring's support layer that simplifies working with the JMS API.

The two JMS messaging modes

1. Publish/subscribe (Topic messaging mode)

A Publisher publishes a message to a topic (say TopicA) on ActiveMQ, optionally carrying some data in any mutually agreed format. ActiveMQ pushes the message, including the data sent by the Publisher, to every Subscriber (client) listening on TopicA. If a Subscriber is disconnected at that moment, it misses the message.
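The behaviour described above can be pictured with a minimal in-memory sketch. It does not use the ActiveMQ or JMS API; ToyTopic and ToyTopicDemo are hypothetical names invented for this illustration, and the sketch only models a non-durable subscription.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy in-memory topic: NOT the ActiveMQ API, only a model of publish/subscribe.
class ToyTopic {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    // A subscriber counts as "online" from the moment it subscribes.
    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Unsubscribing models a subscriber losing its connection.
    void unsubscribe(Consumer<String> subscriber) {
        subscribers.remove(subscriber);
    }

    // Push the message to every subscriber that is online right now; nothing is stored.
    void publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }
}

class ToyTopicDemo {
    static List<String> run() {
        List<String> received = new ArrayList<>();
        ToyTopic topicA = new ToyTopic();
        Consumer<String> sub1 = m -> received.add("sub1:" + m);
        Consumer<String> sub2 = m -> received.add("sub2:" + m);

        topicA.subscribe(sub1);
        topicA.subscribe(sub2);
        topicA.publish("first");   // both subscribers are online

        topicA.unsubscribe(sub2);  // sub2 goes offline
        topicA.publish("second");  // only sub1 sees it

        topicA.subscribe(sub2);    // sub2 is back, but "second" is lost for it
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [sub1:first, sub2:first, sub1:second]
    }
}
```

Every online subscriber gets its own copy of a message, and a message published while a subscriber is offline never reaches it.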

2. Point-to-point (Queue message mode)

JMS specification definition:

Point-to-point systems are about working with queues of messages. They are point-to-point in that a client sends a message to a specific queue. Some PTP systems blur the distinction between PTP and Pub/Sub by providing system clients that automatically distribute messages.

The JMS PTP model defines how a client works with queues: how it finds them, how it sends messages to them, and how it receives messages from them.

A Producer sends a message to a queue (say QueueA) on ActiveMQ, optionally carrying some data in any mutually agreed format. The message is stored in the ActiveMQ queue (which can be configured for DB persistence, among other options). Only one of the Consumers listening on QueueA receives the message. After receiving it, the Consumer acknowledges it to ActiveMQ. Once ActiveMQ knows the message has been consumed, it deletes the message from the queue or performs other operations. If no Consumer consumes the message, it remains in the ActiveMQ queue until it is consumed.
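The same kind of in-memory sketch can illustrate the queue behaviour described above. Again, this is not the ActiveMQ or JMS API; ToyQueue and ToyQueueDemo are hypothetical names, and acknowledgement is reduced to "taking the message removes it".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy in-memory queue: NOT the ActiveMQ API, only a model of point-to-point delivery.
class ToyQueue {
    private final Deque<String> pending = new ArrayDeque<>();

    // The broker stores the message until some consumer takes it.
    void send(String message) {
        pending.addLast(message);
    }

    // Exactly one consumer gets each message; taking it removes it from the queue.
    // Returns null when nothing is waiting.
    String receive() {
        return pending.pollFirst();
    }

    int size() {
        return pending.size();
    }
}

class ToyQueueDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ToyQueue queueA = new ToyQueue();

        queueA.send("m1");
        queueA.send("m2");
        log.add("pending=" + queueA.size());              // no consumer yet, both are kept

        log.add("consumer1 got " + queueA.receive());     // m1 goes to consumer1 only
        log.add("consumer2 got " + queueA.receive());     // m2 goes to consumer2 only
        log.add("consumer1 got " + queueA.receive());     // queue is empty now
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Unlike the topic case, messages wait in the queue while no consumer is connected, and each message is handed to a single consumer.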

Technical comparison of the two messaging modes

| Compared item | Topic | Queue |
| --- | --- | --- |
| Summary | Publish/subscribe | Producer/consumer (point-to-point) |
| Message state | Stateless; messages are not saved. | Saved as a file in ActiveMQ, or can be configured for DB persistence. |
| Message integrity | There is no guarantee that every Subscriber receives the message sent by the Publisher; a Subscriber that is offline may miss it. | Every message sent by the Producer is guaranteed to reach a Consumer, but only one Consumer receives it. |
| Publish and receive policy | Every Subscriber listening on the same topic receives the message from the Publisher and acknowledges it to ActiveMQ. ActiveMQ records how many messages have been dispatched; messages that were not delivered are not sent again. | One-to-one. Only one Consumer receives a message sent by the Producer (when several Consumers listen on the same queue, priority can be set to decide who receives first). After receiving the message, the Consumer acknowledges it to ActiveMQ, which records how many messages have been dequeued and deletes them from the queue. Messages not yet dequeued are delivered once a Consumer comes online. |

A comparison between typical RPC communication architecture and ActiveMQ communication architecture

Typical RPC communication architecture

One characteristic of an RPC call is that the caller cares about the result of the call. For functions such as login and registration, the client invokes the interface exposed by the server through RPC, the server returns a result, and the client decides how to continue the business process based on that result.

ActiveMQ communication architecture

Communication through ActiveMQ has one defining characteristic: the caller does not care about the result of the execution. For example, when you send a message to a friend in WeChat, your phone is only responsible for sending it; it does not need to care whether the message has reached your friend's WeChat, because the friend may not be online.

When to use ActiveMQ

Use ActiveMQ when the caller does not care about the results returned by the execution.

As a counterexample, suppose the upstream (client or server) does not care about the results returned by the downstream. What happens if it still uses RPC to interact with the downstream?

For example:

Requirement scenario: a user scans the QR code on a trash can and deposits garbage. The deposit triggers the following operations:
1. Call the WeChat service interface to push a template message that tells the user about the deposit and the points earned.
2. Call the garbage classification engine interface to score the garbage the user deposited.
3. At the current stage some garbage classification still needs manual inspection, so call the information management system interface to notify the person in charge.

In a scenario like this, RPC calls cause a serious coupling problem:

  1. Whenever the garbage deposit business gains a new requirement, the downstream has to add the new function and the upstream also has to add an RPC call to the newly exposed interface. The trash can developers will be unhappy: why do they have to change their code for someone else's requirement, and on top of that keep track of how many downstream businesses depend on them?
  2. Once a downstream business has a problem, other basic services may be affected. For example, if the WeChat message business fails and the caller does not handle the exception properly, the error may propagate and disrupt the normal operation of other services.

Solution: Communication between upstream and downstream should be handled by ActiveMQ.

Then it looks like this:

Requirement scenario: a user scans the QR code on a trash can and deposits garbage. The deposit triggers the following operations:
1. Notify the WeChat service to push a template message that tells the user about the deposit and the points earned.
2. Notify the garbage classification engine to score the garbage the user deposited.
3. If part of the garbage classification needs manual inspection at this stage, notify the person in charge.

After the change, MQ decouples upstream and downstream both physically and logically:

  1. Physical decoupling: upstream and downstream do not need to know of each other's existence and do not connect to each other directly. Each connects only to ActiveMQ.

  2. Logical decoupling: the upstream does not need to know how many downstream businesses depend on it. It only publishes garbage deposit messages to ActiveMQ, and each downstream only subscribes to the corresponding topic. Adding a new business does not affect the upstream.
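The logical decoupling described in the list above can be sketched with a toy in-memory broker. Everything here is hypothetical and for illustration only: ToyBroker stands in for ActiveMQ, and TrashCanService and the topic name garbage.deposited are invented names, not part of the original project.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy broker standing in for ActiveMQ; not a real messaging API.
class ToyBroker {
    private final Map<String, List<Consumer<String>>> handlersByTopic = new HashMap<>();

    void subscribe(String topic, Consumer<String> handler) {
        handlersByTopic.computeIfAbsent(topic, t -> new ArrayList<>()).add(handler);
    }

    void publish(String topic, String payload) {
        List<Consumer<String>> handlers = handlersByTopic.get(topic);
        if (handlers == null) {
            return; // nobody is listening; the publisher neither knows nor cares
        }
        for (Consumer<String> handler : handlers) {
            handler.accept(payload);
        }
    }
}

// Upstream: knows only the broker and a topic name, never the downstream services.
class TrashCanService {
    static final String TOPIC = "garbage.deposited"; // hypothetical topic name
    private final ToyBroker broker;

    TrashCanService(ToyBroker broker) {
        this.broker = broker;
    }

    void deposit(String userId) {
        broker.publish(TOPIC, userId);
    }
}

class DecouplingDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ToyBroker broker = new ToyBroker();
        TrashCanService upstream = new TrashCanService(broker);

        broker.subscribe(TrashCanService.TOPIC, u -> log.add("wechat: notify " + u));
        broker.subscribe(TrashCanService.TOPIC, u -> log.add("scoring: score " + u));
        upstream.deposit("user-1");

        // A new downstream business is added without touching TrashCanService.
        broker.subscribe(TrashCanService.TOPIC, u -> log.add("inspection: review " + u));
        upstream.deposit("user-2");
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The third subscriber is wired in purely on the downstream side, which is the property the RPC version lacks.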

When not to use ActiveMQ

When the caller does care about the result of the execution, use RPC instead of ActiveMQ.

For example, the login function directly invokes the interface of the downstream service through RPC, and the downstream service returns the result.

If the upstream instead sent a message to login.topic via ActiveMQ, subscribed to login.result.topic, and then blocked until the downstream had processed the logic and published a notification to login.result.topic, a simple matter would become needlessly complicated.
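To make the extra machinery concrete, here is a rough sketch that contrasts the two styles using only JDK classes. The two BlockingQueue instances merely stand in for login.topic and login.result.topic; LoginService, its hard-coded credentials, and the two-second timeout are assumptions made up for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class LoginService {
    // Downstream logic, identical in both styles. The credential check is a placeholder.
    static boolean login(String user, String password) {
        return "admin".equals(user) && "secret".equals(password);
    }
}

class LoginStylesDemo {
    // RPC style: one call, and the result comes straight back.
    static boolean loginViaCall(String user, String password) {
        return LoginService.login(user, password);
    }

    // Messaging style: send a request, then block until a reply appears on a second queue.
    static boolean loginViaQueues(String user, String password) {
        BlockingQueue<String[]> loginQueue = new LinkedBlockingQueue<>();      // stands in for login.topic
        BlockingQueue<Boolean> loginResultQueue = new LinkedBlockingQueue<>(); // stands in for login.result.topic

        // The downstream service: waits for a request and publishes the result.
        Thread downstream = new Thread(() -> {
            try {
                String[] request = loginQueue.take();
                loginResultQueue.put(LoginService.login(request[0], request[1]));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        downstream.start();

        try {
            loginQueue.put(new String[] {user, password});
            Boolean result = loginResultQueue.poll(2, TimeUnit.SECONDS); // the upstream blocks here
            downstream.join();
            if (result == null) {
                throw new IllegalStateException("no reply within the timeout");
            }
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the reply", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(loginViaCall("admin", "secret"));
        System.out.println(loginViaQueues("admin", "secret"));
    }
}
```

Both methods return the same answer, but the messaging version needs two destinations, a blocking wait, and timeout handling. A real system would also have to match each reply to its request, which this sketch leaves out.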

Integrating Spring Boot with an external Apache ActiveMQ

The Spring Boot repository on GitHub has a spring-boot-sample-activemq sample that integrates Spring Boot with ActiveMQ, but it only uses the embedded in-memory broker, whereas real projects usually connect to an external ActiveMQ. The in-memory configuration is very simple and can be seen in that sample.

Integrating an external Apache ActiveMQ is also simple; just follow these steps.

1. Install Apache ActiveMQ

Download from the official website. Download the corresponding version based on your operating system and start it.

Mac or Linux: start it with the activemq script in the bin directory of the installation.

Windows: double-click the activemq.bat file to start ActiveMQ.

Log file: /your_activemq_dir/data/activemq.log

2. Add the ActiveMQ dependency

Gradle

// activemq
compile group: 'org.springframework.boot', name: 'spring-boot-starter-activemq'

or

Maven

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-activemq</artifactId>
	<version>${spring-boot-version}</version>
</dependency>

Finally, ensure that dependencies are downloaded.

3. Add the ActiveMQ configuration to application.(yml|properties)

spring:
  activemq:
    # ActiveMQ broker address
    broker-url: tcp://0.0.0.0:61616
    user: admin
    password: admin
    # whether to use the in-memory ActiveMQ broker
    in-memory: false
    send-timeout: 3000
    pool:
      enabled: false
      # reconnect to ActiveMQ when an exception occurs
      reconnect-on-exception: true
  jms:
    # By default only point-to-point mode is enabled.
    # Set this to true to enable publish/subscribe.
    pub-sub-domain: true

# ActiveMQ publish/subscribe topics
topics:
  sample: sample-topic

# ActiveMQ queues
queues:
  sample: sample-queue

# Name of the JMS listener container factory used for publish/subscribe
jms.pub-sub-listener: topicListenerFactory

The Spring Boot documentation lists all the ActiveMQ and JMS configuration properties; configure them according to your actual needs.

4. Configure the coexistence of the two message modes

Unfortunately, with the configuration above the queue and topic modes cannot coexist. spring.jms.pub-sub-domain defaults to false, which means queues only. Setting spring.jms.pub-sub-domain: true switches to topic mode, but it replaces queue mode rather than adding to it. The reason is that, by default, Spring Boot registers each method annotated with @JmsListener with the default container factory, and that factory does not support both modes at once: it simply reads the configured value. Once topic mode is enabled, every destination, including the ones meant to be queues, is treated as a topic (readers can verify this for themselves) unless you define a custom container factory.

Solution: add a dedicated listener container factory for topic mode to handle topic messages.

import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;

import javax.jms.ConnectionFactory;

/**
 * JMS configuration.
 * Created by Blink on 1/14/2018 AD.
 *
 * @author Blink
 */
@Configuration
public class JmsConfiguration {

    /**
     * Creates a separate listener container factory for topics. When using
     * publish/subscribe messaging, the subscriber needs to declare
     * containerFactory = "${jms.pub-sub-listener}".
     *
     * @param connectionFactory the JMS connection factory
     * @param configurer        applies Spring Boot's defaults to the factory
     * @return a listener container factory in publish/subscribe mode
     */
    @Bean(name = "topicListenerFactory")
    public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory,
                                                               DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // This provides all boot's default to this factory, including the message converter
        configurer.configure(factory, connectionFactory);
        // You could still override some of Boot's default if necessary.
        factory.setPubSubDomain(true);
        return factory;
    }

    /**
     * Converts the received JSON data into an object.
     *
     * @return a Jackson-based message converter
     */
    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }
}
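Why a second factory lets both modes coexist can be pictured with a toy model. This is deliberately not Spring's implementation: ToyContainerFactory and ToyListenerRegistry are hypothetical classes that only mimic the idea that each factory carries one pub/sub flag and that a listener uses the default factory unless it names another one.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model, NOT Spring's real classes: a factory only remembers whether it
// creates topic listeners (pubSubDomain = true) or queue listeners (false).
class ToyContainerFactory {
    private final boolean pubSubDomain;

    ToyContainerFactory(boolean pubSubDomain) {
        this.pubSubDomain = pubSubDomain;
    }

    String describe(String destination) {
        return (pubSubDomain ? "topic://" : "queue://") + destination;
    }
}

class ToyListenerRegistry {
    private final Map<String, ToyContainerFactory> namedFactories = new HashMap<>();
    private final ToyContainerFactory defaultFactory;

    // The default factory takes its mode from a single configuration flag.
    ToyListenerRegistry(boolean pubSubDomainProperty) {
        this.defaultFactory = new ToyContainerFactory(pubSubDomainProperty);
    }

    void registerFactory(String name, ToyContainerFactory factory) {
        namedFactories.put(name, factory);
    }

    // An empty factory name means "use the default factory".
    String register(String destination, String factoryName) {
        ToyContainerFactory factory =
                factoryName.isEmpty() ? defaultFactory : namedFactories.get(factoryName);
        if (factory == null) {
            throw new IllegalArgumentException("unknown factory: " + factoryName);
        }
        return factory.describe(destination);
    }

    public static void main(String[] args) {
        // Flag set to true: even the destination meant to be a queue becomes a topic.
        ToyListenerRegistry topicOnly = new ToyListenerRegistry(true);
        System.out.println(topicOnly.register("sample-queue", ""));

        // Flag left at false plus a named topic factory: both modes coexist.
        ToyListenerRegistry both = new ToyListenerRegistry(false);
        both.registerFactory("topicListenerFactory", new ToyContainerFactory(true));
        System.out.println(both.register("sample-queue", ""));
        System.out.println(both.register("sample-topic", "topicListenerFactory"));
    }
}
```

In the model, the listener that names topicListenerFactory gets topic semantics while every other listener keeps the default mode, which is the role the topicListenerFactory bean plays in the configuration class above.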

4. Use the producer/consumer model

Add producer code

import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

/**
 * Created by Blink on 1/10/2018 AD.
 *
 * @author Blink
 */
@Slf4j
@Service
public class Producer implements CommandLineRunner {

    @Value("${queues.sample}")
    public String sampleQueue;

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    public void send(String destination, String message) {
        log.info("============>>>>> Publish queue messages" + message);
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(destination), message);
    }

    @Override
    public void run(String... args) throws Exception {
        this.send(this.sampleQueue, "Test the producer-consumer messaging pattern.");
    }
}

Add consumer code

import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;

/**
 * Created by Blink on 1/10/2018 AD.
 *
 * @author Blink
 */
@Slf4j
@Service
public class Consumer {

    @JmsListener(destination = "${queues.sample}")
    public void receive1(String text) {
        log.info("Consume1 " + text);
    }
    
    @JmsListener(destination = "${queues.sample}")
    public void receive2(String text) {
        log.info("Consume2 " + text);
    }
}

Start the project and check the console output: the message is logged by either Consume1 or Consume2.

This confirms the producer/consumer messaging pattern: when multiple Consumers listen on the same queue, only one of them receives a message sent by the Producer.

You can specify which Consumer receives messages first by setting the priority of the Consumer.

5. Use publish/subscribe

Remember to set spring.jms.pub-sub-domain: true in application.(yml|properties); otherwise ActiveMQ will output warning messages.

Add publisher code

import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

/**
 * Created by Blink on 1/11/2018 AD.
 *
 * @author Blink
 */
@Slf4j
@Service
public class Publisher implements CommandLineRunner {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Value("${topics.sample}")
    public String sampleTopic;

    public void publish(String destination, String message) {
        log.info("============>>>>> Publish topic message: " + message);
        jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(destination), message);
    }

    @Override
    public void run(String... args) throws Exception {
        this.publish(this.sampleTopic, "Test the publish/subscribe messaging pattern.");
    }
}

Add subscriber code

import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;

/**
 * Subscriber.
 * Created by Blink on 1/11/2018 AD.
 *
 * @author Blink
 */
@Slf4j
@Service
public class Subscriber {

    @JmsListener(destination = "${topics.sample}")
    public void subscribe1(String text) {
        log.info("Subscribe1 " + text);
    }

    @JmsListener(destination = "${topics.sample}")
    public void subscribe2(String text) {
        log.info("Subscribe2 " + text);
    }

    @JmsListener(destination = "${topics.sample}")
    public void subscribe3(String text) {
        log.info("Subscribe3 " + text);
    }
}

Start the project and check the console output: Subscribe1, Subscribe2, and Subscribe3 each log the message.

This verifies what was said earlier: every Subscriber listening on the same topic receives the message from the Publisher.

6. Add unit tests

Before adding tests, simplify the earlier producer and consumer code to keep the tests as simple as possible:

// Keep only one consumer
@Slf4j
@Service
public class Consumer {

    @JmsListener(destination = "${queues.sample}")
    public void receive(String text) {
        log.info("Consume " + text);
    }
}

// Remove the CommandLineRunner interface
@Slf4j
@Service
public class Producer {

    @Value("${queues.sample}")
    public String sampleQueue;

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    public void send(String destination, String message) {
        log.info("============>>>>> Publish queue messages" + message);
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(destination), message);
    }
}

Producer/consumer unit tests

import static org.assertj.core.api.Assertions.*;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.rule.OutputCapture;
import org.springframework.context.annotation.Import;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * Producer unit tests.
 * Created by Blink on 1/11/2018 AD.
 *
 * @author Blink
 */
@RunWith(SpringRunner.class)
@SpringBootTest
@Import(Producer.class)
public class ProducerTest {

    @Autowired
    private Producer producer;

    @Rule
    public final OutputCapture capture = new OutputCapture();

    @Value("${queues.sample}")
    public String queue;

    /**
     * A message sent by the producer is expected to be received by the consumer.
     *
     * @throws InterruptedException if the wait is interrupted
     */
    @Test
    public void sendSimpleMessageShouldReceived() throws InterruptedException {
        this.producer.send(this.queue, "Test message");
        Thread.sleep(1000L);
        // The consumer logs "Consume " + text, so that is the string to look for.
        assertThat(this.capture.toString().contains("Consume Test message")).isTrue();
    }
}

Publish/subscribe unit tests

import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.rule.OutputCapture;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;

/**
 * Publisher unit tests.
 * Created by Blink on 1/11/2018 AD.
 *
 * @author Blink
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class PublisherTest {

    @Rule
    public final OutputCapture capture = new OutputCapture();

    @Autowired
    private Publisher publisher;

    @Value("${topics.sample}")
    public String topic;

    /**
     * A message sent by the publisher is expected to be received by all subscribers.
     *
     * @throws InterruptedException if the wait is interrupted
     */
    @Test
    public void sendSimpleMessageShouldReceivedByAllSubscribers() throws InterruptedException {
        this.publisher.publish(this.topic, "Test message");
        Thread.sleep(1000L);
        assertThat(this.capture.toString().contains("Subscribe1 Test message")).isTrue();
        assertThat(this.capture.toString().contains("Subscribe2 Test message")).isTrue();
        assertThat(this.capture.toString().contains("Subscribe3 Test message")).isTrue();
    }
}
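Both tests wait for asynchronous delivery with a fixed Thread.sleep(1000L), which can be slow on a fast machine and flaky on a slow one. A possible alternative, shown here as a JDK-only sketch rather than as part of the original project, is to wait on a CountDownLatch that each listener counts down. LatchAwaitDemo and its worker threads are hypothetical stand-ins for the three subscribers.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LatchAwaitDemo {
    // Returns true if all simulated listeners ran within the timeout.
    static boolean allDelivered(int subscribers, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(subscribers);
        ExecutorService pool = Executors.newFixedThreadPool(subscribers);
        try {
            for (int i = 0; i < subscribers; i++) {
                // Each task plays the role of one asynchronous listener callback.
                pool.execute(latch::countDown);
            }
            // Returns as soon as every listener has run, or false once the timeout expires.
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(allDelivered(3, 2000)); // prints true
    }
}
```

In a real test the listener methods would call countDown() on a latch shared with the test, and the test would assert that await(...) returned true before checking the captured output.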

Results of running unit tests

Conclusion

This article introduced Apache ActiveMQ, a tool for decoupling Internet architectures. It covered the basics of JMS and ActiveMQ, typical usage scenarios, and an example of integrating Spring Boot with Apache ActiveMQ.