I have an issue using lambda functions in filters and maps of typed Datasets in Java Spark applications.
I am getting this runtime error:
ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 130, Column 126: No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public static java.sql.Date org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaDate(int)"
I am using the class below and Spark 2.2.0.
A full example with sample data is available at https://gitlab.com/opencell/test-bigdata
Dataset<CDR> cdr = spark
.read()
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.option("delimiter", ";")
.csv("CDR_SAMPLE.csv")
.as(Encoders.bean(CDR.class));
long v = cdr.filter(x -> (x.timestamp != null && x.getAccess().length()>0)).count();
System.out.println("validated entries :" + v);
The CDR class definition is available at the GitLab link.
EDIT
val cdrCSVSchema = StructType(Array(
StructField("timestamp", DataTypes.TimestampType),
StructField("quantity", DataTypes.DoubleType),
StructField("access", DataTypes.StringType),
StructField("param1", DataTypes.StringType),
StructField("param2", DataTypes.StringType),
StructField("param3", DataTypes.StringType),
StructField("param4", DataTypes.StringType),
StructField("param5", DataTypes.StringType),
StructField("param6", DataTypes.StringType),
StructField("param7", DataTypes.StringType),
StructField("param8", DataTypes.StringType),
StructField("param9", DataTypes.StringType),
StructField("dateParam1", DataTypes.TimestampType),
StructField("dateParam2", DataTypes.TimestampType),
StructField("dateParam3", DataTypes.TimestampType),
StructField("dateParam4", DataTypes.TimestampType),
StructField("dateParam5", DataTypes.TimestampType),
StructField("decimalParam1", DataTypes.DoubleType),
StructField("decimalParam2", DataTypes.DoubleType),
StructField("decimalParam3", DataTypes.DoubleType),
StructField("decimalParam4", DataTypes.DoubleType),
StructField("decimalParam5", DataTypes.DoubleType),
StructField("extraParam", DataTypes.StringType)))
and I used this command to load the CSV document:
val cdr = spark.read.format("csv").option("header", "true").option("delimiter", ";").schema(cdrCSVSchema).csv("CDR_SAMPLE.csv")
and then tried this command to encode and run the lambda function, but I am still getting the error:
cdr.as[CDR].filter(c => c.timestamp != null).show
TL;DR Define the schema explicitly since the input dataset does not have values to infer types from (for java.sql.Date fields).
In your case, using the untyped Dataset API could be a solution (perhaps just a workaround, but honestly I'd recommend it anyway, to avoid unnecessary deserialization from the internal row format):
cdr.filter(!$"timestamp".isNull).filter(length($"access") > 0).count
(It's Scala; translating it to Java is left as a home exercise, but a rough sketch follows.)
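A minimal Java sketch of that untyped approach, assuming the same spark session and CSV file from the question (the cdrRows name is just for illustration; col and length come from org.apache.spark.sql.functions):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.length;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Load as an untyped Dataset<Row>; no .as(Encoders.bean(...)), so no
// deserialization into CDR objects (and no Date conversion) happens.
Dataset<Row> cdrRows = spark
    .read()
    .option("header", "true")
    .option("delimiter", ";")
    .csv("CDR_SAMPLE.csv");

// Column-based filters instead of a lambda over CDR instances
long valid = cdrRows
    .filter(col("timestamp").isNotNull())
    .filter(length(col("access")).gt(0))
    .count();
System.out.println("validated entries: " + valid);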
The issue is that you use the inferSchema option while most fields have no values in the input CDR_SAMPLE.csv file, which makes most fields of type String (the default type when no values are available to infer a more specific type). That makes the fields that should be of type java.sql.Date, i.e. dateParam1 up to dateParam5, of type String.
import org.opencell.spark.model.CDR
import org.apache.spark.sql.Encoders
implicit val cdrEnc = Encoders.bean(classOf[CDR])
val cdrs = spark.read.
option("inferSchema", "true").
option("delimiter", ";").
option("header", true).
csv("/Users/jacek/dev/sandbox/test-bigdata/CDR_SAMPLE.csv")
scala> cdrs.printSchema
root
|-- timestamp: timestamp (nullable = true)
|-- quantity: integer (nullable = true)
|-- access: string (nullable = true)
|-- param1: string (nullable = true)
|-- param2: string (nullable = true)
|-- param3: string (nullable = true)
|-- param4: string (nullable = true)
|-- param5: string (nullable = true)
|-- param6: string (nullable = true)
|-- param7: string (nullable = true)
|-- param8: string (nullable = true)
|-- param9: string (nullable = true)
|-- dateParam1: string (nullable = true)
|-- dateParam2: string (nullable = true)
|-- dateParam3: string (nullable = true)
|-- dateParam4: string (nullable = true)
|-- dateParam5: string (nullable = true)
|-- decimalParam1: string (nullable = true)
|-- decimalParam2: string (nullable = true)
|-- decimalParam3: string (nullable = true)
|-- decimalParam4: string (nullable = true)
|-- decimalParam5: string (nullable = true)
|-- extraParam: string (nullable = true)
Note that the fields of interest, i.e. dateParam1 to dateParam5, are all strings.
|-- dateParam1: string (nullable = true)
|-- dateParam2: string (nullable = true)
|-- dateParam3: string (nullable = true)
|-- dateParam4: string (nullable = true)
|-- dateParam5: string (nullable = true)
The issue surfaces when you "pretend" the type of the fields is different by using the encoder as defined in the CDR class, which says:
private Date dateParam1;
private Date dateParam2;
private Date dateParam3;
private Date dateParam4;
private Date dateParam5;
That's the root cause of the issue: there is a mismatch between what Spark could infer from the file and what the CDR class declares. Without the conversion the code would've worked, but since you insisted...
cdrs.as[CDR]. // <-- HERE is the issue = types don't match
filter(cdr => cdr.timestamp != null).
show // <-- trigger conversion
It does not really matter what field you access in the filter operator. The issue is that the conversion happens at all, and that is what leads to the incorrect execution (and the failure in whole-stage Java code generation).
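One way to see the mismatch directly is to compare the schema the bean encoder expects with the schema that was actually loaded. A small diagnostic sketch in Java (assuming cdrRows is an untyped Dataset<Row> read from CDR_SAMPLE.csv, before any .as(...); with or without inferSchema the dateParam columns end up as strings here):

import org.apache.spark.sql.Encoders;

// What the CDR bean encoder expects: dateParam1..dateParam5 come out as
// DateType, because the bean declares them as java.sql.Date.
Encoders.bean(CDR.class).schema().printTreeString();

// What was loaded from the sample CSV: the dateParam columns are all
// StringType, since there were no values to infer anything better from.
cdrRows.printSchema();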
I doubt Spark can do much about it since you requested inferSchema with a dataset that has no values to use for the type inference. The best bet is to define the schema explicitly and use the schema(...) operator to set it.
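A minimal Java sketch of that, assuming the column order in CDR_SAMPLE.csv matches the schema from the question's edit, that the CDR bean exposes the usual getters, and that the dateParamN fields really are java.sql.Date (java.sql.Date maps to Spark's DateType, which is what the generated toJavaDate(int) call expects):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Explicit schema: same columns and types as in the question's edit, except
// the date fields are DateType so they line up with java.sql.Date in CDR.
StructType cdrSchema = new StructType()
    .add("timestamp", DataTypes.TimestampType)
    .add("quantity", DataTypes.DoubleType)
    .add("access", DataTypes.StringType)
    .add("param1", DataTypes.StringType)
    .add("param2", DataTypes.StringType)
    .add("param3", DataTypes.StringType)
    .add("param4", DataTypes.StringType)
    .add("param5", DataTypes.StringType)
    .add("param6", DataTypes.StringType)
    .add("param7", DataTypes.StringType)
    .add("param8", DataTypes.StringType)
    .add("param9", DataTypes.StringType)
    .add("dateParam1", DataTypes.DateType)
    .add("dateParam2", DataTypes.DateType)
    .add("dateParam3", DataTypes.DateType)
    .add("dateParam4", DataTypes.DateType)
    .add("dateParam5", DataTypes.DateType)
    .add("decimalParam1", DataTypes.DoubleType)
    .add("decimalParam2", DataTypes.DoubleType)
    .add("decimalParam3", DataTypes.DoubleType)
    .add("decimalParam4", DataTypes.DoubleType)
    .add("decimalParam5", DataTypes.DoubleType)
    .add("extraParam", DataTypes.StringType);

// Read with the explicit schema (no inferSchema) and only then go typed.
Dataset<CDR> cdrs = spark
    .read()
    .schema(cdrSchema)
    .option("header", "true")
    .option("delimiter", ";")
    .csv("CDR_SAMPLE.csv")
    .as(Encoders.bean(CDR.class));

// The typed lambda filter now works because the field types match the bean.
long valid = cdrs.filter(x -> x.getTimestamp() != null && x.getAccess().length() > 0).count();
System.out.println("validated entries: " + valid);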