From 7ca377e1a0a0647644e2f0952f20f2cc97060811 Mon Sep 17 00:00:00 2001
From: Gabe Wolofsky <80077912+gabrielwol@users.noreply.github.com>
Date: Fri, 1 Mar 2024 21:22:54 +0000
Subject: [PATCH 1/7] #894 add send_slack_msg adapted from vfh_data_mgmt

---
 dags/dag_functions.py | 51 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 51 insertions(+)

diff --git a/dags/dag_functions.py b/dags/dag_functions.py
index 54390833f..34d81265d 100644
--- a/dags/dag_functions.py
+++ b/dags/dag_functions.py
@@ -152,3 +152,54 @@ def get_readme_docmd(readme_path, dag_name):
     doc_md_key = ''
     doc_md_regex = '(?<=' + doc_md_key + '\n)[\s\S]+(?=\n' + doc_md_key + ')'
     return re.findall(doc_md_regex, contents)[0]
+
+def send_slack_msg(
+    context: dict,
+    msg: str,
+    attachments: Optional[list] = None,
+    blocks: Optional[list] = None,
+    use_proxy: Optional[bool] = False,
+    dev_mode: Optional[bool] = None
+) -> Any:
+    """Sends a message to Slack.
+
+    The ID of the Airflow connection with the details of the Slack channel
+    is resolved internally from ``dev_mode``, not passed as an argument.
+
+    Args:
+        context: The calling Airflow task's context.
+        msg: A string message to be sent to Slack.
+        attachments: List of dictionaries representing Slack attachments.
+        blocks: List of dictionaries representing Slack blocks.
+        use_proxy: A boolean to indicate whether to use a proxy. Proxy usage
+            is required to make the Slack webhook call from on-premises
+            servers (default False).
+        dev_mode: A boolean to indicate if working in development mode to send
+            Slack alerts to data_pipeline_dev instead of the regular
+            data_pipeline (default None, to be determined based on the
+            location of the file).
+    """
+    if dev_mode or (dev_mode is None and not is_prod_mode()):
+        SLACK_CONN_ID = "slack_data_pipeline_dev"
+    else:
+        SLACK_CONN_ID = "slack_data_pipeline"
+
+    if use_proxy:
+        # get the proxy credentials from the Airflow connection ``slack``; it
+        # contains the username and password to set the proxy:
+        proxy = (
+            f"http://{BaseHook.get_connection('slack').password}"
+            f"@{json.loads(BaseHook.get_connection('slack').extra)['url']}"
+        )
+    else:
+        proxy = None
+
+    slack_alert = SlackWebhookOperator(
+        task_id="slack_test",
+        slack_webhook_conn_id=SLACK_CONN_ID,
+        message=msg,
+        username="airflow",
+        attachments=attachments,
+        blocks=blocks,
+        proxy=proxy,
+    )
+    return slack_alert.execute(context=context)
\ No newline at end of file
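
Note: a minimal usage sketch of the new helper, not part of the patch. The import path matches where the patch adds send_slack_msg; the task name and message are hypothetical. As in status_message later in this series, a **context parameter on a @task-decorated function receives the Airflow context:

    from airflow.decorators import task

    from dags.dag_functions import send_slack_msg

    @task
    def notify_done(**context):
        # send_slack_msg resolves the prod vs. dev Slack connection itself
        # (via is_prod_mode()); use_proxy=True is only needed when calling
        # the webhook from on-premises servers, per the docstring above.
        send_slack_msg(
            context=context,
            msg="Copy finished :white_check_mark:",
            use_proxy=True,
        )
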
It + # contains username and password to set the proxy : + proxy=( + f"http://{BaseHook.get_connection('slack').password}" + f"@{json.loads(BaseHook.get_connection('slack').extra)['url']}" + ) + else: + proxy = None + + slack_alert = SlackWebhookOperator( + task_id="slack_test", + slack_webhook_conn_id=SLACK_CONN_ID, + message=msg, + username="airflow", + attachments=attachments, + blocks=blocks, + proxy=proxy, + ) + return slack_alert.execute(context=context) \ No newline at end of file From 0654207a978ba0295033669b225774d03702a5f1 Mon Sep 17 00:00:00 2001 From: Gabe Wolofsky <80077912+gabrielwol@users.noreply.github.com> Date: Fri, 1 Mar 2024 21:36:29 +0000 Subject: [PATCH 2/7] #894 add status_message task, turn off on_failure_callback in copy_tables --- dags/counts_replicator.py | 33 ++++++++++++++++++++++++++++----- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/dags/counts_replicator.py b/dags/counts_replicator.py index 4196cf443..9fd5ceb52 100644 --- a/dags/counts_replicator.py +++ b/dags/counts_replicator.py @@ -14,14 +14,14 @@ from functools import partial import pendulum # pylint: disable=import-error -from airflow.decorators import dag +from airflow.decorators import dag, task from airflow.models import Variable # import custom operators and helper functions repo_path = os.path.abspath(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))) sys.path.insert(0, repo_path) # pylint: disable=wrong-import-position -from dags.dag_functions import task_fail_slack_alert +from dags.dag_functions import task_fail_slack_alert, send_slack_msg # pylint: enable=import-error DAG_NAME = "counts_replicator" @@ -56,9 +56,32 @@ def counts_replicator(): # Returns a list of source and destination tables stored in the given # Airflow variable. 
From f18900b7f95b5e0e4f34f2d70b36968b0767d9f5 Mon Sep 17 00:00:00 2001
From: gabrielwol <80077912+gabrielwol@users.noreply.github.com>
Date: Wed, 13 Mar 2024 15:50:39 +0000
Subject: [PATCH 3/7] #894 change to only push xcom w/psycopg2 error

---
 dags/common_tasks.py | 53 ++++++++++++++++++++++++--------------------
 1 file changed, 29 insertions(+), 24 deletions(-)

diff --git a/dags/common_tasks.py b/dags/common_tasks.py
index b85768191..fc45448f7 100644
--- a/dags/common_tasks.py
+++ b/dags/common_tasks.py
@@ -1,5 +1,5 @@
-from psycopg2 import sql
+from psycopg2 import sql, Error
 from typing import Tuple
 import logging
 # pylint: disable=import-error
@@ -47,11 +47,6 @@ def copy_table(conn_id:str, table:Tuple[str, str], **context) -> None:
     ``schema.table``, and the destination table in the same format
     ``schema.table``.
     """
-    # push an extra failure message to be sent to Slack in case of failing
-    context["task_instance"].xcom_push(
-        "extra_msg",
-        f"Failed to copy `{table[0]}` to `{table[1]}`."
-    )
     # separate tables and schemas
     try:
         src_schema, src_table = table[0].split(".")
@@ -94,25 +89,35 @@ def copy_table(conn_id:str, table:Tuple[str, str], **context) -> None:
         sql.Identifier(src_schema), sql.Identifier(src_table),
         sql.Identifier(dst_schema), sql.Identifier(dst_table),
     )
-    with con, con.cursor() as cur:
-        # truncate the destination table
-        cur.execute(truncate_query)
-        # get the column names of the source table
-        cur.execute(source_columns_query, [src_schema, src_table])
-        src_columns = [r[0] for r in cur.fetchall()]
-        # copy all the data
-        insert_query = sql.SQL(
-            "INSERT INTO {}.{} ({}) SELECT {} FROM {}.{}"
-        ).format(
-            sql.Identifier(dst_schema), sql.Identifier(dst_table),
-            sql.SQL(', ').join(map(sql.Identifier, src_columns)),
-            sql.SQL(', ').join(map(sql.Identifier, src_columns)),
-            sql.Identifier(src_schema), sql.Identifier(src_table)
-        )
-        cur.execute(insert_query)
-        # copy the table's comment
-        cur.execute(comment_query)
+    try:
+        with con, con.cursor() as cur:
+            # truncate the destination table
+            cur.execute(truncate_query)
+            # get the column names of the source table
+            cur.execute(source_columns_query, [src_schema, src_table])
+            src_columns = [r[0] for r in cur.fetchall()]
+            # copy all the data
+            insert_query = sql.SQL(
+                "INSERT INTO {}.{} ({}) SELECT {} FROM {}.{}"
+            ).format(
+                sql.Identifier(dst_schema), sql.Identifier(dst_table),
+                sql.SQL(', ').join(map(sql.Identifier, src_columns)),
+                sql.SQL(', ').join(map(sql.Identifier, src_columns)),
+                sql.Identifier(src_schema), sql.Identifier(src_table)
+            )
+            cur.execute(insert_query)
+            # copy the table's comment
+            cur.execute(comment_query)
+    #catch psycopg2 errors:
+    except Error as e:
+        # push an extra failure message to be sent to Slack in case of failure
+        context["task_instance"].xcom_push(
+            key="extra_msg",
+            value=f"Failed to copy `{table[0]}` to `{table[1]}`: `{str(e).strip()}`."
+        )
+        raise AirflowFailException(e)
+
+    LOGGER.info(f"Successfully copied {table[0]} to {table[1]}.")
 
 @task.short_circuit(ignore_downstream_trigger_rules=False, retries=0) #only skip immediately downstream task
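
Note: the except branch formats the Slack message with str(e).strip() because psycopg2 error text typically carries a trailing newline (often plus a LINE/HINT block) that would garble a one-line alert. A standalone sketch of the same capture; the DSN and table are hypothetical and a reachable Postgres is assumed:

    import psycopg2
    from psycopg2 import Error

    try:
        with psycopg2.connect("dbname=test") as con, con.cursor() as cur:
            cur.execute("SELECT * FROM missing_schema.missing_table")
    except Error as e:
        # e.g. relation "missing_schema.missing_table" does not exist
        print(f"Failed to copy: `{str(e).strip()}`")
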
From ac84d101b42c6ca57249b83b2331869fceb00144 Mon Sep 17 00:00:00 2001
From: gabrielwol <80077912+gabrielwol@users.noreply.github.com>
Date: Wed, 13 Mar 2024 15:51:18 +0000
Subject: [PATCH 4/7] #894 fixes to status_message task

---
 dags/counts_replicator.py | 24 +++++++++++++-----------
 dags/dag_functions.py     |  2 +-
 2 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/dags/counts_replicator.py b/dags/counts_replicator.py
index 9fd5ceb52..d6fdf8844 100644
--- a/dags/counts_replicator.py
+++ b/dags/counts_replicator.py
@@ -11,11 +11,11 @@
 import os
 import sys
 from datetime import timedelta
-from functools import partial
 import pendulum
 # pylint: disable=import-error
 from airflow.decorators import dag, task
 from airflow.models import Variable
+from airflow.exceptions import AirflowFailException
 
 # import custom operators and helper functions
 repo_path = os.path.abspath(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
 sys.path.insert(0, repo_path)
@@ -45,7 +45,7 @@
     max_active_tasks=5,
     schedule=None,
     doc_md=__doc__,
-    tags=["counts"]
+    tags=["counts", "replicator"]
 )
 def counts_replicator():
     """The main function of the counts DAG."""
     from dags.common_tasks import (
         wait_for_external_trigger, get_variable, copy_table
@@ -57,30 +57,32 @@ def counts_replicator():
     # Airflow variable.
     tables = get_variable.override(task_id="get_list_of_tables")("counts_tables")
 
+    # Copies tables
+    copy_tables = copy_table.override(task_id="copy_tables", on_failure_callback = None).partial(conn_id="traffic_bot").expand(table=tables)
+
     @task(
-        retries = 0,
-        doc_md = """A status message to report DAG success OR any failures from the `copy_tables` task."""
+        retries=0,
+        trigger_rule='all_done',
+        doc_md="""A status message to report DAG success OR any failures from the `copy_tables` task."""
     )
     def status_message(tables, **context):
         ti = context["ti"]
         failures = []
         #iterate through mapped tasks to find any failure messages
         for m_i in range(0, len(tables)):
-            failure_msg = ti.xcom_pull(key="extra_msg", task_ids="copy_tables", map_indexes=m_i)[0]
+            failure_msg = ti.xcom_pull(key="extra_msg", task_ids="copy_tables", map_indexes=m_i)
             if failure_msg is not None:
                 failures.append(failure_msg)
         if failures == []:
-            slack_msg = f"{DAG_NAME} DAG succeeded :white_check_mark:"
             send_slack_msg(
                 context=context,
-                msg=slack_msg
+                msg=f"{DAG_NAME} DAG succeeded :white_check_mark:"
             )
         else: #add details of failures to task_fail_slack_alert
-            context.get("task_instance").xcom_push(key="extra_msg", value=failures)
+            failure_extra_msg = ['One or more tables failed to copy:', failures]
+            context.get("task_instance").xcom_push(key="extra_msg", value=failure_extra_msg)
+            raise AirflowFailException('One or more tables failed to copy.')
 
-    # Copies tables
-    copy_tables = copy_table.override(task_id="copy_tables", on_failure_callback = None).partial(conn_id="traffic_bot").expand(table=tables)
-
     # Waits for an external trigger
     wait_for_external_trigger() >> tables >> copy_tables >> status_message(tables=tables)
 
diff --git a/dags/dag_functions.py b/dags/dag_functions.py
index 34d81265d..d2cc751d3 100644
--- a/dags/dag_functions.py
+++ b/dags/dag_functions.py
@@ -98,7 +98,7 @@ def task_fail_slack_alert(
     # in case of a string (or the default empty string)
     extra_msg_str = extra_msg
 
-    if isinstance(extra_msg_str, tuple):
+    if isinstance(extra_msg_str, tuple) or isinstance(extra_msg_str, list):
+        # collapse any extra_msg items which are themselves lists, joining with new lines
         extra_msg_str = '\n'.join(
             ['\n'.join(item) if isinstance(item, list) else item for item in extra_msg_str]
         )
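
Note: status_message now pushes a nested extra_msg (a header string plus a list of per-table failures), which the widened isinstance check in task_fail_slack_alert flattens. A worked example of that join logic; the failure strings follow patch 3's format but are otherwise hypothetical:

    extra_msg = [
        'One or more tables failed to copy:',
        ['Failed to copy `a.t1` to `b.t1`: `relation "b.t1" does not exist`.',
         'Failed to copy `a.t2` to `b.t2`: `permission denied`.'],
    ]
    extra_msg_str = '\n'.join(
        ['\n'.join(item) if isinstance(item, list) else item for item in extra_msg]
    )
    print(extra_msg_str)
    # One or more tables failed to copy:
    # Failed to copy `a.t1` to `b.t1`: `relation "b.t1" does not exist`.
    # Failed to copy `a.t2` to `b.t2`: `permission denied`.
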
From e06b862418062fc576d80c12eb4d89cbfa44a03d Mon Sep 17 00:00:00 2001
From: gabrielwol <80077912+gabrielwol@users.noreply.github.com>
Date: Wed, 13 Mar 2024 17:55:34 +0000
Subject: [PATCH 5/7] #894 create counts and collisions DAGs from variable

---
 dags/counts_replicator.py | 116 ++++++++++++++++++++------------------
 1 file changed, 60 insertions(+), 56 deletions(-)

diff --git a/dags/counts_replicator.py b/dags/counts_replicator.py
index d6fdf8844..e0d268e1e 100644
--- a/dags/counts_replicator.py
+++ b/dags/counts_replicator.py
@@ -24,66 +24,70 @@
 from dags.dag_functions import task_fail_slack_alert, send_slack_msg
 # pylint: enable=import-error
 
+REPLICATORS = Variable.get('replicators', deserialize_json=True)
+
+for replicator in REPLICATORS:
+
+    DAG_NAME = REPLICATORS[replicator]['dag_name']
+    DAG_OWNERS = Variable.get("dag_owners", deserialize_json=True).get(DAG_NAME, ["Unknown"])
+
+    default_args = {
+        "owner": ",".join(DAG_OWNERS),
+        "depends_on_past": False,
+        "start_date": pendulum.datetime(2023, 10, 31, tz="America/Toronto"),
+        "email_on_failure": False,
+        "retries": 3,
+        "retry_delay": timedelta(minutes=60),
+        "on_failure_callback": task_fail_slack_alert,
+    }
+
+    @dag(
+        dag_id=DAG_NAME,
+        default_args=default_args,
+        catchup=False,
+        max_active_runs=5,
+        max_active_tasks=5,
+        schedule=None,
+        doc_md=__doc__,
+        tags=[replicator, "replicator"]
+    )
+    def replicator_DAG():
+        f"""The main function of the {replicator} DAG."""
+        from dags.common_tasks import (
+            wait_for_external_trigger, get_variable, copy_table
+        )
+
+        # Returns a list of source and destination tables stored in the given
+        # Airflow variable.
+        tables = get_variable.override(task_id="get_list_of_tables")(REPLICATORS[replicator]['tables'])
+
+        # Copies tables
+        copy_tables = copy_table.override(task_id="copy_tables", on_failure_callback = None).partial(conn_id=REPLICATORS[replicator]['conn']).expand(table=tables)
+
+        @task(
+            retries=0,
+            trigger_rule='all_done',
+            doc_md="""A status message to report DAG success OR any failures from the `copy_tables` task."""
+        )
+        def status_message(tables, **context):
+            ti = context["ti"]
+            failures = []
+            #iterate through mapped tasks to find any failure messages
+            for m_i in range(0, len(tables)):
+                failure_msg = ti.xcom_pull(key="extra_msg", task_ids="copy_tables", map_indexes=m_i)
+                if failure_msg is not None:
+                    failures.append(failure_msg)
+            if failures == []:
+                send_slack_msg(
+                    context=context,
+                    msg=f"{DAG_NAME} DAG succeeded :white_check_mark:"
+                )
+            else: #add details of failures to task_fail_slack_alert
+                failure_extra_msg = ['One or more tables failed to copy:', failures]
+                context.get("task_instance").xcom_push(key="extra_msg", value=failure_extra_msg)
+                raise AirflowFailException('One or more tables failed to copy.')
+
+        # Waits for an external trigger
+        wait_for_external_trigger() >> tables >> copy_tables >> status_message(tables=tables)
 
+    replicator_DAG()
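
Note: the loop assumes the `replicators` Airflow Variable is a JSON mapping keyed by a short name, with 'dag_name', 'tables' and 'conn' entries. The keys are exactly the ones read by the code above; the concrete values below are inferred from the earlier patches and the deleted collisions DAG:

    REPLICATORS = {
        "counts": {
            "dag_name": "counts_replicator",
            "tables": "counts_tables",  # Airflow Variable holding the table pairs
            "conn": "traffic_bot",      # Postgres connection ID
        },
        "collisions": {
            "dag_name": "collisions_replicator",
            "tables": "collisions_tables",
            "conn": "collisions_bot",
        },
    }
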
From 9d8f261c09766c66c37c8cffd1a6a93e9ee2dd0b Mon Sep 17 00:00:00 2001
From: gabrielwol <80077912+gabrielwol@users.noreply.github.com>
Date: Wed, 13 Mar 2024 18:22:49 +0000
Subject: [PATCH 6/7] #894 rename + remove old dag

---
 dags/collisions_replicator.py                 | 64 -------------------
 dags/{counts_replicator.py => replicators.py} |  0
 2 files changed, 64 deletions(-)
 delete mode 100644 dags/collisions_replicator.py
 rename dags/{counts_replicator.py => replicators.py} (100%)

diff --git a/dags/collisions_replicator.py b/dags/collisions_replicator.py
deleted file mode 100644
index 99b32dac6..000000000
--- a/dags/collisions_replicator.py
+++ /dev/null
@@ -1,64 +0,0 @@
-#!/data/airflow/airflow_venv/bin/python3
-# -*- coding: utf-8 -*-
-# noqa: D415
-r"""### The Daily Collision Replicator DAG
-
-This DAG runs daily to copy MOVE's collisions tables from the ``move_staging``
-schema, which is updated by the MOVE's ``bigdata_replicator`` DAG, to the
-``collisions`` schema. This DAG runs only when it is triggered by the MOVE's
-DAG.
-"""
-import os
-import sys
-from datetime import timedelta
-from functools import partial
-import pendulum
-# pylint: disable=import-error
-from airflow.decorators import dag
-from airflow.models import Variable
-
-# import custom operators and helper functions
-repo_path = os.path.abspath(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
-sys.path.insert(0, repo_path)
-# pylint: disable=wrong-import-position
-from dags.dag_functions import task_fail_slack_alert
-# pylint: enable=import-error
-
-DAG_NAME = "collisions_replicator"
-DAG_OWNERS = Variable.get("dag_owners", deserialize_json=True).get(DAG_NAME, ["Unknown"])
-
-default_args = {
-    "owner": ",".join(DAG_OWNERS),
-    "depends_on_past": False,
-    "start_date": pendulum.datetime(2023, 10, 31, tz="America/Toronto"),
-    "email_on_failure": False,
-    "retries": 3,
-    "retry_delay": timedelta(minutes=60),
-    "on_failure_callback": task_fail_slack_alert,
-}
-
-@dag(
-    dag_id=DAG_NAME,
-    default_args=default_args,
-    catchup=False,
-    max_active_runs=5,
-    max_active_tasks=5,
-    schedule=None,
-    doc_md=__doc__,
-    tags=["collisions"]
-)
-def collisions_replicator():
-    """The main function of the collisions DAG."""
-    from dags.common_tasks import (
-        wait_for_external_trigger, get_variable, copy_table
-    )
-
-    # Returns a list of source and destination tables stored in the given
-    # Airflow variable.
-    tables = get_variable.override(task_id="get_list_of_tables")("collisions_tables")
-    # Waits for an external trigger
-    wait_for_external_trigger() >> tables
-    # Copies tables
-    copy_table.override(task_id="copy_tables").partial(conn_id="collisions_bot").expand(table=tables)
-
-collisions_replicator()
diff --git a/dags/counts_replicator.py b/dags/replicators.py
similarity index 100%
rename from dags/counts_replicator.py
rename to dags/replicators.py
-""" -import os -import sys -from datetime import timedelta -from functools import partial -import pendulum -# pylint: disable=import-error -from airflow.decorators import dag -from airflow.models import Variable - -# import custom operators and helper functions -repo_path = os.path.abspath(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))) -sys.path.insert(0, repo_path) -# pylint: disable=wrong-import-position -from dags.dag_functions import task_fail_slack_alert -# pylint: enable=import-error - -DAG_NAME = "collisions_replicator" -DAG_OWNERS = Variable.get("dag_owners", deserialize_json=True).get(DAG_NAME, ["Unknown"]) - -default_args = { - "owner": ",".join(DAG_OWNERS), - "depends_on_past": False, - "start_date": pendulum.datetime(2023, 10, 31, tz="America/Toronto"), - "email_on_failure": False, - "retries": 3, - "retry_delay": timedelta(minutes=60), - "on_failure_callback": task_fail_slack_alert, -} - -@dag( - dag_id=DAG_NAME, - default_args=default_args, - catchup=False, - max_active_runs=5, - max_active_tasks=5, - schedule=None, - doc_md=__doc__, - tags=["collisions"] -) -def collisions_replicator(): - """The main function of the collisions DAG.""" - from dags.common_tasks import ( - wait_for_external_trigger, get_variable, copy_table - ) - - # Returns a list of source and destination tables stored in the given - # Airflow variable. - tables = get_variable.override(task_id="get_list_of_tables")("collisions_tables") - # Waits for an external trigger - wait_for_external_trigger() >> tables - # Copies tables - copy_table.override(task_id="copy_tables").partial(conn_id="collisions_bot").expand(table=tables) - -collisions_replicator() diff --git a/dags/counts_replicator.py b/dags/replicators.py similarity index 100% rename from dags/counts_replicator.py rename to dags/replicators.py From fd6e7fd754eed0196a27390bba919f1e44241ecd Mon Sep 17 00:00:00 2001 From: gabrielwol <80077912+gabrielwol@users.noreply.github.com> Date: Wed, 13 Mar 2024 20:08:18 +0000 Subject: [PATCH 7/7] #894 fix issues with dynamic dag generation --- dags/replicators.py | 81 +++++++++++++++++++++++++++------------------ 1 file changed, 48 insertions(+), 33 deletions(-) diff --git a/dags/replicators.py b/dags/replicators.py index e0d268e1e..ba47c2be8 100644 --- a/dags/replicators.py +++ b/dags/replicators.py @@ -1,13 +1,6 @@ #!/data/airflow/airflow_venv/bin/python3 # -*- coding: utf-8 -*- # noqa: D415 -r"""### The Daily counts Replicator DAG - -This DAG runs daily to copy MOVE's counts tables from the ``move_staging`` -schema, which is updated by the MOVE's ``bigdata_replicator`` DAG, to the -``traffic`` schema. This DAG runs only when it is triggered by the MOVE's -DAG. 
-""" import os import sys from datetime import timedelta @@ -24,45 +17,29 @@ from dags.dag_functions import task_fail_slack_alert, send_slack_msg # pylint: enable=import-error -REPLICATORS = Variable.get('replicators', deserialize_json=True) - -for replicator in REPLICATORS: - - DAG_NAME = REPLICATORS[replicator]['dag_name'] - DAG_OWNERS = Variable.get("dag_owners", deserialize_json=True).get(DAG_NAME, ["Unknown"]) - - default_args = { - "owner": ",".join(DAG_OWNERS), - "depends_on_past": False, - "start_date": pendulum.datetime(2023, 10, 31, tz="America/Toronto"), - "email_on_failure": False, - "retries": 3, - "retry_delay": timedelta(minutes=60), - "on_failure_callback": task_fail_slack_alert, - } - +def create_replicator_dag(dag_id, short_name, tables_var, conn, doc_md, default_args): @dag( - dag_id=DAG_NAME, + dag_id=dag_id, default_args=default_args, catchup=False, max_active_runs=5, max_active_tasks=5, - schedule=None, - doc_md=__doc__, - tags=[replicator, "replicator"] + schedule=None, #triggered externally + doc_md=doc_md, + tags=[short_name, "replicator"] ) def replicator_DAG(): - f"""The main function of the {replicator} DAG.""" + f"""The main function of the {short_name} DAG.""" from dags.common_tasks import ( wait_for_external_trigger, get_variable, copy_table ) # Returns a list of source and destination tables stored in the given # Airflow variable. - tables = get_variable.override(task_id="get_list_of_tables")(REPLICATORS[replicator]['tables']) + tables = get_variable.override(task_id="get_list_of_tables")(tables_var) # Copies tables - copy_tables = copy_table.override(task_id="copy_tables", on_failure_callback = None).partial(conn_id=REPLICATORS[replicator]['conn']).expand(table=tables) + copy_tables = copy_table.override(task_id="copy_tables", on_failure_callback = None).partial(conn_id=conn).expand(table=tables) @task( retries=0, @@ -80,7 +57,7 @@ def status_message(tables, **context): if failures == []: send_slack_msg( context=context, - msg=f"{DAG_NAME} DAG succeeded :white_check_mark:" + msg=f"`{dag_id}` DAG succeeded :white_check_mark:" ) else: #add details of failures to task_fail_slack_alert failure_extra_msg = ['One or more tables failed to copy:', failures] @@ -90,4 +67,42 @@ def status_message(tables, **context): # Waits for an external trigger wait_for_external_trigger() >> tables >> copy_tables >> status_message(tables=tables) - replicator_DAG() + generated_dag = replicator_DAG() + + return generated_dag + +#get replicator details from airflow variable +REPLICATORS = Variable.get('replicators', deserialize_json=True) + +#generate replicator DAGs from dict +for replicator, dag_items in REPLICATORS.items(): + DAG_NAME = dag_items['dag_name'] + DAG_OWNERS = Variable.get("dag_owners", deserialize_json=True).get(DAG_NAME, ["Unknown"]) + + default_args = { + "owner": ",".join(DAG_OWNERS), + "depends_on_past": False, + "start_date": pendulum.datetime(2023, 10, 31, tz="America/Toronto"), + "email_on_failure": False, + "retries": 3, + "retry_delay": timedelta(minutes=60), + "on_failure_callback": task_fail_slack_alert, + } + + doc_md = f"""### The Daily {replicator} Replicator DAG + + This DAG runs daily to copy MOVE's {replicator} tables from the ``move_staging`` + schema, which is updated by the MOVE's ``bigdata_replicator`` DAG, to the + ``{replicator}`` schema. 
+    This DAG runs only when it is triggered by the MOVE's DAG."""
+
+    globals()[DAG_NAME] = (
+        create_replicator_dag(
+            dag_id=DAG_NAME,
+            short_name=replicator,
+            tables_var=dag_items['tables'],
+            conn=dag_items['conn'],
+            doc_md=doc_md,
+            default_args=default_args
+        )
+    )
\ No newline at end of file
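
Note: the commit message does not spell out which dynamic-generation issues this fixes, but two usual suspects fit the change. First, Airflow only discovers DAG objects reachable from a module's top-level globals, which is what the globals()[DAG_NAME] assignment guarantees for each generated DAG. Second, closures defined in a bare for loop capture the loop variable by reference (late binding), while factory arguments are bound per call. A minimal demonstration of the latter, with hypothetical names:

    def build_loop():
        fns = []
        for name in ("counts", "collisions"):
            def get_name():
                return name  # late binding: resolved when called, after the loop
            fns.append(get_name)
        return [f() for f in fns]

    def build_factory():
        def factory(name):
            def get_name():
                return name  # bound to the factory argument per call
            return get_name
        return [factory(n)() for n in ("counts", "collisions")]

    print(build_loop())     # ['collisions', 'collisions']
    print(build_factory())  # ['counts', 'collisions']
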