Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Das batch store chunking #2341

Merged
merged 20 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
ecb4fe5
Separate signature verification and DAC signing
Tristan-Wilson May 15, 2024
764ff87
Make das signer variadic
Tristan-Wilson May 15, 2024
1b6554f
First cut of server side of chunked store api
Tristan-Wilson May 17, 2024
57f583b
Remove StoreSigningDAS from DAWriter chain
Tristan-Wilson May 23, 2024
96885e0
Add RPC for chunked send of DAS batches
Tristan-Wilson May 25, 2024
a332ca5
Merge remote-tracking branch 'origin/master' into das-batch-store-chu…
Tristan-Wilson May 28, 2024
51b703f
Fix race condition on seenChunks
Tristan-Wilson May 28, 2024
45f9ad7
Fix timeout caused by -race test mode
Tristan-Wilson May 29, 2024
a23b629
Remove cache expiry test and hooks, didn't work
Tristan-Wilson May 29, 2024
aeedf83
Lint fix
Tristan-Wilson May 29, 2024
4fbdc72
Merge branch 'master' into das-batch-store-chunking
amsanghi May 30, 2024
3acf22a
Merge branch 'master' into das-batch-store-chunking
Tristan-Wilson May 30, 2024
6aeaf9e
Remove IPFS das factory logic and stubs
Tristan-Wilson May 30, 2024
fa361c0
Remove IterableStorageService from DAS
Tristan-Wilson May 31, 2024
2aaf914
Merge branch 'master' into das-batch-store-chunking
amsanghi May 31, 2024
6356632
Merge pull request #2361 from OffchainLabs/remove-iterable-storage-se…
joshuacolvin0 Jun 3, 2024
4146eff
Merge branch 'master' into das-batch-store-chunking
Tristan-Wilson Jun 4, 2024
b1edb9b
Merge pull request #2359 from OffchainLabs/remove-ipfs-das
joshuacolvin0 Jun 4, 2024
2cd8b74
Merge branch 'master' into das-batch-store-chunking
Tristan-Wilson Jun 4, 2024
3d8e9da
Merge branch 'master' into das-batch-store-chunking
Tristan-Wilson Jun 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/daserver/daserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func startup() error {
return errors.New("sequencer-inbox-address must be set to a valid L1 URL and contract address, or 'none'")
}

daReader, daWriter, daHealthChecker, dasLifecycleManager, err := das.CreateDAComponentsForDaserver(ctx, &serverConfig.DataAvailability, l1Reader, seqInboxAddress)
daReader, daWriter, signatureVerifier, daHealthChecker, dasLifecycleManager, err := das.CreateDAComponentsForDaserver(ctx, &serverConfig.DataAvailability, l1Reader, seqInboxAddress)
if err != nil {
return err
}
Expand All @@ -253,7 +253,7 @@ func startup() error {
if serverConfig.EnableRPC {
log.Info("Starting HTTP-RPC server", "addr", serverConfig.RPCAddr, "port", serverConfig.RPCPort, "revision", vcsRevision, "vcs.time", vcsTime)

rpcServer, err = das.StartDASRPCServer(ctx, serverConfig.RPCAddr, serverConfig.RPCPort, serverConfig.RPCServerTimeouts, serverConfig.RPCServerBodyLimit, daReader, daWriter, daHealthChecker)
rpcServer, err = das.StartDASRPCServer(ctx, serverConfig.RPCAddr, serverConfig.RPCPort, serverConfig.RPCServerTimeouts, serverConfig.RPCServerBodyLimit, daReader, daWriter, daHealthChecker, signatureVerifier)
if err != nil {
return err
}
Expand Down
33 changes: 13 additions & 20 deletions cmd/datool/datool.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type ClientStoreConfig struct {
SigningKey string `koanf:"signing-key"`
SigningWallet string `koanf:"signing-wallet"`
SigningWalletPassword string `koanf:"signing-wallet-password"`
MaxStoreChunkBodySize int `koanf:"max-store-chunk-body-size"`
}

func parseClientStoreConfig(args []string) (*ClientStoreConfig, error) {
Expand All @@ -102,6 +103,7 @@ func parseClientStoreConfig(args []string) (*ClientStoreConfig, error) {
f.String("signing-wallet", "", "wallet containing ecdsa key to sign the message with")
f.String("signing-wallet-password", genericconf.PASSWORD_NOT_SET, "password to unlock the wallet, if not specified the user is prompted for the password")
f.Duration("das-retention-period", 24*time.Hour, "The period which DASes are requested to retain the stored batches.")
f.Int("max-store-chunk-body-size", 512*1024, "The maximum HTTP POST body size for a chunked store request")

k, err := confighelpers.BeginCommonParse(f, args)
if err != nil {
Expand All @@ -121,12 +123,7 @@ func startClientStore(args []string) error {
return err
}

client, err := das.NewDASRPCClient(config.URL)
if err != nil {
return err
}

var dasClient das.DataAvailabilityServiceWriter = client
var signer signature.DataSignerFunc
if config.SigningKey != "" {
var privateKey *ecdsa.PrivateKey
if config.SigningKey[:2] == "0x" {
Expand All @@ -140,12 +137,7 @@ func startClientStore(args []string) error {
return err
}
}
signer := signature.DataSignerFromPrivateKey(privateKey)

dasClient, err = das.NewStoreSigningDAS(dasClient, signer)
if err != nil {
return err
}
signer = signature.DataSignerFromPrivateKey(privateKey)
} else if config.SigningWallet != "" {
walletConf := &genericconf.WalletConfig{
Pathname: config.SigningWallet,
Expand All @@ -154,16 +146,17 @@ func startClientStore(args []string) error {
Account: "",
OnlyCreateKey: false,
}
_, signer, err := util.OpenWallet("datool", walletConf, nil)
if err != nil {
return err
}
dasClient, err = das.NewStoreSigningDAS(dasClient, signer)
_, signer, err = util.OpenWallet("datool", walletConf, nil)
if err != nil {
return err
}
}

client, err := das.NewDASRPCClient(config.URL, signer, config.MaxStoreChunkBodySize)
if err != nil {
return err
}

ctx := context.Background()
var cert *daprovider.DataAvailabilityCertificate

Expand All @@ -173,9 +166,9 @@ func startClientStore(args []string) error {
if err != nil {
return err
}
cert, err = dasClient.Store(ctx, message, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{})
cert, err = client.Store(ctx, message, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{})
} else if len(config.Message) > 0 {
cert, err = dasClient.Store(ctx, []byte(config.Message), uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{})
cert, err = client.Store(ctx, []byte(config.Message), uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{})
} else {
return errors.New("--message or --random-message-size must be specified")
}
Expand Down Expand Up @@ -361,7 +354,7 @@ func dumpKeyset(args []string) error {
return err
}

services, err := das.ParseServices(config.Keyset)
services, err := das.ParseServices(config.Keyset, nil)
if err != nil {
return err
}
Expand Down
17 changes: 10 additions & 7 deletions das/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,23 @@ import (
)

type AggregatorConfig struct {
Enable bool `koanf:"enable"`
AssumedHonest int `koanf:"assumed-honest"`
Backends string `koanf:"backends"`
Enable bool `koanf:"enable"`
AssumedHonest int `koanf:"assumed-honest"`
Backends string `koanf:"backends"`
MaxStoreChunkBodySize int `koanf:"max-store-chunk-body-size"`
}

var DefaultAggregatorConfig = AggregatorConfig{
AssumedHonest: 0,
Backends: "",
AssumedHonest: 0,
Backends: "",
MaxStoreChunkBodySize: 512 * 1024,
}

func AggregatorConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Bool(prefix+".enable", DefaultAggregatorConfig.Enable, "enable storage/retrieval of sequencer batch data from a list of RPC endpoints; this should only be used by the batch poster and not in combination with other DAS storage types")
f.Bool(prefix+".enable", DefaultAggregatorConfig.Enable, "enable storage of sequencer batch data from a list of RPC endpoints; this should only be used by the batch poster and not in combination with other DAS storage types")
f.Int(prefix+".assumed-honest", DefaultAggregatorConfig.AssumedHonest, "Number of assumed honest backends (H). If there are N backends, K=N+1-H valid responses are required to consider an Store request to be successful.")
f.String(prefix+".backends", DefaultAggregatorConfig.Backends, "JSON RPC backend configuration")
f.Int(prefix+".max-store-chunk-body-size", DefaultAggregatorConfig.MaxStoreChunkBodySize, "maximum HTTP POST body size to use for individual batch chunks, including JSON RPC overhead and an estimated overhead of 512B of headers")
}

type Aggregator struct {
Expand Down Expand Up @@ -165,7 +168,7 @@ type storeResponse struct {
func (a *Aggregator) Store(ctx context.Context, message []byte, timeout uint64, sig []byte) (*daprovider.DataAvailabilityCertificate, error) {
log.Trace("das.Aggregator.Store", "message", pretty.FirstFewBytes(message), "timeout", time.Unix(int64(timeout), 0), "sig", pretty.FirstFewBytes(sig))
if a.addrVerifier != nil {
actualSigner, err := DasRecoverSigner(message, timeout, sig)
actualSigner, err := DasRecoverSigner(message, sig, timeout)
if err != nil {
return nil, err
}
Expand Down
12 changes: 4 additions & 8 deletions das/das.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,9 @@ type DataAvailabilityConfig struct {
LocalCache CacheConfig `koanf:"local-cache"`
RedisCache RedisConfig `koanf:"redis-cache"`

LocalDBStorage LocalDBStorageConfig `koanf:"local-db-storage"`
LocalFileStorage LocalFileStorageConfig `koanf:"local-file-storage"`
S3Storage S3StorageServiceConfig `koanf:"s3-storage"`
IpfsStorage IpfsStorageServiceConfig `koanf:"ipfs-storage"`
RegularSyncStorage RegularSyncStorageConfig `koanf:"regular-sync-storage"`
LocalDBStorage LocalDBStorageConfig `koanf:"local-db-storage"`
LocalFileStorage LocalFileStorageConfig `koanf:"local-file-storage"`
S3Storage S3StorageServiceConfig `koanf:"s3-storage"`

Key KeyConfig `koanf:"key"`

Expand All @@ -65,9 +63,9 @@ var DefaultDataAvailabilityConfig = DataAvailabilityConfig{
RequestTimeout: 5 * time.Second,
Enable: false,
RestAggregator: DefaultRestfulClientAggregatorConfig,
RPCAggregator: DefaultAggregatorConfig,
ParentChainConnectionAttempts: 15,
PanicOnError: false,
IpfsStorage: DefaultIpfsStorageServiceConfig,
}

func OptionalAddressFromString(s string) (*common.Address, error) {
Expand Down Expand Up @@ -114,7 +112,6 @@ func dataAvailabilityConfigAddOptions(prefix string, f *flag.FlagSet, r role) {
LocalDBStorageConfigAddOptions(prefix+".local-db-storage", f)
LocalFileStorageConfigAddOptions(prefix+".local-file-storage", f)
S3ConfigAddOptions(prefix+".s3-storage", f)
RegularSyncStorageConfigAddOptions(prefix+".regular-sync-storage", f)

// Key config for storage
KeyConfigAddOptions(prefix+".key", f)
Expand All @@ -128,7 +125,6 @@ func dataAvailabilityConfigAddOptions(prefix string, f *flag.FlagSet, r role) {
}

// Both the Nitro node and daserver can use these options.
IpfsStorageServiceConfigAddOptions(prefix+".ipfs-storage", f)
RestfulClientAggregatorConfigAddOptions(prefix+".rest-aggregator", f)

f.String(prefix+".parent-chain-node-url", DefaultDataAvailabilityConfig.ParentChainNodeURL, "URL for parent chain node, only used in standalone daserver; when running as part of a node that node's L1 configuration is used")
Expand Down
123 changes: 116 additions & 7 deletions das/dasRpcClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,145 @@ package das
import (
"context"
"fmt"
"strings"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
"golang.org/x/sync/errgroup"

"github.com/ethereum/go-ethereum/rpc"
"github.com/offchainlabs/nitro/arbstate/daprovider"
"github.com/offchainlabs/nitro/blsSignatures"
"github.com/offchainlabs/nitro/util/pretty"
"github.com/offchainlabs/nitro/util/signature"
)

type DASRPCClient struct { // implements DataAvailabilityService
clnt *rpc.Client
url string
clnt *rpc.Client
url string
signer signature.DataSignerFunc
chunkSize uint64
}

func NewDASRPCClient(target string) (*DASRPCClient, error) {
// nilSigner is the default DataSignerFunc installed when the caller
// supplies no signer; it yields an empty (zero-length, non-nil)
// signature for any payload.
func nilSigner(_ []byte) ([]byte, error) {
	emptySig := make([]byte, 0)
	return emptySig, nil
}

const sendChunkJSONBoilerplate = "{\"jsonrpc\":\"2.0\",\"id\":4294967295,\"method\":\"das_sendChunked\",\"params\":[\"\"]}"

func NewDASRPCClient(target string, signer signature.DataSignerFunc, maxStoreChunkBodySize int) (*DASRPCClient, error) {
clnt, err := rpc.Dial(target)
if err != nil {
return nil, err
}
if signer == nil {
signer = nilSigner
}

// Byte arrays are encoded in base64
chunkSize := (maxStoreChunkBodySize - len(sendChunkJSONBoilerplate) - 512 /* headers */) / 2
if chunkSize <= 0 {
return nil, fmt.Errorf("max-store-chunk-body-size %d doesn't leave enough room for chunk payload", maxStoreChunkBodySize)
}

return &DASRPCClient{
clnt: clnt,
url: target,
clnt: clnt,
url: target,
signer: signer,
chunkSize: uint64(chunkSize),
}, nil
}

// Store sends message to the DAS server using the chunked-store RPC
// protocol: das_startChunkedStore to open a batch, concurrent
// das_sendChunk calls for each chunk, then das_commitChunkedStore to
// finalize and obtain the certificate. If the server does not expose
// das_startChunkedStore, it falls back to the legacy single-request
// das_store path. The trailing signature parameter is ignored; each
// request is signed here with c.signer instead.
func (c *DASRPCClient) Store(ctx context.Context, message []byte, timeout uint64, _ []byte) (*daprovider.DataAvailabilityCertificate, error) {
	timestamp := uint64(time.Now().Unix())
	nChunks := uint64(len(message)) / c.chunkSize
	lastChunkSize := uint64(len(message)) % c.chunkSize
	if lastChunkSize > 0 {
		// Partial final chunk: one extra chunk carries the remainder.
		nChunks++
	} else {
		// Length is an exact multiple of chunkSize (or zero), so the
		// final chunk — when there is one — is full-size.
		lastChunkSize = c.chunkSize
	}
	totalSize := uint64(len(message))

	// Sign the start request over the batch parameters (empty payload).
	startReqSig, err := applyDasSigner(c.signer, []byte{}, timestamp, nChunks, c.chunkSize, totalSize, timeout)
	if err != nil {
		return nil, err
	}

	var startChunkedStoreResult StartChunkedStoreResult
	if err := c.clnt.CallContext(ctx, &startChunkedStoreResult, "das_startChunkedStore", hexutil.Uint64(timestamp), hexutil.Uint64(nChunks), hexutil.Uint64(c.chunkSize), hexutil.Uint64(totalSize), hexutil.Uint64(timeout), hexutil.Bytes(startReqSig)); err != nil {
		// NOTE(review): feature detection by matching the error message
		// string — brittle if the RPC layer ever rewords it; confirm the
		// exact text the server emits for unknown methods.
		if strings.Contains(err.Error(), "the method das_startChunkedStore does not exist") {
			return c.legacyStore(ctx, message, timeout)
		}
		return nil, err
	}
	batchId := uint64(startChunkedStoreResult.BatchId)

	// Send all chunks concurrently; g.Wait returns the first error.
	g := new(errgroup.Group)
	for i := uint64(0); i < nChunks; i++ {
		var chunk []byte
		if i == nChunks-1 {
			chunk = message[i*c.chunkSize : i*c.chunkSize+lastChunkSize]
		} else {
			chunk = message[i*c.chunkSize : (i+1)*c.chunkSize]
		}

		// Bind loop variables by value so each goroutine sees its own
		// chunk index and slice (pre-Go-1.22 loop-capture pattern).
		inner := func(_i uint64, _chunk []byte) func() error {
			return func() error { return c.sendChunk(ctx, batchId, _i, _chunk) }
		}
		g.Go(inner(i, chunk))
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}

	// Sign the commit request over the batch id (empty payload).
	finalReqSig, err := applyDasSigner(c.signer, []byte{}, uint64(startChunkedStoreResult.BatchId))
	if err != nil {
		return nil, err
	}

	var storeResult StoreResult
	if err := c.clnt.CallContext(ctx, &storeResult, "das_commitChunkedStore", startChunkedStoreResult.BatchId, hexutil.Bytes(finalReqSig)); err != nil {
		return nil, err
	}

	// Decode the committee member's BLS signature from the response.
	respSig, err := blsSignatures.SignatureFromBytes(storeResult.Sig)
	if err != nil {
		return nil, err
	}

	return &daprovider.DataAvailabilityCertificate{
		DataHash:    common.BytesToHash(storeResult.DataHash),
		Timeout:     uint64(storeResult.Timeout),
		SignersMask: uint64(storeResult.SignersMask),
		Sig:         respSig,
		KeysetHash:  common.BytesToHash(storeResult.KeysetHash),
		Version:     byte(storeResult.Version),
	}, nil
}

func (c *DASRPCClient) Store(ctx context.Context, message []byte, timeout uint64, reqSig []byte) (*daprovider.DataAvailabilityCertificate, error) {
log.Trace("das.DASRPCClient.Store(...)", "message", pretty.FirstFewBytes(message), "timeout", time.Unix(int64(timeout), 0), "sig", pretty.FirstFewBytes(reqSig), "this", *c)
// sendChunk signs one chunk of a chunked-store batch (over the chunk
// bytes, batch id, and chunk index) and transmits it with the
// das_sendChunk RPC, discarding any result payload.
func (c *DASRPCClient) sendChunk(ctx context.Context, batchId, i uint64, chunk []byte) error {
	sig, err := applyDasSigner(c.signer, chunk, batchId, i)
	if err != nil {
		return err
	}
	return c.clnt.CallContext(ctx, nil, "das_sendChunk", hexutil.Uint64(batchId), hexutil.Uint64(i), hexutil.Bytes(chunk), hexutil.Bytes(sig))
}

func (c *DASRPCClient) legacyStore(ctx context.Context, message []byte, timeout uint64) (*daprovider.DataAvailabilityCertificate, error) {
log.Trace("das.DASRPCClient.Store(...)", "message", pretty.FirstFewBytes(message), "timeout", time.Unix(int64(timeout), 0), "this", *c)

reqSig, err := applyDasSigner(c.signer, message, timeout)
if err != nil {
return nil, err
}

var ret StoreResult
if err := c.clnt.CallContext(ctx, &ret, "das_store", hexutil.Bytes(message), hexutil.Uint64(timeout), hexutil.Bytes(reqSig)); err != nil {
return nil, err
Expand Down
Loading
Loading