
Usually, when we operate the set data, we usually use for or iterator to traverse, which is not very nice. Java provides the concept of Stream, which allows us to treat a collection of data as if it were an individual element, and provides a multi-threading pattern

  • The creation of a flow
  • Stream various data operations
  • The termination operation of the stream
  • Aggregation processing of streams
  • The use of concurrent streams and completableFuture

Making the address

Stream 1 is constructed in the following way

Stream Built-in constructors

public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f)
public static <T> Stream<T> concat(Stream<? extends T> a, Stream<? extends T> b)
public static<T> Builder<T> builder()
public static<T> Stream<T> of(T t)
public static<T> Stream<T> empty()
public static<T> Stream<T> generate(Supplier<T> s)

The stream function declared by the Collection

default Stream<E> stream()
  • Collection declares a stream conversion function. That is, any Collection subclass has a method that the authorities implement for us to convert from Collection to stream
  • For example, List to Stream

    public static void main(String[] args){
        List<String> demo =  Arrays.asList("a","b","c");
        long count =;

Stream defines how to operate on an element

Filter filter

Stream<T> filter(Predicate<? super T> predicate)
  • Predicate is a functional interface that can be directly replaced by lambda. If there is complex filtering logic, the OR, AND, Negate methods are used
  • The sample

    List<String> demo = Arrays.asList("a", "b", "c");
    Predicate<String> f1 = item -> item.equals("a");
    Predicate<String> f2 = item -> item.equals("b");;

Transformation Map

<R> Stream<R> map(Function<? super T, ? extends R> mapper)
IntStream mapToInt(ToIntFunction<? super T> mapper);
LongStream mapToLong(ToLongFunction<? super T> mapper);
DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper);
  • The sample

    static class User{ public User(Integer id){ = id; } Integer id; public Integer getId() { return id; } } public static void main(String[] args) { List<User> demo = Arrays.asList(new User(1), new User(2), new User(3)); Stream ().map(User::getId).foreach (System.out::println); } -------result-------- 1 2 3

    Data processing PEEK

    Stream<T> peek(Consumer<? super T> action);
  • The difference from map is that it has no return value
  • The sample

    static class User{ public User(Integer id){ = id; } Integer id; public Integer getId() { return id; } public void setId(Integer id) { = id; } } public static void main(String[] args) { List<User> demo = Arrays.asList(new User(1), new User(2), new User(3)); // id square, Stream (). Peek (user-> user.setid ( *); } -------result-------- 1 4 9

The mapping flattens the FlatMap

<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);
IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper);
LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper);
DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper);
  • FlatMap: Flatmaps streams with elements of type Stream\

    to a Stream with element type T
  • The sample

    public static void main(String[] args) {
        List<Stream<Integer>> demo = Arrays.asList(Stream.of(5), Stream.of(2), Stream.of(1));;

To heavy distinct

Stream<T> distinct();
  • The sample

    List<Integer> demo = Arrays.asList(1, 1, 2);;

Sorting sorted

Stream<T> sorted();
Stream<T> sorted(Comparator<? super T> comparator);
  • The sample

    List<Integer> demo = Arrays.asList(5, 1, 2); // default ascending (System.out::println); Public int (); public int (int) {public int (); public int ();; Default ascending result -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 1 2 5 -- -- -- -- -- -- -- descending result -- -- -- -- -- -- -- -- 5 2 to 1

Limit the number and skip the number

Stream<T> limit(long maxSize); Stream< > skip(long n);
  • The sample

    List<Integer> demo = Arrays.asList(1, 2, 3, 4, 5, 6); Stream ().skip(2).limit(2).foreach (System.out::println); -------result-------- 3 4

New operations provided by JDK9

  • TakeWhile takes an element that satisfies the condition until it does not. The dropWhile discards an element that satisfies the condition until it does not

    default Stream<T> takeWhile(Predicate<? super T> predicate);
    default Stream<T> dropWhile(Predicate<? super T> predicate);

3 stream terminates action

Traverse the consumption

// void forEach(Consumer<? super T> action); ForeachOrdered void foreachOrdered (Consumer<? super T> action);
  • The sample

    List<Integer> demo = Arrays.asList(1, 2, 3);
    -------forEach result--------
    -------forEachOrdered result--------

Get array result

Object[] toArray(); A[] toArray(intFunction <A[]> Generator)
  • The sample

    List<String> demo = Arrays.asList("1", "2", "3");
    //<A> A[] toArray(IntFunction<A[]> generator)
    String[] data =[]::new);

Maximum and minimum

Optional<T> min(Comparator<? Super T> compare) // Get the maximum Optional<T> Max (comparator <? super T> comparator)
  • The sample

    List<Integer> demo = Arrays.asList(1, 2, 3);
    Optional<Integer> min =>item));
    Optional<Integer> max =>item));

To find the matching

Boolean anyMatch(Predicate<? Predicate) // Boolean allMatch(predicate <? Boolean noneMatch(predicate <? Predicate?); Super T> predicate) // Find the first Optional<T> findFirst(); // Optional<T> findAny();

Reduction to merge

Optional<T> reduce(BinaryOperator<T> accumulator) <U> U Reduce (U Identity, BiFunction<U, BiFunction<U, BiFunction<U, BiFunction) super T, U> accumulator, BinaryOperator<U> combiner)
  • The sample

    List<Integer> demo = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8); / / number into a String, and then use the "-" pieced together String data = demo. The stream (). The reduce (" 0 ", (u, t) - > u + "-" + t, (s1, s2) - > s1 + "-" + s2); System.out.println(data); -------result-------- 0-1-2-3-4-5-6-7-8

Count Elements

long count()
  • The sample

    List<Integer> demo = Arrays.asList(1, 2, 3, 4, 5, 6);

Convergent treatment of convection

/** * Supplier: producer of the return result type * Accumulator: element consumer (to process and add R) * Combiner: */ <R> R Collect (Supplier<R> Supplier, BiConsumer<R,? super T> accumulator, BiConsumer<R, R> combiner); /** * Collector is an aggregation class composed of supplier, accumulator, combiner, finisher, and characteristics ** Collectors can provide some built-in aggregation classes or methods */ <R, A> R collect(Collector<? super T, A, R> collector);
  • For an example, see below

4 Collector’s tool class collection Collectors

Interface Collector and implementation class CollectorImpl

// The returned producer Supplier<A> Supplier (); // Accumulator <A, bB0 accumulator(); BinaryOperator<A> combiner(); binaryOperator <A> combiner(); Function<A, R> finisher(); Function<A, R> finisher(); // Set<Characteristics> Characteristics (); public static<T, A, R> Collector<T, A, R> of(Supplier<A> supplier, BiConsumer<A, T> accumulator, BinaryOperator<A> combiner, Function<A, R> finisher, Characteristics... characteristics)

The stream aggregates are converted to lists and sets

// Collector<T, Collector<T, Collector<T, Collector<T,? , List<T>> toList() // Collector<T, , Set<T>> toSet()
  • The sample

    List<Integer> demo = Arrays.asList(1, 2, 3);
    List<Integer> col =;
    Set<Integer> set =;

Stream aggregation is converted to MAP

// Map public static <T, K, U> Collector<T,? , Map<K,U>> toMap( Function<? super T, ? extends K> keyMapper, Function<? super T, ? Extends U> valueMapper) /** * mergeFunction: public static <T, K, U> Collector<T,? , Map<K,U>> toMap( Function<? super T, ? extends K> keyMapper, Function<? super T, ? Extends U> valueMapper, binaryOperator <U> mergeFunction) /** * mergeFunction: The same key, the value is merged * mapSupplier: */ public static <T, K, U, M extends Map<K, U>> Collector<T,? , M> toMap( Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends U> valueMapper, BinaryOperator<U> mergeFunction, Supplier<M> mapSupplier)
  • If there is an element with the same key, an error will be reported. Or use GroupBy
  • The sample

    List<User> demo = Arrays.asList(new User(1), new User(2), new User(3));
    Map<Integer,User> map =,item->item));
    {1=TestS$User@7b23ec81, 2=TestS$User@6acbcfc0, 3=TestS$User@5f184fc6}

String stream aggregation concatenation

Public static Collector<CharSequence,? , String> joining(); Public static Collector<CharSequence,? // Collector<CharSequence,? , String> joining(CharSequence delimiter)
  • The sample

    A List < String > demo = Arrays. AsList (" c ", "s", "c", "w", "stealth on"); String name ="-")); System.out.println(name); -------result C-S-C-W

The mapping process then aggregates the flow

  • Map collect = collect map collect

    /** * mapper: downstream: downstream: downstream: downstream: downstream: downstream: downstream: downstream: downstream: downstream: downstream; , R> mapping(Function<? super T, ? extends U> mapper, Collector<? super U, A, R> downstream);
  • The sample

    List<String> demo = Arrays.asList("1", "2", "3");
    List<Integer> data =, Collectors.toList()));
    [1, 2, 3]

The results are aggregated and then converted

/ * * * downstream: aggregate processing * finisher: results conversion processing * / public static < T, A, R, RR > Collector < T, A, RR > collectingAndThen (Collector < T, A, R > downstream, Function<R, RR> finisher);
  • The sample

    List<Integer> demo = Arrays.asList(1, 2, 3, 4, 5, 6); / / aggregate into the List, and finally the size of an array of extraction as a return value Integer size = demo. The stream () collect (Collectors. CollectingAndThen (Collectors. ToList (), List::size)); System.out.println(size); ---------result---------- 6

Stream grouping (Map is HashMap)

Public static <T, K> Collector<T,? Collector<T,? , Map<K, List<T>>> groupingBy( Function<? super T, ? extends K> classifier); /** * Classifier: downstream: downstream: downstream: aggregator */ public static <T, K, A, D> Collector<T,? , Map<K, D>> groupingBy( Function<? super T, ? Extends K> classifier, Collector<? Super T, A, D downstream: downstream) /** * Classifier: downstream: downstream: factory. */ public static <T, K, D, A, M extends Map<K, D>> Collector<T,? , M> groupingBy( Function<? super T, ? extends K> classifier, Supplier<M> mapFactory, Collector<? super T, A, D> downstream)
  • The sample

    public static void main(String[] args) throws Exception { List<Integer> demo = Stream.iterate(0, item -> item + 1) .limit(15) .collect(Collectors.toList()); // Divide into three groups, Map<Integer, List<String>> Map = (). Collect (item-> item % 3,). GroupingBy (item-> item % 3,). HashMap::new, Collectors.mapping(String::valueOf, Collectors.toList()))); System.out.println(map); } ---------result---------- {0=[0, 3, 6, 9, 12], 1=[1, 4, 7, 10, 13], 2=[2, 5, 8, 11, 14]}

Stream grouping (the Map used for grouping is ConcurrentHashMap)

/** * classifier: classifier; */ public static <T, K> Collector<T,? , ConcurrentMap<K, List<T>>> groupingByConcurrent( Function<? super T, ? extends K> classifier); /** * Classifier: downstream: downstream: aggregator */ public static <T, K, A, D> Collector<T,? , ConcurrentMap<K, D>> groupingByConcurrent( Function<? super T, ? extends K> classifier, Collector<? Super T, A, D downstream: downstream) /** * Classifier: downstream: downstream: downstream: downstream: downstream: downstream: */ public static <T, K, A, D, M extends ConcurrentMap< T, D>> Collector<T,? , M> groupingByConcurrent( Function<? super T, ? extends K> classifier, Supplier<M> mapFactory, Collector<? super T, A, D> downstream);
  • It’s used in the same way as groupingBy

Split, one to two (equivalent to a special GroupingBy)

public static <T> Collector<T, ? , Map<Boolean, List<T>>> partitioningBy( Predicate<? Super T> predicate) /** * predicate: downstream: aggregator */ public static <T, D, A> Collector<T,? , Map<Boolean, D>> partitioningBy( Predicate<? super T> predicate, Collector<? super T, A, D> downstream)
  • The sample

    List<Integer> demo = Arrays.asList(1, 2,3,4, 5,6); / / t group Map < Boolean, List < Integer > > Map = demo. The stream () collect (Collectors. PartitioningBy (item - > item % 2 = = 0)); System.out.println(map); ---------result---------- {false=[1, 3, 5], true=[2, 4, 6]}

Aggregation averaging

// Return Double public static <T> Collector<T,? , Double> averagingDouble(ToDoubleFunction<? Super T> mapper) // return Long type public static <T> Collector<T,? , Double> averagingLong(ToLongFunction<? Super T> mapper) // return Int public static <T> Collector<T,? , Double> averagingInt(ToIntFunction<? super T> mapper)
  • The sample

    List<Integer> demo = Arrays.asList(1, 2, 5); Double data =; System.out.println(data); -- -- -- -- -- -- -- -- -- the result -- -- -- -- -- -- -- -- -- -- 2.6666666666666665

Stream aggregation looks for maximum and minimum values

Public static < Collector<T,? , Optional<T>> minBy(Comparator<? Public static <T> Collector<T,? , Optional<T>> maxBy(Comparator<? super T> comparator)
  • The sample

    List<Integer> demo = Arrays.asList(1, 2, 5);
    Optional<Integer> min = -> item)));
    Optional<Integer> max = -> item)));

Aggregate the statistical results

  • You can get the total number of elements, the cumulative sum of elements, the minimum, the maximum, and the average

    // Return Int public static <T> Collector<T,? , IntSummaryStatistics> summarizingInt( ToIntFunction<? Super T> mapper) // return Double type public static <T> Collector<T,? , DoubleSummaryStatistics> summarizingDouble( ToDoubleFunction<? Super T> mapper) // return Long type public static <T> Collector<T,? , LongSummaryStatistics> summarizingLong( ToLongFunction<? super T> mapper)
  • The sample

    List<Integer> demo = Arrays.asList(1, 2, 5); IntSummaryStatistics data =; System.out.println(data); -- -- -- -- -- -- -- -- -- the result -- -- -- -- -- -- -- -- -- -- IntSummaryStatistics {count = 3, the sum = 8, min = 1, business = 2.666667, Max = 5}

New aggregation methods provided by JDK12

Public static <T, R1, R2, rbb0 Collector<T,? , R> teeing( Collector<? super T, ? , R1> downstream1, Collector<? super T, ? , R2> downstream2, BiFunction<? super R1, ? super R2, R> merger)

5. Use of concurrent ParalleStream

  • With the use of CompletableFuture and thread pools
  • The sample

    public static void main(String[] args) throws Exception{ List<Integer> demo = Stream.iterate(0, item -> item + 1) .limit(5) .collect(Collectors.toList()); // Example 1 stopWatch stopWatch = stopWatch.createStarted (ticker.systemTicker ()); -> { try { Thread.sleep(500); System. The out. Println (" example 1 - "+ Thread. CurrentThread (). The getName ()); } catch (Exception e) { } }); System. Out. Println (" example 1 - "+ stopwatch. Stop (). The elapsed (TimeUnit. MILLISECONDS)); // For example 2, note that you need ForkJoinPool, ParallelStream to use the thread specified by executor, Otherwise, use the default ForkJoinPool.commonPool() executorService executor = new ForkJoinPool(10); stopwatch.reset(); stopwatch.start(); CompletableFuture.runAsync(() -> demo.parallelStream().forEach(item -> { try { Thread.sleep(1000); System.out.println(" Example 2-" + Thread.currentThread().getName()); } catch (Exception e) { } }), executor).join(); System. Out. Println (" sample 2 - "+ stopwatch. Stop (). The elapsed (TimeUnit. MILLISECONDS)); // Example 3 stopWatch.reset (); stopwatch.start(); demo.parallelStream().forEach(item -> { try { Thread.sleep(1000); System. The out. Println (" example 3 - "+ Thread. CurrentThread (). The getName ()); } catch (Exception e) { } }); System. Out. Println (" example 3 - "+ stopwatch. Stop (). The elapsed (TimeUnit. MILLISECONDS)); executor.shutdown(); }
  • ——————-result————————–

    Example 1- Main Example 1- Main Example 1- Main Example 1- Main Example 1- Main Example 1-2501 Example 2-ForkJoinPool-1- Worker -19 Example 2-ForkJoinPool-1- Worker -9 Example 2-forkJoinPool -1-worker-5 example 2-forkJoinPool -1-worker-27 example 2-forkJoinPool -1-worker-23 example 2-1004 example 3-main Example monpool-worker -5 example monpool-worker -7 example monpool-worker -9 Example monpool-worker -3 Example 3-1001
  • ParallelStream does run with multiple threads and can specify a thread pool, but the custom thread must be of type ForkJoinPool, or it will default to ForkJoinPool.commonPool()