Skip to content

Commit

Permalink
Combine event streams with activations
Browse files Browse the repository at this point in the history
  • Loading branch information
hsong-rh committed Feb 9, 2024
1 parent 61272d2 commit ea7d435
Show file tree
Hide file tree
Showing 23 changed files with 1,466 additions and 21 deletions.
31 changes: 31 additions & 0 deletions src/aap_eda/api/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,34 @@

# EDA_SERVER_VAULT_LABEL is reserved for system vault password identifiers
EDA_SERVER_VAULT_LABEL = "EDA_SERVER"

PG_NOTIFY_TEMPLATE_RULEBOOK_NAME = "_PG_NOTIFY_TEMPLATE_RULEBOOK_"
PG_NOTIFY_TEMPLATE_RULEBOOK_DATA = """
---
- name: PG Notify Template Event Stream
hosts: all
sources:
- name: my_range
ansible.eda.range:
limit: 5
complementary_source:
type: ansible.eda.pg_listener
name: Postgres Listener
args:
dsn: "{{ EDA_PG_NOTIFY_DSN }}"
channels:
- "{{ EDA_PG_NOTIFY_CHANNEL }}"
extra_vars:
EDA_PG_NOTIFY_DSN: "{{ settings.PG_NOTIFY_DSN }}"
EDA_PG_NOTIFY_CHANNEL: "{{ event_stream.channel_name }}"
encrypt_vars:
- EDA_PG_NOTIFY_DSN
rules:
- name: Post event
condition: true
action:
pg_notify:
dsn: "{{ EDA_PG_NOTIFY_DSN }}"
channel: "{{ EDA_PG_NOTIFY_CHANNEL }}"
event: "{{ event }}"
"""
30 changes: 30 additions & 0 deletions src/aap_eda/api/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,33 @@ class InvalidWebsocketHost(APIException):
default_detail = (
"Connection Error: WebSocket URL must have a valid host address."
)


class MissingEventStreamRulebook(APIException):
status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
default_detail = (
"Configuration Error: Event stream template rulebook not found"
)


class MissingEventStreamRulebookKeys(APIException):
status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
default_detail = (
"Configuration Error: Event stream template rulebook is missing "
"required keys in complementary_source: type, name and args"
)


class MissingEventStreamRulebookSource(APIException):
status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
default_detail = (
"Configuration Error: Event stream template rulebook is missing "
"required complementary_source"
)


class InvalidEventStreamRulebook(APIException):
status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
default_detail = (
"Configuration Error: Event stream template rulebook is invalid"
)
3 changes: 3 additions & 0 deletions src/aap_eda/api/filters/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
)
from .credential import CredentialFilter
from .decision_environment import DecisionEnvironmentFilter
from .event_stream import EventStreamFilter
from .project import ProjectFilter
from .role import RoleFilter
from .rulebook import (
Expand Down Expand Up @@ -51,4 +52,6 @@
"UserFilter",
# role
"RoleFilter",
# event_stream
"EventStreamFilter",
)
29 changes: 29 additions & 0 deletions src/aap_eda/api/filters/event_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright 2024 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import django_filters

from aap_eda.core import models


class EventStreamFilter(django_filters.FilterSet):
name = django_filters.CharFilter(
field_name="name",
lookup_expr="istartswith",
label="Filter by event source name.",
)

class Meta:
model = models.EventStream
fields = ["name"]
9 changes: 9 additions & 0 deletions src/aap_eda/api/serializers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
DecisionEnvironmentRefSerializer,
DecisionEnvironmentSerializer,
)
from .event_stream import (
EventStreamCreateSerializer,
EventStreamOutSerializer,
EventStreamSerializer,
)
from .project import (
ExtraVarCreateSerializer,
ExtraVarRefSerializer,
Expand Down Expand Up @@ -118,4 +123,8 @@
"RoleSerializer",
"RoleListSerializer",
"RoleDetailSerializer",
# event streams
"EventStreamSerializer",
"EventStreamCreateSerializer",
"EventStreamOutSerializer",
)
100 changes: 98 additions & 2 deletions src/aap_eda/api/serializers/activation.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,65 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Union

import yaml
from rest_framework import serializers

from aap_eda.api.constants import PG_NOTIFY_TEMPLATE_RULEBOOK_DATA
from aap_eda.api.exceptions import InvalidEventStreamRulebook
from aap_eda.api.serializers.credential import CredentialSerializer
from aap_eda.api.serializers.decision_environment import (
DecisionEnvironmentRefSerializer,
)
from aap_eda.api.serializers.event_stream import EventStreamOutSerializer
from aap_eda.api.serializers.project import (
ExtraVarRefSerializer,
ProjectRefSerializer,
)
from aap_eda.api.serializers.rulebook import RulebookRefSerializer
from aap_eda.api.serializers.utils import (
substitute_extra_vars,
substitute_source_args,
swap_sources,
)
from aap_eda.core import models, validators
from aap_eda.core.enums import ProcessParentType

logger = logging.getLogger(__name__)


def _updated_ruleset(validated_data):
try:
sources_info = []

for event_stream_id in validated_data["event_streams"]:
event_stream = models.EventStream.objects.get(id=event_stream_id)

if event_stream.rulebook:
rulesets = yaml.safe_load(event_stream.rulebook.rulesets)
else:
rulesets = yaml.safe_load(PG_NOTIFY_TEMPLATE_RULEBOOK_DATA)

extra_vars = rulesets[0]["sources"][0].get("extra_vars", {})
encrypt_vars = rulesets[0]["sources"][0].get("encrypt_vars", [])

# TODO: encrypt password when engine is ready for vaulted data
extra_vars = substitute_extra_vars(
event_stream.__dict__, extra_vars, encrypt_vars, "password"
)

source = rulesets[0]["sources"][0]["complementary_source"]
source = substitute_source_args(
event_stream.__dict__, source, extra_vars
)
sources_info.append(source)

return swap_sources(validated_data["rulebook_rulesets"], sources_info)
except Exception as e:
logger.error(f"Failed to update rulesets: {e}")
raise InvalidEventStreamRulebook(e)


class ActivationSerializer(serializers.ModelSerializer):
Expand All @@ -34,6 +81,12 @@ class ActivationSerializer(serializers.ModelSerializer):
child=CredentialSerializer(),
)

event_streams = serializers.ListField(
required=False,
allow_null=True,
child=EventStreamOutSerializer(),
)

class Meta:
model = models.Activation
fields = [
Expand All @@ -57,6 +110,7 @@ class Meta:
"status_message",
"awx_token_id",
"credentials",
"event_streams",
]
read_only_fields = [
"id",
Expand All @@ -77,6 +131,12 @@ class ActivationListSerializer(serializers.ModelSerializer):
child=CredentialSerializer(),
)

event_streams = serializers.ListField(
required=False,
allow_null=True,
child=EventStreamOutSerializer(),
)

class Meta:
model = models.Activation
fields = [
Expand All @@ -100,6 +160,7 @@ class Meta:
"status_message",
"awx_token_id",
"credentials",
"event_streams",
]
read_only_fields = ["id", "created_at", "modified_at"]

Expand All @@ -111,6 +172,10 @@ def to_representation(self, activation):
CredentialSerializer(credential).data
for credential in activation.credentials.all()
]
event_streams = [
EventStreamOutSerializer(event_stream).data
for event_stream in activation.event_streams.all()
]

return {
"id": activation.id,
Expand All @@ -133,6 +198,7 @@ def to_representation(self, activation):
"status_message": activation.status_message,
"awx_token_id": activation.awx_token_id,
"credentials": credentials,
"event_streams": event_streams,
}


Expand All @@ -152,6 +218,7 @@ class Meta:
"restart_policy",
"awx_token_id",
"credentials",
"event_streams",
]

rulebook_id = serializers.IntegerField(
Expand All @@ -177,6 +244,12 @@ class Meta:
allow_null=True,
child=serializers.IntegerField(),
)
event_streams = serializers.ListField(
required=False,
allow_null=True,
child=serializers.IntegerField(),
validators=[validators.check_if_event_streams_exists],
)

def validate(self, data):
user = data["user"]
Expand All @@ -200,6 +273,10 @@ def create(self, validated_data):
validated_data["rulebook_rulesets"] = rulebook.rulesets
validated_data["git_hash"] = rulebook.project.git_hash
validated_data["project_id"] = rulebook.project.id
if validated_data.get("event_streams"):
validated_data["rulebook_rulesets"] = _updated_ruleset(
validated_data
)
return super().create(validated_data)


Expand All @@ -215,6 +292,7 @@ class Meta:
"git_hash",
"status_message",
"activation_id",
"event_stream_id",
"started_at",
"ended_at",
]
Expand Down Expand Up @@ -243,6 +321,11 @@ class ActivationReadSerializer(serializers.ModelSerializer):
rules_count = serializers.IntegerField()
rules_fired_count = serializers.IntegerField()
restarted_at = serializers.DateTimeField(required=False, allow_null=True)
event_streams = serializers.ListField(
required=False,
allow_null=True,
child=EventStreamOutSerializer(),
)

class Meta:
model = models.Activation
Expand Down Expand Up @@ -270,6 +353,7 @@ class Meta:
"status_message",
"awx_token_id",
"credentials",
"event_streams",
]
read_only_fields = ["id", "created_at", "modified_at", "restarted_at"]

Expand Down Expand Up @@ -297,7 +381,8 @@ def to_representation(self, activation):
else None
)
activation_instances = models.RulebookProcess.objects.filter(
activation_id=activation.id
activation_id=activation.id,
parent_type=ProcessParentType.ACTIVATION,
)
rules_count, rules_fired_count = get_rules_count(
activation.ruleset_stats
Expand All @@ -315,6 +400,11 @@ def to_representation(self, activation):
for credential in activation.credentials.all()
]

event_streams = [
EventStreamOutSerializer(event_stream).data
for event_stream in activation.event_streams.all()
]

return {
"id": activation.id,
"name": activation.name,
Expand All @@ -341,6 +431,7 @@ def to_representation(self, activation):
"status_message": activation.status_message,
"awx_token_id": activation.awx_token_id,
"credentials": credentials,
"event_streams": event_streams,
}


Expand All @@ -356,7 +447,9 @@ class PostActivationSerializer(serializers.ModelSerializer):
allow_null=True,
validators=[validators.check_if_extra_var_exists],
)
# TODO: is_activation_valid needs to tell event stream/activation
awx_token_id = serializers.IntegerField(
required=False,
allow_null=True,
validators=[validators.check_if_awx_token_exists],
)
Expand Down Expand Up @@ -418,8 +511,11 @@ def parse_validation_errors(errors: dict) -> str:
return str(messages)


def validate_rulebook_token(rulebook_id: int) -> None:
def validate_rulebook_token(rulebook_id: Union[int, None]) -> None:
"""Validate if the rulebook requires an Awx Token."""
if rulebook_id is None:
return

rulebook = models.Rulebook.objects.get(id=rulebook_id)

# TODO: rulesets are stored as a string in the rulebook model
Expand Down
Loading

0 comments on commit ea7d435

Please sign in to comment.