Lagom Framework: тема Kafka не создается

Я пытаюсь написать небольшой микросервис, используя лагом-фреймворк со стороной чтения, реализованной для поддержки mysql. https://github.com/codingkapoor/lagom-scala-slick

Цель этой службы — предоставить API для создания, обновления и чтения сотрудников.

Однако при выполнении проект не создает тему кафки и не публикует в ней сообщения. Я пытался отлаживать, читать документы и ссылаться на пару других подобных проектов, но пока безуспешно.

Документация Lagom и аналогичные проекты — единственные источники, в которых можно найти любую помощь для такой довольно новой технологии. Мне действительно нужна помощь, чтобы отладить и понять эту проблему. Дайте мне знать, если это правильная платформа, чтобы обратиться за такой помощью.

Шаги, которые я предпринимаю, чтобы создать сотрудника и, возможно, увидеть созданную тему кафки, следующие:

#1. sbt runAll

#2. curl -X POST \
  http://localhost:9000/api/employees \
  -H 'Content-Type: application/json' \
  -d '{
    "id": "128",
    "name": "Shivam",
    "gender": "M",
    "doj": "2017-01-16",
    "pfn": "PFKN110"
}'

#3. /opt/kafka_2.12-2.3.0/bin/kafka-topics.sh --list --zookeeper localhost:2181

#4. /opt/kafka_2.12-2.3.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic employee --from-beginning

person iamsmkr    schedule 24.08.2019    source источник
comment
Я запустил ваш код, вижу тему в Кафке. Не могли бы вы объяснить, что не работает?   -  person Vladislav Kievski    schedule 25.08.2019
comment
@VladislavKievski Прежде всего, большое спасибо за то, что нашли время и силы, чтобы помочь мне разобраться в этом вопросе. Я обновил шаги, которые я предпринял для создания сотрудника, и увидел тему кафки, созданную для него. На шаге 3, упомянутом выше, я не вижу созданной темы с именем сотрудника. Кроме того, я хотел бы знать, видели ли вы тему kafka, созданную до запуска curl cmd? Вы настроили mysql для этого проекта? Какие темы вы видите? Дайте мне знать, если я могу объяснить вещи лучше в конце.   -  person iamsmkr    schedule 26.08.2019
comment
@VladislavKievski Я также хотел бы знать, видите ли вы данные в таблице сотрудников kafka и mysql после того, как вы введете команду curl для создания сотрудника.   -  person iamsmkr    schedule 26.08.2019
comment
Я вижу тему kafka, созданную после выполнения curl cmd. Нет, я его удаляю, и заменяю там, где нужно, на касснадру, так мне будет проще тестировать. Я вижу только одну тему. Все ваши события должны храниться в базе данных. Я ответил на ваш вопрос?   -  person Vladislav Kievski    schedule 26.08.2019
comment
@VladislavKievski У меня это не работает, и я не знаю, почему? Видите ли вы данные в пространстве ключей сотрудника, если вы настроили создание на стороне чтения? Не могли бы вы опубликовать команды и результаты из пространства ключей сотрудников kafka и cassandra?   -  person iamsmkr    schedule 26.08.2019
comment
Кроме того, если вы можете опубликовать свой код на github в виде разветвленного репо или чего-то еще, чтобы я мог сослаться, чтобы увидеть внесенные вами изменения, пожалуйста?   -  person iamsmkr    schedule 26.08.2019


Ответы (2)


Служба сотрудников, в которую я добавил один метод getEmployees:

trait EmployeeService extends Service {

  def addEmployee(): ServiceCall[Employee, Done]

  def getEmployees(): ServiceCall[NotUsed, Vector[Employee]]

  def employeeTopic: Topic[EmployeeAddedEvent]

  override final def descriptor: Descriptor = {
    import Service._

    named("employee")
      .withCalls(
        restCall(Method.POST, "/api/employees", addEmployee _),
        restCall(Method.GET, "/api/employees", getEmployees _)
      )
      .withTopics(
        topic(EmployeeService.TOPIC_NAME, employeeTopic _)
          .addProperty(
            KafkaProperties.partitionKeyStrategy,
            PartitionKeyStrategy[EmployeeAddedEvent](_.id)
          ))
      .withAutoAcl(true)
  }
}

В конфигурации приложения добавлена ​​одна строка, чтобы настройки cassandra выглядели так:

cassandra-journal.keyspace = ${employees.cassandra.keyspace}
cassandra-snapshot-store.keyspace = ${employees.cassandra.keyspace}
lagom.persistence.read-side.cassandra.keyspace = ${employees.cassandra.keyspace}

Приложение EmployeeApplication выглядит следующим образом:

abstract class EmployeeApplication(context: LagomApplicationContext)
  extends LagomApplication(context)
    with LagomKafkaComponents
    with CassandraPersistenceComponents
    with HikariCPComponents
    with AhcWSComponents {

EmployeeServiceImpl добавил следующий метод:

  override def getEmployees(): ServiceCall[NotUsed, Vector[Employee]] = ServiceCall { _ =>
    employeeRepository.getEmployees()
  }

EmployeeRepository переписываю так:

package com.codingkapoor.employee.persistence.read

import java.time.LocalDate

import akka.Done
import com.codingkapoor.employee.api.Employee
import com.lightbend.lagom.scaladsl.persistence.cassandra.CassandraSession

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

class EmployeeRepository(session: CassandraSession) {

  def createTable: Future[Done] = {
    for {
      r <- session.executeCreateTable("CREATE TABLE IF NOT EXISTS employees(id text, name text, gender text, PRIMARY KEY (id))")
    } yield r
  }

  def getEmployees(): Future[Vector[Employee]] = {
    session.selectAll("SELECT * FROM employees").map(rows =>
      rows.map(r => Employee(
        id = r.getString("id"),
        name = r.getString("name"),
        gender = r.getString("gender"),
        doj = LocalDate.now(),
        pfn = "pfn")).toVector)
  }
}

EventProcessor выглядит так:

package com.codingkapoor.employee.persistence.read

import akka.Done
import com.codingkapoor.employee.persistence.write.{EmployeeAdded, EmployeeEvent}
import com.datastax.driver.core.{BoundStatement, PreparedStatement}
import com.lightbend.lagom.scaladsl.persistence.cassandra.{CassandraReadSide, CassandraSession}
import com.lightbend.lagom.scaladsl.persistence.{AggregateEventTag, EventStreamElement, ReadSideProcessor}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, Promise}

class EmployeeEventProcessor(readSide: CassandraReadSide, employeeRepository: EmployeeRepository, session: CassandraSession)
  extends ReadSideProcessor[EmployeeEvent] {

  override def buildHandler(): ReadSideProcessor.ReadSideHandler[EmployeeEvent] =
    readSide
      .builder[EmployeeEvent]("employeeoffset")
      .setGlobalPrepare(() => employeeRepository.createTable)
      .setPrepare(_ => prepare())
      .setEventHandler[EmployeeAdded](processEmployeeAdded)
      .build()

  private val createPromise = Promise[PreparedStatement]

  private def createFuture: Future[PreparedStatement] = createPromise.future

  override def aggregateTags: Set[AggregateEventTag[EmployeeEvent]] = Set(EmployeeEvent.Tag)


  private def prepare(query: String, promise: Promise[PreparedStatement]): Future[Done] = {
    val f = session.prepare(query)
    promise.completeWith(f)
    f.map(_ => Done)
  }

  def prepare(): Future[Done] = {
    for {
      r <- prepare("INSERT INTO employees (id, name, gender) VALUES (?, ?, ?)", createPromise)
    } yield r
  }

  private def processEmployeeAdded(eventElement: EventStreamElement[EmployeeAdded]): Future[List[BoundStatement]] = {
    createFuture.map { ps =>
      val bindCreate = ps.bind()
      bindCreate.setString("id", eventElement.event.id)
      bindCreate.setString("name", eventElement.event.name)
      bindCreate.setString("gender", eventElement.event.gender)

      List(bindCreate)
    }
  }

}

Я добавил метод getEmployees, чтобы проверить, работает ли сторона чтения. Также, после отправки создания сотрудника нужно подождать 10-20 секунд, прежде чем сотрудник появится в базе, после чего его можно будет получить из readside.

person Vladislav Kievski    schedule 26.08.2019
comment
Ваше решение выглядит хорошо, но я все еще не мог понять проблему с моим. Я хочу попробовать этот проект с readside, реализованным в Jdbc с использованием Slick. Тем не менее, я хотел бы обратить ваше внимание на два важных изменения в отладке этой проблемы. №1. Я попытался сделать этот проект еще более компактным, избавившись от readside, и, похоже, это сработало! Вы можете увидеть изменения в новой ветке с именем noreadside. Что в основном включает в себя изменения в EmployeeApplication.scala, помимо избавления от readside. Это ясно указывает на то, что проблема где-то в классах, которые я расширяю в этом классе. - person iamsmkr; 26.08.2019
comment
Прод.. #2. Я заметил два предупреждающих сообщения в журналах runAll, которые, как я подозреваю, могут привести нас к проблеме, которую мы пытаемся отладить. Я включил их в пост. Посмотри пожалуйста. - person iamsmkr; 26.08.2019

После некоторой борьбы я смог решить проблему. Итак, в основном было две проблемы:

  1. Первая проблема возникла из-за порядка, в котором черты ReadSideJdbcPersistenceComponents и WriteSideCassandraPersistenceComponents расширяются для создания EmployeeApplication. Из-за ошибки в Lagom порядок, в котором вы смешиваете эти две черты, имеет значение, и только если вы смешиваете ReadSideJdbcPersistenceComponents перед WriteSideCassandraPersistenceComponents, вы сможете использовать эту комбинацию.

    См. этот README от лагом-проб.

  2. Кроме того, я неправильно реализовывал полиморфный поток событий, как описано в документации lagom здесь.

Теперь я придумал рабочий проект github, на который вы можете сослаться: lagom-scala-slick.

person iamsmkr    schedule 30.08.2019