Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update cdc.sql #861

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 21 additions & 18 deletions examples/cdc/cdc.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,26 @@ CREATE MATERIALIZED VIEW mv_customers_r INTO customers AS
1::int8 as _tp_delta
FROM customers_cdc WHERE raw:payload.op='r' SETTINGS seek_to='earliest';
CREATE MATERIALIZED VIEW mv_customers_u INTO customers AS
WITH before AS(
SELECT to_time(raw:payload.ts_ms)-1ms AS _tp_time,
raw:payload.before.id::int AS id,
raw:payload.before.first_name AS first_name,
raw:payload.before.last_name AS last_name,
raw:payload.before.email AS email,
-1::int8 as _tp_delta
FROM customers_cdc WHERE raw:payload.op='u' SETTINGS seek_to='earliest'
),after AS(
SELECT to_time(raw:payload.ts_ms) AS _tp_time,
raw:payload.after.id::int AS id,
raw:payload.after.first_name AS first_name,
raw:payload.after.last_name AS last_name,
raw:payload.after.email AS email,
1::int8 as _tp_delta
FROM customers_cdc WHERE raw:payload.op='u' SETTINGS seek_to='earliest'
)SELECT * FROM before UNION SELECT * FROM after;
WITH cdc_changes AS
(
SELECT
ts_ms, array_join(changes) AS change, change.1 as val, change.2 AS _tp_delta
FROM
(
SELECT
to_time(raw:payload.ts_ms) AS ts_ms, raw:payload.before AS before, raw:payload.after AS after, [(before, -1::int8), (after, 1::int8)] AS changes
FROM
default.customers_cdc
jovezhong marked this conversation as resolved.
Show resolved Hide resolved
WHERE
raw:payload.op = 'u'
SETTINGS
seek_to = 'earliest'
)
)
SELECT
ts_ms AS _tp_time, val:id:int32 AS id, val:first_name AS first_name, val:last_name AS last_name, val:email AS email, _tp_delta
jovezhong marked this conversation as resolved.
Show resolved Hide resolved
FROM
cdc_changes

CREATE MATERIALIZED VIEW mv_customers_d INTO customers AS
SELECT to_time(raw:payload.ts_ms) AS _tp_time,
Expand All @@ -51,4 +54,4 @@ CREATE MATERIALIZED VIEW mv_customers_c INTO customers AS
raw:payload.after.last_name AS last_name,
raw:payload.after.email AS email,
1::int8 as _tp_delta
FROM customers_cdc WHERE raw:payload.op='c' SETTINGS seek_to='earliest';
FROM customers_cdc WHERE raw:payload.op='c' SETTINGS seek_to='earliest';