Skip to content

Commit

Permalink
Merge branch 'redis-stream' of github.com:OffchainLabs/nitro into red…
Browse files Browse the repository at this point in the history
…is-stream
  • Loading branch information
anodar committed Apr 5, 2024
2 parents 8da1e86 + d0e28b7 commit 5c52884
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 2 deletions.
5 changes: 3 additions & 2 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1066,8 +1066,9 @@ func (s *TransactionStreamer) ResultAtCount(count arbutil.MessageIndex) (*execut
return s.exec.ResultAtPos(count - 1)
}

// exposed for testing
// return value: true if should be called again immediately
func (s *TransactionStreamer) executeNextMsg(ctx context.Context, exec execution.ExecutionSequencer) bool {
func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context, exec execution.ExecutionSequencer) bool {
if ctx.Err() != nil {
return false
}
Expand Down Expand Up @@ -1117,7 +1118,7 @@ func (s *TransactionStreamer) executeNextMsg(ctx context.Context, exec execution
}

func (s *TransactionStreamer) executeMessages(ctx context.Context, ignored struct{}) time.Duration {
if s.executeNextMsg(ctx, s.exec) {
if s.ExecuteNextMsg(ctx, s.exec) {
return 0
}
return s.config().ExecuteMessageLoopDelay
Expand Down
81 changes: 81 additions & 0 deletions system_tests/eth_sync_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package arbtest

import (
"context"
"math/big"
"testing"
"time"

"github.com/ethereum/go-ethereum/core/types"
)

func TestEthSyncing(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

builder := NewNodeBuilder(ctx).DefaultConfig(t, true)
builder.L2Info = nil
cleanup := builder.Build(t)
defer cleanup()

testClientB, cleanupB := builder.Build2ndNode(t, &SecondNodeParams{})
defer cleanupB()

// stop txstreamer so it won't feed execution messages
testClientB.ConsensusNode.TxStreamer.StopAndWait()

countBefore, err := testClientB.ConsensusNode.TxStreamer.GetMessageCount()
Require(t, err)

builder.L2Info.GenerateAccount("User2")

tx := builder.L2Info.PrepareTx("Owner", "User2", builder.L2Info.TransferGas, big.NewInt(1e12), nil)

err = builder.L2.Client.SendTransaction(ctx, tx)
Require(t, err)

_, err = builder.L2.EnsureTxSucceeded(tx)
Require(t, err)

// give the inbox reader a bit of time to pick up the delayed message
time.Sleep(time.Millisecond * 100)

// sending l1 messages creates l1 blocks.. make enough to get that delayed inbox message in
for i := 0; i < 30; i++ {
builder.L1.SendWaitTestTransactions(t, []*types.Transaction{
builder.L1Info.PrepareTx("Faucet", "User", 30000, big.NewInt(1e12), nil),
})
}

attempt := 0
for {
if attempt > 30 {
Fatal(t, "2nd node didn't get tx on time")
}
Require(t, ctx.Err())
countAfter, err := testClientB.ConsensusNode.TxStreamer.GetMessageCount()
Require(t, err)
if countAfter > countBefore {
break
}
select {
case <-time.After(time.Millisecond * 100):
case <-ctx.Done():
}
attempt++
}

progress, err := testClientB.Client.SyncProgress(ctx)
Require(t, err)
if progress == nil {
Fatal(t, "eth_syncing returned nil but shouldn't have")
}
for testClientB.ConsensusNode.TxStreamer.ExecuteNextMsg(ctx, testClientB.ExecNode) {
}
progress, err = testClientB.Client.SyncProgress(ctx)
Require(t, err)
if progress != nil {
Fatal(t, "eth_syncing did not return nil but should have")
}
}

0 comments on commit 5c52884

Please sign in to comment.