Commit ff2a95c

populate retry interval for mercury requests and handle 206 response code (#11211)

* populate retry interval for mercury requests and handle 206 response code

* remove support for blocknumber in v0.3

FelixFan1992 authored Nov 7, 2023
1 parent 04c29d6 commit ff2a95c

Showing 9 changed files with 292 additions and 74 deletions.
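For orientation, the Mercury v0.3 status-code policy that the streams_lookup.go hunks below settle on can be restated as a small standalone sketch. This is an editorial summary, not the committed code; codes handled outside the hunks shown here (e.g. 401) are omitted.

```go
package main

import (
	"fmt"
	"net/http"
)

// classify summarizes how multiFeedsRequest treats v0.3 response codes after this
// change: server/gateway errors (500, 502, 503, 504) and 206 Partial Content are
// considered flaky and retried; 400 and other unexpected codes are terminal.
func classify(status int) (retryable bool, verdict string) {
	switch status {
	case http.StatusOK:
		return false, "ok"
	case http.StatusInternalServerError, http.StatusBadGateway,
		http.StatusServiceUnavailable, http.StatusGatewayTimeout,
		http.StatusPartialContent:
		return true, "MercuryFlakyFailure"
	default:
		return false, "InvalidMercuryRequest"
	}
}

func main() {
	for _, s := range []int{200, 206, 400, 500, 502, 503, 504} {
		retryable, verdict := classify(s)
		fmt.Printf("%d -> retryable=%t (%s)\n", s, retryable, verdict)
	}
}
```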
2 changes: 1 addition & 1 deletion core/scripts/go.mod
@@ -20,7 +20,7 @@ require (
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20230922131214-122accb19ea6
- github.com/smartcontractkit/ocr2keepers v0.7.27
+ github.com/smartcontractkit/ocr2keepers v0.7.28
github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687
github.com/smartcontractkit/sqlx v1.3.5-0.20210805004948-4be295aacbeb
github.com/spf13/cobra v1.6.1
4 changes: 2 additions & 2 deletions core/scripts/go.sum
@@ -1466,8 +1466,8 @@ github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f h1:hgJ
github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f/go.mod h1:MvMXoufZAtqExNexqi4cjrNYE9MefKddKylxjS+//n0=
github.com/smartcontractkit/libocr v0.0.0-20230922131214-122accb19ea6 h1:eSo9r53fARv2MnIO5pqYvQOXMBsTlAwhHyQ6BAVp6bY=
github.com/smartcontractkit/libocr v0.0.0-20230922131214-122accb19ea6/go.mod h1:2lyRkw/qLQgUWlrWWmq5nj0y90rWeO6Y+v+fCakRgb0=
- github.com/smartcontractkit/ocr2keepers v0.7.27 h1:kwqMrzmEdq6gH4yqNuLQCbdlED0KaIjwZzu3FF+Gves=
- github.com/smartcontractkit/ocr2keepers v0.7.27/go.mod h1:1QGzJURnoWpysguPowOe2bshV0hNp1YX10HHlhDEsas=
+ github.com/smartcontractkit/ocr2keepers v0.7.28 h1:dufAiYl4+uly9aH0+6GkS2jYzHGujq7tg0LYQE+x6JU=
+ github.com/smartcontractkit/ocr2keepers v0.7.28/go.mod h1:1QGzJURnoWpysguPowOe2bshV0hNp1YX10HHlhDEsas=
github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687 h1:NwC3SOc25noBTe1KUQjt45fyTIuInhoE2UfgcHAdihM=
github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687/go.mod h1:YYZq52t4wcHoMQeITksYsorD+tZcOyuVU5+lvot3VFM=
github.com/smartcontractkit/sqlx v1.3.5-0.20210805004948-4be295aacbeb h1:OMaBUb4X9IFPLbGbCHsMU+kw/BPCrewaVwWGIBc0I4A=
18 changes: 12 additions & 6 deletions core/services/ocr2/plugins/ocr2keeper/evm21/registry.go
@@ -33,11 +33,14 @@ import (
)

const (
+ defaultPluginRetryExpiration = 30 * time.Minute
// defaultAllowListExpiration decides how long an upkeep's allow list info will be valid for.
- defaultAllowListExpiration = 20 * time.Minute
- // allowListCleanupInterval decides when the expired items in allowList cache will be deleted.
- allowListCleanupInterval = 5 * time.Minute
+ defaultAllowListExpiration = 10 * time.Minute
+ // cleanupInterval decides when the expired items in cache will be deleted.
+ cleanupInterval = 5 * time.Minute
logTriggerRefreshBatchSize = 32
+ totalFastPluginRetries = 5
+ totalMediumPluginRetries = 10
)

var (
@@ -99,9 +102,10 @@ func NewEvmRegistry(
headFunc: func(ocr2keepers.BlockKey) {},
chLog: make(chan logpoller.Log, 1000),
mercury: &MercuryConfig{
- cred: mc,
- abi: streamsLookupCompatibleABI,
- allowListCache: cache.New(defaultAllowListExpiration, allowListCleanupInterval),
+ cred: mc,
+ abi: streamsLookupCompatibleABI,
+ allowListCache: cache.New(defaultAllowListExpiration, cleanupInterval),
+ pluginRetryCache: cache.New(defaultPluginRetryExpiration, cleanupInterval),
},
hc: http.DefaultClient,
logEventProvider: logEventProvider,
@@ -125,6 +129,8 @@ type MercuryConfig struct {
abi abi.ABI
// allowListCache stores the upkeeps privileges. In 2.1, this only includes a JSON bytes for allowed to use mercury
allowListCache *cache.Cache

+ pluginRetryCache *cache.Cache
}

type EvmRegistry struct {
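The new pluginRetryCache follows the same pattern as allowListCache: a TTL cache keyed per piece of work, bumped on every retryable Mercury failure. A minimal, self-contained sketch of that bookkeeping is below; it assumes the cache package is github.com/patrickmn/go-cache (consistent with the cache.New / cache.DefaultExpiration calls in this diff, but an assumption here), and the key format is only illustrative.

```go
package main

import (
	"fmt"
	"time"

	"github.com/patrickmn/go-cache"
)

const (
	defaultPluginRetryExpiration = 30 * time.Minute // per this diff: retry counts live for 30 minutes
	cleanupInterval              = 5 * time.Minute  // expired entries are purged every 5 minutes
)

func main() {
	pluginRetryCache := cache.New(defaultPluginRetryExpiration, cleanupInterval)

	key := "someWorkID|12345" // illustrative workID|block key, see generatePluginRetryKey below
	for i := 0; i < 3; i++ {
		attempts := 0
		if v, found := pluginRetryCache.Get(key); found {
			attempts = v.(int)
		}
		// each failed, retryable mercury request bumps the counter for this work item
		pluginRetryCache.Set(key, attempts+1, cache.DefaultExpiration)
	}

	v, _ := pluginRetryCache.Get(key)
	fmt.Println("recorded attempts:", v) // 3
}
```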
80 changes: 57 additions & 23 deletions core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go
@@ -132,13 +132,13 @@ func (r *EvmRegistry) streamsLookup(ctx context.Context, checkResults []ocr2keep
}

if !allowed {
lggr.Warnf("at block %d upkeep %s NOT allowed to query Mercury server", block, upkeepId)
lggr.Debugf("at block %d upkeep %s NOT allowed to query Mercury server", block, upkeepId)
checkResults[i].IneligibilityReason = uint8(encoding.UpkeepFailureReasonMercuryAccessNotAllowed)
continue
}
} else if l.feedParamKey != feedIDs || l.timeParamKey != timestamp {
// if mercury version cannot be determined, set failure reason
lggr.Warnf("at block %d upkeep %s NOT allowed to query Mercury server", block, upkeepId)
lggr.Debugf("at block %d upkeep %s NOT allowed to query Mercury server", block, upkeepId)
checkResults[i].IneligibilityReason = uint8(encoding.UpkeepFailureReasonInvalidRevertDataInput)
continue
}
@@ -165,10 +165,11 @@
func (r *EvmRegistry) doLookup(ctx context.Context, wg *sync.WaitGroup, lookup *StreamsLookup, i int, checkResults []ocr2keepers.CheckResult, lggr logger.Logger) {
defer wg.Done()

- state, reason, values, retryable, err := r.doMercuryRequest(ctx, lookup, lggr)
+ state, reason, values, retryable, ri, err := r.doMercuryRequest(ctx, lookup, generatePluginRetryKey(checkResults[i].WorkID, lookup.block), lggr)
if err != nil {
lggr.Errorf("upkeep %s retryable %v doMercuryRequest: %s", lookup.upkeepId, retryable, err.Error())
lggr.Errorf("upkeep %s retryable %v retryInterval %s doMercuryRequest: %s", lookup.upkeepId, retryable, ri, err.Error())
checkResults[i].Retryable = retryable
+ checkResults[i].RetryInterval = ri
checkResults[i].PipelineExecutionState = uint8(state)
checkResults[i].IneligibilityReason = uint8(reason)
return
@@ -299,12 +300,12 @@ func (r *EvmRegistry) checkCallback(ctx context.Context, values [][]byte, lookup
}

// doMercuryRequest sends requests to Mercury API to retrieve mercury data.
- func (r *EvmRegistry) doMercuryRequest(ctx context.Context, sl *StreamsLookup, lggr logger.Logger) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, error) {
+ func (r *EvmRegistry) doMercuryRequest(ctx context.Context, sl *StreamsLookup, prk string, lggr logger.Logger) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, time.Duration, error) {
var isMercuryV03 bool
resultLen := len(sl.feeds)
ch := make(chan MercuryData, resultLen)
if len(sl.feeds) == 0 {
- return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", sl.feedParamKey, sl.timeParamKey, sl.feeds)
+ return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0 * time.Second, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", sl.feedParamKey, sl.timeParamKey, sl.feeds)
}
if sl.feedParamKey == feedIdHex && sl.timeParamKey == blockNumber {
// only mercury v0.2
@@ -318,10 +319,11 @@ func (r *EvmRegistry) doMercuryRequest(ctx context.Context, sl *StreamsLookup, l
ch = make(chan MercuryData, resultLen)
go r.multiFeedsRequest(ctx, ch, sl, lggr)
} else {
- return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", sl.feedParamKey, sl.timeParamKey, sl.feeds)
+ return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0 * time.Second, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", sl.feedParamKey, sl.timeParamKey, sl.feeds)
}

var reqErr error
+ var ri time.Duration
results := make([][]byte, len(sl.feeds))
retryable := true
allSuccess := true
@@ -344,8 +346,11 @@ func (r *EvmRegistry) doMercuryRequest(ctx context.Context, sl *StreamsLookup, l
results[m.Index] = m.Bytes[0]
}
}
if retryable && !allSuccess {
ri = r.calculateRetryConfig(prk)
}
// only retry when not all successful AND none are not retryable
- return state, encoding.UpkeepFailureReasonNone, results, retryable && !allSuccess, reqErr
+ return state, encoding.UpkeepFailureReasonNone, results, retryable && !allSuccess, ri, reqErr
}

// singleFeedRequest sends a v0.2 Mercury request for a single feed report.
@@ -399,7 +404,7 @@ func (r *EvmRegistry) singleFeedRequest(ctx context.Context, ch chan<- MercuryDa
return err1
}

- if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusInternalServerError {
+ if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusInternalServerError || resp.StatusCode == http.StatusBadGateway || resp.StatusCode == http.StatusServiceUnavailable || resp.StatusCode == http.StatusGatewayTimeout {
lggr.Warnf("at block %s upkeep %s received status code %d for feed %s", sl.time.String(), sl.upkeepId.String(), resp.StatusCode, sl.feeds[index])
retryable = true
state = encoding.MercuryFlakyFailure
@@ -436,9 +441,9 @@ func (r *EvmRegistry) singleFeedRequest(ctx context.Context, ch chan<- MercuryDa
sent = true
return nil
},
- // only retry when the error is 404 Not Found or 500 Internal Server Error
+ // only retry when the error is 404 Not Found, 500 Internal Server Error, 502 Bad Gateway, 503 Service Unavailable, 504 Gateway Timeout
retry.RetryIf(func(err error) bool {
- return err.Error() == fmt.Sprintf("%d", http.StatusNotFound) || err.Error() == fmt.Sprintf("%d", http.StatusInternalServerError)
+ return err.Error() == fmt.Sprintf("%d", http.StatusNotFound) || err.Error() == fmt.Sprintf("%d", http.StatusInternalServerError) || err.Error() == fmt.Sprintf("%d", http.StatusBadGateway) || err.Error() == fmt.Sprintf("%d", http.StatusServiceUnavailable) || err.Error() == fmt.Sprintf("%d", http.StatusGatewayTimeout)
}),
retry.Context(ctx),
retry.Delay(retryDelay),
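Both request paths now repeat the same status-code predicate inside retry.RetryIf. One way to factor it out is sketched below (an editorial refactoring suggestion, not part of the commit); it keeps the diff's convention that the retried error's message is just the numeric status code.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"strconv"
)

// retryableStatuses are the codes the v0.2 path retries after this change:
// 404, 500, 502, 503 and 504 (the v0.3 path additionally retries 206).
var retryableStatuses = map[int]struct{}{
	http.StatusNotFound:            {},
	http.StatusInternalServerError: {},
	http.StatusBadGateway:          {},
	http.StatusServiceUnavailable:  {},
	http.StatusGatewayTimeout:      {},
}

// isRetryableStatusErr reports whether err carries a retryable status code,
// assuming the error message is the bare code, e.g. fmt.Errorf("%d", resp.StatusCode).
func isRetryableStatusErr(err error) bool {
	code, convErr := strconv.Atoi(err.Error())
	if convErr != nil {
		return false
	}
	_, ok := retryableStatuses[code]
	return ok
}

func main() {
	fmt.Println(isRetryableStatusErr(fmt.Errorf("%d", http.StatusBadGateway))) // true
	fmt.Println(isRetryableStatusErr(errors.New("connection reset")))          // false
}
```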
@@ -520,16 +525,16 @@ func (r *EvmRegistry) multiFeedsRequest(ctx context.Context, ch chan<- MercuryDa
} else if resp.StatusCode == http.StatusBadRequest {
retryable = false
state = encoding.InvalidMercuryRequest
return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by invalid format of timestamp", sl.time.String(), sl.upkeepId.String(), resp.StatusCode)
} else if resp.StatusCode == http.StatusInternalServerError {
return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3 with message: %s", sl.time.String(), sl.upkeepId.String(), resp.StatusCode, string(body))
} else if resp.StatusCode == http.StatusInternalServerError || resp.StatusCode == http.StatusBadGateway || resp.StatusCode == http.StatusServiceUnavailable || resp.StatusCode == http.StatusGatewayTimeout {
retryable = true
state = encoding.MercuryFlakyFailure
return fmt.Errorf("%d", http.StatusInternalServerError)
} else if resp.StatusCode == 420 {
// in 0.3, this will happen when missing/malformed query args, missing or bad required headers, non-existent feeds, or no permissions for feeds
retryable = false
state = encoding.InvalidMercuryRequest
return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by missing/malformed query args, missing or bad required headers, non-existent feeds, or no permissions for feeds", sl.time.String(), sl.upkeepId.String(), resp.StatusCode)
return fmt.Errorf("%d", resp.StatusCode)
} else if resp.StatusCode == http.StatusPartialContent {
lggr.Warnf("at timestamp %s upkeep %s requested [%s] feeds but mercury v0.3 server returned 206 status, treating it as 404 and retrying", sl.time.String(), sl.upkeepId.String(), sl.feeds)
retryable = true
state = encoding.MercuryFlakyFailure
return fmt.Errorf("%d", http.StatusPartialContent)
} else if resp.StatusCode != http.StatusOK {
retryable = false
state = encoding.InvalidMercuryRequest
@@ -549,8 +554,11 @@ func (r *EvmRegistry) multiFeedsRequest(ctx context.Context, ch chan<- MercuryDa
// in v0.3, if some feeds are not available, the server will only return available feeds, but we need to make sure ALL feeds are retrieved before calling user contract
// hence, retry in this case. retry will help when we send a very new timestamp and reports are not yet generated
if len(response.Reports) != len(sl.feeds) {
- // TODO: AUTO-5044: calculate what reports are missing and log a warning
- lggr.Warnf("at timestamp %s upkeep %s mercury v0.3 server retruned 200 status with %d reports while we requested %d feeds, treating as 404 (not found) and retrying", sl.time.String(), sl.upkeepId.String(), len(response.Reports), len(sl.feeds))
+ var receivedFeeds []string
+ for _, f := range response.Reports {
+ receivedFeeds = append(receivedFeeds, f.FeedID)
+ }
+ lggr.Warnf("at timestamp %s upkeep %s mercury v0.3 server returned 206 status with [%s] reports while we requested [%s] feeds, retrying", sl.time.String(), sl.upkeepId.String(), receivedFeeds, sl.feeds)
retryable = true
state = encoding.MercuryFlakyFailure
return fmt.Errorf("%d", http.StatusNotFound)
@@ -575,9 +583,9 @@ func (r *EvmRegistry) multiFeedsRequest(ctx context.Context, ch chan<- MercuryDa
sent = true
return nil
},
- // only retry when the error is 404 Not Found or 500 Internal Server Error
+ // only retry when the error is 206 Partial Content, 404 Not Found, 500 Internal Server Error, 502 Bad Gateway, 503 Service Unavailable, 504 Gateway Timeout
retry.RetryIf(func(err error) bool {
- return err.Error() == fmt.Sprintf("%d", http.StatusNotFound) || err.Error() == fmt.Sprintf("%d", http.StatusInternalServerError)
+ return err.Error() == fmt.Sprintf("%d", http.StatusPartialContent) || err.Error() == fmt.Sprintf("%d", http.StatusNotFound) || err.Error() == fmt.Sprintf("%d", http.StatusInternalServerError) || err.Error() == fmt.Sprintf("%d", http.StatusBadGateway) || err.Error() == fmt.Sprintf("%d", http.StatusServiceUnavailable) || err.Error() == fmt.Sprintf("%d", http.StatusGatewayTimeout)
}),
retry.Context(ctx),
retry.Delay(retryDelay),
@@ -610,3 +618,29 @@ func (r *EvmRegistry) generateHMAC(method string, path string, body []byte, clie
userHmac := hex.EncodeToString(signedMessage.Sum(nil))
return userHmac
}

// calculateRetryConfig returns plugin retry interval based on how many times plugin has retried this work
func (r *EvmRegistry) calculateRetryConfig(prk string) time.Duration {
var ri time.Duration
var retries int
totalAttempts, ok := r.mercury.pluginRetryCache.Get(prk)
if ok {
retries = totalAttempts.(int)
if retries < totalFastPluginRetries {
ri = 1 * time.Second
} else if retries < totalMediumPluginRetries {
ri = 5 * time.Second
}
// if the core node has retried totalMediumPluginRetries times, do not set retry interval and plugin will use
// the default interval
} else {
ri = 1 * time.Second
}
r.mercury.pluginRetryCache.Set(prk, retries+1, cache.DefaultExpiration)
return ri
}

// generatePluginRetryKey returns a plugin retry cache key
func generatePluginRetryKey(workID string, block uint64) string {
return workID + "|" + fmt.Sprintf("%d", block)
}
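Taken together, calculateRetryConfig and generatePluginRetryKey give each workID|block pair an escalating schedule: roughly 1s for the first five plugin retries, 5s for the next five, then a zero interval that tells the plugin to fall back to its default. Below is a standalone restatement of that policy (cache bookkeeping elided, so this is illustrative rather than the committed code).

```go
package main

import (
	"fmt"
	"time"
)

const (
	totalFastPluginRetries   = 5
	totalMediumPluginRetries = 10
)

// retryIntervalForAttempt restates the tiering in calculateRetryConfig above:
// attempt counts 0-4 -> 1s, 5-9 -> 5s, 10+ -> 0 (use the plugin's default interval).
func retryIntervalForAttempt(attempts int) time.Duration {
	switch {
	case attempts < totalFastPluginRetries:
		return 1 * time.Second
	case attempts < totalMediumPluginRetries:
		return 5 * time.Second
	default:
		return 0
	}
}

func main() {
	for _, n := range []int{0, 4, 5, 9, 10} {
		fmt.Printf("attempts=%d -> interval=%v\n", n, retryIntervalForAttempt(n))
	}
}
```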