This repository has been archived by the owner on Oct 24, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 6
/
permission_query.py
164 lines (127 loc) · 5.71 KB
/
permission_query.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
from sqlalchemy import distinct
from sqlalchemy.sql import text as sql_text
class PermissionQuery:
    """Base class for permission lookups on QWC resources.

    Bundles the query builders shared by permission lookups. The
    :meth:`permissions` entry point is not implemented here and is
    expected to be overridden.
    """

    # role name used for the public iam.role
    PUBLIC_ROLE_NAME = 'public'

    def __init__(self, config_models, logger):
        """Keep references to the ORM model helper and the logger.

        :param ConfigModels config_models: Helper for ORM models
        :param Logger logger: Application logger
        """
        self.config_models = config_models
        self.logger = logger

    def permissions(self, params, username, group, session):
        """Return the resource specific permissions for a dataset.

        The base class provides no implementation.

        :param obj params: Service specific request parameters
        :param str username: User name
        :param str group: Group name
        :param Session session: DB session
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError

    def resource_permissions(self, resource_type, resource_name, username,
                             group, session):
        """Fetch the permissions for one QWC resource, highest priority first.

        Starts from all permissions of the user and group, narrows them to
        the resource with the given type and name, orders them by descending
        priority with DISTINCT on the priority column, and returns the
        executed result.

        :param str resource_type: QWC resource type
        :param str resource_name: QWC resource name
        :param str username: User name
        :param str group: Group name
        :param Session session: DB session
        """
        Permission = self.config_models.model('permissions')
        Resource = self.config_models.model('resources')

        # everything the user and group are permitted to access
        permitted = self.user_permissions_query(username, group, session)

        # narrow down to the requested resource type and name
        matching = permitted.join(Permission.resource)
        matching = matching.filter(Resource.type == resource_type)
        matching = matching.filter(Resource.name == resource_name)

        # highest priority first
        prioritized = matching.order_by(Permission.priority.desc())
        prioritized = prioritized.distinct(Permission.priority)

        # run the query and hand back the rows
        return prioritized.all()

    def resource_restrictions_query(self, resource_type, username, group,
                                    session):
        """Build the query for resources of a type the user may not access.

        The result is the set difference between all resources of the given
        type that have a permission entry and those the user and group are
        permitted for. Rows consist of resource id, name and parent id.

        :param str resource_type: QWC resource type
        :param str username: User name
        :param str group: Group name
        :param Session session: DB session
        """
        Permission = self.config_models.model('permissions')
        Resource = self.config_models.model('resources')

        # every resource of this type that is referenced by a permission
        restricted = session.query(Permission).join(Permission.resource)
        restricted = restricted.with_entities(
            Resource.id, Resource.name, Resource.parent_id
        )
        restricted = restricted.filter(Resource.type == resource_type)

        # resources of this type the user is permitted for
        permitted = self.resource_permission_query(
            resource_type, username, group, session
        )

        # what remains is restricted for this user
        return restricted.except_(permitted)

    def resource_permission_query(self, resource_type, username, group,
                                  session):
        """Build the query for resources of a type the user may access.

        Rows consist of resource id, name and parent id.

        :param str resource_type: QWC resource type
        :param str username: User name
        :param str group: Group name
        :param Session session: DB session
        """
        Permission = self.config_models.model('permissions')
        Resource = self.config_models.model('resources')

        permitted = self.user_permissions_query(username, group, session)
        permitted = permitted.join(Permission.resource)
        permitted = permitted.with_entities(
            Resource.id, Resource.name, Resource.parent_id
        )
        return permitted.filter(Resource.type == resource_type)

    def user_permissions_query(self, username, group, session):
        """Build the base query for all permissions of a user and group.

        Unites the permissions granted through roles of the user's groups,
        roles assigned directly to the user, roles of the given group and
        the public role.

        :param str username: User name
        :param str group: Group name
        :param Session session: DB session
        """
        Permission = self.config_models.model('permissions')
        Role = self.config_models.model('roles')
        Group = self.config_models.model('groups')
        User = self.config_models.model('users')

        all_permissions = session.query(Permission)

        # NOTE: each branch filters through nested JOINs so rows are
        #       reduced early instead of building a large cartesian product

        # permissions via roles of the groups the user belongs to
        via_user_groups = (
            all_permissions.join(Permission.role)
            .join(Role.groups_collection)
            .join(Group.users_collection)
            .filter(User.name == username)
        )

        # permissions via roles assigned directly to the user
        via_user = (
            all_permissions.join(Permission.role)
            .join(Role.users_collection)
            .filter(User.name == username)
        )

        # permissions via roles of the given group
        via_group = (
            all_permissions.join(Permission.role)
            .join(Role.groups_collection)
            .filter(Group.name == group)
        )

        # permissions via the public role
        via_public = (
            all_permissions.join(Permission.role)
            .filter(Role.name == self.PUBLIC_ROLE_NAME)
        )

        # merge the four sources
        combined = via_user_groups.union(via_user)
        combined = combined.union(via_group)
        combined = combined.union(via_public)

        # keep each permission only once
        return combined.distinct(Permission.id)