diff --git a/models/event/event.go b/models/event/event.go index 72e042ba1..461bb106c 100644 --- a/models/event/event.go +++ b/models/event/event.go @@ -8,6 +8,8 @@ import ( "strings" "time" + "github.com/artie-labs/transfer/lib/telemetry/metrics/base" + "github.com/artie-labs/transfer/lib/artie" "github.com/artie-labs/transfer/lib/cdc" "github.com/artie-labs/transfer/lib/config" @@ -87,7 +89,7 @@ func ToMemoryEvent(event cdc.Event, pkMap map[string]any, tc kafkalib.TopicConfi return Event{}, err } - return Event{ + _event := Event{ executionTime: event.GetExecutionTime(), mode: cfgMode, Table: tblName, @@ -96,11 +98,21 @@ func ToMemoryEvent(event cdc.Event, pkMap map[string]any, tc kafkalib.TopicConfi Columns: cols, Data: hashData(evtData, tc), Deleted: event.DeletePayload(), - }, nil + } + + return _event, nil } -func (e *Event) GetExecutionTime() time.Time { - return e.executionTime +// EmitExecutionTimeLag - This will check against the current time and the event execution time and emit the lag. +func (e *Event) EmitExecutionTimeLag(metricsClient base.Client, mode config.Mode) { + metricsClient.GaugeWithSample( + "row.execution_time_lag", + float64(time.Since(e.executionTime).Milliseconds()), + map[string]string{ + "mode": mode.String(), + "table": e.Table, + }, + 0.5) } func (e *Event) Validate() error { diff --git a/processes/consumer/process.go b/processes/consumer/process.go index 2df7cabbf..60636e930 100644 --- a/processes/consumer/process.go +++ b/processes/consumer/process.go @@ -63,6 +63,10 @@ func (p processArgs) process(ctx context.Context, cfg config.Config, inMemDB *mo tags["what"] = "to_mem_event_err" return "", fmt.Errorf("cannot convert to memory event: %w", err) } + + // Emit event execution time. + evt.EmitExecutionTimeLag(metricsClient, cfg.Mode) + // Table name is only available after event has been cast tags["table"] = evt.Table if topicConfig.tc.ShouldSkip(_event.Operation()) {