Skip to content

Commit

Permalink
Leader lane multi-chain gas price reports (#1071)
Browse files Browse the repository at this point in the history
Previously, leader lane ORM was added to enabled Commit plugin to read
multi-chain gas prices during `Observation`. This PR updates `Report`
phase so Commit plugin can now report multi-chain gas prices.

Important changes are:
1. make use of the PriceReportingDisabled flag to skip price reports on
non-leader lanes
2. update gas price logic during Report to handle multi-chain gas prices
  • Loading branch information
matYang authored Jun 27, 2024
1 parent 0bf5774 commit dc8599f
Show file tree
Hide file tree
Showing 5 changed files with 702 additions and 383 deletions.
220 changes: 132 additions & 88 deletions core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,16 @@ func (r *CommitReportingPlugin) Observation(ctx context.Context, epochAndRound t
return nil, err
}

sourceGasPriceUSD, tokenPricesUSD, err := r.observePriceUpdates(ctx)
// Fetches multi-lane gasPricesUSD and tokenPricesUSD for the same dest chain
gasPricesUSD, sourceGasPriceUSD, tokenPricesUSD, err := r.observePriceUpdates(ctx)
if err != nil {
return nil, err
}

lggr.Infow("Observation",
"minSeqNr", minSeqNr,
"maxSeqNr", maxSeqNr,
"sourceGasPriceUSD", sourceGasPriceUSD,
"gasPricesUSD", gasPricesUSD,
"tokenPricesUSD", tokenPricesUSD,
"epochAndRound", epochAndRound,
"messageIDs", messageIDs,
Expand All @@ -134,11 +135,43 @@ func (r *CommitReportingPlugin) Observation(ctx context.Context, epochAndRound t
Min: minSeqNr,
Max: maxSeqNr,
},
TokenPricesUSD: tokenPricesUSD,
SourceGasPriceUSD: sourceGasPriceUSD,
TokenPricesUSD: tokenPricesUSD,
SourceGasPriceUSD: sourceGasPriceUSD,
SourceGasPriceUSDPerChain: gasPricesUSD,
}.Marshal()
}

// observePriceUpdates fetches latest gas and token prices from DB as long as price reporting is not disabled.
// The prices are aggregated for all lanes for the same destination chain.
func (r *CommitReportingPlugin) observePriceUpdates(
ctx context.Context,
) (gasPricesUSD map[uint64]*big.Int, sourceGasPriceUSD *big.Int, tokenPricesUSD map[cciptypes.Address]*big.Int, err error) {
// Do not observe prices if price reporting is disabled. Price reporting will be disabled for lanes that are not leader lanes.
if r.offchainConfig.PriceReportingDisabled {
r.lggr.Infow("Price reporting disabled, skipping gas and token price reads")
return map[uint64]*big.Int{}, nil, map[cciptypes.Address]*big.Int{}, nil
}

// Fetches multi-lane gas prices and token prices, for the given dest chain
gasPricesUSD, tokenPricesUSD, err = r.priceService.GetGasAndTokenPrices(ctx, r.destChainSelector)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to get prices from PriceService: %w", err)
}

// Set prices to empty maps if nil to be friendlier to JSON encoding
if gasPricesUSD == nil {
gasPricesUSD = map[uint64]*big.Int{}
}
if tokenPricesUSD == nil {
tokenPricesUSD = map[cciptypes.Address]*big.Int{}
}

// For backwards compatibility with the older release during phased rollout, set the default gas price on this lane
sourceGasPriceUSD = gasPricesUSD[r.sourceChainSelector]

return gasPricesUSD, sourceGasPriceUSD, tokenPricesUSD, nil
}

func (r *CommitReportingPlugin) calculateMinMaxSequenceNumbers(ctx context.Context, lggr logger.Logger) (uint64, uint64, []cciptypes.Hash, error) {
nextSeqNum, err := r.commitStoreReader.GetExpectedNextSequenceNumber(ctx)
if err != nil {
Expand Down Expand Up @@ -174,23 +207,6 @@ func (r *CommitReportingPlugin) calculateMinMaxSequenceNumbers(ctx context.Conte
return minSeqNr, maxSeqNr, messageIDs, nil
}

func (r *CommitReportingPlugin) observePriceUpdates(
ctx context.Context,
) (sourceGasPriceUSD *big.Int, tokenPricesUSD map[cciptypes.Address]*big.Int, err error) {
gasPricesUSD, tokenPricesUSD, err := r.priceService.GetGasAndTokenPrices(ctx, r.destChainSelector)
if err != nil {
return nil, nil, err
}

// Reduce to single gas price for compatibility. In a followup PR, Commit plugin will make use of all source chain gas prices.
sourceGasPriceUSD = gasPricesUSD[r.sourceChainSelector]
if sourceGasPriceUSD == nil {
return nil, nil, fmt.Errorf("missing gas price for sourceChainSelector %d", r.sourceChainSelector)
}

return sourceGasPriceUSD, tokenPricesUSD, nil
}

// Gets the latest token price updates based on logs within the heartbeat
// The updates returned by this function are guaranteed to not contain nil values.
func (r *CommitReportingPlugin) getLatestTokenPriceUpdates(ctx context.Context, now time.Time) (map[cciptypes.Address]update, error) {
Expand Down Expand Up @@ -219,33 +235,34 @@ func (r *CommitReportingPlugin) getLatestTokenPriceUpdates(ctx context.Context,
return latestUpdates, nil
}

// getLatestGasPriceUpdate returns the latest gas price update based on logs within the heartbeat.
// If an update is found, it is not expected to contain a nil value. If no updates found, empty update with nil value is returned.
func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now time.Time) (gasUpdate update, error error) {
// If there are no price updates inflight, check latest prices onchain
gasPriceUpdates, err := r.destPriceRegistryReader.GetGasPriceUpdatesCreatedAfter(
// getLatestGasPriceUpdate returns the latest gas price updates based on logs within the heartbeat.
// If an update is found, it is not expected to contain a nil value.
func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now time.Time) (map[uint64]update, error) {
gasPriceUpdates, err := r.destPriceRegistryReader.GetAllGasPriceUpdatesCreatedAfter(
ctx,
r.sourceChainSelector,
now.Add(-r.offchainConfig.GasPriceHeartBeat),
0,
)

if err != nil {
return update{}, err
return nil, err
}

for _, priceUpdate := range gasPriceUpdates {
latestUpdates := make(map[uint64]update)
for _, gasUpdate := range gasPriceUpdates {
priceUpdate := gasUpdate.GasPriceUpdate
// Ordered by ascending timestamps
timestamp := time.Unix(priceUpdate.GasPriceUpdate.TimestampUnixSec.Int64(), 0)
if !timestamp.Before(gasUpdate.timestamp) {
gasUpdate = update{
timestamp := time.Unix(priceUpdate.TimestampUnixSec.Int64(), 0)
if priceUpdate.Value != nil && !timestamp.Before(latestUpdates[priceUpdate.DestChainSelector].timestamp) {
latestUpdates[priceUpdate.DestChainSelector] = update{
timestamp: timestamp,
value: priceUpdate.Value,
}
}
}

r.lggr.Infow("Latest gas price from log poller", "gasPriceUpdateVal", gasUpdate.value, "gasPriceUpdateTs", gasUpdate.timestamp)
return gasUpdate, nil
r.lggr.Infow("Latest gas price from log poller", "latestUpdates", latestUpdates)
return latestUpdates, nil
}

func (r *CommitReportingPlugin) Report(ctx context.Context, epochAndRound types.ReportTimestamp, _ types.Query, observations []types.AttributedObservation) (bool, types.Report, error) {
Expand All @@ -259,7 +276,7 @@ func (r *CommitReportingPlugin) Report(ctx context.Context, epochAndRound types.

parsableObservations := ccip.GetParsableObservations[ccip.CommitObservation](lggr, observations)

intervals, gasPriceObs, tokenPriceObs, err := extractObservationData(lggr, r.F, parsableObservations)
intervals, gasPriceObs, tokenPriceObs, err := extractObservationData(lggr, r.F, r.sourceChainSelector, parsableObservations)
if err != nil {
return false, nil, err
}
Expand Down Expand Up @@ -363,18 +380,26 @@ func calculateIntervalConsensus(intervals []cciptypes.CommitStoreInterval, f int

// extractObservationData extracts observation fields into their own slices
// and filters out observation data that are invalid
func extractObservationData(lggr logger.Logger, f int, observations []ccip.CommitObservation) (intervals []cciptypes.CommitStoreInterval, gasPrices []*big.Int, tokenPrices map[cciptypes.Address][]*big.Int, err error) {
// We require at least f+1 observations to each consensus. Checking to ensure there are at least f+1 parsed observations.
func extractObservationData(lggr logger.Logger, f int, sourceChainSelector uint64, observations []ccip.CommitObservation) (intervals []cciptypes.CommitStoreInterval, gasPrices map[uint64][]*big.Int, tokenPrices map[cciptypes.Address][]*big.Int, err error) {
// We require at least f+1 observations to reach consensus. Checking to ensure there are at least f+1 parsed observations.
if len(observations) <= f {
return nil, nil, nil, fmt.Errorf("not enough observations to form consensus: #obs=%d, f=%d", len(observations), f)
}

gasPriceObservations := make(map[uint64][]*big.Int)
tokenPriceObservations := make(map[cciptypes.Address][]*big.Int)
for _, obs := range observations {
intervals = append(intervals, obs.Interval)

if obs.SourceGasPriceUSD != nil {
gasPrices = append(gasPrices, obs.SourceGasPriceUSD)
for selector, price := range obs.SourceGasPriceUSDPerChain {
if price != nil {
gasPriceObservations[selector] = append(gasPriceObservations[selector], price)
}
}
// During phased rollout, NOPs running old release only report SourceGasPriceUSD.
// An empty `SourceGasPriceUSDPerChain` with a non-nil `SourceGasPriceUSD` can only happen with old release.
if len(obs.SourceGasPriceUSDPerChain) == 0 && obs.SourceGasPriceUSD != nil {
gasPriceObservations[sourceChainSelector] = append(gasPriceObservations[sourceChainSelector], obs.SourceGasPriceUSD)
}

for token, price := range obs.TokenPricesUSD {
Expand All @@ -384,29 +409,39 @@ func extractObservationData(lggr logger.Logger, f int, observations []ccip.Commi
}
}

// Observations are invalid if observed gas price is nil, we require at least f+1 valid observations.
if len(gasPrices) <= f {
return nil, nil, nil, fmt.Errorf("not enough valid observations with non-nil gas prices: #obs=%d, f=%d", len(gasPrices), f)
// Price is dropped if there are not enough valid observations. With a threshold of 2*(f-1) + 1, we achieve a balance between safety and liveness.
// During phased-rollout where some honest nodes may not have started observing the token yet, it requires 5 malicious node with 1 being the leader to successfully alter price.
// During regular operation, it requires 3 malicious nodes with 1 being the leader to temporarily delay price update for the token.
priceReportingThreshold := 2*(f-1) + 1

gasPrices = make(map[uint64][]*big.Int)
for selector, perChainPriceObservations := range gasPriceObservations {
if len(perChainPriceObservations) < priceReportingThreshold {
lggr.Warnf("Skipping chain with selector %d due to not enough valid observations: #obs=%d, f=%d, threshold=%d", selector, len(perChainPriceObservations), f, priceReportingThreshold)
continue
}
gasPrices[selector] = perChainPriceObservations
}

tokenPrices = make(map[cciptypes.Address][]*big.Int)
for token, perTokenPriceObservations := range tokenPriceObservations {
// Token price is dropped if there are not enough valid observations. With a threshold of 2*(f-1) + 1, we achieve a balance between safety and liveness.
// During phased-rollout where some honest nodes may not have started observing the token yet, it requires 5 malicious node with 1 being the leader to successfully alter price.
// During regular operation, it requires 3 malicious nodes with 1 being the leader to temporarily delay price update for the token.
if len(perTokenPriceObservations) < (2*(f-1) + 1) {
lggr.Warnf("Skipping token %s due to not enough valid observations: #obs=%d, f=%d", string(token), len(perTokenPriceObservations), f)
if len(perTokenPriceObservations) < priceReportingThreshold {
lggr.Warnf("Skipping token %s due to not enough valid observations: #obs=%d, f=%d, threshold=%d", string(token), len(perTokenPriceObservations), f, priceReportingThreshold)
continue
}

tokenPrices[token] = perTokenPriceObservations
}

return intervals, gasPrices, tokenPrices, nil
}

// selectPriceUpdates filters out gas and token price updates that are already inflight
func (r *CommitReportingPlugin) selectPriceUpdates(ctx context.Context, now time.Time, gasPriceObs []*big.Int, tokenPriceObs map[cciptypes.Address][]*big.Int) ([]cciptypes.GasPrice, []cciptypes.TokenPrice, error) {
func (r *CommitReportingPlugin) selectPriceUpdates(ctx context.Context, now time.Time, gasPriceObs map[uint64][]*big.Int, tokenPriceObs map[cciptypes.Address][]*big.Int) ([]cciptypes.GasPrice, []cciptypes.TokenPrice, error) {
// If price reporting is disabled, there is no need to select price updates.
if r.offchainConfig.PriceReportingDisabled {
return nil, nil, nil
}

latestGasPrice, err := r.getLatestGasPriceUpdate(ctx, now)
if err != nil {
return nil, nil, err
Expand All @@ -421,8 +456,9 @@ func (r *CommitReportingPlugin) selectPriceUpdates(ctx context.Context, now time
}

// Note priceUpdates must be deterministic.
// The provided latestTokenPrices should not contain nil values.
func (r *CommitReportingPlugin) calculatePriceUpdates(gasPriceObs []*big.Int, tokenPriceObs map[cciptypes.Address][]*big.Int, latestGasPrice update, latestTokenPrices map[cciptypes.Address]update) ([]cciptypes.GasPrice, []cciptypes.TokenPrice, error) {
// The provided gasPriceObs and tokenPriceObs should not contain nil values.
// The returned latestGasPrice and latestTokenPrices should not contain nil values.
func (r *CommitReportingPlugin) calculatePriceUpdates(gasPriceObs map[uint64][]*big.Int, tokenPriceObs map[cciptypes.Address][]*big.Int, latestGasPrice map[uint64]update, latestTokenPrices map[cciptypes.Address]update) ([]cciptypes.GasPrice, []cciptypes.TokenPrice, error) {
var tokenPriceUpdates []cciptypes.TokenPrice
for token, tokenPriceObservations := range tokenPriceObs {
medianPrice := ccipcalc.BigIntSortedMiddle(tokenPriceObservations)
Expand All @@ -432,7 +468,7 @@ func (r *CommitReportingPlugin) calculatePriceUpdates(gasPriceObs []*big.Int, to
tokenPriceUpdatedRecently := time.Since(latestTokenPrice.timestamp) < r.offchainConfig.TokenPriceHeartBeat
tokenPriceNotChanged := !ccipcalc.Deviates(medianPrice, latestTokenPrice.value, int64(r.offchainConfig.TokenPriceDeviationPPB))
if tokenPriceUpdatedRecently && tokenPriceNotChanged {
r.lggr.Debugw("price was updated recently, skipping the update",
r.lggr.Debugw("token price was updated recently, skipping the update",
"token", token, "newPrice", medianPrice, "existingPrice", latestTokenPrice.value)
continue // skip the update if we recently had a price update close to the new value
}
Expand All @@ -449,30 +485,38 @@ func (r *CommitReportingPlugin) calculatePriceUpdates(gasPriceObs []*big.Int, to
return tokenPriceUpdates[i].Token < tokenPriceUpdates[j].Token
})

newGasPrice, err := r.gasPriceEstimator.Median(gasPriceObs) // Compute the median price
if err != nil {
return nil, nil, err
}
destChainSelector := r.sourceChainSelector // Assuming plugin lane is A->B, we write to B the gas price of A

var gasPriceUpdate []cciptypes.GasPrice
// Default to updating so that we update if there are no prior updates.
shouldUpdate := true
if latestGasPrice.value != nil {
gasPriceUpdatedRecently := time.Since(latestGasPrice.timestamp) < r.offchainConfig.GasPriceHeartBeat
gasPriceDeviated, err := r.gasPriceEstimator.Deviates(newGasPrice, latestGasPrice.value)
for chainSelector, gasPriceObservations := range gasPriceObs {
newGasPrice, err := r.gasPriceEstimator.Median(gasPriceObservations) // Compute the median price
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to calculate median gas price for chain selector %d: %w", chainSelector, err)
}
if gasPriceUpdatedRecently && !gasPriceDeviated {
shouldUpdate = false

// Default to updating so that we update if there are no prior updates.
latestGasPrice, exists := latestGasPrice[chainSelector]
if exists && latestGasPrice.value != nil {
gasPriceUpdatedRecently := time.Since(latestGasPrice.timestamp) < r.offchainConfig.GasPriceHeartBeat
gasPriceDeviated, err := r.gasPriceEstimator.Deviates(newGasPrice, latestGasPrice.value)
if err != nil {
return nil, nil, err
}
if gasPriceUpdatedRecently && !gasPriceDeviated {
r.lggr.Debugw("gas price was updated recently and not deviated sufficiently, skipping the update",
"chainSelector", chainSelector, "newPrice", newGasPrice, "existingPrice", latestGasPrice.value)
continue
}
}
}
if shouldUpdate {
// Although onchain interface accepts multi gas updates, we only do 1 gas price per report for now.
gasPriceUpdate = append(gasPriceUpdate, cciptypes.GasPrice{DestChainSelector: destChainSelector, Value: newGasPrice})

gasPriceUpdate = append(gasPriceUpdate, cciptypes.GasPrice{
DestChainSelector: chainSelector,
Value: newGasPrice,
})
}

sort.Slice(gasPriceUpdate, func(i, j int) bool {
return gasPriceUpdate[i].DestChainSelector < gasPriceUpdate[j].DestChainSelector
})

return gasPriceUpdate, tokenPriceUpdates, nil
}

Expand Down Expand Up @@ -608,14 +652,9 @@ func (r *CommitReportingPlugin) isStaleReport(ctx context.Context, lggr logger.L
if !hasGasPriceUpdate && !hasTokenPriceUpdates {
return true
}
// Commit plugin currently only supports 1 gas price per report. If report contains more than 1, reject the report.
if len(report.GasPrices) > 1 {
lggr.Errorw("Report is stale because it contains more than 1 gas price update", "GasPriceUpdates", report.GasPrices)
return true
}

// We consider a price update as stale when, there isn't an update or there is an update that is stale.
gasPriceStale := !hasGasPriceUpdate || r.isStaleGasPrice(ctx, lggr, report.GasPrices[0])
gasPriceStale := !hasGasPriceUpdate || r.isStaleGasPrice(ctx, lggr, report.GasPrices)
tokenPricesStale := !hasTokenPriceUpdates || r.isStaleTokenPrices(ctx, lggr, report.TokenPrices)

if gasPriceStale && tokenPricesStale {
Expand Down Expand Up @@ -654,30 +693,35 @@ func (r *CommitReportingPlugin) isStaleMerkleRoot(ctx context.Context, lggr logg
return false
}

func (r *CommitReportingPlugin) isStaleGasPrice(ctx context.Context, lggr logger.Logger, gasPrice cciptypes.GasPrice) bool {
func (r *CommitReportingPlugin) isStaleGasPrice(ctx context.Context, lggr logger.Logger, gasPriceUpdates []cciptypes.GasPrice) bool {
latestGasPrice, err := r.getLatestGasPriceUpdate(ctx, time.Now())
if err != nil {
lggr.Errorw("Report is stale because getLatestGasPriceUpdate failed", "err", err)
lggr.Errorw("Gas price is stale because getLatestGasPriceUpdate failed", "err", err)
return true
}

if latestGasPrice.value != nil {
gasPriceDeviated, err := r.gasPriceEstimator.Deviates(gasPrice.Value, latestGasPrice.value)
for _, gasPriceUpdate := range gasPriceUpdates {
latestUpdate, exists := latestGasPrice[gasPriceUpdate.DestChainSelector]
if !exists || latestUpdate.value == nil {
lggr.Infow("Found non-stale gas price", "chainSelector", gasPriceUpdate.DestChainSelector, "gasPriceUSd", gasPriceUpdate.Value)
return false
}

gasPriceDeviated, err := r.gasPriceEstimator.Deviates(gasPriceUpdate.Value, latestUpdate.value)
if err != nil {
lggr.Errorw("Report is stale because deviation check failed", "err", err)
lggr.Errorw("Gas price is stale because deviation check failed", "err", err)
return true
}

if !gasPriceDeviated {
lggr.Infow("Report is stale because of gas price",
"latestGasPriceUpdate", latestGasPrice.value,
"currentUsdPerUnitGas", gasPrice.Value,
"destChainSelector", gasPrice.DestChainSelector)
return true
if gasPriceDeviated {
lggr.Infow("Found non-stale gas price", "chainSelector", gasPriceUpdate.DestChainSelector, "gasPriceUSd", gasPriceUpdate.Value, "latestUpdate", latestUpdate.value)
return false
}
lggr.Infow("Gas price is stale", "chainSelector", gasPriceUpdate.DestChainSelector, "gasPriceUSd", gasPriceUpdate.Value, "latestGasPrice", latestUpdate.value)
}

return false
lggr.Infow("All gas prices are stale")
return true
}

func (r *CommitReportingPlugin) isStaleTokenPrices(ctx context.Context, lggr logger.Logger, priceUpdates []cciptypes.TokenPrice) bool {
Expand Down
Loading

0 comments on commit dc8599f

Please sign in to comment.