From 1defa6faccfeaf21192deda7d32464d58ebd1f9c Mon Sep 17 00:00:00 2001 From: FelixFan1992 Date: Fri, 5 Apr 2024 10:37:52 -0400 Subject: [PATCH] update --- pkg/v3/coordinator/coordinator.go | 2 ++ pkg/v3/flows/conditional.go | 16 ++++++------- pkg/v3/flows/conditional_test.go | 3 +-- pkg/v3/flows/factory.go | 7 +++--- pkg/v3/flows/factory_test.go | 26 ---------------------- pkg/v3/flows/logtrigger.go | 14 +++++------- pkg/v3/flows/recovery.go | 12 +++++----- pkg/v3/flows/retry.go | 9 ++++---- pkg/v3/flows/retry_test.go | 15 +++++++------ pkg/v3/observation.go | 8 ++++--- pkg/v3/observer.go | 12 +++++----- pkg/v3/plugin/hooks/add_log_proposals.go | 1 + pkg/v3/plugin/hooks/remove_from_staging.go | 1 + pkg/v3/plugin/ocr3.go | 10 +++++---- pkg/v3/plugin/plugin.go | 9 ++++---- pkg/v3/postprocessors/eligible.go | 1 + pkg/v3/postprocessors/ineligible.go | 1 + pkg/v3/postprocessors/metadata.go | 4 +++- pkg/v3/postprocessors/retry.go | 4 +++- pkg/v3/preprocessors/proposal_filterer.go | 7 ++++-- pkg/v3/stores/metadata_store.go | 7 +++++- pkg/v3/stores/result_store.go | 2 +- 22 files changed, 82 insertions(+), 89 deletions(-) diff --git a/pkg/v3/coordinator/coordinator.go b/pkg/v3/coordinator/coordinator.go index dfc859f7..eccc1d97 100644 --- a/pkg/v3/coordinator/coordinator.go +++ b/pkg/v3/coordinator/coordinator.go @@ -20,6 +20,8 @@ const ( defaultCacheClean = time.Duration(30) * time.Second ) +// prevent repeating transmission for the same upkeep (check in-flight upkeeps) by checking onchain events from event provider +// transmit again bc check block changes etc. type coordinator struct { closer internalutil.Closer logger *log.Logger diff --git a/pkg/v3/flows/conditional.go b/pkg/v3/flows/conditional.go index db6f9136..8791133f 100644 --- a/pkg/v3/flows/conditional.go +++ b/pkg/v3/flows/conditional.go @@ -19,14 +19,15 @@ import ( ) const ( - // This is the ticker interval for sampling conditional flow + // SamplingConditionInterval is the ticker interval for sampling conditional flow SamplingConditionInterval = 3 * time.Second - // Maximum number of upkeeps to be sampled in every round + // MaxSampledConditionals is the maximum number of upkeeps to be sampled in every round MaxSampledConditionals = 300 - // This is the ticker interval for final conditional flow + // FinalConditionalInterval is the ticker interval for final conditional flow FinalConditionalInterval = 1 * time.Second - // These are the maximum number of conditional upkeeps dequeued on every tick from proposal queue in FinalConditionalFlow - // This is kept same as OutcomeSurfacedProposalsLimit as those many can get enqueued by plugin in every round + // FinalConditionalBatchSize is the maximum number of conditional upkeeps dequeued on every tick from proposal queue + // in FinalConditionalFlow. This is kept same as OutcomeSurfacedProposalsLimit as those many can get enqueued by + // plugin in every round FinalConditionalBatchSize = 50 ) @@ -40,11 +41,11 @@ func newSampleProposalFlow( logger *log.Logger, ) service.Recoverable { pre = append(pre, preprocessors.NewProposalFilterer(ms, types.LogTrigger)) - postprocessors := postprocessors.NewAddProposalToMetadataStorePostprocessor(ms) + post := postprocessors.NewAddProposalToMetadataStorePostprocessor(ms) observer := ocr2keepersv3.NewRunnableObserver( pre, - postprocessors, + post, runner, ObservationProcessLimit, log.New(logger.Writer(), fmt.Sprintf("[%s | sample-proposal-observer]", telemetry.ServiceName), telemetry.LogPkgStdFlags), @@ -114,7 +115,6 @@ func newFinalConditionalFlow( proposalQ types.ProposalQueue, builder common.PayloadBuilder, retryQ types.RetryQueue, - stateUpdater common.UpkeepStateUpdater, logger *log.Logger, ) service.Recoverable { post := postprocessors.NewCombinedPostprocessor( diff --git a/pkg/v3/flows/conditional_test.go b/pkg/v3/flows/conditional_test.go index cc9fb8d7..1688fd9d 100644 --- a/pkg/v3/flows/conditional_test.go +++ b/pkg/v3/flows/conditional_test.go @@ -83,7 +83,7 @@ func TestConditionalFinalization(t *testing.T) { // set the ticker time lower to reduce the test time interval := 50 * time.Millisecond pre := []ocr2keepersv3.PreProcessor[common.UpkeepPayload]{coord} - svc := newFinalConditionalFlow(pre, rStore, runner, interval, proposalQ, payloadBuilder, retryQ, upkeepStateUpdater, logger) + svc := newFinalConditionalFlow(pre, rStore, runner, interval, proposalQ, payloadBuilder, retryQ, logger) var wg sync.WaitGroup wg.Add(1) @@ -206,5 +206,4 @@ func TestSamplingProposal(t *testing.T) { upkeepProvider.AssertExpectations(t) coord.AssertExpectations(t) runner.AssertExpectations(t) - // ratio.AssertExpectations(t) } diff --git a/pkg/v3/flows/factory.go b/pkg/v3/flows/factory.go index d733b3ca..118b7f8f 100644 --- a/pkg/v3/flows/factory.go +++ b/pkg/v3/flows/factory.go @@ -4,30 +4,29 @@ import ( "log" "time" + common "github.com/smartcontractkit/chainlink-common/pkg/types/automation" + ocr2keepersv3 "github.com/smartcontractkit/chainlink-automation/pkg/v3" "github.com/smartcontractkit/chainlink-automation/pkg/v3/service" "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" - common "github.com/smartcontractkit/chainlink-common/pkg/types/automation" ) func ConditionalTriggerFlows( coord ocr2keepersv3.PreProcessor[common.UpkeepPayload], ratio types.Ratio, getter common.ConditionalUpkeepProvider, - subscriber common.BlockSubscriber, builder common.PayloadBuilder, resultStore types.ResultStore, metadataStore types.MetadataStore, runner ocr2keepersv3.Runner, proposalQ types.ProposalQueue, retryQ types.RetryQueue, - stateUpdater common.UpkeepStateUpdater, logger *log.Logger, ) []service.Recoverable { preprocessors := []ocr2keepersv3.PreProcessor[common.UpkeepPayload]{coord} // runs full check pipeline on a coordinated block with coordinated upkeeps - conditionalFinal := newFinalConditionalFlow(preprocessors, resultStore, runner, FinalConditionalInterval, proposalQ, builder, retryQ, stateUpdater, logger) + conditionalFinal := newFinalConditionalFlow(preprocessors, resultStore, runner, FinalConditionalInterval, proposalQ, builder, retryQ, logger) // the sampling proposal flow takes random samples of active upkeeps, checks // them and surfaces the ids if the items are eligible diff --git a/pkg/v3/flows/factory_test.go b/pkg/v3/flows/factory_test.go index d1363bfe..8ce3cb55 100644 --- a/pkg/v3/flows/factory_test.go +++ b/pkg/v3/flows/factory_test.go @@ -17,11 +17,6 @@ func TestConditionalTriggerFlows(t *testing.T) { nil, nil, nil, - &mockSubscriber{ - SubscribeFn: func() (int, chan common.BlockHistory, error) { - return 0, nil, nil - }, - }, nil, nil, nil, @@ -32,7 +27,6 @@ func TestConditionalTriggerFlows(t *testing.T) { }, nil, nil, - nil, log.New(io.Discard, "", 0), ) assert.Equal(t, 2, len(flows)) @@ -69,23 +63,3 @@ type mockRunner struct { func (r *mockRunner) CheckUpkeeps(ctx context.Context, p ...common.UpkeepPayload) ([]common.CheckResult, error) { return r.CheckUpkeepsFn(ctx, p...) } - -type mockSubscriber struct { - SubscribeFn func() (int, chan common.BlockHistory, error) - UnsubscribeFn func(int) error - StartFn func(ctx context.Context) error - CloseFn func() error -} - -func (r *mockSubscriber) Subscribe() (int, chan common.BlockHistory, error) { - return r.SubscribeFn() -} -func (r *mockSubscriber) Unsubscribe(i int) error { - return r.UnsubscribeFn(i) -} -func (r *mockSubscriber) Start(ctx context.Context) error { - return r.StartFn(ctx) -} -func (r *mockSubscriber) Close() error { - return r.CloseFn() -} diff --git a/pkg/v3/flows/logtrigger.go b/pkg/v3/flows/logtrigger.go index bfbe127a..88df65de 100644 --- a/pkg/v3/flows/logtrigger.go +++ b/pkg/v3/flows/logtrigger.go @@ -6,25 +6,21 @@ import ( "log" "time" + common "github.com/smartcontractkit/chainlink-common/pkg/types/automation" + ocr2keepersv3 "github.com/smartcontractkit/chainlink-automation/pkg/v3" "github.com/smartcontractkit/chainlink-automation/pkg/v3/postprocessors" "github.com/smartcontractkit/chainlink-automation/pkg/v3/service" "github.com/smartcontractkit/chainlink-automation/pkg/v3/telemetry" "github.com/smartcontractkit/chainlink-automation/pkg/v3/tickers" "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" - common "github.com/smartcontractkit/chainlink-common/pkg/types/automation" -) - -var ( - ErrNotRetryable = fmt.Errorf("payload is not retryable") ) const ( - // This is the ticker interval for log trigger flow + // LogCheckInterval is the ticker interval for log trigger flow LogCheckInterval = 1 * time.Second - // Limit for processing a whole observer flow given a payload. The main component of this - // is the checkPipeline which involves some RPC, DB and Mercury calls, this is limited - // to 20 seconds for now + // ObservationProcessLimit is the limit for processing a whole observer flow given a payload. The main component of + // this is the checkPipeline which involves some RPC, DB and Mercury calls, this is limited to 20 seconds for now ObservationProcessLimit = 20 * time.Second ) diff --git a/pkg/v3/flows/recovery.go b/pkg/v3/flows/recovery.go index 77e2529e..d844aee3 100644 --- a/pkg/v3/flows/recovery.go +++ b/pkg/v3/flows/recovery.go @@ -8,22 +8,24 @@ import ( "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" + common "github.com/smartcontractkit/chainlink-common/pkg/types/automation" + ocr2keepersv3 "github.com/smartcontractkit/chainlink-automation/pkg/v3" "github.com/smartcontractkit/chainlink-automation/pkg/v3/postprocessors" "github.com/smartcontractkit/chainlink-automation/pkg/v3/preprocessors" "github.com/smartcontractkit/chainlink-automation/pkg/v3/service" "github.com/smartcontractkit/chainlink-automation/pkg/v3/telemetry" "github.com/smartcontractkit/chainlink-automation/pkg/v3/tickers" - common "github.com/smartcontractkit/chainlink-common/pkg/types/automation" ) const ( - // This is the ticker interval for recovery final flow + // RecoveryFinalInterval is the ticker interval for recovery final flow RecoveryFinalInterval = 1 * time.Second - // These are the maximum number of log upkeeps dequeued on every tick from proposal queue in FinalRecoveryFlow - // This is kept same as OutcomeSurfacedProposalsLimit as those many can get enqueued by plugin in every round + // FinalRecoveryBatchSize is the maximum number of log upkeeps dequeued on every tick from proposal queue in + // FinalRecoveryFlow. This is kept same as OutcomeSurfacedProposalsLimit as those many can get enqueued by plugin in + // every round FinalRecoveryBatchSize = 50 - // This is the ticker interval for recovery proposal flow + // RecoveryProposalInterval is the ticker interval for recovery proposal flow RecoveryProposalInterval = 1 * time.Second ) diff --git a/pkg/v3/flows/retry.go b/pkg/v3/flows/retry.go index 200b094f..fe442fc2 100644 --- a/pkg/v3/flows/retry.go +++ b/pkg/v3/flows/retry.go @@ -6,19 +6,20 @@ import ( "log" "time" + common "github.com/smartcontractkit/chainlink-common/pkg/types/automation" + ocr2keepersv3 "github.com/smartcontractkit/chainlink-automation/pkg/v3" "github.com/smartcontractkit/chainlink-automation/pkg/v3/postprocessors" "github.com/smartcontractkit/chainlink-automation/pkg/v3/service" "github.com/smartcontractkit/chainlink-automation/pkg/v3/telemetry" "github.com/smartcontractkit/chainlink-automation/pkg/v3/tickers" "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" - common "github.com/smartcontractkit/chainlink-common/pkg/types/automation" ) const ( - // These are the max number of payloads dequeued on every tick from the retry queue in the retry flow + // RetryBatchSize is the max number of payloads dequeued on every tick from the retry queue in the retry flow RetryBatchSize = 10 - // This is the ticker interval for retry flow + // RetryCheckInterval is the ticker interval for retry flow RetryCheckInterval = 5 * time.Second ) @@ -59,7 +60,7 @@ type retryTick struct { batchSize int } -func (t retryTick) Value(ctx context.Context) ([]common.UpkeepPayload, error) { +func (t retryTick) Value(_ context.Context) ([]common.UpkeepPayload, error) { if t.q == nil { return nil, nil } diff --git a/pkg/v3/flows/retry_test.go b/pkg/v3/flows/retry_test.go index fe5ef3eb..44f6abab 100644 --- a/pkg/v3/flows/retry_test.go +++ b/pkg/v3/flows/retry_test.go @@ -11,11 +11,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + common "github.com/smartcontractkit/chainlink-common/pkg/types/automation" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/service" "github.com/smartcontractkit/chainlink-automation/pkg/v3/stores" "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" "github.com/smartcontractkit/chainlink-automation/pkg/v3/types/mocks" - common "github.com/smartcontractkit/chainlink-common/pkg/types/automation" ) func TestRetryFlow(t *testing.T) { @@ -31,22 +32,22 @@ func TestRetryFlow(t *testing.T) { coord.On("PreProcess", mock.Anything, mock.Anything).Return([]common.UpkeepPayload{ { - UpkeepID: common.UpkeepIdentifier([32]byte{1}), + UpkeepID: [32]byte{1}, WorkID: "0x1", }, { - UpkeepID: common.UpkeepIdentifier([32]byte{2}), + UpkeepID: [32]byte{2}, WorkID: "0x2", }, }, nil).Times(times) runner.On("CheckUpkeeps", mock.Anything, mock.Anything, mock.Anything).Return([]common.CheckResult{ { - UpkeepID: common.UpkeepIdentifier([32]byte{1}), + UpkeepID: [32]byte{1}, WorkID: "0x1", Eligible: true, }, { - UpkeepID: common.UpkeepIdentifier([32]byte{2}), + UpkeepID: [32]byte{2}, WorkID: "0x2", Retryable: true, }, @@ -64,12 +65,12 @@ func TestRetryFlow(t *testing.T) { err := retryQ.Enqueue(types.RetryRecord{ Payload: common.UpkeepPayload{ - UpkeepID: common.UpkeepIdentifier([32]byte{1}), + UpkeepID: [32]byte{1}, WorkID: "0x1", }, }, types.RetryRecord{ Payload: common.UpkeepPayload{ - UpkeepID: common.UpkeepIdentifier([32]byte{2}), + UpkeepID: [32]byte{2}, WorkID: "0x2", }, }) diff --git a/pkg/v3/observation.go b/pkg/v3/observation.go index 8a56697a..0cadf854 100644 --- a/pkg/v3/observation.go +++ b/pkg/v3/observation.go @@ -17,7 +17,8 @@ const ( ObservationPerformablesLimit = 50 ObservationLogRecoveryProposalsLimit = 5 ObservationConditionalsProposalsLimit = 5 - ObservationBlockHistoryLimit = 256 + // ObservationBlockHistoryLimit is the amount of past blocks required from block history source + ObservationBlockHistoryLimit = 256 // MaxObservationLength applies a limit to the total length of bytes in an // observation. NOTE: This is derived from a limit of 10000 on performData @@ -60,6 +61,7 @@ func DecodeAutomationObservation(data []byte, utg types.UpkeepTypeGetter, wg typ return ao, nil } +// validateAutomationObservation validates the automation observation, including block history, performables, proposals, etc func validateAutomationObservation(o AutomationObservation, utg types.UpkeepTypeGetter, wg types.WorkIDGenerator) error { // Validate Block History if len(o.BlockHistory) > ObservationBlockHistoryLimit { @@ -121,7 +123,7 @@ func validateAutomationObservation(o AutomationObservation, utg types.UpkeepType return nil } -// Validates the check result fields sent within an observation +// validateCheckResult validates the check result fields sent within an observation func validateCheckResult(r ocr2keepers.CheckResult, utg types.UpkeepTypeGetter, wg types.WorkIDGenerator) error { if r.PipelineExecutionState != 0 || r.Retryable { return fmt.Errorf("check result cannot have failed execution state") @@ -169,7 +171,7 @@ func validateUpkeepProposal(p ocr2keepers.CoordinatedBlockProposal, utg types.Up return nil } -// Validate validates the trigger fields, and any extensions if present. +// validateTriggerExtensionType validates the trigger fields, and any extensions if present. func validateTriggerExtensionType(t ocr2keepers.Trigger, ut types.UpkeepType) error { switch ut { case types.ConditionTrigger: diff --git a/pkg/v3/observer.go b/pkg/v3/observer.go index 4fc926a8..5e4d0c61 100644 --- a/pkg/v3/observer.go +++ b/pkg/v3/observer.go @@ -81,25 +81,25 @@ func (o *Observer[T]) Process(ctx context.Context, tick tickers.Tick[[]T]) error defer cancel() // Get upkeeps from tick - value, err := tick.Value(pCtx) + upkeepPayloads, err := tick.Value(pCtx) if err != nil { return err } - o.lggr.Printf("got %d payloads from ticker", len(value)) + o.lggr.Printf("got %d payloads from ticker", len(upkeepPayloads)) // Run pre-processors for _, preprocessor := range o.Preprocessors { - value, err = preprocessor.PreProcess(pCtx, value) + upkeepPayloads, err = preprocessor.PreProcess(pCtx, upkeepPayloads) if err != nil { return err } } - o.lggr.Printf("processing %d payloads", len(value)) + o.lggr.Printf("processing %d payloads", len(upkeepPayloads)) // Run check pipeline - results, err := o.processFunc(pCtx, value...) + results, err := o.processFunc(pCtx, upkeepPayloads...) if err != nil { return err } @@ -107,7 +107,7 @@ func (o *Observer[T]) Process(ctx context.Context, tick tickers.Tick[[]T]) error o.lggr.Printf("post-processing %d results", len(results)) // Run post-processor - if err := o.Postprocessor.PostProcess(pCtx, results, value); err != nil { + if err := o.Postprocessor.PostProcess(pCtx, results, upkeepPayloads); err != nil { return err } diff --git a/pkg/v3/plugin/hooks/add_log_proposals.go b/pkg/v3/plugin/hooks/add_log_proposals.go index 755c9d14..a9189a59 100644 --- a/pkg/v3/plugin/hooks/add_log_proposals.go +++ b/pkg/v3/plugin/hooks/add_log_proposals.go @@ -45,6 +45,7 @@ func (h *AddLogProposalsHook) RunHook(obs *ocr2keepersv3.AutomationObservation, proposals = proposals[:limit] } + // should this be log recovery?? h.logger.Printf("adding %d log recovery proposals to observation", len(proposals)) obs.UpkeepProposals = append(obs.UpkeepProposals, proposals...) return nil diff --git a/pkg/v3/plugin/hooks/remove_from_staging.go b/pkg/v3/plugin/hooks/remove_from_staging.go index 7bfa8959..38a63d76 100644 --- a/pkg/v3/plugin/hooks/remove_from_staging.go +++ b/pkg/v3/plugin/hooks/remove_from_staging.go @@ -9,6 +9,7 @@ import ( "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" ) +// staging is the result store for certain upkeeps func NewRemoveFromStagingHook(store types.ResultStore, logger *log.Logger) RemoveFromStagingHook { return RemoveFromStagingHook{ store: store, diff --git a/pkg/v3/plugin/ocr3.go b/pkg/v3/plugin/ocr3.go index e577b3c1..823e04c7 100644 --- a/pkg/v3/plugin/ocr3.go +++ b/pkg/v3/plugin/ocr3.go @@ -81,6 +81,7 @@ func (plugin *ocr3Plugin) Observation(ctx context.Context, outctx ocr3types.Outc prommetrics.AutomationPluginPerformables.WithLabelValues(prommetrics.PluginStepObservation).Set(float64(len(observation.Performable))) // Encode the observation to bytes + // libocr receives observation bytes and compare observations to reach quorum return observation.Encode() } @@ -238,6 +239,8 @@ func (plugin *ocr3Plugin) ShouldTransmitAcceptedReport(_ context.Context, seqNr transmit := false // If any upkeep should be transmitted, then transmit + // why? if one upkeep should not be transmitted (already in-flight or some other reasons), will that interfere with + // in-flight tx? for _, upkeep := range upkeeps { shouldTransmit := plugin.Coordinator.ShouldTransmit(upkeep) plugin.Logger.Printf("checking transmit of upkeep '%s', trigger %s in sequence number %d returned %t", upkeep.UpkeepID, upkeep.Trigger, seqNr, shouldTransmit) @@ -274,13 +277,12 @@ func (plugin *ocr3Plugin) startServices() { func (plugin *ocr3Plugin) getReportFromPerformables(toPerform []ocr2keepers.CheckResult) (ocr3types.ReportWithInfo[AutomationReportInfo], error) { encoded, err := plugin.ReportEncoder.Encode(toPerform...) return ocr3types.ReportWithInfo[AutomationReportInfo]{ - Report: ocr2plustypes.Report(encoded), + Report: encoded, }, err } -// Generates a randomness source derived from the config and seq # so -// that it's the same across the network for the same round. -// similar key building as libocr transmit selector. +// getRandomKeySource generates a randomness source derived from the config and seq # so that it's the same across the +// network for the same round. this is similar key building as libocr transmit selector. func getRandomKeySource(cd ocr2plustypes.ConfigDigest, seqNr uint64) [16]byte { return random.GetRandomKeySource(cd[:], seqNr) } diff --git a/pkg/v3/plugin/plugin.go b/pkg/v3/plugin/plugin.go index 6ebb74a1..0e87923f 100644 --- a/pkg/v3/plugin/plugin.go +++ b/pkg/v3/plugin/plugin.go @@ -4,6 +4,10 @@ import ( "fmt" "log" + ocr2keepers "github.com/smartcontractkit/chainlink-common/pkg/types/automation" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + ocr2plustypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/config" "github.com/smartcontractkit/chainlink-automation/pkg/v3/coordinator" "github.com/smartcontractkit/chainlink-automation/pkg/v3/flows" @@ -13,9 +17,6 @@ import ( "github.com/smartcontractkit/chainlink-automation/pkg/v3/stores" "github.com/smartcontractkit/chainlink-automation/pkg/v3/telemetry" "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" - ocr2keepers "github.com/smartcontractkit/chainlink-common/pkg/types/automation" - "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" - ocr2plustypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" ) func newPlugin( @@ -88,14 +89,12 @@ func newPlugin( coord, ratio, getter, - blockSource, builder, resultStore, metadataStore, runner, proposalQ, retryQ, - upkeepStateUpdater, logger, ) if err != nil { diff --git a/pkg/v3/postprocessors/eligible.go b/pkg/v3/postprocessors/eligible.go index f5cbc8b7..64f6aaf5 100644 --- a/pkg/v3/postprocessors/eligible.go +++ b/pkg/v3/postprocessors/eligible.go @@ -35,6 +35,7 @@ func NewEligiblePostProcessor(resultsAdder checkResultAdder, logger *log.Logger) } } +// PostProcess adds eligible upkeeps to result store func (p *eligiblePostProcessor) PostProcess(_ context.Context, results []ocr2keepers.CheckResult, _ []ocr2keepers.UpkeepPayload) error { eligible := 0 for _, res := range results { diff --git a/pkg/v3/postprocessors/ineligible.go b/pkg/v3/postprocessors/ineligible.go index f2a7be28..0a9458f8 100644 --- a/pkg/v3/postprocessors/ineligible.go +++ b/pkg/v3/postprocessors/ineligible.go @@ -23,6 +23,7 @@ func NewIneligiblePostProcessor(stateUpdater ocr2keepers.UpkeepStateUpdater, log } } +// PostProcess updates upkeep's states for ineligible upkeeps func (p *ineligiblePostProcessor) PostProcess(ctx context.Context, results []ocr2keepers.CheckResult, _ []ocr2keepers.UpkeepPayload) error { var merr error ineligible := 0 diff --git a/pkg/v3/postprocessors/metadata.go b/pkg/v3/postprocessors/metadata.go index bba4a81f..28952ab8 100644 --- a/pkg/v3/postprocessors/metadata.go +++ b/pkg/v3/postprocessors/metadata.go @@ -3,8 +3,9 @@ package postprocessors import ( "context" - "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" ocr2keepers "github.com/smartcontractkit/chainlink-common/pkg/types/automation" + + "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" ) type addProposalToMetadataStore struct { @@ -15,6 +16,7 @@ func NewAddProposalToMetadataStorePostprocessor(store types.MetadataStore) *addP return &addProposalToMetadataStore{metadataStore: store} } +// PostProcess adds eligible upkeeps to proposals in metadata store func (a *addProposalToMetadataStore) PostProcess(_ context.Context, results []ocr2keepers.CheckResult, _ []ocr2keepers.UpkeepPayload) error { // should only add values and not remove them for _, r := range results { diff --git a/pkg/v3/postprocessors/retry.go b/pkg/v3/postprocessors/retry.go index ed9757f0..6c8b3c3e 100644 --- a/pkg/v3/postprocessors/retry.go +++ b/pkg/v3/postprocessors/retry.go @@ -6,9 +6,10 @@ import ( "fmt" "log" + ocr2keepers "github.com/smartcontractkit/chainlink-common/pkg/types/automation" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/telemetry" "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" - ocr2keepers "github.com/smartcontractkit/chainlink-common/pkg/types/automation" ) func NewRetryablePostProcessor(q types.RetryQueue, logger *log.Logger) *retryablePostProcessor { @@ -25,6 +26,7 @@ type retryablePostProcessor struct { var _ PostProcessor = (*retryablePostProcessor)(nil) +// PostProcess enqueues retryable upkeep payloads to retry queue func (p *retryablePostProcessor) PostProcess(_ context.Context, results []ocr2keepers.CheckResult, payloads []ocr2keepers.UpkeepPayload) error { var err error retryable := 0 diff --git a/pkg/v3/preprocessors/proposal_filterer.go b/pkg/v3/preprocessors/proposal_filterer.go index 89472f26..ff60591c 100644 --- a/pkg/v3/preprocessors/proposal_filterer.go +++ b/pkg/v3/preprocessors/proposal_filterer.go @@ -3,9 +3,10 @@ package preprocessors import ( "context" + ocr2keepers "github.com/smartcontractkit/chainlink-common/pkg/types/automation" + ocr2keepersv3 "github.com/smartcontractkit/chainlink-automation/pkg/v3" "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" - ocr2keepers "github.com/smartcontractkit/chainlink-common/pkg/types/automation" ) func NewProposalFilterer(metadata types.MetadataStore, upkeepType types.UpkeepType) ocr2keepersv3.PreProcessor[ocr2keepers.UpkeepPayload] { @@ -22,9 +23,11 @@ type proposalFilterer struct { var _ ocr2keepersv3.PreProcessor[ocr2keepers.UpkeepPayload] = (*proposalFilterer)(nil) -func (p *proposalFilterer) PreProcess(ctx context.Context, payloads []ocr2keepers.UpkeepPayload) ([]ocr2keepers.UpkeepPayload, error) { +// PreProcess returns all the payloads which don't currently exist in matadata store. +func (p *proposalFilterer) PreProcess(_ context.Context, payloads []ocr2keepers.UpkeepPayload) ([]ocr2keepers.UpkeepPayload, error) { all := p.metadata.ViewProposals(p.upkeepType) flatten := map[string]bool{} + // can we make this more efficient? wonder if it's worth the effort for _, proposal := range all { flatten[proposal.WorkID] = true } diff --git a/pkg/v3/stores/metadata_store.go b/pkg/v3/stores/metadata_store.go index 2d0239a3..108203c0 100644 --- a/pkg/v3/stores/metadata_store.go +++ b/pkg/v3/stores/metadata_store.go @@ -8,8 +8,9 @@ import ( "sync/atomic" "time" - "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" commontypes "github.com/smartcontractkit/chainlink-common/pkg/types/automation" + + "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" ) const ( @@ -30,6 +31,7 @@ func (r expiringRecord) expired(expr time.Duration) bool { return time.Since(r.createdAt) > expr } +// metadataStore has a block history source and maintains upkeep proposals type metadataStore struct { chID int ch chan commontypes.BlockHistory @@ -46,6 +48,7 @@ type metadataStore struct { typeGetter types.UpkeepTypeGetter } +// NewMetadataStore creates a new metadata store func NewMetadataStore(subscriber commontypes.BlockSubscriber, typeGetter types.UpkeepTypeGetter) (*metadataStore, error) { chID, ch, err := subscriber.Subscribe() if err != nil { @@ -157,6 +160,7 @@ func (m *metadataStore) addLogRecoveryProposal(proposals ...commontypes.Coordina } } +// viewLogRecoveryProposal iterates all log recovery proposals, deletes expired proposals, and return the rest func (m *metadataStore) viewLogRecoveryProposal() []commontypes.CoordinatedBlockProposal { // We also remove expired items in this function, hence take Lock() instead of RLock() m.logRecoveryMutex.Lock() @@ -197,6 +201,7 @@ func (m *metadataStore) addConditionalProposal(proposals ...commontypes.Coordina } } +// viewConditionalProposal iterates all conditional proposals, deletes expired proposals, and return the rest func (m *metadataStore) viewConditionalProposal() []commontypes.CoordinatedBlockProposal { // We also remove expired items in this function, hence take Lock() instead of RLock() m.conditionalMutex.Lock() diff --git a/pkg/v3/stores/result_store.go b/pkg/v3/stores/result_store.go index 056e6131..0bf54d58 100644 --- a/pkg/v3/stores/result_store.go +++ b/pkg/v3/stores/result_store.go @@ -15,7 +15,7 @@ import ( ) var ( - storeTTL = time.Minute * 5 + storeTTL = 5 * time.Minute gcInterval = 30 * time.Second )