diff --git a/go.mod b/go.mod index 04ece05..c99720e 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/luno/reflex -go 1.21 +go 1.22 require ( github.com/aws/aws-sdk-go-v2/service/s3 v1.35.0 @@ -8,6 +8,7 @@ require ( github.com/luno/jettison v0.0.0-20230912135954-09d6084f5df9 github.com/prometheus/client_golang v1.15.0 github.com/prometheus/client_model v0.3.0 + github.com/sebdah/goldie/v2 v2.5.3 github.com/stretchr/testify v1.8.3 go.opentelemetry.io/otel v1.14.0 go.opentelemetry.io/otel/sdk v1.14.0 @@ -52,6 +53,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/common v0.42.0 // indirect github.com/prometheus/procfs v0.9.0 // indirect + github.com/sergi/go-diff v1.2.0 // indirect github.com/stretchr/objx v0.5.0 // indirect go.opencensus.io v0.24.0 // indirect golang.org/x/net v0.11.0 // indirect diff --git a/go.sum b/go.sum index ee708c4..9ef4f3c 100644 --- a/go.sum +++ b/go.sum @@ -1962,6 +1962,7 @@ github.com/sebdah/goldie/v2 v2.5.3 h1:9ES/mNN+HNUbNWpVAlrzuZ7jE+Nrczbj8uFRjM7624 github.com/sebdah/goldie/v2 v2.5.3/go.mod h1:oZ9fp0+se1eapSRjfYbsV/0Hqhbuu3bJVvKI/NNtssI= github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo= github.com/seccomp/libseccomp-golang v0.9.2-0.20210429002308-3879420cc921/go.mod h1:JA8cRccbGaA1s33RQf7Y1+q9gHmZX1yB/z9WDN1C6fg= +github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ= github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/shoenig/test v0.6.3/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= diff --git a/rsql/db.go b/rsql/db.go index 356c0b2..27f9263 100644 --- a/rsql/db.go +++ b/rsql/db.go @@ -5,6 +5,7 @@ import ( "database/sql" "fmt" "strconv" + "strings" "testing" "time" @@ -75,6 +76,68 @@ func makeDefaultInserter(schema eTableSchema) inserter { } } +// makeDefaultManyInserter returns the default sql manyInserter configured via WithEventsXField options. +func makeDefaultManyInserter(schema eTableSchema) manyInserter { + return func(ctx context.Context, tx *sql.Tx, events ...EventToInsert) error { + if len(events) == 0 { + return nil + } + q, args, err := makeInsertManyQuery(ctx, schema, events) + if err != nil { + return err + } + _, err = tx.ExecContext(ctx, q, args...) + return errors.Wrap(err, "insert error") + } +} + +func makeInsertManyQuery( + ctx context.Context, + schema eTableSchema, + events []EventToInsert, +) (query string, args []any, err error) { + spanCtx, hasTrace := tracing.Extract(ctx) + var traceData []byte + if schema.traceField != "" && hasTrace { + d, err := tracing.Marshal(spanCtx) + if err != nil { + return "", nil, err + } + traceData = d + } + + cols := []string{schema.foreignIDField, schema.typeField, schema.timeField} + if schema.metadataField != "" { + cols = append(cols, schema.metadataField) + } + if traceData != nil { + cols = append(cols, schema.traceField) + } + + q := "insert into " + schema.name + " (" + strings.Join(cols, ", ") + ") values" + + for i, e := range events { + vals := []string{"?", "?", "now(6)"} + args = append(args, e.ForeignID, e.Type.ReflexType()) + if schema.metadataField != "" { + vals = append(vals, "?") + args = append(args, e.Metadata) + } else if e.Metadata != nil { + return "", nil, errors.New("metadata not enabled") + } + if traceData != nil { + vals = append(vals, "?") + args = append(args, traceData) + } + if i > 0 { + q += "," + } + q += " (" + strings.Join(vals, ", ") + ")" + } + + return q, args, nil +} + type row interface { Scan(dest ...interface{}) error } @@ -158,7 +221,14 @@ func getNextEvents(ctx context.Context, dbc *sql.DB, schema eTableSchema, } // GetNextEventsForTesting fetches a bunch of events from the event table -func GetNextEventsForTesting(ctx context.Context, _ *testing.T, dbc *sql.DB, table *EventsTable, after int64, lag time.Duration) ([]*reflex.Event, error) { +func GetNextEventsForTesting( + ctx context.Context, + _ *testing.T, + dbc *sql.DB, + table *EventsTable, + after int64, + lag time.Duration, +) ([]*reflex.Event, error) { return getNextEvents(ctx, dbc, table.schema, after, lag) } @@ -287,7 +357,16 @@ func makeDefaultErrorInserter(schema errTableSchema) ErrorInserter { // NB: See the documentation is the following link on the behaviour of "on last_insert_id()" https://dev.mysql.com/doc/refman/5.7/en/information-functions.html#function_last-insert-id q := fmt.Sprintf( "insert into %s set %s=?, %s=?, %s=?, %s=now(6), %s=now(6), %s=? on duplicate key update %s=last_insert_id(%s)", - schema.name, schema.eventConsumerField, schema.eventIDField, schema.errorMsgField, schema.errorCreatedAtField, schema.errorUpdatedAtField, schema.errorStatusField, schema.idField, schema.idField) + schema.name, + schema.eventConsumerField, + schema.eventIDField, + schema.errorMsgField, + schema.errorCreatedAtField, + schema.errorUpdatedAtField, + schema.errorStatusField, + schema.idField, + schema.idField, + ) return func(ctx context.Context, tx *sql.Tx, consumer string, eventID string, errMsg string, errStatus reflex.ErrorStatus) (string, error) { r, err := tx.ExecContext(ctx, q, consumer, eventID, errMsg, errStatus) // If the error has already been written then we can ignore the error diff --git a/rsql/db_internal_test.go b/rsql/db_internal_test.go new file mode 100644 index 0000000..43a3c3b --- /dev/null +++ b/rsql/db_internal_test.go @@ -0,0 +1,106 @@ +package rsql + +import ( + "context" + "testing" + + "github.com/luno/jettison/jtest" + "github.com/sebdah/goldie/v2" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace" + + "github.com/luno/reflex/internal/tracing" +) + +//go:generate go test . -run Test_makeInsertManyQuery -update -clean + +func Test_makeInsertManyQuery(t *testing.T) { + ctx := context.Background() + + type res struct { + Q string + Args []any + } + + defaultSchema := eTableSchema{ + name: "events", + idField: "id", + timeField: "timestamp", + typeField: "type", + foreignIDField: "foreign_id", + } + + t.Run("empty", func(t *testing.T) { + q, args, err := makeInsertManyQuery(ctx, defaultSchema, nil) + jtest.RequireNil(t, err) + goldie.New(t).AssertJson(t, t.Name(), res{q, args}) + }) + + t.Run("one", func(t *testing.T) { + q, args, err := makeInsertManyQuery(ctx, defaultSchema, []EventToInsert{ + {"fid1", testEventType(1), nil}, + }) + jtest.RequireNil(t, err) + goldie.New(t).AssertJson(t, t.Name(), res{q, args}) + }) + + t.Run("two", func(t *testing.T) { + q, args, err := makeInsertManyQuery(ctx, defaultSchema, []EventToInsert{ + {"fid1", testEventType(1), nil}, + {"fid2", testEventType(2), nil}, + }) + jtest.RequireNil(t, err) + goldie.New(t).AssertJson(t, t.Name(), res{q, args}) + }) + + t.Run("more", func(t *testing.T) { + var events []EventToInsert + for i := range 100 { + events = append(events, EventToInsert{"fid", testEventType(i), nil}) + } + q, args, err := makeInsertManyQuery(ctx, defaultSchema, events) + jtest.RequireNil(t, err) + goldie.New(t).AssertJson(t, t.Name(), res{q, args}) + }) + + t.Run("metadata_error", func(t *testing.T) { + _, _, err := makeInsertManyQuery(ctx, defaultSchema, []EventToInsert{ + {"fid1", testEventType(1), []byte("metadata")}, + }) + require.ErrorContains(t, err, "metadata not enable") + }) + + t.Run("with_metadata", func(t *testing.T) { + schemaWithMetadata := defaultSchema + schemaWithMetadata.metadataField = "metadata" + q, args, err := makeInsertManyQuery(ctx, schemaWithMetadata, []EventToInsert{ + {"fid1", testEventType(1), []byte("metadata")}, + }) + jtest.RequireNil(t, err) + goldie.New(t).AssertJson(t, t.Name(), res{q, args}) + }) + + t.Run("with_trace", func(t *testing.T) { + schemaWithTrace := defaultSchema + schemaWithTrace.traceField = "trace" + traceID, err := trace.TraceIDFromHex("00000000000000000000000000000009") + jtest.RequireNil(t, err) + spanID, err := trace.SpanIDFromHex("0000000000000002") + jtest.RequireNil(t, err) + data, err := tracing.Marshal(trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: traceID, + SpanID: spanID, + })) + jtest.RequireNil(t, err) + ctx := tracing.Inject(ctx, data) + q, args, err := makeInsertManyQuery(ctx, schemaWithTrace, []EventToInsert{ + {"fid1", testEventType(1), nil}, + }) + jtest.RequireNil(t, err) + goldie.New(t).AssertJson(t, t.Name(), res{q, args}) + }) +} + +type testEventType int + +func (t testEventType) ReflexType() int { return int(t) } diff --git a/rsql/eventstable.go b/rsql/eventstable.go index 3cfeeed..8fa8b25 100644 --- a/rsql/eventstable.go +++ b/rsql/eventstable.go @@ -44,10 +44,20 @@ func NewEventsTable(name string, opts ...EventsOption) *EventsTable { table.inserter = makeDefaultInserter(table.schema) } + if table.manyInserter == nil { + table.manyInserter = makeDefaultManyInserter(table.schema) + } + table.gapCh = make(chan Gap) table.gapListeners = make(chan GapListenFunc) table.gapListenDone = make(chan struct{}) - table.currentLoader = buildLoader(table.baseLoader, table.gapCh, table.disableCache, table.schema, table.includeNoopEvents) + table.currentLoader = buildLoader( + table.baseLoader, + table.gapCh, + table.disableCache, + table.schema, + table.includeNoopEvents, + ) return table } @@ -175,10 +185,28 @@ func WithEventsInserter(inserter inserter) EventsOption { } } +// WithEventsManyInserter provides an option to set the event inserter +// which inserts many events into a sql table. The default inserter is +// configured with the WithEventsXField options. +func WithEventsManyInserter(manyInserter manyInserter) EventsOption { + return func(table *EventsTable) { + table.manyInserter = manyInserter + } +} + // inserter abstracts the insertion of an event into a sql table. type inserter func(ctx context.Context, tx *sql.Tx, foreignID string, typ reflex.EventType, metadata []byte) error +type EventToInsert struct { + ForeignID string + Type reflex.EventType + Metadata []byte +} + +// manyInserter abstracts the insertion of many events into a sql table. +type manyInserter func(ctx context.Context, tx *sql.Tx, events ...EventToInsert) error + // EventsTable provides reflex event insertion and streaming // for a sql db table. type EventsTable struct { @@ -189,6 +217,7 @@ type EventsTable struct { includeNoopEvents bool baseLoader loader inserter inserter + manyInserter manyInserter // Stateful fields not cloned currentLoader filterLoader @@ -230,6 +259,25 @@ func (t *EventsTable) InsertWithMetadata(ctx context.Context, tx *sql.Tx, foreig return t.notifier.Notify, nil } +// InsertMany inserts a many events into the EventsTable. +func (t *EventsTable) InsertMany( + ctx context.Context, + tx *sql.Tx, + events []EventToInsert, +) (NotifyFunc, error) { + for _, e := range events { + if isNoop(e.ForeignID, e.Type) { + return nil, errors.New("inserting invalid noop event") + } + } + err := t.manyInserter(ctx, tx, events...) + if err != nil { + return noopFunc, err + } + + return t.notifier.Notify, nil +} + // Clone returns a new events table generated from the config of t with the new options applied. // Note that non-config fields are not copied, so things like the cache and inmemnotifier // are not shared. @@ -312,7 +360,13 @@ func (t *EventsTable) getSchema() eTableSchema { } // buildLoader returns a new layered event loader. -func buildLoader(baseLoader loader, ch chan<- Gap, disableCache bool, schema eTableSchema, withNoopEvents bool) filterLoader { +func buildLoader( + baseLoader loader, + ch chan<- Gap, + disableCache bool, + schema eTableSchema, + withNoopEvents bool, +) filterLoader { if baseLoader == nil { baseLoader = makeBaseLoader(schema) } diff --git a/rsql/eventstable_test.go b/rsql/eventstable_test.go index 19071a6..7fa25ff 100644 --- a/rsql/eventstable_test.go +++ b/rsql/eventstable_test.go @@ -542,3 +542,25 @@ func TestCloneInserter(t *testing.T) { require.Equal(t, 2, i1) require.Equal(t, 1, i2) } + +func TestInsertMany(t *testing.T) { + ctx := context.Background() + dbc := ConnectTestDB(t, DefaultEventTable(), DefaultCursorTable(), DefaultErrorTable(), DefaultErrorEventTable()) + + table1 := rsql.NewEventsTable(eventsTable) + + tx, _ := dbc.Begin() + _, err := table1.InsertMany(ctx, tx, []rsql.EventToInsert{ + {"fid1", testEventType(1), nil}, + {"fid2", testEventType(2), nil}, + {"fid3", testEventType(3), nil}, + }) + jtest.RequireNil(t, err) + err = tx.Commit() + jtest.RequireNil(t, err) + el, err := rsql.GetNextEventsForTesting(context.Background(), t, dbc, table1, 0, 0) + jtest.RequireNil(t, err) + assert.Equal(t, "fid1", el[0].ForeignID) + assert.Equal(t, "fid2", el[1].ForeignID) + assert.Equal(t, "fid3", el[2].ForeignID) +} diff --git a/rsql/testdata/Test_makeInsertManyQuery/empty.golden b/rsql/testdata/Test_makeInsertManyQuery/empty.golden new file mode 100644 index 0000000..4733b05 --- /dev/null +++ b/rsql/testdata/Test_makeInsertManyQuery/empty.golden @@ -0,0 +1,4 @@ +{ + "Q": "insert into events (foreign_id, type, timestamp) values", + "Args": null +} \ No newline at end of file diff --git a/rsql/testdata/Test_makeInsertManyQuery/more.golden b/rsql/testdata/Test_makeInsertManyQuery/more.golden new file mode 100644 index 0000000..a008580 --- /dev/null +++ b/rsql/testdata/Test_makeInsertManyQuery/more.golden @@ -0,0 +1,205 @@ +{ + "Q": "insert into events (foreign_id, type, timestamp) values (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6))", + "Args": [ + "fid", + 0, + "fid", + 1, + "fid", + 2, + "fid", + 3, + "fid", + 4, + "fid", + 5, + "fid", + 6, + "fid", + 7, + "fid", + 8, + "fid", + 9, + "fid", + 10, + "fid", + 11, + "fid", + 12, + "fid", + 13, + "fid", + 14, + "fid", + 15, + "fid", + 16, + "fid", + 17, + "fid", + 18, + "fid", + 19, + "fid", + 20, + "fid", + 21, + "fid", + 22, + "fid", + 23, + "fid", + 24, + "fid", + 25, + "fid", + 26, + "fid", + 27, + "fid", + 28, + "fid", + 29, + "fid", + 30, + "fid", + 31, + "fid", + 32, + "fid", + 33, + "fid", + 34, + "fid", + 35, + "fid", + 36, + "fid", + 37, + "fid", + 38, + "fid", + 39, + "fid", + 40, + "fid", + 41, + "fid", + 42, + "fid", + 43, + "fid", + 44, + "fid", + 45, + "fid", + 46, + "fid", + 47, + "fid", + 48, + "fid", + 49, + "fid", + 50, + "fid", + 51, + "fid", + 52, + "fid", + 53, + "fid", + 54, + "fid", + 55, + "fid", + 56, + "fid", + 57, + "fid", + 58, + "fid", + 59, + "fid", + 60, + "fid", + 61, + "fid", + 62, + "fid", + 63, + "fid", + 64, + "fid", + 65, + "fid", + 66, + "fid", + 67, + "fid", + 68, + "fid", + 69, + "fid", + 70, + "fid", + 71, + "fid", + 72, + "fid", + 73, + "fid", + 74, + "fid", + 75, + "fid", + 76, + "fid", + 77, + "fid", + 78, + "fid", + 79, + "fid", + 80, + "fid", + 81, + "fid", + 82, + "fid", + 83, + "fid", + 84, + "fid", + 85, + "fid", + 86, + "fid", + 87, + "fid", + 88, + "fid", + 89, + "fid", + 90, + "fid", + 91, + "fid", + 92, + "fid", + 93, + "fid", + 94, + "fid", + 95, + "fid", + 96, + "fid", + 97, + "fid", + 98, + "fid", + 99 + ] +} \ No newline at end of file diff --git a/rsql/testdata/Test_makeInsertManyQuery/one.golden b/rsql/testdata/Test_makeInsertManyQuery/one.golden new file mode 100644 index 0000000..7dbe01e --- /dev/null +++ b/rsql/testdata/Test_makeInsertManyQuery/one.golden @@ -0,0 +1,7 @@ +{ + "Q": "insert into events (foreign_id, type, timestamp) values (?, ?, now(6))", + "Args": [ + "fid1", + 1 + ] +} \ No newline at end of file diff --git a/rsql/testdata/Test_makeInsertManyQuery/two.golden b/rsql/testdata/Test_makeInsertManyQuery/two.golden new file mode 100644 index 0000000..835d330 --- /dev/null +++ b/rsql/testdata/Test_makeInsertManyQuery/two.golden @@ -0,0 +1,9 @@ +{ + "Q": "insert into events (foreign_id, type, timestamp) values (?, ?, now(6)), (?, ?, now(6))", + "Args": [ + "fid1", + 1, + "fid2", + 2 + ] +} \ No newline at end of file diff --git a/rsql/testdata/Test_makeInsertManyQuery/with_metadata.golden b/rsql/testdata/Test_makeInsertManyQuery/with_metadata.golden new file mode 100644 index 0000000..807f902 --- /dev/null +++ b/rsql/testdata/Test_makeInsertManyQuery/with_metadata.golden @@ -0,0 +1,8 @@ +{ + "Q": "insert into events (foreign_id, type, timestamp, metadata) values (?, ?, now(6), ?)", + "Args": [ + "fid1", + 1, + "bWV0YWRhdGE=" + ] +} \ No newline at end of file diff --git a/rsql/testdata/Test_makeInsertManyQuery/with_trace.golden b/rsql/testdata/Test_makeInsertManyQuery/with_trace.golden new file mode 100644 index 0000000..2f87460 --- /dev/null +++ b/rsql/testdata/Test_makeInsertManyQuery/with_trace.golden @@ -0,0 +1,8 @@ +{ + "Q": "insert into events (foreign_id, type, timestamp, trace) values (?, ?, now(6), ?)", + "Args": [ + "fid1", + 1, + "CiAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwORIQMDAwMDAwMDAwMDAwMDAwMg==" + ] +} \ No newline at end of file