diff --git a/src/treetop/notebooks/server.py b/src/treetop/notebooks/server.py
index d2cd1e7be5..796fedabe5 100755
--- a/src/treetop/notebooks/server.py
+++ b/src/treetop/notebooks/server.py
@@ -146,7 +146,8 @@ class TreetopServer():
     _sample_interval = 15         # metrics sampling interval (seconds)
     _sample_valueset = 128        # count of raw sample values exported
     _training_count = 0           # number of times training was performed
-    _training_interval = 10       # server training interval (seconds)
+    _training_delay = 2.5         # server inter-training delay (seconds)
+    _training_interval = 300      # server training interval (seconds)
     _importance_type = 'gain'     # default feature importance measure
     _variance_threshold = 0.125   # minimum level of feature variance
     _mutualinfo_threshold = 0.125 # minimum target mutual information
@@ -183,7 +184,12 @@ def __init__(self):

         # dataset
         self.df = None
-        self.columns = []
+        self.reset()
+
+    def reset(self):
+        self.data = []    # list of dicts
+        self.index = []   # list of ints
+        self.dtypes = {}  # dict of name:type

     def options(self):
         """ Setup default command line argument option handling """
@@ -500,7 +506,7 @@ def settings(self):
             # logger.debug('RESET NE:', str(datetime.fromtimestamp(self.sample_time())), 'vs', str(datetime.fromtimestamp(self.timestamp)))
             # reset = 'full'
         else:
-            self.timestamp += self._sample_interval
+            self.timestamp += self._training_interval
             logger.debug('STEP FWD: %s', str(datetime.fromtimestamp(self.timestamp)))

         self.timestamp_s = str(datetime.fromtimestamp(self.timestamp))
@@ -522,12 +528,12 @@ def settings(self):
         value = values.lookup_mapping("training.window", None)
         values.set(value, timewindow)

-        logger.info("Training interval:", self._training_interval)
-        logger.info("Sample interval:", self._sample_interval)
-        logger.info("Sample count:", self._sample_count)
+        logger.info("Training interval: %d", self._training_interval)
+        logger.info("Sample interval: %d", self._sample_interval)
+        logger.info("Sample count: %d", self._sample_count)
         logger.info("Timestamp: %s - %s", self.timestamp_s, self.timestamp)
-        logger.info("Target metric:", self.target())
-        logger.info("Filter metrics:", self.filter().split())
+        logger.info("Target metric: %s", self.target())
+        logger.info("Filter metrics: %s", self.filter().split())
         logger.info("Start time: %s - %s", datetime.fromtimestamp(self.start_time), self.start_time)
         logger.info("End time: %s - %s", datetime.fromtimestamp(self.end_time), self.end_time)
         logger.info("Total time: %.5f seconds" % (self.end_time - self.start_time))
@@ -604,16 +610,17 @@ def connect(self):
             self.client.fetch()
             logger.info('Client fetched')
         if not self.logfile or self.logfile != self.dataset():
+            self.logfile = self.dataset()
             self.source = None
         if not self.source and not self.source_connect():
            return False # cannot proceed further, error reported
        return True

    def sleep(self):
-        logger.info('Sleep %.1f at timestep %s - %s', self._training_interval, self.timestamp_s, self.timestamp)
+        logger.info('Sleep %.1f at timestep %s - %s', self._training_delay, self.timestamp_s, self.timestamp)
         value = self.values.lookup_mapping("processing.state", None)
         self.values.set_string(value, "waiting");
-        time.sleep(self._training_interval)
+        time.sleep(self._training_delay)

     def elapsed(self, start_time, metric_name):
         elapsed_time = time.time() - start_time
@@ -658,10 +665,8 @@ def refresh(self):

     def prepare_dataset(self, reset='full'):
         # refresh from the metrics source to form a pandas dataframe
-
-        #if reset == 'full' and self.df is not None:
-        #    self.df = None
-        self.df = None
+        #if reset != 'full':
+        #    ...append live sample to existing dataset

         origin = pmapi.timespec(self.timestamp)
         delta = pmapi.timespec(self.sample_interval())
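The hunks above and below replace per-sample self.df.loc[index, names] = values insertion with plain Python containers (self.data, self.index, self.dtypes) that are handed to the pandas constructor once per refresh. A minimal sketch of that accumulate-then-construct pattern, with made-up sample values and an invented metric name, not the module's real data:

    import pandas as pd

    # Illustrative stand-ins for what append_sample() accumulates.
    data = [{'timestamp': pd.Timestamp('2024-01-01 00:00:00'), 'disk.dev.await(sda)': 1.5},
            {'timestamp': pd.Timestamp('2024-01-01 00:00:15'), 'disk.dev.await(sda)': 2.0}]
    index = [0, 1]
    dtypes = {'timestamp': 'datetime64[ns]', 'disk.dev.await(sda)': 'float64'}

    # One constructor call replaces many row-wise .loc writes: pandas
    # allocates each column once instead of reallocating per sample.
    df = pd.DataFrame(data=data, index=index)
    df = df.astype(dtypes, copy=False).reindex(columns=dtypes.keys())

Growing a DataFrame row by row can be quadratic overall, since each enlarging .loc write may copy every existing column; building from a list of dicts is a single pass.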
@@ -680,13 +685,15 @@ def prepare_dataset(self, reset='full'):
             result = self.pmconfig.get_ranked_results(valid_only=True)
             self.append_sample(count, result)

+        self.df = pd.DataFrame(data=self.data, index=self.index)
         self.df = self.df.replace(b'', None) # from .loc
         self.df = self.df.astype(self.dtypes, copy=False)
         self.df = self.df.reindex(columns=self.dtypes.keys())
-        #logger.debug('Reindexed dataframe, shape:', self.df.shape)
-        #logger.debug('Columns', self.df.columns[:10])
-        #logger.debug('Dtypes', list(self.df.dtypes)[:10])
-        #logger.debug('Values', self.df[:10])
+        logger.debug('Reindexed dataframe, shape: %s', self.df.shape)
+        #logger.debug('Columns %s', self.df.columns[:10])
+        #logger.debug('Dtypes %s', list(self.df.dtypes)[:10])
+        #logger.debug('Values %s', self.df[:10].describe())
+        self.reset() # drop interim containers and values

     def lookup_dtype(self, desc):
         """ Map the appropriate pandas type for a metric descriptor """
@@ -699,17 +706,16 @@ def lookup_dtype(self, desc):
                 return 'float64'
         return None

-    def update_dataset(self, index, names, dtypes, values):
-        if self.df is None:
-            self.df = pd.DataFrame(columns=names)
+    def update_dataset(self, index, dtypes, values):
+        if self.dtypes is None:
             self.dtypes = dtypes        # the 1st dictionary seen
         else:
             self.dtypes.update(dtypes)  # add to dictionary
-        # insert this sample at the specified offset (index)
-        self.df.loc[index, names] = values
+        self.data.append(values)
+        self.index.append(index)

     def append_sample(self, index, result):
-        names, values, dtypes = ['timestamp'], {}, {}
+        values, dtypes = {}, {}
         values['timestamp'] = self.pmfg_ts()
         dtypes['timestamp'] = 'datetime64[ns]'

@@ -723,7 +729,6 @@ def append_sample(self, index, result):

             # Handle the simpler no-instance-domain case first
             if desc.indom == PM_INDOM_NULL:
-                names.append(metric)    # column name
                 dtypes[metric] = dtype
                 values[metric] = result[metric][0][2]
                 continue
@@ -733,11 +738,10 @@ def append_sample(self, index, result):
                 # ensure we meet xgboost column name rules
                 instname = re.sub(r'\[|\]|<', '', instname)
                 metricspec = metric + '(' + instname + ')'
-                names.append(metricspec)    # column name
                 dtypes[metricspec] = dtype
                 values[metricspec] = value

-        self.update_dataset(index, names, dtypes, values)
+        self.update_dataset(index, dtypes, values)

     def export_values(self, target, window):
         """ export dataset metrics, including the valueset metric """
@@ -807,8 +811,8 @@ def anomaly_features(self, df):
         anomalies_df = self.top_anomaly_features(iso, y_pred_diffi, df0, self._max_anomaly_features)

         t1 = time.time()
-        logger.info('Anomaly time:', t1 - t0)
-        logger.info('Anomaly features:', len(anomalies_df.columns))
+        logger.info('Anomaly time: %.5f', t1 - t0)
+        logger.info('Anomaly features: %d', len(anomalies_df.columns))
         value = self.values.lookup_mapping("features.anomalies", None)
         self.values.set(value, len(anomalies_df))
         return anomalies_df
@@ -821,8 +825,11 @@ def reduce_with_variance(self, train_X):
             cull.fit(train_X)
         except ValueError:
             return train_X # no columns met criteria, training will struggle
+        except RuntimeError as error:
+            logger.warning("reduce_with_variance %s, error %s", train_X.shape, error)
+            return train_X
         t1 = time.time()
-        logger.info('Variance time:', t1 - t0)
+        logger.info('Variance time: %.5fs', t1 - t0)
         keep = cull.get_feature_names_out()
         logger.info('Keeping %d of %d columns with variance', len(keep), train_X.shape[1])
         value = self.values.lookup_mapping("features.variance", None)
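reduce_with_variance() above leans on two scikit-learn behaviours: VarianceThreshold.fit() raises ValueError when no column clears the threshold, and get_feature_names_out() names the survivors. A standalone sketch of that culling step, using the 0.125 default from _variance_threshold and an invented two-column frame:

    import pandas as pd
    from sklearn.feature_selection import VarianceThreshold

    X = pd.DataFrame({'flat': [1.0, 1.0, 1.0, 1.0],   # zero variance, culled
                      'busy': [0.0, 1.0, 0.0, 1.0]})  # variance 0.25, kept

    cull = VarianceThreshold(threshold=0.125)
    try:
        cull.fit(X)
    except ValueError:
        X_reduced = X  # nothing met the threshold; keep the full matrix
    else:
        X_reduced = X[cull.get_feature_names_out()]
    print(list(X_reduced.columns))  # ['busy']

The patch also moves this cull ahead of anomaly_features() in prepare_split() (see the reordered hunks below), so the anomaly detector never sees near-constant columns.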
@@ -840,7 +847,7 @@ def reduce_with_mutual_info(self, train_y, train_X):
         mi = mutual_info_regression(clean_X, clean_y, discrete_features=False)
         mi /= np.max(mi)    # normalise based on largest value observed
         t1 = time.time()
-        logger.info('MutualInformation time:', t1 - t0)
+        logger.info('MutualInformation time: %.5f', t1 - t0)

         results = {}
         for i, column in enumerate(clean_X.columns):
@@ -873,9 +880,9 @@ def prepare_split(self, target, notrain, splits=5):
         targets = [target]
         window = self.df
         logger.debug('Dimensionality reduction for training dataset')
-        logger.debug('Initial sample @', window.iloc[0]['timestamp'])
-        logger.debug('Initial shape:', window.shape)
-        logger.debug('Final sample @', window.iloc[-1]['timestamp'])
+        logger.debug('Initial sample @ %s', window.iloc[0]['timestamp'])
+        logger.debug('Initial shape: %s', window.shape)
+        logger.debug('Final sample @ %s', window.iloc[-1]['timestamp'])

         # ensure target metrics (y) values are valid
         window = window.dropna(subset=targets, ignore_index=True)
@@ -899,6 +906,9 @@ def prepare_split(self, target, notrain, splits=5):
         # remove the original timestamp feature
         clean_X = clean_X.drop(columns=targets)

+        # automated feature reduction based on variance
+        clean_X = self.reduce_with_variance(clean_X)
+
         # automated anomaly-based feature engineering
         quirk_X = self.anomaly_features(clean_X)
         logger.debug('quirk_X shape: %s', quirk_X.shape)
@@ -907,10 +917,8 @@ def prepare_split(self, target, notrain, splits=5):
         clean_X = pd.merge(times_X, clean_X, left_index=True, right_index=True)
         clean_X = pd.merge(clean_X, quirk_X, left_index=True, right_index=True)

-        # automated feature reduction based on variance
-        clean_X = self.reduce_with_variance(clean_X)
-
         # automated feature reduction based on mutual information
+        # NB: has side-effect of keeping self.mutualinfo values
         clean_X = self.reduce_with_mutual_info(clean_y, clean_X)

         # prepare for cross-validation over the training window
diff --git a/src/treetop/pcp/TreeTop.c b/src/treetop/pcp/TreeTop.c
index 34656f568c..5733ac1507 100644
--- a/src/treetop/pcp/TreeTop.c
+++ b/src/treetop/pcp/TreeTop.c
@@ -51,8 +51,8 @@ Platform* pcp;
 static const char* target = "disk.all.avactive";
 static const char* ignore = "disk.all.aveq,disk.all.read,disk.all.blkread,disk.all.read_bytes,disk.all.total,disk.all.blktotal,disk.all.total_bytes,disk.all.write,disk.all.blkwrite,disk.all.write_bytes";
 static size_t sample_count = 720;
-static double sample_interval = 10;
-static double training_interval = 1;
+static double sample_interval = 15;
+static double training_interval = 300;

 const ScreenDefaults Platform_defaultScreens[] = {
    { .name = "Model importance",
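The retuned C defaults mirror the new Python constants at the top of the patch: sampling every 15 seconds while training every 300 means one training pass per 20 samples, and, if sample_count bounds the retained history, the 720-sample window spans three hours; _training_delay (2.5s) is only the idle poll between passes. A quick check of that arithmetic, with the constants copied from both files:

    sample_interval = 15     # seconds between metric samples
    training_interval = 300  # seconds between training passes
    sample_count = 720       # samples kept in the training window

    samples_per_training = training_interval // sample_interval
    window_hours = sample_count * sample_interval / 3600
    print(samples_per_training, window_hours)  # 20 3.0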