Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[config change] Deprecate local-db-storage, add migration opt, fixes: NIT-2539 #2409

Merged
merged 6 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions das/das.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type DataAvailabilityConfig struct {
LocalFileStorage LocalFileStorageConfig `koanf:"local-file-storage"`
S3Storage S3StorageServiceConfig `koanf:"s3-storage"`

MigrateLocalDBToFileStorage bool `koanf:"migrate-local-db-to-file-storage"`

Key KeyConfig `koanf:"key"`

RPCAggregator AggregatorConfig `koanf:"rpc-aggregator"`
Expand Down Expand Up @@ -112,6 +114,7 @@ func dataAvailabilityConfigAddOptions(prefix string, f *flag.FlagSet, r role) {
LocalDBStorageConfigAddOptions(prefix+".local-db-storage", f)
LocalFileStorageConfigAddOptions(prefix+".local-file-storage", f)
S3ConfigAddOptions(prefix+".s3-storage", f)
f.Bool(prefix+".migrate-local-db-to-file-storage", DefaultDataAvailabilityConfig.MigrateLocalDBToFileStorage, "daserver will migrate all data on startup from local-db-storage to local-file-storage, then mark local-db-storage as unusable")

// Key config for storage
KeyConfigAddOptions(prefix+".key", f)
Expand Down
96 changes: 94 additions & 2 deletions das/db_storage_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"bytes"
"context"
"errors"
"fmt"
"os"
"path/filepath"
"time"

badger "github.com/dgraph-io/badger/v4"
Expand Down Expand Up @@ -35,6 +38,8 @@ type LocalDBStorageConfig struct {

var badgerDefaultOptions = badger.DefaultOptions("")

const migratedMarker = "MIGRATED"

var DefaultLocalDBStorageConfig = LocalDBStorageConfig{
Enable: false,
DataDir: "",
Expand All @@ -49,7 +54,7 @@ var DefaultLocalDBStorageConfig = LocalDBStorageConfig{
}

func LocalDBStorageConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Bool(prefix+".enable", DefaultLocalDBStorageConfig.Enable, "enable storage/retrieval of sequencer batch data from a database on the local filesystem")
f.Bool(prefix+".enable", DefaultLocalDBStorageConfig.Enable, "!!!DEPRECATED, USE local-file-storage!!! enable storage/retrieval of sequencer batch data from a database on the local filesystem")
f.String(prefix+".data-dir", DefaultLocalDBStorageConfig.DataDir, "directory in which to store the database")
f.Bool(prefix+".discard-after-timeout", DefaultLocalDBStorageConfig.DiscardAfterTimeout, "discard data after its expiry timeout")

Expand All @@ -69,7 +74,17 @@ type DBStorageService struct {
stopWaiter stopwaiter.StopWaiterSafe
}

func NewDBStorageService(ctx context.Context, config *LocalDBStorageConfig) (StorageService, error) {
// The DBStorageService is deprecated. This function will migrate data to the target
// LocalFileStorageService if it is provided and migration hasn't already happened.
func NewDBStorageService(ctx context.Context, config *LocalDBStorageConfig, target *LocalFileStorageService) (*DBStorageService, error) {
if alreadyMigrated(config.DataDir) {
log.Warn("local-db-storage already migrated, please remove it from the daserver configuration and restart. data-dir can be cleaned up manually now")
return nil, nil
}
if target == nil {
log.Error("local-db-storage is DEPRECATED, please use use the local-file-storage and migrate-local-db-to-file-storage options. This error will be made fatal in future, continuing for now...")
}

options := badger.DefaultOptions(config.DataDir).
WithNumMemtables(config.NumMemtables).
WithNumLevelZeroTables(config.NumLevelZeroTables).
Expand All @@ -87,9 +102,21 @@ func NewDBStorageService(ctx context.Context, config *LocalDBStorageConfig) (Sto
discardAfterTimeout: config.DiscardAfterTimeout,
dirPath: config.DataDir,
}

if target != nil {
if err = ret.migrateTo(ctx, target); err != nil {
return nil, fmt.Errorf("error migrating local-db-storage to %s: %w", target, err)
}
if err = ret.setMigrated(); err != nil {
return nil, fmt.Errorf("error finalizing migration of local-db-storage to %s: %w", target, err)
}
return nil, nil
}

if err := ret.stopWaiter.Start(ctx, ret); err != nil {
return nil, err
}

err = ret.stopWaiter.LaunchThreadSafe(func(myCtx context.Context) {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
Expand Down Expand Up @@ -152,6 +179,48 @@ func (dbs *DBStorageService) Put(ctx context.Context, data []byte, timeout uint6
})
}

func (dbs *DBStorageService) migrateTo(ctx context.Context, s StorageService) error {
originExpirationPolicy, err := dbs.ExpirationPolicy(ctx)
if err != nil {
return err
}
targetExpirationPolicy, err := s.ExpirationPolicy(ctx)
if err != nil {
return err
}

if originExpirationPolicy == daprovider.KeepForever && targetExpirationPolicy == daprovider.DiscardAfterDataTimeout {
return errors.New("can't migrate from DBStorageService to target, incompatible expiration policies - can't migrate from non-expiring to expiring since non-expiring DB lacks expiry time metadata")
}

return dbs.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
it := txn.NewIterator(opts)
defer it.Close()
log.Info("Migrating from DBStorageService", "target", s)
migrationStart := time.Now()
count := 0
for it.Rewind(); it.Valid(); it.Next() {
if count%1000 == 0 {
log.Info("Migration in progress", "migrated", count)
}
item := it.Item()
k := item.Key()
expiry := item.ExpiresAt()
err := item.Value(func(v []byte) error {
log.Trace("migrated", "key", pretty.FirstFewBytes(k), "value", pretty.FirstFewBytes(v), "expiry", expiry)
return s.Put(ctx, v, expiry)
})
if err != nil {
return err
}
count++
}
log.Info("Migration from DBStorageService complete", "target", s, "migrated", count, "duration", time.Since(migrationStart))
return nil
})
}

func (dbs *DBStorageService) Sync(ctx context.Context) error {
return dbs.db.Sync()
}
Expand All @@ -160,6 +229,29 @@ func (dbs *DBStorageService) Close(ctx context.Context) error {
return dbs.stopWaiter.StopAndWait()
}

func alreadyMigrated(dirPath string) bool {
migratedMarkerFile := filepath.Join(dirPath, migratedMarker)
_, err := os.Stat(migratedMarkerFile)
if os.IsNotExist(err) {
return false
}
if err != nil {
log.Error("error checking if local-db-storage is already migrated", "err", err)
return false
}
return true
}

func (dbs *DBStorageService) setMigrated() error {
migratedMarkerFile := filepath.Join(dbs.dirPath, migratedMarker)
file, err := os.OpenFile(migratedMarkerFile, os.O_CREATE|os.O_WRONLY, 0o600)
if err != nil {
return err
}
file.Close()
return nil
}

func (dbs *DBStorageService) ExpirationPolicy(ctx context.Context) (daprovider.ExpirationPolicy, error) {
if dbs.discardAfterTimeout {
return daprovider.DiscardAfterDataTimeout, nil
Expand Down
36 changes: 25 additions & 11 deletions das/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,36 @@ func CreatePersistentStorageService(
) (StorageService, *LifecycleManager, error) {
storageServices := make([]StorageService, 0, 10)
var lifecycleManager LifecycleManager
if config.LocalDBStorage.Enable {
s, err := NewDBStorageService(ctx, &config.LocalDBStorage)
var err error

var fs *LocalFileStorageService
if config.LocalFileStorage.Enable {
fs, err = NewLocalFileStorageService(config.LocalFileStorage)
if err != nil {
return nil, nil, err
}
lifecycleManager.Register(s)
storageServices = append(storageServices, s)
}

if config.LocalFileStorage.Enable {
s, err := NewLocalFileStorageService(config.LocalFileStorage)
err = fs.start(ctx)
if err != nil {
return nil, nil, err
}
err = s.start(ctx)
lifecycleManager.Register(fs)
storageServices = append(storageServices, fs)
}

if config.LocalDBStorage.Enable {
var s *DBStorageService
if config.MigrateLocalDBToFileStorage {
s, err = NewDBStorageService(ctx, &config.LocalDBStorage, fs)
} else {
s, err = NewDBStorageService(ctx, &config.LocalDBStorage, nil)
}
if err != nil {
return nil, nil, err
}
lifecycleManager.Register(s)
storageServices = append(storageServices, s)
if s != nil {
lifecycleManager.Register(s)
storageServices = append(storageServices, s)
}
}

if config.S3Storage.Enable {
Expand All @@ -67,6 +77,10 @@ func CreatePersistentStorageService(
if len(storageServices) == 1 {
return storageServices[0], &lifecycleManager, nil
}
if len(storageServices) == 0 {
return nil, nil, errors.New("No data-availability storage backend has been configured")
}

return nil, &lifecycleManager, nil
}

Expand Down
Loading