From 430e5fcedc6be57ffac4649c84ae822c3dba04a8 Mon Sep 17 00:00:00 2001 From: Lars Holm Nielsen Date: Tue, 17 Oct 2023 22:37:55 +0200 Subject: [PATCH] resources: add support for user impersonation --- .../resources/users/config.py | 1 + .../resources/users/resource.py | 21 +++++++++++---- .../services/permissions.py | 1 + .../services/users/service.py | 9 +++++++ tests/resources/test_resources_users.py | 26 +++++++++++++++++++ 5 files changed, 53 insertions(+), 5 deletions(-) diff --git a/invenio_users_resources/resources/users/config.py b/invenio_users_resources/resources/users/config.py index d28fbeb..1356f60 100644 --- a/invenio_users_resources/resources/users/config.py +++ b/invenio_users_resources/resources/users/config.py @@ -42,6 +42,7 @@ class UsersResourceConfig(RecordResourceConfig): "block": "//block", "restore": "//restore", "deactivate": "//deactivate", + "impersonate": "//impersonate", } request_view_args = { diff --git a/invenio_users_resources/resources/users/resource.py b/invenio_users_resources/resources/users/resource.py index 2052896..cb491a6 100644 --- a/invenio_users_resources/resources/users/resource.py +++ b/invenio_users_resources/resources/users/resource.py @@ -10,9 +10,9 @@ """Users resource.""" - from flask import g, send_file from flask_resources import resource_requestctx, response_handler, route +from flask_security import impersonate_user from invenio_records_resources.resources import RecordResource from invenio_records_resources.resources.records.resource import ( request_search_args, @@ -42,6 +42,7 @@ def create_url_rules(self): route("POST", routes["block"], self.block), route("POST", routes["restore"], self.restore), route("POST", routes["deactivate"], self.deactivate), + route("POST", routes["impersonate"], self.impersonate), route("GET", routes["moderation_search"], self.search_all), ] @@ -98,7 +99,7 @@ def avatar(self): @request_view_args def approve(self): - """Read a user.""" + """Approve user.""" self.service.approve( id_=resource_requestctx.view_args["id"], identity=g.identity, @@ -107,7 +108,7 @@ def approve(self): @request_view_args def block(self): - """Read a user.""" + """Block user.""" self.service.block( id_=resource_requestctx.view_args["id"], identity=g.identity, @@ -116,7 +117,7 @@ def block(self): @request_view_args def restore(self): - """Read a user.""" + """Restore user.""" self.service.restore( id_=resource_requestctx.view_args["id"], identity=g.identity, @@ -125,9 +126,19 @@ def restore(self): @request_view_args def deactivate(self): - """Read a user.""" + """Deactive user.""" self.service.deactivate( id_=resource_requestctx.view_args["id"], identity=g.identity, ) return "", 200 + + @request_view_args + def impersonate(self): + """Impersonate the user.""" + user = self.service.can_impersonate( + g.identity, resource_requestctx.view_args["id"] + ) + if user: + impersonate_user(user, g.identity) + return "", 200 diff --git a/invenio_users_resources/services/permissions.py b/invenio_users_resources/services/permissions.py index 49abdb9..a1eac30 100644 --- a/invenio_users_resources/services/permissions.py +++ b/invenio_users_resources/services/permissions.py @@ -47,6 +47,7 @@ class UsersPermissionPolicy(BasePermissionPolicy): can_manage = [UserManager, SystemProcess()] can_search_all = [UserManager, SystemProcess()] can_read_system_details = [UserManager, SystemProcess()] + can_impersonate = [UserManager, SystemProcess()] class GroupsPermissionPolicy(BasePermissionPolicy): diff --git a/invenio_users_resources/services/users/service.py b/invenio_users_resources/services/users/service.py index 39fe8ea..3ec22c8 100644 --- a/invenio_users_resources/services/users/service.py +++ b/invenio_users_resources/services/users/service.py @@ -241,3 +241,12 @@ def deactivate(self, identity, id_, uow=None): uow.register(RecordIndexOp(user, indexer=self.indexer, index_refresh=True)) return True + + def can_impersonate(self, identity, id_): + """Check permissions if a user can be impersonated.""" + user = UserAggregate.get_record(id_) + if user is None: + # return 403 even on empty resource due to security implications + raise PermissionDeniedError() + self.require_permission(identity, "impersonate", record=user) + return user.model.model_obj diff --git a/tests/resources/test_resources_users.py b/tests/resources/test_resources_users.py index 259f41b..2d79d37 100644 --- a/tests/resources/test_resources_users.py +++ b/tests/resources/test_resources_users.py @@ -143,6 +143,32 @@ def test_management_permissions(client, headers, user_pub, db): assert res.status_code == 403 +def test_impersonate_user(client, headers, user_pub, user_moderator, db): + """Tests user impersonation endpoint.""" + client = user_moderator.login(client) + res = client.get(f"/users/{user_moderator.id}") + assert res.status_code == 200 + assert res.json["is_current_user"] == True + res = client.get(f"/users/{user_pub.id}") + assert res.status_code == 200 + assert res.json["is_current_user"] == False + + res = client.post(f"/users/{user_pub.id}/impersonate", headers=headers) + assert res.status_code == 200 + + res = client.get(f"/users/{user_pub.id}") + assert res.status_code == 200 + assert res.json["is_current_user"] == True + + res = client.get(f"/users/{user_moderator.id}") + assert res.status_code == 403 + + res = client.post(f"/users/{user_moderator.id}/impersonate", headers=headers) + assert res.status_code == 403 + res = client.post(f"/users/{user_pub.id}/impersonate", headers=headers) + assert res.status_code == 403 + + # TODO: test conditional requests # TODO: test caching headers # TODO: test invalid identifiers