Служба сотрудников, в которую я добавил один метод getEmployees:
trait EmployeeService extends Service {
def addEmployee(): ServiceCall[Employee, Done]
def getEmployees(): ServiceCall[NotUsed, Vector[Employee]]
def employeeTopic: Topic[EmployeeAddedEvent]
override final def descriptor: Descriptor = {
import Service._
named("employee")
.withCalls(
restCall(Method.POST, "/api/employees", addEmployee _),
restCall(Method.GET, "/api/employees", getEmployees _)
)
.withTopics(
topic(EmployeeService.TOPIC_NAME, employeeTopic _)
.addProperty(
KafkaProperties.partitionKeyStrategy,
PartitionKeyStrategy[EmployeeAddedEvent](_.id)
))
.withAutoAcl(true)
}
}
В конфигурации приложения добавлена одна строка, чтобы настройки cassandra выглядели так:
cassandra-journal.keyspace = ${employees.cassandra.keyspace}
cassandra-snapshot-store.keyspace = ${employees.cassandra.keyspace}
lagom.persistence.read-side.cassandra.keyspace = ${employees.cassandra.keyspace}
Приложение EmployeeApplication выглядит следующим образом:
abstract class EmployeeApplication(context: LagomApplicationContext)
extends LagomApplication(context)
with LagomKafkaComponents
with CassandraPersistenceComponents
with HikariCPComponents
with AhcWSComponents {
EmployeeServiceImpl добавил следующий метод:
override def getEmployees(): ServiceCall[NotUsed, Vector[Employee]] = ServiceCall { _ =>
employeeRepository.getEmployees()
}
EmployeeRepository переписываю так:
package com.codingkapoor.employee.persistence.read
import java.time.LocalDate
import akka.Done
import com.codingkapoor.employee.api.Employee
import com.lightbend.lagom.scaladsl.persistence.cassandra.CassandraSession
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
class EmployeeRepository(session: CassandraSession) {
def createTable: Future[Done] = {
for {
r <- session.executeCreateTable("CREATE TABLE IF NOT EXISTS employees(id text, name text, gender text, PRIMARY KEY (id))")
} yield r
}
def getEmployees(): Future[Vector[Employee]] = {
session.selectAll("SELECT * FROM employees").map(rows =>
rows.map(r => Employee(
id = r.getString("id"),
name = r.getString("name"),
gender = r.getString("gender"),
doj = LocalDate.now(),
pfn = "pfn")).toVector)
}
}
EventProcessor выглядит так:
package com.codingkapoor.employee.persistence.read
import akka.Done
import com.codingkapoor.employee.persistence.write.{EmployeeAdded, EmployeeEvent}
import com.datastax.driver.core.{BoundStatement, PreparedStatement}
import com.lightbend.lagom.scaladsl.persistence.cassandra.{CassandraReadSide, CassandraSession}
import com.lightbend.lagom.scaladsl.persistence.{AggregateEventTag, EventStreamElement, ReadSideProcessor}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, Promise}
class EmployeeEventProcessor(readSide: CassandraReadSide, employeeRepository: EmployeeRepository, session: CassandraSession)
extends ReadSideProcessor[EmployeeEvent] {
override def buildHandler(): ReadSideProcessor.ReadSideHandler[EmployeeEvent] =
readSide
.builder[EmployeeEvent]("employeeoffset")
.setGlobalPrepare(() => employeeRepository.createTable)
.setPrepare(_ => prepare())
.setEventHandler[EmployeeAdded](processEmployeeAdded)
.build()
private val createPromise = Promise[PreparedStatement]
private def createFuture: Future[PreparedStatement] = createPromise.future
override def aggregateTags: Set[AggregateEventTag[EmployeeEvent]] = Set(EmployeeEvent.Tag)
private def prepare(query: String, promise: Promise[PreparedStatement]): Future[Done] = {
val f = session.prepare(query)
promise.completeWith(f)
f.map(_ => Done)
}
def prepare(): Future[Done] = {
for {
r <- prepare("INSERT INTO employees (id, name, gender) VALUES (?, ?, ?)", createPromise)
} yield r
}
private def processEmployeeAdded(eventElement: EventStreamElement[EmployeeAdded]): Future[List[BoundStatement]] = {
createFuture.map { ps =>
val bindCreate = ps.bind()
bindCreate.setString("id", eventElement.event.id)
bindCreate.setString("name", eventElement.event.name)
bindCreate.setString("gender", eventElement.event.gender)
List(bindCreate)
}
}
}
Я добавил метод getEmployees, чтобы проверить, работает ли сторона чтения. Также, после отправки создания сотрудника нужно подождать 10-20 секунд, прежде чем сотрудник появится в базе, после чего его можно будет получить из readside.
person
Vladislav Kievski
schedule
26.08.2019