Skip to content

Commit

Permalink
[CAPPL-257] GetUnfinished should filter by workflowID (#15152)
Browse files Browse the repository at this point in the history
  • Loading branch information
cedric-cordenier authored Nov 8, 2024
1 parent 554a346 commit 442840f
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 16 deletions.
2 changes: 1 addition & 1 deletion core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ var (
)

func (e *Engine) resumeInProgressExecutions(ctx context.Context) error {
wipExecutions, err := e.executionStates.GetUnfinished(ctx, defaultOffset, defaultLimit)
wipExecutions, err := e.executionStates.GetUnfinished(ctx, e.workflow.id, defaultOffset, defaultLimit)
if err != nil {
return err
}
Expand Down
9 changes: 7 additions & 2 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
"github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
)

const testWorkflowId = "<workflow-id>"
Expand Down Expand Up @@ -149,6 +148,12 @@ func newTestEngineWithYAMLSpec(t *testing.T, reg *coreCap.Registry, spec string,
return newTestEngine(t, reg, sdkSpec, opts...)
}

type mockSecretsFetcher struct{}

func (s mockSecretsFetcher) SecretsFor(workflowOwner, workflowName string) (map[string]string, error) {
return map[string]string{}, nil
}

// newTestEngine creates a new engine with some test defaults.
func newTestEngine(t *testing.T, reg *coreCap.Registry, sdkSpec sdk.WorkflowSpec, opts ...func(c *Config)) (*Engine, *testHooks) {
initFailed := make(chan struct{})
Expand All @@ -174,7 +179,7 @@ func newTestEngine(t *testing.T, reg *coreCap.Registry, sdkSpec sdk.WorkflowSpec
onExecutionFinished: func(weid string) {
executionFinished <- weid
},
SecretsFetcher: syncer.NewWorkflowRegistry(),
SecretsFetcher: mockSecretsFetcher{},
clock: clock,
}
for _, o := range opts {
Expand Down
2 changes: 1 addition & 1 deletion core/services/workflows/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type Store interface {
UpsertStep(ctx context.Context, step *WorkflowExecutionStep) (WorkflowExecution, error)
UpdateStatus(ctx context.Context, executionID string, status string) error
Get(ctx context.Context, executionID string) (WorkflowExecution, error)
GetUnfinished(ctx context.Context, offset, limit int) ([]WorkflowExecution, error)
GetUnfinished(ctx context.Context, workflowID string, offset, limit int) ([]WorkflowExecution, error)
}

var _ Store = (*DBStore)(nil)
21 changes: 11 additions & 10 deletions core/services/workflows/store/store_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ type DBStore struct {
// `workflowExecutionRow` describes a row
// of the `workflow_executions` table
type workflowExecutionRow struct {
ID string
WorkflowID *string
Status string
CreatedAt *time.Time
UpdatedAt *time.Time
FinishedAt *time.Time
ID string `db:"id"`
WorkflowID *string `db:"workflow_id"`
Status string `db:"status"`
CreatedAt *time.Time `db:"created_at"`
UpdatedAt *time.Time `db:"updated_at"`
FinishedAt *time.Time `db:"finished_at"`
}

// `workflowStepRow` describes a row
Expand Down Expand Up @@ -362,7 +362,7 @@ func (d *DBStore) transact(ctx context.Context, fn func(*DBStore) error) error {
)
}

func (d *DBStore) GetUnfinished(ctx context.Context, offset, limit int) ([]WorkflowExecution, error) {
func (d *DBStore) GetUnfinished(ctx context.Context, workflowID string, offset, limit int) ([]WorkflowExecution, error) {
sql := `
SELECT
workflow_steps.workflow_execution_id AS ws_workflow_execution_id,
Expand All @@ -382,12 +382,13 @@ func (d *DBStore) GetUnfinished(ctx context.Context, offset, limit int) ([]Workf
JOIN workflow_steps
ON workflow_steps.workflow_execution_id = workflow_executions.id
WHERE workflow_executions.status = $1
AND workflow_executions.workflow_id = $2
ORDER BY workflow_executions.created_at DESC
LIMIT $2
OFFSET $3
LIMIT $3
OFFSET $4
`
var joinRecords []workflowExecutionWithStep
err := d.db.SelectContext(ctx, &joinRecords, sql, StatusStarted, limit, offset)
err := d.db.SelectContext(ctx, &joinRecords, sql, StatusStarted, workflowID, limit, offset)
if err != nil {
return []WorkflowExecution{}, err
}
Expand Down
40 changes: 38 additions & 2 deletions core/services/workflows/store/store_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,18 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink/v2/core/services/job"

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

func randomID() string {
b := make([]byte, 32)
return random(32)
}

func random(length int) string {
b := make([]byte, length)
_, err := rand.Read(b)
if err != nil {
panic(err)
Expand Down Expand Up @@ -234,10 +239,24 @@ func Test_StoreDB_WorkflowStepStatus(t *testing.T) {
}
}

func createWorkflow(t *testing.T, store *DBStore, id string) {
sql := `INSERT INTO workflow_specs (workflow, workflow_id, workflow_owner, workflow_name, created_at, updated_at)
VALUES (:workflow, :workflow_id, :workflow_owner, :workflow_name, NOW(), NOW())`
var wfSpec job.WorkflowSpec
wfSpec.Workflow = ""
wfSpec.WorkflowID = id
wfSpec.WorkflowOwner = random(20)
wfSpec.WorkflowName = random(20)
_, err := store.db.NamedExecContext(tests.Context(t), sql, wfSpec)
require.NoError(t, err)
}

func Test_StoreDB_GetUnfinishedSteps(t *testing.T) {
store := newTestDBStore(t)

id := randomID()
wid := randomID()
createWorkflow(t, store, wid)
stepOne := &WorkflowExecutionStep{
ExecutionID: id,
Ref: "step1",
Expand All @@ -254,12 +273,29 @@ func Test_StoreDB_GetUnfinishedSteps(t *testing.T) {
"step2": stepTwo,
},
ExecutionID: id,
WorkflowID: wid,
Status: StatusStarted,
}

_, err := store.Add(tests.Context(t), &es)
require.NoError(t, err)

id2 := randomID()
wid2 := randomID()
createWorkflow(t, store, wid2)
es2 := WorkflowExecution{
Steps: map[string]*WorkflowExecutionStep{
"step1": stepOne,
"step2": stepTwo,
},
ExecutionID: id2,
WorkflowID: wid2,
Status: StatusStarted,
}

_, err = store.Add(tests.Context(t), &es2)
require.NoError(t, err)

id = randomID()
esTwo := WorkflowExecution{
ExecutionID: id,
Expand All @@ -269,7 +305,7 @@ func Test_StoreDB_GetUnfinishedSteps(t *testing.T) {
_, err = store.Add(tests.Context(t), &esTwo)
require.NoError(t, err)

states, err := store.GetUnfinished(tests.Context(t), 0, 100)
states, err := store.GetUnfinished(tests.Context(t), wid, 0, 100)
require.NoError(t, err)

assert.Len(t, states, 1)
Expand Down

0 comments on commit 442840f

Please sign in to comment.