diff --git a/.gitignore b/.gitignore index 67b386bbf..4a162fa59 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,4 @@ __pycache__/ /kafka_*/ venv /karapace/version.py +.run diff --git a/karapace/schema_models.py b/karapace/schema_models.py index b31f08e68..327229ec0 100644 --- a/karapace/schema_models.py +++ b/karapace/schema_models.py @@ -12,8 +12,9 @@ SchemaParseException as ProtobufSchemaParseException, ) from karapace.protobuf.schema import ProtobufSchema +from karapace.typing import JsonData from karapace.utils import json_encode -from typing import Any, Dict, Union +from typing import Any, Dict, Optional, Union import json @@ -67,6 +68,10 @@ class InvalidSchema(Exception): pass +class InvalidReferences(Exception): + pass + + @unique class SchemaType(str, Enum): AVRO = "AVRO" @@ -75,15 +80,17 @@ class SchemaType(str, Enum): class TypedSchema: - def __init__(self, schema_type: SchemaType, schema_str: str): + def __init__(self, schema_type: SchemaType, schema_str: str, references: Optional["References"] = None): """Schema with type information Args: schema_type (SchemaType): The type of the schema schema_str (str): The original schema string + references(References): The references of schema """ self.schema_type = schema_type self.schema_str = schema_str + self.references = references def to_dict(self) -> Dict[str, Any]: if self.schema_type is SchemaType.PROTOBUF: @@ -100,17 +107,33 @@ def __repr__(self) -> str: return f"TypedSchema(type={self.schema_type}, schema={str(self)})" return f"TypedSchema(type={self.schema_type}, schema={json_encode(self.to_dict())})" + def get_references(self) -> Optional["References"]: + return self.references + def __eq__(self, other: Any) -> bool: - return isinstance(other, TypedSchema) and self.__str__() == other.__str__() and self.schema_type is other.schema_type + schema_is_equal = ( + isinstance(other, TypedSchema) and self.schema_type is other.schema_type and self.__str__() == other.__str__() + ) + if not schema_is_equal: + return False + if self.references is not None: + return self.references == other.references + return other.references is None class ValidatedTypedSchema(TypedSchema): - def __init__(self, schema_type: SchemaType, schema_str: str, schema: Union[Draft7Validator, AvroSchema, ProtobufSchema]): - super().__init__(schema_type=schema_type, schema_str=schema_str) + def __init__( + self, + schema_type: SchemaType, + schema_str: str, + schema: Union[Draft7Validator, AvroSchema, ProtobufSchema], + references: Optional["References"] = None, + ): + super().__init__(schema_type=schema_type, schema_str=schema_str, references=references) self.schema = schema @staticmethod - def parse(schema_type: SchemaType, schema_str: str) -> "ValidatedTypedSchema": + def parse(schema_type: SchemaType, schema_str: str, references: Optional["References"] = None) -> "ValidatedTypedSchema": if schema_type not in [SchemaType.AVRO, SchemaType.JSONSCHEMA, SchemaType.PROTOBUF]: raise InvalidSchema(f"Unknown parser {schema_type} for {schema_str}") @@ -146,9 +169,43 @@ def parse(schema_type: SchemaType, schema_str: str) -> "ValidatedTypedSchema": else: raise InvalidSchema(f"Unknown parser {schema_type} for {schema_str}") - return ValidatedTypedSchema(schema_type=schema_type, schema_str=schema_str, schema=parsed_schema) + return ValidatedTypedSchema( + schema_type=schema_type, schema_str=schema_str, schema=parsed_schema, references=references + ) def __str__(self) -> str: if self.schema_type == SchemaType.PROTOBUF: return str(self.schema) return super().__str__() + + +class References: + def __init__(self, schema_type: SchemaType, references: JsonData): + """Schema with type information + + Args: + schema_type (SchemaType): The type of the schema + references (str): The references of schema in Kafka/Json representation + """ + self.schema_type = schema_type + if self.schema_type != "PROTOBUF" or not isinstance(references, list): + raise InvalidReferences + + for ref in references: + if not isinstance(ref, dict): + raise InvalidReferences + if ref.get("name") is None or ref.get("subject") is None or ref.get("version") is None: + raise InvalidReferences + + self.references = references + + def val(self) -> JsonData: + return self.references + + def json(self) -> str: + return str(json_encode(self.references, sort_keys=True)) + + def __eq__(self, other: Any) -> bool: + if other is None or not isinstance(other, References): + return False + return self.json() == other.json() diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 6ac96532b..efc81f086 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -13,9 +13,9 @@ from karapace.master_coordinator import MasterCoordinator from karapace.schema_models import SchemaType, TypedSchema from karapace.statsd import StatsClient -from karapace.utils import KarapaceKafkaClient +from karapace.utils import KarapaceKafkaClient, reference_key from threading import Event, Lock, Thread -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional import json import logging @@ -26,6 +26,7 @@ Schema = Dict[str, Any] # Container type for a subject, with configuration settings and all the schemas SubjectData = Dict[str, Any] +Referents = List SchemaId = int # The value `0` is a valid offset and it represents the first message produced @@ -140,6 +141,7 @@ def __init__( self.config = config self.subjects: Dict[Subject, SubjectData] = {} self.schemas: Dict[int, TypedSchema] = {} + self.referenced_by: Dict[str, Referents] = {} self.global_schema_id = 0 self.admin_client: Optional[KafkaAdminClient] = None self.topic_replication_factor = self.config["replication_factor"] @@ -295,7 +297,6 @@ def _handle_msg_config(self, key: dict, value: Optional[dict]) -> None: if subject not in self.subjects: LOG.info("Adding first version of subject: %r with no schemas", subject) self.subjects[subject] = {"schemas": {}} - if not value: LOG.info("Deleting compatibility config completely for subject: %r", subject) self.subjects[subject].pop("compatibility", None) @@ -345,6 +346,7 @@ def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None: schema_id = value["id"] schema_version = value["version"] schema_deleted = value.get("deleted", False) + schema_references = value.get("references", None) try: schema_type_parsed = SchemaType(schema_type) @@ -375,21 +377,36 @@ def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None: # dedup schemas to reduce memory pressure schema_str = self._hash_to_schema.setdefault(hash(schema_str), schema_str) - if schema_version in subjects_schemas: - LOG.info("Updating entry subject: %r version: %r id: %r", schema_subject, schema_version, schema_id) - else: - LOG.info("Adding entry subject: %r version: %r id: %r", schema_subject, schema_version, schema_id) - typed_schema = TypedSchema( schema_type=schema_type_parsed, schema_str=schema_str, + references=schema_references, ) - subjects_schemas[schema_version] = { + schema = { "schema": typed_schema, "version": schema_version, "id": schema_id, "deleted": schema_deleted, } + if schema_references: + schema["references"] = schema_references + + if schema_version in subjects_schemas: + LOG.info("Updating entry subject: %r version: %r id: %r", schema_subject, schema_version, schema_id) + else: + LOG.info("Adding entry subject: %r version: %r id: %r", schema_subject, schema_version, schema_id) + + subjects_schemas[schema_version] = schema + if schema_references: + for ref in schema_references: + ref_str = reference_key(ref["subject"], ref["version"]) + referents = self.referenced_by.get(ref_str, None) + if referents: + LOG.info("Adding entry subject referenced_by : %r", ref_str) + referents.append(schema_id) + else: + LOG.info("Adding entry subject referenced_by : %r", ref_str) + self.referenced_by[ref_str] = [schema_id] self.schemas[schema_id] = typed_schema self.global_schema_id = max(self.global_schema_id, schema_id) @@ -428,3 +445,9 @@ def get_schemas( key: val for key, val in self.subjects[subject]["schemas"].items() if val.get("deleted", False) is False } return non_deleted_schemas + + def remove_referenced_by(self, schema_id: SchemaId, references: List): + for ref in references: + key = reference_key(ref["subject"], ref["version"]) + if self.referenced_by.get(key, None) and schema_id in self.referenced_by[key]: + self.referenced_by[key].remove(schema_id) diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index 8bac67636..b9a1c0744 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -10,10 +10,10 @@ from karapace.karapace import KarapaceBase from karapace.master_coordinator import MasterCoordinator from karapace.rapu import HTTPRequest, JSON_CONTENT_TYPE, SERVER_NAME -from karapace.schema_models import InvalidSchema, InvalidSchemaType, ValidatedTypedSchema +from karapace.schema_models import InvalidReferences, InvalidSchema, InvalidSchemaType, References, ValidatedTypedSchema from karapace.schema_reader import KafkaSchemaReader, SchemaType, SubjectData, TypedSchema from karapace.typing import JsonData -from karapace.utils import json_encode, KarapaceKafkaClient +from karapace.utils import json_encode, KarapaceKafkaClient, reference_key from typing import Any, Dict, NoReturn, Optional, Tuple, Union import aiohttp @@ -44,6 +44,9 @@ class SchemaErrorCodes(Enum): INVALID_VERSION_ID = 42202 INVALID_COMPATIBILITY_LEVEL = 42203 INVALID_AVRO_SCHEMA = 44201 + INVALID_REFERENCES = 44301 + REFERENCES_SUPPORT_NOT_IMPLEMENTED = 44302 + REFERENCE_EXISTS = 42206 NO_MASTER_ERROR = 50003 @@ -55,6 +58,7 @@ class SchemaErrorMessages(Enum): "forward, full, backward_transitive, forward_transitive, and " "full_transitive" ) + REFERENCES_SUPPORT_NOT_IMPLEMENTED = "Schema references are not supported for '{schema_type}' schema type" class KarapaceSchemaRegistry(KarapaceBase): @@ -217,6 +221,12 @@ def _add_schema_registry_routes(self) -> None: schema_request=True, auth=self._auth, ) + self.route( + "/subjects//versions//referencedby", + callback=self.subject_version_referencedby_get, + method="GET", + schema_request=True, + ) self.route( "/subjects/", callback=self.subject_delete, @@ -343,6 +353,7 @@ def send_schema_message( schema_id: int, version: int, deleted: bool, + references: Optional[References], ): key = json.dumps({"subject": subject, "version": version, "magic": 1, "keytype": "SCHEMA"}, separators=(",", ":")) if schema: @@ -353,6 +364,8 @@ def send_schema_message( "schema": schema.schema_str, "deleted": deleted, } + if references: + valuedict["references"] = references.val() if schema.schema_type is not SchemaType.AVRO: valuedict["schemaType"] = schema.schema_type value = json_encode(valuedict) @@ -623,11 +636,45 @@ async def _subject_delete_local(self, content_type: str, subject: str, permanent latest_schema_id = 0 if permanent: + for version, value in list(subject_data["schemas"].items()): + referenced_by = self.ksr.referenced_by.get(reference_key(subject, version), None) + if referenced_by and len(referenced_by) > 0: + self.r( + body={ + "error_code": SchemaErrorCodes.REFERENCE_EXISTS.value, + "message": ( + f"Subject '{subject}' Version {version} cannot be not be deleted " + "because it is referenced by schemas with ids:[" + ", ".join(map(str, referenced_by)) + "]" + ), + }, + content_type=content_type, + status=HTTPStatus.NOT_FOUND, + ) + for version, value in list(subject_data["schemas"].items()): schema_id = value.get("id") + references = value.get("references", None) self.log.info("Permanently deleting subject '%s' version %s (schema id=%s)", subject, version, schema_id) - self.send_schema_message(subject=subject, schema=None, schema_id=schema_id, version=version, deleted=True) + self.send_schema_message( + subject=subject, schema=None, schema_id=schema_id, version=version, deleted=True, references=None + ) + if references and len(references) > 0: + self.ksr.remove_referenced_by(schema_id, references) + else: + referenced_by = self.ksr.referenced_by.get(reference_key(subject, latest_schema_id), None) + if referenced_by and len(referenced_by) > 0: + self.r( + body={ + "error_code": SchemaErrorCodes.REFERENCE_EXISTS.value, + "message": ( + f"Subject '{subject}' Version {latest_schema_id} cannot be not be deleted " + "because it is referenced by schemas with ids:[" + ", ".join(map(str, referenced_by)) + "]" + ), + }, + content_type=content_type, + status=HTTPStatus.NOT_FOUND, + ) self.send_delete_subject_message(subject, latest_schema_id) self.r(version_list, content_type, status=HTTPStatus.OK) @@ -736,11 +783,33 @@ async def _subject_version_delete_local(self, content_type: str, subject: str, v status=HTTPStatus.NOT_FOUND, ) + referenced_by = self.ksr.referenced_by.get(reference_key(subject, version), None) + if referenced_by and len(referenced_by) > 0: + self.r( + body={ + "error_code": SchemaErrorCodes.REFERENCE_EXISTS.value, + "message": ( + f"Subject '{subject}' Version {version} was not deleted " + "because it is referenced by schemas with ids:[" + ", ".join(map(str, referenced_by)) + "]" + ), + }, + content_type=content_type, + status=HTTPStatus.NOT_FOUND, + ) + schema_id = subject_schema_data["id"] schema = subject_schema_data["schema"] + references = schema.references self.send_schema_message( - subject=subject, schema=None if permanent else schema, schema_id=schema_id, version=version, deleted=True + subject=subject, + schema=None if permanent else schema, + schema_id=schema_id, + version=version, + deleted=True, + references=None, ) + if references and len(references) > 0: + self.ksr.remove_referenced_by(schema_id, references) self.r(str(version), content_type, status=HTTPStatus.OK) async def subject_version_delete( @@ -784,9 +853,52 @@ async def subject_version_schema_get( ) self.r(schema_data["schema"].schema_str, content_type) + async def subject_version_referencedby_get(self, content_type, *, subject, version): + self._validate_version(content_type, version) + subject_data = self._subject_get(subject, content_type) + schema_data = None + max_version = max(subject_data["schemas"]) + if version == "latest": + version = max(subject_data["schemas"]) + schema_data = subject_data["schemas"][version] + elif int(version) <= max_version: + schema_data = subject_data["schemas"].get(int(version)) + else: + self.r( + body={ + "error_code": SchemaErrorCodes.VERSION_NOT_FOUND.value, + "message": f"Version {version} not found.", + }, + content_type=content_type, + status=HTTPStatus.NOT_FOUND, + ) + if not schema_data: + self.r( + body={ + "error_code": SchemaErrorCodes.VERSION_NOT_FOUND.value, + "message": f"Version {version} not found.", + }, + content_type=content_type, + status=HTTPStatus.NOT_FOUND, + ) + + if schema_data["schema"].schema_type != SchemaType.PROTOBUF: + self.r( + body={ + "error_code": SchemaErrorCodes.REFERENCES_SUPPORT_NOT_IMPLEMENTED.value, + "message": SchemaErrorMessages.REFERENCES_SUPPORT_NOT_IMPLEMENTED.value.format( + schema_type=schema_data["schema"].schema_type + ), + }, + content_type=content_type, + status=HTTPStatus.UNPROCESSABLE_ENTITY, + ) + + referenced_by = self.ksr.referenced_by.get(reference_key(subject, version), []) + self.r(list(referenced_by), content_type, status=HTTPStatus.OK) + async def subject_versions_list(self, content_type: str, *, subject: str, user: Optional[User] = None) -> None: self._check_authorization(user, Operation.Read, f"Subject:{subject}") - subject_data = self._subject_get(subject, content_type) schemas = list(subject_data["schemas"]) self.r(schemas, content_type, status=HTTPStatus.OK) @@ -813,12 +925,12 @@ def _validate_schema_request_body(self, content_type: str, body: Union[dict, Any content_type=content_type, status=HTTPStatus.INTERNAL_SERVER_ERROR, ) - for attr in body: - if attr not in {"schema", "schemaType"}: + for field in body: + if field not in {"schema", "schemaType", "references"}: self.r( body={ "error_code": SchemaErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value, - "message": f"Unrecognized field: {attr}", + "message": f"Unrecognized field: {field}", }, content_type=content_type, status=HTTPStatus.UNPROCESSABLE_ENTITY, @@ -870,8 +982,39 @@ async def subjects_schema_post( ) schema_str = body["schema"] schema_type = self._validate_schema_type(content_type=content_type, data=body) + + new_schema_references = None + references = body.get("references") + if references: + if schema_type != SchemaType.PROTOBUF: + self.r( + body={ + "error_code": SchemaErrorCodes.REFERENCES_SUPPORT_NOT_IMPLEMENTED.value, + "message": SchemaErrorMessages.REFERENCES_SUPPORT_NOT_IMPLEMENTED.value.format( + schema_type=schema_type.value + ), + }, + content_type=content_type, + status=HTTPStatus.UNPROCESSABLE_ENTITY, + ) + + try: + new_schema_references = References(schema_type, references) + except InvalidReferences: + human_error = "Provided references is not valid" + self.r( + body={ + "error_code": SchemaErrorCodes.INVALID_REFERENCES.value, + "message": f"Invalid {schema_type} references. Error: {human_error}", + }, + content_type=content_type, + status=HTTPStatus.UNPROCESSABLE_ENTITY, + ) + try: - new_schema = ValidatedTypedSchema.parse(schema_type, schema_str) + new_schema = ValidatedTypedSchema.parse( + schema_type=schema_type, schema_str=schema_str, references=new_schema_references + ) except InvalidSchema: self.log.exception("No proper parser found") self.r( @@ -882,11 +1025,13 @@ async def subjects_schema_post( content_type=content_type, status=HTTPStatus.INTERNAL_SERVER_ERROR, ) + for schema in subject_data["schemas"].values(): validated_typed_schema = ValidatedTypedSchema.parse(schema["schema"].schema_type, schema["schema"].schema_str) if ( validated_typed_schema.schema_type == new_schema.schema_type and validated_typed_schema.schema == new_schema.schema + and schema.get("references", None) == new_schema_references ): ret = { "subject": subject, @@ -918,9 +1063,37 @@ async def subject_post( self._validate_schema_request_body(content_type, body) schema_type = self._validate_schema_type(content_type, body) self._validate_schema_key(content_type, body) + new_schema_references = None + references = body.get("references") + if references: + if schema_type != SchemaType.PROTOBUF: + self.r( + body={ + "error_code": SchemaErrorCodes.REFERENCES_SUPPORT_NOT_IMPLEMENTED.value, + "message": SchemaErrorMessages.REFERENCES_SUPPORT_NOT_IMPLEMENTED.value.format( + schema_type=schema_type.value + ), + }, + content_type=content_type, + status=HTTPStatus.UNPROCESSABLE_ENTITY, + ) + try: + new_schema_references = References(schema_type, references) + except InvalidReferences: + human_error = "Provided references is not valid" + self.r( + body={ + "error_code": SchemaErrorCodes.INVALID_REFERENCES.value, + "message": f"Invalid {schema_type} references. Error: {human_error}", + }, + content_type=content_type, + status=HTTPStatus.UNPROCESSABLE_ENTITY, + ) try: - new_schema = ValidatedTypedSchema.parse(schema_type=schema_type, schema_str=body["schema"]) + new_schema = ValidatedTypedSchema.parse( + schema_type=schema_type, schema_str=body["schema"], references=new_schema_references + ) except (InvalidSchema, InvalidSchemaType) as e: self.log.warning("Invalid schema: %r", body["schema"], exc_info=True) if isinstance(e.__cause__, (SchemaParseException, json.JSONDecodeError)): @@ -929,6 +1102,7 @@ async def subject_post( human_error = "Provided schema is not valid" self.r( body={ + # TODO: I suppose this code is common for AVRO, PROTOBUF, JSON so INVALID_AVRO_SCHEMA must be replaced "error_code": SchemaErrorCodes.INVALID_AVRO_SCHEMA.value, "message": f"Invalid {schema_type} schema. Error: {human_error}", }, @@ -943,7 +1117,7 @@ async def subject_post( are_we_master, master_url = await self.get_master() if are_we_master: async with self.schema_lock: - await self.write_new_schema_local(subject, new_schema, content_type) + await self.write_new_schema_local(subject, new_schema, new_schema_references, content_type) elif not master_url: self.no_master_error(content_type) else: @@ -954,6 +1128,7 @@ def write_new_schema_local( self, subject: str, new_schema: ValidatedTypedSchema, + new_schema_references: Optional[References], content_type: str, ) -> NoReturn: """Since we're the master we get to write the new schema""" @@ -975,14 +1150,14 @@ def write_new_schema_local( schema_id=schema_id, version=version, deleted=False, + references=new_schema_references, ) - # Returns here self.r({"id": schema_id}, content_type) # First check if any of the existing schemas for the subject match schemas = self.ksr.get_schemas(subject) if not schemas: # Previous ones have been deleted by the user. - version = max(subject_data["schemas"]) + 1 + version = max(self.ksr.subjects[subject]["schemas"]) + 1 schema_id = self.ksr.get_schema_id(new_schema) self.log.info( "Registering subject: %r, id: %r new version: %r with schema %r, schema_id: %r", @@ -998,6 +1173,7 @@ def write_new_schema_local( schema_id=schema_id, version=version, deleted=False, + references=new_schema_references, ) # Returns here self.r({"id": schema_id}, content_type) @@ -1067,6 +1243,71 @@ def write_new_schema_local( schema_id=schema_id, version=version, deleted=False, + references=new_schema_references, + ) + self.r({"id": schema_id}, content_type) + compatibility_mode = self._get_compatibility_mode(subject=subject_data, content_type=content_type) + + # Run a compatibility check between on file schema(s) and the one being submitted now + # the check is either towards the latest one or against all previous ones in case of + # transitive mode + schema_versions = sorted(list(schemas)) + if compatibility_mode.is_transitive(): + check_against = schema_versions + else: + check_against = [schema_versions[-1]] + + for old_version in check_against: + old_schema = subject_data["schemas"][old_version]["schema"] + validated_old_schema = ValidatedTypedSchema.parse( + schema_type=old_schema.schema_type, schema_str=old_schema.schema_str + ) + result = check_compatibility( + old_schema=validated_old_schema, + new_schema=new_schema, + compatibility_mode=compatibility_mode, + ) + if is_incompatible(result): + message = set(result.messages).pop() if result.messages else "" + self.log.warning("Incompatible schema: %s", result) + self.r( + body={ + "error_code": SchemaErrorCodes.HTTP_CONFLICT.value, + "message": f"Incompatible schema, compatibility_mode={compatibility_mode.value} {message}", + }, + content_type=content_type, + status=HTTPStatus.CONFLICT, + ) + + # We didn't find an existing schema and the schema is compatible so go and create one + schema_id = self.ksr.get_schema_id(new_schema) + version = max(self.ksr.subjects[subject]["schemas"]) + 1 + if new_schema.schema_type is SchemaType.PROTOBUF: + self.log.info( + "Registering subject: %r, id: %r new version: %r with schema %r, schema_id: %r", + subject, + schema_id, + version, + new_schema.__str__(), + schema_id, + ) + else: + self.log.info( + "Registering subject: %r, id: %r new version: %r with schema %r, schema_id: %r", + subject, + schema_id, + version, + new_schema.to_dict(), + schema_id, + ) + + self.send_schema_message( + subject=subject, + schema=new_schema, + schema_id=schema_id, + version=version, + deleted=False, + references=new_schema_references, ) self.r({"id": schema_id}, content_type) diff --git a/karapace/serialization.py b/karapace/serialization.py index 5d870a6c1..64248b4c2 100644 --- a/karapace/serialization.py +++ b/karapace/serialization.py @@ -5,7 +5,7 @@ from karapace.client import Client from karapace.protobuf.exception import ProtobufTypeException from karapace.protobuf.io import ProtobufDatumReader, ProtobufDatumWriter -from karapace.schema_models import InvalidSchema, SchemaType, TypedSchema, ValidatedTypedSchema +from karapace.schema_models import InvalidSchema, References, SchemaType, TypedSchema, ValidatedTypedSchema from karapace.utils import json_encode from typing import Any, Dict, Optional, Tuple from urllib.parse import quote @@ -75,9 +75,14 @@ def __init__( self.client = Client(server_uri=schema_registry_url, server_ca=server_ca, session_auth=session_auth) self.base_url = schema_registry_url - async def post_new_schema(self, subject: str, schema: ValidatedTypedSchema) -> int: + async def post_new_schema( + self, subject: str, schema: ValidatedTypedSchema, references: Optional[References] = None + ) -> int: if schema.schema_type is SchemaType.PROTOBUF: - payload = {"schema": str(schema), "schemaType": schema.schema_type.value} + if references: + payload = {"schema": str(schema), "schemaType": schema.schema_type.value, "references": references.json()} + else: + payload = {"schema": str(schema), "schemaType": schema.schema_type.value} else: payload = {"schema": json_encode(schema.to_dict()), "schemaType": schema.schema_type.value} result = await self.client.post(f"subjects/{quote(subject)}/versions", json=payload) @@ -94,11 +99,13 @@ async def get_latest_schema(self, subject: str) -> Tuple[int, ValidatedTypedSche raise SchemaRetrievalError(f"Invalid result format: {json_result}") try: schema_type = SchemaType(json_result.get("schemaType", "AVRO")) + return json_result["id"], ValidatedTypedSchema.parse(schema_type, json_result["schema"]) + except InvalidSchema as e: raise SchemaRetrievalError(f"Failed to parse schema string from response: {json_result}") from e - async def get_schema_for_id(self, schema_id: int) -> ValidatedTypedSchema: + async def get_schema_for_id(self, schema_id: int) -> Tuple[ValidatedTypedSchema, Optional[References]]: result = await self.client.get(f"schemas/ids/{schema_id}") if not result.ok: raise SchemaRetrievalError(result.json()["message"]) @@ -107,7 +114,15 @@ async def get_schema_for_id(self, schema_id: int) -> ValidatedTypedSchema: raise SchemaRetrievalError(f"Invalid result format: {json_result}") try: schema_type = SchemaType(json_result.get("schemaType", "AVRO")) + references_str = json_result.get("references") + if references_str: + references = References(schema_type, references_str) + else: + references = None + if references: + return ValidatedTypedSchema.parse(schema_type, json_result["schema"]), references return ValidatedTypedSchema.parse(schema_type, json_result["schema"]) + except InvalidSchema as e: raise SchemaRetrievalError(f"Failed to parse schema string from response: {json_result}") from e @@ -159,6 +174,7 @@ def get_subject_name(self, topic_name: str, schema: str, subject_type: str, sche async def get_schema_for_subject(self, subject: str) -> TypedSchema: assert self.registry_client, "must not call this method after the object is closed." + schema_id, schema = await self.registry_client.get_latest_schema(subject) async with self.state_lock: schema_ser = schema.__str__() @@ -176,6 +192,7 @@ async def get_id_for_schema(self, schema: str, subject: str, schema_type: Schema if schema_ser in self.schemas_to_ids: return self.schemas_to_ids[schema_ser] schema_id = await self.registry_client.post_new_schema(subject, schema_typed) + async with self.state_lock: self.schemas_to_ids[schema_ser] = schema_id self.ids_to_schemas[schema_id] = schema_typed diff --git a/karapace/utils.py b/karapace/utils.py index 95302da04..9433d6523 100644 --- a/karapace/utils.py +++ b/karapace/utils.py @@ -55,6 +55,10 @@ def default_json_serialization(obj: MappingProxyType) -> dict: ... +def reference_key(subject: str, version: int) -> str: + return hash((subject, version)) + + def default_json_serialization( # pylint: disable=inconsistent-return-statements obj: Union[datetime, timedelta, Decimal, MappingProxyType], ) -> Union[str, float, dict]: diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/integration/test_client_protobuf.py b/tests/integration/test_client_protobuf.py index 476dbd2de..f924b536f 100644 --- a/tests/integration/test_client_protobuf.py +++ b/tests/integration/test_client_protobuf.py @@ -10,7 +10,7 @@ async def test_remote_client_protobuf(registry_async_client): reg_cli = SchemaRegistryClient() reg_cli.client = registry_async_client subject = new_random_name("subject") - sc_id = await reg_cli.post_new_schema(subject, schema_protobuf) + sc_id = await reg_cli.post_new_schema(subject, schema_protobuf, None) assert sc_id >= 0 stored_schema = await reg_cli.get_schema_for_id(sc_id) assert stored_schema == schema_protobuf, f"stored schema {stored_schema} is not {schema_protobuf}" @@ -25,7 +25,7 @@ async def test_remote_client_protobuf2(registry_async_client): reg_cli = SchemaRegistryClient() reg_cli.client = registry_async_client subject = new_random_name("subject") - sc_id = await reg_cli.post_new_schema(subject, schema_protobuf) + sc_id = await reg_cli.post_new_schema(subject, schema_protobuf, None) assert sc_id >= 0 stored_schema = await reg_cli.get_schema_for_id(sc_id) assert stored_schema == schema_protobuf, f"stored schema {stored_schema} is not {schema_protobuf}" diff --git a/tests/integration/test_schema.py b/tests/integration/test_schema.py index a745da6f8..6e3689046 100644 --- a/tests/integration/test_schema.py +++ b/tests/integration/test_schema.py @@ -1246,6 +1246,17 @@ async def test_schema_subject_post_invalid(registry_async_client: Client) -> Non assert res.json()["error_code"] == 40401 assert res.json()["message"] == f"Subject '{subject_3}' not found." + schema_str = json.dumps({"type": "string"}) + # Create the subject + subject_1 = subject_name_factory() + res = await registry_async_client.post( + f"subjects/{subject_1}/versions", + json={"schema": schema_str, "references": [{"name": "Customer.avro", "subject": "customer", "version": 1}]}, + ) + assert res.status_code == 422 + assert res.json()["error_code"] == 44302 + assert res.json()["message"] == "Schema references are not supported for 'AVRO' schema type" + @pytest.mark.parametrize("trail", ["", "/"]) async def test_schema_lifecycle(registry_async_client: Client, trail: str) -> None: diff --git a/tests/integration/test_schema_protobuf.py b/tests/integration/test_schema_protobuf.py index 09be5f739..139bc889c 100644 --- a/tests/integration/test_schema_protobuf.py +++ b/tests/integration/test_schema_protobuf.py @@ -104,3 +104,61 @@ async def test_protobuf_schema_compatibility(registry_async_client: Client, trai ) assert res.status_code == 200 assert "id" in res.json() + + +async def test_protobuf_schema_references(registry_async_client: Client) -> None: + + customer_schema = """ + |syntax = "proto3"; + |package a1; + |message Customer { + | string name = 1; + | int32 code = 2; + |} + |""" + + customer_schema = trim_margin(customer_schema) + res = await registry_async_client.post( + "subjects/customer/versions", json={"schemaType": "PROTOBUF", "schema": customer_schema} + ) + assert res.status_code == 200 + assert "id" in res.json() + original_schema = """ + |syntax = "proto3"; + |package a1; + |import "Customer.proto"; + |message TestMessage { + | message Value { + | Customer customer = 1; + | int32 x = 2; + | } + | string test = 1; + | .a1.TestMessage.Value val = 2; + |} + |""" + + original_schema = trim_margin(original_schema) + references = [{"name": "Customer.proto", "subject": "customer", "version": 1}] + res = await registry_async_client.post( + "subjects/test_schema/versions", + json={"schemaType": "PROTOBUF", "schema": original_schema, "references": references}, + ) + assert res.status_code == 200 + assert "id" in res.json() + res = await registry_async_client.get("subjects/customer/versions/latest/referencedby", json={}) + assert res.status_code == 200 + myjson = res.json() + referents = [2] + assert not any(x != y for x, y in zip(myjson, referents)) + + res = await registry_async_client.delete("subjects/customer/versions/1") + assert res.status_code == 404 + match_msg = "Subject 'customer' Version 1 was not deleted because it is referenced by schemas with ids:[2]" + myjson = res.json() + assert myjson["error_code"] == 42206 and myjson["message"] == match_msg + + res = await registry_async_client.delete("subjects/test_schema/versions/1") + assert res.status_code == 200 + + res = await registry_async_client.delete("subjects/customer/versions/1") + assert res.status_code == 200