diff --git a/disperser/batcher/finalizer.go b/disperser/batcher/finalizer.go index 77891fa9fd..37ca5d719f 100644 --- a/disperser/batcher/finalizer.go +++ b/disperser/batcher/finalizer.go @@ -100,10 +100,10 @@ func (f *finalizer) FinalizeBlobs(ctx context.Context) error { if err != nil { return fmt.Errorf("FinalizeBlobs: error getting blob headers: %w", err) } - metadatas := metadatas - f.logger.Info("FinalizeBlobs: finalizing blobs", "numBlobs", len(metadatas), "finalizedBlockNumber", lastFinalBlock) + metas := metadatas + f.logger.Info("FinalizeBlobs: finalizing blobs", "numBlobs", len(metas), "finalizedBlockNumber", lastFinalBlock) pool.Submit(func() { - f.updateBlobs(ctx, metadatas, lastFinalBlock) + f.updateBlobs(ctx, metas, lastFinalBlock) }) totalProcessed += len(metadatas) diff --git a/disperser/common/inmem/store.go b/disperser/common/inmem/store.go index 880f85c00b..207becd77b 100644 --- a/disperser/common/inmem/store.go +++ b/disperser/common/inmem/store.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "sort" "strconv" + "sync" "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/disperser" @@ -13,6 +14,7 @@ import ( // BlobStore is an in-memory implementation of the BlobStore interface type BlobStore struct { + mu sync.RWMutex Blobs map[disperser.BlobHash]*BlobHolder Metadata map[disperser.BlobKey]*disperser.BlobMetadata } @@ -33,6 +35,8 @@ func NewBlobStore() disperser.BlobStore { } func (q *BlobStore) StoreBlob(ctx context.Context, blob *core.Blob, requestedAt uint64) (disperser.BlobKey, error) { + q.mu.Lock() + defer q.mu.Unlock() blobKey := disperser.BlobKey{} // Generate the blob key blobHash, err := q.getNewBlobHash() @@ -63,6 +67,8 @@ func (q *BlobStore) StoreBlob(ctx context.Context, blob *core.Blob, requestedAt } func (q *BlobStore) GetBlobContent(ctx context.Context, blobHash disperser.BlobHash) ([]byte, error) { + q.mu.RLock() + defer q.mu.RUnlock() if holder, ok := q.Blobs[blobHash]; ok { return holder.Data, nil } else { @@ -71,6 +77,8 @@ func (q *BlobStore) GetBlobContent(ctx context.Context, blobHash disperser.BlobH } func (q *BlobStore) MarkBlobConfirmed(ctx context.Context, existingMetadata *disperser.BlobMetadata, confirmationInfo *disperser.ConfirmationInfo) (*disperser.BlobMetadata, error) { + q.mu.Lock() + defer q.mu.Unlock() // TODO (ian-shim): remove this check once we are sure that the metadata is never overwritten refreshedMetadata, err := q.GetBlobMetadata(ctx, existingMetadata.GetBlobKey()) if err != nil { @@ -92,6 +100,8 @@ func (q *BlobStore) MarkBlobConfirmed(ctx context.Context, existingMetadata *dis } func (q *BlobStore) MarkBlobInsufficientSignatures(ctx context.Context, existingMetadata *disperser.BlobMetadata, confirmationInfo *disperser.ConfirmationInfo) (*disperser.BlobMetadata, error) { + q.mu.Lock() + defer q.mu.Unlock() blobKey := existingMetadata.GetBlobKey() if _, ok := q.Metadata[blobKey]; !ok { return nil, disperser.ErrBlobNotFound @@ -104,6 +114,8 @@ func (q *BlobStore) MarkBlobInsufficientSignatures(ctx context.Context, existing } func (q *BlobStore) MarkBlobFinalized(ctx context.Context, blobKey disperser.BlobKey) error { + q.mu.Lock() + defer q.mu.Unlock() if _, ok := q.Metadata[blobKey]; !ok { return disperser.ErrBlobNotFound } @@ -113,6 +125,8 @@ func (q *BlobStore) MarkBlobFinalized(ctx context.Context, blobKey disperser.Blo } func (q *BlobStore) MarkBlobProcessing(ctx context.Context, blobKey disperser.BlobKey) error { + q.mu.Lock() + defer q.mu.Unlock() if _, ok := q.Metadata[blobKey]; !ok { return disperser.ErrBlobNotFound } @@ -122,6 +136,8 @@ func (q *BlobStore) MarkBlobProcessing(ctx context.Context, blobKey disperser.Bl } func (q *BlobStore) MarkBlobFailed(ctx context.Context, blobKey disperser.BlobKey) error { + q.mu.Lock() + defer q.mu.Unlock() if _, ok := q.Metadata[blobKey]; !ok { return disperser.ErrBlobNotFound } @@ -131,6 +147,8 @@ func (q *BlobStore) MarkBlobFailed(ctx context.Context, blobKey disperser.BlobKe } func (q *BlobStore) IncrementBlobRetryCount(ctx context.Context, existingMetadata *disperser.BlobMetadata) error { + q.mu.Lock() + defer q.mu.Unlock() if _, ok := q.Metadata[existingMetadata.GetBlobKey()]; !ok { return disperser.ErrBlobNotFound } @@ -140,6 +158,8 @@ func (q *BlobStore) IncrementBlobRetryCount(ctx context.Context, existingMetadat } func (q *BlobStore) GetBlobsByMetadata(ctx context.Context, metadata []*disperser.BlobMetadata) (map[disperser.BlobKey]*core.Blob, error) { + q.mu.RLock() + defer q.mu.RUnlock() blobs := make(map[disperser.BlobKey]*core.Blob) for _, meta := range metadata { if holder, ok := q.Blobs[meta.BlobHash]; ok { @@ -155,6 +175,8 @@ func (q *BlobStore) GetBlobsByMetadata(ctx context.Context, metadata []*disperse } func (q *BlobStore) GetBlobMetadataByStatus(ctx context.Context, status disperser.BlobStatus) ([]*disperser.BlobMetadata, error) { + q.mu.RLock() + defer q.mu.RUnlock() metas := make([]*disperser.BlobMetadata, 0) for _, meta := range q.Metadata { if meta.BlobStatus == status { @@ -165,6 +187,8 @@ func (q *BlobStore) GetBlobMetadataByStatus(ctx context.Context, status disperse } func (q *BlobStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey *disperser.BlobStoreExclusiveStartKey) ([]*disperser.BlobMetadata, *disperser.BlobStoreExclusiveStartKey, error) { + q.mu.RLock() + defer q.mu.RUnlock() metas := make([]*disperser.BlobMetadata, 0) foundStart := exclusiveStartKey == nil @@ -206,6 +230,8 @@ func (q *BlobStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, s } func (q *BlobStore) GetMetadataInBatch(ctx context.Context, batchHeaderHash [32]byte, blobIndex uint32) (*disperser.BlobMetadata, error) { + q.mu.RLock() + defer q.mu.RUnlock() for _, meta := range q.Metadata { if meta.ConfirmationInfo != nil && meta.ConfirmationInfo.BatchHeaderHash == batchHeaderHash && meta.ConfirmationInfo.BlobIndex == blobIndex { return meta, nil @@ -216,6 +242,8 @@ func (q *BlobStore) GetMetadataInBatch(ctx context.Context, batchHeaderHash [32] } func (q *BlobStore) GetAllBlobMetadataByBatch(ctx context.Context, batchHeaderHash [32]byte) ([]*disperser.BlobMetadata, error) { + q.mu.RLock() + defer q.mu.RUnlock() metas := make([]*disperser.BlobMetadata, 0) for _, meta := range q.Metadata { if meta.ConfirmationInfo != nil && meta.ConfirmationInfo.BatchHeaderHash == batchHeaderHash {