diff --git a/internal/common/slogger/logrus.go b/internal/common/slogger/logrus.go new file mode 100644 index 0000000000..dd99d8abbe --- /dev/null +++ b/internal/common/slogger/logrus.go @@ -0,0 +1,38 @@ +package slogger + +import ( + "github.com/osbuild/osbuild-composer/pkg/jobqueue" + "github.com/sirupsen/logrus" +) + +type simpleLogrus struct { + logger *logrus.Logger +} + +func NewLogrusLogger(logger *logrus.Logger) jobqueue.SimpleLogger { + return &simpleLogrus{logger: logger} +} + +func (s *simpleLogrus) log(level logrus.Level, err error, msg string, args ...string) { + if len(args)%2 != 0 { + panic("log arguments must be even (key value pairs)") + } + var fields = make(logrus.Fields, len(args)/2+1) + for i := 0; i < len(args); i += 2 { + k := args[i] + v := args[i+1] + fields[k] = v + } + if err != nil { + fields["error"] = err.Error() + } + s.logger.WithFields(fields).Log(level, msg) +} + +func (s *simpleLogrus) Info(msg string, args ...string) { + s.log(logrus.InfoLevel, nil, msg, args...) +} + +func (s *simpleLogrus) Error(err error, msg string, args ...string) { + s.log(logrus.ErrorLevel, err, msg, args...) +} diff --git a/internal/common/slogger/logrus_test.go b/internal/common/slogger/logrus_test.go new file mode 100644 index 0000000000..2e626eeecb --- /dev/null +++ b/internal/common/slogger/logrus_test.go @@ -0,0 +1,63 @@ +package slogger + +import ( + "bytes" + "errors" + "testing" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" +) + +func makeLogrus(buf *bytes.Buffer) *logrus.Logger { + return &logrus.Logger{ + Out: buf, + Formatter: &logrus.TextFormatter{ + DisableTimestamp: true, + DisableColors: true, + }, + Hooks: make(logrus.LevelHooks), + Level: logrus.DebugLevel, + } + +} + +func TestInfo(t *testing.T) { + buf := &bytes.Buffer{} + l := makeLogrus(buf) + sl := NewLogrusLogger(l) + sl.Info("test") + require.Equal(t, "level=info msg=test\n", buf.String()) +} + +func TestError(t *testing.T) { + buf := &bytes.Buffer{} + l := makeLogrus(buf) + sl := NewLogrusLogger(l) + sl.Error(errors.New("e"), "test") + require.Equal(t, "level=error msg=test error=e\n", buf.String()) +} + +func TestErrorIsNil(t *testing.T) { + buf := &bytes.Buffer{} + l := makeLogrus(buf) + sl := NewLogrusLogger(l) + sl.Error(nil, "test") + require.Equal(t, "level=error msg=test\n", buf.String()) +} + +func TestInfoWithFields(t *testing.T) { + buf := &bytes.Buffer{} + l := makeLogrus(buf) + sl := NewLogrusLogger(l) + sl.Info("test", "key", "value") + require.Equal(t, "level=info msg=test key=value\n", buf.String()) +} + +func TestErrorWithFields(t *testing.T) { + buf := &bytes.Buffer{} + l := makeLogrus(buf) + sl := NewLogrusLogger(l) + sl.Error(errors.New("e"), "test", "key", "value") + require.Equal(t, "level=error msg=test error=e key=value\n", buf.String()) +} diff --git a/internal/common/slogger/noop.go b/internal/common/slogger/noop.go new file mode 100644 index 0000000000..8782315b8b --- /dev/null +++ b/internal/common/slogger/noop.go @@ -0,0 +1,18 @@ +package slogger + +import ( + "github.com/osbuild/osbuild-composer/pkg/jobqueue" +) + +type noopLogger struct { +} + +func NewNoopLogger() jobqueue.SimpleLogger { + return &noopLogger{} +} + +func (s *noopLogger) Info(_ string, _ ...string) { +} + +func (s *noopLogger) Error(_ error, _ string, _ ...string) { +} diff --git a/pkg/jobqueue/dbjobqueue/dbjobqueue.go b/pkg/jobqueue/dbjobqueue/dbjobqueue.go index 02dfc5c490..970c41a4c8 100644 --- a/pkg/jobqueue/dbjobqueue/dbjobqueue.go +++ b/pkg/jobqueue/dbjobqueue/dbjobqueue.go @@ -18,10 +18,9 @@ import ( "github.com/jackc/pgtype" "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/pgxpool" - + "github.com/osbuild/osbuild-composer/internal/common/slogger" "github.com/osbuild/osbuild-composer/pkg/jobqueue" - - logrus "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" ) const ( @@ -106,6 +105,7 @@ const ( ) type DBJobQueue struct { + logger jobqueue.SimpleLogger pool *pgxpool.Pool dequeuers *dequeuers stopListener func() @@ -153,8 +153,24 @@ func (d *dequeuers) notifyAll() { } } -// Create a new DBJobQueue object for `url`. +// Config allows more detailed customization of queue behavior +type Config struct { + // Logger is used for all logging of the queue, when not provided, the stanard + // global logger (logrus) is used. + Logger jobqueue.SimpleLogger +} + +// New creates a new DBJobQueue object for `url` with default configuration. func New(url string) (*DBJobQueue, error) { + stdLogger := slogger.NewLogrusLogger(logrus.StandardLogger()) + config := Config{ + Logger: stdLogger, + } + return NewWithConfig(url, config) +} + +// NewWithLogger creates a new DBJobQueue object for `url` with specific configuration. +func NewWithConfig(url string, config Config) (*DBJobQueue, error) { pool, err := pgxpool.Connect(context.Background(), url) if err != nil { return nil, fmt.Errorf("error establishing connection: %v", err) @@ -162,6 +178,7 @@ func New(url string) (*DBJobQueue, error) { listenContext, cancel := context.WithCancel(context.Background()) q := &DBJobQueue{ + logger: config.Logger, pool: pool, dequeuers: newDequeuers(), stopListener: cancel, @@ -185,7 +202,7 @@ func (q *DBJobQueue) listen(ctx context.Context, ready chan<- struct{}) { // use the empty context as the listening context is already cancelled at this point _, err := conn.Exec(context.Background(), sqlUnlisten) if err != nil && !errors.Is(err, context.DeadlineExceeded) { - logrus.Error("Error unlistening for jobs in dequeue: ", err) + q.logger.Error(err, "Error unlistening for jobs in dequeue") } conn.Release() }() @@ -202,13 +219,13 @@ func (q *DBJobQueue) listen(ctx context.Context, ready chan<- struct{}) { if err != nil { // shutdown the listener if the context is canceled if errors.Is(err, context.Canceled) { - logrus.Info("Shutting down the listener") + q.logger.Info("Shutting down the listener") return } // otherwise, just log the error and continue, there might just // be a temporary networking issue - logrus.Debugf("error waiting for notification on jobs channel: %v", err) + q.logger.Error(err, "Error waiting for notification on jobs channel") continue } @@ -236,7 +253,7 @@ func (q *DBJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu defer func() { err := tx.Rollback(context.Background()) if err != nil && !errors.Is(err, pgx.ErrTxClosed) { - logrus.Error("error rolling back enqueue transaction: ", err) + q.logger.Error(err, "Error rolling back enqueue transaction") } }() @@ -263,7 +280,7 @@ func (q *DBJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu return uuid.Nil, fmt.Errorf("unable to commit database transaction: %v", err) } - logrus.Infof("Enqueued job of type %s with ID %s(dependencies %v)", jobType, id, dependencies) + q.logger.Info("Enqueued job", "job_type", jobType, "job_id", id.String(), "job_dependencies", fmt.Sprintf("%+v", dependencies)) return id, nil } @@ -318,7 +335,7 @@ func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string, channels [] return uuid.Nil, uuid.Nil, nil, "", nil, fmt.Errorf("error querying the job's dependencies: %v", err) } - logrus.Infof("Dequeued job of type %v with ID %s", jobType, id) + q.logger.Info("Dequeued job", "job_type", jobType, "job_id", id.String(), "job_dependencies", fmt.Sprintf("%+v", dependencies)) return id, token, dependencies, jobType, args, nil } @@ -372,7 +389,7 @@ func (q *DBJobQueue) DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID, return uuid.Nil, nil, "", nil, fmt.Errorf("error querying the job's dependencies: %v", err) } - logrus.Infof("Dequeued job of type %v with ID %s", jobType, id) + q.logger.Info("Dequeued job", "job_type", jobType, "job_id", id.String(), "job_dependencies", fmt.Sprintf("%+v", dependencies)) return token, dependencies, jobType, args, nil } @@ -391,7 +408,7 @@ func (q *DBJobQueue) FinishJob(id uuid.UUID, result interface{}) error { defer func() { err = tx.Rollback(context.Background()) if err != nil && !errors.Is(err, pgx.ErrTxClosed) { - logrus.Errorf("error rolling back finish job transaction for job %s: %v", id, err) + q.logger.Error(err, "Error rolling back finish job transaction", "job_id", id.String()) } }() @@ -440,7 +457,7 @@ func (q *DBJobQueue) FinishJob(id uuid.UUID, result interface{}) error { return fmt.Errorf("unable to commit database transaction: %v", err) } - logrus.Infof("Finished job with ID %s", id) + q.logger.Info("Finished job", "job_type", jobType, "job_id", id.String()) return nil } @@ -462,7 +479,7 @@ func (q *DBJobQueue) CancelJob(id uuid.UUID) error { return fmt.Errorf("error canceling job %s: %v", id, err) } - logrus.Infof("Cancelled job with ID %s", id) + q.logger.Info("Cancelled job", "job_type", jobType, "job_id", id.String()) return nil } @@ -560,13 +577,13 @@ func (q *DBJobQueue) Heartbeats(olderThan time.Duration) (tokens []uuid.UUID) { err = rows.Scan(&t) if err != nil { // Log the error and try to continue with the next row - logrus.Error("Unable to read token from heartbeats: ", err) + q.logger.Error(err, "Unable to read token from heartbeats") continue } tokens = append(tokens, t) } if rows.Err() != nil { - logrus.Error("Error reading tokens from heartbeats: ", rows.Err()) + q.logger.Error(rows.Err(), "Error reading tokens from heartbeats") } return @@ -582,10 +599,10 @@ func (q *DBJobQueue) RefreshHeartbeat(token uuid.UUID) { tag, err := conn.Exec(context.Background(), sqlRefreshHeartbeat, token) if err != nil { - logrus.Error("Error refreshing heartbeat: ", err) + q.logger.Error(err, "Error refreshing heartbeat") } if tag.RowsAffected() != 1 { - logrus.Error("No rows affected when refreshing heartbeat for ", token) + q.logger.Error(nil, "No rows affected when refreshing heartbeat", "job_token", token.String()) } } diff --git a/pkg/jobqueue/jobqueue.go b/pkg/jobqueue/jobqueue.go index ae8fad8799..3d5bd8dd0f 100644 --- a/pkg/jobqueue/jobqueue.go +++ b/pkg/jobqueue/jobqueue.go @@ -79,6 +79,18 @@ type JobQueue interface { RefreshHeartbeat(token uuid.UUID) } +// SimpleLogger provides a structured logging methods for the jobqueue library. +type SimpleLogger interface { + // Info creates an info-level message and arbitrary amount of key-value string pairs which + // can be optionally mapped to fields by underlying implementations. + Info(msg string, args ...string) + + // Error creates an error-level message and arbitrary amount of key-value string pairs which + // can be optionally mapped to fields by underlying implementations. The first error argument + // can be set to nil when no context error is available. + Error(err error, msg string, args ...string) +} + var ( ErrNotExist = errors.New("job does not exist") ErrNotPending = errors.New("job is not pending")