merge master
ganeshvanahalli committed Apr 27, 2024
2 parents 400492d + 6958bea commit a107c5c
Showing 21 changed files with 667 additions and 301 deletions.
2 changes: 1 addition & 1 deletion arbnode/node.go
@@ -540,7 +540,7 @@ func createNodeImpl(
txStreamer.SetInboxReaders(inboxReader, delayedBridge)

var statelessBlockValidator *staker.StatelessBlockValidator
if config.BlockValidator.ValidationServerConfigs[0].URL != "" {
if config.BlockValidator.RedisValidationClientConfig.Enabled() || config.BlockValidator.ValidationServerConfigs[0].URL != "" {
statelessBlockValidator, err = staker.NewStatelessBlockValidator(
inboxReader,
inboxTracker,
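For context on the guard above: the stateless block validator is now constructed when either redis-based validation is enabled or a validation server URL is configured. Below is a minimal sketch of that condition, assuming (hypothetically) that RedisValidationClientConfig carries a RedisURL field and that Enabled() simply reports whether it is set; the real config type in Nitro may look different.

```go
// Hypothetical shape of the config used by the guard; field names are assumptions.
type RedisValidationClientConfig struct {
	RedisURL string
}

// Enabled reports whether redis-based validation has been configured.
func (c RedisValidationClientConfig) Enabled() bool {
	return c.RedisURL != ""
}

// needsStatelessValidator mirrors the condition added in createNodeImpl:
// a validator is created if either validation transport is configured.
func needsStatelessValidator(redisCfg RedisValidationClientConfig, serverURLs []string) bool {
	return redisCfg.Enabled() || (len(serverURLs) > 0 && serverURLs[0] != "")
}
```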
48 changes: 30 additions & 18 deletions arbos/arbosState/arbosstate.go
@@ -36,24 +36,26 @@ import (
// persisted beyond the end of the test.)

type ArbosState struct {
arbosVersion uint64 // version of the ArbOS storage format and semantics
upgradeVersion storage.StorageBackedUint64 // version we're planning to upgrade to, or 0 if not planning to upgrade
upgradeTimestamp storage.StorageBackedUint64 // when to do the planned upgrade
networkFeeAccount storage.StorageBackedAddress
l1PricingState *l1pricing.L1PricingState
l2PricingState *l2pricing.L2PricingState
retryableState *retryables.RetryableState
addressTable *addressTable.AddressTable
chainOwners *addressSet.AddressSet
sendMerkle *merkleAccumulator.MerkleAccumulator
blockhashes *blockhash.Blockhashes
chainId storage.StorageBackedBigInt
chainConfig storage.StorageBackedBytes
genesisBlockNum storage.StorageBackedUint64
infraFeeAccount storage.StorageBackedAddress
brotliCompressionLevel storage.StorageBackedUint64 // brotli compression level used for pricing
backingStorage *storage.Storage
Burner burn.Burner
arbosVersion uint64 // version of the ArbOS storage format and semantics
maxArbosVersionSupported uint64 // maximum ArbOS version supported by this code
maxDebugArbosVersionSupported uint64 // maximum ArbOS version supported by this code in debug mode
upgradeVersion storage.StorageBackedUint64 // version we're planning to upgrade to, or 0 if not planning to upgrade
upgradeTimestamp storage.StorageBackedUint64 // when to do the planned upgrade
networkFeeAccount storage.StorageBackedAddress
l1PricingState *l1pricing.L1PricingState
l2PricingState *l2pricing.L2PricingState
retryableState *retryables.RetryableState
addressTable *addressTable.AddressTable
chainOwners *addressSet.AddressSet
sendMerkle *merkleAccumulator.MerkleAccumulator
blockhashes *blockhash.Blockhashes
chainId storage.StorageBackedBigInt
chainConfig storage.StorageBackedBytes
genesisBlockNum storage.StorageBackedUint64
infraFeeAccount storage.StorageBackedAddress
brotliCompressionLevel storage.StorageBackedUint64 // brotli compression level used for pricing
backingStorage *storage.Storage
Burner burn.Burner
}

var ErrUninitializedArbOS = errors.New("ArbOS uninitialized")
@@ -70,6 +72,8 @@ func OpenArbosState(stateDB vm.StateDB, burner burn.Burner) (*ArbosState, error)
}
return &ArbosState{
arbosVersion,
20,
20,
backingStorage.OpenStorageBackedUint64(uint64(upgradeVersionOffset)),
backingStorage.OpenStorageBackedUint64(uint64(upgradeTimestampOffset)),
backingStorage.OpenStorageBackedAddress(uint64(networkFeeAccountOffset)),
@@ -400,6 +404,14 @@ func (state *ArbosState) RetryableState() *retryables.RetryableState {
return state.retryableState
}

func (state *ArbosState) MaxArbosVersionSupported() uint64 {
return state.maxArbosVersionSupported
}

func (state *ArbosState) MaxDebugArbosVersionSupported() uint64 {
return state.maxDebugArbosVersionSupported
}

func (state *ArbosState) L1PricingState() *l1pricing.L1PricingState {
return state.l1PricingState
}
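A short usage sketch of the two new accessors (the hard-coded maximums of 20 are set in OpenArbosState above): callers can refuse to operate on state written by a newer ArbOS than the binary understands, which is what the cmd/nitro/init.go hunk below does. This is only a sketch, not the node's actual startup path; stateDB and burner are assumed to already be in scope.

```go
import (
	"fmt"

	"github.com/ethereum/go-ethereum/core/vm"

	"github.com/offchainlabs/nitro/arbos/arbosState"
	"github.com/offchainlabs/nitro/arbos/burn"
)

// checkArbOSVersion sketches how the new accessors can gate startup.
func checkArbOSVersion(stateDB vm.StateDB, burner burn.Burner) error {
	state, err := arbosState.OpenArbosState(stateDB, burner)
	if err != nil {
		return err
	}
	// Refuse to run if the on-chain state is newer than this binary supports.
	if state.ArbOSVersion() > state.MaxArbosVersionSupported() {
		return fmt.Errorf("ArbOS state version %d is newer than supported version %d",
			state.ArbOSVersion(), state.MaxArbosVersionSupported())
	}
	return nil
}
```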
11 changes: 11 additions & 0 deletions cmd/nitro/init.go
@@ -155,6 +155,17 @@ func validateBlockChain(blockChain *core.BlockChain, chainConfig *params.ChainCo
return fmt.Errorf("invalid chain config, not compatible with previous: %w", err)
}
}
// Make sure we don't allow accidentally downgrading ArbOS
if chainConfig.DebugMode() {
if currentArbosState.ArbOSVersion() > currentArbosState.MaxDebugArbosVersionSupported() {
return fmt.Errorf("attempted to launch node in debug mode with ArbOS version %v on ArbOS state with version %v", currentArbosState.MaxDebugArbosVersionSupported(), currentArbosState.ArbOSVersion())
}
} else {
if currentArbosState.ArbOSVersion() > currentArbosState.MaxArbosVersionSupported() {
return fmt.Errorf("attempted to launch node with ArbOS version %v on ArbOS state with version %v", currentArbosState.MaxArbosVersionSupported(), currentArbosState.ArbOSVersion())
}

}

return nil
}
4 changes: 1 addition & 3 deletions gethhook/geth-hook.go
@@ -58,9 +58,7 @@ func init() {
precompileErrors := make(map[[4]byte]abi.Error)
for addr, precompile := range precompiles.Precompiles() {
for _, errABI := range precompile.Precompile().GetErrorABIs() {
var id [4]byte
copy(id[:], errABI.ID[:4])
precompileErrors[id] = errABI
precompileErrors[[4]byte(errABI.ID.Bytes())] = errABI
}
var wrapped vm.AdvancedPrecompile = ArbosPrecompileWrapper{precompile}
vm.PrecompiledContractsArbitrum[addr] = wrapped
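The geth-hook change above replaces the declare-and-copy idiom with Go 1.20's slice-to-array conversion: [4]byte(errABI.ID.Bytes()) takes the first four bytes of the 32-byte error ID, and the conversion would panic only if the slice had fewer than four bytes. A standalone illustration of the equivalence:

```go
package main

import "fmt"

func main() {
	id := []byte{0xde, 0xad, 0xbe, 0xef, 0x01, 0x02, 0x03, 0x04} // stand-in for a longer hash

	// Old form: declare a fixed-size array and copy the first four bytes into it.
	var a [4]byte
	copy(a[:], id[:4])

	// New form (Go 1.20+): convert the slice directly. The array takes the first
	// four bytes; the conversion panics if the slice has fewer than four.
	b := [4]byte(id)

	fmt.Println(a == b) // true
}
```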
2 changes: 1 addition & 1 deletion go-ethereum
57 changes: 18 additions & 39 deletions pubsub/consumer.go
@@ -10,7 +10,6 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/offchainlabs/nitro/util/stopwaiter"
"github.com/spf13/pflag"
)
@@ -21,68 +20,49 @@ type ConsumerConfig struct {
// Duration after which consumer is considered to be dead if heartbeat
// is not updated.
KeepAliveTimeout time.Duration `koanf:"keepalive-timeout"`
// Redis url for Redis streams and locks.
RedisURL string `koanf:"redis-url"`
// Redis stream name.
RedisStream string `koanf:"redis-stream"`
// Redis consumer group name.
RedisGroup string `koanf:"redis-group"`
}

var DefaultConsumerConfig = &ConsumerConfig{
var DefaultConsumerConfig = ConsumerConfig{
ResponseEntryTimeout: time.Hour,
KeepAliveTimeout: 5 * time.Minute,
RedisStream: "",
RedisGroup: "",
}

var DefaultTestConsumerConfig = &ConsumerConfig{
RedisStream: "",
RedisGroup: "",
var TestConsumerConfig = ConsumerConfig{
ResponseEntryTimeout: time.Minute,
KeepAliveTimeout: 30 * time.Millisecond,
}

func ConsumerConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Duration(prefix+".response-entry-timeout", DefaultConsumerConfig.ResponseEntryTimeout, "timeout for response entry")
f.Duration(prefix+".keepalive-timeout", DefaultConsumerConfig.KeepAliveTimeout, "timeout after which consumer is considered inactive if heartbeat wasn't performed")
f.String(prefix+".redis-url", DefaultConsumerConfig.RedisURL, "redis url for redis stream")
f.String(prefix+".redis-stream", DefaultConsumerConfig.RedisStream, "redis stream name to read from")
f.String(prefix+".redis-group", DefaultConsumerConfig.RedisGroup, "redis stream consumer group name")
}

// Consumer implements a consumer for a Redis stream and provides a heartbeat
// to indicate it is alive.
type Consumer[Request any, Response any] struct {
stopwaiter.StopWaiter
id string
client redis.UniversalClient
cfg *ConsumerConfig
id string
client redis.UniversalClient
redisStream string
redisGroup string
cfg *ConsumerConfig
}

type Message[Request any] struct {
ID string
Value Request
}

func NewConsumer[Request any, Response any](ctx context.Context, cfg *ConsumerConfig) (*Consumer[Request, Response], error) {
if cfg.RedisURL == "" {
return nil, fmt.Errorf("redis url cannot be empty")
}
if cfg.RedisStream == "" {
func NewConsumer[Request any, Response any](client redis.UniversalClient, streamName string, cfg *ConsumerConfig) (*Consumer[Request, Response], error) {
if streamName == "" {
return nil, fmt.Errorf("redis stream name cannot be empty")
}
if cfg.RedisGroup == "" {
return nil, fmt.Errorf("redis group name cannot be emtpy")
}
c, err := redisutil.RedisClientFromURL(cfg.RedisURL)
if err != nil {
return nil, err
}
consumer := &Consumer[Request, Response]{
id: uuid.NewString(),
client: c,
cfg: cfg,
id: uuid.NewString(),
client: client,
redisStream: streamName,
redisGroup: streamName, // There is 1-1 mapping of redis stream and consumer group.
cfg: cfg,
}
return consumer, nil
}
@@ -125,11 +105,11 @@ func (c *Consumer[Request, Response]) heartBeat(ctx context.Context) {
// unresponsive consumer, if not then reads from the stream.
func (c *Consumer[Request, Response]) Consume(ctx context.Context) (*Message[Request], error) {
res, err := c.client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: c.cfg.RedisGroup,
Group: c.redisGroup,
Consumer: c.id,
// Receive only messages that were never delivered to any other consumer,
// that is, only new messages.
Streams: []string{c.cfg.RedisStream, ">"},
Streams: []string{c.redisStream, ">"},
Count: 1,
Block: time.Millisecond, // 0 seems to block the read instead of immediately returning
}).Result()
@@ -142,7 +122,6 @@ func (c *Consumer[Request, Response]) Consume(ctx context.Context) (*Message[Req
if len(res) != 1 || len(res[0].Messages) != 1 {
return nil, fmt.Errorf("redis returned entries: %+v, for querying single message", res)
}
log.Debug(fmt.Sprintf("Consumer: %s consuming message: %s", c.id, res[0].Messages[0].ID))
var (
value = res[0].Messages[0].Values[messageKey]
data, ok = (value).(string)
@@ -154,7 +133,7 @@ func (c *Consumer[Request, Response]) Consume(ctx context.Context) (*Message[Req
if err := json.Unmarshal([]byte(data), &req); err != nil {
return nil, fmt.Errorf("unmarshaling value: %v, error: %w", value, err)
}

log.Debug("Redis stream consuming", "consumer_id", c.id, "message_id", res[0].Messages[0].ID)
return &Message[Request]{
ID: res[0].Messages[0].ID,
Value: req,
@@ -170,7 +149,7 @@ func (c *Consumer[Request, Response]) SetResult(ctx context.Context, messageID s
if err != nil || !acquired {
return fmt.Errorf("setting result for message: %v, error: %w", messageID, err)
}
if _, err := c.client.XAck(ctx, c.cfg.RedisStream, c.cfg.RedisGroup, messageID).Result(); err != nil {
if _, err := c.client.XAck(ctx, c.redisStream, c.redisGroup, messageID).Result(); err != nil {
return fmt.Errorf("acking message: %v, error: %w", messageID, err)
}
return nil
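With the redis URL, stream, and group removed from ConsumerConfig, callers now build the redis client themselves and pass it in together with a stream name; the consumer group is derived 1:1 from the stream. Below is a minimal usage sketch under a few assumptions: redisutil.RedisClientFromURL is the helper whose import was dropped above, Start is the usual StopWaiter-style start method, a nil message with nil error means there was nothing to read, and Request/Response are placeholder payload types invented for the example.

```go
package example

import (
	"context"

	"github.com/offchainlabs/nitro/pubsub"
	"github.com/offchainlabs/nitro/util/redisutil"
)

// Placeholder payload types for illustration only.
type Request struct{ Input string }
type Response struct{ Output string }

func consumeOne(ctx context.Context, redisURL, stream string) error {
	client, err := redisutil.RedisClientFromURL(redisURL)
	if err != nil {
		return err
	}
	cfg := pubsub.DefaultConsumerConfig // value-typed default from this diff
	c, err := pubsub.NewConsumer[Request, Response](client, stream, &cfg)
	if err != nil {
		return err
	}
	c.Start(ctx) // assumed StopWaiter-style Start; kicks off the heartbeat

	msg, err := c.Consume(ctx)
	if err != nil || msg == nil {
		return err // nil msg is assumed to mean "nothing to read right now"
	}
	// Handle msg.Value, then acknowledge and publish the result.
	return c.SetResult(ctx, msg.ID, Response{Output: msg.Value.Input})
}
```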
68 changes: 27 additions & 41 deletions pubsub/producer.go
@@ -20,7 +20,6 @@ import (
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
"github.com/offchainlabs/nitro/util/containers"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/offchainlabs/nitro/util/stopwaiter"
"github.com/spf13/pflag"
)
@@ -32,9 +31,11 @@ const (

type Producer[Request any, Response any] struct {
stopwaiter.StopWaiter
id string
client redis.UniversalClient
cfg *ProducerConfig
id string
client redis.UniversalClient
redisStream string
redisGroup string
cfg *ProducerConfig

promisesLock sync.RWMutex
promises map[string]*containers.Promise[Response]
@@ -49,10 +50,7 @@ type ProducerConfig struct {
// When enabled, messages that are sent to consumers that later die before
// processing them will be re-inserted into the stream to be processed by
// another consumer.
EnableReproduce bool `koanf:"enable-reproduce"`
RedisURL string `koanf:"redis-url"`
// Redis stream name.
RedisStream string `koanf:"redis-stream"`
EnableReproduce bool `koanf:"enable-reproduce"`
// Interval duration in which producer checks for pending messages delivered
// to the consumers that are currently inactive.
CheckPendingInterval time.Duration `koanf:"check-pending-interval"`
@@ -61,56 +59,43 @@ type ProducerConfig struct {
KeepAliveTimeout time.Duration `koanf:"keepalive-timeout"`
// Interval duration for checking the result set by consumers.
CheckResultInterval time.Duration `koanf:"check-result-interval"`
// Redis consumer group name.
RedisGroup string `koanf:"redis-group"`
}

var DefaultProducerConfig = &ProducerConfig{
var DefaultProducerConfig = ProducerConfig{
EnableReproduce: true,
RedisStream: "",
RedisGroup: "",
CheckPendingInterval: time.Second,
KeepAliveTimeout: 5 * time.Minute,
CheckResultInterval: 5 * time.Second,
}

var DefaultTestProducerConfig = &ProducerConfig{
var TestProducerConfig = ProducerConfig{
EnableReproduce: true,
RedisStream: "",
RedisGroup: "",
CheckPendingInterval: 10 * time.Millisecond,
KeepAliveTimeout: 100 * time.Millisecond,
CheckResultInterval: 5 * time.Millisecond,
}

func ProducerAddConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".enable-reproduce", DefaultProducerConfig.EnableReproduce, "when enabled, messages with dead consumer will be re-inserted into the stream")
f.String(prefix+".redis-url", DefaultProducerConfig.RedisURL, "redis url for redis stream")
f.Duration(prefix+".check-pending-interval", DefaultProducerConfig.CheckPendingInterval, "interval in which producer checks pending messages whether consumer processing them is inactive")
f.Duration(prefix+".check-result-interval", DefaultProducerConfig.CheckResultInterval, "interval in which producer checks pending messages whether consumer processing them is inactive")
f.Duration(prefix+".keepalive-timeout", DefaultProducerConfig.KeepAliveTimeout, "timeout after which consumer is considered inactive if heartbeat wasn't performed")
f.String(prefix+".redis-stream", DefaultProducerConfig.RedisStream, "redis stream name to read from")
f.String(prefix+".redis-group", DefaultProducerConfig.RedisGroup, "redis stream consumer group name")
}

func NewProducer[Request any, Response any](cfg *ProducerConfig) (*Producer[Request, Response], error) {
if cfg.RedisURL == "" {
return nil, fmt.Errorf("redis url cannot be empty")
func NewProducer[Request any, Response any](client redis.UniversalClient, streamName string, cfg *ProducerConfig) (*Producer[Request, Response], error) {
if client == nil {
return nil, fmt.Errorf("redis client cannot be nil")
}
if cfg.RedisStream == "" {
return nil, fmt.Errorf("redis stream cannot be emtpy")
}
if cfg.RedisGroup == "" {
return nil, fmt.Errorf("redis group cannot be empty")
}
c, err := redisutil.RedisClientFromURL(cfg.RedisURL)
if err != nil {
return nil, err
if streamName == "" {
return nil, fmt.Errorf("stream name cannot be empty")
}
return &Producer[Request, Response]{
id: uuid.NewString(),
client: c,
cfg: cfg,
promises: make(map[string]*containers.Promise[Response]),
id: uuid.NewString(),
client: client,
redisStream: streamName,
redisGroup: streamName, // There is 1-1 mapping of redis stream and consumer group.
cfg: cfg,
promises: make(map[string]*containers.Promise[Response]),
}, nil
}

@@ -142,7 +127,7 @@ func (p *Producer[Request, Response]) checkAndReproduce(ctx context.Context) tim
}
acked := make(map[string]Request)
for _, msg := range msgs {
if _, err := p.client.XAck(ctx, p.cfg.RedisStream, p.cfg.RedisGroup, msg.ID).Result(); err != nil {
if _, err := p.client.XAck(ctx, p.redisStream, p.redisGroup, msg.ID).Result(); err != nil {
log.Error("ACKing message", "error", err)
continue
}
@@ -200,7 +185,7 @@ func (p *Producer[Request, Response]) reproduce(ctx context.Context, value Reque
return nil, fmt.Errorf("marshaling value: %w", err)
}
id, err := p.client.XAdd(ctx, &redis.XAddArgs{
Stream: p.cfg.RedisStream,
Stream: p.redisStream,
Values: map[string]any{messageKey: val},
}).Result()
if err != nil {
@@ -224,6 +209,7 @@ func (p *Producer[Request, Response]) reproduce(ctx context.Context, value Reque
}

func (p *Producer[Request, Response]) Produce(ctx context.Context, value Request) (*containers.Promise[Response], error) {
log.Debug("Redis stream producing", "value", value)
p.once.Do(func() {
p.StopWaiter.CallIteratively(p.checkAndReproduce)
p.StopWaiter.CallIteratively(p.checkResponses)
@@ -248,8 +234,8 @@ func (p *Producer[Request, Response]) havePromiseFor(messageID string) bool {

func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Message[Request], error) {
pendingMessages, err := p.client.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: p.cfg.RedisStream,
Group: p.cfg.RedisGroup,
Stream: p.redisStream,
Group: p.redisGroup,
Start: "-",
End: "+",
Count: 100,
@@ -285,8 +271,8 @@ func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Mess
}
log.Info("Attempting to claim", "messages", ids)
claimedMsgs, err := p.client.XClaim(ctx, &redis.XClaimArgs{
Stream: p.cfg.RedisStream,
Group: p.cfg.RedisGroup,
Stream: p.redisStream,
Group: p.redisGroup,
Consumer: p.id,
MinIdle: p.cfg.KeepAliveTimeout,
Messages: ids,
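The producer gets the same treatment: NewProducer now takes a shared redis.UniversalClient and a stream name instead of reading them from its config. Below is a sketch of producing a request and waiting for a consumer's answer, in the same hypothetical package as the consumer example above; the Start method and the Await method on containers.Promise are assumptions about APIs not shown in this diff.

```go
import (
	"context"

	"github.com/go-redis/redis/v8"

	"github.com/offchainlabs/nitro/pubsub"
)

func produceOne(ctx context.Context, client redis.UniversalClient, stream string, req Request) (Response, error) {
	cfg := pubsub.DefaultProducerConfig // value-typed default from this diff
	p, err := pubsub.NewProducer[Request, Response](client, stream, &cfg)
	if err != nil {
		return Response{}, err
	}
	p.Start(ctx) // assumed StopWaiter-style Start

	promise, err := p.Produce(ctx, req)
	if err != nil {
		return Response{}, err
	}
	// Blocks until some consumer calls SetResult for this message
	// (Await is assumed to be the promise's blocking accessor).
	return promise.Await(ctx)
}
```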