Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic support of references Karapace API calls. #21

Open
wants to merge 28 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
efb456d
add basic support of references to POST/GET subjects
libretto Apr 19, 2022
843a1da
fixups
libretto Apr 19, 2022
44f853a
fixup
libretto Apr 19, 2022
3498962
fixup
libretto Apr 19, 2022
9e9b16d
fixup
libretto Apr 20, 2022
944cd39
fixup
libretto Apr 20, 2022
956b68e
fixup
libretto Apr 26, 2022
d677bd5
merge with master and fixup conflicts
libretto Apr 26, 2022
764facf
debugging
libretto May 1, 2022
c1c4c50
referencedby/delete workarounds
libretto May 11, 2022
732a9ed
referencedby/delete workarounds
libretto May 11, 2022
1f33b7d
Add basic support of references with basic tests
libretto May 16, 2022
ebe889f
Merge branch 'master' into deps
sujayinstaclustr May 23, 2022
93cd241
removed reference for ujson
sujayinstaclustr May 23, 2022
2e9a8c6
added comma at the end
sujayinstaclustr May 23, 2022
7f2cf01
Update schema_models.py
libretto Jun 19, 2022
0f2fab6
Update schema_models.py
libretto Jun 19, 2022
bbbb5a3
Update schema_models.py
libretto Jun 19, 2022
1d2b7c4
Update schema_reader.py
libretto Jun 19, 2022
4634373
Update karapace/schema_registry_apis.py
libretto Jun 19, 2022
efe29f4
update code by PR review
libretto Jun 19, 2022
0854296
add reference_key()
libretto Jun 21, 2022
8a1b129
Merge branch 'master' into deps
sujayinstaclustr Jun 23, 2022
46146dd
fixed undefined variable missing due to merge
sujayinstaclustr Jun 23, 2022
967c822
removed extra line feeds
sujayinstaclustr Jun 29, 2022
da4171a
merge with main branch
libretto Jul 18, 2022
70e8077
Update karapace/schema_registry_apis.py
libretto Jul 26, 2022
ef05bd4
improve PR1 code
libretto Jul 29, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ __pycache__/
/kafka_*/
venv
/karapace/version.py
.run
68 changes: 61 additions & 7 deletions karapace/schema_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
SchemaParseException as ProtobufSchemaParseException,
)
from karapace.protobuf.schema import ProtobufSchema
from karapace.typing import JsonData
from karapace.utils import json_encode
from typing import Any, Dict, Union
from typing import Any, Dict, Optional, Union

import json

Expand Down Expand Up @@ -67,6 +68,10 @@ class InvalidSchema(Exception):
pass


class InvalidReferences(Exception):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems that this is not raised.

pass


@unique
class SchemaType(str, Enum):
AVRO = "AVRO"
Expand All @@ -75,7 +80,7 @@ class SchemaType(str, Enum):


class TypedSchema:
def __init__(self, schema_type: SchemaType, schema_str: str):
def __init__(self, schema_type: SchemaType, schema_str: str, references: Optional["References"] = None):
"""Schema with type information

Args:
Expand All @@ -84,6 +89,7 @@ def __init__(self, schema_type: SchemaType, schema_str: str):
"""
self.schema_type = schema_type
self.schema_str = schema_str
self.references = references
libretto marked this conversation as resolved.
Show resolved Hide resolved

def to_dict(self) -> Dict[str, Any]:
if self.schema_type is SchemaType.PROTOBUF:
Expand All @@ -100,17 +106,42 @@ def __repr__(self) -> str:
return f"TypedSchema(type={self.schema_type}, schema={str(self)})"
return f"TypedSchema(type={self.schema_type}, schema={json_encode(self.to_dict())})"

def get_references(self) -> Optional["References"]:
return self.references

def __eq__(self, other: Any) -> bool:
return isinstance(other, TypedSchema) and self.__str__() == other.__str__() and self.schema_type is other.schema_type
a = isinstance(other, TypedSchema) and self.schema_type is other.schema_type and self.__str__() == other.__str__()

if not a:
return False
x = self.get_references()
y = other.get_references()
if x:
if y:
if x == y:
return True
else:
return False
else:
if y:
return False

return True
libretto marked this conversation as resolved.
Show resolved Hide resolved


class ValidatedTypedSchema(TypedSchema):
def __init__(self, schema_type: SchemaType, schema_str: str, schema: Union[Draft7Validator, AvroSchema, ProtobufSchema]):
super().__init__(schema_type=schema_type, schema_str=schema_str)
def __init__(
self,
schema_type: SchemaType,
schema_str: str,
schema: Union[Draft7Validator, AvroSchema, ProtobufSchema],
references: Optional["References"] = None,
):
super().__init__(schema_type=schema_type, schema_str=schema_str, references=references)
self.schema = schema

@staticmethod
def parse(schema_type: SchemaType, schema_str: str) -> "ValidatedTypedSchema":
def parse(schema_type: SchemaType, schema_str: str, references: Optional["References"] = None) -> "ValidatedTypedSchema":
if schema_type not in [SchemaType.AVRO, SchemaType.JSONSCHEMA, SchemaType.PROTOBUF]:
raise InvalidSchema(f"Unknown parser {schema_type} for {schema_str}")

Expand Down Expand Up @@ -146,9 +177,32 @@ def parse(schema_type: SchemaType, schema_str: str) -> "ValidatedTypedSchema":
else:
raise InvalidSchema(f"Unknown parser {schema_type} for {schema_str}")

return ValidatedTypedSchema(schema_type=schema_type, schema_str=schema_str, schema=parsed_schema)
return ValidatedTypedSchema(
schema_type=schema_type, schema_str=schema_str, schema=parsed_schema, references=references
)

def __str__(self) -> str:
if self.schema_type == SchemaType.PROTOBUF:
return str(self.schema)
return super().__str__()


class References:
def __init__(self, schema_type: SchemaType, references: JsonData):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would propose to type the references to specific type instead of JsonData.

Copy link
Collaborator Author

@libretto libretto Jul 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jjaakola-aiven Which reason to add some more types there when the specific type is "References" and data for this type is stored in Kafka in JSON?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably just an error from me on review, not sure after vacation. Possibly I've been thinking on modeling the data directly in the controller layer from JSON to objects.

"""Schema with type information

Args:
schema_type (SchemaType): The type of the schema
references (str): The original schema string
libretto marked this conversation as resolved.
Show resolved Hide resolved
"""
self.schema_type = schema_type
self.references = references

def val(self) -> JsonData:
return self.references

def json(self) -> str:
return str(json_encode(self.references, sort_keys=True))

def __eq__(self, other: Any) -> bool:
return self.json() == other.json()
19 changes: 18 additions & 1 deletion karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from karapace.statsd import StatsClient
from karapace.utils import KarapaceKafkaClient
from threading import Event, Lock, Thread
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional

import json
import logging
Expand All @@ -26,6 +26,7 @@
Schema = Dict[str, Any]
# Container type for a subject, with configuration settings and all the schemas
SubjectData = Dict[str, Any]
Referents = List

# The value `0` is a valid offset and it represents the first message produced
# to a topic, therefore it can not be used.
Expand Down Expand Up @@ -139,6 +140,7 @@ def __init__(
self.config = config
self.subjects: Dict[Subject, SubjectData] = {}
self.schemas: Dict[int, TypedSchema] = {}
self.referenced_by: Dict[str, Referents] = {}
self.global_schema_id = 0
self.admin_client: Optional[KafkaAdminClient] = None
self.topic_replication_factor = self.config["replication_factor"]
Expand Down Expand Up @@ -291,6 +293,7 @@ def _handle_msg_config(self, key: dict, value: Optional[dict]) -> None:
else:
LOG.info("Setting subject: %r config to: %r, value: %r", subject, value["compatibilityLevel"], value)
self.subjects[subject]["compatibility"] = value["compatibilityLevel"]

libretto marked this conversation as resolved.
Show resolved Hide resolved
elif value is not None:
LOG.info("Setting global config to: %r, value: %r", value["compatibilityLevel"], value)
self.config["compatibility"] = value["compatibilityLevel"]
Expand Down Expand Up @@ -334,6 +337,7 @@ def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None:
schema_version = value["version"]
schema_deleted = value.get("deleted", False)

schema_references = value.get("references", None)
try:
schema_type_parsed = SchemaType(schema_type)
except ValueError:
Expand Down Expand Up @@ -366,20 +370,33 @@ def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None:
typed_schema = TypedSchema(
schema_type=schema_type_parsed,
schema_str=schema_str,
references=schema_references,
)
schema = {
"schema": typed_schema,
"version": schema_version,
"id": schema_id,
"deleted": schema_deleted,
}
if schema_references:
schema["references"] = schema_references

if schema_version in subjects_schemas:
LOG.info("Updating entry subject: %r version: %r id: %r", schema_subject, schema_version, schema_id)
else:
LOG.info("Adding entry subject: %r version: %r id: %r", schema_subject, schema_version, schema_id)

subjects_schemas[schema_version] = schema
if schema_references:
for ref in schema_references:
ref_str = str(ref["subject"]) + "_" + str(ref["version"])
libretto marked this conversation as resolved.
Show resolved Hide resolved
referents = self.referenced_by.get(ref_str, None)
if referents:
LOG.info("Adding entry subject referenced_by : %r", ref_str)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debug log.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Must we remove it?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If deemed valuable for debugging no need to remove, but change it to LOG.debug.

referents.append(schema_id)
else:
LOG.info("Adding entry subject referenced_by : %r", ref_str)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debug log.

self.referenced_by[ref_str] = [schema_id]

subjects_schemas[schema_version] = {
"schema": typed_schema,
Expand Down
Loading