From 451e62788d7a6510bd166f52c9d1965d9d9663cd Mon Sep 17 00:00:00 2001 From: Ian Shim Date: Fri, 1 Mar 2024 15:26:37 -0800 Subject: [PATCH] send only blobs relevant to operators --- disperser/batcher/grpc/dispatcher.go | 29 ++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/disperser/batcher/grpc/dispatcher.go b/disperser/batcher/grpc/dispatcher.go index 0b7ff97a46..7004498727 100644 --- a/disperser/batcher/grpc/dispatcher.go +++ b/disperser/batcher/grpc/dispatcher.go @@ -46,9 +46,12 @@ func (c *dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOpera func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, header *core.BatchHeader, update chan core.SignerMessage) { for id, op := range state.IndexedOperators { go func(op core.IndexedOperatorInfo, id core.OperatorID) { - blobMessages := make([]*core.BlobMessage, len(blobs)) - for i, blob := range blobs { - blobMessages[i] = blob[id] + blobMessages := make([]*core.BlobMessage, 0) + for _, blob := range blobs { + // only include messages assigned to the operator + if msg, ok := blob[id]; ok { + blobMessages = append(blobMessages, msg) + } } sig, err := c.sendChunks(ctx, blobMessages, header, &op) if err != nil { @@ -89,7 +92,6 @@ func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage, gc := node.NewDispersalClient(conn) ctx, cancel := context.WithTimeout(ctx, c.Timeout) defer cancel() - request, totalSize, err := GetStoreChunksRequest(blobs, header) if err != nil { return nil, err @@ -153,15 +155,18 @@ func getBlobMessage(blob *core.BlobMessage) (*node.Blob, error) { lengthProofData.YA1 = blob.BlobHeader.LengthProof.Y.A1.Marshal() } - quorumHeaders := make([]*node.BlobQuorumInfo, len(blob.BlobHeader.QuorumInfos)) + quorumHeaders := make([]*node.BlobQuorumInfo, 0) - for i, header := range blob.BlobHeader.QuorumInfos { - quorumHeaders[i] = &node.BlobQuorumInfo{ - QuorumId: uint32(header.QuorumID), - AdversaryThreshold: uint32(header.AdversaryThreshold), - ChunkLength: uint32(header.ChunkLength), - QuorumThreshold: uint32(header.QuorumThreshold), - Ratelimit: header.QuorumRate, + for _, header := range blob.BlobHeader.QuorumInfos { + if _, ok := blob.Bundles[header.QuorumID]; ok { + // only construct quorumHeaders for quorums in bundles + quorumHeaders = append(quorumHeaders, &node.BlobQuorumInfo{ + QuorumId: uint32(header.QuorumID), + AdversaryThreshold: uint32(header.AdversaryThreshold), + ChunkLength: uint32(header.ChunkLength), + QuorumThreshold: uint32(header.QuorumThreshold), + Ratelimit: header.QuorumRate, + }) } }