diff --git a/docs/docs/services/osis.rst b/docs/docs/services/osis.rst
new file mode 100644
index 000000000000..e6c68dcdb30b
--- /dev/null
+++ b/docs/docs/services/osis.rst
@@ -0,0 +1,33 @@
+.. _implementedservice_osis:
+
+.. |start-h3| raw:: html
+
+
+
+.. |end-h3| raw:: html
+
+
+
+====
+osis
+====
+
+.. autoclass:: moto.osis.models.OpenSearchIngestionBackend
+
+|start-h3| Implemented features for this service |end-h3|
+
+- [X] create_pipeline
+- [X] delete_pipeline
+- [X] get_pipeline
+- [ ] get_pipeline_blueprint
+- [ ] get_pipeline_change_progress
+- [ ] list_pipeline_blueprints
+- [X] list_pipelines
+- [X] list_tags_for_resource
+- [X] start_pipeline
+- [X] stop_pipeline
+- [X] tag_resource
+- [X] untag_resource
+- [X] update_pipeline
+- [ ] validate_pipeline
+
diff --git a/moto/backend_index.py b/moto/backend_index.py
index 46d10cae806c..b939bd3adfdf 100644
--- a/moto/backend_index.py
+++ b/moto/backend_index.py
@@ -128,6 +128,7 @@
("opensearchserverless", re.compile("https?://aoss\\.(.+)\\.amazonaws\\.com")),
("opsworks", re.compile("https?://opsworks\\.us-east-1\\.amazonaws.com")),
("organizations", re.compile("https?://organizations\\.(.+)\\.amazonaws\\.com")),
+ ("osis", re.compile("https?://osis\\.(.+)\\.amazonaws\\.com")),
("panorama", re.compile("https?://panorama\\.(.+)\\.amazonaws.com")),
("personalize", re.compile("https?://personalize\\.(.+)\\.amazonaws\\.com")),
("pinpoint", re.compile("https?://pinpoint\\.(.+)\\.amazonaws\\.com")),
diff --git a/moto/backends.py b/moto/backends.py
index bb70dda5a250..4142a8ad659d 100644
--- a/moto/backends.py
+++ b/moto/backends.py
@@ -101,6 +101,7 @@
from moto.opensearchserverless.models import OpenSearchServiceServerlessBackend
from moto.opsworks.models import OpsWorksBackend
from moto.organizations.models import OrganizationsBackend
+ from moto.osis.models import OpenSearchIngestionBackend
from moto.personalize.models import PersonalizeBackend
from moto.pinpoint.models import PinpointBackend
from moto.polly.models import PollyBackend
@@ -280,6 +281,7 @@ def get_service_from_url(url: str) -> Optional[str]:
"Literal['opensearchserverless']",
"Literal['opsworks']",
"Literal['organizations']",
+ "Literal['osis']",
"Literal['personalize']",
"Literal['pinpoint']",
"Literal['polly']",
@@ -592,6 +594,10 @@ def get_backend(
@overload
def get_backend(name: "Literal['opsworks']") -> "BackendDict[OpsWorksBackend]": ...
@overload
+def get_backend(
+ name: "Literal['osis']",
+) -> "BackendDict[OpenSearchIngestionBackend]": ...
+@overload
def get_backend(
name: "Literal['organizations']",
) -> "BackendDict[OrganizationsBackend]": ...
diff --git a/moto/ec2/models/vpcs.py b/moto/ec2/models/vpcs.py
index a5f3c113f2a3..1ea59d8398ca 100644
--- a/moto/ec2/models/vpcs.py
+++ b/moto/ec2/models/vpcs.py
@@ -349,7 +349,7 @@ def __init__(
client_token: Optional[str] = None,
security_group_ids: Optional[List[str]] = None,
tags: Optional[Dict[str, str]] = None,
- private_dns_enabled: Optional[str] = None,
+ private_dns_enabled: Optional[bool] = None,
destination_prefix_list_id: Optional[str] = None,
):
self.ec2_backend = ec2_backend
@@ -906,7 +906,7 @@ def create_vpc_endpoint(
client_token: Optional[str] = None,
security_group_ids: Optional[List[str]] = None,
tags: Optional[Dict[str, str]] = None,
- private_dns_enabled: Optional[str] = None,
+ private_dns_enabled: Optional[bool] = None,
) -> VPCEndPoint:
vpc_endpoint_id = random_vpc_ep_id()
diff --git a/moto/moto_api/__init__.py b/moto/moto_api/__init__.py
index 28b10e140e61..6a2891331bc6 100644
--- a/moto/moto_api/__init__.py
+++ b/moto/moto_api/__init__.py
@@ -24,6 +24,9 @@
state_manager.register_default_transition(
model_name="glue::job_run", transition={"progression": "immediate"}
)
+state_manager.register_default_transition(
+ model_name="osis::pipeline", transition={"progression": "immediate"}
+)
state_manager.register_default_transition(
"s3::keyrestore", transition={"progression": "immediate"}
)
diff --git a/moto/osis/__init__.py b/moto/osis/__init__.py
new file mode 100644
index 000000000000..5214b56d0fd8
--- /dev/null
+++ b/moto/osis/__init__.py
@@ -0,0 +1 @@
+from .models import osis_backends # noqa: F401
diff --git a/moto/osis/exceptions.py b/moto/osis/exceptions.py
new file mode 100644
index 000000000000..9cd0355c62fc
--- /dev/null
+++ b/moto/osis/exceptions.py
@@ -0,0 +1,58 @@
+import json
+from typing import List, Optional
+
+from moto.core.exceptions import JsonRESTError
+
+
+class OpensearchIngestionExceptions(JsonRESTError):
+ pass
+
+
+class PipelineAlreadyExistsException(OpensearchIngestionExceptions):
+ def __init__(self, pipeline_name: str):
+ super().__init__(
+ "ResourceAlreadyExistsException",
+ f"Pipeline with given name {pipeline_name} already exists",
+ )
+ self.description = json.dumps({"message": self.message})
+
+
+class PipelineInvalidStateException(OpensearchIngestionExceptions):
+ def __init__(
+ self, action: str, valid_states: List[str], current_state: Optional[str]
+ ):
+ super().__init__(
+ "ConflictException",
+ f"Only pipelines with one of the following statuses are eligible for {action}: {valid_states}. The current status is {current_state}.",
+ )
+ self.description = json.dumps({"message": self.message})
+
+
+class PipelineNotFoundException(OpensearchIngestionExceptions):
+ def __init__(self, pipeline_name: str):
+ super().__init__(
+ "ResourceNotFoundException", f"Pipeline {pipeline_name} could not be found."
+ )
+ self.description = json.dumps({"message": self.message})
+
+
+class PipelineValidationException(OpensearchIngestionExceptions):
+ def __init__(self, message: str):
+ super().__init__("ValidationException", message)
+ self.description = json.dumps({"message": self.message})
+
+
+class InvalidVPCOptionsException(OpensearchIngestionExceptions):
+ def __init__(self, message: str) -> None:
+ super().__init__("ValidationException", f"Invalid VpcOptions: {message}")
+ self.description = json.dumps({"message": self.message})
+
+
+class SubnetNotFoundException(InvalidVPCOptionsException):
+ def __init__(self, subnet_id: str) -> None:
+ super().__init__(f"The subnet ID {subnet_id} does not exist")
+
+
+class SecurityGroupNotFoundException(InvalidVPCOptionsException):
+ def __init__(self, sg_id: str) -> None:
+ super().__init__(f"The security group {sg_id} does not exist")
diff --git a/moto/osis/models.py b/moto/osis/models.py
new file mode 100644
index 000000000000..b8f0ba01dea8
--- /dev/null
+++ b/moto/osis/models.py
@@ -0,0 +1,505 @@
+"""OpenSearchIngestionBackend class with methods for supported APIs."""
+
+from datetime import datetime
+from typing import TYPE_CHECKING, Any, ClassVar, Dict, List, Optional
+
+import yaml
+
+from moto.core.base_backend import BackendDict, BaseBackend
+from moto.core.common_models import BaseModel
+from moto.moto_api._internal import mock_random as random
+from moto.moto_api._internal.managed_state_model import ManagedState
+from moto.utilities.paginator import paginate
+from moto.utilities.tagging_service import TaggingService
+from moto.utilities.utils import get_partition
+
+from .exceptions import (
+ InvalidVPCOptionsException,
+ PipelineAlreadyExistsException,
+ PipelineInvalidStateException,
+ PipelineNotFoundException,
+ SecurityGroupNotFoundException,
+ SubnetNotFoundException,
+)
+
+if TYPE_CHECKING:
+ from moto.ec2.models import EC2Backend
+
+
+class Pipeline(ManagedState, BaseModel):
+ CREATING_REASON = "The pipeline is being created. It is not able to ingest data."
+ ACTIVE_REASON = "The pipeline is ready to ingest data."
+ DELETING_REASON = "The pipeline is being deleted"
+ STOPPING_REASON = "The pipeline is being stopped"
+ STOPPED_REASON = "The pipeline is stopped"
+ STARTING_REASON = "The pipeline is starting. It is not able to ingest data"
+ UPDATING_REASON = "An update was triggered for the pipeline. It is still available to ingest data."
+
+ STATUS_REASON_MAP: ClassVar[Dict[str, str]] = {
+ "CREATING": CREATING_REASON,
+ "ACTIVE": ACTIVE_REASON,
+ "STOPPING": STOPPING_REASON,
+ "STOPPED": STOPPED_REASON,
+ "STARTING": STARTING_REASON,
+ "UPDATING": UPDATING_REASON,
+ "DELETING": DELETING_REASON,
+ }
+
+ def __init__(
+ self,
+ pipeline_name: str,
+ account_id: str,
+ region: str,
+ min_units: int,
+ max_units: int,
+ pipeline_configuration_body: str,
+ log_publishing_options: Optional[Dict[str, Any]],
+ vpc_options: Optional[Dict[str, Any]],
+ buffer_options: Optional[Dict[str, Any]],
+ encryption_at_rest_options: Optional[Dict[str, Any]],
+ ingest_endpoint_urls: List[str],
+ serverless: bool,
+ vpc_endpoint_service: Optional[str],
+ vpc_endpoint: Optional[str],
+ vpc_id: Optional[str],
+ backend: "OpenSearchIngestionBackend",
+ ):
+ ManagedState.__init__(
+ self,
+ model_name="osis::pipeline",
+ transitions=[
+ ("CREATING", "ACTIVE"),
+ ("UPDATING", "ACTIVE"),
+ ("DELETING", "DELETED"),
+ ("STOPPING", "STOPPED"),
+ ("STARTING", "ACTIVE"),
+ ],
+ )
+
+ self.pipeline_name = pipeline_name
+ self.account_id = account_id
+ self.region = region
+ self.min_units = min_units
+ self.max_units = max_units
+ self.pipeline_configuration_body_str = pipeline_configuration_body
+ self.pipeline_configuration_body = yaml.safe_load(pipeline_configuration_body)
+ self.log_publishing_options = log_publishing_options
+ self.vpc_options = vpc_options
+ self.buffer_options = buffer_options
+ self.encryption_at_rest_options = encryption_at_rest_options
+ self.ingest_endpoint_urls = ingest_endpoint_urls
+ self.serverless = serverless
+ self.vpc_endpoint_service = vpc_endpoint_service
+ self.vpc_endpoint = vpc_endpoint
+ self.vpc_id = vpc_id
+ self.backend = backend
+
+ self.status = "CREATING"
+ self.arn = self._get_arn(self.pipeline_name)
+ self.destinations = self._update_destinations()
+
+ if (
+ self.vpc_options is None
+ or self.vpc_options.get("VpcEndpointManagement", "SERVICE") == "SERVICE"
+ ):
+ # Not returned in this case
+ self.vpc_endpoint_service = None
+
+ self.service_vpc_endpoints = self._get_service_vpc_endpoints()
+ self.created_at: datetime = datetime.now()
+ self.last_updated_at: datetime = datetime.now()
+
+ def _get_arn(self, name: str) -> str:
+ return f"arn:{get_partition(self.region)}:osis:{self.region}:{self.account_id}:pipeline/{name}"
+
+ def _get_service_vpc_endpoints(self) -> Optional[List[Dict[str, str]]]:
+ # ServiceVpcEndpoint.VpcEndpointId not implemented
+ if self.serverless:
+ return [{"ServiceName": "OPENSEARCH_SERVERLESS"}]
+ else:
+ return None
+
+ def _update_destinations(self) -> List[Dict[str, str]]:
+ destinations = []
+ for sub_pipeline in self.pipeline_configuration_body:
+ if sub_pipeline != "version":
+ for sink in self.pipeline_configuration_body[sub_pipeline]["sink"]:
+ for sink_type, sink_config in sink.items():
+ if sink_type == "opensearch":
+ if sink_config["aws"].get("serverless") is True:
+ service_name = "OpenSearch_Serverless"
+ else:
+ service_name = "OpenSearch"
+ endpoint = sink_config["hosts"][0]
+ elif sink_type == "s3":
+ service_name = "S3"
+ endpoint = sink_config["bucket"]
+ else:
+ continue
+ destinations.append(
+ {"ServiceName": service_name, "Endpoint": endpoint}
+ )
+ return destinations
+
+ @staticmethod
+ def is_serverless(pipeline_body: Dict[str, Any]) -> bool: # type: ignore[misc]
+ serverless = False
+ for sub_pipeline in pipeline_body:
+ if sub_pipeline != "version":
+ for sink in pipeline_body[sub_pipeline]["sink"]:
+ for _, sink_config in sink.items():
+ serverless = (
+ sink_config.get("aws", {}).get("serverless", False)
+ or serverless
+ )
+ source_type = list(pipeline_body[sub_pipeline]["source"].keys())[0]
+ source_config = pipeline_body[sub_pipeline]["source"][source_type]
+ serverless = (
+ source_config.get("aws", {}).get("serverless", False) or serverless
+ )
+ return serverless
+
+ def delete(self) -> None:
+ self.status = "DELETING"
+ self.set_last_updated()
+
+ def get_created_at(self) -> str:
+ return self.created_at.astimezone().isoformat()
+
+ def get_last_updated_at(self) -> str:
+ return self.last_updated_at.astimezone().isoformat()
+
+ def set_last_updated(self) -> None:
+ self.last_updated_at = datetime.now()
+
+ def start(self) -> None:
+ self.status = "STARTING"
+ self.set_last_updated()
+
+ def stop(self) -> None:
+ self.status = "STOPPING"
+ self.set_last_updated()
+
+ def update(
+ self,
+ min_units: Optional[int],
+ max_units: Optional[int],
+ pipeline_configuration_body: Optional[str],
+ log_publishing_options: Optional[Dict[str, Any]],
+ buffer_options: Optional[Dict[str, Any]],
+ encryption_at_rest_options: Optional[Dict[str, Any]],
+ ) -> None:
+ if min_units is not None:
+ self.min_units = min_units
+ if max_units is not None:
+ self.max_units = max_units
+ if pipeline_configuration_body is not None:
+ self.pipeline_configuration_body_str = pipeline_configuration_body
+ self.pipeline_configuration_body = yaml.safe_load(
+ pipeline_configuration_body
+ )
+ if log_publishing_options is not None:
+ self.log_publishing_options = log_publishing_options
+ if buffer_options is not None:
+ self.buffer_options = buffer_options
+ if encryption_at_rest_options is not None:
+ self.encryption_at_rest_options = encryption_at_rest_options
+ self.destinations = self._update_destinations()
+ self.serverless = self.is_serverless(self.pipeline_configuration_body)
+ self.service_vpc_endpoints = self._get_service_vpc_endpoints()
+ self.status = "UPDATING"
+ self.set_last_updated()
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ "PipelineName": self.pipeline_name,
+ "PipelineArn": self.arn,
+ "MinUnits": self.min_units,
+ "MaxUnits": self.max_units,
+ "Status": self.status,
+ "StatusReason": {
+ "Description": self.STATUS_REASON_MAP.get(self.status or "", ""),
+ },
+ "PipelineConfigurationBody": self.pipeline_configuration_body_str,
+ "CreatedAt": self.get_created_at(),
+ "LastUpdatedAt": self.get_last_updated_at(),
+ "IngestEndpointUrls": self.ingest_endpoint_urls,
+ "LogPublishingOptions": self.log_publishing_options,
+ "VpcEndpoints": None
+ if self.vpc_options is None
+ else [
+ {
+ "VpcEndpointId": self.vpc_endpoint,
+ "VpcId": self.vpc_id,
+ "VpcOptions": self.vpc_options,
+ }
+ ],
+ "BufferOptions": self.buffer_options,
+ "EncryptionAtRestOptions": self.encryption_at_rest_options,
+ "VpcEndpointService": self.vpc_endpoint_service,
+ "ServiceVpcEndpoints": self.service_vpc_endpoints,
+ "Destinations": self.destinations,
+ "Tags": self.backend.list_tags_for_resource(self.arn)["Tags"],
+ }
+
+ def to_short_dict(self) -> Dict[str, Any]:
+ return {
+ "Status": self.status,
+ "StatusReason": {
+ "Description": self.STATUS_REASON_MAP.get(self.status or "", ""),
+ },
+ "PipelineName": self.pipeline_name,
+ "PipelineArn": self.arn,
+ "MinUnits": self.min_units,
+ "MaxUnits": self.max_units,
+ "CreatedAt": self.get_created_at(),
+ "LastUpdatedAt": self.get_last_updated_at(),
+ "Destinations": self.destinations,
+ "Tags": self.backend.list_tags_for_resource(self.arn)["Tags"],
+ }
+
+
+class OpenSearchIngestionBackend(BaseBackend):
+ """Implementation of OpenSearchIngestion APIs."""
+
+ PAGINATION_MODEL = {
+ "list_pipelines": {
+ "input_token": "next_token",
+ "limit_key": "max_results",
+ "limit_default": 100,
+ "unique_attribute": "PipelineName",
+ },
+ }
+
+ PIPELINE_DELETE_VALID_STATES = [
+ "UPDATE_FAILED",
+ "ACTIVE",
+ "START_FAILED",
+ "STOPPED",
+ "CREATE_FAILED",
+ ]
+ PIPELINE_STOP_VALID_STATES = ["UPDATE_FAILED", "ACTIVE"]
+ PIPELINE_START_VALID_STATES = ["START_FAILED", "STOPPED"]
+ PIPELINE_UPDATE_VALID_STATES = [
+ "UPDATE_FAILED",
+ "ACTIVE",
+ "START_FAILED",
+ "STOPPED",
+ ]
+
+ def __init__(self, region_name: str, account_id: str):
+ super().__init__(region_name, account_id)
+ self._pipelines: Dict[str, Pipeline] = dict()
+ self.tagger = TaggingService()
+
+ @property
+ def ec2_backend(self) -> "EC2Backend": # type: ignore[misc]
+ from moto.ec2 import ec2_backends
+
+ return ec2_backends[self.account_id][self.region_name]
+
+ @property
+ def pipelines(self) -> Dict[str, Pipeline]:
+ self._pipelines = {
+ name: pipeline
+ for name, pipeline in self._pipelines.items()
+ if pipeline.status != "DELETED"
+ }
+ return self._pipelines
+
+ def _get_ingest_endpoint_urls(
+ self, pipeline_name: str, endpoint_random_string: str
+ ) -> List[str]:
+ return [
+ f"{pipeline_name}-{endpoint_random_string}.{self.region_name}.osis.amazonaws.com"
+ ]
+
+ def _get_random_endpoint_string(self) -> str:
+ return random.get_random_string(length=26, lower_case=True)
+
+ def _get_vpc_endpoint(
+ self, vpc_id: str, vpc_options: Dict[str, Any], service_name: str
+ ) -> Optional[str]:
+ if vpc_options.get("VpcEndpointManagement", "SERVICE") == "SERVICE":
+ service_managed_endpoint = self.ec2_backend.create_vpc_endpoint(
+ vpc_id=vpc_id,
+ service_name=service_name,
+ endpoint_type="Interface",
+ security_group_ids=vpc_options.get("SecurityGroupIds"),
+ subnet_ids=vpc_options["SubnetIds"],
+ private_dns_enabled=False,
+ policy_document="OSIS Test Doc",
+ route_table_ids=[],
+ tags={"OSISManaged": "true"},
+ )
+ return service_managed_endpoint.id
+ else:
+ return None
+
+ def _get_vpc_endpoint_service(
+ self, pipeline_name: str, endpoint_random_string: str
+ ) -> str:
+ return f"com.amazonaws.osis.{self.region_name}.{pipeline_name}-{endpoint_random_string}"
+
+ def _validate_and_get_vpc(self, vpc_options: Dict[str, Any]) -> str:
+ from moto.ec2.exceptions import InvalidSubnetIdError
+
+ vpc_id = ""
+ for subnet_id in vpc_options["SubnetIds"]:
+ try:
+ subnet = self.ec2_backend.get_subnet(subnet_id)
+ except InvalidSubnetIdError:
+ # re-raising for more accurate error message
+ raise SubnetNotFoundException(subnet_id)
+ if vpc_id == "":
+ vpc_id = subnet.vpc_id
+ else:
+ if subnet.vpc_id != vpc_id:
+ raise InvalidVPCOptionsException(
+ "All specified subnets must belong to the same VPC."
+ )
+
+ for sg_id in vpc_options["SecurityGroupIds"]:
+ sg = self.ec2_backend.get_security_group_from_id(sg_id)
+ if sg is None:
+ raise SecurityGroupNotFoundException(sg_id)
+
+ return vpc_id
+
+ def create_pipeline(
+ self,
+ pipeline_name: str,
+ min_units: int,
+ max_units: int,
+ pipeline_configuration_body: str,
+ log_publishing_options: Optional[Dict[str, Any]],
+ vpc_options: Optional[Dict[str, Any]],
+ buffer_options: Optional[Dict[str, bool]],
+ encryption_at_rest_options: Optional[Dict[str, Any]],
+ tags: List[Dict[str, str]],
+ ) -> Pipeline:
+ if pipeline_name in self.pipelines:
+ raise PipelineAlreadyExistsException(pipeline_name)
+
+ serverless = Pipeline.is_serverless(yaml.safe_load(pipeline_configuration_body))
+
+ endpoint_random_string = self._get_random_endpoint_string()
+ endpoint_service = self._get_vpc_endpoint_service(
+ pipeline_name, endpoint_random_string
+ )
+
+ ingestion_endpoint_urls = self._get_ingest_endpoint_urls(
+ pipeline_name, endpoint_random_string
+ )
+
+ vpc_endpoint = None
+ vpc_id = None
+ if vpc_options is not None:
+ vpc_id = self._validate_and_get_vpc(vpc_options)
+ vpc_endpoint = self._get_vpc_endpoint(vpc_id, vpc_options, endpoint_service)
+
+ pipeline = Pipeline(
+ pipeline_name,
+ self.account_id,
+ self.region_name,
+ min_units,
+ max_units,
+ pipeline_configuration_body,
+ log_publishing_options,
+ vpc_options,
+ buffer_options,
+ encryption_at_rest_options,
+ ingestion_endpoint_urls,
+ serverless,
+ endpoint_service,
+ vpc_endpoint,
+ vpc_id,
+ backend=self,
+ )
+ self.pipelines[pipeline_name] = pipeline
+ self.tag_resource(pipeline.arn, tags)
+ return pipeline
+
+ def delete_pipeline(self, pipeline_name: str) -> None:
+ if pipeline_name not in self.pipelines:
+ raise PipelineNotFoundException(pipeline_name)
+ pipeline = self.pipelines[pipeline_name]
+ if pipeline.status not in self.PIPELINE_DELETE_VALID_STATES:
+ raise PipelineInvalidStateException(
+ "deletion", self.PIPELINE_DELETE_VALID_STATES, pipeline.status
+ )
+ pipeline.delete()
+
+ def start_pipeline(self, pipeline_name: str) -> Pipeline:
+ if pipeline_name not in self.pipelines:
+ raise PipelineNotFoundException(pipeline_name)
+ pipeline = self.pipelines[pipeline_name]
+ if pipeline.status not in self.PIPELINE_START_VALID_STATES:
+ raise PipelineInvalidStateException(
+ "starting", self.PIPELINE_START_VALID_STATES, pipeline.status
+ )
+ pipeline.start()
+ return pipeline
+
+ def stop_pipeline(self, pipeline_name: str) -> Pipeline:
+ if pipeline_name not in self.pipelines:
+ raise PipelineNotFoundException(pipeline_name)
+ pipeline = self.pipelines[pipeline_name]
+ if pipeline.status not in self.PIPELINE_STOP_VALID_STATES:
+ raise PipelineInvalidStateException(
+ "stopping", self.PIPELINE_STOP_VALID_STATES, pipeline.status
+ )
+ pipeline.stop()
+ return pipeline
+
+ def get_pipeline(self, pipeline_name: str) -> Pipeline:
+ if pipeline_name not in self.pipelines:
+ raise PipelineNotFoundException(pipeline_name)
+ pipeline = self.pipelines[pipeline_name]
+ pipeline.advance()
+ return pipeline
+
+ @paginate(pagination_model=PAGINATION_MODEL) # type: ignore
+ def list_pipelines(self) -> List[Pipeline]:
+ for pipeline in self.pipelines.values():
+ pipeline.advance()
+ return [p for p in self.pipelines.values()]
+
+ def list_tags_for_resource(self, arn: str) -> Dict[str, List[Dict[str, str]]]:
+ return self.tagger.list_tags_for_resource(arn)
+
+ def update_pipeline(
+ self,
+ pipeline_name: str,
+ min_units: Optional[int],
+ max_units: Optional[int],
+ pipeline_configuration_body: Optional[str],
+ log_publishing_options: Optional[Dict[str, Any]],
+ buffer_options: Optional[Dict[str, Any]],
+ encryption_at_rest_options: Optional[Dict[str, Any]],
+ ) -> Pipeline:
+ if pipeline_name not in self.pipelines:
+ raise PipelineNotFoundException(pipeline_name)
+ pipeline = self.pipelines[pipeline_name]
+ if pipeline.status not in self.PIPELINE_UPDATE_VALID_STATES:
+ raise PipelineInvalidStateException(
+ "updates", self.PIPELINE_UPDATE_VALID_STATES, pipeline.status
+ )
+ pipeline.update(
+ min_units,
+ max_units,
+ pipeline_configuration_body,
+ log_publishing_options,
+ buffer_options,
+ encryption_at_rest_options,
+ )
+ return pipeline
+
+ def tag_resource(self, arn: str, tags: List[Dict[str, str]]) -> None:
+ self.tagger.tag_resource(arn, tags)
+
+ def untag_resource(self, arn: str, tag_keys: List[str]) -> None:
+ self.tagger.untag_resource_using_names(arn, tag_keys)
+
+
+osis_backends = BackendDict(OpenSearchIngestionBackend, "osis")
diff --git a/moto/osis/responses.py b/moto/osis/responses.py
new file mode 100644
index 000000000000..90ee4155fd19
--- /dev/null
+++ b/moto/osis/responses.py
@@ -0,0 +1,128 @@
+"""Handles incoming osis requests, invokes methods, returns responses."""
+
+import json
+
+from moto.core.responses import BaseResponse
+
+from .models import OpenSearchIngestionBackend, osis_backends
+
+
+class OpenSearchIngestionResponse(BaseResponse):
+ """Handler for OpenSearchIngestion requests and responses."""
+
+ def __init__(self) -> None:
+ super().__init__(service_name="osis")
+
+ @property
+ def osis_backend(self) -> OpenSearchIngestionBackend:
+ """Return backend instance specific for this region."""
+ return osis_backends[self.current_account][self.region]
+
+ def create_pipeline(self) -> str:
+ params = json.loads(self.body)
+ pipeline_name = params.get("PipelineName")
+ min_units = params.get("MinUnits")
+ max_units = params.get("MaxUnits")
+ pipeline_configuration_body = params.get("PipelineConfigurationBody")
+ log_publishing_options = params.get("LogPublishingOptions")
+ vpc_options = params.get("VpcOptions")
+ buffer_options = params.get("BufferOptions")
+ encryption_at_rest_options = params.get("EncryptionAtRestOptions")
+ tags = params.get("Tags")
+ pipeline = self.osis_backend.create_pipeline(
+ pipeline_name=pipeline_name,
+ min_units=min_units,
+ max_units=max_units,
+ pipeline_configuration_body=pipeline_configuration_body,
+ log_publishing_options=log_publishing_options,
+ vpc_options=vpc_options,
+ buffer_options=buffer_options,
+ encryption_at_rest_options=encryption_at_rest_options,
+ tags=tags,
+ )
+ return json.dumps(dict(Pipeline=pipeline.to_dict()))
+
+ def delete_pipeline(self) -> str:
+ pipeline_name = self._get_param("PipelineName")
+ self.osis_backend.delete_pipeline(
+ pipeline_name=pipeline_name,
+ )
+ return json.dumps(dict())
+
+ def get_pipeline(self) -> str:
+ pipeline_name = self._get_param("PipelineName")
+ pipeline = self.osis_backend.get_pipeline(
+ pipeline_name=pipeline_name,
+ )
+ return json.dumps(dict(Pipeline=pipeline.to_dict()))
+
+ def list_pipelines(self) -> str:
+ max_results = self._get_int_param("MaxResults")
+ next_token = self._get_param("NextToken")
+ pipelines, next_token = self.osis_backend.list_pipelines(
+ max_results=max_results,
+ next_token=next_token,
+ )
+ return json.dumps(
+ dict(nextToken=next_token, Pipelines=[p.to_short_dict() for p in pipelines])
+ )
+
+ def list_tags_for_resource(self) -> str:
+ arn = self._get_param("arn")
+ tags = self.osis_backend.list_tags_for_resource(arn=arn)
+ return json.dumps(dict(tags))
+
+ def update_pipeline(self) -> str:
+ params = json.loads(self.body)
+ pipeline_name = self.path.split("/")[-1]
+ min_units = params.get("MinUnits")
+ max_units = params.get("MaxUnits")
+ pipeline_configuration_body = params.get("PipelineConfigurationBody")
+ log_publishing_options = params.get("LogPublishingOptions")
+ buffer_options = params.get("BufferOptions")
+ encryption_at_rest_options = params.get("EncryptionAtRestOptions")
+ pipeline = self.osis_backend.update_pipeline(
+ pipeline_name=pipeline_name,
+ min_units=min_units,
+ max_units=max_units,
+ pipeline_configuration_body=pipeline_configuration_body,
+ log_publishing_options=log_publishing_options,
+ buffer_options=buffer_options,
+ encryption_at_rest_options=encryption_at_rest_options,
+ )
+ # TODO: adjust response
+ return json.dumps(dict(Pipeline=pipeline.to_dict()))
+
+ def tag_resource(self) -> str:
+ params = json.loads(self.body)
+ arn = self._get_param("arn")
+ tags = params.get("Tags")
+ self.osis_backend.tag_resource(
+ arn=arn,
+ tags=tags,
+ )
+ return json.dumps(dict())
+
+ def untag_resource(self) -> str:
+ params = json.loads(self.body)
+ arn = self._get_param("arn")
+ tag_keys = params.get("TagKeys")
+ self.osis_backend.untag_resource(
+ arn=arn,
+ tag_keys=tag_keys,
+ )
+ return json.dumps(dict())
+
+ def start_pipeline(self) -> str:
+ pipeline_name = self._get_param("PipelineName")
+ pipeline = self.osis_backend.start_pipeline(
+ pipeline_name=pipeline_name,
+ )
+ return json.dumps(dict(Pipeline=pipeline.to_dict()))
+
+ def stop_pipeline(self) -> str:
+ pipeline_name = self._get_param("PipelineName")
+ pipeline = self.osis_backend.stop_pipeline(
+ pipeline_name=pipeline_name,
+ )
+ return json.dumps(dict(Pipeline=pipeline.to_dict()))
diff --git a/moto/osis/urls.py b/moto/osis/urls.py
new file mode 100644
index 000000000000..00d6a44c9395
--- /dev/null
+++ b/moto/osis/urls.py
@@ -0,0 +1,20 @@
+"""osis base URL and path."""
+
+from .responses import OpenSearchIngestionResponse
+
+url_bases = [
+ r"https?://osis\.(.+)\.amazonaws\.com",
+]
+
+url_paths = {
+ "{0}/2022-01-01/osis/createPipeline$": OpenSearchIngestionResponse.dispatch,
+ "{0}/2022-01-01/osis/deletePipeline/(?P[^/]+)$": OpenSearchIngestionResponse.dispatch,
+ "{0}/2022-01-01/osis/getPipeline/(?P[^/]+)$": OpenSearchIngestionResponse.dispatch,
+ "{0}/2022-01-01/osis/startPipeline/(?P[^/]+)$": OpenSearchIngestionResponse.dispatch,
+ "{0}/2022-01-01/osis/stopPipeline/(?P[^/]+)$": OpenSearchIngestionResponse.dispatch,
+ "{0}/2022-01-01/osis/listPipelines$": OpenSearchIngestionResponse.dispatch,
+ "{0}/2022-01-01/osis/listTagsForResource/$": OpenSearchIngestionResponse.dispatch,
+ "{0}/2022-01-01/osis/updatePipeline/(?P[^/]+)$": OpenSearchIngestionResponse.dispatch,
+ "{0}/2022-01-01/osis/tagResource/$": OpenSearchIngestionResponse.dispatch,
+ "{0}/2022-01-01/osis/untagResource/$": OpenSearchIngestionResponse.dispatch,
+}
diff --git a/tests/test_osis/__init__.py b/tests/test_osis/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/tests/test_osis/test_osis.py b/tests/test_osis/test_osis.py
new file mode 100644
index 000000000000..fe85e1cc550e
--- /dev/null
+++ b/tests/test_osis/test_osis.py
@@ -0,0 +1,760 @@
+"""Unit tests for osis-supported APIs."""
+
+import json
+
+import boto3
+import pytest
+import requests
+from botocore.exceptions import ClientError
+
+from moto import mock_aws, settings
+from moto.moto_api import state_manager
+
+# See our Development Tips on writing tests for hints on how to write good tests:
+# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html
+
+BASIC_PIPELINE_KWARGS = {
+ "PipelineName": "test",
+ "MinUnits": 2,
+ "MaxUnits": 4,
+ "PipelineConfigurationBody": """version: "2"\nopensearch-migration-pipeline:\n source:
+ \n opensearch:\n acknowledgments: true
+ \n hosts: ["https://vpc-test-ieeljhbsnht35i5rtzjl756pk4.eu-west-1.es.amazonaws.com"]
+ \n indices:\n exclude:\n - index_name_regex: \'\\..*\'\n aws:
+ \n region: "eu-west-1"\n sts_role_arn: "arn:aws:iam::123456789012:role/MyRole"
+ \n serverless: false\n sink:\n - opensearch:
+ \n hosts: ["https://kbjahvxo2jgx8beq2vob.eu-west-1.aoss.amazonaws.com"]
+ \n aws:\n sts_role_arn: "arn:aws:iam::123456789012:role/MyRole"
+ \n region: "eu-west-1"\n serverless: true\n""",
+}
+
+
+@mock_aws
+def test_create_pipeline():
+ set_transition()
+ client = boto3.client("osis", region_name="eu-west-1")
+ resp = client.create_pipeline(**BASIC_PIPELINE_KWARGS)["Pipeline"]
+ assert resp["PipelineName"] == "test"
+ assert resp["PipelineArn"] == "arn:aws:osis:eu-west-1:123456789012:pipeline/test"
+ assert resp["MinUnits"] == 2
+ assert resp["MaxUnits"] == 4
+ assert resp["Status"] == "ACTIVE"
+ assert (
+ resp["StatusReason"]["Description"] == "The pipeline is ready to ingest data."
+ )
+ assert (
+ resp["PipelineConfigurationBody"]
+ == BASIC_PIPELINE_KWARGS["PipelineConfigurationBody"]
+ )
+ assert (
+ ".eu-west-1.osis.amazonaws.com" in resp["IngestEndpointUrls"][0]
+ and "test" in resp["IngestEndpointUrls"][0]
+ )
+ assert resp["ServiceVpcEndpoints"][0]["ServiceName"] == "OPENSEARCH_SERVERLESS"
+ assert resp["Destinations"][0]["ServiceName"] == "OpenSearch_Serverless"
+ assert (
+ resp["Destinations"][0]["Endpoint"]
+ == "https://kbjahvxo2jgx8beq2vob.eu-west-1.aoss.amazonaws.com"
+ )
+ assert "VpcEndpointService" not in resp
+ assert "VpcOptions" not in resp
+ assert resp["Tags"] == []
+
+ ec2 = boto3.resource("ec2", region_name="eu-west-1")
+ vpc = ec2.create_vpc(CidrBlock="172.28.7.0/24")
+ subnet = ec2.create_subnet(VpcId=vpc.id, CidrBlock="172.28.7.192/26")
+ sg = ec2.create_security_group(
+ GroupName="test-group", Description="Test security group sg01"
+ )
+
+ kwargs = {
+ "PipelineName": "test-2",
+ "MinUnits": 2,
+ "MaxUnits": 4,
+ "PipelineConfigurationBody": """log-pipeline:\n processor:\n - date:
+ \n destination: \'@timestamp\'\n from_time_received: true\n - delete_entries:
+ \n with_keys:\n - s3\n sink:\n - opensearch:\n aws:
+ \n region: eu-west-1\n serverless: true
+ \n sts_role_arn: arn:aws:iam::123456789012:role/MyRole\n hosts:
+ \n - https://kbjahvxo2jgx8beq2vob.eu-west-1.aoss.amazonaws.com
+ \n index: uncompressed_logs\n - opensearch:\n aws:\n region: eu-west-1
+ \n serverless: false\n sts_role_arn: arn:aws:iam::123456789012:role/MyRole
+ \n hosts:
+ \n - https://vpc-c7ntest-ieeljhbsnht35i5rtzjl756pk4.eu-west-1.es.amazonaws.com
+ \n index: uncompressed_logs\n dlq:\n s3:\n bucket: "dlq-bucket"
+ \n region: "eu-west-1"
+ \n sts_role_arn: "arn:aws:iam::123456789012:role/MyRole"\n - s3:\n aws:
+ \n region: eu-west-1\n serverless: false
+ \n sts_role_arn: arn:aws:iam::123456789012:role/MyRole
+ \n bucket: test-s3-bucket-2\n threshold: 3\n codec: json\n - s3:
+ \n aws:\n region: eu-west-1\n serverless: false
+ \n sts_role_arn: arn:aws:iam::123456789012:role/MyRole
+ \n bucket: test-s3-bucket-1\n threshold: 3\n codec: json\n source:
+ \n s3:\n acknowledgments: true\n aws:\n region: eu-west-1
+ \n sts_role_arn: arn:aws:iam::123456789012:role/MyRole\n codec:
+ \n newline: null\n compression: none\n scan:\n buckets:
+ \n - bucket:\n name: test-s3-bucket-2\nlog-pipeline-2:\n processor:
+ \n - date:\n destination: \'@timestamp\'\n from_time_received: true
+ \n - delete_entries:\n with_keys:\n - s3\n sink:\n - pipeline:
+ \n name: "log-to-metrics-pipeline"\n - opensearch:\n aws:
+ \n region: eu-west-1\n serverless: false
+ \n sts_role_arn: arn:aws:iam::123456789012:role/MyRole\n hosts:
+ \n - https://vpc-c7ntest-ieeljhbsnht35i5rtzjl756pk4.eu-west-1.es.amazonaws.com
+ \n index: uncompressed_logs\n - s3:\n aws:\n region: eu-west-1
+ \n serverless: false\n sts_role_arn: arn:aws:iam::123456789012:role/MyRole
+ \n bucket: test-s3-bucket-1\n threshold: 3\n codec: json\n source:
+ \n pipeline:\n name: "apache-log-pipeline-with-metrics"\nversion: \'2\'""",
+ "LogPublishingOptions": {
+ "IsLoggingEnabled": True,
+ "CloudWatchLogDestination": {
+ "LogGroup": "/aws/osis/test",
+ },
+ },
+ "VpcOptions": {
+ "SubnetIds": [subnet.id],
+ "SecurityGroupIds": [sg.id],
+ "VpcEndpointManagement": "SERVICE",
+ "VpcAttachmentOptions": {
+ "AttachToVpc": True,
+ "CidrBlock": "172.168.1.1",
+ },
+ },
+ "BufferOptions": {
+ "PersistentBufferEnabled": True,
+ },
+ "EncryptionAtRestOptions": {
+ "KmsKeyArn": "arn:aws:kms:eu-west-1:123456789012:key/12345678-1234-1234-1234-123456789012",
+ },
+ "Tags": [
+ {
+ "Key": "TestKey",
+ "Value": "TestValue",
+ }
+ ],
+ }
+ resp = client.create_pipeline(**kwargs)["Pipeline"]
+ assert resp["PipelineName"] == "test-2"
+ assert "CreatedAt" in resp
+ assert "LastUpdatedAt" in resp
+ assert resp["LogPublishingOptions"]["IsLoggingEnabled"]
+ assert (
+ resp["LogPublishingOptions"]["CloudWatchLogDestination"]["LogGroup"]
+ == "/aws/osis/test"
+ )
+ assert resp["VpcEndpoints"][0]["VpcOptions"]["SubnetIds"] == [subnet.id]
+ assert resp["VpcEndpoints"][0]["VpcOptions"]["SecurityGroupIds"] == [sg.id]
+ assert resp["VpcEndpoints"][0]["VpcOptions"]["VpcEndpointManagement"] == "SERVICE"
+ assert resp["VpcEndpoints"][0]["VpcOptions"]["VpcAttachmentOptions"]["AttachToVpc"]
+ assert (
+ resp["VpcEndpoints"][0]["VpcOptions"]["VpcAttachmentOptions"]["CidrBlock"]
+ == "172.168.1.1"
+ )
+ assert (
+ resp["VpcEndpoints"][0]["VpcEndpointId"]
+ and resp["VpcEndpoints"][0]["VpcId"] == vpc.id
+ )
+ assert resp["BufferOptions"]["PersistentBufferEnabled"]
+ assert (
+ resp["EncryptionAtRestOptions"]["KmsKeyArn"]
+ == "arn:aws:kms:eu-west-1:123456789012:key/12345678-1234-1234-1234-123456789012"
+ )
+ assert "VpcEndpointService" not in resp
+ assert resp["ServiceVpcEndpoints"][0]["ServiceName"] == "OPENSEARCH_SERVERLESS"
+ assert resp["Destinations"] == [
+ {
+ "ServiceName": "OpenSearch_Serverless",
+ "Endpoint": "https://kbjahvxo2jgx8beq2vob.eu-west-1.aoss.amazonaws.com",
+ },
+ {
+ "ServiceName": "OpenSearch",
+ "Endpoint": "https://vpc-c7ntest-ieeljhbsnht35i5rtzjl756pk4.eu-west-1.es.amazonaws.com",
+ },
+ {"ServiceName": "S3", "Endpoint": "test-s3-bucket-2"},
+ {"ServiceName": "S3", "Endpoint": "test-s3-bucket-1"},
+ {
+ "ServiceName": "OpenSearch",
+ "Endpoint": "https://vpc-c7ntest-ieeljhbsnht35i5rtzjl756pk4.eu-west-1.es.amazonaws.com",
+ },
+ {"ServiceName": "S3", "Endpoint": "test-s3-bucket-1"},
+ ]
+
+ assert (
+ resp["Tags"][0]["Key"] == "TestKey" and resp["Tags"][0]["Value"] == "TestValue"
+ )
+
+ assert (
+ boto3.client("ec2", region_name="eu-west-1").describe_vpc_endpoints(
+ VpcEndpointIds=[resp["VpcEndpoints"][0]["VpcEndpointId"]]
+ )["VpcEndpoints"]
+ != []
+ )
+
+
+@mock_aws
+def test_create_pipeline_customer_endpoint():
+ set_transition({"progression": "manual", "times": 1})
+ client = boto3.client("osis", region_name="eu-west-1")
+ ec2 = boto3.resource("ec2", region_name="eu-west-1")
+ vpc = ec2.create_vpc(CidrBlock="172.28.7.0/24")
+ subnet = ec2.create_subnet(VpcId=vpc.id, CidrBlock="172.28.7.192/26")
+ sg = ec2.create_security_group(
+ GroupName="test-group", Description="Test security group sg01"
+ )
+ kwargs = {
+ "PipelineName": "test",
+ "MinUnits": 2,
+ "MaxUnits": 4,
+ "PipelineConfigurationBody": """version: "2"\nopensearch-migration-pipeline:\n source:
+ \n opensearch:\n acknowledgments: true
+ \n hosts: ["https://vpc-test-ieeljhbsnht35i5rtzjl756pk4.eu-west-1.es.amazonaws.com"]
+ \n indices:\n exclude:\n - index_name_regex: \'\\..*\'\n aws:
+ \n region: "eu-west-1"\n sts_role_arn: "arn:aws:iam::123456789012:role/MyRole"
+ \n serverless: false\n sink:\n - opensearch:
+ \n hosts: ["https://vpc-test-ieeljhbsnht35i5rtzjl756pk4.eu-west-1.es.amazonaws.com"]
+ \n aws:\n sts_role_arn: "arn:aws:iam::123456789012:role/MyRole"
+ \n region: "eu-west-1"\n serverless: false\n""",
+ "VpcOptions": {
+ "SubnetIds": [subnet.id],
+ "SecurityGroupIds": [sg.id],
+ "VpcEndpointManagement": "CUSTOMER",
+ },
+ }
+ resp = client.create_pipeline(**kwargs)["Pipeline"]
+ assert resp["PipelineName"] == "test"
+ assert "VpcEndpointService" in resp
+ assert "ServiceVpcEndpoints" not in resp
+ assert resp["VpcEndpoints"][0]["VpcOptions"]["VpcEndpointManagement"] == "CUSTOMER"
+ assert "VpcEndpointId" not in resp["VpcEndpoints"][0]
+ assert "VpcAttachmentOptions" not in resp["VpcEndpoints"][0]
+ assert resp["Status"] == "CREATING"
+
+
+@mock_aws
+def test_create_pipeline_error():
+ set_transition()
+ client = boto3.client("osis", region_name="eu-west-1")
+ kwargs = {
+ "PipelineName": "test",
+ "MinUnits": 2,
+ "MaxUnits": 4,
+ "PipelineConfigurationBody": BASIC_PIPELINE_KWARGS["PipelineConfigurationBody"],
+ }
+ client.create_pipeline(**kwargs)["Pipeline"]
+ with pytest.raises(ClientError) as exc:
+ client.create_pipeline(**kwargs)
+ err = exc.value.response["Error"]
+ assert err["Code"] == "ResourceAlreadyExistsException"
+
+ kwargs["PipelineName"] = "test-2"
+ kwargs["VpcOptions"] = {}
+ kwargs["VpcOptions"]["SubnetIds"] = ["subnet-12345678901234567"]
+
+ with pytest.raises(ClientError) as exc:
+ client.create_pipeline(**kwargs)
+ err = exc.value.response["Error"]
+ assert err["Code"] == "ValidationException"
+ assert (
+ err["Message"]
+ == "Invalid VpcOptions: The subnet ID subnet-12345678901234567 does not exist"
+ )
+
+ ec2 = boto3.resource("ec2", region_name="eu-west-1")
+ vpc = ec2.create_vpc(CidrBlock="172.28.7.0/24")
+ subnet = ec2.create_subnet(VpcId=vpc.id, CidrBlock="172.28.7.192/26")
+
+ kwargs["VpcOptions"]["SubnetIds"] = [subnet.id]
+ kwargs["VpcOptions"]["SecurityGroupIds"] = ["sg-12345678901234567"]
+
+ with pytest.raises(ClientError) as exc:
+ client.create_pipeline(**kwargs)
+ err = exc.value.response["Error"]
+ assert err["Code"] == "ValidationException"
+ assert (
+ err["Message"]
+ == "Invalid VpcOptions: The security group sg-12345678901234567 does not exist"
+ )
+
+ kwargs["VpcOptions"].pop("SecurityGroupIds")
+ vpc = ec2.create_vpc(CidrBlock="172.29.7.0/24")
+ subnet_2 = ec2.create_subnet(VpcId=vpc.id, CidrBlock="172.29.7.192/26")
+ kwargs["VpcOptions"]["SubnetIds"].append(subnet_2.id)
+
+ with pytest.raises(ClientError) as exc:
+ client.create_pipeline(**kwargs)
+ err = exc.value.response["Error"]
+ assert err["Code"] == "ValidationException"
+ assert (
+ err["Message"]
+ == "Invalid VpcOptions: All specified subnets must belong to the same VPC."
+ )
+
+
+@mock_aws
+def test_update_pipeline():
+ set_transition()
+ client = boto3.client("osis", region_name="eu-west-1")
+ ec2 = boto3.resource("ec2", region_name="eu-west-1")
+ vpc = ec2.create_vpc(CidrBlock="172.28.7.0/24")
+ subnet = ec2.create_subnet(VpcId=vpc.id, CidrBlock="172.28.7.192/26")
+ sg = ec2.create_security_group(
+ GroupName="test-group", Description="Test security group sg01"
+ )
+
+ kwargs = {
+ "PipelineName": "test",
+ "MinUnits": 2,
+ "MaxUnits": 4,
+ "PipelineConfigurationBody": BASIC_PIPELINE_KWARGS["PipelineConfigurationBody"],
+ "LogPublishingOptions": {
+ "IsLoggingEnabled": True,
+ "CloudWatchLogDestination": {
+ "LogGroup": "/aws/osis/test",
+ },
+ },
+ "VpcOptions": {
+ "SubnetIds": [subnet.id],
+ "SecurityGroupIds": [sg.id],
+ "VpcEndpointManagement": "SERVICE",
+ "VpcAttachmentOptions": {
+ "AttachToVpc": True,
+ "CidrBlock": "172.168.1.1",
+ },
+ },
+ "BufferOptions": {
+ "PersistentBufferEnabled": True,
+ },
+ "EncryptionAtRestOptions": {
+ "KmsKeyArn": "arn:aws:kms:eu-west-1:123456789012:key/12345678-1234-1234-1234-123456789012",
+ },
+ "Tags": [
+ {
+ "Key": "TestKey",
+ "Value": "TestValue",
+ }
+ ],
+ }
+ original = client.create_pipeline(**kwargs)["Pipeline"]
+ resp = client.update_pipeline(PipelineName="test", MinUnits=3)["Pipeline"]
+ assert resp["MinUnits"] == 3
+ assert resp["MaxUnits"] == original["MaxUnits"]
+ assert (
+ resp["PipelineConfigurationBody"]
+ == BASIC_PIPELINE_KWARGS["PipelineConfigurationBody"]
+ )
+ assert resp["Destinations"] == original["Destinations"]
+ assert resp["ServiceVpcEndpoints"] == original["ServiceVpcEndpoints"]
+ assert resp["VpcEndpoints"] == original["VpcEndpoints"]
+ assert resp["Tags"] == original["Tags"]
+ assert resp["LogPublishingOptions"] == original["LogPublishingOptions"]
+ assert resp["BufferOptions"] == original["BufferOptions"]
+ assert resp["EncryptionAtRestOptions"] == original["EncryptionAtRestOptions"]
+ assert resp["Status"] == "ACTIVE"
+
+
+@mock_aws
+def test_update_pipeline_all_args():
+ set_transition({"progression": "manual", "times": 1})
+ client = boto3.client("osis", region_name="eu-west-1")
+ kwargs = {
+ "PipelineName": "test",
+ "MinUnits": 2,
+ "MaxUnits": 4,
+ "PipelineConfigurationBody": BASIC_PIPELINE_KWARGS["PipelineConfigurationBody"],
+ "LogPublishingOptions": {
+ "IsLoggingEnabled": True,
+ "CloudWatchLogDestination": {
+ "LogGroup": "test/osis/logs",
+ },
+ },
+ "BufferOptions": {
+ "PersistentBufferEnabled": False,
+ },
+ "EncryptionAtRestOptions": {
+ "KmsKeyArn": "arn:aws:kms:eu-west-1:123456789012:key/12345678-1234-1234-1234-123456789012",
+ },
+ }
+
+ resp = client.create_pipeline(**kwargs)["Pipeline"]
+ last_updated = resp["LastUpdatedAt"]
+ # state transition
+ client.list_pipelines()
+
+ new_pipeline_config = """version: "2"\nopensearch-migration-pipeline:\n source:
+ \n opensearch:\n acknowledgments: true
+ \n hosts: ["https://vpc-test-ieeljhbsnht35i5rtzjl756pk4.eu-west-1.es.amazonaws.com"]
+ \n indices:\n exclude:\n - index_name_regex: \'\\..*\'\n aws:
+ \n region: "eu-west-1"\n sts_role_arn: "arn:aws:iam::123456789012:role/MyRole"
+ \n serverless: false\n sink:\n - opensearch:
+ \n hosts: ["https://vpc-test-ieeljhbsnht35i5rtzjl756pk4.eu-west-1.es.amazonaws.com"]
+ \n aws:\n sts_role_arn: "arn:aws:iam::123456789012:role/MyRole"
+ \n region: "eu-west-1"\n serverless: false\n"""
+ new_key = (
+ "arn:aws:kms:eu-west-1:123456789012:key/87654321-4321-4321-4321-210987654321"
+ )
+ kwargs["MinUnits"] = 3
+ kwargs["MaxUnits"] = 5
+ kwargs["PipelineConfigurationBody"] = new_pipeline_config
+ kwargs["LogPublishingOptions"]["IsLoggingEnabled"] = False
+ kwargs["BufferOptions"]["PersistentBufferEnabled"] = True
+ kwargs["EncryptionAtRestOptions"]["KmsKeyArn"] = new_key
+
+ resp = client.update_pipeline(**kwargs)["Pipeline"]
+
+ assert resp["MinUnits"] == 3
+ assert resp["MaxUnits"] == 5
+ assert resp["PipelineConfigurationBody"] == new_pipeline_config
+ assert not resp["LogPublishingOptions"]["IsLoggingEnabled"]
+ assert resp["BufferOptions"]["PersistentBufferEnabled"]
+ assert resp["EncryptionAtRestOptions"]["KmsKeyArn"] == new_key
+ assert resp["Destinations"][0]["ServiceName"] == "OpenSearch"
+ assert (
+ resp["Destinations"][0]["Endpoint"]
+ == "https://vpc-test-ieeljhbsnht35i5rtzjl756pk4.eu-west-1.es.amazonaws.com"
+ )
+ assert "ServiceVpcEndpoints" not in resp
+ assert resp["LastUpdatedAt"] > last_updated
+ assert resp["Status"] == "UPDATING"
+
+
+@mock_aws
+def test_update_pipeline_error():
+ set_transition({"progression": "manual", "times": 1})
+ client = boto3.client("osis", region_name="eu-west-1")
+ client.create_pipeline(**BASIC_PIPELINE_KWARGS)["Pipeline"]
+ with pytest.raises(ClientError) as exc:
+ client.update_pipeline(PipelineName="test", MinUnits=3)["Pipeline"]
+ err = exc.value.response["Error"]
+ assert err["Code"] == "ConflictException"
+ assert (
+ err["Message"]
+ == "Only pipelines with one of the following statuses are eligible for updates: ['UPDATE_FAILED', 'ACTIVE', 'START_FAILED', 'STOPPED']. The current status is CREATING."
+ )
+
+ with pytest.raises(ClientError) as exc:
+ client.update_pipeline(PipelineName="test-2", MinUnits=3)["Pipeline"]
+ err = exc.value.response["Error"]
+ assert err["Code"] == "ResourceNotFoundException"
+ assert err["Message"] == "Pipeline test-2 could not be found."
+
+
+@mock_aws
+def test_delete_pipeline():
+ set_transition({"progression": "manual", "times": 2})
+ client = boto3.client("osis", region_name="eu-west-1")
+ original = client.create_pipeline(**BASIC_PIPELINE_KWARGS)["Pipeline"]
+ for _ in range(2):
+ client.list_pipelines()
+ client.delete_pipeline(PipelineName=BASIC_PIPELINE_KWARGS["PipelineName"])
+ pipeline = client.list_pipelines()["Pipelines"][0]
+ assert pipeline["PipelineName"] == BASIC_PIPELINE_KWARGS["PipelineName"]
+ assert pipeline["Status"] == "DELETING"
+ assert pipeline["LastUpdatedAt"] > original["LastUpdatedAt"]
+ pipelines = client.list_pipelines()["Pipelines"]
+ assert pipelines == []
+
+
+@mock_aws
+def test_delete_pipeline_error():
+ set_transition({"progression": "manual", "times": 1})
+ client = boto3.client("osis", region_name="eu-west-1")
+
+ with pytest.raises(ClientError) as exc:
+ client.delete_pipeline(PipelineName="test")
+ err = exc.value.response["Error"]
+ assert err["Code"] == "ResourceNotFoundException"
+ assert err["Message"] == "Pipeline test could not be found."
+
+ client.create_pipeline(**BASIC_PIPELINE_KWARGS)["Pipeline"]
+ with pytest.raises(ClientError) as exc:
+ client.delete_pipeline(PipelineName=BASIC_PIPELINE_KWARGS["PipelineName"])
+ err = exc.value.response["Error"]
+ assert err["Code"] == "ConflictException"
+ assert (
+ err["Message"]
+ == "Only pipelines with one of the following statuses are eligible for deletion: ['UPDATE_FAILED', 'ACTIVE', 'START_FAILED', 'STOPPED', 'CREATE_FAILED']. The current status is CREATING."
+ )
+
+
+@mock_aws
+def test_get_pipeline():
+ client = boto3.client("osis", region_name="eu-west-1")
+ client.create_pipeline(**BASIC_PIPELINE_KWARGS)["Pipeline"]
+ resp = client.get_pipeline(PipelineName=BASIC_PIPELINE_KWARGS["PipelineName"])[
+ "Pipeline"
+ ]
+ assert resp["PipelineName"] == BASIC_PIPELINE_KWARGS["PipelineName"]
+ assert (
+ resp["PipelineArn"]
+ == f"arn:aws:osis:eu-west-1:123456789012:pipeline/{BASIC_PIPELINE_KWARGS['PipelineName']}"
+ )
+ assert resp["MinUnits"] == BASIC_PIPELINE_KWARGS["MinUnits"]
+ assert resp["MaxUnits"] == BASIC_PIPELINE_KWARGS["MaxUnits"]
+ assert resp["Status"] == "ACTIVE"
+ assert (
+ resp["StatusReason"]["Description"] == "The pipeline is ready to ingest data."
+ )
+ assert (
+ resp["PipelineConfigurationBody"]
+ == BASIC_PIPELINE_KWARGS["PipelineConfigurationBody"]
+ )
+ assert (
+ ".eu-west-1.osis.amazonaws.com" in resp["IngestEndpointUrls"][0]
+ and BASIC_PIPELINE_KWARGS["PipelineName"] in resp["IngestEndpointUrls"][0]
+ )
+ assert resp["ServiceVpcEndpoints"][0]["ServiceName"] == "OPENSEARCH_SERVERLESS"
+ assert resp["Destinations"][0]["ServiceName"] == "OpenSearch_Serverless"
+ assert (
+ resp["Destinations"][0]["Endpoint"]
+ == "https://kbjahvxo2jgx8beq2vob.eu-west-1.aoss.amazonaws.com"
+ )
+ assert "VpcEndpointService" not in resp
+ assert "VpcEndpoints" not in resp
+ assert resp["Tags"] == []
+ assert "CreatedAt" in resp
+ assert "LastUpdatedAt" in resp
+ assert "LogPublishingOptions" not in resp
+ assert "BufferOptions" not in resp
+ assert "EncryptionAtRestOptions" not in resp
+
+ ec2 = boto3.resource("ec2", region_name="eu-west-1")
+ vpc = ec2.create_vpc(CidrBlock="172.28.7.0/24")
+ subnet = ec2.create_subnet(VpcId=vpc.id, CidrBlock="172.28.7.192/26")
+ sg = ec2.create_security_group(
+ GroupName="test-group", Description="Test security group sg01"
+ )
+
+ kwargs = {
+ "PipelineName": "test-2",
+ "MinUnits": 2,
+ "MaxUnits": 4,
+ "PipelineConfigurationBody": """version: "2"\nopensearch-migration-pipeline:\n source:
+ \n opensearch:\n acknowledgments: true
+ \n hosts: ["https://vpc-test-ieeljhbsnht35i5rtzjl756pk4.eu-west-1.es.amazonaws.com"]
+ \n indices:\n exclude:\n - index_name_regex: \'\\..*\'\n aws:
+ \n region: "eu-west-1"\n sts_role_arn: "arn:aws:iam::123456789012:role/MyRole"
+ \n serverless: false\n sink:\n - opensearch:
+ \n hosts: ["https://vpc-test-ieeljhbsnht35i5rtzjl756pk4.eu-west-1.es.amazonaws.com"]
+ \n aws:\n sts_role_arn: "arn:aws:iam::123456789012:role/MyRole"
+ \n region: "eu-west-1"\n serverless: false\n""",
+ "LogPublishingOptions": {
+ "IsLoggingEnabled": True,
+ "CloudWatchLogDestination": {
+ "LogGroup": "/aws/osis/test",
+ },
+ },
+ "VpcOptions": {
+ "SubnetIds": [subnet.id],
+ "SecurityGroupIds": [sg.id],
+ "VpcEndpointManagement": "SERVICE",
+ "VpcAttachmentOptions": {
+ "AttachToVpc": True,
+ "CidrBlock": "172.168.1.1",
+ },
+ },
+ "BufferOptions": {
+ "PersistentBufferEnabled": True,
+ },
+ "EncryptionAtRestOptions": {
+ "KmsKeyArn": "arn:aws:kms:eu-west-1:123456789012:key/12345678-1234-1234-1234-123456789012",
+ },
+ "Tags": [
+ {
+ "Key": "TestKey",
+ "Value": "TestValue",
+ }
+ ],
+ }
+ client.create_pipeline(**kwargs)
+ resp = client.get_pipeline(PipelineName="test-2")["Pipeline"]
+ assert "ServiceVpcEndpoints" not in resp
+ assert resp["Destinations"][0]["ServiceName"] == "OpenSearch"
+ assert (
+ resp["Destinations"][0]["Endpoint"]
+ == "https://vpc-test-ieeljhbsnht35i5rtzjl756pk4.eu-west-1.es.amazonaws.com"
+ )
+ assert "VpcEndpointService" not in resp
+ assert resp["VpcEndpoints"][0]["VpcOptions"] == {
+ "SubnetIds": [subnet.id],
+ "SecurityGroupIds": [sg.id],
+ "VpcEndpointManagement": "SERVICE",
+ "VpcAttachmentOptions": {
+ "AttachToVpc": True,
+ "CidrBlock": "172.168.1.1",
+ },
+ }
+ assert "VpcEndpointId" in resp["VpcEndpoints"][0]
+ assert resp["VpcEndpoints"][0]["VpcId"] == vpc.id
+ assert resp["Tags"] == [
+ {
+ "Key": "TestKey",
+ "Value": "TestValue",
+ }
+ ]
+ assert resp["LogPublishingOptions"] == {
+ "IsLoggingEnabled": True,
+ "CloudWatchLogDestination": {
+ "LogGroup": "/aws/osis/test",
+ },
+ }
+ assert resp["BufferOptions"]["PersistentBufferEnabled"]
+ assert (
+ resp["EncryptionAtRestOptions"]["KmsKeyArn"]
+ == "arn:aws:kms:eu-west-1:123456789012:key/12345678-1234-1234-1234-123456789012"
+ )
+
+
+@mock_aws
+def test_get_pipeline_error():
+ client = boto3.client("osis", region_name="eu-west-1")
+ with pytest.raises(ClientError) as exc:
+ client.get_pipeline(PipelineName="test")
+ err = exc.value.response["Error"]
+ assert err["Code"] == "ResourceNotFoundException"
+ assert err["Message"] == "Pipeline test could not be found."
+
+
+@mock_aws
+def test_list_pipelines():
+ set_transition({"progression": "manual", "times": 1})
+ client = boto3.client("osis", region_name="ap-southeast-1")
+ resp = client.list_pipelines()
+ assert resp["Pipelines"] == []
+ client.create_pipeline(**BASIC_PIPELINE_KWARGS)
+ resp = client.list_pipelines()["Pipelines"][0]
+ assert resp["PipelineName"] == "test"
+ assert (
+ resp["PipelineArn"]
+ == f"arn:aws:osis:ap-southeast-1:123456789012:pipeline/{BASIC_PIPELINE_KWARGS['PipelineName']}"
+ )
+ assert resp["MinUnits"] == 2
+ assert resp["MaxUnits"] == 4
+ assert resp["Status"] == "ACTIVE"
+ assert (
+ resp["StatusReason"]["Description"] == "The pipeline is ready to ingest data."
+ )
+ assert resp["Tags"] == []
+ assert "CreatedAt" in resp
+ assert "LastUpdatedAt" in resp
+
+ kwargs = {
+ "PipelineName": "test-2",
+ "MinUnits": 2,
+ "MaxUnits": 4,
+ "PipelineConfigurationBody": """version: "2"\nopensearch-migration-pipeline:\n source:
+ \n opensearch:\n acknowledgments: true
+ \n hosts: ["https://vpc-test-ieeljhbsnht35i5rtzjl756pk4.eu-west-1.es.amazonaws.com"]
+ \n indices:\n exclude:\n - index_name_regex: \'\\..*\'\n aws:
+ \n region: "eu-west-1"\n sts_role_arn: "arn:aws:iam::123456789012:role/MyRole"
+ \n serverless: false\n sink:\n - opensearch:
+ \n hosts: ["https://kbjahvxo2jgx8beq2vob.eu-west-1.aoss.amazonaws.com"]
+ \n aws:\n sts_role_arn: "arn:aws:iam::123456789012:role/MyRole"
+ \n region: "eu-west-1"\n serverless: true\n""",
+ }
+ client.create_pipeline(**kwargs)
+ assert len(client.list_pipelines()["Pipelines"]) == 2
+
+
+@mock_aws
+def test_list_tags_for_resource():
+ client = boto3.client("osis", region_name="eu-west-1")
+ resp = client.create_pipeline(
+ **BASIC_PIPELINE_KWARGS, Tags=[{"Key": "TestKey", "Value": "TestValue"}]
+ )["Pipeline"]
+ tags = client.list_tags_for_resource(Arn=resp["PipelineArn"])["Tags"]
+ assert tags[0]["Key"] == "TestKey"
+ assert tags[0]["Value"] == "TestValue"
+
+
+@mock_aws
+def test_stop_pipeline():
+ set_transition({"progression": "manual", "times": 2})
+ client = boto3.client("osis", region_name="eu-west-1")
+ client.create_pipeline(**BASIC_PIPELINE_KWARGS)
+
+ with pytest.raises(ClientError) as exc:
+ client.stop_pipeline(PipelineName=BASIC_PIPELINE_KWARGS["PipelineName"])
+ err = exc.value.response["Error"]
+ assert err["Code"] == "ConflictException"
+ assert (
+ err["Message"]
+ == "Only pipelines with one of the following statuses are eligible for stopping: ['UPDATE_FAILED', 'ACTIVE']. The current status is CREATING."
+ )
+
+ for _ in range(2):
+ client.list_pipelines()
+ client.stop_pipeline(PipelineName=BASIC_PIPELINE_KWARGS["PipelineName"])
+ pipeline = client.list_pipelines()["Pipelines"][0]
+ assert pipeline["Status"] == "STOPPING"
+
+ for _ in range(2):
+ client.get_pipeline(PipelineName=BASIC_PIPELINE_KWARGS["PipelineName"])
+ pipeline = client.list_pipelines()["Pipelines"][0]
+ assert pipeline["Status"] == "STOPPED"
+
+
+@mock_aws
+def test_start_pipeline():
+ set_transition({"progression": "manual", "times": 2})
+ client = boto3.client("osis", region_name="eu-west-1")
+ client.create_pipeline(**BASIC_PIPELINE_KWARGS)
+
+ with pytest.raises(ClientError) as exc:
+ client.start_pipeline(PipelineName=BASIC_PIPELINE_KWARGS["PipelineName"])
+ err = exc.value.response["Error"]
+ assert err["Code"] == "ConflictException"
+ assert (
+ err["Message"]
+ == "Only pipelines with one of the following statuses are eligible for starting: ['START_FAILED', 'STOPPED']. The current status is CREATING."
+ )
+
+ for _ in range(2):
+ client.list_pipelines()
+
+ client.stop_pipeline(PipelineName=BASIC_PIPELINE_KWARGS["PipelineName"])
+
+ for _ in range(2):
+ client.list_pipelines()
+
+ client.start_pipeline(PipelineName=BASIC_PIPELINE_KWARGS["PipelineName"])
+ pipeline = client.list_pipelines()["Pipelines"][0]
+ assert pipeline["Status"] == "STARTING"
+ client.list_pipelines()
+ pipeline = client.list_pipelines()["Pipelines"][0]
+ assert pipeline["Status"] == "ACTIVE"
+
+
+@mock_aws
+def test_tag_resource():
+ client = boto3.client("osis", region_name="eu-west-1")
+ resp = client.create_pipeline(**BASIC_PIPELINE_KWARGS)["Pipeline"]
+ client.tag_resource(
+ Arn=resp["PipelineArn"], Tags=[{"Key": "TestKey", "Value": "TestValue"}]
+ )
+ resp = client.get_pipeline(PipelineName=BASIC_PIPELINE_KWARGS["PipelineName"])[
+ "Pipeline"
+ ]
+ assert resp["Tags"] == [{"Key": "TestKey", "Value": "TestValue"}]
+
+
+@mock_aws
+def test_untag_resource():
+ client = boto3.client("osis", region_name="eu-west-1")
+ resp = client.create_pipeline(
+ **BASIC_PIPELINE_KWARGS, Tags=[{"Key": "TestKey", "Value": "TestValue"}]
+ )["Pipeline"]
+ client.untag_resource(Arn=resp["PipelineArn"], TagKeys=["TestKey"])
+ resp = client.get_pipeline(PipelineName=BASIC_PIPELINE_KWARGS["PipelineName"])[
+ "Pipeline"
+ ]
+ assert resp["Tags"] == []
+
+
+def set_transition(transition={"progression": "immediate"}):
+ if settings.TEST_DECORATOR_MODE:
+ state_manager.set_transition(model_name="osis::pipeline", transition=transition)
+ else:
+ post_body = dict(model_name="osis::pipeline", transition=transition)
+ requests.post(
+ "http://localhost:5000/moto-api/state-manager/set-transition",
+ data=json.dumps(post_body),
+ )
diff --git a/tests/test_osis/test_server.py b/tests/test_osis/test_server.py
new file mode 100644
index 000000000000..f8295293d0fe
--- /dev/null
+++ b/tests/test_osis/test_server.py
@@ -0,0 +1,15 @@
+"""Test different server responses."""
+
+import json
+
+import moto.server as server
+
+
+def test_osis_list():
+ backend = server.create_backend_app("osis")
+ test_client = backend.test_client()
+
+ resp = test_client.get("/2022-01-01/osis/listPipelines")
+
+ assert resp.status_code == 200
+ assert "Pipelines" in json.loads(resp.data)