I am trying to pivot a Spark streaming Dataset (Structured Streaming), but I get an AnalysisException (excerpt below).
Could someone confirm that pivoting is indeed not supported in Structured Streaming (Spark 2.0), and perhaps suggest alternative approaches?
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
tl;dr pivot aggregation is not directly supported by Spark Structured Streaming up to and including 2.4.4.
As a workaround, use DataStreamWriter.foreachBatch or the more generic DataStreamWriter.foreach.
I am using Spark 2.4.4, the latest version as of this writing.
scala> spark.version
res0: String = 2.4.4
UnsupportedOperationChecker (which you can find in the stack trace) checks whether the logical plan of a streaming query uses supported operations only.
To call pivot you have to groupBy first, since that is the only interface that makes pivot available.
There are two issues with pivot:
1. pivot wants to know how many columns to generate values for, and hence does collect, which is not possible with streaming Datasets.
2. pivot is actually another aggregation (besides groupBy), so the query ends up with multiple aggregations, which Spark Structured Streaming does not support.
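To make the two points concrete, here is a plain-Java sketch (no Spark involved) of the work a pivot with count has to do. The class and method names are made up for illustration, and this is not how Spark implements pivot internally; it only mirrors the shape of the work: a full pass to discover the output columns, then two aggregations stacked on top of each other.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

public class PivotSketch {

    // Each row is {groupKey, pivotValue}; a hypothetical stand-in for a Dataset row.

    // Issue 1: without explicit values, every row must be scanned up front
    // to learn which output columns exist. On an unbounded stream there is
    // no point at which "every row" has been seen.
    static List<String> distinctPivotValues(String[][] rows) {
        TreeSet<String> values = new TreeSet<>();
        for (String[] row : rows) {
            values.add(row[1]);
        }
        return new ArrayList<>(values);
    }

    // Aggregation 1: count rows per (group key, pivot value).
    static Map<String, Map<String, Long>> countPerKeyAndValue(String[][] rows) {
        Map<String, Map<String, Long>> counts = new LinkedHashMap<>();
        for (String[] row : rows) {
            counts.computeIfAbsent(row[0], k -> new LinkedHashMap<>())
                  .merge(row[1], 1L, Long::sum);
        }
        return counts;
    }

    // Issue 2: a second aggregation collapses the result of the first one
    // into a single row per group key, with one cell per pivot column.
    static Map<String, List<Long>> pivot(String[][] rows, List<String> columns) {
        Map<String, List<Long>> result = new LinkedHashMap<>();
        for (Map.Entry<String, Map<String, Long>> entry : countPerKeyAndValue(rows).entrySet()) {
            List<Long> cells = new ArrayList<>();
            for (String column : columns) {
                cells.add(entry.getValue().get(column)); // null if the key never had this value
            }
            result.put(entry.getKey(), cells);
        }
        return result;
    }

    public static void main(String[] args) {
        String[][] rows = {{"a", "x"}, {"a", "x"}, {"a", "y"}, {"b", "y"}};
        List<String> columns = distinctPivotValues(rows);
        System.out.println(columns);              // prints [x, y]
        System.out.println(pivot(rows, columns)); // prints {a=[2, 1], b=[null, 1]}
    }
}
```

Passing the values explicitly removes the first step only; the two stacked aggregations remain.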
Let's look at issue 1 first, with no values to pivot on defined.
val sq = spark
  .readStream
  .format("rate")
  .load
  .groupBy("value")
  .pivot("timestamp") // <-- pivot with no values
  .count
  .writeStream
  .format("console")
scala> sq.start
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
rate
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:389)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1(UnsupportedOperationChecker.scala:38)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1$adapted(UnsupportedOperationChecker.scala:36)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:36)
at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:51)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:62)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3365)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:2788)
at org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:384)
... 49 elided
The last two lines show the cause: pivot does collect under the covers, and that is what fails on a streaming Dataset.
The second issue is that even if you specify the values to pivot on, you then hit a different error because of the multiple aggregations. Note that this time the failing check is the one for streaming (checkForStreaming), not the one for batch (checkForBatch) as in the first case.
val sq = spark
  .readStream
  .format("rate")
  .load
  .groupBy("value")
  .pivot("timestamp", Seq(1)) // <-- pivot with explicit values
  .count
  .writeStream
  .format("console")
scala> sq.start
org.apache.spark.sql.AnalysisException: Multiple streaming aggregations are not supported with streaming DataFrames/Datasets;;
Project [value#128L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#141[0] AS 1#142L]
+- Aggregate [value#128L], [value#128L, pivotfirst(timestamp#127, count(1) AS `count`#137L, 1000000, 0, 0) AS __pivot_count(1) AS `count` AS `count(1) AS ``count```#141]
+- Aggregate [value#128L, timestamp#127], [value#128L, timestamp#127, count(1) AS count(1) AS `count`#137L]
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5dd63368,rate,List(),None,List(),None,Map(),None), rate, [timestamp#127, value#128L]
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:389)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:93)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:250)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:326)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:325)
... 49 elided
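When the pivot values are known up front, one possible alternative (not something the answer above proposes, so treat it as an assumption) is to express the pivot as conditional aggregates inside a single groupBy, which should leave only one aggregation in the plan. The sketch below is plain Java that only builds the Spark SQL expression strings; the class name, method name, and the column names in main are hypothetical, and it assumes column names contain no backticks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ConditionalAggSketch {

    // Builds one aggregate expression per known pivot value, for example:
    //   sum(CASE WHEN `product` = 'Bike widget' THEN `price` END) AS `Bike widget`
    static List<String> pivotExpressions(String pivotCol, String valueCol, List<String> values) {
        List<String> expressions = new ArrayList<>();
        for (String value : values) {
            // Escape backslashes and single quotes for the SQL string literal
            String literal = value.replace("\\", "\\\\").replace("'", "\\'");
            // Double any backticks so the value is safe as a quoted alias
            String alias = value.replace("`", "``");
            expressions.add("sum(CASE WHEN `" + pivotCol + "` = '" + literal
                    + "' THEN `" + valueCol + "` END) AS `" + alias + "`");
        }
        return expressions;
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("Bike widget", "Food widget");
        for (String expression : pivotExpressions("product", "price", values)) {
            System.out.println(expression);
        }
    }
}
```

Each string can be turned into a Column with org.apache.spark.sql.functions.expr and all of them passed to one groupBy(...).agg(...) call on the streaming Dataset. Since that is an ordinary streaming aggregation, the query would need the "complete" or "update" output mode.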
Here is a simple Java example based on Jacek's answer above:
JSON Array:
[{
"customer_id": "d6315a00",
"product": "Super widget",
"price": 10,
"bought_date": "2019-01-01"
},
{
"customer_id": "d6315a00",
"product": "Super widget",
"price": 10,
"bought_date": "2019-01-01"
},
{
"customer_id": "d6315a00",
"product": "Super widget",
"price": 10,
"bought_date": "2019-01-02"
},
{
"customer_id": "d6315a00",
"product": "Food widget",
"price": 4,
"bought_date": "2019-08-20"
},
{
"customer_id": "d6315cd0",
"product": "Food widget",
"price": 4,
"bought_date": "2019-09-19"
}, {
"customer_id": "d6315e2e",
"product": "Bike widget",
"price": 10,
"bought_date": "2019-01-01"
}, {
"customer_id": "d6315a00",
"product": "Bike widget",
"price": 10,
"bought_date": "2019-03-10"
},
{
"customer_id": "d631614e",
"product": "Garage widget",
"price": 4,
"bought_date": "2019-02-15"
}
]
Java Code:
package io.centilliard;
import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.from_json;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Function2;
import scala.runtime.BoxedUnit;
public class Pivot {

    public static void main(String[] args) throws StreamingQueryException, AnalysisException {

        // Schema of a single purchase record inside the incoming JSON array
        StructType schema = new StructType(new StructField[]{
            new StructField("customer_id", DataTypes.StringType, false, Metadata.empty()),
            new StructField("product", DataTypes.StringType, false, Metadata.empty()),
            new StructField("price", DataTypes.IntegerType, false, Metadata.empty()),
            new StructField("bought_date", DataTypes.StringType, false, Metadata.empty())
        });
        ArrayType arrayType = new ArrayType(schema, false);

        SparkSession spark = SparkSession
            .builder()
            .appName("SimpleExample")
            .getOrCreate();

        // Create a Dataset representing the stream of input lines from Kafka
        Dataset<Row> dataset = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "utilization")
            .load()
            .selectExpr("CAST(value AS STRING) as json");

        // Parse each message as a JSON array and emit one row per array element
        Column col = new Column("json");
        Column data = from_json(col, arrayType).as("data");
        Column explode = explode(data);
        Dataset<Row> customers = dataset.select(explode).select("col.*");

        // Obtain the writer from the Dataset instead of constructing it directly
        DataStreamWriter<Row> dataStreamWriter = customers.writeStream();

        // Each micro-batch is a regular (non-streaming) Dataset, so pivot is allowed here
        StreamingQuery dataStream = dataStreamWriter.foreachBatch(new Function2<Dataset<Row>, Object, BoxedUnit>() {
            @Override
            public BoxedUnit apply(Dataset<Row> batch, Object batchId) {
                batch
                    .groupBy("customer_id", "product", "bought_date")
                    .pivot("product")
                    .sum("price")
                    .orderBy("customer_id")
                    .show();
                // Return Scala's Unit value rather than null
                return BoxedUnit.UNIT;
            }
        })
        .start();

        dataStream.awaitTermination();
    }
}
Output:
+-----------+-------------+-----------+-----------+-----------+-------------+------------+
|customer_id| product|bought_date|Bike widget|Food widget|Garage widget|Super widget|
+-----------+-------------+-----------+-----------+-----------+-------------+------------+
| d6315a00| Bike widget| 2019-03-10| 20| null| null| null|
| d6315a00| Super widget| 2019-01-02| null| null| null| 20|
| d6315a00| Super widget| 2019-01-01| null| null| null| 40|
| d6315a00| Food widget| 2019-08-20| null| 8| null| null|
| d6315cd0| Food widget| 2019-09-19| null| 8| null| null|
| d6315e2e| Bike widget| 2019-01-01| 20| null| null| null|
| d631614e|Garage widget| 2019-02-15| null| null| 8| null|
+-----------+-------------+-----------+-----------+-----------+-------------+------------+