Commit
Merge branch 'merkle-perf' into merkle-perf-a
eljobe committed Jun 28, 2024
2 parents 49db0ff + a473af8 commit 68b9317
Showing 7 changed files with 158 additions and 8 deletions.
3 changes: 2 additions & 1 deletion Dockerfile
@@ -263,7 +263,8 @@ RUN export DEBIAN_FRONTEND=noninteractive && \
apt-get update && \
apt-get install -y \
ca-certificates \
wabt && \
wabt \
sysstat && \
/usr/sbin/update-ca-certificates && \
useradd -s /bin/bash user && \
mkdir -p /home/user/l1keystore && \
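
The sysstat package added here provides the iostat binary that the new util/iostat package (introduced below) shells out to.
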
1 change: 1 addition & 0 deletions Makefile
@@ -162,6 +162,7 @@ test-go-deps: \
build-replay-env \
$(stylus_test_wasms) \
$(arbitrator_stylus_lib) \
$(arbitrator_generated_header) \
$(patsubst %,$(arbitrator_cases)/%.wasm, global-state read-inboxmsg-10 global-state-wrapper const)

build-prover-header: $(arbitrator_generated_header)
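
Adding $(arbitrator_generated_header) here ensures the generated prover header (the same target that build-prover-header depends on) is built before the Go test dependencies are considered complete.
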
4 changes: 4 additions & 0 deletions arbos/programs/api.go
@@ -266,6 +266,10 @@ func newApiClosures(
}
captureHostio := func(name string, args, outs []byte, startInk, endInk uint64) {
tracingInfo.Tracer.CaptureStylusHostio(name, args, outs, startInk, endInk)
if name == "evm_gas_left" || name == "evm_ink_left" {
tracingInfo.Tracer.CaptureState(0, vm.GAS, 0, 0, scope, []byte{}, depth, nil)
tracingInfo.Tracer.CaptureState(0, vm.POP, 0, 0, scope, []byte{}, depth, nil)
}
}

return func(req RequestType, input []byte) ([]byte, []byte, uint64) {
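
Why the GAS/POP pair: in the EVM, GAS pushes the remaining gas onto the stack, while the Stylus hostios evm_gas_left and evm_ink_left never touch that stack. Emitting a synthetic GAS followed immediately by a POP presumably lets EVM-level tracers observe the gas read without unbalancing any stack model they keep. A self-contained toy illustrating the idea (opCode and stackDepthTracer are invented for this sketch and are not the nitro tracer API):

package main

import "fmt"

// opCode stands in for go-ethereum's vm.OpCode (illustration only).
type opCode string

// stackDepthTracer models a tracer that infers EVM stack depth from opcodes.
type stackDepthTracer struct{ depth int }

func (t *stackDepthTracer) captureState(op opCode) {
	switch op {
	case "GAS":
		t.depth++ // GAS pushes the remaining gas onto the stack
	case "POP":
		t.depth-- // POP discards the value GAS pushed
	}
	fmt.Printf("op=%s modeled stack depth=%d\n", op, t.depth)
}

func main() {
	t := &stackDepthTracer{}
	// The hostio doesn't touch the real EVM stack, so the synthetic
	// GAS/POP pair nets out to zero modeled stack growth.
	t.captureState("GAS")
	t.captureState("POP")
}
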
5 changes: 5 additions & 0 deletions cmd/nitro/nitro.go
@@ -60,6 +60,7 @@ import (
"github.com/offchainlabs/nitro/staker/validatorwallet"
"github.com/offchainlabs/nitro/util/colors"
"github.com/offchainlabs/nitro/util/headerreader"
"github.com/offchainlabs/nitro/util/iostat"
"github.com/offchainlabs/nitro/util/rpcclient"
"github.com/offchainlabs/nitro/util/signature"
"github.com/offchainlabs/nitro/validator/server_common"
@@ -404,6 +405,10 @@ func mainImpl() int {
return 1
}

if nodeConfig.Metrics {
go iostat.RegisterAndPopulateMetrics(ctx, 1, 5)
}

var deferFuncs []func()
defer func() {
for i := range deferFuncs {
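
Per util/iostat below, the second argument is the iostat sampling interval in seconds and the third caps how many devices get metrics registered, so this samples every second for at most 5 devices.
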
37 changes: 31 additions & 6 deletions das/aggregator.go
@@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"math/bits"
"sync/atomic"
"time"

flag "github.com/spf13/pflag"
@@ -25,6 +26,16 @@ import (
"github.com/offchainlabs/nitro/util/pretty"
)

const metricBase string = "arb/das/rpc/aggregator/store"

var (
// This metric is set to 1 when any error occurs posting to the backends, and
// reset to 0 once a Store call completes with no backend failures.
anyErrorGauge = metrics.GetOrRegisterGauge(metricBase+"/error/gauge", nil)

// Other aggregator metrics are generated dynamically in the Store function.
)

type AggregatorConfig struct {
Enable bool `koanf:"enable"`
AssumedHonest int `koanf:"assumed-honest"`
@@ -154,13 +165,22 @@ type storeResponse struct {
// (eg via TimeoutWrapper) then it also returns an error.
func (a *Aggregator) Store(ctx context.Context, message []byte, timeout uint64) (*daprovider.DataAvailabilityCertificate, error) {
log.Trace("das.Aggregator.Store", "message", pretty.FirstFewBytes(message), "timeout", time.Unix(int64(timeout), 0))

allBackendsSucceeded := false
defer func() {
if allBackendsSucceeded {
anyErrorGauge.Update(0)
} else {
anyErrorGauge.Update(1)
}
}()

responses := make(chan storeResponse, len(a.services))

expectedHash := dastree.Hash(message)
for _, d := range a.services {
go func(ctx context.Context, d ServiceDetails) {
storeCtx, cancel := context.WithTimeout(ctx, a.requestTimeout)
const metricBase string = "arb/das/rpc/aggregator/store"
var metricWithServiceName = metricBase + "/" + d.metricName
defer cancel()
incFailureMetric := func() {
@@ -226,22 +246,22 @@ func (a *Aggregator) Store(ctx context.Context, message []byte, timeout uint64)
err error
}

var storeFailures atomic.Int64
// Collect responses from backends.
certDetailsChan := make(chan certDetails)
go func() {
var pubKeys []blsSignatures.PublicKey
var sigs []blsSignatures.Signature
var aggSignersMask uint64
var storeFailures, successfullyStoredCount int
var successfullyStoredCount int
var returned bool
for i := 0; i < len(a.services); i++ {

select {
case <-ctx.Done():
break
case r := <-responses:
if r.err != nil {
storeFailures++
_ = storeFailures.Add(1)
log.Warn("das.Aggregator: Error from backend", "backend", r.details.service, "signerMask", r.details.signersMask, "err", r.err)
} else {
pubKeys = append(pubKeys, r.details.pubKey)
@@ -265,10 +285,10 @@ func (a *Aggregator) Store(ctx context.Context, message []byte, timeout uint64)
certDetailsChan <- cd
returned = true
if a.maxAllowedServiceStoreFailures > 0 && // Ignore the case where AssumedHonest = 1, probably a testnet
storeFailures+1 > a.maxAllowedServiceStoreFailures {
int(storeFailures.Load())+1 > a.maxAllowedServiceStoreFailures {
log.Error("das.Aggregator: storing the batch data succeeded to enough DAS commitee members to generate the Data Availability Cert, but if one more had failed then the cert would not have been able to be generated. Look for preceding logs with \"Error from backend\"")
}
} else if storeFailures > a.maxAllowedServiceStoreFailures {
} else if int(storeFailures.Load()) > a.maxAllowedServiceStoreFailures {
cd := certDetails{}
cd.err = fmt.Errorf("aggregator failed to store message to at least %d out of %d DASes (assuming %d are honest). %w", a.requiredServicesForStore, len(a.services), a.config.AssumedHonest, daprovider.ErrBatchToDasFailed)
certDetailsChan <- cd
@@ -302,6 +322,11 @@ func (a *Aggregator) Store(ctx context.Context, message []byte, timeout uint64)
if !verified {
return nil, fmt.Errorf("failed aggregate signature check. %w", daprovider.ErrBatchToDasFailed)
}

if storeFailures.Load() == 0 {
allBackendsSucceeded = true
}

return &aggCert, nil
}

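
storeFailures changes from a goroutine-local int to sync/atomic.Int64 because the counter is now also read back on the caller's goroutine (to decide whether allBackendsSucceeded should clear the gauge); sharing a plain int across goroutines that way would be a data race. A minimal sketch of the pattern, independent of the aggregator code:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var failures atomic.Int64
	done := make(chan struct{})
	go func() {
		// Writer goroutine, like the aggregator's response collector.
		failures.Add(1)
		close(done)
	}()
	<-done
	// Read back on the original goroutine; safe without a mutex because
	// both sides go through the atomic API rather than a plain int.
	fmt.Println("failures:", failures.Load())
}
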
2 changes: 1 addition & 1 deletion nitro-testnode (submodule pointer update; no content diff shown)
114 changes: 114 additions & 0 deletions util/iostat/iostat.go
@@ -0,0 +1,114 @@
package iostat

import (
"bufio"
"context"
"fmt"
"os/exec"
"runtime"
"strconv"
"strings"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
)

func RegisterAndPopulateMetrics(ctx context.Context, spawnInterval, maxDeviceCount int) {
if runtime.GOOS != "linux" {
log.Warn("Iostat command not supported disabling corresponding metrics")
return
}
deviceMetrics := make(map[string]map[string]metrics.GaugeFloat64)
statReceiver := make(chan DeviceStats)
go Run(ctx, spawnInterval, statReceiver)
for {
stat, ok := <-statReceiver
if !ok {
log.Info("Iostat statReceiver channel was closed due to error or command being completed")
return
}
if _, ok := deviceMetrics[stat.DeviceName]; !ok {
// Register metrics for at most maxDeviceCount devices (fail-safe in case the iostat command returns incorrect names indefinitely)
if len(deviceMetrics) < maxDeviceCount {
baseMetricName := fmt.Sprintf("isotat/%s/", stat.DeviceName)
deviceMetrics[stat.DeviceName] = make(map[string]metrics.GaugeFloat64)
deviceMetrics[stat.DeviceName]["readspersecond"] = metrics.NewRegisteredGaugeFloat64(baseMetricName+"readspersecond", nil)
deviceMetrics[stat.DeviceName]["writespersecond"] = metrics.NewRegisteredGaugeFloat64(baseMetricName+"writespersecond", nil)
deviceMetrics[stat.DeviceName]["await"] = metrics.NewRegisteredGaugeFloat64(baseMetricName+"await", nil)
} else {
continue
}
}
deviceMetrics[stat.DeviceName]["readspersecond"].Update(stat.ReadsPerSecond)
deviceMetrics[stat.DeviceName]["writespersecond"].Update(stat.WritesPerSecond)
deviceMetrics[stat.DeviceName]["await"].Update(stat.Await)
}
}

type DeviceStats struct {
DeviceName string
ReadsPerSecond float64
WritesPerSecond float64
Await float64
}

func Run(ctx context.Context, interval int, receiver chan DeviceStats) {
defer close(receiver)
// #nosec G204
cmd := exec.CommandContext(ctx, "iostat", "-dNxy", strconv.Itoa(interval))
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Error("Failed to get stdout", "err", err)
return
}
if err := cmd.Start(); err != nil {
log.Error("Failed to start iostat command", "err", err)
return
}
var fields []string
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if strings.HasPrefix(line, "Device") {
fields = strings.Fields(line)
continue
}
data := strings.Fields(line)
if len(data) < len(fields) {
// Skip blank or truncated lines that lack a value for every header column.
continue
}
stat := DeviceStats{}
var err error
for i, field := range fields {
switch field {
case "Device", "Device:":
stat.DeviceName = data[i]
case "r/s":
stat.ReadsPerSecond, err = strconv.ParseFloat(data[i], 64)
case "w/s":
stat.WritesPerSecond, err = strconv.ParseFloat(data[i], 64)
case "await":
stat.Await, err = strconv.ParseFloat(data[i], 64)
}
if err != nil {
log.Error("Error parsing command result from iostat", "err", err)
err = nil // reset so one bad column isn't re-logged for every remaining field
continue
}
}
if stat.DeviceName == "" {
continue
}
receiver <- stat
}
if err := scanner.Err(); err != nil {
log.Error("Iostat scanner error", "err", err)
}
if err := cmd.Process.Kill(); err != nil {
log.Error("Failed to kill iostat process", "err", err)
}
if err := cmd.Wait(); err != nil {
log.Error("Error waiting for iostat to exit", "err", err)
}
stdout.Close()
log.Info("Iostat command terminated")
}
