Message queue comparison reference table

RocketMQ 4.5.1 Installation Tutorial

One, download

Go to http://rocketmq.apache.org/release_notes/release-notes-4.5.1/ and download the binary release.

Two, system requirements

  • 64-bit operating system (Linux, Unix, or macOS is recommended for production; for Windows, see the Windows operating system installation tutorial below)
  • 64-bit JDK 1.8 (RocketMQ does not currently support JDK 11)
  • 4 GB+ of free disk space

Three, Linux/Unix/MacOS installation tutorial

3.1 Build

  • Unzip the package
unzip rocketmq-all-4.5.1-bin-release.zip
  • Switch to the RocketMQ root directory
cd rocketmq-all-4.5.1-bin-release
  • Start the Name Server
nohup sh bin/mqnamesrv &

Verify that it started successfully:

tail -f ~/logs/rocketmqlogs/namesrv.log
# If startup succeeded, the log contains a line similar to:
# 2019-07-18 17:03:56 INFO main - The Name Server boot success. ...
  • Start the Broker
nohup sh bin/mqbroker -n localhost:9876 &

Verify that it started successfully:

tail -f ~/logs/rocketmqlogs/broker.log
# If startup succeeded, the log contains a line similar to:
# 2019-07-18 17:08:41 INFO main - The broker[local, 192.168.43.197:10911] boot success. serializeType=JSON and name server is localhost:9876
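Besides tailing the logs, a quick way to confirm that both processes are up is to check that their ports accept TCP connections. The following is a minimal sketch, not part of RocketMQ: the class name PortProbe and the method isListening are made up for illustration, and the ports 9876 (Name Server) and 10911 (Broker) are taken from the log lines above. An open port only shows that something is listening; it does not prove the service is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not shipped with RocketMQ.
class PortProbe {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not reachable
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports as they appear in the startup logs of this tutorial
        System.out.println("Name Server (9876): " + isListening("localhost", 9876, 1000));
        System.out.println("Broker (10911): " + isListening("localhost", 10911, 1000));
    }
}
```

If either line prints false right after startup, give the process a few seconds and check the corresponding log file again.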

3.2 (Optional) Verifying the RocketMQ Function

3.2.1 Verifying message production

Run the following command:

export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

You can see output similar to the following:

SendResult [sendStatus=SEND_OK, msgId=C0A82BC5F36C511D50C05B41...

3.2.2 Verifying message consumption

Run the following command:

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

You can see output similar to the following:

ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=3, stor....

3.3 Stop

Run the following two commands in sequence:

# Command
sh bin/mqshutdown broker
# Output
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK

# Command
sh bin/mqshutdown namesrv
# Output
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK

Four, Windows operating system installation tutorial

www.jianshu.com/p/4a275e779…

Five, production-ready cluster setup tutorial

www.itmuch.com/books/rocke…

RocketMQ Console installation tutorial

Download the code

# Option 1: clone with git
git clone https://github.com/apache/rocketmq-externals.git
# Option 2: download the archive directly from
https://github.com/apache/rocketmq-externals/archive/master.zip

Modify the console code

2.1 Modifying the Configuration

Find rocketmq-console/src/main/resources/application.properties and modify the configuration as needed:

server.contextPath=/console
server.port=8080
...
# If this value is empty, the env value NAMESRV_ADDR is used for rocketmq.config.namesrvAddr.
# You can also set it in the ops page. Default: localhost:9876
# Name Server address
rocketmq.config.namesrvAddr=
# If you use RocketMQ version < 3.5.8, rocketmq.config.isVIPChannel should be false. Default: true
rocketmq.config.isVIPChannel=
# rocketmq-console's data path: dashboard/monitor
rocketmq.config.dataPath=/tmp/rocketmq-console/data
# Set it to false if you don't want to use the dashboard. Default: true
rocketmq.config.enableDashBoardCollect=true
# Set the message track trace topic if you don't want to use the default one
rocketmq.config.msgTrackTopicName=
rocketmq.config.ticketKey=ticket
# A user info file must be created at ${rocketmq.config.dataPath}/users.properties if login is required
rocketmq.config.loginRequired=false

The author only modified the following two items:

# Console port
server.port=17890
# Name Server address; it can also be set in the console under Operations (navigation bar) -> NameSvrAddrList
rocketmq.config.namesrvAddr=localhost:9876

2.2 Modifying Dependencies

Edit pom.xml to change the RocketMQ dependency version.

Find:

<rocketmq.version>4.4.0</rocketmq.version>

Change it to:

<rocketmq.version>Your version of RocketMQ</rocketmq.version>

I used RocketMQ 4.5.1, so I changed it to:

<rocketmq.version>4.5.1</rocketmq.version>

2.3 Modifying the Code

After modifying pom.xml, org.apache.rocketmq.console.service.impl.MessageServiceImpl#queryMessageByTopic no longer compiles, so it needs a small fix. Change:

DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, null);

To:

RPCHook rpcHook = null;
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);

That is enough to make it compile again.
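The source does not say why the compiler complains. One plausible cause is that a bare null matches more than one overloaded constructor in the newer client version, and giving the null a declared type resolves the ambiguity. The sketch below reproduces that situation with made-up classes (Hook, PullClient, OverloadDemo); it does not use the RocketMQ API and only illustrates the Java language rule.

```java
// Hypothetical stand-ins, not RocketMQ classes: they only mirror the shape of
// two overloaded constructors that differ in the type of the second parameter.
interface Hook {}

class PullClient {
    final String chosen;

    PullClient(String group, Hook hook) {
        this.chosen = "(String, Hook)";
    }

    PullClient(String namespace, String group) {
        this.chosen = "(String, String)";
    }
}

class OverloadDemo {

    static String resolveWithTypedNull() {
        // new PullClient("tools", null);   // does not compile: the call is ambiguous
        Hook hook = null;                   // the declared type selects the overload
        return new PullClient("tools", hook).chosen;
    }

    public static void main(String[] args) {
        System.out.println(resolveWithTypedNull()); // prints (String, Hook)
    }
}
```

A cast such as (Hook) null would work as well; the named local variable simply mirrors the fix used in the console code.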

2.4 Package and build

# Change into the rocketmq-console directory inside rocketmq-externals
cd rocketmq-externals/rocketmq-console
# Build
mvn clean package -DskipTests

Three, prebuilt package (for the lazy)

Download address: https://github.com/eacdy/rocketmq-externals/releases

Four, start

java -jar rocketmq-console-ng-1.0.1.jar

Five, access

Visit http://localhost:17890 to open the console (the port was set with server.port=17890 in application.properties above; the default is 8080).

Six, console usage instructions

Github.com/eacdy/rocke…

Spring Boot 2.x integration with rocketmq-spring-boot-starter

1. Import Maven dependencies

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <!-- Starter 2.0.3 bundles RocketMQ 4.5.1; pick the matching starter version for other RocketMQ releases -->
  <version>2.0.3</version>
</dependency>

2. YAML configuration

Producer configuration:

rocketmq:
  name-server: 127.0.0.1:9876 # RocketMQ Name Server address
  producer:
    group: rocketmq_test # custom producer group name; a group must be specified
    send-message-timeout: 3000 # timeout for sending messages, in milliseconds

Consumer Configuration:

rocketmq:
  name-server: 127.0.0.1:9876 # RocketMQ Name Server address

Configure the producer message sending utility class:

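import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class SpringProducer
{
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendMsg(String topic, String msg)
    {
        log.info("Sending message: {}", msg);
        this.rocketMQTemplate.convertAndSend(topic, msg);
    }
}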

Configure consumers for message processing:

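import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

// The topic and group are read from the configuration file. Note that this reuses the producer
// group as the consumer group, and that rocketmq.producer.topic must also be defined in the configuration.
@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.producer.topic}", consumerGroup = "${rocketmq.producer.group}")
public class SpringConsumer implements RocketMQListener<String>
{
    @Override
    public void onMessage(String msg)
    {
        log.info("Received a message: {}", msg);
    }
}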

Note: on the producer side, use RocketMQTemplate for RocketMQ, KafkaTemplate for Kafka, and AmqpTemplate for RabbitMQ. On the consumer side, annotate the listener with RocketMQMessageListener for RocketMQ, JmsListener for ActiveMQ/Artemis, RabbitListener for RabbitMQ, and KafkaListener for Kafka.

RocketMQ transactional messages

How it works

Three states of a transaction message:

  • Commit: commits the transaction message, which consumers can then consume
  • Rollback: rolls back the transaction message; the broker deletes it and consumers never see it
  • Unknown: the state is not yet known, so the broker checks back with the producer to confirm the status of the message
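The three states above can be read as a small decision table on the broker side. The sketch below is only a model of that table, assuming nothing beyond the three bullets; the names HalfMessageDemo, TxState, BrokerAction, and actionFor are invented for illustration and are not RocketMQ classes.

```java
// Illustrative model only; these types do not exist in RocketMQ.
class HalfMessageDemo {

    enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    enum BrokerAction { DELIVER_TO_CONSUMERS, DELETE_MESSAGE, CHECK_BACK_WITH_PRODUCER }

    // Maps the state reported for a transaction message to what the broker does with it.
    static BrokerAction actionFor(TxState state) {
        switch (state) {
            case COMMIT:
                return BrokerAction.DELIVER_TO_CONSUMERS;
            case ROLLBACK:
                return BrokerAction.DELETE_MESSAGE;
            default:
                // UNKNOWN: the message stays invisible until a later check-back settles it
                return BrokerAction.CHECK_BACK_WITH_PRODUCER;
        }
    }

    public static void main(String[] args) {
        for (TxState state : TxState.values()) {
            System.out.println(state + " -> " + actionFor(state));
        }
    }
}
```

The point of the third state is that the broker never guesses: a message is only delivered or deleted once the producer has given a definite answer.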

Code

Producers:

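import java.util.UUID;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class SpringTransactionProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /** Send a transactional message with payload msg to the given topic. */
    public void sendMsg(String topic, String msg) {
        // Assumption: the original does not show where transactionId comes from; a random UUID is used here
        String transactionId = UUID.randomUUID().toString();
        Message<String> message = MessageBuilder.withPayload(msg)
                .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                .build();
        // "myTransactionGroup" must match @RocketMQTransactionListener(txProducerGroup = "myTransactionGroup")
        this.rocketMQTemplate.sendMessageInTransaction("myTransactionGroup", topic, message, null);
        System.out.println("Message sent successfully");
    }
}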

Consumer:

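import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
// topic must match the producer's topic; consumerGroup is required, and the value here is a placeholder
@RocketMQMessageListener(topic = "spring-tx-my-topic", consumerGroup = "spring-tx-consumer-group")
public class SpringTxConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String msg) {
        System.out.println("Message received -> " + msg);
    }
}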

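Local transaction listener:

import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;

@RocketMQTransactionListener(txProducerGroup = "myTransactionGroup") // must match the producer's transaction group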
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

    private static Map<String, RocketMQLocalTransactionState> STATE_MAP = new HashMap<>();

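    /**
     * Execute the local business logic.
     *
     * @param message the transaction message sent by the producer
     * @param o       the extra argument passed to sendMessageInTransaction
     * @return the local transaction state
     */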
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        String transId = (String)message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);

        try {
            System.out.println("Perform operation");
            Thread.sleep(500);
            // Set transaction state
            STATE_MAP.put(transId, RocketMQLocalTransactionState.COMMIT);
            // return the transaction status to the producer
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            e.printStackTrace();
        }

        STATE_MAP.put(transId, RocketMQLocalTransactionState.ROLLBACK);
        return RocketMQLocalTransactionState.ROLLBACK;

    }

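    /**
     * Check-back: the broker calls this to query the local transaction state.
     *
     * @param message the message whose transaction state is being checked
     * @return the recorded transaction state
     */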
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        String transId = (String)message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
        System.out.println("Query message -> transId =" + transId + ", state = " + STATE_MAP.get(transId));
        return STATE_MAP.get(transId);
    }
}
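The listener above keeps transaction states in a static HashMap and hands back whatever the lookup yields, which is null for an id it never recorded (for example after a restart). A slightly more defensive variant is sketched below. It is a standalone illustration rather than the author's code: TxStateStore is a hypothetical helper, and its TxState enum merely mirrors the three values of RocketMQLocalTransactionState so that the sketch runs without any dependency.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of rocketmq-spring.
class TxStateStore {

    enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    // ConcurrentHashMap, because execution and check-back may run on different threads
    private final Map<String, TxState> states = new ConcurrentHashMap<>();

    void record(String transId, TxState state) {
        states.put(transId, state);
    }

    // Never returns null: an id that was not recorded is reported as UNKNOWN,
    // which leaves the decision to a later check-back.
    TxState lookup(String transId) {
        if (transId == null) {
            return TxState.UNKNOWN;
        }
        return states.getOrDefault(transId, TxState.UNKNOWN);
    }

    public static void main(String[] args) {
        TxStateStore store = new TxStateStore();
        store.record("tx-1", TxState.COMMIT);
        System.out.println(store.lookup("tx-1"));   // COMMIT
        System.out.println(store.lookup("tx-404")); // UNKNOWN
    }
}
```

An in-memory map is still lost when the application restarts, so a real service would more likely look the state up in the same database that the local transaction writes to.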