From 0e3a405332a8bcb7984c339a7d74e78312d7c9d5 Mon Sep 17 00:00:00 2001 From: Martin Lettry Date: Sat, 6 May 2023 12:00:47 +0200 Subject: [PATCH] wip: index new groups in OpenSearch * changed number of workers per celery task based on task type * closes https://github.com/inveniosoftware/invenio-app-rdm/issues/2186 --- invenio_users_resources/records/hooks.py | 21 ++++------ .../services/groups/tasks.py | 34 ++++++++------- .../services/users/tasks.py | 42 ++++++++++--------- 3 files changed, 49 insertions(+), 48 deletions(-) diff --git a/invenio_users_resources/records/hooks.py b/invenio_users_resources/records/hooks.py index c45af91..21aede0 100644 --- a/invenio_users_resources/records/hooks.py +++ b/invenio_users_resources/records/hooks.py @@ -50,18 +50,15 @@ def post_commit(sender, session): # DB operations are allowed here, not even lazy-loading of # properties! sid = id(session) - if current_db_change_history.sessions.get(sid): - for user_id in current_db_change_history.sessions[sid].updated_users: - reindex_user.delay(user_id) - for role_id in current_db_change_history.sessions[sid].updated_roles: - reindex_group.delay(role_id) + reindex_user_roles = current_db_change_history.updated_users[sid] + reindex_group_roles = current_db_change_history.updated_roles[sid] + unindex_user_roles = current_db_change_history.deleted_users[sid] + unindex_group_roles = current_db_change_history.deleted_roles[sid] - for user_id in current_db_change_history.sessions[sid].deleted_users: - unindex_user.delay(user_id) + reindex_user.delay(reindex_user_roles) + reindex_group.delay(reindex_group_roles) + unindex_user.delay(unindex_user_roles) + unindex_group.delay(unindex_group_roles) - for role_id in current_db_change_history.sessions[sid].deleted_roles: - unindex_group.delay(role_id) - - current_db_change_history.sessions[sid].indexed = True - current_db_change_history.clear_session(sid) + current_db_change_history._clear_dirty_sets(session) diff --git a/invenio_users_resources/services/groups/tasks.py b/invenio_users_resources/services/groups/tasks.py index 78b379b..c705551 100644 --- a/invenio_users_resources/services/groups/tasks.py +++ b/invenio_users_resources/services/groups/tasks.py @@ -18,24 +18,26 @@ @shared_task(ignore_result=True) -def reindex_group(role_id): +def reindex_group(role_ids): """Reindex the given user.""" - index = current_groups_service.record_cls.index - if current_groups_service.indexer.exists(index): - try: - group_agg = GroupAggregate.get_record(role_id) - current_groups_service.indexer.index(group_agg) - except search.exceptions.ConflictError as e: - current_app.logger.warn(f"Could not reindex group {role_id}: {e}") + for role_id in role_ids: + index = current_groups_service.record_cls.index + if current_groups_service.indexer.exists(index): + try: + group_agg = GroupAggregate.get_record(role_id) + current_groups_service.indexer.index(group_agg) + except search.exceptions.ConflictError as e: + current_app.logger.warn(f"Could not reindex group {role_id}: {e}") @shared_task(ignore_result=True) -def unindex_group(role_id): +def unindex_group(role_ids): """Unindex the given role/group.""" - index = current_groups_service.record_cls.index - if current_groups_service.indexer.exists(index): - try: - group_agg = GroupAggregate.get_record(role_id) - current_groups_service.indexer.delete(group_agg) - except search.exceptions.ConflictError as e: - current_app.logger.warn(f"Could not unindex group {role_id}: {e}") + for role_id in role_ids: + index = current_groups_service.record_cls.index + if current_groups_service.indexer.exists(index): + try: + group_agg = GroupAggregate.get_record(role_id) + current_groups_service.indexer.delete(group_agg) + except search.exceptions.ConflictError as e: + current_app.logger.warn(f"Could not unindex group {role_id}: {e}") diff --git a/invenio_users_resources/services/users/tasks.py b/invenio_users_resources/services/users/tasks.py index a6be922..1e976bc 100644 --- a/invenio_users_resources/services/users/tasks.py +++ b/invenio_users_resources/services/users/tasks.py @@ -19,28 +19,30 @@ @shared_task(ignore_result=True) -def reindex_user(user_id): +def reindex_user(user_ids): """Reindex the given user.""" - index = current_users_service.record_cls.index - if current_users_service.indexer.exists(index): - try: - user_agg = UserAggregate.get_record(user_id) - current_users_service.indexer.index(user_agg) - # trigger reindexing of related records - send_change_notifications( - "users", [(user_agg.id, str(user_agg.id), user_agg.revision_id)] - ) - except search.exceptions.ConflictError as e: - current_app.logger.warn(f"Could not reindex user {user_id}: {e}") + for user_id in user_ids: + index = current_users_service.record_cls.index + if current_users_service.indexer.exists(index): + try: + user_agg = UserAggregate.get_record(user_id) + current_users_service.indexer.index(user_agg) + # trigger reindexing of related records + send_change_notifications( + "users", [(user_agg.id, str(user_agg.id), user_agg.revision_id)] + ) + except search.exceptions.ConflictError as e: + current_app.logger.warn(f"Could not reindex user {user_id}: {e}") @shared_task(ignore_result=True) -def unindex_user(user_id): +def unindex_user(user_ids): """Delete the given user from the index.""" - index = current_users_service.record_cls.index - if current_users_service.indexer.exists(index): - try: - user_agg = UserAggregate.get_record(user_id) - current_users_service.indexer.delete(user_agg) - except search.exceptions.ConflictError as e: - current_app.logger.warn(f"Could not unindex user {user_id}: {e}") + for user_id in user_ids: + index = current_users_service.record_cls.index + if current_users_service.indexer.exists(index): + try: + user_agg = UserAggregate.get_record(user_id) + current_users_service.indexer.delete(user_agg) + except search.exceptions.ConflictError as e: + current_app.logger.warn(f"Could not unindex user {user_id}: {e}")