Welcome to xiaobaiai blog wechat official account xiaobai.net

[TOC]

1 introduction

Spring Integration provides inbound and outbound channel adapters to support the MQTT messaging protocol. To use both adapters, you need to add dependencies:

<! -- Maven --> <dependency> <groupId>org.springframework.integration</groupId> < artifactId > spring integration - the MQTT < / artifactId > < version > 5.2.1. RELEASE < / version > < / dependency >Copy the code

/ / Gradle compile "org. Springframework. Integration: spring integration - the MQTT: 5.2.1. RELEASE"Copy the code

The current MQTT Integration implementation uses the Eclipse Paho MQTT client library. Two adapter configuration is implemented using DefaultMqttPahoClientFactory. For more information about configuration options, see the Eclipse Paho MQTT documentation definitions.

It is recommended to configure MqttConnectOptions objects and inject them into the factory, rather than setting (not recommended) MQTT connection options in the factory itself.

2 Inbound (message-driven) channel adapter

Inbound channel adapter by MqttPahoMessageDrivenChannelAdapter implementation. Common configuration items are as follows:

  • The client ID
  • MQTT Broker URL
  • List of topics to subscribe to
  • List of subject QoS values with subscription
  • MqttMessageConverter(Optional). By default, by defaultDefaultPaHomeMessageConverterGenerates a message with a string payload with the following header:
    • mqtt_topic: Subject to receive messages
    • mqtt_duplicate: If the message is repeatedtrue
    • mqtt_qos: Quality of service, you canDefaultPahoMessageConverterDeclared asAnd will bepayloadAsBytesProperty set totrue, which willDefaultPahoMessageConverterReturns the raw in the payloadbyte[]
  • Client factory
  • Send timeout. This applies only if the channel is likely to block, such as a bounded queue channel that is currently full.
  • Wrong channel. Downstream exceptions are sent to this channel as error messages if provided. The payload contains the failure message and the causeMessagingException.
  • Restore the interval. It controls the interval at which the adapter tries to reconnect after a failure. The default value is 10000 milliseconds (10 seconds).

Starting with Spring 4.1, you can omit urls. On the contrary, you can be in DefaultMqttPahoClientFactory server URIs, attribute provides the server URIs. For example, this allows you to connect to a high availability (HA) cluster.

Starting with Spring 4.2.2, the MqttSubscribedEvent event is emitted when the adapter has successfully subscribed to a topic. When the connection fails or subscribe to failure, MqttConnectionFailedEvent event will be triggered. Both of these events can be received by a Bean by implementing ApplicationListener. In addition, a new property called recoveryInterval controls how long the adapter tries to reconnect after a failure. The default value is 10000 milliseconds (10 seconds).

@Component public class MQTTSubscribedListener implements ApplicationListener<MqttSubscribedEvent> { private static final Logger LOGGER = LogManager.getLogger(MQTTSubscribedListener.class); @Override public void onApplicationEvent(MqttSubscribedEvent event) { LOGGER.debug("Subscribed Success: " + event.getMessage()); }}Copy the code

Prior to Spring 4.2.3, the client always unsubscribed when the adapter stopped. This is incorrect, because if the client QOS is greater than 0, we need to keep the subscription active to deliver the message that arrived when the adapter stopped on the next startup. This also requires that the cleanSession property on the client factory be set to false. The default is true. As of version 4.2.3, the adapter will not unsubscribe (by default) if the cleanSession attribute is false. This default behavior can be overridden by setting the consumerCloseAction attribute on the factory. It can have the following values: UNSUBSCRIBE_ALWAYS, UNSUBSCRIBE_NEVER, and UNSUBSCRIBE_CLEAN, the last of which (the default) is unsubscribed only when the cleanSession attribute is true. To revert to the behavior before 4.2.3, always use the “Unsubscribe” setting item.

Note: As of Spring 5.0, topic, qos, and retained attributes are mapped to. RECEIVED_… Headers (MqttHeaders. RECEIVEDtopic, MqttHeaders. RECEIVEDqos and MqttHeaders RECEIVED_retained), To avoid accidental propagation to (by default) outbound messages using MQttheanders. Topic, MQttheanders. Qos, and MQttheanders.

public MessageHandler handler() {
    return new MessageHandler() {
        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            LOGGER.debug("===Received Msg(topic {}): {}", message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC), message.getPayload());
        }
    };
}Copy the code

2.1 Add and remove topics at run time

Starting with Spring4.1, you can programmatically change the topic to which the adapter is subscribed. Spring Integration provides addTopic() and removeTopic() methods. When adding a topic, you can specify the QoS value (1 by default). You can also modify the topic by sending the appropriate message to the one with the appropriate payload. Example:

myMqttAdapter.addTopic('foo', 1)Copy the code

Stopping and starting the adapter has no effect on the Topics setting item (it does not revert to the original Settings in the configuration). These changes do not survive beyond the life cycle of the application context. The new application context is restored to the configured Settings.

Changing the list of topics when the adapter stops (or disconnects from the agent) will take effect the next time the connection is established.

2.2 Using Java Configuration

The following Spring Boot application shows an example of how to configure an inbound adapter using Java configuration:

@SpringBootApplication public class MqttJavaApplication { public static void main(String[] args) { new SpringApplicationBuilder(MqttJavaApplication.class) .web(false) .run(args); } @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } @Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient", "topic1", "topic2"); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter; } @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<? > message) throws MessagingException { System.out.println(message.getPayload()); }}; }}Copy the code

2.3 Using Java DSL Configuration

The following Spring Boot application provides an example of configuring an inbound adapter using Java DSL:

@SpringBootApplication public class MqttJavaApplication { public static void main(String[] args) { new SpringApplicationBuilder(MqttJavaApplication.class) .web(false) .run(args); } @Bean public IntegrationFlow mqttInbound() { return IntegrationFlows.from( new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient", "topic1", "topic2");) .handle(m -> System.out.println(m.getPayload())) .get(); }}Copy the code

3 Outbound channel adapter

The outbound channel adapter is implemented by the MqttPahoMessageHandler, which is wrapped in the ConsumerEndpoint. For convenience, you can configure it using namespaces.

As of Spring 4.1, the adapter supports asynchronous send operations to avoid blocking until delivery is confirmed. If needed, application events can be emitted to cause the application to confirm delivery.

The following list shows the properties available for the outbound channel adapter:

<int-mqtt:outbound-channel-adapter id="withConverter" client-id="foo" url="tcp://localhost:1883" converter="myConverter"  client-factory="clientFactory" default-qos="1" qos-expression="" default-retained="true" retained-expression="" default-topic="bar" topic-expression="" async="false" async-events="false" channel="target" />Copy the code

  • MQTT Client ID
  • MQTT Broker URL
  • Converter (MqttMessageConver, optional), defaultDefaultPaHomeMessageConverterThe following headings can be recognized:
    • mqtt_topic: The topic to which the message will be sent
    • mqtt_retained: True if you want to preserve the message
    • mqtt_qos: Message service quality
  • Client factory
  • Default-qos: indicates the default quality of service. If the MQTT_qos header is not found or the qos expression returns a null value, it is used. If a custom converter is provided, it is not used.
  • An expression used to calculate to determine qos. The default value isheaders[mqtt_qos].
  • Leave the default values for flags. If you don’t find itmqtt_retainedHeader, then use it. If a custom converter is provided, it is not used.
  • Evaluate to determine the expression that preserves a Boolean value. The default isheaders[mqtt_retained]
  • Default topic to which messages are sent (if not foundmqtt_topicHead, then use)
  • The expression to evaluate to determine the target topic. The default isheaders['mqtt_topic']
  • Async does not block if true. Instead, you wait for delivery confirmation when sending a message. Default is false (sending will block until sending is confirmed)
  • Async-events, emitted when both async and async events are trueMqttMessageSentEvent. It contains the message, topic, message ID generated by the client library, clientId, and clientInstance (incremented each time a client is connected). Emitted when the client library confirms deliveryMqttMessageDeliveredEvent. It contains messageId, clientId, and clientInstance to make delivery related to sending. anyApplicationListenerOr event inbound channel adapters can receive these events. Please note that,MqttMessageDeliveredEventMay be inMqttMessageSentEventReceived earlier. The default value is false

Note that, again, starting with Spring 4.1, you can omit urls. Instead, you can in DefaultMqttPahoClientFactor server URIs, attributes provide server URIs. For example, this allows you to connect to a high availability (HA) cluster.

3.1 Using Java Configuration

The following Spring Boot application shows an example of how to configure an outbound adapter using Java configuration:

@SpringBootApplication @IntegrationComponentScan public class MqttJavaApplication { public static void main(String[] args) { ConfigurableApplicationContext context = new SpringApplicationBuilder(MqttJavaApplication.class) .web(false) .run(args); MyGateway gateway = context.getBean(MyGateway.class); gateway.sendToMqtt("foo"); } @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" }); options.setUserName("username"); options.setPassword("password".toCharArray()); factory.setConnectionOptions(options); return factory; } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("testClient", mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic("testTopic"); return messageHandler; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MyGateway { void sendToMqtt(String data); }}Copy the code

3.2 Using Java DSL configuration

The following Spring Boot application provides an example of configuring an outbound adapter using Java DSL:

@SpringBootApplication public class MqttJavaApplication { public static void main(String[] args) { new SpringApplicationBuilder(MqttJavaApplication.class) .web(false) .run(args); } @Bean public IntegrationFlow mqttOutboundFlow() { return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient")); }}Copy the code

4 Reference Materials

  • https://docs.spring.io/spring-integration/reference/html/mqtt.html
  • https://my.oschina.net/chkui/blog/1838801 Spring core – the context and the IoC
  • https://my.oschina.net/u/923324/blog/832269 factory pattern, annotation

Please follow CSDNfreeape or xiaobaiai’s wechat official account or xiaobai.net