Создание ответа при использовании адаптера исходящего канала

У меня есть следующий упрощенный поток интеграции Spring:

int-ws:inbound-gateway ----> int:transformer ----> int-kafka:outbound-channel-adapter

По сути:

  1. Конечная точка веб-службы отображается с помощью int-ws:inbound-gateway
  2. Сообщения от этой конечной точки помещаются в input канал (первый --->)
  3. Пользовательский преобразователь переводит формат JSON полезной нагрузки и добавляет заголовок MESSAGE_KEY (требуется для kafka)
  4. Сообщение помещается на inputToKafka канал (второй --->)
  5. int-kafka:outbound-channel-adapter отправляет сообщение в тему кафки

Операция веб-службы имеет запрос и полезные данные ответа.
Полезные данные запроса - это то, что я преобразовываю в сообщение JSON.
Я хотел бы вернуть полезные данные ответа (которые будут упорядочены и т. Д.) После сообщения размещен в теме кафки int-kafka:outbound-channel-adapter

Как мне это сделать?

В настоящий момент, когда я вызываю веб-службу, все работает, как ожидалось, но я должен установить reply-timeout на int-ws:inbound-gateway, чтобы он не зависал. Когда я это делаю, я просто получаю пустой ответ на SOAPUI.

Я понимаю концепции в разделе Поведение шлюза при отсутствии ответа - но в моем случае я действительно хочу сгенерировать ответ.

Это мой контекст интеграции (без конфигурации брокера kafka и т. Д.):

<int-ws:inbound-gateway id="ws-inbound-gateway" request-channel="input"
                        marshaller="marshaller" unmarshaller="marshaller" reply-timeout="100"/>

<int:channel id="input"/>

<int:transformer input-channel="input" output-channel="inputToKafka" method="transform">
    <bean class="com.test.InputToJSONTransformer"/>
</int:transformer>

<int:channel id="inputToKafka"/>

<int-kafka:outbound-channel-adapter kafka-producer-context-ref="kafkaProducerContext"
                                    auto-startup="true"
                                    channel="inputToKafka"
                                    order="1">
</int-kafka:outbound-channel-adapter>

person Justin Walsh    schedule 26.06.2015    source источник


Ответы (2)


Изменять

<int:channel id="inputToKafka"/>

to

<int:publish-subscribe-channel id="inputToKafka"/>

Добавьте на канал второго подписчика.

<service-activator input-channel="inputToKafka" ... order="2" />

Где сервис генерирует ответ; он будет вызываться после успешной отправки в кафку.

Не включайте output-channel; структура позаботится о маршрутизации вывода службы обратно на шлюз ws.

person Gary Russell    schedule 26.06.2015
comment
Спасибо - похоже, что ответ был отправлен второму подписчику независимо от того, была ли отправка в Kafka успешной или нет. Будет ли какой-либо способ обнаружить неудачную отправку в kafka и, желательно, вернуть сообщение об ошибке на WS? Я предполагаю, что нет, поскольку вызов kafka заключен в Future, и с ним ничего не делается. - person Justin Walsh; 27.06.2015
comment
Поддерживаются только асинхронные посылки кафки, так как мы перешли на 0.8.2. Вам нужно будет приостановить поток и настроить потребителя, чтобы подтвердить, что сообщение прибыло в kafka, а затем освободить поток отправки. - person Gary Russell; 27.06.2015

Какой ответ вы ожидаете от адаптера исходящего канала Kafka? Помните, что это Adapter, а не Gateway

Однако вы можете ввести другой компонент, ServiceActivator с входным каналом sendToInputToKafka, теперь ваш трансформатор будет выводить сигнал на этот канал.

Ваш ServiceActivator должен иметь @Autowire MessageChannel inputToKafka и должен вручную программно отправлять сообщение на этот канал. После отправки этого сообщения вы создадите желаемый ответ как тип возврата ServiceActivator и ответ на ваш ws gateway

person iamiddy    schedule 26.06.2015
comment
Обычно нет необходимости вводить компоненты фреймворка (такие как MessageChannel в код пользователя. См. Мой ответ. - person Gary Russell; 26.06.2015