Skip to content

Commit

Permalink
refactor: extract Kafka producer from Registry
Browse files Browse the repository at this point in the history
  • Loading branch information
jjaakola-aiven committed Mar 21, 2023
1 parent d956822 commit 00aaa28
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 162 deletions.
113 changes: 113 additions & 0 deletions karapace/messaging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
"""
karapace - Karapace producer
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from kafka import KafkaProducer
from kafka.errors import MessageSizeTooLargeError
from karapace.config import Config
from karapace.errors import SchemaTooLargeException
from karapace.key_format import KeyFormatter
from karapace.offset_watcher import OffsetWatcher
from karapace.utils import json_encode, KarapaceKafkaClient
from karapace.version import __version__
from typing import Any, cast, Dict, Optional, Tuple, Union

import logging
import time

LOG = logging.getLogger(__name__)
X_REGISTRY_VERSION_HEADER = ("X-Registry-Version", f"karapace-{__version__}".encode())


class KarapaceProducer:
def __init__(self, *, config: Config, offset_watcher: OffsetWatcher, key_formatter: KeyFormatter):
self._producer: Optional[KafkaProducer] = None
self._config = config
self._offset_watcher = offset_watcher
self._key_formatter = key_formatter
self._kafka_timeout = 10
self._schemas_topic = self._config["topic_name"]

host: str = cast(str, self._config["host"])
self.x_origin_host_header: Tuple[str, bytes] = ("X-Origin-Host", host.encode("utf8"))

def initialize_karapace_producer(
self,
) -> None:
while True:
try:
self._producer = KafkaProducer(
bootstrap_servers=self._config["bootstrap_uri"],
security_protocol=self._config["security_protocol"],
ssl_cafile=self._config["ssl_cafile"],
ssl_certfile=self._config["ssl_certfile"],
ssl_keyfile=self._config["ssl_keyfile"],
sasl_mechanism=self._config["sasl_mechanism"],
sasl_plain_username=self._config["sasl_plain_username"],
sasl_plain_password=self._config["sasl_plain_password"],
api_version=(1, 0, 0),
metadata_max_age_ms=self._config["metadata_max_age_ms"],
max_block_ms=2000, # missing topics will block unless we cache cluster metadata and pre-check
connections_max_idle_ms=self._config["connections_max_idle_ms"], # helps through cluster upgrades ??
kafka_client=KarapaceKafkaClient,
)
return
except: # pylint: disable=bare-except
LOG.exception("Unable to create producer, retrying")
time.sleep(1)

def close(self) -> None:
if self._producer is not None:
self._producer.close()

def _send_kafka_message(self, key: Union[bytes, str], value: Union[bytes, str]) -> None:
assert self._producer is not None

if isinstance(key, str):
key = key.encode("utf8")
if isinstance(value, str):
value = value.encode("utf8")

future = self._producer.send(
self._schemas_topic,
key=key,
value=value,
headers=[X_REGISTRY_VERSION_HEADER, self.x_origin_host_header],
)
self._producer.flush(timeout=self._kafka_timeout)
try:
msg = future.get(self._kafka_timeout)
except MessageSizeTooLargeError as ex:
raise SchemaTooLargeException from ex

sent_offset = msg.offset

LOG.info(
"Waiting for schema reader to caught up. key: %r, value: %r, offset: %r",
key,
value,
sent_offset,
)

if self._offset_watcher.wait_for_offset(sent_offset, timeout=60) is True:
LOG.info(
"Schema reader has found key. key: %r, value: %r, offset: %r",
key,
value,
sent_offset,
)
else:
raise RuntimeError(
"Schema reader timed out while looking for key. key: {!r}, value: {!r}, offset: {}".format(
key, value, sent_offset
)
)

def send_message(self, *, key: Dict[str, Any], value: Optional[Dict[str, Any]]) -> None:
key_bytes = self._key_formatter.format_key(key)
value_bytes: Union[bytes, str] = b""
if value is not None:
value_bytes = json_encode(value, binary=True, compact=True)
self._send_kafka_message(key=key_bytes, value=value_bytes)
39 changes: 39 additions & 0 deletions karapace/offset_watcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""
karapace - Karapace offset watcher
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from threading import Condition


class OffsetWatcher:
"""Synchronization container for threads to wait until an offset is seen.
This works under the assumption offsets are used only once, which should be
correct as long as no unclean leader election is performed.
"""

def __init__(self) -> None:
# Condition used to protected _greatest_offset, any modifications to that object must
# be performed with this condition acquired
self._condition = Condition()
self._greatest_offset = -1 # Would fail if initially this is 0 as it will be first offset ever.

def greatest_offset(self) -> int:
return self._greatest_offset

def offset_seen(self, new_offset: int) -> None:
with self._condition:
self._greatest_offset = max(self._greatest_offset, new_offset)
self._condition.notify_all()

def wait_for_offset(self, expected_offset: int, timeout: float) -> bool:
"""Block until expected_offset is seen.
Args:
expected_offset: The message offset generated by the producer.
timeout: How long the caller will wait for the offset in seconds.
"""
with self._condition:
return self._condition.wait_for(lambda: expected_offset <= self._greatest_offset, timeout=timeout)
42 changes: 6 additions & 36 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
from karapace.in_memory_database import InMemoryDatabase
from karapace.key_format import is_key_in_canonical_format, KeyFormatter, KeyMode
from karapace.master_coordinator import MasterCoordinator
from karapace.offset_watcher import OffsetWatcher
from karapace.schema_models import SchemaType, TypedSchema
from karapace.statsd import StatsClient
from karapace.utils import json_decode, JSONDecodeError, KarapaceKafkaClient
from threading import Condition, Event, Thread
from threading import Event, Thread
from typing import Optional

import logging
Expand Down Expand Up @@ -102,42 +103,11 @@ def new_schema_topic_from_config(config: Config) -> NewTopic:
)


class OffsetsWatcher:
"""Synchronization container for threads to wait until an offset is seen.
This works under the assumption offsets are used only once, which should be
correct as long as no unclean leader election is performed.
"""

def __init__(self) -> None:
# Condition used to protected _greatest_offset, any modifications to that object must
# be performed with this condition acquired
self._condition = Condition()
self._greatest_offset = -1 # Would fail if initially this is 0 as it will be first offset ever.

def greatest_offset(self) -> int:
return self._greatest_offset

def offset_seen(self, new_offset: int) -> None:
with self._condition:
self._greatest_offset = max(self._greatest_offset, new_offset)
self._condition.notify_all()

def wait_for_offset(self, expected_offset: int, timeout: float) -> bool:
"""Block until expected_offset is seen.
Args:
expected_offset: The message offset generated by the producer.
timeout: How long the caller will wait for the offset in seconds.
"""
with self._condition:
return self._condition.wait_for(lambda: expected_offset <= self._greatest_offset, timeout=timeout)


class KafkaSchemaReader(Thread):
def __init__(
self,
config: Config,
offset_watcher: OffsetWatcher,
key_formatter: KeyFormatter,
database: InMemoryDatabase,
master_coordinator: Optional[MasterCoordinator] = None,
Expand All @@ -150,7 +120,7 @@ def __init__(
self.admin_client: Optional[KafkaAdminClient] = None
self.topic_replication_factor = self.config["replication_factor"]
self.consumer: Optional[KafkaConsumer] = None
self.offset_watcher = OffsetsWatcher()
self._offset_watcher = offset_watcher
self.stats = StatsClient(config=config)

# Thread synchronization objects
Expand Down Expand Up @@ -293,7 +263,7 @@ def _is_ready(self) -> bool:
return self.offset >= self._highest_offset

def highest_offset(self) -> int:
return max(self._highest_offset, self.offset_watcher.greatest_offset())
return max(self._highest_offset, self._offset_watcher.greatest_offset())

def handle_messages(self) -> None:
assert self.consumer is not None, "Thread must be started"
Expand Down Expand Up @@ -348,7 +318,7 @@ def handle_messages(self) -> None:
schema_records_processed_keymode_deprecated_karapace += 1

if self.ready and watch_offsets:
self.offset_watcher.offset_seen(self.offset)
self._offset_watcher.offset_seen(self.offset)

self._report_schema_metrics(
schema_records_processed_keymode_canonical,
Expand Down
Loading

0 comments on commit 00aaa28

Please sign in to comment.