From b53a1ea5edab235c0da3b4597585ac36a218ee3e Mon Sep 17 00:00:00 2001 From: Sotirios Mantziaris Date: Sun, 27 Oct 2024 15:45:16 +0200 Subject: [PATCH] Fixing deprecated --- .golangci.yml | 8 +- client/grpc/grpc.go | 12 +-- client/grpc/grpc_test.go | 144 +---------------------------- client/grpc/integration_test.go | 145 ++++++++++++++++++++++++++++++ client/sns/integration_test.go | 17 +--- client/sns/sns.go | 4 +- client/sqs/integration_test.go | 17 +--- client/sqs/sqs.go | 4 +- component/grpc/component_test.go | 7 +- component/sqs/integration_test.go | 36 ++------ examples/client/main.go | 2 +- 11 files changed, 177 insertions(+), 219 deletions(-) create mode 100644 client/grpc/integration_test.go diff --git a/.golangci.yml b/.golangci.yml index ce4d00c13..fecdae0b3 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -73,16 +73,10 @@ linters: - dogsled - protogetter - usestdlibvars - - testableexamples + - testableexamples fast: false issues: exclude-dirs: - vendor max-same-issues: 10 - - exclude-rules: - # Exclude some staticcheck messages - - linters: - - staticcheck - text: "SA1019:" diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index 1ffda9fee..559cad9ec 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -2,26 +2,18 @@ package grpc import ( - "context" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" ) -// Dial creates a client connection to the given target with a tracing and -// metrics unary interceptor. -func Dial(target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { - return DialContext(context.Background(), target, opts...) -} - // DialContext creates a client connection to the given target with a context and // a tracing and metrics unary interceptor. -func DialContext(ctx context.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { +func NewClient(target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { if len(opts) == 0 { opts = make([]grpc.DialOption, 0) } opts = append(opts, grpc.WithStatsHandler(otelgrpc.NewClientHandler())) - return grpc.DialContext(ctx, target, opts...) + return grpc.NewClient(target, opts...) } diff --git a/client/grpc/grpc_test.go b/client/grpc/grpc_test.go index d8450fed2..4666e2924 100644 --- a/client/grpc/grpc_test.go +++ b/client/grpc/grpc_test.go @@ -1,78 +1,19 @@ package grpc import ( - "context" - "fmt" - "log" - "net" - "os" "testing" - "github.com/beatlabs/patron/examples" - "github.com/beatlabs/patron/internal/test" - "github.com/beatlabs/patron/observability/trace" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/trace/tracetest" "google.golang.org/grpc" - "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/status" - "google.golang.org/grpc/test/bufconn" ) const ( - bufSize = 1024 * 1024 - target = "bufnet" + target = "target" ) -var lis *bufconn.Listener - -type server struct { - examples.UnimplementedGreeterServer -} - -func (s *server) SayHelloStream(_ *examples.HelloRequest, _ examples.Greeter_SayHelloStreamServer) error { - return status.Error(codes.Unavailable, "streaming not supported") -} - -func (s *server) SayHello(_ context.Context, req *examples.HelloRequest) (*examples.HelloReply, error) { - if req.GetFirstname() == "" { - return nil, status.Error(codes.InvalidArgument, "first name cannot be empty") - } - return &examples.HelloReply{Message: fmt.Sprintf("Hello %s!", req.GetFirstname())}, nil -} - -func TestMain(m *testing.M) { - lis = bufconn.Listen(bufSize) - s := grpc.NewServer() - examples.RegisterGreeterServer(s, &server{}) - go func() { - if err := s.Serve(lis); err != nil { - log.Fatal(err) - } - }() - - code := m.Run() - - s.GracefulStop() - - os.Exit(code) -} - -func bufDialer(_ context.Context, _ string) (net.Conn, error) { - return lis.Dial() -} - -func TestDial(t *testing.T) { - conn, err := Dial(target, grpc.WithContextDialer(bufDialer), grpc.WithInsecure()) - require.NoError(t, err) - assert.NotNil(t, conn) - require.NoError(t, conn.Close()) -} - -func TestDialContext(t *testing.T) { +func TestNewClient(t *testing.T) { t.Parallel() type args struct { opts []grpc.DialOption @@ -83,7 +24,7 @@ func TestDialContext(t *testing.T) { }{ "success": { args: args{ - opts: []grpc.DialOption{grpc.WithContextDialer(bufDialer), grpc.WithInsecure()}, + opts: []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}, }, }, "failure missing grpc.WithInsecure()": { @@ -94,7 +35,7 @@ func TestDialContext(t *testing.T) { for name, tt := range tests { t.Run(name, func(t *testing.T) { t.Parallel() - gotConn, err := DialContext(context.Background(), target, tt.args.opts...) + gotConn, err := NewClient(target, tt.args.opts...) if tt.expectedErr != "" { require.EqualError(t, err, tt.expectedErr) assert.Nil(t, gotConn) @@ -105,80 +46,3 @@ func TestDialContext(t *testing.T) { }) } } - -func TestSayHello(t *testing.T) { - ctx := context.Background() - conn, err := DialContext(ctx, target, grpc.WithContextDialer(bufDialer), grpc.WithTransportCredentials(insecure.NewCredentials())) - require.NoError(t, err) - defer func() { - require.NoError(t, conn.Close()) - }() - - // Tracing setup - exp := tracetest.NewInMemoryExporter() - tracePublisher := trace.Setup("test", nil, exp) - - // Metrics monitoring set up - shutdownProvider, assertCollectMetrics := test.SetupMetrics(ctx, t) - defer shutdownProvider() - - client := examples.NewGreeterClient(conn) - - tt := map[string]struct { - req *examples.HelloRequest - wantErr bool - wantCode codes.Code - wantMsg string - wantCounter int - }{ - "ok": { - req: &examples.HelloRequest{Firstname: "John"}, - wantErr: false, - wantCode: codes.OK, - wantMsg: "Hello John!", - wantCounter: 1, - }, - "invalid": { - req: &examples.HelloRequest{}, - wantErr: true, - wantCode: codes.InvalidArgument, - wantMsg: "first name cannot be empty", - wantCounter: 1, - }, - } - - for n, tt := range tt { - t.Run(n, func(t *testing.T) { - t.Cleanup(func() { exp.Reset() }) - - res, err := client.SayHello(ctx, tt.req) - if tt.wantErr { - require.Nil(t, res) - require.Error(t, err) - - rpcStatus, ok := status.FromError(err) - require.True(t, ok) - require.Equal(t, tt.wantCode, rpcStatus.Code()) - require.Equal(t, tt.wantMsg, rpcStatus.Message()) - } else { - require.NoError(t, err) - require.NotNil(t, res) - require.Equal(t, tt.wantMsg, res.GetMessage()) - } - - require.NoError(t, tracePublisher.ForceFlush(context.Background())) - - snaps := exp.GetSpans().Snapshots() - - assert.Len(t, snaps, 1) - assert.Equal(t, "examples.Greeter/SayHello", snaps[0].Name()) - assert.Equal(t, attribute.String("rpc.service", "examples.Greeter"), snaps[0].Attributes()[0]) - assert.Equal(t, attribute.String("rpc.method", "SayHello"), snaps[0].Attributes()[1]) - assert.Equal(t, attribute.String("rpc.system", "grpc"), snaps[0].Attributes()[2]) - assert.Equal(t, attribute.Int64("rpc.grpc.status_code", int64(tt.wantCode)), snaps[0].Attributes()[3]) - - // Metrics - _ = assertCollectMetrics(4) - }) - } -} diff --git a/client/grpc/integration_test.go b/client/grpc/integration_test.go new file mode 100644 index 000000000..922a80edc --- /dev/null +++ b/client/grpc/integration_test.go @@ -0,0 +1,145 @@ +//go:build integration + +package grpc + +import ( + "context" + "fmt" + "log" + "net" + "testing" + + "github.com/beatlabs/patron/examples" + "github.com/beatlabs/patron/internal/test" + "github.com/beatlabs/patron/observability/trace" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" + "google.golang.org/grpc/test/bufconn" +) + +type server struct { + examples.UnimplementedGreeterServer +} + +func (s *server) SayHelloStream(_ *examples.HelloRequest, _ examples.Greeter_SayHelloStreamServer) error { + return status.Error(codes.Unavailable, "streaming not supported") +} + +func (s *server) SayHello(_ context.Context, req *examples.HelloRequest) (*examples.HelloReply, error) { + if req.GetFirstname() == "" { + return nil, status.Error(codes.InvalidArgument, "first name cannot be empty") + } + return &examples.HelloReply{Message: fmt.Sprintf("Hello %s!", req.GetFirstname())}, nil +} + +func TestSayHello(t *testing.T) { + ctx := context.Background() + + client, closer := testServer() + defer closer() + + // Tracing setup + exp := tracetest.NewInMemoryExporter() + tracePublisher := trace.Setup("test", nil, exp) + + // Metrics monitoring set up + shutdownProvider, assertCollectMetrics := test.SetupMetrics(ctx, t) + defer shutdownProvider() + + tt := map[string]struct { + req *examples.HelloRequest + wantErr bool + wantCode codes.Code + wantMsg string + wantCounter int + }{ + "ok": { + req: &examples.HelloRequest{Firstname: "John"}, + wantErr: false, + wantCode: codes.OK, + wantMsg: "Hello John!", + wantCounter: 1, + }, + "invalid": { + req: &examples.HelloRequest{}, + wantErr: true, + wantCode: codes.InvalidArgument, + wantMsg: "first name cannot be empty", + wantCounter: 1, + }, + } + + for n, tt := range tt { + t.Run(n, func(t *testing.T) { + t.Cleanup(func() { exp.Reset() }) + + res, err := client.SayHello(ctx, tt.req) + if tt.wantErr { + require.Nil(t, res) + require.Error(t, err) + + rpcStatus, ok := status.FromError(err) + require.True(t, ok) + require.Equal(t, tt.wantCode, rpcStatus.Code()) + require.Equal(t, tt.wantMsg, rpcStatus.Message()) + } else { + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, tt.wantMsg, res.GetMessage()) + } + + require.NoError(t, tracePublisher.ForceFlush(context.Background())) + + spans := exp.GetSpans().Snapshots() + + assert.Len(t, spans, 1) + assert.Equal(t, "examples.Greeter/SayHello", spans[0].Name()) + assert.Equal(t, attribute.String("rpc.service", "examples.Greeter"), spans[0].Attributes()[0]) + assert.Equal(t, attribute.String("rpc.method", "SayHello"), spans[0].Attributes()[1]) + assert.Equal(t, attribute.String("rpc.system", "grpc"), spans[0].Attributes()[2]) + assert.Equal(t, attribute.Int64("rpc.grpc.status_code", int64(tt.wantCode)), spans[0].Attributes()[3]) + + // Metrics + _ = assertCollectMetrics(4) + }) + } +} + +func testServer() (examples.GreeterClient, func()) { + buffer := 101024 * 1024 + lis := bufconn.Listen(buffer) + + baseServer := grpc.NewServer() + examples.RegisterGreeterServer(baseServer, &server{}) + go func() { + if err := baseServer.Serve(lis); err != nil { + log.Printf("error serving server: %v", err) + } + }() + + conn, err := NewClient("123", + grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { + return lis.Dial() + }), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + log.Printf("error connecting to server: %v", err) + } + + closer := func() { + err := lis.Close() + if err != nil { + log.Printf("error closing listener: %v", err) + } + baseServer.Stop() + } + + client := examples.NewGreeterClient(conn) + + return client, closer +} diff --git a/client/sns/integration_test.go b/client/sns/integration_test.go index 553a5752d..802269259 100644 --- a/client/sns/integration_test.go +++ b/client/sns/integration_test.go @@ -22,26 +22,17 @@ func TestNewFromConfig(t *testing.T) { awsRegion := "eu-west-1" - customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, _ ...interface{}) (aws.Endpoint, error) { - if service == sns.ServiceID && region == awsRegion { - return aws.Endpoint{ - URL: "http://localhost:4566", - SigningRegion: awsRegion, - }, nil - } - // returning EndpointNotFoundError will allow the service to fallback to it's default resolution - return aws.Endpoint{}, &aws.EndpointNotFoundError{} - }) - cfg, err := config.LoadDefaultConfig(context.Background(), config.WithRegion(awsRegion), - config.WithEndpointResolverWithOptions(customResolver), config.WithCredentialsProvider(aws.NewCredentialsCache( credentials.NewStaticCredentialsProvider("test", "test", "token"))), ) require.NoError(t, err) - client := NewFromConfig(cfg) + client := NewFromConfig(cfg, func(o *sns.Options) { + o.Region = awsRegion + o.BaseEndpoint = aws.String("http://localhost:4566") + }) // Add your assertions here to test the behavior of the client diff --git a/client/sns/sns.go b/client/sns/sns.go index 41cf1bc26..94443c474 100644 --- a/client/sns/sns.go +++ b/client/sns/sns.go @@ -7,7 +7,7 @@ import ( ) // NewFromConfig creates a new SNS client from aws.Config with OpenTelemetry instrumentation enabled. -func NewFromConfig(cfg aws.Config) *sns.Client { +func NewFromConfig(cfg aws.Config, optFns ...func(*sns.Options)) *sns.Client { otelaws.AppendMiddlewares(&cfg.APIOptions) - return sns.NewFromConfig(cfg) + return sns.NewFromConfig(cfg, optFns...) } diff --git a/client/sqs/integration_test.go b/client/sqs/integration_test.go index cf4285a15..411665166 100644 --- a/client/sqs/integration_test.go +++ b/client/sqs/integration_test.go @@ -22,26 +22,17 @@ func TestNewFromConfig(t *testing.T) { awsRegion := "eu-west-1" - customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, _ ...interface{}) (aws.Endpoint, error) { - if service == sqs.ServiceID && region == awsRegion { - return aws.Endpoint{ - URL: "http://localhost:4566", - SigningRegion: awsRegion, - }, nil - } - // returning EndpointNotFoundError will allow the service to fallback to it's default resolution - return aws.Endpoint{}, &aws.EndpointNotFoundError{} - }) - cfg, err := config.LoadDefaultConfig(context.Background(), config.WithRegion(awsRegion), - config.WithEndpointResolverWithOptions(customResolver), config.WithCredentialsProvider(aws.NewCredentialsCache( credentials.NewStaticCredentialsProvider("test", "test", "token"))), ) require.NoError(t, err) - client := NewFromConfig(cfg) + client := NewFromConfig(cfg, func(o *sqs.Options) { + o.BaseEndpoint = aws.String("http://localhost:4566") + o.Region = awsRegion + }) // Add your assertions here to test the behavior of the client diff --git a/client/sqs/sqs.go b/client/sqs/sqs.go index f4cbeb071..38b9b9917 100644 --- a/client/sqs/sqs.go +++ b/client/sqs/sqs.go @@ -7,7 +7,7 @@ import ( ) // NewFromConfig creates a new SQS client from aws.Config with OpenTelemetry instrumentation enabled. -func NewFromConfig(cfg aws.Config) *sqs.Client { +func NewFromConfig(cfg aws.Config, optFns ...func(*sqs.Options)) *sqs.Client { otelaws.AppendMiddlewares(&cfg.APIOptions) - return sqs.NewFromConfig(cfg) + return sqs.NewFromConfig(cfg, optFns...) } diff --git a/component/grpc/component_test.go b/component/grpc/component_test.go index f1356045b..096ce5d09 100644 --- a/component/grpc/component_test.go +++ b/component/grpc/component_test.go @@ -77,8 +77,7 @@ func TestComponent_Run_Unary(t *testing.T) { assert.NoError(t, cmp.Run(ctx)) chDone <- struct{}{} }() - conn, err := grpc.DialContext(ctx, "localhost:60000", grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithBlock()) + conn, err := grpc.NewClient("localhost:60000", grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) c := examples.NewGreeterClient(conn) @@ -162,8 +161,8 @@ func TestComponent_Run_Stream(t *testing.T) { assert.NoError(t, cmp.Run(ctx)) chDone <- struct{}{} }() - conn, err := grpc.DialContext(ctx, "localhost:60000", grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithBlock()) + + conn, err := grpc.NewClient("localhost:60000", grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) c := examples.NewGreeterClient(conn) diff --git a/component/sqs/integration_test.go b/component/sqs/integration_test.go index 1a56cce03..ef883560f 100644 --- a/component/sqs/integration_test.go +++ b/component/sqs/integration_test.go @@ -143,36 +143,18 @@ type SQSAPI interface { } func createSQSAPI(region, endpoint string) (*sqs.Client, error) { - cfg, err := createConfig(sqs.ServiceID, region, endpoint) - if err != nil { - return nil, err - } - - api := sqs.NewFromConfig(cfg) - - return api, nil -} - -func createConfig(awsServiceID, awsRegion, awsEndpoint string) (aws.Config, error) { - customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, _ ...interface{}) (aws.Endpoint, error) { - if service == awsServiceID && region == awsRegion { - return aws.Endpoint{ - URL: awsEndpoint, - SigningRegion: awsRegion, - }, nil - } - // returning EndpointNotFoundError will allow the service to fallback to it's default resolution - return aws.Endpoint{}, &aws.EndpointNotFoundError{} - }) - - cfg, err := awsConfig.LoadDefaultConfig(context.TODO(), - awsConfig.WithRegion(awsRegion), - awsConfig.WithEndpointResolverWithOptions(customResolver), + cfg, err := awsConfig.LoadDefaultConfig(context.Background(), + awsConfig.WithRegion(region), awsConfig.WithCredentialsProvider(aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider("test", "test", ""))), ) if err != nil { - return aws.Config{}, fmt.Errorf("failed to create AWS config: %w", err) + return nil, fmt.Errorf("failed to create AWS config: %w", err) } - return cfg, nil + api := sqs.NewFromConfig(cfg, func(o *sqs.Options) { + o.BaseEndpoint = aws.String(endpoint) + o.Region = region + }) + + return api, nil } diff --git a/examples/client/main.go b/examples/client/main.go index 3223c7455..2176e1164 100644 --- a/examples/client/main.go +++ b/examples/client/main.go @@ -131,7 +131,7 @@ func sendHTTPRequest(ctx context.Context) error { } func sendGRPCRequest(ctx context.Context) error { - cc, err := patrongrpc.DialContext(ctx, examples.GRPCTarget, grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := patrongrpc.NewClient(examples.GRPCTarget, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return err }