Read Existing Avro File and Send to Kafka

Posted 2019-06-14 15:51

Question:

I have an existing Avro file along with its schema. I need to send the contents of the file to Kafka through a producer.

The following is the code I have written.

public class ProducerDataSample {

    public static void main(String[] args) {

        String topic = "my-topic";

        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(AvroSchemaDefinitionLoader.fromFile("encounter.avsc").get());

        File file = new File("/home/hello.avro");
        try {
            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
            DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
            DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
            dataFileWriter.create(schema, outputStream);
            dataFileWriter.appendTo(file);
            dataFileWriter.close();
            System.out.println("Here comes the data: " + outputStream);

            // Start Kafka publishing
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

            KafkaProducer<String, byte[]> messageProducer = new KafkaProducer<String, byte[]>(props);
            ProducerRecord<String, byte[]> producerRecord = null;
            producerRecord = new ProducerRecord<String, byte[]>(topic, "1", outputStream.toByteArray());
            messageProducer.send(producerRecord);
            messageProducer.close();
        } catch (Exception e) {
            System.out.println("Error in sending to kafka");
            e.printStackTrace();
        }
    }
}

As soon as I execute this, I get the error:

Error in sending to kafka
org.apache.avro.AvroRuntimeException: already open
    at org.apache.avro.file.DataFileWriter.assertNotOpen(DataFileWriter.java:85)
    at org.apache.avro.file.DataFileWriter.appendTo(DataFileWriter.java:203)
    at org.apache.avro.file.DataFileWriter.appendTo(DataFileWriter.java:193)
    at ProducerDataSample.main(ProducerDataSample.java:51)

Any help would be appreciated. Thanks.

Answer 1:

You will have to read the data from the Avro file and serialize it to a byte array.

Something like the snippet below:

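        import java.io.ByteArrayOutputStream;
        import java.io.File;
        import org.apache.avro.Schema;
        import org.apache.avro.file.DataFileReader;
        import org.apache.avro.generic.GenericDatumReader;
        import org.apache.avro.generic.GenericDatumWriter;
        import org.apache.avro.generic.GenericRecord;
        import org.apache.avro.io.DatumWriter;
        import org.apache.avro.io.Encoder;
        import org.apache.avro.io.EncoderFactory;

        final Schema schema = new Schema.Parser().parse(new File("sample.avsc"));
        File file = new File("sample.avro");

        // read the Avro file into GenericRecords
        final GenericDatumReader<GenericRecord> genericDatumReader = new GenericDatumReader<>(schema);
        final DataFileReader<GenericRecord> genericRecords = new DataFileReader<>(file, genericDatumReader);

        // serialize all GenericRecords into a single byte array
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);

        Encoder binaryEncoder = EncoderFactory.get().binaryEncoder(out, null);

        while (genericRecords.hasNext()) {
            writer.write(genericRecords.next(), binaryEncoder);
        }
        binaryEncoder.flush();
        out.close();
        genericRecords.close();
        // send out.toByteArray() to Kafka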
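
For the final step, sending the bytes to Kafka, the producer only needs a handful of settings. The following is a minimal sketch, assuming the broker address `localhost:9092` from the question; the class and method names (`ProducerConfigSketch`, `producerProps`) are made up for illustration. The `serializer.class` entry from the question is left out, since it belongs to the older Scala producer API and is not one of the settings `KafkaProducer` uses.

```java
import java.util.Properties;

public class ProducerConfigSketch {

    // Builds the settings for a producer with String keys and byte[] values.
    // The broker address is a parameter; "localhost:9092" is only an assumption.
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting configuration for inspection.
        producerProps("localhost:9092").list(System.out);
    }
}
```

The returned `Properties` object can then be passed to `new KafkaProducer<String, byte[]>(props)`, as in the question's code.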


Answer 2:

I think the other answer should look like this in order to send individual records as separate Kafka events.

Note: It should be possible to get the schema directly from the Avro file rather than have a separate AVSC file. Following code in the Avro project

final Schema schema = new Schema.Parser().parse(new File("sample.avsc"));            
File file = new File("sample.avro");

//read the avro file to GenericRecord
final GenericDatumReader<GenericRecord> genericDatumReader = new GenericDatumReader<>(schema);
final DataFileReader<GenericRecord> genericRecords = new DataFileReader<>(file, genericDatumReader);

DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);

while (genericRecords.hasNext()) {
    //serialize GenericRecords
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Encoder binaryEncoder = EncoderFactory.get().binaryEncoder(out, null);
    writer.write(genericRecords.next(), binaryEncoder);
    binaryEncoder.flush();
    out.close();

    // TODO: send out.toByteArray() to kafka
}

// TODO: kafkaProducer.flush();
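
The two TODOs and the note about the embedded schema could be filled in roughly as follows. This is a sketch under assumptions, not a definitive implementation: `AvroFileToKafka` and `encode` are illustrative names, the file path and topic are placeholders, the broker is assumed to be at `localhost:9092`, and records are sent without a key. An Avro container file stores its writer schema in the header, so a `GenericDatumReader` created without a schema lets `DataFileReader` pick it up, and `getSchema()` returns it.

```java
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AvroFileToKafka {

    // Encodes one record as raw Avro binary (no container-file header).
    static byte[] encode(GenericRecord record, DatumWriter<GenericRecord> writer)
            throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        File file = new File("sample.avro"); // placeholder path
        String topic = "my-topic";           // placeholder topic

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");

        // Both the reader and the producer are closed automatically.
        try (DataFileReader<GenericRecord> reader =
                     new DataFileReader<>(file, new GenericDatumReader<GenericRecord>());
             KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {

            // Writer schema taken from the file header, no .avsc needed.
            Schema schema = reader.getSchema();
            DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);

            // One Kafka event per Avro record.
            for (GenericRecord record : reader) {
                byte[] value = encode(record, writer);
                producer.send(new ProducerRecord<String, byte[]>(topic, value));
            }
            producer.flush();
        }
    }
}
```

Because each message carries only the raw binary encoding, a consumer needs access to the same schema to decode it.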