diff --git a/disperser/apiserver/server_test.go b/disperser/apiserver/server_test.go index 1a7d5f7e7..4ab163468 100644 --- a/disperser/apiserver/server_test.go +++ b/disperser/apiserver/server_test.go @@ -45,12 +45,11 @@ var ( dispersalServer *apiserver.DispersalServer dispersalServerV2 *apiserver.DispersalServerV2 - dockertestPool *dockertest.Pool - dockertestResource *dockertest.Resource - UUID = uuid.New() - metadataTableName = fmt.Sprintf("test-BlobMetadata-%v", UUID) - shadowMetadataTableName = fmt.Sprintf("test-BlobMetadata-Shadow-%v", UUID) - bucketTableName = fmt.Sprintf("test-BucketStore-%v", UUID) + dockertestPool *dockertest.Pool + dockertestResource *dockertest.Resource + UUID = uuid.New() + metadataTableName = fmt.Sprintf("test-BlobMetadata-%v", UUID) + bucketTableName = fmt.Sprintf("test-BucketStore-%v", UUID) deployLocalStack bool localStackPort = "4568" @@ -589,7 +588,7 @@ func setup() { } - err = deploy.DeployResources(dockertestPool, localStackPort, metadataTableName, shadowMetadataTableName, bucketTableName) + err = deploy.DeployResources(dockertestPool, localStackPort, metadataTableName, bucketTableName) if err != nil { teardown() panic("failed to deploy AWS resources") @@ -636,7 +635,7 @@ func newTestServer(transactor core.Transactor) *apiserver.DispersalServer { if err != nil { panic("failed to create dynamoDB client") } - blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, metadataTableName, shadowMetadataTableName, time.Hour) + blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, metadataTableName, time.Hour) globalParams := common.GlobalRateParams{ CountFailed: false, diff --git a/disperser/cmd/apiserver/config.go b/disperser/cmd/apiserver/config.go index 480a0fb4a..2eed16d49 100644 --- a/disperser/cmd/apiserver/config.go +++ b/disperser/cmd/apiserver/config.go @@ -32,7 +32,6 @@ type Config struct { RateConfig apiserver.RateConfig EnableRatelimiter bool BucketTableName string - ShadowTableName string BucketStoreSize int EthClientConfig geth.EthClientConfig MaxBlobSize int @@ -70,9 +69,8 @@ func NewConfig(ctx *cli.Context) (Config, error) { GrpcTimeout: ctx.GlobalDuration(flags.GrpcTimeoutFlag.Name), }, BlobstoreConfig: blobstore.Config{ - BucketName: ctx.GlobalString(flags.S3BucketNameFlag.Name), - TableName: ctx.GlobalString(flags.DynamoDBTableNameFlag.Name), - ShadowTableName: ctx.GlobalString(flags.ShadowTableNameFlag.Name), + BucketName: ctx.GlobalString(flags.S3BucketNameFlag.Name), + TableName: ctx.GlobalString(flags.DynamoDBTableNameFlag.Name), }, LoggerConfig: *loggerConfig, MetricsConfig: disperser.MetricsConfig{ diff --git a/disperser/cmd/apiserver/flags/flags.go b/disperser/cmd/apiserver/flags/flags.go index 9402f7f9f..375b6596d 100644 --- a/disperser/cmd/apiserver/flags/flags.go +++ b/disperser/cmd/apiserver/flags/flags.go @@ -30,13 +30,6 @@ var ( Required: true, EnvVar: common.PrefixEnvVar(envVarPrefix, "DYNAMODB_TABLE_NAME"), } - ShadowTableNameFlag = cli.StringFlag{ - Name: common.PrefixFlag(FlagPrefix, "shadow-table-name"), - Usage: "Name of the dynamodb table to shadow write blob metadata", - Required: false, - EnvVar: common.PrefixEnvVar(envVarPrefix, "SHADOW_TABLE_NAME"), - Value: "", - } GrpcPortFlag = cli.StringFlag{ Name: common.PrefixFlag(FlagPrefix, "grpc-port"), Usage: "Port at which disperser listens for grpc calls", @@ -126,7 +119,6 @@ var optionalFlags = []cli.Flag{ EnableRatelimiter, BucketStoreSize, GrpcTimeoutFlag, - ShadowTableNameFlag, MaxBlobSize, } diff --git a/disperser/cmd/apiserver/main.go b/disperser/cmd/apiserver/main.go index d205e44fc..2f6309151 100644 --- a/disperser/cmd/apiserver/main.go +++ b/disperser/cmd/apiserver/main.go @@ -96,7 +96,7 @@ func RunDisperserServer(ctx *cli.Context) error { bucketName := config.BlobstoreConfig.BucketName logger.Info("Creating blob store", "bucket", bucketName) - blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, config.BlobstoreConfig.ShadowTableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second) + blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second) blobStore := blobstore.NewSharedStorage(bucketName, s3Client, blobMetadataStore, logger) reg := prometheus.NewRegistry() diff --git a/disperser/cmd/batcher/main.go b/disperser/cmd/batcher/main.go index 8ce537770..4d437fbd8 100644 --- a/disperser/cmd/batcher/main.go +++ b/disperser/cmd/batcher/main.go @@ -192,7 +192,7 @@ func RunBatcher(ctx *cli.Context) error { if err != nil || storeDurationBlocks == 0 { return fmt.Errorf("failed to get STORE_DURATION_BLOCKS: %w", err) } - blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, config.BlobstoreConfig.ShadowTableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second) + blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second) queue := blobstore.NewSharedStorage(bucketName, s3Client, blobMetadataStore, logger) cs := coreeth.NewChainState(tx, client) diff --git a/disperser/cmd/dataapi/main.go b/disperser/cmd/dataapi/main.go index a1331b324..ef0d55615 100644 --- a/disperser/cmd/dataapi/main.go +++ b/disperser/cmd/dataapi/main.go @@ -90,7 +90,7 @@ func RunDataApi(ctx *cli.Context) error { var ( promClient = dataapi.NewPrometheusClient(promApi, config.PrometheusConfig.Cluster) - blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, config.BlobstoreConfig.ShadowTableName, 0) + blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, 0) sharedStorage = blobstore.NewSharedStorage(config.BlobstoreConfig.BucketName, s3Client, blobMetadataStore, logger) subgraphApi = subgraph.NewApi(config.SubgraphApiBatchMetadataAddr, config.SubgraphApiOperatorStateAddr) subgraphClient = dataapi.NewSubgraphClient(subgraphApi, logger) diff --git a/disperser/common/blobstore/blob_metadata_store.go b/disperser/common/blobstore/blob_metadata_store.go index 38152e6c6..3712e7a2c 100644 --- a/disperser/common/blobstore/blob_metadata_store.go +++ b/disperser/common/blobstore/blob_metadata_store.go @@ -29,24 +29,19 @@ const ( // - StatusIndex: (Partition Key: Status, Sort Key: RequestedAt) -> Metadata // - BatchIndex: (Partition Key: BatchHeaderHash, Sort Key: BlobIndex) -> Metadata type BlobMetadataStore struct { - dynamoDBClient *commondynamodb.Client - logger logging.Logger - tableName string - shadowTableName string - ttl time.Duration + dynamoDBClient *commondynamodb.Client + logger logging.Logger + tableName string + ttl time.Duration } -func NewBlobMetadataStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, shadowTableName string, ttl time.Duration) *BlobMetadataStore { +func NewBlobMetadataStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) *BlobMetadataStore { logger.Debugf("creating blob metadata store with table %s with TTL: %s", tableName, ttl) - if shadowTableName != "" { - logger.Debugf("shadow blob metadata will be written to table %s with TTL: %s", shadowTableName, ttl) - } return &BlobMetadataStore{ - dynamoDBClient: dynamoDBClient, - logger: logger.With("component", "BlobMetadataStore"), - tableName: tableName, - shadowTableName: shadowTableName, - ttl: ttl, + dynamoDBClient: dynamoDBClient, + logger: logger.With("component", "BlobMetadataStore"), + tableName: tableName, + ttl: ttl, } } @@ -56,13 +51,6 @@ func (s *BlobMetadataStore) QueueNewBlobMetadata(ctx context.Context, blobMetada return err } - if s.shadowTableName != "" && s.shadowTableName != s.tableName { - err = s.dynamoDBClient.PutItem(ctx, s.shadowTableName, item) - if err != nil { - s.logger.Error("failed to put item into shadow table %s : %v", s.shadowTableName, err) - } - } - return s.dynamoDBClient.PutItem(ctx, s.tableName, item) } diff --git a/disperser/common/blobstore/blob_metadata_store_test.go b/disperser/common/blobstore/blob_metadata_store_test.go index c1b1c92b7..bec9c427b 100644 --- a/disperser/common/blobstore/blob_metadata_store_test.go +++ b/disperser/common/blobstore/blob_metadata_store_test.go @@ -8,7 +8,6 @@ import ( commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" "github.com/Layr-Labs/eigenda/disperser" "github.com/Layr-Labs/eigenda/encoding" - "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/consensys/gnark-crypto/ecc/bn254/fp" "github.com/ethereum/go-ethereum/common" @@ -301,58 +300,6 @@ func TestBlobMetadataStoreOperationsWithPaginationNoStoredBlob(t *testing.T) { assert.Nil(t, lastEvaluatedKey) } -func TestShadowWriteBlobMetadata(t *testing.T) { - ctx := context.Background() - - blobKey := disperser.BlobKey{ - BlobHash: "shadowblob", - MetadataHash: "shadowhash", - } - expiry := uint64(time.Now().Add(time.Hour).Unix()) - metadata := &disperser.BlobMetadata{ - MetadataHash: blobKey.MetadataHash, - BlobHash: blobKey.BlobHash, - BlobStatus: disperser.Processing, - Expiry: expiry, - NumRetries: 0, - RequestMetadata: &disperser.RequestMetadata{ - BlobRequestHeader: blob.RequestHeader, - BlobSize: blobSize, - RequestedAt: 123, - }, - ConfirmationInfo: &disperser.ConfirmationInfo{}, - } - - err := shadowBlobMetadataStore.QueueNewBlobMetadata(ctx, metadata) - assert.NoError(t, err) - err = blobMetadataStore.SetBlobStatus(context.Background(), blobKey, disperser.Dispersing) - assert.NoError(t, err) - primaryMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey) - assert.NoError(t, err) - assert.Equal(t, disperser.Dispersing, primaryMetadata.BlobStatus) - - // Check that the shadow metadata exists but status has NOT been updated - shadowMetadataItem, err := dynamoClient.GetItem(ctx, shadowMetadataTableName, map[string]types.AttributeValue{ - "MetadataHash": &types.AttributeValueMemberS{ - Value: blobKey.MetadataHash, - }, - "BlobHash": &types.AttributeValueMemberS{ - Value: blobKey.BlobHash, - }, - }) - assert.NoError(t, err) - shadowMetadata := disperser.BlobMetadata{} - err = attributevalue.UnmarshalMap(shadowMetadataItem, &shadowMetadata) - assert.NoError(t, err) - assert.Equal(t, disperser.Processing, shadowMetadata.BlobStatus) - deleteItems(t, []commondynamodb.Key{ - { - "MetadataHash": &types.AttributeValueMemberS{Value: blobKey.MetadataHash}, - "BlobHash": &types.AttributeValueMemberS{Value: blobKey.BlobHash}, - }, - }) -} - func TestFilterOutExpiredBlobMetadata(t *testing.T) { ctx := context.Background() diff --git a/disperser/common/blobstore/blobstore_test.go b/disperser/common/blobstore/blobstore_test.go index 58fb4ea4c..72e8555f5 100644 --- a/disperser/common/blobstore/blobstore_test.go +++ b/disperser/common/blobstore/blobstore_test.go @@ -45,10 +45,9 @@ var ( deployLocalStack bool localStackPort = "4569" - dynamoClient *dynamodb.Client - blobMetadataStore *blobstore.BlobMetadataStore - shadowBlobMetadataStore *blobstore.BlobMetadataStore - sharedStorage *blobstore.SharedBlobStore + dynamoClient *dynamodb.Client + blobMetadataStore *blobstore.BlobMetadataStore + sharedStorage *blobstore.SharedBlobStore UUID = uuid.New() metadataTableName = fmt.Sprintf("test-BlobMetadata-%v", UUID) @@ -106,8 +105,7 @@ func setup(m *testing.M) { panic("failed to create dynamodb client: " + err.Error()) } - blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, metadataTableName, metadataTableName, time.Hour) - shadowBlobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, metadataTableName, shadowMetadataTableName, time.Hour) + blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, metadataTableName, time.Hour) sharedStorage = blobstore.NewSharedStorage(bucketName, s3Client, blobMetadataStore, logger) } diff --git a/disperser/common/blobstore/shared_storage.go b/disperser/common/blobstore/shared_storage.go index 0f1cc0589..157637431 100644 --- a/disperser/common/blobstore/shared_storage.go +++ b/disperser/common/blobstore/shared_storage.go @@ -48,9 +48,8 @@ type SharedBlobStore struct { } type Config struct { - BucketName string - TableName string - ShadowTableName string + BucketName string + TableName string } // This represents the s3 fetch result for a blob. diff --git a/inabox/deploy/cmd/main.go b/inabox/deploy/cmd/main.go index 4a50e3da8..2f519b1b9 100644 --- a/inabox/deploy/cmd/main.go +++ b/inabox/deploy/cmd/main.go @@ -16,9 +16,8 @@ var ( localstackFlagName = "localstack-port" deployResourcesFlagName = "deploy-resources" - metadataTableName = "test-BlobMetadata" - shadowMetadataTableName = "" // not used - bucketTableName = "test-BucketStore" + metadataTableName = "test-BlobMetadata" + bucketTableName = "test-BucketStore" chainCmdName = "chain" localstackCmdName = "localstack" @@ -138,7 +137,7 @@ func localstack(ctx *cli.Context) error { } if ctx.Bool(deployResourcesFlagName) { - return deploy.DeployResources(pool, ctx.String(localstackFlagName), metadataTableName, shadowMetadataTableName, bucketTableName) + return deploy.DeployResources(pool, ctx.String(localstackFlagName), metadataTableName, bucketTableName) } return nil diff --git a/inabox/deploy/localstack.go b/inabox/deploy/localstack.go index ca3277c4a..020f807b6 100644 --- a/inabox/deploy/localstack.go +++ b/inabox/deploy/localstack.go @@ -86,7 +86,7 @@ func StartDockertestWithLocalstackContainer(localStackPort string) (*dockertest. return pool, resource, nil } -func DeployResources(pool *dockertest.Pool, localStackPort, metadataTableName, shadowTableName, bucketTableName string) error { +func DeployResources(pool *dockertest.Pool, localStackPort, metadataTableName, bucketTableName string) error { if pool == nil { var err error diff --git a/inabox/tests/integration_suite_test.go b/inabox/tests/integration_suite_test.go index 692d582c3..c3ec09703 100644 --- a/inabox/tests/integration_suite_test.go +++ b/inabox/tests/integration_suite_test.go @@ -39,16 +39,15 @@ var ( dockertestResource *dockertest.Resource localStackPort string - metadataTableName = "test-BlobMetadata" - shadowMetadataTableName = "" - bucketTableName = "test-BucketStore" - logger logging.Logger - ethClient common.EthClient - rpcClient common.RPCEthClient - mockRollup *rollupbindings.ContractMockRollup - retrievalClient clients.RetrievalClient - numConfirmations int = 3 - numRetries = 0 + metadataTableName = "test-BlobMetadata" + bucketTableName = "test-BucketStore" + logger logging.Logger + ethClient common.EthClient + rpcClient common.RPCEthClient + mockRollup *rollupbindings.ContractMockRollup + retrievalClient clients.RetrievalClient + numConfirmations int = 3 + numRetries = 0 cancel context.CancelFunc ) @@ -92,7 +91,7 @@ var _ = BeforeSuite(func() { dockertestPool = pool dockertestResource = resource - err = deploy.DeployResources(pool, localStackPort, metadataTableName, shadowMetadataTableName, bucketTableName) + err = deploy.DeployResources(pool, localStackPort, metadataTableName, bucketTableName) Expect(err).To(BeNil()) } else {