Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rsql: Add manyInserter and InsertMany API #25

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
module github.com/luno/reflex

go 1.21
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To support for i := range N.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why we can't go for 1.23? If people are still using 1.22, they'd just have to use the version before this update

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No reason, just that I didn't need it.

go 1.22

require (
github.com/aws/aws-sdk-go-v2/service/s3 v1.35.0
github.com/go-sql-driver/mysql v1.7.1
github.com/luno/jettison v0.0.0-20230912135954-09d6084f5df9
github.com/prometheus/client_golang v1.15.0
github.com/prometheus/client_model v0.3.0
github.com/sebdah/goldie/v2 v2.5.3
github.com/stretchr/testify v1.8.3
go.opentelemetry.io/otel v1.14.0
go.opentelemetry.io/otel/sdk v1.14.0
Expand Down Expand Up @@ -52,6 +53,7 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/sergi/go-diff v1.2.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/net v0.11.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1962,6 +1962,7 @@ github.com/sebdah/goldie/v2 v2.5.3 h1:9ES/mNN+HNUbNWpVAlrzuZ7jE+Nrczbj8uFRjM7624
github.com/sebdah/goldie/v2 v2.5.3/go.mod h1:oZ9fp0+se1eapSRjfYbsV/0Hqhbuu3bJVvKI/NNtssI=
github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo=
github.com/seccomp/libseccomp-golang v0.9.2-0.20210429002308-3879420cc921/go.mod h1:JA8cRccbGaA1s33RQf7Y1+q9gHmZX1yB/z9WDN1C6fg=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ=
github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/shoenig/test v0.6.3/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k=
Expand Down
83 changes: 81 additions & 2 deletions rsql/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"fmt"
"strconv"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -75,6 +76,68 @@ func makeDefaultInserter(schema eTableSchema) inserter {
}
}

// makeDefaultManyInserter returns the default sql manyInserter configured via WithEventsXField options.
func makeDefaultManyInserter(schema eTableSchema) manyInserter {
return func(ctx context.Context, tx *sql.Tx, events ...EventToInsert) error {
if len(events) == 0 {
return nil
}
q, args, err := makeInsertManyQuery(ctx, schema, events)
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, q, args...)
return errors.Wrap(err, "insert error")
}
}

func makeInsertManyQuery(
ctx context.Context,
schema eTableSchema,
events []EventToInsert,
) (query string, args []any, err error) {
spanCtx, hasTrace := tracing.Extract(ctx)
var traceData []byte
if schema.traceField != "" && hasTrace {
d, err := tracing.Marshal(spanCtx)
if err != nil {
return "", nil, err
}
traceData = d
}

cols := []string{schema.foreignIDField, schema.typeField, schema.timeField}
if schema.metadataField != "" {
cols = append(cols, schema.metadataField)
}
if traceData != nil {
cols = append(cols, schema.traceField)
}

q := "insert into " + schema.name + " (" + strings.Join(cols, ", ") + ") values"

for i, e := range events {
vals := []string{"?", "?", "now(6)"}
args = append(args, e.ForeignID, e.Type.ReflexType())
if schema.metadataField != "" {
vals = append(vals, "?")
args = append(args, e.Metadata)
} else if e.Metadata != nil {
return "", nil, errors.New("metadata not enabled")
}
if traceData != nil {
vals = append(vals, "?")
args = append(args, traceData)
}
if i > 0 {
q += ","
}
q += " (" + strings.Join(vals, ", ") + ")"
}

return q, args, nil
}

type row interface {
Scan(dest ...interface{}) error
}
Expand Down Expand Up @@ -158,7 +221,14 @@ func getNextEvents(ctx context.Context, dbc *sql.DB, schema eTableSchema,
}

// GetNextEventsForTesting fetches a bunch of events from the event table
func GetNextEventsForTesting(ctx context.Context, _ *testing.T, dbc *sql.DB, table *EventsTable, after int64, lag time.Duration) ([]*reflex.Event, error) {
func GetNextEventsForTesting(
ctx context.Context,
_ *testing.T,
dbc *sql.DB,
table *EventsTable,
after int64,
lag time.Duration,
) ([]*reflex.Event, error) {
return getNextEvents(ctx, dbc, table.schema, after, lag)
}

Expand Down Expand Up @@ -287,7 +357,16 @@ func makeDefaultErrorInserter(schema errTableSchema) ErrorInserter {
// NB: See the documentation is the following link on the behaviour of "on last_insert_id(<expr>)" https://dev.mysql.com/doc/refman/5.7/en/information-functions.html#function_last-insert-id
q := fmt.Sprintf(
"insert into %s set %s=?, %s=?, %s=?, %s=now(6), %s=now(6), %s=? on duplicate key update %s=last_insert_id(%s)",
schema.name, schema.eventConsumerField, schema.eventIDField, schema.errorMsgField, schema.errorCreatedAtField, schema.errorUpdatedAtField, schema.errorStatusField, schema.idField, schema.idField)
schema.name,
schema.eventConsumerField,
schema.eventIDField,
schema.errorMsgField,
schema.errorCreatedAtField,
schema.errorUpdatedAtField,
schema.errorStatusField,
schema.idField,
schema.idField,
)
return func(ctx context.Context, tx *sql.Tx, consumer string, eventID string, errMsg string, errStatus reflex.ErrorStatus) (string, error) {
r, err := tx.ExecContext(ctx, q, consumer, eventID, errMsg, errStatus)
// If the error has already been written then we can ignore the error
Expand Down
106 changes: 106 additions & 0 deletions rsql/db_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package rsql

import (
"context"
"testing"

"github.com/luno/jettison/jtest"
"github.com/sebdah/goldie/v2"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace"

"github.com/luno/reflex/internal/tracing"
)

//go:generate go test . -run Test_makeInsertManyQuery -update -clean

func Test_makeInsertManyQuery(t *testing.T) {
ctx := context.Background()

type res struct {
Q string
Args []any
}

defaultSchema := eTableSchema{
name: "events",
idField: "id",
timeField: "timestamp",
typeField: "type",
foreignIDField: "foreign_id",
}

t.Run("empty", func(t *testing.T) {
q, args, err := makeInsertManyQuery(ctx, defaultSchema, nil)
jtest.RequireNil(t, err)
goldie.New(t).AssertJson(t, t.Name(), res{q, args})
})

t.Run("one", func(t *testing.T) {
q, args, err := makeInsertManyQuery(ctx, defaultSchema, []EventToInsert{
{"fid1", testEventType(1), nil},
})
jtest.RequireNil(t, err)
goldie.New(t).AssertJson(t, t.Name(), res{q, args})
})

t.Run("two", func(t *testing.T) {
q, args, err := makeInsertManyQuery(ctx, defaultSchema, []EventToInsert{
{"fid1", testEventType(1), nil},
{"fid2", testEventType(2), nil},
})
jtest.RequireNil(t, err)
goldie.New(t).AssertJson(t, t.Name(), res{q, args})
})

t.Run("more", func(t *testing.T) {
var events []EventToInsert
for i := range 100 {
events = append(events, EventToInsert{"fid", testEventType(i), nil})
}
q, args, err := makeInsertManyQuery(ctx, defaultSchema, events)
jtest.RequireNil(t, err)
goldie.New(t).AssertJson(t, t.Name(), res{q, args})
})

t.Run("metadata_error", func(t *testing.T) {
_, _, err := makeInsertManyQuery(ctx, defaultSchema, []EventToInsert{
{"fid1", testEventType(1), []byte("metadata")},
})
require.ErrorContains(t, err, "metadata not enable")
})

t.Run("with_metadata", func(t *testing.T) {
schemaWithMetadata := defaultSchema
schemaWithMetadata.metadataField = "metadata"
q, args, err := makeInsertManyQuery(ctx, schemaWithMetadata, []EventToInsert{
{"fid1", testEventType(1), []byte("metadata")},
})
jtest.RequireNil(t, err)
goldie.New(t).AssertJson(t, t.Name(), res{q, args})
})

t.Run("with_trace", func(t *testing.T) {
schemaWithTrace := defaultSchema
schemaWithTrace.traceField = "trace"
traceID, err := trace.TraceIDFromHex("00000000000000000000000000000009")
jtest.RequireNil(t, err)
spanID, err := trace.SpanIDFromHex("0000000000000002")
jtest.RequireNil(t, err)
data, err := tracing.Marshal(trace.NewSpanContext(trace.SpanContextConfig{
TraceID: traceID,
SpanID: spanID,
}))
jtest.RequireNil(t, err)
ctx := tracing.Inject(ctx, data)
q, args, err := makeInsertManyQuery(ctx, schemaWithTrace, []EventToInsert{
{"fid1", testEventType(1), nil},
})
jtest.RequireNil(t, err)
goldie.New(t).AssertJson(t, t.Name(), res{q, args})
})
}

type testEventType int

func (t testEventType) ReflexType() int { return int(t) }
58 changes: 56 additions & 2 deletions rsql/eventstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,20 @@ func NewEventsTable(name string, opts ...EventsOption) *EventsTable {
table.inserter = makeDefaultInserter(table.schema)
}

if table.manyInserter == nil {
table.manyInserter = makeDefaultManyInserter(table.schema)
}

table.gapCh = make(chan Gap)
table.gapListeners = make(chan GapListenFunc)
table.gapListenDone = make(chan struct{})
table.currentLoader = buildLoader(table.baseLoader, table.gapCh, table.disableCache, table.schema, table.includeNoopEvents)
table.currentLoader = buildLoader(
table.baseLoader,
table.gapCh,
table.disableCache,
table.schema,
table.includeNoopEvents,
)

return table
}
Expand Down Expand Up @@ -175,10 +185,28 @@ func WithEventsInserter(inserter inserter) EventsOption {
}
}

// WithEventsManyInserter provides an option to set the event inserter
// which inserts many events into a sql table. The default inserter is
// configured with the WithEventsXField options.
func WithEventsManyInserter(manyInserter manyInserter) EventsOption {
return func(table *EventsTable) {
table.manyInserter = manyInserter
}
}

// inserter abstracts the insertion of an event into a sql table.
type inserter func(ctx context.Context, tx *sql.Tx,
foreignID string, typ reflex.EventType, metadata []byte) error

type EventToInsert struct {
ForeignID string
Type reflex.EventType
Metadata []byte
}

// manyInserter abstracts the insertion of many events into a sql table.
type manyInserter func(ctx context.Context, tx *sql.Tx, events ...EventToInsert) error

// EventsTable provides reflex event insertion and streaming
// for a sql db table.
type EventsTable struct {
Expand All @@ -189,6 +217,7 @@ type EventsTable struct {
includeNoopEvents bool
baseLoader loader
inserter inserter
manyInserter manyInserter

// Stateful fields not cloned
currentLoader filterLoader
Expand Down Expand Up @@ -230,6 +259,25 @@ func (t *EventsTable) InsertWithMetadata(ctx context.Context, tx *sql.Tx, foreig
return t.notifier.Notify, nil
}

// InsertMany inserts a many events into the EventsTable.
func (t *EventsTable) InsertMany(
ctx context.Context,
tx *sql.Tx,
events []EventToInsert,
) (NotifyFunc, error) {
for _, e := range events {
if isNoop(e.ForeignID, e.Type) {
return nil, errors.New("inserting invalid noop event")
}
}
err := t.manyInserter(ctx, tx, events...)
if err != nil {
return noopFunc, err
}

return t.notifier.Notify, nil
}

// Clone returns a new events table generated from the config of t with the new options applied.
// Note that non-config fields are not copied, so things like the cache and inmemnotifier
// are not shared.
Expand Down Expand Up @@ -312,7 +360,13 @@ func (t *EventsTable) getSchema() eTableSchema {
}

// buildLoader returns a new layered event loader.
func buildLoader(baseLoader loader, ch chan<- Gap, disableCache bool, schema eTableSchema, withNoopEvents bool) filterLoader {
func buildLoader(
baseLoader loader,
ch chan<- Gap,
disableCache bool,
schema eTableSchema,
withNoopEvents bool,
) filterLoader {
if baseLoader == nil {
baseLoader = makeBaseLoader(schema)
}
Expand Down
22 changes: 22 additions & 0 deletions rsql/eventstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,3 +542,25 @@ func TestCloneInserter(t *testing.T) {
require.Equal(t, 2, i1)
require.Equal(t, 1, i2)
}

func TestInsertMany(t *testing.T) {
ctx := context.Background()
dbc := ConnectTestDB(t, DefaultEventTable(), DefaultCursorTable(), DefaultErrorTable(), DefaultErrorEventTable())

table1 := rsql.NewEventsTable(eventsTable)

tx, _ := dbc.Begin()
_, err := table1.InsertMany(ctx, tx, []rsql.EventToInsert{
{"fid1", testEventType(1), nil},
{"fid2", testEventType(2), nil},
{"fid3", testEventType(3), nil},
})
jtest.RequireNil(t, err)
err = tx.Commit()
jtest.RequireNil(t, err)
el, err := rsql.GetNextEventsForTesting(context.Background(), t, dbc, table1, 0, 0)
jtest.RequireNil(t, err)
assert.Equal(t, "fid1", el[0].ForeignID)
assert.Equal(t, "fid2", el[1].ForeignID)
assert.Equal(t, "fid3", el[2].ForeignID)
}
4 changes: 4 additions & 0 deletions rsql/testdata/Test_makeInsertManyQuery/empty.golden
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"Q": "insert into events (foreign_id, type, timestamp) values",
"Args": null
}
Loading