How to configure Kafka topics so an interconnected schema of events can be persisted idempotently?

Asked 2020-06-28 12:00

I have a case where I have Information objects that contain Element objects. When I store an Information object, it looks up preexisting Element objects by a unique value field and inserts them if they don't exist yet. Information objects and Element objects can't be deleted for now. Adding a parent requires two preexisting Element objects. I was planning to use three topics (CreateElement, CreateInformation, AddParentOfElement) for the events Created Element Event, Created Information Event and Added Parent Event. I realized that, since there are no ordering guarantees across topics or across topic-partitions, the events shown in the picture could be consumed in a different order, so the schema could not be persisted to an RDBMS, for example. I assume that ids are used for partition assignment of the topics, as usual.

Here is my diagram:

(diagram omitted)

The scenario is

  1. Element with (id=1) was created by user
  2. Information with (id=1) containing Elements (1,2,3) was created by user
  3. Element with (id=5) was created by user
  4. Parent of Element with (id=5) was set to be Element with (id=3) by the user
  5. Information with (id=2) containing Elements (1,3 and 5) was created by the user

I am curious whether my topic choices make sense, and I would appreciate any suggestions on how to design the events so that, when they are processed by the consuming database services, they are idempotent and don't put the system in a wrong state.
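To be concrete, by "idempotent" I mean that replaying the same event must leave the database unchanged. A rough sketch of the kind of upsert I have in mind is below; the JDBC URL, credentials, table and column names are only illustrative, not part of the actual schema.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

// Rough sketch of an idempotent consumer write: re-delivering the same
// CreateElement event leaves the database in the same state.
// Connection details and table/column names are hypothetical.
public class IdempotentElementWriter {

    public void saveElement(long id, String uniqueValue) throws Exception {
        try (Connection con = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/mydb", "user", "password");
             PreparedStatement ps = con.prepareStatement(
                // MySQL upsert keyed on the unique value field; replaying the
                // event updates the row instead of failing or duplicating it.
                "INSERT INTO element (id, unique_value) VALUES (?, ?) " +
                "ON DUPLICATE KEY UPDATE unique_value = VALUES(unique_value)")) {
            ps.setLong(1, id);
            ps.setString(2, uniqueValue);
            ps.executeUpdate();
        }
    }
}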

Thanks!

1 Answer

家丑人穷心不美 · answered 2020-06-28 12:53

After considering the suggestions in "How to implement a microservice Event Driven architecture with Spring Cloud Stream Kafka and Database per service" and not being satisfied with them, I investigated Confluent's Bottled Water (https://www.confluent.io/blog/bottled-water-real-time-integration-of-postgresql-and-kafka/) and later the more active but similar Debezium (http://debezium.io/).

I decided to follow the Debezium way. Debezium is a Kafka Connect plugin that reads directly from the MySQL/Postgres transaction log (binlog) and publishes those changes (schema and data) to Kafka.

The example setup I am using involves Docker; here is how I set it up for Docker (Linux) and Docker Toolbox (Windows).

1a) Linux (Docker)

sudo docker stop $(sudo docker ps -a -q)
sudo docker rm -f $(sudo docker ps -a -q)
sudo docker run -d --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.5
sudo docker run -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper
sudo docker run -d --name kafka -e ADVERTISED_HOST_NAME=<YOUR_IP> -e ZOOKEEPER_CONNECT=<YOUR_IP> --link zookeeper:zookeeper -p 9092:9092 debezium/kafka
sudo docker run -d --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e ADVERTISED_HOST_NAME=<YOUR_IP> --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect
sudo docker run -d --net=host -e "PROXY=true" -e ADV_HOST=<YOUR_IP> -e "KAFKA_REST_PROXY_URL=http://<YOUR_IP>:8082" -e "SCHEMAREGISTRY_UI_URL=http://<YOUR_IP>:8081" landoop/kafka-topics-ui
sudo docker run -p 8082:8082 --name kafka-rest --env ZK_CONNECTION_STRING=<YOUR_IP>:2181 frontporch/kafka-rest:latest

1b) Windows (Docker Toolbox)

docker stop $(docker ps -a -q) ;
docker rm -f $(docker ps -a -q) ;
docker run -d --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.5 ;
docker run -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper ;
docker run -d --name kafka -e ADVERTISED_HOST_NAME=192.168.99.100 -e ZOOKEEPER_CONNECT=192.168.99.100 --link zookeeper:zookeeper -p 9092:9092 debezium/kafka ;
docker run -d --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e ADVERTISED_HOST_NAME=192.168.99.100 --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect ;
docker run -d --net=host -e "PROXY=true" -e ADV_HOST=192.168.99.100 -e "KAFKA_REST_PROXY_URL=http://192.168.99.100:8082" -e "SCHEMAREGISTRY_UI_URL=http://192.168.99.100:8081" landoop/kafka-topics-ui ;
docker run -p 8082:8082 --name kafka-rest --env ZK_CONNECTION_STRING=192.168.99.100:2181 frontporch/kafka-rest:latest ;

2) Connect the database to Debezium Connect

Send a POST request with Content-Type application/json to <YOUR_IP>:8083/connectors (Linux) or 192.168.99.100:8083/connectors (Windows Docker Toolbox) with this body:
{
    "name": "inventory-connector",
    "config": {
        "name": "inventory-connector",
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.whitelist": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory"
    }
}
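The config can be posted with any HTTP client (curl works just as well); below is a rough sketch using plain Java, assuming the JSON above has been saved as inventory-connector.json and Connect is reachable on port 8083 as set up above.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;

// Registers the Debezium MySQL connector by POSTing the JSON config above
// to the Kafka Connect REST API. The file name is an assumption.
public class RegisterConnector {

    public static void main(String[] args) throws Exception {
        String connectUrl = "http://192.168.99.100:8083/connectors"; // or http://<YOUR_IP>:8083/connectors

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(connectUrl))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofFile(Path.of("inventory-connector.json")))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // 201 Created means the connector was registered; 409 means it already exists.
        System.out.println(response.statusCode() + " " + response.body());
    }
}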

Debezium creates Kafka topics, one per table. By navigating to the landoop/kafka-topics-ui server on port 8000 you can see what the schema of the message payloads looks like, as below. The important parts of the payload are before and after, which carry the old and the new values of the corresponding database row; op is 'c' for create, 'u' for update, and so on.

(screenshot of a change-event payload in kafka-topics-ui)
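For reference, here is a sketch of how a consumer can unwrap that envelope with Jackson and branch on op (field names follow the Debezium envelope described above; error handling omitted):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Sketch of unwrapping a Debezium change-event value (JSON with schema + payload).
// "op" is "c" (create), "u" (update), "d" (delete); "before"/"after" hold the
// old and new row images.
public class ChangeEventParser {

    private final ObjectMapper mapper = new ObjectMapper();

    public void handleValue(String json) throws Exception {
        JsonNode payload = mapper.readTree(json).path("payload");
        String op = payload.path("op").asText();
        JsonNode before = payload.path("before"); // JSON null for inserts
        JsonNode after = payload.path("after");   // JSON null for deletes

        switch (op) {
            case "c":
            case "u":
                System.out.println("upsert row: " + after);
                break;
            case "d":
                System.out.println("delete row keyed by: " + before);
                break;
            default:
                // e.g. "r" for rows read during the initial snapshot; treat like an upsert
                System.out.println("snapshot/other op " + op + ": " + after);
        }
    }
}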

Each consuming microservice uses the Spring Cloud Stream Kafka binder and spring-kafka, pulled in with these Maven dependencies:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Brixton.SR7</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>1.5.2.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
            <version>1.2.0.RELEASE</version>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    [...]
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    [...]
</dependencies>
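For completeness, here is a sketch of the consumer wiring behind @KafkaListener. The broker address is the one advertised in the Docker setup above (adjust to your environment), and Debezium's JSON values are consumed as plain strings:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

// Sketch of the spring-kafka consumer configuration used by the Listener below.
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}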

Then, in each consuming Spring Cloud microservice, I have a Listener that subscribes to all the topics it is interested in and delegates each record to a dedicated event handler:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class Listener {

    @KafkaListener(id = "listener", topics = {
            "dbserver1.inventory.entity",
            "dbserver1.inventory.attribute",
            "dbserver1.inventory.entity_types"
    }, group = "group1")
    public void listen(ConsumerRecord<?, ?> record) {
        String topic = record.topic();
        if (topic.equals("dbserver1.inventory.entity")) {
            // delegate to the appropriate handler, e.g.
            // entityEventHandler.handle(record);
        } else if (topic.equals("dbserver1.inventory.attribute")) {
            // attributeEventHandler.handle(record);
        } else if (topic.equals("dbserver1.inventory.entity_types")) {
            // entityTypeEventHandler.handle(record);
        }
    }
}

In my case I wanted to update a graph database based on the changes happening on the RDBMS side; the graph database will of course only be eventually consistent with the RDBMS. My concern was that, since the topics carry changes to join tables as well as to the joined tables themselves, I might have to create an edge before knowing that both of its vertices exist. So I decided to ask on the Debezium Gitter (https://gitter.im/debezium/dev):

From the discussion below, two ways exist: either create edges and vertices using placeholders for topics that haven't been consumed yet, or use Kafka Streams to stitch the topics back into their original structures, which seems more painful to me than the first way. So I decided to go with the first way :)

Michail Michailidis @zifnab87 Apr 17 11:23 Hi I was able to integrate Mysql with Debezium Connect and using landoop/topics-ui I am able to see that the topics are picked up properly and messages are sent the way they have to. I saw that for each of the tables there is a topic. e.g join tables are separate topics too.. If lets say I have three tables order, product and order_product and I have a service consuming all three topics.. I might get first the insertion on order_product and then the insertion of order.. That may cause a problem if I am trying to push this information to a graph database.. I will try to create an edge on vertex that is not there yet.. how can I make consumers that consume events lets say based on a transactionId or at least are aware of the boundary context.. is there an easy way to listen to those events and then deserialize them to a real java object so I can push that to a graph database or search index? If not how would you approach this problem? Thanks!

Randall Hauch @rhauch Apr 17 19:19 @zifnab87 Debezium CDC is purely a row-based approach, so by default all consumers see the row-level change events in an eventually consistent manner. Of course, the challenge to eventual consistency of downstream systems is that they might potentially leak data states that never really existed in the upstream source. But with that come lots of other really huge benefits: downstream consumers are much simpler, more resilient to failure, have lower latency (since there’s no need to wait for the appearance of the upstream transaction’s completion before processing), and are less coupled to the upstream system. You gave the example of order and product tables with an order_product intersect table. I agree that when thinking transactionally it does not make sense for an order_product relationship to be added before both the order and product instances exist. But are you required to live with that constraint? Can the order_product consumer create placeholder nodes in the graph database for any missing order and/or product values referenced by the relationship? In this case when the order_product consumer is a bit ahead of the order consumer, it might create an empty order node with the proper key or identifier, and when the order consumer finally processes the new order it would find the existing placeholder node and fill in the details. Of course, when the order arrives before the order_product relationships, then everything works as one might expect. This kind of approach might not be allowed by the downstream graph database system or the business-level constraints defined in the graph database. But if it is allowed and the downstream applications and services are designed to handle such states, then you’ll benefit from the significant simplicity that this approach affords, as the consumers become almost trivial. You’ll be managing less intermediate state and your system will be more likely to continue operating when things go wrong (e.g., consumers crash or are taken down for maintenance). If your downstream consumers do have to stick with adhering to the transaction boundaries in the source database, then you might consider using Kafka Streams to join the order and order_product topics and produce a single aggregate order object with all the relationships to the referenced products. If you can’t assume the product already exists, then you could also join with the product topic to add more product detail to the aggregate order object. Of course, there still are lots of challenges, since the only way for a stream processor consuming these streams to know it’s seen all of the row-level change events for a given transaction is when a subsequent transaction is seen on each of the streams. As you might expect, this is not ideal, since the last transaction prior to any quiet period will not complete immediately.

Michail Michailidis @zifnab87 Apr 17 23:49 Thanks @rhauch really well explained! I was investigating Kafka Streams while waiting for your answers! now I am thinking I will try to code the placeholder variation e.g when a vertex is not there etc

Randall Hauch @rhauch Apr 17 23:58 @zifnab87 glad it helped, at least a bit! Be sure you also consider the fact that the consumer might see a sequence of messages that it already has consumed. That will only happen when something goes wrong (e.g., with the connector or the process(es) where the connector is running, or the broker, network partition, etc.); when everything is operating normally, the consumer should see no duplicate messages.

Michail Michailidis @zifnab87 Apr 18 01:15 @rhauch Sure it helped! Yeap I have that in mind - consumer processes need to be idempotent. I am curious if for example sinks for let's say elastic search, mongodb and graph databases can be implemented to consolidate events produced from debezium-mysql no matter what the order by using placeholders for missing things.. e.g the mountaineer sinks are doing that already if you know by any chance? I am trying to avoid reimplementing things that already exist.. Also my solutions might be very fragile if mysql schema changes and I don't consume the new events.. I feel so many things are missing around the microservices world

Randall Hauch @rhauch Apr 18 03:30 I'm not sure how those sinks work. Ideally they should handle create, update, and delete events correctly. But because Debezium events have an envelope at the top level of every event, you'll probably have to use SMTs to grab the contents of the after field (or exclude the before field) so the "meaningful" parts are put into the sink system. This will probably get easier as more SMTs get added to KC. If you find that it takes too many SMTs and would rather Debezium added an SMT that did this, please log a feature request in JIRA.
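To make the first way a bit more concrete, here is a rough sketch of a relationship handler that creates placeholder vertices when needed. GraphClient and its methods are hypothetical stand-ins, not a real graph-database API:

// Illustration of the "placeholder" approach chosen above: before creating an
// edge for an order_product change event, make sure both endpoint vertices
// exist, creating empty placeholders if their own events have not arrived yet.
public class OrderProductEventHandler {

    // Hypothetical graph abstraction; replace with your graph database's client.
    interface GraphClient {
        boolean vertexExists(String label, long id);
        void createVertex(String label, long id);             // creates an empty/placeholder vertex
        void mergeEdge(String label, long fromId, long toId); // no-op if the edge already exists
    }

    private final GraphClient graph;

    public OrderProductEventHandler(GraphClient graph) {
        this.graph = graph;
    }

    public void handleRelationship(long orderId, long productId) {
        // Placeholders keep the handler idempotent and order-independent:
        // whichever topic is consumed first, the edge can always be created,
        // and the order/product consumers later fill in the details.
        if (!graph.vertexExists("order", orderId)) {
            graph.createVertex("order", orderId);
        }
        if (!graph.vertexExists("product", productId)) {
            graph.createVertex("product", productId);
        }
        graph.mergeEdge("contains", orderId, productId);
    }
}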

Hopefully this answer/guide will help others jump-start event sourcing with a message broker like Kafka as the central piece.
