From ecb4fe56311ef4e55d4166c114eb116d24ff620b Mon Sep 17 00:00:00 2001 From: Tristan Wilson Date: Wed, 15 May 2024 11:43:54 -0700 Subject: [PATCH 01/11] Separate signature verification and DAC signing These are separate concerns, but were mixed together in one class. --- das/factory.go | 7 +- das/rpc_test.go | 4 +- das/sign_after_store_das_writer.go | 103 ++-------------------- das/signature_verifier.go | 133 +++++++++++++++++++++++++++++ system_tests/das_test.go | 4 +- 5 files changed, 144 insertions(+), 107 deletions(-) create mode 100644 das/signature_verifier.go diff --git a/das/factory.go b/das/factory.go index a459d1a464..1fed2ed6f0 100644 --- a/das/factory.go +++ b/das/factory.go @@ -277,15 +277,14 @@ func CreateDAComponentsForDaserver( seqInboxCaller = nil } - privKey, err := config.Key.BLSPrivKey() + daWriter, err = NewSignAfterStoreDASWriter(ctx, *config, storageService) if err != nil { return nil, nil, nil, nil, err } - daWriter, err = NewSignAfterStoreDASWriterWithSeqInboxCaller( - privKey, + daWriter, err = NewSignatureVerifierWithSeqInboxCaller( seqInboxCaller, - storageService, + daWriter, config.ExtraSignatureCheckingPublicKey, ) if err != nil { diff --git a/das/rpc_test.go b/das/rpc_test.go index 044ba597be..c07590d1e3 100644 --- a/das/rpc_test.go +++ b/das/rpc_test.go @@ -51,9 +51,7 @@ func TestRPC(t *testing.T) { storageService, lifecycleManager, err := CreatePersistentStorageService(ctx, &config, &syncFromStorageServices, &syncToStorageServices) testhelpers.RequireImpl(t, err) defer lifecycleManager.StopAndWaitUntil(time.Second) - privKey, err := config.Key.BLSPrivKey() - testhelpers.RequireImpl(t, err) - localDas, err := NewSignAfterStoreDASWriterWithSeqInboxCaller(privKey, nil, storageService, "") + localDas, err := NewSignAfterStoreDASWriter(ctx, config, storageService) testhelpers.RequireImpl(t, err) dasServer, err := StartDASRPCServerOnListener(ctx, lis, genericconf.HTTPServerTimeoutConfigDefault, storageService, localDas, storageService) defer func() { diff --git a/das/sign_after_store_das_writer.go b/das/sign_after_store_das_writer.go index 36c51c022e..ab6ac91cef 100644 --- a/das/sign_after_store_das_writer.go +++ b/das/sign_after_store_das_writer.go @@ -6,7 +6,6 @@ package das import ( "bytes" "context" - "encoding/hex" "errors" "fmt" "os" @@ -15,14 +14,11 @@ import ( flag "github.com/spf13/pflag" "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/offchainlabs/nitro/arbstate/daprovider" "github.com/offchainlabs/nitro/blsSignatures" "github.com/offchainlabs/nitro/das/dastree" - "github.com/offchainlabs/nitro/solgen/go/bridgegen" - "github.com/offchainlabs/nitro/util/contracts" "github.com/offchainlabs/nitro/util/pretty" ) @@ -67,22 +63,12 @@ func KeyConfigAddOptions(prefix string, f *flag.FlagSet) { // // 1) SignAfterStoreDASWriter.Store(...) assembles the returned hash into a // DataAvailabilityCertificate and signs it with its BLS private key. -// -// 2) If Sequencer Inbox contract details are provided when a SignAfterStoreDASWriter is -// constructed, calls to Store(...) will try to verify the passed-in data's signature -// is from the batch poster. If the contract details are not provided, then the -// signature is not checked, which is useful for testing. 
type SignAfterStoreDASWriter struct { privKey blsSignatures.PrivateKey pubKey *blsSignatures.PublicKey keysetHash [32]byte keysetBytes []byte storageService StorageService - addrVerifier *contracts.AddressVerifier - - // Extra batch poster verifier, for local installations to have their - // own way of testing Stores. - extraBpVerifier func(message []byte, timeout uint64, sig []byte) bool } func NewSignAfterStoreDASWriter(ctx context.Context, config DataAvailabilityConfig, storageService StorageService) (*SignAfterStoreDASWriter, error) { @@ -90,34 +76,7 @@ func NewSignAfterStoreDASWriter(ctx context.Context, config DataAvailabilityConf if err != nil { return nil, err } - if config.ParentChainNodeURL == "none" { - return NewSignAfterStoreDASWriterWithSeqInboxCaller(privKey, nil, storageService, config.ExtraSignatureCheckingPublicKey) - } - l1client, err := GetL1Client(ctx, config.ParentChainConnectionAttempts, config.ParentChainNodeURL) - if err != nil { - return nil, err - } - seqInboxAddress, err := OptionalAddressFromString(config.SequencerInboxAddress) - if err != nil { - return nil, err - } - if seqInboxAddress == nil { - return NewSignAfterStoreDASWriterWithSeqInboxCaller(privKey, nil, storageService, config.ExtraSignatureCheckingPublicKey) - } - seqInboxCaller, err := bridgegen.NewSequencerInboxCaller(*seqInboxAddress, l1client) - if err != nil { - return nil, err - } - return NewSignAfterStoreDASWriterWithSeqInboxCaller(privKey, seqInboxCaller, storageService, config.ExtraSignatureCheckingPublicKey) -} - -func NewSignAfterStoreDASWriterWithSeqInboxCaller( - privKey blsSignatures.PrivateKey, - seqInboxCaller *bridgegen.SequencerInboxCaller, - storageService StorageService, - extraSignatureCheckingPublicKey string, -) (*SignAfterStoreDASWriter, error) { publicKey, err := blsSignatures.PublicKeyFromPrivateKey(privKey) if err != nil { return nil, err @@ -136,45 +95,12 @@ func NewSignAfterStoreDASWriterWithSeqInboxCaller( return nil, err } - var addrVerifier *contracts.AddressVerifier - if seqInboxCaller != nil { - addrVerifier = contracts.NewAddressVerifier(seqInboxCaller) - } - - var extraBpVerifier func(message []byte, timeout uint64, sig []byte) bool - if extraSignatureCheckingPublicKey != "" { - var pubkey []byte - if extraSignatureCheckingPublicKey[:2] == "0x" { - pubkey, err = hex.DecodeString(extraSignatureCheckingPublicKey[2:]) - if err != nil { - return nil, err - } - } else { - pubkeyEncoded, err := os.ReadFile(extraSignatureCheckingPublicKey) - if err != nil { - return nil, err - } - pubkey, err = hex.DecodeString(string(pubkeyEncoded)) - if err != nil { - return nil, err - } - } - extraBpVerifier = func(message []byte, timeout uint64, sig []byte) bool { - if len(sig) >= 64 { - return crypto.VerifySignature(pubkey, dasStoreHash(message, timeout), sig[:64]) - } - return false - } - } - return &SignAfterStoreDASWriter{ - privKey: privKey, - pubKey: &publicKey, - keysetHash: ksHash, - keysetBytes: ksBuf.Bytes(), - storageService: storageService, - addrVerifier: addrVerifier, - extraBpVerifier: extraBpVerifier, + privKey: privKey, + pubKey: &publicKey, + keysetHash: ksHash, + keysetBytes: ksBuf.Bytes(), + storageService: storageService, }, nil } @@ -182,25 +108,6 @@ func (d *SignAfterStoreDASWriter) Store( ctx context.Context, message []byte, timeout uint64, sig []byte, ) (c *daprovider.DataAvailabilityCertificate, err error) { log.Trace("das.SignAfterStoreDASWriter.Store", "message", pretty.FirstFewBytes(message), "timeout", time.Unix(int64(timeout), 0), "sig", 
pretty.FirstFewBytes(sig), "this", d) - var verified bool - if d.extraBpVerifier != nil { - verified = d.extraBpVerifier(message, timeout, sig) - } - - if !verified && d.addrVerifier != nil { - actualSigner, err := DasRecoverSigner(message, timeout, sig) - if err != nil { - return nil, err - } - isBatchPosterOrSequencer, err := d.addrVerifier.IsBatchPosterOrSequencer(ctx, actualSigner) - if err != nil { - return nil, err - } - if !isBatchPosterOrSequencer { - return nil, errors.New("store request not properly signed") - } - } - c = &daprovider.DataAvailabilityCertificate{ Timeout: timeout, DataHash: dastree.Hash(message), diff --git a/das/signature_verifier.go b/das/signature_verifier.go new file mode 100644 index 0000000000..0f210b3c6b --- /dev/null +++ b/das/signature_verifier.go @@ -0,0 +1,133 @@ +// Copyright 2024, Offchain Labs, Inc. +// For license information, see https://github.com/nitro/blob/master/LICENSE + +package das + +import ( + "context" + "encoding/hex" + "errors" + "fmt" + "os" + "time" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" + "github.com/offchainlabs/nitro/arbstate/daprovider" + "github.com/offchainlabs/nitro/solgen/go/bridgegen" + "github.com/offchainlabs/nitro/util/contracts" + "github.com/offchainlabs/nitro/util/pretty" +) + +// SignatureVerifier.Store will try to verify that the passed-in data's signature +// is from the batch poster, or from an injectable verification method. +type SignatureVerifier struct { + inner DataAvailabilityServiceWriter + + addrVerifier *contracts.AddressVerifier + + // Extra batch poster verifier, for local installations to have their + // own way of testing Stores. + extraBpVerifier func(message []byte, timeout uint64, sig []byte) bool +} + +func NewSignatureVerifier(ctx context.Context, config DataAvailabilityConfig, inner DataAvailabilityServiceWriter) (*SignatureVerifier, error) { + if config.ParentChainNodeURL == "none" { + return NewSignatureVerifierWithSeqInboxCaller(nil, inner, config.ExtraSignatureCheckingPublicKey) + } + l1client, err := GetL1Client(ctx, config.ParentChainConnectionAttempts, config.ParentChainNodeURL) + if err != nil { + return nil, err + } + seqInboxAddress, err := OptionalAddressFromString(config.SequencerInboxAddress) + if err != nil { + return nil, err + } + if seqInboxAddress == nil { + return NewSignatureVerifierWithSeqInboxCaller(nil, inner, config.ExtraSignatureCheckingPublicKey) + } + + seqInboxCaller, err := bridgegen.NewSequencerInboxCaller(*seqInboxAddress, l1client) + if err != nil { + return nil, err + } + return NewSignatureVerifierWithSeqInboxCaller(seqInboxCaller, inner, config.ExtraSignatureCheckingPublicKey) + +} + +func NewSignatureVerifierWithSeqInboxCaller( + seqInboxCaller *bridgegen.SequencerInboxCaller, + inner DataAvailabilityServiceWriter, + extraSignatureCheckingPublicKey string, +) (*SignatureVerifier, error) { + var addrVerifier *contracts.AddressVerifier + if seqInboxCaller != nil { + addrVerifier = contracts.NewAddressVerifier(seqInboxCaller) + } + + var extraBpVerifier func(message []byte, timeout uint64, sig []byte) bool + if extraSignatureCheckingPublicKey != "" { + var pubkey []byte + var err error + if extraSignatureCheckingPublicKey[:2] == "0x" { + pubkey, err = hex.DecodeString(extraSignatureCheckingPublicKey[2:]) + if err != nil { + return nil, err + } + } else { + pubkeyEncoded, err := os.ReadFile(extraSignatureCheckingPublicKey) + if err != nil { + return nil, err + } + pubkey, err = hex.DecodeString(string(pubkeyEncoded)) + 
if err != nil { + return nil, err + } + } + extraBpVerifier = func(message []byte, timeout uint64, sig []byte) bool { + if len(sig) >= 64 { + return crypto.VerifySignature(pubkey, dasStoreHash(message, timeout), sig[:64]) + } + return false + } + } + + return &SignatureVerifier{ + inner: inner, + addrVerifier: addrVerifier, + extraBpVerifier: extraBpVerifier, + }, nil + +} + +func (v *SignatureVerifier) Store( + ctx context.Context, message []byte, timeout uint64, sig []byte, +) (c *daprovider.DataAvailabilityCertificate, err error) { + log.Trace("das.SignatureVerifier.Store", "message", pretty.FirstFewBytes(message), "timeout", time.Unix(int64(timeout), 0), "sig", pretty.FirstFewBytes(sig), "this", v) + var verified bool + if v.extraBpVerifier != nil { + verified = v.extraBpVerifier(message, timeout, sig) + } + + if !verified && v.addrVerifier != nil { + actualSigner, err := DasRecoverSigner(message, timeout, sig) + if err != nil { + return nil, err + } + isBatchPosterOrSequencer, err := v.addrVerifier.IsBatchPosterOrSequencer(ctx, actualSigner) + if err != nil { + return nil, err + } + if !isBatchPosterOrSequencer { + return nil, errors.New("store request not properly signed") + } + } + + return v.inner.Store(ctx, message, timeout, sig) +} + +func (v *SignatureVerifier) String() string { + hasAddrVerifier := v.addrVerifier != nil + hasExtraBpVerifier := v.extraBpVerifier != nil + return fmt.Sprintf("SignatureVerifier{hasAddrVerifier:%v,hasExtraBpVerifier:%v}", hasAddrVerifier, hasExtraBpVerifier) +} diff --git a/system_tests/das_test.go b/system_tests/das_test.go index bb09cc9880..c69bd47828 100644 --- a/system_tests/das_test.go +++ b/system_tests/das_test.go @@ -68,9 +68,9 @@ func startLocalDASServer( Require(t, err) seqInboxCaller, err := bridgegen.NewSequencerInboxCaller(seqInboxAddress, l1client) Require(t, err) - privKey, err := config.Key.BLSPrivKey() + innerDaWriter, err := das.NewSignAfterStoreDASWriter(ctx, config, storageService) Require(t, err) - daWriter, err := das.NewSignAfterStoreDASWriterWithSeqInboxCaller(privKey, seqInboxCaller, storageService, "") + daWriter, err := das.NewSignatureVerifierWithSeqInboxCaller(seqInboxCaller, innerDaWriter, "") Require(t, err) rpcLis, err := net.Listen("tcp", "localhost:0") Require(t, err) From 764ff87e076339970720d5c0d682d71e5c7ebce7 Mon Sep 17 00:00:00 2001 From: Tristan Wilson Date: Wed, 15 May 2024 14:30:03 -0700 Subject: [PATCH 02/11] Make das signer variadic --- das/aggregator.go | 2 +- das/dasRpcServer.go | 24 ++++++++++++++++++++++++ das/signature_verifier.go | 12 ++++++------ das/store_signing.go | 22 +++++++++++++--------- das/store_signing_test.go | 2 +- 5 files changed, 45 insertions(+), 17 deletions(-) diff --git a/das/aggregator.go b/das/aggregator.go index d3edd58437..59908271de 100644 --- a/das/aggregator.go +++ b/das/aggregator.go @@ -165,7 +165,7 @@ type storeResponse struct { func (a *Aggregator) Store(ctx context.Context, message []byte, timeout uint64, sig []byte) (*daprovider.DataAvailabilityCertificate, error) { log.Trace("das.Aggregator.Store", "message", pretty.FirstFewBytes(message), "timeout", time.Unix(int64(timeout), 0), "sig", pretty.FirstFewBytes(sig)) if a.addrVerifier != nil { - actualSigner, err := DasRecoverSigner(message, timeout, sig) + actualSigner, err := DasRecoverSigner(message, sig, timeout) if err != nil { return nil, err } diff --git a/das/dasRpcServer.go b/das/dasRpcServer.go index 2f1fc1fd42..3a165621a6 100644 --- a/das/dasRpcServer.go +++ b/das/dasRpcServer.go @@ -28,6 +28,8 @@ var ( 
rpcStoreFailureGauge = metrics.NewRegisteredGauge("arb/das/rpc/store/failure", nil) rpcStoreStoredBytesGauge = metrics.NewRegisteredGauge("arb/das/rpc/store/bytes", nil) rpcStoreDurationHistogram = metrics.NewRegisteredHistogram("arb/das/rpc/store/duration", nil, metrics.NewBoundedHistogramSample()) + + // TODO chunk store metrics ) type DASRPCServer struct { @@ -118,6 +120,28 @@ func (serv *DASRPCServer) Store(ctx context.Context, message hexutil.Bytes, time }, nil } +type StartChunkedStoreResult struct { + ChunkedStoreId hexutil.Uint64 `json:"chunkedStoreId,omitempty"` +} + +type SendChunkResult struct { + Ok hexutil.Uint64 `json:"sendChunkResult,omitempty"` +} + +func (serv *DASRPCServer) StartChunkedStore(ctx context.Context, timestamp hexutil.Uint64, nChunks hexutil.Uint64, totalSize hexutil.Uint64, timeout hexutil.Uint64, sig hexutil.Bytes) (*StartChunkedStoreResult, error) { + return &StartChunkedStoreResult{}, nil + +} + +func (serv *DASRPCServer) SendChunk(ctx context.Context, message hexutil.Bytes, timeout hexutil.Uint64, sig hexutil.Bytes) (*SendChunkResult, error) { + return &SendChunkResult{}, nil +} + +func (serv *DASRPCServer) CommitChunkedStore(ctx context.Context, message hexutil.Bytes, timeout hexutil.Uint64, sig hexutil.Bytes) (*StoreResult, error) { + // TODO tracing, metrics, and timers + return &StoreResult{}, nil +} + func (serv *DASRPCServer) HealthCheck(ctx context.Context) error { return serv.daHealthChecker.HealthCheck(ctx) } diff --git a/das/signature_verifier.go b/das/signature_verifier.go index 0f210b3c6b..31c469af86 100644 --- a/das/signature_verifier.go +++ b/das/signature_verifier.go @@ -28,7 +28,7 @@ type SignatureVerifier struct { // Extra batch poster verifier, for local installations to have their // own way of testing Stores. 
- extraBpVerifier func(message []byte, timeout uint64, sig []byte) bool + extraBpVerifier func(message []byte, sig []byte, extraFields ...uint64) bool } func NewSignatureVerifier(ctx context.Context, config DataAvailabilityConfig, inner DataAvailabilityServiceWriter) (*SignatureVerifier, error) { @@ -65,7 +65,7 @@ func NewSignatureVerifierWithSeqInboxCaller( addrVerifier = contracts.NewAddressVerifier(seqInboxCaller) } - var extraBpVerifier func(message []byte, timeout uint64, sig []byte) bool + var extraBpVerifier func(message []byte, sig []byte, extraFeilds ...uint64) bool if extraSignatureCheckingPublicKey != "" { var pubkey []byte var err error @@ -84,9 +84,9 @@ func NewSignatureVerifierWithSeqInboxCaller( return nil, err } } - extraBpVerifier = func(message []byte, timeout uint64, sig []byte) bool { + extraBpVerifier = func(message []byte, sig []byte, extraFields ...uint64) bool { if len(sig) >= 64 { - return crypto.VerifySignature(pubkey, dasStoreHash(message, timeout), sig[:64]) + return crypto.VerifySignature(pubkey, dasStoreHash(message, extraFields...), sig[:64]) } return false } @@ -106,11 +106,11 @@ func (v *SignatureVerifier) Store( log.Trace("das.SignatureVerifier.Store", "message", pretty.FirstFewBytes(message), "timeout", time.Unix(int64(timeout), 0), "sig", pretty.FirstFewBytes(sig), "this", v) var verified bool if v.extraBpVerifier != nil { - verified = v.extraBpVerifier(message, timeout, sig) + verified = v.extraBpVerifier(message, sig, timeout) } if !verified && v.addrVerifier != nil { - actualSigner, err := DasRecoverSigner(message, timeout, sig) + actualSigner, err := DasRecoverSigner(message, sig, timeout) if err != nil { return nil, err } diff --git a/das/store_signing.go b/das/store_signing.go index 8ebc1a9805..88c7c5bfaa 100644 --- a/das/store_signing.go +++ b/das/store_signing.go @@ -20,22 +20,26 @@ import ( var uniquifyingPrefix = []byte("Arbitrum Nitro DAS API Store:") -func applyDasSigner(signer signature.DataSignerFunc, data []byte, timeout uint64) ([]byte, error) { - return signer(dasStoreHash(data, timeout)) +func applyDasSigner(signer signature.DataSignerFunc, data []byte, extraFields ...uint64) ([]byte, error) { + return signer(dasStoreHash(data, extraFields...)) } -func DasRecoverSigner(data []byte, timeout uint64, sig []byte) (common.Address, error) { - pk, err := crypto.SigToPub(dasStoreHash(data, timeout), sig) +func DasRecoverSigner(data []byte, sig []byte, extraFields ...uint64) (common.Address, error) { + pk, err := crypto.SigToPub(dasStoreHash(data, extraFields...), sig) if err != nil { return common.Address{}, err } return crypto.PubkeyToAddress(*pk), nil } -func dasStoreHash(data []byte, timeout uint64) []byte { - var buf8 [8]byte - binary.BigEndian.PutUint64(buf8[:], timeout) - return dastree.HashBytes(uniquifyingPrefix, buf8[:], data) +func dasStoreHash(data []byte, extraFields ...uint64) []byte { + var buf []byte + + for _, field := range extraFields { + buf = binary.BigEndian.AppendUint64(buf, field) + } + + return dastree.HashBytes(uniquifyingPrefix, buf, data) } type StoreSigningDAS struct { @@ -49,7 +53,7 @@ func NewStoreSigningDAS(inner DataAvailabilityServiceWriter, signer signature.Da if err != nil { return nil, err } - addr, err := DasRecoverSigner([]byte{}, 0, sig) + addr, err := DasRecoverSigner([]byte{}, sig, 0) if err != nil { return nil, err } diff --git a/das/store_signing_test.go b/das/store_signing_test.go index 33b94f66e1..a50d1c37f4 100644 --- a/das/store_signing_test.go +++ b/das/store_signing_test.go @@ -25,7 +25,7 @@ func 
TestStoreSigning(t *testing.T) { sig, err := applyDasSigner(signer, weirdMessage, timeout) Require(t, err) - recoveredAddr, err := DasRecoverSigner(weirdMessage, timeout, sig) + recoveredAddr, err := DasRecoverSigner(weirdMessage, sig, timeout) Require(t, err) if recoveredAddr != addr { From 1b6554f0d96f667ab4d9d8a8fb6705cc1a4dc5a5 Mon Sep 17 00:00:00 2001 From: Tristan Wilson Date: Thu, 16 May 2024 17:00:39 -0700 Subject: [PATCH 03/11] First cut of server side of chunked store api --- cmd/daserver/daserver.go | 4 +- das/dasRpcServer.go | 163 ++++++++++++++++++++++++++++++++---- das/factory.go | 30 +++---- das/rpc_test.go | 2 +- das/signature_verifier.go | 34 +++----- system_tests/common_test.go | 5 +- system_tests/das_test.go | 10 +-- 7 files changed, 185 insertions(+), 63 deletions(-) diff --git a/cmd/daserver/daserver.go b/cmd/daserver/daserver.go index 8036487d26..41e1755e4a 100644 --- a/cmd/daserver/daserver.go +++ b/cmd/daserver/daserver.go @@ -235,7 +235,7 @@ func startup() error { return errors.New("sequencer-inbox-address must be set to a valid L1 URL and contract address, or 'none'") } - daReader, daWriter, daHealthChecker, dasLifecycleManager, err := das.CreateDAComponentsForDaserver(ctx, &serverConfig.DataAvailability, l1Reader, seqInboxAddress) + daReader, daWriter, signatureVerifier, daHealthChecker, dasLifecycleManager, err := das.CreateDAComponentsForDaserver(ctx, &serverConfig.DataAvailability, l1Reader, seqInboxAddress) if err != nil { return err } @@ -250,7 +250,7 @@ func startup() error { if serverConfig.EnableRPC { log.Info("Starting HTTP-RPC server", "addr", serverConfig.RPCAddr, "port", serverConfig.RPCPort, "revision", vcsRevision, "vcs.time", vcsTime) - rpcServer, err = das.StartDASRPCServer(ctx, serverConfig.RPCAddr, serverConfig.RPCPort, serverConfig.RPCServerTimeouts, daReader, daWriter, daHealthChecker) + rpcServer, err = das.StartDASRPCServer(ctx, serverConfig.RPCAddr, serverConfig.RPCPort, serverConfig.RPCServerTimeouts, daReader, daWriter, daHealthChecker, signatureVerifier) if err != nil { return err } diff --git a/das/dasRpcServer.go b/das/dasRpcServer.go index 3a165621a6..2c3c0b5b23 100644 --- a/das/dasRpcServer.go +++ b/das/dasRpcServer.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "math/rand" "net" "net/http" "time" @@ -36,25 +37,30 @@ type DASRPCServer struct { daReader DataAvailabilityServiceReader daWriter DataAvailabilityServiceWriter daHealthChecker DataAvailabilityServiceHealthChecker + + signatureVerifier *SignatureVerifier + + batches batchBuilder } -func StartDASRPCServer(ctx context.Context, addr string, portNum uint64, rpcServerTimeouts genericconf.HTTPServerTimeoutConfig, daReader DataAvailabilityServiceReader, daWriter DataAvailabilityServiceWriter, daHealthChecker DataAvailabilityServiceHealthChecker) (*http.Server, error) { +func StartDASRPCServer(ctx context.Context, addr string, portNum uint64, rpcServerTimeouts genericconf.HTTPServerTimeoutConfig, daReader DataAvailabilityServiceReader, daWriter DataAvailabilityServiceWriter, daHealthChecker DataAvailabilityServiceHealthChecker, signatureVerifier *SignatureVerifier) (*http.Server, error) { listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", addr, portNum)) if err != nil { return nil, err } - return StartDASRPCServerOnListener(ctx, listener, rpcServerTimeouts, daReader, daWriter, daHealthChecker) + return StartDASRPCServerOnListener(ctx, listener, rpcServerTimeouts, daReader, daWriter, daHealthChecker, signatureVerifier) } -func StartDASRPCServerOnListener(ctx 
context.Context, listener net.Listener, rpcServerTimeouts genericconf.HTTPServerTimeoutConfig, daReader DataAvailabilityServiceReader, daWriter DataAvailabilityServiceWriter, daHealthChecker DataAvailabilityServiceHealthChecker) (*http.Server, error) { +func StartDASRPCServerOnListener(ctx context.Context, listener net.Listener, rpcServerTimeouts genericconf.HTTPServerTimeoutConfig, daReader DataAvailabilityServiceReader, daWriter DataAvailabilityServiceWriter, daHealthChecker DataAvailabilityServiceHealthChecker, signatureVerifier *SignatureVerifier) (*http.Server, error) { if daWriter == nil { return nil, errors.New("No writer backend was configured for DAS RPC server. Has the BLS signing key been set up (--data-availability.key.key-dir or --data-availability.key.priv-key options)?") } rpcServer := rpc.NewServer() err := rpcServer.RegisterName("das", &DASRPCServer{ - daReader: daReader, - daWriter: daWriter, - daHealthChecker: daHealthChecker, + daReader: daReader, + daWriter: daWriter, + daHealthChecker: daHealthChecker, + signatureVerifier: signatureVerifier, }) if err != nil { return nil, err @@ -90,8 +96,8 @@ type StoreResult struct { Version hexutil.Uint64 `json:"version,omitempty"` } -func (serv *DASRPCServer) Store(ctx context.Context, message hexutil.Bytes, timeout hexutil.Uint64, sig hexutil.Bytes) (*StoreResult, error) { - log.Trace("dasRpc.DASRPCServer.Store", "message", pretty.FirstFewBytes(message), "message length", len(message), "timeout", time.Unix(int64(timeout), 0), "sig", pretty.FirstFewBytes(sig), "this", serv) +func (s *DASRPCServer) Store(ctx context.Context, message hexutil.Bytes, timeout hexutil.Uint64, sig hexutil.Bytes) (*StoreResult, error) { + log.Trace("dasRpc.DASRPCServer.Store", "message", pretty.FirstFewBytes(message), "message length", len(message), "timeout", time.Unix(int64(timeout), 0), "sig", pretty.FirstFewBytes(sig), "this", s) rpcStoreRequestGauge.Inc(1) start := time.Now() success := false @@ -104,7 +110,11 @@ func (serv *DASRPCServer) Store(ctx context.Context, message hexutil.Bytes, time rpcStoreDurationHistogram.Update(time.Since(start).Nanoseconds()) }() - cert, err := serv.daWriter.Store(ctx, message, uint64(timeout), sig) + if err := s.signatureVerifier.verify(ctx, message, sig, uint64(timeout)); err != nil { + return nil, err + } + + cert, err := s.daWriter.Store(ctx, message, uint64(timeout), nil) if err != nil { return nil, err } @@ -121,25 +131,146 @@ func (serv *DASRPCServer) Store(ctx context.Context, message hexutil.Bytes, time } type StartChunkedStoreResult struct { - ChunkedStoreId hexutil.Uint64 `json:"chunkedStoreId,omitempty"` + BatchId hexutil.Uint64 `json:"batchId,omitempty"` } type SendChunkResult struct { Ok hexutil.Uint64 `json:"sendChunkResult,omitempty"` } -func (serv *DASRPCServer) StartChunkedStore(ctx context.Context, timestamp hexutil.Uint64, nChunks hexutil.Uint64, totalSize hexutil.Uint64, timeout hexutil.Uint64, sig hexutil.Bytes) (*StartChunkedStoreResult, error) { - return &StartChunkedStoreResult{}, nil +type batch struct { + chunks [][]byte + expectedChunks, seenChunks uint64 + timeout uint64 +} + +const ( + maxPendingBatches = 10 +) + +type batchBuilder struct { + batches map[uint64]batch +} + +func (b *batchBuilder) assign(nChunks, timeout uint64) (uint64, error) { + if len(b.batches) >= maxPendingBatches { + return 0, fmt.Errorf("can't start new batch, already %d pending", b.batches) + } + + id := rand.Uint64() + _, ok := b.batches[id] + if ok { + return 0, fmt.Errorf("can't start new batch, try again") + } + 
+ b.batches[id] = batch{ + chunks: make([][]byte, nChunks), + expectedChunks: nChunks, + timeout: timeout, + } + return id, nil +} + +func (b *batchBuilder) add(id, idx uint64, data []byte) error { + batch, ok := b.batches[id] + if !ok { + return fmt.Errorf("unknown batch(%d)", id) + } + + if idx >= uint64(len(batch.chunks)) { + return fmt.Errorf("batch(%d): chunk(%d) out of range", id, idx) + } + + if batch.chunks[idx] != nil { + return fmt.Errorf("batch(%d): chunk(%d) already added", id, idx) + } + + // todo check chunk size + + batch.chunks[idx] = data + batch.seenChunks++ + return nil +} + +func (b *batchBuilder) close(id uint64) ([]byte, uint64, error) { + batch, ok := b.batches[id] + if !ok { + return nil, 0, fmt.Errorf("unknown batch(%d)", id) + } + + if batch.expectedChunks != batch.seenChunks { + return nil, 0, fmt.Errorf("incomplete batch(%d): got %d/%d chunks", id, batch.seenChunks, batch.expectedChunks) + } + + // todo check total size + + var flattened []byte + for _, chunk := range batch.chunks { + flattened = append(flattened, chunk...) + } + return flattened, batch.timeout, nil +} + +func (s *DASRPCServer) StartChunkedStore(ctx context.Context, timestamp, nChunks, chunkSize, totalSize, timeout hexutil.Uint64, sig hexutil.Bytes) (*StartChunkedStoreResult, error) { + if err := s.signatureVerifier.verify(ctx, []byte{}, sig, uint64(timestamp), uint64(nChunks), uint64(totalSize), uint64(timeout)); err != nil { + return nil, err + } + + // Prevent replay of old messages + if time.Since(time.Unix(int64(timestamp), 0)).Abs() > time.Minute { + return nil, errors.New("too much time has elapsed since request was signed") + } + + id, err := s.batches.assign(uint64(nChunks), uint64(timeout)) + if err != nil { + return nil, err + } + + return &StartChunkedStoreResult{ + BatchId: hexutil.Uint64(id), + }, nil } -func (serv *DASRPCServer) SendChunk(ctx context.Context, message hexutil.Bytes, timeout hexutil.Uint64, sig hexutil.Bytes) (*SendChunkResult, error) { - return &SendChunkResult{}, nil +func (s *DASRPCServer) SendChunk(ctx context.Context, batchId, chunkId hexutil.Uint64, message hexutil.Bytes, sig hexutil.Bytes) (*SendChunkResult, error) { + if err := s.signatureVerifier.verify(ctx, message, sig, uint64(batchId), uint64(chunkId)); err != nil { + return nil, err + } + + if err := s.batches.add(uint64(batchId), uint64(chunkId), message); err != nil { + return nil, err + } + + return &SendChunkResult{ + Ok: hexutil.Uint64(1), // TODO probably not needed + }, nil } -func (serv *DASRPCServer) CommitChunkedStore(ctx context.Context, message hexutil.Bytes, timeout hexutil.Uint64, sig hexutil.Bytes) (*StoreResult, error) { +func (s *DASRPCServer) CommitChunkedStore(ctx context.Context, batchId hexutil.Uint64, sig hexutil.Bytes) (*StoreResult, error) { + if err := s.signatureVerifier.verify(ctx, []byte{}, sig, uint64(batchId)); err != nil { + return nil, err + } + + message, timeout, err := s.batches.close(uint64(batchId)) + if err != nil { + return nil, err + } + + cert, err := s.daWriter.Store(ctx, message, timeout, nil) + if err != nil { + return nil, err + } + return &StoreResult{ + KeysetHash: cert.KeysetHash[:], + DataHash: cert.DataHash[:], + Timeout: hexutil.Uint64(cert.Timeout), + SignersMask: hexutil.Uint64(cert.SignersMask), + Sig: blsSignatures.SignatureToBytes(cert.Sig), + Version: hexutil.Uint64(cert.Version), + }, nil + // TODO tracing, metrics, and timers - return &StoreResult{}, nil + } func (serv *DASRPCServer) HealthCheck(ctx context.Context) error { diff --git 
a/das/factory.go b/das/factory.go index 1fed2ed6f0..ad4a771708 100644 --- a/das/factory.go +++ b/das/factory.go @@ -191,9 +191,9 @@ func CreateDAComponentsForDaserver( config *DataAvailabilityConfig, l1Reader *headerreader.HeaderReader, seqInboxAddress *common.Address, -) (DataAvailabilityServiceReader, DataAvailabilityServiceWriter, DataAvailabilityServiceHealthChecker, *LifecycleManager, error) { +) (DataAvailabilityServiceReader, DataAvailabilityServiceWriter, *SignatureVerifier, DataAvailabilityServiceHealthChecker, *LifecycleManager, error) { if !config.Enable { - return nil, nil, nil, nil, nil + return nil, nil, nil, nil, nil, nil } // Check config requirements @@ -201,7 +201,7 @@ func CreateDAComponentsForDaserver( !config.LocalFileStorage.Enable && !config.S3Storage.Enable && !config.IpfsStorage.Enable { - return nil, nil, nil, nil, errors.New("At least one of --data-availability.(local-db-storage|local-file-storage|s3-storage|ipfs-storage) must be enabled.") + return nil, nil, nil, nil, nil, errors.New("At least one of --data-availability.(local-db-storage|local-file-storage|s3-storage|ipfs-storage) must be enabled.") } // Done checking config requirements @@ -209,12 +209,12 @@ func CreateDAComponentsForDaserver( var syncToStorageServices []StorageService storageService, dasLifecycleManager, err := CreatePersistentStorageService(ctx, config, &syncFromStorageServices, &syncToStorageServices) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } storageService, err = WrapStorageWithCache(ctx, config, storageService, &syncFromStorageServices, &syncToStorageServices, dasLifecycleManager) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } // The REST aggregator is used as the fallback if requested data is not present @@ -222,7 +222,7 @@ func CreateDAComponentsForDaserver( if config.RestAggregator.Enable { restAgg, err := NewRestfulClientAggregator(ctx, &config.RestAggregator) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } restAgg.Start(ctx) dasLifecycleManager.Register(restAgg) @@ -237,7 +237,7 @@ func CreateDAComponentsForDaserver( if syncConf.Eager { if l1Reader == nil || seqInboxAddress == nil { - return nil, nil, nil, nil, errors.New("l1-node-url and sequencer-inbox-address must be specified along with sync-to-storage.eager") + return nil, nil, nil, nil, nil, errors.New("l1-node-url and sequencer-inbox-address must be specified along with sync-to-storage.eager") } storageService, err = NewSyncingFallbackStorageService( ctx, @@ -249,7 +249,7 @@ func CreateDAComponentsForDaserver( syncConf) dasLifecycleManager.Register(storageService) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } } else { storageService = NewFallbackStorageService(storageService, restAgg, restAgg, @@ -262,13 +262,14 @@ func CreateDAComponentsForDaserver( var daWriter DataAvailabilityServiceWriter var daReader DataAvailabilityServiceReader = storageService var daHealthChecker DataAvailabilityServiceHealthChecker = storageService + var signatureVerifier *SignatureVerifier if config.Key.KeyDir != "" || config.Key.PrivKey != "" { var seqInboxCaller *bridgegen.SequencerInboxCaller if seqInboxAddress != nil { seqInbox, err := bridgegen.NewSequencerInbox(*seqInboxAddress, (*l1Reader).Client()) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } seqInboxCaller = &seqInbox.SequencerInboxCaller @@ -279,16 +280,15 @@ func 
CreateDAComponentsForDaserver( daWriter, err = NewSignAfterStoreDASWriter(ctx, *config, storageService) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } - daWriter, err = NewSignatureVerifierWithSeqInboxCaller( + signatureVerifier, err = NewSignatureVerifierWithSeqInboxCaller( seqInboxCaller, - daWriter, config.ExtraSignatureCheckingPublicKey, ) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } } @@ -300,11 +300,11 @@ func CreateDAComponentsForDaserver( if seqInboxAddress != nil { daReader, err = NewChainFetchReader(daReader, (*l1Reader).Client(), *seqInboxAddress) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } } - return daReader, daWriter, daHealthChecker, dasLifecycleManager, nil + return daReader, daWriter, signatureVerifier, daHealthChecker, dasLifecycleManager, nil } func CreateDAReaderForNode( diff --git a/das/rpc_test.go b/das/rpc_test.go index c07590d1e3..40edfd8332 100644 --- a/das/rpc_test.go +++ b/das/rpc_test.go @@ -53,7 +53,7 @@ func TestRPC(t *testing.T) { defer lifecycleManager.StopAndWaitUntil(time.Second) localDas, err := NewSignAfterStoreDASWriter(ctx, config, storageService) testhelpers.RequireImpl(t, err) - dasServer, err := StartDASRPCServerOnListener(ctx, lis, genericconf.HTTPServerTimeoutConfigDefault, storageService, localDas, storageService) + dasServer, err := StartDASRPCServerOnListener(ctx, lis, genericconf.HTTPServerTimeoutConfigDefault, storageService, localDas, storageService, &SignatureVerifier{}) defer func() { if err := dasServer.Shutdown(ctx); err != nil { panic(err) diff --git a/das/signature_verifier.go b/das/signature_verifier.go index 31c469af86..df7eeb074e 100644 --- a/das/signature_verifier.go +++ b/das/signature_verifier.go @@ -9,21 +9,15 @@ import ( "errors" "fmt" "os" - "time" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/log" - "github.com/offchainlabs/nitro/arbstate/daprovider" "github.com/offchainlabs/nitro/solgen/go/bridgegen" "github.com/offchainlabs/nitro/util/contracts" - "github.com/offchainlabs/nitro/util/pretty" ) // SignatureVerifier.Store will try to verify that the passed-in data's signature // is from the batch poster, or from an injectable verification method. 
type SignatureVerifier struct { - inner DataAvailabilityServiceWriter - addrVerifier *contracts.AddressVerifier // Extra batch poster verifier, for local installations to have their @@ -31,9 +25,9 @@ type SignatureVerifier struct { extraBpVerifier func(message []byte, sig []byte, extraFields ...uint64) bool } -func NewSignatureVerifier(ctx context.Context, config DataAvailabilityConfig, inner DataAvailabilityServiceWriter) (*SignatureVerifier, error) { +func NewSignatureVerifier(ctx context.Context, config DataAvailabilityConfig) (*SignatureVerifier, error) { if config.ParentChainNodeURL == "none" { - return NewSignatureVerifierWithSeqInboxCaller(nil, inner, config.ExtraSignatureCheckingPublicKey) + return NewSignatureVerifierWithSeqInboxCaller(nil, config.ExtraSignatureCheckingPublicKey) } l1client, err := GetL1Client(ctx, config.ParentChainConnectionAttempts, config.ParentChainNodeURL) if err != nil { @@ -44,20 +38,19 @@ func NewSignatureVerifier(ctx context.Context, config DataAvailabilityConfig, in return nil, err } if seqInboxAddress == nil { - return NewSignatureVerifierWithSeqInboxCaller(nil, inner, config.ExtraSignatureCheckingPublicKey) + return NewSignatureVerifierWithSeqInboxCaller(nil, config.ExtraSignatureCheckingPublicKey) } seqInboxCaller, err := bridgegen.NewSequencerInboxCaller(*seqInboxAddress, l1client) if err != nil { return nil, err } - return NewSignatureVerifierWithSeqInboxCaller(seqInboxCaller, inner, config.ExtraSignatureCheckingPublicKey) + return NewSignatureVerifierWithSeqInboxCaller(seqInboxCaller, config.ExtraSignatureCheckingPublicKey) } func NewSignatureVerifierWithSeqInboxCaller( seqInboxCaller *bridgegen.SequencerInboxCaller, - inner DataAvailabilityServiceWriter, extraSignatureCheckingPublicKey string, ) (*SignatureVerifier, error) { var addrVerifier *contracts.AddressVerifier @@ -93,37 +86,34 @@ func NewSignatureVerifierWithSeqInboxCaller( } return &SignatureVerifier{ - inner: inner, addrVerifier: addrVerifier, extraBpVerifier: extraBpVerifier, }, nil } -func (v *SignatureVerifier) Store( - ctx context.Context, message []byte, timeout uint64, sig []byte, -) (c *daprovider.DataAvailabilityCertificate, err error) { - log.Trace("das.SignatureVerifier.Store", "message", pretty.FirstFewBytes(message), "timeout", time.Unix(int64(timeout), 0), "sig", pretty.FirstFewBytes(sig), "this", v) +func (v *SignatureVerifier) verify( + ctx context.Context, message []byte, sig []byte, extraFields ...uint64) error { var verified bool if v.extraBpVerifier != nil { - verified = v.extraBpVerifier(message, sig, timeout) + verified = v.extraBpVerifier(message, sig, extraFields...) } if !verified && v.addrVerifier != nil { - actualSigner, err := DasRecoverSigner(message, sig, timeout) + actualSigner, err := DasRecoverSigner(message, sig, extraFields...) 
if err != nil { - return nil, err + return err } isBatchPosterOrSequencer, err := v.addrVerifier.IsBatchPosterOrSequencer(ctx, actualSigner) if err != nil { - return nil, err + return err } if !isBatchPosterOrSequencer { - return nil, errors.New("store request not properly signed") + return errors.New("store request not properly signed") } } - return v.inner.Store(ctx, message, timeout, sig) + return nil } func (v *SignatureVerifier) String() string { diff --git a/system_tests/common_test.go b/system_tests/common_test.go index 1b2b7ca6d6..b28630b81a 100644 --- a/system_tests/common_test.go +++ b/system_tests/common_test.go @@ -1111,15 +1111,16 @@ func setupConfigWithDAS( var daReader das.DataAvailabilityServiceReader var daWriter das.DataAvailabilityServiceWriter var daHealthChecker das.DataAvailabilityServiceHealthChecker + var signatureVerifier *das.SignatureVerifier if dasModeString != "onchain" { - daReader, daWriter, daHealthChecker, lifecycleManager, err = das.CreateDAComponentsForDaserver(ctx, dasConfig, nil, nil) + daReader, daWriter, signatureVerifier, daHealthChecker, lifecycleManager, err = das.CreateDAComponentsForDaserver(ctx, dasConfig, nil, nil) Require(t, err) rpcLis, err := net.Listen("tcp", "localhost:0") Require(t, err) restLis, err := net.Listen("tcp", "localhost:0") Require(t, err) - _, err = das.StartDASRPCServerOnListener(ctx, rpcLis, genericconf.HTTPServerTimeoutConfigDefault, daReader, daWriter, daHealthChecker) + _, err = das.StartDASRPCServerOnListener(ctx, rpcLis, genericconf.HTTPServerTimeoutConfigDefault, daReader, daWriter, daHealthChecker, signatureVerifier) Require(t, err) _, err = das.NewRestfulDasServerOnListener(restLis, genericconf.HTTPServerTimeoutConfigDefault, daReader, daHealthChecker) Require(t, err) diff --git a/system_tests/das_test.go b/system_tests/das_test.go index c69bd47828..f4019ddcb8 100644 --- a/system_tests/das_test.go +++ b/system_tests/das_test.go @@ -68,13 +68,13 @@ func startLocalDASServer( Require(t, err) seqInboxCaller, err := bridgegen.NewSequencerInboxCaller(seqInboxAddress, l1client) Require(t, err) - innerDaWriter, err := das.NewSignAfterStoreDASWriter(ctx, config, storageService) + daWriter, err := das.NewSignAfterStoreDASWriter(ctx, config, storageService) Require(t, err) - daWriter, err := das.NewSignatureVerifierWithSeqInboxCaller(seqInboxCaller, innerDaWriter, "") + signatureVerifier, err := das.NewSignatureVerifierWithSeqInboxCaller(seqInboxCaller, "") Require(t, err) rpcLis, err := net.Listen("tcp", "localhost:0") Require(t, err) - rpcServer, err := das.StartDASRPCServerOnListener(ctx, rpcLis, genericconf.HTTPServerTimeoutConfigDefault, storageService, daWriter, storageService) + rpcServer, err := das.StartDASRPCServerOnListener(ctx, rpcLis, genericconf.HTTPServerTimeoutConfigDefault, storageService, daWriter, storageService, signatureVerifier) Require(t, err) restLis, err := net.Listen("tcp", "localhost:0") Require(t, err) @@ -278,12 +278,12 @@ func TestDASComplexConfigAndRestMirror(t *testing.T) { // L1NodeURL: normally we would have to set this but we are passing in the already constructed client and addresses to the factory } - daReader, daWriter, daHealthChecker, lifecycleManager, err := das.CreateDAComponentsForDaserver(ctx, &serverConfig, l1Reader, &addresses.SequencerInbox) + daReader, daWriter, signatureVerifier, daHealthChecker, lifecycleManager, err := das.CreateDAComponentsForDaserver(ctx, &serverConfig, l1Reader, &addresses.SequencerInbox) Require(t, err) defer lifecycleManager.StopAndWaitUntil(time.Second) 
rpcLis, err := net.Listen("tcp", "localhost:0") Require(t, err) - _, err = das.StartDASRPCServerOnListener(ctx, rpcLis, genericconf.HTTPServerTimeoutConfigDefault, daReader, daWriter, daHealthChecker) + _, err = das.StartDASRPCServerOnListener(ctx, rpcLis, genericconf.HTTPServerTimeoutConfigDefault, daReader, daWriter, daHealthChecker, signatureVerifier) Require(t, err) restLis, err := net.Listen("tcp", "localhost:0") Require(t, err) From 57f583b781d2166bd93c70b8f7c0167141f1020c Mon Sep 17 00:00:00 2001 From: Tristan Wilson Date: Wed, 22 May 2024 17:12:51 -0700 Subject: [PATCH 04/11] Remove StoreSigningDAS from DAWriter chain --- cmd/datool/datool.go | 31 ++++++++-------------- das/dasRpcClient.go | 39 +++++++++++++++++++++++----- das/extra_signature_checker_test.go | 8 +++--- das/factory.go | 9 +------ das/rpc_aggregator.go | 17 ++++++------ das/rpc_test.go | 2 +- das/store_signing.go | 40 ----------------------------- 7 files changed, 60 insertions(+), 86 deletions(-) diff --git a/cmd/datool/datool.go b/cmd/datool/datool.go index 3f64a990ca..22d0d4e753 100644 --- a/cmd/datool/datool.go +++ b/cmd/datool/datool.go @@ -121,12 +121,7 @@ func startClientStore(args []string) error { return err } - client, err := das.NewDASRPCClient(config.URL) - if err != nil { - return err - } - - var dasClient das.DataAvailabilityServiceWriter = client + var signer signature.DataSignerFunc if config.SigningKey != "" { var privateKey *ecdsa.PrivateKey if config.SigningKey[:2] == "0x" { @@ -140,12 +135,7 @@ func startClientStore(args []string) error { return err } } - signer := signature.DataSignerFromPrivateKey(privateKey) - - dasClient, err = das.NewStoreSigningDAS(dasClient, signer) - if err != nil { - return err - } + signer = signature.DataSignerFromPrivateKey(privateKey) } else if config.SigningWallet != "" { walletConf := &genericconf.WalletConfig{ Pathname: config.SigningWallet, @@ -154,16 +144,17 @@ func startClientStore(args []string) error { Account: "", OnlyCreateKey: false, } - _, signer, err := util.OpenWallet("datool", walletConf, nil) - if err != nil { - return err - } - dasClient, err = das.NewStoreSigningDAS(dasClient, signer) + _, signer, err = util.OpenWallet("datool", walletConf, nil) if err != nil { return err } } + client, err := das.NewDASRPCClient(config.URL, signer) + if err != nil { + return err + } + ctx := context.Background() var cert *daprovider.DataAvailabilityCertificate @@ -173,9 +164,9 @@ func startClientStore(args []string) error { if err != nil { return err } - cert, err = dasClient.Store(ctx, message, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{}) + cert, err = client.Store(ctx, message, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{}) } else if len(config.Message) > 0 { - cert, err = dasClient.Store(ctx, []byte(config.Message), uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{}) + cert, err = client.Store(ctx, []byte(config.Message), uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{}) } else { return errors.New("--message or --random-message-size must be specified") } @@ -361,7 +352,7 @@ func dumpKeyset(args []string) error { return err } - services, err := das.ParseServices(config.Keyset) + services, err := das.ParseServices(config.Keyset, nil) if err != nil { return err } diff --git a/das/dasRpcClient.go b/das/dasRpcClient.go index 5fca1e449f..324686f44b 100644 --- a/das/dasRpcClient.go +++ b/das/dasRpcClient.go @@ -16,26 +16,53 @@ import ( "github.com/offchainlabs/nitro/arbstate/daprovider" 
"github.com/offchainlabs/nitro/blsSignatures" "github.com/offchainlabs/nitro/util/pretty" + "github.com/offchainlabs/nitro/util/signature" ) type DASRPCClient struct { // implements DataAvailabilityService - clnt *rpc.Client - url string + clnt *rpc.Client + url string + signer signature.DataSignerFunc } -func NewDASRPCClient(target string) (*DASRPCClient, error) { +func nilSigner(_ []byte) ([]byte, error) { + return []byte{}, nil +} + +func NewDASRPCClient(target string, signer signature.DataSignerFunc) (*DASRPCClient, error) { clnt, err := rpc.Dial(target) if err != nil { return nil, err } + if signer == nil { + signer = nilSigner + } return &DASRPCClient{ - clnt: clnt, - url: target, + clnt: clnt, + url: target, + signer: signer, }, nil } func (c *DASRPCClient) Store(ctx context.Context, message []byte, timeout uint64, reqSig []byte) (*daprovider.DataAvailabilityCertificate, error) { - log.Trace("das.DASRPCClient.Store(...)", "message", pretty.FirstFewBytes(message), "timeout", time.Unix(int64(timeout), 0), "sig", pretty.FirstFewBytes(reqSig), "this", *c) + /* + var ret StartChunkedStoreResult + if err := c.clnt.CallContext(ctx, &ret, "das_startChunkedStore", hexutil.Bytes(message), hexutil.Uint64(timeout), hexutil.Bytes(reqSig)); err != nil { + } + */ + + return c.legacyStore(ctx, message, timeout) + +} + +func (c *DASRPCClient) legacyStore(ctx context.Context, message []byte, timeout uint64) (*daprovider.DataAvailabilityCertificate, error) { + log.Trace("das.DASRPCClient.Store(...)", "message", pretty.FirstFewBytes(message), "timeout", time.Unix(int64(timeout), 0), "this", *c) + + reqSig, err := applyDasSigner(c.signer, message, timeout) + if err != nil { + return nil, err + } + var ret StoreResult if err := c.clnt.CallContext(ctx, &ret, "das_store", hexutil.Bytes(message), hexutil.Uint64(timeout), hexutil.Bytes(reqSig)); err != nil { return nil, err diff --git a/das/extra_signature_checker_test.go b/das/extra_signature_checker_test.go index 2fcfac167d..9fdf86cdf5 100644 --- a/das/extra_signature_checker_test.go +++ b/das/extra_signature_checker_test.go @@ -65,10 +65,12 @@ func TestExtraSignatureCheck(t *testing.T) { signer := signature.DataSignerFromPrivateKey(privateKey) var da DataAvailabilityServiceWriter = &StubSignatureCheckDAS{keyDir} - da, err = NewStoreSigningDAS(da, signer) - Require(t, err) - _, err = da.Store(context.Background(), []byte("Hello world"), 1234, []byte{}) + msg := []byte("Hello world") + timeout := uint64(1234) + sig, err := applyDasSigner(signer, msg, timeout) + Require(t, err) + _, err = da.Store(context.Background(), msg, timeout, sig) Require(t, err) } diff --git a/das/factory.go b/das/factory.go index ad4a771708..018129e673 100644 --- a/das/factory.go +++ b/das/factory.go @@ -158,17 +158,10 @@ func CreateBatchPosterDAS( // Done checking config requirements var daWriter DataAvailabilityServiceWriter - daWriter, err := NewRPCAggregator(ctx, *config) + daWriter, err := NewRPCAggregator(ctx, *config, dataSigner) if err != nil { return nil, nil, nil, err } - if dataSigner != nil { - // In some tests the batch poster does not sign Store requests - daWriter, err = NewStoreSigningDAS(daWriter, dataSigner) - if err != nil { - return nil, nil, nil, err - } - } restAgg, err := NewRestfulClientAggregator(ctx, &config.RestAggregator) if err != nil { diff --git a/das/rpc_aggregator.go b/das/rpc_aggregator.go index 490116a89a..7345191e0d 100644 --- a/das/rpc_aggregator.go +++ b/das/rpc_aggregator.go @@ -16,6 +16,7 @@ import ( 
"github.com/offchainlabs/nitro/blsSignatures" "github.com/offchainlabs/nitro/solgen/go/bridgegen" "github.com/offchainlabs/nitro/util/metricsutil" + "github.com/offchainlabs/nitro/util/signature" "github.com/ethereum/go-ethereum/common" "github.com/offchainlabs/nitro/arbutil" @@ -27,31 +28,31 @@ type BackendConfig struct { SignerMask uint64 `json:"signermask"` } -func NewRPCAggregator(ctx context.Context, config DataAvailabilityConfig) (*Aggregator, error) { - services, err := ParseServices(config.RPCAggregator) +func NewRPCAggregator(ctx context.Context, config DataAvailabilityConfig, signer signature.DataSignerFunc) (*Aggregator, error) { + services, err := ParseServices(config.RPCAggregator, signer) if err != nil { return nil, err } return NewAggregator(ctx, config, services) } -func NewRPCAggregatorWithL1Info(config DataAvailabilityConfig, l1client arbutil.L1Interface, seqInboxAddress common.Address) (*Aggregator, error) { - services, err := ParseServices(config.RPCAggregator) +func NewRPCAggregatorWithL1Info(config DataAvailabilityConfig, l1client arbutil.L1Interface, seqInboxAddress common.Address, signer signature.DataSignerFunc) (*Aggregator, error) { + services, err := ParseServices(config.RPCAggregator, signer) if err != nil { return nil, err } return NewAggregatorWithL1Info(config, services, l1client, seqInboxAddress) } -func NewRPCAggregatorWithSeqInboxCaller(config DataAvailabilityConfig, seqInboxCaller *bridgegen.SequencerInboxCaller) (*Aggregator, error) { - services, err := ParseServices(config.RPCAggregator) +func NewRPCAggregatorWithSeqInboxCaller(config DataAvailabilityConfig, seqInboxCaller *bridgegen.SequencerInboxCaller, signer signature.DataSignerFunc) (*Aggregator, error) { + services, err := ParseServices(config.RPCAggregator, signer) if err != nil { return nil, err } return NewAggregatorWithSeqInboxCaller(config, services, seqInboxCaller) } -func ParseServices(config AggregatorConfig) ([]ServiceDetails, error) { +func ParseServices(config AggregatorConfig, signer signature.DataSignerFunc) ([]ServiceDetails, error) { var cs []BackendConfig err := json.Unmarshal([]byte(config.Backends), &cs) if err != nil { @@ -67,7 +68,7 @@ func ParseServices(config AggregatorConfig) ([]ServiceDetails, error) { } metricName := metricsutil.CanonicalizeMetricName(url.Hostname()) - service, err := NewDASRPCClient(b.URL) + service, err := NewDASRPCClient(b.URL, signer) if err != nil { return nil, err } diff --git a/das/rpc_test.go b/das/rpc_test.go index 40edfd8332..2a9c9d93ca 100644 --- a/das/rpc_test.go +++ b/das/rpc_test.go @@ -75,7 +75,7 @@ func TestRPC(t *testing.T) { }, RequestTimeout: 5 * time.Second, } - rpcAgg, err := NewRPCAggregatorWithSeqInboxCaller(aggConf, nil) + rpcAgg, err := NewRPCAggregatorWithSeqInboxCaller(aggConf, nil, nil) testhelpers.RequireImpl(t, err) msg := testhelpers.RandomizeSlice(make([]byte, 100)) diff --git a/das/store_signing.go b/das/store_signing.go index 88c7c5bfaa..eac25e48b0 100644 --- a/das/store_signing.go +++ b/das/store_signing.go @@ -4,17 +4,12 @@ package das import ( - "context" "encoding/binary" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/log" - "github.com/offchainlabs/nitro/arbstate/daprovider" "github.com/offchainlabs/nitro/das/dastree" - "github.com/offchainlabs/nitro/util/pretty" "github.com/offchainlabs/nitro/util/signature" ) @@ -41,38 +36,3 @@ func dasStoreHash(data []byte, extraFields ...uint64) []byte { return dastree.HashBytes(uniquifyingPrefix, buf, 
data) } - -type StoreSigningDAS struct { - DataAvailabilityServiceWriter - signer signature.DataSignerFunc - addr common.Address -} - -func NewStoreSigningDAS(inner DataAvailabilityServiceWriter, signer signature.DataSignerFunc) (DataAvailabilityServiceWriter, error) { - sig, err := applyDasSigner(signer, []byte{}, 0) - if err != nil { - return nil, err - } - addr, err := DasRecoverSigner([]byte{}, sig, 0) - if err != nil { - return nil, err - } - return &StoreSigningDAS{inner, signer, addr}, nil -} - -func (s *StoreSigningDAS) Store(ctx context.Context, message []byte, timeout uint64, sig []byte) (*daprovider.DataAvailabilityCertificate, error) { - log.Trace("das.StoreSigningDAS.Store(...)", "message", pretty.FirstFewBytes(message), "timeout", time.Unix(int64(timeout), 0), "sig", pretty.FirstFewBytes(sig), "this", s) - mySig, err := applyDasSigner(s.signer, message, timeout) - if err != nil { - return nil, err - } - return s.DataAvailabilityServiceWriter.Store(ctx, message, timeout, mySig) -} - -func (s *StoreSigningDAS) String() string { - return "StoreSigningDAS (" + s.SignerAddress().Hex() + " ," + s.DataAvailabilityServiceWriter.String() + ")" -} - -func (s *StoreSigningDAS) SignerAddress() common.Address { - return s.addr -} From 96885e0d366c9dee398b11b935919a27c7111bfc Mon Sep 17 00:00:00 2001 From: Tristan Wilson Date: Fri, 24 May 2024 20:27:02 -0700 Subject: [PATCH 05/11] Add RPC for chunked send of DAS batches This adds support for the batch poster in AnyTrust configuration to send batches to the DA committee in chunks of a configurable maximum HTTP POST body size. It adds new RPC methods to the daserver executable's RPC server, das_startChunkedStore, das_sendChunk, das_commitChunkedStore, which the clients created by the batch poster will automatically use if they are detected to be available on the committee server, otherwise it will fall back to the legacy das_store method. This allows an easy roll out of either the client or server first. The payloads of the new RPC methods are all signed by the batch poster. As basic DoS prevention, at most 10 uncommitted stores can be outstanding, uncommitted stores expire after a minute, and a das_startChunkedStore with the same arguments is not replayable after a minute. The batch poster should only be trying to store one batch at a time, so this should be sufficient. The new option --node.data-availability.rpc-aggregator.max-store-chunk-body-size is expressed in terms of the HTTP POST body size that the operator wants the chunk requests to stay under. 512B of padding is also added to whatever the user operator specifies here, since some proxies or endpoints may additionally count headers. This is orthogonal to settings like --node.batch-poster.max-size which control the maximum uncompressed batch size assembled by the batch poster. This should allow the batch poster to create very large batches which are broken up into small chunks to be sent to the committee servers. Once the client has received confirmation to its das_startChunkedStore request, it sends chunks in parallel using das_sendChunk, then once all chunks are sent uses das_commitChunkedStore to cause the data to be stored in the server and to retrieve the signed response to aggregate into the Data Availability Certificate. Server-side metrics are kept largely the same between chunked and non-chunked stores to minimize dashboard/alerting changes. 
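For illustration only, the following condensed Go sketch walks through the client-side sequence described above (das_startChunkedStore, parallel das_sendChunk calls, then das_commitChunkedStore). It is not the code added by this patch — the actual client is in das/dasRpcClient.go below — and the sign callback plus the chunk-size arithmetic are simplified, hypothetical stand-ins for the batch poster's signer and the real size accounting.

    package example

    import (
        "context"
        "encoding/json"

        "github.com/ethereum/go-ethereum/common/hexutil"
        "github.com/ethereum/go-ethereum/rpc"
        "golang.org/x/sync/errgroup"
    )

    // chunkedStore sketches the chunked store flow. sign is a hypothetical
    // stand-in for the batch poster's signer; the real client signs
    // dasStoreHash(data, extraFields...) with its ECDSA key.
    func chunkedStore(ctx context.Context, url string, message []byte,
        timestamp, timeout uint64, maxBodySize int,
        sign func(data []byte, extraFields ...uint64) ([]byte, error)) error {

        client, err := rpc.Dial(url)
        if err != nil {
            return err
        }

        // Leave room for JSON-RPC boilerplate plus ~512B of headers; hex
        // encoding roughly doubles the chunk payload, hence the division by 2.
        // The real client returns an error if this does not come out positive.
        chunkSize := uint64((maxBodySize - 512 - 100) / 2)
        totalSize := uint64(len(message))
        nChunks := (totalSize + chunkSize - 1) / chunkSize

        // 1) das_startChunkedStore reserves a batch id on the server.
        startSig, err := sign([]byte{}, timestamp, nChunks, chunkSize, totalSize, timeout)
        if err != nil {
            return err
        }
        var start struct {
            BatchId hexutil.Uint64 `json:"batchId"`
        }
        if err := client.CallContext(ctx, &start, "das_startChunkedStore",
            hexutil.Uint64(timestamp), hexutil.Uint64(nChunks), hexutil.Uint64(chunkSize),
            hexutil.Uint64(totalSize), hexutil.Uint64(timeout), hexutil.Bytes(startSig)); err != nil {
            // An older server without this method triggers the legacy das_store fallback.
            return err
        }

        // 2) das_sendChunk uploads all chunks in parallel.
        g := new(errgroup.Group)
        for i := uint64(0); i < nChunks; i++ {
            i := i
            end := (i + 1) * chunkSize
            if end > totalSize {
                end = totalSize
            }
            chunk := message[i*chunkSize : end]
            g.Go(func() error {
                chunkSig, err := sign(chunk, uint64(start.BatchId), i)
                if err != nil {
                    return err
                }
                return client.CallContext(ctx, nil, "das_sendChunk",
                    start.BatchId, hexutil.Uint64(i), hexutil.Bytes(chunk), hexutil.Bytes(chunkSig))
            })
        }
        if err := g.Wait(); err != nil {
            return err
        }

        // 3) das_commitChunkedStore assembles the chunks, stores the batch, and
        // returns the signed result used to build the Data Availability Certificate
        // (same JSON shape as the das_store response).
        commitSig, err := sign([]byte{}, uint64(start.BatchId))
        if err != nil {
            return err
        }
        var storeResult json.RawMessage
        return client.CallContext(ctx, &storeResult, "das_commitChunkedStore",
            start.BatchId, hexutil.Bytes(commitSig))
    }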
In the context of chunked transfers, the metrics mean as follows: arb_das_rpc_store_requests - Count of initiated chunked transfers arb_das_rpc_store_success - Successful commits of chunked transfers arb_das_rpc_store_failure - Failure at any stage of the chunked transfer arb_das_rpc_store_bytes - Bytes committed arb_das_rpc_store_duration - Total duration of chunked transfer (ns) Additionally two new metrics have been added to count individual das_sendChunk requests: arb_das_rpc_sendchunk_success arb_das_rpc_sendchunk_failure --- cmd/datool/datool.go | 4 +- das/aggregator.go | 15 +++-- das/das.go | 1 + das/dasRpcClient.go | 108 +++++++++++++++++++++++++++---- das/dasRpcServer.go | 133 +++++++++++++++++++++++++++++--------- das/rpc_aggregator.go | 2 +- das/rpc_test.go | 95 ++++++++++++++++++++++----- das/signature_verifier.go | 13 ++-- system_tests/das_test.go | 7 +- 9 files changed, 302 insertions(+), 76 deletions(-) diff --git a/cmd/datool/datool.go b/cmd/datool/datool.go index 22d0d4e753..cdea134bcb 100644 --- a/cmd/datool/datool.go +++ b/cmd/datool/datool.go @@ -91,6 +91,7 @@ type ClientStoreConfig struct { SigningKey string `koanf:"signing-key"` SigningWallet string `koanf:"signing-wallet"` SigningWalletPassword string `koanf:"signing-wallet-password"` + MaxStoreChunkBodySize int `koanf:"max-store-chunk-body-size"` } func parseClientStoreConfig(args []string) (*ClientStoreConfig, error) { @@ -102,6 +103,7 @@ func parseClientStoreConfig(args []string) (*ClientStoreConfig, error) { f.String("signing-wallet", "", "wallet containing ecdsa key to sign the message with") f.String("signing-wallet-password", genericconf.PASSWORD_NOT_SET, "password to unlock the wallet, if not specified the user is prompted for the password") f.Duration("das-retention-period", 24*time.Hour, "The period which DASes are requested to retain the stored batches.") + f.Int("max-store-chunk-body-size", 512*1024, "The maximum HTTP POST body size for a chunked store request") k, err := confighelpers.BeginCommonParse(f, args) if err != nil { @@ -150,7 +152,7 @@ func startClientStore(args []string) error { } } - client, err := das.NewDASRPCClient(config.URL, signer) + client, err := das.NewDASRPCClient(config.URL, signer, config.MaxStoreChunkBodySize) if err != nil { return err } diff --git a/das/aggregator.go b/das/aggregator.go index 59908271de..25db73a76e 100644 --- a/das/aggregator.go +++ b/das/aggregator.go @@ -27,20 +27,23 @@ import ( ) type AggregatorConfig struct { - Enable bool `koanf:"enable"` - AssumedHonest int `koanf:"assumed-honest"` - Backends string `koanf:"backends"` + Enable bool `koanf:"enable"` + AssumedHonest int `koanf:"assumed-honest"` + Backends string `koanf:"backends"` + MaxStoreChunkBodySize int `koanf:"max-store-chunk-body-size"` } var DefaultAggregatorConfig = AggregatorConfig{ - AssumedHonest: 0, - Backends: "", + AssumedHonest: 0, + Backends: "", + MaxStoreChunkBodySize: 512 * 1024, } func AggregatorConfigAddOptions(prefix string, f *flag.FlagSet) { - f.Bool(prefix+".enable", DefaultAggregatorConfig.Enable, "enable storage/retrieval of sequencer batch data from a list of RPC endpoints; this should only be used by the batch poster and not in combination with other DAS storage types") + f.Bool(prefix+".enable", DefaultAggregatorConfig.Enable, "enable storage of sequencer batch data from a list of RPC endpoints; this should only be used by the batch poster and not in combination with other DAS storage types") f.Int(prefix+".assumed-honest", DefaultAggregatorConfig.AssumedHonest, "Number of 
assumed honest backends (H). If there are N backends, K=N+1-H valid responses are required to consider an Store request to be successful.") f.String(prefix+".backends", DefaultAggregatorConfig.Backends, "JSON RPC backend configuration") + f.Int(prefix+".max-store-chunk-body-size", DefaultAggregatorConfig.MaxStoreChunkBodySize, "maximum HTTP POST body size to use for individual batch chunks, including JSON RPC overhead and an estimated overhead of 512B of headers") } type Aggregator struct { diff --git a/das/das.go b/das/das.go index b0708e3b33..80c45f6666 100644 --- a/das/das.go +++ b/das/das.go @@ -65,6 +65,7 @@ var DefaultDataAvailabilityConfig = DataAvailabilityConfig{ RequestTimeout: 5 * time.Second, Enable: false, RestAggregator: DefaultRestfulClientAggregatorConfig, + RPCAggregator: DefaultAggregatorConfig, ParentChainConnectionAttempts: 15, PanicOnError: false, IpfsStorage: DefaultIpfsStorageServiceConfig, diff --git a/das/dasRpcClient.go b/das/dasRpcClient.go index 324686f44b..8d8db02ff4 100644 --- a/das/dasRpcClient.go +++ b/das/dasRpcClient.go @@ -6,11 +6,13 @@ package das import ( "context" "fmt" + "strings" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/log" + "golang.org/x/sync/errgroup" "github.com/ethereum/go-ethereum/rpc" "github.com/offchainlabs/nitro/arbstate/daprovider" @@ -20,16 +22,19 @@ import ( ) type DASRPCClient struct { // implements DataAvailabilityService - clnt *rpc.Client - url string - signer signature.DataSignerFunc + clnt *rpc.Client + url string + signer signature.DataSignerFunc + chunkSize uint64 } func nilSigner(_ []byte) ([]byte, error) { return []byte{}, nil } -func NewDASRPCClient(target string, signer signature.DataSignerFunc) (*DASRPCClient, error) { +const sendChunkJSONBoilerplate = "{\"jsonrpc\":\"2.0\",\"id\":4294967295,\"method\":\"das_sendChunked\",\"params\":[\"\"]}" + +func NewDASRPCClient(target string, signer signature.DataSignerFunc, maxStoreChunkBodySize int) (*DASRPCClient, error) { clnt, err := rpc.Dial(target) if err != nil { return nil, err @@ -37,22 +42,99 @@ func NewDASRPCClient(target string, signer signature.DataSignerFunc) (*DASRPCCli if signer == nil { signer = nilSigner } + + // Byte arrays are encoded in base64 + chunkSize := (maxStoreChunkBodySize - len(sendChunkJSONBoilerplate) - 512 /* headers */) / 2 + if chunkSize <= 0 { + return nil, fmt.Errorf("max-store-chunk-body-size %d doesn't leave enough room for chunk payload", maxStoreChunkBodySize) + } + return &DASRPCClient{ - clnt: clnt, - url: target, - signer: signer, + clnt: clnt, + url: target, + signer: signer, + chunkSize: uint64(chunkSize), }, nil } -func (c *DASRPCClient) Store(ctx context.Context, message []byte, timeout uint64, reqSig []byte) (*daprovider.DataAvailabilityCertificate, error) { - /* - var ret StartChunkedStoreResult - if err := c.clnt.CallContext(ctx, &ret, "das_startChunkedStore", hexutil.Bytes(message), hexutil.Uint64(timeout), hexutil.Bytes(reqSig)); err != nil { +func (c *DASRPCClient) Store(ctx context.Context, message []byte, timeout uint64, _ []byte) (*daprovider.DataAvailabilityCertificate, error) { + timestamp := uint64(time.Now().Unix()) + nChunks := uint64(len(message)) / c.chunkSize + lastChunkSize := uint64(len(message)) % c.chunkSize + if lastChunkSize > 0 { + nChunks++ + } else { + lastChunkSize = c.chunkSize + } + totalSize := uint64(len(message)) + + startReqSig, err := applyDasSigner(c.signer, []byte{}, timestamp, nChunks, c.chunkSize, totalSize, 
timeout) + if err != nil { + return nil, err + } + + var startChunkedStoreResult StartChunkedStoreResult + if err := c.clnt.CallContext(ctx, &startChunkedStoreResult, "das_startChunkedStore", hexutil.Uint64(timestamp), hexutil.Uint64(nChunks), hexutil.Uint64(c.chunkSize), hexutil.Uint64(totalSize), hexutil.Uint64(timeout), hexutil.Bytes(startReqSig)); err != nil { + if strings.Contains(err.Error(), "the method das_startChunkedStore does not exist") { + return c.legacyStore(ctx, message, timeout) + } + return nil, err + } + batchId := uint64(startChunkedStoreResult.BatchId) + + g := new(errgroup.Group) + for i := uint64(0); i < nChunks; i++ { + var chunk []byte + if i == nChunks-1 { + chunk = message[i*c.chunkSize : i*c.chunkSize+lastChunkSize] + } else { + chunk = message[i*c.chunkSize : (i+1)*c.chunkSize] } - */ - return c.legacyStore(ctx, message, timeout) + inner := func(_i uint64, _chunk []byte) func() error { + return func() error { return c.sendChunk(ctx, batchId, _i, _chunk) } + } + g.Go(inner(i, chunk)) + } + if err := g.Wait(); err != nil { + return nil, err + } + finalReqSig, err := applyDasSigner(c.signer, []byte{}, uint64(startChunkedStoreResult.BatchId)) + if err != nil { + return nil, err + } + + var storeResult StoreResult + if err := c.clnt.CallContext(ctx, &storeResult, "das_commitChunkedStore", startChunkedStoreResult.BatchId, hexutil.Bytes(finalReqSig)); err != nil { + return nil, err + } + + respSig, err := blsSignatures.SignatureFromBytes(storeResult.Sig) + if err != nil { + return nil, err + } + + return &daprovider.DataAvailabilityCertificate{ + DataHash: common.BytesToHash(storeResult.DataHash), + Timeout: uint64(storeResult.Timeout), + SignersMask: uint64(storeResult.SignersMask), + Sig: respSig, + KeysetHash: common.BytesToHash(storeResult.KeysetHash), + Version: byte(storeResult.Version), + }, nil +} + +func (c *DASRPCClient) sendChunk(ctx context.Context, batchId, i uint64, chunk []byte) error { + chunkReqSig, err := applyDasSigner(c.signer, chunk, batchId, i) + if err != nil { + return err + } + + if err := c.clnt.CallContext(ctx, nil, "das_sendChunk", hexutil.Uint64(batchId), hexutil.Uint64(i), hexutil.Bytes(chunk), hexutil.Bytes(chunkReqSig)); err != nil { + return err + } + return nil } func (c *DASRPCClient) legacyStore(ctx context.Context, message []byte, timeout uint64) (*daprovider.DataAvailabilityCertificate, error) { diff --git a/das/dasRpcServer.go b/das/dasRpcServer.go index 2c3c0b5b23..1cfcecaf9c 100644 --- a/das/dasRpcServer.go +++ b/das/dasRpcServer.go @@ -10,6 +10,7 @@ import ( "math/rand" "net" "net/http" + "sync" "time" "github.com/ethereum/go-ethereum/common/hexutil" @@ -30,7 +31,8 @@ var ( rpcStoreStoredBytesGauge = metrics.NewRegisteredGauge("arb/das/rpc/store/bytes", nil) rpcStoreDurationHistogram = metrics.NewRegisteredHistogram("arb/das/rpc/store/duration", nil, metrics.NewBoundedHistogramSample()) - // TODO chunk store metrics + rpcSendChunkSuccessGauge = metrics.NewRegisteredGauge("arb/das/rpc/sendchunk/success", nil) + rpcSendChunkFailureGauge = metrics.NewRegisteredGauge("arb/das/rpc/sendchunk/failure", nil) ) type DASRPCServer struct { @@ -40,7 +42,7 @@ type DASRPCServer struct { signatureVerifier *SignatureVerifier - batches batchBuilder + batches *batchBuilder } func StartDASRPCServer(ctx context.Context, addr string, portNum uint64, rpcServerTimeouts genericconf.HTTPServerTimeoutConfig, daReader DataAvailabilityServiceReader, daWriter DataAvailabilityServiceWriter, daHealthChecker DataAvailabilityServiceHealthChecker, 
signatureVerifier *SignatureVerifier) (*http.Server, error) { @@ -56,11 +58,15 @@ func StartDASRPCServerOnListener(ctx context.Context, listener net.Listener, rpc return nil, errors.New("No writer backend was configured for DAS RPC server. Has the BLS signing key been set up (--data-availability.key.key-dir or --data-availability.key.priv-key options)?") } rpcServer := rpc.NewServer() + if legacyDASStoreAPIOnly { + rpcServer.ApplyAPIFilter(map[string]bool{"das_store": true}) + } err := rpcServer.RegisterName("das", &DASRPCServer{ daReader: daReader, daWriter: daWriter, daHealthChecker: daHealthChecker, signatureVerifier: signatureVerifier, + batches: newBatchBuilder(), }) if err != nil { return nil, err @@ -139,22 +145,39 @@ type SendChunkResult struct { } type batch struct { - chunks [][]byte - expectedChunks, seenChunks uint64 - timeout uint64 + chunks [][]byte + expectedChunks, seenChunks uint64 + expectedChunkSize, expectedSize uint64 + timeout uint64 + startTime time.Time } const ( maxPendingBatches = 10 ) +// exposed globals for test control +var ( + batchBuildingExpiry = 1 * time.Minute + legacyDASStoreAPIOnly = false +) + type batchBuilder struct { - batches map[uint64]batch + mutex sync.Mutex + batches map[uint64]*batch } -func (b *batchBuilder) assign(nChunks, timeout uint64) (uint64, error) { +func newBatchBuilder() *batchBuilder { + return &batchBuilder{ + batches: make(map[uint64]*batch), + } +} + +func (b *batchBuilder) assign(nChunks, timeout, chunkSize, totalSize uint64) (uint64, error) { + b.mutex.Lock() + defer b.mutex.Unlock() if len(b.batches) >= maxPendingBatches { - return 0, fmt.Errorf("can't start new batch, already %d pending", b.batches) + return 0, fmt.Errorf("can't start new batch, already %d pending", len(b.batches)) } id := rand.Uint64() @@ -163,16 +186,31 @@ func (b *batchBuilder) assign(nChunks, timeout uint64) (uint64, error) { return 0, fmt.Errorf("can't start new batch, try again") } - b.batches[id] = batch{ - chunks: make([][]byte, nChunks), - expectedChunks: nChunks, - timeout: timeout, + b.batches[id] = &batch{ + chunks: make([][]byte, nChunks), + expectedChunks: nChunks, + expectedChunkSize: chunkSize, + expectedSize: totalSize, + timeout: timeout, + startTime: time.Now(), } + go func(id uint64) { + <-time.After(batchBuildingExpiry) + b.mutex.Lock() + // Batch will only exist if expiry was reached without it being complete. 
+ if _, exists := b.batches[id]; exists { + rpcStoreFailureGauge.Inc(1) + delete(b.batches, id) + } + b.mutex.Unlock() + }(id) return id, nil } func (b *batchBuilder) add(id, idx uint64, data []byte) error { + b.mutex.Lock() batch, ok := b.batches[id] + b.mutex.Unlock() if !ok { return fmt.Errorf("unknown batch(%d)", id) } @@ -185,34 +223,50 @@ func (b *batchBuilder) add(id, idx uint64, data []byte) error { return fmt.Errorf("batch(%d): chunk(%d) already added", id, idx) } - // todo check chunk size + if batch.expectedChunkSize < uint64(len(data)) { + return fmt.Errorf("batch(%d): chunk(%d) greater than expected size %d, was %d", id, idx, batch.expectedChunkSize, len(data)) + } batch.chunks[idx] = data batch.seenChunks++ return nil } -func (b *batchBuilder) close(id uint64) ([]byte, uint64, error) { +func (b *batchBuilder) close(id uint64) ([]byte, uint64, time.Time, error) { + b.mutex.Lock() batch, ok := b.batches[id] + delete(b.batches, id) + b.mutex.Unlock() if !ok { - return nil, 0, fmt.Errorf("unknown batch(%d)", id) + return nil, 0, time.Time{}, fmt.Errorf("unknown batch(%d)", id) } if batch.expectedChunks != batch.seenChunks { - return nil, 0, fmt.Errorf("incomplete batch(%d): got %d/%d chunks", id, batch.seenChunks, batch.expectedChunks) + return nil, 0, time.Time{}, fmt.Errorf("incomplete batch(%d): got %d/%d chunks", id, batch.seenChunks, batch.expectedChunks) } - // todo check total size - var flattened []byte for _, chunk := range batch.chunks { flattened = append(flattened, chunk...) } - return flattened, batch.timeout, nil + + if batch.expectedSize != uint64(len(flattened)) { + return nil, 0, time.Time{}, fmt.Errorf("batch(%d) was not expected size %d, was %d", id, batch.expectedSize, len(flattened)) + } + + return flattened, batch.timeout, batch.startTime, nil } func (s *DASRPCServer) StartChunkedStore(ctx context.Context, timestamp, nChunks, chunkSize, totalSize, timeout hexutil.Uint64, sig hexutil.Bytes) (*StartChunkedStoreResult, error) { - if err := s.signatureVerifier.verify(ctx, []byte{}, sig, uint64(timestamp), uint64(nChunks), uint64(totalSize), uint64(timeout)); err != nil { + rpcStoreRequestGauge.Inc(1) + failed := true + defer func() { + if failed { + rpcStoreFailureGauge.Inc(1) + } // success gague will be incremented on successful commit + }() + + if err := s.signatureVerifier.verify(ctx, []byte{}, sig, uint64(timestamp), uint64(nChunks), uint64(chunkSize), uint64(totalSize), uint64(timeout)); err != nil { return nil, err } @@ -221,29 +275,38 @@ func (s *DASRPCServer) StartChunkedStore(ctx context.Context, timestamp, nChunks return nil, errors.New("too much time has elapsed since request was signed") } - id, err := s.batches.assign(uint64(nChunks), uint64(timeout)) + id, err := s.batches.assign(uint64(nChunks), uint64(timeout), uint64(chunkSize), uint64(totalSize)) if err != nil { return nil, err } + failed = false return &StartChunkedStoreResult{ BatchId: hexutil.Uint64(id), }, nil } -func (s *DASRPCServer) SendChunk(ctx context.Context, batchId, chunkId hexutil.Uint64, message hexutil.Bytes, sig hexutil.Bytes) (*SendChunkResult, error) { +func (s *DASRPCServer) SendChunk(ctx context.Context, batchId, chunkId hexutil.Uint64, message hexutil.Bytes, sig hexutil.Bytes) error { + success := false + defer func() { + if success { + rpcSendChunkSuccessGauge.Inc(1) + } else { + rpcSendChunkFailureGauge.Inc(1) + } + }() + if err := s.signatureVerifier.verify(ctx, message, sig, uint64(batchId), uint64(chunkId)); err != nil { - return nil, err + return err } if err := 
s.batches.add(uint64(batchId), uint64(chunkId), message); err != nil { - return nil, err + return err } - return &SendChunkResult{ - Ok: hexutil.Uint64(1), // TODO probably not needed - }, nil + success = true + return nil } func (s *DASRPCServer) CommitChunkedStore(ctx context.Context, batchId hexutil.Uint64, sig hexutil.Bytes) (*StoreResult, error) { @@ -251,15 +314,26 @@ func (s *DASRPCServer) CommitChunkedStore(ctx context.Context, batchId hexutil.U return nil, err } - message, timeout, err := s.batches.close(uint64(batchId)) + message, timeout, startTime, err := s.batches.close(uint64(batchId)) if err != nil { return nil, err } cert, err := s.daWriter.Store(ctx, message, timeout, nil) + success := false + defer func() { + if success { + rpcStoreSuccessGauge.Inc(1) + } else { + rpcStoreFailureGauge.Inc(1) + } + rpcStoreDurationHistogram.Update(time.Since(startTime).Nanoseconds()) + }() if err != nil { return nil, err } + rpcStoreStoredBytesGauge.Inc(int64(len(message))) + success = true return &StoreResult{ KeysetHash: cert.KeysetHash[:], DataHash: cert.DataHash[:], @@ -268,9 +342,6 @@ func (s *DASRPCServer) CommitChunkedStore(ctx context.Context, batchId hexutil.U Sig: blsSignatures.SignatureToBytes(cert.Sig), Version: hexutil.Uint64(cert.Version), }, nil - - // TODO tracing, metrics, and timers - } func (serv *DASRPCServer) HealthCheck(ctx context.Context) error { diff --git a/das/rpc_aggregator.go b/das/rpc_aggregator.go index 7345191e0d..7e363c6179 100644 --- a/das/rpc_aggregator.go +++ b/das/rpc_aggregator.go @@ -68,7 +68,7 @@ func ParseServices(config AggregatorConfig, signer signature.DataSignerFunc) ([] } metricName := metricsutil.CanonicalizeMetricName(url.Hostname()) - service, err := NewDASRPCClient(b.URL, signer) + service, err := NewDASRPCClient(b.URL, signer, config.MaxStoreChunkBodySize) if err != nil { return nil, err } diff --git a/das/rpc_test.go b/das/rpc_test.go index 2a9c9d93ca..aab6632c7d 100644 --- a/das/rpc_test.go +++ b/das/rpc_test.go @@ -7,13 +7,17 @@ import ( "bytes" "context" "encoding/base64" + "encoding/hex" "encoding/json" "net" + "sync" "testing" "time" + "github.com/ethereum/go-ethereum/crypto" "github.com/offchainlabs/nitro/blsSignatures" "github.com/offchainlabs/nitro/cmd/genericconf" + "github.com/offchainlabs/nitro/util/signature" "github.com/offchainlabs/nitro/util/testhelpers" ) @@ -24,7 +28,11 @@ func blsPubToBase64(pubkey *blsSignatures.PublicKey) string { return string(encodedPubkey) } -func TestRPC(t *testing.T) { +type sleepOnIterationFn func(i int) + +func testRpcImpl(t *testing.T, size, times int, concurrent bool, sleepOnIteration sleepOnIterationFn) { + // enableLogging() + ctx := context.Background() lis, err := net.Listen("tcp", "localhost:0") testhelpers.RequireImpl(t, err) @@ -53,7 +61,14 @@ func TestRPC(t *testing.T) { defer lifecycleManager.StopAndWaitUntil(time.Second) localDas, err := NewSignAfterStoreDASWriter(ctx, config, storageService) testhelpers.RequireImpl(t, err) - dasServer, err := StartDASRPCServerOnListener(ctx, lis, genericconf.HTTPServerTimeoutConfigDefault, storageService, localDas, storageService, &SignatureVerifier{}) + + testPrivateKey, err := crypto.GenerateKey() + testhelpers.RequireImpl(t, err) + signatureVerifier, err := NewSignatureVerifierWithSeqInboxCaller(nil, "0x"+hex.EncodeToString(crypto.FromECDSAPub(&testPrivateKey.PublicKey))) + testhelpers.RequireImpl(t, err) + signer := signature.DataSignerFromPrivateKey(testPrivateKey) + + dasServer, err := StartDASRPCServerOnListener(ctx, lis, 
genericconf.HTTPServerTimeoutConfigDefault, storageService, localDas, storageService, signatureVerifier) defer func() { if err := dasServer.Shutdown(ctx); err != nil { panic(err) @@ -70,29 +85,77 @@ func TestRPC(t *testing.T) { testhelpers.RequireImpl(t, err) aggConf := DataAvailabilityConfig{ RPCAggregator: AggregatorConfig{ - AssumedHonest: 1, - Backends: string(backendsJsonByte), + AssumedHonest: 1, + Backends: string(backendsJsonByte), + MaxStoreChunkBodySize: (chunkSize * 2) + len(sendChunkJSONBoilerplate), }, RequestTimeout: 5 * time.Second, } - rpcAgg, err := NewRPCAggregatorWithSeqInboxCaller(aggConf, nil, nil) + rpcAgg, err := NewRPCAggregatorWithSeqInboxCaller(aggConf, nil, signer) testhelpers.RequireImpl(t, err) - msg := testhelpers.RandomizeSlice(make([]byte, 100)) - cert, err := rpcAgg.Store(ctx, msg, 0, nil) - testhelpers.RequireImpl(t, err) + var wg sync.WaitGroup + runStore := func() { + defer wg.Done() + msg := testhelpers.RandomizeSlice(make([]byte, size)) + cert, err := rpcAgg.Store(ctx, msg, 0, nil) + testhelpers.RequireImpl(t, err) - retrievedMessage, err := storageService.GetByHash(ctx, cert.DataHash) - testhelpers.RequireImpl(t, err) + retrievedMessage, err := storageService.GetByHash(ctx, cert.DataHash) + testhelpers.RequireImpl(t, err) + + if !bytes.Equal(msg, retrievedMessage) { + testhelpers.FailImpl(t, "failed to retrieve correct message") + } - if !bytes.Equal(msg, retrievedMessage) { - testhelpers.FailImpl(t, "failed to retrieve correct message") + retrievedMessage, err = storageService.GetByHash(ctx, cert.DataHash) + testhelpers.RequireImpl(t, err) + + if !bytes.Equal(msg, retrievedMessage) { + testhelpers.FailImpl(t, "failed to getByHash correct message") + } } - retrievedMessage, err = storageService.GetByHash(ctx, cert.DataHash) - testhelpers.RequireImpl(t, err) + for i := 0; i < times; i++ { + wg.Add(1) + if concurrent { + go runStore() + } else { + runStore() + } + sleepOnIteration(i) + } + + wg.Wait() +} + +const chunkSize = 512 * 1024 + +func TestRPCStore(t *testing.T) { + dontSleep := func(_ int) {} + batchBuildingExpiry = time.Second * 5 - if !bytes.Equal(msg, retrievedMessage) { - testhelpers.FailImpl(t, "failed to getByHash correct message") + for _, tc := range []struct { + desc string + totalSize, times int + concurrent bool + sleepOnIteration sleepOnIterationFn + leagcyAPIOnly bool + }{ + {desc: "small store", totalSize: 100, times: 1, concurrent: false, sleepOnIteration: dontSleep}, + {desc: "chunked store - last chunk full", totalSize: chunkSize * 20, times: 10, concurrent: true, sleepOnIteration: dontSleep}, + {desc: "chunked store - last chunk not full", totalSize: chunkSize*31 + 123, times: 10, concurrent: true, sleepOnIteration: dontSleep}, + {desc: "chunked store - overflow cache - sequential", totalSize: chunkSize * 3, times: 15, concurrent: false, sleepOnIteration: dontSleep}, + {desc: "chunked store - wait for cache clear", totalSize: chunkSize * 3, times: 15, concurrent: true, sleepOnIteration: func(i int) { + if i == 9 { + time.Sleep(time.Second * 6) + } + }}, + {desc: "new client falls back to old api for old server", totalSize: (5*1024*1024)/2 - len(sendChunkJSONBoilerplate) - 100 /* geth counts headers too */, times: 5, concurrent: true, sleepOnIteration: dontSleep, leagcyAPIOnly: true}, + } { + t.Run(tc.desc, func(t *testing.T) { + legacyDASStoreAPIOnly = tc.leagcyAPIOnly + testRpcImpl(t, tc.totalSize, tc.times, tc.concurrent, tc.sleepOnIteration) + }) } } diff --git a/das/signature_verifier.go b/das/signature_verifier.go index 
df7eeb074e..0aa42bceb6 100644 --- a/das/signature_verifier.go +++ b/das/signature_verifier.go @@ -94,6 +94,10 @@ func NewSignatureVerifierWithSeqInboxCaller( func (v *SignatureVerifier) verify( ctx context.Context, message []byte, sig []byte, extraFields ...uint64) error { + if v.extraBpVerifier == nil && v.addrVerifier == nil { + return errors.New("no signature verification method configured") + } + var verified bool if v.extraBpVerifier != nil { verified = v.extraBpVerifier(message, sig, extraFields...) @@ -104,15 +108,14 @@ func (v *SignatureVerifier) verify( if err != nil { return err } - isBatchPosterOrSequencer, err := v.addrVerifier.IsBatchPosterOrSequencer(ctx, actualSigner) + verified, err = v.addrVerifier.IsBatchPosterOrSequencer(ctx, actualSigner) if err != nil { return err } - if !isBatchPosterOrSequencer { - return errors.New("store request not properly signed") - } } - + if !verified { + return errors.New("request not properly signed") + } return nil } diff --git a/system_tests/das_test.go b/system_tests/das_test.go index f4019ddcb8..521a65c1e9 100644 --- a/system_tests/das_test.go +++ b/system_tests/das_test.go @@ -99,9 +99,10 @@ func aggConfigForBackend(t *testing.T, backendConfig das.BackendConfig) das.Aggr backendsJsonByte, err := json.Marshal([]das.BackendConfig{backendConfig}) Require(t, err) return das.AggregatorConfig{ - Enable: true, - AssumedHonest: 1, - Backends: string(backendsJsonByte), + Enable: true, + AssumedHonest: 1, + Backends: string(backendsJsonByte), + MaxStoreChunkBodySize: 512 * 1024, } } From 51b703fc571af56562f6449472e4c64dd4387312 Mon Sep 17 00:00:00 2001 From: Tristan Wilson Date: Tue, 28 May 2024 16:27:43 -0700 Subject: [PATCH 06/11] Fix race condition on seenChunks --- das/dasRpcServer.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/das/dasRpcServer.go b/das/dasRpcServer.go index 4cb950b3c2..c1d66b3b1f 100644 --- a/das/dasRpcServer.go +++ b/das/dasRpcServer.go @@ -11,6 +11,7 @@ import ( "net" "net/http" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common/hexutil" @@ -150,7 +151,8 @@ type SendChunkResult struct { type batch struct { chunks [][]byte - expectedChunks, seenChunks uint64 + expectedChunks uint64 + seenChunks atomic.Int64 expectedChunkSize, expectedSize uint64 timeout uint64 startTime time.Time @@ -232,7 +234,7 @@ func (b *batchBuilder) add(id, idx uint64, data []byte) error { } batch.chunks[idx] = data - batch.seenChunks++ + batch.seenChunks.Add(1) return nil } @@ -245,8 +247,8 @@ func (b *batchBuilder) close(id uint64) ([]byte, uint64, time.Time, error) { return nil, 0, time.Time{}, fmt.Errorf("unknown batch(%d)", id) } - if batch.expectedChunks != batch.seenChunks { - return nil, 0, time.Time{}, fmt.Errorf("incomplete batch(%d): got %d/%d chunks", id, batch.seenChunks, batch.expectedChunks) + if batch.expectedChunks != uint64(batch.seenChunks.Load()) { + return nil, 0, time.Time{}, fmt.Errorf("incomplete batch(%d): got %d/%d chunks", id, batch.seenChunks.Load(), batch.expectedChunks) } var flattened []byte From 45f9ad7ee32b8e38879f43a883e6f33d1578297a Mon Sep 17 00:00:00 2001 From: Tristan Wilson Date: Wed, 29 May 2024 11:55:30 -0700 Subject: [PATCH 07/11] Fix timeout caused by -race test mode --- das/rpc_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/das/rpc_test.go b/das/rpc_test.go index ce314fa778..9995167a01 100644 --- a/das/rpc_test.go +++ b/das/rpc_test.go @@ -91,7 +91,7 @@ func testRpcImpl(t *testing.T, size, times int, concurrent bool, 
sleepOnIteratio Backends: string(backendsJsonByte), MaxStoreChunkBodySize: (chunkSize * 2) + len(sendChunkJSONBoilerplate), }, - RequestTimeout: 5 * time.Second, + RequestTimeout: time.Minute, } rpcAgg, err := NewRPCAggregatorWithSeqInboxCaller(aggConf, nil, signer) testhelpers.RequireImpl(t, err) @@ -135,7 +135,6 @@ const chunkSize = 512 * 1024 func TestRPCStore(t *testing.T) { dontSleep := func(_ int) {} - batchBuildingExpiry = time.Second * 5 for _, tc := range []struct { desc string From a23b629cdfd0311bfe8626ba12bb8acd14852299 Mon Sep 17 00:00:00 2001 From: Tristan Wilson Date: Wed, 29 May 2024 11:59:19 -0700 Subject: [PATCH 08/11] Remove cache expiry test and hooks, didn't work --- das/dasRpcServer.go | 6 +++--- das/rpc_test.go | 23 +++++++---------------- 2 files changed, 10 insertions(+), 19 deletions(-) diff --git a/das/dasRpcServer.go b/das/dasRpcServer.go index c1d66b3b1f..1e5c95089f 100644 --- a/das/dasRpcServer.go +++ b/das/dasRpcServer.go @@ -159,12 +159,12 @@ type batch struct { } const ( - maxPendingBatches = 10 + maxPendingBatches = 10 + batchBuildingExpiry = 1 * time.Minute ) -// exposed globals for test control +// exposed global for test control var ( - batchBuildingExpiry = 1 * time.Minute legacyDASStoreAPIOnly = false ) diff --git a/das/rpc_test.go b/das/rpc_test.go index 9995167a01..047e72f110 100644 --- a/das/rpc_test.go +++ b/das/rpc_test.go @@ -30,7 +30,7 @@ func blsPubToBase64(pubkey *blsSignatures.PublicKey) string { type sleepOnIterationFn func(i int) -func testRpcImpl(t *testing.T, size, times int, concurrent bool, sleepOnIteration sleepOnIterationFn) { +func testRpcImpl(t *testing.T, size, times int, concurrent bool) { // enableLogging() ctx := context.Background() @@ -125,7 +125,6 @@ func testRpcImpl(t *testing.T, size, times int, concurrent bool, sleepOnIteratio } else { runStore() } - sleepOnIteration(i) } wg.Wait() @@ -134,29 +133,21 @@ func testRpcImpl(t *testing.T, size, times int, concurrent bool, sleepOnIteratio const chunkSize = 512 * 1024 func TestRPCStore(t *testing.T) { - dontSleep := func(_ int) {} - for _, tc := range []struct { desc string totalSize, times int concurrent bool - sleepOnIteration sleepOnIterationFn leagcyAPIOnly bool }{ - {desc: "small store", totalSize: 100, times: 1, concurrent: false, sleepOnIteration: dontSleep}, - {desc: "chunked store - last chunk full", totalSize: chunkSize * 20, times: 10, concurrent: true, sleepOnIteration: dontSleep}, - {desc: "chunked store - last chunk not full", totalSize: chunkSize*31 + 123, times: 10, concurrent: true, sleepOnIteration: dontSleep}, - {desc: "chunked store - overflow cache - sequential", totalSize: chunkSize * 3, times: 15, concurrent: false, sleepOnIteration: dontSleep}, - {desc: "chunked store - wait for cache clear", totalSize: chunkSize * 3, times: 15, concurrent: true, sleepOnIteration: func(i int) { - if i == 9 { - time.Sleep(time.Second * 6) - } - }}, - {desc: "new client falls back to old api for old server", totalSize: (5*1024*1024)/2 - len(sendChunkJSONBoilerplate) - 100 /* geth counts headers too */, times: 5, concurrent: true, sleepOnIteration: dontSleep, leagcyAPIOnly: true}, + {desc: "small store", totalSize: 100, times: 1, concurrent: false}, + {desc: "chunked store - last chunk full", totalSize: chunkSize * 20, times: 10, concurrent: true}, + {desc: "chunked store - last chunk not full", totalSize: chunkSize*31 + 123, times: 10, concurrent: true}, + {desc: "chunked store - overflow cache - sequential", totalSize: chunkSize * 3, times: 15, concurrent: false}, + {desc: 
"new client falls back to old api for old server", totalSize: (5*1024*1024)/2 - len(sendChunkJSONBoilerplate) - 100 /* geth counts headers too */, times: 5, concurrent: true, leagcyAPIOnly: true}, } { t.Run(tc.desc, func(t *testing.T) { legacyDASStoreAPIOnly = tc.leagcyAPIOnly - testRpcImpl(t, tc.totalSize, tc.times, tc.concurrent, tc.sleepOnIteration) + testRpcImpl(t, tc.totalSize, tc.times, tc.concurrent) }) } } From aeedf8344131bedee00421ac864d2331123dbc40 Mon Sep 17 00:00:00 2001 From: Tristan Wilson Date: Wed, 29 May 2024 13:08:24 -0700 Subject: [PATCH 09/11] Lint fix --- das/rpc_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/das/rpc_test.go b/das/rpc_test.go index 047e72f110..2839edee69 100644 --- a/das/rpc_test.go +++ b/das/rpc_test.go @@ -28,8 +28,6 @@ func blsPubToBase64(pubkey *blsSignatures.PublicKey) string { return string(encodedPubkey) } -type sleepOnIterationFn func(i int) - func testRpcImpl(t *testing.T, size, times int, concurrent bool) { // enableLogging() From 6aeaf9efb489962f94aadf3cc5163e42026a2694 Mon Sep 17 00:00:00 2001 From: Tristan Wilson Date: Thu, 30 May 2024 16:53:44 -0700 Subject: [PATCH 10/11] Remove IPFS das factory logic and stubs --- das/das.go | 3 - das/factory.go | 58 ++----- das/ipfs_storage_service.bkup_go | 256 ------------------------------- das/ipfs_storage_service_stub.go | 68 -------- das/ipfs_storage_service_test.go | 60 -------- 5 files changed, 9 insertions(+), 436 deletions(-) delete mode 100644 das/ipfs_storage_service.bkup_go delete mode 100644 das/ipfs_storage_service_stub.go delete mode 100644 das/ipfs_storage_service_test.go diff --git a/das/das.go b/das/das.go index 80c45f6666..4f889ff602 100644 --- a/das/das.go +++ b/das/das.go @@ -44,7 +44,6 @@ type DataAvailabilityConfig struct { LocalDBStorage LocalDBStorageConfig `koanf:"local-db-storage"` LocalFileStorage LocalFileStorageConfig `koanf:"local-file-storage"` S3Storage S3StorageServiceConfig `koanf:"s3-storage"` - IpfsStorage IpfsStorageServiceConfig `koanf:"ipfs-storage"` RegularSyncStorage RegularSyncStorageConfig `koanf:"regular-sync-storage"` Key KeyConfig `koanf:"key"` @@ -68,7 +67,6 @@ var DefaultDataAvailabilityConfig = DataAvailabilityConfig{ RPCAggregator: DefaultAggregatorConfig, ParentChainConnectionAttempts: 15, PanicOnError: false, - IpfsStorage: DefaultIpfsStorageServiceConfig, } func OptionalAddressFromString(s string) (*common.Address, error) { @@ -129,7 +127,6 @@ func dataAvailabilityConfigAddOptions(prefix string, f *flag.FlagSet, r role) { } // Both the Nitro node and daserver can use these options. 
- IpfsStorageServiceConfigAddOptions(prefix+".ipfs-storage", f) RestfulClientAggregatorConfigAddOptions(prefix+".rest-aggregator", f) f.String(prefix+".parent-chain-node-url", DefaultDataAvailabilityConfig.ParentChainNodeURL, "URL for parent chain node, only used in standalone daserver; when running as part of a node that node's L1 configuration is used") diff --git a/das/factory.go b/das/factory.go index 018129e673..cedf814cce 100644 --- a/das/factory.go +++ b/das/factory.go @@ -78,15 +78,6 @@ func CreatePersistentStorageService( storageServices = append(storageServices, s) } - if config.IpfsStorage.Enable { - s, err := NewIpfsStorageService(ctx, config.IpfsStorage) - if err != nil { - return nil, nil, err - } - lifecycleManager.Register(s) - storageServices = append(storageServices, s) - } - if len(storageServices) > 1 { s, err := NewRedundantStorageService(ctx, storageServices) if err != nil { @@ -151,10 +142,6 @@ func CreateBatchPosterDAS( if !config.RPCAggregator.Enable || !config.RestAggregator.Enable { return nil, nil, nil, errors.New("--node.data-availability.rpc-aggregator.enable and rest-aggregator.enable must be set when running a Batch Poster in AnyTrust mode") } - - if config.IpfsStorage.Enable { - return nil, nil, nil, errors.New("--node.data-availability.ipfs-storage.enable may not be set when running a Nitro AnyTrust node in Batch Poster mode") - } // Done checking config requirements var daWriter DataAvailabilityServiceWriter @@ -192,9 +179,8 @@ func CreateDAComponentsForDaserver( // Check config requirements if !config.LocalDBStorage.Enable && !config.LocalFileStorage.Enable && - !config.S3Storage.Enable && - !config.IpfsStorage.Enable { - return nil, nil, nil, nil, nil, errors.New("At least one of --data-availability.(local-db-storage|local-file-storage|s3-storage|ipfs-storage) must be enabled.") + !config.S3Storage.Enable { + return nil, nil, nil, nil, nil, errors.New("At least one of --data-availability.(local-db-storage|local-file-storage|s3-storage) must be enabled.") } // Done checking config requirements @@ -315,48 +301,22 @@ func CreateDAReaderForNode( return nil, nil, errors.New("node.data-availability.rpc-aggregator is only for Batch Poster mode") } - if !config.RestAggregator.Enable && !config.IpfsStorage.Enable { - return nil, nil, fmt.Errorf("--node.data-availability.enable was set but neither of --node.data-availability.(rest-aggregator|ipfs-storage) were enabled. When running a Nitro Anytrust node in non-Batch Poster mode, some way to get the batch data is required.") - } - - if config.RestAggregator.SyncToStorage.Eager { - return nil, nil, errors.New("--node.data-availability.rest-aggregator.sync-to-storage.eager can't be used with a Nitro node, only lazy syncing can be used.") + if !config.RestAggregator.Enable { + return nil, nil, fmt.Errorf("--node.data-availability.enable was set but not --node.data-availability.rest-aggregator. 
When running a Nitro Anytrust node in non-Batch Poster mode, some way to get the batch data is required.") } // Done checking config requirements - storageService, dasLifecycleManager, err := CreatePersistentStorageService(ctx, config, nil, nil) - if err != nil { - return nil, nil, err - } - + var lifecycleManager LifecycleManager var daReader DataAvailabilityServiceReader if config.RestAggregator.Enable { var restAgg *SimpleDASReaderAggregator - restAgg, err = NewRestfulClientAggregator(ctx, &config.RestAggregator) + restAgg, err := NewRestfulClientAggregator(ctx, &config.RestAggregator) if err != nil { return nil, nil, err } restAgg.Start(ctx) - dasLifecycleManager.Register(restAgg) - - if storageService != nil { - syncConf := &config.RestAggregator.SyncToStorage - var retentionPeriodSeconds uint64 - if uint64(syncConf.RetentionPeriod) == math.MaxUint64 { - retentionPeriodSeconds = math.MaxUint64 - } else { - retentionPeriodSeconds = uint64(syncConf.RetentionPeriod.Seconds()) - } - - // This falls back to REST and updates the local IPFS repo if the data is found. - storageService = NewFallbackStorageService(storageService, restAgg, restAgg, - retentionPeriodSeconds, syncConf.IgnoreWriteErrors, true) - dasLifecycleManager.Register(storageService) - - daReader = storageService - } else { - daReader = restAgg - } + lifecycleManager.Register(restAgg) + daReader = restAgg } if seqInboxAddress != nil { @@ -370,5 +330,5 @@ func CreateDAReaderForNode( } } - return daReader, dasLifecycleManager, nil + return daReader, &lifecycleManager, nil } diff --git a/das/ipfs_storage_service.bkup_go b/das/ipfs_storage_service.bkup_go deleted file mode 100644 index 43b06fd4b6..0000000000 --- a/das/ipfs_storage_service.bkup_go +++ /dev/null @@ -1,256 +0,0 @@ -// Copyright 2022, Offchain Labs, Inc. -// For license information, see https://github.com/nitro/blob/master/LICENSE - -// IPFS DAS backend. -// It takes advantage of IPFS' content addressing scheme to be able to directly retrieve -// the batches from IPFS using their root hash from the L1 sequencer inbox contract. 
- -//go:build ipfs -// +build ipfs - -package das - -import ( - "bytes" - "context" - "errors" - "io" - "math/rand" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" - "github.com/ipfs/go-cid" - coreiface "github.com/ipfs/interface-go-ipfs-core" - "github.com/ipfs/interface-go-ipfs-core/options" - "github.com/ipfs/interface-go-ipfs-core/path" - "github.com/multiformats/go-multihash" - "github.com/offchainlabs/nitro/arbstate/daprovider" - "github.com/offchainlabs/nitro/arbutil" - "github.com/offchainlabs/nitro/cmd/ipfshelper" - "github.com/offchainlabs/nitro/das/dastree" - "github.com/offchainlabs/nitro/util/pretty" - flag "github.com/spf13/pflag" -) - -type IpfsStorageServiceConfig struct { - Enable bool `koanf:"enable"` - RepoDir string `koanf:"repo-dir"` - ReadTimeout time.Duration `koanf:"read-timeout"` - Profiles string `koanf:"profiles"` - Peers []string `koanf:"peers"` - - // Pinning options - PinAfterGet bool `koanf:"pin-after-get"` - PinPercentage float64 `koanf:"pin-percentage"` -} - -var DefaultIpfsStorageServiceConfig = IpfsStorageServiceConfig{ - Enable: false, - RepoDir: "", - ReadTimeout: time.Minute, - Profiles: "", - Peers: []string{}, - - PinAfterGet: true, - PinPercentage: 100.0, -} - -func IpfsStorageServiceConfigAddOptions(prefix string, f *flag.FlagSet) { - f.Bool(prefix+".enable", DefaultIpfsStorageServiceConfig.Enable, "enable storage/retrieval of sequencer batch data from IPFS") - f.String(prefix+".repo-dir", DefaultIpfsStorageServiceConfig.RepoDir, "directory to use to store the local IPFS repo") - f.Duration(prefix+".read-timeout", DefaultIpfsStorageServiceConfig.ReadTimeout, "timeout for IPFS reads, since by default it will wait forever. Treat timeout as not found") - f.String(prefix+".profiles", DefaultIpfsStorageServiceConfig.Profiles, "comma separated list of IPFS profiles to use, see https://docs.ipfs.tech/how-to/default-profile") - f.StringSlice(prefix+".peers", DefaultIpfsStorageServiceConfig.Peers, "list of IPFS peers to connect to, eg /ip4/1.2.3.4/tcp/12345/p2p/abc...xyz") - f.Bool(prefix+".pin-after-get", DefaultIpfsStorageServiceConfig.PinAfterGet, "pin sequencer batch data in IPFS") - f.Float64(prefix+".pin-percentage", DefaultIpfsStorageServiceConfig.PinPercentage, "percent of sequencer batch data to pin, as a floating point number in the range 0.0 to 100.0") -} - -type IpfsStorageService struct { - config IpfsStorageServiceConfig - ipfsHelper *ipfshelper.IpfsHelper - ipfsApi coreiface.CoreAPI -} - -func NewIpfsStorageService(ctx context.Context, config IpfsStorageServiceConfig) (*IpfsStorageService, error) { - ipfsHelper, err := ipfshelper.CreateIpfsHelper(ctx, config.RepoDir, false, config.Peers, config.Profiles) - if err != nil { - return nil, err - } - addrs, err := ipfsHelper.GetPeerHostAddresses() - if err != nil { - return nil, err - } - log.Info("IPFS node started up", "hostAddresses", addrs) - - return &IpfsStorageService{ - config: config, - ipfsHelper: ipfsHelper, - ipfsApi: ipfsHelper.GetAPI(), - }, nil -} - -func hashToCid(hash common.Hash) (cid.Cid, error) { - multiEncodedHashBytes, err := multihash.Encode(hash[:], multihash.KECCAK_256) - if err != nil { - return cid.Cid{}, err - } - - _, multiHash, err := multihash.MHFromBytes(multiEncodedHashBytes) - if err != nil { - return cid.Cid{}, err - } - - return cid.NewCidV1(cid.Raw, multiHash), nil -} - -// GetByHash retrieves and reconstructs one batch's data, using IPFS to retrieve the preimages -// for each chunk of data and the dastree nodes. 
-func (s *IpfsStorageService) GetByHash(ctx context.Context, hash common.Hash) ([]byte, error) { - log.Trace("das.IpfsStorageService.GetByHash", "hash", pretty.PrettyHash(hash)) - - doPin := false // If true, pin every block related to this batch - if s.config.PinAfterGet { - if s.config.PinPercentage == 100.0 { - doPin = true - } else if (rand.Float64() * 100.0) <= s.config.PinPercentage { - doPin = true - } - - } - - oracle := func(h common.Hash) ([]byte, error) { - thisCid, err := hashToCid(h) - if err != nil { - return nil, err - } - - ipfsPath := path.IpfsPath(thisCid) - log.Trace("Retrieving IPFS path", "path", ipfsPath.String()) - - parentCtx := ctx - if doPin { - // If we want to pin this batch, then detach from the parent context so - // we are not canceled before s.config.ReadTimeout. - parentCtx = context.Background() - } - - timeoutCtx, cancel := context.WithTimeout(parentCtx, s.config.ReadTimeout) - defer cancel() - rdr, err := s.ipfsApi.Block().Get(timeoutCtx, ipfsPath) - if err != nil { - if timeoutCtx.Err() != nil { - return nil, ErrNotFound - } - return nil, err - } - - data, err := io.ReadAll(rdr) - if err != nil { - return nil, err - } - - if doPin { - go func() { - pinCtx, pinCancel := context.WithTimeout(context.Background(), s.config.ReadTimeout) - defer pinCancel() - err := s.ipfsApi.Pin().Add(pinCtx, ipfsPath) - // Recursive pinning not needed, each dastree preimage fits in a single - // IPFS block. - if err != nil { - // Pinning is best-effort. - log.Warn("Failed to pin in IPFS", "hash", pretty.PrettyHash(hash), "path", ipfsPath.String()) - } else { - log.Trace("Pin in IPFS successful", "hash", pretty.PrettyHash(hash), "path", ipfsPath.String()) - } - }() - } - - return data, nil - } - - return dastree.Content(hash, oracle) -} - -// Put stores all the preimages required to reconstruct the dastree for single batch, -// ie the hashed data chunks and dastree nodes. -// This takes advantage of IPFS supporting keccak256 on raw data blocks for calculating -// its CIDs, and the fact that the dastree structure uses keccak256 for addressing its -// nodes, to directly store the dastree structure in IPFS. -// IPFS default block size is 256KB and dastree max block size is 64KB so each dastree -// node and data chunk easily fits within an IPFS block. -func (s *IpfsStorageService) Put(ctx context.Context, data []byte, timeout uint64) error { - logPut("das.IpfsStorageService.Put", data, timeout, s) - - var chunks [][]byte - - record := func(_ common.Hash, value []byte, ty arbutil.PreimageType) { - chunks = append(chunks, value) - } - - _ = dastree.RecordHash(record, data) - - numChunks := len(chunks) - resultChan := make(chan error, numChunks) - for _, chunk := range chunks { - _chunk := chunk - go func() { - blockStat, err := s.ipfsApi.Block().Put( - ctx, - bytes.NewReader(_chunk), - options.Block.CidCodec("raw"), // Store the data in raw form since the hash in the CID must be the hash - // of the preimage for our lookup scheme to work. - options.Block.Hash(multihash.KECCAK_256, -1), // Use keccak256 to calculate the hash to put in the block's - // CID, since it is the same algo used by dastree. - options.Block.Pin(true)) // Keep the data in the local IPFS repo, don't GC it. 
- if err == nil { - log.Trace("Wrote IPFS path", "path", blockStat.Path().String()) - } - resultChan <- err - }() - } - - successfullyWrittenChunks := 0 - for err := range resultChan { - if err != nil { - return err - } - successfullyWrittenChunks++ - if successfullyWrittenChunks == numChunks { - return nil - } - } - panic("unreachable") -} - -func (s *IpfsStorageService) ExpirationPolicy(ctx context.Context) (daprovider.ExpirationPolicy, error) { - return daprovider.KeepForever, nil -} - -func (s *IpfsStorageService) Sync(ctx context.Context) error { - return nil -} - -func (s *IpfsStorageService) Close(ctx context.Context) error { - return s.ipfsHelper.Close() -} - -func (s *IpfsStorageService) String() string { - return "IpfsStorageService" -} - -func (s *IpfsStorageService) HealthCheck(ctx context.Context) error { - testData := []byte("Test-Data") - err := s.Put(ctx, testData, 0) - if err != nil { - return err - } - res, err := s.GetByHash(ctx, dastree.Hash(testData)) - if err != nil { - return err - } - if !bytes.Equal(res, testData) { - return errors.New("invalid GetByHash result") - } - return nil -} diff --git a/das/ipfs_storage_service_stub.go b/das/ipfs_storage_service_stub.go deleted file mode 100644 index 5814f2c7e4..0000000000 --- a/das/ipfs_storage_service_stub.go +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright 2022, Offchain Labs, Inc. -// For license information, see https://github.com/nitro/blob/master/LICENSE - -// IPFS DAS backend stub -// a stub. we don't currently support ipfs - -//go:build !ipfs -// +build !ipfs - -package das - -import ( - "context" - "errors" - - "github.com/ethereum/go-ethereum/common" - "github.com/offchainlabs/nitro/arbstate/daprovider" - flag "github.com/spf13/pflag" -) - -var ErrIpfsNotSupported = errors.New("ipfs not supported") - -type IpfsStorageServiceConfig struct { - Enable bool -} - -var DefaultIpfsStorageServiceConfig = IpfsStorageServiceConfig{ - Enable: false, -} - -func IpfsStorageServiceConfigAddOptions(prefix string, f *flag.FlagSet) { - f.Bool(prefix+".enable", DefaultIpfsStorageServiceConfig.Enable, "legacy option - not supported") -} - -type IpfsStorageService struct { -} - -func NewIpfsStorageService(ctx context.Context, config IpfsStorageServiceConfig) (*IpfsStorageService, error) { - return nil, ErrIpfsNotSupported -} - -func (s *IpfsStorageService) GetByHash(ctx context.Context, hash common.Hash) ([]byte, error) { - return nil, ErrIpfsNotSupported -} - -func (s *IpfsStorageService) Put(ctx context.Context, data []byte, timeout uint64) error { - return ErrIpfsNotSupported -} - -func (s *IpfsStorageService) ExpirationPolicy(ctx context.Context) (daprovider.ExpirationPolicy, error) { - return daprovider.KeepForever, ErrIpfsNotSupported -} - -func (s *IpfsStorageService) Sync(ctx context.Context) error { - return ErrIpfsNotSupported -} - -func (s *IpfsStorageService) Close(ctx context.Context) error { - return ErrIpfsNotSupported -} - -func (s *IpfsStorageService) String() string { - return "IpfsStorageService-not supported" -} - -func (s *IpfsStorageService) HealthCheck(ctx context.Context) error { - return ErrIpfsNotSupported -} diff --git a/das/ipfs_storage_service_test.go b/das/ipfs_storage_service_test.go deleted file mode 100644 index 6e1a86b234..0000000000 --- a/das/ipfs_storage_service_test.go +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright 2022, Offchain Labs, Inc. 
-// For license information, see https://github.com/nitro/blob/master/LICENSE - -//go:build ipfs -// +build ipfs - -package das - -import ( - "bytes" - "context" - "math" - "math/rand" - "testing" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/offchainlabs/nitro/das/dastree" -) - -func runAddAndGetTest(t *testing.T, ctx context.Context, svc *IpfsStorageService, size int) { - - data := make([]byte, size) - _, err := rand.Read(data) - Require(t, err) - - err = svc.Put(ctx, data, 0) - Require(t, err) - - hash := dastree.Hash(data).Bytes() - returnedData, err := svc.GetByHash(ctx, common.BytesToHash(hash)) - Require(t, err) - if !bytes.Equal(data, returnedData) { - Fail(t, "Returned data didn't match!") - } - -} - -func TestIpfsStorageServiceAddAndGet(t *testing.T) { - enableLogging() - ctx := context.Background() - svc, err := NewIpfsStorageService(ctx, - IpfsStorageServiceConfig{ - Enable: true, - RepoDir: t.TempDir(), - ReadTimeout: time.Minute, - Profiles: "test", - }) - defer svc.Close(ctx) - Require(t, err) - - pow2Size := 1 << 16 // 64kB - for i := 1; i < 8; i++ { - runAddAndGetTest(t, ctx, svc, int(math.Pow10(i))) - runAddAndGetTest(t, ctx, svc, pow2Size) - runAddAndGetTest(t, ctx, svc, pow2Size-1) - runAddAndGetTest(t, ctx, svc, pow2Size+1) - pow2Size = pow2Size << 1 - } -} From fa361c00d47e78b2342f2f72333390a933b044d6 Mon Sep 17 00:00:00 2001 From: Tristan Wilson Date: Thu, 30 May 2024 18:01:25 -0700 Subject: [PATCH 11/11] Remove IterableStorageService from DAS This feature was unused and unmaintained, better to remove it for now since it complicates further das development. --- das/das.go | 8 +- das/das_test.go | 12 +-- das/db_storage_service.go | 25 ++--- das/factory.go | 47 +-------- das/iterable_storage_service.go | 147 --------------------------- das/local_file_storage_service.go | 34 +------ das/memory_backed_storage_service.go | 10 -- das/redis_storage_service.go | 23 +---- das/regular_sync_storage_test.go | 79 -------------- das/regularly_sync_storage.go | 95 ----------------- das/rpc_test.go | 4 +- das/s3_storage_service.go | 30 ++---- system_tests/das_test.go | 4 +- 13 files changed, 29 insertions(+), 489 deletions(-) delete mode 100644 das/iterable_storage_service.go delete mode 100644 das/regular_sync_storage_test.go delete mode 100644 das/regularly_sync_storage.go diff --git a/das/das.go b/das/das.go index 4f889ff602..fea1e6c6a2 100644 --- a/das/das.go +++ b/das/das.go @@ -41,10 +41,9 @@ type DataAvailabilityConfig struct { LocalCache CacheConfig `koanf:"local-cache"` RedisCache RedisConfig `koanf:"redis-cache"` - LocalDBStorage LocalDBStorageConfig `koanf:"local-db-storage"` - LocalFileStorage LocalFileStorageConfig `koanf:"local-file-storage"` - S3Storage S3StorageServiceConfig `koanf:"s3-storage"` - RegularSyncStorage RegularSyncStorageConfig `koanf:"regular-sync-storage"` + LocalDBStorage LocalDBStorageConfig `koanf:"local-db-storage"` + LocalFileStorage LocalFileStorageConfig `koanf:"local-file-storage"` + S3Storage S3StorageServiceConfig `koanf:"s3-storage"` Key KeyConfig `koanf:"key"` @@ -113,7 +112,6 @@ func dataAvailabilityConfigAddOptions(prefix string, f *flag.FlagSet, r role) { LocalDBStorageConfigAddOptions(prefix+".local-db-storage", f) LocalFileStorageConfigAddOptions(prefix+".local-file-storage", f) S3ConfigAddOptions(prefix+".s3-storage", f) - RegularSyncStorageConfigAddOptions(prefix+".regular-sync-storage", f) // Key config for storage KeyConfigAddOptions(prefix+".key", f) diff --git a/das/das_test.go b/das/das_test.go index 
4377dc4dce..950b63d9d9 100644 --- a/das/das_test.go +++ b/das/das_test.go @@ -47,9 +47,7 @@ func testDASStoreRetrieveMultipleInstances(t *testing.T, storageType string) { ParentChainNodeURL: "none", } - var syncFromStorageServicesFirst []*IterableStorageService - var syncToStorageServicesFirst []StorageService - storageService, lifecycleManager, err := CreatePersistentStorageService(firstCtx, &config, &syncFromStorageServicesFirst, &syncToStorageServicesFirst) + storageService, lifecycleManager, err := CreatePersistentStorageService(firstCtx, &config) Require(t, err) defer lifecycleManager.StopAndWaitUntil(time.Second) daWriter, err := NewSignAfterStoreDASWriter(firstCtx, config, storageService) @@ -77,9 +75,7 @@ func testDASStoreRetrieveMultipleInstances(t *testing.T, storageType string) { secondCtx, secondCancel := context.WithCancel(context.Background()) defer secondCancel() - var syncFromStorageServicesSecond []*IterableStorageService - var syncToStorageServicesSecond []StorageService - storageService2, lifecycleManager, err := CreatePersistentStorageService(secondCtx, &config, &syncFromStorageServicesSecond, &syncToStorageServicesSecond) + storageService2, lifecycleManager, err := CreatePersistentStorageService(secondCtx, &config) Require(t, err) defer lifecycleManager.StopAndWaitUntil(time.Second) var daReader2 DataAvailabilityServiceReader = storageService2 @@ -140,9 +136,7 @@ func testDASMissingMessage(t *testing.T, storageType string) { ParentChainNodeURL: "none", } - var syncFromStorageServices []*IterableStorageService - var syncToStorageServices []StorageService - storageService, lifecycleManager, err := CreatePersistentStorageService(ctx, &config, &syncFromStorageServices, &syncToStorageServices) + storageService, lifecycleManager, err := CreatePersistentStorageService(ctx, &config) Require(t, err) defer lifecycleManager.StopAndWaitUntil(time.Second) daWriter, err := NewSignAfterStoreDASWriter(ctx, config, storageService) diff --git a/das/db_storage_service.go b/das/db_storage_service.go index 5596ff378e..0fbe1c2723 100644 --- a/das/db_storage_service.go +++ b/das/db_storage_service.go @@ -20,11 +20,9 @@ import ( ) type LocalDBStorageConfig struct { - Enable bool `koanf:"enable"` - DataDir string `koanf:"data-dir"` - DiscardAfterTimeout bool `koanf:"discard-after-timeout"` - SyncFromStorageService bool `koanf:"sync-from-storage-service"` - SyncToStorageService bool `koanf:"sync-to-storage-service"` + Enable bool `koanf:"enable"` + DataDir string `koanf:"data-dir"` + DiscardAfterTimeout bool `koanf:"discard-after-timeout"` // BadgerDB options NumMemtables int `koanf:"num-memtables"` @@ -38,11 +36,9 @@ type LocalDBStorageConfig struct { var badgerDefaultOptions = badger.DefaultOptions("") var DefaultLocalDBStorageConfig = LocalDBStorageConfig{ - Enable: false, - DataDir: "", - DiscardAfterTimeout: false, - SyncFromStorageService: false, - SyncToStorageService: false, + Enable: false, + DataDir: "", + DiscardAfterTimeout: false, NumMemtables: badgerDefaultOptions.NumMemtables, NumLevelZeroTables: badgerDefaultOptions.NumLevelZeroTables, @@ -56,8 +52,6 @@ func LocalDBStorageConfigAddOptions(prefix string, f *flag.FlagSet) { f.Bool(prefix+".enable", DefaultLocalDBStorageConfig.Enable, "enable storage/retrieval of sequencer batch data from a database on the local filesystem") f.String(prefix+".data-dir", DefaultLocalDBStorageConfig.DataDir, "directory in which to store the database") f.Bool(prefix+".discard-after-timeout", DefaultLocalDBStorageConfig.DiscardAfterTimeout, "discard 
data after its expiry timeout") - f.Bool(prefix+".sync-from-storage-service", DefaultLocalDBStorageConfig.SyncFromStorageService, "enable db storage to be used as a source for regular sync storage") - f.Bool(prefix+".sync-to-storage-service", DefaultLocalDBStorageConfig.SyncToStorageService, "enable db storage to be used as a sink for regular sync storage") f.Int(prefix+".num-memtables", DefaultLocalDBStorageConfig.NumMemtables, "BadgerDB option: sets the maximum number of tables to keep in memory before stalling") f.Int(prefix+".num-level-zero-tables", DefaultLocalDBStorageConfig.NumLevelZeroTables, "BadgerDB option: sets the maximum number of Level 0 tables before compaction starts") @@ -158,13 +152,6 @@ func (dbs *DBStorageService) Put(ctx context.Context, data []byte, timeout uint6 }) } -func (dbs *DBStorageService) putKeyValue(ctx context.Context, key common.Hash, value []byte) error { - return dbs.db.Update(func(txn *badger.Txn) error { - e := badger.NewEntry(key.Bytes(), value) - return txn.SetEntry(e) - }) -} - func (dbs *DBStorageService) Sync(ctx context.Context) error { return dbs.db.Sync() } diff --git a/das/factory.go b/das/factory.go index cedf814cce..d9eacd0ada 100644 --- a/das/factory.go +++ b/das/factory.go @@ -22,8 +22,6 @@ import ( func CreatePersistentStorageService( ctx context.Context, config *DataAvailabilityConfig, - syncFromStorageServices *[]*IterableStorageService, - syncToStorageServices *[]StorageService, ) (StorageService, *LifecycleManager, error) { storageServices := make([]StorageService, 0, 10) var lifecycleManager LifecycleManager @@ -32,14 +30,6 @@ func CreatePersistentStorageService( if err != nil { return nil, nil, err } - if config.LocalDBStorage.SyncFromStorageService { - iterableStorageService := NewIterableStorageService(ConvertStorageServiceToIterationCompatibleStorageService(s)) - *syncFromStorageServices = append(*syncFromStorageServices, iterableStorageService) - s = iterableStorageService - } - if config.LocalDBStorage.SyncToStorageService { - *syncToStorageServices = append(*syncToStorageServices, s) - } lifecycleManager.Register(s) storageServices = append(storageServices, s) } @@ -49,14 +39,6 @@ func CreatePersistentStorageService( if err != nil { return nil, nil, err } - if config.LocalFileStorage.SyncFromStorageService { - iterableStorageService := NewIterableStorageService(ConvertStorageServiceToIterationCompatibleStorageService(s)) - *syncFromStorageServices = append(*syncFromStorageServices, iterableStorageService) - s = iterableStorageService - } - if config.LocalFileStorage.SyncToStorageService { - *syncToStorageServices = append(*syncToStorageServices, s) - } lifecycleManager.Register(s) storageServices = append(storageServices, s) } @@ -67,14 +49,6 @@ func CreatePersistentStorageService( return nil, nil, err } lifecycleManager.Register(s) - if config.S3Storage.SyncFromStorageService { - iterableStorageService := NewIterableStorageService(ConvertStorageServiceToIterationCompatibleStorageService(s)) - *syncFromStorageServices = append(*syncFromStorageServices, iterableStorageService) - s = iterableStorageService - } - if config.S3Storage.SyncToStorageService { - *syncToStorageServices = append(*syncToStorageServices, s) - } storageServices = append(storageServices, s) } @@ -96,8 +70,6 @@ func WrapStorageWithCache( ctx context.Context, config *DataAvailabilityConfig, storageService StorageService, - syncFromStorageServices *[]*IterableStorageService, - syncToStorageServices *[]StorageService, lifecycleManager *LifecycleManager) 
(StorageService, error) { if storageService == nil { return nil, nil @@ -111,14 +83,6 @@ func WrapStorageWithCache( if err != nil { return nil, err } - if config.RedisCache.SyncFromStorageService { - iterableStorageService := NewIterableStorageService(ConvertStorageServiceToIterationCompatibleStorageService(storageService)) - *syncFromStorageServices = append(*syncFromStorageServices, iterableStorageService) - storageService = iterableStorageService - } - if config.RedisCache.SyncToStorageService { - *syncToStorageServices = append(*syncToStorageServices, storageService) - } } if config.LocalCache.Enable { storageService = NewCacheStorageService(config.LocalCache, storageService) @@ -184,14 +148,12 @@ func CreateDAComponentsForDaserver( } // Done checking config requirements - var syncFromStorageServices []*IterableStorageService - var syncToStorageServices []StorageService - storageService, dasLifecycleManager, err := CreatePersistentStorageService(ctx, config, &syncFromStorageServices, &syncToStorageServices) + storageService, dasLifecycleManager, err := CreatePersistentStorageService(ctx, config) if err != nil { return nil, nil, nil, nil, nil, err } - storageService, err = WrapStorageWithCache(ctx, config, storageService, &syncFromStorageServices, &syncToStorageServices, dasLifecycleManager) + storageService, err = WrapStorageWithCache(ctx, config, storageService, dasLifecycleManager) if err != nil { return nil, nil, nil, nil, nil, err } @@ -271,11 +233,6 @@ func CreateDAComponentsForDaserver( } } - if config.RegularSyncStorage.Enable && len(syncFromStorageServices) != 0 && len(syncToStorageServices) != 0 { - regularlySyncStorage := NewRegularlySyncStorage(syncFromStorageServices, syncToStorageServices, config.RegularSyncStorage) - regularlySyncStorage.Start(ctx) - } - if seqInboxAddress != nil { daReader, err = NewChainFetchReader(daReader, (*l1Reader).Client(), *seqInboxAddress) if err != nil { diff --git a/das/iterable_storage_service.go b/das/iterable_storage_service.go deleted file mode 100644 index a0829f00e4..0000000000 --- a/das/iterable_storage_service.go +++ /dev/null @@ -1,147 +0,0 @@ -// Copyright 2022, Offchain Labs, Inc. -// For license information, see https://github.com/nitro/blob/master/LICENSE - -package das - -import ( - "context" - "strconv" - "sync" - "sync/atomic" - - "github.com/ethereum/go-ethereum/common" - - "github.com/offchainlabs/nitro/das/dastree" -) - -const iteratorStorageKeyPrefix = "iterator_key_prefix_" -const iteratorBegin = "iterator_begin" -const iteratorEnd = "iterator_end" -const expirationTimeKeyPrefix = "expiration_time_key_prefix_" - -// IterationCompatibleStorageService is a StorageService which is -// compatible to be used as a backend for IterableStorageService. 
diff --git a/das/iterable_storage_service.go b/das/iterable_storage_service.go
deleted file mode 100644
index a0829f00e4..0000000000
--- a/das/iterable_storage_service.go
+++ /dev/null
@@ -1,147 +0,0 @@
-// Copyright 2022, Offchain Labs, Inc.
-// For license information, see https://github.com/nitro/blob/master/LICENSE
-
-package das
-
-import (
-	"context"
-	"strconv"
-	"sync"
-	"sync/atomic"
-
-	"github.com/ethereum/go-ethereum/common"
-
-	"github.com/offchainlabs/nitro/das/dastree"
-)
-
-const iteratorStorageKeyPrefix = "iterator_key_prefix_"
-const iteratorBegin = "iterator_begin"
-const iteratorEnd = "iterator_end"
-const expirationTimeKeyPrefix = "expiration_time_key_prefix_"
-
-// IterationCompatibleStorageService is a StorageService which is
-// compatible to be used as a backend for IterableStorageService.
-type IterationCompatibleStorageService interface {
-	putKeyValue(ctx context.Context, key common.Hash, value []byte) error
-	StorageService
-}
-
-// IterationCompatibleStorageServiceAdaptor is an adaptor used to covert iteration incompatible StorageService
-// to IterationCompatibleStorageService (basically adds an empty putKeyValue to the StorageService)
-type IterationCompatibleStorageServiceAdaptor struct {
-	StorageService
-}
-
-func (i *IterationCompatibleStorageServiceAdaptor) putKeyValue(ctx context.Context, key common.Hash, value []byte) error {
-	return nil
-}
-
-func ConvertStorageServiceToIterationCompatibleStorageService(storageService StorageService) IterationCompatibleStorageService {
-	service, ok := storageService.(IterationCompatibleStorageService)
-	if ok {
-		return service
-	}
-	return &IterationCompatibleStorageServiceAdaptor{storageService}
-}
-
-// An IterableStorageService is used as a wrapper on top of a storage service,
-// to add the capability of iterating over the stored date in a sequential manner.
-type IterableStorageService struct {
-	// Local copy of iterator end. End can also be accessed by getByHash for iteratorEnd.
-	end atomic.Value // atomic access to common.Hash
-	IterationCompatibleStorageService
-
-	mutex sync.Mutex
-}
-
-func NewIterableStorageService(storageService IterationCompatibleStorageService) *IterableStorageService {
-	i := &IterableStorageService{IterationCompatibleStorageService: storageService}
-	i.end.Store(common.Hash{})
-	return i
-}
-
-func (i *IterableStorageService) Put(ctx context.Context, data []byte, expiration uint64) error {
-	dataHash := dastree.Hash(data)
-
-	// Do not insert data if data is already present.
-	// (This is being done to avoid redundant hash being added to the
-	// linked list ,since it can lead to loops in the linked list.)
-	if _, err := i.IterationCompatibleStorageService.GetByHash(ctx, dataHash); err == nil {
-		return nil
-	}
-
-	if err := i.IterationCompatibleStorageService.Put(ctx, data, expiration); err != nil {
-		return err
-	}
-
-	if err := i.putKeyValue(ctx, dastree.Hash([]byte(expirationTimeKeyPrefix+EncodeStorageServiceKey(dastree.Hash(data)))), []byte(strconv.FormatUint(expiration, 10))); err != nil {
-		return err
-	}
-
-	i.mutex.Lock()
-	defer i.mutex.Unlock()
-
-	endHash := i.End(ctx)
-	if (endHash == common.Hash{}) {
-		// First element being inserted in the chain.
-		if err := i.putKeyValue(ctx, dastree.Hash([]byte(iteratorBegin)), dataHash.Bytes()); err != nil {
-			return err
-		}
-	} else {
-		if err := i.putKeyValue(ctx, dastree.Hash([]byte(iteratorStorageKeyPrefix+EncodeStorageServiceKey(endHash))), dataHash.Bytes()); err != nil {
-			return err
-		}
-	}
-
-	if err := i.putKeyValue(ctx, dastree.Hash([]byte(iteratorEnd)), dataHash.Bytes()); err != nil {
-		return err
-	}
-	i.end.Store(dataHash)
-
-	return nil
-}
-
-func (i *IterableStorageService) GetExpirationTime(ctx context.Context, hash common.Hash) (uint64, error) {
-	value, err := i.IterationCompatibleStorageService.GetByHash(ctx, dastree.Hash([]byte(expirationTimeKeyPrefix+EncodeStorageServiceKey(hash))))
-	if err != nil {
-		return 0, err
-	}
-
-	expirationTime, err := strconv.ParseUint(string(value), 10, 64)
-	if err != nil {
-		return 0, err
-	}
-	return expirationTime, nil
-}
-
-func (i *IterableStorageService) DefaultBegin() common.Hash {
-	return dastree.Hash([]byte(iteratorBegin))
-}
-
-func (i *IterableStorageService) End(ctx context.Context) common.Hash {
-	endHash, ok := i.end.Load().(common.Hash)
-	if !ok {
-		return common.Hash{}
-	}
-	if (endHash != common.Hash{}) {
-		return endHash
-	}
-	value, err := i.GetByHash(ctx, dastree.Hash([]byte(iteratorEnd)))
-	if err != nil {
-		return common.Hash{}
-	}
-	endHash = common.BytesToHash(value)
-	i.end.Store(endHash)
-	return endHash
-}
-
-func (i *IterableStorageService) Next(ctx context.Context, hash common.Hash) common.Hash {
-	if hash != i.DefaultBegin() {
-		hash = dastree.Hash([]byte(iteratorStorageKeyPrefix + EncodeStorageServiceKey(hash)))
-	}
-	value, err := i.GetByHash(ctx, hash)
-	if err != nil {
-		return common.Hash{}
-	}
-	return common.BytesToHash(value)
-}
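The deleted IterableStorageService kept a singly linked chain of content hashes inside the underlying store (an iterator_begin anchor, per-entry iterator_key_prefix_ links, and an iterator_end marker), and that chain is what the regular-sync machinery walked. A condensed sketch of that traversal, using only methods visible in the deleted file; walkAll is an illustrative name and error handling is minimal:

// Illustrative only: the traversal the deleted wrapper made possible.
func walkAll(ctx context.Context, it *IterableStorageService, visit func(data []byte)) {
	end := it.End(ctx)
	if (end == common.Hash{}) {
		return // chain is empty
	}
	hash := it.DefaultBegin()
	for hash != end {
		// Next follows the iterator_key_prefix_ link written by Put.
		hash = it.Next(ctx, hash)
		data, err := it.GetByHash(ctx, hash)
		if err != nil {
			continue
		}
		visit(data)
	}
}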
diff --git a/das/local_file_storage_service.go b/das/local_file_storage_service.go
index 4ebb1d56d9..8be03bcb30 100644
--- a/das/local_file_storage_service.go
+++ b/das/local_file_storage_service.go
@@ -22,10 +22,8 @@ import (
 )
 
 type LocalFileStorageConfig struct {
-	Enable                 bool   `koanf:"enable"`
-	DataDir                string `koanf:"data-dir"`
-	SyncFromStorageService bool   `koanf:"sync-from-storage-service"`
-	SyncToStorageService   bool   `koanf:"sync-to-storage-service"`
+	Enable  bool   `koanf:"enable"`
+	DataDir string `koanf:"data-dir"`
 }
 
 var DefaultLocalFileStorageConfig = LocalFileStorageConfig{
@@ -35,8 +33,6 @@ var DefaultLocalFileStorageConfig = LocalFileStorageConfig{
 func LocalFileStorageConfigAddOptions(prefix string, f *flag.FlagSet) {
 	f.Bool(prefix+".enable", DefaultLocalFileStorageConfig.Enable, "enable storage/retrieval of sequencer batch data from a directory of files, one per batch")
 	f.String(prefix+".data-dir", DefaultLocalFileStorageConfig.DataDir, "local data directory")
-	f.Bool(prefix+".sync-from-storage-service", DefaultLocalFileStorageConfig.SyncFromStorageService, "enable local storage to be used as a source for regular sync storage")
-	f.Bool(prefix+".sync-to-storage-service", DefaultLocalFileStorageConfig.SyncToStorageService, "enable local storage to be used as a sink for regular sync storage")
 }
 
 type LocalFileStorageService struct {
@@ -96,32 +92,6 @@ func (s *LocalFileStorageService) Put(ctx context.Context, data []byte, timeout
 
 }
 
-func (s *LocalFileStorageService) putKeyValue(ctx context.Context, key common.Hash, value []byte) error {
-	fileName := EncodeStorageServiceKey(key)
-	finalPath := s.dataDir + "/" + fileName
-
-	// Use a temp file and rename to achieve atomic writes.
-	f, err := os.CreateTemp(s.dataDir, fileName)
-	if err != nil {
-		return err
-	}
-	err = f.Chmod(0o600)
-	if err != nil {
-		return err
-	}
-	_, err = f.Write(value)
-	if err != nil {
-		return err
-	}
-	err = f.Close()
-	if err != nil {
-		return err
-	}
-
-	return os.Rename(f.Name(), finalPath)
-
-}
-
 func (s *LocalFileStorageService) Sync(ctx context.Context) error {
 	return nil
 }
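The removed putKeyValue was the only caller of the temp-file-plus-rename trick its comment describes. The pattern itself is plain standard-library usage; a self-contained sketch (atomicWrite is a hypothetical helper, assuming a filesystem where rename within the same directory is atomic):

package example

import (
	"os"
	"path/filepath"
)

// atomicWrite writes value to dir/name without ever exposing a partially
// written file: the data goes to a temp file first, then a single rename
// publishes it. Illustrative helper, not part of this patch.
func atomicWrite(dir, name string, value []byte) error {
	f, err := os.CreateTemp(dir, name)
	if err != nil {
		return err
	}
	if err := f.Chmod(0o600); err != nil {
		return err
	}
	if _, err := f.Write(value); err != nil {
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	return os.Rename(f.Name(), filepath.Join(dir, name))
}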
diff --git a/das/memory_backed_storage_service.go b/das/memory_backed_storage_service.go
index 91f7d9a2f5..c013b501b9 100644
--- a/das/memory_backed_storage_service.go
+++ b/das/memory_backed_storage_service.go
@@ -53,16 +53,6 @@ func (m *MemoryBackedStorageService) Put(ctx context.Context, data []byte, expir
 	return nil
 }
 
-func (m *MemoryBackedStorageService) putKeyValue(ctx context.Context, key common.Hash, value []byte) error {
-	m.rwmutex.Lock()
-	defer m.rwmutex.Unlock()
-	if m.closed {
-		return ErrClosed
-	}
-	m.contents[key] = append([]byte{}, value...)
-	return nil
-}
-
 func (m *MemoryBackedStorageService) Sync(ctx context.Context) error {
 	m.rwmutex.RLock()
 	defer m.rwmutex.RUnlock()
diff --git a/das/redis_storage_service.go b/das/redis_storage_service.go
index dbd85921ed..210d5cb2d4 100644
--- a/das/redis_storage_service.go
+++ b/das/redis_storage_service.go
@@ -24,12 +24,10 @@ import (
 )
 
 type RedisConfig struct {
-	Enable                 bool          `koanf:"enable"`
-	Url                    string        `koanf:"url"`
-	Expiration             time.Duration `koanf:"expiration"`
-	KeyConfig              string        `koanf:"key-config"`
-	SyncFromStorageService bool          `koanf:"sync-from-storage-service"`
-	SyncToStorageService   bool          `koanf:"sync-to-storage-service"`
+	Enable     bool          `koanf:"enable"`
+	Url        string        `koanf:"url"`
+	Expiration time.Duration `koanf:"expiration"`
+	KeyConfig  string        `koanf:"key-config"`
 }
 
 var DefaultRedisConfig = RedisConfig{
@@ -43,8 +41,6 @@ func RedisConfigAddOptions(prefix string, f *flag.FlagSet) {
 	f.String(prefix+".url", DefaultRedisConfig.Url, "Redis url")
 	f.Duration(prefix+".expiration", DefaultRedisConfig.Expiration, "Redis expiration")
 	f.String(prefix+".key-config", DefaultRedisConfig.KeyConfig, "Redis key config")
-	f.Bool(prefix+".sync-from-storage-service", DefaultRedisConfig.SyncFromStorageService, "enable Redis to be used as a source for regular sync storage")
-	f.Bool(prefix+".sync-to-storage-service", DefaultRedisConfig.SyncToStorageService, "enable Redis to be used as a sink for regular sync storage")
 }
 
 type RedisStorageService struct {
@@ -139,17 +135,6 @@ func (rs *RedisStorageService) Put(ctx context.Context, value []byte, timeout ui
 	return err
 }
 
-func (rs *RedisStorageService) putKeyValue(ctx context.Context, key common.Hash, value []byte) error {
-	// Expiration is set to zero here, since we want to keep the index inserted for iterable storage forever.
-	err := rs.client.Set(
-		ctx, string(key.Bytes()), rs.signMessage(value), 0,
-	).Err()
-	if err != nil {
-		log.Error("das.RedisStorageService.putKeyValue", "err", err)
-	}
-	return err
-}
-
 func (rs *RedisStorageService) Sync(ctx context.Context) error {
 	return rs.baseStorageService.Sync(ctx)
 }
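The comment in the deleted Redis method carries the one subtle point: iterator index keys were written with a zero TTL so the chain never expired, while ordinary batch data is stored with the configured expiration. Roughly what that call boiled down to (the method name putIndexEntry is illustrative; rs.client and rs.signMessage are fields and helpers of the pre-patch RedisStorageService):

// Illustrative only: a zero expiration in Set keeps the iterator index entry forever.
func (rs *RedisStorageService) putIndexEntry(ctx context.Context, key common.Hash, value []byte) error {
	return rs.client.Set(ctx, string(key.Bytes()), rs.signMessage(value), 0).Err()
}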
diff --git a/das/regular_sync_storage_test.go b/das/regular_sync_storage_test.go
deleted file mode 100644
index 5fed7a90b3..0000000000
--- a/das/regular_sync_storage_test.go
+++ /dev/null
@@ -1,79 +0,0 @@
-// Copyright 2021-2022, Offchain Labs, Inc.
-// For license information, see https://github.com/nitro/blob/master/LICENSE
-
-package das
-
-import (
-	"bytes"
-	"context"
-	"testing"
-	"time"
-
-	"github.com/ethereum/go-ethereum/common"
-
-	"github.com/offchainlabs/nitro/das/dastree"
-)
-
-func TestRegularSyncStorage(t *testing.T) {
-	ctx, cancelFunc := context.WithCancel(context.Background())
-	defer cancelFunc()
-	syncFromStorageService := []*IterableStorageService{
-		NewIterableStorageService(ConvertStorageServiceToIterationCompatibleStorageService(NewMemoryBackedStorageService(ctx))),
-		NewIterableStorageService(ConvertStorageServiceToIterationCompatibleStorageService(NewMemoryBackedStorageService(ctx))),
-	}
-	syncToStorageService := []StorageService{
-		NewMemoryBackedStorageService(ctx),
-		NewMemoryBackedStorageService(ctx),
-	}
-
-	regularSyncStorage := NewRegularlySyncStorage(
-		syncFromStorageService,
-		syncToStorageService, RegularSyncStorageConfig{
-			Enable:       true,
-			SyncInterval: 100 * time.Millisecond,
-		})
-
-	val := [][]byte{
-		[]byte("The first value"),
-		[]byte("The second value"),
-		[]byte("The third value"),
-		[]byte("The forth value"),
-	}
-	valKey := []common.Hash{
-		dastree.Hash(val[0]),
-		dastree.Hash(val[1]),
-		dastree.Hash(val[2]),
-		dastree.Hash(val[3]),
-	}
-
-	reqCtx := context.Background()
-	timeout := uint64(time.Now().Add(time.Hour).Unix())
-	for i := 0; i < 2; i++ {
-		for j := 0; j < 2; j++ {
-			err := syncFromStorageService[i].Put(reqCtx, val[j], timeout)
-			Require(t, err)
-		}
-	}
-
-	regularSyncStorage.Start(ctx)
-	time.Sleep(300 * time.Millisecond)
-
-	for i := 0; i < 2; i++ {
-		for j := 2; j < 4; j++ {
-			err := syncFromStorageService[i].Put(reqCtx, val[j], timeout)
-			Require(t, err)
-		}
-	}
-
-	time.Sleep(300 * time.Millisecond)
-
-	for i := 0; i < 2; i++ {
-		for j := 0; j < 4; j++ {
-			v, err := syncToStorageService[i].GetByHash(reqCtx, valKey[j])
-			Require(t, err)
-			if !bytes.Equal(v, val[j]) {
-				t.Fatal(v, val[j])
-			}
-		}
-	}
-}
diff --git a/das/regularly_sync_storage.go b/das/regularly_sync_storage.go
deleted file mode 100644
index c6b8ed5ea1..0000000000
--- a/das/regularly_sync_storage.go
+++ /dev/null
@@ -1,95 +0,0 @@
-// Copyright 2022, Offchain Labs, Inc.
-// For license information, see https://github.com/nitro/blob/master/LICENSE
-
-package das
-
-import (
-	"context"
-	"time"
-
-	"github.com/ethereum/go-ethereum/common"
-	"github.com/ethereum/go-ethereum/log"
-
-	"github.com/offchainlabs/nitro/util/stopwaiter"
-
-	flag "github.com/spf13/pflag"
-)
-
-type RegularSyncStorageConfig struct {
-	Enable       bool          `koanf:"enable"`
-	SyncInterval time.Duration `koanf:"sync-interval"`
-}
-
-var DefaultRegularSyncStorageConfig = RegularSyncStorageConfig{
-	Enable:       false,
-	SyncInterval: 5 * time.Minute,
-}
-
-func RegularSyncStorageConfigAddOptions(prefix string, f *flag.FlagSet) {
-	f.Bool(prefix+".enable", DefaultRegularSyncStorageConfig.Enable, "enable regular storage syncing")
-	f.Duration(prefix+".sync-interval", DefaultRegularSyncStorageConfig.SyncInterval, "interval for running regular storage sync")
-}
-
-// A RegularlySyncStorage is used to sync data from syncFromStorageServices to
-// all the syncToStorageServices at regular intervals.
-// (Only newly added data since the last sync is copied over.)
-type RegularlySyncStorage struct {
-	stopwaiter.StopWaiter
-	syncFromStorageServices                    []*IterableStorageService
-	syncToStorageServices                      []StorageService
-	lastSyncedHashOfEachSyncFromStorageService map[*IterableStorageService]common.Hash
-	syncInterval                               time.Duration
-}
-
-func NewRegularlySyncStorage(syncFromStorageServices []*IterableStorageService, syncToStorageServices []StorageService, conf RegularSyncStorageConfig) *RegularlySyncStorage {
-	lastSyncedHashOfEachSyncFromStorageService := make(map[*IterableStorageService]common.Hash)
-	for _, syncFrom := range syncFromStorageServices {
-		lastSyncedHashOfEachSyncFromStorageService[syncFrom] = syncFrom.DefaultBegin()
-	}
-	return &RegularlySyncStorage{
-		syncFromStorageServices: syncFromStorageServices,
-		syncToStorageServices:   syncToStorageServices,
-		lastSyncedHashOfEachSyncFromStorageService: lastSyncedHashOfEachSyncFromStorageService,
-		syncInterval: conf.SyncInterval,
-	}
-}
-
-func (r *RegularlySyncStorage) Start(ctx context.Context) {
-	// Start thread for regular sync
-	r.StopWaiter.Start(ctx, r)
-	r.CallIteratively(r.syncAllStorages)
-}
-
-func (r *RegularlySyncStorage) syncAllStorages(ctx context.Context) time.Duration {
-	for syncFrom, lastSyncedHash := range r.lastSyncedHashOfEachSyncFromStorageService {
-		end := syncFrom.End(ctx)
-		if (end == common.Hash{}) {
-			continue
-		}
-
-		syncHash := lastSyncedHash
-		for syncHash != end {
-			syncHash = syncFrom.Next(ctx, syncHash)
-			data, err := syncFrom.GetByHash(ctx, syncHash)
-			if err != nil {
-				continue
-			}
-			expirationTime, err := syncFrom.GetExpirationTime(ctx, syncHash)
-			if err != nil {
-				continue
-			}
-			for _, syncTo := range r.syncToStorageServices {
-				_, err = syncTo.GetByHash(ctx, syncHash)
-				if err == nil {
-					continue
-				}
-
-				if err = syncTo.Put(ctx, data, expirationTime); err != nil {
-					log.Error("Error while running regular storage sync", "err", err)
-				}
-			}
-		}
-		r.lastSyncedHashOfEachSyncFromStorageService[syncFrom] = end
-	}
-	return r.syncInterval
-}
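Putting the two deleted pieces together: each timer tick resumed from the last hash it had copied, walked the source's chain to its current end, and wrote anything the sinks were missing. A compressed sketch of one such pass (syncOnce is an illustrative name; it returns the new resume point the way syncAllStorages updated its per-source map):

// Illustrative only: one pass of the removed regular-sync loop.
func syncOnce(ctx context.Context, from *IterableStorageService, sinks []StorageService, last common.Hash) common.Hash {
	end := from.End(ctx)
	if (end == common.Hash{}) {
		return last // nothing stored yet
	}
	for hash := last; hash != end; {
		hash = from.Next(ctx, hash)
		data, err := from.GetByHash(ctx, hash)
		if err != nil {
			continue
		}
		expiration, err := from.GetExpirationTime(ctx, hash)
		if err != nil {
			continue
		}
		for _, sink := range sinks {
			if _, err := sink.GetByHash(ctx, hash); err == nil {
				continue // sink already has this entry
			}
			if err := sink.Put(ctx, data, expiration); err != nil {
				log.Error("regular storage sync", "err", err)
			}
		}
	}
	return end
}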
diff --git a/das/rpc_test.go b/das/rpc_test.go
index 2839edee69..5f97ef8828 100644
--- a/das/rpc_test.go
+++ b/das/rpc_test.go
@@ -52,9 +52,7 @@ func testRpcImpl(t *testing.T, size, times int, concurrent bool) {
 		RequestTimeout: 5 * time.Second,
 	}
 
-	var syncFromStorageServices []*IterableStorageService
-	var syncToStorageServices []StorageService
-	storageService, lifecycleManager, err := CreatePersistentStorageService(ctx, &config, &syncFromStorageServices, &syncToStorageServices)
+	storageService, lifecycleManager, err := CreatePersistentStorageService(ctx, &config)
 	testhelpers.RequireImpl(t, err)
 	defer lifecycleManager.StopAndWaitUntil(time.Second)
 	localDas, err := NewSignAfterStoreDASWriter(ctx, config, storageService)
diff --git a/das/s3_storage_service.go b/das/s3_storage_service.go
index b5150fb8ed..a1de200c52 100644
--- a/das/s3_storage_service.go
+++ b/das/s3_storage_service.go
@@ -34,15 +34,13 @@ type S3Downloader interface {
 }
 
 type S3StorageServiceConfig struct {
-	Enable                 bool   `koanf:"enable"`
-	AccessKey              string `koanf:"access-key"`
-	Bucket                 string `koanf:"bucket"`
-	ObjectPrefix           string `koanf:"object-prefix"`
-	Region                 string `koanf:"region"`
-	SecretKey              string `koanf:"secret-key"`
-	DiscardAfterTimeout    bool   `koanf:"discard-after-timeout"`
-	SyncFromStorageService bool   `koanf:"sync-from-storage-service"`
-	SyncToStorageService   bool   `koanf:"sync-to-storage-service"`
+	Enable              bool   `koanf:"enable"`
+	AccessKey           string `koanf:"access-key"`
+	Bucket              string `koanf:"bucket"`
+	ObjectPrefix        string `koanf:"object-prefix"`
+	Region              string `koanf:"region"`
+	SecretKey           string `koanf:"secret-key"`
+	DiscardAfterTimeout bool   `koanf:"discard-after-timeout"`
 }
 
 var DefaultS3StorageServiceConfig = S3StorageServiceConfig{}
@@ -55,8 +53,6 @@ func S3ConfigAddOptions(prefix string, f *flag.FlagSet) {
 	f.String(prefix+".region", DefaultS3StorageServiceConfig.Region, "S3 region")
 	f.String(prefix+".secret-key", DefaultS3StorageServiceConfig.SecretKey, "S3 secret key")
 	f.Bool(prefix+".discard-after-timeout", DefaultS3StorageServiceConfig.DiscardAfterTimeout, "discard data after its expiry timeout")
-	f.Bool(prefix+".sync-from-storage-service", DefaultRedisConfig.SyncFromStorageService, "enable s3 to be used as a source for regular sync storage")
-	f.Bool(prefix+".sync-to-storage-service", DefaultRedisConfig.SyncToStorageService, "enable s3 to be used as a sink for regular sync storage")
 }
 
 type S3StorageService struct {
@@ -125,18 +121,6 @@ func (s3s *S3StorageService) Put(ctx context.Context, value []byte, timeout uint
 	return err
 }
 
-func (s3s *S3StorageService) putKeyValue(ctx context.Context, key common.Hash, value []byte) error {
-	putObjectInput := s3.PutObjectInput{
-		Bucket: aws.String(s3s.bucket),
-		Key:    aws.String(s3s.objectPrefix + EncodeStorageServiceKey(key)),
-		Body:   bytes.NewReader(value)}
-	_, err := s3s.uploader.Upload(ctx, &putObjectInput)
-	if err != nil {
-		log.Error("das.S3StorageService.Store", "err", err)
-	}
-	return err
-}
-
 func (s3s *S3StorageService) Sync(ctx context.Context) error {
 	return nil
 }
diff --git a/system_tests/das_test.go b/system_tests/das_test.go
index dbd78a0865..593eaa1bbe 100644
--- a/system_tests/das_test.go
+++ b/system_tests/das_test.go
@@ -61,9 +61,7 @@ func startLocalDASServer(
 		RequestTimeout: 5 * time.Second,
 	}
 
-	var syncFromStorageServices []*das.IterableStorageService
-	var syncToStorageServices []das.StorageService
-	storageService, lifecycleManager, err := das.CreatePersistentStorageService(ctx, &config, &syncFromStorageServices, &syncToStorageServices)
+	storageService, lifecycleManager, err := das.CreatePersistentStorageService(ctx, &config)
 	defer lifecycleManager.StopAndWaitUntil(time.Second)
 	Require(t, err)