Skip to content

Latest commit

 

History

History
149 lines (109 loc) · 3.09 KB

lession9.md

File metadata and controls

149 lines (109 loc) · 3.09 KB

Lession 9

Message Queue - Receive

Add Worker

Add Dependencies

poetry add celery
poetry add --group dev celery-types
# cleaning/cleaning/worker.py
import logging
import os

from celery import Celery
from celery.signals import worker_process_init
from kombu import Exchange, Queue
from sqlmodel import Session

from cleaning.database import get_engine
from cleaning.models import Booking

logger = logging.getLogger(__name__)


@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    logger.info("CeleryInstrumentator initializing")


amqp_url = os.getenv("AMQP_URL")
app = Celery("worker", broker=amqp_url)

app.conf.task_queues = (Queue("cleaning", Exchange("dispo"), routing_key="booking"),)
app.conf.task_routes = {"cleaning.celery_worker.*": "dispo"}


@app.task(name="booking.created")
def booking_created(data):
    logger.info(f"got message {data}")
    with Session(get_engine()) as db:
        booking = Booking(**data)
        db.add(booking)
        db.commit()

Add Resources

Start files

# cleaning/start-worker.sh
#!/bin/sh

set -e

. /venv/bin/activate

exec celery -A cleaning.worker \
  worker -l INFO -P solo

And similar for start-debug-worker.sh with adding python -m debugpy --listen 0.0.0.0:5678 -m between exec and clelery;

Dockerfile

copy api.Dockerfile to worker.Dockerfile and set cmd to start-worker.sh

Deployment Descriptor

# deploy/cleaning_worker.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: cleaning.worker
  labels:
    app: cleaning
spec:
  selector:
    matchLabels:
      app: cleaning
  template:
    metadata:
      labels:
        app: cleaning
    spec:
      containers:
        - name: cleaning-worker
          image: cleaning-worker
          env:
            - name: DATABASE_URL
              value: postgresql://cleaning:mysecretpassword@postgres/cleaning
            - name: AMQP_URL
              value: amqp://rabbit:mysecretpassword@rabbitmq:5672/
            - name: OTEL_EXPORTER_OTLP_ENDPOINT
              value: http://jaeger:4317
            - name: OTEL_SERVICE_NAME
              value: cleaning.worker
          command: ["./start-worker.sh"]
          resources:
            limits:
              memory: 256Mi
              cpu: 400m

Tiltfile

# tiltfile - add

docker_build(
    'cleaning-worker',
    context='./cleaning',
    dockerfile='./cleaning/api.Dockerfile',
    entrypoint=["./start-worker-debug.sh"],
    ignore=["**/__pycache__", ".pytest_cache", "**/*.pyc*"],
    live_update=[
        sync('./cleaning/cleaning', '/svc/cleaning'),
    ],
)

k8s_yaml('deploy/cleaning_worker.yaml')

k8s_resource(
    'cleaning',
    port_forwards=['5680:5678']
)

Testing

After the worker has started successfully RabbitMQ should have registered a new queue named cleaning with the exchange dispo.

curl http://localhost:8000/api/v1/bookings/  -H "Content-Type: application/json" \
  -d '{"room_id": 1, "start": "2023-02-04", "end": "2023-02-06"}'

curl "http://localhost:8081/api/v1/days/?start=2023-02-01&end=2023-02-20" | jq