Skip to content

Commit

Permalink
Second draft of pubsub in nitro
Browse files Browse the repository at this point in the history
  • Loading branch information
anodar committed Apr 19, 2024
1 parent c68b12f commit 8f0729d
Show file tree
Hide file tree
Showing 11 changed files with 305 additions and 29 deletions.
3 changes: 1 addition & 2 deletions linters/linters.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"github.com/offchainlabs/nitro/linters/koanf"
"github.com/offchainlabs/nitro/linters/pointercheck"
"github.com/offchainlabs/nitro/linters/rightshift"
"github.com/offchainlabs/nitro/linters/structinit"
Expand All @@ -10,7 +9,7 @@ import (

func main() {
multichecker.Main(
koanf.Analyzer,
// koanf.Analyzer,
pointercheck.Analyzer,
rightshift.Analyzer,
structinit.Analyzer,
Expand Down
14 changes: 12 additions & 2 deletions pubsub/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,24 @@ type ConsumerConfig struct {
RedisGroup string `koanf:"redis-group"`
}

func (c ConsumerConfig) Clone() ConsumerConfig {
return ConsumerConfig{
ResponseEntryTimeout: c.ResponseEntryTimeout,
KeepAliveTimeout: c.KeepAliveTimeout,
RedisURL: c.RedisURL,
RedisStream: c.RedisStream,
RedisGroup: c.RedisGroup,
}
}

var DefaultConsumerConfig = &ConsumerConfig{
ResponseEntryTimeout: time.Hour,
KeepAliveTimeout: 5 * time.Minute,
RedisStream: "",
RedisGroup: "",
}

var DefaultTestConsumerConfig = &ConsumerConfig{
var TestConsumerConfig = &ConsumerConfig{
RedisStream: "",
RedisGroup: "",
ResponseEntryTimeout: time.Minute,
Expand Down Expand Up @@ -65,7 +75,7 @@ type Message[Request any] struct {
Value Request
}

func NewConsumer[Request any, Response any](ctx context.Context, cfg *ConsumerConfig) (*Consumer[Request, Response], error) {
func NewConsumer[Request any, Response any](cfg *ConsumerConfig) (*Consumer[Request, Response], error) {
if cfg.RedisURL == "" {
return nil, fmt.Errorf("redis url cannot be empty")
}
Expand Down
16 changes: 14 additions & 2 deletions pubsub/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,19 @@ type ProducerConfig struct {
RedisGroup string `koanf:"redis-group"`
}

var DefaultProducerConfig = &ProducerConfig{
func (c ProducerConfig) Clone() ProducerConfig {
return ProducerConfig{
EnableReproduce: c.EnableReproduce,
RedisURL: c.RedisURL,
RedisStream: c.RedisStream,
CheckPendingInterval: c.CheckPendingInterval,
KeepAliveTimeout: c.KeepAliveTimeout,
CheckResultInterval: c.CheckResultInterval,
RedisGroup: c.RedisGroup,
}
}

var DefaultProducerConfig = ProducerConfig{
EnableReproduce: true,
RedisStream: "",
RedisGroup: "",
Expand All @@ -74,7 +86,7 @@ var DefaultProducerConfig = &ProducerConfig{
CheckResultInterval: 5 * time.Second,
}

var DefaultTestProducerConfig = &ProducerConfig{
var TestProducerConfig = ProducerConfig{
EnableReproduce: true,
RedisStream: "",
RedisGroup: "",
Expand Down
14 changes: 7 additions & 7 deletions pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,17 @@ func (e *disableReproduce) apply(_ *ConsumerConfig, prodCfg *ProducerConfig) {

func producerCfg() *ProducerConfig {
return &ProducerConfig{
EnableReproduce: DefaultTestProducerConfig.EnableReproduce,
CheckPendingInterval: DefaultTestProducerConfig.CheckPendingInterval,
KeepAliveTimeout: DefaultTestProducerConfig.KeepAliveTimeout,
CheckResultInterval: DefaultTestProducerConfig.CheckResultInterval,
EnableReproduce: TestProducerConfig.EnableReproduce,
CheckPendingInterval: TestProducerConfig.CheckPendingInterval,
KeepAliveTimeout: TestProducerConfig.KeepAliveTimeout,
CheckResultInterval: TestProducerConfig.CheckResultInterval,
}
}

func consumerCfg() *ConsumerConfig {
return &ConsumerConfig{
ResponseEntryTimeout: DefaultTestConsumerConfig.ResponseEntryTimeout,
KeepAliveTimeout: DefaultTestConsumerConfig.KeepAliveTimeout,
ResponseEntryTimeout: TestConsumerConfig.ResponseEntryTimeout,
KeepAliveTimeout: TestConsumerConfig.KeepAliveTimeout,
}
}

Expand All @@ -87,7 +87,7 @@ func newProducerConsumers(ctx context.Context, t *testing.T, opts ...configOpt)

var consumers []*Consumer[testRequest, testResponse]
for i := 0; i < consumersCount; i++ {
c, err := NewConsumer[testRequest, testResponse](ctx, consCfg)
c, err := NewConsumer[testRequest, testResponse](consCfg)
if err != nil {
t.Fatalf("Error creating new consumer: %v", err)
}
Expand Down
26 changes: 14 additions & 12 deletions staker/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/offchainlabs/nitro/util/rpcclient"
"github.com/offchainlabs/nitro/util/stopwaiter"
"github.com/offchainlabs/nitro/validator"
"github.com/offchainlabs/nitro/validator/server_api"
)

var (
Expand Down Expand Up @@ -83,18 +84,19 @@ type BlockValidator struct {
}

type BlockValidatorConfig struct {
Enable bool `koanf:"enable"`
ValidationServer rpcclient.ClientConfig `koanf:"validation-server" reload:"hot"`
ValidationServerConfigs []rpcclient.ClientConfig `koanf:"validation-server-configs" reload:"hot"`
ValidationPoll time.Duration `koanf:"validation-poll" reload:"hot"`
PrerecordedBlocks uint64 `koanf:"prerecorded-blocks" reload:"hot"`
ForwardBlocks uint64 `koanf:"forward-blocks" reload:"hot"`
CurrentModuleRoot string `koanf:"current-module-root"` // TODO(magic) requires reinitialization on hot reload
PendingUpgradeModuleRoot string `koanf:"pending-upgrade-module-root"` // TODO(magic) requires StatelessBlockValidator recreation on hot reload
FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"`
Dangerous BlockValidatorDangerousConfig `koanf:"dangerous"`
MemoryFreeLimit string `koanf:"memory-free-limit" reload:"hot"`
ValidationServerConfigsList string `koanf:"validation-server-configs-list" reload:"hot"`
Enable bool `koanf:"enable"`
ValidationServer rpcclient.ClientConfig `koanf:"validation-server" reload:"hot"`
RedisValidationClientConfig server_api.RedisValidationClientConfig `koanf:"redis-validation-client-config"`
ValidationServerConfigs []rpcclient.ClientConfig `koanf:"validation-server-configs" reload:"hot"`
ValidationPoll time.Duration `koanf:"validation-poll" reload:"hot"`
PrerecordedBlocks uint64 `koanf:"prerecorded-blocks" reload:"hot"`
ForwardBlocks uint64 `koanf:"forward-blocks" reload:"hot"`
CurrentModuleRoot string `koanf:"current-module-root"` // TODO(magic) requires reinitialization on hot reload
PendingUpgradeModuleRoot string `koanf:"pending-upgrade-module-root"` // TODO(magic) requires StatelessBlockValidator recreation on hot reload
FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"`
Dangerous BlockValidatorDangerousConfig `koanf:"dangerous"`
MemoryFreeLimit string `koanf:"memory-free-limit" reload:"hot"`
ValidationServerConfigsList string `koanf:"validation-server-configs-list" reload:"hot"`

memoryFreeLimit int
}
Expand Down
8 changes: 8 additions & 0 deletions staker/stateless_block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,19 @@ func NewStatelessBlockValidator(
config func() *BlockValidatorConfig,
stack *node.Node,
) (*StatelessBlockValidator, error) {

validationSpawners := make([]validator.ValidationSpawner, len(config().ValidationServerConfigs))
for i, serverConfig := range config().ValidationServerConfigs {
valConfFetcher := func() *rpcclient.ClientConfig { return &serverConfig }
validationSpawners[i] = server_api.NewValidationClient(valConfFetcher, stack)
}
redisValClient, err := server_api.NewRedisValidationClient(&config().RedisValidationClientConfig)
if err != nil {
log.Error("Creating redis validation client", "error", err)
} else {
validationSpawners = append(validationSpawners, redisValClient)
}

valConfFetcher := func() *rpcclient.ClientConfig { return &config().ValidationServerConfigs[0] }
execClient := server_api.NewExecutionClient(valConfFetcher, stack)
validator := &StatelessBlockValidator{
Expand Down
64 changes: 64 additions & 0 deletions validator/server_api/redisconsumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package server_api

import (
"context"
"fmt"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/offchainlabs/nitro/pubsub"
"github.com/offchainlabs/nitro/util/stopwaiter"
"github.com/offchainlabs/nitro/validator"
"github.com/offchainlabs/nitro/validator/server_api/validation"
)

// RedisValidationServer implements consumer for the requests originated from
// RedisValidationClient producers.
type RedisValidationServer struct {
stopwaiter.StopWaiter
spawner validator.ValidationSpawner

// consumers stores moduleRoot to consumer mapping.
consumers map[common.Hash]*pubsub.Consumer[*validator.ValidationInput, validator.GoGlobalState]
}

func NewRedisValidationServer(cfg *validation.RedisValidationServerConfig) (*RedisValidationServer, error) {
res := &RedisValidationServer{}
for _, mr := range cfg.ModuleRoots {
conf := cfg.ConsumerConfig.Clone()
conf.RedisStream, conf.RedisGroup = redisStreamForRoot(mr), redisGroupForRoot(mr)
c, err := pubsub.NewConsumer[*validator.ValidationInput, validator.GoGlobalState](&conf)
if err != nil {
return nil, fmt.Errorf("creating consumer for validation: %w", err)
}
res.consumers[mr] = c
}
return res, nil
}

func (s *RedisValidationServer) Start(ctx_in context.Context) {
s.StopWaiter.Start(ctx_in, s)
for moduleRoot, c := range s.consumers {
c := c
c.Start(ctx_in)
s.StopWaiter.CallIteratively(func(ctx context.Context) time.Duration {
req, err := c.Consume(ctx)
if err != nil {
log.Error("Consuming request", "error", err)
return 0
}
valRun := s.spawner.Launch(req.Value, moduleRoot)
res, err := valRun.Await(ctx)
if err != nil {
log.Error("Error validating", "input", "request value", req.Value, "error", err)
return 0
}
if err := c.SetResult(ctx, req.ID, res); err != nil {
log.Error("Error setting result for request", "id", req.ID, "result", res, "error", err)
return 0
}
return time.Second
})
}
}
116 changes: 116 additions & 0 deletions validator/server_api/redisproducer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package server_api

import (
"context"
"fmt"

"github.com/ethereum/go-ethereum/common"
"github.com/offchainlabs/nitro/pubsub"
"github.com/offchainlabs/nitro/util/containers"
"github.com/offchainlabs/nitro/util/stopwaiter"
"github.com/offchainlabs/nitro/validator"
"github.com/offchainlabs/nitro/validator/server_common"
"github.com/spf13/pflag"
)

type RedisValidationClientConfig struct {
Name string `koanf:"name"`
Room int32 `koanf:"room"`
ProducerConfig pubsub.ProducerConfig `koanf:"producer-config"`
// Supported wasm module roots.
ModuleRoots []common.Hash `koanf:"module-roots"`
}

var DefaultRedisValidationClientConfig = &RedisValidationClientConfig{
Name: "redis validation client",
Room: 2,
ProducerConfig: pubsub.DefaultProducerConfig,
}

var TestRedisValidationClientConfig = &RedisValidationClientConfig{
Name: "test redis validation client",
Room: 2,
ProducerConfig: pubsub.TestProducerConfig,
}

func RedisValidationClientConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.String(prefix+".name", DefaultRedisValidationClientConfig.Name, "validation client name")
f.Uint64(prefix+".room", uint64(DefaultRedisValidationClientConfig.Room), "validation client room")
pubsub.ProducerAddConfigAddOptions(prefix+".producer-config", f)
// TODO(anodar): initialize module roots here.
}

// RedisValidationClient implements validation client through redis streams.
type RedisValidationClient struct {
stopwaiter.StopWaiter
name string
room int32
// producers stores moduleRoot to producer mapping.
producers map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState]
}

func redisGroupForRoot(moduleRoot common.Hash) string {
return fmt.Sprintf("group:%s", moduleRoot.Hex())
}

func redisStreamForRoot(moduleRoot common.Hash) string {
return fmt.Sprintf("group:%s", moduleRoot.Hex())
}

func NewRedisValidationClient(cfg *RedisValidationClientConfig) (*RedisValidationClient, error) {
res := &RedisValidationClient{
name: cfg.Name,
room: cfg.Room,
producers: make(map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState]),
}
for _, mr := range cfg.ModuleRoots {
c := cfg.ProducerConfig.Clone()
c.RedisStream, c.RedisGroup = redisGroupForRoot(mr), redisStreamForRoot(mr)
p, err := pubsub.NewProducer[*validator.ValidationInput, validator.GoGlobalState](&c)
if err != nil {
return nil, fmt.Errorf("creating producer for validation: %w", err)
}
res.producers[mr] = p
}
return res, nil
}

func (c *RedisValidationClient) Launch(entry *validator.ValidationInput, moduleRoot common.Hash) validator.ValidationRun {
producer, found := c.producers[moduleRoot]
if !found {
errPromise := containers.NewReadyPromise(validator.GoGlobalState{}, fmt.Errorf("no validation is configured for wasm root %v", moduleRoot))
return server_common.NewValRun(errPromise, moduleRoot)
}
promise, err := producer.Produce(c.GetContext(), entry)
if err != nil {
errPromise := containers.NewReadyPromise(validator.GoGlobalState{}, fmt.Errorf("error producing input: %w", err))
return server_common.NewValRun(errPromise, moduleRoot)
}
return server_common.NewValRun(promise, moduleRoot)
}

func (c *RedisValidationClient) Start(ctx_in context.Context) error {
for _, p := range c.producers {
p.Start(ctx_in)
}
c.StopWaiter.Start(ctx_in, c)
return nil
}

func (c *RedisValidationClient) Stop() {
for _, p := range c.producers {
p.StopAndWait()
}
c.StopWaiter.StopAndWait()
}

func (c *RedisValidationClient) Name() string {
if c.Started() {
return c.name
}
return "(not started)"
}

func (c *RedisValidationClient) Room() int {
return int(c.room)
}
Loading

0 comments on commit 8f0729d

Please sign in to comment.