diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index 30ad88d91a..2aacf32f00 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -64,12 +64,12 @@ jobs: run: | cd nitro-testnode ./test-node.bash --init --dev & - + - name: Wait for rpc to come up shell: bash run: | ${{ github.workspace }}/.github/workflows/waitForNitro.sh - + - name: Print WAVM module root id: module-root run: | @@ -77,7 +77,7 @@ jobs: # We work around this by piping a tarball through stdout docker run --rm --entrypoint tar localhost:5000/nitro-node-dev:latest -cf - target/machines/latest | tar xf - module_root="$(cat "target/machines/latest/module-root.txt")" - echo "name=module-root=$module_root" >> $GITHUB_STATE + echo "module-root=$module_root" >> "$GITHUB_OUTPUT" echo -e "\x1b[1;34mWAVM module root:\x1b[0m $module_root" - name: Upload WAVM machine as artifact diff --git a/.github/workflows/merge-checks.yml b/.github/workflows/merge-checks.yml index 6f291bbb22..6561c429e2 100644 --- a/.github/workflows/merge-checks.yml +++ b/.github/workflows/merge-checks.yml @@ -1,20 +1,42 @@ name: Merge Checks on: - pull_request: + pull_request_target: branches: [ master ] types: [synchronize, opened, reopened, labeled, unlabeled] +permissions: + statuses: write + jobs: - design-approved-check: - if: ${{ !contains(github.event.*.labels.*.name, 'design-approved') }} - name: Design Approved Check + check-design-approved: + name: Check if Design Approved runs-on: ubuntu-latest steps: - - name: Check for design-approved label + - name: Check if design approved and update status run: | - echo "Pull request is missing the 'design-approved' label" - echo "This workflow fails so that the pull request cannot be merged" - exit 1 - - + set -x pipefail + status_state="pending" + if ${{ contains(github.event.*.labels.*.name, 'design-approved') }}; then + status_state="success" + else + resp="$(curl -sSL --fail-with-body \ + -H "Accept: application/vnd.github+json" \ + -H 
"Authorization: Bearer ${{ secrets.GITHUB_TOKEN }}" \ + -H "X-GitHub-Api-Version: 2022-11-28" \ + "https://api.github.com/repos/$GITHUB_REPOSITORY/commits/${{ github.event.pull_request.head.sha }}/statuses")" + if ! jq -e '.[] | select(.context == "Design Approved Check")' > /dev/null <<< "$resp"; then + # Design not approved yet and no status exists + # Keep it without a status to keep the green checkmark appearing + # Otherwise, the commit and PR's CI will appear to be indefinitely pending + # Merging will still be blocked until the required status appears + exit 0 + fi + fi + curl -sSL --fail-with-body \ + -X POST \ + -H "Accept: application/vnd.github+json" \ + -H "Authorization: Bearer ${{ secrets.GITHUB_TOKEN }}" \ + -H "X-GitHub-Api-Version: 2022-11-28" \ + "https://api.github.com/repos/$GITHUB_REPOSITORY/statuses/${{ github.event.pull_request.head.sha }}" \ + -d '{"context":"Design Approved Check","state":"'"$status_state"'"}' diff --git a/Dockerfile b/Dockerfile index d7b4a5415b..0b0ff8b6c5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM debian:bookworm-slim as brotli-wasm-builder +FROM debian:bookworm-slim AS brotli-wasm-builder WORKDIR /workspace RUN apt-get update && \ apt-get install -y cmake make git lbzip2 python3 xz-utils && \ @@ -10,10 +10,10 @@ COPY scripts/build-brotli.sh scripts/ COPY brotli brotli RUN cd emsdk && . ./emsdk_env.sh && cd .. 
&& ./scripts/build-brotli.sh -w -t /workspace/install/ -FROM scratch as brotli-wasm-export +FROM scratch AS brotli-wasm-export COPY --from=brotli-wasm-builder /workspace/install/ / -FROM debian:bookworm-slim as brotli-library-builder +FROM debian:bookworm-slim AS brotli-library-builder WORKDIR /workspace COPY scripts/build-brotli.sh scripts/ COPY brotli brotli @@ -21,10 +21,10 @@ RUN apt-get update && \ apt-get install -y cmake make gcc git && \ ./scripts/build-brotli.sh -l -t /workspace/install/ -FROM scratch as brotli-library-export +FROM scratch AS brotli-library-export COPY --from=brotli-library-builder /workspace/install/ / -FROM node:18-bookworm-slim as contracts-builder +FROM node:18-bookworm-slim AS contracts-builder RUN apt-get update && \ apt-get install -y git python3 make g++ curl RUN curl -L https://foundry.paradigm.xyz | bash && . ~/.bashrc && ~/.foundry/bin/foundryup @@ -35,11 +35,11 @@ COPY contracts contracts/ COPY Makefile . RUN . ~/.bashrc && NITRO_BUILD_IGNORE_TIMESTAMPS=1 make build-solidity -FROM debian:bookworm-20231218 as wasm-base +FROM debian:bookworm-20231218 AS wasm-base WORKDIR /workspace RUN apt-get update && apt-get install -y curl build-essential=12.9 -FROM wasm-base as wasm-libs-builder +FROM wasm-base AS wasm-libs-builder # clang / lld used by soft-float wasm RUN apt-get update && \ apt-get install -y clang=1:14.0-55.7~deb12u1 lld=1:14.0-55.7~deb12u1 wabt @@ -59,10 +59,10 @@ COPY --from=brotli-wasm-export / target/ RUN apt-get update && apt-get install -y cmake RUN . 
~/.cargo/env && NITRO_BUILD_IGNORE_TIMESTAMPS=1 RUSTFLAGS='-C symbol-mangling-version=v0' make build-wasm-libs -FROM scratch as wasm-libs-export +FROM scratch AS wasm-libs-export COPY --from=wasm-libs-builder /workspace/ / -FROM wasm-base as wasm-bin-builder +FROM wasm-base AS wasm-bin-builder # pinned go version RUN curl -L https://golang.org/dl/go1.21.10.linux-`dpkg --print-architecture`.tar.gz | tar -C /usr/local -xzf - COPY ./Makefile ./go.mod ./go.sum ./ @@ -91,7 +91,7 @@ COPY --from=contracts-builder workspace/contracts/node_modules/@offchainlabs/upg COPY --from=contracts-builder workspace/.make/ .make/ RUN PATH="$PATH:/usr/local/go/bin" NITRO_BUILD_IGNORE_TIMESTAMPS=1 make build-wasm-bin -FROM rust:1.75-slim-bookworm as prover-header-builder +FROM rust:1.75-slim-bookworm AS prover-header-builder WORKDIR /workspace RUN export DEBIAN_FRONTEND=noninteractive && \ apt-get update && \ @@ -113,10 +113,10 @@ COPY brotli brotli RUN apt-get update && apt-get install -y cmake RUN NITRO_BUILD_IGNORE_TIMESTAMPS=1 make build-prover-header -FROM scratch as prover-header-export +FROM scratch AS prover-header-export COPY --from=prover-header-builder /workspace/target/ / -FROM rust:1.75-slim-bookworm as prover-builder +FROM rust:1.75-slim-bookworm AS prover-builder WORKDIR /workspace RUN export DEBIAN_FRONTEND=noninteractive && \ apt-get update && \ @@ -156,10 +156,10 @@ RUN NITRO_BUILD_IGNORE_TIMESTAMPS=1 make build-prover-lib RUN NITRO_BUILD_IGNORE_TIMESTAMPS=1 make build-prover-bin RUN NITRO_BUILD_IGNORE_TIMESTAMPS=1 make build-jit -FROM scratch as prover-export +FROM scratch AS prover-export COPY --from=prover-builder /workspace/target/ / -FROM debian:bookworm-slim as module-root-calc +FROM debian:bookworm-slim AS module-root-calc WORKDIR /workspace RUN export DEBIAN_FRONTEND=noninteractive && \ apt-get update && \ @@ -181,7 +181,7 @@ COPY ./solgen ./solgen COPY ./contracts ./contracts RUN NITRO_BUILD_IGNORE_TIMESTAMPS=1 make build-replay-env -FROM debian:bookworm-slim 
as machine-versions +FROM debian:bookworm-slim AS machine-versions RUN apt-get update && apt-get install -y unzip wget curl WORKDIR /workspace/machines # Download WAVM machines @@ -206,7 +206,7 @@ COPY ./scripts/download-machine.sh . #RUN ./download-machine.sh consensus-v20 0x8b104a2e80ac6165dc58b9048de12f301d70b02a0ab51396c22b4b4b802a16a4 RUN ./download-machine.sh consensus-v30 0xb0de9cb89e4d944ae6023a3b62276e54804c242fd8c4c2d8e6cc4450f5fa8b1b && true -FROM golang:1.21.10-bookworm as node-builder +FROM golang:1.21.10-bookworm AS node-builder WORKDIR /workspace ARG version="" ARG datetime="" @@ -233,17 +233,17 @@ RUN mkdir -p target/bin COPY .nitro-tag.txt /nitro-tag.txt RUN NITRO_BUILD_IGNORE_TIMESTAMPS=1 make build -FROM node-builder as fuzz-builder +FROM node-builder AS fuzz-builder RUN mkdir fuzzers/ RUN ./scripts/fuzz.bash --build --binary-path /workspace/fuzzers/ -FROM debian:bookworm-slim as nitro-fuzzer +FROM debian:bookworm-slim AS nitro-fuzzer COPY --from=fuzz-builder /workspace/fuzzers/*.fuzz /usr/local/bin/ COPY ./scripts/fuzz.bash /usr/local/bin RUN mkdir /fuzzcache ENTRYPOINT [ "/usr/local/bin/fuzz.bash", "FuzzStateTransition", "--binary-path", "/usr/local/bin/", "--fuzzcache-path", "/fuzzcache" ] -FROM debian:bookworm-slim as nitro-node-slim +FROM debian:bookworm-slim AS nitro-node-slim WORKDIR /home/user COPY --from=node-builder /workspace/target/bin/nitro /usr/local/bin/ COPY --from=node-builder /workspace/target/bin/relay /usr/local/bin/ @@ -274,9 +274,9 @@ USER user WORKDIR /home/user/ ENTRYPOINT [ "/usr/local/bin/nitro" ] -FROM offchainlabs/nitro-node:v2.3.4-rc.5-b4cc111 as nitro-legacy +FROM offchainlabs/nitro-node:v2.3.4-rc.5-b4cc111 AS nitro-legacy -FROM nitro-node-slim as nitro-node +FROM nitro-node-slim AS nitro-node USER root COPY --from=prover-export /bin/jit /usr/local/bin/ COPY --from=node-builder /workspace/target/bin/daserver /usr/local/bin/ @@ -296,7 +296,7 @@ ENTRYPOINT [ "/usr/local/bin/nitro" , 
"--validation.wasm.allowed-wasm-module-roo USER user -FROM nitro-node as nitro-node-validator +FROM nitro-node AS nitro-node-validator USER root COPY --from=nitro-legacy /usr/local/bin/nitro-val /home/user/nitro-legacy/bin/nitro-val COPY --from=nitro-legacy /usr/local/bin/jit /home/user/nitro-legacy/bin/jit @@ -308,7 +308,7 @@ COPY scripts/split-val-entry.sh /usr/local/bin ENTRYPOINT [ "/usr/local/bin/split-val-entry.sh" ] USER user -FROM nitro-node-validator as nitro-node-dev +FROM nitro-node-validator AS nitro-node-dev USER root # Copy in latest WASM module root RUN rm -f /home/user/target/machines/latest @@ -332,5 +332,5 @@ RUN export DEBIAN_FRONTEND=noninteractive && \ USER user -FROM nitro-node as nitro-node-default +FROM nitro-node AS nitro-node-default # Just to ensure nitro-node-dist is default diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 1829ae29f5..2617a9a629 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -168,6 +168,7 @@ type BatchPosterConfig struct { UseAccessLists bool `koanf:"use-access-lists" reload:"hot"` GasEstimateBaseFeeMultipleBips arbmath.Bips `koanf:"gas-estimate-base-fee-multiple-bips"` Dangerous BatchPosterDangerousConfig `koanf:"dangerous"` + ReorgResistanceMargin time.Duration `koanf:"reorg-resistance-margin" reload:"hot"` gasRefunder common.Address l1BlockBound l1BlockBound @@ -219,6 +220,7 @@ func BatchPosterConfigAddOptions(prefix string, f *pflag.FlagSet) { f.Duration(prefix+".l1-block-bound-bypass", DefaultBatchPosterConfig.L1BlockBoundBypass, "post batches even if not within the layer 1 future bounds if we're within this margin of the max delay") f.Bool(prefix+".use-access-lists", DefaultBatchPosterConfig.UseAccessLists, "post batches with access lists to reduce gas usage (disabled for L3s)") f.Uint64(prefix+".gas-estimate-base-fee-multiple-bips", uint64(DefaultBatchPosterConfig.GasEstimateBaseFeeMultipleBips), "for gas estimation, use this multiple of the basefee (measured in basis 
points) as the max fee per gas") + f.Duration(prefix+".reorg-resistance-margin", DefaultBatchPosterConfig.ReorgResistanceMargin, "do not post batch if its within this duration from layer 1 minimum bounds. Requires l1-block-bound option not be set to \"ignore\"") redislock.AddConfigOptions(prefix+".redis-lock", f) dataposter.DataPosterConfigAddOptions(prefix+".data-poster", f, dataposter.DefaultDataPosterConfig) genericconf.WalletConfigAddOptions(prefix+".parent-chain-wallet", f, DefaultBatchPosterConfig.ParentChainWallet.Pathname) @@ -248,6 +250,7 @@ var DefaultBatchPosterConfig = BatchPosterConfig{ UseAccessLists: true, RedisLock: redislock.DefaultCfg, GasEstimateBaseFeeMultipleBips: arbmath.OneInBips * 3 / 2, + ReorgResistanceMargin: 10 * time.Minute, } var DefaultBatchPosterL1WalletConfig = genericconf.WalletConfig{ @@ -1136,6 +1139,8 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) var l1BoundMaxTimestamp uint64 = math.MaxUint64 var l1BoundMinBlockNumber uint64 var l1BoundMinTimestamp uint64 + var l1BoundMinBlockNumberWithBypass uint64 + var l1BoundMinTimestampWithBypass uint64 hasL1Bound := config.l1BlockBound != l1BlockBoundIgnore if hasL1Bound { var l1Bound *types.Header @@ -1180,17 +1185,19 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) l1BoundMaxBlockNumber = arbmath.SaturatingUAdd(l1BoundBlockNumber, arbmath.BigToUintSaturating(maxTimeVariationFutureBlocks)) l1BoundMaxTimestamp = arbmath.SaturatingUAdd(l1Bound.Time, arbmath.BigToUintSaturating(maxTimeVariationFutureSeconds)) + latestHeader, err := b.l1Reader.LastHeader(ctx) + if err != nil { + return false, err + } + latestBlockNumber := arbutil.ParentHeaderToL1BlockNumber(latestHeader) + l1BoundMinBlockNumber = arbmath.SaturatingUSub(latestBlockNumber, arbmath.BigToUintSaturating(maxTimeVariationDelayBlocks)) + l1BoundMinTimestamp = arbmath.SaturatingUSub(latestHeader.Time, arbmath.BigToUintSaturating(maxTimeVariationDelaySeconds)) + 
if config.L1BlockBoundBypass > 0 { - latestHeader, err := b.l1Reader.LastHeader(ctx) - if err != nil { - return false, err - } - latestBlockNumber := arbutil.ParentHeaderToL1BlockNumber(latestHeader) blockNumberWithPadding := arbmath.SaturatingUAdd(latestBlockNumber, uint64(config.L1BlockBoundBypass/ethPosBlockTime)) timestampWithPadding := arbmath.SaturatingUAdd(latestHeader.Time, uint64(config.L1BlockBoundBypass/time.Second)) - - l1BoundMinBlockNumber = arbmath.SaturatingUSub(blockNumberWithPadding, arbmath.BigToUintSaturating(maxTimeVariationDelayBlocks)) - l1BoundMinTimestamp = arbmath.SaturatingUSub(timestampWithPadding, arbmath.BigToUintSaturating(maxTimeVariationDelaySeconds)) + l1BoundMinBlockNumberWithBypass = arbmath.SaturatingUSub(blockNumberWithPadding, arbmath.BigToUintSaturating(maxTimeVariationDelayBlocks)) + l1BoundMinTimestampWithBypass = arbmath.SaturatingUSub(timestampWithPadding, arbmath.BigToUintSaturating(maxTimeVariationDelaySeconds)) } } @@ -1200,13 +1207,14 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) log.Error("error getting message from streamer", "error", err) break } - if msg.Message.Header.BlockNumber < l1BoundMinBlockNumber || msg.Message.Header.Timestamp < l1BoundMinTimestamp { + if msg.Message.Header.BlockNumber < l1BoundMinBlockNumberWithBypass || msg.Message.Header.Timestamp < l1BoundMinTimestampWithBypass { log.Error( "disabling L1 bound as batch posting message is close to the maximum delay", "blockNumber", msg.Message.Header.BlockNumber, - "l1BoundMinBlockNumber", l1BoundMinBlockNumber, + "l1BoundMinBlockNumberWithBypass", l1BoundMinBlockNumberWithBypass, "timestamp", msg.Message.Header.Timestamp, - "l1BoundMinTimestamp", l1BoundMinTimestamp, + "l1BoundMinTimestampWithBypass", l1BoundMinTimestampWithBypass, + "l1BlockBoundBypass", config.L1BlockBoundBypass, ) l1BoundMaxBlockNumber = math.MaxUint64 l1BoundMaxTimestamp = math.MaxUint64 @@ -1242,6 +1250,24 @@ func (b *BatchPoster) 
maybePostSequencerBatch(ctx context.Context) (bool, error) b.building.msgCount++ } + if hasL1Bound && config.ReorgResistanceMargin > 0 { + firstMsgBlockNumber := firstMsg.Message.Header.BlockNumber + firstMsgTimeStamp := firstMsg.Message.Header.Timestamp + batchNearL1BoundMinBlockNumber := firstMsgBlockNumber <= arbmath.SaturatingUAdd(l1BoundMinBlockNumber, uint64(config.ReorgResistanceMargin/ethPosBlockTime)) + batchNearL1BoundMinTimestamp := firstMsgTimeStamp <= arbmath.SaturatingUAdd(l1BoundMinTimestamp, uint64(config.ReorgResistanceMargin/time.Second)) + if batchNearL1BoundMinTimestamp || batchNearL1BoundMinBlockNumber { + log.Error( + "Disabling batch posting due to batch being within reorg resistance margin from layer 1 minimum block or timestamp bounds", + "reorgResistanceMargin", config.ReorgResistanceMargin, + "firstMsgTimeStamp", firstMsgTimeStamp, + "l1BoundMinTimestamp", l1BoundMinTimestamp, + "firstMsgBlockNumber", firstMsgBlockNumber, + "l1BoundMinBlockNumber", l1BoundMinBlockNumber, + ) + return false, errors.New("batch is within reorg resistance margin from layer 1 minimum block or timestamp bounds") + } + } + if !forcePostBatch || !b.building.haveUsefulMessage { // the batch isn't full yet and we've posted a batch recently // don't post anything for now diff --git a/broadcastclient/broadcastclient.go b/broadcastclient/broadcastclient.go index 1167dba133..2225341560 100644 --- a/broadcastclient/broadcastclient.go +++ b/broadcastclient/broadcastclient.go @@ -25,7 +25,6 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" - "github.com/offchainlabs/nitro/arbutil" m "github.com/offchainlabs/nitro/broadcaster/message" "github.com/offchainlabs/nitro/util/contracts" @@ -292,6 +291,10 @@ func (bc *BroadcastClient) connect(ctx context.Context, nextSeqNum arbutil.Messa return nil, err } if err != nil { + connectionRejectedError := &ws.ConnectionRejectedError{} + if errors.As(err, &connectionRejectedError) && 
connectionRejectedError.StatusCode() == 429 { + log.Error("rate limit exceeded, please run own local relay because too many nodes are connecting to feed from same IP address", "err", err) + } return nil, fmt.Errorf("broadcast client unable to connect: %w", err) } if config.RequireChainId && !foundChainId { diff --git a/cmd/genericconf/server.go b/cmd/genericconf/server.go index 9b8acd5f71..0b80b74594 100644 --- a/cmd/genericconf/server.go +++ b/cmd/genericconf/server.go @@ -8,9 +8,7 @@ import ( flag "github.com/spf13/pflag" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/p2p/enode" ) type HTTPConfig struct { @@ -189,65 +187,6 @@ func AuthRPCConfigAddOptions(prefix string, f *flag.FlagSet) { f.StringSlice(prefix+".api", AuthRPCConfigDefault.API, "APIs offered over the AUTH-RPC interface") } -type P2PConfig struct { - ListenAddr string `koanf:"listen-addr"` - NoDial bool `koanf:"no-dial"` - NoDiscovery bool `koanf:"no-discovery"` - MaxPeers int `koanf:"max-peers"` - DiscoveryV5 bool `koanf:"discovery-v5"` - DiscoveryV4 bool `koanf:"discovery-v4"` - Bootnodes []string `koanf:"bootnodes"` - BootnodesV5 []string `koanf:"bootnodes-v5"` -} - -func (p P2PConfig) Apply(stackConf *node.Config) { - stackConf.P2P.ListenAddr = p.ListenAddr - stackConf.P2P.NoDial = p.NoDial - stackConf.P2P.NoDiscovery = p.NoDiscovery - stackConf.P2P.MaxPeers = p.MaxPeers - stackConf.P2P.DiscoveryV5 = p.DiscoveryV5 - stackConf.P2P.DiscoveryV4 = p.DiscoveryV4 - stackConf.P2P.BootstrapNodes = parseBootnodes(p.Bootnodes) - stackConf.P2P.BootstrapNodesV5 = parseBootnodes(p.BootnodesV5) -} - -func parseBootnodes(urls []string) []*enode.Node { - nodes := make([]*enode.Node, 0, len(urls)) - for _, url := range urls { - if url != "" { - node, err := enode.Parse(enode.ValidSchemes, url) - if err != nil { - log.Crit("Bootstrap URL invalid", "enode", url, "err", err) - return nil - } - nodes = append(nodes, node) - } - } - return nodes -} - 
-var P2PConfigDefault = P2PConfig{ - ListenAddr: "", - NoDial: true, - NoDiscovery: true, - MaxPeers: 50, - DiscoveryV5: false, - DiscoveryV4: false, - Bootnodes: []string{}, - BootnodesV5: []string{}, -} - -func P2PConfigAddOptions(prefix string, f *flag.FlagSet) { - f.String(prefix+".listen-addr", P2PConfigDefault.ListenAddr, "P2P listen address") - f.Bool(prefix+".no-dial", P2PConfigDefault.NoDial, "P2P no dial") - f.Bool(prefix+".no-discovery", P2PConfigDefault.NoDiscovery, "P2P no discovery") - f.Int(prefix+".max-peers", P2PConfigDefault.MaxPeers, "P2P max peers") - f.Bool(prefix+".discovery-v5", P2PConfigDefault.DiscoveryV5, "P2P discovery v5") - f.Bool(prefix+".discovery-v4", P2PConfigDefault.DiscoveryV4, "P2P discovery v4") - f.StringSlice(prefix+".bootnodes", P2PConfigDefault.Bootnodes, "P2P bootnodes") - f.StringSlice(prefix+".bootnodes-v5", P2PConfigDefault.BootnodesV5, "P2P bootnodes v5") -} - type MetricsServerConfig struct { Addr string `koanf:"addr"` Port int `koanf:"port"` diff --git a/cmd/nitro-val/config.go b/cmd/nitro-val/config.go index b52a1c6b5e..2adbe5e9aa 100644 --- a/cmd/nitro-val/config.go +++ b/cmd/nitro-val/config.go @@ -27,7 +27,6 @@ type ValidationNodeConfig struct { HTTP genericconf.HTTPConfig `koanf:"http"` WS genericconf.WSConfig `koanf:"ws"` IPC genericconf.IPCConfig `koanf:"ipc"` - P2P genericconf.P2PConfig `koanf:"p2p"` Auth genericconf.AuthRPCConfig `koanf:"auth"` Metrics bool `koanf:"metrics"` MetricsServer genericconf.MetricsServerConfig `koanf:"metrics-server"` @@ -67,7 +66,6 @@ var ValidationNodeConfigDefault = ValidationNodeConfig{ HTTP: HTTPConfigDefault, WS: WSConfigDefault, IPC: IPCConfigDefault, - P2P: genericconf.P2PConfigDefault, Auth: genericconf.AuthRPCConfigDefault, Metrics: false, MetricsServer: genericconf.MetricsServerConfigDefault, @@ -87,7 +85,6 @@ func ValidationNodeConfigAddOptions(f *flag.FlagSet) { genericconf.WSConfigAddOptions("ws", f) genericconf.IPCConfigAddOptions("ipc", f) 
genericconf.AuthRPCConfigAddOptions("auth", f) - genericconf.P2PConfigAddOptions("p2p", f) f.Bool("metrics", ValidationNodeConfigDefault.Metrics, "enable metrics") genericconf.MetricsServerAddOptions("metrics-server", f) f.Bool("pprof", ValidationNodeConfigDefault.PProf, "enable pprof") diff --git a/cmd/nitro-val/nitro_val.go b/cmd/nitro-val/nitro_val.go index 1e894336ea..6f5f546430 100644 --- a/cmd/nitro-val/nitro_val.go +++ b/cmd/nitro-val/nitro_val.go @@ -70,7 +70,6 @@ func mainImpl() int { nodeConfig.WS.Apply(&stackConf) nodeConfig.Auth.Apply(&stackConf) nodeConfig.IPC.Apply(&stackConf) - nodeConfig.P2P.Apply(&stackConf) vcsRevision, strippedRevision, vcsTime := confighelpers.GetVersion() stackConf.Version = strippedRevision diff --git a/cmd/nitro/init.go b/cmd/nitro/init.go index 97678a7d23..ea48ec8784 100644 --- a/cmd/nitro/init.go +++ b/cmd/nitro/init.go @@ -408,7 +408,7 @@ func isLeveldbNotExistError(err error) bool { func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeConfig, chainId *big.Int, cacheConfig *core.CacheConfig, persistentConfig *conf.PersistentConfig, l1Client arbutil.L1Interface, rollupAddrs chaininfo.RollupAddresses) (ethdb.Database, *core.BlockChain, error) { if !config.Init.Force { - if readOnlyDb, err := stack.OpenDatabaseWithFreezerWithExtraOptions("l2chaindata", 0, 0, "", "l2chaindata/", true, persistentConfig.Pebble.ExtraOptions("l2chaindata")); err == nil { + if readOnlyDb, err := stack.OpenDatabaseWithFreezerWithExtraOptions("l2chaindata", 0, 0, config.Persistent.Ancient, "l2chaindata/", true, persistentConfig.Pebble.ExtraOptions("l2chaindata")); err == nil { if chainConfig := gethexec.TryReadStoredChainConfig(readOnlyDb); chainConfig != nil { readOnlyDb.Close() if !arbmath.BigEquals(chainConfig.ChainID, chainId) { diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go index 1c4ad80186..04bdeb3228 100644 --- a/cmd/nitro/nitro.go +++ b/cmd/nitro/nitro.go @@ -183,7 +183,6 @@ func mainImpl() int { if 
nodeConfig.WS.ExposeAll { stackConf.WSModules = append(stackConf.WSModules, "personal") } - nodeConfig.P2P.Apply(&stackConf) vcsRevision, strippedRevision, vcsTime := confighelpers.GetVersion() stackConf.Version = strippedRevision @@ -680,8 +679,6 @@ func mainImpl() int { exitCode = 1 } if nodeConfig.Init.ThenQuit { - close(sigint) - return exitCode } } @@ -695,9 +692,6 @@ func mainImpl() int { log.Info("shutting down because of sigint") } - // cause future ctrl+c's to panic - close(sigint) - return exitCode } @@ -717,7 +711,6 @@ type NodeConfig struct { IPC genericconf.IPCConfig `koanf:"ipc"` Auth genericconf.AuthRPCConfig `koanf:"auth"` GraphQL genericconf.GraphQLConfig `koanf:"graphql"` - P2P genericconf.P2PConfig `koanf:"p2p"` Metrics bool `koanf:"metrics"` MetricsServer genericconf.MetricsServerConfig `koanf:"metrics-server"` PProf bool `koanf:"pprof"` @@ -743,7 +736,6 @@ var NodeConfigDefault = NodeConfig{ IPC: genericconf.IPCConfigDefault, Auth: genericconf.AuthRPCConfigDefault, GraphQL: genericconf.GraphQLConfigDefault, - P2P: genericconf.P2PConfigDefault, Metrics: false, MetricsServer: genericconf.MetricsServerConfigDefault, Init: conf.InitConfigDefault, @@ -768,7 +760,6 @@ func NodeConfigAddOptions(f *flag.FlagSet) { genericconf.WSConfigAddOptions("ws", f) genericconf.IPCConfigAddOptions("ipc", f) genericconf.AuthRPCConfigAddOptions("auth", f) - genericconf.P2PConfigAddOptions("p2p", f) genericconf.GraphQLConfigAddOptions("graphql", f) f.Bool("metrics", NodeConfigDefault.Metrics, "enable metrics") genericconf.MetricsServerAddOptions("metrics-server", f) diff --git a/das/das.go b/das/das.go index 5528323a9c..6bd02fbc75 100644 --- a/das/das.go +++ b/das/das.go @@ -45,6 +45,8 @@ type DataAvailabilityConfig struct { LocalFileStorage LocalFileStorageConfig `koanf:"local-file-storage"` S3Storage S3StorageServiceConfig `koanf:"s3-storage"` + MigrateLocalDBToFileStorage bool `koanf:"migrate-local-db-to-file-storage"` + Key KeyConfig `koanf:"key"` RPCAggregator 
AggregatorConfig `koanf:"rpc-aggregator"` @@ -112,6 +114,7 @@ func dataAvailabilityConfigAddOptions(prefix string, f *flag.FlagSet, r role) { LocalDBStorageConfigAddOptions(prefix+".local-db-storage", f) LocalFileStorageConfigAddOptions(prefix+".local-file-storage", f) S3ConfigAddOptions(prefix+".s3-storage", f) + f.Bool(prefix+".migrate-local-db-to-file-storage", DefaultDataAvailabilityConfig.MigrateLocalDBToFileStorage, "daserver will migrate all data on startup from local-db-storage to local-file-storage, then mark local-db-storage as unusable") // Key config for storage KeyConfigAddOptions(prefix+".key", f) diff --git a/das/das_test.go b/das/das_test.go index c52616fe20..179734c8b1 100644 --- a/das/das_test.go +++ b/das/das_test.go @@ -40,8 +40,9 @@ func testDASStoreRetrieveMultipleInstances(t *testing.T, storageType string) { KeyDir: dbPath, }, LocalFileStorage: LocalFileStorageConfig{ - Enable: enableFileStorage, - DataDir: dbPath, + Enable: enableFileStorage, + DataDir: dbPath, + MaxRetention: DefaultLocalFileStorageConfig.MaxRetention, }, LocalDBStorage: dbConfig, ParentChainNodeURL: "none", @@ -129,8 +130,9 @@ func testDASMissingMessage(t *testing.T, storageType string) { KeyDir: dbPath, }, LocalFileStorage: LocalFileStorageConfig{ - Enable: enableFileStorage, - DataDir: dbPath, + Enable: enableFileStorage, + DataDir: dbPath, + MaxRetention: DefaultLocalFileStorageConfig.MaxRetention, }, LocalDBStorage: dbConfig, ParentChainNodeURL: "none", diff --git a/das/db_storage_service.go b/das/db_storage_service.go index 0fbe1c2723..e3b6183c37 100644 --- a/das/db_storage_service.go +++ b/das/db_storage_service.go @@ -7,6 +7,9 @@ import ( "bytes" "context" "errors" + "fmt" + "os" + "path/filepath" "time" badger "github.com/dgraph-io/badger/v4" @@ -35,6 +38,8 @@ type LocalDBStorageConfig struct { var badgerDefaultOptions = badger.DefaultOptions("") +const migratedMarker = "MIGRATED" + var DefaultLocalDBStorageConfig = LocalDBStorageConfig{ Enable: false, DataDir: "", 
@@ -49,7 +54,7 @@ var DefaultLocalDBStorageConfig = LocalDBStorageConfig{ } func LocalDBStorageConfigAddOptions(prefix string, f *flag.FlagSet) { - f.Bool(prefix+".enable", DefaultLocalDBStorageConfig.Enable, "enable storage/retrieval of sequencer batch data from a database on the local filesystem") + f.Bool(prefix+".enable", DefaultLocalDBStorageConfig.Enable, "!!!DEPRECATED, USE local-file-storage!!! enable storage/retrieval of sequencer batch data from a database on the local filesystem") f.String(prefix+".data-dir", DefaultLocalDBStorageConfig.DataDir, "directory in which to store the database") f.Bool(prefix+".discard-after-timeout", DefaultLocalDBStorageConfig.DiscardAfterTimeout, "discard data after its expiry timeout") @@ -69,7 +74,17 @@ type DBStorageService struct { stopWaiter stopwaiter.StopWaiterSafe } -func NewDBStorageService(ctx context.Context, config *LocalDBStorageConfig) (StorageService, error) { +// The DBStorageService is deprecated. This function will migrate data to the target +// LocalFileStorageService if it is provided and migration hasn't already happened. +func NewDBStorageService(ctx context.Context, config *LocalDBStorageConfig, target *LocalFileStorageService) (*DBStorageService, error) { + if alreadyMigrated(config.DataDir) { + log.Warn("local-db-storage already migrated, please remove it from the daserver configuration and restart. data-dir can be cleaned up manually now") + return nil, nil + } + if target == nil { + log.Error("local-db-storage is DEPRECATED, please use use the local-file-storage and migrate-local-db-to-file-storage options. This error will be made fatal in future, continuing for now...") + } + options := badger.DefaultOptions(config.DataDir). WithNumMemtables(config.NumMemtables). WithNumLevelZeroTables(config.NumLevelZeroTables). 
@@ -87,9 +102,21 @@ func NewDBStorageService(ctx context.Context, config *LocalDBStorageConfig) (Sto discardAfterTimeout: config.DiscardAfterTimeout, dirPath: config.DataDir, } + + if target != nil { + if err = ret.migrateTo(ctx, target); err != nil { + return nil, fmt.Errorf("error migrating local-db-storage to %s: %w", target, err) + } + if err = ret.setMigrated(); err != nil { + return nil, fmt.Errorf("error finalizing migration of local-db-storage to %s: %w", target, err) + } + return nil, nil + } + if err := ret.stopWaiter.Start(ctx, ret); err != nil { return nil, err } + err = ret.stopWaiter.LaunchThreadSafe(func(myCtx context.Context) { ticker := time.NewTicker(5 * time.Minute) defer ticker.Stop() @@ -152,6 +179,48 @@ func (dbs *DBStorageService) Put(ctx context.Context, data []byte, timeout uint6 }) } +func (dbs *DBStorageService) migrateTo(ctx context.Context, s StorageService) error { + originExpirationPolicy, err := dbs.ExpirationPolicy(ctx) + if err != nil { + return err + } + targetExpirationPolicy, err := s.ExpirationPolicy(ctx) + if err != nil { + return err + } + + if originExpirationPolicy == daprovider.KeepForever && targetExpirationPolicy == daprovider.DiscardAfterDataTimeout { + return errors.New("can't migrate from DBStorageService to target, incompatible expiration policies - can't migrate from non-expiring to expiring since non-expiring DB lacks expiry time metadata") + } + + return dbs.db.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + it := txn.NewIterator(opts) + defer it.Close() + log.Info("Migrating from DBStorageService", "target", s) + migrationStart := time.Now() + count := 0 + for it.Rewind(); it.Valid(); it.Next() { + if count%1000 == 0 { + log.Info("Migration in progress", "migrated", count) + } + item := it.Item() + k := item.Key() + expiry := item.ExpiresAt() + err := item.Value(func(v []byte) error { + log.Trace("migrated", "key", pretty.FirstFewBytes(k), "value", pretty.FirstFewBytes(v), "expiry", 
expiry) + return s.Put(ctx, v, expiry) + }) + if err != nil { + return err + } + count++ + } + log.Info("Migration from DBStorageService complete", "target", s, "migrated", count, "duration", time.Since(migrationStart)) + return nil + }) +} + func (dbs *DBStorageService) Sync(ctx context.Context) error { return dbs.db.Sync() } @@ -160,6 +229,29 @@ func (dbs *DBStorageService) Close(ctx context.Context) error { return dbs.stopWaiter.StopAndWait() } +func alreadyMigrated(dirPath string) bool { + migratedMarkerFile := filepath.Join(dirPath, migratedMarker) + _, err := os.Stat(migratedMarkerFile) + if os.IsNotExist(err) { + return false + } + if err != nil { + log.Error("error checking if local-db-storage is already migrated", "err", err) + return false + } + return true +} + +func (dbs *DBStorageService) setMigrated() error { + migratedMarkerFile := filepath.Join(dbs.dirPath, migratedMarker) + file, err := os.OpenFile(migratedMarkerFile, os.O_CREATE|os.O_WRONLY, 0o600) + if err != nil { + return err + } + file.Close() + return nil +} + func (dbs *DBStorageService) ExpirationPolicy(ctx context.Context) (daprovider.ExpirationPolicy, error) { if dbs.discardAfterTimeout { return daprovider.DiscardAfterDataTimeout, nil diff --git a/das/factory.go b/das/factory.go index fd6f60abb2..5742a39479 100644 --- a/das/factory.go +++ b/das/factory.go @@ -25,22 +25,36 @@ func CreatePersistentStorageService( ) (StorageService, *LifecycleManager, error) { storageServices := make([]StorageService, 0, 10) var lifecycleManager LifecycleManager - if config.LocalDBStorage.Enable { - s, err := NewDBStorageService(ctx, &config.LocalDBStorage) + var err error + + var fs *LocalFileStorageService + if config.LocalFileStorage.Enable { + fs, err = NewLocalFileStorageService(config.LocalFileStorage) if err != nil { return nil, nil, err } - lifecycleManager.Register(s) - storageServices = append(storageServices, s) + err = fs.start(ctx) + if err != nil { + return nil, nil, err + } + 
lifecycleManager.Register(fs) + storageServices = append(storageServices, fs) } - if config.LocalFileStorage.Enable { - s, err := NewLocalFileStorageService(config.LocalFileStorage.DataDir) + if config.LocalDBStorage.Enable { + var s *DBStorageService + if config.MigrateLocalDBToFileStorage { + s, err = NewDBStorageService(ctx, &config.LocalDBStorage, fs) + } else { + s, err = NewDBStorageService(ctx, &config.LocalDBStorage, nil) + } if err != nil { return nil, nil, err } - lifecycleManager.Register(s) - storageServices = append(storageServices, s) + if s != nil { + lifecycleManager.Register(s) + storageServices = append(storageServices, s) + } } if config.S3Storage.Enable { @@ -63,6 +77,10 @@ func CreatePersistentStorageService( if len(storageServices) == 1 { return storageServices[0], &lifecycleManager, nil } + if len(storageServices) == 0 { + return nil, nil, errors.New("No data-availability storage backend has been configured") + } + return nil, &lifecycleManager, nil } diff --git a/das/local_file_storage_service.go b/das/local_file_storage_service.go index 8be03bcb30..621cf3efdb 100644 --- a/das/local_file_storage_service.go +++ b/das/local_file_storage_service.go @@ -6,10 +6,17 @@ package das import ( "bytes" "context" - "encoding/base32" "errors" "fmt" + "io" "os" + "path" + "path/filepath" + "regexp" + "strconv" + "strings" + "sync" + "syscall" "time" "github.com/ethereum/go-ethereum/common" @@ -17,64 +24,146 @@ import ( "github.com/offchainlabs/nitro/arbstate/daprovider" "github.com/offchainlabs/nitro/das/dastree" "github.com/offchainlabs/nitro/util/pretty" + "github.com/offchainlabs/nitro/util/stopwaiter" flag "github.com/spf13/pflag" "golang.org/x/sys/unix" ) type LocalFileStorageConfig struct { - Enable bool `koanf:"enable"` - DataDir string `koanf:"data-dir"` + Enable bool `koanf:"enable"` + DataDir string `koanf:"data-dir"` + EnableExpiry bool `koanf:"enable-expiry"` + MaxRetention time.Duration `koanf:"max-retention"` } var 
DefaultLocalFileStorageConfig = LocalFileStorageConfig{ - DataDir: "", + DataDir: "", + MaxRetention: defaultStorageRetention, } func LocalFileStorageConfigAddOptions(prefix string, f *flag.FlagSet) { f.Bool(prefix+".enable", DefaultLocalFileStorageConfig.Enable, "enable storage/retrieval of sequencer batch data from a directory of files, one per batch") f.String(prefix+".data-dir", DefaultLocalFileStorageConfig.DataDir, "local data directory") + f.Bool(prefix+".enable-expiry", DefaultLocalFileStorageConfig.EnableExpiry, "enable expiry of batches") + f.Duration(prefix+".max-retention", DefaultLocalFileStorageConfig.MaxRetention, "store requests with expiry times farther in the future than max-retention will be rejected") } type LocalFileStorageService struct { - dataDir string + config LocalFileStorageConfig + + legacyLayout flatLayout + layout trieLayout + + // for testing only + enableLegacyLayout bool + + stopWaiter stopwaiter.StopWaiterSafe +} + +func NewLocalFileStorageService(config LocalFileStorageConfig) (*LocalFileStorageService, error) { + if unix.Access(config.DataDir, unix.W_OK|unix.R_OK) != nil { + return nil, fmt.Errorf("couldn't start LocalFileStorageService, directory '%s' must be readable and writeable", config.DataDir) + } + s := &LocalFileStorageService{ + config: config, + legacyLayout: flatLayout{root: config.DataDir, retention: config.MaxRetention}, + layout: trieLayout{root: config.DataDir, expiryEnabled: config.EnableExpiry}, + } + return s, nil } -func NewLocalFileStorageService(dataDir string) (StorageService, error) { - if unix.Access(dataDir, unix.W_OK|unix.R_OK) != nil { - return nil, fmt.Errorf("couldn't start LocalFileStorageService, directory '%s' must be readable and writeable", dataDir) +// Separate start function +// Tests want to be able to avoid triggering the auto migration +func (s *LocalFileStorageService) start(ctx context.Context) error { + migrated, err := s.layout.migrated() + if err != nil { + return err + } + + if 
!migrated && !s.enableLegacyLayout { + if err = migrate(&s.legacyLayout, &s.layout); err != nil { + return err + } + } + + if err := s.stopWaiter.Start(ctx, s); err != nil { + return err } - return &LocalFileStorageService{dataDir: dataDir}, nil + if s.config.EnableExpiry && !s.enableLegacyLayout { + err = s.stopWaiter.CallIterativelySafe(func(ctx context.Context) time.Duration { + err = s.layout.prune(time.Now()) + if err != nil { + log.Error("error pruning expired batches", "error", err) + } + return time.Minute * 5 + }) + if err != nil { + return err + } + } + return nil +} + +func (s *LocalFileStorageService) Close(ctx context.Context) error { + return s.stopWaiter.StopAndWait() } func (s *LocalFileStorageService) GetByHash(ctx context.Context, key common.Hash) ([]byte, error) { log.Trace("das.LocalFileStorageService.GetByHash", "key", pretty.PrettyHash(key), "this", s) - pathname := s.dataDir + "/" + EncodeStorageServiceKey(key) - data, err := os.ReadFile(pathname) + var batchPath string + if s.enableLegacyLayout { + batchPath = s.legacyLayout.batchPath(key) + } else { + batchPath = s.layout.batchPath(key) + } + + data, err := os.ReadFile(batchPath) if err != nil { - // Just for backward compatability. 
- pathname = s.dataDir + "/" + base32.StdEncoding.EncodeToString(key.Bytes()) - data, err = os.ReadFile(pathname) - if err != nil { - if errors.Is(err, os.ErrNotExist) { - return nil, ErrNotFound - } - return nil, err + if errors.Is(err, os.ErrNotExist) { + return nil, ErrNotFound } - return data, nil + return nil, err } return data, nil } -func (s *LocalFileStorageService) Put(ctx context.Context, data []byte, timeout uint64) error { - logPut("das.LocalFileStorageService.Store", data, timeout, s) - fileName := EncodeStorageServiceKey(dastree.Hash(data)) - finalPath := s.dataDir + "/" + fileName +func (s *LocalFileStorageService) Put(ctx context.Context, data []byte, expiry uint64) error { + logPut("das.LocalFileStorageService.Store", data, expiry, s) + expiryTime := time.Unix(int64(expiry), 0) + currentTimePlusRetention := time.Now().Add(s.config.MaxRetention) + if expiryTime.After(currentTimePlusRetention) { + return fmt.Errorf("requested expiry time (%v) exceeds current time plus maximum allowed retention period(%v)", expiryTime, currentTimePlusRetention) + } + + key := dastree.Hash(data) + var batchPath string + if !s.enableLegacyLayout { + s.layout.writeMutex.Lock() + defer s.layout.writeMutex.Unlock() + batchPath = s.layout.batchPath(key) + } else { + batchPath = s.legacyLayout.batchPath(key) + } + + err := os.MkdirAll(path.Dir(batchPath), 0o700) + if err != nil { + return fmt.Errorf("failed to create directory %s: %w", path.Dir(batchPath), err) + } // Use a temp file and rename to achieve atomic writes. 
- f, err := os.CreateTemp(s.dataDir, fileName) + f, err := os.CreateTemp(path.Dir(batchPath), path.Base(batchPath)) if err != nil { return err } + renamed := false + defer func() { + _ = f.Close() + if !renamed { + if err := os.Remove(f.Name()); err != nil { + log.Error("Couldn't clean up temporary file", "file", f.Name()) + } + } + }() err = f.Chmod(0o600) if err != nil { return err @@ -83,29 +172,55 @@ func (s *LocalFileStorageService) Put(ctx context.Context, data []byte, timeout if err != nil { return err } - err = f.Close() - if err != nil { - return err + + // For testing only. When migrating we treat the expiry time of existing flat layout + // files to be the modification time + the max allowed retention. So when creating + // new flat layout files, set their modification time accordingly. + if s.enableLegacyLayout { + tv := syscall.Timeval{ + Sec: int64(expiry - uint64(s.legacyLayout.retention.Seconds())), + Usec: 0, + } + times := []syscall.Timeval{tv, tv} + if err = syscall.Utimes(f.Name(), times); err != nil { + return err + } } - return os.Rename(f.Name(), finalPath) + _, err = os.Stat(batchPath) + if err != nil { + if os.IsNotExist(err) { + if err = os.Rename(f.Name(), batchPath); err != nil { + return err + } + renamed = true + } else { + return err + } + } -} + if !s.enableLegacyLayout { + if err := createHardLink(batchPath, s.layout.expiryPath(key, expiry)); err != nil { + return fmt.Errorf("couldn't create by-expiry-path index entry: %w", err) + } + } -func (s *LocalFileStorageService) Sync(ctx context.Context) error { return nil } -func (s *LocalFileStorageService) Close(ctx context.Context) error { +func (s *LocalFileStorageService) Sync(ctx context.Context) error { return nil } func (s *LocalFileStorageService) ExpirationPolicy(ctx context.Context) (daprovider.ExpirationPolicy, error) { + if s.config.EnableExpiry { + return daprovider.DiscardAfterDataTimeout, nil + } return daprovider.KeepForever, nil } func (s *LocalFileStorageService) 
String() string { - return "LocalFileStorageService(" + s.dataDir + ")" + return "LocalFileStorageService(" + s.config.DataDir + ")" } func (s *LocalFileStorageService) HealthCheck(ctx context.Context) error { @@ -123,3 +238,572 @@ func (s *LocalFileStorageService) HealthCheck(ctx context.Context) error { } return nil } + +func listDir(dir string) ([]string, error) { + d, err := os.Open(dir) + if err != nil { + return nil, err + } + defer d.Close() + + // Read all the directory entries + files, err := d.Readdirnames(-1) + if err != nil { + return nil, err + } + + return files, nil +} + +var hex64Regex = regexp.MustCompile(fmt.Sprintf("^[a-fA-F0-9]{%d}$", common.HashLength*2)) + +func isStorageServiceKey(key string) bool { + return hex64Regex.MatchString(key) +} + +// Copies a file by its contents to a new file, making any directories needed +// in the new file's path. +func copyFile(new, orig string) error { + err := os.MkdirAll(path.Dir(new), 0o700) + if err != nil { + return fmt.Errorf("failed to create directory %s: %w", path.Dir(new), err) + } + + origFile, err := os.Open(orig) + if err != nil { + return fmt.Errorf("failed to open source file: %w", err) + } + defer origFile.Close() + + newFile, err := os.Create(new) + if err != nil { + return fmt.Errorf("failed to create destination file: %w", err) + } + defer newFile.Close() + + _, err = io.Copy(newFile, origFile) + if err != nil { + return fmt.Errorf("failed to copy contents: %w", err) + } + + return nil +} + +// Creates a hard link at new, to orig, making any directories needed in the new link's path. 
+func createHardLink(orig, new string) error { + err := os.MkdirAll(path.Dir(new), 0o700) + if err != nil { + return err + } + + info, err := os.Stat(new) + if err != nil { + if os.IsNotExist(err) { + err = os.Link(orig, new) + if err != nil { + return err + } + return nil + } else { + return err + } + } + + // Hard link already exists + stat, ok := info.Sys().(*syscall.Stat_t) + if ok && stat.Nlink > 1 { + return nil + } + + return fmt.Errorf("file exists but is not a hard link: %s", new) +} + +// migrate converts a file store from flatLayout to trieLayout. +// It is not thread safe and must be run before Put requests are served. +// The expiry index is always created; when expiry is enabled, already-expired batches are not migrated. +func migrate(fl *flatLayout, tl *trieLayout) error { + flIt, err := fl.iterateBatches() + if err != nil { + return err + } + + batch, err := flIt.next() + if errors.Is(err, io.EOF) { + log.Info("No batches in legacy layout detected, skipping migration.") + return nil + } + if err != nil { + return err + } + + if startErr := tl.startMigration(); startErr != nil { + return startErr + } + + migrationStart := time.Now() + var migrated, skipped, removed int + err = func() error { + for ; !errors.Is(err, io.EOF); batch, err = flIt.next() { + if err != nil { + return err + } + + if tl.expiryEnabled && batch.expiry.Before(migrationStart) { + skipped++ + log.Debug("skipping expired batch during migration", "expiry", batch.expiry, "start", migrationStart) + continue // don't migrate expired batches + } + + origPath := fl.batchPath(batch.key) + newPath := tl.batchPath(batch.key) + if err = copyFile(newPath, origPath); err != nil { + return err + } + + expiryPath := tl.expiryPath(batch.key, uint64(batch.expiry.Unix())) + if err = createHardLink(newPath, expiryPath); err != nil { + return err + } + migrated++ + } + + return tl.commitMigration() + }() + if err != nil { + return fmt.Errorf("error migrating local file store layout, retaining old layout: %w", err) + } + + flIt, err = 
fl.iterateBatches() + if err != nil { + return err + } + for batch, err := flIt.next(); !errors.Is(err, io.EOF); batch, err = flIt.next() { + if err != nil { + log.Warn("local file store migration completed, but error cleaning up old layout, files from that layout are now orphaned", "error", err) + break + } + toRemove := fl.batchPath(batch.key) + err = os.Remove(toRemove) + if err != nil { + log.Warn("local file store migration completed, but error cleaning up file from old layout, file is now orphaned", "file", toRemove, "error", err) + } + removed++ + } + + log.Info("Local file store legacy layout migration complete", "migratedFiles", migrated, "skippedExpiredFiles", skipped, "removedFiles", removed, "duration", time.Since(migrationStart)) + + return nil +} + +func (tl *trieLayout) prune(pruneTil time.Time) error { + tl.writeMutex.Lock() + defer tl.writeMutex.Unlock() + it, err := tl.iterateBatchesByTimestamp(pruneTil) + if err != nil { + return err + } + pruned := 0 + pruningStart := time.Now() + for pathByTimestamp, err := it.next(); !errors.Is(err, io.EOF); pathByTimestamp, err = it.next() { + if err != nil { + return err + } + key, err := DecodeStorageServiceKey(path.Base(pathByTimestamp)) + if err != nil { + return err + } + err = recursivelyDeleteUntil(pathByTimestamp, byExpiryTimestamp) + if err != nil { + log.Error("Couldn't prune expired batch expiry index entry, continuing trying to prune others", "path", pathByTimestamp, "err", err) + } + + pathByHash := tl.batchPath(key) + info, err := os.Stat(pathByHash) + if err != nil { + if os.IsNotExist(err) { + log.Warn("Couldn't find batch to expire, it may have been previously deleted but its by-expiry-timestamp index entry still existed, deleting its index entry and continuing", "path", pathByHash, "indexPath", pathByTimestamp, "err", err) + } else { + log.Error("Couldn't prune expired batch, continuing trying to prune others", "path", pathByHash, "err", err) + } + continue + } + stat, ok := 
info.Sys().(*syscall.Stat_t) + if !ok { + log.Error("Couldn't convert file stats to Stat_t struct, possible OS or filesystem incompatibility, skipping pruning this batch", "file", pathByHash) + continue + } + if stat.Nlink == 1 { + err = recursivelyDeleteUntil(pathByHash, byDataHash) + if err != nil { + return err + } + } + + pruned++ + } + if pruned > 0 { + log.Info("Local file store pruned expired batches", "count", pruned, "pruneTil", pruneTil, "duration", time.Since(pruningStart)) + } + return nil +} + +func recursivelyDeleteUntil(filePath, until string) error { + err := os.Remove(filePath) + if err != nil { + return err + } + + for filePath = path.Dir(filePath); path.Base(filePath) != until; filePath = path.Dir(filePath) { + err = os.Remove(filePath) + if err != nil { + if !strings.Contains(err.Error(), "directory not empty") { + log.Warn("error cleaning up empty directory when pruning expired batches", "path", filePath, "err", err) + } + break + } + } + return nil +} + +type batchIdentifier struct { + key common.Hash + expiry time.Time +} + +type flatLayout struct { + root string + + retention time.Duration +} + +type flatLayoutIterator struct { + files []string + + layout *flatLayout +} + +func (l *flatLayout) batchPath(key common.Hash) string { + return filepath.Join(l.root, EncodeStorageServiceKey(key)) +} + +type layerFilter func(*[][]string, int) bool + +func noopFilter(*[][]string, int) bool { return true } + +func (l *flatLayout) iterateBatches() (*flatLayoutIterator, error) { + files, err := listDir(l.root) + if err != nil { + return nil, err + } + return &flatLayoutIterator{ + files: files, + layout: l, + }, nil +} + +func (i *flatLayoutIterator) next() (batchIdentifier, error) { + for len(i.files) > 0 { + var f string + f, i.files = i.files[0], i.files[1:] + if !isStorageServiceKey(f) { + continue + } + key, err := DecodeStorageServiceKey(f) + if err != nil { + return batchIdentifier{}, err + } + + fullPath := i.layout.batchPath(key) + stat, err := 
os.Stat(fullPath) + if err != nil { + return batchIdentifier{}, err + } + + return batchIdentifier{ + key: key, + expiry: stat.ModTime().Add(i.layout.retention), + }, nil + } + return batchIdentifier{}, io.EOF +} + +const ( + byDataHash = "by-data-hash" + byExpiryTimestamp = "by-expiry-timestamp" + migratingSuffix = "-migrating" + expiryDivisor = 10_000 +) + +var expirySecondPartWidth = len(strconv.Itoa(expiryDivisor)) - 1 + +type trieLayout struct { + root string + expiryEnabled bool + + // Is the trieLayout currently being migrated to? + // Controls whether paths include the migratingSuffix. + migrating bool + + // Anything changing the layout (pruning, adding files) must go through + // this mutex. + // Pruning the entire history at startup of Arb Nova as of 2024-06-12 takes + // 5s on my laptop, so the overhead of pruning after startup should be negligible. + writeMutex sync.Mutex +} + +type trieLayoutIterator struct { + levels [][]string + filters []layerFilter + topDir string + layout *trieLayout +} + +func (l *trieLayout) batchPath(key common.Hash) string { + encodedKey := EncodeStorageServiceKey(key) + firstDir := encodedKey[:2] + secondDir := encodedKey[2:4] + + topDir := byDataHash + if l.migrating { + topDir = topDir + migratingSuffix + } + + return filepath.Join(l.root, topDir, firstDir, secondDir, encodedKey) +} + +func (l *trieLayout) expiryPath(key common.Hash, expiry uint64) string { + encodedKey := EncodeStorageServiceKey(key) + firstDir := fmt.Sprintf("%d", expiry/expiryDivisor) + secondDir := fmt.Sprintf("%0*d", expirySecondPartWidth, expiry%expiryDivisor) + + topDir := byExpiryTimestamp + if l.migrating { + topDir = topDir + migratingSuffix + } + + return filepath.Join(l.root, topDir, firstDir, secondDir, encodedKey) +} + +func (l *trieLayout) iterateBatches() (*trieLayoutIterator, error) { + var firstLevel, secondLevel, files []string + var err error + + // TODO handle stray files that aren't dirs + + firstLevel, err = 
listDir(filepath.Join(l.root, byDataHash)) + if err != nil { + return nil, err + } + + if len(firstLevel) > 0 { + secondLevel, err = listDir(filepath.Join(l.root, byDataHash, firstLevel[0])) + if err != nil { + return nil, err + } + } + + if len(secondLevel) > 0 { + files, err = listDir(filepath.Join(l.root, byDataHash, firstLevel[0], secondLevel[0])) + if err != nil { + return nil, err + } + } + + storageKeyFilter := func(layers *[][]string, idx int) bool { + return isStorageServiceKey((*layers)[idx][0]) + } + + return &trieLayoutIterator{ + levels: [][]string{firstLevel, secondLevel, files}, + filters: []layerFilter{noopFilter, noopFilter, storageKeyFilter}, + topDir: byDataHash, + layout: l, + }, nil +} + +func (l *trieLayout) iterateBatchesByTimestamp(maxTimestamp time.Time) (*trieLayoutIterator, error) { + var firstLevel, secondLevel, files []string + var err error + + firstLevel, err = listDir(filepath.Join(l.root, byExpiryTimestamp)) + if err != nil { + return nil, err + } + + if len(firstLevel) > 0 { + secondLevel, err = listDir(filepath.Join(l.root, byExpiryTimestamp, firstLevel[0])) + if err != nil { + return nil, err + } + } + + if len(secondLevel) > 0 { + files, err = listDir(filepath.Join(l.root, byExpiryTimestamp, firstLevel[0], secondLevel[0])) + if err != nil { + return nil, err + } + } + + beforeUpper := func(layers *[][]string, idx int) bool { + num, err := strconv.Atoi((*layers)[idx][0]) + if err != nil { + return false + } + return int64(num) <= maxTimestamp.Unix()/expiryDivisor + } + beforeLower := func(layers *[][]string, idx int) bool { + num, err := strconv.Atoi((*layers)[idx-1][0] + (*layers)[idx][0]) + if err != nil { + return false + } + return int64(num) < maxTimestamp.Unix() + } + storageKeyFilter := func(layers *[][]string, idx int) bool { + return isStorageServiceKey((*layers)[idx][0]) + } + + return &trieLayoutIterator{ + levels: [][]string{firstLevel, secondLevel, files}, + filters: []layerFilter{beforeUpper, beforeLower, 
storageKeyFilter}, + topDir: byExpiryTimestamp, + layout: l, + }, nil +} + +func (l *trieLayout) migrated() (bool, error) { + info, err := os.Stat(filepath.Join(l.root, byDataHash)) + if os.IsNotExist(err) { + return false, nil + } + if err != nil { + return false, err + } + return info.IsDir(), nil +} + +func (l *trieLayout) startMigration() error { + migrated, err := l.migrated() + if err != nil { + return err + } + if migrated { + return errors.New("local file storage already migrated to trieLayout") + } + + l.migrating = true + + if err := os.MkdirAll(filepath.Join(l.root, byDataHash+migratingSuffix), 0o700); err != nil { + return err + } + + if err := os.MkdirAll(filepath.Join(l.root, byExpiryTimestamp+migratingSuffix), 0o700); err != nil { + return err + } + return nil + +} + +func (l *trieLayout) commitMigration() error { + if !l.migrating { + return errors.New("already finished migration") + } + + removeSuffix := func(prefix string) error { + oldDir := filepath.Join(l.root, prefix+migratingSuffix) + newDir := filepath.Join(l.root, prefix) + + if err := os.Rename(oldDir, newDir); err != nil { + return err // rename error already includes src and dst, no need to wrap + } + return nil + } + + if err := removeSuffix(byDataHash); err != nil { + return err + } + + if err := removeSuffix(byExpiryTimestamp); err != nil { + return err + } + + syscall.Sync() + + // Done migrating + l.migrating = false + + return nil +} + +func (it *trieLayoutIterator) next() (string, error) { + isLeaf := func(idx int) bool { + return idx == len(it.levels)-1 + } + + makePathAtLevel := func(idx int) string { + pathComponents := make([]string, idx+3) + pathComponents[0] = it.layout.root + pathComponents[1] = it.topDir + for i := 0; i <= idx; i++ { + pathComponents[i+2] = it.levels[i][0] + } + return filepath.Join(pathComponents...) 
+ } + + var populateNextLevel func(idx int) error + populateNextLevel = func(idx int) error { + if isLeaf(idx) || len(it.levels[idx]) == 0 { + return nil + } + nextLevelEntries, err := listDir(makePathAtLevel(idx)) + if err != nil { + return err + } + it.levels[idx+1] = nextLevelEntries + if len(nextLevelEntries) > 0 { + return populateNextLevel(idx + 1) + } + return nil + } + + advanceWithinLevel := func(idx int) error { + if len(it.levels[idx]) > 1 { + it.levels[idx] = it.levels[idx][1:] + } else { + it.levels[idx] = nil + } + + return populateNextLevel(idx) + } + + for idx := 0; idx >= 0; { + if len(it.levels[idx]) == 0 { + idx-- + continue + } + + if !it.filters[idx](&it.levels, idx) { + if err := advanceWithinLevel(idx); err != nil { + return "", err + } + continue + } + + if isLeaf(idx) { + path := makePathAtLevel(idx) + if err := advanceWithinLevel(idx); err != nil { + return "", err + } + return path, nil + } + + if len(it.levels[idx+1]) > 0 { + idx++ + continue + } + + if err := advanceWithinLevel(idx); err != nil { + return "", err + } + } + return "", io.EOF +} diff --git a/das/local_file_storage_service_test.go b/das/local_file_storage_service_test.go new file mode 100644 index 0000000000..cc27e293e3 --- /dev/null +++ b/das/local_file_storage_service_test.go @@ -0,0 +1,214 @@ +// Copyright 2024, Offchain Labs, Inc. 
+// For license information, see https://github.com/nitro/blob/master/LICENSE + +package das + +import ( + "bytes" + "context" + "errors" + "io" + "testing" + "time" + + "github.com/offchainlabs/nitro/das/dastree" +) + +func getByHashAndCheck(t *testing.T, s *LocalFileStorageService, xs ...string) { + t.Helper() + ctx := context.Background() + + for _, x := range xs { + actual, err := s.GetByHash(ctx, dastree.Hash([]byte(x))) + Require(t, err) + if !bytes.Equal([]byte(x), actual) { + Fail(t, "unexpected result") + } + } +} + +func countEntries(t *testing.T, layout *trieLayout, expected int) { + t.Helper() + + count := 0 + trIt, err := layout.iterateBatches() + Require(t, err) + for _, err := trIt.next(); !errors.Is(err, io.EOF); _, err = trIt.next() { + Require(t, err) + count++ + } + if count != expected { + Fail(t, "unexpected number of batches", "expected", expected, "was", count) + } +} + +func countTimestampEntries(t *testing.T, layout *trieLayout, cutoff time.Time, expected int) { + t.Helper() + var count int + trIt, err := layout.iterateBatchesByTimestamp(cutoff) + Require(t, err) + for _, err := trIt.next(); !errors.Is(err, io.EOF); _, err = trIt.next() { + Require(t, err) + count++ + } + if count != expected { + Fail(t, "unexpected count of entries when iterating by timestamp", "expected", expected, "was", count) + } +} + +func pruneCountRemaining(t *testing.T, layout *trieLayout, pruneTil time.Time, expected int) { + t.Helper() + err := layout.prune(pruneTil) + Require(t, err) + + countEntries(t, layout, expected) +} + +func TestMigrationNoExpiry(t *testing.T) { + dir := t.TempDir() + ctx := context.Background() + + config := LocalFileStorageConfig{ + Enable: true, + DataDir: dir, + EnableExpiry: false, + MaxRetention: time.Hour * 24 * 30, + } + s, err := NewLocalFileStorageService(config) + Require(t, err) + s.enableLegacyLayout = true + + now := uint64(time.Now().Unix()) + + err = s.Put(ctx, []byte("a"), now+1) + Require(t, err) + err = s.Put(ctx, 
[]byte("b"), now+1) + Require(t, err) + err = s.Put(ctx, []byte("c"), now+2) + Require(t, err) + err = s.Put(ctx, []byte("d"), now+10) + Require(t, err) + + getByHashAndCheck(t, s, "a", "b", "c", "d") + + err = migrate(&s.legacyLayout, &s.layout) + Require(t, err) + s.enableLegacyLayout = false + + countEntries(t, &s.layout, 4) + getByHashAndCheck(t, s, "a", "b", "c", "d") + + // Can still iterate by timestamp even if expiry disabled + countTimestampEntries(t, &s.layout, time.Unix(int64(now+11), 0), 4) + +} + +func TestMigrationExpiry(t *testing.T) { + dir := t.TempDir() + ctx := context.Background() + + config := LocalFileStorageConfig{ + Enable: true, + DataDir: dir, + EnableExpiry: true, + MaxRetention: time.Hour * 10, + } + s, err := NewLocalFileStorageService(config) + Require(t, err) + s.enableLegacyLayout = true + + now := time.Now() + + // Use increments of expiry divisor in order to span multiple by-expiry-timestamp dirs + err = s.Put(ctx, []byte("a"), uint64(now.Add(-2*time.Second*expiryDivisor).Unix())) + Require(t, err) + err = s.Put(ctx, []byte("b"), uint64(now.Add(-1*time.Second*expiryDivisor).Unix())) + Require(t, err) + err = s.Put(ctx, []byte("c"), uint64(now.Add(time.Second*expiryDivisor).Unix())) + Require(t, err) + err = s.Put(ctx, []byte("d"), uint64(now.Add(time.Second*expiryDivisor).Unix())) + Require(t, err) + err = s.Put(ctx, []byte("e"), uint64(now.Add(2*time.Second*expiryDivisor).Unix())) + Require(t, err) + + getByHashAndCheck(t, s, "a", "b", "c", "d", "e") + + err = migrate(&s.legacyLayout, &s.layout) + Require(t, err) + s.enableLegacyLayout = false + + countEntries(t, &s.layout, 3) + getByHashAndCheck(t, s, "c", "d", "e") + + afterNow := now.Add(time.Second) + countTimestampEntries(t, &s.layout, afterNow, 0) // They should have all been filtered out since they're after now + countTimestampEntries(t, &s.layout, afterNow.Add(time.Second*expiryDivisor), 2) + countTimestampEntries(t, &s.layout, afterNow.Add(2*time.Second*expiryDivisor), 3) 
+ + pruneCountRemaining(t, &s.layout, afterNow, 3) + getByHashAndCheck(t, s, "c", "d", "e") + + pruneCountRemaining(t, &s.layout, afterNow.Add(time.Second*expiryDivisor), 1) + getByHashAndCheck(t, s, "e") + + pruneCountRemaining(t, &s.layout, afterNow.Add(2*time.Second*expiryDivisor), 0) +} + +func TestExpiryDuplicates(t *testing.T) { + dir := t.TempDir() + ctx := context.Background() + + config := LocalFileStorageConfig{ + Enable: true, + DataDir: dir, + EnableExpiry: true, + MaxRetention: time.Hour * 10, + } + s, err := NewLocalFileStorageService(config) + Require(t, err) + + now := time.Now() + + // Use increments of expiry divisor in order to span multiple by-expiry-timestamp dirs + err = s.Put(ctx, []byte("a"), uint64(now.Add(-2*time.Second*expiryDivisor).Unix())) + Require(t, err) + err = s.Put(ctx, []byte("a"), uint64(now.Add(-1*time.Second*expiryDivisor).Unix())) + Require(t, err) + err = s.Put(ctx, []byte("a"), uint64(now.Add(time.Second*expiryDivisor).Unix())) + Require(t, err) + err = s.Put(ctx, []byte("d"), uint64(now.Add(time.Second*expiryDivisor).Unix())) + Require(t, err) + err = s.Put(ctx, []byte("e"), uint64(now.Add(2*time.Second*expiryDivisor).Unix())) + Require(t, err) + err = s.Put(ctx, []byte("f"), uint64(now.Add(3*time.Second*expiryDivisor).Unix())) + Require(t, err) + // Put the same entry and expiry again, should have no effect + err = s.Put(ctx, []byte("f"), uint64(now.Add(3*time.Second*expiryDivisor).Unix())) + Require(t, err) + + afterNow := now.Add(time.Second) + // "a" is duplicated + countEntries(t, &s.layout, 4) + // There should be a timestamp entry for each time "a" was added + countTimestampEntries(t, &s.layout, afterNow.Add(1000*time.Hour), 6) + + // We've expired the first "a", but there are still 2 other timestamp entries for it + pruneCountRemaining(t, &s.layout, afterNow.Add(-2*time.Second*expiryDivisor), 4) + countTimestampEntries(t, &s.layout, afterNow.Add(1000*time.Hour), 5) + + // We've expired the second "a", but there is 
still 1 other timestamp entry for it + pruneCountRemaining(t, &s.layout, afterNow.Add(-1*time.Second*expiryDivisor), 4) + countTimestampEntries(t, &s.layout, afterNow.Add(1000*time.Hour), 4) + + // We've expired the third "a", and also "d" + pruneCountRemaining(t, &s.layout, afterNow.Add(time.Second*expiryDivisor), 2) + countTimestampEntries(t, &s.layout, afterNow.Add(1000*time.Hour), 2) + + // We've expired the "e" + pruneCountRemaining(t, &s.layout, afterNow.Add(2*time.Second*expiryDivisor), 1) + countTimestampEntries(t, &s.layout, afterNow.Add(1000*time.Hour), 1) + + // We've expired the "f" + pruneCountRemaining(t, &s.layout, afterNow.Add(3*time.Second*expiryDivisor), 0) + countTimestampEntries(t, &s.layout, afterNow.Add(1000*time.Hour), 0) +} diff --git a/das/storage_service.go b/das/storage_service.go index 806e80dba5..b7526077e9 100644 --- a/das/storage_service.go +++ b/das/storage_service.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "strings" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -25,6 +26,8 @@ type StorageService interface { HealthCheck(ctx context.Context) error } +const defaultStorageRetention = time.Hour * 24 * 21 // 6 days longer than the batch poster default + func EncodeStorageServiceKey(key common.Hash) string { return key.Hex()[2:] } diff --git a/das/syncing_fallback_storage.go b/das/syncing_fallback_storage.go index 3f4f2765b5..1cf6a832f3 100644 --- a/das/syncing_fallback_storage.go +++ b/das/syncing_fallback_storage.go @@ -7,7 +7,6 @@ import ( "context" "encoding/binary" "fmt" - "math" "math/big" "os" "sync" @@ -63,26 +62,29 @@ type SyncToStorageConfig struct { IgnoreWriteErrors bool `koanf:"ignore-write-errors"` ParentChainBlocksPerRead uint64 `koanf:"parent-chain-blocks-per-read"` StateDir string `koanf:"state-dir"` + SyncExpiredData bool `koanf:"sync-expired-data"` } var DefaultSyncToStorageConfig = SyncToStorageConfig{ Eager: false, EagerLowerBoundBlock: 0, - RetentionPeriod: 
time.Duration(math.MaxInt64), + RetentionPeriod: defaultStorageRetention, DelayOnError: time.Second, IgnoreWriteErrors: true, ParentChainBlocksPerRead: 100, StateDir: "", + SyncExpiredData: true, } func SyncToStorageConfigAddOptions(prefix string, f *flag.FlagSet) { f.Bool(prefix+".eager", DefaultSyncToStorageConfig.Eager, "eagerly sync batch data to this DAS's storage from the rest endpoints, using L1 as the index of batch data hashes; otherwise only sync lazily") f.Uint64(prefix+".eager-lower-bound-block", DefaultSyncToStorageConfig.EagerLowerBoundBlock, "when eagerly syncing, start indexing forward from this L1 block. Only used if there is no sync state") f.Uint64(prefix+".parent-chain-blocks-per-read", DefaultSyncToStorageConfig.ParentChainBlocksPerRead, "when eagerly syncing, max l1 blocks to read per poll") - f.Duration(prefix+".retention-period", DefaultSyncToStorageConfig.RetentionPeriod, "period to retain synced data (defaults to forever)") + f.Duration(prefix+".retention-period", DefaultSyncToStorageConfig.RetentionPeriod, "period to request storage to retain synced data") f.Duration(prefix+".delay-on-error", DefaultSyncToStorageConfig.DelayOnError, "time to wait if encountered an error before retrying") f.Bool(prefix+".ignore-write-errors", DefaultSyncToStorageConfig.IgnoreWriteErrors, "log only on failures to write when syncing; otherwise treat it as an error") f.String(prefix+".state-dir", DefaultSyncToStorageConfig.StateDir, "directory to store the sync state in, ie the block number currently synced up to, so that we don't sync from scratch each time") + f.Bool(prefix+".sync-expired-data", DefaultSyncToStorageConfig.SyncExpiredData, "sync even data that is expired; needed for mirror configuration") } type l1SyncService struct { @@ -191,7 +193,7 @@ func (s *l1SyncService) processBatchDelivered(ctx context.Context, batchDelivere } log.Info("BatchDelivered", "log", batchDeliveredLog, "event", deliveredEvent) storeUntil := 
arbmath.SaturatingUAdd(deliveredEvent.TimeBounds.MaxTimestamp, uint64(s.config.RetentionPeriod.Seconds())) - if storeUntil < uint64(time.Now().Unix()) { + if !s.config.SyncExpiredData && storeUntil < uint64(time.Now().Unix()) { // old batch - no need to store return nil } diff --git a/execution/nodeInterface/virtual-contracts.go b/execution/nodeInterface/virtual-contracts.go index d72ad0da8e..d04be10857 100644 --- a/execution/nodeInterface/virtual-contracts.go +++ b/execution/nodeInterface/virtual-contracts.go @@ -141,7 +141,8 @@ func init() { return } posterCost, _ := state.L1PricingState().PosterDataCost(msg, l1pricing.BatchPosterAddress, brotliCompressionLevel) - posterCostInL2Gas := arbos.GetPosterGas(state, header.BaseFee, msg.TxRunMode, posterCost) + // Use estimate mode because this is used to raise the gas cap, so we don't want to underestimate. + posterCostInL2Gas := arbos.GetPosterGas(state, header.BaseFee, core.MessageGasEstimationMode, posterCost) *gascap = arbmath.SaturatingUAdd(*gascap, posterCostInL2Gas) } diff --git a/staker/block_validator.go b/staker/block_validator.go index 0fea05469f..94ee907da5 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -815,7 +815,6 @@ validationsLoop: v.possiblyFatal(errors.New("failed to set SendingValidation status")) } validatorPendingValidationsGauge.Inc(1) - defer validatorPendingValidationsGauge.Dec(1) var runs []validator.ValidationRun for _, moduleRoot := range wasmRoots { run := v.chosenValidator[moduleRoot].Launch(input, moduleRoot) @@ -826,6 +825,7 @@ validationsLoop: validationStatus.Runs = runs validationStatus.Cancel = cancel v.LaunchUntrackedThread(func() { + defer validatorPendingValidationsGauge.Dec(1) defer cancel() replaced = validationStatus.replaceStatus(SendingValidation, ValidationSent) if !replaced { diff --git a/system_tests/estimation_test.go b/system_tests/estimation_test.go index e7f00ca94e..284c709fad 100644 --- a/system_tests/estimation_test.go +++ 
b/system_tests/estimation_test.go @@ -324,3 +324,35 @@ func TestDisableL1Charging(t *testing.T) { _, err = builder.L2.Client.CallContract(ctx, ethereum.CallMsg{To: &addr, Gas: gasWithoutL1Charging, SkipL1Charging: true}, nil) Require(t, err) } + +func TestGasEstimationWithRPCGasLimit(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + builder := NewNodeBuilder(ctx).DefaultConfig(t, true) + cleanup := builder.Build(t) + defer cleanup() + + execConfigA := builder.execConfig + execConfigA.RPC.RPCGasCap = params.TxGas + testClientA, cleanupA := builder.Build2ndNode(t, &SecondNodeParams{execConfig: execConfigA}) + defer cleanupA() + addr := common.HexToAddress("0x12345678") + estimateGas, err := testClientA.Client.EstimateGas(ctx, ethereum.CallMsg{To: &addr}) + Require(t, err) + if estimateGas <= params.TxGas { + Fatal(t, "Incorrect gas estimate") + } + + _, err = testClientA.Client.CallContract(ctx, ethereum.CallMsg{To: &addr}, nil) + Require(t, err) + + execConfigB := builder.execConfig + execConfigB.RPC.RPCGasCap = params.TxGas - 1 + testClientB, cleanupB := builder.Build2ndNode(t, &SecondNodeParams{execConfig: execConfigB}) + defer cleanupB() + _, err = testClientB.Client.EstimateGas(ctx, ethereum.CallMsg{To: &addr}) + if err == nil { + Fatal(t, "EstimateGas passed with insufficient gas") + } +} diff --git a/system_tests/validation_mock_test.go b/system_tests/validation_mock_test.go index fb4f868571..1330f24882 100644 --- a/system_tests/validation_mock_test.go +++ b/system_tests/validation_mock_test.go @@ -126,6 +126,20 @@ func (r *mockExecRun) GetStepAt(position uint64) containers.PromiseInterface[*va }, nil) } +func (r *mockExecRun) GetMachineHashesWithStepSize(machineStartIndex, stepSize, maxIterations uint64) containers.PromiseInterface[[]common.Hash] { + ctx := context.Background() + hashes := make([]common.Hash, 0) + for i := uint64(0); i < maxIterations; i++ { + absoluteMachineIndex := machineStartIndex + 
stepSize*(i+1) + stepResult, err := r.GetStepAt(absoluteMachineIndex).Await(ctx) + if err != nil { + return containers.NewReadyPromise[[]common.Hash](nil, err) + } + hashes = append(hashes, stepResult.Hash) + } + return containers.NewReadyPromise[[]common.Hash](hashes, nil) +} + func (r *mockExecRun) GetLastStep() containers.PromiseInterface[*validator.MachineStepResult] { return r.GetStepAt(mockExecLastPos) } diff --git a/validator/client/validation_client.go b/validator/client/validation_client.go index fa6b9000f2..949260002d 100644 --- a/validator/client/validation_client.go +++ b/validator/client/validation_client.go @@ -197,6 +197,17 @@ func (r *ExecutionClientRun) GetStepAt(pos uint64) containers.PromiseInterface[* }) } +func (r *ExecutionClientRun) GetMachineHashesWithStepSize(machineStartIndex, stepSize, maxIterations uint64) containers.PromiseInterface[[]common.Hash] { + return stopwaiter.LaunchPromiseThread[[]common.Hash](r, func(ctx context.Context) ([]common.Hash, error) { + var resJson []common.Hash + err := r.client.client.CallContext(ctx, &resJson, server_api.Namespace+"_getMachineHashesWithStepSize", r.id, machineStartIndex, stepSize, maxIterations) + if err != nil { + return nil, err + } + return resJson, err + }) +} + func (r *ExecutionClientRun) GetProofAt(pos uint64) containers.PromiseInterface[[]byte] { return stopwaiter.LaunchPromiseThread[[]byte](r, func(ctx context.Context) ([]byte, error) { var resString string diff --git a/validator/interface.go b/validator/interface.go index 0324b996ed..91668a3771 100644 --- a/validator/interface.go +++ b/validator/interface.go @@ -30,6 +30,7 @@ type ExecutionSpawner interface { type ExecutionRun interface { GetStepAt(uint64) containers.PromiseInterface[*MachineStepResult] + GetMachineHashesWithStepSize(machineStartIndex, stepSize, maxIterations uint64) containers.PromiseInterface[[]common.Hash] GetLastStep() containers.PromiseInterface[*MachineStepResult] GetProofAt(uint64) 
containers.PromiseInterface[[]byte] PrepareRange(uint64, uint64) containers.PromiseInterface[struct{}] diff --git a/validator/server_arb/execution_run.go b/validator/server_arb/execution_run.go index 255d42ab16..8bdce145a2 100644 --- a/validator/server_arb/execution_run.go +++ b/validator/server_arb/execution_run.go @@ -7,7 +7,12 @@ import ( "context" "fmt" "sync" + "time" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + + "github.com/ethereum/go-ethereum/log" "github.com/offchainlabs/nitro/util/containers" "github.com/offchainlabs/nitro/util/stopwaiter" "github.com/offchainlabs/nitro/validator" @@ -55,7 +60,6 @@ func (e *executionRun) GetStepAt(position uint64) containers.PromiseInterface[*v if position == ^uint64(0) { machine, err = e.cache.GetFinalMachine(ctx) } else { - // todo cache last machine machine, err = e.cache.GetMachineAt(ctx, position) } if err != nil { @@ -79,6 +83,104 @@ func (e *executionRun) GetStepAt(position uint64) containers.PromiseInterface[*v }) } +func (e *executionRun) GetMachineHashesWithStepSize(machineStartIndex, stepSize, maxIterations uint64) containers.PromiseInterface[[]common.Hash] { + return stopwaiter.LaunchPromiseThread(e, func(ctx context.Context) ([]common.Hash, error) { + return e.machineHashesWithStepSize(ctx, machineStartIndex, stepSize, maxIterations) + }) +} + +func (e *executionRun) machineHashesWithStepSize( + ctx context.Context, + machineStartIndex, + stepSize, + maxIterations uint64, +) ([]common.Hash, error) { + if stepSize == 0 { + return nil, fmt.Errorf("step size cannot be 0") + } + if maxIterations == 0 { + return nil, fmt.Errorf("max number of iterations cannot be 0") + } + machine, err := e.cache.GetMachineAt(ctx, machineStartIndex) + if err != nil { + return nil, err + } + log.Debug(fmt.Sprintf("Advanced machine to index %d, beginning hash computation", machineStartIndex)) + + // In BOLD, the hash of a machine at index 0 is a special hash that is computed as the + // 
`machineFinishedHash(gs)` where `gs` is the global state of the machine at index 0. + // This is so that the hash aligns with the start state of the claimed challenge edge + // at the level above, as required by the BOLD protocol. + var machineHashes []common.Hash + if machineStartIndex == 0 { + gs := machine.GetGlobalState() + log.Debug(fmt.Sprintf("Start global state for machine index 0: %+v", gs)) + machineHashes = append(machineHashes, machineFinishedHash(gs)) + } else { + // Otherwise, we simply append the machine hash at the specified start index. + machineHashes = append(machineHashes, machine.Hash()) + } + startHash := machineHashes[0] + + // If we only want 1 hash, we can return early. + if maxIterations == 1 { + return machineHashes, nil + } + + logInterval := maxIterations / 20 // Log every 5% progress + if logInterval == 0 { + logInterval = 1 + } + + start := time.Now() + for i := uint64(0); i < maxIterations; i++ { + // The absolute program counter the machine should be in after stepping. + absoluteMachineIndex := machineStartIndex + stepSize*(i+1) + + // Advance the machine in step size increments. 
+ if err := machine.Step(ctx, stepSize); err != nil { + return nil, fmt.Errorf("failed to step machine to position %d: %w", absoluteMachineIndex, err) + } + if i%logInterval == 0 || i == maxIterations-1 { + progressPercent := (float64(i+1) / float64(maxIterations)) * 100 + log.Info( + fmt.Sprintf( + "Computing BOLD subchallenge progress: %.2f%% - %d of %d hashes", + progressPercent, + i+1, + maxIterations, + ), + "machinePosition", i*stepSize+machineStartIndex, + "timeSinceStart", time.Since(start), + "stepSize", stepSize, + "startHash", startHash, + "machineStartIndex", machineStartIndex, + "maxIterations", maxIterations, + ) + } + machineHashes = append(machineHashes, machine.Hash()) + if uint64(len(machineHashes)) == maxIterations { + log.Info("Reached the max number of iterations for the hashes needed to open a subchallenge") + break + } + if !machine.IsRunning() { + log.Info("Machine no longer running, exiting early from hash computation loop") + break + } + } + log.Info( + "Successfully finished computing the data needed for opening a subchallenge", + "stepSize", stepSize, + "startHash", startHash, + "machineStartIndex", machineStartIndex, + "numberOfHashesComputed", len(machineHashes), + "maxIterations", maxIterations, + "finishedHash", machineHashes[len(machineHashes)-1], + "finishedGlobalState", fmt.Sprintf("%+v", machine.GetGlobalState()), + ) + return machineHashes, nil +} + func (e *executionRun) GetProofAt(position uint64) containers.PromiseInterface[[]byte] { return stopwaiter.LaunchPromiseThread[[]byte](e, func(ctx context.Context) ([]byte, error) { machine, err := e.cache.GetMachineAt(ctx, position) @@ -92,3 +194,7 @@ func (e *executionRun) GetProofAt(position uint64) containers.PromiseInterface[[ func (e *executionRun) GetLastStep() containers.PromiseInterface[*validator.MachineStepResult] { return e.GetStepAt(^uint64(0)) } + +func machineFinishedHash(gs validator.GoGlobalState) common.Hash { + return crypto.Keccak256Hash([]byte("Machine 
finished:"), gs.Hash().Bytes()) +} diff --git a/validator/server_arb/execution_run_test.go b/validator/server_arb/execution_run_test.go new file mode 100644 index 0000000000..bdc1eefc4d --- /dev/null +++ b/validator/server_arb/execution_run_test.go @@ -0,0 +1,206 @@ +package server_arb + +import ( + "context" + "strings" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/offchainlabs/nitro/validator" +) + +type mockMachine struct { + gs validator.GoGlobalState + totalSteps uint64 +} + +func (m *mockMachine) Hash() common.Hash { + if m.gs.PosInBatch == m.totalSteps-1 { + return machineFinishedHash(m.gs) + } + return m.gs.Hash() +} + +func (m *mockMachine) GetGlobalState() validator.GoGlobalState { + return m.gs +} + +func (m *mockMachine) Step(ctx context.Context, stepSize uint64) error { + for i := uint64(0); i < stepSize; i++ { + if m.gs.PosInBatch == m.totalSteps-1 { + return nil + } + m.gs.PosInBatch += 1 + } + return nil +} + +func (m *mockMachine) CloneMachineInterface() MachineInterface { + return &mockMachine{ + gs: validator.GoGlobalState{Batch: m.gs.Batch, PosInBatch: m.gs.PosInBatch}, + totalSteps: m.totalSteps, + } +} +func (m *mockMachine) GetStepCount() uint64 { + return 0 +} +func (m *mockMachine) IsRunning() bool { + return m.gs.PosInBatch < m.totalSteps-1 +} +func (m *mockMachine) ValidForStep(uint64) bool { + return true +} +func (m *mockMachine) Status() uint8 { + if m.gs.PosInBatch == m.totalSteps-1 { + return uint8(validator.MachineStatusFinished) + } + return uint8(validator.MachineStatusRunning) +} +func (m *mockMachine) ProveNextStep() []byte { + return nil +} +func (m *mockMachine) Freeze() {} +func (m *mockMachine) Destroy() {} + +func Test_machineHashesWithStep(t *testing.T) { + t.Run("basic argument checks", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + e := &executionRun{} + machStartIndex := uint64(0) + stepSize := uint64(0) + maxIterations := uint64(0) + _, err := 
e.machineHashesWithStepSize(ctx, machStartIndex, stepSize, maxIterations) + if err == nil || !strings.Contains(err.Error(), "step size cannot be 0") { + t.Error("Wrong error") + } + stepSize = uint64(1) + _, err = e.machineHashesWithStepSize(ctx, machStartIndex, stepSize, maxIterations) + if err == nil || !strings.Contains(err.Error(), "number of iterations cannot be 0") { + t.Error("Wrong error") + } + }) + t.Run("machine at start index 0 hash is the finished state hash", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mm := &mockMachine{ + gs: validator.GoGlobalState{ + Batch: 1, + }, + totalSteps: 20, + } + machStartIndex := uint64(0) + stepSize := uint64(1) + maxIterations := uint64(1) + e := &executionRun{ + cache: NewMachineCache(ctx, func(_ context.Context) (MachineInterface, error) { + return mm, nil + }, &DefaultMachineCacheConfig), + } + + hashes, err := e.machineHashesWithStepSize(ctx, machStartIndex, stepSize, maxIterations) + if err != nil { + t.Fatal(err) + } + expected := machineFinishedHash(mm.gs) + if len(hashes) != 1 { + t.Error("Wanted one hash") + } + if expected != hashes[0] { + t.Errorf("Wanted %#x, got %#x", expected, hashes[0]) + } + }) + t.Run("can step in step size increments and collect hashes", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + initialGs := validator.GoGlobalState{ + Batch: 1, + PosInBatch: 0, + } + mm := &mockMachine{ + gs: initialGs, + totalSteps: 20, + } + machStartIndex := uint64(0) + stepSize := uint64(5) + maxIterations := uint64(4) + e := &executionRun{ + cache: NewMachineCache(ctx, func(_ context.Context) (MachineInterface, error) { + return mm, nil + }, &DefaultMachineCacheConfig), + } + hashes, err := e.machineHashesWithStepSize(ctx, machStartIndex, stepSize, maxIterations) + if err != nil { + t.Fatal(err) + } + expectedHashes := make([]common.Hash, 0) + for i := uint64(0); i < 4; i++ { + if i == 0 { + 
expectedHashes = append(expectedHashes, machineFinishedHash(initialGs)) + continue + } + gs := validator.GoGlobalState{ + Batch: 1, + PosInBatch: uint64(i * stepSize), + } + expectedHashes = append(expectedHashes, gs.Hash()) + } + if len(hashes) != len(expectedHashes) { + t.Fatal("Wanted one hash") + } + for i := range hashes { + if expectedHashes[i] != hashes[i] { + t.Errorf("Wanted at index %d, %#x, got %#x", i, expectedHashes[i], hashes[i]) + } + } + }) + t.Run("if finishes execution early, can return a smaller number of hashes than the expected max iterations", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + initialGs := validator.GoGlobalState{ + Batch: 1, + PosInBatch: 0, + } + mm := &mockMachine{ + gs: initialGs, + totalSteps: 20, + } + machStartIndex := uint64(0) + stepSize := uint64(5) + maxIterations := uint64(10) + e := &executionRun{ + cache: NewMachineCache(ctx, func(_ context.Context) (MachineInterface, error) { + return mm, nil + }, &DefaultMachineCacheConfig), + } + + hashes, err := e.machineHashesWithStepSize(ctx, machStartIndex, stepSize, maxIterations) + if err != nil { + t.Fatal(err) + } + expectedHashes := make([]common.Hash, 0) + for i := uint64(0); i < 4; i++ { + if i == 0 { + expectedHashes = append(expectedHashes, machineFinishedHash(initialGs)) + continue + } + gs := validator.GoGlobalState{ + Batch: 1, + PosInBatch: uint64(i * stepSize), + } + expectedHashes = append(expectedHashes, gs.Hash()) + } + expectedHashes = append(expectedHashes, machineFinishedHash(validator.GoGlobalState{ + Batch: 1, + PosInBatch: mm.totalSteps - 1, + })) + if len(hashes) >= int(maxIterations) { + t.Fatal("Wanted fewer hashes than the max iterations") + } + for i := range hashes { + if expectedHashes[i] != hashes[i] { + t.Errorf("Wanted at index %d, %#x, got %#x", i, expectedHashes[i], hashes[i]) + } + } + }) +} diff --git a/validator/valnode/redis/consumer.go b/validator/valnode/redis/consumer.go index 
016f30bd61..84f597c095 100644 --- a/validator/valnode/redis/consumer.go +++ b/validator/valnode/redis/consumer.go @@ -149,6 +149,7 @@ var TestValidationServerConfig = ValidationServerConfig{ func ValidationServerConfigAddOptions(prefix string, f *pflag.FlagSet) { pubsub.ConsumerConfigAddOptions(prefix+".consumer-config", f) f.StringSlice(prefix+".module-roots", nil, "Supported module root hashes") + f.String(prefix+".redis-url", DefaultValidationServerConfig.RedisURL, "url of redis server") f.Duration(prefix+".stream-timeout", DefaultValidationServerConfig.StreamTimeout, "Timeout on polling for existence of redis streams") } diff --git a/validator/valnode/validation_api.go b/validator/valnode/validation_api.go index a67299b1a1..3299366821 100644 --- a/validator/valnode/validation_api.go +++ b/validator/valnode/validation_api.go @@ -148,6 +148,19 @@ func (a *ExecServerAPI) GetStepAt(ctx context.Context, execid uint64, position u return server_api.MachineStepResultToJson(res), nil } +func (a *ExecServerAPI) GetMachineHashesWithStepSize(ctx context.Context, execid, fromStep, stepSize, maxIterations uint64) ([]common.Hash, error) { + run, err := a.getRun(execid) + if err != nil { + return nil, err + } + hashesInRange := run.GetMachineHashesWithStepSize(fromStep, stepSize, maxIterations) + res, err := hashesInRange.Await(ctx) + if err != nil { + return nil, err + } + return res, nil +} + func (a *ExecServerAPI) GetProofAt(ctx context.Context, execid uint64, position uint64) (string, error) { run, err := a.getRun(execid) if err != nil {