From f4170cceb2883fd3c311092bc67be174ad0974ce Mon Sep 17 00:00:00 2001
From: Yuchen Wang
Date: Tue, 5 Nov 2019 18:43:32 -0500
Subject: [PATCH] Logic of supervisor.

---
 docker-dev-canary/RouterTable.py    | 96 +++++++++++++++++++++++++++++
 docker-dev-canary/eval_model.py     | 35 +++++++++++
 docker-dev-canary/main.py           | 63 +++++++++++++++----
 docker-dev-canary/state_handlers.py |  1 -
 4 files changed, 182 insertions(+), 13 deletions(-)
 create mode 100644 docker-dev-canary/RouterTable.py
 create mode 100644 docker-dev-canary/eval_model.py

diff --git a/docker-dev-canary/RouterTable.py b/docker-dev-canary/RouterTable.py
new file mode 100644
index 0000000..e9ec0a6
--- /dev/null
+++ b/docker-dev-canary/RouterTable.py
@@ -0,0 +1,96 @@
+import random
+
+"""
+The RouterTable class.
+
+In the jargon, control is our baseline model. It can be the original model in this comparison.
+Treatment is the new model that we want to test against the old model.
+"""
+class RouterTable:
+    def __init__(self, port):
+        # The port of our control model
+        self.control_port = port
+        # The port of our treat model
+        self.treat_port = -1
+        # The mapping from user_id to self.CONTROL or self.TREAT
+        self.user_to_group = {}
+        # The fraction of traffic (0 to 1) that we are directing to the treat model
+        self.treat_percentage = 0
+
+        self.CONTROL = "CONTROL"
+        self.TREAT = "TREAT"
+
+    def flush(self):
+        self.user_to_group = {}
+        self.treat_percentage = 0
+
+    def get_port_by_user_id(self, user_id):
+        if not self.is_in_test():
+            return self.control_port
+
+        # If we have not seen this user before, we first assign it to a group according to self.treat_percentage.
+        if user_id not in self.user_to_group:
+            self.assign_new_user(user_id)
+
+        if self.user_to_group[user_id] == self.CONTROL:
+            return self.control_port
+        else:
+            return self.treat_port
+
+    def is_user_in_treatment(self, user_id):
+        group = self.get_group_by_user_id(user_id)
+        if group == self.TREAT:
+            return True
+        return False
+
+    def get_group_by_user_id(self, user_id):
+        if user_id not in self.user_to_group:
+            self.assign_new_user(user_id)
+        return self.user_to_group[user_id]
+
+    """
+    This is used in canary tests to incrementally increase the traffic.
+    However, do remember to be aware of the logging of data because for a particular user_id,
+    you may be using a different model than the last time.
+    """
+    def set_treat_percentage(self, percentage):
+        self.user_to_group = {}
+        self.treat_percentage = percentage
+
+    """
+    For a user we have not seen before, we add him/her into our map.
+    """
+    def assign_new_user(self, user_id):
+        r = random.random()
+        if r < self.treat_percentage:
+            # Treatment group; random() is in [0, 1), so a 0% treatment never lands here.
+            self.user_to_group[user_id] = self.TREAT
+        else:
+            self.user_to_group[user_id] = self.CONTROL
+
+    """
+    This is used to set up a new treatment.
+    """
+    def set_new_treatment(self, port, percentage):
+        self.treat_port = port
+        self.treat_percentage = percentage
+        self.user_to_group = {}
+
+    """
+    This is used when the test is successful. Then the treatment becomes the new control.
+    """
+    def test_success(self):
+        self.control_port = self.treat_port
+        self.user_to_group = {}
+        self.treat_percentage = 0
+
+    """
+    This is used when the test fails. Then the control keeps being the control.
+    """
+    def test_fail(self):
+        self.user_to_group = {}
+        self.treat_percentage = 0
+
+    def is_in_test(self):
+        return self.treat_percentage != 0
+
diff --git a/docker-dev-canary/eval_model.py b/docker-dev-canary/eval_model.py
new file mode 100644
index 0000000..fa6edbe
--- /dev/null
+++ b/docker-dev-canary/eval_model.py
@@ -0,0 +1,35 @@
+import pandas as pd
+
+"""
+This function takes the data that a specific model produces and tells whether it is a good model or not.
+
+Parameters:
+    df: The data that a particular model produces. It should contain the following columns.
+        user_id: The id of the user.
+        response: The recommendation that we give back.
+        timestamp: The timestamp of the request. (or response)
+    req_percentage: At or over this threshold, the model is successful.
+    time_window: The time window within which a user comes back. This is in minutes.
+Return: boolean. Whether the share of follow-up requests reaches req_percentage (False if df is empty).
+"""
+def eval_model(df, req_percentage=0.2, time_window=20):
+    total_requests = df.shape[0]
+    return total_requests > 0 and (num_following_req(df, time_window) / total_requests) >= req_percentage
+
+"""
+Return the number of requests made within time_window of the same user's previous request. This can be included in our report.
+"""
+def num_following_req(df, time_window=20):
+    time_window *= 60
+    count_successful = 0
+    for _, group in df.groupby('user_id'):
+        timestamps = sorted(group['timestamp'].tolist())
+        for i in range(1, len(timestamps)):
+            if (timestamps[i] - timestamps[i-1] <= time_window):
+                count_successful += 1
+    return count_successful
+
+# if __name__ == "__main__":
+#     data = {'user_id': [1, 2, 3, 4, 1, 1, 3, 5, 2], 'timestamp': [1, 8, 10, 20, 23, 26, 29, 50, 88]}
+#     df = pd.DataFrame.from_dict(data)
+#     print(eval_model(df, 0.2, 20))
\ No newline at end of file
diff --git a/docker-dev-canary/main.py b/docker-dev-canary/main.py
index b12044e..0be33ee 100644
--- a/docker-dev-canary/main.py
+++ b/docker-dev-canary/main.py
@@ -1,8 +1,14 @@
 from flask import Flask
 from flask import request
+import requests
+import time
+import pandas as pd
+
 from traffic_handlers import *
+from state_handlers import *
+from RouterTable import RouterTable
+from eval_model import eval_model
 
-import requests
 
 app = Flask(__name__)
 
@@ -12,7 +18,10 @@ with an error code if it cannot do so.
 """
 
 start_system()
-print(TEST_REQUEST_COUNT)
+# print(TEST_REQUEST_COUNT)
+ROUTER = RouterTable(BASE_PORT)
+CONFIG_EXPERIMENTS = {}
+EXPERIMENT_LOG = {'user_id': [], 'timestamp': []}
 
 
 @app.route('/')
@@ -22,29 +31,59 @@ def index():
 
 @app.route('/recommend/<user_id>')
 def recommend(user_id):
+    global CONFIG_EXPERIMENTS, EXPERIMENT_LOG
     try:
         user_id = int(user_id)
     except:
         return 'Invalid user ID entered'
 
-    # Simple production state, simply send all traffic to the production node
-    if SYSTEM_STATE == 0:
-        return handle_production_traffic(user_id)
-    else:
-        # Keep track of counter and stop here
-        # Report, email
-        TEST_REQUEST_COUNT += 1
-        return handle_mixed_traffic(user_id)
+    req_time = time.time()
+
+    # If an experiment is ongoing
+    if len(CONFIG_EXPERIMENTS) > 0:
+        done = False
+        if CONFIG_EXPERIMENTS['DeploymentType'] == 'A/BTest':
+            duration = CONFIG_EXPERIMENTS['Duration']
+            if req_time - CONFIG_EXPERIMENTS['StartTime'] >= duration:
+                # Test ends.
+                done = True
+        elif CONFIG_EXPERIMENTS['DeploymentType'] == 'CanaryTest':
+            start_time = CONFIG_EXPERIMENTS['StartTime']
+            step = CONFIG_EXPERIMENTS['Step']
+            interval = CONFIG_EXPERIMENTS['interval']
+            percentage = (req_time - start_time) / interval * step
+            if percentage > 1:
+                # Test ends
+                done = True
+            else:
+                # Change the percentage accordingly. NOTE(review): this clears the user->group map on every request.
+                ROUTER.set_treat_percentage(percentage)
+        if done:
+            # Test ends.
+            isValid = eval_model(pd.DataFrame.from_dict(EXPERIMENT_LOG))
+            EXPERIMENT_LOG = {'user_id': [], 'timestamp': []}
+            CONFIG_EXPERIMENTS = {}
+            ROUTER.flush()
+            # TODO: Shut down and open up containers accordingly
+            # TODO: Calling email API
 
-    # TODO: somehow record this response
-    # TODO: report(experimentType) -> boolean: isValid
+    if ROUTER.is_user_in_treatment(user_id):
+        EXPERIMENT_LOG['user_id'].append(user_id)
+        EXPERIMENT_LOG['timestamp'].append(req_time)
+        return handle_test_traffic(user_id)
+    else:
+        return handle_production_traffic(user_id)
 
 
 @app.route('/test', methods=['GET', 'POST'])
 def test():
+    global CONFIG_EXPERIMENTS
     test_conf = request.json
     isSuccessful, config = parse_test_config(test_conf)
     if isSuccessful:
+        CONFIG_EXPERIMENTS = config
+        CONFIG_EXPERIMENTS['StartTime'] = time.time()
+        ROUTER.set_new_treatment(BASE_PORT + 2, config['ModelInfo']['ExpPercentage'])
         start_test(config["ModelInfo"]["ModelContainerName"], 8082, BASE_PORT + 2)
         print(config["ModelInfo"]["ModelContainerName"])
         return 'Test started successfully'
diff --git a/docker-dev-canary/state_handlers.py b/docker-dev-canary/state_handlers.py
index 93af306..09e84b1 100644
--- a/docker-dev-canary/state_handlers.py
+++ b/docker-dev-canary/state_handlers.py
@@ -46,7 +46,6 @@ def start_system():
 def is_test_running():
     return number_of_containers_running() == 2
 
-# TODO: change router table accordingly
 def start_test(test_image, host_port, container_port):
     if is_test_running():
         return True