Load CSV file with Spark

Posted 2019-01-01 10:56

Question:

I'm new to Spark and I'm trying to read CSV data from a file with Spark. Here's what I am doing:

sc.textFile('file.csv')
    .map(lambda line: (line.split(',')[0], line.split(',')[1]))
    .collect()

I would expect this call to give me a list of the first two columns of my file, but I'm getting this error:

File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range

although my CSV file has more than one column.

Answer 1:

Are you sure that all the lines have at least two columns? Just to check, can you try something like this:

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line) > 1) \
    .map(lambda line: (line[0], line[1])) \
    .collect()

Alternatively, you could print the culprit (if any):

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line) <= 1) \
    .collect()


Answer 2:

Spark 2.0.0+

You can use the built-in CSV data source directly:

spark.read.csv(
    "some_input_file.csv", header=True, mode="DROPMALFORMED", schema=schema
)

or

(spark.read
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv("some_input_file.csv"))

without including any external dependencies.

Spark < 2.0.0:

Instead of manual parsing, which is far from trivial in the general case, I would recommend spark-csv:

Make sure that Spark CSV is included in the path (--packages, --jars, --driver-class-path)
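
For example, one common way to include it is to pass the package coordinates when starting the shell (the version shown here is illustrative and must match your Spark/Scala build):

bin/pyspark --packages com.databricks:spark-csv_2.11:1.5.0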

And load your data as follows:

df = (sqlContext
    .read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))

It can handle loading, schema inference, dropping malformed lines and doesn't require passing data from Python to the JVM.

Note:

If you know the schema, it is better to avoid schema inference and pass it to DataFrameReader. Assuming you have three columns - integer, double and string:

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

schema = StructType([
    StructField("A", IntegerType()),
    StructField("B", DoubleType()),
    StructField("C", StringType())
])

(sqlContext
    .read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))


Answer 3:

Simply splitting by comma will also split commas that are within fields (e.g. a,b,"1,2,3",c), so it's not recommended. zero323's answer is good if you want to use the DataFrames API, but if you want to stick to base Spark, you can parse CSVs in base Python with the csv module:

# works for both python 2 and 3
import csv
rdd = sc.textFile(\"file.csv\")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))

EDIT: As @muon mentioned in the comments, this will treat the header like any other row, so you'll need to extract it manually. For example, header = rdd.first(); rdd = rdd.filter(lambda x: x != header) (make sure not to modify header before the filter evaluates); a short sketch of this follows. But at this point, you're probably better off using a built-in csv parser.
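
A minimal sketch of that header handling, combined with the csv.reader approach above (reusing the file.csv example):

import csv

rdd = sc.textFile("file.csv").mapPartitions(lambda part: csv.reader(part))

# Take the first parsed row as the header, then drop every row equal to it.
header = rdd.first()
data = rdd.filter(lambda row: row != header)

print(data.take(5))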



Answer 4:

Yet another option is to read the CSV file using Pandas and then import the Pandas DataFrame into Spark.

For example:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local', 'example')  # if using locally
sql_sc = SQLContext(sc)

pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
# pandas_df = pd.read_csv('file.csv', names=['column 1', 'column 2'])  # if no header
s_df = sql_sc.createDataFrame(pandas_df)


Answer 5:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.read.csv("/home/stp/test1.csv", header=True, sep="|")

print(df.collect())


Answer 6:

This is in line with what JP Mercier initially suggested about using Pandas, but with a major modification: if you read data into Pandas in chunks, it should be more malleable. Meaning, you can parse a much larger file than Pandas could handle as a single piece and pass it to Spark in smaller pieces. (This also answers the comment about why one would want to use Spark if they can load everything into Pandas anyway.)

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local', 'example')  # if using locally
sql_sc = SQLContext(sc)

Spark_Full = sc.emptyRDD()
chunk_100k = pd.read_csv("Your_Data_File.csv", chunksize=100000)
# if you have headers in your csv file:
headers = list(pd.read_csv("Your_Data_File.csv", nrows=0).columns)

for chunky in chunk_100k:
    Spark_Full += sc.parallelize(chunky.values.tolist())

YourSparkDataFrame = Spark_Full.toDF(headers)
# if you do not have headers, leave empty instead:
# YourSparkDataFrame = Spark_Full.toDF()
YourSparkDataFrame.show()


Answer 7:

Now, there's also another option for any general CSV file: https://github.com/seahboonsiew/pyspark-csv. It can be used as follows.

Assume we have the following context:

sc = SparkContext
sqlCtx = SQLContext or HiveContext

First, distribute pyspark-csv.py to executors using SparkContext

import pyspark_csv as pycsv
sc.addPyFile('pyspark_csv.py')

Read csv data via SparkContext and convert it to DataFrame

plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd)


Answer 8:

If your CSV data happens to not contain newlines in any of the fields, you can load your data with textFile() and parse it:

import csv
import StringIO

def loadRecord(line):
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["name1", "name2"])
    return reader.next()

input = sc.textFile(inputFile).map(loadRecord)
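
The snippet above is written for Python 2 (the StringIO module and reader.next()). A roughly equivalent Python 3 sketch, with the same illustrative field names, would be:

import csv
import io

def loadRecord(line):
    # Parse one CSV line into a dict keyed by the given field names.
    f = io.StringIO(line)
    reader = csv.DictReader(f, fieldnames=["name1", "name2"])
    return next(reader)

records = sc.textFile(inputFile).map(loadRecord)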


Answer 9:

If you want to load the CSV as a DataFrame, you can do the following:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferSchema='true') \
    .load('sampleFile.csv')  # this is your csv file

It worked fine for me.



Answer 10:

import pandas as pd

data1 = pd.read_csv("test1.csv")
data2 = pd.read_csv("train1.csv")
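
This only reads the files into Pandas; presumably, as in the earlier Pandas-based answers, the resulting DataFrames would then be handed to Spark, for example (a sketch assuming an existing SparkSession named spark):

# Convert the Pandas DataFrames into Spark DataFrames.
df1 = spark.createDataFrame(data1)
df2 = spark.createDataFrame(data2)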