Skip to content

Commit

Permalink
#862 add intersection param to 3 remaining aggregate functions + adju… (
Browse files Browse the repository at this point in the history
#864)

* #862 add intersection param to 3 remaining aggregate functions + adjust dag and intersections_tmc

* #862 make alter statements specific to params
  • Loading branch information
gabrielwol authored Feb 9, 2024
1 parent 890d214 commit 3a523e6
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 58 deletions.
48 changes: 36 additions & 12 deletions dags/miovision_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,19 @@ def pull_miovision(ds = None, **context):
@task_group(tooltip="Tasks to aggregate newly pulled Miovision data.")
def miovision_agg():
@task
def find_gaps_task(ds = None):
def find_gaps_task(ds = None, **context):
mio_postgres = PostgresHook("miovision_api_bot")
time_period = (ds, ds_add(ds, 1))
with mio_postgres.get_conn() as conn:
find_gaps(conn, time_period)
#no user specified intersection
if context["params"]["intersection"] == [0]:
with mio_postgres.get_conn() as conn:
find_gaps(conn, time_period=time_period)
#user specified intersection
else:
INTERSECTIONS = tuple(context["params"]["intersection"])
with mio_postgres.get_conn() as conn:
intersections = get_intersection_info(conn, intersection=INTERSECTIONS)
find_gaps(conn, time_period=time_period, intersections=intersections)

@task
def aggregate_15_min_mvt_task(ds = None, **context):
Expand All @@ -138,11 +146,19 @@ def aggregate_15_min_mvt_task(ds = None, **context):
aggregate_15_min_mvt(conn, time_period=time_period, intersections=intersections)

@task
def zero_volume_anomalous_ranges_task(ds = None):
mio_postgres = PostgresHook("miovision_api_bot")
time_period = (ds, ds_add(ds, 1))
with mio_postgres.get_conn() as conn:
agg_zero_volume_anomalous_ranges(conn, time_period)
def zero_volume_anomalous_ranges_task(ds = None, **context):
mio_postgres = PostgresHook("miovision_api_bot")
time_period = (ds, ds_add(ds, 1))
#no user specified intersection
if context["params"]["intersection"] == [0]:
with mio_postgres.get_conn() as conn:
agg_zero_volume_anomalous_ranges(conn, time_period=time_period)
#user specified intersection
else:
INTERSECTIONS = tuple(context["params"]["intersection"])
with mio_postgres.get_conn() as conn:
intersections = get_intersection_info(conn, intersection=INTERSECTIONS)
agg_zero_volume_anomalous_ranges(conn, time_period=time_period, intersections=intersections)

@task
def aggregate_15_min_task(ds = None, **context):
Expand All @@ -160,11 +176,19 @@ def aggregate_15_min_task(ds = None, **context):
aggregate_15_min(conn, time_period=time_period, intersections=intersections)

@task
def aggregate_volumes_daily_task(ds = None):
def aggregate_volumes_daily_task(ds = None, **context):
mio_postgres = PostgresHook("miovision_api_bot")
time_period = (ds, ds_add(ds, 1))
with mio_postgres.get_conn() as conn:
aggregate_volumes_daily(conn, time_period)
time_period = (ds, ds_add(ds, 1))
#no user specified intersection
if context["params"]["intersection"] == [0]:
with mio_postgres.get_conn() as conn:
aggregate_volumes_daily(conn, time_period=time_period)
#user specified intersection
else:
INTERSECTIONS = tuple(context["params"]["intersection"])
with mio_postgres.get_conn() as conn:
intersections = get_intersection_info(conn, intersection=INTERSECTIONS)
aggregate_volumes_daily(conn, time_period=time_period, intersections=intersections)

@task
def get_report_dates_task(ds = None, **context):
Expand Down
68 changes: 47 additions & 21 deletions volumes/miovision/api/intersection_tmc.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,22 +319,30 @@ def process_data(conn, start_time, end_iteration_time, intersections):
Tables: unacceptable_gaps, volumes_15min_mvt, volumes_15min, volumes_daily, report_dates.
"""
time_period = (start_time, end_iteration_time)
find_gaps(conn, time_period)
find_gaps(conn, time_period, intersections)
aggregate_15_min_mvt(conn, time_period, intersections)
agg_zero_volume_anomalous_ranges(conn, time_period)
agg_zero_volume_anomalous_ranges(conn, time_period, intersections)
aggregate_15_min(conn, time_period, intersections)
aggregate_volumes_daily(conn, time_period)
aggregate_volumes_daily(conn, time_period, intersections)
get_report_dates(conn, time_period, intersections)

def find_gaps(conn, time_period):
def find_gaps(conn, time_period, intersections = None):
"""Process aggregated miovision data from volumes_15min_mvt to identify gaps and insert
into miovision_api.unacceptable_gaps. miovision_api.find_gaps function contains a delete clause."""
try:
with conn.cursor() as cur:
invalid_gaps="SELECT miovision_api.find_gaps(%s::timestamp, %s::timestamp)"
cur.execute(invalid_gaps, time_period)
logger.info(conn.notices[-1])
logger.info('Updated gapsize table and found gaps exceeding allowable size')
#if intersections specified, clear/aggregate only those intersections.
if intersections is None:
invalid_gaps="SELECT miovision_api.find_gaps(%s::timestamp, %s::timestamp)"
cur.execute(invalid_gaps, time_period)
logger.info(conn.notices[-1])
logger.info('Updated gapsize table and found gaps exceeding allowable size')
else:
query_params = time_period + ([x.uid for x in intersections], )
invalid_gaps="SELECT miovision_api.find_gaps(%s::timestamp, %s::timestamp, %s::integer []);"
cur.execute(invalid_gaps, query_params)
logger.info('Updated gapsize table and found gaps exceeding allowable size for intersections %s',
[x.uid for x in intersections])
except psycopg2.Error as exc:
logger.exception(exc)
sys.exit(1)
Expand Down Expand Up @@ -401,18 +409,27 @@ def aggregate_15_min(
logger.exception(exc)
sys.exit(1)

def aggregate_volumes_daily(conn, time_period):
def aggregate_volumes_daily(conn, time_period, intersections = None):
"""Aggregate into miovision_api.volumes_daily.
Data is cleared from volumes_daily prior to insert.
Takes optional intersection param to specify certain
intersections to clear/aggregate.
"""
try:
with conn.cursor() as cur:
#this function includes a delete query preceeding the insert.
daily_aggregation="SELECT miovision_api.aggregate_volumes_daily(%s::date, %s::date)"
cur.execute(daily_aggregation, time_period)
logger.info('Aggregation into miovision_api.volumes_daily table complete for %s to %s',
time_period[0], time_period[1])
if intersections is None:
#this function includes a delete query preceeding the insert.
daily_aggregation="SELECT miovision_api.aggregate_volumes_daily(%s::date, %s::date)"
cur.execute(daily_aggregation, time_period)
logger.info('Aggregation into miovision_api.volumes_daily table complete for %s to %s',
time_period[0], time_period[1])
else:
query_params = time_period + ([x.uid for x in intersections], )
daily_aggregation="SELECT miovision_api.aggregate_volumes_daily(%s::date, %s::date, %s::integer []);"
cur.execute(daily_aggregation, query_params)
logger.info('Aggregation into miovision_api.volumes_daily table complete for intersections %s from %s to %s.',
[x.uid for x in intersections], time_period[0], time_period[1])
except psycopg2.Error as exc:
logger.exception(exc)
sys.exit(1)
Expand Down Expand Up @@ -444,18 +461,27 @@ def get_report_dates(conn, time_period, intersections = None):
logger.exception(exc)
sys.exit(1)

def agg_zero_volume_anomalous_ranges(conn, time_period):
def agg_zero_volume_anomalous_ranges(conn, time_period, intersections = None):
"""Aggregate into miovision_api.anomalous_ranges.
Data is cleared from volumes_daily prior to insert.
"""
try:
with conn.cursor() as cur:
#this function includes a delete query preceeding the insert.
anomalous_range_sql="""SELECT *
FROM generate_series(%s::date, %s::date - interval '1 day', interval '1 day') AS dates(start_date),
LATERAL (SELECT miovision_api.identify_zero_counts(start_date::date)) AS agg"""
cur.execute(anomalous_range_sql, time_period)
logger.info('Aggregation of zero volume periods into anomalous_ranges table complete')
if intersections is None:
#this function includes a delete query preceeding the insert.
anomalous_range_sql="""SELECT *
FROM generate_series(%s::date, %s::date - interval '1 day', interval '1 day') AS dates(start_date),
LATERAL (SELECT miovision_api.identify_zero_counts(start_date::date)) AS agg"""
cur.execute(anomalous_range_sql, time_period)
logger.info('Aggregation of zero volume periods into anomalous_ranges table complete')
else:
query_params = time_period + ([x.uid for x in intersections], )
anomalous_range_sql="""SELECT *
FROM generate_series(%s::date, %s::date - interval '1 day', interval '1 day') AS dates(start_date),
LATERAL (SELECT miovision_api.identify_zero_counts(start_date::date, %s::integer [])) AS agg"""
cur.execute(anomalous_range_sql, query_params)
logger.info('Aggregation of zero volume periods into anomalous_ranges table complete for intersections %s',
[x.uid for x in intersections])
except psycopg2.Error as exc:
logger.exception(exc)
sys.exit(1)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
CREATE OR REPLACE FUNCTION miovision_api.aggregate_volumes_daily(
start_date date,
end_date date
end_date date,
intersections integer [] DEFAULT ARRAY[]::integer []
)
RETURNS void
LANGUAGE 'plpgsql'
Expand All @@ -9,16 +10,19 @@ COST 100
VOLATILE
AS $BODY$

DECLARE n_deleted int;
DECLARE n_inserted int;
DECLARE
target_intersections integer [] = miovision_api.get_intersections_uids(intersections);
n_deleted int;
n_inserted int;

BEGIN

WITH deleted AS (
DELETE FROM miovision_api.volumes_daily_unfiltered
WHERE
WHERE
dt >= start_date
AND dt < end_date
AND intersection_uid = ANY(target_intersections)
RETURNING *
)

Expand All @@ -45,6 +49,7 @@ BEGIN
WHERE
un.datetime_bin::date >= start_date
AND un.datetime_bin::date < end_date
AND un.intersection_uid = ANY(target_intersections)
GROUP BY
un.intersection_uid,
un.datetime_bin::date,
Expand Down Expand Up @@ -78,6 +83,7 @@ BEGIN
WHERE
v.datetime_bin >= start_date
AND v.datetime_bin < end_date
AND v.intersection_uid = ANY(target_intersections)
GROUP BY
v.intersection_uid,
v.classification_uid,
Expand All @@ -101,13 +107,15 @@ BEGIN
END;
$BODY$;

COMMENT ON FUNCTION miovision_api.aggregate_volumes_daily(date, date)
IS 'Function for inserting daily volumes into miovision_api.volumes_daily_unfiltered';
COMMENT ON FUNCTION miovision_api.aggregate_volumes_daily(date, date, integer [])
IS 'Function for inserting daily volumes into miovision_api.volumes_daily_unfiltered.
Contains an optional intersection parameter.';

ALTER FUNCTION miovision_api.aggregate_volumes_daily(date, date)
ALTER FUNCTION miovision_api.aggregate_volumes_daily(date, date, integer [])
OWNER TO miovision_admins;

GRANT EXECUTE ON FUNCTION miovision_api.aggregate_volumes_daily(date, date)
GRANT EXECUTE ON FUNCTION miovision_api.aggregate_volumes_daily(date, date, integer [])
TO miovision_api_bot;

REVOKE ALL ON FUNCTION miovision_api.aggregate_volumes_daily(date, date) FROM public;
REVOKE ALL ON FUNCTION miovision_api.aggregate_volumes_daily(date, date, integer [])
FROM public;
27 changes: 20 additions & 7 deletions volumes/miovision/sql/function/function-find_gaps.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

CREATE OR REPLACE FUNCTION miovision_api.find_gaps(
start_date timestamp,
end_date timestamp
end_date timestamp,
intersections integer [] DEFAULT ARRAY[]::integer []
)
RETURNS void
LANGUAGE 'plpgsql'
Expand All @@ -13,18 +14,21 @@ COST 100
VOLATILE
AS $BODY$

DECLARE tot_gaps integer;
DECLARE
target_intersections integer [] = miovision_api.get_intersections_uids(intersections);
tot_gaps integer;

BEGIN

--add to gapsize lookup table for this date.
PERFORM miovision_api.gapsize_lookup_insert(start_date, end_date);
--add to gapsize lookup table for this date/intersection.
PERFORM miovision_api.gapsize_lookup_insert(start_date, end_date, target_intersections);

--clear table before inserting
DELETE FROM miovision_api.unacceptable_gaps
WHERE
dt >= start_date::date
AND dt < end_date::date;
AND dt < end_date::date
AND intersection_uid = ANY(target_intersections);

--find intersections active each day
WITH daily_intersections AS (
Expand All @@ -41,6 +45,7 @@ BEGIN
v.datetime_bin < i.date_decommissioned
OR i.date_decommissioned IS NULL
)
AND i.intersection_uid = ANY(target_intersections)
),

--combine the artificial and actual datetime_bins.
Expand Down Expand Up @@ -75,6 +80,7 @@ BEGIN
WHERE
datetime_bin >= start_date - interval '15 minutes'
AND datetime_bin < end_date
AND intersection_uid = ANY(target_intersections)
),

--looks at sequential bins to identify breaks larger than 1 minute.
Expand Down Expand Up @@ -172,7 +178,14 @@ BEGIN
END;
$BODY$;

COMMENT ON FUNCTION miovision_api.find_gaps
COMMENT ON FUNCTION miovision_api.find_gaps(timestamp, timestamp, integer [])
IS 'Function to identify gaps in miovision_api.volumes data and insert into
miovision_api.unacceptable_gaps table. gap_tolerance set using 60 day
lookback avg volumes and thresholds defined in miovision_api.gapsize_lookup.';
lookback avg volumes and thresholds defined in miovision_api.gapsize_lookup.
Contains optional intersection parameter.';

ALTER FUNCTION miovision_api.find_gaps(timestamp, timestamp, integer [])
OWNER TO miovision_admins;

GRANT EXECUTE ON FUNCTION miovision_api.find_gaps(timestamp, timestamp, integer [])
TO miovision_api_bot;
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
CREATE OR REPLACE FUNCTION miovision_api.gapsize_lookup_insert(
start_date timestamp,
end_date timestamp
end_date timestamp,
intersections integer []
)
RETURNS void
LANGUAGE 'plpgsql'
Expand All @@ -13,7 +14,8 @@ AS $BODY$
DELETE FROM miovision_api.gapsize_lookup
WHERE
dt >= start_date
AND dt < end_date;
AND dt < end_date
AND intersection_uid = ANY(gapsize_lookup_insert.intersections);

WITH study_dates AS (
SELECT
Expand Down Expand Up @@ -43,6 +45,7 @@ AS $BODY$
WHERE
v.datetime_bin >= start_date - interval '60 days'
AND v.datetime_bin < end_date
AND v.intersection_uid = ANY(gapsize_lookup_insert.intersections)
GROUP BY
dates.dt,
v.intersection_uid,
Expand All @@ -69,6 +72,7 @@ AS $BODY$
UNION SELECT NULL::integer --represents all classifications
) AS classifications
CROSS JOIN generate_series(0, 23, 1) AS hours(hour_bin)
WHERE intersection_uid = ANY(gapsize_lookup_insert.intersections)
),

lookback_avgs AS (
Expand Down Expand Up @@ -126,13 +130,13 @@ AS $BODY$
END;
$BODY$;

ALTER FUNCTION miovision_api.gapsize_lookup_insert
ALTER FUNCTION miovision_api.gapsize_lookup_insert(timestamp, timestamp, integer [])
OWNER TO miovision_admins;

GRANT EXECUTE ON FUNCTION miovision_api.gapsize_lookup_insert
GRANT EXECUTE ON FUNCTION miovision_api.gapsize_lookup_insert(timestamp, timestamp, integer [])
TO miovision_api_bot;

COMMENT ON FUNCTION miovision_api.gapsize_lookup_insert
COMMENT ON FUNCTION miovision_api.gapsize_lookup_insert(timestamp, timestamp, integer [])
IS 'Determine the average volumes for each hour/intersection/daytype/classification
based on a 60 day lookback. Uses GROUPING SETS to identify both volume for individual
classifications and total interseciton volumes (classification_uid IS NULL).
Expand Down
Loading

0 comments on commit 3a523e6

Please sign in to comment.