From 4906babd22bf5ea5f7f8eeb3d720cd49bda5c849 Mon Sep 17 00:00:00 2001 From: MoritzWeber Date: Thu, 15 Aug 2024 09:24:46 +0200 Subject: [PATCH] feat: Allow communication between sessions of same user A requirement for the training mode is that a Jupyter session can talk to a Capella session. This PR adds network policies so that Pods can reach the ports of other Pods from the same user. --- .../capellacollab/sessions/hooks/__init__.py | 2 + .../sessions/hooks/networking.py | 38 +++++++++++ .../capellacollab/sessions/operators/k8s.py | 50 +++++++++++++- backend/capellacollab/sessions/routes.py | 6 ++ .../sessions/hooks/test_networking_hook.py | 66 +++++++++++++++++++ .../k8s_operator/test_session_k8s_operator.py | 1 + .../backend/backend.serviceaccount.yaml | 2 +- 7 files changed, 163 insertions(+), 2 deletions(-) create mode 100644 backend/capellacollab/sessions/hooks/networking.py create mode 100644 backend/tests/sessions/hooks/test_networking_hook.py diff --git a/backend/capellacollab/sessions/hooks/__init__.py b/backend/capellacollab/sessions/hooks/__init__.py index 86dfe644d..6032bbfce 100644 --- a/backend/capellacollab/sessions/hooks/__init__.py +++ b/backend/capellacollab/sessions/hooks/__init__.py @@ -8,6 +8,7 @@ http, interface, jupyter, + networking, persistent_workspace, provisioning, pure_variants, @@ -29,6 +30,7 @@ "read_only_hook": read_only_workspace.ReadOnlyWorkspaceHook(), "provisioning": provisioning.ProvisionWorkspaceHook(), "session_preparation": session_preparation.GitRepositoryCloningHook(), + "networking": networking.NetworkingIntegration(), } diff --git a/backend/capellacollab/sessions/hooks/networking.py b/backend/capellacollab/sessions/hooks/networking.py new file mode 100644 index 000000000..fb0a36ca6 --- /dev/null +++ b/backend/capellacollab/sessions/hooks/networking.py @@ -0,0 +1,38 @@ +# SPDX-FileCopyrightText: Copyright DB InfraGO AG and contributors +# SPDX-License-Identifier: Apache-2.0 + + +from capellacollab.sessions import operators +from capellacollab.users import models as users_models + +from .. import models as sessions_models +from . import interface + + +class NetworkingIntegration(interface.HookRegistration): + """Allow sessions of the same user to talk to each other.""" + + def post_session_creation_hook( # type: ignore + self, + session_id: str, + operator: operators.KubernetesOperator, + user: users_models.DatabaseUser, + **kwargs, + ) -> interface.PostSessionCreationHookResult: + """Allow sessions of the user to talk to each other.""" + + operator.create_network_policy_from_pod_to_label( + session_id, + {"capellacollab/session-id": session_id}, + {"capellacollab/owner-id": str(user.id)}, + ) + + return interface.PostSessionCreationHookResult() + + def pre_session_termination_hook( # type: ignore + self, + operator: operators.KubernetesOperator, + session: sessions_models.DatabaseSession, + **kwargs, + ): + operator.delete_network_policy(session.id) diff --git a/backend/capellacollab/sessions/operators/k8s.py b/backend/capellacollab/sessions/operators/k8s.py index 6b3da6e76..4a1e4769f 100644 --- a/backend/capellacollab/sessions/operators/k8s.py +++ b/backend/capellacollab/sessions/operators/k8s.py @@ -99,6 +99,7 @@ def start_session( volumes: list[models.Volume], init_volumes: list[models.Volume], annotations: dict[str, str], + labels: dict[str, str], prometheus_path="/metrics", prometheus_port=9118, ) -> Session: @@ -125,6 +126,7 @@ def start_session( init_volumes=init_volumes, tool_resources=tool.config.resources, annotations=annotations, + labels=labels, ) self._create_disruption_budget( @@ -440,6 +442,51 @@ def _map_volume_to_k8s_volume(self, volume: models.Volume): f"The Kubernetes operator encountered an unsupported session volume type '{type(volume)}'" ) + def create_network_policy_from_pod_to_label( + self, + name: str, + match_labels_from: dict[str, str], + match_labels_to: dict[str, str], + ): + return self.v1_networking.create_namespaced_network_policy( + namespace, + client.V1NetworkPolicy( + kind="NetworkPolicy", + api_version="networking.k8s.io/v1", + metadata=client.V1ObjectMeta(name=name), + spec=client.V1NetworkPolicySpec( + pod_selector=client.V1LabelSelector( + match_labels=match_labels_to + ), + policy_types=["Ingress"], + ingress=[ + client.V1NetworkPolicyIngressRule( + _from=[ + client.V1NetworkPolicyPeer( + pod_selector=client.V1LabelSelector( + match_labels=match_labels_from + ) + ) + ], + ) + ], + ), + ), + ) + + def delete_network_policy(self, name: str): + try: + self.v1_networking.delete_namespaced_network_policy( + name, + namespace, + ) + except exceptions.ApiException as e: + # Network policy doesn't exist or was already deleted + # Nothing to do + if e.status == http.HTTPStatus.NOT_FOUND: + return + raise + def _create_deployment( self, image: str, @@ -451,6 +498,7 @@ def _create_deployment( init_volumes: list[models.Volume], tool_resources: tools_models.Resources, annotations: dict[str, str], + labels: dict[str, str], ) -> client.V1Deployment: k8s_volumes, k8s_volume_mounts = self._map_volumes_to_k8s_volumes( volumes @@ -541,7 +589,7 @@ def _create_deployment( selector=client.V1LabelSelector(match_labels={"app": name}), template=client.V1PodTemplateSpec( metadata=client.V1ObjectMeta( - labels={"app": name, "workload": "session"}, + labels={"app": name, "workload": "session"} | labels, annotations=annotations, ), spec=client.V1PodSpec( diff --git a/backend/capellacollab/sessions/routes.py b/backend/capellacollab/sessions/routes.py index 141825263..bd4032647 100644 --- a/backend/capellacollab/sessions/routes.py +++ b/backend/capellacollab/sessions/routes.py @@ -178,6 +178,11 @@ def request_session( "capellacollab/connection-method-name": connection_method.name, } + labels: dict[str, str] = { + "capellacollab/session-id": session_id, + "capellacollab/owner-id": str(user.id), + } + session = operator.start_session( session_id=session_id, image=docker_image, @@ -191,6 +196,7 @@ def request_session( volumes=volumes, init_volumes=init_volumes, annotations=annotations, + labels=labels, prometheus_path=tool.config.monitoring.prometheus.path, prometheus_port=connection_method.ports.metrics, ) diff --git a/backend/tests/sessions/hooks/test_networking_hook.py b/backend/tests/sessions/hooks/test_networking_hook.py new file mode 100644 index 000000000..3ec70ca52 --- /dev/null +++ b/backend/tests/sessions/hooks/test_networking_hook.py @@ -0,0 +1,66 @@ +# SPDX-FileCopyrightText: Copyright DB InfraGO AG and contributors +# SPDX-License-Identifier: Apache-2.0 + + +import kubernetes.client +import pytest + +from capellacollab.sessions import models as sessions_models +from capellacollab.sessions import operators +from capellacollab.sessions.hooks import networking as networking_hook +from capellacollab.users import models as users_models + + +def test_network_policy_created( + user: users_models.DatabaseUser, monkeypatch: pytest.MonkeyPatch +): + network_policy_counter = 0 + + def mock_create_namespaced_network_policy( + self, + namespace: str, + network_policy: kubernetes.client.V1PersistentVolumeClaim, + ): + nonlocal network_policy_counter + network_policy_counter += 1 + + monkeypatch.setattr( + kubernetes.client.NetworkingV1Api, + "create_namespaced_network_policy", + mock_create_namespaced_network_policy, + ) + + networking_hook.NetworkingIntegration().post_session_creation_hook( + session_id="test", + operator=operators.KubernetesOperator(), + user=user, + ) + + assert network_policy_counter == 1 + + +def test_network_policy_deleted( + session: sessions_models.DatabaseSession, monkeypatch: pytest.MonkeyPatch +): + network_policy_del_counter = 0 + + def mock_delete_namespaced_network_policy( + self, + name: str, + namespace: str, + ): + nonlocal network_policy_del_counter + network_policy_del_counter += 1 + + monkeypatch.setattr( + kubernetes.client.NetworkingV1Api, + "delete_namespaced_network_policy", + mock_delete_namespaced_network_policy, + ) + + networking_hook.NetworkingIntegration().pre_session_termination_hook( + operator=operators.KubernetesOperator(), + session=session, + ) + + assert network_policy_del_counter == 1 diff --git a/backend/tests/sessions/k8s_operator/test_session_k8s_operator.py b/backend/tests/sessions/k8s_operator/test_session_k8s_operator.py index a6af69f75..b23ad33e6 100644 --- a/backend/tests/sessions/k8s_operator/test_session_k8s_operator.py +++ b/backend/tests/sessions/k8s_operator/test_session_k8s_operator.py @@ -73,6 +73,7 @@ def create_namespaced_pod_disruption_budget(namespace, budget): ports={"rdp": 3389}, volumes=[], init_volumes=[], + labels={}, annotations={}, ) diff --git a/helm/templates/backend/backend.serviceaccount.yaml b/helm/templates/backend/backend.serviceaccount.yaml index 6a562e7f8..3b5dacfd7 100644 --- a/helm/templates/backend/backend.serviceaccount.yaml +++ b/helm/templates/backend/backend.serviceaccount.yaml @@ -43,7 +43,7 @@ rules: resources: ["poddisruptionbudgets"] verbs: ["create", "delete"] - apiGroups: ["networking.k8s.io"] - resources: ["ingresses"] + resources: ["ingresses", "networkpolicies"] verbs: ["create", "delete"] --- apiVersion: rbac.authorization.k8s.io/v1