preface

Today I would like to share with you springBoot integrated activeMq Topic (topic) — publish/subscribe mode, similar to wechat public account, we can receive messages if we follow the public account, topic requires consumers to subscribe first to receive messages, if there is no consumer subscription, A message produced by a producer is a waste message (publish/subscribe, a producer produces a message that can be consumed by multiple consumers). This instance supports Websocket, message retransmission, persistence…

Version information: SpringBoot2.1.5 ActiveMQ 5.15.10

Consumer engineering

Consumer Engineering Directory



Pom file

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency> Copy the code

Yml file configuration

server:
  port: 8085
spring:
  activemq:
    broker-url: tcp://localhost:61616
    user: admin
    password: admin
  jms:
    pub-sub-domain: true

# Your theme name
myTopic: boot_actviemq_topicCopy the code

The configuration class

package com.example.topic_customer.config; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.RedeliveryPolicy; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.stereotype.Component; import org.springframework.web.socket.server.standard.ServerEndpointExporter; import javax.jms.ConnectionFactory; import javax.jms.Topic; /** * @date 2019/11/13 10:22 * @desc Consumer config */ @configuration public class BeanConfig {@value ("${myTopic}") private String myTopic; /** * websocket configuration ** @return
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

    @Bean
    public Topic topic() {
        return new ActiveMQTopic(myTopic);
    }

    public RedeliveryPolicy redeliveryPolicy() { RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); / / whether in each attempt to send failed, increase the waiting time redeliveryPolicy. SetUseExponentialBackOff (true); / / retransmission times, defaults to 6 times, here is set to 10, 1 said unlimited redeliveryPolicy. SetMaximumRedeliveries (1); / / retransmission time interval, the default is 1 ms, is set to 10000 milliseconds redeliveryPolicy. SetInitialRedeliveryDelay (10000); UseExponentialBackOff(true) fortrueEffective when/before/after the first failure to send wait for 10000 milliseconds, the second failure to wait for 10000 * 2 milliseconds / / third double 10000 * 2 * 2, and so on redeliveryPolicy. SetBackOffMultiplier (2); / / to avoid collision message redeliveryPolicy. SetUseCollisionAvoidance (true); // Set the maximum delay time for retransmissions to 360000 milliseconds.true) fortrueEffective when redeliveryPolicy. SetMaximumRedeliveryDelay (360000);return redeliveryPolicy;
    }

    public ConnectionFactory connectionFactory() { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); / / set properties resend connectionFactory. SetRedeliveryPolicy (redeliveryPolicy ());returnconnectionFactory; } /** * JMS queue listener container factory */ @bean (name ="jmsTopicListener")
    public DefaultJmsListenerContainerFactory jmsTopicListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setPubSubDomain(true);
        factory.setSessionTransacted(true);
        factory.setAutoStartup(true); / / open the persistence to subscribe to the factory. SetSubscriptionDurable (true); / / reconnection interval factory. SetRecoveryInterval (1000 l); factory.setClientId("topic_provider:zb1");
        returnfactory; }}Copy the code

There are two main points for setting up consumer persistence:

1. / / open persistence to subscribe to the factory. SetSubscriptionDurable (true); 2. The factory. SetClientId (” topic_provider: zb1 “); // This can be set arbitrarily

TopicCustomer class

package com.example.topic_customer.customer;

import lombok.Data;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.TextMessage;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * @Date 2019/11/13  13:31
 * @Desc
 */
@Component
@ServerEndpoint("/websocket") @data public class TopicCustomer {/** * private javax.websocket.session session; /** * thread-safe classes under the J.U.C package, Private static CopyOnWriteArraySet<TopicCustomer> CopyOnWriteArraySet = new */ Private static CopyOnWriteArraySet<TopicCustomer> CopyOnWriteArraySet = new CopyOnWriteArraySet<>(); @OnOpen public void onOpen(javax.websocket.Session session) { this.session = session; copyOnWriteArraySet.add(this); } @OnClose public voidonClose() {
        copyOnWriteArraySet.remove(this);
    }

    @OnMessage
    public void onMessage(String message) {
    }

    @OnError
    public void onError(javax.websocket.Session session, Throwable error) {
        error.printStackTrace();
    }

    @JmsListener(destination = "${myTopic}", containerFactory = "jmsTopicListener") public void Receive (TextMessage TextMessage, Javax.jms.session Session) throws JMSException {// Pass through the clientfor(TopicCustomer webSocket : CopyOnWriteArraySet) {try {/ / active push webSocket server. The session. GetBasicRemote () sendText (textMessage. The getText ()); System.out.println("-- Received topic persistent message --" + textMessage.getText());
            } catch (Exception e) {
                System.out.println("----- test rerun -----"); session.rollback(); // This cannot be omitted to resend the message using}}}}Copy the code

Start the class

package com.example.topic_customer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class TopicCustomerApplication { public static void main(String[] args) { SpringApplication.run(TopicCustomerApplication.class, args); }}Copy the code

A screenshot of MQ after a successful consumer startup:



Producer engineering

Producer Engineering Catalogue



Yml configuration file

 server:
  port: 8084
spring:
  activemq:
    broker-url: tcp://localhost:61616
    user: admin
    password: admin
  jms:
    pub-sub-domain: true

myTopic: boot_actviemq_topicCopy the code

The configuration class

package com.example.topicprovider.config; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.RedeliveryPolicy; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.stereotype.Component; import javax.jms.ConnectionFactory; import javax.jms.Topic; /** * @date 2019/11/13 10:22 * @desc @component public class BeanConfig {@value ()"${myTopic}")
    private String myTopic;

    public RedeliveryPolicy redeliveryPolicy(){ RedeliveryPolicy redeliveryPolicy= new RedeliveryPolicy(); / / whether in each attempt to send failed, increase the waiting time redeliveryPolicy. SetUseExponentialBackOff (true); / / retransmission times, defaults to 6 times, here is set to 10, 1 said unlimited redeliveryPolicy. SetMaximumRedeliveries (1); / / retransmission time interval, the default is 1 ms, is set to 10000 milliseconds redeliveryPolicy. SetInitialRedeliveryDelay (10000); UseExponentialBackOff(true) fortrueEffective when/before/after the first failure to send wait for 10000 milliseconds, the second failure to wait for 10000 * 2 milliseconds / / third double 10000 * 2 * 2, and so on redeliveryPolicy. SetBackOffMultiplier (2); / / to avoid collision message redeliveryPolicy. SetUseCollisionAvoidance (true); // Set the maximum delay time for retransmissions to 360000 milliseconds.true) fortrueEffective when redeliveryPolicy. SetMaximumRedeliveryDelay (360000);return redeliveryPolicy;
    }

    @Bean
    public Topic topic() {
        return new ActiveMQTopic(myTopic);
    }

    public ConnectionFactory connectionFactory(){ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); / / set properties resend connectionFactory. SetRedeliveryPolicy (redeliveryPolicy ());returnconnectionFactory; } /** * JMS queue listener container factory */ @bean (name ="jmsTopicListener")
    public DefaultJmsListenerContainerFactory jmsTopicListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setPubSubDomain(true);
        factory.setSessionTransacted(true);
        factory.setAutoStartup(true); / / open the persistence to subscribe to the factory. SetSubscriptionDurable (true); / / reconnection interval factory. SetRecoveryInterval (1000 l);returnfactory; }}Copy the code

TopicProvider class

package com.example.topicprovider.topic_provider;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.jms.Topic;
import java.util.UUID;

/**
 * @Date 2019/11/13  10:25
 * @Desc
 */
@Component
public class TopicProvider {
    @Autowired
    private Topic topic;
    @Autowired
    private JmsTemplate jmsTemplate;

    @Scheduled(fixedDelay = 10000)
    private void produceMsg() {
        jmsTemplate.convertAndSend(topic, "Theme Producer" + UUID.randomUUID().toString().substring(1, 7));
        System.out.println( jmsTemplate.getDeliveryMode());
        System.out.println("Topic Producer 1"); }}Copy the code

Start the class

package com.example.topicprovider; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @EnableScheduling public class TopicProviderApplication { public static void main(String[] args) { SpringApplication.run(TopicProviderApplication.class, args); }}Copy the code

Results after successful startup:

The last

Like can pay attention to my public number: Java small melon brother sharing platform. Thanks for your support!