Apache Flink: How to enable "upsert mode" for dynamic tables?

I have seen several mentions of an "upsert mode" for dynamic tables based on a unique key in the Flink documentation and on the official Flink blog. However, I do not see any examples or documentation showing how to enable this mode on a dynamic table.

Examples:

  • Blog post:

    When defining a dynamic table on a stream via update mode, we can specify a unique key attribute on the table. In that case, update and delete operations are performed with respect to the key attribute. The update mode is visualized in the following figure.

  • Documentation:

    A dynamic table that is converted into an upsert stream requires a (possibly composite) unique key.

So my questions are:

  • How do I specify a unique key attribute on a dynamic table in Flink?
  • How do I place a dynamic table in update/upsert/"replace" mode, as opposed to append mode?
Repletion answered 1/2, 2018 at 3:43 Comment(0)

The linked resources describe two different scenarios.

  • The blog post discusses an upsert DataStream -> Table conversion.
  • The documentation describes the inverse upsert Table -> DataStream conversion.

The following discussion is based on Flink 1.4.0 (Jan. 2018).

Upsert DataStream -> Table Conversion

Converting a DataStream into a Table by upsert on keys is not natively supported but is on the roadmap. In the meantime, you can emulate this behavior using an append Table and a query with a user-defined aggregation function.

If you have an append Table Logins with the schema (user, loginTime, ip) that tracks logins of users, you can convert that into an upsert Table keyed on user with the following query:

SELECT user, LAST_VAL(loginTime), LAST_VAL(ip) FROM Logins GROUP BY user

LAST_VAL is a user-defined aggregation function that always returns the most recently added value.
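A minimal sketch of what such a function could look like in Java, assuming Flink's AggregateFunction base class; the names LastVal and LastValAcc, the String-typed field, and the registration call below are illustrative assumptions, and a real implementation would need variants (or generics) for other field types:

import org.apache.flink.table.functions.AggregateFunction;

// Accumulator that simply remembers the most recently accumulated value.
public class LastValAcc {
    public String value;
}

// Hypothetical LAST_VAL: an aggregate function returning the latest added value.
public class LastVal extends AggregateFunction<String, LastValAcc> {

    @Override
    public LastValAcc createAccumulator() {
        return new LastValAcc();
    }

    // Called once per input row, in arrival order; the newest value wins.
    public void accumulate(LastValAcc acc, String value) {
        acc.value = value;
    }

    @Override
    public String getValue(LastValAcc acc) {
        return acc.value;
    }
}

The function would then be registered before the query runs, e.g. tableEnv.registerFunction("LAST_VAL", new LastVal());.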

Native support for upsert DataStream -> Table conversion would work in essentially the same way, though with a more concise API.

Upsert Table -> DataStream Conversion

Converting a Table into an upsert DataStream is not supported. This is also properly reflected in the documentation:

Please note that only append and retract streams are supported when converting a dynamic table into a DataStream.

We deliberately chose not to support upsert Table -> DataStream conversions, because an upsert DataStream can only be processed if the key attributes are known. These depend on the query and are not always straightforward to identify. It would be the responsibility of the developer to make sure that the key attributes are correctly interpreted. Failing to do so would result in faulty programs. To avoid problems, we decided to not offer the upsert Table -> DataStream conversion.

Instead, users can convert a Table into a retraction DataStream. Moreover, we support an UpsertTableSink that writes an upsert DataStream to an external system, such as a database or key-value store.
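For illustration, a minimal sketch of the retraction route in Java, assuming a StreamTableEnvironment named tableEnv and a result Table named result (both names are assumptions):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// Each element is a (flag, row) pair: true marks an insert, false a retraction.
DataStream<Tuple2<Boolean, Row>> retractStream =
    tableEnv.toRetractStream(result, Row.class);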

Kwiatkowski answered 1/2, 2018 at 9:12 Comment(5)
Extremely insightful answer, definitely clears this up for me. Thank you @Fabian! – Repletion
Hi @Fabian, is LAST_VAL implemented and available somewhere? – Animato
No, it's not a built-in function (yet). You'd need to implement it as a user-defined aggregation function. However, the Flink community is working on adding proper upsert table support. If everything goes as planned, it should be included in the 1.6.0 release. – Kwiatkowski
Hi @FabianHueske, thanks for the tip on how to convert an upsert stream to a dynamic table via a custom UDAF. I got that working locally, where a 3-way join now fires retract updates when any of the input streams change. Do you still plan on adding native support for this conversion? I couldn't see any other way to do this in Flink 1.9. Thanks, Andy – Production
Yes, this feature is still on the roadmap, but right now the community is giving higher priority to other features. – Kwiatkowski

Update: since Flink 1.9, LAST_VALUE is one of the built-in aggregate functions when using the Blink planner (the default planner since Flink 1.11).

Assuming the existence of the Logins table mentioned in Fabian Hueske's answer above, we can now convert it to an upsert table as simply as:

SELECT 
  user, 
  LAST_VALUE(loginTime), 
  LAST_VALUE(ip) 
FROM Logins 
GROUP BY user
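
For context, here is a minimal Java sketch of running that query on the Blink planner (Flink 1.9-1.13 style setup; it assumes a table named Logins has already been registered):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

// Explicitly select the Blink planner (the default from Flink 1.11 onwards).
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
TableEnvironment tEnv = TableEnvironment.create(settings);

// LAST_VALUE is built in on the Blink planner, so no UDF registration is needed.
// `user` is escaped here because USER is a reserved keyword in Flink SQL.
Table upserts = tEnv.sqlQuery(
    "SELECT `user`, LAST_VALUE(loginTime), LAST_VALUE(ip) " +
    "FROM Logins GROUP BY `user`");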
Barium answered 24/1, 2021 at 20:1 Comment(4)
I pulled my hair out for long hours over the inability to apply DELETE operations from Debezium to a Flink SQL table, and this really helped. However, I did not find any documentation explaining why this function works. Would you mind sharing some background or references? Thanks! – Fir
Hi, the list of SQL functions that Flink supports is documented here: nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/… LAST_VALUE is mentioned there among the "aggregate functions". – Barium
Thanks, I have read the official doc, but it presents LAST_VALUE like an ordinary aggregate. However, it works even for DELETE operations, which do not produce any row of data. Using LAST_VALUE and GROUP BY, I would expect it to keep the last value of a row even after the row was deleted; yet LAST_VALUE seems to respect DELETEs. To me this looks like there is some implicit upsert-table creation and insertion rather than a simple aggregate, but this is not mentioned in the docs. Is something wrong with my understanding? Thanks. – Fir
Sorry, I don't have any specific information to share about CDC-based tables. – Barium

Flink 1.8 still lacks such support. I expect these features to be added in the future: 1) LAST_VAL, 2) upsert Stream <-> Dynamic Table conversion.

P.S. LAST_VAL() does not seem implementable as a UDAF, because aggregation functions do not receive the attached event/processing-time context. Alibaba's Blink provides an alternative implementation of LAST_VAL, but it requires another field to supply the ordering information rather than working directly on event/processing time, which makes the SQL code ugly. (https://help.aliyun.com/knowledge_detail/62791.html)

My workaround for LAST_VAL (e.g., getting the latest ip) is something like the following (see the sketch after the list):

  1. concat(ts, ip) as ordered_ip
  2. MAX(ordered_ip) as ordered_ip
  3. extract(ordered_ip) as ip
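
Expressed as a query, this workaround could look like the hypothetical sketch below. It assumes ts is already a fixed-width, lexicographically sortable string (e.g. 'yyyy-MM-dd HH:mm:ss.SSS', 23 characters) so that MAX picks the latest login, and it reuses the tEnv and Logins names from the sketches above:

// MAX over the concatenated string keeps the row with the newest timestamp;
// SUBSTRING then strips the 23-character timestamp prefix to recover the ip.
Table latestIp = tEnv.sqlQuery(
    "SELECT `user`, " +
    "       SUBSTRING(MAX(CONCAT(ts, ip)) FROM 24) AS ip " +
    "FROM Logins " +
    "GROUP BY `user`");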
Nag answered 26/11, 2019 at 2:47 Comment(0)
