Skip to content

Commit

Permalink
Fixing blocking behavior of imap_queued
Browse files Browse the repository at this point in the history
  • Loading branch information
ankush-cohere committed Sep 11, 2024
1 parent 73b44a2 commit ebcf2d7
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 8 deletions.
18 changes: 10 additions & 8 deletions compass_sdk/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import glob
import os
from collections import deque
from concurrent.futures import Executor, Future
import concurrent
from concurrent.futures import Executor
from typing import Callable, Iterable, Iterator, List, Optional, TypeVar

import fsspec
Expand All @@ -15,16 +15,18 @@

def imap_queued(executor: Executor, f: Callable[[T], U], it: Iterable[T], max_queued: int) -> Iterator[U]:
assert max_queued >= 1
tasks = deque[Future[U]]()
futures = set()

for x in it:
tasks.append(executor.submit(f, x))
futures.add(executor.submit(f, x))

while len(tasks) > max_queued:
yield tasks.popleft().result()
while len(futures) > max_queued:
done, futures = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)

Check failure on line 24 in compass_sdk/utils.py

View workflow job for this annotation

GitHub Actions / typecheck (3.11, .)

"futures" is not a known attribute of module "concurrent" (reportAttributeAccessIssue)

Check failure on line 24 in compass_sdk/utils.py

View workflow job for this annotation

GitHub Actions / typecheck (3.11, .)

"futures" is not a known attribute of module "concurrent" (reportAttributeAccessIssue)
for future in done:
yield future.result()

while tasks:
yield tasks.popleft().result()
for future in concurrent.futures.as_completed(futures):

Check failure on line 28 in compass_sdk/utils.py

View workflow job for this annotation

GitHub Actions / typecheck (3.11, .)

"futures" is not a known attribute of module "concurrent" (reportAttributeAccessIssue)
yield future.result()


def get_fs(document_path: str) -> AbstractFileSystem:
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ python = ">=3.9,<3.12"
requests = ">=2.25.0,<3.0.0"
tenacity = "8.2.3"
tqdm = ">=4.42.1"
aiohttp = "3.10.5"

[tool.pyright]
typeCheckingMode = 'basic'
Expand Down

0 comments on commit ebcf2d7

Please sign in to comment.