From 49babf68d8f9d6384171f97676df58b653b01613 Mon Sep 17 00:00:00 2001 From: Nicolas Harraudeau Date: Thu, 3 Dec 2015 16:14:33 +0100 Subject: [PATCH] views: access control * NEW Adds customizable access control to record views. Allow configuring different permissions per endpoint. (reference #14) Signed-off-by: Nicolas Harraudeau --- .gitignore | 3 + examples/app.py | 53 +++++++-- invenio_records_rest/ext.py | 61 +++++++++- invenio_records_rest/views.py | 106 +++++++++++++++++- setup.py | 5 +- tests/__init__.py | 25 +++++ tests/conftest.py | 98 +++++++++++++++- tests/permissions.py | 75 +++++++++++++ tests/test_invenio_records_rest.py | 172 +++++++++++++++++++++++++++++ 9 files changed, 576 insertions(+), 22 deletions(-) create mode 100644 tests/__init__.py create mode 100644 tests/permissions.py diff --git a/.gitignore b/.gitignore index ba746605..59bb2282 100644 --- a/.gitignore +++ b/.gitignore @@ -55,3 +55,6 @@ docs/_build/ # PyBuilder target/ + +# Examples instance directories +examples/*_instance diff --git a/examples/app.py b/examples/app.py index c8762e40..9ccd8ed6 100644 --- a/examples/app.py +++ b/examples/app.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # # This file is part of Invenio. -# Copyright (C) 2015 CERN. +# Copyright (C) 2015, 2016 CERN. # # Invenio is free software; you can redistribute it # and/or modify it under the terms of the GNU General Public License as @@ -25,19 +25,35 @@ """Minimal Flask application example for development. +For this example the access control is disabled. + Run example development server: .. code-block:: console - $ cd examples - $ flask -a app.py db init - $ flask -a app.py db create - $ flask -a app.py load_fixture - $ flask -a app.py --debug run -""" + $ cd examples + $ flask -a app.py db init + $ flask -a app.py db create + $ flask -a app.py fixture records + $ flask -a app.py --debug run + +Try to get some records + +.. code-block:: console + + $ curl -XGET http://localhost:5000/records/1 + $ curl -XGET http://localhost:5000/records/2 + $ curl -XGET http://localhost:5000/records/3 + $ curl -XGET http://localhost:5000/records/4 + $ curl -XGET http://localhost:5000/records/5 + $ curl -XGET http://localhost:5000/records/6 + $ curl -XGET http://localhost:5000/records/7 +""" # noqa from __future__ import absolute_import, print_function +import os + from flask import Flask from flask_celeryext import FlaskCeleryExt from flask_cli import FlaskCLI @@ -48,13 +64,25 @@ from invenio_records_rest import InvenioRecordsREST + +# create application's instance directory. Needed for this example only. +current_dir = os.path.dirname(os.path.realpath(__file__)) +instance_dir = os.path.join(current_dir, 'app_instance') +if not os.path.exists(instance_dir): + os.makedirs(instance_dir) + # Create Flask application -app = Flask(__name__) +app = Flask(__name__, instance_path=instance_dir) app.config.update( CELERY_ALWAYS_EAGER=True, CELERY_CACHE_BACKEND="memory", CELERY_EAGER_PROPAGATES_EXCEPTIONS=True, CELERY_RESULT_BACKEND="cache", + # No permission checking + RECORDS_REST_DEFAULT_CREATE_PERMISSION_FACTORY=None, + RECORDS_REST_DEFAULT_READ_PERMISSION_FACTORY=None, + RECORDS_REST_DEFAULT_UPDATE_PERMISSION_FACTORY=None, + RECORDS_REST_DEFAULT_DELETE_PERMISSION_FACTORY=None, ) FlaskCLI(app) FlaskCeleryExt(app) @@ -65,8 +93,13 @@ InvenioRecordsREST(app) -@app.cli.command() -def load_fixture(): +@app.cli.group() +def fixtures(): + """Command for working with test data.""" + + +@fixtures.command() +def records(): """Load test data fixture.""" import uuid from invenio_records.api import Record diff --git a/invenio_records_rest/ext.py b/invenio_records_rest/ext.py index 2c16a986..cf091a1d 100644 --- a/invenio_records_rest/ext.py +++ b/invenio_records_rest/ext.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # # This file is part of Invenio. -# Copyright (C) 2015 CERN. +# Copyright (C) 2015, 2016 CERN. # # Invenio is free software; you can redistribute it # and/or modify it under the terms of the GNU General Public License as @@ -26,9 +26,66 @@ from __future__ import absolute_import, print_function +from werkzeug.utils import import_string, cached_property + from .views import create_blueprint +class _RecordRESTState(object): + """Record REST state.""" + + def __init__(self, app): + """Initialize state.""" + self.app = app + self._read_permission_factory = None + self._create_permission_factory = None + self._update_permission_factory = None + self._delete_permission_factory = None + + @cached_property + def read_permission_factory(self): + """Load default read permission factory.""" + if self._read_permission_factory is None: + imp = self.app.config[ + "RECORDS_REST_DEFAULT_READ_PERMISSION_FACTORY" + ] + self._read_permission_factory = import_string(imp) if imp else None + return self._read_permission_factory + + @cached_property + def create_permission_factory(self): + """Load default create permission factory.""" + if self._create_permission_factory is None: + imp = self.app.config[ + "RECORDS_REST_DEFAULT_CREATE_PERMISSION_FACTORY" + ] + self._create_permission_factory = import_string(imp) \ + if imp else None + return self._create_permission_factory + + @cached_property + def update_permission_factory(self): + """Load default update permission factory.""" + if self._update_permission_factory is None: + imp = self.app.config[ + "RECORDS_REST_DEFAULT_UPDATE_PERMISSION_FACTORY" + ] + self._update_permission_factory = import_string(imp) \ + if imp else None + return self._update_permission_factory + + @cached_property + def delete_permission_factory(self): + """Load default delete permission factory.""" + if self._delete_permission_factory is None: + imp = self.app.config[ + "RECORDS_REST_DEFAULT_DELETE_PERMISSION_FACTORY" + ] + self._delete_permission_factory = import_string(imp) \ + if imp else None + return self._delete_permission_factory + + class InvenioRecordsREST(object): """Invenio-Records-REST extension.""" @@ -44,7 +101,7 @@ def init_app(self, app): app.register_blueprint( create_blueprint(app.config["RECORDS_REST_ENDPOINTS"]) ) - app.extensions['invenio-records-rest'] = self + app.extensions['invenio-records-rest'] = _RecordRESTState(app) def init_config(self, app): """Initialize configuration.""" diff --git a/invenio_records_rest/views.py b/invenio_records_rest/views.py index b24e950f..6bb4b9a4 100644 --- a/invenio_records_rest/views.py +++ b/invenio_records_rest/views.py @@ -42,10 +42,16 @@ from jsonpatch import JsonPatchException, JsonPointerException from sqlalchemy.exc import SQLAlchemyError from werkzeug.routing import BuildError +from werkzeug.local import LocalProxy +from werkzeug.utils import import_string from .serializers import record_to_json_serializer +current_records_rest = LocalProxy( + lambda: current_app.extensions['invenio-records-rest']) + + def create_blueprint(endpoints): """Create Invenio-Records-REST blueprint.""" blueprint = Blueprint( @@ -62,12 +68,45 @@ def create_blueprint(endpoints): def create_url_rules(endpoint, list_route=None, item_route=None, - pid_type=None, pid_minter=None): - """Create Werkzeug URL rules.""" + pid_type=None, pid_minter=None, + read_permission_factory_imp=None, + create_permission_factory_imp=None, + update_permission_factory_imp=None, + delete_permission_factory_imp=None): + """Create Werkzeug URL rules. + + :param endpoint: Name of endpoint. + :param list_route: record listing URL route . Required. + :param item_route: record URL route (must include ```` pattern). + Required. + :param pid_type: Persistent identifier type for endpoint. Required. + :param template: Template to render. Defaults to + ``invenio_records_ui/detail.html``. + :param read_permission_factory: Import path to factory that creates a read + permission object for a given record. + :param create_permission_factory: Import path to factory that creates a + create permission object for a given record. + :param update_permission_factory: Import path to factory that creates a + update permission object for a given record. + :param delete_permission_factory: Import path to factory that creates a + delete permission object for a given record. + + :returns: a list of dictionaries with can each be passed as keywords + arguments to ``Blueprint.add_url_rule``. + """ assert list_route assert item_route assert pid_type + read_permission_factory = import_string(read_permission_factory_imp) \ + if read_permission_factory_imp else None + create_permission_factory = import_string(create_permission_factory_imp) \ + if create_permission_factory_imp else None + update_permission_factory = import_string(update_permission_factory_imp) \ + if update_permission_factory_imp else None + delete_permission_factory = import_string(delete_permission_factory_imp) \ + if delete_permission_factory_imp else None + resolver = Resolver(pid_type=pid_type, object_type='rec', getter=Record.get_record) @@ -77,10 +116,15 @@ def create_url_rules(endpoint, list_route=None, item_route=None, RecordsListResource.view_name.format(endpoint), resolver=resolver, minter_name=pid_minter, + read_permission_factory=read_permission_factory, + create_permission_factory=create_permission_factory, serializers=serializers) item_view = RecordResource.as_view( RecordResource.view_name.format(endpoint), resolver=resolver, + read_permission_factory=read_permission_factory, + update_permission_factory=update_permission_factory, + delete_permission_factory=delete_permission_factory, serializers=serializers) return [ @@ -129,20 +173,58 @@ def inner(self, pid_value, *args, **kwargs): }) abort(500) - return f(self, pid, record, *args, **kwargs) + return f(self, pid=pid, record=record, *args, **kwargs) return inner +def verify_record_permission(permission_factory, record): + """Check that the current user has the required permissions on record. + + :param permission_factory: permission factory used to check permissions. + :param record: record whose access is limited. + """ + # Note, cannot be done in one line due overloading of boolean + # operations permission object. + if not permission_factory(record).can(): + from flask_login import current_user + if not current_user.is_authenticated: + abort(401) + abort(403) + + +def need_record_permission(factory_name): + """Decorator checking that the user has the required permissions on record. + + :param factory_name: name of the factory to retrieve. + """ + def need_record_permission_builder(f): + @wraps(f) + def need_record_permission_decorator(self, record, *args, **kwargs): + permission_factory = ( + getattr(self, factory_name) or + getattr(current_records_rest, factory_name) + ) + if permission_factory: + verify_record_permission(permission_factory, record) + return f(self, record=record, *args, **kwargs) + return need_record_permission_decorator + return need_record_permission_builder + + class RecordsListResource(ContentNegotiatedMethodView): """Resource for records listing.""" view_name = '{0}_list' - def __init__(self, resolver=None, minter_name=None, **kwargs): + def __init__(self, resolver=None, minter_name=None, + read_permission_factory=None, create_permission_factory=None, + **kwargs): """Constructor.""" super(RecordsListResource, self).__init__(**kwargs) self.resolver = resolver self.minter = current_pidstore.minters[minter_name] + self.read_permission_factory = read_permission_factory + self.create_permission_factory = create_permission_factory def post(self, **kwargs): """Create a record. @@ -165,6 +247,12 @@ def post(self, **kwargs): # Create record record = Record.create(data, id_=record_uuid) + # Check permissions + permission_factory = self.create_permission_factory or \ + current_records_rest.create_permission_factory + if permission_factory: + verify_record_permission(permission_factory, record) + db.session.commit() except SQLAlchemyError: db.session.rollback() @@ -178,15 +266,21 @@ class RecordResource(ContentNegotiatedMethodView): view_name = '{0}_item' - def __init__(self, resolver=None, **kwargs): + def __init__(self, resolver=None, read_permission_factory=None, + update_permission_factory=None, + delete_permission_factory=None, **kwargs): """Constructor. :param resolver: Persistent identifier resolver instance. """ super(RecordResource, self).__init__(**kwargs) self.resolver = resolver + self.read_permission_factory = read_permission_factory + self.update_permission_factory = update_permission_factory + self.delete_permission_factory = delete_permission_factory @pass_record + @need_record_permission('read_permission_factory') def get(self, pid, record, **kwargs): """Get a record. @@ -199,6 +293,7 @@ def get(self, pid, record, **kwargs): @require_content_types('application/json-patch+json') @pass_record + @need_record_permission('update_permission_factory') def patch(self, pid, record, **kwargs): """Modify a record. @@ -226,6 +321,7 @@ def patch(self, pid, record, **kwargs): @require_content_types('application/json') @pass_record + @need_record_permission('update_permission_factory') def put(self, pid, record, **kwargs): """Replace a record. diff --git a/setup.py b/setup.py index ce2d879c..3459e119 100644 --- a/setup.py +++ b/setup.py @@ -37,6 +37,8 @@ 'check-manifest>=0.25', 'coverage>=4.0', 'invenio-db[all]>=1.0.0a6', + 'invenio-access>=1.0.0a3', + 'invenio-accounts>=1.0.0a6', 'isort>=4.2.2', 'pep257>=0.7.0', 'pytest-cache>=1.0', @@ -60,11 +62,10 @@ ] install_requires = [ - 'Flask>=0.10', 'Flask-CLI>=0.2.1', 'six>=1.10', 'invenio-rest>=1.0.0a3', - 'invenio-records>=1.0.0a4', + 'invenio-records>=1.0.0a7', 'invenio-pidstore>=1.0.0a2', ] diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..2ff51c92 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Invenio. +# Copyright (C) 2016 CERN. +# +# Invenio is free software; you can redistribute it +# and/or modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# Invenio is distributed in the hope that it will be +# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Invenio; if not, write to the +# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, +# MA 02111-1307, USA. +# +# In applying this license, CERN does not +# waive the privileges and immunities granted to it by virtue of its status +# as an Intergovernmental Organization or submit itself to any jurisdiction. + +"""Invenio-records-rest tests.""" diff --git a/tests/conftest.py b/tests/conftest.py index d36fe6d1..829e48d7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # # This file is part of Invenio. -# Copyright (C) 2015 CERN. +# Copyright (C) 2015, 2016 CERN. # # Invenio is free software; you can redistribute it # and/or modify it under the terms of the GNU General Public License as @@ -30,10 +30,15 @@ import os import shutil import tempfile +from contextlib import contextmanager import pytest -from flask import Flask +from flask import Flask, url_for from flask_cli import FlaskCLI +from flask_menu import Menu +from flask_security.utils import encrypt_password +from invenio_accounts import InvenioAccounts +from invenio_accounts.views import blueprint as accounts_blueprint from invenio_db import InvenioDB, db from invenio_pidstore import InvenioPIDStore from invenio_pidstore.resolver import Resolver @@ -43,8 +48,13 @@ from sqlalchemy_utils.functions import create_database, database_exists, \ drop_database +from invenio_access import InvenioAccess +from invenio_access.models import ActionUsers from invenio_records_rest import InvenioRecordsREST +from .permissions import records_create_all, \ + records_delete_all, records_read_all, records_update_all + @pytest.fixture() def app(request): @@ -56,7 +66,12 @@ def app(request): SERVER_NAME='localhost:5000', SQLALCHEMY_DATABASE_URI=os.environ.get( 'SQLALCHEMY_DATABASE_URI', 'sqlite:///test.db' - ) + ), + # No permission checking + RECORDS_REST_DEFAULT_CREATE_PERMISSION_FACTORY=None, + RECORDS_REST_DEFAULT_READ_PERMISSION_FACTORY=None, + RECORDS_REST_DEFAULT_UPDATE_PERMISSION_FACTORY=None, + RECORDS_REST_DEFAULT_DELETE_PERMISSION_FACTORY=None, ) FlaskCLI(app) InvenioDB(app) @@ -83,6 +98,83 @@ def finalize(): return app +@pytest.fixture() +def accounts(app): + app.config.update( + WTF_CSRF_ENABLED=False, + SECRET_KEY='CHANGEME', + SECURITY_PASSWORD_SALT='CHANGEME', + # conftest switches off permission checking, so re-enable it for this + # app. + RECORDS_REST_DEFAULT_CREATE_PERMISSION_FACTORY='tests.permissions:create_permission_factory', # noqa + RECORDS_REST_DEFAULT_READ_PERMISSION_FACTORY='tests.permissions:read_permission_factory', # noqa + RECORDS_REST_DEFAULT_UPDATE_PERMISSION_FACTORY='tests.permissions:update_permission_factory', # noqa + RECORDS_REST_DEFAULT_DELETE_PERMISSION_FACTORY='tests.permissions:delete_permission_factory', # noqa + ) + # FIXME: use OAuth authentication instead of UI authentication + Menu(app) + accounts = InvenioAccounts(app) + app.register_blueprint(accounts_blueprint) + InvenioAccess(app) + return accounts + + +@pytest.yield_fixture +def user_factory(app, accounts): + """Create a user which has all permissions on every records.""" + + password = '123456' + + with app.test_request_context(): + login_url = url_for('security.login') + + @contextmanager + def create_user(name): + """Create a user. + + Should be called in application context. + """ + class UserConfig(object): + def __init__(self, name): + self.email = '{}@invenio-software.org'.format(name) + self.user = accounts.datastore.create_user( + email=self.email, + password=encrypt_password(password), + active=True, + ) + + def login_function(self): + def login(client): + res = client.post(login_url, data={ + 'email': self.email, 'password': password}) + assert res.status_code == 302 + return login + + def create_access(self, allow, record_id=None): + db.session.add(ActionUsers( + action=records_create_all.value, argument=record_id, + user=self.user, exclude=not allow)) + + def read_access(self, allow, record_id=None): + db.session.add(ActionUsers( + action=records_read_all.value, argument=record_id, + user=self.user, exclude=not allow)) + + def update_access(self, allow, record_id=None): + db.session.add(ActionUsers( + action=records_update_all.value, argument=record_id, + user=self.user, exclude=not allow)) + + def delete_access(self, allow, record_id=None): + db.session.add(ActionUsers( + action=records_delete_all.value, argument=record_id, + user=self.user, exclude=not allow)) + + yield UserConfig(name) + + yield create_user + + @pytest.fixture(scope='session') def resolver(): """Create a persistent identifier resolver.""" diff --git a/tests/permissions.py b/tests/permissions.py new file mode 100644 index 00000000..05a6cff3 --- /dev/null +++ b/tests/permissions.py @@ -0,0 +1,75 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Invenio. +# Copyright (C) 2016 CERN. +# +# Invenio is free software; you can redistribute it +# and/or modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# Invenio is distributed in the hope that it will be +# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Invenio; if not, write to the +# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, +# MA 02111-1307, USA. +# +# In applying this license, CERN does not +# waive the privileges and immunities granted to it by virtue of its status +# as an Intergovernmental Organization or submit itself to any jurisdiction. + +"""Example Permissions for records.""" + +from functools import partial + +from invenio_access.permissions import DynamicPermission, \ + ParameterizedActionNeed + + +RecordReadActionNeed = partial(ParameterizedActionNeed, 'records-read') +"""Action need for reading a record.""" + +records_read_all = RecordReadActionNeed(None) +"""Read all records action need.""" + +RecordCreateActionNeed = partial(ParameterizedActionNeed, 'records-create') +"""Action need for creating a record.""" + +records_create_all = RecordCreateActionNeed(None) +"""Create all records action need.""" + +RecordUpdateActionNeed = partial(ParameterizedActionNeed, 'records-update') +"""Action need for updating a record.""" + +records_update_all = RecordUpdateActionNeed(None) +"""Update all records action need.""" + +RecordDeleteActionNeed = partial(ParameterizedActionNeed, 'records-delete') +"""Action need for deleting a record.""" + +records_delete_all = RecordDeleteActionNeed(None) +"""Delete all records action need.""" + + +def read_permission_factory(record): + """Factory for creating read permissions for records.""" + return DynamicPermission(RecordReadActionNeed(str(record.id))) + + +def create_permission_factory(record): + """Factory for creating create permissions for records.""" + return DynamicPermission(RecordCreateActionNeed(str(record.id))) + + +def update_permission_factory(record): + """Factory for creating update permissions for records.""" + return DynamicPermission(RecordUpdateActionNeed(str(record.id))) + + +def delete_permission_factory(record): + """Factory for creating delete permissions for records.""" + return DynamicPermission(RecordDeleteActionNeed(str(record.id))) diff --git a/tests/test_invenio_records_rest.py b/tests/test_invenio_records_rest.py index 1efb1fa1..46bff930 100644 --- a/tests/test_invenio_records_rest.py +++ b/tests/test_invenio_records_rest.py @@ -361,6 +361,178 @@ def test_invalid_put(app): assert res.status_code == 400 +def test_create_permissions(app, user_factory, resolver): + with app.app_context(): + # create users + with user_factory('allowed') as allowed_user, \ + user_factory('forbidden') as forbidden_user: + # create one user allowed to create records + allowed_user.create_access(True) + allowed_login = allowed_user.login_function() + # create one user who is not allowed to create records + forbidden_user.create_access(False) + forbidden_login = forbidden_user.login_function() + db.session.commit() + + headers = [('Content-Type', 'application/json'), + ('Accept', 'application/json')] + # test create without being authenticated + with app.test_client() as client: + res = client.post(url_for('invenio_records_rest.recid_list'), + data=json.dumps(test_data), + headers=headers) + assert res.status_code == 401 + # test not allowed create + with app.test_client() as client: + forbidden_login(client) + res = client.post(url_for('invenio_records_rest.recid_list'), + data=json.dumps(test_data), + headers=headers) + assert res.status_code == 403 + # test allowed create + with app.test_client() as client: + allowed_login(client) + res = client.post(url_for('invenio_records_rest.recid_list'), + data=json.dumps(test_data), + headers=headers) + assert res.status_code == 201 + # check that the returned record matches the given data + response_data = json.loads(res.get_data(as_text=True)) + pid, internal_record = resolver.resolve(response_data['id']) + assert internal_record == response_data['metadata'] + + +def test_read_one_permissions(app, user_factory, resolver): + with app.app_context(): + # create the record using the internal API + pid, internal_record = create_record(test_data) + with user_factory('allowed') as allowed_user, \ + user_factory('forbidden') as forbidden_user: + # create one user allowed to read the record + allowed_user.read_access(True, str(internal_record.id)) + allowed_login = allowed_user.login_function() + # create one user who is not allowed to read the record + forbidden_user.read_access(False, str(internal_record.id)) + forbidden_login = forbidden_user.login_function() + db.session.commit() + + headers = [('Accept', 'application/json')] + # test get without being authenticated + with app.test_client() as client: + res = client.get(url_for('invenio_records_rest.recid_item', + pid_value=pid.pid_value), + headers=headers) + assert res.status_code == 401 + # test not allowed get + with app.test_client() as client: + forbidden_login(client) + res = client.get(url_for('invenio_records_rest.recid_item', + pid_value=pid.pid_value), + headers=headers) + assert res.status_code == 403 + # test allowed get + with app.test_client() as client: + allowed_login(client) + res = client.get(url_for('invenio_records_rest.recid_item', + pid_value=pid.pid_value), + headers=headers) + assert res.status_code == 200 + # check that the returned record matches the given data + response_data = json.loads(res.get_data(as_text=True)) + pid, internal_record = resolver.resolve(response_data['id']) + assert internal_record == response_data['metadata'] + + +def test_patch_one_permissions(app, user_factory, resolver): + with app.app_context(): + # create the record using the internal API + pid, internal_record = create_record(test_data) + with user_factory('allowed') as allowed_user, \ + user_factory('forbidden') as forbidden_user: + # create one user allowed to update the record + allowed_user.update_access(True, str(internal_record.id)) + allowed_login = allowed_user.login_function() + # create one user who is not allowed to update the record + forbidden_user.update_access(False, str(internal_record.id)) + forbidden_login = forbidden_user.login_function() + db.session.commit() + + headers = [('Content-Type', 'application/json-patch+json'), + ('Accept', 'application/json')] + # test get without being authenticated + with app.test_client() as client: + res = client.patch(url_for('invenio_records_rest.recid_item', + pid_value=pid.pid_value), + data=json.dumps(test_patch), + headers=headers) + assert res.status_code == 401 + # test not allowed get + with app.test_client() as client: + forbidden_login(client) + res = client.patch(url_for('invenio_records_rest.recid_item', + pid_value=pid.pid_value), + data=json.dumps(test_patch), + headers=headers) + assert res.status_code == 403 + # test allowed get + with app.test_client() as client: + allowed_login(client) + res = client.patch(url_for('invenio_records_rest.recid_item', + pid_value=pid.pid_value), + data=json.dumps(test_patch), + headers=headers) + assert res.status_code == 200 + # check that the returned record matches the given data + response_data = json.loads(res.get_data(as_text=True)) + test = copy.deepcopy(test_data_patched) + test['control_number'] = 1 + assert response_data['metadata'] == test + + +def test_put_one_permissions(app, user_factory, resolver): + with app.app_context(): + # create the record using the internal API + pid, internal_record = create_record(test_data) + with user_factory('allowed') as allowed_user, \ + user_factory('forbidden') as forbidden_user: + # create one user allowed to update the record + allowed_user.update_access(True, str(internal_record.id)) + allowed_login = allowed_user.login_function() + # create one user who is not allowed to update the record + forbidden_user.update_access(False, str(internal_record.id)) + forbidden_login = forbidden_user.login_function() + db.session.commit() + + headers = [('Content-Type', 'application/json'), + ('Accept', 'application/json')] + # test get without being authenticated + with app.test_client() as client: + res = client.put(url_for('invenio_records_rest.recid_item', + pid_value=pid.pid_value), + data=json.dumps(test_data_patched), + headers=headers) + assert res.status_code == 401 + # test not allowed get + with app.test_client() as client: + forbidden_login(client) + res = client.put(url_for('invenio_records_rest.recid_item', + pid_value=pid.pid_value), + data=json.dumps(test_data_patched), + headers=headers) + assert res.status_code == 403 + # test allowed get + with app.test_client() as client: + allowed_login(client) + res = client.put(url_for('invenio_records_rest.recid_item', + pid_value=pid.pid_value), + data=json.dumps(test_data_patched), + headers=headers) + assert res.status_code == 200 + # check that the returned record matches the given data + response_data = json.loads(res.get_data(as_text=True)) + assert response_data['metadata'] == test_data_patched + + def subtest_self_link(response_data, response_headers, client): """Check that the returned self link returns the same data.