diff --git a/dags/ecocounter_open_data.py b/dags/ecocounter_open_data.py
new file mode 100644
index 000000000..c746bfaa5
--- /dev/null
+++ b/dags/ecocounter_open_data.py
@@ -0,0 +1,226 @@
+r"""### Monthly Ecocounter Open Data DAG
+Pipeline to run monthly Ecocounter aggregations for Open Data.
+"""
+import sys
+import os
+from datetime import timedelta
+import logging
+import pendulum
+from functools import partial
+
+from airflow.decorators import dag, task, task_group
+from airflow.models import Variable
+from airflow.hooks.base import BaseHook
+from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
+from airflow.sensors.date_time import DateTimeSensor
+from airflow.macros import ds_format
+from airflow.operators.python import get_current_context
+
+try:
+    repo_path = os.path.abspath(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
+    sys.path.insert(0, repo_path)
+    from dags.dag_functions import task_fail_slack_alert, send_slack_msg, get_readme_docmd
+    from dags.custom_operators import SQLCheckOperatorWithReturnValue
+except ImportError as exc:
+    raise ImportError("Cannot import DAG helper functions.") from exc
+
+LOGGER = logging.getLogger(__name__)
+logging.basicConfig(level=logging.DEBUG)
+
+DAG_NAME = 'ecocounter_open_data'
+DAG_OWNERS = Variable.get('dag_owners', deserialize_json=True).get(DAG_NAME, ["Unknown"])
+
+README_PATH = os.path.join(repo_path, 'volumes/ecocounter/readme.md')
+DOC_MD = get_readme_docmd(README_PATH, DAG_NAME)
+EXPORT_PATH = '/home/airflow/open_data/permanent-bike-counters' #'/data/open_data/permanent-bike-counters'
+
+default_args = {
+    'owner': ','.join(DAG_OWNERS),
+    'depends_on_past': False,
+    #set earlier start_date + catchup when ready?
+    'start_date': pendulum.datetime(2024, 1, 1, tz="America/Toronto"),
+    'email_on_failure': False,
+    'email_on_success': False,
+    'retries': 0,
+    'retry_delay': timedelta(minutes=5),
+    'on_failure_callback': partial(task_fail_slack_alert, use_proxy=True),
+}
+
+@dag(
+    dag_id=DAG_NAME,
+    default_args=default_args,
+    schedule='0 12 1 * *', # 12pm, 1st day of each month
+    template_searchpath=os.path.join(repo_path, 'volumes/ecocounter'),
+    catchup=False,
+    max_active_runs=1,
+    tags=["ecocounter", "open_data"],
+    doc_md=DOC_MD
+)
+def ecocounter_open_data_dag():
+
+    check_data_availability = SQLCheckOperatorWithReturnValue(
+        task_id="check_data_availability",
+        sql="data_checks/select-data-availability.sql",
+        conn_id="ecocounter_bot"
+    )
+
+    @task(retries=0, doc_md="""A reminder message.""")
+    def reminder_message(ds=None, **context):
+        mnth = ds_format(ds, '%Y-%m-%d', '%Y-%m')
+        slack_ids = Variable.get("slack_member_id", deserialize_json=True)
+        list_names = " ".join([slack_ids.get(name, name) for name in DAG_OWNERS])
+
+        send_slack_msg(
+            context=context,
+            msg=f"{list_names} Remember to check Ecocounter :open_data_to: for {mnth} and label any sites pending validation in anomalous_ranges. :meow_detective:",
+            use_proxy=True
+        )
+
+    wait_till_10th = DateTimeSensor(
+        task_id="wait_till_10th",
+        timeout=10*86400,
+        mode="reschedule",
+        poke_interval=3600*24,
+        target_time="{{ next_execution_date.replace(day=10) }}",
+    )
+    wait_till_10th.doc_md = """
+    Wait until the 10th day of the month to export data. Alternatively, mark this task as success to proceed immediately.
+    """
+
+    @task()
+    def get_years(ds=None):
+        mnth = pendulum.from_format(ds, 'YYYY-MM-DD')
+        prev_mnth = mnth.subtract(months=1)
+        yrs = [mnth.year, prev_mnth.year]
+        return list(set(yrs)) #unique
+
+    update_locations = SQLExecuteQueryOperator(
+        sql="SELECT ecocounter.open_data_locations_insert()",
+        task_id='update_locations',
+        conn_id='ecocounter_bot',
+        autocommit=True,
+        retries=0
+    )
+
+    @task_group()
+    def insert_and_download_data(yr):
+        @task(map_index_template="{{ yr }}")
+        def insert_daily(yr):
+            context = get_current_context()
+            context["yr"] = yr
+            t = SQLExecuteQueryOperator(
+                sql=f"SELECT ecocounter.open_data_daily_counts_insert({yr}::int)",
+                task_id='insert_daily_open_data',
+                conn_id='ecocounter_bot',
+                autocommit=True,
+                retries=0
+            )
+            return t.execute(context=context)
+
+        @task(map_index_template="{{ yr }}")
+        def insert_15min(yr):
+            context = get_current_context()
+            context["yr"] = yr
+            t = SQLExecuteQueryOperator(
+                sql=f"SELECT ecocounter.open_data_15min_counts_insert({yr}::int)",
+                task_id='insert_15min_open_data',
+                conn_id='ecocounter_bot',
+                autocommit=True,
+                retries=0
+            )
+            return t.execute(context=context)
+
+        @task.bash(
+            env={
+                'HOST': '{{ conn.ecocounter_bot.host }}',
+                'LOGIN': '{{ conn.ecocounter_bot.login }}',
+                'PGPASSWORD': '{{ conn.ecocounter_bot.password }}',
+                'EXPORT_PATH': EXPORT_PATH
+            }
+        )
+        def download_daily_open_data() -> str:
+            return '''/usr/bin/psql -h $HOST -U $LOGIN -d bigdata -c \
+            "SELECT
+                location_dir_id, location_name, direction, linear_name_full,
+                side_street, dt, daily_volume
+            FROM open_data.cycling_permanent_counts_daily
+            WHERE dt < date_trunc('month', now())
+            ORDER BY location_dir_id, dt;" \
+            --csv -o "$EXPORT_PATH/cycling_permanent_counts_daily.csv"'''
+
+        @task.bash(
+            env={
+                'HOST': '{{ conn.ecocounter_bot.host }}',
+                'LOGIN': '{{ conn.ecocounter_bot.login }}',
+                'PGPASSWORD': '{{ conn.ecocounter_bot.password }}',
+                'EXPORT_PATH': EXPORT_PATH
+            },
+            map_index_template="{{ yr }}"
+        )
+        def download_15min_open_data(yr) -> str:
+            context = get_current_context()
+            context["yr"] = yr
+            return f'''/usr/bin/psql -h $HOST -U $LOGIN -d bigdata -c \
+            "SELECT location_dir_id, datetime_bin, bin_volume
+            FROM open_data.cycling_permanent_counts_15min
+            WHERE
+                datetime_bin >= to_date({yr}::text, 'yyyy')
+                AND datetime_bin < LEAST(date_trunc('month', now()), to_date(({yr}+1)::text, 'yyyy'))
+            ORDER BY location_dir_id, datetime_bin;" \
+            --csv -o "$EXPORT_PATH/cycling_permanent_counts_15min_{yr}_{yr+1}.csv"'''
+
+        #insert only latest year data, but download everything (single file)
+        insert_daily(yr) >> download_daily_open_data()
+        insert_15min(yr) >> download_15min_open_data(yr)
+
+    @task.bash(
+        env={
+            'HOST': '{{ conn.ecocounter_bot.host }}',
+            'LOGIN': '{{ conn.ecocounter_bot.login }}',
+            'PGPASSWORD': '{{ conn.ecocounter_bot.password }}',
+            'EXPORT_PATH': EXPORT_PATH
+        }
+    )
+    def download_locations_open_data() -> str:
+        return '''/usr/bin/psql -h $HOST -U $LOGIN -d bigdata -c \
+        "SELECT location_dir_id, location_name, direction, linear_name_full, side_street,
+            longitude, latitude, centreline_id, bin_size, latest_calibration_study,
+            first_active, last_active, date_decommissioned, technology
+        FROM open_data.cycling_permanent_counts_locations
+        ORDER BY location_dir_id;" \
+        --csv -o "$EXPORT_PATH/cycling_permanent_counts_locations.csv"'''
+
+    @task.bash()
+    def output_readme() -> str:
+        source='/home/airflow/data_scripts/volumes/open_data/sql/cycling_permanent_counts_readme.md'
+        dest='cycling_permanent_counts_readme.pdf'
+        return f'''pandoc -V geometry:margin=1in -o "{EXPORT_PATH}/{dest}" "{source}"'''
+
+    @task(
+        retries=0,
+        trigger_rule='all_success',
+        doc_md="""A status message to report DAG success."""
+    )
+    def status_message(ds=None, **context):
+        mnth = ds_format(ds, '%Y-%m-%d', '%Y-%m-01')
+        send_slack_msg(
+            context=context,
+            msg=f"Ecocounter :open_data_to: DAG ran successfully for {mnth} :white_check_mark:. "
+                f"Remember to `cp {EXPORT_PATH}/* /data/open_data/permanent-bike-counters` as bigdata.",
+            use_proxy=True
+        )
+
+    yrs = get_years()
+    (
+        check_data_availability >>
+        reminder_message() >>
+        wait_till_10th >>
+        update_locations >> [
+            insert_and_download_data.expand(yr=yrs),
+            download_locations_open_data(),
+            output_readme()
+        ] >>
+        status_message()
+    )
+
+ecocounter_open_data_dag()
\ No newline at end of file
diff --git a/dags/ecocounter_pull.py b/dags/ecocounter_pull.py
index 7948db43a..9dea4a046 100644
--- a/dags/ecocounter_pull.py
+++ b/dags/ecocounter_pull.py
@@ -102,9 +102,9 @@ def update_sites_and_flows(**context):
     new_sites, new_flows = [], []
     with eco_postgres.get_conn() as conn:
         for site in getSites(token):
-            site_id, site_name = site['id'], site['name']
+            site_id, site_name, counter = site['id'], site['name'], site['counter']
             if not siteIsKnownToUs(site_id, conn):
-                insertSite(conn, site_id, site_name, site['longitude'], site['latitude'])
+                insertSite(conn, site_id, site_name, counter, site['longitude'], site['latitude'])
                 new_sites.append({
                     'site_id': site_id,
                     'site_name': site_name
@@ -177,7 +177,7 @@ def pull_recent_outages():
 )
 def data_checks():
     data_check_params = {
-        "table": "ecocounter.counts",
+        "table": "ecocounter.counts_unfiltered",
         "lookback": '60 days',
         "dt_col": 'datetime_bin',
         "threshold": 0.7
diff --git a/test/integration/test_dags.py b/test/integration/test_dags.py
index 2a995aabd..43e9e47c5 100644
--- a/test/integration/test_dags.py
+++ b/test/integration/test_dags.py
@@ -41,6 +41,7 @@
     'AIRFLOW_CONN_HERE_BOT': SAMPLE_CONN.get_uri(),
     'AIRFLOW_CONN_RESCU_BOT': SAMPLE_CONN.get_uri(),
     'AIRFLOW_CONN_VZ_API_BOT': SAMPLE_CONN.get_uri(),
+    'AIRFLOW_CONN_ECOCOUNTER_BOT': SAMPLE_CONN.get_uri(),
     'AIRFLOW_CONN_GOOGLE_SHEETS_API': SAMPLE_CONN.get_uri(),
 }
diff --git a/volumes/ecocounter/data_checks/select-data-availability.sql b/volumes/ecocounter/data_checks/select-data-availability.sql
new file mode 100644
index 000000000..af9e29073
--- /dev/null
+++ b/volumes/ecocounter/data_checks/select-data-availability.sql
@@ -0,0 +1,22 @@
+WITH daily_volumes AS (
+    SELECT
+        dates.dt::date,
+        COALESCE(SUM(cu.volume), 0) AS daily_volume
+    FROM generate_series('{{ macros.ds_format(ds, '%Y-%m-%d', '%Y-%m-01') }}'::date, --noqa: PRS
+        '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y-%m-01') }}'::date --noqa: PRS
+        + '1 month'::interval - '1 day'::interval,
+        '1 day'::interval) AS dates(dt)
+    --conditions live in the join (not a WHERE clause) so days with no data survive the outer join
+    LEFT JOIN ecocounter.counts_unfiltered AS cu
+        ON cu.datetime_bin::date = dates.dt
+        AND cu.datetime_bin >= '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y-%m-01') }}'::date --noqa: PRS
+        AND cu.datetime_bin < '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y-%m-01') }}'::date --noqa: PRS
+        + '1 month'::interval
+    GROUP BY dates.dt
+    ORDER BY dates.dt
+)
+
+SELECT
+    COUNT(*) = 0 AS _check,
+    'Missing dates: ' || string_agg(dt::text, ', ') AS summ
+FROM daily_volumes
+WHERE daily_volume = 0
diff --git a/volumes/ecocounter/open_data/create-function-open_data_daily_counts_insert.sql b/volumes/ecocounter/open_data/create-function-open_data_daily_counts_insert.sql
new file mode 100644
index 000000000..174b3a5e5
--- /dev/null
+++ b/volumes/ecocounter/open_data/create-function-open_data_daily_counts_insert.sql
@@ -0,0 +1,50 @@
+CREATE OR REPLACE FUNCTION ecocounter.open_data_daily_counts_insert(
+    yyyy integer
+)
+RETURNS void
+LANGUAGE sql
+COST 100
+VOLATILE
+
+AS $BODY$
+
+    INSERT INTO ecocounter.open_data_daily_counts (
+        site_id, site_description, direction, dt, daily_volume
+    )
+    SELECT
+        s.site_id,
+        s.site_description,
+        f.direction_main AS direction,
+        cc.datetime_bin::date AS dt,
+        SUM(cc.calibrated_volume) AS daily_volume
+    --this view excludes anomalous ranges
+    FROM ecocounter.counts AS cc
+    JOIN ecocounter.flows AS f USING (flow_id)
+    JOIN ecocounter.sites AS s USING (site_id)
+    WHERE
+        cc.datetime_bin >= to_date(yyyy::varchar, 'yyyy')
+        AND cc.datetime_bin < to_date((yyyy+1)::varchar, 'yyyy')
+    GROUP BY
+        s.site_id,
+        s.site_description,
+        f.direction_main,
+        cc.datetime_bin::date,
+        f.bin_size
+    HAVING
+        SUM(cc.calibrated_volume) > 0
+        --all datetime bins present, corrected for bin size
+        AND COUNT(DISTINCT cc.datetime_bin) = (3600*24 / EXTRACT(epoch FROM bin_size))
+    ON CONFLICT (site_id, direction, dt)
+    DO NOTHING;
+
+$BODY$;
+
+ALTER FUNCTION ecocounter.open_data_daily_counts_insert(integer) OWNER TO ecocounter_admins;
+
+GRANT EXECUTE ON FUNCTION ecocounter.open_data_daily_counts_insert(integer) TO ecocounter_admins;
+GRANT EXECUTE ON FUNCTION ecocounter.open_data_daily_counts_insert(integer) TO ecocounter_bot;
+REVOKE EXECUTE ON FUNCTION ecocounter.open_data_daily_counts_insert(integer) FROM bdit_humans;
+
+COMMENT ON FUNCTION ecocounter.open_data_daily_counts_insert(integer) IS
+'Function to insert data for a year into `ecocounter.open_data_daily_counts`. '
+'Does not overwrite existing data (e.g. if sensitivity was retroactively updated).';
\ No newline at end of file
diff --git a/volumes/ecocounter/open_data/create-function-open_data_locations_insert.sql b/volumes/ecocounter/open_data/create-function-open_data_locations_insert.sql
new file mode 100644
index 000000000..364b28d76
--- /dev/null
+++ b/volumes/ecocounter/open_data/create-function-open_data_locations_insert.sql
@@ -0,0 +1,53 @@
+CREATE OR REPLACE FUNCTION ecocounter.open_data_locations_insert()
+RETURNS void
+LANGUAGE sql
+COST 100
+VOLATILE
+
+AS $BODY$
+
+    INSERT INTO open_data.cycling_permanent_counts_locations (
+        location_name, direction, linear_name_full, side_street, longitude, latitude,
+        centreline_id, bin_size, latest_calibration_study, first_active, last_active,
+        date_decommissioned, technology
+    )
+    SELECT
+        location_name,
+        direction,
+        linear_name_full,
+        side_street,
+        lng,
+        lat,
+        centreline_id,
+        bin_size::text,
+        latest_calibration_study,
+        first_active,
+        last_active,
+        date_decommissioned,
+        technology
+    FROM ecocounter.open_data_locations
+    ON CONFLICT (location_name, direction)
+    DO UPDATE SET
+        linear_name_full = EXCLUDED.linear_name_full,
+        side_street = EXCLUDED.side_street,
+        longitude = EXCLUDED.longitude,
+        latitude = EXCLUDED.latitude,
+        centreline_id = EXCLUDED.centreline_id,
+        bin_size = EXCLUDED.bin_size,
+        latest_calibration_study = EXCLUDED.latest_calibration_study,
+        first_active = EXCLUDED.first_active,
+        last_active = EXCLUDED.last_active,
+        date_decommissioned = EXCLUDED.date_decommissioned,
+        technology = EXCLUDED.technology;
+
+$BODY$;
+
+ALTER FUNCTION ecocounter.open_data_locations_insert() OWNER TO ecocounter_admins;
+
+GRANT EXECUTE ON FUNCTION ecocounter.open_data_locations_insert() TO ecocounter_admins;
+GRANT EXECUTE ON FUNCTION ecocounter.open_data_locations_insert() TO ecocounter_bot;
+REVOKE EXECUTE ON FUNCTION ecocounter.open_data_locations_insert() FROM bdit_humans;
+
+COMMENT ON FUNCTION ecocounter.open_data_locations_insert() IS
+'Function to insert Ecocounter locations into '
+'`open_data.cycling_permanent_counts_locations`. ';
diff --git a/volumes/ecocounter/open_data/create-function-open_data_raw_counts_insert.sql b/volumes/ecocounter/open_data/create-function-open_data_raw_counts_insert.sql
new file mode 100644
index 000000000..d66fb39ee
--- /dev/null
+++ b/volumes/ecocounter/open_data/create-function-open_data_raw_counts_insert.sql
@@ -0,0 +1,76 @@
+CREATE OR REPLACE FUNCTION ecocounter.open_data_15min_counts_insert(
+    yyyy integer
+)
+RETURNS void
+LANGUAGE sql
+COST 100
+VOLATILE
+
+AS $BODY$
+
+    WITH complete_dates AS (
+        SELECT
+            s.site_id,
+            f.direction_main,
+            cc.datetime_bin::date,
+            COUNT(DISTINCT cc.datetime_bin) AS datetime_bin_unique
+        --this view excludes anomalous ranges
+        FROM ecocounter.counts AS cc
+        JOIN ecocounter.flows AS f USING (flow_id)
+        JOIN ecocounter.sites AS s USING (site_id)
+        WHERE
+            cc.datetime_bin >= to_date(yyyy::varchar, 'yyyy')
+            AND cc.datetime_bin < to_date((yyyy+1)::varchar, 'yyyy')
+        GROUP BY
+            s.site_id,
+            f.direction_main,
+            cc.datetime_bin::date,
+            f.bin_size
+        HAVING
+            --all datetime bins present, corrected for bin size
+            COUNT(DISTINCT cc.datetime_bin) = (3600*24 / EXTRACT(epoch FROM bin_size))
+            --non-zero count days
+            AND SUM(cc.calibrated_volume) > 0
+    )
+
+    INSERT INTO ecocounter.open_data_15min_counts (
+        site_id, site_description, direction, datetime_bin, bin_volume
+    )
+    SELECT
+        s.site_id,
+        s.site_description,
+        f.direction_main AS direction,
+        cc.datetime_bin AS datetime_bin,
+        SUM(cc.calibrated_volume) AS bin_volume
+    --this view excludes anomalous ranges
+    FROM ecocounter.counts AS cc
+    JOIN ecocounter.flows AS f USING (flow_id)
+    JOIN ecocounter.sites AS s USING (site_id)
+    JOIN complete_dates AS cd
+        ON
+            cd.site_id = s.site_id
+            AND cd.direction_main = f.direction_main
+            AND cc.datetime_bin >= cd.datetime_bin
+            AND cc.datetime_bin < cd.datetime_bin + interval '1 day'
+    WHERE
+        cc.datetime_bin >= to_date(yyyy::varchar, 'yyyy')
+        AND cc.datetime_bin < to_date((yyyy+1)::varchar, 'yyyy')
+    GROUP BY
+        s.site_id,
+        s.site_description,
+        f.direction_main,
+        cc.datetime_bin
+    ON CONFLICT (site_id, direction, datetime_bin)
+    DO NOTHING;
+
+$BODY$;
+
+ALTER FUNCTION ecocounter.open_data_15min_counts_insert(integer) OWNER TO ecocounter_admins;
+
+GRANT EXECUTE ON FUNCTION ecocounter.open_data_15min_counts_insert(integer) TO ecocounter_admins;
+GRANT EXECUTE ON FUNCTION ecocounter.open_data_15min_counts_insert(integer) TO ecocounter_bot;
+REVOKE EXECUTE ON FUNCTION ecocounter.open_data_15min_counts_insert(integer) FROM bdit_humans;
+
+COMMENT ON FUNCTION ecocounter.open_data_15min_counts_insert(integer) IS
+'Function to insert disaggregate data for a year into `ecocounter.open_data_15min_counts`. '
+'Does not overwrite existing data (e.g. if sensitivity was retroactively updated).';
diff --git a/volumes/ecocounter/open_data/create-table-open_data_daily_counts.sql b/volumes/ecocounter/open_data/create-table-open_data_daily_counts.sql
new file mode 100644
index 000000000..ed4a3ad1d
--- /dev/null
+++ b/volumes/ecocounter/open_data/create-table-open_data_daily_counts.sql
@@ -0,0 +1,25 @@
+-- Table: ecocounter.open_data_daily_counts
+
+-- DROP TABLE IF EXISTS ecocounter.open_data_daily_counts;
+
+CREATE TABLE IF NOT EXISTS ecocounter.open_data_daily_counts
+(
+    site_id numeric,
+    site_description text COLLATE pg_catalog."default",
+    direction travel_directions,
+    dt date,
+    daily_volume numeric,
+    CONSTRAINT eco_open_data_daily_pkey PRIMARY KEY (site_id, direction, dt)
+)
+
+TABLESPACE pg_default;
+
+ALTER TABLE IF EXISTS ecocounter.open_data_daily_counts OWNER TO ecocounter_admins;
+
+REVOKE ALL ON TABLE ecocounter.open_data_daily_counts FROM bdit_humans;
+GRANT SELECT ON TABLE ecocounter.open_data_daily_counts TO bdit_humans;
+
+GRANT SELECT, INSERT ON TABLE ecocounter.open_data_daily_counts TO ecocounter_bot;
+
+COMMENT ON TABLE ecocounter.open_data_daily_counts IS
+'Daily Ecocounter data by site and direction.';
\ No newline at end of file
diff --git a/volumes/ecocounter/open_data/create-table-open_data_raw_counts.sql b/volumes/ecocounter/open_data/create-table-open_data_raw_counts.sql
new file mode 100644
index 000000000..901361f13
--- /dev/null
+++ b/volumes/ecocounter/open_data/create-table-open_data_raw_counts.sql
@@ -0,0 +1,25 @@
+-- Table: ecocounter.open_data_15min_counts
+
+-- DROP TABLE IF EXISTS ecocounter.open_data_15min_counts;
+
+CREATE TABLE IF NOT EXISTS ecocounter.open_data_15min_counts
+(
+    site_id numeric,
+    site_description text COLLATE pg_catalog."default",
+    direction travel_directions,
+    datetime_bin timestamp without time zone,
+    bin_volume numeric,
+    CONSTRAINT eco_open_data_raw_pkey PRIMARY KEY (site_id, direction, datetime_bin)
+)
+
+TABLESPACE pg_default;
+
+ALTER TABLE IF EXISTS ecocounter.open_data_15min_counts OWNER TO ecocounter_admins;
+
+REVOKE ALL ON TABLE ecocounter.open_data_15min_counts FROM bdit_humans;
+GRANT SELECT ON TABLE ecocounter.open_data_15min_counts TO bdit_humans;
+
+GRANT SELECT, INSERT ON TABLE ecocounter.open_data_15min_counts TO ecocounter_bot;
+
+COMMENT ON TABLE ecocounter.open_data_15min_counts IS
+'Disaggregate Ecocounter data by site and direction.';
\ No newline at end of file
diff --git a/volumes/ecocounter/open_data/create-view-open_data_locations.sql b/volumes/ecocounter/open_data/create-view-open_data_locations.sql
new file mode 100644
index 000000000..2c8fca5e5
--- /dev/null
+++ b/volumes/ecocounter/open_data/create-view-open_data_locations.sql
@@ -0,0 +1,42 @@
+-- View: ecocounter.open_data_locations
+-- DROP VIEW ecocounter.open_data_locations;
+
+CREATE OR REPLACE VIEW ecocounter.open_data_locations AS
+WITH od_flows AS (
+    SELECT
+        site_id,
+        direction AS direction_main,
+        MIN(dt::date) AS min_date,
+        MAX(dt::date) AS max_date
+    FROM ecocounter.open_data_daily_counts
+    GROUP BY
+        site_id,
+        direction
+)
+
+SELECT DISTINCT ON (s.site_description, f.direction_main::text)
+    s.site_description AS location_name,
+    f.direction_main::text AS direction,
+    s.linear_name_full,
+    s.side_street,
+    ROUND(st_x(s.geom)::numeric, 7) AS lng,
+    ROUND(st_y(s.geom)::numeric, 7) AS lat,
+    s.centreline_id,
+    f.bin_size,
+    od_flows.min_date AS first_active,
+    od_flows.max_date AS last_active,
+    s.date_decommissioned::date AS date_decommissioned,
+    cf.count_date AS latest_calibration_study,
+    s.technology
+FROM ecocounter.flows AS f
+--omit sites if they have no data to publish.
+JOIN od_flows USING (site_id, direction_main)
+JOIN ecocounter.sites AS s USING (site_id)
+LEFT JOIN ecocounter.calibration_factors AS cf USING (flow_id)
+ORDER BY s.site_description, f.direction_main::text, cf.count_date DESC NULLS LAST;
+
+ALTER TABLE ecocounter.open_data_locations OWNER TO ecocounter_admins;
+
+GRANT SELECT ON TABLE ecocounter.open_data_locations TO bdit_humans;
+GRANT ALL ON TABLE ecocounter.open_data_locations TO ecocounter_admins;
+GRANT SELECT ON TABLE ecocounter.open_data_locations TO ecocounter_bot;
diff --git a/volumes/ecocounter/open_data/ecocounter_open_data_export.sh b/volumes/ecocounter/open_data/ecocounter_open_data_export.sh
new file mode 100644
index 000000000..f8baccd84
--- /dev/null
+++ b/volumes/ecocounter/open_data/ecocounter_open_data_export.sh
@@ -0,0 +1,38 @@
+#!/bin/bash
+HOST=trans-bdit-db-prod0-rds-smkrfjrhhbft.cpdcqisgj1fj.ca-central-1.rds.amazonaws.com
+USER=gwolofs
+#dest_path="/data/open_data/permanent-bike-counters"
+dest_path="/data/home/gwolofs/open_data/permanent-bike-counters"
+YR1=(1994 2024)
+YR2=(2024 2025)
+
+cd ~
+rm -f -r "$dest_path"/*.csv
+mkdir -p "$dest_path"
+
+for ((i=0; i<${#YR1[@]}; i++)); do
+    /usr/bin/psql -h $HOST -U $USER -d bigdata -c \
+    "SELECT location_dir_id, datetime_bin, bin_volume
+    FROM open_data.cycling_permanent_counts_15min
+    WHERE
+        datetime_bin >= to_date(${YR1[i]}::text, 'yyyy')
+        AND datetime_bin < LEAST(date_trunc('month', now()), to_date((${YR2[i]})::text, 'yyyy'))
+    ORDER BY location_dir_id, datetime_bin;" \
+    --csv -o "$dest_path/cycling_permanent_counts_15min_${YR1[i]}_${YR2[i]}.csv"
+done
+
+#need to export this on
+#pandoc -V geometry:margin=1in \
+#    -o $dest_path/cycling_permanent_counts_readme.pdf \
+#    /data/home/gwolofs/bdit_data-sources/volumes/open_data/sql/cycling_permanent_counts_readme.md
+
+#as gwolofs
+#grant permission for bigdata to read from my home folder.
+setfacl -R -m u:bigdata:rx "$dest_path"
+
+pbrun su - bigdata
+#rm /data/open_data/permanent-bike-counters/*
+cp -r "$dest_path"/*.csv /data/open_data/permanent-bike-counters
+cp -r "$dest_path"/*.pdf /data/open_data/permanent-bike-counters
+
+cd /data/open_data/permanent-bike-counters
+wc -l ./*
\ No newline at end of file
diff --git a/volumes/ecocounter/open_data/readme.md b/volumes/ecocounter/open_data/readme.md
new file mode 100644
index 000000000..c8196182a
--- /dev/null
+++ b/volumes/ecocounter/open_data/readme.md
@@ -0,0 +1,20 @@
+# Tables
+
+This readme contains an overview of how the Open Data schema is structured for Ecocounter. For detailed column descriptions, see the documentation in the Open Data folder [here](../../open_data/sql/cycling_permanent_counts_readme.md), which mirrors what is posted to Open Data.
+
+## `ecocounter.open_data_locations`
+List of Ecocounter site-direction pairs for Open Data. Each site-direction pair can contain more than one flow_id, e.g. regular, contraflow, scooter.
+- Only includes sites marked as `validated` in `ecocounter.sites`, `ecocounter.flows`.
+- Sites are excluded if they have no data in `ecocounter.open_data_daily_counts`, for example because of an anomalous range.
+
+## `ecocounter.open_data_daily_counts`
+Daily volumes (calibrated) for Open Data.
+- Contains only complete days; excludes anomalous_ranges.
+- Data is inserted into this table each month by the `ecocounter_open_data` DAG using the function `ecocounter.open_data_daily_counts_insert`.
+- A primary key prevents us from accidentally overwriting older, already published data.
+
+## `ecocounter.open_data_15min_counts`
+15 minute (or hourly for some older sites) volumes (calibrated) for Open Data.
+- Contains only complete days; excludes anomalous_ranges.
+- Data is inserted into this table each month by the `ecocounter_open_data` DAG using the function `ecocounter.open_data_15min_counts_insert`.
+- A primary key prevents us from accidentally overwriting older, already published data.
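+
+## Example query
+A minimal sketch (not part of the pipeline) showing how these tables fit together. It assumes the join keys implied by the view definition, where `location_name` is derived from `site_description` and `direction` from `direction_main`:
+
+```sql
+-- daily volumes with their published location attributes
+SELECT
+    loc.location_name,
+    loc.direction,
+    od.dt,
+    od.daily_volume
+FROM ecocounter.open_data_daily_counts AS od
+JOIN ecocounter.open_data_locations AS loc
+    ON loc.location_name = od.site_description
+    AND loc.direction = od.direction::text
+ORDER BY loc.location_name, od.dt;
+```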
\ No newline at end of file
diff --git a/volumes/ecocounter/pull_data_from_api.py b/volumes/ecocounter/pull_data_from_api.py
index d5f5384b1..88bd2d6bf 100644
--- a/volumes/ecocounter/pull_data_from_api.py
+++ b/volumes/ecocounter/pull_data_from_api.py
@@ -74,13 +74,13 @@ def getFlowData(token: str, flow_id: int, startDate: datetime, endDate: datetime
 
 def getKnownSites(conn: any):
     with conn.cursor() as cur:
-        cur.execute('SELECT site_id FROM ecocounter.sites_unfiltered;')
+        cur.execute('SELECT site_id FROM ecocounter.sites_unfiltered WHERE date_decommissioned IS NULL;')
         sites = cur.fetchall()
     return [site[0] for site in sites]
 
 def getKnownFlows(conn: any, site: int):
     with conn.cursor() as cur:
-        cur.execute('SELECT flow_id FROM ecocounter.flows_unfiltered WHERE site_id = %s;',
+        cur.execute('SELECT flow_id FROM ecocounter.flows_unfiltered WHERE date_decommissioned IS NULL AND site_id = %s;',
             (site, )
         )
         flows = cur.fetchall()
@@ -123,18 +123,19 @@ def insertFlowCounts(conn: any, volume: any):
     return cur.query
 
 # insert new site record
-def insertSite(conn: any, site_id: int, site_name: str, lon: float, lat: float):
+def insertSite(conn: any, site_id: int, site_name: str, counter: str, lon: float, lat: float):
     insert_query="""
-        INSERT INTO ecocounter.sites_unfiltered (site_id, site_description, geom, validated)
+        INSERT INTO ecocounter.sites_unfiltered (site_id, site_description, counter, geom, validated)
         VALUES (
             %s::numeric,
             %s::text,
+            %s::text,
             ST_SetSRID(ST_MakePoint(%s, %s), 4326),
-            null::boolean --not validated
+            null::boolean --not validated by default
         )
     """
     with conn.cursor() as cur:
-        cur.execute(insert_query, (site_id, site_name, lon, lat))
+        cur.execute(insert_query, (site_id, site_name, counter, lon, lat))
 
 # insert new flow record
 def insertFlow(conn: any, flow_id: int, site_id: int, flow_name: str, bin_size: int):
diff --git a/volumes/ecocounter/qc/create-function-qc_graph_volumes.sql b/volumes/ecocounter/qc/create-function-qc_graph_volumes.sql
new file mode 100644
index 000000000..42cb3cf87
--- /dev/null
+++ b/volumes/ecocounter/qc/create-function-qc_graph_volumes.sql
@@ -0,0 +1,70 @@
+-- FUNCTION: ecocounter.qc_graph_volumes(numeric)
+
+-- DROP FUNCTION IF EXISTS ecocounter.qc_graph_volumes(numeric);
+
+CREATE OR REPLACE FUNCTION ecocounter.qc_graph_volumes(
+    site_var numeric
+)
+RETURNS TABLE (
+    site_id numeric,
+    flow_id numeric,
+    date date,
+    daily_volume numeric,
+    rolling_avg_1_week numeric,
+    flow_color text
+)
+LANGUAGE sql
+COST 100
+VOLATILE PARALLEL UNSAFE
+ROWS 1000
+
+AS $BODY$
+
+WITH daily_volumes AS (
+    SELECT
+        site_id,
+        f.flow_id,
+        datetime_bin::date AS date,
+        NULLIF(SUM(lat.calibrated_volume), 0) AS daily_volume
+    FROM ecocounter.counts_unfiltered AS c
+    JOIN ecocounter.flows_unfiltered AS f USING (flow_id)
+    LEFT JOIN ecocounter.calibration_factors AS cf
+        ON c.flow_id = cf.flow_id
+        AND c.datetime_bin::date <@ cf.factor_range,
+    LATERAL (
+        SELECT
+            round(COALESCE(cf.ecocounter_day_corr_factor, 1::numeric) * c.volume::numeric) AS calibrated_volume
+    ) lat
+    WHERE site_id = site_var
+    GROUP BY
+        site_id,
+        f.flow_id,
+        datetime_bin::date
+)
+
+SELECT
+    dv.site_id,
+    dv.flow_id,
+    dv.date,
+    dv.daily_volume,
+    AVG(dv.daily_volume) OVER w AS rolling_avg_1_week,
+    f.flow_direction || ' - ' || f.flow_id AS flow_color
+FROM daily_volumes AS dv
+LEFT JOIN ecocounter.flows_unfiltered AS f USING (flow_id)
+WINDOW w AS (
+    PARTITION BY dv.flow_id
+    ORDER BY dv.date
+    RANGE BETWEEN interval '6 days' PRECEDING AND CURRENT ROW
+)
+ORDER BY
+    dv.site_id,
+    dv.flow_id,
+    dv.date
+
+$BODY$;
+
+ALTER FUNCTION ecocounter.qc_graph_volumes(numeric)
+OWNER TO ecocounter_admins;
+
+COMMENT ON FUNCTION ecocounter.qc_graph_volumes IS
+'A function to get unfiltered flows/volumes for the Ecocounter Shiny graphing tool.';
\ No newline at end of file
diff --git a/volumes/ecocounter/qc/qc_plots_pdf.r b/volumes/ecocounter/qc/qc_plots_pdf.r
new file mode 100644
index 000000000..ea149a879
--- /dev/null
+++ b/volumes/ecocounter/qc/qc_plots_pdf.r
@@ -0,0 +1,187 @@
+library('tidyverse')
+library('dplyr')
+library('dbplyr')
+library('ggplot2')
+library('config')
+library('gridExtra')
+library('ggrepel')
+
+setwd('~/../OneDrive - City of Toronto/Documents/R')
+
+dw <- config::get("bigdata")
+
+con <- DBI::dbConnect(RPostgres::Postgres(),
+    host = dw$host,
+    dbname = dw$database,
+    user = dw$user,
+    password = dw$pwd
+)
+
+flows = tbl(con, sql("SELECT * FROM ecocounter.flows")) %>% collect()
+sites = tbl(con, sql("SELECT * FROM ecocounter.sites ORDER BY site_description")) %>%
+    mutate(site_title = paste0(site_description, " (site_id = ", site_id, ")")) %>% collect()
+
+volumes_valid = tbl(con, sql("
+    SELECT
+        site_id, direction::text AS flow_color, dt AS date, daily_volume,
+        CASE WHEN daily_volume IS NOT NULL THEN AVG(daily_volume) OVER w END AS rolling_avg_1_week
+    FROM ecocounter.open_data_daily_counts
+    WINDOW w AS (
+        PARTITION BY site_id, direction
+        ORDER BY dt
+        RANGE BETWEEN interval '6 days' PRECEDING AND CURRENT ROW
+    )")) %>%
+    #filter(site_id %in% !!site_ids()) %>%
+    collect() %>%
+    group_by(site_id, flow_color) %>%
+    arrange(site_id, flow_color, date) %>%
+    mutate(datedif = as.numeric(date - dplyr::lag(date))-1) %>%
+    mutate(groupid = cumsum(ifelse(is.na(datedif), 0, datedif)))
+
+volumes_qc = tbl(con, sql(sprintf("
+    SELECT site_id, flow_id, date, daily_volume, rolling_avg_1_week, flow_color
+    FROM (VALUES %s) AS sites(s),
+    LATERAL ecocounter.qc_graph_volumes(s) AS qc",
+    paste0('(', sites$site_id, ')', collapse = ', ')))) %>%
+    collect() %>%
+    group_by(site_id, flow_color) %>%
+    arrange(date) %>%
+    mutate(datedif = as.numeric(date - dplyr::lag(date))-1) %>%
+    mutate(groupid = cumsum(ifelse(is.na(datedif), 0, datedif)))
+
+anomalous_ranges = tbl(con, sql("
+    SELECT
+        site_id,
+        lower(time_range)::date AS lower,
+        upper(time_range)::date AS upper,
+        CASE WHEN flow_id IS NULL THEN 'site_id - ' || site_id
+        ELSE 'flow_id - ' || flow_id END || ': ' || notes AS notes,
+        problem_level
+    FROM ecocounter.anomalous_ranges
+")) %>% collect()
+
+calibration_factors = tbl(con, sql("
+    SELECT
+        flow_id,
+        site_id,
+        flow_direction,
+        ecocounter_day_corr_factor AS calibration_factor,
+        LOWER(factor_range) AS factor_start,
+        UPPER(factor_range) AS factor_end,
+        f.flow_direction || ' - ' || f.flow_id AS flow_color
+    FROM ecocounter.calibration_factors
+    JOIN ecocounter.flows_unfiltered AS f USING (flow_id)
+    JOIN ecocounter.sites USING (site_id)
+")) %>% collect()
+
+
+#s="Bloor St W, between Palmerston & Markham (retired)"
+myplots <- list()
+i <- 1
+for (s in unique(sites$site_description)) {
+    site = sites %>% filter(site_description == s)
+    site_ids = (sites %>% filter(site_description == s))$site_id
+    v = volumes_qc %>% filter(site_id %in% site_ids)
+    site_titles = (sites %>% filter(site_id %in% site_ids))$site_title
+    graph_title <- paste0(s, ' (site_id = ', paste(site_ids, collapse = ', '), ')')
+
+    limits = v %>% filter(!is.na(daily_volume))
+    min_date <- min(limits$date)
+    max_date <- max(limits$date)
+    if (max_date - min_date > dyears(3)){
+        break_minor = '1 month'
+        break_major = '1 year'
+    } else {
+        break_minor = '1 day'
+        break_major = '1 month'
+    }
+
+    ars = anomalous_ranges %>% filter(site_id %in% site_ids) %>% rowwise() %>%
+        mutate(x_plot = mean.Date(c(coalesce(upper, max_date), coalesce(lower, min_date))))
+    cf = calibration_factors %>% filter(site_id %in% site_ids)
+
+    volumes_layers <- list(
+        geom_path(data = v, aes(
+            x=date, y=daily_volume, color = flow_color, group = paste(site_id, flow_color, groupid)
+        ), linewidth = 0.2, alpha = 0.5),
+        geom_line(data = v, aes(
+            x=date, y=rolling_avg_1_week, linewidth = "7 day avg",
+            color = flow_color, group = paste(site_id, flow_color, groupid)), linewidth = 1)
+    )
+
+    v_valid <- volumes_valid %>% filter(site_id %in% site_ids)
+    volumes_layers_valid <- list(
+        geom_path(data = v_valid, aes(
+            x=date, y=daily_volume, color = flow_color, group = paste(site_id, flow_color, groupid)
+        ), linewidth = 0.2, alpha = 0.5),
+        geom_line(data = v_valid, aes(
+            x=date, y=rolling_avg_1_week, linewidth = "7 day avg",
+            color = flow_color, group = paste(site_id, flow_color, groupid)), linewidth = 1)
+    )
+
+    base_plot <- list(
+        ggtitle(graph_title),
+        theme_bw(),
+        scale_x_date(date_breaks = break_major,
+            date_minor_breaks = break_minor,
+            date_labels = "%Y-%m-%d", limits = c(min_date-ddays(20), max_date+ddays(20))),
+        theme(axis.text.x = element_text(angle = 45, vjust = 1, hjust=1),
+            text = element_text(size = 10)),
+        guides(
+            alpha="none",
+            linewidth=guide_legend(title=NULL),
+            color=guide_legend(title="Flow"),
+            fill=guide_legend(title="Anomalous Range"))
+    )
+
+    cf_layers <- list(
+        geom_vline(
+            #add a jitter to calibration factors since they occur on the same date.
+            data = cf, #%>% mutate(factor_start = factor_start + sample(rnorm(1,7), n(), replace = TRUE)),
+            aes(xintercept=factor_start, color = flow_color),
+            linewidth = 1.2, linetype = 'dotted'),
+        geom_label_repel(
+            data = cf,
+            max.overlaps = Inf,
+            aes(x = coalesce(factor_start, min_date)+3, y = max(limits$daily_volume)-50,
+                color = flow_color, label = paste("CF: ", calibration_factor)))
+    )
+
+    ar_layers = list(
+        geom_rect(data = ars,
+            aes(
+                fill=problem_level,
+                xmin=coalesce(lower, min_date),
+                xmax=coalesce(upper, max_date),
+                ymin = 0,
+                ymax = max(limits$daily_volume)
+            ), alpha = 0.5),
+        geom_label_repel(data = ars, aes(
+            x = x_plot,
+            y = max(limits$daily_volume),
+            label = stringr::str_wrap(notes, 35),
+            hjust = 0,
+            vjust = 1)),
+        scale_fill_manual(
+            values = c("#00BFC4", "#F8766D"),
+            limits = c("valid-caveat", "do-not-use"))
+    )
+
+
+
+    gA <- ggplot() + base_plot + ar_layers + volumes_layers + cf_layers
+    gB <- ggplot() + base_plot + volumes_layers_valid
+    grid::grid.newpage()
+    myplots[[i]] <- gridExtra::gtable_rbind(ggplotGrob(gA), ggplotGrob(gB))
+    i <- i+1
+}
+
+
+fname = paste0("ecocounter_validation_plots_", format(Sys.time(), "%Y%m%d_%H%M%S"), ".pdf")
+
+ggsave(
+    filename = file.path(getwd(), fname),
+    plot = marrangeGrob(myplots, nrow=1, ncol=1),
+    width = 15, height = 9
+)
+print(file.path(getwd(), fname))
\ No newline at end of file
diff --git a/volumes/ecocounter/qc/qc_shiny_app.r b/volumes/ecocounter/qc/qc_shiny_app.r
new file mode 100644
index 000000000..eacff81c2
--- /dev/null
+++ b/volumes/ecocounter/qc/qc_shiny_app.r
@@ -0,0 +1,489 @@
+# Install required packages if not already installed
+# install.packages("shiny")
+# install.packages("ggplot2")
+
+library(shiny)
+library(DT)
+library(tidyverse)
+library(dplyr)
+library(dbplyr)
+library(ggplot2)
+library(config)
+library(ggrepel)
+
+setwd('~/../OneDrive - City of Toronto/Documents/R')
+
+#db connection from config.yml located in working directory
+dw <- config::get("bigdata")
+con <- DBI::dbConnect(RPostgres::Postgres(),
+    host = dw$host,
+    dbname = dw$database,
+    user = dw$user,
+    password = dw$pwd
+)
+
+dir.create('ecocounter_anomalous_ranges', showWarnings = FALSE)
+export_path <- file.path(getwd(), 'ecocounter_anomalous_ranges')
+
+flows = tbl(con, sql("SELECT * FROM ecocounter.flows")) %>% collect()
+sites = tbl(con, sql("SELECT * FROM ecocounter.sites ORDER BY site_description")) %>%
+    mutate(site_title = paste0(site_description, " (site_id = ", site_id, ")")) %>% collect()
+
+# Define UI for the app
+ui <- fluidPage(
+    titlePanel("Ecocounter - Interactive Anomalous Range Selection"),
+    fluidRow(
+        column(3,
+            fluidRow(uiOutput('site_descriptions')),
+            actionButton("query", "Query Selected Sites")
+        ),
+        column(3,
+            checkboxInput(inputId = 'validated_only', label = 'Validated Data Only', value = FALSE),
+            checkboxInput(inputId = 'anomalous_ranges', label = 'Show Anomalous Ranges?', value = TRUE),
+            checkboxInput(inputId = 'calibration_factors', label = 'Show Calibration Factors?', value = TRUE),
+            checkboxInput(inputId = 'recent_data', label = 'Only sites with recent data?', value = FALSE)
+        )
+    ),
+    fluidRow(uiOutput("dynamic_title")), # Dynamic title panel
+    fluidRow(
+        plotOutput("plot",
+            dblclick = "plot_dblclick",
+            #click = "plot_click",
+            brush = brushOpts(
+                id = "brush",
+                resetOnNew = FALSE,
+                delay = 1000,
+                delayType = "debounce"
+            ))),
+    fluidRow(
+        fluidRow(
+            actionButton("save_range_flow", "Save Range (flows)"),
+            actionButton("save_range_site", "Save Range (site)")
+        ),
+        DTOutput("myTable"),
+        actionButton("export_csv", "Export Ranges to File")
+    )
+)
+
+#remove buttons to make rows in temp anomalous range table removable
+getRemoveButton <- function(n, idS = "", lab = "Pit") {
+    if (stringr::str_length(idS) > 0) idS <- paste0(idS, "-")
+    ret <- shinyInput(actionButton, n,
+        'button_', label = "Remove",
+        onclick = sprintf('Shiny.onInputChange(\"%sremove_button_%s\", this.id)' ,idS, lab))
+    return (ret)
+}
+
+#helper function for getRemoveButton
+shinyInput <- function(FUN, n, id, ses, ...) {
+    as.character(FUN(paste0(id, n), ...))
+}
+
+# Define server logic
+server <- function(input, output, session) {
+    # Store ranges selected by the user
+    values <- reactiveValues()
+    values$tab <- data.frame(
+        id = integer(),
+        site_id = numeric(),
+        flow_id = numeric(),
+        range_start = as.Date(character()),
+        range_end = as.Date(character()),
+        notes = character(),
+        investigation_level = character(),
+        problem_level = character()
+    ) %>%
+        mutate(Remove = getRemoveButton(id, idS = "", lab = "Tab1"))
+
+    buttonCounter <- 0L
+
+    #proxy must reference the DT output id used in the UI ("myTable")
+    proxyTable <- DT::dataTableProxy("myTable")
+
+    # Render table with delete buttons
+    output$myTable <- DT::renderDataTable({
+        DT::datatable(values$tab,
+            #options = list(dom = "t"),
+            escape = FALSE,
+            editable = TRUE)
+    })
+
+    # reactive values for zoomable plot
+    ranges <- reactiveValues(x = NULL, y = NULL)
+
+    # Clear brush, reset zoom when site_id changes or range is saved.
+    observeEvent(list(input$query, input$save_range_site, input$save_range_flow), priority = -1, {
+        session$resetBrush("brush")
+        ranges$x <- NULL
+        ranges$y <- NULL
+    })
+
+    #sites_list which can be filtered for all data or only recent data.
+    sites_list <- reactive({
+        if (input$recent_data) {
+            sites_list <- (sites %>% filter(last_active >= today() - dmonths(2)))$site_description
+        } else {
+            sites_list <- sites$site_description
+        }
+        return(unique(sites_list))
+    })
+
+    #a dynamic site select list based on checkbox.
+    output$site_descriptions = renderUI({
+        selectInput("site_descriptions",
+            label = "Select Site ID",
+            choices = sites_list(),
+            selected = sites_list()[1],
+            multiple = TRUE,
+            selectize = FALSE,
+            width = 400)
+    })
+
+    #converts site descriptions to ids
+    site_ids <- eventReactive(input$site_descriptions, {
+        temp = sites %>% filter(site_description %in% input$site_descriptions)
+        return(temp$site_id)
+    })
+
+    # When a double-click happens, check if there's a brush on the plot.
+    # If so, zoom to the brush bounds; if not, reset the zoom.
+    observeEvent(list(input$plot_dblclick), priority = 1, {
+        brush <- input$brush
+        if (!is.null(brush)) {
+            ranges$x <- c(as.Date(brush$xmin), as.Date(brush$xmax))
+            ranges$y <- c(brush$ymin, brush$ymax)
+        } else {
+            ranges$x <- NULL
+            ranges$y <- NULL
+        }
+    })
+
+    #volume data, refreshes on site_id
+    vol <- eventReactive(list(input$query, input$validated_only), {
+
+        #returns open data if "validated = true"
+        if(input$validated_only){
+            volumes = tbl(con, sql("
+                SELECT
+                    site_id, direction::text AS flow_color, dt AS date, daily_volume,
+                    CASE WHEN daily_volume IS NOT NULL THEN AVG(daily_volume) OVER w END AS rolling_avg_1_week
+                FROM ecocounter.open_data_daily_counts
+                WINDOW w AS (
+                    PARTITION BY site_id, direction
+                    ORDER BY dt
+                    RANGE BETWEEN interval '6 days' PRECEDING AND CURRENT ROW
+                )")) %>%
+                filter(site_id %in% !!site_ids()) %>%
+                collect() %>%
+                group_by(site_id, flow_color) %>%
+                arrange(site_id, flow_color, date) %>%
+                mutate(datedif = as.numeric(date - dplyr::lag(date))-1) %>%
+                mutate(groupid = cumsum(ifelse(is.na(datedif), 0, datedif)))
+
+        #otherwise returns unfiltered data
+        } else {
+            volumes = tbl(con, sql(sprintf("
+                SELECT site_id, flow_id, date, daily_volume, rolling_avg_1_week, flow_color
+                FROM (VALUES %s) AS sites(s),
+                LATERAL ecocounter.qc_graph_volumes(s) AS qc",
+                paste0('(', site_ids(), ')', collapse = ', ')))) %>%
+                collect() %>%
+                group_by(site_id, flow_color) %>%
+                arrange(date) %>%
+                mutate(datedif = as.numeric(date - dplyr::lag(date))-1) %>%
+                mutate(groupid = cumsum(ifelse(is.na(datedif), 0, datedif)))
+        }
+        return(volumes)
+    })
+
+    #anomalous ranges data, refreshes on site_id
+    ars <- eventReactive(input$site_descriptions, {
+        anomalous_ranges = tbl(con, sql("
+            SELECT
+                site_id,
+                lower(time_range)::date AS lower,
+                upper(time_range)::date AS upper,
+                CASE WHEN flow_id IS NULL THEN 'site_id - ' || site_id
+                ELSE 'flow_id - ' || flow_id END || ': ' || notes AS notes,
+                problem_level
+            FROM ecocounter.anomalous_ranges
+        ")) %>% filter(site_id %in% !!site_ids()) %>% collect()
+        return(anomalous_ranges)
+    })
+
+    #calibration factors, refreshes on site_id
+    corr <- eventReactive(input$site_descriptions, {
+        calibration_factors = tbl(con, sql("
+            SELECT
+                flow_id,
+                site_id,
+                flow_direction,
+                ecocounter_day_corr_factor AS calibration_factor,
+                LOWER(factor_range) AS factor_start,
+                UPPER(factor_range) AS factor_end,
+                f.flow_direction || ' - ' || f.flow_id AS flow_color
+            FROM ecocounter.calibration_factors
+            JOIN ecocounter.flows_unfiltered AS f USING (flow_id)
+            JOIN ecocounter.sites USING (site_id)
+        ")) %>% filter(site_id %in% !!site_ids()) %>% collect()
+        return(calibration_factors)
+    })
+
+    #dynamic graph title as a UI element (not ggplot) so it is selectable.
+    output$dynamic_title <- renderUI({
+        s = site_ids()
+        site_titles = (sites %>% filter(site_id %in% s))$site_title
+        tags$h2(
+            paste(site_titles, collapse = ', '),
+            style = "font-size: 22px; text-align: center;"
+        )
+    })
+
+    # Observe "Remove" buttons and remove rows
+    observeEvent(input$remove_button_Tab1, {
+        myTable <- values$tab
+        s <- as.numeric(strsplit(input$remove_button_Tab1, "_")[[1]][2])
+        myTable <- filter(myTable, id != s)
+        replaceData(proxyTable, myTable, resetPaging = FALSE)
+        values$tab <- myTable
+    })
+
+    # Observe brush input and add ranges to the reactive dataframe
+    observeEvent(input$save_range_site, {
+        brush <- input$brush
+
+        if (!is.null(brush)) {
+            myTable <- isolate(values$tab)
+            si = site_ids()
+            for (s in si){
+                buttonCounter <<- buttonCounter + 1L
+                new_range <- data.frame(
+                    id = buttonCounter,
+                    site_id = s,
+                    flow_id = 0,
+                    range_start = as.Date(input$brush$xmin),
+                    range_end = as.Date(input$brush$xmax),
+                    notes = "",
+                    investigation_level = 'confirmed',
+                    problem_level = 'do-not-use'
+                )
+
+                myTable <- bind_rows(
+                    myTable,
+                    new_range %>%
+                        mutate(Remove = getRemoveButton(buttonCounter, idS = "", lab = "Tab1")))
+            }
+            replaceData(proxyTable, myTable, resetPaging = FALSE)
+            values$tab <- myTable
+        }
+    })
+
+    # Observe brush input and add ranges to the reactive dataframe
+    observeEvent(input$save_range_flow, {
+        brush <- input$brush
+
+        if (!is.null(brush)) {
+            myTable <- isolate(values$tab)
+            si = site_ids()
+
+            for (s in si){
+                fl = (flows %>% filter(site_id == s))$flow_id
+                for (f in fl){
+                    buttonCounter <<- buttonCounter + 1L
+                    new_range <- data.frame(
+                        id = buttonCounter,
+                        site_id = s,
+                        flow_id = f,
+                        range_start = as.Date(input$brush$xmin),
+                        range_end = as.Date(input$brush$xmax),
+                        notes = "",
+                        investigation_level = 'confirmed',
+                        problem_level = 'do-not-use'
+                    )
+
+                    myTable <- bind_rows(
+                        myTable,
+                        new_range %>%
+                            mutate(Remove = getRemoveButton(buttonCounter, idS = "", lab = "Tab1")))
+                }
+            }
+            replaceData(proxyTable, myTable, resetPaging = FALSE)
+            values$tab <- myTable
+        }
+    })
+
+
+
+    # Observes table editing and updates data
+    observeEvent(input$myTable_cell_edit, {
+
+        myTable <- values$tab
+        row <- input$myTable_cell_edit$row
+        clmn <- input$myTable_cell_edit$col
+        myTable[row, clmn] <- input$myTable_cell_edit$value
+        replaceData(proxyTable, myTable, resetPaging = FALSE)
+        values$tab <- myTable
+
+    })
+
+    #update anomalous ranges layer on these events (save buttons are the two "save_range_*" inputs)
+    ar_listen <- reactive({
+        list(input$myTable_cell_edit, input$save_range_site, input$save_range_flow, input$query, input$anomalous_ranges)
+    })
+
+    #combine anomalous ranges from database and temp table to display
+    ar_data <- eventReactive(ar_listen(), {
+        temp_ars = values$tab %>% filter(site_id %in% site_ids()) %>%
+            transmute(site_id, lower = range_start, upper = range_end, notes, problem_level = "Draft")
+
+        return(rbind(ars(), temp_ars))
+    })
+
+
+    output$plot <- renderPlot({
+        #this part of the plot only changes when site id changes
+        p <- ggplot() + base_plot()
+
+        if(input$calibration_factors){
+            p <- p + cf_layers()
+        }
+
+        #changes when brush changes
+        if (!is.null(input$brush)) {
+            p <- p + brush_labels()
+        }
+
+        #changes when anomalous ranges change
+        if ((ar_data() %>% count() > 0) & input$anomalous_ranges){
+            p <- p + ar_layers()
+        }
+
+        p +
+            #reactive to zoom
+            coord_cartesian(xlim = ranges$x, ylim = ranges$y, expand = TRUE)
+    })
+
+    cf_layers <- eventReactive(list(input$calibration_factors, input$query), {
+        v = vol()
+        limits = v %>% filter(!is.na(daily_volume))
+        min_date <- min(limits$date)
+        max_date <- max(limits$date)
+        cf = corr()
+        layers <- list(
+            geom_vline(
+                #add a jitter to calibration factors since they occur on the same date.
+                data = cf, #%>% mutate(factor_start = factor_start + sample(rnorm(1,7), n(), replace = TRUE)),
+                aes(xintercept=factor_start, color = flow_color),
+                linewidth = 1.2, linetype = 'dotted'),
+            geom_label_repel(
+                data = cf,
+                max.iter = 1,
+                aes(x = coalesce(factor_start, min_date)+3, y = max(limits$daily_volume)-50,
+                    color = flow_color, label = paste("CF: ", calibration_factor)))
+        )
+    })
+
+    #a base part of the plot which doesn't change except when site_id changes (improves render time)
+    base_plot <- eventReactive(list(input$query, input$validated_only), {
+        v = vol()
+        limits = v %>% filter(!is.na(daily_volume))
+        min_date <- min(limits$date)
+        max_date <- max(limits$date)
+        if (max_date - min_date > dyears(3)){
+            break_minor = '1 month'
+            break_major = '1 year'
+        } else {
+            break_minor = '1 day'
+            break_major = '1 month'
+        }
+
+        layers <- list(
+            theme_bw(),
+            geom_path(data = v, aes(
+                x=date, y=daily_volume, color = flow_color, group = paste(site_id, flow_color, groupid)
+            ), linewidth = 0.2, alpha = 0.5),
+            geom_line(data = v, aes(
+                x=date, y=rolling_avg_1_week, linewidth = "7 day avg",
+                color = flow_color, group = paste(site_id, flow_color, groupid)), linewidth = 1),
+            scale_x_date(date_breaks = break_major,
+                date_minor_breaks = break_minor,
+                date_labels = "%Y-%m-%d", limits = c(min_date-ddays(20), max_date+ddays(20))),
+            theme(axis.text.x = element_text(angle = 90, vjust = 0.5, hjust=1),
+                text = element_text(size = 20)),
+            guides(
+                alpha="none",
+                linewidth=guide_legend(title=NULL),
+                color=guide_legend(title="Flow"),
+                fill=guide_legend(title="Anomalous Range"))
+        )
+
+        return(layers)
+    })
+
+    #dynamically label brush extents
+    brush_labels <- eventReactive(input$brush, {
+        brush=input$brush
+        if (!is.null(brush)) {
+            brush_dates <- c(as.Date(brush$xmin), as.Date(brush$xmax))
+            v = vol() %>% filter(date %in% brush_dates)
+            layers <- list(
+                geom_point(data = v, aes(x=date, y=daily_volume, color = flow_color), size = 2),
+                geom_label_repel(data = v, max.iter = 1,
+                    aes(x=date, y=daily_volume, label=daily_volume, color = flow_color))
+            )
+        }
+        return(layers)
+    })
+
+    #dynamically draw anomalous ranges when they change
+    ar_layers <- eventReactive(ar_listen(), {
+        ars = ar_data()
+        limits = vol() %>% filter(!is.na(daily_volume))
+        min_date <- min(limits$date)
+        max_date <- max(limits$date)
+        ars <- ars %>% rowwise() %>% mutate(x_plot = mean.Date(c(coalesce(upper, max_date), coalesce(lower, min_date))))
+
+        layers = list(
+            geom_rect(data = ars,
+                aes(
+                    fill=problem_level,
+                    xmin=coalesce(lower, min_date),
+                    xmax=coalesce(upper, max_date),
+                    ymin = 0,
+                    ymax = max(limits$daily_volume)
+                ), alpha = 0.5),
+            geom_text(data = ars, aes(
+                x = x_plot,
+                y = max(limits$daily_volume),
+                label = stringr::str_wrap(notes, 35),
+                hjust = 0,
+                vjust = 1))
+        )
+        return(layers)
+    })
+
+    # Save ranges to a CSV file when button is clicked
+    observeEvent(input$export_csv, {
+        fname = paste0("anomalous_ranges_", format(Sys.time(), "%Y%m%d_%H%M%S"), ".txt")
+        temp <- values$tab %>%
+            mutate(time_range = paste0("[", range_start, ", ", range_end, ")")) %>%
+            select(-any_of(c("Remove", "id", "range_start", "range_end")))
+
+        comma_sep <- function(x) paste0("('", paste0(x, collapse = "', '"), sep = "')")
+
+        lines <- c(
+            paste0("INSERT INTO ecocounter.anomalous_ranges (", paste(colnames(temp), collapse = ', '), ") (VALUES "),
+            paste0(apply(temp, 1, comma_sep), collapse = ',\n'),
+            ') RETURNING ecocounter.anomalous_ranges.*'
+        )
+        write_lines(lines, file.path(export_path, fname))
+
+        showModal(modalDialog(
+            title = "Ranges Saved to: ",
+            paste0("Your selected ranges have been saved to: ", file.path(export_path, fname)),
+            easyClose = TRUE
+        ))
+    })
+}
+
+# Run the application
+shinyApp(ui = ui, server = server)
\ No newline at end of file
diff --git a/volumes/ecocounter/qc/readme.md b/volumes/ecocounter/qc/readme.md
new file mode 100644
index 000000000..78e9e2364
--- /dev/null
+++ b/volumes/ecocounter/qc/readme.md
@@ -0,0 +1,24 @@
+
+This folder contains two R scripts used for data validation.
+
+## [`qc_shiny_app.r`](./qc_shiny_app.r)
+- An R script for generating QC plots in a (local) interactive R Shiny app. Can also be used for labelling anomalous ranges interactively, and then exporting the new ranges to .txt as a query ready for insertion into the database.
+- You should store a config.yml file containing your database credentials at `~/../OneDrive - City of Toronto/Documents/R`.
+
+## [`qc_plots_pdf.r`](./qc_plots_pdf.r)
+- An R script for generating QC plots to PDF.
+
+## Calibration - Validation
+
+[`scaling_validation.R`](./scaling_validation.R): Script used for generating QC plots to validate the calibration (rounding) method; a SQL sketch of this check appears at the end of this readme.
+- The investigation applied the daily calibration factors developed from the manual studies to the 15 minute bins, and then summed the rounded 15 minute values to the daily level to check the error introduced by rounding.
+- For higher volume sites (>100 bikes/day), the average error was 0.8 bikes per day (around 0.1%).
+- For low volume sites (<100 bikes/day), the average error was -1.6 bikes per day (around -5%), indicating minor undercounting.
+
+- Validation of calibration factors by Site and Study date, showing % error vs ground truth (Spectrum) data:
+
+![Scaling validation, percent error](scaling_validation_percent.png)
+
+- Validation of calibration factors by Site and Study date, showing # error vs ground truth (Spectrum) data:
+
+![Scaling validation, numeric error](scaling_validation_num.png)
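+
+A sketch of the rounding check, using the `ecocounter.counts` columns documented in the main readme (`raw_volume`, `calibration_factor`); the actual study query in `scaling_validation.R` compares against manual ground-truth counts instead:
+
+```sql
+-- compare rounding per 15-minute bin vs. rounding once at the daily level
+SELECT
+    flow_id,
+    datetime_bin::date AS dt,
+    SUM(round(calibration_factor * raw_volume)) AS daily_rounded_bins,
+    round(SUM(calibration_factor * raw_volume)) AS daily_rounded_total,
+    SUM(round(calibration_factor * raw_volume))
+    - round(SUM(calibration_factor * raw_volume)) AS rounding_error
+FROM ecocounter.counts
+GROUP BY flow_id, datetime_bin::date;
+```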
\ No newline at end of file
diff --git a/volumes/ecocounter/qc/scaling_validation.R b/volumes/ecocounter/qc/scaling_validation.R
new file mode 100644
index 000000000..c202f9500
--- /dev/null
+++ b/volumes/ecocounter/qc/scaling_validation.R
@@ -0,0 +1,88 @@
+
+library(tidyverse)
+library(dplyr)
+library(dbplyr)
+library(ggplot2)
+library(config)
+library(ggrepel)
+library(gridExtra)
+
+setwd('~/../OneDrive - City of Toronto/Documents/R')
+
+#db connection from config.yml located in working directory
+dw <- config::get("bigdata")
+con <- DBI::dbConnect(RPostgres::Postgres(),
+    host = dw$host,
+    dbname = dw$database,
+    user = dw$user,
+    password = dw$pwd
+)
+
+validation_results = tbl(con, sql("
+    WITH most_common_counts AS (
+        SELECT
+            ecocounter_site_id,
+            count_date,
+            ecocounter_direction,
+            string_agg(ecocounter_bikes || ':' || count, chr(10) ORDER BY count DESC) AS common_counts
+        FROM (
+            SELECT ecocounter_site_id, count_date, ecocounter_direction, ecocounter_bikes, COUNT(*)
+            FROM ecocounter.manual_counts_matched
+            WHERE ecocounter_bikes > 0
+            GROUP BY ecocounter_site_id, count_date, ecocounter_direction, ecocounter_bikes
+        ) AS eco_counts
+        GROUP BY ecocounter_site_id, count_date, ecocounter_direction
+    )
+
+    SELECT
+        ecocounter_site_id,
+        sites.site_description,
+        count_date,
+        ecocounter_direction,
+        vol_spec_path::numeric,
+        vol_ecocounter,
+        SUM(round(evci.ecocounter_day_corr_factor * ecocounter_bikes::numeric)) AS total_open_data_method,
+        SUM(round(evci.ecocounter_day_corr_factor * ecocounter_bikes::numeric)) - vol_spec_path AS open_data_error,
+        ROUND((SUM(round(evci.ecocounter_day_corr_factor * ecocounter_bikes::numeric)) - vol_spec_path) / vol_spec_path, 3) AS open_data_error_percent,
+        common_counts
+    FROM ecocounter.manual_counts_matched AS vcm --ecocounter_site_id, ecocounter_approach, spectrum_approach, ecocounter_direction, count_date
+    JOIN ecocounter.validation_results AS evci USING (ecocounter_site_id, count_date, ecocounter_direction)
+    JOIN most_common_counts USING (ecocounter_site_id, count_date, ecocounter_direction)
+    JOIN ecocounter.sites ON ecocounter_site_id = site_id
+    GROUP BY
+        ecocounter_site_id,
+        count_date,
+        ecocounter_direction,
+        vol_spec_path,
+        vol_ecocounter,
+        sites.site_description,
+        common_counts
+    ORDER BY ABS(ROUND((vol_spec_path - SUM(round(evci.ecocounter_day_corr_factor * ecocounter_bikes::numeric))) / vol_spec_path, 3)) DESC
+")) %>% collect() %>%
+    mutate(title = paste(site_description, '-', ecocounter_direction, '-', count_date)) %>%
+    rename("Spectrum Volume" = vol_spec_path, "Scaling Error" = open_data_error)
+
+summary = validation_results %>% group_by("Spectrum Count < 100" = `Spectrum Volume` < 100) %>%
+    summarise(
+        avg_error = round(mean(`Scaling Error`), 1),
+        avg_percent_error = scales::label_percent()(sum(`Scaling Error`) / sum(`Spectrum Volume`)),
+        count = n()
+    )
+
+p <- validation_results %>%
+    select(title, `Scaling Error`, `Spectrum Volume`, open_data_error_percent) %>%
+    pivot_longer(names_to = "type", values_to = "count",
+        cols = c(`Spectrum Volume`, `Scaling Error`)) %>%
+    ggplot() +
+    geom_col(aes(x=forcats::fct_reorder(title, abs(open_data_error_percent)),
+        y=count, fill=type)) +
+    theme(axis.text.x = element_text(angle = 45, vjust = 1, hjust=1),
+        text = element_text(size = 10)) + xlab('Study') +
+    ggtitle('Ecocounter - Spectrum Scaling Validation') +
+    annotation_custom(tableGrob(summary), xmin = 50, xmax = 65, ymin = 1000, ymax = Inf)
+
+p + geom_label_repel(data = validation_results,
+    aes(x=title, y=`Spectrum Volume`,
+        label = scales::label_percent(accuracy = 0.1)(open_data_error_percent)),
+    max.overlaps = Inf, min.segment.length = 0)
+
+p + geom_label_repel(data = validation_results,
+    aes(x=title, y=`Spectrum Volume`,
+        label = `Scaling Error`),
+    max.overlaps = Inf, min.segment.length = 0)
diff --git a/volumes/ecocounter/qc/scaling_validation_num.png b/volumes/ecocounter/qc/scaling_validation_num.png
new file mode 100644
index 000000000..3e1d3291c
Binary files /dev/null and b/volumes/ecocounter/qc/scaling_validation_num.png differ
diff --git a/volumes/ecocounter/qc/scaling_validation_percent.png b/volumes/ecocounter/qc/scaling_validation_percent.png
new file mode 100644
index 000000000..6aeaf3223
Binary files /dev/null and b/volumes/ecocounter/qc/scaling_validation_percent.png differ
diff --git a/volumes/ecocounter/readme.md b/volumes/ecocounter/readme.md
index a1e9d843d..f8b82ea8e 100644
--- a/volumes/ecocounter/readme.md
+++ b/volumes/ecocounter/readme.md
@@ -1,29 +1,30 @@
-# Ecocounter
-  - [Bicycle loop detectors](#bicycle-loop-detectors)
-  - [Installation types](#installation-types)
-  - [Ecocounter data](#ecocounter-data)
-  - [Flows - what we know](#flows---what-we-know)
-  - [Discontinuities](#discontinuities)
-  - [Using the Ecocounter API](#using-the-ecocounter-api)
-  - [Note](#note)
-  - [Historical data](#historical-data)
-  - [`ecocounter_pull` DAG](#ecocounter_pull-dag)
-  - [`ecocounter_check` DAG](#ecocounter_check-dag)
+  - [Installation types](#installation-types)
+  - [Ecocounter data](#ecocounter-data)
+  - [Flows - what we know](#flows---what-we-know)
+  - [Discontinuities](#discontinuities)
+  - [Using the Ecocounter API](#using-the-ecocounter-api)
+  - [Note](#note)
+  - [Historical data](#historical-data)
+  - [ecocounter_pull DAG](#ecocounter_pull-dag)
+  - [ecocounter_check DAG](#ecocounter_check-dag)
+  - [ecocounter_open_data DAG](#ecocounter_open_data-dag)
 - [SQL Tables](#sql-tables)
-  - [Main Tables](#main-tables)
-    - [`ecocounter.sites_unfiltered`](#ecocountersites_unfiltered)
-    - [`ecocounter.counts_unfiltered`](#ecocountercounts_unfiltered)
-    - [`ecocounter.flows_unfiltered`](#ecocounterflows_unfiltered)
-  - [QC Tables](#qc-tables)
-    - [`ecocounter.discontinuities`](#ecocounterdiscontinuities)
-    - [`ecocounter.anomalous_ranges`](#ecocounteranomalous_ranges)
-  - [Validation](#validation)
-    - [`ecocounter.manual_counts_matched`](#ecocountermanual_counts_matched)
-    - [`ecocounter.manual_counts_info`](#ecocountermanual_counts_info)
-    - [`ecocounter.manual_counts_raw`](#ecocountermanual_counts_raw)
+  - [Main Tables](#main-tables)
+    - [ecocounter.sites_unfiltered](#ecocountersites_unfiltered)
+    - [ecocounter.counts](#ecocountercounts)
+    - [ecocounter.counts_unfiltered](#ecocountercounts_unfiltered)
+    - [ecocounter.flows_unfiltered](#ecocounterflows_unfiltered)
+  - [QC Tables](#qc-tables)
+    - [ecocounter.calibration_factors](#ecocountercalibration_factors)
+    - [ecocounter.sensitivity_history](#ecocountersensitivity_history)
+    - [ecocounter.anomalous_ranges](#ecocounteranomalous_ranges)
+  - [Validation](#validation)
+    - [ecocounter.manual_counts_matched](#ecocountermanual_counts_matched)
+    - [ecocounter.manual_counts_info](#ecocountermanual_counts_info)
+    - [ecocounter.manual_counts_raw](#ecocountermanual_counts_raw)
 
@@ -75,10 +76,7 @@ From an email from Pierre, of Ecocounter:
 
 ## Discontinuities
 
-In January of 2024, it was determined that several sites were undercounting relative to other counting methods. To address this, the sensitivity of these sites was increased.
-As a result of the change however, we now expect to have some degree of discontinuity in the data where counts before and after a certain point in time may not be directly comparable.
-
-While we're still working through how best to handle this, we have recorded the times and sites that were effected in a new table, `ecocounter.discontinuities`.
+Since 2023, periodic ground-truth counts have been used to calibrate sensors. Following these studies, if the sensitivity of a sensor is adjusted, the new sensitivity is logged in `ecocounter.sensitivity_history` and will result in a discontinuity in the raw volumes. However, if you use `ecocounter.counts.calibrated_volume`, you should not see a discontinuity, as volumes from both before and after are adjusted to match ground-truth observations using the appropriate calibration factors.
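+
+For reference, the calibration applied in `ecocounter.counts.calibrated_volume` follows the same pattern as `ecocounter.qc_graph_volumes` (a sketch for illustration; the view definition is the source of truth):
+
+```sql
+-- raw volume scaled by the calibration factor in effect on that date
+SELECT
+    c.flow_id,
+    c.datetime_bin,
+    round(COALESCE(cf.ecocounter_day_corr_factor, 1::numeric) * c.volume::numeric) AS calibrated_volume
+FROM ecocounter.counts_unfiltered AS c
+LEFT JOIN ecocounter.calibration_factors AS cf
+    ON c.flow_id = cf.flow_id
+    AND c.datetime_bin::date <@ cf.factor_range;
+```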
 
 ## Using the Ecocounter API
@@ -113,7 +111,7 @@ LIMIT 1000;
 
 
 
-## `ecocounter_pull` DAG
+## ecocounter_pull DAG
 
 The `ecocounter_pull` DAG runs daily at 3am to populate `ecocounter` schema with new data.
 
 - `pull_recent_outages` task is similar to `pull_ecocounter` task except it tries to pull data corresponding to zero volume outages within the last 60 days. This was implemented following the finding that some Ecocounters will suddenly backfill missing data due to spotty cellular signal. Max ~2 weeks of backfilling has been observed so the task was conservatively set to look back 60 days.
@@ -134,7 +132,7 @@ The `ecocounter_pull` DAG runs daily at 3am to populate `ecocounter` schema with
 
-## `ecocounter_check` DAG
+## ecocounter_check DAG
 The `ecocounter_check` DAG runs daily at 4am following completion of `ecocounter_pull` to perform additional "yellow card" data checks on the new data.
 - `starting_point` is an external task sensor to ensure `ecocounter_pull` DAG is complete before running.
@@ -143,13 +141,33 @@ The `ecocounter_check` DAG runs daily at 4am following completion of `ecocounter
 - `check_unvalidated_sites` runs a `SQLCheckOperatorWithReturnValue` to check for unvalidated sites or flows with non-zero volumes this week and send a slack notification with their details.
+
+
+## ecocounter_open_data DAG
+The `ecocounter_open_data` DAG runs on the 1st of each month to insert and download the monthly Open Data extracts.
+
+- `check_data_availability`: A `SQLCheckOperatorWithReturnValue` to check if there is data for every day of the previous month before proceeding, and to report if not.
+- `reminder_message`: A Slack message to indicate data should be verified and any necessary anomalous ranges added to exclude irregular data.
+- `wait_till_10th`: A DateTimeSensor which waits until the 10th day of the month to proceed with exporting data. Can also be marked as "Success" manually to proceed earlier.
+- `get_years`: Identifies the years of the current and previous months, in case additional data was added for the previous month.
+- `insert_and_download_data`: TaskGroup for tasks which update and download data. Dynamically mapped over the output of `get_years`.
+  - `insert_daily_open_data`: Inserts daily data into the open data table.
+  - `insert_15min_open_data`: Inserts 15min data into the open data table.
+  - `download_daily_open_data`: Downloads daily data to the Open Data mounted drive on Morbius.
+  - `download_15min_open_data`: Downloads 15min data to the Open Data mounted drive on Morbius.
+- `download_locations_open_data`: Downloads the locations table to the Open Data mounted drive on Morbius.
+- `status_message`: A Slack message indicating successful pipeline completion.
+
+
 # SQL Tables
 
 ## Main Tables
 Key tables `ecocounter.sites_unfiltered`, `ecocounter.flows_unfiltered`, `ecocounter.counts_unfiltered` each have corresponding VIEWs filtered only to sites/flows marked as `validated` by a human: `ecocounter.sites`, `ecocounter.flows`, `ecocounter.counts`. They otherwise have the same structure as the parent tables described below.
+See also the Open Data SQL definitions [here](./open_data/readme.md).
 
-### `ecocounter.sites_unfiltered`
-CAUTION: Use VIEW `ecocounter.sites` which includes only sites verified by a human. Sites or "locations" of separate ecocounter installations. Each site may have one or more flows.
+### ecocounter.sites_unfiltered
+CAUTION: Use VIEW `ecocounter.sites`, which includes only sites verified by a human (`validated` = True). Sites are the "locations" of separate ecocounter installations. Each site may have one or more flows.
 
 When you want to update new rows with missing `centreline_id`s, use [this script](./updates/ecocounter_centreline_updates.sql).
 
 | column_name | data_type | sample | comments |
@@ -164,9 +182,29 @@ When you want to update new rows with missing `centreline_id`s, use [this script
 | centreline_id | integer | | The nearest street centreline_id, noting that ecocounter sensors are only configured to count bike-like objects on a portion of the roadway, i.e. a cycletrack or multi-use path. Join using `JOIN gis_core.centreline_latest USING (centreline_id)`. |
 | first_active | timestamp without time zone | | First timestamp site_id appears in ecocounter.counts_unfiltered. Updated using trigger with each insert on ecocounter.counts_unfiltered. |
 | last_active | timestamp without time zone | | Last timestamp site_id appears in ecocounter.counts_unfiltered. Updated using trigger with each insert on ecocounter.counts_unfiltered. |
-
-
-### `ecocounter.counts_unfiltered`
+| date_decommissioned | timestamp without time zone | | |
+| counter | character varying | ECO09063082 | This field is pulled from the API and is another unique ID frequently referred to by Ecocounter in communications. |
+| linear_name_full | text | | Main road name taken from centreline. Useful for filtering all sensors on one corridor. |
+| side_street | text | | Side street name. |
+| technology | text | | Technology description, useful when unioning with other data sources. |
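+
+For example, to pull each validated site with its centreline street name (a sketch; assumes the `gis_core.centreline_latest` column names shown here):
+
+```sql
+-- site metadata joined to the nearest centreline segment
+SELECT s.site_id, s.site_description, cl.linear_name_full
+FROM ecocounter.sites AS s
+JOIN gis_core.centreline_latest AS cl USING (centreline_id);
+```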
+
+### ecocounter.counts
+This view contains calibrated (`calibrated_volume`) and raw (`raw_volume`) volumes for Ecocounter flows.
+This view excludes:
+- "unvalidated" sites and flows: usually these don't produce any data, or we do not know the location.
+- Data labelled in anomalous_ranges as `problem_level = 'do-not-use'`.
+
+| column_name | data_type | sample |
+|:-------------------|:----------------------------|:--------------------|
+| flow_id | numeric | 2102.0 |
+| datetime_bin | timestamp without time zone | 1994-08-05 05:30:00 |
+| raw_volume | smallint | 0 |
+| calibration_factor | numeric | |
+| validation_date | date | |
+| calibrated_volume | numeric | 0.0 |
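+
+A minimal example of querying the view (column names as documented above; the flow_id is illustrative):
+
+```sql
+-- compare raw and calibrated volumes for one flow
+SELECT datetime_bin, raw_volume, calibrated_volume
+FROM ecocounter.counts
+WHERE
+    flow_id = 2102 -- sample flow_id
+    AND datetime_bin >= '2024-01-01'::date
+ORDER BY datetime_bin;
+```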
+
+
+### ecocounter.counts_unfiltered
 CAUTION: Use VIEW `ecocounter.counts` instead to see only data that has been screened for
 * manually validated sites
 * manually validated flows
@@ -182,8 +220,8 @@ Row count: 3,147,432
 | datetime_bin | timestamp without time zone | 2012-12-04 09:00:00 | indicates start time of the time bin. Note that not all time bins are the same size! |
 | volume | smallint | | |
 
-### `ecocounter.flows_unfiltered`
-CAUTION: Use VIEW `ecocounter.flows` which includes only flows verified by a human. A flow is usually a direction of travel associated with a sensor at an ecocounter installation site. For earlier sensors that did not detect directed flows, a flow may be both directions of travel together, i.e. just everyone who passed over the sensor any which way.
+### ecocounter.flows_unfiltered
+CAUTION: Use VIEW `ecocounter.flows`, which includes only flows verified by a human (`validated` = True). A flow is usually a direction of travel associated with a sensor at an ecocounter installation site. For earlier sensors that did not detect directed flows, a flow may be both directions of travel together, i.e. just everyone who passed over the sensor any which way.
 
 Row count: 73
 | column_name | data_type | sample | Comments |
@@ -200,30 +238,50 @@ Row count: 73
 | notes | text | | |
 | first_active | timestamp without time zone | | First timestamp flow_id appears in ecocounter.counts_unfiltered. Updated using trigger with each insert on ecocounter.counts_unfiltered. |
 | last_active | timestamp without time zone | | Last timestamp flow_id appears in ecocounter.counts_unfiltered. Updated using trigger with each insert on ecocounter.counts_unfiltered. |
+| date_decommissioned | timestamp without time zone | | |
+| direction_main | USER-DEFINED | Westbound | Grouping column used for Open Data. A custom datatype was created to force this to be one of `Northbound`/`Southbound`/`Westbound`/`Eastbound`. You will need to coerce this column to text (`::text`) for comparison. |
 
 ## QC Tables
-These tables are used by `ecocounter_admins` to document discontinuities and anomalous ranges in the Ecocounter data when identified.
-
-### `ecocounter.discontinuities`
-Moments in time when data collection methods changed in such a way that we would expect clear pre- and post-change paradigms that may not be intercomparable.
-
-Row count: 7
-| column_name | data_type | sample | Comments |
-|:--------------|:----------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------:|
-| uid | integer | 1 | nan |
-| site_id | numeric | 300031255.0 | nan |
-| break | timestamp without time zone | 2024-01-11 00:00:00 | nan |
-| give_or_take | interval | 1 days 00:00:00 | nan |
-| notes | text | A validation study found that several sensors, including this one, were undercounting bikes. The correct this, the sensitivity of the sensors at this site were increased by one unit. | nan |
-
-### `ecocounter.anomalous_ranges`
-A means of flagging periods with questionable data.
+These tables are used by `ecocounter_admins` to document sensitivity changes and anomalous ranges in the Ecocounter data when identified.
+
+### ecocounter.calibration_factors
+This view joins together `sensitivity_history` and `validation_results` in order to link calibration factors to the periods during which that sensitivity applied. In the case of multiple calibration studies during a single sensitivity range, the first one is applied from the beginning of the sensitivity range until the next calibration study; each later one is applied from the date of its study forward, until the next study takes place.
+
+| column_name | data_type | sample |
+|:---------------------------|:------------|:-----------------------|
+| flow_id | numeric | 101042943.0 |
+| count_date | date | |
+| ecocounter_day_corr_factor | numeric | |
+| setting | text | Original configuration |
+| sensitivity_date_range | daterange | [2018-02-09, None) |
+| factor_range | daterange | [2018-02-09, None) |
+
+### ecocounter.sensitivity_history
+This table stores date ranges for sensitivity adjustments in order to link calibration studies to only the period during which the same sensitivity settings were in effect.
+- `sensitivity_history` must be manually updated based on communication with the Eco-counter Technical Support Specialist (Derek Yates as of 2024).
+- This table has `CONSTRAINT eco_sensitivity_exclude` to prevent a flow_id from having overlapping sensitivity ranges.
+- The sensitivities range from 1 to 4 (least to most selective), as noted below:
+  - `1`: High-Traffic Bikeway (01D0)
+  - `2`: Bikeway (01C0)
+  - `3`: Greenway (01C8)
+  - `4`: Total Selectivity (0162)
+  - `Not Standardized`: Newer ZELT sensors have more complex configurations that do not fit into the above framework.
+
+| column_name | data_type | sample | Comments |
+|:--------------|:------------|:-----------------------|----------|
+| flow_id | numeric | 101042943.0 | |
+| date_range | daterange | [2018-02-09, None) | Date range where the sensitivity setting is applicable. Use a null end date to indicate the current setting. |
+| setting | text | Original configuration | Label for the sensitivity. Include the sensitivity number (described above) if known. |
+| uid | smallint | 2 | Serial pkey to allow interactive editing. |
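+
+To see which correction factor applied to a given flow on a given day, Postgres daterange containment can be used against the view's `factor_range` (a sketch; flow_id and date are illustrative):
+
+```sql
+-- which calibration factor covered this flow on 2024-05-15?
+SELECT flow_id, ecocounter_day_corr_factor, factor_range
+FROM ecocounter.calibration_factors
+WHERE
+    flow_id = 101042943 -- sample flow_id
+    AND '2024-05-15'::date <@ factor_range;
+```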
+
+### ecocounter.anomalous_ranges
+A means of flagging periods with questionable data. `counts` (and, subsequently, the Open Data tables) excludes data with `problem_level = 'do-not-use'`.
 
 Row count: 9
 | column_name | data_type | sample | Comments |
 |:--------------------|:------------|:---------------------------------------------------------------------------------------------|-----------:|
-| flow_id | numeric | | nan |
-| site_id | numeric | 100042942.0 | nan |
+| flow_id | numeric | | It is only necessary to include one of flow_id and site_id for an anomalous_range. Use only site_id if it is a site-wide issue. |
+| site_id | numeric | 100042942.0 | It is only necessary to include one of flow_id and site_id for an anomalous_range. |
 | time_range | tsrange | [2021-02-09 00:00:00, None) | nan |
 | notes | text | Goes from reporting in the hundreds daily to mostly single digits. What could have happened? | nan |
 | investigation_level | text | suspect | nan |
@@ -233,7 +291,7 @@
 ## Validation
 These tables were created to compare Ecocounter data with Spectrum counts. For more information see: [data_collection_automation/ecocounter_validation_counts](https://github.com/Toronto-Big-Data-Innovation-Team/data_collection_automation/tree/ecocounter_validation_counts/ecocounter_validation_counts).
 
-### `ecocounter.manual_counts_matched`
+### ecocounter.manual_counts_matched
 Spectrum manual count volumes matched to eco-counter volumes. 1 row per 15min bin. Used for eco-counter data validation.
 
 Row count: 2,944
 | bikes_path_spectrum | integer | 0 | nan |
 | ecocounter_bikes | bigint | 0 | nan |
 
-### `ecocounter.manual_counts_info`
+### ecocounter.manual_counts_info
 Spectrum manual bike count information, matched to eco-counter sites. Used for validation of eco-counter data. 1 row per manual count location.
 
 Row count: 21
 | count_geom | USER-DEFINED | 0101000020E6100000C136E2C96EDA53C06C0A647616D54540 | nan |
 | match_line_geom | USER-DEFINED | 0102000020E610000002000000C136E2C96EDA53C06C0A647616D545406FEF6C1D70DA53C010E9EEBB15D54540 | nan |
 
-### `ecocounter.manual_counts_raw`
+### ecocounter.manual_counts_raw
 Spectrum manual bike counts at eco-counter locations - raw data, 15min bins
 
 Row count: 3,072
diff --git a/volumes/ecocounter/tables/discontinuities.sql b/volumes/ecocounter/tables/discontinuities.sql
deleted file mode 100644
index b52809340..000000000
--- a/volumes/ecocounter/tables/discontinuities.sql
+++ /dev/null
@@ -1,17 +0,0 @@
-CREATE TABLE ecocounter.discontinuities (
-    uid serial PRIMARY KEY,
-    site_id numeric NOT NULL REFERENCES ecocounter.sites_unfiltered (site_id),
-    -- moment the change takes place
-    break timestamp NOT NULL,
-    -- approximate bounds if the precise time is not known
-    give_or_take interval,
-    -- required description of what changed - be verbose!
-    notes text NOT NULL
-);
-
-ALTER TABLE ecocounter.discontinuities OWNER TO ecocounter_admins;
-
-GRANT SELECT ON ecocounter.discontinuities TO bdit_humans;
-
-COMMENT ON TABLE ecocounter.discontinuities
-IS 'Moments in time when data collection methods changed in such a way that we would expect clear pre- and post-change paradigms that may not be intercomparable.';
diff --git a/volumes/ecocounter/tables/flows_unfiltered.sql b/volumes/ecocounter/tables/flows_unfiltered.sql
index cb6b532c4..a698e2b08 100644
--- a/volumes/ecocounter/tables/flows_unfiltered.sql
+++ b/volumes/ecocounter/tables/flows_unfiltered.sql
@@ -12,6 +12,7 @@ CREATE TABLE ecocounter.flows_unfiltered (
     first_active timestamp without time zone,
     last_active timestamp without time zone,
     date_decommissioned timestamp without time zone,
+    direction_main travel_directions,
     CONSTRAINT locations_pkey PRIMARY KEY (flow_id),
     CONSTRAINT flows_replaced_by_flow_id_fkey FOREIGN KEY (replaced_by_flow_id)
     REFERENCES ecocounter.flows_unfiltered (flow_id) MATCH SIMPLE
@@ -68,4 +69,6 @@ COMMENT ON COLUMN ecocounter.flows_unfiltered.last_active IS E''
 CREATE INDEX IF NOT EXISTS flows_flow_id_idx
 ON ecocounter.flows_unfiltered
 USING btree (flow_id ASC NULLS LAST)
-TABLESPACE pg_default;
\ No newline at end of file
+TABLESPACE pg_default;
+
+-- Note: this type must exist before ecocounter.flows_unfiltered references it;
+-- create it first when rebuilding the schema from scratch.
+CREATE TYPE travel_directions AS ENUM ('Northbound', 'Southbound', 'Westbound', 'Eastbound');
\ No newline at end of file
diff --git a/volumes/ecocounter/tables/sensitivity_history.sql b/volumes/ecocounter/tables/sensitivity_history.sql
new file mode 100644
index 000000000..5cb71df30
--- /dev/null
+++ b/volumes/ecocounter/tables/sensitivity_history.sql
@@ -0,0 +1,51 @@
+-- Table: ecocounter.sensitivity_history
+-- DROP TABLE IF EXISTS ecocounter.sensitivity_history;
+
+CREATE TABLE IF NOT EXISTS ecocounter.sensitivity_history
+(
+    flow_id numeric,
+    date_range daterange,
+    setting text,
+    CONSTRAINT eco_sensitivity_exclude EXCLUDE USING gist (
+        date_range WITH &&,
+        flow_id WITH =
+    )
+)
+
+TABLESPACE pg_default;
+
+ALTER TABLE IF EXISTS ecocounter.sensitivity_history
+OWNER TO ecocounter_admins;
+
+REVOKE ALL ON TABLE ecocounter.sensitivity_history FROM bdit_humans;
+
+GRANT SELECT ON TABLE ecocounter.sensitivity_history TO bdit_humans;
+
+GRANT ALL ON TABLE ecocounter.sensitivity_history TO ecocounter_admins;
+
+COMMENT ON TABLE ecocounter.sensitivity_history IS
+'Stores sensitivity adjustments for each flow as date ranges to identify '
+'ranges within which data is comparable.';
+
+/*
+--initial code used to populate
+
+WITH dates AS (
+    SELECT flow_id, date_of_change::date, setting
+    FROM ecocounter.sensitivity_changes
+    UNION
+    SELECT flow_id, flows.first_active::date, 'Original configuration'
+    FROM ecocounter.flows
+)
+
+INSERT INTO ecocounter.sensitivity_history (flow_id, date_range, setting)
+SELECT
+    flow_id,
+    daterange(
+        date_of_change,
+        LEAD(date_of_change) OVER w
+    ) AS date_range,
+    setting
+FROM dates
+WINDOW w AS (PARTITION BY flow_id ORDER BY date_of_change);
+*/
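+
+-- Note: combining a range operator (&&) with equality (=) in a gist
+-- exclusion constraint requires the btree_gist extension
+-- (CREATE EXTENSION IF NOT EXISTS btree_gist).
+-- Hedged sketch of the constraint at work (flow_id and dates illustrative):
+--
+-- INSERT INTO ecocounter.sensitivity_history (flow_id, date_range, setting)
+-- VALUES (101042943, daterange('2024-01-11', NULL), 'Sensitivity 3 (01C8)');
+--
+-- A second, overlapping range for the same flow_id, e.g.
+-- daterange('2024-06-01', NULL), would be rejected by eco_sensitivity_exclude.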
diff --git a/volumes/ecocounter/tables/sites_unfiltered.sql b/volumes/ecocounter/tables/sites_unfiltered.sql
index ae1a28a99..a04ce719a 100644
--- a/volumes/ecocounter/tables/sites_unfiltered.sql
+++ b/volumes/ecocounter/tables/sites_unfiltered.sql
@@ -10,6 +10,10 @@ CREATE TABLE ecocounter.sites_unfiltered (
     first_active timestamp without time zone,
     last_active timestamp without time zone,
     date_decommissioned timestamp without time zone,
+    counter character varying(16) COLLATE pg_catalog."default",
+    linear_name_full text COLLATE pg_catalog."default",
+    side_street text COLLATE pg_catalog."default",
+    technology text COLLATE pg_catalog."default",
     CONSTRAINT sites_pkey PRIMARY KEY (site_id),
     CONSTRAINT sites_replaced_by_fkey FOREIGN KEY (replaced_by_site_id)
     REFERENCES ecocounter.sites_unfiltered (site_id) MATCH SIMPLE
diff --git a/volumes/ecocounter/views/create-view-calibration_factors.sql b/volumes/ecocounter/views/create-view-calibration_factors.sql
new file mode 100644
index 000000000..48d3264ce
--- /dev/null
+++ b/volumes/ecocounter/views/create-view-calibration_factors.sql
@@ -0,0 +1,46 @@
+CREATE VIEW ecocounter.calibration_factors AS
+WITH dates AS (
+    SELECT
+        sh.flow_id,
+        vr.count_date,
+        vr.ecocounter_day_corr_factor,
+        sh.setting,
+        sh.date_range AS sensitivity_date_range,
+        LAG(vr.count_date) OVER w AS last_count_date,
+        LEAD(vr.count_date) OVER w AS next_count_date
+    FROM ecocounter.sensitivity_history AS sh
+    LEFT JOIN ecocounter.validation_results AS vr
+        ON
+            sh.flow_id = ANY(vr.flow_ids)
+            AND vr.count_date <@ sh.date_range
+    WINDOW w AS (PARTITION BY sh.flow_id, sh.date_range ORDER BY vr.count_date)
+    ORDER BY
+        sh.flow_id,
+        vr.count_date
+)
+
+SELECT
+    flow_id,
+    count_date,
+    ecocounter_day_corr_factor,
+    setting,
+    sensitivity_date_range,
+    daterange(
+        --if there's a previous study, let that one take precedence for days before count_date.
+        CASE
+            WHEN last_count_date IS NULL THEN LOWER(sensitivity_date_range)
+            ELSE count_date
+        END,
+        --if there's a future study, let that one take precedence.
+        COALESCE(next_count_date, UPPER(sensitivity_date_range))
+    ) AS factor_range
+FROM dates
+ORDER BY
+    flow_id,
+    count_date;
+
+COMMENT ON VIEW ecocounter.calibration_factors IS
+'(in development) - table of validation results and dates to apply '
+'calibration factors based on other validation studies and sensitivity history.';
+
+ALTER VIEW ecocounter.calibration_factors OWNER TO ecocounter_admins;
\ No newline at end of file
diff --git a/volumes/ecocounter/views/create-view-counts.sql b/volumes/ecocounter/views/create-view-counts.sql
index 0731c1b2f..651388a05 100644
--- a/volumes/ecocounter/views/create-view-counts.sql
+++ b/volumes/ecocounter/views/create-view-counts.sql
@@ -2,10 +2,18 @@ CREATE OR REPLACE VIEW ecocounter.counts AS
 SELECT
     counts_unfiltered.flow_id,
     counts_unfiltered.datetime_bin,
-    counts_unfiltered.volume
+    counts_unfiltered.volume AS raw_volume,
+    cf.ecocounter_day_corr_factor AS calibration_factor,
+    cf.count_date AS validation_date,
+    ROUND(COALESCE(cf.ecocounter_day_corr_factor, 1) * counts_unfiltered.volume)
+    AS calibrated_volume
 FROM ecocounter.counts_unfiltered
 JOIN ecocounter.flows_unfiltered USING (flow_id)
 JOIN ecocounter.sites_unfiltered USING (site_id)
+LEFT JOIN ecocounter.calibration_factors AS cf
+    ON
+        counts_unfiltered.flow_id = cf.flow_id
+        AND counts_unfiltered.datetime_bin::date <@ cf.factor_range
 WHERE
     -- must be validated at the level of both site and flow
     flows_unfiltered.validated
@@ -19,14 +27,19 @@ WHERE
             AND anomalous_ranges.time_range @> counts_unfiltered.datetime_bin
             AND (
                 counts_unfiltered.flow_id = anomalous_ranges.flow_id
-                OR sites_unfiltered.site_id = anomalous_ranges.site_id
+                OR (
+                    anomalous_ranges.flow_id IS NULL
+                    AND sites_unfiltered.site_id = anomalous_ranges.site_id
+                )
            )
     );
 
 COMMENT ON VIEW ecocounter.counts IS
 'This view contains the (somewhat) validated counts for Ecocounter flows.
 Counts are only included for sites and flows marked as validated by a human.
-Anomalous ranges at the do-not-use problem_level are also excluded here. +Sites which have undergone validation studies will have `validation_date` and +`calibration_factor` populated to scale raw data to match ground truth (`calibrated_volume`). +Anomalous ranges at the `do-not-use` problem_level are also excluded here. For the complete, raw count data, see ecocounter.counts_unfiltered. Please note that bin size varies for older data, so averaging these numbers may not be straightforward.'; @@ -38,3 +51,9 @@ GRANT SELECT ON ecocounter.counts TO bdit_humans; COMMENT ON COLUMN ecocounter.counts.datetime_bin IS 'indicates start time of the time bin. Note that not all time bins are the same size!'; + +COMMENT ON COLUMN ecocounter.counts.validation_date +IS 'The date on which a validation study was conducted to determine this calibration factor.'; + +COMMENT ON COLUMN ecocounter.counts.calibration_factor +IS 'A factor developed through ground-truth validation to scale raw Ecocounter volumes.'; diff --git a/volumes/ecocounter/views/create-view-flows.sql b/volumes/ecocounter/views/create-view-flows.sql index a3f6ec502..62c493800 100644 --- a/volumes/ecocounter/views/create-view-flows.sql +++ b/volumes/ecocounter/views/create-view-flows.sql @@ -3,6 +3,7 @@ CREATE OR REPLACE VIEW ecocounter.flows AS ( flow_id, site_id, flow_direction, + direction_main, flow_geom, bin_size, notes, diff --git a/volumes/ecocounter/views/create-view-sites.sql b/volumes/ecocounter/views/create-view-sites.sql index ee82d45df..8b6547eb6 100644 --- a/volumes/ecocounter/views/create-view-sites.sql +++ b/volumes/ecocounter/views/create-view-sites.sql @@ -9,7 +9,11 @@ CREATE OR REPLACE VIEW ecocounter.sites AS ( centreline_id, first_active, last_active, - date_decommissioned + date_decommissioned, + counter, + linear_name_full, + side_street, + technology FROM ecocounter.sites_unfiltered WHERE validated ); diff --git a/volumes/open_data/sql/create-table-cycling_permanent_counts_locations.sql b/volumes/open_data/sql/create-table-cycling_permanent_counts_locations.sql new file mode 100644 index 000000000..5c1ce7c79 --- /dev/null +++ b/volumes/open_data/sql/create-table-cycling_permanent_counts_locations.sql @@ -0,0 +1,46 @@ +-- Table: open_data.cycling_permanent_counts_locations + +-- DROP TABLE IF EXISTS open_data.cycling_permanent_counts_locations; + +CREATE TABLE IF NOT EXISTS open_data.cycling_permanent_counts_locations +( + location_dir_id bigint NOT NULL DEFAULT nextval( + 'open_data.cycling_permanent_counts_location_dir_id'::regclass + ), + location_name text COLLATE pg_catalog."default", + direction text COLLATE pg_catalog."default", + linear_name_full text COLLATE pg_catalog."default", + side_street text COLLATE pg_catalog."default", + longitude numeric, + latitude numeric, + centreline_id integer, + bin_size text COLLATE pg_catalog."default", + latest_calibration_study date, + first_active date, + last_active date, + date_decommissioned date, + technology text COLLATE pg_catalog."default", + CONSTRAINT cycling_permanent_counts_locations_pkey PRIMARY KEY ( + location_dir_id + ), + CONSTRAINT unique_cycling_permanent_counts_locations UNIQUE ( + location_name, direction + ) +) + +TABLESPACE pg_default; + +ALTER TABLE IF EXISTS open_data.cycling_permanent_counts_locations +OWNER TO od_admins; + +REVOKE ALL ON TABLE open_data.cycling_permanent_counts_locations FROM bdit_humans; +REVOKE ALL ON TABLE open_data.cycling_permanent_counts_locations FROM ecocounter_bot; + +GRANT SELECT ON TABLE 
open_data.cycling_permanent_counts_locations TO bdit_humans; + +GRANT UPDATE, INSERT, SELECT ON TABLE open_data.cycling_permanent_counts_locations +TO ecocounter_bot; + +GRANT ALL ON TABLE open_data.cycling_permanent_counts_locations TO od_admins; + +GRANT ALL ON TABLE open_data.cycling_permanent_counts_locations TO rds_superuser WITH GRANT OPTION; diff --git a/volumes/open_data/sql/create-view-cycling_permanent_counts_15min.sql b/volumes/open_data/sql/create-view-cycling_permanent_counts_15min.sql new file mode 100644 index 000000000..73b52f805 --- /dev/null +++ b/volumes/open_data/sql/create-view-cycling_permanent_counts_15min.sql @@ -0,0 +1,25 @@ +-- View: open_data.cycling_permanent_counts_15min + +-- DROP VIEW IF EXISTS open_data.cycling_permanent_counts_15min; + +CREATE VIEW open_data.cycling_permanent_counts_15min AS +SELECT + od.location_dir_id, + eco.site_description::text AS location_name, + eco.direction::text AS direction, + eco.datetime_bin, + eco.bin_volume::integer +FROM ecocounter.open_data_15min_counts AS eco +JOIN open_data.cycling_permanent_counts_locations AS od + ON eco.site_description::text = od.location_name + AND eco.direction::text = od.direction; + +ALTER TABLE IF EXISTS open_data.cycling_permanent_counts_15min OWNER TO od_admins; + +REVOKE ALL ON TABLE open_data.cycling_permanent_counts_15min FROM bdit_humans; +GRANT SELECT ON TABLE open_data.cycling_permanent_counts_15min TO bdit_humans; + +GRANT SELECT ON TABLE open_data.cycling_permanent_counts_15min TO ecocounter_bot; + +COMMENT ON VIEW open_data.cycling_permanent_counts_15min IS +'15 minute cycling volumes from various (currently just Eco-Counter) permanent counting stations.'; \ No newline at end of file diff --git a/volumes/open_data/sql/create-view-cycling_permanent_counts_daily.sql b/volumes/open_data/sql/create-view-cycling_permanent_counts_daily.sql new file mode 100644 index 000000000..832581819 --- /dev/null +++ b/volumes/open_data/sql/create-view-cycling_permanent_counts_daily.sql @@ -0,0 +1,27 @@ +-- View: open_data.cycling_permanent_counts_daily + +-- DROP VIEW IF EXISTS open_data.cycling_permanent_counts_daily; + +CREATE OR REPLACE VIEW open_data.cycling_permanent_counts_daily AS +SELECT + od.location_dir_id, + eco.site_description::text AS location_name, + eco.direction::text AS direction, + od.linear_name_full, + od.side_street, + eco.dt::date, + eco.daily_volume::integer +FROM ecocounter.open_data_daily_counts AS eco +JOIN open_data.cycling_permanent_counts_locations AS od + ON eco.site_description::text = od.location_name + AND eco.direction::text = od.direction; + +ALTER TABLE IF EXISTS open_data.cycling_permanent_counts_daily OWNER TO od_admins; + +REVOKE ALL ON TABLE open_data.cycling_permanent_counts_daily FROM bdit_humans; +GRANT SELECT ON TABLE open_data.cycling_permanent_counts_daily TO bdit_humans; + +GRANT SELECT ON TABLE open_data.cycling_permanent_counts_daily TO ecocounter_bot; + +COMMENT ON VIEW open_data.cycling_permanent_counts_daily IS +'Daily cycling volumes from various (currently just Eco-Counter) permanent counting stations.'; \ No newline at end of file diff --git a/volumes/open_data/sql/cycling_permanent_counts_readme.md b/volumes/open_data/sql/cycling_permanent_counts_readme.md new file mode 100644 index 000000000..45bcc00ac --- /dev/null +++ b/volumes/open_data/sql/cycling_permanent_counts_readme.md @@ -0,0 +1,51 @@ + +# Permanent Bicycle Counters Open Data Dictionary + + +## cycling_permanent_counts_locations.csv + +This table contains the locations and metadata 
about each permanent bicycle counting sensor installation. It can be joined to the daily and 15-minute tables using `location_dir_id`. This table references the City of Toronto's Street Centreline dataset.
+
+| Column Name | Data Type | Sample | Description |
+|-------------------------------|:-----------:|--------------------------------------:|-------------------------------------------------------------|
+| location_dir_id | integer | 1 | Unique ID for location and direction for joining to data tables. |
+| location_name | text | Bloor St E, West of Castle Frank Rd (retired) | Short description of sensor location. |
+| direction | text | Eastbound | Closest cardinal direction of bike flow. |
+| linear_name_full | text | Bloor St E | Full street name of flow from Toronto Centreline (TCL). |
+| side_street | text | Castle Frank Rd | Nearest side street to sensor flow. |
+| longitude | float | -79.3681194 | Approximate longitude of sensor. |
+| latitude | float | 43.6738047 | Approximate latitude of sensor. |
+| centreline_id | integer | 8540609 | `centreline_id` corresponding to the [Toronto Centreline (TCL)](https://open.toronto.ca/dataset/toronto-centreline-tcl/). |
+| bin_size | text | 00:15:00 | Duration of `datetime_bin`s recorded by the sensor in the 15-minute table. |
+| latest_calibration_study | date | | Date of latest calibration study. Where older sites have `null` values, the data was validated with other available sources. |
+| first_active | date | 1994-06-26 | The earliest date for which data is available. |
+| last_active | date | 2019-06-13 | The most recent date of available data produced by the sensor. |
+| date_decommissioned | date | 2019-06-13 | Date decommissioned. |
+| technology | text | Induction - Other | Technology of permanent sensor. |
+
+\pagebreak
+
+## cycling_permanent_counts_daily.csv
+
+Daily cycling and micromobility volumes by location and direction.
+
+| Column Name | Data Type | Sample | Description |
+|------------------|:---------:|----------------------------------------------:|------------------------------------------|
+| location_dir_id | integer | 1 | Unique ID for location and direction for joining to `cycling_permanent_counts_locations`. |
+| location_name | text | Bloor St E, West of Castle Frank Rd (retired) | Short description of sensor location. |
+| direction | text | Westbound | Closest cardinal direction of bike flow. |
+| linear_name_full | text | Bloor St E | Full street name of flow from Toronto Centreline (TCL). |
+| side_street | text | Castle Frank Rd | Nearest side street to sensor flow. |
+| dt | date | 1994-06-26 | Date of count. |
+| daily_volume | integer | 939 | Count of users on date `dt`. |
+
+## cycling_permanent_counts_15min_YYYY_YYYY.csv
+
+15-minute cycling and micromobility volumes by location and direction. Where 15-minute volumes are not available, 1-hour volumes are provided. The row counts in these files may exceed Excel's row limit.
+
+| Column Name | Data Type | Sample | Description |
+|------------------|:---------:|-------------------------------:|------------------------------------------|
+| location_dir_id | integer | 1 | Unique ID for location and direction for joining to `cycling_permanent_counts_locations`. |
+| datetime_bin | timestamp | 1994-06-26 00:00 | The date-time at which the record begins. See `bin_size` in `cycling_permanent_counts_locations` for the size of the bin. |
+| bin_volume | integer | 3 | Count of users in `datetime_bin`. |
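+
+As a quick illustration of the join between the extracts (a sketch; in the database, the same join applies to the `open_data` views that back these files):
+
+```sql
+-- attach location metadata to daily volumes via location_dir_id
+SELECT loc.location_name, loc.direction, daily.dt, daily.daily_volume
+FROM open_data.cycling_permanent_counts_daily AS daily
+JOIN open_data.cycling_permanent_counts_locations AS loc USING (location_dir_id)
+ORDER BY loc.location_name, daily.dt;
+```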