-
Notifications
You must be signed in to change notification settings - Fork 1
/
workers.py
130 lines (108 loc) · 5.1 KB
/
workers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
from config import Config as config
from job_states import JobStates
from utils import *
import redis
from rq import get_current_job
import time
import json
import sys
import random
from sklearn.metrics import mean_squared_error, r2_score
import numpy as np
import requests
POOL = redis.ConnectionPool(host=config.redis_host, port=config.redis_port, db=0)
def _evaluate(predicted_heights, true_heights, context):
"""
takes a list of predicted heights and true heights and computes the score
"""
score = 0
secondary_score = 0
"""
for k in range(100):
time.sleep(random.randint(0,100)/1000.0)
percent_complete = k*1.0/100 * 100
update_progress(context, percent_complete, "")
# print "Context Response Channel ::: ", context['response_channel']
# if k%20==0:
# # print "Update : ", percent_complete
score += random.randint(1,100)*1.0/0.7 / 100
secondary_score += random.randint(1,100)*1.0/0.7 / 100
"""
predicted_heights = np.array(predicted_heights)
true_heights = np.array(true_heights)
if predicted_heights.shape != true_heights.shape:
raise Exception("Wrong number of predictions provided. Expected a variable of shape %s, but received a variable of shape %s instead" % ( str(true_heights.shape), str(predicted_heights.shape) ))
score = r2_score(true_heights, predicted_heights)
secondary_score = mean_squared_error(true_heights, predicted_heights)
_result_object = {
"score" : score,
"score_secondary" : secondary_score,
}
return _result_object
def _submit(predicted_heights, true_heights, context):
"""
takes a list of predicted heights and actual heights and computes the score
and prepares the plots for submission to the leaderboard
"""
#_result_object = _evaluate(predicted_heights[:50], true_heights[:50], context)
_result_object = _evaluate(predicted_heights, true_heights, context)
#_result_object["comment"] = ""
#_result_object["media_large"] = "https://upload.wikimedia.org/wikipedia/commons/4/44/Drift_Diffusion_Model_Accumulation_to_Threshold_Example_Graphs.png"
#_result_object["media_thumbnail"] = "https://upload.wikimedia.org/wikipedia/commons/4/44/Drift_Diffusion_Model_Accumulation_to_Threshold_Example_Graphs.png"
#_result_object["media_content_type"] = "image/jpeg"
# Upload result to crowdai
headers = {'Authorization' : 'Token token='+config.CROWDAI_TOKEN, "Content-Type":"application/vnd.api+json"}
_payload = _result_object
_payload['challenge_client_name'] = config.challenge_id
_payload['api_key'] = context['api_key']
_payload['grading_status'] = 'graded'
_payload['comment'] = "" #TODO: Allow participants to add comments to submissions
print "Making POST request...."
r = requests.post(config.CROWDAI_GRADER_URL, params=_payload, headers=headers, verify=False)
print "Status Code : ",r.status_code
if r.status_code == 202:
data = json.loads(r.text)
submission_id = str(data['submission_id'])
_payload['data'] = predicted_heights
context['redis_conn'].set(config.challenge_id+"::submissions::"+submission_id, json.dumps(_payload))
pass #TODO: Add success message
else:
raise Exception(str(r.text))
return _result_object
def _update_job_event(_context, data):
"""
Helper function to serialize JSON
and make sure redis doesnt messup JSON validation
"""
redis_conn = _context['redis_conn']
response_channel = _context['response_channel']
data['data_sequence_no'] = _context['data_sequence_no']
redis_conn.rpush(response_channel, json.dumps(data))
def job_execution_wrapper(data):
redis_conn = redis.Redis(connection_pool=POOL)
job = get_current_job()
_context = {}
_context['redis_conn'] = redis_conn
_context['response_channel'] = data['broker_response_channel']
_context['job_id'] = job.id
_context['data_sequence_no'] = data['data_sequence_no']
_context['api_key'] = data['extra_params']['api_key']
# Register Job Running event
_update_job_event(_context, job_running_template(_context['data_sequence_no'], job.id))
result = {}
try:
if data["function_name"] == "submit":
# Run the job
true_heights = np.load("test_heights.npy")
result = _submit(data["data"], true_heights, _context)
# Register Job Complete event
_update_job_event(_context, job_info_template(_context, "Scores Submitted Successfully ! Coefficient of Determination(R^2) : %s ; MSE : %s" % (str(result['score']), str(result['score_secondary'])) ))
_update_job_event(_context, job_complete_template(_context, result))
else:
_error_object = job_error_template(job.id, "Function not implemented error")
_update_job_event(_context, job_error_template(job.id, result))
result = _error_object
except Exception as e:
_error_object = job_error_template(_context['data_sequence_no'], job.id, str(e))
_update_job_event(_context, _error_object)
return result