Skip to content

Commit

Permalink
Fixing blocking behavior of imap_queued
Browse files Browse the repository at this point in the history
  • Loading branch information
ankush-cohere committed Sep 12, 2024
1 parent 73b44a2 commit b8878c5
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions compass_sdk/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import glob
import os
from collections import deque
from concurrent.futures import Executor, Future
from concurrent import futures
from concurrent.futures import Executor
from typing import Callable, Iterable, Iterator, List, Optional, TypeVar

import fsspec
Expand All @@ -15,16 +15,18 @@

def imap_queued(executor: Executor, f: Callable[[T], U], it: Iterable[T], max_queued: int) -> Iterator[U]:
assert max_queued >= 1
tasks = deque[Future[U]]()
futures_set = set()

for x in it:
tasks.append(executor.submit(f, x))
futures_set.add(executor.submit(f, x))

while len(tasks) > max_queued:
yield tasks.popleft().result()
while len(futures_set) > max_queued:
done, futures_set = futures.wait(futures_set, return_when=futures.FIRST_COMPLETED)
for future in done:
yield future.result()

while tasks:
yield tasks.popleft().result()
for future in futures.as_completed(futures_set):
yield future.result()


def get_fs(document_path: str) -> AbstractFileSystem:
Expand Down

0 comments on commit b8878c5

Please sign in to comment.