I am using Kafka Connect to implement a Kafka-to-Elasticsearch connector.
The producer sends a complex JSON document to a Kafka topic, and my connector code uses it to persist data to Elasticsearch. The connector receives the data as a Struct (https://kafka.apache.org/0100/javadoc/org/apache/kafka/connect/data/Struct.html).
I am able to get the values of the top-level fields of the Struct, but I cannot fetch values from the nested JSON objects.
{
"after": {
"test.test.employee.Value": {
"id": 5671111,
"name": {
"string": "abc"
}
}
},
"op": "u",
"ts_ms": {
"long": 1474892835943
}
}
I am able to parse "op", but not "test.test.employee.Value".
Struct afterStruct = struct.getStruct("after"); // returns the expected value
String opValue = struct.getString("op"); // returns the expected value, "u"
Struct valueStruct = afterStruct.getStruct("test.test.employee.Value"); // throws org.apache.kafka.connect.errors.DataException: test.test.employee.Value is not a valid field name
Struct.getStruct does not natively support nesting using dot notation.
It seems your schema might come from Debezium; in that case, Debezium has its own "unwrap" message transformer.
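As a rough illustration of the unwrap transformer, a connector configuration might include lines like the following. This is a sketch under assumptions, not a tested setup: the alias `unwrap` is arbitrary, the Debezium transforms jar has to be on the Connect plugin path, and the class name depends on the Debezium version (recent releases ship `io.debezium.transforms.ExtractNewRecordState`, while older releases named it `io.debezium.transforms.UnwrapFromEnvelope`).

```properties
# Assumed alias "unwrap"; pick the class that matches your Debezium version.
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
```

With the envelope unwrapped, the record value should carry the row state directly, so the connector would no longer need to descend through "after" itself.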
If you are in control of this extractor code, one option is the code I wrote for the Confluent Kafka Connect Storage project, which you might find useful. It takes a Struct or a Map object (see below).
Otherwise, you might want to try adding the KCQL plugin by Landoop to your Connect classpath.
import java.util.Map;

import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;

public static Object getNestedFieldValue(Object structOrMap, String fieldName) {
    // validate(structOrMap, fieldName); // can ignore this
    try {
        Object innermost = structOrMap;
        // Iterate down to the final struct, one dot-separated segment at a time
        for (String name : fieldName.split("\\.")) {
            innermost = getField(innermost, name);
        }
        return innermost;
    } catch (DataException e) {
        throw new DataException(
            String.format("The field '%s' does not exist in %s.", fieldName, structOrMap),
            e
        );
    }
}

public static Object getField(Object structOrMap, String fieldName) {
    // validate(structOrMap, fieldName);
    Object field;
    if (structOrMap instanceof Struct) {
        // Struct.get throws a DataException if the schema has no such field
        field = ((Struct) structOrMap).get(fieldName);
    } else if (structOrMap instanceof Map) {
        field = ((Map<?, ?>) structOrMap).get(fieldName);
        if (field == null) {
            throw new DataException(String.format("Unable to find nested field '%s'", fieldName));
        }
        return field;
    } else {
        throw new DataException(String.format(
            "Argument not a Struct or Map. Cannot get field '%s' from %s.",
            fieldName,
            structOrMap
        ));
    }
    // A Struct field that exists in the schema but holds null is treated as missing
    if (field == null) {
        throw new DataException(
            String.format("The field '%s' does not exist in %s.", fieldName, structOrMap));
    }
    return field;
}
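One caveat: the helper splits the path on ".", so a key that itself contains dots, such as "test.test.employee.Value" in the question's JSON, would be broken into four segments. A minimal sketch of a variant that takes the path segments explicitly is shown here. It assumes the record is available as nested Maps, the names `NestedLookup` and `getByPath` are hypothetical, and `IllegalArgumentException` stands in for `DataException` so the snippet runs without Kafka on the classpath.

```java
import java.util.List;
import java.util.Map;

public class NestedLookup {
    /**
     * Walks nested Maps one key at a time. Each segment is used verbatim,
     * so a key that contains dots is never split.
     */
    public static Object getByPath(Object root, List<String> segments) {
        Object current = root;
        for (String segment : segments) {
            if (!(current instanceof Map)) {
                throw new IllegalArgumentException(
                    String.format("Cannot read '%s': parent is not a Map", segment));
            }
            Map<?, ?> map = (Map<?, ?>) current;
            if (!map.containsKey(segment)) {
                throw new IllegalArgumentException(
                    String.format("Key '%s' not found in %s", segment, map.keySet()));
            }
            current = map.get(segment);
        }
        return current;
    }
}
```

With the question's payload held as nested Maps in a variable `record`, calling `NestedLookup.getByPath(record, List.of("after", "test.test.employee.Value", "name", "string"))` would return "abc". The same idea could be extended to Struct by adding an `instanceof Struct` branch, as in the helper above.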