diff --git a/go.mod b/go.mod index 04ece05..c99720e 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/luno/reflex -go 1.21 +go 1.22 require ( github.com/aws/aws-sdk-go-v2/service/s3 v1.35.0 @@ -8,6 +8,7 @@ require ( github.com/luno/jettison v0.0.0-20230912135954-09d6084f5df9 github.com/prometheus/client_golang v1.15.0 github.com/prometheus/client_model v0.3.0 + github.com/sebdah/goldie/v2 v2.5.3 github.com/stretchr/testify v1.8.3 go.opentelemetry.io/otel v1.14.0 go.opentelemetry.io/otel/sdk v1.14.0 @@ -52,6 +53,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/common v0.42.0 // indirect github.com/prometheus/procfs v0.9.0 // indirect + github.com/sergi/go-diff v1.2.0 // indirect github.com/stretchr/objx v0.5.0 // indirect go.opencensus.io v0.24.0 // indirect golang.org/x/net v0.11.0 // indirect diff --git a/go.sum b/go.sum index ee708c4..9ef4f3c 100644 --- a/go.sum +++ b/go.sum @@ -1962,6 +1962,7 @@ github.com/sebdah/goldie/v2 v2.5.3 h1:9ES/mNN+HNUbNWpVAlrzuZ7jE+Nrczbj8uFRjM7624 github.com/sebdah/goldie/v2 v2.5.3/go.mod h1:oZ9fp0+se1eapSRjfYbsV/0Hqhbuu3bJVvKI/NNtssI= github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo= github.com/seccomp/libseccomp-golang v0.9.2-0.20210429002308-3879420cc921/go.mod h1:JA8cRccbGaA1s33RQf7Y1+q9gHmZX1yB/z9WDN1C6fg= +github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ= github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/shoenig/test v0.6.3/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= diff --git a/rsql/db.go b/rsql/db.go index 356c0b2..dd83071 100644 --- a/rsql/db.go +++ b/rsql/db.go @@ -45,34 +45,82 @@ func (t eventType) ReflexType() int { // makeDefaultInserter returns the default sql inserter configured via WithEventsXField options. func makeDefaultInserter(schema eTableSchema) inserter { - return func(ctx context.Context, tx *sql.Tx, - foreignID string, typ reflex.EventType, metadata []byte, + ins := makeDefaultManyInserter(schema) + return func( + ctx context.Context, + tx *sql.Tx, + foreignID string, + typ reflex.EventType, + metadata []byte, ) error { - q := "insert into " + schema.name + - " set " + schema.foreignIDField + "=?, " + schema.timeField + "=now(6), " + schema.typeField + "=?" - args := []interface{}{foreignID, typ.ReflexType()} + return ins(ctx, tx, EventToInsert{ + ForeignID: foreignID, + Type: typ, + Metadata: metadata, + }) + } +} - if schema.metadataField != "" { - q += ", " + schema.metadataField + "=?" - args = append(args, metadata) - } else if metadata != nil { - return errors.New("metadata not enabled") +// makeDefaultManyInserter returns the default sql manyInserter configured via WithEventsXField options. +func makeDefaultManyInserter(schema eTableSchema) manyInserter { + return func(ctx context.Context, tx *sql.Tx, events ...EventToInsert) error { + if len(events) == 0 { + return nil + } + q, args, err := makeInsertManyQuery(ctx, schema, events) + if err != nil { + return err + } + _, err = tx.ExecContext(ctx, q, args...) + return errors.Wrap(err, "insert error") + } +} + +func makeInsertManyQuery( + ctx context.Context, + schema eTableSchema, + events []EventToInsert, +) (query string, args []any, err error) { + spanCtx, hasTrace := tracing.Extract(ctx) + var traceData []byte + if schema.traceField != "" && hasTrace { + d, err := tracing.Marshal(spanCtx) + if err != nil { + return "", nil, err } + traceData = d + } - spanCtx, hasTrace := tracing.Extract(ctx) - if schema.traceField != "" && hasTrace { - traceData, err := tracing.Marshal(spanCtx) - if err != nil { - return err - } + cols := schema.foreignIDField + ", " + schema.typeField + ", " + schema.timeField + if schema.metadataField != "" { + cols += ", " + schema.metadataField + } + if traceData != nil { + cols += ", " + schema.traceField + } - q += ", " + schema.traceField + "=?" + q := "insert into " + schema.name + " (" + cols + ") values" + + for i, e := range events { + vals := "?, ?, now(6)" + args = append(args, e.ForeignID, e.Type.ReflexType()) + if schema.metadataField != "" { + vals += ", ?" + args = append(args, e.Metadata) + } else if e.Metadata != nil { + return "", nil, errors.New("metadata not enabled") + } + if traceData != nil { + vals += ", ?" args = append(args, traceData) } - - _, err := tx.ExecContext(ctx, q, args...) - return errors.Wrap(err, "insert error") + if i > 0 { + q += "," + } + q += " (" + vals + ")" } + + return q, args, nil } type row interface { @@ -158,7 +206,14 @@ func getNextEvents(ctx context.Context, dbc *sql.DB, schema eTableSchema, } // GetNextEventsForTesting fetches a bunch of events from the event table -func GetNextEventsForTesting(ctx context.Context, _ *testing.T, dbc *sql.DB, table *EventsTable, after int64, lag time.Duration) ([]*reflex.Event, error) { +func GetNextEventsForTesting( + ctx context.Context, + _ *testing.T, + dbc *sql.DB, + table *EventsTable, + after int64, + lag time.Duration, +) ([]*reflex.Event, error) { return getNextEvents(ctx, dbc, table.schema, after, lag) } @@ -287,7 +342,16 @@ func makeDefaultErrorInserter(schema errTableSchema) ErrorInserter { // NB: See the documentation is the following link on the behaviour of "on last_insert_id()" https://dev.mysql.com/doc/refman/5.7/en/information-functions.html#function_last-insert-id q := fmt.Sprintf( "insert into %s set %s=?, %s=?, %s=?, %s=now(6), %s=now(6), %s=? on duplicate key update %s=last_insert_id(%s)", - schema.name, schema.eventConsumerField, schema.eventIDField, schema.errorMsgField, schema.errorCreatedAtField, schema.errorUpdatedAtField, schema.errorStatusField, schema.idField, schema.idField) + schema.name, + schema.eventConsumerField, + schema.eventIDField, + schema.errorMsgField, + schema.errorCreatedAtField, + schema.errorUpdatedAtField, + schema.errorStatusField, + schema.idField, + schema.idField, + ) return func(ctx context.Context, tx *sql.Tx, consumer string, eventID string, errMsg string, errStatus reflex.ErrorStatus) (string, error) { r, err := tx.ExecContext(ctx, q, consumer, eventID, errMsg, errStatus) // If the error has already been written then we can ignore the error diff --git a/rsql/db_internal_test.go b/rsql/db_internal_test.go new file mode 100644 index 0000000..0e46067 --- /dev/null +++ b/rsql/db_internal_test.go @@ -0,0 +1,117 @@ +package rsql + +import ( + "bytes" + "context" + "fmt" + "testing" + + "github.com/luno/jettison/jtest" + "github.com/sebdah/goldie/v2" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace" + + "github.com/luno/reflex/internal/tracing" +) + +//go:generate go test . -run Test_makeInsertManyQuery -update -clean + +func Test_makeInsertManyQuery(t *testing.T) { + ctx := context.Background() + + defaultSchema := eTableSchema{ + name: "events", + idField: "id", + timeField: "timestamp", + typeField: "type", + foreignIDField: "foreign_id", + } + + assert := func(t *testing.T, q string, args []any) { + buf := new(bytes.Buffer) + buf.WriteString(q) + buf.WriteString("\n") + for _, arg := range args { + buf.WriteString("\n") + buf.WriteString(fmt.Sprint(arg)) + } + goldie.New(t).Assert(t, t.Name(), buf.Bytes()) + } + + t.Run("empty", func(t *testing.T) { + q, args, err := makeInsertManyQuery(ctx, defaultSchema, nil) + jtest.RequireNil(t, err) + assert(t, q, args) + }) + + t.Run("one", func(t *testing.T) { + q, args, err := makeInsertManyQuery(ctx, defaultSchema, []EventToInsert{ + {"fid", testEventType(1), nil}, + }) + jtest.RequireNil(t, err) + assert(t, q, args) + }) + + t.Run("two", func(t *testing.T) { + q, args, err := makeInsertManyQuery(ctx, defaultSchema, []EventToInsert{ + {"fid1", testEventType(1), nil}, + {"fid2", testEventType(2), nil}, + }) + jtest.RequireNil(t, err) + assert(t, q, args) + }) + + t.Run("more", func(t *testing.T) { + var events []EventToInsert + for i := range 5 { + events = append(events, EventToInsert{ + ForeignID: fmt.Sprintf("fid%d", i+1), + Type: testEventType(i), + }) + } + q, args, err := makeInsertManyQuery(ctx, defaultSchema, events) + jtest.RequireNil(t, err) + assert(t, q, args) + }) + + t.Run("metadata_error", func(t *testing.T) { + _, _, err := makeInsertManyQuery(ctx, defaultSchema, []EventToInsert{ + {"fid", testEventType(1), []byte("metadata")}, + }) + require.ErrorContains(t, err, "metadata not enabled") + }) + + t.Run("with_metadata", func(t *testing.T) { + schemaWithMetadata := defaultSchema + schemaWithMetadata.metadataField = "metadata" + q, args, err := makeInsertManyQuery(ctx, schemaWithMetadata, []EventToInsert{ + {"fid", testEventType(1), []byte("metadata")}, + }) + jtest.RequireNil(t, err) + assert(t, q, args) + }) + + t.Run("with_trace", func(t *testing.T) { + schemaWithTrace := defaultSchema + schemaWithTrace.traceField = "trace" + traceID, err := trace.TraceIDFromHex("00000000000000000000000000000009") + jtest.RequireNil(t, err) + spanID, err := trace.SpanIDFromHex("0000000000000002") + jtest.RequireNil(t, err) + data, err := tracing.Marshal(trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: traceID, + SpanID: spanID, + })) + jtest.RequireNil(t, err) + ctx := tracing.Inject(ctx, data) + q, args, err := makeInsertManyQuery(ctx, schemaWithTrace, []EventToInsert{ + {"fid", testEventType(1), nil}, + }) + jtest.RequireNil(t, err) + assert(t, q, args) + }) +} + +type testEventType int + +func (t testEventType) ReflexType() int { return int(t) } diff --git a/rsql/eventstable.go b/rsql/eventstable.go index 3cfeeed..8fa8b25 100644 --- a/rsql/eventstable.go +++ b/rsql/eventstable.go @@ -44,10 +44,20 @@ func NewEventsTable(name string, opts ...EventsOption) *EventsTable { table.inserter = makeDefaultInserter(table.schema) } + if table.manyInserter == nil { + table.manyInserter = makeDefaultManyInserter(table.schema) + } + table.gapCh = make(chan Gap) table.gapListeners = make(chan GapListenFunc) table.gapListenDone = make(chan struct{}) - table.currentLoader = buildLoader(table.baseLoader, table.gapCh, table.disableCache, table.schema, table.includeNoopEvents) + table.currentLoader = buildLoader( + table.baseLoader, + table.gapCh, + table.disableCache, + table.schema, + table.includeNoopEvents, + ) return table } @@ -175,10 +185,28 @@ func WithEventsInserter(inserter inserter) EventsOption { } } +// WithEventsManyInserter provides an option to set the event inserter +// which inserts many events into a sql table. The default inserter is +// configured with the WithEventsXField options. +func WithEventsManyInserter(manyInserter manyInserter) EventsOption { + return func(table *EventsTable) { + table.manyInserter = manyInserter + } +} + // inserter abstracts the insertion of an event into a sql table. type inserter func(ctx context.Context, tx *sql.Tx, foreignID string, typ reflex.EventType, metadata []byte) error +type EventToInsert struct { + ForeignID string + Type reflex.EventType + Metadata []byte +} + +// manyInserter abstracts the insertion of many events into a sql table. +type manyInserter func(ctx context.Context, tx *sql.Tx, events ...EventToInsert) error + // EventsTable provides reflex event insertion and streaming // for a sql db table. type EventsTable struct { @@ -189,6 +217,7 @@ type EventsTable struct { includeNoopEvents bool baseLoader loader inserter inserter + manyInserter manyInserter // Stateful fields not cloned currentLoader filterLoader @@ -230,6 +259,25 @@ func (t *EventsTable) InsertWithMetadata(ctx context.Context, tx *sql.Tx, foreig return t.notifier.Notify, nil } +// InsertMany inserts a many events into the EventsTable. +func (t *EventsTable) InsertMany( + ctx context.Context, + tx *sql.Tx, + events []EventToInsert, +) (NotifyFunc, error) { + for _, e := range events { + if isNoop(e.ForeignID, e.Type) { + return nil, errors.New("inserting invalid noop event") + } + } + err := t.manyInserter(ctx, tx, events...) + if err != nil { + return noopFunc, err + } + + return t.notifier.Notify, nil +} + // Clone returns a new events table generated from the config of t with the new options applied. // Note that non-config fields are not copied, so things like the cache and inmemnotifier // are not shared. @@ -312,7 +360,13 @@ func (t *EventsTable) getSchema() eTableSchema { } // buildLoader returns a new layered event loader. -func buildLoader(baseLoader loader, ch chan<- Gap, disableCache bool, schema eTableSchema, withNoopEvents bool) filterLoader { +func buildLoader( + baseLoader loader, + ch chan<- Gap, + disableCache bool, + schema eTableSchema, + withNoopEvents bool, +) filterLoader { if baseLoader == nil { baseLoader = makeBaseLoader(schema) } diff --git a/rsql/eventstable_test.go b/rsql/eventstable_test.go index 19071a6..7fa25ff 100644 --- a/rsql/eventstable_test.go +++ b/rsql/eventstable_test.go @@ -542,3 +542,25 @@ func TestCloneInserter(t *testing.T) { require.Equal(t, 2, i1) require.Equal(t, 1, i2) } + +func TestInsertMany(t *testing.T) { + ctx := context.Background() + dbc := ConnectTestDB(t, DefaultEventTable(), DefaultCursorTable(), DefaultErrorTable(), DefaultErrorEventTable()) + + table1 := rsql.NewEventsTable(eventsTable) + + tx, _ := dbc.Begin() + _, err := table1.InsertMany(ctx, tx, []rsql.EventToInsert{ + {"fid1", testEventType(1), nil}, + {"fid2", testEventType(2), nil}, + {"fid3", testEventType(3), nil}, + }) + jtest.RequireNil(t, err) + err = tx.Commit() + jtest.RequireNil(t, err) + el, err := rsql.GetNextEventsForTesting(context.Background(), t, dbc, table1, 0, 0) + jtest.RequireNil(t, err) + assert.Equal(t, "fid1", el[0].ForeignID) + assert.Equal(t, "fid2", el[1].ForeignID) + assert.Equal(t, "fid3", el[2].ForeignID) +} diff --git a/rsql/testdata/Test_makeInsertManyQuery/empty.golden b/rsql/testdata/Test_makeInsertManyQuery/empty.golden new file mode 100644 index 0000000..fce1b60 --- /dev/null +++ b/rsql/testdata/Test_makeInsertManyQuery/empty.golden @@ -0,0 +1 @@ +insert into events (foreign_id, type, timestamp) values diff --git a/rsql/testdata/Test_makeInsertManyQuery/more.golden b/rsql/testdata/Test_makeInsertManyQuery/more.golden new file mode 100644 index 0000000..fef9a7d --- /dev/null +++ b/rsql/testdata/Test_makeInsertManyQuery/more.golden @@ -0,0 +1,12 @@ +insert into events (foreign_id, type, timestamp) values (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)), (?, ?, now(6)) + +fid1 +0 +fid2 +1 +fid3 +2 +fid4 +3 +fid5 +4 \ No newline at end of file diff --git a/rsql/testdata/Test_makeInsertManyQuery/one.golden b/rsql/testdata/Test_makeInsertManyQuery/one.golden new file mode 100644 index 0000000..73f087c --- /dev/null +++ b/rsql/testdata/Test_makeInsertManyQuery/one.golden @@ -0,0 +1,4 @@ +insert into events (foreign_id, type, timestamp) values (?, ?, now(6)) + +fid +1 \ No newline at end of file diff --git a/rsql/testdata/Test_makeInsertManyQuery/two.golden b/rsql/testdata/Test_makeInsertManyQuery/two.golden new file mode 100644 index 0000000..0b25670 --- /dev/null +++ b/rsql/testdata/Test_makeInsertManyQuery/two.golden @@ -0,0 +1,6 @@ +insert into events (foreign_id, type, timestamp) values (?, ?, now(6)), (?, ?, now(6)) + +fid1 +1 +fid2 +2 \ No newline at end of file diff --git a/rsql/testdata/Test_makeInsertManyQuery/with_metadata.golden b/rsql/testdata/Test_makeInsertManyQuery/with_metadata.golden new file mode 100644 index 0000000..3efbdf1 --- /dev/null +++ b/rsql/testdata/Test_makeInsertManyQuery/with_metadata.golden @@ -0,0 +1,5 @@ +insert into events (foreign_id, type, timestamp, metadata) values (?, ?, now(6), ?) + +fid +1 +[109 101 116 97 100 97 116 97] \ No newline at end of file diff --git a/rsql/testdata/Test_makeInsertManyQuery/with_trace.golden b/rsql/testdata/Test_makeInsertManyQuery/with_trace.golden new file mode 100644 index 0000000..54ac453 --- /dev/null +++ b/rsql/testdata/Test_makeInsertManyQuery/with_trace.golden @@ -0,0 +1,5 @@ +insert into events (foreign_id, type, timestamp, trace) values (?, ?, now(6), ?) + +fid +1 +[10 32 48 48 48 48 48 48 48 48 48 48 48 48 48 48 48 48 48 48 48 48 48 48 48 48 48 48 48 48 48 48 48 57 18 16 48 48 48 48 48 48 48 48 48 48 48 48 48 48 48 50] \ No newline at end of file