When I produce 20 messages, only 13 are consumed; the remaining 7 are not consumed in real time. Some time later, when I produce another 20 messages, the remaining 7 messages from the previous batch are finally consumed.
Complete code in location: https://github.com/shaozhipeng/flink-quickstart/blob/master/src/main/java/me/icocoro/quickstart/streaming/sql/KafkaStreamToJDBCTable.java
Updating to a different AssignerWithPeriodicWatermarks implementation was not effective.
// Kafka broker address for the local single-node setup.
private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
// Consumer group id, derived from the class name so each job class gets its own group.
private static final String GROUP_ID = KafkaStreamToJDBCTable.class.getSimpleName();
// Source topic name. NOTE(review): constant naming convention would be UPPER_SNAKE_CASE (TOPIC).
private static final String topic = "testPOJO";
/**
 * Periodic watermark assigner that tracks the highest event timestamp seen so far
 * and emits a watermark of (max timestamp - 1).
 *
 * Fix: the original overwrote {@code currentTimestamp} with every element's
 * logTime, so an out-of-order (older) element could move the watermark
 * backwards. Track the maximum instead so the watermark is monotonically
 * non-decreasing, as the watermark contract requires.
 *
 * NOTE(review): each parallel source instance keeps its own max; a subtask
 * that receives no data stays at Long.MIN_VALUE and holds back the global
 * (minimum) watermark, which delays window firing — verify that the source
 * parallelism does not exceed the number of Kafka partitions of the topic.
 */
private static class CustomWatermarkExtractor implements AssignerWithPeriodicWatermarks<POJO> {

    private static final long serialVersionUID = -742759155861320823L;

    // Highest event-time timestamp observed so far by this subtask.
    private long currentTimestamp = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(POJO element, long previousElementTimestamp) {
        // Keep the max so a late (out-of-order) element cannot regress the watermark.
        this.currentTimestamp = Math.max(this.currentTimestamp, element.getLogTime());
        return element.getLogTime();
    }

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        // Subtract 1 so an event exactly at the max timestamp is still on time.
        return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
    }
}
/**
 * Reads POJO events from Kafka, assigns event-time timestamps and watermarks,
 * aggregates them in 10-second tumbling windows via Flink SQL, and appends the
 * windowed results to a MySQL table through a JDBC append sink.
 *
 * NOTE(review): parallelism is set to 2, but if the topic has fewer than 2
 * partitions, one Kafka source subtask owns no partition and emits no
 * watermarks; the downstream minimum watermark then cannot advance and windows
 * fire only when later data arrives — a likely cause of the "remaining 7
 * messages consumed only after the next batch" symptom. Confirm the partition
 * count of 'testPOJO' and keep the source parallelism at or below it. Also
 * note that with event time, the last open window always fires only once the
 * watermark passes its end, i.e. after newer events arrive.
 */
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(2);
    env.enableCheckpointing(5000);
    env.getConfig().disableSysoutLogging();
    // Retry the job up to 4 times, waiting 10s between attempts.
    env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
    final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    // Event time is required for the rowtime attribute and the TUMBLE windows below.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
    kafkaProps.setProperty("group.id", GROUP_ID);
    kafkaProps.setProperty("auto.offset.reset", "earliest");

    FlinkKafkaConsumer011<POJO> consumer = new FlinkKafkaConsumer011<>(topic, new ObjectSchema<>(POJO.class), kafkaProps);
    DataStream<POJO> pojoDataStream = env
            .addSource(consumer)
            .assignTimestampsAndWatermarks(new CustomWatermarkExtractor());

    // 'rowtime.rowtime' exposes the element timestamp as an event-time attribute.
    tableEnv.registerDataStream("t_pojo", pojoDataStream, "aid, astyle, energy, age, rowtime.rowtime");

    String query =
            "SELECT astyle, TUMBLE_START(rowtime, INTERVAL '10' SECOND) time_start, TUMBLE_END(rowtime, INTERVAL '10' SECOND) time_end, SUM(energy) AS sum_energy, CAST(COUNT(aid) AS INT) AS cnt, CAST(AVG(age) AS INT) AS avg_age FROM t_pojo GROUP BY TUMBLE(rowtime, INTERVAL '10' SECOND), astyle";
    Table table = tableEnv.sqlQuery(query);

    // Parameter types for the six '?' placeholders in the INSERT below.
    // Use a wildcard-typed array (not raw TypeInformation[]) and a
    // lowerCamelCase local name per Java convention.
    TypeInformation<?>[] fieldTypes = new TypeInformation<?>[]{
            Types.STRING,
            Types.SQL_TIMESTAMP,
            Types.SQL_TIMESTAMP,
            Types.BIG_DEC,
            Types.INT,
            Types.INT
    };
    JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
            .setDrivername("com.mysql.jdbc.Driver")
            .setDBUrl("jdbc:mysql://127.0.0.1:3306/flink_demo?characterEncoding=utf8&useSSL=false")
            .setUsername("root")
            .setPassword("123456")
            .setQuery("INSERT INTO t_pojo (astyle,time_start,time_end,sum_energy,cnt,avg_age,day_date,topic,group_id) VALUES (?,?,?,?,?,?,CURRENT_DATE(),'" + topic + "','" + GROUP_ID + "')")
            .setParameterTypes(fieldTypes)
            .build();

    DataStream<Row> dataStream = tableEnv.toAppendStream(table, Row.class, tableEnv.queryConfig());
    sink.emitDataStream(dataStream);
    env.execute();
}