Akka Streams: как смоделировать ограничение пропускной способности/скорости в системе из двух связанных потоков?

Допустим, у меня есть печь для пиццы и ряд пицц, которые мне нужно испечь. Моя печь способна выпекать только 4 пиццы за раз, и разумно ожидать, что в течение дня в очереди всегда будет как минимум 4 пиццы, поэтому печь должна быть на полную мощность как можно чаще.

Каждый раз, когда я ставлю пиццу в духовку, я ставлю таймер на своем телефоне. Как только это срабатывает, я вынимаю пиццу из духовки, даю ее всем, кто хочет, и вместимость становится доступной.

У меня есть 2 источника, один из которых — очередь пиццы, которую нужно приготовить, а другой — таймер для яиц, который срабатывает, когда пицца приготовлена. В системе также есть 2 раковины, одна из которых предназначена для приготовленной пиццы, а другая — место для отправки подтверждения, что пицца была помещена в печь.

В настоящее время я представляю их очень наивно следующим образом:

Source.fromIterator(() => pizzas)
    .map(putInOven) // puts in oven and sets a timer
    .runWith(Sink.actorRef(confirmationDest, EndSignal))

Source.fromIterator(() => timerAlerts)
    .map(removePizza)
    .runWith(Sink.actorRef(pizzaDest, EndSignal))

Однако в настоящее время эти два потока полностью независимы друг от друга. eggTimer работает правильно, удаляя пиццу всякий раз, когда она собирается. Но он не может сообщить предыдущему потоку, что емкость стала доступной. На самом деле первый поток вообще не имеет понятия о вместимости и будет просто пытаться запихнуть пиццу в печь, как только она встанет на очередь.

Какие концепции Akka можно использовать для составления этих потоков таким образом, чтобы первый брал пиццы из очереди только при наличии вместимости, а второй поток мог «оповещать» первый об изменении вместимости при удалении пиццы из очереди? печь.

Мое первоначальное впечатление состоит в том, чтобы реализовать такой граф потока:

   ┌─────────────┐                                                          
┌─>│CapacityAvail│>──┐                                                      
│  └─────────────┘   │   ┌─────────────┐   ┌─────────────┐   ┌─────────────┐
│  ┌─────────────┐   ├──>│     Zip     │>─>│  PutInOven  │>─>│   Confirm   │
│  │    Queue    │>──┘   └─────────────┘   └─────────────┘   └─────────────┘
│  └─────────────┘                                                          
│  ┌─────────────┐       ┌─────────────┐                                    
│  │    Done     │>─────>│  SendPizza  │                                    
│  └─────────────┘       └─────────────┘                                    
│         v                                                                 
│         │                                                                 
└─────────┘                    

Принцип, лежащий в основе этого, заключается в том, что существует фиксированное количество объектов CapacityAvailable, которые заполняют источник CapacityAvail. Они заархивированы с событиями, которые поступают в очередь пиццы, то есть, если они недоступны, обработка пиццы не начинается, поскольку операция zip будет ждать их.

Затем, когда пицца готова, объект CapacityAvailable помещается обратно в пул.

Основное препятствие, которое я вижу для этой реализации, заключается в том, что я не уверен, как создать и заполнить пул для источника CapacityAvail, а также я не уверен, может ли источник также быть приемником. Существуют ли какие-либо типы Source/Sink/Flow, которые подходят для этой реализации?


person jackweirdy    schedule 31.10.2016    source источник
comment
Вы пытаетесь реализовать дросселирование?   -  person expert    schedule 31.10.2016
comment
В каком-то смысле, я думаю, да. но не по времени или ограничению скорости, а по бизнес-логике на уровне приложения.   -  person jackweirdy    schedule 31.10.2016
comment
И изменяется ли регулирование в зависимости от того, какой элемент мы обрабатываем, или это глобальная переменная для каждого потока, которую вы хотите динамически изменять?   -  person expert    schedule 31.10.2016
comment
Это произвольное количество времени. По сути, каждая поступающая пицца — это задача в очереди задач, которая должна быть выполнена, что может занять от 0 до бесконечности секунд, но одновременно могут выполняться только 4.   -  person jackweirdy    schedule 01.11.2016


Ответы (3)


Этот вариант использования обычно плохо сочетается с Akka Streams. Под капотом Akka Stream — это реактивный поток; из документации:

Реализация Akka Streams использует внутренние интерфейсы Reactive Streams для передачи данных между различными этапами обработки.

Ваш пример с пиццей не применим к потокам, потому что у вас есть какое-то внешнее событие, которое является таким же источником спроса, как и приемник вашего потока. Тот факт, что вы открыто заявляете, что «первый поток вообще не имеет понятия о емкости», означает, что вы не используете потоки по их прямому назначению.

Всегда можно использовать какое-то странное кодирование джиу-джитсу, чтобы неуклюже изменить потоки для решения проблемы параллелизма, но у вас, вероятно, возникнут трудности с поддержкой этого кода в дальнейшем. Я рекомендую вам рассмотреть возможность использования фьючерсов, акторов или старых простых потоков в качестве механизма параллелизма. Если ваша духовка имеет бесконечную емкость для приготовления пиццы, то для начала нет необходимости в потоках.

Я бы также пересмотрел весь ваш дизайн, поскольку вы используете ход часов в качестве сигнализатора спроса (т. Е. Ваш «таймер для яиц»). Как правило, это указывает на ошибку в схеме процесса. Если вы не можете обойти это требование, вам следует оценить другие шаблоны проектирования:

  1. Периодическое планирование сообщений
  2. Время ожидания блока без потока
person Ramón J Romero y Vigil    schedule 31.10.2016
comment
Хорошо, мне нравится ваше мышление в целом. Я также понял после создания моей первоначальной диаграммы, что я действительно смоделировал конечный автомат, поэтому я рассмотрю утилиты akka для этого, поскольку они могут лучше подходить. Часы были просто примером, что происходит, так это то, что я отправляю задачу (пиццу) и получаю ответ о ее завершении из другого несвязанного источника. Я действительно не использую таймеры в своем приложении. - person jackweirdy; 01.11.2016
comment
@jackweirdy Я собирался сказать то же самое, что сказал Рамон. Когда я получил ваш комментарий сегодня утром, я сказал, что вам нужно либо использовать mapAsync, либо реализовать его с помощью Futures/actors. - person expert; 01.11.2016

Вы можете представить печь с mapAsyncUnordered ступенью с помощью parallelism=4. Завершение Future может быть выполнено по таймеру (http://doc.akka.io/docs/akka/2.4/scala/futures.html#After) или что вы решили вынуть его из духовки по какой-то другой причине.

person Patrik Nordwall    schedule 01.11.2016

Это то, что я в итоге использовал. Это в значительной степени точная реализация машины с искусственными состояниями в вопросе. Механика Source.queue намного неуклюже, чем я надеялся, но в остальном она довольно чистая. Реальные приемники и источники предоставляются в качестве параметров и создаются в другом месте, поэтому фактическая реализация имеет немного меньше шаблонного кода, чем эта.

RunnableGraph.fromGraph(GraphDSL.create() {
  implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._

    // Our Capacity Bucket. Can be refilled by passing CapacityAvaiable objects 
    // into capacitySrc. Can be consumed by using capacity as a Source.
    val (capacity, capacitySrc) =
      peekMatValue(Source.queue[CapacityAvailable.type](CONCURRENT_CAPACITY,
                                                        OverflowStrategy.fail))

    // Set initial capacity
    capacitySrc.foreach(c =>
      Seq.fill(CONCURRENT_CAPACITY)(CapacityAvailable).foreach(c.offer))


    // Pull pizzas from the RabbitMQ queue
    val cookQ = RabbitSource(rabbitControl, channel(qos = CONCURRENT_CAPACITY),
                             consume(queue("pizzas-to-cook")), body(as[TaskRun]))

    // Take the blocking events stream and turn into a source
    // (Blocking in a separate dispatcher)
    val cookEventsQ = Source.fromIterator(() => oven.events().asScala)
        .withAttributes(ActorAttributes.dispatcher("blocking-dispatcher"))

    // Split the events stream into two sources so 2 flows can be attached
    val bc = builder.add(Broadcast[PizzaEvent](2))

    // Zip pizzas with the capacity pool. Stops cooking pizzas when oven full.
    // When cooking starts, send the confirmation back to rabbitMQ
    cookQ.zip(AckedSource(capacity)).map(_._1)
      .mapAsync(CONCURRENT_CAPACITY)(pizzaOven.cook)
      .map(Message.queue(_, "pizzas-started-cooking"))
      .acked ~> Sink.actorRef(rabbitControl, HostDied)

    // Send the cook events stream into two flows
    cookEventsQ ~> bc.in

    // The first tops up the capacity pool
    bc.out(0)
      .mapAsync(CONCURRENT_CAPACITY)(e =>
         capacitySrc.flatMap(cs => cs.offer(CapacityAvailable))
      ) ~> Sink.ignore

    // The second sends out cooked events
    bc.out(1)
      .map(p => Message.queue(Cooked(p.id()), "pizzas-cooked")
    ) ~> Sink.actorRef(rabbitControl, HostDied)

    ClosedShape
}).run()
person jackweirdy    schedule 02.11.2016
comment
В качестве упражнения я также попытался переписать конечный автомат с помощью Akka FSM и использовать шаблон work-pulling. - person jackweirdy; 28.11.2016