热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Java8Stream流底层原理

JavaStream函数式接口​初识lambda呢,函数式接口肯定是绕不过去的,函数式接口就是一个有且仅有一个抽象方法,但是可以有多个

Java Stream

函数式接口

​ 初识lambda呢,函数式接口肯定是绕不过去的,函数式接口就是一个有且仅有一个抽象方法,但是可以有多个非抽象方法的接口。函数式接口可以被隐式转换为lambda表达式。

@FunctionalInterface

public interface Closeable {

   

    void close();

}

​ 在java.util.function它包含了很多类,用来支持Java的函数式编程,该包中的函数式接口有:

函数式接口    函数描述符    原始类型特化

Predicate T -> boolean IntPredicate,LongPredicate,DoublePredicate

Consumer     T -> void       IntConsumer,LongConsumer,DoubleConsumer

Function      T -> R    IntFunction, IntToDoubleFunction,IntToLongFunction,LongFunction,

LongToIntFunction,LongToDouble,FunctionDoubleFunction,DoubleToIntFunction,DoubleToLongFunctionToIntFunction,

ToLongFunction,ToDoubleFunction

Supplier  () -> T    BooleanSupplier,IntSupplier,LongSupplier,DoubleSupplier

UnaryOperator     T -> T     IntUnaryOperator,LongUnaryOperator,DoubleUnaryOperator

BinaryOperator     (T,T) -> T       IntBinaryOperator,DoubleBinaryOperator,LongBinaryOperator

BiPredicate   (L,R) -> boolean  

BiConsumer (T,U) -> void  ObjIntConsumer,ObjDoubleConsumer,ObjLongConsumer

BiFunction       (T,U) -> R      ToIntBiFunction,ToLongBiFunction,ToDoubleBiFunction

操作

Stream操作分类  

中间操作(Intermediate operations)    无状态(Stateless)   unordered() filter() map() mapToInt() mapToLong() mapToDouble() flatMap() flatMapToInt() flatMapToLong() flatMapToDouble() peek()

有状态(Stateful)     distinct() sorted() limit() skip()

结束操作(Terminal operations)   非短路操作   forEach() forEachOrdered() toArray() reduce() collect() max() min() count()

短路操作(short-circuiting)   anyMatch() allMatch() noneMatch() findFirst() findAny()

流程

Stream相关接口继承图:

Stream流水线组织结构示意图(图是盗的):

Collection

​ 类路径java.util.colltction

@Override

default Spliterator spliterator() {

    return Spliterators.spliterator(this, 0);

}

// 常用Stream流转换

default Stream stream() {

    return StreamSupport.stream(spliterator(), false);

}

// 并行流

default Stream parallelStream() {

    return StreamSupport.stream(spliterator(), true);

}

// java.util.stream.StreamSupport#stream(java.util.Spliterator, boolean)

public static Stream stream(Spliterator spliterator, boolean parallel) {

    Objects.requireNonNull(spliterator);

    return new ReferencePipeline.Head<>(spliterator, StreamOpFlag.fromCharacteristics(spliterator), parallel);

}

AbstractPipeline

​ 类路径java.util.stream.AbstractPipeline

// 反向链接到管道链的头部(如果是源阶段,则为自身)。

private final AbstractPipeline sourceStage;

// “上游”管道,如果这是源阶段,则为null。

private final AbstractPipeline previousStage;

// 此管道对象表示的中间操作的操作标志。

protected final int sourceOrOpFlags;

// 管道中的下一个阶段;如果这是最后一个阶段,则为null。 在链接到下一个管道时有效地结束。

private AbstractPipeline nextStage;

// 如果是顺序的,则此管道对象与流源之间的中间操作数;如果是并行的,则为先前有状态的中间操作数。 在管道准备进行评估时有效。

private int depth;

// 源和所有操作的组合源标志和操作标志,直到此流水线对象表示的操作为止(包括该流水线对象所代表的操作)。 在管道准备进行评估时有效。

private int combinedFlags;

// 源拆分器。 仅对头管道有效。 如果管道使用非null值,那么在使用管道之前, sourceSupplier必须为null。 在使用管道之后,如果非null,则将其设置为null。

private Spliterator sourceSpliterator;

// 来源供应商。 仅对头管道有效。 如果非null,则在使用管道之前, sourceSpliterator必须为null。 在使用管道之后,如果非null,则将其设置为null。

private Supplier> sourceSupplier;

// 如果已链接或使用此管道,则为True

private boolean linkedOrConsumed;

// 如果正在执行任何有状态操作,则为true;否则为true。 仅对源阶段有效。

private boolean sourceAnyStateful;

private Runnable sourceCloseAction;

// 如果管道是并行的,则为true;否则,管道为顺序的;否则为true。 仅对源阶段有效。

private boolean parallel;

ReferencePipeline

​ 类路径:java.util.stream.ReferencePipeline

filter

// java.util.stream.ReferencePipeline#filter

@Override

public final Stream

filter(Predicate predicate) {

    Objects.requireNonNull(predicate);

    // 返回一个匿名无状态的管道

    return new StatelessOp

(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) {

        // 下游生产线所需要的回调接口

        @Override

        Sink

opWrapSink(int flags, Sink

sink) {

            return new Sink.ChainedReference

(sink) {

                @Override

                public void begin(long size) {

                    downstream.begin(-1);

                }

                        // 真正执行操作的方法,依靠ChainedReference内置ReferencePipeline引用下游的回调

                @Override

                public void accept(P_OUT u) {

                    // 只有满足条件的元素才能被下游执行

                    if (predicate.test(u))

                        downstream.accept(u);

                }

            };

        }

    };

}

map

// java.util.stream.ReferencePipeline#map

public final Stream map(Function mapper) {

    Objects.requireNonNull(mapper);

    // 返回一个匿名无状态的管道

    return new StatelessOp

(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {

        // 下游生产线所需要的回调接口

        @Override

        Sink

opWrapSink(int flags, Sink sink) {

            return new Sink.ChainedReference

(sink) {

                // 真正执行操作的方法,依靠ChainedReference内置ReferencePipeline引用下游的回调

                @Override

                public void accept(P_OUT u) {

                    // 执行转换后提供给下游执行

                    downstream.accept(mapper.apply(u));

                }

            };

        }

    };

}

flatMap

// java.util.stream.ReferencePipeline#flatMap

@Override

public final Stream flatMap(Function> mapper) {

    Objects.requireNonNull(mapper);

    // 返回一个匿名无状态的管道

    return new StatelessOp

(this, StreamShape.REFERENCE,

                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {

        // 下游生产线所需要的回调接口

        @Override

        Sink

opWrapSink(int flags, Sink sink) {

            return new Sink.ChainedReference

(sink) {

                @Override

                public void begin(long size) {

                    downstream.begin(-1);

                }

                        // 真正执行操作的方法,依靠ChainedReference内置ReferencePipeline引用下游的回调

                @Override

                public void accept(P_OUT u) {

                    try (Stream result = mapper.apply(u)) {

                         // 划分为多个流执行下游(分流)

                        if (result != null)

                            result.sequential().forEach(downstream);

                    }

                }

            };

        }

    };

}

peek

// java.util.stream.ReferencePipeline#peek

@Override

public final Stream

peek(Consumer action) {

    Objects.requireNonNull(action);

    // 返回一个匿名无状态的管道

    return new StatelessOp

(this, StreamShape.REFERENCE, 0) {

        // 下游生产线所需要的回调接口

        @Override

        Sink

opWrapSink(int flags, Sink

sink) {

            return new Sink.ChainedReference

(sink) {

                // 真正执行操作的方法,依靠ChainedReference内置ReferencePipeline引用下游的回调

                @Override

                public void accept(P_OUT u) {

                    // 先执行自定义方法,在执行下游方法

                    action.accept(u);

                    downstream.accept(u);

                }

            };

        }

    };

}

sorted

@Override

public final Stream

sorted() {

    // 不提供Comparator,会使用元素自实现Comparator的compareTo方法

    return SortedOps.makeRef(this);

}

@Override

public final Stream

sorted(Comparator comparator) {

    return SortedOps.makeRef(this, comparator);

}

// Sorted.makeRef

static Stream makeRef(AbstractPipeline upstream,

                             Comparator comparator) {

    return new OfRef<>(upstream, comparator);

}

// ofRef类

private static final class OfRef extends ReferencePipeline.StatefulOp {

        private final boolean isNaturalSort;

        private final Comparator comparator;

        @Override

        public Sink opWrapSink(int flags, Sink sink) {

            Objects.requireNonNull(sink);

                 // 根据不同的flag进行不同排序

            if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)

                return sink;

            else if (StreamOpFlag.SIZED.isKnown(flags))

                return new SizedRefSortingSink<>(sink, comparator);

            else

                return new RefSortingSink<>(sink, comparator);

        }

    }

distinct

@Override

public final Stream

distinct() {

    return DistinctOps.makeRef(this);

}

static ReferencePipeline makeRef(AbstractPipeline upstream) {

    // 返回一个匿名有状态的管道

    return new ReferencePipeline.StatefulOp(upstream, StreamShape.REFERENCE, StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {

        @Override

        Sink opWrapSink(int flags, Sink sink) {

            Objects.requireNonNull(sink);

            if (StreamOpFlag.DISTINCT.isKnown(flags)) {

                // 已经是去重过了

                return sink;

            } else if (StreamOpFlag.SORTED.isKnown(flags)) {

                 // 有序流

                return new Sink.ChainedReference(sink) {

                    boolean seenNull;

                    // 这个为先执行的前序元素

                    T lastSeen;

                    @Override

                    public void begin(long size) {

                        seenNull = false;

                        lastSeen = null;

                        downstream.begin(-1);

                    }

                    @Override

                    public void end() {

                        seenNull = false;

                        lastSeen = null;

                        downstream.end();

                    }

                              // 这里通过有序的特性,前序元素与后序元素比较,如果相等则跳过执行后序的元素

                    @Override

                    public void accept(T t) {

                        if (t == null) {

                            // 这里控制元素为null只有一个

                            if (!seenNull) {

                                seenNull = true;

                                downstream.accept(lastSeen = null);

                            }

                        } else if (lastSeen == null || !t.equals(lastSeen)) {

                            // 这里将前序元素赋值给lastSeen

                            downstream.accept(lastSeen = t);

                        }

                    }

                };

            } else {

                // 底层通过Set进行去重,所以该元素需要重写hashCode和equals方法

                return new Sink.ChainedReference(sink) {

                    Set seen;

                    @Override

                    public void begin(long size) {

                        seen = new HashSet<>();

                        downstream.begin(-1);

                    }

                    @Override

                    public void end() {

                        seen = null;

                        downstream.end();

                    }

                    @Override

                    public void accept(T t) {

                        if (!seen.contains(t)) {

                            seen.add(t);

                            downstream.accept(t);

                        }

                    }

                };

            }

        }

    };

}

skip、limit

public static Stream makeRef(AbstractPipeline upstream,

                                        long skip, long limit) {

        if (skip <0)

            throw new IllegalArgumentException("Skip must be non-negative: " + skip);

           // 返回一个匿名有状态的管道

        return new ReferencePipeline.StatefulOp(upstream, StreamShape.REFERENCE, flags(limit)) {

            Spliterator unorderedSkipLimitSpliterator(Spliterator s, long skip, long limit, long sizeIfKnown) {

                if (skip <= sizeIfKnown) {

                    limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;

                    skip = 0;

                }

                return new StreamSpliterators.UnorderedSliceSpliterator.OfRef<>(s, skip, limit);

            }

                 // 自己实现真正操作的方法

            @Override

            Sink opWrapSink(int flags, Sink sink) {

                return new Sink.ChainedReference(sink) {

                    long n = skip;

                    long m = limit >= 0 ? limit : Long.MAX_VALUE;

                    @Override

                    public void begin(long size) {

                        downstream.begin(calcSize(size, skip, m));

                    }

                    @Override

                    public void accept(T t) {

                        if (n == 0) {

                            // limit

                            if (m > 0) {

                                m--;

                                downstream.accept(t);

                            }

                        }

                        // skip

                        else {

                            n--;

                        }

                    }

                    @Override

                    public boolean cancellationRequested() {

                        return m == 0 || downstream.cancellationRequested();

                    }

                };

            }

        };

    }

reduce

// java.util.stream.ReferencePipeline#reduce(P_OUT, java.util.function.BinaryOperator

)

@Override

public final P_OUT reduce(final P_OUT identity, final BinaryOperator

accumulator) {

    return evaluate(ReduceOps.makeRef(identity, accumulator, accumulator));

}

// java.util.stream.ReferencePipeline#reduce(java.util.function.BinaryOperator

)

@Override

public final Optional

reduce(BinaryOperator

accumulator) {

    return evaluate(ReduceOps.makeRef(accumulator));

}

// java.util.stream.ReferencePipeline#reduce(R, java.util.function.BiFunction, java.util.function.BinaryOperator)

@Override

public final R reduce(R identity, BiFunction accumulator, BinaryOperator combiner) {

    return evaluate(ReduceOps.makeRef(identity, accumulator, combiner));

}

// java.util.stream.AbstractPipeline#evaluate(java.util.stream.TerminalOp)

final R evaluate(TerminalOp terminalOp) {

    assert getOutputShape() == terminalOp.inputShape();

    if (linkedOrConsumed)

        throw new IllegalStateException(MSG_STREAM_LINKED);

    linkedOrCOnsumed= true;

    return isParallel()

        ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))

        : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));

}

collect

// java.util.stream.ReferencePipeline#collect(java.util.stream.Collector)

@Override

@SuppressWarnings("unchecked")

public final R collect(Collector collector) {

    A container;

    if (isParallel()

        && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))

        && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {

        cOntainer= collector.supplier().get();

        BiConsumer accumulator = collector.accumulator();

        forEach(u -> accumulator.accept(container, u));

    }

    else {

        cOntainer= evaluate(ReduceOps.makeRef(collector));

    }

    // 具有特定转换的使用finisher处理

    return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)

        ? (R) container

        : collector.finisher().apply(container);

}

// java.util.stream.ReferencePipeline#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)

@Override

public final R collect(Supplier supplier, BiConsumer accumulator, BiConsumer combiner) {

    return evaluate(ReduceOps.makeRef(supplier, accumulator, combiner));

}

// java.util.stream.AbstractPipeline#evaluate(java.util.stream.TerminalOp)

final R evaluate(TerminalOp terminalOp) {

    assert getOutputShape() == terminalOp.inputShape();

    if (linkedOrConsumed)

        throw new IllegalStateException(MSG_STREAM_LINKED);

    linkedOrCOnsumed= true;

    return isParallel()

        ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))

        : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));

}

forEach

// java.util.stream.ReferencePipeline#forEach

@Override

public void forEach(Consumer action) {

    evaluate(ForEachOps.makeRef(action, false));

}

// java.util.stream.ForEachOps#makeRef

public static TerminalOp makeRef(Consumer action, boolean ordered) {

    Objects.requireNonNull(action);

    return new ForEachOp.OfRef<>(action, ordered);

}

// java.util.stream.ForEachOps.ForEachOp.OfRef

static final class OfRef extends ForEachOp {

    final Consumer consumer;

    OfRef(Consumer consumer, boolean ordered) {

        super(ordered);

        this.cOnsumer= consumer;

    }

    // 只是简单的消费

    @Override

    public void accept(T t) {

        consumer.accept(t);

    }

}

Head

​ 流的数据元的头,类路径java.util.stream.ReferencePipeline.Head

// java.util.stream.ReferencePipeline.Head

static class Head extends ReferencePipeline {

   

    Head(Supplier> source, int sourceFlags, boolean parallel) {

        super(source, sourceFlags, parallel);

    }

    Head(Spliterator source, int sourceFlags, boolean parallel) {

        super(source, sourceFlags, parallel);

    }

    @Override

    final boolean opIsStateful() {

        throw new UnsupportedOperationException();

    }

    @Override

    final Sink opWrapSink(int flags, Sink sink) {

        throw new UnsupportedOperationException();

    }

    // Optimized sequential terminal operations for the head of the pipeline

    @Override

    public void forEach(Consumer action) {

        if (!isParallel()) {

            sourceStageSpliterator().forEachRemaining(action);

        }

        else {

            super.forEach(action);

        }

    }

    @Override

    public void forEachOrdered(Consumer action) {

        if (!isParallel()) {

            sourceStageSpliterator().forEachRemaining(action);

        }

        else {

            super.forEachOrdered(action);

        }

    }

}

StatelessOp

​ 无状态的中间管道,类路径java.util.stream.ReferencePipeline.StatelessOp

// java.util.stream.ReferencePipeline.StatelessOp

abstract static class StatelessOp extends ReferencePipeline {

    StatelessOp(AbstractPipeline upstream, StreamShape inputShape, int opFlags) {

        super(upstream, opFlags);

        assert upstream.getOutputShape() == inputShape;

    }

    @Override

    final boolean opIsStateful() {

        return false;

    }

}

StatefulOp

​ 有状态的中间管道,类路径java.util.stream.ReferencePipeline.StatefulOp

// java.util.stream.ReferencePipeline.StatefulOp

abstract static class StatefulOp extends ReferencePipeline {

    StatefulOp(AbstractPipeline upstream, StreamShape inputShape, int opFlags) {

        super(upstream, opFlags);

        assert upstream.getOutputShape() == inputShape;

    }

    @Override

    final boolean opIsStateful() {

        return true;

    }

    @Override

    abstract

Node opEvaluateParallel(PipelineHelper helper,

                                                   Spliterator

spliterator,

                                                   IntFunction generator);

TerminalOp

​ 管道流的结束操作,类路径java.util.stream.TerminalOp

interface TerminalOp {

   

       // 获取此操作的输入类型的形状

    default StreamShape inputShape() { return StreamShape.REFERENCE; }

    // 获取操作的流标志。 终端操作可以设置StreamOpFlag定义的流标志的有限子集,并且这些标志与管道的先前组合的流和中间操作标志组合在一起。

    default int getOpFlags() { return 0; }

    // 使用指定的PipelineHelper对操作执行并行评估,该操作描述上游中间操作。

    default

R evaluateParallel(PipelineHelper helper, Spliterator

spliterator) {

        if (Tripwire.ENABLED)

            Tripwire.trip(getClass(), "{0} triggering TerminalOp.evaluateParallel serial default");

        return evaluateSequential(helper, spliterator);

    }

    // 使用指定的PipelineHelper对操作执行顺序评估,该操作描述上游中间操作。

   

R evaluateSequential(PipelineHelper helper, Spliterator

spliterator);

}

ReduceOp

​ 类路径java.util.stream.ReduceOps.ReduceOp

    private static abstract class ReduceOp> implements TerminalOp {

        private final StreamShape inputShape;

        ReduceOp(StreamShape shape) {

            inputShape = shape;

        }

        public abstract S makeSink();

        @Override

        public StreamShape inputShape() {

            return inputShape;

        }

        // 通过匿名子类实现makeSink()获取Sink

        @Override

        public

R evaluateSequential(PipelineHelper helper, Spliterator

spliterator) {

            return helper.wrapAndCopyInto(makeSink(), spliterator).get();

        }

        @Override

        public

R evaluateParallel(PipelineHelper helper, Spliterator

spliterator) {

            return new ReduceTask<>(this, helper, spliterator).invoke().get();

        }

    }

MatchOp

​ 类路径java.util.stream.MatchOps.MatchOp

private static final class MatchOp implements TerminalOp {

        private final StreamShape inputShape;

        final MatchKind matchKind;

        final Supplier> sinkSupplier;

        MatchOp(StreamShape shape, MatchKind matchKind, Supplier> sinkSupplier) {

            this.inputShape = shape;

            this.matchKind = matchKind;

            this.sinkSupplier = sinkSupplier;

        }

        @Override

        public int getOpFlags() {

            return StreamOpFlag.IS_SHORT_CIRCUIT | StreamOpFlag.NOT_ORDERED;

        }

        @Override

        public StreamShape inputShape() {

            return inputShape;

        }

           // 使用内置的sinkSupplier获取Sink

        @Override

        public Boolean evaluateSequential(PipelineHelper helper, Spliterator spliterator) {

            return helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).getAndClearState();

        }

        @Override

        public Boolean evaluateParallel(PipelineHelper helper, Spliterator spliterator) {

            return new MatchTask<>(this, helper, spliterator).invoke();

        }

    }

FindOp

​ 类路径java.util.stream.FindOps.FindOp

private static final class FindOp implements TerminalOp {

        private final StreamShape shape;

        final boolean mustFindFirst;

        final O emptyValue;

        final Predicate presentPredicate;

        final Supplier> sinkSupplier;

        FindOp(boolean mustFindFirst,

                       StreamShape shape,

                       O emptyValue,

                       Predicate presentPredicate,

                       Supplier> sinkSupplier) {

            this.mustFindFirst = mustFindFirst;

            this.shape = shape;

            this.emptyValue = emptyValue;

            this.presentPredicate = presentPredicate;

            this.sinkSupplier = sinkSupplier;

        }

        @Override

        public int getOpFlags() {

            return StreamOpFlag.IS_SHORT_CIRCUIT | (mustFindFirst ? 0 : StreamOpFlag.NOT_ORDERED);

        }

        @Override

        public StreamShape inputShape() {

            return shape;

        }

           // 通过内置sinkSupplier获取Sink

        @Override

        public O evaluateSequential(PipelineHelper helper, Spliterator spliterator) {

            O result = helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).get();

            return result != null ? result : emptyValue;

        }

        @Override

        public

O evaluateParallel(PipelineHelper helper, Spliterator

spliterator) {

            return new FindTask<>(this, helper, spliterator).invoke();

        }

    }

ForEachOp

​ 类路径java.util.stream.ForEachOps.ForEachOp

static abstract class ForEachOp implements TerminalOp, TerminalSink {

        private final boolean ordered;

        protected ForEachOp(boolean ordered) {

            this.ordered = ordered;

        }

        @Override

        public int getOpFlags() {

            return ordered ? 0 : StreamOpFlag.NOT_ORDERED;

        }

      

           // 自己实现了Sink

        @Override

        public Void evaluateSequential(PipelineHelper helper, Spliterator spliterator) {

            return helper.wrapAndCopyInto(this, spliterator).get();

        }

        @Override

        public Void evaluateParallel(PipelineHelper helper, Spliterator spliterator) {

            if (ordered)

                new ForEachOrderedTask<>(helper, spliterator, this).invoke();

            else

                new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();

            return null;

        }

        @Override

        public Void get() {

            return null;

        }

        static final class OfRef extends ForEachOp {

            final Consumer consumer;

            OfRef(Consumer consumer, boolean ordered) {

                super(ordered);

                this.cOnsumer= consumer;

            }

            @Override

            public void accept(T t) {

                consumer.accept(t);

            }

        }

              ...

    }

Sink

​ 类路径java.util.stream.Sink

interface Sink extends Consumer {

       // 开始遍历元素之前调用该方法,通知Sink做好准备。

    default void begin(long size) {}

       // 所有元素遍历完成之后调用,通知Sink没有更多的元素了。

    default void end() {}

       // 是否可以结束操作,可以让短路操作尽早结束。

    default boolean cancellationRequested() {

        return false;

    }

    // 遍历元素时调用,接受一个待处理元素,并对元素进行处理。Stage把自己包含的操作和回调方法封装到该方法里,前一个Stage只需要调用当前Stage.accept(T t)方法就行了。

    void accept(T t);

}

​ 这里Sink的子类实现中分为两种:中间操作匿名实现ChainedReference和TerminalOp子类所提供的Sink。

ChainedReference

​ 类路径java.util.stream.Sink.ChainedReference,这里是中间操作的默认模板父类

    static abstract class ChainedReference implements Sink {

        protected final Sink downstream;

        public ChainedReference(Sink downstream) {

            this.downstream = Objects.requireNonNull(downstream);

        }

        @Override

        public void begin(long size) {

            downstream.begin(size);

        }

        @Override

        public void end() {

            downstream.end();

        }

        @Override

        public boolean cancellationRequested() {

            return downstream.cancellationRequested();

        }

    }

​ 在上述的中间操作管道流中都是通过匿名类继承ChainedReference实现onWrapSink(int, Sink)返回一个指定操作的Sink。

TerminalSink

​ 这里为什么讲提供呢?这是因为不同的实现TerminalOp的子类中在实现java.util.stream.TerminalOp#evaluateSequential中都是通过helper.wrapAndCopyInto(TerminalOp子类实现提供的Sink, spliterator)中通过参数传递的方式提供的,不同的子类传递的方式不一样所以此处用了一个提供Sink

​ 由ReduceOps中实现TerminalOp所提供的ReducingSink,它是由匿名类实现java.util.stream.ReduceOps.ReduceOp#makeSink来交付给helper.wrapAndCopyInto(makeSink(), spliterator)的。

    public static TerminalOp makeRef(U seed, BiFunction reducer, BinaryOperator combiner) {

        Objects.requireNonNull(reducer);

        Objects.requireNonNull(combiner);

        class ReducingSink extends Box implements AccumulatingSink {

            @Override

            public void begin(long size) {

                state = seed;

            }

            @Override

            public void accept(T t) {

                state = reducer.apply(state, t);

            }

            @Override

            public void combine(ReducingSink other) {

                state = combiner.apply(state, other.state);

            }

        }

        return new ReduceOp(StreamShape.REFERENCE) {

            @Override

            public ReducingSink makeSink() {

                return new ReducingSink();

            }

        };

    }

​ 由ForEachOps中实现TerminalOp所提供的是this,它的提供方式就是通过this交付给helper.wrapAndCopyInto(this, spliterator)。

// 这里ForEachOp自己通过TerminalSink间接的实现了Sink

static abstract class ForEachOp implements TerminalOp, TerminalSink {

        @Override

        public Void evaluateSequential(PipelineHelper helper, Spliterator spliterator) {

            return helper.wrapAndCopyInto(this, spliterator).get();

        }

}

​ 由MatchOps中实现TerminalOp所提供的sinkSupplier通过构造函数由外部赋值,通过Supplier接口的get()来交付给helper.wrapAndCopyInto(sinkSupplier.get(), spliterator)。

    private static final class MatchOp implements TerminalOp {

        final Supplier> sinkSupplier;

        @Override

        public Boolean evaluateSequential(PipelineHelper helper,Spliterator spliterator) {

            return helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).getAndClearState();

        }

    }

1

2

3

4

5

6

7

8

​ 由FindOps中实现TerminalOp所提供的与上述MatchOps是一致的

    private static final class FindOp implements TerminalOp {

        final Supplier> sinkSupplier;

        @Override

        public O evaluateSequential(PipelineHelper helper, Spliterator spliterator) {

            O result = helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).get();

            return result != null ? result : emptyValue;

        }

    }

Collector

​ 在Collector中有以下几个实现接口:

Supplier:结果类型的提供器。

BiConsumer:将元素放入结果的累加器。

BinaryOperator:合并部分结果的组合器。

Function:对结果类型转换为最终结果类型的转换器。

Set:保存Collector特征的集合

并行流

​ 前述都是基于串行流的讲解,其实并行流也是基于上述的helper.wrapAndCopyInto(op.sinkSupplier.get(), spliterator)这个方法上面做的一层基于ForkJoinTask多线程框架的封装。

ForkJoinTask

​ ForkJoin框架的思想就是分而治之,它将一个大任务切割为多个小任务这个过程称为fork,将每个任务的执行的结果进行汇总的过程称为join。ForkJoin框架相关的接口关系图如下(图是盗的):

AbstractTask

​ 类路径java.util.stream.AbstractTask,AbstractTask继承了在JUC中已经封装好的ForkJoinTask抽象子类java.util.concurrent.CountedCompleter。

​ 此类基于CountedCompleter ,它是fork-join任务的一种形式,其中每个任务都有未完成子代的信号量计数,并且该任务隐式完成并在其最后一个子代完成时得到通知。 内部节点任务可能会覆盖CountedCompleter的onCompletion方法,以将子任务的结果合并到当前任务的结果中。

​ 拆分和设置子任务链接是由内部节点的compute()完成的。 在叶节点的compute()时间,可以确保将为所有子代设置父代的子代相关字段(包括父代子代的同级链接)。

​ 例如,执行减少任务的任务将覆盖doLeaf()以使用Spliterator对该叶节点的块执行减少Spliterator ,并覆盖onCompletion()以合并内部节点的子任务的结果:

@Override

protected ReduceTask

makeChild(Spliterator

spliterator) {

    // 返回一个ForkJoinTask任务

    return new ReduceTask<>(this, spliterator);

}

@Override

protected S doLeaf() {

    // 其他实现大同小异

    return helper.wrapAndCopyInto(op.makeSink(), spliterator);

}

@Override

public void onCompletion(CountedCompleter caller) {

    // 非叶子节点进行结果组合

    if (!isLeaf()) {

        S leftResult = leftChild.getLocalResult();

        leftResult.combine(rightChild.getLocalResult());

        setLocalResult(leftResult);

    }

    // GC spliterator, left and right child

    super.onCompletion(caller);

}

​ AbstractTask封装了分片任务的算法模板,通过是Spliterator的trySplit()方法来实现分片的细节,详细算法源码如下(类路径:java.util.stream.AbstractTask#compute):

@Override

public void compute() {

    // 将当前这个spliterator作为右节点(此时为root节点)

    Spliterator

rs = spliterator, ls;

    // 评估任务的大小

    long sizeEstimate = rs.estimateSize();

    // 获取任务阈值

    long sizeThreshold = getTargetSize(sizeEstimate);

    boolean forkRight = false;

    @SuppressWarnings("unchecked") K task = (K) this;

    // 细节不多赘述,下面我用图来讲解算法

      /**

         * 根节点指定为:右边节点

         *              root

         *              split()

         *    left               right

         * left.fork()

         *                       split()

         *                   l            r

         *            rs = ls

         *                      right.fork()

         *    split()

         * l           r

         *    l.fork()

         */

    while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {

        K leftChild, rightChild, taskToFork;

        task.leftChild  = leftChild = task.makeChild(ls);

        task.rightChild = rightChild = task.makeChild(rs);

        task.setPendingCount(1);

        if (forkRight) {

            forkRight = false;

            // 左右节点切换进行fork和split

            rs = ls;

            task = leftChild;

            taskToFork = rightChild;

        }

        else {

            forkRight = true;

            task = rightChild;

            taskToFork = leftChild;

        }

        // fork任务加入队列中去

        taskToFork.fork();

        sizeEstimate = rs.estimateSize();

    }

    // 将执行doLeaf底层就是单个串行流的操作

    task.setLocalResult(task.doLeaf());

    // 将结果组合成一个最终结果

    task.tryComplete();

}

​ AbstractTask执行与分片流程图如下:

到这里Stream流的相关知识介绍到这,这里附上一副总体图来加深下印象


推荐阅读
  • Python爬虫中使用正则表达式的方法和注意事项
    本文介绍了在Python爬虫中使用正则表达式的方法和注意事项。首先解释了爬虫的四个主要步骤,并强调了正则表达式在数据处理中的重要性。然后详细介绍了正则表达式的概念和用法,包括检索、替换和过滤文本的功能。同时提到了re模块是Python内置的用于处理正则表达式的模块,并给出了使用正则表达式时需要注意的特殊字符转义和原始字符串的用法。通过本文的学习,读者可以掌握在Python爬虫中使用正则表达式的技巧和方法。 ... [详细]
  • Java学习笔记之面向对象编程(OOP)
    本文介绍了Java学习笔记中的面向对象编程(OOP)内容,包括OOP的三大特性(封装、继承、多态)和五大原则(单一职责原则、开放封闭原则、里式替换原则、依赖倒置原则)。通过学习OOP,可以提高代码复用性、拓展性和安全性。 ... [详细]
  • 如何优化Webpack打包后的代码分割
    本文介绍了如何通过优化Webpack的代码分割来减小打包后的文件大小。主要包括拆分业务逻辑代码和引入第三方包的代码、配置Webpack插件、异步代码的处理、代码分割重命名、配置vendors和cacheGroups等方面的内容。通过合理配置和优化,可以有效减小打包后的文件大小,提高应用的加载速度。 ... [详细]
  • 原文地址:https:www.cnblogs.combaoyipSpringBoot_YML.html1.在springboot中,有两种配置文件,一种 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 本文介绍了在Mac上搭建php环境后无法使用localhost连接mysql的问题,并通过将localhost替换为127.0.0.1或本机IP解决了该问题。文章解释了localhost和127.0.0.1的区别,指出了使用socket方式连接导致连接失败的原因。此外,还提供了相关链接供读者深入了解。 ... [详细]
  • 本文介绍了如何使用C#制作Java+Mysql+Tomcat环境安装程序,实现一键式安装。通过将JDK、Mysql、Tomcat三者制作成一个安装包,解决了客户在安装软件时的复杂配置和繁琐问题,便于管理软件版本和系统集成。具体步骤包括配置JDK环境变量和安装Mysql服务,其中使用了MySQL Server 5.5社区版和my.ini文件。安装方法为通过命令行将目录转到mysql的bin目录下,执行mysqld --install MySQL5命令。 ... [详细]
  • 本文讨论了在手机移动端如何使用HTML5和JavaScript实现视频上传并压缩视频质量,或者降低手机摄像头拍摄质量的问题。作者指出HTML5和JavaScript无法直接压缩视频,只能通过将视频传送到服务器端由后端进行压缩。对于控制相机拍摄质量,只有使用JAVA编写Android客户端才能实现压缩。此外,作者还解释了在交作业时使用zip格式压缩包导致CSS文件和图片音乐丢失的原因,并提供了解决方法。最后,作者还介绍了一个用于处理图片的类,可以实现图片剪裁处理和生成缩略图的功能。 ... [详细]
  • 本文介绍了Oracle存储过程的基本语法和写法示例,同时还介绍了已命名的系统异常的产生原因。 ... [详细]
  • 使用圣杯布局模式实现网站首页的内容布局
    本文介绍了使用圣杯布局模式实现网站首页的内容布局的方法,包括HTML部分代码和实例。同时还提供了公司新闻、最新产品、关于我们、联系我们等页面的布局示例。商品展示区包括了车里子和农家生态土鸡蛋等产品的价格信息。 ... [详细]
  • 本文整理了315道Python基础题目及答案,帮助读者检验学习成果。文章介绍了学习Python的途径、Python与其他编程语言的对比、解释型和编译型编程语言的简述、Python解释器的种类和特点、位和字节的关系、以及至少5个PEP8规范。对于想要检验自己学习成果的读者,这些题目将是一个不错的选择。请注意,答案在视频中,本文不提供答案。 ... [详细]
  • 带添加按钮的GridView,item的删除事件
    先上图片效果;gridView无数据时显示添加按钮,有数据时,第一格显示添加按钮,后面显示数据:布局文件:addr_manage.xml<?xmlve ... [详细]
  • 本文介绍了Java类的访问级别,包括public、private、protected和package-private,并重点解释了package-private的含义和作用。package-private表示类只能在其所在的包内可见,而不能被其他包的类访问。该文章还提到了其他访问级别的作用和范围,并对Java类的可见性进行了详细说明。 ... [详细]
  • 正则表达式及其范例
    为什么80%的码农都做不了架构师?一、前言部分控制台输入的字符串,编译成java字符串之后才送进内存,比如控制台打\, ... [详细]
  • 巧用arguments在Javascript的函数中有个名为arguments的类数组对象。它看起来是那么的诡异而且名不经传,但众多的Javascript库都使用着它强大的功能。所 ... [详细]
author-avatar
Wei-Micro_788
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有