铸造数字字段与卡夫卡连接和table.whitelist(Cast numeric fields w

2019-10-28 19:25发布

我已经创建了一个源和用于卡夫卡信宿的连接器连接的汇合5.0,推2个SQLSERVER表到我datalake

这里是我的SQLServer表模式:

CREATE TABLE MYBASE.dbo.TABLE1 (
id_field int IDENTITY(1,1) NOT NULL,
my_numericfield numeric(24,6) NULL,
time_field smalldatetime NULL,
CONSTRAINT PK_CBMARQ_F_COMPTEGA PRIMARY KEY (id_field)
) GO

我卡桑德拉模式:

create table TEST-TABLE1(my_numericfield decimal, id_field int, time_field timestamp, PRIMARY KEY (id_field));

这里为源配置,具有白名单参数:

{
"config":
{
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:sqlserver://localhost:1433;database=MYBASE",
    "connection.user": "admin",
    "connection.password": "password",
    "table.whitelist": "TABLE1, TABLE2",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "time_field",
    "incrementing.column.name": "id_field",
    "validate.non.null": "false",
    "topic.prefix": "TEST-",
    "tasks.max": "8",
    "numeric.mapping":"best_fit"
},
"name": "sqlserver-MYBASE-test"
}

这是我的信宿连接:

{
"name": "s3-sink-MYBASE",
"config":
{
    "topics": "TEST-TABLE1, TEST_TABLE2",
    "topics.dir": "DATABASE_FULL",
    "s3.part.size": 5242880,
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "tasks.max": 8,
    "schema.compatibility": "NONE",
    "s3.region": "eu-central-1",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "s3.bucket.name": "mydatalake",
    "flush.size": 1,
    "transforms":"InsertSourceDetails",  
    "transforms.InsertSourceDetails.type":"org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertSourceDetails.static.field":"DATABASE",
    "transforms.InsertSourceDetails.static.value":"MYBASE"
}
}

问题是,一些领域在SQLSERVER输入数字和在datalake抵当卡夫卡对它们进行转换BINARY

下面是schema_registry结果:

{"type": "record",
"name": "TEST-TABLE1",
"fields": [
{
  "name": "my_numericfield",
  "type": [
    "null",
    {
      "type": "bytes",
      "scale": 6,
      "precision": 64,
      "connect.version": 1,
      "connect.parameters": {
        "scale": "6"
      },
      "connect.name": "org.apache.kafka.connect.data.Decimal",
      "logicalType": "decimal"
    }
  ],
  "default": null
},
{
  "name": "id_field",
  "type": "int"
},
{
  "name": "cbCreateur",
  "type": [
    "null",
    "string"
  ],
  "default": null
},
{
  "name": "time_field",
  "type": [
    "null",
    {
      "type": "long",
      "connect.version": 1,
      "connect.name": "org.apache.kafka.connect.data.Timestamp",
      "logicalType": "timestamp-millis"
    }
  ],
  "default": null
},
],
"connect.name": "TEST-TABLE1"}

这里是火花脚本和结果:

...: from pyspark.sql.functions import col 
...: AWS_ID='xxxxxxxxxxxxxxxxx'
...: AWS_KEY='xxxxxxxxxxxxxxxxx/'
...: sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", AWS_ID)
...: sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", AWS_KEY)
...: sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")
...: spark.conf.set('spark.cassandra.connection.host', 'localhost')
...: spark.conf.set('spark.cassandra.connection.port', 9042)
...: spark.conf.set('spark.cassandra.auth.username', 'cassandra')
...: spark.conf.set('spark.cassandra.auth.password', 'cassandra')
...: 
...: 

   :    F_TEST-TABLE1 = spark.read.format('com.databricks.spark.avro').load('s3a://mydatalake/DATABASE_FULL/TEST-TABLE1').drop('partition')
...:    DF_TEST-TABLE1 = F_TEST-TABLE1.toDF(*[c.lower() for c in TEST-TABLE1.columns])
...: 
...: 

: DF_TEST-TABLE1.printSchema()
root
 |-- my_numericfield: binary (nullable = true)
 |-- id_field: integer (nullable = true)
 |-- time_field: long (nullable = true)


: DF_TEST-TABLE1.createTempView("event")

: spark.sql("select * from event").show(1, False)
+----------------+--------+--------------+
||my_numericfield|id_field|time_field    |
+----------------+-----------+-----------+
|[00]            | 5      |1542733800000 |
+----------------+--------+--------------+
only showing top 1 row


: DF_TEST-TABLE1.write.format('org.apache.spark.sql.cassandra').options(keyspace='sage_full', table='f_test-table1').option('confirm.truncate', True).save(mode='overwrite')
18/11/22 08:29:05 ERROR Executor: Exception in task 1.0 in stage 2.0 (TID 3)
com.datastax.spark.connector.types.TypeConversionException: Cannot convert object [B@6d0d5743 of type class [B to java.lang.BigDecimal.
at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert$1.apply(TypeConverter.scala:45)
at scala.PartialFunction$AndThen.applyOrElse(PartialFunction.scala:190)

我想投在飞行中的字段,以匹配数字类型(即浮动),但我不能找到一个办法做到这一点不知道提前字段名称

与白名单中的参数中,连接器过程中,两个表,而不字段描述在连接器配置

有没有办法做投在飞行的所有数字字段?

谢谢你的帮助

文章来源: Cast numeric fields with kafka connect and table.whitelist