From d7b671a97c9406bbbed57f6e2c0b06d54e5b1d9e Mon Sep 17 00:00:00 2001 From: Harris Tzovanakis Date: Fri, 24 Nov 2023 09:52:49 +0100 Subject: [PATCH 1/2] actions: change the download functions and steps --- inspirehep/config.py | 2 +- inspirehep/modules/workflows/tasks/actions.py | 18 ++++++++++++------ 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/inspirehep/config.py b/inspirehep/config.py index 693f337e52..cef268e301 100644 --- a/inspirehep/config.py +++ b/inspirehep/config.py @@ -58,7 +58,7 @@ """This feature flag will prevent to send a ``replace`` update to legacy.""" FEATURE_FLAG_USE_ROOT_TABLE_ON_HEP = False FEATURE_FLAG_ENABLE_SNOW = False -FEATURE_FLAG_ENABLE_SAVE_WORFLOW_ON_DOWNLOAD_DOCUMENTS = True +FEATURE_FLAG_ENABLE_SAVE_WORKFLOW_ON_DOWNLOAD_DOCUMENTS = True # Default language and timezone # ============================= BABEL_DEFAULT_LANGUAGE = 'en' diff --git a/inspirehep/modules/workflows/tasks/actions.py b/inspirehep/modules/workflows/tasks/actions.py index e9f7e03828..a1d2ff259e 100644 --- a/inspirehep/modules/workflows/tasks/actions.py +++ b/inspirehep/modules/workflows/tasks/actions.py @@ -423,7 +423,11 @@ def populate_submission_document(obj, eng): else: LOGGER.info('Submission document not found or in an incorrect format (%s)', submission_pdf) delete_empty_key(obj, 'documents') - save_workflow(obj, eng) + + if current_app.config['FEATURE_FLAG_ENABLE_SAVE_WORKFLOW_ON_DOWNLOAD_DOCUMENTS']: + save_workflow(obj, eng) + else: + obj.save() @with_debug_logging @@ -435,7 +439,7 @@ def download_documents(obj, eng): url = document['url'] scheme = urlparse(url).scheme LOGGER.info( - 'Downloading document key:%s url:%s scheme:%s', document['key'], document['url'], scheme + 'Downloading document for %s key:%s url:%s scheme:%s', obj.id, document['key'], document['url'], scheme ) if scheme == 'file': downloaded = copy_file_to_workflow(obj, filename, url) @@ -448,14 +452,16 @@ def download_documents(obj, eng): if downloaded: document['url'] = '/api/files/{bucket}/{key}'.format( bucket=obj.files[filename].bucket_id, key=quote(filename)) - obj.log.info('Document downloaded from %s', url) + obj.log.info('Document downloaded for %s from %s', obj.id, url) else: obj.log.error( - 'Cannot download document from %s', url) + 'Cannot download document for %s from %s', obj.id, url) delete_empty_key(obj, 'documents') - if current_app.config['FEATURE_FLAG_ENABLE_SAVE_WORFLOW_ON_DOWNLOAD_DOCUMENTS']: + if current_app.config['FEATURE_FLAG_ENABLE_SAVE_WORKFLOW_ON_DOWNLOAD_DOCUMENTS']: save_workflow(obj, eng) - LOGGER.info('Documents downloaded: %s', len(obj.data.get('documents', []))) + else: + obj.save() + LOGGER.info('Documents downloaded for %s: %s', obj.id, len(obj.data.get('documents', []))) @backoff.on_exception(backoff.expo, (BadGatewayError, requests.exceptions.ConnectionError), base=4, max_tries=5) From f85f29abc8de9ef09eaee333e21d03d3fa3a7fc6 Mon Sep 17 00:00:00 2001 From: Harris Tzovanakis Date: Tue, 5 Mar 2024 11:16:46 +0100 Subject: [PATCH 2/2] actions: add feature flag to empty key documents --- inspirehep/config.py | 4 ++- inspirehep/modules/workflows/config.py | 2 ++ inspirehep/modules/workflows/tasks/actions.py | 28 +++++++++++-------- 3 files changed, 21 insertions(+), 13 deletions(-) diff --git a/inspirehep/config.py b/inspirehep/config.py index cef268e301..a33d9026ef 100644 --- a/inspirehep/config.py +++ b/inspirehep/config.py @@ -58,7 +58,9 @@ """This feature flag will prevent to send a ``replace`` update to legacy.""" FEATURE_FLAG_USE_ROOT_TABLE_ON_HEP = False FEATURE_FLAG_ENABLE_SNOW = False -FEATURE_FLAG_ENABLE_SAVE_WORKFLOW_ON_DOWNLOAD_DOCUMENTS = True +# This flag is to help with empty key documents from the merge process +FEATURE_FLAG_ENABLE_DELETE_EMPTY_KEY_DOCUMENTS = True + # Default language and timezone # ============================= BABEL_DEFAULT_LANGUAGE = 'en' diff --git a/inspirehep/modules/workflows/config.py b/inspirehep/modules/workflows/config.py index 1bbae1306e..c900f8434e 100644 --- a/inspirehep/modules/workflows/config.py +++ b/inspirehep/modules/workflows/config.py @@ -31,3 +31,5 @@ WORKFLOWS_PLOTEXTRACT_TIMEOUT = 5 * 60 """Time in seconds a plotextract task is allowed to run before it is killed.""" WORKFLOWS_MAX_AUTHORS_COUNT_FOR_GROBID_EXTRACTION = 50 +WORKFLOWS_DOWNLOAD_DOCUMENT_TIMEOUT = 5 * 60 +"""Time in seconds a download document task is allowed to run before it is killed.""" diff --git a/inspirehep/modules/workflows/tasks/actions.py b/inspirehep/modules/workflows/tasks/actions.py index a1d2ff259e..e006d67713 100644 --- a/inspirehep/modules/workflows/tasks/actions.py +++ b/inspirehep/modules/workflows/tasks/actions.py @@ -89,7 +89,9 @@ get_validation_errors, log_workflows_action, with_debug_logging, check_mark, set_mark, get_mark, get_record_from_hep, - delete_empty_key + delete_empty_key, + timeout_with_config, + ignore_timeout_error, ) from inspirehep.modules.workflows.utils.grobid_authors_parser import GrobidAuthors from inspirehep.utils.url import is_pdf_link @@ -401,6 +403,8 @@ def fix_submission_number(obj, eng): @with_debug_logging +@ignore_timeout_error(return_value=[]) +@timeout_with_config('WORKFLOWS_DOWNLOAD_DOCUMENT_TIMEOUT') def populate_submission_document(obj, eng): submission_pdf = obj.extra_data.get('submission_pdf') if submission_pdf and is_pdf_link(submission_pdf): @@ -422,15 +426,15 @@ def populate_submission_document(obj, eng): LOGGER.info('Workflow data updated with %s new documents' % len(obj.data.get('documents', []))) else: LOGGER.info('Submission document not found or in an incorrect format (%s)', submission_pdf) - delete_empty_key(obj, 'documents') - - if current_app.config['FEATURE_FLAG_ENABLE_SAVE_WORKFLOW_ON_DOWNLOAD_DOCUMENTS']: - save_workflow(obj, eng) - else: - obj.save() + if current_app.config['FEATURE_FLAG_ENABLE_DELETE_EMPTY_KEY_DOCUMENTS']: + delete_empty_key(obj, 'documents') + save_workflow(obj, eng) + return @with_debug_logging +@ignore_timeout_error(return_value=None) +@timeout_with_config('WORKFLOWS_DOWNLOAD_DOCUMENT_TIMEOUT') def download_documents(obj, eng): LOGGER.info('Downloading documents for %s', obj.id) documents = obj.data.get('documents', []) @@ -456,12 +460,12 @@ def download_documents(obj, eng): else: obj.log.error( 'Cannot download document for %s from %s', obj.id, url) - delete_empty_key(obj, 'documents') - if current_app.config['FEATURE_FLAG_ENABLE_SAVE_WORKFLOW_ON_DOWNLOAD_DOCUMENTS']: - save_workflow(obj, eng) - else: - obj.save() + if current_app.config['FEATURE_FLAG_ENABLE_DELETE_EMPTY_KEY_DOCUMENTS']: + delete_empty_key(obj, 'documents') + + save_workflow(obj, eng) LOGGER.info('Documents downloaded for %s: %s', obj.id, len(obj.data.get('documents', []))) + return @backoff.on_exception(backoff.expo, (BadGatewayError, requests.exceptions.ConnectionError), base=4, max_tries=5)