Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Config Change] Multi exec servers #2267

Merged
merged 11 commits into from
May 2, 2024
8 changes: 8 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -271,5 +271,13 @@ RUN export DEBIAN_FRONTEND=noninteractive && \

USER user

# nitro-node-split: image stage built on top of nitro-node-dev whose entrypoint
# is scripts/split-val-entry.sh instead of the base stage's entrypoint.
FROM nitro-node-dev as nitro-node-split
# Switch to root for the package install and the copy into /usr/local/bin.
USER root

# xxd is invoked by split-val-entry.sh to produce the JWT secret file.
RUN apt-get install -y xxd
COPY scripts/split-val-entry.sh /usr/local/bin
ENTRYPOINT [ "/usr/local/bin/split-val-entry.sh" ]
PlasmaPower marked this conversation as resolved.
Show resolved Hide resolved
# Switch back to the "user" account after the root-only steps above.
USER user

FROM nitro-node as nitro-node-default
# Just to ensure nitro-node-dist is default
9 changes: 4 additions & 5 deletions arbnode/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package arbnode

import (
"context"
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -40,11 +39,11 @@ func (a *BlockValidatorDebugAPI) ValidateMessageNumber(
if moduleRootOptional != nil {
moduleRoot = *moduleRootOptional
} else {
moduleRoots := a.val.GetModuleRootsToValidate()
if len(moduleRoots) == 0 {
return result, errors.New("no current WasmModuleRoot configured, must provide parameter")
var err error
moduleRoot, err = a.val.GetLatestWasmModuleRoot(ctx)
if err != nil {
return result, fmt.Errorf("no latest WasmModuleRoot configured, must provide parameter: %w", err)
}
moduleRoot = moduleRoots[0]
}
start_time := time.Now()
valid, gs, err := a.val.ValidateResult(ctx, arbutil.MessageIndex(msgNum), full, moduleRoot)
Expand Down
6 changes: 3 additions & 3 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func ConfigDefaultL1NonSequencerTest() *Config {
config.SyncMonitor = TestSyncMonitorConfig
config.Staker = staker.TestL1ValidatorConfig
config.Staker.Enable = false
config.BlockValidator.ValidationServerConfigs = []rpcclient.ClientConfig{{URL: ""}}
config.BlockValidator.ExecutionServerConfigs = []rpcclient.ClientConfig{{URL: ""}}

return &config
}
Expand All @@ -217,7 +217,7 @@ func ConfigDefaultL2Test() *Config {
config.Staker = staker.TestL1ValidatorConfig
config.SyncMonitor = TestSyncMonitorConfig
config.Staker.Enable = false
config.BlockValidator.ValidationServerConfigs = []rpcclient.ClientConfig{{URL: ""}}
config.BlockValidator.ExecutionServerConfigs = []rpcclient.ClientConfig{{URL: ""}}
config.TransactionStreamer = DefaultTransactionStreamerConfig

return &config
Expand Down Expand Up @@ -540,7 +540,7 @@ func createNodeImpl(
txStreamer.SetInboxReaders(inboxReader, delayedBridge)

var statelessBlockValidator *staker.StatelessBlockValidator
if config.BlockValidator.RedisValidationClientConfig.Enabled() || config.BlockValidator.ValidationServerConfigs[0].URL != "" {
if config.BlockValidator.RedisValidationClientConfig.Enabled() || config.BlockValidator.ExecutionServerConfigs[0].URL != "" {
statelessBlockValidator, err = staker.NewStatelessBlockValidator(
inboxReader,
inboxTracker,
Expand Down
2 changes: 1 addition & 1 deletion cmd/nitro/nitro.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func mainImpl() int {
}

var sameProcessValidationNodeEnabled bool
if nodeConfig.Node.BlockValidator.Enable && (nodeConfig.Node.BlockValidator.ValidationServerConfigs[0].URL == "self" || nodeConfig.Node.BlockValidator.ValidationServerConfigs[0].URL == "self-auth") {
if nodeConfig.Node.BlockValidator.Enable && (nodeConfig.Node.BlockValidator.ExecutionServerConfigs[0].URL == "self" || nodeConfig.Node.BlockValidator.ExecutionServerConfigs[0].URL == "self-auth") {
sameProcessValidationNodeEnabled = true
valnode.EnsureValidationExposedViaAuthRPC(&stackConf)
}
Expand Down
2 changes: 1 addition & 1 deletion nitro-testnode
8 changes: 8 additions & 0 deletions scripts/split-val-entry.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/bin/sh
# Entry point that starts nitro-val in the background and then nitro in the
# foreground, with both sharing a freshly generated JWT secret file.
# Any arguments given to this script are forwarded to nitro only.

# Read 32 bytes from /dev/urandom and write them as a plain hex dump (-ps).
# -c 40 sets 40 octets per output line, so all 32 bytes land on a single line.
xxd -l 32 -ps -c 40 /dev/urandom > /tmp/nitro-val.jwt
echo launching validation
# Start nitro-val in the background (&) with its auth endpoint on
# 127.0.0.10:2000, using the secret file generated above.
/usr/local/bin/nitro-val --file-logging.file nitro-val.log --auth.addr 127.0.0.10 --auth.origins 127.0.0.1 --auth.jwtsecret /tmp/nitro-val.jwt --auth.port 2000 &
PlasmaPower marked this conversation as resolved.
Show resolved Hide resolved
# NOTE(review): presumably this fixed delay gives nitro-val time to start
# listening before nitro connects; nothing here checks readiness - confirm.
sleep 2
PlasmaPower marked this conversation as resolved.
Show resolved Hide resolved
echo launching nitro-node
# Run nitro in the foreground, pointing its block-validator execution server at
# the nitro-val auth endpoint and secret above; "$@" forwards the script's args.
/usr/local/bin/nitro --node.block-validator.execution-server.jwtsecret /tmp/nitro-val.jwt --node.block-validator.execution-server.url http://127.0.0.10:2000 "$@"
151 changes: 92 additions & 59 deletions staker/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/json"
"errors"
"fmt"
"regexp"
"runtime"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -74,6 +75,13 @@ type BlockValidator struct {
sendRecordChan chan struct{}
progressValidationsChan chan struct{}

chosenValidator map[common.Hash]validator.ValidationSpawner

// wasmModuleRoot
moduleMutex sync.Mutex
currentWasmModuleRoot common.Hash
pendingWasmModuleRoot common.Hash

// for testing only
testingProgressMadeChan chan struct{}

Expand All @@ -84,10 +92,9 @@ type BlockValidator struct {

type BlockValidatorConfig struct {
Enable bool `koanf:"enable"`
ValidationServer rpcclient.ClientConfig `koanf:"validation-server" reload:"hot"`
PlasmaPower marked this conversation as resolved.
Show resolved Hide resolved
RedisValidationClientConfig redis.ValidationClientConfig `koanf:"redis-validation-client-config"`
ValidationServerConfigs []rpcclient.ClientConfig `koanf:"validation-server-configs" reload:"hot"`
ExecutionServerConfig rpcclient.ClientConfig `koanf:"execution-server-config" reload:"hot"`
ExecutionServer rpcclient.ClientConfig `koanf:"execution-server" reload:"hot"`
PlasmaPower marked this conversation as resolved.
Show resolved Hide resolved
ExecutionServerConfigs []rpcclient.ClientConfig `koanf:"execution-server-configs"`
ValidationPoll time.Duration `koanf:"validation-poll" reload:"hot"`
PrerecordedBlocks uint64 `koanf:"prerecorded-blocks" reload:"hot"`
ForwardBlocks uint64 `koanf:"forward-blocks" reload:"hot"`
Expand All @@ -96,7 +103,7 @@ type BlockValidatorConfig struct {
FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"`
Dangerous BlockValidatorDangerousConfig `koanf:"dangerous"`
MemoryFreeLimit string `koanf:"memory-free-limit" reload:"hot"`
ValidationServerConfigsList string `koanf:"validation-server-configs-list" reload:"hot"`
ExecutionServerConfigsList string `koanf:"execution-server-configs-list"`

memoryFreeLimit int
}
Expand All @@ -112,27 +119,21 @@ func (c *BlockValidatorConfig) Validate() error {
c.memoryFreeLimit = limit
}
streamsEnabled := c.RedisValidationClientConfig.Enabled()
if c.ValidationServerConfigs == nil {
c.ValidationServerConfigs = []rpcclient.ClientConfig{c.ValidationServer}
if c.ValidationServerConfigsList != "default" {
var validationServersConfigs []rpcclient.ClientConfig
if err := json.Unmarshal([]byte(c.ValidationServerConfigsList), &validationServersConfigs); err != nil && !streamsEnabled {
if c.ExecutionServerConfigs == nil {
PlasmaPower marked this conversation as resolved.
Show resolved Hide resolved
c.ExecutionServerConfigs = []rpcclient.ClientConfig{c.ExecutionServer}
if c.ExecutionServerConfigsList != "default" {
var executionServersConfigs []rpcclient.ClientConfig
if err := json.Unmarshal([]byte(c.ExecutionServerConfigsList), &executionServersConfigs); err != nil && !streamsEnabled {
return fmt.Errorf("failed to parse block-validator validation-server-configs-list string: %w", err)
}
c.ValidationServerConfigs = validationServersConfigs
c.ExecutionServerConfigs = executionServersConfigs
}
}
if len(c.ValidationServerConfigs) == 0 && !streamsEnabled {
return fmt.Errorf("block-validator validation-server-configs is empty, need at least one validation server config")
}
for _, serverConfig := range c.ValidationServerConfigs {
if err := serverConfig.Validate(); err != nil {
return fmt.Errorf("failed to validate one of the block-validator validation-server-configs. url: %s, err: %w", serverConfig.URL, err)
for i := range c.ExecutionServerConfigs {
if err := c.ExecutionServerConfigs[i].Validate(); err != nil {
return fmt.Errorf("failed to validate one of the block-validator execution-server-configs. url: %s, err: %w", c.ExecutionServerConfigs[i].URL, err)
}
}
if err := c.ExecutionServerConfig.Validate(); err != nil {
return fmt.Errorf("validating execution server config: %w", err)
}
return nil
}

Expand All @@ -144,10 +145,9 @@ type BlockValidatorConfigFetcher func() *BlockValidatorConfig

func BlockValidatorConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".enable", DefaultBlockValidatorConfig.Enable, "enable block-by-block validation")
rpcclient.RPCClientAddOptions(prefix+".validation-server", f, &DefaultBlockValidatorConfig.ValidationServer)
rpcclient.RPCClientAddOptions(prefix+".execution-server-config", f, &DefaultBlockValidatorConfig.ExecutionServerConfig)
rpcclient.RPCClientAddOptions(prefix+".execution-server", f, &DefaultBlockValidatorConfig.ExecutionServer)
redis.ValidationClientConfigAddOptions(prefix+".redis-validation-client-config", f)
f.String(prefix+".validation-server-configs-list", DefaultBlockValidatorConfig.ValidationServerConfigsList, "array of validation rpc configs given as a json string. time duration should be supplied in number indicating nanoseconds")
f.String(prefix+".execution-server-configs-list", DefaultBlockValidatorConfig.ExecutionServerConfigsList, "array of execution rpc configs given as a json string. time duration should be supplied in number indicating nanoseconds")
f.Duration(prefix+".validation-poll", DefaultBlockValidatorConfig.ValidationPoll, "poll time to check validations")
f.Uint64(prefix+".forward-blocks", DefaultBlockValidatorConfig.ForwardBlocks, "prepare entries for up to that many blocks ahead of validation (small footprint)")
f.Uint64(prefix+".prerecorded-blocks", DefaultBlockValidatorConfig.PrerecordedBlocks, "record that many blocks ahead of validation (larger footprint)")
Expand All @@ -164,9 +164,8 @@ func BlockValidatorDangerousConfigAddOptions(prefix string, f *pflag.FlagSet) {

var DefaultBlockValidatorConfig = BlockValidatorConfig{
Enable: false,
ValidationServerConfigsList: "default",
ValidationServer: rpcclient.DefaultClientConfig,
ExecutionServerConfig: rpcclient.DefaultClientConfig,
ExecutionServerConfigsList: "default",
ExecutionServer: rpcclient.DefaultClientConfig,
RedisValidationClientConfig: redis.DefaultValidationClientConfig,
ValidationPoll: time.Second,
ForwardBlocks: 1024,
Expand All @@ -180,10 +179,9 @@ var DefaultBlockValidatorConfig = BlockValidatorConfig{

var TestBlockValidatorConfig = BlockValidatorConfig{
Enable: false,
ValidationServer: rpcclient.TestClientConfig,
ValidationServerConfigs: []rpcclient.ClientConfig{rpcclient.TestClientConfig},
ExecutionServer: rpcclient.TestClientConfig,
ExecutionServerConfigs: []rpcclient.ClientConfig{rpcclient.TestClientConfig},
RedisValidationClientConfig: redis.TestValidationClientConfig,
ExecutionServerConfig: rpcclient.TestClientConfig,
ValidationPoll: 100 * time.Millisecond,
ForwardBlocks: 128,
PrerecordedBlocks: uint64(2 * runtime.NumCPU()),
Expand Down Expand Up @@ -332,6 +330,17 @@ func nonBlockingTrigger(channel chan struct{}) {
}
}

// GetModuleRootsToValidate returns the wasm module roots to validate against.
// The result always starts with currentWasmModuleRoot; pendingWasmModuleRoot is
// appended only when it differs from the current root and is not the zero hash.
// Both fields are read while holding moduleMutex.
func (v *BlockValidator) GetModuleRootsToValidate() []common.Hash {
v.moduleMutex.Lock()
defer v.moduleMutex.Unlock()

validatingModuleRoots := []common.Hash{v.currentWasmModuleRoot}
// Skip the pending root when it is unset (zero hash) or duplicates the current one.
if (v.currentWasmModuleRoot != v.pendingWasmModuleRoot && v.pendingWasmModuleRoot != common.Hash{}) {
PlasmaPower marked this conversation as resolved.
Show resolved Hide resolved
validatingModuleRoots = append(validatingModuleRoots, v.pendingWasmModuleRoot)
}
return validatingModuleRoots
}

// called from NewBlockValidator, doesn't need to catch locks
func ReadLastValidatedInfo(db ethdb.Database) (*GlobalStateValidatedInfo, error) {
exists, err := db.Has(lastGlobalStateValidatedInfoKey)
Expand Down Expand Up @@ -460,8 +469,13 @@ func (v *BlockValidator) writeToFile(validationEntry *validationEntry, moduleRoo
if err != nil {
return err
}
_, err = v.execSpawner.WriteToFile(input, validationEntry.End, moduleRoot).Await(v.GetContext())
return err
for _, spawner := range v.execSpawners {
if validator.SpawnerSupportsModule(spawner, moduleRoot) {
_, err = spawner.WriteToFile(input, validationEntry.End, moduleRoot).Await(v.GetContext())
return err
}
}
return errors.New("did not find exec spawner for wasmModuleRoot")
}

func (v *BlockValidator) SetCurrentWasmModuleRoot(hash common.Hash) error {
Expand Down Expand Up @@ -704,14 +718,6 @@ func (v *BlockValidator) advanceValidations(ctx context.Context) (*arbutil.Messa
defer v.reorgMutex.RUnlock()

wasmRoots := v.GetModuleRootsToValidate()
rooms := make([]int, len(v.validationSpawners))
currentSpawnerIndex := 0
for i, spawner := range v.validationSpawners {
here := spawner.Room() / len(wasmRoots)
if here > 0 {
rooms[i] = here
}
}
pos := v.validated() - 1 // to reverse the first +1 in the loop
validationsLoop:
for {
Expand Down Expand Up @@ -780,15 +786,15 @@ validationsLoop:
log.Trace("result validated", "count", v.validated(), "blockHash", v.lastValidGS.BlockHash)
continue
}
for currentSpawnerIndex < len(rooms) {
if rooms[currentSpawnerIndex] > 0 {
break
for _, moduleRoot := range wasmRoots {
if v.chosenValidator[moduleRoot] == nil {
v.possiblyFatal(fmt.Errorf("did not find spawner for moduleRoot :%v", moduleRoot))
continue
}
if v.chosenValidator[moduleRoot].Room() == 0 {
log.Trace("advanceValidations: no more room", "moduleRoot", moduleRoot)
return nil, nil
}
currentSpawnerIndex++
}
if currentSpawnerIndex == len(rooms) {
log.Trace("advanceValidations: no more room", "pos", pos)
return nil, nil
}
if v.isMemoryLimitExceeded() {
log.Warn("advanceValidations: aborting due to running low on memory")
Expand All @@ -808,8 +814,8 @@ validationsLoop:
defer validatorPendingValidationsGauge.Dec(1)
var runs []validator.ValidationRun
for _, moduleRoot := range wasmRoots {
run := v.validationSpawners[currentSpawnerIndex].Launch(input, moduleRoot)
log.Trace("advanceValidations: launched", "pos", validationStatus.Entry.Pos, "moduleRoot", moduleRoot, "spawner", currentSpawnerIndex)
run := v.chosenValidator[moduleRoot].Launch(input, moduleRoot)
log.Trace("advanceValidations: launched", "pos", validationStatus.Entry.Pos, "moduleRoot", moduleRoot)
runs = append(runs, run)
}
validationCtx, cancel := context.WithCancel(ctx)
Expand All @@ -832,10 +838,6 @@ validationsLoop:
}
nonBlockingTrigger(v.progressValidationsChan)
})
rooms[currentSpawnerIndex]--
if rooms[currentSpawnerIndex] == 0 {
currentSpawnerIndex++
}
}
}
}
Expand Down Expand Up @@ -1045,10 +1047,7 @@ func (v *BlockValidator) Initialize(ctx context.Context) error {
currentModuleRoot := config.CurrentModuleRoot
switch currentModuleRoot {
case "latest":
if v.execSpawner == nil {
return fmt.Errorf(`execution spawner is nil while current module root is "latest"`)
}
latest, err := v.execSpawner.LatestWasmModuleRoot().Await(ctx)
latest, err := v.GetLatestWasmModuleRoot(ctx)
if err != nil {
return err
}
Expand All @@ -1063,13 +1062,47 @@ func (v *BlockValidator) Initialize(ctx context.Context) error {
return errors.New("current-module-root config value illegal")
}
}
pendingModuleRoot := config.PendingUpgradeModuleRoot
if pendingModuleRoot != "" {
if pendingModuleRoot == "latest" {
latest, err := v.GetLatestWasmModuleRoot(ctx)
if err != nil {
return err
}
v.pendingWasmModuleRoot = latest
} else {
valid, _ := regexp.MatchString("(0x)?[0-9a-fA-F]{64}", pendingModuleRoot)
v.pendingWasmModuleRoot = common.HexToHash(pendingModuleRoot)
if (!valid || v.pendingWasmModuleRoot == common.Hash{}) {
return errors.New("pending-upgrade-module-root config value illegal")
}
}
}
log.Info("BlockValidator initialized", "current", v.currentWasmModuleRoot, "pending", v.pendingWasmModuleRoot)
moduleRoots := []common.Hash{v.currentWasmModuleRoot}
if v.pendingWasmModuleRoot != v.currentWasmModuleRoot {
if v.pendingWasmModuleRoot != v.currentWasmModuleRoot && v.pendingWasmModuleRoot != (common.Hash{}) {
moduleRoots = append(moduleRoots, v.pendingWasmModuleRoot)
}
if err := v.StatelessBlockValidator.Initialize(moduleRoots); err != nil {
return fmt.Errorf("initializing block validator with module roots: %w", err)
// First spawner is always RedisValidationClient if RedisStreams are enabled.
if v.redisValidator != nil {
err := v.redisValidator.Initialize(moduleRoots)
if err != nil {
return err
}
}
v.chosenValidator = make(map[common.Hash]validator.ValidationSpawner)
for _, root := range moduleRoots {
if v.redisValidator != nil && validator.SpawnerSupportsModule(v.redisValidator, root) {
v.chosenValidator[root] = v.redisValidator
}
if v.chosenValidator[root] == nil {
PlasmaPower marked this conversation as resolved.
Show resolved Hide resolved
for _, spawner := range v.execSpawners {
if validator.SpawnerSupportsModule(spawner, root) {
v.chosenValidator[root] = spawner
break
}
}
}
}
return nil
}
Expand Down
15 changes: 12 additions & 3 deletions staker/challenge_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,9 +478,18 @@ func (m *ChallengeManager) createExecutionBackend(ctx context.Context, step uint
}
}
input.BatchInfo = prunedBatches
execRun, err := m.validator.execSpawner.CreateExecutionRun(m.wasmModuleRoot, input).Await(ctx)
if err != nil {
return fmt.Errorf("error creating execution backend for msg %v: %w", initialCount, err)
var execRun validator.ExecutionRun
for _, spawner := range m.validator.execSpawners {
if validator.SpawnerSupportsModule(spawner, m.wasmModuleRoot) {
execRun, err = spawner.CreateExecutionRun(m.wasmModuleRoot, input).Await(ctx)
if err != nil {
return fmt.Errorf("error creating execution backend for msg %v: %w", initialCount, err)
}
break
}
}
if execRun == nil {
return fmt.Errorf("did not find valid execution backend")
}
backend, err := NewExecutionChallengeBackend(execRun)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions staker/challenge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ func runChallengeTest(

for i := 0; i < 100; i++ {
if testTimeout {
backend.Commit()
err = backend.AdjustTime(time.Second * 40)
}
Require(t, err)
Expand Down
Loading
Loading