По умолчанию потребитель Genstage подписан на все события от производителя. Но в некоторых случаях необходимо фильтровать события, на которые подписывается потребитель. Мы можем подписаться только на некоторые события или даже отфильтровать некоторые события.

Этого можно легко добиться, настроив диспетчера-производителя так, чтобы он использовал GenStage.BroadcastDispatcher, и подписывался только на нужные нам события, указав параметр :selector в потребителе.

Итак, давайте обновим нашего производителя и обновим диспетчер

def init(counter) do
  {:producer, counter, dispatcher: GenStage.BroadcastDispatcher}
end

Следующий шаг — указать нашу опцию selector в потребителе. Опция выбора принимает функцию, которую можно использовать для фильтрации потока событий, на которые мы хотим подписаться.

Потребитель будет получать только те события, переданные производителем, для которых функция селектора возвращает истинное значение.

Для подписок на синхронизацию это можно сделать следующим образом:

GenStage.sync_subscribe(consumer, to: producer, selector: fn %{key: key} -> String.starts_with?(key, "foo-") end)

Для асинхронных подписок это можно сделать, указав функцию выбора в списке :subscribe_to в возвращаемом кортеже GenStage.init/1.

def init(:ok) do
  {:consumer, :ok, subscribe_to: [{producer, selector: fn %{key: key} -> String.starts_with?(key, "foo-") end}]}
end

NB: Следует отметить, что опция селектора работает только при использовании GenStage.BroadcastDispatcher для диспетчера. К сожалению, пропуск этого не выдает никаких ошибок и не предупреждает пользователя.

Первоначально опубликовано на til.codes 27 января 2018 г.