From efb456de65e3a6628c422a839bdee58e951b0d95 Mon Sep 17 00:00:00 2001 From: Sergiy Zaschipas Date: Tue, 19 Apr 2022 21:04:50 +0300 Subject: [PATCH 01/24] add basic support of references to POST/GET subjects --- karapace/schema_models.py | 45 +++++++++--- karapace/schema_reader.py | 4 + karapace/schema_registry_apis.py | 89 +++++++++++++++++++---- karapace/serialization.py | 41 ++++++++--- tests/integration/test_schema_protobuf.py | 56 ++++++++++++++ 5 files changed, 197 insertions(+), 38 deletions(-) diff --git a/karapace/schema_models.py b/karapace/schema_models.py index e8e111440..6e6a41bae 100644 --- a/karapace/schema_models.py +++ b/karapace/schema_models.py @@ -13,7 +13,7 @@ ) from karapace.protobuf.schema import ProtobufSchema from karapace.utils import json_encode -from typing import Any, Dict, Union +from typing import Any, Dict, Union, List import json import logging @@ -71,6 +71,10 @@ class InvalidSchema(Exception): pass +class InvalidReferences(Exception): + pass + + @unique class SchemaType(str, Enum): AVRO = "AVRO" @@ -105,11 +109,13 @@ def __repr__(self) -> str: return f"TypedSchema(type={self.schema_type}, schema={json_encode(self.to_dict())})" def __eq__(self, other: Any) -> bool: - return isinstance(other, TypedSchema) and self.__str__() == other.__str__() and self.schema_type is other.schema_type + return isinstance(other, + TypedSchema) and self.__str__() == other.__str__() and self.schema_type is other.schema_type class ValidatedTypedSchema(TypedSchema): - def __init__(self, schema_type: SchemaType, schema_str: str, schema: Union[Draft7Validator, AvroSchema, ProtobufSchema]): + def __init__(self, schema_type: SchemaType, schema_str: str, + schema: Union[Draft7Validator, AvroSchema, ProtobufSchema]): super().__init__(schema_type=schema_type, schema_str=schema_str) self.schema = schema @@ -136,15 +142,15 @@ def parse(schema_type: SchemaType, schema_str: str) -> "ValidatedTypedSchema": try: parsed_schema = parse_protobuf_schema_definition(schema_str) except ( - TypeError, - SchemaError, - AssertionError, - ProtobufParserRuntimeException, - IllegalStateException, - IllegalArgumentException, - ProtobufError, - ProtobufException, - ProtobufSchemaParseException, + TypeError, + SchemaError, + AssertionError, + ProtobufParserRuntimeException, + IllegalStateException, + IllegalArgumentException, + ProtobufError, + ProtobufException, + ProtobufSchemaParseException, ) as e: log.exception("Unexpected error: %s \n schema:[%s]", e, schema_str) raise InvalidSchema from e @@ -157,3 +163,18 @@ def __str__(self) -> str: if self.schema_type == SchemaType.PROTOBUF: return str(self.schema) return super().__str__() + + +class References: + def __init__(self, schema_type: SchemaType, references: List): + """Schema with type information + + Args: + schema_type (SchemaType): The type of the schema + references (str): The original schema string + """ + self.schema_type = schema_type + self.references = references + + def json(self) -> str: + return json_encode(self.references) diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 335166560..3eaa181b2 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -326,6 +326,7 @@ def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None: schema_id = value["id"] schema_version = value["version"] schema_deleted = value.get("deleted", False) + schema_references = value.get("references", None) try: schema_type_parsed = SchemaType(schema_type) @@ -361,6 +362,9 @@ def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None: "id": schema_id, "deleted": schema_deleted, } + if schema_references: + subjects_schemas[schema_version]["references"] = schema_references + with self.id_lock: self.schemas[schema_id] = typed_schema self.global_schema_id = max(self.global_schema_id, schema_id) diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index 5079c4976..28bbbadbd 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -8,7 +8,7 @@ from karapace.karapace import KarapaceBase from karapace.master_coordinator import MasterCoordinator from karapace.rapu import HTTPRequest, JSON_CONTENT_TYPE, SERVER_NAME -from karapace.schema_models import InvalidSchema, InvalidSchemaType, ValidatedTypedSchema +from karapace.schema_models import InvalidSchema, InvalidSchemaType, ValidatedTypedSchema, References, InvalidReferences from karapace.schema_reader import KafkaSchemaReader, SchemaType, TypedSchema from karapace.utils import json_encode, KarapaceKafkaClient from typing import Any, Dict, Optional, Tuple @@ -36,6 +36,8 @@ class SchemaErrorCodes(Enum): INVALID_VERSION_ID = 42202 INVALID_COMPATIBILITY_LEVEL = 42203 INVALID_AVRO_SCHEMA = 44201 + INVALID_PROTOBUF_SCHEMA = 44202 + INVALID_REFERECES = 442203 NO_MASTER_ERROR = 50003 @@ -113,7 +115,8 @@ def _add_schema_registry_routes(self) -> None: self.route("/config", callback=self.config_get, method="GET", schema_request=True) self.route("/config", callback=self.config_set, method="PUT", schema_request=True) self.route( - "/schemas/ids//versions", callback=self.schemas_get_versions, method="GET", schema_request=True + "/schemas/ids//versions", callback=self.schemas_get_versions, method="GET", + schema_request=True ) self.route("/schemas/ids/", callback=self.schemas_get, method="GET", schema_request=True) self.route("/schemas/types", callback=self.schemas_types, method="GET", schema_request=True) @@ -143,6 +146,12 @@ def _add_schema_registry_routes(self) -> None: method="GET", schema_request=True, ) + self.route( + "/subjects//versions//referencedby", + callback=self.subject_version_referencedby_get, + method="GET", + schema_request=True, + ) self.route( "/subjects/", callback=self.subject_delete, @@ -262,13 +271,14 @@ def send_kafka_message(self, key, value): return future def send_schema_message( - self, - *, - subject: str, - schema: Optional[TypedSchema], - schema_id: int, - version: int, - deleted: bool, + self, + *, + subject: str, + schema: Optional[TypedSchema], + schema_id: int, + version: int, + deleted: bool, + references: Optional[References], ): key = '{{"subject":"{}","version":{},"magic":1,"keytype":"SCHEMA"}}'.format(subject, version) if schema: @@ -279,6 +289,8 @@ def send_schema_message( "schema": schema.schema_str, "deleted": deleted, } + if references: + valuedict["references"] = references.json() if schema.schema_type is not SchemaType.AVRO: valuedict["schemaType"] = schema.schema_type value = json_encode(valuedict) @@ -303,7 +315,8 @@ async def compatibility_check(self, content_type, *, subject, version, request): """Check for schema compatibility""" body = request.json self.log.info("Got request to check subject: %r, version_id: %r compatibility", subject, version) - old = await self.subject_version_get(content_type=content_type, subject=subject, version=version, return_dict=True) + old = await self.subject_version_get(content_type=content_type, subject=subject, version=version, + return_dict=True) self.log.info("Existing schema: %r, new_schema: %r", old["schema"], body["schema"]) try: schema_type = SchemaType(body.get("schemaType", "AVRO")) @@ -491,7 +504,8 @@ async def subjects_list(self, content_type): async def _subject_delete_local(self, content_type: str, subject: str, permanent: bool): subject_data = self._subject_get(subject, content_type, include_deleted=permanent) - if permanent and [version for version, value in subject_data["schemas"].items() if not value.get("deleted", False)]: + if permanent and [version for version, value in subject_data["schemas"].items() if + not value.get("deleted", False)]: self.r( body={ "error_code": SchemaErrorCodes.SUBJECT_NOT_SOFT_DELETED.value, @@ -510,8 +524,10 @@ async def _subject_delete_local(self, content_type: str, subject: str, permanent if permanent: for version, value in list(subject_data["schemas"].items()): schema_id = value.get("id") - self.log.info("Permanently deleting subject '%s' version %s (schema id=%s)", subject, version, schema_id) - self.send_schema_message(subject=subject, schema=None, schema_id=schema_id, version=version, deleted=True) + self.log.info("Permanently deleting subject '%s' version %s (schema id=%s)", subject, version, + schema_id) + self.send_schema_message(subject=subject, schema=None, schema_id=schema_id, version=version, + deleted=True) else: self.send_delete_subject_message(subject, latest_schema_id) self.r(version_list, content_type, status=HTTPStatus.OK) @@ -566,6 +582,9 @@ async def subject_version_get(self, content_type, *, subject, version, return_di "id": schema_id, "schema": schema.schema_str, } + references = schema_data.get("references") + if references : + ret["references"] = references if schema.schema_type is not SchemaType.AVRO: ret["schemaType"] = schema.schema_type if return_dict: @@ -637,6 +656,26 @@ async def subject_version_schema_get(self, content_type, *, subject, version): subject_data = self._subject_get(subject, content_type) max_version = max(subject_data["schemas"]) + if version == "latest": + schema_data = subject_data["schemas"][max_version] + elif int(version) <= max_version: + schema_data = subject_data["schemas"].get(int(version)) + else: + self.r( + body={ + "error_code": SchemaErrorCodes.VERSION_NOT_FOUND.value, + "message": f"Version {version} not found.", + }, + content_type=content_type, + status=HTTPStatus.NOT_FOUND, + ) + self.r(self.get_referencedby(subject, version), content_type) + + async def subject_version_referencedby_get(self, content_type, *, subject, version): + self._validate_version(content_type, version) + subject_data = self._subject_get(subject, content_type) + max_version = max(subject_data["schemas"]) + if version == "latest": schema_data = subject_data["schemas"][max_version] elif int(version) <= max_version: @@ -679,7 +718,7 @@ def _validate_schema_request_body(self, content_type, body) -> None: status=HTTPStatus.INTERNAL_SERVER_ERROR, ) for field in body: - if field not in {"schema", "schemaType"}: + if field not in {"schema", "schemaType", "references"}: self.r( body={ "error_code": SchemaErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value, @@ -799,6 +838,22 @@ def write_new_schema_local(self, subject, body, content_type): content_type=content_type, status=HTTPStatus.UNPROCESSABLE_ENTITY, ) + new_schema_references = None + if body.get("references"): + b = body.get("references") + try: + new_schema_references = References(schema_type, body["references"]) + except InvalidReferences: + human_error = "Provided references is not valid" + self.r( + body={ + "error_code": SchemaErrorCodes.INVALID_REFERECES.value, + "message": f"Invalid {schema_type} references. Error: {human_error}", + }, + content_type=content_type, + status=HTTPStatus.UNPROCESSABLE_ENTITY, + ) + if subject not in self.ksr.subjects or not self.ksr.subjects.get(subject)["schemas"]: schema_id = self.ksr.get_schema_id(new_schema) version = 1 @@ -830,6 +885,7 @@ def write_new_schema_local(self, subject, body, content_type): schema_id=schema_id, version=version, deleted=False, + references=new_schema_references ) self.r({"id": schema_id}, content_type) @@ -902,6 +958,7 @@ def write_new_schema_local(self, subject, body, content_type): schema_id=schema_id, version=version, deleted=False, + references=new_schema_references, ) self.r({"id": schema_id}, content_type) @@ -929,3 +986,7 @@ def no_master_error(self, content_type): content_type=content_type, status=HTTPStatus.INTERNAL_SERVER_ERROR, ) + + def get_referencedby(self, subject, version): + + pass diff --git a/karapace/serialization.py b/karapace/serialization.py index 4e95f5b5c..9976defd0 100644 --- a/karapace/serialization.py +++ b/karapace/serialization.py @@ -4,7 +4,7 @@ from karapace.client import Client from karapace.protobuf.exception import ProtobufTypeException from karapace.protobuf.io import ProtobufDatumReader, ProtobufDatumWriter -from karapace.schema_models import InvalidSchema, SchemaType, TypedSchema, ValidatedTypedSchema +from karapace.schema_models import InvalidSchema, SchemaType, TypedSchema, ValidatedTypedSchema, References from karapace.utils import json_encode from typing import Any, Dict, Optional, Tuple from urllib.parse import quote @@ -72,9 +72,13 @@ def __init__(self, schema_registry_url: str = "http://localhost:8081", server_ca self.client = Client(server_uri=schema_registry_url, server_ca=server_ca) self.base_url = schema_registry_url - async def post_new_schema(self, subject: str, schema: ValidatedTypedSchema) -> int: + async def post_new_schema(self, subject: str, schema: ValidatedTypedSchema, references: Optional[References]) -> int: if schema.schema_type is SchemaType.PROTOBUF: - payload = {"schema": str(schema), "schemaType": schema.schema_type.value} + if references : + payload = {"schema": str(schema), "schemaType": schema.schema_type.value, + "references": references.json()} + else: + payload = {"schema": str(schema), "schemaType": schema.schema_type.value} else: payload = {"schema": json_encode(schema.to_dict()), "schemaType": schema.schema_type.value} result = await self.client.post(f"subjects/{quote(subject)}/versions", json=payload) @@ -82,7 +86,7 @@ async def post_new_schema(self, subject: str, schema: ValidatedTypedSchema) -> i raise SchemaRetrievalError(result.json()) return result.json()["id"] - async def get_latest_schema(self, subject: str) -> Tuple[int, ValidatedTypedSchema]: + async def get_latest_schema(self, subject: str) -> Tuple[int, ValidatedTypedSchema, Optional[References]]: result = await self.client.get(f"subjects/{quote(subject)}/versions/latest") if not result.ok: raise SchemaRetrievalError(result.json()) @@ -91,11 +95,18 @@ async def get_latest_schema(self, subject: str) -> Tuple[int, ValidatedTypedSche raise SchemaRetrievalError(f"Invalid result format: {json_result}") try: schema_type = SchemaType(json_result.get("schemaType", "AVRO")) - return json_result["id"], ValidatedTypedSchema.parse(schema_type, json_result["schema"]) + + if json_result["references"]: + references = References(json_result["references"]) + else: + references = None + + return json_result["id"], ValidatedTypedSchema.parse(schema_type, json_result["schema"]), references + except InvalidSchema as e: raise SchemaRetrievalError(f"Failed to parse schema string from response: {json_result}") from e - async def get_schema_for_id(self, schema_id: int) -> ValidatedTypedSchema: + async def get_schema_for_id(self, schema_id: int) -> Tuple[ValidatedTypedSchema, Optional[References]] : result = await self.client.get(f"schemas/ids/{schema_id}") if not result.ok: raise SchemaRetrievalError(result.json()["message"]) @@ -104,7 +115,13 @@ async def get_schema_for_id(self, schema_id: int) -> ValidatedTypedSchema: raise SchemaRetrievalError(f"Invalid result format: {json_result}") try: schema_type = SchemaType(json_result.get("schemaType", "AVRO")) - return ValidatedTypedSchema.parse(schema_type, json_result["schema"]) + if json_result["references"]: + references = References(json_result["references"]) + else: + references = None + + return ValidatedTypedSchema.parse(schema_type, json_result["schema"]), references + except InvalidSchema as e: raise SchemaRetrievalError(f"Failed to parse schema string from response: {json_result}") from e @@ -114,10 +131,10 @@ async def close(self): class SchemaRegistrySerializerDeserializer: def __init__( - self, - config: dict, - name_strategy: str = "topic_name", - **cfg, # pylint: disable=unused-argument + self, + config: dict, + name_strategy: str = "topic_name", + **cfg, # pylint: disable=unused-argument ) -> None: self.config = config self.state_lock = asyncio.Lock() @@ -151,7 +168,7 @@ def get_subject_name(self, topic_name: str, schema: str, subject_type: str, sche async def get_schema_for_subject(self, subject: str) -> TypedSchema: assert self.registry_client, "must not call this method after the object is closed." - schema_id, schema = await self.registry_client.get_latest_schema(subject) + schema_id, schema, references = await self.registry_client.get_latest_schema(subject) async with self.state_lock: schema_ser = schema.__str__() self.schemas_to_ids[schema_ser] = schema_id diff --git a/tests/integration/test_schema_protobuf.py b/tests/integration/test_schema_protobuf.py index 09be5f739..a57eb67e1 100644 --- a/tests/integration/test_schema_protobuf.py +++ b/tests/integration/test_schema_protobuf.py @@ -10,6 +10,7 @@ import logging import pytest +import json baseurl = "http://localhost:8081" @@ -104,3 +105,58 @@ async def test_protobuf_schema_compatibility(registry_async_client: Client, trai ) assert res.status_code == 200 assert "id" in res.json() + + +async def test_protobuf_schema_references(registry_async_client: Client) -> None: + + customer_schema = """ + |syntax = "proto3"; + |package a1; + |message Customer { + | string name = 1; + | int32 code = 2; + |} + |""" + customer_schema = trim_margin(customer_schema) + res = await registry_async_client.post( + f"subjects/customer/versions", json={"schemaType": "PROTOBUF", "schema": customer_schema} + ) + assert res.status_code == 200 + assert "id" in res.json() + + original_schema = """ + |syntax = "proto3"; + |package a1; + |import "Customer.proto"; + |message TestMessage { + | message Value { + | Customer customer = 1; + | int32 x = 2; + | } + | string test = 1; + | .a1.TestMessage.Value val = 2; + |} + |""" + + original_schema = trim_margin(original_schema) + references = [{"name": "Customer.proto", + "subject": "customer", + "version": 1}] + res = await registry_async_client.post( + f"subjects/test_schema/versions", + json={"schemaType": "PROTOBUF", + "schema": original_schema, + "references": references} + ) + assert res.status_code == 200 + assert "id" in res.json() + res = await registry_async_client.get( + f"subjects/test_schema/versions/latest", json={} ) + assert res.status_code == 200 + myjson = res.json() + assert "id" in myjson + references = [{"name": "Customer.proto", + "subject": "customer", + "version": 1}] + refs2 = json.loads(myjson["references"]) + assert not any(x != y for x, y in zip(refs2, references)) From 843a1da4a5c9538ad13060d392f85a67ce0053ad Mon Sep 17 00:00:00 2001 From: Sergiy Zaschipas Date: Tue, 19 Apr 2022 22:51:09 +0300 Subject: [PATCH 02/24] fixups --- karapace/schema_registry_apis.py | 68 +++++++++-------------- karapace/serialization.py | 28 +++++----- tests/integration/test_client.py | 4 +- tests/integration/test_schema_protobuf.py | 25 +++------ 4 files changed, 51 insertions(+), 74 deletions(-) diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index 28bbbadbd..4d8b2c060 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -8,7 +8,7 @@ from karapace.karapace import KarapaceBase from karapace.master_coordinator import MasterCoordinator from karapace.rapu import HTTPRequest, JSON_CONTENT_TYPE, SERVER_NAME -from karapace.schema_models import InvalidSchema, InvalidSchemaType, ValidatedTypedSchema, References, InvalidReferences +from karapace.schema_models import InvalidReferences, InvalidSchema, InvalidSchemaType, References, ValidatedTypedSchema from karapace.schema_reader import KafkaSchemaReader, SchemaType, TypedSchema from karapace.utils import json_encode, KarapaceKafkaClient from typing import Any, Dict, Optional, Tuple @@ -115,8 +115,7 @@ def _add_schema_registry_routes(self) -> None: self.route("/config", callback=self.config_get, method="GET", schema_request=True) self.route("/config", callback=self.config_set, method="PUT", schema_request=True) self.route( - "/schemas/ids//versions", callback=self.schemas_get_versions, method="GET", - schema_request=True + "/schemas/ids//versions", callback=self.schemas_get_versions, method="GET", schema_request=True ) self.route("/schemas/ids/", callback=self.schemas_get, method="GET", schema_request=True) self.route("/schemas/types", callback=self.schemas_types, method="GET", schema_request=True) @@ -271,14 +270,14 @@ def send_kafka_message(self, key, value): return future def send_schema_message( - self, - *, - subject: str, - schema: Optional[TypedSchema], - schema_id: int, - version: int, - deleted: bool, - references: Optional[References], + self, + *, + subject: str, + schema: Optional[TypedSchema], + schema_id: int, + version: int, + deleted: bool, + references: Optional[References], ): key = '{{"subject":"{}","version":{},"magic":1,"keytype":"SCHEMA"}}'.format(subject, version) if schema: @@ -315,8 +314,7 @@ async def compatibility_check(self, content_type, *, subject, version, request): """Check for schema compatibility""" body = request.json self.log.info("Got request to check subject: %r, version_id: %r compatibility", subject, version) - old = await self.subject_version_get(content_type=content_type, subject=subject, version=version, - return_dict=True) + old = await self.subject_version_get(content_type=content_type, subject=subject, version=version, return_dict=True) self.log.info("Existing schema: %r, new_schema: %r", old["schema"], body["schema"]) try: schema_type = SchemaType(body.get("schemaType", "AVRO")) @@ -504,8 +502,7 @@ async def subjects_list(self, content_type): async def _subject_delete_local(self, content_type: str, subject: str, permanent: bool): subject_data = self._subject_get(subject, content_type, include_deleted=permanent) - if permanent and [version for version, value in subject_data["schemas"].items() if - not value.get("deleted", False)]: + if permanent and [version for version, value in subject_data["schemas"].items() if not value.get("deleted", False)]: self.r( body={ "error_code": SchemaErrorCodes.SUBJECT_NOT_SOFT_DELETED.value, @@ -524,10 +521,10 @@ async def _subject_delete_local(self, content_type: str, subject: str, permanent if permanent: for version, value in list(subject_data["schemas"].items()): schema_id = value.get("id") - self.log.info("Permanently deleting subject '%s' version %s (schema id=%s)", subject, version, - schema_id) - self.send_schema_message(subject=subject, schema=None, schema_id=schema_id, version=version, - deleted=True) + self.log.info("Permanently deleting subject '%s' version %s (schema id=%s)", subject, version, schema_id) + self.send_schema_message( + subject=subject, schema=None, schema_id=schema_id, version=version, deleted=True, references=None + ) else: self.send_delete_subject_message(subject, latest_schema_id) self.r(version_list, content_type, status=HTTPStatus.OK) @@ -583,7 +580,7 @@ async def subject_version_get(self, content_type, *, subject, version, return_di "schema": schema.schema_str, } references = schema_data.get("references") - if references : + if references: ret["references"] = references if schema.schema_type is not SchemaType.AVRO: ret["schemaType"] = schema.schema_type @@ -633,7 +630,12 @@ async def _subject_version_delete_local(self, content_type: str, subject: str, v schema_id = subject_schema_data["id"] schema = subject_schema_data["schema"] self.send_schema_message( - subject=subject, schema=None if permanent else schema, schema_id=schema_id, version=version, deleted=True + subject=subject, + schema=None if permanent else schema, + schema_id=schema_id, + version=version, + deleted=True, + references=None, ) self.r(str(version), content_type, status=HTTPStatus.OK) @@ -669,27 +671,10 @@ async def subject_version_schema_get(self, content_type, *, subject, version): content_type=content_type, status=HTTPStatus.NOT_FOUND, ) - self.r(self.get_referencedby(subject, version), content_type) + self.r(schema_data["schema"].schema_str, content_type) async def subject_version_referencedby_get(self, content_type, *, subject, version): - self._validate_version(content_type, version) - subject_data = self._subject_get(subject, content_type) - max_version = max(subject_data["schemas"]) - - if version == "latest": - schema_data = subject_data["schemas"][max_version] - elif int(version) <= max_version: - schema_data = subject_data["schemas"].get(int(version)) - else: - self.r( - body={ - "error_code": SchemaErrorCodes.VERSION_NOT_FOUND.value, - "message": f"Version {version} not found.", - }, - content_type=content_type, - status=HTTPStatus.NOT_FOUND, - ) - self.r(schema_data["schema"].schema_str, content_type) + pass async def subject_versions_list(self, content_type, *, subject): subject_data = self._subject_get(subject, content_type) @@ -840,7 +825,6 @@ def write_new_schema_local(self, subject, body, content_type): ) new_schema_references = None if body.get("references"): - b = body.get("references") try: new_schema_references = References(schema_type, body["references"]) except InvalidReferences: @@ -885,7 +869,7 @@ def write_new_schema_local(self, subject, body, content_type): schema_id=schema_id, version=version, deleted=False, - references=new_schema_references + references=new_schema_references, ) self.r({"id": schema_id}, content_type) diff --git a/karapace/serialization.py b/karapace/serialization.py index 9976defd0..2fe4359e3 100644 --- a/karapace/serialization.py +++ b/karapace/serialization.py @@ -4,7 +4,7 @@ from karapace.client import Client from karapace.protobuf.exception import ProtobufTypeException from karapace.protobuf.io import ProtobufDatumReader, ProtobufDatumWriter -from karapace.schema_models import InvalidSchema, SchemaType, TypedSchema, ValidatedTypedSchema, References +from karapace.schema_models import InvalidSchema, References, SchemaType, TypedSchema, ValidatedTypedSchema from karapace.utils import json_encode from typing import Any, Dict, Optional, Tuple from urllib.parse import quote @@ -74,9 +74,8 @@ def __init__(self, schema_registry_url: str = "http://localhost:8081", server_ca async def post_new_schema(self, subject: str, schema: ValidatedTypedSchema, references: Optional[References]) -> int: if schema.schema_type is SchemaType.PROTOBUF: - if references : - payload = {"schema": str(schema), "schemaType": schema.schema_type.value, - "references": references.json()} + if references: + payload = {"schema": str(schema), "schemaType": schema.schema_type.value, "references": references.json()} else: payload = {"schema": str(schema), "schemaType": schema.schema_type.value} else: @@ -86,7 +85,7 @@ async def post_new_schema(self, subject: str, schema: ValidatedTypedSchema, refe raise SchemaRetrievalError(result.json()) return result.json()["id"] - async def get_latest_schema(self, subject: str) -> Tuple[int, ValidatedTypedSchema, Optional[References]]: + async def get_latest_schema(self, subject: str) -> Tuple[int, ValidatedTypedSchema, References]: result = await self.client.get(f"subjects/{quote(subject)}/versions/latest") if not result.ok: raise SchemaRetrievalError(result.json()) @@ -95,9 +94,8 @@ async def get_latest_schema(self, subject: str) -> Tuple[int, ValidatedTypedSche raise SchemaRetrievalError(f"Invalid result format: {json_result}") try: schema_type = SchemaType(json_result.get("schemaType", "AVRO")) - if json_result["references"]: - references = References(json_result["references"]) + references = References(schema_type, json_result["references"]) else: references = None @@ -106,7 +104,7 @@ async def get_latest_schema(self, subject: str) -> Tuple[int, ValidatedTypedSche except InvalidSchema as e: raise SchemaRetrievalError(f"Failed to parse schema string from response: {json_result}") from e - async def get_schema_for_id(self, schema_id: int) -> Tuple[ValidatedTypedSchema, Optional[References]] : + async def get_schema_for_id(self, schema_id: int) -> Tuple[ValidatedTypedSchema, Optional[References]]: result = await self.client.get(f"schemas/ids/{schema_id}") if not result.ok: raise SchemaRetrievalError(result.json()["message"]) @@ -116,7 +114,7 @@ async def get_schema_for_id(self, schema_id: int) -> Tuple[ValidatedTypedSchema, try: schema_type = SchemaType(json_result.get("schemaType", "AVRO")) if json_result["references"]: - references = References(json_result["references"]) + references = References(schema_type, json_result["references"]) else: references = None @@ -131,10 +129,10 @@ async def close(self): class SchemaRegistrySerializerDeserializer: def __init__( - self, - config: dict, - name_strategy: str = "topic_name", - **cfg, # pylint: disable=unused-argument + self, + config: dict, + name_strategy: str = "topic_name", + **cfg, # pylint: disable=unused-argument ) -> None: self.config = config self.state_lock = asyncio.Lock() @@ -168,6 +166,7 @@ def get_subject_name(self, topic_name: str, schema: str, subject_type: str, sche async def get_schema_for_subject(self, subject: str) -> TypedSchema: assert self.registry_client, "must not call this method after the object is closed." + # pylint: disable=unused-variable schema_id, schema, references = await self.registry_client.get_latest_schema(subject) async with self.state_lock: schema_ser = schema.__str__() @@ -184,7 +183,8 @@ async def get_id_for_schema(self, schema: str, subject: str, schema_type: Schema schema_ser = schema_typed.__str__() if schema_ser in self.schemas_to_ids: return self.schemas_to_ids[schema_ser] - schema_id = await self.registry_client.post_new_schema(subject, schema_typed) + schema_id = await self.registry_client.post_new_schema(subject, schema_typed) # pylint: disable=E1120 + async with self.state_lock: self.schemas_to_ids[schema_ser] = schema_id self.ids_to_schemas[schema_id] = schema_typed diff --git a/tests/integration/test_client.py b/tests/integration/test_client.py index 1467b5ccd..df9b7fb1e 100644 --- a/tests/integration/test_client.py +++ b/tests/integration/test_client.py @@ -9,7 +9,7 @@ async def test_remote_client(registry_async_client: Client) -> None: reg_cli = SchemaRegistryClient() reg_cli.client = registry_async_client subject = new_random_name("subject") - sc_id = await reg_cli.post_new_schema(subject, schema_avro) + sc_id = await reg_cli.post_new_schema(subject, schema_avro) # pylint: disable=E1120 assert sc_id >= 0 stored_schema = await reg_cli.get_schema_for_id(sc_id) assert stored_schema == schema_avro, f"stored schema {stored_schema.to_dict()} is not {schema_avro.to_dict()}" @@ -23,7 +23,7 @@ async def test_remote_client_tls(registry_async_client_tls: Client) -> None: reg_cli = SchemaRegistryClient() reg_cli.client = registry_async_client_tls subject = new_random_name("subject") - sc_id = await reg_cli.post_new_schema(subject, schema_avro) + sc_id = await reg_cli.post_new_schema(subject, schema_avro) # pylint: disable=E1120 assert sc_id >= 0 stored_schema = await reg_cli.get_schema_for_id(sc_id) assert stored_schema == schema_avro, f"stored schema {stored_schema.to_dict()} is not {schema_avro.to_dict()}" diff --git a/tests/integration/test_schema_protobuf.py b/tests/integration/test_schema_protobuf.py index a57eb67e1..c83ca39b9 100644 --- a/tests/integration/test_schema_protobuf.py +++ b/tests/integration/test_schema_protobuf.py @@ -8,9 +8,9 @@ from karapace.protobuf.kotlin_wrapper import trim_margin from tests.utils import create_subject_name_factory +import json import logging import pytest -import json baseurl = "http://localhost:8081" @@ -109,7 +109,7 @@ async def test_protobuf_schema_compatibility(registry_async_client: Client, trai async def test_protobuf_schema_references(registry_async_client: Client) -> None: - customer_schema = """ + customer_schema = """ |syntax = "proto3"; |package a1; |message Customer { @@ -117,13 +117,13 @@ async def test_protobuf_schema_references(registry_async_client: Client) -> None | int32 code = 2; |} |""" + customer_schema = trim_margin(customer_schema) res = await registry_async_client.post( - f"subjects/customer/versions", json={"schemaType": "PROTOBUF", "schema": customer_schema} + "subjects/customer/versions", json={"schemaType": "PROTOBUF", "schema": customer_schema} ) assert res.status_code == 200 assert "id" in res.json() - original_schema = """ |syntax = "proto3"; |package a1; @@ -139,24 +139,17 @@ async def test_protobuf_schema_references(registry_async_client: Client) -> None |""" original_schema = trim_margin(original_schema) - references = [{"name": "Customer.proto", - "subject": "customer", - "version": 1}] + references = [{"name": "Customer.proto", "subject": "customer", "version": 1}] res = await registry_async_client.post( - f"subjects/test_schema/versions", - json={"schemaType": "PROTOBUF", - "schema": original_schema, - "references": references} + "subjects/test_schema/versions", + json={"schemaType": "PROTOBUF", "schema": original_schema, "references": references}, ) assert res.status_code == 200 assert "id" in res.json() - res = await registry_async_client.get( - f"subjects/test_schema/versions/latest", json={} ) + res = await registry_async_client.get("subjects/test_schema/versions/latest", json={}) assert res.status_code == 200 myjson = res.json() assert "id" in myjson - references = [{"name": "Customer.proto", - "subject": "customer", - "version": 1}] + references = [{"name": "Customer.proto", "subject": "customer", "version": 1}] refs2 = json.loads(myjson["references"]) assert not any(x != y for x, y in zip(refs2, references)) From 44f853a4d04d423c0f91f0cd3acaee747c51982f Mon Sep 17 00:00:00 2001 From: Sergiy Zaschipas Date: Tue, 19 Apr 2022 23:41:07 +0300 Subject: [PATCH 03/24] fixup --- tests/integration/test_client_protobuf.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/integration/test_client_protobuf.py b/tests/integration/test_client_protobuf.py index 476dbd2de..6a1e186d4 100644 --- a/tests/integration/test_client_protobuf.py +++ b/tests/integration/test_client_protobuf.py @@ -10,11 +10,11 @@ async def test_remote_client_protobuf(registry_async_client): reg_cli = SchemaRegistryClient() reg_cli.client = registry_async_client subject = new_random_name("subject") - sc_id = await reg_cli.post_new_schema(subject, schema_protobuf) + sc_id = await reg_cli.post_new_schema(subject, schema_protobuf, None) assert sc_id >= 0 stored_schema = await reg_cli.get_schema_for_id(sc_id) assert stored_schema == schema_protobuf, f"stored schema {stored_schema} is not {schema_protobuf}" - stored_id, stored_schema = await reg_cli.get_latest_schema(subject) + stored_id, stored_schema, references = await reg_cli.get_latest_schema(subject) # pylint: disable=unused-variable assert stored_id == sc_id assert stored_schema == schema_protobuf @@ -25,10 +25,10 @@ async def test_remote_client_protobuf2(registry_async_client): reg_cli = SchemaRegistryClient() reg_cli.client = registry_async_client subject = new_random_name("subject") - sc_id = await reg_cli.post_new_schema(subject, schema_protobuf) + sc_id = await reg_cli.post_new_schema(subject, schema_protobuf, None) assert sc_id >= 0 stored_schema = await reg_cli.get_schema_for_id(sc_id) assert stored_schema == schema_protobuf, f"stored schema {stored_schema} is not {schema_protobuf}" - stored_id, stored_schema = await reg_cli.get_latest_schema(subject) + stored_id, stored_schema, references = await reg_cli.get_latest_schema(subject) # pylint: disable=unused-variable assert stored_id == sc_id assert stored_schema == schema_protobuf_after From 3498962c7386fd8f072cc0964e4e15b3458a57e9 Mon Sep 17 00:00:00 2001 From: Sergiy Zaschipas Date: Tue, 19 Apr 2022 23:47:31 +0300 Subject: [PATCH 04/24] fixup --- karapace/schema_models.py | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/karapace/schema_models.py b/karapace/schema_models.py index 6e6a41bae..b198bda41 100644 --- a/karapace/schema_models.py +++ b/karapace/schema_models.py @@ -13,7 +13,7 @@ ) from karapace.protobuf.schema import ProtobufSchema from karapace.utils import json_encode -from typing import Any, Dict, Union, List +from typing import Any, Dict, List, Union import json import logging @@ -109,13 +109,11 @@ def __repr__(self) -> str: return f"TypedSchema(type={self.schema_type}, schema={json_encode(self.to_dict())})" def __eq__(self, other: Any) -> bool: - return isinstance(other, - TypedSchema) and self.__str__() == other.__str__() and self.schema_type is other.schema_type + return isinstance(other, TypedSchema) and self.__str__() == other.__str__() and self.schema_type is other.schema_type class ValidatedTypedSchema(TypedSchema): - def __init__(self, schema_type: SchemaType, schema_str: str, - schema: Union[Draft7Validator, AvroSchema, ProtobufSchema]): + def __init__(self, schema_type: SchemaType, schema_str: str, schema: Union[Draft7Validator, AvroSchema, ProtobufSchema]): super().__init__(schema_type=schema_type, schema_str=schema_str) self.schema = schema @@ -142,15 +140,15 @@ def parse(schema_type: SchemaType, schema_str: str) -> "ValidatedTypedSchema": try: parsed_schema = parse_protobuf_schema_definition(schema_str) except ( - TypeError, - SchemaError, - AssertionError, - ProtobufParserRuntimeException, - IllegalStateException, - IllegalArgumentException, - ProtobufError, - ProtobufException, - ProtobufSchemaParseException, + TypeError, + SchemaError, + AssertionError, + ProtobufParserRuntimeException, + IllegalStateException, + IllegalArgumentException, + ProtobufError, + ProtobufException, + ProtobufSchemaParseException, ) as e: log.exception("Unexpected error: %s \n schema:[%s]", e, schema_str) raise InvalidSchema from e From 9e9b16dde4ecf0377bb776e6197c572ecaf897b9 Mon Sep 17 00:00:00 2001 From: Sergiy Zaschipas Date: Wed, 20 Apr 2022 09:43:54 +0300 Subject: [PATCH 05/24] fixup --- .gitignore | 1 + karapace/serialization.py | 8 ++------ tests/integration/test_client_protobuf.py | 2 +- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/.gitignore b/.gitignore index 67b386bbf..4a162fa59 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,4 @@ __pycache__/ /kafka_*/ venv /karapace/version.py +.run diff --git a/karapace/serialization.py b/karapace/serialization.py index 2fe4359e3..1716690d7 100644 --- a/karapace/serialization.py +++ b/karapace/serialization.py @@ -85,7 +85,7 @@ async def post_new_schema(self, subject: str, schema: ValidatedTypedSchema, refe raise SchemaRetrievalError(result.json()) return result.json()["id"] - async def get_latest_schema(self, subject: str) -> Tuple[int, ValidatedTypedSchema, References]: + async def get_latest_schema(self, subject: str) -> Tuple[int, ValidatedTypedSchema]: result = await self.client.get(f"subjects/{quote(subject)}/versions/latest") if not result.ok: raise SchemaRetrievalError(result.json()) @@ -94,12 +94,8 @@ async def get_latest_schema(self, subject: str) -> Tuple[int, ValidatedTypedSche raise SchemaRetrievalError(f"Invalid result format: {json_result}") try: schema_type = SchemaType(json_result.get("schemaType", "AVRO")) - if json_result["references"]: - references = References(schema_type, json_result["references"]) - else: - references = None - return json_result["id"], ValidatedTypedSchema.parse(schema_type, json_result["schema"]), references + return json_result["id"], ValidatedTypedSchema.parse(schema_type, json_result["schema"]) except InvalidSchema as e: raise SchemaRetrievalError(f"Failed to parse schema string from response: {json_result}") from e diff --git a/tests/integration/test_client_protobuf.py b/tests/integration/test_client_protobuf.py index 6a1e186d4..d73e7b9c8 100644 --- a/tests/integration/test_client_protobuf.py +++ b/tests/integration/test_client_protobuf.py @@ -29,6 +29,6 @@ async def test_remote_client_protobuf2(registry_async_client): assert sc_id >= 0 stored_schema = await reg_cli.get_schema_for_id(sc_id) assert stored_schema == schema_protobuf, f"stored schema {stored_schema} is not {schema_protobuf}" - stored_id, stored_schema, references = await reg_cli.get_latest_schema(subject) # pylint: disable=unused-variable + stored_id, stored_schema = await reg_cli.get_latest_schema(subject) # pylint: disable=unused-variable assert stored_id == sc_id assert stored_schema == schema_protobuf_after From 944cd3953fa1733173d759d7bc606f835d71a5fe Mon Sep 17 00:00:00 2001 From: Sergiy Zaschipas Date: Wed, 20 Apr 2022 09:51:42 +0300 Subject: [PATCH 06/24] fixup --- karapace/serialization.py | 2 +- tests/integration/test_client_protobuf.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/karapace/serialization.py b/karapace/serialization.py index 1716690d7..90f2b8200 100644 --- a/karapace/serialization.py +++ b/karapace/serialization.py @@ -163,7 +163,7 @@ def get_subject_name(self, topic_name: str, schema: str, subject_type: str, sche async def get_schema_for_subject(self, subject: str) -> TypedSchema: assert self.registry_client, "must not call this method after the object is closed." # pylint: disable=unused-variable - schema_id, schema, references = await self.registry_client.get_latest_schema(subject) + schema_id, schema = await self.registry_client.get_latest_schema(subject) async with self.state_lock: schema_ser = schema.__str__() self.schemas_to_ids[schema_ser] = schema_id diff --git a/tests/integration/test_client_protobuf.py b/tests/integration/test_client_protobuf.py index d73e7b9c8..8d5f43b9c 100644 --- a/tests/integration/test_client_protobuf.py +++ b/tests/integration/test_client_protobuf.py @@ -14,7 +14,7 @@ async def test_remote_client_protobuf(registry_async_client): assert sc_id >= 0 stored_schema = await reg_cli.get_schema_for_id(sc_id) assert stored_schema == schema_protobuf, f"stored schema {stored_schema} is not {schema_protobuf}" - stored_id, stored_schema, references = await reg_cli.get_latest_schema(subject) # pylint: disable=unused-variable + stored_id, stored_schema = await reg_cli.get_latest_schema(subject) # pylint: disable=unused-variable assert stored_id == sc_id assert stored_schema == schema_protobuf From 956b68ef3e2cba8b9287b19722453fbab51fbafe Mon Sep 17 00:00:00 2001 From: Sergiy Zaschipas Date: Tue, 26 Apr 2022 21:35:45 +0300 Subject: [PATCH 07/24] fixup --- karapace/serialization.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/karapace/serialization.py b/karapace/serialization.py index 90f2b8200..df7d3fb53 100644 --- a/karapace/serialization.py +++ b/karapace/serialization.py @@ -72,7 +72,9 @@ def __init__(self, schema_registry_url: str = "http://localhost:8081", server_ca self.client = Client(server_uri=schema_registry_url, server_ca=server_ca) self.base_url = schema_registry_url - async def post_new_schema(self, subject: str, schema: ValidatedTypedSchema, references: Optional[References]) -> int: + async def post_new_schema( + self, subject: str, schema: ValidatedTypedSchema, references: Optional[References] = None + ) -> int: if schema.schema_type is SchemaType.PROTOBUF: if references: payload = {"schema": str(schema), "schemaType": schema.schema_type.value, "references": references.json()} @@ -109,8 +111,9 @@ async def get_schema_for_id(self, schema_id: int) -> Tuple[ValidatedTypedSchema, raise SchemaRetrievalError(f"Invalid result format: {json_result}") try: schema_type = SchemaType(json_result.get("schemaType", "AVRO")) - if json_result["references"]: - references = References(schema_type, json_result["references"]) + references_str = json_result.get("references") + if references_str: + references = References(schema_type, references_str) else: references = None From 764facfb763105eeea261b1ff57efa974bc6ece8 Mon Sep 17 00:00:00 2001 From: Sergiy Zaschipas Date: Mon, 2 May 2022 01:25:09 +0300 Subject: [PATCH 08/24] debugging --- karapace/schema_models.py | 53 +++++++++++++++++++---- karapace/schema_reader.py | 8 ++-- karapace/schema_registry_apis.py | 34 ++++++++------- karapace/serialization.py | 5 ++- tests/integration/test_schema_protobuf.py | 3 +- 5 files changed, 69 insertions(+), 34 deletions(-) diff --git a/karapace/schema_models.py b/karapace/schema_models.py index 07d09bae9..24bbb1718 100644 --- a/karapace/schema_models.py +++ b/karapace/schema_models.py @@ -12,8 +12,9 @@ SchemaParseException as ProtobufSchemaParseException, ) from karapace.protobuf.schema import ProtobufSchema +from karapace.typing import JsonData from karapace.utils import json_encode -from typing import Any, Dict, List, Union +from typing import Any, Dict, Optional, Union import json import ujson @@ -80,7 +81,7 @@ class SchemaType(str, Enum): class TypedSchema: - def __init__(self, schema_type: SchemaType, schema_str: str): + def __init__(self, schema_type: SchemaType, schema_str: str, references: Optional["References"] = None): """Schema with type information Args: @@ -89,6 +90,7 @@ def __init__(self, schema_type: SchemaType, schema_str: str): """ self.schema_type = schema_type self.schema_str = schema_str + self.references = references def to_dict(self) -> Dict[str, Any]: if self.schema_type is SchemaType.PROTOBUF: @@ -105,17 +107,42 @@ def __repr__(self) -> str: return f"TypedSchema(type={self.schema_type}, schema={str(self)})" return f"TypedSchema(type={self.schema_type}, schema={json_encode(self.to_dict())})" + def get_references(self) -> Optional["References"]: + return self.references + def __eq__(self, other: Any) -> bool: - return isinstance(other, TypedSchema) and self.__str__() == other.__str__() and self.schema_type is other.schema_type + a = isinstance(other, TypedSchema) and self.schema_type is other.schema_type and self.__str__() == other.__str__() + + if not a: + return False + x = self.get_references() + y = other.get_references() + if x: + if y: + if x == y: + return True + else: + return False + else: + if y: + return False + + return True class ValidatedTypedSchema(TypedSchema): - def __init__(self, schema_type: SchemaType, schema_str: str, schema: Union[Draft7Validator, AvroSchema, ProtobufSchema]): - super().__init__(schema_type=schema_type, schema_str=schema_str) + def __init__( + self, + schema_type: SchemaType, + schema_str: str, + schema: Union[Draft7Validator, AvroSchema, ProtobufSchema], + references: Optional["References"] = None, + ): + super().__init__(schema_type=schema_type, schema_str=schema_str, references=references) self.schema = schema @staticmethod - def parse(schema_type: SchemaType, schema_str: str) -> "ValidatedTypedSchema": + def parse(schema_type: SchemaType, schema_str: str, references: Optional["References"] = None) -> "ValidatedTypedSchema": if schema_type not in [SchemaType.AVRO, SchemaType.JSONSCHEMA, SchemaType.PROTOBUF]: raise InvalidSchema(f"Unknown parser {schema_type} for {schema_str}") @@ -151,7 +178,9 @@ def parse(schema_type: SchemaType, schema_str: str) -> "ValidatedTypedSchema": else: raise InvalidSchema(f"Unknown parser {schema_type} for {schema_str}") - return ValidatedTypedSchema(schema_type=schema_type, schema_str=schema_str, schema=parsed_schema) + return ValidatedTypedSchema( + schema_type=schema_type, schema_str=schema_str, schema=parsed_schema, references=references + ) def __str__(self) -> str: if self.schema_type == SchemaType.PROTOBUF: @@ -160,7 +189,7 @@ def __str__(self) -> str: class References: - def __init__(self, schema_type: SchemaType, references: List): + def __init__(self, schema_type: SchemaType, references: JsonData): """Schema with type information Args: @@ -170,5 +199,11 @@ def __init__(self, schema_type: SchemaType, references: List): self.schema_type = schema_type self.references = references + def val(self) -> JsonData: + return self.references + def json(self) -> str: - return json_encode(self.references) + return str(json_encode(self.references, sort_keys=True)) + + def __eq__(self, other: Any) -> bool: + return self.json() == other.json() diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index b76cd172e..11e78778d 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -348,10 +348,7 @@ def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None: subjects_schemas = self.subjects[schema_subject]["schemas"] - typed_schema = TypedSchema( - schema_type=schema_type_parsed, - schema_str=schema_str, - ) + typed_schema = TypedSchema(schema_type=schema_type_parsed, schema_str=schema_str, references=schema_references) schema = { "schema": typed_schema, "version": schema_version, @@ -359,7 +356,8 @@ def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None: "deleted": schema_deleted, } if schema_references: - subjects_schemas[schema_version]["references"] = schema_references + schema["references"] = schema_references + if schema_version in subjects_schemas: LOG.info("Updating entry subject: %r version: %r id: %r", schema_subject, schema_version, schema_id) else: diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index 42864b326..c75672dbd 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -283,7 +283,7 @@ def send_schema_message( "deleted": deleted, } if references: - valuedict["references"] = references.json() + valuedict["references"] = references.val() if schema.schema_type is not SchemaType.AVRO: valuedict["schemaType"] = schema.schema_type value = json_encode(valuedict) @@ -804,8 +804,24 @@ def write_new_schema_local( ) -> NoReturn: """Since we're the master we get to write the new schema""" self.log.info("Writing new schema locally since we're the master") + new_schema_references = None + if body.get("references"): + try: + new_schema_references = References(schema_type, body["references"]) + except InvalidReferences: + human_error = "Provided references is not valid" + self.r( + body={ + "error_code": SchemaErrorCodes.INVALID_REFERECES.value, + "message": f"Invalid {schema_type} references. Error: {human_error}", + }, + content_type=content_type, + status=HTTPStatus.UNPROCESSABLE_ENTITY, + ) try: - new_schema = ValidatedTypedSchema.parse(schema_type=schema_type, schema_str=body["schema"]) + new_schema = ValidatedTypedSchema.parse( + schema_type=schema_type, schema_str=body["schema"], references=new_schema_references + ) except (InvalidSchema, InvalidSchemaType) as e: self.log.warning("Invalid schema: %r", body["schema"], exc_info=True) if isinstance(e.__cause__, (SchemaParseException, ValueError)): @@ -820,20 +836,6 @@ def write_new_schema_local( content_type=content_type, status=HTTPStatus.UNPROCESSABLE_ENTITY, ) - new_schema_references = None - if body.get("references"): - try: - new_schema_references = References(schema_type, body["references"]) - except InvalidReferences: - human_error = "Provided references is not valid" - self.r( - body={ - "error_code": SchemaErrorCodes.INVALID_REFERECES.value, - "message": f"Invalid {schema_type} references. Error: {human_error}", - }, - content_type=content_type, - status=HTTPStatus.UNPROCESSABLE_ENTITY, - ) if subject not in self.ksr.subjects or not self.ksr.subjects.get(subject)["schemas"]: schema_id = self.ksr.get_schema_id(new_schema) diff --git a/karapace/serialization.py b/karapace/serialization.py index 3fbd839f1..22b5b8b7c 100644 --- a/karapace/serialization.py +++ b/karapace/serialization.py @@ -113,8 +113,9 @@ async def get_schema_for_id(self, schema_id: int) -> Tuple[ValidatedTypedSchema, references = References(schema_type, references_str) else: references = None - - return ValidatedTypedSchema.parse(schema_type, json_result["schema"]), references + if references: + return ValidatedTypedSchema.parse(schema_type, json_result["schema"]), references + return ValidatedTypedSchema.parse(schema_type, json_result["schema"]) except InvalidSchema as e: raise SchemaRetrievalError(f"Failed to parse schema string from response: {json_result}") from e diff --git a/tests/integration/test_schema_protobuf.py b/tests/integration/test_schema_protobuf.py index c83ca39b9..f5e7fafa1 100644 --- a/tests/integration/test_schema_protobuf.py +++ b/tests/integration/test_schema_protobuf.py @@ -8,7 +8,6 @@ from karapace.protobuf.kotlin_wrapper import trim_margin from tests.utils import create_subject_name_factory -import json import logging import pytest @@ -151,5 +150,5 @@ async def test_protobuf_schema_references(registry_async_client: Client) -> None myjson = res.json() assert "id" in myjson references = [{"name": "Customer.proto", "subject": "customer", "version": 1}] - refs2 = json.loads(myjson["references"]) + refs2 = myjson["references"] assert not any(x != y for x, y in zip(refs2, references)) From c1c4c50d43cfaefb80d65bbdef94560b1c9bcb20 Mon Sep 17 00:00:00 2001 From: Sergiy Zaschipas Date: Wed, 11 May 2022 09:56:53 +0300 Subject: [PATCH 09/24] referencedby/delete workarounds --- karapace/schema_reader.py | 17 ++++- karapace/schema_registry_apis.py | 86 ++++++++++++++++++++--- tests/__init__.py | 0 tests/integration/test_schema_protobuf.py | 12 ++++ 4 files changed, 105 insertions(+), 10 deletions(-) create mode 100644 tests/__init__.py diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 11e78778d..ad283ab0b 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -15,7 +15,7 @@ from karapace.statsd import StatsClient from karapace.utils import KarapaceKafkaClient from threading import Event, Lock, Thread -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional import logging import ujson @@ -26,6 +26,7 @@ Schema = Dict[str, Any] # Container type for a subject, with configuration settings and all the schemas SubjectData = Dict[str, Any] +Referents = List # The value `0` is a valid offset and it represents the first message produced # to a topic, therefore it can not be used. @@ -139,6 +140,7 @@ def __init__( self.config = config self.subjects: Dict[Subject, SubjectData] = {} self.schemas: Dict[int, TypedSchema] = {} + self.referenced_by: Dict[str, Referents] = {} self.global_schema_id = 0 self.admin_client: Optional[KafkaAdminClient] = None self.topic_replication_factor = self.config["replication_factor"] @@ -284,6 +286,7 @@ def _handle_msg_config(self, key: dict, value: Optional[dict]) -> None: else: LOG.info("Setting subject: %r config to: %r, value: %r", subject, value["compatibilityLevel"], value) self.subjects[subject]["compatibility"] = value["compatibilityLevel"] + elif value is not None: LOG.info("Setting global config to: %r, value: %r", value["compatibilityLevel"], value) self.config["compatibility"] = value["compatibilityLevel"] @@ -326,8 +329,8 @@ def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None: schema_id = value["id"] schema_version = value["version"] schema_deleted = value.get("deleted", False) - schema_references = value.get("references", None) + schema_references = value.get("references", None) try: schema_type_parsed = SchemaType(schema_type) except ValueError: @@ -364,6 +367,16 @@ def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None: LOG.info("Adding entry subject: %r version: %r id: %r", schema_subject, schema_version, schema_id) subjects_schemas[schema_version] = schema + if schema_references: + for ref in schema_references: + ref_str = str(ref["subject"]) + "_" + str(ref["version"]) + referents = self.referenced_by.get(ref_str, None) + if referents: + LOG.info("Adding entry subject referenced_by : %r", ref_str) + referents.append(schema_id) + else: + LOG.info("Adding entry subject referenced_by : %r", ref_str) + self.referenced_by[ref_str] = [schema_id] with self.id_lock: self.schemas[schema_id] = typed_schema diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index c75672dbd..7d59cc7ff 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -24,6 +24,7 @@ @unique class SchemaErrorCodes(Enum): EMPTY_SCHEMA = 42201 + REFERENCE_EXISTS_ERROR_CODE = 42206 HTTP_NOT_FOUND = HTTPStatus.NOT_FOUND.value HTTP_CONFLICT = HTTPStatus.CONFLICT.value HTTP_UNPROCESSABLE_ENTITY = HTTPStatus.UNPROCESSABLE_ENTITY.value @@ -39,7 +40,9 @@ class SchemaErrorCodes(Enum): INVALID_COMPATIBILITY_LEVEL = 42203 INVALID_AVRO_SCHEMA = 44201 INVALID_PROTOBUF_SCHEMA = 44202 - INVALID_REFERECES = 442203 + INVALID_REFERECES = 44203 + REFERENCES_SUPPORT_NOT_IMPLEMENTED = 44501 + SCHEMAVERSION_HAS_REFERENCES = 44503 NO_MASTER_ERROR = 50003 @@ -51,6 +54,7 @@ class SchemaErrorMessages(Enum): "forward, full, backward_transitive, forward_transitive, and " "full_transitive" ) + REFERENCES_SUPPORT_NOT_IMPLEMENTED = "Schema references are not supported for '{schema_type}' schema type yet" class KarapaceSchemaRegistry(KarapaceBase): @@ -613,6 +617,20 @@ async def _subject_version_delete_local(self, content_type: str, subject: str, v status=HTTPStatus.NOT_FOUND, ) + referenced_by = self.ksr.referenced_by.get(str(subject) + "_" + str(version), None) + if referenced_by and len(referenced_by) > 0: + self.r( + body={ + "error_code": SchemaErrorCodes.SCHEMAVERSION_HAS_REFERENCES.value, + "message": ( + f"Subject '{subject}' Version {version} was not deleted " + "because it is referenced by schemas with ids:[" + ", ".join(map(str, referenced_by)) + "]" + ), + }, + content_type=content_type, + status=HTTPStatus.NOT_FOUND, + ) + schema_id = subject_schema_data["id"] schema = subject_schema_data["schema"] self.send_schema_message( @@ -660,7 +678,50 @@ async def subject_version_schema_get(self, content_type, *, subject, version): self.r(schema_data["schema"].schema_str, content_type) async def subject_version_referencedby_get(self, content_type, *, subject, version): - pass + self._validate_version(content_type, version) + subject_data = self._subject_get(subject, content_type) + schema_data = None + max_version = max(subject_data["schemas"]) + if version == "latest": + version = max(subject_data["schemas"]) + schema_data = subject_data["schemas"][version] + elif int(version) <= max_version: + schema_data = subject_data["schemas"].get(int(version)) + else: + self.r( + body={ + "error_code": SchemaErrorCodes.VERSION_NOT_FOUND.value, + "message": f"Version {version} not found.", + }, + content_type=content_type, + status=HTTPStatus.NOT_FOUND, + ) + if not schema_data: + self.r( + body={ + "error_code": SchemaErrorCodes.VERSION_NOT_FOUND.value, + "message": f"Version {version} not found.", + }, + content_type=content_type, + status=HTTPStatus.NOT_FOUND, + ) + + if schema_data["schema"].schema_type != SchemaType.PROTOBUF: + self.r( + body={ + "error_code": SchemaErrorCodes.REFERENCES_SUPPORT_NOT_IMPLEMENTED.value, + "message": SchemaErrorMessages.REFERENCES_SUPPORT_NOT_IMPLEMENTED.value.format( + schema_type=schema_data["schema"].schema_type + ), + }, + content_type=content_type, + status=HTTPStatus.UNPROCESSABLE_ENTITY, + ) + + referenced_by = self.ksr.referenced_by.get(str(subject) + "_" + str(version), None) + if not referenced_by: + referenced_by = list() + self.r(list(referenced_by), content_type, status=HTTPStatus.OK) async def subject_versions_list(self, content_type, *, subject): subject_data = self._subject_get(subject, content_type) @@ -805,9 +866,22 @@ def write_new_schema_local( """Since we're the master we get to write the new schema""" self.log.info("Writing new schema locally since we're the master") new_schema_references = None - if body.get("references"): + references = body.get("references") + if references: + if schema_type != SchemaType.PROTOBUF: + self.r( + body={ + "error_code": SchemaErrorCodes.REFERENCES_SUPPORT_NOT_IMPLEMENTED.value, + "message": SchemaErrorMessages.REFERENCES_SUPPORT_NOT_IMPLEMENTED.value.format( + schema_type=schema_type.value + ), + }, + content_type=content_type, + status=HTTPStatus.UNPROCESSABLE_ENTITY, + ) + try: - new_schema_references = References(schema_type, body["references"]) + new_schema_references = References(schema_type, references) except InvalidReferences: human_error = "Provided references is not valid" self.r( @@ -969,7 +1043,3 @@ def no_master_error(self, content_type): content_type=content_type, status=HTTPStatus.INTERNAL_SERVER_ERROR, ) - - def get_referencedby(self, subject, version): - - pass diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/integration/test_schema_protobuf.py b/tests/integration/test_schema_protobuf.py index f5e7fafa1..1edff88fc 100644 --- a/tests/integration/test_schema_protobuf.py +++ b/tests/integration/test_schema_protobuf.py @@ -152,3 +152,15 @@ async def test_protobuf_schema_references(registry_async_client: Client) -> None references = [{"name": "Customer.proto", "subject": "customer", "version": 1}] refs2 = myjson["references"] assert not any(x != y for x, y in zip(refs2, references)) + res = await registry_async_client.get("subjects/customer/versions/latest/referencedby", json={}) + assert res.status_code == 200 + myjson = res.json() + referents = [2] + assert not any(x != y for x, y in zip(myjson, referents)) +# res = await registry_async_client.delete("subjects/customer/versions/latest") +# assert res.status_code == 200 + + +#TODO +#AVRO references error +#JSONSCHEMA references error \ No newline at end of file From 732a9ed4d7b67dd1944ed7c12460e3ba9d8cdb9e Mon Sep 17 00:00:00 2001 From: Sergiy Zaschipas Date: Wed, 11 May 2022 10:12:54 +0300 Subject: [PATCH 10/24] referencedby/delete workarounds --- tests/integration/test_schema_protobuf.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_schema_protobuf.py b/tests/integration/test_schema_protobuf.py index 1edff88fc..e37eac84a 100644 --- a/tests/integration/test_schema_protobuf.py +++ b/tests/integration/test_schema_protobuf.py @@ -157,10 +157,12 @@ async def test_protobuf_schema_references(registry_async_client: Client) -> None myjson = res.json() referents = [2] assert not any(x != y for x, y in zip(myjson, referents)) + + # res = await registry_async_client.delete("subjects/customer/versions/latest") # assert res.status_code == 200 -#TODO -#AVRO references error -#JSONSCHEMA references error \ No newline at end of file +# TODO +# AVRO references error +# JSONSCHEMA references error From 1f33b7dae5c516680554eb18e7cc1028f5851dad Mon Sep 17 00:00:00 2001 From: Sergiy Zaschipas Date: Tue, 17 May 2022 00:38:39 +0300 Subject: [PATCH 11/24] Add basic support of references with basic tests --- karapace/schema_registry_apis.py | 66 +++++++++++++++++++++-- tests/integration/test_schema.py | 11 ++++ tests/integration/test_schema_protobuf.py | 20 ++----- 3 files changed, 78 insertions(+), 19 deletions(-) diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index 7d59cc7ff..8fe81b106 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -509,6 +509,21 @@ async def _subject_delete_local(self, content_type: str, subject: str, permanent latest_schema_id = 0 if permanent: + for version, value in list(subject_data["schemas"].items()): + referenced_by = self.ksr.referenced_by.get(str(subject) + "_" + str(version), None) + if referenced_by and len(referenced_by) > 0: + self.r( + body={ + "error_code": SchemaErrorCodes.SCHEMAVERSION_HAS_REFERENCES.value, + "message": ( + f"Subject '{subject}' Version {version} cannot be not be deleted " + "because it is referenced by schemas with ids:[" + ", ".join(map(str, referenced_by)) + "]" + ), + }, + content_type=content_type, + status=HTTPStatus.NOT_FOUND, + ) + for version, value in list(subject_data["schemas"].items()): schema_id = value.get("id") self.log.info("Permanently deleting subject '%s' version %s (schema id=%s)", subject, version, schema_id) @@ -516,6 +531,19 @@ async def _subject_delete_local(self, content_type: str, subject: str, permanent subject=subject, schema=None, schema_id=schema_id, version=version, deleted=True, references=None ) else: + referenced_by = self.ksr.referenced_by.get(str(subject) + "_" + str(latest_schema_id), None) + if referenced_by and len(referenced_by) > 0: + self.r( + body={ + "error_code": SchemaErrorCodes.SCHEMAVERSION_HAS_REFERENCES.value, + "message": ( + f"Subject '{subject}' Version {latest_schema_id} cannot be not be deleted " + "because it is referenced by schemas with ids:[" + ", ".join(map(str, referenced_by)) + "]" + ), + }, + content_type=content_type, + status=HTTPStatus.NOT_FOUND, + ) self.send_delete_subject_message(subject, latest_schema_id) self.r(version_list, content_type, status=HTTPStatus.OK) @@ -569,9 +597,6 @@ async def subject_version_get(self, content_type, *, subject, version, return_di "id": schema_id, "schema": schema.schema_str, } - references = schema_data.get("references") - if references: - ret["references"] = references if schema.schema_type is not SchemaType.AVRO: ret["schemaType"] = schema.schema_type if return_dict: @@ -802,8 +827,39 @@ async def subjects_schema_post(self, content_type, *, subject, request): ) schema_str = body["schema"] schema_type = self._validate_schema_type(content_type=content_type, data=body) + + new_schema_references = None + references = body.get("references") + if references: + if schema_type != SchemaType.PROTOBUF: + self.r( + body={ + "error_code": SchemaErrorCodes.REFERENCES_SUPPORT_NOT_IMPLEMENTED.value, + "message": SchemaErrorMessages.REFERENCES_SUPPORT_NOT_IMPLEMENTED.value.format( + schema_type=schema_type.value + ), + }, + content_type=content_type, + status=HTTPStatus.UNPROCESSABLE_ENTITY, + ) + + try: + new_schema_references = References(schema_type, references) + except InvalidReferences: + human_error = "Provided references is not valid" + self.r( + body={ + "error_code": SchemaErrorCodes.INVALID_REFERECES.value, + "message": f"Invalid {schema_type} references. Error: {human_error}", + }, + content_type=content_type, + status=HTTPStatus.UNPROCESSABLE_ENTITY, + ) + try: - new_schema = ValidatedTypedSchema.parse(schema_type, schema_str) + new_schema = ValidatedTypedSchema.parse( + schema_type=schema_type, schema_str=schema_str, references=new_schema_references + ) except InvalidSchema: self.log.exception("No proper parser found") self.r( @@ -814,11 +870,13 @@ async def subjects_schema_post(self, content_type, *, subject, request): content_type=content_type, status=HTTPStatus.INTERNAL_SERVER_ERROR, ) + for schema in subject_data["schemas"].values(): validated_typed_schema = ValidatedTypedSchema.parse(schema["schema"].schema_type, schema["schema"].schema_str) if ( validated_typed_schema.schema_type == new_schema.schema_type and validated_typed_schema.schema == new_schema.schema + and schema.get("references", None) == new_schema_references ): ret = { "subject": subject, diff --git a/tests/integration/test_schema.py b/tests/integration/test_schema.py index 9f816500f..a6c6764e1 100644 --- a/tests/integration/test_schema.py +++ b/tests/integration/test_schema.py @@ -1240,6 +1240,17 @@ async def test_schema_subject_post_invalid(registry_async_client: Client) -> Non assert res.json()["error_code"] == 40401 assert res.json()["message"] == f"Subject '{subject_3}' not found." + schema_str = ujson.dumps({"type": "string"}) + # Create the subject + subject_1 = subject_name_factory() + res = await registry_async_client.post( + f"subjects/{subject_1}/versions", + json={"schema": schema_str, "references": [{"name": "Customer.avro", "subject": "customer", "version": 1}]}, + ) + assert res.status_code == 422 + assert res.json()["error_code"] == 44501 + assert res.json()["message"] == "Schema references are not supported for 'AVRO' schema type yet" + @pytest.mark.parametrize("trail", ["", "/"]) async def test_schema_lifecycle(registry_async_client: Client, trail: str) -> None: diff --git a/tests/integration/test_schema_protobuf.py b/tests/integration/test_schema_protobuf.py index e37eac84a..456f8872b 100644 --- a/tests/integration/test_schema_protobuf.py +++ b/tests/integration/test_schema_protobuf.py @@ -145,24 +145,14 @@ async def test_protobuf_schema_references(registry_async_client: Client) -> None ) assert res.status_code == 200 assert "id" in res.json() - res = await registry_async_client.get("subjects/test_schema/versions/latest", json={}) - assert res.status_code == 200 - myjson = res.json() - assert "id" in myjson - references = [{"name": "Customer.proto", "subject": "customer", "version": 1}] - refs2 = myjson["references"] - assert not any(x != y for x, y in zip(refs2, references)) res = await registry_async_client.get("subjects/customer/versions/latest/referencedby", json={}) assert res.status_code == 200 myjson = res.json() referents = [2] assert not any(x != y for x, y in zip(myjson, referents)) - -# res = await registry_async_client.delete("subjects/customer/versions/latest") -# assert res.status_code == 200 - - -# TODO -# AVRO references error -# JSONSCHEMA references error + res = await registry_async_client.delete("subjects/customer/versions/1") + assert res.status_code == 404 + match_msg = "Subject 'customer' Version 1 was not deleted because it is referenced by schemas with ids:[2]" + myjson = res.json() + assert myjson["error_code"] == 44503 and myjson["message"] == match_msg From 93cd2419392c5cdae8bfbf1467be8b6b7c1131e4 Mon Sep 17 00:00:00 2001 From: Sujay Bhowmick Date: Mon, 23 May 2022 18:11:42 +1000 Subject: [PATCH 12/24] removed reference for ujson --- tests/integration/test_schema.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_schema.py b/tests/integration/test_schema.py index 27fed7457..81fc2f6ed 100644 --- a/tests/integration/test_schema.py +++ b/tests/integration/test_schema.py @@ -1235,7 +1235,7 @@ async def test_schema_subject_post_invalid(registry_async_client: Client) -> Non assert res.json()["error_code"] == 40401 assert res.json()["message"] == f"Subject '{subject_3}' not found." - schema_str = ujson.dumps({"type": "string"}) + schema_str = json.dumps({"type": "string"}) # Create the subject subject_1 = subject_name_factory() res = await registry_async_client.post( From 2e9a8c6cc783344a150b3217da5e7f3048512bf7 Mon Sep 17 00:00:00 2001 From: Sujay Bhowmick Date: Mon, 23 May 2022 18:51:15 +1000 Subject: [PATCH 13/24] added comma at the end --- karapace/schema_reader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index b8d9f16e5..30a30c18f 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -370,7 +370,7 @@ def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None: typed_schema = TypedSchema( schema_type=schema_type_parsed, schema_str=schema_str, - references=schema_references + references=schema_references, ) schema = { "schema": typed_schema, From 7f2cf0175887e89317bb975a63f3d814dd6cb1e1 Mon Sep 17 00:00:00 2001 From: Sergiy Zaschipas Date: Sun, 19 Jun 2022 21:00:59 +0300 Subject: [PATCH 14/24] Update schema_models.py fixed docstring --- karapace/schema_models.py | 1 + 1 file changed, 1 insertion(+) diff --git a/karapace/schema_models.py b/karapace/schema_models.py index 85c8bc965..acd211e52 100644 --- a/karapace/schema_models.py +++ b/karapace/schema_models.py @@ -86,6 +86,7 @@ def __init__(self, schema_type: SchemaType, schema_str: str, references: Optiona Args: schema_type (SchemaType): The type of the schema schema_str (str): The original schema string + references(References): The references which schema reference. """ self.schema_type = schema_type self.schema_str = schema_str From 0f2fab68ddce74ee17dd3b85075e1297c9443412 Mon Sep 17 00:00:00 2001 From: Sergiy Zaschipas Date: Sun, 19 Jun 2022 22:12:30 +0300 Subject: [PATCH 15/24] Update schema_models.py fiexed equation functions --- karapace/schema_models.py | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/karapace/schema_models.py b/karapace/schema_models.py index acd211e52..f3265f987 100644 --- a/karapace/schema_models.py +++ b/karapace/schema_models.py @@ -111,24 +111,14 @@ def get_references(self) -> Optional["References"]: return self.references def __eq__(self, other: Any) -> bool: - a = isinstance(other, TypedSchema) and self.schema_type is other.schema_type and self.__str__() == other.__str__() - - if not a: + schema_is_equal = isinstance(other, TypedSchema) and \ + self.schema_type is other.schema_type and self.__str__() == other.__str__() + if not schema_is_equal: return False - x = self.get_references() - y = other.get_references() - if x: - if y: - if x == y: - return True - else: - return False + if self.references is not None: + return self.references == other.references else: - if y: - return False - - return True - + return other.references is None class ValidatedTypedSchema(TypedSchema): def __init__( @@ -206,4 +196,6 @@ def json(self) -> str: return str(json_encode(self.references, sort_keys=True)) def __eq__(self, other: Any) -> bool: + if other is None or not isinstance(other, References): + return False return self.json() == other.json() From bbbb5a30ded8af0b3aa9afe823cf95b981e4f0a0 Mon Sep 17 00:00:00 2001 From: Sergiy Zaschipas Date: Sun, 19 Jun 2022 22:20:04 +0300 Subject: [PATCH 16/24] Update schema_models.py --- karapace/schema_models.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/karapace/schema_models.py b/karapace/schema_models.py index f3265f987..5567e06ae 100644 --- a/karapace/schema_models.py +++ b/karapace/schema_models.py @@ -86,7 +86,7 @@ def __init__(self, schema_type: SchemaType, schema_str: str, references: Optiona Args: schema_type (SchemaType): The type of the schema schema_str (str): The original schema string - references(References): The references which schema reference. + references(References): The references of schema """ self.schema_type = schema_type self.schema_str = schema_str @@ -184,7 +184,7 @@ def __init__(self, schema_type: SchemaType, references: JsonData): Args: schema_type (SchemaType): The type of the schema - references (str): The original schema string + references (str): The references of schema in Kafka/Json representation """ self.schema_type = schema_type self.references = references From 1d2b7c455aee47ed82775d89aa0e4a7f1aa472c8 Mon Sep 17 00:00:00 2001 From: Sergiy Zaschipas Date: Sun, 19 Jun 2022 22:22:03 +0300 Subject: [PATCH 17/24] Update schema_reader.py change line feed --- karapace/schema_reader.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 30a30c18f..7034d8a9d 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -4,6 +4,8 @@ Copyright (c) 2019 Aiven Ltd See LICENSE for details """ +import json +import logging from contextlib import closing, ExitStack from kafka import KafkaConsumer from kafka.admin import KafkaAdminClient, NewTopic @@ -17,9 +19,6 @@ from threading import Event, Lock, Thread from typing import Any, Dict, List, Optional -import json -import logging - Offset = int Subject = str Version = int From 4634373eb3e9d31b98413c2db501da3dc30ea30f Mon Sep 17 00:00:00 2001 From: Sergiy Zaschipas Date: Sun, 19 Jun 2022 23:00:32 +0300 Subject: [PATCH 18/24] Update karapace/schema_registry_apis.py Co-authored-by: Jarkko Jaakola <91882676+jjaakola-aiven@users.noreply.github.com> --- karapace/schema_registry_apis.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index ff4a75be6..b3b7b0a3a 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -744,9 +744,7 @@ async def subject_version_referencedby_get(self, content_type, *, subject, versi status=HTTPStatus.UNPROCESSABLE_ENTITY, ) - referenced_by = self.ksr.referenced_by.get(str(subject) + "_" + str(version), None) - if not referenced_by: - referenced_by = list() + referenced_by = self.ksr.referenced_by.get(str(subject) + "_" + str(version), []) self.r(list(referenced_by), content_type, status=HTTPStatus.OK) async def subject_versions_list(self, content_type, *, subject): From efe29f4e3759bb27fc0d0c4b3e70a0bcc1986fcb Mon Sep 17 00:00:00 2001 From: Sergiy Zaschipas Date: Sun, 19 Jun 2022 23:27:19 +0300 Subject: [PATCH 19/24] update code by PR review --- karapace/schema_models.py | 11 ++++++----- karapace/schema_reader.py | 5 +++-- karapace/serialization.py | 4 ++-- tests/integration/test_client.py | 4 ++-- tests/integration/test_client_protobuf.py | 4 ++-- 5 files changed, 15 insertions(+), 13 deletions(-) diff --git a/karapace/schema_models.py b/karapace/schema_models.py index 5567e06ae..fe2961592 100644 --- a/karapace/schema_models.py +++ b/karapace/schema_models.py @@ -86,7 +86,7 @@ def __init__(self, schema_type: SchemaType, schema_str: str, references: Optiona Args: schema_type (SchemaType): The type of the schema schema_str (str): The original schema string - references(References): The references of schema + references(References): The references of schema """ self.schema_type = schema_type self.schema_str = schema_str @@ -111,14 +111,15 @@ def get_references(self) -> Optional["References"]: return self.references def __eq__(self, other: Any) -> bool: - schema_is_equal = isinstance(other, TypedSchema) and \ - self.schema_type is other.schema_type and self.__str__() == other.__str__() + schema_is_equal = ( + isinstance(other, TypedSchema) and self.schema_type is other.schema_type and self.__str__() == other.__str__() + ) if not schema_is_equal: return False if self.references is not None: return self.references == other.references - else: - return other.references is None + return other.references is None + class ValidatedTypedSchema(TypedSchema): def __init__( diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 7034d8a9d..30a30c18f 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -4,8 +4,6 @@ Copyright (c) 2019 Aiven Ltd See LICENSE for details """ -import json -import logging from contextlib import closing, ExitStack from kafka import KafkaConsumer from kafka.admin import KafkaAdminClient, NewTopic @@ -19,6 +17,9 @@ from threading import Event, Lock, Thread from typing import Any, Dict, List, Optional +import json +import logging + Offset = int Subject = str Version = int diff --git a/karapace/serialization.py b/karapace/serialization.py index 1b7536e93..86331b15d 100644 --- a/karapace/serialization.py +++ b/karapace/serialization.py @@ -163,7 +163,7 @@ def get_subject_name(self, topic_name: str, schema: str, subject_type: str, sche async def get_schema_for_subject(self, subject: str) -> TypedSchema: assert self.registry_client, "must not call this method after the object is closed." - # pylint: disable=unused-variable + schema_id, schema = await self.registry_client.get_latest_schema(subject) async with self.state_lock: schema_ser = schema.__str__() @@ -180,7 +180,7 @@ async def get_id_for_schema(self, schema: str, subject: str, schema_type: Schema schema_ser = schema_typed.__str__() if schema_ser in self.schemas_to_ids: return self.schemas_to_ids[schema_ser] - schema_id = await self.registry_client.post_new_schema(subject, schema_typed) # pylint: disable=E1120 + schema_id = await self.registry_client.post_new_schema(subject, schema_typed) async with self.state_lock: self.schemas_to_ids[schema_ser] = schema_id diff --git a/tests/integration/test_client.py b/tests/integration/test_client.py index df9b7fb1e..1467b5ccd 100644 --- a/tests/integration/test_client.py +++ b/tests/integration/test_client.py @@ -9,7 +9,7 @@ async def test_remote_client(registry_async_client: Client) -> None: reg_cli = SchemaRegistryClient() reg_cli.client = registry_async_client subject = new_random_name("subject") - sc_id = await reg_cli.post_new_schema(subject, schema_avro) # pylint: disable=E1120 + sc_id = await reg_cli.post_new_schema(subject, schema_avro) assert sc_id >= 0 stored_schema = await reg_cli.get_schema_for_id(sc_id) assert stored_schema == schema_avro, f"stored schema {stored_schema.to_dict()} is not {schema_avro.to_dict()}" @@ -23,7 +23,7 @@ async def test_remote_client_tls(registry_async_client_tls: Client) -> None: reg_cli = SchemaRegistryClient() reg_cli.client = registry_async_client_tls subject = new_random_name("subject") - sc_id = await reg_cli.post_new_schema(subject, schema_avro) # pylint: disable=E1120 + sc_id = await reg_cli.post_new_schema(subject, schema_avro) assert sc_id >= 0 stored_schema = await reg_cli.get_schema_for_id(sc_id) assert stored_schema == schema_avro, f"stored schema {stored_schema.to_dict()} is not {schema_avro.to_dict()}" diff --git a/tests/integration/test_client_protobuf.py b/tests/integration/test_client_protobuf.py index 8d5f43b9c..f924b536f 100644 --- a/tests/integration/test_client_protobuf.py +++ b/tests/integration/test_client_protobuf.py @@ -14,7 +14,7 @@ async def test_remote_client_protobuf(registry_async_client): assert sc_id >= 0 stored_schema = await reg_cli.get_schema_for_id(sc_id) assert stored_schema == schema_protobuf, f"stored schema {stored_schema} is not {schema_protobuf}" - stored_id, stored_schema = await reg_cli.get_latest_schema(subject) # pylint: disable=unused-variable + stored_id, stored_schema = await reg_cli.get_latest_schema(subject) assert stored_id == sc_id assert stored_schema == schema_protobuf @@ -29,6 +29,6 @@ async def test_remote_client_protobuf2(registry_async_client): assert sc_id >= 0 stored_schema = await reg_cli.get_schema_for_id(sc_id) assert stored_schema == schema_protobuf, f"stored schema {stored_schema} is not {schema_protobuf}" - stored_id, stored_schema = await reg_cli.get_latest_schema(subject) # pylint: disable=unused-variable + stored_id, stored_schema = await reg_cli.get_latest_schema(subject) assert stored_id == sc_id assert stored_schema == schema_protobuf_after From 0854296ed1a117f53d7299af1f87511e476a330e Mon Sep 17 00:00:00 2001 From: Sergiy Zaschipas Date: Wed, 22 Jun 2022 00:02:01 +0300 Subject: [PATCH 20/24] add reference_key() --- karapace/schema_reader.py | 5 +++-- karapace/schema_registry_apis.py | 10 +++++----- karapace/utils.py | 4 ++++ 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 30a30c18f..ab942d6c9 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -13,7 +13,7 @@ from karapace.master_coordinator import MasterCoordinator from karapace.schema_models import SchemaType, TypedSchema from karapace.statsd import StatsClient -from karapace.utils import KarapaceKafkaClient +from karapace.utils import KarapaceKafkaClient, reference_key from threading import Event, Lock, Thread from typing import Any, Dict, List, Optional @@ -389,7 +389,8 @@ def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None: subjects_schemas[schema_version] = schema if schema_references: for ref in schema_references: - ref_str = str(ref["subject"]) + "_" + str(ref["version"]) + + ref_str = reference_key(ref["subject"], ref["version"]) referents = self.referenced_by.get(ref_str, None) if referents: LOG.info("Adding entry subject referenced_by : %r", ref_str) diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index b3b7b0a3a..16c652389 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -12,7 +12,7 @@ from karapace.schema_models import InvalidReferences, InvalidSchema, InvalidSchemaType, References, ValidatedTypedSchema from karapace.schema_reader import KafkaSchemaReader, SchemaType, TypedSchema from karapace.typing import JsonData -from karapace.utils import json_encode, KarapaceKafkaClient +from karapace.utils import json_encode, KarapaceKafkaClient, reference_key from typing import Any, Dict, NoReturn, Optional, Tuple import aiohttp @@ -511,7 +511,7 @@ async def _subject_delete_local(self, content_type: str, subject: str, permanent if permanent: for version, value in list(subject_data["schemas"].items()): - referenced_by = self.ksr.referenced_by.get(str(subject) + "_" + str(version), None) + referenced_by = self.ksr.referenced_by.get(reference_key(subject, version), None) if referenced_by and len(referenced_by) > 0: self.r( body={ @@ -532,7 +532,7 @@ async def _subject_delete_local(self, content_type: str, subject: str, permanent subject=subject, schema=None, schema_id=schema_id, version=version, deleted=True, references=None ) else: - referenced_by = self.ksr.referenced_by.get(str(subject) + "_" + str(latest_schema_id), None) + referenced_by = self.ksr.referenced_by.get(reference_key(subject, latest_schema_id), None) if referenced_by and len(referenced_by) > 0: self.r( body={ @@ -643,7 +643,7 @@ async def _subject_version_delete_local(self, content_type: str, subject: str, v status=HTTPStatus.NOT_FOUND, ) - referenced_by = self.ksr.referenced_by.get(str(subject) + "_" + str(version), None) + referenced_by = self.ksr.referenced_by.get(reference_key(subject, version), None) if referenced_by and len(referenced_by) > 0: self.r( body={ @@ -744,7 +744,7 @@ async def subject_version_referencedby_get(self, content_type, *, subject, versi status=HTTPStatus.UNPROCESSABLE_ENTITY, ) - referenced_by = self.ksr.referenced_by.get(str(subject) + "_" + str(version), []) + referenced_by = self.ksr.referenced_by.get(reference_key(subject, version), []) self.r(list(referenced_by), content_type, status=HTTPStatus.OK) async def subject_versions_list(self, content_type, *, subject): diff --git a/karapace/utils.py b/karapace/utils.py index 95302da04..9433d6523 100644 --- a/karapace/utils.py +++ b/karapace/utils.py @@ -55,6 +55,10 @@ def default_json_serialization(obj: MappingProxyType) -> dict: ... +def reference_key(subject: str, version: int) -> str: + return hash((subject, version)) + + def default_json_serialization( # pylint: disable=inconsistent-return-statements obj: Union[datetime, timedelta, Decimal, MappingProxyType], ) -> Union[str, float, dict]: From 46146dd74b2e6558d9ea082903fa4e56de4c2076 Mon Sep 17 00:00:00 2001 From: Sujay Bhowmick Date: Thu, 23 Jun 2022 16:39:47 +1000 Subject: [PATCH 21/24] fixed undefined variable missing due to merge --- karapace/schema_registry_apis.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index ff0109ce6..6979217b9 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -925,7 +925,7 @@ def _validate_schema_request_body(self, content_type: str, body: Union[dict, Any self.r( body={ "error_code": SchemaErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value, - "message": f"Unrecognized field: {attr}", + "message": f"Unrecognized field: {field}", }, content_type=content_type, status=HTTPStatus.UNPROCESSABLE_ENTITY, From 967c8223676f6b493b7342113ed6b7baca973439 Mon Sep 17 00:00:00 2001 From: Sujay Bhowmick Date: Wed, 29 Jun 2022 16:36:12 +1000 Subject: [PATCH 22/24] removed extra line feeds --- karapace/schema_reader.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index ab942d6c9..0acd38596 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -286,14 +286,12 @@ def _handle_msg_config(self, key: dict, value: Optional[dict]) -> None: if subject not in self.subjects: LOG.info("Adding first version of subject: %r with no schemas", subject) self.subjects[subject] = {"schemas": {}} - if not value: LOG.info("Deleting compatibility config completely for subject: %r", subject) self.subjects[subject].pop("compatibility", None) else: LOG.info("Setting subject: %r config to: %r, value: %r", subject, value["compatibilityLevel"], value) self.subjects[subject]["compatibility"] = value["compatibilityLevel"] - elif value is not None: LOG.info("Setting global config to: %r, value: %r", value["compatibilityLevel"], value) self.config["compatibility"] = value["compatibilityLevel"] From 70e8077f5fd6455f87d9032893f6cac4ab9a9977 Mon Sep 17 00:00:00 2001 From: Sergiy Zaschipas Date: Tue, 26 Jul 2022 12:01:21 +0300 Subject: [PATCH 23/24] Update karapace/schema_registry_apis.py Co-authored-by: Jarkko Jaakola <91882676+jjaakola-aiven@users.noreply.github.com> --- karapace/schema_registry_apis.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index 2f49d884c..01621b0e8 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -45,10 +45,9 @@ class SchemaErrorCodes(Enum): INVALID_VERSION_ID = 42202 INVALID_COMPATIBILITY_LEVEL = 42203 INVALID_AVRO_SCHEMA = 44201 - INVALID_PROTOBUF_SCHEMA = 44202 - INVALID_REFERECES = 44203 - REFERENCES_SUPPORT_NOT_IMPLEMENTED = 44501 - SCHEMAVERSION_HAS_REFERENCES = 44503 + INVALID_REFERENCES = 44301 + REFERENCES_SUPPORT_NOT_IMPLEMENTED = 44302 + SCHEMAVERSION_HAS_REFERENCES = 44303 NO_MASTER_ERROR = 50003 From ef05bd4da00db2e9a7073c126f982f7e747fcc56 Mon Sep 17 00:00:00 2001 From: Sergiy Zaschipas Date: Fri, 29 Jul 2022 15:23:29 +0300 Subject: [PATCH 24/24] improve PR1 code --- karapace/schema_models.py | 9 +++++++++ karapace/schema_reader.py | 6 ++++++ karapace/schema_registry_apis.py | 22 ++++++++++++++-------- tests/integration/test_schema.py | 4 ++-- tests/integration/test_schema_protobuf.py | 8 +++++++- 5 files changed, 38 insertions(+), 11 deletions(-) diff --git a/karapace/schema_models.py b/karapace/schema_models.py index fe2961592..327229ec0 100644 --- a/karapace/schema_models.py +++ b/karapace/schema_models.py @@ -188,6 +188,15 @@ def __init__(self, schema_type: SchemaType, references: JsonData): references (str): The references of schema in Kafka/Json representation """ self.schema_type = schema_type + if self.schema_type != "PROTOBUF" or not isinstance(references, list): + raise InvalidReferences + + for ref in references: + if not isinstance(ref, dict): + raise InvalidReferences + if ref.get("name") is None or ref.get("subject") is None or ref.get("version") is None: + raise InvalidReferences + self.references = references def val(self) -> JsonData: diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 4b13a9197..efc81f086 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -445,3 +445,9 @@ def get_schemas( key: val for key, val in self.subjects[subject]["schemas"].items() if val.get("deleted", False) is False } return non_deleted_schemas + + def remove_referenced_by(self, schema_id: SchemaId, references: List): + for ref in references: + key = reference_key(ref["subject"], ref["version"]) + if self.referenced_by.get(key, None) and schema_id in self.referenced_by[key]: + self.referenced_by[key].remove(schema_id) diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index 01621b0e8..b9a1c0744 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -30,7 +30,6 @@ @unique class SchemaErrorCodes(Enum): EMPTY_SCHEMA = 42201 - REFERENCE_EXISTS_ERROR_CODE = 42206 HTTP_NOT_FOUND = HTTPStatus.NOT_FOUND.value HTTP_CONFLICT = HTTPStatus.CONFLICT.value HTTP_UNPROCESSABLE_ENTITY = HTTPStatus.UNPROCESSABLE_ENTITY.value @@ -47,7 +46,7 @@ class SchemaErrorCodes(Enum): INVALID_AVRO_SCHEMA = 44201 INVALID_REFERENCES = 44301 REFERENCES_SUPPORT_NOT_IMPLEMENTED = 44302 - SCHEMAVERSION_HAS_REFERENCES = 44303 + REFERENCE_EXISTS = 42206 NO_MASTER_ERROR = 50003 @@ -59,7 +58,7 @@ class SchemaErrorMessages(Enum): "forward, full, backward_transitive, forward_transitive, and " "full_transitive" ) - REFERENCES_SUPPORT_NOT_IMPLEMENTED = "Schema references are not supported for '{schema_type}' schema type yet" + REFERENCES_SUPPORT_NOT_IMPLEMENTED = "Schema references are not supported for '{schema_type}' schema type" class KarapaceSchemaRegistry(KarapaceBase): @@ -642,7 +641,7 @@ async def _subject_delete_local(self, content_type: str, subject: str, permanent if referenced_by and len(referenced_by) > 0: self.r( body={ - "error_code": SchemaErrorCodes.SCHEMAVERSION_HAS_REFERENCES.value, + "error_code": SchemaErrorCodes.REFERENCE_EXISTS.value, "message": ( f"Subject '{subject}' Version {version} cannot be not be deleted " "because it is referenced by schemas with ids:[" + ", ".join(map(str, referenced_by)) + "]" @@ -654,16 +653,20 @@ async def _subject_delete_local(self, content_type: str, subject: str, permanent for version, value in list(subject_data["schemas"].items()): schema_id = value.get("id") + references = value.get("references", None) self.log.info("Permanently deleting subject '%s' version %s (schema id=%s)", subject, version, schema_id) self.send_schema_message( subject=subject, schema=None, schema_id=schema_id, version=version, deleted=True, references=None ) + if references and len(references) > 0: + self.ksr.remove_referenced_by(schema_id, references) + else: referenced_by = self.ksr.referenced_by.get(reference_key(subject, latest_schema_id), None) if referenced_by and len(referenced_by) > 0: self.r( body={ - "error_code": SchemaErrorCodes.SCHEMAVERSION_HAS_REFERENCES.value, + "error_code": SchemaErrorCodes.REFERENCE_EXISTS.value, "message": ( f"Subject '{subject}' Version {latest_schema_id} cannot be not be deleted " "because it is referenced by schemas with ids:[" + ", ".join(map(str, referenced_by)) + "]" @@ -784,7 +787,7 @@ async def _subject_version_delete_local(self, content_type: str, subject: str, v if referenced_by and len(referenced_by) > 0: self.r( body={ - "error_code": SchemaErrorCodes.SCHEMAVERSION_HAS_REFERENCES.value, + "error_code": SchemaErrorCodes.REFERENCE_EXISTS.value, "message": ( f"Subject '{subject}' Version {version} was not deleted " "because it is referenced by schemas with ids:[" + ", ".join(map(str, referenced_by)) + "]" @@ -796,6 +799,7 @@ async def _subject_version_delete_local(self, content_type: str, subject: str, v schema_id = subject_schema_data["id"] schema = subject_schema_data["schema"] + references = schema.references self.send_schema_message( subject=subject, schema=None if permanent else schema, @@ -804,6 +808,8 @@ async def _subject_version_delete_local(self, content_type: str, subject: str, v deleted=True, references=None, ) + if references and len(references) > 0: + self.ksr.remove_referenced_by(schema_id, references) self.r(str(version), content_type, status=HTTPStatus.OK) async def subject_version_delete( @@ -998,7 +1004,7 @@ async def subjects_schema_post( human_error = "Provided references is not valid" self.r( body={ - "error_code": SchemaErrorCodes.INVALID_REFERECES.value, + "error_code": SchemaErrorCodes.INVALID_REFERENCES.value, "message": f"Invalid {schema_type} references. Error: {human_error}", }, content_type=content_type, @@ -1078,7 +1084,7 @@ async def subject_post( human_error = "Provided references is not valid" self.r( body={ - "error_code": SchemaErrorCodes.INVALID_REFERECES.value, + "error_code": SchemaErrorCodes.INVALID_REFERENCES.value, "message": f"Invalid {schema_type} references. Error: {human_error}", }, content_type=content_type, diff --git a/tests/integration/test_schema.py b/tests/integration/test_schema.py index a1052a372..6e3689046 100644 --- a/tests/integration/test_schema.py +++ b/tests/integration/test_schema.py @@ -1254,8 +1254,8 @@ async def test_schema_subject_post_invalid(registry_async_client: Client) -> Non json={"schema": schema_str, "references": [{"name": "Customer.avro", "subject": "customer", "version": 1}]}, ) assert res.status_code == 422 - assert res.json()["error_code"] == 44501 - assert res.json()["message"] == "Schema references are not supported for 'AVRO' schema type yet" + assert res.json()["error_code"] == 44302 + assert res.json()["message"] == "Schema references are not supported for 'AVRO' schema type" @pytest.mark.parametrize("trail", ["", "/"]) diff --git a/tests/integration/test_schema_protobuf.py b/tests/integration/test_schema_protobuf.py index 456f8872b..139bc889c 100644 --- a/tests/integration/test_schema_protobuf.py +++ b/tests/integration/test_schema_protobuf.py @@ -155,4 +155,10 @@ async def test_protobuf_schema_references(registry_async_client: Client) -> None assert res.status_code == 404 match_msg = "Subject 'customer' Version 1 was not deleted because it is referenced by schemas with ids:[2]" myjson = res.json() - assert myjson["error_code"] == 44503 and myjson["message"] == match_msg + assert myjson["error_code"] == 42206 and myjson["message"] == match_msg + + res = await registry_async_client.delete("subjects/test_schema/versions/1") + assert res.status_code == 200 + + res = await registry_async_client.delete("subjects/customer/versions/1") + assert res.status_code == 200