From ebcf2d7c479049a5650a4c3aadcc4845a783fd7d Mon Sep 17 00:00:00 2001 From: Ankush Khanna Date: Wed, 11 Sep 2024 12:31:51 +0200 Subject: [PATCH] Fixing blocking behavior of imap_queued --- compass_sdk/utils.py | 18 ++++++++++-------- pyproject.toml | 1 + 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/compass_sdk/utils.py b/compass_sdk/utils.py index 679bf97..6d1441d 100644 --- a/compass_sdk/utils.py +++ b/compass_sdk/utils.py @@ -1,7 +1,7 @@ import glob import os -from collections import deque -from concurrent.futures import Executor, Future +import concurrent +from concurrent.futures import Executor from typing import Callable, Iterable, Iterator, List, Optional, TypeVar import fsspec @@ -15,16 +15,18 @@ def imap_queued(executor: Executor, f: Callable[[T], U], it: Iterable[T], max_queued: int) -> Iterator[U]: assert max_queued >= 1 - tasks = deque[Future[U]]() + futures = set() for x in it: - tasks.append(executor.submit(f, x)) + futures.add(executor.submit(f, x)) - while len(tasks) > max_queued: - yield tasks.popleft().result() + while len(futures) > max_queued: + done, futures = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED) + for future in done: + yield future.result() - while tasks: - yield tasks.popleft().result() + for future in concurrent.futures.as_completed(futures): + yield future.result() def get_fs(document_path: str) -> AbstractFileSystem: diff --git a/pyproject.toml b/pyproject.toml index fd66f74..60702e3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,7 @@ python = ">=3.9,<3.12" requests = ">=2.25.0,<3.0.0" tenacity = "8.2.3" tqdm = ">=4.42.1" +aiohttp = "3.10.5" [tool.pyright] typeCheckingMode = 'basic'