From 849667989ff601832a558b2493092a7fab76db06 Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Fri, 19 Apr 2024 22:39:09 +0200 Subject: [PATCH] Implement system tests --- arbnode/node.go | 2 +- pubsub/consumer.go | 3 +- pubsub/producer.go | 1 + staker/block_validator.go | 8 +++- staker/stateless_block_validator.go | 34 ++++++++------ system_tests/block_validator_test.go | 31 +++++++++--- system_tests/common_test.go | 47 +++++++++++++++++-- validator/server_api/redisconsumer.go | 9 +++- validator/server_api/redisproducer.go | 13 +++-- validator/server_api/validation/validation.go | 3 -- validator/server_api/validation_client.go | 13 +++-- validator/valnode/valnode.go | 2 +- 12 files changed, 120 insertions(+), 46 deletions(-) diff --git a/arbnode/node.go b/arbnode/node.go index 7a7a99ba88..43a05155fe 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -540,7 +540,7 @@ func createNodeImpl( txStreamer.SetInboxReaders(inboxReader, delayedBridge) var statelessBlockValidator *staker.StatelessBlockValidator - if config.BlockValidator.ValidationServerConfigs[0].URL != "" { + if config.BlockValidator.RedisValidationClientConfig.Enabled() || config.BlockValidator.ValidationServerConfigs[0].URL != "" { statelessBlockValidator, err = staker.NewStatelessBlockValidator( inboxReader, inboxTracker, diff --git a/pubsub/consumer.go b/pubsub/consumer.go index 7f8ca3a988..5385b33979 100644 --- a/pubsub/consumer.go +++ b/pubsub/consumer.go @@ -129,7 +129,6 @@ func (c *Consumer[Request, Response]) Consume(ctx context.Context) (*Message[Req if len(res) != 1 || len(res[0].Messages) != 1 { return nil, fmt.Errorf("redis returned entries: %+v, for querying single message", res) } - log.Debug(fmt.Sprintf("Consumer: %s consuming message: %s", c.id, res[0].Messages[0].ID)) var ( value = res[0].Messages[0].Values[messageKey] data, ok = (value).(string) @@ -141,7 +140,7 @@ func (c *Consumer[Request, Response]) Consume(ctx context.Context) (*Message[Req if err := json.Unmarshal([]byte(data), &req); err != nil { return nil, fmt.Errorf("unmarshaling value: %v, error: %w", value, err) } - + log.Debug("Redis stream consuming", "consumer_id", c.id, "message_id", res[0].Messages[0].ID) return &Message[Request]{ ID: res[0].Messages[0].ID, Value: req, diff --git a/pubsub/producer.go b/pubsub/producer.go index 7f7f05389b..debea81365 100644 --- a/pubsub/producer.go +++ b/pubsub/producer.go @@ -217,6 +217,7 @@ func (p *Producer[Request, Response]) reproduce(ctx context.Context, value Reque } func (p *Producer[Request, Response]) Produce(ctx context.Context, value Request) (*containers.Promise[Response], error) { + log.Debug("Redis stream producing", "value", value) p.once.Do(func() { p.StopWaiter.CallIteratively(p.checkAndReproduce) p.StopWaiter.CallIteratively(p.checkResponses) diff --git a/staker/block_validator.go b/staker/block_validator.go index a65adbeff0..1ec160c558 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -111,18 +111,19 @@ func (c *BlockValidatorConfig) Validate() error { } c.memoryFreeLimit = limit } + streamsEnabled := c.RedisValidationClientConfig.Enabled() if c.ValidationServerConfigs == nil { if c.ValidationServerConfigsList == "default" { c.ValidationServerConfigs = []rpcclient.ClientConfig{c.ValidationServer} } else { var validationServersConfigs []rpcclient.ClientConfig - if err := json.Unmarshal([]byte(c.ValidationServerConfigsList), &validationServersConfigs); err != nil { + if err := json.Unmarshal([]byte(c.ValidationServerConfigsList), &validationServersConfigs); err != nil && !streamsEnabled { return fmt.Errorf("failed to parse block-validator validation-server-configs-list string: %w", err) } c.ValidationServerConfigs = validationServersConfigs } } - if len(c.ValidationServerConfigs) == 0 { + if len(c.ValidationServerConfigs) == 0 && !streamsEnabled { return fmt.Errorf("block-validator validation-server-configs is empty, need at least one validation server config") } for _, serverConfig := range c.ValidationServerConfigs { @@ -1032,6 +1033,9 @@ func (v *BlockValidator) Reorg(ctx context.Context, count arbutil.MessageIndex) // Initialize must be called after SetCurrentWasmModuleRoot sets the current one func (v *BlockValidator) Initialize(ctx context.Context) error { config := v.config() + if config.RedisValidationClientConfig.Enabled() && v.execSpawner == nil { + return nil + } currentModuleRoot := config.CurrentModuleRoot switch currentModuleRoot { case "latest": diff --git a/staker/stateless_block_validator.go b/staker/stateless_block_validator.go index cfccc793ad..25d64fae35 100644 --- a/staker/stateless_block_validator.go +++ b/staker/stateless_block_validator.go @@ -194,24 +194,20 @@ func NewStatelessBlockValidator( config func() *BlockValidatorConfig, stack *node.Node, ) (*StatelessBlockValidator, error) { - - validationSpawners := make([]validator.ValidationSpawner, len(config().ValidationServerConfigs)) - for i, serverConfig := range config().ValidationServerConfigs { - valConfFetcher := func() *rpcclient.ClientConfig { return &serverConfig } - validationSpawners[i] = server_api.NewValidationClient(valConfFetcher, stack) - } + var validationSpawners []validator.ValidationSpawner redisValClient, err := server_api.NewRedisValidationClient(&config().RedisValidationClientConfig) if err != nil { log.Error("Creating redis validation client", "error", err) } else { validationSpawners = append(validationSpawners, redisValClient) } + for _, serverConfig := range config().ValidationServerConfigs { + valConfFetcher := func() *rpcclient.ClientConfig { return &serverConfig } + validationSpawners = append(validationSpawners, server_api.NewValidationClient(valConfFetcher, stack)) + } - valConfFetcher := func() *rpcclient.ClientConfig { return &config().ValidationServerConfigs[0] } - execClient := server_api.NewExecutionClient(valConfFetcher, stack) validator := &StatelessBlockValidator{ config: config(), - execSpawner: execClient, recorder: recorder, validationSpawners: validationSpawners, inboxReader: inboxReader, @@ -221,6 +217,12 @@ func NewStatelessBlockValidator( daService: das, blobReader: blobReader, } + if len(config().ValidationServerConfigs) != 0 { + valConfFetcher := func() *rpcclient.ClientConfig { + return &config().ValidationServerConfigs[0] + } + validator.execSpawner = server_api.NewExecutionClient(valConfFetcher, stack) + } return validator, nil } @@ -425,15 +427,17 @@ func (v *StatelessBlockValidator) OverrideRecorder(t *testing.T, recorder execut } func (v *StatelessBlockValidator) Start(ctx_in context.Context) error { - err := v.execSpawner.Start(ctx_in) - if err != nil { - return err - } for _, spawner := range v.validationSpawners { if err := spawner.Start(ctx_in); err != nil { return err } } + if v.execSpawner == nil { + return nil + } + if err := v.execSpawner.Start(ctx_in); err != nil { + return err + } if v.config.PendingUpgradeModuleRoot != "" { if v.config.PendingUpgradeModuleRoot == "latest" { latest, err := v.execSpawner.LatestWasmModuleRoot().Await(ctx_in) @@ -453,7 +457,9 @@ func (v *StatelessBlockValidator) Start(ctx_in context.Context) error { } func (v *StatelessBlockValidator) Stop() { - v.execSpawner.Stop() + if v.execSpawner != nil { + v.execSpawner.Stop() + } for _, spawner := range v.validationSpawners { spawner.Stop() } diff --git a/system_tests/block_validator_test.go b/system_tests/block_validator_test.go index 1fcf2bab34..fa2fd238d5 100644 --- a/system_tests/block_validator_test.go +++ b/system_tests/block_validator_test.go @@ -26,6 +26,8 @@ import ( "github.com/offchainlabs/nitro/solgen/go/mocksgen" "github.com/offchainlabs/nitro/solgen/go/precompilesgen" "github.com/offchainlabs/nitro/util/arbmath" + "github.com/offchainlabs/nitro/util/redisutil" + "github.com/offchainlabs/nitro/validator/server_api" ) type workloadType uint @@ -37,7 +39,9 @@ const ( upgradeArbOs ) -func testBlockValidatorSimple(t *testing.T, dasModeString string, workloadLoops int, workload workloadType, arbitrator bool) { +var moduleRoot = "0xe5059c8450e490232bf1ffe02b7cf056349dccea517c8ac7c6d28a0e91ae68cd" + +func testBlockValidatorSimple(t *testing.T, dasModeString string, workloadLoops int, workload workloadType, arbitrator bool, useRedisStreams bool) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -67,7 +71,18 @@ func testBlockValidatorSimple(t *testing.T, dasModeString string, workloadLoops validatorConfig.BlockValidator.Enable = true validatorConfig.DataAvailability = l1NodeConfigA.DataAvailability validatorConfig.DataAvailability.RPCAggregator.Enable = false - AddDefaultValNode(t, ctx, validatorConfig, !arbitrator) + redisURL := "" + if useRedisStreams { + redisURL = redisutil.CreateTestRedis(ctx, t) + validatorConfig.BlockValidator.RedisValidationClientConfig = server_api.DefaultRedisValidationClientConfig + validatorConfig.BlockValidator.RedisValidationClientConfig.ModuleRoots = []string{moduleRoot} + stream := server_api.RedisStreamForRoot(common.HexToHash(moduleRoot)) + validatorConfig.BlockValidator.RedisValidationClientConfig.RedisStream = stream + validatorConfig.BlockValidator.RedisValidationClientConfig.RedisURL = redisURL + } + + AddDefaultValNode(t, ctx, validatorConfig, !arbitrator, redisURL) + testClientB, cleanupB := builder.Build2ndNode(t, &SecondNodeParams{nodeConfig: validatorConfig}) defer cleanupB() builder.L2Info.GenerateAccount("User2") @@ -239,17 +254,21 @@ func testBlockValidatorSimple(t *testing.T, dasModeString string, workloadLoops } func TestBlockValidatorSimpleOnchainUpgradeArbOs(t *testing.T) { - testBlockValidatorSimple(t, "onchain", 1, upgradeArbOs, true) + testBlockValidatorSimple(t, "onchain", 1, upgradeArbOs, true, false) } func TestBlockValidatorSimpleOnchain(t *testing.T) { - testBlockValidatorSimple(t, "onchain", 1, ethSend, true) + testBlockValidatorSimple(t, "onchain", 1, ethSend, true, false) +} + +func TestBlockValidatorSimpleOnchainWithRedisStreams(t *testing.T) { + testBlockValidatorSimple(t, "onchain", 1, ethSend, true, true) } func TestBlockValidatorSimpleLocalDAS(t *testing.T) { - testBlockValidatorSimple(t, "files", 1, ethSend, true) + testBlockValidatorSimple(t, "files", 1, ethSend, true, false) } func TestBlockValidatorSimpleJITOnchain(t *testing.T) { - testBlockValidatorSimple(t, "files", 8, smallContract, false) + testBlockValidatorSimple(t, "files", 8, smallContract, false, false) } diff --git a/system_tests/common_test.go b/system_tests/common_test.go index cd65cd2edc..6008f57ed2 100644 --- a/system_tests/common_test.go +++ b/system_tests/common_test.go @@ -16,6 +16,7 @@ import ( "testing" "time" + "github.com/go-redis/redis/v8" "github.com/offchainlabs/nitro/arbos/arbostypes" "github.com/offchainlabs/nitro/arbos/util" "github.com/offchainlabs/nitro/arbstate" @@ -27,8 +28,10 @@ import ( "github.com/offchainlabs/nitro/execution/gethexec" "github.com/offchainlabs/nitro/util/arbmath" "github.com/offchainlabs/nitro/util/headerreader" + "github.com/offchainlabs/nitro/util/redisutil" "github.com/offchainlabs/nitro/util/signature" "github.com/offchainlabs/nitro/validator/server_api" + "github.com/offchainlabs/nitro/validator/server_api/validation" "github.com/offchainlabs/nitro/validator/server_common" "github.com/offchainlabs/nitro/validator/valnode" @@ -504,6 +507,24 @@ func createStackConfigForTest(dataDir string) *node.Config { return &stackConf } +func createGroup(ctx context.Context, t *testing.T, streamName string, client redis.UniversalClient) { + t.Helper() + // Stream name and group name are the same. + if _, err := client.XGroupCreateMkStream(ctx, streamName, streamName, "$").Result(); err != nil { + log.Debug("Error creating stream group: %v", err) + } +} + +func destroyGroup(ctx context.Context, t *testing.T, streamName string, client redis.UniversalClient) { + t.Helper() + if client == nil { + return + } + if _, err := client.XGroupDestroy(ctx, streamName, streamName).Result(); err != nil { + log.Debug("Error destroying a stream group", "error", err) + } +} + func createTestValidationNode(t *testing.T, ctx context.Context, config *valnode.Config) (*valnode.ValidationNode, *node.Node) { stackConf := node.DefaultConfig stackConf.HTTPPort = 0 @@ -556,19 +577,35 @@ func StaticFetcherFrom[T any](t *testing.T, config *T) func() *T { } func configByValidationNode(t *testing.T, clientConfig *arbnode.Config, valStack *node.Node) { + if len(clientConfig.BlockValidator.ValidationServerConfigs) == 0 { + return + } clientConfig.BlockValidator.ValidationServerConfigs[0].URL = valStack.WSEndpoint() clientConfig.BlockValidator.ValidationServerConfigs[0].JWTSecret = "" } -func AddDefaultValNode(t *testing.T, ctx context.Context, nodeConfig *arbnode.Config, useJit bool) { +func AddDefaultValNode(t *testing.T, ctx context.Context, nodeConfig *arbnode.Config, useJit bool, redisURL string) { if !nodeConfig.ValidatorRequired() { return } - if nodeConfig.BlockValidator.ValidationServerConfigs[0].URL != "" { + if len(nodeConfig.BlockValidator.ValidationServerConfigs) > 0 && nodeConfig.BlockValidator.ValidationServerConfigs[0].URL != "" { return } conf := valnode.TestValidationConfig conf.UseJit = useJit + // Enable redis streams when URL is specified + if redisURL != "" { + conf.Arbitrator.RedisValidationServerConfig = validation.DefaultRedisValidationServerConfig + redisStream := server_api.RedisStreamForRoot(common.HexToHash(moduleRoot)) + redisClient, err := redisutil.RedisClientFromURL(redisURL) + if err != nil { + t.Fatalf("Error creating redis coordinator: %v", err) + } + createGroup(ctx, t, redisStream, redisClient) + conf.Arbitrator.RedisValidationServerConfig.RedisURL = redisURL + conf.Arbitrator.RedisValidationServerConfig.ModuleRoots = []string{moduleRoot} + t.Cleanup(func() { destroyGroup(ctx, t, redisStream, redisClient) }) + } _, valStack := createTestValidationNode(t, ctx, &conf) configByValidationNode(t, nodeConfig, valStack) } @@ -798,7 +835,7 @@ func createTestNodeWithL1( execConfig.Sequencer.Enable = false } - AddDefaultValNode(t, ctx, nodeConfig, true) + AddDefaultValNode(t, ctx, nodeConfig, true, "") Require(t, execConfig.Validate()) execConfigFetcher := func() *gethexec.Config { return execConfig } @@ -833,7 +870,7 @@ func createTestNode( feedErrChan := make(chan error, 10) - AddDefaultValNode(t, ctx, nodeConfig, true) + AddDefaultValNode(t, ctx, nodeConfig, true, "") l2info, stack, chainDb, arbDb, blockchain := createL2BlockChain(t, l2Info, "", chainConfig, &execConfig.Caching) @@ -939,7 +976,7 @@ func Create2ndNodeWithConfig( l2blockchain, err := gethexec.WriteOrTestBlockChain(l2chainDb, coreCacheConfig, initReader, chainConfig, initMessage, gethexec.ConfigDefaultTest().TxLookupLimit, 0) Require(t, err) - AddDefaultValNode(t, ctx, nodeConfig, true) + AddDefaultValNode(t, ctx, nodeConfig, true, "") Require(t, execConfig.Validate()) Require(t, nodeConfig.Validate()) diff --git a/validator/server_api/redisconsumer.go b/validator/server_api/redisconsumer.go index bc40d19d73..45ae842287 100644 --- a/validator/server_api/redisconsumer.go +++ b/validator/server_api/redisconsumer.go @@ -24,7 +24,7 @@ type RedisValidationServer struct { consumers map[common.Hash]*pubsub.Consumer[*validator.ValidationInput, validator.GoGlobalState] } -func NewRedisValidationServer(cfg *validation.RedisValidationServerConfig) (*RedisValidationServer, error) { +func NewRedisValidationServer(cfg *validation.RedisValidationServerConfig, spawner validator.ValidationSpawner) (*RedisValidationServer, error) { if cfg.RedisURL == "" { return nil, fmt.Errorf("redis url cannot be empty") } @@ -35,7 +35,7 @@ func NewRedisValidationServer(cfg *validation.RedisValidationServerConfig) (*Red consumers := make(map[common.Hash]*pubsub.Consumer[*validator.ValidationInput, validator.GoGlobalState]) for _, hash := range cfg.ModuleRoots { mr := common.HexToHash(hash) - c, err := pubsub.NewConsumer[*validator.ValidationInput, validator.GoGlobalState](redisClient, redisStreamForRoot(mr), &cfg.ConsumerConfig) + c, err := pubsub.NewConsumer[*validator.ValidationInput, validator.GoGlobalState](redisClient, RedisStreamForRoot(mr), &cfg.ConsumerConfig) if err != nil { return nil, fmt.Errorf("creating consumer for validation: %w", err) } @@ -43,6 +43,7 @@ func NewRedisValidationServer(cfg *validation.RedisValidationServerConfig) (*Red } return &RedisValidationServer{ consumers: consumers, + spawner: spawner, }, nil } @@ -57,6 +58,10 @@ func (s *RedisValidationServer) Start(ctx_in context.Context) { log.Error("Consuming request", "error", err) return 0 } + if req == nil { + // There's nothing in the queue. + return time.Second + } valRun := s.spawner.Launch(req.Value, moduleRoot) res, err := valRun.Await(ctx) if err != nil { diff --git a/validator/server_api/redisproducer.go b/validator/server_api/redisproducer.go index 5540cd169f..99c9bcce9a 100644 --- a/validator/server_api/redisproducer.go +++ b/validator/server_api/redisproducer.go @@ -21,10 +21,14 @@ type RedisValidationClientConfig struct { RedisURL string `koanf:"redis-url"` RedisStream string `koanf:"redis-stream"` ProducerConfig pubsub.ProducerConfig `koanf:"producer-config"` - // Supported wasm module roots. + // Supported wasm module roots, when the list is empty this is disabled. ModuleRoots []string `koanf:"module-roots"` } +func (c RedisValidationClientConfig) Enabled() bool { + return len(c.ModuleRoots) > 0 +} + var DefaultRedisValidationClientConfig = RedisValidationClientConfig{ Name: "redis validation client", Room: 2, @@ -58,7 +62,7 @@ type RedisValidationClient struct { producers map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState] } -func redisStreamForRoot(moduleRoot common.Hash) string { +func RedisStreamForRoot(moduleRoot common.Hash) string { return fmt.Sprintf("stream:%s", moduleRoot.Hex()) } @@ -75,10 +79,13 @@ func NewRedisValidationClient(cfg *RedisValidationClientConfig) (*RedisValidatio if err != nil { return nil, err } + if len(cfg.ModuleRoots) == 0 { + return nil, fmt.Errorf("moduleRoots must be specified to enable redis streams") + } for _, hash := range cfg.ModuleRoots { mr := common.HexToHash(hash) p, err := pubsub.NewProducer[*validator.ValidationInput, validator.GoGlobalState]( - redisClient, redisStreamForRoot(mr), &cfg.ProducerConfig) + redisClient, RedisStreamForRoot(mr), &cfg.ProducerConfig) if err != nil { return nil, fmt.Errorf("creating producer for validation: %w", err) } diff --git a/validator/server_api/validation/validation.go b/validator/server_api/validation/validation.go index 9cab29bde7..08d92085d6 100644 --- a/validator/server_api/validation/validation.go +++ b/validator/server_api/validation/validation.go @@ -33,7 +33,6 @@ type BatchInfoJson struct { type RedisValidationServerConfig struct { RedisURL string `koanf:"redis-url"` - RedisStream string `koanf:"redis-stream"` ConsumerConfig pubsub.ConsumerConfig `koanf:"consumer-config"` // Supported wasm module roots. ModuleRoots []string `koanf:"module-roots"` @@ -41,14 +40,12 @@ type RedisValidationServerConfig struct { var DefaultRedisValidationServerConfig = RedisValidationServerConfig{ RedisURL: "", - RedisStream: "", ConsumerConfig: pubsub.DefaultConsumerConfig, ModuleRoots: []string{}, } var TestRedisValidationServerConfig = RedisValidationServerConfig{ RedisURL: "", - RedisStream: "", ConsumerConfig: pubsub.TestConsumerConfig, ModuleRoots: []string{}, } diff --git a/validator/server_api/validation_client.go b/validator/server_api/validation_client.go index d6143ca917..0148eac0dd 100644 --- a/validator/server_api/validation_client.go +++ b/validator/server_api/validation_client.go @@ -48,21 +48,20 @@ func (c *ValidationClient) Launch(entry *validator.ValidationInput, moduleRoot c func (c *ValidationClient) Start(ctx_in context.Context) error { c.StopWaiter.Start(ctx_in, c) ctx := c.GetContext() - err := c.client.Start(ctx) - if err != nil { - return err + if c.client != nil { + if err := c.client.Start(ctx); err != nil { + return err + } } var name string - err = c.client.CallContext(ctx, &name, Namespace+"_name") - if err != nil { + if err := c.client.CallContext(ctx, &name, Namespace+"_name"); err != nil { return err } if len(name) == 0 { return errors.New("couldn't read name from server") } var room int - err = c.client.CallContext(c.GetContext(), &room, Namespace+"_room") - if err != nil { + if err := c.client.CallContext(c.GetContext(), &room, Namespace+"_room"); err != nil { return err } if room < 2 { diff --git a/validator/valnode/valnode.go b/validator/valnode/valnode.go index 5b4986f9da..e42acd8ae4 100644 --- a/validator/valnode/valnode.go +++ b/validator/valnode/valnode.go @@ -119,7 +119,7 @@ func CreateValidationNode(configFetcher ValidationConfigFetcher, stack *node.Nod } else { serverAPI = server_api.NewExecutionServerAPI(arbSpawner, arbSpawner, arbConfigFetcher) } - redisConsumer, err := server_api.NewRedisValidationServer(&arbConfigFetcher().RedisValidationServerConfig) + redisConsumer, err := server_api.NewRedisValidationServer(&arbConfigFetcher().RedisValidationServerConfig, arbSpawner) if err != nil { log.Error("Creating new redis validation server", "error", err) }