-
Notifications
You must be signed in to change notification settings - Fork 675
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement ACP-118 Aggregator #3394
base: master
Are you sure you want to change the base?
Changes from 34 commits
7c6d0a7
11ada0f
e650f5d
81c9487
0760fcd
b6cfd0e
dacd131
810fb55
6f6c0b5
9c2e9eb
6589763
c75e929
ce284e6
f7e2ca9
ef9e7ec
a8642a5
aa5da47
a44b653
26bb1a0
d2139cc
4868a99
c5aaebb
fecd189
3b23a22
420253d
4c4c380
738b0ce
455e03e
7d7c3c0
f989cf5
900279d
de7b65c
7c4fa96
9802ab2
8b6563a
4480b48
67f533e
2a9621b
3773575
356c36d
c279876
7bfe878
cd716bc
7752928
6fcc0aa
17f2d75
57506a4
048dca9
79f560b
e55212f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,254 @@ | ||
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. | ||
// See the file LICENSE for licensing terms. | ||
|
||
package acp118 | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
|
||
"go.uber.org/zap" | ||
"google.golang.org/protobuf/proto" | ||
|
||
"github.com/ava-labs/avalanchego/ids" | ||
"github.com/ava-labs/avalanchego/network/p2p" | ||
"github.com/ava-labs/avalanchego/proto/pb/sdk" | ||
"github.com/ava-labs/avalanchego/utils/crypto/bls" | ||
"github.com/ava-labs/avalanchego/utils/logging" | ||
"github.com/ava-labs/avalanchego/utils/set" | ||
"github.com/ava-labs/avalanchego/vms/platformvm/warp" | ||
) | ||
|
||
// ErrFailedAggregation is returned if it's not possible for us to | ||
// generate a signature | ||
var ( | ||
ErrFailedAggregation = errors.New("failed aggregation") | ||
errFailedVerification = errors.New("failed verification") | ||
) | ||
|
||
// Validator signs warp messages. NodeID must be unique across validators, but | ||
// PublicKey is not guaranteed to be unique. | ||
type Validator struct { | ||
NodeID ids.NodeID | ||
PublicKey *bls.PublicKey | ||
Weight uint64 | ||
} | ||
|
||
type indexedValidator struct { | ||
Validator | ||
Index int | ||
} | ||
|
||
type result struct { | ||
Validator indexedValidator | ||
Signature *bls.Signature | ||
Err error | ||
} | ||
|
||
// NewSignatureAggregator returns an instance of SignatureAggregator | ||
func NewSignatureAggregator(log logging.Logger, client *p2p.Client) *SignatureAggregator { | ||
return &SignatureAggregator{ | ||
log: log, | ||
client: client, | ||
} | ||
} | ||
|
||
// SignatureAggregator aggregates validator signatures for warp messages | ||
type SignatureAggregator struct { | ||
log logging.Logger | ||
client *p2p.Client | ||
} | ||
|
||
// AggregateSignatures blocks until quorumNum/quorumDen signatures from | ||
// validators are requested to be aggregated into a warp message or the context | ||
// is canceled. Returns the signed message and the amount of stake that signed | ||
// the message. Caller is responsible for providing a well-formed canonical | ||
// validator set corresponding to the signer bitset in the message. | ||
func (s *SignatureAggregator) AggregateSignatures( | ||
ctx context.Context, | ||
message *warp.Message, | ||
justification []byte, | ||
validators []Validator, | ||
quorumNum uint64, | ||
quorumDen uint64, | ||
) (*warp.Message, uint64, uint64, error) { | ||
request := &sdk.SignatureRequest{ | ||
Message: message.UnsignedMessage.Bytes(), | ||
Justification: justification, | ||
} | ||
|
||
requestBytes, err := proto.Marshal(request) | ||
if err != nil { | ||
return nil, 0, 0, fmt.Errorf("failed to marshal signature request: %w", err) | ||
} | ||
|
||
nodeIDsToValidator := make(map[ids.NodeID]indexedValidator) | ||
// TODO expose concrete type to avoid type casting | ||
bitSetSignature, ok := message.Signature.(*warp.BitSetSignature) | ||
if !ok { | ||
return nil, 0, 0, errors.New("invalid warp signature type") | ||
} | ||
|
||
var signerBitSet set.Bits | ||
if bitSetSignature.Signers != nil { | ||
signerBitSet = set.BitsFromBytes(bitSetSignature.Signers) | ||
} else { | ||
signerBitSet = set.NewBits() | ||
} | ||
joshua-kim marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
sampleable := make([]ids.NodeID, 0, len(validators)) | ||
aggregatedStakeWeight := uint64(0) | ||
totalStakeWeight := uint64(0) | ||
for i, v := range validators { | ||
totalStakeWeight += v.Weight | ||
joshua-kim marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// Only try to aggregate signatures from validators that are not already in | ||
// the signer bit set | ||
if signerBitSet.Contains(i) { | ||
aggregatedStakeWeight += v.Weight | ||
continue | ||
} | ||
|
||
nodeIDsToValidator[v.NodeID] = indexedValidator{ | ||
Index: i, | ||
Validator: v, | ||
} | ||
sampleable = append(sampleable, v.NodeID) | ||
} | ||
|
||
signatures := make([]*bls.Signature, 0, len(sampleable)+1) | ||
joshua-kim marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if bitSetSignature.Signature != [bls.SignatureLen]byte{} { | ||
blsSignature, err := bls.SignatureFromBytes(bitSetSignature.Signature[:]) | ||
if err != nil { | ||
return nil, 0, 0, fmt.Errorf("failed to parse bls signature: %w", err) | ||
} | ||
signatures = append(signatures, blsSignature) | ||
} | ||
|
||
results := make(chan result) | ||
job := responseHandler{ | ||
message: message, | ||
nodeIDsToValidator: nodeIDsToValidator, | ||
results: results, | ||
} | ||
|
||
for _, nodeID := range sampleable { | ||
// Avoid var shadowing in goroutine | ||
nodeIDCopy := nodeID | ||
joshua-kim marked this conversation as resolved.
Show resolved
Hide resolved
|
||
go func() { | ||
if err := s.client.AppRequest(ctx, set.Of(nodeIDCopy), requestBytes, job.HandleResponse); err != nil { | ||
results <- result{Validator: nodeIDsToValidator[nodeIDCopy], Err: err} | ||
return | ||
joshua-kim marked this conversation as resolved.
Show resolved
Hide resolved
joshua-kim marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
}() | ||
} | ||
|
||
failedStakeWeight := uint64(0) | ||
minThreshold := (totalStakeWeight * quorumNum) / quorumDen | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this match https://github.com/ava-labs/avalanchego/blob/master/vms/platformvm/warp/signature.go#L150 exactly?
joshua-kim marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
for { | ||
select { | ||
case <-ctx.Done(): | ||
if len(signatures) == 0 { | ||
return message, 0, totalStakeWeight, nil | ||
} | ||
|
||
// Try to return whatever progress we have if the context is cancelled | ||
msg, err := newWarpMessage(message, signerBitSet, signatures) | ||
if err != nil { | ||
return nil, 0, 0, err | ||
} | ||
joshua-kim marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
return msg, aggregatedStakeWeight, totalStakeWeight, nil | ||
case result := <-results: | ||
if result.Err != nil { | ||
s.log.Debug( | ||
"dropping response", | ||
zap.Stringer("nodeID", result.Validator.NodeID), | ||
zap.Uint64("weight", result.Validator.Weight), | ||
zap.Error(err), | ||
) | ||
|
||
// Fast-fail if it's not possible to generate a signature that meets the | ||
// minimum threshold | ||
failedStakeWeight += result.Validator.Weight | ||
if totalStakeWeight-failedStakeWeight < minThreshold { | ||
return nil, 0, 0, ErrFailedAggregation | ||
} | ||
continue | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this work with hypersdk's expected usage here? I thought that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe it makes more sense for us to change the behavior so that this api blocks until all responses come back, or we reach the provided |
||
} | ||
|
||
signatures = append(signatures, result.Signature) | ||
signerBitSet.Add(result.Validator.Index) | ||
aggregatedStakeWeight += result.Validator.Weight | ||
|
||
if aggregatedStakeWeight >= minThreshold { | ||
msg, err := newWarpMessage(message, signerBitSet, signatures) | ||
if err != nil { | ||
return nil, 0, 0, err | ||
} | ||
|
||
return msg, aggregatedStakeWeight, totalStakeWeight, nil | ||
} | ||
} | ||
} | ||
} | ||
|
||
func newWarpMessage( | ||
message *warp.Message, | ||
signerBitSet set.Bits, | ||
signatures []*bls.Signature, | ||
) (*warp.Message, error) { | ||
aggregateSignature, err := bls.AggregateSignatures(signatures) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
bitSetSignature := &warp.BitSetSignature{ | ||
Signers: signerBitSet.Bytes(), | ||
Signature: [bls.SignatureLen]byte{}, | ||
} | ||
copy(bitSetSignature.Signature[:], bls.SignatureToBytes(aggregateSignature)) | ||
|
||
return warp.NewMessage(&message.UnsignedMessage, bitSetSignature) | ||
} | ||
|
||
type responseHandler struct { | ||
message *warp.Message | ||
nodeIDsToValidator map[ids.NodeID]indexedValidator | ||
results chan result | ||
} | ||
|
||
func (r *responseHandler) HandleResponse( | ||
_ context.Context, | ||
nodeID ids.NodeID, | ||
responseBytes []byte, | ||
err error, | ||
) { | ||
validator := r.nodeIDsToValidator[nodeID] | ||
|
||
if err != nil { | ||
r.results <- result{Validator: validator, Err: err} | ||
return | ||
} | ||
|
||
response := &sdk.SignatureResponse{} | ||
if err := proto.Unmarshal(responseBytes, response); err != nil { | ||
r.results <- result{Validator: validator, Err: err} | ||
return | ||
} | ||
|
||
signature, err := bls.SignatureFromBytes(response.Signature) | ||
if err != nil { | ||
r.results <- result{Validator: validator, Err: err} | ||
return | ||
} | ||
|
||
if !bls.Verify(validator.PublicKey, signature, r.message.UnsignedMessage.Bytes()) { | ||
r.results <- result{Validator: validator, Err: errFailedVerification} | ||
return | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice |
||
|
||
r.results <- result{Validator: validator, Signature: signature} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this correctly handles the case that BLS public keys are shared across validators.
In Warp, only 1 signature is ever allowed from a BLS key in a warp message. If different nodeIDs have the same BLS key, their weights are aggregated for the BLS key's index