diff --git a/invenio_users_resources/records/hooks.py b/invenio_users_resources/records/hooks.py index bcfd20a..f162f6e 100644 --- a/invenio_users_resources/records/hooks.py +++ b/invenio_users_resources/records/hooks.py @@ -12,8 +12,9 @@ from invenio_accounts.models import Role, User from invenio_accounts.proxies import current_db_change_history -from ..services.groups.tasks import reindex_group, unindex_group -from ..services.users.tasks import reindex_user, unindex_user +from ..proxies import current_groups_service, current_users_service +from ..services.groups.tasks import reindex_groups, unindex_groups +from ..services.users.tasks import reindex_users, unindex_users def pre_commit(sender, session): @@ -50,15 +51,18 @@ def post_commit(sender, session): # DB operations are allowed here, not even lazy-loading of # properties! sid = id(session) + if current_db_change_history.sessions.get(sid): - for user_id in current_db_change_history.sessions[sid].updated_users: - reindex_user.delay(user_id) + # Handle updates + current_session = list(current_db_change_history.sessions[sid].updated_users) + current_users_service.indexer.bulk_index(current_session) - for role_id in current_db_change_history.sessions[sid].updated_roles: - reindex_group.delay(role_id) + role_ids_updated = list(current_db_change_history.sessions[sid].updated_roles) + current_groups_service.indexer.bulk_index(role_ids_updated) - for user_id in current_db_change_history.sessions[sid].deleted_users: - unindex_user.delay(user_id) + # Handle deletes + user_ids_deleted = list(current_db_change_history.sessions[sid].deleted_users) + current_users_service.indexer.bulk_delete(user_ids_deleted) - for role_id in current_db_change_history.sessions[sid].deleted_roles: - unindex_group.delay(role_id) + role_ids_deleted = list(current_db_change_history.sessions[sid].deleted_roles) + current_groups_service.indexer.bulk_delete(role_ids_deleted) diff --git a/invenio_users_resources/services/groups/tasks.py b/invenio_users_resources/services/groups/tasks.py index 78b379b..797d797 100644 --- a/invenio_users_resources/services/groups/tasks.py +++ b/invenio_users_resources/services/groups/tasks.py @@ -18,24 +18,22 @@ @shared_task(ignore_result=True) -def reindex_group(role_id): +def reindex_groups(role_ids): """Reindex the given user.""" index = current_groups_service.record_cls.index if current_groups_service.indexer.exists(index): try: - group_agg = GroupAggregate.get_record(role_id) - current_groups_service.indexer.index(group_agg) + current_groups_service.indexer.bulk_index(role_ids) except search.exceptions.ConflictError as e: - current_app.logger.warn(f"Could not reindex group {role_id}: {e}") + current_app.logger.warn(f"Could not bulk-reindex groups: {e}") @shared_task(ignore_result=True) -def unindex_group(role_id): +def unindex_groups(role_ids): """Unindex the given role/group.""" index = current_groups_service.record_cls.index if current_groups_service.indexer.exists(index): try: - group_agg = GroupAggregate.get_record(role_id) - current_groups_service.indexer.delete(group_agg) + current_groups_service.indexer.bulk_delete(role_ids) except search.exceptions.ConflictError as e: - current_app.logger.warn(f"Could not unindex group {role_id}: {e}") + current_app.logger.warn(f"Could not bulk-unindex groups: {e}") diff --git a/invenio_users_resources/services/users/tasks.py b/invenio_users_resources/services/users/tasks.py index a6be922..324911f 100644 --- a/invenio_users_resources/services/users/tasks.py +++ b/invenio_users_resources/services/users/tasks.py @@ -19,28 +19,38 @@ @shared_task(ignore_result=True) -def reindex_user(user_id): +def reindex_users(user_ids): """Reindex the given user.""" index = current_users_service.record_cls.index if current_users_service.indexer.exists(index): try: - user_agg = UserAggregate.get_record(user_id) - current_users_service.indexer.index(user_agg) + user_agg = { + user_id: UserAggregate.get_record(user_id) for user_id in user_ids + } + current_users_service.indexer.bulk_index(user_ids) + # trigger reindexing of related records send_change_notifications( - "users", [(user_agg.id, str(user_agg.id), user_agg.revision_id)] + "users", + [ + ( + user_agg[user_id].id, + str(user_agg[user_id].id), + user_agg[user_id].revision_id, + ) + for user_id in user_ids + ], ) except search.exceptions.ConflictError as e: - current_app.logger.warn(f"Could not reindex user {user_id}: {e}") + current_app.logger.warn(f"Could not bulk-reindex users: {e}") @shared_task(ignore_result=True) -def unindex_user(user_id): +def unindex_users(user_ids): """Delete the given user from the index.""" index = current_users_service.record_cls.index if current_users_service.indexer.exists(index): try: - user_agg = UserAggregate.get_record(user_id) - current_users_service.indexer.delete(user_agg) + current_users_service.indexer.bulk_delete(user_ids) except search.exceptions.ConflictError as e: - current_app.logger.warn(f"Could not unindex user {user_id}: {e}") + current_app.logger.warn(f"Could not bulk-unindex users: {e}") diff --git a/tests/conftest.py b/tests/conftest.py index f01eeca..d636efb 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -26,7 +26,7 @@ current_groups_service, current_users_service, ) -from invenio_users_resources.records import GroupAggregate, UserAggregate +from invenio_users_resources.records import GroupAggregate from invenio_users_resources.services.schemas import ( NotificationPreferences, UserPreferencesSchema, @@ -209,7 +209,8 @@ def users(UserFixture, app, database, users_data): ) u.create(app, database) users[obj["username"]] = u - UserAggregate.index.refresh() + current_users_service.indexer.process_bulk_queue() + current_users_service.record_cls.index.refresh() return users @@ -234,7 +235,7 @@ def group(database): database=database, ) - GroupAggregate.index.refresh() + # GroupAggregate.index.refresh() return r @@ -249,7 +250,7 @@ def group2(database): database=database, ) - GroupAggregate.index.refresh() + # GroupAggregate.index.refresh() return r @@ -258,7 +259,8 @@ def groups(database, group, group2): """A single group.""" roles = [group, group2] - GroupAggregate.index.refresh() + current_groups_service.indexer.process_bulk_queue() + current_groups_service.record_cls.index.refresh() return roles