Skip to content

Commit

Permalink
added dag38
Browse files Browse the repository at this point in the history
  • Loading branch information
avikdatta committed Feb 14, 2024
1 parent 3d459e4 commit d9ddd5c
Showing 1 changed file with 78 additions and 4 deletions.
82 changes: 78 additions & 4 deletions igf_airflow/utils/dag38_project_cleanup_step2_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,9 @@
get_temp_dir,
read_json_data,
check_file_path)
from igf_portal.metadata_utils import _gzip_json_file
from igf_portal.api_utils import get_data_from_portal
from airflow.operators.python import get_current_context
from airflow.decorators import task
from igf_airflow.logging.upload_log_msg import send_log_to_channels
from igf_airflow.utils.dag22_bclconvert_demult_utils import (
_create_output_from_jinja_template)
from igf_airflow.utils.generic_airflow_utils import (
send_airflow_failed_logs_to_channels,
format_and_send_generic_email_to_user)
Expand All @@ -30,6 +27,10 @@
## PORTAL
IGF_PORTAL_CONF = \
Variable.get('igf_portal_conf', default_var=None)
PROJECT_CLEANUP_NOTIFY_USER_URI = \
Variable.get('project_cleanup_notify_user_uri', default_var=None)
PROJECT_CLEANUP_GET_DATA_URI = \
Variable.get('project_cleanup_get_data_uri', default_var=None)

## DB
DATABASE_CONFIG_FILE = Variable.get('database_config_file', default_var=None)
Expand All @@ -38,6 +39,46 @@
EMAIL_CONFIG = Variable.get("email_config", default_var=None)
EMAIL_TEMPLATE = Variable.get("project_cleanup_email_notification_template", default_var=None)

@task(
task_id="fetch_project_cleanup_data",
retry_delay=timedelta(minutes=5),
retries=4,
queue='hpc_4G')
def fetch_project_cleanup_data() -> str:
try:
## dag_run.conf should have project_cleanup_id
context = get_current_context()
dag_run = context.get('dag_run')
project_cleanup_id = None
if dag_run is not None and \
dag_run.conf is not None and \
dag_run.conf.get('project_cleanup_id') is not None:
project_cleanup_id = \
dag_run.conf.get('project_cleanup_id')
if project_cleanup_id is None:
raise ValueError(
'project_cleanup_id not found in dag_run.conf')
project_cleanup_data = \
get_data_from_portal(
portal_config_file=IGF_PORTAL_CONF,
url_suffix=f'{PROJECT_CLEANUP_GET_DATA_URI}/{project_cleanup_id}',
request_mode='post')
temp_dir = \
get_temp_dir(use_ephemeral_space=True)
json_file = \
os.path.join(temp_dir, 'project_cleanup_data.json')
with open(json_file, 'w') as fp:
json.dump(project_cleanup_data, fp)
return json_file
except Exception as e:
log.error(e)
send_airflow_failed_logs_to_channels(
slack_conf=SLACK_CONF,
ms_teams_conf=MS_TEAMS_CONF,
message_prefix=e)
raise ValueError(e)


@task(
task_id="notify_user_about_project_cleanup",
retry_delay=timedelta(minutes=5),
Expand Down Expand Up @@ -68,6 +109,39 @@ def notify_user_about_project_cleanup(
email_data=dict(
projectLists=projects,
deletionDate=deletion_date))
except Exception as e:
log.error(e)
send_airflow_failed_logs_to_channels(
slack_conf=SLACK_CONF,
ms_teams_conf=MS_TEAMS_CONF,
message_prefix=e)
raise ValueError(e)


@task(
task_id="mark_user_notified_on_portal",
retry_delay=timedelta(minutes=5),
retries=4,
queue='hpc_4G')
def mark_user_notified_on_portal() -> None:
try:
## dag_run.conf should have project_cleanup_id
context = get_current_context()
dag_run = context.get('dag_run')
project_cleanup_id = None
if dag_run is not None and \
dag_run.conf is not None and \
dag_run.conf.get('project_cleanup_id') is not None:
project_cleanup_id = \
dag_run.conf.get('project_cleanup_id')
if project_cleanup_id is None:
raise ValueError(
'project_cleanup_id not found in dag_run.conf')
_ = \
get_data_from_portal(
portal_config_file=IGF_PORTAL_CONF,
url_suffix=f'{PROJECT_CLEANUP_NOTIFY_USER_URI}/{project_cleanup_id}',
request_mode='post')
except Exception as e:
log.error(e)
send_airflow_failed_logs_to_channels(
Expand Down

0 comments on commit d9ddd5c

Please sign in to comment.