From 3a523e64d5ca240fc0f095af40fba8719a5e6a1e Mon Sep 17 00:00:00 2001 From: gabrielwol <80077912+gabrielwol@users.noreply.github.com> Date: Fri, 9 Feb 2024 10:08:39 -0500 Subject: [PATCH] =?UTF-8?q?#862=20add=20intersection=20param=20to=203=20re?= =?UTF-8?q?maining=20aggregate=20functions=20+=20adju=E2=80=A6=20(#864)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * #862 add intersection param to 3 remaining aggregate functions + adjust dag and intersections_tmc * #862 make alter statements specific to params --- dags/miovision_pull.py | 48 +++++++++---- volumes/miovision/api/intersection_tmc.py | 68 +++++++++++++------ .../function-aggregate_volumes_daily.sql | 26 ++++--- .../sql/function/function-find_gaps.sql | 27 ++++++-- .../function-gapsize_lookup_insert.sql | 14 ++-- .../function-identify-zero-counts.sql | 12 ++-- 6 files changed, 137 insertions(+), 58 deletions(-) diff --git a/dags/miovision_pull.py b/dags/miovision_pull.py index 2eecba201..e0606f1c0 100644 --- a/dags/miovision_pull.py +++ b/dags/miovision_pull.py @@ -116,11 +116,19 @@ def pull_miovision(ds = None, **context): @task_group(tooltip="Tasks to aggregate newly pulled Miovision data.") def miovision_agg(): @task - def find_gaps_task(ds = None): + def find_gaps_task(ds = None, **context): mio_postgres = PostgresHook("miovision_api_bot") time_period = (ds, ds_add(ds, 1)) - with mio_postgres.get_conn() as conn: - find_gaps(conn, time_period) + #no user specified intersection + if context["params"]["intersection"] == [0]: + with mio_postgres.get_conn() as conn: + find_gaps(conn, time_period=time_period) + #user specified intersection + else: + INTERSECTIONS = tuple(context["params"]["intersection"]) + with mio_postgres.get_conn() as conn: + intersections = get_intersection_info(conn, intersection=INTERSECTIONS) + find_gaps(conn, time_period=time_period, intersections=intersections) @task def aggregate_15_min_mvt_task(ds = None, **context): @@ -138,11 +146,19 @@ def aggregate_15_min_mvt_task(ds = None, **context): aggregate_15_min_mvt(conn, time_period=time_period, intersections=intersections) @task - def zero_volume_anomalous_ranges_task(ds = None): - mio_postgres = PostgresHook("miovision_api_bot") - time_period = (ds, ds_add(ds, 1)) - with mio_postgres.get_conn() as conn: - agg_zero_volume_anomalous_ranges(conn, time_period) + def zero_volume_anomalous_ranges_task(ds = None, **context): + mio_postgres = PostgresHook("miovision_api_bot") + time_period = (ds, ds_add(ds, 1)) + #no user specified intersection + if context["params"]["intersection"] == [0]: + with mio_postgres.get_conn() as conn: + agg_zero_volume_anomalous_ranges(conn, time_period=time_period) + #user specified intersection + else: + INTERSECTIONS = tuple(context["params"]["intersection"]) + with mio_postgres.get_conn() as conn: + intersections = get_intersection_info(conn, intersection=INTERSECTIONS) + agg_zero_volume_anomalous_ranges(conn, time_period=time_period, intersections=intersections) @task def aggregate_15_min_task(ds = None, **context): @@ -160,11 +176,19 @@ def aggregate_15_min_task(ds = None, **context): aggregate_15_min(conn, time_period=time_period, intersections=intersections) @task - def aggregate_volumes_daily_task(ds = None): + def aggregate_volumes_daily_task(ds = None, **context): mio_postgres = PostgresHook("miovision_api_bot") - time_period = (ds, ds_add(ds, 1)) - with mio_postgres.get_conn() as conn: - aggregate_volumes_daily(conn, time_period) + time_period = (ds, ds_add(ds, 1)) + #no user specified intersection + if context["params"]["intersection"] == [0]: + with mio_postgres.get_conn() as conn: + aggregate_volumes_daily(conn, time_period=time_period) + #user specified intersection + else: + INTERSECTIONS = tuple(context["params"]["intersection"]) + with mio_postgres.get_conn() as conn: + intersections = get_intersection_info(conn, intersection=INTERSECTIONS) + aggregate_volumes_daily(conn, time_period=time_period, intersections=intersections) @task def get_report_dates_task(ds = None, **context): diff --git a/volumes/miovision/api/intersection_tmc.py b/volumes/miovision/api/intersection_tmc.py index 3afc8782c..37048a323 100644 --- a/volumes/miovision/api/intersection_tmc.py +++ b/volumes/miovision/api/intersection_tmc.py @@ -319,22 +319,30 @@ def process_data(conn, start_time, end_iteration_time, intersections): Tables: unacceptable_gaps, volumes_15min_mvt, volumes_15min, volumes_daily, report_dates. """ time_period = (start_time, end_iteration_time) - find_gaps(conn, time_period) + find_gaps(conn, time_period, intersections) aggregate_15_min_mvt(conn, time_period, intersections) - agg_zero_volume_anomalous_ranges(conn, time_period) + agg_zero_volume_anomalous_ranges(conn, time_period, intersections) aggregate_15_min(conn, time_period, intersections) - aggregate_volumes_daily(conn, time_period) + aggregate_volumes_daily(conn, time_period, intersections) get_report_dates(conn, time_period, intersections) -def find_gaps(conn, time_period): +def find_gaps(conn, time_period, intersections = None): """Process aggregated miovision data from volumes_15min_mvt to identify gaps and insert into miovision_api.unacceptable_gaps. miovision_api.find_gaps function contains a delete clause.""" try: with conn.cursor() as cur: - invalid_gaps="SELECT miovision_api.find_gaps(%s::timestamp, %s::timestamp)" - cur.execute(invalid_gaps, time_period) - logger.info(conn.notices[-1]) - logger.info('Updated gapsize table and found gaps exceeding allowable size') + #if intersections specified, clear/aggregate only those intersections. + if intersections is None: + invalid_gaps="SELECT miovision_api.find_gaps(%s::timestamp, %s::timestamp)" + cur.execute(invalid_gaps, time_period) + logger.info(conn.notices[-1]) + logger.info('Updated gapsize table and found gaps exceeding allowable size') + else: + query_params = time_period + ([x.uid for x in intersections], ) + invalid_gaps="SELECT miovision_api.find_gaps(%s::timestamp, %s::timestamp, %s::integer []);" + cur.execute(invalid_gaps, query_params) + logger.info('Updated gapsize table and found gaps exceeding allowable size for intersections %s', + [x.uid for x in intersections]) except psycopg2.Error as exc: logger.exception(exc) sys.exit(1) @@ -401,18 +409,27 @@ def aggregate_15_min( logger.exception(exc) sys.exit(1) -def aggregate_volumes_daily(conn, time_period): +def aggregate_volumes_daily(conn, time_period, intersections = None): """Aggregate into miovision_api.volumes_daily. Data is cleared from volumes_daily prior to insert. + Takes optional intersection param to specify certain + intersections to clear/aggregate. """ try: with conn.cursor() as cur: - #this function includes a delete query preceeding the insert. - daily_aggregation="SELECT miovision_api.aggregate_volumes_daily(%s::date, %s::date)" - cur.execute(daily_aggregation, time_period) - logger.info('Aggregation into miovision_api.volumes_daily table complete for %s to %s', - time_period[0], time_period[1]) + if intersections is None: + #this function includes a delete query preceeding the insert. + daily_aggregation="SELECT miovision_api.aggregate_volumes_daily(%s::date, %s::date)" + cur.execute(daily_aggregation, time_period) + logger.info('Aggregation into miovision_api.volumes_daily table complete for %s to %s', + time_period[0], time_period[1]) + else: + query_params = time_period + ([x.uid for x in intersections], ) + daily_aggregation="SELECT miovision_api.aggregate_volumes_daily(%s::date, %s::date, %s::integer []);" + cur.execute(daily_aggregation, query_params) + logger.info('Aggregation into miovision_api.volumes_daily table complete for intersections %s from %s to %s.', + [x.uid for x in intersections], time_period[0], time_period[1]) except psycopg2.Error as exc: logger.exception(exc) sys.exit(1) @@ -444,18 +461,27 @@ def get_report_dates(conn, time_period, intersections = None): logger.exception(exc) sys.exit(1) -def agg_zero_volume_anomalous_ranges(conn, time_period): +def agg_zero_volume_anomalous_ranges(conn, time_period, intersections = None): """Aggregate into miovision_api.anomalous_ranges. Data is cleared from volumes_daily prior to insert. """ try: with conn.cursor() as cur: - #this function includes a delete query preceeding the insert. - anomalous_range_sql="""SELECT * - FROM generate_series(%s::date, %s::date - interval '1 day', interval '1 day') AS dates(start_date), - LATERAL (SELECT miovision_api.identify_zero_counts(start_date::date)) AS agg""" - cur.execute(anomalous_range_sql, time_period) - logger.info('Aggregation of zero volume periods into anomalous_ranges table complete') + if intersections is None: + #this function includes a delete query preceeding the insert. + anomalous_range_sql="""SELECT * + FROM generate_series(%s::date, %s::date - interval '1 day', interval '1 day') AS dates(start_date), + LATERAL (SELECT miovision_api.identify_zero_counts(start_date::date)) AS agg""" + cur.execute(anomalous_range_sql, time_period) + logger.info('Aggregation of zero volume periods into anomalous_ranges table complete') + else: + query_params = time_period + ([x.uid for x in intersections], ) + anomalous_range_sql="""SELECT * + FROM generate_series(%s::date, %s::date - interval '1 day', interval '1 day') AS dates(start_date), + LATERAL (SELECT miovision_api.identify_zero_counts(start_date::date, %s::integer [])) AS agg""" + cur.execute(anomalous_range_sql, query_params) + logger.info('Aggregation of zero volume periods into anomalous_ranges table complete for intersections %s', + [x.uid for x in intersections]) except psycopg2.Error as exc: logger.exception(exc) sys.exit(1) diff --git a/volumes/miovision/sql/function/function-aggregate_volumes_daily.sql b/volumes/miovision/sql/function/function-aggregate_volumes_daily.sql index eb17d03a3..eac6b7e94 100644 --- a/volumes/miovision/sql/function/function-aggregate_volumes_daily.sql +++ b/volumes/miovision/sql/function/function-aggregate_volumes_daily.sql @@ -1,6 +1,7 @@ CREATE OR REPLACE FUNCTION miovision_api.aggregate_volumes_daily( start_date date, - end_date date + end_date date, + intersections integer [] DEFAULT ARRAY[]::integer [] ) RETURNS void LANGUAGE 'plpgsql' @@ -9,16 +10,19 @@ COST 100 VOLATILE AS $BODY$ -DECLARE n_deleted int; -DECLARE n_inserted int; +DECLARE + target_intersections integer [] = miovision_api.get_intersections_uids(intersections); + n_deleted int; + n_inserted int; BEGIN WITH deleted AS ( DELETE FROM miovision_api.volumes_daily_unfiltered - WHERE + WHERE dt >= start_date AND dt < end_date + AND intersection_uid = ANY(target_intersections) RETURNING * ) @@ -45,6 +49,7 @@ BEGIN WHERE un.datetime_bin::date >= start_date AND un.datetime_bin::date < end_date + AND un.intersection_uid = ANY(target_intersections) GROUP BY un.intersection_uid, un.datetime_bin::date, @@ -78,6 +83,7 @@ BEGIN WHERE v.datetime_bin >= start_date AND v.datetime_bin < end_date + AND v.intersection_uid = ANY(target_intersections) GROUP BY v.intersection_uid, v.classification_uid, @@ -101,13 +107,15 @@ BEGIN END; $BODY$; -COMMENT ON FUNCTION miovision_api.aggregate_volumes_daily(date, date) -IS 'Function for inserting daily volumes into miovision_api.volumes_daily_unfiltered'; +COMMENT ON FUNCTION miovision_api.aggregate_volumes_daily(date, date, integer []) +IS 'Function for inserting daily volumes into miovision_api.volumes_daily_unfiltered. +Contains an optional intersection parameter.'; -ALTER FUNCTION miovision_api.aggregate_volumes_daily(date, date) +ALTER FUNCTION miovision_api.aggregate_volumes_daily(date, date, integer []) OWNER TO miovision_admins; -GRANT EXECUTE ON FUNCTION miovision_api.aggregate_volumes_daily(date, date) +GRANT EXECUTE ON FUNCTION miovision_api.aggregate_volumes_daily(date, date, integer []) TO miovision_api_bot; -REVOKE ALL ON FUNCTION miovision_api.aggregate_volumes_daily(date, date) FROM public; +REVOKE ALL ON FUNCTION miovision_api.aggregate_volumes_daily(date, date, integer []) +FROM public; \ No newline at end of file diff --git a/volumes/miovision/sql/function/function-find_gaps.sql b/volumes/miovision/sql/function/function-find_gaps.sql index a95379fd4..0b3a96d24 100644 --- a/volumes/miovision/sql/function/function-find_gaps.sql +++ b/volumes/miovision/sql/function/function-find_gaps.sql @@ -4,7 +4,8 @@ CREATE OR REPLACE FUNCTION miovision_api.find_gaps( start_date timestamp, - end_date timestamp + end_date timestamp, + intersections integer [] DEFAULT ARRAY[]::integer [] ) RETURNS void LANGUAGE 'plpgsql' @@ -13,18 +14,21 @@ COST 100 VOLATILE AS $BODY$ -DECLARE tot_gaps integer; +DECLARE + target_intersections integer [] = miovision_api.get_intersections_uids(intersections); + tot_gaps integer; BEGIN - --add to gapsize lookup table for this date. - PERFORM miovision_api.gapsize_lookup_insert(start_date, end_date); + --add to gapsize lookup table for this date/intersection. + PERFORM miovision_api.gapsize_lookup_insert(start_date, end_date, target_intersections); --clear table before inserting DELETE FROM miovision_api.unacceptable_gaps WHERE dt >= start_date::date - AND dt < end_date::date; + AND dt < end_date::date + AND intersection_uid = ANY(target_intersections); --find intersections active each day WITH daily_intersections AS ( @@ -41,6 +45,7 @@ BEGIN v.datetime_bin < i.date_decommissioned OR i.date_decommissioned IS NULL ) + AND i.intersection_uid = ANY(target_intersections) ), --combine the artificial and actual datetime_bins. @@ -75,6 +80,7 @@ BEGIN WHERE datetime_bin >= start_date - interval '15 minutes' AND datetime_bin < end_date + AND intersection_uid = ANY(target_intersections) ), --looks at sequential bins to identify breaks larger than 1 minute. @@ -172,7 +178,14 @@ BEGIN END; $BODY$; -COMMENT ON FUNCTION miovision_api.find_gaps +COMMENT ON FUNCTION miovision_api.find_gaps(timestamp, timestamp, integer []) IS 'Function to identify gaps in miovision_api.volumes data and insert into miovision_api.unacceptable_gaps table. gap_tolerance set using 60 day -lookback avg volumes and thresholds defined in miovision_api.gapsize_lookup.'; +lookback avg volumes and thresholds defined in miovision_api.gapsize_lookup. +Contains optional intersection parameter.'; + +ALTER FUNCTION miovision_api.find_gaps(timestamp, timestamp, integer []) +OWNER TO miovision_admins; + +GRANT EXECUTE ON FUNCTION miovision_api.find_gaps(timestamp, timestamp, integer []) +TO miovision_api_bot; \ No newline at end of file diff --git a/volumes/miovision/sql/function/function-gapsize_lookup_insert.sql b/volumes/miovision/sql/function/function-gapsize_lookup_insert.sql index a7551efaa..56e53f560 100644 --- a/volumes/miovision/sql/function/function-gapsize_lookup_insert.sql +++ b/volumes/miovision/sql/function/function-gapsize_lookup_insert.sql @@ -1,6 +1,7 @@ CREATE OR REPLACE FUNCTION miovision_api.gapsize_lookup_insert( start_date timestamp, - end_date timestamp + end_date timestamp, + intersections integer [] ) RETURNS void LANGUAGE 'plpgsql' @@ -13,7 +14,8 @@ AS $BODY$ DELETE FROM miovision_api.gapsize_lookup WHERE dt >= start_date - AND dt < end_date; + AND dt < end_date + AND intersection_uid = ANY(gapsize_lookup_insert.intersections); WITH study_dates AS ( SELECT @@ -43,6 +45,7 @@ AS $BODY$ WHERE v.datetime_bin >= start_date - interval '60 days' AND v.datetime_bin < end_date + AND v.intersection_uid = ANY(gapsize_lookup_insert.intersections) GROUP BY dates.dt, v.intersection_uid, @@ -69,6 +72,7 @@ AS $BODY$ UNION SELECT NULL::integer --represents all classifications ) AS classifications CROSS JOIN generate_series(0, 23, 1) AS hours(hour_bin) + WHERE intersection_uid = ANY(gapsize_lookup_insert.intersections) ), lookback_avgs AS ( @@ -126,13 +130,13 @@ AS $BODY$ END; $BODY$; -ALTER FUNCTION miovision_api.gapsize_lookup_insert +ALTER FUNCTION miovision_api.gapsize_lookup_insert(timestamp, timestamp, integer []) OWNER TO miovision_admins; -GRANT EXECUTE ON FUNCTION miovision_api.gapsize_lookup_insert +GRANT EXECUTE ON FUNCTION miovision_api.gapsize_lookup_insert(timestamp, timestamp, integer []) TO miovision_api_bot; -COMMENT ON FUNCTION miovision_api.gapsize_lookup_insert +COMMENT ON FUNCTION miovision_api.gapsize_lookup_insert(timestamp, timestamp, integer []) IS 'Determine the average volumes for each hour/intersection/daytype/classification based on a 60 day lookback. Uses GROUPING SETS to identify both volume for individual classifications and total interseciton volumes (classification_uid IS NULL). diff --git a/volumes/miovision/sql/function/function-identify-zero-counts.sql b/volumes/miovision/sql/function/function-identify-zero-counts.sql index 278b37cdf..4be0c9875 100644 --- a/volumes/miovision/sql/function/function-identify-zero-counts.sql +++ b/volumes/miovision/sql/function/function-identify-zero-counts.sql @@ -1,5 +1,6 @@ CREATE OR REPLACE FUNCTION miovision_api.identify_zero_counts( - start_date date --run on multiple dates using a lateral query. + start_date date, --run on multiple dates using a lateral query. + intersections integer [] DEFAULT ARRAY[]::integer [] ) RETURNS void LANGUAGE 'plpgsql' @@ -9,6 +10,7 @@ VOLATILE AS $BODY$ DECLARE + target_intersections integer [] = miovision_api.get_intersections_uids(intersections); n_inserted integer; n_updated integer; @@ -24,6 +26,7 @@ BEGIN WHERE datetime_bin >= start_date AND datetime_bin < start_date + interval '1 day' + AND intersection_uid = ANY(target_intersections) GROUP BY intersection_uid HAVING COALESCE(SUM(volume), 0) = 0 ), @@ -48,6 +51,7 @@ BEGIN --this script will only catch zeros for classification_uid 1,2,6,10 --since those are the ones that are zero padded in volumes_15min_mvt. Filter for additional speed. AND v15.classification_uid IN (1,2,6,10) + AND v15.intersection_uid = ANY(target_intersections) GROUP BY v15.classification_uid, v15.intersection_uid @@ -145,11 +149,11 @@ END; $BODY$; -ALTER FUNCTION miovision_api.identify_zero_counts(date) +ALTER FUNCTION miovision_api.identify_zero_counts(date, integer []) OWNER TO miovision_admins; -GRANT EXECUTE ON FUNCTION miovision_api.identify_zero_counts(date) +GRANT EXECUTE ON FUNCTION miovision_api.identify_zero_counts(date, integer []) TO miovision_api_bot; -GRANT EXECUTE ON FUNCTION miovision_api.identify_zero_counts(date) +GRANT EXECUTE ON FUNCTION miovision_api.identify_zero_counts(date, integer []) TO miovision_admins;