Skip to content

Commit

Permalink
clean up my nightmare fuel of a last commit lol
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-quix committed Oct 27, 2023
1 parent 2295dcd commit cc5b638
Showing 1 changed file with 32 additions and 75 deletions.
107 changes: 32 additions & 75 deletions src/StreamingDataFrames/streamingdataframes/models/serializers/quix.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,13 @@ def _parse_legacy_format(
return headers, message


class QuixLegacySerializer(JSONSerializer):
extra_headers = {}
class QuixSerializer(JSONSerializer):
_legacy = {}
extra_headers = {}

def __init__(
self,
as_legacy: Optional[bool] = None,
as_legacy: bool = True,
dumps: Optional[Callable[[Any, None], Union[str, bytes]]] = None,
dumps_kwargs: Optional[Mapping] = None,
):
Expand All @@ -269,40 +269,29 @@ def __init__(
:param dumps: a function to serialize objects to json. Default - `json.dumps`
:param dumps_kwargs: a dict with keyword arguments for `dumps()` function.
"""
if as_legacy is None:
as_legacy = True
self.as_legacy = as_legacy
super().__init__(dumps=dumps, dumps_kwargs=dumps_kwargs)
if self.as_legacy:
self._legacy = {
QModelKey.HEADER_NAME: QModelKey.PARAMETERDATA,
QCodecId.HEADER_NAME: QCodecId.JSON_TYPED,
}
if as_legacy:
self.extra_headers = {}
self._serialize = self._as_legacy_format
else:
self.extra_headers = {
QModelKey.HEADER_NAME: QModelKey.TIMESERIESDATA,
QCodecId.HEADER_NAME: QCodecId.JSON_TYPED,
}
self._serialize = self._to_json

def _to_json(self, value: Any):
if self.as_legacy:
start = (
f'{{"{LegacyHeaders.Q_CODEC_ID}":"{self._legacy[QCodecId.HEADER_NAME]}",'
f'"{LegacyHeaders.Q_MODEL_KEY}":"{self._legacy[QModelKey.HEADER_NAME]}",'
f'"{LegacyHeaders.VALUE}":'
)
start_idx = len(start)
value = super()._to_json(value)
end = (
f',"{LegacyHeaders.VALUE_START_IDX}":{start_idx},'
f'"{LegacyHeaders.VALUE_END_IDX}":{start_idx + len(value)}}}'
)
return f"{start}{value}{end}"
else:
return super()._to_json(value)
def _as_legacy_format(self, value: Any):
start = (
f'{{"{LegacyHeaders.Q_CODEC_ID}":"{self._legacy[QCodecId.HEADER_NAME]}",'
f'"{LegacyHeaders.Q_MODEL_KEY}":"{self._legacy[QModelKey.HEADER_NAME]}",'
f'"{LegacyHeaders.VALUE}":'
)
start_idx = len(start)
value = self._to_json(value)
end = (
f',"{LegacyHeaders.VALUE_START_IDX}":{start_idx},'
f'"{LegacyHeaders.VALUE_END_IDX}":{start_idx + len(value)}}}'
)
return f"{start}{value}{end}"


class QuixTimeseriesSerializer(QuixLegacySerializer):
class QuixTimeseriesSerializer(QuixSerializer):
"""
Serialize data to JSON formatted according to Quix Timeseries format.
Expand All @@ -325,31 +314,15 @@ class QuixTimeseriesSerializer(QuixLegacySerializer):
"""

_legacy = {
QModelKey.HEADER_NAME: QModelKey.PARAMETERDATA,
QCodecId.HEADER_NAME: QCodecId.JSON_TYPED,
}
extra_headers = {
QModelKey.HEADER_NAME: QModelKey.TIMESERIESDATA,
QCodecId.HEADER_NAME: QCodecId.JSON_TYPED,
}

def __init__(
self,
as_legacy: Optional[bool] = None,
dumps: Optional[Callable[[Any, None], Union[str, bytes]]] = None,
dumps_kwargs: Optional[Mapping] = None,
):
"""
Serializer that returns data in json format.
:param as_legacy: parse as the legacy format; Default = True
:param dumps: a function to serialize objects to json. Default - `json.dumps`
:param dumps_kwargs: a dict with keyword arguments for `dumps()` function.
"""
super().__init__(as_legacy=as_legacy, dumps=dumps, dumps_kwargs=dumps_kwargs)
if self.as_legacy:
self._legacy = {
QModelKey.HEADER_NAME: QModelKey.PARAMETERDATA,
QCodecId.HEADER_NAME: QCodecId.JSON_TYPED,
}
self.extra_headers = {}

def __call__(
self, value: Mapping, ctx: SerializationContext, timestamp_ns: int = None
) -> Union[str, bytes]:
Expand Down Expand Up @@ -397,10 +370,10 @@ def __call__(
if not is_empty:
result["Timestamps"].append(timestamp_ns or time.time_ns())

return self._to_json(result)
return self._serialize(result)


class QuixEventsSerializer(QuixLegacySerializer):
class QuixEventsSerializer(QuixSerializer):
"""
Serialize data to JSON formatted according to Quix EventData format.
The input value is expected to be a dictionary with the following keys:
Expand Down Expand Up @@ -431,26 +404,10 @@ class QuixEventsSerializer(QuixLegacySerializer):
QModelKey.HEADER_NAME: QModelKey.EVENTDATA,
QCodecId.HEADER_NAME: QCodecId.JSON_TYPED,
}

def __init__(
self,
as_legacy: Optional[bool] = None,
dumps: Optional[Callable[[Any, None], Union[str, bytes]]] = None,
dumps_kwargs: Optional[Mapping] = None,
):
"""
Serializer that returns data in json format.
:param as_legacy: parse as the legacy format; Default = True
:param dumps: a function to serialize objects to json. Default - `json.dumps`
:param dumps_kwargs: a dict with keyword arguments for `dumps()` function.
"""
super().__init__(as_legacy=as_legacy, dumps=dumps, dumps_kwargs=dumps_kwargs)
if self.as_legacy:
self._legacy = {
QModelKey.HEADER_NAME: QModelKey.EVENTDATA,
QCodecId.HEADER_NAME: QCodecId.JSON_TYPED,
}
self.extra_headers = {}
_legacy = {
QModelKey.HEADER_NAME: QModelKey.EVENTDATA,
QCodecId.HEADER_NAME: QCodecId.JSON_TYPED,
}

def __call__(
self, value: Mapping, ctx: SerializationContext, timestamp_ns: int = None
Expand Down Expand Up @@ -482,4 +439,4 @@ def __call__(
"Timestamp": timestamp_ns or time.time_ns(),
}

return self._to_json(result)
return self._serialize(result)

0 comments on commit cc5b638

Please sign in to comment.