Skip to content

Commit

Permalink
add acl.policies
Browse files Browse the repository at this point in the history
  • Loading branch information
cpaillet committed May 14, 2024
1 parent 2d10c70 commit 9b313ad
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 0 deletions.
2 changes: 2 additions & 0 deletions consul/api/acl/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from consul.api.acl.policies import Policies
from consul.api.acl.token import Token


class ACL:
def __init__(self, agent):
self.agent = agent
self.token = Token(agent)
self.policies = Policies(agent)
34 changes: 34 additions & 0 deletions consul/api/acl/policies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import json

from consul.callback import CB


class Policies:
def __init__(self, agent):
self.agent = agent

def list(self, token=None):
"""
Lists all the active ACL policies. This is a privileged endpoint, and
requires a management token. *token* will override this client's
default token.
Requires a token with acl:read capability. ACLPermissionDenied raised otherwise
"""
params = []
token = token or self.agent.token
if token:
params.append(("token", token))
return self.agent.http.get(CB.json(), "/v1/acl/policies", params=params)

def read(self, uuid, token=None):
"""
Returns the policy information for *id*. Requires a token with acl:read capability.
:param accessor_id: Specifies the UUID of the policy you lookup.
:param token: token with acl:read capability
:return: selected Polic information
"""
params = []
token = token or self.agent.token
if token:
params.append(("token", token))
return self.agent.http.get(CB.json(), f"/v1/acl/policies/{uuid}", params=params)
33 changes: 33 additions & 0 deletions tests/api/test_acl.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,39 @@ def test_acl_token_delete(self, acl_consul):
token=master_token,
)

def test_acl_policies_list(self, acl_consul):
port, master_token, _consul_version = acl_consul
c = consul.Consul(port=port)

# Make sure both master and anonymous tokens are created
acls = c.acl.policies.list(token=master_token)

master_token_repr = {
"Description": "Initial Management Token",
"Policies": [{"ID": "00000000-0000-0000-0000-000000000001", "Name": "global-management"}],
"SecretID": master_token,
}
anonymous_token_repr = {
"AccessorID": "00000000-0000-0000-0000-000000000002",
"SecretID": "anonymous",
}
assert find_recursive(acls, master_token_repr)
assert find_recursive(acls, anonymous_token_repr)

def test_acl_policies_read(self, acl_consul):
port, master_token, _consul_version = acl_consul
c = consul.Consul(port=port)

# Unknown token
pytest.raises(consul.ConsulException, c.acl.policies.read, uuid="unknown", token=master_token)

anonymous_token_repr = {
"AccessorID": "00000000-0000-0000-0000-000000000002",
"SecretID": "anonymous",
}
acl = c.acl.policies.read(uuid="00000000-0000-0000-0000-000000000002", token=master_token)
assert find_recursive(acl, anonymous_token_repr)

#
# def test_acl_token_implicit_token_use(self, acl_consul):
# # configure client to use the master token by default
Expand Down

0 comments on commit 9b313ad

Please sign in to comment.