Skip to content

Commit

Permalink
events: add schema language and code generators
Browse files Browse the repository at this point in the history
  • Loading branch information
mmcgee-jump committed Dec 19, 2024
1 parent 801cce6 commit cdfb71e
Show file tree
Hide file tree
Showing 22 changed files with 5,702 additions and 7 deletions.
21 changes: 21 additions & 0 deletions .github/workflows/schema_compat.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
name: On Pull Request Schemas
on:
merge_group:
pull_request:
types: [synchronize, opened, reopened, ready_for_review]
paths:
- 'src/disco/metrics/schema/**'
concurrency:
group: on-pull-request_${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
jobs:
schema_compat:
timeout-minutes: 1
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: 'Check schemas are compatible'
run: python3 check_schemas.py
working-directory: src/disco/metrics
4 changes: 2 additions & 2 deletions src/disco/metrics/Local.mk
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
$(call add-hdrs,fd_prometheus.h fd_metrics.h)
$(call add-objs,fd_prometheus fd_metrics,fd_disco)
$(call add-hdrs,fd_prometheus.h fd_metrics.h generated/fd_event.h generated/fd_event_metrics.h generated/fd_metric_event_snap.h)
$(call add-objs,fd_prometheus fd_metrics generated/fd_event generated/fd_metric_event_snap,fd_disco)
60 changes: 60 additions & 0 deletions src/disco/metrics/check_schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import json
import os
import subprocess
from pathlib import Path
from typing import Dict
from generate.event_types import Event, check_schema, validate_compatability

def get_files_from_commit(commit: str, directory: Path):
result = subprocess.run(
['git', 'ls-tree', '--name-only', commit, directory],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True
)
return result.stdout.decode('utf-8').splitlines()

def load_file_content(commit: str, file_path: str):
result = subprocess.run(
['git', 'show', f'{commit}:{file_path}'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True
).stdout.decode('utf-8')
return json.loads(result)

def load_current_file_content(file_path: str):
with open(file_path, 'r') as file:
return json.load(file)

def check_schema_compatability(ref: str):
directory = Path(__file__).parent / 'schema'

parent_commit = subprocess.run(
['git', 'rev-parse', ref],
stdout=subprocess.PIPE,
check=True
).stdout.strip().decode('utf-8')

parent_schemas: Dict[str, Event] = {}
parent_files = get_files_from_commit(parent_commit, directory)
for file in parent_files:
if file.endswith('.json'):
parent_schemas[file[:-5]] = load_file_content(parent_commit, os.path.join(directory, file))

current_schemas: Dict[str, Event] = {}
working_directory_files = [
os.path.relpath(os.path.join(root, file), start=directory)
for root, _, files in os.walk(directory)
for file in files
]
for file in working_directory_files:
if file.endswith('.json'):
current_schemas[file[:-5]] = load_current_file_content(os.path.join(directory, file))

check_schema(parent_schemas)
check_schema(current_schemas)
validate_compatability(parent_schemas, current_schemas)

if __name__ == "__main__":
check_schema_compatability(os.environ['GITHUB_BASE_REF'])
43 changes: 41 additions & 2 deletions src/disco/metrics/gen_metrics.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,53 @@
from generate.types import *
from generate.write_codegen import write_codegen
import copy
import io
import os

from generate.metric_types import *
from generate.event_types import *
from generate.write_codegen import write_codegen, write_event_snap_codegen
from generate.write_docs import write_docs
from generate.write_metric_event_schema import write_metrics_sample_schema
from generate.write_events_codegen import write_event_formatter
from pathlib import Path

def main():
metrics = parse_metrics(Path('metrics.xml').read_text())
metrics.layout()

schema_before: Dict[str, Any] = {}
for file in os.listdir(Path(__file__).parent / 'schema'):
if file.endswith('.json'):
with open(Path(__file__).parent / 'schema' / file, 'r') as f:
data = json.load(f)

schema_before[file[:-5]] = Event(data)

# Check that metrics event schema which goes up to clickhouse is
# still backwards compatible.
event_new = io.StringIO()
write_metrics_sample_schema(metrics, event_new)
schema_after = copy.deepcopy(schema_before)
schema_after['metrics_sample'] = Event(json.loads(event_new.getvalue()))

check_schema(schema_before)
check_schema(schema_after)
validate_compatability(schema_before, schema_after)

with open(Path(__file__).parent / 'schema/metrics_sample.json', 'w') as f:
f.write(event_new.getvalue())

# Now code generate the metrics structs and accessors.
write_codegen(metrics)

# Now code generate documentation of the metrics.
write_docs(metrics)

# Now code generate the transformer that turns the metrics structs
# into a metrics event for remote reporting.
write_event_snap_codegen(metrics)

# Now code generate a JSON formatter for generic event types.
write_event_formatter(schema_after)

if __name__ == '__main__':
main()
242 changes: 242 additions & 0 deletions src/disco/metrics/generate/event_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
import re
import os
import subprocess
import json
from pathlib import Path
from enum import Enum
from typing import Dict, List, Any

class ClickHouseType(Enum):
DATETIME_64_9 = 0
STRING = 1
ENUM_8 = 2
LOW_CARDINALITY_STRING = 3
UINT16 = 4
UINT32 = 5
UINT64 = 6
TUPLE = 7
NESTED = 8
IPV6 = 9

class Field:
def __init__(self, name: str, field: Any):
self.name = name

if not 'description' in field:
raise ValueError(f"Field `{name}` is missing description")

self.description: str = field['description']

if self.name.strip() == '':
raise ValueError("Field name is empty")

if self.description.strip() == '':
raise ValueError(f"Field `{name}` has empty description")

self.deprecated: bool = False
if 'deprecated' in field:
self.deprecated = field['deprecated']

self.server_only = False
if 'server_only' in field:
self.server_only = field['server_only']

if field['type'] == "DateTime64(9)":
self.type = ClickHouseType.DATETIME_64_9
elif field['type'] == "String" or field['type'] == "LowCardinality(String)":
if field['type'] == "String":
self.type = ClickHouseType.STRING
else:
self.type = ClickHouseType.LOW_CARDINALITY_STRING

self.max_length = None
if 'max_length' in field:
self.max_length = int(field['max_length'])
if field['max_length'] < 1:
raise ValueError(f"String field `{name}` has max_length less than 1")
elif field['type'] == "Enum8":
self.type = ClickHouseType.ENUM_8

self.variants: Dict[str, int] = {}
for (variant, value) in field['variants'].items():
if variant in self.variants:
raise ValueError(f"Duplicate variant {variant}")

if not re.match(r'^[a-z][a-z0-9]*(_[a-z0-9]+)*$', variant):
raise ValueError(f"Enum `{name}` variant `{variant}` must contain only lowercase characters and underscores")

if value < -128:
raise ValueError(f"Enum `{name}` variant `{variant}` has value less than -128")
if value > 127:
raise ValueError(f"Enum `{name}` variant `{variant}` has value greater than 127")

self.variants[variant] = value
if len(self.variants) == 0:
raise ValueError(f"Enum `{name}` has no variants")
elif field['type'] == "UInt16":
self.type = ClickHouseType.UINT16
elif field['type'] == "UInt32":
self.type = ClickHouseType.UINT32
elif field['type'] == "UInt64":
self.type = ClickHouseType.UINT64
elif field['type'] == "IPv6":
self.type = ClickHouseType.IPV6
elif field['type'] == 'Tuple':
self.type = ClickHouseType.TUPLE

self.sub_fields: Dict[str, Field] = {}
for sub_field in field['fields']:
if sub_field in self.sub_fields:
raise ValueError(f"Duplicate sub-field {sub_field}")

self.sub_fields[sub_field] = Field(sub_field, field['fields'][sub_field])
elif field['type'] == "Nested":
self.type = ClickHouseType.NESTED

self.sub_fields: Dict[str, Field] = {}
for sub_field in field['fields']:
if sub_field in self.sub_fields:
raise ValueError(f"Duplicate sub-field {sub_field}")

self.sub_fields[sub_field] = Field(sub_field, field['fields'][sub_field])
else:
raise ValueError(f"Unknown field type {field['type']}")

class Event:
def __init__(self, json: Any):
self.name: str = json['name']
self.id: int = json['id']
self.description: str = json['description']
self.deprecated: bool = False
if 'deprecated' in json:
self.deprecated = json['deprecated']

if not re.match(r'^[a-z][a-z0-9]*(_[a-z0-9]+)*$', self.name):
raise ValueError(f"Event `{self.name}` must contain only lowercase characters and underscores")

if self.name.strip() == '':
raise ValueError("Event name is empty")

if self.description.strip() == '':
raise ValueError(f"Event `{self.name}` has empty description")

self.fields: Dict[str, Field] = {}
for (name, field) in json['fields'].items():
if name in self.fields:
raise ValueError(f"Duplicate field {name}")

self.fields[name] = Field(name, field)

def validate_field_compatiblity(before: Field, after: Field):
if before.deprecated and not after.deprecated:
raise ValueError(f"Field `{before.name}` is no longer deprecated")

if before.type != after.type:
raise ValueError(f"Field `{before.name}` has changed type")

if before.type == ClickHouseType.ENUM_8:
for variant in before.variants:
if variant not in after.variants:
raise ValueError(f"Field `{before.name}` has missing variant `{variant}`")

if before.variants[variant] != after.variants[variant]:
raise ValueError(f"Field `{before.name}` has changed value for variant `{variant}`")
elif before.type == ClickHouseType.TUPLE:
for sub_field in before.sub_fields:
if sub_field not in after.sub_fields:
raise ValueError(f"Sub-field `{sub_field}` in field `{before.name}` is missing in new schema")

validate_field_compatiblity(before.sub_fields[sub_field], after.sub_fields[sub_field])
elif before.type == ClickHouseType.NESTED:
for sub_field in before.sub_fields:
if sub_field not in after.sub_fields:
raise ValueError(f"Sub-field `{sub_field}` in field `{before.name}` is missing in new schema")

validate_field_compatiblity(before.sub_fields[sub_field], after.sub_fields[sub_field])

def validate_compatability(before: Dict[str, Event], after: Dict[str, Event]):
for event in before:
if event not in after:
raise ValueError(f"Event `{event}` is missing in new schema")

# id changes don't matter, as they don't make it up to
# clickhouse

if before[event].deprecated and not after[event].deprecated:
raise ValueError(f"Event `{event}` is no longer deprecated")

for field in before[event].fields:
if field not in after[event].fields:
raise ValueError(f"Field `{field}` in event `{event}` is missing in new schema")

validate_field_compatiblity(before[event].fields[field], after[event].fields[field])

def check_field(is_nested: bool, field: Field):
if field.name.strip() == '':
raise ValueError("Field name is empty")

if field.description.strip() == '':
raise ValueError(f"Field `{field.name}` has empty description")

if field.type == ClickHouseType.ENUM_8:
if len(field.variants) == 0:
raise ValueError(f"Enum `{field.name}` has no variants")

for variant in field.variants:
if not re.match(r'^[a-z][a-z0-9]*(_[a-z0-9]+)*$', variant):
raise ValueError(f"Enum `{field.name}` variant `{variant}` must contain only lowercase characters and underscores")

if field.variants[variant] < -128:
raise ValueError(f"Enum `{field.name}` variant `{variant}` has value less than -128")
if field.variants[variant] > 127:
raise ValueError(f"Enum `{field.name}` variant `{variant}` has value greater than 127")
elif field.type == ClickHouseType.TUPLE:
for sub_field in field.sub_fields:
check_field(is_nested, field.sub_fields[sub_field])
elif field.type == ClickHouseType.NESTED:
if is_nested:
raise ValueError(f"Nested fields are not allowed in nested fields")

for sub_field in field.sub_fields:
check_field(True, field.sub_fields[sub_field])

def check_schema(schema: Dict[str, Event]):
for (name, event) in schema.items():
if event.name != name:
raise ValueError(f"Event name `{event.name}` does not match the key `{name}`")

if event.name != 'common' and not re.match(r'^[a-z]+_[a-z]+$', event.name):
raise ValueError(f"Event name `{event.name}` must contain only lowercase characters, and be in the format `{{category}}_{{name}}`")

if not 'common' in schema:
raise ValueError("Missing `common` event")

for event in schema:
for other in schema:
if event == other:
continue

if schema[event].id == schema[other].id:
raise ValueError(f"Event `{event}` and `{other}` have the same id")

ids: List[int] = []
for event in schema:
if event == 'common':
continue

ids.append(schema[event].id)

ids.sort()
for i in range(1, len(ids)):
if ids[i] - ids[i - 1] != 1:
raise ValueError(f"Missing id between {ids[i - 1]} and {ids[i]}")

for event in schema:
if event == 'common':
continue

for field in schema[event].fields.values():
if field.name in schema['common'].fields:
raise ValueError(f"Field `{field.name}` in event `{event}` is also present in `common` event")

check_field(False, field)
Loading

0 comments on commit cdfb71e

Please sign in to comment.