Spark job (Scala): write type Date to Cassandra

Posted 2019-09-11 15:23

Question:

I'm using DSE 5.1 (Spark 2.0.2.6 and Cassandra 3.10.0.1652).

My Cassandra table:

CREATE TABLE ks.tbl (
   dk int,
   date date,
   ck int,
   val int,
PRIMARY KEY (dk, date, ck)
) WITH CLUSTERING ORDER BY (date DESC, ck ASC);

with the following data:

 dk | date       | ck | val
----+------------+----+-----
  1 | 2017-01-01 |  1 | 100
  1 | 2017-01-01 |  2 | 200

My code must read this data and write the same thing but with yesterday's date (it compiles successfully):

package com.datastax.spark.example

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
import com.github.nscala_time.time._
import com.github.nscala_time.time.Imports._

object test extends App {

  val conf = new SparkConf().setAppName("DSE calculus app TEST")
  val sc = new SparkContext(conf)

  val yesterday = (DateTime.now - 1.days).toString(StaticDateTimeFormat.forPattern("yyyy-MM-dd"))

  val tbl = sc.cassandraTable("ks","tbl").select("dk","date","ck","val").where("dk=1")

  tbl.map(row => (row.getInt("dk"),yesterday,row.getInt("ck"),row.getInt("val"))).saveToCassandra("ks","tbl")

  sc.stop()
  sys.exit(0)
}

When I run this app:

dse spark-submit --class com.datastax.spark.example.test test-assembly-0.1.jar

It fails to properly write to Cassandra. It seems the date variable is not inserted in the map correctly. The error I get is:

Error:
WARN  2017-05-08 22:23:16,472 org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, <IP of one of my nodes>): java.io.IOException: Failed to write statements to ks.tbl.
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:207)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:175)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
        at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
        at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:175)
        at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:162)
        at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:149)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

However, when I insert a date (string) directly in the map statement as follows, the code does insert the data correctly:

tbl.map(row => (row.getInt("dk"),"2017-02-02",row.getInt("ck"),row.getInt("val"))).saveToCassandra("ks","tbl")

It also inserts the data correctly if I set yesterday to an integer (days since epoch). That would be optimal, but I can't get 'yesterday' to behave this way.

EDIT: This does not insert data correctly, actually. No matter whether I set 'yesterday' to 1 or 100,000,000, it always inserts the epoch ('1970-01-01').
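
(For reference, a minimal sketch of computing yesterday as a days-since-epoch integer using the plain JDK; whether the connector accepts a plain Int for the date column is an assumption, not something confirmed here.)

import java.time.LocalDate

// Yesterday as days since 1970-01-01, which is how Cassandra's `date`
// type represents values internally (assumed Int mapping, see above).
val yesterdayEpochDays: Int = LocalDate.now().minusDays(1).toEpochDay.toInt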

The code that fails behaves correctly, and as I would expect, in the DSE Spark console.

I just can't figure out what I'm doing wrong. Any help is welcome.

EDIT 2: The executor 0 stderr log does show that it is trying to insert a null value into the date column, which is obviously not possible since it is a clustering column.

Answer 1:

When writing code for a Spark job, it's important to realize when particular variables are set and when they are serialized. Let's take a look at a note from the App trait docs:

Caveats

It should be noted that this trait is implemented using the DelayedInit functionality, which means that fields of the object will not have been initialized before the main method has been executed.

This means that references to variables used in the body of the App may not be initialized on the executors when the code is actually run.

My guess is that the lambda you have written contains a reference to a val that is initialized in the DelayedInit portion of the App trait. This means the serialized version of the code on the executor, which doesn't run the main method, gets the uninitialized version of the value (null).

Switching the constant to a lazy val (or moving it into a separate object or class) would fix this issue by making sure the value is initialized remotely (lazy val) or serialized in an already-initialized state (separate class/object).
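
A minimal sketch of the second option, assuming the same keyspace, table, and nscala-time imports as the question: replace the App trait with an explicit main method so that yesterday is initialized before the lambda that captures it is serialized (alternatively, keep the App trait and declare yesterday as a lazy val).

package com.datastax.spark.example

import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}
import com.github.nscala_time.time._
import com.github.nscala_time.time.Imports._

object test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DSE calculus app TEST")
    val sc = new SparkContext(conf)

    // Initialized inside main, so the closure below captures the real value
    // instead of a not-yet-initialized (null) field of a DelayedInit object.
    val yesterday = (DateTime.now - 1.days).toString(StaticDateTimeFormat.forPattern("yyyy-MM-dd"))

    val tbl = sc.cassandraTable("ks", "tbl").select("dk", "date", "ck", "val").where("dk=1")

    tbl.map(row => (row.getInt("dk"), yesterday, row.getInt("ck"), row.getInt("val")))
      .saveToCassandra("ks", "tbl")

    sc.stop()
  }
}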



Answer 2:

I think I know what your problem is.
You should look at the full log file; you only attached part of it.
I hit a similar error today, after creating a keyspace with replication_factor: 3 when I had only one Cassandra instance.

So I changed it and the problem went away.

ALTER KEYSPACE "some_keyspace_name" WITH REPLICATION =
  { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };

Here is my error.log file

And the important part of the log:

Logging.scala[logError]:72) - Failed to execute: com.datastax.spark.connector.writer.RichBoundStatement@4746499f
com.datastax.driver.core.exceptions.UnavailableException: Not enough replicas available for query at consistency LOCAL_QUORUM (2 required but only 1 alive)