diff --git a/inspirehep/modules/workflows/config.py b/inspirehep/modules/workflows/config.py index 1bbae1306e..64c7e271ac 100644 --- a/inspirehep/modules/workflows/config.py +++ b/inspirehep/modules/workflows/config.py @@ -31,3 +31,6 @@ WORKFLOWS_PLOTEXTRACT_TIMEOUT = 5 * 60 """Time in seconds a plotextract task is allowed to run before it is killed.""" WORKFLOWS_MAX_AUTHORS_COUNT_FOR_GROBID_EXTRACTION = 50 + +WORKFLOWS_DOWNLOAD_DOCUMENT_TIMEOUT = 5 * 60 +WORKFLOWS_DELETE_KEYS_TIMEOUT = 2 * 60 diff --git a/inspirehep/modules/workflows/tasks/actions.py b/inspirehep/modules/workflows/tasks/actions.py index e9f7e03828..6ae4d659e9 100644 --- a/inspirehep/modules/workflows/tasks/actions.py +++ b/inspirehep/modules/workflows/tasks/actions.py @@ -89,7 +89,8 @@ get_validation_errors, log_workflows_action, with_debug_logging, check_mark, set_mark, get_mark, get_record_from_hep, - delete_empty_key + delete_empty_key, + timeout_with_config ) from inspirehep.modules.workflows.utils.grobid_authors_parser import GrobidAuthors from inspirehep.utils.url import is_pdf_link @@ -424,22 +425,28 @@ def populate_submission_document(obj, eng): LOGGER.info('Submission document not found or in an incorrect format (%s)', submission_pdf) delete_empty_key(obj, 'documents') save_workflow(obj, eng) + return @with_debug_logging +@timeout_with_config('WORKFLOWS_DOWNLOAD_DOCUMENT_TIMEOUT') def download_documents(obj, eng): LOGGER.info('Downloading documents for %s', obj.id) documents = obj.data.get('documents', []) for document in documents: - filename = document['key'] url = document['url'] + if url.startswith('/api/files/'): + continue + filename = document['key'] scheme = urlparse(url).scheme LOGGER.info( 'Downloading document key:%s url:%s scheme:%s', document['key'], document['url'], scheme ) if scheme == 'file': + LOGGER.info('Copying file to workflow [%s]: %s %s', obj.id, filename, url) downloaded = copy_file_to_workflow(obj, filename, url) else: + LOGGER.info('Downloading file to workflow [%s]: %s %s', obj.id, filename, url) downloaded = download_file_to_workflow( workflow=obj, name=filename, @@ -456,6 +463,7 @@ def download_documents(obj, eng): if current_app.config['FEATURE_FLAG_ENABLE_SAVE_WORFLOW_ON_DOWNLOAD_DOCUMENTS']: save_workflow(obj, eng) LOGGER.info('Documents downloaded: %s', len(obj.data.get('documents', []))) + return @backoff.on_exception(backoff.expo, (BadGatewayError, requests.exceptions.ConnectionError), base=4, max_tries=5) diff --git a/inspirehep/modules/workflows/utils/__init__.py b/inspirehep/modules/workflows/utils/__init__.py index 64ea51ee32..df88829dfd 100644 --- a/inspirehep/modules/workflows/utils/__init__.py +++ b/inspirehep/modules/workflows/utils/__init__.py @@ -58,6 +58,7 @@ from inspirehep.utils.url import retrieve_uri from invenio_workflows import ObjectStatus + LOGGER = getStackTraceLogger(__name__) @@ -722,6 +723,7 @@ def create_error(response): ) +@timeout_with_config('WORKFLOWS_DELETE_KEYS_TIMEOUT') def delete_empty_key(obj, key): if key in obj.data and len(obj.data[key]) == 0: LOGGER.info('Deleting %s from workflow. Key is empty.', key)