diff --git a/processes/consumer/flush.go b/processes/consumer/flush.go index cba027b66..0abfa1340 100644 --- a/processes/consumer/flush.go +++ b/processes/consumer/flush.go @@ -73,6 +73,8 @@ func Flush(ctx context.Context, inMemDB *models.DatabaseData, dest destination.B return } + _tableData.Lock() + defer _tableData.Unlock() err = retry.WithRetries(retryCfg, func(_ int, _ error) error { return flush(ctx, dest, metricsClient, args.Reason, _tableName, _tableData) }) @@ -91,9 +93,6 @@ func Flush(ctx context.Context, inMemDB *models.DatabaseData, dest destination.B } func flush(ctx context.Context, dest destination.Baseline, metricsClient base.Client, reason string, _tableName string, _tableData *models.TableData) error { - // Lock the tables when executing merge / append. - _tableData.Lock() - defer _tableData.Unlock() if _tableData.Empty() { return nil }