I have 64 Spark cores. I have over 80 million rows of data, amounting to 4.2 GB, in my Cassandra cluster. It currently takes 82 seconds to process this data and I want to get that down to 8 seconds. Any thoughts on this? Is this even possible? Thanks.
This is the part of my Spark app I want to improve:
axes = sqlContext.read.format("org.apache.spark.sql.cassandra")\
.options(table="axes", keyspace=source, numPartitions="192").load()\
.repartition(64*3)\
.reduceByKey(lambda x,y:x+y,52)\
.map(lambda x:(x.article,[Row(article=x.article,at=x.at,comments=x.comments,likes=x.likes,reads=x.reads,shares=x.shares)]))\
.map(lambda x:(x[0],sorted(x[1],key=lambda y:y.at,reverse = False))) \
.filter(lambda x:len(x[1])>=2) \
.map(lambda x:x[1][-1])
Edit:
This is the code I am currently running; the one posted above was an experiment, sorry for the confusion. The question above relates to this code.
axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().repartition(64*3) \
.map(lambda x:(x.article,[Row(article=x.article,at=x.at,comments=x.comments,likes=x.likes,reads=x.reads,shares=x.shares)])).reduceByKey(lambda x,y:x+y)\
.map(lambda x:(x[0],sorted(x[1],key=lambda y:y.at,reverse = False))) \
.filter(lambda x:len(x[1])>=2) \
.map(lambda x:x[1][-1])
Thanks
Issues:
(Why this code cannot work correctly, assuming an unmodified Spark distribution)
Step-by-step:
1. The first two lines (the read.format(...).options(...).load() part) should create a Spark DataFrame. So far so good. The only possible concern is numPartitions, which as far as I remember is not a recognized option (a sketch of the usual way to influence read parallelism follows this list).
2. The repartition(64*3) call is pretty much junk code. Shuffling data around without doing any actual job is unlikely to get you anywhere.
3. At this point you switch to an RDD. Since Row is actually a subclass of tuple, and reduceByKey works only on pairwise RDDs, each element has to be a tuple of size 2. I am not sure why you chose 52 partitions, though.
4. Since reduceByKey always results in an RDD of 2-tuples, the following map, the one that accesses x.article, x.comments and so on, simply shouldn't work. In particular, x cannot have attributes like article or comments. Moreover, wrapping the rebuilt Row in a one-element list creates a list of size 1 (see below).
5. The same map smells fishy for one more reason. If there are obsolete columns, these should be filtered out before the data is converted to an RDD, to avoid excessive traffic and reduce memory usage (a sketch follows this list). If there are no obsolete columns, there is no reason to put more pressure on the Python GC by creating new objects.
6. Since x[1] has only one element, sorting it doesn't make sense.
7. And this filter (len(x[1]) >= 2) should always return an empty RDD.
8. And the last map doesn't perform any useful operations.
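For point 1, here is a minimal sketch of how read parallelism is usually influenced with the Spark Cassandra connector. The host, keyspace and split-size value below are assumptions for illustration, and the exact option name depends on the connector version; none of this is taken from the question:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# Hypothetical host and keyspace values, for illustration only.
conf = (SparkConf()
        .set("spark.cassandra.connection.host", "127.0.0.1")
        # Smaller input splits generally mean more read partitions; the exact
        # option name depends on the connector version.
        .set("spark.cassandra.input.split.size_in_mb", "32"))

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

axes = (sqlContext.read.format("org.apache.spark.sql.cassandra")
        .options(table="axes", keyspace="source_keyspace")
        .load())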
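For point 5, a sketch of trimming columns on the DataFrame side before converting to an RDD, assuming only the columns used later in the pipeline are actually needed:

# Project the needed columns while the data is still a DataFrame, so that less
# data is serialized when it crosses over to Python as an RDD.
needed = axes.select("article", "at", "comments", "likes", "reads", "shares")

# The RDD conversion now carries only the projected columns.
rows = needed.rdd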
Summary:
If you use some version of this code, most likely the order shown in the question is mixed up and the map from point 4 precedes reduceByKey. If that's the case, you are actually using reduceByKey to perform a groupByKey, which is either equivalent to groupByKey with all its issues (Python) or less efficient (Scala). Moreover, it would make the reduction in the number of partitions highly suspicious.
If that's true, there is no good reason to move data out of the JVM (the DataFrame -> RDD conversion) with the corresponding serialization and deserialization, and even if there was, it can easily be solved by an actual reduction with max, not a group-by-key (see the sketch below).
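As an illustration only, here is a sketch of what such a reduction could look like, assuming the intent (based on the sort by at and the trailing x[1][-1]) is to keep the most recent row per article:

# Keep, for every article, the row with the largest at value. A reduceByKey
# with max avoids building per-key lists the way groupByKey-style code does.
latest_per_article = (axes.rdd
    .map(lambda row: (row.article, row))
    .reduceByKey(lambda a, b: max(a, b, key=lambda r: r.at)))

# Drop the keys if only the rows themselves are needed.
result = latest_per_article.values()

If keeping everything inside the JVM is the priority, the same result can usually be expressed as a DataFrame aggregation or window function instead, so no RDD conversion is needed at all.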