diff --git a/devops/docker-compose.local.yml b/devops/docker-compose.local.yml index 213cb833..16b3f69c 100644 --- a/devops/docker-compose.local.yml +++ b/devops/docker-compose.local.yml @@ -59,7 +59,12 @@ services: tty: true container_name: geoapiworkers hostname: geoapiworkers - command: "celery -A geoapi.celery_app worker -l info" + command: > + sh -c ' + celery -A geoapi.celery_app worker -l info -Q default -n default_worker@geoapi & + celery -A geoapi.celery_app worker -l info -Q heavy --concurrency=6 -n heavy_worker@geoapi & + wait + ' celerybeat: image: taccaci/geoapi-workers:local diff --git a/devops/geoapi-workers/docker-compose.yml b/devops/geoapi-workers/docker-compose.yml index 778482aa..ce7dfcb3 100644 --- a/devops/geoapi-workers/docker-compose.yml +++ b/devops/geoapi-workers/docker-compose.yml @@ -18,7 +18,12 @@ services: driver: syslog options: tag: geoapi_workers - command: "celery -A geoapi.celery_app worker -l info" + command: > + sh -c ' + celery -A geoapi.celery_app worker -l info -Q default -n default_worker@geoapi & + celery -A geoapi.celery_app worker -l info -Q heavy --concurrency=6 -n heavy_worker@geoapi & + wait + ' watchtower: image: containrrr/watchtower:1.7.1 diff --git a/geoapi/celery_app.py b/geoapi/celery_app.py index 4732f42b..6220cc88 100644 --- a/geoapi/celery_app.py +++ b/geoapi/celery_app.py @@ -15,6 +15,14 @@ broker=CELERY_CONNECTION_STRING, include=['geoapi.tasks']) +# Define the queues +app.conf.task_queues = { + 'default': {'exchange': 'default', 'routing_key': 'default'}, + 'heavy': {'exchange': 'heavy', 'routing_key': 'heavy'} +} + +app.conf.task_default_queue = 'default' + app.conf.beat_schedule = { 'refresh_projects_watch_content': { 'task': 'geoapi.tasks.external_data.refresh_projects_watch_content', diff --git a/geoapi/tasks/external_data.py b/geoapi/tasks/external_data.py index 1a044b52..b72b00e4 100644 --- a/geoapi/tasks/external_data.py +++ b/geoapi/tasks/external_data.py @@ -167,7 +167,7 @@ def _handle_point_cloud_conversion_error(pointCloudId, userId, files, error_desc f"Processing failed for point cloud ({pointCloudId})!") -@app.task(rate_limit="1/s") +@app.task(queue='heavy') def import_point_clouds_from_agave(userId: int, files, pointCloudId: int): with create_task_session() as session: user = session.query(User).get(userId) diff --git a/geoapi/tasks/streetview.py b/geoapi/tasks/streetview.py index 8e04cfa6..990aa825 100644 --- a/geoapi/tasks/streetview.py +++ b/geoapi/tasks/streetview.py @@ -199,7 +199,7 @@ def check_existing_upload(session, user, streetview_service, task_uuid, system_i # TODO: Ensure that just user works and not userid (previously took userid) -@app.task(rate_limit="5/s") +@app.task(queue='heavy') def from_tapis_to_streetview(user_id: int, streetview_service_id: int, system_id: str,