Skip to content

Commit

Permalink
Clean up.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Sep 26, 2024
1 parent 259f7cb commit d5c2c88
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 4 deletions.
20 changes: 16 additions & 4 deletions models/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"strings"
"time"

"github.com/artie-labs/transfer/lib/telemetry/metrics/base"

"github.com/artie-labs/transfer/lib/artie"
"github.com/artie-labs/transfer/lib/cdc"
"github.com/artie-labs/transfer/lib/config"
Expand Down Expand Up @@ -87,7 +89,7 @@ func ToMemoryEvent(event cdc.Event, pkMap map[string]any, tc kafkalib.TopicConfi
return Event{}, err
}

return Event{
_event := Event{
executionTime: event.GetExecutionTime(),
mode: cfgMode,
Table: tblName,
Expand All @@ -96,11 +98,21 @@ func ToMemoryEvent(event cdc.Event, pkMap map[string]any, tc kafkalib.TopicConfi
Columns: cols,
Data: hashData(evtData, tc),
Deleted: event.DeletePayload(),
}, nil
}

return _event, nil
}

func (e *Event) GetExecutionTime() time.Time {
return e.executionTime
// EmitExecutionTimeLag - This will check against the current time and the event execution time and emit the lag.
func (e *Event) EmitExecutionTimeLag(metricsClient base.Client, mode config.Mode) {
metricsClient.GaugeWithSample(
"row.execution_time_lag",
float64(time.Since(e.executionTime).Milliseconds()),
map[string]string{
"mode": mode.String(),
"table": e.Table,
},
0.5)
}

func (e *Event) Validate() error {
Expand Down
4 changes: 4 additions & 0 deletions processes/consumer/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func (p processArgs) process(ctx context.Context, cfg config.Config, inMemDB *mo
tags["what"] = "to_mem_event_err"
return "", fmt.Errorf("cannot convert to memory event: %w", err)
}

// Emit event execution time.
evt.EmitExecutionTimeLag(metricsClient, cfg.Mode)

// Table name is only available after event has been cast
tags["table"] = evt.Table
if topicConfig.tc.ShouldSkip(_event.Operation()) {
Expand Down

0 comments on commit d5c2c88

Please sign in to comment.