Skip to content

Commit

Permalink
refactor: make disperser client reuse same grpc connection (#826)
Browse files Browse the repository at this point in the history
  • Loading branch information
samlaf authored Oct 24, 2024
1 parent 1e035c8 commit 6894828
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 56 deletions.
137 changes: 92 additions & 45 deletions api/clients/disperser_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/tls"
"errors"
"fmt"
"sync"
"time"

disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser"
Expand All @@ -18,15 +19,16 @@ import (
)

type Config struct {
Hostname string
Port string
Hostname string
Port string
// BlobDispersal Timeouts for both authenticated and unauthenticated dispersals
// GetBlobStatus and RetrieveBlob timeouts are hardcoded to 60seconds
// TODO: do we want to add config timeouts for those separate requests?
Timeout time.Duration
UseSecureGrpcFlag bool
}

// Deprecated: Use &Config{...} directly instead
func NewConfig(hostname, port string, timeout time.Duration, useSecureGrpcFlag bool) *Config {
return &Config{
Hostname: hostname,
Expand All @@ -37,47 +39,80 @@ func NewConfig(hostname, port string, timeout time.Duration, useSecureGrpcFlag b
}

type DisperserClient interface {
Close() error
DisperseBlob(ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error)
DisperseBlobAuthenticated(ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error)
GetBlobStatus(ctx context.Context, key []byte) (*disperser_rpc.BlobStatusReply, error)
RetrieveBlob(ctx context.Context, batchHeaderHash []byte, blobIndex uint32) ([]byte, error)
}

// See the NewDisperserClient constructor's documentation for details and usage examples.
type disperserClient struct {
config *Config
signer core.BlobRequestSigner
// conn and client are not initialized in the constructor, but are initialized lazily
// whenever a method is called, using initOnce to make sure initialization happens only once
// and is thread-safe
initOnce sync.Once
// We use a single grpc connection, which allows a max number of concurrent open streams (from DisperseBlobAuthenticated).
// This should be fine in most cases, as each such request should take <1sec per 1MB blob.
// The MaxConcurrentStreams parameter is set by the server. If not set, then it defaults to the stdlib's
// http2 default of 100-1000: https://github.com/golang/net/blob/4783315416d92ff3d4664762748bd21776b42b98/http2/transport.go#L55
// This means a conservative estimate of 100-1000MB/sec, which should be amply sufficient.
// If we ever need to increase this, we could either consider asking the disperser to increase its limit,
// or to use a pool of connections here.
conn *grpc.ClientConn
client disperser_rpc.DisperserClient
}

var _ DisperserClient = &disperserClient{}

func NewDisperserClient(config *Config, signer core.BlobRequestSigner) DisperserClient {
// DisperserClient maintains a single underlying grpc connection to the disperser server,
// through which it sends requests to disperse blobs, get blob status, and retrieve blobs.
// The connection is established lazily on the first method call. Don't forget to call Close(),
// which is safe to call even if the connection was never established.
//
// DisperserClient is safe to be used concurrently by multiple goroutines.
//
// Example usage:
//
// client := NewDisperserClient(config, signer)
// defer client.Close()
//
// // The connection will be established on the first call
// status, requestId, err := client.DisperseBlob(ctx, someData, someQuorums)
// if err != nil {
// // Handle error
// }
//
// // Subsequent calls will use the existing connection
// status2, requestId2, err := client.DisperseBlob(ctx, otherData, otherQuorums)
func NewDisperserClient(config *Config, signer core.BlobRequestSigner) *disperserClient {
return &disperserClient{
config: config,
signer: signer,
// conn and client are initialized lazily
}
}

func (c *disperserClient) getDialOptions() []grpc.DialOption {
if c.config.UseSecureGrpcFlag {
config := &tls.Config{}
credential := credentials.NewTLS(config)
return []grpc.DialOption{grpc.WithTransportCredentials(credential)}
} else {
return []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
// Close closes the grpc connection to the disperser server.
// It is thread safe and can be called multiple times.
func (c *disperserClient) Close() error {
if c.conn != nil {
err := c.conn.Close()
c.conn = nil
c.client = nil
return err
}
return nil
}

func (c *disperserClient) DisperseBlob(ctx context.Context, data []byte, quorums []uint8) (*disperser.BlobStatus, []byte, error) {
addr := fmt.Sprintf("%v:%v", c.config.Hostname, c.config.Port)

dialOptions := c.getDialOptions()
conn, err := grpc.Dial(addr, dialOptions...)
err := c.initOnceGrpcConnection()
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("error initializing connection: %w", err)
}
defer func() { _ = conn.Close() }()

disperserClient := disperser_rpc.NewDisperserClient(conn)
ctxTimeout, cancel := context.WithTimeout(ctx, c.config.Timeout)
defer cancel()

Expand All @@ -96,7 +131,7 @@ func (c *disperserClient) DisperseBlob(ctx context.Context, data []byte, quorums
CustomQuorumNumbers: quorumNumbers,
}

reply, err := disperserClient.DisperseBlob(ctxTimeout, request)
reply, err := c.client.DisperseBlob(ctxTimeout, request)
if err != nil {
return nil, nil, err
}
Expand All @@ -110,6 +145,11 @@ func (c *disperserClient) DisperseBlob(ctx context.Context, data []byte, quorums
}

func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data []byte, quorums []uint8) (*disperser.BlobStatus, []byte, error) {
err := c.initOnceGrpcConnection()
if err != nil {
return nil, nil, fmt.Errorf("error initializing connection: %w", err)
}

if c.signer == nil {
return nil, nil, fmt.Errorf("uninitialized signer for authenticated dispersal")
}
Expand Down Expand Up @@ -137,22 +177,11 @@ func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data []
AccountId: accountId,
}

addr := fmt.Sprintf("%v:%v", c.config.Hostname, c.config.Port)

dialOptions := c.getDialOptions()
conn, err := grpc.Dial(addr, dialOptions...)

if err != nil {
return nil, nil, err
}
defer func() { _ = conn.Close() }()

disperserClient := disperser_rpc.NewDisperserClient(conn)
ctxTimeout, cancel := context.WithTimeout(ctx, c.config.Timeout)

defer cancel()

stream, err := disperserClient.DisperseBlobAuthenticated(ctxTimeout)
stream, err := c.client.DisperseBlobAuthenticated(ctxTimeout)
if err != nil {
return nil, nil, fmt.Errorf("error while calling DisperseBlobAuthenticated: %w", err)
}
Expand Down Expand Up @@ -215,22 +244,19 @@ func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data []
}

func (c *disperserClient) GetBlobStatus(ctx context.Context, requestID []byte) (*disperser_rpc.BlobStatusReply, error) {
addr := fmt.Sprintf("%v:%v", c.config.Hostname, c.config.Port)
dialOptions := c.getDialOptions()
conn, err := grpc.Dial(addr, dialOptions...)
err := c.initOnceGrpcConnection()
if err != nil {
return nil, err
return nil, fmt.Errorf("error initializing connection: %w", err)
}

disperserClient := disperser_rpc.NewDisperserClient(conn)
ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*60)
defer cancel()

request := &disperser_rpc.BlobStatusRequest{
RequestId: requestID,
}

reply, err := disperserClient.GetBlobStatus(ctxTimeout, request)
reply, err := c.client.GetBlobStatus(ctxTimeout, request)
if err != nil {
return nil, err
}
Expand All @@ -239,19 +265,14 @@ func (c *disperserClient) GetBlobStatus(ctx context.Context, requestID []byte) (
}

func (c *disperserClient) RetrieveBlob(ctx context.Context, batchHeaderHash []byte, blobIndex uint32) ([]byte, error) {
addr := fmt.Sprintf("%v:%v", c.config.Hostname, c.config.Port)

options := c.getDialOptions()
options = append(options, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(100*1024*1024))) // 100MiB receive buffer

conn, err := grpc.Dial(addr, options...)
err := c.initOnceGrpcConnection()
if err != nil {
return nil, err
return nil, fmt.Errorf("error initializing connection: %w", err)
}
disperserClient := disperser_rpc.NewDisperserClient(conn)

ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*60)
defer cancel()
reply, err := disperserClient.RetrieveBlob(ctxTimeout, &disperser_rpc.RetrieveBlobRequest{
reply, err := c.client.RetrieveBlob(ctxTimeout, &disperser_rpc.RetrieveBlobRequest{
BatchHeaderHash: batchHeaderHash,
BlobIndex: blobIndex,
})
Expand All @@ -260,3 +281,29 @@ func (c *disperserClient) RetrieveBlob(ctx context.Context, batchHeaderHash []by
}
return reply.Data, nil
}

func (c *disperserClient) initOnceGrpcConnection() error {
var initErr error
c.initOnce.Do(func() {
addr := fmt.Sprintf("%v:%v", c.config.Hostname, c.config.Port)
dialOptions := getGrpcDialOptions(c.config.UseSecureGrpcFlag)
conn, err := grpc.Dial(addr, dialOptions...)
if err != nil {
initErr = err
return
}
c.conn = conn
c.client = disperser_rpc.NewDisperserClient(conn)
})
return initErr
}

func getGrpcDialOptions(useSecureGrpcFlag bool) []grpc.DialOption {
options := []grpc.DialOption{}
if useSecureGrpcFlag {
options = append(options, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})))
} else {
options = append(options, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
return options
}
43 changes: 38 additions & 5 deletions api/clients/eigenda_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ type IEigenDAClient interface {
GetBlob(ctx context.Context, batchHeaderHash []byte, blobIndex uint32) ([]byte, error)
PutBlob(ctx context.Context, txData []byte) (*grpcdisperser.BlobInfo, error)
GetCodec() codecs.BlobCodec
Close() error
}

// EigenDAClient is a wrapper around the DisperserClient which
// encodes blobs before dispersing them, and decodes them after retrieving them.
// See the NewEigenDAClient constructor's documentation for details and usage examples.
type EigenDAClient struct {
// TODO: all of these should be private, to prevent users from using them directly,
// which breaks encapsulation and makes it hard for us to do refactors or changes
Expand All @@ -37,6 +37,33 @@ type EigenDAClient struct {

var _ IEigenDAClient = &EigenDAClient{}

// EigenDAClient is a wrapper around the DisperserClient which
// encodes blobs before dispersing them, and decodes them after retrieving them.
// It also turns the disperser's async polling-based API (disperseBlob + poll GetBlobStatus)
// into a sync API where PutBlob will poll for the blob to be confirmed or finalized.
//
// DisperserClient is safe to be used concurrently by multiple goroutines.
// Don't forget to call Close() on the client when you're done with it, to close the
// underlying grpc connection maintained by the DiserserClient.
//
// Example usage:
//
// client, err := NewEigenDAClient(log, EigenDAClientConfig{...})
// if err != nil {
// return err
// }
// defer client.Close()
//
// blobData := []byte("hello world")
// blobInfo, err := client.PutBlob(ctx, blobData)
// if err != nil {
// return err
// }
//
// retrievedData, err := client.GetBlob(ctx, blobInfo.BatchMetadata.BatchHeaderHash, blobInfo.BlobIndex)
// if err != nil {
// return err
// }
func NewEigenDAClient(log log.Logger, config EigenDAClientConfig) (*EigenDAClient, error) {
err := config.CheckAndSetDefaults()
if err != nil {
Expand All @@ -58,8 +85,8 @@ func NewEigenDAClient(log log.Logger, config EigenDAClientConfig) (*EigenDAClien
return nil, fmt.Errorf("invalid length for signer private key")
}

llConfig := NewConfig(host, port, config.ResponseTimeout, !config.DisableTLS)
llClient := NewDisperserClient(llConfig, signer)
disperserConfig := NewConfig(host, port, config.ResponseTimeout, !config.DisableTLS)
disperserClient := NewDisperserClient(disperserConfig, signer)

lowLevelCodec, err := codecs.BlobEncodingVersionToCodec(config.PutBlobEncodingVersion)
if err != nil {
Expand All @@ -76,7 +103,7 @@ func NewEigenDAClient(log log.Logger, config EigenDAClientConfig) (*EigenDAClien
return &EigenDAClient{
Log: log,
Config: config,
Client: llClient,
Client: disperserClient,
Codec: codec,
}, nil
}
Expand Down Expand Up @@ -232,3 +259,9 @@ func (m *EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan
}
}
}

// Close simply calls Close() on the wrapped disperserClient, to close the grpc connection to the disperser server.
// It is thread safe and can be called multiple times.
func (c *EigenDAClient) Close() error {
return c.Client.Close()
}
5 changes: 5 additions & 0 deletions api/clients/mock/disperser_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,8 @@ func (c *MockDisperserClient) RetrieveBlob(ctx context.Context, batchHeaderHash
}
return blob, err
}

func (c *MockDisperserClient) Close() error {
args := c.Called()
return args.Error(0)
}
12 changes: 6 additions & 6 deletions tools/traffic/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
customQuorumsUint8[i] = uint8(q)
}
return &Config{
Config: *clients.NewConfig(
ctx.GlobalString(flags.HostnameFlag.Name),
ctx.GlobalString(flags.GrpcPortFlag.Name),
ctx.Duration(flags.TimeoutFlag.Name),
ctx.GlobalBool(flags.UseSecureGrpcFlag.Name),
),
Config: clients.Config{
Hostname: ctx.GlobalString(flags.HostnameFlag.Name),
Port: ctx.GlobalString(flags.GrpcPortFlag.Name),
Timeout: ctx.Duration(flags.TimeoutFlag.Name),
UseSecureGrpcFlag: ctx.GlobalBool(flags.UseSecureGrpcFlag.Name),
},
NumInstances: ctx.GlobalUint(flags.NumInstancesFlag.Name),
RequestInterval: ctx.Duration(flags.RequestIntervalFlag.Name),
DataSize: ctx.GlobalUint64(flags.DataSizeFlag.Name),
Expand Down
5 changes: 5 additions & 0 deletions tools/traffic/workers/mock_disperser.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,8 @@ func (m *MockDisperserClient) RetrieveBlob(ctx context.Context, batchHeaderHash
args := m.mock.Called(batchHeaderHash, blobIndex)
return args.Get(0).([]byte), args.Error(1)
}

func (m *MockDisperserClient) Close() error {
args := m.mock.Called()
return args.Error(0)
}

0 comments on commit 6894828

Please sign in to comment.