Как сократить операцию сокращения () в потоке?

По сути, это тот же вопрос, что и Как сократить сокращение в потоке?. Однако, поскольку этот вопрос фокусируется на потоке логических значений, и его ответ нельзя обобщить для других типов и операций сокращения, я хотел бы задать более общий вопрос.

Как мы можем уменьшить поток, чтобы он замыкался при встрече с поглощающим элементом для редукционной операции?

Типичным математическим случаем будет 0 для умножения. Это Stream :

int product = IntStream.of(2, 3, 4, 5, 0, 7, 8)
        .reduce(1, (a, b) -> a * b);

будет потреблять последние два элемента (7 и 8) независимо от того факта, что когда 0 встречается, продукт известен.


person bowmore    schedule 10.09.2015    source источник
comment
Как вы думаете, это настолько распространено, что стоит добавлять условный оператор к каждому умножению? Вам понадобится много последующих умножений, чтобы компенсировать это. И тогда может оказаться, что хотспоты умнее вас, когда дело доходит до умножений в цикле…   -  person Holger    schedule 10.09.2015
comment
@Holger Я признаю, что это в основном академический вопрос, но я могу придумать по крайней мере несколько других сокращений, которые могут привести к короткому замыканию. (побитовое и против 0, побитовое или против 0xffff, полностью заполненные частичные данные против слияния частичных данных, ...)   -  person bowmore    schedule 10.09.2015
comment
@Holger, еще примеры, когда может быть полезно сокращение короткого замыкания: пересечение потока наборов (отмена, когда промежуточный результат пуст), объединение потока EnumSet (отмена, когда промежуточный результат содержит все возможные значения), присоединение к потоку строк с ограничением длины строки (при необходимости добавляя многоточие в конец).   -  person Tagir Valeev    schedule 10.09.2015
comment
Поскольку операция takeWhile будет добавляться к Stream в Java 9, такие вещи станут проще в будущем. Всякий раз, когда Java 9 выпущен...   -  person Lii    schedule 10.09.2015


Ответы (4)


К сожалению, Stream API имеет ограниченные возможности для создания собственных операций короткого замыкания. Не очень чистым решением было бы бросить RuntimeException и поймать его. Вот реализация для IntStream, но ее можно обобщить и для других типов потоков:

public static int reduceWithCancelEx(IntStream stream, int identity, 
                      IntBinaryOperator combiner, IntPredicate cancelCondition) {
    class CancelException extends RuntimeException {
        private final int val;

        CancelException(int val) {
            this.val = val;
        }
    }

    try {
        return stream.reduce(identity, (a, b) -> {
            int res = combiner.applyAsInt(a, b);
            if(cancelCondition.test(res))
                throw new CancelException(res);
            return res;
        });
    } catch (CancelException e) {
        return e.val;
    }
}

Пример использования:

int product = reduceWithCancelEx(
        IntStream.of(2, 3, 4, 5, 0, 7, 8).peek(System.out::println), 
        1, (a, b) -> a * b, val -> val == 0);
System.out.println("Result: "+product);

Выход:

2
3
4
5
0
Result: 0

Обратите внимание, что даже если это работает с параллельными потоками, не гарантируется, что другие параллельные задачи будут завершены, как только одна из них выдаст исключение. Подзадачи, которые уже запущены, скорее всего, будут выполняться до конца, поэтому вы можете обработать больше элементов, чем ожидалось.

Обновление: альтернативное решение, которое намного длиннее, но более удобно для параллелизма. Он основан на пользовательском разделителе, который возвращает не более одного элемента, который является результатом накопления всех базовых элементов). Когда вы используете его в последовательном режиме, он выполняет всю работу за один вызов tryAdvance. Когда вы разделяете его, каждая часть генерирует соответствующий одиночный частичный результат, который сокращается движком Stream с помощью функции объединителя. Вот общая версия, но возможна и примитивная специализация.

final static class CancellableReduceSpliterator<T, A> implements Spliterator<A>,
        Consumer<T>, Cloneable {
    private Spliterator<T> source;
    private final BiFunction<A, ? super T, A> accumulator;
    private final Predicate<A> cancelPredicate;
    private final AtomicBoolean cancelled = new AtomicBoolean();
    private A acc;

    CancellableReduceSpliterator(Spliterator<T> source, A identity,
            BiFunction<A, ? super T, A> accumulator, Predicate<A> cancelPredicate) {
        this.source = source;
        this.acc = identity;
        this.accumulator = accumulator;
        this.cancelPredicate = cancelPredicate;
    }

    @Override
    public boolean tryAdvance(Consumer<? super A> action) {
        if (source == null || cancelled.get()) {
            source = null;
            return false;
        }
        while (!cancelled.get() && source.tryAdvance(this)) {
            if (cancelPredicate.test(acc)) {
                cancelled.set(true);
                break;
            }
        }
        source = null;
        action.accept(acc);
        return true;
    }

    @Override
    public void forEachRemaining(Consumer<? super A> action) {
        tryAdvance(action);
    }

    @Override
    public Spliterator<A> trySplit() {
        if(source == null || cancelled.get()) {
            source = null;
            return null;
        }
        Spliterator<T> prefix = source.trySplit();
        if (prefix == null)
            return null;
        try {
            @SuppressWarnings("unchecked")
            CancellableReduceSpliterator<T, A> result = 
                (CancellableReduceSpliterator<T, A>) this.clone();
            result.source = prefix;
            return result;
        } catch (CloneNotSupportedException e) {
            throw new InternalError();
        }
    }

    @Override
    public long estimateSize() {
        // let's pretend we have the same number of elements
        // as the source, so the pipeline engine parallelize it in the same way
        return source == null ? 0 : source.estimateSize();
    }

    @Override
    public int characteristics() {
        return source == null ? SIZED : source.characteristics() & ORDERED;
    }

    @Override
    public void accept(T t) {
        this.acc = accumulator.apply(this.acc, t);
    }
}

Методы, аналогичные Stream.reduce(identity, accumulator, combiner) и Stream.reduce(identity, combiner), но с cancelPredicate:

public static <T, U> U reduceWithCancel(Stream<T> stream, U identity,
        BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner,
        Predicate<U> cancelPredicate) {
    return StreamSupport
            .stream(new CancellableReduceSpliterator<>(stream.spliterator(), identity,
                    accumulator, cancelPredicate), stream.isParallel()).reduce(combiner)
            .orElse(identity);
}

public static <T> T reduceWithCancel(Stream<T> stream, T identity,
        BinaryOperator<T> combiner, Predicate<T> cancelPredicate) {
    return reduceWithCancel(stream, identity, combiner, combiner, cancelPredicate);
}

Протестируем обе версии и посчитаем, сколько элементов реально обрабатывается. Давайте поставим 0 ближе к концу. Исключительная версия:

AtomicInteger count = new AtomicInteger();
int product = reduceWithCancelEx(
        IntStream.range(-1000000, 100).filter(x -> x == 0 || x % 2 != 0)
                .parallel().peek(i -> count.incrementAndGet()), 1,
        (a, b) -> a * b, x -> x == 0);
System.out.println("product: " + product + "/count: " + count);
Thread.sleep(1000);
System.out.println("product: " + product + "/count: " + count);

Типичный вывод:

product: 0/count: 281721
product: 0/count: 500001

Таким образом, хотя результат возвращается, когда обрабатываются только некоторые элементы, задачи продолжают работать в фоновом режиме, а счетчик продолжает увеличиваться. Вот версия сплиттера:

AtomicInteger count = new AtomicInteger();
int product = reduceWithCancel(
        IntStream.range(-1000000, 100).filter(x -> x == 0 || x % 2 != 0)
                .parallel().peek(i -> count.incrementAndGet()).boxed(), 
                1, (a, b) -> a * b, x -> x == 0);
System.out.println("product: " + product + "/count: " + count);
Thread.sleep(1000);
System.out.println("product: " + product + "/count: " + count);

Типичный вывод:

product: 0/count: 281353
product: 0/count: 281353

Все задачи фактически завершены, когда результат возвращен.

person Tagir Valeev    schedule 10.09.2015
comment
Я думаю, что код обновления будет работать, но на данный момент есть проблемы с копированием кода. - person bowmore; 11.09.2015
comment
@bowmore, какие проблемы? Возможно, какой-то импорт отсутствует? - person Tagir Valeev; 11.09.2015
comment
Неважно, оказывается, мой JDK 8 дома еще не был последним. Проблемы с выводом типов устранены после установки последней версии. - person bowmore; 11.09.2015
comment
Ах я вижу. Javac 1.8.0_25 аварийно завершает работу с NullPointerException в этом классе. Очень забавно. - person Tagir Valeev; 11.09.2015
comment
Для большинства сценариев спасение с помощью исключения будет намного дороже, чем итерация и обработка всего потока… - person Holger; 04.11.2016
comment
@Holger, после этого поста я провел гораздо более глубокое исследование и не могу с вами согласиться. Если вы отключите трассировку стека исключений, бросок обычно добавляет постоянную задержку ~200-300 нс, что довольно часто не так много по сравнению со всей потоковой операцией. Итерация через forEachRemaining обычно выполняется быстрее, чем через tryAdvance, так как последний должен поддерживать состояние, поэтому во многих случаях даже замена существующей операции короткого замыкания на forEachRemaining+throw может повысить производительность (не говоря уже о потоках, содержащих flatMap). - person Tagir Valeev; 15.11.2016
comment
«Если вы отключите трассировку стека исключений» — я думаю, на этом мы могли бы остановиться. В вашем ответе трассировка стека не отключена, но в любом случае от 200 до 300 нс все еще довольно дорого по сравнению с двумя умножениями, которые он сохраняет в сценарии OP. Я бы даже сказал, что введенное условное ветвление, выполняемое четыре раза в этом сценарии, само по себе может быть дороже, чем сэкономленные два умножения. Я не говорил, что выгодных сценариев вообще нет, но примеры вроде ОП ясно показывают, как большую часть времени потенциальная экономия сильно переоценивается… - person Holger; 15.11.2016

Общий метод статического редукции с коротким замыканием может быть реализован с помощью разделителя потока. Это даже оказалось не очень сложно! Использование сплитеров часто оказывается подходящим способом, когда нужно работать с парами более гибко.

public static <T> T reduceWithCancel(Stream<T> s, T acc, BinaryOperator<T> op, Predicate<? super T> cancelPred) {
    BoxConsumer<T> box = new BoxConsumer<T>();
    Spliterator<T> splitr = s.spliterator();

    while (!cancelPred.test(acc) && splitr.tryAdvance(box)) {
        acc = op.apply(acc, box.value);
    }

    return acc;
}

public static class BoxConsumer<T> implements Consumer<T> {
    T value = null;
    public void accept(T t) {
        value = t;
    }
}

Применение:

    int product = reduceWithCancel(
        Stream.of(1, 2, 0, 3, 4).peek(System.out::println),
        1, (acc, i) -> acc * i, i -> i == 0);

    System.out.println("Result: " + product);

Выход:

1
2
0
Result: 0

Метод может быть обобщен для выполнения других видов терминальных операций.

Это основано на этом ответе об операции "возьми-пока".

Я ничего не знаю о потенциале распараллеливания этого.

person Lii    schedule 10.09.2015
comment
Обратите внимание, что в моем решении cancelPredicate проверялся результат сокращения, а не следующий элемент. В данном случае это даже лучше (например, 65536*65536 == 0 в Java, хотя ни один из аргументов не равен нулю). Ваш ответ можно легко адаптировать, чтобы сделать то же самое. У меня есть идея, основанная на разветвителе, которая удобна для параллелизма, но мне нужно некоторое время, чтобы правильно ее закодировать... - person Tagir Valeev; 10.09.2015
comment
@TagirValeev: Параллельное выполнение этого цикла было бы интересным упражнением в разделителях. Пишите, если что-то закончили! - person Lii; 10.09.2015
comment
@TagirValeev: Обратите внимание, что в моем решении cancelPredicate проверял результат сокращения, а не следующий элемент. Если подумать, я тоже думаю, что это лучше. Не только в этом случае, а вообще. Операция может привести к значению, на котором вы хотели бы остановиться. Я отредактировал это в ответе. Это также сэкономило мне линию! - person Lii; 10.09.2015
comment
К моему ответу добавлено решение для проверки концепции, поддерживающее параллельную работу. Возможно, я добавлю что-то на его основе в свою библиотеку... - person Tagir Valeev; 10.09.2015

Мое личное мнение состоит в том, чтобы не использовать reduce() как таковой, а использовать существующую заключительную операцию короткого замыкания.

Для этого можно использовать noneMatch() или allMatch() при использовании предиката с побочным эффектом. По общему признанию, это также не самое чистое решение, но оно достигает цели:

AtomicInteger product = new AtomicInteger(1);
IntStream.of(2, 3, 4, 5, 0, 7, 8)
        .peek(System.out::println)
        .noneMatch(i -> {
            if (i == 0) {
                product.set(0);
                return true;
            }
            int oldValue = product.get();
            while (oldValue != 0 && !product.compareAndSet(oldValue, i * oldValue)) {
                oldValue = product.get();
            }
            return oldValue == 0;
        });
System.out.println("Result: " + product.get());

Это короткое замыкание, и его можно сделать параллельным.

person bowmore    schedule 10.09.2015
comment
Интересное решение, правда работает только с коммутативными сумматорами. Обычно коммутативность не требуется в Stream API для функций объединителя. Тем не менее проголосовал. Я обобщил это для потоков объектов и пользовательских методов удостоверения/объединителя/предиката, это немного быстрее, чем мое решение с разделителем. - person Tagir Valeev; 11.09.2015
comment
Ну, я исправил небольшую ошибку в estimateSize() моего разделителя (возврат Long.MAX_VALUE заставлял потоковый движок производить гораздо больше параллельных задач, чем необходимо). Теперь моя параллельная версия работает в разы быстрее, чем ваша обобщенная версия, хотя последовательная все же несколько медленнее вашей. - person Tagir Valeev; 11.09.2015

так это делается после появления takeWhile
начиная с Java 9

int[] last = {1};
int product = IntStream.of(2, 3, 4, 5, 0, 7, 8)
    .takeWhile(i -> last[0] != 0).reduce(1, (a, b) -> (last[0] = a) * b);
person Kaplan    schedule 16.04.2021