1085 Ecocounter Open Data Schema+DAG+QC plots #1096

Status: Open. Wants to merge 80 commits into base: master. The diff below shows changes from 5 of the 80 commits.

Commits (80):
b4e359b
#1085 sensitivity_history table
gabrielwol Oct 21, 2024
dcf802d
#1085 add counter variable to sites tables (used by ecocounter in com…
gabrielwol Oct 22, 2024
9f9440c
#1085 script used to populate sensitivity history
gabrielwol Oct 22, 2024
01697e4
#1085 transform sensitivity_history and validation_results into corre…
gabrielwol Oct 22, 2024
8318257
#1085 counts_corrected view
gabrielwol Oct 22, 2024
15da0e1
#1085 open data daily view
gabrielwol Oct 22, 2024
7312c2b
#1085 update sensitivity history, fix groupby bug in opendata view
gabrielwol Oct 24, 2024
3f6a837
#1085 sensitivity -> setting
gabrielwol Oct 24, 2024
49bfa9f
#1085 add qc plots
gabrielwol Oct 25, 2024
19fdfef
#1085 add qc shiny app
gabrielwol Oct 29, 2024
ac7740b
#1085 ecocounter_graph_volumes
gabrielwol Oct 29, 2024
9b19644
#1085 shiny app improvements; zoom, faster render times, dynamic labels
gabrielwol Oct 31, 2024
581cd00
#1085 shiny app; add flow_id
gabrielwol Oct 31, 2024
6e8ba63
#1085 improve export feature, fix scaled volume bug
gabrielwol Nov 6, 2024
59b4db0
#1085 add MOVE open data sync DAG
gabrielwol Nov 7, 2024
6e2562f
#1085 add "direction_main" for easier grouping of flows
gabrielwol Nov 7, 2024
9adc3a9
#1085 fix anomalous_range join conditions for new shiny ranges (conta…
gabrielwol Nov 7, 2024
39ec9c3
#1085 open data views (filtered + calibrated & raw, daily).
gabrielwol Nov 7, 2024
e9ccdb8
#1085 shiny; ability to view validated counts only
gabrielwol Nov 7, 2024
a82948d
#1085 don't pull for decommissioned sites/flows
gabrielwol Nov 8, 2024
232c1c3
#1085 base of ecocounter_open_data dag
gabrielwol Nov 8, 2024
6595b4f
#1085 add task.bash to download data
gabrielwol Nov 8, 2024
511ea73
#1085 fix permissions error
gabrielwol Nov 8, 2024
2e8a97d
#1085 switch open data views to tables + add insert functions, adjust…
gabrielwol Nov 12, 2024
e68b8c1
#1085 test_dags add connection
gabrielwol Nov 12, 2024
1930137
#1085 sort sites, add daily tick marks
gabrielwol Nov 12, 2024
995b63c
#1085 add missing table comments
gabrielwol Nov 12, 2024
8b71bf6
#1085 fix file suffix
gabrielwol Nov 12, 2024
f6675c6
#1085 genericize username references
gabrielwol Nov 12, 2024
071b94a
#1085 remove MOVE sample pipeline files
gabrielwol Nov 12, 2024
4ed482c
#1085 consistently use calibrate instead of correct
gabrielwol Nov 12, 2024
8ab0c1e
#1085 rename files correct -> calibrate
gabrielwol Nov 13, 2024
1921683
#1085 filter for only complete days
gabrielwol Nov 13, 2024
05ade38
#1085 open_data_sites (draft)
gabrielwol Nov 14, 2024
57b8b56
#1085 shiny app; allow multiple site selection
gabrielwol Nov 14, 2024
af60b44
#1085 shiny app; somewhat dynamic breaks, query button
gabrielwol Nov 14, 2024
dc4faab
#1085 update open_data_sites
gabrielwol Nov 14, 2024
0e40754
#1085 sqlfluff
gabrielwol Nov 15, 2024
61c8e25
#1085 shiny; update ui and add 2nd save_range button
gabrielwol Nov 15, 2024
ca7fa08
#1085 swap out `counts_corrected` with `counts`
gabrielwol Nov 15, 2024
9fff6b4
#1085 add site technology
gabrielwol Nov 15, 2024
5a09ee1
#1085 pdf improvements; qc and valid plots on same page
gabrielwol Nov 15, 2024
759cbf3
#1085 bug in unnesting flow_ids
gabrielwol Nov 18, 2024
bfdc88b
#1085 add open_data_flows; separate out info from sites
gabrielwol Nov 18, 2024
7317641
#1085 fix task_fail_alert to use proxy, update open_data path
gabrielwol Nov 19, 2024
ee60e3b
#1085 fix joins in ecocounter open_data_sites
gabrielwol Nov 20, 2024
57854c4
#1085 combine sites/flows into "locations"
gabrielwol Nov 20, 2024
183f199
#1085 don't publish site_id
gabrielwol Nov 20, 2024
18143df
#1085 create open_data folder
gabrielwol Nov 21, 2024
a29f72d
#1085 add columns to sites, OD locations
gabrielwol Nov 21, 2024
516664e
#1085 r shiny updates + validation plots
gabrielwol Nov 21, 2024
4a2a3cd
#1085 open data dictionary
gabrielwol Nov 21, 2024
8e19407
#1085 fix image links
gabrielwol Nov 21, 2024
0844bf8
Merge branch 'master' into 1085-ecocounter-develop-sensitivityfactor-…
gabrielwol Nov 21, 2024
ce6f289
#1085 rename raw_counts to 15min_counts
gabrielwol Nov 21, 2024
905909a
#1085 add locations pull to dag
gabrielwol Nov 21, 2024
7e5da54
#1085 add DAG readme + DOC_MD
gabrielwol Nov 25, 2024
138798a
#1085 remove externaltasksensor; (cross environment complexity, not n…
gabrielwol Nov 25, 2024
636844b
#1085 map data pulls over years (last 2 months)
gabrielwol Nov 25, 2024
c9f211e
#1085 remove discontinuities table, replace with sensitivity_history
gabrielwol Nov 26, 2024
1275087
#1085 readme updates
gabrielwol Nov 26, 2024
e66b0d4
#1085 create open_data schema views for unified permanent cycling vol…
gabrielwol Nov 26, 2024
1b4ddfb
#1085 update DAG to pull from open_data schema
gabrielwol Nov 26, 2024
2c1dcb1
#1085 output readme with pandoc
gabrielwol Nov 26, 2024
8df8acf
#1085 data check bug fix
gabrielwol Nov 27, 2024
c3f3050
#1085 lat,lng to latitude,longitude
gabrielwol Nov 27, 2024
b3923a5
#1085 doc updates
gabrielwol Nov 27, 2024
90782fb
#1085 bash script for re-exporting historical ecocounter data
gabrielwol Nov 28, 2024
54669ac
#1085 use variable for EXPORT_PATH
gabrielwol Nov 28, 2024
b0b7c3f
#1085 add location_dir_id, extra columns to summary table
gabrielwol Nov 29, 2024
c8cc228
#1085 fix for data-availability check
gabrielwol Dec 2, 2024
d89182f
#1085 fix: get_years -> int, minor doc changes
gabrielwol Dec 2, 2024
808cc2e
#1085 remove name columns from 15 min data
gabrielwol Dec 4, 2024
086d95f
#1085 add zeros to 15 min data
gabrielwol Dec 4, 2024
b2ec2f8
#1085 minor doc update
gabrielwol Dec 4, 2024
d5dc31d
#1085 remove toplevel database connections (#1112), remove deprecated…
gabrielwol Dec 18, 2024
bfb52c9
#1085 don't try to set os.environ['PGPASSWORD']
gabrielwol Dec 19, 2024
ae5c566
#1085 update year breaks
gabrielwol Dec 4, 2024
d00e15a
#1085 use templating for conn details
gabrielwol Dec 20, 2024
3b9dd0b
#1085 pandoc! + update readme pagebreaks
gabrielwol Dec 20, 2024
155 changes: 96 additions & 59 deletions dags/ecocounter_open_data.py
@@ -3,7 +3,7 @@
"""
import sys
import os
from datetime import timedelta
from datetime import timedelta, datetime
import logging
import pendulum
from functools import partial
@@ -12,15 +12,14 @@
from airflow.models import Variable
from airflow.hooks.base_hook import BaseHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.sensors.date_time import DateTimeSensor
from airflow.macros import ds_format

from airflow.operators.python import get_current_context

try:
repo_path = os.path.abspath(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
sys.path.insert(0, repo_path)
from dags.dag_functions import task_fail_slack_alert, send_slack_msg
from dags.dag_functions import task_fail_slack_alert, send_slack_msg, get_readme_docmd
from dags.custom_operators import SQLCheckOperatorWithReturnValue
except:
raise ImportError("Cannot import DAG helper functions.")
@@ -31,11 +30,14 @@
DAG_NAME = 'ecocounter_open_data'
DAG_OWNERS = Variable.get('dag_owners', deserialize_json=True).get(DAG_NAME, ["Unknown"])

README_PATH = os.path.join(repo_path, 'volumes/ecocounter/readme.md')
DOC_MD = get_readme_docmd(README_PATH, DAG_NAME)

default_args = {
'owner': ','.join(DAG_OWNERS),
'depends_on_past':False,
#set earlier start_date + catchup when ready?
'start_date': pendulum.datetime(2024, 10, 1, tz="America/Toronto"),
'start_date': pendulum.datetime(2024, 1, 1, tz="America/Toronto"),
'email_on_failure': False,
'email_on_success': False,
'retries': 0,
@@ -46,26 +48,14 @@
@dag(
dag_id=DAG_NAME,
default_args=default_args,
schedule='0 11 1 * *', # 10am, 1st day of each month
catchup=True,
schedule='0 12 1 * *', # 12pm, 1st day of each month
catchup=False,
max_active_runs=1,
tags=["ecocounter", "open_data"],
doc_md=__doc__
doc_md=DOC_MD
)
def ecocounter_open_data_dag():

t_upstream_done = ExternalTaskSensor(
task_id="starting_point",
external_dag_id="ecocounter_pull",
external_task_id="done",
poke_interval=3600, #retry hourly
mode="reschedule",
doc_md="Wait for last day of month to run before running monthly DAG.",
timeout=86400, #one day
#wait for the 1st of the following month
execution_date_fn=lambda dt: dt + pendulum.duration(months=1, hours=-1) #ecocounter_pull scheduled at '0 10 * * *'
)

check_data_availability = SQLCheckOperatorWithReturnValue(
task_id="check_data_availability",
sql="""WITH daily_volumes AS (
@@ -106,46 +96,93 @@ def reminder_message(ds = None, **context):
wait_till_10th.doc_md = """
Wait until the 10th day of the month to export data. Alternatively mark task as success to proceed immediately.
"""

@task()
def get_years(ds=None):
Review comment (Collaborator): did we add this for mapped task naming? It was using ds before right?

Reply (Collaborator, Author): Yes, the insert_and_download_data task_group is mapped over the output of get_years. The purpose is to run the exports for the last two months (which may span two separate years), since new data frequently arrives within ~30 days.
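
For illustration, a minimal standalone sketch (with a hypothetical `ds` value) of why `get_years` can yield two distinct years near a year boundary:

```python
import pendulum

ds = "2025-01-01"  # hypothetical logical date of a monthly run
mnth = pendulum.from_format(ds, 'YYYY-MM-DD')
prev_mnth = mnth.subtract(months=1)  # December of the prior year
yrs = [str(mnth.year), str(prev_mnth.year)]
print(sorted(set(yrs)))  # ['2024', '2025'] -> one mapped task-group instance per year
```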

mnth = pendulum.from_format(ds, 'YYYY-MM-DD')
prev_mnth = mnth.subtract(months=1)
yrs = [str(mnth.year), str(prev_mnth.year)]
return list(set(yrs)) #unique

@task_group()
def insert_and_download_data():
insert_daily = PostgresOperator(
sql="SELECT ecocounter.open_data_daily_counts_insert({{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }})",
task_id='insert_daily_open_data',
postgres_conn_id='ecocounter_bot',
autocommit=True,
retries = 0
def insert_and_download_data(yr):
@task(map_index_template="{{ yr }}")
def insert_daily(yr):
context = get_current_context()
context["yr"] = yr
t = PostgresOperator(
sql=f"SELECT ecocounter.open_data_daily_counts_insert({yr}::int)",
task_id='insert_daily_open_data',
postgres_conn_id='ecocounter_bot',
autocommit=True,
retries = 0
)
return t.execute(context=context)

@task(map_index_template="{{ yr }}")
def insert_15min(yr):
context = get_current_context()
context["yr"] = yr
t = PostgresOperator(
sql=f"SELECT ecocounter.open_data_15min_counts_insert({yr}::int)",
task_id='insert_15min_open_data',
postgres_conn_id='ecocounter_bot',
autocommit=True,
retries = 0
)
return t.execute(context=context)

@task.bash(
map_index_template="{{ yr }}",
env={
"HOST": BaseHook.get_connection("ecocounter_bot").host,
"USER" : BaseHook.get_connection("ecocounter_bot").login,
"PGPASSWORD": BaseHook.get_connection("ecocounter_bot").password
}
)
insert_raw = PostgresOperator(
sql="SELECT ecocounter.open_data_raw_counts_insert({{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }})",
task_id='insert_raw_open_data',
postgres_conn_id='ecocounter_bot',
autocommit=True,
retries = 0
)

@task.bash(env={
"HOST": BaseHook.get_connection("ecocounter_bot").host,
"USER" : BaseHook.get_connection("ecocounter_bot").login,
"PGPASSWORD": BaseHook.get_connection("ecocounter_bot").password
})
def download_daily_open_data()->str:
return '''psql -h $HOST -U $USER -d bigdata -c \
"SELECT site_description, direction, dt, daily_volume FROM ecocounter.open_data_daily_counts WHERE dt >= date_trunc('year'::text, '{{ ds }}'::date) LIMIT 100" \
--csv -o /data/open_data/permanent-bike-counters/ecocounter_daily_counts_{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}.csv'''
def download_daily_open_data(yr)->str:
context = get_current_context()
context["yr"] = yr
return f'''/usr/bin/psql -h $HOST -U $USER -d bigdata -c \
"SELECT site_description, direction, dt, daily_volume
FROM ecocounter.open_data_daily_counts
WHERE
dt >= to_date({yr}::text, 'yyyy')
AND dt < LEAST(date_trunc('month', now()), to_date(({yr}::int+1)::text, 'yyyy'));" \
--csv -o "/data/open_data/permanent-bike-counters/ecocounter_daily_counts_{yr}.csv"'''

@task.bash(env={
"HOST": BaseHook.get_connection("ecocounter_bot").host,
"USER" : BaseHook.get_connection("ecocounter_bot").login,
"PGPASSWORD": BaseHook.get_connection("ecocounter_bot").password
})
def download_raw_open_data()->str:
return '''psql -h $HOST -U $USER -d bigdata -c \
"SELECT site_description, direction, datetime_bin, bin_volume FROM ecocounter.open_data_raw_counts WHERE datetime_bin >= date_trunc('year'::text, '{{ ds }}'::date) LIMIT 100" \
--csv -o /data/open_data/permanent-bike-counters/ecocounter_raw_counts_{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}.csv'''
@task.bash(
map_index_template="{{ yr }}",
env={
"HOST": BaseHook.get_connection("ecocounter_bot").host,
"USER" : BaseHook.get_connection("ecocounter_bot").login,
"PGPASSWORD": BaseHook.get_connection("ecocounter_bot").password
}
)
def download_15min_open_data(yr)->str:
context = get_current_context()
context["yr"] = yr
return f'''/usr/bin/psql -h $HOST -U $USER -d bigdata -c \
"SELECT site_description, direction, datetime_bin, bin_volume
FROM ecocounter.open_data_15min_counts
WHERE
datetime_bin >= to_date({yr}::text, 'yyyy')
AND datetime_bin < LEAST(date_trunc('month', now()), to_date(({yr}+1)::text, 'yyyy'));" \
--csv -o "/data/open_data/permanent-bike-counters/ecocounter_15min_counts_{yr}.csv"'''

insert_daily >> download_daily_open_data()
insert_raw >> download_raw_open_data()
insert_daily(yr) >> download_daily_open_data(yr)
insert_15min(yr) >> download_15min_open_data(yr)

@task.bash(env={
"HOST": BaseHook.get_connection("ecocounter_bot").host,
"USER" : BaseHook.get_connection("ecocounter_bot").login,
"PGPASSWORD": BaseHook.get_connection("ecocounter_bot").password
})
def download_locations_open_data()->str:
return '''/usr/bin/psql -h $HOST -U $USER -d bigdata -c \
"SELECT location_name, direction, linear_name_full, side_street, lng, lat, centreline_id, bin_size, latest_calibration_study, first_active, last_active, date_decommissioned, technology
FROM ecocounter.open_data_locations" \
--csv -o /data/open_data/permanent-bike-counters/locations.csv'''

@task(
retries=0,
Expand All @@ -159,14 +196,14 @@ def status_message(ds = None, **context):
msg=f"Ecocounter :open_data_to: DAG ran successfully for {mnth} :white_check_mark:",
use_proxy=True
)


yrs = get_years()
(
t_upstream_done >>
check_data_availability >>
reminder_message() >>
wait_till_10th >>
insert_and_download_data() >>
[insert_and_download_data.expand(yr = yrs), download_locations_open_data()] >>
status_message()
)

ecocounter_open_data_dag()
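
An editorial aside on the mapped tasks above: assigning `yr` into the current context is what lets `map_index_template="{{ yr }}"` render a readable per-year map index in the Airflow UI. A minimal sketch of the pattern, assuming Airflow >= 2.9 (where `map_index_template` is available); `insert_for_year` is a hypothetical stand-in for the DAG's mapped tasks:

```python
from airflow.decorators import task
from airflow.operators.python import get_current_context

@task(map_index_template="{{ yr }}")
def insert_for_year(yr: str) -> None:
    # Expose `yr` to the Jinja map_index_template at render time.
    context = get_current_context()
    context["yr"] = yr
    # ...year-scoped work would go here...

# Expanding over two years yields map indexes "2024" and "2025":
# insert_for_year.expand(yr=["2024", "2025"])
```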
@@ -1,4 +1,4 @@
CREATE OR REPLACE FUNCTION ecocounter.open_data_raw_counts_insert(
CREATE OR REPLACE FUNCTION ecocounter.open_data_15min_counts_insert(
yyyy integer
)
RETURNS void
@@ -30,7 +30,7 @@ AS $BODY$
HAVING COUNT(DISTINCT cc.datetime_bin) = (3600*24 / EXTRACT(epoch FROM bin_size))
)

INSERT INTO ecocounter.open_data_raw_counts (
INSERT INTO ecocounter.open_data_15min_counts (
site_id, site_description, direction, datetime_bin, bin_volume
)
SELECT
@@ -63,12 +63,12 @@ AS $BODY$

$BODY$;

ALTER FUNCTION ecocounter.open_data_raw_counts_insert(integer) OWNER TO ecocounter_admins;
ALTER FUNCTION ecocounter.open_data_15min_counts_insert(integer) OWNER TO ecocounter_admins;

GRANT EXECUTE ON FUNCTION ecocounter.open_data_raw_counts_insert(integer) TO ecocounter_admins;
GRANT EXECUTE ON FUNCTION ecocounter.open_data_raw_counts_insert(integer) TO ecocounter_bot;
REVOKE EXECUTE ON FUNCTION ecocounter.open_data_raw_counts_insert(integer) FROM bdit_humans;
GRANT EXECUTE ON FUNCTION ecocounter.open_data_15min_counts_insert(integer) TO ecocounter_admins;
GRANT EXECUTE ON FUNCTION ecocounter.open_data_15min_counts_insert(integer) TO ecocounter_bot;
REVOKE EXECUTE ON FUNCTION ecocounter.open_data_15min_counts_insert(integer) FROM bdit_humans;

COMMENT ON FUNCTION ecocounter.open_data_raw_counts_insert(integer) IS
'Function to insert disaggregate data for a year into `ecocounter.open_data_raw_counts`. '
COMMENT ON FUNCTION ecocounter.open_data_15min_counts_insert(integer) IS
'Function to insert disaggregate data for a year into `ecocounter.open_data_15min_counts`. '
'Does not overwrite existing data (eg. if sensitivity was retroactively updated).';
@@ -1,8 +1,8 @@
-- Table: ecocounter.open_data_raw_counts
-- Table: ecocounter.open_data_15min_counts

-- DROP TABLE IF EXISTS ecocounter.open_data_raw_counts;
-- DROP TABLE IF EXISTS ecocounter.open_data_15min_counts;

CREATE TABLE IF NOT EXISTS ecocounter.open_data_raw_counts
CREATE TABLE IF NOT EXISTS ecocounter.open_data_15min_counts
(
site_id numeric,
site_description text COLLATE pg_catalog."default",
@@ -14,12 +14,12 @@ CREATE TABLE IF NOT EXISTS ecocounter.open_data_raw_counts

TABLESPACE pg_default;

ALTER TABLE IF EXISTS ecocounter.open_data_raw_counts OWNER TO ecocounter_admins;
ALTER TABLE IF EXISTS ecocounter.open_data_15min_counts OWNER TO ecocounter_admins;

REVOKE ALL ON TABLE ecocounter.open_data_raw_counts FROM bdit_humans;
GRANT SELECT ON TABLE ecocounter.open_data_raw_counts TO bdit_humans;
REVOKE ALL ON TABLE ecocounter.open_data_15min_counts FROM bdit_humans;
GRANT SELECT ON TABLE ecocounter.open_data_15min_counts TO bdit_humans;

GRANT SELECT, INSERT ON TABLE ecocounter.open_data_raw_counts TO ecocounter_bot;
GRANT SELECT, INSERT ON TABLE ecocounter.open_data_15min_counts TO ecocounter_bot;

COMMENT ON TABLE ecocounter.open_data_raw_counts IS
COMMENT ON TABLE ecocounter.open_data_15min_counts IS
'Disaggregate Ecocounter data by site and direction.';
2 changes: 1 addition & 1 deletion volumes/ecocounter/open_data/readme.md
@@ -28,7 +28,7 @@ ecocounter.open_data_locations
| dt | date | 06/26/1994 | Date of count. |
| daily_volume | integer | 939 | Count of users on date `dt`. |

## ecocounter.open_data_raw_counts
## ecocounter.open_data_15min_counts

| column_name | data_type | sample | explanation |
|------------------|-----------------------------|-----------------------------------------------|----------------------------------------------------------------------------------------------|
22 changes: 20 additions & 2 deletions volumes/ecocounter/readme.md
@@ -1,5 +1,3 @@
# Ecocounter <!-- omit in toc -->

<!-- TOC -->

- [Bicycle loop detectors](#bicycle-loop-detectors)
@@ -12,6 +10,7 @@
- [Historical data](#historical-data)
- [`ecocounter_pull` DAG](#ecocounter_pull-dag)
- [`ecocounter_check` DAG](#ecocounter_check-dag)
- [`ecocounter_check` DAG](#ecocounter_check-dag-1)
- [SQL Tables](#sql-tables)
- [Main Tables](#main-tables)
- [`ecocounter.sites_unfiltered`](#ecocountersites_unfiltered)
@@ -143,6 +142,25 @@ The `ecocounter_check` DAG runs daily at 4am following completion of `ecocounter
- `check_unvalidated_sites` runs a `SQLCheckOperatorWithReturnValue` to check for unvalidated sites or flows with non-zero volumes this week and send a slack notification with their details.
<!-- ecocounter_check_doc_md -->

<!-- ecocounter_open_data_doc_md -->

## `ecocounter_open_data` DAG
The `ecocounter_open_data` DAG runs monthly on the 1st of the month to insert and download open data extracts for the month.

- `check_data_availability`: A SQLCheckOperatorWithReturnValue to check that there is data for every day of the previous month before proceeding, and to report if not.
- `reminder_message`: A slack message to indicate data should be verified and any necessary anomalous ranges added to exclude irregular data.
- `wait_till_10th`: A DateTimeSensor which waits until the 10th day of the month to proceed with exporting data. Can also be marked as "Success" manually to proceed earlier.
- `get_years`: Identifies this month's and last month's years, in case additional data arrived for the previous month.
- `insert_and_download_data`: TaskGroup for tasks which update and download data. Dynamically mapped over the output of `get_years`.
  - `insert_daily_open_data`: Inserts daily data into the open data table.
  - `insert_15min_open_data`: Inserts 15min data into the open data table.
  - `download_daily_open_data`: Downloads daily data to the Open Data mounted drive on Morbius (the per-year export window is sketched below).
  - `download_15min_open_data`: Downloads 15min data to the Open Data mounted drive on Morbius.
- `download_locations_open_data`: Downloads the locations table to the Open Data mounted drive on Morbius.
- `status_message`: A slack message indicating successful pipeline completion.

<!-- ecocounter_open_data_doc_md -->
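
As a worked illustration of the year windows implied by the `download_*` queries in the DAG above (a sketch only; `export_window` is a hypothetical helper mirroring the SQL's `to_date`/`LEAST` bounds):

```python
from datetime import date

def export_window(yr: int, today: date) -> tuple[date, date]:
    """Half-open [start, end) date window exported for year `yr`."""
    start = date(yr, 1, 1)                      # to_date(yr::text, 'yyyy')
    month_start = today.replace(day=1)          # date_trunc('month', now())
    end = min(month_start, date(yr + 1, 1, 1))  # LEAST(...) caps at the current month
    return start, end

# On a run dated 2025-01-10, the two mapped years export:
print(export_window(2024, date(2025, 1, 10)))  # all of 2024
print(export_window(2025, date(2025, 1, 10)))  # empty so far: the in-progress month is excluded
```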

# SQL Tables

## Main Tables