diff --git a/client.go b/client.go index a7c49eb8..ad3a74c1 100644 --- a/client.go +++ b/client.go @@ -1189,6 +1189,7 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf } insertParams := &riverdriver.JobInsertFastParams{ + Args: args, CreatedAt: createdAt, EncodedArgs: json.RawMessage(encodedArgs), Kind: args.Kind(), diff --git a/client_test.go b/client_test.go index 8809e65d..00fb00c8 100644 --- a/client_test.go +++ b/client_test.go @@ -4035,6 +4035,7 @@ func Test_Client_SubscribeConfig(t *testing.T) { ) for i := 0; i < numJobsToInsert; i++ { insertParams[i] = &riverdriver.JobInsertFastParams{ + Args: &JobArgs{}, EncodedArgs: []byte(`{}`), Kind: kind, MaxAttempts: rivercommon.MaxAttemptsDefault, @@ -5060,6 +5061,14 @@ func TestClient_JobTimeout(t *testing.T) { } } +type JobArgsStaticKind struct { + kind string +} + +func (a JobArgsStaticKind) Kind() string { + return a.kind +} + func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { t.Parallel() @@ -5190,6 +5199,33 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { require.Equal(t, internalUniqueOpts.StateBitmask(), params.UniqueStates) }) + t.Run("UniqueOptsWithPartialArgs", func(t *testing.T) { + t.Parallel() + + uniqueOpts := UniqueOpts{ByArgs: true} + + type PartialArgs struct { + JobArgsStaticKind + Included bool `json:"included" unique:"true"` + Excluded bool `json:"excluded"` + } + + args := PartialArgs{ + JobArgsStaticKind: JobArgsStaticKind{kind: "partialArgs"}, + Included: true, + Excluded: true, + } + + params, err := insertParamsFromConfigArgsAndOptions(archetype, config, args, &InsertOpts{UniqueOpts: uniqueOpts}) + require.NoError(t, err) + internalUniqueOpts := &dbunique.UniqueOpts{ByArgs: true} + + expectedKey := dbunique.UniqueKey(archetype.Time, internalUniqueOpts, params) + + require.Equal(t, expectedKey, params.UniqueKey) + require.Equal(t, internalUniqueOpts.StateBitmask(), params.UniqueStates) + }) + t.Run("PriorityIsLimitedTo4", func(t *testing.T) { t.Parallel() diff --git a/go.mod b/go.mod index 8fcce744..2675a819 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,8 @@ require ( github.com/riverqueue/river/rivertype v0.11.4 github.com/robfig/cron/v3 v3.0.1 github.com/stretchr/testify v1.9.0 + github.com/tidwall/gjson v1.17.3 + github.com/tidwall/sjson v1.2.5 go.uber.org/goleak v1.3.0 golang.org/x/sync v0.8.0 golang.org/x/text v0.17.0 @@ -26,6 +28,8 @@ require ( github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/lib/pq v1.10.9 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.0 // indirect golang.org/x/crypto v0.17.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 5b557d6a..40d6626d 100644 --- a/go.sum +++ b/go.sum @@ -36,6 +36,15 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/gjson v1.17.3 h1:bwWLZU7icoKRG+C+0PNwIKC6FCJO/Q3p2pZvuP0jN94= +github.com/tidwall/gjson v1.17.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= +github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= diff --git a/go.work.sum b/go.work.sum index dde2f1fd..b9b39ddb 100644 --- a/go.work.sum +++ b/go.work.sum @@ -27,11 +27,13 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2 h1:IRJeR9r1pYWsHKTRe/IInb7lYvbBVIqOgsX/u0mbOWY= golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE= golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU= golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk= +golang.org/x/term v0.24.0/go.mod h1:lOBK/LVxemqiMij05LGJ0tzNr8xlmwBRJ81PX6wVLH8= golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58= golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= diff --git a/internal/dbunique/db_unique.go b/internal/dbunique/db_unique.go index 2370afb5..2b1ac596 100644 --- a/internal/dbunique/db_unique.go +++ b/internal/dbunique/db_unique.go @@ -2,7 +2,6 @@ package dbunique import ( "crypto/sha256" - "slices" "strings" "time" @@ -10,6 +9,7 @@ import ( "github.com/riverqueue/river/rivershared/baseservice" "github.com/riverqueue/river/rivershared/util/sliceutil" "github.com/riverqueue/river/rivertype" + "github.com/tidwall/sjson" ) // When a job has specified unique options, but has not set the ByState @@ -80,8 +80,50 @@ func buildUniqueKeyString(timeGen baseservice.TimeGenerator, uniqueOpts *UniqueO } if uniqueOpts.ByArgs { + encodedArgsForUnique := params.EncodedArgs + + { + // Get unique JSON keys from the JobArgs struct: + uniqueFields, err := getSortedUniqueFieldsCached(params.Args) + if err != nil { + // Instead of returning an error, we can just proceed using the entire + // encoded args. + // + // TODO: log or how will we find out about this? + goto AppendUniqueArgs + } + + if len(uniqueFields) == 0 { + // Use all encoded args if no unique fields are specified. + goto AppendUniqueArgs + } + + // Extract unique values from the EncodedArgs JSON + uniqueValues := extractUniqueValues(params.EncodedArgs, uniqueFields) + + // Assemble the JSON object using bytes.Buffer + // Better to overallocate a bit than to allocate multiple times, so just + // assume we'll cap out at the length of the full encoded args. + sortedJSONWithOnlyUniqueValues := make([]byte, 0, len(params.EncodedArgs)) + + sjsonOpts := &sjson.Options{ReplaceInPlace: true} + for i, key := range uniqueFields { + if uniqueValues[i] == "undefined" { + continue + } + sortedJSONWithOnlyUniqueValues, err = sjson.SetRawBytesOptions(sortedJSONWithOnlyUniqueValues, key, []byte(uniqueValues[i]), sjsonOpts) + if err != nil { + // Should not happen unless key was invalid + goto AppendUniqueArgs + } + } + + encodedArgsForUnique = sortedJSONWithOnlyUniqueValues + } + + AppendUniqueArgs: sb.WriteString("&args=") - sb.Write(params.EncodedArgs) + sb.Write(encodedArgsForUnique) } if uniqueOpts.ByPeriod != time.Duration(0) { @@ -93,14 +135,6 @@ func buildUniqueKeyString(timeGen baseservice.TimeGenerator, uniqueOpts *UniqueO sb.WriteString("&queue=" + params.Queue) } - stateSet := defaultUniqueStatesStrings - if len(uniqueOpts.ByState) > 0 { - stateSet = sliceutil.Map(uniqueOpts.ByState, func(s rivertype.JobState) string { return string(s) }) - slices.Sort(stateSet) - } - - sb.WriteString("&state=" + strings.Join(stateSet, ",")) - return sb.String() } diff --git a/internal/dbunique/db_unique_test.go b/internal/dbunique/db_unique_test.go index bb51de22..7b06f72e 100644 --- a/internal/dbunique/db_unique_test.go +++ b/internal/dbunique/db_unique_test.go @@ -2,222 +2,249 @@ package dbunique import ( "crypto/sha256" + "encoding/json" + "fmt" "slices" "testing" "time" "github.com/stretchr/testify/require" - "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/riversharedtest" "github.com/riverqueue/river/rivertype" ) -const queueAlternate = "alternate_queue" - -func makeInsertParams(createdAt *time.Time) *riverdriver.JobInsertFastParams { - return &riverdriver.JobInsertFastParams{ - CreatedAt: createdAt, - EncodedArgs: []byte(`{}`), - Kind: "fake_job", - MaxAttempts: rivercommon.MaxAttemptsDefault, - Metadata: []byte(`{}`), - Priority: rivercommon.PriorityDefault, - Queue: rivercommon.QueueDefault, - ScheduledAt: nil, - State: rivertype.JobStateAvailable, - } +type JobArgsStaticKind struct { + kind string } -func TestDefaultUniqueStatesSorted(t *testing.T) { - t.Parallel() - - states := slices.Clone(defaultUniqueStates) - slices.Sort(states) - require.Equal(t, states, defaultUniqueStates, "Default unique states should be sorted") +func (a JobArgsStaticKind) Kind() string { + return a.kind } func TestUniqueKey(t *testing.T) { - // Fixed time for deterministic tests - fixedTime := time.Date(2024, 9, 14, 12, 0, 0, 0, time.UTC) - // timeGen := &MockTimeGenerator{FixedTime: fixedTime} - stubSvc := &riversharedtest.TimeStub{} - stubSvc.StubNowUTC(fixedTime) - - // Define test parameters - params := &riverdriver.JobInsertFastParams{ - Kind: "email", - EncodedArgs: []byte(`{"to":"user@example.com","subject":"Test Email"}`), - Queue: "default", - } + t.Parallel() - shasum := func(s string) []byte { - value := sha256.Sum256([]byte(s)) - return value[:] - } + // Fixed timestamp for consistency across tests: + now := time.Now().UTC() + stubSvc := &riversharedtest.TimeStub{} + stubSvc.StubNowUTC(now) tests := []struct { - name string - uniqueOpts *UniqueOpts - params *riverdriver.JobInsertFastParams - expectedKey []byte + name string + argsFn func() rivertype.JobArgs + uniqueOpts UniqueOpts + expectedJSON string }{ { - name: "DefaultOptions", - uniqueOpts: &UniqueOpts{}, - params: params, - expectedKey: shasum( - "&kind=email&state=Available,Completed,Pending,Retryable,Running,Scheduled", - ), - }, - { - name: "ExcludeKind", - uniqueOpts: &UniqueOpts{ - ExcludeKind: true, + name: "ByArgsWithMultipleUniqueStructTagsAndDefaultStates", + argsFn: func() rivertype.JobArgs { + type EmailJobArgs struct { + JobArgsStaticKind + Recipient string `json:"recipient" unique:"true"` + Subject string `json:"subject" unique:"true"` + Body string `json:"body"` + TemplateID int `json:"template_id"` + ScheduledAt string `json:"scheduled_at"` + } + return EmailJobArgs{ + JobArgsStaticKind: JobArgsStaticKind{kind: "worker_1"}, + Recipient: "user@example.com", + Subject: "Test Email", + Body: "This is a test email.", + TemplateID: 101, + ScheduledAt: "2024-09-15T10:00:00Z", + } }, - params: params, - expectedKey: shasum( - "&state=Available,Completed,Pending,Retryable,Running,Scheduled", - ), + uniqueOpts: UniqueOpts{ByArgs: true}, + expectedJSON: `&kind=worker_1&args={"recipient":"user@example.com","subject":"Test Email"}`, }, { - name: "ByArgs", - uniqueOpts: &UniqueOpts{ - ByArgs: true, + name: "ByArgsWithUniqueFieldsSomeEmpty", + argsFn: func() rivertype.JobArgs { + type SMSJobArgs struct { + JobArgsStaticKind + PhoneNumber string `json:"phone_number" unique:"true"` + Message string `json:"message,omitempty" unique:"true"` + TemplateID int `json:"template_id"` + } + return SMSJobArgs{ + JobArgsStaticKind: JobArgsStaticKind{kind: "worker_2"}, + PhoneNumber: "555-5678", + Message: "", // Empty unique field, omitted from key + TemplateID: 202, + } }, - params: params, - expectedKey: shasum( - "&kind=email&args={\"to\":\"user@example.com\",\"subject\":\"Test Email\"}&state=Available,Completed,Pending,Retryable,Running,Scheduled", - ), + uniqueOpts: UniqueOpts{ByArgs: true}, + expectedJSON: `&kind=worker_2&args={"phone_number":"555-5678"}`, }, { - name: "ByPeriod", - uniqueOpts: &UniqueOpts{ - ByPeriod: 2 * time.Hour, - }, - params: params, - expectedKey: shasum( - "&kind=email&period=2024-09-14T12:00:00Z&state=Available,Completed,Pending,Retryable,Running,Scheduled", - ), - }, - { - name: "ByQueue", - uniqueOpts: &UniqueOpts{ - ByQueue: true, + name: "ByArgsUniqueWithNoJSONTagsUsesFieldName", + argsFn: func() rivertype.JobArgs { + type EmailJobArgs struct { + JobArgsStaticKind + Recipient string `unique:"true"` + Subject string `unique:"true"` + TemplateID int + } + return EmailJobArgs{ + JobArgsStaticKind: JobArgsStaticKind{kind: "worker_1"}, + Recipient: "john@example.com", + Subject: "Another Test Email", + TemplateID: 102, + } }, - params: params, - expectedKey: shasum("&kind=email&queue=default&state=Available,Completed,Pending,Retryable,Running,Scheduled"), + uniqueOpts: UniqueOpts{ByArgs: true}, + expectedJSON: `&kind=worker_1&args={"Recipient":"john@example.com","Subject":"Another Test Email"}`, }, { - name: "ByState", - uniqueOpts: &UniqueOpts{ - ByState: []rivertype.JobState{ - rivertype.JobStateCancelled, - rivertype.JobStateDiscarded, - }, + name: "ByArgsWithPointerToStruct", + argsFn: func() rivertype.JobArgs { + type EmailJobArgs struct { + JobArgsStaticKind + Recipient string `json:"recipient" unique:"true"` + Subject string `json:"subject" unique:"true"` + Body string `json:"body"` + } + return &EmailJobArgs{ + JobArgsStaticKind: JobArgsStaticKind{kind: "worker_1"}, + Recipient: "john@example.com", + Subject: "Another Test Email", + Body: "This is another test email.", + } }, - params: params, - expectedKey: shasum("&kind=email&state=Cancelled,Discarded"), + uniqueOpts: UniqueOpts{ByArgs: true}, + expectedJSON: `&kind=worker_1&args={"recipient":"john@example.com","subject":"Another Test Email"}`, }, { - name: "CombinationOptions", - uniqueOpts: &UniqueOpts{ - ByArgs: true, - ByPeriod: 1 * time.Hour, - ByQueue: true, - ByState: []rivertype.JobState{rivertype.JobStateRunning, rivertype.JobStatePending}, - ExcludeKind: false, + name: "ByArgsWithNoUniqueFields", + argsFn: func() rivertype.JobArgs { + type GenericJobArgs struct { + JobArgsStaticKind + Description string `json:"description"` + Count int `json:"count"` + foo string // won't be marshaled in JSON + } + return GenericJobArgs{ + JobArgsStaticKind: JobArgsStaticKind{kind: "worker_3"}, + Description: "A generic job without unique fields.", + Count: 10, + foo: "bar", + } }, - params: params, - expectedKey: shasum( - "&kind=email&args={\"to\":\"user@example.com\",\"subject\":\"Test Email\"}&period=2024-09-14T12:00:00Z&queue=default&state=Pending,Running", - ), + uniqueOpts: UniqueOpts{ByArgs: true}, + expectedJSON: `&kind=worker_3&args={"description":"A generic job without unique fields.","count":10}`, }, { - name: "EmptyUniqueOpts", - uniqueOpts: &UniqueOpts{ - ByArgs: false, - ByPeriod: 0, - ByQueue: false, - ByState: nil, + name: "CustomByStateWithPeriod", + argsFn: func() rivertype.JobArgs { + type TaskJobArgs struct { + JobArgsStaticKind + TaskID string + } + return TaskJobArgs{ + JobArgsStaticKind: JobArgsStaticKind{kind: "worker_4"}, + TaskID: "task_123", + } }, - params: params, - expectedKey: shasum("&kind=email&state=Available,Completed,Pending,Retryable,Running,Scheduled"), + uniqueOpts: UniqueOpts{ByPeriod: time.Hour, ByState: []rivertype.JobState{rivertype.JobStateCompleted}}, + expectedJSON: fmt.Sprintf(`&kind=worker_4&period=%s`, now.Truncate(time.Hour).Format(time.RFC3339)), }, { - name: "EmptyEncodedArgs", - uniqueOpts: &UniqueOpts{ - ByArgs: true, - }, - params: &riverdriver.JobInsertFastParams{ - Kind: "email", - EncodedArgs: []byte{}, - Queue: "default", + name: "ExcludeKindByArgs", + argsFn: func() rivertype.JobArgs { + type TaskJobArgs struct { + JobArgsStaticKind + TaskID string `json:"task_id"` + } + return TaskJobArgs{ + JobArgsStaticKind: JobArgsStaticKind{kind: "worker_5"}, + TaskID: "task_123", + } }, - expectedKey: shasum("&kind=email&args=&state=Available,Completed,Pending,Retryable,Running,Scheduled"), + uniqueOpts: UniqueOpts{ByArgs: true, ExcludeKind: true}, + expectedJSON: `&args={"task_id":"task_123"}`, }, { - name: "SpecialCharactersInKindAndQueue", - uniqueOpts: &UniqueOpts{ - ByQueue: true, - }, - params: &riverdriver.JobInsertFastParams{ - Kind: "email¬ification", - EncodedArgs: []byte(`{"to":"user@example.com","subject":"Test Email"}`), - Queue: "default/queue", + name: "ByQueue", + argsFn: func() rivertype.JobArgs { + type TaskJobArgs struct { + JobArgsStaticKind + TaskID string `json:"task_id"` + } + return TaskJobArgs{ + JobArgsStaticKind: JobArgsStaticKind{kind: "worker_6"}, + TaskID: "task_123", + } }, - expectedKey: shasum( - "&kind=email¬ification&queue=default/queue&state=Available,Completed,Pending,Retryable,Running,Scheduled", - ), + uniqueOpts: UniqueOpts{ByQueue: true}, + expectedJSON: `&kind=worker_6&queue=email_queue`, }, { - name: "UnknownJobState", - uniqueOpts: &UniqueOpts{ - ByState: []rivertype.JobState{ - "UnknownState", - rivertype.JobStateRunning, - }, + name: "EmptyUniqueOpts", + argsFn: func() rivertype.JobArgs { + type TaskJobArgs struct { + JobArgsStaticKind + TaskID string `json:"task_id"` + } + return TaskJobArgs{ + JobArgsStaticKind: JobArgsStaticKind{kind: "worker_7"}, + TaskID: "task_123", + } }, - params: params, - expectedKey: shasum("&kind=email&state=Running"), + uniqueOpts: UniqueOpts{}, + expectedJSON: `&kind=worker_7`, }, } for _, tt := range tests { tt := tt // capture range variable + t.Run(tt.name, func(t *testing.T) { - uniqueKey := UniqueKey(stubSvc, tt.uniqueOpts, tt.params) + t.Parallel() - // Compare the generated unique key with the expected hash - require.Equal(t, tt.expectedKey[:], uniqueKey, "UniqueKey hash does not match expected value") - }) - } + args := tt.argsFn() + + encodedArgs, err := json.Marshal(args) + require.NoError(t, err) + + states := defaultUniqueStates + if len(tt.uniqueOpts.ByState) > 0 { + states = tt.uniqueOpts.ByState + } - // Additional tests to ensure uniqueness - t.Run("DifferentUniqueOptsProduceDifferentKeys", func(t *testing.T) { - opts1 := &UniqueOpts{ByArgs: true} - opts2 := &UniqueOpts{ByQueue: true} + jobParams := &riverdriver.JobInsertFastParams{ + Args: args, + CreatedAt: &now, + EncodedArgs: encodedArgs, + Kind: args.Kind(), + Metadata: []byte(`{"source":"api"}`), + Queue: "email_queue", + ScheduledAt: &now, + State: "Pending", + Tags: []string{"notification", "email"}, + UniqueStates: UniqueStatesToBitmask(states), + } - key1 := UniqueKey(stubSvc, opts1, params) - key2 := UniqueKey(stubSvc, opts2, params) + uniqueKeyPreHash := buildUniqueKeyString(stubSvc, &tt.uniqueOpts, jobParams) + require.Equal(t, tt.expectedJSON, uniqueKeyPreHash) + expectedHash := sha256.Sum256([]byte(tt.expectedJSON)) - require.NotEqual(t, key1, key2, "UniqueKeys should differ for different UniqueOpts") - }) + uniqueKey := UniqueKey(stubSvc, &tt.uniqueOpts, jobParams) + require.NotNil(t, uniqueKey) - t.Run("SameInputsProduceSameKey", func(t *testing.T) { - opts := &UniqueOpts{ - ByArgs: true, - ByPeriod: 30 * time.Minute, - ByQueue: true, - ByState: []rivertype.JobState{rivertype.JobStateRunning}, - ExcludeKind: false, - } - key1 := UniqueKey(stubSvc, opts, params) - key2 := UniqueKey(stubSvc, opts, params) + require.Equal(t, expectedHash[:], uniqueKey, "UniqueKey hash does not match expected value") + }) + } +} + +func TestDefaultUniqueStatesSorted(t *testing.T) { + t.Parallel() - require.Equal(t, key1, key2, "UniqueKeys should be identical for the same inputs") - }) + states := slices.Clone(defaultUniqueStates) + slices.Sort(states) + require.Equal(t, states, defaultUniqueStates, "Default unique states should be sorted") } func TestUniqueOptsIsEmpty(t *testing.T) { @@ -268,5 +295,3 @@ func TestUniqueStatesToBitmask(t *testing.T) { require.Equal(t, byte(1<<(7-position)), bitmask, "Bitmask should be set for single state %s", state) } } - -// TODO(bgentry): tests for new functions/methods in dbunique diff --git a/internal/dbunique/unique_fields.go b/internal/dbunique/unique_fields.go new file mode 100644 index 00000000..cb942d23 --- /dev/null +++ b/internal/dbunique/unique_fields.go @@ -0,0 +1,123 @@ +package dbunique + +import ( + "fmt" + "reflect" + "sort" + "strings" + "sync" + + "github.com/riverqueue/river/rivertype" + "github.com/tidwall/gjson" +) + +var ( + // uniqueFieldsCache caches the unique fields for each JobArgs type. These are + // global to ensure that each struct type's tags are only extracted once. + uniqueFieldsCache = make(map[reflect.Type][]string) //nolint:gochecknoglobals + cacheMutex sync.RWMutex //nolint:gochecknoglobals +) + +type jobArgs interface { + Kind() string +} + +// extractUniqueValues extracts the raw JSON values of the specified keys from the JSON-encoded args. +func extractUniqueValues(encodedArgs []byte, uniqueKeys []string) []string { + // Use GetManyBytes to retrieve multiple values at once + results := gjson.GetManyBytes(encodedArgs, uniqueKeys...) + + uniqueValues := make([]string, len(results)) + for i, res := range results { + if res.Exists() { + uniqueValues[i] = res.Raw // Use Raw to get the JSON-encoded value + } else { + // Handle missing keys as "undefined" (they'll be skipped when building + // the unique key). We don't want to use "null" here because the JSON may + // actually contain "null" as a value. + uniqueValues[i] = "undefined" + } + } + + return uniqueValues +} + +// getSortedUniqueFields uses reflection to retrieve the JSON keys of fields +// marked with `unique:"true"`. The return values are the JSON keys using the +// same logic as the `json` struct tag. +func getSortedUniqueFields(args jobArgs) ([]string, error) { + typ := reflect.TypeOf(args) + + // Handle pointer to struct + if typ != nil && typ.Kind() == reflect.Ptr { + typ = typ.Elem() + } + + // Ensure we're dealing with a struct + if typ == nil || typ.Kind() != reflect.Struct { + return nil, fmt.Errorf("expected struct, got %T", args) + } + + var uniqueFields []string + + // Iterate over all fields + for i := 0; i < typ.NumField(); i++ { + field := typ.Field(i) + + // Check for `unique:"true"` tag + if uniqueTag, ok := field.Tag.Lookup("unique"); ok && uniqueTag == "true" { + // Get the corresponding JSON key + jsonTag := field.Tag.Get("json") + if jsonTag == "" { + // If no JSON tag, use the field name as-is + uniqueFields = append(uniqueFields, field.Name) + } else { + // Handle cases like `json:"recipient,omitempty"` + jsonKey := parseJSONTag(jsonTag) + uniqueFields = append(uniqueFields, jsonKey) + } + } + } + + // Sort the uniqueFields alphabetically for consistent ordering + sort.Strings(uniqueFields) + + return uniqueFields, nil +} + +// getSortedUniqueFieldsCached retrieves unique fields with caching to avoid +// extracting fields from the same struct type repeatedly. +func getSortedUniqueFieldsCached(args rivertype.JobArgs) ([]string, error) { + typ := reflect.TypeOf(args) + + // Check cache first + cacheMutex.RLock() + if fields, ok := uniqueFieldsCache[typ]; ok { + cacheMutex.RUnlock() + return fields, nil + } + cacheMutex.RUnlock() + + // Not in cache; retrieve using reflection + fields, err := getSortedUniqueFields(args) + if err != nil { + return nil, err + } + + // Store in cache + cacheMutex.Lock() + uniqueFieldsCache[typ] = fields + cacheMutex.Unlock() + + return fields, nil +} + +// parseJSONTag extracts the JSON key from the struct tag. +// It handles tags with options, e.g., `json:"recipient,omitempty"`. +func parseJSONTag(tag string) string { + // Tags can be like "recipient,omitempty", so split by comma + if commaIdx := strings.Index(tag, ","); commaIdx != -1 { + return tag[:commaIdx] + } + return tag +} diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index 05d29172..712a1180 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -248,6 +248,10 @@ type JobGetStuckParams struct { } type JobInsertFastParams struct { + // Args contains the raw underlying job arguments struct. It has already been + // encoded into EncodedArgs, but the original is kept here for to leverage its + // struct tags and interfaces, such as for use in unique key generation. + Args rivertype.JobArgs CreatedAt *time.Time EncodedArgs []byte Kind string diff --git a/riverdriver/riverdatabasesql/go.mod b/riverdriver/riverdatabasesql/go.mod index 07f1c784..6c7f784a 100644 --- a/riverdriver/riverdatabasesql/go.mod +++ b/riverdriver/riverdatabasesql/go.mod @@ -5,6 +5,7 @@ go 1.21 toolchain go1.23.0 require ( + github.com/jackc/pgx/v5 v5.7.1 github.com/lib/pq v1.10.9 github.com/riverqueue/river/riverdriver v0.11.4 github.com/riverqueue/river/rivershared v0.11.4 diff --git a/riverdriver/riverdatabasesql/go.sum b/riverdriver/riverdatabasesql/go.sum index 49610545..ef0854f9 100644 --- a/riverdriver/riverdatabasesql/go.sum +++ b/riverdriver/riverdatabasesql/go.sum @@ -1,6 +1,14 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.7.1 h1:x7SYsPBYDkHDksogeSmZZ5xzThcTgRz++I5E+ePFUcs= +github.com/jackc/pgx/v5 v5.7.1/go.mod h1:e7O26IywZZ+naJtWWos6i6fvWK+29etgITqrqHLfoZA= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -19,6 +27,12 @@ github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= +golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/rivertype/river_type.go b/rivertype/river_type.go index 0f8f309c..4d3fa410 100644 --- a/rivertype/river_type.go +++ b/rivertype/river_type.go @@ -17,6 +17,15 @@ var ErrNotFound = errors.New("not found") // running. var ErrJobRunning = errors.New("running jobs cannot be deleted") +// JobArgs is an interface that should be implemented by the arguments to a job. +// This definition duplicates the JobArgs interface in the river package so that +// it can be used in other packages without creating a circular dependency. +type JobArgs interface { + // Kind returns a unique string that identifies the type of job. It's used to + // determine which worker should work the job. + Kind() string +} + // JobInsertResult is the result of a job insert, containing the inserted job // along with some other useful metadata. type JobInsertResult struct {