rsql: Add manyInserter and InsertMany API #25

Open, wants to merge 3 commits into base: main

NeilLuno
Contributor

This is an optimisation to support inserting multiple events in the same transaction.

Note that because each tuple of values has its own now(6), the timestamp of the events may be different.

Fixes #24.

@@ -1,13 +1,14 @@
module github.com/luno/reflex

go 1.21
Contributor Author

To support for i := range N (ranging over an integer requires Go 1.22 or later).

Contributor

Any reason why we can't go for 1.23? If people are still using 1.22, they'd just have to use the version before this update.

Contributor Author

No reason, just that I didn't need it.

Contributor

jrkilloran left a comment

What about making the builder logic explicit upfront:

// makeDefaultInserter returns the default sql inserter configured via WithEventsXField options.
func makeDefaultInserter(schema eTableSchema) inserter {
	dmi := makeDefaultManyInserter(schema)
	return func(ctx context.Context, tx *sql.Tx,
		foreignID string, typ reflex.EventType, metadata []byte,
	) error {
		return dmi(ctx, tx, EventToInsert{ForeignID: foreignID, Type: typ, Metadata: metadata})
	}
}

// makeDefaultManyInserter returns the default sql manyInserter configured via WithEventsXField options.
func makeDefaultManyInserter(schema eTableSchema) manyInserter {
	return func(ctx context.Context, tx *sql.Tx, events ...EventToInsert) error {
		if len(events) == 0 {
			return nil
		}
		q, args, err := makeInsertManyQuery(ctx, schema, events)
		if err != nil {
			return err
		}
		_, err = tx.ExecContext(ctx, q, args...)
		return errors.Wrap(err, "insert error")
	}
}

func makeInsertManyQuery(
	ctx context.Context,
	schema eTableSchema,
	events []EventToInsert,
) (query string, args []any, err error) {
	spanCtx, hasTrace := tracing.Extract(ctx)
	var traceData []byte
	if schema.traceField != "" && hasTrace {
		d, err := tracing.Marshal(spanCtx)
		if err != nil {
			return "", nil, err
		}
		traceData = d
	}

	var cols []string
	var values string
	// Pick the builder that matches the optional columns actually in use.
	if schema.metadataField != "" {
		if traceData != nil {
			cols, values, args = makeInsertManyFull(schema, events, traceData)
		} else {
			cols, values, args = makeInsertManyMetadata(schema, events)
		}
	} else if traceData != nil {
		cols, values, args = makeInsertManyTrace(schema, events, traceData)
	} else {
		cols, values, args = makeInsertManyBase(schema, events)
	}

	q := "insert into " + schema.name + " (" + strings.Join(cols, ", ") + ") values" + values
	return q, args, nil
}

func baseSchemaCols(schema eTableSchema) []string {
	return []string{schema.foreignIDField, schema.typeField, schema.timeField}
}

func metadataSchemaCols(schema eTableSchema) []string {
	return append(baseSchemaCols(schema), schema.metadataField)
}

func traceSchemaCols(schema eTableSchema) []string {
	return append(baseSchemaCols(schema), schema.traceField)
}

func fullSchemaCols(schema eTableSchema) []string {
	return append(metadataSchemaCols(schema), schema.traceField)
}

func makeInsertManyBase(
	schema eTableSchema,
	events []EventToInsert,
) (cols []string, values string, args []any) {
	cols = baseSchemaCols(schema)
	values = " (?, ?, now(6))"
	for _, e := range events {
		args = append(args, e.ForeignID, e.Type.ReflexType())
	}
	return cols, values, args
}

func makeInsertManyMetadata(
	schema eTableSchema,
	events []EventToInsert,
) (cols []string, values string, args []any) {
	cols = metadataSchemaCols(schema)
	values = " (?, ?, now(6), ?)"
	for _, e := range events {
		args = append(args, e.ForeignID, e.Type.ReflexType(), e.Metadata)
	}
	return cols, values, args
}

func makeInsertManyTrace(
	schema eTableSchema,
	events []EventToInsert,
	tracedata []byte,
) (cols []string, values string, args []any) {
	cols = traceSchemaCols(schema)
	values = " (?, ?, now(6), ?)"
	for _, e := range events {
		args = append(args, e.ForeignID, e.Type.ReflexType(), tracedata)
	}
	return cols, values, args
}

func makeInsertManyFull(
	schema eTableSchema,
	events []EventToInsert,
	tracedata []byte,
) (cols []string, values string, args []any) {
	cols = fullSchemaCols(schema)
	values = " (?, ?, now(6), ?, ?)"
	for _, e := range events {
		args = append(args, e.ForeignID, e.Type.ReflexType(), e.Metadata, tracedata)
	}
	return cols, values, args
}

Contributor Author

NeilLuno commented Nov 13, 2024

Thanks for the code suggestion. I could go either way:

Pros:

  • The branches are in one place outside the loop.
  • Less string wrangling.

Cons:

  • More code.
  • What if we add a third optional column?

(BTW, the values tuple would also have to be repeated once per event.)

[edit: Looking at the code again, I wonder if we should resolve the schema even earlier, like in makeDefaultManyInserter. Scratch that: the schema depends on the context.]
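
To make the trade-off above concrete, here is a rough sketch of the alternative shape: resolve the column list once, append optional columns conditionally, and repeat the placeholder tuple once per event. This is not the code in this PR. tableSchema, event, and buildInsertMany are simplified, hypothetical stand-ins for eTableSchema, EventToInsert, and the query builder, and the event type is reduced to a plain int:

```go
package main

import (
	"fmt"
	"strings"
)

// tableSchema and event are hypothetical, simplified stand-ins for the
// eTableSchema and EventToInsert types discussed in this thread.
type tableSchema struct {
	name, foreignIDField, typeField, timeField string
	metadataField, traceField                  string // optional; "" means absent
}

type event struct {
	ForeignID string
	Type      int
	Metadata  []byte
}

// buildInsertMany resolves the columns once, then emits one placeholder
// tuple per event. A further optional column would need one more
// conditional here rather than a new set of builder functions.
func buildInsertMany(s tableSchema, events []event, trace []byte) (string, []any) {
	cols := []string{s.foreignIDField, s.typeField, s.timeField}
	marks := []string{"?", "?", "now(6)"}
	withMeta := s.metadataField != ""
	withTrace := s.traceField != "" && trace != nil
	if withMeta {
		cols, marks = append(cols, s.metadataField), append(marks, "?")
	}
	if withTrace {
		cols, marks = append(cols, s.traceField), append(marks, "?")
	}

	tuple := "(" + strings.Join(marks, ", ") + ")"
	tuples := make([]string, len(events))
	args := make([]any, 0, len(events)*len(cols))
	for i, e := range events {
		tuples[i] = tuple
		args = append(args, e.ForeignID, e.Type)
		if withMeta {
			args = append(args, e.Metadata)
		}
		if withTrace {
			args = append(args, trace)
		}
	}

	q := "insert into " + s.name + " (" + strings.Join(cols, ", ") +
		") values " + strings.Join(tuples, ", ")
	return q, args
}

func main() {
	s := tableSchema{name: "events", foreignIDField: "foreign_id",
		typeField: "type", timeField: "timestamp", metadataField: "metadata"}
	q, args := buildInsertMany(s, []event{{ForeignID: "1", Type: 1}, {ForeignID: "2", Type: 2}}, nil)
	fmt.Println(q)
	fmt.Println(len(args))
}
```

The cost of this shape is that the conditionals run inside the per-event loop, which is the "branches in one place outside the loop" point listed under Pros for the explicit builders.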

q := "insert into " + schema.name +
" set " + schema.foreignIDField + "=?, " + schema.timeField + "=now(6), " + schema.typeField + "=?"
args := []interface{}{foreignID, typ.ReflexType()}
return ins(ctx, tx, EventToInsert{
Contributor

Nice, I like that the singular inserter uses the multiple inserter with just one event.

@andrewwormald
Contributor

Nice, I really like your solution

Successfully merging this pull request may close these issues.

Add batch insert API + types
4 participants