diff --git a/config/peer_config.go b/config/peer_config.go new file mode 100644 index 00000000..e9667784 --- /dev/null +++ b/config/peer_config.go @@ -0,0 +1,44 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package config + +import ( + "net/netip" + + "github.com/ava-labs/avalanchego/ids" +) + +type PeerConfig struct { + ID string `mapstructure:"id" json:"id"` + IP string `mapstructure:"ip" json:"ip"` + + id ids.NodeID + ip netip.AddrPort +} + +func (c *PeerConfig) Validate() error { + var ( + id ids.NodeID + ip netip.AddrPort + err error + ) + if id, err = ids.NodeIDFromString(c.ID); err != nil { + return err + } + if ip, err = netip.ParseAddrPort(c.IP); err != nil { + return err + } + c.id = id + c.ip = ip + + return nil +} + +func (c *PeerConfig) GetID() ids.NodeID { + return c.id +} + +func (c *PeerConfig) GetIP() netip.AddrPort { + return c.ip +} diff --git a/database/json_file_storage.go b/database/json_file_storage.go index 95353546..f5785543 100644 --- a/database/json_file_storage.go +++ b/database/json_file_storage.go @@ -125,6 +125,12 @@ func (s *JSONFileStorage) getCurrentState(relayerID common.Hash) (chainState, bo // Put the value into the JSON database. Read the current chain state and overwrite the key, if it exists // If the file corresponding to {relayerID} does not exist, then it will be created func (s *JSONFileStorage) Put(relayerID common.Hash, dataKey DataKey, value []byte) error { + s.logger.Debug( + "db put", + zap.Stringer("relayerID", relayerID), + zap.Stringer("key", dataKey), + zap.String("value", string(value)), + ) mutex, ok := s.mutexes[relayerID] if !ok { return errors.Wrap( diff --git a/go.mod b/go.mod index 8668cc7c..5f605b13 100644 --- a/go.mod +++ b/go.mod @@ -3,10 +3,11 @@ module github.com/ava-labs/awm-relayer go 1.22.8 require ( - github.com/ava-labs/avalanchego v1.12.0-fuji + github.com/alexliesenfeld/health v0.8.0 + github.com/ava-labs/avalanchego v1.12.0-initial-poc.9.0.20241122192639-7c3ad181c928 github.com/ava-labs/coreth v0.13.9-rc.1 github.com/ava-labs/subnet-evm v0.6.12 - github.com/ava-labs/teleporter v1.0.8-0.20241121223552-226937a967e8 + github.com/ava-labs/teleporter v1.0.8-0.20241122194201-a6e92843c3b1 github.com/aws/aws-sdk-go-v2 v1.32.5 github.com/aws/aws-sdk-go-v2/config v1.28.5 github.com/aws/aws-sdk-go-v2/service/kms v1.37.6 @@ -17,12 +18,14 @@ require ( github.com/pingcap/errors v0.11.4 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.20.5 - github.com/redis/go-redis/v9 v9.7.0 + github.com/redis/go-redis/v9 v9.6.1 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.16.0 github.com/stretchr/testify v1.9.0 + go.uber.org/atomic v1.11.0 go.uber.org/mock v0.5.0 go.uber.org/zap v1.27.0 + golang.org/x/sync v0.9.0 google.golang.org/grpc v1.68.0 google.golang.org/protobuf v1.35.2 ) @@ -93,6 +96,7 @@ require ( github.com/gorilla/rpc v1.2.0 // indirect github.com/gorilla/websocket v1.5.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0 // indirect github.com/hashicorp/go-bexpr v0.1.10 // indirect github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect github.com/hashicorp/hcl v1.0.0 // indirect @@ -184,10 +188,3 @@ require ( sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect sigs.k8s.io/yaml v1.3.0 // indirect ) - -require ( - github.com/alexliesenfeld/health v0.8.0 - github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0 // indirect - go.uber.org/atomic 
v1.11.0 - golang.org/x/sync v0.9.0 -) diff --git a/go.sum b/go.sum index 691abf40..51a80084 100644 --- a/go.sum +++ b/go.sum @@ -60,14 +60,14 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= -github.com/ava-labs/avalanchego v1.12.0-fuji h1:o/GbXrqW9CAXu2jX/a1dZtvFiiSVCWomJZyxF4hCQOA= -github.com/ava-labs/avalanchego v1.12.0-fuji/go.mod h1:yhD5dpZyStIVbxQ550EDi5w5SL7DQ/xGE6TIxosb7U0= +github.com/ava-labs/avalanchego v1.12.0-initial-poc.9.0.20241122192639-7c3ad181c928 h1:th+K+wWgAYL/NsrFJyO+/sThLRdEDE0+I4vgbPLoWQQ= +github.com/ava-labs/avalanchego v1.12.0-initial-poc.9.0.20241122192639-7c3ad181c928/go.mod h1:yhD5dpZyStIVbxQ550EDi5w5SL7DQ/xGE6TIxosb7U0= github.com/ava-labs/coreth v0.13.9-rc.1 h1:qIICpC/OZGYUP37QnLgIqqwGmxnLwLpZaUlqJNI85vU= github.com/ava-labs/coreth v0.13.9-rc.1/go.mod h1:7aMsRIo/3GBE44qWZMjnfqdqfcfZ5yShTTm2LObLaYo= github.com/ava-labs/subnet-evm v0.6.12 h1:jL3FmjdFcNfS0qwbehwN6DkAg9y7zexB1riiGBxRsM0= github.com/ava-labs/subnet-evm v0.6.12/go.mod h1:vffwL4UqAh7ibpWjveUuUhamm3a9w75q92bG5vXdX5k= -github.com/ava-labs/teleporter v1.0.8-0.20241121223552-226937a967e8 h1:jsH1wv1GgeztvipQG3di1OTruSHbFAwwP4K6clzTRLE= -github.com/ava-labs/teleporter v1.0.8-0.20241121223552-226937a967e8/go.mod h1:Q4/DDZPLI5f96xDykVXPT85PeJS3IqDPDJDk3UdQOuQ= +github.com/ava-labs/teleporter v1.0.8-0.20241122194201-a6e92843c3b1 h1:y1zjdfGlfTZQoPyUyPjsu9FjDK8w19OWUTpgVzQSh0w= +github.com/ava-labs/teleporter v1.0.8-0.20241122194201-a6e92843c3b1/go.mod h1:45NrpvVlms+xHL/rFZT7VrRJqajT7UUW78lzBe3hAzU= github.com/aws/aws-sdk-go-v2 v1.32.5 h1:U8vdWJuY7ruAkzaOdD7guwJjD06YSKmnKCJs7s3IkIo= github.com/aws/aws-sdk-go-v2 v1.32.5/go.mod h1:P5WJBrYqqbWVaOxgH0X/FYYD47/nooaPOZPlQdmiN2U= github.com/aws/aws-sdk-go-v2/config v1.28.5 h1:Za41twdCXbuyyWv9LndXxZZv3QhTG1DinqlFsSuvtI0= @@ -556,8 +556,8 @@ github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= -github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= -github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= +github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4= +github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= diff --git a/peers/app_request_network.go b/peers/app_request_network.go index e790b41b..f962ece1 100644 --- a/peers/app_request_network.go +++ b/peers/app_request_network.go @@ -8,6 +8,7 @@ package peers import ( "context" "encoding/hex" + "fmt" "os" "sync" "time" @@ -18,9 +19,13 @@ import ( "github.com/ava-labs/avalanchego/network" avagoCommon "github.com/ava-labs/avalanchego/snow/engine/common" snowVdrs 
"github.com/ava-labs/avalanchego/snow/validators" + vdrs "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/subnets" + "github.com/ava-labs/avalanchego/utils/constants" "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/utils/sampler" "github.com/ava-labs/avalanchego/utils/set" + "github.com/ava-labs/avalanchego/vms/platformvm" "github.com/ava-labs/avalanchego/vms/platformvm/warp" "github.com/ava-labs/awm-relayer/peers/validators" "github.com/prometheus/client_golang/prometheus" @@ -30,10 +35,11 @@ import ( const ( InboundMessageChannelSize = 1000 DefaultAppRequestTimeout = time.Second * 2 + ValidatorRefreshPeriod = time.Second * 5 + NumBootstrapNodes = 5 ) type AppRequestNetwork interface { - ConnectPeers(nodeIDs set.Set[ids.NodeID]) set.Set[ids.NodeID] ConnectToCanonicalValidators(subnetID ids.ID) ( *ConnectedCanonicalValidators, error, @@ -50,6 +56,8 @@ type AppRequestNetwork interface { subnetID ids.ID, allower subnets.Allower, ) set.Set[ids.NodeID] + Shutdown() + TrackSubnet(subnetID ids.ID) } type appRequestNetwork struct { @@ -57,14 +65,12 @@ type appRequestNetwork struct { handler *RelayerExternalHandler infoAPI *InfoAPI logger logging.Logger - lock *sync.Mutex + lock *sync.RWMutex validatorClient *validators.CanonicalValidatorClient metrics *AppRequestNetworkMetrics - // Nodes that we should connect to that are not publicly discoverable. - // Should only be used for local or custom blockchains where validators are not - // publicly discoverable by primary network nodes. - manuallyTrackedPeers []info.Peer + trackedSubnets set.Set[ids.ID] + manager vdrs.Manager } // NewNetwork creates a P2P network client for interacting with validators @@ -86,8 +92,8 @@ func NewNetwork( metrics, err := newAppRequestNetworkMetrics(registerer) if err != nil { - logger.Fatal("Failed to create app request network metrics", zap.Error(err)) - panic(err) + logger.Error("Failed to create app request network metrics", zap.Error(err)) + return nil, err } // Create the handler for handling inbound app responses @@ -117,7 +123,10 @@ func NewNetwork( return nil, err } - testNetwork, err := network.NewTestNetwork(logger, networkID, snowVdrs.NewManager(), trackedSubnets, handler) + validatorClient := validators.NewCanonicalValidatorClient(logger, cfg.GetPChainAPI()) + manager := snowVdrs.NewManager() + + testNetwork, err := network.NewTestNetwork(logger, networkID, manager, trackedSubnets, handler) if err != nil { logger.Error( "Failed to create test network", @@ -126,95 +135,171 @@ func NewNetwork( return nil, err } - validatorClient := validators.NewCanonicalValidatorClient(logger, cfg.GetPChainAPI()) + for _, peer := range manuallyTrackedPeers { + logger.Info( + "Manually Tracking peer (startup)", + zap.Stringer("ID", peer.ID), + zap.Stringer("IP", peer.PublicIP), + ) + testNetwork.ManuallyTrack(peer.ID, peer.PublicIP) + } - arNetwork := &appRequestNetwork{ - network: testNetwork, - handler: handler, - infoAPI: infoAPI, - logger: logger, - lock: new(sync.Mutex), - validatorClient: validatorClient, - metrics: metrics, - manuallyTrackedPeers: manuallyTrackedPeers, + // Connect to a sample of the primary network validators, with connection + // info pulled from the info API + peers, err := infoAPI.Peers(context.Background(), nil) + if err != nil { + logger.Error( + "Failed to get peers", + zap.Error(err), + ) + return nil, err + } + peersMap := make(map[ids.NodeID]info.Peer) + for _, peer := range peers { + peersMap[peer.ID] = peer } + + pClient := 
platformvm.NewClient(cfg.GetPChainAPI().BaseURL) + vdrs, err := pClient.GetCurrentValidators(context.Background(), constants.PrimaryNetworkID, nil) + if err != nil { + logger.Error("Failed to get current validators", zap.Error(err)) + return nil, err + } + + // Sample until we've connected to the target number of bootstrap nodes + s := sampler.NewUniform() + s.Initialize(uint64(len(vdrs))) + numConnected := 0 + for numConnected < NumBootstrapNodes { + i, ok := s.Next() + if !ok { + // If we've sampled all the nodes and still haven't connected to the target number of bootstrap nodes, + // then warn and stop sampling by either returning an error or breaking + logger.Warn( + "Failed to connect to enough bootstrap nodes", + zap.Int("targetBootstrapNodes", NumBootstrapNodes), + zap.Int("numAvailablePeers", len(peers)), + zap.Int("connectedBootstrapNodes", numConnected), + ) + if numConnected == 0 { + return nil, fmt.Errorf("failed to connect to any bootstrap nodes") + } + break + } + if peer, ok := peersMap[vdrs[i].NodeID]; ok { + logger.Info( + "Manually tracking bootstrap node", + zap.Stringer("ID", peer.ID), + zap.Stringer("IP", peer.PublicIP), + ) + testNetwork.ManuallyTrack(peer.ID, peer.PublicIP) + numConnected++ + } + } + go logger.RecoverAndPanic(func() { testNetwork.Dispatch() }) + arNetwork := &appRequestNetwork{ + network: testNetwork, + handler: handler, + infoAPI: infoAPI, + logger: logger, + lock: new(sync.RWMutex), + validatorClient: validatorClient, + metrics: metrics, + trackedSubnets: trackedSubnets, + manager: manager, + } + + arNetwork.startUpdateValidators() + return arNetwork, nil } -// ConnectPeers connects the network to peers with the given nodeIDs. -// Returns the set of nodeIDs that were successfully connected to. -func (n *appRequestNetwork) ConnectPeers(nodeIDs set.Set[ids.NodeID]) set.Set[ids.NodeID] { - n.lock.Lock() - defer n.lock.Unlock() +// Helper to scope read lock acquisition +func (n *appRequestNetwork) containsSubnet(subnetID ids.ID) bool { + n.lock.RLock() + defer n.lock.RUnlock() + return n.trackedSubnets.Contains(subnetID) +} - // First, check if we are already connected to all the peers - connectedPeers := n.network.PeerInfo(nodeIDs.List()) - if len(connectedPeers) == nodeIDs.Len() { - return nodeIDs +func (n *appRequestNetwork) TrackSubnet(subnetID ids.ID) { + if n.containsSubnet(subnetID) { + return } - // If we are not connected to all the peers already, then we have to iterate - // through the full list of peers obtained from the info API. Rather than iterating - // through connectedPeers for already tracked peers, just iterate through the full list, - // re-adding connections to already tracked peers. 
+ n.logger.Debug("Tracking subnet", zap.Stringer("subnetID", subnetID)) + n.trackedSubnets.Add(subnetID) + n.updateValidatorSet(context.Background(), subnetID) +} - startInfoAPICall := time.Now() - // Get the list of publicly discoverable peers - peers, err := n.infoAPI.Peers(context.Background(), nil) - n.setInfoAPICallLatencyMS(float64(time.Since(startInfoAPICall).Milliseconds())) +func (n *appRequestNetwork) startUpdateValidators() { + go func() { + // Fetch validators immediately when called, and refresh every ValidatorRefreshPeriod + ticker := time.NewTicker(ValidatorRefreshPeriod) + for ; true; <-ticker.C { + n.updateValidatorSet(context.Background(), constants.PrimaryNetworkID) + for _, subnet := range n.trackedSubnets.List() { + n.updateValidatorSet(context.Background(), subnet) + } + } + }() +} + +func (n *appRequestNetwork) updateValidatorSet( + ctx context.Context, + subnetID ids.ID, +) error { + n.lock.Lock() + defer n.lock.Unlock() + + n.logger.Debug("Fetching validators for subnet ID", zap.Stringer("subnetID", subnetID)) + + // Fetch the subnet validators from the P-Chain + validators, err := n.validatorClient.GetProposedValidators(ctx, subnetID) if err != nil { - n.logger.Error( - "Failed to get peers", - zap.Error(err), - ) - return nil + return err } - // Add manually tracked peers - peers = append(peers, n.manuallyTrackedPeers...) + validatorsMap := make(map[ids.NodeID]*vdrs.GetValidatorOutput) + for _, vdr := range validators { + validatorsMap[vdr.NodeID] = vdr + } - // Attempt to connect to each peer - var trackedNodes set.Set[ids.NodeID] - for _, peer := range peers { - if nodeIDs.Contains(peer.ID) { - trackedNodes.Add(peer.ID) - n.network.ManuallyTrack(peer.ID, peer.PublicIP) - if len(trackedNodes) == nodeIDs.Len() { - return trackedNodes + // Remove any elements from the manager that are not in the new validator set + currentVdrs := n.manager.GetValidatorIDs(subnetID) + for _, nodeID := range currentVdrs { + if _, ok := validatorsMap[nodeID]; !ok { + n.logger.Debug("Removing validator", zap.Stringer("nodeID", nodeID), zap.Stringer("subnetID", subnetID)) + weight := n.manager.GetWeight(subnetID, nodeID) + if err := n.manager.RemoveWeight(subnetID, nodeID, weight); err != nil { + return err } } } - // If the Info API node is in nodeIDs, it will not be reflected in the call to info.Peers. - // In this case, we need to manually track the API node. 
- startInfoAPICall = time.Now() - apiNodeID, _, err := n.infoAPI.GetNodeID(context.Background()) - n.setInfoAPICallLatencyMS(float64(time.Since(startInfoAPICall).Milliseconds())) - if err != nil { - n.logger.Error( - "Failed to get API Node ID", - zap.Error(err), - ) - } else if nodeIDs.Contains(apiNodeID) { - startInfoAPICall = time.Now() - apiNodeIPPort, err := n.infoAPI.GetNodeIP(context.Background()) - n.setInfoAPICallLatencyMS(float64(time.Since(startInfoAPICall).Milliseconds())) - if err != nil { - n.logger.Error( - "Failed to get API Node IP", - zap.Error(err), - ) - } else { - trackedNodes.Add(apiNodeID) - n.network.ManuallyTrack(apiNodeID, apiNodeIPPort) + // Add any elements from the new validator set that are not in the manager + for _, vdr := range validators { + if _, ok := n.manager.GetValidator(subnetID, vdr.NodeID); !ok { + n.logger.Debug("Adding validator", zap.Stringer("nodeID", vdr.NodeID), zap.Stringer("subnetID", subnetID)) + if err := n.manager.AddStaker( + subnetID, + vdr.NodeID, + vdr.PublicKey, + ids.Empty, + vdr.Weight, + ); err != nil { + return err + } } } + return nil +} - return trackedNodes +func (n *appRequestNetwork) Shutdown() { + n.network.StartClose() } // Helper struct to hold connected validator information @@ -235,6 +320,9 @@ func (c *ConnectedCanonicalValidators) GetValidator(nodeID ids.NodeID) (*warp.Va // ConnectToCanonicalValidators connects to the canonical validators of the given subnet and returns the connected // validator information func (n *appRequestNetwork) ConnectToCanonicalValidators(subnetID ids.ID) (*ConnectedCanonicalValidators, error) { + // Track the subnet + n.TrackSubnet(subnetID) + // Get the subnet's current canonical validator set startPChainAPICall := time.Now() validatorSet, totalValidatorWeight, err := n.validatorClient.GetCurrentCanonicalValidatorSet(subnetID) @@ -255,13 +343,8 @@ func (n *appRequestNetwork) ConnectToCanonicalValidators(subnetID ids.ID) (*Conn } } - // Manually connect to all peers in the validator set - // If new peers are connected, AppRequests may fail while the handshake is in progress. - // In that case, AppRequests to those nodes will be retried in the next iteration of the retry loop. - connectedNodes := n.ConnectPeers(nodeIDs) - // Calculate the total weight of connected validators. 
- connectedWeight := calculateConnectedWeight(validatorSet, nodeValidatorIndexMap, connectedNodes) + connectedWeight := calculateConnectedWeight(validatorSet, nodeValidatorIndexMap, nodeIDs) return &ConnectedCanonicalValidators{ ConnectedWeight: connectedWeight, @@ -294,10 +377,6 @@ func (n *appRequestNetwork) GetSubnetID(blockchainID ids.ID) (ids.ID, error) { // Metrics // -func (n *appRequestNetwork) setInfoAPICallLatencyMS(latency float64) { - n.metrics.infoAPICallLatencyMS.Observe(latency) -} - func (n *appRequestNetwork) setPChainAPICallLatencyMS(latency float64) { n.metrics.pChainAPICallLatencyMS.Observe(latency) } diff --git a/peers/app_request_network_metrics.go b/peers/app_request_network_metrics.go index b585533e..3319bda9 100644 --- a/peers/app_request_network_metrics.go +++ b/peers/app_request_network_metrics.go @@ -11,25 +11,12 @@ var ( ) type AppRequestNetworkMetrics struct { - infoAPICallLatencyMS prometheus.Histogram pChainAPICallLatencyMS prometheus.Histogram connects prometheus.Counter disconnects prometheus.Counter } func newAppRequestNetworkMetrics(registerer prometheus.Registerer) (*AppRequestNetworkMetrics, error) { - infoAPICallLatencyMS := prometheus.NewHistogram( - prometheus.HistogramOpts{ - Name: "info_api_call_latency_ms", - Help: "Latency of calling info api in milliseconds", - Buckets: prometheus.ExponentialBucketsRange(100, 10000, 10), - }, - ) - if infoAPICallLatencyMS == nil { - return nil, ErrFailedToCreateAppRequestNetworkMetrics - } - registerer.MustRegister(infoAPICallLatencyMS) - pChainAPICallLatencyMS := prometheus.NewHistogram( prometheus.HistogramOpts{ Name: "p_chain_api_call_latency_ms", @@ -65,7 +52,6 @@ func newAppRequestNetworkMetrics(registerer prometheus.Registerer) (*AppRequestN registerer.MustRegister(disconnects) return &AppRequestNetworkMetrics{ - infoAPICallLatencyMS: infoAPICallLatencyMS, pChainAPICallLatencyMS: pChainAPICallLatencyMS, connects: connects, disconnects: disconnects, diff --git a/peers/mocks/mock_app_request_network.go b/peers/mocks/mock_app_request_network.go index db7cedd6..d9bcd991 100644 --- a/peers/mocks/mock_app_request_network.go +++ b/peers/mocks/mock_app_request_network.go @@ -44,20 +44,6 @@ func (m *MockAppRequestNetwork) EXPECT() *MockAppRequestNetworkMockRecorder { return m.recorder } -// ConnectPeers mocks base method. -func (m *MockAppRequestNetwork) ConnectPeers(nodeIDs set.Set[ids.NodeID]) set.Set[ids.NodeID] { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ConnectPeers", nodeIDs) - ret0, _ := ret[0].(set.Set[ids.NodeID]) - return ret0 -} - -// ConnectPeers indicates an expected call of ConnectPeers. -func (mr *MockAppRequestNetworkMockRecorder) ConnectPeers(nodeIDs any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConnectPeers", reflect.TypeOf((*MockAppRequestNetwork)(nil).ConnectPeers), nodeIDs) -} - // ConnectToCanonicalValidators mocks base method. func (m *MockAppRequestNetwork) ConnectToCanonicalValidators(subnetID ids.ID) (*peers.ConnectedCanonicalValidators, error) { m.ctrl.T.Helper() @@ -127,3 +113,27 @@ func (mr *MockAppRequestNetworkMockRecorder) Send(msg, nodeIDs, subnetID, allowe mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockAppRequestNetwork)(nil).Send), msg, nodeIDs, subnetID, allower) } + +// Shutdown mocks base method. +func (m *MockAppRequestNetwork) Shutdown() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Shutdown") +} + +// Shutdown indicates an expected call of Shutdown. 
+func (mr *MockAppRequestNetworkMockRecorder) Shutdown() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Shutdown", reflect.TypeOf((*MockAppRequestNetwork)(nil).Shutdown)) +} + +// TrackSubnet mocks base method. +func (m *MockAppRequestNetwork) TrackSubnet(subnetID ids.ID) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "TrackSubnet", subnetID) +} + +// TrackSubnet indicates an expected call of TrackSubnet. +func (mr *MockAppRequestNetworkMockRecorder) TrackSubnet(subnetID any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TrackSubnet", reflect.TypeOf((*MockAppRequestNetwork)(nil).TrackSubnet), subnetID) +} diff --git a/peers/validators/canonical_validator_client.go b/peers/validators/canonical_validator_client.go index 0373e91c..ea60a805 100644 --- a/peers/validators/canonical_validator_client.go +++ b/peers/validators/canonical_validator_client.go @@ -80,6 +80,21 @@ func (v *CanonicalValidatorClient) GetCurrentValidatorSet( return nil, 0, nil } +func (v *CanonicalValidatorClient) GetProposedValidators( + ctx context.Context, + subnetID ids.ID, +) (map[ids.NodeID]*validators.GetValidatorOutput, error) { + res, err := v.client.GetValidatorsAt(ctx, subnetID, pchainapi.ProposedHeight, v.options...) + if err != nil { + v.logger.Debug( + "Error fetching proposed validators", + zap.String("subnetID", subnetID.String()), + zap.Error(err)) + return nil, err + } + return res, nil +} + // Gets the validator set of the given subnet at the given P-chain block height. // Uses [platform.getValidatorsAt] with supplied height func (v *CanonicalValidatorClient) GetValidatorSet( @@ -90,7 +105,7 @@ func (v *CanonicalValidatorClient) GetValidatorSet( res, err := v.client.GetValidatorsAt(ctx, subnetID, pchainapi.Height(height), v.options...) if err != nil { v.logger.Debug( - "P-chain RPC to getValidatorAt returned error. Falling back to getCurrentValidators", + "Error fetching validators at height", zap.String("subnetID", subnetID.String()), zap.Uint64("pChainHeight", height), zap.Error(err)) diff --git a/relayer/application_relayer.go b/relayer/application_relayer.go index a0aff4a7..847215f1 100644 --- a/relayer/application_relayer.go +++ b/relayer/application_relayer.go @@ -147,6 +147,13 @@ func (r *ApplicationRelayer) ProcessHeight( handlers []messages.MessageHandler, errChan chan error, ) { + r.logger.Debug( + "Processing block", + zap.Uint64("height", height), + zap.Stringer("relayerID", r.relayerID.ID), + zap.Stringer("blockchainID", r.relayerID.SourceBlockchainID), + zap.Int("numMessages", len(handlers)), + ) var eg errgroup.Group for _, handler := range handlers { eg.Go(func() error { diff --git a/relayer/checkpoint/checkpoint.go b/relayer/checkpoint/checkpoint.go index 6df59338..21060f5c 100644 --- a/relayer/checkpoint/checkpoint.go +++ b/relayer/checkpoint/checkpoint.go @@ -106,6 +106,7 @@ func (cm *CheckpointManager) listenForWriteSignal() { // Heights are committed in sequence, so if height is not exactly one // greater than the current committedHeight, it is instead cached in memory // to potentially be committed later. 
+// TODO: We should only stage heights once all app relayers for a given source chain have staged func (cm *CheckpointManager) StageCommittedHeight(height uint64) { cm.lock.Lock() defer cm.lock.Unlock() diff --git a/relayer/config/config.go b/relayer/config/config.go index 747ceec3..6501785a 100644 --- a/relayer/config/config.go +++ b/relayer/config/config.go @@ -64,6 +64,7 @@ type Config struct { ProcessMissedBlocks bool `mapstructure:"process-missed-blocks" json:"process-missed-blocks"` DeciderURL string `mapstructure:"decider-url" json:"decider-url"` SignatureCacheSize uint64 `mapstructure:"signature-cache-size" json:"signature-cache-size"` + ManuallyTrackedPeers []*basecfg.PeerConfig `mapstructure:"manually-tracked-peers" json:"manually-tracked-peers"` // mapstructure doesn't handle time.Time out of the box so handle it manually EtnaTime time.Time `json:"etna-time"` @@ -107,6 +108,11 @@ func (c *Config) Validate() error { if c.DBWriteIntervalSeconds == 0 || c.DBWriteIntervalSeconds > 600 { return errors.New("db-write-interval-seconds must be between 1 and 600") } + for _, p := range c.ManuallyTrackedPeers { + if err := p.Validate(); err != nil { + return err + } + } blockchainIDToSubnetID := make(map[ids.ID]ids.ID) diff --git a/relayer/main/main.go b/relayer/main/main.go index 177bb82e..9983daaf 100644 --- a/relayer/main/main.go +++ b/relayer/main/main.go @@ -12,12 +12,13 @@ import ( "runtime" "strings" + "github.com/ava-labs/avalanchego/api/info" "github.com/ava-labs/avalanchego/api/metrics" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/message" + "github.com/ava-labs/avalanchego/network/peer" "github.com/ava-labs/avalanchego/utils/constants" "github.com/ava-labs/avalanchego/utils/logging" - "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/awm-relayer/database" "github.com/ava-labs/awm-relayer/messages" offchainregistry "github.com/ava-labs/awm-relayer/messages/off-chain-registry" @@ -137,29 +138,42 @@ func main() { if logLevel <= logging.Debug { networkLogLevel = logLevel } - var trackedSubnets set.Set[ids.ID] - // trackedSubnets is no longer strictly required but keeping it here for now - // to keep full parity with existing AWM relayer for now - // TODO: remove this from here once trackedSubnets are no longer referenced - // by ping messages in avalanchego - for _, sourceBlockchain := range cfg.SourceBlockchains { - if sourceBlockchain.GetSubnetID() == constants.PrimaryNetworkID { - continue - } - trackedSubnets.Add(sourceBlockchain.GetSubnetID()) + + // Initialize message creator passed down to relayers for creating app requests. + // We do not collect metrics for the message creator. 
+ messageCreator, err := message.NewCreator( + logger, + prometheus.DefaultRegisterer, + constants.DefaultNetworkCompressionType, + constants.DefaultNetworkMaximumInboundTimeout, + ) + if err != nil { + logger.Fatal("Failed to create message creator", zap.Error(err)) + panic(err) + } + + var manuallyTrackedPeers []info.Peer + for _, p := range cfg.ManuallyTrackedPeers { + manuallyTrackedPeers = append(manuallyTrackedPeers, info.Peer{ + Info: peer.Info{ + PublicIP: p.GetIP(), + ID: p.GetID(), + }, + }) } network, err := peers.NewNetwork( networkLogLevel, registerer, - trackedSubnets, nil, + manuallyTrackedPeers, &cfg, ) if err != nil { logger.Fatal("Failed to create app request network", zap.Error(err)) panic(err) } + defer network.Shutdown() err = relayer.InitializeConnectionsAndCheckStake(logger, network, &cfg) if err != nil { @@ -175,19 +189,6 @@ func main() { panic(err) } - // Initialize message creator passed down to relayers for creating app requests. - // We do not collect metrics for the message creator. - messageCreator, err := message.NewCreator( - logger, - prometheus.DefaultRegisterer, - constants.DefaultNetworkCompressionType, - constants.DefaultNetworkMaximumInboundTimeout, - ) - if err != nil { - logger.Fatal("Failed to create message creator", zap.Error(err)) - panic(err) - } - // Initialize the database db, err := database.NewDatabase(logger, &cfg) if err != nil { @@ -223,11 +224,11 @@ func main() { signatureAggregator, err := aggregator.NewSignatureAggregator( network, logger, + messageCreator, cfg.SignatureCacheSize, sigAggMetrics.NewSignatureAggregatorMetrics( prometheus.DefaultRegisterer, ), - messageCreator, cfg.EtnaTime, ) if err != nil { diff --git a/relayer/message_coordinator.go b/relayer/message_coordinator.go index acf7317d..04dde0ec 100644 --- a/relayer/message_coordinator.go +++ b/relayer/message_coordinator.go @@ -66,7 +66,7 @@ func (mc *MessageCoordinator) getAppRelayerMessageHandler( // than just the ones supported by a single listener instance. mc.logger.Debug( "Warp message from unsupported message protocol address. Not relaying.", - zap.String("protocolAddress", warpMessageInfo.SourceAddress.Hex()), + zap.Stringer("protocolAddress", warpMessageInfo.SourceAddress), ) return nil, nil, nil } @@ -86,11 +86,11 @@ func (mc *MessageCoordinator) getAppRelayerMessageHandler( mc.logger.Info( "Unpacked warp message", - zap.String("sourceBlockchainID", sourceBlockchainID.String()), - zap.String("originSenderAddress", originSenderAddress.String()), - zap.String("destinationBlockchainID", destinationBlockchainID.String()), - zap.String("destinationAddress", destinationAddress.String()), - zap.String("warpMessageID", warpMessageInfo.UnsignedMessage.ID().String()), + zap.Stringer("sourceBlockchainID", sourceBlockchainID), + zap.Stringer("originSenderAddress", originSenderAddress), + zap.Stringer("destinationBlockchainID", destinationBlockchainID), + zap.Stringer("destinationAddress", destinationAddress), + zap.Stringer("warpMessageID", warpMessageInfo.UnsignedMessage.ID()), ) appRelayer := mc.getApplicationRelayer( @@ -168,10 +168,10 @@ func (mc *MessageCoordinator) getApplicationRelayer( } mc.logger.Debug( "Application relayer not found. 
Skipping message relay.", - zap.String("blockchainID", sourceBlockchainID.String()), - zap.String("destinationBlockchainID", destinationBlockchainID.String()), - zap.String("originSenderAddress", originSenderAddress.String()), - zap.String("destinationAddress", destinationAddress.String()), + zap.Stringer("blockchainID", sourceBlockchainID), + zap.Stringer("destinationBlockchainID", destinationBlockchainID), + zap.Stringer("originSenderAddress", originSenderAddress), + zap.Stringer("destinationAddress", destinationAddress), ) return nil } @@ -182,7 +182,7 @@ func (mc *MessageCoordinator) ProcessWarpMessage(warpMessage *relayerTypes.WarpM mc.logger.Error( "Failed to parse Warp message.", zap.Error(err), - zap.String("warpMessageID", warpMessage.UnsignedMessage.ID().String()), + zap.Stringer("warpMessageID", warpMessage.UnsignedMessage.ID()), ) return common.Hash{}, err } @@ -203,7 +203,7 @@ func (mc *MessageCoordinator) ProcessMessageID( if !ok { mc.logger.Error( "Source client not found", - zap.String("blockchainID", blockchainID.String()), + zap.Stringer("blockchainID", blockchainID), ) return common.Hash{}, fmt.Errorf("source client not set for blockchain: %s", blockchainID.String()) } @@ -212,7 +212,7 @@ func (mc *MessageCoordinator) ProcessMessageID( if err != nil { mc.logger.Error( "Failed to fetch warp from blockchain", - zap.String("blockchainID", blockchainID.String()), + zap.Stringer("blockchainID", blockchainID), zap.Error(err), ) return common.Hash{}, fmt.Errorf("could not fetch warp message from ID: %w", err) @@ -228,6 +228,11 @@ func (mc *MessageCoordinator) ProcessBlock( ethClient ethclient.Client, errChan chan error, ) { + mc.logger.Debug( + "Processing block", + zap.Uint64("blockNumber", blockHeader.Number.Uint64()), + zap.Stringer("blockchainID", blockchainID), + ) // Parse the logs in the block, and group by application relayer block, err := relayerTypes.NewWarpBlockInfo(blockHeader, ethClient) if err != nil { @@ -243,8 +248,8 @@ func (mc *MessageCoordinator) ProcessBlock( if err != nil { mc.logger.Error( "Failed to parse message", - zap.String("blockchainID", warpLogInfo.UnsignedMessage.SourceChainID.String()), - zap.String("protocolAddress", warpLogInfo.SourceAddress.String()), + zap.Stringer("blockchainID", warpLogInfo.UnsignedMessage.SourceChainID), + zap.Stringer("protocolAddress", warpLogInfo.SourceAddress), zap.Error(err), ) continue @@ -263,7 +268,11 @@ func (mc *MessageCoordinator) ProcessBlock( // Dispatch all messages in the block to the appropriate application relayer. // An empty slice is still a valid argument to ProcessHeight; in this case the height is immediately committed. 
handlers := messageHandlers[appRelayer.relayerID.ID] - + mc.logger.Debug( + "Dispatching to app relayer", + zap.Stringer("relayerID", appRelayer.relayerID.ID), + zap.Int("numMessages", len(handlers)), + ) go appRelayer.ProcessHeight(block.BlockNumber, handlers, errChan) } } diff --git a/scripts/versions.sh b/scripts/versions.sh index ceef4a14..eee4d782 100755 --- a/scripts/versions.sh +++ b/scripts/versions.sh @@ -18,6 +18,8 @@ export GO_VERSION=${GO_VERSION:-$(getDepVersion go)} # Don't export them as they're used in the context of other calls AVALANCHEGO_VERSION=${AVALANCHEGO_VERSION:-$(getDepVersion github.com/ava-labs/avalanchego)} +# Temporarily hardcode the Avalanchego version until outbound networking relaxation is available +AVALANCHEGO_VERSION=v1.12.0-fuji GINKGO_VERSION=${GINKGO_VERSION:-$(getDepVersion github.com/onsi/ginkgo/v2)} SUBNET_EVM_VERSION=${SUBNET_EVM_VERSION:-$(getDepVersion github.com/ava-labs/subnet-evm)} diff --git a/signature-aggregator/aggregator/aggregator.go b/signature-aggregator/aggregator/aggregator.go index cd38ca87..17820f13 100644 --- a/signature-aggregator/aggregator/aggregator.go +++ b/signature-aggregator/aggregator/aggregator.go @@ -39,10 +39,10 @@ type blsSignatureBuf [bls.SignatureLen]byte const ( // Number of retries to collect signatures from validators - maxRelayerQueryAttempts = 5 + maxRelayerQueryAttempts = 10 // Maximum amount of time to spend waiting (in addition to network round trip time per attempt) // during relayer signature query routine - signatureRequestRetryWaitPeriodMs = 10_000 + signatureRequestRetryWaitPeriodMs = 20_000 ) var ( @@ -70,9 +70,9 @@ type SignatureAggregator struct { func NewSignatureAggregator( network peers.AppRequestNetwork, logger logging.Logger, + messageCreator message.Creator, signatureCacheSize uint64, metrics *metrics.SignatureAggregatorMetrics, - messageCreator message.Creator, etnaTime time.Time, ) (*SignatureAggregator, error) { cache, err := cache.NewCache(signatureCacheSize, logger) @@ -87,28 +87,33 @@ func NewSignatureAggregator( subnetIDsByBlockchainID: map[ids.ID]ids.ID{}, logger: logger, metrics: metrics, - messageCreator: messageCreator, currentRequestID: atomic.Uint32{}, cache: cache, etnaTime: etnaTime, + messageCreator: messageCreator, } sa.currentRequestID.Store(rand.Uint32()) return &sa, nil } +func (s *SignatureAggregator) Shutdown() { + s.network.Shutdown() +} + func (s *SignatureAggregator) CreateSignedMessage( unsignedMessage *avalancheWarp.UnsignedMessage, justification []byte, inputSigningSubnet ids.ID, quorumPercentage uint64, ) (*avalancheWarp.Message, error) { + s.logger.Debug("Creating signed message", zap.String("warpMessageID", unsignedMessage.ID().String())) var signingSubnet ids.ID var err error // If signingSubnet is not set we default to the subnet of the source blockchain sourceSubnet, err := s.getSubnetID(unsignedMessage.SourceChainID) if err != nil { return nil, fmt.Errorf( - "Source message subnet not found for chainID %s", + "source message subnet not found for chainID %s", unsignedMessage.SourceChainID, ) } @@ -117,6 +122,11 @@ func (s *SignatureAggregator) CreateSignedMessage( } else { signingSubnet = inputSigningSubnet } + s.logger.Debug( + "Creating signed message with signing subnet", + zap.String("warpMessageID", unsignedMessage.ID().String()), + zap.Stringer("signingSubnet", signingSubnet), + ) connectedValidators, err := s.network.ConnectToCanonicalValidators(signingSubnet) if err != nil { @@ -129,6 +139,7 @@ func (s *SignatureAggregator) CreateSignedMessage( 
s.metrics.FailuresToGetValidatorSet.Inc() return nil, fmt.Errorf("%s: %w", msg, err) } + s.logger.Debug("Connected to canonical validators", zap.String("warpMessageID", unsignedMessage.ID().String())) s.metrics.ConnectedStakeWeightPercentage.WithLabelValues( signingSubnet.String(), ).Set( diff --git a/signature-aggregator/aggregator/aggregator_test.go b/signature-aggregator/aggregator/aggregator_test.go index 8c386808..22647baa 100644 --- a/signature-aggregator/aggregator/aggregator_test.go +++ b/signature-aggregator/aggregator/aggregator_test.go @@ -64,9 +64,9 @@ func instantiateAggregator(t *testing.T) ( ), ), ), + messageCreator, 1024, sigAggMetrics, - messageCreator, // Setting the etnaTime to a minute ago so that the post-etna code path is used in the test time.Now().Add(-1*time.Minute), ) diff --git a/signature-aggregator/main/main.go b/signature-aggregator/main/main.go index 829f8600..48ef4302 100644 --- a/signature-aggregator/main/main.go +++ b/signature-aggregator/main/main.go @@ -84,17 +84,6 @@ func main() { if logLevel <= logging.Debug { networkLogLevel = logLevel } - network, err := peers.NewNetwork( - networkLogLevel, - prometheus.DefaultRegisterer, - nil, - nil, - &cfg, - ) - if err != nil { - logger.Fatal("Failed to create app request network", zap.Error(err)) - panic(err) - } // Initialize message creator passed down to relayers for creating app requests. // We do not collect metrics for the message creator. @@ -109,15 +98,28 @@ func main() { panic(err) } + network, err := peers.NewNetwork( + networkLogLevel, + prometheus.DefaultRegisterer, + nil, + nil, + &cfg, + ) + if err != nil { + logger.Fatal("Failed to create app request network", zap.Error(err)) + panic(err) + } + defer network.Shutdown() + registry := metrics.Initialize(cfg.MetricsPort) metricsInstance := metrics.NewSignatureAggregatorMetrics(registry) signatureAggregator, err := aggregator.NewSignatureAggregator( network, logger, + messageCreator, cfg.SignatureCacheSize, metricsInstance, - messageCreator, cfg.EtnaTime, ) if err != nil { diff --git a/tests/basic_relay.go b/tests/basic_relay.go index f96bcc86..1de207a8 100644 --- a/tests/basic_relay.go +++ b/tests/basic_relay.go @@ -52,6 +52,7 @@ func BasicRelay(network *network.LocalNetwork, teleporter utils.TeleporterTestIn fundedAddress, relayerKey, ) + // The config needs to be validated in order to be passed to database.GetConfigRelayerIDs relayerConfig.Validate() @@ -71,7 +72,7 @@ func BasicRelay(network *network.LocalNetwork, teleporter utils.TeleporterTestIn defer relayerCleanup() // Wait for relayer to start up - startupCtx, startupCancel := context.WithTimeout(ctx, 15*time.Second) + startupCtx, startupCancel := context.WithTimeout(ctx, 60*time.Second) defer startupCancel() testUtils.WaitForChannelClose(startupCtx, readyChan) @@ -157,7 +158,7 @@ func BasicRelay(network *network.LocalNetwork, teleporter utils.TeleporterTestIn // Wait for relayer to start up log.Info("Waiting for the relayer to start up") - startupCtx, startupCancel = context.WithTimeout(ctx, 15*time.Second) + startupCtx, startupCancel = context.WithTimeout(ctx, 60*time.Second) defer startupCancel() testUtils.WaitForChannelClose(startupCtx, readyChan) diff --git a/tests/contracts/lib/teleporter b/tests/contracts/lib/teleporter index 226937a9..a6e92843 160000 --- a/tests/contracts/lib/teleporter +++ b/tests/contracts/lib/teleporter @@ -1 +1 @@ -Subproject commit 226937a967e8947034f2704de6f168bcf48e2d2b +Subproject commit a6e92843c3b13eef1e813fd898d9a44a2da6a2a0 diff --git a/tests/e2e_test.go 
b/tests/e2e_test.go index f92cfdde..00bbd396 100644 --- a/tests/e2e_test.go +++ b/tests/e2e_test.go @@ -15,6 +15,7 @@ import ( "testing" "time" + "github.com/ava-labs/avalanchego/utils/units" testUtils "github.com/ava-labs/awm-relayer/tests/utils" "github.com/ava-labs/awm-relayer/utils" "github.com/ava-labs/teleporter/tests/network" @@ -81,7 +82,7 @@ var _ = ginkgo.BeforeSuite(func() { utils.SanitizeHexString(teleporterDeployerTransactionStr), ) Expect(err).Should(BeNil()) - networkStartCtx, networkStartCancel := context.WithTimeout(ctx, 120*time.Second) + networkStartCtx, networkStartCancel := context.WithTimeout(ctx, 240*2*time.Second) defer networkStartCancel() localNetworkInstance = network.NewLocalNetwork( networkStartCtx, @@ -128,6 +129,23 @@ var _ = ginkgo.BeforeSuite(func() { teleporterInfo.DeployTeleporterRegistry(subnet, fundedKey) } + // Convert the subnets to sovereign L1s + for _, subnet := range localNetworkInstance.GetSubnetsInfo() { + localNetworkInstance.ConvertSubnet( + networkStartCtx, + subnet, + teleporterTestUtils.PoAValidatorManager, + []uint64{units.Schmeckle, units.Schmeckle}, + fundedKey, + false) + } + + // Restart the network to attempt to refresh TLS connections + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(60*len(localNetworkInstance.Nodes))*time.Second) + defer cancel() + err = localNetworkInstance.Restart(ctx, os.Stdout) + Expect(err).Should(BeNil()) + decider = exec.CommandContext(ctx, "./tests/cmd/decider/decider") decider.Start() go func() { diff --git a/tests/signature_aggregator_api.go b/tests/signature_aggregator_api.go index d7cb8344..98709676 100644 --- a/tests/signature_aggregator_api.go +++ b/tests/signature_aggregator_api.go @@ -4,7 +4,6 @@ package tests import ( - "bufio" "bytes" "context" "encoding/hex" @@ -12,13 +11,10 @@ import ( "fmt" "io" "net/http" - "strconv" - "strings" "time" avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp" "github.com/ava-labs/awm-relayer/signature-aggregator/api" - "github.com/ava-labs/awm-relayer/signature-aggregator/metrics" testUtils "github.com/ava-labs/awm-relayer/tests/utils" "github.com/ava-labs/teleporter/tests/interfaces" "github.com/ava-labs/teleporter/tests/network" @@ -69,7 +65,7 @@ func SignatureAggregatorAPI(network *network.LocalNetwork, teleporter utils.Tele // End setup step // Begin Test Case 1 - log.Info("Sending teleporter message") + log.Info("Sending teleporter message from A -> B") receipt, _, _ := testUtils.SendBasicTeleporterMessage( ctx, teleporter, @@ -122,110 +118,20 @@ func SignatureAggregatorAPI(network *network.LocalNetwork, teleporter utils.Tele sendRequestToAPI() - // Check metrics - metricsSample := sampleMetrics(signatureAggregatorConfig.MetricsPort) - for _, m := range []struct { - name string - op string - value int - }{ - {metrics.Opts.AggregateSignaturesRequestCount.Name, "==", 1}, - {metrics.Opts.AggregateSignaturesLatencyMS.Name, ">", 0}, - {metrics.Opts.AppRequestCount.Name, "<=", 5}, - {metrics.Opts.FailuresToGetValidatorSet.Name, "==", 0}, - {metrics.Opts.FailuresToConnectToSufficientStake.Name, "==", 0}, - {metrics.Opts.FailuresSendingToNode.Name, "<", 5}, - {metrics.Opts.ValidatorTimeouts.Name, "==", 0}, - {metrics.Opts.InvalidSignatureResponses.Name, "==", 0}, - {metrics.Opts.SignatureCacheHits.Name, "==", 0}, - {metrics.Opts.SignatureCacheMisses.Name, "==", 0}, - { - fmt.Sprintf( - "%s{subnetID=\"%s\"}", - metrics.Opts.ConnectedStakeWeightPercentage.Name, - subnetAInfo.SubnetID.String(), - ), - "==", - 100, - }, - } { - 
Expect(metricsSample[m.name]).Should( - BeNumerically(m.op, m.value), - "Expected metric %s %s %d", - m.name, - m.op, - m.value, - ) - } - - // make a second request, and ensure that the metrics reflect that the - // signatures for the second request are retrieved from the cache. note - // that even though 4 signatures were requested in the previous - // request, only 3 will be cached, because that's all that was required - // to reach a quorum, so that's all that were handled. - sendRequestToAPI() - metricsSample2 := sampleMetrics(signatureAggregatorConfig.MetricsPort) - Expect( - metricsSample2[metrics.Opts.AppRequestCount.Name], - ).Should(Equal(metricsSample[metrics.Opts.AppRequestCount.Name])) - Expect( - metricsSample2[metrics.Opts.SignatureCacheHits.Name], - ).Should(BeNumerically("==", 3)) - Expect( - metricsSample2[metrics.Opts.SignatureCacheMisses.Name], - ).Should(Equal(metricsSample[metrics.Opts.SignatureCacheMisses.Name])) -} - -// returns a map of metric names to metric samples -func sampleMetrics(port uint16) map[string]uint64 { - resp, err := http.Get( - fmt.Sprintf("http://localhost:%d/metrics", port), + // Try in the other direction + log.Info("Sending teleporter message from B -> A") + receipt, _, _ = testUtils.SendBasicTeleporterMessage( + ctx, + teleporter, + subnetBInfo, + subnetAInfo, + fundedKey, + fundedAddress, ) - Expect(err).Should(BeNil()) - - body, err := io.ReadAll(resp.Body) - Expect(err).Should(BeNil()) - defer resp.Body.Close() - - var samples = make(map[string]uint64) - scanner := bufio.NewScanner(strings.NewReader(string(body))) - for scanner.Scan() { - line := scanner.Text() - for _, metricName := range []string{ - metrics.Opts.AggregateSignaturesLatencyMS.Name, - metrics.Opts.AggregateSignaturesRequestCount.Name, - metrics.Opts.AppRequestCount.Name, - metrics.Opts.FailuresToGetValidatorSet.Name, - metrics.Opts.FailuresToConnectToSufficientStake.Name, - metrics.Opts.FailuresSendingToNode.Name, - metrics.Opts.ValidatorTimeouts.Name, - metrics.Opts.InvalidSignatureResponses.Name, - metrics.Opts.SignatureCacheHits.Name, - metrics.Opts.SignatureCacheMisses.Name, - metrics.Opts.ConnectedStakeWeightPercentage.Name, - } { - if strings.HasPrefix( - line, - "U__signature_2d_aggregator_"+metricName, - ) { - log.Debug("Found metric line", "line", line) - parts := strings.Fields(line) - - metricName = strings.Replace(parts[0], "U__signature_2d_aggregator_", "", 1) - - // Parse the metric count from the last field of the line - value, err := strconv.ParseUint(parts[len(parts)-1], 10, 64) - if err != nil { - log.Warn("failed to parse value from metric line") - continue - } - log.Debug("parsed metric", "name", metricName, "value", value) - - samples[metricName] = value - } else { - log.Debug("Ignoring non-metric line", "line", line) - } - } + warpMessage = getWarpMessageFromLog(ctx, receipt, subnetBInfo) + + reqBody = api.AggregateSignatureRequest{ + Message: "0x" + hex.EncodeToString(warpMessage.Bytes()), } - return samples + sendRequestToAPI() } diff --git a/tests/utils/utils.go b/tests/utils/utils.go index 84cfd42d..41e9a85e 100644 --- a/tests/utils/utils.go +++ b/tests/utils/utils.go @@ -49,7 +49,7 @@ const ( func BuildAllExecutables(ctx context.Context) { cmd := exec.Command("./scripts/build.sh") out, err := cmd.CombinedOutput() - fmt.Println(string(out)) + log.Info(string(out)) Expect(err).Should(BeNil()) } @@ -192,7 +192,7 @@ func CreateDefaultRelayerConfig( } return relayercfg.Config{ - LogLevel: logging.Info.LowerString(), + LogLevel: logLevel.LowerString(), 
PChainAPI: &config.APIConfig{ BaseURL: sourceSubnetsInfo[0].NodeURIs[0], }, @@ -228,7 +228,7 @@ func CreateDefaultSignatureAggregatorConfig( ) // Construct the config values for each subnet return signatureaggregatorcfg.Config{ - LogLevel: logging.Info.LowerString(), + LogLevel: logLevel.LowerString(), PChainAPI: &config.APIConfig{ BaseURL: sourceSubnetsInfo[0].NodeURIs[0], }, @@ -602,9 +602,11 @@ func runExecutable( for { resp, err := http.Get(healthCheckUrl) if err == nil && resp.StatusCode == 200 { + log.Info("Health check passed", "appName", appName) close(readyChan) break } + log.Info("Health check failed", "appName", appName, "err", err) time.Sleep(time.Second * 1) } }() diff --git a/tests/utils/warp-genesis-template.json b/tests/utils/warp-genesis-template.json index 44dd8b5d..4cf2d62e 100644 --- a/tests/utils/warp-genesis-template.json +++ b/tests/utils/warp-genesis-template.json @@ -1,5 +1,5 @@ { - "config": { + "config": { "chainId": , "homesteadBlock": 0, "eip150Block": 0, @@ -12,52 +12,80 @@ "istanbulBlock": 0, "muirGlacierBlock": 0, "feeConfig": { - "gasLimit": 20000000, - "minBaseFee": 1000000000, - "targetGas": 100000000, - "baseFeeChangeDenominator": 48, - "minBlockGasCost": 0, - "maxBlockGasCost": 10000000, - "targetBlockRate": 2, - "blockGasCostStep": 500000 + "gasLimit": 20000000, + "minBaseFee": 1000000000, + "targetGas": 100000000, + "baseFeeChangeDenominator": 48, + "minBlockGasCost": 0, + "maxBlockGasCost": 10000000, + "targetBlockRate": 2, + "blockGasCostStep": 500000 }, "warpConfig": { - "blockTimestamp": 1719343601 + "blockTimestamp": 1719343601 }, "contractNativeMinterConfig": { - "blockTimestamp": 0, - "adminAddresses": [ - "0xAcB633F5B00099c7ec187eB00156c5cd9D854b5B", - "0x3405506b3711859c5070949ed9b700c7ba7bf750" - ] + "blockTimestamp": 0, + "adminAddresses": [ + "0xAcB633F5B00099c7ec187eB00156c5cd9D854b5B", + "0x3405506b3711859c5070949ed9b700c7ba7bf750", + "0x962c62B01529ecc0561D85d3fe395921ddC3665B", + "0x1549B96D9D97F435CA9b25000FEDE3A7e54C0bb9", + "0x190110D1228EB2cDd36559b2215A572Dc8592C3d", + "0xf9EF017A764F265A1fD0975bfc200725E41d860E", + "0x4f3663be6d22B0F19F8617f1A9E9485aB0144Bff", + "0x463a6bE7a5098A5f06435c6c468adD338F15B93A", + "0x8db97C7cEcE249c2b98bDC0226Cc4C2A57BF52FC" + ] } - }, - "alloc": { - "0x8db97C7cEcE249c2b98bDC0226Cc4C2A57BF52FC": { - "balance": "0x52B7D2DCC80CD2E4000000" - }, + }, + "alloc": { "": { - "balance": "0x0", - "code": "", - "storage": { - "0x0000000000000000000000000000000000000000000000000000000000000000": "0x0000000000000000000000000000000000000000000000000000000000000001", - "0x0000000000000000000000000000000000000000000000000000000000000001": "0x0000000000000000000000000000000000000000000000000000000000000001" - }, - "nonce": 1 + "balance": "0x0", + "code": "", + "storage": { + "0x0000000000000000000000000000000000000000000000000000000000000000": "0x0000000000000000000000000000000000000000000000000000000000000001", + "0x0000000000000000000000000000000000000000000000000000000000000001": "0x0000000000000000000000000000000000000000000000000000000000000001" + }, + "nonce": 1 }, "": { - "balance": "0x0", - "nonce": 1 + "balance": "0x0", + "nonce": 1 + }, + "0x8db97C7cEcE249c2b98bDC0226Cc4C2A57BF52FC": { + "balance": "0x52B7D2DCC80CD2E4000000" + }, + "0x1337cfd2dCff6270615B90938aCB1efE79801704": { + "balance": "0x52B7D2DCC80CD2E4000000" + }, + "0xFcec6c0674037f99fa473de09609B4b6D8158863": { + "balance": "0x52B7D2DCC80CD2E4000000" + }, + "0x2e1533d976A675bCD6306deC3B05e9f73e6722Fb": { + "balance": "0x52B7D2DCC80CD2E4000000" + 
}, + "0xA638b0a597dc0520e2f20E83cFbeBBCd45a79990": { + "balance": "0x52B7D2DCC80CD2E4000000" + }, + "0x787C079cB0d5A7AA1Cae95d991F76Dce771A432D": { + "balance": "0x52B7D2DCC80CD2E4000000" + }, + "0x741D536f5B07bcD43727CD8435389CA36aE5A4Ae": { + "balance": "0x52B7D2DCC80CD2E4000000" + }, + "0xd466f12795BA59d0fef389c21fA63c287956fb18": { + "balance": "0x52B7D2DCC80CD2E4000000" } - }, - "nonce": "0x0", - "timestamp": "0x667B19F0", - "extraData": "0x00", - "gasLimit": "0x1312D00", - "difficulty": "0x0", - "mixHash": "0x0000000000000000000000000000000000000000000000000000000000000000", - "coinbase": "0x0000000000000000000000000000000000000000", - "number": "0x0", - "gasUsed": "0x0", - "parentHash": "0x0000000000000000000000000000000000000000000000000000000000000000" -} + }, + "nonce": "0x0", + "timestamp": "0x667B19F0", + "extraData": "0x00", + "gasLimit": "0x1312D00", + "difficulty": "0x0", + "mixHash": "0x0000000000000000000000000000000000000000000000000000000000000000", + "coinbase": "0x0000000000000000000000000000000000000000", + "number": "0x0", + "gasUsed": "0x0", + "parentHash": "0x0000000000000000000000000000000000000000000000000000000000000000" + }