diff --git a/Dockerfile b/Dockerfile
index 5c56b60cc0..88c34cec44 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -41,7 +41,8 @@ RUN apt-get update && apt-get install -y curl build-essential=12.9
 FROM wasm-base as wasm-libs-builder
 # clang / lld used by soft-float wasm
-RUN apt-get install -y clang=1:14.0-55.7~deb12u1 lld=1:14.0-55.7~deb12u1 wabt
+RUN apt-get update && \
+    apt-get install -y clang=1:14.0-55.7~deb12u1 lld=1:14.0-55.7~deb12u1 wabt
 # pinned rust 1.75.0
 RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain 1.75.0 --target x86_64-unknown-linux-gnu wasm32-unknown-unknown wasm32-wasi
 COPY ./Makefile ./
@@ -203,6 +204,7 @@ COPY ./scripts/download-machine.sh .
 #RUN ./download-machine.sh consensus-v11 0xf4389b835497a910d7ba3ebfb77aa93da985634f3c052de1290360635be40c4a
 #RUN ./download-machine.sh consensus-v11.1 0x68e4fe5023f792d4ef584796c84d710303a5e12ea02d6e37e2b5e9c4332507c4
 #RUN ./download-machine.sh consensus-v20 0x8b104a2e80ac6165dc58b9048de12f301d70b02a0ab51396c22b4b4b802a16a4
+RUN ./download-machine.sh consensus-v30-rc.2 0xb0de9cb89e4d944ae6023a3b62276e54804c242fd8c4c2d8e6cc4450f5fa8b1b
 
 FROM golang:1.21-bookworm as node-builder
 WORKDIR /workspace
@@ -268,11 +270,15 @@ USER user
 WORKDIR /home/user/
 ENTRYPOINT [ "/usr/local/bin/nitro" ]
 
+FROM offchainlabs/nitro-node:v2.3.4-rc.5-b4cc111 as nitro-legacy
+
 FROM nitro-node-slim as nitro-node
 USER root
 COPY --from=prover-export /bin/jit /usr/local/bin/
 COPY --from=node-builder /workspace/target/bin/daserver /usr/local/bin/
 COPY --from=node-builder /workspace/target/bin/datool /usr/local/bin/
+COPY --from=nitro-legacy /home/user/target/machines /home/user/nitro-legacy/machines
+RUN rm -rf /workspace/target/legacy-machines/latest
 RUN export DEBIAN_FRONTEND=noninteractive && \
     apt-get update && \
     apt-get install -y \
@@ -282,10 +288,23 @@ RUN export DEBIAN_FRONTEND=noninteractive && \
     apt-get clean && \
     rm -rf /var/lib/apt/lists/* /usr/share/doc/* /var/cache/ldconfig/aux-cache /usr/lib/python3.9/__pycache__/ /usr/lib/python3.9/*/__pycache__/ /var/log/* && \
     nitro --version
+ENTRYPOINT [ "/usr/local/bin/nitro" , "--validation.wasm.allowed-wasm-module-roots", "/home/user/nitro-legacy/machines,/home/user/target/machines"]
+
+USER user
+
+FROM nitro-node as nitro-node-validator
+USER root
+COPY --from=nitro-legacy /usr/local/bin/nitro-val /home/user/nitro-legacy/bin/nitro-val
+COPY --from=nitro-legacy /usr/local/bin/jit /home/user/nitro-legacy/bin/jit
+RUN export DEBIAN_FRONTEND=noninteractive && \
+    apt-get update && \
+    apt-get install -y xxd netcat-traditional && \
+    rm -rf /var/lib/apt/lists/* /usr/share/doc/* /var/cache/ldconfig/aux-cache /usr/lib/python3.9/__pycache__/ /usr/lib/python3.9/*/__pycache__/ /var/log/*
+COPY scripts/split-val-entry.sh /usr/local/bin
+ENTRYPOINT [ "/usr/local/bin/split-val-entry.sh" ]
 
 USER user
 
-FROM nitro-node as nitro-node-dev-base
+FROM nitro-node-validator as nitro-node-dev
 USER root
 # Copy in latest WASM module root
 RUN rm -f /home/user/target/machines/latest
@@ -309,22 +328,5 @@ RUN export DEBIAN_FRONTEND=noninteractive && \
 
 USER user
 
-FROM offchainlabs/nitro-node:v2.3.4-rc.5-b4cc111 as nitro-legacy
-
-FROM nitro-node-dev-base as nitro-node-dev
-USER root
-
-RUN export DEBIAN_FRONTEND=noninteractive && \
-    apt-get update && \
-    apt-get install -y xxd netcat-traditional && \
-    rm -rf /var/lib/apt/lists/* /usr/share/doc/* /var/cache/ldconfig/aux-cache /usr/lib/python3.9/__pycache__/ /usr/lib/python3.9/*/__pycache__/ /var/log/*
-COPY scripts/split-val-entry.sh /usr/local/bin
-COPY --from=nitro-legacy /home/user/target/machines /home/user/nitro-legacy/machines
-RUN rm -rf /workspace/target/legacy-machines/latest
-COPY --from=nitro-legacy /usr/local/bin/nitro-val /home/user/nitro-legacy/bin/nitro-val
-COPY --from=nitro-legacy /usr/local/bin/jit /home/user/nitro-legacy/bin/jit
-ENTRYPOINT [ "/usr/local/bin/split-val-entry.sh" ]
-USER user
-
 FROM nitro-node as nitro-node-default
 # Just to ensure nitro-node-dist is default
diff --git a/arbitrator/stylus/src/cache.rs b/arbitrator/stylus/src/cache.rs
index 2b83c6152f..06739f2219 100644
--- a/arbitrator/stylus/src/cache.rs
+++ b/arbitrator/stylus/src/cache.rs
@@ -21,7 +21,7 @@ macro_rules! cache {
 }
 
 pub struct InitCache {
-    arbos: HashMap<CacheKey, CacheItem>,
+    long_term: HashMap<CacheKey, CacheItem>,
     lru: LruCache<CacheKey, CacheItem>,
 }
 
@@ -59,20 +59,31 @@ impl CacheItem {
 }
 
 impl InitCache {
+    // The current implementation only has one tag that stores to the long_term cache.
+    // Future implementations might have more, but 0 is a reserved tag
+    // that will never modify the long_term state.
+    const ARBOS_TAG: u32 = 1;
+
     fn new(size: usize) -> Self {
         Self {
-            arbos: HashMap::new(),
+            long_term: HashMap::new(),
             lru: LruCache::new(NonZeroUsize::new(size).unwrap()),
         }
     }
 
+    pub fn set_lru_size(size: u32) {
+        cache!()
+            .lru
+            .resize(NonZeroUsize::new(size.try_into().unwrap()).unwrap())
+    }
+
     /// Retrieves a cached value, updating items as necessary.
     pub fn get(module_hash: Bytes32, version: u16, debug: bool) -> Option<(Module, Store)> {
         let mut cache = cache!();
         let key = CacheKey::new(module_hash, version, debug);
 
         // See if the item is in the long term cache
-        if let Some(item) = cache.arbos.get(&key) {
+        if let Some(item) = cache.long_term.get(&key) {
             return Some(item.data());
         }
 
@@ -84,18 +95,27 @@ impl InitCache {
     }
 
     /// Inserts an item into the long term cache, cloning from the LRU cache if able.
+    /// If long_term_tag is 0, the item is only inserted into the LRU cache.
     pub fn insert(
         module_hash: Bytes32,
         module: &[u8],
         version: u16,
+        long_term_tag: u32,
         debug: bool,
     ) -> Result<(Module, Store)> {
         let key = CacheKey::new(module_hash, version, debug);
 
         // if in LRU, add to ArbOS
         let mut cache = cache!();
+        if let Some(item) = cache.long_term.get(&key) {
+            return Ok(item.data());
+        }
         if let Some(item) = cache.lru.peek(&key).cloned() {
-            cache.arbos.insert(key, item.clone());
+            if long_term_tag == Self::ARBOS_TAG {
+                cache.long_term.insert(key, item.clone());
+            } else {
+                cache.lru.promote(&key)
+            }
             return Ok(item.data());
         }
         drop(cache);
@@ -105,37 +125,34 @@ impl InitCache {
         let item = CacheItem::new(module, engine);
         let data = item.data();
-        cache!().arbos.insert(key, item);
+        let mut cache = cache!();
+        if long_term_tag != Self::ARBOS_TAG {
+            cache.lru.put(key, item);
+        } else {
+            cache.long_term.insert(key, item);
+        }
         Ok(data)
     }
 
-    /// Inserts an item into the short-lived LRU cache.
-    pub fn insert_lru(
-        module_hash: Bytes32,
-        module: &[u8],
-        version: u16,
-        debug: bool,
-    ) -> Result<(Module, Store)> {
-        let engine = CompileConfig::version(version, debug).engine();
-        let module = unsafe { Module::deserialize_unchecked(&engine, module)? };
-
-        let key = CacheKey::new(module_hash, version, debug);
-        let item = CacheItem::new(module, engine);
-        cache!().lru.put(key, item.clone());
-        Ok(item.data())
-    }
-
     /// Evicts an item in the long-term cache.
-    pub fn evict(module_hash: Bytes32, version: u16, debug: bool) {
+    pub fn evict(module_hash: Bytes32, version: u16, long_term_tag: u32, debug: bool) {
+        if long_term_tag != Self::ARBOS_TAG {
+            return;
+        }
         let key = CacheKey::new(module_hash, version, debug);
-        cache!().arbos.remove(&key);
+        let mut cache = cache!();
+        if let Some(item) = cache.long_term.remove(&key) {
+            cache.lru.put(key, item);
+        }
     }
 
-    /// Modifies the cache for reorg, dropping the long-term cache.
-    pub fn reorg(_block: u64) {
+    pub fn clear_long_term(long_term_tag: u32) {
+        if long_term_tag != Self::ARBOS_TAG {
+            return;
+        }
         let mut cache = cache!();
         let cache = &mut *cache;
-        for (key, item) in cache.arbos.drain() {
+        for (key, item) in cache.long_term.drain() {
             cache.lru.put(key, item); // not all will fit, just a heuristic
         }
     }
diff --git a/arbitrator/stylus/src/lib.rs b/arbitrator/stylus/src/lib.rs
index 7abfb98bf5..3c53359f8b 100644
--- a/arbitrator/stylus/src/lib.rs
+++ b/arbitrator/stylus/src/lib.rs
@@ -183,6 +183,7 @@ pub unsafe extern "C" fn stylus_call(
     debug_chain: bool,
     output: *mut RustBytes,
     gas: *mut u64,
+    long_term_tag: u32,
 ) -> UserOutcomeKind {
     let module = module.slice();
     let calldata = calldata.slice().to_vec();
@@ -193,7 +194,14 @@ pub unsafe extern "C" fn stylus_call(
 
     // Safety: module came from compile_user_wasm and we've paid for memory expansion
     let instance = unsafe {
-        NativeInstance::deserialize_cached(module, config.version, evm_api, evm_data, debug_chain)
+        NativeInstance::deserialize_cached(
+            module,
+            config.version,
+            evm_api,
+            evm_data,
+            long_term_tag,
+            debug_chain,
+        )
     };
     let mut instance = match instance {
         Ok(instance) => instance,
@@ -212,33 +220,47 @@ pub unsafe extern "C" fn stylus_call(
     status
 }
 
+/// Resizes the LRU cache.
+#[no_mangle]
+pub extern "C" fn stylus_cache_lru_resize(size: u32) {
+    InitCache::set_lru_size(size);
+}
+
 /// Caches an activated user program.
 ///
 /// # Safety
 ///
 /// `module` must represent a valid module produced from `stylus_activate`.
+/// arbos_tag: a tag for the ArbOS cache. A tag of 0 does not affect long-term caching;
+/// currently only tag == 1 affects the long-term cache.
 #[no_mangle]
 pub unsafe extern "C" fn stylus_cache_module(
     module: GoSliceData,
     module_hash: Bytes32,
     version: u16,
+    arbos_tag: u32,
     debug: bool,
 ) {
-    if let Err(error) = InitCache::insert(module_hash, module.slice(), version, debug) {
+    if let Err(error) = InitCache::insert(module_hash, module.slice(), version, arbos_tag, debug) {
         panic!("tried to cache invalid asm!: {error}");
     }
 }
 
 /// Evicts an activated user program from the init cache.
 #[no_mangle]
-pub extern "C" fn stylus_evict_module(module_hash: Bytes32, version: u16, debug: bool) {
-    InitCache::evict(module_hash, version, debug);
+pub extern "C" fn stylus_evict_module(
+    module_hash: Bytes32,
+    version: u16,
+    arbos_tag: u32,
+    debug: bool,
+) {
+    InitCache::evict(module_hash, version, arbos_tag, debug);
 }
 
 /// Reorgs the init cache. This will likely never happen.
 #[no_mangle]
-pub extern "C" fn stylus_reorg_vm(block: u64) {
-    InitCache::reorg(block);
+pub extern "C" fn stylus_reorg_vm(_block: u64, arbos_tag: u32) {
+    InitCache::clear_long_term(arbos_tag);
 }
 
 /// Frees the vector. Does nothing when the vector is null.
diff --git a/arbitrator/stylus/src/native.rs b/arbitrator/stylus/src/native.rs
index 6d5e4cd2e9..2858d59fdc 100644
--- a/arbitrator/stylus/src/native.rs
+++ b/arbitrator/stylus/src/native.rs
@@ -113,6 +113,7 @@ impl> NativeInstance {
         version: u16,
         evm: E,
         evm_data: EvmData,
+        mut long_term_tag: u32,
         debug: bool,
     ) -> Result {
         let compile = CompileConfig::version(version, debug);
@@ -122,10 +123,11 @@ impl> NativeInstance {
         if let Some((module, store)) = InitCache::get(module_hash, version, debug) {
             return Self::from_module(module, store, env);
         }
-        let (module, store) = match env.evm_data.cached {
-            true => InitCache::insert(module_hash, module, version, debug)?,
-            false => InitCache::insert_lru(module_hash, module, version, debug)?,
-        };
+        if !env.evm_data.cached {
+            long_term_tag = 0;
+        }
+        let (module, store) =
+            InitCache::insert(module_hash, module, version, long_term_tag, debug)?;
         Self::from_module(module, store, env)
     }
 
diff --git a/arbnode/inbox_test.go b/arbnode/inbox_test.go
index 5c879743a4..594e0cedb5 100644
--- a/arbnode/inbox_test.go
+++ b/arbnode/inbox_test.go
@@ -65,6 +65,7 @@ func NewTransactionStreamerForTest(t *testing.T, ownerAddress common.Address) (*
 	if err != nil {
 		Fail(t, err)
 	}
+	execEngine.Initialize(gethexec.DefaultCachingConfig.StylusLRUCache)
 	execSeq := &execClientWrapper{execEngine, t}
 	inbox, err := NewTransactionStreamer(arbDb, bc.Config(), execSeq, nil, make(chan error, 1), transactionStreamerConfigFetcher)
 	if err != nil {
diff --git a/arbnode/message_pruner.go b/arbnode/message_pruner.go
index 31bf1a63ff..5d18341a27 100644
--- a/arbnode/message_pruner.go
+++ b/arbnode/message_pruner.go
@@ -23,13 +23,14 @@ import (
 type MessagePruner struct {
 	stopwaiter.StopWaiter
-	transactionStreamer         *TransactionStreamer
-	inboxTracker                *InboxTracker
-	config                      MessagePrunerConfigFetcher
-	pruningLock                 sync.Mutex
-	lastPruneDone               time.Time
-	cachedPrunedMessages        uint64
-	cachedPrunedDelayedMessages uint64
+	transactionStreamer              *TransactionStreamer
+	inboxTracker                     *InboxTracker
+	config                           MessagePrunerConfigFetcher
+	pruningLock                      sync.Mutex
+	lastPruneDone                    time.Time
+	cachedPrunedMessages             uint64
+	cachedPrunedBlockHashesInputFeed uint64
+	cachedPrunedDelayedMessages      uint64
 }
 
 type MessagePrunerConfig struct {
@@ -115,7 +116,15 @@ func (m *MessagePruner) prune(ctx context.Context, count arbutil.MessageIndex, g
 }
 
 func (m *MessagePruner) deleteOldMessagesFromDB(ctx context.Context, messageCount arbutil.MessageIndex, delayedMessageCount uint64) error {
-	prunedKeysRange, err := deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, messagePrefix, &m.cachedPrunedMessages, uint64(messageCount))
+	prunedKeysRange, err := deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, blockHashInputFeedPrefix, &m.cachedPrunedBlockHashesInputFeed, uint64(messageCount))
+	if err != nil {
+		return fmt.Errorf("error deleting expected block hashes: %w", err)
+	}
+	if len(prunedKeysRange) > 0 {
+		log.Info("Pruned expected block hashes:", "first pruned key", prunedKeysRange[0], "last pruned key", prunedKeysRange[len(prunedKeysRange)-1])
+	}
+
+	prunedKeysRange, err = deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, messagePrefix, &m.cachedPrunedMessages, uint64(messageCount))
 	if err != nil {
 		return fmt.Errorf("error deleting last batch messages: %w", err)
 	}
diff --git a/arbnode/message_pruner_test.go b/arbnode/message_pruner_test.go
index 0212ed2364..ed85c0ebce 100644
--- a/arbnode/message_pruner_test.go
+++ b/arbnode/message_pruner_test.go
@@ -22,8 +22,8 @@ func TestMessagePrunerWithPruningEligibleMessagePresent(t *testing.T) {
 	Require(t, err)
 
 	checkDbKeys(t, messagesCount, transactionStreamerDb, messagePrefix)
+	checkDbKeys(t, messagesCount, transactionStreamerDb, blockHashInputFeedPrefix)
 	checkDbKeys(t, messagesCount, inboxTrackerDb, rlpDelayedMessagePrefix)
-
 }
 
 func TestMessagePrunerTwoHalves(t *testing.T) {
@@ -71,16 +71,18 @@ func TestMessagePrunerWithNoPruningEligibleMessagePresent(t *testing.T) {
 	Require(t, err)
 
 	checkDbKeys(t, uint64(messagesCount), transactionStreamerDb, messagePrefix)
+	checkDbKeys(t, uint64(messagesCount), transactionStreamerDb, blockHashInputFeedPrefix)
 	checkDbKeys(t, messagesCount, inboxTrackerDb, rlpDelayedMessagePrefix)
 }
 
 func setupDatabase(t *testing.T, messageCount, delayedMessageCount uint64) (ethdb.Database, ethdb.Database, *MessagePruner) {
-
 	transactionStreamerDb := rawdb.NewMemoryDatabase()
 	for i := uint64(0); i < uint64(messageCount); i++ {
 		err := transactionStreamerDb.Put(dbKey(messagePrefix, i), []byte{})
 		Require(t, err)
+		err = transactionStreamerDb.Put(dbKey(blockHashInputFeedPrefix, i), []byte{})
+		Require(t, err)
 	}
 
 	inboxTrackerDb := rawdb.NewMemoryDatabase()
diff --git a/arbnode/schema.go b/arbnode/schema.go
index ddc7cf54fd..2854b7e785 100644
--- a/arbnode/schema.go
+++ b/arbnode/schema.go
@@ -5,6 +5,7 @@ package arbnode
 
 var (
 	messagePrefix                []byte = []byte("m") // maps a message sequence number to a message
+	blockHashInputFeedPrefix     []byte = []byte("b") // maps a message sequence number to a block hash received through the input feed
 	legacyDelayedMessagePrefix   []byte = []byte("d") // maps a delayed sequence number to an accumulator and a message as serialized on L1
 	rlpDelayedMessagePrefix      []byte = []byte("e") // maps a delayed sequence number to an accumulator and an RLP encoded message
 	parentChainBlockNumberPrefix []byte = []byte("p") // maps a delayed sequence number to a parent chain block number
diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go
index 0d5ae829b0..b79b1aa963 100644
--- a/arbnode/transaction_streamer.go
+++ b/arbnode/transaction_streamer.go
@@ -60,7 +60,7 @@ type TransactionStreamer struct {
 
 	nextAllowedFeedReorgLog time.Time
 
-	broadcasterQueuedMessages            []arbostypes.MessageWithMetadata
+	broadcasterQueuedMessages            []arbostypes.MessageWithMetadataAndBlockHash
 	broadcasterQueuedMessagesPos         uint64
 	broadcasterQueuedMessagesActiveReorg bool
 
@@ -140,6 +140,16 @@ type L1PriceData struct {
 	currentEstimateOfL1GasPrice uint64
 }
 
+// Represents a block's hash in the database.
+// Necessary because RLP decoder doesn't produce nil values by default.
+type blockHashDBValue struct {
+	BlockHash *common.Hash `rlp:"nil"`
+}
+
+const (
+	BlockHashMismatchLogMsg = "BlockHash from feed doesn't match locally computed hash. Check feed source."
+)
+
 func (s *TransactionStreamer) CurrentEstimateOfL1GasPrice() uint64 {
 	s.cachedL1PriceDataMutex.Lock()
 	defer s.cachedL1PriceDataMutex.Unlock()
@@ -371,7 +381,7 @@ func deleteFromRange(ctx context.Context, db ethdb.Database, prefix []byte, star
 
 // The insertion mutex must be held. This acquires the reorg mutex.
 // Note: oldMessages will be empty if reorgHook is nil
-func (s *TransactionStreamer) reorg(batch ethdb.Batch, count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata) error {
+func (s *TransactionStreamer) reorg(batch ethdb.Batch, count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockHash) error {
 	if count == 0 {
 		return errors.New("cannot reorg out init message")
 	}
@@ -465,14 +475,14 @@ func (s *TransactionStreamer) reorg(batch ethdb.Batch, count arbutil.MessageInde
 		return err
 	}
 
-	messagesWithBlockHash := make([]broadcaster.MessageWithMetadataAndBlockHash, 0, len(messagesResults))
+	messagesWithComputedBlockHash := make([]arbostypes.MessageWithMetadataAndBlockHash, 0, len(messagesResults))
 	for i := 0; i < len(messagesResults); i++ {
-		messagesWithBlockHash = append(messagesWithBlockHash, broadcaster.MessageWithMetadataAndBlockHash{
-			Message:   newMessages[i],
-			BlockHash: &messagesResults[i].BlockHash,
+		messagesWithComputedBlockHash = append(messagesWithComputedBlockHash, arbostypes.MessageWithMetadataAndBlockHash{
+			MessageWithMeta: newMessages[i].MessageWithMeta,
+			BlockHash:       &messagesResults[i].BlockHash,
 		})
 	}
-	s.broadcastMessages(messagesWithBlockHash, count)
+	s.broadcastMessages(messagesWithComputedBlockHash, count)
 
 	if s.validator != nil {
 		err = s.validator.Reorg(s.GetContext(), count)
@@ -481,6 +491,10 @@ func (s *TransactionStreamer) reorg(batch ethdb.Batch, count arbutil.MessageInde
 		}
 	}
 
+	err = deleteStartingAt(s.db, batch, blockHashInputFeedPrefix, uint64ToKey(uint64(count)))
+	if err != nil {
+		return err
+	}
 	err = deleteStartingAt(s.db, batch, messagePrefix, uint64ToKey(uint64(count)))
 	if err != nil {
 		return err
@@ -510,6 +524,10 @@ func dbKey(prefix []byte, pos uint64) []byte {
 	return key
 }
 
+func isErrNotFound(err error) bool {
+	return errors.Is(err, leveldb.ErrNotFound) || errors.Is(err, pebble.ErrNotFound)
+}
+
 // Note: if changed to acquire the mutex, some internal users may need to be updated to a non-locking version.
 func (s *TransactionStreamer) GetMessage(seqNum arbutil.MessageIndex) (*arbostypes.MessageWithMetadata, error) {
 	key := dbKey(messagePrefix, uint64(seqNum))
@@ -526,6 +544,36 @@ func (s *TransactionStreamer) GetMessage(seqNum arbutil.MessageIndex) (*arbostyp
 	return &message, nil
 }
 
+func (s *TransactionStreamer) getMessageWithMetadataAndBlockHash(seqNum arbutil.MessageIndex) (*arbostypes.MessageWithMetadataAndBlockHash, error) {
+	msg, err := s.GetMessage(seqNum)
+	if err != nil {
+		return nil, err
+	}
+
+	// Get block hash.
+	// To keep it backwards compatible, since it is possible that a message related
+	// to a sequence number exists in the database, but the block hash doesn't.
+	key := dbKey(blockHashInputFeedPrefix, uint64(seqNum))
+	var blockHash *common.Hash
+	data, err := s.db.Get(key)
+	if err == nil {
+		var blockHashDBVal blockHashDBValue
+		err = rlp.DecodeBytes(data, &blockHashDBVal)
+		if err != nil {
+			return nil, err
+		}
+		blockHash = blockHashDBVal.BlockHash
+	} else if !isErrNotFound(err) {
+		return nil, err
+	}
+
+	msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{
+		MessageWithMeta: *msg,
+		BlockHash:       blockHash,
+	}
+	return &msgWithBlockHash, nil
+}
+
 // Note: if changed to acquire the mutex, some internal users may need to be updated to a non-locking version.
 func (s *TransactionStreamer) GetMessageCount() (arbutil.MessageIndex, error) {
 	posBytes, err := s.db.Get(messageCountKey)
@@ -579,7 +627,7 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe
 		return nil
 	}
 	broadcastStartPos := feedMessages[0].SequenceNumber
-	var messages []arbostypes.MessageWithMetadata
+	var messages []arbostypes.MessageWithMetadataAndBlockHash
 	broadcastAfterPos := broadcastStartPos
 	for _, feedMessage := range feedMessages {
 		if broadcastAfterPos != feedMessage.SequenceNumber {
@@ -588,7 +636,11 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe
 		if feedMessage.Message.Message == nil || feedMessage.Message.Message.Header == nil {
 			return fmt.Errorf("invalid feed message at sequence number %v", feedMessage.SequenceNumber)
 		}
-		messages = append(messages, feedMessage.Message)
+		msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{
+			MessageWithMeta: feedMessage.Message,
+			BlockHash:       feedMessage.BlockHash,
+		}
+		messages = append(messages, msgWithBlockHash)
 		broadcastAfterPos++
 	}
 
@@ -607,7 +659,7 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe
 	messages = messages[dups:]
 	broadcastStartPos += arbutil.MessageIndex(dups)
 	if oldMsg != nil {
-		s.logReorg(broadcastStartPos, oldMsg, &messages[0], false)
+		s.logReorg(broadcastStartPos, oldMsg, &messages[0].MessageWithMeta, false)
 	}
 	if len(messages) == 0 {
 		// No new messages received
@@ -657,7 +709,7 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe
 	if broadcastStartPos > 0 {
 		_, err := s.GetMessage(broadcastStartPos - 1)
 		if err != nil {
-			if !errors.Is(err, leveldb.ErrNotFound) && !errors.Is(err, pebble.ErrNotFound) {
+			if !isErrNotFound(err) {
 				return err
 			}
 			// Message before current message doesn't exist in database, so don't add current messages yet
@@ -709,11 +761,18 @@ func endBatch(batch ethdb.Batch) error {
 }
 
 func (s *TransactionStreamer) AddMessagesAndEndBatch(pos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadata, batch ethdb.Batch) error {
+	messagesWithBlockHash := make([]arbostypes.MessageWithMetadataAndBlockHash, 0, len(messages))
+	for _, message := range messages {
+		messagesWithBlockHash = append(messagesWithBlockHash, arbostypes.MessageWithMetadataAndBlockHash{
+			MessageWithMeta: message,
+		})
+	}
+
 	if messagesAreConfirmed {
 		// Trim confirmed messages from l1pricedataCache
 		s.TrimCache(pos + arbutil.MessageIndex(len(messages)))
 		s.reorgMutex.RLock()
-		dups, _, _, err := s.countDuplicateMessages(pos, messages, nil)
+		dups, _, _, err := s.countDuplicateMessages(pos, messagesWithBlockHash, nil)
 		s.reorgMutex.RUnlock()
 		if err != nil {
 			return err
@@ -730,7 +789,7 @@ func (s *TransactionStreamer) AddMessagesAndEndBatch(pos arbutil.MessageIndex, m
 	s.insertionMutex.Lock()
 	defer s.insertionMutex.Unlock()
 
-	return s.addMessagesAndEndBatchImpl(pos, messagesAreConfirmed, messages, batch)
+	return s.addMessagesAndEndBatchImpl(pos, messagesAreConfirmed, messagesWithBlockHash, batch)
 }
 
 func (s *TransactionStreamer) getPrevPrevDelayedRead(pos arbutil.MessageIndex) (uint64, error) {
@@ -748,7 +807,7 @@ func (s *TransactionStreamer) getPrevPrevDelayedRead(pos arbutil.MessageIndex) (
 
 func (s *TransactionStreamer) countDuplicateMessages(
 	pos arbutil.MessageIndex,
-	messages []arbostypes.MessageWithMetadata,
+	messages []arbostypes.MessageWithMetadataAndBlockHash,
 	batch *ethdb.Batch,
 ) (int, bool, *arbostypes.MessageWithMetadata, error) {
 	curMsg := 0
@@ -769,7 +828,7 @@ func (s *TransactionStreamer) countDuplicateMessages(
 			return 0, false, nil, err
 		}
 		nextMessage := messages[curMsg]
-		wantMessage, err := rlp.EncodeToBytes(nextMessage)
+		wantMessage, err := rlp.EncodeToBytes(nextMessage.MessageWithMeta)
 		if err != nil {
 			return 0, false, nil, err
 		}
@@ -785,12 +844,12 @@ func (s *TransactionStreamer) countDuplicateMessages(
 				return curMsg, true, nil, nil
 			}
 			var duplicateMessage bool
-			if nextMessage.Message != nil {
-				if dbMessageParsed.Message.BatchGasCost == nil || nextMessage.Message.BatchGasCost == nil {
+			if nextMessage.MessageWithMeta.Message != nil {
+				if dbMessageParsed.Message.BatchGasCost == nil || nextMessage.MessageWithMeta.Message.BatchGasCost == nil {
 					// Remove both of the batch gas costs and see if the messages still differ
-					nextMessageCopy := nextMessage
+					nextMessageCopy := nextMessage.MessageWithMeta
 					nextMessageCopy.Message = new(arbostypes.L1IncomingMessage)
-					*nextMessageCopy.Message = *nextMessage.Message
+					*nextMessageCopy.Message = *nextMessage.MessageWithMeta.Message
 					batchGasCostBkup := dbMessageParsed.Message.BatchGasCost
 					dbMessageParsed.Message.BatchGasCost = nil
 					nextMessageCopy.Message.BatchGasCost = nil
@@ -798,7 +857,7 @@ func (s *TransactionStreamer) countDuplicateMessages(
 						// Actually this isn't a reorg; only the batch gas costs differed
 						duplicateMessage = true
 						// If possible - update the message in the database to add the gas cost cache.
-						if batch != nil && nextMessage.Message.BatchGasCost != nil {
+						if batch != nil && nextMessage.MessageWithMeta.Message.BatchGasCost != nil {
 							if *batch == nil {
 								*batch = s.db.NewBatch()
 							}
@@ -842,7 +901,7 @@ func (s *TransactionStreamer) logReorg(pos arbutil.MessageIndex, dbMsg *arbostyp
 
 }
 
-func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadata, batch ethdb.Batch) error {
+func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadataAndBlockHash, batch ethdb.Batch) error {
 	var confirmedReorg bool
 	var oldMsg *arbostypes.MessageWithMetadata
 	var lastDelayedRead uint64
@@ -860,7 +919,7 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil
 			return err
 		}
 		if duplicates > 0 {
-			lastDelayedRead = messages[duplicates-1].DelayedMessagesRead
+			lastDelayedRead = messages[duplicates-1].MessageWithMeta.DelayedMessagesRead
 			messages = messages[duplicates:]
 			messageStartPos += arbutil.MessageIndex(duplicates)
 		}
@@ -898,13 +957,13 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil
 			return err
 		}
 		if duplicates > 0 {
-			lastDelayedRead = messages[duplicates-1].DelayedMessagesRead
+			lastDelayedRead = messages[duplicates-1].MessageWithMeta.DelayedMessagesRead
 			messages = messages[duplicates:]
 			messageStartPos += arbutil.MessageIndex(duplicates)
 		}
 	}
 	if oldMsg != nil {
-		s.logReorg(messageStartPos, oldMsg, &messages[0], confirmedReorg)
+		s.logReorg(messageStartPos, oldMsg, &messages[0].MessageWithMeta, confirmedReorg)
 	}
 
 	if feedReorg {
@@ -924,12 +983,12 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil
 	// Validate delayed message counts of remaining messages
 	for i, msg := range messages {
 		msgPos := messageStartPos + arbutil.MessageIndex(i)
-		diff := msg.DelayedMessagesRead - lastDelayedRead
+		diff := msg.MessageWithMeta.DelayedMessagesRead - lastDelayedRead
 		if diff != 0 && diff != 1 {
-			return fmt.Errorf("attempted to insert jump from %v delayed messages read to %v delayed messages read at message index %v", lastDelayedRead, msg.DelayedMessagesRead, msgPos)
+			return fmt.Errorf("attempted to insert jump from %v delayed messages read to %v delayed messages read at message index %v", lastDelayedRead, msg.MessageWithMeta.DelayedMessagesRead, msgPos)
 		}
-		lastDelayedRead = msg.DelayedMessagesRead
-		if msg.Message == nil {
+		lastDelayedRead = msg.MessageWithMeta.DelayedMessagesRead
+		if msg.MessageWithMeta.Message == nil {
 			return fmt.Errorf("attempted to insert nil message at position %v", msgPos)
 		}
 	}
@@ -1007,15 +1066,15 @@ func (s *TransactionStreamer) WriteMessageFromSequencer(
 		}
 	}
 
-	if err := s.writeMessages(pos, []arbostypes.MessageWithMetadata{msgWithMeta}, nil); err != nil {
-		return err
+	msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{
+		MessageWithMeta: msgWithMeta,
+		BlockHash:       &msgResult.BlockHash,
 	}
-	msgWithBlockHash := broadcaster.MessageWithMetadataAndBlockHash{
-		Message:   msgWithMeta,
-		BlockHash: &msgResult.BlockHash,
+	if err := s.writeMessages(pos, []arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, nil); err != nil {
+		return err
 	}
-	s.broadcastMessages([]broadcaster.MessageWithMetadataAndBlockHash{msgWithBlockHash}, pos)
+	s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, pos)
 
 	return nil
 }
@@ -1036,9 +1095,23 @@ func (s *TransactionStreamer) PopulateFeedBacklog() error {
 	return s.inboxReader.tracker.PopulateFeedBacklog(s.broadcastServer)
 }
 
-func (s *TransactionStreamer) writeMessage(pos arbutil.MessageIndex, msg arbostypes.MessageWithMetadata, batch ethdb.Batch) error {
+func (s *TransactionStreamer) writeMessage(pos arbutil.MessageIndex, msg arbostypes.MessageWithMetadataAndBlockHash, batch ethdb.Batch) error {
+	// write message with metadata
 	key := dbKey(messagePrefix, uint64(pos))
-	msgBytes, err := rlp.EncodeToBytes(msg)
+	msgBytes, err := rlp.EncodeToBytes(msg.MessageWithMeta)
+	if err != nil {
+		return err
+	}
+	if err := batch.Put(key, msgBytes); err != nil {
+		return err
+	}
+
+	// write block hash
+	blockHashDBVal := blockHashDBValue{
+		BlockHash: msg.BlockHash,
+	}
+	key = dbKey(blockHashInputFeedPrefix, uint64(pos))
+	msgBytes, err = rlp.EncodeToBytes(blockHashDBVal)
 	if err != nil {
 		return err
 	}
@@ -1046,7 +1119,7 @@
 }
 
 func (s *TransactionStreamer) broadcastMessages(
-	msgs []broadcaster.MessageWithMetadataAndBlockHash,
+	msgs []arbostypes.MessageWithMetadataAndBlockHash,
 	pos arbutil.MessageIndex,
 ) {
 	if s.broadcastServer == nil {
@@ -1059,7 +1132,7 @@ func (s *TransactionStreamer) broadcastMessages(
 
 // The mutex must be held, and pos must be the latest message count.
 // `batch` may be nil, which initializes a new batch. The batch is closed out in this function.
-func (s *TransactionStreamer) writeMessages(pos arbutil.MessageIndex, messages []arbostypes.MessageWithMetadata, batch ethdb.Batch) error {
+func (s *TransactionStreamer) writeMessages(pos arbutil.MessageIndex, messages []arbostypes.MessageWithMetadataAndBlockHash, batch ethdb.Batch) error {
 	if batch == nil {
 		batch = s.db.NewBatch()
 	}
@@ -1095,6 +1168,20 @@ func (s *TransactionStreamer) ResultAtCount(count arbutil.MessageIndex) (*execut
 	return s.exec.ResultAtPos(count - 1)
 }
 
+func (s *TransactionStreamer) checkResult(msgResult *execution.MessageResult, expectedBlockHash *common.Hash) {
+	if expectedBlockHash == nil {
+		return
+	}
+	if msgResult.BlockHash != *expectedBlockHash {
+		log.Error(
+			BlockHashMismatchLogMsg,
+			"expected", expectedBlockHash,
+			"actual", msgResult.BlockHash,
+		)
+		return
+	}
+}
+
 // exposed for testing
 // return value: true if should be called again immediately
 func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context, exec execution.ExecutionSequencer) bool {
@@ -1121,7 +1208,7 @@ func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context, exec execution
 	if pos >= msgCount {
 		return false
 	}
-	msg, err := s.GetMessage(pos)
+	msgAndBlockHash, err := s.getMessageWithMetadataAndBlockHash(pos)
 	if err != nil {
 		log.Error("feedOneMsg failed to readMessage", "err", err, "pos", pos)
 		return false
@@ -1135,7 +1222,7 @@ func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context, exec execution
 		}
 		msgForPrefetch = msg
 	}
-	msgResult, err := s.exec.DigestMessage(pos, msg, msgForPrefetch)
+	msgResult, err := s.exec.DigestMessage(pos, &msgAndBlockHash.MessageWithMeta, msgForPrefetch)
 	if err != nil {
 		logger := log.Warn
 		if prevMessageCount < msgCount {
@@ -1145,11 +1232,13 @@ func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context, exec execution
 		return false
 	}
 
-	msgWithBlockHash := broadcaster.MessageWithMetadataAndBlockHash{
-		Message:   *msg,
-		BlockHash: &msgResult.BlockHash,
+	s.checkResult(msgResult, msgAndBlockHash.BlockHash)
+
+	msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{
+		MessageWithMeta: msgAndBlockHash.MessageWithMeta,
+		BlockHash:       &msgResult.BlockHash,
 	}
-	s.broadcastMessages([]broadcaster.MessageWithMetadataAndBlockHash{msgWithBlockHash}, pos)
+	s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, pos)
 
 	return pos+1 < msgCount
 }
diff --git a/arbos/arbostypes/messagewithmeta.go b/arbos/arbostypes/messagewithmeta.go
index a3d4f5e3c3..79b7c4f9d2 100644
--- a/arbos/arbostypes/messagewithmeta.go
+++ b/arbos/arbostypes/messagewithmeta.go
@@ -18,6 +18,11 @@ type MessageWithMetadata struct {
 	DelayedMessagesRead uint64              `json:"delayedMessagesRead"`
 }
 
+type MessageWithMetadataAndBlockHash struct {
+	MessageWithMeta MessageWithMetadata
+	BlockHash       *common.Hash
+}
+
 var EmptyTestMessageWithMetadata = MessageWithMetadata{
 	Message: &EmptyTestIncomingMessage,
 }
diff --git a/arbos/programs/data_pricer.go b/arbos/programs/data_pricer.go
index b0184d7dc7..ed7c98556d 100644
--- a/arbos/programs/data_pricer.go
+++ b/arbos/programs/data_pricer.go
@@ -27,12 +27,14 @@ const (
 	inertiaOffset
 )
 
+const ArbitrumStartTime = 1421388000 // the day it all began
+
 const initialDemand = 0                                     // no demand
 const InitialHourlyBytes = 1 * (1 << 40) / (365 * 24)       // 1Tb total footprint
 const initialBytesPerSecond = InitialHourlyBytes / (60 * 60) // refill each second
-const initialLastUpdateTime = 1421388000 // the day it all began
-const initialMinPrice = 82928201         // 5Mb = $1
-const initialInertia = 21360419          // expensive at 1Tb
+const initialLastUpdateTime = ArbitrumStartTime
+const initialMinPrice = 82928201 // 5Mb = $1
+const initialInertia = 21360419  // expensive at 1Tb
 
 func initDataPricer(sto *storage.Storage) {
 	demand := sto.OpenStorageBackedUint32(demandOffset)
diff --git a/arbos/programs/native.go b/arbos/programs/native.go
index 7a6c16d866..f24dcac64d 100644
--- a/arbos/programs/native.go
+++ b/arbos/programs/native.go
@@ -172,6 +172,7 @@ func callProgram(
 	evmData *EvmData,
 	stylusParams *ProgParams,
 	memoryModel *MemoryModel,
+	arbos_tag uint32,
 ) ([]byte, error) {
 	db := interpreter.Evm().StateDB
 	debug := stylusParams.DebugMode
@@ -198,6 +199,7 @@ func callProgram(
 		cbool(debug),
 		output,
 		(*u64)(&scope.Contract.Gas),
+		u32(arbos_tag),
 	))
 
 	depth := interpreter.Depth()
@@ -228,8 +230,9 @@ func cacheProgram(db vm.StateDB, module common.Hash, program Program, params *St
 		if err != nil {
 			panic("unable to recreate wasm")
 		}
-		state.CacheWasmRust(asm, module, program.version, debug)
-		db.RecordCacheWasm(state.CacheWasm{ModuleHash: module, Version: program.version, Debug: debug})
+		tag := db.Database().WasmCacheTag()
+		state.CacheWasmRust(asm, module, program.version, tag, debug)
+		db.RecordCacheWasm(state.CacheWasm{ModuleHash: module, Version: program.version, Tag: tag, Debug: debug})
 	}
 }
 
@@ -237,22 +240,27 @@ func cacheProgram(db vm.StateDB, module common.Hash, program Program, params *St
 // For gas estimation and eth_call, we ignore permanent updates and rely on Rust's LRU.
 func evictProgram(db vm.StateDB, module common.Hash, version uint16, debug bool, runMode core.MessageRunMode, forever bool) {
 	if runMode == core.MessageCommitMode {
-		state.EvictWasmRust(module, version, debug)
+		tag := db.Database().WasmCacheTag()
+		state.EvictWasmRust(module, version, tag, debug)
 		if !forever {
-			db.RecordEvictWasm(state.EvictWasm{ModuleHash: module, Version: version, Debug: debug})
+			db.RecordEvictWasm(state.EvictWasm{ModuleHash: module, Version: version, Tag: tag, Debug: debug})
 		}
 	}
 }
 
 func init() {
-	state.CacheWasmRust = func(asm []byte, moduleHash common.Hash, version uint16, debug bool) {
-		C.stylus_cache_module(goSlice(asm), hashToBytes32(moduleHash), u16(version), cbool(debug))
+	state.CacheWasmRust = func(asm []byte, moduleHash common.Hash, version uint16, tag uint32, debug bool) {
+		C.stylus_cache_module(goSlice(asm), hashToBytes32(moduleHash), u16(version), u32(tag), cbool(debug))
 	}
-	state.EvictWasmRust = func(moduleHash common.Hash, version uint16, debug bool) {
-		C.stylus_evict_module(hashToBytes32(moduleHash), u16(version), cbool(debug))
+	state.EvictWasmRust = func(moduleHash common.Hash, version uint16, tag uint32, debug bool) {
+		C.stylus_evict_module(hashToBytes32(moduleHash), u16(version), u32(tag), cbool(debug))
 	}
 }
 
+func ResizeWasmLruCache(size uint32) {
+	C.stylus_cache_lru_resize(u32(size))
+}
+
 func (value bytes32) toHash() common.Hash {
 	hash := common.Hash{}
 	for index, b := range value.bytes {
diff --git a/arbos/programs/programs.go b/arbos/programs/programs.go
index d3113ae98d..97b5477afc 100644
--- a/arbos/programs/programs.go
+++ b/arbos/programs/programs.go
@@ -166,6 +166,7 @@ func (p Programs) CallProgram(
 	tracingInfo *util.TracingInfo,
 	calldata []byte,
 	reentrant bool,
+	runmode core.MessageRunMode,
 ) ([]byte, error) {
 	evm := interpreter.Evm()
 	contract := scope.Contract
@@ -237,7 +238,11 @@ func (p Programs) CallProgram(
 	if contract.CodeAddr != nil {
 		address = *contract.CodeAddr
 	}
-	return callProgram(address, moduleHash, localAsm, scope, interpreter, tracingInfo, calldata, evmData, goParams, model)
+	var arbos_tag uint32
+	if runmode == core.MessageCommitMode {
+		arbos_tag = statedb.Database().WasmCacheTag()
+	}
+	return callProgram(address, moduleHash, localAsm, scope, interpreter, tracingInfo, calldata, evmData, goParams, model, arbos_tag)
 }
 
 func getWasm(statedb vm.StateDB, program common.Address) ([]byte, error) {
@@ -527,12 +532,12 @@ func (status userStatus) toResult(data []byte, debug bool) ([]byte, string, erro
 
 // Hours since Arbitrum began, rounded down.
 func hoursSinceArbitrum(time uint64) uint24 {
-	return uint24((time - lastUpdateTimeOffset) / 3600)
+	return am.SaturatingUUCast[uint24]((am.SaturatingUSub(time, ArbitrumStartTime)) / 3600)
 }
 
 // Computes program age in seconds from the hours passed since Arbitrum began.
 func hoursToAge(time uint64, hours uint24) uint64 {
 	seconds := am.SaturatingUMul(uint64(hours), 3600)
-	activatedAt := am.SaturatingUAdd(lastUpdateTimeOffset, seconds)
+	activatedAt := am.SaturatingUAdd(ArbitrumStartTime, seconds)
 	return am.SaturatingUSub(time, activatedAt)
 }
diff --git a/arbos/programs/testconstants.go b/arbos/programs/testconstants.go
index 215b5fb8a7..1ab0e6e93b 100644
--- a/arbos/programs/testconstants.go
+++ b/arbos/programs/testconstants.go
@@ -1,6 +1,9 @@
 // Copyright 2024, Offchain Labs, Inc.
 // For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE
 
+//go:build !wasm
+// +build !wasm
+
 package programs
 
 // This file exists because cgo isn't allowed in tests
diff --git a/arbos/programs/wasm.go b/arbos/programs/wasm.go
index 95f30e83b6..4bc978a2b6 100644
--- a/arbos/programs/wasm.go
+++ b/arbos/programs/wasm.go
@@ -143,6 +143,7 @@ func callProgram(
 	evmData *EvmData,
 	params *ProgParams,
 	memoryModel *MemoryModel,
+	_arbos_tag uint32,
 ) ([]byte, error) {
 	reqHandler := newApiClosures(interpreter, tracingInfo, scope, memoryModel)
 	gasLeft, retData, err := CallProgramLoop(moduleHash, calldata, scope.Contract.Gas, evmData, params, reqHandler)
diff --git a/arbos/tx_processor.go b/arbos/tx_processor.go
index b5fb64f695..65762fd2d1 100644
--- a/arbos/tx_processor.go
+++ b/arbos/tx_processor.go
@@ -127,6 +127,7 @@ func (p *TxProcessor) ExecuteWASM(scope *vm.ScopeContext, input []byte, interpre
 		tracingInfo,
 		input,
 		reentrant,
+		p.RunMode(),
 	)
 }
 
diff --git a/broadcaster/broadcaster.go b/broadcaster/broadcaster.go
index ac5c6c39da..ba95f2d8af 100644
--- a/broadcaster/broadcaster.go
+++ b/broadcaster/broadcaster.go
@@ -22,11 +22,6 @@ import (
 	"github.com/offchainlabs/nitro/wsbroadcastserver"
 )
 
-type MessageWithMetadataAndBlockHash struct {
-	Message   arbostypes.MessageWithMetadata
-	BlockHash *common.Hash
-}
-
 type Broadcaster struct {
 	server  *wsbroadcastserver.WSBroadcastServer
 	backlog backlog.Backlog
@@ -98,7 +93,7 @@ func (b *Broadcaster) BroadcastSingleFeedMessage(bfm *m.BroadcastFeedMessage) {
 }
 
 func (b *Broadcaster) BroadcastMessages(
-	messagesWithBlockHash []MessageWithMetadataAndBlockHash,
+	messagesWithBlockHash []arbostypes.MessageWithMetadataAndBlockHash,
 	seq arbutil.MessageIndex,
 ) (err error) {
 	defer func() {
@@ -109,7 +104,7 @@ func (b *Broadcaster) BroadcastMessages(
 	}()
 	var feedMessages []*m.BroadcastFeedMessage
 	for i, msg := range messagesWithBlockHash {
-		bfm, err := b.NewBroadcastFeedMessage(msg.Message, seq+arbutil.MessageIndex(i), msg.BlockHash)
+		bfm, err := b.NewBroadcastFeedMessage(msg.MessageWithMeta, seq+arbutil.MessageIndex(i), msg.BlockHash)
 		if err != nil {
 			return err
 		}
diff --git a/cmd/conf/chain.go b/cmd/conf/chain.go
index 531945b4d6..ab9a713287 100644
--- a/cmd/conf/chain.go
+++ b/cmd/conf/chain.go
@@ -20,11 +20,12 @@ type ParentChainConfig struct {
 }
 
 var L1ConnectionConfigDefault = rpcclient.ClientConfig{
-	URL:            "",
-	Retries:        2,
-	Timeout:        time.Minute,
-	ConnectionWait: time.Minute,
-	ArgLogLimit:    2048,
+	URL:                       "",
+	Retries:                   2,
+	Timeout:                   time.Minute,
+	ConnectionWait:            time.Minute,
+	ArgLogLimit:               2048,
+	WebsocketMessageSizeLimit: 256 * 1024 * 1024,
 }
 
 var L1ConfigDefault = ParentChainConfig{
diff --git a/cmd/nitro/init.go b/cmd/nitro/init.go
index c52c87732c..0b36fcfdaf 100644
--- a/cmd/nitro/init.go
+++ b/cmd/nitro/init.go
@@ -186,7 +186,7 @@ func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeCo
 		if err != nil {
 			return nil, nil, err
 		}
-		chainDb := rawdb.WrapDatabaseWithWasm(chainData, wasmDb)
+		chainDb := rawdb.WrapDatabaseWithWasm(chainData, wasmDb, 1)
 		err = pruning.PruneChainDb(ctx, chainDb, stack, &config.Init, cacheConfig, l1Client, rollupAddrs, config.Node.ValidatorRequired())
 		if err != nil {
 			return chainDb, nil, fmt.Errorf("error pruning: %w", err)
@@ -243,7 +243,7 @@ func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeCo
 	if err != nil {
 		return nil, nil, err
 	}
-	chainDb := rawdb.WrapDatabaseWithWasm(chainData, wasmDb)
+	chainDb := rawdb.WrapDatabaseWithWasm(chainData, wasmDb, 1)
 
 	if config.Init.ImportFile != "" {
 		initDataReader, err = statetransfer.NewJsonInitDataReader(config.Init.ImportFile)
diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go
index 9280c3af02..815257cf7a 100644
--- a/cmd/nitro/nitro.go
+++ b/cmd/nitro/nitro.go
@@ -6,6 +6,7 @@ package main
 import (
 	"context"
 	"crypto/ecdsa"
+	"encoding/hex"
 	"errors"
 	"fmt"
 	"io"
@@ -452,7 +453,21 @@ func mainImpl() int {
 		if len(allowedWasmModuleRoots) > 0 {
 			moduleRootMatched := false
 			for _, root := range allowedWasmModuleRoots {
-				if common.HexToHash(root) == moduleRoot {
+				bytes, err := hex.DecodeString(root)
+				if err == nil {
+					if common.HexToHash(root) == common.BytesToHash(bytes) {
+						moduleRootMatched = true
+						break
+					}
+					continue
+				}
+				locator, locatorErr := server_common.NewMachineLocator(root)
+				if locatorErr != nil {
+					log.Warn("allowed-wasm-module-roots: value not a hex nor valid path:", "value", root, "locatorErr", locatorErr, "decodeErr", err)
+					continue
+				}
+				path := locator.GetMachinePath(moduleRoot)
+				if _, err := os.Stat(path); err == nil {
 					moduleRootMatched = true
 					break
 				}
diff --git a/cmd/staterecovery/staterecovery.go b/cmd/staterecovery/staterecovery.go
index 6390826a91..58ad06ad14 100644
--- a/cmd/staterecovery/staterecovery.go
+++ b/cmd/staterecovery/staterecovery.go
@@ -31,7 +31,7 @@ func RecreateMissingStates(chainDb ethdb.Database, bc *core.BlockChain, cacheCon
 		return fmt.Errorf("start block parent is missing, parent block number: %d", current-1)
 	}
 	hashConfig := *hashdb.Defaults
-	hashConfig.CleanCacheSize = cacheConfig.TrieCleanLimit
+	hashConfig.CleanCacheSize = cacheConfig.TrieCleanLimit * 1024 * 1024
 	trieConfig := &trie.Config{
 		Preimages: false,
 		HashDB:    &hashConfig,
diff --git a/execution/gethexec/blockchain.go b/execution/gethexec/blockchain.go
index 2a20c3da26..1d5060ca8a 100644
--- a/execution/gethexec/blockchain.go
+++ b/execution/gethexec/blockchain.go
@@ -37,6 +37,7 @@ type CachingConfig struct {
 	SnapshotRestoreGasLimit            uint64        `koanf:"snapshot-restore-gas-limit"`
 	MaxNumberOfBlocksToSkipStateSaving uint32        `koanf:"max-number-of-blocks-to-skip-state-saving"`
 	MaxAmountOfGasToSkipStateSaving    uint64        `koanf:"max-amount-of-gas-to-skip-state-saving"`
+	StylusLRUCache                     uint32        `koanf:"stylus-lru-cache"`
 }
 
 func CachingConfigAddOptions(prefix string, f *flag.FlagSet) {
@@ -51,6 +52,7 @@ func CachingConfigAddOptions(prefix string, f *flag.FlagSet) {
 	f.Uint64(prefix+".snapshot-restore-gas-limit", DefaultCachingConfig.SnapshotRestoreGasLimit, "maximum gas rolled back to recover snapshot")
 	f.Uint32(prefix+".max-number-of-blocks-to-skip-state-saving", DefaultCachingConfig.MaxNumberOfBlocksToSkipStateSaving, "maximum number of blocks to skip state saving to persistent storage (archive node only) -- warning: this option seems to cause issues")
 	f.Uint64(prefix+".max-amount-of-gas-to-skip-state-saving", DefaultCachingConfig.MaxAmountOfGasToSkipStateSaving, "maximum amount of gas in blocks to skip saving state to Persistent storage (archive node only) -- warning: this option seems to cause issues")
+	f.Uint32(prefix+".stylus-lru-cache", DefaultCachingConfig.StylusLRUCache, "initialized stylus programs to keep in LRU cache")
 }
 
 var DefaultCachingConfig = CachingConfig{
@@ -65,6 +67,22 @@ var DefaultCachingConfig = CachingConfig{
 	SnapshotRestoreGasLimit:            300_000_000_000,
 	MaxNumberOfBlocksToSkipStateSaving: 0,
 	MaxAmountOfGasToSkipStateSaving:    0,
+	StylusLRUCache:                     256,
+}
+
+var TestCachingConfig = CachingConfig{
+	Archive:                            false,
+	BlockCount:                         128,
+	BlockAge:                           30 * time.Minute,
+	TrieTimeLimit:                      time.Hour,
+	TrieDirtyCache:                     1024,
+	TrieCleanCache:                     600,
+	SnapshotCache:                      400,
+	DatabaseCache:                      2048,
+	SnapshotRestoreGasLimit:            300_000_000_000,
+	MaxNumberOfBlocksToSkipStateSaving: 0,
+	MaxAmountOfGasToSkipStateSaving:    0,
+	StylusLRUCache:                     0,
 }
 
 // TODO remove stack from parameters as it is no longer needed here
diff --git a/execution/gethexec/executionengine.go b/execution/gethexec/executionengine.go
index 38569f44ab..3ef894d402 100644
--- a/execution/gethexec/executionengine.go
+++ b/execution/gethexec/executionengine.go
@@ -1,6 +1,9 @@
 // Copyright 2022-2024, Offchain Labs, Inc.
 // For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE
 
+//go:build !wasm
+// +build !wasm
+
 package gethexec
 
 /*
@@ -28,6 +31,7 @@ import (
 	"github.com/offchainlabs/nitro/arbos/arbosState"
 	"github.com/offchainlabs/nitro/arbos/arbostypes"
 	"github.com/offchainlabs/nitro/arbos/l1pricing"
+	"github.com/offchainlabs/nitro/arbos/programs"
 	"github.com/offchainlabs/nitro/arbutil"
 	"github.com/offchainlabs/nitro/execution"
 	"github.com/offchainlabs/nitro/util/arbmath"
@@ -72,6 +76,12 @@ func NewExecutionEngine(bc *core.BlockChain) (*ExecutionEngine, error) {
 	}, nil
 }
 
+func (n *ExecutionEngine) Initialize(rustCacheSize uint32) {
+	if rustCacheSize != 0 {
+		programs.ResizeWasmLruCache(rustCacheSize)
+	}
+}
+
 func (s *ExecutionEngine) SetRecorder(recorder *BlockRecorder) {
 	if s.Started() {
 		panic("trying to set recorder after start")
@@ -116,7 +126,7 @@ func (s *ExecutionEngine) GetBatchFetcher() execution.BatchFetcher {
 	return s.consensus
 }
 
-func (s *ExecutionEngine) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata, oldMessages []*arbostypes.MessageWithMetadata) ([]*execution.MessageResult, error) {
+func (s *ExecutionEngine) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockHash, oldMessages []*arbostypes.MessageWithMetadata) ([]*execution.MessageResult, error) {
 	if count == 0 {
 		return nil, errors.New("cannot reorg out genesis")
 	}
@@ -137,8 +147,9 @@ func (s *ExecutionEngine) Reorg(count arbutil.MessageIndex, newMessages []arbost
 		return nil, nil
 	}
 
+	tag := s.bc.StateCache().WasmCacheTag()
 	// reorg Rust-side VM state
-	C.stylus_reorg_vm(C.uint64_t(blockNum))
+	C.stylus_reorg_vm(C.uint64_t(blockNum), C.uint32_t(tag))
 
 	err := s.bc.ReorgToOldBlock(targetBlock)
 	if err != nil {
@@ -149,9 +160,9 @@ func (s *ExecutionEngine) Reorg(count arbutil.MessageIndex, newMessages []arbost
 	for i := range newMessages {
 		var msgForPrefetch *arbostypes.MessageWithMetadata
 		if i < len(newMessages)-1 {
-			msgForPrefetch = &newMessages[i]
+			msgForPrefetch = &newMessages[i].MessageWithMeta
 		}
-		msgResult, err := s.digestMessageWithBlockMutex(count+arbutil.MessageIndex(i), &newMessages[i], msgForPrefetch)
+		msgResult, err := s.digestMessageWithBlockMutex(count+arbutil.MessageIndex(i), &newMessages[i].MessageWithMeta, msgForPrefetch)
 		if err != nil {
 			return nil, err
 		}
@@ -197,7 +208,7 @@ func (s *ExecutionEngine) NextDelayedMessageNumber() (uint64, error) {
 	return currentHeader.Nonce.Uint64(), nil
 }
 
-func messageFromTxes(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, txErrors []error) (*arbostypes.L1IncomingMessage, error) {
+func MessageFromTxes(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, txErrors []error) (*arbostypes.L1IncomingMessage, error) {
 	var l2Message []byte
 	if len(txes) == 1 && txErrors[0] == nil {
 		txBytes, err := txes[0].MarshalBinary()
@@ -368,7 +379,7 @@ func (s *ExecutionEngine) sequenceTransactionsWithBlockMutex(header *arbostypes.
 		return nil, nil
 	}
 
-	msg, err := messageFromTxes(header, txes, hooks.TxErrors)
+	msg, err := MessageFromTxes(header, txes, hooks.TxErrors)
 	if err != nil {
 		return nil, err
 	}
diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go
index ae76b88530..5903c0bb3b 100644
--- a/execution/gethexec/node.go
+++ b/execution/gethexec/node.go
@@ -107,6 +107,7 @@ var ConfigDefault = Config{
 
 func ConfigDefaultNonSequencerTest() *Config {
 	config := ConfigDefault
+	config.Caching = TestCachingConfig
 	config.ParentChainReader = headerreader.TestConfig
 	config.Sequencer.Enable = false
 	config.Forwarder = DefaultTestForwarderConfig
@@ -119,6 +120,7 @@ func ConfigDefaultNonSequencerTest() *Config {
 
 func ConfigDefaultTest() *Config {
 	config := ConfigDefault
+	config.Caching = TestCachingConfig
 	config.Sequencer = TestSequencerConfig
 	config.ParentChainReader = headerreader.TestConfig
 	config.ForwardingTarget = "null"
@@ -280,6 +282,7 @@ func (n *ExecutionNode) GetL1GasPriceEstimate() (uint64, error) {
 }
 
 func (n *ExecutionNode) Initialize(ctx context.Context) error {
+	n.ExecEngine.Initialize(n.ConfigFetcher().Caching.StylusLRUCache)
 	n.ArbInterface.Initialize(n)
 	err := n.Backend.Start()
 	if err != nil {
@@ -346,7 +349,7 @@ func (n *ExecutionNode) StopAndWait() {
 func (n *ExecutionNode) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) (*execution.MessageResult, error) {
 	return n.ExecEngine.DigestMessage(num, msg, msgForPrefetch)
 }
-func (n *ExecutionNode) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata, oldMessages []*arbostypes.MessageWithMetadata) ([]*execution.MessageResult, error) {
+func (n *ExecutionNode) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockHash, oldMessages []*arbostypes.MessageWithMetadata) ([]*execution.MessageResult, error) {
 	return n.ExecEngine.Reorg(count, newMessages, oldMessages)
 }
 func (n *ExecutionNode) HeadMessageNumber() (arbutil.MessageIndex, error) {
diff --git a/execution/interface.go b/execution/interface.go
index d2a5b58fe5..66aefe9a5e 100644
--- a/execution/interface.go
+++ b/execution/interface.go
@@ -31,7 +31,7 @@ var ErrSequencerInsertLockTaken = errors.New("insert lock taken")
 // always needed
 type ExecutionClient interface {
 	DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) (*MessageResult, error)
-	Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata, oldMessages []*arbostypes.MessageWithMetadata) ([]*MessageResult, error)
+	Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockHash, oldMessages []*arbostypes.MessageWithMetadata) ([]*MessageResult, error)
 	HeadMessageNumber() (arbutil.MessageIndex, error)
 	HeadMessageNumberSync(t *testing.T) (arbutil.MessageIndex, error)
 	ResultAtPos(pos arbutil.MessageIndex) (*MessageResult, error)
diff --git a/go-ethereum b/go-ethereum
index 8048ac4bed..b8d4ced531 160000
--- a/go-ethereum
+++ b/go-ethereum
@@ -1 +1 @@
-Subproject commit 8048ac4bed2eda18284e3c022ea5ee4cce771134
+Subproject commit b8d4ced5316c987d095ef1fc3ecb5e8ae0df094d
diff --git a/scripts/split-val-entry.sh b/scripts/split-val-entry.sh
index 6f56a8ec46..8e1be0f6cc 100755
--- a/scripts/split-val-entry.sh
+++ b/scripts/split-val-entry.sh
@@ -16,4 +16,4 @@ for port in 52000 52001; do
     done
 done
 echo launching nitro-node
-/usr/local/bin/nitro --node.block-validator.validation-server-configs-list='[{"jwtsecret":"/tmp/nitro-val.jwt","url":"http://127.0.0.10:52000"}, {"jwtsecret":"/tmp/nitro-val.jwt","url":"http://127.0.0.10:52001"}]' "$@"
+/usr/local/bin/nitro --validation.wasm.allowed-wasm-module-roots /home/user/nitro-legacy/machines,/home/user/target/machines --node.block-validator.validation-server-configs-list='[{"jwtsecret":"/tmp/nitro-val.jwt","url":"http://127.0.0.10:52000"}, {"jwtsecret":"/tmp/nitro-val.jwt","url":"http://127.0.0.10:52001"}]' "$@"
diff --git a/staker/block_validator.go b/staker/block_validator.go
index e494b3da10..027ee78248 100644
--- a/staker/block_validator.go
+++ b/staker/block_validator.go
@@ -1097,13 +1097,18 @@ func (v *BlockValidator) Initialize(ctx context.Context) error {
 	for _, root := range moduleRoots {
 		if v.redisValidator != nil && validator.SpawnerSupportsModule(v.redisValidator, root) {
 			v.chosenValidator[root] = v.redisValidator
+			log.Info("validator chosen", "WasmModuleRoot", root, "chosen", "redis")
 		} else {
 			for _, spawner := range v.execSpawners {
 				if validator.SpawnerSupportsModule(spawner, root) {
 					v.chosenValidator[root] = spawner
+					log.Info("validator chosen", "WasmModuleRoot", root, "chosen", spawner.Name())
 					break
 				}
 			}
+			if v.chosenValidator[root] == nil {
+				log.Error("validator not found", "WasmModuleRoot", root)
+			}
 		}
 	}
 	return nil
diff --git a/system_tests/common_test.go b/system_tests/common_test.go
index f6bfde2108..edc16ffec4 100644
--- a/system_tests/common_test.go
+++ b/system_tests/common_test.go
@@ -777,7 +777,7 @@ func createL2BlockChainWithStackConfig(
 	Require(t, err)
 	wasmData, err := stack.OpenDatabase("wasm", 0, 0, "wasm/", false)
 	Require(t, err)
-	chainDb := rawdb.WrapDatabaseWithWasm(chainData, wasmData)
+	chainDb := rawdb.WrapDatabaseWithWasm(chainData, wasmData, 0)
 
 	arbDb, err := stack.OpenDatabase("arbitrumdata", 0, 0, "arbitrumdata/", false)
 	Require(t, err)
@@ -984,7 +984,7 @@ func Create2ndNodeWithConfig(
 	Require(t, err)
 	wasmData, err := l2stack.OpenDatabase("wasm", 0, 0, "wasm/", false)
 	Require(t, err)
-	l2chainDb := rawdb.WrapDatabaseWithWasm(l2chainData, wasmData)
+	l2chainDb := rawdb.WrapDatabaseWithWasm(l2chainData, wasmData, 0)
 
 	l2arbDb, err := l2stack.OpenDatabase("arbitrumdata", 0, 0, "arbitrumdata/", false)
 	Require(t, err)
diff --git a/system_tests/recreatestate_rpc_test.go b/system_tests/recreatestate_rpc_test.go
index 777ed17961..bf321808de 100644
--- a/system_tests/recreatestate_rpc_test.go
+++ b/system_tests/recreatestate_rpc_test.go
@@ -449,7 +449,7 @@ func testSkippingSavingStateAndRecreatingAfterRestart(t *testing.T, cacheConfig
 }
 
 func TestSkippingSavingStateAndRecreatingAfterRestart(t *testing.T) {
-	cacheConfig := gethexec.DefaultCachingConfig
+	cacheConfig := gethexec.TestCachingConfig
 	cacheConfig.Archive = true
 	cacheConfig.SnapshotCache = 0 // disable snapshots
 	cacheConfig.BlockAge = 0      // use only Caching.BlockCount to keep only last N blocks in dirties cache, no matter how new they are
diff --git a/system_tests/seq_coordinator_test.go b/system_tests/seq_coordinator_test.go
index 886a0528c7..43d55f40c9 100644
--- a/system_tests/seq_coordinator_test.go
+++ b/system_tests/seq_coordinator_test.go
@@ -8,12 +8,14 @@ import (
 	"errors"
 	"fmt"
 	"math/big"
+	"net"
 	"testing"
 	"time"
 
 	"github.com/go-redis/redis/v8"
 
 	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/log"
 
 	"github.com/offchainlabs/nitro/arbnode"
 	"github.com/offchainlabs/nitro/arbos/arbostypes"
@@ -21,6 +23,7 @@ import (
 	"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/execution/gethexec" "github.com/offchainlabs/nitro/util/redisutil" + "github.com/offchainlabs/nitro/util/testhelpers" ) func initRedisForTest(t *testing.T, ctx context.Context, redisUrl string, nodeNames []string) { @@ -270,6 +273,8 @@ func TestRedisSeqCoordinatorPriorities(t *testing.T) { } func testCoordinatorMessageSync(t *testing.T, successCase bool) { + logHandler := testhelpers.InitTestLog(t, log.LvlTrace) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -304,16 +309,25 @@ func testCoordinatorMessageSync(t *testing.T, successCase bool) { nodeConfigDup := *builder.nodeConfig builder.nodeConfig = &nodeConfigDup - + builder.nodeConfig.Feed.Output = *newBroadcasterConfigTest() builder.nodeConfig.SeqCoordinator.MyUrl = nodeNames[1] if !successCase { builder.nodeConfig.SeqCoordinator.Signer.ECDSA.AcceptSequencer = false builder.nodeConfig.SeqCoordinator.Signer.ECDSA.AllowedAddresses = []string{builder.L2Info.GetAddress("User2").Hex()} } - testClientB, cleanupB := builder.Build2ndNode(t, &SecondNodeParams{nodeConfig: builder.nodeConfig}) defer cleanupB() + // Build nodeBOutputFeedReader. + // nodeB doesn't sequence transactions, but adds messages related to them to its output feed. + // nodeBOutputFeedReader reads those messages from this feed and processes them. + // nodeBOutputFeedReader doesn't read messages from L1 since none of the nodes posts to L1. + nodeBPort := testClientB.ConsensusNode.BroadcastServer.ListenerAddr().(*net.TCPAddr).Port + nodeConfigNodeBOutputFeedReader := arbnode.ConfigDefaultL1NonSequencerTest() + nodeConfigNodeBOutputFeedReader.Feed.Input = *newBroadcastClientConfigTest(nodeBPort) + testClientNodeBOutputFeedReader, cleanupNodeBOutputFeedReader := builder.Build2ndNode(t, &SecondNodeParams{nodeConfig: nodeConfigNodeBOutputFeedReader}) + defer cleanupNodeBOutputFeedReader() + tx := builder.L2Info.PrepareTx("Owner", "User2", builder.L2Info.TransferGas, big.NewInt(1e12), nil) err = builder.L2.Client.SendTransaction(ctx, tx) @@ -330,6 +344,19 @@ func testCoordinatorMessageSync(t *testing.T, successCase bool) { if l2balance.Cmp(big.NewInt(1e12)) != 0 { t.Fatal("Unexpected balance:", l2balance) } + + // check that nodeBOutputFeedReader also processed the transaction + _, err = WaitForTx(ctx, testClientNodeBOutputFeedReader.Client, tx.Hash(), time.Second*5) + Require(t, err) + l2balance, err = testClientNodeBOutputFeedReader.Client.BalanceAt(ctx, builder.L2Info.GetAddress("User2"), nil) + Require(t, err) + if l2balance.Cmp(big.NewInt(1e12)) != 0 { + t.Fatal("Unexpected balance:", l2balance) + } + + if logHandler.WasLogged(arbnode.BlockHashMismatchLogMsg) { + t.Fatal("BlockHashMismatchLogMsg was logged unexpectedly") + } } else { _, err = WaitForTx(ctx, testClientB.Client, tx.Hash(), time.Second) if err == nil { diff --git a/system_tests/seqfeed_test.go b/system_tests/seqfeed_test.go index 749a91e3b1..ab30598b60 100644 --- a/system_tests/seqfeed_test.go +++ b/system_tests/seqfeed_test.go @@ -11,10 +11,19 @@ import ( "testing" "time" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" "github.com/offchainlabs/nitro/arbnode" + "github.com/offchainlabs/nitro/arbos/arbostypes" + "github.com/offchainlabs/nitro/arbos/l1pricing" "github.com/offchainlabs/nitro/broadcastclient" + "github.com/offchainlabs/nitro/broadcaster/backlog" + "github.com/offchainlabs/nitro/broadcaster/message" + "github.com/offchainlabs/nitro/execution/gethexec" 
"github.com/offchainlabs/nitro/relay" "github.com/offchainlabs/nitro/util/signature" + "github.com/offchainlabs/nitro/util/testhelpers" "github.com/offchainlabs/nitro/wsbroadcastserver" ) @@ -38,7 +47,8 @@ func newBroadcastClientConfigTest(port int) *broadcastclient.Config { } func TestSequencerFeed(t *testing.T) { - t.Parallel() + logHandler := testhelpers.InitTestLog(t, log.LvlTrace) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -73,6 +83,10 @@ func TestSequencerFeed(t *testing.T) { if l2balance.Cmp(big.NewInt(1e12)) != 0 { t.Fatal("Unexpected balance:", l2balance) } + + if logHandler.WasLogged(arbnode.BlockHashMismatchLogMsg) { + t.Fatal("BlockHashMismatchLogMsg was logged unexpectedly") + } } func TestRelayedSequencerFeed(t *testing.T) { @@ -250,3 +264,101 @@ func TestLyingSequencer(t *testing.T) { func TestLyingSequencerLocalDAS(t *testing.T) { testLyingSequencer(t, "files") } + +func testBlockHashComparison(t *testing.T, blockHash *common.Hash, mustMismatch bool) { + logHandler := testhelpers.InitTestLog(t, log.LvlTrace) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + backlogConfiFetcher := func() *backlog.Config { + return &backlog.DefaultTestConfig + } + bklg := backlog.NewBacklog(backlogConfiFetcher) + + wsBroadcastServer := wsbroadcastserver.NewWSBroadcastServer( + newBroadcasterConfigTest, + bklg, + 412346, + nil, + ) + err := wsBroadcastServer.Initialize() + if err != nil { + t.Fatal("error initializing wsBroadcastServer:", err) + } + err = wsBroadcastServer.Start(ctx) + if err != nil { + t.Fatal("error starting wsBroadcastServer:", err) + } + defer wsBroadcastServer.StopAndWait() + + port := wsBroadcastServer.ListenerAddr().(*net.TCPAddr).Port + + builder := NewNodeBuilder(ctx).DefaultConfig(t, true) + builder.nodeConfig.Feed.Input = *newBroadcastClientConfigTest(port) + cleanup := builder.Build(t) + defer cleanup() + testClient := builder.L2 + + userAccount := "User2" + builder.L2Info.GenerateAccount(userAccount) + tx := builder.L2Info.PrepareTx("Owner", userAccount, builder.L2Info.TransferGas, big.NewInt(1e12), nil) + l1IncomingMsgHeader := arbostypes.L1IncomingMessageHeader{ + Kind: arbostypes.L1MessageType_L2Message, + Poster: l1pricing.BatchPosterAddress, + BlockNumber: 29, + Timestamp: 1715295980, + RequestId: nil, + L1BaseFee: nil, + } + l1IncomingMsg, err := gethexec.MessageFromTxes( + &l1IncomingMsgHeader, + types.Transactions{tx}, + []error{nil}, + ) + Require(t, err) + + broadcastMessage := message.BroadcastMessage{ + Version: 1, + Messages: []*message.BroadcastFeedMessage{ + { + SequenceNumber: 1, + Message: arbostypes.MessageWithMetadata{ + Message: l1IncomingMsg, + DelayedMessagesRead: 1, + }, + BlockHash: blockHash, + }, + }, + } + wsBroadcastServer.Broadcast(&broadcastMessage) + + // By now, even though block hash mismatch, the transaction should still be processed + _, err = WaitForTx(ctx, testClient.Client, tx.Hash(), time.Second*15) + if err != nil { + t.Fatal("error waiting for tx:", err) + } + l2balance, err := testClient.Client.BalanceAt(ctx, builder.L2Info.GetAddress(userAccount), nil) + if err != nil { + t.Fatal("error getting balance:", err) + } + if l2balance.Cmp(big.NewInt(1e12)) != 0 { + t.Fatal("Unexpected balance:", l2balance) + } + + mismatched := logHandler.WasLogged(arbnode.BlockHashMismatchLogMsg) + if mustMismatch && !mismatched { + t.Fatal("Failed to log BlockHashMismatchLogMsg") + } else if !mustMismatch && mismatched { + t.Fatal("BlockHashMismatchLogMsg was logged 
unexpectedly")
+	}
+}
+
+func TestBlockHashFeedMismatch(t *testing.T) {
+	blockHash := common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
+	testBlockHashComparison(t, &blockHash, true)
+}
+
+func TestBlockHashFeedNil(t *testing.T) {
+	testBlockHashComparison(t, nil, false)
+}
diff --git a/system_tests/staterecovery_test.go b/system_tests/staterecovery_test.go
index 632e748da8..02c2623cfa 100644
--- a/system_tests/staterecovery_test.go
+++ b/system_tests/staterecovery_test.go
@@ -52,7 +52,7 @@ func TestRectreateMissingStates(t *testing.T) {
 	chainDb, err := stack.OpenDatabase("l2chaindata", 0, 0, "l2chaindata/", false)
 	Require(t, err)
 	defer chainDb.Close()
-	cacheConfig := gethexec.DefaultCacheConfigFor(stack, &gethexec.DefaultCachingConfig)
+	cacheConfig := gethexec.DefaultCacheConfigFor(stack, &gethexec.TestCachingConfig)
 	bc, err := gethexec.GetBlockChain(chainDb, cacheConfig, builder.chainConfig, builder.execConfig.TxLookupLimit)
 	Require(t, err)
 	err = staterecovery.RecreateMissingStates(chainDb, bc, cacheConfig, 1)
diff --git a/util/rpcclient/rpcclient.go b/util/rpcclient/rpcclient.go
index 02b41cf15d..56aebef396 100644
--- a/util/rpcclient/rpcclient.go
+++ b/util/rpcclient/rpcclient.go
@@ -21,14 +21,15 @@ import (
 )
 
 type ClientConfig struct {
-	URL            string        `json:"url,omitempty" koanf:"url"`
-	JWTSecret      string        `json:"jwtsecret,omitempty" koanf:"jwtsecret"`
-	Timeout        time.Duration `json:"timeout,omitempty" koanf:"timeout" reload:"hot"`
-	Retries        uint          `json:"retries,omitempty" koanf:"retries" reload:"hot"`
-	ConnectionWait time.Duration `json:"connection-wait,omitempty" koanf:"connection-wait"`
-	ArgLogLimit    uint          `json:"arg-log-limit,omitempty" koanf:"arg-log-limit" reload:"hot"`
-	RetryErrors    string        `json:"retry-errors,omitempty" koanf:"retry-errors" reload:"hot"`
-	RetryDelay     time.Duration `json:"retry-delay,omitempty" koanf:"retry-delay"`
+	URL                       string        `json:"url,omitempty" koanf:"url"`
+	JWTSecret                 string        `json:"jwtsecret,omitempty" koanf:"jwtsecret"`
+	Timeout                   time.Duration `json:"timeout,omitempty" koanf:"timeout" reload:"hot"`
+	Retries                   uint          `json:"retries,omitempty" koanf:"retries" reload:"hot"`
+	ConnectionWait            time.Duration `json:"connection-wait,omitempty" koanf:"connection-wait"`
+	ArgLogLimit               uint          `json:"arg-log-limit,omitempty" koanf:"arg-log-limit" reload:"hot"`
+	RetryErrors               string        `json:"retry-errors,omitempty" koanf:"retry-errors" reload:"hot"`
+	RetryDelay                time.Duration `json:"retry-delay,omitempty" koanf:"retry-delay"`
+	WebsocketMessageSizeLimit int64         `json:"websocket-message-size-limit,omitempty" koanf:"websocket-message-size-limit"`
 
 	retryErrors *regexp.Regexp
 }
@@ -46,16 +47,18 @@ func (c *ClientConfig) Validate() error {
 type ClientConfigFetcher func() *ClientConfig
 
 var TestClientConfig = ClientConfig{
-	URL:       "self",
-	JWTSecret: "",
+	URL:                       "self",
+	JWTSecret:                 "",
+	WebsocketMessageSizeLimit: 256 * 1024 * 1024,
 }
 
 var DefaultClientConfig = ClientConfig{
-	URL:         "self-auth",
-	JWTSecret:   "",
-	Retries:     3,
-	RetryErrors: "websocket: close.*|dial tcp .*|.*i/o timeout|.*connection reset by peer|.*connection refused",
-	ArgLogLimit: 2048,
+	URL:                       "self-auth",
+	JWTSecret:                 "",
+	Retries:                   3,
+	RetryErrors:               "websocket: close.*|dial tcp .*|.*i/o timeout|.*connection reset by peer|.*connection refused",
+	ArgLogLimit:               2048,
+	WebsocketMessageSizeLimit: 256 * 1024 * 1024,
 }
 
 func RPCClientAddOptions(prefix string, f *flag.FlagSet, defaultConfig *ClientConfig) {
@@ -67,6 +70,7 @@ func RPCClientAddOptions(prefix string, f *flag.FlagSet, defaultConfig *ClientConfig) {
 	f.Uint(prefix+".retries", defaultConfig.Retries, "number of retries in case of failure(0 mean one attempt)")
 	f.String(prefix+".retry-errors", defaultConfig.RetryErrors, "Errors matching this regular expression are automatically retried")
 	f.Duration(prefix+".retry-delay", defaultConfig.RetryDelay, "delay between retries")
+	f.Int64(prefix+".websocket-message-size-limit", defaultConfig.WebsocketMessageSizeLimit, "websocket message size limit used by the RPC client. 0 means no limit")
 }
 
 type RpcClient struct {
@@ -256,9 +260,9 @@ func (c *RpcClient) Start(ctx_in context.Context) error {
 	var err error
 	var client *rpc.Client
 	if jwt == nil {
-		client, err = rpc.DialContext(ctx, url)
+		client, err = rpc.DialOptions(ctx, url, rpc.WithWebsocketMessageSizeLimit(c.config().WebsocketMessageSizeLimit))
 	} else {
-		client, err = rpc.DialOptions(ctx, url, rpc.WithHTTPAuth(node.NewJWTAuth([32]byte(*jwt))))
+		client, err = rpc.DialOptions(ctx, url, rpc.WithHTTPAuth(node.NewJWTAuth([32]byte(*jwt))), rpc.WithWebsocketMessageSizeLimit(c.config().WebsocketMessageSizeLimit))
 	}
 	cancelCtx()
 	if err == nil {
diff --git a/validator/valnode/valnode.go b/validator/valnode/valnode.go
index 93a5b37238..972e11189d 100644
--- a/validator/valnode/valnode.go
+++ b/validator/valnode/valnode.go
@@ -25,7 +25,7 @@ type WasmConfig struct {
 func WasmConfigAddOptions(prefix string, f *pflag.FlagSet) {
 	f.String(prefix+".root-path", DefaultWasmConfig.RootPath, "path to machine folders, each containing wasm files (machine.wavm.br, replay.wasm)")
 	f.Bool(prefix+".enable-wasmroots-check", DefaultWasmConfig.EnableWasmrootsCheck, "enable check for compatibility of on-chain WASM module root with node")
-	f.StringSlice(prefix+".allowed-wasm-module-roots", DefaultWasmConfig.AllowedWasmModuleRoots, "list of WASM module roots to check if the on-chain WASM module root belongs to on node startup")
+	f.StringSlice(prefix+".allowed-wasm-module-roots", DefaultWasmConfig.AllowedWasmModuleRoots, "list of WASM module roots or machine base paths to match against on-chain WasmModuleRoot")
 }
 
 var DefaultWasmConfig = WasmConfig{