Cannot write/save data to Ignite directly from a S

Posted 2019-07-27 03:02

Question:

I am trying to write a DataFrame to Ignite using JDBC.

Spark version: 2.1

Ignite version: 2.3

JDK: 1.8

Scala: 2.11.8

this is my code snippet:

def WriteToIgnite(hiveDF:DataFrame,targetTable:String):Unit = {

  val conn = DataSource.conn
  var psmt:PreparedStatement = null

  try {
    OperationIgniteUtil.deleteIgniteData(conn,targetTable)

    hiveDF.foreachPartition({
      partitionOfRecords => {
        partitionOfRecords.foreach(
          row => for ( i <- 0 until row.length ) {
            psmt = OperationIgniteUtil.getInsertStatement(conn, targetTable, hiveDF.schema)
            psmt.setObject(i+1, row.get(i))
            psmt.execute()
          }
        )
      }
    })

  }catch {
    case e: Exception =>  e.printStackTrace()
  } finally {
    conn.close
  }
}

When I run it on Spark, it prints this error message:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:923)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:923)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply$mcV$sp(Dataset.scala:2305)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2305)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2305)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
    at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2304)
    at com.pingan.pilot.ignite.common.OperationIgniteUtil$.WriteToIgnite(OperationIgniteUtil.scala:72)
    at com.pingan.pilot.ignite.etl.HdfsToIgnite$.main(HdfsToIgnite.scala:36)
    at com.pingan.pilot.ignite.etl.HdfsToIgnite.main(HdfsToIgnite.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.ignite.internal.jdbc2.JdbcConnection
Serialization stack:
    - object not serializable (class: org.apache.ignite.internal.jdbc2.JdbcConnection, value: org.apache.ignite.internal.jdbc2.JdbcConnection@7ebc2975)
    - field (class: com.pingan.pilot.ignite.common.OperationIgniteUtil$$anonfun$WriteToIgnite$1, name: conn$1, type: interface java.sql.Connection)
    - object (class com.pingan.pilot.ignite.common.OperationIgniteUtil$$anonfun$WriteToIgnite$1, )
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 27 more

Does anyone know how to fix it? Thanks!

Answer 1:

The problem here is that the Ignite connection, DataSource.conn, cannot be serialized. The closure you pass to foreachPartition captures the connection as part of its scope (it shows up as the conn$1 field in the serialization stack), so Spark cannot serialize the closure to send it to the executors.
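
To see the capture problem without Spark or Ignite, here is a minimal sketch that pushes two closures through plain Java serialization, which is the serializer that appears in the stack trace. FakeConnection and ClosureCaptureDemo are made-up names for illustration only; FakeConnection just stands in for any class that is not Serializable.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a JDBC connection: it does not implement Serializable.
final class FakeConnection(val url: String)

object ClosureCaptureDemo {
  // Returns true if Java serialization can write the object, false if it hits
  // a NotSerializableException somewhere in the object graph.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // This closure captures the connection object itself, like conn in the question.
  def capturesConnection(): String => String = {
    val conn = new FakeConnection("jdbc:example")
    row => conn.url + " <- " + row
  }

  // This closure captures only a String and builds the connection when it is called.
  def capturesUrlOnly(): String => String = {
    val url = "jdbc:example"
    row => new FakeConnection(url).url + " <- " + row
  }

  def main(args: Array[String]): Unit = {
    println(isSerializable(capturesConnection()))
    println(isSerializable(capturesUrlOnly()))
  }
}
```

The first closure should fail to serialize and the second should succeed, because only the second keeps the non-serializable object out of its captured state.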

Fortunately, Ignite provides a custom RDD implementation that lets you save values to it. You first need to create an IgniteContext, then retrieve Ignite's shared RDD, which provides distributed access to Ignite, and use it to save the Rows of your RDD:

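import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spark.IgniteContext
import org.apache.spark.sql.Row

val igniteContext = new IgniteContext(sparkContext, () => new IgniteConfiguration())
...

// Retrieve Ignite's shared RDD for the cache named "partitioned".
// saveValues generates a key for each value, so the key type here is only a placeholder.
val igniteRdd = igniteContext.fromCache[Any, Row]("partitioned")
igniteRdd.saveValues(hiveDF.rdd)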

More information is available in the Apache Ignite documentation.
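
If you would rather keep plain JDBC as in the question, a common alternative is to open the connection inside each partition, so that only serializable values (the URL and the SQL text) are captured by the closure. Below is a rough sketch under that assumption. PartitionWriter, buildInsertSql, writePartition and the jdbcUrl parameter are names I made up for illustration; the code uses only java.sql and assumes the Ignite JDBC driver is on the executors' classpath.

```scala
import java.sql.DriverManager

object PartitionWriter {
  // Build "INSERT INTO t (a, b) VALUES (?, ?)" once on the driver.
  // The result is a String, so capturing it in a closure is safe.
  def buildInsertSql(table: String, columns: Seq[String]): String = {
    val cols = columns.mkString(", ")
    val params = columns.map(_ => "?").mkString(", ")
    s"INSERT INTO $table ($cols) VALUES ($params)"
  }

  // Meant to run on an executor: opens its own connection, inserts every row
  // of the partition with one prepared statement, then closes everything.
  def writePartition(jdbcUrl: String, insertSql: String, rows: Iterator[Seq[Any]]): Unit = {
    val conn = DriverManager.getConnection(jdbcUrl)
    try {
      val stmt = conn.prepareStatement(insertSql)
      try {
        rows.foreach { values =>
          // Bind every column of the row before executing, one execute per row.
          values.zipWithIndex.foreach { case (value, i) =>
            stmt.setObject(i + 1, value.asInstanceOf[AnyRef])
          }
          stmt.executeUpdate()
        }
      } finally stmt.close()
    } finally conn.close()
  }
}
```

On the Spark side this could be called as hiveDF.rdd.foreachPartition(rows => PartitionWriter.writePartition(jdbcUrl, sql, rows.map(_.toSeq))), with sql built beforehand from hiveDF.columns and jdbcUrl held in a local String. Note that it binds all columns of a row before executing, whereas the loop in the question prepares and executes a statement once per column.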



Answer 2:

You have to make the enclosing object extend Serializable:

object Test extends Serializable {
  def WriteToIgnite(hiveDF: DataFrame, targetTable: String): Unit = {
    ???
  }
}

I hope this resolves your problem.