在 Flink 1.7.2 中使用 KeyedProcessFunction 和 RocksDB 状态后端进行有状态处理时收到 AsynchronousException 异常

2019-10-29 12:47发布

我用 Flink 1.7.2 写了一个简单的单词计数(word count)应用,并使用 Kafka 2.2 作为消费者和生产者。我为 Kafka 生产者启用了恰好一次(exactly-once)语义,用 KeyedProcessFunction 进行有状态处理,用 MapState 保存状态,并使用开启增量检查点的 RocksDB 作为状态后端。

在 IntelliJ 中运行时,应用程序工作得很好;但是当我把它提交到本地 Flink 集群时,就会收到 AsynchronousException 异常,并且 Flink 应用每隔 0-20 秒就不断重试。有没有人遇到过这个问题?我是不是在配置方面遗漏了什么?

这里是我的代码:

/**
 * Keyed process function that maintains a running count per input string.
 *
 * Every incoming element is looked up in a `MapState` (named `wordCountState`);
 * its count is incremented by one, written back, and the pair
 * `(element, newCount)` is emitted downstream.
 */
class KeyedProcFuncWordCount extends KeyedProcessFunction[String, String, (String, Int)]
{
  // Assigned in open(); holds the count seen so far for each string.
  private var state: MapState[String, Int] = _

  /** Obtains the `wordCountState` map state from the runtime context. */
  override def open(parameters: Configuration): Unit =
  {
    val descriptor = new MapStateDescriptor[String, Int](
      "wordCountState",
      createTypeInformation[String],
      createTypeInformation[Int])

    state = getRuntimeContext.getMapState(descriptor)
  }

  /**
   * Increments the stored count for `value` and emits the updated pair.
   *
   * @param value the incoming element, also used as the map-state key
   * @param ctx   the process-function context (unused here)
   * @param out   collector receiving `(value, updatedCount)`
   */
  override def processElement(value: String,
                              ctx: KeyedProcessFunction[String, String, (String, Int)]#Context,
                              out: Collector[(String, Int)]): Unit =
  {
    // A string that has never been seen starts from zero.
    val previous = if (state.contains(value)) state.get(value) else 0
    val updated = previous + 1

    state.put(value, updated)
    out.collect((value, updated))
  }
}

/**
 * Job entry point: reads strings from the Kafka topic `inTopic`, counts them
 * with [[KeyedProcFuncWordCount]], and writes the stringified
 * `(value, count)` pairs to the Kafka topic `outTopic`.
 */
object KafkaProcFuncWordCount
{
  val bootstrapServers = "localhost:9092"
  val inTopic = "test"
  val outTopic = "test-out"

  /** Builds the streaming pipeline and runs it under the job name `KafkaProcFuncWordCount`. */
  def main(args: Array[String]): Unit =
  {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Checkpointing: interval argument 30000, RocksDB backend rooted at
    // /tmp/data/db.rdb (second constructor argument `true` — presumably the
    // incremental-checkpoint flag; verify against the Flink version in use),
    // exactly-once checkpointing mode.
    env.enableCheckpointing(30000)
    env.setStateBackend(new RocksDBStateBackend("file:///tmp/data/db.rdb", true))
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    val source =
      new FlinkKafkaConsumer011[String](inTopic, new SimpleStringSchema, consumerProperties)

    val sink = new FlinkKafkaProducer011[String](
      outTopic,
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema),
      producerProperties,
      Optional.of(new FlinkFixedPartitioner[String]),
      FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
      5 // NOTE(review): presumably the Kafka producer pool size — confirm against the connector API
    )

    // Each record is keyed by its own content, counted, rendered as text,
    // and handed to the Kafka sink.
    env
      .addSource(source)
      .keyBy(word => word.toString)
      .process(new KeyedProcFuncWordCount)
      .map(pair => pair.toString())
      .addSink(sink)

    env.execute("KafkaProcFuncWordCount")
  }

  /** Kafka consumer settings: broker address, consumer group, and `read_committed` isolation. */
  private def consumerProperties: Properties =
  {
    val props = new Properties
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "KafkaProcFuncWordCount")
    props.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
    props
  }

  /** Kafka producer settings: broker address, retries, in-flight limit, acks, and idempotence. */
  private def producerProperties: Properties =
  {
    val props = new Properties
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    props.setProperty(ProducerConfig.RETRIES_CONFIG, "2147483647")
    props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
    props.setProperty(ProducerConfig.ACKS_CONFIG, "all")
    props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
    props
  }
}

下面是 Flink TaskExecutor 日志中不断重复出现的部分内容:

2019-07-05 14:05:47,548 INFO  org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer  - Flushing new partitions
2019-07-05 14:05:47,552 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011  - Starting FlinkKafkaProducer (1/1) to produce into default topic test-out
2019-07-05 14:05:47,775 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally KeyedProcess -> Map -> Sink: Unnamed (1/1) (f61d24c993f400394eaa028981a26bfe).
2019-07-05 14:05:47,776 INFO  org.apache.flink.runtime.taskmanager.Task                     - KeyedProcess -> Map -> Sink: Unnamed (1/1) (f61d24c993f400394eaa028981a26bfe) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 6 for operator KeyedProcess -> Map -> Sink: Unnamed (1/1).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 6 for operator KeyedProcess -> Map -> Sink: Unnamed (1/1).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NoSuchMethodError: org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot.<init>(Ljava/util/function/Supplier;)V
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
    ... 5 more
Caused by: java.lang.NoSuchMethodError: org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot.<init>(Ljava/util/function/Supplier;)V
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011$TransactionStateSerializer$TransactionStateSerializerSnapshot.<init>(FlinkKafkaProducer011.java:1244)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011$TransactionStateSerializer.snapshotConfiguration(FlinkKafkaProducer011.java:1235)
    at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction$StateSerializerConfigSnapshot.<init>(TwoPhaseCommitSinkFunction.java:847)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction$StateSerializer.snapshotConfiguration(TwoPhaseCommitSinkFunction.java:792)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction$StateSerializer.snapshotConfiguration(TwoPhaseCommitSinkFunction.java:615)
    at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.computeSnapshot(RegisteredOperatorStateBackendMetaInfo.java:170)
    at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.snapshot(RegisteredOperatorStateBackendMetaInfo.java:103)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:711)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
    ... 7 more

提前感谢您的帮助。

Answer 1:

请仔细检查你的 jar 包中是否打包了 Flink 核心依赖(flink-java、flink-streaming-java、flink-runtime 等)。同时请仔细检查集群上运行的 Flink 版本是否与 Kafka 连接器(flink-kafka-connector)依赖的版本相同。flink-kafka 连接器(和所有连接器一样)必须包含在你的 fat jar 中。

希望这对你有所帮助。

干杯,

康斯坦丁



文章来源: Receiving Asynchronous Exception in Flink 1.7.2 Stateful-processing with KeyedProcessFunction and RocksDB state-backend