Skip to content

Commit

Permalink
feat(zetaclient): add generic rpc metrics (#2597)
Browse files Browse the repository at this point in the history
* feat(zetaclient): add generic rpc metrics

* feedback

* changelog

* fmt
  • Loading branch information
gartnera committed Aug 15, 2024
1 parent 2d1baa7 commit f3b450e
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 9 deletions.
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* [2524](https://github.com/zeta-chain/node/pull/2524) - add inscription envelope parsing
* [2533](https://github.com/zeta-chain/node/pull/2533) - parse memo from both OP_RETURN and inscription
* [2568](https://github.com/zeta-chain/node/pull/2568) - improve AppContext by converging chains, chainParams, enabledChains, and additionalChains into a single zctx.Chain
* [2597](https://github.com/zeta-chain/node/pull/2597) - Add generic rpc metrics to zetaclient


## v19.0.1
Expand Down
8 changes: 7 additions & 1 deletion zetaclient/chains/evm/signer/signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
ethrpc "github.com/ethereum/go-ethereum/rpc"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/zeta-chain/protocol-contracts/pkg/contracts/evm/erc20custody.sol"
Expand Down Expand Up @@ -866,11 +867,16 @@ func getEVMRPC(ctx context.Context, endpoint string) (interfaces.EVMRPCClient, e
client := &mocks.MockEvmClient{}
return client, ethSigner, nil
}
httpClient, err := metrics.GetInstrumentedHTTPClient(endpoint)
if err != nil {
return nil, nil, errors.Wrap(err, "unable to get instrumented HTTP client")
}

client, err := ethclient.Dial(endpoint)
rpcClient, err := ethrpc.DialHTTPWithClient(endpoint, httpClient)
if err != nil {
return nil, nil, errors.Wrapf(err, "unable to dial EVM client (endpoint %q)", endpoint)
}
client := ethclient.NewClient(rpcClient)

chainID, err := client.ChainID(ctx)
if err != nil {
Expand Down
57 changes: 57 additions & 0 deletions zetaclient/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package metrics
import (
"context"
"net/http"
"net/url"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -112,6 +113,34 @@ var (
Help: "Histogram of the TSS keysign latency",
Buckets: []float64{1, 7, 15, 30, 60, 120, 240},
}, []string{"result"})

// RPCInProgress is a gauge that contains the number of RPC requests in progress
RPCInProgress = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ZetaClientNamespace,
Name: "rpc_in_progress",
Help: "Number of RPC requests in progress",
}, []string{"host"})

// RPCCount is a counter that contains the number of total RPC requests
RPCCount = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: ZetaClientNamespace,
Name: "rpc_count",
Help: "A counter for number of total RPC requests",
},
[]string{"host", "code"},
)

// RPCLatency is a histogram of the RPC latency
RPCLatency = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: ZetaClientNamespace,
Name: "rpc_duration_seconds",
Help: "A histogram of the RPC duration in seconds",
Buckets: prometheus.DefBuckets,
},
[]string{"host"},
)
)

// NewMetrics creates a new Metrics instance
Expand Down Expand Up @@ -151,3 +180,31 @@ func (m *Metrics) Stop() error {
defer cancel()
return m.s.Shutdown(ctx)
}

// GetInstrumentedHTTPClient sets up an HTTP client that emits prometheus metrics.
//
// Requests sent through the returned client are tracked by the RPCInProgress,
// RPCCount and RPCLatency collectors under a "host" label derived from
// endpoint. An error is returned only when one of the metric vectors cannot be
// curried with that label.
func GetInstrumentedHTTPClient(endpoint string) (*http.Client, error) {
	labels := prometheus.Labels{"host": rpcHostLabel(endpoint)}
	rpcCounterMetric, err := RPCCount.CurryWith(labels)
	if err != nil {
		return nil, err
	}
	rpcLatencyMetric, err := RPCLatency.CurryWith(labels)
	if err != nil {
		return nil, err
	}

	// wrap the default transport; the in-flight gauge is the outermost layer
	transport := http.DefaultTransport
	transport = promhttp.InstrumentRoundTripperDuration(rpcLatencyMetric, transport)
	transport = promhttp.InstrumentRoundTripperCounter(rpcCounterMetric, transport)
	transport = promhttp.InstrumentRoundTripperInFlight(RPCInProgress.With(labels), transport)

	return &http.Client{
		Transport: transport,
	}, nil
}

// rpcHostLabel derives the value of the "host" metric label from an RPC
// endpoint. Only the host[:port] part is kept so that path components (which
// may carry an auth uuid) are not exposed in metrics.
func rpcHostLabel(endpoint string) string {
	if parsed, err := url.Parse(endpoint); err == nil && parsed.Host != "" {
		return parsed.Host
	}

	// url.Parse accepts scheme-less endpoints such as "localhost:8545" or
	// "example.com/uuid" but leaves Host empty, which would collapse every such
	// endpoint into a single empty label. Retry as a network-path reference.
	if parsed, err := url.Parse("//" + endpoint); err == nil && parsed.Host != "" {
		return parsed.Host
	}

	// nothing host-like could be extracted; keep the raw endpoint as before
	return endpoint
}
49 changes: 43 additions & 6 deletions zetaclient/metrics/metrics_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package metrics

import (
"fmt"
"io"
"net/http"
"strings"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
. "gopkg.in/check.v1"
)

Expand All @@ -23,20 +28,52 @@ func (ms *MetricsSuite) SetUpSuite(c *C) {
ms.m = m
}

// TestCurryWith checks that a counter curried from RPCCount writes to the same
// underlying series that RPCCount itself exposes.
func (ms *MetricsSuite) TestCurryWith(c *C) {
	hostOnly := prometheus.Labels{"host": "test"}
	hostAndCode := prometheus.Labels{"host": "test", "code": "400"}

	// increment through the curried vector, supplying only the remaining label
	curried := RPCCount.MustCurryWith(hostOnly)
	curried.With(prometheus.Labels{"code": "400"}).Add(1.0)

	// read the value back through the original, fully-labelled vector
	observed := testutil.ToFloat64(RPCCount.With(hostAndCode))
	c.Assert(observed, Equals, 1.0)

	// clear the series so other tests start from zero
	RPCCount.Reset()
}

func (ms *MetricsSuite) TestMetrics(c *C) {
GetFilterLogsPerChain.WithLabelValues("chain1").Inc()
GetFilterLogsPerChain.WithLabelValues("chain2").Inc()
GetFilterLogsPerChain.WithLabelValues("chain2").Inc()
time.Sleep(1 * time.Second)
res, err := http.Get("http://127.0.0.1:8886/metrics")

chain1Ctr := testutil.ToFloat64(GetFilterLogsPerChain.WithLabelValues("chain1"))
c.Assert(chain1Ctr, Equals, 1.0)

httpClient, err := GetInstrumentedHTTPClient("http://127.0.0.1:8886/myauthuuid")
c.Assert(err, IsNil)
c.Assert(res.StatusCode, Equals, http.StatusOK)
defer res.Body.Close()
//out, err := ioutil.ReadAll(res.Body)
//fmt.Println(string(out))

res, err = http.Get("http://127.0.0.1:8886")
res, err := httpClient.Get("http://127.0.0.1:8886")
c.Assert(err, IsNil)
defer res.Body.Close()
c.Assert(res.StatusCode, Equals, http.StatusOK)

res, err = httpClient.Get("http://127.0.0.1:8886/metrics")
c.Assert(err, IsNil)
defer res.Body.Close()
c.Assert(res.StatusCode, Equals, http.StatusOK)
body, err := io.ReadAll(res.Body)
c.Assert(err, IsNil)
metricsBody := string(body)
c.Assert(strings.Contains(metricsBody, fmt.Sprintf("%s_%s", ZetaClientNamespace, "rpc_count")), Equals, true)

// assert that rpc count is being incremented at all
rpcCount := testutil.ToFloat64(RPCCount)
c.Assert(rpcCount, Equals, 2.0)

// assert that rpc count is being incremented correctly
rpcCount = testutil.ToFloat64(RPCCount.With(prometheus.Labels{"host": "127.0.0.1:8886", "code": "200"}))
c.Assert(rpcCount, Equals, 2.0)

// assert that rpc count is not being incremented incorrectly
rpcCount = testutil.ToFloat64(RPCCount.With(prometheus.Labels{"host": "127.0.0.1:8886", "code": "502"}))
c.Assert(rpcCount, Equals, 0.0)
}
10 changes: 8 additions & 2 deletions zetaclient/orchestrator/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
ethrpc "github.com/ethereum/go-ethereum/rpc"
solrpc "github.com/gagliardetto/solana-go/rpc"
"github.com/pkg/errors"

Expand Down Expand Up @@ -279,12 +280,17 @@ func syncObserverMap(
continue
}

// create EVM client
evmClient, err := ethclient.DialContext(ctx, cfg.Endpoint)
httpClient, err := metrics.GetInstrumentedHTTPClient(cfg.Endpoint)
if err != nil {
logger.Std.Error().Err(err).Str("rpc.endpoint", cfg.Endpoint).Msgf("Unable to create HTTP client")
continue
}
rpcClient, err := ethrpc.DialHTTPWithClient(cfg.Endpoint, httpClient)
if err != nil {
logger.Std.Error().Err(err).Str("rpc.endpoint", cfg.Endpoint).Msgf("Unable to dial EVM RPC")
continue
}
evmClient := ethclient.NewClient(rpcClient)

database, err := db.NewFromSqlite(dbpath, chainName, true)
if err != nil {
Expand Down

0 comments on commit f3b450e

Please sign in to comment.