The RocketMQ concept and the Java API will be introduced in RocketMQ. The RocketMQ concept and the Java API will be introduced in RocketMQ.

RocketMQ architecture

This is the RocketMQ website architecture diagram, rocketmq.apache.org/docs/rmq-ar…

The typical architecture diagram looks like this, and these important roles need to be explained.

Broker

RocketMQ’s service, or process, is called the Broker, which stores and forwards messages. RocketMQ single machines can handle around 100,000 QPS of requests and are often clustered to improve Broker performance (load balancing) and availability (preventing single points of failure).

Like a Kafka or Redis Cluster, each Broker node of a RocketMQ Cluster holds a portion of the total data, because horizontal scaling is available. To improve reliability (to prevent data loss), each Broker can have its own copy (Slave).

Topic

Topic is used to group messages by Topic, such as order messages, payment messages, and people messages. Note that in RocketMQ, unlike Kafka, Topic is a logical concept and messages are not stored by Topic.

Producers send messages to specific topics, and consumers subscribe to the Topic and receive the messages. A Topic has a many-to-many relationship with both producers and consumers. A producer can send messages to multiple topics, and a consumer can subscribe to multiple topics.

NameServer

In the early version of RocketMQ (2.x), there was no NamesRV component, and ZooKeeper was used for distributed coordination and service discovery. However, ali Data improved and optimized its own lightweight NamesRV based on actual business requirements. Namesrv is used to register the request routing work of Client service and Broker. Namesrv does not store any message location. Frequent operation of ZooKeeper location storage data affects the overall cluster performance.

To ensure high availability, NameServer itself can be deployed in clusters without any synchronization between nodes.

Producer

Producers with the same Producer Group form a cluster, establish a long connection with one node (randomly selected) in the Name Server cluster, and get Topic routing information from the Name Server regularly. Establish a long connection to the Master providing Topic services, and periodically send heartbeat to the Master. Producer is stateless and can be deployed in a cluster.

RocketMQ producers support bulk delivery

Consumer

Consumers are instances of receiving messages for consumption. Consumers with the same Consumer Group form a cluster, establish a long connection with one of the nodes in the Name Server cluster (randomly selected), and regularly fetch Topic routing information from the Name Server. A long connection is established for the Master and Slave that provide Topic services, and the heartbeat is periodically sent to the Master and Slave. Consumers can subscribe to messages from either Master or Slave, and the subscription rules are determined by the Broker configuration.

Consumers have two types of consumption: cluster consumption (message polling) and broadcast consumption (all receive the same copy).

From the perspective of consumption model, one is pull, the other is push, passive reception. But actually RocketMQ is all pull mode, but push encapsulates the pull mode. PushConsumer registers a MessageListener listener, and when it gets the message, Wake up a MessageListener’s ConsumeMessage() to consume it, and for the user it feels like the message is being pushed. RocketMQ is based on long rounds to pull messages.

Message Queue

You know that messages sent to a Topic are distributed across different brokers. In Kafka, a partition is designed so that a Topic can be split into multiple partitions. These partitions can be distributed across different brokers, which allows data to be sharded and allows Kafka to scale horizontally.

There is only one stored file in RocketMQ, and it is not stored separately by Topic like Kafka. So it creates the logical concept of a Message Queue, similar to a partition.

First, when we create a Topic, we specify the number of queues, one called writeQueueNums (number of write queues) and one called readQueueNums (number of read queues). The number of write queues determines how many Message queues there are. The number of read queues determines how many threads consume these Message queues (just for load). Perm stands for queue permission, 2 for W, 4 for R, and 6 for RW.

This is what we specify when we create a topic. What if we create a topic automatically by code with the default number of Message queues?

 // The server creates a Topic with 8 queues by default, in the BrokerConfig class
 private int defaultTopicQueueNums = 8;
 // Topic does not exist. The default four queues are created when producers send messages, in the DefaultMQProducer class
 private volatile int defaultTopicQueueNums = 4;
 // Finally the server is created with a judgment, take a smaller value, in the MQClientInstance class
 int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
Copy the code

The final result should be 4, let’s find a topic created by code, it is 4.

The number of write queues and the number of read queues must be equalIf they are not equal in clustered mode, say writeQueueNums=6 and readQueueNums=3, then there will be 3 queues of messages per broker that cannot be consumed.

If the number of consumers is larger than readQueueNumbs, then some consumers will not consume the message, wasting resources.

Java API

Now to start using the API tutorial, the official website provides the Java client API, just need to introduce the POM dependencies.

< the dependency > < groupId > org. Apache. Rocketmq < / groupId > < artifactId > rocketmq - client < / artifactId > < version > 4.7.1 < / version > </dependency>Copy the code

Write a Producer class.

/ * * *@author jackxu
 */
public class Producer {

    public static void main(String[] args) throws MQClientException {
        // Producer group
        DefaultMQProducer producer = new DefaultMQProducer("jackxu_producer_group");
        // Producers need to obtain routing information for all brokers via NameServer, separated by semicolons, like Redis sentinels
        producer.setNamesrvAddr("39.103.144.86:9876; 42.192.77.73:9876");
        / / start
        producer.start();

        for (int i = 0; i < 10; i++) {
            try {
                /*Message(String topic, String tags, String keys, byte[] body) Message represents a Message. The first argument is topic, which is the topic. The second argument is tags, which is optional. The third parameter used to filter messages on the consumer side is keys, which is also optional. If there are more than one, separate them with Spaces. RocketMQ can quickly retrieve messages based on these keys, which act as an index of the message and can be set to a unique message number (primary key). * /
                Message msg = new Message("jackxu_test_topic"."TagA"."6666", ("RocketMQ Test message " + i).getBytes());
                //SendResult is the encapsulation of the sent result, including the message status, message ID, selected queue, etc., as long as no exception is thrown, it means that the sent successfully
                SendResult sendResult = producer.send(msg);
                System.out.println("The first" + i + "Send result:" + sendResult);
            } catch(Exception e) { e.printStackTrace(); } } producer.shutdown(); }}Copy the code

In SendResult, there is a SendStatus state that indicates the sending status of the message. There are four states.

  1. FLUSH_DISK_TIMEOUT: indicates that flushing is not completed within the specified time. (This error is reported only when SYNC_FLUSH is configured on the Broker’s flush policy.)
  2. FLUSH_SLAVE_TIMEOUT: indicates that in master/master mode, the Broker is set to SYNC_MASTER mode and the synchronization between master and slave is not completed within the specified time.
  3. SLAVE_NOT_AVAILABLE: This state is similar to FLUSH_SLAVE_TIMEOUT. In flush_slave mode, the Broker is set to SYNC_MASTER, but no Broker configured as Slave is found.
  4. SEND_OK: indicates that the message is sent successfully

Write another SimpleConsumer class, SimpleConsumer:

/ * * *@author jackxu
 */
public class SimpleConsumer {

    public static void main(String[] args) throws MQClientException {
        // Consumer group
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("jackxu_consumer_group");
        // The consumer gets the Broker address of the topic queue from NameServer, separated by a semicolon
        consumer.setNamesrvAddr("39.103.144.86:9876; 42.192.77.73:9876");
        // Set Consumer to start consumption from the head of the queue
        // If it is not the first time, continue to consume at the same place as the last consumption
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Subscribe the first argument is topic, the second argument is the tags that the producer sent, * means match all messages,
        / / want to receive the specific news | | is separated, such as "TagA | | TagB | | TagD." "
        consumer.subscribe("jackxu_test_topic"."*");
        //Consumer can be started in two modes: Broadcast and Cluster. In Broadcast mode, a message is sent to all consumers.
        // In clustered mode, messages are sent to only one Consumer
        consumer.setMessageModel(MessageModel.BROADCASTING);
        // Batch consumption, 10 at a time
        consumer.setConsumeMessageBatchMaxSize(10);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                // MSGS is a List, and usually Consumer starts first, so it's one data at a time
                / / if the Producer to start the first start after the Consumer end, accumulate data, setConsumeMessageBatchMaxSize will take effect right now,
                // MSGS data is ten
                StringBuilder sb = new StringBuilder();
                sb.append("MSGS number:" + msgs.size());
                MessageExt messageExt = msgs.get(0);
                // The message was resent three times
                if (messageExt.getReconsumeTimes() == 3) {
                    // Todo persistent message record table
                    // Try again for three times
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }

                for (MessageExt msg : msgs) {
                    try {
                        String topic = msg.getTopic();
                        String messageBody = new String(msg.getBody(), "utf-8");
                        String tags = msg.getTags();
                        // Todo business logic processing
                        sb.append("topic:" + topic + ",tags:" + tags + ",msg:" + messageBody);
                    } catch (Exception e) {
                        e.printStackTrace();
                        // Re-consume
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
                System.out.println(sb.toString());
                // This tells the broker that the purchase has been successful and the offset is ready to be updated.
                returnConsumeConcurrentlyStatus.CONSUME_SUCCESS; }}); consumer.start(); }}Copy the code

The functions of each line of code are written in the comments, partners to watch carefully oh. Now to test it, start the consumer, then start the producer.

These are the ten pieces of data that the producer sent when it started.

MsgId: unique producer-generated number, globally unique, also called uniqId.

OffsetMsgId: Message offset ID. This ID records the physical address of the cluster where the message is stored, including the address of the Broker server (IP and port number) and the physical offset of the Commitlog file.

The message can be queried on the console via msgId.You can also find it by KeyOn the consumer side, ten data points have been successfully consumed

Rocketmq \ Example: RocketMQ \ Example: rocketMQ \ Example Address: github.com/xuhaoj/rock…

The packages in Rocketmq \example do the following:

package role
batch Batch messages, sent with a List
broadcast Broadcast messages, setMessageModel (MessageModel. -)
delay Delayed message, msg.setdelayTimelevel (3)
filter Filter based on tag or SQL expression
ordermessage The order message
quickstart An introduction to
rpc Implementing RPC calls
simple ACL, asynchronous, Assign, subscribe
tracemessage Message tracking
transaction Transaction message

Spring integration of the Boot

The Srping Boot provides a simpler configuration and operation mode, and is very comfortable, clean, and simple to use. First, the RocketMQ starter dependency is introduced.

<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> The < version > 2.0.4 < / version > < / dependency >Copy the code

The client configuration is then written directly to application.properties

server.port=9096
spring.application.name=springboot-rocketmq-demo
rocketmq.name-server=39.103.144.86:9876; 42.192.77.73:9876
rocketmq.producer.group=jackxu-springboot-rocketmq-group
rocketmq.producer.send-message-timeout=3000
Copy the code

Create a Consumer class with the @RocketmqMessagelistener annotation to listen for messages

/ * * *@author jackxu
 */
@Component
@RocketMQMessageListener(topic = "springboot-topic", consumerGroup = "springboot-consumer-group", selectorExpression = "tag1", selectorType = SelectorType.TAG, messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY)
public class Consumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        try {
            System.out.println("Rocketmq message received :" + message);
        } catch(Exception e) { e.printStackTrace(); }}}Copy the code

The configuration inside the annotations is the same as in the Java API, and I believe you can understand it.

MessageModel has two options, BROADCASTING, which represents all consumers consuming the same message, and CLUSTERING, which represents multiple consumers polling the consumption message (the default).

ConsumeMode also has two options, CONCURRENTLY consumption at the consumer (the default). Message order is not guaranteed. How many threads CONCURRENTLY consume depends on the size of the thread pool.

Sequential consumption requires locking the queue to be processed, ensuring that only one consuming thread is allowed to process on the same queue at a time. It is obvious that concurrent consumption is more efficient.

Create a producer class MessageSender. The producer code is much simpler and just needs to inject RocketMQTemplate to send messages.

/ * * *@author jackxu
 */
@Component
public class MessageSender {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void syncSend(a) {
        /** * Send a reliable synchronous message, which can retrieve the SendResult return data * Synchronous send means that after a message has been sent, the next packet will be sent only after receiving a response from MQ. * Argument 1: topic:tag * Argument 2: message body can be an object * Argument 3: timeout milliseconds */
        SendResult result = rocketMQTemplate.syncSend("springboot-topic:tag"."This is a synchronous message.".10000);
        System.out.println(result);
    }


    public void asyncSend(a) throws Exception {
        /** * Send a reliable asynchronous message * After sending a message, the next packet is sent without waiting for mq to respond. The sender can set the callback interface to receive the response from the server and process the response result. Parameter 1: topic:tag * Parameter 2: message body can be an object * Parameter 3: callback object */
        rocketMQTemplate.asyncSend("springboot-topic:tag1"."This is an asynchronous message".new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("Callback sendResult." + sendResult);
            }

            @Override
            public void onException(Throwable e) { System.out.println(e.getMessage()); }}); TimeUnit.SECONDS.sleep(100000);
    }


    public void sendOneWay(a) {
        /** * Sends a one-way message, which is only responsible for sending messages without waiting for a response from the server and without triggering a callback function. * The process of sending messages in this way is very short, usually in the microsecond level. Application scenario: This mode is applicable to scenarios that require very short time consuming but have low reliability requirements, such as log collection. * Argument 1: topic:tag * Argument 2: the message body can be an object */
        rocketMQTemplate.sendOneWay("springboot-topic:tag1"."This is a one-way message.");
    }


    public void sendOneWayOrderly(a) {
        Parameter 1: topic:tag parameter 2: The message body can be an object */
        rocketMQTemplate.sendOneWayOrderly("springboot-topic:tag1"."This is a sequential message."."8888"); }}Copy the code

There are three types in total, and their use and use are described in the comments. The options are as follows:

  • When the sent message is not important, the one-way mode is adopted to improve throughput.The most efficient
  • Sync is used when the sent message is important and insensitive to response time
  • Async is used when the message being sent is important and sensitive to response time

Write a test class to test

/ * * *@author jackxu
 */
@SpringBootTest
class SpringbootRocketmqApplicationTests {

    @Autowired
    private MessageSender sender;

    @Test
    public void syncSendTest(a) {
        sender.syncSend();
    }


    @Test
    public void asyncSendTest(a) throws Exception {
        sender.asyncSend();
    }


    @Test
    public void sendOneWayTest(a) {
        sender.sendOneWay();
    }


    @Test
    public void sendOneWayOrderlyTest(a) { sender.sendOneWayOrderly(); }}Copy the code

Send sendOneWayTest asynchronously. Execute sendOneWayTest and check the result.

Check the console. There’s this message, tooThe consuming end also consumes successfully, the test completes.

The source code has been uploaded to github.com/xuhaoj/spri… , interested partners can download down to watch.

conclusion

Finally recommended an ebook RocketMQ actual combat and principle of analytical as extracurricular reading, download links in: pan.baidu.com/s/1Ah1Gm3CX… Extraction code: Jack, original is not easy, think write good please click a like.