1. Add the dependency first

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2. Configure the parameters in the configuration file

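spring:
  rabbitmq:
    host: 192.168.10.233
    port: 5672
    username: admin
    password: admin
    virtual-host: My-project-dev
  cloud:
    stream:
      bindings:
        # Channel name (matches MyStreamChannel.OUTPUT)
        myOutput:
          # Name of the exchange the messages are sent to
          destination: streamExchange
        # Consumer binding (assumed here: it has to point at the same exchange to receive the messages)
        myInput:
          destination: streamExchange
          # Use groups to avoid repeated consumption of messages, and implement message persistence
          group: group1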
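The effect of the group setting above can be illustrated with a small in-memory model. This is only a sketch, not the Spring Cloud Stream or RabbitMQ API: the classes Exchange, Group and Instance are hypothetical, and round-robin delivery inside a group is a simplifying assumption. It shows the idea that every group receives each message once, while instances inside the same group share the messages instead of each getting a copy.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of consumer groups; not the Spring Cloud Stream or RabbitMQ API. */
public class ConsumerGroupSketch {

    /** One consumer instance; it simply records what it receives. */
    static class Instance {
        final String name;
        final List<String> received = new ArrayList<>();

        Instance(String name) {
            this.name = name;
        }
    }

    /** A group shares one queue; its members take turns (round-robin, an assumption). */
    static class Group {
        final List<Instance> members = new ArrayList<>();
        private int next = 0;

        void deliver(String message) {
            if (members.isEmpty()) {
                return;
            }
            members.get(next).received.add(message);
            next = (next + 1) % members.size();
        }
    }

    /** The exchange hands every message to each group exactly once. */
    static class Exchange {
        final Map<String, Group> groups = new LinkedHashMap<>();

        Instance subscribe(String groupName, String instanceName) {
            Instance instance = new Instance(instanceName);
            groups.computeIfAbsent(groupName, g -> new Group()).members.add(instance);
            return instance;
        }

        void publish(String message) {
            for (Group group : groups.values()) {
                group.deliver(message);
            }
        }
    }

    public static void main(String[] args) {
        Exchange streamExchange = new Exchange();
        Instance a = streamExchange.subscribe("group1", "instance-a");
        Instance b = streamExchange.subscribe("group1", "instance-b");
        Instance audit = streamExchange.subscribe("group2", "audit");

        streamExchange.publish("m1");
        streamExchange.publish("m2");

        System.out.println(a.received);     // [m1]
        System.out.println(b.received);     // [m2]
        System.out.println(audit.received); // [m1, m2]
    }
}
```

In this model, two instances of the same service started with group1 split the messages between them, which is the "no repeated consumption" behaviour the configuration comment refers to.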

3. Implement the message producer

  • Build the message channel interface
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;

@Component
public interface MyStreamChannel {

    String OUTPUT = "myOutput";
    String INPUT = "myInput";

    /**
     * Message production
     *
     * @return the output channel
     */
    @Output(MyStreamChannel.OUTPUT)
    MessageChannel output();

    /**
     * Message consumption
     *
     * @return the input channel
     */
    @Input(MyStreamChannel.INPUT)
    SubscribableChannel input();
}
  • Create the message-sending interface
public interface StreamService {

    /**
     * Send a message
     */
    void sendMsg();
}
  • Implement the message-sending logic
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * Enable the binder and bind the channel
 */
@Component
@EnableBinding(MyStreamChannel.class)
public class StreamServiceImpl implements StreamService {

    @Autowired
    private MyStreamChannel myStreamChannel;

    @Override
    public void sendMsg() {
        AppUser user = new AppUser();
        user.setId("1001");
        user.setNickname("Tom");

        // Send the message to MQ
        myStreamChannel.output().send(MessageBuilder.withPayload(user).build());
    }
}

The @EnableBinding(MyStreamChannel.class) annotation is required here to bind the class to the corresponding channel.

The Message parameter of the send() method is generic, so a payload of any type can be sent.
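The AppUser class used as the payload is not shown in this article. A minimal sketch of what it might look like follows, assuming it is a plain Java bean with two String fields; the field names are inferred only from the setId, setNickname, getId and getNickname calls in the producer and consumer code. The no-argument constructor and the getters and setters are kept on the assumption that the payload is converted to and from JSON on its way through the broker.

```java
/**
 * Hypothetical payload class: the article does not show AppUser,
 * so the fields here are inferred from the setters and getters it calls.
 */
public class AppUser {

    private String id;
    private String nickname;

    /** No-argument constructor, kept so a JSON converter can instantiate the class. */
    public AppUser() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getNickname() {
        return nickname;
    }

    public void setNickname(String nickname) {
        this.nickname = nickname;
    }

    @Override
    public String toString() {
        return "AppUser{id='" + id + "', nickname='" + nickname + "'}";
    }

    public static void main(String[] args) {
        // Same values the producer sends
        AppUser user = new AppUser();
        user.setId("1001");
        user.setNickname("Tom");
        System.out.println(user); // AppUser{id='1001', nickname='Tom'}
    }
}
```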

4. Implement the message consumer

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;

/**
 * Message consumer
 */
@Component
@EnableBinding(MyStreamChannel.class)
public class MyStreamConsumer {

    /**
     * Handles messages arriving on the input channel.
     *
     * @param user {@link AppUser}
     */
    @StreamListener(MyStreamChannel.INPUT)
    public void receiveMsg(AppUser user) {
        // Implement specific business logic
        System.out.println(user.getId());
        System.out.println(user.getNickname());
    }
}

The @EnableBinding(MyStreamChannel.class) annotation is also required here to bind the consumer to the corresponding channel.

5. Test it through an HTTP endpoint

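import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/hello")
public class HelloController {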

    @Autowired
    private StreamService streamService;

    @GetMapping("/stream")
    public String stream() {
        streamService.sendMsg();
        return "send ok!";
    }

}