Skip to content

Commit

Permalink
[DPE-4703] - chore: sync vm + k8s w. nodeport feature (#226)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcoppenheimer authored Aug 14, 2024
1 parent 3e6d6b3 commit 653c979
Show file tree
Hide file tree
Showing 30 changed files with 1,486 additions and 673 deletions.
4 changes: 4 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,7 @@ options:
description: The maximum percentage of the total cpu, disk and network capacity that is allowed to be used on a broker. For example, a value of `0.8` ensures that no broker should have >80% utilization
type: float
default: 0.8
expose-external:
description: "String to determine how to expose the Kafka cluster externally from the Kubernetes cluster. Possible values: 'nodeport', 'none'"
type: string
default: "nodeport"
767 changes: 460 additions & 307 deletions poetry.lock

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ cryptography = ">42.0.0"
pydantic ="<2"
pyyaml = "^6.0.1"
requests = "^2.32.3"
lightkube = "0.15.0"
# FIXME: Unpin once rustc 1.76 is available at build time
rpds-py = "<0.19"

[tool.poetry.group.fmt]
optional = true
Expand All @@ -61,6 +64,9 @@ black = "^22.3.0"
ruff = ">=0.1.0"
codespell = ">=2.2.2"
pyright = "^1.1.301"
lightkube = "0.15.0"
# FIXME: Unpin once rustc 1.76 is available at build time
rpds-py = "<0.19"

[tool.poetry.group.unit]
optional = true
Expand All @@ -76,7 +82,7 @@ optional = true

[tool.poetry.group.integration.dependencies]
pytest = ">=7.2"
juju = "^3.4.0"
juju = "3.5.0.0"
coverage = {extras = ["toml"], version = ">7.0"}
pytest-operator = ">0.20"
kazoo = ">=2.8"
Expand Down
26 changes: 17 additions & 9 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,25 +1,33 @@
attrs==23.2.0 ; python_version >= "3.8" and python_version < "4.0"
anyio==4.4.0 ; python_version >= "3.8" and python_version < "4.0"
attrs==24.2.0 ; python_version >= "3.8" and python_version < "4.0"
certifi==2024.7.4 ; python_version >= "3.8" and python_version < "4.0"
cffi==1.16.0 ; python_version >= "3.8" and python_version < "4.0" and platform_python_implementation != "PyPy"
cffi==1.17.0 ; python_version >= "3.8" and python_version < "4.0" and platform_python_implementation != "PyPy"
charset-normalizer==3.3.2 ; python_version >= "3.8" and python_version < "4.0"
cosl==0.0.12 ; python_version >= "3.8" and python_version < "4.0"
cryptography==42.0.8 ; python_version >= "3.8" and python_version < "4.0"
cosl==0.0.20 ; python_version >= "3.8" and python_version < "4.0"
cryptography==43.0.0 ; python_version >= "3.8" and python_version < "4.0"
exceptiongroup==1.2.2 ; python_version >= "3.8" and python_version < "3.11"
h11==0.14.0 ; python_version >= "3.8" and python_version < "4.0"
httpcore==1.0.5 ; python_version >= "3.8" and python_version < "4.0"
httpx==0.27.0 ; python_version >= "3.8" and python_version < "4.0"
idna==3.7 ; python_version >= "3.8" and python_version < "4.0"
importlib-resources==6.4.0 ; python_version >= "3.8" and python_version < "3.9"
jsonschema-specifications==2023.12.1 ; python_version >= "3.8" and python_version < "4.0"
jsonschema==4.22.0 ; python_version >= "3.8" and python_version < "4.0"
jsonschema==4.23.0 ; python_version >= "3.8" and python_version < "4.0"
kazoo==2.10.0 ; python_version >= "3.8" and python_version < "4.0"
ops==2.14.1 ; python_version >= "3.8" and python_version < "4.0"
lightkube-models==1.30.0.8 ; python_version >= "3.8" and python_version < "4.0"
lightkube==0.15.0 ; python_version >= "3.8" and python_version < "4.0"
ops==2.15.0 ; python_version >= "3.8" and python_version < "4.0"
pkgutil-resolve-name==1.3.10 ; python_version >= "3.8" and python_version < "3.9"
pure-sasl==0.6.2 ; python_version >= "3.8" and python_version < "4.0"
pycparser==2.22 ; python_version >= "3.8" and python_version < "4.0" and platform_python_implementation != "PyPy"
pydantic==1.10.17 ; python_version >= "3.8" and python_version < "4.0"
pyyaml==6.0.1 ; python_version >= "3.8" and python_version < "4.0"
pyyaml==6.0.2 ; python_version >= "3.8" and python_version < "4.0"
referencing==0.35.1 ; python_version >= "3.8" and python_version < "4.0"
requests==2.32.3 ; python_version >= "3.8" and python_version < "4.0"
rpds-py==0.18.1 ; python_version >= "3.8" and python_version < "4.0"
tenacity==8.5.0 ; python_version >= "3.8" and python_version < "4.0"
sniffio==1.3.1 ; python_version >= "3.8" and python_version < "4.0"
tenacity==9.0.0 ; python_version >= "3.8" and python_version < "4.0"
typing-extensions==4.12.2 ; python_version >= "3.8" and python_version < "4.0"
urllib3==2.2.2 ; python_version >= "3.8" and python_version < "4.0"
websocket-client==1.8.0 ; python_version >= "3.8" and python_version < "4.0"
zipp==3.19.2 ; python_version >= "3.8" and python_version < "3.9"
zipp==3.20.0 ; python_version >= "3.8" and python_version < "3.9"
68 changes: 59 additions & 9 deletions src/core/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import os
from functools import cached_property
from ipaddress import IPv4Address, IPv6Address
from typing import TYPE_CHECKING, Any

from charms.data_platform_libs.v0.data_interfaces import (
Expand Down Expand Up @@ -44,7 +45,7 @@
SECRETS_UNIT,
SECURITY_PROTOCOL_PORTS,
ZK,
AuthMechanism,
AuthMap,
Status,
Substrates,
)
Expand Down Expand Up @@ -92,6 +93,7 @@ def __init__(self, charm: "KafkaCharm", substrate: Substrates):
self.substrate: Substrates = substrate
self.roles = charm.config.roles
self.network_bandwidth = charm.config.network_bandwidth
self.config = charm.config

self.peer_app_interface = DataPeerData(self.model, relation_name=PEER)
self.peer_unit_interface = DataPeerUnitData(
Expand Down Expand Up @@ -288,6 +290,16 @@ def clients(self) -> set[KafkaClient]:

# ---- GENERAL VALUES ----

@property
def bind_address(self) -> IPv4Address | IPv6Address | str:
"""The network binding address from the peer relation."""
bind_address = None
if self.peer_relation:
if binding := self.model.get_binding(self.peer_relation):
bind_address = binding.network.bind_address

return bind_address or ""

@property
def super_users(self) -> str:
"""Generates all users with super/admin permissions for the cluster from relations.
Expand All @@ -313,13 +325,41 @@ def super_users(self) -> str:
return ";".join(super_users_arg)

@property
def port(self) -> int:
"""Return the port to be used internally."""
mechanism: AuthMechanism = "SCRAM-SHA-512"
return (
SECURITY_PROTOCOL_PORTS["SASL_SSL", mechanism].client
if (self.cluster.tls_enabled and self.unit_broker.certificate)
else SECURITY_PROTOCOL_PORTS["SASL_PLAINTEXT", mechanism].client
def default_auth(self) -> AuthMap:
"""The current enabled auth.protocol for bootstrap."""
auth_protocol = (
"SASL_SSL"
if self.cluster.tls_enabled and self.unit_broker.certificate
else "SASL_PLAINTEXT"
)

# FIXME: will need updating when we support multiple concurrent security.protocols
# as this is what is sent across the relation, currently SASL only
return AuthMap(auth_protocol, "SCRAM-SHA-512")

@property
def enabled_auth(self) -> list[AuthMap]:
"""The currently enabled auth.protocols and their auth.mechanisms, based on related applications."""
enabled_auth = []
if self.client_relations or self.runs_balancer or self.peer_cluster_relation:
enabled_auth.append(self.default_auth)
if self.oauth_relation:
enabled_auth.append(AuthMap(self.default_auth.protocol, "OAUTHBEARER"))
if self.cluster.mtls_enabled:
enabled_auth.append(AuthMap("SSL", "SSL"))

return enabled_auth

@property
def bootstrap_servers_external(self) -> str:
"""Comma-delimited string of `bootstrap-server` for external access."""
return ",".join(
sorted(
{
f"{broker.node_ip}:{self.unit_broker.k8s.get_bootstrap_nodeport(self.default_auth)}"
for broker in self.brokers
}
)
)

@property
Expand All @@ -332,7 +372,17 @@ def bootstrap_server(self) -> str:
if not self.peer_relation:
return ""

return ",".join(sorted([f"{broker.host}:{self.port}" for broker in self.brokers]))
if self.config.expose_external: # implicitly checks for k8s in structured_config
return self.bootstrap_servers_external

return ",".join(
sorted(
[
f"{broker.internal_address}:{SECURITY_PROTOCOL_PORTS[self.default_auth].client}"
for broker in self.brokers
]
)
)

@property
def log_dirs(self) -> str:
Expand Down
68 changes: 61 additions & 7 deletions src/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import json
import logging
from functools import cached_property
from typing import MutableMapping, TypeAlias

import requests
Expand All @@ -16,11 +17,19 @@
)
from charms.zookeeper.v0.client import QuorumLeaderNotFoundError, ZooKeeperManager
from kazoo.client import AuthFailedError, NoNodeError
from lightkube.resources.core_v1 import Node, Pod
from ops.model import Application, Relation, Unit
from tenacity import retry, retry_if_result, stop_after_attempt, wait_fixed
from typing_extensions import override

from literals import BALANCER, BROKER, INTERNAL_USERS, SECRETS_APP, Substrates
from literals import (
BALANCER,
BROKER,
INTERNAL_USERS,
SECRETS_APP,
Substrates,
)
from managers.k8s import K8sManager

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -403,6 +412,10 @@ def __init__(
super().__init__(relation, data_interface, component, substrate)
self.data_interface = data_interface
self.unit = component
self.k8s = K8sManager(
pod_name=self.pod_name,
namespace=self.unit._backend.model_name,
)

@property
def unit_id(self) -> int:
Expand All @@ -413,17 +426,26 @@ def unit_id(self) -> int:
return int(self.unit.name.split("/")[1])

@property
def host(self) -> str:
"""Return the hostname of a unit."""
host = ""
def internal_address(self) -> str:
"""The address for internal communication between brokers."""
addr = ""
if self.substrate == "vm":
for key in ["hostname", "ip", "private-address"]:
if host := self.relation_data.get(key, ""):
if addr := self.relation_data.get(key, ""):
break

if self.substrate == "k8s":
host = f"{self.unit.name.split('/')[0]}-{self.unit_id}.{self.unit.name.split('/')[0]}-endpoints"
addr = f"{self.unit.name.split('/')[0]}-{self.unit_id}.{self.unit.name.split('/')[0]}-endpoints"

return addr

return host
@property
def host(self) -> str:
"""Return the hostname of a unit."""
if self.substrate == "vm":
return self.internal_address
else:
return self.node_ip or self.internal_address

# --- TLS ---

Expand Down Expand Up @@ -503,6 +525,38 @@ def rack(self) -> str:
"""The rack for the broker on broker.rack from rack.properties."""
return self.relation_data.get("rack", "")

@property
def pod_name(self) -> str:
"""The name of the K8s Pod for the unit.
K8s-only.
"""
return self.unit.name.replace("/", "-")

@cached_property
def pod(self) -> Pod:
"""The Pod of the unit.
K8s-only.
"""
return self.k8s.get_pod(pod_name=self.pod_name)

@cached_property
def node(self) -> Node:
"""The Node the unit is scheduled on.
K8s-only.
"""
return self.k8s.get_node(pod=self.pod)

@cached_property
def node_ip(self) -> str:
"""The IPV4/IPV6 IP address the Node the unit is on.
K8s-only.
"""
return self.k8s.get_node_ip(node=self.node)


class ZooKeeper(RelationState):
"""State collection metadata for a the Zookeeper relation."""
Expand Down
15 changes: 14 additions & 1 deletion src/core/structured_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from charms.data_platform_libs.v0.data_models import BaseConfigModel
from pydantic import Field, validator

from literals import BALANCER, BROKER
from literals import BALANCER, BROKER, SUBSTRATE

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -78,6 +78,7 @@ class CharmConfig(BaseConfigModel):
network_bandwidth: int = Field(default=50000, validate_default=False, gt=0)
cruisecontrol_balance_threshold: float = Field(default=1.1, validate_default=False, ge=1)
cruisecontrol_capacity_threshold: float = Field(default=0.8, validate_default=False, le=1)
expose_external: str | None

@validator("*", pre=True)
@classmethod
Expand Down Expand Up @@ -230,6 +231,18 @@ def profile_values(cls, value: str) -> str | None:

return value

@validator("expose_external")
@classmethod
def expose_external_validator(cls, value: str) -> str | None:
"""Check expose-external config option is only used on Kubernetes charm."""
if SUBSTRATE == "vm":
return

if value == "none":
return

return value

@validator("log_level")
@classmethod
def log_level_values(cls, value: str) -> str | None:
Expand Down
23 changes: 22 additions & 1 deletion src/core/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

"""Supporting objects for Kafka charm state."""

import re
import secrets
import string
from abc import ABC, abstractmethod

from ops.pebble import Layer

from literals import BALANCER, BROKER, Role


Expand Down Expand Up @@ -183,13 +186,31 @@ def run_bin_command(self, bin_keyword: str, bin_args: list[str], opts: list[str]
"""
...

@abstractmethod
def get_version(self) -> str:
"""Get the workload version.
Returns:
String of kafka version
"""
if not self.active:
return ""

try:
version = re.split(r"[\s\-]", self.run_bin_command("topics", ["--version"]))[0]
except: # noqa: E722
version = ""
return version

@property
@abstractmethod
def layer(self) -> Layer:
"""Gets the Pebble Layer definition for the current workload."""
...

@property
@abstractmethod
def container_can_connect(self) -> bool:
"""Flag to check if workload container can connect."""
...

@staticmethod
Expand Down
Loading

0 comments on commit 653c979

Please sign in to comment.