Preface

When we work with collection data we usually traverse it with a for loop or an iterator, which is verbose. Java provides the Stream API, which lets us treat the whole collection as a single pipeline of elements and also offers a parallel (multi-threaded) processing mode. This article covers:

  • Ways to create a stream
  • Intermediate operations on stream elements
  • Terminal operations on a stream
  • Aggregation (collect) processing of streams
  • Using parallel streams together with CompletableFuture

Follow my WeChat official account to discuss; search for: sneaking forward

GitHub address: thank you for a star

1. Ways to construct a Stream

Stream's built-in static construction methods

public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f)
public static <T> Stream<T> concat(Stream<? extends T> a, Stream<? extends T> b)
public static<T> Builder<T> builder()
public static<T> Stream<T> of(T t)
public static<T> Stream<T> empty()
public static<T> Stream<T> generate(Supplier<T> s)
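
  • A small sketch of these factory methods (the sample values are arbitrary):

    // Stream.of: a stream from fixed values
    Stream<String> s1 = Stream.of("a", "b");
    // Stream.iterate: an infinite stream from a seed and a step function, bounded here with limit
    Stream<Integer> s2 = Stream.iterate(0, n -> n + 1).limit(3);
    // Stream.generate: an infinite stream from a Supplier, bounded here with limit
    Stream<Double> s3 = Stream.generate(Math::random).limit(2);
    // Stream.concat: concatenates two streams
    Stream<String> s4 = Stream.concat(Stream.of("x"), Stream.of("y"));
    // Stream.builder: add elements one by one, then build
    Stream<String> s5 = Stream.<String>builder().add("m").add("n").build();
    s2.forEach(System.out::println);
    -------result--------
    0
    1
    2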

The stream() method declared by Collection

default Stream<E> stream()
  • Collection declares a stream() conversion method, so every Collection subclass comes with a JDK-provided way to convert the collection into a Stream
  • For example, List to Stream

    public static void main(String[] args){
        List<String> demo =  Arrays.asList("a","b","c");
        long count = demo.stream().peek(System.out::println).count();
        System.out.println(count);
    }
    -------result--------
    a
    b
    c
    3

2. Operations on elements defined by Stream

Filtering with filter

Stream<T> filter(Predicate<? super T> predicate)
  • Predicate is a functional interface, so a lambda can be passed in directly; for more complex filtering logic, combine predicates with the or, and, and negate methods
  • Example

    List<String> demo = Arrays.asList("a", "b", "c");
    Predicate<String> f1 = item -> item.equals("a");
    Predicate<String> f2 = item -> item.equals("b");
    demo.stream().filter(f1.or(f2)).forEach(System.out::println);
    -------result--------
    a
    b

Mapping with map

<R> Stream<R> map(Function<? super T, ? extends R> mapper)
IntStream mapToInt(ToIntFunction<? super T> mapper);
LongStream mapToLong(ToLongFunction<? super T> mapper);
DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper);
  • Example

    static class User {
        Integer id;
        public User(Integer id) { this.id = id; }
        public Integer getId() { return id; }
    }
    public static void main(String[] args) {
        List<User> demo = Arrays.asList(new User(1), new User(2), new User(3));
        demo.stream().map(User::getId).forEach(System.out::println);
    }
    -------result--------
    1
    2
    3

Processing with peek

Stream<T> peek(Consumer<? super T> action);
  • Unlike map, the action passed to peek has no return value; each element is passed through unchanged after the action runs
  • Example

    static class User {
        Integer id;
        public User(Integer id) { this.id = id; }
        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }
    }
    public static void main(String[] args) {
        List<User> demo = Arrays.asList(new User(1), new User(2), new User(3));
        // square each id in place, then map to the id and print it
        demo.stream().peek(user -> user.setId(user.id * user.id)).map(User::getId).forEach(System.out::println);
    }
    -------result--------
    1
    4
    9

Flattening with flatMap

<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);
IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper);
LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper);
DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper);
  • flatMap flattens a stream whose elements are themselves streams (Stream<T>) into a single Stream with element type T
  • Example

    public static void main(String[] args) {
        List<Stream<Integer>> demo = Arrays.asList(Stream.of(5), Stream.of(2), Stream.of(1));
        demo.stream().flatMap(Function.identity()).forEach(System.out::println);
    }
    -------result--------
    5
    2
    1

Deduplicating with distinct

Stream<T> distinct();
  • Example

    List<Integer> demo = Arrays.asList(1, 1, 2);
    demo.stream().distinct().forEach(System.out::println);
    -------result--------
    1
    2

Sorting with sorted

Stream<T> sorted();
Stream<T> sorted(Comparator<? super T> comparator);
  • Example

    List<Integer> demo = Arrays.asList(5, 1, 2);
    // natural (ascending) order by default
    demo.stream().sorted().forEach(System.out::println);
    // descending order with a custom Comparator
    Comparator<Integer> comparator = (a, b) -> b - a;
    demo.stream().sorted(comparator).forEach(System.out::println);
    -------ascending result--------
    1
    2
    5
    -------descending result--------
    5
    2
    1

Limiting with limit and skipping with skip

Stream<T> limit(long maxSize);
Stream<T> skip(long n);
  • Example

    List<Integer> demo = Arrays.asList(1, 2, 3, 4, 5, 6);
    // skip the first two elements, then keep at most two
    demo.stream().skip(2).limit(2).forEach(System.out::println);
    -------result--------
    3
    4

New operations provided by JDK9

  • takeWhile keeps taking elements while they satisfy the predicate and stops at the first one that does not; dropWhile discards elements while they satisfy the predicate and keeps everything from the first one that does not (see the sketch after the signatures below)

    default Stream<T> takeWhile(Predicate<? super T> predicate);
    default Stream<T> dropWhile(Predicate<? super T> predicate);
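
  • A minimal sketch of both methods (requires JDK 9 or later; the values are illustrative):

    List<Integer> demo = Arrays.asList(1, 2, 3, 1, 2);
    // takeWhile stops at the first element that fails the predicate
    demo.stream().takeWhile(item -> item < 3).forEach(System.out::println);
    // dropWhile discards elements until one fails the predicate, then keeps the rest
    demo.stream().dropWhile(item -> item < 3).forEach(System.out::println);
    -------takeWhile result--------
    1
    2
    -------dropWhile result--------
    3
    1
    2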

3. Terminal operations on a Stream

Traversal and consumption with forEach

// unordered traversal; on a parallel stream the order is not guaranteed
void forEach(Consumer<? super T> action);
// traversal in encounter order
void forEachOrdered(Consumer<? super T> action);
  • Example

    List<Integer> demo = Arrays.asList(1, 2, 3);
    demo.parallelStream().forEach(System.out::println);
    demo.parallelStream().forEachOrdered(System.out::println);
    -------forEach result--------
    2
    3
    1
    -------forEachOrdered result--------
    1
    2
    3

Get array result

Object[] toArray();
<A> A[] toArray(IntFunction<A[]> generator);
  • Example

    List<String> demo = Arrays.asList("1", "2", "3");
    //<A> A[] toArray(IntFunction<A[]> generator)
    String[] data = demo.stream().toArray(String[]::new);

Maximum and minimum

// get the minimum element
Optional<T> min(Comparator<? super T> comparator);
// get the maximum element
Optional<T> max(Comparator<? super T> comparator);
  • Example

    List<Integer> demo = Arrays.asList(1, 2, 3);
    Optional<Integer> min = demo.stream().min(Comparator.comparing(item->item));
    Optional<Integer> max = demo.stream().max(Comparator.comparing(item->item));
    System.out.println(min.get()+"-"+max.get());
    -------result--------
    1-3

Matching and finding

// whether any element matches the predicate
boolean anyMatch(Predicate<? super T> predicate);
// whether all elements match the predicate
boolean allMatch(Predicate<? super T> predicate);
// whether no element matches the predicate
boolean noneMatch(Predicate<? super T> predicate);
// get the first element
Optional<T> findFirst();
// get any element (useful with parallel streams)
Optional<T> findAny();
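
  • A minimal sketch of matching and finding (the values are illustrative):

    List<Integer> demo = Arrays.asList(1, 2, 3);
    boolean any = demo.stream().anyMatch(item -> item > 2);    // true: at least one element is > 2
    boolean all = demo.stream().allMatch(item -> item > 0);    // true: every element is > 0
    boolean none = demo.stream().noneMatch(item -> item > 3);  // true: no element is > 3
    Optional<Integer> first = demo.stream().findFirst();       // Optional[1]
    System.out.println(any + "-" + all + "-" + none + "-" + first.get());
    -------result--------
    true-true-true-1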

Reduction with reduce

Optional<T> reduce(BinaryOperator<T> accumulator);
<U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner);
  • Example

    List<Integer> demo = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
    // turn each number into a String and join them with "-", starting from the identity "0"
    String data = demo.stream().reduce("0", (u, t) -> u + "-" + t, (s1, s2) -> s1 + "-" + s2);
    System.out.println(data);
    -------result--------
    0-1-2-3-4-5-6-7-8

Count Elements

long count()
  • Example

    List<Integer> demo = Arrays.asList(1, 2, 3, 4, 5, 6);
    System.out.println(demo.stream().count());
    -------result--------
    6

Aggregating a stream with collect

/**
 * supplier:    produces the mutable result container
 * accumulator: consumes each element and folds it into the container R
 * combiner:    merges two partial result containers (used when the stream is parallel)
 */
<R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner);
/**
 * A Collector bundles a supplier, an accumulator, a combiner, a finisher and characteristics.
 * The Collectors utility class provides many ready-made Collector implementations.
 */
<R, A> R collect(Collector<? super T, A, R> collector);
  • For usage through the Collectors utility class, see the next section; a sketch of the three-argument collect follows below
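
  • A minimal sketch of the three-argument collect, collecting into an ArrayList by hand (equivalent to Collectors.toList(); the values are illustrative):

    List<Integer> demo = Arrays.asList(1, 2, 3);
    List<Integer> result = demo.stream().collect(
            ArrayList::new,       // supplier: create the result container
            ArrayList::add,       // accumulator: fold each element into the container
            ArrayList::addAll);   // combiner: merge partial containers on a parallel stream
    System.out.println(result);
    -------result--------
    [1, 2, 3]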

4. Collectors, the utility class for Collector

Interface Collector and implementation class CollectorImpl

// supplier: produces the result container
Supplier<A> supplier();
// accumulator: folds an element into the container
BiConsumer<A, T> accumulator();
// combiner: merges two partial containers
BinaryOperator<A> combiner();
// finisher: final transformation from the container A to the result R
Function<A, R> finisher();
// characteristics of the collector (CONCURRENT, UNORDERED, IDENTITY_FINISH)
Set<Characteristics> characteristics();
// factory method for building a custom Collector
public static <T, A, R> Collector<T, A, R> of(Supplier<A> supplier, BiConsumer<A, T> accumulator, BinaryOperator<A> combiner, Function<A, R> finisher, Characteristics... characteristics)
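
  • A sketch of building a custom Collector with Collector.of, joining integers into a String through a StringJoiner (the names and values are illustrative, not from the original post):

    List<Integer> demo = Arrays.asList(1, 2, 3);
    Collector<Integer, StringJoiner, String> joinInts = Collector.of(
            () -> new StringJoiner("-"),                          // supplier: the result container
            (joiner, item) -> joiner.add(String.valueOf(item)),   // accumulator: fold in each element
            StringJoiner::merge,                                  // combiner: merge partial containers
            StringJoiner::toString);                              // finisher: container -> final result
    System.out.println(demo.stream().collect(joinInts));
    -------result--------
    1-2-3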

Collecting into a List or Set

// collect the elements into a List
public static <T> Collector<T, ?, List<T>> toList()
// collect the elements into a Set
public static <T> Collector<T, ?, Set<T>> toSet()
  • Example

    List<Integer> demo = Arrays.asList(1, 2, 3);
    List<Integer> col = demo.stream().collect(Collectors.toList());
    Set<Integer> set = demo.stream().collect(Collectors.toSet());

Collecting into a Map with toMap

// keyMapper / valueMapper: map each element to a key and a value
public static <T, K, U> Collector<T, ?, Map<K, U>> toMap(
        Function<? super T, ? extends K> keyMapper,
        Function<? super T, ? extends U> valueMapper)
/**
 * mergeFunction: merges the values of elements that map to the same key
 */
public static <T, K, U> Collector<T, ?, Map<K, U>> toMap(
        Function<? super T, ? extends K> keyMapper,
        Function<? super T, ? extends U> valueMapper,
        BinaryOperator<U> mergeFunction)
/**
 * mergeFunction: merges the values of elements that map to the same key
 * mapSupplier:   supplies the Map implementation to fill
 */
public static <T, K, U, M extends Map<K, U>> Collector<T, ?, M> toMap(
        Function<? super T, ? extends K> keyMapper,
        Function<? super T, ? extends U> valueMapper,
        BinaryOperator<U> mergeFunction,
        Supplier<M> mapSupplier)
  • With the two-argument toMap, elements that share a key cause an IllegalStateException; supply a mergeFunction (a sketch follows the example below) or use groupingBy instead
  • Example

    List<User> demo = Arrays.asList(new User(1), new User(2), new User(3));
    Map<Integer,User> map = demo.stream().collect(Collectors.toMap(User::getId,item->item));
    System.out.println(map);
    -------result-------
    {1=TestS$User@7b23ec81, 2=TestS$User@6acbcfc0, 3=TestS$User@5f184fc6}
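
  • A sketch of the mergeFunction overload, assuming the list contains duplicate ids and the first value is kept on a collision:

    List<User> demo = Arrays.asList(new User(1), new User(1), new User(2));
    // on a key collision, keep the existing value instead of throwing an exception
    Map<Integer, User> map = demo.stream()
            .collect(Collectors.toMap(User::getId, item -> item, (oldVal, newVal) -> oldVal));
    System.out.println(map.keySet());
    -------result-------
    [1, 2]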

Joining strings with joining

// join with no delimiter
public static Collector<CharSequence, ?, String> joining()
// join with the given delimiter
public static Collector<CharSequence, ?, String> joining(CharSequence delimiter)
  • Example

    List<String> demo = Arrays.asList("c", "s", "c", "w");
    String name = demo.stream().collect(Collectors.joining("-"));
    System.out.println(name);
    -------result--------
    c-s-c-w

Mapping then collecting with mapping

  • First map each element, then collect the mapped results with a downstream collector

    /**
     * mapper:     mapping applied to each element first
     * downstream: collector applied to the mapped elements
     */
    public static <T, U, A, R> Collector<T, ?, R> mapping(Function<? super T, ? extends U> mapper, Collector<? super U, A, R> downstream);
  • Example

    List<String> demo = Arrays.asList("1", "2", "3");
    List<Integer> data = demo.stream().collect(Collectors.mapping(Integer::valueOf, Collectors.toList()));
    System.out.println(data);
    -------result-------
    [1, 2, 3]

Collecting then transforming with collectingAndThen

/**
 * downstream: the collector that performs the aggregation
 * finisher:   transformation applied to the collected result
 */
public static <T, A, R, RR> Collector<T, A, RR> collectingAndThen(Collector<T, A, R> downstream, Function<R, RR> finisher);
  • Example

    List<Integer> demo = Arrays.asList(1, 2, 3, 4, 5, 6);
    // collect into a List, then return the size of that list as the final result
    Integer size = demo.stream().collect(Collectors.collectingAndThen(Collectors.toList(), List::size));
    System.out.println(size);
    ---------result----------
    6

Grouping with groupingBy (the result Map is a HashMap)

/**
 * classifier: maps each element to its group key
 */
public static <T, K> Collector<T, ?, Map<K, List<T>>> groupingBy(
        Function<? super T, ? extends K> classifier);
/**
 * classifier: maps each element to its group key
 * downstream: collector applied to the elements of each group
 */
public static <T, K, A, D> Collector<T, ?, Map<K, D>> groupingBy(
        Function<? super T, ? extends K> classifier,
        Collector<? super T, A, D> downstream)
/**
 * classifier: maps each element to its group key
 * mapFactory: supplies the Map implementation to fill
 * downstream: collector applied to the elements of each group
 */
public static <T, K, D, A, M extends Map<K, D>> Collector<T, ?, M> groupingBy(
        Function<? super T, ? extends K> classifier,
        Supplier<M> mapFactory,
        Collector<? super T, A, D> downstream)
  • Example

    public static void main(String[] args) throws Exception {
        List<Integer> demo = Stream.iterate(0, item -> item + 1)
                .limit(15)
                .collect(Collectors.toList());
        // divide into three groups by item % 3, mapping each number to a String inside each group
        Map<Integer, List<String>> map = demo.stream().collect(Collectors.groupingBy(item -> item % 3,
                HashMap::new, Collectors.mapping(String::valueOf, Collectors.toList())));
        System.out.println(map);
    }
    ---------result----------
    {0=[0, 3, 6, 9, 12], 1=[1, 4, 7, 10, 13], 2=[2, 5, 8, 11, 14]}

Stream grouping (the Map used for grouping is ConcurrentHashMap)

/**
 * classifier: maps each element to its group key
 */
public static <T, K> Collector<T, ?, ConcurrentMap<K, List<T>>> groupingByConcurrent(
        Function<? super T, ? extends K> classifier);
/**
 * classifier: maps each element to its group key
 * downstream: collector applied to the elements of each group
 */
public static <T, K, A, D> Collector<T, ?, ConcurrentMap<K, D>> groupingByConcurrent(
        Function<? super T, ? extends K> classifier,
        Collector<? super T, A, D> downstream)
/**
 * classifier: maps each element to its group key
 * mapFactory: supplies the ConcurrentMap implementation to fill
 * downstream: collector applied to the elements of each group
 */
public static <T, K, A, D, M extends ConcurrentMap<K, D>> Collector<T, ?, M> groupingByConcurrent(
        Function<? super T, ? extends K> classifier,
        Supplier<M> mapFactory,
        Collector<? super T, A, D> downstream);
  • It’s used in the same way as groupingBy
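  • A minimal sketch, mirroring the groupingBy example but producing a ConcurrentMap:

    List<Integer> demo = Arrays.asList(0, 1, 2, 3, 4, 5);
    // on a parallel stream the groups are filled concurrently, so the order inside each list may vary
    ConcurrentMap<Integer, List<Integer>> map = demo.parallelStream()
            .collect(Collectors.groupingByConcurrent(item -> item % 3));
    System.out.println(map);
    -------possible result-------
    {0=[0, 3], 1=[1, 4], 2=[2, 5]}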

Partitioning into two groups with partitioningBy (a special case of groupingBy)

/**
 * predicate: decides which of the two partitions each element falls into
 */
public static <T> Collector<T, ?, Map<Boolean, List<T>>> partitioningBy(
        Predicate<? super T> predicate)
/**
 * predicate:  decides which of the two partitions each element falls into
 * downstream: collector applied to the elements of each partition
 */
public static <T, D, A> Collector<T, ?, Map<Boolean, D>> partitioningBy(
        Predicate<? super T> predicate,
        Collector<? super T, A, D> downstream)
  • Example

    List<Integer> demo = Arrays.asList(1, 2, 3, 4, 5, 6);
    // partition into odd (false) and even (true) numbers
    Map<Boolean, List<Integer>> map = demo.stream().collect(Collectors.partitioningBy(item -> item % 2 == 0));
    System.out.println(map);
    ---------result----------
    {false=[1, 3, 5], true=[2, 4, 6]}

Averaging with averagingInt / averagingLong / averagingDouble

// average of double-valued mappings (returns Double)
public static <T> Collector<T, ?, Double> averagingDouble(ToDoubleFunction<? super T> mapper)
// average of long-valued mappings (returns Double)
public static <T> Collector<T, ?, Double> averagingLong(ToLongFunction<? super T> mapper)
// average of int-valued mappings (returns Double)
public static <T> Collector<T, ?, Double> averagingInt(ToIntFunction<? super T> mapper)
  • Example

    List<Integer> demo = Arrays.asList(1, 2, 5);
    Double data = demo.stream().collect(Collectors.averagingInt(Integer::intValue));
    System.out.println(data);
    ---------result----------
    2.6666666666666665

Stream aggregation looks for maximum and minimum values

public static <T> Collector<T, ?, Optional<T>> minBy(Comparator<? super T> comparator)
public static <T> Collector<T, ?, Optional<T>> maxBy(Comparator<? super T> comparator)
  • Example

    List<Integer> demo = Arrays.asList(1, 2, 5);
    Optional<Integer> min = demo.stream().collect(Collectors.minBy(Comparator.comparing(item -> item)));
    Optional<Integer> max = demo.stream().collect(Collectors.maxBy(Comparator.comparing(item -> item)));
    System.out.println(min.get()+"-"+max.get());
    ---------result----------
    1-5

Aggregating summary statistics

  • You can get the total number of elements, the cumulative sum of elements, the minimum, the maximum, and the average

    // statistics over int-valued mappings
    public static <T> Collector<T, ?, IntSummaryStatistics> summarizingInt(ToIntFunction<? super T> mapper)
    // statistics over double-valued mappings
    public static <T> Collector<T, ?, DoubleSummaryStatistics> summarizingDouble(ToDoubleFunction<? super T> mapper)
    // statistics over long-valued mappings
    public static <T> Collector<T, ?, LongSummaryStatistics> summarizingLong(ToLongFunction<? super T> mapper)
  • Example

    List<Integer> demo = Arrays.asList(1, 2, 5);
    IntSummaryStatistics data = demo.stream().collect(Collectors.summarizingInt(Integer::intValue));
    System.out.println(data);
    ---------result----------
    IntSummaryStatistics{count=3, sum=8, min=1, average=2.666667, max=5}

New aggregation method provided by JDK 12: teeing

public static <T, R1, R2, R> Collector<T, ?, R> teeing(
        Collector<? super T, ?, R1> downstream1,
        Collector<? super T, ?, R2> downstream2,
        BiFunction<? super R1, ? super R2, R> merger)
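
  • A minimal sketch of teeing (requires JDK 12 or later), combining min and max in a single pass:

    List<Integer> demo = Arrays.asList(1, 2, 5);
    String range = demo.stream().collect(Collectors.teeing(
            Collectors.minBy(Comparator.naturalOrder()),    // downstream1
            Collectors.maxBy(Comparator.naturalOrder()),    // downstream2
            (min, max) -> min.get() + "-" + max.get()));    // merger combines the two results
    System.out.println(range);
    -------result--------
    1-5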

5. Using parallel streams (parallelStream)

  • Used together with CompletableFuture and a thread pool
  • Example

    public static void main(String[] args) throws Exception {
        List<Integer> demo = Stream.iterate(0, item -> item + 1)
                .limit(5)
                .collect(Collectors.toList());
        // Example 1: a sequential stream runs entirely on the main thread
        Stopwatch stopwatch = Stopwatch.createStarted(Ticker.systemTicker());
        demo.stream().forEach(item -> {
            try {
                Thread.sleep(500);
                System.out.println("Example 1-" + Thread.currentThread().getName());
            } catch (Exception e) {
            }
        });
        System.out.println("Example 1-" + stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
        // Example 2: for parallelStream to use the threads of a given executor, the executor must be
        // a ForkJoinPool and the stream must be started inside it (here via CompletableFuture.runAsync);
        // otherwise the default ForkJoinPool.commonPool() is used
        ExecutorService executor = new ForkJoinPool(10);
        stopwatch.reset();
        stopwatch.start();
        CompletableFuture.runAsync(() -> demo.parallelStream().forEach(item -> {
            try {
                Thread.sleep(1000);
                System.out.println("Example 2-" + Thread.currentThread().getName());
            } catch (Exception e) {
            }
        }), executor).join();
        System.out.println("Example 2-" + stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
        // Example 3: parallelStream without a custom pool runs on ForkJoinPool.commonPool()
        stopwatch.reset();
        stopwatch.start();
        demo.parallelStream().forEach(item -> {
            try {
                Thread.sleep(1000);
                System.out.println("Example 3-" + Thread.currentThread().getName());
            } catch (Exception e) {
            }
        });
        System.out.println("Example 3-" + stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
        executor.shutdown();
    }
  • Result

    Example 1-main
    Example 1-main
    Example 1-main
    Example 1-main
    Example 1-main
    Example 1-2501
    Example 2-ForkJoinPool-1-worker-19
    Example 2-ForkJoinPool-1-worker-9
    Example 2-ForkJoinPool-1-worker-5
    Example 2-ForkJoinPool-1-worker-27
    Example 2-ForkJoinPool-1-worker-23
    Example 2-1004
    Example 3-main
    Example 3-ForkJoinPool.commonPool-worker-5
    Example 3-ForkJoinPool.commonPool-worker-7
    Example 3-ForkJoinPool.commonPool-worker-9
    Example 3-ForkJoinPool.commonPool-worker-3
    Example 3-1001
  • parallelStream really does run on multiple threads and can be pointed at a specific thread pool, but the custom pool must be a ForkJoinPool (and the stream must be started from inside it); otherwise it falls back to ForkJoinPool.commonPool()