diff --git a/igf_airflow/utils/dag38_project_cleanup_step2_utils.py b/igf_airflow/utils/dag38_project_cleanup_step2_utils.py index 1ff526e6..f74a6304 100644 --- a/igf_airflow/utils/dag38_project_cleanup_step2_utils.py +++ b/igf_airflow/utils/dag38_project_cleanup_step2_utils.py @@ -6,6 +6,7 @@ from airflow.models import Variable from igf_data.utils.fileutils import ( get_temp_dir, + read_json_data, check_file_path) from igf_portal.metadata_utils import _gzip_json_file from airflow.operators.python import get_current_context @@ -15,7 +16,7 @@ _create_output_from_jinja_template) from igf_airflow.utils.generic_airflow_utils import ( send_airflow_failed_logs_to_channels, - format_and_send_email_to_user) + format_and_send_generic_email_to_user) log = logging.getLogger(__name__) @@ -38,34 +39,35 @@ EMAIL_TEMPLATE = Variable.get("project_cleanup_email_notification_template", default_var=None) @task( - task_id="send_email_to_user", + task_id="notify_user_about_project_cleanup", retry_delay=timedelta(minutes=5), retries=4, queue='hpc_4G') -def send_email_to_user( +def notify_user_about_project_cleanup( + project_cleanup_data_file: str, send_email: bool = True, email_user_key: str = 'username') -> None: try: - ## dag_run.conf should have analysis_id - context = get_current_context() - dag_run = context.get('dag_run') - analysis_id = None - if dag_run is not None and \ - dag_run.conf is not None and \ - dag_run.conf.get('analysis_id') is not None: - analysis_id = \ - dag_run.conf.get('analysis_id') - if analysis_id is None: - raise ValueError( - 'analysis_id not found in dag_run.conf') - ## get default user from email config - format_and_send_email_to_user( - email_template=EMAIL_TEMPLATE, - email_config_file=EMAIL_CONFIG, - analysis_id=analysis_id, - database_config_file=DATABASE_CONFIG_FILE, - email_user_key=email_user_key, - send_email=send_email) + check_file_path(project_cleanup_data_file) + json_data = read_json_data(project_cleanup_data_file) + if isinstance(json_data, list): + json_data = json_data[0] + user_name = json_data.get("user_name") + user_email = json_data.get("user_email") + ## TO DO: check project after getting json dump + projects = json_data.get("projects") + deletion_date = json_data.get("deletion_date") + ## get default user from email config + format_and_send_generic_email_to_user( + user_name=user_name, + user_email=user_email, + email_template=EMAIL_TEMPLATE, + email_config_file=EMAIL_CONFIG, + email_user_key=email_user_key, + send_email=send_email, + email_data=dict( + projectLists=projects, + deletionDate=deletion_date)) except Exception as e: log.error(e) send_airflow_failed_logs_to_channels( diff --git a/igf_airflow/utils/generic_airflow_utils.py b/igf_airflow/utils/generic_airflow_utils.py index 92e14684..2dd2a988 100644 --- a/igf_airflow/utils/generic_airflow_utils.py +++ b/igf_airflow/utils/generic_airflow_utils.py @@ -2,6 +2,7 @@ import logging from datetime import timedelta from igf_data.utils.fileutils import ( + get_temp_dir, check_file_path, read_json_data) from airflow.decorators import task @@ -11,6 +12,7 @@ from igf_airflow.utils.dag22_bclconvert_demult_utils import ( send_email_via_smtp) from igf_airflow.utils.dag26_snakemake_rnaseq_utils import ( + _create_output_from_jinja_template, generate_email_text_for_analysis) log = logging.getLogger(__name__) @@ -82,3 +84,54 @@ def format_and_send_email_to_user( except Exception as e: raise ValueError(f"Failed to send email, error: {e}") + +def format_and_send_generic_email_to_user( + user_name: str, + user_email: str, + email_template: str, + email_config_file: str, + email_user_key: str = 'username', + send_email: bool = True, + email_data: dict = {}) \ + -> None: + try: + check_file_path(email_template) + check_file_path(email_config_file) + ## get default user from email config + email_config = \ + read_json_data(email_config_file) + if isinstance(email_config, list): + email_config = email_config[0] + default_email_user = \ + email_config.get(email_user_key) + if default_email_user is None: + raise KeyError( + f"Missing default user info in email config file {email_config_file}") + ## generate email text + temp_dir = \ + get_temp_dir(use_ephemeral_space=True) + output_file = \ + os.path.join(temp_dir, 'email.txt') + email_template_data = \ + dict( + user_email=user_email, + defaultUser=default_email_user, + user_name=user_name, + send_email_to_user=send_email) + if len(email_data) > 0: + email_template_data.\ + update(**email_data) + _create_output_from_jinja_template( + template_file=email_template, + output_file=output_file, + autoescape_list=['xml', 'html'], + data=email_template_data) + ## send email to user + send_email_via_smtp( + sender=default_email_user, + receivers=[user_email, default_email_user], + email_config_json=email_config_file, + email_text_file=output_file) + except Exception as e: + raise ValueError( + f"Failed to send email, error: {e}") \ No newline at end of file