SpringXD и Spring Integration: Читайте из темы kafka каждые X минут, затем отправляйте в другую тему

Я пытаюсь реализовать решение для создания потока SpringXD, состоящего из источника kafka, модуля моста и приемника kafka.

Итак, у меня есть что-то вроде:

<channel id="pollable">
    <queue />
</channel>

<bridge input-channel="pollable" output-channel="executorChannel">
    <poller max-messages-per-poll="5" fixed-rate="5000" />
</bridge>

Моя проблема в том, что я хотел бы как-то избежать опроса. В основном потому, что я хотел бы избежать сохранения сообщений в памяти, когда эти сообщения находятся в очереди. Я бы предпочел читать из kafka каждые X минут и просто брать Y сообщения из очереди и отправлять их в следующую тему.

Похоже, я не могу избавиться от очереди, но тогда мой вопрос: есть ли другой вариант? Я не люблю хранить данные в памяти, но я бы тоже не хотел использовать эту опцию: http://docs.spring.io/spring-integration/reference/html/system-management-chapter.html#message-store


person Columb1a    schedule 23.03.2017    source источник


Ответы (1)


Хранение данных в памяти - НЕ хорошая идея.

Вы можете stop() и start() адаптер канала (KafkaMessageDrivenChannelAdapter) по мере необходимости; при перезапуске он продолжит работу с того места, где остановился.

Однако исходный код kafka использует очень старую версию spring-integration-kafka (1.3.x).

Если вы создаете настраиваемый источник для использования spring-integration-kafka 2.1.0 (который использует клиент kafka 0.10.1.x), вы можете установить свойство kafka max.poll.records, чтобы ограничить количество извлекаемых записей.

person Gary Russell    schedule 24.03.2017
comment
Каково основное назначение адаптера управляемого канала? и какова основная цель KafkaMessageDrivenChannelAdapter? Я пытался понять, в чем основная цель, читая документацию к Spring, но для меня это не так ясно. (Я читал это: docs.spring.io/spring-kafka/docs/2.0.0.BUILD-SNAPSHOT/reference/ и этот github.com/spring-projects/spring -integration-samples / blob /) каким должен быть поток в моем случае? - person Columb1a; 27.03.2017
comment
Spring Integration - это реализация шаблонов корпоративной интеграции, вам следует ознакомиться с этой книгой и прочитать Документация по интеграции Spring; он должен ответить на такие вопросы, как ваш. - person Gary Russell; 27.03.2017
comment
Спасибо!. Прочитав 2 статьи об использовании канала, управляемого сообщениями, я понял, зачем его использовать. Я оставляю статьи здесь, если кто-то еще хочет проверить: Сначала прочтите это: javarticles.com/2015/01/ Тогда это: javarticles.com/2015/02/ - person Columb1a; 27.03.2017
comment
Как мне остановить и запустить адаптер канала? Я имею в виду, как KafkaMessageDrivenChannelAdapter использует эти методы? например, если я хочу сделать это с помощью xml? - person Columb1a; 27.03.2017
comment
Вы не можете сделать это только с помощью XML, вам нужна собственная логика в каком-то java-коде; @Autwire адаптер канала и остановите / запустите его по мере необходимости. - person Gary Russell; 27.03.2017
comment
Хорошо, имеет смысл. Я это сделал. Я знаю, что остановка и запуск происходят из класса SmartLifeCycle. Так что начать / остановить что-то можно, просто добавив свою логику. Итак, мой вопрос должен заключаться в следующем: есть ли подходящий способ, который я могу использовать между остановками / запусками адаптера канала? - person Columb1a; 28.03.2017
comment
Вы сказали, что хотите обрабатывать так много сообщений каждые несколько минут. Вы можете включить счетчики на выходном канале и контролировать sendCount и остановить адаптер, когда будет достигнуто необходимое количество. Установить таймер и перезапустить после истечения таймера. - person Gary Russell; 28.03.2017
comment
Ok. Тогда в этом подходе, который вы только что упомянули, max.poll.records не понадобится. Верно? В общем, способ, которым я реализую момент запуска / остановки, зависит на 100% от меня. Весной на данный момент нет ничего подобного, что-то, что легко обрабатывается, например, для запуска / остановки адаптера канала. - person Columb1a; 28.03.2017
comment
Верный; нет стандартного компонента с этой функциональностью. И да, этот метод будет работать независимо от настроек кафки. - person Gary Russell; 28.03.2017
comment
Позвольте нам продолжить это обсуждение в чате. - person Columb1a; 28.03.2017
comment
Вопрос, я заметил, что я не могу использовать max.poll.records в xml (используя spring-integration-kafka 2.1.0), но я могу использовать его, используя java config. В общем, разве нельзя использовать его с конфигурацией xml? - person Columb1a; 03.04.2017
comment
И еще вопрос, защищает ли такой подход от сохранения данных в памяти? - person Columb1a; 03.04.2017
comment
Я не уверен, что вы имеете в виду под словом «не могу использовать» - это свойство kafka, переданное потребителю из Properties bean-компонента. Да, это позволяет избежать сохранения данных в памяти (если вы удалите канал очереди). - person Gary Russell; 03.04.2017
comment
Виноват. вы правы, я приму ваш ответ и закрою вопрос, потому что информации достаточно, чтобы продолжить то, что я хотел. - person Columb1a; 04.04.2017