I have csv data and created Pandas dataframe using read_csv and forcing all columns as string.
Then when I try to create Spark dataframe from the Pandas dataframe, I get the error message below.
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
z=pd.read_csv("mydata.csv", dtype=str)
z.info()
<class 'pandas.core.frame.DataFrame'>
Int64Index: 74044003 entries, 0 to 74044002
Data columns (total 12 columns):
primaryid object
event_dt object
age object
age_cod object
age_grp object
sex object
occr_country object
drug_seq object
drugname object
route object
outc_cod object
pt object
q= sqlContext.createDataFrame(z)
File "<stdin>", line 1, in <module>
File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/context.py", line 425, in createDataFrame
rdd, schema = self._createFromLocal(data, schema)
File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/context.py", line 341, in _createFromLocal
struct = self._inferSchemaFromList(data)
File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/context.py", line 241, in _inferSchemaFromList
schema = reduce(_merge_type, map(_infer_schema, data))
File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/types.py", line 862, in _merge_type
for f in a.fields]
File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/types.py", line 856, in _merge_type
raise TypeError("Can not merge type %s and %s" % (type(a), type(b)))
TypeError: Can not merge type <class 'pyspark.sql.types.DoubleType'> and <class 'pyspark.sql.types.StringType'>
Here is an example. I am downloading public data and creating pandas dataframe but spark does not create spark dataframe from the pandas dataframe.
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
url ="http://www.nber.org/fda/faers/2016/demo2016q1.csv.zip"
import requests, zipfile, StringIO
r = requests.get(url, stream=True)
z = zipfile.ZipFile(StringIO.StringIO(r.content))
z.extractall()
z=pd.read_csv("demo2016q1.csv") # creates pandas dataframe
Data_Frame = sqlContext.createDataFrame(z)
Long story short don't depend on schema inference. It is expensive and tricky in general. In particular some columns (for example event_dt_num
) in your data have missing values which pushes Pandas to represent them as mixed types (string for not missing, NaN for missing values).
If you're in doubt it is better to read all data as strings and cast afterwards. If you have access to code book you should always provide schema to avoid problems and reduce overall cost.
Finally passing data from the driver is anti-pattern. You should be able to read this data directly using csv
format (Spark 2.0.0+) or spark-csv
library (Spark 1.6 and below):
df = (spark.read.format("csv").options(header="true")
.load("/path/tp/demo2016q1.csv"))
## root
## |-- primaryid: string (nullable = true)
## |-- caseid: string (nullable = true)
## |-- caseversion: string (nullable = true)
## |-- i_f_code: string (nullable = true)
## |-- i_f_code_num: string (nullable = true)
## ...
## |-- to_mfr: string (nullable = true)
## |-- occp_cod: string (nullable = true)
## |-- reporter_country: string (nullable = true)
## |-- occr_country: string (nullable = true)
## |-- occp_cod_num: string (nullable = true)
In this particular case adding inferSchema="true"
option should work as well but it is still better to avoid it. You can also provide schema as follows:
from pyspark.sql.types import StructType
schema = StructType.fromJson({'fields': [{'metadata': {},
'name': 'primaryid',
'nullable': True,
'type': 'integer'},
{'metadata': {}, 'name': 'caseid', 'nullable': True, 'type': 'integer'},
{'metadata': {}, 'name': 'caseversion', 'nullable': True, 'type': 'integer'},
{'metadata': {}, 'name': 'i_f_code', 'nullable': True, 'type': 'string'},
{'metadata': {},
'name': 'i_f_code_num',
'nullable': True,
'type': 'integer'},
{'metadata': {}, 'name': 'event_dt', 'nullable': True, 'type': 'integer'},
{'metadata': {}, 'name': 'event_dt_num', 'nullable': True, 'type': 'string'},
{'metadata': {}, 'name': 'mfr_dt', 'nullable': True, 'type': 'integer'},
{'metadata': {}, 'name': 'mfr_dt_num', 'nullable': True, 'type': 'string'},
{'metadata': {}, 'name': 'init_fda_dt', 'nullable': True, 'type': 'integer'},
{'metadata': {},
'name': 'init_fda_dt_num',
'nullable': True,
'type': 'string'},
{'metadata': {}, 'name': 'fda_dt', 'nullable': True, 'type': 'integer'},
{'metadata': {}, 'name': 'fda_dt_num', 'nullable': True, 'type': 'string'},
{'metadata': {}, 'name': 'rept_cod', 'nullable': True, 'type': 'string'},
{'metadata': {},
'name': 'rept_cod_num',
'nullable': True,
'type': 'integer'},
{'metadata': {}, 'name': 'auth_num', 'nullable': True, 'type': 'string'},
{'metadata': {}, 'name': 'mfr_num', 'nullable': True, 'type': 'string'},
{'metadata': {}, 'name': 'mfr_sndr', 'nullable': True, 'type': 'string'},
{'metadata': {}, 'name': 'lit_ref', 'nullable': True, 'type': 'string'},
{'metadata': {}, 'name': 'age', 'nullable': True, 'type': 'double'},
{'metadata': {}, 'name': 'age_cod', 'nullable': True, 'type': 'string'},
{'metadata': {}, 'name': 'age_grp', 'nullable': True, 'type': 'string'},
{'metadata': {}, 'name': 'age_grp_num', 'nullable': True, 'type': 'string'},
{'metadata': {}, 'name': 'sex', 'nullable': True, 'type': 'string'},
{'metadata': {}, 'name': 'e_sub', 'nullable': True, 'type': 'string'},
{'metadata': {}, 'name': 'wt', 'nullable': True, 'type': 'double'},
{'metadata': {}, 'name': 'wt_cod', 'nullable': True, 'type': 'string'},
{'metadata': {}, 'name': 'rept_dt', 'nullable': True, 'type': 'integer'},
{'metadata': {}, 'name': 'rept_dt_num', 'nullable': True, 'type': 'string'},
{'metadata': {}, 'name': 'to_mfr', 'nullable': True, 'type': 'string'},
{'metadata': {}, 'name': 'occp_cod', 'nullable': True, 'type': 'string'},
{'metadata': {},
'name': 'reporter_country',
'nullable': True,
'type': 'string'},
{'metadata': {}, 'name': 'occr_country', 'nullable': True, 'type': 'string'},
{'metadata': {},
'name': 'occp_cod_num',
'nullable': True,
'type': 'integer'}],
'type': 'struct'})
directly to the reader:
(spark.read.schema(schema).format("csv").options(header="true")
.load("/path/to/demo2016q1.csv"))