Merge branch 'main' into avro_references2
libretto committed Nov 10, 2024
2 parents a1f659f + ba66344 commit ba4d640
Showing 6 changed files with 117 additions and 12 deletions.
12 changes: 10 additions & 2 deletions src/karapace/backup/api.py
@@ -373,13 +373,20 @@ def _handle_restore_topic(
instruction: RestoreTopic,
config: Config,
skip_topic_creation: bool = False,
override_replication_factor: int | None = None,
) -> None:
if skip_topic_creation:
return
repl_factor = instruction.replication_factor
if override_replication_factor is not None:
LOG.info(
"Overriding replication factor with: %d (was: %d)", override_replication_factor, instruction.replication_factor
)
repl_factor = override_replication_factor
if not _maybe_create_topic(
config=config,
name=instruction.topic_name,
replication_factor=instruction.replication_factor,
replication_factor=repl_factor,
topic_configs=instruction.topic_configs,
):
raise BackupTopicAlreadyExists(f"Topic to restore '{instruction.topic_name}' already exists")
@@ -426,6 +433,7 @@ def restore_backup(
backup_location: ExistingFile,
topic_name: TopicName,
skip_topic_creation: bool = False,
override_replication_factor: int | None = None,
) -> None:
"""Restores a backup from the specified location into the configured topic.
@@ -475,7 +483,7 @@ def _check_producer_exception() -> None:
_handle_restore_topic_legacy(instruction, config, skip_topic_creation)
producer = stack.enter_context(_producer(config, instruction.topic_name))
elif isinstance(instruction, RestoreTopic):
_handle_restore_topic(instruction, config, skip_topic_creation)
_handle_restore_topic(instruction, config, skip_topic_creation, override_replication_factor)
producer = stack.enter_context(_producer(config, instruction.topic_name))
elif isinstance(instruction, ProducerSend):
if producer is None:
10 changes: 10 additions & 0 deletions src/karapace/backup/cli.py
@@ -76,6 +76,15 @@ def parse_args() -> argparse.Namespace:
),
)

parser_restore.add_argument(
"--override-replication-factor",
help=(
"Override the replication factor that is save in the backup. This is needed when restoring a backup from a"
"downsized cluster (like scaling down from 6 to 3 nodes). This has effect only for V3 backups."
),
type=int,
)

return parser.parse_args()


@@ -115,6 +124,7 @@ def dispatch(args: argparse.Namespace) -> None:
backup_location=api.locate_backup_file(location),
topic_name=api.normalize_topic_name(args.topic, config),
skip_topic_creation=args.skip_topic_creation,
override_replication_factor=args.override_replication_factor,
)
except BackupDataRestorationError:
traceback.print_exc()
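For context, a minimal sketch of driving the new option programmatically instead of via the CLI flag above. The bootstrap server, backup path, and topic name are placeholders, and the import paths for set_config_defaults and TopicName are assumptions (they are not shown in this diff):

# Sketch only: restore a V3 backup into a cluster that has fewer brokers than
# the replication factor recorded in the backup.
from pathlib import Path

from karapace.backup import api
from karapace.config import set_config_defaults  # assumed import path
from karapace.typing import TopicName  # assumed import path

config = set_config_defaults({"bootstrap_uri": "localhost:9092"})  # placeholder broker
metadata_path = Path("/backups/my-topic/my-topic.metadata")  # placeholder backup location

api.restore_backup(
    config=config,
    backup_location=metadata_path,
    topic_name=TopicName("my-topic"),
    # Without this override the restore fails when the cluster has fewer brokers
    # than the stored replication factor (see the new integration test below).
    override_replication_factor=1,
)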
20 changes: 17 additions & 3 deletions src/karapace/protobuf/serialization.py
@@ -93,17 +93,31 @@ def _deserialize_msg(msgtype: Any) -> MessageElement:
for nested_enum in msgtype.enum_type:
nested_types.append(_deserialize_enum(nested_enum))

one_ofs: list[OneOfElement] = [OneOfElement(oneof.name) for oneof in msgtype.oneof_decl]
one_ofs: list[OneOfElement | None] = [OneOfElement(oneof.name) for oneof in msgtype.oneof_decl]

for f in msgtype.field:
sf = _deserialize_field(f)
if f.HasField("oneof_index"):
is_oneof = f.HasField("oneof_index")
is_proto3_optional = f.HasField("oneof_index") and f.HasField("proto3_optional") and f.proto3_optional
if is_proto3_optional:
# Every proto3 optional field is placed into a single-field "synthetic" oneof that does
# not exist in the source .proto file.
# Clearing that entry makes sure we don't interpret such optionals as oneofs.
one_ofs[f.oneof_index] = None
fields.append(sf)
elif is_oneof:
one_ofs[f.oneof_index].fields.append(sf)
else:
fields.append(sf)

one_ofs_filtered: list[OneOfElement] = [oneof for oneof in one_ofs if oneof is not None]
return MessageElement(
DEFAULT_LOCATION, msgtype.name, nested_types=nested_types, reserveds=reserveds, fields=fields, one_ofs=one_ofs
DEFAULT_LOCATION,
msgtype.name,
nested_types=nested_types,
reserveds=reserveds,
fields=fields,
one_ofs=one_ofs_filtered,
)


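To make the synthetic-oneof handling above concrete, here is a small sketch that inspects the descriptor of the new test schema. It assumes the base64 blob added in tests/schemas/protobuf.py (schema_protobuf_optionals_bin) is a plain serialized FileDescriptorProto and that the tests package is importable:

# Sketch only: every proto3 "optional" field carries proto3_optional=True and
# points at a single-field "synthetic" oneof named "_<field name>".
import base64

from google.protobuf import descriptor_pb2

from tests.schemas.protobuf import schema_protobuf_optionals_bin  # assumed importable

fd = descriptor_pb2.FileDescriptorProto()
fd.ParseFromString(base64.b64decode(schema_protobuf_optionals_bin))

dimensions = fd.message_type[0]  # the Dimensions message from the test schema
for field in dimensions.field:
    assert field.proto3_optional
    assert field.HasField("oneof_index")
    # The synthetic oneof exists only in the descriptor, not in the .proto source,
    # which is why _deserialize_msg drops it instead of emitting a OneOfElement.
    assert dimensions.oneof_decl[field.oneof_index].name == f"_{field.name}"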
65 changes: 58 additions & 7 deletions tests/integration/backup/test_v3_backup.py
@@ -4,7 +4,7 @@
"""
from __future__ import annotations

from aiokafka.errors import UnknownTopicOrPartitionError
from aiokafka.errors import InvalidReplicationFactorError, UnknownTopicOrPartitionError
from collections.abc import Iterator
from confluent_kafka import Message, TopicPartition
from confluent_kafka.admin import NewTopic
@@ -119,14 +119,13 @@ def test_roundtrip_from_kafka_state(
admin_client.update_topic_config(new_topic.topic, {"max.message.bytes": "999"})

# Populate topic.
producer.send(
first_record_fut = producer.send(
new_topic.topic,
key=b"bar",
value=b"foo",
partition=0,
timestamp=1683474641,
)
producer.send(
second_record_fut = producer.send(
new_topic.topic,
key=b"foo",
value=b"bar",
@@ -135,10 +134,12 @@
("some-header", b"some header value"),
("other-header", b"some other header value"),
],
timestamp=1683474657,
)
producer.flush()

first_message_timestamp = first_record_fut.result(timeout=5).timestamp()[1]
second_message_timestamp = second_record_fut.result(timeout=5).timestamp()[1]

topic_config = get_topic_configurations(admin_client, new_topic.topic, {ConfigSource.DYNAMIC_TOPIC_CONFIG})

# Execute backup creation.
@@ -212,7 +213,7 @@ def test_roundtrip_from_kafka_state(
# Note: This might be unreliable due to not using an idempotent producer, i.e. we
# currently have no guarantee against duplicates.
assert first_record.offset() == 0
assert first_record.timestamp()[1] == 1683474641
assert first_record.timestamp()[1] == first_message_timestamp
assert first_record.timestamp()[0] == Timestamp.CREATE_TIME
assert first_record.key() == b"bar"
assert first_record.value() == b"foo"
@@ -223,7 +224,7 @@
assert second_record.topic() == new_topic.topic
assert second_record.partition() == partition
assert second_record.offset() == 1
assert second_record.timestamp()[1] == 1683474657
assert second_record.timestamp()[1] == second_message_timestamp
assert second_record.timestamp()[0] == Timestamp.CREATE_TIME
assert second_record.key() == b"foo"
assert second_record.value() == b"bar"
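As an aside on the duplicate-record caveat in the comment above: idempotence is an opt-in producer setting. Karapace's actual producer wiring is not shown in this diff, so the configuration below is only an illustrative sketch using confluent_kafka directly:

# Sketch only: an idempotent producer de-duplicates retried sends, which would
# remove the "no guarantee against duplicates" caveat noted in the test.
from confluent_kafka import Producer

producer = Producer(
    {
        "bootstrap.servers": "localhost:9092",  # placeholder broker
        "enable.idempotence": True,  # implies acks=all and bounded in-flight requests
    }
)
producer.produce("some-topic", key=b"bar", value=b"foo")
producer.flush()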
@@ -697,6 +698,56 @@ def __exit__(self, exc_type, exc_value, exc_traceback):
)


def test_backup_restoration_override_replication_factor(
admin_client: KafkaAdminClient,
kafka_servers: KafkaServers,
producer: KafkaProducer,
new_topic: NewTopic,
) -> None:
backup_directory = Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_single_partition" / new_topic.topic
metadata_path = backup_directory / f"{new_topic.topic}.metadata"
config = set_config_defaults(
{
"bootstrap_uri": kafka_servers.bootstrap_servers,
}
)

# Populate the topic and create a backup.
for i in range(10):
producer.send(
new_topic.topic,
key=f"message-key-{i}",
value=f"message-value-{i}-" + 1000 * "X",
)
producer.flush()
api.create_backup(
config=config,
backup_location=backup_directory,
topic_name=TopicName(new_topic.topic),
version=BackupVersion.V3,
replication_factor=6,
)

# make sure topic doesn't exist beforehand.
_delete_topic(admin_client, new_topic.topic)

# assert that the restore would fail without the replication factor override
with pytest.raises(InvalidReplicationFactorError):
api.restore_backup(
config=config,
backup_location=metadata_path,
topic_name=TopicName(new_topic.topic),
)

# finally restore the backup with override
api.restore_backup(
config=config,
backup_location=metadata_path,
topic_name=TopicName(new_topic.topic),
override_replication_factor=1,
)


def no_color_env() -> dict[str, str]:
env = os.environ.copy()
try:
18 changes: 18 additions & 0 deletions tests/schemas/protobuf.py
@@ -261,3 +261,21 @@
"lzdGVyLk1ldGFkYXRhEhYKDmNvbXBhbnlfbnVtYmVyGAIgASgJGhYKCE1ldGFkYXRhEgoK"
"AmlkGAEgASgJYgZwcm90bzM="
)

schema_protobuf_optionals_bin = (
"Cgp0ZXN0LnByb3RvIqYBCgpEaW1lbnNpb25zEhEKBHNpemUYASABKAFIAIgBARISCgV3aWR0aBgCIAEoAUgBiAEBEhMKBmhlaWdodBgDIAEo"
+ "AUgCiAEBEhMKBmxlbmd0aBgEIAEoAUgDiAEBEhMKBndlaWdodBgFIAEoAUgEiAEBQgcKBV9zaXplQggKBl93aWR0aEIJCgdfaGVpZ2h0Qg"
+ "kKB19sZW5ndGhCCQoHX3dlaWdodGIGcHJvdG8z"
)

schema_protobuf_optionals = """\
syntax = "proto3";
message Dimensions {
optional double size = 1;
optional double width = 2;
optional double height = 3;
optional double length = 4;
optional double weight = 5;
}
"""
4 changes: 4 additions & 0 deletions tests/unit/test_protobuf_binary_serialization.py
@@ -16,6 +16,8 @@
schema_protobuf_nested_message4_bin_protoc,
schema_protobuf_oneof,
schema_protobuf_oneof_bin,
schema_protobuf_optionals,
schema_protobuf_optionals_bin,
schema_protobuf_order_after,
schema_protobuf_order_after_bin,
schema_protobuf_plain,
@@ -89,6 +91,7 @@
(schema_protobuf_references, schema_protobuf_references_bin),
(schema_protobuf_references2, schema_protobuf_references2_bin),
(schema_protobuf_complex, schema_protobuf_complex_bin),
(schema_protobuf_optionals, schema_protobuf_optionals_bin),
],
)
def test_schema_deserialize(schema_plain, schema_serialized):
@@ -125,6 +128,7 @@ def test_protoc_serialized_schema_deserialize(schema_plain, schema_serialized):
schema_protobuf_references,
schema_protobuf_references2,
schema_protobuf_complex,
schema_protobuf_optionals,
],
)
def test_simple_schema_serialize(schema):
