Skip to content

Commit

Permalink
Re-pin master off develop
Browse files Browse the repository at this point in the history
  • Loading branch information
DGaffney committed Jul 11, 2024
2 parents 8e338d7 + 0c1c8a8 commit d471cf0
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 101 deletions.
20 changes: 12 additions & 8 deletions app/main/controller/presto_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@ class PrestoResource(Resource):
@api.doc('Receive a presto callback for a given `model_type`')
def post(self, action, model_type):
data = request.json
r = redis_client.get_client()
item_id = data.get("body", {}).get("id")
r.lpush(f"{model_type}_{item_id}", json.dumps(data))
r.expire(f"{model_type}_{item_id}", 60*60*24)
app.logger.info(f"PrestoResource {action}/{model_type}")
return_value = None
if action == "add_item":
app.logger.info(f"Data looks like {data}")
result = similarity.callback_add_item(data.get("body"), model_type)
Expand All @@ -41,8 +39,14 @@ def post(self, action, model_type):
Webhook.return_webhook(callback_url, action, model_type, result)
output = {"action": action, "model_type": model_type, "data": result}
app.logger.info(f"PrestoResource value is {output}")
return {"action": action, "model_type": model_type, "data": result}
abort(
404,
description=f"Action type of {action} was not found. Currently available action types are add_item, search_item."
)
return_value = {"action": action, "model_type": model_type, "data": result}
r = redis_client.get_client()
r.lpush(f"{model_type}_{item_id}", json.dumps(data))
r.expire(f"{model_type}_{item_id}", 60*60*24)
if return_value:
return return_value
else:
abort(
404,
description=f"Action type of {action} was not found. Currently available action types are add_item, search_item."
)
1 change: 1 addition & 0 deletions app/main/controller/similarity_async_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
'text': fields.String(required=False, description='text to be stored or queried for similarity'),
'url': fields.String(required=False, description='url for item to be stored or queried for similarity'),
'callback_url': fields.String(required=False, description='callback_url for final search results'),
'content_hash': fields.String(required=False, description='Content hash for checking for cached Presto Response'),
'doc_id': fields.String(required=False, description='text ID to constrain uniqueness'),
'models': fields.List(required=False, description='similarity models to use: ["elasticsearch"] (pure Elasticsearch, default) or the key name of an active model', cls_or_instance=fields.String),
'language': fields.String(required=False, description='language code for the analyzer to use during the similarity query (defaults to standard analyzer)'),
Expand Down
6 changes: 5 additions & 1 deletion app/main/lib/presto.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,16 @@ def get_similar_items_callback_url(alegre_host, similarity_type):
def send_request(presto_host, model_key, callback_url, message, requires_callback=True):
    """Submit an item to Presto for processing by the model named `model_key`.

    Builds a JSON payload describing the item and POSTs it to Presto's
    ``/process_item/<model_key>`` endpoint.

    :param presto_host: base URL of the Presto service
    :param model_key: Presto model identifier (used as the endpoint path segment)
    :param callback_url: URL Presto should call with results when finished
    :param message: dict describing the item (doc_id, url, text, content_hash, ...)
    :param requires_callback: whether Presto should invoke callback_url on completion
    :return: the raw ``requests.Response`` from Presto
    """
    data = {
        "callback_url": callback_url,
        # A falsy doc_id (missing key, None, or "") falls back to a fresh UUID,
        # so every request carries a usable id.
        "id": message.get("doc_id") or str(uuid.uuid4()),
        "content_hash": message.get("content_hash"),
        "url": message.get("url"),
        "text": message.get("text"),
        "raw": message,
        "requires_callback": requires_callback
    }
    headers = {"Content-Type": "application/json"}
    # safe_serializer handles values json.dumps can't encode natively.
    json_data = json.dumps(data, default=safe_serializer)
    return requests.post(f"{presto_host}/process_item/{model_key}", data=json_data, headers=headers)
Expand Down
113 changes: 59 additions & 54 deletions app/main/lib/shared_models/video_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,65 +111,70 @@ def search_by_context(self, context):
raise e

def search(self, task):
    """Search stored videos for matches to the video described by `task`.

    If the query video has no stored hash yet, it is hashed via Presto
    (a blocking round-trip). Candidate matches are prefiltered by context
    and an L1 cosine score, then ranked with tmkpy against the query's TMK
    file. Returns ``{"result": [...]}`` (truncated to ``task["limit"]`` when
    given) or an error dict when the query video's TMK file is unavailable.
    Temporary query objects are always deleted, even on error paths.
    """
    temporary = False
    try:
        body, threshold, limit = media_crud.parse_task_search(task)
        video, temporary = media_crud.get_object(body, Video)
        if video.hash_value is None:
            callback_url = Presto.add_item_callback_url(app.config['ALEGRE_HOST'], "video")
            if task.get("doc_id") is None:
                task["doc_id"] = str(uuid.uuid4())
            response = json.loads(Presto.send_request(app.config['PRESTO_HOST'], "video__Model", callback_url, task, False).text)
            # Warning: this is a blocking hold to wait until we get a response in
            # a redis key that we've received something from presto.
            result = Presto.blocked_response(response, "video")
            video.hash_value = result.get("body", {}).get("result", {}).get("hash_value")

        matches = self.search_by_context(body["context"])
        default_list = list(np.zeros(len(video.hash_value)))
        try:
            l1_scores = np.ndarray.flatten((1-distance.cdist([r.get("hash_value", default_list) or default_list for r in matches], [video.hash_value], 'cosine'))).tolist()
        except:
            app.logger.info('L1 scoring failed while running search for video id of '+str(video.id)+' match ids of : '+str([e.get("id") for e in matches]))
            l1_scores = [0.0 for e in matches]
        # Keep only candidates whose cheap L1 score clears the configured bar
        # before running the expensive tmkpy comparison.
        qualified_matches = []
        for i,match in enumerate(matches):
            if l1_scores[i] > app.config['VIDEO_MODEL_L1_SCORE']:
                qualified_matches.append(match)
        files = self.get_fullpath_files(qualified_matches, False)
        try:
            if self.tmk_file_exists(video):
                scores = tmkpy.query(media_crud.tmk_file_path(video.folder, video.filepath),files,1)
            else:
                ErrorLog.notify(Exception("Failed to locate needle for a video!"), {"video_folder": video.folder, "video_filepath": video.filepath, "video_id": video.id, "task": task})
                return {"error": "Video not found for provided task", "task": task}
        except Exception as err:
            ErrorLog.notify(err, {"video_folder": video.folder, "video_filepath": video.filepath, "files": files, "video_id": video.id, "task": task})
            raise err
        threshold = float(task.get("threshold", 0.0) or 0.0)
        results = []
        for i,score in enumerate(scores):
            if score > threshold:
                results.append({
                    "context": qualified_matches[i].get("context", {}),
                    "folder": qualified_matches[i].get("folder"),
                    "filepath": qualified_matches[i].get("filepath"),
                    "doc_id": qualified_matches[i].get("doc_id"),
                    "url": qualified_matches[i].get("url"),
                    "filename": files[i],
                    "score": score,
                    "model": "video"
                })
        limit = task.get("limit")
        if limit:
            return {"result": results[:limit]}
        else:
            return {"result": results}
    except Exception as err:
        ErrorLog.notify(err, {"task": task})
        raise err
    finally:
        # Temporary query objects must not accumulate, regardless of outcome.
        if temporary:
            self.delete(task)

def tmk_file_exists(self, video):
    """Return True when the TMK hash file for `video` exists and is non-empty.

    The size check guards against zero-byte files (e.g. left behind by an
    interrupted write) being treated as usable hashes.
    """
    file_path = media_crud.tmk_file_path(video.folder, video.filepath)
    return os.path.exists(file_path) and os.path.getsize(file_path) > 0

def tmk_program_name(self):
return "AlegreVideoEncoder"
Expand Down
3 changes: 2 additions & 1 deletion app/main/lib/similarity.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def get_body_for_text_document(params, mode):
params['content'] = None

if mode == 'store':
allow_list = set(['language', 'content', 'created_at', 'models', 'context', 'callback_url'])
allow_list = set(['language', 'content', 'created_at', 'models', 'context', 'callback_url', 'content_hash'])
keys_to_remove = params.keys() - allow_list
app.logger.info(
f"[Alegre Similarity] get_body_for_text_document:running in `store' mode. Removing {keys_to_remove}")
Expand All @@ -85,6 +85,7 @@ def model_response_package(item, command):
"limit": item.get("limit", DEFAULT_SEARCH_LIMIT) or DEFAULT_SEARCH_LIMIT,
"url": item.get("url"),
"callback_url": item.get("callback_url"),
"content_hash": item.get("content_hash"),
"doc_id": item.get("doc_id"),
"context": item.get("context", {}),
"created_at": item.get("created_at"),
Expand Down
71 changes: 35 additions & 36 deletions app/main/lib/text_similarity.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,42 +90,41 @@ def get_elasticsearch_base_conditions(search_params, clause_count, threshold):
return conditions

def get_vector_model_base_conditions(search_params, model_key, threshold):
    """Build the Elasticsearch script_score query for a vector-model search.

    The query vector comes from, in order of precedence: an explicit
    ``search_params["vector"]``, an OpenAI embedding lookup when `model_key`
    carries the OpenAI prefix, or the shared model service.

    :param search_params: dict with either a precomputed "vector" or "content" to embed
    :param model_key: similarity model identifier; selects the vector field name
    :param threshold: minimum cosine-similarity score (shifted by +1 for ES min_score)
    :return: an ES query dict, or None when an OpenAI embedding could not be retrieved
    """
    if "vector" in search_params:
        vector = search_params["vector"]
    elif model_key[:len(PREFIX_OPENAI)] == PREFIX_OPENAI:
        vector = retrieve_openai_embeddings(search_params['content'], model_key)
        if vector is None:
            # Embedding retrieval failed; caller must handle the absent query.
            return None
    else:
        model = SharedModel.get_client(model_key)
        vector = model.get_shared_model_response(search_params['content'])

    return {
        'query': {
            'script_score': {
                # cosineSimilarity is shifted by +1.0 in the script, so the
                # threshold is shifted to match.
                'min_score': float(threshold) + 1,
                'query': {
                    'bool': {
                        'must': [
                            {
                                # Only score documents that actually carry a
                                # vector for this model.
                                'exists': {
                                    'field': 'vector_'+str(model_key)
                                }
                            }
                        ]
                    }
                },
                'script': {
                    'source': "cosineSimilarity(params.query_vector, doc[params.field]) + 1.0",
                    'params': {
                        'field': "vector_" + str(model_key),
                        'query_vector': vector
                    }
                }
            }
        }
    }

def insert_model_into_response(hits, model_key):
for hit in hits:
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ oauthlib==3.1.0
opensearch-py==2.2.0
opt-einsum==3.2.0
packaging==21.0
pact-python==1.4.5
# pact-python==1.4.5
# Pillow==8.1.1
plac==0.9.6
preshed==2.0.1
Expand Down

0 comments on commit d471cf0

Please sign in to comment.