如何以并发方式在同一个流上执行多种操作

最近看了《Java8实战》,了解了Java8 Stream的一些内容。在Java 8中,流的一个局限性在于,只能遍历一次,再次遍历的时候会出现异常:

java.lang.IllegalStateException: stream has already been operated upon or closed

对于这种情况,《Java8实战》附录C给出了一种实现方式,通过 Spliterator 结合 BlockingQueuesFutures 来实现这个功能。

主要思想

首先创建一个StreamForker类对当前需要被Fork的Stream进行包装,向StreamForker类中添加的操作由索引来标识,之后可以通过索引来取到该操作的结果。

StreamForker主要思想

StreamForker会遍历每个操作,并创建相应的副本,并行的在复制流上执行这些操作,并将最终的结果整合到一个Map中。 Results 结果定义如下,可以用来获取执行操作的结果。

public interface Results {
    public <R> R get(Object key);
}

下面,说明一下各个部分的实现。

复制流

当我们向StreamForker中添加了N个操作之后,就需要将Stream复制N份,复制N份的操作由 ForkingStreamConsumer 完成。

import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.function.Consumer;

public class ForkingStreamConsumer<T> implements Consumer<T>, Results {
    static final Object END_OF_STREAM = new Object();

    private final List<BlockingQueue<T>> queues;
    private final Map<Object, Future<?>> actions;

    public ForkingStreamConsumer(List<BlockingQueue<T>> queues, Map<Object, Future<?>> actions) {
        this.queues = queues;
        this.actions = actions;
    }

    @Override
    public void accept(T t) {
        queues.forEach(q -> q.add(t));
    }

    public void finish() {
        accept((T) END_OF_STREAM);
    }

    @Override
    public <R> R get(Object key) {
        try {
            return ((Future<R>) actions.get(key)).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

这个类实现了Consumer接口和Reuslts接口,并持有queues和actions两个引用。

在这里,每当ForkingStreamConsumer接受流中的一个元素,它就会将该元素添加到所有队列,finish()方法会将最后一个哨兵元素添加到所有队列。

Results接口的get()方法会根据键值取到相应的Future来获取结果,解析并返回。

应用操作

ForkingStreamConsumer负责分发元素,还需要一个去处理BlockingQueue里的元素。这里使用 Spliterator 来完成这个而操作,通过 StreamSupport.stream()Spliterator 的实现可以创建一个新的流。

import java.util.Spliterator;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

public class BlockingQueueSpliterator<T> implements Spliterator<T> {

    private final BlockingQueue<T> q;

    public BlockingQueueSpliterator(BlockingQueue<T> q) {
        this.q = q;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        T t;
        while (true) {
            try {
                t = q.take();
                break;
            } catch (InterruptedException e) {

            }
        }

        if (t != ForkingStreamConsumer.END_OF_STREAM) {
            action.accept(t);
            return true;
        }

        return false;
    }

    @Override
    public Spliterator<T> trySplit() {
        return null;
    }

    @Override
    public long estimateSize() {
        return 0;
    }

    @Override
    public int characteristics() {
        return 0;
    }
}

Spliterator接口有四个方法需要实现。

  1. tryAdvance() 定义如何顺序遍历每个元素,如果还有需要遍历的元素,返回 true,否则返回 false;它从BlockingQueue中取得原始流的元素,这些元素由ForkingStreamConsumer添加,然后依次处理这些元素。直到发现哨兵元素,表示队列中没有需要处理的元素了。
  2. trySplit() 会将当前元素划分出一部分,返回一个新的Spliterator,同时这两个Spliterator会并行执行。如果无法划分则返回 null。
  3. estimateSize() 用于估算还有多少个元素需要遍历。在这里没有进行估算。
  4. characteristice() 表示该Spliterator有哪些特性,用于可以更好控制和优化Spliterator的使用。

通过实现的BlockingQueueSpliterator的延迟绑定能力,处理流中的各个元素。

框架

上面两部分都已经实现,下面需要实现一个框架,可以包装Stream,为Stream添加fork操作,并获取操作结果。

import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StreamForker<T> {
    private final Stream stream;
    private final Map<Object, Function<Stream<T>, ?>> forks = new HashMap<>();

    public StreamForker(Stream<T> stream) {
        this.stream = stream;
    }

    public StreamForker<T> fork(Object key, Function<Stream<T>, ?> f) {
        forks.put(key, f);
        return this;
    }

    public Results getResults() {
        ForkingStreamConsumer<T> consumer = build();
        try {
            stream.forEach(consumer);
        } finally {
            consumer.finish();
        }
        return consumer;
    }

    private ForkingStreamConsumer<T> build() {
        List<BlockingQueue<T>> queues = new ArrayList<>();

        Map<Object, Future<?>> actions = forks.entrySet().stream()
                .reduce(new HashMap<Object, Future<?>>(),
                        (map, e) -> {
                            map.put(e.getKey(), getOperationResult(queues, e.getValue()));
                            return map;
                        }, (m1, m2) -> {
                            m1.putAll(m2);
                            return m1;
                        });
        return new ForkingStreamConsumer<>(queues, actions);
    }

    private Future<?> getOperationResult(List<BlockingQueue<T>> queues, Function<Stream<T>, ?> f) {
        BlockingQueue<T> queue = new LinkedBlockingDeque<>();
        queues.add(queue);
        Spliterator<T> spliterator = new BlockingQueueSpliterator<>(queue);
        Stream<T> source = StreamSupport.stream(spliterator, false);
        return CompletableFuture.supplyAsync(() -> f.apply(source));
    }
}

StreamForker的构造方法接收一个流,表示需要被fork的流。通过 fork() 方法可以增加标识和操作。

getResults() 方法返回 Results 接口的实现,可以通过标识获取操作结果。通过 build() 构建一个ForkingStreamConsumer,将原始流中的元素挨个分发,最后finish()表示操作结束。

build() 方法用于构建ForkingStreamConsumer,主要创建一个Map,Map的键就是之前的各个操作的标识,值是各个操作的结果,用Futrue来表示。每个Future都是通过 getOperationResult() 创建的。

getOperationResult() 方法会创建一个新的 BlockingQueue,并将其添加到队列的列表。这个BlockingQueue用于存储分发的原始流中的元素,然后将队列绑定到BlockingQueueSpliterator上用来创建一个新的流,最后创建一个Future来执行需要在这个流上应用的操作。

使用

到此整个StreamForker就完成了,这里我们应用一下,同时求一个整数流的最大值、最小值和平均值。

import java.util.Comparator;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class Main {
    public static void main(String[] args) {
        Stream<Integer> stream = IntStream.rangeClosed(1, 10).boxed();

        Results results = new StreamForker<Integer>(stream)
                .fork("min", s -> s.min(Comparator.naturalOrder()).orElse(null))
                .fork("max", s -> s.max(Comparator.naturalOrder()).orElse(null))
                .fork("avg", s -> s.mapToInt(x -> x).average().orElse(0))
                .getResults();

        int maxVal = results.get("max");
        int minVal = results.get("min");
        double avgVal = results.get("avg");

        System.out.println("Max Value: " + maxVal);
        System.out.println("Min Value: " + minVal);
        System.out.println("Avg Value: " + avgVal);
    }
}

使用StreamForker,可以复制一个整数流,并在每个流上应用不同的操作。同时内部实现是异步的, getResults()方法会立即返回,只有当需要取用结果的时候才会尝试等待操作执行完成。

上面代码的输出如下:

Max Value: 10 Min Value: 1 Avg Value: 5.5

参考内容