diff --git a/transform/models/intermediate/performance/_performance.yml b/transform/models/intermediate/performance/_performance.yml index 59da17d2..82d75daa 100644 --- a/transform/models/intermediate/performance/_performance.yml +++ b/transform/models/intermediate/performance/_performance.yml @@ -439,7 +439,7 @@ models: This model identifies bottlenecks at every station. A bottleneck is defined by the following conditions: 1. There is a drop in speed of at least 20 mph between stations during the same time - 2. The speed at the current station (downstream station) is less than 40 mph. + 2. The speed at the current station (upstream station) is less than 40 mph. 3. The stations are less than 3 miles apart. 4. The speed drop persists for at least 5 out of any 7 contiguous 5-minute data points. @@ -463,6 +463,27 @@ models: entire duration and spatial extent of the bottleneck. The delay is calculated with respect to a threshold speed of 60 mph. columns: + - name: STATION_ID + description: | + An integer value that uniquely indentifies a station. + Use this value to 'join' other files or tables that contain the Station ID value. + - name: SAMPLE_DATE + description: The date associated with daily aggregated data samples. + - name: SAMPLE_TIMESTAMP + description: The timestamp of the start for the 5 minute aggregated samples. + - name: SPEED_FIVE_MINS + description: | + Actual reported speed if available otherwise the preliminary speed calculation + in miles/hour based on the simplified version of the speed formula located at + https://pems.dot.ca.gov/Papers/vanzwet_gfactor.pdf + - name: FREEWAY + description: The freeway where the VDS is located. + - name: DIRECTION + description: A string indicating the freeway direction of a specific VDS. Directions are N, E, S or W. + - name: STATION_TYPE + description: Two character string identify the VDS type. + - name: LENGTH + description: length of the station. - name: DISTANCE_DELTA_NE description: Calculates the delta between postmiles that have a direction of N or E. - name: DISTANCE_DELTA_SW @@ -475,13 +496,35 @@ models: description: | A boolean value that evaluates to True when all criteria for a bottleneck is met for at least 5 rows in a forward-looking window of seven, otherwise it will give a value of False. - - name: int_performance__detector_metrics_agg_hourly + - name: IS_CONGESTED + description: | + A boolean value that evaluates to True when the speed of a station is below 40 mph in a 5-minute period, + otherwise it will give a value of False. + - name: CONGESTION_LENGTH + description: Returns station length when the station is congested, otherwise it will return 0. + - name: UPSTREAM_IS_CONGESTED description: | - hourly aggregation of volume, occupancy and speed along with delays and lost productivity by - each detetcor lane. This metrics will measure the hourly performance of the state - highway system at the detecctor level.This can be used for daily aggregation of PeMS performance - metrics at the detector level. - columns: + A boolean value that evaluates to True when the upstream station is also congested, otherwise it will + give a value of False. + - name: CONGESTION_STATUS_CHANGE + description: | + A binary value that evaluates to 1 when the congestion status changes between stations, either + from congested to uncongested or uncongested to congested. + - name: CONGESTION_SEQUENCE + description: | + Calculates the summation of congestion_status_change values accumulatively. For different stations + with same sequence values will maintain the same congestion status (either congested or uncongested). + - name: BOTTLENECK_EXTENT + description: | + Calculates the congestion region starting from the bottleneck location towards upstream adjacent + congested stations, which is the summation of congestion lengths with same congestion sequence values. + - name: int_performance__detector_metrics_agg_hourly + description: | + hourly aggregation of volume, occupancy and speed along with delays and lost productivity by + each detetcor lane. This metrics will measure the hourly performance of the state + highway system at the detecctor level.This can be used for daily aggregation of PeMS performance + metrics at the detector level. + columns: - name: STATION_ID description: | An integer value that uniquely indentifies a station. diff --git a/transform/models/intermediate/performance/int_performance__bottlenecks.sql b/transform/models/intermediate/performance/int_performance__bottlenecks.sql index c945620f..bea95d20 100644 --- a/transform/models/intermediate/performance/int_performance__bottlenecks.sql +++ b/transform/models/intermediate/performance/int_performance__bottlenecks.sql @@ -13,11 +13,13 @@ station_five_minute as ( station_id, sample_date, sample_timestamp, - speed_five_mins, + nullifzero(speed_five_mins) as speed_five_mins, freeway, direction, station_type, - absolute_postmile + absolute_postmile, + length, + volume_sum from {{ ref ("int_performance__station_metrics_agg_five_minutes") }} where {{ make_model_incremental('sample_date') }} @@ -33,19 +35,19 @@ calcs as ( to get the speed there. When the direction is west or south, the "upstream" station has a larger postmile, and we need to lead to get the speed there. */ - speed_five_mins - lag(speed_five_mins) + speed_five_mins - lead(speed_five_mins) over (partition by sample_timestamp, freeway, direction, station_type order by absolute_postmile asc) as speed_delta_ne, - speed_five_mins - lead(speed_five_mins) + speed_five_mins - lag(speed_five_mins) over (partition by sample_timestamp, freeway, direction, station_type order by absolute_postmile asc) as speed_delta_sw, - absolute_postmile - lag(absolute_postmile) + absolute_postmile - lead(absolute_postmile) over (partition by sample_timestamp, freeway, direction, station_type order by absolute_postmile asc) as distance_delta_ne, - absolute_postmile - lead(absolute_postmile) + absolute_postmile - lag(absolute_postmile) over (partition by sample_timestamp, freeway, direction, station_type order by absolute_postmile asc) as distance_delta_sw @@ -87,8 +89,78 @@ temporal_extent_check as ( temporal_extent as ( select * exclude (bottleneck_check, bottleneck_check_summed), - iff(bottleneck_check_summed >= 5, true, false) as is_bottleneck + iff(bottleneck_check = 1 and bottleneck_check_summed >= 5, true, false) as is_bottleneck from temporal_extent_check +), + +congestion as ( + select + *, + speed_five_mins < 40 as is_congested, + + /* Create a helper length field which is zero if we don't consider this station + congested and the station length if we do. This will be summed later to get + the congestion extent */ + iff(is_congested, length, 0) as congestion_length, + + /* Absolute postmile increases going north and east. When the direction of the freeway for a + station is north or east, the "upstream" station has a smaller postmile, and we need to lag + to get the speed there. When the direction is west or south, the "upstream" station has a + larger postmile, and we need to lead to get the speed there. */ + case + when direction in ('N', 'E') + then + lag(is_congested) + over ( + partition by sample_timestamp, freeway, direction, station_type + order by absolute_postmile asc + ) + when direction in ('S', 'W') + then + lead(is_congested) + over ( + partition by sample_timestamp, freeway, direction, station_type + order by absolute_postmile asc + ) + end as upstream_is_congested, + iff(is_congested = upstream_is_congested, 0, 1) as congestion_status_change + from temporal_extent +), + +congestion_events as ( + select + *, + sum(congestion_status_change) + over ( + partition by sample_timestamp, freeway, direction, station_type + order by absolute_postmile asc + rows between unbounded preceding and current row + ) as congestion_sequence + from congestion +), + +congestion_length as ( + select + *, + case + when direction in ('N', 'E') + then + sum(congestion_length) over ( + partition by sample_timestamp, freeway, direction, station_type, congestion_sequence + order by absolute_postmile asc + rows between unbounded preceding and current row + ) + when direction in ('S', 'W') + then + sum(congestion_length) over ( + partition by sample_timestamp, freeway, direction, station_type, congestion_sequence + order by absolute_postmile asc + rows between current row and unbounded following + ) + end as bottleneck_extent + from congestion_events + qualify is_bottleneck = true -- TODO: also filter if upstream is a bottleneck start? + ) -select * from temporal_extent +select * from congestion_length diff --git a/transform/models/intermediate/performance/int_performance__station_metrics_agg_five_minutes.sql b/transform/models/intermediate/performance/int_performance__station_metrics_agg_five_minutes.sql index eb7c31cd..54305a24 100644 --- a/transform/models/intermediate/performance/int_performance__station_metrics_agg_five_minutes.sql +++ b/transform/models/intermediate/performance/int_performance__station_metrics_agg_five_minutes.sql @@ -1,32 +1,34 @@ -{{ config( - materialized="incremental", - cluster_by=["sample_date"], - unique_key=["station_id", "sample_date", "sample_timestamp"], - snowflake_warehouse = get_snowflake_refresh_warehouse(big="XL") -) }} - -with detector_agg_five_minutes as ( - select * - from {{ ref('int_imputation__detector_imputed_agg_five_minutes') }} - where {{ make_model_incremental('sample_date') }} -), - -station_aggregated_speed as ( - select - station_id, - sample_date, - sample_timestamp, - any_value(freeway) as freeway, - any_value(direction) as direction, - any_value(station_type) as station_type, - any_value(absolute_postmile) as absolute_postmile, - any_value(district) as district, - sum(sample_ct) as sample_ct, - sum(volume_sum) as volume_sum, - avg(occupancy_avg) as occupancy_avg, - sum(volume_sum * speed_five_mins) / nullifzero(sum(volume_sum)) as speed_five_mins - from detector_agg_five_minutes - group by station_id, sample_date, sample_timestamp -) - -select * from station_aggregated_speed +{{ config( + materialized="incremental", + on_schema_change="append_new_columns", + cluster_by=["sample_date"], + unique_key=["station_id", "sample_date", "sample_timestamp"], + snowflake_warehouse = get_snowflake_refresh_warehouse(big="XL") +) }} + +with detector_agg_five_minutes as ( + select * + from {{ ref('int_imputation__detector_imputed_agg_five_minutes') }} + where {{ make_model_incremental('sample_date') }} +), + +station_aggregated_speed as ( + select + station_id, + sample_date, + sample_timestamp, + any_value(freeway) as freeway, + any_value(direction) as direction, + any_value(station_type) as station_type, + any_value(absolute_postmile) as absolute_postmile, + any_value(district) as district, + any_value(length) as length, + sum(sample_ct) as sample_ct, + sum(volume_sum) as volume_sum, + avg(occupancy_avg) as occupancy_avg, + sum(volume_sum * speed_five_mins) / nullifzero(sum(volume_sum)) as speed_five_mins + from detector_agg_five_minutes + group by station_id, sample_date, sample_timestamp +) + +select * from station_aggregated_speed