Spark DataSet filter performance

2020-05-27 14:46发布

问题:

I have been experimenting different ways to filter a typed data set. It turns out the performance can be quite different.

The data set was created based on a 1.6 GB rows of data with 33 columns and 4226047 rows. DataSet is created by loading csv data and mapped to a case class.

val df = spark.read.csv(csvFile).as[FireIncident]

A filter on UnitId = 'B02' should return 47980 rows. I tested three ways as below: 1) Use typed column (~ 500 ms on local host)

df.where($"UnitID" === "B02").count()

2) Use temp table and sql query (~ same as option 1)

df.createOrReplaceTempView("FireIncidentsSF")
spark.sql("SELECT * FROM FireIncidentsSF WHERE UnitID='B02'").count()

3) Use strong typed class field (14,987ms, i.e. 30 times as slow)

df.filter(_.UnitID.orNull == "B02").count()

I tested it again with the python API, for the same data set, the timing is 17,046 ms, comparable to the performance of the scala API option 3.

df.filter(df['UnitID'] == 'B02').count()

Could someone shed some light on how 3) and the python API are executed differently from the first two options?

回答1:

It's because of step 3 here.

In the first two, spark doesn't need to deserialize the whole Java/Scala object - it just looks at the one column and moves on.

In the third, since you're using a lambda function, spark can't tell that you just want the one field, so it pulls all 33 fields out of memory for each row, so that you can check the one field.

I'm not sure why the fourth is so slow. It seems like it would work the same way as the first.



回答2:

When running python what is happening is that first your code is loaded onto the JVM, interpreted, and then its finally compiled into bytecode. When using the Scala API, Scala natively runs on the JVM so you're cutting out the entire load python code into the JVM part.