From 15c864e68207477a49ab0c5c74dc66582cf1ba79 Mon Sep 17 00:00:00 2001 From: Martin Lettry Date: Sat, 6 May 2023 12:00:47 +0200 Subject: [PATCH] wip: index new groups in OpenSearch * changed number of workers per celery task based on task type * closes https://github.com/inveniosoftware/invenio-users-resources/issues/76 --- invenio_users_resources/records/hooks.py | 20 ++++++++-------- .../services/groups/tasks.py | 14 +++++------ .../services/users/tasks.py | 24 ++++++++++++------- 3 files changed, 31 insertions(+), 27 deletions(-) diff --git a/invenio_users_resources/records/hooks.py b/invenio_users_resources/records/hooks.py index c45af91..27731ff 100644 --- a/invenio_users_resources/records/hooks.py +++ b/invenio_users_resources/records/hooks.py @@ -50,18 +50,18 @@ def post_commit(sender, session): # DB operations are allowed here, not even lazy-loading of # properties! sid = id(session) + if current_db_change_history.sessions.get(sid): - for user_id in current_db_change_history.sessions[sid].updated_users: - reindex_user.delay(user_id) + user_ids_updated = current_db_change_history.sessions[sid].updated_users + reindex_user.delay(user_ids_updated) - for role_id in current_db_change_history.sessions[sid].updated_roles: - reindex_group.delay(role_id) + role_ids_updated = current_db_change_history.sessions[sid].updated_roles + reindex_group.delay(role_ids_updated) - for user_id in current_db_change_history.sessions[sid].deleted_users: - unindex_user.delay(user_id) + # ------------------- - for role_id in current_db_change_history.sessions[sid].deleted_roles: - unindex_group.delay(role_id) + user_ids_deleted = current_db_change_history.sessions[sid].deleted_users + unindex_user.delay(user_ids_deleted) - current_db_change_history.sessions[sid].indexed = True - current_db_change_history.clear_session(sid) + role_ids_deleted = current_db_change_history.sessions[sid].deleted_roles + unindex_group.delay(role_ids_deleted) diff --git a/invenio_users_resources/services/groups/tasks.py b/invenio_users_resources/services/groups/tasks.py index 78b379b..f2527cc 100644 --- a/invenio_users_resources/services/groups/tasks.py +++ b/invenio_users_resources/services/groups/tasks.py @@ -18,24 +18,22 @@ @shared_task(ignore_result=True) -def reindex_group(role_id): +def reindex_group(role_ids): """Reindex the given user.""" index = current_groups_service.record_cls.index if current_groups_service.indexer.exists(index): try: - group_agg = GroupAggregate.get_record(role_id) - current_groups_service.indexer.index(group_agg) + current_groups_service.indexer.bulk_index(role_ids) except search.exceptions.ConflictError as e: - current_app.logger.warn(f"Could not reindex group {role_id}: {e}") + current_app.logger.warn(f"Could not bulk-reindex groups: {e}") @shared_task(ignore_result=True) -def unindex_group(role_id): +def unindex_group(role_ids): """Unindex the given role/group.""" index = current_groups_service.record_cls.index if current_groups_service.indexer.exists(index): try: - group_agg = GroupAggregate.get_record(role_id) - current_groups_service.indexer.delete(group_agg) + current_groups_service.indexer.bulk_delete(role_ids) except search.exceptions.ConflictError as e: - current_app.logger.warn(f"Could not unindex group {role_id}: {e}") + current_app.logger.warn(f"Could not bulk-unindex groups: {e}") diff --git a/invenio_users_resources/services/users/tasks.py b/invenio_users_resources/services/users/tasks.py index a6be922..4608bf7 100644 --- a/invenio_users_resources/services/users/tasks.py +++ b/invenio_users_resources/services/users/tasks.py @@ -19,28 +19,34 @@ @shared_task(ignore_result=True) -def reindex_user(user_id): +def reindex_user(user_ids): """Reindex the given user.""" index = current_users_service.record_cls.index if current_users_service.indexer.exists(index): try: - user_agg = UserAggregate.get_record(user_id) - current_users_service.indexer.index(user_agg) + user_agg = { + user_id: UserAggregate.get_record(user_id) for user_id in user_ids + } + current_users_service.indexer.bulk_index(user_ids) + # trigger reindexing of related records send_change_notifications( - "users", [(user_agg.id, str(user_agg.id), user_agg.revision_id)] + "users", + [ + (user_id, str(user_id), user_agg[user_id].revision_id) + for user_id in user_ids + ], ) except search.exceptions.ConflictError as e: - current_app.logger.warn(f"Could not reindex user {user_id}: {e}") + current_app.logger.warn(f"Could not bulk-reindex users: {e}") @shared_task(ignore_result=True) -def unindex_user(user_id): +def unindex_user(user_ids): """Delete the given user from the index.""" index = current_users_service.record_cls.index if current_users_service.indexer.exists(index): try: - user_agg = UserAggregate.get_record(user_id) - current_users_service.indexer.delete(user_agg) + current_users_service.indexer.bulk_delete(user_ids) except search.exceptions.ConflictError as e: - current_app.logger.warn(f"Could not unindex user {user_id}: {e}") + current_app.logger.warn(f"Could not bulk-unindex users: {e}")