Skip to content

Commit

Permalink
[CAT-102] byte streams/buffered io streams (#117)
Browse files Browse the repository at this point in the history
  • Loading branch information
goatrocks authored Aug 5, 2021
1 parent 09df0e7 commit 0ead7d2
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 18 deletions.
38 changes: 30 additions & 8 deletions indico/http/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,20 @@ def execute_request(self, request: HTTPRequest):

@contextmanager
def _handle_files(self, req_kwargs):

streams = None
# deepcopying buffers is not supported
# so, remove "streams" before the deepcopy.
if "streams" in req_kwargs:
streams = req_kwargs["streams"].copy()
del req_kwargs["streams"]

new_kwargs = deepcopy(req_kwargs)

files = []
file_arg = {}
dup_counts = {}
if "files" in new_kwargs:
if "files" in new_kwargs and new_kwargs["files"] is not None:
for filepath in new_kwargs["files"]:
path = Path(filepath)
fd = path.open("rb")
Expand All @@ -99,19 +108,32 @@ def _handle_files(self, req_kwargs):
file_arg[path.stem] = fd
dup_counts[path.stem] = 1

new_kwargs["files"] = file_arg
if streams is not None and len(streams) > 0:
for filename in streams:
# similar operation as above.
stream = streams[filename]
files.append(stream)
if filename in dup_counts:
file_arg[filename + f"({dup_counts[filename]})"] = stream
dup_counts[filename] += 1
else:
file_arg[filename] = stream
dup_counts[filename] = 1

new_kwargs["files"] = file_arg

yield new_kwargs

if files:
[f.close() for f in files]

def _make_request(
self,
method: str,
path: str,
headers: dict = None,
_refreshed=False,
**request_kwargs,
self,
method: str,
path: str,
headers: dict = None,
_refreshed=False,
**request_kwargs,
):
logger.debug(
f"[{method}] {path}\n\t Headers: {headers}\n\tRequest Args:{request_kwargs}"
Expand Down
17 changes: 12 additions & 5 deletions indico/queries/storage.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import io
import json
from typing import List
from typing import List, Dict
from indico.client.request import HTTPMethod, HTTPRequest, RequestChain
from indico.errors import IndicoRequestError, IndicoInputError

Expand Down Expand Up @@ -46,14 +47,20 @@ class UploadDocument(HTTPRequest):
Used internally for uploading documents to indico platform for later processing
Args:
filepaths (str): list of filepaths to upload
files (str): A list of local filepaths to upload.
streams (Dict[str, io.BufferedIOBase]): A dict of filenames to BufferedIOBase streams
(any class that inherits BufferedIOBase is acceptable).
Returns:
files: storage objects to be use for further processing requests E.G. Document extraction (implicitly called)
"""

def __init__(self, files: List[str]):
super().__init__(HTTPMethod.POST, "/storage/files/store", files=files)
def __init__(self, files: List[str] = None, streams: Dict[str, io.BufferedIOBase] = None):

if (files is None and streams is None) or (files is not None and streams is not None):
raise IndicoInputError("Must define one of files or streams, but not both.")

super().__init__(HTTPMethod.POST, "/storage/files/store", files=files, streams=streams)

def process_response(self, uploaded_files: List[dict]):
files = [
Expand Down Expand Up @@ -124,5 +131,5 @@ def process_response(self, uploaded_files: List[dict]) -> List[str]:
return urls


# Alias to ensure backwards compatability
# Alias to ensure backwards compatibility
UploadImages = CreateStorageURLs
28 changes: 23 additions & 5 deletions indico/queries/workflow.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import List, Union
import io
from typing import List, Union, Dict

from indico.client.request import GraphQLRequest, RequestChain, Debouncer
from indico.errors import IndicoError, IndicoInputError
Expand Down Expand Up @@ -249,6 +250,9 @@ class WorkflowSubmission(RequestChain):
The format of the submission result file. One of:
{SUBMISSION_RESULT_VERSIONS}
If bundle is enabled, this must be version TWO or later.
streams (Dict[str, io.BufferedIOBase]): List of filename keys mapped to streams
for upload. Similar to files but mutually exclusive with files.
Can take for example: io.BufferedReader, io.BinaryIO, or io.BytesIO.
Returns:
List[int]: If `submission`, these will be submission ids.
Expand All @@ -266,18 +270,22 @@ def __init__(
submission: bool = True,
bundle: bool = False,
result_version: str = None,
streams: Dict[str, io.BufferedIOBase] = None
):
self.workflow_id = workflow_id
self.files = files
self.urls = urls
self.submission = submission
self.bundle = bundle
self.result_version = result_version
self.streams = streams.copy()

if not self.files and not self.urls:
raise IndicoInputError("One of 'files' or 'urls' must be specified")
elif self.files and self.urls:
raise IndicoInputError("Only one of 'files' or 'urls' must be specified")
if not self.files and not self.urls and not len(streams) > 0:
raise IndicoInputError("One of 'files', 'streams', or 'urls' must be specified")
elif self.files and len(self.streams) > 0:
raise IndicoInputError("Only one of 'files' or 'streams' or 'urls' may be specified.")
elif (self.files or len(streams) > 0) and self.urls:
raise IndicoInputError("Only one of 'files' or 'streams' or 'urls' may be specified")

def requests(self):
if self.files:
Expand All @@ -299,6 +307,16 @@ def requests(self):
bundle=self.bundle,
result_version=self.result_version,
)
elif len(self.streams) > 0:
yield UploadDocument(streams=self.streams)
yield _WorkflowSubmission(
self.detailed_response,
workflow_id=self.workflow_id,
record_submission=self.submission,
files=self.previous,
bundle=self.bundle,
result_version=self.result_version,
)


class WorkflowSubmissionDetailed(WorkflowSubmission):
Expand Down

0 comments on commit 0ead7d2

Please sign in to comment.