diff --git a/invenio_records_resources/services/records/queryparser/transformer.py b/invenio_records_resources/services/records/queryparser/transformer.py index b4680361..a37c326f 100644 --- a/invenio_records_resources/services/records/queryparser/transformer.py +++ b/invenio_records_resources/services/records/queryparser/transformer.py @@ -14,6 +14,7 @@ """ from invenio_i18n import gettext as _ +from luqum.tree import Phrase, Word from luqum.visitor import TreeTransformer from invenio_records_resources.services.errors import QuerystringValidationError @@ -33,15 +34,68 @@ def term_name(self): """Get the term name.""" return self._term_name - def map_word(self, node): + def map_word(self, node, **kwargs): """Modify a word node.""" return self._word_fun(node) if self._word_fun else node - def map_phrase(self, node): + def map_phrase(self, node, **kwargs): """Modify a phrase node.""" return self._phrase_fun(node) if self._phrase_fun else node +class RestrictedTerm: + """Class used to apply specific permissions to search. + + ex. "internal_notes.note": RestrictedTerm(system_access_permission) + restricts the internal_notes.note field from being searched by anyone else + than system process + """ + + def __init__(self, permission): + """Constructor.""" + self.permission = permission + + def allows(self, identity): + """Check the permission for the whole term.""" + return self.permission.allows(identity) + + +class RestrictedTermValue: + """Class used to apply specific permissions to search specific words. + + ex. "_exists_": RestrictedTermValue( + system_access_permission, word=word_internal_notes + ), + will rewrite the searched field based on given permission + """ + + def __init__(self, permission, word=None, phrase=None): + """Constructor.""" + self.permission = permission + self._word_fun = word + self._phrase_fun = phrase + + def map_word(self, node, context, **kwargs): + """Modify a word node.""" + is_restricted = not self.permission.allows(context["identity"]) + if not is_restricted: + return node + if self._word_fun and is_restricted: + return self._word_fun(node) + else: + return Word("") + + def map_phrase(self, node, context, **kwargs): + """Modify a phrase node.""" + is_restricted = not self.permission.allows(context["identity"]) + if not is_restricted: + return node + if self._phrase_fun and is_restricted: + return self._phrase_fun(node) + else: + return Phrase("") + + class SearchFieldTransformer(TreeTransformer): """Transform from user-friendly field names to internal field names.""" @@ -57,10 +111,24 @@ def visit_search_field(self, node, context): term_name = self._mapping.get(node.name, node.name) field_value_mapper = None + # TODO if you need another if clause here, please refactor if isinstance(term_name, FieldValueMapper): field_value_mapper = term_name term_name = field_value_mapper.term_name - + if isinstance(term_name, RestrictedTermValue): + field_value_mapper = term_name + term_name = node.name + if isinstance(term_name, RestrictedTerm): + allows = term_name.allows(context["identity"]) + term_name = node.name + # field_value_mapper is left as None on purpose - if the permission + # allows, we don't map any query, we allow it "as is" + if not allows: + raise QuerystringValidationError( + _("Invalid search field: {field_name}.").format( + field_name=node.name + ) + ) # If a allow list exists, the term must be allowed to be queried. if self._allow_list and not term_name in self._allow_list: raise QuerystringValidationError( @@ -79,9 +147,9 @@ def visit_search_field(self, node, context): def visit_word(self, node, context): """Visit a word term.""" mapper = context.get("field_value_mapper") - yield node if mapper is None else mapper.map_word(node) + yield node if mapper is None else mapper.map_word(node, context=context) def visit_phrase(self, node, context): """Visit a phrase term.""" mapper = context.get("field_value_mapper") - yield node if mapper is None else mapper.map_phrase(node) + yield node if mapper is None else mapper.map_phrase(node, context=context) diff --git a/tests/services/test_service_queryparser.py b/tests/services/test_service_queryparser.py index 5c58f266..5218a464 100644 --- a/tests/services/test_service_queryparser.py +++ b/tests/services/test_service_queryparser.py @@ -9,14 +9,19 @@ """Query parser tests.""" import pytest -from invenio_access.permissions import system_identity -from luqum.tree import Phrase +from flask_principal import ActionNeed +from invenio_access.permissions import Permission, SystemRoleNeed, system_identity +from luqum.tree import Phrase, Word from invenio_records_resources.services.records.queryparser import ( FieldValueMapper, QueryParser, SearchFieldTransformer, ) +from invenio_records_resources.services.records.queryparser.transformer import ( + RestrictedTerm, + RestrictedTermValue, +) @pytest.fixture() @@ -194,3 +199,85 @@ def lol(node): assert p(system_identity).parse(query).to_dict() == { "query_string": {"query": transformed_query} } + + +@pytest.mark.parametrize( + "query,transformed_query", + [ + # notice the input and the output of the test is the same. + # Search field transformer does not rewrite the search query, but raises exception handled in: + # https://github.com/inveniosoftware/invenio-records-resources/blob/e297181296eef56bdfb0d1486c3e570fa741a0aa/invenio_records_resources/services/records/queryparser/query.py#L136 + # but multimatch will not pick up any results, due to default fields we are allowed to search on + # integration test for this behaviour is in invenio-rdm-records + ("internal_notes.note:abc", "internal_notes.note:abc"), + ], +) +def test_querystring_restricted_term(query, transformed_query, identity_simple, app): + """Invalid syntax falls back to multi match query.""" + + sysadmin_permission = Permission(SystemRoleNeed("system_process")) + + def word_internal_notes(node): + """Rewrite internal notes value.""" + if not node.value.startswith("internal_notes"): + return node + return Word("") + + p = QueryParser.factory( + mapping={ + "internal_notes.note": RestrictedTerm(sysadmin_permission), + "_exists_": RestrictedTermValue( + sysadmin_permission, word=word_internal_notes + ), + }, + tree_transformer_cls=SearchFieldTransformer, + ) + + parser = p(system_identity) + + assert parser.parse(query).to_dict() == {"query_string": {"query": query}} + + parser = p(identity_simple) + + assert parser.parse(query).to_dict() == { + "multi_match": {"query": transformed_query} + } + + +@pytest.mark.parametrize( + "query,transformed_query", + [ + ("_exists_:internal_notes", "_exists_:"), + ("_exists_:metadata", "_exists_:metadata"), + ], +) +def test_querystring_restricted_term(query, transformed_query, identity_simple, app): + """Invalid syntax falls back to multi match query.""" + + sysadmin_permission = Permission(SystemRoleNeed("system_process")) + + def word_internal_notes(node): + """Rewrite internal notes value.""" + if not node.value.startswith("internal_notes"): + return node + return Word("") + + p = QueryParser.factory( + mapping={ + "internal_notes.note": RestrictedTerm(sysadmin_permission), + "_exists_": RestrictedTermValue( + sysadmin_permission, word=word_internal_notes + ), + }, + tree_transformer_cls=SearchFieldTransformer, + ) + + parser = p(system_identity) + + assert parser.parse(query).to_dict() == {"query_string": {"query": query}} + + parser = p(identity_simple) + + assert parser.parse(query).to_dict() == { + "query_string": {"query": transformed_query} + }