Skip to content

Commit

Permalink
Store last message pruned in database
Browse files Browse the repository at this point in the history
  • Loading branch information
amsanghi committed Dec 17, 2024
1 parent 36ac667 commit 6e0b0e0
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 9 deletions.
50 changes: 45 additions & 5 deletions arbnode/message_pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"

"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/util/stopwaiter"
Expand Down Expand Up @@ -121,31 +122,31 @@ func (m *MessagePruner) prune(ctx context.Context, count arbutil.MessageIndex, g
}

func (m *MessagePruner) deleteOldMessagesFromDB(ctx context.Context, messageCount arbutil.MessageIndex, delayedMessageCount uint64) error {
prunedKeysRange, err := deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, messageResultPrefix, &m.cachedPrunedMessageResult, uint64(messageCount))
prunedKeysRange, err := deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, messageResultPrefix, lastPrunedMessageResultKey, &m.cachedPrunedMessageResult, uint64(messageCount))
if err != nil {
return fmt.Errorf("error deleting message results: %w", err)
}
if len(prunedKeysRange) > 0 {
log.Info("Pruned message results:", "first pruned key", prunedKeysRange[0], "last pruned key", prunedKeysRange[len(prunedKeysRange)-1])
}

prunedKeysRange, err = deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, blockHashInputFeedPrefix, &m.cachedPrunedBlockHashesInputFeed, uint64(messageCount))
prunedKeysRange, err = deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, blockHashInputFeedPrefix, lastPrunedBlockHashInputFeedKey, &m.cachedPrunedBlockHashesInputFeed, uint64(messageCount))
if err != nil {
return fmt.Errorf("error deleting expected block hashes: %w", err)
}
if len(prunedKeysRange) > 0 {
log.Info("Pruned expected block hashes:", "first pruned key", prunedKeysRange[0], "last pruned key", prunedKeysRange[len(prunedKeysRange)-1])
}

prunedKeysRange, err = deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, messagePrefix, &m.cachedPrunedMessages, uint64(messageCount))
prunedKeysRange, err = deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, messagePrefix, lastPrunedMessageKey, &m.cachedPrunedMessages, uint64(messageCount))
if err != nil {
return fmt.Errorf("error deleting last batch messages: %w", err)
}
if len(prunedKeysRange) > 0 {
log.Info("Pruned last batch messages:", "first pruned key", prunedKeysRange[0], "last pruned key", prunedKeysRange[len(prunedKeysRange)-1])
}

prunedKeysRange, err = deleteFromLastPrunedUptoEndKey(ctx, m.inboxTracker.db, rlpDelayedMessagePrefix, &m.cachedPrunedDelayedMessages, delayedMessageCount)
prunedKeysRange, err = deleteFromLastPrunedUptoEndKey(ctx, m.inboxTracker.db, rlpDelayedMessagePrefix, lastPrunedRlpDelayedMessageKey, &m.cachedPrunedDelayedMessages, delayedMessageCount)
if err != nil {
return fmt.Errorf("error deleting last batch delayed messages: %w", err)
}
Expand All @@ -157,8 +158,12 @@ func (m *MessagePruner) deleteOldMessagesFromDB(ctx context.Context, messageCoun

// deleteFromLastPrunedUptoEndKey is similar to deleteFromRange but automatically populates the start key
// cachedStartMinKey must not be nil. It's set to the new start key at the end of this function if successful.
func deleteFromLastPrunedUptoEndKey(ctx context.Context, db ethdb.Database, prefix []byte, cachedStartMinKey *uint64, endMinKey uint64) ([]uint64, error) {
// Checks if the last pruned key is set in the database and uses it as the start key if it is.
func deleteFromLastPrunedUptoEndKey(ctx context.Context, db ethdb.Database, prefix []byte, lastPrunedKey []byte, cachedStartMinKey *uint64, endMinKey uint64) ([]uint64, error) {
startMinKey := *cachedStartMinKey
if startMinKey == 0 {
startMinKey = fetchLastPrunedKey(db, lastPrunedKey)
}
if startMinKey == 0 {
startIter := db.NewIterator(prefix, uint64ToKey(1))
if !startIter.Next() {
Expand All @@ -169,11 +174,46 @@ func deleteFromLastPrunedUptoEndKey(ctx context.Context, db ethdb.Database, pref
}
if endMinKey <= startMinKey {
*cachedStartMinKey = startMinKey
insertLastPrunedKey(db, lastPrunedKey, startMinKey)
return nil, nil
}
keys, err := deleteFromRange(ctx, db, prefix, startMinKey, endMinKey-1)
if err == nil {
*cachedStartMinKey = endMinKey - 1
insertLastPrunedKey(db, lastPrunedKey, endMinKey-1)
}
return keys, err
}

func insertLastPrunedKey(db ethdb.Database, lastPrunedKey []byte, lastPrunedValue uint64) {
lastPrunedValueByte, err := rlp.EncodeToBytes(lastPrunedValue)
if err != nil {
log.Error("error encoding last pruned value: %w", err)
} else {
err = db.Put(lastPrunedKey, lastPrunedValueByte)
if err != nil {
log.Error("error saving last pruned value: %w", err)
}
}
}

func fetchLastPrunedKey(db ethdb.Database, lastPrunedKey []byte) uint64 {
hasKey, err := db.Has(lastPrunedKey)
if err != nil {
log.Warn("error checking for last pruned key: %w", err)
} else if hasKey {
lastPrunedValueByte, err := db.Get(lastPrunedKey)
if err != nil {
log.Warn("error fetching last pruned key: %w", err)
} else {
var lastPrunedValue uint64
err = rlp.DecodeBytes(lastPrunedValueByte, &lastPrunedValue)
if err != nil {
log.Warn("error decoding last pruned value: %w", err)
} else {
return lastPrunedValue
}
}
}
return 0
}
12 changes: 8 additions & 4 deletions arbnode/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@ var (
sequencerBatchMetaPrefix []byte = []byte("s") // maps a batch sequence number to BatchMetadata
delayedSequencedPrefix []byte = []byte("a") // maps a delayed message count to the first sequencer batch sequence number with this delayed count

messageCountKey []byte = []byte("_messageCount") // contains the current message count
delayedMessageCountKey []byte = []byte("_delayedMessageCount") // contains the current delayed message count
sequencerBatchCountKey []byte = []byte("_sequencerBatchCount") // contains the current sequencer message count
dbSchemaVersion []byte = []byte("_schemaVersion") // contains a uint64 representing the database schema version
messageCountKey []byte = []byte("_messageCount") // contains the current message count
lastPrunedMessageResultKey []byte = []byte("_lastPrunedMessageResultKey") // contains the last pruned message result key
lastPrunedBlockHashInputFeedKey []byte = []byte("_lastPrunedBlockHashInputFeedPrefix") // contains the last pruned block hash input feed key
lastPrunedMessageKey []byte = []byte("_lastPrunedMessageKey") // contains the last pruned message key
lastPrunedRlpDelayedMessageKey []byte = []byte("_lastPrunedRlpDelayedMessageKey") // contains the last pruned RLP delayed message key
delayedMessageCountKey []byte = []byte("_delayedMessageCount") // contains the current delayed message count
sequencerBatchCountKey []byte = []byte("_sequencerBatchCount") // contains the current sequencer message count
dbSchemaVersion []byte = []byte("_schemaVersion") // contains a uint64 representing the database schema version
)

const currentDbSchemaVersion uint64 = 1

0 comments on commit 6e0b0e0

Please sign in to comment.