How can I improve the reduceByKey part of my Spark app

Posted 2019-03-06 16:18

I have 64 Spark cores and over 80 million rows of data, amounting to 4.2 GB, in my Cassandra cluster. Processing this data currently takes 82 seconds. I want to reduce this to 8 seconds. Any thoughts on this? Is this even possible? Thanks.

This is the part of my spark app I want to improve:

axes = sqlContext.read.format("org.apache.spark.sql.cassandra")\
    .options(table="axes", keyspace=source, numPartitions="192").load()\
    .repartition(64*3)\
    .reduceByKey(lambda x,y:x+y,52)\
    .map(lambda x:(x.article,[Row(article=x.article,at=x.at,comments=x.comments,likes=x.likes,reads=x.reads,shares=x.shares)]))\
    .map(lambda x:(x[0],sorted(x[1],key=lambda y:y.at,reverse = False))) \
    .filter(lambda x:len(x[1])>=2) \
    .map(lambda x:x[1][-1])

Edit:

This is the code I am currently running; the one posted above was an experiment, sorry for the confusion. The question above relates to this code.

axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().repartition(64*3) \
                    .map(lambda x:(x.article,[Row(article=x.article,at=x.at,comments=x.comments,likes=x.likes,reads=x.reads,shares=x.shares)])).reduceByKey(lambda x,y:x+y)\
                    .map(lambda x:(x[0],sorted(x[1],key=lambda y:y.at,reverse = False))) \
                    .filter(lambda x:len(x[1])>=2) \
                    .map(lambda x:x[1][-1])

Thanks

1 answer
We Are One
#2 · 2019-03-06 16:51

Issues:

(Why this code cannot work correctly, assuming an unmodified Spark distribution)

Step-by-step:

  1. These two lines should create a Spark DataFrame. So far so good:

    sqlContext.read.format("org.apache.spark.sql.cassandra")
      .options(table="axes", keyspace=source, numPartitions="192").load()
    

    The only possible concern is numPartitions, which as far as I remember is not a recognized option.

  2. This is pretty much junk code. Shuffling data around without doing any actual work is unlikely to get you anywhere.

    .repartition(64*3)
    
  3. At this point you switch to an RDD. Row is actually a subclass of tuple, and reduceByKey works only on pairwise RDDs, so each element has to be a tuple of size 2. I am not sure why you chose 52 partitions, though.

    .reduceByKey(lambda x,y:x+y,52)
    
  4. Since reduceByKey always results in an RDD of tuples of size 2, the following part simply shouldn't work:

    .map(lambda x: (x.article,[Row(article=x.article,at=x.at,comments=x.comments,likes=x.likes,reads=x.reads,shares=x.shares)]))\
    

    In particular, x cannot have attributes like article or comments. Moreover, this piece of code

    [Row(article=x.article,at=x.at,comments=x.comments,likes=x.likes,reads=x.reads,shares=x.shares)] 
    

    creates a list of size 1 (see below).

    The following part

    Row(article=x.article, ...)
    

    smells fishy for one more reason. If some columns are not needed, they should be filtered out before the data is converted to an RDD, to avoid excessive traffic and reduce memory usage. If every column is needed, there is no reason to put more pressure on the Python GC by creating new objects.

  5. Since x[1] has only one element, sorting it doesn't make sense:

    .map(lambda x:(x[0],sorted(x[1],key=lambda y:y.at,reverse = False))) \
    
  6. And this filter should always return an empty RDD:

    .filter(lambda x:len(x[1])>=2) \
    
  7. And this doesn't perform any useful operation:

    .map(lambda x:x[1][-1])
    

Summary:

If you use some version of this code, most likely the order shown in the question is mixed up, and the map from point 4:

.map(lambda x: (x.article,[Row(....)]))

precedes reduceByKey:

.reduceByKey(lambda x,y:x+y,52)

If that's the case, you actually use .reduceByKey to perform groupByKey, which is either equivalent to groupByKey with all its issues (Python) or less efficient (Scala). Moreover, it would make the reduction in the number of partitions highly suspicious.
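To see why list concatenation inside a reduce amounts to a group-by-key, here is a small plain-Python sketch. `reduce_by_key` is a hypothetical local helper that only imitates the per-key folding of `RDD.reduceByKey`; it is not Spark's implementation, and the data is made up.

```python
from collections import defaultdict
from functools import reduce


def reduce_by_key(pairs, func):
    """Hypothetical local stand-in for RDD.reduceByKey: fold the values of each key with func."""
    values_by_key = defaultdict(list)
    for key, value in pairs:
        values_by_key[key].append(value)
    return {key: reduce(func, values) for key, values in values_by_key.items()}


# Made-up (article, [at]) pairs, each value wrapped in a one-element list
# as in the question's map step.
pairs = [("a", [3]), ("b", [1]), ("a", [1]), ("a", [2])]

# Concatenating lists never shrinks the data: every value for a key
# ends up in one list, which is the result a group-by-key would give.
concatenated = reduce_by_key(pairs, lambda x, y: x + y)
print(concatenated)  # {'a': [3, 1, 2], 'b': [1]}
```

Nothing is actually reduced here: the output holds exactly as many values as the input, so the full data set still has to be shuffled.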

If that's true, there is no good reason to move data out of the JVM (DataFrame -> RDD conversion) with the corresponding serialization and deserialization, and even if there was, it can be easily solved by an actual reduction with max, not a group-by-key.

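from operator import attrgetter

(sqlContext.read.format(...).options(...).load()
  .select(...)  # Only the columns you actually need
  .rdd  # keyBy is an RDD method, so convert explicitly
  .keyBy(attrgetter("article"))  # (article, row) pairs
  # Keep the row with the largest "at" (the latest one) per article
  .reduceByKey(lambda r1, r2: max(r1, r2, key=attrgetter("at"))))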
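The question's pipeline also drops articles that have fewer than two rows, which a plain max reduction does not do. One possible way to keep that filter while still reducing, sketched here in plain Python, is to carry a count next to the latest row. `Row`, `reduce_by_key`, `merge`, and the sample data are all assumptions made for illustration, not part of the original code.

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row with only the two fields this sketch needs.
Row = namedtuple("Row", ["article", "at"])


def reduce_by_key(pairs, func):
    """Hypothetical local stand-in for RDD.reduceByKey."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result


def merge(a, b):
    """Combine two (count, latest_row) pairs into one."""
    return (a[0] + b[0], max(a[1], b[1], key=lambda r: r.at))


# Made-up data: three rows for article "a", a single row for "b".
rows = [Row("a", 1), Row("a", 3), Row("a", 2), Row("b", 1)]

# Start every row with a count of 1, then reduce per article.
pairs = [(r.article, (1, r)) for r in rows]
reduced = reduce_by_key(pairs, merge)

# Keep the latest row only for articles seen at least twice.
latest = [row for count, row in reduced.values() if count >= 2]
print(latest)  # [Row(article='a', at=3)]
```

On an RDD the same idea would presumably be a `map` to `(article, (1, row))`, a `reduceByKey(merge)`, and then a `filter` on the count. Note that `max` returns the first of several rows tied on `at`, whereas the sort-and-take-last version returns the last, so results may differ when timestamps repeat.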
