Skip to content

Commit

Permalink
Replace asyncio with anyio throughout (#334)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobtomlinson authored Mar 13, 2024
1 parent b69505f commit 5aa6e80
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 29 deletions.
5 changes: 2 additions & 3 deletions kr8s/_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# SPDX-License-Identifier: BSD 3-Clause License
from __future__ import annotations

import asyncio
import contextlib
import json
import pathlib
Expand Down Expand Up @@ -221,7 +220,7 @@ async def get(
else:
raise ValueError("Must specify name or selector")
if len(resources) == 0:
await asyncio.sleep(backoff)
await anyio.sleep(backoff)
backoff = min(backoff * 2, 1)
continue
if len(resources) > 1:
Expand Down Expand Up @@ -343,7 +342,7 @@ async def scale(self, replicas: int = None) -> None:
)
while self.replicas != replicas:
await self.async_refresh()
await asyncio.sleep(0.1)
await anyio.sleep(0.1)

async def async_watch(self):
"""Watch this object in Kubernetes."""
Expand Down
4 changes: 2 additions & 2 deletions kr8s/_portforward.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ async def f():

self._bg_task = self._loop.create_task(f())
while self.local_port == 0:
await asyncio.sleep(0.1)
await anyio.sleep(0.1)
return self.local_port

async def stop(self) -> None:
Expand Down Expand Up @@ -211,7 +211,7 @@ async def _connect_websocket(self) -> None:
self.pod = None
if connection_attempts > 5:
raise ConnectionClosedError("Unable to connect to Pod") from e
await asyncio.sleep(0.1 * connection_attempts)
await anyio.sleep(0.1 * connection_attempts)

async def _sync_sockets(self, reader: BinaryIO, writer: BinaryIO) -> None:
"""Start two tasks to copy bytes from tcp=>websocket and websocket=>tcp."""
Expand Down
4 changes: 2 additions & 2 deletions kr8s/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# SPDX-FileCopyrightText: Copyright (c) 2023-2024, Kr8s Developers (See LICENSE for list)
# SPDX-License-Identifier: BSD 3-Clause License
import asyncio
import base64
import os
import socket
Expand All @@ -10,6 +9,7 @@
from contextlib import closing
from pathlib import Path

import anyio
import pytest
import yaml

Expand Down Expand Up @@ -135,7 +135,7 @@ async def kubectl_proxy(k8s_cluster):
host = "localhost"
port = 8001
while not check_socket(host, port):
await asyncio.sleep(0.1)
await anyio.sleep(0.1)
yield f"http://{host}:{port}"
proxy.kill()

Expand Down
11 changes: 5 additions & 6 deletions kr8s/tests/test_api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# SPDX-FileCopyrightText: Copyright (c) 2023-2024, Kr8s Developers (See LICENSE for list)
# SPDX-License-Identifier: BSD 3-Clause License
import asyncio
import queue
import threading

Expand Down Expand Up @@ -112,10 +111,10 @@ async def test_concurrent_api_creation():
async def get_api():
api = await kr8s.asyncio.api()
await api.version()
return api

apis = await asyncio.gather(*[get_api() for _ in range(10)])
assert len(set(apis)) == 1
async with anyio.create_task_group() as tg:
for _ in range(10):
tg.start_soon(get_api)


async def test_both_api_creation_methods_together():
Expand Down Expand Up @@ -160,7 +159,7 @@ async def test_watch_pods(example_pod_spec, ns):
pod = await Pod(example_pod_spec)
await pod.create()
while not await pod.ready():
await asyncio.sleep(0.1)
await anyio.sleep(0.1)
async for event, obj in kr8s.asyncio.watch("pods", namespace=ns):
assert event in ["ADDED", "MODIFIED", "DELETED"]
assert isinstance(obj, Pod)
Expand All @@ -170,7 +169,7 @@ async def test_watch_pods(example_pod_spec, ns):
elif event == "MODIFIED" and "test" in obj.labels and await obj.exists():
await obj.delete()
while await obj.exists():
await asyncio.sleep(0.1)
await anyio.sleep(0.1)
elif event == "DELETED":
break

Expand Down
34 changes: 18 additions & 16 deletions kr8s/tests/test_objects.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# SPDX-FileCopyrightText: Copyright (c) 2023-2024, Kr8s Developers (See LICENSE for list)
# SPDX-License-Identifier: BSD 3-Clause License
import asyncio
import datetime
import pathlib
import tempfile
import time
from contextlib import suppress

import anyio
import httpx
import pytest

Expand Down Expand Up @@ -50,7 +50,7 @@ async def nginx_pod(k8s_cluster, example_pod_spec):
pod = await Pod(example_pod_spec)
await pod.create()
while not await pod.ready():
await asyncio.sleep(0.1)
await anyio.sleep(0.1)
await pod.exec(
[
"dd",
Expand All @@ -75,7 +75,7 @@ async def ubuntu_pod(k8s_cluster, example_pod_spec, ns):
pod = await Pod(example_pod_spec)
await pod.create()
while not await pod.ready():
await asyncio.sleep(0.1)
await anyio.sleep(0.1)
yield pod
await pod.delete()

Expand All @@ -87,7 +87,7 @@ async def nginx_service(example_service_spec, nginx_pod):
service = await Service(example_service_spec)
await service.create()
while not await service.ready():
await asyncio.sleep(0.1) # pragma: no cover
await anyio.sleep(0.1) # pragma: no cover
yield service
try:
await service.delete()
Expand All @@ -102,10 +102,10 @@ async def test_pod_create_and_delete(example_pod_spec):
pod.replicas
assert await pod.exists()
while not await pod.ready():
await asyncio.sleep(0.1)
await anyio.sleep(0.1)
await pod.delete()
while await pod.exists():
await asyncio.sleep(0.1)
await anyio.sleep(0.1)
assert not await pod.exists()


Expand Down Expand Up @@ -294,7 +294,7 @@ async def test_pod_get(example_pod_spec):
assert pod2.namespace == pod.namespace
await pod.delete()
while await pod.exists():
await asyncio.sleep(0.1)
await anyio.sleep(0.1)
with pytest.raises(kr8s.NotFoundError):
await pod2.delete()

Expand Down Expand Up @@ -324,14 +324,14 @@ async def test_pod_from_name(example_pod_spec):
assert pod2.namespace == pod.namespace
await pod.delete()
while await pod.exists():
await asyncio.sleep(0.1)
await anyio.sleep(0.1)
with pytest.raises(kr8s.NotFoundError):
await pod2.delete()


async def test_pod_get_timeout(example_pod_spec):
async def create_pod():
await asyncio.sleep(0.1)
await anyio.sleep(0.1)
pod = await Pod(example_pod_spec)
await pod.create()
return pod
Expand All @@ -344,9 +344,11 @@ async def get_pod():
)
return pod

pods = await asyncio.gather(create_pod(), get_pod())
assert pods[0].name == pods[1].name
await pods[0].delete()
async with anyio.create_task_group() as tg:
tg.start_soon(create_pod)
tg.start_soon(get_pod)
pod = await get_pod()
await pod.delete()


async def test_missing_pod():
Expand Down Expand Up @@ -519,7 +521,7 @@ async def test_deployment_scale(example_deployment_spec):
await deployment.scale(2)
assert deployment.replicas == 2
while not await deployment.ready():
await asyncio.sleep(0.1)
await anyio.sleep(0.1)
pods = await deployment.pods()
assert len(pods) == 2
await deployment.scale(1)
Expand Down Expand Up @@ -550,7 +552,7 @@ async def test_pod_logs(example_pod_spec):
pod = await Pod(example_pod_spec)
await pod.create()
while not await pod.ready():
await asyncio.sleep(0.1)
await anyio.sleep(0.1)
log = "\n".join([line async for line in pod.logs(container="pause")])
assert isinstance(log, str)
await pod.delete()
Expand Down Expand Up @@ -592,7 +594,7 @@ async def test_pod_port_forward_context_manager_manual(nginx_service):
done = False
while not done:
# Put a breakpoint here and set done = True when you're finished.
await asyncio.sleep(1)
await anyio.sleep(1)


async def test_pod_port_forward_start_stop(nginx_service):
Expand Down Expand Up @@ -750,7 +752,7 @@ async def test_adoption(nginx_service):
assert nginx_pod.metadata["ownerReferences"][0]["name"] == nginx_service.name
await nginx_service.delete()
while await nginx_pod.exists():
await asyncio.sleep(0.1)
await anyio.sleep(0.1)


async def test_cast_to_from_lightkube(example_pod_spec):
Expand Down

0 comments on commit 5aa6e80

Please sign in to comment.