Skip to content

Commit

Permalink
Change id calculation to new convention and add known_id property
Browse files Browse the repository at this point in the history
  • Loading branch information
bsmartradio committed Aug 20, 2024
1 parent bcc9fc0 commit fac0a32
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 14 deletions.
15 changes: 6 additions & 9 deletions python/lsst/alert/packet/bin/syncAllSchemasToRegistry.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,18 @@ def parse_args():
def upload_schema(registry_url, subject, schema_registry):
"""Parse schema registry and upload all schemas.
"""
for version in schema_registry.known_versions:
schema = schema_registry.get_by_version(version)
numbers = re.findall(r'\d+', version)
numbers[1] = str(numbers[1]).zfill(2)
version_number = int(''.join(numbers))
for schema_id in schema_registry.known_ids:
schema = schema_registry.get_by_id(schema_id)
normalized_schema = fastavro.schema.to_parsing_canonical_form(
schema.definition)
confluent_schema = {"version": version_number,
"id": version_number, "schema": normalized_schema}
confluent_schema = {"version": schema_id,
"id": schema_id, "schema": normalized_schema}
payload = json.dumps(confluent_schema)
headers = {"Content-Type": "application/vnd.schemaregistry.v1+json"}
url = f"{registry_url}/subjects/{subject}/versions"
print(f"uploading schema to {url}")
response = requests.post(url=url, data=payload, headers=headers)
# response.raise_for_status()
response.raise_for_status()
print(f"done, status={response.status_code}")
print(f"response text={response.text}")

Expand All @@ -78,7 +75,7 @@ def delete_schema(registry_url, subject):
response = requests.get(url_schema_versions)

# Schema registry must be empty to put it in import mode. If it exists,
# remove it and remkae the schema. If not, continue.
# remove it and remake the schema. If not, continue.
if response.status_code == 200:
print('The schema will be deleted and remade in import mode.')
response = requests.delete(url_schemas)
Expand Down
24 changes: 19 additions & 5 deletions python/lsst/alert/packet/schemaRegistry.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import json

Check failure on line 25 in python/lsst/alert/packet/schemaRegistry.py

View workflow job for this annotation

GitHub Actions / call-workflow / lint

F401

'json' imported but unused
import os
import re
import zlib

Check failure on line 28 in python/lsst/alert/packet/schemaRegistry.py

View workflow job for this annotation

GitHub Actions / call-workflow / lint

F401

'zlib' imported but unused

__all__ = ["SchemaRegistry"]
Expand All @@ -38,6 +39,7 @@ class SchemaRegistry:
def __init__(self):
self._version_to_id = {}
self._id_to_schema = {}
self._ids = {}

def register_schema(self, schema, version):
"""Register a new schema in the registry.
Expand All @@ -63,6 +65,7 @@ def register_schema(self, schema, version):
"""
schema_id = self.calculate_id(schema)
self._version_to_id[version] = schema_id
self._ids[schema_id] = schema_id
self._id_to_schema[schema_id] = schema
return schema_id

Expand Down Expand Up @@ -108,6 +111,17 @@ def known_versions(self):
"""
return set(self._version_to_id)

@property
def known_ids(self):
"""Return all the schema ids tracked by this registry.
Returns
-------
schemas : `set` of `int`
Set of schema ids.
"""
return set(self._ids)

@staticmethod
def calculate_id(schema):
"""Calculate an ID for the given schema.
Expand All @@ -122,13 +136,13 @@ def calculate_id(schema):
schema_id : `int`
The calculated ID.
"""
# Significant risk of collisions with more than a few schemas;
# CRC32 is ok for prototyping but isn't sensible in production.
return zlib.crc32(json.dumps(schema.definition,
sort_keys=True).encode('utf-8'))
numbers = re.findall(r'\d+', schema.definition['name'])
numbers[1] = str(numbers[1]).zfill(2)
schema_id = int(''.join(numbers))
return schema_id

@classmethod
def from_filesystem(cls, root=None, schema_root="lsst.v7_0.alert"):
def from_filesystem(cls, root=None, schema_root="lsst.v7_1.alert"):
"""Populate a schema registry based on the filesystem.
Walk the directory tree from the root provided, locating files named
Expand Down

0 comments on commit fac0a32

Please sign in to comment.