Commit 800f3cb

Merge pull request Aiven-Open#936 from Aiven-Open/nosahama/abort-service-on-corrupt-schema

schema-reader: Shutdown service if corrupt entries in `_schemas` topic
jjaakola-aiven authored Sep 5, 2024
2 parents dff48ce + 2d9f1e8 commit 800f3cb
Showing 11 changed files with 237 additions and 42 deletions.
7 changes: 7 additions & 0 deletions README.rst
@@ -468,6 +468,13 @@ Keys to take special care are the ones needed to configure Kafka and advertised_
* - ``master_election_strategy``
- ``lowest``
- Decides on what basis the Karapace cluster master is chosen (only relevant in a multi node setup)
* - ``kafka_schema_reader_strict_mode``
- ``false``
- If enabled, causes the Karapace schema-registry service to shut down when there are invalid schema records in the ``_schemas`` topic
* - ``kafka_retriable_errors_silenced``
- ``true``
- If enabled, Kafka errors which can be retried, or custom errors specified for the service, will not be raised;
  instead, a warning log is emitted. This will denoise issue-tracking systems, e.g. Sentry


Authentication and authorization of Karapace Schema Registry REST API
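To illustrate the two new options together, here is a minimal sketch that builds a strict-mode configuration with Karapace's own `set_config_defaults` helper from `karapace/config.py` (the broker address is illustrative, not from this commit):

    from karapace.config import set_config_defaults

    # Hypothetical strict deployment: corrupt `_schemas` records shut the
    # service down, and retriable Kafka errors are raised instead of only
    # being logged as warnings.
    config = set_config_defaults(
        {
            "bootstrap_uri": "kafka:9092",
            "kafka_schema_reader_strict_mode": True,
            "kafka_retriable_errors_silenced": False,
        }
    )
    assert config["kafka_schema_reader_strict_mode"] is True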
4 changes: 4 additions & 0 deletions container/compose.yml
@@ -80,6 +80,8 @@ services:
KARAPACE_COMPATIBILITY: FULL
KARAPACE_STATSD_HOST: statsd-exporter
KARAPACE_STATSD_PORT: 8125
KARAPACE_KAFKA_SCHEMA_READER_STRICT_MODE: false
KARAPACE_KAFKA_RETRIABLE_ERRORS_SILENCED: true

karapace-rest:
image: ghcr.io/aiven-open/karapace:develop
@@ -106,6 +108,8 @@
KARAPACE_LOG_LEVEL: WARNING
KARAPACE_STATSD_HOST: statsd-exporter
KARAPACE_STATSD_PORT: 8125
KARAPACE_KAFKA_SCHEMA_READER_STRICT_MODE: false
KARAPACE_KAFKA_RETRIABLE_ERRORS_SILENCED: true

prometheus:
image: prom/prometheus
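These KARAPACE_* variables are mapped by the container startup onto the configuration keys documented above. A rough Python sketch of that naming convention; the helper below is hypothetical and only illustrates the prefix-stripping, not Karapace's actual entrypoint code:

    import os

    def env_to_config(environ=os.environ):  # hypothetical helper, for illustration only
        """Strip the KARAPACE_ prefix and lower-case the rest to get a config key."""
        prefix = "KARAPACE_"
        return {key[len(prefix):].lower(): value for key, value in environ.items() if key.startswith(prefix)}

    os.environ["KARAPACE_KAFKA_SCHEMA_READER_STRICT_MODE"] = "false"
    print(env_to_config()["kafka_schema_reader_strict_mode"])  # -> "false"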
21 changes: 11 additions & 10 deletions container/prometheus/rules.yml
@@ -3,18 +3,19 @@ groups:
     rules:
       - record: karapace_exceptions_sum_by_exception
         expr: sum by (exception) (exception)
-      - alert: HighHTTPRequests
-        expr: karapace_http_requests_total > 10
-        for: 5m
+      - alert: user_alert_karapace_high_non_success_http_requests
+        expr: sum by (instance) (count_over_time(karapace_http_requests_total{status!~'^2.*'}[1m])) > 5
+        for: 2m
         labels:
           severity: warning
         annotations:
-          summary: High HTTP requests for (instance={{ $labels.instance }})
-          description: "Service received\n HTTP Requests = {{ $value }}\n"
-      - alert: FireImmidiately
-        expr: karapace_schema_reader_schemas > 1
+          summary: High failing HTTP requests for (instance={{ $labels.instance }})
+          description: "Service returned too many non-success HTTP responses = {{ $value }}\n"
+      - alert: user_alert_karapace_frequent_restart
+        expr: sum by (app)(count_over_time(karapace_shutdown_count[30s])) > 1
+        for: 2m
         labels:
-          severity: page
+          severity: critical
         annotations:
-          summary: Lots of schems on (instance={{ $labels.instance }})
-          description: "\n Schema count = {{ $value }}\n"
+          summary: Karapace service instance={{ $labels.instance }} restarting frequently.
+          description: "Service is experiencing frequent restarts count={{ $value }}\n"
4 changes: 4 additions & 0 deletions karapace/config.py
@@ -80,6 +80,8 @@ class Config(TypedDict):
protobuf_runtime_directory: str
statsd_host: str
statsd_port: int
kafka_schema_reader_strict_mode: bool
kafka_retriable_errors_silenced: bool

sentry: NotRequired[Mapping[str, object]]
tags: NotRequired[Mapping[str, object]]
@@ -154,6 +156,8 @@ class ConfigDefaults(Config, total=False):
"protobuf_runtime_directory": "runtime",
"statsd_host": "127.0.0.1",
"statsd_port": 8125,
"kafka_schema_reader_strict_mode": False,
"kafka_retriable_errors_silenced": True,
}
SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD]

2 changes: 1 addition & 1 deletion karapace/coordinator/master_coordinator.py
@@ -50,7 +50,7 @@ async def start(self) -> None:
await self._kafka_client.bootstrap()
break
except KafkaConnectionError:
LOG.exception("Kafka client bootstrap failed.")
LOG.warning("Kafka client bootstrap failed.")
await asyncio.sleep(0.5)

while not self._kafka_client.cluster.brokers():
11 changes: 11 additions & 0 deletions karapace/errors.py
@@ -73,3 +73,14 @@ class SubjectSoftDeletedException(Exception):

class SchemaTooLargeException(Exception):
    pass


class ShutdownException(Exception):
    """Raised when the service has encountered an error where it should not continue and should shut down."""


class CorruptKafkaRecordException(ShutdownException):
    """
    Raised when a corrupt schema is present in the `_schemas` topic. This should halt the service, as
    continuing would leave it in a corrupt state and could lead to various runtime issues and data mismatches.
    """
47 changes: 47 additions & 0 deletions karapace/kafka_error_handler.py
@@ -0,0 +1,47 @@
"""
Copyright (c) 2024 Aiven Ltd
See LICENSE for details
"""

from karapace.config import Config
from karapace.errors import CorruptKafkaRecordException
from karapace.typing import StrEnum

import aiokafka.errors as Errors
import enum
import logging

LOG = logging.getLogger(__name__)


class KafkaErrorLocation(StrEnum):
    SCHEMA_COORDINATOR = "SCHEMA_COORDINATOR"
    SCHEMA_READER = "SCHEMA_READER"


class KafkaRetriableErrors(enum.Enum):
    SCHEMA_COORDINATOR = (Errors.NodeNotReadyError,)


class KafkaErrorHandler:
    def __init__(self, config: Config) -> None:
        self.schema_reader_strict_mode: bool = config["kafka_schema_reader_strict_mode"]
        self.retriable_errors_silenced: bool = config["kafka_retriable_errors_silenced"]

    def log(self, location: KafkaErrorLocation, error: BaseException) -> None:
        LOG.warning("%s encountered error - %s", location, error)

    def handle_schema_coordinator_error(self, error: BaseException) -> None:
        if getattr(error, "retriable", False) or (
            error in KafkaRetriableErrors[KafkaErrorLocation.SCHEMA_COORDINATOR].value
        ):
            self.log(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=error)
        if not self.retriable_errors_silenced:
            raise error

    def handle_schema_reader_error(self, error: BaseException) -> None:
        if self.schema_reader_strict_mode:
            raise CorruptKafkaRecordException from error

    def handle_error(self, location: KafkaErrorLocation, error: BaseException) -> None:
        return getattr(self, f"handle_{location.lower()}_error")(error=error)
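A small test-style sketch of how the handler above behaves (pytest usage assumed, as in Karapace's own test suite): in strict mode a schema-reader error escalates to `CorruptKafkaRecordException`, while a retriable coordinator error is only logged when silencing is on (the default):

    import aiokafka.errors as Errors
    import pytest

    from karapace.config import set_config_defaults
    from karapace.errors import CorruptKafkaRecordException
    from karapace.kafka_error_handler import KafkaErrorHandler, KafkaErrorLocation

    handler = KafkaErrorHandler(config=set_config_defaults({"kafka_schema_reader_strict_mode": True}))

    # Reader path: strict mode turns any passed-in error into a shutdown request.
    with pytest.raises(CorruptKafkaRecordException):
        handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=ValueError("corrupt record"))

    # Coordinator path: NodeNotReadyError is marked retriable by aiokafka, so with
    # kafka_retriable_errors_silenced left at its default (True) it is only logged.
    handler.handle_error(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=Errors.NodeNotReadyError("node 1"))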
77 changes: 47 additions & 30 deletions karapace/schema_reader.py
@@ -28,19 +28,21 @@
from karapace.config import Config
from karapace.coordinator.master_coordinator import MasterCoordinator
from karapace.dependency import Dependency
-from karapace.errors import InvalidReferences, InvalidSchema
+from karapace.errors import InvalidReferences, InvalidSchema, ShutdownException
from karapace.in_memory_database import InMemoryDatabase
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka.common import translate_from_kafkaerror
from karapace.kafka.consumer import KafkaConsumer
+from karapace.kafka_error_handler import KafkaErrorHandler, KafkaErrorLocation
from karapace.key_format import is_key_in_canonical_format, KeyFormatter, KeyMode
from karapace.offset_watcher import OffsetWatcher
+from karapace.protobuf.exception import ProtobufException
from karapace.protobuf.schema import ProtobufSchema
from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents
from karapace.statsd import StatsClient
from karapace.typing import JsonObject, SchemaId, Subject, Version
-from karapace.utils import json_decode, JSONDecodeError
+from karapace.utils import json_decode, JSONDecodeError, shutdown
from threading import Event, Thread
from typing import Final, Mapping, Sequence

@@ -140,6 +142,7 @@ def __init__(
self.consumer: KafkaConsumer | None = None
self._offset_watcher = offset_watcher
self.stats = StatsClient(config=config)
+self.kafka_error_handler: KafkaErrorHandler = KafkaErrorHandler(config=config)

# Thread synchronization objects
# - offset is used by the REST API to wait until this thread has
@@ -184,12 +187,13 @@ def run(self) -> None:
LOG.warning("[Admin Client] No Brokers available yet. Retrying")
self._stop_schema_reader.wait(timeout=KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS)
except KafkaConfigurationError:
LOG.info("[Admin Client] Invalid configuration. Bailing")
LOG.warning("[Admin Client] Invalid configuration. Bailing")
self._stop_schema_reader.set()
raise
except Exception as e: # pylint: disable=broad-except
LOG.exception("[Admin Client] Unexpected exception. Retrying")
self.stats.unexpected_exception(ex=e, where="admin_client_instantiation")
-self._stop_schema_reader.wait(timeout=2.0)
+self._stop_schema_reader.wait(timeout=KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS)

assert self.admin_client is not None

@@ -199,9 +203,10 @@ def run(self) -> None:
stack.enter_context(closing(self.consumer))
except (NodeNotReadyError, NoBrokersAvailable, AssertionError):
LOG.warning("[Consumer] No Brokers available yet. Retrying")
-self._stop_schema_reader.wait(timeout=2.0)
+self._stop_schema_reader.wait(timeout=KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS)
except KafkaConfigurationError:
LOG.info("[Consumer] Invalid configuration. Bailing")
LOG.warning("[Consumer] Invalid configuration. Bailing")
self._stop_schema_reader.set()
raise
except Exception as e: # pylint: disable=broad-except
LOG.exception("[Consumer] Unexpected exception. Retrying")
@@ -242,9 +247,12 @@ def run(self) -> None:
self.offset = self._get_beginning_offset()
try:
self.handle_messages()
+except ShutdownException:
+self._stop_schema_reader.set()
+shutdown()
except Exception as e: # pylint: disable=broad-except
self.stats.unexpected_exception(ex=e, where="schema_reader_loop")
LOG.exception("Unexpected exception in schema reader loop")
LOG.warning("Unexpected exception in schema reader loop - %s", e)

def _get_beginning_offset(self) -> int:
assert self.consumer is not None, "Thread must be started"
@@ -352,18 +360,20 @@ def handle_messages(self) -> None:

assert message_key is not None
key = json_decode(message_key)
-except JSONDecodeError:
-# Invalid entry shall also move the offset so Karapace makes progress towards ready state.
-self.offset = msg.offset()
+except JSONDecodeError as exc:
LOG.warning("Invalid JSON in msg.key() at offset %s", msg.offset())
-continue
+self.offset = msg.offset()  # Invalid entry shall also move the offset so Karapace makes progress.
+self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc)
+continue  # [non-strict mode]
except (GroupAuthorizationFailedError, TopicAuthorizationFailedError) as exc:
LOG.error(
"Kafka authorization error when consuming from %s: %s %s",
self.config["topic_name"],
exc,
msg.error(),
)
+if self.kafka_error_handler.schema_reader_strict_mode:
+raise ShutdownException from exc
continue

assert isinstance(key, dict)
@@ -381,14 +391,19 @@ def handle_messages(self) -> None:
if message_value:
try:
value = self._parse_message_value(message_value)
-except JSONDecodeError:
-# Invalid entry shall also move the offset so Karapace makes progress towards ready state.
-self.offset = msg.offset()
+except (JSONDecodeError, TypeError) as exc:
LOG.warning("Invalid JSON in msg.value() at offset %s", msg.offset())
-continue
+self.offset = msg.offset()  # Invalid entry shall also move the offset so Karapace makes progress.
+self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc)
+continue  # [non-strict mode]

-self.handle_msg(key, value)
-self.offset = msg.offset()
+try:
+self.handle_msg(key, value)
+except (InvalidSchema, TypeError) as exc:
+self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc)
+continue
+finally:
+self.offset = msg.offset()

if msg_keymode == KeyMode.CANONICAL:
schema_records_processed_keymode_canonical += 1
@@ -512,9 +527,9 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:

try:
schema_type_parsed = SchemaType(schema_type)
-except ValueError:
+except ValueError as exc:
LOG.warning("Invalid schema type: %s", schema_type)
-return
+raise InvalidSchema from exc

# This does two jobs:
# - Validates the schema's JSON
@@ -528,9 +543,9 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
if schema_type_parsed in [SchemaType.AVRO, SchemaType.JSONSCHEMA]:
try:
schema_str = json.dumps(json.loads(schema_str), sort_keys=True)
-except json.JSONDecodeError:
+except json.JSONDecodeError as exc:
LOG.warning("Schema is not valid JSON")
-return
+raise InvalidSchema from exc
elif schema_type_parsed == SchemaType.PROTOBUF:
try:
if schema_references:
@@ -544,12 +559,12 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
normalize=False,
)
schema_str = str(parsed_schema)
-except InvalidSchema:
-LOG.exception("Schema is not valid ProtoBuf definition")
-return
-except InvalidReferences:
-LOG.exception("Invalid Protobuf references")
-return
+except (InvalidSchema, ProtobufException) as exc:
+LOG.warning("Schema is not valid ProtoBuf definition")
+raise InvalidSchema from exc
+except InvalidReferences as exc:
+LOG.warning("Invalid Protobuf references")
+raise InvalidSchema from exc

try:
typed_schema = TypedSchema(
@@ -559,8 +574,8 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
dependencies=resolved_dependencies,
schema=parsed_schema,
)
-except (InvalidSchema, JSONDecodeError):
-return
+except (InvalidSchema, JSONDecodeError) as exc:
+raise InvalidSchema from exc

self.database.insert_schema_version(
subject=schema_subject,
Expand Down Expand Up @@ -588,13 +603,15 @@ def handle_msg(self, key: dict, value: dict | None) -> None:
self._handle_msg_delete_subject(key, value)
elif message_type == MessageType.no_operation:
pass
-except (KeyError, ValueError):
+except (KeyError, ValueError) as exc:
LOG.warning("The message %r-%r has been discarded because the %s is not managed", key, value, key["keytype"])
+raise InvalidSchema("Unrecognized `keytype` within schema") from exc

else:
LOG.warning(
"The message %s-%s has been discarded because doesn't contain the `keytype` key in the key", key, value
)
raise InvalidSchema("Message key doesn't contain the `keytype` attribute")

def remove_referenced_by(
self,
9 changes: 9 additions & 0 deletions karapace/utils.py
@@ -20,6 +20,7 @@

import importlib
import logging
+import signal
import time

if importlib.util.find_spec("ujson"):
@@ -256,3 +257,11 @@ def remove_prefix(string: str, prefix: str) -> str:
i += 1

return string[i:]


def shutdown():
    """
    Send a SIGTERM to the currently running application process, which should initiate shutdown logic.
    """
    LOG.warning("=======> Sending shutdown signal `SIGTERM` to Application process <=======")
    signal.raise_signal(signal.SIGTERM)
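A sketch of what `shutdown()` triggers: the raised SIGTERM invokes whatever handler the hosting process registered (the handler below is illustrative, not part of Karapace):

    import signal

    from karapace.utils import shutdown

    stopping = False

    def _on_sigterm(signum, frame):  # illustrative handler
        global stopping
        stopping = True

    signal.signal(signal.SIGTERM, _on_sigterm)
    shutdown()  # raises SIGTERM inside this same process
    assert stopping is True  # handler ran synchronously in the main thread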