Convert csv to parquet file using python

2020-02-24 13:26发布

问题:

I am trying to convert a .csv file to a .parquet file.
The csv file (Temp.csv) has the following format

1,Jon,Doe,Denver

I am using the following python code to convert it into parquet

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import os

if __name__ == "__main__":
    sc = SparkContext(appName="CSV2Parquet")
    sqlContext = SQLContext(sc)

    schema = StructType([
            StructField("col1", IntegerType(), True),
            StructField("col2", StringType(), True),
            StructField("col3", StringType(), True),
            StructField("col4", StringType(), True)])
    dirname = os.path.dirname(os.path.abspath(__file__))
    csvfilename = os.path.join(dirname,'Temp.csv')    
    rdd = sc.textFile(csvfilename).map(lambda line: line.split(","))
    df = sqlContext.createDataFrame(rdd, schema)
    parquetfilename = os.path.join(dirname,'output.parquet')    
    df.write.mode('overwrite').parquet(parquetfilename)

The result is only a folder named, output.parquet and not a parquet file that I'm looking for, followed by the following error on the console.

I have also tried running the following code to face a similar issue.

from pyspark.sql import SparkSession
import os

spark = SparkSession \
    .builder \
    .appName("Protob Conversion to Parquet") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# read csv
dirname = os.path.dirname(os.path.abspath(__file__))
csvfilename = os.path.join(dirname,'Temp.csv')    
df = spark.read.csv(csvfilename)

# Displays the content of the DataFrame to stdout
df.show()
parquetfilename = os.path.join(dirname,'output.parquet')    
df.write.mode('overwrite').parquet(parquetfilename)

How to best do it? Using windows, python 2.7.

回答1:

Using the packages pyarrow and pandas you can convert CSVs to Parquet without using a JVM in the background:

import pandas as pd
df = pd.read_csv('example.csv')
df.to_parquet('output.parquet')

One limitation in which you will run is that pyarrow is only available for Python 3.5+ on Windows. Either use Linux/OSX to run the code as Python 2 or upgrade your windows setup to Python 3.6.



回答2:

import boto3
import pandas as pd
import pyarrow as pa
from s3fs import S3FileSystem
import pyarrow.parquet as pq

s3 = boto3.client('s3',region_name='us-east-2')
obj = s3.get_object(Bucket='ssiworkoutput', Key='file_Folder/File_Name.csv')
df = pd.read_csv(obj['Body'])

table = pa.Table.from_pandas(df)

output_file = "s3://ssiworkoutput/file/output.parquet"  # S3 Path need to mention
s3 = S3FileSystem()

pq.write_to_dataset(table=table,
                    root_path=output_file,partition_cols=['Year','Month'],
                    filesystem=s3)

print("File converted from CSV to parquet completed")


回答3:

You can write as a PARQUET FILE using spark:

spark = SparkSession.builder.appName("Test_Parquet").master("local[*]").getOrCreate()

parquetDF = spark.read.csv("data.csv")

parquetDF.coalesce(1).write.mode("overwrite").parquet("Parquet")

I hope this helps