-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PluginProcessors [CCIP-3147] #98
Conversation
query "github.com/smartcontractkit/chainlink-common/pkg/types/query" | ||
primitives "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" | ||
mock "github.com/stretchr/testify/mock" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's keep goimports
in sync, so we don't have random changes in PRs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I ran the command to fix imports and it did this :/
commit/types.go
Outdated
} | ||
|
||
// Encode encodes an Outcome deterministically | ||
func (o Outcome) Encode() ([]byte, error) { | ||
|
||
// Sort all lists to ensure deterministic serialization | ||
o.sort() | ||
o.MerkleOutcome.Sort() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe both of this are not required anymore, we have deterministic outcomes prior to encoding.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't currently sort the outcomes before returning it. We can do so but leaving this here is a good safe guard. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Determinism is guaranteed. Encoding should not be mixed with determinism.
Imagine Json Encoding
re-ordering your arrays, not nice :D
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a good start but I think we need to improve unit-testability. One idea is to have a generic interface that is implemented by all sub-plugins:
type SubPlugin[QueryType any, ObservationType any, OutcomeType any] interface {
// Will be needed by merkleroots subplugin
Query(ctx context.Context, outctx OutcomeContext) (QueryType, error)
Observation(ctx context.Context, outctx OutcomeContext, query QueryType) (ObservationType, error)
ValidateObservation(outctx OutcomeContext, query QueryType, ao types.AttributedObservation) error
Outcome(outctx OutcomeContext, query QueryType, aos []types.AttributedObservation) (OutcomeType, error)
}
Then calls to this can easily be mocked in unit tests, and would help us unit-test the "combiner" functions used in all the OCR phases.
commit/merkleworker/types.go
Outdated
FChain map[cciptypes.ChainSelector]int `json:"fChain"` | ||
} | ||
|
||
func (obs MerkleObservation) Encode() ([]byte, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm removing the Encode from all subplugins, this was copied while initially moving the existing plugin into merkleroot sub plugin. One Encode at the top level Plugin should be okay IMO.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good, we also want to replace this with proto soon
I was thinking about commenting something similar. The problem is that the different observe helpers have different arguments as well, which sort of defeats the purpose of adding this layer of abstraction. IMO you can get the same testability with a simple type alias, i.e.:
This also makes it so each observer act as a standalone piece of code rather than forcing it to be part of the larger interface. edit: I may be mistaking the |
3d37fea
to
bde95cb
Compare
TokenPrices: p.observer.ObserveTokenPrices(ctx), | ||
FChain: p.observer.ObserveFChain(), | ||
MerkleRoots: w.observer.ObserveMerkleRoots(ctx, previousOutcome.RangesSelectedForReport), | ||
FChain: w.observer.ObserveFChain(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leaving FChain observed here for now. Will iterate on having Fchain sent to the workers later. to unblock working on tokens and gas in commit plugin.
29f7101
to
0445c0b
Compare
@@ -9,15 +9,18 @@ import ( | |||
"time" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed current tests to use multiple outcomes (specifically merkleroot.Outcome
). No new tests added. Maybe later we'll refactor this to have some mocks and make sure it's aggregating correctly without testing the underlying processors as already each processor will have their own unit tests.
@@ -1,4 +1,4 @@ | |||
package commit | |||
package shared |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moving it to shared so that we can use without cyclic dependencies. Just changing fields to make them publicly available. No added or updated functionality.
0445c0b
to
d8e4c12
Compare
commit/fees/types.go
Outdated
func (o Outcome) Sort() { | ||
// TODO: implement | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like we should just enforce that these outcomes that are produced are always correctly sorted in tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Having outcome implement it makes it clear that we need to sort them. I agree we need to test it as well. I'm going to use the sort inside the Outcome function in each of the sub plugins
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't think I agree with that, test enforcement is always better than this kind of implicit contract ("plz implement")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see the conflict. What's the problem with having both?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is only used in the commit plugin, wouldn't it be more apt to place it in commit/shared?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it can be used later with exec plugin as well. It has nothing to do with commit only logic. Leaving it in shared makes it easier to discover and use if needed in other places.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be nice to share this across both plugins 👍
36f6646
to
0f4b0dd
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i suggest the following renames
fees -> chainfee
token -> tokenprice
then we get
chainfee.Processor
tokenprice.Processor
lggr logger.Logger | ||
observer Observer | ||
readerSyncer *plugincommon.BackgroundReaderSyncer | ||
ccipReader reader.CCIP |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe it's better if we move the plugin methods in this file.
Then we have:
merkleroot/
processor.go
processor_test.go
types.go
types_test.go
Right now I find it hard to navigate through the different files.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's more readable having different files for observation, outcome, report,..etc. I can change the other processors to follow the same scheme so that navigating files is easier. @makramkd @winder WDYT? In all cases I don't think this needs to be done in this PR. We want to unblock the work on tokens and fees for the time being.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Different strokes for different folks, I like the locality as long as the file is < 1,000 lines.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like locality if it's < 1000 lines, this one will be above the 1K lines and the tests specifically will be hard to navigate as it will be much more than 1K lines.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have a strong preference, but usually lean towards keeping an interface implementation all in one file. That said execute is huge and I was planning to split it up like this at some point.
Either way I agree that it should be done in a separate PR.
return nil | ||
} | ||
|
||
seenChains := mapset.NewSet[cciptypes.ChainSelector]() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Spent a little time reading about mapset
, while I'm still not used to seeing it instead of the stdlib version which would use map[cciptypes.Chainsselector]struct{}
, at the very least we can get more performance by using the thread unsafe version.
seenChains := mapset.NewSet[cciptypes.ChainSelector]() | |
seenChains := mapset.NewThreadUnsafeSet[cciptypes.ChainSelector]() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm okay using the NewThreadUnsafeSet
. It's just usually very alarming for people when they read it and the performance gain won't be much considering the number of selectors we have. Also won't need to worry if we have any concurrency about it.
if !observerSupportedChains.Contains(root.ChainSel) { | ||
return fmt.Errorf("found merkle root for chain %d, but this chain is not supported by Observer %d", | ||
root.ChainSel, observer) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like a complete error message can be created by removing this block and adding a new one outside of the loop:
unsupportedChains := observerSupportedChains.Difference(seenChains)
if len(unsupportedChains) > 0 {
return fmt.Errorf("found merkle root for chains unsupported by oracleID %d: %s", observer, unsupportedChains)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added to ticket CCIP-3166 to fix later.
return nil | ||
} | ||
|
||
func validateObservedOnRampMaxSeqNums( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same suggestions as above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added to ticket CCIP-3166 to fix later.
return nil | ||
} | ||
|
||
func validateObservedOffRampMaxSeqNums( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same suggestions as above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added to ticket CCIP-3166 to fix later.
commit/plugin.go
Outdated
merkleProcessor shared.PluginProcessor[merkleroot.Query, merkleroot.Observation, merkleroot.Outcome] | ||
tokenProcessor shared.PluginProcessor[tokenprice.Query, tokenprice.Observation, tokenprice.Outcome] | ||
feeProcessor shared.PluginProcessor[chainfee.Query, chainfee.Observation, chainfee.Outcome] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're not using these generic types so shared.PluginProcessor
should define them as any
for a simpler interface.
If you don't need the types, you can now be more concise:
merkleProcessor shared.PluginProcessor[merkleroot.Query, merkleroot.Observation, merkleroot.Outcome] | |
tokenProcessor shared.PluginProcessor[tokenprice.Query, tokenprice.Observation, tokenprice.Outcome] | |
feeProcessor shared.PluginProcessor[chainfee.Query, chainfee.Observation, chainfee.Outcome] | |
merkleProcessor shared.PluginProcessor | |
tokenProcessor shared.PluginProcessor | |
feeProcessor shared.PluginProcessor |
If you do need the types in some cases, specify the concrete type:
merkleProcessor shared.PluginProcessor[merkleroot.Query, merkleroot.Observation, merkleroot.Outcome] | |
tokenProcessor shared.PluginProcessor[tokenprice.Query, tokenprice.Observation, tokenprice.Outcome] | |
feeProcessor shared.PluginProcessor[chainfee.Query, chainfee.Observation, chainfee.Outcome] | |
merkleProcessor merkleroot.Processor // because we need to call ObserveFChain()' | |
tokenProcessor shared.PluginProcessor | |
feeProcessor shared.PluginProcessor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we'll need the types when we start testing. and using the plugin makes the one mock enough to use. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not difficult to cast any
to the concrete type if needed in tests. You're not actually using the interface with the current code, so you may as well remove it rather than deal with all the generics.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We've discussed on a call and decided to keep as is for testability and we'll revisit this if we can make it more generic later.
commit/plugin.go
Outdated
merkleObs, err := p.merkleProcessor.Observation(ctx, prevOutcome.MerkleRootOutcome, merkleroot.Query{}) | ||
if err != nil { | ||
p.lggr.Errorw("failed to get merkle observation", "err", err) | ||
} | ||
tokenPricesObs, err := p.tokenProcessor.Observation(ctx, prevOutcome.TokenPriceOutcome, tokenprice.Query{}) | ||
if err != nil { | ||
//log error | ||
p.lggr.Errorw("failed to get token prices", "err", err) | ||
} | ||
gasObs, err := p.feeProcessor.Observation(ctx, prevOutcome.ChainFeeOutcome, chainfee.Query{}) | ||
if err != nil { | ||
p.lggr.Errorw("failed to get gas prices", "err", err) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Except for the deterministic ordering, this could be a fully decoupled generic SubPlugin aggregator.
Replace:
merkleProcessor shared.PluginProcessor
tokenProcessor shared.PluginProcessor
feeProcessor shared.PluginProcessor
With:
subPlugins map[string]shared.SubPlugin
This loop becomes:
var observations map[string]any
for k, processor := range subPlugins {
// I suppose args need a little more thought. You could have shared k/v's
// Not sure about the query.
observations[k] = processor.Observation(ctx, prevOutcome, query)
}
return Encode(observations)
The neat part here is since you're using polymorphism, the Observation function doesn't change when you add another SubPlugin.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted to do this earlier and didn't go this way to make it simpler, different types will make it more complex once you start having conditions or other mapping for different arguments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need to do it in this PR, but I think its a natural progression of this work.
@@ -105,18 +243,18 @@ func (p *Plugin) Close() error { | |||
return nil | |||
} | |||
|
|||
func (p *Plugin) decodeOutcome(outcome ocr3types.Outcome) (Outcome, State) { | |||
func (p *Plugin) decodeOutcome(outcome ocr3types.Outcome) Outcome { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might as well merge this function with DecodeOutcome
now that the special part has been removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's called in multiple places so far. Nicer having it this way instead of copying the logic of empty outcome and the error checking 3 times IMO.
commit/types.go
Outdated
MerkleObs merkleroot.Observation `json:"merkleObs"` | ||
TokenObs tokenprice.Observation `json:"tokenObs"` | ||
GasObs chainfee.Observation `json:"gasObs"` | ||
FChain map[cciptypes.ChainSelector]int `json:"fChain"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The next logical step is to have this top level plugin fully generic, I suppose this would end up as map[string]any
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the idea
func NewCCIPChainSupport( | ||
lggr logger.Logger, | ||
homeChain reader.HomeChain, | ||
oracleIDToP2PID map[commontypes.OracleID]libocrtypes.PeerID, | ||
nodeID commontypes.OracleID, | ||
destChain cciptypes.ChainSelector, | ||
) CCIPChainSupport { | ||
return CCIPChainSupport{ | ||
lggr: lggr, | ||
homeChain: homeChain, | ||
oracleIDToP2PID: oracleIDToP2PID, | ||
nodeID: nodeID, | ||
destChain: destChain, | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this thing could be passed into the plugin instead of the home chain? And/or expanded to support all the subconsensus stuff.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might worth checking the feasability of this 👍
93c57e5
to
6d730dd
Compare
… plugin. The idea is to separate the logic of different types of observations and outcomes into separate processors. This makes it easier to manage and test the logic of each type of observation/outcome without affecting each others. Some of them will implement state machines (e.g. merkleroot), others might implement simpler logic. (e.g. token) Also makes running them in parallel more streamlined. The OCR plugin becomes a coordinator/collector of these SubPlugins. Example Pseudo code: ``` OCRPlugin { nodeID merkleProcessor tokenProcessor feeProcessor ... } OCRPlugin.Observer { mObs := merkleProcessor.Observer tObs := tokenProcessor.Observer fObs := feeProcessor.Observer return Observation{mObs, tObs, fObs} } OCRPlugin.Validate { mObs := merkleProcessor.Validate tObs := tokenProcessor.Validate fObs := feeProcessor.Validate check errors for each return nil } OCRPlugin.Outcome { mOut := merkleProcessor.Outcome tOut := tokenProcessor.Outcome fOut := feeProcessor.Outcome return Outcome{mOut, tOut, fOut} } OCRPlugin.Report { return Report{mOut.X, tOut.Y, fOut.Z} } ``` Notice all PluginProcessor interface functions are using `prevOutcome` instead of `outCtx`. We're interested in the prevOutcome, and it makes it easier to have all decoding on the top level (OCR plugin), otherwise there might be cyclic dependencies or just complicating the code more. Signed-off-by: asoliman <[email protected]> Plugin observations Still outcome is not working Signed-off-by: asoliman <[email protected]> Plugin Outcome and Report Signed-off-by: asoliman <[email protected]> Add tokenworker and some cleanups Signed-off-by: asoliman <[email protected]> Add gasworker Signed-off-by: asoliman <[email protected]> linting Signed-off-by: asoliman <[email protected]> Add subplugin interface Signed-off-by: asoliman <[email protected]> Cleaning up Signed-off-by: asoliman <[email protected]> linting Signed-off-by: asoliman <[email protected]> Add SubPlugin to Mockery Signed-off-by: asoliman <[email protected]> Rename Worker to Processor Signed-off-by: asoliman <[email protected]> Cleaning round Signed-off-by: asoliman <[email protected]> Prallelize sub plugins Signed-off-by: asoliman <[email protected]> Fix mockery Signed-off-by: asoliman <[email protected]> Use SubPlugin interface Signed-off-by: asoliman <[email protected]> linting Signed-off-by: asoliman <[email protected]> run mockery Signed-off-by: asoliman <[email protected]> Add docs Signed-off-by: asoliman <[email protected]> Return empty report when merkleroots not generated Signed-off-by: asoliman <[email protected]> Rename packages Signed-off-by: asoliman <[email protected]> Rename variables Signed-off-by: asoliman <[email protected]> docs and lint Signed-off-by: asoliman <[email protected]> Cleaning Signed-off-by: asoliman <[email protected]> Refactoring based on reviews Signed-off-by: asoliman <[email protected]> Remove encode observation from merkleroot subprocessor Signed-off-by: asoliman <[email protected]> Remove go routines for now to simplify logic Signed-off-by: asoliman <[email protected]> Update commit/plugin.go Co-authored-by: Will Winder <[email protected]> review comments Signed-off-by: asoliman <[email protected]> refactor commit observation to use better names Signed-off-by: asoliman <[email protected]>
e3209ac
to
efec077
Compare
Test Coverage
|
Add PluginProcessor interface to create multiple Processors under OCR plugin.
The idea is to separate the logic of different types of observations and outcomes into separate processors.
This makes it easier to manage and test the logic of each type of observation/outcome without affecting each others.
Some of them will implement state machines (e.g. merkleroot), others might implement simpler logic. (e.g. token)
Also makes running them in parallel more streamlined.
The OCR plugin becomes a coordinator/collector of these SubPlugins.
Example Pseudo code:
Notice all PluginProcessor interface functions are using
prevOutcome
instead ofoutCtx
.We're interested in the prevOutcome, and it makes it easier to have all decoding on the top level (OCR plugin),
otherwise there might be cyclic dependencies or just complicating the code more.