Поддерживаемые протоколы члена группы несовместимы с протоколами существующих участников.

У меня проблема, связанная с Кафкой.

У меня есть текущая служба (Producer), которая отправляет сообщение в тему Kafka (events). Сервис использует kafka_2.12 v1.0.0, написанный на Java.

Я пытаюсь интегрировать его с примером проекта spark-streaming как Consumer службы (здесь с использованием kafka_2.11 v0.10.0, написанного на Scala)

Сообщение успешно отправлено с Producer в тему Kafka. Однако я всегда получаю стек ошибок ниже:

Exception in thread "main" org.apache.kafka.common.errors.InconsistentGroupProtocolException: The group member's supported protocols are incompatible with those of existing members.
    at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:577)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:571)    at com.jj.streaming.ItemApp$.delayedEndpoint$com$jj$streaming$ItemApp$1(ItemApp.scala:72)
    at com.jj.streaming.ItemApp$delayedInit$body.apply(ItemApp.scala:12)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)     at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)   at scala.App$class.main(App.scala:76)
    at com.jj.streaming.ItemApp$.main(ItemApp.scala:12)
    at com.jj.streaming.ItemApp.main(ItemApp.scala)

Я не знаю первопричины. Как я могу это исправить?


person Tin Nguyen    schedule 23.07.2018    source источник
comment
Тин Нгуен, вы нашли решение этой проблемы? Я тоже столкнулся с той же проблемой   -  person Satish    schedule 07.01.2019
comment
привет @Satish, нет, я этого не делал.   -  person Tin Nguyen    schedule 20.03.2019


Ответы (1)


Это происходит в моей конфигурации, когда я пытаюсь добавить потребителя в кластер, который использует стратегию назначения разделов, отличную от предыдущих.

Например:

partition.assignment.strategy = org.apache.kafka.clients.consumer.RandomAccessAssignor

смешанный или по умолчанию:

partition.assignment.strategy = org.apache.kafka.clients.consumer.RangeAssignor

person John Cairns    schedule 26.08.2019