
>>>> 😜😜😜 Github: 👉 github.com/black-ant CASE backup: 👉 gitee.com/antblack/ca…

1. Preface

This article takes a step-by-step look at functional interfaces and discusses the principles and benefits of this Java 8 feature.

I had previously been using functional programming mostly by rote; while recently combing through source code I found the concept everywhere, so it seemed worth a complete walkthrough.

For the basic use of Java Stream, see this article: Operation Manual: Stream Processing Manual

This article is divided into two main parts:

  • Functional programming in Java
  • Java Stream principle

2. Principles of functional programming

Java's functional programming centers on four main interfaces: Consumer, Supplier, Predicate, and Function, each of which declares a single abstract method.

Note: functional interfaces are marked with the @FunctionalInterface annotation, which may only be applied to interfaces that have exactly one abstract method.

// In the example of Consumer, you can see that the only abstract method is accept
@FunctionalInterface
public interface Consumer<T> {

    void accept(T t);

    default Consumer<T> andThen(Consumer<? super T> after) {
        Objects.requireNonNull(after);
        return (T t) -> { accept(t); after.accept(t); };
    }
}

Here are a few concepts:

  1. A functional interface can have only one abstract method
  2. It may contain static and default methods (PS: these are not abstract methods)
  3. It may declare methods of Object (PS: every implementation inherits from Object anyway; more on this later)
  4. The @FunctionalInterface annotation itself is optional

2.1 Use of functional programming

Let's define a functional interface ourselves and walk through the details:

2.1.1 Arrow function Demo

// Look at a simple use of functional programming:

// Step 1: Customize the functional interface
@FunctionalInterface
public interface FunctionInterface<T, D> {

    /**
     * The abstract method of the functional interface; this is what ultimately executes.
     * PS: generic, so it stays flexible.
     */
    T invoke(D input);

    /**
     * A method of Object; declaring it does not violate the functional interface rules.
     */
    boolean equals(Object var1);

    /**
     * A default method is not abstract, so it does not violate the rule either.
     */
    default void defaultMethod() {
    }

    /**
     * A static method is not abstract.
     */
    static void staticMethod(String msg) {
        System.out.println(msg);
    }
}

// Step 2: Define the calling function
public void testFunction() {
    logger.info("------> [execute functional methods] <-------");

    // Method 1: pass in the code block
    invokeInterface((input) -> {
        return input + "-output1";
    });

    // Method 2: a direct expression
    invokeInterface((input) -> input + "-output2");
    
    // Method 3: pass in the object
    FunctionInterface<String, String> function = (input) -> input + "-output3";
    invokeInterface(function);

}


public void invokeInterface(FunctionInterface<String, String> function) {
    String response = function.invoke("test");
    logger.info("------> this is output :{} <-------", response);
}

// Print the result
FunctionService    : ------> this is output :test-output1 <-------
FunctionService    : ------> this is output :test-output2 <-------
FunctionService    : ------> this is output :test-output3 <-------

2.1.2 Double colon (method reference) demo

public void testDoubleFunction() {

    // Example 1: pass in an instance method
    Consumer consumer = System.out::println;
    invokeDoubleFunction(consumer);

    // Example 2: passing static methods
    Consumer<String> consumer2 = FunctionInterface::staticMethod;
    invokeDoubleFunction(consumer2);

    // Example 3: passing a superclass method
    Consumer<String> consumer3 = super::superMethod;
    invokeDoubleFunction(consumer3);

    // Example 4: Pass the constructor
    Consumer<ArrayList> consumer4 = ArrayList::new;
    invokeDoubleFunction2(consumer4);
    
    // The common case: pass in a custom lambda
    Consumer<String> createRoot = (msg) -> {
        logger.error("E----> content :{}", msg);
    };
    invokeDoubleFunction2(createRoot);
}

public void invokeDoubleFunction(Consumer consumer) {
    Stream<String> stream = Stream.of("aaa", "bbb", "ddd", "ccc", "fff");
    stream.forEach(consumer);
}

public void invokeDoubleFunction2(Consumer consumer) {
    Stream<Collection<String>> stream = Stream.of(Arrays.asList("aaa", "bbb"));
    stream.forEach(consumer);
}

2.2 Related annotations/interfaces

Having seen above how to define functional interfaces by hand, here are the built-in ones 👉

Java provides four main functional interfaces: Consumer, Supplier, Predicate, and Function, along with variants such as IntConsumer, IntSupplier, and UnaryOperator. A brief analysis of the four main ones is enough here.

Consumer: Consumer interface

Understanding: this interface consumes the argument passed in; the focus is entirely on how the parameter is used, and nothing is returned.

public interface Consumer<T> {
    void accept(T t);
}
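
A quick usage sketch (my own example, not from the original article): andThen chains two consumers so both run on each value.

Consumer<String> print = s -> System.out.println("print: " + s);
Consumer<String> log = s -> System.out.println("log: " + s);
print.andThen(log).accept("hello"); // both consumers run; nothing is returned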

Supplier: Supplier

Understanding: Supplier fits simple resource-construction scenarios: no input, only a return value

public interface Supplier<T> {
    T get();
}
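
A quick usage sketch (my own example): the Supplier defers construction until get() is called, which is why it fits lazy defaults such as Optional.orElseGet.

Supplier<List<String>> listSupplier = ArrayList::new;
List<String> list = listSupplier.get(); // constructed only here

String value = Optional.<String>empty().orElseGet(() -> "fallback"); // supplier invoked only when empty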

Predicate: Predicate type

Understanding: an assertion: an object is passed in and a boolean comes back

public interface Predicate<T> {
    boolean test(T t);
}

// Usage:
stream.filter(predicate).collect(Collectors.toList());
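
A quick composition sketch (my own example): predicates combine with and / or / negate before being handed to filter.

Predicate<String> startsWithC = s -> s.startsWith("C");
Predicate<String> shortOnes = s -> s.length() <= 3;
List<String> result = Stream.of("C11", "C1234", "D12")
        .filter(startsWithC.and(shortOnes))
        .collect(Collectors.toList()); // [C11]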

Function: Functional

Understanding: a function that takes one argument and produces a result; the most "function-like" of the four

public interface Function<T, R> {
    R apply(T t);
}

// Usage:
Function<String, Integer> function = new Function<String, Integer>() {
    @Override
    public Integer apply(String s) {
        return s.length(); // Return the length of each string
    }
};
Stream<Integer> stream1 = stream.map(function);

Differences between Consumer and primitive variants such as IntConsumer, and some thoughts on the optimization

As you can see, each of these functional interfaces has hand-written primitive specializations, such as IntConsumer and friends

From other material we know the purpose is to optimize for primitive types, but reading the source shows no direct difference:

On a personal note: generics are mostly handled by the compiler and have little effect at the use site, and the primitive wrapper classes are not themselves specially optimized, so what exactly is being optimized here? Or is it just for clarity?

// PS: I don't believe the JDK would do this for nothing, so there must be a reason!!

Consider that the concrete type is fixed at compile time, so the type must be handled somewhere visible in the calling code, rather than via something like reflection at runtime.

TODO: That was interesting, but we’ll see…
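
One hedged answer (my own illustration, not from the article): the primitive specializations exist to avoid autoboxing. A Consumer<Integer> must receive boxed Integer objects, while an IntConsumer accepts a bare int, so primitive-heavy pipelines such as IntStream never allocate wrapper objects.

Consumer<Integer> boxed = i -> System.out.println(i); // accept(Integer): every value is boxed
IntConsumer primitive = i -> System.out.println(i);   // accept(int): no boxing at all

IntStream.range(0, 3).forEach(primitive); // stays primitive end to end
Stream.of(1, 2, 3).forEach(boxed);        // boxed Integers flow through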

2.3 The principle behind method functions

Now that we’ve talked about functional programming, let’s take a look at how functional programming works:

Use of the arrow function:

// In the custom case, what is passed in is actually an interface object.
// Method functions are the lambda concept, which rests on how lambdas are represented at the class level.
// TODO: decompiling is not covered here

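
For background (a JVM fact, not something shown in this article): the compiler emits an invokedynamic instruction for each lambda, and at runtime java.lang.invoke.LambdaMetafactory spins up a class implementing the target interface. Behaviorally the lambda is equivalent to the anonymous class below, even though no anonymous class file is generated at compile time.

// The two are behaviorally equivalent; only the lambda goes through invokedynamic
FunctionInterface<String, String> lambda = input -> input + "-output";

FunctionInterface<String, String> anonymous = new FunctionInterface<String, String>() {
    @Override
    public String invoke(String input) {
        return input + "-output";
    }
};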

3. Stream deep dive

3.1 Stream architecture

As you can see from the diagram, the architecture is quite regular, like a tree structure:

The second layer is the abstraction level, containing five types: AbstractPipeline / Stream / IntStream / DoubleStream / LongStream. The third layer holds the concrete pipelines: ReferencePipeline / IntPipeline / LongPipeline / DoublePipeline. The fourth layer consists of their inner classes; each pipeline implementation has several: Head / StatelessOp / StatefulOp / OfInt

3.2 Operating principle of Stream

As you can see, the Stream structure is primarily based on the concept of Pipeline, with three additional optimizations for the basic types.

At the same time, StatefulOp and StatelessOp give a simple conceptual split of intermediate operations into stateful and stateless ones:

Process Diagram (a diagram to show you the main process)


Preliminary: the AbstractPipeline class

The role of AbstractPipeline:

Stage is a virtual concept. AbstractPipeline represents the initial part of a stream pipeline, encapsulating the stream source and zero or more intermediate operations. An AbstractPipeline is regarded as a stage, where each stage describes either the stream source or the intermediate operations.

Stage properties:

AbstractPipeline holds three stage references (they describe the spatial structure of the pipeline)

F- AbstractPipeline sourceStage : points to the head of the pipeline chain (itself, if this is the source stage)
F- AbstractPipeline previousStage : the "upstream" pipeline; null if this is the source stage
F- AbstractPipeline nextStage : the next stage in the pipeline; null if this is the last stage

These three fields describe the spatial structure, i.e. where the flow goes next; everything else describes behavior

Type of behavior:

The classes Head, StatefulOp, and StatelessOp all inherit from AbstractPipeline and identify three types of operations

Head: the first Stage, the source Stage, which gathers the source elements. StatefulOp: stateful intermediate operations (e.g. sorted). StatelessOp: stateless intermediate operations (e.g. filter, map)

The usual execution chain is Head -> intermediate operations (StatelessOp / StatefulOp) -> terminal operation

Other attributes:

// Upstream pipeline; null when this is the first stage
previousStage = null; 
// Source spliterator, implemented by each utility class
sourceSpliterator = source; 
// The operation flags of the intermediate operation represented by this pipeline object
sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK; 

// Combined source and operation flags for the source and all operations up to and including this pipeline object
combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;

/**
 * Sequential: the depth from the stream source (number of stages)
 * Parallel: the number of preceding stateful operations
 */
depth = 0; 

/**
 * Whether the pipeline is parallel
 */
parallel = parallel; 

AbstractPipeline constructor

AbstractPipeline(Supplier<? extends Spliterator<?>> source, int sourceFlags, boolean parallel) {
	this.previousStage = null;  // upstream stage
	this.sourceSupplier = source;
	this.sourceStage = this;
	this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
	this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
	this.depth = 0;
	this.parallel = parallel;
}

3.2.1 Stream process

Key points up front

Here, gathered from related blog posts, are the key points of how Stream operates; the rest of the article confirms them:

// Main process
- Each intermediate operation creates a new stage
- The upstream sink finds its downstream sink through AbstractPipeline's opWrapSink

// Terminal process
- A terminal operation does not create a new pipeline stage
- A terminal operation wraps its own Sink as the last Sink of the pipeline

// Collection process
- Boolean and Optional results are recorded in the corresponding Sink
- For reduction operations, the final result is placed in the container the caller supplied
- Returned arrays are first placed in a Node data structure

// Parallel operation
- Parallel execution runs tasks via the ForkJoin framework

Use case

// Follow the following example to see the main process:
List<String> randomValues = Arrays.asList(
	"E11", "D12", "A13", "F14", "C15", "A16", "B11", "B12", "C13", "B14", "B15", "B16", "F12", "E13", "C11", "C14", "A15", "C16", "F11", "C12", "D13", "E14", "D15", "D16"
);

randomValues.stream().filter(value -> value.startsWith("C")).sorted().forEach(System.out::println);
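
For reference, working the example by hand: six values start with "C", and sorted() orders them, so the forEach should print:

C11
C12
C13
C14
C15
C16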

3.2.1.1 Step 1: Creating a Stream

A Stream is usually created from a Collection or an array. Without going into too much detail, the flow looks like this:

  1. A Spliterator splits the collection (a method on the collection or array itself)
  2. The stream is built via StreamSupport.stream
// As you can see, a Stream is built using ReferencePipeline.Head
return new ReferencePipeline.Head<>(spliterator,StreamOpFlag.fromCharacteristics(spliterator),parallel);

// Supplement on Spliterator: it has many implementations. ArrayListSpliterator carries these main fields:
private final ArrayList<E> list; // The backing collection of the current Stream
private int index; // The current index, advanced as the spliterator traverses and splits
private int fence; // One past the last index; -1 means not yet initialized
private int expectedModCount; // Initialized when fence is set


// Supplement on StreamOpFlag: this converts Spliterator characteristics into stream flags that describe the stream.
public static final int ORDERED    = 0x00000010; // An encounter order is defined
public static final int DISTINCT   = 0x00000001; // Elements are guaranteed unique
public static final int SORTED     = 0x00000004; // Encounter order follows natural sort order (elements are Comparable)
public static final int SIZED      = 0x00000040; // An exact count of the elements a full traversal would encounter is known
public static final int NONNULL    = 0x00000100; // The source guarantees encountered elements are non-null
public static final int IMMUTABLE  = 0x00000400; // The element source cannot be structurally modified
public static final int CONCURRENT = 0x00001000; // The element source may be safely modified concurrently by multiple threads (add, replace, and/or remove) without external synchronization
public static final int SUBSIZED = 0x00004000; // All sub-spliterators, direct or indirect, are SIZED as well



// parallel: whether the returned stream is parallel
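
For reference, Collection.stream() in JDK 8 is exactly this two-step flow (paraphrased from the JDK source; treat as a sketch):

// java.util.Collection (JDK 8)
default Stream<E> stream() {
    return StreamSupport.stream(spliterator(), false); // sequential
}

default Stream<E> parallelStream() {
    return StreamSupport.stream(spliterator(), true);  // parallel
}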

3.2.1.2 Step 2: The filter operation

We saw the Stream creation process above; now the filter:

public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    // Test with the predicate that was passed in
                    if (predicate.test(u))
                        // Elements that pass are handed downstream (PS: the List they land in comes up later)
                        downstream.accept(u);
                }
            };
        }
    };
}

// StreamShape is an enum describing the type shape of a stream abstraction. It has four values:
REFERENCE : object
INT_VALUE 
LONG_VALUE
DOUBLE_VALUE

What does this code focus on?

  1. A StatelessOp is built
  2. Sink.ChainedReference holds callback logic that only executes once a terminal operation such as forEach runs

Supplement: Core operation wrapSink

// Function:
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
	Objects.requireNonNull(sink);

	for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
		sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
	}
	return (Sink<P_IN>) sink;
}

Focus on this link:

What is a pipe? A pipe is a line of water that goes in at one end and out at the other

The filter is like a diverter valve that draws some of the water off; but none of this matters until the pipeline's tap is opened

That is, only when the process reaches a terminal operation such as forEach does the flow start to run, and the intermediate operations registered along the way are executed
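
A minimal sketch of that laziness (my own example): building the pipeline prints nothing; the filter only runs once the terminal forEach opens the tap.

Stream<String> pipeline = Stream.of("a", "bb", "ccc")
        .filter(v -> {
            System.out.println("filter sees: " + v); // not printed yet
            return v.length() > 1;
        });
System.out.println("pipeline built, nothing has run");
pipeline.forEach(v -> System.out.println("got: " + v)); // now the filter executes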

3.2.1.3 Step 3: The forEach process

The forEach entry point

C- ReferencePipeline (java.util.stream.SortedOps$OfRef)
public void forEach(Consumer<? super P_OUT> action) {
	evaluate(ForEachOps.makeRef(action, false));
}

// --> evaluate the actual implementation
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    // linkedOrConsumed -> This pipe has been applied or consumed, meaning the pipe can only be used once
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;
	
    // Whether the stream is parallel
    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}


// Additional: the two terminal evaluation paths
// evaluateParallel: performs a parallel computation of the operation using the specified PipelineHelper
// evaluateSequential: performs a sequential computation of the operation using the specified PipelineHelper

Internal process Step 1: wrapSink builds the Sink chain

public <S> Void evaluateSequential(PipelineHelper<T> helper,Spliterator<S> spliterator) {
	return helper.wrapAndCopyInto(this, spliterator).get();
}

@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
    return sink;
}

// Add wrapSink:
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        Objects.requireNonNull(sink);

        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        return (Sink<P_IN>) sink;
}

public Sink<T> opWrapSink(int flags, Sink<T> sink) {
	Objects.requireNonNull(sink);

	// If the input is already naturally sorted and this operation
	// also naturally sorted then this is a no-op
	if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
		return sink;
	else if (StreamOpFlag.SIZED.isKnown(flags))
		return new SizedRefSortingSink<>(sink, comparator);
	else
		return new RefSortingSink<>(sink, comparator);
}


@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);

    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        // Notify the sink that data is coming
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        spliterator.forEachRemaining(wrappedSink);
        // End must be called after all data has been sent
        wrappedSink.end();
        
        // PS: begin and accept cannot be called again after end
    }
    else {
        copyIntoWithCancel(wrappedSink, spliterator);
    }
}

Internal process Step 2: initiating the forEach loop (here, the sorting Sink's end())

public void end() {
    // At this point the list holds the filtered elements; now sort them
	list.sort(comparator);
    
    // Downstream is also a Sink
	downstream.begin(list.size());
	if (!cancellationWasRequested) {
		// PS: the List was populated in the preceding Filter
		list.forEach(downstream::accept);
	} else {
		for (T t : list) {
			if (downstream.cancellationRequested()) break;
				downstream.accept(t);
		}
	}
	downstream.end();
	list = null;
}

Internal process Step 3: the forEach main loop

public void forEach(Consumer<? super E> action) {
    Objects.requireNonNull(action);
    final int expectedModCount = modCount;

    final E[] elementData = (E[]) this.elementData;
    final int size = this.size;
    for (int i=0; modCount == expectedModCount && i < size; i++) {
        action.accept(elementData[i]);
    }
    if (modCount != expectedModCount) {
        throw new ConcurrentModificationException();
    }
}

Internal process Step 4: Execute accept

static final class OfRef<T> extends ForEachOp<T> {
    final Consumer<? super T> consumer;

    OfRef(Consumer<? super T> consumer, boolean ordered) {
        super(ordered);
        this.consumer = consumer;
    }

    @Override
    public void accept(T t) {
        // Here the function passed to forEach is finally executed
        consumer.accept(t);
    }
}

3.2.1.4 Supplement: Sink

Core methods:

  • begin(long size) : called before element traversal starts, telling the Sink to get ready
  • end() : called after all elements have been traversed, telling the Sink there are no more
  • cancellationRequested() : asks whether the operation can stop early, so short-circuit operations can finish as soon as possible (short-circuit operations must implement it)
  • accept(T t) : called as each element is traversed; receives an element and processes it (PS: each stage's work is driven through accept)
// Call process: each stage wraps its own operation into a Sink, and the upstream stage simply calls the next stage's accept(). It resembles recursion, except recursion unwinds from the bottom back to the top, whereas a Stream runs top-down and completes once the bottom is reached
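
To make the wrapping concrete, a conceptual sketch (Sink itself is package-private, so this mimics its protocol with a hypothetical MiniSink; the names are mine, not the JDK's):

interface MiniSink<T> {
    default void begin(long size) {}
    void accept(T t);
    default void end() {}
}

class SinkDemo {
    // Each stage wraps the downstream sink, just like Sink.ChainedReference:
    static <T> MiniSink<T> filterStage(java.util.function.Predicate<T> p, MiniSink<T> downstream) {
        return new MiniSink<T>() {
            public void begin(long size) { downstream.begin(-1); } // size unknown after filtering
            public void accept(T t) { if (p.test(t)) downstream.accept(t); } // pass on or drop
            public void end() { downstream.end(); }
        };
    }
}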

4. Key points

4.1 The map process

// C- ReferencePipeline
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    Objects.requireNonNull(mapper);
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void accept(P_OUT u) {
                    // u is the current element; mapper.apply applies the given function to it,
                    // and the result is handed downstream
                    downstream.accept(mapper.apply(u));
                }
            };
        }
    };
}

4.2 The collect process

public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
    A container;
    if (isParallel()
            && collector.characteristics().contains(Collector.Characteristics.CONCURRENT)
            && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
        container = collector.supplier().get();
        BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
        forEach(u -> accumulator.accept(container, u));
    }
    else {
        container = evaluate(ReduceOps.makeRef(collector));
    }
    return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
           ? (R) container
           : collector.finisher().apply(container);
}


// The Supplier object is injected here
public static <T> Collector<T, ?, List<T>> toList() {
    return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
                                   (left, right) -> { left.addAll(right); return left; },
                                   CH_ID);
}

// The actual implementation:
static class CollectorImpl<T, A, R> implements Collector<T, A, R> {
        private final Supplier<A> supplier;
        private final BiConsumer<A, T> accumulator;
        private final BinaryOperator<A> combiner;
        private final Function<A, R> finisher;
        private final Set<Characteristics> characteristics;
    
}

// The Supplier is invoked when the ReduceOps object executes
public static <T, I> TerminalOp<T, I>
    makeRef(Collector<? super T, I, ?> collector) {
        Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
        BiConsumer<I, ? super T> accumulator = collector.accumulator();
        BinaryOperator<I> combiner = collector.combiner();
        class ReducingSink extends Box<I>
                implements AccumulatingSink<T, I, ReducingSink> {
            @Override
            public void begin(long size) {
                state = supplier.get();
            }

            @Override
            public void accept(T t) {
                accumulator.accept(state, t);
            }

            @Override
            public void combine(ReducingSink other) {
                state = combiner.apply(state, other.state);
            }
        }
        return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }

            @Override
            public int getOpFlags() {
                return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
                       ? StreamOpFlag.NOT_ORDERED
                       : 0;
            }
        };
    }

4.3 Differences between stateless and stateful operations

  • Stateless: Stateless operation
  • Stateful: stateful operation
abstract static class StatelessOp<E_IN, E_OUT>
        extends ReferencePipeline<E_IN, E_OUT> {
    StatelessOp(AbstractPipeline<?, E_IN, ?> upstream, StreamShape inputShape, int opFlags) {
        super(upstream, opFlags);
        assert upstream.getOutputShape() == inputShape;
    }

    @Override
    final boolean opIsStateful() {
        return false;
    }
}

abstract static class StatefulOp<E_IN, E_OUT>
        extends ReferencePipeline<E_IN, E_OUT> {
    StatefulOp(AbstractPipeline<?, E_IN, ?> upstream, StreamShape inputShape, int opFlags) {
        super(upstream, opFlags);
        assert upstream.getOutputShape() == inputShape;
    }

    @Override
    final boolean opIsStateful() {
        return true;
    }

    @Override
    abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
                                                   Spliterator<P_IN> spliterator,
                                                   IntFunction<E_OUT[]> generator);
}


// Contrast:
// 1. StatefulOp additionally implements opEvaluateParallel

4.4 Parallel Processing

// We already know from evaluate that the parallel path goes through evaluateParallel
C- AbstractPipeline
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    // ... is the stream parallel?
    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

// Principle: parallel stream processing is built on the ForkJoin framework. Each Op creates its Task differently, but each does create a Task:

// C- ReduceOp
public <P_IN> R evaluateParallel(PipelineHelper<T> helper,Spliterator<P_IN> spliterator) {
	return new ReduceTask<>(this, helper, spliterator).invoke().get();
}

// C- MatchOp
public <S> Boolean evaluateParallel(PipelineHelper<T> helper,Spliterator<S> spliterator) {
	return new MatchTask<>(this, helper, spliterator).invoke();
}

// C- ForEachOp
public <S> Void evaluateParallel(PipelineHelper<T> helper,Spliterator<S> spliterator) {
	if (ordered)
		new ForEachOrderedTask<>(helper, spliterator, this).invoke();
	else
		new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
	return null;
}

// C- FindOp
public <P_IN> O evaluateParallel(PipelineHelper<T> helper,Spliterator<P_IN> spliterator) {
	return new FindTask<>(this, helper, spliterator).invoke();
}

public void compute() {
        Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
        long sizeEstimate = rs.estimateSize();
        long sizeThreshold = getTargetSize(sizeEstimate);
        boolean forkRight = false;
        @SuppressWarnings("unchecked") K task = (K) this;
        while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
            K leftChild, rightChild, taskToFork;
            task.leftChild  = leftChild = task.makeChild(ls);
            task.rightChild = rightChild = task.makeChild(rs);
            task.setPendingCount(1);
            if (forkRight) {
                forkRight = false;
                rs = ls;
                task = leftChild;
                taskToFork = rightChild;
            }
            else {
                forkRight = true;
                task = rightChild;
                taskToFork = leftChild;
            }
            taskToFork.fork();
            sizeEstimate = rs.estimateSize();
        }
        task.setLocalResult(task.doLeaf());
        task.tryComplete();
}
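
For orientation (my own example): a parallel terminal operation is what lands in compute() above. For instance, a parallel count goes evaluate -> evaluateParallel -> ReduceTask, which keeps calling trySplit and forking until chunks drop below the size threshold.

long count = randomValues.parallelStream()
        .filter(v -> v.startsWith("C"))
        .count(); // ReduceTask splits the spliterator and forks subtasks via ForkJoin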

Have a look at this senior developer's article, which is written very clearly; the diagrams here follow its treatment: @Java8 Stream principle in-depth analysis - Zhihu (zhihu.com)

4.5 Op module system

// PS: each Op creates a Sink

copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);

// The core is wrapSink: once the Sink chain has been built, copyInto makes the final call

4.6 Multiple stages

  • Stages are linked together by the constructor (each new stage points back at its previousStage)
  • depth records how deep the stage sits in the chain

The rest of the structure is shown in the diagram

Conclusion

This article does not go too deep into Stream's internals; it grew out of curiosity about the principle, and only a single pipeline's flow is analyzed.

Stream source code is a pleasure to read. It is rare to see code with such an interesting structure.

How streams achieve efficiency through parallelism is not fully worked out here; the next article will look at performance analysis.

Thoughts

Having spent so long combing through this source code, I wanted to take something away from it. A small summary:

  • The Stream process is very interesting: a recursion-like but subtly different structure. The terminal operation is the switch that starts the whole process, and once the water flows, execution runs from the head
  • The architecture of Stream is also interesting. It is a bit like a linked list's next and prev, but with virtual stage objects
  • The inheritance hierarchy feels tidy and thorough; the interface structure is well worth referencing

Appendix

Stream common methods:

References

@ Deep understanding of Java 8 Stream implementation principles - LCgoing's blog (CSDN)

@ Java8 Stream principle in-depth analysis - Zhihu (zhihu.com)

@ www.javabrahman.com/java-8/unde…