Как распечатать сообщения Kafka на консоли с помощью Spring-Cloud-Stream-Binder-Kafka

Мой вопрос очень близок к Выводить ввод Kafka Stream на консоль? с небольшой разницей: я использую StreamsBuilder вместо Kstreambuilder. (К сожалению, git с таким разрешением недоступен из-за ошибки 404).

Мой слушатель:

    @StreamListener(LoansStreams.INPUT)
    public void handleLoans(@Payload Loans loans) {


        final Serde<String> stringSerde = Serdes.String();

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> source = builder.stream(stringSerde, stringSerde, "in-stream");
        source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                ArrayList<String> keywords = new ArrayList<String>();

                // apply regex to value and for each match add it to keywords

                return keywords;
            }

        }
    }

Я получаю сообщение об ошибке: «Метод stream (String, Consumed) в типе StreamsBuilder не применим для аргументов (Serde, Serde, String)», очевидно, потому что он был частью Kstreambuilder.

Предполагая, что я правильно понял, я должен отобразить полученное сообщение, но как мне это сделать? Хочу добавить, что использую Kafka-Cloud-Stream впервые.

Моя цель - создать POC с двумя разными микросервисами, обращающимися к разным базам данных, и «соединить» оба микросервиса через Kafka, чтобы применить шаблон Saga (или, по крайней мере, начать применять его принципы).

В случае необходимости, я запустил Kafka только с сообщениями для печати:

    #log4j.rootLogger=WARN, stderr
    log4j.rootLogger=OFF, stdout 

    log4j.appender.stderr=org.apache.log4j.ConsoleAppender
    log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
    log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
    #log4j.appender.stderr.Target=System.err

На основе не выводить предупреждения kafka-console-consumer

Кроме того, на основе KeeperErrorCode = NoNode для / admin / preferred_replica_election я игнорируя это исключение zookeeper:

[2019-04-30 14:15:11,617] INFO Got user-level KeeperException when processing sessionid:0x100003793980001 type:multi cxid:0x71 zxid:0xbc txntype:-1 reqpath:n/a aborting remaining multi ops. Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election (org.apache.zookeeper.server.PrepRequestProcessor)

И последнее, но не менее важное: мой отправитель:

public void sendLoan(final Loans loans) {
    log.info("Sending loans {}", loans);
    MessageChannel messageChannel = loansStreams.outboundLoans();
    messageChannel.send(MessageBuilder
            .withPayload(loans)
            .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
            .build());
}

ПОМ:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.mybank</groupId>
    <artifactId>kafka-cloud-stream</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-cloud-stream</name>
    <description>Spring Cloud Stream With Kafka</description>

    <properties>
        <java.version>11</java.version>
        <spring-cloud.version>Greenwich.SR1</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework/spring-web -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <!-- version>5.1.5.RELEASE</version -->
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

person Jim C    schedule 30.04.2019    source источник
comment
Независимо от вашей проблемы, у вас, похоже, есть какое-то недоразумение - вы смешиваете канал сообщений StreamListener с связывателем потоков Kafka - с помощью этого кода вы создаете новый KStream для каждого сообщения, полученного слушателем . Вы должны использовать один или другой, а не оба.   -  person Gary Russell    schedule 30.04.2019
comment
Если я правильно понял, вы говорите либо использовать StreamListener, либо использовать связыватель Kafka-stream. Что ж, можете ли вы подвести итог, когда используете одно против другого?   -  person Jim C    schedule 30.04.2019
comment
Я добавил некоторые пояснения в качестве комментария к другому вашему сообщению. Здесь вы смешиваете две проблемы. Несколько примеров доступны по адресу github.com/spring-cloud/spring- облако-поток-образцы. Не могли бы вы взглянуть на них и узнать, соответствует ли какой-либо из них вашим требованиям?   -  person sobychacko    schedule 30.04.2019
comment
Вопрос, на который вы ссылаетесь, должен указывать здесь, но тот факт, что вы отключили корневой журнал, на самом деле не очень хорошо, если вы хотите что-то регистрировать. github.com/confluentinc/kafka-streams-examples   -  person OneCricketeer    schedule 01.05.2019


Ответы (1)


KStreamBuilder - это старый класс построителя, который был заменен на StreamsBuilder (в выпуске Apache Kafka 1.0.0). Изменилось не только название, но и параметры метода.

Как указано в сообщении об ошибке, новый метод ожидает stream(String, Consumed) вместо старого stream(Serde, Serde, String).

Сравните соответствующее руководство по обновлению: https://docs.confluent.io/4.0.0/streams/upgrade-guide.html#building-and-running-a-topology

person Matthias J. Sax    schedule 08.05.2019