From d4ad1b84164bb9cc34ff995ab5f288adf7eb24cb Mon Sep 17 00:00:00 2001
From: Arjan Molenaar
Date: Fri, 22 Sep 2023 17:05:38 +0200
Subject: [PATCH] feat: backup tar.gz of workspace

---
 backend/capellacollab/cli/ws.py | 128 +++++++++++++++++++++++++++++---
 backend/tests/cli/test_ws.py    |  90 ++++++++++++++++++++++
 2 files changed, 207 insertions(+), 11 deletions(-)
 create mode 100644 backend/tests/cli/test_ws.py

diff --git a/backend/capellacollab/cli/ws.py b/backend/capellacollab/cli/ws.py
index ca90243371..03eb95b69c 100644
--- a/backend/capellacollab/cli/ws.py
+++ b/backend/capellacollab/cli/ws.py
@@ -1,14 +1,21 @@
 # SPDX-FileCopyrightText: Copyright DB Netz AG and the capella-collab-manager contributors
 # SPDX-License-Identifier: Apache-2.0
 
+import contextlib
+import select
+import time
+import uuid
+from pathlib import Path
+
 import typer
-from kubernetes import client, config
+from kubernetes import client, config, stream
+from websocket import ABNF
 
 app = typer.Typer()
 
 
 @app.command()
-def list(namespace: str = None):
+def ls(namespace: str = None):
     config.load_kube_config()
     core_api = client.CoreV1Api()
 
@@ -19,14 +26,30 @@
 
 
 @app.command()
-def download(volume_name: str, namespace: str = None):
+def backup(volume_name: str, namespace: str = None, out: Path = None):
     config.load_kube_config()
-    pod = create_temporary_pod(volume_name, namespace)
-    print(pod)
+    v1 = client.CoreV1Api()
+    mount_path = "/workspace"
+    if not out:
+        out = Path.cwd()
+
+    targz = out / f"{volume_name}.tar.gz"
+
+    with pod_for_volume(volume_name, namespace, mount_path, v1) as pod_name:
+        print(f"Downloading workspace volume to '{targz}'")
+        with targz.open("wb") as outfile:
+            for data in stream_tar_from_pod(
+                pod_name, namespace, mount_path, v1
+            ):
+                outfile.write(data)
 
 
-def create_temporary_pod(volume_name: str, namespace: str):
-    name = f"download-{volume_name}"
+
+@contextlib.contextmanager
+def pod_for_volume(
+    volume_name: str, namespace: str, mount_path: str, v1: client.CoreV1Api
+):
+    name = f"ws-download-{volume_name}-{uuid.uuid1()}"[:63]
 
     containers = [
         client.V1Container(
@@ -35,8 +58,8 @@
             command=["sleep", "infinity"],
             volume_mounts=[
                 client.V1VolumeMount(
-                    name=volume_name,
-                    mount_path="/mnt",
+                    name="vol",
+                    mount_path=mount_path,
                     read_only=True,
                 )
             ],
@@ -46,7 +69,7 @@
 
     volumes = [
         client.V1Volume(
-            name=volume_name,
+            name="vol",
             persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
                 claim_name=volume_name
             ),
@@ -65,7 +88,15 @@
         ),
     )
 
-    return client.CoreV1Api().create_namespaced_pod(namespace, pod)
+    v1.create_namespaced_pod(namespace, pod)
+
+    while not is_pod_ready(name, namespace, v1):
+        print("Waiting for pod to come online...")
+        time.sleep(2)
+
+    yield name
+
+    v1.delete_namespaced_pod(name, namespace)
 
 
 def get_current_namespace():
@@ -74,3 +105,78 @@
         return active_context["context"]["namespace"]
     except KeyError:
         return "default"
+
+
+def is_pod_ready(pod_name, namespace, v1):
+    try:
+        pod_status = v1.read_namespaced_pod_status(pod_name, namespace)
+        return pod_status.status.phase == "Running"
+    except client.exceptions.ApiException as e:
+        print(
+            f"Exception when calling CoreV1Api->read_namespaced_pod_status: {e}"
+        )
+        return False
+
+
+def stream_tar_from_pod(pod_name, namespace, source_path, v1):
+    exec_stream = stream.stream(
+        v1.connect_get_namespaced_pod_exec,
+        pod_name,
+        namespace,
+        command=["tar", "zcf", "-", source_path],
+        stderr=True,
+        stdin=True,
+        stdout=True,
+        tty=False,
+        _preload_content=False,
+    )
+
+    try:
+        reader = WSFileManager(exec_stream)
+        while True:
+            out, err, closed = reader.read_bytes()
+            if out:
+                yield out
+            elif err:
+                print(err.decode("utf-8", "replace"))
+            if closed:
+                break
+    finally:
+        exec_stream.close()
+
+
+class WSFileManager:
+    """WS wrapper to manage read and write bytes in K8s WSClient."""
+
+    def __init__(self, ws_client):
+        self.ws_client = ws_client
+
+    def read_bytes(self, timeout=0):
+        stdout_bytes = None
+        stderr_bytes = None
+
+        if not self.ws_client.is_open():
+            return stdout_bytes, stderr_bytes, not self.ws_client._connected
+
+        if not self.ws_client.sock.connected:
+            self.ws_client._connected = False
+            return stdout_bytes, stderr_bytes, not self.ws_client._connected
+
+        r, _, _ = select.select((self.ws_client.sock.sock,), (), (), timeout)
+        if not r:
+            return stdout_bytes, stderr_bytes, not self.ws_client._connected
+
+        op_code, frame = self.ws_client.sock.recv_data_frame(True)
+        if op_code == ABNF.OPCODE_CLOSE:
+            self.ws_client._connected = False
+        elif op_code in (ABNF.OPCODE_BINARY, ABNF.OPCODE_TEXT):
+            data = frame.data
+            if len(data) > 1:
+                channel = data[0]
+                data = data[1:]
+                if data:
+                    if channel == stream.ws_client.STDOUT_CHANNEL:
+                        stdout_bytes = data
+                    elif channel == stream.ws_client.STDERR_CHANNEL:
+                        stderr_bytes = data
+        return stdout_bytes, stderr_bytes, not self.ws_client._connected
diff --git a/backend/tests/cli/test_ws.py b/backend/tests/cli/test_ws.py
new file mode 100644
index 0000000000..d6561c6682
--- /dev/null
+++ b/backend/tests/cli/test_ws.py
@@ -0,0 +1,90 @@
+# SPDX-FileCopyrightText: Copyright DB Netz AG and the capella-collab-manager contributors
+# SPDX-License-Identifier: Apache-2.0
+
+import kubernetes.client
+import kubernetes.config
+import pytest
+from websocket import ABNF
+
+from capellacollab.cli.ws import backup, ls
+
+
+@pytest.fixture(autouse=True)
+def mock_kube_config(monkeypatch):
+    monkeypatch.setattr("kubernetes.config.load_kube_config", lambda: None)
+
+
+@pytest.fixture(autouse=True)
+def mock_core_v1_api(monkeypatch):
+    monkeypatch.setattr(
+        "kubernetes.client.CoreV1Api.create_namespaced_pod",
+        lambda self, ns, pod: pod,
+    )
+    monkeypatch.setattr(
+        "kubernetes.client.CoreV1Api.delete_namespaced_pod",
+        lambda self, name, ns: None,
+    )
+    monkeypatch.setattr(
+        "kubernetes.client.CoreV1Api.read_namespaced_pod_status",
+        lambda self, name, namespace: kubernetes.client.V1Pod(
+            status=kubernetes.client.V1PodStatus(phase="Running")
+        ),
+    )
+
+
+def test_ls_workspace(monkeypatch, capsys):
+    monkeypatch.setattr(
+        "kubernetes.client.CoreV1Api.list_namespaced_persistent_volume_claim",
+        lambda self, namespace, watch: kubernetes.client.V1PersistentVolumeClaimList(
+            items=[
+                kubernetes.client.V1PersistentVolumeClaim(
+                    metadata=kubernetes.client.V1ObjectMeta(name="my-volume")
+                )
+            ]
+        ),
+    )
+
+    ls(namespace="default")
+
+    assert "my-volume" in capsys.readouterr().out
+
+
+def test_backup_workspace(monkeypatch, tmp_path):
+    mock_stream = MockWSClient([b"\01hello"])
+    monkeypatch.setattr(
+        "kubernetes.stream.stream", lambda *a, **ka: mock_stream
+    )
+    monkeypatch.setattr("select.select", lambda *a: (1, None, None))
+
+    backup("my-volume-name", "my-namespace", tmp_path)
+
+    assert (tmp_path / "my-volume-name.tar.gz").exists()
+
+
+class MockWSClient:
+    def __init__(self, blocks):
+        self._blocks = blocks
+        self._connected = True
+        self.sock = self
+
+    @property
+    def connected(self):
+        return self._connected
+
+    def is_open(self):
+        return self._connected
+
+    def close(self):
+        self._connected = False
+
+    def recv_data_frame(self, wait):
+        if self._blocks:
+            return ABNF.OPCODE_BINARY, Frame(self._blocks.pop(0))
+        else:
+            self._connected = False
+            return ABNF.OPCODE_CLOSE, None
+
+
+class Frame:
+    def __init__(self, data):
+        self.data = data
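
A minimal usage sketch of the new commands, assuming this patch is applied and a kubeconfig points at the target cluster. It drives the Typer app from capellacollab/cli/ws.py through typer.testing.CliRunner; the console-script entry point that mounts this app is not part of the diff, so none is assumed, and "my-volume" is only an example PVC name.

    from pathlib import Path

    from typer.testing import CliRunner

    from capellacollab.cli.ws import app

    runner = CliRunner()

    # List the workspace volumes (PVCs) in the given namespace.
    runner.invoke(app, ["ls", "--namespace", "default"])

    # Start a temporary pod that mounts the example "my-volume" PVC read-only,
    # run "tar zcf - /workspace" inside it, and stream the archive to
    # ./my-volume.tar.gz in the current working directory.
    runner.invoke(
        app,
        ["backup", "my-volume", "--namespace", "default", "--out", str(Path.cwd())],
    )

Both commands talk to a live cluster; backend/tests/cli/test_ws.py shows how the Kubernetes API and the exec WebSocket can be monkeypatched when no cluster is available.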