Потоковая передача Spark + Accumulo — сериализация BatchWriterImpl

Я ищу коннекторы Spark Streaming + Accumulo и пример полного использования.

В настоящее время я пытаюсь записать результаты потоковой передачи Spark в таблицу Accumulo, но получаю NotSerializableException для BatchWriter. Может ли кто-нибудь указать мне примеры сериализации BatchWriter? Код ниже основан на документации Accumulo.

Текущий код:

val accumuloInstanceName = "accumulo"
val zooKeepers = "localhost:2181"
val instance = new ZooKeeperInstance(accumuloInstanceName, zooKeepers)
val accumuloUser = programOptions.accumuloUser()
val accumuloPassword = programOptions.accumuloPassword()
val passwordToken = new PasswordToken(accumuloPassword)
val connector = instance.getConnector(accumuloUser, passwordToken)

val accumuloBatchWriterConfig = new BatchWriterConfig
val accumuloBatchWriterMaxMemory = 32 * 1024 * 1024
accumuloBatchWriterConfig.setMaxMemory(accumuloBatchWriterMaxMemory)
val accumuloBatchWriter = connector.createBatchWriter("Data", accumuloBatchWriterConfig)
fullMergeResultFlatten.foreachRDD(recordRDD =>
  recordRDD.foreach(record => {
    val mutation = new Mutation(Longs.toByteArray(record.timestamp))
    mutation.put("value", "", new Value(Longs.toByteArray(record.value)))
    mutation.put("length", "", new Value(Longs.toByteArray(record.length)))
    accumuloBatchWriter.addMutation(mutation)
  })
)

Во время выполнения возникают ошибки:

17/05/05 16:55:25 ERROR util.Utils: Exception encountered
java.io.NotSerializableException: org.apache.accumulo.core.client.impl.BatchWriterImpl
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)

Я полагаю, что это очень распространенный случай, но я не смог найти простой пример искрового потока + аккумулирования.


person Marcin    schedule 08.05.2017    source источник


Ответы (2)


Как указал elserj, сериализация объекта подключения обычно не является правильным шаблоном. Шаблон, который я видел, заключается в том, чтобы инициировать соединение с рабочими узлами Spark напрямую с помощью RDD.foreachPartition(). Это хорошо, потому что позволяет создавать соединение для каждого пакета работы (в отличие от создания нового соединения для каждой отдельной записи, что почти никогда не бывает эффективным).

Пример:

fullMergeResultFlatten.foreachRDD(recordRDD => {
  recordRDD.foreachPartition(partitionRecords => {
    // this connection logic is executed in the Spark workers
    val accumuloBatchWriter = connector.createBatchWriter("Data", accumuloBatchWriterConfig)
    partitionRecords.foreach( // save operation )
    accumuloBatchWriter.close()
  })
})
person jeff    schedule 18.06.2017

Вы не можете сериализовать класс BatchWriter. У меня нет предложений, как исправить ваш код, но я могу сказать, что попытка сериализовать этот класс не является правильным путем.

person elserj    schedule 08.05.2017