Add flag to dataposter and clear leveldb when that flag is set #1870

Closed · wants to merge 1 commit
4 changes: 2 additions & 2 deletions arbnode/batch_poster.go
@@ -210,7 +210,7 @@ var TestBatchPosterConfig = BatchPosterConfig{
L1BlockBoundBypass: time.Hour,
}

func NewBatchPoster(dataPosterDB ethdb.Database, l1Reader *headerreader.HeaderReader, inbox *InboxTracker, streamer *TransactionStreamer, syncMonitor *SyncMonitor, config BatchPosterConfigFetcher, deployInfo *chaininfo.RollupAddresses, transactOpts *bind.TransactOpts, daWriter das.DataAvailabilityServiceWriter) (*BatchPoster, error) {
func NewBatchPoster(ctx context.Context, dataPosterDB ethdb.Database, l1Reader *headerreader.HeaderReader, inbox *InboxTracker, streamer *TransactionStreamer, syncMonitor *SyncMonitor, config BatchPosterConfigFetcher, deployInfo *chaininfo.RollupAddresses, transactOpts *bind.TransactOpts, daWriter das.DataAvailabilityServiceWriter) (*BatchPoster, error) {
seqInbox, err := bridgegen.NewSequencerInbox(deployInfo.SequencerInbox, l1Reader.Client())
if err != nil {
return nil, err
@@ -253,7 +253,7 @@ func NewBatchPoster(dataPosterDB ethdb.Database, l1Reader *headerreader.HeaderRe
dataPosterConfigFetcher := func() *dataposter.DataPosterConfig {
return &config().DataPoster
}
b.dataPoster, err = dataposter.NewDataPoster(dataPosterDB, l1Reader, transactOpts, redisClient, redisLock, dataPosterConfigFetcher, b.getBatchPosterPosition)
b.dataPoster, err = dataposter.NewDataPoster(ctx, dataPosterDB, l1Reader, transactOpts, redisClient, redisLock, dataPosterConfigFetcher, b.getBatchPosterPosition)
if err != nil {
return nil, err
}
50 changes: 35 additions & 15 deletions arbnode/dataposter/data_poster.go
@@ -91,7 +91,7 @@ func parseReplacementTimes(val string) ([]time.Duration, error) {
return append(res, time.Hour*24*365*10), nil
}

func NewDataPoster(db ethdb.Database, headerReader *headerreader.HeaderReader, auth *bind.TransactOpts, redisClient redis.UniversalClient, redisLock AttemptLocker, config ConfigFetcher, metadataRetriever func(ctx context.Context, blockNum *big.Int) ([]byte, error)) (*DataPoster, error) {
func NewDataPoster(ctx context.Context, db ethdb.Database, headerReader *headerreader.HeaderReader, auth *bind.TransactOpts, redisClient redis.UniversalClient, redisLock AttemptLocker, config ConfigFetcher, metadataRetriever func(ctx context.Context, blockNum *big.Int) ([]byte, error)) (*DataPoster, error) {
initConfig := config()
replacementTimes, err := parseReplacementTimes(initConfig.ReplacementTimes)
if err != nil {
@@ -118,7 +118,13 @@ func NewDataPoster(db ethdb.Database, headerReader *headerreader.HeaderReader, a
return nil, err
}
case initConfig.UseLevelDB:
queue = leveldb.New(db, func() storage.EncoderDecoderInterface { return &storage.EncoderDecoder{} })
ldb := leveldb.New(db, func() storage.EncoderDecoderInterface { return &storage.EncoderDecoder{} })
if config().Dangerous.ClearLevelDB {
if err := ldb.PruneAll(ctx); err != nil {
return nil, err
}
}
queue = ldb
default:
queue = slice.NewStorage(func() storage.EncoderDecoderInterface { return &storage.EncoderDecoder{} })
}
@@ -618,19 +624,26 @@ type DataPosterConfig struct {
ReplacementTimes string `koanf:"replacement-times"`
// This is forcibly disabled if the parent chain is an Arbitrum chain,
// so you should probably use DataPoster's waitForL1Finality method instead of reading this field directly.
WaitForL1Finality bool `koanf:"wait-for-l1-finality" reload:"hot"`
MaxMempoolTransactions uint64 `koanf:"max-mempool-transactions" reload:"hot"`
MaxQueuedTransactions int `koanf:"max-queued-transactions" reload:"hot"`
TargetPriceGwei float64 `koanf:"target-price-gwei" reload:"hot"`
UrgencyGwei float64 `koanf:"urgency-gwei" reload:"hot"`
MinFeeCapGwei float64 `koanf:"min-fee-cap-gwei" reload:"hot"`
MinTipCapGwei float64 `koanf:"min-tip-cap-gwei" reload:"hot"`
MaxTipCapGwei float64 `koanf:"max-tip-cap-gwei" reload:"hot"`
NonceRbfSoftConfs uint64 `koanf:"nonce-rbf-soft-confs" reload:"hot"`
AllocateMempoolBalance bool `koanf:"allocate-mempool-balance" reload:"hot"`
UseLevelDB bool `koanf:"use-leveldb"`
UseNoOpStorage bool `koanf:"use-noop-storage"`
LegacyStorageEncoding bool `koanf:"legacy-storage-encoding" reload:"hot"`
WaitForL1Finality bool `koanf:"wait-for-l1-finality" reload:"hot"`
MaxMempoolTransactions uint64 `koanf:"max-mempool-transactions" reload:"hot"`
MaxQueuedTransactions int `koanf:"max-queued-transactions" reload:"hot"`
TargetPriceGwei float64 `koanf:"target-price-gwei" reload:"hot"`
UrgencyGwei float64 `koanf:"urgency-gwei" reload:"hot"`
MinFeeCapGwei float64 `koanf:"min-fee-cap-gwei" reload:"hot"`
MinTipCapGwei float64 `koanf:"min-tip-cap-gwei" reload:"hot"`
MaxTipCapGwei float64 `koanf:"max-tip-cap-gwei" reload:"hot"`
NonceRbfSoftConfs uint64 `koanf:"nonce-rbf-soft-confs" reload:"hot"`
AllocateMempoolBalance bool `koanf:"allocate-mempool-balance" reload:"hot"`
UseLevelDB bool `koanf:"use-leveldb"`
UseNoOpStorage bool `koanf:"use-noop-storage"`
LegacyStorageEncoding bool `koanf:"legacy-storage-encoding" reload:"hot"`
Dangerous DangerousConfig `koanf:"dangerous"`
}

type DangerousConfig struct {
// This should be used with caution, only when dataposter somehow gets in a
// bad state and we require clearing it.
ClearLevelDB bool `koanf:"clear-leveldb"`
}

// ConfigFetcher function type is used instead of directly passing config so
@@ -652,7 +665,13 @@ func DataPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".use-leveldb", DefaultDataPosterConfig.UseLevelDB, "uses leveldb when enabled")
f.Bool(prefix+".use-noop-storage", DefaultDataPosterConfig.UseNoOpStorage, "uses noop storage, it doesn't store anything")
f.Bool(prefix+".legacy-storage-encoding", DefaultDataPosterConfig.LegacyStorageEncoding, "encodes items in a legacy way (as it was before dropping generics)")

signature.SimpleHmacConfigAddOptions(prefix+".redis-signer", f)
addDangerousOptions(prefix+".dangerous", f)
}

func addDangerousOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".clear-leveldb", DefaultDataPosterConfig.Dangerous.ClearLevelDB, "clear leveldb")
}

var DefaultDataPosterConfig = DataPosterConfig{
@@ -668,6 +687,7 @@ var DefaultDataPosterConfig = DataPosterConfig{
UseLevelDB: true,
UseNoOpStorage: false,
LegacyStorageEncoding: true,
Dangerous: DangerousConfig{ClearLevelDB: false},
}

var DefaultDataPosterConfigForValidator = func() DataPosterConfig {
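For reference, a minimal sketch of how a caller could opt into the one-shot wipe added here. The helper name, its parameter list, and the nil redis client are illustrative assumptions (a non-nil redis client appears to take precedence over the leveldb branch in NewDataPoster's storage switch); only the dataposter identifiers come from this diff.

package dataposterutil // hypothetical helper package, for illustration only

import (
	"context"
	"math/big"

	"github.com/ethereum/go-ethereum/accounts/abi/bind"
	"github.com/ethereum/go-ethereum/ethdb"

	"github.com/offchainlabs/nitro/arbnode/dataposter"
	"github.com/offchainlabs/nitro/util/headerreader"
)

// newClearedDataPoster builds a leveldb-backed data poster whose queue is wiped
// once at construction by enabling the new dangerous clear-leveldb option.
func newClearedDataPoster(
	ctx context.Context,
	db ethdb.Database,
	l1Reader *headerreader.HeaderReader,
	auth *bind.TransactOpts,
	redisLock dataposter.AttemptLocker,
	mdRetriever func(ctx context.Context, blockNum *big.Int) ([]byte, error),
) (*dataposter.DataPoster, error) {
	cfg := dataposter.DefaultDataPosterConfig
	cfg.UseLevelDB = true             // the wipe only applies to the leveldb-backed queue
	cfg.Dangerous.ClearLevelDB = true // one-shot: PruneAll runs inside NewDataPoster
	fetcher := func() *dataposter.DataPosterConfig { return &cfg }
	// Pass a nil redis client so the leveldb branch is selected.
	return dataposter.NewDataPoster(ctx, db, l1Reader, auth, nil, redisLock, fetcher, mdRetriever)
}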
12 changes: 12 additions & 0 deletions arbnode/dataposter/leveldb/leveldb.go
@@ -79,6 +79,18 @@ func (s *Storage) FetchLast(ctx context.Context) (*storage.QueuedTransaction, er
return s.encDec().Decode(val)
}

func (s *Storage) PruneAll(ctx context.Context) error {
idx, err := s.lastItemIdx(ctx)
if err != nil {
return fmt.Errorf("pruning all keys: %w", err)
}
until, err := strconv.Atoi(string(idx))
if err != nil {
return fmt.Errorf("converting last item index bytes to integer: %w", err)
}
return s.Prune(ctx, uint64(until+1))
}

func (s *Storage) Prune(ctx context.Context, until uint64) error {
cnt, err := s.Length(ctx)
if err != nil {
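PruneAll derives its bound from the last stored index: with items 0 through 19 on disk, lastItemIdx yields "19", so PruneAll calls Prune(ctx, 20). The toy program below is not the real storage; it only illustrates the exclusive-until convention that the +1 implies, using a plain map.

package main

import "fmt"

// prune deletes every key strictly below until, mirroring the assumed
// exclusive-until semantics of Storage.Prune.
func prune(q map[uint64][]byte, until uint64) {
	for k := range q {
		if k < until {
			delete(q, k)
		}
	}
}

// pruneAll wipes everything by pruning up to lastIndex+1, as PruneAll does.
func pruneAll(q map[uint64][]byte) {
	var last uint64
	for k := range q {
		if k > last {
			last = k
		}
	}
	prune(q, last+1)
}

func main() {
	q := map[uint64][]byte{0: nil, 7: nil, 19: nil}
	pruneAll(q)
	fmt.Println(len(q)) // prints 0
}

Note that the real PruneAll parses lastItemIdx's bytes with strconv.Atoi, so an empty queue presumably surfaces as an error rather than a no-op; callers enabling the clear flag on a fresh database may need to handle that case.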
27 changes: 27 additions & 0 deletions arbnode/dataposter/storage_test.go
@@ -135,6 +135,33 @@ func initStorages(ctx context.Context, t *testing.T) map[string]QueueStorage {
return m
}

func TestPruneAll(t *testing.T) {
s := newLevelDBStorage(t, func() storage.EncoderDecoderInterface { return &storage.EncoderDecoder{} })
ctx := context.Background()
for i := 0; i < 20; i++ {
if err := s.Put(ctx, uint64(i), nil, valueOf(t, i)); err != nil {
t.Fatalf("Error putting a key/value: %v", err)
}
}
size, err := s.Length(ctx)
if err != nil {
t.Fatalf("Length() unexpected error %v", err)
}
if size != 20 {
t.Errorf("Length()=%v want 20", size)
}
if err := s.PruneAll(ctx); err != nil {
t.Fatalf("PruneAll() unexpected error: %v", err)
}
size, err = s.Length(ctx)
if err != nil {
t.Fatalf("Length() unexpected error %v", err)
}
if size != 0 {
t.Errorf("Length()=%v want 0", size)
}
}

func TestFetchContents(t *testing.T) {
ctx := context.Background()
for name, s := range initStorages(ctx, t) {
5 changes: 3 additions & 2 deletions arbnode/node.go
@@ -540,7 +540,7 @@
}

func StakerDataposter(
db ethdb.Database, l1Reader *headerreader.HeaderReader,
ctx context.Context, db ethdb.Database, l1Reader *headerreader.HeaderReader,
transactOpts *bind.TransactOpts, cfgFetcher ConfigFetcher, syncMonitor *SyncMonitor,
) (*dataposter.DataPoster, error) {
if transactOpts == nil {
@@ -564,7 +564,7 @@
dpCfg := func() *dataposter.DataPosterConfig {
return &cfg.Staker.DataPoster
}
return dataposter.NewDataPoster(db, l1Reader, transactOpts, redisC, redisLock, dpCfg, mdRetriever)
return dataposter.NewDataPoster(ctx, db, l1Reader, transactOpts, redisC, redisLock, dpCfg, mdRetriever)
}

func createNodeImpl(
@@ -805,6 +805,7 @@

if config.Staker.Enable {
dp, err := StakerDataposter(
ctx,
rawdb.NewTable(arbDb, storage.StakerPrefix),
l1Reader,
txOptsValidator,
@@ -873,7 +874,7 @@
if txOptsBatchPoster == nil {
return nil, errors.New("batchposter, but no TxOpts")
}
batchPoster, err = NewBatchPoster(rawdb.NewTable(arbDb, storage.BatchPosterPrefix), l1Reader, inboxTracker, txStreamer, syncMonitor, func() *BatchPosterConfig { return &configFetcher.Get().BatchPoster }, deployInfo, txOptsBatchPoster, daWriter)

Check failure on line 877 in arbnode/node.go (GitHub Actions / Go Tests: race, defaults, challenge): not enough arguments in call to NewBatchPoster
if err != nil {
return nil, err
}
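The CI annotation above points at the one NewBatchPoster call site this commit leaves untouched; presumably it just needs ctx threaded through, mirroring the updated signature and the call in system_tests/batch_poster_test.go below. A sketch of the likely follow-up (not part of this commit):

// Presumed fix for arbnode/node.go line 877: pass ctx as the new first argument.
batchPoster, err = NewBatchPoster(
	ctx,
	rawdb.NewTable(arbDb, storage.BatchPosterPrefix),
	l1Reader, inboxTracker, txStreamer, syncMonitor,
	func() *BatchPosterConfig { return &configFetcher.Get().BatchPoster },
	deployInfo, txOptsBatchPoster, daWriter,
)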
2 changes: 1 addition & 1 deletion system_tests/batch_poster_test.go
@@ -82,7 +82,7 @@ func testBatchPosterParallel(t *testing.T, useRedis bool) {
for i := 0; i < parallelBatchPosters; i++ {
// Make a copy of the batch poster config so NewBatchPoster calling Validate() on it doesn't race
batchPosterConfig := conf.BatchPoster
batchPoster, err := arbnode.NewBatchPoster(nil, nodeA.L1Reader, nodeA.InboxTracker, nodeA.TxStreamer, nodeA.SyncMonitor, func() *arbnode.BatchPosterConfig { return &batchPosterConfig }, nodeA.DeployInfo, &seqTxOpts, nil)
batchPoster, err := arbnode.NewBatchPoster(ctx, nil, nodeA.L1Reader, nodeA.InboxTracker, nodeA.TxStreamer, nodeA.SyncMonitor, func() *arbnode.BatchPosterConfig { return &batchPosterConfig }, nodeA.DeployInfo, &seqTxOpts, nil)
Require(t, err)
batchPoster.Start(ctx)
defer batchPoster.StopAndWait()
6 changes: 3 additions & 3 deletions system_tests/staker_test.go
@@ -130,7 +130,7 @@ func stakerTestImpl(t *testing.T, faultyStaker bool, honestStakerInactive bool)

valConfig := staker.TestL1ValidatorConfig

dpA, err := arbnode.StakerDataposter(rawdb.NewTable(l2nodeB.ArbDB, storage.StakerPrefix), l2nodeA.L1Reader, &l1authA, NewFetcherFromConfig(arbnode.ConfigDefaultL1NonSequencerTest()), nil)
dpA, err := arbnode.StakerDataposter(ctx, rawdb.NewTable(l2nodeB.ArbDB, storage.StakerPrefix), l2nodeA.L1Reader, &l1authA, NewFetcherFromConfig(arbnode.ConfigDefaultL1NonSequencerTest()), nil)
if err != nil {
t.Fatalf("Error creating validator dataposter: %v", err)
}
@@ -178,7 +178,7 @@ func stakerTestImpl(t *testing.T, faultyStaker bool, honestStakerInactive bool)
}
Require(t, err)

dpB, err := arbnode.StakerDataposter(rawdb.NewTable(l2nodeB.ArbDB, storage.StakerPrefix), l2nodeB.L1Reader, &l1authB, NewFetcherFromConfig(arbnode.ConfigDefaultL1NonSequencerTest()), nil)
dpB, err := arbnode.StakerDataposter(ctx, rawdb.NewTable(l2nodeB.ArbDB, storage.StakerPrefix), l2nodeB.L1Reader, &l1authB, NewFetcherFromConfig(arbnode.ConfigDefaultL1NonSequencerTest()), nil)
if err != nil {
t.Fatalf("Error creating validator dataposter: %v", err)
}
@@ -217,7 +217,7 @@ func stakerTestImpl(t *testing.T, faultyStaker bool, honestStakerInactive bool)
err = valWalletB.Initialize(ctx)
Require(t, err)
}
dpC, err := arbnode.StakerDataposter(rawdb.NewTable(l2nodeB.ArbDB, storage.StakerPrefix), l2nodeA.L1Reader, &l1authA, NewFetcherFromConfig(arbnode.ConfigDefaultL1NonSequencerTest()), nil)
dpC, err := arbnode.StakerDataposter(ctx, rawdb.NewTable(l2nodeB.ArbDB, storage.StakerPrefix), l2nodeA.L1Reader, &l1authA, NewFetcherFromConfig(arbnode.ConfigDefaultL1NonSequencerTest()), nil)
if err != nil {
t.Fatalf("Error creating validator dataposter: %v", err)
}