Spark 2.0.1 write error: Caused by: java.util.NoSuchElementException

Posted 2019-06-08 20:15

I am trying to attach a sentiment value to each message, and I have downloaded all of the Stanford CoreNLP jar files as dependencies:

import sqlContext.implicits._
import com.databricks.spark.corenlp.functions._
import org.apache.spark.sql.functions._

val version = "3.6.0"
val model = s"stanford-corenlp-$version-models-english" //
val jars = sc.listJars
if (!jars.exists(jar => jar.contains(model))) {
import scala.sys.process._
s"wget http://repo1.maven.org/maven2/edu/stanford/nlp/stanford-         
corenlp/$version/$model.jar -O /tmp/$model.jar".!!
sc.addJar(s"/tmp/$model.jar")}

val all_messages = spark.read.parquet("/home/ubuntu/messDS.parquet")

case class AllMessSent(user_id: Int, sent_at: java.sql.Timestamp, message: String)

val messDS = all_messages.as[AllMessSent]

Up to this point everything is fine: I can perform computations on this Dataset and save it.
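
For example, a plain write of the Dataset at this stage succeeds (the checkpoint path below is just an illustration, not taken from the question):

// Writing the Dataset before adding sentiment works without error.
// The output path here is hypothetical.
messDS.write.parquet("/home/ubuntu/messDS_checkpoint.parquet")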

case class AllMessSentiment(user_id: Int, sent_at: java.sql.Timestamp, message: String, sentiment: Int)

val output = messDS
  .select('user_id, 'message, 'sent_at, sentiment('message).as('sentiment))
  .as[AllMessSentiment]

output.write.parquet("/home/ubuntu/AllMessSent.parquet")

I can display the results with output.show(truncate = false) and see the sentiment score, but when writing to CSV or Parquet the error below is thrown. Does anyone know how to solve it?

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 9, localhost): java.util.NoSuchElementException
at java.util.ArrayList$Itr.next(ArrayList.java:854)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
at scala.collection.IterableLike$class.head(IterableLike.scala:107)
at scala.collection.AbstractIterable.head(Iterable.scala:54)
at com.databricks.spark.corenlp.functions$$anonfun$sentiment$1.apply(functions.scala:163)
at com.databricks.spark.corenlp.functions$$anonfun$sentiment$1.apply(functions.scala:158)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.NoSuchElementException
at java.util.ArrayList$Itr.next(ArrayList.java:854)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
at scala.collection.IterableLike$class.head(IterableLike.scala:107)
at scala.collection.AbstractIterable.head(Iterable.scala:54)
at com.databricks.spark.corenlp.functions$$anonfun$sentiment$1.apply(functions.scala:163)
at com.databricks.spark.corenlp.functions$$anonfun$sentiment$1.apply(functions.scala:158)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:253)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258)
... 8 more

1 Answer

疯言疯语 · 2019-06-08 20:47

I was able to run the algorithm once all messages were split into sentences and cleaned of special characters and empty strings.
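
A minimal sketch of that preprocessing, assuming the messDS Dataset and AllMessSentiment case class from the question; the regular expression and the use of spark-corenlp's ssplit together with explode are my own illustration of the approach, not the answerer's exact code:

import org.apache.spark.sql.functions._
import com.databricks.spark.corenlp.functions._
import sqlContext.implicits._

// Strip special characters (hypothetical regex, adjust as needed) and drop
// messages that end up blank, so the sentiment UDF never sees an empty
// document -- an empty document is what makes the `head` call at
// functions.scala:163 throw java.util.NoSuchElementException.
val cleaned = messDS
  .withColumn("message", trim(regexp_replace('message, "[^\\p{L}\\p{N}\\s.,!?']+", " ")))
  .filter(length('message) > 0)

// Split each message into sentences (ssplit comes from spark-corenlp),
// drop blank sentences, and score each sentence individually.
val output = cleaned
  .select('user_id, 'sent_at, explode(ssplit('message)).as("message"))
  .filter(length(trim('message)) > 0)
  .select('user_id, 'sent_at, 'message, sentiment('message).as("sentiment"))
  .as[AllMessSentiment]

output.write.parquet("/home/ubuntu/AllMessSent.parquet")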
