-
Notifications
You must be signed in to change notification settings - Fork 8
/
codec.py
236 lines (189 loc) · 7.98 KB
/
codec.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
"""codec implements the column format described in this project's README.md"""
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any, Set
import datetime
import numpy as np
import pandas as pd
class VersionError(Exception):
    """Error carrying an "unsupported version" message for a given version."""

    def __init__(self, version):
        # Build the message first so the call to the base class stays short.
        message = f"unsupported version {version}"
        super().__init__(message)
class MissingFieldError(Exception):
    """Error carrying a "missing field" message that names the field."""

    def __init__(self, field):
        # The field name is quoted in the message so empty names stay visible.
        message = f'missing field: "{field}"'
        super().__init__(message)
@dataclass
class Timeseries:
    """One timeseries: a time-indexed Pandas series plus its type and metadata.

    The type is a short tag ("basal", "insulin", "carb"); the metadata holds
    extra information such as parameters for carb or insulin curves.
    """

    # Type tag of the series, e.g. "basal", "insulin" or "carb".
    ctype: str
    # Metadata attached to the series (e.g. curve parameters).
    meta: dict
    # The time-indexed data itself.
    series: pd.Series
@dataclass
class Schedule:
    """A time-indexed parameter schedule.

    The schedule is stored as two parallel lists: ``index`` holds the
    position of each entry and ``values`` holds the parameter value that
    applies at that position.
    """

    # Position of each schedule entry.
    # NOTE(review): presumably minutes since midnight - confirm against README.
    index: List[int]
    # Parameter value for the entry at the same position in ``index``.
    values: List[float]

    def reindexed(self, minutes):
        """Return the index floor-divided by ``minutes`` as a NumPy array."""
        return np.array(self.index) // minutes

    @classmethod
    def fromtuple(cls, tup):
        """Build a schedule from an ``(index, values)`` pair.

        Declared as a classmethod so it works both as
        ``Schedule.fromtuple(t)`` and on an existing instance.
        """
        index, values = tup
        return cls(index, values)

    @classmethod
    def fromdict(cls, d):
        """Build a schedule from a dict with "index" and "values" keys.

        Raises:
            KeyError: if either key is absent from ``d``.
        """
        return cls(d["index"], d["values"])

    def todict(self):
        """Return the schedule as a dict with "index" and "values" keys."""
        return {
            "index": self.index,
            "values": self.values,
        }
@dataclass
class Request:
    """A tuning request.

    It holds the relevant timeseries as well as the other data received
    with the tuning request.
    """

    # The user's timezone for the data in the frame. This scheme does not
    # yet support timezone mobility; as such, timezones should be contained
    # in their own timeline.
    # fromdict() stores a datetime.timezone here (rather than a string)
    # when the payload carries a "GMT+..."/"GMT-..." offset.
    timezone: str
    timeseries: List["Timeseries"]
    minimum_time_interval: Optional[int] = None
    maximum_schedule_item_count: Optional[int] = None
    allowed_basal_rates: Optional[List[float]] = None
    basal_insulin_parameters: Dict[str, float] = field(
        default_factory=lambda: {"delay": 5, "peak": 65, "duration": 205}
    )
    insulin_sensitivity_schedule: Optional["Schedule"] = None
    carb_ratio_schedule: Optional["Schedule"] = None
    basal_rate_schedule: Optional["Schedule"] = None
    # If specified, limits the amount of deviation allowed from
    # the above parameters.
    tuning_limit: Optional[float] = None
    hyper_params: Dict[str, Any] = field(default_factory=dict)
    # The set of parameters to tune. If it is not specified,
    # all parameters are tuned.
    tune_parameters: Optional[Set[str]] = None

    @classmethod
    def fromdict(cls, payload) -> "Request":
        """Decode a JSON payload (already parsed into a dict) into a Request.

        Declared as a classmethod so it can be called on the class or on an
        instance.

        Raises:
            MissingFieldError: if "version", "timezone" or "timelines" is
                absent from the payload.
            VersionError: if the payload version is not 1.
            ValueError: if a timeline has an unknown type, or a GMT offset
                timezone cannot be parsed.
            KeyError: if a timeline lacks "type", "index" or "values", or a
                schedule lacks "index" or "values".
        """
        version = payload.get("version")
        if version is None:
            raise MissingFieldError("version")
        if version != 1:
            raise VersionError(version)

        timezone = payload.get("timezone")
        if timezone is None:
            raise MissingFieldError("timezone")
        timezone = cls._decode_timezone(timezone)

        raw_timelines = payload.get("timelines")
        if raw_timelines is None:
            raise MissingFieldError("timelines")

        # Schedules are optional; absent ones stay None.
        schedules = {}
        for key in (
            "insulin_sensitivity_schedule",
            "carb_ratio_schedule",
            "basal_rate_schedule",
        ):
            schedules[key] = (
                Schedule.fromdict(payload[key]) if key in payload else None
            )

        timeseries = []
        for position, timeline in enumerate(raw_timelines):
            decoded = cls._decode_timeline(position, timeline, timezone)
            if decoded is not None:
                timeseries.append(decoded)

        tune_parameters = payload.get("tune_parameters")
        if tune_parameters is not None:
            tune_parameters = set(tune_parameters)

        return cls(
            timezone=timezone,
            timeseries=timeseries,
            minimum_time_interval=payload.get("minimum_time_interval"),
            maximum_schedule_item_count=payload.get(
                "maximum_schedule_item_count"),
            allowed_basal_rates=payload.get("allowed_basal_rates"),
            # NOTE(review): a payload without this key yields {} here, not
            # the dataclass default above - confirm that this is intended.
            basal_insulin_parameters=payload.get(
                "basal_insulin_parameters", {}),
            insulin_sensitivity_schedule=schedules[
                "insulin_sensitivity_schedule"],
            carb_ratio_schedule=schedules["carb_ratio_schedule"],
            basal_rate_schedule=schedules["basal_rate_schedule"],
            tuning_limit=payload.get("tuning_limit"),
            hyper_params=payload.get("hyper_params", {}),
            tune_parameters=tune_parameters,
        )

    @staticmethod
    def _decode_timezone(timezone):
        """Turn a "GMT+..."/"GMT-..." offset into a datetime.timezone.

        Any other string is returned unchanged. Accepted offset spellings
        are "hhmm" (e.g. "GMT-0700"), "hh:mm", "h:mm", "hmm" and hours only
        (e.g. "GMT-7").
        """
        # iOS has the habit of sending timezone offsets.
        if not timezone.startswith(("GMT-", "GMT+")):
            return timezone
        offset = timezone[4:].replace(":", "")
        if len(offset) <= 2:
            hours, minutes = offset, "0"
        elif len(offset) == 3:
            hours, minutes = offset[:1], offset[1:]
        else:
            hours, minutes = offset[:2], offset[2:4]
        delta = datetime.timedelta(hours=int(hours), minutes=int(minutes))
        if timezone[3] == "-":
            delta = -delta
        return datetime.timezone(delta)

    @staticmethod
    def _decode_timeline(position, timeline, timezone):
        """Decode one raw timeline into a Timeseries, or None if it is empty.

        ``position`` is the timeline's place in the payload and is only used
        in the error message.
        """
        series_type = timeline["type"]
        if series_type not in ("bolus", "basal", "insulin", "carb", "glucose"):
            raise ValueError(
                f"series {position}: invalid series type {series_type}")
        params = timeline.get("parameters", {})
        index = undelta(timeline["index"])
        values = undelta(timeline["values"])
        if len(index) == 0:
            return None
        if "durations" in timeline:
            durations = undelta(timeline["durations"])
            index, values = resample(index, values, durations)
        # The decoded index is interpreted as epoch seconds in UTC and then
        # converted to the request timezone.
        index = pd.to_datetime(index, unit="s", utc=True)
        index = index.tz_convert(timezone)
        return Timeseries(series_type, params, pd.Series(values, index))
def undelta(list):
    """Decode a delta-encoded series by taking the running sum of its entries."""
    deltas = np.array(list)
    return deltas.cumsum()
def resample(index, values, durations, period=300):
    """Spread each event evenly over fixed-size periods covering its duration.

    Each ``(timestamp, value, duration)`` triple becomes
    ``ceil(duration / period)`` rows, spaced ``period`` seconds apart and
    starting at ``timestamp``; the value is divided equally among them.

    Note that the returned index may have duplicate entries and may also
    be out of order. This should present no concern as these series are
    immediately resampled.

    Args:
        index: event timestamps, in seconds.
        values: event values, parallel to ``index``.
        durations: event durations in seconds, parallel to ``index``.
        period: resampling increment in seconds; defaults to 300 (5 minutes).

    Returns:
        An ``(index, values)`` pair of lists.
    """
    index_out = []
    values_out = []
    for timestamp, value, duration in zip(index, values, durations):
        # Special case for instantaneous events: we spread them across
        # one full period. (That's the limit of the model resolution
        # anyway.)
        if duration == 0:
            duration = period
        # Ceiling division; int() keeps range() happy when the duration is
        # a float. A negative duration can yield zero periods, in which
        # case the event produces no rows.
        nperiod = int(-(-duration // period))
        share = value / nperiod if nperiod > 0 else None
        for step in range(nperiod):
            index_out.append(timestamp + step * period)
            values_out.append(share)
    return index_out, values_out
@dataclass
class Response:
    """The result of a tuning request: three schedules plus metadata.

    ``training_loss`` is optional and is only included in the dict form
    when it is not None.
    """

    version: int
    timezone: str
    insulin_sensitivity_schedule: "Schedule"
    carb_ratio_schedule: "Schedule"
    basal_rate_schedule: "Schedule"
    training_loss: Optional[float]

    def todict(self):
        """Return the response as a dict, with each schedule in dict form."""
        d = {
            "version": self.version,
            "timezone": self.timezone,
            "insulin_sensitivity_schedule":
                self.insulin_sensitivity_schedule.todict(),
            "carb_ratio_schedule": self.carb_ratio_schedule.todict(),
            "basal_rate_schedule": self.basal_rate_schedule.todict(),
        }
        if self.training_loss is not None:
            d["training_loss"] = self.training_loss
        return d

    @classmethod
    def fromdict(cls, d):
        """Build a Response from the dict form produced by todict().

        Declared as a classmethod so it can be called on the class or on an
        instance.

        Raises:
            KeyError: if "version", "timezone" or any of the three schedule
                keys is absent from ``d``.
        """
        return cls(
            version=d["version"],
            timezone=d["timezone"],
            insulin_sensitivity_schedule=Schedule.fromdict(
                d["insulin_sensitivity_schedule"]
            ),
            carb_ratio_schedule=Schedule.fromdict(d["carb_ratio_schedule"]),
            basal_rate_schedule=Schedule.fromdict(d["basal_rate_schedule"]),
            training_loss=d.get("training_loss"),
        )