I am trying to attach a sentiment value to each message, and I have downloaded all the Stanford CoreNLP jar files as dependencies:
import sqlContext.implicits._
import com.databricks.spark.corenlp.functions._
import org.apache.spark.sql.functions._
val version = "3.6.0"
// Base name of the CoreNLP English models jar for the chosen version.
val model = s"stanford-corenlp-$version-models-english"
val jars = sc.listJars
// Download and register the models jar only when no jar with that name is
// already attached to the SparkContext.
if (!jars.exists(jar => jar.contains(model))) {
  import scala.sys.process._
  // The URL has to be one unbroken literal: a line break inside it is both a
  // syntax error and a corrupt path. Maven Central is served over HTTPS.
  val modelUrl =
    s"https://repo1.maven.org/maven2/edu/stanford/nlp/stanford-corenlp/$version/$model.jar"
  val modelPath = s"/tmp/$model.jar"
  // The Seq form hands each argument to wget as-is (no whitespace splitting);
  // `!!` throws if wget exits with a non-zero status.
  Seq("wget", modelUrl, "-O", modelPath).!!
  sc.addJar(modelPath)
}
val all_messages = spark.read.parquet("/home/ubuntu/messDS.parquet")
// Typed row for the messages Dataset; field names must match the parquet
// column names for `.as[...]` to resolve them.
case class AllMessSent(user_id: Int, sent_at: java.sql.Timestamp, message: String)
// The encoder type has to be the case class declared above (it was referenced
// under a different, undefined name).
val messDS = all_messages.as[AllMessSent]
Up to this point everything is fine: I can perform computations on that Dataset and save it.
// Typed row for the result: the original message columns plus its sentiment score.
case class AllMessSentiment(
  user_id: Int,
  sent_at: java.sql.Timestamp,
  message: String,
  sentiment: Int)

val output = messDS
  // Keep only messages with at least one non-whitespace character (null
  // messages are dropped too). The reported NoSuchElementException comes from
  // `sentiment` taking `.head` of an empty sentence list, which presumably
  // happens for blank messages. `show` only evaluates the first rows, whereas
  // a write evaluates every row, which is why only the write fails.
  // NOTE(review): messages made up solely of characters CoreNLP discards may
  // still yield zero sentences; verify against the data.
  .filter('message.rlike("\\S"))
  .select('user_id, 'message, 'sent_at, sentiment('message).as('sentiment))
  .as[AllMessSentiment]
import java.util
output.write.parquet("/home/ubuntu/AllMessSent.parquet")
I can display the results with: output.show(truncate = false)
There I can see the sentiment score, but when writing to CSV or Parquet the error below is thrown. Does anyone know how to solve it?
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 9, localhost): java.util.NoSuchElementException
at java.util.ArrayList$Itr.next(ArrayList.java:854)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
at scala.collection.IterableLike$class.head(IterableLike.scala:107)
at scala.collection.AbstractIterable.head(Iterable.scala:54)
at com.databricks.spark.corenlp.functions$$anonfun$sentiment$1.apply(functions.scala:163)
at com.databricks.spark.corenlp.functions$$anonfun$sentiment$1.apply(functions.scala:158)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.NoSuchElementException
at java.util.ArrayList$Itr.next(ArrayList.java:854)
at java.util.ArrayList$Itr.next(ArrayList.java:854)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
at scala.collection.IterableLike$class.head(IterableLike.scala:107)
at scala.collection.AbstractIterable.head(Iterable.scala:54)
at com.databricks.spark.corenlp.functions$$anonfun$sentiment$1.apply(functions.scala:163)
at com.databricks.spark.corenlp.functions$$anonfun$sentiment$1.apply(functions.scala:158)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:253)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258)
... 8 more
I was able to run the algorithm once all messages had been split into sentences and cleaned of special characters and empty spaces.