From 0ead7d21d743be3332303b893a06d6fb163a2de9 Mon Sep 17 00:00:00 2001
From: m c <458905+goatrocks@users.noreply.github.com>
Date: Thu, 5 Aug 2021 11:31:53 -0400
Subject: [PATCH] [CAT-102] byte streams/buffered io streams (#117)

---
Reviewer amendments (number of added/removed lines per hunk is unchanged):
- client.py: _handle_files tolerates streams=None, which UploadDocument
  forwards whenever only files are given.
- Docstrings: 'files' is List[str]; 'streams' is a dict, not a list; the
  io module has no BinaryIO class.

 indico/http/client.py      | 38 ++++++++++++++++++++++++++++++--------
 indico/queries/storage.py  | 17 ++++++++++++-----
 indico/queries/workflow.py | 28 +++++++++++++++++++++++-----
 3 files changed, 65 insertions(+), 18 deletions(-)

diff --git a/indico/http/client.py b/indico/http/client.py
index 900b7ef0..fce08160 100644
--- a/indico/http/client.py
+++ b/indico/http/client.py
@@ -82,11 +82,20 @@ def execute_request(self, request: HTTPRequest):
 
     @contextmanager
     def _handle_files(self, req_kwargs):
+
+        streams = None
+        # deepcopying buffers is not supported, so remove "streams" before
+        # the deepcopy. The value is None when the caller supplied only files.
+        if "streams" in req_kwargs:
+            streams = (req_kwargs["streams"] or {}).copy()
+            del req_kwargs["streams"]
+
         new_kwargs = deepcopy(req_kwargs)
+
         files = []
         file_arg = {}
         dup_counts = {}
-        if "files" in new_kwargs:
+        if "files" in new_kwargs and new_kwargs["files"] is not None:
             for filepath in new_kwargs["files"]:
                 path = Path(filepath)
                 fd = path.open("rb")
@@ -99,19 +108,32 @@ def _handle_files(self, req_kwargs):
                     file_arg[path.stem] = fd
                     dup_counts[path.stem] = 1
 
-            new_kwargs["files"] = file_arg
+        if streams is not None and len(streams) > 0:
+            for filename in streams:
+                # similar operation as above.
+                stream = streams[filename]
+                files.append(stream)
+                if filename in dup_counts:
+                    file_arg[filename + f"({dup_counts[filename]})"] = stream
+                    dup_counts[filename] += 1
+                else:
+                    file_arg[filename] = stream
+                    dup_counts[filename] = 1
+
+        new_kwargs["files"] = file_arg
+
         yield new_kwargs
 
         if files:
             [f.close() for f in files]
 
     def _make_request(
-        self,
-        method: str,
-        path: str,
-        headers: dict = None,
-        _refreshed=False,
-        **request_kwargs,
+            self,
+            method: str,
+            path: str,
+            headers: dict = None,
+            _refreshed=False,
+            **request_kwargs,
     ):
         logger.debug(
             f"[{method}] {path}\n\t Headers: {headers}\n\tRequest Args:{request_kwargs}"
diff --git a/indico/queries/storage.py b/indico/queries/storage.py
index 814c9d09..a41dce42 100644
--- a/indico/queries/storage.py
+++ b/indico/queries/storage.py
@@ -1,5 +1,6 @@
+import io
 import json
-from typing import List
+from typing import List, Dict
 
 from indico.client.request import HTTPMethod, HTTPRequest, RequestChain
 from indico.errors import IndicoRequestError, IndicoInputError
@@ -46,14 +47,20 @@ class UploadDocument(HTTPRequest):
     Used internally for uploading documents to indico platform for later processing
 
     Args:
-        filepaths (str): list of filepaths to upload
+        files (List[str]): A list of local filepaths to upload.
+        streams (Dict[str, io.BufferedIOBase]): A dict of filenames to BufferedIOBase streams
+            (any class that inherits BufferedIOBase is acceptable).
 
     Returns:
         files: storage objects to be use for further processing requests E.G. Document extraction (implicitly called)
     """
 
-    def __init__(self, files: List[str]):
-        super().__init__(HTTPMethod.POST, "/storage/files/store", files=files)
+    def __init__(self, files: List[str] = None, streams: Dict[str, io.BufferedIOBase] = None):
+
+        if (files is None and streams is None) or (files is not None and streams is not None):
+            raise IndicoInputError("Must define one of files or streams, but not both.")
+
+        super().__init__(HTTPMethod.POST, "/storage/files/store", files=files, streams=streams)
 
     def process_response(self, uploaded_files: List[dict]):
         files = [
@@ -124,5 +131,5 @@ def process_response(self, uploaded_files: List[dict]) -> List[str]:
         return urls
 
 
-# Alias to ensure backwards compatability
+# Alias to ensure backwards compatibility
 UploadImages = CreateStorageURLs
diff --git a/indico/queries/workflow.py b/indico/queries/workflow.py
index a3be93ef..e631aa4f 100644
--- a/indico/queries/workflow.py
+++ b/indico/queries/workflow.py
@@ -1,4 +1,5 @@
-from typing import List, Union
+import io
+from typing import List, Union, Dict
 
 from indico.client.request import GraphQLRequest, RequestChain, Debouncer
 from indico.errors import IndicoError, IndicoInputError
@@ -249,6 +250,9 @@ class WorkflowSubmission(RequestChain):
             The format of the submission result file. One of:
                 {SUBMISSION_RESULT_VERSIONS}
             If bundle is enabled, this must be version TWO or later.
+        streams (Dict[str, io.BufferedIOBase]): Dict of filename keys mapped to streams
+            for upload. Similar to files but mutually exclusive with files.
+            Can take for example: io.BufferedReader or io.BytesIO.
 
     Returns:
         List[int]: If `submission`, these will be submission ids.
@@ -266,6 +270,7 @@ def __init__(
         submission: bool = True,
         bundle: bool = False,
         result_version: str = None,
+        streams: Dict[str, io.BufferedIOBase] = None
     ):
         self.workflow_id = workflow_id
         self.files = files
@@ -273,11 +278,14 @@ def __init__(
         self.submission = submission
         self.bundle = bundle
         self.result_version = result_version
+        self.streams = streams.copy() if streams is not None else {}  # default None -> no streams
 
-        if not self.files and not self.urls:
-            raise IndicoInputError("One of 'files' or 'urls' must be specified")
-        elif self.files and self.urls:
-            raise IndicoInputError("Only one of 'files' or 'urls' must be specified")
+        if not self.files and not self.urls and not self.streams:
+            raise IndicoInputError("One of 'files', 'streams', or 'urls' must be specified")
+        elif self.files and self.streams:
+            raise IndicoInputError("Only one of 'files' or 'streams' or 'urls' may be specified.")
+        elif (self.files or self.streams) and self.urls:
+            raise IndicoInputError("Only one of 'files' or 'streams' or 'urls' may be specified")
 
     def requests(self):
         if self.files:
@@ -299,6 +307,16 @@ def requests(self):
                 bundle=self.bundle,
                 result_version=self.result_version,
             )
+        elif self.streams:
+            yield UploadDocument(streams=self.streams)
+            yield _WorkflowSubmission(
+                self.detailed_response,
+                workflow_id=self.workflow_id,
+                record_submission=self.submission,
+                files=self.previous,
+                bundle=self.bundle,
+                result_version=self.result_version,
+            )
 
 
 class WorkflowSubmissionDetailed(WorkflowSubmission):