Skip to content

Commit

Permalink
Fixing blocking behavior of imap_queued (#14)
Browse files Browse the repository at this point in the history
When using process_folder in sdk, it seems to be stuck on a single file
* [imap function is being used for
processing](https://github.com/cohere-ai/cohere-compass-sdk/blob/main/compass_sdk/parser.py#L150-L156)
* [definition of
imap_queue](https://github.com/cohere-ai/cohere-compass-sdk/blob/main/compass_sdk/parser.py#L150-L156)
* The interesting part is the [popleft
line](https://github.com/cohere-ai/cohere-compass-sdk/blob/main/compass_sdk/utils.py#L24)
, if one file is too big it will get stuck on that file and not process
any further files.

That is why the processing is really slow if we have couple of big pdfs
This code aims to improve that by using `as_completed` and waits for
only any first completed future

## Auto generated
<!-- begin-generated-description -->

This PR introduces changes to the `compass_sdk/utils.py` and
`pyproject.toml` files, primarily focusing on updating import statements
and adding a new dependency.

## `compass_sdk/utils.py` Changes:
- **Import Updates:** The PR modifies the import statements for the
`concurrent` and `concurrent.futures` modules. It now imports
`concurrent.futures` directly from `concurrent`, ensuring a more
organized and explicit import structure.
- **Data Structure Change:** In the `imap_queued` function, the data
structure used to store tasks has been changed from a `deque` to a
`set`. This is done by creating a `futures_set` set and adding tasks to
it using `futures_set.add()`.
- **Task Management:** The task management logic has been updated.
Instead of using a `while` loop to manage tasks, the code now employs
`futures.wait()` and `futures.as_completed()` to handle task completion
and yielding results.

## `pyproject.toml` Changes:
- **New Dependency:** The `aiohttp` dependency has been added with the
version specified as "3.10.5". This addition suggests the introduction
of asynchronous HTTP capabilities to the project.

<!-- end-generated-description -->
  • Loading branch information
ankush-cohere authored Sep 13, 2024
1 parent 73b44a2 commit 3960096
Showing 1 changed file with 11 additions and 10 deletions.
21 changes: 11 additions & 10 deletions compass_sdk/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import glob
import os
from collections import deque
from concurrent.futures import Executor, Future
from concurrent import futures
from concurrent.futures import Executor
from typing import Callable, Iterable, Iterator, List, Optional, TypeVar

import fsspec
Expand All @@ -15,16 +15,17 @@

def imap_queued(executor: Executor, f: Callable[[T], U], it: Iterable[T], max_queued: int) -> Iterator[U]:
assert max_queued >= 1
tasks = deque[Future[U]]()
futures_set = set()

for x in it:
tasks.append(executor.submit(f, x))

while len(tasks) > max_queued:
yield tasks.popleft().result()

while tasks:
yield tasks.popleft().result()
futures_set.add(executor.submit(f, x))
while len(futures_set) > max_queued:
done, futures_set = futures.wait(futures_set, return_when=futures.FIRST_COMPLETED)
for future in done:
yield future.result()

for future in futures.as_completed(futures_set):
yield future.result()


def get_fs(document_path: str) -> AbstractFileSystem:
Expand Down

0 comments on commit 3960096

Please sign in to comment.