diff --git a/compass_sdk/utils.py b/compass_sdk/utils.py index 679bf97..3010f6a 100644 --- a/compass_sdk/utils.py +++ b/compass_sdk/utils.py @@ -1,7 +1,7 @@ import glob import os -from collections import deque -from concurrent.futures import Executor, Future +from concurrent import futures +from concurrent.futures import Executor from typing import Callable, Iterable, Iterator, List, Optional, TypeVar import fsspec @@ -15,16 +15,17 @@ def imap_queued(executor: Executor, f: Callable[[T], U], it: Iterable[T], max_queued: int) -> Iterator[U]: assert max_queued >= 1 - tasks = deque[Future[U]]() + futures_set = set() for x in it: - tasks.append(executor.submit(f, x)) - - while len(tasks) > max_queued: - yield tasks.popleft().result() - - while tasks: - yield tasks.popleft().result() + futures_set.add(executor.submit(f, x)) + while len(futures_set) > max_queued: + done, futures_set = futures.wait(futures_set, return_when=futures.FIRST_COMPLETED) + for future in done: + yield future.result() + + for future in futures.as_completed(futures_set): + yield future.result() def get_fs(document_path: str) -> AbstractFileSystem: