I'm trying to use KSQL (as part of confluent-5.0.0) to create a single record out of a set of parent records and child records, where every parent record has multiple child records (specifically, payment details and the parties involved in the payment). These parent/child records are linked by the parent's id. To illustrate, I'm dealing with records of roughly this structure in the source system:
payment:
| id | currency | amount | payment_date |
|------------------------------------------|
| pmt01 | USD | 20000 | 2018-11-20 |
| pmt02 | USD | 13000 | 2018-11-23 |
payment_parties:
| id | payment_id | party_type | party_ident | party_account |
|-----------------------------------------------------------------|
| prt01 | pmt01 | sender | XXYYZZ23 | (null) |
| prt02 | pmt01 | intermediary | AADDEE98 | 123456789 |
| prt03 | pmt01 | receiver | FFGGHH56 | 987654321 |
| prt04 | pmt02 | sender | XXYYZZ23 | (null) |
| prt05 | pmt02 | intermediary | (null) | (null) |
| prt06 | pmt02 | receiver | FFGGHH56 | 987654321 |
These records are loaded, in AVRO format, onto a set of Kafka topics using Oracle GoldenGate, with one topic for every table. This means the following topics exist: src_payment and src_payment_parties. Because of the way the source system functions, the timestamps of a payment record and its party records fall within several milliseconds of each other.
Now, the intent is to 'flatten' these records into a single record, which will be consumed from an outgoing topic. To illustrate, for the records above, the desired output would be along these lines:
payment_flattened:
| id | currency | amount | payment_date | sender_ident | sender_account | intermediary_ident | intermediary_account | receiver_ident | receiver_account |
|----------------------------------------------------------------------------------------------------------------------------------------------------------|
| pmt01 | USD | 20000 | 2018-11-20 | XXYYZZ23 | (null) | AADDEE98 | 123456789 | FFGGHH56 | 987654321 |
| pmt02 | USD | 13000 | 2018-11-23 | XXYYZZ23 | (null) | (null) | (null) | FFGGHH56 | 987654321 |
The first question I'd like to ask is: how can I best achieve this combination of data from the source topics?
Of course, I have tried a few things myself. In the interest of brevity, I'll only describe my attempt to append the first of the payment parties (the sender) to the payment records.
Step one: set up the source streams
Note: due to the OGG setup adding a property called 'table' to the AVRO schema, I have to specify the fields to take from the topic. Additionally, I'm not interested in the fields specifying the type of operation (e.g. insert or update).
create stream payment_stream (id varchar, currency varchar, amount double, \
payment_date varchar) with (kafka_topic='src_payment',value_format='avro');
create stream payment_parties_stream (id varchar, payment_id varchar, party_type varchar, \
party_ident varchar, party_account varchar) with (kafka_topic='src_payment_parties',\
value_format='avro');
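For reference, a minimal sketch of how the source data and record timestamps can be inspected from the KSQL CLI, assuming the two streams above were created successfully. ROWTIME and ROWKEY are KSQL's pseudo columns for the record timestamp and the Kafka message key; the LIMIT values are arbitrary.

```sql
-- Read from the start of the topics so records that already exist are included.
SET 'auto.offset.reset' = 'earliest';

-- Dump the raw topic contents (runs until interrupted with Ctrl-C).
PRINT 'src_payment' FROM BEGINNING;

-- Compare record timestamps and message keys on both sides of the intended join.
SELECT ROWTIME, ROWKEY, id FROM payment_stream LIMIT 2;
SELECT ROWTIME, ROWKEY, id, payment_id FROM payment_parties_stream LIMIT 6;
```

Comparing the ROWTIME values of a payment and its parties shows whether they really fall inside the join window used further down.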
Step two: create stream for the payment senders
Note: from what I've gathered from the documentation and from experimenting, in order to join the payment stream to a payment party stream, the latter needs to be partitioned by the payment id. Additionally, the only way I have gotten the join to work is by renaming the payment_id column to id, so that it has the same name as the join column in the payment stream.
create stream payment_sender_stream as select payment_id as id, party_ident, \
party_account from payment_parties_stream where party_type = 'sender' partition by id;
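A rough way to check that the repartitioning took effect, assuming the statement above ran without errors: after PARTITION BY, the message key (exposed as the ROWKEY pseudo column) should carry the same value as the id column. The LIMIT value is arbitrary.

```sql
-- Read from the start of the topic so records that already exist are included.
SET 'auto.offset.reset' = 'earliest';

-- The "Key field" entry in the output should name the partitioning column.
DESCRIBE EXTENDED payment_sender_stream;

-- ROWKEY is the Kafka message key; after the repartition it should equal id.
SELECT ROWKEY, id, party_ident, party_account FROM payment_sender_stream LIMIT 2;
```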
Step three: join two streams
Note: I'm using a left join because not all parties are present for every payment, as in the example records above, where pmt02 does not have an intermediary.
create stream payment_with_sender as select pmt.id as id, pmt.currency, pmt.amount, \
pmt.payment_date, snd.party_ident, snd.party_account from payment_stream pmt left join \
payment_sender_stream snd within 1 seconds on pmt.id = snd.id;
Now, the output I would expect from this stream is something along these lines:
ksql> select * from payment_with_sender;
rowtime | pmt01 | pmt01 | USD | 20000 | 2018-11-20 | XXYYZZ23 | null
rowtime | pmt02 | pmt02 | USD | 13000 | 2018-11-23 | XXYYZZ23 | null
Instead, the output I'm seeing is along these lines:
ksql> select * from payment_with_sender;
rowtime | pmt01 | pmt01 | USD | 20000 | 2018-11-20 | null | null
rowtime | pmt01 | pmt01 | USD | 20000 | 2018-11-20 | XXYYZZ23 | null
rowtime | pmt02 | pmt02 | USD | 13000 | 2018-11-23 | null | null
rowtime | pmt02 | pmt02 | USD | 13000 | 2018-11-23 | XXYYZZ23 | null
Hence, the second (two-part) question I'd like to ask is: Why does the left join produce these duplicate records? And can this be avoided?
Apologies for the wall of text, I tried to be as complete as possible in the description of the issue. Of course, I'd be happy to add any possible missing information, and answer questions regarding the setup to the best of my knowledge.