diff --git a/src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/batch.py b/src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/batch.py index aaff54d..23d5da5 100644 --- a/src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/batch.py +++ b/src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/batch.py @@ -205,6 +205,7 @@ def with_defaults( image_path: Optional[str] = None, handler_path: Optional[str] = None, payload_path: Optional[str] = None, + overrides_path: Optional[str] = None, command: Optional[List[str]] = None, memory: str = "1024", vcpus: str = "1", @@ -260,7 +261,7 @@ def with_defaults( "handler": sfn.JsonPath.string_at(handler_path or "$.handler"), "payload": sfn.JsonPath.object_at(payload_path or "$.payload"), # We will merge the rest with the defaults - "input": sfn.JsonPath.object_at("$"), + "input": sfn.JsonPath.object_at(overrides_path if overrides_path else "$"), }, ) diff --git a/src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/efs.py b/src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/efs.py index 6aeaf09..df42f99 100644 --- a/src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/efs.py +++ b/src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/efs.py @@ -29,6 +29,25 @@ def get_data_path_stats_fragment( memory: int = 1024, vcpus: int = 1, ) -> BatchInvokedLambdaFunction: + """Returns a BatchInvokedLambdaFunction fragment for getting data path stats of EFS/S3 path + + Args: + scope (constructs.Construct): scope + id (str): id of the fragment + env_base (EnvBase): env base + aibs_informatics_docker_asset (Union[ecr_assets.DockerImageAsset, str]): docker image asset or image uri + that has the get_data_path_stats_handler function + batch_job_queue (Union[batch.JobQueue, str]): default batch job queue or job queue name str that + the batch job will be submitted to. This can be override by the payload. + scaffolding_bucket (s3.Bucket): primary bucket used for request/response json blobs used in + mount_point_configs (Optional[Iterable[MountPointConfiguration]], optional): Default EFS volumes to mount. + Defaults to None. + memory (int, optional): memory needed. Defaults to 1024. + vcpus (int, optional): vcpus needed. Defaults to 1. + + Returns: + BatchInvokedLambdaFunction fragment for getting data path stats + """ fragment = BatchInvokedLambdaFunction( scope=scope, id=id, @@ -62,12 +81,25 @@ def outdated_data_path_scanner_fragment( memory: int = 1024, vcpus: int = 1, ) -> BatchInvokedLambdaFunction: - # if mount_point_configs is not None: - # mount_points, volumes = BatchInvokedLambdaFunction.convert_to_mount_point_and_volumes(list(mount_point_configs)) + """Returns a BatchInvokedLambdaFunction fragment for scanning outdated data paths of EFS/S3 path root + + Args: + scope (constructs.Construct): scope + id (str): id of the fragment + env_base (EnvBase): env base + aibs_informatics_docker_asset (Union[ecr_assets.DockerImageAsset, str]): docker image asset or image uri + that has the outdated_data_path_scanner_handler function + batch_job_queue (Union[batch.JobQueue, str]): default batch job queue or job queue name str that + the batch job will be submitted to. This can be override by the payload. + scaffolding_bucket (s3.Bucket): primary bucket used for request/response json blobs used in + mount_point_configs (Optional[Iterable[MountPointConfiguration]], optional): Default EFS volumes to mount. + Defaults to None. + memory (int, optional): memory needed. Defaults to 1024. + vcpus (int, optional): vcpus needed. Defaults to 1. - # else: - # mount_points = None - # volumes = None + Returns: + BatchInvokedLambdaFunction fragment for scanning outdated data paths + """ fragment = BatchInvokedLambdaFunction( scope=scope, @@ -104,6 +136,25 @@ def remove_data_paths_fragment( memory: int = 1024, vcpus: int = 1, ) -> BatchInvokedLambdaFunction: + """Returns a BatchInvokedLambdaFunction fragment for removing data paths (EFS / S3) during execution of a Step Function + + Args: + scope (constructs.Construct): scope + id (str): id of the fragment + env_base (EnvBase): env base + aibs_informatics_docker_asset (Union[ecr_assets.DockerImageAsset, str]): docker image asset or image uri + that has the remove_data_paths_handler function + batch_job_queue (Union[batch.JobQueue, str]): default batch job queue or job queue name str that + the batch job will be submitted to. This can be override by the payload. + scaffolding_bucket (s3.Bucket): primary bucket used for request/response json blobs used in + mount_point_configs (Optional[Iterable[MountPointConfiguration]], optional): Default EFS volumes to mount. + Defaults to None. + memory (int, optional): memory needed. Defaults to 1024. + vcpus (int, optional): vcpus needed. Defaults to 1. + + Returns: + BatchInvokedLambdaFunction fragment for removing data paths + """ fragment = BatchInvokedLambdaFunction( scope=scope, id=id, @@ -253,7 +304,7 @@ def __init__( # fmt: off self.definition = ( start_pass_state - .next(self.outdated_data_path_scanner.enclose("1Scan for Outdated Data Paths")) - .next(self.remove_data_paths.enclose("1Remove Data Paths")) + .next(self.outdated_data_path_scanner.enclose()) + .next(self.remove_data_paths.enclose()) ) # fmt: on