Допустим, у меня есть печь для пиццы и ряд пицц, которые мне нужно испечь. Моя печь способна выпекать только 4 пиццы за раз, и разумно ожидать, что в течение дня в очереди всегда будет как минимум 4 пиццы, поэтому печь должна быть на полную мощность как можно чаще.
Каждый раз, когда я ставлю пиццу в духовку, я ставлю таймер на своем телефоне. Как только это срабатывает, я вынимаю пиццу из духовки, даю ее всем, кто хочет, и вместимость становится доступной.
У меня есть 2 источника, один из которых — очередь пиццы, которую нужно приготовить, а другой — таймер для яиц, который срабатывает, когда пицца приготовлена. В системе также есть 2 раковины, одна из которых предназначена для приготовленной пиццы, а другая — место для отправки подтверждения, что пицца была помещена в печь.
В настоящее время я представляю их очень наивно следующим образом:
Source.fromIterator(() => pizzas)
.map(putInOven) // puts in oven and sets a timer
.runWith(Sink.actorRef(confirmationDest, EndSignal))
Source.fromIterator(() => timerAlerts)
.map(removePizza)
.runWith(Sink.actorRef(pizzaDest, EndSignal))
Однако в настоящее время эти два потока полностью независимы друг от друга. eggTimer работает правильно, удаляя пиццу всякий раз, когда она собирается. Но он не может сообщить предыдущему потоку, что емкость стала доступной. На самом деле первый поток вообще не имеет понятия о вместимости и будет просто пытаться запихнуть пиццу в печь, как только она встанет на очередь.
Какие концепции Akka можно использовать для составления этих потоков таким образом, чтобы первый брал пиццы из очереди только при наличии вместимости, а второй поток мог «оповещать» первый об изменении вместимости при удалении пиццы из очереди? печь.
Мое первоначальное впечатление состоит в том, чтобы реализовать такой граф потока:
┌─────────────┐
┌─>│CapacityAvail│>──┐
│ └─────────────┘ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ ┌─────────────┐ ├──>│ Zip │>─>│ PutInOven │>─>│ Confirm │
│ │ Queue │>──┘ └─────────────┘ └─────────────┘ └─────────────┘
│ └─────────────┘
│ ┌─────────────┐ ┌─────────────┐
│ │ Done │>─────>│ SendPizza │
│ └─────────────┘ └─────────────┘
│ v
│ │
└─────────┘
Принцип, лежащий в основе этого, заключается в том, что существует фиксированное количество объектов CapacityAvailable, которые заполняют источник CapacityAvail
. Они заархивированы с событиями, которые поступают в очередь пиццы, то есть, если они недоступны, обработка пиццы не начинается, поскольку операция zip будет ждать их.
Затем, когда пицца готова, объект CapacityAvailable помещается обратно в пул.
Основное препятствие, которое я вижу для этой реализации, заключается в том, что я не уверен, как создать и заполнить пул для источника CapacityAvail, а также я не уверен, может ли источник также быть приемником. Существуют ли какие-либо типы Source/Sink/Flow, которые подходят для этой реализации?