How to calculate cumulative sum using sqlContext

2019-01-23 21:11发布

问题:

I know we can use Window function in pyspark to calculate cumulative sum. But Window is only supported in HiveContext and not in SQLContext. I need to use SQLContext as HiveContext cannot be run in multi processes.

Is there any efficient way to calculate cumulative sum using SQLContext? A simple way is to load the data into the driver's memory and use numpy.cumsum, but the con is the data need to be able to fit into the memory

回答1:

Not sure if this is what you are looking for but here are two examples how to use sqlContext to calculate the cumulative sum:

First when you want to partition it by some categories:

from pyspark.sql.types import StructType, StringType, LongType
from pyspark.sql import SQLContext

rdd = sc.parallelize([
    ("Tablet", 6500), 
    ("Tablet", 5500), 
    ("Cell Phone", 6000), 
    ("Cell Phone", 6500), 
    ("Cell Phone", 5500)
    ])

schema = StructType([
    StructField("category", StringType(), False),
    StructField("revenue", LongType(), False)
    ])

df = sqlContext.createDataFrame(rdd, schema)

df.registerTempTable("test_table")

df2 = sqlContext.sql("""
SELECT
    category,
    revenue,
    sum(revenue) OVER (PARTITION BY category ORDER BY revenue) as cumsum
FROM
test_table
""")

Output:

[Row(category='Tablet', revenue=5500, cumsum=5500),
 Row(category='Tablet', revenue=6500, cumsum=12000),
 Row(category='Cell Phone', revenue=5500, cumsum=5500),
 Row(category='Cell Phone', revenue=6000, cumsum=11500),
 Row(category='Cell Phone', revenue=6500, cumsum=18000)]

Second when you only want to take the cumsum of one variable. Change df2 to this:

df2 = sqlContext.sql("""
SELECT
    category,
    revenue,
    sum(revenue) OVER (ORDER BY revenue, category) as cumsum
FROM
test_table
""")

Output:

[Row(category='Cell Phone', revenue=5500, cumsum=5500),
 Row(category='Tablet', revenue=5500, cumsum=11000),
 Row(category='Cell Phone', revenue=6000, cumsum=17000),
 Row(category='Cell Phone', revenue=6500, cumsum=23500),
 Row(category='Tablet', revenue=6500, cumsum=30000)]

Hope this helps. Using np.cumsum is not very efficient after collecting the data especially if the dataset is large. Another way you could explore is to use simple RDD transformations like groupByKey() and then use map to calculate the cumulative sum of each group by some key and then reduce it at the end.



回答2:

Here is a simple example:

import pyspark
from pyspark.sql import window
import pyspark.sql.functions as sf


sc = pyspark.SparkContext(appName="test")
sqlcontext = pyspark.SQLContext(sc)

data = sqlcontext.createDataFrame([("Bob", "M", "Boston", 1, 20),
                                   ("Cam", "F", "Cambridge", 1, 25),
                                  ("Lin", "F", "Cambridge", 1, 25),
                                  ("Cat", "M", "Boston", 1, 20),
                                  ("Sara", "F", "Cambridge", 1, 15),
                                  ("Jeff", "M", "Cambridge", 1, 25),
                                  ("Bean", "M", "Cambridge", 1, 26),
                                  ("Dave", "M", "Cambridge", 1, 21),], 
                                 ["name", 'gender', "city", 'donation', "age"])


data.show()

gives output

+----+------+---------+--------+---+
|name|gender|     city|donation|age|
+----+------+---------+--------+---+
| Bob|     M|   Boston|       1| 20|
| Cam|     F|Cambridge|       1| 25|
| Lin|     F|Cambridge|       1| 25|
| Cat|     M|   Boston|       1| 20|
|Sara|     F|Cambridge|       1| 15|
|Jeff|     M|Cambridge|       1| 25|
|Bean|     M|Cambridge|       1| 26|
|Dave|     M|Cambridge|       1| 21|
+----+------+---------+--------+---+

Define a window

win_spec = (window.Window
                  .partitionBy(['gender', 'city'])
                  .rowsBetween(window.Window.unboundedPreceding, 0))

# window.Window.unboundedPreceding -- first row of the group # .rowsBetween(..., 0) -- 0 refers to current row, if instead -2 specified then upto 2 rows before current row

Now, here is a trap:

temp = data.withColumn('cumsum',sum(data.donation).over(win_spec))

with error :

TypeErrorTraceback (most recent call last)
<ipython-input-9-b467d24b05cd> in <module>()
----> 1 temp = data.withColumn('cumsum',sum(data.donation).over(win_spec))

/Users/mupadhye/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/column.pyc in __iter__(self)
    238 
    239     def __iter__(self):
--> 240         raise TypeError("Column is not iterable")
    241 
    242     # string methods

TypeError: Column is not iterable

This is due to using python's sum function instead of pyspark's. The way to fix this is using sum function from pyspark.sql.functions.sum:

temp = data.withColumn('AgeSum',sf.sum(data.donation).over(win_spec))
temp.show()

will give:

+----+------+---------+--------+---+--------------+
|name|gender|     city|donation|age|CumSumDonation|
+----+------+---------+--------+---+--------------+
|Sara|     F|Cambridge|       1| 15|             1|
| Cam|     F|Cambridge|       1| 25|             2|
| Lin|     F|Cambridge|       1| 25|             3|
| Bob|     M|   Boston|       1| 20|             1|
| Cat|     M|   Boston|       1| 20|             2|
|Dave|     M|Cambridge|       1| 21|             1|
|Jeff|     M|Cambridge|       1| 25|             2|
|Bean|     M|Cambridge|       1| 26|             3|
+----+------+---------+--------+---+--------------+


回答3:

After landing on this thread trying to solve a similar problem, I've solved my issue using this code. Not sure if I'm missing part of the OP, but this is a way to sum a SQLContext column:

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext

sc = SparkContext() 
sc.setLogLevel("ERROR")
conf = SparkConf()
conf.setAppName('Sum SQLContext Column')
conf.set("spark.executor.memory", "2g")
sqlContext = SQLContext(sc)

def sum_column(table, column):
    sc_table = sqlContext.table(table)
    return sc_table.agg({column: "sum"})

sum_column("db.tablename", "column").show()


回答4:

It is not true that windows function works only with HiveContext. You can use them even with sqlContext:

from pyspark.sql.window import *

myPartition=Window.partitionBy(['col1','col2','col3'])

temp= temp.withColumn("#dummy",sum(temp.col4).over(myPartition))