I have created a source connector and a sink connector for Kafka Connect (Confluent 5.0) to push two SQL Server tables to my data lake.
Here is my SQL Server table schema:
CREATE TABLE MYBASE.dbo.TABLE1 (
    id_field int IDENTITY(1,1) NOT NULL,
    my_numericfield numeric(24,6) NULL,
    time_field smalldatetime NULL,
    CONSTRAINT PK_CBMARQ_F_COMPTEGA PRIMARY KEY (id_field)
)
GO
My Cassandra schema:
CREATE TABLE TEST-TABLE1 (my_numericfield decimal, id_field int, time_field timestamp, PRIMARY KEY (id_field));
Here is the source connector configuration, using the table.whitelist parameter:
{
    "name": "sqlserver-MYBASE-test",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:sqlserver://localhost:1433;database=MYBASE",
        "connection.user": "admin",
        "connection.password": "password",
        "table.whitelist": "TABLE1, TABLE2",
        "mode": "timestamp+incrementing",
        "timestamp.column.name": "time_field",
        "incrementing.column.name": "id_field",
        "validate.non.null": "false",
        "topic.prefix": "TEST-",
        "tasks.max": "8",
        "numeric.mapping": "best_fit"
    }
}
Here is my sink connector:
{
    "name": "s3-sink-MYBASE",
    "config": {
        "topics": "TEST-TABLE1, TEST-TABLE2",
        "topics.dir": "DATABASE_FULL",
        "s3.part.size": 5242880,
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "tasks.max": 8,
        "schema.compatibility": "NONE",
        "s3.region": "eu-central-1",
        "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
        "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
        "s3.bucket.name": "mydatalake",
        "flush.size": 1,
        "transforms": "InsertSourceDetails",
        "transforms.InsertSourceDetails.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertSourceDetails.static.field": "DATABASE",
        "transforms.InsertSourceDetails.static.value": "MYBASE"
    }
}
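For reference, both JSON definitions are registered by POSTing them to the Connect worker's REST API. Here is a minimal sketch of that step, assuming the worker listens on the default REST port 8083 (register_connector is just a hypothetical helper name):

import json
import requests

def register_connector(connector_def):
    # POST a connector definition (name + config) to the Kafka Connect REST API,
    # assuming the worker's REST interface is on its default port 8083
    resp = requests.post(
        'http://localhost:8083/connectors',
        headers={'Content-Type': 'application/json'},
        data=json.dumps(connector_def),
    )
    resp.raise_for_status()
    return resp.json()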
The problem is that some fields are typed NUMERIC in SQL Server, and Kafka Connect turns them into BINARY (the Avro bytes type) by the time they land in the data lake.
Here is the Schema Registry result:
{"type": "record",
"name": "TEST-TABLE1",
"fields": [
{
"name": "my_numericfield",
"type": [
"null",
{
"type": "bytes",
"scale": 6,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "6"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
},
{
"name": "id_field",
"type": "int"
},
{
"name": "cbCreateur",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "time_field",
"type": [
"null",
{
"type": "long",
"connect.version": 1,
"connect.name": "org.apache.kafka.connect.data.Timestamp",
"logicalType": "timestamp-millis"
}
],
"default": null
},
],
"connect.name": "TEST-TABLE1"}
Here is the Spark script and its result:
from pyspark.sql.functions import col

AWS_ID = 'xxxxxxxxxxxxxxxxx'
AWS_KEY = 'xxxxxxxxxxxxxxxxx/'
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", AWS_ID)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", AWS_KEY)
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")
spark.conf.set('spark.cassandra.connection.host', 'localhost')
spark.conf.set('spark.cassandra.connection.port', 9042)
spark.conf.set('spark.cassandra.auth.username', 'cassandra')
spark.conf.set('spark.cassandra.auth.password', 'cassandra')

F_TEST_TABLE1 = spark.read.format('com.databricks.spark.avro').load('s3a://mydatalake/DATABASE_FULL/TEST-TABLE1').drop('partition')
DF_TEST_TABLE1 = F_TEST_TABLE1.toDF(*[c.lower() for c in F_TEST_TABLE1.columns])
DF_TEST_TABLE1.printSchema()
root
|-- my_numericfield: binary (nullable = true)
|-- id_field: integer (nullable = true)
|-- time_field: long (nullable = true)
DF_TEST_TABLE1.createTempView("event")
spark.sql("select * from event").show(1, False)
+---------------+--------+-------------+
|my_numericfield|id_field|time_field   |
+---------------+--------+-------------+
|[00]           |5       |1542733800000|
+---------------+--------+-------------+
only showing top 1 row
DF_TEST_TABLE1.write.format('org.apache.spark.sql.cassandra').options(keyspace='sage_full', table='f_test-table1').option('confirm.truncate', True).save(mode='overwrite')
18/11/22 08:29:05 ERROR Executor: Exception in task 1.0 in stage 2.0 (TID 3)
com.datastax.spark.connector.types.TypeConversionException: Cannot convert object [B@6d0d5743 of type class [B to java.lang.BigDecimal.
at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert$1.apply(TypeConverter.scala:45)
at scala.PartialFunction$AndThen.applyOrElse(PartialFunction.scala:190)
I'm trying to cast those fields on the fly to a numeric type (e.g. float), but I can't find a way to do it without knowing the field names in advance.
With the table.whitelist parameter, the connector processes the two tables without any per-field description in the configuration.
Is there a way to cast all NUMERIC fields on the fly?
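In Spark I can decode every binary column generically with something like the rough sketch below (it assumes every BinaryType column is a Connect decimal and hard-codes the scale; binary_to_decimal and cast_decimal_columns are hypothetical names), but I would prefer the connector to write a proper numeric type in the first place:

from decimal import Decimal
from pyspark.sql.functions import col, udf
from pyspark.sql.types import BinaryType, DecimalType

SCALE = 6  # hard-coded here; in the Avro schema it is carried in connect.parameters

@udf(returnType=DecimalType(24, 6))
def binary_to_decimal(raw):
    # Connect decimals: unscaled value as big-endian two's-complement bytes
    if raw is None:
        return None
    return Decimal(int.from_bytes(bytes(raw), byteorder='big', signed=True)).scaleb(-SCALE)

def cast_decimal_columns(df):
    # Replace every BinaryType column with its decoded decimal value
    for field in df.schema.fields:
        if isinstance(field.dataType, BinaryType):
            df = df.withColumn(field.name, binary_to_decimal(col(field.name)))
    return df

DF_TEST_TABLE1 = cast_decimal_columns(DF_TEST_TABLE1)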
Thanks for your help.