Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ACP-118 Aggregator #3394

Open
wants to merge 50 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
7c6d0a7
implement acp-118 signature aggregation
joshua-kim Jun 11, 2024
11ada0f
undo
joshua-kim Oct 22, 2024
e650f5d
nit
joshua-kim Oct 22, 2024
81c9487
nit
joshua-kim Nov 12, 2024
0760fcd
improve usage of context
joshua-kim Nov 12, 2024
b6cfd0e
improve doc
joshua-kim Nov 12, 2024
dacd131
nit
joshua-kim Nov 12, 2024
810fb55
rename i -> index
joshua-kim Nov 13, 2024
6f6c0b5
nit
joshua-kim Nov 14, 2024
9c2e9eb
nit
joshua-kim Nov 18, 2024
6589763
nit
joshua-kim Nov 18, 2024
c75e929
int
joshua-kim Nov 19, 2024
ce284e6
nit
joshua-kim Nov 19, 2024
f7e2ca9
nit
joshua-kim Nov 19, 2024
ef9e7ec
nit
joshua-kim Nov 19, 2024
a8642a5
nit
joshua-kim Nov 19, 2024
aa5da47
nit
joshua-kim Nov 19, 2024
a44b653
nit
joshua-kim Nov 19, 2024
26bb1a0
nit
joshua-kim Nov 19, 2024
d2139cc
nit
joshua-kim Nov 19, 2024
4868a99
nit
joshua-kim Nov 19, 2024
c5aaebb
nit
joshua-kim Nov 19, 2024
fecd189
nit
joshua-kim Nov 19, 2024
3b23a22
nit
joshua-kim Nov 19, 2024
420253d
nit
joshua-kim Nov 19, 2024
4c4c380
nit
joshua-kim Nov 19, 2024
738b0ce
nit
joshua-kim Nov 19, 2024
455e03e
nit
joshua-kim Nov 20, 2024
7d7c3c0
nit
joshua-kim Nov 20, 2024
f989cf5
nit
joshua-kim Nov 20, 2024
900279d
nit
joshua-kim Nov 20, 2024
de7b65c
nit
joshua-kim Nov 20, 2024
7c4fa96
nit
joshua-kim Nov 20, 2024
9802ab2
fix tests
joshua-kim Nov 20, 2024
8b6563a
nit
joshua-kim Nov 20, 2024
4480b48
Update network/p2p/acp118/aggregator.go
joshua-kim Nov 21, 2024
67f533e
Update network/p2p/acp118/aggregator.go
joshua-kim Nov 21, 2024
2a9621b
Update network/p2p/acp118/aggregator.go
joshua-kim Nov 21, 2024
3773575
move nil signatures check
joshua-kim Nov 21, 2024
356c36d
nit
joshua-kim Nov 26, 2024
c279876
nti
joshua-kim Nov 26, 2024
7bfe878
wip
joshua-kim Nov 21, 2024
cd716bc
nit
joshua-kim Nov 27, 2024
7752928
nit
joshua-kim Nov 27, 2024
6fcc0aa
nti
joshua-kim Dec 3, 2024
17f2d75
nit
joshua-kim Dec 3, 2024
57506a4
nit
joshua-kim Dec 3, 2024
048dca9
nit
joshua-kim Dec 3, 2024
79f560b
nit
joshua-kim Dec 3, 2024
e55212f
nit
joshua-kim Dec 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
254 changes: 254 additions & 0 deletions network/p2p/acp118/aggregator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package acp118

import (
"context"
"errors"
"fmt"

"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/proto/pb/sdk"
"github.com/ava-labs/avalanchego/utils/crypto/bls"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/vms/platformvm/warp"
)

// ErrFailedAggregation is returned if it's not possible for us to
// generate a signature
var (
ErrFailedAggregation = errors.New("failed aggregation")
errFailedVerification = errors.New("failed verification")
)

// Validator signs warp messages. NodeID must be unique across validators, but
// PublicKey is not guaranteed to be unique.
type Validator struct {
NodeID ids.NodeID
PublicKey *bls.PublicKey
Weight uint64
}

type indexedValidator struct {
Validator
Index int
}

type result struct {
Validator indexedValidator
Signature *bls.Signature
Err error
}

// NewSignatureAggregator returns an instance of SignatureAggregator
func NewSignatureAggregator(log logging.Logger, client *p2p.Client) *SignatureAggregator {
return &SignatureAggregator{
log: log,
client: client,
}
}

// SignatureAggregator aggregates validator signatures for warp messages
type SignatureAggregator struct {
log logging.Logger
client *p2p.Client
}

// AggregateSignatures blocks until quorumNum/quorumDen signatures from
// validators are requested to be aggregated into a warp message or the context
// is canceled. Returns the signed message and the amount of stake that signed
// the message. Caller is responsible for providing a well-formed canonical
// validator set corresponding to the signer bitset in the message.
func (s *SignatureAggregator) AggregateSignatures(
ctx context.Context,
message *warp.Message,
justification []byte,
validators []Validator,
quorumNum uint64,
quorumDen uint64,
) (*warp.Message, uint64, uint64, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this correctly handles the case that BLS public keys are shared across validators.

In Warp, only 1 signature is ever allowed from a BLS key in a warp message. If different nodeIDs have the same BLS key, their weights are aggregated for the BLS key's index

request := &sdk.SignatureRequest{
Message: message.UnsignedMessage.Bytes(),
Justification: justification,
}

requestBytes, err := proto.Marshal(request)
if err != nil {
return nil, 0, 0, fmt.Errorf("failed to marshal signature request: %w", err)
}

nodeIDsToValidator := make(map[ids.NodeID]indexedValidator)
// TODO expose concrete type to avoid type casting
bitSetSignature, ok := message.Signature.(*warp.BitSetSignature)
if !ok {
return nil, 0, 0, errors.New("invalid warp signature type")
}

var signerBitSet set.Bits
if bitSetSignature.Signers != nil {
signerBitSet = set.BitsFromBytes(bitSetSignature.Signers)
} else {
signerBitSet = set.NewBits()
}
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved

sampleable := make([]ids.NodeID, 0, len(validators))
aggregatedStakeWeight := uint64(0)
totalStakeWeight := uint64(0)
for i, v := range validators {
totalStakeWeight += v.Weight
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved

// Only try to aggregate signatures from validators that are not already in
// the signer bit set
if signerBitSet.Contains(i) {
aggregatedStakeWeight += v.Weight
continue
}

nodeIDsToValidator[v.NodeID] = indexedValidator{
Index: i,
Validator: v,
}
sampleable = append(sampleable, v.NodeID)
}

signatures := make([]*bls.Signature, 0, len(sampleable)+1)
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved
if bitSetSignature.Signature != [bls.SignatureLen]byte{} {
blsSignature, err := bls.SignatureFromBytes(bitSetSignature.Signature[:])
if err != nil {
return nil, 0, 0, fmt.Errorf("failed to parse bls signature: %w", err)
}
signatures = append(signatures, blsSignature)
}

results := make(chan result)
job := responseHandler{
message: message,
nodeIDsToValidator: nodeIDsToValidator,
results: results,
}

for _, nodeID := range sampleable {
// Avoid var shadowing in goroutine
nodeIDCopy := nodeID
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved
go func() {
if err := s.client.AppRequest(ctx, set.Of(nodeIDCopy), requestBytes, job.HandleResponse); err != nil {
results <- result{Validator: nodeIDsToValidator[nodeIDCopy], Err: err}
return
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved
}
}()
}

failedStakeWeight := uint64(0)
minThreshold := (totalStakeWeight * quorumNum) / quorumDen
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

joshua-kim marked this conversation as resolved.
Show resolved Hide resolved

for {
select {
case <-ctx.Done():
if len(signatures) == 0 {
return message, 0, totalStakeWeight, nil
}

// Try to return whatever progress we have if the context is cancelled
msg, err := newWarpMessage(message, signerBitSet, signatures)
if err != nil {
return nil, 0, 0, err
}
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved

return msg, aggregatedStakeWeight, totalStakeWeight, nil
case result := <-results:
if result.Err != nil {
s.log.Debug(
"dropping response",
zap.Stringer("nodeID", result.Validator.NodeID),
zap.Uint64("weight", result.Validator.Weight),
zap.Error(err),
)

// Fast-fail if it's not possible to generate a signature that meets the
// minimum threshold
failedStakeWeight += result.Validator.Weight
if totalStakeWeight-failedStakeWeight < minThreshold {
return nil, 0, 0, ErrFailedAggregation
}
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this work with hypersdk's expected usage here? I thought that num/dem were going to be the maximum weights it would wait for, but that the minimum would be lower than that (meaning that if we are passing in the max here, we could be terminating when we realize we can't get the maximum... But we actually could have gotten the number that hypersdk wanted).

Copy link
Contributor Author

@joshua-kim joshua-kim Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it makes more sense for us to change the behavior so that this api blocks until all responses come back, or we reach the provided num/den threshold instead of failing instead.

}

signatures = append(signatures, result.Signature)
signerBitSet.Add(result.Validator.Index)
aggregatedStakeWeight += result.Validator.Weight

if aggregatedStakeWeight >= minThreshold {
msg, err := newWarpMessage(message, signerBitSet, signatures)
if err != nil {
return nil, 0, 0, err
}

return msg, aggregatedStakeWeight, totalStakeWeight, nil
}
}
}
}

func newWarpMessage(
message *warp.Message,
signerBitSet set.Bits,
signatures []*bls.Signature,
) (*warp.Message, error) {
aggregateSignature, err := bls.AggregateSignatures(signatures)
if err != nil {
return nil, err
}

bitSetSignature := &warp.BitSetSignature{
Signers: signerBitSet.Bytes(),
Signature: [bls.SignatureLen]byte{},
}
copy(bitSetSignature.Signature[:], bls.SignatureToBytes(aggregateSignature))

return warp.NewMessage(&message.UnsignedMessage, bitSetSignature)
}

type responseHandler struct {
message *warp.Message
nodeIDsToValidator map[ids.NodeID]indexedValidator
results chan result
}

func (r *responseHandler) HandleResponse(
_ context.Context,
nodeID ids.NodeID,
responseBytes []byte,
err error,
) {
validator := r.nodeIDsToValidator[nodeID]

if err != nil {
r.results <- result{Validator: validator, Err: err}
return
}

response := &sdk.SignatureResponse{}
if err := proto.Unmarshal(responseBytes, response); err != nil {
r.results <- result{Validator: validator, Err: err}
return
}

signature, err := bls.SignatureFromBytes(response.Signature)
if err != nil {
r.results <- result{Validator: validator, Err: err}
return
}

if !bls.Verify(validator.PublicKey, signature, r.message.UnsignedMessage.Bytes()) {
r.results <- result{Validator: validator, Err: errFailedVerification}
return
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice


r.results <- result{Validator: validator, Signature: signature}
}
Loading
Loading