diff --git a/inspirehep/config.py b/inspirehep/config.py
index 693f337e52..a33d9026ef 100644
--- a/inspirehep/config.py
+++ b/inspirehep/config.py
@@ -58,7 +58,9 @@
 """This feature flag will prevent to send a ``replace`` update to legacy."""
 FEATURE_FLAG_USE_ROOT_TABLE_ON_HEP = False
 FEATURE_FLAG_ENABLE_SNOW = False
-FEATURE_FLAG_ENABLE_SAVE_WORFLOW_ON_DOWNLOAD_DOCUMENTS = True
+# This flag is to help with empty key documents from the merge process
+FEATURE_FLAG_ENABLE_DELETE_EMPTY_KEY_DOCUMENTS = True
+
 # Default language and timezone
 # =============================
 BABEL_DEFAULT_LANGUAGE = 'en'
diff --git a/inspirehep/modules/workflows/config.py b/inspirehep/modules/workflows/config.py
index 1bbae1306e..c900f8434e 100644
--- a/inspirehep/modules/workflows/config.py
+++ b/inspirehep/modules/workflows/config.py
@@ -31,3 +31,5 @@
 WORKFLOWS_PLOTEXTRACT_TIMEOUT = 5 * 60
 """Time in seconds a plotextract task is allowed to run before it is killed."""
 WORKFLOWS_MAX_AUTHORS_COUNT_FOR_GROBID_EXTRACTION = 50
+WORKFLOWS_DOWNLOAD_DOCUMENT_TIMEOUT = 5 * 60
+"""Time in seconds a download document task is allowed to run before it is killed."""
diff --git a/inspirehep/modules/workflows/tasks/actions.py b/inspirehep/modules/workflows/tasks/actions.py
index e9f7e03828..e006d67713 100644
--- a/inspirehep/modules/workflows/tasks/actions.py
+++ b/inspirehep/modules/workflows/tasks/actions.py
@@ -89,7 +89,9 @@
     get_validation_errors, log_workflows_action, with_debug_logging,
     check_mark, set_mark, get_mark,
     get_record_from_hep,
-    delete_empty_key
+    delete_empty_key,
+    timeout_with_config,
+    ignore_timeout_error,
 )
 from inspirehep.modules.workflows.utils.grobid_authors_parser import GrobidAuthors
 from inspirehep.utils.url import is_pdf_link
@@ -401,6 +403,8 @@ def fix_submission_number(obj, eng):
 
 
 @with_debug_logging
+@ignore_timeout_error(return_value=[])
+@timeout_with_config('WORKFLOWS_DOWNLOAD_DOCUMENT_TIMEOUT')
 def populate_submission_document(obj, eng):
     submission_pdf = obj.extra_data.get('submission_pdf')
     if submission_pdf and is_pdf_link(submission_pdf):
@@ -422,11 +426,15 @@ def populate_submission_document(obj, eng):
         LOGGER.info('Workflow data updated with %s new documents' % len(obj.data.get('documents', [])))
     else:
         LOGGER.info('Submission document not found or in an incorrect format (%s)', submission_pdf)
-    delete_empty_key(obj, 'documents')
+    if current_app.config['FEATURE_FLAG_ENABLE_DELETE_EMPTY_KEY_DOCUMENTS']:
+        delete_empty_key(obj, 'documents')
     save_workflow(obj, eng)
+    return
 
 
 @with_debug_logging
+@ignore_timeout_error(return_value=None)
+@timeout_with_config('WORKFLOWS_DOWNLOAD_DOCUMENT_TIMEOUT')
 def download_documents(obj, eng):
     LOGGER.info('Downloading documents for %s', obj.id)
     documents = obj.data.get('documents', [])
@@ -435,7 +443,7 @@ def download_documents(obj, eng):
         url = document['url']
         scheme = urlparse(url).scheme
         LOGGER.info(
-            'Downloading document key:%s url:%s scheme:%s', document['key'], document['url'], scheme
+            'Downloading document for %s key:%s url:%s scheme:%s', obj.id, document['key'], document['url'], scheme
         )
         if scheme == 'file':
             downloaded = copy_file_to_workflow(obj, filename, url)
@@ -448,14 +456,16 @@ def download_documents(obj, eng):
         if downloaded:
             document['url'] = '/api/files/{bucket}/{key}'.format(
                 bucket=obj.files[filename].bucket_id, key=quote(filename))
-            obj.log.info('Document downloaded from %s', url)
+            obj.log.info('Document downloaded for %s from %s', obj.id, url)
         else:
             obj.log.error(
-                'Cannot download document from %s', url)
-    delete_empty_key(obj, 'documents')
-    if current_app.config['FEATURE_FLAG_ENABLE_SAVE_WORFLOW_ON_DOWNLOAD_DOCUMENTS']:
-        save_workflow(obj, eng)
-    LOGGER.info('Documents downloaded: %s', len(obj.data.get('documents', [])))
+                'Cannot download document for %s from %s', obj.id, url)
+    if current_app.config['FEATURE_FLAG_ENABLE_DELETE_EMPTY_KEY_DOCUMENTS']:
+        delete_empty_key(obj, 'documents')
+
+    save_workflow(obj, eng)
+    LOGGER.info('Documents downloaded for %s: %s', obj.id, len(obj.data.get('documents', [])))
+    return
 
 
 @backoff.on_exception(backoff.expo, (BadGatewayError, requests.exceptions.ConnectionError), base=4, max_tries=5)