-
Notifications
You must be signed in to change notification settings - Fork 679
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement ACP-118 Aggregator #3394
Open
joshua-kim
wants to merge
62
commits into
master
Choose a base branch
from
acp-118
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+918
−99
Open
Changes from all commits
Commits
Show all changes
62 commits
Select commit
Hold shift + click to select a range
7c6d0a7
implement acp-118 signature aggregation
joshua-kim 11ada0f
undo
joshua-kim e650f5d
nit
joshua-kim 81c9487
nit
joshua-kim 0760fcd
improve usage of context
joshua-kim b6cfd0e
improve doc
joshua-kim dacd131
nit
joshua-kim 810fb55
rename i -> index
joshua-kim 6f6c0b5
nit
joshua-kim 9c2e9eb
nit
joshua-kim 6589763
nit
joshua-kim c75e929
int
joshua-kim ce284e6
nit
joshua-kim f7e2ca9
nit
joshua-kim ef9e7ec
nit
joshua-kim a8642a5
nit
joshua-kim aa5da47
nit
joshua-kim a44b653
nit
joshua-kim 26bb1a0
nit
joshua-kim d2139cc
nit
joshua-kim 4868a99
nit
joshua-kim c5aaebb
nit
joshua-kim fecd189
nit
joshua-kim 3b23a22
nit
joshua-kim 420253d
nit
joshua-kim 4c4c380
nit
joshua-kim 738b0ce
nit
joshua-kim 455e03e
nit
joshua-kim 7d7c3c0
nit
joshua-kim f989cf5
nit
joshua-kim 900279d
nit
joshua-kim de7b65c
nit
joshua-kim 7c4fa96
nit
joshua-kim 9802ab2
fix tests
joshua-kim 8b6563a
nit
joshua-kim 4480b48
Update network/p2p/acp118/aggregator.go
joshua-kim 67f533e
Update network/p2p/acp118/aggregator.go
joshua-kim 2a9621b
Update network/p2p/acp118/aggregator.go
joshua-kim 3773575
move nil signatures check
joshua-kim 356c36d
nit
joshua-kim c279876
nti
joshua-kim 7bfe878
wip
joshua-kim cd716bc
nit
joshua-kim 7752928
nit
joshua-kim 6fcc0aa
nti
joshua-kim 17f2d75
nit
joshua-kim 57506a4
nit
joshua-kim 048dca9
nit
joshua-kim 79f560b
nit
joshua-kim e55212f
nit
joshua-kim 835de57
wip
joshua-kim 39f8ca4
add better tests
joshua-kim f5e05b9
nit
joshua-kim b598c39
nit
joshua-kim ba022ca
nit
joshua-kim e308117
fix flake
joshua-kim b870220
remove copy for shadowing
joshua-kim e6959b1
use send config
joshua-kim bab3a65
remove unused testing fields
joshua-kim f6d20e8
nit
joshua-kim 3e074e6
wip
joshua-kim 8c88acf
nit
joshua-kim File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,255 @@ | ||
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. | ||
// See the file LICENSE for licensing terms. | ||
|
||
package acp118 | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"math/big" | ||
|
||
"go.uber.org/zap" | ||
"google.golang.org/protobuf/proto" | ||
|
||
"github.com/ava-labs/avalanchego/ids" | ||
"github.com/ava-labs/avalanchego/network/p2p" | ||
"github.com/ava-labs/avalanchego/proto/pb/sdk" | ||
"github.com/ava-labs/avalanchego/utils/crypto/bls" | ||
"github.com/ava-labs/avalanchego/utils/logging" | ||
"github.com/ava-labs/avalanchego/utils/set" | ||
"github.com/ava-labs/avalanchego/vms/platformvm/warp" | ||
) | ||
|
||
var errFailedVerification = errors.New("failed verification") | ||
|
||
type indexedValidator struct { | ||
*warp.Validator | ||
Index int | ||
} | ||
|
||
type result struct { | ||
NodeID ids.NodeID | ||
Validator indexedValidator | ||
Signature *bls.Signature | ||
Err error | ||
} | ||
|
||
// NewSignatureAggregator returns an instance of SignatureAggregator | ||
func NewSignatureAggregator(log logging.Logger, client *p2p.Client) *SignatureAggregator { | ||
return &SignatureAggregator{ | ||
log: log, | ||
client: client, | ||
} | ||
} | ||
|
||
// SignatureAggregator aggregates validator signatures for warp messages | ||
type SignatureAggregator struct { | ||
log logging.Logger | ||
client *p2p.Client | ||
} | ||
|
||
// AggregateSignatures blocks until quorumNum/quorumDen signatures from | ||
// validators are requested to be aggregated into a warp message or the context | ||
// is canceled. Returns the signed message and the amount of stake that signed | ||
// the message. Caller is responsible for providing a well-formed canonical | ||
// validator set corresponding to the signer bitset in the message. | ||
func (s *SignatureAggregator) AggregateSignatures( | ||
ctx context.Context, | ||
message *warp.Message, | ||
justification []byte, | ||
validators []*warp.Validator, | ||
quorumNum uint64, | ||
quorumDen uint64, | ||
) ( | ||
_ *warp.Message, | ||
aggregatedStake *big.Int, | ||
totalStake *big.Int, | ||
finished bool, | ||
_ error, | ||
) { | ||
request := &sdk.SignatureRequest{ | ||
Message: message.UnsignedMessage.Bytes(), | ||
Justification: justification, | ||
} | ||
|
||
requestBytes, err := proto.Marshal(request) | ||
if err != nil { | ||
return nil, nil, nil, false, fmt.Errorf("failed to marshal signature request: %w", err) | ||
} | ||
|
||
nodeIDsToValidator := make(map[ids.NodeID]indexedValidator) | ||
// TODO expose concrete type to avoid type casting | ||
bitSetSignature, ok := message.Signature.(*warp.BitSetSignature) | ||
if !ok { | ||
return nil, nil, nil, false, errors.New("invalid warp signature type") | ||
} | ||
|
||
signerBitSet := set.BitsFromBytes(bitSetSignature.Signers) | ||
|
||
nonSigners := make([]ids.NodeID, 0, len(validators)) | ||
aggregatedStakeWeight := new(big.Int) | ||
totalStakeWeight := new(big.Int) | ||
for i, validator := range validators { | ||
totalStakeWeight.Add(totalStakeWeight, new(big.Int).SetUint64(validator.Weight)) | ||
|
||
// Only try to aggregate signatures from validators that are not already in | ||
// the signer bit set | ||
if signerBitSet.Contains(i) { | ||
aggregatedStakeWeight.Add(aggregatedStakeWeight, new(big.Int).SetUint64(validator.Weight)) | ||
continue | ||
} | ||
|
||
v := indexedValidator{ | ||
Index: i, | ||
Validator: validator, | ||
} | ||
|
||
for _, nodeID := range v.NodeIDs { | ||
nodeIDsToValidator[nodeID] = v | ||
} | ||
|
||
nonSigners = append(nonSigners, v.NodeIDs...) | ||
} | ||
|
||
// Account for requested signatures + the signature that was provided | ||
signatures := make([]*bls.Signature, 0, len(nonSigners)+1) | ||
if bitSetSignature.Signature != [bls.SignatureLen]byte{} { | ||
blsSignature, err := bls.SignatureFromBytes(bitSetSignature.Signature[:]) | ||
if err != nil { | ||
return nil, nil, nil, false, fmt.Errorf("failed to parse bls signature: %w", err) | ||
} | ||
signatures = append(signatures, blsSignature) | ||
} | ||
|
||
results := make(chan result) | ||
handler := responseHandler{ | ||
message: message, | ||
nodeIDsToValidators: nodeIDsToValidator, | ||
results: results, | ||
} | ||
|
||
if err := s.client.AppRequest(ctx, set.Of(nonSigners...), requestBytes, handler.HandleResponse); err != nil { | ||
return nil, nil, nil, false, fmt.Errorf("failed to send aggregation request: %w", err) | ||
} | ||
|
||
minThreshold := new(big.Int).Mul(totalStakeWeight, new(big.Int).SetUint64(quorumNum)) | ||
minThreshold.Div(minThreshold, new(big.Int).SetUint64(quorumDen)) | ||
|
||
// Block until: | ||
// 1. The context is cancelled | ||
// 2. We get responses from all validators | ||
// 3. The specified security threshold is reached | ||
for i := 0; i < len(nonSigners); i++ { | ||
select { | ||
case <-ctx.Done(): | ||
// Try to return whatever progress we have if the context is cancelled | ||
msg, err := newWarpMessage(message, signerBitSet, signatures) | ||
if err != nil { | ||
return nil, nil, nil, false, err | ||
} | ||
|
||
return msg, aggregatedStakeWeight, totalStakeWeight, false, nil | ||
case result := <-results: | ||
if result.Err != nil { | ||
s.log.Debug( | ||
"dropping response", | ||
zap.Stringer("nodeID", result.NodeID), | ||
zap.Error(err), | ||
) | ||
continue | ||
} | ||
|
||
// Validators may share public keys so drop any duplicate signatures | ||
if signerBitSet.Contains(result.Validator.Index) { | ||
s.log.Debug( | ||
"dropping duplicate signature", | ||
zap.Stringer("nodeID", result.NodeID), | ||
zap.Error(err), | ||
) | ||
continue | ||
} | ||
|
||
signatures = append(signatures, result.Signature) | ||
signerBitSet.Add(result.Validator.Index) | ||
aggregatedStakeWeight.Add(aggregatedStakeWeight, new(big.Int).SetUint64(result.Validator.Weight)) | ||
|
||
if aggregatedStakeWeight.Cmp(minThreshold) != -1 { | ||
msg, err := newWarpMessage(message, signerBitSet, signatures) | ||
if err != nil { | ||
return nil, nil, nil, false, err | ||
} | ||
|
||
return msg, aggregatedStakeWeight, totalStakeWeight, true, nil | ||
} | ||
} | ||
} | ||
|
||
msg, err := newWarpMessage(message, signerBitSet, signatures) | ||
if err != nil { | ||
return nil, nil, nil, false, err | ||
} | ||
|
||
return msg, aggregatedStakeWeight, totalStakeWeight, true, nil | ||
} | ||
joshua-kim marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
func newWarpMessage( | ||
message *warp.Message, | ||
signerBitSet set.Bits, | ||
signatures []*bls.Signature, | ||
) (*warp.Message, error) { | ||
if len(signatures) == 0 { | ||
return message, nil | ||
} | ||
|
||
aggregateSignature, err := bls.AggregateSignatures(signatures) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
bitSetSignature := &warp.BitSetSignature{ | ||
Signers: signerBitSet.Bytes(), | ||
Signature: [bls.SignatureLen]byte{}, | ||
} | ||
copy(bitSetSignature.Signature[:], bls.SignatureToBytes(aggregateSignature)) | ||
|
||
return warp.NewMessage(&message.UnsignedMessage, bitSetSignature) | ||
} | ||
|
||
type responseHandler struct { | ||
message *warp.Message | ||
nodeIDsToValidators map[ids.NodeID]indexedValidator | ||
results chan result | ||
} | ||
|
||
func (r *responseHandler) HandleResponse( | ||
_ context.Context, | ||
nodeID ids.NodeID, | ||
responseBytes []byte, | ||
err error, | ||
) { | ||
validator := r.nodeIDsToValidators[nodeID] | ||
if err != nil { | ||
r.results <- result{NodeID: nodeID, Validator: validator, Err: err} | ||
return | ||
} | ||
|
||
response := &sdk.SignatureResponse{} | ||
if err := proto.Unmarshal(responseBytes, response); err != nil { | ||
r.results <- result{NodeID: nodeID, Validator: validator, Err: err} | ||
return | ||
} | ||
|
||
signature, err := bls.SignatureFromBytes(response.Signature) | ||
if err != nil { | ||
r.results <- result{NodeID: nodeID, Validator: validator, Err: err} | ||
return | ||
} | ||
|
||
if !bls.Verify(validator.PublicKey, signature, r.message.UnsignedMessage.Bytes()) { | ||
r.results <- result{NodeID: nodeID, Validator: validator, Err: errFailedVerification} | ||
return | ||
} | ||
|
||
r.results <- result{NodeID: nodeID, Validator: validator, Signature: signature} | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[no action required] It looks like this is the only case that returns a non-nil message with
finished=false
. Could we simplify the function signature by removing that parameter and assuming the caller is aware of a cancelled context?