Reading and writing from Hive tables with Spark after aggregation

Posted 2019-03-20 14:48

We have a Hive warehouse and want to use Spark for various tasks (mainly classification), at times writing the results back as a Hive table. For example, we wrote the following Python function to find the total sum of original_table column two, grouped by original_table column one. The function works, but we are worried that it is inefficient, particularly the maps to convert to key-value pairs and to dictionary versions. The functions combiner, mergeValue, and mergeCombiner are defined elsewhere and work fine.

from pyspark.sql import HiveContext

hive_context = HiveContext(sc)

# read the Hive table (rows we can map over)
rdd = hive_context.sql('from original_table select *')

# convert to key-value pairs: (column one, column two as int)
key_value_rdd = rdd.map(lambda x: (x[0], int(x[1])))

# create rdd where rows are (key, (sum, count))
combined = key_value_rdd.combineByKey(combiner, mergeValue, mergeCombiner)

# create rdd with dictionary values in order to create a schema rdd
dict_rdd = combined.map(lambda x: {'k1': x[0], 'v1': x[1][0], 'v2': x[1][1]})

# infer the schema
schema_rdd = hive_context.inferSchema(dict_rdd)

# save as a Hive table
schema_rdd.saveAsTable('new_table_name')

Are there more efficient ways of doing the same thing?

3 Answers
Melony? · 2019-03-20 15:14

...perhaps this was not possible when the question was written, but doesn't it make sense now (post 1.3) to use the createDataFrame() call?

After getting your first RDD, it looks like you could make that call and then run a simple SQL statement against the structure to get the whole job done (sum and grouping) in one pass. Plus, the DataFrame structure can infer the schema directly upon creation, if I'm reading the API doc correctly.

(http://spark.apache.org/docs/1.3.1/api/python/pyspark.sql.html#pyspark.sql.HiveContext)
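For illustration, a minimal sketch of that idea using Spark 1.3-era APIs (the temp-table name and the reuse of the question's k1/v1/v2 column names are assumptions, not the asker's exact code):

from pyspark.sql import HiveContext

hive_context = HiveContext(sc)

# the original query; in Spark 1.3+ this already comes back as a DataFrame
rows = hive_context.sql('from original_table select *')

# build a DataFrame from the (key, value) pairs and let Spark infer the types;
# 'k1' and 'v1' are just column names carried over from the question
pairs = rows.rdd.map(lambda x: (x[0], int(x[1])))
df = hive_context.createDataFrame(pairs, ['k1', 'v1'])

# sum and grouping in one SQL pass, then save back to Hive
df.registerTempTable('pairs')
result = hive_context.sql(
    'select k1, sum(v1) as v1, count(v1) as v2 from pairs group by k1')
result.saveAsTable('new_table_name')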

再贱就再见 · 2019-03-20 15:23

This error can be solved by setting hive.exec.scratchdir to a folder the user has write access to.
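For what it's worth, a minimal sketch of one way to set that property from PySpark (the directory path is purely illustrative, and the property can also be set in hive-site.xml instead):

from pyspark.sql import HiveContext

hive_context = HiveContext(sc)

# point Hive's scratch directory at a location the Spark user can write to
# (replace the illustrative path with one that exists on your cluster)
hive_context.setConf('hive.exec.scratchdir', '/tmp/hive-scratch')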

霸刀☆藐视天下 · 2019-03-20 15:32

What version of Spark are you using?

This answer is based on 1.6 and uses DataFrames.

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._

val sc = new SparkContext(conf)  // conf: your existing SparkConf
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// sample data standing in for the Hive table
val client = Seq((1, "A", 10), (2, "A", 5), (3, "B", 56)).toDF("ID", "Categ", "Amnt")

// sum and count per category in a single pass
client.groupBy("Categ").agg(sum("Amnt").as("Sum"), count("ID").as("count")).show()


+-----+---+-----+
|Categ|Sum|count|
+-----+---+-----+
|    A| 15|    2|
|    B| 56|    1|
+-----+---+-----+
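
Since the question is in Python, here is a rough PySpark equivalent of the same idea (a sketch only; the sample data mirrors the Scala snippet above, and the final saveAsTable call is an assumption about writing the result back to Hive via the DataFrame writer API available in 1.4+):

from pyspark.sql import HiveContext
from pyspark.sql import functions as F

hive_context = HiveContext(sc)

# sample data standing in for the Hive table
client = hive_context.createDataFrame(
    [(1, 'A', 10), (2, 'A', 5), (3, 'B', 56)], ['ID', 'Categ', 'Amnt'])

result = client.groupBy('Categ').agg(
    F.sum('Amnt').alias('Sum'), F.count('ID').alias('count'))
result.show()

# write the aggregate back to Hive
result.write.saveAsTable('new_table_name')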

Hope this helps !!
