I'm creating a new DataFrame with a handful of records from a Join.
val joined_df = first_df.join(second_df, first_df.col("key") ===
second_df.col("key") && second_df.col("key").isNull, "left_outer")
joined_df.repartition(1)
joined_df.cache()
joined_df.count()
Everything is fast (under one second) except the count operation. The RDD conversion kicks in and literally takes hours to complete. Is there any way to speed things up?
INFO MemoryStore: Block rdd_63_140 stored as values in memory (estimated size 16.0 B, free 829.3 MB)
INFO BlockManagerInfo: Added rdd_63_140 in memory on 192.168.8.52:36413 (size: 16.0 B, free: 829.8 MB)
INFO Executor: Finished task 140.0 in stage 10.0 (TID 544). 4232 bytes result sent to driver
INFO TaskSetManager: Starting task 142.0 in stage 10.0 (TID 545, localhost, executor driver, partition 142, PROCESS_LOCAL, 6284 bytes)
INFO Executor: Running task 142.0 in stage 10.0 (TID 545)
INFO TaskSetManager: Finished task 140.0 in stage 10.0 (TID 544) in 16 ms on localhost (executor driver) (136/200)
INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 200 blocks
INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 200 blocks
INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
Everything is fast (under one second) except the count operation.
This is justified as follow : all operations before the count
are called transformations and this type of spark operations are lazy i.e. it doesn't do any computation before calling an action (count
in your example).
The second problem is in the repartition(1)
:
keep in mind that you'll lose all the parallelism offered by spark and you computation will be run in one executor (core if your are in standalone mode), so you must remove this step or change 1 to a number propositional to the number of your CPU cores (standalone mode) or the number of executors (cluster mode).
The RDD conversion kicks in and literally takes hours to complete.
If I understand correctly you would covert the DataFrame
to an RDD
, this is really a bad practice in spark and you should avoid such conversion as possible as you can.
this is because the data in DataFrame
and Dataset
are encoded using special spark encoders (it's called tungstant if I well remembered it) which take much less memory then the JVM serialization encoders, so such conversion mean that spark will change the type of your data from his own one (which take much less memory and let spark optimize a lot of commutations by just work the encoded data and not serialize the data to work with and then deserialize it) to the JVM data type and this why DataFrame
s and Dataset
s are very powerful than RDD
s
Hope this help you
As others have mentioned, the operations before count
are "lazy" and only register a transformation, rather than actually force a computation.
When you call count
, the computation is triggered. This is when Spark reads your data, performs all previously-registered transformations and calculates the result that you requested (in this case a count
).
The RDD conversion kicks in and literally takes hours to complete
I think the term "conversion" is perhaps a bit inaccurate. What is actually happening is that the DataFrame
transformations you registered are translated into RDD
operations, and these are applied to to the RDD that underlies your DataFrame
. There is no conversion per se in the code you have given here.
As an aside, it is possible to explicitly convert a DataFrame
to an RDD
via the DataFrame.rdd
property. As mentioned in this answer this is generally a bad idea, since you lose some of the benefits (in both performance and API) of having well-structured data.