Skip to content

Commit

Permalink
feat(proxy): add request result to prometheus stats
Browse files Browse the repository at this point in the history
  • Loading branch information
lklimek committed Feb 28, 2024
1 parent a613716 commit db5adce
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 35 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ require (
github.com/go-pkgz/jrpc v0.2.0
github.com/google/go-cmp v0.6.0
github.com/vektra/mockery/v2 v2.41.0
gotest.tools v2.2.0+incompatible
)

require (
Expand Down
69 changes: 37 additions & 32 deletions internal/proxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"io"
"os"
"strconv"
"syscall"
"time"

Expand Down Expand Up @@ -136,86 +137,90 @@ func kill() error {
return p.Signal(syscall.SIGABRT)
}

func (app *proxyClient) InitChain(ctx context.Context, req *types.RequestInitChain) (*types.ResponseInitChain, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "init_chain", "type", "sync"))()
func (app *proxyClient) InitChain(ctx context.Context, req *types.RequestInitChain) (r *types.ResponseInitChain, err error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "init_chain", "type", "sync", "success"))(&err)
return app.client.InitChain(ctx, req)
}

func (app *proxyClient) PrepareProposal(ctx context.Context, req *types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "prepare_proposal", "type", "sync"))()
func (app *proxyClient) PrepareProposal(ctx context.Context, req *types.RequestPrepareProposal) (r *types.ResponsePrepareProposal, err error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "prepare_proposal", "type", "sync"))(&err)
return app.client.PrepareProposal(ctx, req)
}

func (app *proxyClient) ProcessProposal(ctx context.Context, req *types.RequestProcessProposal) (*types.ResponseProcessProposal, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "process_proposal", "type", "sync"))()
func (app *proxyClient) ProcessProposal(ctx context.Context, req *types.RequestProcessProposal) (r *types.ResponseProcessProposal, err error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "process_proposal", "type", "sync"))(&err)
return app.client.ProcessProposal(ctx, req)
}

func (app *proxyClient) ExtendVote(ctx context.Context, req *types.RequestExtendVote) (*types.ResponseExtendVote, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "extend_vote", "type", "sync"))()
func (app *proxyClient) ExtendVote(ctx context.Context, req *types.RequestExtendVote) (r *types.ResponseExtendVote, err error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "extend_vote", "type", "sync"))(&err)
return app.client.ExtendVote(ctx, req)
}

func (app *proxyClient) VerifyVoteExtension(ctx context.Context, req *types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "verify_vote_extension", "type", "sync"))()
func (app *proxyClient) VerifyVoteExtension(ctx context.Context, req *types.RequestVerifyVoteExtension) (r *types.ResponseVerifyVoteExtension, err error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "verify_vote_extension", "type", "sync"))(&err)
return app.client.VerifyVoteExtension(ctx, req)
}

func (app *proxyClient) FinalizeBlock(ctx context.Context, req *types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "finalize_block", "type", "sync"))()
func (app *proxyClient) FinalizeBlock(ctx context.Context, req *types.RequestFinalizeBlock) (r *types.ResponseFinalizeBlock, err error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "finalize_block", "type", "sync"))(&err)
return app.client.FinalizeBlock(ctx, req)
}

func (app *proxyClient) Flush(ctx context.Context) error {
defer addTimeSample(app.metrics.MethodTiming.With("method", "flush", "type", "sync"))()
func (app *proxyClient) Flush(ctx context.Context) (err error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "flush", "type", "sync"))(&err)
return app.client.Flush(ctx)
}

func (app *proxyClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "check_tx", "type", "sync"))()
func (app *proxyClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (r *types.ResponseCheckTx, err error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "check_tx", "type", "sync"))(&err)
return app.client.CheckTx(ctx, req)
}

func (app *proxyClient) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "echo", "type", "sync"))()
func (app *proxyClient) Echo(ctx context.Context, msg string) (r *types.ResponseEcho, err error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "echo", "type", "sync"))(&err)
return app.client.Echo(ctx, msg)
}

func (app *proxyClient) Info(ctx context.Context, req *types.RequestInfo) (*types.ResponseInfo, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "info", "type", "sync"))()
func (app *proxyClient) Info(ctx context.Context, req *types.RequestInfo) (r *types.ResponseInfo, err error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "info", "type", "sync"))(&err)
return app.client.Info(ctx, req)
}

func (app *proxyClient) Query(ctx context.Context, req *types.RequestQuery) (*types.ResponseQuery, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "query", "type", "sync"))()
func (app *proxyClient) Query(ctx context.Context, req *types.RequestQuery) (r *types.ResponseQuery, err error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "query", "type", "sync"))(&err)
return app.client.Query(ctx, req)
}

func (app *proxyClient) ListSnapshots(ctx context.Context, req *types.RequestListSnapshots) (*types.ResponseListSnapshots, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "list_snapshots", "type", "sync"))()
func (app *proxyClient) ListSnapshots(ctx context.Context, req *types.RequestListSnapshots) (r *types.ResponseListSnapshots, err error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "list_snapshots", "type", "sync"))(&err)
return app.client.ListSnapshots(ctx, req)
}

func (app *proxyClient) OfferSnapshot(ctx context.Context, req *types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "offer_snapshot", "type", "sync"))()
func (app *proxyClient) OfferSnapshot(ctx context.Context, req *types.RequestOfferSnapshot) (r *types.ResponseOfferSnapshot, err error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "offer_snapshot", "type", "sync"))(&err)
return app.client.OfferSnapshot(ctx, req)
}

func (app *proxyClient) LoadSnapshotChunk(ctx context.Context, req *types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "load_snapshot_chunk", "type", "sync"))()
func (app *proxyClient) LoadSnapshotChunk(ctx context.Context, req *types.RequestLoadSnapshotChunk) (r *types.ResponseLoadSnapshotChunk, err error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "load_snapshot_chunk", "type", "sync"))(&err)
return app.client.LoadSnapshotChunk(ctx, req)
}

func (app *proxyClient) ApplySnapshotChunk(ctx context.Context, req *types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "apply_snapshot_chunk", "type", "sync"))()
func (app *proxyClient) ApplySnapshotChunk(ctx context.Context, req *types.RequestApplySnapshotChunk) (r *types.ResponseApplySnapshotChunk, err error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "apply_snapshot_chunk", "type", "sync"))(&err)
return app.client.ApplySnapshotChunk(ctx, req)
}

// addTimeSample returns a function that, when called, adds an observation to m.
// The observation added to m is the number of seconds ellapsed since addTimeSample
// was initially called. addTimeSample is meant to be called in a defer to calculate
// the amount of time a function takes to complete.
func addTimeSample(m metrics.Histogram) func() {
func addTimeSample(m metrics.Histogram) func(*error) {
start := time.Now()
return func() { m.Observe(time.Since(start).Seconds()) }

// we take err address to simplify usage in defer()
return func(err *error) {
m.With("success", strconv.FormatBool(*err == nil)).Observe(time.Since(start).Seconds())
}
}
105 changes: 103 additions & 2 deletions internal/proxy/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,22 @@ import (
"os"
"os/signal"
"strings"
"sync"
"syscall"
"testing"
"time"

"github.com/go-kit/kit/metrics"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"gotest.tools/assert"

abciclient "github.com/dashpay/tenderdash/abci/client"
abcimocks "github.com/dashpay/tenderdash/abci/client/mocks"
"github.com/dashpay/tenderdash/abci/example/kvstore"
"github.com/dashpay/tenderdash/abci/server"
"github.com/dashpay/tenderdash/abci/types"
"github.com/dashpay/tenderdash/abci/types/mocks"
"github.com/dashpay/tenderdash/libs/log"
tmrand "github.com/dashpay/tenderdash/libs/rand"
)
Expand Down Expand Up @@ -239,3 +241,102 @@ func TestAppConns_Failure(t *testing.T) {
t.Fatal("expected process to receive SIGTERM signal")
}
}
func TestFailureMetrics(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

logger := log.NewTestingLogger(t)
mtr := mockMetrics()
hst := mtr.MethodTiming.(*mockMetric)

app := mocks.NewApplication(t)
app.On("CheckTx", mock.Anything, mock.Anything).Return(&types.ResponseCheckTx{}, errors.New("some error")).Once()
app.On("CheckTx", mock.Anything, mock.Anything).Return(&types.ResponseCheckTx{}, nil).Times(2)
app.On("Info", mock.Anything, mock.Anything).Return(&types.ResponseInfo{}, nil)

// promtest.ToFloat64(hst)
cli := abciclient.NewLocalClient(logger, app)

proxy := New(cli, log.NewNopLogger(), mtr)

var err error
for i := 0; i < 5; i++ {
_, err = proxy.Info(ctx, &types.RequestInfo{})
assert.NoError(t, err)
}

for i := 0; i < 3; i++ {
_, err = proxy.CheckTx(ctx, &types.RequestCheckTx{})
if i == 0 {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
}

cancel() // this should stop all clients
proxy.Wait()
assert.Equal(t, 5, hst.count["method=info,type=sync,success=true"])
assert.Equal(t, 1, hst.count["method=check_tx,type=sync,success=false"])
assert.Equal(t, 2, hst.count["method=check_tx,type=sync,success=true"])
}

func mockMetrics() *Metrics {
return &Metrics{
MethodTiming: &mockMetric{
labels: []string{},
count: make(map[string]int),
mtx: &sync.Mutex{},
},
}
}

type mockMetric struct {
labels []string
/// count maps concatenated labels to the count of observations.
count map[string]int
mtx *sync.Mutex
}

var _ = metrics.Histogram(&mockMetric{})

func (m *mockMetric) With(labelValues ...string) metrics.Histogram {
m.mtx.Lock()
defer m.mtx.Unlock()

return &mockMetric{
labels: append(m.labels, labelValues...),
count: m.count,
mtx: m.mtx, // pointer, as we use the same m.count
}
}

func (m *mockMetric) Observe(_value float64) {
m.mtx.Lock()
defer m.mtx.Unlock()

labels := ""
for i, label := range m.labels {
labels += label
if i < len(m.labels)-1 {
if i%2 == 0 {
labels += "="
} else {
labels += ","
}
}
}

m.count[labels]++
}

func (m *mockMetric) String() (s string) {
m.mtx.Lock()
defer m.mtx.Unlock()

for labels, total := range m.count {
s += fmt.Sprintf("%s: %d\n", labels, total)
}

return s
}

0 comments on commit db5adce

Please sign in to comment.