Apache Beam: Error assigning event time using With

Posted 2019-07-24 16:50


I have an unbounded Kafka stream sending data with the following fields:

{"identifier": "xxx", "value": 10.0, "ts":"2019-01-16T10:51:26.326242+0000"}

I read the stream using the Apache Beam SDK for Kafka (KafkaIO):

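import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.io.kafka.KafkaIO;

pipeline.apply(KafkaIO.<Long, String>read()
                    // bootstrap servers, topic and key/value deserializers are not shown here
                    .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
                    .updateConsumerProperties(ImmutableMap.of("group.id", "Consumer1")));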

Since I want to window using event time ("ts" in my example), I parse the incoming string and assign the "ts" field of the incoming data stream as the timestamp.

PCollection<Temperature> tempCollection = p.apply(new SetupKafka())
                    .apply(ParDo.of(new ReadFromTopic()))
                    .apply("ParseTemperature", ParDo.of(new ParseTemperature()));

tempCollection.apply("AssignTimeStamps", WithTimestamps.of(us -> new Instant(us.getTimestamp())));  
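The "ts" values carry microseconds and an offset written without a colon (+0000). The question does not show how `us.getTimestamp()` is converted, so here is a hedged sketch of parsing that exact format explicitly with java.time and handing Beam epoch milliseconds. The class and method names (`TsParser`, `parseTs`, `toEpochMillis`) are hypothetical, and microseconds are truncated to milliseconds.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: converts the "ts" string from the Kafka payload into an Instant.
class TsParser {
    // Matches values such as "2019-01-16T10:51:26.326242+0000":
    // six fractional digits (microseconds) and an offset without a colon.
    private static final DateTimeFormatter TS_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSZ");

    static Instant parseTs(String ts) {
        return OffsetDateTime.parse(ts, TS_FORMAT).toInstant();
    }

    // Epoch milliseconds, suitable for Joda's new Instant(long); microseconds are dropped.
    static long toEpochMillis(String ts) {
        return parseTs(ts).toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("2019-01-16T10:51:26.326242+0000"));
    }
}
```

Inside the pipeline this would become something like `new Instant(TsParser.toEpochMillis(us.getTimestamp()))`, assuming `getTimestamp()` returns the raw "ts" string. Parsing alone does not avoid the skew error below; it only makes the conversion explicit.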

The window function and the computation are applied as below:

PCollection<Output> output = tempCollection
                .apply(Window.<Temperature>into(/* window definition not shown */))
                .apply(new ComputeMax());

I stream data into the input stream with a lag of 5 seconds behind the current UTC time, since in practical scenarios the event timestamp is usually earlier than the processing timestamp.

I get the following error:

Cannot output with timestamp 2019-01-16T11:15:45.560Z. Output timestamps must be no earlier than the timestamp of the current input (2019-01-16T11:16:50.640Z) minus the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
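The message describes a simple comparison. KafkaIO's default timestamp policy stamps each record with processing time, and `WithTimestamps` may only move an element's timestamp earlier by at most the allowed skew, which here is 0 milliseconds. The plain-Java sketch below is a model of that rule, not Beam code: it uses java.time instead of Beam's Joda types, the class name `SkewCheck` is hypothetical, and it plugs in the two timestamps from the error.

```java
import java.time.Duration;
import java.time.Instant;

// Plain-Java model of the rule quoted in the error message:
// an output timestamp may not be earlier than the input timestamp minus the allowed skew.
class SkewCheck {
    static boolean isAllowed(Instant inputTs, Instant outputTs, Duration allowedSkew) {
        return !outputTs.isBefore(inputTs.minus(allowedSkew));
    }

    public static void main(String[] args) {
        Instant input = Instant.parse("2019-01-16T11:16:50.640Z");  // current timestamp of the input element
        Instant output = Instant.parse("2019-01-16T11:15:45.560Z"); // event time taken from "ts"
        System.out.println(isAllowed(input, output, Duration.ZERO));         // false: 65.08 s earlier, zero skew
        System.out.println(isAllowed(input, output, Duration.ofMinutes(2))); // true
    }
}
```

This is why the event time has to be assigned where the record enters the pipeline (at the KafkaIO source) rather than by shifting timestamps backwards in a later step.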

If I comment out the AssignTimeStamps line, there are no errors, but I assume the pipeline then uses processing time.

How do I ensure my computation and windows are based on event time and not on processing time?

Any input on how to handle this scenario would be appreciated.


Have you had a chance to try this with a timestamp policy? Sorry, I have not tried this myself, but I believe that with 2.9.0 you should look at using a TimestampPolicy along with the KafkaIO read.



To use a custom timestamp, you first need to implement your own policy (CustomFieldTimePolicy below) by extending TimestampPolicy<KeyT, ValueT>.

For example:

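import java.util.Optional;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

public class CustomFieldTimePolicy extends TimestampPolicy<String, Foo> {

    protected Instant currentWatermark;

    public CustomFieldTimePolicy(Optional<Instant> previousWatermark) {
        // Resume from the checkpointed watermark, or start at the minimum timestamp.
        currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
    }

    @Override
    public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<String, Foo> record) {
        currentWatermark = new Instant(record.getKV().getValue().getTimestamp());
        return currentWatermark;
    }

    @Override
    public Instant getWatermark(PartitionContext ctx) {
        return currentWatermark;
    }
}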
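The policy above sets the watermark to the timestamp of the most recent record, so a record that arrives out of order moves the watermark backwards. Beam also ships `CustomTimestampPolicyWithLimitedDelay` for this case. The following is a plain-Java model of the idea, not Beam's implementation: it assumes a fixed maximum delay, and the class `LimitedDelayWatermark` is hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical model of a per-partition watermark that tolerates out-of-order
// records: it trails the largest event time seen by maxDelay and never moves backwards.
class LimitedDelayWatermark {
    private final Duration maxDelay;
    private Instant watermark;

    LimitedDelayWatermark(Duration maxDelay, Instant previousWatermark) {
        this.maxDelay = maxDelay;
        this.watermark = previousWatermark;
    }

    // Called once per record with the event time parsed from its payload.
    Instant onRecord(Instant eventTime) {
        Instant candidate = eventTime.minus(maxDelay);
        if (candidate.isAfter(watermark)) {
            watermark = candidate; // only ever advance
        }
        return eventTime; // the record keeps its own event time
    }

    Instant watermark() {
        return watermark;
    }
}
```

With a 5 second delay, records at 10:00:10, 10:00:05 and 10:00:20 leave the watermark at 10:00:05, 10:00:05 and 10:00:15 respectively; the late record is still timestamped correctly but does not pull the watermark back.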


Then pass your custom TimestampPolicy when setting up your KafkaIO source, using the functional interface TimestampPolicyFactory:

KafkaIO.<String, Foo>read().withBootstrapServers("localhost:9092")
                .withTopic("my-topic") // hypothetical topic name
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(Foo.class)) // if you use Avro
                .withTimestampPolicyFactory((tp, previousWatermark) -> new CustomFieldTimePolicy(previousWatermark))

This line creates a new TimestampPolicy for each partition, passing in the partition (tp) and its previously checkpointed watermark (see the TimestampPolicyFactory documentation):

.withTimestampPolicyFactory((tp, previousWatermark) -> new CustomFieldTimePolicy(previousWatermark))
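To make the factory's role concrete, here is a plain-Java model (not KafkaIO code) of how such a factory is used: the reader asks it for one policy per partition and hands each call that partition's checkpointed watermark, if one exists. The names `PolicyFactoryDemo`, `Policy`, `PolicyFactory` and `createPolicies` are hypothetical, and partitions are modelled as plain integers instead of Kafka `TopicPartition` objects.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model: one policy per partition, each seeded with that
// partition's previously checkpointed watermark (empty if there is none).
class PolicyFactoryDemo {
    interface Policy {
        Instant watermark();
    }

    @FunctionalInterface
    interface PolicyFactory {
        Policy create(int partition, Optional<Instant> previousWatermark);
    }

    static Map<Integer, Policy> createPolicies(
            PolicyFactory factory, Map<Integer, Instant> checkpoints, int partitions) {
        Map<Integer, Policy> policies = new HashMap<>();
        for (int p = 0; p < partitions; p++) {
            policies.put(p, factory.create(p, Optional.ofNullable(checkpoints.get(p))));
        }
        return policies;
    }
}
```

A lambda such as `(p, prev) -> () -> prev.orElse(Instant.MIN)` plays the same role as `(tp, previousWatermark) -> new CustomFieldTimePolicy(previousWatermark)`: a partition with a checkpoint resumes from it, and one without starts from the minimum.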