Skip to content

Commit

Permalink
[DPE-4951] - fix: re-enable prefixed topic names during relations (#227)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcoppenheimer authored Aug 16, 2024
1 parent 653c979 commit fc6a6a5
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 7 deletions.
19 changes: 17 additions & 2 deletions src/managers/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,20 @@ def add_acl(
]

if resource_type == "TOPIC":
command += [f"--topic={resource_name}"]
if len(resource_name) > 3 and resource_name.endswith("*"):
pattern = "PREFIXED"
resource_name = resource_name[:-1]
else:
pattern = "LITERAL"

command += [f"--topic={resource_name}", f"--resource-pattern-type={pattern}"]

if resource_type == "GROUP":
command += [
f"--group={resource_name}",
"--resource-pattern-type=PREFIXED",
]
logger.info(f"CREATE ACL - {command}")
self.workload.run_bin_command(bin_keyword="acls", bin_args=command, opts=[self.log4j_opts])

def remove_acl(
Expand Down Expand Up @@ -263,7 +271,14 @@ def remove_acl(
]

if resource_type == "TOPIC":
command += [f"--topic={resource_name}"]
if len(resource_name) > 3 and resource_name.endswith("*"):
pattern = "PREFIXED"
resource_name = resource_name[:-1]
else:
pattern = "LITERAL"

command += [f"--topic={resource_name}", f"--resource-pattern-type={pattern}"]

if resource_type == "GROUP":
command += [
f"--group={resource_name}",
Expand Down
7 changes: 7 additions & 0 deletions tests/integration/app-charm/actions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,10 @@ get-offsets:
bootstrap-server:
type: string
description: The address for mtls Kafka

create-topic:
description: Attempts the configured topic
params:
bootstrap-server:
type: string
description: The address for SASL_PLAINTEXT Kafka
6 changes: 6 additions & 0 deletions tests/integration/app-charm/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
options:
topic-name:
description: |
The topic-name to request when relating to the Kafka application
type: string
default: test-topic
87 changes: 83 additions & 4 deletions tests/integration/app-charm/src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
ZK = "zookeeper"
CONSUMER_GROUP_PREFIX = "test-prefix"
SNAP_PATH = "/var/snap/charmed-kafka/current/etc/kafka"
CHARMED_KAFKA_SNAP_REVISION = 19
DEFAULT_TOPIC_NAME = "test-topic"


class ApplicationCharm(CharmBase):
Expand All @@ -42,19 +42,27 @@ def __init__(self, *args):
super().__init__(*args)
self.name = CHARM_KEY

if self.config["topic-name"] != DEFAULT_TOPIC_NAME:
self.topic_name = self.config["topic-name"]
else:
self.topic_name = DEFAULT_TOPIC_NAME

self.framework.observe(getattr(self.on, "start"), self._on_start)
self.kafka_requirer_consumer = KafkaRequires(
self,
relation_name=REL_NAME_CONSUMER,
topic="test-topic",
topic=self.topic_name,
extra_user_roles="consumer",
consumer_group_prefix=CONSUMER_GROUP_PREFIX,
)
self.kafka_requirer_producer = KafkaRequires(
self, relation_name=REL_NAME_PRODUCER, topic="test-topic", extra_user_roles="producer"
self,
relation_name=REL_NAME_PRODUCER,
topic=self.topic_name,
extra_user_roles="producer",
)
self.kafka_requirer_admin = KafkaRequires(
self, relation_name=REL_NAME_ADMIN, topic="test-topic", extra_user_roles="admin"
self, relation_name=REL_NAME_ADMIN, topic=self.topic_name, extra_user_roles="admin"
)
self.framework.observe(
self.kafka_requirer_consumer.on.topic_created, self.on_topic_created_consumer
Expand All @@ -73,6 +81,7 @@ def __init__(self, *args):
getattr(self.on, "run_mtls_producer_action"), self.run_mtls_producer
)
self.framework.observe(getattr(self.on, "get_offsets_action"), self.get_offsets)
self.framework.observe(getattr(self.on, "create_topic_action"), self.create_topic)

def _on_start(self, _) -> None:
self.unit.status = ActiveStatus()
Expand All @@ -82,14 +91,41 @@ def _log(self, _: RelationEvent):

def on_topic_created_consumer(self, event: TopicCreatedEvent):
logger.info(f"{event.username} {event.password} {event.bootstrap_server} {event.tls}")
if not (peer_relation := self.model.get_relation("cluster")):
event.defer()
return

peer_relation.data[self.app]["username"] = event.username or ""
peer_relation.data[self.app]["password"] = event.password or ""
peer_relation.data[self.app]["bootstrap-server"] = event.bootstrap_server or ""
peer_relation.data[self.app]["tls"] = event.tls or ""

return

def on_topic_created_producer(self, event: TopicCreatedEvent):
logger.info(f"{event.username} {event.password} {event.bootstrap_server} {event.tls}")
if not (peer_relation := self.model.get_relation("cluster")):
event.defer()
return

peer_relation.data[self.app]["username"] = event.username or ""
peer_relation.data[self.app]["password"] = event.password or ""
peer_relation.data[self.app]["bootstrap-server"] = event.bootstrap_server or ""
peer_relation.data[self.app]["tls"] = event.tls or ""

return

def on_topic_created_admin(self, event: TopicCreatedEvent):
logger.info(f"{event.username} {event.password} {event.bootstrap_server} {event.tls}")
if not (peer_relation := self.model.get_relation("cluster")):
event.defer()
return

peer_relation.data[self.app]["username"] = event.username or ""
peer_relation.data[self.app]["password"] = event.password or ""
peer_relation.data[self.app]["bootstrap-server"] = event.bootstrap_server or ""
peer_relation.data[self.app]["tls"] = event.tls or ""

return

def _install_packages(self):
Expand Down Expand Up @@ -318,6 +354,49 @@ def get_offsets(self, event: ActionEvent):

return

def create_topic(self, event: ActionEvent) -> None:
logger.info("creating client.properties file")
if not (peer_relation := self.model.get_relation("cluster")):
event.fail("No peer relation")
return

self._install_packages()

username = peer_relation.data[self.app]["username"]
password = peer_relation.data[self.app]["password"]
bootstrap_server = peer_relation.data[self.app]["bootstrap-server"]

properties = [
f'sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{username}" password="{password}";',
"sasl.mechanism=SCRAM-SHA-512",
"security.protocol=SASL_PLAINTEXT",
f"bootstrap.servers={bootstrap_server}",
]

safe_write_to_file(
content="\n".join(properties),
path=f"{SNAP_PATH}/client.properties",
)

logger.info("creating topic")
try:
subprocess.check_output(
# if requested acls for prefixed `test-*`, should be able to create `test-topic`
f"charmed-kafka.topics --bootstrap-server {bootstrap_server} --topic={DEFAULT_TOPIC_NAME} --create --command-config client.properties",
stderr=subprocess.STDOUT,
shell=True,
universal_newlines=True,
cwd=SNAP_PATH,
)
event.set_results({"success": "TRUE"})

except subprocess.CalledProcessError as e:
logger.exception(vars(e))
event.set_results({"success": "FALSE"})
event.fail()

return


if __name__ == "__main__":
main(ApplicationCharm)
27 changes: 26 additions & 1 deletion tests/integration/test_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
ZK = "zookeeper"
DUMMY_NAME_1 = "app"
DUMMY_NAME_2 = "appii"
DUMMY_NAME_3 = "prefix-app"
TLS_NAME = "self-signed-certificates"

REL_NAME_CONSUMER = "kafka-client-consumer"
Expand Down Expand Up @@ -53,7 +54,7 @@ async def test_deploy_charms_relate_active(

# async with ops_test.fast_forward(fast_interval="60s"):
await ops_test.model.wait_for_idle(
apps=[APP_NAME, DUMMY_NAME_1, ZK], idle_period=30, status="active"
apps=[APP_NAME, DUMMY_NAME_1, ZK], idle_period=30, status="active", timeout=1000
)

usernames.update(get_client_usernames(ops_test))
Expand Down Expand Up @@ -215,6 +216,30 @@ async def test_admin_removed_from_super_users(ops_test: OpsTest):
await ops_test.model.remove_application(DUMMY_NAME_2, block_until_done=True)


@pytest.mark.abort_on_fail
async def test_prefixed_topic_creation(ops_test: OpsTest, app_charm):
await asyncio.gather(
ops_test.model.deploy(
app_charm,
application_name=DUMMY_NAME_3,
num_units=1,
series="jammy",
config={"topic-name": "test-*"},
)
)
await ops_test.model.add_relation(APP_NAME, f"{DUMMY_NAME_3}:{REL_NAME_PRODUCER}")

async with ops_test.fast_forward(fast_interval="60s"):
await ops_test.model.wait_for_idle(
apps=[APP_NAME, DUMMY_NAME_3, ZK], idle_period=30, status="active"
)

action = await ops_test.model.units.get(f"{DUMMY_NAME_3}/0").run_action("create-topic")
response = await action.wait()

assert response.results.get("success", None) == "TRUE"


@pytest.mark.abort_on_fail
async def test_connection_updated_on_tls_enabled(ops_test: OpsTest, app_charm):
"""Test relation when TLS is enabled."""
Expand Down
42 changes: 42 additions & 0 deletions tests/unit/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,45 @@ def test_add_user_adds_zk_tls_flag(harness: Harness[KafkaCharm]):
in args["bin_args"]
), "--zk-tls-config-file flag not found"
assert "--zookeeper=" in args["bin_args"], "--zookeeper flag not found"


def test_prefixed_acls(harness: Harness[KafkaCharm]):
"""Checks the requirements for adding and removing PREFIXED ACLs."""
with patch("workload.KafkaWorkload.run_bin_command") as patched_run_bin:
for func in [
harness.charm.broker.auth_manager.add_acl,
harness.charm.broker.auth_manager.remove_acl,
]:
func(
username="bilbo",
operation="WRITE",
resource_type="TOPIC",
resource_name="there-and-back-again",
)
func(
username="bilbo",
operation="WRITE",
resource_type="TOPIC",
resource_name="there-and-back-*",
)
func(username="bilbo", operation="WRITE", resource_type="TOPIC", resource_name="??*")

assert (
"--resource-pattern-type=LITERAL"
in patched_run_bin.call_args_list[0].kwargs["bin_args"]
)

assert (
"--resource-pattern-type=PREFIXED"
in patched_run_bin.call_args_list[1].kwargs["bin_args"]
)

# checks that the prefixed topic removes the '*' char from the end
assert (
"--topic=there-and-back-" in patched_run_bin.call_args_list[1].kwargs["bin_args"]
)

assert (
"--resource-pattern-type=LITERAL"
in patched_run_bin.call_args_list[2].kwargs["bin_args"]
)

0 comments on commit fc6a6a5

Please sign in to comment.