Skip to content

Commit

Permalink
Merge branch 'master' into check-batch-correctness
Browse files Browse the repository at this point in the history
  • Loading branch information
ganeshvanahalli authored Jun 6, 2024
2 parents 77b831c + 2363b04 commit 8b3505a
Show file tree
Hide file tree
Showing 28 changed files with 671 additions and 1,178 deletions.
4 changes: 2 additions & 2 deletions cmd/daserver/daserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func startup() error {
return errors.New("sequencer-inbox-address must be set to a valid L1 URL and contract address, or 'none'")
}

daReader, daWriter, daHealthChecker, dasLifecycleManager, err := das.CreateDAComponentsForDaserver(ctx, &serverConfig.DataAvailability, l1Reader, seqInboxAddress)
daReader, daWriter, signatureVerifier, daHealthChecker, dasLifecycleManager, err := das.CreateDAComponentsForDaserver(ctx, &serverConfig.DataAvailability, l1Reader, seqInboxAddress)
if err != nil {
return err
}
Expand All @@ -253,7 +253,7 @@ func startup() error {
if serverConfig.EnableRPC {
log.Info("Starting HTTP-RPC server", "addr", serverConfig.RPCAddr, "port", serverConfig.RPCPort, "revision", vcsRevision, "vcs.time", vcsTime)

rpcServer, err = das.StartDASRPCServer(ctx, serverConfig.RPCAddr, serverConfig.RPCPort, serverConfig.RPCServerTimeouts, serverConfig.RPCServerBodyLimit, daReader, daWriter, daHealthChecker)
rpcServer, err = das.StartDASRPCServer(ctx, serverConfig.RPCAddr, serverConfig.RPCPort, serverConfig.RPCServerTimeouts, serverConfig.RPCServerBodyLimit, daReader, daWriter, daHealthChecker, signatureVerifier)
if err != nil {
return err
}
Expand Down
33 changes: 13 additions & 20 deletions cmd/datool/datool.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type ClientStoreConfig struct {
SigningKey string `koanf:"signing-key"`
SigningWallet string `koanf:"signing-wallet"`
SigningWalletPassword string `koanf:"signing-wallet-password"`
MaxStoreChunkBodySize int `koanf:"max-store-chunk-body-size"`
}

func parseClientStoreConfig(args []string) (*ClientStoreConfig, error) {
Expand All @@ -102,6 +103,7 @@ func parseClientStoreConfig(args []string) (*ClientStoreConfig, error) {
f.String("signing-wallet", "", "wallet containing ecdsa key to sign the message with")
f.String("signing-wallet-password", genericconf.PASSWORD_NOT_SET, "password to unlock the wallet, if not specified the user is prompted for the password")
f.Duration("das-retention-period", 24*time.Hour, "The period which DASes are requested to retain the stored batches.")
f.Int("max-store-chunk-body-size", 512*1024, "The maximum HTTP POST body size for a chunked store request")

k, err := confighelpers.BeginCommonParse(f, args)
if err != nil {
Expand All @@ -121,12 +123,7 @@ func startClientStore(args []string) error {
return err
}

client, err := das.NewDASRPCClient(config.URL)
if err != nil {
return err
}

var dasClient das.DataAvailabilityServiceWriter = client
var signer signature.DataSignerFunc
if config.SigningKey != "" {
var privateKey *ecdsa.PrivateKey
if config.SigningKey[:2] == "0x" {
Expand All @@ -140,12 +137,7 @@ func startClientStore(args []string) error {
return err
}
}
signer := signature.DataSignerFromPrivateKey(privateKey)

dasClient, err = das.NewStoreSigningDAS(dasClient, signer)
if err != nil {
return err
}
signer = signature.DataSignerFromPrivateKey(privateKey)
} else if config.SigningWallet != "" {
walletConf := &genericconf.WalletConfig{
Pathname: config.SigningWallet,
Expand All @@ -154,16 +146,17 @@ func startClientStore(args []string) error {
Account: "",
OnlyCreateKey: false,
}
_, signer, err := util.OpenWallet("datool", walletConf, nil)
if err != nil {
return err
}
dasClient, err = das.NewStoreSigningDAS(dasClient, signer)
_, signer, err = util.OpenWallet("datool", walletConf, nil)
if err != nil {
return err
}
}

client, err := das.NewDASRPCClient(config.URL, signer, config.MaxStoreChunkBodySize)
if err != nil {
return err
}

ctx := context.Background()
var cert *daprovider.DataAvailabilityCertificate

Expand All @@ -173,9 +166,9 @@ func startClientStore(args []string) error {
if err != nil {
return err
}
cert, err = dasClient.Store(ctx, message, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{})
cert, err = client.Store(ctx, message, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{})
} else if len(config.Message) > 0 {
cert, err = dasClient.Store(ctx, []byte(config.Message), uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{})
cert, err = client.Store(ctx, []byte(config.Message), uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{})
} else {
return errors.New("--message or --random-message-size must be specified")
}
Expand Down Expand Up @@ -361,7 +354,7 @@ func dumpKeyset(args []string) error {
return err
}

services, err := das.ParseServices(config.Keyset)
services, err := das.ParseServices(config.Keyset, nil)
if err != nil {
return err
}
Expand Down
17 changes: 10 additions & 7 deletions das/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,23 @@ import (
)

type AggregatorConfig struct {
Enable bool `koanf:"enable"`
AssumedHonest int `koanf:"assumed-honest"`
Backends string `koanf:"backends"`
Enable bool `koanf:"enable"`
AssumedHonest int `koanf:"assumed-honest"`
Backends string `koanf:"backends"`
MaxStoreChunkBodySize int `koanf:"max-store-chunk-body-size"`
}

var DefaultAggregatorConfig = AggregatorConfig{
AssumedHonest: 0,
Backends: "",
AssumedHonest: 0,
Backends: "",
MaxStoreChunkBodySize: 512 * 1024,
}

func AggregatorConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Bool(prefix+".enable", DefaultAggregatorConfig.Enable, "enable storage/retrieval of sequencer batch data from a list of RPC endpoints; this should only be used by the batch poster and not in combination with other DAS storage types")
f.Bool(prefix+".enable", DefaultAggregatorConfig.Enable, "enable storage of sequencer batch data from a list of RPC endpoints; this should only be used by the batch poster and not in combination with other DAS storage types")
f.Int(prefix+".assumed-honest", DefaultAggregatorConfig.AssumedHonest, "Number of assumed honest backends (H). If there are N backends, K=N+1-H valid responses are required to consider an Store request to be successful.")
f.String(prefix+".backends", DefaultAggregatorConfig.Backends, "JSON RPC backend configuration")
f.Int(prefix+".max-store-chunk-body-size", DefaultAggregatorConfig.MaxStoreChunkBodySize, "maximum HTTP POST body size to use for individual batch chunks, including JSON RPC overhead and an estimated overhead of 512B of headers")
}

type Aggregator struct {
Expand Down Expand Up @@ -165,7 +168,7 @@ type storeResponse struct {
func (a *Aggregator) Store(ctx context.Context, message []byte, timeout uint64, sig []byte) (*daprovider.DataAvailabilityCertificate, error) {
log.Trace("das.Aggregator.Store", "message", pretty.FirstFewBytes(message), "timeout", time.Unix(int64(timeout), 0), "sig", pretty.FirstFewBytes(sig))
if a.addrVerifier != nil {
actualSigner, err := DasRecoverSigner(message, timeout, sig)
actualSigner, err := DasRecoverSigner(message, sig, timeout)
if err != nil {
return nil, err
}
Expand Down
12 changes: 4 additions & 8 deletions das/das.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,9 @@ type DataAvailabilityConfig struct {
LocalCache CacheConfig `koanf:"local-cache"`
RedisCache RedisConfig `koanf:"redis-cache"`

LocalDBStorage LocalDBStorageConfig `koanf:"local-db-storage"`
LocalFileStorage LocalFileStorageConfig `koanf:"local-file-storage"`
S3Storage S3StorageServiceConfig `koanf:"s3-storage"`
IpfsStorage IpfsStorageServiceConfig `koanf:"ipfs-storage"`
RegularSyncStorage RegularSyncStorageConfig `koanf:"regular-sync-storage"`
LocalDBStorage LocalDBStorageConfig `koanf:"local-db-storage"`
LocalFileStorage LocalFileStorageConfig `koanf:"local-file-storage"`
S3Storage S3StorageServiceConfig `koanf:"s3-storage"`

Key KeyConfig `koanf:"key"`

Expand All @@ -65,9 +63,9 @@ var DefaultDataAvailabilityConfig = DataAvailabilityConfig{
RequestTimeout: 5 * time.Second,
Enable: false,
RestAggregator: DefaultRestfulClientAggregatorConfig,
RPCAggregator: DefaultAggregatorConfig,
ParentChainConnectionAttempts: 15,
PanicOnError: false,
IpfsStorage: DefaultIpfsStorageServiceConfig,
}

func OptionalAddressFromString(s string) (*common.Address, error) {
Expand Down Expand Up @@ -114,7 +112,6 @@ func dataAvailabilityConfigAddOptions(prefix string, f *flag.FlagSet, r role) {
LocalDBStorageConfigAddOptions(prefix+".local-db-storage", f)
LocalFileStorageConfigAddOptions(prefix+".local-file-storage", f)
S3ConfigAddOptions(prefix+".s3-storage", f)
RegularSyncStorageConfigAddOptions(prefix+".regular-sync-storage", f)

// Key config for storage
KeyConfigAddOptions(prefix+".key", f)
Expand All @@ -128,7 +125,6 @@ func dataAvailabilityConfigAddOptions(prefix string, f *flag.FlagSet, r role) {
}

// Both the Nitro node and daserver can use these options.
IpfsStorageServiceConfigAddOptions(prefix+".ipfs-storage", f)
RestfulClientAggregatorConfigAddOptions(prefix+".rest-aggregator", f)

f.String(prefix+".parent-chain-node-url", DefaultDataAvailabilityConfig.ParentChainNodeURL, "URL for parent chain node, only used in standalone daserver; when running as part of a node that node's L1 configuration is used")
Expand Down
123 changes: 116 additions & 7 deletions das/dasRpcClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,145 @@ package das
import (
"context"
"fmt"
"strings"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
"golang.org/x/sync/errgroup"

"github.com/ethereum/go-ethereum/rpc"
"github.com/offchainlabs/nitro/arbstate/daprovider"
"github.com/offchainlabs/nitro/blsSignatures"
"github.com/offchainlabs/nitro/util/pretty"
"github.com/offchainlabs/nitro/util/signature"
)

type DASRPCClient struct { // implements DataAvailabilityService
clnt *rpc.Client
url string
clnt *rpc.Client
url string
signer signature.DataSignerFunc
chunkSize uint64
}

func NewDASRPCClient(target string) (*DASRPCClient, error) {
func nilSigner(_ []byte) ([]byte, error) {
return []byte{}, nil
}

const sendChunkJSONBoilerplate = "{\"jsonrpc\":\"2.0\",\"id\":4294967295,\"method\":\"das_sendChunked\",\"params\":[\"\"]}"

func NewDASRPCClient(target string, signer signature.DataSignerFunc, maxStoreChunkBodySize int) (*DASRPCClient, error) {
clnt, err := rpc.Dial(target)
if err != nil {
return nil, err
}
if signer == nil {
signer = nilSigner
}

// Byte arrays are encoded in base64
chunkSize := (maxStoreChunkBodySize - len(sendChunkJSONBoilerplate) - 512 /* headers */) / 2
if chunkSize <= 0 {
return nil, fmt.Errorf("max-store-chunk-body-size %d doesn't leave enough room for chunk payload", maxStoreChunkBodySize)
}

return &DASRPCClient{
clnt: clnt,
url: target,
clnt: clnt,
url: target,
signer: signer,
chunkSize: uint64(chunkSize),
}, nil
}

func (c *DASRPCClient) Store(ctx context.Context, message []byte, timeout uint64, _ []byte) (*daprovider.DataAvailabilityCertificate, error) {
timestamp := uint64(time.Now().Unix())
nChunks := uint64(len(message)) / c.chunkSize
lastChunkSize := uint64(len(message)) % c.chunkSize
if lastChunkSize > 0 {
nChunks++
} else {
lastChunkSize = c.chunkSize
}
totalSize := uint64(len(message))

startReqSig, err := applyDasSigner(c.signer, []byte{}, timestamp, nChunks, c.chunkSize, totalSize, timeout)
if err != nil {
return nil, err
}

var startChunkedStoreResult StartChunkedStoreResult
if err := c.clnt.CallContext(ctx, &startChunkedStoreResult, "das_startChunkedStore", hexutil.Uint64(timestamp), hexutil.Uint64(nChunks), hexutil.Uint64(c.chunkSize), hexutil.Uint64(totalSize), hexutil.Uint64(timeout), hexutil.Bytes(startReqSig)); err != nil {
if strings.Contains(err.Error(), "the method das_startChunkedStore does not exist") {
return c.legacyStore(ctx, message, timeout)
}
return nil, err
}
batchId := uint64(startChunkedStoreResult.BatchId)

g := new(errgroup.Group)
for i := uint64(0); i < nChunks; i++ {
var chunk []byte
if i == nChunks-1 {
chunk = message[i*c.chunkSize : i*c.chunkSize+lastChunkSize]
} else {
chunk = message[i*c.chunkSize : (i+1)*c.chunkSize]
}

inner := func(_i uint64, _chunk []byte) func() error {
return func() error { return c.sendChunk(ctx, batchId, _i, _chunk) }
}
g.Go(inner(i, chunk))
}
if err := g.Wait(); err != nil {
return nil, err
}

finalReqSig, err := applyDasSigner(c.signer, []byte{}, uint64(startChunkedStoreResult.BatchId))
if err != nil {
return nil, err
}

var storeResult StoreResult
if err := c.clnt.CallContext(ctx, &storeResult, "das_commitChunkedStore", startChunkedStoreResult.BatchId, hexutil.Bytes(finalReqSig)); err != nil {
return nil, err
}

respSig, err := blsSignatures.SignatureFromBytes(storeResult.Sig)
if err != nil {
return nil, err
}

return &daprovider.DataAvailabilityCertificate{
DataHash: common.BytesToHash(storeResult.DataHash),
Timeout: uint64(storeResult.Timeout),
SignersMask: uint64(storeResult.SignersMask),
Sig: respSig,
KeysetHash: common.BytesToHash(storeResult.KeysetHash),
Version: byte(storeResult.Version),
}, nil
}

func (c *DASRPCClient) Store(ctx context.Context, message []byte, timeout uint64, reqSig []byte) (*daprovider.DataAvailabilityCertificate, error) {
log.Trace("das.DASRPCClient.Store(...)", "message", pretty.FirstFewBytes(message), "timeout", time.Unix(int64(timeout), 0), "sig", pretty.FirstFewBytes(reqSig), "this", *c)
func (c *DASRPCClient) sendChunk(ctx context.Context, batchId, i uint64, chunk []byte) error {
chunkReqSig, err := applyDasSigner(c.signer, chunk, batchId, i)
if err != nil {
return err
}

if err := c.clnt.CallContext(ctx, nil, "das_sendChunk", hexutil.Uint64(batchId), hexutil.Uint64(i), hexutil.Bytes(chunk), hexutil.Bytes(chunkReqSig)); err != nil {
return err
}
return nil
}

func (c *DASRPCClient) legacyStore(ctx context.Context, message []byte, timeout uint64) (*daprovider.DataAvailabilityCertificate, error) {
log.Trace("das.DASRPCClient.Store(...)", "message", pretty.FirstFewBytes(message), "timeout", time.Unix(int64(timeout), 0), "this", *c)

reqSig, err := applyDasSigner(c.signer, message, timeout)
if err != nil {
return nil, err
}

var ret StoreResult
if err := c.clnt.CallContext(ctx, &ret, "das_store", hexutil.Bytes(message), hexutil.Uint64(timeout), hexutil.Bytes(reqSig)); err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 8b3505a

Please sign in to comment.