diff --git a/pubsub/common.go b/pubsub/common.go
index 9f05304e46..d7f041af15 100644
--- a/pubsub/common.go
+++ b/pubsub/common.go
@@ -2,6 +2,7 @@ package pubsub
 
 import (
 	"context"
+	"strings"
 
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/go-redis/redis/v8"
@@ -22,7 +23,9 @@ func CreateStream(ctx context.Context, streamName string, client redis.Universal
 func StreamExists(ctx context.Context, streamName string, client redis.UniversalClient) bool {
 	got, err := client.Do(ctx, "XINFO", "STREAM", streamName).Result()
 	if err != nil {
-		log.Error("Reading redis streams", "error", err)
+		if !strings.Contains(err.Error(), "no such key") {
+			log.Error("redis error", "err", err, "searching stream", streamName)
+		}
 		return false
 	}
 	return got != nil
diff --git a/pubsub/producer.go b/pubsub/producer.go
index 074670ca0f..2b1cdb5e3f 100644
--- a/pubsub/producer.go
+++ b/pubsub/producer.go
@@ -13,6 +13,9 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"math"
+	"strconv"
+	"strings"
 	"sync"
 	"time"
@@ -59,6 +62,7 @@ type ProducerConfig struct {
 	KeepAliveTimeout time.Duration `koanf:"keepalive-timeout"`
 	// Interval duration for checking the result set by consumers.
 	CheckResultInterval time.Duration `koanf:"check-result-interval"`
+	CheckPendingItems   int64         `koanf:"check-pending-items"`
 }
 
 var DefaultProducerConfig = ProducerConfig{
 	EnableReproduce:      true,
 	CheckPendingInterval: time.Second,
 	KeepAliveTimeout:     5 * time.Minute,
 	CheckResultInterval:  5 * time.Second,
+	CheckPendingItems:    256,
 }
 
 var TestProducerConfig = ProducerConfig{
-	EnableReproduce:      true,
+	EnableReproduce:      false,
 	CheckPendingInterval: 10 * time.Millisecond,
 	KeepAliveTimeout:     100 * time.Millisecond,
 	CheckResultInterval:  5 * time.Millisecond,
+	CheckPendingItems:    256,
 }
 
 func ProducerAddConfigAddOptions(prefix string, f *pflag.FlagSet) {
 	f.Duration(prefix+".check-pending-interval", DefaultProducerConfig.CheckPendingInterval, "interval in which producer checks pending messages whether consumer processing them is inactive")
 	f.Duration(prefix+".check-result-interval", DefaultProducerConfig.CheckResultInterval, "interval in which producer checks pending messages whether consumer processing them is inactive")
 	f.Duration(prefix+".keepalive-timeout", DefaultProducerConfig.KeepAliveTimeout, "timeout after which consumer is considered inactive if heartbeat wasn't performed")
+	f.Int64(prefix+".check-pending-items", DefaultProducerConfig.CheckPendingItems, "items to screen during check-pending")
 }
 
 func NewProducer[Request any, Response any](client redis.UniversalClient, streamName string, cfg *ProducerConfig) (*Producer[Request, Response], error) {
@@ -99,13 +106,13 @@ func NewProducer[Request any, Response any](client redis.UniversalClient, stream
 	}, nil
 }
 
-func (p *Producer[Request, Response]) errorPromisesFor(msgs []*Message[Request]) {
+func (p *Producer[Request, Response]) errorPromisesFor(msgIds []string) {
 	p.promisesLock.Lock()
 	defer p.promisesLock.Unlock()
-	for _, msg := range msgs {
-		if promise, found := p.promises[msg.ID]; found {
+	for _, msg := range msgIds {
+		if promise, found := p.promises[msg]; found {
 			promise.ProduceError(fmt.Errorf("internal error, consumer died while serving the request"))
-			delete(p.promises, msg.ID)
+			delete(p.promises, msg)
 		}
 	}
 }
@@ -113,56 +120,132 @@ func (p *Producer[Request, Response]) errorPromisesFor(msgs []*Message[Request])
 // checkAndReproduce reproduce pending messages that were sent to consumers
 // that are currently inactive.
 func (p *Producer[Request, Response]) checkAndReproduce(ctx context.Context) time.Duration {
-	msgs, err := p.checkPending(ctx)
+	staleIds, err := p.checkPending(ctx)
 	if err != nil {
 		log.Error("Checking pending messages", "error", err)
 		return p.cfg.CheckPendingInterval
 	}
-	if len(msgs) == 0 {
+	if len(staleIds) == 0 {
 		return p.cfg.CheckPendingInterval
 	}
-	if !p.cfg.EnableReproduce {
-		p.errorPromisesFor(msgs)
-		return p.cfg.CheckPendingInterval
+	if p.cfg.EnableReproduce {
+		err = p.reproduceIds(ctx, staleIds)
+		if err != nil {
+			log.Warn("failed reproducing messages", "err", err)
+		}
+	} else {
+		p.errorPromisesFor(staleIds)
 	}
-	acked := make(map[string]Request)
-	for _, msg := range msgs {
+	return p.cfg.CheckPendingInterval
+}
+
+func (p *Producer[Request, Response]) reproduceIds(ctx context.Context, staleIds []string) error {
+	log.Info("Attempting to claim", "messages", staleIds)
+	claimedMsgs, err := p.client.XClaim(ctx, &redis.XClaimArgs{
+		Stream:   p.redisStream,
+		Group:    p.redisGroup,
+		Consumer: p.id,
+		MinIdle:  p.cfg.KeepAliveTimeout,
+		Messages: staleIds,
+	}).Result()
+	if err != nil {
+		return fmt.Errorf("claiming ownership on messages: %v, error: %w", staleIds, err)
+	}
+	for _, msg := range claimedMsgs {
+		data, ok := (msg.Values[messageKey]).(string)
+		if !ok {
+			log.Error("redis producer reproduce: message not string", "id", msg.ID, "value", msg.Values[messageKey])
+			continue
+		}
+		var req Request
+		if err := json.Unmarshal([]byte(data), &req); err != nil {
+			log.Error("redis producer reproduce: message not a request", "id", msg.ID, "err", err, "value", msg.Values[messageKey])
+			continue
+		}
 		if _, err := p.client.XAck(ctx, p.redisStream, p.redisGroup, msg.ID).Result(); err != nil {
-			log.Error("ACKing message", "error", err)
+			log.Error("redis producer reproduce: could not ACK", "id", msg.ID, "err", err)
 			continue
 		}
-		acked[msg.ID] = msg.Value
-	}
-	for k, v := range acked {
-		// Only re-insert messages that were removed the the pending list first.
-		_, err := p.reproduce(ctx, v, k)
-		if err != nil {
-			log.Error("Re-inserting pending messages with inactive consumers", "error", err)
+		if _, err := p.reproduce(ctx, req, msg.ID); err != nil {
+			log.Error("redis producer reproduce: error", "err", err)
 		}
 	}
-	return p.cfg.CheckPendingInterval
+	return nil
+}
+
+func setMinIdInt(min *[2]uint64, id string) error {
+	idParts := strings.Split(id, "-")
+	if len(idParts) != 2 {
+		return fmt.Errorf("invalid id: %v", id)
+	}
+	idTimeStamp, err := strconv.ParseUint(idParts[0], 10, 64)
+	if err != nil {
+		return fmt.Errorf("invalid id: %v err: %w", id, err)
+	}
+	if idTimeStamp > min[0] {
+		return nil
+	}
+	idSerial, err := strconv.ParseUint(idParts[1], 10, 64)
+	if err != nil {
+		return fmt.Errorf("invalid id serial: %v err: %w", id, err)
+	}
+	if idTimeStamp < min[0] {
+		min[0] = idTimeStamp
+		min[1] = idSerial
+		return nil
+	}
+	// idTimeStamp == min[0]
+	if idSerial < min[1] {
+		min[1] = idSerial
+	}
+	return nil
 }
 
 // checkResponses checks iteratively whether response for the promise is ready.
 func (p *Producer[Request, Response]) checkResponses(ctx context.Context) time.Duration {
+	minIdInt := [2]uint64{math.MaxUint64, math.MaxUint64}
 	p.promisesLock.Lock()
 	defer p.promisesLock.Unlock()
+	responded := 0
+	errored := 0
 	for id, promise := range p.promises {
+		if ctx.Err() != nil {
+			return 0
+		}
 		res, err := p.client.Get(ctx, id).Result()
 		if err != nil {
-			if errors.Is(err, redis.Nil) {
-				continue
+			errSetId := setMinIdInt(&minIdInt, id)
+			if errSetId != nil {
+				log.Error("error setting minId", "err", errSetId)
+				return p.cfg.CheckResultInterval
+			}
+			if !errors.Is(err, redis.Nil) {
+				log.Error("Error reading value in redis", "key", id, "error", err)
 			}
-			log.Error("Error reading value in redis", "key", id, "error", err)
+			continue
 		}
 		var resp Response
 		if err := json.Unmarshal([]byte(res), &resp); err != nil {
+			promise.ProduceError(fmt.Errorf("error unmarshalling: %w", err))
 			log.Error("Error unmarshaling", "value", res, "error", err)
-			continue
+			errored++
+		} else {
+			promise.Produce(resp)
+			responded++
 		}
-		promise.Produce(resp)
 		delete(p.promises, id)
 	}
+	var trimmed int64
+	var trimErr error
+	minId := "+"
+	if minIdInt[0] < math.MaxUint64 {
+		minId = fmt.Sprintf("%d-%d", minIdInt[0], minIdInt[1])
+		trimmed, trimErr = p.client.XTrimMinID(ctx, p.redisStream, minId).Result()
+	} else {
+		trimmed, trimErr = p.client.XTrimMaxLen(ctx, p.redisStream, 0).Result()
+	}
+	log.Trace("trimming", "id", minId, "trimmed", trimmed, "responded", responded, "errored", errored, "trim-err", trimErr)
 	return p.cfg.CheckResultInterval
 }
 
@@ -184,6 +267,10 @@ func (p *Producer[Request, Response]) reproduce(ctx context.Context, value Reque
 	if err != nil {
 		return nil, fmt.Errorf("marshaling value: %w", err)
 	}
+	// Taking the promises lock before the XAdd makes sure that promise ids
+	// are always ascending.
+	p.promisesLock.Lock()
+	defer p.promisesLock.Unlock()
 	id, err := p.client.XAdd(ctx, &redis.XAddArgs{
 		Stream: p.redisStream,
 		Values: map[string]any{messageKey: val},
@@ -191,13 +278,12 @@ func (p *Producer[Request, Response]) reproduce(ctx context.Context, value Reque
 	if err != nil {
 		return nil, fmt.Errorf("adding values to redis: %w", err)
 	}
-	p.promisesLock.Lock()
-	defer p.promisesLock.Unlock()
 	promise := p.promises[oldKey]
 	if oldKey != "" && promise == nil {
 		// This will happen if the old consumer became inactive but then ack_d
 		// the message afterwards.
-		return nil, fmt.Errorf("error reproducing the message, could not find existing one")
+		// don't error
+		log.Warn("tried reproducing a message but it wasn't found - probably got response", "oldKey", oldKey)
 	}
 	if oldKey == "" || promise == nil {
 		pr := containers.NewPromise[Response](nil)
@@ -232,13 +318,14 @@ func (p *Producer[Request, Response]) havePromiseFor(messageID string) bool {
 	return found
 }
 
-func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Message[Request], error) {
+// checkPending returns the ids of pending messages whose consumer doesn't appear to be alive.
+func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]string, error) {
 	pendingMessages, err := p.client.XPendingExt(ctx, &redis.XPendingExtArgs{
 		Stream: p.redisStream,
 		Group:  p.redisGroup,
 		Start:  "-",
 		End:    "+",
-		Count:  100,
+		Count:  p.cfg.CheckPendingItems,
 	}).Result()
 
 	if err != nil && !errors.Is(err, redis.Nil) {
@@ -247,6 +334,9 @@ func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Mess
 	if len(pendingMessages) == 0 {
 		return nil, nil
 	}
+	if len(pendingMessages) >= int(p.cfg.CheckPendingItems) {
+		log.Warn("redis producer: many pending items found", "stream", p.redisStream, "check-pending-items", p.cfg.CheckPendingItems)
+	}
 	// IDs of the pending messages with inactive consumers.
 	var ids []string
 	active := make(map[string]bool)
@@ -265,35 +355,5 @@ func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Mess
 		}
 		ids = append(ids, msg.ID)
 	}
-	if len(ids) == 0 {
-		log.Trace("There are no pending messages with inactive consumers")
-		return nil, nil
-	}
-	log.Info("Attempting to claim", "messages", ids)
-	claimedMsgs, err := p.client.XClaim(ctx, &redis.XClaimArgs{
-		Stream:   p.redisStream,
-		Group:    p.redisGroup,
-		Consumer: p.id,
-		MinIdle:  p.cfg.KeepAliveTimeout,
-		Messages: ids,
-	}).Result()
-	if err != nil {
-		return nil, fmt.Errorf("claiming ownership on messages: %v, error: %w", ids, err)
-	}
-	var res []*Message[Request]
-	for _, msg := range claimedMsgs {
-		data, ok := (msg.Values[messageKey]).(string)
-		if !ok {
-			return nil, fmt.Errorf("casting request: %v to bytes", msg.Values[messageKey])
-		}
-		var req Request
-		if err := json.Unmarshal([]byte(data), &req); err != nil {
-			return nil, fmt.Errorf("marshaling value: %v, error: %w", msg.Values[messageKey], err)
-		}
-		res = append(res, &Message[Request]{
-			ID:    msg.ID,
-			Value: req,
-		})
-	}
-	return res, nil
+	return ids, nil
 }
diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go
index 72504602e3..a3140e32ac 100644
--- a/pubsub/pubsub_test.go
+++ b/pubsub/pubsub_test.go
@@ -49,10 +49,12 @@ type configOpt interface {
 	apply(consCfg *ConsumerConfig, prodCfg *ProducerConfig)
 }
 
-type disableReproduce struct{}
+type withReproduce struct {
+	reproduce bool
+}
 
-func (e *disableReproduce) apply(_ *ConsumerConfig, prodCfg *ProducerConfig) {
-	prodCfg.EnableReproduce = false
+func (e *withReproduce) apply(_ *ConsumerConfig, prodCfg *ProducerConfig) {
+	prodCfg.EnableReproduce = e.reproduce
 }
 
 func producerCfg() *ProducerConfig {
@@ -61,6 +63,7 @@ func producerCfg() *ProducerConfig {
 		CheckPendingInterval: TestProducerConfig.CheckPendingInterval,
 		KeepAliveTimeout:     TestProducerConfig.KeepAliveTimeout,
 		CheckResultInterval:  TestProducerConfig.CheckResultInterval,
+		CheckPendingItems:    TestProducerConfig.CheckPendingItems,
 	}
 }
 
@@ -71,7 +74,7 @@ func consumerCfg() *ConsumerConfig {
 	}
 }
 
-func newProducerConsumers(ctx context.Context, t *testing.T, opts ...configOpt) (*Producer[testRequest, testResponse], []*Consumer[testRequest, testResponse]) {
+func newProducerConsumers(ctx context.Context, t *testing.T, opts ...configOpt) (redis.UniversalClient, string, *Producer[testRequest, testResponse], []*Consumer[testRequest, testResponse]) {
 	t.Helper()
 	redisClient, err := redisutil.RedisClientFromURL(redisutil.CreateTestRedis(ctx, t))
 	if err != nil {
@@ -107,7 +110,7 @@ func newProducerConsumers(ctx context.Context, t *testing.T, opts ...configOpt)
 			log.Debug("Error deleting heartbeat keys", "error", err)
 		}
 	})
-	return producer, consumers
+	return redisClient, streamName, producer, consumers
 }
 
 func messagesMaps(n int) []map[string]string {
@@ -118,10 +121,14 @@ func messagesMaps(n int) []map[string]string {
 	return ret
 }
 
+func msgForIndex(idx int) string {
+	return fmt.Sprintf("msg: %d", idx)
+}
+
 func wantMessages(n int) []string {
 	var ret []string
 	for i := 0; i < n; i++ {
-		ret = append(ret, fmt.Sprintf("msg: %d", i))
+		ret = append(ret, msgForIndex(i))
 	}
 	sort.Strings(ret)
 	return ret
@@ -148,26 +155,25 @@ func produceMessages(ctx context.Context, msgs []string, producer *Producer[test
 	return promises, nil
 }
 
-func awaitResponses(ctx context.Context, promises []*containers.Promise[testResponse]) ([]string, error) {
+func awaitResponses(ctx context.Context, promises []*containers.Promise[testResponse]) ([]string, []int) {
 	var (
 		responses []string
-		errs      []error
+		errs      []int
 	)
-	for _, p := range promises {
+	for idx, p := range promises {
 		res, err := p.Await(ctx)
 		if err != nil {
-			errs = append(errs, err)
+			errs = append(errs, idx)
 			continue
 		}
 		responses = append(responses, res.Response)
 	}
-	return responses, errors.Join(errs...)
+	return responses, errs
 }
 
 // consume messages from every consumer except stopped ones.
-func consume(ctx context.Context, t *testing.T, consumers []*Consumer[testRequest, testResponse]) ([]map[string]string, [][]string) {
+func consume(ctx context.Context, t *testing.T, consumers []*Consumer[testRequest, testResponse], gotMessages []map[string]string) [][]string {
 	t.Helper()
-	gotMessages := messagesMaps(consumersCount)
 	wantResponses := make([][]string, consumersCount)
 	for idx := 0; idx < consumersCount; idx++ {
 		if consumers[idx].Stopped() {
@@ -199,7 +205,7 @@ func consume(ctx context.Context, t *testing.T, consumers []*Consumer[testReques
 			}
 		})
 	}
-	return gotMessages, wantResponses
+	return wantResponses
 }
 
 func TestRedisProduce(t *testing.T) {
@@ -208,43 +214,56 @@ func TestRedisProduce(t *testing.T) {
 	for _, tc := range []struct {
 		name          string
 		killConsumers bool
+		autoRecover   bool
 	}{
 		{
 			name:          "all consumers are active",
 			killConsumers: false,
+			autoRecover:   false,
 		},
 		{
 			name:          "some consumers killed, others should take over their work",
 			killConsumers: true,
+			autoRecover:   true,
+		},
+		{
+			name:          "some consumers killed, should return failure",
+			killConsumers: true,
+			autoRecover:   false,
 		},
 	} {
 		t.Run(tc.name, func(t *testing.T) {
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
-			producer, consumers := newProducerConsumers(ctx, t)
+			redisClient, streamName, producer, consumers := newProducerConsumers(ctx, t, &withReproduce{tc.autoRecover})
 			producer.Start(ctx)
 			wantMsgs := wantMessages(messagesCount)
 			promises, err := produceMessages(ctx, wantMsgs, producer)
 			if err != nil {
 				t.Fatalf("Error producing messages: %v", err)
 			}
+			gotMessages := messagesMaps(len(consumers))
 			if tc.killConsumers {
 				// Consumer messages in every third consumer but don't ack them to check
 				// that other consumers will claim ownership on those messages.
 				for i := 0; i < len(consumers); i += 3 {
 					consumers[i].Start(ctx)
-					if _, err := consumers[i].Consume(ctx); err != nil {
+					req, err := consumers[i].Consume(ctx)
+					if err != nil {
 						t.Errorf("Error consuming message: %v", err)
 					}
+					if !tc.autoRecover {
+						gotMessages[i][req.ID] = req.Value.Request
+					}
 					consumers[i].StopAndWait()
 				}
 			}
 			time.Sleep(time.Second)
-			gotMessages, wantResponses := consume(ctx, t, consumers)
-			gotResponses, err := awaitResponses(ctx, promises)
-			if err != nil {
-				t.Fatalf("Error awaiting responses: %v", err)
+			wantResponses := consume(ctx, t, consumers, gotMessages)
+			gotResponses, errIndexes := awaitResponses(ctx, promises)
+			if len(errIndexes) != 0 && tc.autoRecover {
+				t.Fatalf("Error awaiting responses: %v", errIndexes)
 			}
 			producer.StopAndWait()
 			for _, c := range consumers {
@@ -254,7 +273,6 @@ func TestRedisProduce(t *testing.T) {
 			if err != nil {
 				t.Fatalf("mergeMaps() unexpected error: %v", err)
 			}
-
 			if diff := cmp.Diff(wantMsgs, got); diff != "" {
 				t.Errorf("Unexpected diff (-want +got):\n%s\n", diff)
 			}
@@ -266,57 +284,17 @@ func TestRedisProduce(t *testing.T) {
 			if cnt := producer.promisesLen(); cnt != 0 {
 				t.Errorf("Producer still has %d unfullfilled promises", cnt)
 			}
+			msgs, err := redisClient.XRange(ctx, streamName, "-", "+").Result()
+			if err != nil {
+				t.Errorf("XRange failed: %v", err)
+			}
+			if len(msgs) != 0 {
+				t.Errorf("redis still has %v messages", len(msgs))
+			}
 		})
 	}
 }
 
-func TestRedisReproduceDisabled(t *testing.T) {
-	t.Parallel()
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	producer, consumers := newProducerConsumers(ctx, t, &disableReproduce{})
-	producer.Start(ctx)
-	wantMsgs := wantMessages(messagesCount)
-	promises, err := produceMessages(ctx, wantMsgs, producer)
-	if err != nil {
-		t.Fatalf("Error producing messages: %v", err)
-	}
-
-	// Consumer messages in every third consumer but don't ack them to check
-	// that other consumers will claim ownership on those messages.
-	for i := 0; i < len(consumers); i += 3 {
-		consumers[i].Start(ctx)
-		if _, err := consumers[i].Consume(ctx); err != nil {
-			t.Errorf("Error consuming message: %v", err)
-		}
-		consumers[i].StopAndWait()
-	}
-
-	gotMessages, _ := consume(ctx, t, consumers)
-	gotResponses, err := awaitResponses(ctx, promises)
-	if err == nil {
-		t.Fatalf("All promises were fullfilled with reproduce disabled and some consumers killed")
-	}
-	producer.StopAndWait()
-	for _, c := range consumers {
-		c.StopWaiter.StopAndWait()
-	}
-	got, err := mergeValues(gotMessages)
-	if err != nil {
-		t.Fatalf("mergeMaps() unexpected error: %v", err)
-	}
-	wantMsgCnt := messagesCount - ((consumersCount + 2) / 3)
-	if len(got) != wantMsgCnt {
-		t.Fatalf("Got: %d messages, want %d", len(got), wantMsgCnt)
-	}
-	if len(gotResponses) != wantMsgCnt {
-		t.Errorf("Got %d responses want: %d\n", len(gotResponses), wantMsgCnt)
-	}
-	if cnt := producer.promisesLen(); cnt != 0 {
-		t.Errorf("Producer still has %d unfullfilled promises", cnt)
-	}
-}
-
 // mergeValues merges maps from the slice and returns their values.
 // Returns and error if there exists duplicate key.
 func mergeValues(messages []map[string]string) ([]string, error) {
diff --git a/staker/block_validator.go b/staker/block_validator.go
index d7b5f4f6a2..bfb7c24ac6 100644
--- a/staker/block_validator.go
+++ b/staker/block_validator.go
@@ -35,6 +35,8 @@ var (
 	validatorValidValidationsCounter  = metrics.NewRegisteredCounter("arb/validator/validations/valid", nil)
 	validatorFailedValidationsCounter = metrics.NewRegisteredCounter("arb/validator/validations/failed", nil)
 	validatorMsgCountCurrentBatch     = metrics.NewRegisteredGauge("arb/validator/msg_count_current_batch", nil)
+	validatorMsgCountCreatedGauge     = metrics.NewRegisteredGauge("arb/validator/msg_count_created", nil)
+	validatorMsgCountRecordSentGauge  = metrics.NewRegisteredGauge("arb/validator/msg_count_record_sent", nil)
 	validatorMsgCountValidatedGauge   = metrics.NewRegisteredGauge("arb/validator/msg_count_validated", nil)
 )
 
@@ -283,8 +285,9 @@ func NewBlockValidator(
 	return ret, nil
 }
 
-func atomicStorePos(addr *atomic.Uint64, val arbutil.MessageIndex) {
+func atomicStorePos(addr *atomic.Uint64, val arbutil.MessageIndex, metr metrics.Gauge) {
 	addr.Store(uint64(val))
+	metr.Update(int64(val))
 }
 
 func atomicLoadPos(addr *atomic.Uint64) arbutil.MessageIndex {
@@ -588,7 +591,7 @@ func (v *BlockValidator) createNextValidationEntry(ctx context.Context) (bool, e
 	v.validations.Store(pos, status)
 	v.nextCreateStartGS = endGS
 	v.nextCreatePrevDelayed = msg.DelayedMessagesRead
-	atomicStorePos(&v.createdA, pos+1)
+	atomicStorePos(&v.createdA, pos+1, validatorMsgCountCreatedGauge)
 	log.Trace("create validation entry: created", "pos", pos)
 	return true, nil
 }
@@ -667,7 +670,7 @@ func (v *BlockValidator) sendNextRecordRequests(ctx context.Context) (bool, erro
 			return false, err
 		}
 		pos += 1
-		atomicStorePos(&v.recordSentA, pos)
+		atomicStorePos(&v.recordSentA, pos, validatorMsgCountRecordSentGauge)
 		log.Trace("next record request: sent", "pos", pos)
 	}
 
@@ -778,11 +781,10 @@ validationsLoop:
 			log.Error("failed writing new validated to database", "pos", pos, "err", err)
 		}
 		go v.recorder.MarkValid(pos, v.lastValidGS.BlockHash)
-		atomicStorePos(&v.validatedA, pos+1)
+		atomicStorePos(&v.validatedA, pos+1, validatorMsgCountValidatedGauge)
 		v.validations.Delete(pos)
 		nonBlockingTrigger(v.createNodesChan)
 		nonBlockingTrigger(v.sendRecordChan)
-		validatorMsgCountValidatedGauge.Update(int64(pos + 1))
 		if v.testingProgressMadeChan != nil {
 			nonBlockingTrigger(v.testingProgressMadeChan)
 		}
@@ -1222,9 +1224,9 @@ func (v *BlockValidator) checkValidatedGSCaughtUp() (bool, error) {
 	v.nextCreateBatchReread = true
 	v.nextCreateStartGS = v.lastValidGS
 	v.nextCreatePrevDelayed = msg.DelayedMessagesRead
-	atomicStorePos(&v.createdA, count)
-	atomicStorePos(&v.recordSentA, count)
-	atomicStorePos(&v.validatedA, count)
+	atomicStorePos(&v.createdA, count, validatorMsgCountCreatedGauge)
+	atomicStorePos(&v.recordSentA, count, validatorMsgCountRecordSentGauge)
+	atomicStorePos(&v.validatedA, count, validatorMsgCountValidatedGauge)
 	validatorMsgCountValidatedGauge.Update(int64(count))
 	v.chainCaughtUp = true
 	return true, nil
diff --git a/system_tests/common_test.go b/system_tests/common_test.go
index ccee6aa8e4..b0748f8639 100644
--- a/system_tests/common_test.go
+++ b/system_tests/common_test.go
@@ -790,12 +790,12 @@ func AddDefaultValNode(t *testing.T, ctx context.Context, nodeConfig *arbnode.Co
 	conf.Wasm.RootPath = wasmRootDir
 	// Enable redis streams when URL is specified
 	if redisURL != "" {
-		conf.Arbitrator.RedisValidationServerConfig = rediscons.DefaultValidationServerConfig
+		conf.Arbitrator.RedisValidationServerConfig = rediscons.TestValidationServerConfig
 		redisClient, err := redisutil.RedisClientFromURL(redisURL)
 		if err != nil {
 			t.Fatalf("Error creating redis coordinator: %v", err)
 		}
-		redisStream := server_api.RedisStreamForRoot(currentRootModule(t))
+		redisStream := server_api.RedisStreamForRoot(rediscons.TestValidationServerConfig.StreamPrefix, currentRootModule(t))
 		createRedisGroup(ctx, t, redisStream, redisClient)
 		conf.Arbitrator.RedisValidationServerConfig.RedisURL = redisURL
 		t.Cleanup(func() { destroyRedisGroup(ctx, t, redisStream, redisClient) })
diff --git a/validator/client/redis/producer.go b/validator/client/redis/producer.go
index 7594421f9a..0adedc6784 100644
--- a/validator/client/redis/producer.go
+++ b/validator/client/redis/producer.go
@@ -20,6 +20,7 @@ import (
 
 type ValidationClientConfig struct {
 	Name           string                `koanf:"name"`
+	StreamPrefix   string                `koanf:"stream-prefix"`
 	Room           int32                 `koanf:"room"`
 	RedisURL       string                `koanf:"redis-url"`
 	ProducerConfig pubsub.ProducerConfig `koanf:"producer-config"`
@@ -42,6 +43,7 @@ var TestValidationClientConfig = ValidationClientConfig{
 	Name:           "test redis validation client",
 	Room:           2,
 	RedisURL:       "",
+	StreamPrefix:   "test-",
 	ProducerConfig: pubsub.TestProducerConfig,
 	CreateStreams:  false,
 }
@@ -50,6 +52,7 @@ func ValidationClientConfigAddOptions(prefix string, f *pflag.FlagSet) {
 	f.String(prefix+".name", DefaultValidationClientConfig.Name, "validation client name")
 	f.Int32(prefix+".room", DefaultValidationClientConfig.Room, "validation client room")
 	f.String(prefix+".redis-url", DefaultValidationClientConfig.RedisURL, "redis url")
+	f.String(prefix+".stream-prefix", DefaultValidationClientConfig.StreamPrefix, "prefix for stream name")
 	pubsub.ProducerAddConfigAddOptions(prefix+".producer-config", f)
 	f.Bool(prefix+".create-streams", DefaultValidationClientConfig.CreateStreams, "create redis streams if it does not exist")
 }
@@ -57,14 +60,12 @@ func ValidationClientConfigAddOptions(prefix string, f *pflag.FlagSet) {
 // ValidationClient implements validation client through redis streams.
 type ValidationClient struct {
 	stopwaiter.StopWaiter
-	name string
-	room atomic.Int32
+	config *ValidationClientConfig
+	room   atomic.Int32
 	// producers stores moduleRoot to producer mapping.
-	producers      map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState]
-	producerConfig pubsub.ProducerConfig
-	redisClient    redis.UniversalClient
-	moduleRoots    []common.Hash
-	createStreams  bool
+	producers   map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState]
+	redisClient redis.UniversalClient
+	moduleRoots []common.Hash
 }
 
 func NewValidationClient(cfg *ValidationClientConfig) (*ValidationClient, error) {
@@ -76,11 +77,9 @@ func NewValidationClient(cfg *ValidationClientConfig) (*ValidationClient, error)
 		return nil, err
 	}
 	validationClient := &ValidationClient{
-		name:           cfg.Name,
-		producers:      make(map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState]),
-		producerConfig: cfg.ProducerConfig,
-		redisClient:    redisClient,
-		createStreams:  cfg.CreateStreams,
+		config:      cfg,
+		producers:   make(map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState]),
+		redisClient: redisClient,
 	}
 	validationClient.room.Store(cfg.Room)
 	return validationClient, nil
 }
@@ -88,8 +87,8 @@ func NewValidationClient(cfg *ValidationClientConfig) (*ValidationClient, error)
 
 func (c *ValidationClient) Initialize(ctx context.Context, moduleRoots []common.Hash) error {
 	for _, mr := range moduleRoots {
-		if c.createStreams {
-			if err := pubsub.CreateStream(ctx, server_api.RedisStreamForRoot(mr), c.redisClient); err != nil {
+		if c.config.CreateStreams {
+			if err := pubsub.CreateStream(ctx, server_api.RedisStreamForRoot(c.config.StreamPrefix, mr), c.redisClient); err != nil {
 				return fmt.Errorf("creating redis stream: %w", err)
 			}
 		}
@@ -98,7 +97,7 @@ func (c *ValidationClient) Initialize(ctx context.Context, moduleRoots []common.
 			continue
 		}
 		p, err := pubsub.NewProducer[*validator.ValidationInput, validator.GoGlobalState](
-			c.redisClient, server_api.RedisStreamForRoot(mr), &c.producerConfig)
+			c.redisClient, server_api.RedisStreamForRoot(c.config.StreamPrefix, mr), &c.config.ProducerConfig)
 		if err != nil {
 			log.Warn("failed init redis for %v: %w", mr, err)
 			continue
 		}
@@ -146,10 +145,7 @@ func (c *ValidationClient) Stop() {
 }
 
 func (c *ValidationClient) Name() string {
-	if c.Started() {
-		return c.name
-	}
-	return "(not started)"
+	return c.config.Name
 }
 
 func (c *ValidationClient) Room() int {
diff --git a/validator/server_api/json.go b/validator/server_api/json.go
index 3dd817d5ae..dd646e1aa1 100644
--- a/validator/server_api/json.go
+++ b/validator/server_api/json.go
@@ -45,8 +45,8 @@ func MachineStepResultFromJson(resultJson *MachineStepResultJson) (*validator.Ma
 	}, nil
 }
 
-func RedisStreamForRoot(moduleRoot common.Hash) string {
-	return fmt.Sprintf("stream:%s", moduleRoot.Hex())
+func RedisStreamForRoot(prefix string, moduleRoot common.Hash) string {
+	return fmt.Sprintf("%sstream:%s", prefix, moduleRoot.Hex())
 }
 
 type Request struct {
diff --git a/validator/valnode/redis/consumer.go b/validator/valnode/redis/consumer.go
index 84f597c095..fb7db1e870 100644
--- a/validator/valnode/redis/consumer.go
+++ b/validator/valnode/redis/consumer.go
@@ -37,7 +37,7 @@ func NewValidationServer(cfg *ValidationServerConfig, spawner validator.Validati
 	consumers := make(map[common.Hash]*pubsub.Consumer[*validator.ValidationInput, validator.GoGlobalState])
 	for _, hash := range cfg.ModuleRoots {
 		mr := common.HexToHash(hash)
-		c, err := pubsub.NewConsumer[*validator.ValidationInput, validator.GoGlobalState](redisClient, server_api.RedisStreamForRoot(mr), &cfg.ConsumerConfig)
+		c, err := pubsub.NewConsumer[*validator.ValidationInput, validator.GoGlobalState](redisClient, server_api.RedisStreamForRoot(cfg.StreamPrefix, mr), &cfg.ConsumerConfig)
 		if err != nil {
 			return nil, fmt.Errorf("creating consumer for validation: %w", err)
 		}
@@ -130,10 +130,12 @@ type ValidationServerConfig struct {
 	ModuleRoots []string `koanf:"module-roots"`
 	// Timeout on polling for existence of each redis stream.
 	StreamTimeout time.Duration `koanf:"stream-timeout"`
+	StreamPrefix  string        `koanf:"stream-prefix"`
 }
 
 var DefaultValidationServerConfig = ValidationServerConfig{
 	RedisURL:       "",
+	StreamPrefix:   "",
 	ConsumerConfig: pubsub.DefaultConsumerConfig,
 	ModuleRoots:    []string{},
 	StreamTimeout:  10 * time.Minute,
 }
@@ -141,6 +143,7 @@
 var TestValidationServerConfig = ValidationServerConfig{
 	RedisURL:       "",
+	StreamPrefix:   "test-",
 	ConsumerConfig: pubsub.TestConsumerConfig,
 	ModuleRoots:    []string{},
 	StreamTimeout:  time.Minute,
@@ -150,6 +153,7 @@ func ValidationServerConfigAddOptions(prefix string, f *pflag.FlagSet) {
 	pubsub.ConsumerConfigAddOptions(prefix+".consumer-config", f)
 	f.StringSlice(prefix+".module-roots", nil, "Supported module root hashes")
 	f.String(prefix+".redis-url", DefaultValidationServerConfig.RedisURL, "url of redis server")
+	f.String(prefix+".stream-prefix", DefaultValidationServerConfig.StreamPrefix, "prefix for stream name")
 	f.Duration(prefix+".stream-timeout", DefaultValidationServerConfig.StreamTimeout, "Timeout on polling for existence of redis streams")