diff --git a/app/main/lib/shared_models/video_model.py b/app/main/lib/shared_models/video_model.py index 17683a66..a305139e 100644 --- a/app/main/lib/shared_models/video_model.py +++ b/app/main/lib/shared_models/video_model.py @@ -110,19 +110,35 @@ def search_by_context(self, context): db.session.rollback() raise e + def get_blocked_response(self, task): + callback_url = Presto.add_item_callback_url(app.config['ALEGRE_HOST'], "video") + if task.get("doc_id") is None: + task["doc_id"] = str(uuid.uuid4()) + response = json.loads(Presto.send_request(app.config['PRESTO_HOST'], "video__Model", callback_url, task, False).text) + # Warning: this is a blocking hold to wait until we get a response in + # a redis key that we've received something from presto. + return Presto.blocked_response(response, "video") + + def download_temp_file(self, task): + result = self.get_blocked_response(task).get("body") + s3_folder = (result.get("result", {}) or {}).get("folder") + s3_filepath = (result.get("result", {}) or {}).get("filepath") + tempfile = self.get_tempfile() + folder = str.join("/", tempfile.name.split("/")[0:-1]) + filepath = tempfile.name.split("/")[-1] + with tempfile.NamedTemporaryFile(delete=False) as tmp: + folder = os.path.dirname(tmp.name) + filepath = os.path.basename(tmp.name) + download_file_from_s3(s3_folder, s3_filepath, media_crud.tmk_file_path(folder, filepath)) + return folder, filepath + def search(self, task): temporary = False try: body, threshold, limit = media_crud.parse_task_search(task) video, temporary = media_crud.get_object(body, Video) if video.hash_value is None: - callback_url = Presto.add_item_callback_url(app.config['ALEGRE_HOST'], "video") - if task.get("doc_id") is None: - task["doc_id"] = str(uuid.uuid4()) - response = json.loads(Presto.send_request(app.config['PRESTO_HOST'], "video__Model", callback_url, task, False).text) - # Warning: this is a blocking hold to wait until we get a response in - # a redis key that we've received something from presto. - result = Presto.blocked_response(response, "video") + result = self.get_blocked_response(task) video.hash_value = result.get("body", {}).get("result", {}).get("hash_value") matches = self.search_by_context(body["context"]) @@ -137,14 +153,18 @@ def search(self, task): if l1_scores[i] > app.config['VIDEO_MODEL_L1_SCORE']: qualified_matches.append(match) files = self.get_fullpath_files(qualified_matches, False) + if self.tmk_file_exists(video.folder, video.filepath): + folder, filepath = (video.folder, video.filepath) + else: + folder, filepath = self.download_temp_file(task) try: - if self.tmk_file_exists(video): - scores = tmkpy.query(media_crud.tmk_file_path(video.folder, video.filepath),files,1) + if self.tmk_file_exists(folder, filepath): + scores = tmkpy.query(media_crud.tmk_file_path(folder, filepath),files,1) else: - ErrorLog.notify(Exception("Failed to locate needle for a video!"), {"video_folder": video.folder, "video_filepath": video.filepath, "video_id": video.id, "task": task}) + ErrorLog.notify(Exception("Failed to locate needle for a video!"), {"video_folder": folder, "video_filepath": filepath, "video_id": video.id, "task": task}) return {"error": "Video not found for provided task", "task": task} except Exception as err: - ErrorLog.notify(err, {"video_folder": video.folder, "video_filepath": video.filepath, "files": files, "video_id": video.id, "task": task}) + ErrorLog.notify(err, {"video_folder": folder, "video_filepath": filepath, "files": files, "video_id": video.id, "task": task}) raise err threshold = float(task.get("threshold", 0.0) or 0.0) results = [] @@ -172,8 +192,8 @@ def search(self, task): if temporary: self.delete(task) - def tmk_file_exists(self, video): - file_path = media_crud.tmk_file_path(video.folder, video.filepath) + def tmk_file_exists(self, folder, filepath): + file_path = media_crud.tmk_file_path(folder, filepath) return os.path.exists(file_path) and os.path.getsize(file_path) > 0 def tmk_program_name(self):