Skip to content

Commit

Permalink
Factor out redisproducer and redisconumer
Browse files Browse the repository at this point in the history
  • Loading branch information
anodar committed Apr 23, 2024
1 parent 123023e commit 9dfe3d1
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 92 deletions.
43 changes: 23 additions & 20 deletions staker/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ import (
"github.com/offchainlabs/nitro/util/rpcclient"
"github.com/offchainlabs/nitro/util/stopwaiter"
"github.com/offchainlabs/nitro/validator"
"github.com/offchainlabs/nitro/validator/client/redis"
"github.com/spf13/pflag"

validatorclient "github.com/offchainlabs/nitro/validator/client"
)

var (
Expand Down Expand Up @@ -84,20 +83,20 @@ type BlockValidator struct {
}

type BlockValidatorConfig struct {
Enable bool `koanf:"enable"`
ValidationServer rpcclient.ClientConfig `koanf:"validation-server" reload:"hot"`
RedisValidationClientConfig validatorclient.RedisValidationClientConfig `koanf:"redis-validation-client-config"`
ValidationServerConfigs []rpcclient.ClientConfig `koanf:"validation-server-configs" reload:"hot"`
ExecutionServerConfig rpcclient.ClientConfig `koanf:"execution-server-config" reload:"hot"`
ValidationPoll time.Duration `koanf:"validation-poll" reload:"hot"`
PrerecordedBlocks uint64 `koanf:"prerecorded-blocks" reload:"hot"`
ForwardBlocks uint64 `koanf:"forward-blocks" reload:"hot"`
CurrentModuleRoot string `koanf:"current-module-root"` // TODO(magic) requires reinitialization on hot reload
PendingUpgradeModuleRoot string `koanf:"pending-upgrade-module-root"` // TODO(magic) requires StatelessBlockValidator recreation on hot reload
FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"`
Dangerous BlockValidatorDangerousConfig `koanf:"dangerous"`
MemoryFreeLimit string `koanf:"memory-free-limit" reload:"hot"`
ValidationServerConfigsList string `koanf:"validation-server-configs-list" reload:"hot"`
Enable bool `koanf:"enable"`
ValidationServer rpcclient.ClientConfig `koanf:"validation-server" reload:"hot"`
RedisValidationClientConfig redis.ValidationClientConfig `koanf:"redis-validation-client-config"`
ValidationServerConfigs []rpcclient.ClientConfig `koanf:"validation-server-configs" reload:"hot"`
ExecutionServerConfig rpcclient.ClientConfig `koanf:"execution-server-config" reload:"hot"`
ValidationPoll time.Duration `koanf:"validation-poll" reload:"hot"`
PrerecordedBlocks uint64 `koanf:"prerecorded-blocks" reload:"hot"`
ForwardBlocks uint64 `koanf:"forward-blocks" reload:"hot"`
CurrentModuleRoot string `koanf:"current-module-root"` // TODO(magic) requires reinitialization on hot reload
PendingUpgradeModuleRoot string `koanf:"pending-upgrade-module-root"` // TODO(magic) requires StatelessBlockValidator recreation on hot reload
FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"`
Dangerous BlockValidatorDangerousConfig `koanf:"dangerous"`
MemoryFreeLimit string `koanf:"memory-free-limit" reload:"hot"`
ValidationServerConfigsList string `koanf:"validation-server-configs-list" reload:"hot"`

memoryFreeLimit int
}
Expand Down Expand Up @@ -147,7 +146,7 @@ func BlockValidatorConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".enable", DefaultBlockValidatorConfig.Enable, "enable block-by-block validation")
rpcclient.RPCClientAddOptions(prefix+".validation-server", f, &DefaultBlockValidatorConfig.ValidationServer)
rpcclient.RPCClientAddOptions(prefix+".execution-server-config", f, &DefaultBlockValidatorConfig.ExecutionServerConfig)
validatorclient.RedisValidationClientConfigAddOptions(prefix+".redis-validation-client-config", f)
redis.ValidationClientConfigAddOptions(prefix+".redis-validation-client-config", f)
f.String(prefix+".validation-server-configs-list", DefaultBlockValidatorConfig.ValidationServerConfigsList, "array of validation rpc configs given as a json string. time duration should be supplied in number indicating nanoseconds")
f.Duration(prefix+".validation-poll", DefaultBlockValidatorConfig.ValidationPoll, "poll time to check validations")
f.Uint64(prefix+".forward-blocks", DefaultBlockValidatorConfig.ForwardBlocks, "prepare entries for up to that many blocks ahead of validation (small footprint)")
Expand All @@ -168,7 +167,7 @@ var DefaultBlockValidatorConfig = BlockValidatorConfig{
ValidationServerConfigsList: "default",
ValidationServer: rpcclient.DefaultClientConfig,
ExecutionServerConfig: rpcclient.DefaultClientConfig,
RedisValidationClientConfig: validatorclient.DefaultRedisValidationClientConfig,
RedisValidationClientConfig: redis.DefaultValidationClientConfig,
ValidationPoll: time.Second,
ForwardBlocks: 1024,
PrerecordedBlocks: uint64(2 * runtime.NumCPU()),
Expand All @@ -183,7 +182,7 @@ var TestBlockValidatorConfig = BlockValidatorConfig{
Enable: false,
ValidationServer: rpcclient.TestClientConfig,
ValidationServerConfigs: []rpcclient.ClientConfig{rpcclient.TestClientConfig},
RedisValidationClientConfig: validatorclient.TestRedisValidationClientConfig,
RedisValidationClientConfig: redis.TestValidationClientConfig,
ExecutionServerConfig: rpcclient.TestClientConfig,
ValidationPoll: 100 * time.Millisecond,
ForwardBlocks: 128,
Expand Down Expand Up @@ -1065,7 +1064,11 @@ func (v *BlockValidator) Initialize(ctx context.Context) error {
}
}
log.Info("BlockValidator initialized", "current", v.currentWasmModuleRoot, "pending", v.pendingWasmModuleRoot)
if err := v.StatelessBlockValidator.Initialize([]common.Hash{v.currentWasmModuleRoot, v.pendingWasmModuleRoot}); err != nil {
moduleRoots := []common.Hash{v.currentWasmModuleRoot}
if v.pendingWasmModuleRoot != v.currentWasmModuleRoot {
moduleRoots = append(moduleRoots, v.pendingWasmModuleRoot)
}
if err := v.StatelessBlockValidator.Initialize(moduleRoots); err != nil {
return fmt.Errorf("initializing block validator with module roots: %w", err)
}
return nil
Expand Down
5 changes: 3 additions & 2 deletions staker/stateless_block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/util/rpcclient"
"github.com/offchainlabs/nitro/validator"
"github.com/offchainlabs/nitro/validator/client/redis"

validatorclient "github.com/offchainlabs/nitro/validator/client"
)
Expand Down Expand Up @@ -196,7 +197,7 @@ func NewStatelessBlockValidator(
) (*StatelessBlockValidator, error) {
var validationSpawners []validator.ValidationSpawner
if config().RedisValidationClientConfig.Enabled() {
redisValClient, err := validatorclient.NewRedisValidationClient(&config().RedisValidationClientConfig)
redisValClient, err := redis.NewValidationClient(&config().RedisValidationClientConfig)
if err != nil {
return nil, fmt.Errorf("creating new redis validation client: %w", err)
}
Expand Down Expand Up @@ -229,7 +230,7 @@ func (v *StatelessBlockValidator) Initialize(moduleRoots []common.Hash) error {
return nil
}
// First spawner is always RedisValidationClient if RedisStreams are enabled.
if v, ok := v.validationSpawners[0].(*validatorclient.RedisValidationClient); ok {
if v, ok := v.validationSpawners[0].(*redis.ValidationClient); ok {
if err := v.Initialize(moduleRoots); err != nil {
return fmt.Errorf("initializing redis validation client module roots: %w", err)
}
Expand Down
5 changes: 2 additions & 3 deletions system_tests/block_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ import (
"github.com/offchainlabs/nitro/solgen/go/precompilesgen"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/redisutil"

validatorclient "github.com/offchainlabs/nitro/validator/client"
"github.com/offchainlabs/nitro/validator/client/redis"
)

type workloadType uint
Expand Down Expand Up @@ -73,7 +72,7 @@ func testBlockValidatorSimple(t *testing.T, dasModeString string, workloadLoops
redisURL := ""
if useRedisStreams {
redisURL = redisutil.CreateTestRedis(ctx, t)
validatorConfig.BlockValidator.RedisValidationClientConfig = validatorclient.DefaultRedisValidationClientConfig
validatorConfig.BlockValidator.RedisValidationClientConfig = redis.DefaultValidationClientConfig
validatorConfig.BlockValidator.RedisValidationClientConfig.RedisURL = redisURL
validatorConfig.BlockValidator.ValidationServerConfigs = nil
}
Expand Down
3 changes: 2 additions & 1 deletion system_tests/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/offchainlabs/nitro/validator/server_api"
"github.com/offchainlabs/nitro/validator/server_common"
"github.com/offchainlabs/nitro/validator/valnode"
rediscons "github.com/offchainlabs/nitro/validator/valnode/redis"

"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
Expand Down Expand Up @@ -601,7 +602,7 @@ func AddDefaultValNode(t *testing.T, ctx context.Context, nodeConfig *arbnode.Co
conf.UseJit = useJit
// Enable redis streams when URL is specified
if redisURL != "" {
conf.Arbitrator.RedisValidationServerConfig = server_api.DefaultRedisValidationServerConfig
conf.Arbitrator.RedisValidationServerConfig = rediscons.DefaultValidationServerConfig
redisClient, err := redisutil.RedisClientFromURL(redisURL)
if err != nil {
t.Fatalf("Error creating redis coordinator: %v", err)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package client
package redis

import (
"context"
Expand All @@ -18,39 +18,39 @@ import (
"github.com/spf13/pflag"
)

type RedisValidationClientConfig struct {
type ValidationClientConfig struct {
Name string `koanf:"name"`
Room int32 `koanf:"room"`
RedisURL string `koanf:"redis-url"`
ProducerConfig pubsub.ProducerConfig `koanf:"producer-config"`
}

func (c RedisValidationClientConfig) Enabled() bool {
func (c ValidationClientConfig) Enabled() bool {
return c.RedisURL != ""
}

var DefaultRedisValidationClientConfig = RedisValidationClientConfig{
var DefaultValidationClientConfig = ValidationClientConfig{
Name: "redis validation client",
Room: 2,
RedisURL: "",
ProducerConfig: pubsub.DefaultProducerConfig,
}

var TestRedisValidationClientConfig = RedisValidationClientConfig{
var TestValidationClientConfig = ValidationClientConfig{
Name: "test redis validation client",
Room: 2,
RedisURL: "",
ProducerConfig: pubsub.TestProducerConfig,
}

func RedisValidationClientConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.String(prefix+".name", DefaultRedisValidationClientConfig.Name, "validation client name")
f.Int32(prefix+".room", DefaultRedisValidationClientConfig.Room, "validation client room")
func ValidationClientConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.String(prefix+".name", DefaultValidationClientConfig.Name, "validation client name")
f.Int32(prefix+".room", DefaultValidationClientConfig.Room, "validation client room")
pubsub.ProducerAddConfigAddOptions(prefix+".producer-config", f)
}

// RedisValidationClient implements validation client through redis streams.
type RedisValidationClient struct {
// ValidationClient implements validation client through redis streams.
type ValidationClient struct {
stopwaiter.StopWaiter
name string
room int32
Expand All @@ -60,15 +60,15 @@ type RedisValidationClient struct {
redisClient redis.UniversalClient
}

func NewRedisValidationClient(cfg *RedisValidationClientConfig) (*RedisValidationClient, error) {
func NewValidationClient(cfg *ValidationClientConfig) (*ValidationClient, error) {
if cfg.RedisURL == "" {
return nil, fmt.Errorf("redis url cannot be empty")
}
redisClient, err := redisutil.RedisClientFromURL(cfg.RedisURL)
if err != nil {
return nil, err
}
return &RedisValidationClient{
return &ValidationClient{
name: cfg.Name,
room: cfg.Room,
producers: make(map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState]),
Expand All @@ -77,7 +77,7 @@ func NewRedisValidationClient(cfg *RedisValidationClientConfig) (*RedisValidatio
}, nil
}

func (c *RedisValidationClient) Initialize(moduleRoots []common.Hash) error {
func (c *ValidationClient) Initialize(moduleRoots []common.Hash) error {
for _, mr := range moduleRoots {
if _, exists := c.producers[mr]; exists {
log.Warn("Producer already existsw for module root", "hash", mr)
Expand All @@ -94,7 +94,7 @@ func (c *RedisValidationClient) Initialize(moduleRoots []common.Hash) error {
return nil
}

func (c *RedisValidationClient) Launch(entry *validator.ValidationInput, moduleRoot common.Hash) validator.ValidationRun {
func (c *ValidationClient) Launch(entry *validator.ValidationInput, moduleRoot common.Hash) validator.ValidationRun {
atomic.AddInt32(&c.room, -1)
defer atomic.AddInt32(&c.room, 1)
producer, found := c.producers[moduleRoot]
Expand All @@ -110,28 +110,28 @@ func (c *RedisValidationClient) Launch(entry *validator.ValidationInput, moduleR
return server_common.NewValRun(promise, moduleRoot)
}

func (c *RedisValidationClient) Start(ctx_in context.Context) error {
func (c *ValidationClient) Start(ctx_in context.Context) error {
for _, p := range c.producers {
p.Start(ctx_in)
}
c.StopWaiter.Start(ctx_in, c)
return nil
}

func (c *RedisValidationClient) Stop() {
func (c *ValidationClient) Stop() {
for _, p := range c.producers {
p.StopAndWait()
}
c.StopWaiter.StopAndWait()
}

func (c *RedisValidationClient) Name() string {
func (c *ValidationClient) Name() string {
if c.Started() {
return c.name
}
return "(not started)"
}

func (c *RedisValidationClient) Room() int {
func (c *ValidationClient) Room() int {
return int(c.room)
}
26 changes: 0 additions & 26 deletions validator/server_api/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/pubsub"
"github.com/offchainlabs/nitro/util/jsonapi"
"github.com/offchainlabs/nitro/validator"
"github.com/spf13/pflag"
)

const Namespace string = "validation"
Expand Down Expand Up @@ -65,27 +63,3 @@ type BatchInfoJson struct {
Number uint64
DataB64 string
}

type RedisValidationServerConfig struct {
RedisURL string `koanf:"redis-url"`
ConsumerConfig pubsub.ConsumerConfig `koanf:"consumer-config"`
// Supported wasm module roots.
ModuleRoots []string `koanf:"module-roots"`
}

var DefaultRedisValidationServerConfig = RedisValidationServerConfig{
RedisURL: "",
ConsumerConfig: pubsub.DefaultConsumerConfig,
ModuleRoots: []string{},
}

var TestRedisValidationServerConfig = RedisValidationServerConfig{
RedisURL: "",
ConsumerConfig: pubsub.TestConsumerConfig,
ModuleRoots: []string{},
}

func RedisValidationServerConfigAddOptions(prefix string, f *pflag.FlagSet) {
pubsub.ConsumerConfigAddOptions(prefix+".consumer-config", f)
f.StringSlice(prefix+".module-roots", nil, "Supported module root hashes")
}
20 changes: 10 additions & 10 deletions validator/server_arb/validator_spawner.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ import (
"sync/atomic"
"time"

flag "github.com/spf13/pflag"
"github.com/spf13/pflag"

"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/util/containers"
"github.com/offchainlabs/nitro/util/stopwaiter"
"github.com/offchainlabs/nitro/validator"
"github.com/offchainlabs/nitro/validator/server_api"
"github.com/offchainlabs/nitro/validator/server_common"
"github.com/offchainlabs/nitro/validator/valnode/redis"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
Expand All @@ -28,11 +28,11 @@ import (
var arbitratorValidationSteps = metrics.NewRegisteredHistogram("arbitrator/validation/steps", nil, metrics.NewBoundedHistogramSample())

type ArbitratorSpawnerConfig struct {
Workers int `koanf:"workers" reload:"hot"`
OutputPath string `koanf:"output-path" reload:"hot"`
Execution MachineCacheConfig `koanf:"execution" reload:"hot"` // hot reloading for new executions only
ExecutionRunTimeout time.Duration `koanf:"execution-run-timeout" reload:"hot"`
RedisValidationServerConfig server_api.RedisValidationServerConfig `koanf:"redis-validation-server-config"`
Workers int `koanf:"workers" reload:"hot"`
OutputPath string `koanf:"output-path" reload:"hot"`
Execution MachineCacheConfig `koanf:"execution" reload:"hot"` // hot reloading for new executions only
ExecutionRunTimeout time.Duration `koanf:"execution-run-timeout" reload:"hot"`
RedisValidationServerConfig redis.ValidationServerConfig `koanf:"redis-validation-server-config"`
}

type ArbitratorSpawnerConfigFecher func() *ArbitratorSpawnerConfig
Expand All @@ -42,15 +42,15 @@ var DefaultArbitratorSpawnerConfig = ArbitratorSpawnerConfig{
OutputPath: "./target/output",
Execution: DefaultMachineCacheConfig,
ExecutionRunTimeout: time.Minute * 15,
RedisValidationServerConfig: server_api.DefaultRedisValidationServerConfig,
RedisValidationServerConfig: redis.DefaultValidationServerConfig,
}

func ArbitratorSpawnerConfigAddOptions(prefix string, f *flag.FlagSet) {
func ArbitratorSpawnerConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Int(prefix+".workers", DefaultArbitratorSpawnerConfig.Workers, "number of concurrent validation threads")
f.Duration(prefix+".execution-run-timeout", DefaultArbitratorSpawnerConfig.ExecutionRunTimeout, "timeout before discarding execution run")
f.String(prefix+".output-path", DefaultArbitratorSpawnerConfig.OutputPath, "path to write machines to")
MachineCacheConfigConfigAddOptions(prefix+".execution", f)
server_api.RedisValidationServerConfigAddOptions(prefix+".redis-validation-server-config", f)
redis.ValidationServerConfigAddOptions(prefix+".redis-validation-server-config", f)
}

func DefaultArbitratorSpawnerConfigFetcher() *ArbitratorSpawnerConfig {
Expand Down
Loading

0 comments on commit 9dfe3d1

Please sign in to comment.