Skip to content

Commit

Permalink
Fixing blocking behavior of imap_queued
Browse files Browse the repository at this point in the history
  • Loading branch information
ankush-cohere committed Sep 11, 2024
1 parent 73b44a2 commit 0e4f0df
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 8 deletions.
20 changes: 12 additions & 8 deletions compass_sdk/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import concurrent
import concurrent.futures
import glob
import os
from collections import deque
from concurrent.futures import Executor, Future
from concurrent import futures
from concurrent.futures import Executor
from typing import Callable, Iterable, Iterator, List, Optional, TypeVar

import fsspec
Expand All @@ -15,16 +17,18 @@

def imap_queued(executor: Executor, f: Callable[[T], U], it: Iterable[T], max_queued: int) -> Iterator[U]:
assert max_queued >= 1
tasks = deque[Future[U]]()
futures_set = set()

for x in it:
tasks.append(executor.submit(f, x))
futures_set.add(executor.submit(f, x))

while len(tasks) > max_queued:
yield tasks.popleft().result()
while len(futures_set) > max_queued:
done, futures_set = futures.wait(futures_set, return_when=concurrent.futures.FIRST_COMPLETED)
for future in done:
yield future.result()

while tasks:
yield tasks.popleft().result()
for future in futures.as_completed(futures_set):
yield future.result()


def get_fs(document_path: str) -> AbstractFileSystem:
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ python = ">=3.9,<3.12"
requests = ">=2.25.0,<3.0.0"
tenacity = "8.2.3"
tqdm = ">=4.42.1"
aiohttp = "3.10.5"

[tool.pyright]
typeCheckingMode = 'basic'
Expand Down

0 comments on commit 0e4f0df

Please sign in to comment.