diff --git a/python/lsst/alert/packet/bin/syncAllSchemasToRegistry.py b/python/lsst/alert/packet/bin/syncAllSchemasToRegistry.py index 58bff66..c307121 100644 --- a/python/lsst/alert/packet/bin/syncAllSchemasToRegistry.py +++ b/python/lsst/alert/packet/bin/syncAllSchemasToRegistry.py @@ -50,21 +50,18 @@ def parse_args(): def upload_schema(registry_url, subject, schema_registry): """Parse schema registry and upload all schemas. """ - for version in schema_registry.known_versions: - schema = schema_registry.get_by_version(version) - numbers = re.findall(r'\d+', version) - numbers[1] = str(numbers[1]).zfill(2) - version_number = int(''.join(numbers)) + for schema_id in schema_registry.known_ids: + schema = schema_registry.get_by_id(schema_id) normalized_schema = fastavro.schema.to_parsing_canonical_form( schema.definition) - confluent_schema = {"version": version_number, - "id": version_number, "schema": normalized_schema} + confluent_schema = {"version": schema_id, + "id": schema_id, "schema": normalized_schema} payload = json.dumps(confluent_schema) headers = {"Content-Type": "application/vnd.schemaregistry.v1+json"} url = f"{registry_url}/subjects/{subject}/versions" print(f"uploading schema to {url}") response = requests.post(url=url, data=payload, headers=headers) - # response.raise_for_status() + response.raise_for_status() print(f"done, status={response.status_code}") print(f"response text={response.text}") @@ -78,7 +75,7 @@ def delete_schema(registry_url, subject): response = requests.get(url_schema_versions) # Schema registry must be empty to put it in import mode. If it exists, - # remove it and remkae the schema. If not, continue. + # remove it and remake the schema. If not, continue. if response.status_code == 200: print('The schema will be deleted and remade in import mode.') response = requests.delete(url_schemas) diff --git a/python/lsst/alert/packet/schemaRegistry.py b/python/lsst/alert/packet/schemaRegistry.py index 40516ae..672af94 100644 --- a/python/lsst/alert/packet/schemaRegistry.py +++ b/python/lsst/alert/packet/schemaRegistry.py @@ -24,6 +24,7 @@ import json import os +import re import zlib __all__ = ["SchemaRegistry"] @@ -38,6 +39,7 @@ class SchemaRegistry: def __init__(self): self._version_to_id = {} self._id_to_schema = {} + self._ids = {} def register_schema(self, schema, version): """Register a new schema in the registry. @@ -63,6 +65,7 @@ def register_schema(self, schema, version): """ schema_id = self.calculate_id(schema) self._version_to_id[version] = schema_id + self._ids[schema_id] = schema_id self._id_to_schema[schema_id] = schema return schema_id @@ -108,6 +111,17 @@ def known_versions(self): """ return set(self._version_to_id) + @property + def known_ids(self): + """Return all the schema ids tracked by this registry. + + Returns + ------- + schemas : `set` of `int` + Set of schema ids. + """ + return set(self._ids) + @staticmethod def calculate_id(schema): """Calculate an ID for the given schema. @@ -122,13 +136,13 @@ def calculate_id(schema): schema_id : `int` The calculated ID. """ - # Significant risk of collisions with more than a few schemas; - # CRC32 is ok for prototyping but isn't sensible in production. - return zlib.crc32(json.dumps(schema.definition, - sort_keys=True).encode('utf-8')) + numbers = re.findall(r'\d+', schema.definition['name']) + numbers[1] = str(numbers[1]).zfill(2) + schema_id = int(''.join(numbers)) + return schema_id @classmethod - def from_filesystem(cls, root=None, schema_root="lsst.v7_0.alert"): + def from_filesystem(cls, root=None, schema_root="lsst.v7_1.alert"): """Populate a schema registry based on the filesystem. Walk the directory tree from the root provided, locating files named