diff --git a/karapace/messaging.py b/karapace/messaging.py new file mode 100644 index 000000000..40042f734 --- /dev/null +++ b/karapace/messaging.py @@ -0,0 +1,113 @@ +""" +karapace - Karapace producer + +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" +from kafka import KafkaProducer +from kafka.errors import MessageSizeTooLargeError +from karapace.config import Config +from karapace.errors import SchemaTooLargeException +from karapace.key_format import KeyFormatter +from karapace.offset_watcher import OffsetWatcher +from karapace.utils import json_encode, KarapaceKafkaClient +from karapace.version import __version__ +from typing import Any, cast, Dict, Optional, Tuple, Union + +import logging +import time + +LOG = logging.getLogger(__name__) +X_REGISTRY_VERSION_HEADER = ("X-Registry-Version", f"karapace-{__version__}".encode()) + + +class KarapaceProducer: + def __init__(self, *, config: Config, offset_watcher: OffsetWatcher, key_formatter: KeyFormatter): + self._producer: Optional[KafkaProducer] = None + self._config = config + self._offset_watcher = offset_watcher + self._key_formatter = key_formatter + self._kafka_timeout = 10 + self._schemas_topic = self._config["topic_name"] + + host: str = cast(str, self._config["host"]) + self.x_origin_host_header: Tuple[str, bytes] = ("X-Origin-Host", host.encode("utf8")) + + def initialize_karapace_producer( + self, + ) -> None: + while True: + try: + self._producer = KafkaProducer( + bootstrap_servers=self._config["bootstrap_uri"], + security_protocol=self._config["security_protocol"], + ssl_cafile=self._config["ssl_cafile"], + ssl_certfile=self._config["ssl_certfile"], + ssl_keyfile=self._config["ssl_keyfile"], + sasl_mechanism=self._config["sasl_mechanism"], + sasl_plain_username=self._config["sasl_plain_username"], + sasl_plain_password=self._config["sasl_plain_password"], + api_version=(1, 0, 0), + metadata_max_age_ms=self._config["metadata_max_age_ms"], + max_block_ms=2000, # missing topics will block unless we cache cluster metadata and pre-check + connections_max_idle_ms=self._config["connections_max_idle_ms"], # helps through cluster upgrades ?? + kafka_client=KarapaceKafkaClient, + ) + return + except: # pylint: disable=bare-except + LOG.exception("Unable to create producer, retrying") + time.sleep(1) + + def close(self) -> None: + if self._producer is not None: + self._producer.close() + + def _send_kafka_message(self, key: Union[bytes, str], value: Union[bytes, str]) -> None: + assert self._producer is not None + + if isinstance(key, str): + key = key.encode("utf8") + if isinstance(value, str): + value = value.encode("utf8") + + future = self._producer.send( + self._schemas_topic, + key=key, + value=value, + headers=[X_REGISTRY_VERSION_HEADER, self.x_origin_host_header], + ) + self._producer.flush(timeout=self._kafka_timeout) + try: + msg = future.get(self._kafka_timeout) + except MessageSizeTooLargeError as ex: + raise SchemaTooLargeException from ex + + sent_offset = msg.offset + + LOG.info( + "Waiting for schema reader to caught up. key: %r, value: %r, offset: %r", + key, + value, + sent_offset, + ) + + if self._offset_watcher.wait_for_offset(sent_offset, timeout=60) is True: + LOG.info( + "Schema reader has found key. key: %r, value: %r, offset: %r", + key, + value, + sent_offset, + ) + else: + raise RuntimeError( + "Schema reader timed out while looking for key. key: {!r}, value: {!r}, offset: {}".format( + key, value, sent_offset + ) + ) + + def send_message(self, *, key: Dict[str, Any], value: Optional[Dict[str, Any]]) -> None: + key_bytes = self._key_formatter.format_key(key) + value_bytes: Union[bytes, str] = b"" + if value is not None: + value_bytes = json_encode(value, binary=True, compact=True) + self._send_kafka_message(key=key_bytes, value=value_bytes) diff --git a/karapace/offset_watcher.py b/karapace/offset_watcher.py new file mode 100644 index 000000000..6056d5f37 --- /dev/null +++ b/karapace/offset_watcher.py @@ -0,0 +1,39 @@ +""" +karapace - Karapace offset watcher + +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" +from threading import Condition + + +class OffsetWatcher: + """Synchronization container for threads to wait until an offset is seen. + + This works under the assumption offsets are used only once, which should be + correct as long as no unclean leader election is performed. + """ + + def __init__(self) -> None: + # Condition used to protected _greatest_offset, any modifications to that object must + # be performed with this condition acquired + self._condition = Condition() + self._greatest_offset = -1 # Would fail if initially this is 0 as it will be first offset ever. + + def greatest_offset(self) -> int: + return self._greatest_offset + + def offset_seen(self, new_offset: int) -> None: + with self._condition: + self._greatest_offset = max(self._greatest_offset, new_offset) + self._condition.notify_all() + + def wait_for_offset(self, expected_offset: int, timeout: float) -> bool: + """Block until expected_offset is seen. + + Args: + expected_offset: The message offset generated by the producer. + timeout: How long the caller will wait for the offset in seconds. + """ + with self._condition: + return self._condition.wait_for(lambda: expected_offset <= self._greatest_offset, timeout=timeout) diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index b4df36000..dee44a942 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -21,10 +21,11 @@ from karapace.in_memory_database import InMemoryDatabase from karapace.key_format import is_key_in_canonical_format, KeyFormatter, KeyMode from karapace.master_coordinator import MasterCoordinator +from karapace.offset_watcher import OffsetWatcher from karapace.schema_models import SchemaType, TypedSchema from karapace.statsd import StatsClient from karapace.utils import json_decode, JSONDecodeError, KarapaceKafkaClient -from threading import Condition, Event, Thread +from threading import Event, Thread from typing import Optional import logging @@ -102,42 +103,11 @@ def new_schema_topic_from_config(config: Config) -> NewTopic: ) -class OffsetsWatcher: - """Synchronization container for threads to wait until an offset is seen. - - This works under the assumption offsets are used only once, which should be - correct as long as no unclean leader election is performed. - """ - - def __init__(self) -> None: - # Condition used to protected _greatest_offset, any modifications to that object must - # be performed with this condition acquired - self._condition = Condition() - self._greatest_offset = -1 # Would fail if initially this is 0 as it will be first offset ever. - - def greatest_offset(self) -> int: - return self._greatest_offset - - def offset_seen(self, new_offset: int) -> None: - with self._condition: - self._greatest_offset = max(self._greatest_offset, new_offset) - self._condition.notify_all() - - def wait_for_offset(self, expected_offset: int, timeout: float) -> bool: - """Block until expected_offset is seen. - - Args: - expected_offset: The message offset generated by the producer. - timeout: How long the caller will wait for the offset in seconds. - """ - with self._condition: - return self._condition.wait_for(lambda: expected_offset <= self._greatest_offset, timeout=timeout) - - class KafkaSchemaReader(Thread): def __init__( self, config: Config, + offset_watcher: OffsetWatcher, key_formatter: KeyFormatter, database: InMemoryDatabase, master_coordinator: Optional[MasterCoordinator] = None, @@ -150,7 +120,7 @@ def __init__( self.admin_client: Optional[KafkaAdminClient] = None self.topic_replication_factor = self.config["replication_factor"] self.consumer: Optional[KafkaConsumer] = None - self.offset_watcher = OffsetsWatcher() + self._offset_watcher = offset_watcher self.stats = StatsClient(config=config) # Thread synchronization objects @@ -293,7 +263,7 @@ def _is_ready(self) -> bool: return self.offset >= self._highest_offset def highest_offset(self) -> int: - return max(self._highest_offset, self.offset_watcher.greatest_offset()) + return max(self._highest_offset, self._offset_watcher.greatest_offset()) def handle_messages(self) -> None: assert self.consumer is not None, "Thread must be started" @@ -348,7 +318,7 @@ def handle_messages(self) -> None: schema_records_processed_keymode_deprecated_karapace += 1 if self.ready and watch_offsets: - self.offset_watcher.offset_seen(self.offset) + self._offset_watcher.offset_seen(self.offset) self._report_schema_metrics( schema_records_processed_keymode_canonical, diff --git a/karapace/schema_registry.py b/karapace/schema_registry.py index c4899d438..cd7f01e21 100644 --- a/karapace/schema_registry.py +++ b/karapace/schema_registry.py @@ -3,8 +3,6 @@ See LICENSE for details """ from contextlib import AsyncExitStack, closing -from kafka import errors as kafka_errors, KafkaProducer -from kafka.producer.future import FutureRecordMetadata from karapace.compatibility import check_compatibility, CompatibilityModes from karapace.compatibility.jsonschema.checks import is_incompatible from karapace.config import Config @@ -12,7 +10,6 @@ IncompatibleSchema, InvalidVersion, SchemasNotFoundException, - SchemaTooLargeException, SchemaVersionNotSoftDeletedException, SchemaVersionSoftDeletedException, SubjectNotFoundException, @@ -23,19 +20,17 @@ from karapace.in_memory_database import InMemoryDatabase from karapace.key_format import KeyFormatter from karapace.master_coordinator import MasterCoordinator +from karapace.messaging import KarapaceProducer +from karapace.offset_watcher import OffsetWatcher from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema from karapace.schema_reader import KafkaSchemaReader from karapace.typing import JsonData, ResolvedVersion, Subject, Version -from karapace.utils import json_encode, KarapaceKafkaClient -from karapace.version import __version__ from typing import cast, Dict, List, Optional, Tuple, Union import asyncio import logging -import time LOG = logging.getLogger(__name__) -X_REGISTRY_VERSION_HEADER = ("X-Registry-Version", f"karapace-{__version__}".encode()) def _resolve_version(schema_versions: Dict[ResolvedVersion, SchemaVersion], version: Version) -> ResolvedVersion: @@ -67,18 +62,19 @@ def validate_version(version: Version) -> Version: class KarapaceSchemaRegistry: def __init__(self, config: Config) -> None: self.config = config - host: str = cast(str, self.config["host"]) - self.x_origin_host_header: Tuple[str, bytes] = ("X-Origin-Host", host.encode("utf8")) + self._key_formatter = KeyFormatter() - self.producer = self._create_producer() - self.kafka_timeout = 10 + offset_watcher = OffsetWatcher() + self.producer = KarapaceProducer( + config=self.config, offset_watcher=offset_watcher, key_formatter=self._key_formatter + ) self.mc = MasterCoordinator(config=self.config) - self.key_formatter = KeyFormatter() self.database = InMemoryDatabase() self.schema_reader = KafkaSchemaReader( config=self.config, - key_formatter=self.key_formatter, + offset_watcher=offset_watcher, + key_formatter=self._key_formatter, master_coordinator=self.mc, database=self.database, ) @@ -100,6 +96,7 @@ def get_schemas(self, subject: Subject, *, include_deleted: bool = False) -> Lis def start(self) -> None: self.mc.start() self.schema_reader.start() + self.producer.initialize_karapace_producer() async def close(self) -> None: async with AsyncExitStack() as stack: @@ -107,28 +104,6 @@ async def close(self) -> None: stack.enter_context(closing(self.schema_reader)) stack.enter_context(closing(self.producer)) - def _create_producer(self) -> KafkaProducer: - while True: - try: - return KafkaProducer( - bootstrap_servers=self.config["bootstrap_uri"], - security_protocol=self.config["security_protocol"], - ssl_cafile=self.config["ssl_cafile"], - ssl_certfile=self.config["ssl_certfile"], - ssl_keyfile=self.config["ssl_keyfile"], - sasl_mechanism=self.config["sasl_mechanism"], - sasl_plain_username=self.config["sasl_plain_username"], - sasl_plain_password=self.config["sasl_plain_password"], - api_version=(1, 0, 0), - metadata_max_age_ms=self.config["metadata_max_age_ms"], - max_block_ms=2000, # missing topics will block unless we cache cluster metadata and pre-check - connections_max_idle_ms=self.config["connections_max_idle_ms"], # helps through cluster upgrades ?? - kafka_client=KarapaceKafkaClient, - ) - except: # pylint: disable=bare-except - LOG.exception("Unable to create producer, retrying") - time.sleep(1) - async def get_master(self, ignore_readiness: bool = False) -> Tuple[bool, Optional[str]]: """Resolve if current node is the primary and the primary node address. @@ -409,49 +384,6 @@ def get_subject_versions_for_schema( subject_versions = sorted(subject_versions, key=lambda s: (s["subject"], s["version"])) return subject_versions - def send_kafka_message(self, key: Union[bytes, str], value: Union[bytes, str]) -> FutureRecordMetadata: - if isinstance(key, str): - key = key.encode("utf8") - if isinstance(value, str): - value = value.encode("utf8") - - future = self.producer.send( - self.config["topic_name"], - key=key, - value=value, - headers=[X_REGISTRY_VERSION_HEADER, self.x_origin_host_header], - ) - self.producer.flush(timeout=self.kafka_timeout) - try: - msg = future.get(self.kafka_timeout) - except kafka_errors.MessageSizeTooLargeError as ex: - raise SchemaTooLargeException from ex - - sent_offset = msg.offset - - LOG.info( - "Waiting for schema reader to caught up. key: %r, value: %r, offset: %r", - key, - value, - sent_offset, - ) - - if self.schema_reader.offset_watcher.wait_for_offset(sent_offset, timeout=60) is True: - LOG.info( - "Schema reader has found key. key: %r, value: %r, offset: %r", - key, - value, - sent_offset, - ) - else: - raise RuntimeError( - "Schema reader timed out while looking for key. key: {!r}, value: {!r}, offset: {}".format( - key, value, sent_offset - ) - ) - - return future - def send_schema_message( self, *, @@ -460,12 +392,10 @@ def send_schema_message( schema_id: int, version: int, deleted: bool, - ) -> FutureRecordMetadata: - key = self.key_formatter.format_key( - {"subject": subject, "version": version, "magic": 1, "keytype": "SCHEMA"}, - ) + ) -> None: + key = {"subject": subject, "version": version, "magic": 1, "keytype": "SCHEMA"} if schema: - valuedict = { + value = { "subject": subject, "version": version, "id": schema_id, @@ -473,42 +403,21 @@ def send_schema_message( "deleted": deleted, } if schema.schema_type is not SchemaType.AVRO: - valuedict["schemaType"] = schema.schema_type - value = json_encode(valuedict) + value["schemaType"] = schema.schema_type else: - value = "" - return self.send_kafka_message(key, value) - - def send_config_message( - self, compatibility_level: CompatibilityModes, subject: Optional[Subject] = None - ) -> FutureRecordMetadata: - key = self.key_formatter.format_key( - { - "subject": subject, - "magic": 0, - "keytype": "CONFIG", - } - ) - value = f'{{"compatibilityLevel":"{compatibility_level.value}"}}' - return self.send_kafka_message(key, value) - - def send_config_subject_delete_message(self, subject: Subject) -> FutureRecordMetadata: - key = self.key_formatter.format_key( - { - "subject": subject, - "magic": 0, - "keytype": "CONFIG", - } - ) - return self.send_kafka_message(key, b"") - - def send_delete_subject_message(self, subject: Subject, version: Version) -> FutureRecordMetadata: - key = self.key_formatter.format_key( - { - "subject": subject, - "magic": 0, - "keytype": "DELETE_SUBJECT", - } - ) - value = f'{{"subject":"{subject}","version":{version}}}' - return self.send_kafka_message(key, value) + value = None + self.producer.send_message(key=key, value=value) + + def send_config_message(self, compatibility_level: CompatibilityModes, subject: Optional[Subject] = None) -> None: + key = {"subject": subject, "magic": 0, "keytype": "CONFIG"} + value = {"compatibilityLevel": compatibility_level.value} + self.producer.send_message(key=key, value=value) + + def send_config_subject_delete_message(self, subject: Subject) -> None: + key = {"subject": subject, "magic": 0, "keytype": "CONFIG"} + self.producer.send_message(key=key, value=None) + + def send_delete_subject_message(self, subject: Subject, version: Version) -> None: + key = {"subject": subject, "magic": 0, "keytype": "DELETE_SUBJECT"} + value = {"subject": subject, "version": version} + self.producer.send_message(key=key, value=value) diff --git a/tests/integration/test_schema_reader.py b/tests/integration/test_schema_reader.py index a15acb934..29847eb85 100644 --- a/tests/integration/test_schema_reader.py +++ b/tests/integration/test_schema_reader.py @@ -10,6 +10,7 @@ from karapace.in_memory_database import InMemoryDatabase from karapace.key_format import KeyFormatter, KeyMode from karapace.master_coordinator import MasterCoordinator +from karapace.offset_watcher import OffsetWatcher from karapace.schema_reader import KafkaSchemaReader from karapace.utils import json_encode from tests.base_testcase import BaseTestCase @@ -71,8 +72,10 @@ def test_regression_soft_delete_schemas_should_be_registered( master_coordinator = MasterCoordinator(config=config) master_coordinator.start() database = InMemoryDatabase() + offset_watcher = OffsetWatcher() schema_reader = KafkaSchemaReader( config=config, + offset_watcher=offset_watcher, key_formatter=KeyFormatter(), master_coordinator=master_coordinator, database=database, @@ -103,7 +106,7 @@ def test_regression_soft_delete_schemas_should_be_registered( ) msg = future.get() - schema_reader.offset_watcher.wait_for_offset(msg.offset, timeout=5) + schema_reader._offset_watcher.wait_for_offset(msg.offset, timeout=5) # pylint: disable=protected-access schemas = database.find_subject_schemas(subject=subject, include_deleted=True) assert len(schemas) == 1, "Deleted schemas must have been registered" @@ -130,7 +133,9 @@ def test_regression_soft_delete_schemas_should_be_registered( ) msg = future.get() - assert schema_reader.offset_watcher.wait_for_offset(msg.offset, timeout=5) is True + assert ( + schema_reader._offset_watcher.wait_for_offset(msg.offset, timeout=5) is True # pylint: disable=protected-access + ) assert database.global_schema_id == test_global_schema_id schemas = database.find_subject_schemas(subject=subject, include_deleted=True) @@ -155,8 +160,10 @@ def test_regression_config_for_inexisting_object_should_not_throw( master_coordinator = MasterCoordinator(config=config) master_coordinator.start() database = InMemoryDatabase() + offset_watcher = OffsetWatcher() schema_reader = KafkaSchemaReader( config=config, + offset_watcher=offset_watcher, key_formatter=KeyFormatter(), master_coordinator=master_coordinator, database=database, @@ -181,7 +188,9 @@ def test_regression_config_for_inexisting_object_should_not_throw( ) msg = future.get() - assert schema_reader.offset_watcher.wait_for_offset(msg.offset, timeout=5) is True + assert ( + schema_reader._offset_watcher.wait_for_offset(msg.offset, timeout=5) is True # pylint: disable=protected-access + ) assert database.find_subject(subject=subject) is not None, "The above message should be handled gracefully" @@ -254,8 +263,10 @@ def test_key_format_detection( master_coordinator.start() key_formatter = KeyFormatter() database = InMemoryDatabase() + offset_watcher = OffsetWatcher() schema_reader = KafkaSchemaReader( config=config, + offset_watcher=offset_watcher, key_formatter=key_formatter, master_coordinator=master_coordinator, database=database, diff --git a/tests/unit/test_schema_reader.py b/tests/unit/test_schema_reader.py index c6b7ac24d..b6566e927 100644 --- a/tests/unit/test_schema_reader.py +++ b/tests/unit/test_schema_reader.py @@ -9,7 +9,8 @@ from dataclasses import dataclass from karapace.config import DEFAULTS from karapace.in_memory_database import InMemoryDatabase -from karapace.schema_reader import KafkaSchemaReader, OFFSET_EMPTY, OFFSET_UNINITIALIZED, OffsetsWatcher +from karapace.offset_watcher import OffsetWatcher +from karapace.schema_reader import KafkaSchemaReader, OFFSET_EMPTY, OFFSET_UNINITIALIZED from tests.base_testcase import BaseTestCase from unittest.mock import Mock @@ -19,7 +20,7 @@ def test_offset_watcher() -> None: - watcher = OffsetsWatcher() + watcher = OffsetWatcher() timeout = 0.5 # A largish number of iteration useful to stress the code @@ -140,8 +141,10 @@ def test_readiness_check(testcase: ReadinessTestCase) -> None: # Return dict {partition: offsets}, end offset is the next upcoming record offset consumer_mock.end_offsets.return_value = {0: testcase.end_offset} + offset_watcher = OffsetWatcher() schema_reader = KafkaSchemaReader( config=DEFAULTS, + offset_watcher=offset_watcher, key_formatter=key_formatter_mock, master_coordinator=None, database=InMemoryDatabase(),