В McAfee мы широко используем Apache Kafka. Мы создали OpenDXL SDK, обертывающий обычный Java-клиент Apache Kafka, чтобы добавить пользовательскую схему закодированных сообщений. OpenDXL SDK предоставляет Consumer, Producers и остальные клиентские функции Kafka.

Потребительские приложения

Потребитель — это объект, который позволяет считывать данные из кластера Kafka с использованием модели извлечения. Как правило, пользователи OpenDXL SDK реализуют потребительские приложения, записывая бесконечный цикл внутри блока кода, который выполняется в отдельном потоке.

// Logic implemented by SDK's users to read from Kafka.
...
private Runnable getConsumerTask() {
    return () -> {
        try {
           // Infinite loop to poll records from Kafka
           while (!closed.get()) {

                // Polling
                ConsumerRecords<byte[]> records = consumer.poll(0);

                // Records are processed here. 
                // It might be a heavy and long task.
               // Commit records
               consumer.commitSync();
            }
        } catch (Exception e) {
            LOG.error(e.getMessage());
        } finally {
            consumer.close();
        }
    };
}
...

Соревнование

Несмотря на то, что разработка потребительского приложения проста, некоторые команды, которые используют OpenDXL SDK для использования тем Kafka, хотят сосредоточиться на своей собственной бизнес-логике, а не иметь дело с внутренними компонентами Kafka. Они хотели бы получать записи от Kafka и обрабатывать их вместо того, чтобы знать о таких механизмах, как опрос, фиксация, потокобезопасность и т. д. Кроме того, им необходимо изучить такие понятия, как группы потребителей, раздел темы, смещения, перебалансировка, тайм-ауты, пауза. /resume, seek и т. д., то, что их не интересует. В конечном счете, эти команды сосредоточены на том, что делать с записями и какие данные они содержат, а не на том, как записи передаются по конвейеру.

Решение

В дополнение к обычному OpenDXL SDK Consumer мы разработали тип Push Consumer, чтобы упростить и ускорить разработку потребительских приложений и позволить некоторым командам сосредоточиться на собственной бизнес-логике.

Как следует из названия, Push Consumer получает сообщения от Kafka спонтанно и автоматически. Нет необходимости постоянно опрашивать записи. Вместо этого пользователи OpenDXL SDK реализуют интерфейс Обратный вызов, который прослушивает новые входящие записи. Экземпляр обратного вызова содержит логику, которая обрабатывает записи и отправляет ответ на продолжение чтения, повторную попытку записи или остановку обработки. Таким образом, пользователю OpenDXL SDK не нужно беспокоиться о том, как фиксировать записи, как перематывать потребительское смещение по тематическому разделу при необходимости повторной обработки, как избежать групповой перебалансировки, когда обработка записей занимает много времени, и как создать потребитель в отдельном потоке.

Использование push-потребителя

  • Загрузите OpenDXL SDK здесь или интегрируйте его с maven Central.
Maven
<dependency>
  <groupId>com.opendxl</groupId>
  <artifactId>dxldatabusclient</artifactId>
  <version>2.4.0</version>
</dependency>
Gradle
compile 'com.opendxl:dxldatabusclient:2.4.0'
  • Реализовать метод onConsume из интерфейса DatabusPushConsumerListener, где должна быть размещена логика для обработки записей.
...
class MessageProcessor implements DatabusPushConsumerListener<byte[]> {

@Override
public DatabusPushConsumerListenerResponse onConsume(ConsumerRecords<byte[]> records) {
        
        // Business Logic here. A instance of this class will
        // receive records from Kafka topic spontaneously
        // and unattended fashion

    return DatabusPushConsumerListenerResponse.CONTINUE_AND_COMMIT;
    }
}
...
DatabusPushConsumerListener<byte[]> lstnr = new MessageProcessor();
DatabusPushConsumer<byte[]> consumer =
            new DatabusPushConsumer(getConsumerConfig(),
            new ByteArrayDeserializer(), lstnr);

Конструктор получает: свойства конфигурации Regular Kafka Consumer, экземпляр Deserializer и экземпляр DatabusPushConsumerListener.

  • Подпишитесь на тему
// Subscribe to topic
consumer.subscribe(Collections.singletonList("topic1"));
  • Начать push-потребитель
// Start pushing messages into MessageProcessor in an async fashion
databusPushConsumerFuture = consumer.pushAsync();
  • Мониторинг выполнения
// Option 1: Wait until the processor stops
databusPushConsumerStatus = databusPushConsumerFuture.get();
// Option 2: Wait for a while then check the listener status.
while (true)  {
    try {
      DatabusPushConsumerStatus status =
      databusPushConsumerFuture.get(100, TimeUnit.MILLISECONDS);
      // if this line is reached, means the listener has finished.
      return;
    } catch (TimeoutException e) {
      // TimeoutException means that listener is still working, so
      // it continue the loop
      continue;
    } finally {
      LOG.info("Push consumer status:" +
           databusPushConsumerStatus.getStatus());
      LOG.info("Listener status:" +
           databusPushConsumerStatus.getListenerResult());
    }
}

Полный пример того, как использовать Push Consumer, можно найти здесь

Как работает Push Consumer?

Основная логика Push Consumer находится в методе push(). У него есть основной цикл, который извлекает сообщения из Kafka, как это обычно делает обычный потребитель. Когда он получает записи из уже подписанной темы, отправляет их в пользовательский обратный вызов OpenDXL SDK для обработки, и он ожидает ответа.

Основной цикл Push Consumer

Основной цикл push() выполняется внутри фонового потока. Вот почему он не блокирует пользовательский поток. Каждая итерация в основном цикле извлекает записи из Kafka. Если они доступны, основной цикл отправляет их методу onConsume из реализации Listener в отдельном потоке. Кроме того, он сохраняет внутреннюю позицию потребителя (смещение) для каждого назначенного тематического раздела на случай, если позже потребуется перемотать потребителя и повторно обработать записи. Кроме того, Push Consumer приостанавливается до тех пор, пока прослушиватель не завершит обработку, чтобы сохранить его в рабочем состоянии и избежать перебалансировки группы, которая может быть вызвана истечением времени ожидания max.poll.interval.ms.

// It stores the current consumer position 
// based on the partitions assigned.
// This is the offset position per topicPartition before 
// polling messages from Databus.  Position
// will be used to rewind the consumer in case the listener returns // RETRY enum value.
final Set<TopicPartition> assignment = super.assignment();
final Map<TopicPartition, Long> lastKnownPositionPerTopicPartition =
        getCurrentConsumerPosition(assignment);

// poll records from Databus
ConsumerRecords records = super.poll(timeout);
LOG.info("Consumer " + super.getClientId() + " number of records read: " + records.count());

if (records.count() > 0) {
    super.pause(assignment);
    LOG.info("Consumer " + super.getClientId() + " is paused");
} else {
    continue;
}

// It calls the listener in a separated thread.
listenerFuture = runListenerAsync(databusPushConsumerFuture, records);

В этот момент Listener, реализованный пользователем, начал обрабатывать записи в отдельном потоке. Основной цикл проверяет значение ответа Listener.

onConsumeResponse = listenerFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);

Если таймаут listenerFuture истекает, это означает, что Listener все еще работает. Затем основной цикл будет опрашивать записи из Kafka, чтобы поддерживать его в рабочем состоянии. Поскольку он уже был приостановлен, никакие записи не будут возвращены.

// TimeoutException means that listener is still working.
// So, a poll is performed to heartbeat Databus
poll(Duration.ofMillis(0));
LOG.info("Consumer " + super.getClientId() + " sends heartbeat to coordinator. The listener continue processing messages...");

После завершения прослушивания он возвращает значение перечисления DatabusPushConsumerListenerResponse. Основываясь на этом значении, основной цикл знает, какое действие необходимо выполнить.

Ответ слушателя

Ответ прослушивателя управляет основным циклом. Как только слушатель закончит работу, основной цикл проанализирует ответ и будет действовать соответствующим образом. Прослушиватель ответит одним из этих значений перечисления (см. DatabusPushConsumerListenerResponse).

  • CONTINUE_AND_COMMITВозвращено прослушивателем, чтобы подтвердить, что последние записи были обработаны, и продолжить получение записей.
case CONTINUE_AND_COMMIT:
default:
    commitSync();
  • ПОВТОРИТЬ Возвращено слушателем для повторного чтения. Обычно что-то было не так при обработке записей. Потребитель push восстанавливает предыдущее смещение раздела темы, сохраненное перед опросом, и возвращается к этой позиции.
case RETRY:
    seek(lastKnownPositionPerTopicPartition);
  • STOP_AND_COMMIT Возвращается прослушивателем, когда он хочет остановить отправку получателя и зафиксировать записи.
case STOP_AND_COMMIT:
    stopRequested.set(true);
    commitSync();
  • STOP_NO_COMMIT Возвращается прослушивателем, когда он хочет остановить отправку получателя и не фиксирует записи.
case STOP_NO_COMMIT:
    stopRequested.set(true);

Что, если Listener выйдет из строя?

Пользователь SDK пишет его логику, реализуя интерфейс DatabusPushConsumerListener. Во время выполнения он может вызвать неожиданное исключение. Исключения, не пойманные прослушивателем, будут перехватываться основным циклом. В этом случае основной цикл остановится и создаст журнал, чтобы пользователь SDK узнал об этой ситуации.

...
} catch (ExecutionException | InterruptedException e) {
    LOG.error("Consumer " + super.getClientId()
            + " listener throws an Exception while it was working: "  
            + e.getMessage(), e);
            databusPushConsumerFuture
            .setDatabusPushConsumerListenerStatus(
                    new DatabusPushConsumerStatus.Builder()
                            .withException(e)
                            .build());
    stopRequested.set(true);
    break;
} catch (Exception e) {
    LOG.warn("Consumer " + super.getClientId() + " exception: " +  
    e.getMessage(), e);
    databusPushConsumerFuture
    .setDatabusPushConsumerListenerStatus(
          new DatabusPushConsumerStatus.Builder()
          .withException(e)
          .build());
    stopRequested.set(true);
    break;
}
...

Резюме

OpenDXL SDK Push Consumer позволяет разработчикам сосредоточиться исключительно на управлении сообщениями, получаемыми от Kafka, и устанавливать высокоуровневую семантику по отношению к базовому потребителю в зависимости от того, что делать с данными. В частности, он упрощает перемотку назад, повторную попытку и переадресацию смещения и освобождает пользователей SDK для работы с низкоуровневой потребительской семантикой, такой как опрос, фиксация, пауза/возобновление, поиск и управление смещениями.

Преимущества

  • Это снижает вероятность групповой перебалансировки, поскольку использует паузу и возобновление работы потребителя за кулисами во время обработки записей.
  • Он предлагает операцию Retry для автоматической повторной обработки одного и того же набора записей в случае сбоя.
  • Модель push вместо модели опроса. Записи считываются из Kafka и помещаются в логическую реализацию клиента. Непрерывный опрос исключен из пользовательской реализации OpenDXL SDK.
  • Скрыть операции сложности потребителя, такие как опрос, фиксация, пауза, возобновление и поиск. Позвольте пользователю сосредоточиться на логике обработки записей.