-
Notifications
You must be signed in to change notification settings - Fork 14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DPE-2362][DPE-2369][DPE-2374] HA: full cluster crash, full cluster restart, leader restart, leader freeze. #136
[DPE-2362][DPE-2369][DPE-2374] HA: full cluster crash, full cluster restart, leader restart, leader freeze. #136
Conversation
19a6eda
to
b3b9277
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great job! The code looks good! Let's just make sure tests passes.
Some small, minor comments attached
count: int = -1 | ||
last_message: Optional[object] = None | ||
last_expected_message: int = -1 | ||
lost_messages: int = -1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
niptick, minor I'd rather use Optional instead of -1, or not have any default at all actually
@@ -26,6 +27,14 @@ | |||
logger = logging.getLogger(__name__) | |||
|
|||
|
|||
@dataclass | |||
class ContinuousWritesResult: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
praise I see that you have been using data-classes, I really like this and makes function return very meaninfgul object which improve readability
tests/integration/ha/ha_helpers.py
Outdated
stderr=PIPE, | ||
shell=True, | ||
universal_newlines=True, | ||
) | ||
result = TopicDescription() | ||
result.leader = int(re.search(r"Leader: (\d+)", output)[1]) | ||
result.in_sync_replicas = {int(i) for i in re.search(r"Isr: ([\d,]+)", output)[1].split(",")} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
niptick, minor I believe it is a tiny bit more pythonic to use
return TopicDescription(leader=..., in_sync_replica=...)
tests/integration/ha/ha_helpers.py
Outdated
|
||
# example of topic offset output: 'test-topic:0:10' | ||
result = check_output( | ||
f"JUJU_MODEL={ops_test.model_full_name} juju ssh {unit_name} sudo -i 'charmed-kafka.get-offsets --bootstrap-server {bootstrap_server} --command-config {KafkaSnap.CONF_PATH}/client.properties --topic {topic}'", | ||
f"JUJU_MODEL={ops_test.model_full_name} juju ssh kafka/0 sudo -i 'charmed-kafka.get-offsets --bootstrap-server {','.join(bootstrap_servers)} --command-config {KafkaSnap.CONF_PATH}/client.properties --topic {topic}'", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question what if kafka/0
is down? Maybe better to use kafka/leader
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same situation. The CLI commands work without the main Kafka process being up. So if the whole unit is down, the test is not even meant to pass.
In short, this command is not dependent on the unit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a nudge here to not rely on specific unit numbers, but instead to grab one from ops_test.model.applications[APP_NAME].units[X].name
|
||
logger.info( | ||
f"Killing broker of leader for topic '{ContinuousWrites.TOPIC_NAME}': {initial_leader_num}" | ||
) | ||
await send_control_signal( | ||
ops_test=ops_test, unit_name=f"{APP_NAME}/{initial_leader_num}", kill_code="SIGKILL" | ||
ops_test=ops_test, unit_name=f"{APP_NAME}/{initial_leader_num}", signal="SIGKILL" | ||
) | ||
# Give time for the service to restart |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question Should we assert whether the in-sync-replica is not all units anymore?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes! I can add that here.
topic_description = await get_topic_description( | ||
ops_test=ops_test, topic=ContinuousWrites.TOPIC_NAME | ||
) | ||
initial_offsets = await get_topic_offsets(ops_test=ops_test, topic=ContinuousWrites.TOPIC_NAME) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question shouldn't this be at the top, before killing the brokers? Probably it is the same, but I believe it would make the test a bit clearer, in the sense that we
- Set the stage and get pre-information
- Do the action (killing everything)
- do asserts
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Initial offsets being here is intentional, as I want to check that offsets are still increasing after all brokers are back up. If I initialize before killing brokers, the offsets will still report an increase because of the time it takes to actually kill the brokers.
# your suggestion:
# producer already writing
initial_offsets = get_offsets() <---
. . . |--- # Between these two moments, offsets are still increasing,
| # so a check afterwards would always succeed
kill_all_brokers() <---
# Give time for the service to restart | ||
await asyncio.sleep(15) | ||
|
||
initial_offsets = await get_topic_offsets(ops_test=ops_test, topic=ContinuousWrites.TOPIC_NAME) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above
last_written_value
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.gitignore as in the other PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is included on gitignore. For some reason it got trough at some previous commit.
…estart, leader restart, leader freeze. (#136) * add extra HA tests * change to full acks
…estart, leader restart, leader freeze. (#136) * add extra HA tests * change to full acks
Add HA tests and fix
ContinuousWrites
implementation.Added more checks in between test stages.
Fixes:
ContinuousWrites
now doesn't reinitialize the client if there is an error. Just retries until an Exception raises.last index of messages produced == count of consumed messages + lost messages - 1