-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Basic support of references Karapace API calls. #21
base: main
Are you sure you want to change the base?
Changes from 26 commits
efb456d
843a1da
44f853a
3498962
9e9b16d
944cd39
956b68e
d677bd5
764facf
c1c4c50
732a9ed
1f33b7d
ebe889f
93cd241
2e9a8c6
7f2cf01
0f2fab6
bbbb5a3
1d2b7c4
4634373
efe29f4
0854296
8a1b129
46146dd
967c822
da4171a
70e8077
ef05bd4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,3 +16,4 @@ __pycache__/ | |
/kafka_*/ | ||
venv | ||
/karapace/version.py | ||
.run |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,8 +12,9 @@ | |
SchemaParseException as ProtobufSchemaParseException, | ||
) | ||
from karapace.protobuf.schema import ProtobufSchema | ||
from karapace.typing import JsonData | ||
from karapace.utils import json_encode | ||
from typing import Any, Dict, Union | ||
from typing import Any, Dict, Optional, Union | ||
|
||
import json | ||
|
||
|
@@ -67,6 +68,10 @@ class InvalidSchema(Exception): | |
pass | ||
|
||
|
||
class InvalidReferences(Exception): | ||
pass | ||
|
||
|
||
@unique | ||
class SchemaType(str, Enum): | ||
AVRO = "AVRO" | ||
|
@@ -75,15 +80,17 @@ class SchemaType(str, Enum): | |
|
||
|
||
class TypedSchema: | ||
def __init__(self, schema_type: SchemaType, schema_str: str): | ||
def __init__(self, schema_type: SchemaType, schema_str: str, references: Optional["References"] = None): | ||
"""Schema with type information | ||
|
||
Args: | ||
schema_type (SchemaType): The type of the schema | ||
schema_str (str): The original schema string | ||
references(References): The references of schema | ||
""" | ||
self.schema_type = schema_type | ||
self.schema_str = schema_str | ||
self.references = references | ||
libretto marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
def to_dict(self) -> Dict[str, Any]: | ||
if self.schema_type is SchemaType.PROTOBUF: | ||
|
@@ -100,17 +107,33 @@ def __repr__(self) -> str: | |
return f"TypedSchema(type={self.schema_type}, schema={str(self)})" | ||
return f"TypedSchema(type={self.schema_type}, schema={json_encode(self.to_dict())})" | ||
|
||
def get_references(self) -> Optional["References"]: | ||
return self.references | ||
|
||
def __eq__(self, other: Any) -> bool: | ||
return isinstance(other, TypedSchema) and self.__str__() == other.__str__() and self.schema_type is other.schema_type | ||
schema_is_equal = ( | ||
isinstance(other, TypedSchema) and self.schema_type is other.schema_type and self.__str__() == other.__str__() | ||
) | ||
if not schema_is_equal: | ||
return False | ||
if self.references is not None: | ||
return self.references == other.references | ||
return other.references is None | ||
|
||
|
||
class ValidatedTypedSchema(TypedSchema): | ||
def __init__(self, schema_type: SchemaType, schema_str: str, schema: Union[Draft7Validator, AvroSchema, ProtobufSchema]): | ||
super().__init__(schema_type=schema_type, schema_str=schema_str) | ||
def __init__( | ||
self, | ||
schema_type: SchemaType, | ||
schema_str: str, | ||
schema: Union[Draft7Validator, AvroSchema, ProtobufSchema], | ||
references: Optional["References"] = None, | ||
): | ||
super().__init__(schema_type=schema_type, schema_str=schema_str, references=references) | ||
self.schema = schema | ||
|
||
@staticmethod | ||
def parse(schema_type: SchemaType, schema_str: str) -> "ValidatedTypedSchema": | ||
def parse(schema_type: SchemaType, schema_str: str, references: Optional["References"] = None) -> "ValidatedTypedSchema": | ||
if schema_type not in [SchemaType.AVRO, SchemaType.JSONSCHEMA, SchemaType.PROTOBUF]: | ||
raise InvalidSchema(f"Unknown parser {schema_type} for {schema_str}") | ||
|
||
|
@@ -146,9 +169,34 @@ def parse(schema_type: SchemaType, schema_str: str) -> "ValidatedTypedSchema": | |
else: | ||
raise InvalidSchema(f"Unknown parser {schema_type} for {schema_str}") | ||
|
||
return ValidatedTypedSchema(schema_type=schema_type, schema_str=schema_str, schema=parsed_schema) | ||
return ValidatedTypedSchema( | ||
schema_type=schema_type, schema_str=schema_str, schema=parsed_schema, references=references | ||
) | ||
|
||
def __str__(self) -> str: | ||
if self.schema_type == SchemaType.PROTOBUF: | ||
return str(self.schema) | ||
return super().__str__() | ||
|
||
|
||
class References: | ||
def __init__(self, schema_type: SchemaType, references: JsonData): | ||
Review comment: I would propose typing the references as a specific type instead of [inline code lost in extraction; presumably `JsonData`].
Reply: @jjaakola-aiven What is the reason to add more types here, when the specific type is "References" and the data for this type is stored in Kafka as JSON?
Reply: Probably just an error on my part during review; I am not sure after the vacation. Possibly I was thinking of modeling the data directly in the controller layer, from JSON to objects. |
||
"""Schema with type information | ||
|
||
Args: | ||
schema_type (SchemaType): The type of the schema | ||
references (str): The references of schema in Kafka/Json representation | ||
""" | ||
self.schema_type = schema_type | ||
self.references = references | ||
|
||
def val(self) -> JsonData: | ||
return self.references | ||
|
||
def json(self) -> str: | ||
return str(json_encode(self.references, sort_keys=True)) | ||
|
||
def __eq__(self, other: Any) -> bool: | ||
if other is None or not isinstance(other, References): | ||
return False | ||
return self.json() == other.json() |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,9 +13,9 @@ | |
from karapace.master_coordinator import MasterCoordinator | ||
from karapace.schema_models import SchemaType, TypedSchema | ||
from karapace.statsd import StatsClient | ||
from karapace.utils import KarapaceKafkaClient | ||
from karapace.utils import KarapaceKafkaClient, reference_key | ||
from threading import Event, Lock, Thread | ||
from typing import Any, Dict, Optional | ||
from typing import Any, Dict, List, Optional | ||
|
||
import json | ||
import logging | ||
|
@@ -26,6 +26,7 @@ | |
Schema = Dict[str, Any] | ||
# Container type for a subject, with configuration settings and all the schemas | ||
SubjectData = Dict[str, Any] | ||
Referents = List | ||
SchemaId = int | ||
|
||
# The value `0` is a valid offset and it represents the first message produced | ||
|
@@ -140,6 +141,7 @@ def __init__( | |
self.config = config | ||
self.subjects: Dict[Subject, SubjectData] = {} | ||
self.schemas: Dict[int, TypedSchema] = {} | ||
self.referenced_by: Dict[str, Referents] = {} | ||
self.global_schema_id = 0 | ||
self.admin_client: Optional[KafkaAdminClient] = None | ||
self.topic_replication_factor = self.config["replication_factor"] | ||
|
@@ -295,7 +297,6 @@ def _handle_msg_config(self, key: dict, value: Optional[dict]) -> None: | |
if subject not in self.subjects: | ||
LOG.info("Adding first version of subject: %r with no schemas", subject) | ||
self.subjects[subject] = {"schemas": {}} | ||
|
||
if not value: | ||
LOG.info("Deleting compatibility config completely for subject: %r", subject) | ||
self.subjects[subject].pop("compatibility", None) | ||
|
@@ -345,6 +346,7 @@ def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None: | |
schema_id = value["id"] | ||
schema_version = value["version"] | ||
schema_deleted = value.get("deleted", False) | ||
schema_references = value.get("references", None) | ||
|
||
try: | ||
schema_type_parsed = SchemaType(schema_type) | ||
|
@@ -375,21 +377,36 @@ def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None: | |
# dedup schemas to reduce memory pressure | ||
schema_str = self._hash_to_schema.setdefault(hash(schema_str), schema_str) | ||
|
||
if schema_version in subjects_schemas: | ||
LOG.info("Updating entry subject: %r version: %r id: %r", schema_subject, schema_version, schema_id) | ||
else: | ||
LOG.info("Adding entry subject: %r version: %r id: %r", schema_subject, schema_version, schema_id) | ||
|
||
typed_schema = TypedSchema( | ||
schema_type=schema_type_parsed, | ||
schema_str=schema_str, | ||
references=schema_references, | ||
) | ||
subjects_schemas[schema_version] = { | ||
schema = { | ||
"schema": typed_schema, | ||
"version": schema_version, | ||
"id": schema_id, | ||
"deleted": schema_deleted, | ||
} | ||
if schema_references: | ||
schema["references"] = schema_references | ||
|
||
if schema_version in subjects_schemas: | ||
LOG.info("Updating entry subject: %r version: %r id: %r", schema_subject, schema_version, schema_id) | ||
else: | ||
LOG.info("Adding entry subject: %r version: %r id: %r", schema_subject, schema_version, schema_id) | ||
|
||
subjects_schemas[schema_version] = schema | ||
if schema_references: | ||
for ref in schema_references: | ||
ref_str = reference_key(ref["subject"], ref["version"]) | ||
referents = self.referenced_by.get(ref_str, None) | ||
if referents: | ||
LOG.info("Adding entry subject referenced_by : %r", ref_str) | ||
Review comment: Debug log.
Reply: Must we remove it?
Reply: If it is deemed valuable for debugging there is no need to remove it, but change it to [inline code lost in extraction; presumably `LOG.debug`]. |
||
referents.append(schema_id) | ||
else: | ||
LOG.info("Adding entry subject referenced_by : %r", ref_str) | ||
Review comment: Debug log. |
||
self.referenced_by[ref_str] = [schema_id] | ||
|
||
self.schemas[schema_id] = typed_schema | ||
self.global_schema_id = max(self.global_schema_id, schema_id) | ||
|
Review comment: It seems that this is never raised.