sequence

This paper focuses on Reactive Streams processors

Processors classification

Processors are both Publisher and Subscriber. There are many implementations of processor in the Project Reactor, which are roughly classified as follows:

  • Direct (DirectProcessor and UnicastProcessor)

  • Synchronous (EmitterProcessor and ReplayProcessor)

  • Asynchronous (TopicProcessor and WorkQueueProcessor)

direct

DirectProcessor

It does not support backpressure, and if publisher publishes N data, an IllegalStateException is thrown if one of the subscriber requests is less than N.

    @Test
    public void testDirectProcessor(){
        DirectProcessor<Integer> directProcessor = DirectProcessor.create();
        Flux<Integer> flux = directProcessor
                .filter(e -> e % 2 == 0)
                .map(e -> e +1);
        flux.subscribe(new Subscriber<Integer>() {
            private Subscription s;
            @Override
            public void onSubscribe(Subscription s) {
                this.s = s;
//                s.request(2);
            }

            @Override
            public void onNext(Integer integer) {
                LOGGER.info("subscribe:{}".integer);
            }

            @Override
            public void onError(Throwable t) {
                LOGGER.error(t.getMessage(),t);
            }

            @Override
            public void onComplete() {}}); Intstream.range (1,20). ForEach (e -> {directProcessor. OnNext (e); }); directProcessor.onComplete(); directProcessor.blockLast(); }Copy the code

The output is as follows

16:00:11. 201. [the main] the DEBUG reactor. The util. Loggers$LoggerFactory- Using Slf4j logging framework 16:00:11, 216 [main] ERROR com. Example. The demo. ProcessorTest - Can't deliver value due to lack of requests
reactor.core.Exceptions$OverflowException: Can't deliver value due to lack of requests
	at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
	at reactor.core.publisher.DirectProcessor$DirectInner.onNext(DirectProcessor.java:304)
	at reactor.core.publisher.DirectProcessor.onNext(DirectProcessor.java:106)
	at com.example.demo.ProcessorTest.lambda$testDirectProcessorA $5(ProcessorTest.java:82)
	at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
	at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:557)
	at com.example.demo.ProcessorTest.testDirectProcessor(ProcessorTest.java:81)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.junit.runners.model.FrameworkMethodThe $1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunnerThe $1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.accessThe $000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Copy the code

UnicastProcessor

  • The backpressure feature is supported at the cost of one subscriber at most. The default value is unbounded. If the subscriber has not made a request after the data is released, the data will be cached.
  • If a bounded queue is set, when the buffer is full and the subscriber does not send enough requests, the processor will reject data push. In this scenario, the processor has a built-in callback that is fired whenever an element is rejected.
    @Test
    public void testUnicastProcessor() throws InterruptedException { UnicastProcessor<Integer> unicastProcessor = UnicastProcessor.create(Queues.<Integer>get(8).get()); Flux<Integer> flux = unicastProcessor .map(e -> e) .doOnError(e -> { LOGGER.error(e.getMessage(),e); }); Intstream.rangeclosed (1,12).foreach (e -> {logger.info ("emit:{}",e); unicastProcessor.onNext(e); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e1) { e1.printStackTrace(); }}); LOGGER.info("begin to sleep 7 seconds");
        TimeUnit.SECONDS.sleep(7);
        //UnicastProcessor allows only a single Subscriber
        flux.subscribe(e -> {
            LOGGER.info("flux subscriber:{}",e); }); unicastProcessor.onComplete(); TimeUnit.SECONDS.sleep(10); // unicastProcessor.blockLast(); //blockLast is also a subscriber}Copy the code

Output instance

16:31:04. 970. [the main] the DEBUG reactor. The util. Loggers$LoggerFactory- Using Slf4j logging framework 16:31:04. 977. [the main] INFO com example. Demo. ProcessorTest - emit: 990 [main] 1 16:31:05. INFO com. Example. Demo. ProcessorTest - emit: 2 16:31:06. 991 [main] INFO com. Example. Demo. ProcessorTest - emit: 3 16:31:07. [the main] INFO 994 com. Example. Demo. ProcessorTest - emit: 4 16:31:08. [the main] INFO com. 998 example. Demo. ProcessorTest 002 - emit: 5 16:31:10. [the main] INFO com. Example. Demo. ProcessorTest - emit: 6 16:31:11. 007 [main] INFO Com. Example. Demo. ProcessorTest - emit: 7 16:31:12. [the main] INFO com. 010 example. Demo. ProcessorTest - emit: 8 16:31:13. 014 [the main] INFO com. Example. Demo. ProcessorTest - emit: 9 16:31:14. The 029 [main] INFO com. Example. Demo. ProcessorTest - emit: 10 16:31:14. 030. [the main] the DEBUG reactor. The core. Publisher. Operators - onNextDropped: 10 16:31:15. [the main] INFO 034 com. Example. Demo. ProcessorTest - emit: 11 16:31:15, 034 [main] the DEBUG reactor.core.publisher.Operators - onNextDropped: 11 16:31:16. [the main] INFO 038 com. Example. Demo. ProcessorTest - emit: 12 16:31:16, 038 [main] the DEBUG reactor.core.publisher.Operators - onNextDropped: 12 16:31:17. [the main] INFO 043 com. Example. Demo. ProcessorTest - begin to sleep 7 seconds 16:31:24. [the main] 053 INFO Com. Example. Demo. ProcessorTest - flux subscriber: 1 16:31:24. 053 [main] INFO com. Example. Demo. ProcessorTest - flux The subscriber: 2 16:31:24. 053 [main] INFO com. Example. Demo. ProcessorTest - flux subscriber: 3 16:31:24. [the main] 053 INFO Com. Example. Demo. ProcessorTest - flux subscriber: 4 16:31:24. 053 [main] INFO com. Example. Demo. ProcessorTest - flux The subscriber: 5 16:31:24. 054 [main] INFO com. Example. The demo. ProcessorTest - flux subscriber: 6 16:31:24. [the main] 054 INFO Com. Example. Demo. ProcessorTest - flux subscriber: 7 16:31:24. 054 [main] INFO com. Example. The demo. ProcessorTest - flux The subscriber: 8 16:31:24. 058. [The main] ERROR com example. Demo. ProcessorTest - The receiver is overrun by more signals than expected (bounded queue...) reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
	at reactor.core.Exceptions.failWithOverflow(Exceptions.java:202)
	at reactor.core.publisher.UnicastProcessor.onNext(UnicastProcessor.java:330)
	at com.example.demo.ProcessorTest.lambda$testUnicastProcessor$8(ProcessorTest.java:108)
	at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
	at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:557)
Copy the code

synchronous

EmitterProcessor

  • It can support multiple subscribers and support backpressure for each subscriber. It can also subscribe to Publisher and synchronize the data for replay.
  • It has a bufferSize parameter, which is used for the period when there are no subscribers after the data is published, and onNext blocks until the data is consumed; When the first subscriber subscribes, it receives data from the buffer, and subsequent subscribers can only consume data published since the time they subscribed.
  • When all subscriber subscriptions are cancelled, the processor will empty the buffer and stop receiving new subscriptions.
	@Test
    public void testEmitterProcessor() throws InterruptedException { int bufferSize = 3; FluxProcessor<Integer, Integer> processor = EmitterProcessor. Create (bufferSize); FluxProcessor<Integer, Integer> processor = EmitterProcessor. Flux<Integer> flux1 = processor.map(e -> e); Flux<Integer> flux2 = processor.map(e -> e*10); IntStream. RangeClosed (1, 8). The forEach (e - > {LOGGER. The info ("emit:{}",e); processor.onNext(e); // If the published unconsumed data exceeds bufferSize, it will block here}); flux1.subscribe(e -> { LOGGER.info("flux1 subscriber:{}",e); }); IntStream. RangeClosed (9, 10). The forEach (e - > {LOGGER. The info ("emit:{}",e); processor.onNext(e); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e1) { e1.printStackTrace(); }}); Flux2.subscribe (e -> {logger.info ()"flux2 subscriber:{}",e);
        });

        processor.onNext(11);
        processor.onNext(12);

        processor.onComplete();
        processor.blockLast();
    }
Copy the code

Output instance

17:27:01. 008. [the main] the DEBUG reactor. The util. Loggers$LoggerFactory- Using Slf4j logging framework 17:27:01. 044. [the main] INFO com example. Demo. ProcessorTest - emit: 084 [main] 1 17:27:01. INFO com. Example. Demo. ProcessorTest - emit: 2 17:27:01. 084 [main] INFO com. Example. Demo. ProcessorTest - emit: 3 17:27:01. [the main] INFO 084 com. Example. Demo. ProcessorTest - emit: 4 17:27:01. [the main] INFO com. 084 example. Demo. ProcessorTest 084 - emit: 5 17:27:01. [the main] INFO com. Example. Demo. ProcessorTest - emit: 6 17:27:01. 084 [main] INFO Com. Example. Demo. ProcessorTest - emit: 7 17:27:01. [the main] INFO com. 084 example. Demo. ProcessorTest - emit: 8 17:27:01. 086 [the main] INFO com. Example. Demo. ProcessorTest - flux1 subscriber: 1 17:27:01. 086 [main] INFO com. Example. Demo. ProcessorTest - flux1 subscriber: 2 17:27:01. 087 [main] INFO com. Example. Demo. ProcessorTest - flux1 subscriber: 3 17:27:01. 087 (main) The INFO com. Example. Demo. ProcessorTest - flux1 subscriber: 4 17:27:01. 087 [main] INFO com. Example. Demo. ProcessorTest - flux1 The subscriber: 5 17:27:01. 087 [main] INFO com. Example. The demo. ProcessorTest - flux1 subscriber: 6 17:27:01. [the main] 087 INFO Com. Example. Demo. ProcessorTest - flux1 subscriber: 7 17:27:01. 087 [main] INFO com. Example. The demo. ProcessorTest - flux1 The subscriber: 8 17:27:01 088 [main] INFO com. Example. The demo. ProcessorTest - emit: 9 17:27:01. [the main] 088 INFO Com. Example. Demo. ProcessorTest - flux1 subscriber: 9 17:27:02. The 091 [main] INFO com. Example. Demo. ProcessorTest - emit: 10 17:27:02. [the main] INFO 092 com. Example. Demo. ProcessorTest - flux1 subscriber: 10. 17:27:03 096 [main] INFO Com. Example. Demo. ProcessorTest - flux1 subscriber: 11 17:27:03. 096 [main] INFO com. Example. Demo. ProcessorTest - flux2 The subscriber: 110 17:27:03 096 [main] INFO com. Example. The demo. ProcessorTest - flux1 subscriber: 12 17:27:03. [the main] 096 INFO com.example.demo.ProcessorTest - flux2 subscriber:120Copy the code

ReplayProcessor

You can cache data generated by sink or subscribe to Publisher and play it back to subsequent subscribers. There are four configurations

  • cacheLast

Only the last data is cached

  • create(int)

Cache the last N data

  • createTimeout(Duration)

Timestamp each data and cache only data whose age is within the specified TTL

  • createSizeOrTimeout(int,Duration)

Timestamp each data and cache only N data whose age is within the specified TTL

The instance

	@Test
    public void testReplayProcessor() throws InterruptedException {
        ReplayProcessor<Integer> replayProcessor = ReplayProcessor.create(3);
        Flux<Integer> flux1 = replayProcessor
                .map(e -> e);
        Flux<Integer> flux2 = replayProcessor
                .map(e -> e);

        flux1.subscribe(e -> {
            LOGGER.info("flux1 subscriber:{}",e); }); Intstream.rangeclosed (1,5). ForEach (e -> {replayprocessor.onnext (e); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e1) { e1.printStackTrace(); }}); LOGGER.info("finish publish data");
        TimeUnit.SECONDS.sleep(3);

        LOGGER.info("begin to subscribe flux2");
        flux2.subscribe(e -> {
            LOGGER.info("flux2 subscriber:{}",e);
        });

        replayProcessor.onComplete();
        replayProcessor.blockLast();
    }
Copy the code

The output is as follows

15:13:39. 415. [the main] the DEBUG reactor. The util. Loggers$LoggerFactory- Using Slf4j logging framework 15:13:39, 438 [main] INFO com. Example. The demo. ProcessorTest - flux1 subscriber: 1 15:13:40. [the main] INFO 445 com. Example. Demo. ProcessorTest - flux1 subscriber: 2 15:13:41. [the main] 449 INFO Com. Example. Demo. ProcessorTest - flux1 subscriber: 3 15:13:42. 454 [main] INFO com. Example. Demo. ProcessorTest - flux1 The subscriber: 4 15:13:43. 459 [main] INFO com. Example. Demo. ProcessorTest - flux1 subscriber: 5 15:13:44. [the main] 463 INFO Com. Example. Demo. ProcessorTest - finish the publish data 15:13:47. The 466 [main] INFO com. Example. The demo. ProcessorTest - begin to Subscribe flux2 15:13:47. 467. [the main] INFO com example. Demo. ProcessorTest - flux2 subscriber: 3 15:13:47. [the main] 467 INFO Com. Example. Demo. ProcessorTest - flux2 subscriber: 4 15:13:47. 468 [main] INFO com. Example. Demo. ProcessorTest - flux2 subscriber:5Copy the code

asynchronous

TopicProcessor

  • The TopicProcessor is an asynchronous processor that supports concurrent replay of multiple publishers when shared is set to true. If you subscribe to the publisher is a concurrent stream or need concurrent invocations Topicrocessor onNext, onCompleete, onError method, you must open the share. Disabling share is a processor that follows the Reactive Streams specification and does not allow concurrent calls.
  • TopicProcessor also supports the broadcast of messages (fan-out) to multiple subscribers, and it binds one thread to each subscriber. The maximum number of subscribers that can be supported is limited by the thread pool Executor.
  • TopicProcessor uses the RingBuffer data structure to push data, and each subscriber thread records its consumption position in the RingBuffer
  • TopicProcessor also supports the autoCancel option, which defaults to true, meaning publisher will be automatically cannel when all subscriber subscriptions are unsubscribed
    @Test
    public void testTopicProcessor() throws InterruptedException {
        TopicProcessor<Integer> topicProcessor = TopicProcessor.<Integer>builder()
                .share(true)
//                .executor(Executors.newSingleThreadExecutor())
                .build();
        Flux<Integer> flux1 = topicProcessor
                .map(e -> e);
        Flux<Integer> flux2 = topicProcessor
                .map(e -> e);
        Flux<Integer> flux3 = topicProcessor
                .map(e -> e);

        AtomicInteger count = new AtomicInteger(0);
        flux1.subscribe(e -> {
            LOGGER.info("flux1 subscriber:{}",e);
            count.incrementAndGet();
        });
        flux2.subscribe(e -> {
            LOGGER.info("flux2 subscriber:{}",e);
        });
        flux3.subscribe(e -> {
            LOGGER.info("flux3 subscriber:{}",e);
        });

        IntStream.rangeClosed(1,100)
                .parallel()
                .forEach(e -> {
//                    LOGGER.info("emit:{}",e);
                    topicProcessor.onNext(e);
                });

        topicProcessor.onComplete();
        topicProcessor.blockLast();

        TimeUnit.SECONDS.sleep(10);
        System.out.println(count.get());
    }
Copy the code

Notice two things:

  • share

EventLoopProcessor: reactor-core-3.1.2. release-sources.jar /reactor/core/publisher/EventLoopProcessor.java

EventLoopProcessor(
			int bufferSize,
			@Nullable ThreadFactory threadFactory,
			@Nullable ExecutorService executor,
			ExecutorService requestExecutor,
			boolean autoCancel,
			boolean multiproducers,
			Supplier<Slot<IN>> factory,
			WaitStrategy strategy) {

		if(! Queues.isPowerOfTwo(bufferSize)) { throw new IllegalArgumentException("bufferSize must be a power of 2 : " + bufferSize);
		}

		if (bufferSize < 1){
			throw new IllegalArgumentException("bufferSize must be strictly positive, " +
					"was: "+bufferSize);
		}

		this.autoCancel = autoCancel;

		contextClassLoader = new EventLoopContext(multiproducers);

		this.name = defaultName(threadFactory, getClass());

		this.requestTaskExecutor = Objects.requireNonNull(requestExecutor, "requestTaskExecutor");

		if (executor == null) {
			this.executor = Executors.newCachedThreadPool(threadFactory);
		}
		else {
			this.executor = executor;
		}

		if (multiproducers) {
			this.ringBuffer = RingBuffer.createMultiProducer(factory,
					bufferSize,
					strategy,
					this);
		}
		else{ this.ringBuffer = RingBuffer.createSingleProducer(factory, bufferSize, strategy, this); }}Copy the code

If share is true, createMultiProducer is created. If multiple threads call the processor’s onNext method and share is not enabled, there will be concurrency problems, that is, data will be lost. For example, in the code above, if share(true) is commented out, then the last count size is not necessarily 100, whereas turning share on true ensures that the last count size is 100

If set executor (Executors newSingleThreadExecutor ()), flux1, flux2, flux3 subscribers is sequential, rather than concurrently.

WorkQueueProcessor

  • The WorkQueueProcessor is also an asynchronous processor that supports concurrent replay of multiple publishers when shared is set to true.

  • The WorkQueueProcessor uses the RingBuffer data structure to push data.

  • The WorkQueueProcessor does not create a thread for each incoming subscriber and therefore scales a little better than TopicProcessor. The maximum number of subscribers that can be supported is limited by the thread pool Executor. However, it is worth noting that it is best not to add too many subscriber to the WorkQueueProcessor, as this will increase lock contention for the processor. It is best to use a ThreadPoolExecutor or ForkJoinPool, where the processor can check their capacity and throw an exception if there are too many subscribers.

  • The WorkQueueProcessor does not follow the specification of Reactive Streams and therefore consumes fewer resources than the TopicProcessor. As a trade-off, all subscriber requests are added together, and then the WorkQueueProcessor replays data for only one subscriber at a time. Compared with the TopicProcessorde fan-out broadcast mode, It is similar to the round-robin mode, but fair round-robin mode is not guaranteed.

    @Test
    public void testWorkQueueProcessor(){
        WorkQueueProcessor<Integer> workQueueProcessor = WorkQueueProcessor.create();
        Flux<Integer> flux1 = workQueueProcessor
                .map(e -> e);
        Flux<Integer> flux2 = workQueueProcessor
                .map(e -> e);
        Flux<Integer> flux3 = workQueueProcessor
                .map(e -> e);

        flux1.subscribe(e -> {
            LOGGER.info("flux1 subscriber:{}",e);
        });
        flux2.subscribe(e -> {
            LOGGER.info("flux2 subscriber:{}",e);
        });
        flux3.subscribe(e -> {
            LOGGER.info("flux3 subscriber:{}",e); }); IntStream. Range (1, 20). The forEach (e - > {workQueueProcessor. OnNext (e); }); workQueueProcessor.onComplete(); workQueueProcessor.blockLast(); }Copy the code

Output instance

21:56:58. 203. [the main] the DEBUG reactor. The util. Loggers$LoggerFactory- Using Slf4j logging framework 21:56:58. 214. [the main] the DEBUG reactor. The core. Publisher. UnsafeSupport - Starting UnsafeSupport initinJava 1.8 21:56:58. 215. [the main] the DEBUG reactor. The core. Publisher. UnsafeSupport - the Unsafe is 21:56:58. 228 INFO com [WorkQueueProcessor - 1]. The example. The demo. ProcessorTest - flux1 subscriber: 1 21:56:58. [WorkQueueProcessor - 3] 228 INFO Com. Example. Demo. ProcessorTest - flux3 subscriber: 3 21:56:58. [WorkQueueProcessor - 2] 228 INFO Com. Example. Demo. ProcessorTest - flux2 subscriber: 2 21:56:58. 229 INFO [WorkQueueProcessor - 1] Com. Example. Demo. ProcessorTest - flux1 subscriber: 4 21:56:58. [WorkQueueProcessor - 3] 229 INFO Com. Example. Demo. ProcessorTest - flux3 subscriber: 5 21:56:58. [WorkQueueProcessor - 2] 229 INFO Com. Example. Demo. ProcessorTest - flux2 subscriber: 6 21:56:58. 230 INFO [WorkQueueProcessor - 1] Com. Example. Demo. ProcessorTest - flux1 subscriber: 7 21:56:58. [WorkQueueProcessor - 3] 230 INFO Com. Example. Demo. ProcessorTest - flux3 subscriber: 8 21:56:58. [WorkQueueProcessor - 2] 230 INFO Com. Example. Demo. ProcessorTest - flux2 subscriber: 9 21:56:58. 230 INFO [WorkQueueProcessor - 1] Com. Example. Demo. ProcessorTest - flux1 subscriber: 10. 21:56:58 230 [WorkQueueProcessor - 3] INFO Com. Example. Demo. ProcessorTest - flux3 subscriber: 11 21:56:58. The 230 [WorkQueueProcessor - 2] INFO Com. Example. Demo. ProcessorTest - flux2 subscriber: 12 21:56:58. 230 INFO [WorkQueueProcessor - 1] Com. Example. Demo. ProcessorTest - flux1 subscriber: 13 21:56:58. The 230 [WorkQueueProcessor - 3] INFO Com. Example. Demo. ProcessorTest - flux3 subscriber: 14 21:56:58 230 [WorkQueueProcessor - 2] INFO Com. Example. Demo. ProcessorTest - flux2 subscriber: 15 21:56:58. The 230 [WorkQueueProcessor - 3] INFO Com. Example. Demo. ProcessorTest - flux3 subscriber: 17 21:56:58. 230 INFO [WorkQueueProcessor - 1] Com. Example. Demo. ProcessorTest - flux1 subscriber: 16 21:56:58. The 230 [WorkQueueProcessor - 3] INFO Com. Example. Demo. ProcessorTest - flux3 subscriber: 19 21:56:58. The 230 [WorkQueueProcessor - 2] INFO com.example.demo.ProcessorTest - flux2 subscriber:18Copy the code

It can be seen that the subscriber of WorkQueueProcessor is similar to the consumer of Kafka that belongs to the same group, and the total messages consumed by them are the total messages released by Publisher, unlike the broadcast message transmission of TopicProcessor.

doc

  • processor-overview
  • Disruptor -3.3.2 Source Code parsing (3)- Release event