Skip to content

Commit

Permalink
support uniqueness for specific args via struct tags
Browse files Browse the repository at this point in the history
  • Loading branch information
bgentry committed Sep 15, 2024
1 parent 1c3e892 commit 88b4903
Show file tree
Hide file tree
Showing 12 changed files with 432 additions and 170 deletions.
1 change: 1 addition & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1189,6 +1189,7 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
}

insertParams := &riverdriver.JobInsertFastParams{
Args: args,
CreatedAt: createdAt,
EncodedArgs: json.RawMessage(encodedArgs),
Kind: args.Kind(),
Expand Down
36 changes: 36 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4035,6 +4035,7 @@ func Test_Client_SubscribeConfig(t *testing.T) {
)
for i := 0; i < numJobsToInsert; i++ {
insertParams[i] = &riverdriver.JobInsertFastParams{
Args: &JobArgs{},
EncodedArgs: []byte(`{}`),
Kind: kind,
MaxAttempts: rivercommon.MaxAttemptsDefault,
Expand Down Expand Up @@ -5060,6 +5061,14 @@ func TestClient_JobTimeout(t *testing.T) {
}
}

type JobArgsStaticKind struct {
kind string
}

func (a JobArgsStaticKind) Kind() string {
return a.kind
}

func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -5190,6 +5199,33 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
require.Equal(t, internalUniqueOpts.StateBitmask(), params.UniqueStates)
})

t.Run("UniqueOptsWithPartialArgs", func(t *testing.T) {
t.Parallel()

uniqueOpts := UniqueOpts{ByArgs: true}

type PartialArgs struct {
JobArgsStaticKind
Included bool `json:"included" unique:"true"`
Excluded bool `json:"excluded"`
}

args := PartialArgs{
JobArgsStaticKind: JobArgsStaticKind{kind: "partialArgs"},
Included: true,
Excluded: true,
}

params, err := insertParamsFromConfigArgsAndOptions(archetype, config, args, &InsertOpts{UniqueOpts: uniqueOpts})
require.NoError(t, err)
internalUniqueOpts := &dbunique.UniqueOpts{ByArgs: true}

expectedKey := dbunique.UniqueKey(archetype.Time, internalUniqueOpts, params)

require.Equal(t, expectedKey, params.UniqueKey)
require.Equal(t, internalUniqueOpts.StateBitmask(), params.UniqueStates)
})

t.Run("PriorityIsLimitedTo4", func(t *testing.T) {
t.Parallel()

Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ require (
github.com/riverqueue/river/rivertype v0.11.4
github.com/robfig/cron/v3 v3.0.1
github.com/stretchr/testify v1.9.0
github.com/tidwall/gjson v1.17.3
github.com/tidwall/sjson v1.2.5
go.uber.org/goleak v1.3.0
golang.org/x/sync v0.8.0
golang.org/x/text v0.17.0
Expand All @@ -26,6 +28,8 @@ require (
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Expand Down
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/gjson v1.17.3 h1:bwWLZU7icoKRG+C+0PNwIKC6FCJO/Q3p2pZvuP0jN94=
github.com/tidwall/gjson v1.17.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
Expand Down
2 changes: 2 additions & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM=
golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2 h1:IRJeR9r1pYWsHKTRe/IInb7lYvbBVIqOgsX/u0mbOWY=
golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE=
golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY=
golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU=
golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk=
golang.org/x/term v0.24.0/go.mod h1:lOBK/LVxemqiMij05LGJ0tzNr8xlmwBRJ81PX6wVLH8=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg=
Expand Down
54 changes: 44 additions & 10 deletions internal/dbunique/db_unique.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package dbunique

import (
"crypto/sha256"
"slices"
"strings"
"time"

"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/util/sliceutil"
"github.com/riverqueue/river/rivertype"
"github.com/tidwall/sjson"
)

// When a job has specified unique options, but has not set the ByState
Expand Down Expand Up @@ -80,8 +80,50 @@ func buildUniqueKeyString(timeGen baseservice.TimeGenerator, uniqueOpts *UniqueO
}

if uniqueOpts.ByArgs {
encodedArgsForUnique := params.EncodedArgs

{
// Get unique JSON keys from the JobArgs struct:
uniqueFields, err := getSortedUniqueFieldsCached(params.Args)
if err != nil {
// Instead of returning an error, we can just proceed using the entire
// encoded args.
//
// TODO: log or how will we find out about this?
goto AppendUniqueArgs
}

if len(uniqueFields) == 0 {
// Use all encoded args if no unique fields are specified.
goto AppendUniqueArgs
}

// Extract unique values from the EncodedArgs JSON
uniqueValues := extractUniqueValues(params.EncodedArgs, uniqueFields)

// Assemble the JSON object using bytes.Buffer
// Better to overallocate a bit than to allocate multiple times, so just
// assume we'll cap out at the length of the full encoded args.
sortedJSONWithOnlyUniqueValues := make([]byte, 0, len(params.EncodedArgs))

sjsonOpts := &sjson.Options{ReplaceInPlace: true}
for i, key := range uniqueFields {
if uniqueValues[i] == "undefined" {
continue
}
sortedJSONWithOnlyUniqueValues, err = sjson.SetRawBytesOptions(sortedJSONWithOnlyUniqueValues, key, []byte(uniqueValues[i]), sjsonOpts)
if err != nil {
// Should not happen unless key was invalid
goto AppendUniqueArgs
}
}

encodedArgsForUnique = sortedJSONWithOnlyUniqueValues
}

AppendUniqueArgs:
sb.WriteString("&args=")
sb.Write(params.EncodedArgs)
sb.Write(encodedArgsForUnique)
}

if uniqueOpts.ByPeriod != time.Duration(0) {
Expand All @@ -93,14 +135,6 @@ func buildUniqueKeyString(timeGen baseservice.TimeGenerator, uniqueOpts *UniqueO
sb.WriteString("&queue=" + params.Queue)
}

stateSet := defaultUniqueStatesStrings
if len(uniqueOpts.ByState) > 0 {
stateSet = sliceutil.Map(uniqueOpts.ByState, func(s rivertype.JobState) string { return string(s) })
slices.Sort(stateSet)
}

sb.WriteString("&state=" + strings.Join(stateSet, ","))

return sb.String()
}

Expand Down
Loading

0 comments on commit 88b4903

Please sign in to comment.