In the previous article, “How Spring Cloud Stream Handles message re-consumption,” we addressed a common starting point for message re-consumption in multi-instance deployments by configuring consumption groups. This article continues with another oft-asked question: how can microservices produce messages that themselves want to consume?

Common mistakes

Before Posting the standard answer, post a common error pose and alarm information (so that you can find it here through the search engine). The following error is based on Spring Boot 2.0.5, Spring Cloud Finchley SR1.

First, according to the introductory example, in order to produce and consume messages, you need to define two channels: one input and one output. Like this:

public interface TestTopic {

    String OUTPUT = "example-topic";
    String INPUT = "example-topic";

    @Output(OUTPUT)
    MessageChannel output(a);

    @Input(INPUT)
    SubscribableChannel input(a);

}
Copy the code

Consume the messages it emits by using the same name for INPUT and OUTPUT, so that the production and consumption messages point to the same Topic.

Next, create an HTTP interface and produce messages through the output channel contacts defined above, such as:

@Slf4j
@RestController
public class TestController {

    @Autowired
    private TestTopic testTopic;

    @GetMapping("/sendMessage")
    public String messageWithMQ(@RequestParam String message) {
        testTopic.output().send(MessageBuilder.withPayload(message).build());
        return "ok"; }}Copy the code

Now that you have a production message implementation, let’s create a listener on the input channel to implement the message consumption logic.

@Slf4j
@Component
public class TestListener {

    @StreamListener(TestTopic.INPUT)
    public void receive(String payload) {
        log.info("Received: " + payload);
        throw new RuntimeException("BOOM!"); }}Copy the code

Finally, in the application main class, use the @enableBinding annotation to enable it, such as:

@EnableBinding(TestTopic.class)
@SpringBootApplication
public class TestApplication {

    public static void main(String[] args) { SpringApplication.run(TestApplication.class, args); }}Copy the code

This may seem like a seamless operation, but at startup, you may receive the following error:

org.springframework.beans.factory.BeanDefinitionStoreException: Invalid bean definition with name 'example-topic' defined in com.didispace.stream.TestTopic: bean definition with this name already exists - Root bean: class [null]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=com.didispace.stream.TestTopic; factoryMethodName=input; initMethodName=null; destroyMethodName=null at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerBindingTargetBeanDefinition(BindingB EanDefinitionRegistryUtils. Java: 64) ~ [spring - cloud - stream - 2.0.1. The jar: 2.0.1. RELEASE] the at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerOutputBindingTargetBeanDefinition(Bi NdingBeanDefinitionRegistryUtils. Java: 54) ~ [spring - cloud - stream - 2.0.1. The jar: 2.0.1. RELEASE] the at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.lambda$registerBindingTargetBeanDefinitions$0(BindingBeanDefinitionRegistryUtils. Java: 86) ~ [spring -- cloud - stream - 2.0.1. The jar: 2.0.1. RELEASE] the at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:562) ~ [spring - core - 5.0.9. RELEASE. The jar: 5.0.9. RELEASE] the at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:541) ~ [spring - core - 5.0.9. RELEASE. The jar: 5.0.9. RELEASE] the at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerBindingTargetBeanDefinitions(Binding BeanDefinitionRegistryUtils. Java: 76) ~ [spring - cloud - stream - 2.0.1. The jar: 2.0.1. RELEASE] the at org.springframework.cloud.stream.config.BindingBeansRegistrar.registerBeanDefinitions(BindingBeansRegistrar.java:45) ~ [spring - cloud - stream - 2.0.1. The jar: 2.0.1. RELEASE] the at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.lambda$loadBeanDefinitionsFromRegistrarsThe $1(ConfigurationClassBeanDefinitionReader. Java: 358) ~ [spring - the context - 5.0.9. The jar: 5.0.9. RELEASE] the at Java. Util. LinkedHashMap. ForEach (LinkedHashMap. Java: 684) ~ [na: 1.8.0 comes with _151] the at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsFromRegistrars(Configur AtionClassBeanDefinitionReader. Java: 357) ~ [spring - the context - 5.0.9. The jar: 5.0.9. RELEASE] the at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsForConfigurationClass(C OnfigurationClassBeanDefinitionReader. Java: 145) ~ [spring - the context - 5.0.9. The jar: 5.0.9. RELEASE] the at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitions(ConfigurationClassBean DefinitionReader. Java: 117) ~ [spring - the context - 5.0.9. The jar: 5.0.9. RELEASE] the at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPo StProcessor. Java: 328) ~ [spring - the context - 5.0.9. The jar: 5.0.9. RELEASE] the at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationCl AssPostProcessor. Java: 233) ~ [spring - the context - 5.0.9. The jar: 5.0.9. RELEASE] the at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanDefinitionRegistryPostProcessors(PostPro CessorRegistrationDelegate. Java: 271) ~ [spring - the context - 5.0.9. The jar: 5.0.9. RELEASE] the at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(PostProcessorRegis TrationDelegate. Java: 91) ~ [spring - the context - 5.0.9. The jar: 5.0.9. RELEASE] the at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContex T.j ava: 694) ~ [spring - the context - 5.0.9. The jar: 5.0.9. RELEASE] the at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:532) ~ [spring - the context - 5.0.9. RELEASE. The jar: 5.0.9. RELEASE] the at org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext.refresh(ReactiveWebServerApplicationCo Ntext. Java: 61) ~ [spring - the boot - at 2.0.5. The jar: at 2.0.5. RELEASE] the at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:780) [spring - the boot - at 2.0.5. RELEASE. The jar: at 2.0.5. RELEASE] the at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:412) [spring - the boot - at 2.0.5. RELEASE. The jar: at 2.0.5. RELEASE] the at Org. Springframework. Boot. SpringApplication. Run (SpringApplication. Java: 333) [spring - the boot - at 2.0.5. The jar: at 2.0.5. RELEASE]  at org.springframework.boot.SpringApplication.run(SpringApplication.java:1277) [spring - the boot - at 2.0.5. RELEASE. The jar: at 2.0.5. RELEASE] the at org.springframework.boot.SpringApplication.run(SpringApplication.java:1265) [spring - the boot - at 2.0.5. RELEASE. The jar: at 2.0.5. RELEASE] at the didispace. Stream. TestApplication. Main (TestApplication. Java: 13) [classes/:na]Copy the code

Correct posture

According to the error: Invalid bean definition with the name ‘example – topic defined in com. Didispace. Stream. TestTopic: Definition with this name already exists. The definition with this name already exists because there is already a bean named example-topic.

In fact, in Version F of Spring Cloud Stream, when we define a message channel using the @Output and @input annotations, we create a Bean based on the channel name passed in. In the above example, the @output and @input names are defined the same, because the Input and Output of our system are the same Topic, so as to realize the consumption of the message produced by ourselves.

Since we can no longer define the same channel name, we can only point the pair of inputs and outputs to the same actual Topic by defining different channel names and configuring the same target Topic for both channels. For the above error program, only two changes need to be made:

Step 1: Change the channel name to a different one

public interface TestTopic {

    String OUTPUT = "example-topic-output";
    String INPUT = "example-topic-input";

    @Output(OUTPUT)
    MessageChannel output(a);

    @Input(INPUT)
    SubscribableChannel input(a);

}
Copy the code

Step 2: In the configuration file, set the same Topic name for both channels, for example:

spring.cloud.stream.bindings.example-topic-input.destination=aaa-topic
spring.cloud.stream.bindings.example-topic-output.destination=aaa-topic
Copy the code

Thus, both input and output channels point to a topic named AAA-Topic.

Finally, restart the program, no error. Then access interface: localhost: 8080 / sendMessage? Message =hello-didi, you can see the following information in the console:

The 2018-11-17 23:24:10. 32039-425 the INFO [ctor - HTTP - nio - 2] O.S.A.R.C.C achingConnectionFactory: Attempting to connect to: [localhost: 5672] the 2018-11-17 23:24:10. 453 INFO - 32039 [ctor - HTTP - nio - 2] O.S.A.R.C.C achingConnectionFactory: Created new connection: rabbitConnectionFactory.publisher# 266753 da: 0 / SimpleConnection @ 627 fba83 [delegate = it: / / [email protected]:5672 /, localPort = 60752]The 2018-11-17 23:24:10. 32039-458 the INFO [ctor - HTTP - nio - 2] O.S.A MQP. Rabbit. Core. RabbitAdmin: Auto-declaring a non-durable, auto-delete, or exclusive Queue (aaa-topic.anonymous.fNUxZ8C0QIafxrhkFBFI1A) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted whilethe connection factory is alive, 2018-11-17 23:24:10.483 INFO 32039 -- [iAFxrHKFbFI1A-1] com.didispace.stream.TestListener : Received: hello-didiCopy the code

Consuming your own production message succeeds! Readers can also access the /actuator/ Beans endpoints of the application to see which beans are available in the current Spring context. You should see the following beans, which are the Bean objects for the two channels analyzed above

"example-topic-output": {
    "aliases": [],
    "scope": "singleton",
    "type": "org.springframework.integration.channel.DirectChannel",
    "resource": null,
    "dependencies": []
},
"example-topic-input": {
    "aliases": [],
    "scope": "singleton",
    "type": "org.springframework.integration.channel.DirectChannel",
    "resource": null,
    "dependencies": []
},          
Copy the code

Afterword.

Most of the problems developers encounter with Spring Cloud Stream stem from a lack of understanding of the core concepts of Spring Cloud Stream. Therefore, it is recommended to read the following articles and examples:

  • An introduction to the sample
  • The core concept
  • Consumer groups
  • Consumption of partition

Code sample

Readers of this article’s sample can check out the stream-consumer-self project in the following repository:

  • Github
  • Gitee

If you are interested in these, welcome to star, follow, favorites, forward to give support!

The following tutorials may be of interest to you

  • Spring Boot Basics tutorial
  • Spring Cloud Basics tutorial