Skip to content

Commit

Permalink
#8: don't use history archive caching for rpc, no need for metrics si…
Browse files Browse the repository at this point in the history
…nce archive usage is very limited
  • Loading branch information
sreuland committed Feb 9, 2024
1 parent 9cdf762 commit 2c465a5
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 61 deletions.
8 changes: 8 additions & 0 deletions cmd/soroban-rpc/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Config struct {
EventLedgerRetentionWindow uint32
FriendbotURL string
HistoryArchiveURLs []string
HistoryArchiveUserAgent string
IngestionTimeout time.Duration
LogFormat LogFormat
LogLevel logrus.Level
Expand Down Expand Up @@ -64,6 +65,13 @@ type Config struct {
flagset *pflag.FlagSet
}

func (cfg *Config) ExtendedUserAgent(extension string) string {
if cfg.HistoryArchiveUserAgent == "" {
return extension
}
return cfg.HistoryArchiveUserAgent + "/" + extension
}

func (cfg *Config) SetValues(lookupEnv func(string) (string, bool)) error {
// We start with the defaults
if err := cfg.loadDefaults(); err != nil {
Expand Down
8 changes: 8 additions & 0 deletions cmd/soroban-rpc/internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ func TestConfigLoadDefaults(t *testing.T) {
assert.Equal(t, uint(runtime.NumCPU()), cfg.PreflightWorkerCount)
}

func TestConfigExtendedUserAgent(t *testing.T) {
cfg := Config{
HistoryArchiveUserAgent: "Test",
}
require.NoError(t, cfg.loadDefaults())
assert.Equal(t, "Test/123", cfg.ExtendedUserAgent("123"))
}

func TestConfigLoadFlagsDefaultValuesOverrideExisting(t *testing.T) {
// Set up a config with an existing non-default value
cfg := Config{
Expand Down
2 changes: 1 addition & 1 deletion cmd/soroban-rpc/internal/config/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestAllConfigFieldsMustHaveASingleOption(t *testing.T) {

// Allow us to explicitly exclude any fields on the Config struct, which are not going to have Options.
// e.g. "ConfigPath"
excluded := map[string]bool{}
excluded := map[string]bool{"HistoryArchiveUserAgent": true}

cfg := Config{}
cfgValue := reflect.ValueOf(cfg)
Expand Down
12 changes: 3 additions & 9 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ package daemon
import (
"context"
"errors"
"fmt"
"net/http"
"net/http/pprof" //nolint:gosec
"os"
"os/signal"
"path"
runtimePprof "runtime/pprof"
"sync"
"syscall"
Expand All @@ -20,7 +18,6 @@ import (
"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest/ledgerbackend"
supporthttp "github.com/stellar/go/support/http"
"github.com/stellar/go/support/log"
supportlog "github.com/stellar/go/support/log"
"github.com/stellar/go/support/storage"
"github.com/stellar/go/xdr"
Expand Down Expand Up @@ -125,7 +122,7 @@ func newCaptiveCore(cfg *config.Config, logger *supportlog.Entry) (*ledgerbacken
CheckpointFrequency: cfg.CheckpointFrequency,
Log: logger.WithField("subservice", "stellar-core"),
Toml: captiveCoreToml,
UserAgent: "captivecore",
UserAgent: cfg.ExtendedUserAgent("captivecore"),
UseDB: true,
}
return ledgerbackend.NewCaptive(captiveConfig)
Expand Down Expand Up @@ -156,12 +153,9 @@ func MustNew(cfg *config.Config) *Daemon {
CheckpointFrequency: cfg.CheckpointFrequency,
ConnectOptions: storage.ConnectOptions{
Context: context.Background(),
UserAgent: fmt.Sprintf("soroban-rpc/%s", config.Version)},
UserAgent: cfg.HistoryArchiveUserAgent},
CacheConfig: historyarchive.CacheOptions{
Cache: true,
Path: path.Join(cfg.CaptiveCoreStoragePath, "bucket-cache"),
Log: log.WithField("subservice", "ha-cache"),
MaxFiles: 150,
Cache: false,
},
},
)
Expand Down
41 changes: 1 addition & 40 deletions cmd/soroban-rpc/internal/ingest/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,10 @@ func newService(cfg Config) *Service {
[]string{"type"},
)

haStatsMetric := prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: cfg.Daemon.MetricsNamespace(), Subsystem: "ingest", Name: "history_archive_stats_total",
Help: "counters of different history archive stats",
},
[]string{"source", "type"},
)

cfg.Daemon.MetricsRegistry().MustRegister(
ingestionDurationMetric,
latestLedgerMetric,
ledgerStatsMetric,
haStatsMetric)
ledgerStatsMetric)

service := &Service{
logger: cfg.Logger,
Expand All @@ -99,9 +90,7 @@ func newService(cfg Config) *Service {
ingestionDurationMetric: ingestionDurationMetric,
latestLedgerMetric: latestLedgerMetric,
ledgerStatsMetric: ledgerStatsMetric,
haStatsMetric: haStatsMetric,
},
archive: cfg.Archive,
}

return service
Expand Down Expand Up @@ -139,7 +128,6 @@ type Metrics struct {
ingestionDurationMetric *prometheus.SummaryVec
latestLedgerMetric prometheus.Gauge
ledgerStatsMetric *prometheus.CounterVec
haStatsMetric *prometheus.CounterVec
}

type Service struct {
Expand All @@ -153,7 +141,6 @@ type Service struct {
done context.CancelFunc
wg sync.WaitGroup
metrics Metrics
archive historyarchive.ArchiveInterface
}

func (s *Service) Close() error {
Expand Down Expand Up @@ -311,35 +298,9 @@ func (s *Service) ingest(ctx context.Context, sequence uint32) error {
s.metrics.ingestionDurationMetric.
With(prometheus.Labels{"type": "total"}).Observe(time.Since(startTime).Seconds())
s.metrics.latestLedgerMetric.Set(float64(sequence))
s.addHistoryArchiveStatsMetrics(s.archive.GetStats())
return nil
}

func (s *Service) addHistoryArchiveStatsMetrics(stats []historyarchive.ArchiveStats) {
for _, historyServerStat := range stats {
s.metrics.haStatsMetric.
With(prometheus.Labels{
"source": historyServerStat.GetBackendName(),
"type": "file_downloads"}).
Add(float64(historyServerStat.GetDownloads()))
s.metrics.haStatsMetric.
With(prometheus.Labels{
"source": historyServerStat.GetBackendName(),
"type": "file_uploads"}).
Add(float64(historyServerStat.GetUploads()))
s.metrics.haStatsMetric.
With(prometheus.Labels{
"source": historyServerStat.GetBackendName(),
"type": "requests"}).
Add(float64(historyServerStat.GetRequests()))
s.metrics.haStatsMetric.
With(prometheus.Labels{
"source": historyServerStat.GetBackendName(),
"type": "cache_hits"}).
Add(float64(historyServerStat.GetCacheHits()))
}
}

func (s *Service) ingestLedgerCloseMeta(tx db.WriteTx, ledgerCloseMeta xdr.LedgerCloseMeta) error {
startTime := time.Now()
if err := tx.LedgerWriter().InsertLedger(ledgerCloseMeta); err != nil {
Expand Down
11 changes: 0 additions & 11 deletions cmd/soroban-rpc/internal/ingest/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"testing"
"time"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/network"
supportlog "github.com/stellar/go/support/log"
Expand Down Expand Up @@ -67,15 +66,6 @@ func TestIngestion(t *testing.T) {
mockDB := &MockDB{}
mockLedgerBackend := &ledgerbackend.MockDatabaseBackend{}

mockArchive := historyarchive.MockArchive{}
mockArchiveStats := historyarchive.MockArchiveStats{}
mockArchive.On("GetStats").Return([]historyarchive.ArchiveStats{&mockArchiveStats}).Once()
mockArchiveStats.On("GetBackendName").Return("").Times(4)
mockArchiveStats.On("GetDownloads").Return(uint32(0)).Once()
mockArchiveStats.On("GetCacheHits").Return(uint32(0)).Once()
mockArchiveStats.On("GetRequests").Return(uint32(0)).Once()
mockArchiveStats.On("GetUploads").Return(uint32(0)).Once()

daemon := interfaces.MakeNoOpDeamon()
config := Config{
Logger: supportlog.New(),
Expand All @@ -85,7 +75,6 @@ func TestIngestion(t *testing.T) {
LedgerBackend: mockLedgerBackend,
Daemon: daemon,
NetworkPassPhrase: network.TestNetworkPassphrase,
Archive: &mockArchive,
}
sequence := uint32(3)
service := newService(config)
Expand Down
1 change: 1 addition & 0 deletions cmd/soroban-rpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func main() {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
cfg.HistoryArchiveUserAgent = fmt.Sprintf("soroban-rpc/%s", config.Version)
daemon.MustNew(&cfg).Run()
},
}
Expand Down

0 comments on commit 2c465a5

Please sign in to comment.