How to pivot a streaming dataset?

Posted 2020-03-03 04:29

Question:

I am trying to pivot a Spark streaming dataset (structured streaming) but I get an AnalysisException (excerpt below).

Could someone confirm that pivoting is indeed not supported in Structured Streaming (Spark 2.0), and perhaps suggest alternative approaches?

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)

Answer 1:

tl;dr pivot aggregation is not directly supported by Spark Structured Streaming up to and including 2.4.4.

As a workaround, use DataStreamWriter.foreachBatch or the more generic DataStreamWriter.foreach.
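
A minimal sketch of the foreachBatch route, assuming Spark 2.4 or later and the built-in rate source. The object name and the bucket and parity columns are made up for illustration and are not part of the original question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object PivotPerBatch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("PivotPerBatch") // hypothetical application name
      .master("local[*]")       // assumption: local run for experimenting
      .getOrCreate()

    // The rate source emits (timestamp, value) rows; derive two small
    // hypothetical columns so there is something to group and pivot on.
    val events = spark.readStream
      .format("rate")
      .load()
      .selectExpr("value % 3 AS bucket", "value % 2 AS parity")

    // Each micro-batch arrives as a regular (non-streaming) DataFrame,
    // so pivot is allowed inside this function.
    // Declaring the function type explicitly avoids the overload ambiguity
    // between the Scala and Java variants of foreachBatch on Scala 2.12.
    val pivotBatch: (DataFrame, Long) => Unit = (batch, batchId) => {
      println(s"Batch $batchId")
      batch.groupBy("bucket").pivot("parity").count().show()
    }

    val query = events.writeStream
      .foreachBatch(pivotBatch)
      .start()

    query.awaitTermination()
  }
}
```

Keep in mind that the pivot here only sees the rows of a single micro-batch, so the counts are per batch rather than running totals across the stream.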


I am using Spark 2.4.4, the latest version at the time of writing.

scala> spark.version
res0: String = 2.4.4

UnsupportedOperationChecker (which you can find in the stack trace) checks whether the logical plan of a streaming query uses supported operations only.

To use pivot you have to call groupBy first, since pivot is only available on the RelationalGroupedDataset that groupBy returns.

There are two issues with pivot:

  1. Without explicit values, pivot has to look up the distinct values of the pivot column to know which columns to generate, so it runs collect, which is not possible on a streaming Dataset.

  2. pivot is itself another aggregation (on top of groupBy), and Spark Structured Streaming does not support multiple aggregations in one streaming query.

Let's look at issue 1 first, with no values to pivot on specified.

val sq = spark
  .readStream
  .format("rate")
  .load
  .groupBy("value")
  .pivot("timestamp") // <-- pivot with no values
  .count
  .writeStream
  .format("console")
scala> sq.start
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
rate
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:389)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1(UnsupportedOperationChecker.scala:38)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1$adapted(UnsupportedOperationChecker.scala:36)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:36)
  at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:51)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:62)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3365)
  at org.apache.spark.sql.Dataset.collect(Dataset.scala:2788)
  at org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:384)
  ... 49 elided

The last two frames show the cause: pivot calls collect under the covers, and collect is a batch action that cannot run on a streaming source.

The second issue shows up even when you do specify the values to pivot on: the query then fails because of multiple aggregations. Note that this time the error comes from the streaming check (checkForStreaming), not from the batch check (checkForBatch) that failed in the first case.

val sq = spark
  .readStream
  .format("rate")
  .load
  .groupBy("value")
  .pivot("timestamp", Seq(1)) // <-- pivot with explicit values
  .count
  .writeStream
  .format("console")
scala> sq.start
org.apache.spark.sql.AnalysisException: Multiple streaming aggregations are not supported with streaming DataFrames/Datasets;;
Project [value#128L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#141[0] AS 1#142L]
+- Aggregate [value#128L], [value#128L, pivotfirst(timestamp#127, count(1) AS `count`#137L, 1000000, 0, 0) AS __pivot_count(1) AS `count` AS `count(1) AS ``count```#141]
   +- Aggregate [value#128L, timestamp#127], [value#128L, timestamp#127, count(1) AS count(1) AS `count`#137L]
      +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5dd63368,rate,List(),None,List(),None,Map(),None), rate, [timestamp#127, value#128L]

  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:389)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:93)
  at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:250)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:326)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:325)
  ... 49 elided
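
One possible alternative, when the pivot values are known up front, is to write the pivot by hand as a single aggregation with one conditional aggregate per value. The sketch below is not from the original answer. It assumes Spark 2.4 or later, and the object name, the bucket and parity columns, and the list of values are all hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, when}

object ManualPivot {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ManualPivot") // hypothetical application name
      .master("local[*]")     // assumption: local run for experimenting
      .getOrCreate()

    // Hypothetical columns derived from the rate source's value column.
    val events = spark.readStream
      .format("rate")
      .load()
      .selectExpr("value % 3 AS bucket", "value % 2 AS parity")

    // Assumption: the pivot values are known in advance.
    val parities = Seq(0L, 1L)

    // One conditional count per pivot value. `when` without `otherwise`
    // yields null for non-matching rows, and count skips nulls.
    val pivotColumns = parities.map { p =>
      count(when(col("parity") === p, 1)).as(s"parity_$p")
    }

    // A single groupBy/agg is one streaming aggregation, unlike pivot.
    val pivoted = events
      .groupBy("bucket")
      .agg(pivotColumns.head, pivotColumns.tail: _*)

    val query = pivoted.writeStream
      .outputMode("complete") // aggregation without a watermark
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```

Unlike the foreachBatch approach, this keeps running totals across micro-batches, at the cost of fixing the set of output columns when the query is defined. Cells with no matching rows come out as 0.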


Answer 2:

Here is a simple Java example based on Jacek's answer above:

JSON Array:

[
    {
        "customer_id": "d6315a00",
        "product": "Super widget",
        "price": 10,
        "bought_date": "2019-01-01"
    },
    {
        "customer_id": "d6315a00",
        "product": "Super widget",
        "price": 10,
        "bought_date": "2019-01-01"
    },
    {
        "customer_id": "d6315a00",
        "product": "Super widget",
        "price": 10,
        "bought_date": "2019-01-02"
    },
    {
        "customer_id": "d6315a00",
        "product": "Food widget",
        "price": 4,
        "bought_date": "2019-08-20"
    },
    {
        "customer_id": "d6315cd0",
        "product": "Food widget",
        "price": 4,
        "bought_date": "2019-09-19"
    },
    {
        "customer_id": "d6315e2e",
        "product": "Bike widget",
        "price": 10,
        "bought_date": "2019-01-01"
    },
    {
        "customer_id": "d6315a00",
        "product": "Bike widget",
        "price": 10,
        "bought_date": "2019-03-10"
    },
    {
        "customer_id": "d631614e",
        "product": "Garage widget",
        "price": 4,
        "bought_date": "2019-02-15"
    }
]

Java Code:

package io.centilliard;

import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.from_json;

import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class Pivot {

    public static void main(String[] args) throws StreamingQueryException, AnalysisException {

        // Schema of a single purchase record in the JSON array
        StructType schema = new StructType(new StructField[]{
                new StructField("customer_id", DataTypes.StringType, false, Metadata.empty()),
                new StructField("product", DataTypes.StringType, false, Metadata.empty()),
                new StructField("price", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("bought_date", DataTypes.StringType, false, Metadata.empty())
            });

        // Each Kafka message carries a JSON array of such records
        ArrayType arrayType = new ArrayType(schema, false);

        SparkSession spark = SparkSession
                .builder()
                .appName("SimpleExample")
                .getOrCreate();

        // Create a Dataset representing the stream of input lines from Kafka
        Dataset<Row> dataset = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "utilization")
                .load()
                .selectExpr("CAST(value AS STRING) as json");

        // Parse the JSON array and turn every element into its own row;
        // explode names its output column "col" by default
        Column col = new Column("json");
        Column data = from_json(col, arrayType).as("data");
        Column explode = explode(data);
        Dataset<Row> customers = dataset.select(explode).select("col.*");

        // foreachBatch hands over each micro-batch as a regular (non-streaming)
        // Dataset, on which pivot is allowed. The cast to VoidFunction2 selects
        // the Java overload of foreachBatch.
        StreamingQuery dataStream = customers
                .writeStream()
                .foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batch, batchId) ->
                        batch
                                .groupBy("customer_id", "product", "bought_date")
                                .pivot("product")
                                .sum("price")
                                .orderBy("customer_id")
                                .show())
                .start();

        dataStream.awaitTermination();
    }

}

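Output (each sum is twice what a single copy of the array above would give, which suggests the message was sent to the topic twice within the same batch):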

+-----------+-------------+-----------+-----------+-----------+-------------+------------+
|customer_id|      product|bought_date|Bike widget|Food widget|Garage widget|Super widget|
+-----------+-------------+-----------+-----------+-----------+-------------+------------+
|   d6315a00|  Bike widget| 2019-03-10|         20|       null|         null|        null|
|   d6315a00| Super widget| 2019-01-02|       null|       null|         null|          20|
|   d6315a00| Super widget| 2019-01-01|       null|       null|         null|          40|
|   d6315a00|  Food widget| 2019-08-20|       null|          8|         null|        null|
|   d6315cd0|  Food widget| 2019-09-19|       null|          8|         null|        null|
|   d6315e2e|  Bike widget| 2019-01-01|         20|       null|         null|        null|
|   d631614e|Garage widget| 2019-02-15|       null|       null|            8|        null|
+-----------+-------------+-----------+-----------+-----------+-------------+------------+