Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: log poller for vrf v2/v2+ #11174

Merged
merged 22 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 32 additions & 25 deletions core/services/vrf/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ func (d *Delegate) JobType() job.Type {
return job.VRF
}

func (d *Delegate) BeforeJobCreated(spec job.Job) {}
func (d *Delegate) AfterJobCreated(spec job.Job) {}
func (d *Delegate) BeforeJobDeleted(spec job.Job) {}
func (d *Delegate) OnDeleteJob(spec job.Job, q pg.Queryer) error { return nil }
func (d *Delegate) BeforeJobCreated(job.Job) {}
func (d *Delegate) AfterJobCreated(job.Job) {}
func (d *Delegate) BeforeJobDeleted(job.Job) {}
func (d *Delegate) OnDeleteJob(job.Job, pg.Queryer) error { return nil }

// ServicesForSpec satisfies the job.Delegate interface.
func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) {
Expand Down Expand Up @@ -160,24 +160,28 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) {
return nil, errors.Wrap(err2, "NewAggregatorV3Interface")
}

return []job.ServiceCtx{v2.New(
chain.Config().EVM(),
chain.Config().EVM().GasEstimator(),
lV2Plus,
chain,
chain.ID(),
d.q,
v2.NewCoordinatorV2_5(coordinatorV2Plus),
batchCoordinatorV2,
vrfOwner,
aggregator,
d.pr,
d.ks.Eth(),
jb,
d.mailMon,
utils.NewHighCapacityMailbox[log.Broadcast](),
func() {},
vrfcommon.NewLogDeduper(int(chain.Config().EVM().FinalityDepth())))}, nil
return []job.ServiceCtx{
v2.New(
chain.Config().EVM(),
chain.Config().EVM().GasEstimator(),
lV2Plus,
chain,
chain.ID(),
d.q,
v2.NewCoordinatorV2_5(coordinatorV2Plus),
batchCoordinatorV2,
vrfOwner,
aggregator,
d.pr,
d.ks.Eth(),
jb,
func() {},
// the lookback in the deduper must be >= the lookback specified for the log poller
// otherwise we will end up re-delivering logs that were already delivered.
vrfcommon.NewInflightCache(int(chain.Config().EVM().FinalityDepth())),
vrfcommon.NewLogDeduper(int(chain.Config().EVM().FinalityDepth())),
),
}, nil
}
if _, ok := task.(*pipeline.VRFTaskV2); ok {
if err2 := CheckFromAddressesExist(jb, d.ks.Eth()); err != nil {
Expand Down Expand Up @@ -225,10 +229,13 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) {
d.pr,
d.ks.Eth(),
jb,
d.mailMon,
utils.NewHighCapacityMailbox[log.Broadcast](),
func() {},
vrfcommon.NewLogDeduper(int(chain.Config().EVM().FinalityDepth())))}, nil
// the lookback in the deduper must be >= the lookback specified for the log poller
// otherwise we will end up re-delivering logs that were already delivered.
vrfcommon.NewInflightCache(int(chain.Config().EVM().FinalityDepth())),
vrfcommon.NewLogDeduper(int(chain.Config().EVM().FinalityDepth())),
),
}, nil
}
if _, ok := task.(*pipeline.VRFTask); ok {
return []job.ServiceCtx{&v1.Listener{
Expand Down
26 changes: 6 additions & 20 deletions core/services/vrf/v2/bhs_feeder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest/heavyweight"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/services/chainlink"
"github.com/smartcontractkit/chainlink/v2/core/services/vrf/vrfcommon"
"github.com/smartcontractkit/chainlink/v2/core/services/vrf/vrftesthelpers"
"github.com/smartcontractkit/chainlink/v2/core/store/models"

Expand Down Expand Up @@ -63,26 +62,12 @@ func TestStartHeartbeats(t *testing.T) {
heartbeatPeriod := 5 * time.Second

t.Run("bhs_feeder_startheartbeats_happy_path", func(tt *testing.T) {
coordinatorAddress := uni.rootContractAddress
vrfVersion := vrfcommon.V2

app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, uni.backend, keys...)
require.NoError(t, app.Start(testutils.Context(t)))

var (
v2CoordinatorAddress string
v2PlusCoordinatorAddress string
)

if vrfVersion == vrfcommon.V2 {
v2CoordinatorAddress = coordinatorAddress.String()
} else if vrfVersion == vrfcommon.V2Plus {
v2PlusCoordinatorAddress = coordinatorAddress.String()
}

_ = vrftesthelpers.CreateAndStartBHSJob(
t, bhsKeyAddresses, app, uni.bhsContractAddress.String(), "",
v2CoordinatorAddress, v2PlusCoordinatorAddress, "", 0, 200, heartbeatPeriod, 100)
uni.rootContractAddress.String(), "", "", 0, 200, heartbeatPeriod, 100)

// Ensure log poller is ready and has all logs.
require.NoError(t, app.GetRelayers().LegacyEVMChains().Slice()[0].LogPoller().Ready())
Expand All @@ -97,9 +82,10 @@ func TestStartHeartbeats(t *testing.T) {
t.Logf("Sleeping %.2f seconds before checking blockhash in BHS added by BHS_Heartbeats_Service\n", diff.Seconds())
time.Sleep(diff)
// storeEarliest in BHS contract stores blocktip - 256 in the Blockhash Store (BHS)
// before the initTxns:260 txns sent by the loop above, 18 txns are sent by
// newVRFCoordinatorV2Universe method. block tip is initTxns + 18
blockTip := initTxns + 18
verifyBlockhashStored(t, uni.coordinatorV2UniverseCommon, uint64(blockTip-256))
tipHeader, err := uni.backend.HeaderByNumber(testutils.Context(t), nil)
require.NoError(t, err)
// the storeEarliest transaction will end up in a new block, hence the + 1 below.
blockNumberStored := tipHeader.Number.Uint64() - 256 + 1
verifyBlockhashStored(t, uni.coordinatorV2UniverseCommon, blockNumberStored)
})
}
37 changes: 37 additions & 0 deletions core/services/vrf/v2/coordinator_v2x_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var (
type CoordinatorV2_X interface {
Address() common.Address
ParseRandomWordsRequested(log types.Log) (RandomWordsRequested, error)
ParseRandomWordsFulfilled(log types.Log) (RandomWordsFulfilled, error)
RequestRandomWords(opts *bind.TransactOpts, keyHash [32]byte, subID *big.Int, requestConfirmations uint16, callbackGasLimit uint32, numWords uint32, payInEth bool) (*types.Transaction, error)
AddConsumer(opts *bind.TransactOpts, subID *big.Int, consumer common.Address) (*types.Transaction, error)
CreateSubscription(opts *bind.TransactOpts) (*types.Transaction, error)
Expand All @@ -47,6 +48,10 @@ type CoordinatorV2_X interface {
GetCommitment(opts *bind.CallOpts, requestID *big.Int) ([32]byte, error)
Migrate(opts *bind.TransactOpts, subID *big.Int, newCoordinator common.Address) (*types.Transaction, error)
FundSubscriptionWithNative(opts *bind.TransactOpts, subID *big.Int, amount *big.Int) (*types.Transaction, error)
// RandomWordsRequestedTopic returns the log topic of the RandomWordsRequested log
RandomWordsRequestedTopic() common.Hash
// RandomWordsFulfilledTopic returns the log topic of the RandomWordsFulfilled log
RandomWordsFulfilledTopic() common.Hash
}

type coordinatorV2 struct {
Expand All @@ -61,6 +66,14 @@ func NewCoordinatorV2(c *vrf_coordinator_v2.VRFCoordinatorV2) CoordinatorV2_X {
}
}

func (c *coordinatorV2) RandomWordsRequestedTopic() common.Hash {
return vrf_coordinator_v2.VRFCoordinatorV2RandomWordsRequested{}.Topic()
}

func (c *coordinatorV2) RandomWordsFulfilledTopic() common.Hash {
return vrf_coordinator_v2.VRFCoordinatorV2RandomWordsFulfilled{}.Topic()
}

func (c *coordinatorV2) Address() common.Address {
return c.coordinator.Address()
}
Expand All @@ -73,6 +86,14 @@ func (c *coordinatorV2) ParseRandomWordsRequested(log types.Log) (RandomWordsReq
return NewV2RandomWordsRequested(parsed), nil
}

func (c *coordinatorV2) ParseRandomWordsFulfilled(log types.Log) (RandomWordsFulfilled, error) {
parsed, err := c.coordinator.ParseRandomWordsFulfilled(log)
if err != nil {
return nil, err
}
return NewV2RandomWordsFulfilled(parsed), nil
}

func (c *coordinatorV2) RequestRandomWords(opts *bind.TransactOpts, keyHash [32]byte, subID *big.Int, requestConfirmations uint16, callbackGasLimit uint32, numWords uint32, payInEth bool) (*types.Transaction, error) {
return c.coordinator.RequestRandomWords(opts, keyHash, subID.Uint64(), requestConfirmations, callbackGasLimit, numWords)
}
Expand Down Expand Up @@ -187,6 +208,14 @@ func NewCoordinatorV2_5(c vrf_coordinator_v2_5.VRFCoordinatorV25Interface) Coord
}
}

func (c *coordinatorV2_5) RandomWordsRequestedTopic() common.Hash {
return vrf_coordinator_v2plus_interface.IVRFCoordinatorV2PlusInternalRandomWordsRequested{}.Topic()
}

func (c *coordinatorV2_5) RandomWordsFulfilledTopic() common.Hash {
return vrf_coordinator_v2plus_interface.IVRFCoordinatorV2PlusInternalRandomWordsFulfilled{}.Topic()
}

func (c *coordinatorV2_5) Address() common.Address {
return c.coordinator.Address()
}
Expand All @@ -199,6 +228,14 @@ func (c *coordinatorV2_5) ParseRandomWordsRequested(log types.Log) (RandomWordsR
return NewV2_5RandomWordsRequested(parsed), nil
}

func (c *coordinatorV2_5) ParseRandomWordsFulfilled(log types.Log) (RandomWordsFulfilled, error) {
parsed, err := c.coordinator.ParseRandomWordsFulfilled(log)
if err != nil {
return nil, err
}
return NewV2_5RandomWordsFulfilled(parsed), nil
}

func (c *coordinatorV2_5) RequestRandomWords(opts *bind.TransactOpts, keyHash [32]byte, subID *big.Int, requestConfirmations uint16, callbackGasLimit uint32, numWords uint32, payInEth bool) (*types.Transaction, error) {
extraArgs, err := extraargs.ExtraArgsV1(payInEth)
if err != nil {
Expand Down
35 changes: 27 additions & 8 deletions core/services/vrf/v2/integration_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ func testSingleConsumerHappyPath(
GasEstimator: toml.KeySpecificGasEstimator{PriceMax: gasLanePriceWei},
})(c, s)
c.EVM[0].MinIncomingConfirmations = ptr[uint32](2)
c.Feature.LogPoller = ptr(true)
c.EVM[0].LogPollInterval = models.MustNewDuration(1 * time.Second)
})
app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, uni.backend, ownerKey, key1, key2)

Expand Down Expand Up @@ -205,8 +207,8 @@ func testMultipleConsumersNeedBHS(
simulatedOverrides(t, assets.GWei(10), keySpecificOverrides...)(c, s)
c.EVM[0].MinIncomingConfirmations = ptr[uint32](2)
c.Feature.LogPoller = ptr(true)
c.EVM[0].LogPollInterval = models.MustNewDuration(1 * time.Second)
c.EVM[0].FinalityDepth = ptr[uint32](2)
c.EVM[0].LogPollInterval = models.MustNewDuration(time.Second)
})
keys = append(keys, ownerKey, vrfKey)
app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, uni.backend, keys...)
Expand Down Expand Up @@ -353,8 +355,8 @@ func testMultipleConsumersNeedTrustedBHS(
c.EVM[0].MinIncomingConfirmations = ptr[uint32](2)
c.EVM[0].GasEstimator.LimitDefault = ptr(uint32(5_000_000))
c.Feature.LogPoller = ptr(true)
c.EVM[0].LogPollInterval = models.MustNewDuration(1 * time.Second)
c.EVM[0].FinalityDepth = ptr[uint32](2)
c.EVM[0].LogPollInterval = models.MustNewDuration(time.Second)
})
keys = append(keys, ownerKey, vrfKey)
app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, uni.backend, keys...)
Expand Down Expand Up @@ -539,6 +541,8 @@ func testSingleConsumerHappyPathBatchFulfillment(
c.EVM[0].GasEstimator.LimitDefault = ptr[uint32](5_000_000)
c.EVM[0].MinIncomingConfirmations = ptr[uint32](2)
c.EVM[0].ChainID = (*utils.Big)(testutils.SimulatedChainID)
c.Feature.LogPoller = ptr(true)
c.EVM[0].LogPollInterval = models.MustNewDuration(1 * time.Second)
})
app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, uni.backend, ownerKey, key1)

Expand Down Expand Up @@ -641,6 +645,8 @@ func testSingleConsumerNeedsTopUp(
GasEstimator: toml.KeySpecificGasEstimator{PriceMax: gasLanePriceWei},
})(c, s)
c.EVM[0].MinIncomingConfirmations = ptr[uint32](2)
c.Feature.LogPoller = ptr(true)
c.EVM[0].LogPollInterval = models.MustNewDuration(1 * time.Second)
})
app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, uni.backend, ownerKey, key)

Expand Down Expand Up @@ -746,8 +752,8 @@ func testBlockHeaderFeeder(
})(c, s)
c.EVM[0].MinIncomingConfirmations = ptr[uint32](2)
c.Feature.LogPoller = ptr(true)
c.EVM[0].LogPollInterval = models.MustNewDuration(1 * time.Second)
c.EVM[0].FinalityDepth = ptr[uint32](2)
c.EVM[0].LogPollInterval = models.MustNewDuration(time.Second)
})
app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, uni.backend, ownerKey, vrfKey, bhfKey)
require.NoError(t, app.Start(testutils.Context(t)))
Expand Down Expand Up @@ -904,6 +910,8 @@ func testSingleConsumerForcedFulfillment(
GasEstimator: toml.KeySpecificGasEstimator{PriceMax: gasLanePriceWei},
})(c, s)
c.EVM[0].MinIncomingConfirmations = ptr[uint32](2)
c.Feature.LogPoller = ptr(true)
c.EVM[0].LogPollInterval = models.MustNewDuration(1 * time.Second)
})
app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, uni.backend, ownerKey, key1, key2)

Expand Down Expand Up @@ -1053,10 +1061,7 @@ func testSingleConsumerEIP150(
vrfVersion vrfcommon.Version,
nativePayment bool,
) {
callBackGasLimit := int64(2_500_000) // base callback gas.
eip150Fee := callBackGasLimit / 64 // premium needed for callWithExactGas
coordinatorFulfillmentOverhead := int64(90_000) // fixed gas used in coordinator fulfillment
gasLimit := callBackGasLimit + eip150Fee + coordinatorFulfillmentOverhead
callBackGasLimit := int64(2_500_000) // base callback gas.

key1 := cltest.MustGenerateRandomKey(t)
gasLanePriceWei := assets.GWei(10)
Expand All @@ -1066,8 +1071,10 @@ func testSingleConsumerEIP150(
Key: ptr(key1.EIP55Address),
GasEstimator: v2.KeySpecificGasEstimator{PriceMax: gasLanePriceWei},
})(c, s)
c.EVM[0].GasEstimator.LimitDefault = ptr(uint32(gasLimit))
c.EVM[0].GasEstimator.LimitDefault = ptr(uint32(3.5e6))
c.EVM[0].MinIncomingConfirmations = ptr[uint32](2)
c.Feature.LogPoller = ptr(true)
c.EVM[0].LogPollInterval = models.MustNewDuration(1 * time.Second)
})
app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, uni.backend, ownerKey, key1)
consumer := uni.vrfConsumers[0]
Expand Down Expand Up @@ -1136,6 +1143,8 @@ func testSingleConsumerEIP150Revert(
})(c, s)
c.EVM[0].GasEstimator.LimitDefault = ptr(uint32(gasLimit))
c.EVM[0].MinIncomingConfirmations = ptr[uint32](2)
c.Feature.LogPoller = ptr(true)
c.EVM[0].LogPollInterval = models.MustNewDuration(1 * time.Second)
})
app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, uni.backend, ownerKey, key1)
consumer := uni.vrfConsumers[0]
Expand Down Expand Up @@ -1199,6 +1208,8 @@ func testSingleConsumerBigGasCallbackSandwich(
})(c, s)
c.EVM[0].GasEstimator.LimitDefault = ptr[uint32](5_000_000)
c.EVM[0].MinIncomingConfirmations = ptr[uint32](2)
c.Feature.LogPoller = ptr(true)
c.EVM[0].LogPollInterval = models.MustNewDuration(1 * time.Second)
})
app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, uni.backend, ownerKey, key1)
consumer := uni.vrfConsumers[0]
Expand Down Expand Up @@ -1319,6 +1330,8 @@ func testSingleConsumerMultipleGasLanes(
})(c, s)
c.EVM[0].MinIncomingConfirmations = ptr[uint32](2)
c.EVM[0].GasEstimator.LimitDefault = ptr[uint32](5_000_000)
c.Feature.LogPoller = ptr(true)
c.EVM[0].LogPollInterval = models.MustNewDuration(1 * time.Second)
})

app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, uni.backend, ownerKey, cheapKey, expensiveKey)
Expand Down Expand Up @@ -1434,6 +1447,8 @@ func testSingleConsumerAlwaysRevertingCallbackStillFulfilled(
GasEstimator: v2.KeySpecificGasEstimator{PriceMax: gasLanePriceWei},
})(c, s)
c.EVM[0].MinIncomingConfirmations = ptr[uint32](2)
c.Feature.LogPoller = ptr(true)
c.EVM[0].LogPollInterval = models.MustNewDuration(1 * time.Second)
})
app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, uni.backend, ownerKey, key)
consumer := uni.reverter
Expand Down Expand Up @@ -1505,6 +1520,8 @@ func testConsumerProxyHappyPath(
GasEstimator: v2.KeySpecificGasEstimator{PriceMax: gasLanePriceWei},
})(c, s)
c.EVM[0].MinIncomingConfirmations = ptr[uint32](2)
c.Feature.LogPoller = ptr(true)
c.EVM[0].LogPollInterval = models.MustNewDuration(1 * time.Second)
})
app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, uni.backend, ownerKey, key1, key2)
consumerOwner := uni.neil
Expand Down Expand Up @@ -1629,6 +1646,8 @@ func testMaliciousConsumer(
c.EVM[0].GasEstimator.PriceDefault = assets.GWei(1)
c.EVM[0].GasEstimator.FeeCapDefault = assets.GWei(1)
c.EVM[0].ChainID = (*utils.Big)(testutils.SimulatedChainID)
c.Feature.LogPoller = ptr(true)
c.EVM[0].LogPollInterval = models.MustNewDuration(1 * time.Second)
})
carol := uni.vrfConsumers[0]

Expand Down
Loading
Loading