diff --git a/backend/capellacollab/sessions/hooks/__init__.py b/backend/capellacollab/sessions/hooks/__init__.py index 09d7de790..bfebfb9c1 100644 --- a/backend/capellacollab/sessions/hooks/__init__.py +++ b/backend/capellacollab/sessions/hooks/__init__.py @@ -9,6 +9,7 @@ http, interface, jupyter, + log_collector, networking, persistent_workspace, provisioning, @@ -33,6 +34,7 @@ "session_preparation": session_preparation.GitRepositoryCloningHook(), "networking": networking.NetworkingIntegration(), "authentication": authentication.PreAuthenticationHook(), + "log_collector": log_collector.LogCollectorIntegration(), } diff --git a/backend/capellacollab/sessions/hooks/interface.py b/backend/capellacollab/sessions/hooks/interface.py index fefbd60f4..743a3c2c8 100644 --- a/backend/capellacollab/sessions/hooks/interface.py +++ b/backend/capellacollab/sessions/hooks/interface.py @@ -152,6 +152,7 @@ def post_session_creation_hook( operator: operators.KubernetesOperator, user: users_models.DatabaseUser, connection_method: tools_models.ToolSessionConnectionMethod, + db: orm.Session, **kwargs, ) -> PostSessionCreationHookResult: """Hook executed after session creation @@ -173,6 +174,8 @@ def post_session_creation_hook( User who has requested the session connection_method : tools_models.ToolSessionConnectionMethod Requested connection method for the session + db : sqlalchemy.orm.Session + Database session. 
class LogCollectorIntegration(interface.HookRegistration):
    """Collect session logs with a promtail sidecar pod and push them to Loki.

    For every non-read-only session of a user that owns a workspace, a
    ConfigMap holding the promtail configuration and a separate sidecar pod
    running promtail are created. Both are removed again before the session
    is terminated. The integration does nothing when Loki is disabled in the
    configuration.
    """

    # Read once at class-definition time from the application configuration.
    _loki_enabled: bool = config.k8s.promtail.loki_enabled

    def post_session_creation_hook(  # type: ignore[override]
        self,
        db_session: sessions_models.DatabaseSession,
        operator: operators.KubernetesOperator,
        user: users_models.DatabaseUser,
        db: orm.Session,
        **kwargs,
    ) -> interface.PostSessionCreationHookResult:
        """Create the promtail ConfigMap and sidecar pod for a new session.

        Parameters
        ----------
        db_session : sessions_models.DatabaseSession
            The session that has just been created.
        operator : operators.KubernetesOperator
            Operator used to create the ConfigMap and the sidecar pod.
        user : users_models.DatabaseUser
            Owner of the session.
        db : sqlalchemy.orm.Session
            Database session, used to look up the workspaces of the user.

        Returns
        -------
        interface.PostSessionCreationHookResult
            Always an empty result; the hook only has side effects.
        """
        if (
            not self._loki_enabled
            or db_session.type == sessions_models.SessionType.READONLY
        ):
            return interface.PostSessionCreationHookResult()

        workspaces = users_workspaces_crud.get_workspaces_for_user(db, user)
        if not workspaces:
            # Without a workspace volume there is nothing to read logs from.
            return interface.PostSessionCreationHookResult()

        # The ConfigMap is named after the session so that the termination
        # hook can find and delete it again.
        operator._create_configmap(
            name=db_session.id,
            data=self._promtail_configuration(
                username=user.name,
                session_type=db_session.type.value,
                tool_name=db_session.tool.name,
                version_name=db_session.version.name,
            ),
        )

        labels: dict[str, str] = {
            "capellacollab/workload": "session-sidecar",
            "capellacollab/session-id": db_session.id,
            "capellacollab/owner-id": str(user.id),
        }

        volumes = [
            operators_models.ConfigMapReferenceVolume(
                name="prom-config",
                read_only=True,
                container_path=pathlib.PurePosixPath("/etc/promtail"),
                config_map_name=db_session.id,
                optional=False,
            ),
            # Mounted writable: promtail stores its positions file inside
            # this directory (see `_promtail_configuration`).
            operators_models.PersistentVolume(
                name="workspace",
                read_only=False,
                container_path=pathlib.PurePosixPath("/var/log/promtail"),
                volume_name=workspaces[0].pvc_name,
            ),
        ]

        operator._create_sidecar_pod(
            image=f"{config.docker.external_registry}/grafana/promtail",
            name=f"{db_session.id}-promtail",
            labels=labels,
            args=[
                "--config.file=/etc/promtail/promtail.yaml",
                "-log-config-reverse-order",
            ],
            volumes=volumes,
        )

        return interface.PostSessionCreationHookResult()

    def pre_session_termination_hook(  # type: ignore[override]
        self,
        session: sessions_models.DatabaseSession,
        operator: operators.KubernetesOperator,
        **kwargs,
    ) -> interface.PreSessionTerminationHookResult:
        """Delete the promtail ConfigMap and sidecar pod of a session.

        Parameters
        ----------
        session : sessions_models.DatabaseSession
            The session that is about to be terminated.
        operator : operators.KubernetesOperator
            Operator used to delete the ConfigMap and the sidecar pod.

        Returns
        -------
        interface.PreSessionTerminationHookResult
            Always an empty result; the hook only has side effects.
        """
        if (
            not self._loki_enabled
            or session.type == sessions_models.SessionType.READONLY
        ):
            # Return the termination result type here; the creation result
            # type does not match the declared return type of this hook.
            return interface.PreSessionTerminationHookResult()

        operator._delete_config_map(name=session.id)
        operator._delete_pod(name=f"{session.id}-promtail")

        return interface.PreSessionTerminationHookResult()

    @classmethod
    def _promtail_configuration(
        cls,
        username: str,
        session_type: str,
        tool_name: str,
        version_name: str,
    ) -> dict:
        """Build the ConfigMap data containing the promtail configuration.

        Parameters
        ----------
        username : str
            Value of the ``username`` label attached to the log streams.
        session_type : str
            Value of the ``session_type`` label.
        tool_name : str
            Value of the ``tool`` label.
        version_name : str
            Value of the ``version`` label.

        Returns
        -------
        dict
            Mapping with the single key ``promtail.yaml`` whose value is the
            YAML-serialized promtail configuration.

        Raises
        ------
        ValueError
            If no Loki URL is configured.
        """
        cfg = config.k8s.promtail

        # NOTE(review): loki_url is presumably optional in the config model;
        # fail with a clear message instead of a TypeError from `None + str`.
        if not cfg.loki_url:
            raise ValueError(
                "The Loki URL must be configured when Loki is enabled."
            )

        return {
            "promtail.yaml": yaml.dump(
                {
                    "server": {
                        "http_listen_port": cfg.server_port,
                    },
                    "clients": [
                        {
                            "url": cfg.loki_url + "/push",
                            "basic_auth": {
                                "username": cfg.loki_username,
                                "password": cfg.loki_password,
                            },
                        }
                    ],
                    "positions": {
                        "filename": "/var/log/promtail/positions.yaml"
                    },
                    "scrape_configs": [
                        {
                            "job_name": "system",
                            "pipeline_stages": [
                                {
                                    # A line starting with a tab is treated
                                    # as a continuation of the previous entry.
                                    "multiline": {
                                        "firstline": "^[^\t]",
                                    },
                                }
                            ],
                            "static_configs": [
                                {
                                    "targets": ["localhost"],
                                    "labels": {
                                        "username": username,
                                        "session_type": session_type,
                                        "tool": tool_name,
                                        "version": version_name,
                                        "__path__": "/var/log/promtail/**/*.log",
                                    },
                                }
                            ],
                        }
                    ],
                }
            )
        }
def _create_sidecar_pod(
    self,
    image: str,
    name: str,
    labels: dict[str, str],
    volumes: list[models.Volume],
    args: list[str] | None = None,
):
    """Create a single-container helper pod in the operator namespace.

    Parameters
    ----------
    image : str
        Container image to run.
    name : str
        Used both as pod name and as container name.
    labels : dict[str, str]
        Labels attached to the pod metadata.
    volumes : list[models.Volume]
        Volumes to attach to the pod and mount into the container.
    args : list[str] | None
        Optional arguments passed to the container.

    Returns
    -------
    The object returned by ``create_namespaced_pod`` of the core API client.
    """
    pod_volumes, container_mounts = self._map_volumes_to_k8s_volumes(volumes)

    # Fixed, small resource envelope for the sidecar.
    sidecar_resources = client.V1ResourceRequirements(
        limits={"cpu": "0.1", "memory": "50Mi"},
        requests={"cpu": "0.05", "memory": "5Mi"},
    )

    sidecar_container = client.V1Container(
        name=name,
        image=image,
        args=args,
        resources=sidecar_resources,
        volume_mounts=container_mounts,
        image_pull_policy=image_pull_policy,
    )

    pod_spec = client.V1PodSpec(
        automount_service_account_token=False,
        security_context=pod_security_context,
        node_selector=cfg.cluster.node_selector,
        containers=[sidecar_container],
        volumes=pod_volumes,
        restart_policy="Always",
    )

    pod_manifest: client.V1Pod = client.V1Pod(
        metadata=client.V1ObjectMeta(name=name, labels=labels),
        spec=pod_spec,
    )

    return self.v1_core.create_namespaced_pod(namespace, pod_manifest)
def _create_configmap(
    self,
    name: str,
    data: dict,
) -> client.V1ConfigMap | None:
    """Create a ConfigMap in the operator namespace.

    Parameters
    ----------
    name : str
        Name of the ConfigMap.
    data : dict
        Content stored in the ``data`` field of the ConfigMap.

    Returns
    -------
    The object returned by ``create_namespaced_config_map`` of the core
    API client.
    """
    manifest = client.V1ConfigMap(
        api_version="v1",
        kind="ConfigMap",
        metadata=client.V1ObjectMeta(name=name),
        data=data,
    )
    return self.v1_core.create_namespaced_config_map(namespace, manifest)
"capellacollab/session-id": session_id, "capellacollab/owner-id": str(user.id), } @@ -193,7 +194,6 @@ def request_session( username=user.name, session_type=body.session_type, tool=tool, - version=version, environment=environment, init_environment=init_environment, ports=connection_method.ports.model_dump(), @@ -232,6 +232,7 @@ def request_session( session=session, db_session=db_session, connection_method=connection_method, + db=db, ) hook_config |= result.get("config", {})