我已经写了使用弗林克1.7.2一个简单的字数应用与卡夫卡2.2作为消费者和生产者。 我使用恰好一次语义的卡夫卡生产者, KeyedProcessFunction
有状态的处理, MapState
保持我的状态,并RocksDB
增量检查点为我的国家后端。
当我的IntelliJ运行,但是当我把它提交给我的本地弗林克簇我收到应用程序的工作相当精细AsynchronousException
异常,弗林克应用不断重试每隔0-20秒后。 有没有人遇到过这个问题? 我缺少从配置方面讲什么吗?
这里是我的代码:
class KeyedProcFuncWordCount extends KeyedProcessFunction[String, String, (String, Int)]
{
private var state: MapState[String, Int] = _
override def open(parameters: Configuration): Unit =
{
state = getRuntimeContext
.getMapState(new MapStateDescriptor[String, Int]("wordCountState", createTypeInformation[String],
createTypeInformation[Int]))
}
override def processElement(value: String,
ctx: KeyedProcessFunction[String, String, (String, Int)]#Context,
out: Collector[(String, Int)]): Unit =
{
val currentSum =
if (state.contains(value)) state.get(value)
else 0
val newSum = currentSum + 1
state.put(value, newSum)
out.collect((value, newSum))
}
}
object KafkaProcFuncWordCount
{
val bootstrapServers = "localhost:9092"
val inTopic = "test"
val outTopic = "test-out"
def main(args: Array[String]): Unit =
{
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(30000)
env.setStateBackend(new RocksDBStateBackend("file:///tmp/data/db.rdb", true))
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
val consumerProps = new Properties
consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "KafkaProcFuncWordCount")
consumerProps.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
val kafkaConsumer = new FlinkKafkaConsumer011[String](inTopic, new SimpleStringSchema, consumerProps)
val producerProps = new Properties
producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
producerProps.setProperty(ProducerConfig.RETRIES_CONFIG, "2147483647")
producerProps.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
producerProps.setProperty(ProducerConfig.ACKS_CONFIG, "all")
producerProps.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
val kafkaProducer = new FlinkKafkaProducer011[String](
outTopic,
new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema),
producerProps,
Optional.of(new FlinkFixedPartitioner[String]),
FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
5
)
val text = env.addSource(kafkaConsumer)
val runningCounts = text
.keyBy(_.toString)
.process(new KeyedProcFuncWordCount)
.map(_.toString())
runningCounts
.addSink(kafkaProducer)
env.execute("KafkaProcFuncWordCount")
}
}
下面是部分从不断重复的弗林克的TaskExecutor日志:
2019-07-05 14:05:47,548 INFO org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer - Flushing new partitions
2019-07-05 14:05:47,552 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011 - Starting FlinkKafkaProducer (1/1) to produce into default topic test-out
2019-07-05 14:05:47,775 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally KeyedProcess -> Map -> Sink: Unnamed (1/1) (f61d24c993f400394eaa028981a26bfe).
2019-07-05 14:05:47,776 INFO org.apache.flink.runtime.taskmanager.Task - KeyedProcess -> Map -> Sink: Unnamed (1/1) (f61d24c993f400394eaa028981a26bfe) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 6 for operator KeyedProcess -> Map -> Sink: Unnamed (1/1).}
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 6 for operator KeyedProcess -> Map -> Sink: Unnamed (1/1).
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NoSuchMethodError: org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot.<init>(Ljava/util/function/Supplier;)V
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
... 5 more
Caused by: java.lang.NoSuchMethodError: org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot.<init>(Ljava/util/function/Supplier;)V
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011$TransactionStateSerializer$TransactionStateSerializerSnapshot.<init>(FlinkKafkaProducer011.java:1244)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011$TransactionStateSerializer.snapshotConfiguration(FlinkKafkaProducer011.java:1235)
at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction$StateSerializerConfigSnapshot.<init>(TwoPhaseCommitSinkFunction.java:847)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction$StateSerializer.snapshotConfiguration(TwoPhaseCommitSinkFunction.java:792)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction$StateSerializer.snapshotConfiguration(TwoPhaseCommitSinkFunction.java:615)
at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.computeSnapshot(RegisteredOperatorStateBackendMetaInfo.java:170)
at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.snapshot(RegisteredOperatorStateBackendMetaInfo.java:103)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:711)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
... 7 more
非常感谢你提前为您的帮助。