Skip to content

Commit

Permalink
Merge pull request #2267 from OffchainLabs/multi-exec-servers
Browse files Browse the repository at this point in the history
[Config Change] Multi exec servers
  • Loading branch information
PlasmaPower authored May 2, 2024
2 parents 58f53f9 + 260d14a commit b4cc111
Show file tree
Hide file tree
Showing 20 changed files with 287 additions and 170 deletions.
10 changes: 10 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -271,5 +271,15 @@ RUN export DEBIAN_FRONTEND=noninteractive && \

USER user

FROM nitro-node-dev as nitro-node-split
USER root

RUN export DEBIAN_FRONTEND=noninteractive && \
apt-get update && \
apt-get install -y xxd netcat-traditional
COPY scripts/split-val-entry.sh /usr/local/bin
ENTRYPOINT [ "/usr/local/bin/split-val-entry.sh" ]
USER user

FROM nitro-node as nitro-node-default
# Just to ensure nitro-node-dist is default
9 changes: 4 additions & 5 deletions arbnode/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package arbnode

import (
"context"
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -40,11 +39,11 @@ func (a *BlockValidatorDebugAPI) ValidateMessageNumber(
if moduleRootOptional != nil {
moduleRoot = *moduleRootOptional
} else {
moduleRoots := a.val.GetModuleRootsToValidate()
if len(moduleRoots) == 0 {
return result, errors.New("no current WasmModuleRoot configured, must provide parameter")
var err error
moduleRoot, err = a.val.GetLatestWasmModuleRoot(ctx)
if err != nil {
return result, fmt.Errorf("no latest WasmModuleRoot configured, must provide parameter: %w", err)
}
moduleRoot = moduleRoots[0]
}
start_time := time.Now()
valid, gs, err := a.val.ValidateResult(ctx, arbutil.MessageIndex(msgNum), full, moduleRoot)
Expand Down
2 changes: 1 addition & 1 deletion nitro-testnode
18 changes: 18 additions & 0 deletions scripts/split-val-entry.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/bin/bash

xxd -l 32 -ps -c 40 /dev/urandom > /tmp/nitro-val.jwt

echo launching validation servers
# To add validation server:
# > launch them here with a different port and --validation.wasm.root-path
# add their port to wait loop
# edit validation-server-configs-list to include the other nodes
/usr/local/bin/nitro-val --file-logging.enable=false --auth.addr 127.0.0.10 --auth.origins 127.0.0.1 --auth.jwtsecret /tmp/nitro-val.jwt --auth.port 52000 &
for port in 52000; do
while ! nc -w1 -z 127.0.0.10 $port; do
echo waiting for validation port $port
sleep 1
done
done
echo launching nitro-node
/usr/local/bin/nitro --node.block-validator.pending-upgrade-module-root="0x8b104a2e80ac6165dc58b9048de12f301d70b02a0ab51396c22b4b4b802a16a4" --node.block-validator.validation-server-configs-list='[{"jwtsecret":"/tmp/nitro-val.jwt","url":"http://127.0.0.10:52000"}]' "$@"
136 changes: 84 additions & 52 deletions staker/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/json"
"errors"
"fmt"
"regexp"
"runtime"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -74,6 +75,13 @@ type BlockValidator struct {
sendRecordChan chan struct{}
progressValidationsChan chan struct{}

chosenValidator map[common.Hash]validator.ValidationSpawner

// wasmModuleRoot
moduleMutex sync.Mutex
currentWasmModuleRoot common.Hash
pendingWasmModuleRoot common.Hash

// for testing only
testingProgressMadeChan chan struct{}

Expand All @@ -84,10 +92,9 @@ type BlockValidator struct {

type BlockValidatorConfig struct {
Enable bool `koanf:"enable"`
ValidationServer rpcclient.ClientConfig `koanf:"validation-server" reload:"hot"`
RedisValidationClientConfig redis.ValidationClientConfig `koanf:"redis-validation-client-config"`
ValidationServerConfigs []rpcclient.ClientConfig `koanf:"validation-server-configs" reload:"hot"`
ExecutionServerConfig rpcclient.ClientConfig `koanf:"execution-server-config" reload:"hot"`
ValidationServer rpcclient.ClientConfig `koanf:"validation-server" reload:"hot"`
ValidationServerConfigs []rpcclient.ClientConfig `koanf:"validation-server-configs"`
ValidationPoll time.Duration `koanf:"validation-poll" reload:"hot"`
PrerecordedBlocks uint64 `koanf:"prerecorded-blocks" reload:"hot"`
ForwardBlocks uint64 `koanf:"forward-blocks" reload:"hot"`
Expand All @@ -96,7 +103,7 @@ type BlockValidatorConfig struct {
FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"`
Dangerous BlockValidatorDangerousConfig `koanf:"dangerous"`
MemoryFreeLimit string `koanf:"memory-free-limit" reload:"hot"`
ValidationServerConfigsList string `koanf:"validation-server-configs-list" reload:"hot"`
ValidationServerConfigsList string `koanf:"validation-server-configs-list"`

memoryFreeLimit int
}
Expand All @@ -112,27 +119,21 @@ func (c *BlockValidatorConfig) Validate() error {
c.memoryFreeLimit = limit
}
streamsEnabled := c.RedisValidationClientConfig.Enabled()
if c.ValidationServerConfigs == nil {
if len(c.ValidationServerConfigs) == 0 {
c.ValidationServerConfigs = []rpcclient.ClientConfig{c.ValidationServer}
if c.ValidationServerConfigsList != "default" {
var validationServersConfigs []rpcclient.ClientConfig
if err := json.Unmarshal([]byte(c.ValidationServerConfigsList), &validationServersConfigs); err != nil && !streamsEnabled {
var executionServersConfigs []rpcclient.ClientConfig
if err := json.Unmarshal([]byte(c.ValidationServerConfigsList), &executionServersConfigs); err != nil && !streamsEnabled {
return fmt.Errorf("failed to parse block-validator validation-server-configs-list string: %w", err)
}
c.ValidationServerConfigs = validationServersConfigs
c.ValidationServerConfigs = executionServersConfigs
}
}
if len(c.ValidationServerConfigs) == 0 && !streamsEnabled {
return fmt.Errorf("block-validator validation-server-configs is empty, need at least one validation server config")
}
for _, serverConfig := range c.ValidationServerConfigs {
if err := serverConfig.Validate(); err != nil {
return fmt.Errorf("failed to validate one of the block-validator validation-server-configs. url: %s, err: %w", serverConfig.URL, err)
for i := range c.ValidationServerConfigs {
if err := c.ValidationServerConfigs[i].Validate(); err != nil {
return fmt.Errorf("failed to validate one of the block-validator validation-server-configs. url: %s, err: %w", c.ValidationServerConfigs[i].URL, err)
}
}
if err := c.ExecutionServerConfig.Validate(); err != nil {
return fmt.Errorf("validating execution server config: %w", err)
}
return nil
}

Expand All @@ -145,9 +146,8 @@ type BlockValidatorConfigFetcher func() *BlockValidatorConfig
func BlockValidatorConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".enable", DefaultBlockValidatorConfig.Enable, "enable block-by-block validation")
rpcclient.RPCClientAddOptions(prefix+".validation-server", f, &DefaultBlockValidatorConfig.ValidationServer)
rpcclient.RPCClientAddOptions(prefix+".execution-server-config", f, &DefaultBlockValidatorConfig.ExecutionServerConfig)
redis.ValidationClientConfigAddOptions(prefix+".redis-validation-client-config", f)
f.String(prefix+".validation-server-configs-list", DefaultBlockValidatorConfig.ValidationServerConfigsList, "array of validation rpc configs given as a json string. time duration should be supplied in number indicating nanoseconds")
f.String(prefix+".validation-server-configs-list", DefaultBlockValidatorConfig.ValidationServerConfigsList, "array of execution rpc configs given as a json string. time duration should be supplied in number indicating nanoseconds")
f.Duration(prefix+".validation-poll", DefaultBlockValidatorConfig.ValidationPoll, "poll time to check validations")
f.Uint64(prefix+".forward-blocks", DefaultBlockValidatorConfig.ForwardBlocks, "prepare entries for up to that many blocks ahead of validation (small footprint)")
f.Uint64(prefix+".prerecorded-blocks", DefaultBlockValidatorConfig.PrerecordedBlocks, "record that many blocks ahead of validation (larger footprint)")
Expand All @@ -166,7 +166,6 @@ var DefaultBlockValidatorConfig = BlockValidatorConfig{
Enable: false,
ValidationServerConfigsList: "default",
ValidationServer: rpcclient.DefaultClientConfig,
ExecutionServerConfig: rpcclient.DefaultClientConfig,
RedisValidationClientConfig: redis.DefaultValidationClientConfig,
ValidationPoll: time.Second,
ForwardBlocks: 1024,
Expand All @@ -183,7 +182,6 @@ var TestBlockValidatorConfig = BlockValidatorConfig{
ValidationServer: rpcclient.TestClientConfig,
ValidationServerConfigs: []rpcclient.ClientConfig{rpcclient.TestClientConfig},
RedisValidationClientConfig: redis.TestValidationClientConfig,
ExecutionServerConfig: rpcclient.TestClientConfig,
ValidationPoll: 100 * time.Millisecond,
ForwardBlocks: 128,
PrerecordedBlocks: uint64(2 * runtime.NumCPU()),
Expand Down Expand Up @@ -332,6 +330,17 @@ func nonBlockingTrigger(channel chan struct{}) {
}
}

func (v *BlockValidator) GetModuleRootsToValidate() []common.Hash {
v.moduleMutex.Lock()
defer v.moduleMutex.Unlock()

validatingModuleRoots := []common.Hash{v.currentWasmModuleRoot}
if v.currentWasmModuleRoot != v.pendingWasmModuleRoot && v.pendingWasmModuleRoot != (common.Hash{}) {
validatingModuleRoots = append(validatingModuleRoots, v.pendingWasmModuleRoot)
}
return validatingModuleRoots
}

// called from NewBlockValidator, doesn't need to catch locks
func ReadLastValidatedInfo(db ethdb.Database) (*GlobalStateValidatedInfo, error) {
exists, err := db.Has(lastGlobalStateValidatedInfoKey)
Expand Down Expand Up @@ -460,8 +469,13 @@ func (v *BlockValidator) writeToFile(validationEntry *validationEntry, moduleRoo
if err != nil {
return err
}
_, err = v.execSpawner.WriteToFile(input, validationEntry.End, moduleRoot).Await(v.GetContext())
return err
for _, spawner := range v.execSpawners {
if validator.SpawnerSupportsModule(spawner, moduleRoot) {
_, err = spawner.WriteToFile(input, validationEntry.End, moduleRoot).Await(v.GetContext())
return err
}
}
return errors.New("did not find exec spawner for wasmModuleRoot")
}

func (v *BlockValidator) SetCurrentWasmModuleRoot(hash common.Hash) error {
Expand Down Expand Up @@ -704,14 +718,6 @@ func (v *BlockValidator) advanceValidations(ctx context.Context) (*arbutil.Messa
defer v.reorgMutex.RUnlock()

wasmRoots := v.GetModuleRootsToValidate()
rooms := make([]int, len(v.validationSpawners))
currentSpawnerIndex := 0
for i, spawner := range v.validationSpawners {
here := spawner.Room() / len(wasmRoots)
if here > 0 {
rooms[i] = here
}
}
pos := v.validated() - 1 // to reverse the first +1 in the loop
validationsLoop:
for {
Expand Down Expand Up @@ -780,15 +786,15 @@ validationsLoop:
log.Trace("result validated", "count", v.validated(), "blockHash", v.lastValidGS.BlockHash)
continue
}
for currentSpawnerIndex < len(rooms) {
if rooms[currentSpawnerIndex] > 0 {
break
for _, moduleRoot := range wasmRoots {
if v.chosenValidator[moduleRoot] == nil {
v.possiblyFatal(fmt.Errorf("did not find spawner for moduleRoot :%v", moduleRoot))
continue
}
if v.chosenValidator[moduleRoot].Room() == 0 {
log.Trace("advanceValidations: no more room", "moduleRoot", moduleRoot)
return nil, nil
}
currentSpawnerIndex++
}
if currentSpawnerIndex == len(rooms) {
log.Trace("advanceValidations: no more room", "pos", pos)
return nil, nil
}
if v.isMemoryLimitExceeded() {
log.Warn("advanceValidations: aborting due to running low on memory")
Expand All @@ -808,8 +814,8 @@ validationsLoop:
defer validatorPendingValidationsGauge.Dec(1)
var runs []validator.ValidationRun
for _, moduleRoot := range wasmRoots {
run := v.validationSpawners[currentSpawnerIndex].Launch(input, moduleRoot)
log.Trace("advanceValidations: launched", "pos", validationStatus.Entry.Pos, "moduleRoot", moduleRoot, "spawner", currentSpawnerIndex)
run := v.chosenValidator[moduleRoot].Launch(input, moduleRoot)
log.Trace("advanceValidations: launched", "pos", validationStatus.Entry.Pos, "moduleRoot", moduleRoot)
runs = append(runs, run)
}
validationCtx, cancel := context.WithCancel(ctx)
Expand All @@ -832,10 +838,6 @@ validationsLoop:
}
nonBlockingTrigger(v.progressValidationsChan)
})
rooms[currentSpawnerIndex]--
if rooms[currentSpawnerIndex] == 0 {
currentSpawnerIndex++
}
}
}
}
Expand Down Expand Up @@ -1045,10 +1047,7 @@ func (v *BlockValidator) Initialize(ctx context.Context) error {
currentModuleRoot := config.CurrentModuleRoot
switch currentModuleRoot {
case "latest":
if v.execSpawner == nil {
return fmt.Errorf(`execution spawner is nil while current module root is "latest"`)
}
latest, err := v.execSpawner.LatestWasmModuleRoot().Await(ctx)
latest, err := v.GetLatestWasmModuleRoot(ctx)
if err != nil {
return err
}
Expand All @@ -1063,13 +1062,46 @@ func (v *BlockValidator) Initialize(ctx context.Context) error {
return errors.New("current-module-root config value illegal")
}
}
pendingModuleRoot := config.PendingUpgradeModuleRoot
if pendingModuleRoot != "" {
if pendingModuleRoot == "latest" {
latest, err := v.GetLatestWasmModuleRoot(ctx)
if err != nil {
return err
}
v.pendingWasmModuleRoot = latest
} else {
valid, _ := regexp.MatchString("(0x)?[0-9a-fA-F]{64}", pendingModuleRoot)
v.pendingWasmModuleRoot = common.HexToHash(pendingModuleRoot)
if (!valid || v.pendingWasmModuleRoot == common.Hash{}) {
return errors.New("pending-upgrade-module-root config value illegal")
}
}
}
log.Info("BlockValidator initialized", "current", v.currentWasmModuleRoot, "pending", v.pendingWasmModuleRoot)
moduleRoots := []common.Hash{v.currentWasmModuleRoot}
if v.pendingWasmModuleRoot != v.currentWasmModuleRoot {
if v.pendingWasmModuleRoot != v.currentWasmModuleRoot && v.pendingWasmModuleRoot != (common.Hash{}) {
moduleRoots = append(moduleRoots, v.pendingWasmModuleRoot)
}
if err := v.StatelessBlockValidator.Initialize(moduleRoots); err != nil {
return fmt.Errorf("initializing block validator with module roots: %w", err)
// First spawner is always RedisValidationClient if RedisStreams are enabled.
if v.redisValidator != nil {
err := v.redisValidator.Initialize(moduleRoots)
if err != nil {
return err
}
}
v.chosenValidator = make(map[common.Hash]validator.ValidationSpawner)
for _, root := range moduleRoots {
if v.redisValidator != nil && validator.SpawnerSupportsModule(v.redisValidator, root) {
v.chosenValidator[root] = v.redisValidator
} else {
for _, spawner := range v.execSpawners {
if validator.SpawnerSupportsModule(spawner, root) {
v.chosenValidator[root] = spawner
break
}
}
}
}
return nil
}
Expand Down
15 changes: 12 additions & 3 deletions staker/challenge_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,9 +478,18 @@ func (m *ChallengeManager) createExecutionBackend(ctx context.Context, step uint
}
}
input.BatchInfo = prunedBatches
execRun, err := m.validator.execSpawner.CreateExecutionRun(m.wasmModuleRoot, input).Await(ctx)
if err != nil {
return fmt.Errorf("error creating execution backend for msg %v: %w", initialCount, err)
var execRun validator.ExecutionRun
for _, spawner := range m.validator.execSpawners {
if validator.SpawnerSupportsModule(spawner, m.wasmModuleRoot) {
execRun, err = spawner.CreateExecutionRun(m.wasmModuleRoot, input).Await(ctx)
if err != nil {
return fmt.Errorf("error creating execution backend for msg %v: %w", initialCount, err)
}
break
}
}
if execRun == nil {
return fmt.Errorf("did not find valid execution backend")
}
backend, err := NewExecutionChallengeBackend(execRun)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions staker/challenge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ func runChallengeTest(

for i := 0; i < 100; i++ {
if testTimeout {
backend.Commit()
err = backend.AdjustTime(time.Second * 40)
}
Require(t, err)
Expand Down
Loading

0 comments on commit b4cc111

Please sign in to comment.