Skip to content

Commit

Permalink
[DPE-6138] Update zookeeper client lib (#282)
Browse files Browse the repository at this point in the history
  • Loading branch information
Batalex authored Dec 11, 2024
1 parent b5d3c3b commit 1c6b435
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 54 deletions.
85 changes: 65 additions & 20 deletions lib/charms/zookeeper/v0/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def update_cluster(new_members: List[str], event: EventBase) -> None:

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 6
LIBPATCH = 8


logger = logging.getLogger(__name__)
Expand All @@ -101,6 +101,12 @@ class QuorumLeaderNotFoundError(Exception):
pass


class NoUnitFoundError(Exception):
"""Generic exception for when there are no running zk unit in the app."""

pass


class ZooKeeperManager:
"""Handler for performing ZK commands."""

Expand All @@ -114,6 +120,7 @@ def __init__(
keyfile_path: Optional[str] = "",
keyfile_password: Optional[str] = "",
certfile_path: Optional[str] = "",
read_only: bool = True,
):
self.hosts = hosts
self.username = username
Expand All @@ -123,12 +130,21 @@ def __init__(
self.keyfile_path = keyfile_path
self.keyfile_password = keyfile_password
self.certfile_path = certfile_path
self.leader = ""
self.zk_host = ""
self.read_only = read_only

try:
self.leader = self.get_leader()
except RetryError:
raise QuorumLeaderNotFoundError("quorum leader not found")
if not read_only:
try:
self.zk_host = self.get_leader()
except RetryError:
raise QuorumLeaderNotFoundError("quorum leader not found")

else:
try:
self.zk_host = self.get_any_unit()

except RetryError:
raise NoUnitFoundError

@retry(
wait=wait_fixed(3),
Expand Down Expand Up @@ -170,6 +186,35 @@ def get_leader(self) -> str:

return leader or ""

@retry(
wait=wait_fixed(3),
stop=stop_after_attempt(2),
retry=retry_if_not_result(lambda result: True if result else False),
)
def get_any_unit(self) -> str:
any_host = None
for host in self.hosts:
try:
with ZooKeeperClient(
host=host,
client_port=self.client_port,
username=self.username,
password=self.password,
use_ssl=self.use_ssl,
keyfile_path=self.keyfile_path,
keyfile_password=self.keyfile_password,
certfile_path=self.certfile_path,
) as zk:
response = zk.srvr
if response:
any_host = host
break
except KazooTimeoutError: # in the case of having a dead unit in relation data
logger.debug(f"TIMEOUT - {host}")
continue

return any_host or ""

@property
def server_members(self) -> Set[str]:
"""The current members within the ZooKeeper quorum.
Expand All @@ -179,7 +224,7 @@ def server_members(self) -> Set[str]:
e.g {"server.1=10.141.78.207:2888:3888:participant;0.0.0.0:2181"}
"""
with ZooKeeperClient(
host=self.leader,
host=self.zk_host,
client_port=self.client_port,
username=self.username,
password=self.password,
Expand All @@ -200,7 +245,7 @@ def config_version(self) -> int:
The zookeeper config version decoded from base16
"""
with ZooKeeperClient(
host=self.leader,
host=self.zk_host,
client_port=self.client_port,
username=self.username,
password=self.password,
Expand All @@ -221,7 +266,7 @@ def members_syncing(self) -> bool:
True if any members are syncing. Otherwise False.
"""
with ZooKeeperClient(
host=self.leader,
host=self.zk_host,
client_port=self.client_port,
username=self.username,
password=self.password,
Expand Down Expand Up @@ -305,7 +350,7 @@ def add_members(self, members: Iterable[str]) -> None:

# specific connection to leader
with ZooKeeperClient(
host=self.leader,
host=self.zk_host,
client_port=self.client_port,
username=self.username,
password=self.password,
Expand All @@ -330,7 +375,7 @@ def remove_members(self, members: Iterable[str]) -> None:
for member in members:
member_id = re.findall(r"server.([0-9]+)", member)[0]
with ZooKeeperClient(
host=self.leader,
host=self.zk_host,
client_port=self.client_port,
username=self.username,
password=self.password,
Expand All @@ -356,7 +401,7 @@ def leader_znodes(self, path: str) -> Set[str]:
Set of all nested child zNodes
"""
with ZooKeeperClient(
host=self.leader,
host=self.zk_host,
client_port=self.client_port,
username=self.username,
password=self.password,
Expand All @@ -369,15 +414,15 @@ def leader_znodes(self, path: str) -> Set[str]:

return all_znode_children

def create_znode_leader(self, path: str, acls: List[ACL]) -> None:
def create_znode_leader(self, path: str, acls: List[ACL] | None = None) -> None:
"""Creates a new zNode on the current quorum leader with given ACLs.
Args:
path: the zNode path to set
acls: the ACLs to be set on that path
"""
with ZooKeeperClient(
host=self.leader,
host=self.zk_host,
client_port=self.client_port,
username=self.username,
password=self.password,
Expand All @@ -388,15 +433,15 @@ def create_znode_leader(self, path: str, acls: List[ACL]) -> None:
) as zk:
zk.create_znode(path=path, acls=acls)

def set_acls_znode_leader(self, path: str, acls: List[ACL]) -> None:
def set_acls_znode_leader(self, path: str, acls: List[ACL] | None = None) -> None:
"""Updates ACLs for an existing zNode on the current quorum leader.
Args:
path: the zNode path to update
acls: the new ACLs to be set on that path
"""
with ZooKeeperClient(
host=self.leader,
host=self.zk_host,
client_port=self.client_port,
username=self.username,
password=self.password,
Expand All @@ -414,7 +459,7 @@ def delete_znode_leader(self, path: str) -> None:
path: the zNode path to delete
"""
with ZooKeeperClient(
host=self.leader,
host=self.zk_host,
client_port=self.client_port,
username=self.username,
password=self.password,
Expand All @@ -432,7 +477,7 @@ def get_version(self) -> str:
String of ZooKeeper service version
"""
with ZooKeeperClient(
host=self.leader,
host=self.zk_host,
client_port=self.client_port,
username=self.username,
password=self.password,
Expand Down Expand Up @@ -577,7 +622,7 @@ def delete_znode(self, path: str) -> None:
return
self.client.delete(path, recursive=True)

def create_znode(self, path: str, acls: List[ACL]) -> None:
def create_znode(self, path: str, acls: List[ACL] | None = None) -> None:
"""Create new znode.
Args:
Expand All @@ -599,7 +644,7 @@ def get_acls(self, path: str) -> List[ACL]:

return acl_list if acl_list else []

def set_acls(self, path: str, acls: List[ACL]) -> None:
def set_acls(self, path: str, acls: List[ACL] | None = None) -> None:
"""Sets acls for a desired znode path.
Args:
Expand Down
6 changes: 3 additions & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ cosl==0.0.24 ; python_version >= "3.8" and python_version < "4.0"
cryptography==43.0.3 ; python_version >= "3.8" and python_version < "4.0"
exceptiongroup==1.2.2 ; python_version >= "3.8" and python_version < "3.11"
h11==0.14.0 ; python_version >= "3.8" and python_version < "4.0"
httpcore==1.0.6 ; python_version >= "3.8" and python_version < "4.0"
httpcore==1.0.7 ; python_version >= "3.8" and python_version < "4.0"
httpx==0.27.2 ; python_version >= "3.8" and python_version < "4.0"
idna==3.10 ; python_version >= "3.8" and python_version < "4.0"
importlib-resources==6.4.5 ; python_version >= "3.8" and python_version < "3.9"
Expand All @@ -20,11 +20,11 @@ ops==2.17.0 ; python_version >= "3.8" and python_version < "4.0"
pkgutil-resolve-name==1.3.10 ; python_version >= "3.8" and python_version < "3.9"
pure-sasl==0.6.2 ; python_version >= "3.8" and python_version < "4.0"
pycparser==2.22 ; python_version >= "3.8" and python_version < "4.0" and platform_python_implementation != "PyPy"
pydantic==1.10.18 ; python_version >= "3.8" and python_version < "4.0"
pydantic==1.10.19 ; python_version >= "3.8" and python_version < "4.0"
pyyaml==6.0.2 ; python_version >= "3.8" and python_version < "4.0"
referencing==0.35.1 ; python_version >= "3.8" and python_version < "4.0"
requests==2.32.3 ; python_version >= "3.8" and python_version < "4.0"
rpds-py==0.18.1 ; python_version >= "3.8" and python_version < "4.0"
rpds-py==0.20.1 ; python_version >= "3.8" and python_version < "4.0"
sniffio==1.3.1 ; python_version >= "3.8" and python_version < "4.0"
tenacity==9.0.0 ; python_version >= "3.8" and python_version < "4.0"
typing-extensions==4.12.2 ; python_version >= "3.8" and python_version < "4.0"
Expand Down
72 changes: 49 additions & 23 deletions src/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
DataPeerData,
DataPeerUnitData,
)
from charms.zookeeper.v0.client import QuorumLeaderNotFoundError, ZooKeeperManager
from charms.zookeeper.v0.client import (
NoUnitFoundError,
QuorumLeaderNotFoundError,
ZooKeeperManager,
)
from kazoo.client import AuthFailedError, ConnectionLoss, NoNodeError
from kazoo.exceptions import NoAuthError
from lightkube.resources.core_v1 import Node, Pod
Expand Down Expand Up @@ -712,23 +716,6 @@ def chroot(self) -> str:
or ""
)

@property
def uris(self) -> str:
"""Comma separated connection string, containing endpoints + chroot."""
if not self.relation:
return ""

return ",".join(
sorted( # sorting as they may be disordered
(
self.data_interface.fetch_relation_field(
relation_id=self.relation.id, field="uris"
)
or ""
).split(",")
)
)

@property
def tls(self) -> bool:
"""Check if TLS is enabled on ZooKeeper."""
Expand Down Expand Up @@ -760,11 +747,45 @@ def zookeeper_connected(self) -> bool:

return True

@property
def hosts(self) -> list[str]:
"""Get the hosts from the databag."""
return [host.split(":")[0] for host in self.endpoints.split(",")]

@property
def uris(self):
"""Comma separated connection string, containing endpoints + chroot."""
return f"{self.endpoints.removesuffix('/')}/{self.database.removeprefix('/')}"

@property
def port(self) -> int:
"""Get the port in use from the databag.
We can extract from:
- host1:port,host2:port
- host1,host2:port
"""
try:
port = next(
iter([int(host.split(":")[1]) for host in reversed(self.endpoints.split(","))]),
2181,
)
except IndexError:
# compatibility with older zk versions
port = 2181

return port

@property
def zookeeper_version(self) -> str:
"""Get running zookeeper version."""
hosts = self.endpoints.split(",")
zk = ZooKeeperManager(hosts=hosts, username=self.username, password=self.password)
zk = ZooKeeperManager(
hosts=self.hosts,
client_port=self.port,
username=self.username,
password=self.password,
use_ssl=self.tls,
)

return zk.get_version()

Expand All @@ -778,16 +799,21 @@ def zookeeper_version(self) -> str:
def broker_active(self) -> bool:
"""Checks if broker id is recognised as active by ZooKeeper."""
broker_id = self.data_interface.local_unit.name.split("/")[1]
hosts = self.endpoints.split(",")
path = f"{self.database}/brokers/ids/"

zk = ZooKeeperManager(hosts=hosts, username=self.username, password=self.password)
try:
zk = ZooKeeperManager(
hosts=self.hosts,
client_port=self.port,
username=self.username,
password=self.password,
use_ssl=self.tls,
)
brokers = zk.leader_znodes(path=path)
except (
NoNodeError,
AuthFailedError,
QuorumLeaderNotFoundError,
NoUnitFoundError,
ConnectionLoss,
NoAuthError,
) as e:
Expand Down
6 changes: 3 additions & 3 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from json.decoder import JSONDecodeError
from pathlib import Path
from subprocess import PIPE, CalledProcessError, check_output
from typing import Any, Dict, List, Optional, Set
from typing import Any, List, Optional, Set

import yaml
from charms.kafka.v0.client import KafkaClient
Expand Down Expand Up @@ -463,7 +463,7 @@ def get_provider_data(
return provider_relation_data | user_secret | tls_secret


def get_active_brokers(config: Dict) -> Set[str]:
def get_active_brokers(config: dict[str, str]) -> set[str]:
"""Gets all brokers currently connected to ZooKeeper.
Args:
Expand All @@ -473,9 +473,9 @@ def get_active_brokers(config: Dict) -> Set[str]:
Set of active broker ids
"""
chroot = config.get("database", config.get("chroot", ""))
hosts = config.get("endpoints", "").split(",")
username = config.get("username", "")
password = config.get("password", "")
hosts = [host.split(":")[0] for host in config.get("endpoints", "").split(",")]

zk = ZooKeeperManager(hosts=hosts, username=username, password=password)
path = f"{chroot}/brokers/ids/"
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async def test_in_place_upgrade(ops_test: OpsTest, kafka_charm, app_charm):
await asyncio.gather(
ops_test.model.deploy(
ZK_NAME,
channel="3/edge",
channel=CHANNEL,
application_name=ZK_NAME,
num_units=1,
),
Expand Down
Loading

0 comments on commit 1c6b435

Please sign in to comment.