Skip to content

Commit

Permalink
Implement system tests
Browse files Browse the repository at this point in the history
  • Loading branch information
anodar committed Apr 19, 2024
1 parent 51d4666 commit 8496679
Show file tree
Hide file tree
Showing 12 changed files with 120 additions and 46 deletions.
2 changes: 1 addition & 1 deletion arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ func createNodeImpl(
txStreamer.SetInboxReaders(inboxReader, delayedBridge)

var statelessBlockValidator *staker.StatelessBlockValidator
if config.BlockValidator.ValidationServerConfigs[0].URL != "" {
if config.BlockValidator.RedisValidationClientConfig.Enabled() || config.BlockValidator.ValidationServerConfigs[0].URL != "" {
statelessBlockValidator, err = staker.NewStatelessBlockValidator(
inboxReader,
inboxTracker,
Expand Down
3 changes: 1 addition & 2 deletions pubsub/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ func (c *Consumer[Request, Response]) Consume(ctx context.Context) (*Message[Req
if len(res) != 1 || len(res[0].Messages) != 1 {
return nil, fmt.Errorf("redis returned entries: %+v, for querying single message", res)
}
log.Debug(fmt.Sprintf("Consumer: %s consuming message: %s", c.id, res[0].Messages[0].ID))
var (
value = res[0].Messages[0].Values[messageKey]
data, ok = (value).(string)
Expand All @@ -141,7 +140,7 @@ func (c *Consumer[Request, Response]) Consume(ctx context.Context) (*Message[Req
if err := json.Unmarshal([]byte(data), &req); err != nil {
return nil, fmt.Errorf("unmarshaling value: %v, error: %w", value, err)
}

log.Debug("Redis stream consuming", "consumer_id", c.id, "message_id", res[0].Messages[0].ID)
return &Message[Request]{
ID: res[0].Messages[0].ID,
Value: req,
Expand Down
1 change: 1 addition & 0 deletions pubsub/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func (p *Producer[Request, Response]) reproduce(ctx context.Context, value Reque
}

func (p *Producer[Request, Response]) Produce(ctx context.Context, value Request) (*containers.Promise[Response], error) {
log.Debug("Redis stream producing", "value", value)
p.once.Do(func() {
p.StopWaiter.CallIteratively(p.checkAndReproduce)
p.StopWaiter.CallIteratively(p.checkResponses)
Expand Down
8 changes: 6 additions & 2 deletions staker/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,19 @@ func (c *BlockValidatorConfig) Validate() error {
}
c.memoryFreeLimit = limit
}
streamsEnabled := c.RedisValidationClientConfig.Enabled()
if c.ValidationServerConfigs == nil {
if c.ValidationServerConfigsList == "default" {
c.ValidationServerConfigs = []rpcclient.ClientConfig{c.ValidationServer}
} else {
var validationServersConfigs []rpcclient.ClientConfig
if err := json.Unmarshal([]byte(c.ValidationServerConfigsList), &validationServersConfigs); err != nil {
if err := json.Unmarshal([]byte(c.ValidationServerConfigsList), &validationServersConfigs); err != nil && !streamsEnabled {
return fmt.Errorf("failed to parse block-validator validation-server-configs-list string: %w", err)
}
c.ValidationServerConfigs = validationServersConfigs
}
}
if len(c.ValidationServerConfigs) == 0 {
if len(c.ValidationServerConfigs) == 0 && !streamsEnabled {
return fmt.Errorf("block-validator validation-server-configs is empty, need at least one validation server config")
}
for _, serverConfig := range c.ValidationServerConfigs {
Expand Down Expand Up @@ -1032,6 +1033,9 @@ func (v *BlockValidator) Reorg(ctx context.Context, count arbutil.MessageIndex)
// Initialize must be called after SetCurrentWasmModuleRoot sets the current one
func (v *BlockValidator) Initialize(ctx context.Context) error {
config := v.config()
if config.RedisValidationClientConfig.Enabled() && v.execSpawner == nil {
return nil
}
currentModuleRoot := config.CurrentModuleRoot
switch currentModuleRoot {
case "latest":
Expand Down
34 changes: 20 additions & 14 deletions staker/stateless_block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,24 +194,20 @@ func NewStatelessBlockValidator(
config func() *BlockValidatorConfig,
stack *node.Node,
) (*StatelessBlockValidator, error) {

validationSpawners := make([]validator.ValidationSpawner, len(config().ValidationServerConfigs))
for i, serverConfig := range config().ValidationServerConfigs {
valConfFetcher := func() *rpcclient.ClientConfig { return &serverConfig }
validationSpawners[i] = server_api.NewValidationClient(valConfFetcher, stack)
}
var validationSpawners []validator.ValidationSpawner
redisValClient, err := server_api.NewRedisValidationClient(&config().RedisValidationClientConfig)
if err != nil {
log.Error("Creating redis validation client", "error", err)
} else {
validationSpawners = append(validationSpawners, redisValClient)
}
for _, serverConfig := range config().ValidationServerConfigs {
valConfFetcher := func() *rpcclient.ClientConfig { return &serverConfig }
validationSpawners = append(validationSpawners, server_api.NewValidationClient(valConfFetcher, stack))
}

valConfFetcher := func() *rpcclient.ClientConfig { return &config().ValidationServerConfigs[0] }
execClient := server_api.NewExecutionClient(valConfFetcher, stack)
validator := &StatelessBlockValidator{
config: config(),
execSpawner: execClient,
recorder: recorder,
validationSpawners: validationSpawners,
inboxReader: inboxReader,
Expand All @@ -221,6 +217,12 @@ func NewStatelessBlockValidator(
daService: das,
blobReader: blobReader,
}
if len(config().ValidationServerConfigs) != 0 {
valConfFetcher := func() *rpcclient.ClientConfig {
return &config().ValidationServerConfigs[0]
}
validator.execSpawner = server_api.NewExecutionClient(valConfFetcher, stack)
}
return validator, nil
}

Expand Down Expand Up @@ -425,15 +427,17 @@ func (v *StatelessBlockValidator) OverrideRecorder(t *testing.T, recorder execut
}

func (v *StatelessBlockValidator) Start(ctx_in context.Context) error {
err := v.execSpawner.Start(ctx_in)
if err != nil {
return err
}
for _, spawner := range v.validationSpawners {
if err := spawner.Start(ctx_in); err != nil {
return err
}
}
if v.execSpawner == nil {
return nil
}
if err := v.execSpawner.Start(ctx_in); err != nil {
return err
}
if v.config.PendingUpgradeModuleRoot != "" {
if v.config.PendingUpgradeModuleRoot == "latest" {
latest, err := v.execSpawner.LatestWasmModuleRoot().Await(ctx_in)
Expand All @@ -453,7 +457,9 @@ func (v *StatelessBlockValidator) Start(ctx_in context.Context) error {
}

func (v *StatelessBlockValidator) Stop() {
v.execSpawner.Stop()
if v.execSpawner != nil {
v.execSpawner.Stop()
}
for _, spawner := range v.validationSpawners {
spawner.Stop()
}
Expand Down
31 changes: 25 additions & 6 deletions system_tests/block_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/offchainlabs/nitro/solgen/go/mocksgen"
"github.com/offchainlabs/nitro/solgen/go/precompilesgen"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/offchainlabs/nitro/validator/server_api"
)

type workloadType uint
Expand All @@ -37,7 +39,9 @@ const (
upgradeArbOs
)

func testBlockValidatorSimple(t *testing.T, dasModeString string, workloadLoops int, workload workloadType, arbitrator bool) {
var moduleRoot = "0xe5059c8450e490232bf1ffe02b7cf056349dccea517c8ac7c6d28a0e91ae68cd"

func testBlockValidatorSimple(t *testing.T, dasModeString string, workloadLoops int, workload workloadType, arbitrator bool, useRedisStreams bool) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -67,7 +71,18 @@ func testBlockValidatorSimple(t *testing.T, dasModeString string, workloadLoops
validatorConfig.BlockValidator.Enable = true
validatorConfig.DataAvailability = l1NodeConfigA.DataAvailability
validatorConfig.DataAvailability.RPCAggregator.Enable = false
AddDefaultValNode(t, ctx, validatorConfig, !arbitrator)
redisURL := ""
if useRedisStreams {
redisURL = redisutil.CreateTestRedis(ctx, t)
validatorConfig.BlockValidator.RedisValidationClientConfig = server_api.DefaultRedisValidationClientConfig
validatorConfig.BlockValidator.RedisValidationClientConfig.ModuleRoots = []string{moduleRoot}
stream := server_api.RedisStreamForRoot(common.HexToHash(moduleRoot))
validatorConfig.BlockValidator.RedisValidationClientConfig.RedisStream = stream
validatorConfig.BlockValidator.RedisValidationClientConfig.RedisURL = redisURL
}

AddDefaultValNode(t, ctx, validatorConfig, !arbitrator, redisURL)

testClientB, cleanupB := builder.Build2ndNode(t, &SecondNodeParams{nodeConfig: validatorConfig})
defer cleanupB()
builder.L2Info.GenerateAccount("User2")
Expand Down Expand Up @@ -239,17 +254,21 @@ func testBlockValidatorSimple(t *testing.T, dasModeString string, workloadLoops
}

func TestBlockValidatorSimpleOnchainUpgradeArbOs(t *testing.T) {
testBlockValidatorSimple(t, "onchain", 1, upgradeArbOs, true)
testBlockValidatorSimple(t, "onchain", 1, upgradeArbOs, true, false)
}

func TestBlockValidatorSimpleOnchain(t *testing.T) {
testBlockValidatorSimple(t, "onchain", 1, ethSend, true)
testBlockValidatorSimple(t, "onchain", 1, ethSend, true, false)
}

func TestBlockValidatorSimpleOnchainWithRedisStreams(t *testing.T) {
testBlockValidatorSimple(t, "onchain", 1, ethSend, true, true)
}

func TestBlockValidatorSimpleLocalDAS(t *testing.T) {
testBlockValidatorSimple(t, "files", 1, ethSend, true)
testBlockValidatorSimple(t, "files", 1, ethSend, true, false)
}

func TestBlockValidatorSimpleJITOnchain(t *testing.T) {
testBlockValidatorSimple(t, "files", 8, smallContract, false)
testBlockValidatorSimple(t, "files", 8, smallContract, false, false)
}
47 changes: 42 additions & 5 deletions system_tests/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"testing"
"time"

"github.com/go-redis/redis/v8"
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbos/util"
"github.com/offchainlabs/nitro/arbstate"
Expand All @@ -27,8 +28,10 @@ import (
"github.com/offchainlabs/nitro/execution/gethexec"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/headerreader"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/offchainlabs/nitro/util/signature"
"github.com/offchainlabs/nitro/validator/server_api"
"github.com/offchainlabs/nitro/validator/server_api/validation"
"github.com/offchainlabs/nitro/validator/server_common"
"github.com/offchainlabs/nitro/validator/valnode"

Expand Down Expand Up @@ -504,6 +507,24 @@ func createStackConfigForTest(dataDir string) *node.Config {
return &stackConf
}

func createGroup(ctx context.Context, t *testing.T, streamName string, client redis.UniversalClient) {
t.Helper()
// Stream name and group name are the same.
if _, err := client.XGroupCreateMkStream(ctx, streamName, streamName, "$").Result(); err != nil {
log.Debug("Error creating stream group: %v", err)
}
}

func destroyGroup(ctx context.Context, t *testing.T, streamName string, client redis.UniversalClient) {
t.Helper()
if client == nil {
return
}
if _, err := client.XGroupDestroy(ctx, streamName, streamName).Result(); err != nil {
log.Debug("Error destroying a stream group", "error", err)
}
}

func createTestValidationNode(t *testing.T, ctx context.Context, config *valnode.Config) (*valnode.ValidationNode, *node.Node) {
stackConf := node.DefaultConfig
stackConf.HTTPPort = 0
Expand Down Expand Up @@ -556,19 +577,35 @@ func StaticFetcherFrom[T any](t *testing.T, config *T) func() *T {
}

func configByValidationNode(t *testing.T, clientConfig *arbnode.Config, valStack *node.Node) {
if len(clientConfig.BlockValidator.ValidationServerConfigs) == 0 {
return
}
clientConfig.BlockValidator.ValidationServerConfigs[0].URL = valStack.WSEndpoint()
clientConfig.BlockValidator.ValidationServerConfigs[0].JWTSecret = ""
}

func AddDefaultValNode(t *testing.T, ctx context.Context, nodeConfig *arbnode.Config, useJit bool) {
func AddDefaultValNode(t *testing.T, ctx context.Context, nodeConfig *arbnode.Config, useJit bool, redisURL string) {
if !nodeConfig.ValidatorRequired() {
return
}
if nodeConfig.BlockValidator.ValidationServerConfigs[0].URL != "" {
if len(nodeConfig.BlockValidator.ValidationServerConfigs) > 0 && nodeConfig.BlockValidator.ValidationServerConfigs[0].URL != "" {
return
}
conf := valnode.TestValidationConfig
conf.UseJit = useJit
// Enable redis streams when URL is specified
if redisURL != "" {
conf.Arbitrator.RedisValidationServerConfig = validation.DefaultRedisValidationServerConfig
redisStream := server_api.RedisStreamForRoot(common.HexToHash(moduleRoot))

Check failure on line 599 in system_tests/common_test.go

View workflow job for this annotation

GitHub Actions / Go Tests (race)

undefined: moduleRoot

Check failure on line 599 in system_tests/common_test.go

View workflow job for this annotation

GitHub Actions / Go Tests (race)

undefined: moduleRoot
redisClient, err := redisutil.RedisClientFromURL(redisURL)
if err != nil {
t.Fatalf("Error creating redis coordinator: %v", err)
}
createGroup(ctx, t, redisStream, redisClient)
conf.Arbitrator.RedisValidationServerConfig.RedisURL = redisURL
conf.Arbitrator.RedisValidationServerConfig.ModuleRoots = []string{moduleRoot}

Check failure on line 606 in system_tests/common_test.go

View workflow job for this annotation

GitHub Actions / Go Tests (race)

undefined: moduleRoot

Check failure on line 606 in system_tests/common_test.go

View workflow job for this annotation

GitHub Actions / Go Tests (race)

undefined: moduleRoot
t.Cleanup(func() { destroyGroup(ctx, t, redisStream, redisClient) })
}
_, valStack := createTestValidationNode(t, ctx, &conf)
configByValidationNode(t, nodeConfig, valStack)
}
Expand Down Expand Up @@ -798,7 +835,7 @@ func createTestNodeWithL1(
execConfig.Sequencer.Enable = false
}

AddDefaultValNode(t, ctx, nodeConfig, true)
AddDefaultValNode(t, ctx, nodeConfig, true, "")

Require(t, execConfig.Validate())
execConfigFetcher := func() *gethexec.Config { return execConfig }
Expand Down Expand Up @@ -833,7 +870,7 @@ func createTestNode(

feedErrChan := make(chan error, 10)

AddDefaultValNode(t, ctx, nodeConfig, true)
AddDefaultValNode(t, ctx, nodeConfig, true, "")

l2info, stack, chainDb, arbDb, blockchain := createL2BlockChain(t, l2Info, "", chainConfig, &execConfig.Caching)

Expand Down Expand Up @@ -939,7 +976,7 @@ func Create2ndNodeWithConfig(
l2blockchain, err := gethexec.WriteOrTestBlockChain(l2chainDb, coreCacheConfig, initReader, chainConfig, initMessage, gethexec.ConfigDefaultTest().TxLookupLimit, 0)
Require(t, err)

AddDefaultValNode(t, ctx, nodeConfig, true)
AddDefaultValNode(t, ctx, nodeConfig, true, "")

Require(t, execConfig.Validate())
Require(t, nodeConfig.Validate())
Expand Down
9 changes: 7 additions & 2 deletions validator/server_api/redisconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type RedisValidationServer struct {
consumers map[common.Hash]*pubsub.Consumer[*validator.ValidationInput, validator.GoGlobalState]
}

func NewRedisValidationServer(cfg *validation.RedisValidationServerConfig) (*RedisValidationServer, error) {
func NewRedisValidationServer(cfg *validation.RedisValidationServerConfig, spawner validator.ValidationSpawner) (*RedisValidationServer, error) {
if cfg.RedisURL == "" {
return nil, fmt.Errorf("redis url cannot be empty")
}
Expand All @@ -35,14 +35,15 @@ func NewRedisValidationServer(cfg *validation.RedisValidationServerConfig) (*Red
consumers := make(map[common.Hash]*pubsub.Consumer[*validator.ValidationInput, validator.GoGlobalState])
for _, hash := range cfg.ModuleRoots {
mr := common.HexToHash(hash)
c, err := pubsub.NewConsumer[*validator.ValidationInput, validator.GoGlobalState](redisClient, redisStreamForRoot(mr), &cfg.ConsumerConfig)
c, err := pubsub.NewConsumer[*validator.ValidationInput, validator.GoGlobalState](redisClient, RedisStreamForRoot(mr), &cfg.ConsumerConfig)
if err != nil {
return nil, fmt.Errorf("creating consumer for validation: %w", err)
}
consumers[mr] = c
}
return &RedisValidationServer{
consumers: consumers,
spawner: spawner,
}, nil
}

Expand All @@ -57,6 +58,10 @@ func (s *RedisValidationServer) Start(ctx_in context.Context) {
log.Error("Consuming request", "error", err)
return 0
}
if req == nil {
// There's nothing in the queue.
return time.Second
}
valRun := s.spawner.Launch(req.Value, moduleRoot)
res, err := valRun.Await(ctx)
if err != nil {
Expand Down
13 changes: 10 additions & 3 deletions validator/server_api/redisproducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ type RedisValidationClientConfig struct {
RedisURL string `koanf:"redis-url"`
RedisStream string `koanf:"redis-stream"`
ProducerConfig pubsub.ProducerConfig `koanf:"producer-config"`
// Supported wasm module roots.
// Supported wasm module roots, when the list is empty this is disabled.
ModuleRoots []string `koanf:"module-roots"`
}

func (c RedisValidationClientConfig) Enabled() bool {
return len(c.ModuleRoots) > 0
}

var DefaultRedisValidationClientConfig = RedisValidationClientConfig{
Name: "redis validation client",
Room: 2,
Expand Down Expand Up @@ -58,7 +62,7 @@ type RedisValidationClient struct {
producers map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState]
}

func redisStreamForRoot(moduleRoot common.Hash) string {
func RedisStreamForRoot(moduleRoot common.Hash) string {
return fmt.Sprintf("stream:%s", moduleRoot.Hex())
}

Expand All @@ -75,10 +79,13 @@ func NewRedisValidationClient(cfg *RedisValidationClientConfig) (*RedisValidatio
if err != nil {
return nil, err
}
if len(cfg.ModuleRoots) == 0 {
return nil, fmt.Errorf("moduleRoots must be specified to enable redis streams")
}
for _, hash := range cfg.ModuleRoots {
mr := common.HexToHash(hash)
p, err := pubsub.NewProducer[*validator.ValidationInput, validator.GoGlobalState](
redisClient, redisStreamForRoot(mr), &cfg.ProducerConfig)
redisClient, RedisStreamForRoot(mr), &cfg.ProducerConfig)
if err != nil {
return nil, fmt.Errorf("creating producer for validation: %w", err)
}
Expand Down
Loading

0 comments on commit 8496679

Please sign in to comment.