Kafka connect setup to send record from Aurora usi

2020-06-29 07:33发布

I have to send records from Aurora/Mysql to MSK and from there to Elastic search service

Aurora -->Kafka-connect--->AWS MSK--->kafka connect --->Elastic search

The record in Aurora table structure is something like this
I think record will go to AWS MSK in this format.

"o36347-5d17-136a-9749-Oe46464",0,"NEW_CASE","WRLDCHK","o36347-5d17-136a-9749-Oe46464","<?xml version=""1.0"" encoding=""UTF-8"" standalone=""yes""?><caseCreatedPayload><batchDetails/>","CASE",08-JUL-17 10.02.32.217000000 PM,"TIME","UTC","ON","0a348753-5d1e-17a2-9749-3345,MN4,","","0a348753-5d1e-17af-9749-FGFDGDFV","EOUHEORHOE","2454-5d17-138e-9749-setwr23424","","","",,"","",""

So in order to consume by elastic search i need to use proper schema so schema registry i have to use.

My question

Question 1

How should i use schema registry for above type of message schema registry is required ?. Do i have to create JSON structure for this and if yes where i have keep that. More help required here to understand this ?

I have edited

vim /usr/local/confluent/etc/schema-registry/schema-registry.properties

Mentioned zookeper but i did not what is kafkastore.topic=_schema How to link this to custom schema .

Even i started and got this error

Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic _schemas not present in metadata after 60000 ms.

Which i was expecting because i did not do anything about schema .

I do have jdbc connector installed and when i start i get below error

Invalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123 for configuration Couldn't open connection to jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123
Invalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123 for configuration Couldn't open connection to jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`

Question 2 Can i create two onnector on one ec2 (jdbc and elastic serach one ).If yes do i have to start both in sepearte cli ?

Question 3 When i open vim /usr/local/confluent/etc/kafka-connect-jdbc/source-quickstart-sqlite.properties I see only propeties value like below

name=test-source-sqlite-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123
mode=incrementing
incrementing.column.name=id
topic.prefix=trf-aurora-fspaudit-

In the above properties file where i can mention schema name and table name?

Based on answer i am updating my configuration for Kafka connect JDBC

---------------start JDBC connect elastic search -----------------------------

wget /usr/local http://packages.confluent.io/archive/5.2/confluent-5.2.0-2.11.tar.gz -P ~/Downloads/
tar -zxvf ~/Downloads/confluent-5.2.0-2.11.tar.gz -C ~/Downloads/
sudo mv ~/Downloads/confluent-5.2.0 /usr/local/confluent

wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.48.tar.gz
tar -xzf  mysql-connector-java-5.1.48.tar.gz
sudo mv mysql-connector-java-5.1.48 mv /usr/local/confluent/share/java/kafka-connect-jdbc

And then

vim /usr/local/confluent/etc/kafka-connect-jdbc/source-quickstart-sqlite.properties

Then i modified below properties

connection.url=jdbc:mysql://fdgfgdfgrter.us-east-1.rds.amazonaws.com:3306/trf
mode=incrementing
connection.user=admin
connection.password=Welcome123
table.whitelist=PANStatementInstanceLog
schema.pattern=dbo

Last i modified

vim /usr/local/confluent/etc/kafka/connect-standalone.properties

and here i modified below properties

bootstrap.servers=b-3.205147-ertrtr.erer.c5.ertert.us-east-1.amazonaws.com:9092,b-6.ertert-riskaudit.ertet.c5.kafka.us-east-1.amazonaws.com:9092,b-1.ertert-riskaudit.ertert.c5.kafka.us-east-1.amazonaws.com:9092
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/local/confluent/share/java

When i list topic i do not see any topic listed for table name .

Stack trace for the error message

[2020-01-03 07:40:57,169] ERROR Failed to create job for /usr/local/confluent/etc/kafka-connect-jdbc/source-quickstart-sqlite.properties (org.apache.kafka.connect.cli.ConnectStandalone:108)
[2020-01-03 07:40:57,169] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:119)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://****.us-east-1.rds.amazonaws.com:3306/trf
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://****.us-east-1.rds.amazonaws.com:3306/trf
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
        at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79)
        at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66)
        at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:116)
Caused by: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://****.us-east-1.rds.amazonaws.com:3306/trf
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://****.us-east-1.rds.amazonaws.com:3306/trf
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
        at org.apache.kafka.connect.runtime.AbstractHerder.maybeAddConfigErrors(AbstractHerder.java:423)
        at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:188)
        at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:113)

        curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" IPaddressOfKCnode:8083/connectors/ -d '{"name": "emp-connector", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": "1", "connection.url": "jdbc:mysql://IPaddressOfLocalMachine:3306/test_db?user=root&password=pwd","table.whitelist": "emp","mode": "timestamp","topic.prefix": "mysql-" } }'

2条回答
时光不老,我们不散
2楼-- · 2020-06-29 08:03

I'm guessing that you're planning to use AVRO in order to transfer data so don't forget to specify AVROConverter as the default converter when you start up your Kafka Connect workers. If you will use JSON then Schema Registry is not needed.

1.1 kafkastore.topic=_schema

Have you started up your own schema registry? When you start Schema Registry you'll have to specify the "schemas" topic. Basically, this topic will be used by Schema Registry to store the schemas registered by it and in case of a failure, it can recover them from there.

1.2 jdbc connector installed and when i start i get below error By default, JDBC Connector only works with SQLite and PostgreSQL. If you would like it to work with a MySQL database then you should add the MySQL Driver to the classpath as well.

2.It depends on how you are deploying your Kafka Connect workers. If you go for Distributed mode ( recommended ) then you don't really need separate CLI's. You can deploy your connectors through the Kafka Connect REST API.

3.There is another property called table.whitelist on which you can specify your schemas and tables. e.g: table.whitelistusers,products,transactions

查看更多
Anthone
3楼-- · 2020-06-29 08:09

schema registry is required ?

No. You can enable schemas in json records. JDBC source can create them for you based on the table information

value.converter=org.apache.kafka...JsonConverter 
value.converter.schemas.enable=true

Mentioned zookeper but i did not what is kafkastore.topic=_schema

If you want to use Schema Registry, you should be using kafkastore.bootstrap.servers.with the Kafka address, not Zookeeper. So remove kafkastore.connection.url

Please read the docs for explanations of all properties

i did not do anything about schema .

Doesn't matter. The schemas topic gets created when the Registry first starts

Can i create two onnector on one ec2

Yes (ignoring available JVM heap space). Again, this is detailed in the Kafka Connect documentation.

Using standalone mode, you first pass the connect worker configuration, then up to N connector properties in one command

Using distributed mode, you use the Kafka Connect REST API

https://docs.confluent.io/current/connect/managing/configuring.html

When i open vim /usr/local/confluent/etc/kafka-connect-jdbc/source-quickstart-sqlite.properties

First of all, that's for Sqlite, not Mysql/Postgres. You don't need to use the quickstart files, they are only there for reference

Again, all properties are well documented

https://docs.confluent.io/current/connect/kafka-connect-jdbc/index.html#connect-jdbc

I do have jdbc connector installed and when i start i get below error

Here's more information about how you can debug that

https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector/


As stated before, I would personally suggest using Debezium/CDC where possible

Debezium Connector for RDS Aurora

查看更多
登录 后发表回答