Skip to content

Commit

Permalink
Fixing blocking behavior of imap_queued
Browse files Browse the repository at this point in the history
  • Loading branch information
ankush-cohere committed Sep 12, 2024
1 parent 73b44a2 commit 10e8790
Showing 1 changed file with 11 additions and 10 deletions.
21 changes: 11 additions & 10 deletions compass_sdk/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import glob
import os
from collections import deque
from concurrent.futures import Executor, Future
from concurrent import futures
from concurrent.futures import Executor
from typing import Callable, Iterable, Iterator, List, Optional, TypeVar

import fsspec
Expand All @@ -15,16 +15,17 @@

def imap_queued(executor: Executor, f: Callable[[T], U], it: Iterable[T], max_queued: int) -> Iterator[U]:
assert max_queued >= 1
tasks = deque[Future[U]]()
futures_set = set()

for x in it:
tasks.append(executor.submit(f, x))

while len(tasks) > max_queued:
yield tasks.popleft().result()

while tasks:
yield tasks.popleft().result()
futures_set.add(executor.submit(f, x))
while len(futures_set) > max_queued:
done, futures_set = futures.wait(futures_set, return_when=futures.FIRST_COMPLETED)
for future in done:
yield future.result()

for future in futures.as_completed(futures_set):
yield future.result()


def get_fs(document_path: str) -> AbstractFileSystem:
Expand Down

0 comments on commit 10e8790

Please sign in to comment.