Skip to content

Commit

Permalink
feat: make rewrite optional
Browse files Browse the repository at this point in the history
  • Loading branch information
tchardin committed Apr 29, 2024
1 parent d6e110a commit 9e8bf0a
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 121 deletions.
119 changes: 60 additions & 59 deletions proxyd/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,10 +705,11 @@ func sortBatchRPCResponse(req []*RPCReq, res []*RPCRes) {
}

type BackendGroup struct {
Name string
Backends []*Backend
WeightedRouting bool
Consensus *ConsensusPoller
Name string
Backends []*Backend
WeightedRouting bool
Consensus *ConsensusPoller
ConsensusNoRewrite bool
}

func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, string, error) {
Expand All @@ -718,52 +719,52 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch

backends := bg.orderedBackendsForRequest()

// overriddenResponses := make([]*indexedReqRes, 0)
// rewrittenReqs := make([]*RPCReq, 0, len(rpcReqs))

// if bg.Consensus != nil {
// // When `consensus_aware` is set to `true`, the backend group acts as a load balancer
// // serving traffic from any backend that agrees in the consensus group

// // We also rewrite block tags to enforce compliance with consensus
// rctx := RewriteContext{
// latest: bg.Consensus.GetLatestBlockNumber(),
// safe: bg.Consensus.GetSafeBlockNumber(),
// finalized: bg.Consensus.GetFinalizedBlockNumber(),
// maxBlockRange: bg.Consensus.maxBlockRange,
// }

// for i, req := range rpcReqs {
// res := RPCRes{JSONRPC: JSONRPCVersion, ID: req.ID}
// result, err := RewriteTags(rctx, req, &res)
// switch result {
// case RewriteOverrideError:
// overriddenResponses = append(overriddenResponses, &indexedReqRes{
// index: i,
// req: req,
// res: &res,
// })
// if errors.Is(err, ErrRewriteBlockOutOfRange) {
// res.Error = ErrBlockOutOfRange
// } else if errors.Is(err, ErrRewriteRangeTooLarge) {
// res.Error = ErrInvalidParams(
// fmt.Sprintf("block range greater than %d max", rctx.maxBlockRange),
// )
// } else {
// res.Error = ErrParseErr
// }
// case RewriteOverrideResponse:
// overriddenResponses = append(overriddenResponses, &indexedReqRes{
// index: i,
// req: req,
// res: &res,
// })
// case RewriteOverrideRequest, RewriteNone:
// rewrittenReqs = append(rewrittenReqs, req)
// }
// }
// rpcReqs = rewrittenReqs
// }
overriddenResponses := make([]*indexedReqRes, 0)
rewrittenReqs := make([]*RPCReq, 0, len(rpcReqs))

if bg.Consensus != nil && !bg.ConsensusNoRewrite {
// When `consensus_aware` is set to `true`, the backend group acts as a load balancer
// serving traffic from any backend that agrees in the consensus group

// We also rewrite block tags to enforce compliance with consensus
rctx := RewriteContext{
latest: bg.Consensus.GetLatestBlockNumber(),
safe: bg.Consensus.GetSafeBlockNumber(),
finalized: bg.Consensus.GetFinalizedBlockNumber(),
maxBlockRange: bg.Consensus.maxBlockRange,
}

for i, req := range rpcReqs {
res := RPCRes{JSONRPC: JSONRPCVersion, ID: req.ID}
result, err := RewriteTags(rctx, req, &res)
switch result {
case RewriteOverrideError:
overriddenResponses = append(overriddenResponses, &indexedReqRes{
index: i,
req: req,
res: &res,
})
if errors.Is(err, ErrRewriteBlockOutOfRange) {
res.Error = ErrBlockOutOfRange
} else if errors.Is(err, ErrRewriteRangeTooLarge) {
res.Error = ErrInvalidParams(
fmt.Sprintf("block range greater than %d max", rctx.maxBlockRange),
)
} else {
res.Error = ErrParseErr
}
case RewriteOverrideResponse:
overriddenResponses = append(overriddenResponses, &indexedReqRes{
index: i,
req: req,
res: &res,
})
case RewriteOverrideRequest, RewriteNone:
rewrittenReqs = append(rewrittenReqs, req)
}
}
rpcReqs = rewrittenReqs
}

rpcRequestsTotal.Inc()

Expand Down Expand Up @@ -813,15 +814,15 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
}
}

// // re-apply overridden responses
// for _, ov := range overriddenResponses {
// if len(res) > 0 {
// // insert ov.res at position ov.index
// res = append(res[:ov.index], append([]*RPCRes{ov.res}, res[ov.index:]...)...)
// } else {
// res = append(res, ov.res)
// }
// }
// re-apply overridden responses
for _, ov := range overriddenResponses {
if len(res) > 0 {
// insert ov.res at position ov.index
res = append(res[:ov.index], append([]*RPCRes{ov.res}, res[ov.index:]...)...)
} else {
res = append(res, ov.res)
}
}

return res, servedBy, nil
}
Expand Down
1 change: 1 addition & 0 deletions proxyd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ type BackendGroupConfig struct {
ConsensusAware bool `toml:"consensus_aware"`
ConsensusAsyncHandler string `toml:"consensus_handler"`
ConsensusPollerInterval TOMLDuration `toml:"consensus_poller_interval"`
ConsensusNoRewrite bool `toml:"consensus_no_rewrite"`

ConsensusBanPeriod TOMLDuration `toml:"consensus_ban_period"`
ConsensusMaxUpdateThreshold TOMLDuration `toml:"consensus_max_update_threshold"`
Expand Down
119 changes: 60 additions & 59 deletions proxyd/consensus_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type ConsensusPoller struct {
maxBlockLag uint64
maxBlockRange uint64
interval time.Duration
noRewrite bool
}

type backendState struct {
Expand Down Expand Up @@ -238,6 +239,7 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller
maxBlockLag: 8, // 8*12 seconds = 96 seconds ~ 1.6 minutes
minPeerCount: 3,
interval: DefaultPollerInterval,
noRewrite: bg.ConsensusNoRewrite,
}

for _, opt := range opts {
Expand Down Expand Up @@ -374,13 +376,13 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
// the lowest safe block number
// the lowest finalized block number
var lowestLatestBlock hexutil.Uint64
//var lowestLatestBlockHash string
var lowestLatestBlockHash string
var lowestFinalizedBlock hexutil.Uint64
var lowestSafeBlock hexutil.Uint64
for _, bs := range candidates {
if lowestLatestBlock == 0 || bs.latestBlockNumber < lowestLatestBlock {
lowestLatestBlock = bs.latestBlockNumber
//lowestLatestBlockHash = bs.latestBlockHash
lowestLatestBlockHash = bs.latestBlockHash
}
if lowestFinalizedBlock == 0 || bs.finalizedBlockNumber < lowestFinalizedBlock {
lowestFinalizedBlock = bs.finalizedBlockNumber
Expand All @@ -393,63 +395,63 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
// find the proposed block among the candidates
// the proposed block needs have the same hash in the entire consensus group
proposedBlock := lowestLatestBlock
//proposedBlockHash := lowestLatestBlockHash
//hasConsensus := false
//broken := false
proposedBlockHash := lowestLatestBlockHash
hasConsensus := false
broken := false

if lowestLatestBlock > currentConsensusBlockNumber {
log.Debug("new lowest latest block amongst candidate. updating", "lowestLatestBlock", lowestLatestBlock)
}

// if there is a block to propose, check if it is the same in all backends
//if proposedBlock > 0 {
// for !hasConsensus {
// allAgreed := true
// for be := range candidates {
// actualBlockNumber, actualBlockHash, err := cp.fetchBlock(ctx, be, proposedBlock.String())
// if err != nil {
// log.Warn("error updating backend", "name", be.Name, "err", err)
// continue
// }
// if proposedBlockHash == "" {
// proposedBlockHash = actualBlockHash
// }
// blocksDontMatch := (actualBlockNumber != proposedBlock) || (actualBlockHash != proposedBlockHash)
// if blocksDontMatch {
// if currentConsensusBlockNumber >= actualBlockNumber {
// log.Warn("backend broke consensus",
// "name", be.Name,
// "actualBlockNumber", actualBlockNumber,
// "actualBlockHash", actualBlockHash,
// "proposedBlock", proposedBlock,
// "proposedBlockHash", proposedBlockHash)
// broken = true
// }
// allAgreed = false
// break
// }
// }
// if allAgreed {
// hasConsensus = true
// } else {
// // walk one block behind and try again
// proposedBlock -= 1
// proposedBlockHash = ""
// log.Debug("no consensus, now trying", "block:", proposedBlock)
// }
// }
//}

//if broken {
// // propagate event to other interested parts, such as cache invalidator
// for _, l := range cp.listeners {
// l()
// }
// log.Info("consensus broken",
// "currentConsensusBlockNumber", currentConsensusBlockNumber,
// "proposedBlock", proposedBlock,
// "proposedBlockHash", proposedBlockHash)
//}
if proposedBlock > 0 && !cp.noRewrite {
for !hasConsensus {
allAgreed := true
for be := range candidates {
actualBlockNumber, actualBlockHash, err := cp.fetchBlock(ctx, be, proposedBlock.String())
if err != nil {
log.Warn("error updating backend", "name", be.Name, "err", err)
continue
}
if proposedBlockHash == "" {
proposedBlockHash = actualBlockHash
}
blocksDontMatch := (actualBlockNumber != proposedBlock) || (actualBlockHash != proposedBlockHash)
if blocksDontMatch {
if currentConsensusBlockNumber >= actualBlockNumber {
log.Warn("backend broke consensus",
"name", be.Name,
"actualBlockNumber", actualBlockNumber,
"actualBlockHash", actualBlockHash,
"proposedBlock", proposedBlock,
"proposedBlockHash", proposedBlockHash)
broken = true
}
allAgreed = false
break
}
}
if allAgreed {
hasConsensus = true
} else {
// walk one block behind and try again
proposedBlock -= 1
proposedBlockHash = ""
log.Debug("no consensus, now trying", "block:", proposedBlock)
}
}
}

if broken {
// propagate event to other interested parts, such as cache invalidator
for _, l := range cp.listeners {
l()
}
log.Info("consensus broken",
"currentConsensusBlockNumber", currentConsensusBlockNumber,
"proposedBlock", proposedBlock,
"proposedBlockHash", proposedBlockHash)
}

// update tracker
cp.tracker.SetLatestBlockNumber(proposedBlock)
Expand Down Expand Up @@ -481,12 +483,11 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
RecordGroupConsensusCount(cp.backendGroup, len(group))
RecordGroupConsensusFilteredCount(cp.backendGroup, len(filteredBackendsNames))
RecordGroupTotalCount(cp.backendGroup, len(cp.backendGroup.Backends))
if lowestLatestBlock > currentConsensusBlockNumber {
log.Debug("group state",
"proposedBlock", proposedBlock,
"consensusBackends", strings.Join(consensusBackendsNames, ", "),
"filteredBackends", strings.Join(filteredBackendsNames, ", "))
}

log.Debug("group state",
"proposedBlock", proposedBlock,
"consensusBackends", strings.Join(consensusBackendsNames, ", "),
"filteredBackends", strings.Join(filteredBackendsNames, ", "))
}

// IsBanned checks if a specific backend is banned
Expand Down
7 changes: 4 additions & 3 deletions proxyd/proxyd.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,10 @@ func Start(config *Config) (*Server, func(), error) {
}

backendGroups[bgName] = &BackendGroup{
Name: bgName,
Backends: backends,
WeightedRouting: bg.WeightedRouting,
Name: bgName,
Backends: backends,
WeightedRouting: bg.WeightedRouting,
ConsensusNoRewrite: bg.ConsensusNoRewrite,
}
}

Expand Down

0 comments on commit 9e8bf0a

Please sign in to comment.