Skip to content

Commit

Permalink
indexing: index groups and users in OpenSearch
Browse files Browse the repository at this point in the history
 * fixed tests
 * use bulk indexing instead of iterative single indexing
 * closes #76
  • Loading branch information
TLGINO committed Jun 27, 2023
1 parent 81b226d commit 0a8a9cc
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 32 deletions.
24 changes: 14 additions & 10 deletions invenio_users_resources/records/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
from invenio_accounts.models import Role, User
from invenio_accounts.proxies import current_db_change_history

from ..services.groups.tasks import reindex_group, unindex_group
from ..services.users.tasks import reindex_user, unindex_user
from ..proxies import current_groups_service, current_users_service
from ..services.groups.tasks import reindex_groups, unindex_groups
from ..services.users.tasks import reindex_users, unindex_users


def pre_commit(sender, session):
Expand Down Expand Up @@ -50,15 +51,18 @@ def post_commit(sender, session):
# DB operations are allowed here, not even lazy-loading of
# properties!
sid = id(session)

if current_db_change_history.sessions.get(sid):
for user_id in current_db_change_history.sessions[sid].updated_users:
reindex_user.delay(user_id)
# Handle updates
current_session = list(current_db_change_history.sessions[sid].updated_users)
current_users_service.indexer.bulk_index(current_session)

for role_id in current_db_change_history.sessions[sid].updated_roles:
reindex_group.delay(role_id)
role_ids_updated = list(current_db_change_history.sessions[sid].updated_roles)
current_groups_service.indexer.bulk_index(role_ids_updated)

for user_id in current_db_change_history.sessions[sid].deleted_users:
unindex_user.delay(user_id)
# Handle deletes
user_ids_deleted = list(current_db_change_history.sessions[sid].deleted_users)
current_users_service.indexer.bulk_delete(user_ids_deleted)

for role_id in current_db_change_history.sessions[sid].deleted_roles:
unindex_group.delay(role_id)
role_ids_deleted = list(current_db_change_history.sessions[sid].deleted_roles)
current_groups_service.indexer.bulk_delete(role_ids_deleted)
14 changes: 6 additions & 8 deletions invenio_users_resources/services/groups/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,22 @@


@shared_task(ignore_result=True)
def reindex_group(role_id):
def reindex_groups(role_ids):
"""Reindex the given user."""
index = current_groups_service.record_cls.index
if current_groups_service.indexer.exists(index):
try:
group_agg = GroupAggregate.get_record(role_id)
current_groups_service.indexer.index(group_agg)
current_groups_service.indexer.bulk_index(role_ids)
except search.exceptions.ConflictError as e:
current_app.logger.warn(f"Could not reindex group {role_id}: {e}")
current_app.logger.warn(f"Could not bulk-reindex groups: {e}")


@shared_task(ignore_result=True)
def unindex_group(role_id):
def unindex_groups(role_ids):
"""Unindex the given role/group."""
index = current_groups_service.record_cls.index
if current_groups_service.indexer.exists(index):
try:
group_agg = GroupAggregate.get_record(role_id)
current_groups_service.indexer.delete(group_agg)
current_groups_service.indexer.bulk_delete(role_ids)
except search.exceptions.ConflictError as e:
current_app.logger.warn(f"Could not unindex group {role_id}: {e}")
current_app.logger.warn(f"Could not bulk-unindex groups: {e}")
28 changes: 19 additions & 9 deletions invenio_users_resources/services/users/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,38 @@


@shared_task(ignore_result=True)
def reindex_user(user_id):
def reindex_users(user_ids):
"""Reindex the given user."""
index = current_users_service.record_cls.index
if current_users_service.indexer.exists(index):
try:
user_agg = UserAggregate.get_record(user_id)
current_users_service.indexer.index(user_agg)
user_agg = {
user_id: UserAggregate.get_record(user_id) for user_id in user_ids
}
current_users_service.indexer.bulk_index(user_ids)

# trigger reindexing of related records
send_change_notifications(
"users", [(user_agg.id, str(user_agg.id), user_agg.revision_id)]
"users",
[
(
user_agg[user_id].id,
str(user_agg[user_id].id),
user_agg[user_id].revision_id,
)
for user_id in user_ids
],
)
except search.exceptions.ConflictError as e:
current_app.logger.warn(f"Could not reindex user {user_id}: {e}")
current_app.logger.warn(f"Could not bulk-reindex users: {e}")


@shared_task(ignore_result=True)
def unindex_user(user_id):
def unindex_users(user_ids):
"""Delete the given user from the index."""
index = current_users_service.record_cls.index
if current_users_service.indexer.exists(index):
try:
user_agg = UserAggregate.get_record(user_id)
current_users_service.indexer.delete(user_agg)
current_users_service.indexer.bulk_delete(user_ids)
except search.exceptions.ConflictError as e:
current_app.logger.warn(f"Could not unindex user {user_id}: {e}")
current_app.logger.warn(f"Could not bulk-unindex users: {e}")
12 changes: 7 additions & 5 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
current_groups_service,
current_users_service,
)
from invenio_users_resources.records import GroupAggregate, UserAggregate
from invenio_users_resources.records import GroupAggregate
from invenio_users_resources.services.schemas import (
NotificationPreferences,
UserPreferencesSchema,
Expand Down Expand Up @@ -209,7 +209,8 @@ def users(UserFixture, app, database, users_data):
)
u.create(app, database)
users[obj["username"]] = u
UserAggregate.index.refresh()
current_users_service.indexer.process_bulk_queue()
current_users_service.record_cls.index.refresh()
return users


Expand All @@ -234,7 +235,7 @@ def group(database):
database=database,
)

GroupAggregate.index.refresh()
# GroupAggregate.index.refresh()
return r


Expand All @@ -249,7 +250,7 @@ def group2(database):
database=database,
)

GroupAggregate.index.refresh()
# GroupAggregate.index.refresh()
return r


Expand All @@ -258,7 +259,8 @@ def groups(database, group, group2):
"""A single group."""
roles = [group, group2]

GroupAggregate.index.refresh()
current_groups_service.indexer.process_bulk_queue()
current_groups_service.record_cls.index.refresh()
return roles


Expand Down

0 comments on commit 0a8a9cc

Please sign in to comment.