diff --git a/cmd/daserver/daserver.go b/cmd/daserver/daserver.go
index 1a3fd435b8..3c164066d3 100644
--- a/cmd/daserver/daserver.go
+++ b/cmd/daserver/daserver.go
@@ -238,7 +238,7 @@ func startup() error {
 		return errors.New("sequencer-inbox-address must be set to a valid L1 URL and contract address, or 'none'")
 	}
 
-	daReader, daWriter, daHealthChecker, dasLifecycleManager, err := das.CreateDAComponentsForDaserver(ctx, &serverConfig.DataAvailability, l1Reader, seqInboxAddress)
+	daReader, daWriter, signatureVerifier, daHealthChecker, dasLifecycleManager, err := das.CreateDAComponentsForDaserver(ctx, &serverConfig.DataAvailability, l1Reader, seqInboxAddress)
 	if err != nil {
 		return err
 	}
@@ -253,7 +253,7 @@ func startup() error {
 	if serverConfig.EnableRPC {
 		log.Info("Starting HTTP-RPC server", "addr", serverConfig.RPCAddr, "port", serverConfig.RPCPort, "revision", vcsRevision, "vcs.time", vcsTime)
 
-		rpcServer, err = das.StartDASRPCServer(ctx, serverConfig.RPCAddr, serverConfig.RPCPort, serverConfig.RPCServerTimeouts, serverConfig.RPCServerBodyLimit, daReader, daWriter, daHealthChecker)
+		rpcServer, err = das.StartDASRPCServer(ctx, serverConfig.RPCAddr, serverConfig.RPCPort, serverConfig.RPCServerTimeouts, serverConfig.RPCServerBodyLimit, daReader, daWriter, daHealthChecker, signatureVerifier)
 		if err != nil {
 			return err
 		}
diff --git a/cmd/datool/datool.go b/cmd/datool/datool.go
index 3f64a990ca..cdea134bcb 100644
--- a/cmd/datool/datool.go
+++ b/cmd/datool/datool.go
@@ -91,6 +91,7 @@ type ClientStoreConfig struct {
 	SigningKey            string `koanf:"signing-key"`
 	SigningWallet         string `koanf:"signing-wallet"`
 	SigningWalletPassword string `koanf:"signing-wallet-password"`
+	MaxStoreChunkBodySize int    `koanf:"max-store-chunk-body-size"`
 }
 
 func parseClientStoreConfig(args []string) (*ClientStoreConfig, error) {
@@ -102,6 +103,7 @@ func parseClientStoreConfig(args []string) (*ClientStoreConfig, error) {
 	f.String("signing-wallet", "", "wallet containing ecdsa key to sign the message with")
 	f.String("signing-wallet-password", genericconf.PASSWORD_NOT_SET, "password to unlock the wallet, if not specified the user is prompted for the password")
 	f.Duration("das-retention-period", 24*time.Hour, "The period which DASes are requested to retain the stored batches.")
+	f.Int("max-store-chunk-body-size", 512*1024, "The maximum HTTP POST body size for a chunked store request")
 
 	k, err := confighelpers.BeginCommonParse(f, args)
 	if err != nil {
@@ -121,12 +123,7 @@ func startClientStore(args []string) error {
 		return err
 	}
 
-	client, err := das.NewDASRPCClient(config.URL)
-	if err != nil {
-		return err
-	}
-
-	var dasClient das.DataAvailabilityServiceWriter = client
+	var signer signature.DataSignerFunc
 	if config.SigningKey != "" {
 		var privateKey *ecdsa.PrivateKey
 		if config.SigningKey[:2] == "0x" {
@@ -140,12 +137,7 @@ func startClientStore(args []string) error {
 				return err
 			}
 		}
-		signer := signature.DataSignerFromPrivateKey(privateKey)
-
-		dasClient, err = das.NewStoreSigningDAS(dasClient, signer)
-		if err != nil {
-			return err
-		}
+		signer = signature.DataSignerFromPrivateKey(privateKey)
 	} else if config.SigningWallet != "" {
 		walletConf := &genericconf.WalletConfig{
 			Pathname:      config.SigningWallet,
@@ -154,16 +146,17 @@ func startClientStore(args []string) error {
 			Account:       "",
 			OnlyCreateKey: false,
 		}
-		_, signer, err := util.OpenWallet("datool", walletConf, nil)
-		if err != nil {
-			return err
-		}
-		dasClient, err = das.NewStoreSigningDAS(dasClient, signer)
+		_, signer, err = util.OpenWallet("datool", walletConf, nil)
 		if err != nil {
 			return err
 		}
 	}
 
+	client, err := das.NewDASRPCClient(config.URL, signer, config.MaxStoreChunkBodySize)
+	if err != nil {
+		return err
+	}
+
 	ctx := context.Background()
 	var cert *daprovider.DataAvailabilityCertificate
 
@@ -173,9 +166,9 @@ func startClientStore(args []string) error {
 		if err != nil {
 			return err
 		}
-		cert, err = dasClient.Store(ctx, message, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{})
+		cert, err = client.Store(ctx, message, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{})
 	} else if len(config.Message) > 0 {
-		cert, err = dasClient.Store(ctx, []byte(config.Message), uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{})
+		cert, err = client.Store(ctx, []byte(config.Message), uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{})
 	} else {
 		return errors.New("--message or --random-message-size must be specified")
 	}
@@ -361,7 +354,7 @@ func dumpKeyset(args []string) error {
 		return err
 	}
 
-	services, err := das.ParseServices(config.Keyset)
+	services, err := das.ParseServices(config.Keyset, nil)
 	if err != nil {
 		return err
 	}
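Note on the datool changes above: signing moves out of the `StoreSigningDAS` wrapper and into the RPC client itself. A minimal sketch of the new wiring, assuming a hypothetical helper; the URL and key are placeholders:

```go
package main

import (
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/offchainlabs/nitro/das"
	"github.com/offchainlabs/nitro/util/signature"
)

// newSigningClient is a hypothetical helper mirroring startClientStore: the
// signer is now handed to the client directly rather than wrapping the writer.
func newSigningClient(url, privKeyHex string) (*das.DASRPCClient, error) {
	privateKey, err := crypto.HexToECDSA(privKeyHex) // hex key without 0x prefix
	if err != nil {
		return nil, err
	}
	signer := signature.DataSignerFromPrivateKey(privateKey)
	// 512*1024 mirrors the default of --max-store-chunk-body-size.
	return das.NewDASRPCClient(url, signer, 512*1024)
}
```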
diff --git a/das/aggregator.go b/das/aggregator.go
index d3edd58437..25db73a76e 100644
--- a/das/aggregator.go
+++ b/das/aggregator.go
@@ -27,20 +27,23 @@ import (
 )
 
 type AggregatorConfig struct {
-	Enable        bool   `koanf:"enable"`
-	AssumedHonest int    `koanf:"assumed-honest"`
-	Backends      string `koanf:"backends"`
+	Enable                bool   `koanf:"enable"`
+	AssumedHonest         int    `koanf:"assumed-honest"`
+	Backends              string `koanf:"backends"`
+	MaxStoreChunkBodySize int    `koanf:"max-store-chunk-body-size"`
 }
 
 var DefaultAggregatorConfig = AggregatorConfig{
-	AssumedHonest: 0,
-	Backends:      "",
+	AssumedHonest:         0,
+	Backends:              "",
+	MaxStoreChunkBodySize: 512 * 1024,
 }
 
 func AggregatorConfigAddOptions(prefix string, f *flag.FlagSet) {
-	f.Bool(prefix+".enable", DefaultAggregatorConfig.Enable, "enable storage/retrieval of sequencer batch data from a list of RPC endpoints; this should only be used by the batch poster and not in combination with other DAS storage types")
+	f.Bool(prefix+".enable", DefaultAggregatorConfig.Enable, "enable storage of sequencer batch data from a list of RPC endpoints; this should only be used by the batch poster and not in combination with other DAS storage types")
 	f.Int(prefix+".assumed-honest", DefaultAggregatorConfig.AssumedHonest, "Number of assumed honest backends (H). If there are N backends, K=N+1-H valid responses are required to consider a Store request to be successful.")
 	f.String(prefix+".backends", DefaultAggregatorConfig.Backends, "JSON RPC backend configuration")
+	f.Int(prefix+".max-store-chunk-body-size", DefaultAggregatorConfig.MaxStoreChunkBodySize, "maximum HTTP POST body size to use for individual batch chunks, including JSON RPC overhead and an estimated overhead of 512B of headers")
 }
 
 type Aggregator struct {
@@ -165,7 +168,7 @@ type storeResponse struct {
 func (a *Aggregator) Store(ctx context.Context, message []byte, timeout uint64, sig []byte) (*daprovider.DataAvailabilityCertificate, error) {
 	log.Trace("das.Aggregator.Store", "message", pretty.FirstFewBytes(message), "timeout", time.Unix(int64(timeout), 0), "sig", pretty.FirstFewBytes(sig))
 	if a.addrVerifier != nil {
-		actualSigner, err := DasRecoverSigner(message, timeout, sig)
+		actualSigner, err := DasRecoverSigner(message, sig, timeout)
 		if err != nil {
 			return nil, err
 		}
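The new flag bounds the whole HTTP POST body, not the chunk payload: the client derives the usable chunk size by subtracting the JSON-RPC envelope and an assumed 512B of headers, then halving for the byte-array encoding overhead. A standalone sketch of that arithmetic, mirroring the computation in das/dasRpcClient.go further down (the 5MB batch size is illustrative only):

```go
package main

import "fmt"

// Mirrors the chunk-size derivation in NewDASRPCClient: every encoded payload
// byte costs two body bytes, plus a fixed JSON-RPC envelope and header budget.
const sendChunkJSONBoilerplate = `{"jsonrpc":"2.0","id":4294967295,"method":"das_sendChunked","params":[""]}`

func chunkSize(maxStoreChunkBodySize int) int {
	return (maxStoreChunkBodySize - len(sendChunkJSONBoilerplate) - 512) / 2
}

func main() {
	max := 512 * 1024 // default --max-store-chunk-body-size
	cs := chunkSize(max)
	msg := 5_000_000 // a 5MB batch, for illustration
	nChunks := msg / cs
	if msg%cs != 0 {
		nChunks++
	}
	fmt.Printf("chunk payload: %d bytes; chunks for a %d-byte batch: %d\n", cs, msg, nChunks)
}
```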
diff --git a/das/das.go b/das/das.go
index b0708e3b33..fea1e6c6a2 100644
--- a/das/das.go
+++ b/das/das.go
@@ -41,11 +41,9 @@ type DataAvailabilityConfig struct {
 	LocalCache CacheConfig `koanf:"local-cache"`
 	RedisCache RedisConfig `koanf:"redis-cache"`
 
-	LocalDBStorage     LocalDBStorageConfig     `koanf:"local-db-storage"`
-	LocalFileStorage   LocalFileStorageConfig   `koanf:"local-file-storage"`
-	S3Storage          S3StorageServiceConfig   `koanf:"s3-storage"`
-	IpfsStorage        IpfsStorageServiceConfig `koanf:"ipfs-storage"`
-	RegularSyncStorage RegularSyncStorageConfig `koanf:"regular-sync-storage"`
+	LocalDBStorage   LocalDBStorageConfig   `koanf:"local-db-storage"`
+	LocalFileStorage LocalFileStorageConfig `koanf:"local-file-storage"`
+	S3Storage        S3StorageServiceConfig `koanf:"s3-storage"`
 
 	Key KeyConfig `koanf:"key"`
 
@@ -65,9 +63,9 @@ var DefaultDataAvailabilityConfig = DataAvailabilityConfig{
 	RequestTimeout:                5 * time.Second,
 	Enable:                        false,
 	RestAggregator:                DefaultRestfulClientAggregatorConfig,
+	RPCAggregator:                 DefaultAggregatorConfig,
 	ParentChainConnectionAttempts: 15,
 	PanicOnError:                  false,
-	IpfsStorage:                   DefaultIpfsStorageServiceConfig,
 }
 
 func OptionalAddressFromString(s string) (*common.Address, error) {
@@ -114,7 +112,6 @@ func dataAvailabilityConfigAddOptions(prefix string, f *flag.FlagSet, r role) {
 		LocalDBStorageConfigAddOptions(prefix+".local-db-storage", f)
 		LocalFileStorageConfigAddOptions(prefix+".local-file-storage", f)
 		S3ConfigAddOptions(prefix+".s3-storage", f)
-		RegularSyncStorageConfigAddOptions(prefix+".regular-sync-storage", f)
 
 		// Key config for storage
 		KeyConfigAddOptions(prefix+".key", f)
@@ -128,7 +125,6 @@ func dataAvailabilityConfigAddOptions(prefix string, f *flag.FlagSet, r role) {
 	}
 
 	// Both the Nitro node and daserver can use these options.
-	IpfsStorageServiceConfigAddOptions(prefix+".ipfs-storage", f)
 	RestfulClientAggregatorConfigAddOptions(prefix+".rest-aggregator", f)
 
 	f.String(prefix+".parent-chain-node-url", DefaultDataAvailabilityConfig.ParentChainNodeURL, "URL for parent chain node, only used in standalone daserver; when running as part of a node that node's L1 configuration is used")
diff --git a/das/dasRpcClient.go b/das/dasRpcClient.go
index 5fca1e449f..8d8db02ff4 100644
--- a/das/dasRpcClient.go
+++ b/das/dasRpcClient.go
@@ -6,36 +6,145 @@ package das
 import (
 	"context"
 	"fmt"
+	"strings"
 	"time"
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/common/hexutil"
 	"github.com/ethereum/go-ethereum/log"
+	"golang.org/x/sync/errgroup"
 
 	"github.com/ethereum/go-ethereum/rpc"
 	"github.com/offchainlabs/nitro/arbstate/daprovider"
 	"github.com/offchainlabs/nitro/blsSignatures"
 	"github.com/offchainlabs/nitro/util/pretty"
+	"github.com/offchainlabs/nitro/util/signature"
 )
 
 type DASRPCClient struct { // implements DataAvailabilityService
-	clnt *rpc.Client
-	url  string
+	clnt      *rpc.Client
+	url       string
+	signer    signature.DataSignerFunc
+	chunkSize uint64
 }
 
-func NewDASRPCClient(target string) (*DASRPCClient, error) {
+func nilSigner(_ []byte) ([]byte, error) {
+	return []byte{}, nil
+}
+
+const sendChunkJSONBoilerplate = "{\"jsonrpc\":\"2.0\",\"id\":4294967295,\"method\":\"das_sendChunked\",\"params\":[\"\"]}"
+
+func NewDASRPCClient(target string, signer signature.DataSignerFunc, maxStoreChunkBodySize int) (*DASRPCClient, error) {
 	clnt, err := rpc.Dial(target)
 	if err != nil {
 		return nil, err
 	}
+	if signer == nil {
+		signer = nilSigner
+	}
+
+	// Byte arrays are encoded in base64
+	chunkSize := (maxStoreChunkBodySize - len(sendChunkJSONBoilerplate) - 512 /* headers */) / 2
+	if chunkSize <= 0 {
+		return nil, fmt.Errorf("max-store-chunk-body-size %d doesn't leave enough room for chunk payload", maxStoreChunkBodySize)
+	}
+
 	return &DASRPCClient{
-		clnt: clnt,
-		url:  target,
+		clnt:      clnt,
+		url:       target,
+		signer:    signer,
+		chunkSize: uint64(chunkSize),
+	}, nil
+}
+
+func (c *DASRPCClient) Store(ctx context.Context, message []byte, timeout uint64, _ []byte) (*daprovider.DataAvailabilityCertificate, error) {
+	timestamp := uint64(time.Now().Unix())
+	nChunks := uint64(len(message)) / c.chunkSize
+	lastChunkSize := uint64(len(message)) % c.chunkSize
+	if lastChunkSize > 0 {
+		nChunks++
+	} else {
+		lastChunkSize = c.chunkSize
+	}
+	totalSize := uint64(len(message))
+
+	startReqSig, err := applyDasSigner(c.signer, []byte{}, timestamp, nChunks, c.chunkSize, totalSize, timeout)
+	if err != nil {
+		return nil, err
+	}
+
+	var startChunkedStoreResult StartChunkedStoreResult
+	if err := c.clnt.CallContext(ctx, &startChunkedStoreResult, "das_startChunkedStore", hexutil.Uint64(timestamp), hexutil.Uint64(nChunks), hexutil.Uint64(c.chunkSize), hexutil.Uint64(totalSize), hexutil.Uint64(timeout), hexutil.Bytes(startReqSig)); err != nil {
+		if strings.Contains(err.Error(), "the method das_startChunkedStore does not exist") {
+			return c.legacyStore(ctx, message, timeout)
+		}
+		return nil, err
+	}
+	batchId := uint64(startChunkedStoreResult.BatchId)
+
+	g := new(errgroup.Group)
+	for i := uint64(0); i < nChunks; i++ {
+		var chunk []byte
+		if i == nChunks-1 {
+			chunk = message[i*c.chunkSize : i*c.chunkSize+lastChunkSize]
+		} else {
+			chunk = message[i*c.chunkSize : (i+1)*c.chunkSize]
+		}
+
+		inner := func(_i uint64, _chunk []byte) func() error {
+			return func() error { return c.sendChunk(ctx, batchId, _i, _chunk) }
+		}
+		g.Go(inner(i, chunk))
+	}
+	if err := g.Wait(); err != nil {
+		return nil, err
+	}
+
+	finalReqSig, err := applyDasSigner(c.signer, []byte{}, uint64(startChunkedStoreResult.BatchId))
+	if err != nil {
+		return nil, err
+	}
+
+	var storeResult StoreResult
+	if err := c.clnt.CallContext(ctx, &storeResult, "das_commitChunkedStore", startChunkedStoreResult.BatchId, hexutil.Bytes(finalReqSig)); err != nil {
+		return nil, err
+	}
+
+	respSig, err := blsSignatures.SignatureFromBytes(storeResult.Sig)
+	if err != nil {
+		return nil, err
+	}
+
+	return &daprovider.DataAvailabilityCertificate{
+		DataHash:    common.BytesToHash(storeResult.DataHash),
+		Timeout:     uint64(storeResult.Timeout),
+		SignersMask: uint64(storeResult.SignersMask),
+		Sig:         respSig,
+		KeysetHash:  common.BytesToHash(storeResult.KeysetHash),
+		Version:     byte(storeResult.Version),
 	}, nil
 }
 
-func (c *DASRPCClient) Store(ctx context.Context, message []byte, timeout uint64, reqSig []byte) (*daprovider.DataAvailabilityCertificate, error) {
-	log.Trace("das.DASRPCClient.Store(...)", "message", pretty.FirstFewBytes(message), "timeout", time.Unix(int64(timeout), 0), "sig", pretty.FirstFewBytes(reqSig), "this", *c)
+func (c *DASRPCClient) sendChunk(ctx context.Context, batchId, i uint64, chunk []byte) error {
+	chunkReqSig, err := applyDasSigner(c.signer, chunk, batchId, i)
+	if err != nil {
+		return err
+	}
+
+	if err := c.clnt.CallContext(ctx, nil, "das_sendChunk", hexutil.Uint64(batchId), hexutil.Uint64(i), hexutil.Bytes(chunk), hexutil.Bytes(chunkReqSig)); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (c *DASRPCClient) legacyStore(ctx context.Context, message []byte, timeout uint64) (*daprovider.DataAvailabilityCertificate, error) {
+	log.Trace("das.DASRPCClient.Store(...)", "message", pretty.FirstFewBytes(message), "timeout", time.Unix(int64(timeout), 0), "this", *c)
+
+	reqSig, err := applyDasSigner(c.signer, message, timeout)
+	if err != nil {
+		return nil, err
+	}
+
 	var ret StoreResult
 	if err := c.clnt.CallContext(ctx, &ret, "das_store", hexutil.Bytes(message), hexutil.Uint64(timeout), hexutil.Bytes(reqSig)); err != nil {
 		return nil, err
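Usage sketch for the new client API: `Store` chunks the message, sends the chunks concurrently, commits, and transparently falls back to the legacy `das_store` when the server lacks the chunked API. The endpoint below is a placeholder, and a nil signer is replaced internally by `nilSigner`:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/offchainlabs/nitro/das"
)

func main() {
	// Placeholder daserver endpoint; adjust to your deployment.
	client, err := das.NewDASRPCClient("http://localhost:9876", nil, 512*1024)
	if err != nil {
		panic(err)
	}
	// Request 24h retention; the returned certificate carries the BLS
	// aggregate signature and keyset hash for the stored batch.
	cert, err := client.Store(context.Background(), []byte("batch data"),
		uint64(time.Now().Add(24*time.Hour).Unix()), nil)
	if err != nil {
		panic(err)
	}
	fmt.Printf("stored, data hash %x\n", cert.DataHash)
}
```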
diff --git a/das/dasRpcServer.go b/das/dasRpcServer.go
index 03f755b90e..1e5c95089f 100644
--- a/das/dasRpcServer.go
+++ b/das/dasRpcServer.go
@@ -7,8 +7,11 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"math/rand"
 	"net"
 	"net/http"
+	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/ethereum/go-ethereum/common/hexutil"
@@ -28,34 +31,47 @@ var (
 	rpcStoreFailureGauge      = metrics.NewRegisteredGauge("arb/das/rpc/store/failure", nil)
 	rpcStoreStoredBytesGauge  = metrics.NewRegisteredGauge("arb/das/rpc/store/bytes", nil)
 	rpcStoreDurationHistogram = metrics.NewRegisteredHistogram("arb/das/rpc/store/duration", nil, metrics.NewBoundedHistogramSample())
+
+	rpcSendChunkSuccessGauge = metrics.NewRegisteredGauge("arb/das/rpc/sendchunk/success", nil)
+	rpcSendChunkFailureGauge = metrics.NewRegisteredGauge("arb/das/rpc/sendchunk/failure", nil)
 )
 
 type DASRPCServer struct {
 	daReader        DataAvailabilityServiceReader
 	daWriter        DataAvailabilityServiceWriter
 	daHealthChecker DataAvailabilityServiceHealthChecker
+
+	signatureVerifier *SignatureVerifier
+
+	batches *batchBuilder
 }
 
-func StartDASRPCServer(ctx context.Context, addr string, portNum uint64, rpcServerTimeouts genericconf.HTTPServerTimeoutConfig, rpcServerBodyLimit int, daReader DataAvailabilityServiceReader, daWriter DataAvailabilityServiceWriter, daHealthChecker DataAvailabilityServiceHealthChecker) (*http.Server, error) {
+func StartDASRPCServer(ctx context.Context, addr string, portNum uint64, rpcServerTimeouts genericconf.HTTPServerTimeoutConfig, rpcServerBodyLimit int, daReader DataAvailabilityServiceReader, daWriter DataAvailabilityServiceWriter, daHealthChecker DataAvailabilityServiceHealthChecker, signatureVerifier *SignatureVerifier) (*http.Server, error) {
 	listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", addr, portNum))
 	if err != nil {
 		return nil, err
 	}
-	return StartDASRPCServerOnListener(ctx, listener, rpcServerTimeouts, rpcServerBodyLimit, daReader, daWriter, daHealthChecker)
+	return StartDASRPCServerOnListener(ctx, listener, rpcServerTimeouts, rpcServerBodyLimit, daReader, daWriter, daHealthChecker, signatureVerifier)
 }
 
-func StartDASRPCServerOnListener(ctx context.Context, listener net.Listener, rpcServerTimeouts genericconf.HTTPServerTimeoutConfig, rpcServerBodyLimit int, daReader DataAvailabilityServiceReader, daWriter DataAvailabilityServiceWriter, daHealthChecker DataAvailabilityServiceHealthChecker) (*http.Server, error) {
+func StartDASRPCServerOnListener(ctx context.Context, listener net.Listener, rpcServerTimeouts genericconf.HTTPServerTimeoutConfig, rpcServerBodyLimit int, daReader DataAvailabilityServiceReader, daWriter DataAvailabilityServiceWriter, daHealthChecker DataAvailabilityServiceHealthChecker, signatureVerifier *SignatureVerifier) (*http.Server, error) {
 	if daWriter == nil {
 		return nil, errors.New("No writer backend was configured for DAS RPC server. Has the BLS signing key been set up (--data-availability.key.key-dir or --data-availability.key.priv-key options)?")
 	}
 	rpcServer := rpc.NewServer()
+	if legacyDASStoreAPIOnly {
+		rpcServer.ApplyAPIFilter(map[string]bool{"das_store": true})
+	}
 	if rpcServerBodyLimit > 0 {
 		rpcServer.SetHTTPBodyLimit(rpcServerBodyLimit)
 	}
+
 	err := rpcServer.RegisterName("das", &DASRPCServer{
-		daReader:        daReader,
-		daWriter:        daWriter,
-		daHealthChecker: daHealthChecker,
+		daReader:          daReader,
+		daWriter:          daWriter,
+		daHealthChecker:   daHealthChecker,
+		signatureVerifier: signatureVerifier,
+		batches:           newBatchBuilder(),
 	})
 	if err != nil {
 		return nil, err
@@ -91,8 +107,8 @@ type StoreResult struct {
 	Version     hexutil.Uint64 `json:"version,omitempty"`
 }
 
-func (serv *DASRPCServer) Store(ctx context.Context, message hexutil.Bytes, timeout hexutil.Uint64, sig hexutil.Bytes) (*StoreResult, error) {
-	log.Trace("dasRpc.DASRPCServer.Store", "message", pretty.FirstFewBytes(message), "message length", len(message), "timeout", time.Unix(int64(timeout), 0), "sig", pretty.FirstFewBytes(sig), "this", serv)
+func (s *DASRPCServer) Store(ctx context.Context, message hexutil.Bytes, timeout hexutil.Uint64, sig hexutil.Bytes) (*StoreResult, error) {
+	log.Trace("dasRpc.DASRPCServer.Store", "message", pretty.FirstFewBytes(message), "message length", len(message), "timeout", time.Unix(int64(timeout), 0), "sig", pretty.FirstFewBytes(sig), "this", s)
 	rpcStoreRequestGauge.Inc(1)
 	start := time.Now()
 	success := false
@@ -105,7 +121,220 @@ func (serv *DASRPCServer) Store(ctx context.Context, message hexutil.Bytes, time
 		rpcStoreDurationHistogram.Update(time.Since(start).Nanoseconds())
 	}()
 
-	cert, err := serv.daWriter.Store(ctx, message, uint64(timeout), sig)
+	if err := s.signatureVerifier.verify(ctx, message, sig, uint64(timeout)); err != nil {
+		return nil, err
+	}
+
+	cert, err := s.daWriter.Store(ctx, message, uint64(timeout), nil)
+	if err != nil {
+		return nil, err
+	}
+	rpcStoreStoredBytesGauge.Inc(int64(len(message)))
+	success = true
+	return &StoreResult{
+		KeysetHash:  cert.KeysetHash[:],
+		DataHash:    cert.DataHash[:],
+		Timeout:     hexutil.Uint64(cert.Timeout),
+		SignersMask: hexutil.Uint64(cert.SignersMask),
+		Sig:         blsSignatures.SignatureToBytes(cert.Sig),
+		Version:     hexutil.Uint64(cert.Version),
+	}, nil
+}
+
+type StartChunkedStoreResult struct {
+	BatchId hexutil.Uint64 `json:"batchId,omitempty"`
+}
+
+type SendChunkResult struct {
+	Ok hexutil.Uint64 `json:"sendChunkResult,omitempty"`
+}
+
+type batch struct {
+	chunks                          [][]byte
+	expectedChunks                  uint64
+	seenChunks                      atomic.Int64
+	expectedChunkSize, expectedSize uint64
+	timeout                         uint64
+	startTime                       time.Time
+}
+
+const (
+	maxPendingBatches   = 10
+	batchBuildingExpiry = 1 * time.Minute
+)
+
+// exposed global for test control
+var (
+	legacyDASStoreAPIOnly = false
+)
+
+type batchBuilder struct {
+	mutex   sync.Mutex
+	batches map[uint64]*batch
+}
+
+func newBatchBuilder() *batchBuilder {
+	return &batchBuilder{
+		batches: make(map[uint64]*batch),
+	}
+}
+
+func (b *batchBuilder) assign(nChunks, timeout, chunkSize, totalSize uint64) (uint64, error) {
+	b.mutex.Lock()
+	defer b.mutex.Unlock()
+	if len(b.batches) >= maxPendingBatches {
+		return 0, fmt.Errorf("can't start new batch, already %d pending", len(b.batches))
+	}
+
+	id := rand.Uint64()
+	_, ok := b.batches[id]
+	if ok {
+		return 0, fmt.Errorf("can't start new batch, try again")
+	}
+
+	b.batches[id] = &batch{
+		chunks:            make([][]byte, nChunks),
+		expectedChunks:    nChunks,
+		expectedChunkSize: chunkSize,
+		expectedSize:      totalSize,
+		timeout:           timeout,
+		startTime:         time.Now(),
+	}
+	go func(id uint64) {
+		<-time.After(batchBuildingExpiry)
+		b.mutex.Lock()
+		// Batch will only exist if expiry was reached without it being complete.
+		if _, exists := b.batches[id]; exists {
+			rpcStoreFailureGauge.Inc(1)
+			delete(b.batches, id)
+		}
+		b.mutex.Unlock()
+	}(id)
+	return id, nil
+}
+
+func (b *batchBuilder) add(id, idx uint64, data []byte) error {
+	b.mutex.Lock()
+	batch, ok := b.batches[id]
+	b.mutex.Unlock()
+	if !ok {
+		return fmt.Errorf("unknown batch(%d)", id)
+	}
+
+	if idx >= uint64(len(batch.chunks)) {
+		return fmt.Errorf("batch(%d): chunk(%d) out of range", id, idx)
+	}
+
+	if batch.chunks[idx] != nil {
+		return fmt.Errorf("batch(%d): chunk(%d) already added", id, idx)
+	}
+
+	if batch.expectedChunkSize < uint64(len(data)) {
+		return fmt.Errorf("batch(%d): chunk(%d) greater than expected size %d, was %d", id, idx, batch.expectedChunkSize, len(data))
+	}
+
+	batch.chunks[idx] = data
+	batch.seenChunks.Add(1)
+	return nil
+}
+
+func (b *batchBuilder) close(id uint64) ([]byte, uint64, time.Time, error) {
+	b.mutex.Lock()
+	batch, ok := b.batches[id]
+	delete(b.batches, id)
+	b.mutex.Unlock()
+	if !ok {
+		return nil, 0, time.Time{}, fmt.Errorf("unknown batch(%d)", id)
+	}
+
+	if batch.expectedChunks != uint64(batch.seenChunks.Load()) {
+		return nil, 0, time.Time{}, fmt.Errorf("incomplete batch(%d): got %d/%d chunks", id, batch.seenChunks.Load(), batch.expectedChunks)
+	}
+
+	var flattened []byte
+	for _, chunk := range batch.chunks {
+		flattened = append(flattened, chunk...)
+	}
+
+	if batch.expectedSize != uint64(len(flattened)) {
+		return nil, 0, time.Time{}, fmt.Errorf("batch(%d) was not expected size %d, was %d", id, batch.expectedSize, len(flattened))
+	}
+
+	return flattened, batch.timeout, batch.startTime, nil
+}
+
+func (s *DASRPCServer) StartChunkedStore(ctx context.Context, timestamp, nChunks, chunkSize, totalSize, timeout hexutil.Uint64, sig hexutil.Bytes) (*StartChunkedStoreResult, error) {
+	rpcStoreRequestGauge.Inc(1)
+	failed := true
+	defer func() {
+		if failed {
+			rpcStoreFailureGauge.Inc(1)
+		} // success gauge will be incremented on successful commit
+	}()
+
+	if err := s.signatureVerifier.verify(ctx, []byte{}, sig, uint64(timestamp), uint64(nChunks), uint64(chunkSize), uint64(totalSize), uint64(timeout)); err != nil {
+		return nil, err
+	}
+
+	// Prevent replay of old messages
+	if time.Since(time.Unix(int64(timestamp), 0)).Abs() > time.Minute {
+		return nil, errors.New("too much time has elapsed since request was signed")
+	}
+
+	id, err := s.batches.assign(uint64(nChunks), uint64(timeout), uint64(chunkSize), uint64(totalSize))
+	if err != nil {
+		return nil, err
+	}
+
+	failed = false
+	return &StartChunkedStoreResult{
+		BatchId: hexutil.Uint64(id),
+	}, nil
+
+}
+
+func (s *DASRPCServer) SendChunk(ctx context.Context, batchId, chunkId hexutil.Uint64, message hexutil.Bytes, sig hexutil.Bytes) error {
+	success := false
+	defer func() {
+		if success {
+			rpcSendChunkSuccessGauge.Inc(1)
+		} else {
+			rpcSendChunkFailureGauge.Inc(1)
+		}
+	}()
+
+	if err := s.signatureVerifier.verify(ctx, message, sig, uint64(batchId), uint64(chunkId)); err != nil {
+		return err
+	}
+
+	if err := s.batches.add(uint64(batchId), uint64(chunkId), message); err != nil {
+		return err
+	}
+
+	success = true
+	return nil
+}
+
+func (s *DASRPCServer) CommitChunkedStore(ctx context.Context, batchId hexutil.Uint64, sig hexutil.Bytes) (*StoreResult, error) {
+	if err := s.signatureVerifier.verify(ctx, []byte{}, sig, uint64(batchId)); err != nil {
+		return nil, err
+	}
+
+	message, timeout, startTime, err := s.batches.close(uint64(batchId))
+	if err != nil {
+		return nil, err
+	}
+
+	cert, err := s.daWriter.Store(ctx, message, timeout, nil)
+	success := false
+	defer func() {
+		if success {
+			rpcStoreSuccessGauge.Inc(1)
+		} else {
+			rpcStoreFailureGauge.Inc(1)
+		}
+		rpcStoreDurationHistogram.Update(time.Since(startTime).Nanoseconds())
+	}()
 	if err != nil {
 		return nil, err
 	}
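The three new RPCs map one-to-one onto batchBuilder operations: das_startChunkedStore → assign, das_sendChunk → add, das_commitChunkedStore → close. An illustrative in-package test of that lifecycle (a hypothetical test, not part of this diff; it assumes the batchBuilder semantics shown above):

```go
package das

import (
	"bytes"
	"testing"
)

// Sketch: reserve a 2-chunk batch, add both chunks (out-of-order would also
// work, since chunks are indexed), then close to get the reassembled message.
func TestBatchBuilderLifecycleSketch(t *testing.T) {
	b := newBatchBuilder()
	id, err := b.assign(2 /*nChunks*/, 0 /*timeout*/, 4 /*chunkSize*/, 8 /*totalSize*/)
	if err != nil {
		t.Fatal(err)
	}
	if err := b.add(id, 0, []byte("abcd")); err != nil {
		t.Fatal(err)
	}
	if err := b.add(id, 1, []byte("efgh")); err != nil {
		t.Fatal(err)
	}
	msg, _, _, err := b.close(id) // fails if any chunk is missing or sizes mismatch
	if err != nil {
		t.Fatal(err)
	}
	if !bytes.Equal(msg, []byte("abcdefgh")) {
		t.Fatal("unexpected reassembled message")
	}
}
```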
diff --git a/das/das_test.go b/das/das_test.go
index 4377dc4dce..950b63d9d9 100644
--- a/das/das_test.go
+++ b/das/das_test.go
@@ -47,9 +47,7 @@ func testDASStoreRetrieveMultipleInstances(t *testing.T, storageType string) {
 		ParentChainNodeURL: "none",
 	}
 
-	var syncFromStorageServicesFirst []*IterableStorageService
-	var syncToStorageServicesFirst []StorageService
-	storageService, lifecycleManager, err := CreatePersistentStorageService(firstCtx, &config, &syncFromStorageServicesFirst, &syncToStorageServicesFirst)
+	storageService, lifecycleManager, err := CreatePersistentStorageService(firstCtx, &config)
 	Require(t, err)
 	defer lifecycleManager.StopAndWaitUntil(time.Second)
 	daWriter, err := NewSignAfterStoreDASWriter(firstCtx, config, storageService)
@@ -77,9 +75,7 @@ func testDASStoreRetrieveMultipleInstances(t *testing.T, storageType string) {
 	secondCtx, secondCancel := context.WithCancel(context.Background())
 	defer secondCancel()
 
-	var syncFromStorageServicesSecond []*IterableStorageService
-	var syncToStorageServicesSecond []StorageService
-	storageService2, lifecycleManager, err := CreatePersistentStorageService(secondCtx, &config, &syncFromStorageServicesSecond, &syncToStorageServicesSecond)
+	storageService2, lifecycleManager, err := CreatePersistentStorageService(secondCtx, &config)
 	Require(t, err)
 	defer lifecycleManager.StopAndWaitUntil(time.Second)
 	var daReader2 DataAvailabilityServiceReader = storageService2
@@ -140,9 +136,7 @@ func testDASMissingMessage(t *testing.T, storageType string) {
 		ParentChainNodeURL: "none",
 	}
 
-	var syncFromStorageServices []*IterableStorageService
-	var syncToStorageServices []StorageService
-	storageService, lifecycleManager, err := CreatePersistentStorageService(ctx, &config, &syncFromStorageServices, &syncToStorageServices)
+	storageService, lifecycleManager, err := CreatePersistentStorageService(ctx, &config)
 	Require(t, err)
 	defer lifecycleManager.StopAndWaitUntil(time.Second)
 	daWriter, err := NewSignAfterStoreDASWriter(ctx, config, storageService)
diff --git a/das/db_storage_service.go b/das/db_storage_service.go
index 5596ff378e..0fbe1c2723 100644
--- a/das/db_storage_service.go
+++ b/das/db_storage_service.go
@@ -20,11 +20,9 @@ import (
 )
 
 type LocalDBStorageConfig struct {
-	Enable                 bool   `koanf:"enable"`
-	DataDir                string `koanf:"data-dir"`
-	DiscardAfterTimeout    bool   `koanf:"discard-after-timeout"`
-	SyncFromStorageService bool   `koanf:"sync-from-storage-service"`
-	SyncToStorageService   bool   `koanf:"sync-to-storage-service"`
+	Enable              bool   `koanf:"enable"`
+	DataDir             string `koanf:"data-dir"`
+	DiscardAfterTimeout bool   `koanf:"discard-after-timeout"`
 
 	// BadgerDB options
 	NumMemtables int `koanf:"num-memtables"`
@@ -38,11 +36,9 @@ type LocalDBStorageConfig struct {
 var badgerDefaultOptions = badger.DefaultOptions("")
 
 var DefaultLocalDBStorageConfig = LocalDBStorageConfig{
-	Enable:                 false,
-	DataDir:                "",
-	DiscardAfterTimeout:    false,
-	SyncFromStorageService: false,
-	SyncToStorageService:   false,
+	Enable:              false,
+	DataDir:             "",
+	DiscardAfterTimeout: false,
 
 	NumMemtables:       badgerDefaultOptions.NumMemtables,
 	NumLevelZeroTables: badgerDefaultOptions.NumLevelZeroTables,
@@ -56,8 +52,6 @@ func LocalDBStorageConfigAddOptions(prefix string, f *flag.FlagSet) {
 	f.Bool(prefix+".enable", DefaultLocalDBStorageConfig.Enable, "enable storage/retrieval of sequencer batch data from a database on the local filesystem")
 	f.String(prefix+".data-dir", DefaultLocalDBStorageConfig.DataDir, "directory in which to store the database")
 	f.Bool(prefix+".discard-after-timeout", DefaultLocalDBStorageConfig.DiscardAfterTimeout, "discard data after its expiry timeout")
-	f.Bool(prefix+".sync-from-storage-service", DefaultLocalDBStorageConfig.SyncFromStorageService, "enable db storage to be used as a source for regular sync storage")
-	f.Bool(prefix+".sync-to-storage-service", DefaultLocalDBStorageConfig.SyncToStorageService, "enable db storage to be used as a sink for regular sync storage")
 
 	f.Int(prefix+".num-memtables", DefaultLocalDBStorageConfig.NumMemtables, "BadgerDB option: sets the maximum number of tables to keep in memory before stalling")
 	f.Int(prefix+".num-level-zero-tables", DefaultLocalDBStorageConfig.NumLevelZeroTables, "BadgerDB option: sets the maximum number of Level 0 tables before compaction starts")
@@ -158,13 +152,6 @@ func (dbs *DBStorageService) Put(ctx context.Context, data []byte, timeout uint6
 	})
 }
 
-func (dbs *DBStorageService) putKeyValue(ctx context.Context, key common.Hash, value []byte) error {
-	return dbs.db.Update(func(txn *badger.Txn) error {
-		e := badger.NewEntry(key.Bytes(), value)
-		return txn.SetEntry(e)
-	})
-}
-
 func (dbs *DBStorageService) Sync(ctx context.Context) error {
 	return dbs.db.Sync()
 }
diff --git a/das/extra_signature_checker_test.go b/das/extra_signature_checker_test.go
index 2fcfac167d..9fdf86cdf5 100644
--- a/das/extra_signature_checker_test.go
+++ b/das/extra_signature_checker_test.go
@@ -65,10 +65,12 @@ func TestExtraSignatureCheck(t *testing.T) {
 	signer := signature.DataSignerFromPrivateKey(privateKey)
 
 	var da DataAvailabilityServiceWriter = &StubSignatureCheckDAS{keyDir}
-	da, err = NewStoreSigningDAS(da, signer)
-	Require(t, err)
 
-	_, err = da.Store(context.Background(), []byte("Hello world"), 1234, []byte{})
+	msg := []byte("Hello world")
+	timeout := uint64(1234)
+	sig, err := applyDasSigner(signer, msg, timeout)
+	Require(t, err)
+
+	_, err = da.Store(context.Background(), msg, timeout, sig)
 	Require(t, err)
 }
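As the updated test shows, requests are now authenticated by signing the payload plus the call's uint64 arguments through applyDasSigner, with the server's SignatureVerifier.verify checking the same tuple. A hypothetical in-package test sketching the das_sendChunk signing tuple — the field order (chunk, batchId, chunkId) is assumed from the client code above:

```go
package das

import (
	"testing"

	"github.com/ethereum/go-ethereum/crypto"
	"github.com/offchainlabs/nitro/util/signature"
)

// Sketch: the client signs (chunk, batchId, chunkId) for das_sendChunk; the
// server must verify the exact same tuple, so field order matters on both sides.
func TestChunkRequestSigningSketch(t *testing.T) {
	privateKey, err := crypto.GenerateKey()
	if err != nil {
		t.Fatal(err)
	}
	signer := signature.DataSignerFromPrivateKey(privateKey)

	chunk := []byte("chunk payload")
	batchId, chunkId := uint64(7), uint64(0)
	if _, err := applyDasSigner(signer, chunk, batchId, chunkId); err != nil {
		t.Fatal(err)
	}
}
```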
diff --git a/das/factory.go b/das/factory.go
index a459d1a464..d9eacd0ada 100644
--- a/das/factory.go
+++ b/das/factory.go
@@ -22,8 +22,6 @@ import (
 func CreatePersistentStorageService(
 	ctx context.Context,
 	config *DataAvailabilityConfig,
-	syncFromStorageServices *[]*IterableStorageService,
-	syncToStorageServices *[]StorageService,
 ) (StorageService, *LifecycleManager, error) {
 	storageServices := make([]StorageService, 0, 10)
 	var lifecycleManager LifecycleManager
@@ -32,14 +30,6 @@ func CreatePersistentStorageService(
 		if err != nil {
 			return nil, nil, err
 		}
-		if config.LocalDBStorage.SyncFromStorageService {
-			iterableStorageService := NewIterableStorageService(ConvertStorageServiceToIterationCompatibleStorageService(s))
-			*syncFromStorageServices = append(*syncFromStorageServices, iterableStorageService)
-			s = iterableStorageService
-		}
-		if config.LocalDBStorage.SyncToStorageService {
-			*syncToStorageServices = append(*syncToStorageServices, s)
-		}
 		lifecycleManager.Register(s)
 		storageServices = append(storageServices, s)
 	}
@@ -49,14 +39,6 @@ func CreatePersistentStorageService(
 		if err != nil {
 			return nil, nil, err
 		}
-		if config.LocalFileStorage.SyncFromStorageService {
-			iterableStorageService := NewIterableStorageService(ConvertStorageServiceToIterationCompatibleStorageService(s))
-			*syncFromStorageServices = append(*syncFromStorageServices, iterableStorageService)
-			s = iterableStorageService
-		}
-		if config.LocalFileStorage.SyncToStorageService {
-			*syncToStorageServices = append(*syncToStorageServices, s)
-		}
 		lifecycleManager.Register(s)
 		storageServices = append(storageServices, s)
 	}
@@ -67,23 +49,6 @@ func CreatePersistentStorageService(
 			return nil, nil, err
 		}
 		lifecycleManager.Register(s)
-		if config.S3Storage.SyncFromStorageService {
-			iterableStorageService := NewIterableStorageService(ConvertStorageServiceToIterationCompatibleStorageService(s))
-			*syncFromStorageServices = append(*syncFromStorageServices, iterableStorageService)
-			s = iterableStorageService
-		}
-		if config.S3Storage.SyncToStorageService {
-			*syncToStorageServices = append(*syncToStorageServices, s)
-		}
-		storageServices = append(storageServices, s)
-	}
-
-	if config.IpfsStorage.Enable {
-		s, err := NewIpfsStorageService(ctx, config.IpfsStorage)
-		if err != nil {
-			return nil, nil, err
-		}
-		lifecycleManager.Register(s)
 		storageServices = append(storageServices, s)
 	}
 
@@ -105,8 +70,6 @@ func WrapStorageWithCache(
 	ctx context.Context,
 	config *DataAvailabilityConfig,
 	storageService StorageService,
-	syncFromStorageServices *[]*IterableStorageService,
-	syncToStorageServices *[]StorageService,
 	lifecycleManager *LifecycleManager) (StorageService, error) {
 	if storageService == nil {
 		return nil, nil
@@ -120,14 +83,6 @@ func WrapStorageWithCache(
 		if err != nil {
 			return nil, err
 		}
-		if config.RedisCache.SyncFromStorageService {
-			iterableStorageService := NewIterableStorageService(ConvertStorageServiceToIterationCompatibleStorageService(storageService))
-			*syncFromStorageServices = append(*syncFromStorageServices, iterableStorageService)
-			storageService = iterableStorageService
-		}
-		if config.RedisCache.SyncToStorageService {
-			*syncToStorageServices = append(*syncToStorageServices, storageService)
-		}
 	}
 	if config.LocalCache.Enable {
 		storageService = NewCacheStorageService(config.LocalCache, storageService)
@@ -151,24 +106,13 @@ func CreateBatchPosterDAS(
 	if !config.RPCAggregator.Enable || !config.RestAggregator.Enable {
 		return nil, nil, nil, errors.New("--node.data-availability.rpc-aggregator.enable and rest-aggregator.enable must be set when running a Batch Poster in AnyTrust mode")
 	}
-
-	if config.IpfsStorage.Enable {
-		return nil, nil, nil, errors.New("--node.data-availability.ipfs-storage.enable may not be set when running a Nitro AnyTrust node in Batch Poster mode")
-	}
 	// Done checking config requirements
 
 	var daWriter DataAvailabilityServiceWriter
-	daWriter, err := NewRPCAggregator(ctx, *config)
+	daWriter, err := NewRPCAggregator(ctx, *config, dataSigner)
 	if err != nil {
 		return nil, nil, nil, err
 	}
-	if dataSigner != nil {
-		// In some tests the batch poster does not sign Store requests
-		daWriter, err = NewStoreSigningDAS(daWriter, dataSigner)
-		if err != nil {
-			return nil, nil, nil, err
-		}
-	}
 
 	restAgg, err := NewRestfulClientAggregator(ctx, &config.RestAggregator)
 	if err != nil {
@@ -191,30 +135,27 @@ func CreateDAComponentsForDaserver(
 	config *DataAvailabilityConfig,
 	l1Reader *headerreader.HeaderReader,
 	seqInboxAddress *common.Address,
-) (DataAvailabilityServiceReader, DataAvailabilityServiceWriter, DataAvailabilityServiceHealthChecker, *LifecycleManager, error) {
+) (DataAvailabilityServiceReader, DataAvailabilityServiceWriter, *SignatureVerifier, DataAvailabilityServiceHealthChecker, *LifecycleManager, error) {
 	if !config.Enable {
-		return nil, nil, nil, nil, nil
+		return nil, nil, nil, nil, nil, nil
 	}
 
 	// Check config requirements
 	if !config.LocalDBStorage.Enable &&
 		!config.LocalFileStorage.Enable &&
-		!config.S3Storage.Enable &&
-		!config.IpfsStorage.Enable {
-		return nil, nil, nil, nil, errors.New("At least one of --data-availability.(local-db-storage|local-file-storage|s3-storage|ipfs-storage) must be enabled.")
+		!config.S3Storage.Enable {
+		return nil, nil, nil, nil, nil, errors.New("At least one of --data-availability.(local-db-storage|local-file-storage|s3-storage) must be enabled.")
 	}
 	// Done checking config requirements
 
-	var syncFromStorageServices []*IterableStorageService
-	var syncToStorageServices []StorageService
-	storageService, dasLifecycleManager, err := CreatePersistentStorageService(ctx, config, &syncFromStorageServices, &syncToStorageServices)
+	storageService, dasLifecycleManager, err := CreatePersistentStorageService(ctx, config)
 	if err != nil {
-		return nil, nil, nil, nil, err
+		return nil, nil, nil, nil, nil, err
 	}
-	storageService, err = WrapStorageWithCache(ctx, config, storageService, &syncFromStorageServices, &syncToStorageServices, dasLifecycleManager)
+	storageService, err = WrapStorageWithCache(ctx, config, storageService, dasLifecycleManager)
 	if err != nil {
-		return nil, nil, nil, nil, err
+		return nil, nil, nil, nil, nil, err
 	}
 
 	// The REST aggregator is used as the fallback if requested data is not present
@@ -222,7 +163,7 @@ func CreateDAComponentsForDaserver(
 	if config.RestAggregator.Enable {
 		restAgg, err := NewRestfulClientAggregator(ctx, &config.RestAggregator)
 		if err != nil {
-			return nil, nil, nil, nil, err
+			return nil, nil, nil, nil, nil, err
 		}
 		restAgg.Start(ctx)
 		dasLifecycleManager.Register(restAgg)
@@ -237,7 +178,7 @@ func CreateDAComponentsForDaserver(
 
 		if syncConf.Eager {
 			if l1Reader == nil || seqInboxAddress == nil {
-				return nil, nil, nil, nil, errors.New("l1-node-url and sequencer-inbox-address must be specified along with sync-to-storage.eager")
+				return nil, nil, nil, nil, nil, errors.New("l1-node-url and sequencer-inbox-address must be specified along with sync-to-storage.eager")
 			}
 			storageService, err = NewSyncingFallbackStorageService(
 				ctx,
@@ -249,7 +190,7 @@ func CreateDAComponentsForDaserver(
 				syncConf)
 			dasLifecycleManager.Register(storageService)
 			if err != nil {
-				return nil, nil, nil, nil, err
+				return nil, nil, nil, nil, nil, err
 			}
 		} else {
 			storageService = NewFallbackStorageService(storageService, restAgg, restAgg,
@@ -262,13 +203,14 @@ func CreateDAComponentsForDaserver(
 	var daWriter DataAvailabilityServiceWriter
 	var daReader DataAvailabilityServiceReader = storageService
 	var daHealthChecker DataAvailabilityServiceHealthChecker = storageService
+	var signatureVerifier *SignatureVerifier
 
 	if config.Key.KeyDir != "" || config.Key.PrivKey != "" {
 		var seqInboxCaller *bridgegen.SequencerInboxCaller
 		if seqInboxAddress != nil {
 			seqInbox, err := bridgegen.NewSequencerInbox(*seqInboxAddress, (*l1Reader).Client())
 			if err != nil {
-				return nil, nil, nil, nil, err
+				return nil, nil, nil, nil, nil, err
 			}
 
 			seqInboxCaller = &seqInbox.SequencerInboxCaller
@@ -277,35 +219,28 @@ func CreateDAComponentsForDaserver(
 			seqInboxCaller = nil
 		}
 
-		privKey, err := config.Key.BLSPrivKey()
+		daWriter, err = NewSignAfterStoreDASWriter(ctx, *config, storageService)
 		if err != nil {
-			return nil, nil, nil, nil, err
+			return nil, nil, nil, nil, nil, err
 		}
 
-		daWriter, err = NewSignAfterStoreDASWriterWithSeqInboxCaller(
-			privKey,
+		signatureVerifier, err = NewSignatureVerifierWithSeqInboxCaller(
 			seqInboxCaller,
-			storageService,
 			config.ExtraSignatureCheckingPublicKey,
 		)
 		if err != nil {
-			return nil, nil, nil, nil, err
+			return nil, nil, nil, nil, nil, err
 		}
 	}
 
-	if config.RegularSyncStorage.Enable && len(syncFromStorageServices) != 0 && len(syncToStorageServices) != 0 {
-		regularlySyncStorage := NewRegularlySyncStorage(syncFromStorageServices, syncToStorageServices, config.RegularSyncStorage)
-		regularlySyncStorage.Start(ctx)
-	}
-
	if seqInboxAddress != nil {
 		daReader, err = NewChainFetchReader(daReader, (*l1Reader).Client(), *seqInboxAddress)
 		if err != nil {
-			return nil, nil, nil, nil, err
+			return nil, nil, nil, nil, nil, err
 		}
 	}
 
-	return daReader, daWriter, daHealthChecker, dasLifecycleManager, nil
+	return daReader, daWriter, signatureVerifier, daHealthChecker, dasLifecycleManager, nil
 }
 
 func CreateDAReaderForNode(
@@ -323,48 +258,22 @@ func CreateDAReaderForNode(
 		return nil, nil, errors.New("node.data-availability.rpc-aggregator is only for Batch Poster mode")
 	}
 
-	if !config.RestAggregator.Enable && !config.IpfsStorage.Enable {
-		return nil, nil, fmt.Errorf("--node.data-availability.enable was set but neither of --node.data-availability.(rest-aggregator|ipfs-storage) were enabled. When running a Nitro Anytrust node in non-Batch Poster mode, some way to get the batch data is required.")
-	}
-
-	if config.RestAggregator.SyncToStorage.Eager {
-		return nil, nil, errors.New("--node.data-availability.rest-aggregator.sync-to-storage.eager can't be used with a Nitro node, only lazy syncing can be used.")
+	if !config.RestAggregator.Enable {
+		return nil, nil, fmt.Errorf("--node.data-availability.enable was set but not --node.data-availability.rest-aggregator. When running a Nitro Anytrust node in non-Batch Poster mode, some way to get the batch data is required.")
 	}
 	// Done checking config requirements
 
-	storageService, dasLifecycleManager, err := CreatePersistentStorageService(ctx, config, nil, nil)
-	if err != nil {
-		return nil, nil, err
-	}
-
+	var lifecycleManager LifecycleManager
 	var daReader DataAvailabilityServiceReader
 	if config.RestAggregator.Enable {
 		var restAgg *SimpleDASReaderAggregator
-		restAgg, err = NewRestfulClientAggregator(ctx, &config.RestAggregator)
+		restAgg, err := NewRestfulClientAggregator(ctx, &config.RestAggregator)
 		if err != nil {
 			return nil, nil, err
 		}
 		restAgg.Start(ctx)
-		dasLifecycleManager.Register(restAgg)
-
-		if storageService != nil {
-			syncConf := &config.RestAggregator.SyncToStorage
-			var retentionPeriodSeconds uint64
-			if uint64(syncConf.RetentionPeriod) == math.MaxUint64 {
-				retentionPeriodSeconds = math.MaxUint64
-			} else {
-				retentionPeriodSeconds = uint64(syncConf.RetentionPeriod.Seconds())
-			}
-
-			// This falls back to REST and updates the local IPFS repo if the data is found.
-			storageService = NewFallbackStorageService(storageService, restAgg, restAgg,
-				retentionPeriodSeconds, syncConf.IgnoreWriteErrors, true)
-			dasLifecycleManager.Register(storageService)
-
-			daReader = storageService
-		} else {
-			daReader = restAgg
-		}
+		lifecycleManager.Register(restAgg)
+		daReader = restAgg
 	}
 
 	if seqInboxAddress != nil {
@@ -378,5 +287,5 @@ func CreateDAReaderForNode(
 		}
 	}
 
-	return daReader, dasLifecycleManager, nil
+	return daReader, &lifecycleManager, nil
 }
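With the factory changes, request authentication is split out of the writer: CreateDAComponentsForDaserver now also returns the *SignatureVerifier that the RPC server applies per call. A sketch of the resulting wiring, mirroring cmd/daserver; the address, port, body limit, and the discarded lifecycle manager are illustrative only (a real caller should keep the lifecycle manager for shutdown):

```go
package main

import (
	"context"
	"net/http"

	"github.com/ethereum/go-ethereum/common"
	"github.com/offchainlabs/nitro/cmd/genericconf"
	"github.com/offchainlabs/nitro/das"
	"github.com/offchainlabs/nitro/util/headerreader"
)

// startDAServer shows the new six-value return in use: the writer only
// signs-after-store, while the SignatureVerifier authenticates each RPC.
func startDAServer(ctx context.Context, config *das.DataAvailabilityConfig,
	l1Reader *headerreader.HeaderReader, seqInboxAddress *common.Address,
	timeouts genericconf.HTTPServerTimeoutConfig) (*http.Server, error) {
	daReader, daWriter, signatureVerifier, daHealthChecker, _, err :=
		das.CreateDAComponentsForDaserver(ctx, config, l1Reader, seqInboxAddress)
	if err != nil {
		return nil, err
	}
	return das.StartDASRPCServer(ctx, "0.0.0.0", 9876, timeouts, 0,
		daReader, daWriter, daHealthChecker, signatureVerifier)
}
```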
diff --git a/das/ipfs_storage_service.bkup_go b/das/ipfs_storage_service.bkup_go
deleted file mode 100644
index 43b06fd4b6..0000000000
--- a/das/ipfs_storage_service.bkup_go
+++ /dev/null
@@ -1,256 +0,0 @@
-// Copyright 2022, Offchain Labs, Inc.
-// For license information, see https://github.com/nitro/blob/master/LICENSE
-
-// IPFS DAS backend.
-// It takes advantage of IPFS' content addressing scheme to be able to directly retrieve
-// the batches from IPFS using their root hash from the L1 sequencer inbox contract.
-
-//go:build ipfs
-// +build ipfs
-
-package das
-
-import (
-	"bytes"
-	"context"
-	"errors"
-	"io"
-	"math/rand"
-	"time"
-
-	"github.com/ethereum/go-ethereum/common"
-	"github.com/ethereum/go-ethereum/log"
-	"github.com/ipfs/go-cid"
-	coreiface "github.com/ipfs/interface-go-ipfs-core"
-	"github.com/ipfs/interface-go-ipfs-core/options"
-	"github.com/ipfs/interface-go-ipfs-core/path"
-	"github.com/multiformats/go-multihash"
-	"github.com/offchainlabs/nitro/arbstate/daprovider"
-	"github.com/offchainlabs/nitro/arbutil"
-	"github.com/offchainlabs/nitro/cmd/ipfshelper"
-	"github.com/offchainlabs/nitro/das/dastree"
-	"github.com/offchainlabs/nitro/util/pretty"
-	flag "github.com/spf13/pflag"
-)
-
-type IpfsStorageServiceConfig struct {
-	Enable      bool          `koanf:"enable"`
-	RepoDir     string        `koanf:"repo-dir"`
-	ReadTimeout time.Duration `koanf:"read-timeout"`
-	Profiles    string        `koanf:"profiles"`
-	Peers       []string      `koanf:"peers"`
-
-	// Pinning options
-	PinAfterGet   bool    `koanf:"pin-after-get"`
-	PinPercentage float64 `koanf:"pin-percentage"`
-}
-
-var DefaultIpfsStorageServiceConfig = IpfsStorageServiceConfig{
-	Enable:      false,
-	RepoDir:     "",
-	ReadTimeout: time.Minute,
-	Profiles:    "",
-	Peers:       []string{},
-
-	PinAfterGet:   true,
-	PinPercentage: 100.0,
-}
-
-func IpfsStorageServiceConfigAddOptions(prefix string, f *flag.FlagSet) {
-	f.Bool(prefix+".enable", DefaultIpfsStorageServiceConfig.Enable, "enable storage/retrieval of sequencer batch data from IPFS")
-	f.String(prefix+".repo-dir", DefaultIpfsStorageServiceConfig.RepoDir, "directory to use to store the local IPFS repo")
-	f.Duration(prefix+".read-timeout", DefaultIpfsStorageServiceConfig.ReadTimeout, "timeout for IPFS reads, since by default it will wait forever. Treat timeout as not found")
-	f.String(prefix+".profiles", DefaultIpfsStorageServiceConfig.Profiles, "comma separated list of IPFS profiles to use, see https://docs.ipfs.tech/how-to/default-profile")
-	f.StringSlice(prefix+".peers", DefaultIpfsStorageServiceConfig.Peers, "list of IPFS peers to connect to, eg /ip4/1.2.3.4/tcp/12345/p2p/abc...xyz")
-	f.Bool(prefix+".pin-after-get", DefaultIpfsStorageServiceConfig.PinAfterGet, "pin sequencer batch data in IPFS")
-	f.Float64(prefix+".pin-percentage", DefaultIpfsStorageServiceConfig.PinPercentage, "percent of sequencer batch data to pin, as a floating point number in the range 0.0 to 100.0")
-}
-
-type IpfsStorageService struct {
-	config     IpfsStorageServiceConfig
-	ipfsHelper *ipfshelper.IpfsHelper
-	ipfsApi    coreiface.CoreAPI
-}
-
-func NewIpfsStorageService(ctx context.Context, config IpfsStorageServiceConfig) (*IpfsStorageService, error) {
-	ipfsHelper, err := ipfshelper.CreateIpfsHelper(ctx, config.RepoDir, false, config.Peers, config.Profiles)
-	if err != nil {
-		return nil, err
-	}
-	addrs, err := ipfsHelper.GetPeerHostAddresses()
-	if err != nil {
-		return nil, err
-	}
-	log.Info("IPFS node started up", "hostAddresses", addrs)
-
-	return &IpfsStorageService{
-		config:     config,
-		ipfsHelper: ipfsHelper,
-		ipfsApi:    ipfsHelper.GetAPI(),
-	}, nil
-}
-
-func hashToCid(hash common.Hash) (cid.Cid, error) {
-	multiEncodedHashBytes, err := multihash.Encode(hash[:], multihash.KECCAK_256)
-	if err != nil {
-		return cid.Cid{}, err
-	}
-
-	_, multiHash, err := multihash.MHFromBytes(multiEncodedHashBytes)
-	if err != nil {
-		return cid.Cid{}, err
-	}
-
-	return cid.NewCidV1(cid.Raw, multiHash), nil
-}
-
-// GetByHash retrieves and reconstructs one batch's data, using IPFS to retrieve the preimages
-// for each chunk of data and the dastree nodes.
-func (s *IpfsStorageService) GetByHash(ctx context.Context, hash common.Hash) ([]byte, error) {
-	log.Trace("das.IpfsStorageService.GetByHash", "hash", pretty.PrettyHash(hash))
-
-	doPin := false // If true, pin every block related to this batch
-	if s.config.PinAfterGet {
-		if s.config.PinPercentage == 100.0 {
-			doPin = true
-		} else if (rand.Float64() * 100.0) <= s.config.PinPercentage {
-			doPin = true
-		}
-
-	}
-
-	oracle := func(h common.Hash) ([]byte, error) {
-		thisCid, err := hashToCid(h)
-		if err != nil {
-			return nil, err
-		}
-
-		ipfsPath := path.IpfsPath(thisCid)
-		log.Trace("Retrieving IPFS path", "path", ipfsPath.String())
-
-		parentCtx := ctx
-		if doPin {
-			// If we want to pin this batch, then detach from the parent context so
-			// we are not canceled before s.config.ReadTimeout.
-			parentCtx = context.Background()
-		}
-
-		timeoutCtx, cancel := context.WithTimeout(parentCtx, s.config.ReadTimeout)
-		defer cancel()
-		rdr, err := s.ipfsApi.Block().Get(timeoutCtx, ipfsPath)
-		if err != nil {
-			if timeoutCtx.Err() != nil {
-				return nil, ErrNotFound
-			}
-			return nil, err
-		}
-
-		data, err := io.ReadAll(rdr)
-		if err != nil {
-			return nil, err
-		}
-
-		if doPin {
-			go func() {
-				pinCtx, pinCancel := context.WithTimeout(context.Background(), s.config.ReadTimeout)
-				defer pinCancel()
-				err := s.ipfsApi.Pin().Add(pinCtx, ipfsPath)
-				// Recursive pinning not needed, each dastree preimage fits in a single
-				// IPFS block.
-				if err != nil {
-					// Pinning is best-effort.
-					log.Warn("Failed to pin in IPFS", "hash", pretty.PrettyHash(hash), "path", ipfsPath.String())
-				} else {
-					log.Trace("Pin in IPFS successful", "hash", pretty.PrettyHash(hash), "path", ipfsPath.String())
-				}
-			}()
-		}
-
-		return data, nil
-	}
-
-	return dastree.Content(hash, oracle)
-}
-
-// Put stores all the preimages required to reconstruct the dastree for single batch,
-// ie the hashed data chunks and dastree nodes.
-// This takes advantage of IPFS supporting keccak256 on raw data blocks for calculating
-// its CIDs, and the fact that the dastree structure uses keccak256 for addressing its
-// nodes, to directly store the dastree structure in IPFS.
-// IPFS default block size is 256KB and dastree max block size is 64KB so each dastree
-// node and data chunk easily fits within an IPFS block.
-func (s *IpfsStorageService) Put(ctx context.Context, data []byte, timeout uint64) error {
-	logPut("das.IpfsStorageService.Put", data, timeout, s)
-
-	var chunks [][]byte
-
-	record := func(_ common.Hash, value []byte, ty arbutil.PreimageType) {
-		chunks = append(chunks, value)
-	}
-
-	_ = dastree.RecordHash(record, data)
-
-	numChunks := len(chunks)
-	resultChan := make(chan error, numChunks)
-	for _, chunk := range chunks {
-		_chunk := chunk
-		go func() {
-			blockStat, err := s.ipfsApi.Block().Put(
-				ctx,
-				bytes.NewReader(_chunk),
-				options.Block.CidCodec("raw"), // Store the data in raw form since the hash in the CID must be the hash
-				// of the preimage for our lookup scheme to work.
-				options.Block.Hash(multihash.KECCAK_256, -1), // Use keccak256 to calculate the hash to put in the block's
-				// CID, since it is the same algo used by dastree.
-				options.Block.Pin(true)) // Keep the data in the local IPFS repo, don't GC it.
-			if err == nil {
-				log.Trace("Wrote IPFS path", "path", blockStat.Path().String())
-			}
-			resultChan <- err
-		}()
-	}
-
-	successfullyWrittenChunks := 0
-	for err := range resultChan {
-		if err != nil {
-			return err
-		}
-		successfullyWrittenChunks++
-		if successfullyWrittenChunks == numChunks {
-			return nil
-		}
-	}
-	panic("unreachable")
-}
-
-func (s *IpfsStorageService) ExpirationPolicy(ctx context.Context) (daprovider.ExpirationPolicy, error) {
-	return daprovider.KeepForever, nil
-}
-
-func (s *IpfsStorageService) Sync(ctx context.Context) error {
-	return nil
-}
-
-func (s *IpfsStorageService) Close(ctx context.Context) error {
-	return s.ipfsHelper.Close()
-}
-
-func (s *IpfsStorageService) String() string {
-	return "IpfsStorageService"
-}
-
-func (s *IpfsStorageService) HealthCheck(ctx context.Context) error {
-	testData := []byte("Test-Data")
-	err := s.Put(ctx, testData, 0)
-	if err != nil {
-		return err
-	}
-	res, err := s.GetByHash(ctx, dastree.Hash(testData))
-	if err != nil {
-		return err
-	}
-	if !bytes.Equal(res, testData) {
-		return errors.New("invalid GetByHash result")
-	}
-	return nil
-}
diff --git a/das/ipfs_storage_service_stub.go b/das/ipfs_storage_service_stub.go
deleted file mode 100644
index 5814f2c7e4..0000000000
--- a/das/ipfs_storage_service_stub.go
+++ /dev/null
@@ -1,68 +0,0 @@
-// Copyright 2022, Offchain Labs, Inc.
-// For license information, see https://github.com/nitro/blob/master/LICENSE
-
-// IPFS DAS backend stub
-// a stub. we don't currently support ipfs
-
-//go:build !ipfs
-// +build !ipfs
-
-package das
-
-import (
-	"context"
-	"errors"
-
-	"github.com/ethereum/go-ethereum/common"
-	"github.com/offchainlabs/nitro/arbstate/daprovider"
-	flag "github.com/spf13/pflag"
-)
-
-var ErrIpfsNotSupported = errors.New("ipfs not supported")
-
-type IpfsStorageServiceConfig struct {
-	Enable bool
-}
-
-var DefaultIpfsStorageServiceConfig = IpfsStorageServiceConfig{
-	Enable: false,
-}
-
-func IpfsStorageServiceConfigAddOptions(prefix string, f *flag.FlagSet) {
-	f.Bool(prefix+".enable", DefaultIpfsStorageServiceConfig.Enable, "legacy option - not supported")
-}
-
-type IpfsStorageService struct {
-}
-
-func NewIpfsStorageService(ctx context.Context, config IpfsStorageServiceConfig) (*IpfsStorageService, error) {
-	return nil, ErrIpfsNotSupported
-}
-
-func (s *IpfsStorageService) GetByHash(ctx context.Context, hash common.Hash) ([]byte, error) {
-	return nil, ErrIpfsNotSupported
-}
-
-func (s *IpfsStorageService) Put(ctx context.Context, data []byte, timeout uint64) error {
-	return ErrIpfsNotSupported
-}
-
-func (s *IpfsStorageService) ExpirationPolicy(ctx context.Context) (daprovider.ExpirationPolicy, error) {
-	return daprovider.KeepForever, ErrIpfsNotSupported
-}
-
-func (s *IpfsStorageService) Sync(ctx context.Context) error {
-	return ErrIpfsNotSupported
-}
-
-func (s *IpfsStorageService) Close(ctx context.Context) error {
-	return ErrIpfsNotSupported
-}
-
-func (s *IpfsStorageService) String() string {
-	return "IpfsStorageService-not supported"
-}
-
-func (s *IpfsStorageService) HealthCheck(ctx context.Context) error {
-	return ErrIpfsNotSupported
-}
diff --git a/das/ipfs_storage_service_test.go b/das/ipfs_storage_service_test.go
deleted file mode 100644
index 6e1a86b234..0000000000
--- a/das/ipfs_storage_service_test.go
+++ /dev/null
@@ -1,60 +0,0 @@
-// Copyright 2022, Offchain Labs, Inc.
-// For license information, see https://github.com/nitro/blob/master/LICENSE
-
-//go:build ipfs
-// +build ipfs
-
-package das
-
-import (
-	"bytes"
-	"context"
-	"math"
-	"math/rand"
-	"testing"
-	"time"
-
-	"github.com/ethereum/go-ethereum/common"
-	"github.com/offchainlabs/nitro/das/dastree"
-)
-
-func runAddAndGetTest(t *testing.T, ctx context.Context, svc *IpfsStorageService, size int) {
-
-	data := make([]byte, size)
-	_, err := rand.Read(data)
-	Require(t, err)
-
-	err = svc.Put(ctx, data, 0)
-	Require(t, err)
-
-	hash := dastree.Hash(data).Bytes()
-	returnedData, err := svc.GetByHash(ctx, common.BytesToHash(hash))
-	Require(t, err)
-	if !bytes.Equal(data, returnedData) {
-		Fail(t, "Returned data didn't match!")
-	}
-
-}
-
-func TestIpfsStorageServiceAddAndGet(t *testing.T) {
-	enableLogging()
-	ctx := context.Background()
-	svc, err := NewIpfsStorageService(ctx,
-		IpfsStorageServiceConfig{
-			Enable:      true,
-			RepoDir:     t.TempDir(),
-			ReadTimeout: time.Minute,
-			Profiles:    "test",
-		})
-	defer svc.Close(ctx)
-	Require(t, err)
-
-	pow2Size := 1 << 16 // 64kB
-	for i := 1; i < 8; i++ {
-		runAddAndGetTest(t, ctx, svc, int(math.Pow10(i)))
-		runAddAndGetTest(t, ctx, svc, pow2Size)
-		runAddAndGetTest(t, ctx, svc, pow2Size-1)
-		runAddAndGetTest(t, ctx, svc, pow2Size+1)
-		pow2Size = pow2Size << 1
-	}
-}
diff --git a/das/iterable_storage_service.go b/das/iterable_storage_service.go
deleted file mode 100644
index a0829f00e4..0000000000
--- a/das/iterable_storage_service.go
+++ /dev/null
@@ -1,147 +0,0 @@
-// Copyright 2022, Offchain Labs, Inc.
-// For license information, see https://github.com/nitro/blob/master/LICENSE
-
-package das
-
-import (
-	"context"
-	"strconv"
-	"sync"
-	"sync/atomic"
-
-	"github.com/ethereum/go-ethereum/common"
-
-	"github.com/offchainlabs/nitro/das/dastree"
-)
-
-const iteratorStorageKeyPrefix = "iterator_key_prefix_"
-const iteratorBegin = "iterator_begin"
-const iteratorEnd = "iterator_end"
-const expirationTimeKeyPrefix = "expiration_time_key_prefix_"
-
-// IterationCompatibleStorageService is a StorageService which is
-// compatible to be used as a backend for IterableStorageService.
-type IterationCompatibleStorageService interface {
-	putKeyValue(ctx context.Context, key common.Hash, value []byte) error
-	StorageService
-}
-
-// IterationCompatibleStorageServiceAdaptor is an adaptor used to covert iteration incompatible StorageService
-// to IterationCompatibleStorageService (basically adds an empty putKeyValue to the StorageService)
-type IterationCompatibleStorageServiceAdaptor struct {
-	StorageService
-}
-
-func (i *IterationCompatibleStorageServiceAdaptor) putKeyValue(ctx context.Context, key common.Hash, value []byte) error {
-	return nil
-}
-
-func ConvertStorageServiceToIterationCompatibleStorageService(storageService StorageService) IterationCompatibleStorageService {
-	service, ok := storageService.(IterationCompatibleStorageService)
-	if ok {
-		return service
-	}
-	return &IterationCompatibleStorageServiceAdaptor{storageService}
-}
-
-// An IterableStorageService is used as a wrapper on top of a storage service,
-// to add the capability of iterating over the stored date in a sequential manner.
-type IterableStorageService struct {
-	// Local copy of iterator end. End can also be accessed by getByHash for iteratorEnd.
-	end atomic.Value // atomic access to common.Hash
-	IterationCompatibleStorageService
-
-	mutex sync.Mutex
-}
-
-func NewIterableStorageService(storageService IterationCompatibleStorageService) *IterableStorageService {
-	i := &IterableStorageService{IterationCompatibleStorageService: storageService}
-	i.end.Store(common.Hash{})
-	return i
-}
-
-func (i *IterableStorageService) Put(ctx context.Context, data []byte, expiration uint64) error {
-	dataHash := dastree.Hash(data)
-
-	// Do not insert data if data is already present.
-	// (This is being done to avoid redundant hash being added to the
-	// linked list ,since it can lead to loops in the linked list.)
-	if _, err := i.IterationCompatibleStorageService.GetByHash(ctx, dataHash); err == nil {
-		return nil
-	}
-
-	if err := i.IterationCompatibleStorageService.Put(ctx, data, expiration); err != nil {
-		return err
-	}
-
-	if err := i.putKeyValue(ctx, dastree.Hash([]byte(expirationTimeKeyPrefix+EncodeStorageServiceKey(dastree.Hash(data)))), []byte(strconv.FormatUint(expiration, 10))); err != nil {
-		return err
-	}
-
-	i.mutex.Lock()
-	defer i.mutex.Unlock()
-
-	endHash := i.End(ctx)
-	if (endHash == common.Hash{}) {
-		// First element being inserted in the chain.
-		if err := i.putKeyValue(ctx, dastree.Hash([]byte(iteratorBegin)), dataHash.Bytes()); err != nil {
-			return err
-		}
-	} else {
-		if err := i.putKeyValue(ctx, dastree.Hash([]byte(iteratorStorageKeyPrefix+EncodeStorageServiceKey(endHash))), dataHash.Bytes()); err != nil {
-			return err
-		}
-	}
-
-	if err := i.putKeyValue(ctx, dastree.Hash([]byte(iteratorEnd)), dataHash.Bytes()); err != nil {
-		return err
-	}
-	i.end.Store(dataHash)
-
-	return nil
-}
-
-func (i *IterableStorageService) GetExpirationTime(ctx context.Context, hash common.Hash) (uint64, error) {
-	value, err := i.IterationCompatibleStorageService.GetByHash(ctx, dastree.Hash([]byte(expirationTimeKeyPrefix+EncodeStorageServiceKey(hash))))
-	if err != nil {
-		return 0, err
-	}
-
-	expirationTime, err := strconv.ParseUint(string(value), 10, 64)
-	if err != nil {
-		return 0, err
-	}
-	return expirationTime, nil
-}
-
-func (i *IterableStorageService) DefaultBegin() common.Hash {
-	return dastree.Hash([]byte(iteratorBegin))
-}
-
-func (i *IterableStorageService) End(ctx context.Context) common.Hash {
-	endHash, ok := i.end.Load().(common.Hash)
-	if !ok {
-		return common.Hash{}
-	}
-	if (endHash != common.Hash{}) {
-		return endHash
-	}
-	value, err := i.GetByHash(ctx, dastree.Hash([]byte(iteratorEnd)))
-	if err != nil {
-		return common.Hash{}
-	}
-	endHash = common.BytesToHash(value)
-	i.end.Store(endHash)
-	return endHash
-}
-
-func (i *IterableStorageService) Next(ctx context.Context, hash common.Hash) common.Hash {
-	if hash != i.DefaultBegin() {
-		hash = dastree.Hash([]byte(iteratorStorageKeyPrefix + EncodeStorageServiceKey(hash)))
-	}
-	value, err := i.GetByHash(ctx, hash)
-	if err != nil {
-		return common.Hash{}
-	}
-	return common.BytesToHash(value)
-}
diff --git a/das/local_file_storage_service.go b/das/local_file_storage_service.go
index 4ebb1d56d9..8be03bcb30 100644
--- a/das/local_file_storage_service.go
+++ b/das/local_file_storage_service.go
@@ -22,10 +22,8 @@ import (
 )
 
 type LocalFileStorageConfig struct {
-	Enable                 bool   `koanf:"enable"`
-	DataDir                string `koanf:"data-dir"`
-	SyncFromStorageService bool   `koanf:"sync-from-storage-service"`
-	SyncToStorageService   bool   `koanf:"sync-to-storage-service"`
+	Enable  bool   `koanf:"enable"`
+	DataDir string `koanf:"data-dir"`
 }
 
 var DefaultLocalFileStorageConfig = LocalFileStorageConfig{
@@ -35,8 +33,6 @@ var DefaultLocalFileStorageConfig = LocalFileStorageConfig{
 func LocalFileStorageConfigAddOptions(prefix string, f *flag.FlagSet) {
 	f.Bool(prefix+".enable", DefaultLocalFileStorageConfig.Enable, "enable storage/retrieval of sequencer batch data from a directory of files, one per batch")
 	f.String(prefix+".data-dir", DefaultLocalFileStorageConfig.DataDir, "local data directory")
-	f.Bool(prefix+".sync-from-storage-service", DefaultLocalFileStorageConfig.SyncFromStorageService, "enable local storage to be used as a source for regular sync storage")
-	f.Bool(prefix+".sync-to-storage-service", DefaultLocalFileStorageConfig.SyncToStorageService, "enable local storage to be used as a sink for regular sync storage")
 }
 
 type LocalFileStorageService struct {
@@ -96,32 +92,6 @@ func (s *LocalFileStorageService) Put(ctx context.Context, data []byte, timeout
 
 }
 
-func (s *LocalFileStorageService) putKeyValue(ctx context.Context, key common.Hash, value []byte) error {
-	fileName := EncodeStorageServiceKey(key)
-	finalPath := s.dataDir + "/" + fileName
-
-	// Use a temp file and rename to achieve atomic writes.
-	f, err := os.CreateTemp(s.dataDir, fileName)
-	if err != nil {
-		return err
-	}
-	err = f.Chmod(0o600)
-	if err != nil {
-		return err
-	}
-	_, err = f.Write(value)
-	if err != nil {
-		return err
-	}
-	err = f.Close()
-	if err != nil {
-		return err
-	}
-
-	return os.Rename(f.Name(), finalPath)
-
-}
-
 func (s *LocalFileStorageService) Sync(ctx context.Context) error {
 	return nil
 }
diff --git a/das/memory_backed_storage_service.go b/das/memory_backed_storage_service.go
index 91f7d9a2f5..c013b501b9 100644
--- a/das/memory_backed_storage_service.go
+++ b/das/memory_backed_storage_service.go
@@ -53,16 +53,6 @@ func (m *MemoryBackedStorageService) Put(ctx context.Context, data []byte, expir
 	return nil
 }
 
-func (m *MemoryBackedStorageService) putKeyValue(ctx context.Context, key common.Hash, value []byte) error {
-	m.rwmutex.Lock()
-	defer m.rwmutex.Unlock()
-	if m.closed {
-		return ErrClosed
-	}
-	m.contents[key] = append([]byte{}, value...)
-	return nil
-}
-
 func (m *MemoryBackedStorageService) Sync(ctx context.Context) error {
 	m.rwmutex.RLock()
 	defer m.rwmutex.RUnlock()
diff --git a/das/redis_storage_service.go b/das/redis_storage_service.go
index dbd85921ed..210d5cb2d4 100644
--- a/das/redis_storage_service.go
+++ b/das/redis_storage_service.go
@@ -24,12 +24,10 @@ import (
 )
 
 type RedisConfig struct {
-	Enable                 bool          `koanf:"enable"`
-	Url                    string        `koanf:"url"`
-	Expiration             time.Duration `koanf:"expiration"`
-	KeyConfig              string        `koanf:"key-config"`
-	SyncFromStorageService bool          `koanf:"sync-from-storage-service"`
-	SyncToStorageService   bool          `koanf:"sync-to-storage-service"`
+	Enable     bool          `koanf:"enable"`
+	Url        string        `koanf:"url"`
+	Expiration time.Duration `koanf:"expiration"`
+	KeyConfig  string        `koanf:"key-config"`
 }
 
 var DefaultRedisConfig = RedisConfig{
@@ -43,8 +41,6 @@ func RedisConfigAddOptions(prefix string, f *flag.FlagSet) {
 	f.String(prefix+".url", DefaultRedisConfig.Url, "Redis url")
 	f.Duration(prefix+".expiration", DefaultRedisConfig.Expiration, "Redis expiration")
 	f.String(prefix+".key-config", DefaultRedisConfig.KeyConfig, "Redis key config")
-	f.Bool(prefix+".sync-from-storage-service", DefaultRedisConfig.SyncFromStorageService, "enable Redis to be used as a source for regular sync storage")
-	f.Bool(prefix+".sync-to-storage-service", DefaultRedisConfig.SyncToStorageService, "enable Redis to be used as a sink for regular sync storage")
 }
 
 type RedisStorageService struct {
@@ -139,17 +135,6 @@ func (rs *RedisStorageService) Put(ctx context.Context, value []byte, timeout ui
 	return err
 }
 
-func (rs *RedisStorageService) putKeyValue(ctx context.Context, key common.Hash, value []byte) error {
-	// Expiration is set to zero here, since we want to keep the index inserted for iterable storage forever.
-	err := rs.client.Set(
-		ctx, string(key.Bytes()), rs.signMessage(value), 0,
-	).Err()
-	if err != nil {
-		log.Error("das.RedisStorageService.putKeyValue", "err", err)
-	}
-	return err
-}
-
 func (rs *RedisStorageService) Sync(ctx context.Context) error {
 	return rs.baseStorageService.Sync(ctx)
 }
diff --git a/das/regular_sync_storage_test.go b/das/regular_sync_storage_test.go
deleted file mode 100644
index 5fed7a90b3..0000000000
--- a/das/regular_sync_storage_test.go
+++ /dev/null
@@ -1,79 +0,0 @@
-// Copyright 2021-2022, Offchain Labs, Inc.
-// For license information, see https://github.com/nitro/blob/master/LICENSE - -package das - -import ( - "bytes" - "context" - "testing" - "time" - - "github.com/ethereum/go-ethereum/common" - - "github.com/offchainlabs/nitro/das/dastree" -) - -func TestRegularSyncStorage(t *testing.T) { - ctx, cancelFunc := context.WithCancel(context.Background()) - defer cancelFunc() - syncFromStorageService := []*IterableStorageService{ - NewIterableStorageService(ConvertStorageServiceToIterationCompatibleStorageService(NewMemoryBackedStorageService(ctx))), - NewIterableStorageService(ConvertStorageServiceToIterationCompatibleStorageService(NewMemoryBackedStorageService(ctx))), - } - syncToStorageService := []StorageService{ - NewMemoryBackedStorageService(ctx), - NewMemoryBackedStorageService(ctx), - } - - regularSyncStorage := NewRegularlySyncStorage( - syncFromStorageService, - syncToStorageService, RegularSyncStorageConfig{ - Enable: true, - SyncInterval: 100 * time.Millisecond, - }) - - val := [][]byte{ - []byte("The first value"), - []byte("The second value"), - []byte("The third value"), - []byte("The forth value"), - } - valKey := []common.Hash{ - dastree.Hash(val[0]), - dastree.Hash(val[1]), - dastree.Hash(val[2]), - dastree.Hash(val[3]), - } - - reqCtx := context.Background() - timeout := uint64(time.Now().Add(time.Hour).Unix()) - for i := 0; i < 2; i++ { - for j := 0; j < 2; j++ { - err := syncFromStorageService[i].Put(reqCtx, val[j], timeout) - Require(t, err) - } - } - - regularSyncStorage.Start(ctx) - time.Sleep(300 * time.Millisecond) - - for i := 0; i < 2; i++ { - for j := 2; j < 4; j++ { - err := syncFromStorageService[i].Put(reqCtx, val[j], timeout) - Require(t, err) - } - } - - time.Sleep(300 * time.Millisecond) - - for i := 0; i < 2; i++ { - for j := 0; j < 4; j++ { - v, err := syncToStorageService[i].GetByHash(reqCtx, valKey[j]) - Require(t, err) - if !bytes.Equal(v, val[j]) { - t.Fatal(v, val[j]) - } - } - } -} diff --git a/das/regularly_sync_storage.go b/das/regularly_sync_storage.go deleted file mode 100644 index c6b8ed5ea1..0000000000 --- a/das/regularly_sync_storage.go +++ /dev/null @@ -1,95 +0,0 @@ -// Copyright 2022, Offchain Labs, Inc. -// For license information, see https://github.com/nitro/blob/master/LICENSE - -package das - -import ( - "context" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" - - "github.com/offchainlabs/nitro/util/stopwaiter" - - flag "github.com/spf13/pflag" -) - -type RegularSyncStorageConfig struct { - Enable bool `koanf:"enable"` - SyncInterval time.Duration `koanf:"sync-interval"` -} - -var DefaultRegularSyncStorageConfig = RegularSyncStorageConfig{ - Enable: false, - SyncInterval: 5 * time.Minute, -} - -func RegularSyncStorageConfigAddOptions(prefix string, f *flag.FlagSet) { - f.Bool(prefix+".enable", DefaultRegularSyncStorageConfig.Enable, "enable regular storage syncing") - f.Duration(prefix+".sync-interval", DefaultRegularSyncStorageConfig.SyncInterval, "interval for running regular storage sync") -} - -// A RegularlySyncStorage is used to sync data from syncFromStorageServices to -// all the syncToStorageServices at regular intervals. -// (Only newly added data since the last sync is copied over.) 
-type RegularlySyncStorage struct { - stopwaiter.StopWaiter - syncFromStorageServices []*IterableStorageService - syncToStorageServices []StorageService - lastSyncedHashOfEachSyncFromStorageService map[*IterableStorageService]common.Hash - syncInterval time.Duration -} - -func NewRegularlySyncStorage(syncFromStorageServices []*IterableStorageService, syncToStorageServices []StorageService, conf RegularSyncStorageConfig) *RegularlySyncStorage { - lastSyncedHashOfEachSyncFromStorageService := make(map[*IterableStorageService]common.Hash) - for _, syncFrom := range syncFromStorageServices { - lastSyncedHashOfEachSyncFromStorageService[syncFrom] = syncFrom.DefaultBegin() - } - return &RegularlySyncStorage{ - syncFromStorageServices: syncFromStorageServices, - syncToStorageServices: syncToStorageServices, - lastSyncedHashOfEachSyncFromStorageService: lastSyncedHashOfEachSyncFromStorageService, - syncInterval: conf.SyncInterval, - } -} - -func (r *RegularlySyncStorage) Start(ctx context.Context) { - // Start thread for regular sync - r.StopWaiter.Start(ctx, r) - r.CallIteratively(r.syncAllStorages) -} - -func (r *RegularlySyncStorage) syncAllStorages(ctx context.Context) time.Duration { - for syncFrom, lastSyncedHash := range r.lastSyncedHashOfEachSyncFromStorageService { - end := syncFrom.End(ctx) - if (end == common.Hash{}) { - continue - } - - syncHash := lastSyncedHash - for syncHash != end { - syncHash = syncFrom.Next(ctx, syncHash) - data, err := syncFrom.GetByHash(ctx, syncHash) - if err != nil { - continue - } - expirationTime, err := syncFrom.GetExpirationTime(ctx, syncHash) - if err != nil { - continue - } - for _, syncTo := range r.syncToStorageServices { - _, err = syncTo.GetByHash(ctx, syncHash) - if err == nil { - continue - } - - if err = syncTo.Put(ctx, data, expirationTime); err != nil { - log.Error("Error while running regular storage sync", "err", err) - } - } - } - r.lastSyncedHashOfEachSyncFromStorageService[syncFrom] = end - } - return r.syncInterval -} diff --git a/das/rpc_aggregator.go b/das/rpc_aggregator.go index 490116a89a..7e363c6179 100644 --- a/das/rpc_aggregator.go +++ b/das/rpc_aggregator.go @@ -16,6 +16,7 @@ import ( "github.com/offchainlabs/nitro/blsSignatures" "github.com/offchainlabs/nitro/solgen/go/bridgegen" "github.com/offchainlabs/nitro/util/metricsutil" + "github.com/offchainlabs/nitro/util/signature" "github.com/ethereum/go-ethereum/common" "github.com/offchainlabs/nitro/arbutil" @@ -27,31 +28,31 @@ type BackendConfig struct { SignerMask uint64 `json:"signermask"` } -func NewRPCAggregator(ctx context.Context, config DataAvailabilityConfig) (*Aggregator, error) { - services, err := ParseServices(config.RPCAggregator) +func NewRPCAggregator(ctx context.Context, config DataAvailabilityConfig, signer signature.DataSignerFunc) (*Aggregator, error) { + services, err := ParseServices(config.RPCAggregator, signer) if err != nil { return nil, err } return NewAggregator(ctx, config, services) } -func NewRPCAggregatorWithL1Info(config DataAvailabilityConfig, l1client arbutil.L1Interface, seqInboxAddress common.Address) (*Aggregator, error) { - services, err := ParseServices(config.RPCAggregator) +func NewRPCAggregatorWithL1Info(config DataAvailabilityConfig, l1client arbutil.L1Interface, seqInboxAddress common.Address, signer signature.DataSignerFunc) (*Aggregator, error) { + services, err := ParseServices(config.RPCAggregator, signer) if err != nil { return nil, err } return NewAggregatorWithL1Info(config, services, l1client, seqInboxAddress) } -func 
NewRPCAggregatorWithSeqInboxCaller(config DataAvailabilityConfig, seqInboxCaller *bridgegen.SequencerInboxCaller) (*Aggregator, error) { - services, err := ParseServices(config.RPCAggregator) +func NewRPCAggregatorWithSeqInboxCaller(config DataAvailabilityConfig, seqInboxCaller *bridgegen.SequencerInboxCaller, signer signature.DataSignerFunc) (*Aggregator, error) { + services, err := ParseServices(config.RPCAggregator, signer) if err != nil { return nil, err } return NewAggregatorWithSeqInboxCaller(config, services, seqInboxCaller) } -func ParseServices(config AggregatorConfig) ([]ServiceDetails, error) { +func ParseServices(config AggregatorConfig, signer signature.DataSignerFunc) ([]ServiceDetails, error) { var cs []BackendConfig err := json.Unmarshal([]byte(config.Backends), &cs) if err != nil { @@ -67,7 +68,7 @@ func ParseServices(config AggregatorConfig) ([]ServiceDetails, error) { } metricName := metricsutil.CanonicalizeMetricName(url.Hostname()) - service, err := NewDASRPCClient(b.URL) + service, err := NewDASRPCClient(b.URL, signer, config.MaxStoreChunkBodySize) if err != nil { return nil, err } diff --git a/das/rpc_test.go b/das/rpc_test.go index 658592cc0b..5f97ef8828 100644 --- a/das/rpc_test.go +++ b/das/rpc_test.go @@ -7,13 +7,17 @@ import ( "bytes" "context" "encoding/base64" + "encoding/hex" "encoding/json" "net" + "sync" "testing" "time" + "github.com/ethereum/go-ethereum/crypto" "github.com/offchainlabs/nitro/blsSignatures" "github.com/offchainlabs/nitro/cmd/genericconf" + "github.com/offchainlabs/nitro/util/signature" "github.com/offchainlabs/nitro/util/testhelpers" ) @@ -24,7 +28,9 @@ func blsPubToBase64(pubkey *blsSignatures.PublicKey) string { return string(encodedPubkey) } -func TestRPC(t *testing.T) { +func testRpcImpl(t *testing.T, size, times int, concurrent bool) { + // enableLogging() + ctx := context.Background() lis, err := net.Listen("tcp", "localhost:0") testhelpers.RequireImpl(t, err) @@ -46,16 +52,21 @@ func TestRPC(t *testing.T) { RequestTimeout: 5 * time.Second, } - var syncFromStorageServices []*IterableStorageService - var syncToStorageServices []StorageService - storageService, lifecycleManager, err := CreatePersistentStorageService(ctx, &config, &syncFromStorageServices, &syncToStorageServices) + storageService, lifecycleManager, err := CreatePersistentStorageService(ctx, &config) testhelpers.RequireImpl(t, err) defer lifecycleManager.StopAndWaitUntil(time.Second) - privKey, err := config.Key.BLSPrivKey() + localDas, err := NewSignAfterStoreDASWriter(ctx, config, storageService) testhelpers.RequireImpl(t, err) - localDas, err := NewSignAfterStoreDASWriterWithSeqInboxCaller(privKey, nil, storageService, "") + + testPrivateKey, err := crypto.GenerateKey() testhelpers.RequireImpl(t, err) - dasServer, err := StartDASRPCServerOnListener(ctx, lis, genericconf.HTTPServerTimeoutConfigDefault, genericconf.HTTPServerBodyLimitDefault, storageService, localDas, storageService) + + signatureVerifier, err := NewSignatureVerifierWithSeqInboxCaller(nil, "0x"+hex.EncodeToString(crypto.FromECDSAPub(&testPrivateKey.PublicKey))) + testhelpers.RequireImpl(t, err) + signer := signature.DataSignerFromPrivateKey(testPrivateKey) + + dasServer, err := StartDASRPCServerOnListener(ctx, lis, genericconf.HTTPServerTimeoutConfigDefault, genericconf.HTTPServerBodyLimitDefault, storageService, localDas, storageService, signatureVerifier) + defer func() { if err := dasServer.Shutdown(ctx); err != nil { panic(err) @@ -72,29 +83,67 @@ func TestRPC(t *testing.T) { 
 	testhelpers.RequireImpl(t, err)
 	aggConf := DataAvailabilityConfig{
 		RPCAggregator: AggregatorConfig{
-			AssumedHonest: 1,
-			Backends:      string(backendsJsonByte),
+			AssumedHonest:         1,
+			Backends:              string(backendsJsonByte),
+			MaxStoreChunkBodySize: (chunkSize * 2) + len(sendChunkJSONBoilerplate),
 		},
-		RequestTimeout: 5 * time.Second,
+		RequestTimeout: time.Minute,
 	}
 
-	rpcAgg, err := NewRPCAggregatorWithSeqInboxCaller(aggConf, nil)
+	rpcAgg, err := NewRPCAggregatorWithSeqInboxCaller(aggConf, nil, signer)
 	testhelpers.RequireImpl(t, err)
 
-	msg := testhelpers.RandomizeSlice(make([]byte, 100))
-	cert, err := rpcAgg.Store(ctx, msg, 0, nil)
-	testhelpers.RequireImpl(t, err)
+	var wg sync.WaitGroup
+	runStore := func() {
+		defer wg.Done()
+		msg := testhelpers.RandomizeSlice(make([]byte, size))
+		cert, err := rpcAgg.Store(ctx, msg, 0, nil)
+		testhelpers.RequireImpl(t, err)
 
-	retrievedMessage, err := storageService.GetByHash(ctx, cert.DataHash)
-	testhelpers.RequireImpl(t, err)
+		retrievedMessage, err := storageService.GetByHash(ctx, cert.DataHash)
+		testhelpers.RequireImpl(t, err)
+
+		if !bytes.Equal(msg, retrievedMessage) {
+			testhelpers.FailImpl(t, "failed to retrieve correct message")
+		}
+
+		retrievedMessage, err = storageService.GetByHash(ctx, cert.DataHash)
+		testhelpers.RequireImpl(t, err)
 
-	if !bytes.Equal(msg, retrievedMessage) {
-		testhelpers.FailImpl(t, "failed to retrieve correct message")
+		if !bytes.Equal(msg, retrievedMessage) {
+			testhelpers.FailImpl(t, "failed to getByHash correct message")
+		}
 	}
 
-	retrievedMessage, err = storageService.GetByHash(ctx, cert.DataHash)
-	testhelpers.RequireImpl(t, err)
+	for i := 0; i < times; i++ {
+		wg.Add(1)
+		if concurrent {
+			go runStore()
+		} else {
+			runStore()
+		}
+	}
+
+	wg.Wait()
+}
+
+const chunkSize = 512 * 1024
 
-	if !bytes.Equal(msg, retrievedMessage) {
-		testhelpers.FailImpl(t, "failed to getByHash correct message")
+func TestRPCStore(t *testing.T) {
+	for _, tc := range []struct {
+		desc             string
+		totalSize, times int
+		concurrent       bool
+		legacyAPIOnly    bool
+	}{
+		{desc: "small store", totalSize: 100, times: 1, concurrent: false},
+		{desc: "chunked store - last chunk full", totalSize: chunkSize * 20, times: 10, concurrent: true},
+		{desc: "chunked store - last chunk not full", totalSize: chunkSize*31 + 123, times: 10, concurrent: true},
+		{desc: "chunked store - overflow cache - sequential", totalSize: chunkSize * 3, times: 15, concurrent: false},
+		{desc: "new client falls back to old api for old server", totalSize: (5*1024*1024)/2 - len(sendChunkJSONBoilerplate) - 100 /* geth counts headers too */, times: 5, concurrent: true, legacyAPIOnly: true},
+	} {
+		t.Run(tc.desc, func(t *testing.T) {
+			legacyDASStoreAPIOnly = tc.legacyAPIOnly
+			testRpcImpl(t, tc.totalSize, tc.times, tc.concurrent)
+		})
 	}
 }
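Note on the size arithmetic in the test cases above: max-store-chunk-body-size caps the whole HTTP POST body, so the payload left for actual chunk data is the cap minus the JSON-RPC envelope around each chunk request (len(sendChunkJSONBoilerplate) in package das), and the legacy-fallback case subtracts a further ~100 bytes because geth's body limit counts headers too. A rough sketch of that accounting follows; this is not the client's exact chunking code, and envelopeOverhead is a stand-in value, not the real constant:

package main

import "fmt"

// envelopeOverhead stands in for len(sendChunkJSONBoilerplate), the JSON-RPC
// wrapper bytes around each chunk request; 100 is an assumption for
// illustration only, the real constant lives in package das.
const envelopeOverhead = 100

// chunksNeeded estimates how many chunk requests a message of msgSize bytes
// needs once the per-request envelope is subtracted from the body-size cap.
func chunksNeeded(msgSize, maxStoreChunkBodySize int) int {
	payload := maxStoreChunkBodySize - envelopeOverhead
	if payload <= 0 {
		return 0 // cap too small to carry any chunk data
	}
	return (msgSize + payload - 1) / payload // ceiling division
}

func main() {
	// With the 512 KiB default cap, a 10 MiB batch needs about 21 requests.
	fmt.Println(chunksNeeded(10*1024*1024, 512*1024))
}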
diff --git a/das/s3_storage_service.go b/das/s3_storage_service.go
index b5150fb8ed..a1de200c52 100644
--- a/das/s3_storage_service.go
+++ b/das/s3_storage_service.go
@@ -34,15 +34,13 @@ type S3Downloader interface {
 }
 
 type S3StorageServiceConfig struct {
-	Enable                 bool   `koanf:"enable"`
-	AccessKey              string `koanf:"access-key"`
-	Bucket                 string `koanf:"bucket"`
-	ObjectPrefix           string `koanf:"object-prefix"`
-	Region                 string `koanf:"region"`
-	SecretKey              string `koanf:"secret-key"`
-	DiscardAfterTimeout    bool   `koanf:"discard-after-timeout"`
-	SyncFromStorageService bool   `koanf:"sync-from-storage-service"`
-	SyncToStorageService   bool   `koanf:"sync-to-storage-service"`
+	Enable              bool   `koanf:"enable"`
+	AccessKey           string `koanf:"access-key"`
+	Bucket              string `koanf:"bucket"`
+	ObjectPrefix        string `koanf:"object-prefix"`
+	Region              string `koanf:"region"`
+	SecretKey           string `koanf:"secret-key"`
+	DiscardAfterTimeout bool   `koanf:"discard-after-timeout"`
 }
 
 var DefaultS3StorageServiceConfig = S3StorageServiceConfig{}
 
@@ -55,8 +53,6 @@ func S3ConfigAddOptions(prefix string, f *flag.FlagSet) {
 	f.String(prefix+".region", DefaultS3StorageServiceConfig.Region, "S3 region")
 	f.String(prefix+".secret-key", DefaultS3StorageServiceConfig.SecretKey, "S3 secret key")
 	f.Bool(prefix+".discard-after-timeout", DefaultS3StorageServiceConfig.DiscardAfterTimeout, "discard data after its expiry timeout")
-	f.Bool(prefix+".sync-from-storage-service", DefaultRedisConfig.SyncFromStorageService, "enable s3 to be used as a source for regular sync storage")
-	f.Bool(prefix+".sync-to-storage-service", DefaultRedisConfig.SyncToStorageService, "enable s3 to be used as a sink for regular sync storage")
 }
 
 type S3StorageService struct {
@@ -125,18 +121,6 @@ func (s3s *S3StorageService) Put(ctx context.Context, value []byte, timeout uint
 	return err
 }
 
-func (s3s *S3StorageService) putKeyValue(ctx context.Context, key common.Hash, value []byte) error {
-	putObjectInput := s3.PutObjectInput{
-		Bucket: aws.String(s3s.bucket),
-		Key:    aws.String(s3s.objectPrefix + EncodeStorageServiceKey(key)),
-		Body:   bytes.NewReader(value)}
-	_, err := s3s.uploader.Upload(ctx, &putObjectInput)
-	if err != nil {
-		log.Error("das.S3StorageService.Store", "err", err)
-	}
-	return err
-}
-
 func (s3s *S3StorageService) Sync(ctx context.Context) error {
 	return nil
 }
diff --git a/das/sign_after_store_das_writer.go b/das/sign_after_store_das_writer.go
index 36c51c022e..ab6ac91cef 100644
--- a/das/sign_after_store_das_writer.go
+++ b/das/sign_after_store_das_writer.go
@@ -6,7 +6,6 @@ package das
 import (
 	"bytes"
 	"context"
-	"encoding/hex"
 	"errors"
 	"fmt"
 	"os"
@@ -15,14 +14,11 @@ import (
 	flag "github.com/spf13/pflag"
 
 	"github.com/ethereum/go-ethereum/common/hexutil"
-	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/log"
 
 	"github.com/offchainlabs/nitro/arbstate/daprovider"
 	"github.com/offchainlabs/nitro/blsSignatures"
 	"github.com/offchainlabs/nitro/das/dastree"
-	"github.com/offchainlabs/nitro/solgen/go/bridgegen"
-	"github.com/offchainlabs/nitro/util/contracts"
 	"github.com/offchainlabs/nitro/util/pretty"
 )
 
@@ -67,22 +63,12 @@ func KeyConfigAddOptions(prefix string, f *flag.FlagSet) {
 //
 // 1) SignAfterStoreDASWriter.Store(...) assembles the returned hash into a
 //    DataAvailabilityCertificate and signs it with its BLS private key.
-//
-// 2) If Sequencer Inbox contract details are provided when a SignAfterStoreDASWriter is
-//    constructed, calls to Store(...) will try to verify the passed-in data's signature
-//    is from the batch poster. If the contract details are not provided, then the
-//    signature is not checked, which is useful for testing.
 type SignAfterStoreDASWriter struct {
 	privKey        blsSignatures.PrivateKey
 	pubKey         *blsSignatures.PublicKey
 	keysetHash     [32]byte
 	keysetBytes    []byte
 	storageService StorageService
-	addrVerifier   *contracts.AddressVerifier
-
-	// Extra batch poster verifier, for local installations to have their
-	// own way of testing Stores.
- extraBpVerifier func(message []byte, timeout uint64, sig []byte) bool } func NewSignAfterStoreDASWriter(ctx context.Context, config DataAvailabilityConfig, storageService StorageService) (*SignAfterStoreDASWriter, error) { @@ -90,34 +76,7 @@ func NewSignAfterStoreDASWriter(ctx context.Context, config DataAvailabilityConf if err != nil { return nil, err } - if config.ParentChainNodeURL == "none" { - return NewSignAfterStoreDASWriterWithSeqInboxCaller(privKey, nil, storageService, config.ExtraSignatureCheckingPublicKey) - } - l1client, err := GetL1Client(ctx, config.ParentChainConnectionAttempts, config.ParentChainNodeURL) - if err != nil { - return nil, err - } - seqInboxAddress, err := OptionalAddressFromString(config.SequencerInboxAddress) - if err != nil { - return nil, err - } - if seqInboxAddress == nil { - return NewSignAfterStoreDASWriterWithSeqInboxCaller(privKey, nil, storageService, config.ExtraSignatureCheckingPublicKey) - } - seqInboxCaller, err := bridgegen.NewSequencerInboxCaller(*seqInboxAddress, l1client) - if err != nil { - return nil, err - } - return NewSignAfterStoreDASWriterWithSeqInboxCaller(privKey, seqInboxCaller, storageService, config.ExtraSignatureCheckingPublicKey) -} - -func NewSignAfterStoreDASWriterWithSeqInboxCaller( - privKey blsSignatures.PrivateKey, - seqInboxCaller *bridgegen.SequencerInboxCaller, - storageService StorageService, - extraSignatureCheckingPublicKey string, -) (*SignAfterStoreDASWriter, error) { publicKey, err := blsSignatures.PublicKeyFromPrivateKey(privKey) if err != nil { return nil, err @@ -136,45 +95,12 @@ func NewSignAfterStoreDASWriterWithSeqInboxCaller( return nil, err } - var addrVerifier *contracts.AddressVerifier - if seqInboxCaller != nil { - addrVerifier = contracts.NewAddressVerifier(seqInboxCaller) - } - - var extraBpVerifier func(message []byte, timeout uint64, sig []byte) bool - if extraSignatureCheckingPublicKey != "" { - var pubkey []byte - if extraSignatureCheckingPublicKey[:2] == "0x" { - pubkey, err = hex.DecodeString(extraSignatureCheckingPublicKey[2:]) - if err != nil { - return nil, err - } - } else { - pubkeyEncoded, err := os.ReadFile(extraSignatureCheckingPublicKey) - if err != nil { - return nil, err - } - pubkey, err = hex.DecodeString(string(pubkeyEncoded)) - if err != nil { - return nil, err - } - } - extraBpVerifier = func(message []byte, timeout uint64, sig []byte) bool { - if len(sig) >= 64 { - return crypto.VerifySignature(pubkey, dasStoreHash(message, timeout), sig[:64]) - } - return false - } - } - return &SignAfterStoreDASWriter{ - privKey: privKey, - pubKey: &publicKey, - keysetHash: ksHash, - keysetBytes: ksBuf.Bytes(), - storageService: storageService, - addrVerifier: addrVerifier, - extraBpVerifier: extraBpVerifier, + privKey: privKey, + pubKey: &publicKey, + keysetHash: ksHash, + keysetBytes: ksBuf.Bytes(), + storageService: storageService, }, nil } @@ -182,25 +108,6 @@ func (d *SignAfterStoreDASWriter) Store( ctx context.Context, message []byte, timeout uint64, sig []byte, ) (c *daprovider.DataAvailabilityCertificate, err error) { log.Trace("das.SignAfterStoreDASWriter.Store", "message", pretty.FirstFewBytes(message), "timeout", time.Unix(int64(timeout), 0), "sig", pretty.FirstFewBytes(sig), "this", d) - var verified bool - if d.extraBpVerifier != nil { - verified = d.extraBpVerifier(message, timeout, sig) - } - - if !verified && d.addrVerifier != nil { - actualSigner, err := DasRecoverSigner(message, timeout, sig) - if err != nil { - return nil, err - } - isBatchPosterOrSequencer, err := 
d.addrVerifier.IsBatchPosterOrSequencer(ctx, actualSigner)
-		if err != nil {
-			return nil, err
-		}
-		if !isBatchPosterOrSequencer {
-			return nil, errors.New("store request not properly signed")
-		}
-	}
-
 	c = &daprovider.DataAvailabilityCertificate{
 		Timeout:  timeout,
 		DataHash: dastree.Hash(message),
diff --git a/das/signature_verifier.go b/das/signature_verifier.go
new file mode 100644
index 0000000000..0aa42bceb6
--- /dev/null
+++ b/das/signature_verifier.go
@@ -0,0 +1,126 @@
+// Copyright 2024, Offchain Labs, Inc.
+// For license information, see https://github.com/nitro/blob/master/LICENSE
+
+package das
+
+import (
+	"context"
+	"encoding/hex"
+	"errors"
+	"fmt"
+	"os"
+
+	"github.com/ethereum/go-ethereum/crypto"
+	"github.com/offchainlabs/nitro/solgen/go/bridgegen"
+	"github.com/offchainlabs/nitro/util/contracts"
+)
+
+// SignatureVerifier.verify will try to verify that the passed-in data's signature
+// is from the batch poster, or from an injectable verification method.
+type SignatureVerifier struct {
+	addrVerifier *contracts.AddressVerifier
+
+	// Extra batch poster verifier, for local installations to have their
+	// own way of testing Stores.
+	extraBpVerifier func(message []byte, sig []byte, extraFields ...uint64) bool
+}
+
+func NewSignatureVerifier(ctx context.Context, config DataAvailabilityConfig) (*SignatureVerifier, error) {
+	if config.ParentChainNodeURL == "none" {
+		return NewSignatureVerifierWithSeqInboxCaller(nil, config.ExtraSignatureCheckingPublicKey)
+	}
+	l1client, err := GetL1Client(ctx, config.ParentChainConnectionAttempts, config.ParentChainNodeURL)
+	if err != nil {
+		return nil, err
+	}
+	seqInboxAddress, err := OptionalAddressFromString(config.SequencerInboxAddress)
+	if err != nil {
+		return nil, err
+	}
+	if seqInboxAddress == nil {
+		return NewSignatureVerifierWithSeqInboxCaller(nil, config.ExtraSignatureCheckingPublicKey)
+	}
+
+	seqInboxCaller, err := bridgegen.NewSequencerInboxCaller(*seqInboxAddress, l1client)
+	if err != nil {
+		return nil, err
+	}
+	return NewSignatureVerifierWithSeqInboxCaller(seqInboxCaller, config.ExtraSignatureCheckingPublicKey)
+
+}
+
+func NewSignatureVerifierWithSeqInboxCaller(
+	seqInboxCaller *bridgegen.SequencerInboxCaller,
+	extraSignatureCheckingPublicKey string,
+) (*SignatureVerifier, error) {
+	var addrVerifier *contracts.AddressVerifier
+	if seqInboxCaller != nil {
+		addrVerifier = contracts.NewAddressVerifier(seqInboxCaller)
+	}
+
+	var extraBpVerifier func(message []byte, sig []byte, extraFields ...uint64) bool
+	if extraSignatureCheckingPublicKey != "" {
+		var pubkey []byte
+		var err error
+		if extraSignatureCheckingPublicKey[:2] == "0x" {
+			pubkey, err = hex.DecodeString(extraSignatureCheckingPublicKey[2:])
+			if err != nil {
+				return nil, err
+			}
+		} else {
+			pubkeyEncoded, err := os.ReadFile(extraSignatureCheckingPublicKey)
+			if err != nil {
+				return nil, err
+			}
+			pubkey, err = hex.DecodeString(string(pubkeyEncoded))
+			if err != nil {
+				return nil, err
+			}
+		}
+		extraBpVerifier = func(message []byte, sig []byte, extraFields ...uint64) bool {
+			if len(sig) >= 64 {
+				return crypto.VerifySignature(pubkey, dasStoreHash(message, extraFields...), sig[:64])
+			}
+			return false
+		}
+	}
+
+	return &SignatureVerifier{
+		addrVerifier:    addrVerifier,
+		extraBpVerifier: extraBpVerifier,
+	}, nil
+
+}
+
+func (v *SignatureVerifier) verify(
+	ctx context.Context, message []byte, sig []byte, extraFields ...uint64) error {
+	if v.extraBpVerifier == nil && v.addrVerifier == nil {
+		return errors.New("no signature verification method configured")
+	}
+
+	var verified bool
+	if v.extraBpVerifier != nil {
+		verified = v.extraBpVerifier(message, sig, extraFields...)
+	}
+
+	if !verified && v.addrVerifier != nil {
+		actualSigner, err := DasRecoverSigner(message, sig, extraFields...)
+		if err != nil {
+			return err
+		}
+		verified, err = v.addrVerifier.IsBatchPosterOrSequencer(ctx, actualSigner)
+		if err != nil {
+			return err
+		}
+	}
+	if !verified {
+		return errors.New("request not properly signed")
+	}
+	return nil
+}
+
+func (v *SignatureVerifier) String() string {
+	hasAddrVerifier := v.addrVerifier != nil
+	hasExtraBpVerifier := v.extraBpVerifier != nil
+	return fmt.Sprintf("SignatureVerifier{hasAddrVerifier:%v,hasExtraBpVerifier:%v}", hasAddrVerifier, hasExtraBpVerifier)
+}
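With this refactor the signed digest is dasStoreHash(message, extraFields...), i.e. the hash of uniquifyingPrefix, the big-endian extraFields, and the message (see the das/store_signing.go changes below), so signer and verifier must agree on the extra fields and their order; the store path passes the timeout as the single extra field. A minimal sketch of the round trip that SignatureVerifier.verify relies on follows; it would have to live in a test file of package das, since applyDasSigner is unexported:

package das

import (
	"fmt"
	"time"

	"github.com/ethereum/go-ethereum/crypto"

	"github.com/offchainlabs/nitro/util/signature"
)

func ExampleDasRecoverSigner() {
	privateKey, err := crypto.GenerateKey()
	if err != nil {
		panic(err)
	}
	signer := signature.DataSignerFromPrivateKey(privateKey)

	message := []byte("some batch data")
	timeout := uint64(time.Now().Add(time.Hour).Unix())

	// Client side: sign over dasStoreHash(message, timeout).
	sig, err := applyDasSigner(signer, message, timeout)
	if err != nil {
		panic(err)
	}

	// Server side: recover the address from the same inputs; verify() then
	// checks it against the sequencer inbox's batch posters.
	recovered, err := DasRecoverSigner(message, sig, timeout)
	if err != nil {
		panic(err)
	}
	fmt.Println(recovered == crypto.PubkeyToAddress(privateKey.PublicKey))
	// Output: true
}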
diff --git a/das/store_signing.go b/das/store_signing.go
index 8ebc1a9805..eac25e48b0 100644
--- a/das/store_signing.go
+++ b/das/store_signing.go
@@ -4,71 +4,35 @@
 package das
 
 import (
-	"context"
 	"encoding/binary"
-	"time"
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/crypto"
-	"github.com/ethereum/go-ethereum/log"
 
-	"github.com/offchainlabs/nitro/arbstate/daprovider"
 	"github.com/offchainlabs/nitro/das/dastree"
-	"github.com/offchainlabs/nitro/util/pretty"
 	"github.com/offchainlabs/nitro/util/signature"
 )
 
 var uniquifyingPrefix = []byte("Arbitrum Nitro DAS API Store:")
 
-func applyDasSigner(signer signature.DataSignerFunc, data []byte, timeout uint64) ([]byte, error) {
-	return signer(dasStoreHash(data, timeout))
+func applyDasSigner(signer signature.DataSignerFunc, data []byte, extraFields ...uint64) ([]byte, error) {
+	return signer(dasStoreHash(data, extraFields...))
 }
 
-func DasRecoverSigner(data []byte, timeout uint64, sig []byte) (common.Address, error) {
-	pk, err := crypto.SigToPub(dasStoreHash(data, timeout), sig)
+func DasRecoverSigner(data []byte, sig []byte, extraFields ...uint64) (common.Address, error) {
+	pk, err := crypto.SigToPub(dasStoreHash(data, extraFields...), sig)
 	if err != nil {
 		return common.Address{}, err
 	}
 	return crypto.PubkeyToAddress(*pk), nil
 }
 
-func dasStoreHash(data []byte, timeout uint64) []byte {
-	var buf8 [8]byte
-	binary.BigEndian.PutUint64(buf8[:], timeout)
-	return dastree.HashBytes(uniquifyingPrefix, buf8[:], data)
-}
-
-type StoreSigningDAS struct {
-	DataAvailabilityServiceWriter
-	signer signature.DataSignerFunc
-	addr   common.Address
-}
+func dasStoreHash(data []byte, extraFields ...uint64) []byte {
+	var buf []byte
 
-func NewStoreSigningDAS(inner DataAvailabilityServiceWriter, signer signature.DataSignerFunc) (DataAvailabilityServiceWriter, error) {
-	sig, err := applyDasSigner(signer, []byte{}, 0)
-	if err != nil {
-		return nil, err
+	for _, field := range extraFields {
+		buf = binary.BigEndian.AppendUint64(buf, field)
 	}
-	addr, err := DasRecoverSigner([]byte{}, 0, sig)
-	if err != nil {
-		return nil, err
-	}
-	return &StoreSigningDAS{inner, signer, addr}, nil
-}
-
-func (s *StoreSigningDAS) Store(ctx context.Context, message []byte, timeout uint64, sig []byte) (*daprovider.DataAvailabilityCertificate, error) {
-	log.Trace("das.StoreSigningDAS.Store(...)", "message", pretty.FirstFewBytes(message), "timeout", time.Unix(int64(timeout), 0), "sig", pretty.FirstFewBytes(sig), "this", s)
-	mySig, err := applyDasSigner(s.signer, message, timeout)
-	if err != nil {
-		return nil, err
-	}
-	return s.DataAvailabilityServiceWriter.Store(ctx, message, timeout, mySig)
-}
-
-func (s *StoreSigningDAS) String() string {
-	return "StoreSigningDAS (" + s.SignerAddress().Hex() + " ," + s.DataAvailabilityServiceWriter.String() + ")"
-}
-func (s *StoreSigningDAS) SignerAddress() common.Address {
-	return s.addr
+	return dastree.HashBytes(uniquifyingPrefix, buf, data)
 }
diff --git a/das/store_signing_test.go b/das/store_signing_test.go
index 33b94f66e1..a50d1c37f4 100644
--- a/das/store_signing_test.go
+++ b/das/store_signing_test.go
@@ -25,7 +25,7 @@ func TestStoreSigning(t *testing.T) {
 	sig, err := applyDasSigner(signer, weirdMessage, timeout)
 	Require(t, err)
 
-	recoveredAddr, err := DasRecoverSigner(weirdMessage, timeout, sig)
+	recoveredAddr, err := DasRecoverSigner(weirdMessage, sig, timeout)
 	Require(t, err)
 
 	if recoveredAddr != addr {
diff --git a/system_tests/common_test.go b/system_tests/common_test.go
index 1e4daf0b86..84890c9984 100644
--- a/system_tests/common_test.go
+++ b/system_tests/common_test.go
@@ -1120,15 +1120,16 @@ func setupConfigWithDAS(
 	var daReader das.DataAvailabilityServiceReader
 	var daWriter das.DataAvailabilityServiceWriter
 	var daHealthChecker das.DataAvailabilityServiceHealthChecker
+	var signatureVerifier *das.SignatureVerifier
 	if dasModeString != "onchain" {
-		daReader, daWriter, daHealthChecker, lifecycleManager, err = das.CreateDAComponentsForDaserver(ctx, dasConfig, nil, nil)
+		daReader, daWriter, signatureVerifier, daHealthChecker, lifecycleManager, err = das.CreateDAComponentsForDaserver(ctx, dasConfig, nil, nil)
 		Require(t, err)
 		rpcLis, err := net.Listen("tcp", "localhost:0")
 		Require(t, err)
 		restLis, err := net.Listen("tcp", "localhost:0")
 		Require(t, err)
-		_, err = das.StartDASRPCServerOnListener(ctx, rpcLis, genericconf.HTTPServerTimeoutConfigDefault, genericconf.HTTPServerBodyLimitDefault, daReader, daWriter, daHealthChecker)
+		_, err = das.StartDASRPCServerOnListener(ctx, rpcLis, genericconf.HTTPServerTimeoutConfigDefault, genericconf.HTTPServerBodyLimitDefault, daReader, daWriter, daHealthChecker, signatureVerifier)
 		Require(t, err)
 		_, err = das.NewRestfulDasServerOnListener(restLis, genericconf.HTTPServerTimeoutConfigDefault, daReader, daHealthChecker)
 		Require(t, err)
diff --git a/system_tests/das_test.go b/system_tests/das_test.go
index d15c5e6ba6..593eaa1bbe 100644
--- a/system_tests/das_test.go
+++ b/system_tests/das_test.go
@@ -61,21 +61,19 @@ func startLocalDASServer(
 		RequestTimeout: 5 * time.Second,
 	}
 
-	var syncFromStorageServices []*das.IterableStorageService
-	var syncToStorageServices []das.StorageService
-	storageService, lifecycleManager, err := das.CreatePersistentStorageService(ctx, &config, &syncFromStorageServices, &syncToStorageServices)
+	storageService, lifecycleManager, err := das.CreatePersistentStorageService(ctx, &config)
 	defer lifecycleManager.StopAndWaitUntil(time.Second)
 	Require(t, err)
 	seqInboxCaller, err := bridgegen.NewSequencerInboxCaller(seqInboxAddress, l1client)
 	Require(t, err)
-	privKey, err := config.Key.BLSPrivKey()
+	daWriter, err := das.NewSignAfterStoreDASWriter(ctx, config, storageService)
 	Require(t, err)
-	daWriter, err := das.NewSignAfterStoreDASWriterWithSeqInboxCaller(privKey, seqInboxCaller, storageService, "")
+	signatureVerifier, err := das.NewSignatureVerifierWithSeqInboxCaller(seqInboxCaller, "")
 	Require(t, err)
 	rpcLis, err := net.Listen("tcp", "localhost:0")
 	Require(t, err)
-	rpcServer, err := das.StartDASRPCServerOnListener(ctx, rpcLis, genericconf.HTTPServerTimeoutConfigDefault, genericconf.HTTPServerBodyLimitDefault, storageService, daWriter, storageService)
+	rpcServer, err := das.StartDASRPCServerOnListener(ctx, rpcLis, genericconf.HTTPServerTimeoutConfigDefault,
genericconf.HTTPServerBodyLimitDefault, storageService, daWriter, storageService, signatureVerifier) Require(t, err) restLis, err := net.Listen("tcp", "localhost:0") Require(t, err) @@ -100,9 +98,10 @@ func aggConfigForBackend(t *testing.T, backendConfig das.BackendConfig) das.Aggr backendsJsonByte, err := json.Marshal([]das.BackendConfig{backendConfig}) Require(t, err) return das.AggregatorConfig{ - Enable: true, - AssumedHonest: 1, - Backends: string(backendsJsonByte), + Enable: true, + AssumedHonest: 1, + Backends: string(backendsJsonByte), + MaxStoreChunkBodySize: 512 * 1024, } } @@ -279,12 +278,12 @@ func TestDASComplexConfigAndRestMirror(t *testing.T) { // L1NodeURL: normally we would have to set this but we are passing in the already constructed client and addresses to the factory } - daReader, daWriter, daHealthChecker, lifecycleManager, err := das.CreateDAComponentsForDaserver(ctx, &serverConfig, l1Reader, &addresses.SequencerInbox) + daReader, daWriter, signatureVerifier, daHealthChecker, lifecycleManager, err := das.CreateDAComponentsForDaserver(ctx, &serverConfig, l1Reader, &addresses.SequencerInbox) Require(t, err) defer lifecycleManager.StopAndWaitUntil(time.Second) rpcLis, err := net.Listen("tcp", "localhost:0") Require(t, err) - _, err = das.StartDASRPCServerOnListener(ctx, rpcLis, genericconf.HTTPServerTimeoutConfigDefault, genericconf.HTTPServerBodyLimitDefault, daReader, daWriter, daHealthChecker) + _, err = das.StartDASRPCServerOnListener(ctx, rpcLis, genericconf.HTTPServerTimeoutConfigDefault, genericconf.HTTPServerBodyLimitDefault, daReader, daWriter, daHealthChecker, signatureVerifier) Require(t, err) restLis, err := net.Listen("tcp", "localhost:0") Require(t, err)
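For completeness, a hedged sketch of how a batch poster wires the refactored pieces together: the data signer now goes directly into the aggregator constructor (replacing the deleted StoreSigningDAS wrapper), and MaxStoreChunkBodySize is configured alongside the backend list. BackendConfig's URL and PubKeyBase64 field names are assumptions here; only SignerMask is shown in this diff.

package main

import (
	"encoding/json"

	"github.com/offchainlabs/nitro/das"
	"github.com/offchainlabs/nitro/solgen/go/bridgegen"
	"github.com/offchainlabs/nitro/util/signature"
)

// newTestAggregator builds an Aggregator against a single hypothetical DAS
// backend, mirroring aggConfigForBackend in system_tests/das_test.go above.
func newTestAggregator(seqInboxCaller *bridgegen.SequencerInboxCaller, signer signature.DataSignerFunc) (*das.Aggregator, error) {
	backend := das.BackendConfig{
		URL:          "http://localhost:9876",       // hypothetical endpoint
		PubKeyBase64: "<base64-encoded BLS pubkey>", // placeholder
		SignerMask:   1,
	}
	backendsJSON, err := json.Marshal([]das.BackendConfig{backend})
	if err != nil {
		return nil, err
	}
	conf := das.DataAvailabilityConfig{
		RPCAggregator: das.AggregatorConfig{
			Enable:                true,
			AssumedHonest:         1,
			Backends:              string(backendsJSON),
			MaxStoreChunkBodySize: 512 * 1024, // the new default
		},
	}
	// The signer is applied per request by the underlying DASRPCClient,
	// which is why ParseServices and the constructors now take it.
	return das.NewRPCAggregatorWithSeqInboxCaller(conf, seqInboxCaller, signer)
}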