Skip to content

Commit

Permalink
send only blobs relevant to operators
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Mar 1, 2024
1 parent 313bd6c commit 451e627
Showing 1 changed file with 17 additions and 12 deletions.
29 changes: 17 additions & 12 deletions disperser/batcher/grpc/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,12 @@ func (c *dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOpera
func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, header *core.BatchHeader, update chan core.SignerMessage) {
for id, op := range state.IndexedOperators {
go func(op core.IndexedOperatorInfo, id core.OperatorID) {
blobMessages := make([]*core.BlobMessage, len(blobs))
for i, blob := range blobs {
blobMessages[i] = blob[id]
blobMessages := make([]*core.BlobMessage, 0)
for _, blob := range blobs {
// only include messages assigned to the operator
if msg, ok := blob[id]; ok {
blobMessages = append(blobMessages, msg)
}
}
sig, err := c.sendChunks(ctx, blobMessages, header, &op)
if err != nil {
Expand Down Expand Up @@ -89,7 +92,6 @@ func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage,
gc := node.NewDispersalClient(conn)
ctx, cancel := context.WithTimeout(ctx, c.Timeout)
defer cancel()

request, totalSize, err := GetStoreChunksRequest(blobs, header)
if err != nil {
return nil, err
Expand Down Expand Up @@ -153,15 +155,18 @@ func getBlobMessage(blob *core.BlobMessage) (*node.Blob, error) {
lengthProofData.YA1 = blob.BlobHeader.LengthProof.Y.A1.Marshal()
}

quorumHeaders := make([]*node.BlobQuorumInfo, len(blob.BlobHeader.QuorumInfos))
quorumHeaders := make([]*node.BlobQuorumInfo, 0)

for i, header := range blob.BlobHeader.QuorumInfos {
quorumHeaders[i] = &node.BlobQuorumInfo{
QuorumId: uint32(header.QuorumID),
AdversaryThreshold: uint32(header.AdversaryThreshold),
ChunkLength: uint32(header.ChunkLength),
QuorumThreshold: uint32(header.QuorumThreshold),
Ratelimit: header.QuorumRate,
for _, header := range blob.BlobHeader.QuorumInfos {
if _, ok := blob.Bundles[header.QuorumID]; ok {
// only construct quorumHeaders for quorums in bundles
quorumHeaders = append(quorumHeaders, &node.BlobQuorumInfo{
QuorumId: uint32(header.QuorumID),
AdversaryThreshold: uint32(header.AdversaryThreshold),
ChunkLength: uint32(header.ChunkLength),
QuorumThreshold: uint32(header.QuorumThreshold),
Ratelimit: header.QuorumRate,
})
}
}

Expand Down

0 comments on commit 451e627

Please sign in to comment.