diff --git a/.gitignore b/.gitignore index ba746605..59ddf3a0 100644 --- a/.gitignore +++ b/.gitignore @@ -55,3 +55,6 @@ docs/_build/ # PyBuilder target/ + +# Sqlite databases +*.db diff --git a/examples/app.py b/examples/app.py index c8762e40..37303957 100644 --- a/examples/app.py +++ b/examples/app.py @@ -25,19 +25,33 @@ """Minimal Flask application example for development. +For this example the access control is disabled. + Run example development server: .. code-block:: console - $ cd examples - $ flask -a app.py db init - $ flask -a app.py db create - $ flask -a app.py load_fixture - $ flask -a app.py --debug run -""" + $ cd examples + $ flask -a app.py db init + $ flask -a app.py db create + $ flask -a app.py fixture records + $ flask -a app.py --debug run + +Try to get some records + + $ curl -XGET http://localhost:5000/records/1 + $ curl -XGET http://localhost:5000/records/2 + $ curl -XGET http://localhost:5000/records/3 + $ curl -XGET http://localhost:5000/records/4 + $ curl -XGET http://localhost:5000/records/5 + $ curl -XGET http://localhost:5000/records/6 + $ curl -XGET http://localhost:5000/records/7 +""" # noqa from __future__ import absolute_import, print_function +import os + from flask import Flask from flask_celeryext import FlaskCeleryExt from flask_cli import FlaskCLI @@ -48,6 +62,13 @@ from invenio_records_rest import InvenioRecordsREST + +# create application's instance directory. Needed for this example only. +current_dir = os.path.dirname(os.path.realpath(__file__)) +instance_dir = os.path.join(current_dir, 'app') +if not os.path.exists(instance_dir): + os.makedirs(instance_dir) + # Create Flask application app = Flask(__name__) app.config.update( @@ -55,6 +76,11 @@ CELERY_CACHE_BACKEND="memory", CELERY_EAGER_PROPAGATES_EXCEPTIONS=True, CELERY_RESULT_BACKEND="cache", + # No permission checking + RECORDS_REST_DEFAULT_CREATE_PERMISSION_FACTORY=None, + RECORDS_REST_DEFAULT_READ_PERMISSION_FACTORY=None, + RECORDS_REST_DEFAULT_UPDATE_PERMISSION_FACTORY=None, + RECORDS_REST_DEFAULT_DELETE_PERMISSION_FACTORY=None, ) FlaskCLI(app) FlaskCeleryExt(app) @@ -65,8 +91,13 @@ InvenioRecordsREST(app) -@app.cli.command() -def load_fixture(): +@app.cli.group() +def fixtures(): + """Command for working with test data.""" + + +@fixtures.command() +def records(): """Load test data fixture.""" import uuid from invenio_records.api import Record diff --git a/examples/permsapp.py b/examples/permsapp.py new file mode 100644 index 00000000..681d87b1 --- /dev/null +++ b/examples/permsapp.py @@ -0,0 +1,170 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Invenio. +# Copyright (C) 2015 CERN. +# +# Invenio is free software; you can redistribute it +# and/or modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# Invenio is distributed in the hope that it will be +# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Invenio; if not, write to the +# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, +# MA 02111-1307, USA. +# +# In applying this license, CERN does not +# waive the privileges and immunities granted to it by virtue of its status +# as an Intergovernmental Organization or submit itself to any jurisdiction. + + +"""Minimal Flask application example with access control enabled. + +Run example development server: + +.. code-block:: console + $ cd examples + $ flask -a permsapp.py db init + $ flask -a permsapp.py db create + $ flask -a permsapp.py fixtures records + $ flask -a permsapp.py fixtures access + $ flask -a permsapp.py --debug run + + +Try to get record 1: + + $ curl -XGET http://localhost:5000/records/1 + +Login as admin@invenio-software.org (password 123456) and get record 1: + + $ curl -XPOST http://localhost:5000/login -d 'email=admin%40invenio-software.org&password=123456' -c mycookie + $ curl -XGET http://localhost:5000/records/1 -b mycookie + +Login as forbidden@invenio-software.org (password 123456), hwo has not +the permission to read record 1, and try to get record 1: + + $ curl -XPOST http://localhost:5000/login -d 'email=forbidden%40invenio-software.org&password=123456' -c mycookie2 + $ curl -XGET http://localhost:5000/records/1 -b mycookie2 +""" # noqa + +from __future__ import absolute_import, print_function + +import os + +from flask import Flask +from flask_celeryext import FlaskCeleryExt +from flask_cli import FlaskCLI +from flask_menu import Menu +from flask_security.utils import encrypt_password +from invenio_accounts import InvenioAccounts +from invenio_accounts.views import blueprint +from invenio_db import InvenioDB, db +from invenio_pidstore import InvenioPIDStore +from invenio_records import InvenioRecords +from invenio_records.permissions import records_create_all, \ + records_delete_all, records_read_all, records_update_all +from invenio_rest import InvenioREST + +from invenio_access import InvenioAccess +from invenio_access.models import ActionUsers +from invenio_records_rest import InvenioRecordsREST + + +# create application's instance directory. Needed for this example only. +current_dir = os.path.dirname(os.path.realpath(__file__)) +instance_dir = os.path.join(current_dir, 'permsapp') +if not os.path.exists(instance_dir): + os.makedirs(instance_dir) + +# Create Flask application +app = Flask(__name__, instance_path=instance_dir) +app.config.update( + CELERY_ALWAYS_EAGER=True, + CELERY_CACHE_BACKEND="memory", + CELERY_EAGER_PROPAGATES_EXCEPTIONS=True, + CELERY_RESULT_BACKEND="cache", + # Install Principal and Login extensions + WTF_CSRF_ENABLED=False, + ACCOUNTS_USE_CELERY=False, + SECRET_KEY='CHANGE_ME', + SECURITY_PASSWORD_SALT='CHANGE_ME_ALSO', +) +FlaskCLI(app) +FlaskCeleryExt(app) +InvenioDB(app) +InvenioREST(app) +InvenioPIDStore(app) +InvenioRecords(app) +Menu(app) +accounts = InvenioAccounts(app) +app.register_blueprint(blueprint) +InvenioAccess(app) +InvenioRecordsREST(app) + +rec_uuid = '9107e6ef-fea4-4971-ae2f-934d2fdcaa30' + + +@app.cli.group() +def fixtures(): + """Command for working with test data.""" + + +@fixtures.command() +def records(): + """Load test data fixture.""" + from invenio_records.api import Record + from invenio_pidstore.models import PersistentIdentifier, PIDStatus + + # Record 1 - Live record + with db.session.begin_nested(): + PersistentIdentifier.create( + 'recid', '1', object_type='rec', object_uuid=rec_uuid, + status=PIDStatus.REGISTERED) + Record.create({'title': 'Registered '}, id_=rec_uuid) + + +@fixtures.command() +def access(): + """Load access fixtures.""" + admin = accounts.datastore.create_user( + email='admin@invenio-software.org', + password=encrypt_password('123456'), + active=True, + ) + forbidden = accounts.datastore.create_user( + email='forbidden@invenio-software.org', + password=encrypt_password('123456'), + active=True, + ) + # Give all permissions on the record to admin user + db.session.add(ActionUsers( + action=records_create_all.value, + user=admin)) + db.session.add(ActionUsers( + action=records_read_all.value, argument=rec_uuid, + user=admin)) + db.session.add(ActionUsers( + action=records_update_all.value, argument=rec_uuid, + user=admin)) + db.session.add(ActionUsers( + action=records_delete_all.value, argument=rec_uuid, + user=admin)) + # Exclude all permission on the record to the forbidden user + db.session.add(ActionUsers( + action=records_create_all.value, + user=forbidden, exclude=True)) + db.session.add(ActionUsers( + action=records_read_all.value, argument=rec_uuid, + user=forbidden, exclude=True)) + db.session.add(ActionUsers( + action=records_update_all.value, argument=rec_uuid, + user=forbidden, exclude=True)) + db.session.add(ActionUsers( + action=records_delete_all.value, argument=rec_uuid, + user=forbidden, exclude=True)) + db.session.commit() diff --git a/invenio_records_rest/ext.py b/invenio_records_rest/ext.py index 2c16a986..7e09fb42 100644 --- a/invenio_records_rest/ext.py +++ b/invenio_records_rest/ext.py @@ -26,9 +26,66 @@ from __future__ import absolute_import, print_function +from werkzeug.utils import import_string + from .views import create_blueprint +class _RecordRESTState(object): + """Record REST state.""" + + def __init__(self, app): + """Initialize state.""" + self.app = app + self._read_permission_factory = None + self._create_permission_factory = None + self._update_permission_factory = None + self._delete_permission_factory = None + + @property + def read_permission_factory(self): + """Load default read permission factory.""" + if self._read_permission_factory is None: + imp = self.app.config[ + "RECORDS_REST_DEFAULT_READ_PERMISSION_FACTORY" + ] + self._read_permission_factory = import_string(imp) if imp else None + return self._read_permission_factory + + @property + def create_permission_factory(self): + """Load default create permission factory.""" + if self._create_permission_factory is None: + imp = self.app.config[ + "RECORDS_REST_DEFAULT_CREATE_PERMISSION_FACTORY" + ] + self._create_permission_factory = import_string(imp) \ + if imp else None + return self._create_permission_factory + + @property + def update_permission_factory(self): + """Load default update permission factory.""" + if self._update_permission_factory is None: + imp = self.app.config[ + "RECORDS_REST_DEFAULT_UPDATE_PERMISSION_FACTORY" + ] + self._update_permission_factory = import_string(imp) \ + if imp else None + return self._update_permission_factory + + @property + def delete_permission_factory(self): + """Load default delete permission factory.""" + if self._delete_permission_factory is None: + imp = self.app.config[ + "RECORDS_REST_DEFAULT_DELETE_PERMISSION_FACTORY" + ] + self._delete_permission_factory = import_string(imp) \ + if imp else None + return self._delete_permission_factory + + class InvenioRecordsREST(object): """Invenio-Records-REST extension.""" @@ -44,10 +101,25 @@ def init_app(self, app): app.register_blueprint( create_blueprint(app.config["RECORDS_REST_ENDPOINTS"]) ) - app.extensions['invenio-records-rest'] = self + app.extensions['invenio-records-rest'] = _RecordRESTState(app) def init_config(self, app): """Initialize configuration.""" + app.config.setdefault( + "RECORDS_REST_DEFAULT_READ_PERMISSION_FACTORY", + "invenio_records.permissions:read_permission_factory") + app.config.setdefault( + "RECORDS_REST_DEFAULT_CREATE_PERMISSION_FACTORY", + "invenio_records.permissions:create_permission_factory") + app.config.setdefault( + "RECORDS_REST_DEFAULT_UPDATE_PERMISSION_FACTORY", + "invenio_records.permissions:update_permission_factory") + app.config.setdefault( + "RECORDS_REST_DEFAULT_DELETE_PERMISSION_FACTORY", + "invenio_records.permissions:delete_permission_factory") + + app.config.setdefault("RECORDS_REST_LOGIN_ENDPOINT", "security.login") + # Set up API endpoints for records. app.config.setdefault( "RECORDS_REST_ENDPOINTS", diff --git a/invenio_records_rest/views.py b/invenio_records_rest/views.py index b24e950f..0a959f60 100644 --- a/invenio_records_rest/views.py +++ b/invenio_records_rest/views.py @@ -42,10 +42,29 @@ from jsonpatch import JsonPatchException, JsonPointerException from sqlalchemy.exc import SQLAlchemyError from werkzeug.routing import BuildError +from werkzeug.local import LocalProxy +from werkzeug.utils import import_string from .serializers import record_to_json_serializer +current_read_permission_factory = LocalProxy( + lambda: + current_app.extensions['invenio-records-rest'].read_permission_factory) + +current_create_permission_factory = LocalProxy( + lambda: + current_app.extensions['invenio-records-rest'].create_permission_factory) + +current_update_permission_factory = LocalProxy( + lambda: + current_app.extensions['invenio-records-rest'].update_permission_factory) + +current_delete_permission_factory = LocalProxy( + lambda: + current_app.extensions['invenio-records-rest'].delete_permission_factory) + + def create_blueprint(endpoints): """Create Invenio-Records-REST blueprint.""" blueprint = Blueprint( @@ -62,12 +81,45 @@ def create_blueprint(endpoints): def create_url_rules(endpoint, list_route=None, item_route=None, - pid_type=None, pid_minter=None): - """Create Werkzeug URL rules.""" + pid_type=None, pid_minter=None, + read_permission_factory_imp=None, + create_permission_factory_imp=None, + update_permission_factory_imp=None, + delete_permission_factory_imp=None): + """Create Werkzeug URL rules. + + :param endpoint: Name of endpoint. + :param list_route: record listing URL route . Required. + :param item_route: record URL route (must include ```` pattern). + Required. + :param pid_type: Persistent identifier type for endpoint. Required. + :param template: Template to render. Defaults to + ``invenio_records_ui/detail.html``. + :param read_permission_factory: Import path to factory that creates a read + permission object for a given record. + :param create_permission_factory: Import path to factory that creates a + create permission object for a given record. + :param update_permission_factory: Import path to factory that creates a + update permission object for a given record. + :param delete_permission_factory: Import path to factory that creates a + delete permission object for a given record. + + :returns: a list of dictionaries with can each be passed as keywords + arguments to ``Blueprint.add_url_rule``. + """ assert list_route assert item_route assert pid_type + read_permission_factory = import_string(read_permission_factory_imp) \ + if read_permission_factory_imp else None + create_permission_factory = import_string(create_permission_factory_imp) \ + if create_permission_factory_imp else None + update_permission_factory = import_string(update_permission_factory_imp) \ + if update_permission_factory_imp else None + delete_permission_factory = import_string(delete_permission_factory_imp) \ + if delete_permission_factory_imp else None + resolver = Resolver(pid_type=pid_type, object_type='rec', getter=Record.get_record) @@ -77,10 +129,15 @@ def create_url_rules(endpoint, list_route=None, item_route=None, RecordsListResource.view_name.format(endpoint), resolver=resolver, minter_name=pid_minter, + read_permission_factory=read_permission_factory, + create_permission_factory=create_permission_factory, serializers=serializers) item_view = RecordResource.as_view( RecordResource.view_name.format(endpoint), resolver=resolver, + read_permission_factory=read_permission_factory, + update_permission_factory=update_permission_factory, + delete_permission_factory=delete_permission_factory, serializers=serializers) return [ @@ -138,11 +195,15 @@ class RecordsListResource(ContentNegotiatedMethodView): view_name = '{0}_list' - def __init__(self, resolver=None, minter_name=None, **kwargs): + def __init__(self, resolver=None, minter_name=None, + read_permission_factory=None, create_permission_factory=None, + **kwargs): """Constructor.""" super(RecordsListResource, self).__init__(**kwargs) self.resolver = resolver self.minter = current_pidstore.minters[minter_name] + self.read_permission_factory = read_permission_factory + self.create_permission_factory = create_permission_factory def post(self, **kwargs): """Create a record. @@ -165,6 +226,18 @@ def post(self, **kwargs): # Create record record = Record.create(data, id_=record_uuid) + # Check permissions + create_permission_factory = self.create_permission_factory or \ + current_create_permission_factory + if create_permission_factory: + # Note, cannot be done in one line due overloading of boolean + # operations permission object. + if not create_permission_factory(record).can(): + from flask_login import current_user + if not current_user.is_authenticated(): + abort(401) + abort(403) + db.session.commit() except SQLAlchemyError: db.session.rollback() @@ -178,13 +251,18 @@ class RecordResource(ContentNegotiatedMethodView): view_name = '{0}_item' - def __init__(self, resolver=None, **kwargs): + def __init__(self, resolver=None, read_permission_factory=None, + update_permission_factory=None, + delete_permission_factory=None, **kwargs): """Constructor. :param resolver: Persistent identifier resolver instance. """ super(RecordResource, self).__init__(**kwargs) self.resolver = resolver + self.read_permission_factory = read_permission_factory + self.update_permission_factory = update_permission_factory + self.delete_permission_factory = delete_permission_factory @pass_record def get(self, pid, record, **kwargs): @@ -194,6 +272,17 @@ def get(self, pid, record, **kwargs): :param record: Record object. :returns: The requested record. """ + # Check permissions + read_permission_factory = self.read_permission_factory or \ + current_read_permission_factory + if read_permission_factory: + # Note, cannot be done in one line due overloading of boolean + # operations permission object. + if not read_permission_factory(record).can(): + from flask_login import current_user + if not current_user.is_authenticated(): + abort(401) + abort(403) self.check_etag(str(record.model.version_id)) return pid, record @@ -208,6 +297,18 @@ def patch(self, pid, record, **kwargs): :param record: Record object. :returns: The modified record. """ + # Check permissions + update_permission_factory = self.update_permission_factory or \ + current_update_permission_factory + if update_permission_factory: + # Note, cannot be done in one line due overloading of boolean + # operations permission object. + if not update_permission_factory(record).can(): + from flask_login import current_user + if not current_user.is_authenticated(): + abort(401) + abort(403) + # TODO: accept 'application/json' mediatype and use the object # to replace the specified attributes data = request.get_json(force=True) @@ -236,6 +337,18 @@ def put(self, pid, record, **kwargs): :param record: Record object. :returns: The modified record. """ + # Check permissions + update_permission_factory = self.update_permission_factory or \ + current_update_permission_factory + if update_permission_factory: + # Note, cannot be done in one line due overloading of boolean + # operations permission object. + if not update_permission_factory(record).can(): + from flask_login import current_user + if not current_user.is_authenticated(): + abort(401) + abort(403) + # TODO: accept non json content (MARC21...) data = request.get_json() if data is None: diff --git a/setup.py b/setup.py index ce2d879c..4f404780 100644 --- a/setup.py +++ b/setup.py @@ -37,6 +37,8 @@ 'check-manifest>=0.25', 'coverage>=4.0', 'invenio-db[all]>=1.0.0a6', + 'invenio-access>=1.0.0a1', + 'invenio-accounts>=1.0.0a2', 'isort>=4.2.2', 'pep257>=0.7.0', 'pytest-cache>=1.0', diff --git a/tests/conftest.py b/tests/conftest.py index d36fe6d1..cdbc9f21 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -30,9 +30,10 @@ import os import shutil import tempfile +from contextlib import contextmanager import pytest -from flask import Flask +from flask import Flask, url_for from flask_cli import FlaskCLI from invenio_db import InvenioDB, db from invenio_pidstore import InvenioPIDStore @@ -42,8 +43,18 @@ from invenio_rest import InvenioREST from sqlalchemy_utils.functions import create_database, database_exists, \ drop_database +from invenio_records.permissions import records_create_all, \ + records_delete_all, records_read_all, records_update_all -from invenio_records_rest import InvenioRecordsREST +from flask_menu import Menu + +from invenio_access.models import ActionUsers + +from flask_security.utils import encrypt_password + +from invenio_access import InvenioAccess +from invenio_accounts import InvenioAccounts +from invenio_accounts.views import blueprint as accounts_blueprint @pytest.fixture() @@ -56,14 +67,18 @@ def app(request): SERVER_NAME='localhost:5000', SQLALCHEMY_DATABASE_URI=os.environ.get( 'SQLALCHEMY_DATABASE_URI', 'sqlite:///test.db' - ) + ), + # No permission checking + RECORDS_REST_DEFAULT_CREATE_PERMISSION_FACTORY=None, + RECORDS_REST_DEFAULT_READ_PERMISSION_FACTORY=None, + RECORDS_REST_DEFAULT_UPDATE_PERMISSION_FACTORY=None, + RECORDS_REST_DEFAULT_DELETE_PERMISSION_FACTORY=None, ) FlaskCLI(app) InvenioDB(app) InvenioREST(app) InvenioRecords(app) InvenioPIDStore(app) - InvenioRecordsREST(app) with app.app_context(): if not database_exists(str(db.engine.url)) and \ @@ -83,6 +98,83 @@ def finalize(): return app +@pytest.fixture() +def accounts(app): + app.config.update( + WTF_CSRF_ENABLED=False, + SECRET_KEY='CHANGEME', + SECURITY_PASSWORD_SALT='CHANGEME', + # conftest switches off permission checking, so re-enable it for this + # app. + RECORDS_REST_DEFAULT_CREATE_PERMISSION_FACTORY='invenio_records.permissions:create_permission_factory', # noqa + RECORDS_REST_DEFAULT_READ_PERMISSION_FACTORY='invenio_records.permissions:read_permission_factory', # noqa + RECORDS_REST_DEFAULT_UPDATE_PERMISSION_FACTORY='invenio_records.permissions:update_permission_factory', # noqa + RECORDS_REST_DEFAULT_DELETE_PERMISSION_FACTORY='invenio_records.permissions:delete_permission_factory', # noqa + ) + # FIXME: use OAuth authentication instead of UI authentication + Menu(app) + accounts = InvenioAccounts(app) + app.register_blueprint(accounts_blueprint) + InvenioAccess(app) + return accounts + + +@pytest.yield_fixture +def user_factory(app, accounts): + """Create a user which has all permissions on every records.""" + + password = '123456' + + with app.test_request_context(): + login_url = url_for('security.login') + + @contextmanager + def create_user(name): + """Create a user. + + Should be called in application context. + """ + class UserConfig(object): + def __init__(self, name): + self.email = '{}@invenio-software.org'.format(name) + self.user = accounts.datastore.create_user( + email=self.email, + password=encrypt_password(password), + active=True, + ) + + def login_function(self): + def login(client): + res = client.post(login_url, data={ + 'email': self.email, 'password': password}) + assert res.status_code == 302 + return login + + def create_access(self, allow, record_id=None): + db.session.add(ActionUsers( + action=records_create_all.value, argument=record_id, + user=self.user, exclude=not allow)) + + def read_access(self, allow, record_id=None): + db.session.add(ActionUsers( + action=records_read_all.value, argument=record_id, + user=self.user, exclude=not allow)) + + def update_access(self, allow, record_id=None): + db.session.add(ActionUsers( + action=records_update_all.value, argument=record_id, + user=self.user, exclude=not allow)) + + def delete_access(self, allow, record_id=None): + db.session.add(ActionUsers( + action=records_delete_all.value, argument=record_id, + user=self.user, exclude=not allow)) + + yield UserConfig(name) + + yield create_user + + @pytest.fixture(scope='session') def resolver(): """Create a persistent identifier resolver.""" diff --git a/tests/test_invenio_records_rest.py b/tests/test_invenio_records_rest.py index 1efb1fa1..e6f22470 100644 --- a/tests/test_invenio_records_rest.py +++ b/tests/test_invenio_records_rest.py @@ -90,6 +90,7 @@ def control_num(data, cn=1): def test_valid_create(app, resolver): """Test VALID record creation request (POST .../records/).""" + InvenioRecordsREST(app) with app.app_context(): with app.test_client() as client: headers = [('Content-Type', 'application/json'), @@ -115,6 +116,7 @@ def test_valid_create(app, resolver): def test_invalid_create(app): """Test INVALID record creation request (POST .../records/).""" + InvenioRecordsREST(app) with app.app_context(): with app.test_client() as client: # check that creating with non accepted format will return 406 @@ -162,6 +164,7 @@ def test_invalid_create(app): def test_valid_get(app): """Test VALID record get request (GET .../records/).""" + InvenioRecordsREST(app) with app.app_context(): # create the record using the internal API pid, record = create_record(test_data) @@ -186,6 +189,7 @@ def test_valid_get(app): def test_invalid_get(app): """Test INVALID record get request (GET .../records/).""" + InvenioRecordsREST(app) with app.app_context(): with app.test_client() as client: # check that GET with non existing id will return 404 @@ -208,6 +212,7 @@ def test_invalid_get(app): def test_valid_patch(app, resolver): """Test VALID record patch request (PATCH .../records/).""" + InvenioRecordsREST(app) with app.app_context(): # create the record using the internal API pid, internal_record = create_record(test_data) @@ -237,6 +242,7 @@ def test_valid_patch(app, resolver): def test_invalid_patch(app): """Test INVALID record patch request (PATCH .../records/).""" + InvenioRecordsREST(app) with app.app_context(): with app.test_client() as client: # check that PATCH with non existing id will return 404 @@ -292,6 +298,7 @@ def test_invalid_patch(app): def test_valid_put(app, resolver): """Test VALID record patch request (PATCH .../records/).""" + InvenioRecordsREST(app) with app.app_context(): # create the record using the internal API pid, internal_record = create_record(test_data) @@ -319,6 +326,7 @@ def test_valid_put(app, resolver): def test_invalid_put(app): """Test INVALID record put request (PUT .../records/).""" + InvenioRecordsREST(app) with app.app_context(): with app.test_client() as client: # check that PUT with non existing id will return 404 @@ -361,6 +369,182 @@ def test_invalid_put(app): assert res.status_code == 400 +def test_create_permissions(app, user_factory, resolver): + InvenioRecordsREST(app) + with app.app_context(): + # create users + with user_factory('allowed') as allowed_user, \ + user_factory('forbidden') as forbidden_user: + # create one user allowed to create records + allowed_user.create_access(True) + allowed_login = allowed_user.login_function() + # create one user who is not allowed to create records + forbidden_user.create_access(False) + forbidden_login = forbidden_user.login_function() + db.session.commit() + + headers = [('Content-Type', 'application/json'), + ('Accept', 'application/json')] + # test create without being authenticated + with app.test_client() as client: + res = client.post(url_for('invenio_records_rest.recid_list'), + data=json.dumps(test_data), + headers=headers) + assert res.status_code == 401 + # test not allowed create + with app.test_client() as client: + forbidden_login(client) + res = client.post(url_for('invenio_records_rest.recid_list'), + data=json.dumps(test_data), + headers=headers) + assert res.status_code == 403 + # test allowed create + with app.test_client() as client: + allowed_login(client) + res = client.post(url_for('invenio_records_rest.recid_list'), + data=json.dumps(test_data), + headers=headers) + assert res.status_code == 201 + # check that the returned record matches the given data + response_data = json.loads(res.get_data(as_text=True)) + pid, internal_record = resolver.resolve(response_data['id']) + assert internal_record == response_data['metadata'] + + +def test_read_one_permissions(app, user_factory, resolver): + InvenioRecordsREST(app) + with app.app_context(): + # create the record using the internal API + pid, internal_record = create_record(test_data) + with user_factory('allowed') as allowed_user, \ + user_factory('forbidden') as forbidden_user: + # create one user allowed to read the record + allowed_user.read_access(True, str(internal_record.id)) + allowed_login = allowed_user.login_function() + # create one user who is not allowed to read the record + forbidden_user.read_access(False, str(internal_record.id)) + forbidden_login = forbidden_user.login_function() + db.session.commit() + + headers = [('Accept', 'application/json')] + # test get without being authenticated + with app.test_client() as client: + res = client.get(url_for('invenio_records_rest.recid_item', + pid_value=pid.pid_value), + headers=headers) + assert res.status_code == 401 + # test not allowed get + with app.test_client() as client: + forbidden_login(client) + res = client.get(url_for('invenio_records_rest.recid_item', + pid_value=pid.pid_value), + headers=headers) + assert res.status_code == 403 + # test allowed get + with app.test_client() as client: + allowed_login(client) + res = client.get(url_for('invenio_records_rest.recid_item', + pid_value=pid.pid_value), + headers=headers) + assert res.status_code == 200 + # check that the returned record matches the given data + response_data = json.loads(res.get_data(as_text=True)) + pid, internal_record = resolver.resolve(response_data['id']) + assert internal_record == response_data['metadata'] + + +def test_patch_one_permissions(app, user_factory, resolver): + InvenioRecordsREST(app) + with app.app_context(): + # create the record using the internal API + pid, internal_record = create_record(test_data) + with user_factory('allowed') as allowed_user, \ + user_factory('forbidden') as forbidden_user: + # create one user allowed to update the record + allowed_user.update_access(True, str(internal_record.id)) + allowed_login = allowed_user.login_function() + # create one user who is not allowed to update the record + forbidden_user.update_access(False, str(internal_record.id)) + forbidden_login = forbidden_user.login_function() + db.session.commit() + + headers = [('Content-Type', 'application/json-patch+json'), + ('Accept', 'application/json')] + # test get without being authenticated + with app.test_client() as client: + res = client.patch(url_for('invenio_records_rest.recid_item', + pid_value=pid.pid_value), + data=json.dumps(test_patch), + headers=headers) + assert res.status_code == 401 + # test not allowed get + with app.test_client() as client: + forbidden_login(client) + res = client.patch(url_for('invenio_records_rest.recid_item', + pid_value=pid.pid_value), + data=json.dumps(test_patch), + headers=headers) + assert res.status_code == 403 + # test allowed get + with app.test_client() as client: + allowed_login(client) + res = client.patch(url_for('invenio_records_rest.recid_item', + pid_value=pid.pid_value), + data=json.dumps(test_patch), + headers=headers) + assert res.status_code == 200 + # check that the returned record matches the given data + response_data = json.loads(res.get_data(as_text=True)) + test = copy.deepcopy(test_data_patched) + test['control_number'] = 1 + assert response_data['metadata'] == test + + +def test_put_one_permissions(app, user_factory, resolver): + InvenioRecordsREST(app) + with app.app_context(): + # create the record using the internal API + pid, internal_record = create_record(test_data) + with user_factory('allowed') as allowed_user, \ + user_factory('forbidden') as forbidden_user: + # create one user allowed to update the record + allowed_user.update_access(True, str(internal_record.id)) + allowed_login = allowed_user.login_function() + # create one user who is not allowed to update the record + forbidden_user.update_access(False, str(internal_record.id)) + forbidden_login = forbidden_user.login_function() + db.session.commit() + + headers = [('Content-Type', 'application/json'), + ('Accept', 'application/json')] + # test get without being authenticated + with app.test_client() as client: + res = client.put(url_for('invenio_records_rest.recid_item', + pid_value=pid.pid_value), + data=json.dumps(test_data_patched), + headers=headers) + assert res.status_code == 401 + # test not allowed get + with app.test_client() as client: + forbidden_login(client) + res = client.put(url_for('invenio_records_rest.recid_item', + pid_value=pid.pid_value), + data=json.dumps(test_data_patched), + headers=headers) + assert res.status_code == 403 + # test allowed get + with app.test_client() as client: + allowed_login(client) + res = client.put(url_for('invenio_records_rest.recid_item', + pid_value=pid.pid_value), + data=json.dumps(test_data_patched), + headers=headers) + assert res.status_code == 200 + # check that the returned record matches the given data + response_data = json.loads(res.get_data(as_text=True)) + assert response_data['metadata'] == test_data_patched + + def subtest_self_link(response_data, response_headers, client): """Check that the returned self link returns the same data. diff --git a/tests/test_pid_resolver.py b/tests/test_pid_resolver.py index 23cacc34..18e2e069 100644 --- a/tests/test_pid_resolver.py +++ b/tests/test_pid_resolver.py @@ -36,6 +36,8 @@ from invenio_pidstore.models import PersistentIdentifier, PIDStatus from invenio_records import Record +from invenio_records_rest import InvenioRecordsREST + def create_record(data): """Create a test record.""" @@ -56,6 +58,7 @@ def control_num(data, cn=1): def test_tombstone(app): """Test tomstones.""" + InvenioRecordsREST(app) with app.app_context(): # OK PID pid_ok, record = create_record({'title': 'test'})