Skip to content

Commit

Permalink
Add task to revoke ingress rule (#1494)
Browse files Browse the repository at this point in the history
* Add task to revoke ingress rule
  • Loading branch information
pankajastro authored Mar 19, 2024
1 parent cde3421 commit a6f8f8d
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 2 deletions.
44 changes: 43 additions & 1 deletion astronomer/providers/apache/hive/example_dags/example_hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import os
import time
from datetime import datetime, timedelta
from typing import Any, List
from typing import TYPE_CHECKING, Any, List

from airflow import DAG, settings
from airflow.exceptions import AirflowException
Expand All @@ -26,6 +26,9 @@
NamedHivePartitionSensorAsync,
)

if TYPE_CHECKING:
from airflow.models.taskinstance import TaskInstance

HIVE_CLUSTER = os.getenv("HIVE_CLUSTER", "example_hive_sensor_cluster")
AWS_S3_CREDS = {
"aws_access_key_id": os.getenv("AWS_ACCESS_KEY_ID", "aws_access_key"),
Expand Down Expand Up @@ -211,6 +214,39 @@ def add_inbound_rule_for_security_group(task_instance: Any) -> None:
raise error


def revoke_inbound_rules(task_instance: "TaskInstance") -> None:
    """Remove the ingress rules added for this host from the cluster security group.

    Revokes the two ``/32`` CIDR rules (SSH port 22 and the Hive ingress port)
    that ``add_inbound_rule_for_security_group`` created for this machine's
    public IP, so the EMR master security group is not left open after the run.

    :param task_instance: task instance used to pull the master security group
        id pushed to XCom by the ``describe_created_cluster`` task.
    """
    import boto3

    # Same public-IP lookup used when the rules were added, so the CIDR matches.
    current_docker_ip = get("https://api.ipify.org").text
    ip_range = f"{current_docker_ip}/32"
    logging.info("Trying to revoke ingress ip address is: %s", ip_range)

    # Resolve the security group id once instead of once per revoke call.
    security_group_id = task_instance.xcom_pull(
        key="cluster_response_master_security_group", task_ids=["describe_created_cluster"]
    )[0]

    client = boto3.client("ec2", **AWS_S3_CREDS)
    # Revoke both rules that were opened for this host: SSH and the Hive port.
    for port in (22, HIVE_OPERATOR_INGRESS_PORT):
        response = client.revoke_security_group_ingress(
            CidrIp=ip_range,
            FromPort=port,
            ToPort=port,
            GroupId=security_group_id,
            IpProtocol="tcp",
        )
        logging.info("%s", response)

def ssh_and_run_command(task_instance: Any, **kwargs: Any) -> None:
"""
Load the private_key from airflow variable and creates a pem_file
Expand Down Expand Up @@ -326,6 +362,11 @@ def check_dag_status(**kwargs: Any) -> None:
)
# [END add_ip_address_for_inbound_rules]

# Run the cleanup even when an upstream task fails ("all_done"); otherwise a
# failed run would skip this task and leave the security-group ingress rules
# open. This matches the equivalent task in the SFTP example DAG.
revoke_inbound_rule = PythonOperator(
    task_id="revoke_inbound_rules",
    trigger_rule="all_done",
    python_callable=revoke_inbound_rules,
)

# [START add_example_pi_file_in_hdfs]
ssh_and_copy_pifile_to_hdfs = PythonOperator(
task_id="ssh_and_copy_pifile_to_hdfs",
Expand Down Expand Up @@ -411,5 +452,6 @@ def check_dag_status(**kwargs: Any) -> None:
>> hive_sensor
>> wait_for_partition
>> remove_cluster
>> revoke_inbound_rule
>> dag_final_status
)
31 changes: 30 additions & 1 deletion astronomer/providers/apache/livy/example_dags/example_livy.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import os
import time
from datetime import datetime, timedelta
from typing import Any, List
from typing import TYPE_CHECKING, Any, List

from airflow import DAG, settings
from airflow.exceptions import AirflowException
Expand All @@ -25,6 +25,9 @@

from astronomer.providers.apache.livy.operators.livy import LivyOperatorAsync

if TYPE_CHECKING:
from airflow.models.taskinstance import TaskInstance

LIVY_CLUSTER = os.getenv("LIVY_CLUSTER", "example_livy_operator_cluster")
BOTO_DUPLICATE_PERMISSION_ERROR = "InvalidPermission.Duplicate"
LIVY_JAVA_FILE = os.getenv("LIVY_JAVA_FILE", "/spark-examples.jar")
Expand Down Expand Up @@ -174,6 +177,26 @@ def add_inbound_rule_for_security_group(task_instance: Any) -> None:
raise error


def revoke_inbound_rules(task_instance: "TaskInstance") -> None:
    """Remove the Livy ingress rule added for this host from the security group.

    Revokes the ``/32`` CIDR rule on ``LIVY_OPERATOR_INGRESS_PORT`` that
    ``add_inbound_rule_for_security_group`` created for this machine's public
    IP, so the EMR master security group is not left open after the run.

    :param task_instance: task instance used to pull the master security group
        id pushed to XCom by the ``describe_created_cluster`` task.
    """
    import boto3

    # Same public-IP lookup used when the rule was added, so the CIDR matches.
    current_docker_ip = get("https://api.ipify.org").text
    ip_range = f"{current_docker_ip}/32"
    logging.info("Trying to revoke ingress ip address is: %s", ip_range)
    client = boto3.client("ec2", **AWS_S3_CREDS)
    response = client.revoke_security_group_ingress(
        CidrIp=ip_range,
        FromPort=LIVY_OPERATOR_INGRESS_PORT,
        ToPort=LIVY_OPERATOR_INGRESS_PORT,
        GroupId=task_instance.xcom_pull(
            key="cluster_response_master_security_group", task_ids=["describe_created_cluster"]
        )[0],
        IpProtocol="tcp",
    )
    logging.info("%s", response)


def ssh_and_run_command(task_instance: Any, **kwargs: Any) -> None:
"""
Load the private_key from airflow variable and creates a pem_file
Expand Down Expand Up @@ -336,6 +359,11 @@ def check_dag_status(**kwargs: Any) -> None:
)
# [END howto_operator_emr_terminate_job_flow]

# Run the cleanup even when an upstream task fails ("all_done"); otherwise a
# failed run would skip this task and leave the security-group ingress rule
# open. This matches the equivalent task in the SFTP example DAG.
revoke_inbound_rule = PythonOperator(
    task_id="revoke_inbound_rules",
    trigger_rule="all_done",
    python_callable=revoke_inbound_rules,
)

dag_final_status = PythonOperator(
task_id="dag_final_status",
provide_context=True,
Expand All @@ -354,5 +382,6 @@ def check_dag_status(**kwargs: Any) -> None:
>> livy_java_task
>> livy_python_task
>> remove_cluster
>> revoke_inbound_rule
>> dag_final_status
)
27 changes: 27 additions & 0 deletions astronomer/providers/sftp/example_dags/example_sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,26 @@ def add_inbound_rule_for_security_group(task_instance: "TaskInstance") -> None:
raise error


def revoke_inbound_rules(task_instance: "TaskInstance") -> None:
    """Remove the SSH ingress rule added for this host from the security group.

    Revokes the ``/32`` CIDR rule on port 22 that
    ``add_inbound_rule_for_security_group`` created for this machine's public
    IP, so the instance security group is not left open after the run.

    :param task_instance: task instance used to pull the master security group
        id pushed to XCom by the ``describe_created_cluster`` task.
    """
    import boto3

    # Same public-IP lookup used when the rule was added, so the CIDR matches.
    current_docker_ip = get("https://api.ipify.org").text
    ip_range = f"{current_docker_ip}/32"
    logging.info("Trying to revoke ingress ip address is: %s", ip_range)
    client = boto3.client("ec2", **AWS_S3_CREDS)
    response = client.revoke_security_group_ingress(
        CidrIp=ip_range,
        FromPort=22,
        ToPort=22,
        GroupId=task_instance.xcom_pull(
            key="cluster_response_master_security_group", task_ids=["describe_created_cluster"]
        )[0],
        IpProtocol="tcp",
    )
    logging.info("%s", response)


def ssh_and_run_command(task_instance: Any, **kwargs: Any) -> None:
"""
Load the private_key from airflow variable and creates a pem_file
Expand Down Expand Up @@ -279,6 +299,12 @@ def check_dag_status(**kwargs: Any) -> None:
task_id="terminate_instance", trigger_rule=TriggerRule.ALL_DONE, python_callable=terminate_instance
)

# Clean-up task: ALL_DONE makes it run even when an upstream task fails, so a
# failed run does not leave the security-group ingress rule open.
revoke_inbound_rule = PythonOperator(
    task_id="revoke_inbound_rules",
    trigger_rule=TriggerRule.ALL_DONE,
    python_callable=revoke_inbound_rules,
)

dag_final_status = PythonOperator(
task_id="dag_final_status",
provide_context=True,
Expand All @@ -295,5 +321,6 @@ def check_dag_status(**kwargs: Any) -> None:
>> create_sftp_default_airflow_connection
>> [async_sftp_sensor, async_sftp_sensor_without_pattern]
>> terminate_ec2_instance
>> revoke_inbound_rule
>> dag_final_status
)

0 comments on commit a6f8f8d

Please sign in to comment.