diff --git a/src/aap_eda/api/constants.py b/src/aap_eda/api/constants.py index 61887759b..f845e2408 100644 --- a/src/aap_eda/api/constants.py +++ b/src/aap_eda/api/constants.py @@ -14,3 +14,34 @@ # EDA_SERVER_VAULT_LABEL is reserved for system vault password identifiers EDA_SERVER_VAULT_LABEL = "EDA_SERVER" + +PG_NOTIFY_TEMPLATE_RULEBOOK_NAME = "_PG_NOTIFY_TEMPLATE_RULEBOOK_" +PG_NOTIFY_TEMPLATE_RULEBOOK_DATA = """ +--- +- name: PG Notify Template Event Stream + hosts: all + sources: + - name: my_range + ansible.eda.range: + limit: 5 + complementary_source: + type: ansible.eda.pg_listener + name: Postgres Listener + args: + dsn: "{{ EDA_PG_NOTIFY_DSN }}" + channels: + - "{{ EDA_PG_NOTIFY_CHANNEL }}" + extra_vars: + EDA_PG_NOTIFY_DSN: "{{ settings.PG_NOTIFY_DSN }}" + EDA_PG_NOTIFY_CHANNEL: "{{ event_stream.channel_name }}" + encrypt_vars: + - EDA_PG_NOTIFY_DSN + rules: + - name: Post event + condition: true + action: + pg_notify: + dsn: "{{ EDA_PG_NOTIFY_DSN }}" + channel: "{{ EDA_PG_NOTIFY_CHANNEL }}" + event: "{{ event }}" +""" diff --git a/src/aap_eda/api/exceptions.py b/src/aap_eda/api/exceptions.py index bbe0195e8..7f3f778b1 100644 --- a/src/aap_eda/api/exceptions.py +++ b/src/aap_eda/api/exceptions.py @@ -95,3 +95,33 @@ class InvalidWebsocketHost(APIException): default_detail = ( "Connection Error: WebSocket URL must have a valid host address." ) + + +class MissingEventStreamRulebook(APIException): + status_code = status.HTTP_500_INTERNAL_SERVER_ERROR + default_detail = ( + "Configuration Error: Event stream template rulebook not found" + ) + + +class MissingEventStreamRulebookKeys(APIException): + status_code = status.HTTP_500_INTERNAL_SERVER_ERROR + default_detail = ( + "Configuration Error: Event stream template rulebook is missing " + "required keys in complementary_source: type, name and args" + ) + + +class MissingEventStreamRulebookSource(APIException): + status_code = status.HTTP_500_INTERNAL_SERVER_ERROR + default_detail = ( + "Configuration Error: Event stream template rulebook is missing " + "required complementary_source" + ) + + +class InvalidEventStreamRulebook(APIException): + status_code = status.HTTP_500_INTERNAL_SERVER_ERROR + default_detail = ( + "Configuration Error: Event stream template rulebook is invalid" + ) diff --git a/src/aap_eda/api/filters/__init__.py b/src/aap_eda/api/filters/__init__.py index b2426dd1c..dc30fa668 100644 --- a/src/aap_eda/api/filters/__init__.py +++ b/src/aap_eda/api/filters/__init__.py @@ -19,6 +19,7 @@ ) from .credential import CredentialFilter from .decision_environment import DecisionEnvironmentFilter +from .event_stream import EventStreamFilter from .project import ProjectFilter from .role import RoleFilter from .rulebook import ( @@ -51,4 +52,6 @@ "UserFilter", # role "RoleFilter", + # event_stream + "EventStreamFilter", ) diff --git a/src/aap_eda/api/filters/event_stream.py b/src/aap_eda/api/filters/event_stream.py new file mode 100644 index 000000000..53455855c --- /dev/null +++ b/src/aap_eda/api/filters/event_stream.py @@ -0,0 +1,29 @@ +# Copyright 2024 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import django_filters + +from aap_eda.core import models + + +class EventStreamFilter(django_filters.FilterSet): + name = django_filters.CharFilter( + field_name="name", + lookup_expr="istartswith", + label="Filter by event source name.", + ) + + class Meta: + model = models.EventStream + fields = ["name"] diff --git a/src/aap_eda/api/serializers/__init__.py b/src/aap_eda/api/serializers/__init__.py index 642a61a5c..2f83736f3 100644 --- a/src/aap_eda/api/serializers/__init__.py +++ b/src/aap_eda/api/serializers/__init__.py @@ -38,6 +38,11 @@ DecisionEnvironmentRefSerializer, DecisionEnvironmentSerializer, ) +from .event_stream import ( + EventStreamCreateSerializer, + EventStreamOutSerializer, + EventStreamSerializer, +) from .project import ( ExtraVarCreateSerializer, ExtraVarRefSerializer, @@ -118,4 +123,8 @@ "RoleSerializer", "RoleListSerializer", "RoleDetailSerializer", + # event streams + "EventStreamSerializer", + "EventStreamCreateSerializer", + "EventStreamOutSerializer", ) diff --git a/src/aap_eda/api/serializers/activation.py b/src/aap_eda/api/serializers/activation.py index 2fd73b8a3..4af561ccc 100644 --- a/src/aap_eda/api/serializers/activation.py +++ b/src/aap_eda/api/serializers/activation.py @@ -11,18 +11,65 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +from typing import Union + +import yaml from rest_framework import serializers +from aap_eda.api.constants import PG_NOTIFY_TEMPLATE_RULEBOOK_DATA +from aap_eda.api.exceptions import InvalidEventStreamRulebook from aap_eda.api.serializers.credential import CredentialSerializer from aap_eda.api.serializers.decision_environment import ( DecisionEnvironmentRefSerializer, ) +from aap_eda.api.serializers.event_stream import EventStreamOutSerializer from aap_eda.api.serializers.project import ( ExtraVarRefSerializer, ProjectRefSerializer, ) from aap_eda.api.serializers.rulebook import RulebookRefSerializer +from aap_eda.api.serializers.utils import ( + substitute_extra_vars, + substitute_source_args, + swap_sources, +) from aap_eda.core import models, validators +from aap_eda.core.enums import ProcessParentType + +logger = logging.getLogger(__name__) + + +def _updated_ruleset(validated_data): + try: + sources_info = [] + + for event_stream_id in validated_data["event_streams"]: + event_stream = models.EventStream.objects.get(id=event_stream_id) + + if event_stream.rulebook: + rulesets = yaml.safe_load(event_stream.rulebook.rulesets) + else: + rulesets = yaml.safe_load(PG_NOTIFY_TEMPLATE_RULEBOOK_DATA) + + extra_vars = rulesets[0]["sources"][0].get("extra_vars", {}) + encrypt_vars = rulesets[0]["sources"][0].get("encrypt_vars", []) + + # TODO: encrypt password when engine is ready for vaulted data + extra_vars = substitute_extra_vars( + event_stream.__dict__, extra_vars, encrypt_vars, "password" + ) + + source = rulesets[0]["sources"][0]["complementary_source"] + source = substitute_source_args( + event_stream.__dict__, source, extra_vars + ) + sources_info.append(source) + + return swap_sources(validated_data["rulebook_rulesets"], sources_info) + except Exception as e: + logger.error(f"Failed to update rulesets: {e}") + raise InvalidEventStreamRulebook(e) class ActivationSerializer(serializers.ModelSerializer): @@ -34,6 +81,12 @@ class ActivationSerializer(serializers.ModelSerializer): child=CredentialSerializer(), ) + event_streams = serializers.ListField( + required=False, + allow_null=True, + child=EventStreamOutSerializer(), + ) + class Meta: model = models.Activation fields = [ @@ -57,6 +110,7 @@ class Meta: "status_message", "awx_token_id", "credentials", + "event_streams", ] read_only_fields = [ "id", @@ -77,6 +131,12 @@ class ActivationListSerializer(serializers.ModelSerializer): child=CredentialSerializer(), ) + event_streams = serializers.ListField( + required=False, + allow_null=True, + child=EventStreamOutSerializer(), + ) + class Meta: model = models.Activation fields = [ @@ -100,6 +160,7 @@ class Meta: "status_message", "awx_token_id", "credentials", + "event_streams", ] read_only_fields = ["id", "created_at", "modified_at"] @@ -111,6 +172,10 @@ def to_representation(self, activation): CredentialSerializer(credential).data for credential in activation.credentials.all() ] + event_streams = [ + EventStreamOutSerializer(event_stream).data + for event_stream in activation.event_streams.all() + ] return { "id": activation.id, @@ -133,6 +198,7 @@ def to_representation(self, activation): "status_message": activation.status_message, "awx_token_id": activation.awx_token_id, "credentials": credentials, + "event_streams": event_streams, } @@ -152,6 +218,7 @@ class Meta: "restart_policy", "awx_token_id", "credentials", + "event_streams", ] rulebook_id = serializers.IntegerField( @@ -177,6 +244,12 @@ class Meta: allow_null=True, child=serializers.IntegerField(), ) + event_streams = serializers.ListField( + required=False, + allow_null=True, + child=serializers.IntegerField(), + validators=[validators.check_if_event_streams_exists], + ) def validate(self, data): user = data["user"] @@ -200,6 +273,10 @@ def create(self, validated_data): validated_data["rulebook_rulesets"] = rulebook.rulesets validated_data["git_hash"] = rulebook.project.git_hash validated_data["project_id"] = rulebook.project.id + if validated_data.get("event_streams"): + validated_data["rulebook_rulesets"] = _updated_ruleset( + validated_data + ) return super().create(validated_data) @@ -215,6 +292,7 @@ class Meta: "git_hash", "status_message", "activation_id", + "event_stream_id", "started_at", "ended_at", ] @@ -243,6 +321,11 @@ class ActivationReadSerializer(serializers.ModelSerializer): rules_count = serializers.IntegerField() rules_fired_count = serializers.IntegerField() restarted_at = serializers.DateTimeField(required=False, allow_null=True) + event_streams = serializers.ListField( + required=False, + allow_null=True, + child=EventStreamOutSerializer(), + ) class Meta: model = models.Activation @@ -270,6 +353,7 @@ class Meta: "status_message", "awx_token_id", "credentials", + "event_streams", ] read_only_fields = ["id", "created_at", "modified_at", "restarted_at"] @@ -297,7 +381,8 @@ def to_representation(self, activation): else None ) activation_instances = models.RulebookProcess.objects.filter( - activation_id=activation.id + activation_id=activation.id, + parent_type=ProcessParentType.ACTIVATION, ) rules_count, rules_fired_count = get_rules_count( activation.ruleset_stats @@ -315,6 +400,11 @@ def to_representation(self, activation): for credential in activation.credentials.all() ] + event_streams = [ + EventStreamOutSerializer(event_stream).data + for event_stream in activation.event_streams.all() + ] + return { "id": activation.id, "name": activation.name, @@ -341,6 +431,7 @@ def to_representation(self, activation): "status_message": activation.status_message, "awx_token_id": activation.awx_token_id, "credentials": credentials, + "event_streams": event_streams, } @@ -356,7 +447,9 @@ class PostActivationSerializer(serializers.ModelSerializer): allow_null=True, validators=[validators.check_if_extra_var_exists], ) + # TODO: is_activation_valid needs to tell event stream/activation awx_token_id = serializers.IntegerField( + required=False, allow_null=True, validators=[validators.check_if_awx_token_exists], ) @@ -418,8 +511,11 @@ def parse_validation_errors(errors: dict) -> str: return str(messages) -def validate_rulebook_token(rulebook_id: int) -> None: +def validate_rulebook_token(rulebook_id: Union[int, None]) -> None: """Validate if the rulebook requires an Awx Token.""" + if rulebook_id is None: + return + rulebook = models.Rulebook.objects.get(id=rulebook_id) # TODO: rulesets are stored as a string in the rulebook model diff --git a/src/aap_eda/api/serializers/event_stream.py b/src/aap_eda/api/serializers/event_stream.py new file mode 100644 index 000000000..f743af67f --- /dev/null +++ b/src/aap_eda/api/serializers/event_stream.py @@ -0,0 +1,230 @@ +# Copyright 2024 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import uuid + +import yaml +from django.conf import settings +from django.core.validators import RegexValidator +from rest_framework import serializers +from rest_framework.exceptions import ValidationError + +from aap_eda.api.constants import ( + PG_NOTIFY_TEMPLATE_RULEBOOK_DATA, + PG_NOTIFY_TEMPLATE_RULEBOOK_NAME, +) +from aap_eda.api.exceptions import ( + MissingEventStreamRulebook, + MissingEventStreamRulebookKeys, + MissingEventStreamRulebookSource, +) +from aap_eda.api.serializers.credential import CredentialSerializer +from aap_eda.api.serializers.utils import substitute_extra_vars, swap_sources +from aap_eda.core import models, validators + +logger = logging.getLogger(__name__) + +EDA_CHANNEL_PREFIX = "eda_" + + +def _get_rulebook(): + rulebook = None + name = settings.PG_NOTIFY_TEMPLATE_RULEBOOK + if name: + rulebook = models.Rulebook.objects.filter(name=name).first() + + if not rulebook: + logger.error( + "Missing Listener rulebook %s", + settings.PG_NOTIFY_TEMPLATE_RULEBOOK, + ) + raise MissingEventStreamRulebook + + required_keys = ["type", "name", "args"] + rulesets = yaml.safe_load(rulebook.rulesets) + for ruleset in rulesets: + sources = ruleset.get("sources", []) + for source in sources: + complementary_source = source.get("complementary_source") + + if not complementary_source: + raise MissingEventStreamRulebookSource + + for key in required_keys: + if key not in complementary_source.keys(): + raise MissingEventStreamRulebookKeys + + return rulebook + + +def _get_default_channel_name(): + stream_uuid = str(uuid.uuid4()) + return f"{EDA_CHANNEL_PREFIX}{stream_uuid.replace('-','_')}" + + +def _get_extra_var_id(validated_data: dict) -> dict: + rulesets = yaml.safe_load(validated_data["rulebook_rulesets"]) + extra_vars = rulesets[0]["sources"][0]["extra_vars"] + extra_vars = substitute_extra_vars( + validated_data, extra_vars, [], "password" + ) + + extra_var = models.ExtraVar.objects.create(extra_var=yaml.dump(extra_vars)) + return extra_var.id + + +def _updated_listener_ruleset(validated_data): + sources_info = [ + { + "name": validated_data["name"], + "type": validated_data["source_type"], + "args": validated_data["args"], + } + ] + return swap_sources(validated_data["rulebook_rulesets"], sources_info) + + +class YAMLSerializerField(serializers.Field): + """Serializer for YAML a superset of JSON.""" + + def to_internal_value(self, data) -> dict: + if data: + try: + parsed_args = yaml.safe_load(data) + except yaml.YAMLError: + raise ValidationError("Invalid YAML format for 'args'") + + if not isinstance(parsed_args, dict): + raise ValidationError( + "The 'args' field must be a YAML object (dictionary)" + ) + + return parsed_args + return data + + def to_representation(self, value) -> str: + return yaml.dump(value) + + +class EventStreamSerializer(serializers.ModelSerializer): + decision_environment_id = serializers.IntegerField( + validators=[validators.check_if_de_exists] + ) + user = serializers.SerializerMethodField() + args = YAMLSerializerField(required=False, allow_null=True) + credentials = serializers.SerializerMethodField() + + class Meta: + model = models.EventStream + read_only_fields = [ + "id", + "created_at", + "modified_at", + ] + fields = [ + "name", + "args", + "source_type", + "channel_name", + "is_enabled", + "status", + "status_message", + "decision_environment_id", + "user", + "credentials", + *read_only_fields, + ] + + def get_user(self, obj) -> str: + return f"{obj.user.username}" + + def get_credentials(self, obj) -> str: + return [ + CredentialSerializer(credential).data + for credential in obj.credentials.all() + ] + + +class EventStreamCreateSerializer(serializers.ModelSerializer): + """Serializer for creating the EventStream.""" + + decision_environment_id = serializers.IntegerField( + validators=[validators.check_if_de_exists] + ) + user = serializers.HiddenField(default=serializers.CurrentUserDefault()) + args = YAMLSerializerField() + channel_name = serializers.CharField( + default=_get_default_channel_name(), + validators=[ + RegexValidator( + regex=r"^\w+$", + message="Channel name can only contain alphanumeric and " + "underscore characters", + ), + ], + ) + credentials = serializers.ListField( + required=False, + allow_null=True, + child=serializers.IntegerField(), + ) + + class Meta: + model = models.EventStream + fields = [ + "name", + "description", + "is_enabled", + "source_type", + "args", + "channel_name", + "decision_environment_id", + "rulebook_id", + "extra_var_id", + "user", + "restart_policy", + "credentials", + ] + + def create(self, validated_data): + rulebook = _get_rulebook() + validated_data["user_id"] = validated_data["user"].id + if rulebook: + validated_data["rulebook_name"] = rulebook.name + validated_data["rulebook_id"] = rulebook.id + validated_data["rulebook_rulesets"] = rulebook.rulesets + else: + validated_data["rulebook_name"] = PG_NOTIFY_TEMPLATE_RULEBOOK_NAME + validated_data["rulebook_id"] = None + validated_data[ + "rulebook_rulesets" + ] = PG_NOTIFY_TEMPLATE_RULEBOOK_DATA + + validated_data["channel_name"] = validated_data.get( + "channel_name", _get_default_channel_name() + ) + validated_data["extra_var_id"] = _get_extra_var_id(validated_data) + validated_data["rulebook_rulesets"] = _updated_listener_ruleset( + validated_data + ) + return super().create(validated_data) + + +class EventStreamOutSerializer(serializers.ModelSerializer): + """Serializer for UI to show EventStream.""" + + class Meta: + model = models.EventStream + fields = ["id", "name"] diff --git a/src/aap_eda/api/serializers/utils.py b/src/aap_eda/api/serializers/utils.py new file mode 100644 index 000000000..3d209d920 --- /dev/null +++ b/src/aap_eda/api/serializers/utils.py @@ -0,0 +1,98 @@ +# Copyright 2024 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import Any, Dict, List, Union + +import jinja2 +import yaml +from django.conf import settings +from jinja2.nativetypes import NativeTemplate + +LOGGER = logging.getLogger(__name__) + + +def _render_string(value: str, context: dict) -> str: + if "{{" in value and "}}" in value: + return NativeTemplate(value, undefined=jinja2.StrictUndefined).render( + context + ) + + return value + + +def _render_string_or_return_value(value: Any, context: Dict) -> Any: + if isinstance(value, str): + return _render_string(value, context) + return value + + +def substitute_variables( + value: Union[str, int, Dict, List], context: Dict +) -> Union[str, int, Dict, List]: + if isinstance(value, str): + return _render_string_or_return_value(value, context) + elif isinstance(value, list): + new_value = [] + for item in value: + new_value.append(substitute_variables(item, context)) + return new_value + elif isinstance(value, dict): + new_value = value.copy() + for key, subvalue in new_value.items(): + new_value[key] = substitute_variables(subvalue, context) + return new_value + else: + return value + + +def substitute_source_args(event_stream, source, extra_vars) -> dict: + context = { + "settings": settings.__dict__["_wrapped"].__dict__, + "event_stream": event_stream, + } + for key in extra_vars: + context[key] = extra_vars[key] + + source["args"] = substitute_variables(source.get("args", {}), context) + return source + + +def substitute_extra_vars( + event_stream, extra_vars, encrypt_keys, password +) -> dict: + context = { + "settings": settings.__dict__["_wrapped"].__dict__, + "event_stream": event_stream, + } + extra_vars = substitute_variables(extra_vars, context) + # Encrypt any of the extra_vars + for key in encrypt_keys: + if key in extra_vars: + # extra_vars[key] = encrypt with password + pass + return extra_vars + + +def swap_sources(data: str, sources: list[dict]) -> str: + rulesets = yaml.safe_load(data) + new_sources = [] + for source in sources: + src_obj = {"name": source["name"], source["type"]: source["args"]} + new_sources.append(src_obj) + + for ruleset in rulesets: + ruleset["sources"] = new_sources + + return yaml.dump(rulesets) diff --git a/src/aap_eda/api/urls.py b/src/aap_eda/api/urls.py index 0a4d24062..36a24456d 100644 --- a/src/aap_eda/api/urls.py +++ b/src/aap_eda/api/urls.py @@ -35,6 +35,7 @@ router.register("activation-instances", views.ActivationInstanceViewSet) router.register("audit-rules", views.AuditRuleViewSet) router.register("users", views.UserViewSet) +router.register("event-streams", views.EventStreamViewSet) router.register( "users/me/awx-tokens", views.CurrentUserAwxTokenViewSet, diff --git a/src/aap_eda/api/views/__init__.py b/src/aap_eda/api/views/__init__.py index bc514bc49..9f792e903 100644 --- a/src/aap_eda/api/views/__init__.py +++ b/src/aap_eda/api/views/__init__.py @@ -16,6 +16,7 @@ from .auth import RoleViewSet, SessionLoginView, SessionLogoutView from .credential import CredentialViewSet from .decision_environment import DecisionEnvironmentViewSet +from .event_stream import EventStreamViewSet from .project import ExtraVarViewSet, ProjectViewSet from .rulebook import ( AuditRuleViewSet, @@ -48,4 +49,6 @@ "CredentialViewSet", # decision_environment "DecisionEnvironmentViewSet", + # event_stream + "EventStreamViewSet", ) diff --git a/src/aap_eda/api/views/event_stream.py b/src/aap_eda/api/views/event_stream.py new file mode 100644 index 000000000..d5210cd26 --- /dev/null +++ b/src/aap_eda/api/views/event_stream.py @@ -0,0 +1,327 @@ +# Copyright 2024 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +from django.shortcuts import get_object_or_404 +from django_filters import rest_framework as defaultfilters +from drf_spectacular.utils import ( + OpenApiParameter, + OpenApiResponse, + extend_schema, + extend_schema_view, +) +from rest_framework import exceptions, mixins, status, viewsets +from rest_framework.decorators import action +from rest_framework.response import Response + +from aap_eda.api import exceptions as api_exc, filters, serializers +from aap_eda.api.serializers.activation import is_activation_valid +from aap_eda.core import models +from aap_eda.core.enums import ( + Action, + ActivationStatus, + ProcessParentType, + ResourceType, +) +from aap_eda.tasks.orchestrator import ( + delete_rulebook_process, + restart_rulebook_process, + start_rulebook_process, + stop_rulebook_process, +) + +logger = logging.getLogger(__name__) + + +@extend_schema_view( + destroy=extend_schema( + description="Delete an existing EventStream", + responses={ + status.HTTP_204_NO_CONTENT: OpenApiResponse( + None, + description="The EventStream has been deleted.", + ), + }, + ), +) +class EventStreamViewSet( + mixins.DestroyModelMixin, + viewsets.GenericViewSet, +): + queryset = models.EventStream.objects.all() + serializer_class = serializers.EventStreamSerializer + filter_backends = (defaultfilters.DjangoFilterBackend,) + filterset_class = filters.EventStreamFilter + rbac_resource_type = ResourceType.EVENT_STREAM + rbac_action = None + + @extend_schema( + request=serializers.EventStreamCreateSerializer, + responses={ + status.HTTP_201_CREATED: serializers.EventStreamSerializer, + status.HTTP_400_BAD_REQUEST: OpenApiResponse( + description="Invalid data to create event_stream." + ), + }, + ) + def create(self, request): + context = {"request": request} + serializer = serializers.EventStreamCreateSerializer( + data=request.data, context=context + ) + serializer.is_valid(raise_exception=True) + + event_stream = serializer.create(serializer.validated_data) + + if event_stream.is_enabled: + start_rulebook_process( + process_parent_type=ProcessParentType.EVENT_STREAM, + id=event_stream.id, + ) + + return Response( + serializers.EventStreamSerializer(event_stream).data, + status=status.HTTP_201_CREATED, + ) + + @extend_schema( + description="Get an event_stream by id", + responses={ + status.HTTP_200_OK: OpenApiResponse( + serializers.EventStreamSerializer, + description="Return an event_stream by id.", + ), + }, + ) + def retrieve(self, request, pk: int): + event_stream = get_object_or_404(models.EventStream, pk=pk) + return Response(serializers.EventStreamSerializer(event_stream).data) + + @extend_schema( + description="List all EventStreams", + request=None, + responses={ + status.HTTP_200_OK: OpenApiResponse( + serializers.EventStreamSerializer(many=True), + description="Return a list of EventStreams.", + ), + }, + ) + def list(self, request): + event_streams = models.EventStream.objects.all() + event_streams = self.filter_queryset(event_streams) + + serializer = serializers.EventStreamSerializer( + event_streams, many=True + ) + result = self.paginate_queryset(serializer.data) + + return self.get_paginated_response(result) + + def perform_destroy(self, event_stream): + event_stream.status = ActivationStatus.DELETING + event_stream.save(update_fields=["status"]) + logger.info(f"Now deleting {event_stream.name} ...") + delete_rulebook_process( + process_parent_type=ProcessParentType.EVENT_STREAM, + id=event_stream.id, + ) + + @extend_schema( + description="List all instances for the EventStream", + request=None, + responses={ + status.HTTP_200_OK: serializers.ActivationInstanceSerializer( + many=True + ), + }, + parameters=[ + OpenApiParameter( + name="id", + type=int, + location=OpenApiParameter.PATH, + description="A unique integer value identifying this rulebook.", # noqa: E501 + ) + ], + ) + @action( + detail=False, + queryset=models.RulebookProcess.objects.order_by("id"), + filterset_class=filters.ActivationInstanceFilter, + rbac_resource_type=ResourceType.ACTIVATION_INSTANCE, + rbac_action=Action.READ, + url_path="(?P[^/.]+)/instances", + ) + def instances(self, request, id): + event_stream_exists = models.EventStream.objects.filter(id=id).exists() + if not event_stream_exists: + raise api_exc.NotFound( + code=status.HTTP_404_NOT_FOUND, + detail=f"EventStream with ID={id} does not exist.", + ) + + event_stream_instances = models.RulebookProcess.objects.filter( + parent_type=ProcessParentType.EVENT_STREAM, + event_stream_id=id, + ) + filtered_instances = self.filter_queryset(event_stream_instances) + result = self.paginate_queryset(filtered_instances) + serializer = serializers.ActivationInstanceSerializer( + result, many=True + ) + return self.get_paginated_response(serializer.data) + + @extend_schema( + description="Enable the EventStream", + request=None, + responses={ + status.HTTP_204_NO_CONTENT: OpenApiResponse( + None, + description="EventStream has been enabled.", + ), + status.HTTP_400_BAD_REQUEST: OpenApiResponse( + None, + description="EventStream not enabled.", + ), + status.HTTP_409_CONFLICT: OpenApiResponse( + None, + description="EventStream not enabled do to current event" + " stream status", + ), + }, + ) + @action(methods=["post"], detail=True, rbac_action=Action.ENABLE) + def enable(self, request, pk): + event_stream = get_object_or_404(models.EventStream, pk=pk) + + if event_stream.is_enabled: + return Response(status=status.HTTP_204_NO_CONTENT) + + if event_stream.status in [ + ActivationStatus.STARTING, + ActivationStatus.STOPPING, + ActivationStatus.DELETING, + ActivationStatus.RUNNING, + ActivationStatus.UNRESPONSIVE, + ]: + return Response(status=status.HTTP_409_CONFLICT) + + valid, error = is_activation_valid(event_stream) + if not valid: + event_stream.status = ActivationStatus.ERROR + event_stream.status_message = error + event_stream.save(update_fields=["status", "status_message"]) + logger.error(f"Failed to enable {event_stream.name}: {error}") + + return Response( + {"errors": error}, status=status.HTTP_400_BAD_REQUEST + ) + + logger.info(f"Now enabling {event_stream.name} ...") + + event_stream.is_enabled = True + event_stream.failure_count = 0 + event_stream.status = ActivationStatus.PENDING + event_stream.save( + update_fields=[ + "is_enabled", + "failure_count", + "status", + "modified_at", + ] + ) + start_rulebook_process( + process_parent_type=ProcessParentType.EVENT_STREAM, + id=pk, + ) + + return Response(status=status.HTTP_204_NO_CONTENT) + + @extend_schema( + description="Disable the EventStream", + request=None, + responses={ + status.HTTP_204_NO_CONTENT: OpenApiResponse( + None, + description="EventStream has been disabled.", + ), + }, + ) + @action(methods=["post"], detail=True, rbac_action=Action.DISABLE) + def disable(self, request, pk): + event_stream = get_object_or_404(models.EventStream, pk=pk) + + self._check_deleting(event_stream) + + if event_stream.is_enabled: + event_stream.status = ActivationStatus.STOPPING + event_stream.is_enabled = False + event_stream.save( + update_fields=["is_enabled", "status", "modified_at"] + ) + stop_rulebook_process( + process_parent_type=ProcessParentType.EVENT_STREAM, + id=event_stream.id, + ) + + return Response(status=status.HTTP_204_NO_CONTENT) + + @extend_schema( + description="Restart the EventStream", + request=None, + responses={ + status.HTTP_204_NO_CONTENT: OpenApiResponse( + None, + description="EventStream restart was successful.", + ), + status.HTTP_400_BAD_REQUEST: OpenApiResponse( + None, + description="EventStream not enabled.", + ), + }, + ) + @action(methods=["post"], detail=True, rbac_action=Action.RESTART) + def restart(self, request, pk): + event_stream = get_object_or_404(models.EventStream, pk=pk) + + self._check_deleting(event_stream) + + if not event_stream.is_enabled: + raise api_exc.Forbidden( + detail="EventStream is disabled and cannot be run." + ) + + valid, error = is_activation_valid(event_stream) + if not valid: + event_stream.status = ActivationStatus.ERROR + event_stream.status_message = error + event_stream.save(update_fields=["status", "status_message"]) + logger.error(f"Failed to restart {event_stream.name}: {error}") + + return Response( + {"errors": error}, status=status.HTTP_400_BAD_REQUEST + ) + + restart_rulebook_process( + process_parent_type=ProcessParentType.EVENT_STREAM, + id=event_stream.id, + ) + + return Response(status=status.HTTP_204_NO_CONTENT) + + def _check_deleting(self, event_stream): + if event_stream.status == ActivationStatus.DELETING: + raise exceptions.APIException( + detail="Object is being deleted", code=409 + ) diff --git a/src/aap_eda/core/enums.py b/src/aap_eda/core/enums.py index 22cee3a5e..27c585b59 100644 --- a/src/aap_eda/core/enums.py +++ b/src/aap_eda/core/enums.py @@ -49,6 +49,7 @@ class ResourceType(DjangoStrEnum): ROLE = "role" DECISION_ENVIRONMENT = "decision_environment" CREDENTIAL = "credential" + EVENT_STREAM = "event_stream" class Action(DjangoStrEnum): diff --git a/src/aap_eda/core/migrations/0023_remove_eventstream_listener_args_and_more.py b/src/aap_eda/core/migrations/0023_remove_eventstream_listener_args_and_more.py new file mode 100644 index 000000000..d38709739 --- /dev/null +++ b/src/aap_eda/core/migrations/0023_remove_eventstream_listener_args_and_more.py @@ -0,0 +1,73 @@ +# Generated by Django 4.2.7 on 2024-02-07 21:43 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("core", "0022_activationrequestqueue_process_parent_id_and_more"), + ] + + operations = [ + migrations.RemoveField( + model_name="eventstream", + name="listener_args", + ), + migrations.RemoveField( + model_name="eventstream", + name="uuid", + ), + migrations.AddField( + model_name="activation", + name="event_streams", + field=models.ManyToManyField(default=None, to="core.eventstream"), + ), + migrations.AddField( + model_name="eventstream", + name="channel_name", + field=models.TextField(default=None, null=True), + ), + migrations.AddField( + model_name="eventstream", + name="credentials", + field=models.ManyToManyField( + default=None, + related_name="event_streams", + to="core.credential", + ), + ), + migrations.AddField( + model_name="eventstream", + name="system_vault_credential", + field=models.OneToOneField( + default=None, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="+", + to="core.credential", + ), + ), + migrations.AlterField( + model_name="permission", + name="resource_type", + field=models.TextField( + choices=[ + ("activation", "activation"), + ("activation_instance", "activation_instance"), + ("audit_rule", "audit_rule"), + ("audit_event", "audit_event"), + ("task", "task"), + ("user", "user"), + ("project", "project"), + ("inventory", "inventory"), + ("extra_var", "extra_var"), + ("rulebook", "rulebook"), + ("role", "role"), + ("decision_environment", "decision_environment"), + ("credential", "credential"), + ("event_stream", "event_stream"), + ] + ), + ), + ] diff --git a/src/aap_eda/core/models/activation.py b/src/aap_eda/core/models/activation.py index 6648c1594..35a3d1d20 100644 --- a/src/aap_eda/core/models/activation.py +++ b/src/aap_eda/core/models/activation.py @@ -97,3 +97,7 @@ class Meta: on_delete=models.SET_NULL, related_name="+", ) + event_streams = models.ManyToManyField( + "EventStream", + default=None, + ) diff --git a/src/aap_eda/core/models/event_stream.py b/src/aap_eda/core/models/event_stream.py index 0ebef6446..0b0275e62 100644 --- a/src/aap_eda/core/models/event_stream.py +++ b/src/aap_eda/core/models/event_stream.py @@ -12,8 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import uuid - from django.db import models from aap_eda.core.enums import ActivationStatus, RestartPolicy @@ -76,10 +74,19 @@ class EventStream(StatusHandlerModelMixin, models.Model): on_delete=models.SET_NULL, related_name="+", ) - uuid = models.UUIDField(default=uuid.uuid4) + channel_name = models.TextField(null=True, default=None) source_type = models.TextField(null=False) args = models.JSONField(null=True, default=None) - listener_args = models.JSONField(null=True, default=None) + system_vault_credential = models.OneToOneField( + "Credential", + null=True, + default=None, + on_delete=models.SET_NULL, + related_name="+", + ) + credentials = models.ManyToManyField( + "Credential", related_name="event_streams", default=None + ) class Meta: db_table = "core_event_stream" diff --git a/src/aap_eda/core/validators.py b/src/aap_eda/core/validators.py index 273b1758a..47f755e01 100644 --- a/src/aap_eda/core/validators.py +++ b/src/aap_eda/core/validators.py @@ -109,3 +109,14 @@ def is_extra_var_dict(extra_var: str): raise serializers.ValidationError( "Extra var must be in JSON or YAML format" ) + + +def check_if_event_streams_exists(event_stream_ids: list[int]) -> list[int]: + for event_stream_id in event_stream_ids: + try: + models.EventStream.objects.get(pk=event_stream_id) + except models.EventStream.DoesNotExist: + raise serializers.ValidationError( + f"EventStream with id {event_stream_id} does not exist" + ) + return event_stream_ids diff --git a/src/aap_eda/services/activation/manager.py b/src/aap_eda/services/activation/manager.py index 957f3ecb5..cfe4a2bd2 100644 --- a/src/aap_eda/services/activation/manager.py +++ b/src/aap_eda/services/activation/manager.py @@ -228,9 +228,9 @@ def _check_latest_instance_and_pod_id(self) -> None: raise exceptions.ActivationInstancePodIdNotFound(msg) def _check_non_finalized_instances(self) -> None: - instances = models.RulebookProcess.objects.filter( - activation=self.db_instance, - ) + args = {f"{self.db_instance_type}": self.db_instance} + + instances = models.RulebookProcess.objects.filter(**args) for instance in instances: if instance.status not in [ ActivationStatus.STOPPED, @@ -1036,13 +1036,20 @@ def update_logs(self): return def _create_activation_instance(self): + git_hash = ( + self.db_instance.git_hash + if hasattr(self.db_instance, "git_hash") + else "" + ) try: - models.RulebookProcess.objects.create( - activation=self.db_instance, - name=self.db_instance.name, - status=ActivationStatus.STARTING, - git_hash=self.db_instance.git_hash, - ) + args = { + "name": self.db_instance.name, + "status": ActivationStatus.STARTING, + "git_hash": git_hash, + } + args[f"{self.db_instance_type}"] = self.db_instance + + models.RulebookProcess.objects.create(**args) except IntegrityError as exc: msg = ( f"Activation {self.db_instance.id} failed to create " diff --git a/src/aap_eda/settings/default.py b/src/aap_eda/settings/default.py index 2f563fd62..cc22f780d 100644 --- a/src/aap_eda/settings/default.py +++ b/src/aap_eda/settings/default.py @@ -478,3 +478,18 @@ def _get_secret_key() -> str: ANSIBLE_BASE_JWT_KEY = settings.get( "ANSIBLE_BASE_JWT_KEY", "https://localhost" ) + +ACTIVATION_DB_HOST = settings.get( + "ACTIVATION_DB_HOST", "host.containers.internal" +) + +_DEFAULT_PG_NOTIFY_DSN = ( + f"host={ACTIVATION_DB_HOST} " + f"port={DATABASES['default']['PORT']} " + f"dbname={DATABASES['default']['NAME']} " + f"user={DATABASES['default']['USER']} " + f"password={DATABASES['default']['PASSWORD']}" +) + +PG_NOTIFY_DSN = settings.get("PG_NOTIFY_DSN", _DEFAULT_PG_NOTIFY_DSN) +PG_NOTIFY_TEMPLATE_RULEBOOK = settings.get("PG_NOTIFY_TEMPLATE_RULEBOOK", None) diff --git a/src/aap_eda/wsapi/consumers.py b/src/aap_eda/wsapi/consumers.py index 2745faaeb..033111275 100644 --- a/src/aap_eda/wsapi/consumers.py +++ b/src/aap_eda/wsapi/consumers.py @@ -158,7 +158,7 @@ def handle_heartbeat(self, message: HeartbeatMessage) -> None: instance.updated_at = message.reported_at instance.save(update_fields=["updated_at"]) - activation = instance.activation + activation = instance.get_parent() activation.ruleset_stats[ message.stats["ruleSetName"] ] = message.stats @@ -320,9 +320,7 @@ def get_resources( activation_instance = models.RulebookProcess.objects.get( id=activation_instance_id ) - activation = models.Activation.objects.get( - id=activation_instance.activation_id - ) + activation = activation_instance.get_parent() if activation.extra_var_id: extra_var = models.ExtraVar.objects.filter( @@ -339,8 +337,11 @@ def get_awx_token(self, message: WorkerMessage) -> tp.Optional[str]: activation_instance = models.RulebookProcess.objects.get( id=message.activation_id, ) - awx_token = activation_instance.activation.awx_token + parent = activation_instance.get_parent() + if not hasattr(parent, "awx_token"): + return None + awx_token = parent.awx_token return awx_token.token.get_secret_value() if awx_token else None @database_sync_to_async @@ -351,7 +352,7 @@ def get_vault_passwords( activation_instance = models.RulebookProcess.objects.get( id=message.activation_id, ) - activation = activation_instance.activation + activation = activation_instance.get_parent() vault_passwords = [] if activation.system_vault_credential: diff --git a/tests/integration/api/conftest.py b/tests/integration/api/conftest.py index e765056f4..610e66ec2 100644 --- a/tests/integration/api/conftest.py +++ b/tests/integration/api/conftest.py @@ -59,3 +59,25 @@ def check_permission_mock(): wraps=RoleBasedPermission._check_permission, ) as m: yield m + + +@pytest.fixture +def default_de() -> models.DecisionEnvironment: + """Return a default DE.""" + return models.DecisionEnvironment.objects.create( + name="default_de", + image_url="quay.io/ansible/ansible-rulebook:latest", + description="Default DE", + ) + + +@pytest.fixture +def default_user() -> models.User: + """Return a default User.""" + return models.User.objects.create_user( + username="luke.skywalker", + first_name="Luke", + last_name="Skywalker", + email="luke.skywalker@example.com", + password="secret", + ) diff --git a/tests/integration/api/test_activation_instance.py b/tests/integration/api/test_activation_instance.py index cfd781d62..c3fb9e2b1 100644 --- a/tests/integration/api/test_activation_instance.py +++ b/tests/integration/api/test_activation_instance.py @@ -288,6 +288,7 @@ def assert_activation_instance_data( "started_at": instance.started_at.strftime(DATETIME_FORMAT), "ended_at": instance.ended_at, "status_message": ACTIVATION_STATUS_MESSAGE_MAP[instance.status], + "event_stream_id": None, } diff --git a/tests/integration/api/test_event_stream.py b/tests/integration/api/test_event_stream.py new file mode 100644 index 000000000..8fae1417b --- /dev/null +++ b/tests/integration/api/test_event_stream.py @@ -0,0 +1,444 @@ +# Copyright 2024 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from unittest import mock + +import pytest +import yaml +from rest_framework import status +from rest_framework.test import APIClient + +from aap_eda.api.constants import ( + PG_NOTIFY_TEMPLATE_RULEBOOK_DATA, + PG_NOTIFY_TEMPLATE_RULEBOOK_NAME, +) +from aap_eda.core import models +from aap_eda.core.enums import Action, ProcessParentType, ResourceType +from tests.integration.constants import api_url_v1 + +BAD_PG_NOTIFY_TEMPLATE_RULEBOOK_NO_TYPE = """ +--- +- name: PG Notify Template Event Stream + hosts: all + sources: + - name: my_range + ansible.eda.range: + limit: 5 + complementary_source: + name: Postgres Listener + args: + dsn: "{{ EDA_PG_NOTIFY_DSN }}" + channels: + - "{{ EDA_PG_NOTIFY_CHANNEL }}" + extra_vars: + EDA_PG_NOTIFY_DSN: "{{ settings.PG_NOTIFY_DSN }}" + EDA_PG_NOTIFY_CHANNEL: "{{ event_stream.channel_name }}" + encrypt_vars: + - EDA_PG_NOTIFY_DSN + rules: + - name: Post event + condition: true + action: + pg_notify: + dsn: "{{ EDA_PG_NOTIFY_DSN }}" + channel: "{{ EDA_PG_NOTIFY_CHANNEL }}" + event: "{{ event }}" +""" +BAD_PG_NOTIFY_TEMPLATE_RULEBOOK_NO_NAME = """ +--- +- name: PG Notify Template Event Stream + hosts: all + sources: + - name: my_range + ansible.eda.range: + limit: 5 + complementary_source: + type: ansible.eda.pg_listener + args: + dsn: "{{ EDA_PG_NOTIFY_DSN }}" + channels: + - "{{ EDA_PG_NOTIFY_CHANNEL }}" + extra_vars: + EDA_PG_NOTIFY_DSN: "{{ settings.PG_NOTIFY_DSN }}" + EDA_PG_NOTIFY_CHANNEL: "{{ event_stream.channel_name }}" + encrypt_vars: + - EDA_PG_NOTIFY_DSN + rules: + - name: Post event + condition: true + action: + pg_notify: + dsn: "{{ EDA_PG_NOTIFY_DSN }}" + channel: "{{ EDA_PG_NOTIFY_CHANNEL }}" + event: "{{ event }}" +""" +BAD_PG_NOTIFY_TEMPLATE_RULEBOOK_NO_ARGS = """ +--- +- name: PG Notify Template Event Stream + hosts: all + sources: + - name: my_range + ansible.eda.range: + limit: 5 + complementary_source: + type: ansible.eda.pg_listener + name: Postgres Listener + extra_vars: + EDA_PG_NOTIFY_DSN: "{{ settings.PG_NOTIFY_DSN }}" + EDA_PG_NOTIFY_CHANNEL: "{{ event_stream.channel_name }}" + encrypt_vars: + - EDA_PG_NOTIFY_DSN + rules: + - name: Post event + condition: true + action: + pg_notify: + dsn: "{{ EDA_PG_NOTIFY_DSN }}" + channel: "{{ EDA_PG_NOTIFY_CHANNEL }}" + event: "{{ event }}" +""" +BAD_PG_NOTIFY_NO_COMPLEMENTARY_SOURCE = """ +--- +- name: PG Notify Template Event Stream + hosts: all + sources: + - name: my_range + ansible.eda.range: + limit: 5 + extra_vars: + EDA_PG_NOTIFY_DSN: "{{ settings.PG_NOTIFY_DSN }}" + EDA_PG_NOTIFY_CHANNEL: "{{ event_stream.channel_name }}" + encrypt_vars: + - EDA_PG_NOTIFY_DSN + rules: + - name: Post event + condition: true + action: + pg_notify: + dsn: "{{ EDA_PG_NOTIFY_DSN }}" + channel: "{{ EDA_PG_NOTIFY_CHANNEL }}" + event: "{{ event }}" +""" + + +@pytest.mark.django_db +def test_list_event_streams( + client: APIClient, + check_permission_mock: mock.Mock, + default_de: models.DecisionEnvironment, + default_user: models.User, +): + event_streams = models.EventStream.objects.bulk_create( + [ + models.EventStream( + name="test-event_stream-1", + source_type="ansible.eda.range", + args={"limit": 5, "delay": 1}, + user=default_user, + decision_environment_id=default_de.id, + ), + models.EventStream( + name="test-event_stream-2", + source_type="ansible.eda.range", + args={"limit": 6, "delay": 2}, + user=default_user, + decision_environment_id=default_de.id, + ), + ] + ) + + response = client.get(f"{api_url_v1}/event-streams/") + assert response.status_code == status.HTTP_200_OK + assert len(response.data["results"]) == 2 + assert ( + response.data["results"][1]["source_type"] + == event_streams[0].source_type + ) + assert response.data["results"][1]["name"] == event_streams[0].name + assert response.data["results"][1]["user"] == "luke.skywalker" + + check_permission_mock.assert_called_once_with( + mock.ANY, mock.ANY, ResourceType.EVENT_STREAM, Action.READ + ) + + +@pytest.mark.django_db +def test_retrieve_event_stream( + client: APIClient, + check_permission_mock: mock.Mock, + default_de: models.DecisionEnvironment, + default_user: models.User, +): + args = {"limit": 5, "delay": 1} + credentials = models.Credential.objects.bulk_create( + [ + models.Credential( + name="credential1", username="me", secret="sec1" + ), + models.Credential( + name="credential2", username="me", secret="sec2" + ), + ] + ) + event_stream = models.EventStream.objects.create( + name="test-event_stream-1", + source_type="ansible.eda.range", + args=args, + user=default_user, + decision_environment_id=default_de.id, + ) + for credential in credentials: + event_stream.credentials.add(credential.id) + + response = client.get(f"{api_url_v1}/event-streams/{event_stream.id}/") + assert response.status_code == status.HTTP_200_OK + assert response.data["name"] == event_stream.name + assert response.data["source_type"] == event_stream.source_type + assert yaml.safe_load(response.data["args"]) == args + assert response.data["user"] == "luke.skywalker" + assert len(response.data["credentials"]) == 2 + assert response.data["credentials"][0]["name"] == credentials[0].name + assert response.data["credentials"][1]["name"] == credentials[1].name + + check_permission_mock.assert_called_once_with( + mock.ANY, mock.ANY, ResourceType.EVENT_STREAM, Action.READ + ) + + +@pytest.mark.django_db +def test_create_event_stream( + client: APIClient, + default_de: models.DecisionEnvironment, +): + models.Rulebook.objects.create( + name=PG_NOTIFY_TEMPLATE_RULEBOOK_NAME, + rulesets=PG_NOTIFY_TEMPLATE_RULEBOOK_DATA, + ) + + args = {"limit": 5, "delay": 1} + source_type = "ansible.eda.range" + data_in = { + "name": "test_event_stream", + "source_type": f"{source_type}", + "args": f"{args}", + "decision_environment_id": default_de.id, + } + response = client.post(f"{api_url_v1}/event-streams/", data=data_in) + assert response.status_code == status.HTTP_201_CREATED + result = response.data + assert result["name"] == "test_event_stream" + assert result["source_type"] == source_type + assert result["user"] == "test.admin" + assert yaml.safe_load(response.data["args"]) == args + + event_stream = models.EventStream.objects.first() + rulesets = yaml.safe_load(event_stream.rulebook_rulesets) + source = rulesets[0]["sources"][0] + assert source[source_type] == args + assert source["name"] == "test_event_stream" + + +@pytest.mark.django_db +def test_create_event_stream_with_credential( + client: APIClient, + default_de: models.DecisionEnvironment, +): + models.Rulebook.objects.create( + name=PG_NOTIFY_TEMPLATE_RULEBOOK_NAME, + rulesets=PG_NOTIFY_TEMPLATE_RULEBOOK_DATA, + ) + credential = models.Credential.objects.create( + name="credential", username="me", secret="sec" + ) + + args = {"limit": 5, "delay": 1} + data_in = { + "name": "test_event_stream", + "source_type": "ansible.eda.range", + "args": f"{args}", + "decision_environment_id": default_de.id, + "credentials": [credential.id], + } + response = client.post(f"{api_url_v1}/event-streams/", data=data_in) + assert response.status_code == status.HTTP_201_CREATED + result = response.data + assert result["name"] == "test_event_stream" + assert result["source_type"] == "ansible.eda.range" + assert result["credentials"][0]["name"] == credential.name + assert yaml.safe_load(response.data["args"]) == args + + +@pytest.mark.parametrize( + "bad_rulebooks", + [ + {"bad_rulebook_1": f"{BAD_PG_NOTIFY_TEMPLATE_RULEBOOK_NO_TYPE}"}, + {"bad_rulebook_2": f"{BAD_PG_NOTIFY_TEMPLATE_RULEBOOK_NO_NAME}"}, + {"bad_rulebook_3": f"{BAD_PG_NOTIFY_TEMPLATE_RULEBOOK_NO_ARGS}"}, + {"bad_rulebook_4": f"{BAD_PG_NOTIFY_NO_COMPLEMENTARY_SOURCE}"}, + ], +) +@pytest.mark.django_db +def test_create_event_stream_with_bad_rulebook( + client: APIClient, + default_de: models.DecisionEnvironment, + settings, + bad_rulebooks, +): + for key in bad_rulebooks: + settings.PG_NOTIFY_TEMPLATE_RULEBOOK = key + settings.PG_NOTIFY_DSN = ( + "host=localhost port=5432 dbname=eda user=postgres password=secret" + ) + models.Rulebook.objects.create( + name=key, + rulesets=bad_rulebooks[key], + ) + + args = {"limit": 5, "delay": 1} + data_in = { + "name": "test_event_stream", + "source_type": "ansible.eda.range", + "args": f"{args}", + "decision_environment_id": default_de.id, + } + response = client.post(f"{api_url_v1}/event-streams/", data=data_in) + assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + assert response.data["detail"].startswith( + "Configuration Error: Event stream template rulebook is missing " + ) + + +@pytest.mark.django_db +def test_create_event_stream_bad_channel_name( + client: APIClient, + default_de: models.DecisionEnvironment, +): + args = {"limit": 5, "delay": 1} + data_in = { + "name": "test_event_stream", + "source_type": "ansible.eda.range", + "args": f"{args}", + "channel_name": "abc-def", + "decision_environment_id": default_de.id, + } + response = client.post(f"{api_url_v1}/event-streams/", data=data_in) + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert ( + str(response.data["channel_name"][0]) + == "Channel name can only contain alphanumeric and " + "underscore characters" + ) + + +@pytest.mark.django_db +def test_create_event_stream_bad_args( + client: APIClient, + default_de: models.DecisionEnvironment, +): + data_in = { + "name": "test_event_stream", + "source_type": "ansible.eda.range", + "args": "gobbledegook", + "decision_environment_id": default_de.id, + } + response = client.post(f"{api_url_v1}/event-streams/", data=data_in) + assert response.status_code == status.HTTP_400_BAD_REQUEST + result = response.data + assert ( + str(result["args"][0]) + == "The 'args' field must be a YAML object (dictionary)" + ) + + +@pytest.mark.django_db +def test_create_event_stream_empty_args( + client: APIClient, + default_de: models.DecisionEnvironment, +): + data_in = { + "name": "test_event_stream", + "source_type": "ansible.eda.generic", + "decision_environment_id": default_de.id, + } + response = client.post(f"{api_url_v1}/event-streams/", data=data_in) + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert response.data["args"][0] == "This field is required." + + +@pytest.mark.django_db +def test_create_event_stream_bad_de(client: APIClient): + data_in = { + "name": "test_event_stream", + "source_type": "ansible.eda.generic", + "decision_environment_id": 99999, + } + response = client.post(f"{api_url_v1}/event-streams/", data=data_in) + assert response.status_code == status.HTTP_400_BAD_REQUEST + result = response.data + assert ( + str(result["decision_environment_id"][0]) + == "DecisionEnvironment with id 99999 does not exist" + ) + + +@pytest.mark.django_db +def test_create_event_stream_no_de( + client: APIClient, +): + data_in = { + "name": "test_event_stream", + "source_type": "ansible.eda.generic", + } + response = client.post(f"{api_url_v1}/event-streams/", data=data_in) + assert response.status_code == status.HTTP_400_BAD_REQUEST + result = response.data + assert result["decision_environment_id"][0] == "This field is required." + + +@pytest.mark.django_db +def test_list_event_stream_instances( + client: APIClient, + default_de: models.DecisionEnvironment, + default_user: models.User, +): + args = {"limit": 5, "delay": 1} + event_stream = models.EventStream.objects.create( + name="test-event_stream-1", + source_type="ansible.eda.range", + args=args, + user=default_user, + decision_environment_id=default_de.id, + ) + + instances = models.RulebookProcess.objects.bulk_create( + [ + models.RulebookProcess( + name="test-activation-instance-1", + event_stream=event_stream, + parent_type=ProcessParentType.EVENT_STREAM, + ), + models.RulebookProcess( + name="test-activation-instance-1", + event_stream=event_stream, + parent_type=ProcessParentType.EVENT_STREAM, + ), + ] + ) + response = client.get( + f"{api_url_v1}/event-streams/{event_stream.id}/instances/" + ) + data = response.data["results"] + assert response.status_code == status.HTTP_200_OK + assert len(data) == len(instances) + assert data[0]["name"] == instances[0].name + assert data[1]["name"] == instances[1].name diff --git a/tools/deploy/eda-api/deployment.yaml b/tools/deploy/eda-api/deployment.yaml index f300e0a60..12715ca68 100644 --- a/tools/deploy/eda-api/deployment.yaml +++ b/tools/deploy/eda-api/deployment.yaml @@ -30,6 +30,8 @@ spec: value: postgresql+asyncpg://postgres:secret@postgres/eda - name: EDA_DB_HOST value: eda-postgres + - name: EDA_ACTIVATION_DB_HOST + value: eda-postgres - name: EDA_DB_PASSWORD value: secret - name: EDA_SECRET_KEY