Skip to content

Commit

Permalink
Merge pull request #30 from datachainlab/fix-message-aggregation
Browse files Browse the repository at this point in the history
Fix message aggregation

Signed-off-by: Jun Kimura <[email protected]>
  • Loading branch information
bluele authored Jul 16, 2024
2 parents 749a997 + 1bf2f75 commit 586027e
Show file tree
Hide file tree
Showing 2 changed files with 246 additions and 15 deletions.
45 changes: 30 additions & 15 deletions relay/prover.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (pr *Prover) SetupHeadersForUpdate(dstChain core.FinalityAwareChain, latest
// NOTE: assume that the messages length and the signatures length are the same
if pr.config.MessageAggregation {
pr.getLogger().Info("aggregate messages", "num_messages", len(messages))
update, err := pr.aggregateMessages(messages, signatures, pr.activeEnclaveKey.EnclaveKeyAddress)
update, err := aggregateMessages(pr.getLogger(), pr.config.GetMessageAggregationBatchSize(), pr.lcpServiceClient.AggregateMessages, messages, signatures, pr.activeEnclaveKey.EnclaveKeyAddress)
if err != nil {
return nil, err
}
Expand All @@ -210,14 +210,23 @@ func (pr *Prover) SetupHeadersForUpdate(dstChain core.FinalityAwareChain, latest
return updates, nil
}

func (pr *Prover) aggregateMessages(messages [][]byte, signatures [][]byte, signer []byte) (*lcptypes.UpdateClientMessage, error) {
type MessageAggregator func(ctx context.Context, in *elc.MsgAggregateMessages, opts ...grpc.CallOption) (*elc.MsgAggregateMessagesResponse, error)

func aggregateMessages(
logger *log.RelayLogger,
batchSize uint64,
messageAggregator MessageAggregator,
messages [][]byte,
signatures [][]byte,
signer []byte,
) (*lcptypes.UpdateClientMessage, error) {
if len(messages) == 0 {
return nil, fmt.Errorf("aggregateMessages: messages must not be empty")
} else if len(messages) != len(signatures) {
return nil, fmt.Errorf("aggregateMessages: messages and signatures must have the same length: messages=%v signatures=%v", len(messages), len(signatures))
}
for {
batches, err := splitIntoMultiBatch(messages, signatures, signer, pr.config.GetMessageAggregationBatchSize())
batches, err := splitIntoMultiBatch(messages, signatures, signer, batchSize)
if err != nil {
return nil, err
}
Expand All @@ -235,7 +244,7 @@ func (pr *Prover) aggregateMessages(messages [][]byte, signatures [][]byte, sign
Messages: batches[0].Messages,
Signatures: batches[0].Signatures,
}
resp, err := pr.lcpServiceClient.AggregateMessages(context.TODO(), &m)
resp, err := messageAggregator(context.TODO(), &m)
if err != nil {
return nil, fmt.Errorf("failed to aggregate messages: msg=%v %w", m, err)
}
Expand All @@ -247,22 +256,28 @@ func (pr *Prover) aggregateMessages(messages [][]byte, signatures [][]byte, sign
} else if n == 0 {
return nil, fmt.Errorf("unexpected error: batches must not be empty")
} else {
pr.getLogger().Info("aggregateMessages", "num_batches", n)
logger.Info("aggregateMessages", "num_batches", n)
}
messages = nil
signatures = nil
for i, b := range batches {
m := elc.MsgAggregateMessages{
Signer: b.Signer,
Messages: b.Messages,
Signatures: b.Signatures,
}
resp, err := pr.lcpServiceClient.AggregateMessages(context.TODO(), &m)
if err != nil {
return nil, fmt.Errorf("failed to aggregate messages: i=%v msg=%v %w", i, m, err)
logger.Info("aggregateMessages", "batch_index", i, "num_messages", len(b.Messages))
if len(b.Messages) == 1 {
messages = append(messages, b.Messages[0])
signatures = append(signatures, b.Signatures[0])
} else {
m := elc.MsgAggregateMessages{
Signer: b.Signer,
Messages: b.Messages,
Signatures: b.Signatures,
}
resp, err := messageAggregator(context.TODO(), &m)
if err != nil {
return nil, fmt.Errorf("failed to aggregate messages: batch_index=%v msg=%v %w", i, m, err)
}
messages = append(messages, resp.Message)
signatures = append(signatures, resp.Signature)
}
messages = append(messages, resp.Message)
signatures = append(signatures, resp.Signature)
}
}
}
Expand Down
216 changes: 216 additions & 0 deletions relay/prover_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
package relay

import (
"context"
"fmt"
"testing"

"github.com/datachainlab/lcp-go/relay/elc"
"github.com/hyperledger-labs/yui-relayer/log"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)

func TestSplitIntoMultiBatch(t *testing.T) {
var M = func(n uint8) []byte {
return []byte(fmt.Sprintf("message-%03d", n))
}
var S = func(n uint8) []byte {
return []byte(fmt.Sprintf("signature-%03d", n))
}
var Signer = func(n uint8) []byte {
return []byte(fmt.Sprintf("signer-%03d", n))
}
var cases = []struct {
Messages [][]byte
Signatures [][]byte
BatchSizes []int
Signer []byte
MessageBatchSize uint64
Error bool
}{
// Messages.len = 1 is invalid
{
Messages: [][]byte{M(0)},
Signatures: [][]byte{S(0)},
BatchSizes: []int{1},
Signer: Signer(0),
MessageBatchSize: 1,
Error: true,
},
{
Messages: [][]byte{M(0), M(1)},
Signatures: [][]byte{S(0), S(1)},
BatchSizes: []int{2},
Signer: Signer(0),
MessageBatchSize: 2,
Error: false,
},
{
Messages: [][]byte{M(0), M(1), M(2)},
Signatures: [][]byte{S(0), S(1), S(2)},
BatchSizes: []int{3},
Signer: Signer(0),
MessageBatchSize: 3,
Error: false,
},
{
Messages: [][]byte{M(0), M(1), M(2)},
Signatures: [][]byte{S(0), S(1), S(2)},
BatchSizes: []int{2, 1},
Signer: Signer(0),
MessageBatchSize: 2,
Error: false,
},
{
Messages: [][]byte{M(0), M(1), M(2), M(3)},
Signatures: [][]byte{S(0), S(1), S(2), S(3)},
BatchSizes: []int{3, 1},
Signer: Signer(0),
MessageBatchSize: 3,
Error: false,
},
{
Messages: [][]byte{M(0), M(1), M(2), M(3), M(4)},
Signatures: [][]byte{S(0), S(1), S(2), S(3), S(4)},
BatchSizes: []int{3, 2},
Signer: Signer(0),
MessageBatchSize: 3,
Error: false,
},
}
for i, c := range cases {
t.Run(fmt.Sprintf("case-%d", i), func(t *testing.T) {
require := require.New(t)
batches, err := splitIntoMultiBatch(c.Messages, c.Signatures, c.Signer, c.MessageBatchSize)
if c.Error {
require.Error(err)
return
} else {
require.NoError(err)
}
require.Len(batches, len(c.BatchSizes))
for i, size := range c.BatchSizes {
require.Equal(batches[i].Signer, c.Signer)
require.Len(batches[i].Messages, size)
require.Len(batches[i].Signatures, size)
}
})
}
}

func TestAggregateMessages(t *testing.T) {
var M = func(n uint8) []byte {
return []byte(fmt.Sprintf("message-%03d", n))
}
var S = func(n uint8) []byte {
return []byte(fmt.Sprintf("signature-%03d", n))
}
var Signer = func(n uint8) []byte {
return []byte(fmt.Sprintf("signer-%03d", n))
}

err := log.InitLogger("DEBUG", "text", "stdout")
require.NoError(t, err)
logger := log.GetLogger()

var cases = []struct {
Messages [][]byte
Signatures [][]byte
Signer []byte
BatchSize uint64
Error bool
}{
// Messages.len = 0 is invalid
{
Messages: [][]byte{},
Signatures: [][]byte{},
Signer: Signer(0),
BatchSize: 2,
Error: true,
},
{
Messages: [][]byte{M(0)},
Signatures: [][]byte{S(0)},
Signer: Signer(0),
BatchSize: 2,
Error: false,
},
{
Messages: [][]byte{M(0), M(1)},
Signatures: [][]byte{S(0), S(1)},
Signer: Signer(0),
BatchSize: 2,
Error: false,
},
// BatchSize = 1 is invalid
{
Messages: [][]byte{M(0), M(1)},
Signatures: [][]byte{S(0), S(1)},
Signer: Signer(0),
BatchSize: 1,
Error: true,
},
{
Messages: [][]byte{M(0), M(1), M(2)},
Signatures: [][]byte{S(0), S(1), S(2)},
Signer: Signer(0),
BatchSize: 2,
Error: false,
},
{
Messages: [][]byte{M(0), M(1), M(2), M(3)},
Signatures: [][]byte{S(0), S(1), S(2), S(3)},
Signer: Signer(0),
BatchSize: 2,
Error: false,
},
{
Messages: [][]byte{M(0), M(1), M(2), M(3), M(4)},
Signatures: [][]byte{S(0), S(1), S(2), S(3), S(4)},
Signer: Signer(0),
BatchSize: 2,
Error: false,
},
}

for i, c := range cases {
t.Run(fmt.Sprintf("case-%d", i), func(t *testing.T) {
require := require.New(t)
res, err := aggregateMessages(logger, c.BatchSize, mockMessageAggregator, c.Messages, c.Signatures, c.Signer)
if c.Error {
require.Error(err)
return
} else {
require.NoError(err)
}
require.Equal(res.ProxyMessage, concatBytes(c.Messages))
require.Equal(res.Signatures[0], concatBytes(c.Signatures))
})
}
}

func concatBytes(bzs [][]byte) []byte {
var res []byte
for _, b := range bzs {
res = append(res, b...)
}
return res
}

func mockMessageAggregator(_ context.Context, in *elc.MsgAggregateMessages, _ ...grpc.CallOption) (*elc.MsgAggregateMessagesResponse, error) {
var res elc.MsgAggregateMessagesResponse
if len(in.Messages) != len(in.Signatures) {
return nil, fmt.Errorf("messages and signatures must have the same length")
}
if len(in.Messages) == 0 {
return nil, fmt.Errorf("messages.len = 0 is invalid")
} else if len(in.Messages) == 1 {
return nil, fmt.Errorf("messages.len = 1 is invalid")
}
for i := 0; i < len(in.Messages); i++ {
res.Message = append(res.Message, in.Messages[i]...)
res.Signature = append(res.Signature, in.Signatures[i]...)
}
return &res, nil
}

0 comments on commit 586027e

Please sign in to comment.