Question:
I'm trying to figure out the best way to get the largest value in a Spark dataframe column.
Consider the following example:
df = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
df.show()
Which creates:
+---+---+
| A| B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+
My goal is to find the largest value in column A (by inspection, this is 3.0). Using PySpark, here are four approaches I can think of:
# Method 1: Use describe()
float(df.describe("A").filter("summary = 'max'").select("A").collect()[0].asDict()['A'])
# Method 2: Use SQL
df.createOrReplaceTempView("df_table")  # registerTempTable is deprecated since Spark 2.0
spark.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()['maxval']
# Method 3: Use groupby()
df.groupby().max('A').collect()[0].asDict()['max(A)']
# Method 4: Convert to RDD
df.select("A").rdd.max()[0]
Each of the above gives the right answer, but in the absence of a Spark profiling tool I can't tell which is best.
Any ideas from either intuition or empiricism on which of the above methods is most efficient in terms of Spark runtime or resource usage, or whether there is a more direct method than the ones above?
Answer 1:
>df1.show()
+-----+--------------------+--------+----------+-----------+
|floor| timestamp| uid| x| y|
+-----+--------------------+--------+----------+-----------+
| 1|2014-07-19T16:00:...|600dfbe2| 103.79211|71.50419418|
| 1|2014-07-19T16:00:...|5e7b40e1| 110.33613|100.6828393|
| 1|2014-07-19T16:00:...|285d22e4|110.066315|86.48873585|
| 1|2014-07-19T16:00:...|74d917a1| 103.78499|71.45633073|
>row1 = df1.agg({"x": "max"}).collect()[0]
>print(row1)
Row(max(x)=110.33613)
>print(row1["max(x)"])
110.33613
This is almost the same as Method 3, but the asDict() call in Method 3 can be dropped, because a Row can be indexed by column name directly.
Answer 2:
The maximum value of a particular DataFrame column can be obtained with:
your_max_value = df.agg({"your-column": "max"}).collect()[0][0]
Answer 3:
Remark: Spark is designed for distributed computing on big data. The example DataFrame is very small, so the ranking on real-life data can differ from the ranking on this small example.
Slowest: Method 1, because .describe("A") computes count, mean, stddev, min, and max (5 calculations over the whole column).
Medium: Method 4, because the DataFrame-to-RDD conversion (.rdd) slows down the process.
Faster: Method 3 ~ Method 2 ~ Method 5 (the agg() call from the previous answer), because the logic is very similar, so Spark's Catalyst optimizer produces very similar plans with a minimal number of operations (get the max of one column, collect a single-value DataFrame). The .asDict() call adds a little extra time to Methods 2 and 3 compared with Method 5.
import time
import numpy as np
import pandas as pd
# `spark` is assumed to be an existing SparkSession
time_dict = {}
dfff = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
#-- For a bigger/more realistic dataframe, uncomment the following 3 lines
#lst = list(np.random.normal(0.0, 100.0, 100000))
#pdf = pd.DataFrame({'A': lst, 'B': lst, 'C': lst, 'D': lst})
#dfff = spark.createDataFrame(pdf)
tic1 = int(round(time.time() * 1000))
# Method 1: Use describe()
max_val = float(dfff.describe("A").filter("summary = 'max'").select("A").collect()[0].asDict()['A'])
tac1 = int(round(time.time() * 1000))
time_dict['m1'] = tac1 - tic1
print(max_val)
tic2 = int(round(time.time() * 1000))
# Method 2: Use SQL
dfff.createOrReplaceTempView("df_table")
max_val = spark.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()['maxval']
tac2 = int(round(time.time() * 1000))
time_dict['m2'] = tac2 - tic2
print(max_val)
tic3 = int(round(time.time() * 1000))
# Method 3: Use groupby()
max_val = dfff.groupby().max('A').collect()[0].asDict()['max(A)']
tac3 = int(round(time.time() * 1000))
time_dict['m3'] = tac3 - tic3
print(max_val)
tic4 = int(round(time.time() * 1000))
# Method 4: Convert to RDD
max_val = dfff.select("A").rdd.max()[0]
tac4 = int(round(time.time() * 1000))
time_dict['m4'] = tac4 - tic4
print(max_val)
tic5 = int(round(time.time() * 1000))
# Method 5: Use agg()
max_val = dfff.agg({"A": "max"}).collect()[0][0]
tac5 = int(round(time.time() * 1000))
time_dict['m5'] = tac5 - tic5
print(max_val)
print(time_dict)
Result on an edge-node of a cluster in milliseconds (ms):
small DF (ms) : {'m1': 7096, 'm2': 205, 'm3': 165, 'm4': 211, 'm5': 180}
bigger DF (ms): {'m1': 10260, 'm2': 452, 'm3': 465, 'm4': 916, 'm5': 373}
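The repeated tic/tac bookkeeping above can be factored into a small helper that also repeats each measurement. This is a sketch under assumptions, not the code that produced the numbers above: `time_call` is a hypothetical name, and a plain Python workload stands in for the Spark call so that the example runs without a cluster. Repeating a measurement may help separate one-off start-up costs from steady-state cost, since the first timing in a session can include initialization work.

```python
import time


def time_call(fn, repeats=3):
    """Call fn() `repeats` times; return (last result, elapsed times in ms)."""
    timings_ms = []
    result = None
    for _ in range(repeats):
        start = time.perf_counter()  # monotonic clock intended for interval timing
        result = fn()
        timings_ms.append((time.perf_counter() - start) * 1000.0)
    return result, timings_ms


# Stand-in workload; with Spark one would pass, for example,
#   lambda: dfff.agg({"A": "max"}).collect()[0][0]
result, timings_ms = time_call(lambda: max([1.0, 2.0, 3.0]))
print(result, [round(t, 3) for t in timings_ms])
```

Comparing the first entry of `timings_ms` with the later ones gives a rough indication of how much of a measurement is warm-up.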
Answer 4:
Another way of doing it:
import pyspark.sql.functions as f
df.select(f.max(f.col("A")).alias("MAX")).limit(1).collect()[0].MAX
On my data, I got these benchmarks:
df.select(f.max(f.col("A")).alias("MAX")).limit(1).collect()[0].MAX
CPU times: user 2.31 ms, sys: 3.31 ms, total: 5.62 ms
Wall time: 3.7 s
df.select("A").rdd.max()[0]
CPU times: user 23.2 ms, sys: 13.9 ms, total: 37.1 ms
Wall time: 10.3 s
df.agg({"A": "max"}).collect()[0][0]
CPU times: user 0 ns, sys: 4.77 ms, total: 4.77 ms
Wall time: 3.75 s
All of them give the same answer
Answer 5:
In case anyone wonders how to do it in Scala (using Spark 2.0+), here you go:
scala> df.createOrReplaceTempView("TEMP_DF")
scala> val myMax = spark.sql("SELECT MAX(x) as maxval FROM TEMP_DF").
collect()(0).getInt(0)
scala> print(myMax)
117
Answer 6:
Here is a lazy way of doing this, by just computing statistics:
df.write.mode("overwrite").saveAsTable("sampleStats")
Query = "ANALYZE TABLE sampleStats COMPUTE STATISTICS FOR COLUMNS " + ','.join(df.columns)
spark.sql(Query)
df.describe('ColName').show()
or
spark.sql("Select * from sampleStats").describe('ColName').show()
or you can open a Hive shell and run:
describe formatted table sampleStats;
You will see the statistics in the properties: min, max, distinct, nulls, etc.
Answer 7:
I believe the best solution is to use head().
Considering your example:
+---+---+
| A| B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+
Using agg() together with PySpark's max function, we can get the value as follows:
from pyspark.sql.functions import max
df.agg(max(df.A)).head()[0]
This will return:
3.0
Make sure you have the correct import:
from pyspark.sql.functions import max
The max function used here is the one from pyspark.sql.functions, not Python's built-in max. Note that this import shadows the built-in max in the current module.
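Answer 8:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
// Needed for toDF, .as[...] and $"..."; already in scope in spark-shell.
// Assumes a SparkSession named spark.
import spark.implicits._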
val testDataFrame = Seq(
(1.0, 4.0), (2.0, 5.0), (3.0, 6.0)
).toDF("A", "B")
val (maxA, maxB) = testDataFrame.select(max("A"), max("B"))
.as[(Double, Double)]
.first()
println(maxA, maxB)
The result is (3.0,6.0), the same values that testDataFrame.agg(max($"A"), max($"B")).collect()(0) produces.
However, testDataFrame.agg(max($"A"), max($"B")).collect()(0) returns a Row, [3.0,6.0], rather than a tuple.