From f9348ee6ed5251b22ebe49412ceee84b81e70634 Mon Sep 17 00:00:00 2001 From: Cedric Paillet Date: Mon, 13 May 2024 08:14:57 +0000 Subject: [PATCH] Refactor consul.acl Refactor consul.acl as consul.acl.token in preparation for upcoming review and eventual integration of consul.acl.policies. --- consul/api/acl.py | 5 +++ tests/api/test_acl.py | 82 +++++++++++++++++++++---------------------- 2 files changed, 46 insertions(+), 41 deletions(-) diff --git a/consul/api/acl.py b/consul/api/acl.py index 017b04a..cba328c 100644 --- a/consul/api/acl.py +++ b/consul/api/acl.py @@ -4,6 +4,11 @@ class ACL: + def __init__(self, agent): + self.agent = agent + self.token = Token(self, agent) + +class Token: def __init__(self, agent): self.agent = agent diff --git a/tests/api/test_acl.py b/tests/api/test_acl.py index 35cc0db..f435b51 100644 --- a/tests/api/test_acl.py +++ b/tests/api/test_acl.py 
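Review bot: In the new `ACL.__init__`, `self.token = Token(self, agent)` passes two positional arguments, but the `Token.__init__(self, agent)` shown as context just below accepts only one, so as written constructing `ACL` would raise `TypeError` and every `c.acl.token.*` call in the updated tests would fail before reaching Consul; the line should read `self.token = Token(agent)`. The `Token` class body continues outside this hunk, so this assumes its `__init__` is the one visible here. As far as the hunk shows, `ACL` no longer exposes `list`/`create`/`read`/`update`/`clone`/`delete` itself, so existing callers that manage tokens through the old `c.acl.*` path (including revocation via `delete`) would get `AttributeError` after upgrading; either keep thin delegating aliases for a release or call out the break in the changelog. `class Token:` is also preceded by a single blank line where PEP 8 expects two.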
@@ -10,37 +10,37 @@ def test_acl_permission_denied(self, acl_consul): c = consul.Consul(port=port) # No token - pytest.raises(consul.ACLPermissionDenied, c.acl.list) - pytest.raises(consul.ACLPermissionDenied, c.acl.create) - pytest.raises(consul.ACLPermissionDenied, c.acl.update, accessor_id="00000000-0000-0000-0000-000000000002") - pytest.raises(consul.ACLPermissionDenied, c.acl.clone, accessor_id="00000000-0000-0000-0000-000000000002") - pytest.raises(consul.ACLPermissionDenied, c.acl.read, accessor_id="00000000-0000-0000-0000-000000000002") - pytest.raises(consul.ACLPermissionDenied, c.acl.delete, accessor_id="00000000-0000-0000-0000-000000000002") + pytest.raises(consul.ACLPermissionDenied, c.acl.token.list) + pytest.raises(consul.ACLPermissionDenied, c.acl.token.create) + pytest.raises(consul.ACLPermissionDenied, c.acl.token.update, accessor_id="00000000-0000-0000-0000-000000000002") + pytest.raises(consul.ACLPermissionDenied, c.acl.token.clone, accessor_id="00000000-0000-0000-0000-000000000002") + pytest.raises(consul.ACLPermissionDenied, c.acl.token.read, accessor_id="00000000-0000-0000-0000-000000000002") + pytest.raises(consul.ACLPermissionDenied, c.acl.token.delete, accessor_id="00000000-0000-0000-0000-000000000002") # Token without the right permission (acl:write or acl:read) - pytest.raises(consul.ACLPermissionDenied, c.acl.list, token="anonymous") - pytest.raises(consul.ACLPermissionDenied, c.acl.create, token="anonymous") + pytest.raises(consul.ACLPermissionDenied, c.acl.token.list, token="anonymous") + pytest.raises(consul.ACLPermissionDenied, c.acl.token.create, token="anonymous") pytest.raises( consul.ACLPermissionDenied, - c.acl.update, + c.acl.token.update, accessor_id="00000000-0000-0000-0000-000000000002", token="anonymous", ) pytest.raises( consul.ACLPermissionDenied, - c.acl.clone, + c.acl.token.clone, accessor_id="00000000-0000-0000-0000-000000000002", token="anonymous", ) pytest.raises( consul.ACLPermissionDenied, - c.acl.read, + 
c.acl.token.read, accessor_id="00000000-0000-0000-0000-000000000002", token="anonymous", ) pytest.raises( consul.ACLPermissionDenied, - c.acl.delete, + c.acl.token.delete, accessor_id="00000000-0000-0000-0000-000000000002", token="anonymous", ) @@ -50,7 +50,7 @@ def test_acl_list(self, acl_consul): c = consul.Consul(port=port) # Make sure both master and anonymous tokens are created - acls = c.acl.list(token=master_token) + acls = c.acl.token.list(token=master_token) master_token_repr = { "Description": "Initial Management Token", 
@@ -69,30 +69,30 @@ def test_acl_read(self, acl_consul): c = consul.Consul(port=port) # Unknown token - pytest.raises(consul.ConsulException, c.acl.read, accessor_id="unknown", token=master_token) + pytest.raises(consul.ConsulException, c.acl.token.read, accessor_id="unknown", token=master_token) anonymous_token_repr = { "AccessorID": "00000000-0000-0000-0000-000000000002", "SecretID": "anonymous", } - acl = c.acl.read(accessor_id="00000000-0000-0000-0000-000000000002", token=master_token) + acl = c.acl.token.read(accessor_id="00000000-0000-0000-0000-000000000002", token=master_token) assert find_recursive(acl, anonymous_token_repr) def test_acl_create(self, acl_consul): port, master_token, _consul_version = acl_consul c = consul.Consul(port=port) - c.acl.create(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token) - c.acl.create(secret_id="DEADBEEF-0000-0000-0000-000000000000", token=master_token) - c.acl.create( + c.acl.token.create(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token) + c.acl.token.create(secret_id="DEADBEEF-0000-0000-0000-000000000000", token=master_token) + c.acl.token.create( secret_id="00000000-A5A5-0000-0000-000000000000", accessor_id="00000000-0000-A5A5-0000-000000000000", description="some token!", token=master_token, ) - assert c.acl.read(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token) - assert c.acl.read(accessor_id="00000000-0000-A5A5-0000-000000000000", token=master_token) + assert c.acl.token.read(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token) + assert c.acl.token.read(accessor_id="00000000-0000-A5A5-0000-000000000000", token=master_token) expected = [ { 
@@ -109,21 +109,21 @@ def test_acl_create(self, acl_consul): "Description": "some token!", }, ] - acl = c.acl.list(token=master_token) + acl = c.acl.token.list(token=master_token) assert find_recursive(acl, expected) def test_acl_clone(self, acl_consul): port, master_token, _consul_version = acl_consul c = consul.Consul(port=port) - assert len(c.acl.list(token=master_token)) == 2 + assert len(c.acl.token.list(token=master_token)) == 2 # Unknown token - pytest.raises(consul.ConsulException, c.acl.clone, accessor_id="unknown", token=master_token) + pytest.raises(consul.ConsulException, c.acl.token.clone, accessor_id="unknown", token=master_token) - c.acl.create(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token) - c.acl.clone(accessor_id="00000000-DEAD-BEEF-0000-000000000000", description="cloned", token=master_token) - assert len(c.acl.list(token=master_token)) == 4 + c.acl.token.create(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token) + c.acl.token.clone(accessor_id="00000000-DEAD-BEEF-0000-000000000000", description="cloned", token=master_token) + assert len(c.acl.token.list(token=master_token)) == 4 expected = [ { @@ -133,7 +133,7 @@ def test_acl_clone(self, acl_consul): "Description": "cloned", }, ] - acl = c.acl.list(token=master_token) + acl = c.acl.token.list(token=master_token) assert find_recursive(acl, expected) def test_acl_update(self, acl_consul): 
@@ -141,35 +141,35 @@ def test_acl_update(self, acl_consul): c = consul.Consul(port=port) # Unknown token - pytest.raises(consul.ConsulException, c.acl.update, accessor_id="unknown", token=master_token) + pytest.raises(consul.ConsulException, c.acl.token.update, accessor_id="unknown", token=master_token) - assert len(c.acl.list(token=master_token)) == 2 - c.acl.create(accessor_id="00000000-DEAD-BEEF-0000-000000000000", description="original", token=master_token) - assert len(c.acl.list(token=master_token)) == 3 - c.acl.update(accessor_id="00000000-DEAD-BEEF-0000-000000000000", description="updated", token=master_token) - assert len(c.acl.list(token=master_token)) == 3 + assert len(c.acl.token.list(token=master_token)) == 2 + c.acl.token.create(accessor_id="00000000-DEAD-BEEF-0000-000000000000", description="original", token=master_token) + assert len(c.acl.token.list(token=master_token)) == 3 + c.acl.token.update(accessor_id="00000000-DEAD-BEEF-0000-000000000000", description="updated", token=master_token) + assert len(c.acl.token.list(token=master_token)) == 3 expected = { "AccessorID": "00000000-DEAD-BEEF-0000-000000000000", "Description": "updated", } - acl = c.acl.read(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token) + acl = c.acl.token.read(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token) assert find_recursive(acl, expected) def test_acl_delete(self, acl_consul): port, master_token, _consul_version = acl_consul c = consul.Consul(port=port) - assert len(c.acl.list(token=master_token)) == 2 - c.acl.create(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token) - assert len(c.acl.list(token=master_token)) == 3 - assert c.acl.read(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token) + assert len(c.acl.token.list(token=master_token)) == 2 + c.acl.token.create(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token) + assert len(c.acl.token.list(token=master_token)) == 3 
+ assert c.acl.token.read(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token) # Delete and ensure it doesn't exist anymore - c.acl.delete(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token) - assert len(c.acl.list(token=master_token)) == 2 + c.acl.token.delete(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token) + assert len(c.acl.token.list(token=master_token)) == 2 pytest.raises( - consul.ConsulException, c.acl.read, accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token + consul.ConsulException, c.acl.token.read, accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token ) #