Skip to content

Commit

Permalink
wip: ip range, squash
Browse files Browse the repository at this point in the history
  • Loading branch information
utnapischtim committed Jul 18, 2024
1 parent 3c7d483 commit 7a4d875
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 53 deletions.
8 changes: 4 additions & 4 deletions invenio_config_tugraz/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,24 @@

from invenio_i18n import gettext as _

INVENIO_CONFIG_TUGRAZ_SHIBBOLETH = False
CONFIG_TUGRAZ_SHIBBOLETH = False
"""Set True if SAML is configured"""

INVENIO_CONFIG_TUGRAZ_SINGLE_IPS = []
CONFIG_TUGRAZ_SINGLE_IPS = []
"""Allows access to users whose IP address is listed.
INVENIO_CONFIG_TUGRAZ_SINGLE_IPS =
["127.0.0.1", "127.0.0.2"]
"""

INVENIO_CONFIG_TUGRAZ_IP_RANGES = []
CONFIG_TUGRAZ_IP_RANGES = []
"""Allows access to users whose range of IP address is listed.
INVENIO_CONFIG_TUGRAZ_IP_RANGES =
[["127.0.0.2", "127.0.0.99"], ["127.0.1.3", "127.0.1.5"]]
"""

INVENIO_CONFIG_TUGRAZ_IP_NETWORK = ""
CONFIG_TUGRAZ_IP_NETWORK = ""
"""Allows access to users who are in the IP network."""


Expand Down
118 changes: 70 additions & 48 deletions invenio_config_tugraz/permissions/generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,40 +46,38 @@
"""

from ipaddress import ip_address, ip_network
from typing import Any

from flask import current_app, request
from invenio_access.permissions import SystemRoleNeed, any_user
from flask_principal import Need
from invenio_access.permissions import any_user
from invenio_records_permissions.generators import Generator
from invenio_search.engine import dsl

from .roles import tugraz_authenticated_user

single_ip_need = SystemRoleNeed("single_ip")
ip_network_need = SystemRoleNeed("ip_network")


class RecordSingleIP(Generator):
"""Allowed any user with accessing with the IP."""

def needs(self, record=None, **kwargs):
"""Enabling Needs, Set of Needs granting permission."""
def needs(self, record: dict | None = None, **__: dict) -> list[Need]:
"""Set of Needs granting permission. Enabling Needs."""
if record is None:
return []

custom_fields = record.get("custom_fields", {})

if not custom_fields.get("single_ip", False):
# if record does not have singleip - return any_user
# if record does not have singleip - return any_user
if not record.get("custom_fields", {}).get("single_ip", False):
return [any_user]
elif self.check_permission():
# if record has singleip, and the ip of the user matches the allowed ip

# if record has singleip, and the ip of the user matches the allowed ip
if self.check_permission():
return [any_user]
else:
# non of the above - return empty
return [single_ip_need]

def excludes(self, **kwargs):
"""Preventing Needs, Set of Needs denying permission.
# non of the above - return empty
return []

def excludes(self, **kwargs: dict) -> list[Need]:
"""Set of Needs denying permission. Preventing Needs.
If ANY of the Needs are matched, permission is revoked.
Expand All @@ -96,53 +94,63 @@ def excludes(self, **kwargs):
If the same Need is returned by `needs` and `excludes`, then that
Need provider is disallowed.
"""
# TODO implement this
return []
try:
if (
kwargs["record"]["custom_fields"]["single_ip"]
and not self.check_permission()
):
return [any_user]

def query_filter(self, *args, **kwargs):
"""Filters for singleip records."""
except KeyError:
return []
else:
return []

def query_filter(self, *_: dict, **__: dict) -> Any: # noqa: ANN401
"""Filter for singleip records."""
if not self.check_permission():
# If user ip is not on the list, and If the record contains 'singleip' will not be seen
return ~dsl.Q("match", **{"custom_fields.single_ip": True})

# Lists all records
return dsl.Q("match_all")

def check_permission(self):
def check_permission(self) -> bool:
"""Check for User IP address in config variable.
If the user ip is in the configured list return True.
"""
user_ip = request.remote_addr
single_ips = current_app.config["INVENIO_CONFIG_TUGRAZ_SINGLE_IPS"]
try:
user_ip = request.remote_addr
except RuntimeError:
return False

single_ips = current_app.config["CONFIG_TUGRAZ_SINGLE_IPS"]

return user_ip in single_ips


class AllowedFromIPNetwork(Generator):
"""Allowed from ip range."""

def needs(self, record=None, **kwargs):
"""Enabling Needs, Set of Needs granting permission."""
def needs(self, record: dict | None = None, **__: dict) -> list[Need]:
"""Set of Needs granting permission. Enabling Needs."""
if record is None:
return []

custom_fields = record.get("custom_fields", {})

if not custom_fields.get("ip_network", False):
# if the record doesn't have set the ip range allowance
# print("AllowedFromIPNetwork if")
# if the record doesn't have set the ip range allowance
if not record.get("custom_fields", {}).get("ip_network", False):
return [any_user]
elif self.check_permission():
# print("AllowedFromIPNetwork check_permission")
# if the record has set the ip_range allowance and is in the range

# if the record has set the ip_range allowance and is in the range
if self.check_permission():
return [any_user]
else:
# non of the above - return empty
return [ip_network_need]

def excludes(self, **kwargs):
"""Preventing Needs, Set of Needs denying permission.
# non of the above - return empty
return []

def excludes(self, **kwargs: dict) -> Need:
"""Set of Needs denying permission. Preventing Needs.
If ANY of the Needs are matched, permission is revoked.
Expand All @@ -159,29 +167,43 @@ def excludes(self, **kwargs):
If the same Need is returned by `needs` and `excludes`, then that
Need provider is disallowed.
"""
# TODO implement this
return []
try:
if (
kwargs["record"]["custom_fields"]["ip_network"]
and not self.check_permission()
):
return [any_user]

def query_filter(self, *args, **kwargs):
"""Filters for ip range records."""
except KeyError:
return []
else:
return []

def query_filter(self, *_: dict, **__: dict) -> Any: # noqa: ANN401
"""Filter for ip range records."""
if not self.check_permission():
return ~dsl.Q("match", **{"custom_fields.ip_network": True})

return dsl.Q("match_all")

def check_permission(self):
def check_permission(self) -> bool:
"""Check for User IP address in the configured network."""
user_ip = request.remote_addr
network = current_app.config["INVENIO_CONFIG_TUGRAZ_IP_NETWORK"]
try:
user_ip = request.remote_addr
except RuntimeError:
return False

network = current_app.config["CONFIG_TUGRAZ_IP_NETWORK"]

try:
return ip_address(user_ip) in ip_network(network)
except ValueError:
return True
return False


class TUGrazAuthenticatedUser(Generator):
"""Generates the `tugraz_authenticated_user` role-need."""

def needs(self, **__):
def needs(self, **__: dict) -> list[Need]:
"""Generate needs to be checked against current user identity."""
return [tugraz_authenticated_user]
7 changes: 6 additions & 1 deletion invenio_config_tugraz/permissions/policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,12 @@ class TUGrazRDMRecordPermissionPolicy(RecordPermissionPolicy):

can_tugraz_authenticated = [TUGrazAuthenticatedUser(), SystemProcess()]
can_authenticated = can_tugraz_authenticated
can_all = [AnyUser(), SystemProcess()]
can_all = [
AnyUser(),
SystemProcess(),
AllowedFromIPNetwork(),
RecordSingleIP(),
]

#
# Miscellaneous
Expand Down

0 comments on commit 7a4d875

Please sign in to comment.