diff --git a/.gitignore b/.gitignore index 6d28c2acfa..28d10f5db3 100644 --- a/.gitignore +++ b/.gitignore @@ -28,6 +28,7 @@ htmlcov/* # Don't push our dev configs or the secret app configs. dev.cfg app.cfg +instance.cfg # directories of transient stuff that gets created locally media/* diff --git a/deploy/dev/huey_events.sh b/deploy/dev/huey_events.sh new file mode 100755 index 0000000000..8296525df3 --- /dev/null +++ b/deploy/dev/huey_events.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash + +# ~~LongRunning:Queue->Huey:Technology~~ +huey_consumer.py portality.tasks.consumer_events_queue.events_queue >> ~/huey_events_queue.log 2>&1 diff --git a/deploy/dev/huey_scheduled_long.sh b/deploy/dev/huey_scheduled_long.sh new file mode 100755 index 0000000000..3a3cfc0b0c --- /dev/null +++ b/deploy/dev/huey_scheduled_long.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash + +# ~~LongRunning:Queue->Huey:Technology~~ +huey_consumer.py portality.tasks.consumer_scheduled_long_queue.scheduled_long_queue >> ~/huey_scheduled_long_queue.log 2>&1 diff --git a/deploy/dev/huey_scheduled_short.sh b/deploy/dev/huey_scheduled_short.sh new file mode 100755 index 0000000000..532519cdea --- /dev/null +++ b/deploy/dev/huey_scheduled_short.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash + +# ~~LongRunning:Queue->Huey:Technology~~ +huey_consumer.py portality.tasks.consumer_scheduled_short_queue.scheduled_short_queue >> ~/huey_scheduled_short_queue.log 2>&1 diff --git a/deploy/logrotate/doaj-nginx b/deploy/logrotate/production/doaj-nginx similarity index 100% rename from deploy/logrotate/doaj-nginx rename to deploy/logrotate/production/doaj-nginx diff --git a/deploy/logrotate/doaj-test-nginx b/deploy/logrotate/test/doaj-nginx similarity index 100% rename from deploy/logrotate/doaj-test-nginx rename to deploy/logrotate/test/doaj-nginx diff --git a/deploy/nginx/doaj b/deploy/nginx/production/doaj similarity index 95% rename from deploy/nginx/doaj rename to deploy/nginx/production/doaj index 2372d1248f..56375fb192 100644 
--- a/deploy/nginx/doaj +++ b/deploy/nginx/production/doaj @@ -35,29 +35,24 @@ map $http_user_agent $block_ua { ~*wget 1; ~*curl 1; } - -# the public server (deprecated, use failover) -upstream doaj_apps { - server 10.131.191.139:5050; #doaj-public-app-1 +# For public site components, try all servers +upstream doaj_apps_failover { + server 10.131.191.147:5050; #doaj-public-1 + server 10.131.191.148:5050 backup; #doaj-background-1 + server 10.131.191.149:5050 backup; #doaj-editor-1 } # Background server runs async tasks upstream doaj_bg_apps { - server 10.131.12.33:5050; #doaj-background-app-1 + server 10.131.191.148:5050; #doaj-background-app-1 } # Editor and admin site components upstream doaj_ed_failover { - server 10.131.56.133:5050; #doaj-editor-app-1 - server 10.131.12.33:5050 backup; #doaj-background-app-1 + server 10.131.191.149:5050; #doaj-editor-app-1 + server 10.131.191.148:5050 backup; #doaj-background-app-1 } -# For public site components, try all servers -upstream doaj_apps_failover { - server 10.131.191.139:5050; #doaj-public-app-1 - server 10.131.12.33:5050 backup; #doaj-background-app-1 - server 10.131.56.133:5050 backup; #doaj-editor-app-1 -} upstream doaj_index { server 10.131.191.132:9200; server 10.131.191.133:9200; diff --git a/deploy/nginx/doaj-test b/deploy/nginx/test/doaj similarity index 100% rename from deploy/nginx/doaj-test rename to deploy/nginx/test/doaj diff --git a/deploy/supervisor/production-background/huey-events.conf b/deploy/supervisor/production-background/huey-events.conf new file mode 100644 index 0000000000..2361dcc751 --- /dev/null +++ b/deploy/supervisor/production-background/huey-events.conf @@ -0,0 +1,9 @@ +[program:huey-events] +command=/home/cloo/doaj/venv/bin/python /home/cloo/doaj/venv/bin/huey_consumer.py -v portality.tasks.consumer_events_queue.events_queue +environment= DOAJENV=production +user=cloo +directory=/home/cloo/doaj +stdout_logfile=/var/log/supervisor/%(program_name)s-stdout.log 
+stderr_logfile=/var/log/supervisor/%(program_name)s-error.log +autostart=true +autorestart=true diff --git a/deploy/supervisor/production-background/huey-scheduled-long.conf b/deploy/supervisor/production-background/huey-scheduled-long.conf new file mode 100644 index 0000000000..705fe66901 --- /dev/null +++ b/deploy/supervisor/production-background/huey-scheduled-long.conf @@ -0,0 +1,9 @@ +[program:huey-scheduled-long] +command=/home/cloo/doaj/venv/bin/python /home/cloo/doaj/venv/bin/huey_consumer.py -v portality.tasks.consumer_scheduled_long_queue.scheduled_long_queue +environment= DOAJENV=production +user=cloo +directory=/home/cloo/doaj +stdout_logfile=/var/log/supervisor/%(program_name)s-stdout.log +stderr_logfile=/var/log/supervisor/%(program_name)s-error.log +autostart=true +autorestart=true diff --git a/deploy/supervisor/production-background/huey-scheduled-short.conf b/deploy/supervisor/production-background/huey-scheduled-short.conf new file mode 100644 index 0000000000..a38439d834 --- /dev/null +++ b/deploy/supervisor/production-background/huey-scheduled-short.conf @@ -0,0 +1,9 @@ +[program:huey-scheduled-short] +command=/home/cloo/doaj/venv/bin/python /home/cloo/doaj/venv/bin/huey_consumer.py -v portality.tasks.consumer_scheduled_short_queue.scheduled_short_queue +environment= DOAJENV=production +user=cloo +directory=/home/cloo/doaj +stdout_logfile=/var/log/supervisor/%(program_name)s-stdout.log +stderr_logfile=/var/log/supervisor/%(program_name)s-error.log +autostart=true +autorestart=true diff --git a/deploy/supervisor/test/huey-events.conf b/deploy/supervisor/test/huey-events.conf new file mode 100644 index 0000000000..bcb47109e2 --- /dev/null +++ b/deploy/supervisor/test/huey-events.conf @@ -0,0 +1,9 @@ +[program:huey-events] +command=/home/cloo/doaj/venv/bin/python /home/cloo/doaj/venv/bin/huey_consumer.py -v portality.tasks.consumer_events_queue.events_queue +environment= DOAJENV=test +user=cloo +directory=/home/cloo/doaj 
+stdout_logfile=/var/log/supervisor/%(program_name)s-stdout.log +stderr_logfile=/var/log/supervisor/%(program_name)s-error.log +autostart=true +autorestart=true diff --git a/deploy/supervisor/test/huey-scheduled-long.conf b/deploy/supervisor/test/huey-scheduled-long.conf new file mode 100644 index 0000000000..4f2d0b9dfc --- /dev/null +++ b/deploy/supervisor/test/huey-scheduled-long.conf @@ -0,0 +1,9 @@ +[program:huey-scheduled-long] +command=/home/cloo/doaj/venv/bin/python /home/cloo/doaj/venv/bin/huey_consumer.py -v portality.tasks.consumer_scheduled_long_queue.scheduled_long_queue +environment= DOAJENV=test +user=cloo +directory=/home/cloo/doaj +stdout_logfile=/var/log/supervisor/%(program_name)s-stdout.log +stderr_logfile=/var/log/supervisor/%(program_name)s-error.log +autostart=true +autorestart=true diff --git a/deploy/supervisor/test/huey-scheduled-short.conf b/deploy/supervisor/test/huey-scheduled-short.conf new file mode 100644 index 0000000000..1f4ab54764 --- /dev/null +++ b/deploy/supervisor/test/huey-scheduled-short.conf @@ -0,0 +1,9 @@ +[program:huey-scheduled-short] +command=/home/cloo/doaj/venv/bin/python /home/cloo/doaj/venv/bin/huey_consumer.py -v portality.tasks.consumer_scheduled_short_queue.scheduled_short_queue +environment= DOAJENV=test +user=cloo +directory=/home/cloo/doaj +stdout_logfile=/var/log/supervisor/%(program_name)s-stdout.log +stderr_logfile=/var/log/supervisor/%(program_name)s-error.log +autostart=true +autorestart=true diff --git a/doajtest/fixtures/background.py b/doajtest/fixtures/background.py index 9f89581c7d..4733c9b5a1 100644 --- a/doajtest/fixtures/background.py +++ b/doajtest/fixtures/background.py @@ -52,12 +52,14 @@ def cleanup(self): def save_mock_bgjob(action=None, status=None, created_before_sec=0, is_save=True, - queue_id=None): + blocking=True, queue_id=None): bgjob = BackgroundJob() - if action: + if not action: from portality.tasks.journal_csv import JournalCSVBackgroundTask bgjob.action = 
JournalCSVBackgroundTask.__action__ + else: + bgjob.action = action if status: bgjob._set_with_struct("status", status) @@ -69,6 +71,6 @@ def save_mock_bgjob(action=None, status=None, created_before_sec=0, is_save=True bgjob.queue_id = queue_id if is_save: - bgjob.save(blocking=True) + bgjob.save(blocking=blocking) return bgjob diff --git a/doajtest/helpers.py b/doajtest/helpers.py index b382345a1e..c1d604d7f8 100644 --- a/doajtest/helpers.py +++ b/doajtest/helpers.py @@ -19,7 +19,7 @@ from portality.lib import paths, dates from portality.lib.dates import FMT_DATE_STD from portality.lib.thread_utils import wait_until -from portality.tasks.redis_huey import main_queue, long_running +from portality.tasks.redis_huey import events_queue, scheduled_short_queue, scheduled_long_queue from portality.util import url_for @@ -183,8 +183,9 @@ def setUpClass(cls) -> None: cls.app_test.logger.setLevel(logging.DEBUG) # Run huey jobs straight away - main_queue.immediate = True - long_running.immediate = True + events_queue.immediate = True + scheduled_short_queue.immediate = True + scheduled_long_queue.immediate = True dao.DomainObject.save = dao_proxy(dao.DomainObject.save, type="instance") dao.DomainObject.delete = dao_proxy(dao.DomainObject.delete, type="instance") diff --git a/doajtest/matrices/background_task_status/background_task_status.matrix.csv b/doajtest/matrices/background_task_status/background_task_status.matrix.csv new file mode 100644 index 0000000000..ec93701d08 --- /dev/null +++ b/doajtest/matrices/background_task_status/background_task_status.matrix.csv @@ -0,0 +1,36 @@ +test_id,in_queue,oldest_queued,error_count,error_age,lrs_success_or_error,queued,errors,lrs +1,0,young,0,out_of_period,complete,stable,stable,stable +2,0,young,0,out_of_period,error,stable,stable,unstable +3,0,young,0,out_of_period,empty,stable,stable,unstable +4,0,young,1,in_period,complete,stable,unstable,stable +5,0,young,1,in_period,error,stable,unstable,unstable 
+6,0,young,1,out_of_period,complete,stable,stable,stable +7,0,young,1,out_of_period,error,stable,stable,unstable +8,1,old,0,out_of_period,complete,unstable,stable,stable +9,1,old,0,out_of_period,error,unstable,stable,unstable +10,1,old,0,out_of_period,empty,unstable,stable,unstable +11,1,old,1,in_period,complete,unstable,unstable,stable +12,1,old,1,in_period,error,unstable,unstable,unstable +13,1,old,1,out_of_period,complete,unstable,stable,stable +14,1,old,1,out_of_period,error,unstable,stable,unstable +15,1,young,0,out_of_period,complete,stable,stable,stable +16,1,young,0,out_of_period,error,stable,stable,unstable +17,1,young,0,out_of_period,empty,stable,stable,unstable +18,1,young,1,in_period,complete,stable,unstable,stable +19,1,young,1,in_period,error,stable,unstable,unstable +20,1,young,1,out_of_period,complete,stable,stable,stable +21,1,young,1,out_of_period,error,stable,stable,unstable +22,2,old,0,out_of_period,complete,unstable,stable,stable +23,2,old,0,out_of_period,error,unstable,stable,unstable +24,2,old,0,out_of_period,empty,unstable,stable,unstable +25,2,old,1,in_period,complete,unstable,unstable,stable +26,2,old,1,in_period,error,unstable,unstable,unstable +27,2,old,1,out_of_period,complete,unstable,stable,stable +28,2,old,1,out_of_period,error,unstable,stable,unstable +29,2,young,0,out_of_period,complete,unstable,stable,stable +30,2,young,0,out_of_period,error,unstable,stable,unstable +31,2,young,0,out_of_period,empty,unstable,stable,unstable +32,2,young,1,in_period,complete,unstable,unstable,stable +33,2,young,1,in_period,error,unstable,unstable,unstable +34,2,young,1,out_of_period,complete,unstable,stable,stable +35,2,young,1,out_of_period,error,unstable,stable,unstable diff --git a/doajtest/matrices/background_task_status/background_task_status.settings.csv b/doajtest/matrices/background_task_status/background_task_status.settings.csv new file mode 100644 index 0000000000..755877f2bb --- /dev/null +++ 
b/doajtest/matrices/background_task_status/background_task_status.settings.csv @@ -0,0 +1,17 @@ +field,test_id,in_queue,oldest_queued,error_count,error_age,lrs_success_or_error,queued,errors,lrs +type,index,generated,generated,generated,generated,generated,conditional,conditional,conditional +default,,,,,,,stable,stable,stable +,,,,,,,,, +values,,0,old,0,in_period,complete,stable,stable,stable +values,,1,young,1,out_of_period,error,unstable,unstable,unstable +values,,2,,,,empty,,, +,,,,,,,,, +constraint in_queue,,0,young,,,,,, +constraint error_count,,,,0,out_of_period,,,, +constraint error_count,,,,1,,!empty,,, +,,,,,,,,, +conditional queued,,2,,,,,unstable,, +conditional queued,,,old,,,,unstable,, +conditional errors,,,,1,in_period,,,unstable, +conditional lrs,,,,,,error,,,unstable +conditional lrs,,,,,,empty,,,unstable \ No newline at end of file diff --git a/doajtest/matrices/background_task_status/background_task_status.settings.json b/doajtest/matrices/background_task_status/background_task_status.settings.json new file mode 100644 index 0000000000..f103b658bc --- /dev/null +++ b/doajtest/matrices/background_task_status/background_task_status.settings.json @@ -0,0 +1,150 @@ +{ + "parameters": [ + { + "name": "test_id", + "type": "index" + }, + { + "name": "in_queue", + "type": "generated", + "values": { + "0": { + "constraints": { + "oldest_queued": { + "or": [ + "young" + ] + } + } + }, + "1": {}, + "2": {} + } + }, + { + "name": "oldest_queued", + "type": "generated", + "values": { + "old": {}, + "young": {} + } + }, + { + "name": "error_count", + "type": "generated", + "values": { + "0": { + "constraints": { + "error_age": { + "or": [ + "out_of_period" + ] + } + } + }, + "1": { + "constraints": { + "lrs_success_or_error": { + "nor": [ + "empty" + ] + } + } + } + } + }, + { + "name": "error_age", + "type": "generated", + "values": { + "in_period": {}, + "out_of_period": {} + } + }, + { + "name": "lrs_success_or_error", + "type": "generated", + "values": { + 
"complete": {}, + "error": {}, + "empty": {} + } + }, + { + "name": "queued", + "type": "conditional", + "default": "stable", + "values": { + "stable": {}, + "unstable": { + "conditions": [ + { + "in_queue": { + "or": [ + "2" + ] + } + }, + { + "oldest_queued": { + "or": [ + "old" + ] + } + } + ] + } + } + }, + { + "name": "errors", + "type": "conditional", + "default": "stable", + "values": { + "stable": {}, + "unstable": { + "conditions": [ + { + "error_count": { + "or": [ + "1" + ] + }, + "error_age": { + "or": [ + "in_period" + ] + } + } + ] + } + } + }, + { + "name": "lrs", + "type": "conditional", + "default": "stable", + "values": { + "stable": {}, + "unstable": { + "conditions": [ + { + "lrs_success_or_error": { + "or": [ + "error" + ] + } + }, + { + "lrs_success_or_error": { + "or": [ + "empty" + ] + } + } + ] + } + } + } + ] +} \ No newline at end of file diff --git a/doajtest/unit/test_background_helper.py b/doajtest/unit/test_background_helper.py index 86545d4d4f..33fa15a7c8 100644 --- a/doajtest/unit/test_background_helper.py +++ b/doajtest/unit/test_background_helper.py @@ -7,7 +7,7 @@ from portality.background import BackgroundTask from portality.core import app from portality.tasks.helpers import background_helper -from portality.tasks.redis_huey import long_running, main_queue +from portality.tasks.redis_huey import events_queue, scheduled_short_queue, scheduled_long_queue """ Updated for Huey v2: you can't register a function more than once, so we name each fn individually """ @@ -16,8 +16,9 @@ class TestBackgroundHelper(TestCase): def test_get_queue_id_by_task_queue(self): cases = [ - (long_running, constants.BGJOB_QUEUE_ID_LONG), - (main_queue, constants.BGJOB_QUEUE_ID_MAIN), + (events_queue, constants.BGJOB_QUEUE_ID_EVENTS), + (scheduled_long_queue, constants.BGJOB_QUEUE_ID_SCHEDULED_LONG), + (scheduled_short_queue, constants.BGJOB_QUEUE_ID_SCHEDULED_SHORT), (None, constants.BGJOB_QUEUE_ID_UNKNOWN), ] @@ -59,7 +60,7 @@ def tearDownClass(cls) -> 
None: helpers.patch_config(app, cls.org_config) def test_01_register_schedule(self): - helper = background_helper.RedisHueyTaskHelper(main_queue, fixture_bgtask_class(self.task_name_a)) + helper = background_helper.RedisHueyTaskHelper(scheduled_short_queue, fixture_bgtask_class(self.task_name_a)) @helper.register_schedule def _fn1(): @@ -68,7 +69,7 @@ def _fn1(): assert isinstance(_fn1, huey.api.TaskWrapper) def test_02_register_schedule__a_second_time(self): - helper = background_helper.RedisHueyTaskHelper(main_queue, fixture_bgtask_class(self.task_name_a)) + helper = background_helper.RedisHueyTaskHelper(scheduled_short_queue, fixture_bgtask_class(self.task_name_a)) @helper.register_schedule def _fn1(): @@ -77,7 +78,7 @@ def _fn1(): assert _fn1 is None # We've added a catch to check if we get ValueError from huey we should return None def test_03_register_schedule__schedule_not_found(self): - helper = background_helper.RedisHueyTaskHelper(main_queue, + helper = background_helper.RedisHueyTaskHelper(scheduled_short_queue, fixture_bgtask_class(self.task_name_schedule_not_exist)) with self.assertRaises(RuntimeError): @helper.register_schedule @@ -85,7 +86,7 @@ def _fn2(): print('fake fn') def test_04_register_execute(self): - helper = background_helper.RedisHueyTaskHelper(main_queue, fixture_bgtask_class(self.task_name_b)) + helper = background_helper.RedisHueyTaskHelper(scheduled_short_queue, fixture_bgtask_class(self.task_name_b)) @helper.register_execute(is_load_config=True) def _fn3(): @@ -95,7 +96,7 @@ def _fn3(): assert _fn3.settings['default_retries'] == self.expected_retries def test_05_register_execute__config_not_found(self): - helper = background_helper.RedisHueyTaskHelper(main_queue, + helper = background_helper.RedisHueyTaskHelper(events_queue, fixture_bgtask_class(self.task_name_schedule_not_exist)) with self.assertRaises(RuntimeError): @@ -104,7 +105,7 @@ def _fn4(): print('fake fn') def test_06_register_execute__a_second_time(self): - helper = 
background_helper.RedisHueyTaskHelper(main_queue, fixture_bgtask_class(self.task_name_b)) + helper = background_helper.RedisHueyTaskHelper(events_queue, fixture_bgtask_class(self.task_name_b)) @helper.register_execute(is_load_config=True) def _fn3(): @@ -113,7 +114,7 @@ def _fn3(): assert _fn3 is None # We've added a catch to check if we get ValueError from huey we should return None def test_07_register_execute__without_load_config(self): - helper = background_helper.RedisHueyTaskHelper(main_queue, + helper = background_helper.RedisHueyTaskHelper(events_queue, fixture_bgtask_class(self.task_name_schedule_not_exist)) @helper.register_execute(is_load_config=False) diff --git a/doajtest/unit/test_background_task_status.py b/doajtest/unit/test_background_task_status.py deleted file mode 100644 index 871a7df18b..0000000000 --- a/doajtest/unit/test_background_task_status.py +++ /dev/null @@ -1,235 +0,0 @@ -import json - -from doajtest.fixtures.background import save_mock_bgjob -from doajtest.helpers import DoajTestCase, apply_test_case_config, patch_config -from portality import constants -from portality.bll import DOAJ -from portality.tasks.anon_export import AnonExportBackgroundTask -from portality.tasks.journal_csv import JournalCSVBackgroundTask - -background_task_status = DOAJ.backgroundTaskStatusService() -is_stable = background_task_status.is_stable - -# config BG_MONITOR_ERRORS_CONFIG -bg_monitor_errors_config__empty = { - 'BG_MONITOR_ERRORS_CONFIG': {} -} - -bg_monitor_errors_config__a = { - 'BG_MONITOR_ERRORS_CONFIG': { - JournalCSVBackgroundTask.__action__: { - 'check_sec': 3600, - 'allowed_num_err': 0, - } - } -} - -bg_monitor_errors_config__b = { - 'BG_MONITOR_ERRORS_CONFIG': { - 'kajdlaksj': { - 'check_sec': 3600, - 'allowed_num_err': 0, - } - } -} - -# config BG_MONITOR_QUEUED_CONFIG -bg_monitor_queued_config__a = { - 'BG_MONITOR_QUEUED_CONFIG': { - 'journal_csv': { - 'total': 99999999, - 'oldest': 1200, - } - } -} - -bg_monitor_queued_config__zero_total 
= { - 'BG_MONITOR_QUEUED_CONFIG': { - 'journal_csv': { - 'total': 0, - 'oldest': 1200, - } - } -} - -# config BG_MONITOR_LAST_COMPLETED -bg_monitor_last_completed__now = { - 'BG_MONITOR_LAST_COMPLETED': { - 'main_queue': 0, - 'long_running': 0, - } -} - -bg_monitor_last_completed__a = { - 'BG_MONITOR_LAST_COMPLETED': { - 'main_queue': 10000, - 'long_running': 10000, - } -} - - -class TestBackgroundTaskStatus(DoajTestCase): - @classmethod - def setUpClass(cls) -> None: - super().setUpClass() - cls.org_config = patch_config(cls.app_test, { - 'HUEY_SCHEDULE': { - JournalCSVBackgroundTask.__action__: constants.CRON_NEVER, - AnonExportBackgroundTask.__action__: constants.CRON_NEVER, - }, - }) - - @classmethod - def tearDownClass(cls) -> None: - super().tearDownClass() - patch_config(cls.app_test, cls.org_config) - - @staticmethod - def assert_stable_dict(val: dict): - assert is_stable(val.get('status')) - assert len(val.get('err_msgs')) == 0 - - @staticmethod - def assert_unstable_dict(val): - assert not is_stable(val.get('status')) - assert len(val.get('err_msgs')) - - @apply_test_case_config(bg_monitor_last_completed__now) - def test_01_create_background_status__invalid_last_completed__main_queue(self): - save_mock_bgjob(JournalCSVBackgroundTask.__action__, - queue_id=constants.BGJOB_QUEUE_ID_MAIN, - status=constants.BGJOB_STATUS_COMPLETE, ) - - status_dict = background_task_status.create_background_status() - - assert not is_stable(status_dict['status']) - self.assert_unstable_dict(status_dict['queues'].get('main_queue', {})) - self.assert_stable_dict(status_dict['queues'].get('long_running', {})) - - @apply_test_case_config(bg_monitor_last_completed__now) - def test_02_create_background_status__invalid_last_completed__long_running(self): - save_mock_bgjob(AnonExportBackgroundTask.__action__, - queue_id=constants.BGJOB_QUEUE_ID_LONG, - status=constants.BGJOB_STATUS_COMPLETE, ) - - status_dict = background_task_status.create_background_status() - - assert not 
is_stable(status_dict['status']) - self.assert_stable_dict(status_dict['queues'].get('main_queue', {})) - self.assert_unstable_dict(status_dict['queues'].get('long_running', {})) - - @apply_test_case_config(bg_monitor_last_completed__a) - def test_03_create_background_status__valid_last_completed(self): - save_mock_bgjob(JournalCSVBackgroundTask.__action__, - queue_id=constants.BGJOB_QUEUE_ID_MAIN, - status=constants.BGJOB_STATUS_COMPLETE, ) - save_mock_bgjob(AnonExportBackgroundTask.__action__, - queue_id=constants.BGJOB_QUEUE_ID_LONG, - status=constants.BGJOB_STATUS_COMPLETE, ) - - status_dict = background_task_status.create_background_status() - - assert is_stable(status_dict['status']) - self.assert_stable_dict(status_dict['queues'].get('main_queue', {})) - self.assert_stable_dict(status_dict['queues'].get('long_running', {})) - - @apply_test_case_config(bg_monitor_last_completed__now) - def test_04_create_background_status__valid_last_completed__no_record(self): - status_dict = background_task_status.create_background_status() - assert is_stable(status_dict['status']) - - @apply_test_case_config(bg_monitor_errors_config__empty) - def test_05_create_background_status__empty_errors_config(self): - save_mock_bgjob(JournalCSVBackgroundTask.__action__, - status=constants.BGJOB_STATUS_ERROR, ) - - status_dict = background_task_status.create_background_status() - - journal_csv_dict = status_dict['queues']['main_queue']['errors'].get(JournalCSVBackgroundTask.__action__, {}) - - assert not is_stable(status_dict['status']) - assert journal_csv_dict - # unstable action should be on top of the list after sorting - first_key = next(iter(status_dict['queues']['main_queue']['errors'])) - assert not is_stable(status_dict['queues']['main_queue']['errors'][first_key]['status']) - - @apply_test_case_config(bg_monitor_errors_config__a) - def test_06_create_background_status__error_in_period_found(self): - save_mock_bgjob(JournalCSVBackgroundTask.__action__, - 
status=constants.BGJOB_STATUS_ERROR, ) - - status_dict = background_task_status.create_background_status() - - journal_csv_dict = status_dict['queues']['main_queue']['errors'].get(JournalCSVBackgroundTask.__action__, {}) - - assert not is_stable(status_dict['status']) - self.assert_unstable_dict(journal_csv_dict) - assert journal_csv_dict.get('in_monitoring_period', 0) > 0 - - @apply_test_case_config(bg_monitor_errors_config__a) - def test_07_create_background_status__error_in_period_not_found(self): - save_mock_bgjob(JournalCSVBackgroundTask.__action__, - status=constants.BGJOB_STATUS_ERROR, - created_before_sec=1000000000) - - status_dict = background_task_status.create_background_status() - - journal_csv_dict = status_dict['queues']['main_queue']['errors'].get(JournalCSVBackgroundTask.__action__, {}) - - assert is_stable(status_dict['status']) - self.assert_stable_dict(journal_csv_dict) - assert journal_csv_dict.get('in_monitoring_period', 0) == 0 - - @apply_test_case_config(bg_monitor_queued_config__zero_total) - def test_08_create_background_status__queued_invalid_total(self): - save_mock_bgjob(JournalCSVBackgroundTask.__action__, - status=constants.BGJOB_STATUS_QUEUED, ) - - status_dict = background_task_status.create_background_status() - - journal_csv_dict = status_dict['queues']['main_queue']['queued'].get(JournalCSVBackgroundTask.__action__, {}) - - assert not is_stable(status_dict['status']) - assert journal_csv_dict.get('total', 0) - self.assert_unstable_dict(journal_csv_dict) - - @apply_test_case_config(bg_monitor_queued_config__zero_total) - def test_09_create_background_status__queued_valid_total(self): - save_mock_bgjob(JournalCSVBackgroundTask.__action__, - status=constants.BGJOB_STATUS_COMPLETE, ) - - status_dict = background_task_status.create_background_status() - - journal_csv_dict = status_dict['queues']['main_queue']['queued'].get(JournalCSVBackgroundTask.__action__, {}) - - assert is_stable(status_dict['status']) - assert 
journal_csv_dict.get('total', 0) == 0 - self.assert_stable_dict(journal_csv_dict) - - @apply_test_case_config(bg_monitor_queued_config__a) - def test_10_create_background_status__queued_invalid_oldest(self): - save_mock_bgjob(JournalCSVBackgroundTask.__action__, - status=constants.BGJOB_STATUS_QUEUED, - created_before_sec=1000000000) - - status_dict = background_task_status.create_background_status() - - journal_csv_dict = status_dict['queues']['main_queue']['queued'].get(JournalCSVBackgroundTask.__action__, {}) - - assert not is_stable(status_dict['status']) - self.assert_unstable_dict(journal_csv_dict) - assert journal_csv_dict.get('oldest') is not None - - @apply_test_case_config(bg_monitor_queued_config__a) - def test_11_create_background_status__queued_valid_oldest(self): - save_mock_bgjob(JournalCSVBackgroundTask.__action__, - status=constants.BGJOB_STATUS_QUEUED, ) - - status_dict = background_task_status.create_background_status() - print(json.dumps(status_dict, indent=4)) - - journal_csv_dict = status_dict['queues']['main_queue']['queued'].get(JournalCSVBackgroundTask.__action__, {}) - - assert is_stable(status_dict['status']) - self.assert_stable_dict(journal_csv_dict) - assert journal_csv_dict.get('oldest') is not None diff --git a/doajtest/unit/test_background_task_status_parameterised.py b/doajtest/unit/test_background_task_status_parameterised.py new file mode 100644 index 0000000000..7b78538f90 --- /dev/null +++ b/doajtest/unit/test_background_task_status_parameterised.py @@ -0,0 +1,159 @@ +from parameterized import parameterized +from combinatrix.testintegration import load_parameter_sets + +from portality.models.background import BackgroundJob +from portality.lib.paths import rel2abs + +def load_cases(): + return load_parameter_sets(rel2abs(__file__, "..", "matrices", "background_task_status"), + "background_task_status", + "test_id", + {"test_id" : []}) + +import json + +from doajtest.fixtures.background import save_mock_bgjob +from 
doajtest.helpers import DoajTestCase, apply_test_case_config, patch_config +from portality import constants +from portality.bll import DOAJ +from portality.tasks.anon_export import AnonExportBackgroundTask +from portality.tasks.journal_csv import JournalCSVBackgroundTask + +background_task_status = DOAJ.backgroundTaskStatusService() + +# Configures the monitoring period and the allowed number of errors in that period before a queue is marked +# as unstable +BG_MONITOR_ERRORS_CONFIG = { + 'set_in_doaj': { + 'check_sec': 3000, + 'allowed_num_err': 0 + }, + 'anon_export': { + 'check_sec': 3000, + 'allowed_num_err': 0, + }, + 'journal_csv': { + 'check_sec': 3000, + 'allowed_num_err': 0 + } +} + +# Configures the total number of queued items and the age of the oldest of those queued items allowed +# before the queue is marked as unstable. This is provided by type, so we can monitor all types separately +BG_MONITOR_QUEUED_CONFIG = { + 'set_in_doaj': { + 'total': 1, + 'oldest': 3000, + }, + 'anon_export': { + 'total': 1, + 'oldest': 3000, + }, + 'journal_csv': { + 'total': 1, + 'oldest': 3000, + } +} + +BG_MONITOR_LAST_SUCCESSFULLY_RUN_CONFIG = { + 'anon_export': { + 'last_run_successful_in': 5000 + }, + 'set_in_doaj': { + 'last_run_successful_in': 5000 + }, + 'journal_csv': { + 'last_run_successful_in': 5000 + } +} + + +class TestBackgroundTaskStatus(DoajTestCase): + @classmethod + def setUpClass(cls) -> None: + super().setUpClass() + cls.org_config = patch_config(cls.app_test, { + 'HUEY_SCHEDULE': { + JournalCSVBackgroundTask.__action__: constants.CRON_NEVER, + AnonExportBackgroundTask.__action__: constants.CRON_NEVER, + }, + "BG_MONITOR_ERRORS_CONFIG": BG_MONITOR_ERRORS_CONFIG, + "BG_MONITOR_QUEUED_CONFIG": BG_MONITOR_QUEUED_CONFIG, + "BG_MONITOR_LAST_SUCCESSFULLY_RUN_CONFIG": BG_MONITOR_LAST_SUCCESSFULLY_RUN_CONFIG + }) + + @classmethod + def tearDownClass(cls) -> None: + super().tearDownClass() + patch_config(cls.app_test, cls.org_config) + + 
@parameterized.expand(load_cases) + def test_01_background_task_status(self, name, kwargs): + in_queue_arg = kwargs.get("in_queue") + oldest_queued_arg = kwargs.get("oldest_queued") + error_count_arg = kwargs.get("error_count") + error_age_arg = kwargs.get("error_age") + lrs_success_or_error_arg = kwargs.get("lrs_success_or_error") + queued_arg = kwargs.get("queued") + errors_arg = kwargs.get("errors") + lrs_arg = kwargs.get("lrs") + + in_queue = int(in_queue_arg) + oldest_queued = 3600 if oldest_queued_arg == "old" else 600 + error_count = int(error_count_arg) + error_age = 600 if error_age_arg == "in_period" else 3600 + + queues = [("events", "set_in_doaj"), + ("scheduled_long", "anon_export"), + ("scheduled_short", "journal_csv")] + + # set up + ########################################### + blocks = [] + for q, a in queues: + if in_queue > 0: + # create the number of jobs that should be in status "queued" + # their ages are set to the oldest allowed age + the index number for the purposes of disambiguation + for i in range(in_queue): + job = save_mock_bgjob(action=a, status="queued", created_before_sec=oldest_queued + i, is_save=True, blocking=False, queue_id=q) + blocks.append((job.id, job.last_updated)) + + if error_count > 0: + # create a single error job if the requested age + job = save_mock_bgjob(action=a, status="error", created_before_sec=error_age, is_save=True, blocking=False, queue_id=q) + blocks.append((job.id, job.last_updated)) + + if lrs_success_or_error_arg != "empty": + age = 4000 + # if there is an error that's being tested (see above), we need to make sure that the more recent job is "complete" + # to deactivate this test, if the lrs test is supposed to be "stable". 
+ if error_count > 0 and lrs_success_or_error_arg == "complete": + age = 100 + job = save_mock_bgjob(action=a, status=lrs_success_or_error_arg, created_before_sec=age, is_save=True, blocking=False, queue_id=q) + blocks.append((job.id, job.last_updated)) + + BackgroundJob.blockall(blocks) + + # Execute + ########################################### + status = background_task_status.create_background_status() + + + # Assert + ########################################### + + for q, a in queues: + assert status['queues'][q]["errors"][a]["status"] == errors_arg + assert status['queues'][q]["queued"][a]["status"] == queued_arg + assert status['queues'][q]["last_run_successful"][a]["status"] == lrs_arg + + if "unstable" in [errors_arg, queued_arg, lrs_arg]: + assert status['queues'][q]["status"] == "unstable" + else: + assert status['queues'][q]["status"] == "stable" + + if "unstable" in [errors_arg, queued_arg, lrs_arg]: + assert status['status'] == "unstable" + else: + assert status['status'] == "stable" + diff --git a/doajtest/unit_tester/bgtask_tester.py b/doajtest/unit_tester/bgtask_tester.py index c5a02df971..3b3251cbb7 100644 --- a/doajtest/unit_tester/bgtask_tester.py +++ b/doajtest/unit_tester/bgtask_tester.py @@ -6,5 +6,6 @@ def test_queue_id_assigned(bgtask_class: Type[BackgroundTask]): job = bgtask_class.prepare('just a username') - assert job.queue_id in {constants.BGJOB_QUEUE_ID_MAIN, - constants.BGJOB_QUEUE_ID_LONG} + assert job.queue_id in {constants.BGJOB_QUEUE_ID_EVENTS, + constants.BGJOB_QUEUE_ID_SCHEDULED_LONG, + constants.BGJOB_QUEUE_ID_SCHEDULED_SHORT} diff --git a/docs/dev/how-to-implement.md b/docs/dev/how-to-implement.md index b76f849d10..e468cc7c69 100644 --- a/docs/dev/how-to-implement.md +++ b/docs/dev/how-to-implement.md @@ -8,13 +8,13 @@ How to create a background job * choice a task queue, details of task queue can have find in `portality/tasks/redis_huey.py` ```python -huey_helper = 
JournalBulkDeleteBackgroundTask.create_huey_helper(main_queue) +huey_helper = JournalBulkDeleteBackgroundTask.create_huey_helper(queue) ``` * add execute function below BackgroundTask class ```python -huey_helper = JournalBulkDeleteBackgroundTask.create_huey_helper(main_queue) +huey_helper = JournalBulkDeleteBackgroundTask.create_huey_helper(queue) @huey_helper.register_execute(is_load_config=False) @@ -76,5 +76,5 @@ HUEY_SCHEDULE = { ### Register your task -* add your execute and schedule function in `portality/tasks/consumer_long_running.py` - or `portality/tasks/consumer_main_queue.py` +* add your execute and/or schedule function in `portality/tasks/consumer_scheduled_long_queue.py`, + `portality/tasks/consumer_scheduled_short_queue.py` or `portality/tasks/consumer_events_queue.py` diff --git a/docs/dev/user-guide/user-guide.md b/docs/dev/user-guide/user-guide.md index 143c09027b..2c9ba1d3d6 100644 --- a/docs/dev/user-guide/user-guide.md +++ b/docs/dev/user-guide/user-guide.md @@ -30,9 +30,9 @@ HUEY_SCHEDULE = { } ``` -* run your `main` background job consumer +* run your `scheduled_short` background job consumer ``` -~/venv/doaj/bin/huey_consumer.py portality.tasks.consumer_main_queue.main_queue +~/venv/doaj/bin/huey_consumer.py portality.tasks.consumer_scheduled_short_queue.scheduled_short_queue ``` * wait 10 ~ 30 minute for generate some background jobs diff --git a/portality/bll/services/background_task_status.py b/portality/bll/services/background_task_status.py index 3ebe187ff7..c928350287 100644 --- a/portality/bll/services/background_task_status.py +++ b/portality/bll/services/background_task_status.py @@ -4,8 +4,9 @@ import itertools from typing import Iterable -from portality.constants import BGJOB_QUEUE_ID_LONG, BGJOB_QUEUE_ID_MAIN, BGJOB_STATUS_ERROR, BGJOB_STATUS_QUEUED, \ - BG_STATUS_STABLE, BG_STATUS_UNSTABLE +from portality import constants +# from portality.constants import BGJOB_QUEUE_ID_LONG, BGJOB_QUEUE_ID_MAIN, BGJOB_QUEUE_ID_EVENTS,
BGJOB_QUEUE_ID_SCHEDULED_LONG, BGJOB_QUEUE_ID_SCHEDULED_SHORT, BGJOB_STATUS_ERROR, BGJOB_STATUS_QUEUED, \ +# BG_STATUS_STABLE, BG_STATUS_UNSTABLE, BGJOB_STATUS_COMPLETE from portality.core import app from portality.lib import dates from portality.models.background import BackgroundJobQueryBuilder, BackgroundJob, SimpleBgjobQueue, \ @@ -21,11 +22,11 @@ class BackgroundTaskStatusService: @staticmethod def is_stable(val): - return val == BG_STATUS_STABLE + return val == constants.BG_STATUS_STABLE @staticmethod def to_bg_status_str(stable_val: bool) -> str: - return BG_STATUS_STABLE if stable_val else BG_STATUS_UNSTABLE + return constants.BG_STATUS_STABLE if stable_val else constants.BG_STATUS_UNSTABLE def all_stable(self, items: Iterable, field_name='status') -> bool: return all(self.is_stable(q.get(field_name)) for q in items) @@ -33,8 +34,48 @@ def all_stable(self, items: Iterable, field_name='status') -> bool: def all_stable_str(self, items: Iterable, field_name='status') -> str: return self.to_bg_status_str(self.all_stable(items, field_name)) + def create_last_successfully_run_status(self, action, last_run_successful_in=0, **_) -> dict: + if last_run_successful_in == 0: + return dict( + status=constants.BG_STATUS_STABLE, + last_run=None, + last_run_status=None, + err_msgs=[] + ) + + lr_query = (BackgroundJobQueryBuilder().action(action) + .since(dates.before_now(last_run_successful_in)) + .status_includes([constants.BGJOB_STATUS_COMPLETE, constants.BGJOB_STATUS_ERROR]) + .size(1) + .order_by('created_date', 'desc') + .build_query_dict()) + + lr_results = BackgroundJob.q2obj(q=lr_query) + lr_job = lr_results[0] if len(lr_results) > 0 else None + + status = constants.BG_STATUS_UNSTABLE + lr = None + last_run_status = None + msg = ["No background jobs completed or errored in the time period"] + + if lr_job is not None: + lr = lr_job.created_date + last_run_status = lr_job.status + if lr_job.status == constants.BGJOB_STATUS_COMPLETE: + status = 
constants.BG_STATUS_STABLE + msg = [] + else: + msg = ["Last job did not complete successfully"] + + return dict( + status=status, + last_run=lr, + last_run_status=last_run_status, + err_msgs=msg + ) + def create_errors_status(self, action, check_sec=3600, allowed_num_err=0, **_) -> dict: - in_monitoring_query = SimpleBgjobQueue(action, status=BGJOB_STATUS_ERROR, since=dates.before_now(check_sec)) + in_monitoring_query = SimpleBgjobQueue(action, status=constants.BGJOB_STATUS_ERROR, since=dates.before_now(check_sec)) num_err_in_monitoring = BackgroundJob.hit_count(query=in_monitoring_query.query()) # prepare errors messages @@ -44,15 +85,15 @@ def create_errors_status(self, action, check_sec=3600, allowed_num_err=0, **_) - return dict( status=self.to_bg_status_str(not err_msgs), - total=BackgroundJob.hit_count(query=SimpleBgjobQueue(action, status=BGJOB_STATUS_ERROR).query()), + total=BackgroundJob.hit_count(query=SimpleBgjobQueue(action, status=constants.BGJOB_STATUS_ERROR).query()), in_monitoring_period=num_err_in_monitoring, err_msgs=err_msgs, ) def create_queued_status(self, action, total=2, oldest=1200, **_) -> dict: - total_queued = BackgroundJob.hit_count(query=SimpleBgjobQueue(action, status=BGJOB_STATUS_QUEUED).query()) + total_queued = BackgroundJob.hit_count(query=SimpleBgjobQueue(action, status=constants.BGJOB_STATUS_QUEUED).query()) oldest_query = (BackgroundJobQueryBuilder().action(action) - .status_includes(BGJOB_STATUS_QUEUED).size(1) + .status_includes(constants.BGJOB_STATUS_QUEUED).size(1) .order_by('created_date', 'asc') .build_query_dict()) oldest_jobs = list(BackgroundJob.q2obj(q=oldest_query)) @@ -92,6 +133,9 @@ def create_queues_status(self, queue_name) -> dict: queued = {action: self.create_queued_status(action, **config) for action, config in self.get_config_dict_by_queue_name('BG_MONITOR_QUEUED_CONFIG', queue_name).items()} + last_run = {action: self.create_last_successfully_run_status(action, **config) for action, config + in 
self.get_config_dict_by_queue_name('BG_MONITOR_LAST_SUCCESSFULLY_RUN_CONFIG', queue_name).items()} + # prepare for err_msgs limited_sec = app.config.get('BG_MONITOR_LAST_COMPLETED', {}).get(queue_name) if limited_sec is None: @@ -107,10 +151,11 @@ def create_queues_status(self, queue_name) -> dict: result_dict = dict( status=self.to_bg_status_str( - not err_msgs and self.all_stable(itertools.chain(errors.values(), queued.values()))), + not err_msgs and self.all_stable(itertools.chain(errors.values(), queued.values(), last_run.values()))), last_completed_job=last_completed_date and dates.format(last_completed_date), errors=errors, queued=queued, + last_run_successful=last_run, err_msgs=err_msgs, ) return result_dict @@ -129,7 +174,11 @@ def get_config_dict_by_queue_name(config_name, queue_name): def create_background_status(self) -> dict: queues = { queue_name: self.create_queues_status(queue_name) - for queue_name in [BGJOB_QUEUE_ID_LONG, BGJOB_QUEUE_ID_MAIN] + for queue_name in [constants.BGJOB_QUEUE_ID_LONG, + constants.BGJOB_QUEUE_ID_MAIN, + constants.BGJOB_QUEUE_ID_EVENTS, + constants.BGJOB_QUEUE_ID_SCHEDULED_LONG, + constants.BGJOB_QUEUE_ID_SCHEDULED_SHORT] } result_dict = dict( diff --git a/portality/constants.py b/portality/constants.py index 974ab3f49c..c3bde76b21 100644 --- a/portality/constants.py +++ b/portality/constants.py @@ -106,6 +106,9 @@ BGJOB_QUEUE_ID_LONG = 'long_running' BGJOB_QUEUE_ID_MAIN = 'main_queue' BGJOB_QUEUE_ID_UNKNOWN = 'unknown' +BGJOB_QUEUE_ID_EVENTS = "events" +BGJOB_QUEUE_ID_SCHEDULED_SHORT = "scheduled_short" +BGJOB_QUEUE_ID_SCHEDULED_LONG = "scheduled_long" # Background monitor status BG_STATUS_STABLE = 'stable' diff --git a/portality/core.py b/portality/core.py index 313af94bcf..077f932a13 100644 --- a/portality/core.py +++ b/portality/core.py @@ -71,8 +71,12 @@ def configure_app(app): a) the settings.py file b) the .cfg file c) the local secrets config in app.cfg + d) the ansible-generated instance config - Later imports have 
precedence, so e.g. app.cfg will override the same setting in production.cfg and settings.py. + instance.cfg is generated by ansible script (server_initial_setup.yml from sysadmin repo) + + Later imports have precedence, so e.g. instance.cfg override app.cfg, app.cfg will override the same setting + in production.cfg and settings.py. """ # import for settings.py @@ -93,6 +97,12 @@ def configure_app(app): app.config.from_pyfile(config_path) print('Loaded secrets config from ' + config_path) + # import from instance.cfg + instance_path = os.path.join(proj_root, 'instance.cfg') + if os.path.exists(instance_path): + app.config.from_pyfile(instance_path) + print('Loaded instance specific config from ' + instance_path) + def get_app_env(app): if not app.config.get('VALID_ENVIRONMENTS'): diff --git a/portality/dao.py b/portality/dao.py index 300e6f3e17..176a741486 100644 --- a/portality/dao.py +++ b/portality/dao.py @@ -1,16 +1,18 @@ -import time +from __future__ import annotations + +import json import re import os import sys -import uuid -import json -import elasticsearch +import time import urllib.parse - +import uuid from collections import UserDict from copy import deepcopy from datetime import timedelta -from typing import List +from typing import List, Iterable, Tuple + +import elasticsearch from portality.core import app, es_connection as ES from portality.lib import dates @@ -724,15 +726,16 @@ def dump(cls, q=None, page_size=1000, limit=None, out=None, out_template=None, o return filenames @classmethod - def bulk_load_from_file(cls, source_file, limit=None, max_content_length=100000000): + def bulk_load_from_file(cls, source_file, index=None, limit=None, max_content_length=100000000): """ ported from esprit.tasks - bulk load to index from file """ + index = index or cls.index_name() source_size = os.path.getsize(source_file) with open(source_file, "r") as f: if limit is None and source_size < max_content_length: # if we aren't selecting a portion of the file, and 
the file is below the max content length, then # we can just serve it directly - ES.bulk(body=f.read(), index=cls.index_name(), doc_type=cls.doc_type(), request_timeout=120) + ES.bulk(body=f.read(), index=index, doc_type=cls.doc_type(), request_timeout=120) return -1 else: count = 0 @@ -755,7 +758,7 @@ def bulk_load_from_file(cls, source_file, limit=None, max_content_length=1000000 else: count += records - ES.bulk(body=chunk, index=cls.index_name(), doc_type=cls.doc_type(), request_timeout=120) + ES.bulk(body=chunk, index=index, doc_type=cls.doc_type(), request_timeout=120) if finished: break if limit is not None: @@ -1065,6 +1068,25 @@ def refresh(): return ES.indices.refresh() +def find_indexes_by_prefix(index_prefix) -> list[str]: + data = ES.indices.get(f'{index_prefix}*') + return list(data.keys()) + + +def find_index_aliases(alias_prefixes=None) -> Iterable[Tuple[str, str]]: + def _yield_index_alias(): + data = ES.indices.get_alias() + for index, d in data.items(): + for alias in d['aliases'].keys(): + yield index, alias + + index_aliases = _yield_index_alias() + if alias_prefixes: + index_aliases = ((index, alias) for index, alias in index_aliases + if any(alias.startswith(p) for p in alias_prefixes)) + return index_aliases + + class BlockTimeOutException(Exception): pass diff --git a/portality/lib/dates.py b/portality/lib/dates.py index 52f6b0a809..775cb7d6b8 100644 --- a/portality/lib/dates.py +++ b/portality/lib/dates.py @@ -92,8 +92,8 @@ def now_str_with_microseconds() -> str: return format(now(), format=FMT_DATETIME_MS_STD) -def today() -> str: - return format(now(), format=FMT_DATE_STD) +def today(str_format=FMT_DATE_STD) -> str: + return format(now(), format=str_format) def random_date(fro: datetime = None, to: datetime = None) -> str: diff --git a/portality/scripts/anon_import.py b/portality/scripts/anon_import.py index f1e6a96419..33bc914fc9 100644 --- a/portality/scripts/anon_import.py +++ b/portality/scripts/anon_import.py @@ -13,16 +13,40 @@ 
DOAJENV=test python portality/scripts/anon_import.py data_import_settings/test_server.json """ -import json, gzip, shutil, elasticsearch -from portality.core import app, es_connection, initialise_index -from portality.store import StoreFactory -from portality.dao import DomainObject -from portality import models +from __future__ import annotations + +import gzip +import itertools +import json +import re +import shutil +from dataclasses import dataclass +from time import sleep + +import portality.dao from doajtest.helpers import patch_config +from portality import models +from portality.core import app, es_connection +from portality.dao import DomainObject +from portality.lib import dates, es_data_mapping +from portality.store import StoreFactory +from portality.util import ipt_prefix -def do_import(config): +@dataclass +class IndexDetail: + index_type: str + instance_name: str + alias_name: str + +def find_toberemoved_indexes(prefix): + for index in portality.dao.find_indexes_by_prefix(prefix): + if index == prefix or re.match(rf"{prefix}-\d+", index): + yield index + + +def do_import(config): # filter for the types we are going to work with import_types = {} for t, s in config.get("types", {}).items(): @@ -35,22 +59,58 @@ def do_import(config): print(("{x} from {y}".format(x=count, y=import_type))) print("\n") + toberemoved_index_prefixes = [ipt_prefix(import_type) for import_type in import_types.keys()] + toberemoved_indexes = list(itertools.chain.from_iterable( + find_toberemoved_indexes(p) for p in toberemoved_index_prefixes + )) # a list, not a one-shot iterator: it is truth-tested, printed, and iterated again by the delete loop + toberemoved_index_aliases = list(portality.dao.find_index_aliases(toberemoved_index_prefixes)) + + if toberemoved_indexes: + print("==Removing the following indexes==") + print(' {}'.format(', '.join(toberemoved_indexes))) + print() + if toberemoved_index_aliases: + print("==Removing the following aliases==") + print(' {}'.format(', '.join(alias for _, alias in toberemoved_index_aliases))) + print() + if config.get("confirm", True): text =
input("Continue? [y/N] ") if text.lower() != "y": exit() # remove all the types that we are going to import - for import_type in list(import_types.keys()): - try: - if es_connection.indices.get(app.config['ELASTIC_SEARCH_DB_PREFIX'] + import_type): - es_connection.indices.delete(app.config['ELASTIC_SEARCH_DB_PREFIX'] + import_type) - except elasticsearch.exceptions.NotFoundError: - pass + for index in toberemoved_indexes: + if es_connection.indices.exists(index): + print("Deleting index: {}".format(index)) + es_connection.indices.delete(index, ignore=[404]) + + for index, alias in toberemoved_index_aliases: + if es_connection.indices.exists_alias(alias, index=index): + print("Deleting alias: {} -> {}".format(index, alias)) + es_connection.indices.delete_alias(index, alias, ignore=[404]) + + index_details = {} + for import_type in import_types.keys(): + alias_name = ipt_prefix(import_type) + index_details[import_type] = IndexDetail( + index_type=import_type, + instance_name=alias_name + '-{}'.format(dates.today(dates.FMT_DATE_SHORT)), + alias_name=alias_name + ) # re-initialise the index (sorting out mappings, etc) - print("==Initialising Index for Mappings==") - initialise_index(app, es_connection) + print("==Initialising Index Mappings and alias ==") + mappings = es_data_mapping.get_mappings(app) + for index_detail in index_details.values(): + print("Initialising index: {}".format(index_detail.instance_name)) + es_connection.indices.create(index=index_detail.instance_name, + body=mappings[index_detail.index_type], + request_timeout=app.config.get("ES_SOCKET_TIMEOUT", None)) + + print("Creating alias: {:<25} -> {}".format(index_detail.instance_name, index_detail.alias_name)) + blocking_if_indices_exist(index_detail.alias_name) + es_connection.indices.put_alias(index=index_detail.instance_name, name=index_detail.alias_name) mainStore = StoreFactory.get("anon_data") tempStore = StoreFactory.tmp() @@ -85,10 +145,12 @@ def do_import(config): shutil.copyfileobj(f_in, 
f_out) tempStore.delete_file(container, filename + ".gz") - print(("Importing from {x}".format(x=filename))) + instance_index_name = index_details[import_type].instance_name + print("Importing from {x} to index[{index}]".format(x=filename, index=instance_index_name)) imported_count = dao.bulk_load_from_file(uncompressed_file, - limit=limit, max_content_length=config.get("max_content_length", 100000000)) + index=instance_index_name, limit=limit, + max_content_length=config.get("max_content_length", 100000000)) tempStore.delete_file(container, filename) if limit is not None and imported_count != -1: @@ -105,9 +167,18 @@ def do_import(config): tempStore.delete_container(container) +def blocking_if_indices_exist(index_name): + for retry in range(5): + if not es_connection.indices.exists(index_name): + break + print(f"Old alias exists, waiting for it to be removed, alias[{index_name}] retry[{retry}]...") + sleep(5) + + if __name__ == '__main__': import argparse + parser = argparse.ArgumentParser() parser.add_argument("config", help="Config file for import run, e.g dev_basics.json") diff --git a/portality/scripts/journals_update_via_csv.py b/portality/scripts/journals_update_via_csv.py index 9d5f61e6fd..15d9ef4182 100644 --- a/portality/scripts/journals_update_via_csv.py +++ b/portality/scripts/journals_update_via_csv.py @@ -189,6 +189,10 @@ def confirm_prompt(): try: if not args.dry_run: + # FIXME: we have to validate, as this pre-filters conditional fields and makes other protections + # on the form values. This is not really ideal, as validate shouldn't really be changing the data! 
+ fc.validate() + # Save the update request fc.finalise(email_alert=False) print('Update request created.') diff --git a/portality/settings.py b/portality/settings.py index 5ec67f0734..b07037aebc 100644 --- a/portality/settings.py +++ b/portality/settings.py @@ -9,7 +9,7 @@ # Application Version information # ~~->API:Feature~~ -DOAJ_VERSION = "7.0.4" +DOAJ_VERSION = "8.0.0" API_VERSION = "4.0.0" ###################################### @@ -442,8 +442,7 @@ "old_data_cleanup": {"month": "*", "day": "12", "day_of_week": "*", "hour": "6", "minute": "30"}, "monitor_bgjobs": {"month": "*", "day": "*/6", "day_of_week": "*", "hour": "10", "minute": "0"}, "find_discontinued_soon": {"month": "*", "day": "*", "day_of_week": "*", "hour": "0", "minute": "3"}, - "datalog_journal_added_update": {"month": "*", "day": "*", "day_of_week": "*", "hour": "*", "minute": "*/30"}, - "article_bulk_create": {"month": "*", "day": "*", "day_of_week": "*", "hour": "*", "minute": "20"}, + "datalog_journal_added_update": {"month": "*", "day": "*", "day_of_week": "*", "hour": "*", "minute": "*/30"} } @@ -1390,39 +1389,74 @@ # Background monitor # ~~->BackgroundMonitor:Feature~~ +# some time period for convenience +_MIN = 60 +_HOUR = 3600 +_DAY = 24 * _HOUR +_WEEK = 7 * _DAY + # Configures the age of the last completed job on the queue before the queue is marked as unstable # (in seconds) BG_MONITOR_LAST_COMPLETED = { - 'main_queue': 7200, # 2 hours - 'long_running': 93600, # 26 hours + 'events': 2 * _HOUR, # 2 hours + 'scheduled_short': 2 * _HOUR, # 2 hours + 'scheduled_long': _DAY + 2 * _HOUR, # 26 hours } # Default monitoring config for background job types which are not enumerated in BG_MONITOR_ERRORS_CONFIG below BG_MONITOR_DEFAULT_CONFIG = { + ## default values for queued config + + # the total number of items that are allowed to be in `queued` state at the same time. 
+ # Any more than this and the result is flagged 'total': 2, - 'oldest': 1200, + + # The age of the oldest record allowed to be in the `queued` state. + # If the oldest queued item was created before this, the result is flagged + 'oldest': 20 * _MIN, + + ## default values for error config + + # The time period over which to check for errors, from now to now - check_sec + 'check_sec': _HOUR, + + # The number of errors allowed in the check period before the result is flagged + 'allowed_num_err': 0, + + # The last time this job ran within the specified time period, was it successful. + # If the most recent job in the timeframe is an error, this will trigger an "unstable" state (0 turns this off) + 'last_run_successful_in': 0 } # Configures the monitoring period and the allowed number of errors in that period before a queue is marked # as unstable BG_MONITOR_ERRORS_CONFIG = { - # Main queue - 'journal_csv': { - 'check_sec': 3600, # 1 hour, time period between scheduled runs + 'anon_export': { + 'check_sec': _WEEK, # a week + 'allowed_num_err': 0 + }, + 'article_bulk_create': { + 'check_sec': _DAY, # 1 day 'allowed_num_err': 0, }, - 'ingest_articles': { - 'check_sec': 86400, + 'article_cleanup_sync': { + 'check_sec': 2 * _DAY, # 2 days 'allowed_num_err': 0 }, - - # Long running 'harvest': { - 'check_sec': 86400, + 'check_sec': _DAY, 'allowed_num_err': 0, }, + 'ingest_articles': { + 'check_sec': _DAY, + 'allowed_num_err': 0 + }, + 'journal_csv': { + 'check_sec': 3 * _HOUR, + 'allowed_num_err': 1, + }, 'public_data_dump': { - 'check_sec': 86400 * 7, + 'check_sec': 2 * _HOUR, 'allowed_num_err': 0 } } @@ -1430,24 +1464,92 @@ # Configures the total number of queued items and the age of the oldest of those queued items allowed # before the queue is marked as unstable. 
This is provided by type, so we can monitor all types separately BG_MONITOR_QUEUED_CONFIG = { - # Main queue - 'journal_csv': { - 'total': 2, - 'oldest': 1200, # 20 mins + 'anon_export': { + 'total': 1, + 'oldest': 20 * _MIN }, - 'ingest_articles': { - 'total': 250, - 'oldest': 86400 + 'article_bulk_create': { + 'total': 3, + 'oldest': 10 * _MIN }, - - # Long running 'harvest': { 'total': 1, - 'oldest': 86400 + 'oldest': _DAY + }, + 'ingest_articles': { + 'total': 10, + 'oldest': 10 * _MIN + }, + 'journal_bulk_edit': { + 'total': 2, + 'oldest': 10 * _MIN + }, + 'journal_csv': { + 'total': 1, + 'oldest': 20 * _MIN }, 'public_data_dump': { 'total': 1, - 'oldest': 86400 + 'oldest': _DAY + }, + 'set_in_doaj': { + 'total': 2, + 'oldest': 10 * _MIN + }, + 'suggestion_bulk_edit': { + 'total': 2, + 'oldest': 10 * _MIN + } +} + +BG_MONITOR_LAST_SUCCESSFULLY_RUN_CONFIG = { + 'anon_export': { + 'last_run_successful_in': 32 * _DAY + }, + 'article_cleanup_sync': { + 'last_run_successful_in': 33 * _DAY + }, + 'async_workflow_notifications': { + 'last_run_successful_in': _WEEK + _DAY + }, + 'check_latest_es_backup': { + 'last_run_successful_in': _DAY + _HOUR + }, + 'datalog_journal_added_update': { + 'last_run_successful_in': _HOUR + }, + 'find_discontinued_soon': { + 'last_run_successful_in': _DAY + _HOUR + }, + 'harvest': { + 'last_run_successful_in': _DAY + _HOUR + }, + 'journal_csv': { + 'last_run_successful_in': 2 * _HOUR + }, + 'monitor_bgjobs': { + 'last_run_successful_in': _WEEK + _DAY + }, + 'old_data_cleanup': { + 'last_run_successful_in': 32 * _DAY + }, + 'prune_es_backups': { + 'last_run_successful_in': _DAY + _HOUR + }, + 'public_data_dump': { + 'last_run_successful_in': 32 * _DAY + }, + 'read_news': { + 'last_run_successful_in': 2 * _HOUR + }, + 'reporting': { + 'last_run_successful_in': 32 * _DAY + }, + 'request_es_backup': { + 'last_run_successful_in': _DAY + _HOUR + }, + 'sitemap': { + 'last_run_successful_in': _DAY + _HOUR } } diff --git 
a/portality/tasks/anon_export.py b/portality/tasks/anon_export.py index 6b23589d2c..1b06ddd42b 100644 --- a/portality/tasks/anon_export.py +++ b/portality/tasks/anon_export.py @@ -13,7 +13,7 @@ from portality.lib.dataobj import DataStructureException from portality.store import StoreFactory from portality.tasks.helpers import background_helper -from portality.tasks.redis_huey import long_running +from portality.tasks.redis_huey import scheduled_long_queue as queue def _anonymise_email(record): @@ -182,7 +182,7 @@ def submit(cls, background_job): anon_export.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10)) -huey_helper = AnonExportBackgroundTask.create_huey_helper(long_running) +huey_helper = AnonExportBackgroundTask.create_huey_helper(queue) @huey_helper.register_schedule diff --git a/portality/tasks/application_autochecks.py b/portality/tasks/application_autochecks.py index 812fc7683b..aeeb8080e8 100644 --- a/portality/tasks/application_autochecks.py +++ b/portality/tasks/application_autochecks.py @@ -2,7 +2,7 @@ from portality import models from portality.background import BackgroundTask, BackgroundApi, BackgroundException from portality.tasks.helpers import background_helper -from portality.tasks.redis_huey import main_queue +from portality.tasks.redis_huey import events_queue as queue from portality.bll import DOAJ @@ -88,7 +88,7 @@ def submit(cls, background_job): application_autochecks.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10)) -huey_helper = ApplicationAutochecks.create_huey_helper(main_queue) +huey_helper = ApplicationAutochecks.create_huey_helper(queue) @huey_helper.register_execute(is_load_config=True) diff --git a/portality/tasks/article_bulk_create.py b/portality/tasks/article_bulk_create.py index 3066ecaefe..8ffaf7d84e 100644 --- a/portality/tasks/article_bulk_create.py +++ b/portality/tasks/article_bulk_create.py @@ -9,7 +9,7 @@ from portality.lib import dataobj from 
portality.models.uploads import BulkArticles from portality.tasks.helpers import background_helper, articles_upload_helper -from portality.tasks.redis_huey import main_queue +from portality.tasks.redis_huey import events_queue as queue def get_upload_dir_path() -> Path: @@ -78,7 +78,7 @@ def submit(cls, background_job): background_helper.submit_by_background_job(background_job, article_bulk_create) -huey_helper = ArticleBulkCreateBackgroundTask.create_huey_helper(main_queue) +huey_helper = ArticleBulkCreateBackgroundTask.create_huey_helper(queue) @huey_helper.register_execute(is_load_config=False) diff --git a/portality/tasks/article_bulk_delete.py b/portality/tasks/article_bulk_delete.py index b45433fd83..7d56add360 100644 --- a/portality/tasks/article_bulk_delete.py +++ b/portality/tasks/article_bulk_delete.py @@ -6,7 +6,7 @@ from portality import models from portality.core import app from portality.background import AdminBackgroundTask, BackgroundApi, BackgroundException, BackgroundSummary -from portality.tasks.redis_huey import main_queue +from portality.tasks.redis_huey import events_queue as queue from portality.util import batch_up @@ -127,7 +127,7 @@ def submit(cls, background_job): article_bulk_delete.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10)) -huey_helper = ArticleBulkDeleteBackgroundTask.create_huey_helper(main_queue) +huey_helper = ArticleBulkDeleteBackgroundTask.create_huey_helper(queue) @huey_helper.register_execute(is_load_config=False) diff --git a/portality/tasks/article_cleanup_sync.py b/portality/tasks/article_cleanup_sync.py index 8a17a857db..805fc1c38a 100644 --- a/portality/tasks/article_cleanup_sync.py +++ b/portality/tasks/article_cleanup_sync.py @@ -11,7 +11,7 @@ from portality.background import BackgroundTask, BackgroundApi, BackgroundException from portality.core import app from portality.tasks.helpers import background_helper -from portality.tasks.redis_huey import long_running +from 
portality.tasks.redis_huey import scheduled_long_queue as queue class ArticleCleanupSyncBackgroundTask(BackgroundTask): @@ -216,7 +216,7 @@ def submit(cls, background_job): article_cleanup_sync.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10)) -huey_helper = ArticleCleanupSyncBackgroundTask.create_huey_helper(long_running) +huey_helper = ArticleCleanupSyncBackgroundTask.create_huey_helper(queue) @huey_helper.register_schedule diff --git a/portality/tasks/article_duplicate_report.py b/portality/tasks/article_duplicate_report.py index 7c4afcc4e1..2c8a37e6f5 100644 --- a/portality/tasks/article_duplicate_report.py +++ b/portality/tasks/article_duplicate_report.py @@ -12,7 +12,7 @@ from portality.bll.doaj import DOAJ from portality.core import app, es_connection from portality.lib import dates -from portality.tasks.redis_huey import long_running +from portality.tasks.redis_huey import scheduled_long_queue as queue class ArticleDuplicateReportBackgroundTask(BackgroundTask): @@ -286,7 +286,7 @@ def submit(cls, background_job): article_duplicate_report.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10)) -huey_helper = ArticleDuplicateReportBackgroundTask.create_huey_helper(long_running) +huey_helper = ArticleDuplicateReportBackgroundTask.create_huey_helper(queue) ''' @long_running.periodic_task(schedule("article_duplicate_report")) diff --git a/portality/tasks/async_workflow_notifications.py b/portality/tasks/async_workflow_notifications.py index af0bde8e8e..b98e85ae39 100644 --- a/portality/tasks/async_workflow_notifications.py +++ b/portality/tasks/async_workflow_notifications.py @@ -10,7 +10,7 @@ from portality.lib import dates from portality.lib.dates import FMT_DATETIME_STD from portality.tasks.helpers import background_helper -from portality.tasks.redis_huey import main_queue +from portality.tasks.redis_huey import scheduled_short_queue as queue, schedule from portality.ui import templates @@ -432,7 
+432,7 @@ def submit(cls, background_job): async_workflow_notifications.schedule(args=(background_job.id,), delay=10) -huey_helper = AsyncWorkflowBackgroundTask.create_huey_helper(main_queue) +huey_helper = AsyncWorkflowBackgroundTask.create_huey_helper(queue) @huey_helper.register_schedule diff --git a/portality/tasks/check_latest_es_backup.py b/portality/tasks/check_latest_es_backup.py index 63b15bd8f3..698c6a7683 100644 --- a/portality/tasks/check_latest_es_backup.py +++ b/portality/tasks/check_latest_es_backup.py @@ -3,7 +3,7 @@ from portality.core import app, es_connection from portality.lib.es_snapshot import ESSnapshotsClient from portality.tasks.helpers import background_helper -from portality.tasks.redis_huey import main_queue +from portality.tasks.redis_huey import scheduled_short_queue as queue class CheckLatestESBackupBackgroundTask(BackgroundTask): @@ -60,7 +60,7 @@ def submit(cls, background_job): check_latest_es_backup.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10)) -huey_helper = CheckLatestESBackupBackgroundTask.create_huey_helper(main_queue) +huey_helper = CheckLatestESBackupBackgroundTask.create_huey_helper(queue) @huey_helper.register_schedule diff --git a/portality/tasks/consumer_events_queue.py b/portality/tasks/consumer_events_queue.py new file mode 100644 index 0000000000..0308bc3661 --- /dev/null +++ b/portality/tasks/consumer_events_queue.py @@ -0,0 +1,26 @@ +# NOTE: this file is designed to be imported by Huey, the background job processor +# It changes the logging configuration. If it's imported anywhere else in the app, +# it will change the logging configuration for the entire app. 
+import logging +from portality.core import app + +logging.getLogger("requests").setLevel(logging.WARNING) +logging.getLogger("urllib3").setLevel(logging.WARNING) + +# import the queues +from portality.tasks.redis_huey import events_queue # noqa + +# now import the tasks that will bind to those queues + +from portality.tasks.article_bulk_create import article_bulk_create # noqa +from portality.tasks.article_bulk_delete import article_bulk_delete # noqa +from portality.tasks.ingestarticles import ingest_articles # noqa +from portality.tasks.journal_bulk_delete import journal_bulk_delete # noqa +from portality.tasks.journal_bulk_edit import journal_bulk_edit # noqa +from portality.tasks.preservation import preserve # noqa +from portality.tasks.journal_in_out_doaj import set_in_doaj # noqa +from portality.tasks.suggestion_bulk_edit import suggestion_bulk_edit # noqa + +# Conditionally enable new application autochecking +if app.config.get("AUTOCHECK_INCOMING", False): + from portality.tasks.application_autochecks import application_autochecks diff --git a/portality/tasks/consumer_scheduled_long_queue.py b/portality/tasks/consumer_scheduled_long_queue.py new file mode 100644 index 0000000000..f04b4a11c2 --- /dev/null +++ b/portality/tasks/consumer_scheduled_long_queue.py @@ -0,0 +1,17 @@ +# NOTE: this file is designed to be imported by Huey, the background job processor +# It changes the logging configuration. If it's imported anywhere else in the app, +# it will change the logging configuration for the entire app. 
+import logging + +logging.getLogger("requests").setLevel(logging.WARNING) +logging.getLogger("urllib3").setLevel(logging.WARNING) + +# import the queues +from portality.tasks.redis_huey import scheduled_long_queue # noqa + + +from portality.tasks.anon_export import scheduled_anon_export, anon_export # noqa +from portality.tasks.article_cleanup_sync import scheduled_article_cleanup_sync, article_cleanup_sync # noqa +from portality.tasks.harvester import scheduled_harvest # noqa +from portality.tasks.public_data_dump import scheduled_public_data_dump, public_data_dump # noqa +from portality.tasks.sitemap import scheduled_sitemap, generate_sitemap # noqa diff --git a/portality/tasks/consumer_scheduled_short_queue.py b/portality/tasks/consumer_scheduled_short_queue.py new file mode 100644 index 0000000000..bae97df34e --- /dev/null +++ b/portality/tasks/consumer_scheduled_short_queue.py @@ -0,0 +1,24 @@ +# NOTE: this file is designed to be imported by Huey, the background job processor +# It changes the logging configuration. If it's imported anywhere else in the app, +# it will change the logging configuration for the entire app. 
+import logging +from portality.core import app + +logging.getLogger("requests").setLevel(logging.WARNING) +logging.getLogger("urllib3").setLevel(logging.WARNING) + +# import the queues +from portality.tasks.redis_huey import scheduled_short_queue # noqa + +# now import the tasks that will bind to those queues +from portality.tasks.async_workflow_notifications import async_workflow_notifications # noqa +from portality.tasks.check_latest_es_backup import scheduled_check_latest_es_backup, check_latest_es_backup # noqa +from portality.tasks.datalog_journal_added_update import scheduled_datalog_journal_added_update, datalog_journal_added_update # noqa +from portality.tasks.find_discontinued_soon import scheduled_find_discontinued_soon, find_discontinued_soon # noqa +from portality.tasks.journal_csv import scheduled_journal_csv, journal_csv # noqa +from portality.tasks.monitor_bgjobs import scheduled_monitor_bgjobs, monitor_bgjobs # noqa +from portality.tasks.old_data_cleanup import scheduled_old_data_cleanup, old_data_cleanup # noqa +from portality.tasks.prune_es_backups import scheduled_prune_es_backups, prune_es_backups # noqa +from portality.tasks.read_news import scheduled_read_news, read_news # noqa +from portality.tasks.reporting import scheduled_reports, run_reports # noqa +from portality.tasks.request_es_backup import scheduled_request_es_backup, request_es_backup # noqa diff --git a/portality/tasks/datalog_journal_added_update.py b/portality/tasks/datalog_journal_added_update.py index 1d967b4ee3..a56259d509 100644 --- a/portality/tasks/datalog_journal_added_update.py +++ b/portality/tasks/datalog_journal_added_update.py @@ -25,7 +25,7 @@ from portality.models import Journal from portality.models.datalog_journal_added import DatalogJournalAdded from portality.tasks.helpers import background_helper -from portality.tasks.redis_huey import main_queue +from portality.tasks.redis_huey import scheduled_short_queue as queue log = logging.getLogger(__name__) @@ -242,7 
+242,7 @@ def submit(cls, background_job): background_helper.submit_by_background_job(background_job, datalog_journal_added_update) -huey_helper = DatalogJournalAddedUpdate.create_huey_helper(main_queue) +huey_helper = DatalogJournalAddedUpdate.create_huey_helper(queue) @huey_helper.register_schedule diff --git a/portality/tasks/find_discontinued_soon.py b/portality/tasks/find_discontinued_soon.py index 88e3669a2e..773bb3b5a8 100644 --- a/portality/tasks/find_discontinued_soon.py +++ b/portality/tasks/find_discontinued_soon.py @@ -3,7 +3,7 @@ from portality.lib import dates from portality import models -from portality.tasks.redis_huey import main_queue +from portality.tasks.redis_huey import scheduled_short_queue as queue from portality.background import BackgroundTask, BackgroundApi from portality.tasks.helpers import background_helper @@ -102,7 +102,7 @@ def submit(cls, background_job): find_discontinued_soon.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10)) -huey_helper = FindDiscontinuedSoonBackgroundTask.create_huey_helper(main_queue) +huey_helper = FindDiscontinuedSoonBackgroundTask.create_huey_helper(queue) @huey_helper.register_schedule diff --git a/portality/tasks/harvester.py b/portality/tasks/harvester.py index 6a4d6f07f5..8776f0e3c7 100644 --- a/portality/tasks/harvester.py +++ b/portality/tasks/harvester.py @@ -8,7 +8,7 @@ from portality.store import StoreFactory from portality.tasks.harvester_helpers import workflow from portality.tasks.helpers import background_helper -from portality.tasks.redis_huey import long_running +from portality.tasks.redis_huey import scheduled_long_queue as queue class BGHarvesterLogger(object): @@ -111,7 +111,7 @@ def only_me(self): return False -huey_helper = HarvesterBackgroundTask.create_huey_helper(long_running) +huey_helper = HarvesterBackgroundTask.create_huey_helper(queue) @huey_helper.register_schedule diff --git a/portality/tasks/helpers/background_helper.py 
b/portality/tasks/helpers/background_helper.py index 92ea036741..6b507b8926 100644 --- a/portality/tasks/helpers/background_helper.py +++ b/portality/tasks/helpers/background_helper.py @@ -12,7 +12,7 @@ from portality.background import BackgroundApi, BackgroundTask from portality.core import app from portality.decorators import write_required -from portality.tasks.redis_huey import long_running, main_queue, configure, schedule +from portality.tasks.redis_huey import long_running, main_queue, events_queue, scheduled_long_queue, scheduled_short_queue, configure, schedule TaskFactory = Callable[[models.BackgroundJob], BackgroundTask] _queue_for_action = None @@ -25,6 +25,12 @@ def get_queue_id_by_task_queue(task_queue: RedisHuey): return constants.BGJOB_QUEUE_ID_LONG elif task_queue.name == main_queue.name: return constants.BGJOB_QUEUE_ID_MAIN + elif task_queue.name == events_queue.name: + return constants.BGJOB_QUEUE_ID_EVENTS + elif task_queue.name == scheduled_long_queue.name: + return constants.BGJOB_QUEUE_ID_SCHEDULED_LONG + elif task_queue.name == scheduled_short_queue.name: + return constants.BGJOB_QUEUE_ID_SCHEDULED_SHORT else: app.logger.warning(f'unknown task_queue[{task_queue}]') return constants.BGJOB_QUEUE_ID_UNKNOWN diff --git a/portality/tasks/ingestarticles.py b/portality/tasks/ingestarticles.py index 998f95fa83..d610bda26d 100644 --- a/portality/tasks/ingestarticles.py +++ b/portality/tasks/ingestarticles.py @@ -13,7 +13,7 @@ from portality.core import app from portality.lib import plugin from portality.tasks.helpers import background_helper, articles_upload_helper -from portality.tasks.redis_huey import main_queue +from portality.tasks.redis_huey import events_queue as queue from portality.ui.messages import Messages DEFAULT_MAX_REMOTE_SIZE = 262144000 @@ -505,7 +505,7 @@ def __fail(record, previous, error): return __fail(record, previous, error="please check it before submitting again; " + str(e)) -huey_helper = 
IngestArticlesBackgroundTask.create_huey_helper(main_queue) +huey_helper = IngestArticlesBackgroundTask.create_huey_helper(queue) @huey_helper.register_execute(is_load_config=True) diff --git a/portality/tasks/journal_autochecks.py b/portality/tasks/journal_autochecks.py index ceaa0db715..53bf912438 100644 --- a/portality/tasks/journal_autochecks.py +++ b/portality/tasks/journal_autochecks.py @@ -2,7 +2,7 @@ from portality.core import app from portality.background import BackgroundTask, BackgroundApi, BackgroundException from portality.tasks.helpers import background_helper -from portality.tasks.redis_huey import long_running +from portality.tasks.redis_huey import events_queue as queue from portality.bll import DOAJ ####################################### @@ -83,7 +83,7 @@ def submit(cls, background_job): journal_autochecks.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10)) -huey_helper = JournalAutochecks.create_huey_helper(long_running) +huey_helper = JournalAutochecks.create_huey_helper(queue) @huey_helper.register_execute(is_load_config=True) diff --git a/portality/tasks/journal_bulk_delete.py b/portality/tasks/journal_bulk_delete.py index c9b67a7199..23a996f027 100644 --- a/portality/tasks/journal_bulk_delete.py +++ b/portality/tasks/journal_bulk_delete.py @@ -7,7 +7,7 @@ from portality.background import AdminBackgroundTask, BackgroundApi, BackgroundException, BackgroundSummary from portality.bll import DOAJ from portality.core import app -from portality.tasks.redis_huey import main_queue +from portality.tasks.redis_huey import events_queue as queue from portality.ui.messages import Messages @@ -153,7 +153,7 @@ def submit(cls, background_job): journal_bulk_delete.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10)) -huey_helper = JournalBulkDeleteBackgroundTask.create_huey_helper(main_queue) +huey_helper = JournalBulkDeleteBackgroundTask.create_huey_helper(queue) 
@huey_helper.register_execute(is_load_config=False) diff --git a/portality/tasks/journal_bulk_edit.py b/portality/tasks/journal_bulk_edit.py index 26347c168a..7747d25fa7 100644 --- a/portality/tasks/journal_bulk_edit.py +++ b/portality/tasks/journal_bulk_edit.py @@ -10,7 +10,7 @@ from portality.forms.application_forms import JournalFormFactory from portality.lib import dates -from portality.tasks.redis_huey import main_queue +from portality.tasks.redis_huey import events_queue as queue from portality.background import AdminBackgroundTask, BackgroundApi, BackgroundException, BackgroundSummary @@ -238,7 +238,7 @@ def submit(cls, background_job): journal_bulk_edit.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10)) -huey_helper = JournalBulkEditBackgroundTask.create_huey_helper(main_queue) +huey_helper = JournalBulkEditBackgroundTask.create_huey_helper(queue) @huey_helper.register_execute(is_load_config=False) diff --git a/portality/tasks/journal_csv.py b/portality/tasks/journal_csv.py index 198efd5158..0813a1e6e8 100644 --- a/portality/tasks/journal_csv.py +++ b/portality/tasks/journal_csv.py @@ -3,7 +3,7 @@ from portality.bll.doaj import DOAJ from portality.core import app from portality.tasks.helpers import background_helper -from portality.tasks.redis_huey import main_queue +from portality.tasks.redis_huey import scheduled_short_queue as queue class JournalCSVBackgroundTask(BackgroundTask): @@ -65,7 +65,7 @@ def submit(cls, background_job): journal_csv.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10)) -huey_helper = JournalCSVBackgroundTask.create_huey_helper(main_queue) +huey_helper = JournalCSVBackgroundTask.create_huey_helper(queue) @huey_helper.register_schedule diff --git a/portality/tasks/journal_in_out_doaj.py b/portality/tasks/journal_in_out_doaj.py index 2fa8bb80d6..868038a98b 100644 --- a/portality/tasks/journal_in_out_doaj.py +++ b/portality/tasks/journal_in_out_doaj.py @@ -3,7 +3,7 @@ from 
portality.bll import DOAJ from portality.core import app -from portality.tasks.redis_huey import main_queue +from portality.tasks.redis_huey import events_queue as queue from portality.decorators import write_required from portality.background import BackgroundTask, BackgroundApi, BackgroundSummary @@ -143,7 +143,7 @@ def submit(cls, background_job): set_in_doaj.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10)) -huey_helper = SetInDOAJBackgroundTask.create_huey_helper(main_queue) +huey_helper = SetInDOAJBackgroundTask.create_huey_helper(queue) @huey_helper.register_execute(is_load_config=False) diff --git a/portality/tasks/monitor_bgjobs.py b/portality/tasks/monitor_bgjobs.py index 2220faf2de..ee4f474d50 100644 --- a/portality/tasks/monitor_bgjobs.py +++ b/portality/tasks/monitor_bgjobs.py @@ -4,7 +4,7 @@ from portality.background import BackgroundTask from portality.core import app from portality.tasks.helpers import background_helper -from portality.tasks.redis_huey import long_running +from portality.tasks.redis_huey import scheduled_short_queue as queue def get_system_email(): @@ -78,7 +78,7 @@ def submit(cls, background_job): background_helper.submit_by_background_job(background_job, monitor_bgjobs) -huey_helper = MonitorBgjobsBackgroundTask.create_huey_helper(long_running) +huey_helper = MonitorBgjobsBackgroundTask.create_huey_helper(queue) @huey_helper.register_schedule diff --git a/portality/tasks/old_data_cleanup.py b/portality/tasks/old_data_cleanup.py index 219feffcd5..38a1cb0833 100644 --- a/portality/tasks/old_data_cleanup.py +++ b/portality/tasks/old_data_cleanup.py @@ -8,7 +8,7 @@ from portality.lib.es_queries import ES_DATETIME_FMT from portality.models import Notification, BackgroundJob from portality.tasks.helpers import background_helper -from portality.tasks.redis_huey import long_running +from portality.tasks.redis_huey import scheduled_short_queue as queue class RetentionQuery: @@ -83,7 +83,7 @@ def submit(cls, 
background_job): ) -huey_helper = OldDataCleanupBackgroundTask.create_huey_helper(long_running) +huey_helper = OldDataCleanupBackgroundTask.create_huey_helper(queue) @huey_helper.register_schedule diff --git a/portality/tasks/preservation.py b/portality/tasks/preservation.py index c89f2d1b92..68426a41a4 100644 --- a/portality/tasks/preservation.py +++ b/portality/tasks/preservation.py @@ -17,7 +17,7 @@ from portality.models import Account, Article, BackgroundJob, PreservationState from portality.regex import DOI_COMPILED, HTTP_URL_COMPILED from portality.tasks.helpers import background_helper -from portality.tasks.redis_huey import main_queue +from portality.tasks.redis_huey import events_queue as queue class PreservationException(Exception): @@ -425,7 +425,7 @@ def submit(cls, background_job): preserve.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10)) -huey_helper = PreservationBackgroundTask.create_huey_helper(main_queue) +huey_helper = PreservationBackgroundTask.create_huey_helper(queue) @huey_helper.register_execute(is_load_config=True) diff --git a/portality/tasks/prune_es_backups.py b/portality/tasks/prune_es_backups.py index 5c08bdfafc..83e498328d 100644 --- a/portality/tasks/prune_es_backups.py +++ b/portality/tasks/prune_es_backups.py @@ -7,7 +7,7 @@ from portality.core import app, es_connection from portality.lib import dates from portality.tasks.helpers import background_helper -from portality.tasks.redis_huey import long_running +from portality.tasks.redis_huey import scheduled_short_queue as queue class PruneESBackupsBackgroundTask(BackgroundTask): @@ -72,7 +72,7 @@ def submit(cls, background_job): prune_es_backups.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10)) -huey_helper = PruneESBackupsBackgroundTask.create_huey_helper(long_running) +huey_helper = PruneESBackupsBackgroundTask.create_huey_helper(queue) @huey_helper.register_schedule diff --git a/portality/tasks/public_data_dump.py 
b/portality/tasks/public_data_dump.py index 1b6d94fd7d..756847589f 100644 --- a/portality/tasks/public_data_dump.py +++ b/portality/tasks/public_data_dump.py @@ -10,7 +10,7 @@ from portality.models import cache from portality.store import StoreFactory from portality.tasks.helpers import background_helper -from portality.tasks.redis_huey import long_running +from portality.tasks.redis_huey import scheduled_long_queue as queue class PublicDataDumpBackgroundTask(BackgroundTask): @@ -247,7 +247,7 @@ def submit(cls, background_job): public_data_dump.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10)) -huey_helper = PublicDataDumpBackgroundTask.create_huey_helper(long_running) +huey_helper = PublicDataDumpBackgroundTask.create_huey_helper(queue) @huey_helper.register_schedule diff --git a/portality/tasks/read_news.py b/portality/tasks/read_news.py index 1aadc594f1..0bf83c8099 100644 --- a/portality/tasks/read_news.py +++ b/portality/tasks/read_news.py @@ -4,7 +4,7 @@ from portality.background import BackgroundTask, BackgroundApi from portality.core import app from portality.tasks.helpers import background_helper -from portality.tasks.redis_huey import main_queue +from portality.tasks.redis_huey import scheduled_short_queue as queue class FeedError(Exception): @@ -95,7 +95,7 @@ def save_entry(entry): news.save() -huey_helper = ReadNewsBackgroundTask.create_huey_helper(main_queue) +huey_helper = ReadNewsBackgroundTask.create_huey_helper(queue) @huey_helper.register_schedule diff --git a/portality/tasks/redis_huey.py b/portality/tasks/redis_huey.py index cb369e977c..c0b5d70dce 100644 --- a/portality/tasks/redis_huey.py +++ b/portality/tasks/redis_huey.py @@ -2,15 +2,33 @@ from portality.core import app # every-day background jobs that take a few minutes each (like, bulk deletes and anything else requested by the user) +# DEPRECATED main_queue = RedisHuey('doaj_main_queue', host=app.config['REDIS_HOST'], port=app.config['REDIS_PORT'], 
immediate=app.config.get("HUEY_IMMEDIATE", False)) # jobs that might take a long time, like the harvester or the anon export, which can run for several hours +# DEPRECATED long_running = RedisHuey('doaj_long_running', host=app.config['REDIS_HOST'], port=app.config['REDIS_PORT'], immediate=app.config.get("HUEY_IMMEDIATE", False)) + +# short jobs to be run on demand from within the application +events_queue = RedisHuey('doaj_events_queue', + host=app.config['REDIS_HOST'], port=app.config['REDIS_PORT'], + immediate=app.config.get("HUEY_IMMEDIATE", app.config.get("HUEY_EAGER", False))) + +# scheduled jobs that can run for several hours each +scheduled_long_queue = RedisHuey('doaj_scheduled_long_queue', + host=app.config['REDIS_HOST'], port=app.config['REDIS_PORT'], + immediate=app.config.get("HUEY_IMMEDIATE", app.config.get("HUEY_EAGER", False))) + +# scheduled jobs that will typically run within a few minutes +scheduled_short_queue = RedisHuey('doaj_scheduled_short_queue', + host=app.config['REDIS_HOST'], port=app.config['REDIS_PORT'], + immediate=app.config.get("HUEY_IMMEDIATE", app.config.get("HUEY_EAGER", False))) + """ we put everything we want to be responsive onto the main_queue, and anything that would disrupt the main_queue by taking too diff --git a/portality/tasks/reporting.py b/portality/tasks/reporting.py index bcb8c22f3f..e5149e91b4 100644 --- a/portality/tasks/reporting.py +++ b/portality/tasks/reporting.py @@ -11,7 +11,7 @@ from portality.lib import dates from portality.lib.dates import DEFAULT_TIMESTAMP_VAL, FMT_DATE_STD, FMT_DATE_YM, FMT_YEAR, FMT_DATETIME_STD from portality.tasks.helpers import background_helper -from portality.tasks.redis_huey import main_queue +from portality.tasks.redis_huey import scheduled_short_queue as queue def provenance_reports(fr, to, outdir): @@ -411,7 +411,7 @@ def submit(cls, background_job): run_reports.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10)) -huey_helper = ReportingBackgroundTask.create_huey_helper(main_queue) +huey_helper = ReportingBackgroundTask.create_huey_helper(queue) 
@huey_helper.register_schedule diff --git a/portality/tasks/request_es_backup.py b/portality/tasks/request_es_backup.py index 1d80f15878..d26214583a 100644 --- a/portality/tasks/request_es_backup.py +++ b/portality/tasks/request_es_backup.py @@ -4,7 +4,7 @@ from portality.background import BackgroundTask, BackgroundApi from portality.core import app, es_connection from portality.tasks.helpers import background_helper -from portality.tasks.redis_huey import main_queue +from portality.tasks.redis_huey import scheduled_short_queue as queue class RequestESBackupBackgroundTask(BackgroundTask): @@ -69,7 +69,7 @@ def submit(cls, background_job): request_es_backup.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10)) -huey_helper = RequestESBackupBackgroundTask.create_huey_helper(main_queue) +huey_helper = RequestESBackupBackgroundTask.create_huey_helper(queue) @huey_helper.register_schedule diff --git a/portality/tasks/sitemap.py b/portality/tasks/sitemap.py index 204af33e65..fe61577205 100644 --- a/portality/tasks/sitemap.py +++ b/portality/tasks/sitemap.py @@ -3,7 +3,7 @@ from portality.bll.doaj import DOAJ from portality.core import app from portality.tasks.helpers import background_helper -from portality.tasks.redis_huey import main_queue +from portality.tasks.redis_huey import scheduled_long_queue as queue class SitemapBackgroundTask(BackgroundTask): @@ -64,7 +64,7 @@ def submit(cls, background_job): generate_sitemap.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10)) -huey_helper = SitemapBackgroundTask.create_huey_helper(main_queue) +huey_helper = SitemapBackgroundTask.create_huey_helper(queue) @huey_helper.register_schedule diff --git a/portality/tasks/suggestion_bulk_edit.py b/portality/tasks/suggestion_bulk_edit.py index 0fc2b583ba..91a8a2f2ec 100644 --- a/portality/tasks/suggestion_bulk_edit.py +++ b/portality/tasks/suggestion_bulk_edit.py @@ -9,7 +9,7 @@ from portality.forms.application_forms import 
ApplicationFormFactory from portality.lib import dates from portality.lib.formulaic import FormulaicException -from portality.tasks.redis_huey import main_queue +from portality.tasks.redis_huey import events_queue as queue def suggestion_manage(selection_query, dry_run=True, editor_group='', note='', application_status=''): @@ -202,7 +202,7 @@ def submit(cls, background_job): suggestion_bulk_edit.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10)) -huey_helper = SuggestionBulkEditBackgroundTask.create_huey_helper(main_queue) +huey_helper = SuggestionBulkEditBackgroundTask.create_huey_helper(queue) @huey_helper.register_execute(is_load_config=False)