I have a Kafka connector with the following code for the `poll()` method in my `SourceTask` implementation:
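```java
// Imports used by poll(); the rest of the SourceTask class is omitted.
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;

@Override
public List<SourceRecord> poll() throws InterruptedException {
    // Block until something arrives on the queue; the item itself is not used
    // in this simplified example.
    SomeType item = mQueue.take();

    // Emit one record to topic "data" with key "foo" and value "bar", both declared
    // as required strings. Source partition/offset and topic partition are left null.
    List<SourceRecord> records = new ArrayList<>();
    records.add(new SourceRecord(null, null, "data", null,
            Schema.STRING_SCHEMA, "foo",
            Schema.STRING_SCHEMA, "bar"));
    return records;
}
```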
If I attach a consumer to the `data` topic, I get the following message from the connector:

```
{"schema":{"type":"string","optional":false},"payload":"foo"} {"schema":{"type":"string","optional":false},"payload":"bar"}
```
If I publish a message straight to the topic using the following commands:

```shell
echo -e 'foo,bar' > /tmp/test_kafka.txt
cat /tmp/test_kafka.txt | kafka-console-producer.sh --broker-list kafka-host:9092 --topic data --property parse.key=true --property key.separator=,
```
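With `parse.key=true`, the console producer splits each input line into a key and a value at the `key.separator` (here `,`), so `foo,bar` is sent with key `foo` and value `bar` as plain bytes, with no schema attached. The stdlib-only sketch below models that split; `KeySplitSketch.splitKeyValue` is a hypothetical helper, not a Kafka API, and it assumes the line is split at the first occurrence of the separator.

```java
final class KeySplitSketch {
    // Hypothetical helper: splits one input line into {key, value} at the first
    // occurrence of the separator, roughly what parse.key=true + key.separator do.
    static String[] splitKeyValue(String line, String separator) {
        int idx = line.indexOf(separator);
        if (idx < 0) {
            throw new IllegalArgumentException("No key separator found in: " + line);
        }
        String key = line.substring(0, idx);
        String value = line.substring(idx + separator.length());
        return new String[] {key, value};
    }

    public static void main(String[] args) {
        String[] kv = splitKeyValue("foo,bar", ",");
        System.out.println("key=" + kv[0] + " value=" + kv[1]); // key=foo value=bar
    }
}
```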
Then, when I attach the same consumer, I get this message:

```
foo bar
```
This is what I expected the connector to produce, rather than the `{"schema":...` message I received.
How do I change my implementation of `poll()` so that the message is sent without the schema metadata appearing in the actual key and value?
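OK, it turns out this was caused by the converter settings in `connect-standalone.properties`. I had the key and value converters set to `org.apache.kafka.connect.json.JsonConverter`, when I should have had `org.apache.kafka.connect.storage.StringConverter`.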
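The two worker settings below are an assumed version of this fix: both converters are switched to `StringConverter`, which writes each record's key and value as plain UTF-8 strings with no schema envelope. The class `StringConverterConfigSketch` is hypothetical; it only parses those lines with `java.util.Properties` and models the converter's output in a simplified, stdlib-only way, without calling Kafka.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

final class StringConverterConfigSketch {
    // Assumed replacement lines for connect-standalone.properties.
    static final String WORKER_CONFIG =
            "key.converter=org.apache.kafka.connect.storage.StringConverter\n"
            + "value.converter=org.apache.kafka.connect.storage.StringConverter\n";

    // Parses the lines the same way any .properties file is read.
    static Properties load() {
        Properties props = new Properties();
        try {
            props.load(new StringReader(WORKER_CONFIG));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Simplified model of StringConverter: the value's toString() is written
    // as UTF-8 bytes, with no schema envelope and no JSON quoting.
    static byte[] toBytes(Object value) {
        return value == null ? null : value.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Properties props = load();
        System.out.println(props.getProperty("value.converter"));
        System.out.println(new String(toBytes("bar"), StandardCharsets.UTF_8)); // bar
    }
}
```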
As an alternative solution, I was also able to keep `JsonConverter` and change its `schemas.enable` setting from `true` to `false`.
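Then, in my processor class, I changed the value schema passed to the `SourceRecord` constructor from `Schema.STRING_SCHEMA` to `null`: `new SourceRecord(null, null, "data", null, Schema.STRING_SCHEMA, "foo", null, "bar")`. The difference is that I'm no longer specifying a schema for the value.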
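To show why the `schemas.enable` switch matters, here is a stdlib-only Java model of how a JSON converter writes a value declared with a required string schema: with schemas enabled, the value is wrapped in the same `{"schema":...,"payload":...}` envelope seen in the question; with them disabled, only the JSON value is written. `JsonEnvelopeSketch` and its methods are hypothetical, cover only the string case, and are not Kafka's `JsonConverter`.

```java
final class JsonEnvelopeSketch {
    // Minimal JSON string literal: escapes only quotes and backslashes,
    // which is enough for the plain-text values used here.
    static String jsonString(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\');
            }
            sb.append(c);
        }
        return sb.append('"').toString();
    }

    // Models a value declared with a required string schema
    // (Schema.STRING_SCHEMA in the question's code).
    static String encode(String value, boolean schemasEnable) {
        String payload = jsonString(value);
        if (!schemasEnable) {
            return payload; // only the JSON value, no envelope
        }
        return "{\"schema\":{\"type\":\"string\",\"optional\":false},\"payload\":" + payload + "}";
    }

    public static void main(String[] args) {
        System.out.println(encode("bar", true));  // envelope, as in the question
        System.out.println(encode("bar", false)); // "bar" (still JSON-quoted)
    }
}
```

Note that even without the envelope, JSON output keeps the quotes around a string (`"bar"`), whereas `StringConverter` writes plain `bar`.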