From 0e4f0dfa02bff07a9db9e8f0bd0d170d2dbeabc4 Mon Sep 17 00:00:00 2001 From: Ankush Khanna Date: Wed, 11 Sep 2024 12:31:51 +0200 Subject: [PATCH] Fixing blocking behavior of imap_queued --- compass_sdk/utils.py | 20 ++++++++++++-------- pyproject.toml | 1 + 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/compass_sdk/utils.py b/compass_sdk/utils.py index 679bf97..dbcf521 100644 --- a/compass_sdk/utils.py +++ b/compass_sdk/utils.py @@ -1,7 +1,9 @@ +import concurrent +import concurrent.futures import glob import os -from collections import deque -from concurrent.futures import Executor, Future +from concurrent import futures +from concurrent.futures import Executor from typing import Callable, Iterable, Iterator, List, Optional, TypeVar import fsspec @@ -15,16 +17,18 @@ def imap_queued(executor: Executor, f: Callable[[T], U], it: Iterable[T], max_queued: int) -> Iterator[U]: assert max_queued >= 1 - tasks = deque[Future[U]]() + futures_set = set() for x in it: - tasks.append(executor.submit(f, x)) + futures_set.add(executor.submit(f, x)) - while len(tasks) > max_queued: - yield tasks.popleft().result() + while len(futures_set) > max_queued: + done, futures_set = futures.wait(futures_set, return_when=concurrent.futures.FIRST_COMPLETED) + for future in done: + yield future.result() - while tasks: - yield tasks.popleft().result() + for future in futures.as_completed(futures_set): + yield future.result() def get_fs(document_path: str) -> AbstractFileSystem: diff --git a/pyproject.toml b/pyproject.toml index fd66f74..60702e3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,7 @@ python = ">=3.9,<3.12" requests = ">=2.25.0,<3.0.0" tenacity = "8.2.3" tqdm = ">=4.42.1" +aiohttp = "3.10.5" [tool.pyright] typeCheckingMode = 'basic'