Skip to content

Commit

Permalink
Self review.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Aug 20, 2024
1 parent 8f141f7 commit a1f1146
Showing 1 changed file with 20 additions and 21 deletions.
41 changes: 20 additions & 21 deletions processes/consumer/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,45 +75,45 @@ func Flush(ctx context.Context, inMemDB *models.DatabaseData, dest destination.B

_tableData.Lock()
defer _tableData.Unlock()

if _tableData.Empty() {
return
}

start := time.Now()
err = retry.WithRetries(retryCfg, func(_ int, _ error) error {
return flush(ctx, dest, metricsClient, args.Reason, _tableName, _tableData)
return flush(ctx, dest, _tableName, _tableData)
})

tags := map[string]string{
"what": "success",
"mode": _tableData.Mode().String(),
"table": _tableName,
"database": _tableData.TopicConfig().Database,
"schema": _tableData.TopicConfig().Schema,
"reason": args.Reason,
}

if err != nil {
tags["what"] = "merge_fail"
slog.Error(fmt.Sprintf("Failed to %s", action), slog.Any("err", err), slog.String("tableName", _tableName))
} else {
inMemDB.ClearTableConfig(_tableName)
slog.Info(fmt.Sprintf("%s success, clearing memory...", stringutil.CapitalizeFirstLetter(action)), slog.String("tableName", _tableName))
}

metricsClient.Timing("flush", time.Since(start), tags)
}(tableName, tableData)
}
wg.Wait()

return nil
}

func flush(ctx context.Context, dest destination.Baseline, metricsClient base.Client, reason string, _tableName string, _tableData *models.TableData) error {
if _tableData.Empty() {
return nil
}

func flush(ctx context.Context, dest destination.Baseline, _tableName string, _tableData *models.TableData) error {
// This is added so that we have a new temporary table suffix for each merge / append.
_tableData.ResetTempTableSuffix()

start := time.Now()
tags := map[string]string{
"what": "success",
"mode": _tableData.Mode().String(),
"table": _tableName,
"database": _tableData.TopicConfig().Database,
"schema": _tableData.TopicConfig().Schema,
"reason": reason,
}

defer func() {
metricsClient.Timing("flush", time.Since(start), tags)
}()

// Merge or Append depending on the mode.
var err error
if _tableData.Mode() == config.History {
Expand All @@ -123,7 +123,6 @@ func flush(ctx context.Context, dest destination.Baseline, metricsClient base.Cl
}

if err != nil {
tags["what"] = "merge_fail"
return fmt.Errorf("failed to flush: %w", err)
}

Expand Down

0 comments on commit a1f1146

Please sign in to comment.