Why is metadata added to the output of this Kafka

Posted 2019-09-14 01:17

I have a Kafka connector with the following code for the poll() method in the SourceTask implementation.

@Override
public List<SourceRecord> poll() throws InterruptedException
{
    // Block until an item is queued; the item itself is not used in this example.
    SomeType item = mQueue.take();
    List<SourceRecord> records = new ArrayList<>();
    // Arguments: sourcePartition, sourceOffset, topic, partition,
    //            keySchema, key, valueSchema, value
    SourceRecord[] sourceRecords = new SourceRecord[]{
        new SourceRecord(null, null, "data", null,
                         Schema.STRING_SCHEMA, "foo",
                         Schema.STRING_SCHEMA, "bar")
    };
    Collections.addAll(records, sourceRecords);

    return records;
}

If I attach a consumer to the data topic, I get the following message sent through from the connector:

{"schema":{"type":"string","optional":false},"payload":"foo"}   {"schema":{"type":"string","optional":false},"payload":"bar"}

If I publish a message straight to the topic using the following commands:

echo -e 'foo,bar' > /tmp/test_kafka.txt
cat /tmp/test_kafka.txt | kafka-console-producer.sh --broker-list kafka-host:9092 --topic data --property parse.key=true --property key.separator=,

and then attach the same consumer, I get this message:

foo bar

This is what I expected to see as the output from the connector implementation, rather than the {"schema":... message I received.

How do I change the implementation of poll() so that the message is sent without the schema metadata appearing in the actual key and value of the message?

1 answer
forever°为你锁心
#2 · 2019-09-14 01:44

OK, it turns out this was just because I had the following lines in connect-standalone.properties:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

I should have had these instead:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
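
To illustrate why the converter setting, rather than poll(), decides what ends up on the topic, here is a minimal plain-Java sketch. It deliberately does not use the Kafka Connect classes: ConverterEnvelopeSketch and its methods jsonWithSchema, jsonWithoutSchema and plainString are hypothetical stand-ins that only mimic, for string data, the output shapes of JsonConverter with schemas enabled, JsonConverter with schemas disabled, and StringConverter. Treat it as a simplified model under those assumptions, not as the library's implementation.

```java
// Simplified model only: these methods imitate the bytes that the converters
// write for a string key or value. They are NOT the Kafka Connect classes.
final class ConverterEnvelopeSketch {

    // Mimics JsonConverter with schemas.enable=true for Schema.STRING_SCHEMA:
    // the data is wrapped in an envelope that carries the schema and the payload.
    static String jsonWithSchema(String value) {
        return "{\"schema\":{\"type\":\"string\",\"optional\":false},\"payload\":"
                + quote(value) + "}";
    }

    // Mimics JsonConverter with schemas.enable=false: no envelope,
    // but the string is still written as a JSON string, so it keeps its quotes.
    static String jsonWithoutSchema(String value) {
        return quote(value);
    }

    // Mimics StringConverter: the raw text is written as-is.
    static String plainString(String value) {
        return value;
    }

    // Minimal JSON string quoting (handles backslash and double quote only).
    private static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        // Prints {"schema":{"type":"string","optional":false},"payload":"foo"}
        System.out.println(jsonWithSchema("foo"));
        // Prints "bar" (including the double quotes)
        System.out.println(jsonWithoutSchema("bar"));
        // Prints bar
        System.out.println(plainString("bar"));
    }
}
```

The first line printed matches the key seen by the consumer in the question, and the last matches the plain text produced by kafka-console-producer.sh. The same SourceRecord yields either form depending only on which converter the worker is configured with.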

As an alternative solution, I was also able to keep the JsonConverter and change the following setting from true to false:

value.converter.schemas.enable=false

Then in my processor class I changed the code to:

SourceRecord[] sourceRecords = new SourceRecord[]{
    new SourceRecord(null, null, "data", null,
                     Schema.STRING_SCHEMA, "foo",
                     null, "bar")
};

The difference is that the value schema argument is now null instead of Schema.STRING_SCHEMA, so I'm no longer specifying a schema for the value.
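
For reference, a sketch of how this alternative might look in connect-standalone.properties if the JsonConverter is kept for both key and value. The answer above only lists the value setting; the key.converter.schemas.enable line is an assumption added here, on the basis that the key converter is configured separately under its own prefix.

```properties
# Keep the JSON converter but turn off the schema envelope.
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Assumption: also disabled for the key, which the answer does not mention.
key.converter.schemas.enable=false
value.converter.schemas.enable=false
```

Note that with this setup the strings are still serialized as JSON, so a consumer would likely see them in double quotes ("foo" and "bar") rather than as the bare text that StringConverter produces.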
