[DPE-1824] General refactoring of HA tests
zmraul committed Oct 17, 2023
1 parent abe2135 commit f8767a0
Showing 9 changed files with 189 additions and 158 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/ci.yaml
@@ -56,12 +56,12 @@ jobs:
       fail-fast: false
       matrix:
         tox-environments:
-          #- integration-charm
-          #- integration-provider
-          #- integration-scaling
+          - integration-charm
+          - integration-provider
+          - integration-scaling
           - integration-password-rotation
-          #- integration-tls
-          #- integration-upgrade
+          - integration-tls
+          - integration-upgrade
     name: ${{ matrix.tox-environments }}
     needs:
       - lint
4 changes: 3 additions & 1 deletion src/charm.py
@@ -212,7 +212,7 @@ def _on_update_status(self, event: EventBase) -> None:
             return
 
         # NOTE for situations like IP change and late integration with rack-awareness charm.
-        # If properties change, the broker will restart.
+        # If properties have changed, the broker will restart.
         self._on_config_changed(event)
 
         try:
@@ -263,6 +263,7 @@ def _on_zookeeper_created(self, event: RelationCreatedEvent) -> None:
     def _on_zookeeper_changed(self, event: RelationChangedEvent) -> None:
         """Handler for `zookeeper_relation_created/joined/changed` events, ensuring internal users get created."""
         if not self.kafka_config.zookeeper_connected:
+            logger.debug("No information found from ZooKeeper relation")
             self._set_status(Status.ZK_NO_DATA)
             return
 
@@ -319,6 +320,7 @@ def _on_start(self, event: EventBase) -> None:
 
         # start kafka service
         self.snap.start_snap_service()
+        logger.info("Kafka snap started")
 
         # check for connection
         self._on_update_status(event)
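Note on the src/charm.py hunks above: the update-status hook re-runs the config-changed handler, so drift such as an IP change or a late integration with the rack-awareness charm is reconciled on every status check, and the broker restarts only when the rendered properties actually changed; the two new log lines make the ZooKeeper no-data path and the snap start visible in unit logs. A minimal sketch of that reconcile-on-update-status pattern, assuming the ops charm framework (handler names mirror the diff; the bodies are illustrative, not the charm's real implementation):

# Sketch only: reconcile-on-update-status, assuming the `ops` framework.
import logging

from ops.charm import CharmBase
from ops.framework import EventBase
from ops.main import main

logger = logging.getLogger(__name__)


class SketchCharm(CharmBase):
    def __init__(self, *args):
        super().__init__(*args)
        self.framework.observe(self.on.update_status, self._on_update_status)
        self.framework.observe(self.on.config_changed, self._on_config_changed)

    def _on_update_status(self, event: EventBase) -> None:
        # Re-run config reconciliation on every status check so drift
        # (e.g. an IP change) is picked up without a dedicated hook.
        self._on_config_changed(event)

    def _on_config_changed(self, event: EventBase) -> None:
        # Illustrative: restart the workload here only if properties changed.
        logger.debug("reconciling configuration")


if __name__ == "__main__":
    main(SketchCharm)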
52 changes: 27 additions & 25 deletions tests/integration/ha/continuous_writes.py
@@ -4,6 +4,7 @@
 import asyncio
 import logging
 import os
+import time
 from dataclasses import dataclass
 from multiprocessing import Event, Process, Queue
 from types import SimpleNamespace
@@ -32,7 +33,6 @@
 class ContinuousWritesResult:
     count: int
     last_expected_message: int
-    lost_messages: int
     consumed_messages: Optional[List[ConsumerRecord]]
 
 
@@ -126,21 +126,17 @@ def stop(self) -> ContinuousWritesResult:
 
         # messages count
         consumed_messages = self.consumed_messages()
-        count = len(consumed_messages)
+        message_list = [record.value.decode() for record in consumed_messages]
+        count = len(set(message_list))
 
         # last expected message stored on disk
         with open(ContinuousWrites.LAST_WRITTEN_VAL_PATH, "r") as f:
-            last_expected_message, lost_messages = map(
-                int, f.read().rstrip().split(",", maxsplit=2)
-            )
+            last_expected_message = int(f.read().strip())
 
         logger.info(
-            f"\n\nSTOP RESULTS:\n\t- Count: {count}\n\t- Last expected message: {last_expected_message}\n\t- Lost messages: {lost_messages}\n\t- Consumed messages: {message_list}\n"
+            f"\n\nSTOP RESULTS:\n\t- Count: {count}\n\t- Last expected message: {last_expected_message}\n\t- Consumed messages: {message_list}\n"
         )
-        return ContinuousWritesResult(
-            count, last_expected_message, lost_messages, consumed_messages
-        )
+        return ContinuousWritesResult(count, last_expected_message, consumed_messages)
 
     def _create_process(self):
         self._is_stopped = False
@@ -192,7 +188,6 @@ def _client():
         )
 
         write_value = starting_number
-        lost_messages = 0
         client = _client()
 
         while True:
@@ -201,30 +196,37 @@
                 client.close()
                 client = _client()
 
-            try:
-                client.produce_message(
-                    topic_name=ContinuousWrites.TOPIC_NAME,
-                    message_content=str(write_value),
-                    timeout=300,
-                )
-                await asyncio.sleep(0.1)
-            except KafkaError as e:
-                logger.error(f"Error on 'Message #{write_value}' Kafka Producer: {e}")
-                lost_messages += 1
-            finally:
-                # process termination requested
-                if event.is_set():
-                    break
+            ContinuousWrites._produce_message(client=client, write_value=write_value)
+            await asyncio.sleep(0.1)
+
+            # process termination requested
+            if event.is_set():
+                break
 
             write_value += 1
 
         # write last expected written value on disk
         with open(ContinuousWrites.LAST_WRITTEN_VAL_PATH, "w") as f:
-            f.write(f"{str(write_value)},{str(lost_messages)}")
+            f.write(f"{str(write_value)}")
             os.fsync(f)
 
         client.close()
 
+    @staticmethod
+    def _produce_message(client: KafkaClient, write_value: int) -> None:
+        ackd = False
+        while not ackd:
+            try:
+                client.produce_message(
+                    topic_name=ContinuousWrites.TOPIC_NAME,
+                    message_content=str(write_value),
+                    timeout=300,
+                )
+                ackd = True
+            except KafkaError as e:
+                logger.error(f"Error on 'Message #{write_value}' Kafka Producer: {e}")
+                time.sleep(0.1)
+
     @staticmethod
     def _run_async(event: Event, data_queue: Queue, starting_number: int):
         """Run async code."""
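The continuous_writes.py refactor above changes the writer's delivery semantics: instead of counting failed produces in lost_messages, the new _produce_message blocks and retries until the broker acknowledges the write (at-least-once delivery), and stop() collapses any resulting duplicates by counting set(message_list) rather than the raw record count. A standalone sketch of the retry-until-acknowledged idiom, where send is a hypothetical stand-in for KafkaClient.produce_message:

# Sketch: block until a write is acknowledged, retrying on any send error.
import logging
import time
from typing import Callable

logger = logging.getLogger(__name__)


def produce_with_retry(send: Callable[[str], None], write_value: int, delay: float = 0.1) -> None:
    """Retry sending `write_value` until it is acknowledged."""
    ackd = False
    while not ackd:
        try:
            send(str(write_value))
            ackd = True  # acknowledged; earlier failed attempts may still have landed
        except Exception as e:  # the diff narrows this to KafkaError
            logger.error(f"Error on 'Message #{write_value}': {e}")
            time.sleep(delay)

Because an attempt can succeed on the broker yet still raise on the client, the same value may be persisted more than once; deduplicating on the consumer side is what keeps the final count exact.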
4 changes: 2 additions & 2 deletions tests/integration/ha/ha_helpers.py
@@ -227,5 +227,5 @@ def is_up(ops_test: OpsTest, broker_id: int) -> bool:
 def assert_continuous_writes_consistency(result: ContinuousWritesResult):
     """Check results of a stopped ContinuousWrites call against expected results."""
     assert (
-        result.count + result.lost_messages - 1 == result.last_expected_message
-    ), f"Last expected message {result.last_expected_message} doesn't match count {result.count} + lost_messages {result.lost_messages}"
+        result.count - 1 == result.last_expected_message
+    ), f"Last expected message {result.last_expected_message} doesn't match count {result.count}"