treetop: optimise dataframe setup for much faster sampling
natoscott committed Oct 3, 2024
1 parent c5b0de2 commit 1df352d
Showing 2 changed files with 47 additions and 39 deletions.
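The headline change replaces per-sample `DataFrame.loc` insertion, which reallocates and re-aligns the frame on every sample, with plain Python containers (`data`, `index`, `dtypes`) that are accumulated while fetching and converted into a DataFrame in a single step. A minimal sketch of the two patterns, using made-up sample rows rather than the project's PCP fetch loop:

```python
import pandas as pd

def build_slow(samples, columns):
    # Old pattern: grow the frame one row at a time via .loc insertion.
    df = pd.DataFrame(columns=columns)
    for i, row in enumerate(samples):
        df.loc[i, columns] = [row[c] for c in columns]
    return df

def build_fast(samples, dtypes):
    # New pattern: accumulate plain lists, then construct the frame once
    # and apply the column dtypes in bulk.
    data, index = [], []
    for i, row in enumerate(samples):
        data.append(row)
        index.append(i)
    df = pd.DataFrame(data=data, index=index)
    df = df.astype(dtypes, copy=False)
    return df.reindex(columns=list(dtypes))

samples = [{'timestamp': pd.Timestamp(2024, 10, 3) + pd.Timedelta(seconds=15 * i),
            'disk.all.avactive': 0.1 * i} for i in range(720)]
dtypes = {'timestamp': 'datetime64[ns]', 'disk.all.avactive': 'float64'}
print(build_slow(samples, list(dtypes)).shape, build_fast(samples, dtypes).shape)
```

Building the frame once from a list of dicts avoids the repeated reallocation cost of `.loc` enlargement, which appears to be where the sampling speed-up comes from.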
82 changes: 45 additions & 37 deletions src/treetop/notebooks/server.py
@@ -146,7 +146,8 @@ class TreetopServer():
_sample_interval = 15 # metrics sampling interval (seconds)
_sample_valueset = 128 # count of raw sample values exported
_training_count = 0 # number of times training was performed
_training_interval = 10 # server training interval (seconds)
_training_delay = 2.5 # server inter-training delay (seconds)
_training_interval = 300 # server training interval (seconds)
_importance_type = 'gain' # default feature importance measure
_variance_threshold = 0.125 # minimum level of feature variance
_mutualinfo_threshold = 0.125 # minimum target mutual information
@@ -183,7 +184,12 @@ def __init__(self):

# dataset
self.df = None
self.columns = []
self.reset()

def reset(self):
self.data = [] # list of dicts
self.index = [] # list of ints
self.dtypes = {} # dict of name:type

def options(self):
""" Setup default command line argument option handling """
@@ -500,7 +506,7 @@ def settings(self):
# logger.debug('RESET NE:', str(datetime.fromtimestamp(self.sample_time())), 'vs', str(datetime.fromtimestamp(self.timestamp)))
# reset = 'full'
else:
self.timestamp += self._sample_interval
self.timestamp += self._training_interval
logger.debug('STEP FWD: %s', str(datetime.fromtimestamp(self.timestamp)))
self.timestamp_s = str(datetime.fromtimestamp(self.timestamp))

@@ -522,12 +528,12 @@ def settings(self):
value = values.lookup_mapping("training.window", None)
values.set(value, timewindow)

logger.info("Training interval:", self._training_interval)
logger.info("Sample interval:", self._sample_interval)
logger.info("Sample count:", self._sample_count)
logger.info("Training interval: %d", self._training_interval)
logger.info("Sample interval: %d", self._sample_interval)
logger.info("Sample count: %d", self._sample_count)
logger.info("Timestamp: %s - %s", self.timestamp_s, self.timestamp)
logger.info("Target metric:", self.target())
logger.info("Filter metrics:", self.filter().split())
logger.info("Target metric: %s", self.target())
logger.info("Filter metrics: %s", self.filter().split())
logger.info("Start time: %s - %s", datetime.fromtimestamp(self.start_time), self.start_time)
logger.info("End time: %s - %s", datetime.fromtimestamp(self.end_time), self.end_time)
logger.info("Total time: %.5f seconds" % (self.end_time - self.start_time))
@@ -604,16 +610,17 @@ def connect(self):
self.client.fetch()
logger.info('Client fetched')
if not self.logfile or self.logfile != self.dataset():
self.logfile = self.dataset()
self.source = None
if not self.source and not self.source_connect():
return False # cannot proceed further, error reported
return True

def sleep(self):
logger.info('Sleep %.1f at timestep %s - %s', self._training_interval, self.timestamp_s, self.timestamp)
logger.info('Sleep %.1f at timestep %s - %s', self._training_delay, self.timestamp_s, self.timestamp)
value = self.values.lookup_mapping("processing.state", None)
self.values.set_string(value, "waiting");
time.sleep(self._training_interval)
time.sleep(self._training_delay)

def elapsed(self, start_time, metric_name):
elapsed_time = time.time() - start_time
@@ -658,10 +665,8 @@ def refresh(self):

def prepare_dataset(self, reset='full'):
# refresh from the metrics source to form a pandas dataframe

#if reset == 'full' and self.df is not None:
# self.df = None
self.df = None
#if reset != 'full':
# ...append live sample to existing dataset

origin = pmapi.timespec(self.timestamp)
delta = pmapi.timespec(self.sample_interval())
@@ -680,13 +685,15 @@ def prepare_dataset(self, reset='full'):
result = self.pmconfig.get_ranked_results(valid_only=True)
self.append_sample(count, result)

self.df = pd.DataFrame(data=self.data, index=self.index)
self.df = self.df.replace(b'', None) # from .loc
self.df = self.df.astype(self.dtypes, copy=False)
self.df = self.df.reindex(columns=self.dtypes.keys())
#logger.debug('Reindexed dataframe, shape:', self.df.shape)
#logger.debug('Columns', self.df.columns[:10])
#logger.debug('Dtypes', list(self.df.dtypes)[:10])
#logger.debug('Values', self.df[:10])
logger.debug('Reindexed dataframe, shape: %s', self.df.shape)
#logger.debug('Columns %s', self.df.columns[:10])
#logger.debug('Dtypes %s', list(self.df.dtypes)[:10])
#logger.debug('Values %s', self.df[:10].describe())
self.reset() # drop interim containers and values

def lookup_dtype(self, desc):
""" Map the appropriate pandas type for a metric descriptor """
@@ -699,17 +706,16 @@ def lookup_dtype(self, desc):
return 'float64'
return None

def update_dataset(self, index, names, dtypes, values):
if self.df is None:
self.df = pd.DataFrame(columns=names)
def update_dataset(self, index, dtypes, values):
if self.dtypes is None:
self.dtypes = dtypes # the 1st dictionary seen
else:
self.dtypes.update(dtypes) # add to dictionary
# insert this sample at the specified offset (index)
self.df.loc[index, names] = values
self.data.append(values)
self.index.append(index)

def append_sample(self, index, result):
names, values, dtypes = ['timestamp'], {}, {}
values, dtypes = {}, {}
values['timestamp'] = self.pmfg_ts()
dtypes['timestamp'] = 'datetime64[ns]'

@@ -723,7 +729,6 @@ def append_sample(self, index, result):

# Handle the simpler no-instance-domain case first
if desc.indom == PM_INDOM_NULL:
names.append(metric) # column name
dtypes[metric] = dtype
values[metric] = result[metric][0][2]
continue
@@ -733,11 +738,10 @@ def append_sample(self, index, result):
# ensure we meet xgboost column name rules
instname = re.sub(r'\[|\]|<', '', instname)
metricspec = metric + '(' + instname + ')'
names.append(metricspec) # column name
dtypes[metricspec] = dtype
values[metricspec] = value

self.update_dataset(index, names, dtypes, values)
self.update_dataset(index, dtypes, values)

def export_values(self, target, window):
""" export dataset metrics, including the valueset metric """
@@ -807,8 +811,8 @@ def anomaly_features(self, df):
anomalies_df = self.top_anomaly_features(iso, y_pred_diffi, df0,
self._max_anomaly_features)
t1 = time.time()
logger.info('Anomaly time:', t1 - t0)
logger.info('Anomaly features:', len(anomalies_df.columns))
logger.info('Anomaly time: %.5s', t1 - t0)
logger.info('Anomaly features: %d', len(anomalies_df.columns))
value = self.values.lookup_mapping("features.anomalies", None)
self.values.set(value, len(anomalies_df))
return anomalies_df
@@ -821,8 +825,11 @@ def reduce_with_variance(self, train_X):
cull.fit(train_X)
except ValueError:
return train_X # no columns met criteria, training will struggle
except RuntimeError as error:
logger.warning("reduce_with_variance %s, error %s", train_X.shape, error)
return train_X
t1 = time.time()
logger.info('Variance time:', t1 - t0)
logger.info('Variance time: %.5fs', t1 - t0)
keep = cull.get_feature_names_out()
logger.info('Keeping %d of %d columns with variance', len(keep), train_X.shape[1])
value = self.values.lookup_mapping("features.variance", None)
@@ -840,7 +847,7 @@ def reduce_with_mutual_info(self, train_y, train_X):
mi = mutual_info_regression(clean_X, clean_y, discrete_features=False)
mi /= np.max(mi) # normalise based on largest value observed
t1 = time.time()
logger.info('MutualInformation time:', t1 - t0)
logger.info('MutualInformation time: %.5f', t1 - t0)

results = {}
for i, column in enumerate(clean_X.columns):
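For reference, the mutual-information stage scores each remaining feature against the target and normalises the scores to the largest one before thresholding; the 0.125 cutoff matches `_mutualinfo_threshold` above. A hedged sketch of that idea on synthetic data, not the server's actual columns:

```python
import numpy as np
import pandas as pd
from sklearn.feature_selection import mutual_info_regression

rng = np.random.default_rng(1)
y = pd.Series(rng.random(512), name='target')
X = pd.DataFrame({
    'related': y * 2.0 + rng.normal(0, 0.05, 512),  # informative about y
    'noise':   rng.random(512),                     # unrelated to y
})

mi = mutual_info_regression(X, y, discrete_features=False)
mi /= np.max(mi)            # normalise against the largest score observed
keep = [col for col, score in zip(X.columns, mi) if score >= 0.125]
print(dict(zip(X.columns, np.round(mi, 3))), keep)
```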
@@ -873,9 +880,9 @@ def prepare_split(self, target, notrain, splits=5):
targets = [target]
window = self.df
logger.debug('Dimensionality reduction for training dataset')
logger.debug('Initial sample @', window.iloc[0]['timestamp'])
logger.debug('Initial shape:', window.shape)
logger.debug('Final sample @', window.iloc[-1]['timestamp'])
logger.debug('Initial sample @ %s', window.iloc[0]['timestamp'])
logger.debug('Initial shape: %s', window.shape)
logger.debug('Final sample @ %s', window.iloc[-1]['timestamp'])

# ensure target metrics (y) values are valid
window = window.dropna(subset=targets, ignore_index=True)
@@ -899,6 +906,9 @@ def prepare_split(self, target, notrain, splits=5):
# remove the original timestamp feature
clean_X = clean_X.drop(columns=targets)

# automated feature reduction based on variance
clean_X = self.reduce_with_variance(clean_X)

# automated anomaly-based feature engineering
quirk_X = self.anomaly_features(clean_X)
logger.debug('quirk_X shape: %s', quirk_X.shape)
@@ -907,10 +917,8 @@ def prepare_split(self, target, notrain, splits=5):
clean_X = pd.merge(times_X, clean_X, left_index=True, right_index=True)
clean_X = pd.merge(clean_X, quirk_X, left_index=True, right_index=True)

# automated feature reduction based on variance
clean_X = self.reduce_with_variance(clean_X)

# automated feature reduction based on mutual information
# NB: has side-effect of keeping self.mutualinfo values
clean_X = self.reduce_with_mutual_info(clean_y, clean_X)

# prepare for cross-validation over the training window
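The last server.py hunk also reorders the feature pipeline: variance-based reduction now runs before the anomaly-based feature engineering, so the anomaly step only sees columns that already passed the variance cut. A rough sketch of that ordering, with scikit-learn's VarianceThreshold and IsolationForest standing in for the server's `cull` and `iso` objects (an assumption) and synthetic columns; the 0.125 threshold mirrors `_variance_threshold`:

```python
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.feature_selection import VarianceThreshold

rng = np.random.default_rng(0)
X = pd.DataFrame({
    'busy':  rng.normal(0.5, 1.0, 256),   # varies, survives the cull
    'idle':  np.full(256, 0.01),          # near-constant, culled
    'reads': rng.random(256) * 10.0,      # varies, survives the cull
})

# 1. Variance-based reduction first, so later stages see fewer columns.
cull = VarianceThreshold(threshold=0.125)
try:
    cull.fit(X)
    X = X[cull.get_feature_names_out()]
except ValueError:
    pass  # no column met the threshold; keep the original features

# 2. Anomaly scoring then runs only on the reduced matrix.
iso = IsolationForest(random_state=0).fit(X)
scores = iso.score_samples(X)             # lower scores = more anomalous
print(list(X.columns), scores.shape)
```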
4 changes: 2 additions & 2 deletions src/treetop/pcp/TreeTop.c
@@ -51,8 +51,8 @@ Platform* pcp;
static const char* target = "disk.all.avactive";
static const char* ignore = "disk.all.aveq,disk.all.read,disk.all.blkread,disk.all.read_bytes,disk.all.total,disk.all.blktotal,disk.all.total_bytes,disk.all.write,disk.all.blkwrite,disk.all.write_bytes";
static size_t sample_count = 720;
static double sample_interval = 10;
static double training_interval = 1;
static double sample_interval = 15;
static double training_interval = 300;

const ScreenDefaults Platform_defaultScreens[] = {
{ .name = "Model importance",
