From 659b9a4ce286720a68e95a7b99afdd2c1eaca819 Mon Sep 17 00:00:00 2001
From: Ankush Khanna
Date: Wed, 11 Sep 2024 12:31:51 +0200
Subject: [PATCH] Fixing blocking behavior of imap_queued

---
 compass_sdk/utils.py | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git a/compass_sdk/utils.py b/compass_sdk/utils.py
index 679bf97..71ee072 100644
--- a/compass_sdk/utils.py
+++ b/compass_sdk/utils.py
@@ -1,7 +1,7 @@
 import glob
 import os
-from collections import deque
-from concurrent.futures import Executor, Future
+from concurrent import futures
+from concurrent.futures import Executor
 from typing import Callable, Iterable, Iterator, List, Optional, TypeVar
 
 import fsspec
@@ -15,16 +15,18 @@
 
 def imap_queued(executor: Executor, f: Callable[[T], U], it: Iterable[T], max_queued: int) -> Iterator[U]:
     assert max_queued >= 1
-    tasks = deque[Future[U]]()
+    futures_set = set()
 
     for x in it:
-        tasks.append(executor.submit(f, x))
+        futures_set.add(executor.submit(f, x))
 
-        while len(tasks) > max_queued:
-            yield tasks.popleft().result()
+        while len(futures_set) > max_queued:
+            done, futures_set = futures.wait(futures_set, return_when=futures.FIRST_COMPLETED)
+            for future in done:
+                yield future.result()
 
-    while tasks:
-        yield tasks.popleft().result()
+    for future in futures.as_completed(futures_set):
+        yield future.result()
 
 
 def get_fs(document_path: str) -> AbstractFileSystem: