Background

Spring Cloud Stream annotations such as @EnableBinding and @Output have been marked as deprecated since version 3.1:

/**
 * Indicates that an output binding target will be created by the framework.
 *
 * @author Dave Syer
 * @author Marius Bogoevici
 * @author Artem Bilan
 *
 * @deprecated As of 3.1 in favor of functional programming model
 */
@Qualifier
@Target({ ElementType.FIELD, ElementType.METHOD, ElementType.ANNOTATION_TYPE, ElementType.PARAMETER })
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
@Deprecated
public @interface Output {

	/**
	 * Specify the binding target name; used as a bean name for binding target and as a
	 * destination name by default.
	 * @return the binding target name
	 */
	String value() default "";

}
/**
 * Enables the binding of targets annotated with {@link Input} and {@link Output} to a
 * broker, according to the list of interfaces passed as value to the annotation.
 *
 * @author Dave Syer
 * @author Marius Bogoevici
 * @author David Turanski
 * @author Soby Chacko
 *
 * @deprecated As of 3.1 in favor of functional programming model
 */
@Target({ ElementType.TYPE, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Configuration
@Import({ BindingBeansRegistrar.class, BinderFactoryAutoConfiguration.class })
@EnableIntegration
@Deprecated
public @interface EnableBinding {

	/**
	 * A list of interfaces having methods annotated with {@link Input} and/or
	 * {@link Output} to indicate binding targets.
	 * @return list of interfaces
	 */
	Class<?>[] value() default {};

}

Related articles

Producing and Consuming Messages

@EnableBinding @deprecated as of 3.1 in favor of functional programming model

The type EnableBinding is deprecated

Therefore, when upgrading to a newer version, the official recommendation is to replace every API that relies on the annotation-based (imperative) programming model with the simpler functional API.

Complete code sample

FJiayang/spring-cloud-stream-rabbit-example

How to upgrade

Imperative programming model

Taking a simple producer and consumer as an example, the original code looked like this:

Producer

@SpringBootApplication
public class ProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}

/**
 * @author FJiayang
 * @date 2018-10-08 17:57
 */
@RestController
@EnableBinding(MySource.class)
public class Producer {

    @Autowired
    private MySource channel;

    // Sends the current time to the "output" binding on every request.
    @RequestMapping("/send")
    public String send() {
        channel.output().send(MessageBuilder.withPayload(new Date()).build());
        return "success";
    }
}

/**
 * @author FJiayang
 * @date 2018-10-08 18:01
 */
public interface MySource {
    String OUTPUT = "output";

    @Output(MySource.OUTPUT)
    MessageChannel output();
}

The configuration file

spring:
  rabbitmq:
    host: 192.168.163.128
    username: cms
    password: cms-mq-admin
  cloud:
    stream:
      bindings:
        output:
          destination: my-test-channel
server:
  port: 8082

Consumer

@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

/**
 * @author FJiayang
 * @date 2018-10-08
 */
@EnableBinding(MySink.class)
public class Consumer {

    // Invoked for every message that arrives on the "input" binding.
    @StreamListener(MySink.INPUT)
    public void receive(Message<String> message) {
        System.out.println("Received MQ message: " + message.getPayload());
    }
}

/**
 * @author FJiayang
 * @date 2018-10-08 18:07
 */
public interface MySink {
    String INPUT = "input";

    @Input(MySink.INPUT)
    SubscribableChannel input();
}

The configuration file

spring:
  rabbitmq:
    host: 192.168.163.128
    username: cms
    password: cms-mq-admin
  cloud:
    stream:
      bindings:
        input:
          destination: my-test-channel
server:
  port: 8081

Functional programming model

To move from the imperative model to the functional model, first upgrade Spring Cloud Stream to 3.1 or later. This example uses the Spring Cloud 2020.0.1 release train, the latest at the time of writing, together with Spring Boot 2.4.2:


      
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>top.fjy8018</groupId>
    <artifactId>cloud-stream</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>pom</packaging>

    <name>cloud-stream</name>
    <description>Demo project for Spring Boot</description>

    <modules>
        <module>producer</module>
        <module>consumer</module>
    </modules>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <spring-cloud.version>2020.0.1</spring-cloud.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>

At present there are few clear upgrade guides available in Chinese, so the official documentation is the main reference:

Programming Model

Model 1 – Messages driven by a system timer

Official documentation

Suppliers (Sources)

Function and Consumer are pretty straightforward when it comes to how their invocation is triggered. They are triggered based on data (events) sent to the destination they are bound to. In other words, they are classic event-driven components.

However, Supplier is in its own category when it comes to triggering. Since it is, by definition, the source (the origin) of the data, it does not subscribe to any in-bound destination and, therefore, has to be triggered by some other mechanism(s). There is also a question of Supplier implementation, which could be imperative or reactive and which directly relates to the triggering of such suppliers.

Consider the following sample:

@SpringBootApplication
public static class SupplierConfiguration {

	@Bean
	public Supplier<String> stringSupplier() {
		return () -> "Hello from Supplier";
	}
}

The preceding Supplier bean produces a string whenever its get() method is invoked. However, who invokes this method and how often? The framework provides a default polling mechanism (answering the question of “Who?”) that will trigger the invocation of the supplier and by default it will do so every second (answering the question of “How often?”). In other words, the above configuration produces a single message every second and each message is sent to an output destination that is exposed by the binder.

In short, under the functional model the framework polls a Supplier bean, by default once per second, and sends each returned value to the output destination. The polling interval can be configured.
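The polling behaviour described above can be modelled in plain Java. The following is only a sketch of the idea, not the framework's implementation: the class name PollingSketch and the method pollNTimes are hypothetical, and a ScheduledExecutorService stands in for the framework's poller. For the real setting, the 3.1 reference documentation lists a spring.cloud.stream.poller.fixed-delay property with a default of 1000 ms; verify this against the version in use.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Plain-Java model of a polled Supplier; this is NOT Spring Cloud Stream code.
final class PollingSketch {

    // Invokes supplier.get() every periodMillis until `count` values are collected.
    static <T> List<T> pollNTimes(Supplier<T> supplier, long periodMillis, int count) {
        List<T> collected = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(count);
        ScheduledExecutorService poller = Executors.newSingleThreadScheduledExecutor();
        poller.scheduleWithFixedDelay(() -> {
            if (done.getCount() > 0) {
                // Stands in for "send the value to the output binding".
                collected.add(supplier.get());
                done.countDown();
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            poller.shutdownNow();
        }
        return collected;
    }

    public static void main(String[] args) {
        // 1000 ms mirrors the documented default of one invocation per second.
        List<String> messages = pollNTimes(() -> "Hello from Supplier", 1000, 3);
        messages.forEach(System.out::println);
    }
}
```

The point of the sketch is that the Supplier itself contains no timing logic; whoever polls it decides how often a message is produced.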

The upgrade is as follows

Producer

@SpringBootApplication
public class ProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

    // Polled by the framework; each returned Date is sent to source1-out-0.
    @Bean
    public Supplier<Date> source1() {
        return () -> new Date();
    }
}

With the functional model there is no need to declare channels or listeners in code. Binding names are derived by convention and only have to be mapped to destinations in the configuration file.

Official documentation

Functional binding names

Unlike the explicit naming required by annotation-based support (legacy) used in the previous versions of spring-cloud-stream, the functional programming model defaults to a simple convention when it comes to binding names, thus greatly simplifying application configuration. Let’s look at the first example:

@SpringBootApplication
public class SampleApplication {

	@Bean
	public Function<String, String> uppercase() {
	    return value -> value.toUpperCase();
	}
}

In the preceding example we have an application with a single function which acts as message handler. As a Function it has an input and output. The naming convention used to name input and output bindings is as follows:

  • input – <functionName> + -in- + <index>
  • output – <functionName> + -out- + <index>

The in and out corresponds to the type of binding (such as input or output). The index is the index of the input or output binding. It is always 0 for typical single input/output function, so it’s only relevant for Functions with multiple input and output arguments.

So if for example you would want to map the input of this function to a remote destination (e.g., topic, queue etc) called “my-topic” you would do so with the following property:

--spring.cloud.stream.bindings.uppercase-in-0.destination=my-topic

Note how uppercase-in-0 is used as a segment in property name. The same goes for uppercase-out-0.
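The naming convention quoted above is mechanical enough to express as a few lines of Java. This is an illustrative sketch only: the class BindingNames and its methods are hypothetical helpers, not part of Spring Cloud Stream, and only the name pattern and the property prefix come from the documentation.

```java
// Hypothetical helper; only the naming pattern comes from the documentation.
final class BindingNames {

    // <functionName> + -in- + <index>
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // <functionName> + -out- + <index>
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    // Property key that maps a binding to a remote destination.
    static String destinationProperty(String bindingName) {
        return "spring.cloud.stream.bindings." + bindingName + ".destination";
    }

    public static void main(String[] args) {
        System.out.println(destinationProperty(input("uppercase", 0)) + "=my-topic");
        System.out.println(destinationProperty(output("uppercase", 0)) + "=my-topic");
    }
}
```

For a bean named uppercase, the first line printed is the same property shown in the documentation excerpt.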

The configuration file

spring:
  rabbitmq:
    host: 192.168.163.128
    username: cms
    password: cms-mq-admin

  cloud:
    stream:
      bindings:
        source1-out-0:
          destination: test1
    function:
      definition: source1

server:
  port: 8083

As a result, the overall configuration is much simpler, because most of it follows the principle of convention over configuration. Note that spring.cloud.function sits at the same level as spring.cloud.stream, not underneath it.

Consumer

@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    // Bound to sink1-in-0; prints every Date it receives.
    @Bean
    public Consumer<Date> sink1() {
        return System.out::println;
    }
}

The configuration file follows the same pattern:

spring:
  rabbitmq:
    host: 192.168.163.128
    username: cms
    password: cms-mq-admin

  cloud:
    stream:
      bindings:
        sink1-in-0:
          destination: test1
    function:
      definition: sink1


server:
  port: 8081

Result

The producer sends a message every second

Model 2 – StreamBridge

In real production systems, however, messages are usually triggered by business events rather than by a timer, so the Supplier model above does not fit. For these cases Spring Cloud Stream provides StreamBridge.

Official Documentation

Sending arbitrary data to an output (e.g. Foreign event-driven sources)

There are cases where the actual source of data may be coming from the external (foreign) system that is not a binder. For example, the source of the data may be a classic REST endpoint. How do we bridge such source with the functional mechanism used by spring-cloud-stream?

Spring Cloud Stream provides two mechanisms, so let’s look at them in more details

Here, for both samples we’ll use a standard MVC endpoint method called delegateToSupplier bound to the root web context, delegating incoming requests to stream via two different mechanisms – imperative (via StreamBridge) and reactive (via EmitterProcessor).

In other words, the actual data source can be an external event, for example a request arriving at a REST endpoint.

The official sample

Using StreamBridge
@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class, "--spring.cloud.stream.source=toStream");
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		System.out.println("Sending " + body);
		streamBridge.send("toStream-out-0", body);
	}
}

Here we autowire a StreamBridge bean which allows us to send data to an output binding effectively bridging non-stream application with spring-cloud-stream. Note that preceding example does not have any source functions defined (e.g., Supplier bean) leaving the framework with no trigger to create source bindings, which would be typical for cases where configuration contains function beans. So to trigger the creation of source binding we use spring.cloud.stream.source property where you can declare the name of your sources. The provided name will be used as a trigger to create a source binding. So in the preceding example the name of the output binding will be toStream-out-0 which is consistent with the binding naming convention used by functions (see Binding and Binding names). You can use ; to signify multiple sources (e.g., --spring.cloud.stream.source=foo;bar)
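As a rough illustration of the rule in the excerpt, the sketch below derives output binding names from a spring.cloud.stream.source value. It is a model of the documented convention, not the framework's parser: the class SourceBindings and the method outputBindings are hypothetical, and trimming whitespace around each name is an assumption made here for robustness.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors the documented naming rule for declared sources.
final class SourceBindings {

    // "foo;bar" -> [foo-out-0, bar-out-0]
    static List<String> outputBindings(String sourceProperty) {
        List<String> names = new ArrayList<>();
        for (String source : sourceProperty.split(";")) {
            String trimmed = source.trim(); // assumption: surrounding spaces are ignored
            if (!trimmed.isEmpty()) {
                names.add(trimmed + "-out-0");
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(outputBindings("toStream")); // [toStream-out-0]
        System.out.println(outputBindings("foo;bar"));  // [foo-out-0, bar-out-0]
    }
}
```

The derived name is the one passed as the first argument to streamBridge.send in the official sample.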

The code below was adapted with reference to the official documentation.

Producer

The example goes straight to the multi-topic case, since most production applications also publish and subscribe to several topics.

@SpringBootApplication
public class ProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}

/**
 * @author FJiayang
 * @date 2018-10-08 17:57
 */
@RestController
public class Producer {

    @Autowired
    private StreamBridge streamBridge;

    // Sends a Date to the source1-out-0 binding.
    @RequestMapping("/send1")
    public String send1() {
        streamBridge.send("source1-out-0", new Date());
        return "success1";
    }

    // Sends an ISO-formatted timestamp string to the source2-out-0 binding.
    @RequestMapping("/send2")
    public String send2() {
        streamBridge.send("source2-out-0", LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
        return "success2";
    }
}

The configuration file

spring:
  rabbitmq:
    host: 192.168.133.128
    username: dev-user
    password: devpassword

  cloud:
    stream:
      bindings:
        source1-out-0:
          destination: test2
        source2-out-0:
          destination: test3
    function:
      definition: source1;source2


server:
  port: 8083

Consumer

@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    // Bound to sink1-in-0 (destination test2).
    @Bean
    public Consumer<Date> sink1() {
        return System.out::println;
    }

    // Bound to sink2-in-0 (destination test3).
    @Bean
    public Consumer<String> sink2() {
        return System.out::println;
    }
}

The configuration file

spring:
  rabbitmq:
    host: 192.168.133.128
    username: dev-user
    password: devpassword

  cloud:
    stream:
      bindings:
        sink1-in-0:
          destination: test2
        sink2-in-0:
          destination: test3
    function:
      definition: sink1;sink2

server:
  port: 8081

Result

Trigger /send1

Trigger /send2

Each message is routed to the correct consumer.
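To make the routing concrete, the bindings from the two configuration files can be modelled in plain Java. The sketch below is an in-memory stand-in for the broker and the binder, not StreamBridge or RabbitMQ; the class RoutingSketch and its methods are hypothetical. Only the binding names (source1-out-0, sink1-in-0 and so on) and the destinations (test2, test3) come from the configuration above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// In-memory model of binding-to-destination routing; NOT Spring Cloud Stream code.
final class RoutingSketch {

    private final Map<String, String> bindingToDestination = new HashMap<>();
    private final Map<String, List<Consumer<Object>>> subscribers = new HashMap<>();

    // Mirrors spring.cloud.stream.bindings.<bindingName>.destination=<destination>.
    void bind(String bindingName, String destination) {
        bindingToDestination.put(bindingName, destination);
    }

    // Registers a consumer on the destination behind an input binding.
    void subscribe(String inputBinding, Consumer<Object> consumer) {
        String destination = bindingToDestination.get(inputBinding);
        if (destination == null) {
            throw new IllegalArgumentException("Unknown binding: " + inputBinding);
        }
        subscribers.computeIfAbsent(destination, d -> new ArrayList<>()).add(consumer);
    }

    // Delivers the payload to every consumer on the output binding's destination.
    boolean send(String outputBinding, Object payload) {
        String destination = bindingToDestination.get(outputBinding);
        if (destination == null || !subscribers.containsKey(destination)) {
            return false;
        }
        subscribers.get(destination).forEach(c -> c.accept(payload));
        return true;
    }

    public static void main(String[] args) {
        RoutingSketch broker = new RoutingSketch();
        broker.bind("source1-out-0", "test2");
        broker.bind("source2-out-0", "test3");
        broker.bind("sink1-in-0", "test2");
        broker.bind("sink2-in-0", "test3");
        broker.subscribe("sink1-in-0", p -> System.out.println("sink1 received " + p));
        broker.subscribe("sink2-in-0", p -> System.out.println("sink2 received " + p));
        broker.send("source1-out-0", new java.util.Date()); // reaches sink1 only
        broker.send("source2-out-0", "hello");              // reaches sink2 only
    }
}
```

Producer and consumer never refer to each other directly; they meet only through the shared destination name, which is why changing a destination in the configuration file is enough to reroute messages.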

Conclusion

Overall, the functional programming model is simpler than the annotation-based imperative model. Combined with Spring Cloud's convention-over-configuration approach, it significantly reduces the amount of configuration and boilerplate code, and it is the direction the framework is heading.