Skip to content

Commit

Permalink
feat(abciclient): limit concurrent gRPC connections
Browse files Browse the repository at this point in the history
  • Loading branch information
lklimek committed Apr 5, 2024
1 parent e4c934f commit faf90c4
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 81 deletions.
49 changes: 48 additions & 1 deletion abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type grpcClient struct {
mtx sync.Mutex
addr string
err error

concurrency chan struct{}
}

var _ Client = (*grpcClient)(nil)
Expand All @@ -42,24 +44,69 @@ func NewGRPCClient(logger log.Logger, addr string, mustConnect bool) Client {
logger: logger,
addr: addr,
mustConnect: mustConnect,
concurrency: nil,
}
cli.BaseService = *service.NewBaseService(logger, "grpcClient", cli)
return cli
}

// SetMaxConcurrentStreams sets the maximum number of concurrent streams to be
// allowed on this client.
//
// Not thread-safe, only use this before starting the client.
//
// If limit is 0, no limit is enforced.
func (cli *grpcClient) SetMaxConcurrentStreams(n uint16) {
if cli.IsRunning() {
panic("cannot set max concurrent streams after starting the client")
}
if n == 0 {
cli.concurrency = nil
} else {
cli.concurrency = make(chan struct{}, n)
}
}

func dialerFunc(_ctx context.Context, addr string) (net.Conn, error) {
return tmnet.Connect(addr)
}

// rateLimit blocks until the client is allowed to send a request.
// It returns a function that should be called after the request is done.
func (cli *grpcClient) rateLimit() context.CancelFunc {
if cli.concurrency == nil {
return func() {}
}
cli.logger.Debug("grpcClient rateLimit", "addr", cli.addr)
cli.concurrency <- struct{}{}
return func() { <-cli.concurrency }
}

func (cli *grpcClient) unaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
done := cli.rateLimit()
defer done()

return invoker(ctx, method, req, reply, cc, opts...)
}

func (cli *grpcClient) streamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
done := cli.rateLimit()
defer done()

return streamer(ctx, desc, cc, method, opts...)
}

func (cli *grpcClient) OnStart(ctx context.Context) error {
timer := time.NewTimer(0)
defer timer.Stop()

RETRY_LOOP:
for {
conn, err := grpc.Dial(cli.addr,
conn, err := grpc.NewClient(cli.addr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(dialerFunc),
grpc.WithChainUnaryInterceptor(cli.unaryClientInterceptor),
grpc.WithChainStreamInterceptor(cli.streamClientInterceptor),
)
if err != nil {
if cli.mustConnect {
Expand Down
131 changes: 86 additions & 45 deletions abci/client/grpc_client_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package abciclient_test
package abciclient

import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/fortytw2/leaktest"
"github.com/stretchr/testify/assert"

abciclient "github.com/dashpay/tenderdash/abci/client"
abciserver "github.com/dashpay/tenderdash/abci/server"
"github.com/dashpay/tenderdash/abci/types"
"github.com/dashpay/tenderdash/libs/log"
Expand All @@ -18,47 +19,77 @@ import (

// TestGRPCClientServerParallel tests that gRPC client and server can handle multiple parallel requests
func TestGRPCClientServerParallel(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

logger := log.NewNopLogger()
app := &mockApplication{t: t}

socket := t.TempDir() + "/grpc_test"
client, _, err := makeGRPCClientServer(ctx, t, logger, app, socket)
if err != nil {
t.Fatal(err)
const (
timeout = 1 * time.Second
tick = 10 * time.Millisecond
)

type testCase struct {
threads int
concurrency uint16
}

// we'll use that mutex to ensure threads don't finish before we check status
app.mtx.Lock()

const threads = 5
// started will be marked as done as soon as app.Info() handler executes on the server
app.started.Add(threads)
// done will be used to wait for all threads to finish
var done sync.WaitGroup
done.Add(threads)

for i := 0; i < threads; i++ {
thread := uint64(i)
go func() {
_, _ = client.Info(ctx, &types.RequestInfo{BlockVersion: thread})
done.Done()
}()
testCases := []testCase{
{threads: 1, concurrency: 1},
{threads: 2, concurrency: 1},
{threads: 2, concurrency: 2},
{threads: 5, concurrency: 0},
{threads: 5, concurrency: 1},
{threads: 5, concurrency: 2},
{threads: 5, concurrency: 5},
}

// wait for threads to execute
// note it doesn't mean threads are really done, as they are waiting on the mtx
// so if all `started` are marked as done, it means all threads have started
// in parallel
app.started.Wait()

// unlock the mutex so that threads can finish their execution
app.mtx.Unlock()
logger := log.NewNopLogger()

// wait for all threads to really finish
done.Wait()
for _, tc := range testCases {
t.Run(fmt.Sprintf("%+v", tc), func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

app := &mockApplication{t: t, concurrencyLimit: int32(tc.concurrency)}

socket := t.TempDir() + "/grpc_test"
client, _, err := makeGRPCClientServer(ctx, t, logger, app, socket, tc.concurrency)
if err != nil {
t.Fatal(err)
}

// we'll use that mutex to ensure threads don't finish before we check status
app.mtx.Lock()

// done will be used to wait for all threads to finish
var done sync.WaitGroup

for i := 0; i < tc.threads; i++ {
done.Add(1)
thread := uint64(i)
go func() {
// we use BlockVersion for logging purposes, so we put thread id there
_, _ = client.Info(ctx, &types.RequestInfo{BlockVersion: thread})
done.Done()
}()
}

expectThreads := int32(tc.concurrency)
if expectThreads == 0 {
expectThreads = int32(tc.threads)
}

// wait for all threads to start execution
assert.Eventually(t, func() bool {
return app.running.Load() == expectThreads
}, timeout, tick, "not all threads started in time")

// ensure no other threads will start
time.Sleep(2 * tick)

// unlock the mutex so that threads can finish their execution
app.mtx.Unlock()

// wait for all threads to really finish
done.Wait()
})
}
}

func makeGRPCClientServer(
Expand All @@ -67,7 +98,8 @@ func makeGRPCClientServer(
logger log.Logger,
app types.Application,
name string,
) (abciclient.Client, service.Service, error) {
limit uint16,
) (Client, service.Service, error) {
ctx, cancel := context.WithCancel(ctx)
t.Cleanup(cancel)
t.Cleanup(leaktest.Check(t))
Expand All @@ -82,7 +114,8 @@ func makeGRPCClientServer(
return nil, nil, err
}

client := abciclient.NewGRPCClient(logger.With("module", "abci-client"), socket, true)
client := NewGRPCClient(logger.With("module", "abci-client"), socket, true)
client.(*grpcClient).SetMaxConcurrentStreams(limit)

if err := client.Start(ctx); err != nil {
cancel()
Expand All @@ -96,19 +129,27 @@ func makeGRPCClientServer(
type mockApplication struct {
types.BaseApplication
mtx sync.Mutex
// we'll use that to ensure all threads have started
started sync.WaitGroup

running atomic.Int32
// concurrencyLimit of concurrent requests
concurrencyLimit int32

t *testing.T
}

func (m *mockApplication) Info(_ctx context.Context, req *types.RequestInfo) (res *types.ResponseInfo, err error) {
m.t.Logf("Info %d called", req.BlockVersion)
// mark wg as done to signal that we have executed
m.started.Done()
// we will wait here until all threads mark wg as done
running := m.running.Add(1)
defer m.running.Add(-1)

if m.concurrencyLimit > 0 {
assert.LessOrEqual(m.t, running, m.concurrencyLimit, "too many requests running in parallel")
}

// we will wait here until all expected threads are running
m.mtx.Lock()
defer m.mtx.Unlock()
m.t.Logf("Info %d finished", req.BlockVersion)

return &types.ResponseInfo{}, nil
}
21 changes: 11 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ require (
github.com/go-chi/chi v4.1.2+incompatible // indirect
github.com/go-kit/kit v0.12.0
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.3
github.com/golang/protobuf v1.5.4
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/golangci/golangci-lint v1.55.2
github.com/google/btree v1.1.2 // indirect
github.com/google/gopacket v1.1.19
github.com/google/orderedcode v0.0.1
github.com/google/uuid v1.3.1
github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.0
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
Expand All @@ -39,10 +39,10 @@ require (
github.com/spf13/viper v1.15.0
github.com/stretchr/testify v1.8.4
github.com/tendermint/tm-db v0.6.6
golang.org/x/crypto v0.18.0
golang.org/x/net v0.20.0
golang.org/x/crypto v0.19.0
golang.org/x/net v0.21.0
golang.org/x/sync v0.6.0
google.golang.org/grpc v1.52.0
google.golang.org/grpc v1.63.0
pgregory.net/rapid v0.4.8
)

Expand Down Expand Up @@ -88,7 +88,7 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-pkgz/expirable-cache v0.0.3 // indirect
github.com/go-pkgz/rest v1.5.0 // indirect
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect
github.com/golang/glog v1.2.0 // indirect
github.com/google/pprof v0.0.0-20230207041349-798e818bf904 // indirect
github.com/huandu/xstrings v1.4.0 // indirect
github.com/iancoleman/strcase v0.2.0 // indirect
Expand Down Expand Up @@ -116,7 +116,8 @@ require (
go.opentelemetry.io/otel v1.8.0 // indirect
go.opentelemetry.io/otel/trace v1.8.0 // indirect
go.tmz.dev/musttag v0.7.2 // indirect
google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef // indirect
google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect
)

require (
Expand Down Expand Up @@ -284,11 +285,11 @@ require (
go.uber.org/zap v1.24.0 // indirect
golang.org/x/exp/typeparams v0.0.0-20230307190834-24139beb5833 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/term v0.16.0
golang.org/x/sys v0.17.0 // indirect
golang.org/x/term v0.17.0
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.17.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
Loading

0 comments on commit faf90c4

Please sign in to comment.