From cf4ecbab0a4bfd85c7682d3761ff9ec0b15fc020 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Tue, 11 Jun 2024 13:43:14 -0400 Subject: [PATCH] add acp-118 implementation Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> --- network/p2p/acp118/aggregator.go | 229 ++++++++++++++++++++++++++ network/p2p/acp118/aggregator_test.go | 205 +++++++++++++++++++++++ network/p2p/error.go | 6 + network/p2p/p2ptest/client.go | 16 +- x/sync/client_test.go | 19 +-- x/sync/manager.go | 23 +-- x/sync/network_server.go | 27 ++- x/sync/network_server_test.go | 21 ++- x/sync/sync_test.go | 118 ++++++------- 9 files changed, 546 insertions(+), 118 deletions(-) create mode 100644 network/p2p/acp118/aggregator.go create mode 100644 network/p2p/acp118/aggregator_test.go diff --git a/network/p2p/acp118/aggregator.go b/network/p2p/acp118/aggregator.go new file mode 100644 index 000000000000..9cbc10a9442c --- /dev/null +++ b/network/p2p/acp118/aggregator.go @@ -0,0 +1,229 @@ +// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package acp118 + +import ( + "context" + "errors" + "fmt" + "sync" + + "go.uber.org/zap" + "golang.org/x/sync/semaphore" + "google.golang.org/protobuf/proto" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p" + "github.com/ava-labs/avalanchego/proto/pb/sdk" + "github.com/ava-labs/avalanchego/utils/crypto/bls" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/utils/set" + "github.com/ava-labs/avalanchego/vms/platformvm/warp" +) + +var ( + ErrDuplicateValidator = errors.New("duplicate validator") + ErrInsufficientSignatures = errors.New("failed to aggregate sufficient stake weight of signatures") +) + +type result struct { + message *warp.Message + err error +} + +type Validator struct { + NodeID ids.NodeID + PublicKey *bls.PublicKey + Weight uint64 +} + +type indexedValidator struct { + Validator + I int +} + +// NewSignatureAggregator returns an instance of SignatureAggregator +func NewSignatureAggregator( + log logging.Logger, + client *p2p.Client, + maxPending int, +) *SignatureAggregator { + return &SignatureAggregator{ + log: log, + client: client, + maxPending: int64(maxPending), + } +} + +// SignatureAggregator aggregates validator signatures for warp messages +type SignatureAggregator struct { + log logging.Logger + client *p2p.Client + maxPending int64 +} + +// AggregateSignatures blocks until stakeWeightThreshold of validators signs the +// provided message. Validators are issued requests in the caller-specified +// order. 
+func (s *SignatureAggregator) AggregateSignatures( + parentCtx context.Context, + message *warp.UnsignedMessage, + justification []byte, + validators []Validator, + stakeWeightThreshold uint64, +) (*warp.Message, error) { + ctx, cancel := context.WithCancel(parentCtx) + defer cancel() + + request := &sdk.SignatureRequest{ + Message: message.Bytes(), + Justification: justification, + } + + requestBytes, err := proto.Marshal(request) + if err != nil { + return nil, fmt.Errorf("failed to marshal signature request: %w", err) + } + + done := make(chan result) + pendingRequests := semaphore.NewWeighted(s.maxPending) + lock := &sync.Mutex{} + aggregatedStakeWeight := uint64(0) + attemptedStakeWeight := uint64(0) + totalStakeWeight := uint64(0) + signatures := make([]*bls.Signature, 0) + signerBitSet := set.NewBits() + + nodeIDsToValidator := make(map[ids.NodeID]indexedValidator) + for i, v := range validators { + totalStakeWeight += v.Weight + + // Sanity check the validator set provided by the caller + if _, ok := nodeIDsToValidator[v.NodeID]; ok { + return nil, fmt.Errorf("%w: %s", ErrDuplicateValidator, v.NodeID) + } + + nodeIDsToValidator[v.NodeID] = indexedValidator{ + I: i, + Validator: v, + } + } + + onResponse := func( + _ context.Context, + nodeID ids.NodeID, + responseBytes []byte, + err error, + ) { + // We are guaranteed a response from a node in the validator set + validator := nodeIDsToValidator[nodeID] + + defer func() { + lock.Lock() + attemptedStakeWeight += validator.Weight + remainingStakeWeight := totalStakeWeight - attemptedStakeWeight + failed := remainingStakeWeight < stakeWeightThreshold + lock.Unlock() + + if failed { + done <- result{err: ErrInsufficientSignatures} + } + + pendingRequests.Release(1) + }() + + if err != nil { + s.log.Debug( + "dropping response", + zap.Stringer("nodeID", nodeID), + zap.Error(err), + ) + return + } + + response := &sdk.SignatureResponse{} + if err := proto.Unmarshal(responseBytes, response); err != nil { + s.log.Debug( + "dropping response", + zap.Stringer("nodeID", nodeID), + zap.Error(err), + ) + return + } + + signature, err := bls.SignatureFromBytes(response.Signature) + if err != nil { + s.log.Debug( + "dropping response", + zap.Stringer("nodeID", nodeID), + zap.String("reason", "invalid signature"), + zap.Error(err), + ) + return + } + + if !bls.Verify(validator.PublicKey, signature, message.Bytes()) { + s.log.Debug( + "dropping response", + zap.Stringer("nodeID", nodeID), + zap.String("reason", "public key failed verification"), + ) + return + } + + lock.Lock() + signerBitSet.Add(validator.I) + signatures = append(signatures, signature) + aggregatedStakeWeight += validator.Weight + + if aggregatedStakeWeight >= stakeWeightThreshold { + aggregateSignature, err := bls.AggregateSignatures(signatures) + if err != nil { + done <- result{err: err} + lock.Unlock() + return + } + + bitSetSignature := &warp.BitSetSignature{ + Signers: signerBitSet.Bytes(), + Signature: [bls.SignatureLen]byte{}, + } + + copy(bitSetSignature.Signature[:], bls.SignatureToBytes(aggregateSignature)) + signedMessage, err := warp.NewMessage(message, bitSetSignature) + done <- result{message: signedMessage, err: err} + lock.Unlock() + return + } + + lock.Unlock() + } + + for _, validator := range validators { + if err := pendingRequests.Acquire(ctx, 1); err != nil { + return nil, err + } + + // Avoid loop shadowing in goroutine + validatorCopy := validator + go func() { + if err := s.client.AppRequest( + ctx, + set.Of(validatorCopy.NodeID), + requestBytes, + 
onResponse, + ); err != nil { + done <- result{err: err} + return + } + }() + } + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case r := <-done: + return r.message, r.err + } +} diff --git a/network/p2p/acp118/aggregator_test.go b/network/p2p/acp118/aggregator_test.go new file mode 100644 index 000000000000..50622fc4ac99 --- /dev/null +++ b/network/p2p/acp118/aggregator_test.go @@ -0,0 +1,205 @@ +// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package acp118 + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p" + "github.com/ava-labs/avalanchego/network/p2p/p2ptest" + "github.com/ava-labs/avalanchego/snow/validators" + "github.com/ava-labs/avalanchego/snow/validators/validatorstest" + "github.com/ava-labs/avalanchego/utils/crypto/bls" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/vms/platformvm/warp" +) + +func TestVerifier_Verify(t *testing.T) { + nodeID0 := ids.GenerateTestNodeID() + sk0, err := bls.NewSecretKey() + require.NoError(t, err) + pk0 := bls.PublicFromSecretKey(sk0) + + nodeID1 := ids.GenerateTestNodeID() + sk1, err := bls.NewSecretKey() + require.NoError(t, err) + pk1 := bls.PublicFromSecretKey(sk1) + + networkID := uint32(123) + subnetID := ids.GenerateTestID() + chainID := ids.GenerateTestID() + signer := warp.NewSigner(sk0, networkID, chainID) + + tests := []struct { + name string + + handler p2p.Handler + + ctx context.Context + validators []Validator + threshold uint64 + + pChainState validators.State + pChainHeight uint64 + quorumNum uint64 + quorumDen uint64 + + wantAggregateSignaturesErr error + wantVerifyErr error + }{ + { + name: "passes attestation and verification", + handler: NewHandler(&testAttestor{}, signer, networkID, chainID), + ctx: context.Background(), + validators: []Validator{ + { + NodeID: nodeID0, + PublicKey: pk0, + Weight: 1, + }, + }, + threshold: 1, + pChainState: &validatorstest.State{ + T: t, + GetSubnetIDF: func(context.Context, ids.ID) (ids.ID, error) { + return subnetID, nil + }, + GetValidatorSetF: func(context.Context, uint64, ids.ID) (map[ids.NodeID]*validators.GetValidatorOutput, error) { + return map[ids.NodeID]*validators.GetValidatorOutput{ + nodeID0: { + NodeID: nodeID0, + PublicKey: pk0, + Weight: 1, + }, + }, nil + }, + }, + quorumNum: 1, + quorumDen: 1, + }, + { + name: "passes attestation and fails verification - insufficient stake", + handler: NewHandler(&testAttestor{}, signer, networkID, chainID), + ctx: context.Background(), + validators: []Validator{ + { + NodeID: nodeID0, + PublicKey: pk0, + Weight: 1, + }, + }, + threshold: 1, + pChainState: &validatorstest.State{ + T: t, + GetSubnetIDF: func(context.Context, ids.ID) (ids.ID, error) { + return subnetID, nil + }, + GetValidatorSetF: func(context.Context, uint64, ids.ID) (map[ids.NodeID]*validators.GetValidatorOutput, error) { + return map[ids.NodeID]*validators.GetValidatorOutput{ + nodeID0: { + NodeID: nodeID0, + PublicKey: pk0, + Weight: 1, + }, + nodeID1: { + NodeID: nodeID1, + PublicKey: pk1, + Weight: 1, + }, + }, nil + }, + }, + quorumNum: 2, + quorumDen: 2, + wantVerifyErr: warp.ErrInsufficientWeight, + }, + { + name: "fails attestation", + handler: NewHandler( + &testAttestor{Err: errors.New("foobar")}, + signer, + networkID, + chainID, + ), + ctx: context.Background(), + validators: []Validator{ + { + NodeID: nodeID0, + 
PublicKey: pk0, + Weight: 1, + }, + }, + threshold: 1, + wantAggregateSignaturesErr: ErrInsufficientSignatures, + }, + { + name: "invalid validator set", + ctx: context.Background(), + validators: []Validator{ + { + NodeID: nodeID0, + PublicKey: pk0, + Weight: 1, + }, + { + NodeID: nodeID0, + PublicKey: pk0, + Weight: 1, + }, + }, + wantAggregateSignaturesErr: ErrDuplicateValidator, + }, + { + name: "context canceled", + ctx: func() context.Context { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + return ctx + }(), + wantAggregateSignaturesErr: context.Canceled, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require := require.New(t) + + ctx := context.Background() + message, err := warp.NewUnsignedMessage(networkID, chainID, []byte("payload")) + require.NoError(err) + client := p2ptest.NewClient(t, ctx, tt.handler, ids.GenerateTestNodeID(), nodeID0) + verifier := NewSignatureAggregator(logging.NoLog{}, client, 1) + + signedMessage, err := verifier.AggregateSignatures( + tt.ctx, + message, + []byte("justification"), + tt.validators, + tt.threshold, + ) + require.ErrorIs(err, tt.wantAggregateSignaturesErr) + + if signedMessage == nil { + return + } + + err = signedMessage.Signature.Verify( + ctx, + &signedMessage.UnsignedMessage, + networkID, + tt.pChainState, + 0, + tt.quorumNum, + tt.quorumDen, + ) + require.ErrorIs(err, tt.wantVerifyErr) + }) + } +} diff --git a/network/p2p/error.go b/network/p2p/error.go index 07207319a041..67b0317153e6 100644 --- a/network/p2p/error.go +++ b/network/p2p/error.go @@ -30,4 +30,10 @@ var ( Code: -4, Message: "throttled", } + // ErrAttestFailed should be used to indicate that a request failed + // to be signed due to the peer being unable to attest the message + ErrAttestFailed = &common.AppError{ + Code: -5, + Message: "failed attestation", + } ) diff --git a/network/p2p/p2ptest/client.go b/network/p2p/p2ptest/client.go index b75654028666..8d41d6b99bce 100644 --- a/network/p2p/p2ptest/client.go +++ b/network/p2p/p2ptest/client.go @@ -23,7 +23,7 @@ import ( // communicate with a server with the specified handler func NewClient( t *testing.T, - ctx context.Context, + rootCtx context.Context, handler p2p.Handler, clientNodeID ids.NodeID, serverNodeID ids.NodeID, @@ -38,8 +38,6 @@ func NewClient( require.NoError(t, err) clientSender.SendAppGossipF = func(ctx context.Context, _ common.SendConfig, gossipBytes []byte) error { - // Send the request asynchronously to avoid deadlock when the server - // sends the response back to the client go func() { require.NoError(t, serverNetwork.AppGossip(ctx, clientNodeID, gossipBytes)) }() @@ -58,8 +56,6 @@ func NewClient( } serverSender.SendAppResponseF = func(ctx context.Context, _ ids.NodeID, requestID uint32, responseBytes []byte) error { - // Send the request asynchronously to avoid deadlock when the server - // sends the response back to the client go func() { require.NoError(t, clientNetwork.AppResponse(ctx, serverNodeID, requestID, responseBytes)) }() @@ -68,8 +64,6 @@ func NewClient( } serverSender.SendAppErrorF = func(ctx context.Context, _ ids.NodeID, requestID uint32, errorCode int32, errorMessage string) error { - // Send the request asynchronously to avoid deadlock when the server - // sends the response back to the client go func() { require.NoError(t, clientNetwork.AppRequestFailed(ctx, serverNodeID, requestID, &common.AppError{ Code: errorCode, @@ -80,10 +74,10 @@ func NewClient( return nil } - require.NoError(t, clientNetwork.Connected(ctx, 
clientNodeID, nil))
-    require.NoError(t, clientNetwork.Connected(ctx, serverNodeID, nil))
-    require.NoError(t, serverNetwork.Connected(ctx, clientNodeID, nil))
-    require.NoError(t, serverNetwork.Connected(ctx, serverNodeID, nil))
+    require.NoError(t, clientNetwork.Connected(rootCtx, clientNodeID, nil))
+    require.NoError(t, clientNetwork.Connected(rootCtx, serverNodeID, nil))
+    require.NoError(t, serverNetwork.Connected(rootCtx, clientNodeID, nil))
+    require.NoError(t, serverNetwork.Connected(rootCtx, serverNodeID, nil))

     require.NoError(t, serverNetwork.AddHandler(0, handler))
     return clientNetwork.NewClient(0)
diff --git a/x/sync/client_test.go b/x/sync/client_test.go
index decc3e20405d..2633071439da 100644
--- a/x/sync/client_test.go
+++ b/x/sync/client_test.go
@@ -38,12 +38,12 @@ func newDefaultDBConfig() merkledb.Config {
     }
 }

-func newFlakyRangeProofHandler(
+func newModifiedRangeProofHandler(
     t *testing.T,
     db merkledb.MerkleDB,
     modifyResponse func(response *merkledb.RangeProof),
 ) p2p.Handler {
-    handler := NewGetRangeProofHandler(logging.NoLog{}, db)
+    handler := NewSyncGetRangeProofHandler(logging.NoLog{}, db)

     c := counter{m: 2}
     return &p2p.TestHandler{
@@ -74,12 +74,12 @@ func newFlakyRangeProofHandler(
     }
 }

-func newFlakyChangeProofHandler(
+func newModifiedChangeProofHandler(
     t *testing.T,
     db merkledb.MerkleDB,
     modifyResponse func(response *merkledb.ChangeProof),
 ) p2p.Handler {
-    handler := NewGetChangeProofHandler(logging.NoLog{}, db)
+    handler := NewSyncGetChangeProofHandler(logging.NoLog{}, db)

     c := counter{m: 2}
     return &p2p.TestHandler{
@@ -145,14 +145,3 @@ func (c *counter) Inc() int {
     c.i++
     return result
 }
-
-type waitingHandler struct {
-    p2p.NoOpHandler
-    handler         p2p.Handler
-    updatedRootChan chan struct{}
-}
-
-func (w *waitingHandler) AppRequest(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, *common.AppError) {
-    <-w.updatedRootChan
-    return w.handler.AppRequest(ctx, nodeID, deadline, requestBytes)
-}
diff --git a/x/sync/manager.go b/x/sync/manager.go
index ddcdc1637088..dd176c223033 100644
--- a/x/sync/manager.go
+++ b/x/sync/manager.go
@@ -41,7 +41,7 @@ var (
     ErrAlreadyStarted = errors.New("cannot start a Manager that has already been started")
     ErrAlreadyClosed = errors.New("Manager is closed")
     ErrNoRangeProofClientProvided = errors.New("range proof client is a required field of the sync config")
-    ErrNoChangeProofClientProvided = errors.New("change proof client is a required field of the sync config")
+    ErrNoChangeProofClientProvided = errors.New("change proof client is a required field of the sync config")
     ErrNoDatabaseProvided = errors.New("sync database is a required field of the sync config")
     ErrNoLogProvided = errors.New("log is a required field of the sync config")
     ErrZeroWorkLimit = errors.New("simultaneous work limit must be greater than 0")
@@ -305,12 +305,7 @@ func (m *Manager) doWork(ctx context.Context, work *workItem) {
         return
     }

-    select {
-    case <-ctx.Done():
-        m.finishWorkItem()
-        return
-    case <-time.After(waitTime):
-    }
+    <-time.After(waitTime)

     if work.localRootID == ids.Empty {
         // the keys in this range have not been downloaded, so get all key/values
@@ -373,8 +368,7 @@ func (m *Manager) requestChangeProof(ctx context.Context, work *workItem) {
     defer m.finishWorkItem()

     if err := m.handleChangeProofResponse(ctx, targetRootID, work, request, responseBytes, err); err != nil {
-        // TODO log responses
-        m.config.Log.Debug("dropping response", zap.Error(err), zap.Stringer("request", request))
+ 
m.config.Log.Debug("dropping response", zap.Error(err)) m.retryWork(work) return } @@ -431,8 +425,7 @@ func (m *Manager) requestRangeProof(ctx context.Context, work *workItem) { defer m.finishWorkItem() if err := m.handleRangeProofResponse(ctx, targetRootID, work, request, responseBytes, appErr); err != nil { - // TODO log responses - m.config.Log.Debug("dropping response", zap.Error(err), zap.Stringer("request", request)) + m.config.Log.Debug("dropping response", zap.Error(err)) m.retryWork(work) return } @@ -468,11 +461,10 @@ func (m *Manager) retryWork(work *workItem) { m.workLock.Lock() m.unprocessedWork.Insert(work) m.workLock.Unlock() - m.unprocessedWorkCond.Signal() } // Returns an error if we should drop the response -func (m *Manager) shouldHandleResponse( +func (m *Manager) handleResponse( bytesLimit uint32, responseBytes []byte, err error, @@ -507,7 +499,7 @@ func (m *Manager) handleRangeProofResponse( responseBytes []byte, err error, ) error { - if err := m.shouldHandleResponse(request.BytesLimit, responseBytes, err); err != nil { + if err := m.handleResponse(request.BytesLimit, responseBytes, err); err != nil { return err } @@ -558,7 +550,7 @@ func (m *Manager) handleChangeProofResponse( responseBytes []byte, err error, ) error { - if err := m.shouldHandleResponse(request.BytesLimit, responseBytes, err); err != nil { + if err := m.handleResponse(request.BytesLimit, responseBytes, err); err != nil { return err } @@ -614,6 +606,7 @@ func (m *Manager) handleChangeProofResponse( m.completeWorkItem(ctx, work, largestHandledKey, targetRootID, changeProof.EndProof) case *pb.SyncGetChangeProofResponse_RangeProof: + var rangeProof merkledb.RangeProof if err := rangeProof.UnmarshalProto(changeProofResp.RangeProof); err != nil { return err diff --git a/x/sync/network_server.go b/x/sync/network_server.go index 2153f2fbcc97..10d86ed140eb 100644 --- a/x/sync/network_server.go +++ b/x/sync/network_server.go @@ -49,8 +49,8 @@ var ( errInvalidBounds = errors.New("start key is greater than end key") errInvalidRootHash = fmt.Errorf("root hash must have length %d", hashing.HashLen) - _ p2p.Handler = (*GetChangeProofHandler)(nil) - _ p2p.Handler = (*GetRangeProofHandler)(nil) + _ p2p.Handler = (*SyncGetChangeProofHandler)(nil) + _ p2p.Handler = (*SyncGetRangeProofHandler)(nil) ) func maybeBytesToMaybe(mb *pb.MaybeBytes) maybe.Maybe[[]byte] { @@ -60,21 +60,21 @@ func maybeBytesToMaybe(mb *pb.MaybeBytes) maybe.Maybe[[]byte] { return maybe.Nothing[[]byte]() } -func NewGetChangeProofHandler(log logging.Logger, db DB) *GetChangeProofHandler { - return &GetChangeProofHandler{ +func NewSyncGetChangeProofHandler(log logging.Logger, db DB) *SyncGetChangeProofHandler { + return &SyncGetChangeProofHandler{ log: log, db: db, } } -type GetChangeProofHandler struct { +type SyncGetChangeProofHandler struct { log logging.Logger db DB } -func (*GetChangeProofHandler) AppGossip(context.Context, ids.NodeID, []byte) {} +func (*SyncGetChangeProofHandler) AppGossip(context.Context, ids.NodeID, []byte) {} -func (g *GetChangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _ time.Time, requestBytes []byte) ([]byte, *common.AppError) { +func (s *SyncGetChangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _ time.Time, requestBytes []byte) ([]byte, *common.AppError) { req := &pb.SyncGetChangeProofRequest{} if err := proto.Unmarshal(requestBytes, req); err != nil { return nil, &common.AppError{ @@ -120,7 +120,6 @@ func (g *GetChangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _ if 
!errors.Is(err, merkledb.ErrInsufficientHistory) { // We should only fail to get a change proof if we have insufficient history. // Other errors are unexpected. - // TODO define custom errors return nil, &common.AppError{ Code: p2p.ErrUnexpected.Code, Message: fmt.Sprintf("failed to get change proof: %s", err), @@ -192,21 +191,21 @@ func (g *GetChangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _ } } -func NewGetRangeProofHandler(log logging.Logger, db DB) *GetRangeProofHandler { - return &GetRangeProofHandler{ +func NewSyncGetRangeProofHandler(log logging.Logger, db DB) *SyncGetRangeProofHandler { + return &SyncGetRangeProofHandler{ log: log, db: db, } } -type GetRangeProofHandler struct { +type SyncGetRangeProofHandler struct { log logging.Logger db DB } -func (*GetRangeProofHandler) AppGossip(context.Context, ids.NodeID, []byte) {} +func (*SyncGetRangeProofHandler) AppGossip(context.Context, ids.NodeID, []byte) {} -func (g *GetRangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _ time.Time, requestBytes []byte) ([]byte, *common.AppError) { +func (s *SyncGetRangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _ time.Time, requestBytes []byte) ([]byte, *common.AppError) { req := &pb.SyncGetRangeProofRequest{} if err := proto.Unmarshal(requestBytes, req); err != nil { return nil, &common.AppError{ @@ -228,7 +227,7 @@ func (g *GetRangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _ t proofBytes, err := getRangeProof( ctx, - g.db, + s.db, req, func(rangeProof *merkledb.RangeProof) ([]byte, error) { return proto.Marshal(rangeProof.ToProto()) diff --git a/x/sync/network_server_test.go b/x/sync/network_server_test.go index c78554cea59f..84dbd1c12682 100644 --- a/x/sync/network_server_test.go +++ b/x/sync/network_server_test.go @@ -85,7 +85,7 @@ func Test_Server_GetRangeProof(t *testing.T) { expectedErr: p2p.ErrUnexpected, }, { - name: "response bounded by key limit", + name: "key limit too large", request: &pb.SyncGetRangeProofRequest{ RootHash: smallTrieRoot[:], KeyLimit: 2 * defaultRequestKeyLimit, @@ -94,7 +94,7 @@ func Test_Server_GetRangeProof(t *testing.T) { expectedResponseLen: defaultRequestKeyLimit, }, { - name: "response bounded by byte limit", + name: "bytes limit too large", request: &pb.SyncGetRangeProofRequest{ RootHash: smallTrieRoot[:], KeyLimit: defaultRequestKeyLimit, @@ -118,7 +118,7 @@ func Test_Server_GetRangeProof(t *testing.T) { t.Run(test.name, func(t *testing.T) { require := require.New(t) - handler := NewGetRangeProofHandler(logging.NoLog{}, smallTrieDB) + handler := NewSyncGetRangeProofHandler(logging.NoLog{}, smallTrieDB) requestBytes, err := proto.Marshal(test.request) require.NoError(err) responseBytes, err := handler.AppRequest(context.Background(), test.nodeID, time.Time{}, requestBytes) @@ -130,12 +130,17 @@ func Test_Server_GetRangeProof(t *testing.T) { require.Nil(responseBytes) return } + require.NotNil(responseBytes) - var proofProto pb.RangeProof - require.NoError(proto.Unmarshal(responseBytes, &proofProto)) + var proof *merkledb.RangeProof + if !test.proofNil { + var proofProto pb.RangeProof + require.NoError(proto.Unmarshal(responseBytes, &proofProto)) - var proof merkledb.RangeProof - require.NoError(proof.UnmarshalProto(&proofProto)) + var p merkledb.RangeProof + require.NoError(p.UnmarshalProto(&proofProto)) + proof = &p + } if test.expectedResponseLen > 0 { require.LessOrEqual(len(proof.KeyValues), test.expectedResponseLen) @@ -339,7 +344,7 @@ func Test_Server_GetChangeProof(t *testing.T) { 
t.Run(test.name, func(t *testing.T) { require := require.New(t) - handler := NewGetChangeProofHandler(logging.NoLog{}, serverDB) + handler := NewSyncGetChangeProofHandler(logging.NoLog{}, serverDB) requestBytes, err := proto.Marshal(test.request) require.NoError(err) diff --git a/x/sync/sync_test.go b/x/sync/sync_test.go index 41dd5829a7b8..7373c08bf5c5 100644 --- a/x/sync/sync_test.go +++ b/x/sync/sync_test.go @@ -19,12 +19,13 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/network/p2p" "github.com/ava-labs/avalanchego/network/p2p/p2ptest" + "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/maybe" "github.com/ava-labs/avalanchego/x/merkledb" ) -var _ p2p.Handler = (*waitingHandler)(nil) +var _ p2p.Handler = (*testHandler)(nil) func Test_Creation(t *testing.T) { require := require.New(t) @@ -39,8 +40,8 @@ func Test_Creation(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: &p2p.Client{}, + ChangeProofClient: &p2p.Client{}, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, BranchFactor: merkledb.BranchFactor16, @@ -72,8 +73,8 @@ func Test_Completion(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, emptyDB), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, emptyDB), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetRangeProofHandler(logging.NoLog{}, emptyDB), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, emptyDB), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), TargetRoot: emptyRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -177,8 +178,8 @@ func Test_Sync_FindNextKey_InSync(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), TargetRoot: syncRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -253,8 +254,8 @@ func Test_Sync_FindNextKey_Deleted(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, db), 
ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: &p2p.Client{}, + ChangeProofClient: &p2p.Client{}, TargetRoot: syncRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -303,8 +304,8 @@ func Test_Sync_FindNextKey_BranchInLocal(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: &p2p.Client{}, + ChangeProofClient: &p2p.Client{}, TargetRoot: targetRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -340,8 +341,8 @@ func Test_Sync_FindNextKey_BranchInReceived(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: &p2p.Client{}, + ChangeProofClient: &p2p.Client{}, TargetRoot: targetRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -376,8 +377,8 @@ func Test_Sync_FindNextKey_ExtraValues(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), TargetRoot: syncRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -438,8 +439,8 @@ func TestFindNextKeyEmptyEndProof(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: &p2p.Client{}, + ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), TargetRoot: ids.Empty, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -507,8 +508,8 @@ func Test_Sync_FindNextKey_DifferentChild(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + ChangeProofClient: 
p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), TargetRoot: syncRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -540,6 +541,7 @@ func Test_Sync_FindNextKey_DifferentChild(t *testing.T) { // Test findNextKey by computing the expected result in a naive, inefficient // way and comparing it to the actual result + func TestFindNextKeyRandom(t *testing.T) { now := time.Now().UnixNano() t.Logf("seed: %d", now) @@ -730,8 +732,8 @@ func TestFindNextKeyRandom(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: localDB, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, remoteDB), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, remoteDB), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: &p2p.Client{}, + ChangeProofClient: &p2p.Client{}, TargetRoot: ids.GenerateTestID(), SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -773,7 +775,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { { name: "range proof bad response - too many leaves in response", rangeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newFlakyRangeProofHandler(t, db, func(response *merkledb.RangeProof) { + handler := newModifiedRangeProofHandler(t, db, func(response *merkledb.RangeProof) { response.KeyValues = append(response.KeyValues, merkledb.KeyValue{}) }) @@ -783,7 +785,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { { name: "range proof bad response - removed first key in response", rangeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newFlakyRangeProofHandler(t, db, func(response *merkledb.RangeProof) { + handler := newModifiedRangeProofHandler(t, db, func(response *merkledb.RangeProof) { response.KeyValues = response.KeyValues[min(1, len(response.KeyValues)):] }) @@ -793,7 +795,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { { name: "range proof bad response - removed first key in response and replaced proof", rangeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newFlakyRangeProofHandler(t, db, func(response *merkledb.RangeProof) { + handler := newModifiedRangeProofHandler(t, db, func(response *merkledb.RangeProof) { response.KeyValues = response.KeyValues[min(1, len(response.KeyValues)):] response.KeyValues = []merkledb.KeyValue{ { @@ -819,7 +821,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { { name: "range proof bad response - removed key from middle of response", rangeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newFlakyRangeProofHandler(t, db, func(response *merkledb.RangeProof) { + handler := newModifiedRangeProofHandler(t, db, func(response *merkledb.RangeProof) { i := rand.Intn(max(1, len(response.KeyValues)-1)) // #nosec G404 _ = slices.Delete(response.KeyValues, i, min(len(response.KeyValues), i+1)) }) @@ -830,7 +832,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { { name: "range proof bad response - start and end proof nodes removed", rangeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newFlakyRangeProofHandler(t, db, func(response *merkledb.RangeProof) { + handler := newModifiedRangeProofHandler(t, db, func(response *merkledb.RangeProof) { response.StartProof = nil response.EndProof = nil }) @@ -841,7 +843,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { { name: "range proof bad response - end 
proof removed", rangeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newFlakyRangeProofHandler(t, db, func(response *merkledb.RangeProof) { + handler := newModifiedRangeProofHandler(t, db, func(response *merkledb.RangeProof) { response.EndProof = nil }) @@ -851,7 +853,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { { name: "range proof bad response - empty proof", rangeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newFlakyRangeProofHandler(t, db, func(response *merkledb.RangeProof) { + handler := newModifiedRangeProofHandler(t, db, func(response *merkledb.RangeProof) { response.StartProof = nil response.EndProof = nil response.KeyValues = nil @@ -864,7 +866,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { name: "range proof server flake", rangeProofClient: func(db merkledb.MerkleDB) *p2p.Client { return p2ptest.NewClient(t, context.Background(), &flakyHandler{ - Handler: NewGetRangeProofHandler(logging.NoLog{}, db), + Handler: NewSyncGetRangeProofHandler(logging.NoLog{}, db), c: &counter{m: 2}, }, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) }, @@ -872,7 +874,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { { name: "change proof bad response - too many keys in response", changeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newFlakyChangeProofHandler(t, db, func(response *merkledb.ChangeProof) { + handler := newModifiedChangeProofHandler(t, db, func(response *merkledb.ChangeProof) { response.KeyChanges = append(response.KeyChanges, make([]merkledb.KeyChange, defaultRequestKeyLimit)...) }) @@ -882,7 +884,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { { name: "change proof bad response - removed first key in response", changeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newFlakyChangeProofHandler(t, db, func(response *merkledb.ChangeProof) { + handler := newModifiedChangeProofHandler(t, db, func(response *merkledb.ChangeProof) { response.KeyChanges = response.KeyChanges[min(1, len(response.KeyChanges)):] }) @@ -892,7 +894,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { { name: "change proof bad response - removed key from middle of response", changeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newFlakyChangeProofHandler(t, db, func(response *merkledb.ChangeProof) { + handler := newModifiedChangeProofHandler(t, db, func(response *merkledb.ChangeProof) { i := rand.Intn(max(1, len(response.KeyChanges)-1)) // #nosec G404 _ = slices.Delete(response.KeyChanges, i, min(len(response.KeyChanges), i+1)) }) @@ -901,9 +903,9 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { }, }, { - name: "change proof bad response - all proof keys removed from response", + name: "all proof keys removed from response", changeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newFlakyChangeProofHandler(t, db, func(response *merkledb.ChangeProof) { + handler := newModifiedChangeProofHandler(t, db, func(response *merkledb.ChangeProof) { response.StartProof = nil response.EndProof = nil }) @@ -912,10 +914,10 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { }, }, { - name: "change proof flaky server", + name: "flaky change proof client", changeProofClient: func(db merkledb.MerkleDB) *p2p.Client { return p2ptest.NewClient(t, context.Background(), &flakyHandler{ - Handler: NewGetChangeProofHandler(logging.NoLog{}, db), + Handler: NewSyncGetChangeProofHandler(logging.NoLog{}, db), c: &counter{m: 2}, }, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) }, 
@@ -945,13 +947,13 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { changeProofClient *p2p.Client ) - rangeProofHandler := NewGetRangeProofHandler(logging.NoLog{}, dbToSync) + rangeProofHandler := NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync) rangeProofClient = p2ptest.NewClient(t, ctx, rangeProofHandler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) if tt.rangeProofClient != nil { rangeProofClient = tt.rangeProofClient(dbToSync) } - changeProofHandler := NewGetChangeProofHandler(logging.NoLog{}, dbToSync) + changeProofHandler := NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync) changeProofClient = p2ptest.NewClient(t, ctx, changeProofHandler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) if tt.changeProofClient != nil { changeProofClient = tt.changeProofClient(dbToSync) @@ -974,12 +976,8 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { require.NoError(syncer.Start(ctx)) // Simulate writes on the server - // - // TODO add more writes when api is not flaky. There is an inherent - // race condition in between writes where UpdateSyncTarget might - // error because it has already reached the sync target before it - // is called. - for i := 0; i < 50; i++ { + // TODO more than a single write when API is less flaky + for i := 0; i <= 1; i++ { addkey := make([]byte, r.Intn(50)) _, err = r.Read(addkey) require.NoError(err) @@ -1031,8 +1029,8 @@ func Test_Sync_Result_Correct_Root_With_Sync_Restart(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), TargetRoot: syncRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -1058,8 +1056,8 @@ func Test_Sync_Result_Correct_Root_With_Sync_Restart(t *testing.T) { newSyncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), TargetRoot: syncRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -1128,13 +1126,13 @@ func Test_Sync_Result_Correct_Root_Update_Root_During(t *testing.T) { updatedRootChan <- struct{}{} ctx := context.Background() - rangeProofClient := p2ptest.NewClient(t, ctx, &waitingHandler{ - handler: NewGetRangeProofHandler(logging.NoLog{}, dbToSync), + rangeProofClient := p2ptest.NewClient(t, ctx, &testHandler{ + handler: NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync), updatedRootChan: updatedRootChan, }, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) 
- changeProofClient := p2ptest.NewClient(t, ctx, &waitingHandler{ - handler: NewGetChangeProofHandler(logging.NoLog{}, dbToSync), + changeProofClient := p2ptest.NewClient(t, ctx, &testHandler{ + handler: NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync), updatedRootChan: updatedRootChan, }, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) @@ -1184,11 +1182,10 @@ func Test_Sync_UpdateSyncTarget(t *testing.T) { newDefaultDBConfig(), ) require.NoError(err) - ctx := context.Background() m, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: &p2p.Client{}, + ChangeProofClient: &p2p.Client{}, TargetRoot: ids.Empty, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -1287,3 +1284,14 @@ func generateTrieWithMinKeyLen(t *testing.T, r *rand.Rand, count int, minKeyLen } return db, batch.Write() } + +type testHandler struct { + p2p.NoOpHandler + handler p2p.Handler + updatedRootChan chan struct{} +} + +func (t *testHandler) AppRequest(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, *common.AppError) { + <-t.updatedRootChan + return t.handler.AppRequest(ctx, nodeID, deadline, requestBytes) +}
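
Reviewer note (not part of the patch): below is a minimal caller-side sketch of how the new acp118.SignatureAggregator might be driven. The helper name aggregateForSubnet, the 67% quorum, and maxPending=10 are illustrative assumptions only; NewSignatureAggregator, AggregateSignatures, and the Validator fields come from aggregator.go above, and the validators.State lookup mirrors the stub used in aggregator_test.go.

package example

import (
	"context"

	"github.com/ava-labs/avalanchego/ids"
	"github.com/ava-labs/avalanchego/network/p2p"
	"github.com/ava-labs/avalanchego/network/p2p/acp118"
	"github.com/ava-labs/avalanchego/snow/validators"
	"github.com/ava-labs/avalanchego/utils/logging"
	"github.com/ava-labs/avalanchego/vms/platformvm/warp"
)

// aggregateForSubnet is a hypothetical helper: it builds the []acp118.Validator
// slice from a validators.State snapshot and asks the aggregator for a signed
// warp message. The quorum fraction and maxPending value are example choices.
func aggregateForSubnet(
	ctx context.Context,
	log logging.Logger,
	client *p2p.Client, // client registered against the acp118 signature handler
	state validators.State,
	subnetID ids.ID,
	unsigned *warp.UnsignedMessage,
	justification []byte,
) (*warp.Message, error) {
	height, err := state.GetCurrentHeight(ctx)
	if err != nil {
		return nil, err
	}

	vdrSet, err := state.GetValidatorSet(ctx, height, subnetID)
	if err != nil {
		return nil, err
	}

	vdrs := make([]acp118.Validator, 0, len(vdrSet))
	totalWeight := uint64(0)
	for _, vdr := range vdrSet {
		// Validators without a registered BLS key cannot contribute signatures.
		if vdr.PublicKey == nil {
			continue
		}
		vdrs = append(vdrs, acp118.Validator{
			NodeID:    vdr.NodeID,
			PublicKey: vdr.PublicKey,
			Weight:    vdr.Weight,
		})
		totalWeight += vdr.Weight
	}

	// Example threshold: roughly 67% of the included stake weight. A production
	// caller would pick its own quorum and guard against overflow.
	threshold := totalWeight * 67 / 100

	// Issue at most 10 signature requests concurrently.
	aggregator := acp118.NewSignatureAggregator(log, client, 10)
	return aggregator.AggregateSignatures(ctx, unsigned, justification, vdrs, threshold)
}

The aggregator issues at most maxPending concurrent requests and returns as soon as the aggregated stake weight reaches the threshold, so the threshold passed in should not exceed the total weight of the supplied validator slice.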