I have a Kafka connector with the following code for the poll()
method in the SourceTask implementation.
@Override
public List<SourceRecord> poll() throws InterruptedException
{
SomeType item = mQueue.take();
List<SourceRecord> records = new ArrayList<>();
SourceRecord[] sourceRecords = new SourceRecord[]{
new SourceRecord(null, null, "data", null,
Schema.STRING_SCHEMA, "foo",
Schema.STRING_SCHEMA, "bar")
};
Collections.addAll(records, sourceRecords);
return records;
}
If I attach a consumer to the data topic, I get the following message sent through from the connector:
{"schema":{"type":"string","optional":false},"payload":"foo"} {"schema":{"type":"string","optional":false},"payload":"bar"}
If I publish a message straight to the topic using the following commands:
echo -e 'foo,bar' > /tmp/test_kafka.txt
cat /tmp/test_kafka.txt | kafka-console-producer.sh --broker-list kafka-host:9092 --topic data --property parse.key=true --property key.separator=,
Then attach the same consumer, I get this message:
foo bar
This is what I expected to see as the output from the connector implementation, rather than the {"schema":...
message I received.
How do I change the implementation of poll()
so that the message is sent without the schema meta data appearing in the actual key and value of the message?