Upload Pandas DataFrame to GCP Bucket for Dataproc

2019-07-31 03:12发布

问题:

This question already has an answer here:

  • Save pandas data frame as csv on to gcloud storage bucket 2 answers

I have been working on Spark Cluster using Data Proc google cloud services for Machine Learning Modelling. I have been successful to load the data from the Google Storage bucket. However, I am not sure how to write the panda's data frame and spark data frame to the cloud storage bucket as csv.

When I use the below command it gives me an error

df.to_csv("gs://mybucket/")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/conda/lib/python3.6/site-packages/pandas/core/frame.py", line 1745, in to_csv
formatter.save()
File "/opt/conda/lib/python3.6/site-packages/pandas/io/formats/csvs.py", line 156, in save
compression=self.compression)
File "/opt/conda/lib/python3.6/site-packages/pandas/io/common.py", line 400, in _get_handle
f = open(path_or_buf, mode, encoding=encoding)
FileNotFoundError: [Errno 2] No such file or directory: 'gs://dataproc-78f5e64b-a26d-4fe4-bcf9-e1b894db9d8f-au-southeast1/trademe_xmas.csv'

FileNotFoundError: [Errno 2] No such file or directory: 'gs://mybucket/'

however the following command work but I am not sure where it is saving the file

df.to_csv("data.csv")

I also followed the below article and it gives the following error Write a Pandas DataFrame to Google Cloud Storage or BigQuery

import google.datalab.storage as storage
ModuleNotFoundError: No module named 'google.datalab'

I am relatively new to Google Cloud Data Proc and Spark and I was hoping if someone can help me understand how can I save my output pandas data frame to gcloud bucket

Thanks in Advance !!

########For Igor as Requested
from pyspark.ml.classification import RandomForestClassifier as RF
rf = RF(labelCol='label', featuresCol='features',numTrees=200)
fit = rf.fit(trainingData)
transformed = fit.transform(testData)

from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric
results = transformed.select(['probability', 'label'])


#Decile Creation for the Output
test = results.toPandas()
test['X0'] = test.probability.str[0]
test['X1'] = test.probability.str[1]
test = test.drop(columns=['probability'])
test = test.sort_values(by='X1', ascending=False)
test['rownum'] = test.reset_index().index
x = round(test['rownum'].count() / 10)
test['rank'] = (test.rownum - 1)//x + 1

回答1:

The easiest should be to convert Pandas DataFrame to Spark DataFrame and write it to GCS.

Here's instructions on how to do this: https://stackoverflow.com/a/45495969/3227693