Load a local file into Spark using sc.textFile

Published 2019-02-11 05:29

Question

How do I load a file from the local file system into Spark using sc.textFile? Do I need to change any environment variables? I also got the same error when I tried this on Windows, where Hadoop is not installed.

Code

> val inputFile = sc.textFile("file///C:/Users/swaapnika/Desktop/to do list")
/17 22:28:18 INFO MemoryStore: ensureFreeSpace(63280) called with curMem=0, maxMem=278019440
/17 22:28:18 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 61.8 KB, free 265.1 MB)
/17 22:28:18 INFO MemoryStore: ensureFreeSpace(19750) called with curMem=63280, maxMem=278019440
/17 22:28:18 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 19.3 KB, free 265.1 MB)
/17 22:28:18 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:53659 (size: 19.3 KB, free: 265.1 MB)
/17 22:28:18 INFO SparkContext: Created broadcast 0 from textFile at <console>:21
inputFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21

> val words = input.flatMap(line => line.split(" "))
<console>:19: error: not found: value input
  val words = input.flatMap(line => line.split(" "))
              ^

> val words = inputFile.flatMap(line => line.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:23

> val counts = words.map(word => (word, 1)).reduceByKey{case (x, y) => x + y}

Error

apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/c:/spark-1.4.1-bin-hadoop2.6/bin/file/C:/Users/swaapnika/Desktop/to do list
   at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
   at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
   at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
   at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
   at scala.Option.getOrElse(Option.scala:120)
   at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
   at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
   at scala.Option.getOrElse(Option.scala:120)
   at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
   at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
   at scala.Option.getOrElse(Option.scala:120)
   at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
   at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
   at scala.Option.getOrElse(Option.scala:120)
   at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
   at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:65)
   at org.apache.spark.rdd.PairRDDFunctions$$anonfun$reduceByKey$3.apply(PairRDDFunctions.scala:290)
   at org.apache.spark.rdd.PairRDDFunctions$$anonfun$reduceByKey$3.apply(PairRDDFunctions.scala:290)
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
   at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
   at org.apache.spark.rdd.PairRDDFunctions.reduceByKey(PairRDDFunctions.scala:289)
   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25)
   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
   at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
   at $iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
   at $iwC$$iwC$$iwC.<init>(<console>:38)
   at $iwC$$iwC.<init>(<console>:40)
   at $iwC.<init>(<console>:42)
   at <init>(<console>:44)
   at .<init>(<console>:48)
   at .<clinit>(<console>)
   at .<init>(<console>:7)
   at .<clinit>(<console>)
   at $print(<console>)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:497)
   at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
   at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
   at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
   at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
   at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
   at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
   at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
   at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
   at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
   at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
   at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
   at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
   at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
   at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
   at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
   at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
   at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
   at org.apache.spark.repl.Main$.main(Main.scala:31)
   at org.apache.spark.repl.Main.main(Main.scala)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:497)
   at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
   at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
   at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


>

4 Answers
成全新的幸福
#2 · 2019-02-11 06:05

This error happens when you run Spark on a cluster. When you submit a job to a Spark cluster, the cluster manager (YARN, Mesos, or whatever you use) hands it to a worker node. When the worker node tries to resolve the path of the file to be loaded into Spark, it fails because the worker does not have that file. So try running spark-shell in local mode and try again:

\bin\spark-shell --master local

sc.textFile("file:///C:/Users/swaapnika/Desktop/to do list")

Let me know if this helps.
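For reference, a minimal end-to-end session in local mode might look like the following sketch; the path is the one from the question, so adjust it to your machine:

// started with: bin\spark-shell --master local
val inputFile = sc.textFile("file:///C:/Users/swaapnika/Desktop/to do list")  // note "file:" plus three slashes
val words  = inputFile.flatMap(line => line.split(" "))
val counts = words.map(word => (word, 1)).reduceByKey(_ + _)
// actions trigger the actual read, so path problems only surface here
counts.collect().foreach(println)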

成全新的幸福
#3 · 2019-02-11 06:06

I checked all the dependencies and the environment variables again. A path of the form "file:///home/..../.. .txt" fetches the data from the local file system, because the Hadoop configuration sets the default file system via fs.defaultFS. If we leave spark-env.sh at its defaults, Spark reads from the local file system when it sees a "file://..." path and from HDFS when the path is "hdfs://...". If you specifically need another file system, export HADOOP_CONF_DIR in spark-env.sh, and Spark will then work with any file system Hadoop supports. This was my observation; corrections or suggestions are welcome. Thank you.
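As a sketch of that observation, you can check which default file system the shell picked up and be explicit about the scheme; the hdfs://namenode:8020 address below is only a placeholder:

// fs.defaultFS comes from the Hadoop configuration visible to Spark
println(sc.hadoopConfiguration.get("fs.defaultFS"))

// being explicit about the scheme avoids relying on the default
val localRdd = sc.textFile("file:///C:/Users/swaapnika/Desktop/to do list")
val hdfsRdd  = sc.textFile("hdfs://namenode:8020/user/swaapnika/todo.txt")  // placeholder address and path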

叛逆
#4 · 2019-02-11 06:18

The file path you have defined is incorrect.

Try changing

sc.textFile("file///C:/Users/swaapnika/Desktop/to do list")

to

sc.textFile("file://C:/Users/swaapnika/Desktop/to do list")

or

sc.textFile("C:/Users/swaapnika/Desktop/to do list") 
走好不送
#5 · 2019-02-11 06:29

Try changing

val inputFile = sc.textFile("file///C:/Users/swaapnika/Desktop/to do list")

to this:

val inputFile = sc.textFile("file:///Users/swaapnika/Desktop/to do list")

I'm also fairly new to Hadoop and Spark, but from what I gather, when running Spark locally on Windows, the string file:/// passed to sc.textFile already refers to C:\.
