diff --git a/astronomer/providers/apache/hive/example_dags/example_hive.py b/astronomer/providers/apache/hive/example_dags/example_hive.py
index 1e91b4e0d..1723c6870 100644
--- a/astronomer/providers/apache/hive/example_dags/example_hive.py
+++ b/astronomer/providers/apache/hive/example_dags/example_hive.py
@@ -4,7 +4,7 @@
 import os
 import time
 from datetime import datetime, timedelta
-from typing import Any, List
+from typing import TYPE_CHECKING, Any, List
 
 from airflow import DAG, settings
 from airflow.exceptions import AirflowException
@@ -26,6 +26,9 @@
     NamedHivePartitionSensorAsync,
 )
 
+if TYPE_CHECKING:
+    from airflow.models.taskinstance import TaskInstance
+
 HIVE_CLUSTER = os.getenv("HIVE_CLUSTER", "example_hive_sensor_cluster")
 AWS_S3_CREDS = {
     "aws_access_key_id": os.getenv("AWS_ACCESS_KEY_ID", "aws_access_key"),
@@ -211,6 +214,39 @@ def add_inbound_rule_for_security_group(task_instance: Any) -> None:
         raise error
 
 
+def revoke_inbound_rules(task_instance: "TaskInstance") -> None:
+    """Remove the ingress rules from the security group"""
+    import boto3
+
+    current_docker_ip = get("https://api.ipify.org").text
+    ip_range = current_docker_ip + "/32"
+    logging.info("Revoking ingress rules for IP range: %s", ip_range)
+    client = boto3.client("ec2", **AWS_S3_CREDS)
+    response = client.revoke_security_group_ingress(
+        CidrIp=ip_range,
+        FromPort=22,
+        ToPort=22,
+        GroupId=task_instance.xcom_pull(
+            key="cluster_response_master_security_group", task_ids=["describe_created_cluster"]
+        )[0],
+        IpProtocol="tcp",
+    )
+
+    logging.info("%s", response)
+
+    response = client.revoke_security_group_ingress(
+        CidrIp=ip_range,
+        FromPort=HIVE_OPERATOR_INGRESS_PORT,
+        ToPort=HIVE_OPERATOR_INGRESS_PORT,
+        GroupId=task_instance.xcom_pull(
+            key="cluster_response_master_security_group", task_ids=["describe_created_cluster"]
+        )[0],
+        IpProtocol="tcp",
+    )
+
+    logging.info("%s", response)
+
+
 def ssh_and_run_command(task_instance: Any, **kwargs: Any) -> None:
     """
     Load the private_key from airflow variable and creates a pem_file
@@ -326,6 +362,11 @@ def check_dag_status(**kwargs: Any) -> None:
     )
     # [END add_ip_address_for_inbound_rules]
 
+    revoke_inbound_rule = PythonOperator(
+        task_id="revoke_inbound_rules",
+        python_callable=revoke_inbound_rules,
+    )
+
     # [START add_example_pi_file_in_hdfs]
     ssh_and_copy_pifile_to_hdfs = PythonOperator(
         task_id="ssh_and_copy_pifile_to_hdfs",
@@ -411,5 +452,6 @@ def check_dag_status(**kwargs: Any) -> None:
         >> hive_sensor
         >> wait_for_partition
         >> remove_cluster
+        >> revoke_inbound_rule
         >> dag_final_status
     )
diff --git a/astronomer/providers/apache/livy/example_dags/example_livy.py b/astronomer/providers/apache/livy/example_dags/example_livy.py
index f2cc834d1..b55d3b10c 100644
--- a/astronomer/providers/apache/livy/example_dags/example_livy.py
+++ b/astronomer/providers/apache/livy/example_dags/example_livy.py
@@ -8,7 +8,7 @@
 import os
 import time
 from datetime import datetime, timedelta
-from typing import Any, List
+from typing import TYPE_CHECKING, Any, List
 
 from airflow import DAG, settings
 from airflow.exceptions import AirflowException
@@ -25,6 +25,9 @@
 
 from astronomer.providers.apache.livy.operators.livy import LivyOperatorAsync
 
+if TYPE_CHECKING:
+    from airflow.models.taskinstance import TaskInstance
+
 LIVY_CLUSTER = os.getenv("LIVY_CLUSTER", "example_livy_operator_cluster")
 BOTO_DUPLICATE_PERMISSION_ERROR = "InvalidPermission.Duplicate"
 LIVY_JAVA_FILE = os.getenv("LIVY_JAVA_FILE", "/spark-examples.jar")
@@ -174,6 +177,26 @@ def add_inbound_rule_for_security_group(task_instance: Any) -> None:
         raise error
 
 
+def revoke_inbound_rules(task_instance: "TaskInstance") -> None:
+    """Remove an ingress rule from the security group"""
+    import boto3
+
+    current_docker_ip = get("https://api.ipify.org").text
+    ip_range = current_docker_ip + "/32"
+    logging.info("Revoking ingress rule for IP range: %s", ip_range)
+    client = boto3.client("ec2", **AWS_S3_CREDS)
+    response = client.revoke_security_group_ingress(
+        CidrIp=ip_range,
+        FromPort=LIVY_OPERATOR_INGRESS_PORT,
+        ToPort=LIVY_OPERATOR_INGRESS_PORT,
+        GroupId=task_instance.xcom_pull(
+            key="cluster_response_master_security_group", task_ids=["describe_created_cluster"]
+        )[0],
+        IpProtocol="tcp",
+    )
+    logging.info("%s", response)
+
+
 def ssh_and_run_command(task_instance: Any, **kwargs: Any) -> None:
     """
     Load the private_key from airflow variable and creates a pem_file
@@ -336,6 +359,11 @@ def check_dag_status(**kwargs: Any) -> None:
     )
     # [END howto_operator_emr_terminate_job_flow]
 
+    revoke_inbound_rule = PythonOperator(
+        task_id="revoke_inbound_rules",
+        python_callable=revoke_inbound_rules,
+    )
+
     dag_final_status = PythonOperator(
         task_id="dag_final_status",
         provide_context=True,
@@ -354,5 +382,6 @@ def check_dag_status(**kwargs: Any) -> None:
         >> livy_java_task
         >> livy_python_task
         >> remove_cluster
+        >> revoke_inbound_rule
         >> dag_final_status
     )
diff --git a/astronomer/providers/sftp/example_dags/example_sftp.py b/astronomer/providers/sftp/example_dags/example_sftp.py
index fa4d53a2b..832bfb481 100644
--- a/astronomer/providers/sftp/example_dags/example_sftp.py
+++ b/astronomer/providers/sftp/example_dags/example_sftp.py
@@ -157,6 +157,26 @@ def add_inbound_rule_for_security_group(task_instance: "TaskInstance") -> None:
         raise error
 
 
+def revoke_inbound_rules(task_instance: "TaskInstance") -> None:
+    """Remove an ingress rule from the security group"""
+    import boto3
+
+    current_docker_ip = get("https://api.ipify.org").text
+    ip_range = current_docker_ip + "/32"
+    logging.info("Revoking ingress rule for IP range: %s", ip_range)
+    client = boto3.client("ec2", **AWS_S3_CREDS)
+    response = client.revoke_security_group_ingress(
+        CidrIp=ip_range,
+        FromPort=22,
+        ToPort=22,
+        GroupId=task_instance.xcom_pull(
+            key="cluster_response_master_security_group", task_ids=["describe_created_cluster"]
+        )[0],
+        IpProtocol="tcp",
+    )
+    logging.info("%s", response)
+
+
 def ssh_and_run_command(task_instance: Any, **kwargs: Any) -> None:
     """
     Load the private_key from airflow variable and creates a pem_file
@@ -279,6 +299,12 @@ def check_dag_status(**kwargs: Any) -> None:
         task_id="terminate_instance", trigger_rule=TriggerRule.ALL_DONE, python_callable=terminate_instance
     )
 
+    revoke_inbound_rule = PythonOperator(
+        task_id="revoke_inbound_rules",
+        trigger_rule=TriggerRule.ALL_DONE,
+        python_callable=revoke_inbound_rules,
+    )
+
     dag_final_status = PythonOperator(
         task_id="dag_final_status",
         provide_context=True,
@@ -295,5 +321,6 @@ def check_dag_status(**kwargs: Any) -> None:
         >> create_sftp_default_airflow_connection
         >> [async_sftp_sensor, async_sftp_sensor_without_pattern]
         >> terminate_ec2_instance
+        >> revoke_inbound_rule
         >> dag_final_status
     )
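The new `revoke_inbound_rules` callables can be smoke-tested locally without AWS credentials or network access. Below is a minimal sketch against the SFTP variant, assuming the example module imports cleanly outside a scheduler and that `get` is the module-level `requests.get` import the function uses; the security-group ID and IP address are made-up placeholders, and both the public-IP lookup and the EC2 client are mocked.

```python
from unittest import mock

from astronomer.providers.sftp.example_dags.example_sftp import revoke_inbound_rules

# Stub a TaskInstance whose xcom_pull returns the security group list that the
# describe_created_cluster task would normally have pushed to XCom.
task_instance = mock.Mock()
task_instance.xcom_pull.return_value = ["sg-0123456789abcdef0"]  # placeholder ID

with mock.patch(
    "astronomer.providers.sftp.example_dags.example_sftp.get"
) as mock_get, mock.patch("boto3.client") as mock_client:
    mock_get.return_value.text = "203.0.113.7"  # placeholder public IP
    mock_client.return_value.revoke_security_group_ingress.return_value = {"Return": True}
    revoke_inbound_rules(task_instance)

# The EC2 client should have been asked to drop the SSH ingress rule for the /32.
mock_client.return_value.revoke_security_group_ingress.assert_called_once_with(
    CidrIp="203.0.113.7/32",
    FromPort=22,
    ToPort=22,
    GroupId="sg-0123456789abcdef0",
    IpProtocol="tcp",
)
```

Patching `"boto3.client"` works here even though the function imports boto3 inside its body, because the import binds the module object and the `client` attribute is resolved at call time.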