diff --git a/karapace/in_memory_database.py b/karapace/in_memory_database.py
index 01e0153c1..1192260ba 100644
--- a/karapace/in_memory_database.py
+++ b/karapace/in_memory_database.py
@@ -6,6 +6,7 @@
 """
 from __future__ import annotations
 
+from abc import ABC, abstractmethod
 from dataclasses import dataclass, field
 from karapace.schema_models import SchemaVersion, TypedSchema, Versioner
 from karapace.schema_references import Reference, Referents
@@ -24,7 +25,120 @@ class SubjectData:
     compatibility: str | None = None
 
 
-class InMemoryDatabase:
+class KarapaceDatabase(ABC):
+    @abstractmethod
+    def get_schema_id(self, new_schema: TypedSchema) -> SchemaId:
+        pass
+
+    @abstractmethod
+    def get_schema_id_if_exists(
+        self,
+        *,
+        subject: Subject,
+        schema: TypedSchema,
+        include_deleted: bool,
+    ) -> SchemaId | None:
+        pass
+
+    @abstractmethod
+    def get_next_version(self, *, subject: Subject) -> Version:
+        pass
+
+    @abstractmethod
+    def insert_schema_version(
+        self,
+        *,
+        subject: Subject,
+        schema_id: SchemaId,
+        version: Version,
+        deleted: bool,
+        schema: TypedSchema,
+        references: Sequence[Reference] | None,
+    ) -> None:
+        pass
+
+    @abstractmethod
+    def insert_subject(self, *, subject: Subject) -> None:
+        pass
+
+    @abstractmethod
+    def get_subject_compatibility(self, *, subject: Subject) -> str | None:
+        pass
+
+    @abstractmethod
+    def delete_subject_compatibility(self, *, subject: Subject) -> None:
+        pass
+
+    @abstractmethod
+    def set_subject_compatibility(self, *, subject: Subject, compatibility: str) -> None:
+        pass
+
+    @abstractmethod
+    def find_schema(self, *, schema_id: SchemaId) -> TypedSchema | None:
+        pass
+
+    @abstractmethod
+    def find_schemas(self, *, include_deleted: bool, latest_only: bool) -> dict[Subject, list[SchemaVersion]]:
+        pass
+
+    @abstractmethod
+    def subjects_for_schema(self, schema_id: SchemaId) -> list[Subject]:
+        pass
+
+    @abstractmethod
+    def find_schema_versions_by_schema_id(self, *, schema_id: SchemaId, include_deleted: bool) -> list[SchemaVersion]:
+        pass
+
+    @abstractmethod
+    def find_subject(self, *, subject: Subject) -> Subject | None:
+        pass
+
+    @abstractmethod
+    def find_subjects(self, *, include_deleted: bool) -> list[Subject]:
+        pass
+
+    @abstractmethod
+    def find_subject_schemas(self, *, subject: Subject, include_deleted: bool) -> dict[Version, SchemaVersion]:
+        pass
+
+    @abstractmethod
+    def delete_subject(self, *, subject: Subject, version: Version) -> None:
+        pass
+
+    @abstractmethod
+    def delete_subject_hard(self, *, subject: Subject) -> None:
+        pass
+
+    @abstractmethod
+    def delete_subject_schema(self, *, subject: Subject, version: Version) -> None:
+        pass
+
+    @abstractmethod
+    def num_schemas(self) -> int:
+        pass
+
+    @abstractmethod
+    def num_subjects(self) -> int:
+        pass
+
+    @abstractmethod
+    def num_schema_versions(self) -> tuple[int, int]:
+        pass
+
+    @abstractmethod
+    def insert_referenced_by(self, *, subject: Subject, version: Version, schema_id: SchemaId) -> None:
+        pass
+
+    @abstractmethod
+    def get_referenced_by(self, subject: Subject, version: Version) -> Referents | None:
+        pass
+
+    @abstractmethod
+    def remove_referenced_by(self, schema_id: SchemaId, references: Iterable[Reference]) -> None:
+        pass
+
+
+class InMemoryDatabase(KarapaceDatabase):
     def __init__(self) -> None:
         self.global_schema_id = SchemaId(0)
         self.id_lock_thread = Lock()
@@ -76,7 +190,7 @@ def get_schema_id_if_exists(
         *,
         subject: Subject,
         schema: TypedSchema,
-        include_deleted: bool,  # pylint: disable=unused-argument
+        include_deleted: bool,
     ) -> SchemaId | None:
         subject_fingerprints = self._hash_to_schema_id_on_subject.get(subject)
         if subject_fingerprints:
diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py
index 85e822ed1..dfefe29e6 100644
--- a/karapace/schema_reader.py
+++ b/karapace/schema_reader.py
@@ -29,7 +29,7 @@
 from karapace.coordinator.master_coordinator import MasterCoordinator
 from karapace.dependency import Dependency
 from karapace.errors import InvalidReferences, InvalidSchema, InvalidVersion, ShutdownException
-from karapace.in_memory_database import InMemoryDatabase
+from karapace.in_memory_database import KarapaceDatabase
 from karapace.kafka.admin import KafkaAdminClient
 from karapace.kafka.common import translate_from_kafkaerror
 from karapace.kafka.consumer import KafkaConsumer
@@ -127,7 +127,7 @@ def __init__(
         config: Config,
         offset_watcher: OffsetWatcher,
         key_formatter: KeyFormatter,
-        database: InMemoryDatabase,
+        database: KarapaceDatabase,
         master_coordinator: MasterCoordinator | None = None,
     ) -> None:
         Thread.__init__(self, name="schema-reader")
@@ -349,6 +349,9 @@ def handle_messages(self) -> None:
         if are_we_master is True:
             watch_offsets = True
 
+        self.consume_messages(msgs, watch_offsets)
+
+    def consume_messages(self, msgs: list[Message], watch_offsets: bool) -> None:
         schema_records_processed_keymode_canonical = 0
         schema_records_processed_keymode_deprecated_karapace = 0
         for msg in msgs:
diff --git a/tests/unit/test_data/schemas.log b/tests/unit/test_data/schemas.log
new file mode 100755
index 000000000..bc634acfd
--- /dev/null
+++ b/tests/unit/test_data/schemas.log
@@ -0,0 +1,3 @@
+{"keytype":"CONFIG","subject":null,"magic":0}	{"compatibilityLevel":"NONE"}	1671533077776
+{"keytype":"SCHEMA","subject":"wow.first.subject","version":1,"magic":1}	{"deleted": false, "id": 1, "schema": "{\"type\":\"record\",\"name\":\"foo\",\"namespace\":\"awesomity\",\"fields\":[{\"name\":\"coffee\",\"type\":\"string\"}], "subject": "wow.first.subject", "version": 1}	1671534130996
+{"keytype":"SCHEMA","subject":"omg.even.better.a.second.subject","version":1,"magic":1}	{"deleted": false, "id": 1, "schema": "{\"type\":\"record\",\"name\":\"foo\",\"namespace\":\"awesomity\",\"fields\":[{\"name\":\"pasta\",\"type\":\"string\"}], "subject": "omg.even.better.a.second.subject", "version": 1}	1671534131365
diff --git a/tests/unit/test_in_memory_database.py b/tests/unit/test_in_memory_database.py
index 04ac6102d..aa25adf56 100644
--- a/tests/unit/test_in_memory_database.py
+++ b/tests/unit/test_in_memory_database.py
@@ -2,7 +2,24 @@
 Copyright (c) 2024 Aiven Ltd
 See LICENSE for details
 """
-from karapace.in_memory_database import InMemoryDatabase, Subject
+from __future__ import annotations
+
+from collections import defaultdict
+from confluent_kafka.cimpl import KafkaError
+from karapace.config import DEFAULTS
+from karapace.constants import DEFAULT_SCHEMA_TOPIC
+from karapace.in_memory_database import InMemoryDatabase, KarapaceDatabase, Subject, SubjectData
+from karapace.kafka.types import Timestamp
+from karapace.key_format import KeyFormatter
+from karapace.offset_watcher import OffsetWatcher
+from karapace.schema_models import SchemaVersion, TypedSchema
+from karapace.schema_reader import KafkaSchemaReader
+from karapace.schema_references import Reference, Referents
+from karapace.typing import SchemaId, Version
+from pathlib import Path
+from typing import Final, Iterable, Sequence
+
+TEST_DATA_FOLDER: Final = Path("tests/unit/test_data/")
 
 
 class TestFindSchemas:
@@ -12,3 +29,232 @@ def test_returns_empty_list_when_no_schemas(self) -> None:
         database.insert_subject(subject=subject)
         expected = {subject: []}
         assert database.find_schemas(include_deleted=True, latest_only=True) == expected
+
+
+class AlwaysFineKafkaMessage:
+    def __init__(
+        self,
+        offset: int,
+        timestamp: tuple[int, int],
+        topic: str,
+        key: str | bytes | None = None,
+        value: str | bytes | None = None,
+        partition: int = 0,
+        headers: list[tuple[str, bytes]] | None = None,
+        error: KafkaError | None = None,
+    ) -> None:
+        self._offset = offset
+        self._timestamp = timestamp
+        self._key = key
+        self._value = value
+        self._topic = topic
+        self._partition = partition
+        self._headers = headers
+        self._error = error
+
+    def offset(self) -> int:
+        return self._offset
+
+    def timestamp(self) -> tuple[int, int]:
+        return self._timestamp
+
+    def key(self) -> str | bytes | None:
+        return self._key
+
+    def value(self) -> str | bytes | None:
+        return self._value
+
+    def topic(self) -> str:
+        return self._topic
+
+    def partition(self) -> int:
+        return self._partition
+
+    def headers(self) -> list[tuple[str, bytes]] | None:
+        return self._headers
+
+    def error(self) -> KafkaError | None:
+        return self._error
+
+
+class WrappedInMemoryDatabase(KarapaceDatabase):
+    def __init__(self) -> None:
+        self._duplicates: dict[SchemaId, list[TypedSchema]] = {}
+        self._schema_id_to_subject: dict[SchemaId, list[Subject]] = defaultdict(list)
+        self._duplicates_timestamp: dict[SchemaId, list[int]] = {}
+        self.db = InMemoryDatabase()
+        self.timestamp = -1
+
+    def get_schema_id(self, new_schema: TypedSchema) -> SchemaId:
+        return self.db.get_schema_id(new_schema)
+
+    def get_schema_id_if_exists(
+        self,
+        *,
+        subject: Subject,
+        schema: TypedSchema,
+        include_deleted: bool,
+    ) -> SchemaId | None:
+        return self.db.get_schema_id_if_exists(subject=subject, schema=schema, include_deleted=include_deleted)
+
+    def get_next_version(self, *, subject: Subject) -> Version:
+        return self.db.get_next_version(subject=subject)
+
+    def insert_schema_version(
+        self,
+        *,
+        subject: Subject,
+        schema_id: SchemaId,
+        version: Version,
+        deleted: bool,
+        schema: TypedSchema,
+        references: Sequence[Reference] | None,
+    ) -> None:
+        self._schema_id_to_subject[schema_id].append(subject)
+        if schema_id in self.db.schemas:
+            if schema_id not in self._duplicates:
+                self._duplicates[schema_id] = [self.db.schemas[schema_id]]
+            self._duplicates[schema_id].append(schema)
+
+            if schema_id not in self._duplicates_timestamp:
+                self._duplicates_timestamp[schema_id] = [self.timestamp]
+            self._duplicates_timestamp[schema_id].append(self.timestamp)
+
+        return self.db.insert_schema_version(
+            subject=subject, schema_id=schema_id, version=version, deleted=deleted, schema=schema, references=references
+        )
+
+    def insert_subject(self, *, subject: Subject) -> None:
+        return self.db.insert_subject(subject=subject)
+
+    def get_subject_compatibility(self, *, subject: Subject) -> str | None:
+        return self.db.get_subject_compatibility(subject=subject)
+
+    def delete_subject_compatibility(self, *, subject: Subject) -> None:
+        return self.db.delete_subject_compatibility(subject=subject)
+
+    def set_subject_compatibility(self, *, subject: Subject, compatibility: str) -> None:
+        return self.db.set_subject_compatibility(subject=subject, compatibility=compatibility)
+
+    def find_schema(self, *, schema_id: SchemaId) -> TypedSchema | None:
+        return self.db.find_schema(schema_id=schema_id)
+
+    def find_schemas(self, *, include_deleted: bool, latest_only: bool) -> dict[Subject, list[SchemaVersion]]:
+        return self.db.find_schemas(include_deleted=include_deleted, latest_only=latest_only)
+
+    def subjects_for_schema(self, schema_id: SchemaId) -> list[Subject]:
+        return self.db.subjects_for_schema(schema_id=schema_id)
+
+    def find_schema_versions_by_schema_id(self, *, schema_id: SchemaId, include_deleted: bool) -> list[SchemaVersion]:
+        return self.db.find_schema_versions_by_schema_id(schema_id=schema_id, include_deleted=include_deleted)
+
+    def find_subject(self, *, subject: Subject) -> Subject | None:
+        return self.db.find_subject(subject=subject)
+
+    def find_subjects(self, *, include_deleted: bool) -> list[Subject]:
+        return self.db.find_subjects(include_deleted=include_deleted)
+
+    def find_subject_schemas(self, *, subject: Subject, include_deleted: bool) -> dict[Version, SchemaVersion]:
+        return self.db.find_subject_schemas(subject=subject, include_deleted=include_deleted)
+
+    def delete_subject(self, *, subject: Subject, version: Version) -> None:
+        return self.db.delete_subject(subject=subject, version=version)
+
+    def delete_subject_hard(self, *, subject: Subject) -> None:
+        return self.db.delete_subject_hard(subject=subject)
+
+    def delete_subject_schema(self, *, subject: Subject, version: Version) -> None:
+        return self.db.delete_subject_schema(subject=subject, version=version)
+
+    def num_schemas(self) -> int:
+        return self.db.num_schemas()
+
+    def num_subjects(self) -> int:
+        return self.db.num_subjects()
+
+    def num_schema_versions(self) -> tuple[int, int]:
+        return self.db.num_schema_versions()
+
+    def insert_referenced_by(self, *, subject: Subject, version: Version, schema_id: SchemaId) -> None:
+        return self.db.insert_referenced_by(subject=subject, version=version, schema_id=schema_id)
+
+    def get_referenced_by(self, subject: Subject, version: Version) -> Referents | None:
+        return self.db.get_referenced_by(subject=subject, version=version)
+
+    def remove_referenced_by(self, schema_id: SchemaId, references: Iterable[Reference]) -> None:
+        return self.db.remove_referenced_by(schema_id=schema_id, references=references)
+
+    def duplicates(self) -> dict[SchemaId, list[tuple[Subject, TypedSchema]]]:
+        duplicate_data = defaultdict(list)
+        for schema_id, schemas in self._duplicates.items():
+            for subject, schema in zip(self._schema_id_to_subject[schema_id], schemas):
+                duplicate_data[schema_id].append((subject, schema))
+        return duplicate_data
+
+    def subject_to_subject_data(self) -> dict[Subject, SubjectData]:
+        return self.db.subjects
+
+
+def compute_schema_id_to_subjects(
+    duplicates: dict[SchemaId, list[tuple[Subject, TypedSchema]]], subject_to_subject_data: dict[Subject, SubjectData]
+) -> dict[SchemaId, list[tuple[Subject, Version]]]:
+    tuples = [(schema_id, subject) for schema_id, dup in duplicates.items() for subject, _ in dup]
+    schema_id_to_duplicated_subjects = defaultdict(list)
+    for schema_id, subject_referring_to_duplicate_schema in tuples:
+        corrupted_data = subject_to_subject_data[subject_referring_to_duplicate_schema]
+        corrupted_version = -1
+        for schema_version, schema_data in corrupted_data.schemas.items():
+            assert schema_version == schema_data.version
+
+            if schema_data.schema_id == schema_id:
+                corrupted_version = schema_version
+
+        schema_id_to_duplicated_subjects[schema_id].append((subject_referring_to_duplicate_schema, corrupted_version))
+    return schema_id_to_duplicated_subjects
+
+
+def test_can_ingest_schemas_from_log() -> None:
+    """
+    Test for the consistency of a backup: this checks that each SchemaId is unique in the backup.
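+    A duplicated id, i.e. the same SchemaId assigned to two different schemas, means the backup is corrupted.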
+    The format of the log is the one obtained by running:
+
+    `kafkacat -C -t _schemas -o beginning -e -f "%k\t%s\t%T\n"`
+
+    on a node running Kafka that hosts the `_schemas` topic.
+    """
+    restore_location = TEST_DATA_FOLDER / "schemas.log"
+    schema_log = restore_location.read_text(encoding="utf-8").strip()
+
+    database = WrappedInMemoryDatabase()
+    schema_reader = KafkaSchemaReader(
+        config=DEFAULTS,
+        offset_watcher=OffsetWatcher(),
+        key_formatter=KeyFormatter(),
+        master_coordinator=None,
+        database=database,
+    )
+
+    kafka_messages: list[AlwaysFineKafkaMessage] = []
+    for i, message in enumerate(schema_log.split("\n")[:-1]):
+        res = message.split("\t")
+        timestamp = res[-1]
+        maybe_key_val = res[:-1]
+        # the tuple follows the Kafka message specification
+        # https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Message.timestamp
+        timestamp_tuple = (Timestamp.CREATE_TIME, int(timestamp))
+        database.timestamp = int(timestamp)
+        if len(maybe_key_val) > 1:
+            key, value = maybe_key_val
+            kafka_message = AlwaysFineKafkaMessage(i, timestamp_tuple, DEFAULT_SCHEMA_TOPIC, key=key, value=value)
+        else:
+            key = maybe_key_val[0]
+            kafka_message = AlwaysFineKafkaMessage(i, timestamp_tuple, DEFAULT_SCHEMA_TOPIC, key=key)
+
+        kafka_messages.append(kafka_message)
+
+    schema_reader.consume_messages(kafka_messages, False)
+    duplicates = database.duplicates()
+
+    schema_id_to_duplicated_subjects = compute_schema_id_to_subjects(duplicates, database.subject_to_subject_data())
+    assert schema_id_to_duplicated_subjects == {}, "there shouldn't be any duplicated schemas"
+    assert duplicates == {}, "the schema database is broken. The id should be unique"
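+    # Both assertions express the same invariant: `duplicates` pairs each reused SchemaId with the
+    # conflicting schemas, while `schema_id_to_duplicated_subjects` pairs it with the referring
+    # (subject, version) entries.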