Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement traversal based early termination #3337

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions chains/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,8 @@ func (m *manager) createAvalancheChain(
return nil, fmt.Errorf("couldn't initialize snow base message handler: %w", err)
}

var snowmanConsensus smcon.Consensus = &smcon.Topological{}
topological := &smcon.Topological{}
var snowmanConsensus smcon.Consensus = topological
aaronbuchwald marked this conversation as resolved.
Show resolved Hide resolved
if m.TracingEnabled {
snowmanConsensus = smcon.Trace(snowmanConsensus, m.Tracer)
}
Expand All @@ -936,6 +937,7 @@ func (m *manager) createAvalancheChain(
ConnectedValidators: connectedValidators,
Params: consensusParams,
Consensus: snowmanConsensus,
BlockTraversal: topological,
}
var snowmanEngine common.Engine
snowmanEngine, err = smeng.New(snowmanEngineConfig)
Expand Down Expand Up @@ -1313,14 +1315,16 @@ func (m *manager) createSnowmanChain(
return nil, fmt.Errorf("couldn't initialize snow base message handler: %w", err)
}

var consensus smcon.Consensus = &smcon.Topological{}
topological := &smcon.Topological{}
aaronbuchwald marked this conversation as resolved.
Show resolved Hide resolved
var consensus smcon.Consensus = topological
if m.TracingEnabled {
consensus = smcon.Trace(consensus, m.Tracer)
}

// Create engine, bootstrapper and state-syncer in this order,
// to make sure start callbacks are duly initialized
engineConfig := smeng.Config{
BlockTraversal: topological,
Ctx: ctx,
AllGetsServer: snowGetHandler,
VM: vm,
Expand Down
188 changes: 155 additions & 33 deletions snow/consensus/snowman/poll/early_term_no_traversal.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
package poll
marun marked this conversation as resolved.
Show resolved Hide resolved

import (
"bytes"
"errors"
"fmt"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow"
"github.com/ava-labs/avalanchego/utils/bag"
)

Expand Down Expand Up @@ -38,7 +40,7 @@ var (
}
)

type earlyTermNoTraversalMetrics struct {
type earlyTermTraversalMetrics struct {
durExhaustedPolls prometheus.Gauge
durEarlyFailPolls prometheus.Gauge
durEarlyAlphaPrefPolls prometheus.Gauge
Expand All @@ -50,7 +52,7 @@ type earlyTermNoTraversalMetrics struct {
countEarlyAlphaConfPolls prometheus.Counter
}

func newEarlyTermNoTraversalMetrics(reg prometheus.Registerer) (*earlyTermNoTraversalMetrics, error) {
func newEarlyTermTraversalMetrics(reg prometheus.Registerer) (*earlyTermTraversalMetrics, error) {
pollCountVec := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "poll_count",
Help: "Total # of terminated polls by reason",
Expand All @@ -66,7 +68,7 @@ func newEarlyTermNoTraversalMetrics(reg prometheus.Registerer) (*earlyTermNoTrav
return nil, fmt.Errorf("%w: %w", errPollDurationVectorMetrics, err)
}

return &earlyTermNoTraversalMetrics{
return &earlyTermTraversalMetrics{
durExhaustedPolls: durPollsVec.With(exhaustedLabel),
durEarlyFailPolls: durPollsVec.With(earlyFailLabel),
durEarlyAlphaPrefPolls: durPollsVec.With(earlyAlphaPrefLabel),
Expand All @@ -78,54 +80,57 @@ func newEarlyTermNoTraversalMetrics(reg prometheus.Registerer) (*earlyTermNoTrav
}, nil
}

func (m *earlyTermNoTraversalMetrics) observeExhausted(duration time.Duration) {
func (m *earlyTermTraversalMetrics) observeExhausted(duration time.Duration) {
m.durExhaustedPolls.Add(float64(duration.Nanoseconds()))
m.countExhaustedPolls.Inc()
}

func (m *earlyTermNoTraversalMetrics) observeEarlyFail(duration time.Duration) {
func (m *earlyTermTraversalMetrics) observeEarlyFail(duration time.Duration) {
m.durEarlyFailPolls.Add(float64(duration.Nanoseconds()))
m.countEarlyFailPolls.Inc()
}

func (m *earlyTermNoTraversalMetrics) observeEarlyAlphaPref(duration time.Duration) {
func (m *earlyTermTraversalMetrics) observeEarlyAlphaPref(duration time.Duration) {
m.durEarlyAlphaPrefPolls.Add(float64(duration.Nanoseconds()))
m.countEarlyAlphaPrefPolls.Inc()
}

func (m *earlyTermNoTraversalMetrics) observeEarlyAlphaConf(duration time.Duration) {
func (m *earlyTermTraversalMetrics) observeEarlyAlphaConf(duration time.Duration) {
m.durEarlyAlphaConfPolls.Add(float64(duration.Nanoseconds()))
m.countEarlyAlphaConfPolls.Inc()
}

type earlyTermNoTraversalFactory struct {
type earlyTermTraversalFactory struct {
alphaPreference int
alphaConfidence int

metrics *earlyTermNoTraversalMetrics
bt snow.BlockTraversal
metrics *earlyTermTraversalMetrics
}

// NewEarlyTermNoTraversalFactory returns a factory that returns polls with
// NewEarlyTermTraversalFactory returns a factory that returns polls with
// early termination, without doing DAG traversals
func NewEarlyTermNoTraversalFactory(
func NewEarlyTermTraversalFactory(
alphaPreference int,
alphaConfidence int,
reg prometheus.Registerer,
bt snow.BlockTraversal,
) (Factory, error) {
metrics, err := newEarlyTermNoTraversalMetrics(reg)
metrics, err := newEarlyTermTraversalMetrics(reg)
if err != nil {
return nil, err
}

return &earlyTermNoTraversalFactory{
return &earlyTermTraversalFactory{
bt: bt,
alphaPreference: alphaPreference,
alphaConfidence: alphaConfidence,
metrics: metrics,
}, nil
}

func (f *earlyTermNoTraversalFactory) New(vdrs bag.Bag[ids.NodeID]) Poll {
return &earlyTermNoTraversalPoll{
func (f *earlyTermTraversalFactory) New(vdrs bag.Bag[ids.NodeID]) Poll {
return &earlyTermTraversalPoll{
bt: f.bt,
polled: vdrs,
alphaPreference: f.alphaPreference,
alphaConfidence: f.alphaConfidence,
Expand All @@ -134,22 +139,21 @@ func (f *earlyTermNoTraversalFactory) New(vdrs bag.Bag[ids.NodeID]) Poll {
}
}

// earlyTermNoTraversalPoll finishes when any remaining validators can't change
// the result of the poll. However, does not terminate tightly with this bound.
// It terminates as quickly as it can without performing any DAG traversals.
type earlyTermNoTraversalPoll struct {
// earlyTermTraversalPoll finishes when any remaining validators can't change
// the result of the poll for all the votes and transitive votes.
type earlyTermTraversalPoll struct {
votes bag.Bag[ids.ID]
polled bag.Bag[ids.NodeID]
alphaPreference int
alphaConfidence int

metrics *earlyTermNoTraversalMetrics
start time.Time
finished bool
bt snow.BlockTraversal
metrics *earlyTermTraversalMetrics
start time.Time
finished bool
}

// Vote registers a response for this poll
func (p *earlyTermNoTraversalPoll) Vote(vdr ids.NodeID, vote ids.ID) {
func (p *earlyTermTraversalPoll) Vote(vdr ids.NodeID, vote ids.ID) {
count := p.polled.Count(vdr)
// make sure that a validator can't respond multiple times
p.polled.Remove(vdr)
Expand All @@ -159,7 +163,7 @@ func (p *earlyTermNoTraversalPoll) Vote(vdr ids.NodeID, vote ids.ID) {
}

// Drop any future response for this poll
func (p *earlyTermNoTraversalPoll) Drop(vdr ids.NodeID) {
func (p *earlyTermTraversalPoll) Drop(vdr ids.NodeID) {
p.polled.Remove(vdr)
}

Expand All @@ -172,12 +176,13 @@ func (p *earlyTermNoTraversalPoll) Drop(vdr ids.NodeID) {
// impossible for it to achieve an alphaConfidence majority after applying
// transitive voting.
// 4. A single element has achieved an alphaConfidence majority.
func (p *earlyTermNoTraversalPoll) Finished() bool {
func (p *earlyTermTraversalPoll) Finished() bool {
if p.finished {
return true
}

remaining := p.polled.Len()

if remaining == 0 {
p.finished = true
p.metrics.observeExhausted(time.Since(p.start))
Expand All @@ -192,15 +197,63 @@ func (p *earlyTermNoTraversalPoll) Finished() bool {
return true // Case 2
}

_, freq := p.votes.Mode()
if freq >= p.alphaPreference && maxPossibleVotes < p.alphaConfidence {
// v
// /
// u
// We build a vote graph where each vertex represents a block ID.
// A vertex 'v' is a parent of vertex 'u' if the ID of 'u' corresponds
// to a block that is the successive block of the corresponding block for 'v'.
votesGraph := buildVoteGraph(p.bt.GetParent, p.votes)

// If vertex 'v' is a parent of vertex 'u', then a vote for the ID of vertex 'u'
// should also be considered as a vote for the ID of the vertex 'v'.
transitiveVotes := computeTransitiveVoteCountGraph(&votesGraph, p.votes)

// v
// / \
// u w
// If two competing blocks 'u', 'w' are potential successors to a block 'v',
// snowman would instantiate a unary snowflake instance on the prefix of 'u' and 'w'.
// The prefix inherits the votes for the IDs of 'u' and 'w'.
// We therefore compute the transitive votes for all prefixes of IDs
// for each bifurcation in the transitive vote graph.
transitiveVotesForPrefixes := computeTransitiveVotesForPrefixes(&votesGraph, transitiveVotes)

// We wish to compute the votes for snowflake instances, no matter if they correspond to an actual block ID,
// or a unary snowflake instance for a shared prefix between a bifurcation of two competing blocks.
// For that, only the number of votes and existence of such snowflake instances matters.
voteCountsForIDsOrPrefixes := aggregateVotesFromPrefixesAndIDs(transitiveVotesForPrefixes, transitiveVotes)

// Given the aforementioned votes, we wish to see whether there exists a snowflake instance
// that can benefit from waiting for more invocations of Vote().
// We therefore check each amount of votes separately and see if voting for that snowflake instance
// should terminate, as it cannot be improved by further voting.
weCantImproveVoteForSomeIDOrPrefix := make(booleans, len(voteCountsForIDsOrPrefixes))
marun marked this conversation as resolved.
Show resolved Hide resolved
for i, completedVotes := range voteCountsForIDsOrPrefixes {
shouldTerminate := p.shouldTerminateDueToConfidence(completedVotes, remaining)
weCantImproveVoteForSomeIDOrPrefix[i] = shouldTerminate
}

// We should terminate the poll only when voting for all snowflake instances should terminate.
if weCantImproveVoteForSomeIDOrPrefix.allTrue() && len(weCantImproveVoteForSomeIDOrPrefix) > 0 {
p.finished = true
}

return p.finished
}

func (p *earlyTermTraversalPoll) shouldTerminateDueToConfidence(freq int, remaining int) bool {
maxPossibleVotes := freq + remaining
if maxPossibleVotes < p.alphaPreference {
return true // Case 2
}

if freq >= p.alphaPreference && maxPossibleVotes < p.alphaConfidence {
p.metrics.observeEarlyAlphaPref(time.Since(p.start))
return true // Case 3
}

if freq >= p.alphaConfidence {
p.finished = true
p.metrics.observeEarlyAlphaConf(time.Since(p.start))
return true // Case 4
}
Expand All @@ -209,11 +262,11 @@ func (p *earlyTermNoTraversalPoll) Finished() bool {
}

// Result returns the result of this poll
func (p *earlyTermNoTraversalPoll) Result() bag.Bag[ids.ID] {
func (p *earlyTermTraversalPoll) Result() bag.Bag[ids.ID] {
return p.votes
}

func (p *earlyTermNoTraversalPoll) PrefixedString(prefix string) string {
func (p *earlyTermTraversalPoll) PrefixedString(prefix string) string {
return fmt.Sprintf(
"waiting on %s\n%sreceived %s",
p.polled.PrefixedString(prefix),
Expand All @@ -222,6 +275,75 @@ func (p *earlyTermNoTraversalPoll) PrefixedString(prefix string) string {
)
}

func (p *earlyTermNoTraversalPoll) String() string {
func (p *earlyTermTraversalPoll) String() string {
return p.PrefixedString("")
}

func aggregateVotesFromPrefixesAndIDs(transitiveVotesForPrefixes map[string]int, transitiveVotes bag.Bag[ids.ID]) []int {
voteCountsForIDsOrPrefixes := make([]int, 0, len(transitiveVotesForPrefixes)+len(transitiveVotes.List()))

for _, id := range transitiveVotes.List() {
votesForID := transitiveVotes.Count(id)
voteCountsForIDsOrPrefixes = append(voteCountsForIDsOrPrefixes, votesForID)
}

for _, voteCount := range transitiveVotesForPrefixes {
voteCountsForIDsOrPrefixes = append(voteCountsForIDsOrPrefixes, voteCount)
}
return voteCountsForIDsOrPrefixes
}

func computeTransitiveVotesForPrefixes(votesGraph *voteGraph, transitiveVotes bag.Bag[ids.ID]) map[string]int {
votesForPrefix := make(map[string]int)
votesGraph.traverse(func(v *voteVertex) {
descendantIDs := descendantIDsOfVertex(v)
pg := longestSharedPrefixes(descendantIDs)
// Each shared prefix is associated with a bunch of IDs.
// Sum up all the transitive votes for these blocks,
// and return all such shared prefixes indexed by the underlying transitive descendant IDs.
pg.bifurcationsWithCommonPrefix(func(ids []ids.ID) {
key := concatIDs(ids)
count := sumVotesFromIDs(ids, transitiveVotes)
votesForPrefix[key] = count
})
})
return votesForPrefix
}

func descendantIDsOfVertex(v *voteVertex) []ids.ID {
descendanstIDs := make([]ids.ID, len(v.descendants))
for i, child := range v.descendants {
descendanstIDs[i] = child.id
}
return descendanstIDs
}

func concatIDs(ids []ids.ID) string {
var bb bytes.Buffer
for _, id := range ids {
bb.WriteString(id.String())
bb.WriteString(" ")
}
return bb.String()
}

func sumVotesFromIDs(ids []ids.ID, transitiveVotes bag.Bag[ids.ID]) int {
var count int
for _, id := range ids {
count += transitiveVotes.Count(id)
}
return count
}

// booleans represents zero or more booleans
type booleans []bool

// allTrue returns whether all booleans are true
func (bs booleans) allTrue() bool {
for _, b := range bs {
if !b {
return false
}
}
return true
}
Loading