TL;DR: https://github.com/kifi/juggle

Параллельная обработка задач

В прошлом году мы открыли исходный код нашей Reactive Lock, облегченной библиотеки, которую мы используем в Kifi для ограничения параллелизма, обычно когда несколько запросов борются за какой-то ресурс.

В этом посте мы представляем шаблон, который мы используем для параллельной обработки задач, чтобы контролировать периодичность и пропускную способность. Вот что мы хотим сделать.

  1. Каждый период времени проверяйте, доступны ли новые задачи.
  2. Если доступны новые задачи, начните их обработку одновременно.

В Кифи мы используем Акку с первых дней. Однако мы не развернули большие иерархии субъектов. Вместо этого мы часто используем одноэлементные акторы для инкапсуляции изменяемого состояния: мы полагаемся на гарантию того, что акторы обрабатывают сообщения по одному, чтобы безопасно поддерживать это состояние в многопоточном контексте.

Именно так мы собираемся совмещать несколько одновременных задач с одним действующим лицом.

Знакомство с жонглированием

Жонглирование — небольшая библиотека, она предоставляет всего две черты актера:

  • ConcurrentTaskProcessingActor[T]
  • Актер пакетной обработки[T]

ConcurrentTaskProcessingActor[T] — это черта, на которую мы хотим обратить наибольшее внимание, она требует следующих реализаций:

Он запускает Future, которые асинхронно pullTasks и processTasks, а затем отправляют сообщения обратно, чтобы сообщить о своем результате, когда задачи типа T фактически извлекаются или обрабатываются.

Таким образом, этот актор инкапсулирует довольно простое состояние:

Отложив на время закрытие, наш актор отслеживает как отдельные задачи, которые в настоящее время находятся в обработке, так и количество дополнительных задач, которые могут вернуться после вытягивания. Поэтому он знает, что нужно ограничить параллелизм до maxConcurrentTasks, поскольку он извлекает больше задач. Затем, когда задачи будут выполнены, он будет продолжать извлекать новые до тех пор, пока параллелизм не упадет ниже уровня minConcurrentTasks.

Интуитивно понятно, что ConcurrentTaskProcessingActor будет постоянно корректировать параллелизм, чтобы поддерживать его в пределах minConcurrentTasks и maxConcurrentTasks.

Вопросы параллелизма

Правильные значения minConcurrentTasks и maxConcurrentTasks будут охватывать различные режимы периодичности и параллелизма в зависимости от бизнес-логики и соображений стоимости операций извлечения и обработки.

Например, minConcurrentTasks = maxConcurrentTasks гарантирует, что актор активно извлекает новые задачи после обработки любой задачи. Это нормально, если стоимость обработки задачи превышает стоимость ее вытягивания.

Однако стоимость извлечения часто сводится к фиксированной стоимости обращения к базе данных или к очереди, такой как SQS, и не зависит от того, сколько задач фактически извлечено. Таким образом, в таком сценарии мы могли бы использовать minConcurrentTasks ‹‹ maxConcurrentTasks, чтобы амортизировать фиксированную стоимость извлечения по сравнению с maxConcurrentTasks-minConcurrentTasks задачи.

Как правило, отношение minConcurrentTasks к maxConcurrentTasks будет увеличиваться с увеличением отношения относительных затрат на обработку к стоимости извлечения, в то время как абсолютные значения затем выводятся из спецификаций оборудования.

Обратите внимание, что minConcurrentTasks › 0 – это необходимое условие для того, чтобы актор продолжал получать новые задачи, пока не останется ни одной. Актер с minConcurrentTasks = 0 быстро утихнет, счастливо бездействуя, даже если можно будет вытащить больше задач.

Планирование

Если параллелизм когда-либо упадет до нуля, наш актор решит, что на данный момент больше нет работы, и вообще перестанет тянуть. На этом этапе мы должны явным образом отправить сообщение, чтобы разбудить его: обычно в дело вступает планировщик.

Планировщик должен периодически отправлять IfYouCouldJustGoAhead нашему актеру, чтобы убедиться, что он проверяет новые задачи по крайней мере каждый заданный период. Если действительно есть новые задачи, наш актор возвращается и начинает их обрабатывать.

Естественно, IfYouCouldJustGoAhead — это просто еще одно сообщение, которое безопасно обрабатывается нашим актором с точки зрения его внутреннего состояния: если обработка уже идет, запрос на продолжение не приведет к экстравагантному вытягиванию задачи. Поэтому даже агрессивные сообщения от планировщика не могут привести к нарушению параллелизма.

Наконец, мы можем сказать ConcurrentTaskProcessingActor Close навсегда, обычно перед закрытием приложения. Поскольку он закрывается, наш актор вообще не начнет обрабатывать какие-либо новые задачи.

Общий частный случай: пакетная обработка

В этом контексте BatchProcessingActor[T] — это просто частный случай ConcurrentTaskProcessingActor, который мы нашли достаточно распространенным, чтобы предоставить его явно в библиотеке Juggle. Требуются следующие реализации:

Таким образом, BatchProcessingActor[T] позволяет асинхронно извлекать и обрабатывать один пакет задач за другим. На практике BatchProcessingActor[T] — это просто ConcurrentTaskProcessingActor[Seq[T]] с обоими minConcurrentTasks и maxConcurrentTasks. установить на 1.

maxConcurrentTasks = 1 гарантирует, что субъект будет обрабатывать не более одного пакета Seq[T] за раз, а minConcurrentTasks = 1 гарантирует, что новые пакеты будут продолжаться до тех пор, пока не останется задач для извлечения.

Вот полная реализация:

Вывод

Субъекты жонглирования предоставляют простые в реализации и легкие решения для широкого спектра вариантов использования параллельной обработки задач.

Например, BatchProcessingActor широко используется в нашей системе для обработки приема данных между службами с помощью порядковых номеров, точно так же, как мы разработали нашу индексацию в реальном времени.

Другим распространенным вариантом использования является распределение работы в сервисном кластере. Одна машина (лидер) периодически отправляет новые задачи в распределенную очередь (например, SQS). Затем все остальные машины в кластере извлекают задачи из очереди и обрабатывают их одновременно. Один производитель обычно расширяет BatchProcessingActor[T], а потребители расширяют ConcurrentProcessingActor[SQSMessage[T]].

Мы полагались на такую ​​настройку для ряда задач, таких как парсинг и обработка изображений, а также в ключевых компонентах нашей интеграции со Slack.

Мы надеемся, что вам нравится жонглировать так же сильно, как и нам. 😼