From 7f535f549b3fd7196e9e779f96ea94b79b380eed Mon Sep 17 00:00:00 2001 From: Steven Meyer <108885656+meyertst-aws@users.noreply.github.com> Date: Thu, 21 Nov 2024 16:25:52 -0500 Subject: [PATCH] tests done --- .doc_gen/metadata/healthlake_metadata.yaml | 2 +- python/example_code/healthlake/README.md | 127 +---- .../healthlake/health_lake_wrapper.py | 69 +-- .../example_code/healthlake/test/conftest.py | 8 +- .../test/test_health_lake_wrapper.py | 289 +++++++++++ .../test/test_medical_imaging_basics.py | 482 ------------------ python/test_tools/healthlake_stubber.py | 345 +++++++++++++ python/test_tools/stubber_factory.py | 3 + 8 files changed, 700 insertions(+), 625 deletions(-) create mode 100644 python/example_code/healthlake/test/test_health_lake_wrapper.py delete mode 100644 python/example_code/healthlake/test/test_medical_imaging_basics.py create mode 100644 python/test_tools/healthlake_stubber.py diff --git a/.doc_gen/metadata/healthlake_metadata.yaml b/.doc_gen/metadata/healthlake_metadata.yaml index 6e159338307..850302d60cf 100644 --- a/.doc_gen/metadata/healthlake_metadata.yaml +++ b/.doc_gen/metadata/healthlake_metadata.yaml @@ -11,7 +11,7 @@ healthlake_CreateFHIRDatastore: - python.example_code.healthlake.HealthLakeWrapper.decl - python.example_code.healthlake.CreateFHIRDatastore - description: > - The following code shows the parameters for SMART on + The following code shows an example of parameters for a SMART on FHIR-enabled HealthLake data store. snippet_tags: - python.example_code.healthlake.CreateFHIRDatastore.smart diff --git a/python/example_code/healthlake/README.md b/python/example_code/healthlake/README.md index 46974e528ce..6cc8b900788 100644 --- a/python/example_code/healthlake/README.md +++ b/python/example_code/healthlake/README.md @@ -1,13 +1,13 @@ -# HealthImaging code examples for the SDK for Python +# HealthLake code examples for the SDK for Python ## Overview -Shows how to use the AWS SDK for Python (Boto3) to work with AWS HealthImaging. +Shows how to use the AWS SDK for Python (Boto3) to work with AWS HealthLake. -_HealthImaging is a HIPAA-eligible service that helps health care providers and their medical imaging ISV partners store, transform, and apply machine learning to medical images._ +_HealthLake _ ## ⚠ Important @@ -34,42 +34,23 @@ python -m pip install -r requirements.txt -### Get started - -- [Hello HealthImaging](imaging_set_and_frames_workflow/hello.py#L4) (`ListDatastores`) - - ### Single actions Code excerpts that show you how to call individual service functions. -- [CopyImageSet](health_lake_wrapper.py#L417) -- [CreateDatastore](health_lake_wrapper.py#L31) -- [DeleteDatastore](health_lake_wrapper.py#L104) -- [DeleteImageSet](health_lake_wrapper.py#L489) -- [GetDICOMImportJob](health_lake_wrapper.py#L158) -- [GetDatastore](health_lake_wrapper.py#L54) -- [GetImageFrame](health_lake_wrapper.py#L318) -- [GetImageSet](health_lake_wrapper.py#L241) -- [GetImageSetMetadata](health_lake_wrapper.py#L274) -- [ListDICOMImportJobs](health_lake_wrapper.py#L183) -- [ListDatastores](health_lake_wrapper.py#L79) -- [ListImageSetVersions](health_lake_wrapper.py#L350) -- [ListTagsForResource](health_lake_wrapper.py#L556) -- [SearchImageSets](health_lake_wrapper.py#L211) -- [StartDICOMImportJob](health_lake_wrapper.py#L124) -- [TagResource](health_lake_wrapper.py#L514) -- [UntagResource](health_lake_wrapper.py#L534) -- [UpdateImageSetMetadata](health_lake_wrapper.py#L381) - -### Scenarios - -Code examples that show you how to accomplish a specific task by calling multiple -functions within the same service. - -- [Get started with image sets and image frames](imaging_set_and_frames_workflow/imaging_set_and_frames.py) -- [Tagging a data store](tagging_data_stores.py) -- [Tagging an image set](tagging_image_sets.py) +- [CreateFHIRDatastore](health_lake_wrapper.py#L42) +- [DeleteFHIRDatastore](health_lake_wrapper.py#L136) +- [DescribeFHIRDatastore](health_lake_wrapper.py#L84) +- [DescribeFHIRExportJob](health_lake_wrapper.py#L310) +- [DescribeFHIRImportJob](health_lake_wrapper.py#L197) +- [ListFHIRDatastoreImportJobs](health_lake_wrapper.py#L222) +- [ListFHIRDatastores](health_lake_wrapper.py#L106) +- [ListFHIRExportJobs](health_lake_wrapper.py#L335) +- [ListTagsForResource](health_lake_wrapper.py#L404) +- [StartFHIRExportJob](health_lake_wrapper.py#L272) +- [StartFHIRImportJob](health_lake_wrapper.py#L154) +- [TagResource](health_lake_wrapper.py#L385) +- [UntagResource](health_lake_wrapper.py#L426) @@ -83,77 +64,7 @@ functions within the same service. -#### Hello HealthImaging - -This example shows you how to get started using HealthImaging. - -``` -python imaging_set_and_frames_workflow/hello.py -``` - - -#### Get started with image sets and image frames - -This example shows you how to import DICOM files and download image frames in HealthImaging. - The implementation is structured as a workflow command-line - application. - - -- Set up resources for a DICOM import. -- Import DICOM files into a data store. -- Retrieve the image set IDs for the import job. -- Retrieve the image frame IDs for the image sets. -- Download, decode and verify the image frames. -- Clean up resources. - - - - -Start the example by running the following at a command prompt: - -``` -python imaging_set_and_frames_workflow/imaging_set_and_frames.py -``` - - - - - -#### Tagging a data store - -This example shows you how to tag a HealthImaging data store. - - - - - -Start the example by running the following at a command prompt: - -``` -python tagging_data_stores.py -``` - - - - - -#### Tagging an image set - -This example shows you how to tag a HealthImaging image set. - - - - - -Start the example by running the following at a command prompt: - -``` -python tagging_image_sets.py -``` - - - ### Tests @@ -170,9 +81,9 @@ in the `python` folder. ## Additional resources -- [HealthImaging Developer Guide](https://docs.aws.amazon.com/healthimaging/latest/devguide/what-is.html) -- [HealthImaging API Reference](https://docs.aws.amazon.com/healthimaging/latest/APIReference/Welcome.html) -- [SDK for Python HealthImaging reference](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/medical-imaging.html) +- [HealthLake Developer Guide](https://docs.aws.amazon.com/healthlake/latest/devguide/what-is-amazon-health-lake.html) +- [HealthLake API Reference](https://docs.aws.amazon.com/healthlake/latest/APIReference/Welcome.html) +- [SDK for Python HealthLake reference](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/medical-imaging.html) diff --git a/python/example_code/healthlake/health_lake_wrapper.py b/python/example_code/healthlake/health_lake_wrapper.py index d5aea3febdc..84231ddb80b 100644 --- a/python/example_code/healthlake/health_lake_wrapper.py +++ b/python/example_code/healthlake/health_lake_wrapper.py @@ -34,13 +34,13 @@ def from_client(cls) -> "HealthLakeWrapper": :return: An instance of HealthLakeWrapper initialized with the default HealthLake client. """ - kms_client = boto3.client("healthlake") - return cls(kms_client) + health_lake_client = boto3.client("healthlake") + return cls(health_lake_client) # snippet-end:[python.example_code.healthlake.HealthLakeWrapper.decl] # snippet-start:[python.example_code.healthlake.CreateFHIRDatastore] - def create_fihr_datastore( + def create_fhir_datastore( self, datastore_name: str, sse_configuration: dict[str, any] = None, @@ -73,7 +73,7 @@ def create_fihr_datastore( return response except ClientError as err: logger.exception( - "Couldn't create datastore %s. Here's why", + "Couldn't create datastore %s. Here's why %s", datastore_name, err.response["Error"]["Message"], ) @@ -95,7 +95,7 @@ def describe_fhir_datastore(self, datastore_id: str) -> dict[str, any]: return response["DatastoreProperties"] except ClientError as err: logger.exception( - "Couldn't describe datastore with ID %s. Here's why", + "Couldn't describe datastore with ID %s. Here's why %s", datastore_id, err.response["Error"]["Message"], ) @@ -112,6 +112,8 @@ def list_fhir_datastores(self) -> list[dict[str, any]]: try: next_token = None datastores = [] + + # Loop through paginated results. while True: parameters = {} if next_token is not None: @@ -122,11 +124,11 @@ def list_fhir_datastores(self) -> list[dict[str, any]]: next_token = response["NextToken"] else: break - response = self.health_lake_client.list_fhir_datastores() - return response["DatastorePropertiesList"] + + return datastores except ClientError as err: logger.exception( - "Couldn't list datastores. Here's why", err.response["Error"]["Message"] + "Couldn't list datastores. Here's why %s", err.response["Error"]["Message"] ) raise # snippet-end:[python.example_code.healthlake.ListFHIRDatastores] @@ -141,7 +143,7 @@ def delete_fhir_datastore(self, datastore_id: str) -> None: self.health_lake_client.delete_fhir_datastore(DatastoreId=datastore_id) except ClientError as err: logger.exception( - "Couldn't delete datastore with ID %s. Here's why", + "Couldn't delete datastore with ID %s. Here's why %s", datastore_id, err.response["Error"]["Message"], ) @@ -150,7 +152,7 @@ def delete_fhir_datastore(self, datastore_id: str) -> None: # snippet-end:[python.example_code.healthlake.DeleteFHIRDatastore] # snippet-start:[python.example_code.healthlake.StartFHIRImportJob] - def start_fihr_import_job( + def start_fhir_import_job( self, job_name: str, datastore_id: str, @@ -185,7 +187,7 @@ def start_fihr_import_job( return response except ClientError as err: logger.exception( - "Couldn't start import job. Here's why", + "Couldn't start import job. Here's why %s", err.response["Error"]["Message"], ) raise @@ -193,7 +195,7 @@ def start_fihr_import_job( # snippet-end:[python.example_code.healthlake.StartFHIRImportJob] # snippet-start:[python.example_code.healthlake.DescribeFHIRImportJob] - def describe_fihr_import_job( + def describe_fhir_import_job( self, datastore_id: str, job_id: str ) -> dict[str, any]: """ @@ -209,7 +211,7 @@ def describe_fihr_import_job( return response["ImportJobProperties"] except ClientError as err: logger.exception( - "Couldn't describe import job with ID %s. Here's why", + "Couldn't describe import job with ID %s. Here's why %s", job_id, err.response["Error"]["Message"], ) @@ -247,6 +249,7 @@ def list_fhir_import_jobs( parameters["SubmittedAfter"] = submitted_after next_token = None jobs = [] + # Loop through paginated results. while True: if next_token is not None: parameters["NextToken"] = next_token @@ -259,7 +262,7 @@ def list_fhir_import_jobs( return jobs except ClientError as err: logger.exception( - "Couldn't list import jobs. Here's why", + "Couldn't list import jobs. Here's why %s", err.response["Error"]["Message"], ) raise @@ -297,7 +300,7 @@ def start_fhir_export_job( return response except ClientError as err: logger.exception( - "Couldn't start export job. Here's why", + "Couldn't start export job. Here's why %s", err.response["Error"]["Message"], ) raise @@ -321,7 +324,7 @@ def describe_fhir_export_job( return response["ExportJobProperties"] except ClientError as err: logger.exception( - "Couldn't describe export job with ID %s. Here's why", + "Couldn't describe export job with ID %s. Here's why %s", job_id, err.response["Error"]["Message"], ) @@ -359,6 +362,7 @@ def list_fhir_export_jobs( parameters["SubmittedAfter"] = submitted_after next_token = None jobs = [] + # Loop through paginated results. while True: if next_token is not None: parameters["NextToken"] = next_token @@ -371,7 +375,7 @@ def list_fhir_export_jobs( return jobs except ClientError as err: logger.exception( - "Couldn't list export jobs. Here's why", + "Couldn't list export jobs. Here's why %s", err.response["Error"]["Message"], ) raise @@ -389,7 +393,7 @@ def tag_resource(self, resource_arn: str, tags: list[dict[str, str]]) -> None: self.health_lake_client.tag_resource(ResourceARN=resource_arn, Tags=tags) except ClientError as err: logger.exception( - "Couldn't tag resource %s. Here's why", + "Couldn't tag resource %s. Here's why %s", resource_arn, err.response["Error"]["Message"], ) @@ -411,7 +415,7 @@ def list_tags_for_resource(self, resource_arn: str) -> dict[str, str]: return response["Tags"] except ClientError as err: logger.exception( - "Couldn't list tags for resource %s. Here's why", + "Couldn't list tags for resource %s. Here's why %s", resource_arn, err.response["Error"]["Message"], ) @@ -432,7 +436,7 @@ def untag_resource(self, resource_arn: str, tag_keys: list[str]) -> None: ) except ClientError as err: logger.exception( - "Couldn't untag resource %s. Here's why", + "Couldn't untag resource %s. Here's why %s", resource_arn, err.response["Error"]["Message"], ) @@ -491,7 +495,7 @@ def wait_import_job_complete(self, datastore_id: str, job_id: str) -> None: ) status = "IN_PROGRESS" while counter < max_count_minutes: - job = self.describe_fihr_import_job(datastore_id, job_id) + job = self.describe_fhir_import_job(datastore_id, job_id) status = job["JobStatus"] if status == "COMPLETED" or status == "COMPLETED_WITH_ERRORS": break @@ -545,15 +549,15 @@ def wait_export_job_complete(self, datastore_id: str, job_id: str) -> None: ) def health_lake_demo(self) -> None: - use_smart_on_fihr_data_store = True + use_smart_on_fhir_data_store = True datastore_name = "health_imaging_datastore2" - if use_smart_on_fihr_data_store: + if use_smart_on_fhir_data_store: # snippet-start:[python.example_code.healthlake.CreateFHIRDatastore.smart] sse_configuration = { "KmsEncryptionConfig": {"CmkType": "AWS_OWNED_KMS_KEY"} } - + # TODO: Update the metadata to match your environment. metadata = { "issuer": "https://ehr.example.com", "jwks_uri": "https://ehr.example.com/.well-known/jwks.json", @@ -577,18 +581,19 @@ def health_lake_demo(self) -> None: "client-public", ], } - indentity_provider_configuration = { + # TODO: Update the IdpLambdaArn. + identity_provider_configuration = { "AuthorizationStrategy": "SMART_ON_FHIR_V1", "FineGrainedAuthorizationEnabled": True, - "IdpLambdaArn": "arn:aws:lambda:us-east-1:123502194722:function:healthlaketest37-ahl-introspec:active", + "IdpLambdaArn": "arn:aws:lambda:your-region:your-account-id:function:your-lambda-name", "Metadata": json.dumps(metadata), } - data_store = self.create_fihr_datastore( - datastore_name, sse_configuration, indentity_provider_configuration + data_store = self.create_fhir_datastore( + datastore_name, sse_configuration, identity_provider_configuration ) # snippet-end:[python.example_code.healthlake.CreateFHIRDatastore.smart] else: - data_store = self.create_fihr_datastore(datastore_name) + data_store = self.create_fhir_datastore(datastore_name) data_store_id = data_store["DatastoreId"] data_store_arn = data_store["DatastoreArn"] @@ -629,7 +634,7 @@ def health_lake_demo(self) -> None: data_access_role_arn = ( "arn:aws:iam::123502194722:role/healthlaketest37-ahl-full-access" ) - import_job = self.start_fihr_import_job( + import_job = self.start_fhir_import_job( job_name, data_store_id, input_s3_uri, @@ -671,8 +676,8 @@ def health_lake_demo(self) -> None: f"Job id: {export_job['JobId']}, status: {export_job['JobStatus']}, submit time: {export_job['SubmitTime']}" ) - -# self.delete_fhir_datastore(data_store_id) + self.delete_fhir_datastore(data_store_id) + print(f"Data store with ID {data_store_id} deleted.") if __name__ == "__main__": diff --git a/python/example_code/healthlake/test/conftest.py b/python/example_code/healthlake/test/conftest.py index 2d0663bfbd2..0faa3f8991e 100644 --- a/python/example_code/healthlake/test/conftest.py +++ b/python/example_code/healthlake/test/conftest.py @@ -7,7 +7,11 @@ """ import sys +import os + +script_dir = os.path.dirname(os.path.abspath(__file__)) # This is needed so Python can find test_tools on the path. -sys.path.append("../..") -from test_tools.fixtures.common import * +sys.path.append(os.path.join(script_dir, "../../..")) + +from test_tools.fixtures.common import * \ No newline at end of file diff --git a/python/example_code/healthlake/test/test_health_lake_wrapper.py b/python/example_code/healthlake/test/test_health_lake_wrapper.py new file mode 100644 index 00000000000..8c2f4dbb097 --- /dev/null +++ b/python/example_code/healthlake/test/test_health_lake_wrapper.py @@ -0,0 +1,289 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Unit tests for health_lake_wrapper functions. +""" + +import os +import sys + +import boto3 +import pytest +from botocore.exceptions import ClientError + +script_dir = os.path.dirname(os.path.abspath(__file__)) + +# Append parent directory to import health_lake_wrapper. +sys.path.append(os.path.join(script_dir, "..")) +from health_lake_wrapper import HealthLakeWrapper + + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_create_fhir_datastore(make_stubber, error_code): + healthlake_client = boto3.client("healthlake") + healthlake_stubber = make_stubber(healthlake_client) + wrapper = HealthLakeWrapper(healthlake_client) + datastore_name = "test-datastore" + datastore_id = "abcdedf1234567890abcdef123456789" + + healthlake_stubber.stub_create_fhir_datastore( + datastore_name, datastore_id, error_code=error_code + ) + + if error_code is None: + response = wrapper.create_fhir_datastore(datastore_name) + assert response["DatastoreId"] == datastore_id + else: + with pytest.raises(ClientError) as exc_info: + wrapper.create_fhir_datastore(datastore_name) + assert exc_info.value.response["Error"]["Code"] == error_code + + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_describe_fhir_datastore(make_stubber, error_code): + healthlake_client = boto3.client("healthlake") + healthlake_stubber = make_stubber(healthlake_client) + wrapper = HealthLakeWrapper(healthlake_client) + datastore_id = "abcdedf1234567890abcdef123456789" + + healthlake_stubber.stub_describe_fhir_datastore(datastore_id, error_code=error_code) + + if error_code is None: + response = wrapper.describe_fhir_datastore(datastore_id) + assert response["DatastoreId"] == datastore_id + else: + with pytest.raises(ClientError) as exc_info: + wrapper.describe_fhir_datastore(datastore_id) + assert exc_info.value.response["Error"]["Code"] == error_code + + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_list_fhir_datastores(make_stubber, error_code): + healthlake_client = boto3.client("healthlake") + healthlake_stubber = make_stubber(healthlake_client) + wrapper = HealthLakeWrapper(healthlake_client) + + healthlake_stubber.stub_list_fhir_datastores(error_code=error_code) + + if error_code is None: + response = wrapper.list_fhir_datastores() + assert len(response) == 1 + else: + with pytest.raises(ClientError) as exc_info: + wrapper.list_fhir_datastores() + assert exc_info.value.response["Error"]["Code"] == error_code + + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_delete_fhir_datastore(make_stubber, error_code): + healthlake_client = boto3.client("healthlake") + healthlake_stubber = make_stubber(healthlake_client) + wrapper = HealthLakeWrapper(healthlake_client) + datastore_id = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + + healthlake_stubber.stub_delete_fhir_datastore(datastore_id, error_code=error_code) + + if error_code is None: + wrapper.delete_fhir_datastore(datastore_id) + else: + with pytest.raises(ClientError) as exc_info: + wrapper.delete_fhir_datastore(datastore_id) + assert exc_info.value.response["Error"]["Code"] == error_code + + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_start_fhir_import_job(make_stubber, error_code): + healthlake_client = boto3.client("healthlake") + healthlake_stubber = make_stubber(healthlake_client) + wrapper = HealthLakeWrapper(healthlake_client) + job_name = "test-job" + datastore_id = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + input_s3_uri = "s3://amzn-s3-demo-bucket/test-data" + job_output_s3_uri = "s3://amzn-s3-demo-bucket/test-output" + kms_key_id = "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX" + data_access_role_arn = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + + healthlake_stubber.stub_start_fhir_import_job( + job_name, + datastore_id, + input_s3_uri, + job_output_s3_uri, + kms_key_id, + data_access_role_arn, + error_code=error_code, + ) + + if error_code is None: + wrapper.start_fhir_import_job( + job_name, + datastore_id, + input_s3_uri, + job_output_s3_uri, + kms_key_id, + data_access_role_arn, + ) + else: + with pytest.raises(ClientError) as exc_info: + wrapper.start_fhir_import_job( + job_name, + datastore_id, + input_s3_uri, + job_output_s3_uri, + kms_key_id, + data_access_role_arn, + ) + assert exc_info.value.response["Error"]["Code"] == error_code + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_describe_fhir_import_job(make_stubber, error_code): + healthlake_client = boto3.client("healthlake") + healthlake_stubber = make_stubber(healthlake_client) + wrapper = HealthLakeWrapper(healthlake_client) + datastore_id = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + job_id = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + + healthlake_stubber.stub_describe_fhir_import_job( + datastore_id, job_id, error_code=error_code + ) + + if error_code is None: + wrapper.describe_fhir_import_job(datastore_id, job_id) + else: + with pytest.raises(ClientError) as exc_info: + wrapper.describe_fhir_import_job(datastore_id, job_id) + assert exc_info.value.response["Error"]["Code"] == error_code + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_list_fhir_import_jobs(make_stubber, error_code): + healthlake_client = boto3.client("healthlake") + healthlake_stubber = make_stubber(healthlake_client) + wrapper = HealthLakeWrapper(healthlake_client) + datastore_id = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + + healthlake_stubber.stub_list_fhir_import_jobs(datastore_id, error_code=error_code) + + if error_code is None: + wrapper.list_fhir_import_jobs(datastore_id) + else: + with pytest.raises(ClientError) as exc_info: + wrapper.list_fhir_import_jobs(datastore_id) + assert exc_info.value.response["Error"]["Code"] == error_code + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_start_fhir_export_job(make_stubber, error_code): + healthlake_client = boto3.client("healthlake") + healthlake_stubber = make_stubber(healthlake_client) + wrapper = HealthLakeWrapper(healthlake_client) + job_name = "test-job" + datastore_id = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + output_s3_uri = "s3://amzn-s3-demo-bucket/test-output" + data_access_role_arn = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + kms_key_id = "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX" + + healthlake_stubber.stub_start_fhir_export_job( + job_name, + datastore_id, + output_s3_uri, + kms_key_id, + data_access_role_arn, + error_code=error_code, + ) + + if error_code is None: + wrapper.start_fhir_export_job( + job_name, + datastore_id, + output_s3_uri, + kms_key_id, + data_access_role_arn, + ) + else: + with pytest.raises(ClientError) as exc_info: + wrapper.start_fhir_export_job( + job_name, + datastore_id, + output_s3_uri, + kms_key_id, + data_access_role_arn, + ) + assert exc_info.value.response["Error"]["Code"] == error_code + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_list_fhir_export_jobs(make_stubber, error_code): + healthlake_client = boto3.client("healthlake") + healthlake_stubber = make_stubber(healthlake_client) + wrapper = HealthLakeWrapper(healthlake_client) + datastore_id = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + + healthlake_stubber.stub_list_fhir_export_jobs(datastore_id, error_code=error_code) + + if error_code is None: + wrapper.list_fhir_export_jobs(datastore_id) + else: + with pytest.raises(ClientError) as exc_info: + wrapper.list_fhir_export_jobs(datastore_id) + assert exc_info.value.response["Error"]["Code"] == error_code + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_describe_fhir_export_job(make_stubber, error_code): + healthlake_client = boto3.client("healthlake") + healthlake_stubber = make_stubber(healthlake_client) + wrapper = HealthLakeWrapper(healthlake_client) + datastore_id = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + job_id = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + + healthlake_stubber.stub_describe_fhir_export_job( + datastore_id, job_id, error_code=error_code + ) + + if error_code is None: + wrapper.describe_fhir_export_job(datastore_id, job_id) + else: + with pytest.raises(ClientError) as exc_info: + wrapper.describe_fhir_export_job(datastore_id, job_id) + assert exc_info.value.response["Error"]["Code"] == error_code + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_tag_resource(make_stubber, error_code): + healthlake_client = boto3.client("healthlake") + healthlake_stubber = make_stubber(healthlake_client) + wrapper = HealthLakeWrapper(healthlake_client) + resource_arn = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + tags = [{"Key" :"test-key", "Value" : "test-value"}] + healthlake_stubber.stub_tag_resource(resource_arn, tags, error_code=error_code) + if error_code is None: + wrapper.tag_resource(resource_arn, tags) + else: + with pytest.raises(ClientError) as exc_info: + wrapper.tag_resource(resource_arn, tags) + assert exc_info.value.response["Error"]["Code"] == error_code + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_untag_resource(make_stubber, error_code): + healthlake_client = boto3.client("healthlake") + healthlake_stubber = make_stubber(healthlake_client) + wrapper = HealthLakeWrapper(healthlake_client) + resource_arn = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + tag_keys = ["test-key"] + healthlake_stubber.stub_untag_resource(resource_arn, tag_keys, error_code=error_code) + if error_code is None: + wrapper.untag_resource(resource_arn, tag_keys) + else: + with pytest.raises(ClientError) as exc_info: + wrapper.untag_resource(resource_arn, tag_keys) + assert exc_info.value.response["Error"]["Code"] == error_code + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_list_tags_for_resource(make_stubber, error_code): + healthlake_client = boto3.client("healthlake") + healthlake_stubber = make_stubber(healthlake_client) + wrapper = HealthLakeWrapper(healthlake_client) + resource_arn = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + healthlake_stubber.stub_list_tags_for_resource(resource_arn, error_code=error_code) + if error_code is None: + wrapper.list_tags_for_resource(resource_arn) + else: + with pytest.raises(ClientError) as exc_info: + wrapper.list_tags_for_resource(resource_arn) + assert exc_info.value.response["Error"]["Code"] == error_code diff --git a/python/example_code/healthlake/test/test_medical_imaging_basics.py b/python/example_code/healthlake/test/test_medical_imaging_basics.py deleted file mode 100644 index 7956e8978ec..00000000000 --- a/python/example_code/healthlake/test/test_medical_imaging_basics.py +++ /dev/null @@ -1,482 +0,0 @@ -# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -# SPDX-License-Identifier: Apache-2.0 - -""" -Unit tests for medical_imaging_basics functions. -""" - -import os - -import boto3 -import pytest -from botocore.exceptions import ClientError - -from health_lake_wrapper import MedicalImagingWrapper - - -@pytest.mark.parametrize("error_code", [None, "TestException"]) -def test_create_datastore(make_stubber, error_code): - medical_imaging_client = boto3.client("medical-imaging") - medical_imaging_stubber = make_stubber(medical_imaging_client) - wrapper = MedicalImagingWrapper(medical_imaging_client) - datastore_name = "test-datastore" - datastore_id = "abcdedf1234567890abcdef123456789" - - medical_imaging_stubber.stub_create_datastore( - datastore_name, datastore_id, error_code=error_code - ) - - if error_code is None: - got_datastore_id = wrapper.create_datastore(datastore_name) - assert got_datastore_id == datastore_id - else: - with pytest.raises(ClientError) as exc_info: - wrapper.create_datastore(datastore_name) - assert exc_info.value.response["Error"]["Code"] == error_code - - -@pytest.mark.parametrize("error_code", [None, "TestException"]) -def test_get_datastore_properties(make_stubber, error_code): - medical_imaging_client = boto3.client("medical-imaging") - medical_imaging_stubber = make_stubber(medical_imaging_client) - wrapper = MedicalImagingWrapper(medical_imaging_client) - datastore_id = "abcdedf1234567890abcdef123456789" - - medical_imaging_stubber.stub_get_datastore_properties( - datastore_id, error_code=error_code - ) - - if error_code is None: - got_properties = wrapper.get_datastore_properties(datastore_id) - assert got_properties["datastoreId"] == datastore_id - else: - with pytest.raises(ClientError) as exc_info: - wrapper.get_datastore_properties(datastore_id) - assert exc_info.value.response["Error"]["Code"] == error_code - - -@pytest.mark.parametrize("error_code", [None, "TestException"]) -def test_list_datastores(make_stubber, error_code): - medical_imaging_client = boto3.client("medical-imaging") - medical_imaging_stubber = make_stubber(medical_imaging_client) - wrapper = MedicalImagingWrapper(medical_imaging_client) - datastore_id = "abcdedf1234567890abcdef123456789" - medical_imaging_stubber.stub_list_datastores(datastore_id, error_code=error_code) - - if error_code is None: - datastores = wrapper.list_datastores() - assert datastores[0]["datastoreId"] == datastore_id - else: - with pytest.raises(ClientError) as exc_info: - wrapper.list_datastores() - assert exc_info.value.response["Error"]["Code"] == error_code - - -@pytest.mark.parametrize("error_code", [None, "TestException"]) -def test_delete_datastore(make_stubber, error_code): - medical_imaging_client = boto3.client("medical-imaging") - medical_imaging_stubber = make_stubber(medical_imaging_client) - wrapper = MedicalImagingWrapper(medical_imaging_client) - datastore_id = "abcdedf1234567890abcdef123456789" - - medical_imaging_stubber.stub_delete_data_store(datastore_id, error_code=error_code) - - if error_code is None: - wrapper.delete_datastore(datastore_id) - else: - with pytest.raises(ClientError) as exc_info: - wrapper.delete_datastore(datastore_id) - assert exc_info.value.response["Error"]["Code"] == error_code - - -@pytest.mark.parametrize("error_code", [None, "TestException"]) -def test_start_dicom_import_job(make_stubber, error_code): - medical_imaging_client = boto3.client("medical-imaging") - medical_imaging_stubber = make_stubber(medical_imaging_client) - wrapper = MedicalImagingWrapper(medical_imaging_client) - job_id = "cccccc1234567890abcdef123456789" - job_name = "job_name" - datastore_id = "abcdedf1234567890abcdef123456789" - role_arn = "arn:aws:iam::111111111111:role/dicom_import" - input_s3_uri = "s3://healthimaging-source/CRStudy/" - output_s3_uri = "s3://healthimaging-destination/CRStudy/" - - medical_imaging_stubber.stub_start_dicom_import_job( - job_name, - datastore_id, - role_arn, - input_s3_uri, - output_s3_uri, - job_id, - error_code=error_code, - ) - - if error_code is None: - result = wrapper.start_dicom_import_job( - job_name, datastore_id, role_arn, input_s3_uri, output_s3_uri - ) - assert result == job_id - else: - with pytest.raises(ClientError) as exc_info: - wrapper.start_dicom_import_job( - job_name, datastore_id, role_arn, input_s3_uri, output_s3_uri - ) - assert exc_info.value.response["Error"]["Code"] == error_code - - -@pytest.mark.parametrize("error_code", [None, "TestException"]) -def test_get_dicom_import_job(make_stubber, error_code): - medical_imaging_client = boto3.client("medical-imaging") - medical_imaging_stubber = make_stubber(medical_imaging_client) - wrapper = MedicalImagingWrapper(medical_imaging_client) - datastore_id = "abcdedf1234567890abcdef123456789" - job_id = "cccccc1234567890abcdef123456789" - job_status = "TESTING" - - medical_imaging_stubber.stub_get_dicom_import_job( - job_id, datastore_id, job_status, error_code=error_code - ) - - if error_code is None: - result = wrapper.get_dicom_import_job(datastore_id, job_id) - assert result["jobStatus"] == job_status - else: - with pytest.raises(ClientError) as exc_info: - wrapper.get_dicom_import_job(datastore_id, job_id) - assert exc_info.value.response["Error"]["Code"] == error_code - - -@pytest.mark.parametrize("error_code", [None, "TestException"]) -def test_list_dicom_import_jobs(make_stubber, error_code): - medical_imaging_client = boto3.client("medical-imaging") - medical_imaging_stubber = make_stubber(medical_imaging_client) - wrapper = MedicalImagingWrapper(medical_imaging_client) - datastore_id = "abcdedf1234567890abcdef123456789" - - medical_imaging_stubber.stub_list_dicom_import_jobs( - datastore_id, error_code=error_code - ) - - if error_code is None: - wrapper.list_dicom_import_jobs(datastore_id) - - else: - with pytest.raises(ClientError) as exc_info: - wrapper.list_dicom_import_jobs(datastore_id) - assert exc_info.value.response["Error"]["Code"] == error_code - - -@pytest.mark.parametrize("error_code", [None, "TestException"]) -def test_search_mage_sets(make_stubber, error_code): - medical_imaging_client = boto3.client("medical-imaging") - medical_imaging_stubber = make_stubber(medical_imaging_client) - wrapper = MedicalImagingWrapper(medical_imaging_client) - datastore_id = "abcdedf1234567890abcdef123456789" - search_filter = { - "filters": [ - { - "values": [ - {"createdAt": "2023-09-13T14:13:39.302000-04:00"}, - {"createdAt": "2023-09-13T14:13:39.302000-04:00"}, - ], - "operator": "BETWEEN", - } - ] - } - medical_imaging_stubber.stub_search_image_sets( - datastore_id, search_filter, error_code=error_code - ) - - if error_code is None: - wrapper.search_image_sets(datastore_id, search_filter) - - else: - with pytest.raises(ClientError) as exc_info: - wrapper.search_image_sets(datastore_id, search_filter) - assert exc_info.value.response["Error"]["Code"] == error_code - - -@pytest.mark.parametrize("error_code", [None, "TestException"]) -def test_get_image_set(make_stubber, error_code): - medical_imaging_client = boto3.client("medical-imaging") - medical_imaging_stubber = make_stubber(medical_imaging_client) - wrapper = MedicalImagingWrapper(medical_imaging_client) - datastore_id = "abcdedf1234567890abcdef123456789" - image_set_id = "cccccc1234567890abcdef123456789" - version_id = "1" - - medical_imaging_stubber.stub_get_image_set( - datastore_id, image_set_id, version_id, error_code=error_code - ) - - if error_code is None: - wrapper.get_image_set(datastore_id, image_set_id, version_id) - - else: - with pytest.raises(ClientError) as exc_info: - wrapper.get_image_set(datastore_id, image_set_id, version_id) - assert exc_info.value.response["Error"]["Code"] == error_code - - -@pytest.mark.parametrize("error_code", [None, "TestException"]) -def test_get_image_set_metadata(make_stubber, error_code): - medical_imaging_client = boto3.client("medical-imaging") - medical_imaging_stubber = make_stubber(medical_imaging_client) - wrapper = MedicalImagingWrapper(medical_imaging_client) - datastore_id = "abcdedf1234567890abcdef123456789" - image_set_id = "cccccc1234567890abcdef123456789" - test_file = "med-imag-test_file_1234.gzip" - medical_imaging_stubber.stub_get_image_set_metadata( - datastore_id, image_set_id, error_code=error_code - ) - - if error_code is None: - wrapper.get_image_set_metadata(test_file, datastore_id, image_set_id) - assert os.path.exists(test_file) - os.remove(test_file) - - else: - with pytest.raises(ClientError) as exc_info: - wrapper.get_image_set_metadata(test_file, datastore_id, image_set_id) - assert exc_info.value.response["Error"]["Code"] == error_code - - -@pytest.mark.parametrize("error_code", [None, "TestException"]) -def test_get_pixel_data(make_stubber, error_code): - medical_imaging_client = boto3.client("medical-imaging") - medical_imaging_stubber = make_stubber(medical_imaging_client) - wrapper = MedicalImagingWrapper(medical_imaging_client) - datastore_id = "abcdedf1234567890abcdef123456789" - image_set_id = "cccccc1234567890abcdef123456789" - image_frame_id = "cccccc1234567890abcdef123456789" - test_file = "med-imag-test_file_789654.jph" - medical_imaging_stubber.stub_get_pixel_data( - datastore_id, image_set_id, image_frame_id, error_code=error_code - ) - - if error_code is None: - wrapper.get_pixel_data(test_file, datastore_id, image_set_id, image_frame_id) - assert os.path.exists(test_file) - os.remove(test_file) - - else: - with pytest.raises(ClientError) as exc_info: - wrapper.get_pixel_data( - test_file, datastore_id, image_set_id, image_frame_id - ) - assert exc_info.value.response["Error"]["Code"] == error_code - - -@pytest.mark.parametrize("error_code", [None, "TestException"]) -def test_list_image_set_versions(make_stubber, error_code): - medical_imaging_client = boto3.client("medical-imaging") - medical_imaging_stubber = make_stubber(medical_imaging_client) - wrapper = MedicalImagingWrapper(medical_imaging_client) - datastore_id = "abcdedf1234567890abcdef123456789" - image_set_id = "cccccc1234567890abcdef123456789" - - medical_imaging_stubber.stub_list_image_set_versions( - datastore_id, image_set_id, error_code=error_code - ) - - if error_code is None: - wrapper.list_image_set_versions(datastore_id, image_set_id) - - else: - with pytest.raises(ClientError) as exc_info: - wrapper.list_image_set_versions(datastore_id, image_set_id) - assert exc_info.value.response["Error"]["Code"] == error_code - - -@pytest.mark.parametrize("error_code", [None, "TestException"]) -def test_update_image_set_metadata(make_stubber, error_code): - medical_imaging_client = boto3.client("medical-imaging") - medical_imaging_stubber = make_stubber(medical_imaging_client) - wrapper = MedicalImagingWrapper(medical_imaging_client) - datastore_id = "abcdedf1234567890abcdef123456789" - image_set_id = "cccccc1234567890abcdef123456789" - version_id = "1" - force = False - metadata = { - "DICOMUpdates": { - "updatableAttributes": '{"SchemaVersion":1.1,"Patient":{"DICOM":{"PatientName":"Garcia^Gloria"}}}' - } - } - - medical_imaging_stubber.stub_update_image_set_metadata( - datastore_id, image_set_id, version_id, metadata, force, error_code=error_code - ) - - if error_code is None: - wrapper.update_image_set_metadata( - datastore_id, image_set_id, version_id, metadata, force - ) - - else: - with pytest.raises(ClientError) as exc_info: - wrapper.update_image_set_metadata( - datastore_id, image_set_id, version_id, metadata, force - ) - assert exc_info.value.response["Error"]["Code"] == error_code - - -@pytest.mark.parametrize("error_code", [None, "TestException"]) -def test_copy_image_set_without_destination(make_stubber, error_code): - medical_imaging_client = boto3.client("medical-imaging") - medical_imaging_stubber = make_stubber(medical_imaging_client) - wrapper = MedicalImagingWrapper(medical_imaging_client) - datastore_id = "abcdedf1234567890abcdef123456789" - image_set_id = "cccccc1234567890abcdef123456789" - version_id = "1" - new_image_set_id = "cccccc1234567890abcdef123456789" - - medical_imaging_stubber.stub_copy_image_set_without_destination( - datastore_id, image_set_id, version_id, new_image_set_id, error_code=error_code - ) - - if error_code is None: - wrapper.copy_image_set(datastore_id, image_set_id, version_id) - - else: - with pytest.raises(ClientError) as exc_info: - wrapper.copy_image_set(datastore_id, image_set_id, version_id) - assert exc_info.value.response["Error"]["Code"] == error_code - - -@pytest.mark.parametrize("error_code", [None, "TestException"]) -def test_copy_image_set_with_destination(make_stubber, error_code): - medical_imaging_client = boto3.client("medical-imaging") - medical_imaging_stubber = make_stubber(medical_imaging_client) - wrapper = MedicalImagingWrapper(medical_imaging_client) - datastore_id = "abcdedf1234567890abcdef123456789" - image_set_id = "cccccc1234567890abcdef123456789" - version_id = "1" - destination_image_set_id = "cccccc1234567890abcdef123456789" - destination_version_id = "1" - force = True - subset_Id = "cccccc1234567890abcdef123456789" - - medical_imaging_stubber.stub_copy_image_set_with_destination( - datastore_id, - image_set_id, - version_id, - destination_image_set_id, - destination_version_id, - force, - subset_Id, - error_code=error_code, - ) - - if error_code is None: - wrapper.copy_image_set( - datastore_id, - image_set_id, - version_id, - destination_image_set_id, - destination_version_id, - force, - [subset_Id], - ) - - else: - with pytest.raises(ClientError) as exc_info: - wrapper.copy_image_set( - datastore_id, - image_set_id, - version_id, - destination_image_set_id, - destination_version_id, - force, - [subset_Id], - ) - assert exc_info.value.response["Error"]["Code"] == error_code - - -@pytest.mark.parametrize("error_code", [None, "TestException"]) -def test_delete_image_set(make_stubber, error_code): - medical_imaging_client = boto3.client("medical-imaging") - medical_imaging_stubber = make_stubber(medical_imaging_client) - wrapper = MedicalImagingWrapper(medical_imaging_client) - datastore_id = "abcdedf1234567890abcdef123456789" - image_set_id = "cccccc1234567890abcdef123456789" - - medical_imaging_stubber.stub_delete_image_set( - datastore_id, image_set_id, error_code=error_code - ) - - if error_code is None: - wrapper.delete_image_set(datastore_id, image_set_id) - - else: - with pytest.raises(ClientError) as exc_info: - wrapper.delete_image_set(datastore_id, image_set_id) - assert exc_info.value.response["Error"]["Code"] == error_code - - -@pytest.mark.parametrize("error_code", [None, "TestException"]) -def test_tag_resource(make_stubber, error_code): - medical_imaging_client = boto3.client("medical-imaging") - medical_imaging_stubber = make_stubber(medical_imaging_client) - wrapper = MedicalImagingWrapper(medical_imaging_client) - resource_arn = ( - "arn:aws:medical-imaging:us-east-1:123456789012:datastore/abcdedf1234567890abcdef123456789/image" - "-set/cccccc1234567890abcdef123456789 " - ) - tags = {"test-key": "test-value"} - - medical_imaging_stubber.stub_tag_resource(resource_arn, tags, error_code=error_code) - - if error_code is None: - wrapper.tag_resource(resource_arn, tags) - - else: - with pytest.raises(ClientError) as exc_info: - wrapper.tag_resource(resource_arn, tags) - assert exc_info.value.response["Error"]["Code"] == error_code - - -@pytest.mark.parametrize("error_code", [None, "TestException"]) -def test_untag_resource(make_stubber, error_code): - medical_imaging_client = boto3.client("medical-imaging") - medical_imaging_stubber = make_stubber(medical_imaging_client) - wrapper = MedicalImagingWrapper(medical_imaging_client) - resource_arn = ( - "arn:aws:medical-imaging:us-east-1:123456789012:datastore/abcdedf1234567890abcdef123456789/image" - "-set/cccccc1234567890abcdef123456789 " - ) - tag_keys = ["test-key"] - - medical_imaging_stubber.stub_untag_resource( - resource_arn, tag_keys, error_code=error_code - ) - - if error_code is None: - wrapper.untag_resource(resource_arn, tag_keys) - - else: - with pytest.raises(ClientError) as exc_info: - wrapper.untag_resource(resource_arn, tag_keys) - assert exc_info.value.response["Error"]["Code"] == error_code - - -@pytest.mark.parametrize("error_code", [None, "TestException"]) -def test_list_tags_for_resource(make_stubber, error_code): - medical_imaging_client = boto3.client("medical-imaging") - medical_imaging_stubber = make_stubber(medical_imaging_client) - wrapper = MedicalImagingWrapper(medical_imaging_client) - resource_arn = ( - "arn:aws:medical-imaging:us-east-1:123456789012:datastore/abcdedf1234567890abcdef123456789/image" - "-set/cccccc1234567890abcdef123456789 " - ) - - medical_imaging_stubber.stub_list_tags_for_resource( - resource_arn, error_code=error_code - ) - - if error_code is None: - wrapper.list_tags_for_resource(resource_arn) - - else: - with pytest.raises(ClientError) as exc_info: - wrapper.list_tags_for_resource(resource_arn) - assert exc_info.value.response["Error"]["Code"] == error_code diff --git a/python/test_tools/healthlake_stubber.py b/python/test_tools/healthlake_stubber.py new file mode 100644 index 00000000000..5d85af53f5a --- /dev/null +++ b/python/test_tools/healthlake_stubber.py @@ -0,0 +1,345 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Stub functions that are used by the AWS HealthLake unit tests. + +When tests are run against an actual AWS account, the stubber class does not +set up stubs and passes all calls through to the Boto 3 client. +""" + +import io +import json +from botocore.stub import ANY +from boto3 import client + +from test_tools.example_stubber import ExampleStubber + +from datetime import timedelta, timezone, datetime + + +class HealthLakeStubber(ExampleStubber): + """ + A class that implements a variety of stub functions that are used by the + AWS HealthLake unit tests. + + The stubbed functions all expect certain parameters to be passed to them as + part of the tests, and will raise errors when the actual parameters differ from + the expected. + """ + + def __init__(self, healthlake_client: client, use_stubs=True) -> None: + """ + Initializes the object with a specific client and configures it for + stubbing or AWS passthrough. + + :param healthlake_client: A Boto 3 AWS HealthLake client. + :param use_stubs: When True, use stubs to intercept requests. Otherwise, + pass requests through to AWS. + """ + super().__init__(healthlake_client, use_stubs) + + def stub_create_fhir_datastore( + self, data_store_name: str, data_store_id: str, error_code: str = None + ) -> None: + expected_params = { + "DatastoreName": data_store_name, + "DatastoreTypeVersion": "R4", + } + + response = { + "DatastoreId": data_store_id, + "DatastoreArn": "datastore_arn", + "DatastoreStatus": "CREATING", + "DatastoreEndpoint": f"https://healthlake.us-east-1.amazonaws.com/datastore/{data_store_id}/r4/", + } + + self._stub_bifurcator( + "create_fhir_datastore", expected_params, response, error_code=error_code + ) + + def stub_describe_fhir_datastore( + self, data_store_id, error_code: str = None + ) -> None: + expected_params = {"DatastoreId": data_store_id} + + response = { + "DatastoreProperties": { + "DatastoreId": data_store_id, + "DatastoreArn": "datastore_arn", + "DatastoreStatus": "ACTIVE", + "DatastoreEndpoint": f"https://healthlake.us-east-1.amazonaws.com/datastore/{data_store_id}/r4/", + "CreatedAt": datetime.now(timezone.utc), + "DatastoreName": "datastore_name", + "DatastoreTypeVersion": "R4", + } + } + + self._stub_bifurcator( + "describe_fhir_datastore", expected_params, response, error_code=error_code + ) + + def stub_list_fhir_datastores(self, error_code: str = None) -> None: + expected_params = {} + + response = { + "DatastorePropertiesList": [ + { + "DatastoreId": "6407b9ae4c2def3cb6f1a46a0Example", + "DatastoreArn": "datastore_arn", + "DatastoreStatus": "ACTIVE", + "DatastoreEndpoint": f"https://healthlake.us-east-1.amazonaws.com/datastore/6407b9ae4c2def3cb6f1a46a0Example/r4/", + "CreatedAt": datetime.now(timezone.utc), + "DatastoreName": "datastore_name", + "DatastoreTypeVersion": "R4", + } + ] + } + + self._stub_bifurcator( + "list_fhir_datastores", expected_params, response, error_code=error_code + ) + + def stub_delete_fhir_datastore(self, data_store_id, error_code: str = None) -> None: + expected_params = {"DatastoreId": data_store_id} + + response = { + "DatastoreId": data_store_id, + "DatastoreArn": "datastore_arn", + "DatastoreStatus": "DELETING", + "DatastoreEndpoint": f"https://healthlake.us-east-1.amazonaws.com/datastore/{data_store_id}/r4/", + } + + self._stub_bifurcator( + "delete_fhir_datastore", expected_params, response, error_code=error_code + ) + + def stub_start_fhir_import_job( + self, + job_name: str, + data_store_id: str, + input_s3_uri: str, + output_s3_uri: str, + kms_key_id: str, + data_access_role_arn: str, + error_code: str = None, + ) -> None: + expected_params = { + "JobName": job_name, + "InputDataConfig": { + "S3Uri": input_s3_uri, + }, + "DatastoreId": data_store_id, + "JobOutputDataConfig": { + "S3Configuration": {"S3Uri": output_s3_uri, "KmsKeyId": kms_key_id} + }, + "DatastoreId": data_store_id, + "DataAccessRoleArn": data_access_role_arn, + } + + response = { + "JobId": "my_import_job", + "JobStatus": "SUBMITTED", + "DatastoreId": data_store_id, + } + + self._stub_bifurcator( + "start_fhir_import_job", expected_params, response, error_code=error_code + ) + + def stub_describe_fhir_import_job( + self, datastore_id, job_id, error_code: str = None + ): + expected_params = {"DatastoreId": datastore_id, "JobId": job_id} + + response = { + "ImportJobProperties": { + "JobId": job_id, + "JobName": "my_import_job", + "JobStatus": "COMPLETED", + "DatastoreId": datastore_id, + "SubmitTime": datetime.now(timezone.utc), + "EndTime": datetime.now(timezone.utc), + "InputDataConfig": { + "S3Uri": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX", + }, + "JobOutputDataConfig": { + "S3Configuration": { + "S3Uri": "s3://amzn-s3-demo-bucket-827365/import/output/", + "KmsKeyId": "kms_key_id", + } + }, + "JobProgressReport": { + "TotalNumberOfScannedFiles": 123, + "TotalSizeOfScannedFilesInMB": 123.0, + "TotalNumberOfImportedFiles": 123, + "TotalNumberOfResourcesScanned": 123, + "TotalNumberOfResourcesImported": 123, + "TotalNumberOfResourcesWithCustomerError": 123, + "TotalNumberOfFilesReadWithCustomerError": 123, + "Throughput": 123.0, + }, + "DataAccessRoleArn": "data_access_role_arn", + "Message": "Import job completed successfully", + } + } + + self._stub_bifurcator( + "describe_fhir_import_job", expected_params, response, error_code=error_code + ) + + def stub_list_fhir_import_jobs(self, data_store_id, error_code: str = None): + expected_params = {"DatastoreId": data_store_id} + + response = { + "ImportJobPropertiesList": [ + { + "JobId": "my_import_job", + "JobName": "my_import_job", + "JobStatus": "COMPLETED", + "DatastoreId": data_store_id, + "SubmitTime": datetime.now(timezone.utc), + "EndTime": datetime.now(timezone.utc), + "InputDataConfig": { + "S3Uri": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX", + }, + "JobOutputDataConfig": { + "S3Configuration": { + "S3Uri": "s3://amzn-s3-demo-bucket-827365/import/output/", + "KmsKeyId": "kms_key_id", + } + }, + "JobProgressReport": { + "TotalNumberOfScannedFiles": 123, + "TotalSizeOfScannedFilesInMB": 123.0, + "TotalNumberOfImportedFiles": 123, + "TotalNumberOfResourcesScanned": 123, + "TotalNumberOfResourcesImported": 123, + "TotalNumberOfResourcesWithCustomerError": 123, + "TotalNumberOfFilesReadWithCustomerError": 123, + "Throughput": 123.0, + }, + "DataAccessRoleArn": "data_access_role_arn", + "Message": "Import job completed successfully", + } + ] + } + + self._stub_bifurcator( + "list_fhir_import_jobs", expected_params, response, error_code=error_code + ) + + def stub_start_fhir_export_job( + self, + job_name: str, + data_store_id: str, + output_s3_uri: str, + kms_key_id: str, + data_access_role_arn: str, + error_code: str = None, + ) -> None: + expected_params = { + "JobName": job_name, + "OutputDataConfig": { + "S3Configuration": {"S3Uri": output_s3_uri, "KmsKeyId": kms_key_id} + }, + "DatastoreId": data_store_id, + "DataAccessRoleArn": data_access_role_arn, + } + + response = { + "JobId": "my_export_job", + "JobStatus": "SUBMITTED", + "DatastoreId": data_store_id, + } + + self._stub_bifurcator( + "start_fhir_export_job", expected_params, response, error_code=error_code + ) + + def stub_list_fhir_export_jobs(self, data_store_id, error_code: str = None): + expected_params = {"DatastoreId": data_store_id} + + response = { + "ExportJobPropertiesList": [ + { + "JobId": "my_export_job", + "JobName": "my_export_job", + "JobStatus": "COMPLETED", + "DatastoreId": data_store_id, + "SubmitTime": datetime.now(timezone.utc), + "EndTime": datetime.now(timezone.utc), + "OutputDataConfig": { + "S3Configuration": { + "S3Uri": "s3://amzn-s3-demo-bucket-827365/export/output/", + "KmsKeyId": "kms_key_id", + } + }, + "DataAccessRoleArn": "data_access_role_arn", + "Message": "Export job completed successfully", + } + ] + } + + self._stub_bifurcator( + "list_fhir_export_jobs", expected_params, response, error_code=error_code + ) + + def stub_describe_fhir_export_job( + self, datastore_id, job_id, error_code: str = None + ): + expected_params = {"DatastoreId": datastore_id, "JobId": job_id} + + response = { + "ExportJobProperties": { + "JobId": job_id, + "JobName": "my_export_job", + "JobStatus": "COMPLETED", + "DatastoreId": datastore_id, + "SubmitTime": datetime.now(timezone.utc), + "EndTime": datetime.now(timezone.utc), + "OutputDataConfig": { + "S3Configuration": { + "S3Uri": "s3://amzn-s3-demo-bucket-827365/export/output/", + "KmsKeyId": "kms_key_id", + } + }, + "DataAccessRoleArn": "data_access_role_arn", + "Message": "Export job completed successfully", + } + } + + self._stub_bifurcator( + "describe_fhir_export_job", expected_params, response, error_code=error_code + ) + + def stub_tag_resource(self, resource_arn: str, tags: dict[str, str], error_code: str = None) -> None: + expected_params = { + "ResourceARN": resource_arn, + "Tags": tags, + } + + response = {} + + self._stub_bifurcator("tag_resource", expected_params, response, error_code=error_code) + + def stub_untag_resource(self, resource_arn: str, tag_keys: list[str], error_code: str = None) -> None: + expected_params = { + "ResourceARN": resource_arn, + "TagKeys": tag_keys, + } + response = {} + self._stub_bifurcator("untag_resource", expected_params, response, error_code=error_code) + + def stub_list_tags_for_resource(self, resource_arn: str, error_code: str = None) -> dict[str, str]: + expected_params = { + "ResourceARN": resource_arn, + } + + response = { + "Tags": [{"Key" :"test-key", "Value" : "test-value"}] + } + + self._stub_bifurcator( + "list_tags_for_resource", expected_params, response, error_code=error_code + ) \ No newline at end of file diff --git a/python/test_tools/stubber_factory.py b/python/test_tools/stubber_factory.py index e0164656a80..e21b17a232b 100644 --- a/python/test_tools/stubber_factory.py +++ b/python/test_tools/stubber_factory.py @@ -33,6 +33,7 @@ from test_tools.glacier_stubber import GlacierStubber from test_tools.glue_stubber import GlueStubber from test_tools.iam_stubber import IamStubber +from test_tools.healthlake_stubber import HealthLakeStubber from test_tools.keyspaces_stubber import KeyspacesStubber from test_tools.kinesis_stubber import KinesisStubber from test_tools.kinesis_analytics_v2_stubber import KinesisAnalyticsV2Stubber @@ -121,6 +122,8 @@ def stubber_factory(service_name): return GlueStubber elif service_name == "iam": return IamStubber + elif service_name == "healthlake": + return HealthLakeStubber elif service_name == "keyspaces": return KeyspacesStubber elif service_name == "kinesis":