Series of articles (under update)

  • Source code analysis series
    • SpringBoot series – FatJar technology analysis
    • SpringBoot series – Boot process parsing
    • SpringBoot series – Event mechanism parsing
    • SpringBoot series – Life cycle and extension of beans
    • SpringBoot series – Logging framework parsing
    • SpringBoot series – Resource access parsing
    • SpringBoot series – Embedded Web container resolution
    • SpringBoot series – Configuration parsing
    • SpringBoot series – Automatic configuration and starter mechanism analysis
  • Practice series
    • SpringBoot Practices – Integration with RocketMQ
    • SpringBoot series – Kafka introduction & integrated SpringBoot
    • SpringBoot Practices – Externalizing configuration priorities

RocketMQ quick start

RocketMQ overview: Apache RocketMQ is a distributed messaging and streaming platform with low latency, high performance and reliability, trillion-level capacity, and flexible scalability. For the full list of features, see github.com/apache/rock… .

According to the official quick start guide, installing RocketMQ requires the following:

  • 64-bit OS: Linux, Unix, or Mac is recommended
  • 64-bit JDK 1.8+
  • Maven 3.2.x
  • 4 GB+ of free disk space for the Broker server

Download, install and compile

> wget https://archive.apache.org/dist/rocketmq/4.7.0/rocketmq-all-4.7.0-source-release.zip
> unzip rocketmq-all-4.7.0-source-release.zip
> cd rocketmq-all-4.7.0/
> mvn -Prelease-all -DskipTests clean install -U
> cd distribution/target/rocketmq-4.7.0/rocketmq-4.7.0

Start the Name Server

> nohup sh bin/mqnamesrv &
> tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

Start the Broker

> nohup sh bin/mqbroker -n localhost:9876 &
# nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &
> tail -f ~/logs/rocketmqlogs/broker.log
The broker[%s, 172.30.30.233:10911] boot success...

autoCreateTopicEnable: a topic must be specified when sending a message with RocketMQ, and the Broker has a switch, autoCreateTopicEnable, that controls whether missing topics are created automatically. The default, autoCreateTopicEnable=true, is convenient in development and test environments. However, it makes topics hard to manage and leaves no uniform review process, so in a production environment the Broker is started with autoCreateTopicEnable=false. When a topic needs to be added, it is created through the web management console or the admin tools.

SpringBoot integration

This article does not use a SpringBoot starter for RocketMQ; instead, access is programmed directly against the client. Here is how SpringBoot can be integrated with RocketMQ that way.

Introduce RocketMQ client dependencies

At the time of writing, the latest release on GitHub is 4.7.0, so that is the version used here:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.7.0</version>
</dependency>

Provide a configuration class for the producer

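import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author: guolei.sgl ([email protected]) 2020/4/5 5:17 PM
 * @since:
 **/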
@Configuration
public class MQProducerConfiguration {

    public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfiguration.class);

    @Value("${rocketmq.producer.groupName}")
    private String             groupName;

    @Value("${rocketmq.producer.namesrvAddr}")
    private String             namesrvAddr;

    @Value("${rocketmq.producer.maxMessageSize}")
    private Integer            maxMessageSize;

    @Value("${rocketmq.producer.sendMsgTimeout}")
    private Integer            sendMsgTimeout;

    @Value("${rocketmq.producer.retryTimesWhenSendFailed}")
    private Integer            retryTimesWhenSendFailed;

    @Bean
    @ConditionalOnMissingBean
    public DefaultMQProducer defaultMQProducer() throws RuntimeException {
        DefaultMQProducer producer = new DefaultMQProducer(this.groupName);
        producer.setNamesrvAddr(this.namesrvAddr);
        producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
        // If different producers in the same JVM need to send messages to different MQ clusters, different instanceName needs to be set
        //producer.setInstanceName(instanceName);
        // Maximum size of a message that can be sent
        producer.setMaxMessageSize(this.maxMessageSize);
        // Send message timeout
        producer.setSendMsgTimeout(this.sendMsgTimeout);
        // If the message fails to be sent, set the number of retries. The default value is 2
        producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
        try {
            producer.start();
            LOGGER.info("producer is started. groupName:{}, namesrvAddr: {}", groupName, namesrvAddr);
        } catch (MQClientException e) {
            LOGGER.error("failed to start producer.", e);
            throw new RuntimeException(e);
        }
        return producer;
    }
}

  • groupName: producers that send the same type of message should share one group, and the name should be unique. Setting it is not mandatory; by default RocketMQ uses ip@pid (pid stands for the JVM name) as the unique identifier.
  • namesrvAddr: address of the Name Server
  • maxMessageSize: maximum message size, 4 MB by default
  • sendMsgTimeout: timeout for sending a message, 3 seconds by default
  • retryTimesWhenSendFailed: number of retries when sending fails, 2 by default
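
Since maxMessageSize limits the encoded message body, it can help to check a payload against the limit before sending. The following is a minimal sketch, assuming the limit is compared with the body length in bytes and that bodies are UTF-8 encoded; MessageSizeCheck and fitsWithin are hypothetical names used only for illustration and are not part of the RocketMQ client.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for illustration; not part of the RocketMQ client. */
public class MessageSizeCheck {

    /** The 4 MB default mentioned above, expressed in bytes. */
    public static final int DEFAULT_MAX_MESSAGE_SIZE = 4 * 1024 * 1024;

    /** Returns true when the UTF-8 encoded body is no larger than the byte limit. */
    public static boolean fitsWithin(String body, int maxMessageSizeBytes) {
        return body.getBytes(StandardCharsets.UTF_8).length <= maxMessageSizeBytes;
    }

    public static void main(String[] args) {
        String body = "Say Hello RocketMQ to Glmapper";
        // Compare the same body against the default limit and a deliberately tiny one
        System.out.println(fitsWithin(body, DEFAULT_MAX_MESSAGE_SIZE));
        System.out.println(fitsWithin(body, 16));
    }
}
```

Under that assumption, a configured value such as 4096 would mean 4 KB rather than 4 MB, which is worth keeping in mind when choosing the property value.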

Provide a configuration class for the consumer

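import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration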
public class MQConsumerConfiguration {
    public static final Logger  LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class);
    @Value("${rocketmq.consumer.namesrvAddr}")
    private String                        namesrvAddr;
    @Value("${rocketmq.consumer.groupName}")
    private String                        groupName;
    @Value("${rocketmq.consumer.consumeThreadMin}")
    private int                           consumeThreadMin;
    @Value("${rocketmq.consumer.consumeThreadMax}")
    private int                           consumeThreadMax;
    // Subscribe to the topic specified
    @Value("${rocketmq.consumer.topics}")
    private String                        topics;
    @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
    private int                           consumeMessageBatchMaxSize;

    @Autowired
    private MessageListenerHandler mqMessageListenerProcessor;

    @Bean
    @ConditionalOnMissingBean
    public DefaultMQPushConsumer defaultMQPushConsumer() throws RuntimeException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeThreadMin(consumeThreadMin);
        consumer.setConsumeThreadMax(consumeThreadMax);
        consumer.registerMessageListener(mqMessageListenerProcessor);

        // Set whether consumer starts to consume at the head of the queue or at the end of the queue
        // If it is not the first time, continue to consume at the same place as the last consumption
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Set the consumption model, cluster or broadcast, cluster by default
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // Set the maximum number of messages consumed per batch. The default value is 1
        consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);

        try {
            // Set the topic and tags the consumer subscribes to. To subscribe to all tags under the topic, use *
            consumer.subscribe(topics, "*");
            // Start consumption
            consumer.start();
            LOGGER.info("consumer is started. groupName:{}, topics:{}, namesrvAddr:{}",groupName,topics,namesrvAddr);

        } catch (Exception e) {
            LOGGER.error("failed to start consumer . groupName:{}, topics:{}, namesrvAddr:{}",groupName,topics,namesrvAddr,e);
            throw new RuntimeException(e);
        }
        return consumer;
    }
}

Refer to the producer section above for the parameters. The configuration here only registers a listener and starts the consumer; the actual consumption logic has to be supplied by implementing the MessageListenerConcurrently interface.

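import java.util.List;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
// Assumption: the CollectionUtils used below is Spring's
import org.springframework.util.CollectionUtils;

/**
 * @author: guolei.sgl ([email protected]) 2020/4/5 5:21 PM
 * @since:
 **/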
@Component
public class MessageListenerHandler implements MessageListenerConcurrently {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageListenerHandler.class);
    private static String TOPIC = "DemoTopic";

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                    ConsumeConcurrentlyContext context) {
        if (CollectionUtils.isEmpty(msgs)) {
            LOGGER.info("receive blank msgs...");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        MessageExt messageExt = msgs.get(0);
        String msg = new String(messageExt.getBody());
        if (messageExt.getTopic().equals(TOPIC)) {
            // Mock consumption logic
            mockConsume(msg);
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    private void mockConsume(String msg){
        LOGGER.info("receive msg: {}.", msg);
    }
}
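
One detail in the listener is worth a note: new String(messageExt.getBody()) decodes with the JVM's default charset, while the sender shown later encodes the body with RemotingHelper.DEFAULT_CHARSET. A minimal sketch of decoding with an explicit charset follows, assuming the sender's charset is UTF-8; BodyCodec is a hypothetical helper written for this illustration and is not part of RocketMQ.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for illustration; not part of the RocketMQ client. */
public class BodyCodec {

    /** Encodes text the way the sender is assumed to (UTF-8). */
    public static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a message body with an explicit charset instead of the platform default. */
    public static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = encode("Say Hello RocketMQ to Glmapper");
        // Round trip: the decoded text equals the original regardless of the JVM default charset
        System.out.println(decode(body));
    }
}
```

In the listener this would correspond to passing the charset as the second argument of the String constructor, so that both sides agree even when the consumer runs on a machine with a different default encoding.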

Use the client to send messages

Sending a message with the client is straightforward: obtain the DefaultMQProducer object and call send, which supports synchronous, asynchronous, and one-way calls.

@RestController
public class TestController {

    private static final Logger LOGGER = LoggerFactory.getLogger(TestController.class);

    private static String TOPIC = "DemoTopic";
    private static String TAGS = "glmapperTags";

    @Autowired
    private DefaultMQProducer defaultMQProducer;

    @RequestMapping("send")
    public String test() throws Throwable {
        Message msg = new Message(TOPIC, TAGS, ("Say Hello RocketMQ to Glmapper").getBytes(RemotingHelper.DEFAULT_CHARSET));
        // Call the client to send the message
        SendResult sendResult = defaultMQProducer.send(msg);
        LOGGER.info("sendResult: {}.",sendResult);
        return "SUCCESS";
    }
}

Test

The test application here hosts both the producer and the consumer, so the configuration is as follows:

spring.application.name=test-rocket
server.port=8008

# producer
# whether this application enables the producer
rocketmq.producer.isOnOff=on
rocketmq.producer.groupName=${spring.application.name}
rocketmq.producer.namesrvAddr=sofa.cloud.alipay.net:9876
rocketmq.producer.maxMessageSize=4096
rocketmq.producer.sendMsgTimeout=3000
rocketmq.producer.retryTimesWhenSendFailed=2

# consumer
# whether this application enables the consumer
rocketmq.consumer.isOnOff=on
rocketmq.consumer.groupName=${spring.application.name}
rocketmq.consumer.namesrvAddr=sofa.cloud.alipay.net:9876
rocketmq.consumer.topics=DemoTopic
rocketmq.consumer.consumeThreadMin=20
rocketmq.consumer.consumeThreadMax=64
rocketmq.consumer.consumeMessageBatchMaxSize=1

Start the program and view the log output:

2020-04-05 22:53:15.141  INFO 46817 --- [main] c.g.b.b.c.MQProducerConfiguration : producer is started. groupName:test-rocket, namesrvAddr: sofa.cloud.alipay.net:9876
2020-04-05 22:53:15.577  INFO 46817 --- [main] c.g.b.b.c.MQConsumerConfiguration : consumer is started. groupName:test-rocket, topics:DemoTopic, namesrvAddr:sofa.cloud.alipay.net:9876

As you can see, the producer and consumer configurations have taken effect and both have started. Now send a message with curl localhost:8008/send:

2020-04-05 22:54:21.654  INFO 46817 --- [nio-8008-exec-1] c.g.b.boot.controller.TestController : sendResult: SendResult [sendStatus=SEND_OK, msgId=1E0FC3A2B6E118B4AAC21983B3C50000, offsetMsgId=64583D7C00002A9F0000000000011788, messageQueue=MessageQueue [topic=DemoTopic, brokerName=sofa.cloud.alipay.net, queueId=6], queueOffset=50].
2020-04-05 22:54:21.658  INFO 46817 --- [MessageThread_1] c.g.b.b.p.MessageListenerHandler : receive msg: Say Hello RocketMQ to Glmapper.

The logs show the message being sent and then received.

Use hooks to intercept messages

RocketMQ provides two hook interfaces, SendMessageHook and ConsumeMessageHook, which can intercept a message before and after it is sent and before and after it is consumed. The official documentation does not describe this part, so here is a look at how to use the two hook interfaces.

SendMessageHook

Define a ProducerTestHook as follows:

public class ProducerTestHook implements SendMessageHook {

    public static final Logger LOGGER = LoggerFactory.getLogger(ProducerTestHook.class);

    @Override
    public String hookName() {
        return ProducerTestHook.class.getName();
    }

    @Override
    public void sendMessageBefore(SendMessageContext sendMessageContext) {
        LOGGER.info("execute sendMessageBefore. sendMessageContext:{}", sendMessageContext);
    }

    @Override
    public void sendMessageAfter(SendMessageContext sendMessageContext) {
        LOGGER.info("execute sendMessageAfter. sendMessageContext:{}", sendMessageContext);
    }
}

In the producer auto-configuration class above, register the ProducerTestHook with the producer.

// Register the SendMessageHook
producer.getDefaultMQProducerImpl().registerSendMessageHook(new ProducerTestHook());

ConsumeMessageHook

Create a custom ConsumerTestHook with the following code:

public class ConsumerTestHook implements ConsumeMessageHook {

    public static final Logger LOGGER = LoggerFactory.getLogger(ConsumerTestHook.class);

    @Override
    public String hookName() {
        return ConsumerTestHook.class.getName();
    }

    @Override
    public void consumeMessageBefore(ConsumeMessageContext consumeMessageContext) {
        LOGGER.info("execute consumeMessageBefore. consumeMessageContext: {}",consumeMessageContext);
    }

    @Override
    public void consumeMessageAfter(ConsumeMessageContext consumeMessageContext) {
        LOGGER.info("execute consumeMessageAfter. consumeMessageContext: {}",consumeMessageContext);
    }
}

In the consumer configuration class above, register the ConsumerTestHook with the consumer.

// Register the ConsumeMessageHook
consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumerTestHook());

The result is as follows:

execute sendMessageBefore. sendMessageContext:org.apache.rocketmq.client.hook.SendMessageContext@a50ea34
execute sendMessageAfter. sendMessageContext:org.apache.rocketmq.client.hook.SendMessageContext@a50ea34
sendResult: SendResult [sendStatus=SEND_OK, msgId=0A0FE8F8C02F18B4AAC21C1275FB0000, offsetMsgId=64583D7C00002A9F0000000000011850, messageQueue=MessageQueue [topic=DemoTopic, brokerName=sofa.cloud.alipay.net, queueId=5], queueOffset=50].
execute consumeMessageBefore. consumeMessageContext: org.apache.rocketmq.client.hook.ConsumeMessageContext@6482209a
receive msg: Say Hello RocketMQ to Glmapper.
execute consumeMessageAfter. consumeMessageContext: org.apache.rocketmq.client.hook.ConsumeMessageContext@6482209a
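
The ordering in this output (before, the action itself, after) is the usual interceptor pattern. The sketch below reproduces only that ordering with plain Java; it is not RocketMQ's implementation, and SendHook, HookOrderSketch, sendWithHooks, and tracingHook are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Illustrative sketch of before/after hook ordering; not RocketMQ code. */
public class HookOrderSketch {

    /** Hypothetical stand-in for a before/after hook interface. */
    public interface SendHook {
        void before(String message);
        void after(String message);
    }

    /** Calls every hook's before(), records the send, then calls every hook's after(). */
    public static void sendWithHooks(String message, List<SendHook> hooks, List<String> trace) {
        for (SendHook hook : hooks) {
            hook.before(message);
        }
        trace.add("send " + message);
        for (SendHook hook : hooks) {
            hook.after(message);
        }
    }

    /** Builds a hook that appends its invocations to the given trace. */
    public static SendHook tracingHook(final List<String> trace) {
        return new SendHook() {
            @Override
            public void before(String message) {
                trace.add("before " + message);
            }

            @Override
            public void after(String message) {
                trace.add("after " + message);
            }
        };
    }

    /** Runs one send with a single tracing hook and returns the recorded order. */
    public static List<String> run(String message) {
        List<String> trace = new ArrayList<>();
        sendWithHooks(message, Collections.singletonList(tracingHook(trace)), trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run("hello"));
    }
}
```

A hook of this shape is a natural place for cross-cutting concerns such as tracing or timing, because the business code that sends or consumes the message does not need to change.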

Some of the problems I encountered

Several problems encountered during the integration process are recorded as follows:

1. The Broker failed to start.

When I started the Broker after the Name Server was up, my SSH session would drop with a connection failure. Checking which processes had been killed with dmesg | egrep -i -B100 'killed process' produced the following log:

[2257026.030741] Memory cgroup out of memory: Kill process 110719 (systemd) score 0 or sacrifice child
[2257026.031888] Killed process 100735 (sh) total-vm:15708kB, anon-rss:176kB, file-rss:1800kB, shmem-rss:0kB
[2257026.133506] Memory cgroup out of memory: Kill process 110719 (systemd) score 0 or sacrifice child
[2257026.133539] Killed process 100745 (vsar) total-vm:172560kB, anon-rss:22936kB, file-rss:1360kB, shmem-rss:0kB
[2257026.206872] Memory cgroup out of memory: Kill process 104617 (java) score 3 or sacrifice child
[2257026.207742] Killed process 104617 (java) total-vm:9092924kB, anon-rss:4188528kB, file-rss:496kB, shmem-rss:0kB

The log shows an OOM kill: the machine could not provide the memory requested at startup, because the initial memory settings in the default scripts are too large. The fix: go to the bin directory of the compiled distribution (distribution/target/apache-rocketmq/bin) and open the two script files runbroker.sh and runserver.sh. The default JVM parameters they specify at startup are very large (4g/8g/2g), while my offline test machine has only 1 core and 2 GB of memory in total, so I lowered the parameters accordingly:

  • runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx256m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
  • runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"

After the change, restart the Name Server and the Broker:

$ jps
98633 Jps
55689 BrokerStartup
54906 NamesrvStartup

2. No route info of this topic, XXX

This problem is mentioned in the official FAQ, which suggests that it is encountered very often. For the official solution, see the steps in item 4 at rocketmq.apache.org/docs/faq/. In short, if the topic cannot be found, create it on a broker with the admin tools command updateTopic or through the web console. That solved it in my case:

sh mqadmin updateTopic -b localhost:10911 -n localhost:9876 -t DemoTopic # Execute this command to create DemoTopic
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
create topic to localhost:10911 success.
TopicConfig [topicName=DemoTopic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]

Conclusion

I had read some of the RocketMQ code while working on the messaging component integration for SOFATracer, but actually running it still involved a few twists and turns. Overall, integrating SpringBoot with RocketMQ is relatively straightforward, and this article records the process. If anything here is described incorrectly, please point it out in the comments.

Reference documentation

  • rocketmq.apache.org/docs/quick-…
  • blog.csdn.net/ph3636/arti…