diff --git a/kr8s/_objects.py b/kr8s/_objects.py index 3dccf2df..554dc9fc 100644 --- a/kr8s/_objects.py +++ b/kr8s/_objects.py @@ -2,7 +2,6 @@ # SPDX-License-Identifier: BSD 3-Clause License from __future__ import annotations -import asyncio import contextlib import json import pathlib @@ -221,7 +220,7 @@ async def get( else: raise ValueError("Must specify name or selector") if len(resources) == 0: - await asyncio.sleep(backoff) + await anyio.sleep(backoff) backoff = min(backoff * 2, 1) continue if len(resources) > 1: @@ -343,7 +342,7 @@ async def scale(self, replicas: int = None) -> None: ) while self.replicas != replicas: await self.async_refresh() - await asyncio.sleep(0.1) + await anyio.sleep(0.1) async def async_watch(self): """Watch this object in Kubernetes.""" diff --git a/kr8s/_portforward.py b/kr8s/_portforward.py index f0920d97..62c32b0e 100644 --- a/kr8s/_portforward.py +++ b/kr8s/_portforward.py @@ -127,7 +127,7 @@ async def f(): self._bg_task = self._loop.create_task(f()) while self.local_port == 0: - await asyncio.sleep(0.1) + await anyio.sleep(0.1) return self.local_port async def stop(self) -> None: @@ -211,7 +211,7 @@ async def _connect_websocket(self) -> None: self.pod = None if connection_attempts > 5: raise ConnectionClosedError("Unable to connect to Pod") from e - await asyncio.sleep(0.1 * connection_attempts) + await anyio.sleep(0.1 * connection_attempts) async def _sync_sockets(self, reader: BinaryIO, writer: BinaryIO) -> None: """Start two tasks to copy bytes from tcp=>websocket and websocket=>tcp.""" diff --git a/kr8s/conftest.py b/kr8s/conftest.py index 7257ef64..95744b84 100644 --- a/kr8s/conftest.py +++ b/kr8s/conftest.py @@ -1,6 +1,5 @@ # SPDX-FileCopyrightText: Copyright (c) 2023-2024, Kr8s Developers (See LICENSE for list) # SPDX-License-Identifier: BSD 3-Clause License -import asyncio import base64 import os import socket @@ -10,6 +9,7 @@ from contextlib import closing from pathlib import Path +import anyio import pytest import yaml @@ -135,7 +135,7 @@ async def kubectl_proxy(k8s_cluster): host = "localhost" port = 8001 while not check_socket(host, port): - await asyncio.sleep(0.1) + await anyio.sleep(0.1) yield f"http://{host}:{port}" proxy.kill() diff --git a/kr8s/tests/test_api.py b/kr8s/tests/test_api.py index 0138739a..913cdaaf 100644 --- a/kr8s/tests/test_api.py +++ b/kr8s/tests/test_api.py @@ -1,6 +1,5 @@ # SPDX-FileCopyrightText: Copyright (c) 2023-2024, Kr8s Developers (See LICENSE for list) # SPDX-License-Identifier: BSD 3-Clause License -import asyncio import queue import threading @@ -112,10 +111,10 @@ async def test_concurrent_api_creation(): async def get_api(): api = await kr8s.asyncio.api() await api.version() - return api - apis = await asyncio.gather(*[get_api() for _ in range(10)]) - assert len(set(apis)) == 1 + async with anyio.create_task_group() as tg: + for _ in range(10): + tg.start_soon(get_api) async def test_both_api_creation_methods_together(): @@ -160,7 +159,7 @@ async def test_watch_pods(example_pod_spec, ns): pod = await Pod(example_pod_spec) await pod.create() while not await pod.ready(): - await asyncio.sleep(0.1) + await anyio.sleep(0.1) async for event, obj in kr8s.asyncio.watch("pods", namespace=ns): assert event in ["ADDED", "MODIFIED", "DELETED"] assert isinstance(obj, Pod) @@ -170,7 +169,7 @@ async def test_watch_pods(example_pod_spec, ns): elif event == "MODIFIED" and "test" in obj.labels and await obj.exists(): await obj.delete() while await obj.exists(): - await asyncio.sleep(0.1) + await anyio.sleep(0.1) elif event == "DELETED": break diff --git a/kr8s/tests/test_objects.py b/kr8s/tests/test_objects.py index d936c33c..e64a2769 100644 --- a/kr8s/tests/test_objects.py +++ b/kr8s/tests/test_objects.py @@ -1,12 +1,12 @@ # SPDX-FileCopyrightText: Copyright (c) 2023-2024, Kr8s Developers (See LICENSE for list) # SPDX-License-Identifier: BSD 3-Clause License -import asyncio import datetime import pathlib import tempfile import time from contextlib import suppress +import anyio import httpx import pytest @@ -50,7 +50,7 @@ async def nginx_pod(k8s_cluster, example_pod_spec): pod = await Pod(example_pod_spec) await pod.create() while not await pod.ready(): - await asyncio.sleep(0.1) + await anyio.sleep(0.1) await pod.exec( [ "dd", @@ -75,7 +75,7 @@ async def ubuntu_pod(k8s_cluster, example_pod_spec, ns): pod = await Pod(example_pod_spec) await pod.create() while not await pod.ready(): - await asyncio.sleep(0.1) + await anyio.sleep(0.1) yield pod await pod.delete() @@ -87,7 +87,7 @@ async def nginx_service(example_service_spec, nginx_pod): service = await Service(example_service_spec) await service.create() while not await service.ready(): - await asyncio.sleep(0.1) # pragma: no cover + await anyio.sleep(0.1) # pragma: no cover yield service try: await service.delete() @@ -102,10 +102,10 @@ async def test_pod_create_and_delete(example_pod_spec): pod.replicas assert await pod.exists() while not await pod.ready(): - await asyncio.sleep(0.1) + await anyio.sleep(0.1) await pod.delete() while await pod.exists(): - await asyncio.sleep(0.1) + await anyio.sleep(0.1) assert not await pod.exists() @@ -294,7 +294,7 @@ async def test_pod_get(example_pod_spec): assert pod2.namespace == pod.namespace await pod.delete() while await pod.exists(): - await asyncio.sleep(0.1) + await anyio.sleep(0.1) with pytest.raises(kr8s.NotFoundError): await pod2.delete() @@ -324,14 +324,14 @@ async def test_pod_from_name(example_pod_spec): assert pod2.namespace == pod.namespace await pod.delete() while await pod.exists(): - await asyncio.sleep(0.1) + await anyio.sleep(0.1) with pytest.raises(kr8s.NotFoundError): await pod2.delete() async def test_pod_get_timeout(example_pod_spec): async def create_pod(): - await asyncio.sleep(0.1) + await anyio.sleep(0.1) pod = await Pod(example_pod_spec) await pod.create() return pod @@ -344,9 +344,11 @@ async def get_pod(): ) return pod - pods = await asyncio.gather(create_pod(), get_pod()) - assert pods[0].name == pods[1].name - await pods[0].delete() + async with anyio.create_task_group() as tg: + tg.start_soon(create_pod) + tg.start_soon(get_pod) + pod = await get_pod() + await pod.delete() async def test_missing_pod(): @@ -519,7 +521,7 @@ async def test_deployment_scale(example_deployment_spec): await deployment.scale(2) assert deployment.replicas == 2 while not await deployment.ready(): - await asyncio.sleep(0.1) + await anyio.sleep(0.1) pods = await deployment.pods() assert len(pods) == 2 await deployment.scale(1) @@ -550,7 +552,7 @@ async def test_pod_logs(example_pod_spec): pod = await Pod(example_pod_spec) await pod.create() while not await pod.ready(): - await asyncio.sleep(0.1) + await anyio.sleep(0.1) log = "\n".join([line async for line in pod.logs(container="pause")]) assert isinstance(log, str) await pod.delete() @@ -592,7 +594,7 @@ async def test_pod_port_forward_context_manager_manual(nginx_service): done = False while not done: # Put a breakpoint here and set done = True when you're finished. - await asyncio.sleep(1) + await anyio.sleep(1) async def test_pod_port_forward_start_stop(nginx_service): @@ -750,7 +752,7 @@ async def test_adoption(nginx_service): assert nginx_pod.metadata["ownerReferences"][0]["name"] == nginx_service.name await nginx_service.delete() while await nginx_pod.exists(): - await asyncio.sleep(0.1) + await anyio.sleep(0.1) async def test_cast_to_from_lightkube(example_pod_spec):