Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: index new groups in OpenSearch #73

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions invenio_users_resources/records/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
from invenio_accounts.models import Role, User
from invenio_accounts.proxies import current_db_change_history

from ..services.groups.tasks import reindex_group, unindex_group
from ..services.users.tasks import reindex_user, unindex_user
from ..proxies import current_groups_service, current_users_service
from ..services.groups.tasks import reindex_groups, unindex_groups
from ..services.users.tasks import reindex_users, unindex_users


def pre_commit(sender, session):
Expand Down Expand Up @@ -50,15 +51,18 @@ def post_commit(sender, session):
# DB operations are allowed here, not even lazy-loading of
# properties!
sid = id(session)

if current_db_change_history.sessions.get(sid):
for user_id in current_db_change_history.sessions[sid].updated_users:
reindex_user.delay(user_id)
# Handle updates
current_session = list(current_db_change_history.sessions[sid].updated_users)
current_users_service.indexer.bulk_index(current_session)

for role_id in current_db_change_history.sessions[sid].updated_roles:
reindex_group.delay(role_id)
group_ids_updated = list(current_db_change_history.sessions[sid].updated_roles)
current_groups_service.indexer.bulk_index(group_ids_updated)

for user_id in current_db_change_history.sessions[sid].deleted_users:
unindex_user.delay(user_id)
# Handle deletes
user_ids_deleted = list(current_db_change_history.sessions[sid].deleted_users)
current_users_service.indexer.bulk_delete(user_ids_deleted)

for role_id in current_db_change_history.sessions[sid].deleted_roles:
unindex_group.delay(role_id)
group_ids_deleted = list(current_db_change_history.sessions[sid].deleted_roles)
current_groups_service.indexer.bulk_delete(group_ids_deleted)
18 changes: 8 additions & 10 deletions invenio_users_resources/services/groups/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,22 @@


@shared_task(ignore_result=True)
def reindex_group(role_id):
"""Reindex the given user."""
def reindex_groups(group_ids):
"""Reindex the given groups."""
index = current_groups_service.record_cls.index
if current_groups_service.indexer.exists(index):
try:
group_agg = GroupAggregate.get_record(role_id)
current_groups_service.indexer.index(group_agg)
current_groups_service.indexer.bulk_index(group_ids)
except search.exceptions.ConflictError as e:
current_app.logger.warn(f"Could not reindex group {role_id}: {e}")
current_app.logger.warn(f"Could not bulk-reindex groups: {e}")


@shared_task(ignore_result=True)
def unindex_group(role_id):
"""Unindex the given role/group."""
def unindex_groups(group_ids):
"""Unindex the given groups."""
index = current_groups_service.record_cls.index
if current_groups_service.indexer.exists(index):
try:
group_agg = GroupAggregate.get_record(role_id)
current_groups_service.indexer.delete(group_agg)
current_groups_service.indexer.bulk_delete(group_ids)
except search.exceptions.ConflictError as e:
current_app.logger.warn(f"Could not unindex group {role_id}: {e}")
current_app.logger.warn(f"Could not bulk-unindex groups: {e}")
28 changes: 19 additions & 9 deletions invenio_users_resources/services/users/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,38 @@


@shared_task(ignore_result=True)
def reindex_user(user_id):
def reindex_users(user_ids):
"""Reindex the given user."""
index = current_users_service.record_cls.index
if current_users_service.indexer.exists(index):
try:
user_agg = UserAggregate.get_record(user_id)
current_users_service.indexer.index(user_agg)
user_agg = {
user_id: UserAggregate.get_record(user_id) for user_id in user_ids
}
current_users_service.indexer.bulk_index(user_ids)

# trigger reindexing of related records
send_change_notifications(
TLGINO marked this conversation as resolved.
Show resolved Hide resolved
"users", [(user_agg.id, str(user_agg.id), user_agg.revision_id)]
"users",
[
(
user_agg[user_id].id,
str(user_agg[user_id].id),
user_agg[user_id].revision_id,
)
for user_id in user_ids
],
Comment on lines +35 to +42
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: this might end up sending very large lists/messages to a Celery task (which might be problematic depending on some message size limits on RabbitMQ). Do we have an estimate of how many user_ids we might be getting?

In any case, maybe the solution is to loop over user_ids and send things one-by-one.

)
except search.exceptions.ConflictError as e:
current_app.logger.warn(f"Could not reindex user {user_id}: {e}")
current_app.logger.warn(f"Could not bulk-reindex users: {e}")


@shared_task(ignore_result=True)
def unindex_user(user_id):
def unindex_users(user_ids):
"""Delete the given user from the index."""
index = current_users_service.record_cls.index
if current_users_service.indexer.exists(index):
try:
user_agg = UserAggregate.get_record(user_id)
current_users_service.indexer.delete(user_agg)
current_users_service.indexer.bulk_delete(user_ids)
except search.exceptions.ConflictError as e:
current_app.logger.warn(f"Could not unindex user {user_id}: {e}")
current_app.logger.warn(f"Could not bulk-unindex users: {e}")
12 changes: 5 additions & 7 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
current_groups_service,
current_users_service,
)
from invenio_users_resources.records import GroupAggregate, UserAggregate
from invenio_users_resources.records import GroupAggregate
from invenio_users_resources.services.schemas import (
NotificationPreferences,
UserPreferencesSchema,
Expand Down Expand Up @@ -209,7 +209,8 @@ def users(UserFixture, app, database, users_data):
)
u.create(app, database)
users[obj["username"]] = u
UserAggregate.index.refresh()
current_users_service.indexer.process_bulk_queue()
current_users_service.record_cls.index.refresh()
return users


Expand All @@ -233,8 +234,6 @@ def group(database):
is_managed=True,
database=database,
)

GroupAggregate.index.refresh()
return r


Expand All @@ -248,8 +247,6 @@ def group2(database):
is_managed=True,
database=database,
)

GroupAggregate.index.refresh()
return r


Expand All @@ -258,7 +255,8 @@ def groups(database, group, group2):
"""A single group."""
roles = [group, group2]

GroupAggregate.index.refresh()
current_groups_service.indexer.process_bulk_queue()
current_groups_service.record_cls.index.refresh()
return roles


Expand Down