refactor after verifying full functionality for video, audio, and image
DGaffney committed Jan 12, 2024
1 parent b00b83a commit 6a41da1
Showing 5 changed files with 36 additions and 8 deletions.
3 changes: 3 additions & 0 deletions docker-compose.yml
@@ -7,6 +7,9 @@ services:
context: .
args:
- PRESTO_PORT=${PRESTO_PORT}
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
- AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION}
env_file:
- ./.env_file
depends_on:
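
The three AWS_* lines are this file's additions (the header above counts 3 additions, 0 deletions): the AWS credentials and region are now forwarded to the image as build arguments. Build args exist only at build time, so the running container still relies on .env_file for the same values; a small startup check along these lines (a hypothetical helper, not part of this commit) would catch a missing runtime configuration early:

    import os

    # Hypothetical startup check, not part of this commit: confirm the AWS
    # settings passed as build args are also present at runtime.
    REQUIRED = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_DEFAULT_REGION")
    missing = [name for name in REQUIRED if not os.environ.get(name)]
    if missing:
        raise RuntimeError(f"Missing AWS configuration: {', '.join(missing)}")
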
5 changes: 3 additions & 2 deletions lib/model/video.py
@@ -18,6 +18,7 @@ def __init__(self):
"""
self.directory = "./video_files"
self.ffmpeg_dir = "/usr/local/bin/ffmpeg"
self.model_name = os.environ.get("MODEL_NAME")
pathlib.Path(self.directory).mkdir(parents=True, exist_ok=True)

def tmk_file_path(self, filename: str, create_path: bool = True) -> str:
@@ -39,7 +40,7 @@ def tmk_bucket(self) -> str:
"""
Constant for identifying bucket. Needed for uploading output.
"""
return "presto_tmk_videos"
return "presto-tmk-videos"

def process(self, video: schemas.Message) -> schemas.GenericItem:
"""
@@ -60,4 +61,4 @@ def process(self, video: schemas.Message) -> schemas.GenericItem:
for file_path in [self.tmk_file_path(video_filename), temp_file_name]:
if os.path.exists(file_path):
os.remove(file_path)
return dict(**video.dict(), **{"folder": self.tmk_bucket(), "filepath": self.tmk_file_path(video_filename), "hash_value": hash_value})
return {"folder": self.tmk_bucket(), "filepath": self.tmk_file_path(video_filename), "hash_value": hash_value}
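
The last hunk changes the return value from dict(**video.dict(), **{...}) to a plain dict: the response now carries only folder, filepath, and hash_value rather than also echoing back every field of the incoming message. A minimal illustration with made-up field names and values:

    # Illustration only; the fields on the incoming message are made up.
    video_fields = {"id": "abc123", "url": "http://example.com/clip.mp4"}
    result_fields = {
        "folder": "presto-tmk-videos",
        "filepath": "./video_files/abc123.tmk",
        "hash_value": [0.1, 0.2],
    }

    old_style = dict(**video_fields, **result_fields)  # message fields + results
    new_style = result_fields                          # results only
    assert "url" in old_style and "url" not in new_style
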
7 changes: 3 additions & 4 deletions lib/queue/processor.py
@@ -17,7 +17,7 @@ def create(cls, input_queue_name: str = None, batch_size: int = 10):
input_queue_name = Queue.get_queue_name(input_queue_name)
logger.info(f"Starting queue with: ('{input_queue_name}', {batch_size})")
return QueueProcessor(input_queue_name, batch_size)

def __init__(self, input_queue_name: str, output_queue_name: str = None, batch_size: int = 1):
"""
Start a specific queue - must pass input_queue_name - optionally pass output_queue_name, batch_size.
@@ -28,7 +28,7 @@ def __init__(self, input_queue_name: str, output_queue_name: str = None, batch_size: int = 1):
self.all_queues = self.store_queue_map(self.input_queues)
logger.info(f"Processor listening to queues of {self.all_queues}")
self.batch_size = batch_size

def send_callbacks(self) -> List[schemas.Message]:
"""
Main routine. Given a model, in a loop, read tasks from input_queue_name at batch_size depth,
@@ -42,8 +42,7 @@ def send_callbacks(self) -> List[schemas.Message]:
for body in bodies:
self.send_callback(body)
self.delete_messages(messages_with_queues)

def send_callback(self, message):
"""
Rescue against failures when attempting to respond (i.e. fingerprint) from models.
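
The seven changes in this file appear to be blank-line and whitespace cleanups only. The send_callback docstring is still worth unpacking: responses are "rescued" so that one failing callback cannot crash the batch loop driven by send_callbacks. A sketch of that pattern under assumed names (process_fn stands in for whatever actually delivers a response; it is not the project's API):

    import logging

    logger = logging.getLogger(__name__)

    def send_callback(process_fn, message):
        # Rescue pattern sketch: log the failure and keep going, so one bad
        # message cannot halt the rest of the batch.
        try:
            process_fn(message)
        except Exception as e:
            logger.error(f"Callback failed for message {message}: {e}")
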
2 changes: 1 addition & 1 deletion lib/queue/worker.py
@@ -50,7 +50,7 @@ def safely_respond(self, model: Model) -> List[schemas.Message]:
messages_with_queues = self.receive_messages(model.BATCH_SIZE)
responses = []
if messages_with_queues:
logger.debug(f"About to respond to: ({messages_with_queues})")
model_type = type(model)
try:
responses = model.respond([schemas.Message(**{**json.loads(message.body), **{"model_name": model.model_name}}) for message, queue in messages_with_queues])
except Exception as e:
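
The respond call above builds each schemas.Message by merging the queued JSON body with the worker's own model_name. Because the right-hand dict wins on key collisions in {**a, **b}, model_name is always taken from the worker rather than trusted from the payload. A worked example with made-up values:

    import json

    body = '{"id": "123", "url": "http://example.com/video.mp4"}'  # made-up payload
    merged = {**json.loads(body), **{"model_name": "video__Model"}}
    assert merged["model_name"] == "video__Model"

    # Even a payload that already carries a model_name is overridden:
    stale = '{"id": "123", "model_name": "old"}'
    assert {**json.loads(stale), **{"model_name": "video__Model"}}["model_name"] == "video__Model"
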
27 changes: 26 additions & 1 deletion lib/s3.py
@@ -1,12 +1,37 @@
import os
import boto3
from botocore.client import Config
from botocore.exceptions import ClientError
from lib.logger import logger
def upload_file_to_s3(bucket: str, filename: str):
"""
Generic upload helper for s3. Could be moved over to helpers folder...
"""
s3_url = os.getenv('S3_ENDPOINT')
access_key = os.getenv('AWS_ACCESS_KEY_ID')
secret_key = os.getenv('AWS_SECRET_ACCESS_KEY')
region = os.getenv('AWS_DEFAULT_REGION')
secure = s3_url.startswith('https')
# Set up the S3 client
s3_client = boto3.client('s3')
s3_client = boto3.client('s3',
endpoint_url=s3_url,
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
config=Config(signature_version='s3v4'),
region_name=region,
use_ssl=secure)
# Extract the file name from the local file path
try:
s3_client.head_bucket(Bucket=bucket)
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchBucket' or int(e.response['Error']['Code']) == 404:
# Create the bucket since it does not exist
s3_client.create_bucket(Bucket=bucket)
logger.info(f'Created bucket {bucket} in MinIO.')
else:
# Other errors like permissions issues
logger.error(f'Error in accessing bucket {bucket}: {e}, {bucket} {filename}')
return
file_name = filename.split('/')[-1]
# Upload the file to S3
try:
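
One caveat in the new bucket check above: head_bucket reports a missing bucket as a ClientError whose Error.Code is typically the string '404', and int(e.response['Error']['Code']) will raise ValueError for non-numeric codes such as 'AccessDenied'. (Likewise, s3_url.startswith('https') raises AttributeError if S3_ENDPOINT is unset.) A defensive variant of the existence test, as a sketch rather than the committed code:

    from botocore.exceptions import ClientError

    def bucket_is_missing(error: ClientError) -> bool:
        # Compare codes as strings: int('AccessDenied') would raise ValueError.
        code = error.response.get('Error', {}).get('Code', '')
        return code in ('404', 'NoSuchBucket')
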
