-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
CV2-3551 add local queue consumption and re-work a ton of the startup…
… flow to accommodate
- Loading branch information
Showing
15 changed files
with
225 additions
and
95 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,13 +1,16 @@ | ||
.PHONY: run run_http run_worker run_test | ||
|
||
run: | ||
./start_healthcheck_and_model_engine.sh | ||
./start_all.sh | ||
|
||
run_http: | ||
uvicorn main:app --host 0.0.0.0 --reload | ||
|
||
run_worker: | ||
python run.py | ||
python run_worker.py | ||
|
||
run_processor: | ||
python run_processor.py | ||
|
||
run_test: | ||
python -m unittest discover . | ||
python -m pytest test |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
from typing import List | ||
import json | ||
|
||
import requests | ||
|
||
from lib import schemas | ||
from lib.logger import logger | ||
from lib.helpers import get_setting | ||
from lib.queue.queue import Queue | ||
class QueueProcessor(Queue): | ||
@classmethod | ||
def create(cls, input_queue_name: str = None, batch_size: int = 10): | ||
""" | ||
Instantiate a queue. Must pass input_queue_name, output_queue_name, and batch_size. | ||
Pulls settings and then inits instance. | ||
""" | ||
input_queue_name = get_setting(input_queue_name, "MODEL_NAME").replace(".", "__") | ||
logger.info(f"Starting queue with: ('{input_queue_name}', {batch_size})") | ||
return QueueProcessor(input_queue_name, batch_size) | ||
|
||
def __init__(self, input_queue_name: str, output_queue_name: str = None, batch_size: int = 1): | ||
""" | ||
Start a specific queue - must pass input_queue_name - optionally pass output_queue_name, batch_size. | ||
""" | ||
super().__init__() | ||
self.input_queue_name = input_queue_name | ||
self.input_queues = self.restrict_queues_by_suffix(self.get_or_create_queues(input_queue_name), "_output") | ||
self.all_queues = self.store_queue_map(self.input_queues) | ||
self.batch_size = batch_size | ||
|
||
def send_callbacks(self) -> List[schemas.Message]: | ||
""" | ||
Main routine. Given a model, in a loop, read tasks from input_queue_name at batch_size depth, | ||
pass messages to model to respond (i.e. fingerprint) them, then pass responses to output queue. | ||
If failures happen at any point, resend failed messages to input queue. | ||
""" | ||
messages_with_queues = self.receive_messages(self.batch_size) | ||
if messages_with_queues: | ||
logger.debug(f"About to respond to: ({messages_with_queues})") | ||
bodies = [schemas.Message(**json.loads(message.body)) for message, queue in messages_with_queues] | ||
for body in bodies: | ||
self.send_callback(body) | ||
self.delete_messages(messages_with_queues) | ||
|
||
|
||
def send_callback(self, body): | ||
""" | ||
Rescue against failures when attempting to respond (i.e. fingerprint) from models. | ||
Return responses if no failure. | ||
""" | ||
try: | ||
callback_url = body.get("callback_url") | ||
requests.post(callback_url, json=body) | ||
except Exception as e: | ||
logger.error(f"Callback fail! Failed with {e} on {callback_url} with body of {body}") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
import json | ||
from typing import List | ||
from lib import schemas | ||
from lib.logger import logger | ||
from lib.queue.queue import Queue | ||
from lib.model.model import Model | ||
from lib.helpers import get_setting | ||
class QueueWorker(Queue): | ||
@classmethod | ||
def create(cls, input_queue_name: str = None): | ||
""" | ||
Instantiate a queue worker. Must pass input_queue_name. | ||
Pulls settings and then inits instance. | ||
""" | ||
input_queue_name = get_setting(input_queue_name, "MODEL_NAME").replace(".", "__") | ||
output_queue_name = f"{input_queue_name}_output" | ||
logger.info(f"Starting queue with: ('{input_queue_name}', '{output_queue_name}')") | ||
return QueueWorker(input_queue_name, output_queue_name) | ||
|
||
def __init__(self, input_queue_name: str, output_queue_name: str = None): | ||
""" | ||
Start a specific queue - must pass input_queue_name - optionally pass output_queue_name. | ||
""" | ||
super().__init__() | ||
self.input_queue_name = input_queue_name | ||
self.input_queues = self.restrict_queues_by_suffix(self.get_or_create_queues(input_queue_name), "_output") | ||
if output_queue_name: | ||
self.output_queue_name = self.get_output_queue_name(input_queue_name, output_queue_name) | ||
self.output_queues = self.get_or_create_queues(output_queue_name) | ||
self.all_queues = self.store_queue_map([item for row in [self.input_queues, self.output_queues] for item in row]) | ||
|
||
def fingerprint(self, model: Model): | ||
""" | ||
Main routine. Given a model, in a loop, read tasks from input_queue_name, | ||
pass messages to model to respond (i.e. fingerprint) them, then pass responses to output queue. | ||
If failures happen at any point, resend failed messages to input queue. | ||
""" | ||
responses = self.safely_respond(model) | ||
if responses: | ||
for response in responses: | ||
logger.info(f"Processing message of: ({response})") | ||
self.return_response(response) | ||
|
||
def safely_respond(self, model: Model) -> List[schemas.Message]: | ||
""" | ||
Rescue against failures when attempting to respond (i.e. fingerprint) from models. | ||
Return responses if no failure. | ||
""" | ||
messages_with_queues = self.receive_messages(model.BATCH_SIZE) | ||
responses = [] | ||
if messages_with_queues: | ||
logger.debug(f"About to respond to: ({messages_with_queues})") | ||
responses = model.respond([schemas.Message(**json.loads(message.body)) for message, queue in messages_with_queues]) | ||
self.delete_messages(messages_with_queues) | ||
return responses | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,3 +9,5 @@ uvicorn[standard]==0.19.0 | |
httpx==0.23.1 | ||
huggingface-hub==0.11.0 | ||
fasttext==0.9.2 | ||
requests==2.31.0 | ||
pytest==7.4.0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
import time | ||
import os | ||
import importlib | ||
from lib.queue.processor import QueueProcessor | ||
from lib.model.model import Model | ||
from lib.logger import logger | ||
queue = QueueProcessor.create() | ||
|
||
logger.info("Beginning callback loop...") | ||
while True: | ||
queue.send_callbacks() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
from pydantic import BaseModel | ||
class FakeSQSMessage(BaseModel): | ||
body: str | ||
receipt_handle: str | ||
|
Oops, something went wrong.