Skip to content

Commit

Permalink
Merge branch 'main' into eliax1996/add-test-restore-from-logs
Browse files Browse the repository at this point in the history
  • Loading branch information
eliax1996 authored Sep 25, 2024
2 parents a5b4634 + 236d631 commit fd258ba
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 4 deletions.
63 changes: 61 additions & 2 deletions tests/unit/test_schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from karapace.schema_type import SchemaType
from karapace.typing import SchemaId, Version
from tests.base_testcase import BaseTestCase
from tests.utils import schema_protobuf_invalid
from tests.utils import schema_protobuf_invalid_because_corrupted, schema_protobuf_with_invalid_ref
from typing import Callable, List, Tuple
from unittest.mock import Mock

Expand Down Expand Up @@ -485,7 +485,7 @@ def factory(key: bytes, value: bytes, offset: int = 1) -> Message:
key=b'{"keytype":"SCHEMA","subject":"test","version":1,"magic":1}',
value=(
b'{"schemaType": "PROTOBUF", "subject": "test", "version": 1, "id": 1, "deleted": false, "schema":'
+ json.dumps(schema_protobuf_invalid).encode()
+ json.dumps(schema_protobuf_invalid_because_corrupted).encode()
+ b"}"
),
schema_type=SchemaType.PROTOBUF,
Expand Down Expand Up @@ -515,3 +515,62 @@ def test_message_error_handling(
assert log.name == "karapace.schema_reader"
assert log.levelname == "WARNING"
assert log.message == test_case.expected_log_message


def test_message_error_handling_with_invalid_reference_schema_protobuf(
caplog: LogCaptureFixture,
schema_reader_with_consumer_messages_factory: Callable[[Tuple[List[Message]]], KafkaSchemaReader],
message_factory: Callable[[bytes, bytes, int], Message],
) -> None:
# Given an invalid schema (corrupted)
key_ref = b'{"keytype":"SCHEMA","subject":"testref","version":1,"magic":1}'
value_ref = (
b'{"schemaType": "PROTOBUF", "subject": "testref", "version": 1, "id": 1, "deleted": false'
+ b', "schema": '
+ json.dumps(schema_protobuf_invalid_because_corrupted).encode()
+ b"}"
)
message_ref = message_factory(key=key_ref, value=value_ref)

# And given a schema referencing that corrupted schema (valid otherwise)
key_using_ref = b'{"keytype":"SCHEMA","subject":"test","version":1,"magic":1}'
value_using_ref = (
b'{"schemaType": "PROTOBUF", "subject": "test", "version": 1, "id": 1, "deleted": false'
+ b', "schema": '
+ json.dumps(schema_protobuf_with_invalid_ref).encode()
+ b', "references": [{"name": "testref.proto", "subject": "testref", "version": 1}]'
+ b"}"
)
message_using_ref = message_factory(key=key_using_ref, value=value_using_ref)

with caplog.at_level(logging.WARN, logger="karapace.schema_reader"):
# When handling the corrupted schema
schema_reader = schema_reader_with_consumer_messages_factory(([message_ref],))

# Then the schema is recognised as invalid
with pytest.raises(CorruptKafkaRecordException):
schema_reader.handle_messages()

assert schema_reader.offset == 1
assert not schema_reader.ready

# When handling the schema
schema_reader.consumer.consume.side_effect = ([message_using_ref],)

# Then the schema is recognised as invalid because of the corrupted referenced schema
with pytest.raises(CorruptKafkaRecordException):
schema_reader.handle_messages()

assert schema_reader.offset == 1
assert not schema_reader.ready

warn_records = [r for r in caplog.records if r.levelname == "WARNING"]

assert len(warn_records) == 2

# Check that different warnings are logged for each schema
assert warn_records[0].name == "karapace.schema_reader"
assert warn_records[0].message == "Schema is not valid ProtoBuf definition"

assert warn_records[1].name == "karapace.schema_reader"
assert warn_records[1].message == "Invalid Protobuf references"
16 changes: 14 additions & 2 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@
{"q": 3, "sensor_type": "L1", "nums": [3, 4], "order": {"item": "ABC01223"}},
]

schema_protobuf_invalid = """
schema_protobuf_invalid_because_corrupted = """
|o3"
|
|opti -- om.codingharbour.protobuf";
Expand All @@ -162,7 +162,19 @@
| HIGH = 0
| MIDDLE = ;
"""
schema_protobuf_invalid = trim_margin(schema_protobuf_invalid)
schema_protobuf_invalid_because_corrupted = trim_margin(schema_protobuf_invalid_because_corrupted)

schema_protobuf_with_invalid_ref = """
|syntax = "proto3";
|
|import "Message.proto";
|
|message MessageWithInvalidRef {
| string name = 1;
| Message ref = 2;
|}
|"""
schema_protobuf_with_invalid_ref = trim_margin(schema_protobuf_with_invalid_ref)

schema_data_second = {"protobuf": (schema_protobuf_second, test_objects_protobuf_second)}

Expand Down

0 comments on commit fd258ba

Please sign in to comment.