I have a problem I don't really can figure out. So I have a kafka stream that contains some data like this:
{"adId":"9001", "eventAction":"start", "eventType":"track", "eventValue":"", "timestamp":"1498118549550"}
And I want to replace 'adId' with another value 'bookingId'. This value is located in a csv file, but I can't really figure out how to get it working.
Here is my mapping csv file:
So my output would ideally be something like
{"bookingId":"8", "eventAction":"start", "eventType":"track", "eventValue":"", "timestamp":"1498118549550"}
This file can get refreshed every hour at least once, so it should pick up changes to it.
I currently have this code which doesn't work for me:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30000); // create a checkpoint every 30 seconds
DataStream<String> adToBookingMapping = env.readTextFile(parameters.get("adToBookingMapping"));
DataStream<Tuple2<Integer,Integer>> input = adToBookingMapping.flatMap(new Tokenizer());
//Kafka Consumer
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", parameters.get("bootstrap.servers"));
properties.setProperty("group.id", parameters.get("group.id"));
FlinkKafkaConsumer010<ObjectNode> consumer = new FlinkKafkaConsumer010<>(parameters.get("inbound_topic"), new JSONDeserializationSchema(), properties);
DataStream<ObjectNode> logs = env.addSource(consumer);
DataStream<Tuple4<Integer,String,Integer,Float>> parsed = logs.flatMap(new Parser());
// output -> bookingId, action, impressions, sum
DataStream<Tuple4<Integer, String,Integer,Float>> joined = runWindowJoin(parsed, input, 3);
public static DataStream<Tuple4<Integer, String, Integer, Float>> runWindowJoin(DataStream<Tuple4<Integer, String, Integer, Float>> parsed,
DataStream<Tuple2<Integer, Integer>> input,long windowSize) {
return parsed.join(input)
.where(new ParsedKey())
.equalTo(new InputKey())
.window(TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)))
.apply(new JoinFunction<Tuple4<Integer, String, Integer, Float>, Tuple2<Integer, Integer>, Tuple4<Integer, String, Integer, Float>>() {
private static final long serialVersionUID = 4874139139788915879L;
public Tuple4<Integer, String, Integer, Float> join(
Tuple4<Integer, String, Integer, Float> first,
Tuple2<Integer, Integer> second) {
return new Tuple4<Integer, String, Integer, Float>(second.f1, first.f1, first.f2, first.f3);
The code only runs once and then stops, so it doesn't convert new entries in kafka using the csv file. Any ideas on how I could process the stream from Kafka with the latest values from my csv file?
Kind regards,
Your goal appears to be to join steaming data with a slow-changing catalog (i.e. a side-input). I don't think the
operation is useful here because it doesn't store the catalog entries across windows. Also, the text file is a bounded input whose lines are read once.Consider using
to create a connected stream, and store the catalog data as managed state to perform lookups into. The operator's parallelism would need to be 1.You may find a better solution by researching 'side inputs', looking at the solutions that people use today. See FLIP-17 and Dean Wampler's talk at Flink Forward.