Skip to content

Commit

Permalink
Merge pull request #31 from knadh/rebrand
Browse files Browse the repository at this point in the history
This merge commit brings a series of major changes to sql-jobber.

- The project is rebranded to DungBeetle.
- The internals are refactored and some, fully rewritten.
- The underlying distributed tasks library is switched from `machinery`
  to `tasqueue`.

Co-authored-by: Kailash Nadh <[email protected]>
Co-authored-by: Lakshay Kalbhor <[email protected]>
  • Loading branch information
knadh and Lakshay Kalbhor authored Mar 14, 2024
2 parents 8b98bd8 + 64c1b1c commit e8576b2
Show file tree
Hide file tree
Showing 29 changed files with 1,773 additions and 1,892 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: "1.20"
go-version: "1.21"

- name: Run GoReleaser
uses: goreleaser/goreleaser-action@v4
Expand Down
17 changes: 15 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
test:
strategy:
matrix:
go-version: [1.20.x]
go-version: [1.21.x]
platform: [ubuntu-latest]
runs-on: ${{ matrix.platform }}

Expand Down Expand Up @@ -46,7 +46,20 @@ jobs:
uses: actions/setup-go@v4
with:
go-version: ${{ matrix.go-version }}
- name: Install Dependenies
run: sudo apt-get update && sudo apt-get install -y --no-install-recommends ca-certificates postgresql-client

- name: Checkout code
uses: actions/checkout@v3

- name: Add tables
run: PGPASSWORD=testPass psql -U testUser -h localhost -p 5432 -d testDB -c 'CREATE TABLE entries (id BIGSERIAL PRIMARY KEY, amount REAL, user_id VARCHAR(6), entry_date DATE, timestamp TIMESTAMP)';

- name: Build binary
run: CGO_ENABLED=0 go build -o server.bin -ldflags="-s -w -X 'main.buildString=${BUILDSTR}'" ./cmd/*.go

- name: Run binary server
run: ./server.bin --config config.test.toml &

- name: Run tests
run: go test ./cmd -v -covermode=count
run: sleep 5 && go test ./client -v -covermode=count
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
config_test.toml
# binary file produced by `make build`.
sql-jobber
dungbeetle
2 changes: 1 addition & 1 deletion .goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ before:
builds:
- env:
- CGO_ENABLED=0
binary: sql-jobber
binary: dungbeetle
goos:
- windows
- darwin
Expand Down
12 changes: 6 additions & 6 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
FROM golang:1.12-alpine AS builder
FROM golang:1.21-alpine AS builder
RUN apk update && apk add make git
WORKDIR /sql-jobber/
WORKDIR /dungbeetle/
COPY ./ ./
ENV CGO_ENABLED=0 GOOS=linux
RUN make build

FROM alpine:latest AS deploy
RUN apk --no-cache add ca-certificates
COPY --from=builder /sql-jobber/sql-jobber ./
COPY --from=builder /sql-jobber/sql ./
COPY --from=builder /dungbeetle/dungbeetle ./
COPY --from=builder /dungbeetle/sql ./
RUN mkdir -p /opt/config
COPY --from=builder /sql-jobber/config.toml.sample /opt/config/sql-jobber.toml
COPY --from=builder /dungbeetle/config.toml.sample /opt/config/dungbeetle.toml

VOLUME ["/opt/config/"]

CMD ["./sql-jobber", "--config", "/opt/config/sql-jobber.toml"]
CMD ["./dungbeetle", "--config", "/opt/config/dungbeetle.toml"]
35 changes: 25 additions & 10 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,21 +1,36 @@
LAST_COMMIT := $(shell git rev-parse --short HEAD)
LAST_COMMIT_DATE := $(shell git show -s --format=%ci ${LAST_COMMIT})
VERSION := $(shell git describe --abbrev=1)

BUILDSTR := ${VERSION} (build "\\\#"${LAST_COMMIT} $(shell date '+%Y-%m-%d %H:%M:%S'))

BIN := sql-jobber
GOPATH ?= $(HOME)/go

BIN := dungbeetle

.PHONY: build
build:
# Compile the main application.
go build -o ${BIN} -ldflags="-s -w -X 'main.buildString=${BUILDSTR}'" ./cmd/
build: $(BIN)

$(BIN): $(shell find . -type f -name "*.go")
CGO_ENABLED=0 go build -o ${BIN} -ldflags="-s -w -X 'main.buildString=${BUILDSTR}'" ./cmd/*.go

.PHONY: run
run:
CGO_ENABLED=0 go run -ldflags="-s -w -X 'main.buildString=${BUILDSTR}'" ./cmd

.PHONY: dist
dist: build

# Run tests in sequence
.PHONY: test
test:
go test
go test ./... -v -p 1

# Use goreleaser to do a dry run producing local builds.
.PHONY: release-dry
release-dry:
goreleaser --parallelism 1 --rm-dist --snapshot --skip-validate --skip-publish

.PHONY: clean
clean:
go clean
- rm -f ${BIN}
# Use goreleaser to build production releases and publish them.
.PHONY: release
release:
goreleaser --parallelism 1 --rm-dist --skip-validate
45 changes: 21 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,36 +1,33 @@
<a href="https://zerodha.tech"><img src="https://zerodha.tech/static/images/github-badge.svg" align="right" /></a>

# SQL Jobber
![logo](https://github.com/knadh/sql-jobber/assets/547147/be2ad6f3-45fe-444c-9663-70d98d7ae037)

![test workflow](https://github.com/knadh/sql-jobber/actions/workflows/test.yml/badge.svg)
![release workflow](https://github.com/knadh/sql-jobber/actions/workflows/release.yml/badge.svg)
DungBeetle is a lightweight, single binary distributed job server designed for queuing and asynchronously executing large numbers of SQL read jobs (eg: reports) against SQL databases. When the read jobs are executed, the results are written to separate ephemeral results databases (where the results of every job is its own dedicated table), enabling faster retrieval.

sql-jobber is a light weight SQL "job server" that maintains a distributed, asynchronous job queue of SQL read jobs against one or more large source databases. The results are written to one or more separate "cache" databases, where each result set is a newly created table, from where the results can be fetched as many times much faster than querying the source databases.
A prominent usecase is user facing report generation in applications where requests can be queued and reports returned asynchronously without overloading large source databases.

This is useful for queuing and offloading report generation on applications without clogging source databases, especially in the case of end user applications.

## Features

## Parts

- Supports MySQL and PostgreSQL as source databases.
- Supports MySQL and PostgreSQL as result / cache databases for job responses.
- Supports MySQL, PostgreSQL, ClickHouse as source databases.
- Supports MySQL and PostgreSQL as result / cache databases for job results.
- Standalone server that exposes HTTP APIs for managing jobs and groups of jobs (list, post, status check, cancel jobs).
- Reads SQL queries from .sql files and registers them as jobs ready to be queued.
- Written in Go and built on top of [Machinery](https://github.com/RichardKnop/machinery). Supports multi-process, multi-threaded, asynchronous distributed job queueing via a common broker backend (Redis, AMQP etc.)
- Supports multi-process, multi-threaded, asynchronous distributed job queueing via a common broker backend (Redis, AMQP etc.)

## Usecase

An application that has a very large SQL database, when there are several thousand concurrent users requesting reports from the database simultaneously, every second of IO delay in query execution locks up the application's threads, snowballing and overloading the application.
Consider an application with a very large SQL database. When there are several thousand concurrent users requesting reports from an application connected to it, every second of I/O delay during query execution can bottleneck the application and the database, causing a snowball effect.

Instead, defer every single report request into a job queue, there by immediately freeing up the front end application. The reports are presented to users as they're executed (frontend polls the job's status and prevents the user from sending any more queries). Fixed SQL Jobber servers and worker threads also act as traffic control and prevent the primary database from being indundated with requests.
Instead, user requests for report generations can be deferred to a job queue in the backend, there by immediately freeing up the frontend application. The reports are presented to users as they're executed (frontend polls the job's status and prevents the user from sending any more queries). DungBeetle server and worker instances also act as traffic control and prevent the primary database from being indundated with requests.

Once the reports are generated, it's natural for users to further transform the results by slicing, sorting, and filtering, generating additional queries to the primary database. To offset this load, these subsequent queries can be sent to the smaller, much faster results cache database. These results are of course ephemeral and can be thrown away or expired.
Once the reports are generated (SQL queries finish executing), it's natural for users to further transform the results by slicing, sorting, and filtering, generating additional queries to the primary database. To offset this load, these subsequent queries can be sent to the smaller, much faster results cache database. These results are of course ephemeral and can be thrown away or expired.

![sql-job-server png](https://user-images.githubusercontent.com/547147/44912100-d3f27b80-ad46-11e8-9938-2b6c0f974488.png)
![image](https://github.com/knadh/sql-jobber/assets/547147/48adab5b-120e-4e30-9326-28c60c7f0758)

## Concepts
#### Task
A task is a named SQL job is loaded into the server on startup. Tasks are defined in .sql files in the simple [goyesql](https://github.com/knadh/goyesql) format. Such queries are self-contained and produce the desired final output with neatly named columns. They can take arbitrary positional arguments for execution. A task can be attached to one or more specific databases defined in the configuration using the `-- db:` tag. In case of multiple databases, the query will be executed against a random one from the list, unless a specific database is specified in the API request (`db`). A `-- queue:` tag to always route the task to a particular queue, unless it's overridden by the `queue` param when making a job request. A `-- results:` tag specifies the results backend to which the results of a task will be written. If there are multiple result backends specified, the results are written a random one.
A task is a named SQL query that is loaded into the server on startup. Tasks are defined in .sql files in the simple [goyesql](https://github.com/knadh/goyesql) format. Such queries are self-contained and produce the desired final output with neatly named columns. They can take arbitrary positional arguments for execution. A task can be attached to one or more specific databases defined in the configuration using the `-- db:` tag. In case of multiple databases, the query will be executed against a random one from the list, unless a specific database is specified in the API request (`db`). A `-- queue:` tag to always route the task to a particular queue, unless it's overriden by the `queue` param when making a job request. A `-- results:` tag specifies the results backend to which the results of a task will be written. If there are multiple result backends specified, the results are written a random one.

Example:
```sql
Expand All @@ -57,7 +54,7 @@ SELECT * FROM entries WHERE user_id = ? AND timestamp > ? and timestamp < ?;
Here, when the server starts, the queries `get_profit_summary` and `get_profit_entries` are registered automatically as tasks. Internally, the server validates and prepares these SQL statements (unless `raw: 1`). `?` are MySQL value placholders. For Postgres, the placeholders are `$1, $2 ...`

#### Job
A job is an instance of a named task that has been queued to run. Each job has an ID that can be used to track its status. If an ID is not passed explicitly, it is generated internally and returned. These IDs need not be unique, but only a single job with a certain ID can run at any given point. For the next job with the same ID to be scheduled, the previous job has to finish execution. Using non-unique IDs like this is useful in cases where users can be prevented from sending multiple requests for the same reports, like in our usecases.
A job is an instance of a task that has been queued to run. Each job has an ID that can be used to track its status. If an ID is not passed explicitly, it is generated internally and returned. These IDs need not be unique, but only a single job with a certain ID can run at any given point. For the next job with the same ID to be scheduled, the previous job has to finish execution. Using non-unique IDs like this is useful in cases where users can be prevented from sending multiple requests for the same reports, like in our usecases.

An application polls with the job ID to check if results are ready for consumption.

Expand All @@ -66,7 +63,7 @@ The results from an SQL query job are written to a result backend (MySQL or Post


## Installation
A pre-compiled binary can be downloaded from the [releases](https://github.com/knadh/sql-jobber/releases) page.
A pre-compiled binary can be downloaded from the [releases](https://github.com/zerodha/dungbeetle/releases) page.

### 2) Configure
Copy the `config.toml.sample` file as `config.toml` somewhere and edit the configuration values.
Expand All @@ -76,9 +73,9 @@ Write your SQL query tasks in `.sql` files in the `goyesql` format (as shown in

### 4) Start the server
```shell
sql-jobber --config /path/to/config.toml --sql-directory /path/to/your/sql/queries
dungbeetle --config /path/to/config.toml --sql-directory /path/to/your/sql/queries

# Run 'sql-jobber --help' to see all supported arguments
# Run 'dungbeetle --help' to see all supported arguments
```

Starting the server runs a set of workers listening on a default job queue. It also starts an HTTP service on `http://127.0.0.1:6060` which is the control interface. It's possible to run the server without the HTTP interface by passing the `--worker-only` flag.
Expand Down Expand Up @@ -142,18 +139,18 @@ $ curl localhost:6060/jobs/myjob
### Multiple queues, workers, and job distribution
It's possible to run multiple workers on one or more machines that run different jobs with different concurrency levels independently of each other using different queues. Not all of these instances need to expose the HTTP service and can run as `--worker-only`. This doesn't really make a difference as long as all instances connect to the same broker backend. A job posted to any instance will be routed correctly to the right instances based on the `queue` parameter.

Often times, different queries have different priorities of execution. Some may need to return results faster than others. The below example shows two SQL Jobber servers being run, one with 30 workers and one with just 5 to process jobs of different priorities.
Often times, different queries have different priorities of execution. Some may need to return results faster than others. The below example shows two DungBeetle servers being run, one with 30 workers and one with just 5 to process jobs of different priorities.


```shell
# Run the primary worker + HTTP control interface
sql-jobber --config /path/to/config.toml --sql-directory /path/to/sql/dir \
dungbeetle --config /path/to/config.toml --sql-directory /path/to/sql/dir \
--queue "high_priority" \
--worker-name "high_priority_worker" \
--worker-concurrency 30

# Run another worker on a different queue to handle low priority jobs
sql-jobber --config /path/to/config.toml --sql-directory /path/to/sql/dir \
dungbeetle --config /path/to/config.toml --sql-directory /path/to/sql/dir \
--queue "low_priority" \
--worker-name "low_priority_worker" \
--worker-concurrency 5 \
Expand All @@ -166,8 +163,8 @@ $ curl localhost:6060/tasks/get_profit_entries_by_date/jobs -H "Content-Type: ap
$ curl localhost:6060/tasks/get_profit_entries_by_date/jobs -H "Content-Type: application/json" --data '{"job_id": "myjob", "queue": "low_priority"}'
```

## API client
`github.com/knadh/sql-jobber/client` package can be used as a Go HTTP API client for sql-jobber.
## Go API client
`github.com/zerodha/dungbeetle/client` package can be used as a Go HTTP API client for DungBeetle.


## License
Expand Down
19 changes: 9 additions & 10 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"log/slog"
"net/http"
"net/url"
"strconv"

"github.com/RichardKnop/machinery/v1/tasks"
"github.com/knadh/sql-jobber/models"
"github.com/kalbhor/tasqueue/v2"
"github.com/zerodha/dungbeetle/models"
)

const (
Expand Down Expand Up @@ -49,7 +48,7 @@ type httpResp struct {
type Opt struct {
RootURL string
HTTPClient *http.Client
Logger *log.Logger
Logger *slog.Logger
}

// Client represents the SQL Jobber API client.
Expand Down Expand Up @@ -98,8 +97,8 @@ func (c *Client) DeleteGroupJob(jobID string, purge bool) error {
}

// GetPendingJobs fetches the list of pending jobs.
func (c *Client) GetPendingJobs(queue string) ([]tasks.Signature, error) {
var out []tasks.Signature
func (c *Client) GetPendingJobs(queue string) ([]tasqueue.JobMessage, error) {
var out []tasqueue.JobMessage
err := c.doHTTPReq(http.MethodGet,
fmt.Sprintf(uriGetPendingJobs, queue), nil, nil, &out)
return out, err
Expand All @@ -121,7 +120,7 @@ func (c *Client) GetGroupStatus(groupID string) (models.GroupStatusResp, error)
}

// doHTTPReq makes an HTTP request with the given params and on success, unmarshals
// the JSON response from the sql-jobber upstream (the .data field in the response JSON)
// the JSON response from the DungBeetle upstream (the .data field in the response JSON)
// into the container obj.
// reqBody can be an arbitrary struct or map for POST requests that'll be
// marshalled as JSON and posted. For GET queries, it should be query params
Expand Down Expand Up @@ -172,11 +171,11 @@ func (c *Client) doHTTPReq(method, rURI string, reqBody interface{}, headers htt
}
defer func() {
// Drain and close the body to let the Transport reuse the connection
io.Copy(ioutil.Discard, r.Body)
io.Copy(io.Discard, r.Body)
r.Body.Close()
}()

body, err := ioutil.ReadAll(r.Body)
body, err := io.ReadAll(r.Body)
if err != nil {
return fmt.Errorf("error reading response: %v", err)
}
Expand Down
Loading

0 comments on commit e8576b2

Please sign in to comment.