reading and writing from hive tables with spark af

Posted 2019-03-20 14:35

Question:

We have a Hive warehouse and want to use Spark for various tasks (mainly classification), at times writing the results back as a Hive table. For example, we wrote the following Python code to find the total sum of column two of original_table, grouped by column one. The code works, but we are worried that it is inefficient, particularly the maps that convert rows to key-value pairs and then to dictionaries. The functions combiner, mergeValue, and mergeCombiner are defined elsewhere and work fine.

from pyspark.sql import HiveContext

# create the context once and reuse it for reading and writing
# (sc is an existing SparkContext)
hive_ctx = HiveContext(sc)

rdd = hive_ctx.sql('from original_table select *')

# convert to key-value pairs
key_value_rdd = rdd.map(lambda x: (x[0], int(x[1])))

# create rdd where rows are (key, (sum, count))
combined = key_value_rdd.combineByKey(combiner, mergeValue, mergeCombiner)

# create rdd with dictionary values in order to create a SchemaRDD
dict_rdd = combined.map(lambda x: {'k1': x[0], 'v1': x[1][0], 'v2': x[1][1]})

# infer the schema
schema_rdd = hive_ctx.inferSchema(dict_rdd)

# save
schema_rdd.saveAsTable('new_table_name')
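
The question does not show combiner, mergeValue, or mergeCombiner. As a rough sketch only, assuming they keep a (sum, count) pair per key as the comment in the code suggests, they might look like the following. The combine_by_key_local helper and the sample rows are hypothetical; the helper only imitates what combineByKey does across partitions, using plain Python lists so that it runs without Spark.

```python
# Hypothetical definitions; the original post does not include them.
def combiner(value):
    """Start a (sum, count) accumulator from the first value seen for a key."""
    return (value, 1)


def mergeValue(acc, value):
    """Fold one more value into a (sum, count) accumulator within a partition."""
    return (acc[0] + value, acc[1] + 1)


def mergeCombiner(left, right):
    """Merge two (sum, count) accumulators that came from different partitions."""
    return (left[0] + right[0], left[1] + right[1])


def combine_by_key_local(partitions):
    """Imitate combineByKey on plain lists, one list of (key, value) per partition."""
    per_partition = []
    for part in partitions:
        accs = {}
        for key, value in part:
            if key in accs:
                accs[key] = mergeValue(accs[key], value)
            else:
                accs[key] = combiner(value)
        per_partition.append(accs)

    # Second phase: merge the per-partition accumulators key by key.
    merged = {}
    for accs in per_partition:
        for key, acc in accs.items():
            if key in merged:
                merged[key] = mergeCombiner(merged[key], acc)
            else:
                merged[key] = acc
    return merged


# Two made-up partitions of (key, value) rows.
partitions = [[("A", 10), ("B", 56)], [("A", 5)]]
result = combine_by_key_local(partitions)
print(result)  # {'A': (15, 2), 'B': (56, 1)}
```

Each key ends up with a (sum, count) tuple, which is the shape the dictionary-building map in the question expects as x[1].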

Are there more efficient ways of doing the same thing?

Answer 1:

Perhaps this was not possible when the question was written, but doesn't it make sense now (Spark 1.3 and later) to use the createDataFrame() call?

After getting your first RDD, it looks like you could make that call and then run a simple SQL statement against the resulting DataFrame to do the whole job (sum and grouping) in one pass. Plus, if I'm reading the API docs correctly, a DataFrame can infer its schema directly on creation.

(http://spark.apache.org/docs/1.3.1/api/python/pyspark.sql.html#pyspark.sql.HiveContext)
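
A minimal sketch of the idea above, under some assumptions: since sql() on a HiveContext already returns a DataFrame in Spark 1.3 and later, the grouping and summing can be pushed into the query itself, so the createDataFrame() step may not even be needed. The function name, its parameters, and the input column names col1 and col2 are hypothetical; the output column names k1, v1, v2 are taken from the question. The .write attribute assumes Spark 1.4 or later; on 1.3.x, saveAsTable was called on the DataFrame directly.

```python
def sum_and_count_to_table(hive_ctx, source_table, key_col, value_col, target_table):
    """Aggregate with one SQL statement and save the result as a table.

    hive_ctx is assumed to be a pyspark.sql.HiveContext (hypothetical wiring;
    adapt the names to your own warehouse).
    """
    query = (
        "SELECT {k} AS k1, SUM(CAST({v} AS BIGINT)) AS v1, COUNT(*) AS v2 "
        "FROM {src} GROUP BY {k}"
    ).format(k=key_col, v=value_col, src=source_table)

    result = hive_ctx.sql(query)            # a DataFrame in Spark 1.3+
    result.write.saveAsTable(target_table)  # Spark 1.4+ spelling (assumption)
    return query


# Usage, assuming an existing SparkContext named sc:
#   from pyspark.sql import HiveContext
#   sum_and_count_to_table(HiveContext(sc), "original_table",
#                          "col1", "col2", "new_table_name")
```

This keeps the rows inside Spark SQL, so the Python-side maps and the dictionary conversion from the question are not needed at all.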



Answer 2:

This error can be solved by setting hive.exec.scratchdir to a folder that the user has access to.



Answer 3:

Which version of Spark are you using?

This answer is based on Spark 1.6 and uses DataFrames.

import org.apache.spark.SparkContext
import org.apache.spark.sql.functions._

// conf is assumed to be an existing SparkConf
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

import sqlContext.implicits._
val client = Seq((1, "A", 10), (2, "A", 5), (3, "B", 56)).toDF("ID", "Categ", "Amnt")

// total amount and number of rows per category
client.groupBy("Categ").agg(sum("Amnt").as("Sum"), count("ID").as("count")).show()


+-----+---+-----+
|Categ|Sum|count|
+-----+---+-----+
|    A| 15|    2|
|    B| 56|    1|
+-----+---+-----+

Hope this helps!