Skip to content

Commit

Permalink
Merge pull request #306 from cagov/bottleneck_extent_ba
Browse files Browse the repository at this point in the history
Bottleneck extent (spatial)
  • Loading branch information
thehanggit authored Sep 4, 2024
2 parents bcbd505 + 0f5f587 commit 30fe9e5
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 47 deletions.
57 changes: 50 additions & 7 deletions transform/models/intermediate/performance/_performance.yml
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ models:
This model identifies bottlenecks at every station. A bottleneck is defined by the following
conditions:
1. There is a drop in speed of at least 20 mph between stations during the same time
2. The speed at the current station (downstream station) is less than 40 mph.
2. The speed at the current station (upstream station) is less than 40 mph.
3. The stations are less than 3 miles apart.
4. The speed drop persists for at least 5 out of any 7 contiguous 5-minute data points.
Expand All @@ -463,6 +463,27 @@ models:
entire duration and spatial extent of the bottleneck. The delay is calculated with
respect to a threshold speed of 60 mph.
columns:
- name: STATION_ID
description: |
An integer value that uniquely indentifies a station.
Use this value to 'join' other files or tables that contain the Station ID value.
- name: SAMPLE_DATE
description: The date associated with daily aggregated data samples.
- name: SAMPLE_TIMESTAMP
description: The timestamp of the start for the 5 minute aggregated samples.
- name: SPEED_FIVE_MINS
description: |
Actual reported speed if available otherwise the preliminary speed calculation
in miles/hour based on the simplified version of the speed formula located at
https://pems.dot.ca.gov/Papers/vanzwet_gfactor.pdf
- name: FREEWAY
description: The freeway where the VDS is located.
- name: DIRECTION
description: A string indicating the freeway direction of a specific VDS. Directions are N, E, S or W.
- name: STATION_TYPE
description: Two character string identify the VDS type.
- name: LENGTH
description: length of the station.
- name: DISTANCE_DELTA_NE
description: Calculates the delta between postmiles that have a direction of N or E.
- name: DISTANCE_DELTA_SW
Expand All @@ -475,13 +496,35 @@ models:
description: |
A boolean value that evaluates to True when all criteria for a bottleneck is met for at least 5 rows
in a forward-looking window of seven, otherwise it will give a value of False.
- name: int_performance__detector_metrics_agg_hourly
- name: IS_CONGESTED
description: |
A boolean value that evaluates to True when the speed of a station is below 40 mph in a 5-minute period,
otherwise it will give a value of False.
- name: CONGESTION_LENGTH
description: Returns station length when the station is congested, otherwise it will return 0.
- name: UPSTREAM_IS_CONGESTED
description: |
hourly aggregation of volume, occupancy and speed along with delays and lost productivity by
each detetcor lane. This metrics will measure the hourly performance of the state
highway system at the detecctor level.This can be used for daily aggregation of PeMS performance
metrics at the detector level.
columns:
A boolean value that evaluates to True when the upstream station is also congested, otherwise it will
give a value of False.
- name: CONGESTION_STATUS_CHANGE
description: |
A binary value that evaluates to 1 when the congestion status changes between stations, either
from congested to uncongested or uncongested to congested.
- name: CONGESTION_SEQUENCE
description: |
Calculates the summation of congestion_status_change values accumulatively. For different stations
with same sequence values will maintain the same congestion status (either congested or uncongested).
- name: BOTTLENECK_EXTENT
description: |
Calculates the congestion region starting from the bottleneck location towards upstream adjacent
congested stations, which is the summation of congestion lengths with same congestion sequence values.
- name: int_performance__detector_metrics_agg_hourly
description: |
hourly aggregation of volume, occupancy and speed along with delays and lost productivity by
each detetcor lane. This metrics will measure the hourly performance of the state
highway system at the detecctor level.This can be used for daily aggregation of PeMS performance
metrics at the detector level.
columns:
- name: STATION_ID
description: |
An integer value that uniquely indentifies a station.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ station_five_minute as (
station_id,
sample_date,
sample_timestamp,
speed_five_mins,
nullifzero(speed_five_mins) as speed_five_mins,
freeway,
direction,
station_type,
absolute_postmile
absolute_postmile,
length,
volume_sum
from {{ ref ("int_performance__station_metrics_agg_five_minutes") }}
where
{{ make_model_incremental('sample_date') }}
Expand All @@ -33,19 +35,19 @@ calcs as (
to get the speed there. When the direction is west or south, the "upstream" station has a
larger postmile, and we need to lead to get the speed there. */

speed_five_mins - lag(speed_five_mins)
speed_five_mins - lead(speed_five_mins)
over (partition by sample_timestamp, freeway, direction, station_type order by absolute_postmile asc)
as speed_delta_ne,

speed_five_mins - lead(speed_five_mins)
speed_five_mins - lag(speed_five_mins)
over (partition by sample_timestamp, freeway, direction, station_type order by absolute_postmile asc)
as speed_delta_sw,

absolute_postmile - lag(absolute_postmile)
absolute_postmile - lead(absolute_postmile)
over (partition by sample_timestamp, freeway, direction, station_type order by absolute_postmile asc)
as distance_delta_ne,

absolute_postmile - lead(absolute_postmile)
absolute_postmile - lag(absolute_postmile)
over (partition by sample_timestamp, freeway, direction, station_type order by absolute_postmile asc)
as distance_delta_sw

Expand Down Expand Up @@ -87,8 +89,78 @@ temporal_extent_check as (
temporal_extent as (
select
* exclude (bottleneck_check, bottleneck_check_summed),
iff(bottleneck_check_summed >= 5, true, false) as is_bottleneck
iff(bottleneck_check = 1 and bottleneck_check_summed >= 5, true, false) as is_bottleneck
from temporal_extent_check
),

congestion as (
select
*,
speed_five_mins < 40 as is_congested,

/* Create a helper length field which is zero if we don't consider this station
congested and the station length if we do. This will be summed later to get
the congestion extent */
iff(is_congested, length, 0) as congestion_length,

/* Absolute postmile increases going north and east. When the direction of the freeway for a
station is north or east, the "upstream" station has a smaller postmile, and we need to lag
to get the speed there. When the direction is west or south, the "upstream" station has a
larger postmile, and we need to lead to get the speed there. */
case
when direction in ('N', 'E')
then
lag(is_congested)
over (
partition by sample_timestamp, freeway, direction, station_type
order by absolute_postmile asc
)
when direction in ('S', 'W')
then
lead(is_congested)
over (
partition by sample_timestamp, freeway, direction, station_type
order by absolute_postmile asc
)
end as upstream_is_congested,
iff(is_congested = upstream_is_congested, 0, 1) as congestion_status_change
from temporal_extent
),

congestion_events as (
select
*,
sum(congestion_status_change)
over (
partition by sample_timestamp, freeway, direction, station_type
order by absolute_postmile asc
rows between unbounded preceding and current row
) as congestion_sequence
from congestion
),

congestion_length as (
select
*,
case
when direction in ('N', 'E')
then
sum(congestion_length) over (
partition by sample_timestamp, freeway, direction, station_type, congestion_sequence
order by absolute_postmile asc
rows between unbounded preceding and current row
)
when direction in ('S', 'W')
then
sum(congestion_length) over (
partition by sample_timestamp, freeway, direction, station_type, congestion_sequence
order by absolute_postmile asc
rows between current row and unbounded following
)
end as bottleneck_extent
from congestion_events
qualify is_bottleneck = true -- TODO: also filter if upstream is a bottleneck start?

)

select * from temporal_extent
select * from congestion_length
Original file line number Diff line number Diff line change
@@ -1,32 +1,34 @@
{{ config(
materialized="incremental",
cluster_by=["sample_date"],
unique_key=["station_id", "sample_date", "sample_timestamp"],
snowflake_warehouse = get_snowflake_refresh_warehouse(big="XL")
) }}

with detector_agg_five_minutes as (
select *
from {{ ref('int_imputation__detector_imputed_agg_five_minutes') }}
where {{ make_model_incremental('sample_date') }}
),

station_aggregated_speed as (
select
station_id,
sample_date,
sample_timestamp,
any_value(freeway) as freeway,
any_value(direction) as direction,
any_value(station_type) as station_type,
any_value(absolute_postmile) as absolute_postmile,
any_value(district) as district,
sum(sample_ct) as sample_ct,
sum(volume_sum) as volume_sum,
avg(occupancy_avg) as occupancy_avg,
sum(volume_sum * speed_five_mins) / nullifzero(sum(volume_sum)) as speed_five_mins
from detector_agg_five_minutes
group by station_id, sample_date, sample_timestamp
)

select * from station_aggregated_speed
{{ config(
materialized="incremental",
on_schema_change="append_new_columns",
cluster_by=["sample_date"],
unique_key=["station_id", "sample_date", "sample_timestamp"],
snowflake_warehouse = get_snowflake_refresh_warehouse(big="XL")
) }}

with detector_agg_five_minutes as (
select *
from {{ ref('int_imputation__detector_imputed_agg_five_minutes') }}
where {{ make_model_incremental('sample_date') }}
),

station_aggregated_speed as (
select
station_id,
sample_date,
sample_timestamp,
any_value(freeway) as freeway,
any_value(direction) as direction,
any_value(station_type) as station_type,
any_value(absolute_postmile) as absolute_postmile,
any_value(district) as district,
any_value(length) as length,
sum(sample_ct) as sample_ct,
sum(volume_sum) as volume_sum,
avg(occupancy_avg) as occupancy_avg,
sum(volume_sum * speed_five_mins) / nullifzero(sum(volume_sum)) as speed_five_mins
from detector_agg_five_minutes
group by station_id, sample_date, sample_timestamp
)

select * from station_aggregated_speed

0 comments on commit 30fe9e5

Please sign in to comment.