Skip to content

Commit

Permalink
#1088 RODARs DAG
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielwol committed Nov 29, 2024
1 parent 3470497 commit 7f4754f
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 12 deletions.
60 changes: 60 additions & 0 deletions dags/rodars_pull.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import os
import sys
from functools import partial
from datetime import datetime, timedelta

from airflow.decorators import dag, task_group, task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.models import Variable

# Identifier of this DAG; also used as the lookup key for owners and readme docs below.
DAG_NAME = 'rodars_pull'
# Owners are read from the JSON Airflow Variable `dag_owners` (a mapping keyed by DAG name);
# falls back to ['Unknown'] when this DAG has no entry.
DAG_OWNERS = Variable.get('dag_owners', deserialize_json=True).get(DAG_NAME, ['Unknown'])

# Parent of the directory containing this file (symlinks resolved), i.e. two
# `dirname` steps up from the file's real path.
repo_path = os.path.abspath(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
# Must run before the project-local imports below so that the `events` and
# `dags` packages resolve from repo_path.
sys.path.insert(0, repo_path)

from events.construction.itsc_issues_functions import fetch_and_insert_data
from dags.dag_functions import task_fail_slack_alert, get_readme_docmd
from dags.custom_operators import SQLCheckOperatorWithReturnValue

# Readme that holds this DAG's documentation.
README_PATH = os.path.join(repo_path, 'events/construction/readme.md')
# Markdown shown as the DAG's docs in Airflow (passed as `doc_md` to the DAG).
# NOTE(review): presumably extracts the readme section delimited by
# `<!-- rodars_pull_doc_md -->` markers - confirm against get_readme_docmd.
DOC_MD = get_readme_docmd(README_PATH, DAG_NAME)

# Task-level defaults applied to every task in the DAG.
default_args = {
    'owner': ','.join(DAG_OWNERS),
    'depends_on_past': False,
    'start_date': datetime(2024, 11, 27),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True, #Allow for progressive longer waits between retries
    'on_failure_callback': partial(task_fail_slack_alert, use_proxy = True),
    # NOTE: `catchup` is a DAG-level setting, not a task argument. Placed in
    # default_args it is silently ignored, so it is passed to @dag below.
}

@dag(
    dag_id=DAG_NAME,
    default_args=default_args,
    catchup=True, #backfill every daily interval since start_date
    max_active_runs=1,
    template_searchpath=[
        os.path.join(repo_path,'events/construction/sql')
    ],
    doc_md=DOC_MD,
    tags=['rodars', 'pull', 'itsc_central'],
    schedule='0 4 * * *' #daily at 4am
)
def rodars_dag():
    # Daily DAG with a single task that copies RODARS issue data from ITSC into RDS.

    @task
    def pull_rodars(ds = None):
        """Get RODARS issue data from ITSC and insert it into RDS.

        Args:
            ds: Date stamp of the DAG run, injected by Airflow from the task
                context. It is forwarded as `start_date` to
                `fetch_and_insert_data`, which uses it to filter the select
                query.
        """
        itsc_bot = PostgresHook('itsc_postgres')
        vds_bot = PostgresHook('vds_bot')
        fetch_and_insert_data(select_conn=itsc_bot, insert_conn=vds_bot, start_date=ds)

    #TODO: add a delete task to remove outdated revisions?

    pull_rodars()

rodars_dag()
23 changes: 12 additions & 11 deletions events/construction/itsc_issues_functions.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import os
import logging
import pandas as pd
import struct
from numpy import nan
import struct
from io import BytesIO
from psycopg2 import sql, Error
from psycopg2.extras import execute_values
from io import BytesIO

from airflow.providers.postgres.hooks.postgres import PostgresHook

fpath = '/data/home/gwolofs/bdit_data-sources/events/rodars/rodars_functions.py'
SQL_DIR = os.path.join(os.path.abspath(os.path.dirname(fpath)), 'sql')
#SQL_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'sql')
#fpath = '/data/home/gwolofs/bdit_data-sources/events/rodars/itsc_issues_functions.py'
#SQL_DIR = os.path.join(os.path.abspath(os.path.dirname(fpath)), 'sql')
SQL_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'sql')

LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
Expand Down Expand Up @@ -43,15 +44,15 @@ def geometry_from_bytes(geo_bytes):

def fetch_and_insert_data(
select_conn = PostgresHook('itsc_postgres'),
insert_conn = PostgresHook('vds_bot')
insert_conn = PostgresHook('vds_bot'),
start_date = None
):
#generic function to pull and insert data using different connections and queries.
select_fpath = os.path.join(SQL_DIR, 'select-itsc_issues.sql')
with open(select_fpath, 'r', encoding="utf-8") as file:
select_query = sql.SQL(file.read())
#.format(
# start = sql.Literal(start_date + " 00:00:00 EST5EDT")
#)
select_query = sql.SQL(file.read()).format(
start = sql.Literal(start_date)
)
try:
with select_conn.get_conn() as con, con.cursor() as cur:
LOGGER.info(f"Fetching RODARS data.")
Expand Down Expand Up @@ -87,7 +88,7 @@ def fetch_and_insert_data(
with insert_conn.get_conn() as con, con.cursor() as cur:
execute_values(cur, insert_query, df_no_geom)

geom_update_fpath = os.path.join(SQL_DIR, 'update-rodars_geometry.sql')
geom_update_fpath = os.path.join(SQL_DIR, 'update-itsc_issues_geometry.sql')
with open(geom_update_fpath, 'r', encoding="utf-8") as file:
geom_update_query = sql.SQL(file.read())

Expand Down
9 changes: 9 additions & 0 deletions events/construction/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Introduction

## RODARS DAG

<!-- rodars_pull_doc_md -->

- `pull_rodars`: pulls RODARS issue data from ITSC and inserts into RDS.

<!-- rodars_pull_doc_md -->
18 changes: 18 additions & 0 deletions events/construction/sql/delete-outdated-itsc_issues.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
--WIP. May need to delete outdated versions of each issue.

WITH newest_timestamps AS (
SELECT DISTINCT ON (divisionid, issueid)
divisionid,
issueid,
timestamputc
FROM gwolofs.itsc_issues
ORDER BY
divisionid,
issueid,
timestamputc DESC
)

SELECT *

Check failure on line 15 in events/construction/sql/delete-outdated-itsc_issues.sql

View workflow job for this annotation

GitHub Actions / SQLFluff Lint

SQLFluff

RF02: Unqualified reference '*' found in select with more than one referenced table/view.
FROM gwolofs.itsc_issues
LEFT JOIN newest_timestamps USING (divisionid, issueid, timestamputc)
WHERE newest_timestamps.timestamputc IS NULL
4 changes: 3 additions & 1 deletion events/construction/sql/select-itsc_issues.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ WITH issues AS (
8014, --rodars (old)
8023 --TMMS TM3 Planned Work
)
ORDER BY divisionid, issueid, timestamputc DESC
AND timestamputc >= {start}::date -- noqa: PRS

Check failure on line 22 in events/construction/sql/select-itsc_issues.sql

View workflow job for this annotation

GitHub Actions / SQLFluff Lint

SQLFluff

LT02: Expected line break and indent of 4 spaces before '>'.
AND timestamputc < {start}::date + interval '1 day' -- noqa: PRS
ORDER BY divisionid ASC, issueid ASC, timestamputc DESC
)

SELECT
Expand Down

0 comments on commit 7f4754f

Please sign in to comment.