Spark CSV - No applicable constructor/method found

Posted on 2019-08-03 05:28

Question:

I have an issue using lambda functions in filters and maps on typed Datasets in a Java Spark application.

I am getting this runtime error:

ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 130, Column 126: No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public static java.sql.Date org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaDate(int)"

I am using the class below with Spark 2.2.0. A full example with sample data is available at https://gitlab.com/opencell/test-bigdata

Dataset<CDR> cdr = spark
        .read()
        .format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ";")
        .csv("CDR_SAMPLE.csv")
        .as(Encoders.bean(CDR.class));

long v = cdr.filter(x -> (x.timestamp != null && x.getAccess().length() > 0)).count();

System.out.println("validated entries :" + v);

The CDR definition is available in the GitLab repository linked above.

EDIT

val cdrCSVSchema = StructType(Array(
  StructField("timestamp", DataTypes.TimestampType),
  StructField("quantity", DataTypes.DoubleType),
  StructField("access", DataTypes.StringType),
  StructField("param1", DataTypes.StringType),
  StructField("param2", DataTypes.StringType),
  StructField("param3", DataTypes.StringType),
  StructField("param4", DataTypes.StringType),
  StructField("param5", DataTypes.StringType),
  StructField("param6", DataTypes.StringType),
  StructField("param7", DataTypes.StringType),
  StructField("param8", DataTypes.StringType),
  StructField("param9", DataTypes.StringType),
  StructField("dateParam1", DataTypes.TimestampType),
  StructField("dateParam2", DataTypes.TimestampType),
  StructField("dateParam3", DataTypes.TimestampType),
  StructField("dateParam4", DataTypes.TimestampType),
  StructField("dateParam5", DataTypes.TimestampType),
  StructField("decimalParam1", DataTypes.DoubleType),
  StructField("decimalParam2", DataTypes.DoubleType),
  StructField("decimalParam3", DataTypes.DoubleType),
  StructField("decimalParam4", DataTypes.DoubleType),
  StructField("decimalParam5", DataTypes.DoubleType),
  StructField("extraParam", DataTypes.StringType)))

and I used this command to load the CSV document

val cdr = spark.read.format("csv").option("header", "true").option("delimiter", ";").schema(cdrCSVSchema).csv("CDR_SAMPLE.csv")

and then tried this command to encode the data and run the lambda function, but I am still getting the error:

cdr.as[CDR].filter(c => c.timestamp != null).show

Answer 1:

TL;DR Define the schema explicitly since the input dataset does not have values to infer types from (for java.sql.Date fields).

In your case, using the untyped Dataset API could be a solution (perhaps a workaround, and honestly I'd recommend it anyway to avoid unnecessary deserialization from the internal row format):

cdr.filter(!$"timestamp".isNull).filter(length($"access") > 0).count

(That's Scala; I'm leaving the translation to Java as a home exercise.)
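
For reference, here is one possible Java rendering of that untyped filter (my sketch, not part of the original answer; it skips the .as(Encoders.bean(CDR.class)) call entirely and works on Dataset<Row>):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.length;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Untyped (DataFrame-style) version: no bean encoder, so no Date conversion is generated.
Dataset<Row> raw = spark.read()
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ";")
        .csv("CDR_SAMPLE.csv");

long v = raw
        .filter(col("timestamp").isNotNull())
        .filter(length(col("access")).gt(0))
        .count();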

The issue is that you use the inferSchema option while most fields have no values in the input CDR_SAMPLE.csv file, which makes those fields come out as String (the default type when there are no values to infer a more specific type from).

That makes the fields declared as java.sql.Date, i.e. dateParam1 through dateParam5, end up as String.

import org.opencell.spark.model.CDR
import org.apache.spark.sql.Encoders
implicit val cdrEnc = Encoders.bean(classOf[CDR])
val cdrs = spark.read.
  option("inferSchema", "true").
  option("delimiter", ";").
  option("header", true).
  csv("/Users/jacek/dev/sandbox/test-bigdata/CDR_SAMPLE.csv")
scala> cdrs.printSchema
root
 |-- timestamp: timestamp (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- access: string (nullable = true)
 |-- param1: string (nullable = true)
 |-- param2: string (nullable = true)
 |-- param3: string (nullable = true)
 |-- param4: string (nullable = true)
 |-- param5: string (nullable = true)
 |-- param6: string (nullable = true)
 |-- param7: string (nullable = true)
 |-- param8: string (nullable = true)
 |-- param9: string (nullable = true)
 |-- dateParam1: string (nullable = true)
 |-- dateParam2: string (nullable = true)
 |-- dateParam3: string (nullable = true)
 |-- dateParam4: string (nullable = true)
 |-- dateParam5: string (nullable = true)
 |-- decimalParam1: string (nullable = true)
 |-- decimalParam2: string (nullable = true)
 |-- decimalParam3: string (nullable = true)
 |-- decimalParam4: string (nullable = true)
 |-- decimalParam5: string (nullable = true)
 |-- extraParam: string (nullable = true)

Note that the fields of interest, i.e. dateParam1 to dateParam5, are all strings.

 |-- dateParam1: string (nullable = true)
 |-- dateParam2: string (nullable = true)
 |-- dateParam3: string (nullable = true)
 |-- dateParam4: string (nullable = true)
 |-- dateParam5: string (nullable = true)

The issue surfaces when you "pretend" the types of the fields are different by using the encoder defined by the CDR class, which declares:

private Date dateParam1;
private Date dateParam2;
private Date dateParam3; 
private Date dateParam4; 
private Date dateParam5; 

That's the root cause of the issue: there is a mismatch between what Spark could infer from the file and what the CDR class declares. Without the conversion the code would have worked, but since you insisted...

cdrs.as[CDR]. // <-- HERE is the issue = types don't match
  filter(cdr => cdr.timestamp != null).
  show // <-- trigger conversion

It does not really matter which field you access in the filter operator. The issue is that the conversion takes place at all, and that is what leads to the incorrect execution (and the failing whole-stage Java code generation).
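
If you want to see the mismatch directly, one option (a sketch in Java, against the Dataset from the question) is to print the schema Spark inferred next to the schema the bean encoder expects:

import org.apache.spark.sql.Encoders;

// Schema inferred from the CSV: dateParam1..dateParam5 show up as string.
cdr.printSchema();

// Schema the bean encoder expects: dateParam1..dateParam5 as date (java.sql.Date).
Encoders.bean(CDR.class).schema().printTreeString();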

I doubt Spark can do much about it, since you requested inferSchema on a dataset with no values to use for type inference. The best bet is to define the schema explicitly and use the schema(...) operator to set it.
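
For completeness, a minimal Java sketch of that advice (my assumptions, not from the original answer: the CDR bean declares java.sql.Date for dateParam1..5 and Double for quantity and decimalParam1..5; mirror the bean's actual declarations if they differ):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Explicit schema: column types line up with what Encoders.bean(CDR.class) expects,
// so the generated deserializer no longer sees strings where it expects dates.
StructType cdrSchema = new StructType()
        .add("timestamp", DataTypes.TimestampType)
        .add("quantity", DataTypes.DoubleType)
        .add("access", DataTypes.StringType)
        .add("param1", DataTypes.StringType)
        .add("param2", DataTypes.StringType)
        .add("param3", DataTypes.StringType)
        .add("param4", DataTypes.StringType)
        .add("param5", DataTypes.StringType)
        .add("param6", DataTypes.StringType)
        .add("param7", DataTypes.StringType)
        .add("param8", DataTypes.StringType)
        .add("param9", DataTypes.StringType)
        .add("dateParam1", DataTypes.DateType)   // DateType maps to java.sql.Date
        .add("dateParam2", DataTypes.DateType)
        .add("dateParam3", DataTypes.DateType)
        .add("dateParam4", DataTypes.DateType)
        .add("dateParam5", DataTypes.DateType)
        .add("decimalParam1", DataTypes.DoubleType)
        .add("decimalParam2", DataTypes.DoubleType)
        .add("decimalParam3", DataTypes.DoubleType)
        .add("decimalParam4", DataTypes.DoubleType)
        .add("decimalParam5", DataTypes.DoubleType)
        .add("extraParam", DataTypes.StringType);

Dataset<CDR> typedCdr = spark.read()
        .option("header", "true")
        .option("delimiter", ";")
        .schema(cdrSchema)                       // explicit schema instead of inferSchema
        .csv("CDR_SAMPLE.csv")
        .as(Encoders.bean(CDR.class));

// With matching types, the typed filter from the question should no longer fail at codegen.
long valid = typedCdr
        .filter(c -> c.timestamp != null && c.getAccess().length() > 0)
        .count();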