I have seen several mentions of an "upsert mode" for dynamic tables based on a unique key in the Flink documentation and on the official Flink blog. However, I do not see any examples or documentation on how to enable this mode on a dynamic table.
Examples:
- "When defining a dynamic table on a stream via update mode, we can specify a unique key attribute on the table. In that case, update and delete operations are performed with respect to the key attribute. The update mode is visualized in the following figure."
- "A dynamic table that is converted into an upsert stream requires a (possibly composite) unique key."
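The update mode described in the first quote can be pictured as a table that holds one row per unique key: an update message inserts or overwrites the row for its key, and a delete message removes it. The following Java sketch models only those semantics; UpsertTable and its methods are hypothetical names, not part of the Flink API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model (not a Flink API) of a dynamic table maintained in
// update mode: every operation is applied with respect to the unique key.
class UpsertTable<K, V> {
    // Current table contents: exactly one row per unique key.
    private final Map<K, V> rows = new LinkedHashMap<>();

    // Upsert message: insert a new row, or replace the row with the same key.
    void upsert(K key, V row) {
        rows.put(key, row);
    }

    // Delete message: remove the row with this key, if there is one.
    void delete(K key) {
        rows.remove(key);
    }

    // Copy of the current table state.
    Map<K, V> snapshot() {
        return new LinkedHashMap<>(rows);
    }

    public static void main(String[] args) {
        UpsertTable<String, String> latestIp = new UpsertTable<>();
        latestIp.upsert("alice", "10.0.0.1");
        latestIp.upsert("bob", "10.0.0.2");
        latestIp.upsert("alice", "10.0.0.3"); // same key: replaces alice's row
        latestIp.delete("bob");               // removes bob's row
        System.out.println(latestIp.snapshot()); // {alice=10.0.0.3}
    }
}
```

Without a unique key there is nothing to match an update or delete against, which is why both quotes insist on one.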
So my questions are:
- How do I specify a unique key attribute on a dynamic table in Flink?
- How do I place a dynamic table in update/upsert/"replace" mode, as opposed to append mode?
Flink 1.8 still lacks such support. I expect these features to be added in the future: 1) LAST_VAL, 2) Upsert Stream <-> Dynamic Table conversion.
P.S. LAST_VAL() does not seem to be implementable as a UDTF: aggregation functions do not get access to the event-time or processing-time context of the records. Alibaba's Blink provides an alternative implementation of LAST_VAL, but it requires an additional field that provides the order information instead of using event/processing time directly, which makes the SQL code ugly (https://help.aliyun.com/knowledge_detail/62791.html).
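Because the aggregation function cannot see the record timestamp, "latest" has to be decided from a value passed in as an argument, as in the Blink variant mentioned above. A hypothetical plain-Java accumulator (not Flink or Blink code; it only mirrors the createAccumulator / accumulate / getValue shape of Flink's table AggregateFunction) shows what that extra order field buys:

```java
// Hypothetical LAST_VAL accumulator that receives the order explicitly.
// Plain Java: it only mirrors the createAccumulator / accumulate / getValue
// shape of Flink's table AggregateFunction, it does not extend it.
class LastValByOrder {

    // Accumulator state: the current value and the order value it came with.
    static final class Acc {
        String value;
        long order;
        boolean empty = true;
    }

    static Acc createAccumulator() {
        return new Acc();
    }

    // Replace the stored value unless the incoming record is older.
    static void accumulate(Acc acc, String value, long order) {
        if (acc.empty || order >= acc.order) {
            acc.value = value;
            acc.order = order;
            acc.empty = false;
        }
    }

    // null until the first record has been accumulated.
    static String getValue(Acc acc) {
        return acc.value;
    }

    public static void main(String[] args) {
        Acc acc = createAccumulator();
        accumulate(acc, "10.0.0.1", 100L);
        accumulate(acc, "10.0.0.3", 300L);
        accumulate(acc, "10.0.0.2", 200L); // arrives late with an older order value: ignored
        System.out.println(getValue(acc));  // 10.0.0.3
    }
}
```

Dropping the order parameter and always overwriting gives an arrival-order LAST_VAL instead; the explicit order argument is what the SQL has to supply when the timestamp is not available to the function.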
My workaround for LAST_VAL (e.g., to get the latest IP) is something like:
The linked resources describe two different scenarios:
- upsert DataStream -> Table conversion
- upsert Table -> DataStream conversion

The following discussion is based on Flink 1.4.0 (Jan. 2018).
Upsert DataStream -> Table Conversion

Converting a DataStream into a Table by upsert on keys is not natively supported, but it is on the roadmap. Meanwhile, you can emulate this behavior using an append Table and a query with a user-defined aggregation function.

If you have an append Table Logins with the schema (user, loginTime, ip) that tracks logins of users, you can convert it into an upsert Table keyed on user with the following query:

The LAST_VAL aggregation function is a user-defined aggregation function that always returns the latest added value.

Native support for upsert DataStream -> Table conversion would work basically the same way, although with a more concise API.
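The query itself is not reproduced in this copy of the answer. Assuming it groups Logins by user and applies LAST_VAL to loginTime and ip, its effect on the append table can be modeled in plain Java: every appended row updates the result row of its user, so the result behaves like an upsert table keyed on user. Login and lastValPerUser are hypothetical names for illustration, not Flink code.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not Flink) of grouping the append-only Logins table by
// user and taking an arrival-order LAST_VAL of loginTime and ip.
class LoginsLastVal {

    // One row of the append-only Logins table: (user, loginTime, ip).
    static final class Login {
        final String user;
        final long loginTime;
        final String ip;

        Login(String user, long loginTime, String ip) {
            this.user = user;
            this.loginTime = loginTime;
            this.ip = ip;
        }

        @Override
        public String toString() {
            return "(" + user + ", " + loginTime + ", " + ip + ")";
        }
    }

    // Result keyed on user: each appended row updates its user's result row,
    // because LAST_VAL returns the most recently added value per group.
    static Map<String, Login> lastValPerUser(List<Login> appendOnly) {
        Map<String, Login> byUser = new LinkedHashMap<>();
        for (Login row : appendOnly) {
            byUser.put(row.user, row); // update, not append
        }
        return byUser;
    }

    public static void main(String[] args) {
        List<Login> logins = Arrays.asList(
            new Login("alice", 1L, "10.0.0.1"),
            new Login("bob", 2L, "10.0.0.2"),
            new Login("alice", 3L, "10.0.0.3"));
        // Three appended rows, two result rows:
        // [(alice, 3, 10.0.0.3), (bob, 2, 10.0.0.2)]
        System.out.println(lastValPerUser(logins).values());
    }
}
```

The grouping key plays the role of the unique key: it is what turns the append input into a result that is updated in place.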
Upsert Table -> DataStream Conversion

Converting a Table into an upsert DataStream is not supported. This is also properly reflected in the documentation.

We deliberately chose not to support upsert Table -> DataStream conversions, because an upsert DataStream can only be processed if the key attributes are known. These depend on the query and are not always straightforward to identify. It would be the responsibility of the developer to make sure that the key attributes are correctly interpreted. Failing to do so would result in faulty programs. To avoid problems, we decided not to offer the upsert Table -> DataStream conversion.

Instead, users can convert a Table into a retraction DataStream. Moreover, we support an UpsertStreamTableSink that writes an upsert DataStream to an external system, such as a database or key-value store.
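In the Table API a retraction stream is obtained with StreamTableEnvironment.toRetractStream(), which emits (Boolean, row) pairs: true marks an add message and false a retract message. Such messages carry no key, so a consumer that wants a keyed view must supply the key itself, which is exactly the responsibility the answer describes. The following plain-Java sketch models that; Change, add, retract and materialize are hypothetical names, and rows are plain strings of the form user|ip purely for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of consuming a retraction stream (not Flink code).
// Each message is an add (true) or retract (false) flag plus a row.
class RetractionView {

    static final class Change<R> {
        final boolean isAdd;
        final R row;

        Change(boolean isAdd, R row) {
            this.isAdd = isAdd;
            this.row = row;
        }
    }

    static <R> Change<R> add(R row) {
        return new Change<>(true, row);
    }

    static <R> Change<R> retract(R row) {
        return new Change<>(false, row);
    }

    // Builds a keyed view of the stream. The key extractor has to come from
    // the developer: the retraction messages themselves do not carry a key.
    static <K, R> Map<K, R> materialize(List<Change<R>> changes, Function<R, K> keyOf) {
        Map<K, R> view = new LinkedHashMap<>();
        for (Change<R> c : changes) {
            K key = keyOf.apply(c.row);
            if (c.isAdd) {
                view.put(key, c.row);
            } else if (c.row.equals(view.get(key))) {
                view.remove(key); // only retract the row that is actually present
            }
        }
        return view;
    }

    public static void main(String[] args) {
        // An update of alice's result row arrives as retract(old) + add(new).
        List<Change<String>> changes = Arrays.asList(
            add("alice|10.0.0.1"),
            add("bob|10.0.0.2"),
            retract("alice|10.0.0.1"),
            add("alice|10.0.0.3"));
        Map<String, String> view = materialize(changes, row -> row.split("\\|")[0]);
        System.out.println(view); // {bob=bob|10.0.0.2, alice=alice|10.0.0.3}
    }
}
```

If keyOf picks the wrong attribute, the view silently merges or duplicates rows; this is the kind of faulty program the answer cites as the reason for not offering the upsert Table -> DataStream conversion.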