I have an unbounded Kafka stream sending data with the following fields
{"identifier": "xxx", "value": 10.0, "ts":"2019-01-16T10:51:26.326242+0000"}
I read the stream using the apache beam sdk for kafka
import org.apache.beam.sdk.io.kafka.KafkaIO;
pipeline.apply(KafkaIO.<Long, String>read()
.withBootstrapServers("kafka:9092")
.withTopic("test")
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
.updateConsumerProperties(ImmutableMap.of("group.id", "Consumer1"))
.commitOffsetsInFinalize()
.withoutMetadata()))
Since I want to window using event time ("ts" in my example), i parse the incoming string and assign "ts" field of the incoming datastream as the timestamp.
PCollection<Temperature> tempCollection = p.apply(new SetupKafka())
.apply(ParDo.of(new ReadFromTopic()))
.apply("ParseTemperature", ParDo.of(new ParseTemperature()));
tempCollection.apply("AssignTimeStamps", WithTimestamps.of(us -> new Instant(us.getTimestamp())));
The window function and the computation is applied as below:
PCollection<Output> output = tempCollection.apply(Window
.<Temperature>into(FixedWindows.of(Duration.standardSeconds(30)))
.triggering(AfterWatermark.pastEndOfWindow()
.withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10))))
.withAllowedLateness(Duration.standardDays(1))
.accumulatingFiredPanes())
.apply(new ComputeMax());
I stream data into the input stream with a lag of 5 seconds from current utc time since in practical scenrios event timestamp is usually earlier than the processing timestamp.
I get the following error:
Cannot output with timestamp 2019-01-16T11:15:45.560Z. Output timestamps must be no earlier than the timestamp of the current input (2019-01-16T11:16:50.640Z) minus the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
If I comment out the line for AssignTimeStamps, there are no errors but I guess, then it is considering the processing time.
How do I ensure my computation and windows are based on event time and not for processing time?
Please provide some inputs on how to handle this scenario.