Save and append a file in HDFS using PySpark

Posted 2019-07-17 04:58

Question:

I have a data frame in PySpark called df. I have registered this df as a temp table as shown below.

df.registerTempTable('mytempTable')

from datetime import datetime

date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

Now from this temp table I get certain values, such as the minimum and maximum of the column id.

min_id = sqlContext.sql("select nvl(min(id),0) as minval from mytempTable").collect()[0].asDict()['minval']

max_id = sqlContext.sql("select nvl(max(id),0) as maxval from mytempTable").collect()[0].asDict()['maxval']
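Each of the two lookups above calls collect() and so runs its own Spark job. As a rough sketch, both values could be fetched with a single query instead. The helper name fetch_id_bounds is hypothetical, and sqlContext is assumed to be the existing SQLContext with mytempTable already registered:

```python
def fetch_id_bounds(sqlContext, table="mytempTable"):
    """Return (min_id, max_id) for `table` using one query instead of two."""
    query = (
        "select nvl(min(id), 0) as minval, nvl(max(id), 0) as maxval "
        "from {}".format(table)
    )
    # A global aggregate returns exactly one row; read both columns from it.
    row = sqlContext.sql(query).collect()[0].asDict()
    return row["minval"], row["maxval"]
```

It would be called as min_id, max_id = fetch_id_bounds(sqlContext).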

Now I combine all these values into one string like below.

test = ("{},{},{}".format(date,min_id,max_id))

I found that test is not a data frame but a plain string (str):

>>> type(test)
<type 'str'>

Now I want to save test as a file in HDFS. I would also like to append data to the same file in HDFS.

How can I do that using PySpark?

FYI, I am using Spark 1.6 and don't have access to the Databricks spark-csv package.

Answer 1:

Here you go: you just need to concatenate your values with concat_ws and write the result as text:

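# date is a Python variable, not a column of mytempTable,
# so it is inlined into the query as a SQL string literal.
query = """select concat_ws(',', '{}', nvl(min(id), 0), nvl(max(id), 0))
from mytempTable""".format(date)

# write is a property, not a method: choose the format with .format("text").
sqlContext.sql(query).write.format("text").mode("append").save("/tmp/fooo")

Or, even better: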

from pyspark.sql import functions as f

(sqlContext
    .table("mytempTable")  # same spelling as in registerTempTable; the lookup may be case-sensitive
    .select(f.concat_ws(",", f.first(f.lit(date)), f.min("id"), f.max("id")))
    .coalesce(1)
    .write.format("text").mode("append").save("/tmp/fooo"))
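
If the line has already been built in Python, like test in the question, one possible variant is to wrap it in a one-row data frame and reuse the same text writer. This is only a sketch: append_line and read_lines are hypothetical helper names, and sqlContext and sc are assumed to be the existing SQLContext and SparkContext.

```python
def append_line(sqlContext, line, path):
    """Append one line of text under `path`, a directory of part files."""
    # The text writer expects a single string column, so wrap the line
    # in a one-row, one-column DataFrame.
    df = sqlContext.createDataFrame([(line,)], ["value"])
    df.coalesce(1).write.format("text").mode("append").save(path)


def read_lines(sc, path):
    """Read back every line from all part files under `path`."""
    return sc.textFile(path).collect()
```

It would be called as append_line(sqlContext, test, "/tmp/fooo"). Note that mode("append") adds a new part file to the /tmp/fooo directory on every run rather than extending a single file; reading the directory back, as in read_lines(sc, "/tmp/fooo"), returns the lines from all of those part files.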