From c5da9469ba7ac42a11b685e03f627a96ed29919f Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Wed, 28 Feb 2024 19:58:13 -0700 Subject: [PATCH] [network/p2p] Redesign Push Gossip (#2772) Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Co-authored-by: Stephen Buttolph Co-authored-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 +- network/p2p/gossip/gossip.go | 321 ++++++++++++++---- network/p2p/gossip/gossip_test.go | 307 +++++++++-------- network/p2p/gossip/gossipable.go | 2 + network/p2p/gossip/handler.go | 14 +- network/p2p/gossip/test_gossip.go | 5 + vms/avm/config_test.go | 4 +- vms/avm/environment_test.go | 2 +- vms/avm/network/config.go | 19 +- vms/avm/network/gossip.go | 9 +- vms/avm/network/gossip_test.go | 2 +- vms/avm/network/network.go | 128 +++---- vms/avm/network/network_test.go | 77 +---- vms/avm/service.go | 18 +- vms/avm/vm.go | 18 +- vms/avm/wallet_service.go | 5 +- vms/platformvm/block/builder/builder_test.go | 8 +- vms/platformvm/block/builder/helpers_test.go | 2 +- .../config/execution_config_test.go | 42 ++- vms/platformvm/network/config.go | 19 +- vms/platformvm/network/gossip.go | 5 + vms/platformvm/network/network.go | 141 +++----- vms/platformvm/network/network_test.go | 91 +---- vms/platformvm/service.go | 4 +- vms/platformvm/service_test.go | 4 +- vms/platformvm/validator_set_property_test.go | 4 +- vms/platformvm/vm.go | 9 +- vms/platformvm/vm_regression_test.go | 60 ++-- vms/platformvm/vm_test.go | 34 +- 30 files changed, 693 insertions(+), 667 deletions(-) diff --git a/go.mod b/go.mod index 1d77176690d0..555b0655a9ae 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ go 1.21 require ( github.com/DataDog/zstd v1.5.2 github.com/NYTimes/gziphandler v1.1.1 - github.com/ava-labs/coreth v0.13.0-rc.0 + github.com/ava-labs/coreth v0.13.1-rc.1 github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34 github.com/btcsuite/btcd/btcutil v1.1.3 github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811 diff --git a/go.sum b/go.sum index 160a06369f8e..1cb67722efe7 100644 --- a/go.sum +++ b/go.sum @@ -63,8 +63,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8= github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= -github.com/ava-labs/coreth v0.13.0-rc.0 h1:V2l3qj2ek3geKDJAnF2M94mYJK8kg2kePixujfJ0bmk= -github.com/ava-labs/coreth v0.13.0-rc.0/go.mod h1:eUMbBLDhlZASJjcbf0gIcD2GMn2rRRCUxC8MXLt5QQk= +github.com/ava-labs/coreth v0.13.1-rc.1 h1:T838dWZicYXrajXBeLbJTat5rtSXkZisOZ0qcHTEVjM= +github.com/ava-labs/coreth v0.13.1-rc.1/go.mod h1:LzD4pI5AjkPS3r86e36Xpt6PZB2pZ/wUS4Vn8kJwWmY= github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34 h1:mg9Uw6oZFJKytJxgxnl3uxZOs/SB8CVHg6Io4Tf99Zc= github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34/go.mod h1:pJxaT9bUgeRNVmNRgtCHb7sFDIRKy7CzTQVi8gGNT6g= github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= diff --git a/network/p2p/gossip/gossip.go b/network/p2p/gossip/gossip.go index 3b910216d87c..5e3f4d3cf778 100644 --- a/network/p2p/gossip/gossip.go +++ b/network/p2p/gossip/gossip.go @@ -13,28 +13,32 @@ 
import ( "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" + "github.com/ava-labs/avalanchego/cache" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/network/p2p" "github.com/ava-labs/avalanchego/utils" + "github.com/ava-labs/avalanchego/utils/bloom" "github.com/ava-labs/avalanchego/utils/buffer" "github.com/ava-labs/avalanchego/utils/logging" ) const ( - typeLabel = "type" - pushType = "push" - pullType = "pull" + typeLabel = "type" + pushType = "push" + pullType = "pull" + unsentType = "unsent" + sentType = "sent" + + defaultGossipableCount = 64 ) var ( _ Gossiper = (*ValidatorGossiper)(nil) _ Gossiper = (*PullGossiper[*testTx])(nil) _ Gossiper = (*NoOpGossiper)(nil) - _ Gossiper = (*TestGossiper)(nil) - _ Accumulator[*testTx] = (*PushGossiper[*testTx])(nil) - _ Accumulator[*testTx] = (*NoOpAccumulator[*testTx])(nil) - _ Accumulator[*testTx] = (*TestAccumulator[*testTx])(nil) + _ Set[*testTx] = (*EmptySet[*testTx])(nil) + _ Set[*testTx] = (*FullSet[*testTx])(nil) metricLabels = []string{typeLabel} pushLabels = prometheus.Labels{ @@ -43,6 +47,18 @@ var ( pullLabels = prometheus.Labels{ typeLabel: pullType, } + unsentLabels = prometheus.Labels{ + typeLabel: unsentType, + } + sentLabels = prometheus.Labels{ + typeLabel: sentType, + } + + ErrInvalidDiscardedSize = errors.New("discarded size cannot be negative") + ErrInvalidTargetGossipSize = errors.New("target gossip size cannot be negative") + ErrInvalidRegossipFrequency = errors.New("re-gossip frequency cannot be negative") + + errEmptySetCantAdd = errors.New("empty set can not add") ) // Gossiper gossips Gossipables to other nodes @@ -51,13 +67,6 @@ type Gossiper interface { Gossip(ctx context.Context) error } -// Accumulator allows a caller to accumulate gossipables to be gossiped -type Accumulator[T Gossipable] interface { - Gossiper - // Add queues gossipables to be gossiped - Add(gossipables ...T) -} - // ValidatorGossiper only calls [Gossip] if the given node is a validator type ValidatorGossiper struct { Gossiper @@ -69,10 +78,12 @@ type ValidatorGossiper struct { // Metrics that are tracked across a gossip protocol. A given protocol should // only use a single instance of Metrics. 
type Metrics struct { - sentCount *prometheus.CounterVec - sentBytes *prometheus.CounterVec - receivedCount *prometheus.CounterVec - receivedBytes *prometheus.CounterVec + sentCount *prometheus.CounterVec + sentBytes *prometheus.CounterVec + receivedCount *prometheus.CounterVec + receivedBytes *prometheus.CounterVec + tracking *prometheus.GaugeVec + trackingLifetimeAverage prometheus.Gauge } // NewMetrics returns a common set of metrics @@ -101,12 +112,24 @@ func NewMetrics( Name: "gossip_received_bytes", Help: "amount of gossip received (bytes)", }, metricLabels), + tracking: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "gossip_tracking", + Help: "number of gossipables being tracked", + }, metricLabels), + trackingLifetimeAverage: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "gossip_tracking_lifetime_average", + Help: "average duration a gossipable has been tracked (ns)", + }), } err := utils.Err( metrics.Register(m.sentCount), metrics.Register(m.sentBytes), metrics.Register(m.receivedCount), metrics.Register(m.receivedBytes), + metrics.Register(m.tracking), + metrics.Register(m.trackingLifetimeAverage), ) return m, err } @@ -197,17 +220,17 @@ func (p *PullGossiper[_]) handleResponse( continue } - hash := gossipable.GossipID() + gossipID := gossipable.GossipID() p.log.Debug( "received gossip", zap.Stringer("nodeID", nodeID), - zap.Stringer("id", hash), + zap.Stringer("id", gossipID), ) if err := p.set.Add(gossipable); err != nil { p.log.Debug( "failed to add gossip to the known set", zap.Stringer("nodeID", nodeID), - zap.Stringer("id", hash), + zap.Stringer("id", gossipID), zap.Error(err), ) continue @@ -231,84 +254,231 @@ func (p *PullGossiper[_]) handleResponse( } // NewPushGossiper returns an instance of PushGossiper -func NewPushGossiper[T Gossipable](marshaller Marshaller[T], client *p2p.Client, metrics Metrics, targetGossipSize int) *PushGossiper[T] { - return &PushGossiper[T]{ - marshaller: marshaller, - client: client, - metrics: metrics, - targetGossipSize: targetGossipSize, - pending: buffer.NewUnboundedDeque[T](0), +func NewPushGossiper[T Gossipable]( + marshaller Marshaller[T], + mempool Set[T], + client *p2p.Client, + metrics Metrics, + discardedSize int, + targetGossipSize int, + maxRegossipFrequency time.Duration, +) (*PushGossiper[T], error) { + switch { + case discardedSize < 0: + return nil, ErrInvalidDiscardedSize + case targetGossipSize < 0: + return nil, ErrInvalidTargetGossipSize + case maxRegossipFrequency < 0: + return nil, ErrInvalidRegossipFrequency } + + return &PushGossiper[T]{ + marshaller: marshaller, + set: mempool, + client: client, + metrics: metrics, + targetGossipSize: targetGossipSize, + maxRegossipFrequency: maxRegossipFrequency, + + tracking: make(map[ids.ID]*tracking), + toGossip: buffer.NewUnboundedDeque[T](0), + toRegossip: buffer.NewUnboundedDeque[T](0), + discarded: &cache.LRU[ids.ID, struct{}]{Size: discardedSize}, + }, nil } // PushGossiper broadcasts gossip to peers randomly in the network type PushGossiper[T Gossipable] struct { - marshaller Marshaller[T] - client *p2p.Client - metrics Metrics - targetGossipSize int + marshaller Marshaller[T] + set Set[T] + client *p2p.Client + metrics Metrics + + targetGossipSize int + maxRegossipFrequency time.Duration - lock sync.Mutex - pending buffer.Deque[T] + lock sync.Mutex + tracking map[ids.ID]*tracking + addedTimeSum float64 // unix nanoseconds + toGossip buffer.Deque[T] + toRegossip buffer.Deque[T] + discarded *cache.LRU[ids.ID, 
struct{}] // discarded attempts to avoid overgossiping transactions that are frequently dropped } -// Gossip flushes any queued gossipables +type tracking struct { + addedTime float64 // unix nanoseconds + lastGossiped time.Time +} + +// Gossip flushes any queued gossipables. func (p *PushGossiper[T]) Gossip(ctx context.Context) error { + var ( + now = time.Now() + nowUnixNano = float64(now.UnixNano()) + ) + p.lock.Lock() - defer p.lock.Unlock() + defer func() { + p.updateMetrics(nowUnixNano) + p.lock.Unlock() + }() - if p.pending.Len() == 0 { + if len(p.tracking) == 0 { return nil } - sentBytes := 0 - gossip := make([][]byte, 0, p.pending.Len()) + var ( + sentBytes = 0 + gossip = make([][]byte, 0, defaultGossipableCount) + ) + + // Iterate over all unsent gossipables. for sentBytes < p.targetGossipSize { - gossipable, ok := p.pending.PeekLeft() + gossipable, ok := p.toGossip.PopLeft() if !ok { break } + // Ensure item is still in the set before we gossip. + gossipID := gossipable.GossipID() + tracking := p.tracking[gossipID] + if !p.set.Has(gossipID) { + delete(p.tracking, gossipID) + p.addedTimeSum -= tracking.addedTime + continue + } + bytes, err := p.marshaller.MarshalGossip(gossipable) if err != nil { - // remove this item so we don't get stuck in a loop - _, _ = p.pending.PopLeft() + delete(p.tracking, gossipID) + p.addedTimeSum -= tracking.addedTime return err } gossip = append(gossip, bytes) sentBytes += len(bytes) - p.pending.PopLeft() + p.toRegossip.PushRight(gossipable) + tracking.lastGossiped = now } + maxLastGossipTimeToRegossip := now.Add(-p.maxRegossipFrequency) + + // Iterate over all previously sent gossipables to fill any remaining space + // in the gossip batch. + for sentBytes < p.targetGossipSize { + gossipable, ok := p.toRegossip.PopLeft() + if !ok { + break + } + + // Ensure item is still in the set before we gossip. + gossipID := gossipable.GossipID() + tracking := p.tracking[gossipID] + if !p.set.Has(gossipID) { + delete(p.tracking, gossipID) + p.addedTimeSum -= tracking.addedTime + p.discarded.Put(gossipID, struct{}{}) // only add to discarded if previously sent + continue + } + + // Ensure we don't attempt to send a gossipable too frequently. + if maxLastGossipTimeToRegossip.Before(tracking.lastGossiped) { + // Put the gossipable on the front of the queue to keep items sorted + // by last issuance time. + p.toRegossip.PushLeft(gossipable) + break + } + + bytes, err := p.marshaller.MarshalGossip(gossipable) + if err != nil { + // Should never happen because we've already sent this once. + delete(p.tracking, gossipID) + p.addedTimeSum -= tracking.addedTime + return err + } + + gossip = append(gossip, bytes) + sentBytes += len(bytes) + p.toRegossip.PushRight(gossipable) + tracking.lastGossiped = now + } + + // If there is nothing to gossip, we can exit early. 
+	if len(gossip) == 0 {
+		return nil
+	}
+
+	// Send gossipables to peers
 	msgBytes, err := MarshalAppGossip(gossip)
 	if err != nil {
 		return err
 	}
-
 	sentCountMetric, err := p.metrics.sentCount.GetMetricWith(pushLabels)
 	if err != nil {
 		return fmt.Errorf("failed to get sent count metric: %w", err)
 	}
-
 	sentBytesMetric, err := p.metrics.sentBytes.GetMetricWith(pushLabels)
 	if err != nil {
 		return fmt.Errorf("failed to get sent bytes metric: %w", err)
 	}
-
 	sentCountMetric.Add(float64(len(gossip)))
 	sentBytesMetric.Add(float64(sentBytes))
-
-	return p.client.AppGossip(ctx, msgBytes)
+	if err := p.client.AppGossip(ctx, msgBytes); err != nil {
+		return fmt.Errorf("failed to gossip: %w", err)
+	}
+	return nil
 }
 
+// Add enqueues new gossipables to be pushed. If a gossipable is already tracked,
+// it is not added again.
 func (p *PushGossiper[T]) Add(gossipables ...T) {
+	var (
+		now         = time.Now()
+		nowUnixNano = float64(now.UnixNano())
+	)
+
 	p.lock.Lock()
-	defer p.lock.Unlock()
+	defer func() {
+		p.updateMetrics(nowUnixNano)
+		p.lock.Unlock()
+	}()
 
+	// Add new gossipables to be sent.
 	for _, gossipable := range gossipables {
-		p.pending.PushRight(gossipable)
+		gossipID := gossipable.GossipID()
+		if _, ok := p.tracking[gossipID]; ok {
+			continue
+		}
+
+		tracking := &tracking{
+			addedTime: nowUnixNano,
+		}
+		if _, ok := p.discarded.Get(gossipID); ok {
+			// Pretend that recently discarded transactions were just gossiped.
+			tracking.lastGossiped = now
+			p.toRegossip.PushRight(gossipable)
+		} else {
+			p.toGossip.PushRight(gossipable)
+		}
+		p.tracking[gossipID] = tracking
+		p.addedTimeSum += nowUnixNano
 	}
 }
 
+func (p *PushGossiper[_]) updateMetrics(nowUnixNano float64) {
+	var (
+		numUnsent       = float64(p.toGossip.Len())
+		numSent         = float64(p.toRegossip.Len())
+		numTracking     = numUnsent + numSent
+		averageLifetime float64
+	)
+	if numTracking != 0 {
+		averageLifetime = nowUnixNano - p.addedTimeSum/numTracking
+	}
+
+	p.metrics.tracking.With(unsentLabels).Set(numUnsent)
+	p.metrics.tracking.With(sentLabels).Set(numSent)
+	p.metrics.trackingLifetimeAverage.Set(averageLifetime)
+}
 
 // Every calls [Gossip] every [frequency] amount of time.
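To make the redesigned flow concrete, a minimal usage sketch (not part of this patch) of how a VM wires the new constructor and the periodic flush; every `my`-prefixed name is a hypothetical placeholder, and the literal values mirror the defaults this patch adds to DefaultConfig:

    // Sketch only. Placeholders: myMarshaller (a Marshaller[*txs.Tx]) and
    // myMempool (a Set[*txs.Tx]); the set must implement the new Has method
    // so the gossiper can prune txs that leave the mempool.
    pushGossiper, err := gossip.NewPushGossiper[*txs.Tx](
        myMarshaller,    // Marshaller[*txs.Tx] (placeholder)
        myMempool,       // Set[*txs.Tx] (placeholder)
        txGossipClient,  // *p2p.Client registered for the gossip handler ID
        txGossipMetrics, // shared gossip.Metrics instance
        1024,            // PushGossipDiscardedCacheSize
        20*units.KiB,    // TargetGossipSize
        10*time.Second,  // PushGossipMaxRegossipFrequency
    )
    if err != nil {
        return nil, err
    }

    // On RPC issuance, queue the tx; it is sent in the next push round and
    // then re-gossiped at most once per maxRegossipFrequency until it
    // leaves the set or is discarded.
    pushGossiper.Add(tx)

    // Drive push rounds until ctx is cancelled (PushGossipFrequency).
    go gossip.Every(ctx, log, pushGossiper, 500*time.Millisecond)

Note that with this redesign the gossip handler no longer forwards received transactions on receipt; propagation happens only through these periodic push rounds and through pull gossip.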
@@ -335,39 +505,50 @@ func (NoOpGossiper) Gossip(context.Context) error { return nil } -type NoOpAccumulator[T Gossipable] struct{} +type TestGossiper struct { + GossipF func(ctx context.Context) error +} + +func (t *TestGossiper) Gossip(ctx context.Context) error { + return t.GossipF(ctx) +} + +type EmptySet[T Gossipable] struct{} -func (NoOpAccumulator[_]) Gossip(context.Context) error { +func (EmptySet[_]) Gossip(context.Context) error { return nil } -func (NoOpAccumulator[T]) Add(...T) {} +func (EmptySet[T]) Add(T) error { + return errEmptySetCantAdd +} -type TestGossiper struct { - GossipF func(ctx context.Context) error +func (EmptySet[T]) Has(ids.ID) bool { + return false } -func (t *TestGossiper) Gossip(ctx context.Context) error { - return t.GossipF(ctx) +func (EmptySet[T]) Iterate(func(gossipable T) bool) {} + +func (EmptySet[_]) GetFilter() ([]byte, []byte) { + return bloom.EmptyFilter.Marshal(), ids.Empty[:] } -type TestAccumulator[T Gossipable] struct { - GossipF func(ctx context.Context) error - AddF func(...T) +type FullSet[T Gossipable] struct{} + +func (FullSet[_]) Gossip(context.Context) error { + return nil } -func (t TestAccumulator[T]) Gossip(ctx context.Context) error { - if t.GossipF == nil { - return nil - } +func (FullSet[T]) Add(T) error { + return nil +} - return t.GossipF(ctx) +func (FullSet[T]) Has(ids.ID) bool { + return true } -func (t TestAccumulator[T]) Add(gossipables ...T) { - if t.AddF == nil { - return - } +func (FullSet[T]) Iterate(func(gossipable T) bool) {} - t.AddF(gossipables...) +func (FullSet[_]) GetFilter() ([]byte, []byte) { + return bloom.FullFilter.Marshal(), ids.Empty[:] } diff --git a/network/p2p/gossip/gossip_test.go b/network/p2p/gossip/gossip_test.go index 5d6fe9914d4c..ce612be1e55a 100644 --- a/network/p2p/gossip/gossip_test.go +++ b/network/p2p/gossip/gossip_test.go @@ -124,7 +124,6 @@ func TestGossiperGossip(t *testing.T) { handler := NewHandler[*testTx]( logging.NoLog{}, marshaller, - NoOpAccumulator[*testTx]{}, responseSet, metrics, tt.targetResponseSize, @@ -233,70 +232,158 @@ func TestValidatorGossiper(t *testing.T) { require.Equal(2, calls) } +func TestPushGossiperNew(t *testing.T) { + tests := []struct { + name string + discardedSize int + targetGossipSize int + maxRegossipFrequency time.Duration + expected error + }{ + { + name: "invalid discarded size", + discardedSize: -1, + expected: ErrInvalidDiscardedSize, + }, + { + name: "invalid target gossip size", + targetGossipSize: -1, + expected: ErrInvalidTargetGossipSize, + }, + { + name: "invalid max re-gossip frequency", + maxRegossipFrequency: -1, + expected: ErrInvalidRegossipFrequency, + }, + } + + for _, tt := range tests { + _, err := NewPushGossiper[*testTx]( + nil, + nil, + nil, + Metrics{}, + tt.discardedSize, + tt.targetGossipSize, + tt.maxRegossipFrequency, + ) + require.ErrorIs(t, err, tt.expected) + } +} + // Tests that the outgoing gossip is equivalent to what was accumulated func TestPushGossiper(t *testing.T) { + type cycle struct { + toAdd []*testTx + expected []*testTx + } tests := []struct { - name string - cycles [][]*testTx + name string + cycles []cycle + shouldRegossip bool }{ { - name: "single cycle", - cycles: [][]*testTx{ + name: "single cycle with regossip", + cycles: []cycle{ { - &testTx{ - id: ids.ID{0}, + toAdd: []*testTx{ + { + id: ids.ID{0}, + }, + { + id: ids.ID{1}, + }, + { + id: ids.ID{2}, + }, }, - &testTx{ - id: ids.ID{1}, - }, - &testTx{ - id: ids.ID{2}, + expected: []*testTx{ + { + id: ids.ID{0}, + }, + { + id: ids.ID{1}, + }, + { + id: 
ids.ID{2}, + }, }, }, }, + shouldRegossip: true, }, { - name: "multiple cycles", - cycles: [][]*testTx{ + name: "multiple cycles with regossip", + cycles: []cycle{ { - &testTx{ - id: ids.ID{0}, + toAdd: []*testTx{ + { + id: ids.ID{0}, + }, + }, + expected: []*testTx{ + { + id: ids.ID{0}, + }, }, }, { - &testTx{ - id: ids.ID{1}, + toAdd: []*testTx{ + { + id: ids.ID{1}, + }, }, - &testTx{ - id: ids.ID{2}, + expected: []*testTx{ + { + id: ids.ID{1}, + }, + { + id: ids.ID{0}, + }, }, }, { - &testTx{ - id: ids.ID{3}, + toAdd: []*testTx{ + { + id: ids.ID{2}, + }, }, - &testTx{ - id: ids.ID{4}, - }, - &testTx{ - id: ids.ID{5}, + expected: []*testTx{ + { + id: ids.ID{2}, + }, + { + id: ids.ID{1}, + }, + { + id: ids.ID{0}, + }, }, }, + }, + shouldRegossip: true, + }, + { + name: "verify that we don't gossip empty messages", + cycles: []cycle{ { - &testTx{ - id: ids.ID{6}, - }, - &testTx{ - id: ids.ID{7}, + toAdd: []*testTx{ + { + id: ids.ID{0}, + }, }, - &testTx{ - id: ids.ID{8}, - }, - &testTx{ - id: ids.ID{9}, + expected: []*testTx{ + { + id: ids.ID{0}, + }, }, }, + { + toAdd: []*testTx{}, + expected: []*testTx{}, + }, }, + shouldRegossip: false, }, } @@ -319,141 +406,63 @@ func TestPushGossiper(t *testing.T) { metrics, err := NewMetrics(prometheus.NewRegistry(), "") require.NoError(err) marshaller := testMarshaller{} - gossiper := NewPushGossiper[*testTx]( + + regossipTime := time.Hour + if tt.shouldRegossip { + regossipTime = time.Nanosecond + } + + gossiper, err := NewPushGossiper[*testTx]( marshaller, + FullSet[*testTx]{}, client, metrics, + 0, // the discarded cache size doesn't matter for this test units.MiB, + regossipTime, ) + require.NoError(err) - for _, gossipables := range tt.cycles { - gossiper.Add(gossipables...) + for _, cycle := range tt.cycles { + gossiper.Add(cycle.toAdd...) require.NoError(gossiper.Gossip(ctx)) want := &sdk.PushGossip{ - Gossip: make([][]byte, 0, len(tt.cycles)), + Gossip: make([][]byte, 0, len(cycle.expected)), } - for _, gossipable := range gossipables { + for _, gossipable := range cycle.expected { bytes, err := marshaller.MarshalGossip(gossipable) require.NoError(err) want.Gossip = append(want.Gossip, bytes) } - // remove the handler prefix - sentMsg := <-sender.SentAppGossip - got := &sdk.PushGossip{} - require.NoError(proto.Unmarshal(sentMsg[1:], got)) + if len(want.Gossip) > 0 { + // remove the handler prefix + sentMsg := <-sender.SentAppGossip + got := &sdk.PushGossip{} + require.NoError(proto.Unmarshal(sentMsg[1:], got)) + + require.Equal(want.Gossip, got.Gossip) + } else { + select { + case <-sender.SentAppGossip: + require.FailNow("unexpectedly sent gossip message") + default: + } + } - require.Equal(want.Gossip, got.Gossip) + if tt.shouldRegossip { + // Ensure that subsequent calls to `time.Now()` are + // sufficient for regossip. 
+ time.Sleep(regossipTime + time.Nanosecond) + } } }) } } -// Tests that gossip to a peer should forward the gossip if it was not -// previously known -func TestPushGossipE2E(t *testing.T) { - require := require.New(t) - - // tx known by both the sender and the receiver which should not be - // forwarded - knownTx := &testTx{id: ids.GenerateTestID()} - - log := logging.NoLog{} - bloom, err := NewBloomFilter(prometheus.NewRegistry(), "", 100, 0.01, 0.05) - require.NoError(err) - set := &testSet{ - txs: make(map[ids.ID]*testTx), - bloom: bloom, - } - require.NoError(set.Add(knownTx)) - - forwarder := &common.FakeSender{ - SentAppGossip: make(chan []byte, 1), - } - forwarderNetwork, err := p2p.NewNetwork(log, forwarder, prometheus.NewRegistry(), "") - require.NoError(err) - handlerID := uint64(123) - client := forwarderNetwork.NewClient(handlerID) - - metrics, err := NewMetrics(prometheus.NewRegistry(), "") - require.NoError(err) - marshaller := testMarshaller{} - forwarderGossiper := NewPushGossiper[*testTx]( - marshaller, - client, - metrics, - units.MiB, - ) - - handler := NewHandler[*testTx]( - log, - marshaller, - forwarderGossiper, - set, - metrics, - 0, - ) - require.NoError(err) - require.NoError(forwarderNetwork.AddHandler(handlerID, handler)) - - issuer := &common.FakeSender{ - SentAppGossip: make(chan []byte, 1), - } - issuerNetwork, err := p2p.NewNetwork(log, issuer, prometheus.NewRegistry(), "") - require.NoError(err) - issuerClient := issuerNetwork.NewClient(handlerID) - require.NoError(err) - issuerGossiper := NewPushGossiper[*testTx]( - marshaller, - issuerClient, - metrics, - units.MiB, - ) - - want := []*testTx{ - {id: ids.GenerateTestID()}, - {id: ids.GenerateTestID()}, - {id: ids.GenerateTestID()}, - } - - // gossip both some unseen txs and one the receiver already knows about - var gossiped []*testTx - gossiped = append(gossiped, want...) - gossiped = append(gossiped, knownTx) - - issuerGossiper.Add(gossiped...) - addedToSet := make([]*testTx, 0, len(want)) - set.onAdd = func(tx *testTx) { - addedToSet = append(addedToSet, tx) - } - - ctx := context.Background() - require.NoError(issuerGossiper.Gossip(ctx)) - - // make sure that we only add new txs someone gossips to us - require.NoError(forwarderNetwork.AppGossip(ctx, ids.EmptyNodeID, <-issuer.SentAppGossip)) - require.Equal(want, addedToSet) - - // make sure that we only forward txs we have not already seen before - forwardedBytes := <-forwarder.SentAppGossip - forwardedMsg := &sdk.PushGossip{} - require.NoError(proto.Unmarshal(forwardedBytes[1:], forwardedMsg)) - require.Len(forwardedMsg.Gossip, len(want)) - - gotForwarded := make([]*testTx, 0, len(addedToSet)) - - for _, bytes := range forwardedMsg.Gossip { - tx, err := marshaller.UnmarshalGossip(bytes) - require.NoError(err) - gotForwarded = append(gotForwarded, tx) - } - - require.Equal(want, gotForwarded) -} - type testValidatorSet struct { validators set.Set[ids.NodeID] } diff --git a/network/p2p/gossip/gossipable.go b/network/p2p/gossip/gossipable.go index 238c62b4641c..6af60d666bb6 100644 --- a/network/p2p/gossip/gossipable.go +++ b/network/p2p/gossip/gossipable.go @@ -21,6 +21,8 @@ type Set[T Gossipable] interface { // Add adds a Gossipable to the set. Returns an error if gossipable was not // added. Add(gossipable T) error + // Has returns true if the gossipable is in the set. 
+ Has(gossipID ids.ID) bool // Iterate iterates over elements until [f] returns false Iterate(f func(gossipable T) bool) // GetFilter returns the byte representation of bloom filter and its diff --git a/network/p2p/gossip/handler.go b/network/p2p/gossip/handler.go index 38e883926366..5c125864cc62 100644 --- a/network/p2p/gossip/handler.go +++ b/network/p2p/gossip/handler.go @@ -21,7 +21,6 @@ var _ p2p.Handler = (*Handler[*testTx])(nil) func NewHandler[T Gossipable]( log logging.Logger, marshaller Marshaller[T], - accumulator Accumulator[T], set Set[T], metrics Metrics, targetResponseSize int, @@ -30,7 +29,6 @@ func NewHandler[T Gossipable]( Handler: p2p.NoOpHandler{}, log: log, marshaller: marshaller, - accumulator: accumulator, set: set, metrics: metrics, targetResponseSize: targetResponseSize, @@ -40,7 +38,6 @@ func NewHandler[T Gossipable]( type Handler[T Gossipable] struct { p2p.Handler marshaller Marshaller[T] - accumulator Accumulator[T] log logging.Logger set Set[T] metrics Metrics @@ -97,7 +94,7 @@ func (h Handler[T]) AppRequest(_ context.Context, _ ids.NodeID, _ time.Time, req return MarshalAppResponse(gossipBytes) } -func (h Handler[_]) AppGossip(ctx context.Context, nodeID ids.NodeID, gossipBytes []byte) { +func (h Handler[_]) AppGossip(_ context.Context, nodeID ids.NodeID, gossipBytes []byte) { gossip, err := ParseAppGossip(gossipBytes) if err != nil { h.log.Debug("failed to unmarshal gossip", zap.Error(err)) @@ -123,16 +120,7 @@ func (h Handler[_]) AppGossip(ctx context.Context, nodeID ids.NodeID, gossipByte zap.Stringer("id", gossipable.GossipID()), zap.Error(err), ) - continue } - - // continue gossiping messages we have not seen to other peers - h.accumulator.Add(gossipable) - } - - if err := h.accumulator.Gossip(ctx); err != nil { - h.log.Error("failed to forward gossip", zap.Error(err)) - return } receivedCountMetric, err := h.metrics.receivedCount.GetMetricWith(pushLabels) diff --git a/network/p2p/gossip/test_gossip.go b/network/p2p/gossip/test_gossip.go index 03098399462e..7f8782b65916 100644 --- a/network/p2p/gossip/test_gossip.go +++ b/network/p2p/gossip/test_gossip.go @@ -56,6 +56,11 @@ func (t *testSet) Add(gossipable *testTx) error { return nil } +func (t *testSet) Has(gossipID ids.ID) bool { + _, ok := t.txs[gossipID] + return ok +} + func (t *testSet) Iterate(f func(gossipable *testTx) bool) { for _, tx := range t.txs { if !f(tx) { diff --git a/vms/avm/config_test.go b/vms/avm/config_test.go index 27481d78b901..748ab8ae3250 100644 --- a/vms/avm/config_test.go +++ b/vms/avm/config_test.go @@ -40,6 +40,9 @@ func TestParseConfig(t *testing.T) { Network: network.Config{ MaxValidatorSetStaleness: time.Nanosecond, TargetGossipSize: network.DefaultConfig.TargetGossipSize, + PushGossipDiscardedCacheSize: network.DefaultConfig.PushGossipDiscardedCacheSize, + PushGossipMaxRegossipFrequency: network.DefaultConfig.PushGossipMaxRegossipFrequency, + PushGossipFrequency: network.DefaultConfig.PushGossipFrequency, PullGossipPollSize: network.DefaultConfig.PullGossipPollSize, PullGossipFrequency: network.DefaultConfig.PullGossipFrequency, PullGossipThrottlingPeriod: network.DefaultConfig.PullGossipThrottlingPeriod, @@ -47,7 +50,6 @@ func TestParseConfig(t *testing.T) { ExpectedBloomFilterElements: network.DefaultConfig.ExpectedBloomFilterElements, ExpectedBloomFilterFalsePositiveProbability: network.DefaultConfig.ExpectedBloomFilterFalsePositiveProbability, MaxBloomFilterFalsePositiveProbability: network.DefaultConfig.MaxBloomFilterFalsePositiveProbability, - 
LegacyPushGossipCacheSize: network.DefaultConfig.LegacyPushGossipCacheSize, }, IndexTransactions: DefaultConfig.IndexTransactions, IndexAllowIncomplete: DefaultConfig.IndexAllowIncomplete, diff --git a/vms/avm/environment_test.go b/vms/avm/environment_test.go index 7b8ec8902078..92675ef96dea 100644 --- a/vms/avm/environment_test.go +++ b/vms/avm/environment_test.go @@ -489,7 +489,7 @@ func issueAndAccept( issuer <-chan common.Message, tx *txs.Tx, ) { - txID, err := vm.issueTx(tx) + txID, err := vm.issueTxFromRPC(tx) require.NoError(err) require.Equal(tx.ID(), txID) diff --git a/vms/avm/network/config.go b/vms/avm/network/config.go index 8536504d8383..599d7a962636 100644 --- a/vms/avm/network/config.go +++ b/vms/avm/network/config.go @@ -12,6 +12,9 @@ import ( var DefaultConfig = Config{ MaxValidatorSetStaleness: time.Minute, TargetGossipSize: 20 * units.KiB, + PushGossipDiscardedCacheSize: 1024, + PushGossipMaxRegossipFrequency: 10 * time.Second, + PushGossipFrequency: 500 * time.Millisecond, PullGossipPollSize: 1, PullGossipFrequency: 1500 * time.Millisecond, PullGossipThrottlingPeriod: 10 * time.Second, @@ -19,7 +22,6 @@ var DefaultConfig = Config{ ExpectedBloomFilterElements: 8 * 1024, ExpectedBloomFilterFalsePositiveProbability: .01, MaxBloomFilterFalsePositiveProbability: .05, - LegacyPushGossipCacheSize: 512, } type Config struct { @@ -30,6 +32,15 @@ type Config struct { // sent when pushing transactions and when responded to transaction pull // requests. TargetGossipSize int `json:"target-gossip-size"` + // PushGossipDiscardedCacheSize is the number of txIDs to cache to avoid + // pushing transactions that were recently dropped from the mempool. + PushGossipDiscardedCacheSize int `json:"push-gossip-discarded-cache-size"` + // PushGossipMaxRegossipFrequency is the limit for how frequently a + // transaction will be push gossiped. + PushGossipMaxRegossipFrequency time.Duration `json:"push-gossip-max-regossip-frequency"` + // PushGossipFrequency is how frequently rounds of push gossip are + // performed. + PushGossipFrequency time.Duration `json:"push-gossip-frequency"` // PullGossipPollSize is the number of validators to sample when performing // a round of pull gossip. PullGossipPollSize int `json:"pull-gossip-poll-size"` @@ -57,10 +68,4 @@ type Config struct { // The smaller this number is, the more frequently that the bloom filter // will be regenerated. MaxBloomFilterFalsePositiveProbability float64 `json:"max-bloom-filter-false-positive-probability"` - // LegacyPushGossipCacheSize tracks the most recently received transactions - // and ensures to only gossip them once. - // - // Deprecated: The legacy push gossip mechanism is deprecated in favor of - // the p2p SDK's push gossip mechanism. 
- LegacyPushGossipCacheSize int `json:"legacy-push-gossip-cache-size"` } diff --git a/vms/avm/network/gossip.go b/vms/avm/network/gossip.go index 0876f122c660..2d3ab40bf7bb 100644 --- a/vms/avm/network/gossip.go +++ b/vms/avm/network/gossip.go @@ -119,10 +119,15 @@ func (g *gossipMempool) Add(tx *txs.Tx) error { return err } - return g.AddVerified(tx) + return g.AddWithoutVerification(tx) } -func (g *gossipMempool) AddVerified(tx *txs.Tx) error { +func (g *gossipMempool) Has(txID ids.ID) bool { + _, ok := g.Mempool.Get(txID) + return ok +} + +func (g *gossipMempool) AddWithoutVerification(tx *txs.Tx) error { if err := g.Mempool.Add(tx); err != nil { g.Mempool.MarkDropped(tx.ID(), err) return err diff --git a/vms/avm/network/gossip_test.go b/vms/avm/network/gossip_test.go index 0a19dccc1d73..afd08920f1c1 100644 --- a/vms/avm/network/gossip_test.go +++ b/vms/avm/network/gossip_test.go @@ -128,6 +128,6 @@ func TestGossipMempoolAddVerified(t *testing.T) { TxID: ids.GenerateTestID(), } - require.NoError(mempool.AddVerified(tx)) + require.NoError(mempool.AddWithoutVerification(tx)) require.True(mempool.bloom.Has(tx)) } diff --git a/vms/avm/network/network.go b/vms/avm/network/network.go index 9cad3cb9aa63..63785b2b66d9 100644 --- a/vms/avm/network/network.go +++ b/vms/avm/network/network.go @@ -5,13 +5,11 @@ package network import ( "context" - "sync" "time" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" - "github.com/ava-labs/avalanchego/cache" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/network/p2p" "github.com/ava-labs/avalanchego/network/p2p/gossip" @@ -33,7 +31,8 @@ var ( type Network struct { *p2p.Network - txPushGossiper gossip.Accumulator[*txs.Tx] + txPushGossiper *gossip.PushGossiper[*txs.Tx] + txPushGossipFrequency time.Duration txPullGossiper gossip.Gossiper txPullGossipFrequency time.Duration @@ -41,10 +40,6 @@ type Network struct { parser txs.Parser mempool *gossipMempool appSender common.AppSender - - // gossip related attributes - recentTxsLock sync.Mutex - recentTxs *cache.LRU[ids.ID, struct{}] } func New( @@ -80,13 +75,6 @@ func New( return nil, err } - txPushGossiper := gossip.NewPushGossiper[*txs.Tx]( - marshaller, - txGossipClient, - txGossipMetrics, - config.TargetGossipSize, - ) - gossipMempool, err := newGossipMempool( mempool, registerer, @@ -101,8 +89,20 @@ func New( return nil, err } - var txPullGossiper gossip.Gossiper - txPullGossiper = gossip.NewPullGossiper[*txs.Tx]( + txPushGossiper, err := gossip.NewPushGossiper[*txs.Tx]( + marshaller, + gossipMempool, + txGossipClient, + txGossipMetrics, + config.PushGossipDiscardedCacheSize, + config.TargetGossipSize, + config.PushGossipMaxRegossipFrequency, + ) + if err != nil { + return nil, err + } + + var txPullGossiper gossip.Gossiper = gossip.NewPullGossiper[*txs.Tx]( ctx.Log, marshaller, gossipMempool, @@ -121,7 +121,6 @@ func New( handler := gossip.NewHandler[*txs.Tx]( ctx.Log, marshaller, - txPushGossiper, gossipMempool, txGossipMetrics, config.TargetGossipSize, @@ -154,20 +153,21 @@ func New( return &Network{ Network: p2pNetwork, txPushGossiper: txPushGossiper, + txPushGossipFrequency: config.PushGossipFrequency, txPullGossiper: txPullGossiper, txPullGossipFrequency: config.PullGossipFrequency, ctx: ctx, parser: parser, mempool: gossipMempool, appSender: appSender, - - recentTxs: &cache.LRU[ids.ID, struct{}]{ - Size: config.LegacyPushGossipCacheSize, - }, }, nil } -func (n *Network) Gossip(ctx context.Context) { +func (n *Network) PushGossip(ctx context.Context) { + 
gossip.Every(ctx, n.ctx.Log, n.txPushGossiper, n.txPushGossipFrequency) +} + +func (n *Network) PullGossip(ctx context.Context) { gossip.Every(ctx, n.ctx.Log, n.txPullGossiper, n.txPullGossipFrequency) } @@ -204,93 +204,41 @@ func (n *Network) AppGossip(ctx context.Context, nodeID ids.NodeID, msgBytes []b return nil } - if err := n.mempool.Add(tx); err == nil { - txID := tx.ID() - n.txPushGossiper.Add(tx) - if err := n.txPushGossiper.Gossip(ctx); err != nil { - n.ctx.Log.Error("failed to gossip tx", - zap.Stringer("txID", tx.ID()), - zap.Error(err), - ) - } - n.gossipTxMessage(ctx, txID, msgBytes) + if err := n.mempool.Add(tx); err != nil { + n.ctx.Log.Debug("tx failed to be added to the mempool", + zap.Stringer("txID", tx.ID()), + zap.Error(err), + ) } return nil } -// IssueTx attempts to add a tx to the mempool, after verifying it. If the tx is -// added to the mempool, it will attempt to push gossip the tx to random peers -// in the network using both the legacy and p2p SDK. +// IssueTxFromRPC attempts to add a tx to the mempool, after verifying it. If +// the tx is added to the mempool, it will attempt to push gossip the tx to +// random peers in the network. // // If the tx is already in the mempool, mempool.ErrDuplicateTx will be // returned. // If the tx is not added to the mempool, an error will be returned. -func (n *Network) IssueTx(ctx context.Context, tx *txs.Tx) error { +func (n *Network) IssueTxFromRPC(tx *txs.Tx) error { if err := n.mempool.Add(tx); err != nil { return err } - return n.gossipTx(ctx, tx) + n.txPushGossiper.Add(tx) + return nil } -// IssueVerifiedTx attempts to add a tx to the mempool, without first verifying -// it. If the tx is added to the mempool, it will attempt to push gossip the tx -// to random peers in the network using both the legacy and p2p SDK. +// IssueTxFromRPCWithoutVerification attempts to add a tx to the mempool, +// without first verifying it. If the tx is added to the mempool, it will +// attempt to push gossip the tx to random peers in the network. // // If the tx is already in the mempool, mempool.ErrDuplicateTx will be // returned. // If the tx is not added to the mempool, an error will be returned. -func (n *Network) IssueVerifiedTx(ctx context.Context, tx *txs.Tx) error { - if err := n.mempool.AddVerified(tx); err != nil { +func (n *Network) IssueTxFromRPCWithoutVerification(tx *txs.Tx) error { + if err := n.mempool.AddWithoutVerification(tx); err != nil { return err } - return n.gossipTx(ctx, tx) -} - -// gossipTx pushes the tx to peers using both the legacy and p2p SDK. -func (n *Network) gossipTx(ctx context.Context, tx *txs.Tx) error { n.txPushGossiper.Add(tx) - if err := n.txPushGossiper.Gossip(ctx); err != nil { - n.ctx.Log.Error("failed to gossip tx", - zap.Stringer("txID", tx.ID()), - zap.Error(err), - ) - } - - txBytes := tx.Bytes() - msg := &message.Tx{ - Tx: txBytes, - } - msgBytes, err := message.Build(msg) - if err != nil { - return err - } - - txID := tx.ID() - n.gossipTxMessage(ctx, txID, msgBytes) return nil } - -// gossipTxMessage pushes the tx message to peers using the legacy format. -// If the tx was recently gossiped, this function does nothing. -func (n *Network) gossipTxMessage(ctx context.Context, txID ids.ID, msgBytes []byte) { - n.recentTxsLock.Lock() - _, has := n.recentTxs.Get(txID) - n.recentTxs.Put(txID, struct{}{}) - n.recentTxsLock.Unlock() - - // Don't gossip a transaction if it has been recently gossiped. 
- if has { - return - } - - n.ctx.Log.Debug("gossiping tx", - zap.Stringer("txID", txID), - ) - - if err := n.appSender.SendAppGossip(ctx, msgBytes); err != nil { - n.ctx.Log.Error("failed to gossip tx", - zap.Stringer("txID", txID), - zap.Error(err), - ) - } -} diff --git a/vms/avm/network/network_test.go b/vms/avm/network/network_test.go index 0e4ff2990b6d..7eacad534c05 100644 --- a/vms/avm/network/network_test.go +++ b/vms/avm/network/network_test.go @@ -32,6 +32,9 @@ var ( testConfig = Config{ MaxValidatorSetStaleness: time.Second, TargetGossipSize: 1, + PushGossipDiscardedCacheSize: 1, + PushGossipMaxRegossipFrequency: time.Second, + PushGossipFrequency: time.Second, PullGossipPollSize: 1, PullGossipFrequency: time.Second, PullGossipThrottlingPeriod: time.Second, @@ -39,7 +42,6 @@ var ( ExpectedBloomFilterElements: 10, ExpectedBloomFilterFalsePositiveProbability: .1, MaxBloomFilterFalsePositiveProbability: .5, - LegacyPushGossipCacheSize: 512, } errTest = errors.New("test error") @@ -71,7 +73,6 @@ func TestNetworkAppGossip(t *testing.T) { msgBytesFunc func() []byte mempoolFunc func(*gomock.Controller) mempool.Mempool txVerifierFunc func(*gomock.Controller) TxVerifier - appSenderFunc func(*gomock.Controller) common.AppSender } tests := []test{ @@ -172,11 +173,6 @@ func TestNetworkAppGossip(t *testing.T) { txVerifier.EXPECT().VerifyTx(gomock.Any()).Return(nil) return txVerifier }, - appSenderFunc: func(ctrl *gomock.Controller) common.AppSender { - appSender := common.NewMockSender(ctrl) - appSender.EXPECT().SendAppGossip(gomock.Any(), gomock.Any()).Return(nil).Times(2) - return appSender - }, }, } @@ -209,13 +205,6 @@ func TestNetworkAppGossip(t *testing.T) { txVerifierFunc = tt.txVerifierFunc } - appSenderFunc := func(ctrl *gomock.Controller) common.AppSender { - return common.NewMockSender(ctrl) - } - if tt.appSenderFunc != nil { - appSenderFunc = tt.appSenderFunc - } - n, err := New( &snow.Context{ Log: logging.NoLog{}, @@ -223,7 +212,7 @@ func TestNetworkAppGossip(t *testing.T) { parser, txVerifierFunc(ctrl), mempoolFunc(ctrl), - appSenderFunc(ctrl), + common.NewMockSender(ctrl), prometheus.NewRegistry(), testConfig, ) @@ -233,7 +222,7 @@ func TestNetworkAppGossip(t *testing.T) { } } -func TestNetworkIssueTx(t *testing.T) { +func TestNetworkIssueTxFromRPC(t *testing.T) { type test struct { name string mempoolFunc func(*gomock.Controller) mempool.Mempool @@ -304,6 +293,7 @@ func TestNetworkIssueTx(t *testing.T) { mempool.EXPECT().Add(gomock.Any()).Return(nil) mempool.EXPECT().Len().Return(0) mempool.EXPECT().RequestBuildBlock() + mempool.EXPECT().Get(gomock.Any()).Return(nil, true).Times(2) return mempool }, txVerifierFunc: func(ctrl *gomock.Controller) TxVerifier { @@ -313,7 +303,7 @@ func TestNetworkIssueTx(t *testing.T) { }, appSenderFunc: func(ctrl *gomock.Controller) common.AppSender { appSender := common.NewMockSender(ctrl) - appSender.EXPECT().SendAppGossip(gomock.Any(), gomock.Any()).Return(nil).Times(2) + appSender.EXPECT().SendAppGossip(gomock.Any(), gomock.Any()).Return(nil) return appSender }, expectedErr: nil, @@ -368,13 +358,15 @@ func TestNetworkIssueTx(t *testing.T) { testConfig, ) require.NoError(err) - err = n.IssueTx(context.Background(), &txs.Tx{}) + err = n.IssueTxFromRPC(&txs.Tx{}) require.ErrorIs(err, tt.expectedErr) + + require.NoError(n.txPushGossiper.Gossip(context.Background())) }) } } -func TestNetworkIssueVerifiedTx(t *testing.T) { +func TestNetworkIssueTxFromRPCWithoutVerification(t *testing.T) { type test struct { name string mempoolFunc 
func(*gomock.Controller) mempool.Mempool @@ -397,6 +389,7 @@ func TestNetworkIssueVerifiedTx(t *testing.T) { name: "happy path", mempoolFunc: func(ctrl *gomock.Controller) mempool.Mempool { mempool := mempool.NewMockMempool(ctrl) + mempool.EXPECT().Get(gomock.Any()).Return(nil, true).Times(2) mempool.EXPECT().Add(gomock.Any()).Return(nil) mempool.EXPECT().Len().Return(0) mempool.EXPECT().RequestBuildBlock() @@ -404,7 +397,7 @@ func TestNetworkIssueVerifiedTx(t *testing.T) { }, appSenderFunc: func(ctrl *gomock.Controller) common.AppSender { appSender := common.NewMockSender(ctrl) - appSender.EXPECT().SendAppGossip(gomock.Any(), gomock.Any()).Return(nil).Times(2) + appSender.EXPECT().SendAppGossip(gomock.Any(), gomock.Any()).Return(nil) return appSender }, expectedErr: nil, @@ -452,48 +445,10 @@ func TestNetworkIssueVerifiedTx(t *testing.T) { testConfig, ) require.NoError(err) - err = n.IssueVerifiedTx(context.Background(), &txs.Tx{}) + err = n.IssueTxFromRPCWithoutVerification(&txs.Tx{}) require.ErrorIs(err, tt.expectedErr) + + require.NoError(n.txPushGossiper.Gossip(context.Background())) }) } } - -func TestNetworkGossipTx(t *testing.T) { - require := require.New(t) - ctrl := gomock.NewController(t) - - parser, err := txs.NewParser( - time.Time{}, - []fxs.Fx{ - &secp256k1fx.Fx{}, - }, - ) - require.NoError(err) - - appSender := common.NewMockSender(ctrl) - - n, err := New( - &snow.Context{ - Log: logging.NoLog{}, - }, - parser, - executor.NewMockManager(ctrl), - mempool.NewMockMempool(ctrl), - appSender, - prometheus.NewRegistry(), - testConfig, - ) - require.NoError(err) - - // Case: Tx was recently gossiped - txID := ids.GenerateTestID() - n.recentTxs.Put(txID, struct{}{}) - n.gossipTxMessage(context.Background(), txID, []byte{}) - // Didn't make a call to SendAppGossip - - // Case: Tx was not recently gossiped - msgBytes := []byte{1, 2, 3} - appSender.EXPECT().SendAppGossip(gomock.Any(), msgBytes).Return(nil) - n.gossipTxMessage(context.Background(), ids.GenerateTestID(), msgBytes) - // Did make a call to SendAppGossip -} diff --git a/vms/avm/service.go b/vms/avm/service.go index 4dcc210df813..5392308480a1 100644 --- a/vms/avm/service.go +++ b/vms/avm/service.go @@ -213,7 +213,7 @@ func (s *Service) IssueTx(_ *http.Request, args *api.FormattedTx, reply *api.JSO return err } - reply.TxID, err = s.vm.issueTx(tx) + reply.TxID, err = s.vm.issueTxFromRPC(tx) return err } @@ -714,7 +714,7 @@ func (s *Service) CreateAsset(_ *http.Request, args *CreateAssetArgs, reply *Ass return err } - assetID, err := s.vm.issueTx(tx) + assetID, err := s.vm.issueTxFromRPC(tx) if err != nil { return fmt.Errorf("problem issuing transaction: %w", err) } @@ -879,7 +879,7 @@ func (s *Service) CreateNFTAsset(_ *http.Request, args *CreateNFTAssetArgs, repl return err } - assetID, err := s.vm.issueTx(tx) + assetID, err := s.vm.issueTxFromRPC(tx) if err != nil { return fmt.Errorf("problem issuing transaction: %w", err) } @@ -1199,7 +1199,7 @@ func (s *Service) SendMultiple(_ *http.Request, args *SendMultipleArgs, reply *a return err } - txID, err := s.vm.issueTx(tx) + txID, err := s.vm.issueTxFromRPC(tx) if err != nil { return fmt.Errorf("problem issuing transaction: %w", err) } @@ -1361,7 +1361,7 @@ func (s *Service) Mint(_ *http.Request, args *MintArgs, reply *api.JSONTxIDChang return err } - txID, err := s.vm.issueTx(tx) + txID, err := s.vm.issueTxFromRPC(tx) if err != nil { return fmt.Errorf("problem issuing transaction: %w", err) } @@ -1494,7 +1494,7 @@ func (s *Service) SendNFT(_ *http.Request, args 
*SendNFTArgs, reply *api.JSONTxI return err } - txID, err := s.vm.issueTx(tx) + txID, err := s.vm.issueTxFromRPC(tx) if err != nil { return fmt.Errorf("problem issuing transaction: %w", err) } @@ -1617,7 +1617,7 @@ func (s *Service) MintNFT(_ *http.Request, args *MintNFTArgs, reply *api.JSONTxI return err } - txID, err := s.vm.issueTx(tx) + txID, err := s.vm.issueTxFromRPC(tx) if err != nil { return fmt.Errorf("problem issuing transaction: %w", err) } @@ -1754,7 +1754,7 @@ func (s *Service) Import(_ *http.Request, args *ImportArgs, reply *api.JSONTxID) return err } - txID, err := s.vm.issueTx(tx) + txID, err := s.vm.issueTxFromRPC(tx) if err != nil { return fmt.Errorf("problem issuing transaction: %w", err) } @@ -1885,7 +1885,7 @@ func (s *Service) Export(_ *http.Request, args *ExportArgs, reply *api.JSONTxIDC return err } - txID, err := s.vm.issueTx(tx) + txID, err := s.vm.issueTxFromRPC(tx) if err != nil { return fmt.Errorf("problem issuing transaction: %w", err) } diff --git a/vms/avm/vm.go b/vms/avm/vm.go index 833bd6f79a4d..f401398f0a24 100644 --- a/vms/avm/vm.go +++ b/vms/avm/vm.go @@ -459,12 +459,18 @@ func (vm *VM) Linearize(ctx context.Context, stopVertexID ids.ID, toEngine chan< // handled asynchronously. vm.Atomic.Set(vm.network) - vm.awaitShutdown.Add(1) + vm.awaitShutdown.Add(2) go func() { defer vm.awaitShutdown.Done() - // Invariant: Gossip must never grab the context lock. - vm.network.Gossip(vm.onShutdownCtx) + // Invariant: PushGossip must never grab the context lock. + vm.network.PushGossip(vm.onShutdownCtx) + }() + go func() { + defer vm.awaitShutdown.Done() + + // Invariant: PullGossip must never grab the context lock. + vm.network.PullGossip(vm.onShutdownCtx) }() go func() { @@ -507,13 +513,13 @@ func (vm *VM) ParseTx(_ context.Context, bytes []byte) (snowstorm.Tx, error) { ****************************************************************************** */ -// issueTx attempts to send a transaction to consensus. +// issueTxFromRPC attempts to send a transaction to consensus. // // Invariant: The context lock is not held // Invariant: This function is only called after Linearize has been called. 
-func (vm *VM) issueTx(tx *txs.Tx) (ids.ID, error) { +func (vm *VM) issueTxFromRPC(tx *txs.Tx) (ids.ID, error) { txID := tx.ID() - err := vm.network.IssueTx(context.TODO(), tx) + err := vm.network.IssueTxFromRPC(tx) if err != nil && !errors.Is(err, mempool.ErrDuplicateTx) { vm.ctx.Log.Debug("failed to add tx to mempool", zap.Stringer("txID", txID), diff --git a/vms/avm/wallet_service.go b/vms/avm/wallet_service.go index 321bf9e57eb4..96b4cd405486 100644 --- a/vms/avm/wallet_service.go +++ b/vms/avm/wallet_service.go @@ -4,7 +4,6 @@ package avm import ( - "context" "errors" "fmt" "net/http" @@ -45,7 +44,7 @@ func (w *WalletService) decided(txID ids.ID) { return } - err := w.vm.network.IssueVerifiedTx(context.TODO(), tx) + err := w.vm.network.IssueTxFromRPCWithoutVerification(tx) if err == nil { w.vm.ctx.Log.Info("issued tx to mempool over wallet API", zap.Stringer("txID", txID), @@ -78,7 +77,7 @@ func (w *WalletService) issue(tx *txs.Tx) (ids.ID, error) { } if w.pendingTxs.Len() == 0 { - if err := w.vm.network.IssueVerifiedTx(context.TODO(), tx); err == nil { + if err := w.vm.network.IssueTxFromRPCWithoutVerification(tx); err == nil { w.vm.ctx.Log.Info("issued tx to mempool over wallet API", zap.Stringer("txID", txID), ) diff --git a/vms/platformvm/block/builder/builder_test.go b/vms/platformvm/block/builder/builder_test.go index e3486f96dca2..001f326ce20a 100644 --- a/vms/platformvm/block/builder/builder_test.go +++ b/vms/platformvm/block/builder/builder_test.go @@ -52,7 +52,7 @@ func TestBuildBlockBasic(t *testing.T) { // Issue the transaction env.ctx.Lock.Unlock() - require.NoError(env.network.IssueTx(context.Background(), tx)) + require.NoError(env.network.IssueTxFromRPC(tx)) env.ctx.Lock.Lock() _, ok := env.mempool.Get(txID) require.True(ok) @@ -126,7 +126,7 @@ func TestBuildBlockShouldReward(t *testing.T) { // Issue the transaction env.ctx.Lock.Unlock() - require.NoError(env.network.IssueTx(context.Background(), tx)) + require.NoError(env.network.IssueTxFromRPC(tx)) env.ctx.Lock.Lock() _, ok := env.mempool.Get(txID) require.True(ok) @@ -247,7 +247,7 @@ func TestBuildBlockForceAdvanceTime(t *testing.T) { // Issue the transaction env.ctx.Lock.Unlock() - require.NoError(env.network.IssueTx(context.Background(), tx)) + require.NoError(env.network.IssueTxFromRPC(tx)) env.ctx.Lock.Lock() _, ok := env.mempool.Get(txID) require.True(ok) @@ -504,7 +504,7 @@ func TestPreviouslyDroppedTxsCannotBeReAddedToMempool(t *testing.T) { // Issue the transaction env.ctx.Lock.Unlock() - err = env.network.IssueTx(context.Background(), tx) + err = env.network.IssueTxFromRPC(tx) require.ErrorIs(err, errTestingDropped) env.ctx.Lock.Lock() _, ok := env.mempool.Get(txID) diff --git a/vms/platformvm/block/builder/helpers_test.go b/vms/platformvm/block/builder/helpers_test.go index 9190f01e2c6b..c95844b842b3 100644 --- a/vms/platformvm/block/builder/helpers_test.go +++ b/vms/platformvm/block/builder/helpers_test.go @@ -103,7 +103,7 @@ type environment struct { Builder blkManager blockexecutor.Manager mempool mempool.Mempool - network network.Network + network *network.Network sender *common.SenderTest isBootstrapped *utils.Atomic[bool] diff --git a/vms/platformvm/config/execution_config_test.go b/vms/platformvm/config/execution_config_test.go index 89fd5cd55b05..8c28208cb49c 100644 --- a/vms/platformvm/config/execution_config_test.go +++ b/vms/platformvm/config/execution_config_test.go @@ -45,14 +45,16 @@ func TestExecutionConfigUnmarshal(t *testing.T) { "network": { "max-validator-set-staleness": 1, 
"target-gossip-size": 2, - "pull-gossip-poll-size": 3, - "pull-gossip-frequency": 4, - "pull-gossip-throttling-period": 5, - "pull-gossip-throttling-limit": 6, - "expected-bloom-filter-elements":7, - "expected-bloom-filter-false-positive-probability": 8, - "max-bloom-filter-false-positive-probability": 9, - "legacy-push-gossip-cache-size": 10 + "push-gossip-discarded-cache-size": 3, + "push-gossip-max-regossip-frequency": 4, + "push-gossip-frequency": 5, + "pull-gossip-poll-size": 6, + "pull-gossip-frequency": 7, + "pull-gossip-throttling-period": 8, + "pull-gossip-throttling-limit": 9, + "expected-bloom-filter-elements": 10, + "expected-bloom-filter-false-positive-probability": 11, + "max-bloom-filter-false-positive-probability": 12 }, "block-cache-size": 1, "tx-cache-size": 2, @@ -71,14 +73,16 @@ func TestExecutionConfigUnmarshal(t *testing.T) { Network: network.Config{ MaxValidatorSetStaleness: 1, TargetGossipSize: 2, - PullGossipPollSize: 3, - PullGossipFrequency: 4, - PullGossipThrottlingPeriod: 5, - PullGossipThrottlingLimit: 6, - ExpectedBloomFilterElements: 7, - ExpectedBloomFilterFalsePositiveProbability: 8, - MaxBloomFilterFalsePositiveProbability: 9, - LegacyPushGossipCacheSize: 10, + PushGossipDiscardedCacheSize: 3, + PushGossipMaxRegossipFrequency: 4, + PushGossipFrequency: 5, + PullGossipPollSize: 6, + PullGossipFrequency: 7, + PullGossipThrottlingPeriod: 8, + PullGossipThrottlingLimit: 9, + ExpectedBloomFilterElements: 10, + ExpectedBloomFilterFalsePositiveProbability: 11, + MaxBloomFilterFalsePositiveProbability: 12, }, BlockCacheSize: 1, TxCacheSize: 2, @@ -100,6 +104,8 @@ func TestExecutionConfigUnmarshal(t *testing.T) { "network": { "max-validator-set-staleness": 1, "target-gossip-size": 2, + "push-gossip-discarded-cache-size": 1024, + "push-gossip-max-regossip-frequency": 10000000000, "pull-gossip-poll-size": 3, "pull-gossip-frequency": 4, "pull-gossip-throttling-period": 5 @@ -120,6 +126,9 @@ func TestExecutionConfigUnmarshal(t *testing.T) { Network: network.Config{ MaxValidatorSetStaleness: 1, TargetGossipSize: 2, + PushGossipDiscardedCacheSize: DefaultExecutionConfig.Network.PushGossipDiscardedCacheSize, + PushGossipMaxRegossipFrequency: DefaultExecutionConfig.Network.PushGossipMaxRegossipFrequency, + PushGossipFrequency: DefaultExecutionConfig.Network.PushGossipFrequency, PullGossipPollSize: 3, PullGossipFrequency: 4, PullGossipThrottlingPeriod: 5, @@ -127,7 +136,6 @@ func TestExecutionConfigUnmarshal(t *testing.T) { ExpectedBloomFilterElements: DefaultExecutionConfig.Network.ExpectedBloomFilterElements, ExpectedBloomFilterFalsePositiveProbability: DefaultExecutionConfig.Network.ExpectedBloomFilterFalsePositiveProbability, MaxBloomFilterFalsePositiveProbability: DefaultExecutionConfig.Network.MaxBloomFilterFalsePositiveProbability, - LegacyPushGossipCacheSize: DefaultExecutionConfig.Network.LegacyPushGossipCacheSize, }, BlockCacheSize: 1, TxCacheSize: 2, diff --git a/vms/platformvm/network/config.go b/vms/platformvm/network/config.go index 8536504d8383..599d7a962636 100644 --- a/vms/platformvm/network/config.go +++ b/vms/platformvm/network/config.go @@ -12,6 +12,9 @@ import ( var DefaultConfig = Config{ MaxValidatorSetStaleness: time.Minute, TargetGossipSize: 20 * units.KiB, + PushGossipDiscardedCacheSize: 1024, + PushGossipMaxRegossipFrequency: 10 * time.Second, + PushGossipFrequency: 500 * time.Millisecond, PullGossipPollSize: 1, PullGossipFrequency: 1500 * time.Millisecond, PullGossipThrottlingPeriod: 10 * time.Second, @@ -19,7 +22,6 @@ var DefaultConfig = 
Config{
 	ExpectedBloomFilterElements:                 8 * 1024,
 	ExpectedBloomFilterFalsePositiveProbability: .01,
 	MaxBloomFilterFalsePositiveProbability:      .05,
-	LegacyPushGossipCacheSize:                   512,
 }
 
 type Config struct {
@@ -30,6 +32,15 @@ type Config struct {
 	// sent when pushing transactions and when responded to transaction pull
 	// requests.
 	TargetGossipSize int `json:"target-gossip-size"`
+	// PushGossipDiscardedCacheSize is the number of txIDs to cache to avoid
+	// pushing transactions that were recently dropped from the mempool.
+	PushGossipDiscardedCacheSize int `json:"push-gossip-discarded-cache-size"`
+	// PushGossipMaxRegossipFrequency is the limit for how frequently a
+	// transaction will be push gossiped.
+	PushGossipMaxRegossipFrequency time.Duration `json:"push-gossip-max-regossip-frequency"`
+	// PushGossipFrequency is how frequently rounds of push gossip are
+	// performed.
+	PushGossipFrequency time.Duration `json:"push-gossip-frequency"`
 	// PullGossipPollSize is the number of validators to sample when performing
 	// a round of pull gossip.
 	PullGossipPollSize int `json:"pull-gossip-poll-size"`
@@ -57,10 +68,4 @@ type Config struct {
 	// The smaller this number is, the more frequently that the bloom filter
 	// will be regenerated.
 	MaxBloomFilterFalsePositiveProbability float64 `json:"max-bloom-filter-false-positive-probability"`
-	// LegacyPushGossipCacheSize tracks the most recently received transactions
-	// and ensures to only gossip them once.
-	//
-	// Deprecated: The legacy push gossip mechanism is deprecated in favor of
-	// the p2p SDK's push gossip mechanism.
-	LegacyPushGossipCacheSize int `json:"legacy-push-gossip-cache-size"`
 }
diff --git a/vms/platformvm/network/gossip.go b/vms/platformvm/network/gossip.go
index 5259a80ee54d..4ef98de351ba 100644
--- a/vms/platformvm/network/gossip.go
+++ b/vms/platformvm/network/gossip.go
@@ -135,6 +135,11 @@ func (g *gossipMempool) Add(tx *txs.Tx) error {
 	return nil
 }
 
+func (g *gossipMempool) Has(txID ids.ID) bool {
+	_, ok := g.Mempool.Get(txID)
+	return ok
+}
+
 func (g *gossipMempool) GetFilter() (bloom []byte, salt []byte) {
 	g.lock.RLock()
 	defer g.lock.RUnlock()
diff --git a/vms/platformvm/network/network.go b/vms/platformvm/network/network.go
index af51c4755f4d..8116524e0194 100644
--- a/vms/platformvm/network/network.go
+++ b/vms/platformvm/network/network.go
@@ -5,13 +5,12 @@ package network
 
 import (
 	"context"
-	"sync"
+	"errors"
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
 	"go.uber.org/zap"
 
-	"github.com/ava-labs/avalanchego/cache"
 	"github.com/ava-labs/avalanchego/ids"
 	"github.com/ava-labs/avalanchego/network/p2p"
 	"github.com/ava-labs/avalanchego/network/p2p/gossip"
@@ -25,17 +24,9 @@ import (
 
 const TxGossipHandlerID = 0
 
-type Network interface {
-	common.AppHandler
+var errMempoolDisabledWithPartialSync = errors.New("mempool is disabled while partially syncing")
 
-	// Gossip starts gossiping transactions and blocks until it completes.
-	Gossip(ctx context.Context)
-	// IssueTx verifies the transaction at the currently preferred state, adds
-	// it to the mempool, and gossips it to the network.
- IssueTx(context.Context, *txs.Tx) error -} - -type network struct { +type Network struct { *p2p.Network log logging.Logger @@ -44,13 +35,10 @@ type network struct { partialSyncPrimaryNetwork bool appSender common.AppSender - txPushGossiper gossip.Accumulator[*txs.Tx] - txPullGossiper gossip.Gossiper - txGossipFrequency time.Duration - - // gossip related attributes - recentTxsLock sync.Mutex - recentTxs *cache.LRU[ids.ID, struct{}] + txPushGossiper *gossip.PushGossiper[*txs.Tx] + txPushGossipFrequency time.Duration + txPullGossiper gossip.Gossiper + txPullGossipFrequency time.Duration } func New( @@ -64,7 +52,7 @@ func New( appSender common.AppSender, registerer prometheus.Registerer, config Config, -) (Network, error) { +) (*Network, error) { p2pNetwork, err := p2p.NewNetwork(log, appSender, registerer, "p2p") if err != nil { return nil, err @@ -87,13 +75,6 @@ func New( return nil, err } - txPushGossiper := gossip.NewPushGossiper[*txs.Tx]( - marshaller, - txGossipClient, - txGossipMetrics, - config.TargetGossipSize, - ) - gossipMempool, err := newGossipMempool( mempool, registerer, @@ -107,8 +88,20 @@ func New( return nil, err } - var txPullGossiper gossip.Gossiper - txPullGossiper = gossip.NewPullGossiper[*txs.Tx]( + txPushGossiper, err := gossip.NewPushGossiper[*txs.Tx]( + marshaller, + gossipMempool, + txGossipClient, + txGossipMetrics, + config.PushGossipDiscardedCacheSize, + config.TargetGossipSize, + config.PushGossipMaxRegossipFrequency, + ) + if err != nil { + return nil, err + } + + var txPullGossiper gossip.Gossiper = gossip.NewPullGossiper[*txs.Tx]( log, marshaller, gossipMempool, @@ -127,7 +120,6 @@ func New( handler := gossip.NewHandler[*txs.Tx]( log, marshaller, - txPushGossiper, gossipMempool, txGossipMetrics, config.TargetGossipSize, @@ -157,7 +149,7 @@ func New( return nil, err } - return &network{ + return &Network{ Network: p2pNetwork, log: log, txVerifier: txVerifier, @@ -165,23 +157,33 @@ func New( partialSyncPrimaryNetwork: partialSyncPrimaryNetwork, appSender: appSender, txPushGossiper: txPushGossiper, + txPushGossipFrequency: config.PushGossipFrequency, txPullGossiper: txPullGossiper, - txGossipFrequency: config.PullGossipFrequency, - recentTxs: &cache.LRU[ids.ID, struct{}]{Size: config.LegacyPushGossipCacheSize}, + txPullGossipFrequency: config.PullGossipFrequency, }, nil } -func (n *network) Gossip(ctx context.Context) { +func (n *Network) PushGossip(ctx context.Context) { + // TODO: Even though the node is running partial sync, we should support + // issuing transactions from the RPC. + if n.partialSyncPrimaryNetwork { + return + } + + gossip.Every(ctx, n.log, n.txPushGossiper, n.txPushGossipFrequency) +} + +func (n *Network) PullGossip(ctx context.Context) { // If the node is running partial sync, we should not perform any pull // gossip. 
-func (n *network) Gossip(ctx context.Context) {
+func (n *Network) PushGossip(ctx context.Context) {
+    // TODO: Even though the node is running partial sync, we should support
+    // issuing transactions from the RPC.
+    if n.partialSyncPrimaryNetwork {
+        return
+    }
+
+    gossip.Every(ctx, n.log, n.txPushGossiper, n.txPushGossipFrequency)
+}
+
+func (n *Network) PullGossip(ctx context.Context) {
     // If the node is running partial sync, we should not perform any pull
     // gossip.
     if n.partialSyncPrimaryNetwork {
         return
     }

-    gossip.Every(ctx, n.log, n.txPullGossiper, n.txGossipFrequency)
+    gossip.Every(ctx, n.log, n.txPullGossiper, n.txPullGossipFrequency)
 }

-func (n *network) AppGossip(ctx context.Context, nodeID ids.NodeID, msgBytes []byte) error {
+func (n *Network) AppGossip(ctx context.Context, nodeID ids.NodeID, msgBytes []byte) error {
     n.log.Debug("called AppGossip message handler",
         zap.Stringer("nodeID", nodeID),
         zap.Int("messageLen", len(msgBytes)),
@@ -220,76 +222,37 @@ func (n *network) AppGossip(ctx context.Context, nodeID ids.NodeID, msgBytes []b
         )
         return nil
     }
-    txID := tx.ID()
-
-    if err := n.issueTx(tx); err == nil {
-        n.legacyGossipTx(ctx, txID, msgBytes)
-        n.txPushGossiper.Add(tx)
-        return n.txPushGossiper.Gossip(ctx)
-    }
+    // Returning an error here would result in shutting down the chain. Logging
+    // is already included inside addTxToMempool, so there's nothing to do with
+    // the returned error here.
+    _ = n.addTxToMempool(tx)
     return nil
 }

-func (n *network) IssueTx(ctx context.Context, tx *txs.Tx) error {
-    if err := n.issueTx(tx); err != nil {
+func (n *Network) IssueTxFromRPC(tx *txs.Tx) error {
+    // TODO: We should still push the transaction to some peers when partial
+    // syncing.
+    if err := n.addTxToMempool(tx); err != nil {
         return err
     }
-
-    txBytes := tx.Bytes()
-    msg := &message.Tx{
-        Tx: txBytes,
-    }
-    msgBytes, err := message.Build(msg)
-    if err != nil {
-        return err
-    }
-
-    txID := tx.ID()
-    n.legacyGossipTx(ctx, txID, msgBytes)
     n.txPushGossiper.Add(tx)
-    return n.txPushGossiper.Gossip(ctx)
+    return nil
 }

-// returns nil if the tx is in the mempool
-func (n *network) issueTx(tx *txs.Tx) error {
+func (n *Network) addTxToMempool(tx *txs.Tx) error {
     // If we are partially syncing the Primary Network, we should not be
     // maintaining the transaction mempool locally.
     if n.partialSyncPrimaryNetwork {
-        return nil
+        return errMempoolDisabledWithPartialSync
     }

-    if err := n.mempool.Add(tx); err != nil {
+    err := n.mempool.Add(tx)
+    if err != nil {
         n.log.Debug("tx failed to be added to the mempool",
             zap.Stringer("txID", tx.ID()),
             zap.Error(err),
         )
-
-        return err
-    }
-
-    return nil
-}
-
-func (n *network) legacyGossipTx(ctx context.Context, txID ids.ID, msgBytes []byte) {
-    n.recentTxsLock.Lock()
-    _, has := n.recentTxs.Get(txID)
-    n.recentTxs.Put(txID, struct{}{})
-    n.recentTxsLock.Unlock()
-
-    // Don't gossip a transaction if it has been recently gossiped.
-    if has {
-        return
-    }
-
-    n.log.Debug("gossiping tx",
-        zap.Stringer("txID", txID),
-    )
-
-    if err := n.appSender.SendAppGossip(ctx, msgBytes); err != nil {
-        n.log.Error("failed to gossip tx",
-            zap.Stringer("txID", txID),
-            zap.Error(err),
-        )
     }
+    return err
 }
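With the legacy path gone, issuance and gossip are fully decoupled: AppGossip and IssueTxFromRPC both just add to the mempool, and IssueTxFromRPC queues the tx for the next push round instead of sending an AppGossip message inline. A sketch of a caller that treats duplicate submissions as success, the same filtering vm.issueTxFromRPC applies later in this patch (the helper name is hypothetical):

package example

import (
	"errors"

	"github.com/ava-labs/avalanchego/vms/platformvm/network"
	"github.com/ava-labs/avalanchego/vms/platformvm/txs"
	"github.com/ava-labs/avalanchego/vms/platformvm/txs/mempool"
)

// issueTx adds tx to the mempool and queues it for push gossip. Re-issuing a
// tx that is already in the mempool is not treated as a failure.
func issueTx(n *network.Network, tx *txs.Tx) error {
	err := n.IssueTxFromRPC(tx)
	if err != nil && !errors.Is(err, mempool.ErrDuplicateTx) {
		return err // includes the partial-sync error introduced above
	}
	return nil
}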
diff --git a/vms/platformvm/network/network_test.go b/vms/platformvm/network/network_test.go
index 56957b0007c2..463f5c0cdc26 100644
--- a/vms/platformvm/network/network_test.go
+++ b/vms/platformvm/network/network_test.go
@@ -29,6 +29,9 @@ var (
     testConfig = Config{
         MaxValidatorSetStaleness:       time.Second,
         TargetGossipSize:               1,
+        PushGossipDiscardedCacheSize:   1,
+        PushGossipMaxRegossipFrequency: time.Second,
+        PushGossipFrequency:            time.Second,
         PullGossipPollSize:             1,
         PullGossipFrequency:            time.Second,
         PullGossipThrottlingPeriod:     time.Second,
@@ -36,7 +39,6 @@ var (
         ExpectedBloomFilterElements:                 10,
         ExpectedBloomFilterFalsePositiveProbability: .1,
         MaxBloomFilterFalsePositiveProbability:      .5,
-        LegacyPushGossipCacheSize:                   512,
     }
 )

@@ -68,7 +70,6 @@ func TestNetworkAppGossip(t *testing.T) {
         msgBytesFunc              func() []byte
         mempoolFunc               func(*gomock.Controller) mempool.Mempool
         partialSyncPrimaryNetwork bool
-        appSenderFunc             func(*gomock.Controller) common.AppSender
     }

     tests := []test{
         {
@@ -81,9 +82,6 @@ func TestNetworkAppGossip(t *testing.T) {
             mempoolFunc: func(*gomock.Controller) mempool.Mempool {
                 return nil
             },
-            appSenderFunc: func(*gomock.Controller) common.AppSender {
-                return nil
-            },
         },
         {
             // Shouldn't attempt to issue or gossip the tx
@@ -99,9 +97,6 @@ func TestNetworkAppGossip(t *testing.T) {
             mempoolFunc: func(ctrl *gomock.Controller) mempool.Mempool {
                 return mempool.NewMockMempool(ctrl)
             },
-            appSenderFunc: func(ctrl *gomock.Controller) common.AppSender {
-                return common.NewMockSender(ctrl)
-            },
         },
         {
             name: "issuance succeeds",
@@ -122,13 +117,6 @@ func TestNetworkAppGossip(t *testing.T) {
                 mempool.EXPECT().RequestBuildBlock(false)
                 return mempool
             },
-            appSenderFunc: func(ctrl *gomock.Controller) common.AppSender {
-                // we should gossip the tx twice because sdk and legacy gossip
-                // currently runs together
-                appSender := common.NewMockSender(ctrl)
-                appSender.EXPECT().SendAppGossip(gomock.Any(), gomock.Any()).Times(2)
-                return appSender
-            },
         },
         {
             // Issue returns error because tx was dropped. We shouldn't gossip the tx.
@@ -147,12 +135,9 @@ func TestNetworkAppGossip(t *testing.T) { mempool.EXPECT().GetDropReason(gomock.Any()).Return(errTest) return mempool }, - appSenderFunc: func(ctrl *gomock.Controller) common.AppSender { - return common.NewMockSender(ctrl) - }, }, { - name: "should AppGossip if primary network is not being fully synced", + name: "shouldn't AppGossip if primary network is not being fully synced", msgBytesFunc: func() []byte { msg := message.Tx{ Tx: testTx.Bytes(), @@ -162,16 +147,9 @@ func TestNetworkAppGossip(t *testing.T) { return msgBytes }, mempoolFunc: func(ctrl *gomock.Controller) mempool.Mempool { - mempool := mempool.NewMockMempool(ctrl) - // mempool.EXPECT().Has(gomock.Any()).Return(true) - return mempool + return mempool.NewMockMempool(ctrl) }, partialSyncPrimaryNetwork: true, - appSenderFunc: func(ctrl *gomock.Controller) common.AppSender { - appSender := common.NewMockSender(ctrl) - // appSender.EXPECT().SendAppGossip(gomock.Any(), gomock.Any()) - return appSender - }, }, } @@ -190,7 +168,7 @@ func TestNetworkAppGossip(t *testing.T) { testTxVerifier{}, tt.mempoolFunc(ctrl), tt.partialSyncPrimaryNetwork, - tt.appSenderFunc(ctrl), + common.NewMockSender(ctrl), prometheus.NewRegistry(), DefaultConfig, ) @@ -201,7 +179,7 @@ func TestNetworkAppGossip(t *testing.T) { } } -func TestNetworkIssueTx(t *testing.T) { +func TestNetworkIssueTxFromRPC(t *testing.T) { tx := &txs.Tx{} type test struct { @@ -273,19 +251,15 @@ func TestNetworkIssueTx(t *testing.T) { expectedErr: errTest, }, { - name: "AppGossip tx but do not add to mempool if primary network is not being fully synced", + name: "mempool is disabled if primary network is not being fully synced", mempoolFunc: func(ctrl *gomock.Controller) mempool.Mempool { return mempool.NewMockMempool(ctrl) }, partialSyncPrimaryNetwork: true, appSenderFunc: func(ctrl *gomock.Controller) common.AppSender { - // we should gossip the tx twice because sdk and legacy gossip - // currently runs together - appSender := common.NewMockSender(ctrl) - appSender.EXPECT().SendAppGossip(gomock.Any(), gomock.Any()).Return(nil).Times(2) - return appSender + return common.NewMockSender(ctrl) }, - expectedErr: nil, + expectedErr: errMempoolDisabledWithPartialSync, }, { name: "happy path", @@ -296,13 +270,12 @@ func TestNetworkIssueTx(t *testing.T) { mempool.EXPECT().Add(gomock.Any()).Return(nil) mempool.EXPECT().Len().Return(0) mempool.EXPECT().RequestBuildBlock(false) + mempool.EXPECT().Get(gomock.Any()).Return(nil, true).Times(2) return mempool }, appSenderFunc: func(ctrl *gomock.Controller) common.AppSender { - // we should gossip the tx twice because sdk and legacy gossip - // currently runs together appSender := common.NewMockSender(ctrl) - appSender.EXPECT().SendAppGossip(gomock.Any(), gomock.Any()).Return(nil).Times(2) + appSender.EXPECT().SendAppGossip(gomock.Any(), gomock.Any()).Return(nil) return appSender }, expectedErr: nil, @@ -329,44 +302,10 @@ func TestNetworkIssueTx(t *testing.T) { ) require.NoError(err) - err = n.IssueTx(context.Background(), tx) + err = n.IssueTxFromRPC(tx) require.ErrorIs(err, tt.expectedErr) + + require.NoError(n.txPushGossiper.Gossip(context.Background())) }) } } - -func TestNetworkGossipTx(t *testing.T) { - require := require.New(t) - ctrl := gomock.NewController(t) - - appSender := common.NewMockSender(ctrl) - - snowCtx := snowtest.Context(t, ids.Empty) - nIntf, err := New( - snowCtx.Log, - snowCtx.NodeID, - snowCtx.SubnetID, - snowCtx.ValidatorState, - testTxVerifier{}, - mempool.NewMockMempool(ctrl), - false, - appSender, 
- prometheus.NewRegistry(), - testConfig, - ) - require.NoError(err) - require.IsType(&network{}, nIntf) - n := nIntf.(*network) - - // Case: Tx was recently gossiped - txID := ids.GenerateTestID() - n.recentTxs.Put(txID, struct{}{}) - n.legacyGossipTx(context.Background(), txID, []byte{}) - // Didn't make a call to SendAppGossip - - // Case: Tx was not recently gossiped - msgBytes := []byte{1, 2, 3} - appSender.EXPECT().SendAppGossip(gomock.Any(), msgBytes).Return(nil) - n.legacyGossipTx(context.Background(), ids.GenerateTestID(), msgBytes) - // Did make a call to SendAppGossip -} diff --git a/vms/platformvm/service.go b/vms/platformvm/service.go index 5a9c6b900819..6e48f198fb9a 100644 --- a/vms/platformvm/service.go +++ b/vms/platformvm/service.go @@ -1400,7 +1400,7 @@ func (s *Service) GetBlockchains(_ *http.Request, _ *struct{}, response *GetBloc return nil } -func (s *Service) IssueTx(req *http.Request, args *api.FormattedTx, response *api.JSONTxID) error { +func (s *Service) IssueTx(_ *http.Request, args *api.FormattedTx, response *api.JSONTxID) error { s.vm.ctx.Log.Debug("API called", zap.String("service", "platform"), zap.String("method", "issueTx"), @@ -1415,7 +1415,7 @@ func (s *Service) IssueTx(req *http.Request, args *api.FormattedTx, response *ap return fmt.Errorf("couldn't parse tx: %w", err) } - if err := s.vm.issueTx(req.Context(), tx); err != nil { + if err := s.vm.issueTxFromRPC(tx); err != nil { return fmt.Errorf("couldn't issue tx: %w", err) } diff --git a/vms/platformvm/service_test.go b/vms/platformvm/service_test.go index ad5409d313b7..84ef2c60422c 100644 --- a/vms/platformvm/service_test.go +++ b/vms/platformvm/service_test.go @@ -184,7 +184,7 @@ func TestGetTxStatus(t *testing.T) { require.Zero(resp.Reason) // put the chain in existing chain list - require.NoError(service.vm.Network.IssueTx(context.Background(), tx)) + require.NoError(service.vm.Network.IssueTxFromRPC(tx)) service.vm.ctx.Lock.Lock() block, err := service.vm.BuildBlock(context.Background()) @@ -285,7 +285,7 @@ func TestGetTx(t *testing.T) { err = service.GetTx(nil, arg, &response) require.ErrorIs(err, database.ErrNotFound) // We haven't issued the tx yet - require.NoError(service.vm.Network.IssueTx(context.Background(), tx)) + require.NoError(service.vm.Network.IssueTxFromRPC(tx)) service.vm.ctx.Lock.Lock() blk, err := service.vm.BuildBlock(context.Background()) diff --git a/vms/platformvm/validator_set_property_test.go b/vms/platformvm/validator_set_property_test.go index cdac03ca53db..abd08bc1866a 100644 --- a/vms/platformvm/validator_set_property_test.go +++ b/vms/platformvm/validator_set_property_test.go @@ -298,7 +298,7 @@ func addPrimaryValidatorWithBLSKey(vm *VM, data *validatorInputData) (*state.Sta func internalAddValidator(vm *VM, signedTx *txs.Tx) (*state.Staker, error) { vm.ctx.Lock.Unlock() - err := vm.issueTx(context.Background(), signedTx) + err := vm.issueTxFromRPC(signedTx) vm.ctx.Lock.Lock() if err != nil { @@ -689,7 +689,7 @@ func buildVM(t *testing.T) (*VM, ids.ID, error) { return nil, ids.Empty, err } vm.ctx.Lock.Unlock() - err = vm.issueTx(context.Background(), testSubnet1) + err = vm.issueTxFromRPC(testSubnet1) vm.ctx.Lock.Lock() if err != nil { return nil, ids.Empty, err diff --git a/vms/platformvm/vm.go b/vms/platformvm/vm.go index 8c4801e06ea0..30e1b8c3d63d 100644 --- a/vms/platformvm/vm.go +++ b/vms/platformvm/vm.go @@ -62,7 +62,7 @@ var ( type VM struct { config.Config blockbuilder.Builder - network.Network + *network.Network validators.State metrics metrics.Metrics @@ 
-219,7 +219,8 @@ func (vm *VM) Initialize( vm.onShutdownCtx, vm.onShutdownCtxCancel = context.WithCancel(context.Background()) // TODO: Wait for this goroutine to exit during Shutdown once the platformvm // has better control of the context lock. - go vm.Network.Gossip(vm.onShutdownCtx) + go vm.Network.PushGossip(vm.onShutdownCtx) + go vm.Network.PullGossip(vm.onShutdownCtx) vm.Builder = blockbuilder.New( mempool, @@ -541,8 +542,8 @@ func (vm *VM) GetBlockIDAtHeight(_ context.Context, height uint64) (ids.ID, erro return vm.state.GetBlockIDAtHeight(height) } -func (vm *VM) issueTx(ctx context.Context, tx *txs.Tx) error { - err := vm.Network.IssueTx(ctx, tx) +func (vm *VM) issueTxFromRPC(tx *txs.Tx) error { + err := vm.Network.IssueTxFromRPC(tx) if err != nil && !errors.Is(err, mempool.ErrDuplicateTx) { vm.ctx.Log.Debug("failed to add tx to mempool", zap.Stringer("txID", tx.ID()), diff --git a/vms/platformvm/vm_regression_test.go b/vms/platformvm/vm_regression_test.go index e612340546fe..cae74f602936 100644 --- a/vms/platformvm/vm_regression_test.go +++ b/vms/platformvm/vm_regression_test.go @@ -77,7 +77,7 @@ func TestAddDelegatorTxOverDelegatedRegression(t *testing.T) { // trigger block creation vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), addValidatorTx)) + require.NoError(vm.issueTxFromRPC(addValidatorTx)) vm.ctx.Lock.Lock() addValidatorBlock, err := vm.Builder.BuildBlock(context.Background()) @@ -112,7 +112,7 @@ func TestAddDelegatorTxOverDelegatedRegression(t *testing.T) { // trigger block creation vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), addFirstDelegatorTx)) + require.NoError(vm.issueTxFromRPC(addFirstDelegatorTx)) vm.ctx.Lock.Lock() addFirstDelegatorBlock, err := vm.Builder.BuildBlock(context.Background()) @@ -149,7 +149,7 @@ func TestAddDelegatorTxOverDelegatedRegression(t *testing.T) { // trigger block creation vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), addSecondDelegatorTx)) + require.NoError(vm.issueTxFromRPC(addSecondDelegatorTx)) vm.ctx.Lock.Lock() addSecondDelegatorBlock, err := vm.Builder.BuildBlock(context.Background()) @@ -176,7 +176,7 @@ func TestAddDelegatorTxOverDelegatedRegression(t *testing.T) { // trigger block creation vm.ctx.Lock.Unlock() - err = vm.issueTx(context.Background(), addThirdDelegatorTx) + err = vm.issueTxFromRPC(addThirdDelegatorTx) require.ErrorIs(err, executor.ErrOverDelegated) vm.ctx.Lock.Lock() } @@ -249,7 +249,7 @@ func TestAddDelegatorTxHeapCorruption(t *testing.T) { // issue the add validator tx vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), addValidatorTx)) + require.NoError(vm.issueTxFromRPC(addValidatorTx)) vm.ctx.Lock.Lock() // trigger block creation for the validator tx @@ -274,7 +274,7 @@ func TestAddDelegatorTxHeapCorruption(t *testing.T) { // issue the first add delegator tx vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), addFirstDelegatorTx)) + require.NoError(vm.issueTxFromRPC(addFirstDelegatorTx)) vm.ctx.Lock.Lock() // trigger block creation for the first add delegator tx @@ -299,7 +299,7 @@ func TestAddDelegatorTxHeapCorruption(t *testing.T) { // issue the second add delegator tx vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), addSecondDelegatorTx)) + require.NoError(vm.issueTxFromRPC(addSecondDelegatorTx)) vm.ctx.Lock.Lock() // trigger block creation for the second add delegator tx @@ -324,7 +324,7 @@ func TestAddDelegatorTxHeapCorruption(t *testing.T) { // issue the third add 
delegator tx vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), addThirdDelegatorTx)) + require.NoError(vm.issueTxFromRPC(addThirdDelegatorTx)) vm.ctx.Lock.Lock() // trigger block creation for the third add delegator tx @@ -349,7 +349,7 @@ func TestAddDelegatorTxHeapCorruption(t *testing.T) { // issue the fourth add delegator tx vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), addFourthDelegatorTx)) + require.NoError(vm.issueTxFromRPC(addFourthDelegatorTx)) vm.ctx.Lock.Lock() // trigger block creation for the fourth add delegator tx @@ -1179,7 +1179,7 @@ func TestAddDelegatorTxAddBeforeRemove(t *testing.T) { // issue the add validator tx vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), addValidatorTx)) + require.NoError(vm.issueTxFromRPC(addValidatorTx)) vm.ctx.Lock.Lock() // trigger block creation for the validator tx @@ -1204,7 +1204,7 @@ func TestAddDelegatorTxAddBeforeRemove(t *testing.T) { // issue the first add delegator tx vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), addFirstDelegatorTx)) + require.NoError(vm.issueTxFromRPC(addFirstDelegatorTx)) vm.ctx.Lock.Lock() // trigger block creation for the first add delegator tx @@ -1230,7 +1230,7 @@ func TestAddDelegatorTxAddBeforeRemove(t *testing.T) { // attempting to issue the second add delegator tx should fail because the // total stake weight would go over the limit. vm.ctx.Lock.Unlock() - err = vm.issueTx(context.Background(), addSecondDelegatorTx) + err = vm.issueTxFromRPC(addSecondDelegatorTx) require.ErrorIs(err, executor.ErrOverDelegated) vm.ctx.Lock.Lock() } @@ -1266,7 +1266,7 @@ func TestRemovePermissionedValidatorDuringPendingToCurrentTransitionNotTracked(t require.NoError(err) vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), addValidatorTx)) + require.NoError(vm.issueTxFromRPC(addValidatorTx)) vm.ctx.Lock.Lock() // trigger block creation for the validator tx @@ -1286,7 +1286,7 @@ func TestRemovePermissionedValidatorDuringPendingToCurrentTransitionNotTracked(t require.NoError(err) vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), createSubnetTx)) + require.NoError(vm.issueTxFromRPC(createSubnetTx)) vm.ctx.Lock.Lock() // trigger block creation for the subnet tx @@ -1309,7 +1309,7 @@ func TestRemovePermissionedValidatorDuringPendingToCurrentTransitionNotTracked(t require.NoError(err) vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), addSubnetValidatorTx)) + require.NoError(vm.issueTxFromRPC(addSubnetValidatorTx)) vm.ctx.Lock.Lock() // trigger block creation for the validator tx @@ -1341,7 +1341,7 @@ func TestRemovePermissionedValidatorDuringPendingToCurrentTransitionNotTracked(t vm.clock.Set(validatorStartTime) vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), removeSubnetValidatorTx)) + require.NoError(vm.issueTxFromRPC(removeSubnetValidatorTx)) vm.ctx.Lock.Lock() // trigger block creation for the validator tx @@ -1391,7 +1391,7 @@ func TestRemovePermissionedValidatorDuringPendingToCurrentTransitionTracked(t *t require.NoError(err) vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), addValidatorTx)) + require.NoError(vm.issueTxFromRPC(addValidatorTx)) vm.ctx.Lock.Lock() // trigger block creation for the validator tx @@ -1411,7 +1411,7 @@ func TestRemovePermissionedValidatorDuringPendingToCurrentTransitionTracked(t *t require.NoError(err) vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), createSubnetTx)) + 
require.NoError(vm.issueTxFromRPC(createSubnetTx)) vm.ctx.Lock.Lock() // trigger block creation for the subnet tx @@ -1434,7 +1434,7 @@ func TestRemovePermissionedValidatorDuringPendingToCurrentTransitionTracked(t *t require.NoError(err) vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), addSubnetValidatorTx)) + require.NoError(vm.issueTxFromRPC(addSubnetValidatorTx)) vm.ctx.Lock.Lock() // trigger block creation for the validator tx @@ -1458,7 +1458,7 @@ func TestRemovePermissionedValidatorDuringPendingToCurrentTransitionTracked(t *t vm.clock.Set(validatorStartTime) vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), removeSubnetValidatorTx)) + require.NoError(vm.issueTxFromRPC(removeSubnetValidatorTx)) vm.ctx.Lock.Lock() // trigger block creation for the validator tx @@ -1520,7 +1520,7 @@ func TestSubnetValidatorBLSKeyDiffAfterExpiry(t *testing.T) { uPrimaryTx := primaryTx.Unsigned.(*txs.AddPermissionlessValidatorTx) vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), primaryTx)) + require.NoError(vm.issueTxFromRPC(primaryTx)) vm.ctx.Lock.Lock() require.NoError(buildAndAcceptStandardBlock(vm)) @@ -1550,7 +1550,7 @@ func TestSubnetValidatorBLSKeyDiffAfterExpiry(t *testing.T) { require.NoError(err) vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), subnetTx)) + require.NoError(vm.issueTxFromRPC(subnetTx)) vm.ctx.Lock.Lock() require.NoError(buildAndAcceptStandardBlock(vm)) @@ -1626,7 +1626,7 @@ func TestSubnetValidatorBLSKeyDiffAfterExpiry(t *testing.T) { uPrimaryRestartTx := primaryRestartTx.Unsigned.(*txs.AddPermissionlessValidatorTx) vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), primaryRestartTx)) + require.NoError(vm.issueTxFromRPC(primaryRestartTx)) vm.ctx.Lock.Lock() require.NoError(buildAndAcceptStandardBlock(vm)) @@ -1733,7 +1733,7 @@ func TestPrimaryNetworkValidatorPopulatedToEmptyBLSKeyDiff(t *testing.T) { require.NoError(err) vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), primaryTx1)) + require.NoError(vm.issueTxFromRPC(primaryTx1)) vm.ctx.Lock.Lock() require.NoError(buildAndAcceptStandardBlock(vm)) @@ -1795,7 +1795,7 @@ func TestPrimaryNetworkValidatorPopulatedToEmptyBLSKeyDiff(t *testing.T) { require.NoError(err) vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), primaryRestartTx)) + require.NoError(vm.issueTxFromRPC(primaryRestartTx)) vm.ctx.Lock.Lock() require.NoError(buildAndAcceptStandardBlock(vm)) @@ -1866,7 +1866,7 @@ func TestSubnetValidatorPopulatedToEmptyBLSKeyDiff(t *testing.T) { require.NoError(err) vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), primaryTx1)) + require.NoError(vm.issueTxFromRPC(primaryTx1)) vm.ctx.Lock.Lock() require.NoError(buildAndAcceptStandardBlock(vm)) @@ -1896,7 +1896,7 @@ func TestSubnetValidatorPopulatedToEmptyBLSKeyDiff(t *testing.T) { require.NoError(err) vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), subnetTx)) + require.NoError(vm.issueTxFromRPC(subnetTx)) vm.ctx.Lock.Lock() require.NoError(buildAndAcceptStandardBlock(vm)) @@ -1970,7 +1970,7 @@ func TestSubnetValidatorPopulatedToEmptyBLSKeyDiff(t *testing.T) { require.NoError(err) vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), primaryRestartTx)) + require.NoError(vm.issueTxFromRPC(primaryRestartTx)) vm.ctx.Lock.Lock() require.NoError(buildAndAcceptStandardBlock(vm)) @@ -2048,7 +2048,7 @@ func TestSubnetValidatorSetAfterPrimaryNetworkValidatorRemoval(t *testing.T) { 
require.NoError(err) vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), primaryTx1)) + require.NoError(vm.issueTxFromRPC(primaryTx1)) vm.ctx.Lock.Lock() require.NoError(buildAndAcceptStandardBlock(vm)) @@ -2075,7 +2075,7 @@ func TestSubnetValidatorSetAfterPrimaryNetworkValidatorRemoval(t *testing.T) { require.NoError(err) vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), subnetTx)) + require.NoError(vm.issueTxFromRPC(subnetTx)) vm.ctx.Lock.Lock() require.NoError(buildAndAcceptStandardBlock(vm)) diff --git a/vms/platformvm/vm_test.go b/vms/platformvm/vm_test.go index 1b16e72bf3c2..58bb2f929102 100644 --- a/vms/platformvm/vm_test.go +++ b/vms/platformvm/vm_test.go @@ -310,7 +310,7 @@ func defaultVM(t *testing.T, f fork) (*VM, database.Database, *mutableSharedMemo ) require.NoError(err) vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), testSubnet1)) + require.NoError(vm.issueTxFromRPC(testSubnet1)) vm.ctx.Lock.Lock() blk, err := vm.Builder.BuildBlock(context.Background()) require.NoError(err) @@ -415,7 +415,7 @@ func TestAddValidatorCommit(t *testing.T) { // trigger block creation vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), tx)) + require.NoError(vm.issueTxFromRPC(tx)) vm.ctx.Lock.Lock() blk, err := vm.Builder.BuildBlock(context.Background()) @@ -514,7 +514,7 @@ func TestAddValidatorReject(t *testing.T) { // trigger block creation vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), tx)) + require.NoError(vm.issueTxFromRPC(tx)) vm.ctx.Lock.Lock() blk, err := vm.Builder.BuildBlock(context.Background()) @@ -563,7 +563,7 @@ func TestAddValidatorInvalidNotReissued(t *testing.T) { // trigger block creation vm.ctx.Lock.Unlock() - err = vm.issueTx(context.Background(), tx) + err = vm.issueTxFromRPC(tx) vm.ctx.Lock.Lock() require.ErrorIs(err, txexecutor.ErrDuplicateValidator) } @@ -598,7 +598,7 @@ func TestAddSubnetValidatorAccept(t *testing.T) { // trigger block creation vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), tx)) + require.NoError(vm.issueTxFromRPC(tx)) vm.ctx.Lock.Lock() blk, err := vm.Builder.BuildBlock(context.Background()) @@ -646,7 +646,7 @@ func TestAddSubnetValidatorReject(t *testing.T) { // trigger block creation vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), tx)) + require.NoError(vm.issueTxFromRPC(tx)) vm.ctx.Lock.Lock() blk, err := vm.Builder.BuildBlock(context.Background()) @@ -832,7 +832,7 @@ func TestCreateChain(t *testing.T) { require.NoError(err) vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), tx)) + require.NoError(vm.issueTxFromRPC(tx)) vm.ctx.Lock.Lock() blk, err := vm.Builder.BuildBlock(context.Background()) @@ -883,7 +883,7 @@ func TestCreateSubnet(t *testing.T) { require.NoError(err) vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), createSubnetTx)) + require.NoError(vm.issueTxFromRPC(createSubnetTx)) vm.ctx.Lock.Lock() // should contain the CreateSubnetTx @@ -927,7 +927,7 @@ func TestCreateSubnet(t *testing.T) { require.NoError(err) vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), addValidatorTx)) + require.NoError(vm.issueTxFromRPC(addValidatorTx)) vm.ctx.Lock.Lock() blk, err = vm.Builder.BuildBlock(context.Background()) // should add validator to the new subnet @@ -1031,7 +1031,7 @@ func TestAtomicImport(t *testing.T) { require.NoError(err) vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), tx)) + 
require.NoError(vm.issueTxFromRPC(tx)) vm.ctx.Lock.Lock() blk, err := vm.Builder.BuildBlock(context.Background()) @@ -2030,7 +2030,7 @@ func TestRemovePermissionedValidatorDuringAddPending(t *testing.T) { require.NoError(err) vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), addValidatorTx)) + require.NoError(vm.issueTxFromRPC(addValidatorTx)) vm.ctx.Lock.Lock() // trigger block creation for the validator tx @@ -2050,7 +2050,7 @@ func TestRemovePermissionedValidatorDuringAddPending(t *testing.T) { require.NoError(err) vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), createSubnetTx)) + require.NoError(vm.issueTxFromRPC(createSubnetTx)) vm.ctx.Lock.Lock() // trigger block creation for the subnet tx @@ -2121,7 +2121,7 @@ func TestTransferSubnetOwnershipTx(t *testing.T) { subnetID := createSubnetTx.ID() vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), createSubnetTx)) + require.NoError(vm.issueTxFromRPC(createSubnetTx)) vm.ctx.Lock.Lock() createSubnetBlock, err := vm.Builder.BuildBlock(context.Background()) require.NoError(err) @@ -2156,7 +2156,7 @@ func TestTransferSubnetOwnershipTx(t *testing.T) { require.NoError(err) vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), transferSubnetOwnershipTx)) + require.NoError(vm.issueTxFromRPC(transferSubnetOwnershipTx)) vm.ctx.Lock.Lock() transferSubnetOwnershipBlock, err := vm.Builder.BuildBlock(context.Background()) require.NoError(err) @@ -2242,7 +2242,7 @@ func TestBaseTx(t *testing.T) { require.Equal(sendAmt, key1OutputAmt) vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), baseTx)) + require.NoError(vm.issueTxFromRPC(baseTx)) vm.ctx.Lock.Lock() baseTxBlock, err := vm.Builder.BuildBlock(context.Background()) require.NoError(err) @@ -2281,7 +2281,7 @@ func TestPruneMempool(t *testing.T) { require.NoError(err) vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), baseTx)) + require.NoError(vm.issueTxFromRPC(baseTx)) vm.ctx.Lock.Lock() // [baseTx] should be in the mempool. @@ -2313,7 +2313,7 @@ func TestPruneMempool(t *testing.T) { require.NoError(err) vm.ctx.Lock.Unlock() - require.NoError(vm.issueTx(context.Background(), addValidatorTx)) + require.NoError(vm.issueTxFromRPC(addValidatorTx)) vm.ctx.Lock.Lock() // Advance clock to [endTime], making [addValidatorTx] invalid.
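The call sites above all repeat the same unlock-issue-lock-build sequence, since issueTxFromRPC must run with the context lock released while BuildBlock is exercised with it held. A hypothetical in-package test helper (not part of this patch) that captures the pattern:

package platformvm

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/ava-labs/avalanchego/snow/consensus/snowman"
	"github.com/ava-labs/avalanchego/vms/platformvm/txs"
)

// issueAndBuild issues tx via the RPC path and builds the resulting block.
func issueAndBuild(t *testing.T, vm *VM, tx *txs.Tx) snowman.Block {
	require := require.New(t)

	// issueTxFromRPC is called with the context lock released, matching the
	// call sites in this patch.
	vm.ctx.Lock.Unlock()
	err := vm.issueTxFromRPC(tx)
	vm.ctx.Lock.Lock()
	require.NoError(err)

	blk, err := vm.Builder.BuildBlock(context.Background())
	require.NoError(err)
	return blk
}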