Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce policy list/read capabilities #70

Merged
merged 3 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions consul/api/acl/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from consul.api.acl.policy import Policy
from consul.api.acl.token import Token


class ACL:
def __init__(self, agent):
self.agent = agent

self.token = self.tokens = Token(agent)
self.policy = self.policies = Policy(agent)
32 changes: 32 additions & 0 deletions consul/api/acl/policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from consul.callback import CB


class Policy:
def __init__(self, agent):
self.agent = agent

def list(self, token=None):
"""
Lists all the active ACL policies. This is a privileged endpoint, and
requires a management token. *token* will override this client's
default token.
Requires a token with acl:read capability. ACLPermissionDenied raised otherwise
"""
params = []
token = token or self.agent.token
if token:
params.append(("token", token))
return self.agent.http.get(CB.json(), "/v1/acl/policies", params=params)

def read(self, uuid, token=None):
"""
Returns the policy information for *id*. Requires a token with acl:read capability.
:param accessor_id: Specifies the UUID of the policy you lookup.
:param token: token with acl:read capability
:return: selected Polic information
"""
params = []
token = token or self.agent.token
if token:
params.append(("token", token))
return self.agent.http.get(CB.json(), f"/v1/acl/policy/{uuid}", params=params)
2 changes: 1 addition & 1 deletion consul/api/acl.py → consul/api/acl/token.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from consul.callback import CB


class ACL:
class Token:
def __init__(self, agent):
self.agent = agent

Expand Down
125 changes: 76 additions & 49 deletions tests/api/test_acl.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,50 +5,54 @@


class TestConsulAcl:
def test_acl_permission_denied(self, acl_consul):
def test_acl_token_permission_denied(self, acl_consul):
c, _master_token, _consul_version = acl_consul

# No token
mbrulatout marked this conversation as resolved.
Show resolved Hide resolved
pytest.raises(consul.ACLPermissionDenied, c.acl.list)
pytest.raises(consul.ACLPermissionDenied, c.acl.create)
pytest.raises(consul.ACLPermissionDenied, c.acl.update, accessor_id="00000000-0000-0000-0000-000000000002")
pytest.raises(consul.ACLPermissionDenied, c.acl.clone, accessor_id="00000000-0000-0000-0000-000000000002")
pytest.raises(consul.ACLPermissionDenied, c.acl.read, accessor_id="00000000-0000-0000-0000-000000000002")
pytest.raises(consul.ACLPermissionDenied, c.acl.delete, accessor_id="00000000-0000-0000-0000-000000000002")
pytest.raises(consul.ACLPermissionDenied, c.acl.token.list)
pytest.raises(consul.ACLPermissionDenied, c.acl.token.create)
pytest.raises(
consul.ACLPermissionDenied, c.acl.token.update, accessor_id="00000000-0000-0000-0000-000000000002"
)
pytest.raises(consul.ACLPermissionDenied, c.acl.token.clone, accessor_id="00000000-0000-0000-0000-000000000002")
pytest.raises(consul.ACLPermissionDenied, c.acl.token.read, accessor_id="00000000-0000-0000-0000-000000000002")
pytest.raises(
consul.ACLPermissionDenied, c.acl.token.delete, accessor_id="00000000-0000-0000-0000-000000000002"
)

# Token without the right permission (acl:write or acl:read)
pytest.raises(consul.ACLPermissionDenied, c.acl.list, token="anonymous")
pytest.raises(consul.ACLPermissionDenied, c.acl.create, token="anonymous")
pytest.raises(consul.ACLPermissionDenied, c.acl.token.list, token="anonymous")
pytest.raises(consul.ACLPermissionDenied, c.acl.token.create, token="anonymous")
pytest.raises(
consul.ACLPermissionDenied,
c.acl.update,
c.acl.token.update,
accessor_id="00000000-0000-0000-0000-000000000002",
token="anonymous",
)
pytest.raises(
consul.ACLPermissionDenied,
c.acl.clone,
c.acl.token.clone,
accessor_id="00000000-0000-0000-0000-000000000002",
token="anonymous",
)
pytest.raises(
consul.ACLPermissionDenied,
c.acl.read,
c.acl.token.read,
accessor_id="00000000-0000-0000-0000-000000000002",
token="anonymous",
)
pytest.raises(
consul.ACLPermissionDenied,
c.acl.delete,
c.acl.token.delete,
accessor_id="00000000-0000-0000-0000-000000000002",
token="anonymous",
)

def test_acl_list(self, acl_consul):
def test_acl_token_list(self, acl_consul):
c, master_token, _consul_version = acl_consul

# Make sure both master and anonymous tokens are created
acls = c.acl.list(token=master_token)
acls = c.acl.token.list(token=master_token)

master_token_repr = {
"Description": "Initial Management Token",
Expand All @@ -62,33 +66,33 @@ def test_acl_list(self, acl_consul):
assert find_recursive(acls, master_token_repr)
assert find_recursive(acls, anonymous_token_repr)

def test_acl_read(self, acl_consul):
def test_acl_token_read(self, acl_consul):
c, master_token, _consul_version = acl_consul

# Unknown token
pytest.raises(consul.ConsulException, c.acl.read, accessor_id="unknown", token=master_token)
pytest.raises(consul.ConsulException, c.acl.token.read, accessor_id="unknown", token=master_token)

anonymous_token_repr = {
"AccessorID": "00000000-0000-0000-0000-000000000002",
"SecretID": "anonymous",
}
acl = c.acl.read(accessor_id="00000000-0000-0000-0000-000000000002", token=master_token)
acl = c.acl.token.read(accessor_id="00000000-0000-0000-0000-000000000002", token=master_token)
assert find_recursive(acl, anonymous_token_repr)

def test_acl_create(self, acl_consul):
def test_acl_token_create(self, acl_consul):
c, master_token, _consul_version = acl_consul

c.acl.create(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token)
c.acl.create(secret_id="DEADBEEF-0000-0000-0000-000000000000", token=master_token)
c.acl.create(
c.acl.token.create(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token)
c.acl.token.create(secret_id="DEADBEEF-0000-0000-0000-000000000000", token=master_token)
c.acl.token.create(
secret_id="00000000-A5A5-0000-0000-000000000000",
accessor_id="00000000-0000-A5A5-0000-000000000000",
description="some token!",
token=master_token,
)

assert c.acl.read(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token)
assert c.acl.read(accessor_id="00000000-0000-A5A5-0000-000000000000", token=master_token)
assert c.acl.token.read(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token)
assert c.acl.token.read(accessor_id="00000000-0000-A5A5-0000-000000000000", token=master_token)

expected = [
{
Expand All @@ -105,20 +109,20 @@ def test_acl_create(self, acl_consul):
"Description": "some token!",
},
]
acl = c.acl.list(token=master_token)
acl = c.acl.token.list(token=master_token)
assert find_recursive(acl, expected)

def test_acl_clone(self, acl_consul):
def test_acl_token_clone(self, acl_consul):
c, master_token, _consul_version = acl_consul

assert len(c.acl.list(token=master_token)) == 2
assert len(c.acl.token.list(token=master_token)) == 2

# Unknown token
pytest.raises(consul.ConsulException, c.acl.clone, accessor_id="unknown", token=master_token)
pytest.raises(consul.ConsulException, c.acl.token.clone, accessor_id="unknown", token=master_token)

c.acl.create(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token)
c.acl.clone(accessor_id="00000000-DEAD-BEEF-0000-000000000000", description="cloned", token=master_token)
assert len(c.acl.list(token=master_token)) == 4
c.acl.token.create(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token)
c.acl.token.clone(accessor_id="00000000-DEAD-BEEF-0000-000000000000", description="cloned", token=master_token)
assert len(c.acl.token.list(token=master_token)) == 4

expected = [
{
Expand All @@ -128,45 +132,68 @@ def test_acl_clone(self, acl_consul):
"Description": "cloned",
},
]
acl = c.acl.list(token=master_token)
acl = c.acl.token.list(token=master_token)
assert find_recursive(acl, expected)

def test_acl_update(self, acl_consul):
def test_acl_token_update(self, acl_consul):
c, master_token, _consul_version = acl_consul

# Unknown token
pytest.raises(consul.ConsulException, c.acl.update, accessor_id="unknown", token=master_token)
pytest.raises(consul.ConsulException, c.acl.token.update, accessor_id="unknown", token=master_token)

assert len(c.acl.list(token=master_token)) == 2
c.acl.create(accessor_id="00000000-DEAD-BEEF-0000-000000000000", description="original", token=master_token)
assert len(c.acl.list(token=master_token)) == 3
c.acl.update(accessor_id="00000000-DEAD-BEEF-0000-000000000000", description="updated", token=master_token)
assert len(c.acl.list(token=master_token)) == 3
assert len(c.acl.token.list(token=master_token)) == 2
c.acl.token.create(
accessor_id="00000000-DEAD-BEEF-0000-000000000000", description="original", token=master_token
)
assert len(c.acl.token.list(token=master_token)) == 3
c.acl.token.update(
accessor_id="00000000-DEAD-BEEF-0000-000000000000", description="updated", token=master_token
)
assert len(c.acl.token.list(token=master_token)) == 3

expected = {
"AccessorID": "00000000-DEAD-BEEF-0000-000000000000",
"Description": "updated",
}
acl = c.acl.read(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token)
acl = c.acl.token.read(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token)
assert find_recursive(acl, expected)

def test_acl_delete(self, acl_consul):
def test_acl_token_delete(self, acl_consul):
c, master_token, _consul_version = acl_consul

assert len(c.acl.list(token=master_token)) == 2
c.acl.create(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token)
assert len(c.acl.list(token=master_token)) == 3
assert c.acl.read(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token)
assert len(c.acl.token.list(token=master_token)) == 2
c.acl.token.create(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token)
assert len(c.acl.token.list(token=master_token)) == 3
assert c.acl.token.read(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token)

# Delete and ensure it doesn't exist anymore
c.acl.delete(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token)
assert len(c.acl.list(token=master_token)) == 2
c.acl.token.delete(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token)
assert len(c.acl.token.list(token=master_token)) == 2
pytest.raises(
consul.ConsulException, c.acl.read, accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token
consul.ConsulException,
c.acl.token.read,
accessor_id="00000000-DEAD-BEEF-0000-000000000000",
token=master_token,
)

def test_acl_policy_list(self, acl_consul):
c, master_token, _consul_version = acl_consul

# Make sure both master and anonymous tokens are created
policies = c.acl.policy.list(token=master_token)
assert find_recursive(policies, {"ID": "00000000-0000-0000-0000-000000000001", "Name": "global-management"})

def test_acl_policy_read(self, acl_consul):
c, master_token, _consul_version = acl_consul

# Unknown token
pytest.raises(consul.ConsulException, c.acl.policy.read, uuid="unknown", token=master_token)

policy = c.acl.policy.read(uuid="00000000-0000-0000-0000-000000000001", token=master_token)
assert find_recursive(policy, {"ID": "00000000-0000-0000-0000-000000000001", "Name": "global-management"})

#
# def test_acl_implicit_token_use(self, acl_consul):
# def test_acl_token_implicit_token_use(self, acl_consul):
# # configure client to use the master token by default
# port, _token, _consul_version = acl_consul
# c = consul.Consul(port=port)
Expand Down