Динамическое потребление сообщений из темы кафка

Мне нужно получать сообщения из тем, которые динамически создаются производителем. Я использовал подход шаблона темы в потребителе @KafkaListener (topicPattern = "topicname _. *" Для этого, а также установил metadata.max.age.ms = 3000. Но, очевидно, если и пока я не установил offset.auto.reset до самого раннего возраста я не могу этого добиться. По нашему требованию offset.auto.reset должен быть установлен на последнее значение, чтобы избежать дублирования проблемы.

Есть идеи, как добиться того же?


person Sneha    schedule 09.12.2019    source источник
comment
>has to be set to latest to avoid the duplication of the issue. Это неверно; auto.offset.reset применяется только в первый раз, когда потребитель с определенным group.id потребляет данные из раздела. Как только он зафиксирует смещение, в будущем потребление начнется с этого смещения - если только смещение не истечет, потому что потребитель не потреблял в течение недели (по умолчанию).   -  person Gary Russell    schedule 09.12.2019
comment
Я создал потребителя, который потребляет из тем на основе шаблона, я установил для offset.auto.reset значение latest и установил metadata.max.age.ms на 5 секунд, но у потребителя всегда отсутствует начальное / первое сообщение но потребляя сообщение оттуда. Я использую модуль Spring-kafka. Producer = KafkaProducer (bootstrap_servers = 'localhost: 9092') record_metadata = Producer.send (topic, data) и код для потребителя: Пожалуйста, предложите что-нибудь, чтобы решить эту проблему или любую конфигурацию, которую я должен включить в экземпляры производителя и потребителя.   -  person Sneha    schedule 10.12.2019
comment
Вы должны использовать earliest, чтобы получить все записи в новой теме. Kafka на самом деле не предназначен для вашего случая использования. 5 секунд для возраста метаданных - это довольно агрессивно.   -  person Gary Russell    schedule 10.12.2019


Ответы (1)


Говоря на высшем уровне, философия дизайна Kafka не допускает ничего подобного, потому что добавление тем во время выполнения вызывает жесткую перебалансировку брокеров, и, следовательно, этого следует избегать, но если нам удастся перезапустить группы потребителей каждый раз, когда новая тема добавлен в список, мы можем изящно избежать этой опасности.

Самая последняя интеграция Spring-Kafka и ее использование включают аннотацию @KafkaListener, которая превращает ваш слушатель POJO в потребителя Kafka путем создания KafkaListenerContainer с использованием переданной ему фабрики контейнеров. Этот потребитель слушает темы, жестко запрограммированные в виде строкового массива тем, выражений тем, шаблонов тем и т. Д. Это ограничивает наш дизайн выборкой этих тем в виде ключей через Java DSL на максимальном уровне вместо прямого жесткого кодирования. Однако ключи по-прежнему жестко запрограммированы в массиве, переданном как параметр @KafkaListener.

Пример:

@KafkaListener(topics = {“${kafka.topics.receipt.cancel.name}”}, containerFactory = “kafkaContainerFactory”)

Примечание. Значение атрибута аннотации KafkaListener.topics должно быть инициализатором массива, поэтому жесткое кодирование является обязательным.

person SudoShivansh    schedule 09.12.2019
comment
Я не запрограммировал это жестко. Он у меня в application.properties. @KafkaListener (topicPattern = $ {topic.prefix}, значение topic.prefix содержит шаблон для чтения. - person Sneha; 09.12.2019