Skip to content

Commit

Permalink
Emit when the CDC event was executed (#930)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Sep 26, 2024
1 parent 72a4b3b commit 7589af6
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 12 deletions.
13 changes: 7 additions & 6 deletions lib/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ type SharedDestinationSettings struct {
TruncateExceededValues bool `yaml:"truncateExceededValues"`
}

type Reporting struct {
Sentry *Sentry `yaml:"sentry"`
EmitExecutionTime bool `yaml:"emitExecutionTime"`
}

type Config struct {
Mode Mode `yaml:"mode"`
Output constants.DestinationKind `yaml:"outputSource"`
Expand All @@ -63,12 +68,8 @@ type Config struct {
S3 *S3Settings `yaml:"s3,omitempty"`

SharedDestinationSettings SharedDestinationSettings `yaml:"sharedDestinationSettings"`

Reporting struct {
Sentry *Sentry `yaml:"sentry"`
}

Telemetry struct {
Reporting Reporting `yaml:"reporting"`
Telemetry struct {
Metrics struct {
Provider constants.ExporterKind `yaml:"provider"`
Settings map[string]any `yaml:"settings,omitempty"`
Expand Down
27 changes: 21 additions & 6 deletions models/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/stringutil"
"github.com/artie-labs/transfer/lib/telemetry/metrics/base"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/artie-labs/transfer/models"
Expand All @@ -28,10 +29,11 @@ type Event struct {

OptionalSchema map[string]typing.KindDetails
Columns *columns.Columns
ExecutionTime time.Time // When the SQL command was executed
Deleted bool

mode config.Mode
// When the database event was executed
executionTime time.Time
mode config.Mode
}

func hashData(data map[string]any, tc kafkalib.TopicConfig) map[string]any {
Expand Down Expand Up @@ -86,16 +88,29 @@ func ToMemoryEvent(event cdc.Event, pkMap map[string]any, tc kafkalib.TopicConfi
return Event{}, err
}

return Event{
_event := Event{
executionTime: event.GetExecutionTime(),
mode: cfgMode,
Table: tblName,
PrimaryKeyMap: pkMap,
ExecutionTime: event.GetExecutionTime(),
OptionalSchema: optionalSchema,
Columns: cols,
Data: hashData(evtData, tc),
Deleted: event.DeletePayload(),
}, nil
}

return _event, nil
}

// EmitExecutionTimeLag - This will check against the current time and the event execution time and emit the lag.
func (e *Event) EmitExecutionTimeLag(metricsClient base.Client) {
metricsClient.GaugeWithSample(
"row.execution_time_lag",
float64(time.Since(e.executionTime).Milliseconds()),
map[string]string{
"mode": e.mode.String(),
"table": e.Table,
}, 0.5)
}

func (e *Event) Validate() error {
Expand Down Expand Up @@ -247,7 +262,7 @@ func (e *Event) Save(cfg config.Config, inMemDB *models.DatabaseData, tc kafkali
td.PartitionsToLastMessage[message.Partition()] = append(td.PartitionsToLastMessage[message.Partition()], message)
}

td.LatestCDCTs = e.ExecutionTime
td.LatestCDCTs = e.executionTime
flush, flushReason := td.ShouldFlush(cfg)
return flush, flushReason, nil
}
5 changes: 5 additions & 0 deletions processes/consumer/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func (p processArgs) process(ctx context.Context, cfg config.Config, inMemDB *mo
tags["what"] = "to_mem_event_err"
return "", fmt.Errorf("cannot convert to memory event: %w", err)
}

// Table name is only available after event has been cast
tags["table"] = evt.Table
if topicConfig.tc.ShouldSkip(_event.Operation()) {
Expand All @@ -72,6 +73,10 @@ func (p processArgs) process(ctx context.Context, cfg config.Config, inMemDB *mo
return evt.Table, nil
}

if cfg.Reporting.EmitExecutionTime {
evt.EmitExecutionTimeLag(metricsClient)
}

shouldFlush, flushReason, err := evt.Save(cfg, inMemDB, topicConfig.tc, p.Msg)
if err != nil {
tags["what"] = "save_fail"
Expand Down

0 comments on commit 7589af6

Please sign in to comment.