Skip to content

Commit

Permalink
new unique jobs implementation that works on bulk insert
Browse files Browse the repository at this point in the history
  • Loading branch information
bgentry committed Sep 15, 2024
1 parent e634e3c commit 1c3e892
Show file tree
Hide file tree
Showing 31 changed files with 1,310 additions and 1,831 deletions.
70 changes: 29 additions & 41 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,6 @@ const (
// it so River can check that inserted job kinds have a worker that can run
// them.
type Config struct {
// AdvisoryLockPrefix is a configurable 32-bit prefix that River will use
// when generating any key to acquire a Postgres advisory lock. All advisory
// locks share the same 64-bit number space, so this allows a calling
// application to guarantee that a River advisory lock will never conflict
// with one of its own by cordoning each type to its own prefix.
//
// If this value isn't set, River defaults to generating key hashes across
// the entire 64-bit advisory lock number space, which is large enough that
// conflicts are exceedingly unlikely. If callers don't strictly need this
// option then it's recommended to leave it unset because the prefix leaves
// only 32 bits of number space for advisory lock hashes, so it makes
// internally conflicting River-generated keys more likely.
//
// Advisory locks are currently only used for the fallback/slow path of
// unique job insertion where finalized states are included in a ByState
// configuration.
AdvisoryLockPrefix int32

// CancelledJobRetentionPeriod is the amount of time to keep cancelled jobs
// around before they're removed permanently.
//
Expand Down Expand Up @@ -339,7 +321,6 @@ type Client[TTx any] struct {
stopped <-chan struct{}
subscriptionManager *subscriptionManager
testSignals clientTestSignals
uniqueInserter *dbunique.UniqueInserter

// workCancel cancels the context used for all work goroutines. Normal Stop
// does not cancel that context.
Expand Down Expand Up @@ -451,7 +432,6 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
// original object, so everything that we care about must be initialized
// here, even if it's only carrying over the original value.
config = &Config{
AdvisoryLockPrefix: config.AdvisoryLockPrefix,
CancelledJobRetentionPeriod: valutil.ValOrDefault(config.CancelledJobRetentionPeriod, maintenance.CancelledJobRetentionPeriodDefault),
CompletedJobRetentionPeriod: valutil.ValOrDefault(config.CompletedJobRetentionPeriod, maintenance.CompletedJobRetentionPeriodDefault),
DiscardedJobRetentionPeriod: valutil.ValOrDefault(config.DiscardedJobRetentionPeriod, maintenance.DiscardedJobRetentionPeriodDefault),
Expand Down Expand Up @@ -488,10 +468,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
driver: driver,
producersByQueueName: make(map[string]*producer),
testSignals: clientTestSignals{},
uniqueInserter: baseservice.Init(archetype, &dbunique.UniqueInserter{
AdvisoryLockPrefix: config.AdvisoryLockPrefix,
}),
workCancel: func(cause error) {}, // replaced on start, but here in case StopAndCancel is called before start up
workCancel: func(cause error) {}, // replaced on start, but here in case StopAndCancel is called before start up
}
client.queues = &QueueBundle{addProducer: client.addProducer}

Expand Down Expand Up @@ -589,8 +566,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client

{
periodicJobEnqueuer := maintenance.NewPeriodicJobEnqueuer(archetype, &maintenance.PeriodicJobEnqueuerConfig{
AdvisoryLockPrefix: config.AdvisoryLockPrefix,
NotifyInsert: client.maybeNotifyInsertForQueues,
NotifyInsert: client.maybeNotifyInsertForQueues,
}, driver.GetExecutor())
maintenanceServices = append(maintenanceServices, periodicJobEnqueuer)
client.testSignals.periodicJobEnqueuer = &periodicJobEnqueuer.TestSignals
Expand Down Expand Up @@ -1150,10 +1126,10 @@ func (c *Client[TTx]) ID() string {
return c.config.ID
}

func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, config *Config, args JobArgs, insertOpts *InsertOpts) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) {
func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, config *Config, args JobArgs, insertOpts *InsertOpts) (*riverdriver.JobInsertFastParams, error) {
encodedArgs, err := json.Marshal(args)
if err != nil {
return nil, nil, fmt.Errorf("error marshaling args to JSON: %w", err)
return nil, fmt.Errorf("error marshaling args to JSON: %w", err)
}

if insertOpts == nil {
Expand All @@ -1175,7 +1151,7 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
queue := valutil.FirstNonZero(insertOpts.Queue, jobInsertOpts.Queue, rivercommon.QueueDefault)

if err := validateQueueName(queue); err != nil {
return nil, nil, err
return nil, err
}

tags := insertOpts.Tags
Expand All @@ -1187,24 +1163,24 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
} else {
for _, tag := range tags {
if len(tag) > 255 {
return nil, nil, errors.New("tags should be a maximum of 255 characters long")
return nil, errors.New("tags should be a maximum of 255 characters long")
}
if !tagRE.MatchString(tag) {
return nil, nil, errors.New("tags should match regex " + tagRE.String())
return nil, errors.New("tags should match regex " + tagRE.String())
}
}
}

if priority > 4 {
return nil, nil, errors.New("priority must be between 1 and 4")
return nil, errors.New("priority must be between 1 and 4")
}

uniqueOpts := insertOpts.UniqueOpts
if uniqueOpts.isEmpty() {
uniqueOpts = jobInsertOpts.UniqueOpts
}
if err := uniqueOpts.validate(); err != nil {
return nil, nil, err
return nil, err
}

metadata := insertOpts.Metadata
Expand All @@ -1223,6 +1199,11 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
State: rivertype.JobStateAvailable,
Tags: tags,
}
if !uniqueOpts.isEmpty() {
internalUniqueOpts := (*dbunique.UniqueOpts)(&uniqueOpts)
insertParams.UniqueKey = dbunique.UniqueKey(archetype.Time, internalUniqueOpts, insertParams)
insertParams.UniqueStates = internalUniqueOpts.StateBitmask()
}

switch {
case !insertOpts.ScheduledAt.IsZero():
Expand All @@ -1241,7 +1222,7 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
insertParams.State = rivertype.JobStatePending
}

return insertParams, (*dbunique.UniqueOpts)(&uniqueOpts), nil
return insertParams, nil
}

var errNoDriverDBPool = errors.New("driver must have non-nil database pool to use non-transactional methods like Insert and InsertMany (try InsertTx or InsertManyTx instead")
Expand Down Expand Up @@ -1290,7 +1271,7 @@ func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, arg
return nil, err
}

params, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, args, opts)
params, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, args, opts)
if err != nil {
return nil, err
}
Expand All @@ -1301,7 +1282,14 @@ func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, arg
}
defer tx.Rollback(ctx)

jobInsertRes, err := c.uniqueInserter.JobInsert(ctx, tx, params, uniqueOpts)
// TODO: move into insertParamsFromConfigArgsAndOptions
// params.UniqueKey = uniqueOpts.UniqueKey()
// params.UniqueStates = uniqueOpts.UniqueStates()

jobInsertRes, err := tx.JobInsertFast(ctx, params)
// JobInsertFastParams: params,
// UniqueKey: uniqueKeyHash[:],
// })
if err != nil {
return nil, err
}
Expand All @@ -1313,7 +1301,7 @@ func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, arg
return nil, err
}

return jobInsertRes, nil
return (*rivertype.JobInsertResult)(jobInsertRes), nil
}

// InsertManyParams encapsulates a single job combined with insert options for
Expand Down Expand Up @@ -1419,8 +1407,8 @@ func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx,
}

return sliceutil.Map(jobRows,
func(jobRow *rivertype.JobRow) *rivertype.JobInsertResult {
return &rivertype.JobInsertResult{Job: jobRow}
func(result *riverdriver.JobInsertFastResult) *rivertype.JobInsertResult {
return (*rivertype.JobInsertResult)(result)
},
), nil
}
Expand Down Expand Up @@ -1448,7 +1436,7 @@ func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*riverdrive
}

var err error
insertParams[i], _, err = insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts)
insertParams[i], err = insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1577,7 +1565,7 @@ func (c *Client[TTx]) insertManyFastParams(params []InsertManyParams) ([]*riverd
}

var err error
insertParams[i], _, err = insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts)
insertParams[i], err = insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 1c3e892

Please sign in to comment.