Skip to content

Commit

Permalink
Prune workfllw DB entries (#15226)
Browse files Browse the repository at this point in the history
  • Loading branch information
bolekk authored Nov 14, 2024
1 parent 0240fb8 commit 1a80fec
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 4 deletions.
1 change: 1 addition & 0 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
streamRegistry = streams.NewRegistry(globalLogger, pipelineRunner)
workflowORM = workflowstore.NewDBStore(opts.DS, globalLogger, clockwork.NewRealClock())
)
srvcs = append(srvcs, workflowORM)

promReporter := headreporter.NewPrometheusReporter(opts.DS, legacyEVMChains)
chainIDs := make([]*big.Int, legacyEVMChains.Len())
Expand Down
1 change: 1 addition & 0 deletions core/services/workflows/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser
if err != nil {
return nil, err
}
d.logger.Infow("Creating Workflow Engine for workflow spec", "workflowID", spec.WorkflowSpec.WorkflowID, "workflowOwner", spec.WorkflowSpec.WorkflowOwner, "workflowName", spec.WorkflowSpec.WorkflowName, "jobName", spec.Name)
return []job.ServiceCtx{engine}, nil
}

Expand Down
72 changes: 68 additions & 4 deletions core/services/workflows/store/store_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

"google.golang.org/protobuf/proto"
Expand All @@ -15,17 +16,31 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/values"
valuespb "github.com/smartcontractkit/chainlink-common/pkg/values/pb"

commonservices "github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services"
)

const (
defaultPruneFrequencySec = 20
defaultPruneTimeoutSec = 60
defaultPruneRecordAgeHours = 3
defaultPruneBatchSize = 3000
)

// `DBStore` is a postgres-backed
// data store that persists workflow progress.
type DBStore struct {
lggr logger.Logger
db sqlutil.DataSource
clock clockwork.Clock
commonservices.StateMachine
lggr logger.Logger
db sqlutil.DataSource
shutdownWaitGroup sync.WaitGroup
chStop commonservices.StopChan
clock clockwork.Clock
}

var _ services.ServiceCtx = (*DBStore)(nil)

// `workflowExecutionRow` describes a row
// of the `workflow_executions` table
type workflowExecutionRow struct {
Expand Down Expand Up @@ -70,6 +85,47 @@ type workflowExecutionWithStep struct {
WEFinishedAt *time.Time `db:"we_finished_at"`
}

func (d *DBStore) Start(context.Context) error {
return d.StartOnce("DBStore", func() error {
d.shutdownWaitGroup.Add(1)
go d.pruneDBEntries()
return nil
})
}

func (d *DBStore) Close() error {
return d.StopOnce("DBStore", func() error {
close(d.chStop)
d.shutdownWaitGroup.Wait()
return nil
})
}

func (d *DBStore) pruneDBEntries() {
defer d.shutdownWaitGroup.Done()
ticker := time.NewTicker(defaultPruneFrequencySec * time.Second)
defer ticker.Stop()
for {
select {
case <-d.chStop:
return
case <-ticker.C:
ctx, cancel := d.chStop.CtxWithTimeout(defaultPruneTimeoutSec * time.Second)
err := sqlutil.TransactDataSource(ctx, d.db, nil, func(tx sqlutil.DataSource) error {
stmt := fmt.Sprintf("DELETE FROM workflow_executions WHERE (id) IN (SELECT id FROM workflow_executions WHERE (created_at < now() - interval '%d hours') LIMIT %d);", defaultPruneRecordAgeHours, defaultPruneBatchSize)
_, err := tx.ExecContext(ctx, stmt)
return err
})
if err != nil {
d.lggr.Errorw("Failed to prune workflow_executions", "err", err)
} else {
d.lggr.Infow("Pruned oldest workflow_executions", "batchSize", defaultPruneBatchSize, "ageLimitHours", defaultPruneRecordAgeHours)
}
cancel()
}
}
}

// `UpdateStatus` updates the status of the given workflow execution
func (d *DBStore) UpdateStatus(ctx context.Context, executionID string, status string) error {
sql := `UPDATE workflow_executions SET status = $1, updated_at = $2 WHERE id = $3`
Expand Down Expand Up @@ -407,5 +463,13 @@ func (d *DBStore) GetUnfinished(ctx context.Context, workflowID string, offset,
}

func NewDBStore(ds sqlutil.DataSource, lggr logger.Logger, clock clockwork.Clock) *DBStore {
return &DBStore{db: ds, lggr: lggr.Named("WorkflowDBStore"), clock: clock}
return &DBStore{db: ds, lggr: lggr.Named("WorkflowDBStore"), clock: clock, chStop: make(chan struct{})}
}

func (d *DBStore) HealthReport() map[string]error {
return map[string]error{d.Name(): d.Healthy()}
}

func (d *DBStore) Name() string {
return d.lggr.Name()
}
4 changes: 4 additions & 0 deletions core/web/testdata/body/health.html
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,7 @@
<details open>
<summary title="TelemetryManager" class="noexpand"><span class="passing">TelemetryManager</span></summary>
</details>
<details open>
<summary title="WorkflowDBStore" class="noexpand"><span class="passing">WorkflowDBStore</span></summary>
</details>

9 changes: 9 additions & 0 deletions core/web/testdata/body/health.json
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,15 @@
"status": "passing",
"output": ""
}
},
{
"type": "checks",
"id": "WorkflowDBStore",
"attributes": {
"name": "WorkflowDBStore",
"status": "passing",
"output": ""
}
}
]
}
1 change: 1 addition & 0 deletions core/web/testdata/body/health.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ ok StarkNet.Baz.Chain
ok StarkNet.Baz.Relayer
ok StarkNet.Baz.Txm
ok TelemetryManager
ok WorkflowDBStore
10 changes: 10 additions & 0 deletions testdata/scripts/health/default.txtar
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ ok PipelineRunner
ok PipelineRunner.BridgeCache
ok RetirementReportCache
ok TelemetryManager
ok WorkflowDBStore

-- out.json --
{
Expand Down Expand Up @@ -134,6 +135,15 @@ ok TelemetryManager
"status": "passing",
"output": ""
}
},
{
"type": "checks",
"id": "WorkflowDBStore",
"attributes": {
"name": "WorkflowDBStore",
"status": "passing",
"output": ""
}
}
]
}
10 changes: 10 additions & 0 deletions testdata/scripts/health/multi-chain.txtar
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ ok StarkNet.Baz.Chain
ok StarkNet.Baz.Relayer
ok StarkNet.Baz.Txm
ok TelemetryManager
ok WorkflowDBStore

-- out-unhealthy.txt --
! EVM.1.HeadTracker.HeadListener
Expand Down Expand Up @@ -396,6 +397,15 @@ ok TelemetryManager
"status": "passing",
"output": ""
}
},
{
"type": "checks",
"id": "WorkflowDBStore",
"attributes": {
"name": "WorkflowDBStore",
"status": "passing",
"output": ""
}
}
]
}
Expand Down

0 comments on commit 1a80fec

Please sign in to comment.