Why are my Kafka messages not consumed on time (in real time) when using Flink?

Posted 2019-08-29 11:52

Question:

When I produce 20 messages, only 13 of them are consumed; the remaining 7 are not consumed in real time. Some time later, when I produce another 20 messages, the 7 messages left over from the previous batch are finally consumed.

Complete code in location: https://github.com/shaozhipeng/flink-quickstart/blob/master/src/main/java/me/icocoro/quickstart/streaming/sql/KafkaStreamToJDBCTable.java

Update: switching to a different AssignerWithPeriodicWatermarks implementation did not help.

// Address of the local Kafka broker the consumer connects to.
private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
// Kafka consumer group id, derived from the job class name.
private static final String GROUP_ID = KafkaStreamToJDBCTable.class.getSimpleName();
// Kafka topic that carries the serialized POJO messages.
private static final String topic = "testPOJO";

/**
 * Periodic watermark assigner for {@code POJO} events, using the event's
 * {@code logTime} as the event-time timestamp.
 *
 * <p>The watermark trails the highest timestamp seen so far by 1 ms. Because
 * a periodic watermark only advances when new elements arrive, the window
 * containing the most recent elements will not fire until later data pushes
 * the watermark past that window's end — this is why the "last" messages of a
 * batch appear to be consumed late.
 */
private static class CustomWatermarkExtractor implements AssignerWithPeriodicWatermarks<POJO> {
    private static final long serialVersionUID = -742759155861320823L;

    // Highest event timestamp observed so far; Long.MIN_VALUE until the first element.
    private long currentTimestamp = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(POJO element, long previousElementTimestamp) {
        // Track the MAXIMUM timestamp rather than the last one: the original
        // assigned element.getLogTime() directly, so an out-of-order (older)
        // event would pull currentTimestamp — and therefore the watermark —
        // backwards, which violates watermark monotonicity.
        this.currentTimestamp = Math.max(this.currentTimestamp, element.getLogTime());
        return element.getLogTime();
    }

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        // Emit max-seen-timestamp - 1; stay at Long.MIN_VALUE before any data
        // so downstream operators know this subtask has produced nothing yet.
        return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
    }
}

/**
 * Job entry point: reads POJO events from Kafka, aggregates them in 10-second
 * event-time tumbling windows via Flink SQL, and appends the results to MySQL.
 *
 * <p>Why results looked "late": an event-time window only fires once the
 * job-wide watermark passes the window end, and that watermark is the MINIMUM
 * of the watermarks of all parallel source subtasks. With parallelism 2 and a
 * topic that has fewer partitions (or with watermarks assigned AFTER the
 * source), an idle subtask keeps its watermark at Long.MIN_VALUE and stalls
 * every window. Assigning watermarks on the Kafka consumer itself (below)
 * makes Flink generate them per partition and merge them correctly inside
 * the source.
 */
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // NOTE(review): keep parallelism <= number of Kafka partitions, otherwise
    // the partition-less subtasks emit no data/watermarks and hold back the
    // job-wide watermark. For a single-partition demo topic, use 1.
    env.setParallelism(2);
    env.enableCheckpointing(5000);
    env.getConfig().disableSysoutLogging();
    env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));

    // Use event time; configure this before building the rest of the pipeline.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
    kafkaProps.setProperty("group.id", GROUP_ID);
    kafkaProps.setProperty("auto.offset.reset", "earliest");

    FlinkKafkaConsumer011<POJO> consumer = new FlinkKafkaConsumer011<>(topic, new ObjectSchema<>(POJO.class), kafkaProps);

    // Assign timestamps/watermarks ON the consumer so watermarks are generated
    // per Kafka partition inside the source (instead of downstream, after the
    // source's round-robin distribution, where an element-starved subtask
    // would pin the global watermark at Long.MIN_VALUE).
    consumer.assignTimestampsAndWatermarks(new CustomWatermarkExtractor());

    DataStream<POJO> pojoDataStream = env.addSource(consumer);

    // Expose the stream as a table; "rowtime.rowtime" declares the event-time attribute.
    tableEnv.registerDataStream("t_pojo", pojoDataStream, "aid, astyle, energy, age, rowtime.rowtime");

    // 10-second tumbling window aggregation per astyle.
    String query =
            "SELECT astyle, TUMBLE_START(rowtime, INTERVAL '10' SECOND) time_start, TUMBLE_END(rowtime, INTERVAL '10' SECOND) time_end, SUM(energy) AS sum_energy, CAST(COUNT(aid) AS INT) AS cnt, CAST(AVG(age) AS INT) AS avg_age FROM t_pojo GROUP BY TUMBLE(rowtime, INTERVAL '10' SECOND), astyle";

    Table table = tableEnv.sqlQuery(query);

    // JDBC parameter types, positionally matching the ?-placeholders below.
    TypeInformation[] FIELD_TYPES = new TypeInformation[]{
            Types.STRING,
            Types.SQL_TIMESTAMP,
            Types.SQL_TIMESTAMP,
            Types.BIG_DEC,
            Types.INT,
            Types.INT
    };

    JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
            .setDrivername("com.mysql.jdbc.Driver")
            .setDBUrl("jdbc:mysql://127.0.0.1:3306/flink_demo?characterEncoding=utf8&useSSL=false")
            .setUsername("root")
            .setPassword("123456")
            .setQuery("INSERT INTO t_pojo (astyle,time_start,time_end,sum_energy,cnt,avg_age,day_date,topic,group_id) VALUES (?,?,?,?,?,?,CURRENT_DATE(),'" + topic + "','" + GROUP_ID + "')")
            .setParameterTypes(FIELD_TYPES)
            .build();

    // The windowed aggregation is append-only, so an append stream/sink suffices.
    DataStream<Row> dataStream = tableEnv.toAppendStream(table, Row.class, tableEnv.queryConfig());
    sink.emitDataStream(dataStream);

    env.execute();
}