Skip to content

Commit

Permalink
Logic of supervisor.
Browse files Browse the repository at this point in the history
  • Loading branch information
a-w-1806 committed Nov 5, 2019
1 parent b77be80 commit f4170cc
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 13 deletions.
96 changes: 96 additions & 0 deletions docker-dev-canary/RouterTable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import random

"""
The RouterTable class.
In the jargon, control is our baseline model. It can be the original model in this comparison.
Treatment is the new model that we want to test against the old model.
"""
class RouterTable:
def __init__(self, port):
# The port of our control model
self.control_port = port
# The port of our treat model
self.treat_port = -1
# The mapping from user_id to self.CONTROL or self.TREAT
self.user_to_group = {}
# How many traffic are we directing into treat model
self.treat_percentage = 0

self.CONTROL = "CONTROL"
self.TREAT = "TREAT"

def flush(self):
self.user_to_group = {}
self.treat_percentage = 0

def get_port_by_user_id(self, user_id):
if not self.is_in_test():
return self.control_port

# If we have not seen this user before, we first assign it to a group according to self.treat_percentage.
if user_id not in self.user_to_group:
self.assign_new_user(user_id)

if self.user_to_group[user_id] == self.CONTROL:
return self.control_port
else:
return self.treat_port

def is_user_in_treatment(self, user_id):
group = self.get_group_by_user_id(user_id)
if group == self.TREAT:
return True
return False

def get_group_by_user_id(self, user_id):
if user_id not in self.user_to_group:
self.assign_new_user(user_id)
return self.user_to_group[user_id]

"""
This is used in canary tests to incrementally increase the traffic.
However, do remember to be aware of the logging of data because for a particular user_id,
you may be using a different model than the last time.
"""
def set_treat_percentage(self, percentage):
self.user_to_group = {}
self.treat_percentage = percentage

"""
For the user we have not seen before, we add him/her into our map.
"""
def assign_new_user(self, user_id):
r = random.random()
if r <= self.treat_percentage:
# It should be in treatment group.
self.user_to_group[user_id] = self.TREAT
else:
self.user_to_group[user_id] = self.CONTROL

"""
This is used to setup a new treatment.
"""
def set_new_treatment(self, port, percentage):
self.treat_port = port
self.treat_percentage = percentage
self.user_to_group = {}

"""
This is used when the test is successful. Then the treatment becomes the new control.
"""
def test_success(self):
self.control_port = self.treat_port
self.user_to_group = {}
self.treat_percentage = 0

"""
This is used when the test is failed. Then the control keeps being the control.
"""
def test_fail(self):
self.user_to_group = {}
self.treat_percentage = 0

def is_in_test(self):
return self.treat_percentage != 0

35 changes: 35 additions & 0 deletions docker-dev-canary/eval_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import pandas as pd

"""
This function takes the data that a specific model produces and tell whether it is a good model or not.
Parameters:
df: The data that a particular model produces. It should contains following columns.
user_id: The id of the user.
response: The recommendation that we give back.
timestamp: The timestamp of the request. (or response)
req_percentage: Over this threshold, the model is successful.
time_window: The time window that a user come back. This is in minutes.
Return: boolean. Determine whether certain percentages of users use the recommendation again in a time window.
"""
def eval_model(df, req_percentage=0.2, time_window=20):
total_requests = df.shape[0]
return (num_following_req(df, time_window) / total_requests) >= req_percentage

"""
Return the number of requests that are in the time_window of the last request. This can be included in our report.
"""
def num_following_req(df, time_window=20):
time_window *= 60
count_successful = 0
for user_id, group in df.groupby('user_id'):
timestamps = group['timestamp'].tolist()
for i in range(1, len(timestamps)):
if (timestamps[i] - timestamps[i-1] <= time_window):
count_successful += 1
return count_successful

# if __name__ == "__main__":
# data = {'user_id': [1, 2, 3, 4, 1, 1, 3, 5, 2], 'timestamp': [1, 8, 10, 20, 23, 26, 29, 50, 88]}
# df = pd.DataFrame.from_dict(data)
# print(eval_model(df, 0.2, 20))
63 changes: 51 additions & 12 deletions docker-dev-canary/main.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
from flask import Flask
from flask import request
import requests
import time
import pandas as pd

from traffic_handlers import *
from state_handlers import *
from RouterTable import RouterTable
from eval_model import eval_model

import requests

app = Flask(__name__)

Expand All @@ -12,7 +18,10 @@
with an error code if it cannot do so.
"""
start_system()
print(TEST_REQUEST_COUNT)
# print(TEST_REQUEST_COUNT)
ROUTER = RouterTable(BASE_PORT)
CONFIG_EXPERIMENTS = {}
EXPERIMENT_LOG = {'user_id': [], 'timestamp': []}


@app.route('/')
Expand All @@ -22,29 +31,59 @@ def index():

@app.route('/recommend/<user_id>')
def recommend(user_id):
    """Route a /recommend request for `user_id` to the control or treatment model."""
    # NOTE(review): this listing appears to flatten a diff — the SYSTEM_STATE
    # branch below returns on every path, which would make the experiment
    # logic after it unreachable. Confirm which flow is intended before
    # relying on this exact text.
    global CONFIG_EXPERIMENTS, EXPERIMENT_LOG
    try:
        user_id = int(user_id)
    except:
        # Non-integer ids are rejected with a plain message.
        return 'Invalid user ID entered'

    # Simple production state, simply send all traffic to the production node
    if SYSTEM_STATE == 0:
        return handle_production_traffic(user_id)
    else:
        # Keep track of counter and stop here
        # Report, email
        TEST_REQUEST_COUNT += 1
        return handle_mixed_traffic(user_id)
    req_time = time.time()

    # If an experiment is ongoing
    if len(CONFIG_EXPERIMENTS) > 0:
        done = False
        if CONFIG_EXPERIMENTS['DeploymentType'] == 'A/BTest':
            # A/B test ends after a fixed wall-clock duration.
            duration = CONFIG_EXPERIMENTS['Duration']
            if req_time - CONFIG_EXPERIMENTS['StartTime'] >= duration:
                # Test ends.
                done = True
        elif CONFIG_EXPERIMENTS['DeploymentType'] == 'CanaryTest':
            # Canary: ramp treatment traffic linearly with elapsed time,
            # by `Step` per `interval` seconds (presumably — confirm units
            # against the config producer).
            start_time = CONFIG_EXPERIMENTS['StartTime']
            step = CONFIG_EXPERIMENTS['Step']
            interval = CONFIG_EXPERIMENTS['interval']
            percentage = (req_time - start_time) / interval * step
            if percentage > 1:
                # Test ends
                done = True
            else:
                # Change the percentage accordingly
                # NOTE(review): this resets the router's user->group map on
                # every request while the canary ramps.
                ROUTER.set_treat_percentage(percentage)
        if done:
            # Test ends.
            isValid = eval_model(pd.DataFrame.from_dict(EXPERIMENT_LOG))
            EXPERIMENT_LOG = {'user_id': [], 'timestamp': []}
            CONFIG_EXPERIMENTS = {}
            ROUTER.flush()
            # TODO: Shut down and open up containers accordingly
            # TODO: Calling email API

    # TODO: somehow record this response
    # TODO: report(experimentType) -> boolean: isValid
    # NOTE(review): is_user_in_treatment is consulted even when no experiment
    # is configured; it lazily assigns the user a group in RouterTable.
    if ROUTER.is_user_in_treatment(user_id):
        # Only treatment-side traffic is logged for eval_model.
        EXPERIMENT_LOG['user_id'].append(user_id)
        EXPERIMENT_LOG['timestamp'].append(req_time)
        return handle_test_traffic(user_id)
    else:
        return handle_production_traffic(user_id)


@app.route('/test', methods=['GET', 'POST'])
def test():
global CONFIG_EXPERIMENTS
test_conf = request.json
isSuccessful, config = parse_test_config(test_conf)
if isSuccessful:
CONFIG_EXPERIMENTS = config
CONFIG_EXPERIMENTS['StartTime'] = time.time()
ROUTER.set_new_treatment(BASE_PORT + 2, config['ModelInfo']['ExpPercentage'])
start_test(config["ModelInfo"]["ModelContainerName"], 8082, BASE_PORT + 2)
print(config["ModelInfo"]["ModelContainerName"])
return 'Test started successfully'
Expand Down
1 change: 0 additions & 1 deletion docker-dev-canary/state_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ def start_system():
def is_test_running():
    """A test is in progress exactly when two containers are running."""
    container_count = number_of_containers_running()
    return container_count == 2

# TODO: change router table accordingly
def start_test(test_image, host_port, container_port):
if is_test_running():
return True
Expand Down

0 comments on commit f4170cc

Please sign in to comment.