Skip to content

Commit

Permalink
add acl.policies
Browse files Browse the repository at this point in the history
add acl.list and list.read
  • Loading branch information
cpaillet committed May 14, 2024
1 parent 2d10c70 commit 039cd6e
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 0 deletions.
2 changes: 2 additions & 0 deletions consul/api/acl/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from consul.api.acl.policies import Policies
from consul.api.acl.token import Token


class ACL:
def __init__(self, agent):
self.agent = agent
self.token = Token(agent)
self.policies = Policies(agent)
32 changes: 32 additions & 0 deletions consul/api/acl/policies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from consul.callback import CB


class Policies:
def __init__(self, agent):
self.agent = agent

def list(self, token=None):
"""
Lists all the active ACL policies. This is a privileged endpoint, and
requires a management token. *token* will override this client's
default token.
Requires a token with acl:read capability. ACLPermissionDenied raised otherwise
"""
params = []
token = token or self.agent.token
if token:
params.append(("token", token))
return self.agent.http.get(CB.json(), "/v1/acl/policies", params=params)

def read(self, uuid, token=None):
"""
Returns the policy information for *id*. Requires a token with acl:read capability.
:param accessor_id: Specifies the UUID of the policy you lookup.
:param token: token with acl:read capability
:return: selected Polic information
"""
params = []
token = token or self.agent.token
if token:
params.append(("token", token))
return self.agent.http.get(CB.json(), f"/v1/acl/policy/{uuid}", params=params)
18 changes: 18 additions & 0 deletions tests/api/test_acl.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,24 @@ def test_acl_token_delete(self, acl_consul):
token=master_token,
)

def test_acl_policies_list(self, acl_consul):
port, master_token, _consul_version = acl_consul
c = consul.Consul(port=port)

# Make sure both master and anonymous tokens are created
policies = c.acl.policies.list(token=master_token)
assert find_recursive(policies, {"ID": "00000000-0000-0000-0000-000000000001", "Name": "global-management"})

def test_acl_policies_read(self, acl_consul):
port, master_token, _consul_version = acl_consul
c = consul.Consul(port=port)

# Unknown token
pytest.raises(consul.ConsulException, c.acl.policies.read, uuid="unknown", token=master_token)

policy = c.acl.policies.read(uuid="00000000-0000-0000-0000-000000000001", token=master_token)
assert find_recursive(policy, {"ID": "00000000-0000-0000-0000-000000000001", "Name": "global-management"})

#
# def test_acl_token_implicit_token_use(self, acl_consul):
# # configure client to use the master token by default
Expand Down

0 comments on commit 039cd6e

Please sign in to comment.