diff --git a/src/aibs_informatics_cdk_lib/constructs_/batch/infrastructure.py b/src/aibs_informatics_cdk_lib/constructs_/batch/infrastructure.py index 9c56be9..2a43760 100644 --- a/src/aibs_informatics_cdk_lib/constructs_/batch/infrastructure.py +++ b/src/aibs_informatics_cdk_lib/constructs_/batch/infrastructure.py @@ -26,8 +26,6 @@ BATCH_READ_ONLY_ACTIONS, S3_READ_ONLY_ACCESS_ACTIONS, batch_policy_statement, - dynamodb_policy_statement, - lambda_policy_statement, ) from aibs_informatics_cdk_lib.constructs_.base import EnvBaseConstruct from aibs_informatics_cdk_lib.constructs_.batch.launch_template import IBatchLaunchTemplateBuilder @@ -40,6 +38,17 @@ class Batch(EnvBaseConstruct): """ + Out-of-the-box Batch construct that can be used to create multiple Batch Environments. + + This construct simplifies the creation of Batch Environments. + It allows for the creation of multiple Batch Environments with different configurations + and launch templates, while sharing the same instance role and security group. + + Notes: + - Instance Roles are created with a set of managed policies that are commonly used + by Batch jobs. Custom policy statements also grant scoped access to services such as S3 and Batch. + + Defines: - Batch Compute Environment (Spot and OnDemand) - Instance Role @@ -53,6 +62,7 @@ def __init__( self, id: str, env_base: EnvBase, vpc: ec2.IVpc, + instance_role_name: Optional[str] = None, instance_role_policy_statements: Optional[List[iam.PolicyStatement]] = None, ) -> None: super().__init__(scope, id, env_base) @@ -64,8 +74,11 @@ def __init__( # - security group # - launch template # --------------------------------------------------------------------- - self.instance_role = self.create_instance_role(instance_role_policy_statements) - self.instance_profile = self.create_instance_profile() + self.instance_role = self.create_instance_role( + role_name=instance_role_name, + statements=instance_role_policy_statements, + ) + self.instance_profile = self.create_instance_profile(self.instance_role.role_name) self.security_group = self.create_security_group() self._batch_environment_mapping: MutableMapping[str, BatchEnvironment] = {} @@ -77,11 +90,14 @@ def environments(self) -> List["BatchEnvironment"]: ) def create_instance_role( - self, statements: Optional[List[iam.PolicyStatement]] = None + self, + role_name: Optional[str] = None, + statements: Optional[List[iam.PolicyStatement]] = None, ) -> iam.Role: instance_role = iam.Role( self, self.get_child_id(self, f"instance-role"), + role_name=role_name, description="Role used by ec2 instance in batch compute environment", assumed_by=iam.ServicePrincipal("ec2.amazonaws.com"), # type: ignore # Interface not inferred ) @@ -152,23 +168,6 @@ def create_instance_role( resources=["*"], ), batch_policy_statement(actions=BATCH_READ_ONLY_ACTIONS, env_base=self.env_base), - lambda_policy_statement(actions=["lambda:InvokeFunction"], env_base=self.env_base), - dynamodb_policy_statement( - env_base=self.env_base, - sid="DynamoDBReadWrite", - actions=[ - "dynamodb:BatchGet*", - "dynamodb:DescribeStream", - "dynamodb:DescribeTable", - "dynamodb:Get*", - "dynamodb:Query", - "dynamodb:Scan", - "dynamodb:BatchWrite*", - "dynamodb:Delete*", - "dynamodb:Update*", - "dynamodb:PutItem", - ], - ), ], roles=[instance_role], # type: ignore # Role is not inferred as IRole ) @@ -184,11 +183,11 @@ def create_instance_role( return instance_role - def create_instance_profile(self) -> iam.CfnInstanceProfile: + def create_instance_profile(self, instance_role_name: str) -> 
iam.CfnInstanceProfile: return iam.CfnInstanceProfile( self, f"instance-profile", - roles=[self.instance_role.role_name], + roles=[instance_role_name], ) def create_security_group(self) -> ec2.SecurityGroup: diff --git a/src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py b/src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py index 1bb33ef..4452624 100644 --- a/src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py +++ b/src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py @@ -173,7 +173,7 @@ def end_states(self) -> List[sfn.INextable]: def enclose( self, - id: str, + id: Optional[str] = None, input_path: Optional[str] = None, result_path: Optional[str] = None, ) -> sfn.Chain: @@ -193,6 +193,8 @@ def enclose( Returns: sfn.Chain: the new state machine fragment """ + id = id or self.node.id + if input_path is None: input_path = "$" if result_path is None: diff --git a/src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/__init__.py b/src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/__init__.py new file mode 100644 index 0000000..2d5c288 --- /dev/null +++ b/src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/__init__.py @@ -0,0 +1,14 @@ +from aibs_informatics_cdk_lib.constructs_.sfn.fragments.informatics.batch import ( + BatchInvokedExecutorFragment, + BatchInvokedLambdaFunction, +) +from aibs_informatics_cdk_lib.constructs_.sfn.fragments.informatics.data_sync import ( + DataSyncFragment, +) +from aibs_informatics_cdk_lib.constructs_.sfn.fragments.informatics.demand_execution import ( + DemandExecutionFragment, +) +from aibs_informatics_cdk_lib.constructs_.sfn.fragments.informatics.efs import ( + CleanFileSystemFragment, + CleanFileSystemTriggerConfig, +) diff --git a/src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics.py b/src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/batch.py similarity index 55% rename from src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics.py rename to src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/batch.py index 19c4ac3..23d5da5 100644 --- a/src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics.py +++ b/src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/batch.py @@ -205,6 +205,7 @@ def with_defaults( image_path: Optional[str] = None, handler_path: Optional[str] = None, payload_path: Optional[str] = None, + overrides_path: Optional[str] = None, command: Optional[List[str]] = None, memory: str = "1024", vcpus: str = "1", @@ -260,7 +261,7 @@ def with_defaults( "handler": sfn.JsonPath.string_at(handler_path or "$.handler"), "payload": sfn.JsonPath.object_at(payload_path or "$.payload"), # We will merge the rest with the defaults - "input": sfn.JsonPath.object_at("$"), + "input": sfn.JsonPath.object_at(overrides_path if overrides_path else "$"), }, ) @@ -396,331 +397,3 @@ def start_state(self) -> sfn.State: @property def end_states(self) -> List[sfn.INextable]: return self.definition.end_states - - -class DataSyncFragment(BatchInvokedBaseFragment, EnvBaseConstructMixins): - def __init__( - self, - scope: constructs.Construct, - id: str, - env_base: EnvBase, - aibs_informatics_docker_asset: Union[ecr_assets.DockerImageAsset, str], - batch_job_queue: Union[batch.JobQueue, str], - scaffolding_bucket: s3.Bucket, - mount_point_configs: Optional[Iterable[MountPointConfiguration]] = None, - ) -> None: - """Sync data from one s3 bucket to another - - - Args: - scope (Construct): construct scope - id 
(str): id - env_base (EnvBase): env base - aibs_informatics_docker_asset (DockerImageAsset|str): Docker image asset or image uri - str for the aibs informatics aws lambda - batch_job_queue (JobQueue|str): Default batch job queue or job queue name str that - the batch job will be submitted to. This can be override by the payload. - primary_bucket (Bucket): Primary bucket used for request/response json blobs used in - the batch invoked lambda function. - mount_point_configs (Optional[Iterable[MountPointConfiguration]], optional): - List of mount point configurations to use. These can be overridden in the payload. - - """ - super().__init__(scope, id, env_base) - - aibs_informatics_image_uri = ( - aibs_informatics_docker_asset - if isinstance(aibs_informatics_docker_asset, str) - else aibs_informatics_docker_asset.image_uri - ) - - self.batch_job_queue_name = ( - batch_job_queue if isinstance(batch_job_queue, str) else batch_job_queue.job_queue_name - ) - - start = sfn.Pass( - self, - "Input Restructure", - parameters={ - "handler": "aibs_informatics_aws_lambda.handlers.data_sync.data_sync_handler", - "image": aibs_informatics_image_uri, - "payload": sfn.JsonPath.object_at("$"), - }, - ) - - self.fragment = BatchInvokedLambdaFunction.with_defaults( - self, - "Data Sync", - env_base=self.env_base, - name="data-sync", - job_queue=self.batch_job_queue_name, - bucket_name=scaffolding_bucket.bucket_name, - handler_path="$.handler", - image_path="$.image", - payload_path="$.payload", - memory="1024", - vcpus="1", - mount_point_configs=list(mount_point_configs) if mount_point_configs else None, - environment={ - EnvBase.ENV_BASE_KEY: self.env_base, - "AWS_REGION": self.aws_region, - "AWS_ACCOUNT_ID": self.aws_account, - }, - ) - - self.definition = start.next(self.fragment.to_single_state()) - - @property - def required_managed_policies(self) -> List[Union[iam.IManagedPolicy, str]]: - return [ - *super().required_managed_policies, - *[_ for _ in self.fragment.required_managed_policies], - ] - - @property - def required_inline_policy_statements(self) -> List[iam.PolicyStatement]: - return [ - *self.fragment.required_inline_policy_statements, - *super().required_inline_policy_statements, - sfn_policy_statement( - self.env_base, - actions=SFN_STATES_EXECUTION_ACTIONS + SFN_STATES_READ_ACCESS_ACTIONS, - ), - ] - - -class DemandExecutionFragment(EnvBaseStateMachineFragment, EnvBaseConstructMixins): - def __init__( - self, - scope: constructs.Construct, - id: str, - env_base: EnvBase, - aibs_informatics_docker_asset: Union[ecr_assets.DockerImageAsset, str], - scaffolding_bucket: s3.Bucket, - scaffolding_job_queue: Union[batch.JobQueue, str], - batch_invoked_lambda_state_machine: sfn.StateMachine, - data_sync_state_machine: sfn.StateMachine, - shared_mount_point_config: Optional[MountPointConfiguration], - scratch_mount_point_config: Optional[MountPointConfiguration], - ) -> None: - super().__init__(scope, id, env_base) - - # ----------------- Validation ----------------- - if not (shared_mount_point_config and scratch_mount_point_config) or not ( - shared_mount_point_config or scratch_mount_point_config - ): - raise ValueError( - "If shared or scratch mount point configurations are provided," - "Both shared and scratch mount point configurations must be provided." 
- ) - - # ------------------- Setup ------------------- - - config_scaffolding_path = "config.scaffolding" - config_setup_results_path = f"{config_scaffolding_path}.setup_results" - config_batch_args_path = f"{config_setup_results_path}.batch_args" - - config_cleanup_results_path = f"tasks.cleanup.cleanup_results" - - # Create common kwargs for the batch invoked lambda functions - # - specify the bucket name and job queue - # - specify the mount points and volumes if provided - batch_invoked_lambda_kwargs: dict[str, Any] = { - "bucket_name": scaffolding_bucket.bucket_name, - "image": aibs_informatics_docker_asset - if isinstance(aibs_informatics_docker_asset, str) - else aibs_informatics_docker_asset.image_uri, - "job_queue": scaffolding_job_queue - if isinstance(scaffolding_job_queue, str) - else scaffolding_job_queue.job_queue_name, - } - - # Create request input for the demand scaffolding - file_system_configurations = {} - - # Update arguments with mount points and volumes if provided - if shared_mount_point_config or scratch_mount_point_config: - mount_points = [] - volumes = [] - if shared_mount_point_config: - # update file system configurations for scaffolding function - file_system_configurations["shared"] = { - "file_system": shared_mount_point_config.file_system_id, - "access_point": shared_mount_point_config.access_point_id, - "container_path": shared_mount_point_config.mount_point, - } - # add to mount point and volumes list for batch invoked lambda functions - mount_points.append( - shared_mount_point_config.to_batch_mount_point("shared", sfn_format=True) - ) - volumes.append( - shared_mount_point_config.to_batch_volume("shared", sfn_format=True) - ) - - if scratch_mount_point_config: - # update file system configurations for scaffolding function - file_system_configurations["scratch"] = { - "file_system": scratch_mount_point_config.file_system_id, - "access_point": scratch_mount_point_config.access_point_id, - "container_path": scratch_mount_point_config.mount_point, - } - # add to mount point and volumes list for batch invoked lambda functions - mount_points.append( - scratch_mount_point_config.to_batch_mount_point("scratch", sfn_format=True) - ) - volumes.append( - scratch_mount_point_config.to_batch_volume("scratch", sfn_format=True) - ) - - batch_invoked_lambda_kwargs["mount_points"] = mount_points - batch_invoked_lambda_kwargs["volumes"] = volumes - - start_state = sfn.Pass( - self, - f"Start Demand Batch Task", - parameters={ - "request": { - "demand_execution": sfn.JsonPath.object_at("$"), - "file_system_configurations": file_system_configurations, - } - }, - ) - - prep_scaffolding_task = CommonOperation.enclose_chainable( - self, - "Prepare Demand Scaffolding", - sfn.Pass( - self, - "Pass: Prepare Demand Scaffolding", - parameters={ - "handler": "aibs_informatics_aws_lambda.handlers.demand.scaffolding.handler", - "payload": sfn.JsonPath.object_at("$"), - **batch_invoked_lambda_kwargs, - }, - ).next( - sfn_tasks.StepFunctionsStartExecution( - self, - "SM: Prepare Demand Scaffolding", - state_machine=batch_invoked_lambda_state_machine, - integration_pattern=sfn.IntegrationPattern.RUN_JOB, - associate_with_parent=False, - input_path="$", - output_path=f"$.Output", - ) - ), - input_path="$.request", - result_path=f"$.{config_scaffolding_path}", - ) - - create_def_and_prepare_job_args_task = CommonOperation.enclose_chainable( - self, - "Create Definition and Prep Job Args", - sfn.Pass( - self, - "Pass: Create Definition and Prep Job Args", - parameters={ - "handler": 
"aibs_informatics_aws_lambda.handlers.batch.create.handler", - "payload": sfn.JsonPath.object_at("$"), - **batch_invoked_lambda_kwargs, - }, - ).next( - sfn_tasks.StepFunctionsStartExecution( - self, - "SM: Create Definition and Prep Job Args", - state_machine=batch_invoked_lambda_state_machine, - integration_pattern=sfn.IntegrationPattern.RUN_JOB, - associate_with_parent=False, - input_path="$", - output_path=f"$.Output", - ) - ), - input_path="$.batch_create_request", - result_path=f"$", - ) - - setup_tasks = ( - sfn.Parallel( - self, - "Execution Setup Steps", - input_path=f"$.{config_scaffolding_path}.setup_configs", - result_path=f"$.{'.'.join(config_batch_args_path.split('.')[:-1])}", - result_selector={f'{config_batch_args_path.split(".")[-1]}.$': "$[0]"}, - ) - .branch(create_def_and_prepare_job_args_task) - .branch( - sfn.Map( - self, - "Transfer Inputs TO Batch Job", - items_path="$.data_sync_requests", - ).iterator( - sfn_tasks.StepFunctionsStartExecution( - self, - "Transfer Input", - state_machine=data_sync_state_machine, - integration_pattern=sfn.IntegrationPattern.RUN_JOB, - associate_with_parent=False, - result_path=sfn.JsonPath.DISCARD, - ) - ) - ) - ) - - execution_task = sfn.CustomState( - self, - f"Submit Batch Job", - state_json={ - "Type": "Task", - "Resource": "arn:aws:states:::batch:submitJob.sync", - # fmt: off - "Parameters": { - "JobName.$": sfn.JsonPath.string_at(f"$.{config_batch_args_path}.job_name"), - "JobDefinition.$": sfn.JsonPath.string_at(f"$.{config_batch_args_path}.job_definition_arn"), - "JobQueue.$": sfn.JsonPath.string_at(f"$.{config_batch_args_path}.job_queue_arn"), - "Parameters.$": sfn.JsonPath.object_at(f"$.{config_batch_args_path}.parameters"), - "ContainerOverrides.$": sfn.JsonPath.object_at(f"$.{config_batch_args_path}.container_overrides"), - }, - # fmt: on - "ResultPath": "$.tasks.batch_submit_task", - }, - ) - - cleanup_tasks = sfn.Chain.start( - sfn.Map( - self, - "Transfer Results FROM Batch Job", - input_path=f"$.{config_scaffolding_path}.cleanup_configs.data_sync_requests", - result_path=f"$.{config_cleanup_results_path}.transfer_results", - ).iterator( - sfn_tasks.StepFunctionsStartExecution( - self, - "Transfer Result", - state_machine=data_sync_state_machine, - integration_pattern=sfn.IntegrationPattern.RUN_JOB, - associate_with_parent=False, - result_path=sfn.JsonPath.DISCARD, - ) - ) - ).to_single_state("Execution Cleanup Steps", output_path="$[0]") - - # fmt: off - definition = ( - start_state - .next(prep_scaffolding_task) - .next(setup_tasks) - .next(execution_task) - .next(cleanup_tasks) - ) - # fmt: on - self.definition = definition - - @property - def required_inline_policy_statements(self) -> List[iam.PolicyStatement]: - return [ - *super().required_inline_policy_statements, - batch_policy_statement(self.env_base), - s3_policy_statement(self.env_base), - sfn_policy_statement( - self.env_base, - actions=SFN_STATES_EXECUTION_ACTIONS + SFN_STATES_READ_ACCESS_ACTIONS, - ), - ] diff --git a/src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/data_sync.py b/src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/data_sync.py new file mode 100644 index 0000000..fd4f0ac --- /dev/null +++ b/src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/data_sync.py @@ -0,0 +1,112 @@ +from typing import TYPE_CHECKING, Iterable, List, Optional, Union + +import constructs +from aibs_informatics_core.env import EnvBase +from aws_cdk import aws_batch_alpha as batch +from aws_cdk import aws_ecr_assets as 
ecr_assets +from aws_cdk import aws_iam as iam +from aws_cdk import aws_s3 as s3 +from aws_cdk import aws_stepfunctions as sfn + +from aibs_informatics_cdk_lib.common.aws.iam_utils import ( + SFN_STATES_EXECUTION_ACTIONS, + SFN_STATES_READ_ACCESS_ACTIONS, + sfn_policy_statement, +) +from aibs_informatics_cdk_lib.constructs_.base import EnvBaseConstructMixins +from aibs_informatics_cdk_lib.constructs_.efs.file_system import MountPointConfiguration +from aibs_informatics_cdk_lib.constructs_.sfn.fragments.informatics.batch import ( + BatchInvokedBaseFragment, + BatchInvokedLambdaFunction, +) + + +class DataSyncFragment(BatchInvokedBaseFragment, EnvBaseConstructMixins): + def __init__( + self, + scope: constructs.Construct, + id: str, + env_base: EnvBase, + aibs_informatics_docker_asset: Union[ecr_assets.DockerImageAsset, str], + batch_job_queue: Union[batch.JobQueue, str], + scaffolding_bucket: s3.Bucket, + mount_point_configs: Optional[Iterable[MountPointConfiguration]] = None, + ) -> None: + """Sync data from one s3 bucket to another + + + Args: + scope (Construct): construct scope + id (str): id + env_base (EnvBase): env base + aibs_informatics_docker_asset (DockerImageAsset|str): Docker image asset or image uri + str for the aibs informatics aws lambda + batch_job_queue (JobQueue|str): Default batch job queue or job queue name str that + the batch job will be submitted to. This can be override by the payload. + primary_bucket (Bucket): Primary bucket used for request/response json blobs used in + the batch invoked lambda function. + mount_point_configs (Optional[Iterable[MountPointConfiguration]], optional): + List of mount point configurations to use. These can be overridden in the payload. + + """ + super().__init__(scope, id, env_base) + + aibs_informatics_image_uri = ( + aibs_informatics_docker_asset + if isinstance(aibs_informatics_docker_asset, str) + else aibs_informatics_docker_asset.image_uri + ) + + self.batch_job_queue_name = ( + batch_job_queue if isinstance(batch_job_queue, str) else batch_job_queue.job_queue_name + ) + + start = sfn.Pass( + self, + "Input Restructure", + parameters={ + "handler": "aibs_informatics_aws_lambda.handlers.data_sync.data_sync_handler", + "image": aibs_informatics_image_uri, + "payload": sfn.JsonPath.object_at("$"), + }, + ) + + self.fragment = BatchInvokedLambdaFunction.with_defaults( + self, + "Data Sync", + env_base=self.env_base, + name="data-sync", + job_queue=self.batch_job_queue_name, + bucket_name=scaffolding_bucket.bucket_name, + handler_path="$.handler", + image_path="$.image", + payload_path="$.payload", + memory="1024", + vcpus="1", + mount_point_configs=list(mount_point_configs) if mount_point_configs else None, + environment={ + EnvBase.ENV_BASE_KEY: self.env_base, + "AWS_REGION": self.aws_region, + "AWS_ACCOUNT_ID": self.aws_account, + }, + ) + + self.definition = start.next(self.fragment.to_single_state()) + + @property + def required_managed_policies(self) -> List[Union[iam.IManagedPolicy, str]]: + return [ + *super().required_managed_policies, + *[_ for _ in self.fragment.required_managed_policies], + ] + + @property + def required_inline_policy_statements(self) -> List[iam.PolicyStatement]: + return [ + *self.fragment.required_inline_policy_statements, + *super().required_inline_policy_statements, + sfn_policy_statement( + self.env_base, + actions=SFN_STATES_EXECUTION_ACTIONS + SFN_STATES_READ_ACCESS_ACTIONS, + ), + ] diff --git a/src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/demand_execution.py 
b/src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/demand_execution.py new file mode 100644 index 0000000..4e57e8b --- /dev/null +++ b/src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/demand_execution.py @@ -0,0 +1,259 @@ +from typing import Any, List, Optional, Union + +import constructs +from aibs_informatics_core.env import EnvBase +from aws_cdk import aws_batch_alpha as batch +from aws_cdk import aws_ecr_assets as ecr_assets +from aws_cdk import aws_iam as iam +from aws_cdk import aws_s3 as s3 +from aws_cdk import aws_stepfunctions as sfn +from aws_cdk import aws_stepfunctions_tasks as sfn_tasks + +from aibs_informatics_cdk_lib.common.aws.iam_utils import ( + SFN_STATES_EXECUTION_ACTIONS, + SFN_STATES_READ_ACCESS_ACTIONS, + batch_policy_statement, + s3_policy_statement, + sfn_policy_statement, +) +from aibs_informatics_cdk_lib.constructs_.base import EnvBaseConstructMixins +from aibs_informatics_cdk_lib.constructs_.efs.file_system import MountPointConfiguration +from aibs_informatics_cdk_lib.constructs_.sfn.fragments.base import EnvBaseStateMachineFragment +from aibs_informatics_cdk_lib.constructs_.sfn.states.common import CommonOperation + + +class DemandExecutionFragment(EnvBaseStateMachineFragment, EnvBaseConstructMixins): + def __init__( + self, + scope: constructs.Construct, + id: str, + env_base: EnvBase, + aibs_informatics_docker_asset: Union[ecr_assets.DockerImageAsset, str], + scaffolding_bucket: s3.Bucket, + scaffolding_job_queue: Union[batch.JobQueue, str], + batch_invoked_lambda_state_machine: sfn.StateMachine, + data_sync_state_machine: sfn.StateMachine, + shared_mount_point_config: Optional[MountPointConfiguration], + scratch_mount_point_config: Optional[MountPointConfiguration], + ) -> None: + super().__init__(scope, id, env_base) + + # ----------------- Validation ----------------- + if not (shared_mount_point_config and scratch_mount_point_config) or not ( + shared_mount_point_config or scratch_mount_point_config + ): + raise ValueError( + "If shared or scratch mount point configurations are provided," + "Both shared and scratch mount point configurations must be provided." 
+ ) + + # ------------------- Setup ------------------- + + config_scaffolding_path = "config.scaffolding" + config_setup_results_path = f"{config_scaffolding_path}.setup_results" + config_batch_args_path = f"{config_setup_results_path}.batch_args" + + config_cleanup_results_path = f"tasks.cleanup.cleanup_results" + + # Create common kwargs for the batch invoked lambda functions + # - specify the bucket name and job queue + # - specify the mount points and volumes if provided + batch_invoked_lambda_kwargs: dict[str, Any] = { + "bucket_name": scaffolding_bucket.bucket_name, + "image": aibs_informatics_docker_asset + if isinstance(aibs_informatics_docker_asset, str) + else aibs_informatics_docker_asset.image_uri, + "job_queue": scaffolding_job_queue + if isinstance(scaffolding_job_queue, str) + else scaffolding_job_queue.job_queue_name, + } + + # Create request input for the demand scaffolding + file_system_configurations = {} + + # Update arguments with mount points and volumes if provided + if shared_mount_point_config or scratch_mount_point_config: + mount_points = [] + volumes = [] + if shared_mount_point_config: + # update file system configurations for scaffolding function + file_system_configurations["shared"] = { + "file_system": shared_mount_point_config.file_system_id, + "access_point": shared_mount_point_config.access_point_id, + "container_path": shared_mount_point_config.mount_point, + } + # add to mount point and volumes list for batch invoked lambda functions + mount_points.append( + shared_mount_point_config.to_batch_mount_point("shared", sfn_format=True) + ) + volumes.append( + shared_mount_point_config.to_batch_volume("shared", sfn_format=True) + ) + + if scratch_mount_point_config: + # update file system configurations for scaffolding function + file_system_configurations["scratch"] = { + "file_system": scratch_mount_point_config.file_system_id, + "access_point": scratch_mount_point_config.access_point_id, + "container_path": scratch_mount_point_config.mount_point, + } + # add to mount point and volumes list for batch invoked lambda functions + mount_points.append( + scratch_mount_point_config.to_batch_mount_point("scratch", sfn_format=True) + ) + volumes.append( + scratch_mount_point_config.to_batch_volume("scratch", sfn_format=True) + ) + + batch_invoked_lambda_kwargs["mount_points"] = mount_points + batch_invoked_lambda_kwargs["volumes"] = volumes + + start_state = sfn.Pass( + self, + f"Start Demand Batch Task", + parameters={ + "request": { + "demand_execution": sfn.JsonPath.object_at("$"), + "file_system_configurations": file_system_configurations, + } + }, + ) + + prep_scaffolding_task = CommonOperation.enclose_chainable( + self, + "Prepare Demand Scaffolding", + sfn.Pass( + self, + "Pass: Prepare Demand Scaffolding", + parameters={ + "handler": "aibs_informatics_aws_lambda.handlers.demand.scaffolding.handler", + "payload": sfn.JsonPath.object_at("$"), + **batch_invoked_lambda_kwargs, + }, + ).next( + sfn_tasks.StepFunctionsStartExecution( + self, + "SM: Prepare Demand Scaffolding", + state_machine=batch_invoked_lambda_state_machine, + integration_pattern=sfn.IntegrationPattern.RUN_JOB, + associate_with_parent=False, + input_path="$", + output_path=f"$.Output", + ) + ), + input_path="$.request", + result_path=f"$.{config_scaffolding_path}", + ) + + create_def_and_prepare_job_args_task = CommonOperation.enclose_chainable( + self, + "Create Definition and Prep Job Args", + sfn.Pass( + self, + "Pass: Create Definition and Prep Job Args", + parameters={ + "handler": 
"aibs_informatics_aws_lambda.handlers.batch.create.handler", + "payload": sfn.JsonPath.object_at("$"), + **batch_invoked_lambda_kwargs, + }, + ).next( + sfn_tasks.StepFunctionsStartExecution( + self, + "SM: Create Definition and Prep Job Args", + state_machine=batch_invoked_lambda_state_machine, + integration_pattern=sfn.IntegrationPattern.RUN_JOB, + associate_with_parent=False, + input_path="$", + output_path=f"$.Output", + ) + ), + input_path="$.batch_create_request", + result_path=f"$", + ) + + setup_tasks = ( + sfn.Parallel( + self, + "Execution Setup Steps", + input_path=f"$.{config_scaffolding_path}.setup_configs", + result_path=f"$.{'.'.join(config_batch_args_path.split('.')[:-1])}", + result_selector={f'{config_batch_args_path.split(".")[-1]}.$': "$[0]"}, + ) + .branch(create_def_and_prepare_job_args_task) + .branch( + sfn.Map( + self, + "Transfer Inputs TO Batch Job", + items_path="$.data_sync_requests", + ).iterator( + sfn_tasks.StepFunctionsStartExecution( + self, + "Transfer Input", + state_machine=data_sync_state_machine, + integration_pattern=sfn.IntegrationPattern.RUN_JOB, + associate_with_parent=False, + result_path=sfn.JsonPath.DISCARD, + ) + ) + ) + ) + + execution_task = sfn.CustomState( + self, + f"Submit Batch Job", + state_json={ + "Type": "Task", + "Resource": "arn:aws:states:::batch:submitJob.sync", + # fmt: off + "Parameters": { + "JobName.$": sfn.JsonPath.string_at(f"$.{config_batch_args_path}.job_name"), + "JobDefinition.$": sfn.JsonPath.string_at(f"$.{config_batch_args_path}.job_definition_arn"), + "JobQueue.$": sfn.JsonPath.string_at(f"$.{config_batch_args_path}.job_queue_arn"), + "Parameters.$": sfn.JsonPath.object_at(f"$.{config_batch_args_path}.parameters"), + "ContainerOverrides.$": sfn.JsonPath.object_at(f"$.{config_batch_args_path}.container_overrides"), + }, + # fmt: on + "ResultPath": "$.tasks.batch_submit_task", + }, + ) + + cleanup_tasks = sfn.Chain.start( + sfn.Map( + self, + "Transfer Results FROM Batch Job", + input_path=f"$.{config_scaffolding_path}.cleanup_configs.data_sync_requests", + result_path=f"$.{config_cleanup_results_path}.transfer_results", + ).iterator( + sfn_tasks.StepFunctionsStartExecution( + self, + "Transfer Result", + state_machine=data_sync_state_machine, + integration_pattern=sfn.IntegrationPattern.RUN_JOB, + associate_with_parent=False, + result_path=sfn.JsonPath.DISCARD, + ) + ) + ).to_single_state("Execution Cleanup Steps", output_path="$[0]") + + # fmt: off + definition = ( + start_state + .next(prep_scaffolding_task) + .next(setup_tasks) + .next(execution_task) + .next(cleanup_tasks) + ) + # fmt: on + self.definition = definition + + @property + def required_inline_policy_statements(self) -> List[iam.PolicyStatement]: + return [ + *super().required_inline_policy_statements, + batch_policy_statement(self.env_base), + s3_policy_statement(self.env_base), + sfn_policy_statement( + self.env_base, + actions=SFN_STATES_EXECUTION_ACTIONS + SFN_STATES_READ_ACCESS_ACTIONS, + ), + ] diff --git a/src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/efs.py b/src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/efs.py new file mode 100644 index 0000000..df42f99 --- /dev/null +++ b/src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/efs.py @@ -0,0 +1,310 @@ +from dataclasses import dataclass +from typing import Iterable, List, Optional, Union + +import constructs +from aibs_informatics_core.env import EnvBase +from aws_cdk import aws_batch_alpha as batch +from aws_cdk import aws_ecr_assets as 
ecr_assets +from aws_cdk import aws_efs as efs +from aws_cdk import aws_events as events +from aws_cdk import aws_events_targets as events_targets +from aws_cdk import aws_s3 as s3 +from aws_cdk import aws_stepfunctions as sfn + +from aibs_informatics_cdk_lib.constructs_.efs.file_system import MountPointConfiguration +from aibs_informatics_cdk_lib.constructs_.sfn.fragments.informatics.batch import ( + BatchInvokedBaseFragment, + BatchInvokedLambdaFunction, +) + + +def get_data_path_stats_fragment( + scope: constructs.Construct, + id: str, + env_base: EnvBase, + aibs_informatics_docker_asset: Union[ecr_assets.DockerImageAsset, str], + batch_job_queue: Union[batch.JobQueue, str], + scaffolding_bucket: s3.Bucket, + mount_point_configs: Optional[Iterable[MountPointConfiguration]] = None, + memory: int = 1024, + vcpus: int = 1, +) -> BatchInvokedLambdaFunction: + """Returns a BatchInvokedLambdaFunction fragment for getting data path stats of EFS/S3 path + + Args: + scope (constructs.Construct): scope + id (str): id of the fragment + env_base (EnvBase): env base + aibs_informatics_docker_asset (Union[ecr_assets.DockerImageAsset, str]): docker image asset or image uri + that has the get_data_path_stats_handler function + batch_job_queue (Union[batch.JobQueue, str]): default batch job queue or job queue name str that + the batch job will be submitted to. This can be override by the payload. + scaffolding_bucket (s3.Bucket): primary bucket used for request/response json blobs used in + mount_point_configs (Optional[Iterable[MountPointConfiguration]], optional): Default EFS volumes to mount. + Defaults to None. + memory (int, optional): memory needed. Defaults to 1024. + vcpus (int, optional): vcpus needed. Defaults to 1. + + Returns: + BatchInvokedLambdaFunction fragment for getting data path stats + """ + fragment = BatchInvokedLambdaFunction( + scope=scope, + id=id, + env_base=env_base, + name="get-data-path-stats", + image=( + aibs_informatics_docker_asset + if isinstance(aibs_informatics_docker_asset, str) + else aibs_informatics_docker_asset.image_uri + ), + handler="aibs_informatics_aws_lambda.handlers.data_sync.get_data_path_stats_handler", + job_queue=( + batch_job_queue if isinstance(batch_job_queue, str) else batch_job_queue.job_queue_name + ), + bucket_name=scaffolding_bucket.bucket_name, + memory=memory, + vcpus=vcpus, + mount_point_configs=list(mount_point_configs) if mount_point_configs else None, + ) + return fragment + + +def outdated_data_path_scanner_fragment( + scope: constructs.Construct, + id: str, + env_base: EnvBase, + aibs_informatics_docker_asset: Union[ecr_assets.DockerImageAsset, str], + batch_job_queue: Union[batch.JobQueue, str], + scaffolding_bucket: s3.Bucket, + mount_point_configs: Optional[Iterable[MountPointConfiguration]] = None, + memory: int = 1024, + vcpus: int = 1, +) -> BatchInvokedLambdaFunction: + """Returns a BatchInvokedLambdaFunction fragment for scanning outdated data paths of EFS/S3 path root + + Args: + scope (constructs.Construct): scope + id (str): id of the fragment + env_base (EnvBase): env base + aibs_informatics_docker_asset (Union[ecr_assets.DockerImageAsset, str]): docker image asset or image uri + that has the outdated_data_path_scanner_handler function + batch_job_queue (Union[batch.JobQueue, str]): default batch job queue or job queue name str that + the batch job will be submitted to. This can be override by the payload. 
+ scaffolding_bucket (s3.Bucket): primary bucket used for request/response json blobs used in + mount_point_configs (Optional[Iterable[MountPointConfiguration]], optional): Default EFS volumes to mount. + Defaults to None. + memory (int, optional): memory needed. Defaults to 1024. + vcpus (int, optional): vcpus needed. Defaults to 1. + + Returns: + BatchInvokedLambdaFunction fragment for scanning outdated data paths + """ + + fragment = BatchInvokedLambdaFunction( + scope=scope, + id=id, + env_base=env_base, + name="outdated-data-path-scanner", + image=( + aibs_informatics_docker_asset + if isinstance(aibs_informatics_docker_asset, str) + else aibs_informatics_docker_asset.image_uri + ), + handler="aibs_informatics_aws_lambda.handlers.data_sync.outdated_data_path_scanner_handler", + job_queue=( + batch_job_queue if isinstance(batch_job_queue, str) else batch_job_queue.job_queue_name + ), + bucket_name=scaffolding_bucket.bucket_name, + memory=memory, + vcpus=vcpus, + # mount_points=mount_points, + # volumes=volumes, + mount_point_configs=list(mount_point_configs) if mount_point_configs else None, + ) + return fragment + + +def remove_data_paths_fragment( + scope: constructs.Construct, + id: str, + env_base: EnvBase, + aibs_informatics_docker_asset: Union[ecr_assets.DockerImageAsset, str], + batch_job_queue: Union[batch.JobQueue, str], + scaffolding_bucket: s3.Bucket, + mount_point_configs: Optional[Iterable[MountPointConfiguration]] = None, + memory: int = 1024, + vcpus: int = 1, +) -> BatchInvokedLambdaFunction: + """Returns a BatchInvokedLambdaFunction fragment for removing data paths (EFS / S3) during execution of a Step Function + + Args: + scope (constructs.Construct): scope + id (str): id of the fragment + env_base (EnvBase): env base + aibs_informatics_docker_asset (Union[ecr_assets.DockerImageAsset, str]): docker image asset or image uri + that has the remove_data_paths_handler function + batch_job_queue (Union[batch.JobQueue, str]): default batch job queue or job queue name str that + the batch job will be submitted to. This can be override by the payload. + scaffolding_bucket (s3.Bucket): primary bucket used for request/response json blobs used in + mount_point_configs (Optional[Iterable[MountPointConfiguration]], optional): Default EFS volumes to mount. + Defaults to None. + memory (int, optional): memory needed. Defaults to 1024. + vcpus (int, optional): vcpus needed. Defaults to 1. 
+ + Returns: + BatchInvokedLambdaFunction fragment for removing data paths + """ + fragment = BatchInvokedLambdaFunction( + scope=scope, + id=id, + env_base=env_base, + name="remove-data-paths", + image=( + aibs_informatics_docker_asset + if isinstance(aibs_informatics_docker_asset, str) + else aibs_informatics_docker_asset.image_uri + ), + handler="aibs_informatics_aws_lambda.handlers.data_sync.remove_data_paths_handler", + job_queue=( + batch_job_queue if isinstance(batch_job_queue, str) else batch_job_queue.job_queue_name + ), + bucket_name=scaffolding_bucket.bucket_name, + memory=memory, + vcpus=vcpus, + mount_point_configs=list(mount_point_configs) if mount_point_configs else None, + ) + return fragment + + +@dataclass +class CleanFileSystemTriggerConfig: + file_system: efs.FileSystem + path: str + days_since_last_accessed: float = 3.0 + max_depth: Optional[int] = None + min_depth: Optional[int] = None + min_size_bytes_allowed: int = 0 + + schedule: events.Schedule = events.Schedule.cron(minute="0", hour="9") + + def to_dict(self): + d = { + "path": f"{self.file_system.file_system_id}:{self.path}", + "days_since_last_accessed": self.days_since_last_accessed, + } + if self.max_depth is not None: + d["max_depth"] = self.max_depth + if self.min_depth is not None: + d["min_depth"] = self.min_depth + if self.min_size_bytes_allowed > 0: + d["min_size_bytes_allowed"] = self.min_size_bytes_allowed + return d + + +@dataclass +class CleanFileSystemTriggerRuleConfig: + rule_name: str + trigger_configs: List[CleanFileSystemTriggerConfig] + # https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-create-rule-schedule.html#cron-expressions + # Want to run around 00:00 in PST by default + schedule: events.Schedule = events.Schedule.cron(minute="0", hour="9") + + @property + def description(self): + return ( + "Daily trigger for EFS file cleanup (older than " + f"{sorted({_.days_since_last_accessed for _ in self.trigger_configs})} " + "days) for target subdirectories" + ) + + def create_rule( + self, + scope: constructs.Construct, + clean_file_system_state_machine: sfn.StateMachine, + ): + return events.Rule( + scope, + self.rule_name, + rule_name=self.rule_name, + description=self.description, + enabled=True, + schedule=self.schedule, + targets=[ + events_targets.SfnStateMachine( + clean_file_system_state_machine, + input=events.RuleTargetInput.from_object(config.to_dict()), + ) + for config in self.trigger_configs + ], # type: ignore[arg-type] # jsii implementation issue - https://github.com/aws/jsii/issues/4531 + ) + + +class CleanFileSystemFragment(BatchInvokedBaseFragment): + def __init__( + self, + scope: constructs.Construct, + id: str, + env_base: EnvBase, + aibs_informatics_docker_asset: Union[ecr_assets.DockerImageAsset, str], + batch_job_queue: Union[batch.JobQueue, str], + scaffolding_bucket: s3.Bucket, + mount_point_configs: Optional[Iterable[MountPointConfiguration]] = None, + ) -> None: + """Clean up the file system + + + Args: + scope (Construct): construct scope + id (str): id + env_base (EnvBase): env base + aibs_informatics_docker_asset (DockerImageAsset|str): Docker image asset or image uri + str for the aibs informatics aws lambda + batch_job_queue (JobQueue|str): Default batch job queue or job queue name str that + the batch job will be submitted to. This can be override by the payload. + primary_bucket (Bucket): Primary bucket used for request/response json blobs used in + the batch invoked lambda function. 
+ mount_point_configs (Optional[Iterable[MountPointConfiguration]], optional): + List of mount point configurations to use. These can be overridden in the payload. + + """ + super().__init__(scope, id, env_base) + + aibs_informatics_image_uri = ( + aibs_informatics_docker_asset + if isinstance(aibs_informatics_docker_asset, str) + else aibs_informatics_docker_asset.image_uri + ) + + start_pass_state = sfn.Pass( + self, + "Data Cleanup: Start", + ) + + self.outdated_data_path_scanner = outdated_data_path_scanner_fragment( + self, + "Scan for Outdated Data Paths", + env_base=self.env_base, + aibs_informatics_docker_asset=aibs_informatics_image_uri, + batch_job_queue=batch_job_queue, + scaffolding_bucket=scaffolding_bucket, + mount_point_configs=mount_point_configs, + ) + + self.remove_data_paths = remove_data_paths_fragment( + self, + "Remove Data Paths", + env_base=self.env_base, + aibs_informatics_docker_asset=aibs_informatics_image_uri, + batch_job_queue=batch_job_queue, + scaffolding_bucket=scaffolding_bucket, + mount_point_configs=mount_point_configs, + ) + + # fmt: off + self.definition = ( + start_pass_state + .next(self.outdated_data_path_scanner.enclose()) + .next(self.remove_data_paths.enclose()) + ) + # fmt: on diff --git a/src/aibs_informatics_core_app/stacks/demand_execution.py b/src/aibs_informatics_core_app/stacks/demand_execution.py index 182bcd3..ca83509 100644 --- a/src/aibs_informatics_core_app/stacks/demand_execution.py +++ b/src/aibs_informatics_core_app/stacks/demand_execution.py @@ -1,6 +1,11 @@ from typing import Iterable, Optional, Union import constructs +from aibs_informatics_aws_utils.constants.efs import ( + EFS_SCRATCH_PATH, + EFS_SHARED_PATH, + EFS_TMP_PATH, +) from aibs_informatics_core.env import EnvBase from aws_cdk import aws_batch_alpha as batch from aws_cdk import aws_ec2 as ec2 @@ -23,6 +28,11 @@ DataSyncFragment, DemandExecutionFragment, ) +from aibs_informatics_cdk_lib.constructs_.sfn.fragments.informatics.efs import ( + CleanFileSystemFragment, + CleanFileSystemTriggerConfig, + CleanFileSystemTriggerRuleConfig, +) from aibs_informatics_cdk_lib.stacks.base import EnvBaseStack DATA_SYNC_ASSET_NAME = "aibs_informatics_aws_lambda" @@ -150,3 +160,36 @@ def __init__( scratch_mount_point_config=scratch_mount_point_config, ) self.demand_execution_state_machine = demand_execution.to_state_machine("demand-execution") + + ## EFS Cleanup + + clean_fs = CleanFileSystemFragment( + self, + "clean-file-system", + env_base=self.env_base, + aibs_informatics_docker_asset=self._assets.docker_assets.AIBS_INFORMATICS_AWS_LAMBDA, + batch_job_queue=self.execution_job_queue, + scaffolding_bucket=scaffolding_bucket, + mount_point_configs=[root_mount_point_config], + ) + self.clean_file_system_state_machine = clean_fs.to_state_machine("clean-file-system") + + CleanFileSystemTriggerRuleConfig( + rule_name="clean-file-system-trigger", + trigger_configs=[ + CleanFileSystemTriggerConfig( + file_system=self.efs_ecosystem.file_system, + path=path, + days_since_last_accessed=days_since_last_accessed, + max_depth=max_depth, + min_depth=min_depth, + min_size_bytes_allowed=0, + ) + for path, days_since_last_accessed, min_depth, max_depth in [ + (EFS_TMP_PATH, 3.0, 1, 1), + (EFS_SCRATCH_PATH, 3.0, 1, 1), + (f"{EFS_SCRATCH_PATH}/tmp", 3.0, 1, 1), + (EFS_SHARED_PATH, 3.0, 1, 1), + ] + ], + ).create_rule(self, clean_file_system_state_machine=self.clean_file_system_state_machine)
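
Usage sketch (not part of the diff above): a minimal, hypothetical illustration of two of the new knobs introduced in this change, `instance_role_name` on the `Batch` construct and `overrides_path` on `BatchInvokedLambdaFunction.with_defaults`. Every value named `my-*` below is a placeholder, not something defined in this PR; only the keyword arguments come from the changed signatures, and other arguments accepted by `with_defaults` (image/handler/payload paths, memory, vcpus, mount points) are omitted for brevity.

    import constructs
    from aibs_informatics_core.env import EnvBase
    from aws_cdk import aws_ec2 as ec2

    from aibs_informatics_cdk_lib.constructs_.batch.infrastructure import Batch
    from aibs_informatics_cdk_lib.constructs_.sfn.fragments.informatics import (
        BatchInvokedLambdaFunction,
    )


    def build_batch_resources(
        scope: constructs.Construct, env_base: EnvBase, vpc: ec2.IVpc, bucket_name: str
    ) -> None:
        # Pin the EC2 instance role name instead of letting CloudFormation generate one.
        batch = Batch(
            scope,
            "batch",
            env_base=env_base,
            vpc=vpc,
            instance_role_name="my-batch-instance-role",  # hypothetical role name
        )

        # Merge overrides from a sub-path of the state input instead of the whole
        # input; the default ("$") preserves the pre-change behavior.
        BatchInvokedLambdaFunction.with_defaults(
            scope,
            "My Batch Invoked Lambda",
            env_base=env_base,
            name="my-task",  # hypothetical task name
            job_queue=batch.environments[0].job_queue.job_queue_name
            if batch.environments
            else "my-job-queue",  # hypothetical fallback queue name
            bucket_name=bucket_name,
            overrides_path="$.overrides",  # hypothetical location of override values
        )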