Neo4j spark connector loadDataFrame gives error

Posted 2019-09-16 04:25

Question:

EDIT: Tried what was suggested in the comments with .toDF and got this error:

neo: org.neo4j.spark.Neo4j = org.neo4j.spark.Neo4j@5dfb65d5
warning: there was one deprecation warning; re-run with -deprecation for details
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@333e01c6
import sqlContext.implicits._
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = Neo4jRDD partitions Partitions(1,9223372036854775807,9223372036854775807,None) call spatial.withinDistance('geom', {lat:35.8954016,lon:41.5505458}, 500) yield node, distance WITH node, distance match (node:POINT) WHERE node.toDateFormatLong < 20170213 return node as n using Map()
<console>:48: error: value toDF is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
       val df = rdd.toDF()
                    ^
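
(For context: toDF() comes from Encoder-based implicits that cover case classes and tuples, not RDD[Row], so importing sqlContext.implicits._ does not add it here. Below is a minimal sketch of the usual alternative, assuming the Cypher query is changed to return a plain property such as node.toDateFormatLong instead of the node itself; the single-column schema is only illustrative.)

import org.apache.spark.sql.types.{LongType, StructField, StructType}

// rdd: RDD[Row] as returned by loadRowRdd; the column name matches the
// property used in the query, everything else here is an assumption.
val schema = StructType(Seq(StructField("toDateFormatLong", LongType, nullable = true)))
val df = sqlContext.createDataFrame(rdd, schema)
df.show(5)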

I am running this simple Scala code:

import org.neo4j.spark._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkConf



val conf = new SparkConf.setMaster("local").setAppName("neo4jspark")
val sc = new SparkContext(conf)
val neo = Neo4j(sc)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

//val rdd = neo.cypher("call spatial.withinDistance('geom', {lat:35.8954016,lon:41.5505458}, 500) yield node, distance WITH node, distance match (node:POINT) WHERE node.toDateFormatLong < 20170213 return node as n").loadRowRdd

val df = neo.cypher("call spatial.withinDistance('geom', {lat:35.8954016,lon:41.5505458}, 500) yield node, distance WITH node, distance match (node:POINT) WHERE node.toDateFormatLong < 20170213 return node as n").loadDataFrame

It gives a few errors. The conf line is an error, but loading the RDD still seems to work. Here I get that error too; the call still gives me a count of the items, but then I get the serialization errors below. I'm not sure whether there are steps or things I am missing when running off this sample:

https://blog.knoldus.com/2016/10/05/neo4j-with-scala-awesome-experience-with-spark/
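
(For what it's worth, the "not found: value SparkConf" error looks like a parsing issue: without constructor parentheses, new SparkConf.setMaster(...) is read as instantiating a type named SparkConf.setMaster. A minimal sketch of the setup with the parentheses spelled out, assuming a standalone script; in spark-shell sc already exists and this block can be skipped:)

import org.apache.spark.{SparkConf, SparkContext}
import org.neo4j.spark._

// Constructor parentheses are required before chaining the builder calls.
val conf = new SparkConf().setMaster("local").setAppName("neo4jspark")
val sc   = new SparkContext(conf)
val neo  = Neo4j(sc)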

Loading neo4jspark.scala...
import org.neo4j.spark._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkConf
<console>:38: error: not found: value SparkConf
       val conf = new SparkConf.setMaster("local").setAppName("neo4jspark")
                      ^
<console>:38: error: not found: value conf
       val sc = new SparkContext(conf)
                                 ^
neo: org.neo4j.spark.Neo4j = org.neo4j.spark.Neo4j@5dfb65d5
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = Neo4jRDD partitions Partitions(1,9223372036854775807,9223372036854775807,None) MATCH (p:POINT) RETURN p using Map()
res0: Long = 53118
17/08/25 14:31:15 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.io.NotSerializableException: org.neo4j.driver.internal.InternalNode
Serialization stack:
    - object not serializable (class: org.neo4j.driver.internal.InternalNode, value: node<5>)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: org.apache.spark.sql.catalyst.expressions.GenericRow, name: values, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema, [node<5>])
    - element of array (index: 0)
    - array (class [Lorg.apache.spark.sql.Row;, size 5)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:383)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/08/25 14:31:15 ERROR TaskSetManager: Task 0.0 in stage 1.0 (TID 1) had a not serializable result: org.neo4j.driver.internal.InternalNode
Serialization stack:
    - object not serializable (class: org.neo4j.driver.internal.InternalNode, value: node<5>)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: org.apache.spark.sql.catalyst.expressions.GenericRow, name: values, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema, [node<5>])
    - element of array (index: 0)
    - array (class [Lorg.apache.spark.sql.Row;, size 5); not retrying
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 1.0 (TID 1) had a not serializable result: org.neo4j.driver.internal.InternalNode
Serialization stack:
    - object not serializable (class: org.neo4j.driver.internal.InternalNode, value: node<5>)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: org.apache.spark.sql.catalyst.expressions.GenericRow, name: values, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema, [node<5>])
    - element of array (index: 0)
    - array (class [Lorg.apache.spark.sql.Row;, size 5)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
  at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
  ... 79 elided
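
(The serialization failures all point at org.neo4j.driver.internal.InternalNode: returning node as n puts the driver's node object into each Row, and that class is not Java-serializable, so Spark cannot ship the result. A sketch of one common workaround, returning only primitive properties from Cypher so each Row holds serializable values; the extra distance column is just illustrative:)

// Return serializable property values instead of the whole node.
val df = neo.cypher(
  """call spatial.withinDistance('geom', {lat:35.8954016,lon:41.5505458}, 500)
    |yield node, distance
    |WITH node, distance
    |MATCH (node:POINT) WHERE node.toDateFormatLong < 20170213
    |RETURN node.toDateFormatLong AS toDateFormatLong, distance
    |""".stripMargin).loadDataFrame

df.printSchema()
df.show(5)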