Iss 203 compress backups #206

Merged 2 commits on Nov 12, 2024
2 changes: 1 addition & 1 deletion .gitignore
@@ -7,9 +7,9 @@
dev_utils/
airflow/dags/dev_experiments/


notes/
sandbox/
scraps/
**/logs
airflow/data_raw
airflow/backups
11 changes: 11 additions & 0 deletions airflow/dags/cc_utils/utils.py
@@ -1,4 +1,6 @@
from contextlib import contextmanager
import datetime as dt
import logging
from pathlib import Path
import re
import subprocess
@@ -58,3 +60,12 @@ def produce_offset_and_nrows_counts_for_pd_read_csv(file_path: Path, rows_per_ba
nrow_nums = [rows_per_batch for el in offsets]
nrow_and_offset_nums = [{"offset": el[0], "nrows": el[1]} for el in zip(offsets, nrow_nums)]
return nrow_and_offset_nums


def log_as_info(logger: logging.Logger, msg: str) -> None:
    original_level = logger.level
    try:
        logger.setLevel(logging.INFO)
        logger.info(msg)
    finally:
        logger.setLevel(original_level)
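
For context, a minimal sketch of how a DAG task might call the new helper (the logger name "airflow.task" matches the DAG modules in this repo; the message and call site are illustrative, not part of this PR):

import logging

from cc_utils.utils import log_as_info

task_logger = logging.getLogger("airflow.task")

# Temporarily raise the logger to INFO so the message is emitted, then restore its level.
log_as_info(task_logger, "Starting the dwh_db backup task.")
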
36 changes: 3 additions & 33 deletions airflow/dags/maintenance/backup_dwh_db.py
@@ -1,39 +1,9 @@
import datetime as dt
from logging import Logger
import logging
import subprocess

from airflow.decorators import dag, task
from airflow.decorators import dag
from airflow.models.baseoperator import chain


task_logger = logging.getLogger("airflow.task")


@task
def dump_db(task_logger: Logger) -> None:
cmd = f"""PGPASSWORD=$POSTGRES_PASSWORD \
pg_dump -h airflow_db -U $POSTGRES_USER $POSTGRES_DB > \
/opt/airflow/backups/airflow_metadata_db_dump_`date +%d-%m-%Y"_"%H_%M_%S`.sql && \
PGPASSWORD="" \
"""
subproc_output = subprocess.run(cmd, shell=True, capture_output=True, text=True)
for el in subproc_output.stdout.split("\n"):
task_logger.info(f"{el}")
return "result"


@task
def dumpall_dwh_db(task_logger: Logger) -> None:
cmd = f"""PGPASSWORD=$DWH_POSTGRES_PASSWORD \
pg_dumpall -U $DWH_POSTGRES_USER -h dwh_db -c | gzip > \
/opt/airflow/backups/dwh_db_dump_`date +%d-%m-%Y"_"%H_%M_%S`.sql.gz && \
PGPASSWORD="" \
"""
subproc_output = subprocess.run(cmd, shell=True, capture_output=True, text=True)
for el in subproc_output.stdout.split("\n"):
task_logger.info(f"{el}")
return "result"
from tasks.maintenance.backup import backup_and_lightly_verify_postgres_db


@dag(
@@ -43,7 +13,7 @@ def dumpall_dwh_db(task_logger: Logger) -> None:
tags=["utility", "maintenance"],
)
def backup_dwh_db():
dump_db_1 = dumpall_dwh_db(task_logger=task_logger)
dump_db_1 = backup_and_lightly_verify_postgres_db(conn_id="dwh_db_conn")
chain(dump_db_1)


123 changes: 123 additions & 0 deletions airflow/dags/tasks/maintenance/backup.py
@@ -0,0 +1,123 @@
import datetime as dt
import gzip
import logging
import os
from pathlib import Path
import shutil
import subprocess

from airflow.decorators import task, task_group
from airflow.exceptions import AirflowException
from airflow.models.baseoperator import chain
from airflow.providers.postgres.hooks.postgres import PostgresHook

from cc_utils.utils import log_as_info


@task
def dump_postgres_db(conn_id: str) -> Path:
logger = logging.getLogger(__name__)
hook = PostgresHook(postgres_conn_id=conn_id)
conn = hook.get_connection(conn_id)
service_name = conn.host
file_name = f"{service_name}_backup_{dt.datetime.now().strftime('%Y%m%d_%H%M%S')}"
backup_file_dir = Path("/opt/airflow/backups").joinpath(service_name)
backup_file_dir.mkdir(exist_ok=True)
backup_file_path = backup_file_dir.joinpath(f"{file_name}.dump")
gzipped_file_path = backup_file_dir.joinpath(f"{file_name}.gz")
log_as_info(logger, f"Preparing to backup the {service_name} db to {backup_file_path}.")
try:
with gzip.open(gzipped_file_path, "wb", compresslevel=9) as gz_file:
process = subprocess.Popen(
[
"pg_dumpall",
"-h",
service_name,
"-p",
str(conn.port),
"-U",
conn.login,
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env={"PGPASSWORD": conn.password},
)
while True:
chunk = process.stdout.read(8192)
if not chunk:
break
gz_file.write(chunk)
stderr = process.stderr.read()
return_code = process.wait()
if return_code != 0:
raise AirflowException(f"Database backup failed: {stderr.decode()}")
if stderr:
logger.warning(f"pg_dumpall warnings: {stderr.decode()}")
except Exception as e:
if gzipped_file_path.exists():
gzipped_file_path.unlink()
raise AirflowException(f"Backup failed: {str(e)}")
log_as_info(
logger, f"Successfully backed up and compressed {service_name} db to {gzipped_file_path}"
)
return gzipped_file_path


@task
def lightly_verify_backup(backup_file_path: Path) -> bool:
"""Verify the compressed backup file using lightweight heuristic checks"""

logger = logging.getLogger(__name__)
backup_file_path = Path(backup_file_path)
try:
log_as_info(logger, f"Verifying backup file: {backup_file_path}")
try:
with gzip.open(backup_file_path, "rb") as f:
f.read(1024)
except gzip.BadGzipFile as e:
raise AirflowException(f"Corrupt gzip file: {str(e)}")
expected_patterns = [
b"PostgreSQL database dump",
b"SET statement_timeout",
b"CREATE DATABASE",
b"COMMENT",
]
pattern_matches = {pattern: False for pattern in expected_patterns}
with gzip.open(backup_file_path, "rb") as f:
chunk_size = 1024 * 1024
while True:
chunk = f.read(chunk_size)
if not chunk:
break
for pattern in expected_patterns:
if pattern in chunk:
pattern_matches[pattern] = True
if all(pattern_matches.values()):
break
matches_found = sum(pattern_matches.values())
if matches_found < 2:
raise AirflowException(
f"Backup file doesn't appear to be a valid PostgreSQL dump. "
f"Only found {matches_found} expected patterns."
)
file_size = backup_file_path.stat().st_size
if file_size < 100:
raise AirflowException(f"Backup file suspiciously small: {file_size} bytes")
log_as_info(
logger,
f"Backup verification successful: "
f"Found {matches_found} expected patterns, "
f"file size: {file_size/1024/1024:.2f}MB",
)
return True
except Exception as e:
logger.error(f"Backup verification failed: {str(e)}")
raise AirflowException(f"Backup verification failed: {str(e)}")


@task_group
def backup_and_lightly_verify_postgres_db(conn_id: str) -> None:
dump_db_ = dump_postgres_db(conn_id=conn_id)
verify_db_ = lightly_verify_backup(dump_db_)

chain(dump_db_, verify_db_)
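
A minimal sketch of how another maintenance DAG could reuse this task group (the DAG id, schedule, start date, and the "airflow_db_conn" connection id are illustrative assumptions, not part of this PR; the import mirrors the one in backup_dwh_db.py above):

import datetime as dt

from airflow.decorators import dag
from airflow.models.baseoperator import chain

from tasks.maintenance.backup import backup_and_lightly_verify_postgres_db


@dag(
    schedule="0 3 * * *",  # illustrative schedule, not taken from this PR
    start_date=dt.datetime(2024, 11, 1),  # illustrative start date
    catchup=False,
    tags=["utility", "maintenance"],
)
def backup_airflow_metadata_db():
    # "airflow_db_conn" is a hypothetical connection id; point it at whichever
    # Postgres database should be dumped, gzipped, and lightly verified.
    backup_ = backup_and_lightly_verify_postgres_db(conn_id="airflow_db_conn")
    chain(backup_)


backup_airflow_metadata_db()
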