Skip to content

Commit

Permalink
feat(workflows): adds orm methods for managing specs (#15356)
Browse files Browse the repository at this point in the history
* feat(workflows): adds orm methods for managing specs

* refactor(migrations+job): use text instead of int for status in db

* chore(syncer/orm): fail on get with no rows found
  • Loading branch information
MStreet3 authored Nov 25, 2024
1 parent a267825 commit 1168674
Show file tree
Hide file tree
Showing 5 changed files with 421 additions and 43 deletions.
27 changes: 18 additions & 9 deletions core/services/job/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,21 +869,30 @@ const (
DefaultSpecType = ""
)

type WorkflowSpecStatus string

const (
WorkflowSpecStatusActive WorkflowSpecStatus = "active"
WorkflowSpecStatusPaused WorkflowSpecStatus = "paused"
WorkflowSpecStatusDefault WorkflowSpecStatus = ""
)

type WorkflowSpec struct {
ID int32 `toml:"-"`
Workflow string `toml:"workflow"` // the raw representation of the workflow
Config string `toml:"config" db:"config"` // the raw representation of the config
// fields derived from the yaml spec, used for indexing the database
// note: i tried to make these private, but translating them to the database seems to require them to be public
WorkflowID string `toml:"-" db:"workflow_id"` // Derived. Do not modify. the CID of the workflow.
WorkflowOwner string `toml:"-" db:"workflow_owner"` // Derived. Do not modify. the owner of the workflow.
WorkflowName string `toml:"-" db:"workflow_name"` // Derived. Do not modify. the name of the workflow.
BinaryURL string `db:"binary_url"`
ConfigURL string `db:"config_url"`
SecretsID sql.NullInt64 `db:"secrets_id"`
CreatedAt time.Time `toml:"-"`
UpdatedAt time.Time `toml:"-"`
SpecType WorkflowSpecType `toml:"spec_type" db:"spec_type"`
WorkflowID string `toml:"-" db:"workflow_id"` // Derived. Do not modify. the CID of the workflow.
WorkflowOwner string `toml:"-" db:"workflow_owner"` // Derived. Do not modify. the owner of the workflow.
WorkflowName string `toml:"-" db:"workflow_name"` // Derived. Do not modify. the name of the workflow.
Status WorkflowSpecStatus `db:"status"`
BinaryURL string `db:"binary_url"`
ConfigURL string `db:"config_url"`
SecretsID sql.NullInt64 `db:"secrets_id"`
CreatedAt time.Time `toml:"-"`
UpdatedAt time.Time `toml:"-"`
SpecType WorkflowSpecType `toml:"spec_type" db:"spec_type"`
sdkWorkflow *sdk.WorkflowSpec
rawSpec []byte
config []byte
Expand Down
168 changes: 138 additions & 30 deletions core/services/workflows/syncer/mocks/orm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

115 changes: 111 additions & 4 deletions core/services/workflows/syncer/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package syncer

import (
"context"
"errors"
"database/sql"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink/v2/core/logger"
Expand Down Expand Up @@ -33,7 +34,15 @@ type WorkflowSecretsDS interface {
}

type WorkflowSpecsDS interface {
CreateWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error)
// UpsertWorkflowSpec inserts or updates a workflow spec. Updates on conflict of workflow name
// and owner
UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error)

// GetWorkflowSpec returns the workflow spec for the given owner and name.
GetWorkflowSpec(ctx context.Context, owner, name string) (*job.WorkflowSpec, error)

// DeleteWorkflowSpec deletes the workflow spec for the given owner and name.
DeleteWorkflowSpec(ctx context.Context, owner, name string) error
}

type ORM interface {
Expand Down Expand Up @@ -149,6 +158,104 @@ func (orm *orm) GetSecretsURLHash(owner, secretsURL []byte) ([]byte, error) {
return crypto.Keccak256(append(owner, secretsURL...))
}

func (orm *orm) CreateWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) {
return 0, errors.New("not implemented")
func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) {
var id int64

query := `
INSERT INTO workflow_specs (
workflow,
config,
workflow_id,
workflow_owner,
workflow_name,
status,
binary_url,
config_url,
secrets_id,
created_at,
updated_at,
spec_type
) VALUES (
:workflow,
:config,
:workflow_id,
:workflow_owner,
:workflow_name,
:status,
:binary_url,
:config_url,
:secrets_id,
:created_at,
:updated_at,
:spec_type
) ON CONFLICT (workflow_owner, workflow_name) DO UPDATE
SET
workflow = EXCLUDED.workflow,
config = EXCLUDED.config,
workflow_id = EXCLUDED.workflow_id,
workflow_owner = EXCLUDED.workflow_owner,
workflow_name = EXCLUDED.workflow_name,
status = EXCLUDED.status,
binary_url = EXCLUDED.binary_url,
config_url = EXCLUDED.config_url,
secrets_id = EXCLUDED.secrets_id,
created_at = EXCLUDED.created_at,
updated_at = EXCLUDED.updated_at,
spec_type = EXCLUDED.spec_type
RETURNING id
`

stmt, err := orm.ds.PrepareNamedContext(ctx, query)
if err != nil {
return 0, err
}
defer stmt.Close()

spec.UpdatedAt = time.Now()
err = stmt.QueryRowxContext(ctx, spec).Scan(&id)

if err != nil {
return 0, err
}

return id, nil
}

func (orm *orm) GetWorkflowSpec(ctx context.Context, owner, name string) (*job.WorkflowSpec, error) {
query := `
SELECT *
FROM workflow_specs
WHERE workflow_owner = $1 AND workflow_name = $2
`

var spec job.WorkflowSpec
err := orm.ds.GetContext(ctx, &spec, query, owner, name)
if err != nil {
return nil, err
}

return &spec, nil
}

func (orm *orm) DeleteWorkflowSpec(ctx context.Context, owner, name string) error {
query := `
DELETE FROM workflow_specs
WHERE workflow_owner = $1 AND workflow_name = $2
`

result, err := orm.ds.ExecContext(ctx, query, owner, name)
if err != nil {
return err
}

rowsAffected, err := result.RowsAffected()
if err != nil {
return err
}

if rowsAffected == 0 {
return sql.ErrNoRows // No spec deleted
}

return nil
}
Loading

0 comments on commit 1168674

Please sign in to comment.