Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Miovision Aggregation Improvements (ATR view, intersection_movements AFTER INSERT trigger, unacceptable_gaps) #1024

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 3 additions & 19 deletions dags/miovision_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ def check_partitions():
create_annual_partition = PostgresOperator(
task_id='create_annual_partitions',
sql=["SELECT miovision_api.create_yyyy_volumes_partition('volumes', '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}'::int, 'datetime_bin')",
"SELECT miovision_api.create_yyyy_volumes_15min_partition('volumes_15min', '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}'::int)",
"SELECT miovision_api.create_yyyy_volumes_15min_partition('volumes_15min_mvt', '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}'::int)"],
"SELECT miovision_api.create_yyyy_volumes_15min_partition('volumes_15min_mvt_unfiltered', '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}'::int)"],
postgres_conn_id='miovision_api_bot',
autocommit=True
)
Expand Down Expand Up @@ -168,21 +167,6 @@ def zero_volume_anomalous_ranges_task(ds = None, **context):
intersections = get_intersection_info(conn, intersection=INTERSECTIONS)
agg_zero_volume_anomalous_ranges(conn, time_period=time_period, intersections=intersections)

@task
def aggregate_15_min_task(ds = None, **context):
mio_postgres = PostgresHook("miovision_api_bot")
time_period = (ds, ds_add(ds, 1))
#no user specified intersection
if context["params"]["intersection"] == [0]:
with mio_postgres.get_conn() as conn:
aggregate_15_min(conn, time_period=time_period)
#user specified intersection
else:
INTERSECTIONS = tuple(context["params"]["intersection"])
with mio_postgres.get_conn() as conn:
intersections = get_intersection_info(conn, intersection=INTERSECTIONS)
aggregate_15_min(conn, time_period=time_period, intersections=intersections)

@task
def aggregate_volumes_daily_task(ds = None, **context):
mio_postgres = PostgresHook("miovision_api_bot")
Expand Down Expand Up @@ -213,7 +197,7 @@ def get_report_dates_task(ds = None, **context):
intersections = get_intersection_info(conn, intersection=INTERSECTIONS)
get_report_dates(conn, time_period=time_period, intersections=intersections)

find_gaps_task() >> aggregate_15_min_mvt_task() >> [aggregate_15_min_task(), zero_volume_anomalous_ranges_task()] >> aggregate_volumes_daily_task()
find_gaps_task() >> aggregate_15_min_mvt_task() >> zero_volume_anomalous_ranges_task() >> aggregate_volumes_daily_task()
get_report_dates_task()

t_done = ExternalTaskMarker(
Expand All @@ -225,7 +209,7 @@ def get_report_dates_task(ds = None, **context):
@task_group(tooltip="Tasks to check critical data quality measures which could warrant re-running the DAG.")
def data_checks():
data_check_params = {
"table": "miovision_api.volumes_15min_mvt",
"table": "miovision_api.volumes_15min_mvt_unfiltered",
"lookback": '60 days',
"dt_col": 'datetime_bin',
"threshold": 0.7
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
-- Trigger function: back-fills miovision_api.volumes_15min_mvt_unfiltered for movements that
-- were just inserted into miovision_api.intersection_movements.
--
-- For every row of the `new_rows` transition table it:
--   1. builds 0-volume padding bins (15 minute steps) for the padded classifications,
--   2. aggregates any existing 1-minute rows in miovision_api.volumes for the same
--      intersection / classification / leg / movement into 15 minute bins,
--   3. inserts one row per bin, preferring the aggregated value over the padding value,
--   4. back-fills volumes.volume_15min_mvt_uid for the 1-minute rows that were aggregated.
--
-- Returns NULL; it only raises a NOTICE with the number of rows inserted.
--
-- `new_rows` is not defined here: it presumably comes from the trigger's
-- REFERENCING NEW TABLE AS new_rows clause -- confirm against the trigger definition.
--
-- NOTE(review): the function is SECURITY DEFINER but does not pin `search_path`, and
-- `datetime_bin_15` is called unqualified. The PostgreSQL docs recommend setting
-- search_path on SECURITY DEFINER functions; the schema of `datetime_bin_15` must be
-- confirmed before adding it.
CREATE OR REPLACE FUNCTION miovision_api.fn_add_intersection_movement_padding_values()
RETURNS trigger
LANGUAGE 'plpgsql'

COST 100
VOLATILE SECURITY DEFINER PARALLEL UNSAFE
AS $BODY$

DECLARE
    -- COUNT(*) returns bigint, so hold it in a bigint.
    n_inserted bigint;

BEGIN

    WITH temp AS (
        -- Padding: cross product of the newly added movements with every 15 minute bin
        -- from the intersection's install date up to (but not including) the earlier of
        -- its decommission date and today's date in Canada/Eastern.
        -- LEAST ignores NULLs, so a NULL date_decommissioned falls back to today.
        SELECT
            im.intersection_uid,
            dt.datetime_bin,
            im.classification_uid,
            im.leg,
            im.movement_uid,
            0 AS volume
        FROM new_rows AS im
        INNER JOIN miovision_api.intersections AS i USING (intersection_uid)
        CROSS JOIN generate_series(
            i.date_installed,
            LEAST(
                i.date_decommissioned,
                (CURRENT_TIMESTAMP AT TIME ZONE 'Canada/Eastern')::date
            ) - interval '15 minutes',
            interval '15 minutes'
        ) AS dt(datetime_bin)
        WHERE
            --0 padding for certain modes (padding)
            im.classification_uid IN (1,2,6,10)

        UNION ALL

        --real volumes: existing 1-minute rows for the new movements, summed per 15 minute bin
        SELECT
            v.intersection_uid,
            datetime_bin_15(v.datetime_bin) AS datetime_bin,
            v.classification_uid,
            v.leg,
            v.movement_uid,
            -- qualified so the reference cannot become ambiguous with a new_rows column
            SUM(v.volume)
        FROM miovision_api.volumes AS v
        INNER JOIN new_rows USING (
            intersection_uid, classification_uid, leg, movement_uid
        )
        GROUP BY
            v.intersection_uid,
            datetime_bin_15(v.datetime_bin),
            v.classification_uid,
            v.leg,
            v.movement_uid
    ),

    aggregate_insert AS (
        INSERT INTO miovision_api.volumes_15min_mvt_unfiltered(
            intersection_uid, datetime_bin, classification_uid, leg, movement_uid, volume
        )
        -- One row per (intersection, bin, classification, leg, movement).
        SELECT DISTINCT ON (
            v.intersection_uid, v.datetime_bin, v.classification_uid, v.leg, v.movement_uid
        )
            v.intersection_uid,
            v.datetime_bin,
            v.classification_uid,
            v.leg,
            v.movement_uid,
            v.volume
        FROM temp AS v
        INNER JOIN miovision_api.intersections AS i USING (intersection_uid)
        WHERE
            -- Only include dates during which intersection is active
            -- (excludes entire day it was added/removed)
            v.datetime_bin >= i.date_installed + interval '1 day'
            AND (
                i.date_decommissioned IS NULL
                OR (v.datetime_bin < i.date_decommissioned - interval '1 day')
            )
        ORDER BY
            v.intersection_uid,
            v.datetime_bin,
            v.classification_uid,
            v.leg,
            v.movement_uid,
            --select real value instead of padding value if available
            -- NOTE(review): DESC sorts NULLs first in PostgreSQL, so a NULL sum would
            -- also be picked over the 0 padding value -- confirm volumes.volume is
            -- never NULL.
            v.volume DESC
        RETURNING intersection_uid, volume_15min_mvt_uid, datetime_bin, classification_uid, leg, movement_uid, volume
    ),

    -- Never read by the final SELECT; data-modifying CTEs are still executed to completion.
    updated AS (
        --To update foreign key for 1min bin table
        UPDATE miovision_api.volumes AS v
        SET volume_15min_mvt_uid = a_i.volume_15min_mvt_uid
        FROM aggregate_insert AS a_i
        WHERE
            v.volume_15min_mvt_uid IS NULL
            -- padding rows (volume 0) have no 1-minute rows to link
            AND a_i.volume > 0
            AND v.intersection_uid = a_i.intersection_uid
            AND v.datetime_bin >= a_i.datetime_bin
            AND v.datetime_bin < a_i.datetime_bin + interval '15 minutes'
            AND v.classification_uid = a_i.classification_uid
            AND v.leg = a_i.leg
            AND v.movement_uid = a_i.movement_uid
    )

    SELECT COUNT(*) INTO n_inserted
    FROM aggregate_insert;

    RAISE NOTICE '% Done adding to 15min MVT bin based on intersection_movement new rows. % rows added.', timeofday(), n_inserted;
    -- Returns NULL, as the original does.
    RETURN NULL;
END;

$BODY$;

-- Ownership of the trigger function.
ALTER FUNCTION miovision_api.fn_add_intersection_movement_padding_values()
OWNER TO miovision_admins;

-- Execute privilege for both the admin role and the bot role, granted in one statement.
GRANT EXECUTE ON FUNCTION miovision_api.fn_add_intersection_movement_padding_values()
TO miovision_admins, miovision_api_bot;

-- Catalog description of the function.
COMMENT ON FUNCTION miovision_api.fn_add_intersection_movement_padding_values() IS
'This function is called using a trigger after each statement on insert into
miovision_api.intersection_movements. It uses newly inserted rows to update the zero padding
values in miovision_api.volumes_15min_mvt_unfiltered';
108 changes: 0 additions & 108 deletions volumes/miovision/sql/function/function-aggregate-volumes_15min.sql

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ WITH temp AS (
),

aggregate_insert AS (
INSERT INTO miovision_api.volumes_15min_mvt(
INSERT INTO miovision_api.volumes_15min_mvt_unfiltered(
intersection_uid, datetime_bin, classification_uid, leg, movement_uid, volume
)
SELECT DISTINCT ON (
Expand All @@ -74,18 +74,9 @@ aggregate_insert AS (
v.classification_uid,
v.leg,
v.movement_uid,
CASE
--set unacceptable gaps as nulls
WHEN un.datetime_bin IS NOT NULL THEN NULL
--gap fill with zeros (restricted to certain modes in temp CTE)
ELSE v.volume
END AS volume
v.volume
FROM temp AS v
JOIN miovision_api.intersections AS i USING (intersection_uid)
--set unacceptable gaps as null
LEFT JOIN miovision_api.unacceptable_gaps AS un USING (
intersection_uid, datetime_bin
)
WHERE
-- Only include dates during which intersection is active
-- (excludes entire day it was added/removed)
Expand Down Expand Up @@ -132,9 +123,9 @@ OWNER TO miovision_admins;
GRANT EXECUTE ON FUNCTION miovision_api.aggregate_15_min_mvt(date, date, integer [])
TO miovision_api_bot;

COMMENT ON FUNCTION miovision_api.aggregate_15_min_mvt(date, date, integer [])
COMMENT ON FUNCTION miovision_api.aggregate_15_min_mvt(date, date, integer [])
IS '''Aggregates valid movements from `miovision_api.volumes` in to
`miovision_api.volumes_15min_mvt` as 15 minute turning movement counts (TMC) bins and fills
`miovision_api.volumes_15min_mvt_unfiltered` as 15 minute turning movement counts (TMC) bins and fills
in gaps with 0-volume bins. Also updates foreign key in `miovision_api.volumes`. Takes an
optional intersection array parameter to aggregate only specific intersections. Use
`clear_15_min_mvt()` to remove existing values before summarizing.''';
Expand Down
53 changes: 0 additions & 53 deletions volumes/miovision/sql/function/function-clear-volumes_15min.sql

This file was deleted.

Loading