У меня есть следующий упрощенный поток интеграции Spring:
int-ws:inbound-gateway ----> int:transformer ----> int-kafka:outbound-channel-adapter
По сути:
- Конечная точка веб-службы отображается с помощью
int-ws:inbound-gateway
- Сообщения от этой конечной точки помещаются в
input
канал (первый--->
) - Пользовательский преобразователь переводит формат JSON полезной нагрузки и добавляет заголовок MESSAGE_KEY (требуется для kafka)
- Сообщение помещается на
inputToKafka
канал (второй--->
) int-kafka:outbound-channel-adapter
отправляет сообщение в тему кафки
Операция веб-службы имеет запрос и полезные данные ответа.
Полезные данные запроса - это то, что я преобразовываю в сообщение JSON.
Я хотел бы вернуть полезные данные ответа (которые будут упорядочены и т. Д.) После сообщения размещен в теме кафки int-kafka:outbound-channel-adapter
Как мне это сделать?
В настоящий момент, когда я вызываю веб-службу, все работает, как ожидалось, но я должен установить reply-timeout
на int-ws:inbound-gateway
, чтобы он не зависал. Когда я это делаю, я просто получаю пустой ответ на SOAPUI.
Я понимаю концепции в разделе Поведение шлюза при отсутствии ответа - но в моем случае я действительно хочу сгенерировать ответ.
Это мой контекст интеграции (без конфигурации брокера kafka и т. Д.):
<int-ws:inbound-gateway id="ws-inbound-gateway" request-channel="input"
marshaller="marshaller" unmarshaller="marshaller" reply-timeout="100"/>
<int:channel id="input"/>
<int:transformer input-channel="input" output-channel="inputToKafka" method="transform">
<bean class="com.test.InputToJSONTransformer"/>
</int:transformer>
<int:channel id="inputToKafka"/>
<int-kafka:outbound-channel-adapter kafka-producer-context-ref="kafkaProducerContext"
auto-startup="true"
channel="inputToKafka"
order="1">
</int-kafka:outbound-channel-adapter>