Skip to content

Commit

Permalink
add acl.policy
Browse files Browse the repository at this point in the history
add acl.policy.list and acl.policy.read
  • Loading branch information
cpaillet committed May 15, 2024
1 parent 883f6b4 commit 3c6a356
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 0 deletions.
3 changes: 3 additions & 0 deletions consul/api/acl/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from consul.api.acl.policy import Policy
from consul.api.acl.token import Token


class ACL:
def __init__(self, agent):
self.agent = agent

self.token = self.tokens = Token(agent)
self.policy = self.policies = Policy(agent)
32 changes: 32 additions & 0 deletions consul/api/acl/policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from consul.callback import CB


class Policy:
def __init__(self, agent):
self.agent = agent

def list(self, token=None):
"""
Lists all the active ACL policies. This is a privileged endpoint, and
requires a management token. *token* will override this client's
default token.
Requires a token with acl:read capability. ACLPermissionDenied raised otherwise
"""
params = []
token = token or self.agent.token
if token:
params.append(("token", token))
return self.agent.http.get(CB.json(), "/v1/acl/policies", params=params)

def read(self, uuid, token=None):
"""
Returns the policy information for *id*. Requires a token with acl:read capability.
:param accessor_id: Specifies the UUID of the policy you lookup.
:param token: token with acl:read capability
:return: selected Polic information
"""
params = []
token = token or self.agent.token
if token:
params.append(("token", token))
return self.agent.http.get(CB.json(), f"/v1/acl/policy/{uuid}", params=params)
16 changes: 16 additions & 0 deletions tests/api/test_acl.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,22 @@ def test_acl_token_delete(self, acl_consul):
token=master_token,
)

def test_acl_policy_list(self, acl_consul):
c, master_token, _consul_version = acl_consul

# Make sure both master and anonymous tokens are created
policies = c.acl.policy.list(token=master_token)
assert find_recursive(policies, {"ID": "00000000-0000-0000-0000-000000000001", "Name": "global-management"})

def test_acl_policy_read(self, acl_consul):
c, master_token, _consul_version = acl_consul

# Unknown token
pytest.raises(consul.ConsulException, c.acl.policy.read, uuid="unknown", token=master_token)

policy = c.acl.policy.read(uuid="00000000-0000-0000-0000-000000000001", token=master_token)
assert find_recursive(policy, {"ID": "00000000-0000-0000-0000-000000000001", "Name": "global-management"})

#
# def test_acl_token_implicit_token_use(self, acl_consul):
# # configure client to use the master token by default
Expand Down

0 comments on commit 3c6a356

Please sign in to comment.