Skip to content

Commit

Permalink
Increasing version
Browse files Browse the repository at this point in the history
  • Loading branch information
ankush-cohere committed Jan 6, 2025
1 parent 4899342 commit f51c5c4
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 14 deletions.
19 changes: 8 additions & 11 deletions cohere/compass/clients/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,25 @@
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Optional, Union

# 3rd party imports
import requests
from requests.exceptions import InvalidSchema
from tenacity import (
RetryError,
retry,
retry_if_not_exception_type,
stop_after_attempt,
wait_fixed,
)

# 3rd party imports
import requests

# Local imports
from cohere.compass import (
ProcessFileParameters,
)
from cohere.compass.constants import DEFAULT_MAX_ACCEPTED_FILE_SIZE_BYTES, DEFAULT_SLEEP_RETRY_SECONDS, \
DEFAULT_MAX_RETRIES
from cohere.compass.constants import (
DEFAULT_MAX_ACCEPTED_FILE_SIZE_BYTES,
DEFAULT_MAX_RETRIES,
DEFAULT_SLEEP_RETRY_SECONDS,
)
from cohere.compass.models import (
CompassDocument,
MetadataConfig,
Expand Down Expand Up @@ -208,11 +209,7 @@ def _get_metadata(
@retry(
stop=stop_after_attempt(DEFAULT_MAX_RETRIES),
wait=wait_fixed(DEFAULT_SLEEP_RETRY_SECONDS),
retry=retry_if_not_exception_type(
(
InvalidSchema,
)
),
retry=retry_if_not_exception_type((InvalidSchema,)),
)
def process_file(
self,
Expand Down
11 changes: 8 additions & 3 deletions cohere/compass/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def imap_queued(
) -> Iterator[U]:
"""
Similar to Python's `map`, but uses an executor to parallelize the calls.
@@ -34,20 +41,43 @@ def imap_queued(
:param f: the function to call.
:param it: the iterable to map over.
:param max_queued: the maximum number of futures to keep in flight.
Expand All @@ -44,17 +44,22 @@ def imap_queued(
for x in it:
futures_set.add(executor.submit(f, x))
while len(futures_set) > max_queued:
done, futures_set = futures.wait(
done, not_done = futures.wait(
futures_set, return_when=futures.FIRST_COMPLETED
)
futures_set = not_done

for future in done:
try:
yield future.result()
except Exception as e:
logger.error(f"Error in processing file: {e}")

for future in futures.as_completed(futures_set):
yield future.result()
try:
yield future.result()
except Exception as e:
logger.error(f"Error in processing file: {e}")


def get_fs(document_path: str) -> AbstractFileSystem:
Expand Down

0 comments on commit f51c5c4

Please sign in to comment.