diff --git a/.gitignore b/.gitignore index ba746605..59ddf3a0 100644 --- a/.gitignore +++ b/.gitignore @@ -55,3 +55,6 @@ docs/_build/ # PyBuilder target/ + +# Sqlite databases +*.db diff --git a/examples/app.py b/examples/app.py index c8762e40..c84837b9 100644 --- a/examples/app.py +++ b/examples/app.py @@ -25,19 +25,35 @@ """Minimal Flask application example for development. +For this example the access control is disabled. + Run example development server: .. code-block:: console - $ cd examples - $ flask -a app.py db init - $ flask -a app.py db create - $ flask -a app.py load_fixture - $ flask -a app.py --debug run -""" + $ cd examples + $ flask -a app.py db init + $ flask -a app.py db create + $ flask -a app.py fixture records + $ flask -a app.py --debug run + +Try to get some records + +.. code-block:: console + + $ curl -XGET http://localhost:5000/records/1 + $ curl -XGET http://localhost:5000/records/2 + $ curl -XGET http://localhost:5000/records/3 + $ curl -XGET http://localhost:5000/records/4 + $ curl -XGET http://localhost:5000/records/5 + $ curl -XGET http://localhost:5000/records/6 + $ curl -XGET http://localhost:5000/records/7 +""" # noqa from __future__ import absolute_import, print_function +import os + from flask import Flask from flask_celeryext import FlaskCeleryExt from flask_cli import FlaskCLI @@ -48,13 +64,25 @@ from invenio_records_rest import InvenioRecordsREST + +# create application's instance directory. Needed for this example only. +current_dir = os.path.dirname(os.path.realpath(__file__)) +instance_dir = os.path.join(current_dir, 'app') +if not os.path.exists(instance_dir): + os.makedirs(instance_dir) + # Create Flask application -app = Flask(__name__) +app = Flask(__name__, instance_path=instance_dir) app.config.update( CELERY_ALWAYS_EAGER=True, CELERY_CACHE_BACKEND="memory", CELERY_EAGER_PROPAGATES_EXCEPTIONS=True, CELERY_RESULT_BACKEND="cache", + # No permission checking + RECORDS_REST_DEFAULT_CREATE_PERMISSION_FACTORY=None, + RECORDS_REST_DEFAULT_READ_PERMISSION_FACTORY=None, + RECORDS_REST_DEFAULT_UPDATE_PERMISSION_FACTORY=None, + RECORDS_REST_DEFAULT_DELETE_PERMISSION_FACTORY=None, ) FlaskCLI(app) FlaskCeleryExt(app) @@ -65,8 +93,13 @@ InvenioRecordsREST(app) -@app.cli.command() -def load_fixture(): +@app.cli.group() +def fixtures(): + """Command for working with test data.""" + + +@fixtures.command() +def records(): """Load test data fixture.""" import uuid from invenio_records.api import Record diff --git a/examples/permsapp.py b/examples/permsapp.py new file mode 100644 index 00000000..bb605d60 --- /dev/null +++ b/examples/permsapp.py @@ -0,0 +1,173 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Invenio. +# Copyright (C) 2015 CERN. +# +# Invenio is free software; you can redistribute it +# and/or modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# Invenio is distributed in the hope that it will be +# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Invenio; if not, write to the +# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, +# MA 02111-1307, USA. +# +# In applying this license, CERN does not +# waive the privileges and immunities granted to it by virtue of its status +# as an Intergovernmental Organization or submit itself to any jurisdiction. + + +"""Minimal Flask application example with access control enabled. + +Run example development server: + +.. code-block:: console + $ cd examples + $ flask -a permsapp.py db init + $ flask -a permsapp.py db create + $ flask -a permsapp.py fixtures records + $ flask -a permsapp.py fixtures access + $ flask -a permsapp.py --debug run + + +Try to get record 1: + +.. code-block:: console + $ curl -XGET http://localhost:5000/records/1 + +Login as admin@invenio-software.org (password 123456) and get record 1: + +.. code-block:: console + $ curl -XPOST http://localhost:5000/login/ -d 'email=admin%40invenio-software.org&password=123456' -c mycookie + $ curl -XGET http://localhost:5000/records/1 -b mycookie + +Login as forbidden@invenio-software.org (password 123456), who has not +the permission to read record 1, and try to get record 1: + +.. code-block:: console + $ curl -XPOST http://localhost:5000/login/ -d 'email=forbidden%40invenio-software.org&password=123456' -c mycookie2 + $ curl -XGET http://localhost:5000/records/1 -b mycookie2 +""" # noqa + +from __future__ import absolute_import, print_function + +import os + +from flask import Flask +from flask_celeryext import FlaskCeleryExt +from flask_cli import FlaskCLI +from flask_menu import Menu +from flask_security.utils import encrypt_password +from invenio_accounts import InvenioAccounts +from invenio_accounts.views import blueprint +from invenio_db import InvenioDB, db +from invenio_pidstore import InvenioPIDStore +from invenio_records import InvenioRecords +from invenio_records.permissions import records_create_all, \ + records_delete_all, records_read_all, records_update_all +from invenio_rest import InvenioREST + +from invenio_access import InvenioAccess +from invenio_access.models import ActionUsers +from invenio_records_rest import InvenioRecordsREST + + +# create application's instance directory. Needed for this example only. +current_dir = os.path.dirname(os.path.realpath(__file__)) +instance_dir = os.path.join(current_dir, 'permsapp') +if not os.path.exists(instance_dir): + os.makedirs(instance_dir) + +# Create Flask application +app = Flask(__name__, instance_path=instance_dir) +app.config.update( + CELERY_ALWAYS_EAGER=True, + CELERY_CACHE_BACKEND="memory", + CELERY_EAGER_PROPAGATES_EXCEPTIONS=True, + CELERY_RESULT_BACKEND="cache", + # Install Principal and Login extensions + WTF_CSRF_ENABLED=False, + ACCOUNTS_USE_CELERY=False, + SECRET_KEY='CHANGE_ME', + SECURITY_PASSWORD_SALT='CHANGE_ME_ALSO', +) +FlaskCLI(app) +FlaskCeleryExt(app) +InvenioDB(app) +InvenioREST(app) +InvenioPIDStore(app) +InvenioRecords(app) +Menu(app) +accounts = InvenioAccounts(app) +app.register_blueprint(blueprint) +InvenioAccess(app) +InvenioRecordsREST(app) + +rec_uuid = 'deadbeef-1234-5678-ba11-b100dc0ffee5' + + +@app.cli.group() +def fixtures(): + """Command for working with test data.""" + + +@fixtures.command() +def records(): + """Load test data fixture.""" + from invenio_records.api import Record + from invenio_pidstore.models import PersistentIdentifier, PIDStatus + + # Record 1 - Live record + with db.session.begin_nested(): + PersistentIdentifier.create( + 'recid', '1', object_type='rec', object_uuid=rec_uuid, + status=PIDStatus.REGISTERED) + Record.create({'title': 'Registered '}, id_=rec_uuid) + + +@fixtures.command() +def access(): + """Load access fixtures.""" + admin = accounts.datastore.create_user( + email='admin@invenio-software.org', + password=encrypt_password('123456'), + active=True, + ) + forbidden = accounts.datastore.create_user( + email='forbidden@invenio-software.org', + password=encrypt_password('123456'), + active=True, + ) + # Give all permissions on the record to admin user + db.session.add(ActionUsers( + action=records_create_all.value, + user=admin)) + db.session.add(ActionUsers( + action=records_read_all.value, argument=rec_uuid, + user=admin)) + db.session.add(ActionUsers( + action=records_update_all.value, argument=rec_uuid, + user=admin)) + db.session.add(ActionUsers( + action=records_delete_all.value, argument=rec_uuid, + user=admin)) + # Exclude all permission on the record to the forbidden user + db.session.add(ActionUsers( + action=records_create_all.value, + user=forbidden, exclude=True)) + db.session.add(ActionUsers( + action=records_read_all.value, argument=rec_uuid, + user=forbidden, exclude=True)) + db.session.add(ActionUsers( + action=records_update_all.value, argument=rec_uuid, + user=forbidden, exclude=True)) + db.session.add(ActionUsers( + action=records_delete_all.value, argument=rec_uuid, + user=forbidden, exclude=True)) + db.session.commit() diff --git a/invenio_records_rest/ext.py b/invenio_records_rest/ext.py index 2c16a986..12ae608f 100644 --- a/invenio_records_rest/ext.py +++ b/invenio_records_rest/ext.py @@ -26,9 +26,66 @@ from __future__ import absolute_import, print_function +from werkzeug.utils import import_string, cached_property + from .views import create_blueprint +class _RecordRESTState(object): + """Record REST state.""" + + def __init__(self, app): + """Initialize state.""" + self.app = app + self._read_permission_factory = None + self._create_permission_factory = None + self._update_permission_factory = None + self._delete_permission_factory = None + + @cached_property + def read_permission_factory(self): + """Load default read permission factory.""" + if self._read_permission_factory is None: + imp = self.app.config[ + "RECORDS_REST_DEFAULT_READ_PERMISSION_FACTORY" + ] + self._read_permission_factory = import_string(imp) if imp else None + return self._read_permission_factory + + @cached_property + def create_permission_factory(self): + """Load default create permission factory.""" + if self._create_permission_factory is None: + imp = self.app.config[ + "RECORDS_REST_DEFAULT_CREATE_PERMISSION_FACTORY" + ] + self._create_permission_factory = import_string(imp) \ + if imp else None + return self._create_permission_factory + + @cached_property + def update_permission_factory(self): + """Load default update permission factory.""" + if self._update_permission_factory is None: + imp = self.app.config[ + "RECORDS_REST_DEFAULT_UPDATE_PERMISSION_FACTORY" + ] + self._update_permission_factory = import_string(imp) \ + if imp else None + return self._update_permission_factory + + @cached_property + def delete_permission_factory(self): + """Load default delete permission factory.""" + if self._delete_permission_factory is None: + imp = self.app.config[ + "RECORDS_REST_DEFAULT_DELETE_PERMISSION_FACTORY" + ] + self._delete_permission_factory = import_string(imp) \ + if imp else None + return self._delete_permission_factory + + class InvenioRecordsREST(object): """Invenio-Records-REST extension.""" @@ -44,10 +101,23 @@ def init_app(self, app): app.register_blueprint( create_blueprint(app.config["RECORDS_REST_ENDPOINTS"]) ) - app.extensions['invenio-records-rest'] = self + app.extensions['invenio-records-rest'] = _RecordRESTState(app) def init_config(self, app): """Initialize configuration.""" + app.config.setdefault( + "RECORDS_REST_DEFAULT_READ_PERMISSION_FACTORY", + "invenio_records.permissions:read_permission_factory") + app.config.setdefault( + "RECORDS_REST_DEFAULT_CREATE_PERMISSION_FACTORY", + "invenio_records.permissions:create_permission_factory") + app.config.setdefault( + "RECORDS_REST_DEFAULT_UPDATE_PERMISSION_FACTORY", + "invenio_records.permissions:update_permission_factory") + app.config.setdefault( + "RECORDS_REST_DEFAULT_DELETE_PERMISSION_FACTORY", + "invenio_records.permissions:delete_permission_factory") + # Set up API endpoints for records. app.config.setdefault( "RECORDS_REST_ENDPOINTS", diff --git a/invenio_records_rest/views.py b/invenio_records_rest/views.py index b24e950f..6bb4b9a4 100644 --- a/invenio_records_rest/views.py +++ b/invenio_records_rest/views.py @@ -42,10 +42,16 @@ from jsonpatch import JsonPatchException, JsonPointerException from sqlalchemy.exc import SQLAlchemyError from werkzeug.routing import BuildError +from werkzeug.local import LocalProxy +from werkzeug.utils import import_string from .serializers import record_to_json_serializer +current_records_rest = LocalProxy( + lambda: current_app.extensions['invenio-records-rest']) + + def create_blueprint(endpoints): """Create Invenio-Records-REST blueprint.""" blueprint = Blueprint( @@ -62,12 +68,45 @@ def create_blueprint(endpoints): def create_url_rules(endpoint, list_route=None, item_route=None, - pid_type=None, pid_minter=None): - """Create Werkzeug URL rules.""" + pid_type=None, pid_minter=None, + read_permission_factory_imp=None, + create_permission_factory_imp=None, + update_permission_factory_imp=None, + delete_permission_factory_imp=None): + """Create Werkzeug URL rules. + + :param endpoint: Name of endpoint. + :param list_route: record listing URL route . Required. + :param item_route: record URL route (must include ```` pattern). + Required. + :param pid_type: Persistent identifier type for endpoint. Required. + :param template: Template to render. Defaults to + ``invenio_records_ui/detail.html``. + :param read_permission_factory: Import path to factory that creates a read + permission object for a given record. + :param create_permission_factory: Import path to factory that creates a + create permission object for a given record. + :param update_permission_factory: Import path to factory that creates a + update permission object for a given record. + :param delete_permission_factory: Import path to factory that creates a + delete permission object for a given record. + + :returns: a list of dictionaries with can each be passed as keywords + arguments to ``Blueprint.add_url_rule``. + """ assert list_route assert item_route assert pid_type + read_permission_factory = import_string(read_permission_factory_imp) \ + if read_permission_factory_imp else None + create_permission_factory = import_string(create_permission_factory_imp) \ + if create_permission_factory_imp else None + update_permission_factory = import_string(update_permission_factory_imp) \ + if update_permission_factory_imp else None + delete_permission_factory = import_string(delete_permission_factory_imp) \ + if delete_permission_factory_imp else None + resolver = Resolver(pid_type=pid_type, object_type='rec', getter=Record.get_record) @@ -77,10 +116,15 @@ def create_url_rules(endpoint, list_route=None, item_route=None, RecordsListResource.view_name.format(endpoint), resolver=resolver, minter_name=pid_minter, + read_permission_factory=read_permission_factory, + create_permission_factory=create_permission_factory, serializers=serializers) item_view = RecordResource.as_view( RecordResource.view_name.format(endpoint), resolver=resolver, + read_permission_factory=read_permission_factory, + update_permission_factory=update_permission_factory, + delete_permission_factory=delete_permission_factory, serializers=serializers) return [ @@ -129,20 +173,58 @@ def inner(self, pid_value, *args, **kwargs): }) abort(500) - return f(self, pid, record, *args, **kwargs) + return f(self, pid=pid, record=record, *args, **kwargs) return inner +def verify_record_permission(permission_factory, record): + """Check that the current user has the required permissions on record. + + :param permission_factory: permission factory used to check permissions. + :param record: record whose access is limited. + """ + # Note, cannot be done in one line due overloading of boolean + # operations permission object. + if not permission_factory(record).can(): + from flask_login import current_user + if not current_user.is_authenticated: + abort(401) + abort(403) + + +def need_record_permission(factory_name): + """Decorator checking that the user has the required permissions on record. + + :param factory_name: name of the factory to retrieve. + """ + def need_record_permission_builder(f): + @wraps(f) + def need_record_permission_decorator(self, record, *args, **kwargs): + permission_factory = ( + getattr(self, factory_name) or + getattr(current_records_rest, factory_name) + ) + if permission_factory: + verify_record_permission(permission_factory, record) + return f(self, record=record, *args, **kwargs) + return need_record_permission_decorator + return need_record_permission_builder + + class RecordsListResource(ContentNegotiatedMethodView): """Resource for records listing.""" view_name = '{0}_list' - def __init__(self, resolver=None, minter_name=None, **kwargs): + def __init__(self, resolver=None, minter_name=None, + read_permission_factory=None, create_permission_factory=None, + **kwargs): """Constructor.""" super(RecordsListResource, self).__init__(**kwargs) self.resolver = resolver self.minter = current_pidstore.minters[minter_name] + self.read_permission_factory = read_permission_factory + self.create_permission_factory = create_permission_factory def post(self, **kwargs): """Create a record. @@ -165,6 +247,12 @@ def post(self, **kwargs): # Create record record = Record.create(data, id_=record_uuid) + # Check permissions + permission_factory = self.create_permission_factory or \ + current_records_rest.create_permission_factory + if permission_factory: + verify_record_permission(permission_factory, record) + db.session.commit() except SQLAlchemyError: db.session.rollback() @@ -178,15 +266,21 @@ class RecordResource(ContentNegotiatedMethodView): view_name = '{0}_item' - def __init__(self, resolver=None, **kwargs): + def __init__(self, resolver=None, read_permission_factory=None, + update_permission_factory=None, + delete_permission_factory=None, **kwargs): """Constructor. :param resolver: Persistent identifier resolver instance. """ super(RecordResource, self).__init__(**kwargs) self.resolver = resolver + self.read_permission_factory = read_permission_factory + self.update_permission_factory = update_permission_factory + self.delete_permission_factory = delete_permission_factory @pass_record + @need_record_permission('read_permission_factory') def get(self, pid, record, **kwargs): """Get a record. @@ -199,6 +293,7 @@ def get(self, pid, record, **kwargs): @require_content_types('application/json-patch+json') @pass_record + @need_record_permission('update_permission_factory') def patch(self, pid, record, **kwargs): """Modify a record. @@ -226,6 +321,7 @@ def patch(self, pid, record, **kwargs): @require_content_types('application/json') @pass_record + @need_record_permission('update_permission_factory') def put(self, pid, record, **kwargs): """Replace a record. diff --git a/setup.py b/setup.py index ce2d879c..50b7638f 100644 --- a/setup.py +++ b/setup.py @@ -37,6 +37,8 @@ 'check-manifest>=0.25', 'coverage>=4.0', 'invenio-db[all]>=1.0.0a6', + 'invenio-access>=1.0.0a3', + 'invenio-accounts>=1.0.0a6', 'isort>=4.2.2', 'pep257>=0.7.0', 'pytest-cache>=1.0', @@ -64,7 +66,7 @@ 'Flask-CLI>=0.2.1', 'six>=1.10', 'invenio-rest>=1.0.0a3', - 'invenio-records>=1.0.0a4', + 'invenio-records>=1.0.0a7', 'invenio-pidstore>=1.0.0a2', ] diff --git a/tests/conftest.py b/tests/conftest.py index d36fe6d1..e8ff9bdd 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -30,19 +30,28 @@ import os import shutil import tempfile +from contextlib import contextmanager import pytest -from flask import Flask +from flask import Flask, url_for from flask_cli import FlaskCLI +from flask_menu import Menu +from flask_security.utils import encrypt_password +from invenio_accounts import InvenioAccounts +from invenio_accounts.views import blueprint as accounts_blueprint from invenio_db import InvenioDB, db from invenio_pidstore import InvenioPIDStore from invenio_pidstore.resolver import Resolver from invenio_records import InvenioRecords from invenio_records.api import Record +from invenio_records.permissions import records_create_all, \ + records_delete_all, records_read_all, records_update_all from invenio_rest import InvenioREST from sqlalchemy_utils.functions import create_database, database_exists, \ drop_database +from invenio_access import InvenioAccess +from invenio_access.models import ActionUsers from invenio_records_rest import InvenioRecordsREST @@ -56,7 +65,12 @@ def app(request): SERVER_NAME='localhost:5000', SQLALCHEMY_DATABASE_URI=os.environ.get( 'SQLALCHEMY_DATABASE_URI', 'sqlite:///test.db' - ) + ), + # No permission checking + RECORDS_REST_DEFAULT_CREATE_PERMISSION_FACTORY=None, + RECORDS_REST_DEFAULT_READ_PERMISSION_FACTORY=None, + RECORDS_REST_DEFAULT_UPDATE_PERMISSION_FACTORY=None, + RECORDS_REST_DEFAULT_DELETE_PERMISSION_FACTORY=None, ) FlaskCLI(app) InvenioDB(app) @@ -83,6 +97,83 @@ def finalize(): return app +@pytest.fixture() +def accounts(app): + app.config.update( + WTF_CSRF_ENABLED=False, + SECRET_KEY='CHANGEME', + SECURITY_PASSWORD_SALT='CHANGEME', + # conftest switches off permission checking, so re-enable it for this + # app. + RECORDS_REST_DEFAULT_CREATE_PERMISSION_FACTORY='invenio_records.permissions:create_permission_factory', # noqa + RECORDS_REST_DEFAULT_READ_PERMISSION_FACTORY='invenio_records.permissions:read_permission_factory', # noqa + RECORDS_REST_DEFAULT_UPDATE_PERMISSION_FACTORY='invenio_records.permissions:update_permission_factory', # noqa + RECORDS_REST_DEFAULT_DELETE_PERMISSION_FACTORY='invenio_records.permissions:delete_permission_factory', # noqa + ) + # FIXME: use OAuth authentication instead of UI authentication + Menu(app) + accounts = InvenioAccounts(app) + app.register_blueprint(accounts_blueprint) + InvenioAccess(app) + return accounts + + +@pytest.yield_fixture +def user_factory(app, accounts): + """Create a user which has all permissions on every records.""" + + password = '123456' + + with app.test_request_context(): + login_url = url_for('security.login') + + @contextmanager + def create_user(name): + """Create a user. + + Should be called in application context. + """ + class UserConfig(object): + def __init__(self, name): + self.email = '{}@invenio-software.org'.format(name) + self.user = accounts.datastore.create_user( + email=self.email, + password=encrypt_password(password), + active=True, + ) + + def login_function(self): + def login(client): + res = client.post(login_url, data={ + 'email': self.email, 'password': password}) + assert res.status_code == 302 + return login + + def create_access(self, allow, record_id=None): + db.session.add(ActionUsers( + action=records_create_all.value, argument=record_id, + user=self.user, exclude=not allow)) + + def read_access(self, allow, record_id=None): + db.session.add(ActionUsers( + action=records_read_all.value, argument=record_id, + user=self.user, exclude=not allow)) + + def update_access(self, allow, record_id=None): + db.session.add(ActionUsers( + action=records_update_all.value, argument=record_id, + user=self.user, exclude=not allow)) + + def delete_access(self, allow, record_id=None): + db.session.add(ActionUsers( + action=records_delete_all.value, argument=record_id, + user=self.user, exclude=not allow)) + + yield UserConfig(name) + + yield create_user + + @pytest.fixture(scope='session') def resolver(): """Create a persistent identifier resolver.""" diff --git a/tests/test_invenio_records_rest.py b/tests/test_invenio_records_rest.py index 1efb1fa1..46bff930 100644 --- a/tests/test_invenio_records_rest.py +++ b/tests/test_invenio_records_rest.py @@ -361,6 +361,178 @@ def test_invalid_put(app): assert res.status_code == 400 +def test_create_permissions(app, user_factory, resolver): + with app.app_context(): + # create users + with user_factory('allowed') as allowed_user, \ + user_factory('forbidden') as forbidden_user: + # create one user allowed to create records + allowed_user.create_access(True) + allowed_login = allowed_user.login_function() + # create one user who is not allowed to create records + forbidden_user.create_access(False) + forbidden_login = forbidden_user.login_function() + db.session.commit() + + headers = [('Content-Type', 'application/json'), + ('Accept', 'application/json')] + # test create without being authenticated + with app.test_client() as client: + res = client.post(url_for('invenio_records_rest.recid_list'), + data=json.dumps(test_data), + headers=headers) + assert res.status_code == 401 + # test not allowed create + with app.test_client() as client: + forbidden_login(client) + res = client.post(url_for('invenio_records_rest.recid_list'), + data=json.dumps(test_data), + headers=headers) + assert res.status_code == 403 + # test allowed create + with app.test_client() as client: + allowed_login(client) + res = client.post(url_for('invenio_records_rest.recid_list'), + data=json.dumps(test_data), + headers=headers) + assert res.status_code == 201 + # check that the returned record matches the given data + response_data = json.loads(res.get_data(as_text=True)) + pid, internal_record = resolver.resolve(response_data['id']) + assert internal_record == response_data['metadata'] + + +def test_read_one_permissions(app, user_factory, resolver): + with app.app_context(): + # create the record using the internal API + pid, internal_record = create_record(test_data) + with user_factory('allowed') as allowed_user, \ + user_factory('forbidden') as forbidden_user: + # create one user allowed to read the record + allowed_user.read_access(True, str(internal_record.id)) + allowed_login = allowed_user.login_function() + # create one user who is not allowed to read the record + forbidden_user.read_access(False, str(internal_record.id)) + forbidden_login = forbidden_user.login_function() + db.session.commit() + + headers = [('Accept', 'application/json')] + # test get without being authenticated + with app.test_client() as client: + res = client.get(url_for('invenio_records_rest.recid_item', + pid_value=pid.pid_value), + headers=headers) + assert res.status_code == 401 + # test not allowed get + with app.test_client() as client: + forbidden_login(client) + res = client.get(url_for('invenio_records_rest.recid_item', + pid_value=pid.pid_value), + headers=headers) + assert res.status_code == 403 + # test allowed get + with app.test_client() as client: + allowed_login(client) + res = client.get(url_for('invenio_records_rest.recid_item', + pid_value=pid.pid_value), + headers=headers) + assert res.status_code == 200 + # check that the returned record matches the given data + response_data = json.loads(res.get_data(as_text=True)) + pid, internal_record = resolver.resolve(response_data['id']) + assert internal_record == response_data['metadata'] + + +def test_patch_one_permissions(app, user_factory, resolver): + with app.app_context(): + # create the record using the internal API + pid, internal_record = create_record(test_data) + with user_factory('allowed') as allowed_user, \ + user_factory('forbidden') as forbidden_user: + # create one user allowed to update the record + allowed_user.update_access(True, str(internal_record.id)) + allowed_login = allowed_user.login_function() + # create one user who is not allowed to update the record + forbidden_user.update_access(False, str(internal_record.id)) + forbidden_login = forbidden_user.login_function() + db.session.commit() + + headers = [('Content-Type', 'application/json-patch+json'), + ('Accept', 'application/json')] + # test get without being authenticated + with app.test_client() as client: + res = client.patch(url_for('invenio_records_rest.recid_item', + pid_value=pid.pid_value), + data=json.dumps(test_patch), + headers=headers) + assert res.status_code == 401 + # test not allowed get + with app.test_client() as client: + forbidden_login(client) + res = client.patch(url_for('invenio_records_rest.recid_item', + pid_value=pid.pid_value), + data=json.dumps(test_patch), + headers=headers) + assert res.status_code == 403 + # test allowed get + with app.test_client() as client: + allowed_login(client) + res = client.patch(url_for('invenio_records_rest.recid_item', + pid_value=pid.pid_value), + data=json.dumps(test_patch), + headers=headers) + assert res.status_code == 200 + # check that the returned record matches the given data + response_data = json.loads(res.get_data(as_text=True)) + test = copy.deepcopy(test_data_patched) + test['control_number'] = 1 + assert response_data['metadata'] == test + + +def test_put_one_permissions(app, user_factory, resolver): + with app.app_context(): + # create the record using the internal API + pid, internal_record = create_record(test_data) + with user_factory('allowed') as allowed_user, \ + user_factory('forbidden') as forbidden_user: + # create one user allowed to update the record + allowed_user.update_access(True, str(internal_record.id)) + allowed_login = allowed_user.login_function() + # create one user who is not allowed to update the record + forbidden_user.update_access(False, str(internal_record.id)) + forbidden_login = forbidden_user.login_function() + db.session.commit() + + headers = [('Content-Type', 'application/json'), + ('Accept', 'application/json')] + # test get without being authenticated + with app.test_client() as client: + res = client.put(url_for('invenio_records_rest.recid_item', + pid_value=pid.pid_value), + data=json.dumps(test_data_patched), + headers=headers) + assert res.status_code == 401 + # test not allowed get + with app.test_client() as client: + forbidden_login(client) + res = client.put(url_for('invenio_records_rest.recid_item', + pid_value=pid.pid_value), + data=json.dumps(test_data_patched), + headers=headers) + assert res.status_code == 403 + # test allowed get + with app.test_client() as client: + allowed_login(client) + res = client.put(url_for('invenio_records_rest.recid_item', + pid_value=pid.pid_value), + data=json.dumps(test_data_patched), + headers=headers) + assert res.status_code == 200 + # check that the returned record matches the given data + response_data = json.loads(res.get_data(as_text=True)) + assert response_data['metadata'] == test_data_patched + + def subtest_self_link(response_data, response_headers, client): """Check that the returned self link returns the same data.