diff --git a/core/capabilities/targets/write_target.go b/core/capabilities/targets/write_target.go index c6d34271662..531730cc089 100644 --- a/core/capabilities/targets/write_target.go +++ b/core/capabilities/targets/write_target.go @@ -22,14 +22,15 @@ import ( evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/keystone/generated/forwarder" + "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" ) var forwardABI = evmtypes.MustGetABI(forwarder.KeystoneForwarderMetaData.ABI) -func InitializeWrite(registry commontypes.CapabilitiesRegistry, legacyEVMChains legacyevm.LegacyChainContainer) error { +func InitializeWrite(registry commontypes.CapabilitiesRegistry, legacyEVMChains legacyevm.LegacyChainContainer, lggr logger.Logger) error { for _, chain := range legacyEVMChains.Slice() { - capability := NewEvmWrite(chain) + capability := NewEvmWrite(chain, lggr) if err := registry.Add(context.TODO(), capability); err != nil { return err } @@ -41,12 +42,15 @@ var ( _ capabilities.ActionCapability = &EvmWrite{} ) +const defaultGasLimit = 200000 + type EvmWrite struct { chain legacyevm.Chain capabilities.CapabilityInfo + lggr logger.Logger } -func NewEvmWrite(chain legacyevm.Chain) *EvmWrite { +func NewEvmWrite(chain legacyevm.Chain, lggr logger.Logger) *EvmWrite { // generate ID based on chain selector name := fmt.Sprintf("write_%v", chain.ID()) chainName, err := chainselectors.NameFromChainId(chain.ID().Uint64()) @@ -64,6 +68,7 @@ func NewEvmWrite(chain legacyevm.Chain) *EvmWrite { return &EvmWrite{ chain, info, + lggr.Named("EvmWrite"), } } @@ -153,6 +158,7 @@ func encodePayload(args []any, rawSelector string) ([]byte, error) { } func (cap *EvmWrite) Execute(ctx context.Context, callback chan<- capabilities.CapabilityResponse, request capabilities.CapabilityRequest) error { + cap.lggr.Debugw("Execute", "request", request) // TODO: idempotency // TODO: extract into ChainWriter? @@ -184,8 +190,6 @@ func (cap *EvmWrite) Execute(ctx context.Context, callback chan<- capabilities.C // TODO: validate encoded report is prefixed with workflowID and executionID that match the request meta - // unlimited gas in the MVP demo - gasLimit := 0 // No signature validation in the MVP demo signatures := [][]byte{} @@ -208,7 +212,7 @@ func (cap *EvmWrite) Execute(ctx context.Context, callback chan<- capabilities.C FromAddress: config.FromAddress().Address(), ToAddress: config.ForwarderAddress().Address(), EncodedPayload: calldata, - FeeLimit: uint32(gasLimit), + FeeLimit: uint32(defaultGasLimit), Meta: txMeta, Strategy: strategy, Checker: checker, diff --git a/core/capabilities/targets/write_target_test.go b/core/capabilities/targets/write_target_test.go index 68ca890cc0c..c99e84beb75 100644 --- a/core/capabilities/targets/write_target_test.go +++ b/core/capabilities/targets/write_target_test.go @@ -15,6 +15,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" + "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" @@ -45,7 +46,7 @@ func TestEvmWrite(t *testing.T) { evmcfg := evmtest.NewChainScopedConfig(t, cfg) chain.On("Config").Return(evmcfg) - capability := targets.NewEvmWrite(chain) + capability := targets.NewEvmWrite(chain, logger.TestLogger(t)) ctx := testutils.Context(t) config, err := values.NewMap(map[string]any{ diff --git a/core/services/workflows/delegate.go b/core/services/workflows/delegate.go index 13a8bda4043..6faa0bacdb8 100644 --- a/core/services/workflows/delegate.go +++ b/core/services/workflows/delegate.go @@ -44,7 +44,7 @@ func (d *Delegate) ServicesForSpec(spec job.Job) ([]job.ServiceCtx, error) { func NewDelegate(logger logger.Logger, registry types.CapabilitiesRegistry, legacyEVMChains legacyevm.LegacyChainContainer) *Delegate { // NOTE: we temporarily do registration inside NewDelegate, this will be moved out of job specs in the future - _ = targets.InitializeWrite(registry, legacyEVMChains) + _ = targets.InitializeWrite(registry, legacyEVMChains, logger) return &Delegate{logger: logger, registry: registry} } diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index 3260702e66b..01d1326e072 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -15,9 +15,10 @@ import ( ) const ( - mockedWorkflowID = "aaaaaaaa-f4d1-422f-a4b2-8ce0a1075f0a" - mockedExecutionID = "bbbbbbbb-f4d1-422f-a4b2-8ce0a1075f0a" - mockedTriggerID = "cccccccc-5cac-4071-be62-0152dd9adb0f" + // NOTE: max 32 bytes per ID - consider enforcing exactly 32 bytes? + mockedWorkflowID = "aaaaaaaaaa0000000000000000000000" + mockedExecutionID = "bbbbbbbbbb0000000000000000000000" + mockedTriggerID = "cccccccccc0000000000000000000000" ) type Engine struct { @@ -143,13 +144,20 @@ func (e *Engine) handleExecution(ctx context.Context, event capabilities.Capabil if err != nil { return err } + if len(results.Underlying) == 0 { + return fmt.Errorf("consensus returned no reports") + } + if len(results.Underlying) > 1 { + e.logger.Debugw("consensus returned more than one report") + } - _, err = e.handleTarget(ctx, results) + // we're expecting exactly one report + _, err = e.handleTarget(ctx, results.Underlying[0]) return err } -func (e *Engine) handleTarget(ctx context.Context, resp *values.List) (*values.List, error) { - +func (e *Engine) handleTarget(ctx context.Context, resp values.Value) (*values.List, error) { + e.logger.Debugw("handle target") inputs := map[string]values.Value{ "report": resp, } @@ -158,23 +166,28 @@ func (e *Engine) handleTarget(ctx context.Context, resp *values.List) (*values.L Inputs: &values.Map{Underlying: inputs}, Config: e.targetConfig, Metadata: capabilities.RequestMetadata{ - WorkflowID: mockedWorkflowID, + WorkflowID: mockedWorkflowID, + WorkflowExecutionID: mockedExecutionID, }, } return capabilities.ExecuteSync(ctx, e.target, tr) } -func (e *Engine) handleConsensus(ctx context.Context, resp capabilities.CapabilityResponse) (*values.List, error) { - e.logger.Debugw("running consensus", "resp", resp) - inputs := map[string]values.Value{ - "observations": resp.Value, - } +func (e *Engine) handleConsensus(ctx context.Context, event capabilities.CapabilityResponse) (*values.List, error) { + e.logger.Debugw("running consensus", "event", event) cr := capabilities.CapabilityRequest{ Metadata: capabilities.RequestMetadata{ WorkflowID: mockedWorkflowID, WorkflowExecutionID: mockedExecutionID, }, - Inputs: &values.Map{Underlying: inputs}, + Inputs: &values.Map{ + Underlying: map[string]values.Value{ + // each node provides a single observation - outputs of mercury trigger + "observations": &values.List{ + Underlying: []values.Value{event.Value}, + }, + }, + }, Config: e.consensusConfig, } return capabilities.ExecuteSync(ctx, e.consensus, cr)