Skip to content

Commit

Permalink
feat: backup tar.gz of workspace
Browse files Browse the repository at this point in the history
  • Loading branch information
amolenaar committed Sep 25, 2023
1 parent b8a15c6 commit d4ad1b8
Show file tree
Hide file tree
Showing 2 changed files with 207 additions and 11 deletions.
128 changes: 117 additions & 11 deletions backend/capellacollab/cli/ws.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
# SPDX-FileCopyrightText: Copyright DB Netz AG and the capella-collab-manager contributors
# SPDX-License-Identifier: Apache-2.0

import contextlib
import select
import time
import uuid
from pathlib import Path

import typer
from kubernetes import client, config
from kubernetes import client, config, stream
from websocket import ABNF

app = typer.Typer()


@app.command()
def list(namespace: str = None):
def ls(namespace: str = None):
config.load_kube_config()
core_api = client.CoreV1Api()

Expand All @@ -19,14 +26,30 @@ def list(namespace: str = None):


@app.command()
def download(volume_name: str, namespace: str = None):
def backup(volume_name: str, namespace: str = None, out: Path = None):
config.load_kube_config()
pod = create_temporary_pod(volume_name, namespace)
print(pod)
v1 = client.CoreV1Api()
mount_path = "/workspace"
if not out:
out = Path.cwd()

Check warning on line 34 in backend/capellacollab/cli/ws.py

View check run for this annotation

Codecov / codecov/patch

backend/capellacollab/cli/ws.py#L34

Added line #L34 was not covered by tests

targz = out / f"{volume_name}.tar.gz"

with pod_for_volume(volume_name, namespace, mount_path, v1) as pod_name:
print(f"Downloading workspace volume to '{targz}'")

with targz.open("wb") as outfile:
for data in stream_tar_from_pod(
pod_name, namespace, mount_path, v1
):
outfile.write(data)

def create_temporary_pod(volume_name: str, namespace: str):
name = f"download-{volume_name}"

@contextlib.contextmanager
def pod_for_volume(
volume_name: str, namespace: str, mount_path: str, v1: client.CoreV1Api
):
name = f"ws-download-{volume_name}-{uuid.uuid1()}"[:63]

containers = [
client.V1Container(
Expand All @@ -35,8 +58,8 @@ def create_temporary_pod(volume_name: str, namespace: str):
command=["sleep", "infinity"],
volume_mounts=[
client.V1VolumeMount(
name=volume_name,
mount_path="/mnt",
name="vol",
mount_path=mount_path,
read_only=True,
)
],
Expand All @@ -46,7 +69,7 @@ def create_temporary_pod(volume_name: str, namespace: str):

volumes = [
client.V1Volume(
name=volume_name,
name="vol",
persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
claim_name=volume_name
),
Expand All @@ -65,7 +88,15 @@ def create_temporary_pod(volume_name: str, namespace: str):
),
)

return client.CoreV1Api().create_namespaced_pod(namespace, pod)
v1.create_namespaced_pod(namespace, pod)

while not is_pod_ready(name, namespace, v1):
print("Waiting for pod to come online...")
time.sleep(2)

Check warning on line 95 in backend/capellacollab/cli/ws.py

View check run for this annotation

Codecov / codecov/patch

backend/capellacollab/cli/ws.py#L94-L95

Added lines #L94 - L95 were not covered by tests

yield name

v1.delete_namespaced_pod(name, namespace)


def get_current_namespace():
Expand All @@ -74,3 +105,78 @@ def get_current_namespace():
return active_context["context"]["namespace"]
except KeyError:
return "default"

Check warning on line 107 in backend/capellacollab/cli/ws.py

View check run for this annotation

Codecov / codecov/patch

backend/capellacollab/cli/ws.py#L103-L107

Added lines #L103 - L107 were not covered by tests


def is_pod_ready(pod_name, namespace, v1):
try:
pod_status = v1.read_namespaced_pod_status(pod_name, namespace)
return pod_status.status.phase == "Running"
except client.exceptions.ApiException as e:
print(

Check warning on line 115 in backend/capellacollab/cli/ws.py

View check run for this annotation

Codecov / codecov/patch

backend/capellacollab/cli/ws.py#L114-L115

Added lines #L114 - L115 were not covered by tests
f"Exception when calling CoreV1Api->read_namespaced_pod_status: {e}"
)
return False

Check warning on line 118 in backend/capellacollab/cli/ws.py

View check run for this annotation

Codecov / codecov/patch

backend/capellacollab/cli/ws.py#L118

Added line #L118 was not covered by tests


def stream_tar_from_pod(pod_name, namespace, source_path, v1):
exec_stream = stream.stream(
v1.connect_get_namespaced_pod_exec,
pod_name,
namespace,
command=["tar", "zcf", "-", source_path],
stderr=True,
stdin=True,
stdout=True,
tty=False,
_preload_content=False,
)

try:
reader = WSFileManager(exec_stream)
while True:
out, err, closed = reader.read_bytes()
if out:
yield out
elif err:
print(err.decode("utf-8", "replace"))

Check warning on line 141 in backend/capellacollab/cli/ws.py

View check run for this annotation

Codecov / codecov/patch

backend/capellacollab/cli/ws.py#L141

Added line #L141 was not covered by tests
if closed:
break
finally:
exec_stream.close()


class WSFileManager:
"""WS wrapper to manage read and write bytes in K8s WSClient."""

def __init__(self, ws_client):
self.ws_client = ws_client

def read_bytes(self, timeout=0):
stdout_bytes = None
stderr_bytes = None

if not self.ws_client.is_open():
return stdout_bytes, stderr_bytes, not self.ws_client._connected

Check warning on line 159 in backend/capellacollab/cli/ws.py

View check run for this annotation

Codecov / codecov/patch

backend/capellacollab/cli/ws.py#L159

Added line #L159 was not covered by tests

if not self.ws_client.sock.connected:
self.ws_client._connected = False
return stdout_bytes, stderr_bytes, not self.ws_client._connected

Check warning on line 163 in backend/capellacollab/cli/ws.py

View check run for this annotation

Codecov / codecov/patch

backend/capellacollab/cli/ws.py#L162-L163

Added lines #L162 - L163 were not covered by tests

r, _, _ = select.select((self.ws_client.sock.sock,), (), (), timeout)
if not r:
return stdout_bytes, stderr_bytes, not self.ws_client._connected

Check warning on line 167 in backend/capellacollab/cli/ws.py

View check run for this annotation

Codecov / codecov/patch

backend/capellacollab/cli/ws.py#L167

Added line #L167 was not covered by tests

op_code, frame = self.ws_client.sock.recv_data_frame(True)
if op_code == ABNF.OPCODE_CLOSE:
self.ws_client._connected = False
elif op_code in (ABNF.OPCODE_BINARY, ABNF.OPCODE_TEXT):
data = frame.data
if len(data) > 1:
channel = data[0]
data = data[1:]
if data:
if channel == stream.ws_client.STDOUT_CHANNEL:
stdout_bytes = data
elif channel == stream.ws_client.STDERR_CHANNEL:
stderr_bytes = data

Check warning on line 181 in backend/capellacollab/cli/ws.py

View check run for this annotation

Codecov / codecov/patch

backend/capellacollab/cli/ws.py#L181

Added line #L181 was not covered by tests
return stdout_bytes, stderr_bytes, not self.ws_client._connected
90 changes: 90 additions & 0 deletions backend/tests/cli/test_ws.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# SPDX-FileCopyrightText: Copyright DB Netz AG and the capella-collab-manager contributors
# SPDX-License-Identifier: Apache-2.0

import kubernetes.client
import kubernetes.config
import pytest
from websocket import ABNF

from capellacollab.cli.ws import backup, ls


@pytest.fixture(autouse=True)
def mock_kube_config(monkeypatch):
monkeypatch.setattr("kubernetes.config.load_kube_config", lambda: None)


@pytest.fixture(autouse=True)
def mock_core_v1_api(monkeypatch):
monkeypatch.setattr(
"kubernetes.client.CoreV1Api.create_namespaced_pod",
lambda self, ns, pod: pod,
)
monkeypatch.setattr(
"kubernetes.client.CoreV1Api.delete_namespaced_pod",
lambda self, name, ns: None,
)
monkeypatch.setattr(
"kubernetes.client.CoreV1Api.read_namespaced_pod_status",
lambda self, name, namespace: kubernetes.client.V1Pod(
status=kubernetes.client.V1PodStatus(phase="Running")
),
)


def test_ls_workspace(monkeypatch, capsys):
monkeypatch.setattr(
"kubernetes.client.CoreV1Api.list_namespaced_persistent_volume_claim",
lambda self, namespace, watch: kubernetes.client.V1PersistentVolumeClaimList(
items=[
kubernetes.client.V1PersistentVolumeClaim(
metadata=kubernetes.client.V1ObjectMeta(name="my-volume")
)
]
),
)

ls(namespace="default")

assert "my-volume" in capsys.readouterr().out


def test_backup_workspace(monkeypatch, tmp_path):
mock_stream = MockWSClient([b"\01hello"])
monkeypatch.setattr(
"kubernetes.stream.stream", lambda *a, **ka: mock_stream
)
monkeypatch.setattr("select.select", lambda *a: (1, None, None))

backup("my-volume-name", "my-namespace", tmp_path)

assert (tmp_path / "my-volume-name.tar.gz").exists()


class MockWSClient:
def __init__(self, blocks):
self._blocks = blocks
self._connected = True
self.sock = self

@property
def connected(self):
return self._connected

def is_open(self):
return self._connected

def close(self):
self._connected = False

def recv_data_frame(self, wait):
if self._blocks:
return ABNF.OPCODE_BINARY, Frame(self._blocks.pop(0))
else:
self._connected = False
return ABNF.OPCODE_CLOSE, None


class Frame:
def __init__(self, data):
self.data = data

0 comments on commit d4ad1b8

Please sign in to comment.