diff --git a/inspirehep/config.py b/inspirehep/config.py index cef268e301..a33d9026ef 100644 --- a/inspirehep/config.py +++ b/inspirehep/config.py @@ -58,7 +58,9 @@ """This feature flag will prevent to send a ``replace`` update to legacy.""" FEATURE_FLAG_USE_ROOT_TABLE_ON_HEP = False FEATURE_FLAG_ENABLE_SNOW = False -FEATURE_FLAG_ENABLE_SAVE_WORKFLOW_ON_DOWNLOAD_DOCUMENTS = True +# This flag is to help with empty key documents from the merge process +FEATURE_FLAG_ENABLE_DELETE_EMPTY_KEY_DOCUMENTS = True + # Default language and timezone # ============================= BABEL_DEFAULT_LANGUAGE = 'en' diff --git a/inspirehep/modules/workflows/config.py b/inspirehep/modules/workflows/config.py index 1bbae1306e..c900f8434e 100644 --- a/inspirehep/modules/workflows/config.py +++ b/inspirehep/modules/workflows/config.py @@ -31,3 +31,5 @@ WORKFLOWS_PLOTEXTRACT_TIMEOUT = 5 * 60 """Time in seconds a plotextract task is allowed to run before it is killed.""" WORKFLOWS_MAX_AUTHORS_COUNT_FOR_GROBID_EXTRACTION = 50 +WORKFLOWS_DOWNLOAD_DOCUMENT_TIMEOUT = 5 * 60 +"""Time in seconds a download document task is allowed to run before it is killed.""" diff --git a/inspirehep/modules/workflows/tasks/actions.py b/inspirehep/modules/workflows/tasks/actions.py index a1d2ff259e..e006d67713 100644 --- a/inspirehep/modules/workflows/tasks/actions.py +++ b/inspirehep/modules/workflows/tasks/actions.py @@ -89,7 +89,9 @@ get_validation_errors, log_workflows_action, with_debug_logging, check_mark, set_mark, get_mark, get_record_from_hep, - delete_empty_key + delete_empty_key, + timeout_with_config, + ignore_timeout_error, ) from inspirehep.modules.workflows.utils.grobid_authors_parser import GrobidAuthors from inspirehep.utils.url import is_pdf_link @@ -401,6 +403,8 @@ def fix_submission_number(obj, eng): @with_debug_logging +@ignore_timeout_error(return_value=[]) +@timeout_with_config('WORKFLOWS_DOWNLOAD_DOCUMENT_TIMEOUT') def populate_submission_document(obj, eng): submission_pdf = obj.extra_data.get('submission_pdf') if submission_pdf and is_pdf_link(submission_pdf): @@ -422,15 +426,15 @@ def populate_submission_document(obj, eng): LOGGER.info('Workflow data updated with %s new documents' % len(obj.data.get('documents', []))) else: LOGGER.info('Submission document not found or in an incorrect format (%s)', submission_pdf) - delete_empty_key(obj, 'documents') - - if current_app.config['FEATURE_FLAG_ENABLE_SAVE_WORKFLOW_ON_DOWNLOAD_DOCUMENTS']: - save_workflow(obj, eng) - else: - obj.save() + if current_app.config['FEATURE_FLAG_ENABLE_DELETE_EMPTY_KEY_DOCUMENTS']: + delete_empty_key(obj, 'documents') + save_workflow(obj, eng) + return @with_debug_logging +@ignore_timeout_error(return_value=None) +@timeout_with_config('WORKFLOWS_DOWNLOAD_DOCUMENT_TIMEOUT') def download_documents(obj, eng): LOGGER.info('Downloading documents for %s', obj.id) documents = obj.data.get('documents', []) @@ -456,12 +460,12 @@ def download_documents(obj, eng): else: obj.log.error( 'Cannot download document for %s from %s', obj.id, url) - delete_empty_key(obj, 'documents') - if current_app.config['FEATURE_FLAG_ENABLE_SAVE_WORKFLOW_ON_DOWNLOAD_DOCUMENTS']: - save_workflow(obj, eng) - else: - obj.save() + if current_app.config['FEATURE_FLAG_ENABLE_DELETE_EMPTY_KEY_DOCUMENTS']: + delete_empty_key(obj, 'documents') + + save_workflow(obj, eng) LOGGER.info('Documents downloaded for %s: %s', obj.id, len(obj.data.get('documents', []))) + return @backoff.on_exception(backoff.expo, (BadGatewayError, requests.exceptions.ConnectionError), base=4, max_tries=5)