Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Config Change] Multi exec servers #2267

Merged
merged 11 commits into from
May 2, 2024
10 changes: 10 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -271,5 +271,15 @@ RUN export DEBIAN_FRONTEND=noninteractive && \

USER user

FROM nitro-node-dev as nitro-node-split
USER root

RUN export DEBIAN_FRONTEND=noninteractive && \
apt-get update && \
apt-get install -y xxd netcat-traditional
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should remove the apt cache after this

COPY scripts/split-val-entry.sh /usr/local/bin
ENTRYPOINT [ "/usr/local/bin/split-val-entry.sh" ]
PlasmaPower marked this conversation as resolved.
Show resolved Hide resolved
USER user

FROM nitro-node as nitro-node-default
# Just to ensure nitro-node-dist is default
9 changes: 4 additions & 5 deletions arbnode/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package arbnode

import (
"context"
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -40,11 +39,11 @@ func (a *BlockValidatorDebugAPI) ValidateMessageNumber(
if moduleRootOptional != nil {
moduleRoot = *moduleRootOptional
} else {
moduleRoots := a.val.GetModuleRootsToValidate()
if len(moduleRoots) == 0 {
return result, errors.New("no current WasmModuleRoot configured, must provide parameter")
var err error
moduleRoot, err = a.val.GetLatestWasmModuleRoot(ctx)
if err != nil {
return result, fmt.Errorf("no latest WasmModuleRoot configured, must provide parameter: %w", err)
}
moduleRoot = moduleRoots[0]
}
start_time := time.Now()
valid, gs, err := a.val.ValidateResult(ctx, arbutil.MessageIndex(msgNum), full, moduleRoot)
Expand Down
2 changes: 1 addition & 1 deletion nitro-testnode
18 changes: 18 additions & 0 deletions scripts/split-val-entry.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/bin/bash

xxd -l 32 -ps -c 40 /dev/urandom > /tmp/nitro-val.jwt

echo launching validation servers
# To add validation server:
# > launch them here with a different port and --validation.wasm.root-path
# add their port to wait loop
# edit validation-server-configs-list to include the other nodes
/usr/local/bin/nitro-val --file-logging.enable=false --auth.addr 127.0.0.10 --auth.origins 127.0.0.1 --auth.jwtsecret /tmp/nitro-val.jwt --auth.port 52000 &
for port in 52000; do
while ! nc -w1 -z 127.0.0.10 $port; do
echo waiting for validation port $port
sleep 1
done
done
echo launching nitro-node
/usr/local/bin/nitro --node.block-validator.pending-upgrade-module-root="0x8b104a2e80ac6165dc58b9048de12f301d70b02a0ab51396c22b4b4b802a16a4" --node.block-validator.validation-server-configs-list='[{"jwtsecret":"/tmp/nitro-val.jwt","url":"http://127.0.0.10:52000"}]' "$@"
136 changes: 84 additions & 52 deletions staker/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/json"
"errors"
"fmt"
"regexp"
"runtime"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -74,6 +75,13 @@ type BlockValidator struct {
sendRecordChan chan struct{}
progressValidationsChan chan struct{}

chosenValidator map[common.Hash]validator.ValidationSpawner

// wasmModuleRoot
moduleMutex sync.Mutex
currentWasmModuleRoot common.Hash
pendingWasmModuleRoot common.Hash

// for testing only
testingProgressMadeChan chan struct{}

Expand All @@ -84,10 +92,9 @@ type BlockValidator struct {

type BlockValidatorConfig struct {
Enable bool `koanf:"enable"`
ValidationServer rpcclient.ClientConfig `koanf:"validation-server" reload:"hot"`
PlasmaPower marked this conversation as resolved.
Show resolved Hide resolved
RedisValidationClientConfig redis.ValidationClientConfig `koanf:"redis-validation-client-config"`
ValidationServerConfigs []rpcclient.ClientConfig `koanf:"validation-server-configs" reload:"hot"`
ExecutionServerConfig rpcclient.ClientConfig `koanf:"execution-server-config" reload:"hot"`
ValidationServer rpcclient.ClientConfig `koanf:"validation-server" reload:"hot"`
ValidationServerConfigs []rpcclient.ClientConfig `koanf:"validation-server-configs"`
ValidationPoll time.Duration `koanf:"validation-poll" reload:"hot"`
PrerecordedBlocks uint64 `koanf:"prerecorded-blocks" reload:"hot"`
ForwardBlocks uint64 `koanf:"forward-blocks" reload:"hot"`
Expand All @@ -96,7 +103,7 @@ type BlockValidatorConfig struct {
FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"`
Dangerous BlockValidatorDangerousConfig `koanf:"dangerous"`
MemoryFreeLimit string `koanf:"memory-free-limit" reload:"hot"`
ValidationServerConfigsList string `koanf:"validation-server-configs-list" reload:"hot"`
ValidationServerConfigsList string `koanf:"validation-server-configs-list"`

memoryFreeLimit int
}
Expand All @@ -112,27 +119,21 @@ func (c *BlockValidatorConfig) Validate() error {
c.memoryFreeLimit = limit
}
streamsEnabled := c.RedisValidationClientConfig.Enabled()
if c.ValidationServerConfigs == nil {
if len(c.ValidationServerConfigs) == 0 {
c.ValidationServerConfigs = []rpcclient.ClientConfig{c.ValidationServer}
if c.ValidationServerConfigsList != "default" {
var validationServersConfigs []rpcclient.ClientConfig
if err := json.Unmarshal([]byte(c.ValidationServerConfigsList), &validationServersConfigs); err != nil && !streamsEnabled {
var executionServersConfigs []rpcclient.ClientConfig
if err := json.Unmarshal([]byte(c.ValidationServerConfigsList), &executionServersConfigs); err != nil && !streamsEnabled {
return fmt.Errorf("failed to parse block-validator validation-server-configs-list string: %w", err)
}
c.ValidationServerConfigs = validationServersConfigs
c.ValidationServerConfigs = executionServersConfigs
}
}
if len(c.ValidationServerConfigs) == 0 && !streamsEnabled {
return fmt.Errorf("block-validator validation-server-configs is empty, need at least one validation server config")
}
for _, serverConfig := range c.ValidationServerConfigs {
if err := serverConfig.Validate(); err != nil {
return fmt.Errorf("failed to validate one of the block-validator validation-server-configs. url: %s, err: %w", serverConfig.URL, err)
for i := range c.ValidationServerConfigs {
if err := c.ValidationServerConfigs[i].Validate(); err != nil {
return fmt.Errorf("failed to validate one of the block-validator validation-server-configs. url: %s, err: %w", c.ValidationServerConfigs[i].URL, err)
}
}
if err := c.ExecutionServerConfig.Validate(); err != nil {
return fmt.Errorf("validating execution server config: %w", err)
}
return nil
}

Expand All @@ -145,9 +146,8 @@ type BlockValidatorConfigFetcher func() *BlockValidatorConfig
func BlockValidatorConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".enable", DefaultBlockValidatorConfig.Enable, "enable block-by-block validation")
rpcclient.RPCClientAddOptions(prefix+".validation-server", f, &DefaultBlockValidatorConfig.ValidationServer)
rpcclient.RPCClientAddOptions(prefix+".execution-server-config", f, &DefaultBlockValidatorConfig.ExecutionServerConfig)
redis.ValidationClientConfigAddOptions(prefix+".redis-validation-client-config", f)
f.String(prefix+".validation-server-configs-list", DefaultBlockValidatorConfig.ValidationServerConfigsList, "array of validation rpc configs given as a json string. time duration should be supplied in number indicating nanoseconds")
f.String(prefix+".validation-server-configs-list", DefaultBlockValidatorConfig.ValidationServerConfigsList, "array of execution rpc configs given as a json string. time duration should be supplied in number indicating nanoseconds")
f.Duration(prefix+".validation-poll", DefaultBlockValidatorConfig.ValidationPoll, "poll time to check validations")
f.Uint64(prefix+".forward-blocks", DefaultBlockValidatorConfig.ForwardBlocks, "prepare entries for up to that many blocks ahead of validation (small footprint)")
f.Uint64(prefix+".prerecorded-blocks", DefaultBlockValidatorConfig.PrerecordedBlocks, "record that many blocks ahead of validation (larger footprint)")
Expand All @@ -166,7 +166,6 @@ var DefaultBlockValidatorConfig = BlockValidatorConfig{
Enable: false,
ValidationServerConfigsList: "default",
ValidationServer: rpcclient.DefaultClientConfig,
ExecutionServerConfig: rpcclient.DefaultClientConfig,
RedisValidationClientConfig: redis.DefaultValidationClientConfig,
ValidationPoll: time.Second,
ForwardBlocks: 1024,
Expand All @@ -183,7 +182,6 @@ var TestBlockValidatorConfig = BlockValidatorConfig{
ValidationServer: rpcclient.TestClientConfig,
ValidationServerConfigs: []rpcclient.ClientConfig{rpcclient.TestClientConfig},
RedisValidationClientConfig: redis.TestValidationClientConfig,
ExecutionServerConfig: rpcclient.TestClientConfig,
ValidationPoll: 100 * time.Millisecond,
ForwardBlocks: 128,
PrerecordedBlocks: uint64(2 * runtime.NumCPU()),
Expand Down Expand Up @@ -332,6 +330,17 @@ func nonBlockingTrigger(channel chan struct{}) {
}
}

func (v *BlockValidator) GetModuleRootsToValidate() []common.Hash {
v.moduleMutex.Lock()
defer v.moduleMutex.Unlock()

validatingModuleRoots := []common.Hash{v.currentWasmModuleRoot}
if v.currentWasmModuleRoot != v.pendingWasmModuleRoot && v.pendingWasmModuleRoot != (common.Hash{}) {
validatingModuleRoots = append(validatingModuleRoots, v.pendingWasmModuleRoot)
}
return validatingModuleRoots
}

// called from NewBlockValidator, doesn't need to catch locks
func ReadLastValidatedInfo(db ethdb.Database) (*GlobalStateValidatedInfo, error) {
exists, err := db.Has(lastGlobalStateValidatedInfoKey)
Expand Down Expand Up @@ -460,8 +469,13 @@ func (v *BlockValidator) writeToFile(validationEntry *validationEntry, moduleRoo
if err != nil {
return err
}
_, err = v.execSpawner.WriteToFile(input, validationEntry.End, moduleRoot).Await(v.GetContext())
return err
for _, spawner := range v.execSpawners {
if validator.SpawnerSupportsModule(spawner, moduleRoot) {
_, err = spawner.WriteToFile(input, validationEntry.End, moduleRoot).Await(v.GetContext())
return err
}
}
return errors.New("did not find exec spawner for wasmModuleRoot")
}

func (v *BlockValidator) SetCurrentWasmModuleRoot(hash common.Hash) error {
Expand Down Expand Up @@ -704,14 +718,6 @@ func (v *BlockValidator) advanceValidations(ctx context.Context) (*arbutil.Messa
defer v.reorgMutex.RUnlock()

wasmRoots := v.GetModuleRootsToValidate()
rooms := make([]int, len(v.validationSpawners))
currentSpawnerIndex := 0
for i, spawner := range v.validationSpawners {
here := spawner.Room() / len(wasmRoots)
if here > 0 {
rooms[i] = here
}
}
pos := v.validated() - 1 // to reverse the first +1 in the loop
validationsLoop:
for {
Expand Down Expand Up @@ -780,15 +786,15 @@ validationsLoop:
log.Trace("result validated", "count", v.validated(), "blockHash", v.lastValidGS.BlockHash)
continue
}
for currentSpawnerIndex < len(rooms) {
if rooms[currentSpawnerIndex] > 0 {
break
for _, moduleRoot := range wasmRoots {
if v.chosenValidator[moduleRoot] == nil {
v.possiblyFatal(fmt.Errorf("did not find spawner for moduleRoot :%v", moduleRoot))
continue
}
if v.chosenValidator[moduleRoot].Room() == 0 {
log.Trace("advanceValidations: no more room", "moduleRoot", moduleRoot)
return nil, nil
}
currentSpawnerIndex++
}
if currentSpawnerIndex == len(rooms) {
log.Trace("advanceValidations: no more room", "pos", pos)
return nil, nil
}
if v.isMemoryLimitExceeded() {
log.Warn("advanceValidations: aborting due to running low on memory")
Expand All @@ -808,8 +814,8 @@ validationsLoop:
defer validatorPendingValidationsGauge.Dec(1)
var runs []validator.ValidationRun
for _, moduleRoot := range wasmRoots {
run := v.validationSpawners[currentSpawnerIndex].Launch(input, moduleRoot)
log.Trace("advanceValidations: launched", "pos", validationStatus.Entry.Pos, "moduleRoot", moduleRoot, "spawner", currentSpawnerIndex)
run := v.chosenValidator[moduleRoot].Launch(input, moduleRoot)
log.Trace("advanceValidations: launched", "pos", validationStatus.Entry.Pos, "moduleRoot", moduleRoot)
runs = append(runs, run)
}
validationCtx, cancel := context.WithCancel(ctx)
Expand All @@ -832,10 +838,6 @@ validationsLoop:
}
nonBlockingTrigger(v.progressValidationsChan)
})
rooms[currentSpawnerIndex]--
if rooms[currentSpawnerIndex] == 0 {
currentSpawnerIndex++
}
}
}
}
Expand Down Expand Up @@ -1045,10 +1047,7 @@ func (v *BlockValidator) Initialize(ctx context.Context) error {
currentModuleRoot := config.CurrentModuleRoot
switch currentModuleRoot {
case "latest":
if v.execSpawner == nil {
return fmt.Errorf(`execution spawner is nil while current module root is "latest"`)
}
latest, err := v.execSpawner.LatestWasmModuleRoot().Await(ctx)
latest, err := v.GetLatestWasmModuleRoot(ctx)
if err != nil {
return err
}
Expand All @@ -1063,13 +1062,46 @@ func (v *BlockValidator) Initialize(ctx context.Context) error {
return errors.New("current-module-root config value illegal")
}
}
pendingModuleRoot := config.PendingUpgradeModuleRoot
if pendingModuleRoot != "" {
if pendingModuleRoot == "latest" {
latest, err := v.GetLatestWasmModuleRoot(ctx)
if err != nil {
return err
}
v.pendingWasmModuleRoot = latest
} else {
valid, _ := regexp.MatchString("(0x)?[0-9a-fA-F]{64}", pendingModuleRoot)
v.pendingWasmModuleRoot = common.HexToHash(pendingModuleRoot)
if (!valid || v.pendingWasmModuleRoot == common.Hash{}) {
return errors.New("pending-upgrade-module-root config value illegal")
}
}
}
log.Info("BlockValidator initialized", "current", v.currentWasmModuleRoot, "pending", v.pendingWasmModuleRoot)
moduleRoots := []common.Hash{v.currentWasmModuleRoot}
if v.pendingWasmModuleRoot != v.currentWasmModuleRoot {
if v.pendingWasmModuleRoot != v.currentWasmModuleRoot && v.pendingWasmModuleRoot != (common.Hash{}) {
moduleRoots = append(moduleRoots, v.pendingWasmModuleRoot)
}
if err := v.StatelessBlockValidator.Initialize(moduleRoots); err != nil {
return fmt.Errorf("initializing block validator with module roots: %w", err)
// First spawner is always RedisValidationClient if RedisStreams are enabled.
if v.redisValidator != nil {
err := v.redisValidator.Initialize(moduleRoots)
if err != nil {
return err
}
}
v.chosenValidator = make(map[common.Hash]validator.ValidationSpawner)
for _, root := range moduleRoots {
if v.redisValidator != nil && validator.SpawnerSupportsModule(v.redisValidator, root) {
v.chosenValidator[root] = v.redisValidator
} else {
for _, spawner := range v.execSpawners {
if validator.SpawnerSupportsModule(spawner, root) {
v.chosenValidator[root] = spawner
break
}
}
}
}
return nil
}
Expand Down
15 changes: 12 additions & 3 deletions staker/challenge_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,9 +478,18 @@ func (m *ChallengeManager) createExecutionBackend(ctx context.Context, step uint
}
}
input.BatchInfo = prunedBatches
execRun, err := m.validator.execSpawner.CreateExecutionRun(m.wasmModuleRoot, input).Await(ctx)
if err != nil {
return fmt.Errorf("error creating execution backend for msg %v: %w", initialCount, err)
var execRun validator.ExecutionRun
for _, spawner := range m.validator.execSpawners {
if validator.SpawnerSupportsModule(spawner, m.wasmModuleRoot) {
execRun, err = spawner.CreateExecutionRun(m.wasmModuleRoot, input).Await(ctx)
if err != nil {
return fmt.Errorf("error creating execution backend for msg %v: %w", initialCount, err)
}
break
}
}
if execRun == nil {
return fmt.Errorf("did not find valid execution backend")
}
backend, err := NewExecutionChallengeBackend(execRun)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions staker/challenge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ func runChallengeTest(

for i := 0; i < 100; i++ {
if testTimeout {
backend.Commit()
err = backend.AdjustTime(time.Second * 40)
}
Require(t, err)
Expand Down
Loading
Loading