Skip to content

Commit

Permalink
feat: log poller for vrf v2/+
Browse files Browse the repository at this point in the history
  • Loading branch information
makramkd committed Nov 20, 2023
1 parent 3d25a9e commit d6e83b7
Show file tree
Hide file tree
Showing 19 changed files with 2,218 additions and 1,794 deletions.
57 changes: 32 additions & 25 deletions core/services/vrf/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ func (d *Delegate) JobType() job.Type {
return job.VRF
}

func (d *Delegate) BeforeJobCreated(spec job.Job) {}
func (d *Delegate) AfterJobCreated(spec job.Job) {}
func (d *Delegate) BeforeJobDeleted(spec job.Job) {}
func (d *Delegate) OnDeleteJob(spec job.Job, q pg.Queryer) error { return nil }
func (d *Delegate) BeforeJobCreated(job.Job) {}
func (d *Delegate) AfterJobCreated(job.Job) {}
func (d *Delegate) BeforeJobDeleted(job.Job) {}
func (d *Delegate) OnDeleteJob(job.Job, pg.Queryer) error { return nil }

// ServicesForSpec satisfies the job.Delegate interface.
func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) {
Expand Down Expand Up @@ -160,24 +160,28 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) {
return nil, errors.Wrap(err2, "NewAggregatorV3Interface")
}

return []job.ServiceCtx{v2.New(
chain.Config().EVM(),
chain.Config().EVM().GasEstimator(),
lV2Plus,
chain,
chain.ID(),
d.q,
v2.NewCoordinatorV2_5(coordinatorV2Plus),
batchCoordinatorV2,
vrfOwner,
aggregator,
d.pr,
d.ks.Eth(),
jb,
d.mailMon,
utils.NewHighCapacityMailbox[log.Broadcast](),
func() {},
vrfcommon.NewLogDeduper(int(chain.Config().EVM().FinalityDepth())))}, nil
return []job.ServiceCtx{
v2.New(
chain.Config().EVM(),
chain.Config().EVM().GasEstimator(),
lV2Plus,
chain,
chain.ID(),
d.q,
v2.NewCoordinatorV2_5(coordinatorV2Plus),
batchCoordinatorV2,
vrfOwner,
aggregator,
d.pr,
d.ks.Eth(),
jb,
func() {},
// the lookback in the deduper must be >= the lookback specified for the log poller
// otherwise we will end up re-delivering logs that were already delivered.
vrfcommon.NewInflightCache(int(chain.Config().EVM().FinalityDepth())),
vrfcommon.NewLogDeduper(int(chain.Config().EVM().FinalityDepth())),
),
}, nil
}
if _, ok := task.(*pipeline.VRFTaskV2); ok {
if err2 := CheckFromAddressesExist(jb, d.ks.Eth()); err != nil {
Expand Down Expand Up @@ -225,10 +229,13 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) {
d.pr,
d.ks.Eth(),
jb,
d.mailMon,
utils.NewHighCapacityMailbox[log.Broadcast](),
func() {},
vrfcommon.NewLogDeduper(int(chain.Config().EVM().FinalityDepth())))}, nil
// the lookback in the deduper must be >= the lookback specified for the log poller
// otherwise we will end up re-delivering logs that were already delivered.
vrfcommon.NewInflightCache(int(chain.Config().EVM().FinalityDepth())),
vrfcommon.NewLogDeduper(int(chain.Config().EVM().FinalityDepth())),
),
}, nil
}
if _, ok := task.(*pipeline.VRFTask); ok {
return []job.ServiceCtx{&v1.Listener{
Expand Down
26 changes: 6 additions & 20 deletions core/services/vrf/v2/bhs_feeder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest/heavyweight"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/services/chainlink"
"github.com/smartcontractkit/chainlink/v2/core/services/vrf/vrfcommon"
"github.com/smartcontractkit/chainlink/v2/core/services/vrf/vrftesthelpers"
"github.com/smartcontractkit/chainlink/v2/core/store/models"

Expand Down Expand Up @@ -63,26 +62,12 @@ func TestStartHeartbeats(t *testing.T) {
heartbeatPeriod := 5 * time.Second

t.Run("bhs_feeder_startheartbeats_happy_path", func(tt *testing.T) {
coordinatorAddress := uni.rootContractAddress
vrfVersion := vrfcommon.V2

app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, uni.backend, keys...)
require.NoError(t, app.Start(testutils.Context(t)))

var (
v2CoordinatorAddress string
v2PlusCoordinatorAddress string
)

if vrfVersion == vrfcommon.V2 {
v2CoordinatorAddress = coordinatorAddress.String()
} else if vrfVersion == vrfcommon.V2Plus {
v2PlusCoordinatorAddress = coordinatorAddress.String()
}

_ = vrftesthelpers.CreateAndStartBHSJob(
t, bhsKeyAddresses, app, uni.bhsContractAddress.String(), "",
v2CoordinatorAddress, v2PlusCoordinatorAddress, "", 0, 200, heartbeatPeriod, 100)
uni.rootContractAddress.String(), "", "", 0, 200, heartbeatPeriod, 100)

// Ensure log poller is ready and has all logs.
require.NoError(t, app.GetRelayers().LegacyEVMChains().Slice()[0].LogPoller().Ready())
Expand All @@ -97,9 +82,10 @@ func TestStartHeartbeats(t *testing.T) {
t.Logf("Sleeping %.2f seconds before checking blockhash in BHS added by BHS_Heartbeats_Service\n", diff.Seconds())
time.Sleep(diff)
// storeEarliest in BHS contract stores blocktip - 256 in the Blockhash Store (BHS)
// before the initTxns:260 txns sent by the loop above, 18 txns are sent by
// newVRFCoordinatorV2Universe method. block tip is initTxns + 18
blockTip := initTxns + 18
verifyBlockhashStored(t, uni.coordinatorV2UniverseCommon, uint64(blockTip-256))
tipHeader, err := uni.backend.HeaderByNumber(testutils.Context(t), nil)
require.NoError(t, err)
// the storeEarliest transaction will end up in a new block, hence the + 1 below.
blockNumberStored := tipHeader.Number.Uint64() - 256 + 1
verifyBlockhashStored(t, uni.coordinatorV2UniverseCommon, blockNumberStored)
})
}
37 changes: 37 additions & 0 deletions core/services/vrf/v2/coordinator_v2x_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var (
type CoordinatorV2_X interface {
Address() common.Address
ParseRandomWordsRequested(log types.Log) (RandomWordsRequested, error)
ParseRandomWordsFulfilled(log types.Log) (RandomWordsFulfilled, error)
RequestRandomWords(opts *bind.TransactOpts, keyHash [32]byte, subID *big.Int, requestConfirmations uint16, callbackGasLimit uint32, numWords uint32, payInEth bool) (*types.Transaction, error)
AddConsumer(opts *bind.TransactOpts, subID *big.Int, consumer common.Address) (*types.Transaction, error)
CreateSubscription(opts *bind.TransactOpts) (*types.Transaction, error)
Expand All @@ -47,6 +48,10 @@ type CoordinatorV2_X interface {
GetCommitment(opts *bind.CallOpts, requestID *big.Int) ([32]byte, error)
Migrate(opts *bind.TransactOpts, subID *big.Int, newCoordinator common.Address) (*types.Transaction, error)
FundSubscriptionWithNative(opts *bind.TransactOpts, subID *big.Int, amount *big.Int) (*types.Transaction, error)
// RandomWordsRequestedTopic returns the log topic of the RandomWordsRequested log
RandomWordsRequestedTopic() common.Hash
// RandomWordsFulfilledTopic returns the log topic of the RandomWordsFulfilled log
RandomWordsFulfilledTopic() common.Hash
}

type coordinatorV2 struct {
Expand All @@ -61,6 +66,14 @@ func NewCoordinatorV2(c *vrf_coordinator_v2.VRFCoordinatorV2) CoordinatorV2_X {
}
}

func (c *coordinatorV2) RandomWordsRequestedTopic() common.Hash {
return vrf_coordinator_v2.VRFCoordinatorV2RandomWordsRequested{}.Topic()
}

func (c *coordinatorV2) RandomWordsFulfilledTopic() common.Hash {
return vrf_coordinator_v2.VRFCoordinatorV2RandomWordsFulfilled{}.Topic()
}

func (c *coordinatorV2) Address() common.Address {
return c.coordinator.Address()
}
Expand All @@ -73,6 +86,14 @@ func (c *coordinatorV2) ParseRandomWordsRequested(log types.Log) (RandomWordsReq
return NewV2RandomWordsRequested(parsed), nil
}

func (c *coordinatorV2) ParseRandomWordsFulfilled(log types.Log) (RandomWordsFulfilled, error) {
parsed, err := c.coordinator.ParseRandomWordsFulfilled(log)
if err != nil {
return nil, err
}
return NewV2RandomWordsFulfilled(parsed), nil
}

func (c *coordinatorV2) RequestRandomWords(opts *bind.TransactOpts, keyHash [32]byte, subID *big.Int, requestConfirmations uint16, callbackGasLimit uint32, numWords uint32, payInEth bool) (*types.Transaction, error) {
return c.coordinator.RequestRandomWords(opts, keyHash, subID.Uint64(), requestConfirmations, callbackGasLimit, numWords)
}
Expand Down Expand Up @@ -187,6 +208,14 @@ func NewCoordinatorV2_5(c vrf_coordinator_v2_5.VRFCoordinatorV25Interface) Coord
}
}

func (c *coordinatorV2_5) RandomWordsRequestedTopic() common.Hash {
return vrf_coordinator_v2plus_interface.IVRFCoordinatorV2PlusInternalRandomWordsRequested{}.Topic()
}

func (c *coordinatorV2_5) RandomWordsFulfilledTopic() common.Hash {
return vrf_coordinator_v2plus_interface.IVRFCoordinatorV2PlusInternalRandomWordsFulfilled{}.Topic()
}

func (c *coordinatorV2_5) Address() common.Address {
return c.coordinator.Address()
}
Expand All @@ -199,6 +228,14 @@ func (c *coordinatorV2_5) ParseRandomWordsRequested(log types.Log) (RandomWordsR
return NewV2_5RandomWordsRequested(parsed), nil
}

func (c *coordinatorV2_5) ParseRandomWordsFulfilled(log types.Log) (RandomWordsFulfilled, error) {
parsed, err := c.coordinator.ParseRandomWordsFulfilled(log)
if err != nil {
return nil, err
}
return NewV2_5RandomWordsFulfilled(parsed), nil
}

func (c *coordinatorV2_5) RequestRandomWords(opts *bind.TransactOpts, keyHash [32]byte, subID *big.Int, requestConfirmations uint16, callbackGasLimit uint32, numWords uint32, payInEth bool) (*types.Transaction, error) {
extraArgs, err := extraargs.ExtraArgsV1(payInEth)
if err != nil {
Expand Down
Loading

0 comments on commit d6e83b7

Please sign in to comment.