From 0e444b077e50c2d0a4631dbbd18a9057458856e2 Mon Sep 17 00:00:00 2001 From: Nodar Date: Tue, 19 Sep 2023 14:56:10 +0200 Subject: [PATCH 1/2] Add flag to dataposter and clear leveldb when that flag is set --- arbnode/batch_poster.go | 4 +-- arbnode/dataposter/data_poster.go | 50 +++++++++++++++++++-------- arbnode/dataposter/leveldb/leveldb.go | 12 +++++++ arbnode/dataposter/storage_test.go | 27 +++++++++++++++ arbnode/node.go | 5 +-- system_tests/batch_poster_test.go | 2 +- system_tests/staker_test.go | 6 ++-- 7 files changed, 83 insertions(+), 23 deletions(-) diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 89a36eba91..4f50831e67 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -210,7 +210,7 @@ var TestBatchPosterConfig = BatchPosterConfig{ L1BlockBoundBypass: time.Hour, } -func NewBatchPoster(dataPosterDB ethdb.Database, l1Reader *headerreader.HeaderReader, inbox *InboxTracker, streamer *TransactionStreamer, syncMonitor *SyncMonitor, config BatchPosterConfigFetcher, deployInfo *chaininfo.RollupAddresses, transactOpts *bind.TransactOpts, daWriter das.DataAvailabilityServiceWriter) (*BatchPoster, error) { +func NewBatchPoster(ctx context.Context, dataPosterDB ethdb.Database, l1Reader *headerreader.HeaderReader, inbox *InboxTracker, streamer *TransactionStreamer, syncMonitor *SyncMonitor, config BatchPosterConfigFetcher, deployInfo *chaininfo.RollupAddresses, transactOpts *bind.TransactOpts, daWriter das.DataAvailabilityServiceWriter) (*BatchPoster, error) { seqInbox, err := bridgegen.NewSequencerInbox(deployInfo.SequencerInbox, l1Reader.Client()) if err != nil { return nil, err @@ -253,7 +253,7 @@ func NewBatchPoster(dataPosterDB ethdb.Database, l1Reader *headerreader.HeaderRe dataPosterConfigFetcher := func() *dataposter.DataPosterConfig { return &config().DataPoster } - b.dataPoster, err = dataposter.NewDataPoster(dataPosterDB, l1Reader, transactOpts, redisClient, redisLock, dataPosterConfigFetcher, b.getBatchPosterPosition) + b.dataPoster, err = dataposter.NewDataPoster(ctx, dataPosterDB, l1Reader, transactOpts, redisClient, redisLock, dataPosterConfigFetcher, b.getBatchPosterPosition) if err != nil { return nil, err } diff --git a/arbnode/dataposter/data_poster.go b/arbnode/dataposter/data_poster.go index b1e6555b26..546cedfa3f 100644 --- a/arbnode/dataposter/data_poster.go +++ b/arbnode/dataposter/data_poster.go @@ -91,7 +91,7 @@ func parseReplacementTimes(val string) ([]time.Duration, error) { return append(res, time.Hour*24*365*10), nil } -func NewDataPoster(db ethdb.Database, headerReader *headerreader.HeaderReader, auth *bind.TransactOpts, redisClient redis.UniversalClient, redisLock AttemptLocker, config ConfigFetcher, metadataRetriever func(ctx context.Context, blockNum *big.Int) ([]byte, error)) (*DataPoster, error) { +func NewDataPoster(ctx context.Context, db ethdb.Database, headerReader *headerreader.HeaderReader, auth *bind.TransactOpts, redisClient redis.UniversalClient, redisLock AttemptLocker, config ConfigFetcher, metadataRetriever func(ctx context.Context, blockNum *big.Int) ([]byte, error)) (*DataPoster, error) { initConfig := config() replacementTimes, err := parseReplacementTimes(initConfig.ReplacementTimes) if err != nil { @@ -118,7 +118,13 @@ func NewDataPoster(db ethdb.Database, headerReader *headerreader.HeaderReader, a return nil, err } case initConfig.UseLevelDB: - queue = leveldb.New(db, func() storage.EncoderDecoderInterface { return &storage.EncoderDecoder{} }) + ldb := leveldb.New(db, func() storage.EncoderDecoderInterface { return &storage.EncoderDecoder{} }) + if config().Dangerous.ClearLevelDB { + if err := ldb.PruneAll(ctx); err != nil { + return nil, err + } + } + queue = ldb default: queue = slice.NewStorage(func() storage.EncoderDecoderInterface { return &storage.EncoderDecoder{} }) } @@ -618,19 +624,26 @@ type DataPosterConfig struct { ReplacementTimes string `koanf:"replacement-times"` // This is forcibly disabled if the parent chain is an Arbitrum chain, // so you should probably use DataPoster's waitForL1Finality method instead of reading this field directly. - WaitForL1Finality bool `koanf:"wait-for-l1-finality" reload:"hot"` - MaxMempoolTransactions uint64 `koanf:"max-mempool-transactions" reload:"hot"` - MaxQueuedTransactions int `koanf:"max-queued-transactions" reload:"hot"` - TargetPriceGwei float64 `koanf:"target-price-gwei" reload:"hot"` - UrgencyGwei float64 `koanf:"urgency-gwei" reload:"hot"` - MinFeeCapGwei float64 `koanf:"min-fee-cap-gwei" reload:"hot"` - MinTipCapGwei float64 `koanf:"min-tip-cap-gwei" reload:"hot"` - MaxTipCapGwei float64 `koanf:"max-tip-cap-gwei" reload:"hot"` - NonceRbfSoftConfs uint64 `koanf:"nonce-rbf-soft-confs" reload:"hot"` - AllocateMempoolBalance bool `koanf:"allocate-mempool-balance" reload:"hot"` - UseLevelDB bool `koanf:"use-leveldb"` - UseNoOpStorage bool `koanf:"use-noop-storage"` - LegacyStorageEncoding bool `koanf:"legacy-storage-encoding" reload:"hot"` + WaitForL1Finality bool `koanf:"wait-for-l1-finality" reload:"hot"` + MaxMempoolTransactions uint64 `koanf:"max-mempool-transactions" reload:"hot"` + MaxQueuedTransactions int `koanf:"max-queued-transactions" reload:"hot"` + TargetPriceGwei float64 `koanf:"target-price-gwei" reload:"hot"` + UrgencyGwei float64 `koanf:"urgency-gwei" reload:"hot"` + MinFeeCapGwei float64 `koanf:"min-fee-cap-gwei" reload:"hot"` + MinTipCapGwei float64 `koanf:"min-tip-cap-gwei" reload:"hot"` + MaxTipCapGwei float64 `koanf:"max-tip-cap-gwei" reload:"hot"` + NonceRbfSoftConfs uint64 `koanf:"nonce-rbf-soft-confs" reload:"hot"` + AllocateMempoolBalance bool `koanf:"allocate-mempool-balance" reload:"hot"` + UseLevelDB bool `koanf:"use-leveldb"` + UseNoOpStorage bool `koanf:"use-noop-storage"` + LegacyStorageEncoding bool `koanf:"legacy-storage-encoding" reload:"hot"` + Dangerous DangerousConfig `koanf:"dangerous"` +} + +type DangerousConfig struct { + // This should be used with caution, only when dataposter somehow gets in a + // bad state and we require clearing it. + ClearLevelDB bool `koanf:"clear-leveldb"` } // ConfigFetcher function type is used instead of directly passing config so @@ -652,7 +665,13 @@ func DataPosterConfigAddOptions(prefix string, f *pflag.FlagSet) { f.Bool(prefix+".use-leveldb", DefaultDataPosterConfig.UseLevelDB, "uses leveldb when enabled") f.Bool(prefix+".use-noop-storage", DefaultDataPosterConfig.UseNoOpStorage, "uses noop storage, it doesn't store anything") f.Bool(prefix+".legacy-storage-encoding", DefaultDataPosterConfig.LegacyStorageEncoding, "encodes items in a legacy way (as it was before dropping generics)") + signature.SimpleHmacConfigAddOptions(prefix+".redis-signer", f) + addDangerousOptions(prefix+".dangerous", f) +} + +func addDangerousOptions(prefix string, f *pflag.FlagSet) { + f.Bool(prefix+".clear-leveldb", DefaultDataPosterConfig.Dangerous.ClearLevelDB, "clear leveldb") } var DefaultDataPosterConfig = DataPosterConfig{ @@ -668,6 +687,7 @@ var DefaultDataPosterConfig = DataPosterConfig{ UseLevelDB: true, UseNoOpStorage: false, LegacyStorageEncoding: true, + Dangerous: DangerousConfig{ClearLevelDB: false}, } var DefaultDataPosterConfigForValidator = func() DataPosterConfig { diff --git a/arbnode/dataposter/leveldb/leveldb.go b/arbnode/dataposter/leveldb/leveldb.go index cfb34b04f7..7c65602974 100644 --- a/arbnode/dataposter/leveldb/leveldb.go +++ b/arbnode/dataposter/leveldb/leveldb.go @@ -79,6 +79,18 @@ func (s *Storage) FetchLast(ctx context.Context) (*storage.QueuedTransaction, er return s.encDec().Decode(val) } +func (s *Storage) PruneAll(ctx context.Context) error { + idx, err := s.lastItemIdx(ctx) + if err != nil { + return fmt.Errorf("pruning all keys: %w", err) + } + until, err := strconv.Atoi(string(idx)) + if err != nil { + return fmt.Errorf("converting last item index bytes to integer: %w", err) + } + return s.Prune(ctx, uint64(until+1)) +} + func (s *Storage) Prune(ctx context.Context, until uint64) error { cnt, err := s.Length(ctx) if err != nil { diff --git a/arbnode/dataposter/storage_test.go b/arbnode/dataposter/storage_test.go index d536e5da05..0b3df50125 100644 --- a/arbnode/dataposter/storage_test.go +++ b/arbnode/dataposter/storage_test.go @@ -135,6 +135,33 @@ func initStorages(ctx context.Context, t *testing.T) map[string]QueueStorage { return m } +func TestPruneAll(t *testing.T) { + s := newLevelDBStorage(t, func() storage.EncoderDecoderInterface { return &storage.EncoderDecoder{} }) + ctx := context.Background() + for i := 0; i < 20; i++ { + if err := s.Put(ctx, uint64(i), nil, valueOf(t, i)); err != nil { + t.Fatalf("Error putting a key/value: %v", err) + } + } + size, err := s.Length(ctx) + if err != nil { + t.Fatalf("Length() unexpected error %v", err) + } + if size != 20 { + t.Errorf("Length()=%v want 20", size) + } + if err := s.PruneAll(ctx); err != nil { + t.Fatalf("PruneAll() unexpected error: %v", err) + } + size, err = s.Length(ctx) + if err != nil { + t.Fatalf("Length() unexpected error %v", err) + } + if size != 0 { + t.Errorf("Length()=%v want 0", size) + } +} + func TestFetchContents(t *testing.T) { ctx := context.Background() for name, s := range initStorages(ctx, t) { diff --git a/arbnode/node.go b/arbnode/node.go index 5bdc716264..a2e526b097 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -540,7 +540,7 @@ func checkArbDbSchemaVersion(arbDb ethdb.Database) error { } func StakerDataposter( - db ethdb.Database, l1Reader *headerreader.HeaderReader, + ctx context.Context, db ethdb.Database, l1Reader *headerreader.HeaderReader, transactOpts *bind.TransactOpts, cfgFetcher ConfigFetcher, syncMonitor *SyncMonitor, ) (*dataposter.DataPoster, error) { if transactOpts == nil { @@ -564,7 +564,7 @@ func StakerDataposter( dpCfg := func() *dataposter.DataPosterConfig { return &cfg.Staker.DataPoster } - return dataposter.NewDataPoster(db, l1Reader, transactOpts, redisC, redisLock, dpCfg, mdRetriever) + return dataposter.NewDataPoster(ctx, db, l1Reader, transactOpts, redisC, redisLock, dpCfg, mdRetriever) } func createNodeImpl( @@ -805,6 +805,7 @@ func createNodeImpl( if config.Staker.Enable { dp, err := StakerDataposter( + ctx, rawdb.NewTable(arbDb, storage.StakerPrefix), l1Reader, txOptsValidator, diff --git a/system_tests/batch_poster_test.go b/system_tests/batch_poster_test.go index 11bf92608b..8b0811c223 100644 --- a/system_tests/batch_poster_test.go +++ b/system_tests/batch_poster_test.go @@ -82,7 +82,7 @@ func testBatchPosterParallel(t *testing.T, useRedis bool) { for i := 0; i < parallelBatchPosters; i++ { // Make a copy of the batch poster config so NewBatchPoster calling Validate() on it doesn't race batchPosterConfig := conf.BatchPoster - batchPoster, err := arbnode.NewBatchPoster(nil, nodeA.L1Reader, nodeA.InboxTracker, nodeA.TxStreamer, nodeA.SyncMonitor, func() *arbnode.BatchPosterConfig { return &batchPosterConfig }, nodeA.DeployInfo, &seqTxOpts, nil) + batchPoster, err := arbnode.NewBatchPoster(ctx, nil, nodeA.L1Reader, nodeA.InboxTracker, nodeA.TxStreamer, nodeA.SyncMonitor, func() *arbnode.BatchPosterConfig { return &batchPosterConfig }, nodeA.DeployInfo, &seqTxOpts, nil) Require(t, err) batchPoster.Start(ctx) defer batchPoster.StopAndWait() diff --git a/system_tests/staker_test.go b/system_tests/staker_test.go index 96ea1ee2e7..b4ce67b178 100644 --- a/system_tests/staker_test.go +++ b/system_tests/staker_test.go @@ -130,7 +130,7 @@ func stakerTestImpl(t *testing.T, faultyStaker bool, honestStakerInactive bool) valConfig := staker.TestL1ValidatorConfig - dpA, err := arbnode.StakerDataposter(rawdb.NewTable(l2nodeB.ArbDB, storage.StakerPrefix), l2nodeA.L1Reader, &l1authA, NewFetcherFromConfig(arbnode.ConfigDefaultL1NonSequencerTest()), nil) + dpA, err := arbnode.StakerDataposter(ctx, rawdb.NewTable(l2nodeB.ArbDB, storage.StakerPrefix), l2nodeA.L1Reader, &l1authA, NewFetcherFromConfig(arbnode.ConfigDefaultL1NonSequencerTest()), nil) if err != nil { t.Fatalf("Error creating validator dataposter: %v", err) } @@ -178,7 +178,7 @@ func stakerTestImpl(t *testing.T, faultyStaker bool, honestStakerInactive bool) } Require(t, err) - dpB, err := arbnode.StakerDataposter(rawdb.NewTable(l2nodeB.ArbDB, storage.StakerPrefix), l2nodeB.L1Reader, &l1authB, NewFetcherFromConfig(arbnode.ConfigDefaultL1NonSequencerTest()), nil) + dpB, err := arbnode.StakerDataposter(ctx, rawdb.NewTable(l2nodeB.ArbDB, storage.StakerPrefix), l2nodeB.L1Reader, &l1authB, NewFetcherFromConfig(arbnode.ConfigDefaultL1NonSequencerTest()), nil) if err != nil { t.Fatalf("Error creating validator dataposter: %v", err) } @@ -217,7 +217,7 @@ func stakerTestImpl(t *testing.T, faultyStaker bool, honestStakerInactive bool) err = valWalletB.Initialize(ctx) Require(t, err) } - dpC, err := arbnode.StakerDataposter(rawdb.NewTable(l2nodeB.ArbDB, storage.StakerPrefix), l2nodeA.L1Reader, &l1authA, NewFetcherFromConfig(arbnode.ConfigDefaultL1NonSequencerTest()), nil) + dpC, err := arbnode.StakerDataposter(ctx, rawdb.NewTable(l2nodeB.ArbDB, storage.StakerPrefix), l2nodeA.L1Reader, &l1authA, NewFetcherFromConfig(arbnode.ConfigDefaultL1NonSequencerTest()), nil) if err != nil { t.Fatalf("Error creating validator dataposter: %v", err) } From 9fd0ff5f9b59c2b73aa42f8426ec1bf8acfa9fa2 Mon Sep 17 00:00:00 2001 From: Nodar Date: Tue, 19 Sep 2023 15:21:31 +0200 Subject: [PATCH 2/2] Pass context to batch poster --- arbnode/node.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arbnode/node.go b/arbnode/node.go index a2e526b097..28f0ada338 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -874,7 +874,7 @@ func createNodeImpl( if txOptsBatchPoster == nil { return nil, errors.New("batchposter, but no TxOpts") } - batchPoster, err = NewBatchPoster(rawdb.NewTable(arbDb, storage.BatchPosterPrefix), l1Reader, inboxTracker, txStreamer, syncMonitor, func() *BatchPosterConfig { return &configFetcher.Get().BatchPoster }, deployInfo, txOptsBatchPoster, daWriter) + batchPoster, err = NewBatchPoster(ctx, rawdb.NewTable(arbDb, storage.BatchPosterPrefix), l1Reader, inboxTracker, txStreamer, syncMonitor, func() *BatchPosterConfig { return &configFetcher.Get().BatchPoster }, deployInfo, txOptsBatchPoster, daWriter) if err != nil { return nil, err }