Structured Streaming exception when using append output mode with watermark

Posted 2019-04-11 12:56

Despite the fact that I'm using withWatermark(), I'm getting the following error message when I run my spark job:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;

From what I can see in the programming guide, this exactly matches the intended usage (and the example code). Does anyone know what might be wrong?

Thanks in advance!

Relevant Code (Java 8, Spark 2.2.0):

StructType logSchema = new StructType()
        .add("timestamp", TimestampType)
        .add("key", IntegerType)
        .add("val", IntegerType);

Dataset<Row> kafka = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("subscribe", topics)
        .load();

Dataset<Row> parsed = kafka
        .select(from_json(col("value").cast("string"), logSchema).alias("parsed_value"))
        .select("parsed_value.*");

Dataset<Row> tenSecondCounts = parsed
        .withWatermark("timestamp", "10 minutes")
        .groupBy(
            parsed.col("key"),
            window(parsed.col("timestamp"), "1 day"))
        .count();

StreamingQuery query = tenSecondCounts
        .writeStream()
        .trigger(Trigger.ProcessingTime("10 seconds"))
        .outputMode("append")
        .format("console")
        .option("truncate", false)
        .start();

1 Answer
放荡不羁爱自由
#2 · 2019-04-11 13:22

The problem is in parsed.col. Replacing it with col will fix the issue. I would suggest always using the col function instead of Dataset.col.

Dataset.col returns a resolved column, while col returns an unresolved column.
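
As a small illustration of that difference (a sketch only, reusing the parsed Dataset from the question):

import org.apache.spark.sql.Column;
import static org.apache.spark.sql.functions.col;

// Dataset.col: resolved immediately against parsed's own output attributes,
// so it refers to the column of the original Dataset, not the one produced
// by withWatermark().
Column resolved = parsed.col("timestamp");

// functions.col: only a name; the analyzer binds it to whatever Dataset the
// expression is used on, so it can pick up the watermarked column.
Column unresolved = col("timestamp");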

parsed.withWatermark("timestamp", "10 minutes") creates a new Dataset whose columns have the same names but are new column instances. The watermark is attached to the timestamp column of that new Dataset, not to parsed.col("timestamp"), so the columns used in groupBy do not carry the watermark.

When you use unresolved columns, Spark will figure out the correct columns for you.
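
For reference, a sketch of that fix applied to the aggregation from the question (only the groupBy columns change; col and window are the static imports from org.apache.spark.sql.functions already used above):

Dataset<Row> tenSecondCounts = parsed
        .withWatermark("timestamp", "10 minutes")
        .groupBy(
            col("key"),                        // unresolved, bound to the watermarked Dataset
            window(col("timestamp"), "1 day")) // now sees the watermark on "timestamp"
        .count();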
