
[DPE-2362][DPE-2369][DPE-2374] HA: full cluster crash, full cluster restart, leader restart, leader freeze. #136

Merged: 17 commits, Sep 27, 2023
10 changes: 5 additions & 5 deletions .github/workflows/ci.yaml
@@ -49,11 +49,11 @@ jobs:
fail-fast: false
matrix:
tox-environments:
- integration-charm
- integration-provider
- integration-scaling
#- integration-charm
#- integration-provider
#- integration-scaling
- integration-password-rotation
- integration-tls
#- integration-tls
name: ${{ matrix.tox-environments }}
needs:
- lint
@@ -132,4 +132,4 @@ jobs:
- name: Run integration tests
run: tox run -e ${{ matrix.tox-environments }} -- -m '${{ steps.select-tests.outputs.mark_expression }}'
env:
CI_PACKED_CHARMS: ${{ needs.build.outputs.charms }}
CI_PACKED_CHARMS: ${{ needs.build.outputs.charms }}
1 change: 0 additions & 1 deletion last_written_value

This file was deleted.

16 changes: 8 additions & 8 deletions lib/charms/kafka/v0/client.py
@@ -81,7 +81,6 @@ def on_kafka_relation_created(self, event: RelationCreatedEvent):

from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
from kafka.admin import NewTopic
from kafka.errors import KafkaError

logger = logging.getLogger(__name__)
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
@@ -160,6 +159,9 @@ def _producer_client(self) -> KafkaProducer:
ssl_certfile=self.certfile_path if self.ssl else None,
ssl_keyfile=self.keyfile_path if self.mtls else None,
api_version=KafkaClient.API_VERSION if self.mtls else None,
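# acks="all" waits for every in-sync replica to acknowledge a write; retries and
# retry_backoff_ms let the producer ride out brief broker unavailability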
acks="all",
retries=5,
retry_backoff_ms=1000,
)

@cached_property
@@ -246,16 +248,14 @@ def produce_message(self, topic_name: str, message_content: str) -> None:
Args:
topic_name: the topic to send messages to
message_content: the content of the message to send

Raises:
KafkaTimeoutError, KafkaError (general)
"""
item_content = f"Message #{message_content}"
future = self._producer_client.send(topic_name, str.encode(item_content))
try:
future.get(timeout=60)
logger.info(
f"Message published to topic={topic_name}, message content: {item_content}"
)
except KafkaError as e:
logger.error(f"Error producing message {message_content} to topic {topic_name}: {e}")
future.get(timeout=30)
logger.debug(f"Message published to topic={topic_name}, message content: {item_content}")

def close(self) -> None:
"""Close the connection to the client."""
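Note on the produce_message change above: with the KafkaError handler removed, send failures now propagate to the caller, which is what lets the continuous-writes harness below count lost messages itself. A minimal caller-side sketch, assuming a KafkaClient instance whose construction is not shown in this diff; the topic name and helper function are illustrative:

    from charms.kafka.v0.client import KafkaClient
    from kafka.errors import KafkaError

    def try_produce(client: KafkaClient, value: int) -> bool:
        """Return True if the write was acknowledged, False if the caller should count it as lost."""
        try:
            # raises a KafkaError subclass if the broker does not acknowledge within 30s
            client.produce_message(topic_name="test-topic", message_content=str(value))
            return True
        except KafkaError:
            return False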
51 changes: 30 additions & 21 deletions tests/integration/ha/continuous_writes.py
@@ -4,19 +4,21 @@
import asyncio
import logging
import os
from dataclasses import dataclass
from multiprocessing import Event, Process, Queue
from types import SimpleNamespace
from typing import List, Optional

from charms.kafka.v0.client import KafkaClient
from kafka.admin import NewTopic
from kafka.errors import KafkaTimeoutError
from kafka.consumer.fetcher import ConsumerRecord
from kafka.errors import KafkaError
from pytest_operator.plugin import OpsTest
from tenacity import (
RetryError,
Retrying,
retry,
stop_after_attempt,
stop_after_delay,
wait_fixed,
wait_random,
)
@@ -26,6 +28,14 @@
logger = logging.getLogger(__name__)


@dataclass
class ContinuousWritesResult:
Contributor (praise): I see that you have been using dataclasses. I really like this; it makes the function return a very meaningful object, which improves readability. (A hypothetical usage sketch follows the field list below.)

count: int
last_expected_message: int
lost_messages: int
consumed_messages: Optional[List[ConsumerRecord]]
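A hypothetical usage sketch of this result object (here `writes` stands for a running ContinuousWrites instance started elsewhere in the test):

    result = writes.stop()
    logger.info(f"count={result.count}, lost={result.lost_messages}, last expected={result.last_expected_message}")
    assert result.count + result.lost_messages - 1 == result.last_expected_message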


class ContinuousWrites:
"""Utility class for managing continuous writes."""

@@ -80,7 +90,7 @@ def clear(self) -> None:
finally:
client.close()

def consumed_messages(self) -> list | None:
def consumed_messages(self) -> List[ConsumerRecord] | None:
"""Consume the messages in the topic."""
client = self._client()
try:
@@ -90,6 +100,7 @@ def consumed_messages(self) -> list | None:
# FIXME: loading whole list of consumed messages into memory might not be the best idea
return list(client.messages())
except RetryError:
logger.error("Could not get consumed messages from Kafka.")
return []
finally:
client.close()
@@ -108,30 +119,28 @@ def _create_replicated_topic(self):
wait=wait_fixed(wait=5) + wait_random(0, 5),
stop=stop_after_attempt(5),
)
def stop(self) -> SimpleNamespace:
def stop(self) -> ContinuousWritesResult:
"""Stop the continuous writes process and return max inserted ID."""
if not self._is_stopped:
self._stop_process()

result = SimpleNamespace()

# messages count
consumed_messages = self.consumed_messages()
result.count = len(consumed_messages)
result.last_message = consumed_messages[-1]
count = len(consumed_messages)
message_list = [record.value.decode() for record in consumed_messages]

# last expected message stored on disk
try:
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(5)):
with attempt:
with open(ContinuousWrites.LAST_WRITTEN_VAL_PATH, "r") as f:
result.last_expected_message, result.lost_messages = (
f.read().rstrip().split(",", maxsplit=2)
)
except RetryError:
result.last_expected_message = result.lost_messages = -1
with open(ContinuousWrites.LAST_WRITTEN_VAL_PATH, "r") as f:
last_expected_message, lost_messages = map(
int, f.read().rstrip().split(",", maxsplit=2)
)

return result
logger.info(
f"\n\nSTOP RESULTS:\n\t- Count: {count}\n\t- Last expected message: {last_expected_message}\n\t- Lost messages: {lost_messages}\n\t- Consumed messages: {message_list}\n"
)
return ContinuousWritesResult(
count, last_expected_message, lost_messages, consumed_messages
)

def _create_process(self):
self._is_stopped = False
@@ -196,9 +205,9 @@ def _client():
client.produce_message(
topic_name=ContinuousWrites.TOPIC_NAME, message_content=str(write_value)
)
except KafkaTimeoutError:
client.close()
client = _client()
await asyncio.sleep(0.1)
except KafkaError as e:
logger.error(f"Error on 'Message #{write_value}' Kafka Producer: {e}")
lost_messages += 1
finally:
# process termination requested
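For context on what stop() reads above: LAST_WRITTEN_VAL_PATH is expected to hold a single "value,lost" pair. The writer side is collapsed in this diff, but presumably persists something along these lines when asked to terminate (a sketch, not the actual implementation):

    # sketch: persist the last attempted value and the lost-message count on shutdown
    with open(ContinuousWrites.LAST_WRITTEN_VAL_PATH, "w") as f:
        f.write(f"{write_value},{lost_messages}")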
72 changes: 54 additions & 18 deletions tests/integration/ha/ha_helpers.py
@@ -3,20 +3,30 @@
# See LICENSE file for licensing details.
import logging
import re
from dataclasses import dataclass
from subprocess import PIPE, check_output

from pytest_operator.plugin import OpsTest

from integration.helpers import APP_NAME, get_address
from integration.ha.continuous_writes import ContinuousWritesResult
from integration.helpers import APP_NAME, get_address, get_kafka_zk_relation_data
from literals import SECURITY_PROTOCOL_PORTS
from snap import KafkaSnap
from utils import get_active_brokers

PROCESS = "kafka.Kafka"
SERVICE_DEFAULT_PATH = "/etc/systemd/system/snap.charmed-kafka.daemon.service"


logger = logging.getLogger(__name__)


@dataclass
class TopicDescription:
leader: int
in_sync_replicas: set


class ProcessError(Exception):
"""Raised when a process fails."""

@@ -25,44 +35,52 @@ class ProcessRunningError(Exception):
"""Raised when a process is running when it is not expected to be."""


async def get_topic_leader(ops_test: OpsTest, topic: str) -> int:
async def get_topic_description(ops_test: OpsTest, topic: str) -> TopicDescription:
"""Get the broker with the topic leader.

Args:
ops_test: OpsTest utility class
topic: the desired topic to check
"""
bootstrap_server = (
await get_address(ops_test=ops_test)
+ f":{SECURITY_PROTOCOL_PORTS['SASL_PLAINTEXT'].client}"
)
bootstrap_servers = []
for unit in ops_test.model.applications[APP_NAME].units:
bootstrap_servers.append(
await get_address(ops_test=ops_test, unit_num=unit.name.split("/")[-1])
+ f":{SECURITY_PROTOCOL_PORTS['SASL_PLAINTEXT'].client}"
)
unit_name = ops_test.model.applications[APP_NAME].units[0].name

result = check_output(
f"JUJU_MODEL={ops_test.model_full_name} juju ssh kafka/0 sudo -i 'charmed-kafka.topics --bootstrap-server {bootstrap_server} --command-config {KafkaSnap.CONF_PATH}/client.properties --describe --topic {topic}'",
output = check_output(
f"JUJU_MODEL={ops_test.model_full_name} juju ssh {unit_name} sudo -i 'charmed-kafka.topics --bootstrap-server {','.join(bootstrap_servers)} --command-config {KafkaSnap.CONF_PATH}/client.properties --describe --topic {topic}'",
stderr=PIPE,
shell=True,
universal_newlines=True,
)

return re.search(r"Leader: (\d+)", result)[1]
leader = int(re.search(r"Leader: (\d+)", output)[1])
in_sync_replicas = {int(i) for i in re.search(r"Isr: ([\d,]+)", output)[1].split(",")}

return TopicDescription(leader, in_sync_replicas)
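For reference, the two regexes above assume the `--describe` output contains a partition line roughly of this shape (values are illustrative):

    import re

    line = "Topic: test-topic  Partition: 0  Leader: 0  Replicas: 0,1,2  Isr: 0,2"
    leader = int(re.search(r"Leader: (\d+)", line)[1])  # 0
    isr = {int(i) for i in re.search(r"Isr: ([\d,]+)", line)[1].split(",")}  # {0, 2}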


async def get_topic_offsets(ops_test: OpsTest, topic: str, unit_name: str) -> list[str]:
async def get_topic_offsets(ops_test: OpsTest, topic: str) -> list[str]:
"""Get the offsets of a topic on a unit.

Args:
ops_test: OpsTest utility class
topic: the desired topic to check
unit_name: unit to check the offsets on
"""
bootstrap_server = (
await get_address(ops_test=ops_test)
+ f":{SECURITY_PROTOCOL_PORTS['SASL_PLAINTEXT'].client}"
)
bootstrap_servers = []
for unit in ops_test.model.applications[APP_NAME].units:
bootstrap_servers.append(
await get_address(ops_test=ops_test, unit_num=unit.name.split("/")[-1])
+ f":{SECURITY_PROTOCOL_PORTS['SASL_PLAINTEXT'].client}"
)
unit_name = ops_test.model.applications[APP_NAME].units[0].name

# example of topic offset output: 'test-topic:0:10'
result = check_output(
f"JUJU_MODEL={ops_test.model_full_name} juju ssh {unit_name} sudo -i 'charmed-kafka.get-offsets --bootstrap-server {bootstrap_server} --command-config {KafkaSnap.CONF_PATH}/client.properties --topic {topic}'",
f"JUJU_MODEL={ops_test.model_full_name} juju ssh {unit_name} sudo -i 'charmed-kafka.get-offsets --bootstrap-server {','.join(bootstrap_servers)} --command-config {KafkaSnap.CONF_PATH}/client.properties --topic {topic}'",
stderr=PIPE,
shell=True,
universal_newlines=True,
@@ -72,13 +90,13 @@ async def get_topic_offsets(ops_test: OpsTest, topic: str, unit_name: str) -> list[str]:


async def send_control_signal(
ops_test: OpsTest, unit_name: str, kill_code: str, app_name: str = APP_NAME
ops_test: OpsTest, unit_name: str, signal: str, app_name: str = APP_NAME
) -> None:
if len(ops_test.model.applications[app_name].units) < 3:
await ops_test.model.applications[app_name].add_unit(count=1)
await ops_test.model.wait_for_idle(apps=[app_name], status="active", timeout=1000)

kill_cmd = f"exec --unit {unit_name} -- pkill --signal {kill_code} -f {PROCESS}"
kill_cmd = f"exec --unit {unit_name} -- pkill --signal {signal} -f {PROCESS}"
return_code, stdout, stderr = await ops_test.juju(*kill_cmd.split())

if return_code != 0:
@@ -114,3 +132,21 @@ async def remove_restart_delay(ops_test: OpsTest, unit_name: str) -> None:
# reload the daemon for systemd to reflect changes
reload_cmd = f"exec --unit {unit_name} -- sudo systemctl daemon-reload"
await ops_test.juju(*reload_cmd.split(), check=True)


def is_up(ops_test: OpsTest, broker_id: int) -> bool:
"""Return if node up."""
unit_name = ops_test.model.applications[APP_NAME].units[0].name
kafka_zk_relation_data = get_kafka_zk_relation_data(
unit_name=unit_name, model_full_name=ops_test.model_full_name
)
active_brokers = get_active_brokers(zookeeper_config=kafka_zk_relation_data)
chroot = kafka_zk_relation_data.get("chroot", "")
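# assumption based on Kafka's ZooKeeper layout: a live broker holds an ephemeral
# znode at <chroot>/brokers/ids/<id>, so membership in active_brokers means it is up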
return f"{chroot}/brokers/ids/{broker_id}" in active_brokers


def assert_continuous_writes_consistency(result: ContinuousWritesResult):
"""Check results of a stopped ContinuousWrites call against expected results."""
assert (
result.count + result.lost_messages - 1 == result.last_expected_message
), f"Last expected message {result.last_expected_message} doesn't match count {result.count} + lost_messages {result.lost_messages}"