Как кодировать / декодировать сообщения Kafka с помощью двоичного кодировщика Avro?

Я пытаюсь использовать Avro для чтения / записи сообщений в Kafka. Есть ли у кого-нибудь пример использования двоичного кодировщика Avro для кодирования / декодирования данных, которые будут помещены в очередь сообщений?

Мне нужна часть Avro больше, чем часть Kafka. Или, может быть, мне стоит поискать другое решение? По сути, я пытаюсь найти более эффективное решение для JSON в отношении пространства. Только что упоминался Avro, поскольку он может быть более компактным, чем JSON.


person blockcipher    schedule 28.11.2011    source источник


Ответы (5)


Это простой пример. С несколькими разделами / темами не пробовал.

// Пример кода производителя

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.*;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Properties;


public class ProducerTest {

    void producer(Schema schema) throws IOException {

        Properties props = new Properties();
        props.put("metadata.broker.list", "0:9092");
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, byte[]> producer = new Producer<String, byte[]>(config);
        GenericRecord payload1 = new GenericData.Record(schema);
        //Step2 : Put data in that genericrecord object
        payload1.put("desc", "'testdata'");
        //payload1.put("name", "अasa");
        payload1.put("name", "dbevent1");
        payload1.put("id", 111);
        System.out.println("Original Message : "+ payload1);
        //Step3 : Serialize the object to a bytearray
        DatumWriter<GenericRecord>writer = new SpecificDatumWriter<GenericRecord>(schema);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(payload1, encoder);
        encoder.flush();
        out.close();

        byte[] serializedBytes = out.toByteArray();
        System.out.println("Sending message in bytes : " + serializedBytes);
        //String serializedHex = Hex.encodeHexString(serializedBytes);
        //System.out.println("Serialized Hex String : " + serializedHex);
        KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>("page_views", serializedBytes);
        producer.send(message);
        producer.close();

    }


    public static void main(String[] args) throws IOException, DecoderException {
        ProducerTest test = new ProducerTest();
        Schema schema = new Schema.Parser().parse(new File("src/test_schema.avsc"));
        test.producer(schema);
    }
}

// Пример кода потребителя

Часть 1: Код группы потребителей: поскольку у вас может быть несколько потребителей для нескольких разделов / тем.

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Created by  on 9/1/15.
 */
public class ConsumerGroupExample {
   private final ConsumerConnector consumer;
   private final String topic;
   private ExecutorService executor;

   public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic){
      consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
              createConsumerConfig(a_zookeeper, a_groupId));
      this.topic = a_topic;
   }

   private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId){
       Properties props = new Properties();
       props.put("zookeeper.connect", a_zookeeper);
       props.put("group.id", a_groupId);
       props.put("zookeeper.session.timeout.ms", "400");
       props.put("zookeeper.sync.time.ms", "200");
       props.put("auto.commit.interval.ms", "1000");

       return new ConsumerConfig(props);
   }

    public void shutdown(){
         if (consumer!=null) consumer.shutdown();
        if (executor!=null) executor.shutdown();
        System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
        try{
          if(!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)){

          }
        }catch(InterruptedException e){
            System.out.println("Interrupted");
        }

    }


    public void run(int a_numThreads){
        //Make a map of topic as key and no. of threads for that topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        //Create message streams for each topic
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        //initialize thread pool
        executor = Executors.newFixedThreadPool(a_numThreads);
        //start consuming from thread
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }
    public static void main(String[] args) {
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);

        ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
        example.run(threads);

        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {

        }
        example.shutdown();
    }


}

Часть 2: Индивидуальный потребитель, который фактически потребляет сообщения.

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.codec.binary.Hex;

import java.io.File;
import java.io.IOException;

public class ConsumerTest implements Runnable{

    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run(){
        ConsumerIterator<byte[], byte[]>it = m_stream.iterator();
        while(it.hasNext())
        {
            try {
                //System.out.println("Encoded Message received : " + message_received);
                //byte[] input = Hex.decodeHex(it.next().message().toString().toCharArray());
                //System.out.println("Deserializied Byte array : " + input);
                byte[] received_message = it.next().message();
                System.out.println(received_message);
                Schema schema = null;
                schema = new Schema.Parser().parse(new File("src/test_schema.avsc"));
                DatumReader<GenericRecord> reader = new SpecificDatumReader<GenericRecord>(schema);
                Decoder decoder = DecoderFactory.get().binaryDecoder(received_message, null);
                GenericRecord payload2 = null;
                payload2 = reader.read(null, decoder);
                System.out.println("Message received : " + payload2);
            }catch (Exception e) {
                e.printStackTrace();
                System.out.println(e);
            }
        }

    }


}

Схема тестирования AVRO:

{
    "namespace": "xyz.test",
     "type": "record",
     "name": "payload",
     "fields":[
         {
            "name": "name", "type": "string"
         },
         {
            "name": "id",  "type": ["int", "null"]
         },
         {
            "name": "desc", "type": ["string", "null"]
         }
     ]
}

Обратите внимание на следующие важные моменты:

  1. Вам понадобятся стандартные jar-файлы kafka и avro для запуска этого кода из коробки.

  2. Очень важен props.put ("serializer.class", "kafka.serializer.DefaultEncoder"); Dont use stringEncoder as that wont работает, если вы отправляете байтовый массив как сообщение.

  3. Вы можете преобразовать byte [] в шестнадцатеричную строку и отправить ее, а на потребителе повторно преобразовать шестнадцатеричную строку в byte [], а затем в исходное сообщение.

  4. Запустите zookeeper и брокер, как указано здесь: - http://kafka.apache.org/documentation.html#quickstart и создайте тему под названием "page_views" или как хотите.

  5. Запустите ProducerTest.java, а затем ConsumerGroupExample.java и посмотрите, какие данные avro создаются и используются.

person ramu    schedule 01.09.2015
comment
Спасибо за помощь!! Я пробовал это, но в коде потребителя моя функция it.hasNext () возвращает false, поэтому элемент управления никогда не входит в цикл while. Есть идеи, что я делаю не так? - person Vikas Saxena; 20.07.2016

Я наконец не забыл спросить список рассылки Kafka и получил следующий ответ, который отлично сработал.

Да, вы можете отправлять сообщения в виде байтовых массивов. Если вы посмотрите на конструктор класса Message, вы увидите -

def this (байты: массив [байт])

Теперь, глядя на API send () производителя -

def send (ProducerData: ProducerData [K, V] *)

Вы можете установить V для типа Message, а K - на то, что вы хотите, чтобы ваш ключ был. Если вас не волнует разделение с помощью ключа, тогда также установите для него Тип сообщения.

Спасибо, Неха

person blockcipher    schedule 01.12.2011

Если вы хотите получить массив байтов из сообщения Avro (на часть kafka уже дан ответ), используйте двоичный кодировщик:

    GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema); 
    ByteArrayOutputStream os = new ByteArrayOutputStream(); 
    try {
        Encoder e = EncoderFactory.get().binaryEncoder(os, null); 
        writer.write(record, e); 
        e.flush(); 
        byte[] byteData = os.toByteArray(); 
    } finally {
        os.close(); 
    }
person Will Sargent    schedule 22.07.2014
comment
Можете ли вы отправить этот byteData в KafkaBroker и прочитать его у потребителя консоли? Каким должен быть сериализатор ключей производителя? - person user2441441; 06.06.2016
comment
Как упоминалось в ответе, часть kafka задокументирована в других ответах - stackoverflow.com/a/8348264/5266 и stackoverflow.com/a/32341917/5266 - person Will Sargent; 06.06.2016

Обновленный ответ.

У Kafka есть сериализатор / десериализатор Avro с координатами Maven (в формате SBT):

  "io.confluent" % "kafka-avro-serializer" % "3.0.0"

Вы передаете экземпляр KafkaAvroSerializer в конструктор KafkaProducer.

Затем вы можете создать экземпляры Avro GenericRecord и использовать их в качестве значений внутри экземпляров Kafka ProducerRecord, которые вы можете отправить с помощью KafkaProducer.

На стороне потребителя Kafka вы используете KafkaAvroDeserializer и KafkaConsumer.

person clay    schedule 09.06.2016
comment
Не могли бы вы привести краткий, но полный пример? - person Cedric H.; 13.08.2016
comment
Это работает только с добавленным собственным репозиторием Maven Confluent, поскольку они не публикуют артефакты в maven central: packages.confluent. io / maven - person Robert Metzger; 25.10.2016

Вместо Avro вы также можете просто подумать о сжатии данных; либо с помощью gzip (хорошее сжатие, более высокий процессор), либо LZF или Snappy (намного быстрее, немного медленнее сжатие).

Или, в качестве альтернативы, также существует двоичный файл JSON Smile, поддерживаемый в Java Джексоном (с this extension): это компактный двоичный формат, который намного проще в использовании, чем Avro:

ObjectMapper mapper = new ObjectMapper(new SmileFactory());
byte[] serialized = mapper.writeValueAsBytes(pojo);
// or back
SomeType pojo = mapper.readValue(serialized, SomeType.class);

в основном тот же код, что и с JSON, за исключением передачи фабрики другого формата. С точки зрения размера данных, будет ли Smile или Avro более компактным, зависит от деталей варианта использования; но оба они компактнее, чем JSON.

Преимущество в том, что это работает быстро как с JSON, так и с Smile, с одним и тем же кодом, используя только POJO. По сравнению с Avro, который требует либо генерации кода, либо большого количества ручного кода для упаковки и распаковки GenericRecords.

person StaxMan    schedule 25.04.2012