ActiveMQ

01 Introduction

ActiveMQ is a message-oriented middleware product implemented entirely on the basis of the JMS specification. It is open-source messaging middleware developed under the Apache Software Foundation. ActiveMQ is mainly used in distributed system architectures to help build highly available, high-performance, scalable enterprise-level messaging systems.

02 What is JMS?

Java Message Service (JMS) is the Java platform's API for message-oriented middleware. It is used to send messages and communicate asynchronously between two applications or within a distributed system. JMS is a vendor-neutral API that is supported by the vast majority of MOM (Message Oriented Middleware) providers. ActiveMQ is one such implementation.

03 What is MOM?

MOM is message-oriented middleware: it uses a messaging provider to coordinate messaging operations. A MOM needs to provide APIs and management tools. The client uses API calls to send messages to destinations managed by the provider. After sending a message, the client carries on with other work, and the provider retains the message until the recipient confirms that it has received it.

04 The JMS specification

As we have seen, the purpose of the JMS specification is to let Java applications access existing MOM (message-oriented middleware) systems through one unified standard, so that different message-oriented middleware products can be used in the same way. When creating the JMS specification, the designers wanted to incorporate the essential features of existing messaging systems, for example:

  1. Different messaging modes or domains, such as point-to-point messaging and publish-subscribe messaging

  2. Facilities for receiving messages synchronously and asynchronously

  3. Support for reliable messaging

  4. Common message formats, such as stream, text, and byte

05 JMS object model

1) Connection factory. The ConnectionFactory is created by the administrator and bound into the JNDI tree. The client uses JNDI to look up the connection factory and then creates a JMS connection using the connection factory.

2) JMS connection. A JMS Connection represents an active connection between a JMS client and the server. The client establishes it by calling a method on the connection factory.

3) JMS session. A JMS Session represents the conversational state between a JMS client and the JMS server. A session is established over a JMS connection and represents a single-threaded context for the conversation between client and server.

4) JMS destination. JMS destinations, also known as message queues, are where messages are actually sent to and consumed from.

5) JMS producers and consumers. MessageProducer and MessageConsumer objects are created by the Session object and are used to send and receive messages.

6) JMS messaging generally falls into two models:

① Point-to-point. In a point-to-point messaging system, each message is delivered to a single consumer. Point-to-point messaging is associated with queues (javax.jms.Queue).

② Publish/Subscribe. Publish/subscribe messaging supports an event-driven model in which both producers and consumers take part in message delivery. Producers publish events, and consumers subscribe to the events they are interested in and consume them. This model is associated with topics (javax.jms.Topic).
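The difference between the two models can be sketched in plain Java. This is only a toy model under stated assumptions: the class `MessagingDomainsSketch` and its methods are hypothetical names invented for illustration, round-robin is just one possible way to pick "a single consumer", and none of this is the JMS API or ActiveMQ code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

/** Toy model of the two JMS messaging models (hypothetical, not the JMS API). */
class MessagingDomainsSketch {

    /** Point-to-point: each message goes to exactly one consumer (round-robin here). */
    static List<List<String>> deliverQueue(List<String> messages, int consumers) {
        List<List<String>> received = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            received.add(new ArrayList<>());
        }
        Queue<String> queue = new ArrayDeque<>(messages);
        int next = 0;
        while (!queue.isEmpty()) {
            received.get(next).add(queue.poll());
            next = (next + 1) % consumers;
        }
        return received;
    }

    /** Publish/subscribe: every subscriber gets its own copy of every message. */
    static List<List<String>> deliverTopic(List<String> messages, int subscribers) {
        List<List<String>> received = new ArrayList<>();
        for (int i = 0; i < subscribers; i++) {
            received.add(new ArrayList<>(messages));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");
        System.out.println("queue: " + deliverQueue(messages, 2));
        System.out.println("topic: " + deliverTopic(messages, 2));
    }
}
```

With two consumers, the queue splits the four messages between them, while the topic hands all four to each subscriber.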

06 ActiveMQ installation

Windows installation

Download address: activemq.apache.org/activemq-51…

After downloading, unzip the archive, enter the bin directory, and run activemq.bat.

If startup fails with the following error, port 5672 is already in use

You can edit activemq.xml in the conf directory of ActiveMQ and change the AMQP port to another one; here it is changed to 5673

Start again:

Open http://127.0.0.1:8161/admin/ to enter the admin console. The initial username and password are admin / admin.

Docker ActiveMQ installation

    docker run -d --name activemq -p 61616:61616 -p 8161:8161 webcenter/activemq

07 ActiveMQ Quick start

Spring Boot integration with ActiveMQ

Import dependencies

    <dependencies>
        <!-- Spring Boot -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.3.0.RELEASE</version>
        </dependency>
        <!-- ActiveMQ -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
            <version>1.5.0.RELEASE</version>
        </dependency>
        <!-- Message queue connection pool -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.0</version>
        </dependency>
    </dependencies>

Configuring MQ

    server:
      port: 8080
    spring:
      activemq:
        broker-url: tcp://127.0.0.1:61616
        user: admin
        password: admin
        close-timeout: 15s              # Time to wait before considering a close complete
        in-memory: true                 # Whether the default broker URL should be in memory. Ignored if an explicit broker is specified.
        non-blocking-redelivery: false  # Whether to stop message delivery before re-delivering messages from a rolled-back transaction. Message order is not preserved when this is enabled.
        send-timeout: 0                 # Time to wait for a response on message sends. Set to 0 to wait forever.
        queue-name: active.queue
        topic-name: active.topic.name.model
        # packages:
        #   trust-all: true
        pool:
          idle-timeout: 30000           # Connection idle timeout; the default is 30 seconds
      # jms:
      #   pub-sub-domain: true          # ActiveMQ uses queue mode by default; enable this setting to use topics

    # Whether to trust all packages.
    # spring.activemq.packages.trust-all=
    # Comma-separated list of specific packages to trust (when not trusting all packages).
    # spring.activemq.packages.trusted=
    # Whether to block when a connection is requested and the pool is full. Set to false to throw a "JMSException" instead.
    # spring.activemq.pool.block-if-full=true
    # Blocking period before throwing an exception if the pool is still full.
    # spring.activemq.pool.block-if-full-timeout=-1ms
    # Whether to create a connection on startup. Can be used to warm up the pool on startup.
    # spring.activemq.pool.create-connection-on-startup=true
    # Whether a PooledConnectionFactory should be created instead of a regular ConnectionFactory.
    # spring.activemq.pool.enabled=false
    # Connection expiration timeout.
    # spring.activemq.pool.expiry-timeout=0ms
    # Connection idle timeout.
    # spring.activemq.pool.idle-timeout=30s
    # Maximum number of pooled connections.
    # spring.activemq.pool.max-connections=1
    # Maximum number of active sessions per connection.
    # spring.activemq.pool.maximum-active-session-per-connection=500
    # Whether to reconnect when a "JMSException" occurs.
    # spring.activemq.pool.reconnect-on-exception=true
    # Time to sleep between runs of the idle connection eviction thread. When negative, no idle connection eviction thread runs.
    # spring.activemq.pool.time-between-expiration-check=-1ms
    # Whether to use only one anonymous MessageProducer instance.
    # spring.activemq.pool.use-anonymous-producers=true

Writing configuration classes

    import javax.jms.ConnectionFactory;
    import javax.jms.Queue;
    import javax.jms.Topic;

    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.command.ActiveMQQueue;
    import org.apache.activemq.command.ActiveMQTopic;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.jms.config.JmsListenerContainerFactory;
    import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
    import org.springframework.jms.core.JmsMessagingTemplate;

    /**
     * @author original
     * @date 2020/12/16
     * @since 1.0
     **/
    @Configuration
    public class BeanConfig {

        @Value("${spring.activemq.broker-url}")
        private String brokerUrl;

        @Value("${spring.activemq.user}")
        private String username;

        // Reads the password property (the original snippet read topic-name here by mistake)
        @Value("${spring.activemq.password}")
        private String password;

        @Value("${spring.activemq.queue-name}")
        private String queueName;

        @Value("${spring.activemq.topic-name}")
        private String topicName;

        @Bean(name = "queue")
        public Queue queue() {
            return new ActiveMQQueue(queueName);
        }

        @Bean(name = "topic")
        public Topic topic() {
            return new ActiveMQTopic(topicName);
        }

        @Bean
        public ConnectionFactory connectionFactory() {
            return new ActiveMQConnectionFactory(username, password, brokerUrl);
        }

        @Bean
        public JmsMessagingTemplate jmsMessageTemplate() {
            return new JmsMessagingTemplate(connectionFactory());
        }

        /**
         * In queue mode, listening for messages requires a containerFactory.
         */
        @Bean("queueListener")
        public JmsListenerContainerFactory<?> queueJmsListenerContainerFactory(ConnectionFactory connectionFactory) {
            SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setPubSubDomain(false);
            return factory;
        }

        /**
         * In topic mode, listening for messages requires a containerFactory.
         */
        @Bean("topicListener")
        public JmsListenerContainerFactory<?> topicJmsListenerContainerFactory(ConnectionFactory connectionFactory) {
            SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setPubSubDomain(true);
            return factory;
        }
    }

Writing a startup class

    /**
     * @author original
     * @date 2020/12/8
     * @since 1.0
     **/
    @SpringBootApplication
    @EnableJms // enable JMS support
    public class DemoApplication {

        @Autowired
        private JmsMessagingTemplate jmsMessagingTemplate;

        @Autowired
        private Queue queue;

        @Autowired
        private Topic topic;

        public static void main(String[] args) {
            SpringApplication.run(DemoApplication.class, args);
        }

        /** Sends one test message to the queue and one to the topic after startup. */
        @PostConstruct
        public void sendMsg() {
            jmsMessagingTemplate.convertAndSend(queue, "queue-test");
            jmsMessagingTemplate.convertAndSend(topic, "topic-test");
        }
    }

View the ActiveMQ admin console

active.queue is the name of the queue

Number Of Pending Messages is 3 because I sent the message three times

Messages Enqueued is the number of messages that have been enqueued

Messages remain unconsumed because there are no consumers. Now let’s write the consumer code.

    /**
     * @author original
     * @date 2020/12/16
     * @since 1.0
     **/
    @Component
    public class QueueConsumerListener {

        // Consumes messages from the queue
        @JmsListener(destination = "${spring.activemq.queue-name}", containerFactory = "queueListener")
        public void getQueue(String message) {
            System.out.println("accept queue: " + message);
        }

        // Consumes messages from the topic
        @JmsListener(destination = "${spring.activemq.topic-name}", containerFactory = "topicListener")
        public void getTopic(String message) {
            System.out.println("accept topic: " + message);
        }
    }

Send a queue message from the admin console

Console output:

Send a topic message

Console output:

There is a catch, though. Before any consumer existed, there were three messages in the queue and one in the topic. When I started the consumer, the three queue messages were consumed, but the topic message was not. This is because:

Topic mode has non-durable subscriptions and durable subscriptions

Non-durable subscription: messages sent before the consumer starts are not delivered to it once it starts;

Durable subscription: messages sent while the subscriber was offline are delivered once it starts, provided the durable subscription had been registered beforehand;
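The behaviour observed above can be modelled with a small plain-Java sketch. The class `TopicSubscriptionSketch` and its methods are hypothetical names invented for this illustration; the sketch assumes a durable subscription is simply a named backlog kept while the subscriber is offline, and it is not how ActiveMQ stores subscriptions internally.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy topic showing non-durable vs durable subscriptions (hypothetical, not ActiveMQ code). */
class TopicSubscriptionSketch {
    // Backlog kept for each registered durable subscription while it is offline.
    private final Map<String, List<String>> durableBacklog = new LinkedHashMap<>();
    // Inboxes of the subscribers that are currently connected.
    private final Map<String, List<String>> online = new LinkedHashMap<>();

    /** Registers a durable subscription; the subscriber may be offline afterwards. */
    void registerDurable(String name) {
        durableBacklog.putIfAbsent(name, new ArrayList<>());
    }

    /** Connects a subscriber; a durable one first receives its backlog. */
    List<String> connect(String name) {
        List<String> inbox = new ArrayList<>();
        List<String> backlog = durableBacklog.get(name);
        if (backlog != null) {
            inbox.addAll(backlog);
            backlog.clear();
        }
        online.put(name, inbox);
        return inbox;
    }

    /** Publishes to all connected subscribers and to offline durable backlogs. */
    void publish(String message) {
        for (List<String> inbox : online.values()) {
            inbox.add(message);
        }
        for (Map.Entry<String, List<String>> entry : durableBacklog.entrySet()) {
            if (!online.containsKey(entry.getKey())) {
                entry.getValue().add(message);
            }
        }
    }

    public static void main(String[] args) {
        TopicSubscriptionSketch topic = new TopicSubscriptionSketch();
        topic.registerDurable("durable-sub");
        topic.publish("topic-test"); // sent while nobody is connected
        System.out.println("non-durable: " + topic.connect("plain-sub"));
        System.out.println("durable: " + topic.connect("durable-sub"));
    }
}
```

The non-durable subscriber connects to an empty inbox, while the durable one receives the message published before it connected.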

08 Principle analysis of ActiveMQ

Synchronous message sending and asynchronous message sending

ActiveMQ supports both synchronous and asynchronous modes for sending messages to the broker. With synchronous sending, the sender blocks until the broker returns an acknowledgement that the message has been processed. This guarantees that the message has safely reached the broker, but because the call blocks, the client's send throughput suffers. With asynchronous sending, the sender does not wait for feedback from the broker, so throughput is higher, but messages can be lost. Asynchronous sending should therefore only be used where some data loss is acceptable. By default, non-persistent messages are sent asynchronously, and persistent messages sent outside a transaction are sent synchronously. When a transaction is enabled, however, persistent messages are also sent asynchronously. Because asynchronous sending is more efficient than synchronous sending, try to use a transacted session when sending persistent messages.
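The default rules in the paragraph above boil down to one boolean condition, the same one that appears in the ActiveMQSession.send source quoted later in this article. The sketch below restates it as a pure function so the cases can be checked by hand; the class `SendModeSketch` and its parameter names are hypothetical and only mirror the flags used in that condition.

```java
/** Restates the async-send condition from ActiveMQSession.send as a pure function (illustrative only). */
class SendModeSketch {

    static boolean sendsAsync(boolean hasCallback, int sendTimeout, boolean responseRequired,
                              boolean alwaysSyncSend, boolean persistent,
                              boolean useAsyncSend, boolean inTransaction) {
        return !hasCallback
                && sendTimeout <= 0
                && !responseRequired
                && !alwaysSyncSend
                && (!persistent || useAsyncSend || inTransaction);
    }

    public static void main(String[] args) {
        // Non-persistent message, default settings
        System.out.println(sendsAsync(false, 0, false, false, false, false, false));
        // Persistent message outside a transaction
        System.out.println(sendsAsync(false, 0, false, false, true, false, false));
        // Persistent message inside a transaction
        System.out.println(sendsAsync(false, 0, false, false, true, false, true));
    }
}
```

The three calls print true, false and true, matching the defaults described above: non-persistent messages go out asynchronously, persistent non-transactional messages synchronously, and persistent messages in a transaction asynchronously.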

Mechanism of Message Sending

The meaning of ProducerWindowSize

Each time the producer sends a message, it counts the number of bytes sent. When the total reaches the ProducerWindowSize value, the producer must wait for the broker's acknowledgement before it can continue sending.

The code at line 1957 of ActiveMQSession mainly limits the total size of unacknowledged (backlogged) messages that the producer is allowed to have outstanding when sending asynchronously, so it is only meaningful for asynchronous sends. Each time a message is sent, memoryUsage increases by the message size (+message.size); when the broker returns a ProducerAck, it decreases by ProducerAck.size, which is the size of the previously sent message.

This can be set in two ways:

- In the brokerUrl: "tcp://localhost:61616?jms.producerWindowSize=1048576". This setting applies to all producers.
- In the destinationUri: "test-queue?producer.windowSize=1048576". This parameter takes effect only for producers that use this Destination instance, and it overrides the producerWindowSize value in the brokerUrl.

Note: The larger this value is, the more memory the Client consumes.
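As a rough illustration of the byte-counting window described above, here is a minimal plain-Java sketch. `ProducerWindowSketch` and its methods are hypothetical names; the real client tracks this through a MemoryUsage object, and the exact threshold comparison used here is an assumption.

```java
/** Toy byte-counting producer window (hypothetical; the real client uses MemoryUsage). */
class ProducerWindowSketch {
    private final long windowSize;
    private long inFlightBytes;

    ProducerWindowSketch(long windowSize) {
        this.windowSize = windowSize;
    }

    /** True while the unacknowledged bytes are still below the window size. */
    synchronized boolean hasSpace() {
        return inFlightBytes < windowSize;
    }

    /** After an asynchronous send, the message size is added to the usage. */
    synchronized void onSend(int messageSize) {
        inFlightBytes += messageSize;
    }

    /** When the broker's ProducerAck arrives, the acknowledged size is released. */
    synchronized void onProducerAck(int ackedSize) {
        inFlightBytes -= ackedSize;
        notifyAll();
    }

    /** Blocks the sending thread until the window has space again. */
    synchronized void waitForSpace() throws InterruptedException {
        while (inFlightBytes >= windowSize) {
            wait();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ProducerWindowSketch window = new ProducerWindowSketch(1024);
        window.onSend(600);
        System.out.println("space after 600 bytes: " + window.hasSpace());
        window.onSend(600);
        System.out.println("space after 1200 bytes: " + window.hasSpace());
        window.onProducerAck(600);
        window.waitForSpace(); // returns immediately, the ack freed space
        System.out.println("space after ack: " + window.hasSpace());
    }
}
```

A larger window lets more unacknowledged bytes accumulate on the client before the sender blocks, which is why it costs more client memory.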

Source code analysis

The ActiveMQMessageProducer.send(…) method

    public void send(Destination destination, Message message, int deliveryMode, int priority,
                     long timeToLive, AsyncCallback onComplete) throws JMSException {
        checkClosed();
        // A destination (queue or topic) must be specified
        if (destination == null) {
            if (info.getDestination() == null) {
                throw new UnsupportedOperationException("A destination must be specified.");
            }
            throw new InvalidDestinationException("Don't understand null destinations");
        }

        // Wrap the Destination
        ActiveMQDestination dest;
        if (destination.equals(info.getDestination())) {
            dest = (ActiveMQDestination) destination;
        } else if (info.getDestination() == null) {
            dest = ActiveMQDestination.transform(destination);
        } else {
            throw new UnsupportedOperationException("This producer can only send messages to: "
                    + this.info.getDestination().getPhysicalName());
        }
        if (dest == null) {
            throw new JMSException("No destination specified");
        }

        // Wrap the Message
        if (transformer != null) {
            Message transformedMessage = transformer.producerTransform(session, this, message);
            if (transformedMessage != null) {
                message = transformedMessage;
            }
        }

        // If a producerWindow is set, wait until it has space
        if (producerWindow != null) {
            try {
                producerWindow.waitForSpace();
            } catch (InterruptedException e) {
                throw new JMSException("Send aborted due to thread interrupt.");
            }
        }

        // Send the message
        this.session.send(this, dest, message, deliveryMode, priority, timeToLive,
                producerWindow, sendTimeout, onComplete);
        stats.onMessage();
    }

The send method of ActiveMQSession

    protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message,
                        int deliveryMode, int priority, long timeToLive, MemoryUsage producerWindow,
                        int sendTimeout, AsyncCallback onComplete) throws JMSException {
        // Check the connection
        checkClosed();
        if (destination.isTemporary() && connection.isDeleted(destination)) {
            throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
        }
        // Mutex: if several producers of one session send at the same time, the sends are serialized
        synchronized (sendMutex) {
            // tell the Broker we are about to start a new transaction
            doStartTransaction();
            TransactionId txid = transactionContext.getTransactionId();
            long sequenceNumber = producer.getMessageSequence();

            // Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
            message.setJMSDeliveryMode(deliveryMode); // whether to persist
            long expiration = 0L;
            if (!producer.getDisableMessageTimestamp()) {
                long timeStamp = System.currentTimeMillis();
                message.setJMSTimestamp(timeStamp);
                if (timeToLive > 0) {
                    expiration = timeToLive + timeStamp;
                }
            }
            message.setJMSExpiration(expiration); // message expiration time
            message.setJMSPriority(priority);     // message priority
            message.setJMSRedelivered(false);

            // transform to our own message format here
            ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
            msg.setDestination(destination);
            // Set the MessageId
            msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));

            // Set the message id.
            if (msg != message) {
                message.setJMSMessageID(msg.getMessageId().toString());
                // Make sure the JMS destination is set on the foreign messages too.
                message.setJMSDestination(destination);
            }
            // clear the brokerPath in case we are re-sending this message
            msg.setBrokerPath(null);

            msg.setTransactionId(txid);
            if (connection.isCopyMessageOnSend()) {
                msg = (ActiveMQMessage) msg.copy();
            }
            msg.setConnection(connection);
            msg.onSend();
            msg.setProducerId(msg.getMessageId().getProducerId());
            if (LOG.isTraceEnabled()) {
                LOG.trace(getSessionId() + " sending message: " + msg);
            }
            // Send asynchronously if no onComplete callback is set, the send timeout is not positive,
            // the message needs no response, the connection is not in always-sync mode, and the message
            // is non-persistent, or the connection uses async send, or there is a transaction id
            if (onComplete == null && sendTimeout <= 0 && !msg.isResponseRequired()
                    && !connection.isAlwaysSyncSend()
                    && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
                this.connection.asyncSendPacket(msg);
                if (producerWindow != null) {
                    // Since we defer lots of the marshaling till we hit the
                    // wire, this might not
                    // provide and accurate size. We may change over to doing
                    // more aggressive marshaling,
                    // to get more accurate sizes.. this is more important once
                    // users start using producer window
                    // flow control.
                    int size = msg.getSize();
                    // For asynchronous sends, the producerWindow usage must be increased
                    producerWindow.increaseUsage(size);
                }
            } else {
                if (sendTimeout > 0 && onComplete == null) {
                    // Synchronous send with a timeout
                    this.connection.syncSendPacket(msg, sendTimeout);
                } else {
                    // Synchronous send with a callback
                    this.connection.syncSendPacket(msg, onComplete);
                }
            }
        }
    }

Take a look at the asynchronous sending code, ActiveMQConnection.asyncSendPacket()

    /**
     * send a Packet through the Connection - for internal use only
     *
     * @param command
     * @throws JMSException
     */
    public void asyncSendPacket(Command command) throws JMSException {
        if (isClosed()) {
            throw new ConnectionClosedException();
        } else {
            doAsyncSendPacket(command);
        }
    }

    private void doAsyncSendPacket(Command command) throws JMSException {
        try {
            this.transport.oneway(command);
        } catch (IOException e) {
            throw JMSExceptionSupport.create(e);
        }
    }

What is transport, and where is it instantiated? Going by the conventions seen in the source so far, it is certainly not a simple object. From experience reading source code, it must be initialized while the connection is being created, so that is where we look.

    // Starting from connection = connectionFactory.createConnection(); as the entry point,
    // trace down to ActiveMQConnectionFactory.createActiveMQConnection.
    protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
        if (brokerURL == null) {
            throw new ConfigurationException("brokerURL not set.");
        }
        ActiveMQConnection connection = null;
        try {
            Transport transport = createTransport();
            connection = createActiveMQConnection(transport, factoryStats);
            connection.setUserName(userName);
            connection.setPassword(password);
            // ... (the rest of the method is omitted in this excerpt)
        }
    }

    // This method instantiates the Transport:
    // 1. Build the URI to connect to
    // 2. Use this URI to create a connection via TransportFactory.connect
    protected Transport createTransport() throws JMSException {
        try {
            URI connectBrokerUL = brokerURL;
            String scheme = brokerURL.getScheme();
            if (scheme == null) {
                throw new IOException("Transport not scheme specified: [" + brokerURL + "]");
            }
            if (scheme.equals("auto")) {
                connectBrokerUL = new URI(brokerURL.toString().replace("auto", "tcp"));
            } else if (scheme.equals("auto+ssl")) {
                connectBrokerUL = new URI(brokerURL.toString().replace("auto+ssl", "ssl"));
            } else if (scheme.equals("auto+nio")) {
                connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio", "nio"));
            } else if (scheme.equals("auto+nio+ssl")) {
                connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio+ssl", "nio+ssl"));
            }
            return TransportFactory.connect(connectBrokerUL); // keep reading into this call
        } catch (Exception e) {
            throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
        }
    }

TransportFactory.findTransportFactory

  1. From the TRANSPORT_FACTORYS map, get the TransportFactory instance registered for the given scheme

  2. If the map does not contain one, use TRANSPORT_FACTORY_FINDER to find and build an instance. This is similar to the SPI idea we have seen before: it looks under the path META-INF/services/org/apache/activemq/transport/, finds the matching class by the scheme of the URI, and instantiates it. With the key tcp, for example, TcpTransportFactory is found under that path

    // TransportFactory.connect(connectBrokerUL)
    public static Transport connect(URI location) throws Exception {
        TransportFactory tf = findTransportFactory(location);
        return tf.doConnect(location);
    }

    // findTransportFactory(location)
    public static TransportFactory findTransportFactory(URI location) throws IOException {
        String scheme = location.getScheme();
        if (scheme == null) {
            throw new IOException("Transport not scheme specified: [" + location + "]");
        }
        TransportFactory tf = TRANSPORT_FACTORYS.get(scheme);
        if (tf == null) {
            // Try to load if from a META-INF property.
            try {
                tf = (TransportFactory) TRANSPORT_FACTORY_FINDER.newInstance(scheme);
                TRANSPORT_FACTORYS.put(scheme, tf);
            } catch (Throwable e) {
                throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e);
            }
        }
        return tf;
    }

Call TransportFactory.doConnect to build the connection

    public Transport doConnect(URI location) throws Exception {
        try {
            Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
            if (!options.containsKey("wireFormat.host")) {
                options.put("wireFormat.host", location.getHost());
            }
            WireFormat wf = createWireFormat(options);
            Transport transport = createTransport(location, wf);
            Transport rc = configure(transport, wf, options);
            // remove auto
            IntrospectionSupport.extractProperties(options, "auto.");
            if (!options.isEmpty()) {
                throw new IllegalArgumentException("Invalid connect parameters: " + options);
            }
            return rc;
        } catch (URISyntaxException e) {
            throw IOExceptionSupport.create(e);
        }
    }

configure

    public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
        // compositeConfigure adds two wrapping layers: InactivityMonitor and WireFormatNegotiator
        transport = compositeConfigure(transport, wf, options);
        transport = new MutexTransport(transport);     // wrap in MutexTransport
        transport = new ResponseCorrelator(transport); // wrap in ResponseCorrelator
        return transport;
    }

So far, this transport is really just a chain of calls. Its structure is ResponseCorrelator(MutexTransport(WireFormatNegotiator(InactivityMonitor(TcpTransport())))). What does each layer of wrapping do? ResponseCorrelator is used to implement asynchronous requests. MutexTransport implements a write lock, so that only one request is sent at a time. WireFormatNegotiator negotiates the wire-format details, such as the protocol version and whether caching is used, when the client connects to the broker. InactivityMonitor checks the heartbeat after the connection has been established: the client sends heartbeat information every 10 seconds, and the server reads the heartbeat information every 30 seconds.
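The wrapping chain is an instance of the decorator pattern. The following plain-Java sketch shows only the call order through such a chain; `TransportChainSketch`, its nested types and the `trace` parameter are hypothetical names made up for this illustration, and the wrappers here do nothing except record that they were passed through.

```java
import java.util.ArrayList;
import java.util.List;

/** Decorator-style chain that records the order in which the layers are traversed (illustrative only). */
class TransportChainSketch {

    interface Transport {
        void oneway(String command, List<String> trace);
    }

    /** Innermost layer, standing in for the TCP transport. */
    static class TcpLike implements Transport {
        public void oneway(String command, List<String> trace) {
            trace.add("tcp:" + command);
        }
    }

    /** Generic wrapper: records its name, then delegates to the next layer. */
    static class Wrapper implements Transport {
        private final String name;
        private final Transport next;

        Wrapper(String name, Transport next) {
            this.name = name;
            this.next = next;
        }

        public void oneway(String command, List<String> trace) {
            trace.add(name);
            next.oneway(command, trace);
        }
    }

    /** Builds the chain from the inside out, in the order described in the text. */
    static Transport build() {
        Transport transport = new TcpLike();
        transport = new Wrapper("InactivityMonitor", transport);
        transport = new Wrapper("WireFormatNegotiator", transport);
        transport = new Wrapper("MutexTransport", transport);
        transport = new Wrapper("ResponseCorrelator", transport);
        return transport;
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        build().oneway("msg", trace);
        System.out.println(trace);
    }
}
```

A command enters at the outermost wrapper and reaches the TCP layer last, so each layer can add its own behaviour (correlation, locking, negotiation, heartbeat) without the others knowing about it.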

The difference between synchronous and asynchronous sending

    public Object request(Object command, int timeout) throws IOException {
        FutureResponse response = asyncRequest(command, null);
        return response.getResult(timeout); // block on the future and wait for the result
    }
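In other words, a synchronous request is just an asynchronous request followed by a blocking wait on its future. A minimal sketch of that idea with the JDK's CompletableFuture follows; `SyncOverAsyncSketch` is a hypothetical class, and the "broker" here is simulated by a task that immediately returns an acknowledgement string.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Synchronous call built on top of an asynchronous one (illustrative only). */
class SyncOverAsyncSketch {

    /** Asynchronous request: returns at once with a future completed by the simulated broker. */
    static CompletableFuture<String> asyncRequest(String command) {
        return CompletableFuture.supplyAsync(() -> "ack:" + command);
    }

    /** Synchronous request: the same asynchronous call, then a blocking wait with a timeout. */
    static String request(String command, long timeoutMillis) {
        try {
            return asyncRequest(command).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("request failed or timed out", e);
        }
    }

    public static void main(String[] args) {
        // Async: the caller decides if and when to wait for the result
        CompletableFuture<String> future = asyncRequest("msg-1");
        System.out.println("async result: " + future.join());
        // Sync: the call itself blocks until the acknowledgement arrives
        System.out.println("sync result: " + request("msg-2", 1000));
    }
}
```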

How persistent and nonpersistent messages are stored

Normally, non-persistent messages are stored in memory and persistent messages are stored in files. The maximum amount of message data that can be stored is set by the systemUsage node in ${ActiveMQ_HOME}/conf/activemq.xml, which configures the memory and disk capacities available to the broker

    <systemUsage>
        <systemUsage>
            <!-- Sets the "available memory limit" for the whole ActiveMQ node. This value cannot
                 exceed the maximum memory configured for ActiveMQ itself. The percentOfJvmHeap
                 attribute is a percentage: here 70% of the heap. -->
            <memoryUsage>
                <memoryUsage percentOfJvmHeap="70"/>
            </memoryUsage>
            <!-- Sets the "available disk space" the whole ActiveMQ node may use to store
                 persistent messages. The limit attribute of this child tag must be set. -->
            <storeUsage>
                <storeUsage limit="100 gb"/>
            </storeUsage>
            <!-- Once the memory set by memoryUsage is full, non-persistent messages are dumped to
                 the temp store. Although non-persistent messages are not persisted, ActiveMQ still
                 writes them to the temp store, a temporary area on disk, so that a large backlog of
                 non-persistent messages during a traffic peak does not exhaust memory. -->
            <tempUsage>
                <tempUsage limit="50 gb"/>
            </tempUsage>
        </systemUsage>
    </systemUsage>

The conclusion to draw from this configuration is that when non-persistent messages pile up to a certain point, that is, when memory usage exceeds the configured threshold, ActiveMQ writes the non-persistent messages held in memory to a temporary file to free up memory. The difference from persistent messages is that after a restart, persistent messages are recovered from their files, whereas the temporary files for non-persistent messages are simply deleted

Analysis of persistence strategies for messages

Message persistence is an important means of reliable messaging: even if the sender and receiver are not online at the same time, or the message center goes down after the sender has sent a message, the message can still be delivered after the message center restarts. The principle is simple. After a message is sent, the message center first stores it in a local file, in memory, or in a remote database, and then delivers it to the receiver. Once delivery succeeds, the message is deleted from storage. Let's look at how persistent storage of messages is implemented on the broker

Supported persistent storage types

ActiveMQ supports several persistence methods, mainly the following. Whichever one is used, the message storage logic is the same.

- KahaDB storage (default)
- JDBC storage
- Memory storage
- LevelDB storage
- JDBC with ActiveMQ Journal

KahaDB storage

KahaDB is the default storage mode. It can be used in any scenario and improves performance and recovery capability. The message store uses a transaction log plus a single index file that holds the locations of all messages. KahaDB is a solution built specifically for message persistence and is optimized for typical message usage patterns. In KahaDB, data is appended to data logs, and a log file is discarded once the data in it is no longer needed.

Configuration mode

    <persistenceAdapter>
        <kahaDB directory="${activemq.data}/kahadb"/>
    </persistenceAdapter>

In the data/kahadb directory, four files are generated:

- db.data: the index file for messages. It is essentially a B-tree, used as an index that points to the messages stored in db-*.log
- db.redo: used to recover messages
- db-*.log: stores the message contents. New data is appended to the end of the log file. Because writes are sequential, message storage is fast. The default file size is 32 MB, and a new log file is created automatically when the threshold is reached
- lock: the lock file, which identifies the broker that currently holds read and write access to KahaDB

JDBC storage

With the JDBC persistence mode, three tables are created in the database: ACTIVEMQ_MSGS, ACTIVEMQ_ACKS and ACTIVEMQ_LOCK. ACTIVEMQ_MSGS stores the messages. ACTIVEMQ_ACKS stores the durable subscription information and the ID of the last message received by each durable subscription. ACTIVEMQ_LOCK is used to ensure that at any given time only one ActiveMQ broker instance can access the database.

JDBC storage configuration

    <persistenceAdapter>
        <jdbcPersistenceAdapter dataSource="#Mysql-DS" createTablesOnStartup="true"/>
    </persistenceAdapter>

dataSource specifies the bean for the persistence database, and createTablesOnStartup controls whether tables are created at startup. The default value is true; once the tables have been created on the first start, change it to false.

MySQL persistence bean configuration

    <bean id="Mysql-DS" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://192.168.11.156:3306/activemq?relaxAutoCommit=true"/>
        <property name="username" value="root"/>
        <property name="password" value="root"/>
    </bean>

LevelDB storage

LevelDB has higher persistence performance than KahaDB, although KahaDB is still the default. In addition, ActiveMQ 5.9 introduced a data replication mode based on LevelDB and ZooKeeper, which was the preferred data replication scheme for master-slave setups. However, LevelDB is no longer officially recommended or supported; KahaDB is recommended instead

    <persistenceAdapter>
        <levelDB directory="activemq-data"/>
    </persistenceAdapter>

Memory message store

A memory-based message store keeps all persistent messages in memory. Setting persistent="false" means that no persistent storage is configured and messages are stored directly in memory

    <beans>
        <broker brokerName="test-broker" persistent="false"
                xmlns="http://activemq.apache.org/schema/core">
            <transportConnectors>
                <transportConnector uri="tcp://localhost:61635"/>
            </transportConnectors>
        </broker>
    </beans>

JDBC Message store with ActiveMQ Journal

This approach overcomes the weakness of the JDBC store, which has to write to and read from the database every time a message arrives. ActiveMQ Journal uses write caching, which significantly improves performance. When consumers can keep up with the rate at which producers produce messages, the journal file greatly reduces the number of messages that need to be written to the DB. For example, suppose a producer produces 1000 messages and all 1000 are saved to the journal file. If the consumer consumes more than 90% of them before the journal file is synchronized to the DB, only the remaining 10% need to be written to the DB. If the consumer is slow, the journal file allows messages to be written to the DB in batches.

- Comment out the original persistence adapter tag
- Add the following tag

    <persistenceFactory>
        <journalPersistenceAdapterFactory dataSource="#Mysql-DS" dataDirectory="activemqdata"/>
    </persistenceFactory>

- Send messages in a loop from the producer. You can see that the data is synchronized to the database lazily
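The 1000-message example above can be worked through with a toy model. `JournalSketch` is a hypothetical class that assumes the simplest possible behaviour: messages sit in the journal until a checkpoint, and anything consumed before the checkpoint never reaches the database. It is not the actual journal implementation in ActiveMQ.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Toy journal: only messages still unconsumed at checkpoint time are written to the DB (illustrative only). */
class JournalSketch {
    private final Set<Integer> journal = new LinkedHashSet<>();
    private int rowsWrittenToDb;

    /** A produced message is first recorded in the journal only. */
    void produce(int messageId) {
        journal.add(messageId);
    }

    /** A message consumed before the checkpoint is dropped from the journal. */
    void consume(int messageId) {
        journal.remove(messageId);
    }

    /** Writes whatever is left in the journal to the DB as one batch and returns the batch size. */
    int checkpoint() {
        int batch = journal.size();
        rowsWrittenToDb += batch;
        journal.clear();
        return batch;
    }

    int rowsWrittenToDb() {
        return rowsWrittenToDb;
    }

    public static void main(String[] args) {
        JournalSketch store = new JournalSketch();
        for (int id = 0; id < 1000; id++) {
            store.produce(id);
        }
        for (int id = 0; id < 900; id++) {
            store.consume(id); // 90% consumed before the journal is synchronized
        }
        System.out.println("rows written at checkpoint: " + store.checkpoint());
    }
}
```

With 900 of the 1000 messages consumed before the checkpoint, only 100 rows are written, which is the saving the journal is meant to provide over writing every message to the database.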

The principle of consuming messages on the consumer side

We know there are two ways to receive messages: one is the synchronous, blocking MessageConsumer#receive method, and the other is to register a MessageListener. Note that the two cannot be used at the same time in the same session, which means messages cannot be received in both ways at once; otherwise an exception is thrown. The main reason is that in a transactional session, transactions spanning the two consumption modes are hard to manage

Consumption flow chart

ActiveMQMessageConsumer.receive is the source entry point for synchronous message reception on the consumer side.

public Message receive() throws JMSException {
    checkClosed();
    // Check whether receive and a MessageListener are both configured in the current session.
    // Synchronous consumption must not set a MessageListener; otherwise an exception is thrown.
    checkMessageListener();
    // If prefetchSize is 0 and unconsumedMessages is empty, send a pull command.
    sendPullCommand(0);
    MessageDispatch md = dequeue(-1);
    if (md == null) {
        return null;
    }
    beforeMessageIsConsumed(md);
    afterMessageIsConsumed(md, false); // Send ack to broker
    return createActiveMQMessage(md); // Get the message and return
}

sendPullCommand sends a pull command to fetch messages from the broker, but only if prefetchSize is 0 and unconsumedMessages is empty. unconsumedMessages holds the messages that have been dispatched to the consumer but not yet consumed; its size is bounded by prefetchSize.

protected void sendPullCommand(long timeout) throws JMSException {
    clearDeliveredList();
    if (info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
        MessagePull messagePull = new MessagePull();
        messagePull.configure(info);
        messagePull.setTimeout(timeout);
        session.asyncSendPacket(messagePull); // Send messagePull to the server asynchronously
    }
}

clearDeliveredList

In the sendPullCommand method above, clearDeliveredList is called first. Its main job is to clean up deliveredMessages, the linked list that stores messages already dispatched to the consumer but not yet acknowledged. If the session is transacted, it iterates over deliveredMessages and records each message in previouslyDeliveredMessages so that it can be tracked when it is redelivered. If the session is not transacted, the handling depends on the ACK mode: in client-acknowledge mode the delivered messages are rolled back to allow redelivery, and in all non-transacted cases deliveredMessages and pendingAck are then cleared.

// async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again
private void clearDeliveredList() {
    if (clearDeliveredList) {
        synchronized (deliveredMessages) {
            if (clearDeliveredList) {
                if (!deliveredMessages.isEmpty()) {
                    if (session.isTransacted()) {
                        if (previouslyDeliveredMessages == null) {
                            previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, Boolean>(session.getTransactionContext().getTransactionId());
                        }
                        for (MessageDispatch delivered : deliveredMessages) {
                            previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false);
                        }
                        LOG.debug("{} tracking existing transacted {} delivered list ({}) on transport interrupt", getConsumerId(), previouslyDeliveredMessages.transactionId, deliveredMessages.size());
                    } else {
                        if (session.isClientAcknowledge()) {
                            LOG.debug("{} rolling back delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size());
                            // allow redelivery
                            if (!this.info.isBrowser()) {
                                for (MessageDispatch md : deliveredMessages) {
                                    this.session.connection.rollbackDuplicate(this, md.getMessage());
                                }
                            }
                        }
                        LOG.debug("{} clearing delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size());
                        deliveredMessages.clear();
                        pendingAck = null;
                    }
                }
                clearDeliveredList = false;
            }
        }
    }
}

dequeue

dequeue takes a message from unconsumedMessages. When a consumer is created, a channel for its not-yet-consumed messages is created with it. There are two kinds of channel: SimplePriorityMessageDispatchChannel, a simple priority queue, and FifoMessageDispatchChannel, a first-in, first-out queue. As to why such a channel exists, imagine how inefficient it would be if the consumer had to go back to the broker for the next message every time it finished consuming one. This design lets the session dispatch multiple messages to a consumer at a time. By default, prefetchSize is 1000 for a queue.
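The difference between the two channel types can be sketched with standard collections. The class below is a hypothetical illustration, not the real SimplePriorityMessageDispatchChannel or FifoMessageDispatchChannel: it assumes JMS-style priorities where a higher number is delivered first, and it models the prefetch limit as a simple capacity check.

```java
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Queue;

/** Toy prefetch buffer with FIFO or priority ordering; not the ActiveMQ classes. */
class DispatchChannelSketch {
    /** A dispatched message with a priority and its arrival sequence. */
    static final class Dispatch {
        final String body;
        final int priority;
        final long sequence; // keeps messages of equal priority in arrival order

        Dispatch(String body, int priority, long sequence) {
            this.body = body;
            this.priority = priority;
            this.sequence = sequence;
        }
    }

    private final Queue<Dispatch> buffer;
    private final int prefetchSize;
    private long nextSequence = 0;

    DispatchChannelSketch(boolean prioritized, int prefetchSize) {
        this.prefetchSize = prefetchSize;
        if (prioritized) {
            Comparator<Dispatch> byPriorityThenArrival = Comparator
                    .comparingInt((Dispatch d) -> -d.priority)
                    .thenComparingLong(d -> d.sequence);
            this.buffer = new PriorityQueue<Dispatch>(byPriorityThenArrival);
        } else {
            this.buffer = new ArrayDeque<Dispatch>();
        }
    }

    /** Broker side: returns false when the consumer's prefetch window is full. */
    boolean enqueue(String body, int priority) {
        if (buffer.size() >= prefetchSize) {
            return false;
        }
        buffer.add(new Dispatch(body, priority, nextSequence++));
        return true;
    }

    /** Consumer side: next message body, or null when the buffer is empty. */
    String dequeue() {
        Dispatch d = buffer.poll();
        return d == null ? null : d.body;
    }
}
```

Because the buffer is filled ahead of consumption, dequeue is a local operation most of the time; the consumer only waits on the broker when the buffer runs dry.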

beforeMessageIsConsumed

Unless the ACK mode is DUPS_OK_ACKNOWLEDGE and the destination is a topic (that is, in every case except topic with DupsOk), the message is added to the head of the deliveredMessages list. If the current session is transactional, transactedIndividualAck is then checked: if it is true, an ACK is sent immediately for the single message; otherwise ackLater is called to acknowledge in batches. With ackLater the client does not send an ACK right after consuming a message but caches it (pendingAck). When the number of cached messages reaches a threshold, a single ACK command confirms them all. This is a significant performance improvement over acknowledging each message individually.

private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
    md.setDeliverySequenceId(session.getNextDeliveryId());
    lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
    if (!isAutoAcknowledgeBatch()) {
        synchronized (deliveredMessages) {
            deliveredMessages.addFirst(md);
        }
        if (session.getTransacted()) {
            if (transactedIndividualAck) {
                immediateIndividualTransactedAck(md);
            } else {
                ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
            }
        }
    }
}
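The batching idea behind ackLater can be shown with a small model. This is only a sketch: BatchedAckSketch is a hypothetical class, messages are identified by plain integers, and the threshold of half the prefetch size is an assumption taken from the "0.5 * prefetch as used in ackLater()" comment in the ActiveMQ source quoted in this article. The real method has more conditions.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of deferred, range-based acknowledgements; not the ActiveMQ method. */
class BatchedAckSketch {
    private final int prefetchSize;
    // Each sent ACK covers a range of message ids: {firstId, lastId}.
    private final List<int[]> sentAcks = new ArrayList<>();
    private int firstPending;
    private int lastPending;
    private int pendingCount = 0;

    BatchedAckSketch(int prefetchSize) {
        this.prefetchSize = prefetchSize;
    }

    /** Remember the message and send one ACK once enough have piled up. */
    void ackLater(int messageId) {
        if (pendingCount == 0) {
            firstPending = messageId;
        }
        lastPending = messageId;
        pendingCount++;
        // Assumed threshold: half of the prefetch window.
        if (pendingCount >= prefetchSize * 0.5) {
            flush();
        }
    }

    /** Sends a single ACK covering everything cached so far. */
    void flush() {
        if (pendingCount == 0) {
            return;
        }
        sentAcks.add(new int[] {firstPending, lastPending});
        pendingCount = 0;
    }

    List<int[]> sentAcks() {
        return sentAcks;
    }
}
```

With a prefetch size of 10, twelve consumed messages produce two ACK commands covering ids 1 to 5 and 6 to 10, and the last two messages stay cached until the next flush.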

afterMessageIsConsumed

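The main function of this method is to send the acknowledgement. It handles the following cases:

  1. If the message has expired, an ACK of the expired type is returned.

  2. If the session is transactional, nothing is done here.

  3. If the mode is AUTO_ACKNOWLEDGE, or DUPS_OK_ACKNOWLEDGE on a queue, the delivered messages are acknowledged. When optimizeAcknowledge is enabled, the ACK is deferred until the acknowledged and delivered counts reach 65% of prefetchSize or the optimize-ACK timeout expires; otherwise the ACK is sent immediately.

  4. If the mode is DUPS_OK_ACKNOWLEDGE on a topic, the ackLater logic is used.

  5. If the mode is CLIENT_ACKNOWLEDGE or individual acknowledge, the ackLater logic is executed for messages that the consumer has not yet acknowledged.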

private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
    if (unconsumedMessages.isClosed()) {
        return;
    }
    if (messageExpired) {
        acknowledge(md, MessageAck.EXPIRED_ACK_TYPE);
        stats.getExpiredMessageCount().increment();
    } else {
        stats.onMessage();
        if (session.getTransacted()) {
            // Do nothing.
        } else if (isAutoAcknowledgeEach()) {
            if (deliveryingAcknowledgements.compareAndSet(false, true)) {
                synchronized (deliveredMessages) {
                    if (!deliveredMessages.isEmpty()) {
                        if (optimizeAcknowledge) {
                            ackCounter++;
                            // AMQ-3956 evaluate both expired and normal msgs as
                            // otherwise consumer may get stalled
                            if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
                                MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                                if (ack != null) {
                                    deliveredMessages.clear();
                                    ackCounter = 0;
                                    session.sendAck(ack);
                                    optimizeAckTimestamp = System.currentTimeMillis();
                                }
                                // AMQ-3956 - as further optimization send
                                // ack for expired msgs when there are any.
                                // This resets the deliveredCounter to 0 so that
                                // we won't sent standard acks with every msg just
                                // because the deliveredCounter just below
                                // 0.5 * prefetch as used in ackLater()
                                if (pendingAck != null && deliveredCounter > 0) {
                                    session.sendAck(pendingAck);
                                    pendingAck = null;
                                    deliveredCounter = 0;
                                }
                            }
                        } else {
                            MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                            if (ack != null) {
                                deliveredMessages.clear();
                                session.sendAck(ack);
                            }
                        }
                    }
                }
                deliveryingAcknowledgements.set(false);
            }
        } else if (isAutoAcknowledgeBatch()) {
            ackLater(md, MessageAck.STANDARD_ACK_TYPE);
        } else if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) {
            boolean messageUnackedByConsumer = false;
            synchronized (deliveredMessages) {
                messageUnackedByConsumer = deliveredMessages.contains(md);
            }
            if (messageUnackedByConsumer) {
                ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
            }
        } else {
            throw new IllegalStateException("Invalid session state.");
        }
    }
}
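The optimizeAcknowledge condition in the method above is dense, so here it is isolated as a pure function. The class and method names are hypothetical; the 0.65 factor and the timeout rule are copied from the condition in the listing, with the current time passed in as a parameter so the logic can be checked without a clock.

```java
/** The optimizeAcknowledge send condition, extracted for illustration only. */
class OptimizeAckSketch {
    /**
     * Returns true when a batched ACK should be sent: either 65% of the prefetch
     * window has been used up, or the optimize-ACK timeout has elapsed.
     */
    static boolean shouldSendAck(int ackCounter, int deliveredCounter, int prefetchSize,
                                 long timeoutMillis, long lastAckMillis, long nowMillis) {
        boolean windowMostlyUsed = ackCounter + deliveredCounter >= prefetchSize * 0.65;
        boolean timedOut = timeoutMillis > 0 && nowMillis >= lastAckMillis + timeoutMillis;
        return windowMostlyUsed || timedOut;
    }

    public static void main(String[] args) {
        // Default queue prefetch of 1000, timeout disabled.
        System.out.println(shouldSendAck(600, 0, 1000, 0, 0, 0));
        System.out.println(shouldSendAck(700, 0, 1000, 0, 0, 0));
    }
}
```

A timeout of 0 disables the time-based branch, so in that configuration a slow trickle of messages is acknowledged only once the 65% mark is reached.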

09 Advantages and disadvantages of ActiveMQ

ActiveMQ pushes messages to consumers, so it is best suited to scenarios in which messages are normally consumed within a short time. The larger the backlog of stored messages, the slower messages are found and consumed: message throughput is inversely proportional to the size of the backlog.

Disadvantages

  1. The throughput is low. Because ActiveMQ needs to build indexes, throughput decreases. This disadvantage cannot be overcome, and if you want message-oriented middleware that fully complies with the JMS specification, you have to accept this level of TPS.

  2. No sharding function. This is a missing feature, because JMS does not specify a clustering or sharding mechanism for message-oriented middleware. ActiveMQ was designed and developed as messaging middleware for enterprises, not to handle massive message volumes and highly concurrent requests. If one server cannot handle more messages, the load has to be split horizontally. ActiveMQ does not officially provide a sharding mechanism, so you need to implement one yourself.

Applicable scenario

ActiveMQ is suitable for systems with low TPS requirements. On the one hand, ActiveMQ is relatively simple and allows fast development; on the other hand, it is easy to control and has good monitoring mechanisms and interfaces.

Not applicable scenarios

Scenarios with large message volumes. ActiveMQ does not support automatic message sharding. If a single server cannot process all messages because the volume is too large, you need to develop the message sharding function yourself.
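One simple way to build such sharding on the client side is to map a business key to one of several independent brokers. The sketch below is only one possible approach and rests on assumptions: BrokerShardSketch is a hypothetical class, the broker URLs are placeholders, and producers and consumers are assumed to share the same broker list in the same order. It does not handle adding or removing brokers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hash-based choice of a broker URL for a shard key; a hand-rolled sketch. */
class BrokerShardSketch {
    private final List<String> brokerUrls;

    BrokerShardSketch(List<String> brokerUrls) {
        if (brokerUrls.isEmpty()) {
            throw new IllegalArgumentException("at least one broker URL is required");
        }
        this.brokerUrls = new ArrayList<>(brokerUrls);
    }

    /** The same key always maps to the same broker as long as the list is unchanged. */
    String brokerFor(String shardKey) {
        // floorMod keeps the index non-negative even for negative hash codes.
        int index = Math.floorMod(shardKey.hashCode(), brokerUrls.size());
        return brokerUrls.get(index);
    }

    public static void main(String[] args) {
        // Placeholder URLs, not real hosts.
        BrokerShardSketch shards = new BrokerShardSketch(Arrays.asList(
                "tcp://broker-a:61616", "tcp://broker-b:61616", "tcp://broker-c:61616"));
        System.out.println(shards.brokerFor("order-1001"));
    }
}
```

The producer would open its connection against the URL returned for a message's key, and the consumers for that key range would connect to the same broker.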