forked from singlestore-labs/demo-realtime-digital-marketing
-
Notifications
You must be signed in to change notification settings - Fork 0
/
schema.sql
264 lines (225 loc) · 7.5 KB
/
schema.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
-- worldcities: one row per city (id, name, geographic center point).
-- city_id is the primary key; center carries a secondary index.
CREATE ROWSTORE TABLE IF NOT EXISTS worldcities (
    city_id   BIGINT         NOT NULL,
    city_name TEXT           NOT NULL,
    center    GEOGRAPHYPOINT NOT NULL,

    PRIMARY KEY (city_id),
    KEY (center)
);
-- sessions: one row per session_id.
--   is_controller  flag that defaults to FALSE when not supplied
--   expires_at     expiry instant, stored with microsecond precision
CREATE ROWSTORE TABLE IF NOT EXISTS sessions (
    session_id    TEXT        NOT NULL PRIMARY KEY,
    is_controller BOOLEAN     NOT NULL DEFAULT FALSE,
    expires_at    DATETIME(6) NOT NULL
);
-- cities: reference table with one row per city_id (name, center point and
-- an optional diameter).
CREATE ROWSTORE REFERENCE TABLE IF NOT EXISTS cities (
    city_id   BIGINT         NOT NULL,
    city_name TEXT           NOT NULL,
    center    GEOGRAPHYPOINT NOT NULL,
    -- nullable; NOTE(review): units are not stated in this file -- confirm
    diameter  DOUBLE,

    PRIMARY KEY (city_id)
);
-- subscribers: one row per (city_id, subscriber_id) holding the subscriber's
-- current location. current_location carries a secondary index.
CREATE ROWSTORE TABLE IF NOT EXISTS subscribers (
    city_id          BIGINT         NOT NULL,
    subscriber_id    BIGINT         NOT NULL,
    current_location GEOGRAPHYPOINT NOT NULL,

    PRIMARY KEY (city_id, subscriber_id),
    KEY (current_location)
);
-- subscribers_last_notification: one row per (city_id, subscriber_id) with the
-- time of that subscriber's last notification. last_notification is nullable
-- and carries a secondary index.
CREATE ROWSTORE TABLE IF NOT EXISTS subscribers_last_notification (
    city_id           BIGINT NOT NULL,
    subscriber_id     BIGINT NOT NULL,
    last_notification DATETIME(6),

    PRIMARY KEY (city_id, subscriber_id),
    KEY (last_notification)
);
-- locations: time series of subscriber positions.
-- Sharded by (city_id, subscriber_id) and sorted by ts, with hash indexes on
-- (city_id, subscriber_id) and on olc_8.
CREATE TABLE IF NOT EXISTS locations (
    city_id       BIGINT         NOT NULL,
    subscriber_id BIGINT         NOT NULL,
    ts            DATETIME(6)    NOT NULL SERIES TIMESTAMP,
    lonlat        GEOGRAPHYPOINT NOT NULL,
    -- open location code length 8 (275m resolution)
    olc_8         TEXT           NOT NULL,

    SHARD KEY (city_id, subscriber_id),
    SORT KEY (ts),
    KEY (city_id, subscriber_id) USING HASH,
    KEY (olc_8) USING HASH
);
-- requests: time series of (subscriber, domain) events.
-- Sharded by (city_id, subscriber_id) and sorted by ts, with a hash index on
-- domain.
CREATE TABLE IF NOT EXISTS requests (
    city_id       BIGINT      NOT NULL,
    subscriber_id BIGINT      NOT NULL,
    ts            DATETIME(6) NOT NULL SERIES TIMESTAMP,
    domain        TEXT        NOT NULL,

    SHARD KEY (city_id, subscriber_id),
    SORT KEY (ts),
    KEY (domain) USING HASH
);
-- purchases: time series of (subscriber, vendor) events.
-- Sharded by (city_id, subscriber_id) and sorted by ts, with a hash index on
-- vendor.
CREATE TABLE IF NOT EXISTS purchases (
    city_id       BIGINT      NOT NULL,
    subscriber_id BIGINT      NOT NULL,
    ts            DATETIME(6) NOT NULL SERIES TIMESTAMP,
    vendor        TEXT        NOT NULL,

    SHARD KEY (city_id, subscriber_id),
    SORT KEY (ts),
    KEY (vendor) USING HASH
);
-- offers: reference table with one row per offer.
--   offer_id              auto-incrementing primary key
--   enabled               defaults to TRUE when not supplied
--   notification_zone     geography checked against subscriber locations by
--                         match_offers_to_subscribers
--   segment_ids           JSON; match_offers_to_subscribers expands it with
--                         JSON_TO_ARRAY and casts each element to BIGINT
--   maximum_bid_cents     bid amount, in cents per the column name
-- Secondary indexes: notification_zone, customer, notification_target.
CREATE ROWSTORE REFERENCE TABLE IF NOT EXISTS offers (
    offer_id             BIGINT    NOT NULL AUTO_INCREMENT,
    customer             TEXT      NOT NULL,
    enabled              BOOLEAN   NOT NULL DEFAULT TRUE,
    notification_zone    GEOGRAPHY NOT NULL,
    segment_ids          JSON      NOT NULL,
    notification_content TEXT      NOT NULL,
    notification_target  TEXT      NOT NULL,
    maximum_bid_cents    BIGINT    NOT NULL,

    PRIMARY KEY (offer_id),
    KEY (notification_zone),
    KEY (customer),
    KEY (notification_target)
);
-- notifications: time series of offers sent to subscribers, one row per
-- (ts, subscriber, offer) with the cost and the location recorded for it.
-- Sharded by (city_id, subscriber_id) and sorted by ts.
CREATE TABLE IF NOT EXISTS notifications (
    ts            DATETIME(6)    NOT NULL SERIES TIMESTAMP,
    city_id       BIGINT         NOT NULL,
    subscriber_id BIGINT         NOT NULL,
    offer_id      BIGINT         NOT NULL,
    cost_cents    BIGINT         NOT NULL,
    lonlat        GEOGRAPHYPOINT NOT NULL,

    SHARD KEY (city_id, subscriber_id),
    SORT KEY (ts)
);
-- segments: reference table defining audience segments.
--   valid_interval  how long a matching event keeps a subscriber in the
--                   segment (used with date_add_dynamic / date_sub_dynamic
--                   by the dynamic_subscriber_segments_* functions)
--   filter_kind     which event stream the segment filters on:
--                   "olc_8" -> locations.olc_8, "request" -> requests.domain,
--                   "purchase" -> purchases.vendor
--   filter_value    the value compared against that column
-- A (valid_interval, filter_kind, filter_value) combination is unique.
--
-- IF NOT EXISTS keeps this script re-runnable, matching the other tables in
-- this file.
CREATE ROWSTORE REFERENCE TABLE IF NOT EXISTS segments (
    segment_id     BIGINT NOT NULL,
    valid_interval ENUM ("minute", "hour", "day", "week", "month") NOT NULL,
    filter_kind    ENUM ("olc_8", "request", "purchase") NOT NULL,
    filter_value   TEXT NOT NULL,

    PRIMARY KEY (segment_id),
    UNIQUE KEY (valid_interval, filter_kind, filter_value),
    KEY (filter_kind, filter_value)
);
-- subscriber_segments: membership of subscribers in segments, one row per
-- (city_id, subscriber_id, segment_id) with the instant the membership
-- expires. Sharded by (city_id, subscriber_id), the same shard key used by
-- locations, requests, purchases and notifications.
--
-- IF NOT EXISTS keeps this script re-runnable, matching the other tables in
-- this file.
CREATE ROWSTORE TABLE IF NOT EXISTS subscriber_segments (
    city_id       BIGINT      NOT NULL,
    subscriber_id BIGINT      NOT NULL,
    segment_id    BIGINT      NOT NULL,
    expires_at    DATETIME(6) NOT NULL,

    PRIMARY KEY (city_id, subscriber_id, segment_id),
    SHARD KEY (city_id, subscriber_id)
);
-- match_offers_to_subscribers(_interval)
--
-- For each subscriber, returns the single eligible offer with the highest
-- maximum_bid_cents (ties broken by the lowest offer_id).
--
-- An (offer, subscriber) pair is eligible when all of the following hold:
--   * the offer is enabled;
--   * the offer's notification_zone contains the subscriber's
--     current_location;
--   * the subscriber has no recorded last_notification, or it is older than
--     one _interval before NOW();
--   * notifications holds no row for this exact (offer, subscriber) pair with
--     ts inside the last _interval;
--   * the subscriber has a subscriber_segments row for every element of the
--     offer's segment_ids array.
--
-- Parameters:
--   _interval  unit handed to date_sub_dynamic(NOW(), ...) for both
--              rate-limit checks above.
--
-- Returns one row per matched (city_id, subscriber_id):
--   city_id, subscriber_id, best_offer_id, cost_cents, current_location
--
-- NOTE(review): subscriber_segments.expires_at is not checked here;
-- presumably expired memberships are pruned elsewhere -- confirm.
-- NOTE(review): an offer whose segment_ids array is empty presumably yields
-- no rows from the JSON_TO_ARRAY join and therefore never matches -- confirm.
CREATE OR REPLACE FUNCTION match_offers_to_subscribers(
    _interval ENUM("second", "minute", "hour", "day", "week", "month")
) RETURNS TABLE AS RETURN (
    WITH
        -- phase_1: every (offer, subscriber) pair that passes the location
        -- and rate-limit checks.
        phase_1 AS (
            SELECT offers.*, subscribers.*
            FROM
                offers,
                subscribers
                -- grab last notification time for each subscriber
                -- with(table_convert_subselect=true) forces a hash join
                LEFT JOIN subscribers_last_notification with(table_convert_subselect=true) ON (
                    subscribers.city_id = subscribers_last_notification.city_id
                    AND subscribers.subscriber_id = subscribers_last_notification.subscriber_id
                )
            WHERE
                offers.enabled = TRUE
                -- match offers to subscribers based on current location
                AND geography_contains(offers.notification_zone, subscribers.current_location)
                -- ensure we don't spam subscribers
                AND (
                    subscribers_last_notification.last_notification IS NULL
                    OR subscribers_last_notification.last_notification < date_sub_dynamic(NOW(), _interval)
                )
                -- only match (offer, subscriber) pairs such that there is no
                -- matching notification within the last _interval
                AND NOT EXISTS (
                    SELECT * FROM notifications n
                    WHERE
                        n.ts > date_sub_dynamic(NOW(), _interval)
                        AND offers.offer_id = n.offer_id
                        AND subscribers.city_id = n.city_id
                        AND subscribers.subscriber_id = n.subscriber_id
                )
        ),
        -- phase_2: one row per offer segment the subscriber actually belongs
        -- to, numbered 1..k within each (offer, subscriber) pair.
        -- This must be an INNER JOIN: an outer join would keep a row for
        -- every element of segment_ids whether or not the subscriber is in
        -- that segment, so the counter would always reach
        -- json_length(segment_ids) and the segment filter would exclude
        -- nobody.
        phase_2 AS (
            SELECT
                phase_1.*,
                row_number() OVER (
                    PARTITION BY phase_1.offer_id, phase_1.city_id, phase_1.subscriber_id
                ) AS num_matching_segments
            FROM phase_1
            JOIN TABLE(JSON_TO_ARRAY(phase_1.segment_ids)) AS segment_ids
            INNER JOIN subscriber_segments segment ON (
                phase_1.city_id = segment.city_id
                AND phase_1.subscriber_id = segment.subscriber_id
                AND (segment_ids.table_col :> BIGINT) = segment.segment_id
            )
        ),
        -- phase_3: keep pairs where every offer segment matched, and rank the
        -- surviving offers per subscriber from highest to lowest bid.
        -- offer_id is a tie-breaker so the result is deterministic.
        phase_3 AS (
            SELECT
                city_id,
                subscriber_id,
                offer_id,
                maximum_bid_cents,
                current_location,
                row_number() OVER (
                    PARTITION BY city_id, subscriber_id
                    ORDER BY maximum_bid_cents DESC, offer_id ASC
                ) AS bid_rank
            FROM phase_2
            -- the row numbered json_length(segment_ids) exists only when all
            -- segments matched, and there is at most one such row per pair
            WHERE json_length(segment_ids) = num_matching_segments
        )
    SELECT
        city_id,
        subscriber_id,
        offer_id AS best_offer_id,
        maximum_bid_cents AS cost_cents,
        current_location
    FROM phase_3
    WHERE bid_rank = 1
);
-- dynamic_subscriber_segments_locations(_since, _until)
--
-- Segment memberships derived from location events.
-- Joins "olc_8" segments to locations rows whose olc_8 equals the segment's
-- filter_value, restricted to events with _since <= ts < _until that are
-- also no older than the segment's valid_interval relative to NOW(6).
--
-- Returns one row per (city_id, subscriber_id, segment_id); expires_at is the
-- latest matching ts plus the segment's valid_interval.
CREATE OR REPLACE FUNCTION dynamic_subscriber_segments_locations(
    _since DATETIME(6),
    _until DATETIME(6)
) RETURNS TABLE AS RETURN (
    SELECT
        locations.city_id,
        locations.subscriber_id,
        segments.segment_id,
        MAX(date_add_dynamic(locations.ts, segments.valid_interval)) AS expires_at
    FROM segments
    INNER JOIN locations
        ON segments.filter_value = locations.olc_8
    WHERE
        segments.filter_kind = "olc_8"
        AND locations.ts >= date_sub_dynamic(NOW(6), segments.valid_interval)
        AND locations.ts >= _since
        AND locations.ts < _until
    GROUP BY
        locations.city_id,
        locations.subscriber_id,
        segments.segment_id
);
-- dynamic_subscriber_segments_requests(_since, _until)
--
-- Segment memberships derived from request events.
-- Joins "request" segments to requests rows whose domain equals the
-- segment's filter_value, restricted to events with _since <= ts < _until
-- that are also no older than the segment's valid_interval relative to
-- NOW(6).
--
-- Returns one row per (city_id, subscriber_id, segment_id); expires_at is the
-- latest matching ts plus the segment's valid_interval.
CREATE OR REPLACE FUNCTION dynamic_subscriber_segments_requests(
    _since DATETIME(6),
    _until DATETIME(6)
) RETURNS TABLE AS RETURN (
    SELECT
        requests.city_id,
        requests.subscriber_id,
        segments.segment_id,
        MAX(date_add_dynamic(requests.ts, segments.valid_interval)) AS expires_at
    FROM segments
    INNER JOIN requests
        ON segments.filter_value = requests.domain
    WHERE
        segments.filter_kind = "request"
        AND requests.ts >= date_sub_dynamic(NOW(6), segments.valid_interval)
        AND requests.ts >= _since
        AND requests.ts < _until
    GROUP BY
        requests.city_id,
        requests.subscriber_id,
        segments.segment_id
);
-- dynamic_subscriber_segments_purchases(_since, _until)
--
-- Segment memberships derived from purchase events.
-- Joins "purchase" segments to purchases rows whose vendor equals the
-- segment's filter_value, restricted to events with _since <= ts < _until
-- that are also no older than the segment's valid_interval relative to
-- NOW(6).
--
-- Returns one row per (city_id, subscriber_id, segment_id); expires_at is the
-- latest matching ts plus the segment's valid_interval.
CREATE OR REPLACE FUNCTION dynamic_subscriber_segments_purchases(
    _since DATETIME(6),
    _until DATETIME(6)
) RETURNS TABLE AS RETURN (
    SELECT
        purchases.city_id,
        purchases.subscriber_id,
        segments.segment_id,
        MAX(date_add_dynamic(purchases.ts, segments.valid_interval)) AS expires_at
    FROM segments
    INNER JOIN purchases
        ON segments.filter_value = purchases.vendor
    WHERE
        segments.filter_kind = "purchase"
        AND purchases.ts >= date_sub_dynamic(NOW(6), segments.valid_interval)
        AND purchases.ts >= _since
        AND purchases.ts < _until
    GROUP BY
        purchases.city_id,
        purchases.subscriber_id,
        segments.segment_id
);
-- dynamic_subscriber_segments(_since, _until)
--
-- Concatenates the memberships produced by the three per-source functions
-- (locations, requests, purchases) for the same [_since, _until) window.
-- UNION ALL performs no de-duplication across the three sources.
--
-- Returns: city_id, subscriber_id, segment_id, expires_at.
CREATE OR REPLACE FUNCTION dynamic_subscriber_segments(
    _since DATETIME(6),
    _until DATETIME(6)
) RETURNS TABLE AS RETURN (
    SELECT city_id, subscriber_id, segment_id, expires_at
    FROM dynamic_subscriber_segments_locations(_since, _until)

    UNION ALL

    SELECT city_id, subscriber_id, segment_id, expires_at
    FROM dynamic_subscriber_segments_requests(_since, _until)

    UNION ALL

    SELECT city_id, subscriber_id, segment_id, expires_at
    FROM dynamic_subscriber_segments_purchases(_since, _until)
);