Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ACP-118 Aggregator #3394

Open
wants to merge 62 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
7c6d0a7
implement acp-118 signature aggregation
joshua-kim Jun 11, 2024
11ada0f
undo
joshua-kim Oct 22, 2024
e650f5d
nit
joshua-kim Oct 22, 2024
81c9487
nit
joshua-kim Nov 12, 2024
0760fcd
improve usage of context
joshua-kim Nov 12, 2024
b6cfd0e
improve doc
joshua-kim Nov 12, 2024
dacd131
nit
joshua-kim Nov 12, 2024
810fb55
rename i -> index
joshua-kim Nov 13, 2024
6f6c0b5
nit
joshua-kim Nov 14, 2024
9c2e9eb
nit
joshua-kim Nov 18, 2024
6589763
nit
joshua-kim Nov 18, 2024
c75e929
int
joshua-kim Nov 19, 2024
ce284e6
nit
joshua-kim Nov 19, 2024
f7e2ca9
nit
joshua-kim Nov 19, 2024
ef9e7ec
nit
joshua-kim Nov 19, 2024
a8642a5
nit
joshua-kim Nov 19, 2024
aa5da47
nit
joshua-kim Nov 19, 2024
a44b653
nit
joshua-kim Nov 19, 2024
26bb1a0
nit
joshua-kim Nov 19, 2024
d2139cc
nit
joshua-kim Nov 19, 2024
4868a99
nit
joshua-kim Nov 19, 2024
c5aaebb
nit
joshua-kim Nov 19, 2024
fecd189
nit
joshua-kim Nov 19, 2024
3b23a22
nit
joshua-kim Nov 19, 2024
420253d
nit
joshua-kim Nov 19, 2024
4c4c380
nit
joshua-kim Nov 19, 2024
738b0ce
nit
joshua-kim Nov 19, 2024
455e03e
nit
joshua-kim Nov 20, 2024
7d7c3c0
nit
joshua-kim Nov 20, 2024
f989cf5
nit
joshua-kim Nov 20, 2024
900279d
nit
joshua-kim Nov 20, 2024
de7b65c
nit
joshua-kim Nov 20, 2024
7c4fa96
nit
joshua-kim Nov 20, 2024
9802ab2
fix tests
joshua-kim Nov 20, 2024
8b6563a
nit
joshua-kim Nov 20, 2024
4480b48
Update network/p2p/acp118/aggregator.go
joshua-kim Nov 21, 2024
67f533e
Update network/p2p/acp118/aggregator.go
joshua-kim Nov 21, 2024
2a9621b
Update network/p2p/acp118/aggregator.go
joshua-kim Nov 21, 2024
3773575
move nil signatures check
joshua-kim Nov 21, 2024
356c36d
nit
joshua-kim Nov 26, 2024
c279876
nti
joshua-kim Nov 26, 2024
7bfe878
wip
joshua-kim Nov 21, 2024
cd716bc
nit
joshua-kim Nov 27, 2024
7752928
nit
joshua-kim Nov 27, 2024
6fcc0aa
nti
joshua-kim Dec 3, 2024
17f2d75
nit
joshua-kim Dec 3, 2024
57506a4
nit
joshua-kim Dec 3, 2024
048dca9
nit
joshua-kim Dec 3, 2024
79f560b
nit
joshua-kim Dec 3, 2024
e55212f
nit
joshua-kim Dec 3, 2024
835de57
wip
joshua-kim Dec 3, 2024
39f8ca4
add better tests
joshua-kim Dec 5, 2024
f5e05b9
nit
joshua-kim Dec 5, 2024
b598c39
nit
joshua-kim Dec 5, 2024
ba022ca
nit
joshua-kim Dec 5, 2024
e308117
fix flake
joshua-kim Dec 5, 2024
b870220
remove copy for shadowing
joshua-kim Dec 5, 2024
e6959b1
use send config
joshua-kim Dec 5, 2024
bab3a65
remove unused testing fields
joshua-kim Dec 5, 2024
f6d20e8
nit
joshua-kim Dec 6, 2024
3e074e6
wip
joshua-kim Dec 6, 2024
8c88acf
nit
joshua-kim Dec 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
255 changes: 255 additions & 0 deletions network/p2p/acp118/aggregator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package acp118

import (
"context"
"errors"
"fmt"
"math/big"

"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/proto/pb/sdk"
"github.com/ava-labs/avalanchego/utils/crypto/bls"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/vms/platformvm/warp"
)

var errFailedVerification = errors.New("failed verification")

type indexedValidator struct {
*warp.Validator
Index int
}

type result struct {
NodeID ids.NodeID
Validator indexedValidator
Signature *bls.Signature
Err error
}

// NewSignatureAggregator returns an instance of SignatureAggregator
func NewSignatureAggregator(log logging.Logger, client *p2p.Client) *SignatureAggregator {
return &SignatureAggregator{
log: log,
client: client,
}
}

// SignatureAggregator aggregates validator signatures for warp messages
type SignatureAggregator struct {
log logging.Logger
client *p2p.Client
}

// AggregateSignatures blocks until quorumNum/quorumDen signatures from
// validators are requested to be aggregated into a warp message or the context
// is canceled. Returns the signed message and the amount of stake that signed
// the message. Caller is responsible for providing a well-formed canonical
// validator set corresponding to the signer bitset in the message.
func (s *SignatureAggregator) AggregateSignatures(
ctx context.Context,
message *warp.Message,
justification []byte,
validators []*warp.Validator,
quorumNum uint64,
quorumDen uint64,
) (
_ *warp.Message,
aggregatedStake *big.Int,
totalStake *big.Int,
finished bool,
_ error,
) {
request := &sdk.SignatureRequest{
Message: message.UnsignedMessage.Bytes(),
Justification: justification,
}

requestBytes, err := proto.Marshal(request)
if err != nil {
return nil, nil, nil, false, fmt.Errorf("failed to marshal signature request: %w", err)
}

nodeIDsToValidator := make(map[ids.NodeID]indexedValidator)
// TODO expose concrete type to avoid type casting
bitSetSignature, ok := message.Signature.(*warp.BitSetSignature)
if !ok {
return nil, nil, nil, false, errors.New("invalid warp signature type")
}

signerBitSet := set.BitsFromBytes(bitSetSignature.Signers)

nonSigners := make([]ids.NodeID, 0, len(validators))
aggregatedStakeWeight := new(big.Int)
totalStakeWeight := new(big.Int)
for i, validator := range validators {
totalStakeWeight.Add(totalStakeWeight, new(big.Int).SetUint64(validator.Weight))

// Only try to aggregate signatures from validators that are not already in
// the signer bit set
if signerBitSet.Contains(i) {
aggregatedStakeWeight.Add(aggregatedStakeWeight, new(big.Int).SetUint64(validator.Weight))
continue
}

v := indexedValidator{
Index: i,
Validator: validator,
}

for _, nodeID := range v.NodeIDs {
nodeIDsToValidator[nodeID] = v
}

nonSigners = append(nonSigners, v.NodeIDs...)
}

// Account for requested signatures + the signature that was provided
signatures := make([]*bls.Signature, 0, len(nonSigners)+1)
if bitSetSignature.Signature != [bls.SignatureLen]byte{} {
blsSignature, err := bls.SignatureFromBytes(bitSetSignature.Signature[:])
if err != nil {
return nil, nil, nil, false, fmt.Errorf("failed to parse bls signature: %w", err)
}
signatures = append(signatures, blsSignature)
}

results := make(chan result)
handler := responseHandler{
message: message,
nodeIDsToValidators: nodeIDsToValidator,
results: results,
}

if err := s.client.AppRequest(ctx, set.Of(nonSigners...), requestBytes, handler.HandleResponse); err != nil {
return nil, nil, nil, false, fmt.Errorf("failed to send aggregation request: %w", err)
}

minThreshold := new(big.Int).Mul(totalStakeWeight, new(big.Int).SetUint64(quorumNum))
minThreshold.Div(minThreshold, new(big.Int).SetUint64(quorumDen))

// Block until:
// 1. The context is cancelled
// 2. We get responses from all validators
// 3. The specified security threshold is reached
for i := 0; i < len(nonSigners); i++ {
select {
case <-ctx.Done():
// Try to return whatever progress we have if the context is cancelled
msg, err := newWarpMessage(message, signerBitSet, signatures)
if err != nil {
return nil, nil, nil, false, err
}

return msg, aggregatedStakeWeight, totalStakeWeight, false, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[no action required] It looks like this is the only case that returns a non-nil message with finished=false. Could we simplify the function signature by removing that parameter and assuming the caller is aware of a cancelled context?

case result := <-results:
if result.Err != nil {
s.log.Debug(
"dropping response",
zap.Stringer("nodeID", result.NodeID),
zap.Error(err),
)
continue
}

// Validators may share public keys so drop any duplicate signatures
if signerBitSet.Contains(result.Validator.Index) {
s.log.Debug(
"dropping duplicate signature",
zap.Stringer("nodeID", result.NodeID),
zap.Error(err),
)
continue
}

signatures = append(signatures, result.Signature)
signerBitSet.Add(result.Validator.Index)
aggregatedStakeWeight.Add(aggregatedStakeWeight, new(big.Int).SetUint64(result.Validator.Weight))

if aggregatedStakeWeight.Cmp(minThreshold) != -1 {
msg, err := newWarpMessage(message, signerBitSet, signatures)
if err != nil {
return nil, nil, nil, false, err
}

return msg, aggregatedStakeWeight, totalStakeWeight, true, nil
}
}
}

msg, err := newWarpMessage(message, signerBitSet, signatures)
if err != nil {
return nil, nil, nil, false, err
}

return msg, aggregatedStakeWeight, totalStakeWeight, true, nil
}
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved

func newWarpMessage(
message *warp.Message,
signerBitSet set.Bits,
signatures []*bls.Signature,
) (*warp.Message, error) {
if len(signatures) == 0 {
return message, nil
}

aggregateSignature, err := bls.AggregateSignatures(signatures)
if err != nil {
return nil, err
}

bitSetSignature := &warp.BitSetSignature{
Signers: signerBitSet.Bytes(),
Signature: [bls.SignatureLen]byte{},
}
copy(bitSetSignature.Signature[:], bls.SignatureToBytes(aggregateSignature))

return warp.NewMessage(&message.UnsignedMessage, bitSetSignature)
}

type responseHandler struct {
message *warp.Message
nodeIDsToValidators map[ids.NodeID]indexedValidator
results chan result
}

func (r *responseHandler) HandleResponse(
_ context.Context,
nodeID ids.NodeID,
responseBytes []byte,
err error,
) {
validator := r.nodeIDsToValidators[nodeID]
if err != nil {
r.results <- result{NodeID: nodeID, Validator: validator, Err: err}
return
}

response := &sdk.SignatureResponse{}
if err := proto.Unmarshal(responseBytes, response); err != nil {
r.results <- result{NodeID: nodeID, Validator: validator, Err: err}
return
}

signature, err := bls.SignatureFromBytes(response.Signature)
if err != nil {
r.results <- result{NodeID: nodeID, Validator: validator, Err: err}
return
}

if !bls.Verify(validator.PublicKey, signature, r.message.UnsignedMessage.Bytes()) {
r.results <- result{NodeID: nodeID, Validator: validator, Err: errFailedVerification}
return
}

r.results <- result{NodeID: nodeID, Validator: validator, Signature: signature}
}
Loading
Loading