Suppose I give three file paths to a Spark context to read, and each file has a schema in its first row. How can we skip the schema lines from the headers?
val rdd = sc.textFile("file1,file2,file3")
Now, how can we skip the header lines from this RDD?
If there were just one header line in the first record, then the most efficient way to filter it out would be:
rdd.mapPartitionsWithIndex {
  (idx, iter) => if (idx == 0) iter.drop(1) else iter
}
This doesn't help, of course, if there are many files, each with its own header line inside. In that case you can load each file separately, drop its header this way, and union the three RDDs you get (see the sketch below).
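A minimal sketch of that per-file approach, assuming the hypothetical paths file1, file2 and file3 from the question:
// Drop the first line of each file's own RDD, then union the results.
// file1/file2/file3 are the hypothetical paths from the question.
val paths = Seq("file1", "file2", "file3")
val withoutHeaders = paths.map { path =>
  sc.textFile(path).mapPartitionsWithIndex { (idx, iter) =>
    if (idx == 0) iter.drop(1) else iter  // the header sits in the first partition
  }
}
val combined = sc.union(withoutHeaders)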
You could also just write a filter that matches only a line that could be a header. This is quite simple, but less efficient; for example:
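A rough sketch, assuming the header is the literal text of the first row (here a hypothetical "col1,col2,col3"):
// Less efficient: every line is compared against the known header text.
// "col1,col2,col3" is a hypothetical header; substitute your actual first row.
val header = "col1,col2,col3"
val noHeader = rdd.filter(line => line != header)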
A Python equivalent of the mapPartitionsWithIndex approach:
from itertools import islice
rdd.mapPartitionsWithIndex(
    lambda idx, it: islice(it, 1, None) if idx == 0 else it
)
data = sc.textFile('path_to_data')
header = data.first()  # extract header
data = data.filter(lambda row: row != header)  # filter out header
In Spark 2.0 a CSV reader is built into Spark, so you can easily load a CSV file as follows:
spark.read.option("header", "true").csv("filePath")
From Spark 2.0 onwards, you can use SparkSession to get this done as a one-liner:
val spark = SparkSession.builder.config(conf).getOrCreate()
and then as @SandeepPurohit said:
val dataFrame = spark.read.format("CSV").option("header", "true").load(csvfilePath)
I hope this solves your question!
P.S.: SparkSession is the new entry point introduced in Spark 2.0 and can be found in the org.apache.spark.sql package.
You could load each file separately, filter it with file.zipWithIndex().filter(_._2 > 0), and then union all of the file RDDs (a sketch follows below).
If the number of files is too large, the union could throw a StackOverflowError.
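A minimal sketch of that per-file zipWithIndex approach, again assuming the hypothetical paths file1, file2 and file3:
// Each file is loaded as its own RDD, so index 0 is always that file's header line.
val files = Seq("file1", "file2", "file3")
val allButHeaders = sc.union(
  files.map(path => sc.textFile(path).zipWithIndex().filter(_._2 > 0).map(_._1))
)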
In PySpark you can use a DataFrame and set header to True:
df = spark.read.csv(dataPath, header=True)
Use the filter() method in PySpark, filtering out lines that start with the first column name, to remove the header:
# Read the file (change the format for other file formats)
contentRDD = sc.textFile(<filepath>)
# Filter out lines starting with the first column name of the header
filterDD = contentRDD.filter(lambda l: not l.startswith(<first column name>))
# Check your result
for i in filterDD.take(5): print(i)
It's an option that you pass to the read command:
val context = new org.apache.spark.sql.SQLContext(sc)
var data = context.read.option("header", "true").csv("<path>")
Alternatively, you can use the spark-csv package (or in Spark 2.0 this is more or less available natively as CSV). Note that this expects the header on each file (as you desire):
from pyspark.sql.types import StructType, StructField, DoubleType

schema = StructType([
    StructField('lat', DoubleType(), True),
    StructField('lng', DoubleType(), True)])

df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true',
             delimiter="\t",
             treatEmptyValuesAsNulls=True,
             mode="DROPMALFORMED") \
    .load(input_file, schema=schema)
Working in 2018 (Spark 2.3)
Python
df = spark.read.option("header", "true").format("csv").schema(myManualSchema).load("maestraDestacados.csv")
Scala
val myDf = spark.read.option("header", "true").format("csv").schema(myManualSchema).load("maestraDestacados.csv")
P.S.: myManualSchema is a predefined schema I wrote; you can skip that part of the code (a sketch of such a schema is shown below).
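Purely as an illustration, a manually defined schema in Scala could look like the following; the column names and types are hypothetical placeholders, not the real columns of maestraDestacados.csv:
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

// Hypothetical columns; replace them with the actual columns of your file.
val myManualSchema = StructType(Seq(
  StructField("id", StringType, nullable = true),
  StructField("score", DoubleType, nullable = true)
))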
// Find the header of each file lying in the directory
import java.io.{BufferedReader, InputStreamReader}

val fileNameHeader = sc.binaryFiles("E:\\sss\\*.txt", 1).map {
  case (fileName, stream) =>
    val header = new BufferedReader(new InputStreamReader(stream.open())).readLine()
    (fileName, header)
}.collect().toMap

val fileNameHeaderBr = sc.broadcast(fileNameHeader)

// Now let's skip the header. mapPartitions ensures the header
// can only be the first line of the partition.
sc.textFile("E:\\sss\\*.txt", 1).mapPartitions(iter =>
  if (iter.hasNext) {
    val firstLine = iter.next()
    println(s"Comparing with firstLine $firstLine")
    if (firstLine == fileNameHeaderBr.value.head._2)
      new WrappedIterator(null, iter)
    else
      new WrappedIterator(firstLine, iter)
  }
  else {
    iter
  }
).collect().foreach(println)

// Iterator wrapper that re-emits a first line that was consumed above, if it was not a header
class WrappedIterator(firstLine: String, iter: Iterator[String]) extends Iterator[String] {
  var isFirstIteration = true

  override def hasNext: Boolean = {
    if (isFirstIteration && firstLine != null) {
      true
    }
    else {
      iter.hasNext
    }
  }

  override def next(): String = {
    if (isFirstIteration) {
      println(s"For the first time $firstLine")
      isFirstIteration = false
      if (firstLine != null) {
        firstLine
      }
      else {
        println(s"Every time $firstLine")
        iter.next()
      }
    }
    else {
      iter.next()
    }
  }
}
For Python developers: I have tested this with Spark 2.0. Let's say you want to remove the first 14 rows.
sc = spark.sparkContext
lines = sc.textFile("s3://folder_location_of_csv/")
parts = lines.map(lambda l: l.split(","))
parts.zipWithIndex().filter(lambda tup: tup[1] > 14).map(lambda x: x[0])
withColumn is a DataFrame function, so the following will not work in the RDD style used above:
parts.withColumn("index", monotonically_increasing_id()).filter(index > 14)