К сожалению, Stream API имеет ограниченные возможности для создания собственных операций короткого замыкания. Не очень чистым решением было бы бросить RuntimeException
и поймать его. Вот реализация для IntStream
, но ее можно обобщить и для других типов потоков:
public static int reduceWithCancelEx(IntStream stream, int identity,
IntBinaryOperator combiner, IntPredicate cancelCondition) {
class CancelException extends RuntimeException {
private final int val;
CancelException(int val) {
this.val = val;
}
}
try {
return stream.reduce(identity, (a, b) -> {
int res = combiner.applyAsInt(a, b);
if(cancelCondition.test(res))
throw new CancelException(res);
return res;
});
} catch (CancelException e) {
return e.val;
}
}
Пример использования:
int product = reduceWithCancelEx(
IntStream.of(2, 3, 4, 5, 0, 7, 8).peek(System.out::println),
1, (a, b) -> a * b, val -> val == 0);
System.out.println("Result: "+product);
Выход:
2
3
4
5
0
Result: 0
Обратите внимание, что даже если это работает с параллельными потоками, не гарантируется, что другие параллельные задачи будут завершены, как только одна из них выдаст исключение. Подзадачи, которые уже запущены, скорее всего, будут выполняться до конца, поэтому вы можете обработать больше элементов, чем ожидалось.
Обновление: альтернативное решение, которое намного длиннее, но более удобно для параллелизма. Он основан на пользовательском разделителе, который возвращает не более одного элемента, который является результатом накопления всех базовых элементов). Когда вы используете его в последовательном режиме, он выполняет всю работу за один вызов tryAdvance
. Когда вы разделяете его, каждая часть генерирует соответствующий одиночный частичный результат, который сокращается движком Stream с помощью функции объединителя. Вот общая версия, но возможна и примитивная специализация.
final static class CancellableReduceSpliterator<T, A> implements Spliterator<A>,
Consumer<T>, Cloneable {
private Spliterator<T> source;
private final BiFunction<A, ? super T, A> accumulator;
private final Predicate<A> cancelPredicate;
private final AtomicBoolean cancelled = new AtomicBoolean();
private A acc;
CancellableReduceSpliterator(Spliterator<T> source, A identity,
BiFunction<A, ? super T, A> accumulator, Predicate<A> cancelPredicate) {
this.source = source;
this.acc = identity;
this.accumulator = accumulator;
this.cancelPredicate = cancelPredicate;
}
@Override
public boolean tryAdvance(Consumer<? super A> action) {
if (source == null || cancelled.get()) {
source = null;
return false;
}
while (!cancelled.get() && source.tryAdvance(this)) {
if (cancelPredicate.test(acc)) {
cancelled.set(true);
break;
}
}
source = null;
action.accept(acc);
return true;
}
@Override
public void forEachRemaining(Consumer<? super A> action) {
tryAdvance(action);
}
@Override
public Spliterator<A> trySplit() {
if(source == null || cancelled.get()) {
source = null;
return null;
}
Spliterator<T> prefix = source.trySplit();
if (prefix == null)
return null;
try {
@SuppressWarnings("unchecked")
CancellableReduceSpliterator<T, A> result =
(CancellableReduceSpliterator<T, A>) this.clone();
result.source = prefix;
return result;
} catch (CloneNotSupportedException e) {
throw new InternalError();
}
}
@Override
public long estimateSize() {
// let's pretend we have the same number of elements
// as the source, so the pipeline engine parallelize it in the same way
return source == null ? 0 : source.estimateSize();
}
@Override
public int characteristics() {
return source == null ? SIZED : source.characteristics() & ORDERED;
}
@Override
public void accept(T t) {
this.acc = accumulator.apply(this.acc, t);
}
}
Методы, аналогичные Stream.reduce(identity, accumulator, combiner)
и Stream.reduce(identity, combiner)
, но с cancelPredicate
:
public static <T, U> U reduceWithCancel(Stream<T> stream, U identity,
BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner,
Predicate<U> cancelPredicate) {
return StreamSupport
.stream(new CancellableReduceSpliterator<>(stream.spliterator(), identity,
accumulator, cancelPredicate), stream.isParallel()).reduce(combiner)
.orElse(identity);
}
public static <T> T reduceWithCancel(Stream<T> stream, T identity,
BinaryOperator<T> combiner, Predicate<T> cancelPredicate) {
return reduceWithCancel(stream, identity, combiner, combiner, cancelPredicate);
}
Протестируем обе версии и посчитаем, сколько элементов реально обрабатывается. Давайте поставим 0
ближе к концу. Исключительная версия:
AtomicInteger count = new AtomicInteger();
int product = reduceWithCancelEx(
IntStream.range(-1000000, 100).filter(x -> x == 0 || x % 2 != 0)
.parallel().peek(i -> count.incrementAndGet()), 1,
(a, b) -> a * b, x -> x == 0);
System.out.println("product: " + product + "/count: " + count);
Thread.sleep(1000);
System.out.println("product: " + product + "/count: " + count);
Типичный вывод:
product: 0/count: 281721
product: 0/count: 500001
Таким образом, хотя результат возвращается, когда обрабатываются только некоторые элементы, задачи продолжают работать в фоновом режиме, а счетчик продолжает увеличиваться. Вот версия сплиттера:
AtomicInteger count = new AtomicInteger();
int product = reduceWithCancel(
IntStream.range(-1000000, 100).filter(x -> x == 0 || x % 2 != 0)
.parallel().peek(i -> count.incrementAndGet()).boxed(),
1, (a, b) -> a * b, x -> x == 0);
System.out.println("product: " + product + "/count: " + count);
Thread.sleep(1000);
System.out.println("product: " + product + "/count: " + count);
Типичный вывод:
product: 0/count: 281353
product: 0/count: 281353
Все задачи фактически завершены, когда результат возвращен.
person
Tagir Valeev
schedule
10.09.2015
takeWhile
будет добавляться кStream
в Java 9, такие вещи станут проще в будущем. Всякий раз, когда Java 9 выпущен... - person Lii   schedule 10.09.2015