Skip to content

Commit

Permalink
Introduce logging adapter for jobqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
lzap authored and ondrejbudai committed Sep 9, 2022
1 parent c43b499 commit a8afca4
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 18 deletions.
38 changes: 38 additions & 0 deletions internal/common/slogger/logrus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package slogger

import (
"github.com/osbuild/osbuild-composer/pkg/jobqueue"
"github.com/sirupsen/logrus"
)

type simpleLogrus struct {
logger *logrus.Logger
}

func NewLogrusLogger(logger *logrus.Logger) jobqueue.SimpleLogger {
return &simpleLogrus{logger: logger}
}

func (s *simpleLogrus) log(level logrus.Level, err error, msg string, args ...string) {
if len(args)%2 != 0 {
panic("log arguments must be even (key value pairs)")
}
var fields = make(logrus.Fields, len(args)/2+1)
for i := 0; i < len(args); i += 2 {
k := args[i]
v := args[i+1]
fields[k] = v
}
if err != nil {
fields["error"] = err.Error()
}
s.logger.WithFields(fields).Log(level, msg)
}

func (s *simpleLogrus) Info(msg string, args ...string) {
s.log(logrus.InfoLevel, nil, msg, args...)
}

func (s *simpleLogrus) Error(err error, msg string, args ...string) {
s.log(logrus.ErrorLevel, err, msg, args...)
}
63 changes: 63 additions & 0 deletions internal/common/slogger/logrus_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package slogger

import (
"bytes"
"errors"
"testing"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
)

func makeLogrus(buf *bytes.Buffer) *logrus.Logger {
return &logrus.Logger{
Out: buf,
Formatter: &logrus.TextFormatter{
DisableTimestamp: true,
DisableColors: true,
},
Hooks: make(logrus.LevelHooks),
Level: logrus.DebugLevel,
}

}

func TestInfo(t *testing.T) {
buf := &bytes.Buffer{}
l := makeLogrus(buf)
sl := NewLogrusLogger(l)
sl.Info("test")
require.Equal(t, "level=info msg=test\n", buf.String())
}

func TestError(t *testing.T) {
buf := &bytes.Buffer{}
l := makeLogrus(buf)
sl := NewLogrusLogger(l)
sl.Error(errors.New("e"), "test")
require.Equal(t, "level=error msg=test error=e\n", buf.String())
}

func TestErrorIsNil(t *testing.T) {
buf := &bytes.Buffer{}
l := makeLogrus(buf)
sl := NewLogrusLogger(l)
sl.Error(nil, "test")
require.Equal(t, "level=error msg=test\n", buf.String())
}

func TestInfoWithFields(t *testing.T) {
buf := &bytes.Buffer{}
l := makeLogrus(buf)
sl := NewLogrusLogger(l)
sl.Info("test", "key", "value")
require.Equal(t, "level=info msg=test key=value\n", buf.String())
}

func TestErrorWithFields(t *testing.T) {
buf := &bytes.Buffer{}
l := makeLogrus(buf)
sl := NewLogrusLogger(l)
sl.Error(errors.New("e"), "test", "key", "value")
require.Equal(t, "level=error msg=test error=e key=value\n", buf.String())
}
18 changes: 18 additions & 0 deletions internal/common/slogger/noop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package slogger

import (
"github.com/osbuild/osbuild-composer/pkg/jobqueue"
)

type noopLogger struct {
}

func NewNoopLogger() jobqueue.SimpleLogger {
return &noopLogger{}
}

func (s *noopLogger) Info(_ string, _ ...string) {
}

func (s *noopLogger) Error(_ error, _ string, _ ...string) {
}
53 changes: 35 additions & 18 deletions pkg/jobqueue/dbjobqueue/dbjobqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ import (
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"

"github.com/osbuild/osbuild-composer/internal/common/slogger"
"github.com/osbuild/osbuild-composer/pkg/jobqueue"

logrus "github.com/sirupsen/logrus"
"github.com/sirupsen/logrus"
)

const (
Expand Down Expand Up @@ -106,6 +105,7 @@ const (
)

type DBJobQueue struct {
logger jobqueue.SimpleLogger
pool *pgxpool.Pool
dequeuers *dequeuers
stopListener func()
Expand Down Expand Up @@ -153,15 +153,32 @@ func (d *dequeuers) notifyAll() {
}
}

// Create a new DBJobQueue object for `url`.
// Config allows more detailed customization of queue behavior
type Config struct {
// Logger is used for all logging of the queue, when not provided, the stanard
// global logger (logrus) is used.
Logger jobqueue.SimpleLogger
}

// New creates a new DBJobQueue object for `url` with default configuration.
func New(url string) (*DBJobQueue, error) {
stdLogger := slogger.NewLogrusLogger(logrus.StandardLogger())
config := Config{
Logger: stdLogger,
}
return NewWithConfig(url, config)
}

// NewWithLogger creates a new DBJobQueue object for `url` with specific configuration.
func NewWithConfig(url string, config Config) (*DBJobQueue, error) {
pool, err := pgxpool.Connect(context.Background(), url)
if err != nil {
return nil, fmt.Errorf("error establishing connection: %v", err)
}

listenContext, cancel := context.WithCancel(context.Background())
q := &DBJobQueue{
logger: config.Logger,
pool: pool,
dequeuers: newDequeuers(),
stopListener: cancel,
Expand All @@ -185,7 +202,7 @@ func (q *DBJobQueue) listen(ctx context.Context, ready chan<- struct{}) {
// use the empty context as the listening context is already cancelled at this point
_, err := conn.Exec(context.Background(), sqlUnlisten)
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
logrus.Error("Error unlistening for jobs in dequeue: ", err)
q.logger.Error(err, "Error unlistening for jobs in dequeue")
}
conn.Release()
}()
Expand All @@ -202,13 +219,13 @@ func (q *DBJobQueue) listen(ctx context.Context, ready chan<- struct{}) {
if err != nil {
// shutdown the listener if the context is canceled
if errors.Is(err, context.Canceled) {
logrus.Info("Shutting down the listener")
q.logger.Info("Shutting down the listener")
return
}

// otherwise, just log the error and continue, there might just
// be a temporary networking issue
logrus.Debugf("error waiting for notification on jobs channel: %v", err)
q.logger.Error(err, "Error waiting for notification on jobs channel")
continue
}

Expand Down Expand Up @@ -236,7 +253,7 @@ func (q *DBJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu
defer func() {
err := tx.Rollback(context.Background())
if err != nil && !errors.Is(err, pgx.ErrTxClosed) {
logrus.Error("error rolling back enqueue transaction: ", err)
q.logger.Error(err, "Error rolling back enqueue transaction")
}
}()

Expand All @@ -263,7 +280,7 @@ func (q *DBJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu
return uuid.Nil, fmt.Errorf("unable to commit database transaction: %v", err)
}

logrus.Infof("Enqueued job of type %s with ID %s(dependencies %v)", jobType, id, dependencies)
q.logger.Info("Enqueued job", "job_type", jobType, "job_id", id.String(), "job_dependencies", fmt.Sprintf("%+v", dependencies))

return id, nil
}
Expand Down Expand Up @@ -318,7 +335,7 @@ func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string, channels []
return uuid.Nil, uuid.Nil, nil, "", nil, fmt.Errorf("error querying the job's dependencies: %v", err)
}

logrus.Infof("Dequeued job of type %v with ID %s", jobType, id)
q.logger.Info("Dequeued job", "job_type", jobType, "job_id", id.String(), "job_dependencies", fmt.Sprintf("%+v", dependencies))

return id, token, dependencies, jobType, args, nil
}
Expand Down Expand Up @@ -372,7 +389,7 @@ func (q *DBJobQueue) DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID,
return uuid.Nil, nil, "", nil, fmt.Errorf("error querying the job's dependencies: %v", err)
}

logrus.Infof("Dequeued job of type %v with ID %s", jobType, id)
q.logger.Info("Dequeued job", "job_type", jobType, "job_id", id.String(), "job_dependencies", fmt.Sprintf("%+v", dependencies))

return token, dependencies, jobType, args, nil
}
Expand All @@ -391,7 +408,7 @@ func (q *DBJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
defer func() {
err = tx.Rollback(context.Background())
if err != nil && !errors.Is(err, pgx.ErrTxClosed) {
logrus.Errorf("error rolling back finish job transaction for job %s: %v", id, err)
q.logger.Error(err, "Error rolling back finish job transaction", "job_id", id.String())
}

}()
Expand Down Expand Up @@ -440,7 +457,7 @@ func (q *DBJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
return fmt.Errorf("unable to commit database transaction: %v", err)
}

logrus.Infof("Finished job with ID %s", id)
q.logger.Info("Finished job", "job_type", jobType, "job_id", id.String())

return nil
}
Expand All @@ -462,7 +479,7 @@ func (q *DBJobQueue) CancelJob(id uuid.UUID) error {
return fmt.Errorf("error canceling job %s: %v", id, err)
}

logrus.Infof("Cancelled job with ID %s", id)
q.logger.Info("Cancelled job", "job_type", jobType, "job_id", id.String())

return nil
}
Expand Down Expand Up @@ -560,13 +577,13 @@ func (q *DBJobQueue) Heartbeats(olderThan time.Duration) (tokens []uuid.UUID) {
err = rows.Scan(&t)
if err != nil {
// Log the error and try to continue with the next row
logrus.Error("Unable to read token from heartbeats: ", err)
q.logger.Error(err, "Unable to read token from heartbeats")
continue
}
tokens = append(tokens, t)
}
if rows.Err() != nil {
logrus.Error("Error reading tokens from heartbeats: ", rows.Err())
q.logger.Error(rows.Err(), "Error reading tokens from heartbeats")
}

return
Expand All @@ -582,10 +599,10 @@ func (q *DBJobQueue) RefreshHeartbeat(token uuid.UUID) {

tag, err := conn.Exec(context.Background(), sqlRefreshHeartbeat, token)
if err != nil {
logrus.Error("Error refreshing heartbeat: ", err)
q.logger.Error(err, "Error refreshing heartbeat")
}
if tag.RowsAffected() != 1 {
logrus.Error("No rows affected when refreshing heartbeat for ", token)
q.logger.Error(nil, "No rows affected when refreshing heartbeat", "job_token", token.String())
}
}

Expand Down
12 changes: 12 additions & 0 deletions pkg/jobqueue/jobqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,18 @@ type JobQueue interface {
RefreshHeartbeat(token uuid.UUID)
}

// SimpleLogger provides a structured logging methods for the jobqueue library.
type SimpleLogger interface {
// Info creates an info-level message and arbitrary amount of key-value string pairs which
// can be optionally mapped to fields by underlying implementations.
Info(msg string, args ...string)

// Error creates an error-level message and arbitrary amount of key-value string pairs which
// can be optionally mapped to fields by underlying implementations. The first error argument
// can be set to nil when no context error is available.
Error(err error, msg string, args ...string)
}

var (
ErrNotExist = errors.New("job does not exist")
ErrNotPending = errors.New("job is not pending")
Expand Down

0 comments on commit a8afca4

Please sign in to comment.