diff --git a/das/das.go b/das/das.go index 80c45f6666..fea1e6c6a2 100644 --- a/das/das.go +++ b/das/das.go @@ -41,11 +41,9 @@ type DataAvailabilityConfig struct { LocalCache CacheConfig `koanf:"local-cache"` RedisCache RedisConfig `koanf:"redis-cache"` - LocalDBStorage LocalDBStorageConfig `koanf:"local-db-storage"` - LocalFileStorage LocalFileStorageConfig `koanf:"local-file-storage"` - S3Storage S3StorageServiceConfig `koanf:"s3-storage"` - IpfsStorage IpfsStorageServiceConfig `koanf:"ipfs-storage"` - RegularSyncStorage RegularSyncStorageConfig `koanf:"regular-sync-storage"` + LocalDBStorage LocalDBStorageConfig `koanf:"local-db-storage"` + LocalFileStorage LocalFileStorageConfig `koanf:"local-file-storage"` + S3Storage S3StorageServiceConfig `koanf:"s3-storage"` Key KeyConfig `koanf:"key"` @@ -68,7 +66,6 @@ var DefaultDataAvailabilityConfig = DataAvailabilityConfig{ RPCAggregator: DefaultAggregatorConfig, ParentChainConnectionAttempts: 15, PanicOnError: false, - IpfsStorage: DefaultIpfsStorageServiceConfig, } func OptionalAddressFromString(s string) (*common.Address, error) { @@ -115,7 +112,6 @@ func dataAvailabilityConfigAddOptions(prefix string, f *flag.FlagSet, r role) { LocalDBStorageConfigAddOptions(prefix+".local-db-storage", f) LocalFileStorageConfigAddOptions(prefix+".local-file-storage", f) S3ConfigAddOptions(prefix+".s3-storage", f) - RegularSyncStorageConfigAddOptions(prefix+".regular-sync-storage", f) // Key config for storage KeyConfigAddOptions(prefix+".key", f) @@ -129,7 +125,6 @@ func dataAvailabilityConfigAddOptions(prefix string, f *flag.FlagSet, r role) { } // Both the Nitro node and daserver can use these options. - IpfsStorageServiceConfigAddOptions(prefix+".ipfs-storage", f) RestfulClientAggregatorConfigAddOptions(prefix+".rest-aggregator", f) f.String(prefix+".parent-chain-node-url", DefaultDataAvailabilityConfig.ParentChainNodeURL, "URL for parent chain node, only used in standalone daserver; when running as part of a node that node's L1 configuration is used") diff --git a/das/das_test.go b/das/das_test.go index 4377dc4dce..950b63d9d9 100644 --- a/das/das_test.go +++ b/das/das_test.go @@ -47,9 +47,7 @@ func testDASStoreRetrieveMultipleInstances(t *testing.T, storageType string) { ParentChainNodeURL: "none", } - var syncFromStorageServicesFirst []*IterableStorageService - var syncToStorageServicesFirst []StorageService - storageService, lifecycleManager, err := CreatePersistentStorageService(firstCtx, &config, &syncFromStorageServicesFirst, &syncToStorageServicesFirst) + storageService, lifecycleManager, err := CreatePersistentStorageService(firstCtx, &config) Require(t, err) defer lifecycleManager.StopAndWaitUntil(time.Second) daWriter, err := NewSignAfterStoreDASWriter(firstCtx, config, storageService) @@ -77,9 +75,7 @@ func testDASStoreRetrieveMultipleInstances(t *testing.T, storageType string) { secondCtx, secondCancel := context.WithCancel(context.Background()) defer secondCancel() - var syncFromStorageServicesSecond []*IterableStorageService - var syncToStorageServicesSecond []StorageService - storageService2, lifecycleManager, err := CreatePersistentStorageService(secondCtx, &config, &syncFromStorageServicesSecond, &syncToStorageServicesSecond) + storageService2, lifecycleManager, err := CreatePersistentStorageService(secondCtx, &config) Require(t, err) defer lifecycleManager.StopAndWaitUntil(time.Second) var daReader2 DataAvailabilityServiceReader = storageService2 @@ -140,9 +136,7 @@ func testDASMissingMessage(t *testing.T, storageType string) { ParentChainNodeURL: "none", } - var syncFromStorageServices []*IterableStorageService - var syncToStorageServices []StorageService - storageService, lifecycleManager, err := CreatePersistentStorageService(ctx, &config, &syncFromStorageServices, &syncToStorageServices) + storageService, lifecycleManager, err := CreatePersistentStorageService(ctx, &config) Require(t, err) defer lifecycleManager.StopAndWaitUntil(time.Second) daWriter, err := NewSignAfterStoreDASWriter(ctx, config, storageService) diff --git a/das/db_storage_service.go b/das/db_storage_service.go index 5596ff378e..0fbe1c2723 100644 --- a/das/db_storage_service.go +++ b/das/db_storage_service.go @@ -20,11 +20,9 @@ import ( ) type LocalDBStorageConfig struct { - Enable bool `koanf:"enable"` - DataDir string `koanf:"data-dir"` - DiscardAfterTimeout bool `koanf:"discard-after-timeout"` - SyncFromStorageService bool `koanf:"sync-from-storage-service"` - SyncToStorageService bool `koanf:"sync-to-storage-service"` + Enable bool `koanf:"enable"` + DataDir string `koanf:"data-dir"` + DiscardAfterTimeout bool `koanf:"discard-after-timeout"` // BadgerDB options NumMemtables int `koanf:"num-memtables"` @@ -38,11 +36,9 @@ type LocalDBStorageConfig struct { var badgerDefaultOptions = badger.DefaultOptions("") var DefaultLocalDBStorageConfig = LocalDBStorageConfig{ - Enable: false, - DataDir: "", - DiscardAfterTimeout: false, - SyncFromStorageService: false, - SyncToStorageService: false, + Enable: false, + DataDir: "", + DiscardAfterTimeout: false, NumMemtables: badgerDefaultOptions.NumMemtables, NumLevelZeroTables: badgerDefaultOptions.NumLevelZeroTables, @@ -56,8 +52,6 @@ func LocalDBStorageConfigAddOptions(prefix string, f *flag.FlagSet) { f.Bool(prefix+".enable", DefaultLocalDBStorageConfig.Enable, "enable storage/retrieval of sequencer batch data from a database on the local filesystem") f.String(prefix+".data-dir", DefaultLocalDBStorageConfig.DataDir, "directory in which to store the database") f.Bool(prefix+".discard-after-timeout", DefaultLocalDBStorageConfig.DiscardAfterTimeout, "discard data after its expiry timeout") - f.Bool(prefix+".sync-from-storage-service", DefaultLocalDBStorageConfig.SyncFromStorageService, "enable db storage to be used as a source for regular sync storage") - f.Bool(prefix+".sync-to-storage-service", DefaultLocalDBStorageConfig.SyncToStorageService, "enable db storage to be used as a sink for regular sync storage") f.Int(prefix+".num-memtables", DefaultLocalDBStorageConfig.NumMemtables, "BadgerDB option: sets the maximum number of tables to keep in memory before stalling") f.Int(prefix+".num-level-zero-tables", DefaultLocalDBStorageConfig.NumLevelZeroTables, "BadgerDB option: sets the maximum number of Level 0 tables before compaction starts") @@ -158,13 +152,6 @@ func (dbs *DBStorageService) Put(ctx context.Context, data []byte, timeout uint6 }) } -func (dbs *DBStorageService) putKeyValue(ctx context.Context, key common.Hash, value []byte) error { - return dbs.db.Update(func(txn *badger.Txn) error { - e := badger.NewEntry(key.Bytes(), value) - return txn.SetEntry(e) - }) -} - func (dbs *DBStorageService) Sync(ctx context.Context) error { return dbs.db.Sync() } diff --git a/das/factory.go b/das/factory.go index 018129e673..d9eacd0ada 100644 --- a/das/factory.go +++ b/das/factory.go @@ -22,8 +22,6 @@ import ( func CreatePersistentStorageService( ctx context.Context, config *DataAvailabilityConfig, - syncFromStorageServices *[]*IterableStorageService, - syncToStorageServices *[]StorageService, ) (StorageService, *LifecycleManager, error) { storageServices := make([]StorageService, 0, 10) var lifecycleManager LifecycleManager @@ -32,14 +30,6 @@ func CreatePersistentStorageService( if err != nil { return nil, nil, err } - if config.LocalDBStorage.SyncFromStorageService { - iterableStorageService := NewIterableStorageService(ConvertStorageServiceToIterationCompatibleStorageService(s)) - *syncFromStorageServices = append(*syncFromStorageServices, iterableStorageService) - s = iterableStorageService - } - if config.LocalDBStorage.SyncToStorageService { - *syncToStorageServices = append(*syncToStorageServices, s) - } lifecycleManager.Register(s) storageServices = append(storageServices, s) } @@ -49,14 +39,6 @@ func CreatePersistentStorageService( if err != nil { return nil, nil, err } - if config.LocalFileStorage.SyncFromStorageService { - iterableStorageService := NewIterableStorageService(ConvertStorageServiceToIterationCompatibleStorageService(s)) - *syncFromStorageServices = append(*syncFromStorageServices, iterableStorageService) - s = iterableStorageService - } - if config.LocalFileStorage.SyncToStorageService { - *syncToStorageServices = append(*syncToStorageServices, s) - } lifecycleManager.Register(s) storageServices = append(storageServices, s) } @@ -67,23 +49,6 @@ func CreatePersistentStorageService( return nil, nil, err } lifecycleManager.Register(s) - if config.S3Storage.SyncFromStorageService { - iterableStorageService := NewIterableStorageService(ConvertStorageServiceToIterationCompatibleStorageService(s)) - *syncFromStorageServices = append(*syncFromStorageServices, iterableStorageService) - s = iterableStorageService - } - if config.S3Storage.SyncToStorageService { - *syncToStorageServices = append(*syncToStorageServices, s) - } - storageServices = append(storageServices, s) - } - - if config.IpfsStorage.Enable { - s, err := NewIpfsStorageService(ctx, config.IpfsStorage) - if err != nil { - return nil, nil, err - } - lifecycleManager.Register(s) storageServices = append(storageServices, s) } @@ -105,8 +70,6 @@ func WrapStorageWithCache( ctx context.Context, config *DataAvailabilityConfig, storageService StorageService, - syncFromStorageServices *[]*IterableStorageService, - syncToStorageServices *[]StorageService, lifecycleManager *LifecycleManager) (StorageService, error) { if storageService == nil { return nil, nil @@ -120,14 +83,6 @@ func WrapStorageWithCache( if err != nil { return nil, err } - if config.RedisCache.SyncFromStorageService { - iterableStorageService := NewIterableStorageService(ConvertStorageServiceToIterationCompatibleStorageService(storageService)) - *syncFromStorageServices = append(*syncFromStorageServices, iterableStorageService) - storageService = iterableStorageService - } - if config.RedisCache.SyncToStorageService { - *syncToStorageServices = append(*syncToStorageServices, storageService) - } } if config.LocalCache.Enable { storageService = NewCacheStorageService(config.LocalCache, storageService) @@ -151,10 +106,6 @@ func CreateBatchPosterDAS( if !config.RPCAggregator.Enable || !config.RestAggregator.Enable { return nil, nil, nil, errors.New("--node.data-availability.rpc-aggregator.enable and rest-aggregator.enable must be set when running a Batch Poster in AnyTrust mode") } - - if config.IpfsStorage.Enable { - return nil, nil, nil, errors.New("--node.data-availability.ipfs-storage.enable may not be set when running a Nitro AnyTrust node in Batch Poster mode") - } // Done checking config requirements var daWriter DataAvailabilityServiceWriter @@ -192,20 +143,17 @@ func CreateDAComponentsForDaserver( // Check config requirements if !config.LocalDBStorage.Enable && !config.LocalFileStorage.Enable && - !config.S3Storage.Enable && - !config.IpfsStorage.Enable { - return nil, nil, nil, nil, nil, errors.New("At least one of --data-availability.(local-db-storage|local-file-storage|s3-storage|ipfs-storage) must be enabled.") + !config.S3Storage.Enable { + return nil, nil, nil, nil, nil, errors.New("At least one of --data-availability.(local-db-storage|local-file-storage|s3-storage) must be enabled.") } // Done checking config requirements - var syncFromStorageServices []*IterableStorageService - var syncToStorageServices []StorageService - storageService, dasLifecycleManager, err := CreatePersistentStorageService(ctx, config, &syncFromStorageServices, &syncToStorageServices) + storageService, dasLifecycleManager, err := CreatePersistentStorageService(ctx, config) if err != nil { return nil, nil, nil, nil, nil, err } - storageService, err = WrapStorageWithCache(ctx, config, storageService, &syncFromStorageServices, &syncToStorageServices, dasLifecycleManager) + storageService, err = WrapStorageWithCache(ctx, config, storageService, dasLifecycleManager) if err != nil { return nil, nil, nil, nil, nil, err } @@ -285,11 +233,6 @@ func CreateDAComponentsForDaserver( } } - if config.RegularSyncStorage.Enable && len(syncFromStorageServices) != 0 && len(syncToStorageServices) != 0 { - regularlySyncStorage := NewRegularlySyncStorage(syncFromStorageServices, syncToStorageServices, config.RegularSyncStorage) - regularlySyncStorage.Start(ctx) - } - if seqInboxAddress != nil { daReader, err = NewChainFetchReader(daReader, (*l1Reader).Client(), *seqInboxAddress) if err != nil { @@ -315,48 +258,22 @@ func CreateDAReaderForNode( return nil, nil, errors.New("node.data-availability.rpc-aggregator is only for Batch Poster mode") } - if !config.RestAggregator.Enable && !config.IpfsStorage.Enable { - return nil, nil, fmt.Errorf("--node.data-availability.enable was set but neither of --node.data-availability.(rest-aggregator|ipfs-storage) were enabled. When running a Nitro Anytrust node in non-Batch Poster mode, some way to get the batch data is required.") - } - - if config.RestAggregator.SyncToStorage.Eager { - return nil, nil, errors.New("--node.data-availability.rest-aggregator.sync-to-storage.eager can't be used with a Nitro node, only lazy syncing can be used.") + if !config.RestAggregator.Enable { + return nil, nil, fmt.Errorf("--node.data-availability.enable was set but not --node.data-availability.rest-aggregator. When running a Nitro Anytrust node in non-Batch Poster mode, some way to get the batch data is required.") } // Done checking config requirements - storageService, dasLifecycleManager, err := CreatePersistentStorageService(ctx, config, nil, nil) - if err != nil { - return nil, nil, err - } - + var lifecycleManager LifecycleManager var daReader DataAvailabilityServiceReader if config.RestAggregator.Enable { var restAgg *SimpleDASReaderAggregator - restAgg, err = NewRestfulClientAggregator(ctx, &config.RestAggregator) + restAgg, err := NewRestfulClientAggregator(ctx, &config.RestAggregator) if err != nil { return nil, nil, err } restAgg.Start(ctx) - dasLifecycleManager.Register(restAgg) - - if storageService != nil { - syncConf := &config.RestAggregator.SyncToStorage - var retentionPeriodSeconds uint64 - if uint64(syncConf.RetentionPeriod) == math.MaxUint64 { - retentionPeriodSeconds = math.MaxUint64 - } else { - retentionPeriodSeconds = uint64(syncConf.RetentionPeriod.Seconds()) - } - - // This falls back to REST and updates the local IPFS repo if the data is found. - storageService = NewFallbackStorageService(storageService, restAgg, restAgg, - retentionPeriodSeconds, syncConf.IgnoreWriteErrors, true) - dasLifecycleManager.Register(storageService) - - daReader = storageService - } else { - daReader = restAgg - } + lifecycleManager.Register(restAgg) + daReader = restAgg } if seqInboxAddress != nil { @@ -370,5 +287,5 @@ func CreateDAReaderForNode( } } - return daReader, dasLifecycleManager, nil + return daReader, &lifecycleManager, nil } diff --git a/das/ipfs_storage_service.bkup_go b/das/ipfs_storage_service.bkup_go deleted file mode 100644 index 43b06fd4b6..0000000000 --- a/das/ipfs_storage_service.bkup_go +++ /dev/null @@ -1,256 +0,0 @@ -// Copyright 2022, Offchain Labs, Inc. -// For license information, see https://github.com/nitro/blob/master/LICENSE - -// IPFS DAS backend. -// It takes advantage of IPFS' content addressing scheme to be able to directly retrieve -// the batches from IPFS using their root hash from the L1 sequencer inbox contract. - -//go:build ipfs -// +build ipfs - -package das - -import ( - "bytes" - "context" - "errors" - "io" - "math/rand" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" - "github.com/ipfs/go-cid" - coreiface "github.com/ipfs/interface-go-ipfs-core" - "github.com/ipfs/interface-go-ipfs-core/options" - "github.com/ipfs/interface-go-ipfs-core/path" - "github.com/multiformats/go-multihash" - "github.com/offchainlabs/nitro/arbstate/daprovider" - "github.com/offchainlabs/nitro/arbutil" - "github.com/offchainlabs/nitro/cmd/ipfshelper" - "github.com/offchainlabs/nitro/das/dastree" - "github.com/offchainlabs/nitro/util/pretty" - flag "github.com/spf13/pflag" -) - -type IpfsStorageServiceConfig struct { - Enable bool `koanf:"enable"` - RepoDir string `koanf:"repo-dir"` - ReadTimeout time.Duration `koanf:"read-timeout"` - Profiles string `koanf:"profiles"` - Peers []string `koanf:"peers"` - - // Pinning options - PinAfterGet bool `koanf:"pin-after-get"` - PinPercentage float64 `koanf:"pin-percentage"` -} - -var DefaultIpfsStorageServiceConfig = IpfsStorageServiceConfig{ - Enable: false, - RepoDir: "", - ReadTimeout: time.Minute, - Profiles: "", - Peers: []string{}, - - PinAfterGet: true, - PinPercentage: 100.0, -} - -func IpfsStorageServiceConfigAddOptions(prefix string, f *flag.FlagSet) { - f.Bool(prefix+".enable", DefaultIpfsStorageServiceConfig.Enable, "enable storage/retrieval of sequencer batch data from IPFS") - f.String(prefix+".repo-dir", DefaultIpfsStorageServiceConfig.RepoDir, "directory to use to store the local IPFS repo") - f.Duration(prefix+".read-timeout", DefaultIpfsStorageServiceConfig.ReadTimeout, "timeout for IPFS reads, since by default it will wait forever. Treat timeout as not found") - f.String(prefix+".profiles", DefaultIpfsStorageServiceConfig.Profiles, "comma separated list of IPFS profiles to use, see https://docs.ipfs.tech/how-to/default-profile") - f.StringSlice(prefix+".peers", DefaultIpfsStorageServiceConfig.Peers, "list of IPFS peers to connect to, eg /ip4/1.2.3.4/tcp/12345/p2p/abc...xyz") - f.Bool(prefix+".pin-after-get", DefaultIpfsStorageServiceConfig.PinAfterGet, "pin sequencer batch data in IPFS") - f.Float64(prefix+".pin-percentage", DefaultIpfsStorageServiceConfig.PinPercentage, "percent of sequencer batch data to pin, as a floating point number in the range 0.0 to 100.0") -} - -type IpfsStorageService struct { - config IpfsStorageServiceConfig - ipfsHelper *ipfshelper.IpfsHelper - ipfsApi coreiface.CoreAPI -} - -func NewIpfsStorageService(ctx context.Context, config IpfsStorageServiceConfig) (*IpfsStorageService, error) { - ipfsHelper, err := ipfshelper.CreateIpfsHelper(ctx, config.RepoDir, false, config.Peers, config.Profiles) - if err != nil { - return nil, err - } - addrs, err := ipfsHelper.GetPeerHostAddresses() - if err != nil { - return nil, err - } - log.Info("IPFS node started up", "hostAddresses", addrs) - - return &IpfsStorageService{ - config: config, - ipfsHelper: ipfsHelper, - ipfsApi: ipfsHelper.GetAPI(), - }, nil -} - -func hashToCid(hash common.Hash) (cid.Cid, error) { - multiEncodedHashBytes, err := multihash.Encode(hash[:], multihash.KECCAK_256) - if err != nil { - return cid.Cid{}, err - } - - _, multiHash, err := multihash.MHFromBytes(multiEncodedHashBytes) - if err != nil { - return cid.Cid{}, err - } - - return cid.NewCidV1(cid.Raw, multiHash), nil -} - -// GetByHash retrieves and reconstructs one batch's data, using IPFS to retrieve the preimages -// for each chunk of data and the dastree nodes. -func (s *IpfsStorageService) GetByHash(ctx context.Context, hash common.Hash) ([]byte, error) { - log.Trace("das.IpfsStorageService.GetByHash", "hash", pretty.PrettyHash(hash)) - - doPin := false // If true, pin every block related to this batch - if s.config.PinAfterGet { - if s.config.PinPercentage == 100.0 { - doPin = true - } else if (rand.Float64() * 100.0) <= s.config.PinPercentage { - doPin = true - } - - } - - oracle := func(h common.Hash) ([]byte, error) { - thisCid, err := hashToCid(h) - if err != nil { - return nil, err - } - - ipfsPath := path.IpfsPath(thisCid) - log.Trace("Retrieving IPFS path", "path", ipfsPath.String()) - - parentCtx := ctx - if doPin { - // If we want to pin this batch, then detach from the parent context so - // we are not canceled before s.config.ReadTimeout. - parentCtx = context.Background() - } - - timeoutCtx, cancel := context.WithTimeout(parentCtx, s.config.ReadTimeout) - defer cancel() - rdr, err := s.ipfsApi.Block().Get(timeoutCtx, ipfsPath) - if err != nil { - if timeoutCtx.Err() != nil { - return nil, ErrNotFound - } - return nil, err - } - - data, err := io.ReadAll(rdr) - if err != nil { - return nil, err - } - - if doPin { - go func() { - pinCtx, pinCancel := context.WithTimeout(context.Background(), s.config.ReadTimeout) - defer pinCancel() - err := s.ipfsApi.Pin().Add(pinCtx, ipfsPath) - // Recursive pinning not needed, each dastree preimage fits in a single - // IPFS block. - if err != nil { - // Pinning is best-effort. - log.Warn("Failed to pin in IPFS", "hash", pretty.PrettyHash(hash), "path", ipfsPath.String()) - } else { - log.Trace("Pin in IPFS successful", "hash", pretty.PrettyHash(hash), "path", ipfsPath.String()) - } - }() - } - - return data, nil - } - - return dastree.Content(hash, oracle) -} - -// Put stores all the preimages required to reconstruct the dastree for single batch, -// ie the hashed data chunks and dastree nodes. -// This takes advantage of IPFS supporting keccak256 on raw data blocks for calculating -// its CIDs, and the fact that the dastree structure uses keccak256 for addressing its -// nodes, to directly store the dastree structure in IPFS. -// IPFS default block size is 256KB and dastree max block size is 64KB so each dastree -// node and data chunk easily fits within an IPFS block. -func (s *IpfsStorageService) Put(ctx context.Context, data []byte, timeout uint64) error { - logPut("das.IpfsStorageService.Put", data, timeout, s) - - var chunks [][]byte - - record := func(_ common.Hash, value []byte, ty arbutil.PreimageType) { - chunks = append(chunks, value) - } - - _ = dastree.RecordHash(record, data) - - numChunks := len(chunks) - resultChan := make(chan error, numChunks) - for _, chunk := range chunks { - _chunk := chunk - go func() { - blockStat, err := s.ipfsApi.Block().Put( - ctx, - bytes.NewReader(_chunk), - options.Block.CidCodec("raw"), // Store the data in raw form since the hash in the CID must be the hash - // of the preimage for our lookup scheme to work. - options.Block.Hash(multihash.KECCAK_256, -1), // Use keccak256 to calculate the hash to put in the block's - // CID, since it is the same algo used by dastree. - options.Block.Pin(true)) // Keep the data in the local IPFS repo, don't GC it. - if err == nil { - log.Trace("Wrote IPFS path", "path", blockStat.Path().String()) - } - resultChan <- err - }() - } - - successfullyWrittenChunks := 0 - for err := range resultChan { - if err != nil { - return err - } - successfullyWrittenChunks++ - if successfullyWrittenChunks == numChunks { - return nil - } - } - panic("unreachable") -} - -func (s *IpfsStorageService) ExpirationPolicy(ctx context.Context) (daprovider.ExpirationPolicy, error) { - return daprovider.KeepForever, nil -} - -func (s *IpfsStorageService) Sync(ctx context.Context) error { - return nil -} - -func (s *IpfsStorageService) Close(ctx context.Context) error { - return s.ipfsHelper.Close() -} - -func (s *IpfsStorageService) String() string { - return "IpfsStorageService" -} - -func (s *IpfsStorageService) HealthCheck(ctx context.Context) error { - testData := []byte("Test-Data") - err := s.Put(ctx, testData, 0) - if err != nil { - return err - } - res, err := s.GetByHash(ctx, dastree.Hash(testData)) - if err != nil { - return err - } - if !bytes.Equal(res, testData) { - return errors.New("invalid GetByHash result") - } - return nil -} diff --git a/das/ipfs_storage_service_stub.go b/das/ipfs_storage_service_stub.go deleted file mode 100644 index 5814f2c7e4..0000000000 --- a/das/ipfs_storage_service_stub.go +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright 2022, Offchain Labs, Inc. -// For license information, see https://github.com/nitro/blob/master/LICENSE - -// IPFS DAS backend stub -// a stub. we don't currently support ipfs - -//go:build !ipfs -// +build !ipfs - -package das - -import ( - "context" - "errors" - - "github.com/ethereum/go-ethereum/common" - "github.com/offchainlabs/nitro/arbstate/daprovider" - flag "github.com/spf13/pflag" -) - -var ErrIpfsNotSupported = errors.New("ipfs not supported") - -type IpfsStorageServiceConfig struct { - Enable bool -} - -var DefaultIpfsStorageServiceConfig = IpfsStorageServiceConfig{ - Enable: false, -} - -func IpfsStorageServiceConfigAddOptions(prefix string, f *flag.FlagSet) { - f.Bool(prefix+".enable", DefaultIpfsStorageServiceConfig.Enable, "legacy option - not supported") -} - -type IpfsStorageService struct { -} - -func NewIpfsStorageService(ctx context.Context, config IpfsStorageServiceConfig) (*IpfsStorageService, error) { - return nil, ErrIpfsNotSupported -} - -func (s *IpfsStorageService) GetByHash(ctx context.Context, hash common.Hash) ([]byte, error) { - return nil, ErrIpfsNotSupported -} - -func (s *IpfsStorageService) Put(ctx context.Context, data []byte, timeout uint64) error { - return ErrIpfsNotSupported -} - -func (s *IpfsStorageService) ExpirationPolicy(ctx context.Context) (daprovider.ExpirationPolicy, error) { - return daprovider.KeepForever, ErrIpfsNotSupported -} - -func (s *IpfsStorageService) Sync(ctx context.Context) error { - return ErrIpfsNotSupported -} - -func (s *IpfsStorageService) Close(ctx context.Context) error { - return ErrIpfsNotSupported -} - -func (s *IpfsStorageService) String() string { - return "IpfsStorageService-not supported" -} - -func (s *IpfsStorageService) HealthCheck(ctx context.Context) error { - return ErrIpfsNotSupported -} diff --git a/das/ipfs_storage_service_test.go b/das/ipfs_storage_service_test.go deleted file mode 100644 index 6e1a86b234..0000000000 --- a/das/ipfs_storage_service_test.go +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright 2022, Offchain Labs, Inc. -// For license information, see https://github.com/nitro/blob/master/LICENSE - -//go:build ipfs -// +build ipfs - -package das - -import ( - "bytes" - "context" - "math" - "math/rand" - "testing" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/offchainlabs/nitro/das/dastree" -) - -func runAddAndGetTest(t *testing.T, ctx context.Context, svc *IpfsStorageService, size int) { - - data := make([]byte, size) - _, err := rand.Read(data) - Require(t, err) - - err = svc.Put(ctx, data, 0) - Require(t, err) - - hash := dastree.Hash(data).Bytes() - returnedData, err := svc.GetByHash(ctx, common.BytesToHash(hash)) - Require(t, err) - if !bytes.Equal(data, returnedData) { - Fail(t, "Returned data didn't match!") - } - -} - -func TestIpfsStorageServiceAddAndGet(t *testing.T) { - enableLogging() - ctx := context.Background() - svc, err := NewIpfsStorageService(ctx, - IpfsStorageServiceConfig{ - Enable: true, - RepoDir: t.TempDir(), - ReadTimeout: time.Minute, - Profiles: "test", - }) - defer svc.Close(ctx) - Require(t, err) - - pow2Size := 1 << 16 // 64kB - for i := 1; i < 8; i++ { - runAddAndGetTest(t, ctx, svc, int(math.Pow10(i))) - runAddAndGetTest(t, ctx, svc, pow2Size) - runAddAndGetTest(t, ctx, svc, pow2Size-1) - runAddAndGetTest(t, ctx, svc, pow2Size+1) - pow2Size = pow2Size << 1 - } -} diff --git a/das/iterable_storage_service.go b/das/iterable_storage_service.go deleted file mode 100644 index a0829f00e4..0000000000 --- a/das/iterable_storage_service.go +++ /dev/null @@ -1,147 +0,0 @@ -// Copyright 2022, Offchain Labs, Inc. -// For license information, see https://github.com/nitro/blob/master/LICENSE - -package das - -import ( - "context" - "strconv" - "sync" - "sync/atomic" - - "github.com/ethereum/go-ethereum/common" - - "github.com/offchainlabs/nitro/das/dastree" -) - -const iteratorStorageKeyPrefix = "iterator_key_prefix_" -const iteratorBegin = "iterator_begin" -const iteratorEnd = "iterator_end" -const expirationTimeKeyPrefix = "expiration_time_key_prefix_" - -// IterationCompatibleStorageService is a StorageService which is -// compatible to be used as a backend for IterableStorageService. -type IterationCompatibleStorageService interface { - putKeyValue(ctx context.Context, key common.Hash, value []byte) error - StorageService -} - -// IterationCompatibleStorageServiceAdaptor is an adaptor used to covert iteration incompatible StorageService -// to IterationCompatibleStorageService (basically adds an empty putKeyValue to the StorageService) -type IterationCompatibleStorageServiceAdaptor struct { - StorageService -} - -func (i *IterationCompatibleStorageServiceAdaptor) putKeyValue(ctx context.Context, key common.Hash, value []byte) error { - return nil -} - -func ConvertStorageServiceToIterationCompatibleStorageService(storageService StorageService) IterationCompatibleStorageService { - service, ok := storageService.(IterationCompatibleStorageService) - if ok { - return service - } - return &IterationCompatibleStorageServiceAdaptor{storageService} -} - -// An IterableStorageService is used as a wrapper on top of a storage service, -// to add the capability of iterating over the stored date in a sequential manner. -type IterableStorageService struct { - // Local copy of iterator end. End can also be accessed by getByHash for iteratorEnd. - end atomic.Value // atomic access to common.Hash - IterationCompatibleStorageService - - mutex sync.Mutex -} - -func NewIterableStorageService(storageService IterationCompatibleStorageService) *IterableStorageService { - i := &IterableStorageService{IterationCompatibleStorageService: storageService} - i.end.Store(common.Hash{}) - return i -} - -func (i *IterableStorageService) Put(ctx context.Context, data []byte, expiration uint64) error { - dataHash := dastree.Hash(data) - - // Do not insert data if data is already present. - // (This is being done to avoid redundant hash being added to the - // linked list ,since it can lead to loops in the linked list.) - if _, err := i.IterationCompatibleStorageService.GetByHash(ctx, dataHash); err == nil { - return nil - } - - if err := i.IterationCompatibleStorageService.Put(ctx, data, expiration); err != nil { - return err - } - - if err := i.putKeyValue(ctx, dastree.Hash([]byte(expirationTimeKeyPrefix+EncodeStorageServiceKey(dastree.Hash(data)))), []byte(strconv.FormatUint(expiration, 10))); err != nil { - return err - } - - i.mutex.Lock() - defer i.mutex.Unlock() - - endHash := i.End(ctx) - if (endHash == common.Hash{}) { - // First element being inserted in the chain. - if err := i.putKeyValue(ctx, dastree.Hash([]byte(iteratorBegin)), dataHash.Bytes()); err != nil { - return err - } - } else { - if err := i.putKeyValue(ctx, dastree.Hash([]byte(iteratorStorageKeyPrefix+EncodeStorageServiceKey(endHash))), dataHash.Bytes()); err != nil { - return err - } - } - - if err := i.putKeyValue(ctx, dastree.Hash([]byte(iteratorEnd)), dataHash.Bytes()); err != nil { - return err - } - i.end.Store(dataHash) - - return nil -} - -func (i *IterableStorageService) GetExpirationTime(ctx context.Context, hash common.Hash) (uint64, error) { - value, err := i.IterationCompatibleStorageService.GetByHash(ctx, dastree.Hash([]byte(expirationTimeKeyPrefix+EncodeStorageServiceKey(hash)))) - if err != nil { - return 0, err - } - - expirationTime, err := strconv.ParseUint(string(value), 10, 64) - if err != nil { - return 0, err - } - return expirationTime, nil -} - -func (i *IterableStorageService) DefaultBegin() common.Hash { - return dastree.Hash([]byte(iteratorBegin)) -} - -func (i *IterableStorageService) End(ctx context.Context) common.Hash { - endHash, ok := i.end.Load().(common.Hash) - if !ok { - return common.Hash{} - } - if (endHash != common.Hash{}) { - return endHash - } - value, err := i.GetByHash(ctx, dastree.Hash([]byte(iteratorEnd))) - if err != nil { - return common.Hash{} - } - endHash = common.BytesToHash(value) - i.end.Store(endHash) - return endHash -} - -func (i *IterableStorageService) Next(ctx context.Context, hash common.Hash) common.Hash { - if hash != i.DefaultBegin() { - hash = dastree.Hash([]byte(iteratorStorageKeyPrefix + EncodeStorageServiceKey(hash))) - } - value, err := i.GetByHash(ctx, hash) - if err != nil { - return common.Hash{} - } - return common.BytesToHash(value) -} diff --git a/das/local_file_storage_service.go b/das/local_file_storage_service.go index 4ebb1d56d9..8be03bcb30 100644 --- a/das/local_file_storage_service.go +++ b/das/local_file_storage_service.go @@ -22,10 +22,8 @@ import ( ) type LocalFileStorageConfig struct { - Enable bool `koanf:"enable"` - DataDir string `koanf:"data-dir"` - SyncFromStorageService bool `koanf:"sync-from-storage-service"` - SyncToStorageService bool `koanf:"sync-to-storage-service"` + Enable bool `koanf:"enable"` + DataDir string `koanf:"data-dir"` } var DefaultLocalFileStorageConfig = LocalFileStorageConfig{ @@ -35,8 +33,6 @@ var DefaultLocalFileStorageConfig = LocalFileStorageConfig{ func LocalFileStorageConfigAddOptions(prefix string, f *flag.FlagSet) { f.Bool(prefix+".enable", DefaultLocalFileStorageConfig.Enable, "enable storage/retrieval of sequencer batch data from a directory of files, one per batch") f.String(prefix+".data-dir", DefaultLocalFileStorageConfig.DataDir, "local data directory") - f.Bool(prefix+".sync-from-storage-service", DefaultLocalFileStorageConfig.SyncFromStorageService, "enable local storage to be used as a source for regular sync storage") - f.Bool(prefix+".sync-to-storage-service", DefaultLocalFileStorageConfig.SyncToStorageService, "enable local storage to be used as a sink for regular sync storage") } type LocalFileStorageService struct { @@ -96,32 +92,6 @@ func (s *LocalFileStorageService) Put(ctx context.Context, data []byte, timeout } -func (s *LocalFileStorageService) putKeyValue(ctx context.Context, key common.Hash, value []byte) error { - fileName := EncodeStorageServiceKey(key) - finalPath := s.dataDir + "/" + fileName - - // Use a temp file and rename to achieve atomic writes. - f, err := os.CreateTemp(s.dataDir, fileName) - if err != nil { - return err - } - err = f.Chmod(0o600) - if err != nil { - return err - } - _, err = f.Write(value) - if err != nil { - return err - } - err = f.Close() - if err != nil { - return err - } - - return os.Rename(f.Name(), finalPath) - -} - func (s *LocalFileStorageService) Sync(ctx context.Context) error { return nil } diff --git a/das/memory_backed_storage_service.go b/das/memory_backed_storage_service.go index 91f7d9a2f5..c013b501b9 100644 --- a/das/memory_backed_storage_service.go +++ b/das/memory_backed_storage_service.go @@ -53,16 +53,6 @@ func (m *MemoryBackedStorageService) Put(ctx context.Context, data []byte, expir return nil } -func (m *MemoryBackedStorageService) putKeyValue(ctx context.Context, key common.Hash, value []byte) error { - m.rwmutex.Lock() - defer m.rwmutex.Unlock() - if m.closed { - return ErrClosed - } - m.contents[key] = append([]byte{}, value...) - return nil -} - func (m *MemoryBackedStorageService) Sync(ctx context.Context) error { m.rwmutex.RLock() defer m.rwmutex.RUnlock() diff --git a/das/redis_storage_service.go b/das/redis_storage_service.go index dbd85921ed..210d5cb2d4 100644 --- a/das/redis_storage_service.go +++ b/das/redis_storage_service.go @@ -24,12 +24,10 @@ import ( ) type RedisConfig struct { - Enable bool `koanf:"enable"` - Url string `koanf:"url"` - Expiration time.Duration `koanf:"expiration"` - KeyConfig string `koanf:"key-config"` - SyncFromStorageService bool `koanf:"sync-from-storage-service"` - SyncToStorageService bool `koanf:"sync-to-storage-service"` + Enable bool `koanf:"enable"` + Url string `koanf:"url"` + Expiration time.Duration `koanf:"expiration"` + KeyConfig string `koanf:"key-config"` } var DefaultRedisConfig = RedisConfig{ @@ -43,8 +41,6 @@ func RedisConfigAddOptions(prefix string, f *flag.FlagSet) { f.String(prefix+".url", DefaultRedisConfig.Url, "Redis url") f.Duration(prefix+".expiration", DefaultRedisConfig.Expiration, "Redis expiration") f.String(prefix+".key-config", DefaultRedisConfig.KeyConfig, "Redis key config") - f.Bool(prefix+".sync-from-storage-service", DefaultRedisConfig.SyncFromStorageService, "enable Redis to be used as a source for regular sync storage") - f.Bool(prefix+".sync-to-storage-service", DefaultRedisConfig.SyncToStorageService, "enable Redis to be used as a sink for regular sync storage") } type RedisStorageService struct { @@ -139,17 +135,6 @@ func (rs *RedisStorageService) Put(ctx context.Context, value []byte, timeout ui return err } -func (rs *RedisStorageService) putKeyValue(ctx context.Context, key common.Hash, value []byte) error { - // Expiration is set to zero here, since we want to keep the index inserted for iterable storage forever. - err := rs.client.Set( - ctx, string(key.Bytes()), rs.signMessage(value), 0, - ).Err() - if err != nil { - log.Error("das.RedisStorageService.putKeyValue", "err", err) - } - return err -} - func (rs *RedisStorageService) Sync(ctx context.Context) error { return rs.baseStorageService.Sync(ctx) } diff --git a/das/regular_sync_storage_test.go b/das/regular_sync_storage_test.go deleted file mode 100644 index 5fed7a90b3..0000000000 --- a/das/regular_sync_storage_test.go +++ /dev/null @@ -1,79 +0,0 @@ -// Copyright 2021-2022, Offchain Labs, Inc. -// For license information, see https://github.com/nitro/blob/master/LICENSE - -package das - -import ( - "bytes" - "context" - "testing" - "time" - - "github.com/ethereum/go-ethereum/common" - - "github.com/offchainlabs/nitro/das/dastree" -) - -func TestRegularSyncStorage(t *testing.T) { - ctx, cancelFunc := context.WithCancel(context.Background()) - defer cancelFunc() - syncFromStorageService := []*IterableStorageService{ - NewIterableStorageService(ConvertStorageServiceToIterationCompatibleStorageService(NewMemoryBackedStorageService(ctx))), - NewIterableStorageService(ConvertStorageServiceToIterationCompatibleStorageService(NewMemoryBackedStorageService(ctx))), - } - syncToStorageService := []StorageService{ - NewMemoryBackedStorageService(ctx), - NewMemoryBackedStorageService(ctx), - } - - regularSyncStorage := NewRegularlySyncStorage( - syncFromStorageService, - syncToStorageService, RegularSyncStorageConfig{ - Enable: true, - SyncInterval: 100 * time.Millisecond, - }) - - val := [][]byte{ - []byte("The first value"), - []byte("The second value"), - []byte("The third value"), - []byte("The forth value"), - } - valKey := []common.Hash{ - dastree.Hash(val[0]), - dastree.Hash(val[1]), - dastree.Hash(val[2]), - dastree.Hash(val[3]), - } - - reqCtx := context.Background() - timeout := uint64(time.Now().Add(time.Hour).Unix()) - for i := 0; i < 2; i++ { - for j := 0; j < 2; j++ { - err := syncFromStorageService[i].Put(reqCtx, val[j], timeout) - Require(t, err) - } - } - - regularSyncStorage.Start(ctx) - time.Sleep(300 * time.Millisecond) - - for i := 0; i < 2; i++ { - for j := 2; j < 4; j++ { - err := syncFromStorageService[i].Put(reqCtx, val[j], timeout) - Require(t, err) - } - } - - time.Sleep(300 * time.Millisecond) - - for i := 0; i < 2; i++ { - for j := 0; j < 4; j++ { - v, err := syncToStorageService[i].GetByHash(reqCtx, valKey[j]) - Require(t, err) - if !bytes.Equal(v, val[j]) { - t.Fatal(v, val[j]) - } - } - } -} diff --git a/das/regularly_sync_storage.go b/das/regularly_sync_storage.go deleted file mode 100644 index c6b8ed5ea1..0000000000 --- a/das/regularly_sync_storage.go +++ /dev/null @@ -1,95 +0,0 @@ -// Copyright 2022, Offchain Labs, Inc. -// For license information, see https://github.com/nitro/blob/master/LICENSE - -package das - -import ( - "context" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" - - "github.com/offchainlabs/nitro/util/stopwaiter" - - flag "github.com/spf13/pflag" -) - -type RegularSyncStorageConfig struct { - Enable bool `koanf:"enable"` - SyncInterval time.Duration `koanf:"sync-interval"` -} - -var DefaultRegularSyncStorageConfig = RegularSyncStorageConfig{ - Enable: false, - SyncInterval: 5 * time.Minute, -} - -func RegularSyncStorageConfigAddOptions(prefix string, f *flag.FlagSet) { - f.Bool(prefix+".enable", DefaultRegularSyncStorageConfig.Enable, "enable regular storage syncing") - f.Duration(prefix+".sync-interval", DefaultRegularSyncStorageConfig.SyncInterval, "interval for running regular storage sync") -} - -// A RegularlySyncStorage is used to sync data from syncFromStorageServices to -// all the syncToStorageServices at regular intervals. -// (Only newly added data since the last sync is copied over.) -type RegularlySyncStorage struct { - stopwaiter.StopWaiter - syncFromStorageServices []*IterableStorageService - syncToStorageServices []StorageService - lastSyncedHashOfEachSyncFromStorageService map[*IterableStorageService]common.Hash - syncInterval time.Duration -} - -func NewRegularlySyncStorage(syncFromStorageServices []*IterableStorageService, syncToStorageServices []StorageService, conf RegularSyncStorageConfig) *RegularlySyncStorage { - lastSyncedHashOfEachSyncFromStorageService := make(map[*IterableStorageService]common.Hash) - for _, syncFrom := range syncFromStorageServices { - lastSyncedHashOfEachSyncFromStorageService[syncFrom] = syncFrom.DefaultBegin() - } - return &RegularlySyncStorage{ - syncFromStorageServices: syncFromStorageServices, - syncToStorageServices: syncToStorageServices, - lastSyncedHashOfEachSyncFromStorageService: lastSyncedHashOfEachSyncFromStorageService, - syncInterval: conf.SyncInterval, - } -} - -func (r *RegularlySyncStorage) Start(ctx context.Context) { - // Start thread for regular sync - r.StopWaiter.Start(ctx, r) - r.CallIteratively(r.syncAllStorages) -} - -func (r *RegularlySyncStorage) syncAllStorages(ctx context.Context) time.Duration { - for syncFrom, lastSyncedHash := range r.lastSyncedHashOfEachSyncFromStorageService { - end := syncFrom.End(ctx) - if (end == common.Hash{}) { - continue - } - - syncHash := lastSyncedHash - for syncHash != end { - syncHash = syncFrom.Next(ctx, syncHash) - data, err := syncFrom.GetByHash(ctx, syncHash) - if err != nil { - continue - } - expirationTime, err := syncFrom.GetExpirationTime(ctx, syncHash) - if err != nil { - continue - } - for _, syncTo := range r.syncToStorageServices { - _, err = syncTo.GetByHash(ctx, syncHash) - if err == nil { - continue - } - - if err = syncTo.Put(ctx, data, expirationTime); err != nil { - log.Error("Error while running regular storage sync", "err", err) - } - } - } - r.lastSyncedHashOfEachSyncFromStorageService[syncFrom] = end - } - return r.syncInterval -} diff --git a/das/rpc_test.go b/das/rpc_test.go index 2839edee69..5f97ef8828 100644 --- a/das/rpc_test.go +++ b/das/rpc_test.go @@ -52,9 +52,7 @@ func testRpcImpl(t *testing.T, size, times int, concurrent bool) { RequestTimeout: 5 * time.Second, } - var syncFromStorageServices []*IterableStorageService - var syncToStorageServices []StorageService - storageService, lifecycleManager, err := CreatePersistentStorageService(ctx, &config, &syncFromStorageServices, &syncToStorageServices) + storageService, lifecycleManager, err := CreatePersistentStorageService(ctx, &config) testhelpers.RequireImpl(t, err) defer lifecycleManager.StopAndWaitUntil(time.Second) localDas, err := NewSignAfterStoreDASWriter(ctx, config, storageService) diff --git a/das/s3_storage_service.go b/das/s3_storage_service.go index b5150fb8ed..a1de200c52 100644 --- a/das/s3_storage_service.go +++ b/das/s3_storage_service.go @@ -34,15 +34,13 @@ type S3Downloader interface { } type S3StorageServiceConfig struct { - Enable bool `koanf:"enable"` - AccessKey string `koanf:"access-key"` - Bucket string `koanf:"bucket"` - ObjectPrefix string `koanf:"object-prefix"` - Region string `koanf:"region"` - SecretKey string `koanf:"secret-key"` - DiscardAfterTimeout bool `koanf:"discard-after-timeout"` - SyncFromStorageService bool `koanf:"sync-from-storage-service"` - SyncToStorageService bool `koanf:"sync-to-storage-service"` + Enable bool `koanf:"enable"` + AccessKey string `koanf:"access-key"` + Bucket string `koanf:"bucket"` + ObjectPrefix string `koanf:"object-prefix"` + Region string `koanf:"region"` + SecretKey string `koanf:"secret-key"` + DiscardAfterTimeout bool `koanf:"discard-after-timeout"` } var DefaultS3StorageServiceConfig = S3StorageServiceConfig{} @@ -55,8 +53,6 @@ func S3ConfigAddOptions(prefix string, f *flag.FlagSet) { f.String(prefix+".region", DefaultS3StorageServiceConfig.Region, "S3 region") f.String(prefix+".secret-key", DefaultS3StorageServiceConfig.SecretKey, "S3 secret key") f.Bool(prefix+".discard-after-timeout", DefaultS3StorageServiceConfig.DiscardAfterTimeout, "discard data after its expiry timeout") - f.Bool(prefix+".sync-from-storage-service", DefaultRedisConfig.SyncFromStorageService, "enable s3 to be used as a source for regular sync storage") - f.Bool(prefix+".sync-to-storage-service", DefaultRedisConfig.SyncToStorageService, "enable s3 to be used as a sink for regular sync storage") } type S3StorageService struct { @@ -125,18 +121,6 @@ func (s3s *S3StorageService) Put(ctx context.Context, value []byte, timeout uint return err } -func (s3s *S3StorageService) putKeyValue(ctx context.Context, key common.Hash, value []byte) error { - putObjectInput := s3.PutObjectInput{ - Bucket: aws.String(s3s.bucket), - Key: aws.String(s3s.objectPrefix + EncodeStorageServiceKey(key)), - Body: bytes.NewReader(value)} - _, err := s3s.uploader.Upload(ctx, &putObjectInput) - if err != nil { - log.Error("das.S3StorageService.Store", "err", err) - } - return err -} - func (s3s *S3StorageService) Sync(ctx context.Context) error { return nil } diff --git a/system_tests/das_test.go b/system_tests/das_test.go index dbd78a0865..593eaa1bbe 100644 --- a/system_tests/das_test.go +++ b/system_tests/das_test.go @@ -61,9 +61,7 @@ func startLocalDASServer( RequestTimeout: 5 * time.Second, } - var syncFromStorageServices []*das.IterableStorageService - var syncToStorageServices []das.StorageService - storageService, lifecycleManager, err := das.CreatePersistentStorageService(ctx, &config, &syncFromStorageServices, &syncToStorageServices) + storageService, lifecycleManager, err := das.CreatePersistentStorageService(ctx, &config) defer lifecycleManager.StopAndWaitUntil(time.Second) Require(t, err)