diff --git a/kr8s/_exec.py b/kr8s/_exec.py index aab1d519..4d784b4f 100644 --- a/kr8s/_exec.py +++ b/kr8s/_exec.py @@ -2,6 +2,7 @@ # SPDX-License-Identifier: BSD 3-Clause License from __future__ import annotations +import json from contextlib import asynccontextmanager from dataclasses import dataclass from typing import TYPE_CHECKING, BinaryIO, List, Union @@ -18,6 +19,8 @@ STDERR_CHANNEL = 2 ERROR_CHANNEL = 3 RESIZE_CHANNEL = 4 +CLOSE_CHANNEL = 255 +EXEC_PROTOCOL = "v4.channel.k8s.io" class Exec: @@ -55,6 +58,7 @@ async def run( async with self._resource.api.open_websocket( version=self._resource.version, url=f"{self._resource.endpoint}/{self._resource.name}/exec", + protocols=(EXEC_PROTOCOL,), namespace=self._resource.namespace, params={ "command": self.args, @@ -65,13 +69,16 @@ async def run( }, ) as ws: if self._stdin: + if ws.protocol != "v5.channel.k8s.io": + raise ExecError( + "Stdin is not supported with protocol " + f"{ws.protocol}, only with v5.channel.k8s.io" + ) if isinstance(self._stdin, str): - await ws.send_bytes(b"\x00" + self._stdin.encode()) + await ws.send_bytes(STDIN_CHANNEL.to_bytes() + self._stdin.encode()) else: - await ws.send_bytes(b"\x00" + self._stdin.read()) - # Ideally we need to close stdin at some point but that's not possible - # with the current websocket implementation in Kubernetes - # https://github.com/kubernetes/kubernetes/issues/89899 + await ws.send_bytes(STDIN_CHANNEL.to_bytes() + self._stdin.read()) + await ws.send_bytes(CLOSE_CHANNEL.to_bytes() + STDIN_CHANNEL.to_bytes()) async for message in ws: if message.type == aiohttp.WSMsgType.BINARY: channel, message = int(message.data[0]), message.data[1:] @@ -87,15 +94,27 @@ async def run( if self._stderr: self._stderr.write(message) elif channel == ERROR_CHANNEL: - self.returncode = 1 + error = json.loads(message.decode()) + if error["status"] == "Success": + self.returncode = 0 + continue + # Extract return code from details + if "details" in error and "causes" in error["details"]: + for cause in error["details"]["causes"]: + if ( + "reason" in cause + and cause["reason"] == "ExitCode" + ): + self.returncode = int(int(cause["message"])) + break + else: + self.returncode = 1 if self.check: - raise ExecError(message) + raise ExecError(error["message"]) else: raise ExecError( f"Unhandled message on channel {channel}: {message}" ) - if self.returncode is None: - self.returncode = 0 yield self async def wait(self) -> CompletedExec: diff --git a/kr8s/tests/test_objects.py b/kr8s/tests/test_objects.py index bdf5d051..3b6b1867 100644 --- a/kr8s/tests/test_objects.py +++ b/kr8s/tests/test_objects.py @@ -759,7 +759,7 @@ async def test_pod_exec_to_file(ubuntu_pod): assert b"invalid date" in tmp.read() -@pytest.mark.skip("Closing stdin not supported so hangs forever") +@pytest.mark.xfail(reason="Exec protocol v5.channel.k8s.io not available") async def test_pod_exec_stdin(ubuntu_pod): ex = await ubuntu_pod.exec(["cat"], stdin="foo") assert b"foo" in ex.stdout