From 797f2e25d082417f785137d0629ac807a91d57eb Mon Sep 17 00:00:00 2001 From: matttriano Date: Sat, 9 Nov 2024 15:54:09 -0600 Subject: [PATCH 1/2] Implements db dumping task with compression, and a light verification task. --- .gitignore | 2 +- airflow/dags/maintenance/backup_dwh_db.py | 4 +- airflow/dags/tasks/maintenance/backup.py | 169 ++++++++++++++++++++++ 3 files changed, 173 insertions(+), 2 deletions(-) create mode 100644 airflow/dags/tasks/maintenance/backup.py diff --git a/.gitignore b/.gitignore index 479e224..f240f92 100644 --- a/.gitignore +++ b/.gitignore @@ -7,9 +7,9 @@ dev_utils/ airflow/dags/dev_experiments/ - notes/ sandbox/ +scraps/ **/logs airflow/data_raw airflow/backups diff --git a/airflow/dags/maintenance/backup_dwh_db.py b/airflow/dags/maintenance/backup_dwh_db.py index 40dc2b4..f21b51a 100644 --- a/airflow/dags/maintenance/backup_dwh_db.py +++ b/airflow/dags/maintenance/backup_dwh_db.py @@ -6,6 +6,7 @@ from airflow.decorators import dag, task from airflow.models.baseoperator import chain +from tasks.maintenance.backup import backup_and_lightly_verify_postgres_db task_logger = logging.getLogger("airflow.task") @@ -43,7 +44,8 @@ def dumpall_dwh_db(task_logger: Logger) -> None: tags=["utility", "maintenance"], ) def backup_dwh_db(): - dump_db_1 = dumpall_dwh_db(task_logger=task_logger) + # dump_db_1 = dumpall_dwh_db(task_logger=task_logger) + dump_db_1 = backup_and_lightly_verify_postgres_db(conn_id="dwh_db_conn") chain(dump_db_1) diff --git a/airflow/dags/tasks/maintenance/backup.py b/airflow/dags/tasks/maintenance/backup.py new file mode 100644 index 0000000..8992236 --- /dev/null +++ b/airflow/dags/tasks/maintenance/backup.py @@ -0,0 +1,169 @@ +import datetime as dt +import gzip +import logging +import os +from pathlib import Path +import shutil +import subprocess + +from airflow.decorators import task, task_group +from airflow.exceptions import AirflowException +from airflow.models.baseoperator import chain +from airflow.providers.postgres.hooks.postgres import PostgresHook + + +@task +def dump_postgres_db_v0(conn_id: str) -> Path: + logger = logging.getLogger(__name__) + hook = PostgresHook(postgres_conn_id=conn_id) + conn = hook.get_connection(conn_id) + service_name = conn.host + file_name = f"{service_name}_backup_{dt.datetime.now().strftime('%Y%m%d_%H%M%S')}" + backup_file_dir = Path("/opt/airflow/backups").joinpath(service_name) + backup_file_dir.mkdir(exist_ok=True) + backup_file_path = backup_file_dir.joinpath(f"{file_name}.dump") + gzipped_file_path = backup_file_dir.joinpath(f"{file_name}.gz") + logger.info(f"Preparing to backup the {service_name} db to {backup_file_path}.") + try: + result = subprocess.run( + [ + "pg_dumpall", + "-h", + service_name, + "-p", + str(conn.port), + "-U", + conn.login, + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env={"PGPASSWORD": conn.password}, + check=True, + ) + if result.stderr: + logger.warning(f"pg_dumpall warnings: {result.stderr}") + with open(backup_file_path, "wb") as f: + f.write(result.stdout) + except subprocess.CalledProcessError as e: + raise AirflowException(f"Database backup failed: {e.stderr.decode()}") + + logger.info("Compressing backup file...") + try: + with open(backup_file_path, "wb") as f_in: + with gzip.open(gzipped_file_path, "wb") as f_out: + shutil.copyfileobj(f_in, f_out) + except Exception as e: + raise AirflowException(f"Compression failed: {str(e)}") + finally: + if backup_file_path.exists(): + backup_file_path.unlink() + logger.info( + f"Finished backing up and compressing the {service_name} db to {gzipped_file_path}." + ) + return gzipped_file_path + + +@task +def dump_postgres_db(conn_id: str) -> Path: + logger = logging.getLogger(__name__) + hook = PostgresHook(postgres_conn_id=conn_id) + conn = hook.get_connection(conn_id) + service_name = conn.host + file_name = f"{service_name}_backup_{dt.datetime.now().strftime('%Y%m%d_%H%M%S')}" + backup_file_dir = Path("/opt/airflow/backups").joinpath(service_name) + backup_file_dir.mkdir(exist_ok=True) + backup_file_path = backup_file_dir.joinpath(f"{file_name}.dump") + gzipped_file_path = backup_file_dir.joinpath(f"{file_name}.gz") + logger.info(f"Preparing to backup the {service_name} db to {backup_file_path}.") + try: + with gzip.open(gzipped_file_path, "wb", compresslevel=9) as gz_file: + process = subprocess.Popen( + [ + "pg_dumpall", + "-h", + service_name, + "-p", + str(conn.port), + "-U", + conn.login, + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env={"PGPASSWORD": conn.password}, + ) + while True: + chunk = process.stdout.read(8192) + if not chunk: + break + gz_file.write(chunk) + stderr = process.stderr.read() + return_code = process.wait() + if return_code != 0: + raise AirflowException(f"Database backup failed: {stderr.decode()}") + if stderr: + logger.warning(f"pg_dumpall warnings: {stderr.decode()}") + except Exception as e: + if gzipped_file_path.exists(): + gzipped_file_path.unlink() + raise AirflowException(f"Backup failed: {str(e)}") + logger.info(f"Successfully backed up and compressed {service_name} db to {gzipped_file_path}") + return gzipped_file_path + + +@task +def lightly_verify_backup(backup_file_path: Path) -> bool: + """Verify the compressed backup file using lightweight heuristic checks""" + + logger = logging.getLogger(__name__) + backup_file_path = Path(backup_file_path) + try: + logger.info(f"Verifying backup file: {backup_file_path}") + try: + with gzip.open(backup_file_path, "rb") as f: + f.read(1024) + except gzip.BadGzipFile as e: + raise AirflowException(f"Corrupt gzip file: {str(e)}") + expected_patterns = [ + b"PostgreSQL database dump", + b"SET statement_timeout", + b"CREATE DATABASE", + b"COMMENT", + ] + pattern_matches = {pattern: False for pattern in expected_patterns} + with gzip.open(backup_file_path, "rb") as f: + chunk_size = 1024 * 1024 + while True: + chunk = f.read(chunk_size) + if not chunk: + break + for pattern in expected_patterns: + if pattern in chunk: + pattern_matches[pattern] = True + if all(pattern_matches.values()): + break + matches_found = sum(pattern_matches.values()) + if matches_found < 2: + raise AirflowException( + f"Backup file doesn't appear to be a valid PostgreSQL dump. " + f"Only found {matches_found} expected patterns." + ) + file_size = backup_file_path.stat().st_size + if file_size < 100: + raise AirflowException(f"Backup file suspiciously small: {file_size} bytes") + logger.info( + f"Backup verification successful: " + f"Found {matches_found} expected patterns, " + f"file size: {file_size/1024/1024:.2f}MB" + ) + return True + except Exception as e: + logger.error(f"Backup verification failed: {str(e)}") + raise AirflowException(f"Backup verification failed: {str(e)}") + + +@task_group +def backup_and_lightly_verify_postgres_db(conn_id: str) -> None: + dump_db_ = dump_postgres_db(conn_id=conn_id) + verify_db_ = lightly_verify_backup(dump_db_) + + chain(dump_db_, verify_db_) From fcca6261d9b42becab882fd8f9b511d2caaa9f0c Mon Sep 17 00:00:00 2001 From: matttriano Date: Tue, 12 Nov 2024 12:01:34 -0600 Subject: [PATCH 2/2] Addresses supression of info logs and cleans up backup code. --- airflow/dags/cc_utils/utils.py | 11 ++ airflow/dags/maintenance/backup_dwh_db.py | 34 +---- airflow/dags/tasks/maintenance/backup.py | 64 ++-------- airflow/dags/tasks/socrata_tasks.py | 146 ++++++++++++---------- 4 files changed, 104 insertions(+), 151 deletions(-) diff --git a/airflow/dags/cc_utils/utils.py b/airflow/dags/cc_utils/utils.py index b3f9125..bc44821 100644 --- a/airflow/dags/cc_utils/utils.py +++ b/airflow/dags/cc_utils/utils.py @@ -1,4 +1,6 @@ +from contextlib import contextmanager import datetime as dt +import logging from pathlib import Path import re import subprocess @@ -58,3 +60,12 @@ def produce_offset_and_nrows_counts_for_pd_read_csv(file_path: Path, rows_per_ba nrow_nums = [rows_per_batch for el in offsets] nrow_and_offset_nums = [{"offset": el[0], "nrows": el[1]} for el in zip(offsets, nrow_nums)] return nrow_and_offset_nums + + +def log_as_info(logger: logging.Logger, msg: str) -> None: + try: + original_level = logger.level + logger.setLevel(logging.INFO) + logger.info(msg) + finally: + logger.setLevel(original_level) diff --git a/airflow/dags/maintenance/backup_dwh_db.py b/airflow/dags/maintenance/backup_dwh_db.py index f21b51a..fb7172e 100644 --- a/airflow/dags/maintenance/backup_dwh_db.py +++ b/airflow/dags/maintenance/backup_dwh_db.py @@ -1,41 +1,10 @@ import datetime as dt -from logging import Logger -import logging -import subprocess -from airflow.decorators import dag, task +from airflow.decorators import dag from airflow.models.baseoperator import chain from tasks.maintenance.backup import backup_and_lightly_verify_postgres_db -task_logger = logging.getLogger("airflow.task") - - -@task -def dump_db(task_logger: Logger) -> None: - cmd = f"""PGPASSWORD=$POSTGRES_PASSWORD \ - pg_dump -h airflow_db -U $POSTGRES_USER $POSTGRES_DB > \ - /opt/airflow/backups/airflow_metadata_db_dump_`date +%d-%m-%Y"_"%H_%M_%S`.sql && \ - PGPASSWORD="" \ - """ - subproc_output = subprocess.run(cmd, shell=True, capture_output=True, text=True) - for el in subproc_output.stdout.split("\n"): - task_logger.info(f"{el}") - return "result" - - -@task -def dumpall_dwh_db(task_logger: Logger) -> None: - cmd = f"""PGPASSWORD=$DWH_POSTGRES_PASSWORD \ - pg_dumpall -U $DWH_POSTGRES_USER -h dwh_db -c | gzip > \ - /opt/airflow/backups/dwh_db_dump_`date +%d-%m-%Y"_"%H_%M_%S`.sql.gz && \ - PGPASSWORD="" \ - """ - subproc_output = subprocess.run(cmd, shell=True, capture_output=True, text=True) - for el in subproc_output.stdout.split("\n"): - task_logger.info(f"{el}") - return "result" - @dag( schedule=None, @@ -44,7 +13,6 @@ def dumpall_dwh_db(task_logger: Logger) -> None: tags=["utility", "maintenance"], ) def backup_dwh_db(): - # dump_db_1 = dumpall_dwh_db(task_logger=task_logger) dump_db_1 = backup_and_lightly_verify_postgres_db(conn_id="dwh_db_conn") chain(dump_db_1) diff --git a/airflow/dags/tasks/maintenance/backup.py b/airflow/dags/tasks/maintenance/backup.py index 8992236..8500141 100644 --- a/airflow/dags/tasks/maintenance/backup.py +++ b/airflow/dags/tasks/maintenance/backup.py @@ -11,56 +11,7 @@ from airflow.models.baseoperator import chain from airflow.providers.postgres.hooks.postgres import PostgresHook - -@task -def dump_postgres_db_v0(conn_id: str) -> Path: - logger = logging.getLogger(__name__) - hook = PostgresHook(postgres_conn_id=conn_id) - conn = hook.get_connection(conn_id) - service_name = conn.host - file_name = f"{service_name}_backup_{dt.datetime.now().strftime('%Y%m%d_%H%M%S')}" - backup_file_dir = Path("/opt/airflow/backups").joinpath(service_name) - backup_file_dir.mkdir(exist_ok=True) - backup_file_path = backup_file_dir.joinpath(f"{file_name}.dump") - gzipped_file_path = backup_file_dir.joinpath(f"{file_name}.gz") - logger.info(f"Preparing to backup the {service_name} db to {backup_file_path}.") - try: - result = subprocess.run( - [ - "pg_dumpall", - "-h", - service_name, - "-p", - str(conn.port), - "-U", - conn.login, - ], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - env={"PGPASSWORD": conn.password}, - check=True, - ) - if result.stderr: - logger.warning(f"pg_dumpall warnings: {result.stderr}") - with open(backup_file_path, "wb") as f: - f.write(result.stdout) - except subprocess.CalledProcessError as e: - raise AirflowException(f"Database backup failed: {e.stderr.decode()}") - - logger.info("Compressing backup file...") - try: - with open(backup_file_path, "wb") as f_in: - with gzip.open(gzipped_file_path, "wb") as f_out: - shutil.copyfileobj(f_in, f_out) - except Exception as e: - raise AirflowException(f"Compression failed: {str(e)}") - finally: - if backup_file_path.exists(): - backup_file_path.unlink() - logger.info( - f"Finished backing up and compressing the {service_name} db to {gzipped_file_path}." - ) - return gzipped_file_path +from cc_utils.utils import log_as_info @task @@ -74,7 +25,7 @@ def dump_postgres_db(conn_id: str) -> Path: backup_file_dir.mkdir(exist_ok=True) backup_file_path = backup_file_dir.joinpath(f"{file_name}.dump") gzipped_file_path = backup_file_dir.joinpath(f"{file_name}.gz") - logger.info(f"Preparing to backup the {service_name} db to {backup_file_path}.") + log_as_info(logger, f"Preparing to backup the {service_name} db to {backup_file_path}.") try: with gzip.open(gzipped_file_path, "wb", compresslevel=9) as gz_file: process = subprocess.Popen( @@ -106,7 +57,9 @@ def dump_postgres_db(conn_id: str) -> Path: if gzipped_file_path.exists(): gzipped_file_path.unlink() raise AirflowException(f"Backup failed: {str(e)}") - logger.info(f"Successfully backed up and compressed {service_name} db to {gzipped_file_path}") + log_as_info( + logger, f"Successfully backed up and compressed {service_name} db to {gzipped_file_path}" + ) return gzipped_file_path @@ -117,7 +70,7 @@ def lightly_verify_backup(backup_file_path: Path) -> bool: logger = logging.getLogger(__name__) backup_file_path = Path(backup_file_path) try: - logger.info(f"Verifying backup file: {backup_file_path}") + log_as_info(logger, f"Verifying backup file: {backup_file_path}") try: with gzip.open(backup_file_path, "rb") as f: f.read(1024) @@ -150,10 +103,11 @@ def lightly_verify_backup(backup_file_path: Path) -> bool: file_size = backup_file_path.stat().st_size if file_size < 100: raise AirflowException(f"Backup file suspiciously small: {file_size} bytes") - logger.info( + log_as_info( + logger, f"Backup verification successful: " f"Found {matches_found} expected patterns, " - f"file size: {file_size/1024/1024:.2f}MB" + f"file size: {file_size/1024/1024:.2f}MB", ) return True except Exception as e: diff --git a/airflow/dags/tasks/socrata_tasks.py b/airflow/dags/tasks/socrata_tasks.py index 810909f..ac80a9d 100644 --- a/airflow/dags/tasks/socrata_tasks.py +++ b/airflow/dags/tasks/socrata_tasks.py @@ -28,6 +28,7 @@ get_local_data_raw_dir, get_lines_in_geojson_file, produce_slice_indices_for_gpd_read_file, + log_as_info, ) from cc_utils.validation import ( run_checkpoint, @@ -56,7 +57,7 @@ def ingest_into_table( else: table_name = f"{socrata_metadata.table_name}" if_exists = "fail" - task_logger.info(f"Ingesting data to database table 'data_raw.{table_name}'") + log_as_info(task_logger, f"Ingesting data to database table 'data_raw.{table_name}'") source_data_updated = socrata_metadata.data_freshness_check["source_data_last_updated"] time_of_check = socrata_metadata.data_freshness_check["time_of_check"] engine = get_pg_engine(conn_id=conn_id) @@ -73,7 +74,7 @@ def ingest_into_table( if_exists=if_exists, chunksize=100000, ) - task_logger.info("Successfully ingested data using gpd.to_postgis()") + log_as_info(task_logger, "Successfully ingested data using gpd.to_postgis()") else: import pandas as pd @@ -87,7 +88,7 @@ def ingest_into_table( if_exists=if_exists, chunksize=100000, ) - task_logger.info("Successfully ingested data using pd.to_sql()") + log_as_info(task_logger, "Successfully ingested data using pd.to_sql()") @task @@ -95,9 +96,10 @@ def get_socrata_table_metadata( socrata_table: SocrataTable, task_logger: Logger ) -> SocrataTableMetadata: socrata_metadata = SocrataTableMetadata(socrata_table=socrata_table) - task_logger.info( + log_as_info( + task_logger, f"Retrieved metadata for socrata table {socrata_metadata.table_name} and table_id" - + f" {socrata_metadata.table_id}." + + f" {socrata_metadata.table_id}.", ) return socrata_metadata @@ -110,10 +112,11 @@ def extract_table_freshness_info( ) -> SocrataTableMetadata: engine = get_pg_engine(conn_id=conn_id) socrata_metadata.check_warehouse_data_freshness(engine=engine) - task_logger.info( + log_as_info( + task_logger, f"Extracted table freshness information. " + f"Fresh source data available: {socrata_metadata.data_freshness_check['updated_data_available']} " - + f"Fresh source metadata available: {socrata_metadata.data_freshness_check['updated_metadata_available']}" + + f"Fresh source metadata available: {socrata_metadata.data_freshness_check['updated_metadata_available']}", ) return socrata_metadata @@ -126,7 +129,8 @@ def ingest_table_freshness_check_metadata( ) -> None: engine = get_pg_engine(conn_id=conn_id) socrata_metadata.insert_current_freshness_check_to_db(engine=engine) - task_logger.info( + log_as_info( + task_logger, f"Ingested table freshness check results into metadata table. " + f"Freshness check id: {socrata_metadata.freshness_check_id}", ) @@ -156,11 +160,12 @@ def fresher_source_data_available( ) table_does_not_exist = socrata_metadata.table_name not in tables_in_data_raw_schema update_availble = socrata_metadata.data_freshness_check["updated_data_available"] - task_logger.info( - f"In fresher_source_data_available; table_does_not_exist: {table_does_not_exist}" + log_as_info( + task_logger, + f"In fresher_source_data_available; table_does_not_exist: {table_does_not_exist}", ) - task_logger.info(f" --- table_does_not_exist: {table_does_not_exist}") - task_logger.info(f" --- update_availble: {update_availble}") + log_as_info(task_logger, f" --- table_does_not_exist: {table_does_not_exist}") + log_as_info(task_logger, f" --- update_availble: {update_availble}") if table_does_not_exist or update_availble: return "update_socrata_table.download_fresh_data" else: @@ -174,9 +179,9 @@ def download_fresh_data(task_logger: Logger, **kwargs) -> SocrataTableMetadata: task_ids="update_socrata_table.check_table_metadata.ingest_table_freshness_check_metadata" ) output_file_path = get_local_file_path(socrata_metadata=socrata_metadata) - task_logger.info(f"Started downloading data at {dt.datetime.utcnow()} UTC") + log_as_info(task_logger, f"Started downloading data at {dt.datetime.utcnow()} UTC") urlretrieve(url=socrata_metadata.data_download_url, filename=output_file_path) - task_logger.info(f"Finished downloading data at {dt.datetime.utcnow()} UTC") + log_as_info(task_logger, f"Finished downloading data at {dt.datetime.utcnow()} UTC") return socrata_metadata @@ -198,7 +203,7 @@ def drop_temp_table( ti = kwargs["ti"] socrata_metadata = ti.xcom_pull(task_ids="update_socrata_table.download_fresh_data") - task_logger.info(f"inside drop_temp_table, from {route_str}") + log_as_info(task_logger, f"inside drop_temp_table, from {route_str}") engine = get_pg_engine(conn_id=conn_id) try: full_temp_table_name = f"data_raw.temp_{socrata_metadata.table_name}" @@ -217,9 +222,10 @@ def create_temp_data_raw_table(conn_id: str, task_logger: Logger, **kwargs) -> N socrata_metadata = ti.xcom_pull(task_ids="update_socrata_table.download_fresh_data") table_name = f"temp_{socrata_metadata.table_name}" local_file_path = get_local_file_path(socrata_metadata=socrata_metadata) - task_logger.info( + log_as_info( + task_logger, f"Attempting to create table 'data_raw.{table_name}, " - + f"dtypes inferred from file {local_file_path}." + + f"dtypes inferred from file {local_file_path}.", ) if local_file_path.is_file(): engine = get_pg_engine(conn_id=conn_id) @@ -243,7 +249,7 @@ def create_temp_data_raw_table(conn_id: str, task_logger: Logger, **kwargs) -> N ) table_create_obj = a_table._create_table_setup() table_create_obj.create(bind=engine) - task_logger.info(f"Successfully created table 'data_raw.{table_name}'") + log_as_info(task_logger, f"Successfully created table 'data_raw.{table_name}'") return socrata_metadata else: raise Exception(f"File not found in expected location.") @@ -256,7 +262,7 @@ def ingest_csv_data( try: full_temp_table_name = f"data_raw.temp_{socrata_metadata.table_name}" file_path = get_local_file_path(socrata_metadata=socrata_metadata) - task_logger.info(f"file_path: {file_path}, is_file: {file_path.is_file()}") + log_as_info(task_logger, f"file_path: {file_path}, is_file: {file_path.is_file()}") postgres_hook = PostgresHook(postgres_conn_id=conn_id) conn = postgres_hook.get_conn() @@ -292,10 +298,12 @@ def ingest_csv_data( """ ) conn.close() - task_logger.info(f"Successfully ingested csv data into {full_temp_table_name} via COPY.") + log_as_info( + task_logger, f"Successfully ingested csv data into {full_temp_table_name} via COPY." + ) return socrata_metadata except Exception as e: - task_logger.info(f"Failed to ingest flat file to temp table. Error: {e}, {type(e)}") + log_as_info(task_logger, f"Failed to ingest flat file to temp table. Error: {e}, {type(e)}") @task_group @@ -321,7 +329,7 @@ def get_geospatial_load_indices( ), "Geojson is the only supported geospatial type at the moment." n_rows = get_lines_in_geojson_file(file_path=file_path) indexes = produce_slice_indices_for_gpd_read_file(n_rows=n_rows, rows_per_batch=rows_per_batch) - task_logger.info(f"slices spanning data: {indexes}") + log_as_info(task_logger, f"slices spanning data: {indexes}") return indexes @@ -338,7 +346,7 @@ def ingest_geojson_data( engine = get_pg_engine(conn_id=conn_id) temp_table_name = f"temp_{socrata_metadata.table_name}" file_path = get_local_file_path(socrata_metadata=socrata_metadata) - task_logger.info(f"file_path: {file_path}, is_file: {file_path.is_file()}") + log_as_info(task_logger, f"file_path: {file_path}, is_file: {file_path.is_file()}") source_data_updated = socrata_metadata.data_freshness_check["source_data_last_updated"] time_of_check = socrata_metadata.data_freshness_check["time_of_check"] @@ -349,7 +357,7 @@ def ingest_geojson_data( gdf = impute_empty_geometries_into_missing_geometries(gdf=gdf, logger=task_logger) gdf["source_data_updated"] = source_data_updated gdf["ingestion_check_time"] = time_of_check - task_logger.info(f"Shape of gdf: {gdf.shape}, columns: {gdf.columns}") + log_as_info(task_logger, f"Shape of gdf: {gdf.shape}, columns: {gdf.columns}") gdf.to_postgis( name=temp_table_name, @@ -358,8 +366,9 @@ def ingest_geojson_data( if_exists="append", index=False, ) - task_logger.info( - f"Successfully ingested records {start_index} to {end_index} using gpd.to_postgis()" + log_as_info( + task_logger, + f"Successfully ingested records {start_index} to {end_index} using gpd.to_postgis()", ) except Exception as e: task_logger.error( @@ -391,7 +400,7 @@ def load_data_tg( conn_id: str, task_logger: Logger, ) -> SocrataTableMetadata: - task_logger.info(f"Entered load_data_tg task_group") + log_as_info(task_logger, f"Entered load_data_tg task_group") file_ext_route_1 = file_ext_branch_router(socrata_metadata=socrata_metadata) geojson_route_1 = load_geojson_data( @@ -427,10 +436,12 @@ def socrata_table_checkpoint_exists(task_logger: Logger, **kwargs) -> str: socrata_metadata = ti.xcom_pull(task_ids="update_socrata_table.download_fresh_data") checkpoint_name = f"data_raw.temp_{socrata_metadata.socrata_table.table_name}" if check_if_checkpoint_exists(checkpoint_name=checkpoint_name, task_logger=task_logger): - task_logger.info(f"GE checkpoint for {checkpoint_name} exists") + log_as_info(task_logger, f"GE checkpoint for {checkpoint_name} exists") return "update_socrata_table.raw_data_validation_tg.run_socrata_checkpoint" else: - task_logger.info(f"GE checkpoint for {checkpoint_name} doesn't exist yet. Make it maybe?") + log_as_info( + task_logger, f"GE checkpoint for {checkpoint_name} doesn't exist yet. Make it maybe?" + ) return "update_socrata_table.raw_data_validation_tg.validation_endpoint" @@ -442,11 +453,12 @@ def run_socrata_checkpoint(task_logger: Logger, **kwargs) -> SocrataTableMetadat checkpoint_run_results = run_checkpoint( checkpoint_name=checkpoint_name, task_logger=task_logger ) - task_logger.info( - f"list_validation_results: {checkpoint_run_results.list_validation_results()}" + log_as_info( + task_logger, + f"list_validation_results: {checkpoint_run_results.list_validation_results()}", ) - task_logger.info(f"validation success: {checkpoint_run_results.success}") - task_logger.info(f"dir(checkpoint_run_results): {dir(checkpoint_run_results)}") + log_as_info(task_logger, f"validation success: {checkpoint_run_results.success}") + log_as_info(task_logger, f"dir(checkpoint_run_results): {dir(checkpoint_run_results)}") return socrata_metadata @@ -462,7 +474,7 @@ def raw_data_validation_tg( datasource_name: str, task_logger: Logger, ) -> SocrataTableMetadata: - task_logger.info(f"Entered raw_data_validation_tg task_group") + log_as_info(task_logger, f"Entered raw_data_validation_tg task_group") register_temp_table_1 = register_temp_table_asset( datasource_name=datasource_name, task_logger=task_logger @@ -488,12 +500,12 @@ def table_exists_in_data_raw(conn_id: str, task_logger: Logger, **kwargs) -> str tables_in_data_raw_schema = get_data_table_names_in_schema( engine=get_pg_engine(conn_id=conn_id), schema_name="data_raw" ) - task_logger.info(f"tables_in_data_raw_schema: {tables_in_data_raw_schema}") + log_as_info(task_logger, f"tables_in_data_raw_schema: {tables_in_data_raw_schema}") if socrata_metadata.table_name not in tables_in_data_raw_schema: - task_logger.info(f"Table {socrata_metadata.table_name} not in data_raw; creating.") + log_as_info(task_logger, f"Table {socrata_metadata.table_name} not in data_raw; creating.") return "update_socrata_table.persist_new_raw_data_tg.create_table_in_data_raw" else: - task_logger.info(f"Table {socrata_metadata.table_name} in data_raw; skipping.") + log_as_info(task_logger, f"Table {socrata_metadata.table_name} in data_raw; skipping.") return "update_socrata_table.persist_new_raw_data_tg.dbt_data_raw_model_exists" @@ -505,7 +517,7 @@ def create_table_in_data_raw(conn_id: str, task_logger: Logger, **kwargs) -> Soc ) try: table_name = socrata_metadata.table_name - task_logger.info(f"Creating table data_raw.{table_name}") + log_as_info(task_logger, f"Creating table data_raw.{table_name}") postgres_hook = PostgresHook(postgres_conn_id=conn_id) conn = postgres_hook.get_conn() cur = conn.cursor() @@ -527,8 +539,8 @@ def dbt_data_raw_model_exists(task_logger: Logger, **kwargs) -> str: task_ids="update_socrata_table.raw_data_validation_tg.validation_endpoint" ) dbt_data_raw_model_dir = Path(f"/opt/airflow/dbt/models/data_raw") - task_logger.info(f"dbt data_raw model dir ('{dbt_data_raw_model_dir}')") - task_logger.info(f"Dir exists? {dbt_data_raw_model_dir.is_dir()}") + log_as_info(task_logger, f"dbt data_raw model dir ('{dbt_data_raw_model_dir}')") + log_as_info(task_logger, f"Dir exists? {dbt_data_raw_model_dir.is_dir()}") table_model_path = dbt_data_raw_model_dir.joinpath(f"{socrata_metadata.table_name}.sql") if table_model_path.is_file(): return "update_socrata_table.persist_new_raw_data_tg.update_data_raw_table" @@ -546,7 +558,7 @@ def make_dbt_data_raw_model(conn_id: str, task_logger: Logger, **kwargs) -> Socr make_dbt_data_raw_model_file( table_name=socrata_metadata.table_name, engine=get_pg_engine(conn_id=conn_id) ) - task_logger.info(f"Leaving make_dbt_data_raw_model") + log_as_info(task_logger, f"Leaving make_dbt_data_raw_model") return socrata_metadata @@ -560,10 +572,10 @@ def update_data_raw_table(task_logger: Logger, **kwargs) -> SocrataTableMetadata dbt --warn-error-options \ '{{"include": "all", "exclude": [UnusedResourceConfigPath]}}' \ run --select re_dbt.data_raw.{socrata_metadata.table_name}""" - task_logger.info(f"dbt run command: {dbt_cmd}") + log_as_info(task_logger, f"dbt run command: {dbt_cmd}") subproc_output = subprocess.run(dbt_cmd, shell=True, capture_output=True, text=True) for el in subproc_output.stdout.split("\n"): - task_logger.info(f"{el}") + log_as_info(task_logger, f"{el}") return socrata_metadata @@ -589,8 +601,10 @@ def update_result_of_check_in_metadata_table( socrata_metadata = ti.xcom_pull( task_ids="update_socrata_table.check_table_metadata.ingest_table_freshness_check_metadata" ) - task_logger.info(f"Updating table_metadata record id #{socrata_metadata.freshness_check_id}.") - task_logger.info(f"Data_pulled_this_check: {data_updated}.") + log_as_info( + task_logger, f"Updating table_metadata record id #{socrata_metadata.freshness_check_id}." + ) + log_as_info(task_logger, f"Data_pulled_this_check: {data_updated}.") socrata_metadata.update_current_freshness_check_in_db( engine=get_pg_engine(conn_id=conn_id), update_payload={"data_pulled_this_check": data_updated}, @@ -600,7 +614,7 @@ def update_result_of_check_in_metadata_table( @task_group def persist_new_raw_data_tg(conn_id: str, datasource_name: str, task_logger: Logger) -> None: - task_logger.info(f"Entered persist_new_raw_data_tg task_group") + log_as_info(task_logger, f"Entered persist_new_raw_data_tg task_group") table_exists_1 = table_exists_in_data_raw(conn_id=conn_id, task_logger=task_logger) create_data_raw_table_1 = create_table_in_data_raw(conn_id=conn_id, task_logger=task_logger) dbt_data_raw_model_exists_1 = dbt_data_raw_model_exists( @@ -646,26 +660,32 @@ def dbt_standardized_model_ready(task_logger: Logger, **kwargs) -> str: "REPLACE_WITH_COMPOSITE_KEY_COLUMNS" in file_line or "REPLACE_WITH_BETTER_id" in file_line ): - task_logger.info( + log_as_info( + task_logger, f"Found unfinished stub for dbt _standardized model in {host_file_path}." - + " Please update that model before proceeding to feature engineering." + + " Please update that model before proceeding to feature engineering.", ) return "update_socrata_table.transform_data_tg.highlight_unfinished_dbt_standardized_stub" - task_logger.info(f"Found a _standardized stage dbt model that looks finished; Proceeding") + log_as_info( + task_logger, f"Found a _standardized stage dbt model that looks finished; Proceeding" + ) return "update_socrata_table.transform_data_tg.dbt_clean_model_ready" else: - task_logger.info(f"No _standardized stage dbt model found.") - task_logger.info(f"Creating a stub in loc: {host_file_path}") - task_logger.info(f"Edit the stub before proceeding to generate _clean stage dbt models.") + log_as_info(task_logger, f"No _standardized stage dbt model found.") + log_as_info(task_logger, f"Creating a stub in loc: {host_file_path}") + log_as_info( + task_logger, f"Edit the stub before proceeding to generate _clean stage dbt models." + ) return "update_socrata_table.transform_data_tg.make_dbt_standardized_model" @task def highlight_unfinished_dbt_standardized_stub(task_logger: Logger) -> str: - task_logger.info( + log_as_info( + task_logger, f"Hey! Go finish the dbt _standardized model file indicated in the logs for the " + "dbt_standardized_model_ready task! It still contains at least one placeholder value" - + "(REPLACE_WITH_COMPOSITE_KEY_COLUMNS or REPLACE_WITH_BETTER_id)." + + "(REPLACE_WITH_COMPOSITE_KEY_COLUMNS or REPLACE_WITH_BETTER_id).", ) return "Please and thank you!" @@ -682,11 +702,11 @@ def make_dbt_standardized_model(conn_id: str, task_logger: Logger, **kwargs) -> f"/opt/airflow/dbt/models/standardized/{socrata_metadata.table_name}_standardized.sql" ) write_lines_to_file(file_lines=std_file_lines, file_path=file_path) - task_logger.info(f"file_lines for table {socrata_metadata.table_name}") + log_as_info(task_logger, f"file_lines for table {socrata_metadata.table_name}") for file_line in std_file_lines: - task_logger.info(f" {file_line}") + log_as_info(task_logger, f" {file_line}") - task_logger.info(f"Leaving make_dbt_standardized_model") + log_as_info(task_logger, f"Leaving make_dbt_standardized_model") @task.branch(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) @@ -701,7 +721,7 @@ def dbt_clean_model_ready(task_logger: Logger, **kwargs) -> str: f"{socrata_metadata.table_name}_clean.sql", ) if file_path.is_file(): - task_logger.info(f"Found a _clean stage dbt model that looks finished; Ending") + log_as_info(task_logger, f"Found a _clean stage dbt model that looks finished; Ending") return "update_socrata_table.transform_data_tg.run_dbt_models__standardized_onward" else: return "update_socrata_table.transform_data_tg.dbt_make_clean_model" @@ -719,7 +739,7 @@ def dbt_make_clean_model(task_logger: Logger, **kwargs) -> SocrataTableMetadata: f"{socrata_metadata.table_name}_clean.sql", ) clean_file_lines = format_dbt_stub_for_clean_stage(table_name=socrata_metadata.table_name) - task_logger.info(f"clean_file_lines: {clean_file_lines}") + log_as_info(task_logger, f"clean_file_lines: {clean_file_lines}") write_lines_to_file(file_lines=clean_file_lines, file_path=clean_file_path) return socrata_metadata @@ -732,16 +752,16 @@ def run_dbt_models__standardized_onward(task_logger: Logger, **kwargs) -> Socrat dbt --warn-error-options \ '{{"include": "all", "exclude": [UnusedResourceConfigPath]}}' \ run --select re_dbt.standardized.{socrata_metadata.table_name}_standardized+""" - task_logger.info(f"dbt run command: {dbt_cmd}") + log_as_info(task_logger, f"dbt run command: {dbt_cmd}") subproc_output = subprocess.run(dbt_cmd, shell=True, capture_output=True, text=True) for el in subproc_output.stdout.split("\n"): - task_logger.info(f"{el}") + log_as_info(task_logger, f"{el}") return socrata_metadata @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) def endpoint(task_logger: Logger) -> None: - task_logger.info("Ending run") + log_as_info(task_logger, "Ending run") return "end" @@ -787,7 +807,7 @@ def update_socrata_table( datasource_name: str, task_logger: Logger, ) -> SocrataTableMetadata: - task_logger.info(f"Updating Socrata table {socrata_table.table_name}") + log_as_info(task_logger, f"Updating Socrata table {socrata_table.table_name}") metadata_1 = check_table_metadata( socrata_table=socrata_table, conn_id=conn_id, task_logger=task_logger