autoStartup для @StreamListener

В отличие от @KafkaListener, похоже, что @StreamListener не поддерживает параметр autoStartup. Есть ли способ добиться такого же поведения для @StreamListener? Вот мой вариант использования:

У меня есть универсальное приложение Spring, которое может прослушивать любую тему Kafka и записывать в соответствующую таблицу в моей базе данных. Для некоторых тем объем невелик, поэтому обработка одного сообщения с очень малой задержкой - это нормально. Для других тем, которые имеют большой объем, код должен получать микропакет сообщений и реже записывать в базу данных с помощью пакета Jdbc. В идеале определение для слушателей выглядело бы примерно так:

// low volume listener
@StreamListener(target = Sink.INPUT, autoStartup="${application.singleMessageListenerEnabled}")
public void handleSingleMessage(@Payload GenericRecord message) ...

// high volume listener
@StreamListener(target = Sink.INPUT, autoStartup="${application.multipleMessageListenerEnabled}")
public void handleMultipleMessages(@Payload List<GenericRecord> messageList) ...

Для темы с небольшим объемом я бы установил application.singleMessageListenerEnabled на true и application.multipleMessageListenerEnabled на false, и наоборот для темы с большим объемом. Таким образом, только один из слушателей будет активно слушать сообщения, а другой не будет активно слушать.

Есть ли способ добиться этого с помощью @StreamListener?


person Joe P    schedule 28.02.2020    source источник


Ответы (1)


Во-первых, рассмотрите возможность обновления до модель функционального программирования, рефакторинг которой займет у вас несколько минут. Мы практически отказались от модели программирования на основе аннотаций. Если вы это сделаете, то то, что вы пытаетесь достичь, будет очень просто:

@SpringBootApplication
public class SimpleStreamApplication {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(SimpleStreamApplication.class);
    }

    @Bean
    public Consumer<GenericRecord> singleRecordConsumer() {...}

    @Bean
    public Consumer<List<GenericRecord>> multipleRecordConsumer() {...}
}

Затем вы можете просто использовать свойство --spring.cloud.function.definition=singleRecordConsumer для одного случая и --spring.cloud.function.definition=multipleRecordConsumer при запуске приложения, чтобы убедиться, какой конкретный слушатель вы хотите активировать.

person Oleg Zhurakousky    schedule 28.02.2020
comment
Спасибо Олег. Мне удалось заставить singleRecordConsumer работать нормально. Однако я хотел бы получить заголовки Kafka, но не мог придумать, как это сделать. Для multipleRecordConsumer у меня есть приведенный ниже код для компиляции, но он не получает никаких сообщений. Я добавил следующее в свой файл application.properties. spring.cloud.function.definition=multipleRecordConsumer spring.cloud.stream.bindings.multipleRecordConsumer-in-0.consumer.batch-mode=true - person Joe P; 29.02.2020