
Work a Q

Andrea Falconi edited this page Aug 12, 2021 · 3 revisions

WIP!!

The QL Web app can offload the execution of some tasks to a work queue. In this configuration, a set of QL Web servers add tasks to a queue and a pool of QL queue worker processes fetch tasks from the queue and run them asynchronously, optionally retrying failed tasks. The queue data sits in Redis and QL relies on RQ to manage it, provide worker processes to run tasks and do all the necessary bookkeeping—e.g. purging data past a configured time to live (TTL).

The QL Web app comes with a REST API to manage work queue tasks. Clients can query, retrieve and inspect task inputs as well as task runtime information, count tasks satisfying a given predicate to gauge system load, and delete tasks. The implementation makes extensive use of stream processing techniques to keep queries efficient. In particular, the space complexity of queries is constant.
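
As a rough illustration of the constant-space claim, the sketch below counts tasks matching a predicate by consuming a lazy stream one item at a time. This is not QL's actual code: `TaskInfo`, `task_stream` and `count_matching` are hypothetical names, and the stream is generated in memory rather than read from Redis.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Iterator


@dataclass(frozen=True)
class TaskInfo:
    """Hypothetical, stripped-down view of a task in the queue."""
    task_id: str
    status: str  # 'pending', 'succeeded' or 'failed'


def task_stream(n: int) -> Iterator[TaskInfo]:
    """Lazily yield n made-up tasks; stands in for a cursor over the queue."""
    statuses = ('pending', 'succeeded', 'failed')
    for i in range(n):
        yield TaskInfo(task_id=f't{i}', status=statuses[i % 3])


def count_matching(tasks: Iterable[TaskInfo],
                   predicate: Callable[[TaskInfo], bool]) -> int:
    """Count matching tasks without ever building a list of them."""
    return sum(1 for t in tasks if predicate(t))


succeeded = count_matching(task_stream(9), lambda t: t.status == 'succeeded')
print(succeeded)  # 3
```

Because both the generator and the `sum` hold only one task at a time, memory use stays flat no matter how many tasks flow through.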

The design is modular. Components hide their implementation behind interfaces and use other components only through their provided interfaces. A layered approach makes it possible to write and manage tasks at a high level, independently of the queue backend—RQ, as noted earlier. By the same token, it's possible to implement an alternative queue backend without modifying existing tasks and high-level task management modules.

At the moment, QL only uses the work queue for NGSI notifications, if configured to do so. When an entity payload comes through the notify endpoint, the Web app turns the payload into a task to save it to the DB, adds the task to the queue and returns a 200 to the client immediately. A separate instance of QL, configured as a queue worker, fetches the task from the queue and runs it to actually insert the NGSI entities into the DB, possibly retrying the insert at a later time if it fails. Clients connect to the Web app to manage notify tasks in the queue.

Functional view

TODO:

  • Layering: reporter, wq-ql, cli /—>/ tasks, mgmt /—>/ rts /—>/ RQ, Redis
  • Components, connectors and interfaces
  • Behaviours and data flows—task stuff covered below, mgmt API needs some love

Task life-cycle

The way the QL Web app, the queue worker, RQ and Redis collaborate to process tasks is a key aspect of the QL work queue architecture. In a nutshell, the Web app creates a task and stores it, through RQ, in a Redis hash containing the queue data. Then the worker, through RQ, fetches the task from Redis and runs it. If the task fails and retries are configured, the worker asks RQ to schedule the task to run again later and RQ puts it back into Redis. The worker retries failed tasks for a configured maximum number of times before giving up. Regardless of retries, as soon as a task runs to completion successfully, the worker notifies RQ which, in turn, saves the task back to Redis in the set of successful tasks, taking care of specifying a time to live (TTL) so Redis can automatically remove it from the set and reclaim storage past that TTL. Similarly, in the case a task fails and no retries are configured or there aren't any retries left, the worker notifies RQ which puts it into a Redis set of failed tasks, again setting a TTL so Redis can delete it automatically past that time.

The interactions sketched out so far are based on an abstract specification of the work queue task life-cycle. In fact, the above components carry out the various steps in the task life-cycle according to a finite state machine that models, with a fair degree of accuracy, how computation actually happens in the work queue and explains some important aspects we glossed over earlier, such as the relationships among retries, time and events. The UML state chart below is a visual representation of the task state machine.

A task begins its life when the QL Web app adds it to the work queue—enqueue event in the diagram. At this point the task is in the Queued state and is waiting for a queue worker process to fetch it and run it for the first time.

Workers can retry failed tasks for up to a configured number of times M. (M is a non-negative integer.) RQ keeps track of how many times workers have retried a task. When the task enters the Queued state, the current number of retries r is set to 0. A worker can retry a task only if r < M. So if M is set to 0, workers never retry failed tasks.

A worker initially fetches a task in the Queued state (fetch event) at a certain time t₀ and tries to run it once. While a task runs, it's in the Running state. After running a task, the worker checks if the task completed successfully. If so, the task transitions from the Running state to the Succeeded state (succeed event). On the other hand, if the task failed (fail event), two transitions out of the Running state are possible, depending on the current number of retries. If r = M, the task enters the Failed state, whereas if r < M, the worker asks RQ to schedule another execution attempt at a later time and the task enters the Scheduled state.

The worker uses an exponential retry schedule σ. Retries get spaced out by an exponentially growing number of seconds defined by the sequence s = (sₙ) with sₙ = c⋅2ⁿ for n ∈ ℕ, that is s = (c, 2c, 4c, 8c, 16c, ...), where c is a constant number of seconds. (In the current implementation c = 20.) The retry schedule σ is the sequence of time points, in seconds, defined recursively by

  1. σ₀ = t₀
  2. σₙ₊₁ = σₙ + sₙ

So σ = (t₀, t₀ + c, t₀ + c + 2c, …). The zeroth schedule is the initial task execution at time t₀, when the worker fetched the task from the queue for the first time. The first schedule is the time point t₀ + c at which the worker retries the task for the first time if the initial run at t₀ failed. The second schedule is the time point t₀ + c + 2c at which the worker retries the task for the second time if the first retry at t₀ + c failed, and so on.
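
The recursive definition can be checked with a few lines of Python. This is only a sketch of the arithmetic, not the worker's code: `retry_schedule` and its parameter names are made up for illustration, and c defaults to the 20 seconds mentioned earlier. Summing the geometric delays also gives the closed form σₙ = t₀ + c(2ⁿ − 1), which the last lines verify.

```python
from itertools import accumulate
from typing import List


def retry_schedule(t0: float, max_retries: int, c: float = 20.0) -> List[float]:
    """Return the time points (sigma_0, ..., sigma_M) for M = max_retries.

    sigma_0 = t0 is the initial run; sigma_k (k >= 1) is the k-th retry.
    """
    # Delays s_n = c * 2^n between consecutive attempts.
    delays = [c * 2 ** n for n in range(max_retries)]
    # Running sum starting from t0 implements sigma_{n+1} = sigma_n + s_n.
    return list(accumulate(delays, initial=t0))


schedule = retry_schedule(t0=0.0, max_retries=3)
print(schedule)  # [0.0, 20.0, 60.0, 140.0]

# Closed form: sigma_n = t0 + c * (2^n - 1).
assert all(t == 0.0 + 20.0 * (2 ** n - 1) for n, t in enumerate(schedule))
```

With c = 20 and M = 3, a task that keeps failing is retried 20, 60 and 140 seconds after the initial run.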

So the task may run at time point σₖ with 0 ≤ k ≤ M. In particular, if a task sits in the Scheduled state, at time point σᵣ₊₁ the worker fetches it and tries to run it again (fetch event). Again, while the task runs, it's in the Running state. In transitioning from Scheduled to Running, the current number of retries r is increased by one.

A task in the Queued, Running or Scheduled state is also in the Pending state. This is just a convenience composite state to capture the idea that the system still doesn't know what the task outcome is, because the task is either waiting to be run for the first time, scheduled for a retry, or actually busy running. The two possible task outcomes in the model are captured by the Succeeded and Failed states, respectively.

The Succeeded and Failed states aren't final states. In fact, as noted earlier, RQ keeps successful and failed tasks in Redis for a configured success and failure TTL, respectively. When that TTL expires, Redis automatically deletes the task at which point the state machine reaches its final state.
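
The state machine described in this section can be written down as a small transition function. The sketch below is a model of the prose, not code from QL or RQ: all names are hypothetical, and in particular `DELETED` and `EXPIRE` are labels invented here for the final state and the TTL expiry.

```python
from dataclasses import dataclass, replace
from enum import Enum


class State(Enum):
    QUEUED = 'queued'
    RUNNING = 'running'
    SCHEDULED = 'scheduled'
    SUCCEEDED = 'succeeded'
    FAILED = 'failed'
    DELETED = 'deleted'  # final state, reached when the TTL expires


class Event(Enum):
    FETCH = 'fetch'
    SUCCEED = 'succeed'
    FAIL = 'fail'
    EXPIRE = 'expire'  # hypothetical name for the TTL expiry


@dataclass(frozen=True)
class Task:
    max_retries: int             # M
    retries: int = 0             # r
    state: State = State.QUEUED  # state right after the enqueue event


def step(task: Task, event: Event) -> Task:
    """Return the task after handling event, or raise if it's not allowed."""
    s, r, m = task.state, task.retries, task.max_retries
    if s is State.QUEUED and event is Event.FETCH:
        return replace(task, state=State.RUNNING)
    if s is State.SCHEDULED and event is Event.FETCH:
        # r goes up by one on each Scheduled -> Running transition.
        return replace(task, state=State.RUNNING, retries=r + 1)
    if s is State.RUNNING and event is Event.SUCCEED:
        return replace(task, state=State.SUCCEEDED)
    if s is State.RUNNING and event is Event.FAIL:
        # Retry only while r < M, otherwise give up.
        next_state = State.SCHEDULED if r < m else State.FAILED
        return replace(task, state=next_state)
    if s in (State.SUCCEEDED, State.FAILED) and event is Event.EXPIRE:
        return replace(task, state=State.DELETED)
    raise ValueError(f'{event.name} not allowed in state {s.name}')


def is_pending(task: Task) -> bool:
    """Pending is the composite of Queued, Running and Scheduled."""
    return task.state in (State.QUEUED, State.RUNNING, State.SCHEDULED)


# A task with M = 1 that fails twice: one retry, then Failed.
task = Task(max_retries=1)
for event in (Event.FETCH, Event.FAIL, Event.FETCH, Event.FAIL):
    task = step(task, event)
print(task.state.name, task.retries)  # FAILED 1
```

Setting `max_retries=0` makes the first fail event lead straight to the Failed state, matching the remark that workers never retry when M is 0.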

Note

To simplify things, the state machine model assumes fetching a task from the queue and running it take an amount of time x less than c. That should be true in most cases for QL, but I haven't tested what RQ would do if x > c—think long-running task that fails.

Process model

this section is still a work in progress!

  • Distributed computation: producers (QL Web app instances) and consumers (QL queue workers) communicate asynchronously through a queue data store (RQ / Redis).
  • Producers don't need to know the outcome of running a task, so there's no need for a coordination algorithm with consumers.
  • Consumers have to coordinate to fetch tasks from the queue; RQ manages the distribution of work to consumers.
  • Each consumer is a sequential (single-threaded) RQ process running a Python interpreter loaded with all the QL packages and dependent libraries. This is because tasks usually aren't self-contained and may reference any other QL package—the notify task is a case in point.
  • The consumer forks a child process (the RQ "work horse") to run each task it fetches from the queue. But it still runs one task at a time: no two work horses ever run simultaneously. The point of the work horse is reliability, not performance: runaway tasks can't bring down the consumer, e.g. task memory leaks won't affect the consumer.
  • To increase throughput, start more than one consumer to process tasks in parallel.
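
To make the reliability argument concrete, here is a toy consumer that runs each task in a child process and waits for it before moving on. It is a stand-in for the idea only: RQ forks its work horse, whereas this sketch starts a fresh Python interpreter through `subprocess` to stay portable, and `run_in_child` and `consume` are made-up names.

```python
import subprocess
import sys
from typing import Iterable, List


def run_in_child(task_code: str) -> int:
    """Run one task in a separate Python process and return its exit code."""
    done = subprocess.run([sys.executable, '-c', task_code])
    return done.returncode


def consume(tasks: Iterable[str]) -> List[bool]:
    """Process tasks one at a time; True means the task succeeded."""
    outcomes = []
    for code in tasks:
        # Blocks until the child exits, so children never run concurrently.
        outcomes.append(run_in_child(code) == 0)
    return outcomes


tasks = [
    'x = 1 + 1',               # well-behaved task
    'import os; os._exit(3)',  # task that kills its own process
    'y = sum(range(10))',      # still runs: the consumer survived
]
print(consume(tasks))  # [True, False, True]
```

The second task takes down its own process, yet the loop carries on with the third one, which is the property the work horse is there to provide.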

Implementation

Code base

TODO:

  • Mapping functional & process view to code
  • Packages, modules and dependencies
  • Stream processing techniques

Testing

TODO:

  • Benchmark stuff
  • Test driver CLI, running w/ or w/o Docker, e.g. python -m wq.tests.benchmark --no-docker
  • Docker stuff

Deployment and operation

this section is still a work in progress!

Running

When using a work queue, there are two sets of QL processes: a set of Web servers that add tasks to a queue and a pool of queue worker processes that fetch tasks from the queue and run them asynchronously. The Web servers are QL Web app instances configured to offload tasks to the work queue through the various WQ_* environment variables available for the Web app. A worker process is a Python interpreter loaded with the same code as the Web app but started with a different entry point. There are several options to start worker processes:

  • Use built-in CLI. Typically: python wq up, but have a look at wq.core.cli for other options.
  • Supervisor configuration available to manage a pool of worker processes—have a look at the config file in src/wq for the details.
  • Built-in worker process pool also available. Alternative to external tools like Supervisor. Warning: experimental, only use for testing. E.g. python wq up -w 2 runs a pool of two workers to process tasks in parallel.
  • The QL Docker image can start queue workers too, but you'll have to override the Docker entry point—the default command starts the QL Web app.
    • To start a Supervisor-managed pool of workers, set the WQ_WORKERS env var to specify the pool size and override the Docker entry point with: supervisord -n -c ./wq/supervisord.conf.
    • To start a single worker w/o Supervisor, just override the Docker entry point w/: python wq up.

Monitoring

  • Start workers with QL built-in telemetry to gauge performance. It works the same as for QL Web apps. There is a tutorial about collecting and analysing data from a QL Web app. The same principles apply to queue worker telemetry. For actual examples of queue worker telemetry, look at the benchmark in src/wq/tests/benchmark. In particular, there are metrics to gauge overall system throughput (number of DB rows inserted per second) and sample the queue size.
  • Log files when using Supervisor go in /tmp. One log file for Supervisor, plus two log files for each worker to capture stderr and stdout, respectively.

Cloud

TODO

Cookbook

Management API

Here are a few examples covering the most common scenarios for the task management API; refer to the QL Swagger spec for the details. Before trying the examples, load some data, e.g.

curl -v localhost:8668/v2/notify \
     -H 'Content-Type: application/json' \
     -H 'fiware-service: x' \
     -H 'fiware-servicepath: /' \
     -d @notify-payload.json

and while you do that bring down the DB back-end so some of the tasks will fail.

(1) Get all tasks for a given tenant and service path

curl -v localhost:8668/wq/notifications \
     -H 'fiware-service: x' \
     -H 'fiware-servicepath: /'

(2) Same as (1), but additionally filter by task status: pending, succeeded or failed

curl -v 'localhost:8668/wq/notifications?taskStatus=succeeded' \
     -H 'fiware-service: x' \
     -H 'fiware-servicepath: /'

(3) Get a summary of all tasks for a given tenant and service path (note: can always use a correlation ID too, if you'd like to narrow the result set further, possibly to just one task)

curl -v localhost:8668/wq/notifications/summary \
     -H 'fiware-service: x' \
     -H 'fiware-servicepath: /'

Notice this is conceptually the same as (1), except the returned task objects don't have the inputs in them. As in (2), you can additionally filter by status.

(4) Count how many tasks there are in the queue

curl localhost:8668/wq/notifications/count

Notice this includes everything, even stuff that got processed or failed and won't be touched again.

(5) Count how many tasks have been run successfully, regardless of tenant or service path

curl 'localhost:8668/wq/notifications/count?taskStatus=succeeded'

(6) Optionally you can just retrieve a count for a tenant and/or service path (note: can always use a correlation ID too, if you'd like to narrow the result set further, possibly to just one task)

curl 'localhost:8668/wq/notifications/count?taskStatus=succeeded' \
     -H 'fiware-service: x' \
     -H 'fiware-servicepath: /'

(7) Delete all tasks for a tenant and service path

curl -X DELETE localhost:8668/wq/notifications \
     -H 'fiware-service: x' \
     -H 'fiware-servicepath: /'

Keep in mind that, all things being equal, you won't need to use this endpoint: tasks come with a configurable time to live, so each task gets deleted automatically once the number of seconds specified in its success or failure TTL has elapsed.
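
The same calls can be scripted from Python with the standard library. The helper below is a sketch: `wq_request` is a made-up name, and it only uses the paths, headers and the taskStatus query parameter shown in the curl examples. It builds requests without sending them, so it can be tried without a running QL instance.

```python
from typing import Optional
from urllib.parse import urlencode
from urllib.request import Request

BASE_URL = 'http://localhost:8668'  # same host and port as the curl examples


def wq_request(path: str,
               method: str = 'GET',
               service: Optional[str] = None,
               service_path: Optional[str] = None,
               task_status: Optional[str] = None) -> Request:
    """Build, but don't send, a request for the task management API."""
    url = BASE_URL + path
    if task_status is not None:
        url += '?' + urlencode({'taskStatus': task_status})
    headers = {}
    if service is not None:
        headers['fiware-service'] = service
    if service_path is not None:
        headers['fiware-servicepath'] = service_path
    return Request(url, headers=headers, method=method)


# Example (5): count successful tasks across all tenants.
count_req = wq_request('/wq/notifications/count', task_status='succeeded')
print(count_req.get_method(), count_req.full_url)

# Example (7): delete all tasks for tenant 'x' and service path '/'.
delete_req = wq_request('/wq/notifications', method='DELETE',
                        service='x', service_path='/')
print(delete_req.get_method(), delete_req.full_url)

# To actually send a request, with a QL Web app listening on BASE_URL,
# pass it to urllib.request.urlopen and read the response body.
```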

Interacting with RQ directly

Example interactive session:

>>> from redis import Redis
>>> from rq import Queue

>>> q = Queue(connection=Redis())
>>> ids = q.finished_job_registry.get_job_ids()
>>> js = [q.fetch_job(i) for i in ids]

Looking at what RQ does in Redis

Example interactive session:

>>> from redis import Redis

>>> r = Redis()

>>> ks = r.keys()
>>> ts = [r.type(k) for k in ks]
>>> ks
[b'rq:queue:default', b'rq:job:a9a9cef1-cfad-4939-be50-79093ab4f63a', b'rq:job:4d5690dc-fb5a-48f8-9121-1f10cca98ed6', b'rq:queues']
>>> ts
[b'list', b'hash', b'hash', b'set']

>>> j = r.hgetall('rq:job:a9a9cef1-cfad-4939-be50-79093ab4f63a')
>>> job_ks = r.keys('rq:job:*')
>>> job_ks
[b'rq:job:a9a9cef1-cfad-4939-be50-79093ab4f63a', b'rq:job:4d5690dc-fb5a-48f8-9121-1f10cca98ed6']

>>> r.lrange('rq:queue:default', 0, -1)
[b'4d5690dc-fb5a-48f8-9121-1f10cca98ed6', b'a9a9cef1-cfad-4939-be50-79093ab4f63a']

>>> r.smembers('rq:queues')
{b'rq:queue:default'}

>>> r.zrange('rq:finished:default', 0, -1)
[b'eA==:Lw==:"":NWU0MDhjZDNkZDllNDQ1ZDhhYTZkNTVkODg2MTcyYWM=', b'eA==:Lw==:"":ZDNhODBhOWE5NGM5NDZmZWJhMmE0MmM4YjBkNDFkYjc=', b'eA==:Lw==:"":YzI3MGEzZjVhOGRiNDI0OWI5ZjAwZGUwMTNmYWUwMWQ=', b'eA==:Lw==:"":MWNjMWYwM2I1ZmIwNGVhYzhhOThkYmY0ODdiN2VlMDg=']

>>> r.zscan('rq:finished:default', cursor=0, match='eA==:Lw==*')
(0, [(b'eA==:Lw==:"":NWU0MDhjZDNkZDllNDQ1ZDhhYTZkNTVkODg2MTcyYWM=', 1619795739.0), (b'eA==:Lw==:"":ZDNhODBhOWE5NGM5NDZmZWJhMmE0MmM4YjBkNDFkYjc=', 1619795739.0), (b'eA==:Lw==:"":YzI3MGEzZjVhOGRiNDI0OWI5ZjAwZGUwMTNmYWUwMWQ=', 1619795740.0), (b'eA==:Lw==:"":MWNjMWYwM2I1ZmIwNGVhYzhhOThkYmY0ODdiN2VlMDg=', 1619795741.0)])
>>> r.zscan('rq:finished:default', cursor=0, match='eA==:Lw==1*')
(0, [])
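
The entries in rq:finished:default look like colon-separated Base64 fields. Decoding one of the entries from the session above shows that the first two fields are 'x' and '/', the tenant and service path used in the cookbook examples, which explains why the 'eA==:Lw==*' pattern matches. Reading the third field as an (empty) correlation ID and the last one as a unique task ID is an assumption, and `decode_entry` is a helper made up for this sketch.

```python
import base64
from typing import List


def decode_entry(entry: bytes) -> List[str]:
    """Split a registry entry on ':' and Base64-decode each field."""
    fields = []
    for part in entry.decode('ascii').split(':'):
        if part == '""':
            fields.append('')  # literal pair of quotes: treated as empty
        else:
            fields.append(base64.b64decode(part).decode('utf-8'))
    return fields


entry = b'eA==:Lw==:"":NWU0MDhjZDNkZDllNDQ1ZDhhYTZkNTVkODg2MTcyYWM='
print(decode_entry(entry))
# ['x', '/', '', '5e408cd3dd9e445d8aa6d55d886172ac']
```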

Odds and ends

Connect to Postgres as QL user

psql postgresql://quantumleap:*@localhost

See the worker process tree

pstree -s python

See the computational resources the workers are using

htop -F python -t