I have a case where I have Information objects that contain Element objects. When an Information object is stored, it looks up preexisting Element objects based on a unique value field and inserts them if they don't exist. Information and Element objects can't be deleted for now. Adding a parent requires two preexisting Element objects. I was planning to use three topics, CreateElement, CreateInformation and AddParentOfElement, for the events Created Element Event, Created Information Event and Added Parent Event. I realized that, since there are no ordering guarantees between topics or between topic partitions, those events could be consumed in a different order than the one shown in the picture, so the schema couldn't be persisted to an RDBMS, for example. I assume that ids are used for partition assignment of the topics, as usual.
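By ids used for partition assignment I mean producing each event with the entity id as the record key, so that all events for the same id land in the same partition and keep their relative order. A minimal sketch of what I have in mind (topic name and payload are just placeholders):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class CreateElementProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The element id is the record key, so Kafka's default partitioner
            // always routes events for the same element to the same partition.
            String elementId = "1";
            producer.send(new ProducerRecord<>("CreateElement", elementId,
                    "{\"eventType\":\"CreatedElementEvent\",\"elementId\":1}"));
        }
    }
}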
Here is my diagram. The scenario is:
- Element with (id=1) was created by the user
- Information with (id=1) containing Elements (1, 2, 3) was created by the user
- Element with (id=5) was created by the user
- Parent of Element with (id=5) was set to be Element with (id=3) by the user
- Information with (id=2) containing Elements (1, 3 and 5) was created by the user
I am curious whether my topic choices make sense, and I would appreciate any suggestions on how to design the events so that, when they are processed by the consuming database services, they are idempotent and don't put the system into a wrong state.
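To be explicit about what I mean by idempotent, this is roughly the behaviour I want on the consumer side. The sketch below uses an in-memory map as a stand-in for the Element table; the real service would do the same find-or-insert against the RDBMS, keyed on the unique value field (e.g. a unique constraint plus an upsert statement):

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// In-memory stand-in for the Element table; the real thing would be the RDBMS
// with a unique constraint on the value field.
class ElementStore {
    private final Map<String, Long> idsByUniqueValue = new ConcurrentHashMap<>();
    private final AtomicLong idSequence = new AtomicLong();

    Optional<Long> findByUniqueValue(String uniqueValue) {
        return Optional.ofNullable(idsByUniqueValue.get(uniqueValue));
    }

    long insert(String uniqueValue) {
        return idsByUniqueValue.computeIfAbsent(uniqueValue, v -> idSequence.incrementAndGet());
    }
}

class CreatedElementEventHandler {
    private final ElementStore store = new ElementStore();

    // Re-delivering the same Created Element Event returns the existing Element
    // instead of inserting a duplicate, so processing it twice is harmless.
    long handle(String uniqueValue) {
        return store.findByUniqueValue(uniqueValue)
                .orElseGet(() -> store.insert(uniqueValue));
    }
}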
Thanks!
After considering this solution: How to implement a microservice Event Driven architecture with Spring Cloud Stream Kafka and Database per service, but not being satisfied with the suggestions, I investigated Confluent's Bottled Water (https://www.confluent.io/blog/bottled-water-real-time-integration-of-postgresql-and-kafka/) and later the more active but similar Debezium (http://debezium.io/).
I decided to follow the Debezium way. Debezium is a Kafka Connect plugin that reads directly from the MySQL binlog (or the PostgreSQL write-ahead log) and publishes those changes (schema and data) to Kafka.
The example setup I am using involves Docker; here is how I set it up for Docker Toolbox (Windows) and Docker (Linux).
1a) Linux (Docker)
sudo docker stop $(sudo docker ps -a -q)
sudo docker rm -f $(sudo docker ps -a -q)
sudo docker run -d --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.5
sudo docker run -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper
sudo docker run -d --name kafka -e ADVERTISED_HOST_NAME=<YOUR_IP> -e ZOOKEEPER_CONNECT=<YOUR_IP> --link zookeeper:zookeeper -p 9092:9092 debezium/kafka
sudo docker run -d --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e ADVERTISED_HOST_NAME=<YOUR_IP> --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect
sudo docker run -d --net=host -e "PROXY=true" -e ADV_HOST=<YOUR_IP> -e "KAFKA_REST_PROXY_URL=http://<YOUR_IP>:8082" -e "SCHEMAREGISTRY_UI_URL=http://<YOUR_IP>:8081" landoop/kafka-topics-ui
sudo docker run -p 8082:8082 --name kafka-rest --env ZK_CONNECTION_STRING=<YOUR_IP>:2181 frontporch/kafka-rest:latest
1b) Windows (Docker Toolbox)
docker stop $(docker ps -a -q) ;
docker rm -f $(docker ps -a -q) ;
docker run -d --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.5 ;
docker run -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper ;
docker run -d --name kafka -e ADVERTISED_HOST_NAME=192.168.99.100 -e ZOOKEEPER_CONNECT=192.168.99.100 --link zookeeper:zookeeper -p 9092:9092 debezium/kafka ;
docker run -d --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e ADVERTISED_HOST_NAME=192.168.99.100 --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect ;
docker run -d --net=host -e "PROXY=true" -e ADV_HOST=192.168.99.100 -e "KAFKA_REST_PROXY_URL=http://192.168.99.100:8082" -e "SCHEMAREGISTRY_UI_URL=http://192.168.99.100:8081" landoop/kafka-topics-ui ;
docker run -p 8082:8082 --name kafka-rest --env ZK_CONNECTION_STRING=192.168.99.100:2181 frontporch/kafka-rest:latest ;
2) Connect the database to Debezium Connect
Send a POST with Content-Type application/json to <YOUR_IP>:8083/connectors (for Linux) or 192.168.99.100:8083/connectors (for Windows Docker Toolbox) with this body:
{
"name": "inventory-connector",
"config": {
"name": "inventory-connector",
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.whitelist": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory"
}
}
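Any HTTP client works for this step (curl, Postman, and so on); purely as an illustration, a Java 11+ sketch that posts the configuration above could look like this (it assumes the JSON has been saved to a file named inventory-connector.json and uses the Docker Toolbox address):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

public class RegisterConnector {
    public static void main(String[] args) throws Exception {
        // Read the connector configuration shown above from a local file.
        String body = Files.readString(Path.of("inventory-connector.json"));
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://192.168.99.100:8083/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // 201 Created means Kafka Connect accepted and registered the connector.
        System.out.println(response.statusCode() + " " + response.body());
    }
}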
Debezium creates one Kafka topic per table. By navigating to the landoop/kafka-topics-ui server on port 8000 you can see what the schema of the message payloads looks like. The important parts of the payload are the before and after fields, which carry the old and the new values of the corresponding database row, and the op field, which is 'c' for create, 'u' for update, and so on.
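For illustration, unwrapping that envelope in a consumer could look roughly like the sketch below (it uses Jackson and assumes the default JSON converter with schemas enabled, which wraps each change event in a schema/payload pair):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ChangeEventParser {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Pulls the interesting parts out of a Debezium change event: op
    // ('c' = create, 'u' = update, 'd' = delete) plus the row state
    // before and after the change.
    public static void parse(String messageValue) throws Exception {
        JsonNode envelope = MAPPER.readTree(messageValue).path("payload");
        String op = envelope.path("op").asText();
        JsonNode before = envelope.path("before"); // null/missing for inserts
        JsonNode after = envelope.path("after");   // null/missing for deletes

        if ("c".equals(op) || "u".equals(op)) {
            System.out.println("upsert row: " + after);
        } else if ("d".equals(op)) {
            System.out.println("delete row: " + before);
        }
    }
}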
Each consuming microservice uses the Spring Cloud Kafka binder, pulled in with these Maven dependencies:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Brixton.SR7</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.2.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.2.0.RELEASE</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
[...]
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
[...]
</dependencies>
Then, in each of my consuming Spring Cloud microservices, I have a Listener that listens to all of the topics it is interested in at once and delegates each record to a dedicated event handler:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.concurrent.CountDownLatch;
@Component
public class Listener {

    public final CountDownLatch countDownLatch1 = new CountDownLatch(1);

    @KafkaListener(id = "listener", topics = {
            "dbserver1.inventory.entity",
            "dbserver1.inventory.attribute",
            "dbserver1.inventory.entity_types"
    }, group = "group1")
    public void listen(ConsumerRecord<?, ?> record) {
        String topic = record.topic();
        if (topic.equals("dbserver1.inventory.entity")) {
            // delegate to the appropriate handler, e.g.
            // EntityEventHandler.handle(record);
        } else if (topic.equals("dbserver1.inventory.attribute")) {
            // AttributeEventHandler.handle(record);
        } else if (topic.equals("dbserver1.inventory.entity_types")) {
            // EntityTypesEventHandler.handle(record);
        }
    }
}
In my case I wanted to update a graph database based on the changes that happen on the RDBMS side; the graph database will of course only be eventually consistent with the RDBMS. My concern was that, since there is a topic for each join table as well as for each of the joined tables, I might have to create an edge before knowing that both of its vertices exist. So I decided to ask on the Debezium Gitter (https://gitter.im/debezium/dev):
From the discussion below, two approaches exist: either create edges and vertices using placeholders for records that haven't been consumed yet, or use Kafka Streams to stitch the topics back into their original aggregate structures, which seems more painful to me than the first option. So I decided to go with the placeholder approach :) (a minimal sketch of it follows the conversation below).
Michail Michailidis @zifnab87 Apr 17 11:23 Hi I was able to integrate
Mysql with Debezium Connect and using landoop/topics-ui I am able to
see that the topics are picked up properly and messages are sent the
way they have to. I saw that for each of the tables there is a topic.
e.g join tables are separate topics too.. If lets say I have three
tables order, product and order_product and I have a service consuming
all three topics.. I might get first the insertion on order_product
and then the insertion of order.. That may cause a problem if I am
trying to push this information to a graph database.. I will try to
create an edge on vertex that is not there yet.. how can I make
consumers that consume events lets say based on a transactionId or at
least are aware of the boundary context.. is there an easy way to
listen to those events and then deserialize them to a real java object
so I can push that to a graph database or search index? If not how
would you approach this problem? Thanks!
Randall Hauch @rhauch Apr 17 19:19 @zifnab87 Debezium CDC is purely a
row-based approach, so by default all consumers see the row-level
change events in an eventually consistent manner. Of course, the
challenge to eventual consistency of downstream systems is that they
might potentially leak data states that never really existed in the
upstream source. But with that come lots of other really huge
benefits: downstream consumers are much simpler, more resilient to
failure, have lower latency (since there’s no need to wait for the
appearance of the upstream transaction’s completion before
processing), and are less decoupled to the upstream system. You gave
the example of an order and product tables with an order_product
intersect table. I agree that when thinking transactionally it does
not make sense for an order_product relationship to be added before
both the order and product instances exist. But are you required to
live with that constraint? Can the order_product consumer create
placeholder nodes in the graph database for any missing order and/or
product values referenced by the relationship? In this case when the
order_product consumer is a bit ahead of the order consumer, it might
create an empty order node with the proper key or identifier, and when
the order consumer finally processes the new order it would find the
existing placeholder node and fill in the details. Of course, when the
order arrives before the order_product relationships, then everything
works as one might expect. This kind of approach might not be allowed
by the downstream graph database system or the business-level
constraints defined in the graph database. But if it is allowed and
the downstream applications and services are designed to handle such
states, then you’ll benefit from the significant simplicity that this
approach affords, as the consumers become almost trivial. You’ll be
managing less intermediate state and your system will be more likely
to continue operating when things go wrong (e.g., consumers crash or
are taken down for maintenance). If your downstream consumers do have
to stick with adhering to the transaction boundaries in the source
database, then you might consider using Kafka Streams to join the
order and order_product topics and produce a single aggregate order
object with all the relationships to the referenced products. If you
can’t assume the product already exists, then you could also join with
the product topic to add more product detail to the aggregate order
object. Of course, there still are lots of challenges, since the only
way for a stream processor consuming these streams to know it’s seen
all of the row-level change events for a given transaction is when a
subsequent transaction is seen on each of the streams. As you might
expect, this is not ideal, since the last transaction prior to any
quiet period will not complete immediately.
Michail Michailidis @zifnab87 Apr 17 23:49 Thanks @rhauch really well
explained! I was investigating Kafka Streams while waiting for your
answers! now I am thinking I will try to code the placeholder
variation e.g when a vertex is not there etc
Randall Hauch @rhauch Apr 17 23:58 @zifnab87 glad it helped, at least
a bit! Be sure you also consider the fact that the consumer might see
a sequence of messages that it already has consumed. That will only
happen when something goes wrong (e.g., with the connector or the
process(es) where the connector is running, or the broker, network
partition, etc.); when everything is operating normally, the consumer
should see no duplicate messages.
Michail Michailidis @zifnab87 Apr 18 01:15 @rhauch Sure it helped!
Yeap I have that in mind - consumer processes need to be idempotent. I
am curious if for example sinks for lets say elastic search, mongodb
and graph databases can be implemented to consolidate events produced
from debezium-mysql no matter what the order by using placeholders for
missing things.. e.g the mountaineer sinks are doing that already if
you know by any chance? I am trying to avoid reimplementing things
that already exist.. Also my solutions might be very fragile if mysql
schema changes and I dont consume the new events.. I feel so many
things are missing around the microservices world
Randall Hauch @rhauch Apr 18 03:30 I'm not sure how those sinks work.
Ideally they should handle create, update, and delete events
correctly. But because Debezium events have an envelope at the top
level of every event, you'll probably have to use SMTs to grab the
contents of the after field (or exclude the before field) so the
"meaningful" parts are put into the sink system. This will probably
get easier as more SMTs get added to KC. If you find that it takes too
many SMTs and would rather Debezium added an SMT that did this, please
log a feature request in JIRA.
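To make the placeholder approach from the conversation above concrete, here is a minimal, hypothetical sketch; GraphClient and its merge methods are assumptions standing in for whatever graph database client is actually used (the important property is get-or-create semantics, e.g. Cypher's MERGE):

// Hypothetical graph access interface; any graph database client with
// idempotent "get or create" semantics would work the same way.
interface GraphClient {
    void mergeVertex(String label, String id);                // create the vertex only if it does not exist yet
    void mergeEdge(String label, String fromId, String toId); // create the edge only if it does not exist yet
}

class OrderProductEventHandler {

    private final GraphClient graph;

    OrderProductEventHandler(GraphClient graph) {
        this.graph = graph;
    }

    // Called for every change event on the order_product topic. Because both
    // vertices are merged first, it does not matter whether the order/product
    // events have been consumed yet, and replaying the same event twice leaves
    // the graph unchanged (idempotent).
    void handle(String orderId, String productId) {
        graph.mergeVertex("Order", orderId);      // placeholder if the order event hasn't arrived yet
        graph.mergeVertex("Product", productId);  // placeholder if the product event hasn't arrived yet
        graph.mergeEdge("CONTAINS", orderId, productId);
    }
}

When the order and product change events eventually arrive, their own handlers merge on the same keys and simply fill in the remaining properties of the placeholder vertices.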
Hopefully this answer/guide will help others jump-start event sourcing with a message broker like Kafka as the central piece.