MQ

A message queue (MQ) is a first-in, first-out (FIFO) structure used to pass messages between systems.

For example, on an e-commerce platform, the follow-up operations are triggered through the queue after a user pays for an order.
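
The FIFO behaviour and the order-payment example can be sketched with an in-process queue. This is only an illustration that uses java.util.concurrent.BlockingQueue as a stand-in for a real broker; the class name OrderQueueSketch and the event strings are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative in-process stand-in for a message queue; not a real broker.
class OrderQueueSketch {

    // Enqueues the events, lets one consumer thread drain them, and returns
    // the events in the order the consumer handled them.
    static List<String> run(List<String> events) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> handled = new ArrayList<>();

        // The consumer only knows the queue, not the producer (decoupling).
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < events.size(); i++) {
                    handled.add(queue.take()); // blocks until a message arrives
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // The producer continues as soon as a message is enqueued (asynchronous).
        for (String event : events) {
            queue.add(event); // unbounded queue, so add() never blocks
        }

        try {
            consumer.join(); // after join(), the consumer's writes are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the consumer", e);
        }
        return handled;
    }

    public static void main(String[] args) {
        // Messages come out in the order they were put in (FIFO).
        System.out.println(run(List.of("order-1 paid", "order-2 paid", "order-3 paid")));
    }
}
```

With a single consumer the handled order matches the send order; a real MQ adds persistence, networking, and acknowledgements on top of this idea.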

Advantages:

  • Asynchronous processing
  • Peak shaving (smoothing traffic bursts)
  • Decoupling

Disadvantages:

  • Increased system complexity
  • Data consistency is harder to guarantee
  • Availability now depends on the queue

JMS

Java Message Service (JMS) is similar to JDBC. Just as JDBC provides a standard for accessing databases, JMS defines a set of specifications for message communication between systems.

Unlike JDBC, the JMS interfaces are not defined in the JDK's own packages.

  1. ConnectionFactory

  2. Connection

  3. Destination

  4. Session

  5. MessageConsumer

  6. MessageProducer

  7. Message

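These interfaces collaborate as follows: a ConnectionFactory creates a Connection, a Connection creates a Session, and the Session creates the Destination, MessageProducer, MessageConsumer, and Message objects. A MessageProducer sends a Message to a Destination, and a MessageConsumer receives it from there.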

Industry products

Feature | ActiveMQ | RabbitMQ | RocketMQ | Kafka
Single-machine throughput | 10,000 level | 10,000 level | 100,000 level | 100,000 level
Availability | High | High | Very high | Very high
Reliability | Low probability of message loss | Basically no loss | Zero loss is achievable | Zero loss is achievable
Feature support | Fairly complete | Based on Erlang; strong concurrency, good performance, low latency | Distributed, good scalability, supports distributed transactions | Relatively simple; mainly used for real-time big-data computing, log collection, and so on
Community activity | Low | Medium | High | High

ActiveMQ

ActiveMQ is an open source Apache project that fully supports the JMS specification. Spring Boot also ships with auto-configuration for ActiveMQ, which makes it a good choice for getting started.

Quick start

Add the dependency:

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.7.0</version>
</dependency>

Sending a message:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

// 1. Create the connection factory
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 2. Create a connection from the factory
Connection connection = factory.createConnection();
// 3. Start the connection
connection.start();
// 4. Create a session. The first parameter says whether the session is transacted, the second is the acknowledgement mode
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5. Create the queue destination from the session
Destination queue = session.createQueue("test-queue");
// 6. Create a producer from the session for the destination queue
MessageProducer producer = session.createProducer(queue);
// 7. Create a message from the session
Message message = session.createTextMessage("hello world!");
// 8. Send the message through the producer
producer.send(message);
// 9. Close the connection
connection.close();

Spring Boot integration

For the auto-configuration, see: org.springframework.boot.autoconfigure.jms.activemq.ActiveMQConnectionFactoryConfiguration.SimpleConnectionFactoryConfiguration

Add the dependency:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

Add the YAML configuration:

spring:
  activemq:
    broker-url: tcp://localhost:61616
  jms:
    # Messaging mode: true = publish/subscribe (Topic), false = point-to-point (Queue, the default)
    pub-sub-domain: true

Sending and receiving messages:

@Autowired
private JmsTemplate jmsTemplate;

// Receive the message
@JmsListener(destination = "test")
public void receiveMsg(String msg) {
    System.out.println(msg);
}

// Send a message
public void sendMsg(String destination, String msg) {
    jmsTemplate.convertAndSend(destination, msg);
}

High availability

To implement a master-slave architecture based on ZooKeeper, modify the persistenceAdapter node in activemq.xml:

<persistenceAdapter>
    <replicatedLevelDB
        directory="${activemq.data}/levelDB"
        replicas="3"
        bind="tcp://0.0.0.0:0"
        zkAddress="172.17.0.4:2181,172.17.0.4:2182,172.17.0.4:2183"
        zkPath="/activemq/leveldb-stores"
        hostname="localhost"
    />
</persistenceAdapter>

Clients then use the broker address: failover:(tcp://192.168.4.19:61616,tcp://192.168.4.19:61617,tcp://192.168.4.19:61618)?randomize=false

Load balancing

Add a networkConnectors node to the activemq.xml of each node in the high-availability cluster:

<networkConnectors>
    <networkConnector uri="static:(tcp://192.168.0.103:61616,tcp://192.168.0.103:61617,tcp://192.168.0.103:61618)" duplex="false"/>
</networkConnectors>

For more details, see: blog.csdn.net/haoyuyang/a…

Cluster consumption

In the publish-subscribe model every subscriber receives every message, so in a production environment each instance of a consumer cluster processes the same message, which causes duplicate consumption.

ActiveMQ provides virtual topics (VirtualTopic) to keep multiple instances of one consumer from all receiving the same message. A virtual topic is a topic to the producer and a queue to the consumer.

Add a destinationInterceptors node to activemq.xml:

<destinationInterceptors> 
    <virtualDestinationInterceptor> 
        <virtualDestinations> 
            <virtualTopic name="testTopic" prefix="consumer.*." selectorAware="false"/>    
        </virtualDestinations>
    </virtualDestinationInterceptor> 
</destinationInterceptors>

Producers send messages to testTopic as usual, while subscribers change their subscription to a queue such as consumer.a.testTopic.

For more details: blog.csdn.net/java_collec…
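
The "topic to the producer, queue to the consumer" behaviour can be sketched in plain Java. This is a simplified in-memory model, not ActiveMQ code: the class VirtualTopicSketch and its methods are hypothetical, and each consumer group is modelled as one queue that receives a copy of every published message.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// Simplified in-memory model of a virtual topic; not the ActiveMQ implementation.
class VirtualTopicSketch {
    // One queue per consumer group, playing the role of consumer.<group>.testTopic
    private final Map<String, Queue<String>> groupQueues = new LinkedHashMap<>();

    // Registers a consumer group; every group gets its own queue.
    void subscribe(String group) {
        groupQueues.putIfAbsent(group, new ArrayDeque<>());
    }

    // The producer publishes once; each group's queue receives one copy (topic semantics).
    void publish(String message) {
        for (Queue<String> queue : groupQueues.values()) {
            queue.add(message);
        }
    }

    // Instances of the same group compete for messages (queue semantics).
    // Returns null when the group's queue is empty or the group is unknown.
    String consume(String group) {
        Queue<String> queue = groupQueues.get(group);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        VirtualTopicSketch topic = new VirtualTopicSketch();
        topic.subscribe("a");
        topic.subscribe("b");
        topic.publish("m1");
        System.out.println(topic.consume("a")); // first instance of group a gets m1
        System.out.println(topic.consume("a")); // second instance of group a gets nothing
        System.out.println(topic.consume("b")); // group b still gets its own copy
    }
}
```

Each group sees every message once, while the instances inside a group share the work, which is what removes the duplicate consumption inside a consumer cluster.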

RocketMQ

RocketMQ is message middleware built on the queue model. It is distributed and offers high performance, high reliability, and low latency.

Architecture

  1. Name Server

    The name server, similar to a ZooKeeper registry, provides Broker discovery.

  2. Broker

    The core component of RocketMQ. Most of the work is done in the Broker: receiving requests, handling consumption, persisting messages, and so on.

  3. Producer

    The message producer.

  4. Consumer

    The message consumer.

Quick start

After installation, start the NameServer first and then the Broker. You can use mqadmin to manage topics, clusters, and brokers.

See: segmentfault.com/a/119000001…

Add the dependency:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.2</version>
</dependency>

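Sending a message:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

DefaultMQProducer producer = new DefaultMQProducer("producer-group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setInstanceName("producer");
producer.start();
// Arguments: topic, tag, message body
Message msg = new Message(
    "producer-topic", "msg", "hello world".getBytes()
);
// Uncomment to deliver the message with delay level 1
//msg.setDelayTimeLevel(1);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult.toString());
producer.shutdown();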

Delay levels start from 1. The default levels are: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h.

See org.apache.rocketmq.store.schedule.ScheduleMessageService#parseDelayLevel.
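
To make the level-to-delay mapping concrete, here is a small sketch that turns a level string of this form into a table of delays in milliseconds. It is not the RocketMQ implementation: the class DelayLevelSketch and its parse method are hypothetical, and only the s, m, and h units that appear in the default string are handled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that mirrors the idea of parsing delay levels; not RocketMQ code.
class DelayLevelSketch {
    static final String DEFAULT_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Maps level number (starting at 1) to the delay in milliseconds.
    static Map<Integer, Long> parse(String levels) {
        Map<Integer, Long> table = new LinkedHashMap<>();
        String[] parts = levels.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            long value = Long.parseLong(part.substring(0, part.length() - 1));
            long unitMillis;
            switch (part.charAt(part.length() - 1)) {
                case 's': unitMillis = 1_000L; break;
                case 'm': unitMillis = 60_000L; break;
                case 'h': unitMillis = 3_600_000L; break;
                default: throw new IllegalArgumentException("Unknown unit in " + part);
            }
            table.put(i + 1, value * unitMillis); // levels are 1-based
        }
        return table;
    }

    public static void main(String[] args) {
        Map<Integer, Long> table = parse(DEFAULT_LEVELS);
        System.out.println("level 1 = " + table.get(1) + " ms");
        System.out.println("level 18 = " + table.get(18) + " ms");
    }
}
```

Under this mapping, setDelayTimeLevel(1) corresponds to a one-second delay and level 18 to two hours.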

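Receiving messages:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setInstanceName("consumer");
// Subscribe to the topic, filtered by tag
consumer.subscribe("producer-topic", "msg");
consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
    for (MessageExt msg : list) {
        System.out.println(new String(msg.getBody()));
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();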

A test message can also be sent from the command line with mqadmin:

.\mqadmin.cmd sendMessage -t producer-topic -c msg -p "hello rocketmq" -n localhost:9876

Spring Boot integration

Add the dependency:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.4</version>
</dependency>

Add the YAML configuration:

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: producer

Sending a message:

@Autowired
private RocketMQTemplate mqTemplate;

public void sendMessage(String topic, String tag, String message) {
    SendResult result = mqTemplate.syncSend(topic + ":" + tag, message);
    System.out.println(JSON.toJSONString(result));
}

Receiving messages:

@Component
@RocketMQMessageListener(consumerGroup = "consumer", topic = "topic-test", selectorExpression = "tag-test")
public class MsgListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println(message);
    }
}

Console

The RocketMQ extension project provides an administration console:

github.com/apache/rock…

Duplicate consumption

Causes:

  1. The producer redelivers a message;
  2. The message queue itself fails;
  3. The consumer fails while consuming;

Solving duplicate consumption means, in other words, making message consumption idempotent.

Typically, the implementation is based on a local message table that records which messages have been processed: a message already in the table is skipped, and one that is not is processed and then recorded.
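
A minimal sketch of this idea, assuming every message carries a unique ID. The class IdempotentConsumerSketch is hypothetical, and an in-memory set stands in for the local message table; in a real system the table would normally live in the database, with the ID insert and the business update done in one transaction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical idempotent consumer; an in-memory set stands in for the local message table.
class IdempotentConsumerSketch {
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final Consumer<String> handler;

    IdempotentConsumerSketch(Consumer<String> handler) {
        this.handler = handler;
    }

    // Returns true if the message was processed, false if it was a duplicate.
    boolean onMessage(String messageId, String body) {
        // add() returns false when the ID is already recorded
        if (!processedIds.add(messageId)) {
            return false;
        }
        try {
            handler.accept(body);
            return true;
        } catch (RuntimeException e) {
            processedIds.remove(messageId); // forget the ID so a redelivery can retry
            throw e;
        }
    }

    public static void main(String[] args) {
        List<String> handled = new ArrayList<>();
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch(handled::add);
        System.out.println(consumer.onMessage("msg-1", "pay order 1")); // first delivery
        System.out.println(consumer.onMessage("msg-1", "pay order 1")); // redelivery is skipped
        System.out.println(handled.size());
    }
}
```

Recording the ID before running the handler also blocks a concurrent duplicate; the ID is removed again only if the handler fails, so the message can be retried.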

Ordered messages

Causes of out-of-order messages:

  1. One message queue is consumed by multiple consumers;
  2. A queue has a single consumer, but that consumer is multithreaded;

To guarantee that messages are consumed in order, three things must hold:

  1. Messages are sent in order
  2. Messages are stored in order
  3. Messages are consumed in order

See MessageQueueSelector and MessageListenerOrderly in RocketMQ.
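
The sending and storage side of these points can be illustrated without the RocketMQ client. In the sketch below, which is a plain Java simulation with hypothetical names (OrderedRoutingSketch, selectQueue, route), every message of one order is routed to the same queue by hashing the order ID; this is the kind of decision a MessageQueueSelector makes. Consuming each queue with a single thread then keeps the per-order sequence.

```java
import java.util.ArrayList;
import java.util.List;

// Plain Java simulation of key-based queue selection; not the RocketMQ API.
class OrderedRoutingSketch {

    // Equal order IDs always map to the same queue index.
    static int selectQueue(String orderId, int queueCount) {
        return Math.floorMod(orderId.hashCode(), queueCount);
    }

    // Each message is {orderId, step}; returns the content of every queue in arrival order.
    static List<List<String>> route(String[][] messages, int queueCount) {
        List<List<String>> queues = new ArrayList<>();
        for (int i = 0; i < queueCount; i++) {
            queues.add(new ArrayList<>());
        }
        for (String[] message : messages) {
            String orderId = message[0];
            String step = message[1];
            queues.get(selectQueue(orderId, queueCount)).add(orderId + ":" + step);
        }
        return queues;
    }

    public static void main(String[] args) {
        String[][] messages = {
            {"A", "create"}, {"B", "create"}, {"A", "pay"}, {"B", "pay"}, {"A", "ship"}
        };
        List<List<String>> queues = route(messages, 4);
        for (int i = 0; i < queues.size(); i++) {
            System.out.println("queue " + i + " -> " + queues.get(i));
        }
    }
}
```

Ordering is only guaranteed per key: messages of different orders may interleave, which is usually acceptable and keeps the queues usable in parallel.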

Distributed transaction

In a distributed system, a transaction consists of multiple local transactions. One solution for such distributed transactions is based on MQ.

The broker's HA setup keeps it highly available, and the broker periodically checks the status of prepare messages to guarantee eventual consistency.
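
The periodic status check can be sketched as a small in-memory simulation. This is not RocketMQ code: the class TransactionalMessageSketch, the TxState enum, and the lookup function are assumptions made for illustration. A prepare message stays invisible to consumers until the producer confirms it, and a check-back pass resolves the prepare messages whose confirmation never arrived.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// In-memory simulation of prepare messages with a check-back pass; not RocketMQ code.
class TransactionalMessageSketch {
    enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    // Prepare messages by transaction ID; consumers cannot see these yet.
    private final Map<String, String> prepared = new LinkedHashMap<>();
    // Messages that have been committed and may be delivered to consumers.
    private final List<String> deliverable = new ArrayList<>();

    void prepare(String txId, String body) {
        prepared.put(txId, body);
    }

    // Called with the outcome of the producer's local transaction.
    void end(String txId, TxState state) {
        if (state == TxState.UNKNOWN) {
            return; // keep the prepare message; a later check-back resolves it
        }
        String body = prepared.remove(txId);
        if (body != null && state == TxState.COMMIT) {
            deliverable.add(body);
        }
    }

    // Check-back pass: asks the producer side for the state of every unresolved transaction.
    void checkBack(Function<String, TxState> localTxLookup) {
        for (String txId : new ArrayList<>(prepared.keySet())) {
            end(txId, localTxLookup.apply(txId));
        }
    }

    List<String> deliverable() {
        return deliverable;
    }

    int pending() {
        return prepared.size();
    }

    public static void main(String[] args) {
        TransactionalMessageSketch broker = new TransactionalMessageSketch();
        broker.prepare("tx1", "order 1 paid");
        broker.prepare("tx2", "order 2 paid");
        broker.end("tx1", TxState.COMMIT); // confirmation arrives normally
        // The confirmation for tx2 was lost, so the check-back pass looks it up.
        broker.checkBack(txId -> TxState.COMMIT);
        System.out.println(broker.deliverable());
    }
}
```

In a real deployment the check-back runs on a schedule, and the lookup reads the producer's local transaction record, so the message and the local transaction eventually agree.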