From 0794bc0027a4ffb2c205ac28f6a18775ef2c0247 Mon Sep 17 00:00:00 2001 From: Rene Zbinden Date: Fri, 14 Jun 2024 07:59:51 +0200 Subject: [PATCH] it compiles again --- .golangci.yaml | 27 +-- go.mod | 3 +- go.sum | 5 +- internal/cmd/client/client.go | 55 +++-- internal/cmd/client/namespace.go | 18 +- internal/cmd/client/server.go | 13 +- internal/cmd/client/service.go | 32 +-- internal/cmd/client/token.go | 13 +- internal/cmd/server/server.go | 6 +- internal/server/api.go | 16 +- internal/server/server.go | 346 +++++++++++++++---------------- 11 files changed, 277 insertions(+), 257 deletions(-) diff --git a/.golangci.yaml b/.golangci.yaml index 0ee8f35..3b4cd0f 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -1,14 +1,11 @@ --- run: tests: false - skip-dirs: - - .github - - build - - web - - .go linters-settings: + dupl: + threshold: 100 funlen: - lines: 105 + lines: 100 statements: 50 goconst: min-len: 2 @@ -21,20 +18,16 @@ linters-settings: - performance - style disabled-checks: - - whyNoLint - hugeParam gocyclo: - min-complexity: 16 + min-complexity: 15 revive: - min-confidence: 0.8 - govet: - check-shadowing: true + confidence: 0.8 lll: line-length: 140 misspell: locale: US nolintlint: - allow-leading-space: false require-explanation: true allow-no-explanation: - gocognit @@ -45,8 +38,8 @@ linters: disable-all: true enable: - bodyclose - - depguard - dogsled + - dupl - errcheck - funlen - nolintlint @@ -67,9 +60,12 @@ linters: - misspell - nakedret - prealloc + - protogetter - rowserrcheck - exportloopref - staticcheck + - stylecheck + - sqlclosecheck - typecheck - unconvert - unparam @@ -86,3 +82,8 @@ issues: - 'shadow: declaration of "err" shadows declaration.*' max-same-issues: 0 exclude-use-default: false + exclude-dirs: + - .github + - build + - web + - .go diff --git a/go.mod b/go.mod index 0ce3db3..8e531c0 100644 --- a/go.mod +++ b/go.mod @@ -10,8 +10,7 @@ require ( github.com/golang/protobuf v1.5.2 github.com/google/renameio v1.0.1 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 - github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 - github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.0 + github.com/hashicorp/go-cleanhttp v0.5.1 github.com/pkg/errors v0.9.1 github.com/postfinance/flash v0.5.0 github.com/postfinance/profiler v0.1.1 diff --git a/go.sum b/go.sum index e85f123..60fca0c 100644 --- a/go.sum +++ b/go.sum @@ -149,8 +149,6 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69 github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg= github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ= -github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -225,11 +223,10 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgf github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.0 h1:1JYBfzqrWPcCclBwxFCPAou9n+q86mfnu7NAeHfte7A= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.0/go.mod h1:YDZoGHuwE+ov0c8smSH49WLF3F2LaWnYYuDVd+EWrc0= github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q= github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM= github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= diff --git a/internal/cmd/client/client.go b/internal/cmd/client/client.go index e6e5129..92fb3a7 100644 --- a/internal/cmd/client/client.go +++ b/internal/cmd/client/client.go @@ -3,15 +3,18 @@ package client import ( "context" + "crypto/tls" "crypto/x509" + "net/http" "os" "path/filepath" "time" "github.com/alecthomas/kong" + "github.com/hashicorp/go-cleanhttp" "github.com/pkg/errors" "github.com/postfinance/discovery/internal/auth" - discoveryv1 "github.com/postfinance/discovery/pkg/discoverypb/postfinance/discovery/v1" + discoveryv1connect "github.com/postfinance/discovery/pkg/discoverypb/postfinance/discovery/v1/discoveryv1connect" "github.com/zbindenren/king" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -32,7 +35,7 @@ type CLI struct { // Globals are the global client flags. type Globals struct { - Address string `short:"a" help:"The address of the discovery grpc endpoint." default:"localhost:3001"` + Address string `short:"a" help:"The address of the discovery grpc endpoint." default:"http://localhost:3001"` Timeout time.Duration `help:"The request timeout" default:"15s"` Debug bool `short:"d" help:"Log debug output."` Insecure bool `help:"use insecure connection without tls." xor:"tls"` @@ -83,40 +86,40 @@ func (g Globals) conn() (*grpc.ClientConn, error) { return grpc.Dial(g.Address, dialOpts...) } -func (g Globals) serverClient() (discoveryv1.ServerAPIClient, error) { - conn, err := g.conn() +func (g Globals) serverClient() (discoveryv1connect.ServerAPIClient, error) { + c, err := g.httpClient() if err != nil { return nil, err } - return discoveryv1.NewServerAPIClient(conn), nil + return discoveryv1connect.NewServerAPIClient(c, g.Address), nil } -func (g Globals) serviceClient() (discoveryv1.ServiceAPIClient, error) { - conn, err := g.conn() +func (g Globals) serviceClient() (discoveryv1connect.ServiceAPIClient, error) { + c, err := g.httpClient() if err != nil { return nil, err } - return discoveryv1.NewServiceAPIClient(conn), nil + return discoveryv1connect.NewServiceAPIClient(c, g.Address), nil } -func (g Globals) namespaceClient() (discoveryv1.NamespaceAPIClient, error) { - conn, err := g.conn() +func (g Globals) namespaceClient() (discoveryv1connect.NamespaceAPIClient, error) { + c, err := g.httpClient() if err != nil { return nil, err } - return discoveryv1.NewNamespaceAPIClient(conn), nil + return discoveryv1connect.NewNamespaceAPIClient(c, g.Address), nil } -func (g Globals) tokenClient() (discoveryv1.TokenAPIClient, error) { - conn, err := g.conn() +func (g Globals) tokenClient() (discoveryv1connect.TokenAPIClient, error) { + c, err := g.httpClient() if err != nil { return nil, err } - return discoveryv1.NewTokenAPIClient(conn), nil + return discoveryv1connect.NewTokenAPIClient(c, g.Address), nil } func buildClientInterceptor(token string) func(context.Context, string, interface{}, interface{}, *grpc.ClientConn, grpc.UnaryInvoker, ...grpc.CallOption) error { @@ -211,3 +214,27 @@ func (g Globals) getToken() (string, error) { return token, nil } + +func (g Globals) httpClient() (*http.Client, error) { + clnt := cleanhttp.DefaultClient() + clnt.Timeout = g.Timeout + + tlsConfig := &tls.Config{ + InsecureSkipVerify: g.Insecure, //nolint:gosec // configurable + } + + if !g.Insecure { + pool, err := x509.SystemCertPool() + if err != nil { + return nil, err + } + + tlsConfig.RootCAs = pool + } + + clnt.Transport = &http.Transport{ + TLSClientConfig: tlsConfig, + } + + return clnt, nil +} diff --git a/internal/cmd/client/namespace.go b/internal/cmd/client/namespace.go index 7205813..e3e7ec6 100644 --- a/internal/cmd/client/namespace.go +++ b/internal/cmd/client/namespace.go @@ -4,6 +4,7 @@ import ( "errors" "os" + "connectrpc.com/connect" "github.com/alecthomas/kong" "github.com/postfinance/discovery" "github.com/postfinance/discovery/internal/server/convert" @@ -33,12 +34,12 @@ func (n namespaceList) Run(g *Globals, l *zap.SugaredLogger, c *kong.Context) er ctx, cancel := g.ctx() defer cancel() - r, err := cli.ListNamespace(ctx, &discoveryv1.ListNamespaceRequest{}) + r, err := cli.ListNamespace(ctx, connect.NewRequest(&discoveryv1.ListNamespaceRequest{})) if err != nil { return err } - namespaces := convert.NamespacesFromPB(r.GetNamespaces()) + namespaces := convert.NamespacesFromPB(r.Msg.GetNamespaces()) sw := sfmt.SliceWriter{ Writer: os.Stdout, @@ -78,10 +79,11 @@ func (n namespaceRegister) Run(g *Globals, l *zap.SugaredLogger, c *kong.Context return errors.New("unsupported export configuration") } - _, err = cli.RegisterNamespace(ctx, &discoveryv1.RegisterNamespaceRequest{ - Name: n.Name, - Export: int32(e), - }) + _, err = cli.RegisterNamespace(ctx, connect.NewRequest( + &discoveryv1.RegisterNamespaceRequest{ + Name: n.Name, + Export: int32(e), + })) return err } @@ -99,9 +101,9 @@ func (n namespaceUnRegister) Run(g *Globals, l *zap.SugaredLogger, c *kong.Conte ctx, cancel := g.ctx() defer cancel() - _, err = cli.UnregisterNamespace(ctx, &discoveryv1.UnregisterNamespaceRequest{ + _, err = cli.UnregisterNamespace(ctx, connect.NewRequest(&discoveryv1.UnregisterNamespaceRequest{ Name: n.Name, - }) + })) return err } diff --git a/internal/cmd/client/server.go b/internal/cmd/client/server.go index 3b21261..439243f 100644 --- a/internal/cmd/client/server.go +++ b/internal/cmd/client/server.go @@ -5,6 +5,7 @@ import ( "os" "time" + "connectrpc.com/connect" "github.com/alecthomas/kong" "github.com/postfinance/discovery/internal/server/convert" discoveryv1 "github.com/postfinance/discovery/pkg/discoverypb/postfinance/discovery/v1" @@ -35,12 +36,12 @@ func (s serverList) Run(g *Globals, l *zap.SugaredLogger, c *kong.Context) error ctx, cancel := g.ctx() defer cancel() - r, err := cli.ListServer(ctx, &discoveryv1.ListServerRequest{}) + r, err := cli.ListServer(ctx, connect.NewRequest(&discoveryv1.ListServerRequest{})) if err != nil { return err } - servers := convert.ServersFromPB(r.GetServers()) + servers := convert.ServersFromPB(r.Msg.GetServers()) sw := sfmt.SliceWriter{ Writer: os.Stdout, @@ -69,10 +70,10 @@ func (s serverRegister) Run(g *Globals, l *zap.SugaredLogger, c *kong.Context) e ctx, cancel := context.WithTimeout(context.Background(), registerTimout) defer cancel() - _, err = cli.RegisterServer(ctx, &discoveryv1.RegisterServerRequest{ + _, err = cli.RegisterServer(ctx, connect.NewRequest(&discoveryv1.RegisterServerRequest{ Name: s.Name, Labels: s.Labels, - }) + })) return err } @@ -92,9 +93,9 @@ func (s serverUnRegister) Run(g *Globals, l *zap.SugaredLogger, c *kong.Context) ctx, cancel := context.WithTimeout(context.Background(), registerTimout) defer cancel() - _, err = cli.UnregisterServer(ctx, &discoveryv1.UnregisterServerRequest{ + _, err = cli.UnregisterServer(ctx, connect.NewRequest(&discoveryv1.UnregisterServerRequest{ Name: s.Name, - }) + })) return err } diff --git a/internal/cmd/client/service.go b/internal/cmd/client/service.go index 94bd5af..29893d1 100644 --- a/internal/cmd/client/service.go +++ b/internal/cmd/client/service.go @@ -8,10 +8,12 @@ import ( "regexp" "time" + "connectrpc.com/connect" "github.com/alecthomas/kong" "github.com/postfinance/discovery" "github.com/postfinance/discovery/internal/server/convert" discoveryv1 "github.com/postfinance/discovery/pkg/discoverypb/postfinance/discovery/v1" + discoveryv1connect "github.com/postfinance/discovery/pkg/discoverypb/postfinance/discovery/v1/discoveryv1connect" "github.com/sethvargo/go-retry" "github.com/zbindenren/sfmt" "go.uber.org/zap" @@ -49,14 +51,14 @@ func (s serviceList) Run(g *Globals, l *zap.SugaredLogger, c *kong.Context) erro ctx, cancel := g.ctx() defer cancel() - r, err := cli.ListService(ctx, &discoveryv1.ListServiceRequest{ + r, err := cli.ListService(ctx, connect.NewRequest(&discoveryv1.ListServiceRequest{ Namespace: s.Namespace, - }) + })) if err != nil { return err } - services := convert.ServicesFromPB(r.GetServices()) + services := convert.ServicesFromPB(r.Msg.GetServices()) filters, err := s.serviceFilter.filters() if err != nil { @@ -126,19 +128,19 @@ func (s serviceRegister) Run(g *Globals, l *zap.SugaredLogger, c *kong.Context) id := "" if err := retry.Do(ctx, b, func(ctx context.Context) error { - r, err := cli.RegisterService(ctx, &discoveryv1.RegisterServiceRequest{ + r, err := cli.RegisterService(ctx, connect.NewRequest(&discoveryv1.RegisterServiceRequest{ Name: s.Name, Namespace: s.Namespace, Endpoint: ep, Labels: lbls, Selector: s.Selector, - }) + })) if err != nil { l.Errorw("retry register", "service", ep, "err", err) return retry.RetryableError(err) } - id = r.GetService().GetId() + id = r.Msg.GetService().GetId() return nil }); err != nil { @@ -194,10 +196,10 @@ func (s serviceUnRegister) Run(g *Globals, l *zap.SugaredLogger, c *kong.Context ctx, cancel := g.ctx() if err := retry.Do(ctx, b, func(ctx context.Context) error { - _, err := cli.UnRegisterService(ctx, &discoveryv1.UnRegisterServiceRequest{ + _, err := cli.UnRegisterService(ctx, connect.NewRequest(&discoveryv1.UnRegisterServiceRequest{ Namespace: s.Namespace, Id: ep, - }) + })) if err != nil { l.Errorw("retry unregister", "service", ep, "err", err) return retry.RetryableError(err) @@ -221,12 +223,12 @@ func (s serviceUnRegister) Run(g *Globals, l *zap.SugaredLogger, c *kong.Context return nil } -func (s serviceUnRegister) unRegisterUnresolved(g *Globals, l *zap.SugaredLogger, cli discoveryv1.ServiceAPIClient) error { +func (s serviceUnRegister) unRegisterUnresolved(g *Globals, l *zap.SugaredLogger, cli discoveryv1connect.ServiceAPIClient) error { var lastErr error ctx, cancel := g.ctx() - r, err := cli.ListService(ctx, &discoveryv1.ListServiceRequest{}) + r, err := cli.ListService(ctx, connect.NewRequest(&discoveryv1.ListServiceRequest{})) cancel() @@ -234,7 +236,7 @@ func (s serviceUnRegister) unRegisterUnresolved(g *Globals, l *zap.SugaredLogger return err } - services := convert.ServicesFromPB(r.GetServices()) + services := convert.ServicesFromPB(r.Msg.GetServices()) unresolved, err := services.UnResolved() if err != nil { @@ -258,10 +260,10 @@ func (s serviceUnRegister) unRegisterUnresolved(g *Globals, l *zap.SugaredLogger ctx, cancel := g.ctx() - _, err = cli.UnRegisterService(ctx, &discoveryv1.UnRegisterServiceRequest{ + _, err = cli.UnRegisterService(ctx, connect.NewRequest(&discoveryv1.UnRegisterServiceRequest{ Namespace: svc.Namespace, Id: svc.Endpoint.String(), - }) + })) cancel() @@ -360,14 +362,14 @@ func (s serviceImport) Run(g *Globals, l *zap.SugaredLogger, c *kong.Context) er s := services[j] l.Debugw("import serivce", s.KeyVals()...) - _, err := cli.RegisterService(ctx, &discoveryv1.RegisterServiceRequest{ + _, err := cli.RegisterService(ctx, connect.NewRequest(&discoveryv1.RegisterServiceRequest{ Name: s.Name, Endpoint: s.Endpoint.String(), Description: s.Description, Labels: s.Labels, Namespace: s.Namespace, Selector: s.Selector, - }) + })) if err != nil { failed = append(failed, s) msg := s.KeyVals() diff --git a/internal/cmd/client/token.go b/internal/cmd/client/token.go index 6ef3f63..ad5e10c 100644 --- a/internal/cmd/client/token.go +++ b/internal/cmd/client/token.go @@ -4,6 +4,7 @@ import ( "fmt" "time" + "connectrpc.com/connect" "github.com/alecthomas/kong" "github.com/postfinance/discovery/internal/server/convert" discoveryv1 "github.com/postfinance/discovery/pkg/discoverypb/postfinance/discovery/v1" @@ -30,16 +31,16 @@ func (t tokenCreate) Run(g *Globals, l *zap.SugaredLogger, c *kong.Context) erro ctx, cancel := g.ctx() defer cancel() - r, err := cli.Create(ctx, &discoveryv1.CreateRequest{ + r, err := cli.Create(ctx, connect.NewRequest(&discoveryv1.CreateRequest{ Expires: t.Expiry.String(), Id: t.ID, Namespaces: t.Namespaces, - }) + })) if err != nil { return err } - fmt.Println(r.Token) + fmt.Println(r.Msg.GetToken()) return nil } @@ -57,14 +58,14 @@ func (t tokenInfo) Run(g *Globals, l *zap.SugaredLogger, c *kong.Context) error ctx, cancel := g.ctx() defer cancel() - r, err := cli.Info(ctx, &discoveryv1.InfoRequest{ + r, err := cli.Info(ctx, connect.NewRequest(&discoveryv1.InfoRequest{ Token: t.Token, - }) + })) if err != nil { return err } - i := r.GetTokeninfo() + i := r.Msg.GetTokeninfo() expiry := convert.TimeFromPB(i.GetExpiresAt()) expiryStr := expiry.Format(time.RFC3339) diff --git a/internal/cmd/server/server.go b/internal/cmd/server/server.go index 7189329..f00feba 100644 --- a/internal/cmd/server/server.go +++ b/internal/cmd/server/server.go @@ -26,8 +26,7 @@ type CLI struct { } type serverCmd struct { - GRPCListen string `short:"l" help:"GRPC gateway listen adddress" default:"localhost:3001"` - HTTPListen string `help:"HTTP listen adddress" default:"localhost:3002"` + ListenAddr string `help:"HTTP listen adddress" default:":3001"` Replicas int `help:"The number of service replicas." default:"1"` TokenIssuer string `help:"The jwt token issuer name. If you change this, alle issued tokens are invalid." default:"discovery.postfinance.ch"` TokenSecret string `help:"The secret key to issue jwt machine tokens. If you change this, alle issued tokens are invalid." required:"true"` @@ -90,8 +89,7 @@ func (s serverCmd) config(registry prometheus.Registerer) (server.Config, error) return server.Config{ PrometheusRegistry: registry, NumReplicas: s.Replicas, - GRPCListenAddr: s.GRPCListen, - HTTPListenAddr: s.HTTPListen, + ListenAddr: s.ListenAddr, TokenIssuer: s.TokenIssuer, TokenSecretKey: s.TokenSecret, OIDCClient: s.OIDC.ClientID, diff --git a/internal/server/api.go b/internal/server/api.go index c36eeda..19618ef 100644 --- a/internal/server/api.go +++ b/internal/server/api.go @@ -49,7 +49,7 @@ func (a *API) Create(_ context.Context, req *connect.Request[discoveryv1.CreateR return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to create token: %w", err)) } - resp := connect.NewResponse[discoveryv1.CreateResponse](&discoveryv1.CreateResponse{ + resp := connect.NewResponse(&discoveryv1.CreateResponse{ Token: token, }) @@ -63,7 +63,7 @@ func (a *API) Info(_ context.Context, in *connect.Request[discoveryv1.InfoReques return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("token %s is not valid: %w", in.Msg.GetToken(), err)) } - resp := connect.NewResponse[discoveryv1.InfoResponse](&discoveryv1.InfoResponse{ + resp := connect.NewResponse(&discoveryv1.InfoResponse{ Tokeninfo: &discoveryv1.TokenInfo{ Id: u.Username, Namespaces: u.Namespaces, @@ -81,7 +81,7 @@ func (a *API) ListServer(_ context.Context, req *connect.Request[discoveryv1.Lis return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("could not list server: %w", err)) } - resp := connect.NewResponse[discoveryv1.ListServerResponse](&discoveryv1.ListServerResponse{ + resp := connect.NewResponse(&discoveryv1.ListServerResponse{ Servers: convert.ServersToPB(s), }) @@ -99,7 +99,7 @@ func (a *API) RegisterServer(_ context.Context, req *connect.Request[discoveryv1 return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("could not register server %s in store: %w", req.Msg.GetName(), err)) } - resp := connect.NewResponse[discoveryv1.RegisterServerResponse](&discoveryv1.RegisterServerResponse{ + resp := connect.NewResponse(&discoveryv1.RegisterServerResponse{ Server: convert.ServerToPB(s), }) @@ -117,7 +117,7 @@ func (a *API) UnregisterServer(_ context.Context, req *connect.Request[discovery return nil, connect.NewError(c, fmt.Errorf("could not unregister server %s in store: %w", req.Msg.GetName(), err)) } - resp := connect.NewResponse[discoveryv1.UnregisterServerResponse](&discoveryv1.UnregisterServerResponse{}) + resp := connect.NewResponse(&discoveryv1.UnregisterServerResponse{}) return resp, nil } @@ -129,7 +129,7 @@ func (a *API) ListNamespace(context.Context, *connect.Request[discoveryv1.ListNa return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("could not list namespaces: %w", err)) } - resp := connect.NewResponse[discoveryv1.ListNamespaceResponse](&discoveryv1.ListNamespaceResponse{ + resp := connect.NewResponse(&discoveryv1.ListNamespaceResponse{ Namespaces: convert.NamespacesToPB(namespaces), }) @@ -151,7 +151,7 @@ func (a *API) RegisterNamespace(_ context.Context, req *connect.Request[discover return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("could not register namespace %s in store: %w", req.Msg.GetName(), err)) } - resp := connect.NewResponse[discoveryv1.RegisterNamespaceResponse](&discoveryv1.RegisterNamespaceResponse{ + resp := connect.NewResponse(&discoveryv1.RegisterNamespaceResponse{ Namespace: convert.NamespaceToPB(n), }) @@ -169,7 +169,7 @@ func (a *API) UnregisterNamespace(_ context.Context, req *connect.Request[discov return nil, connect.NewError(c, fmt.Errorf("could not unregister namespace %s in store: %w", req.Msg.GetName(), err)) } - resp := connect.NewResponse[discoveryv1.UnregisterNamespaceResponse](&discoveryv1.UnregisterNamespaceResponse{}) + resp := connect.NewResponse(&discoveryv1.UnregisterNamespaceResponse{}) return resp, nil } diff --git a/internal/server/server.go b/internal/server/server.go index 98ff806..171884b 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -4,33 +4,20 @@ package server import ( "context" "embed" - "fmt" - "net" + "log/slog" "net/http" - "runtime/debug" "sync" + "sync/atomic" "time" - grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" - grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth" - grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" - grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" - grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/postfinance/discovery" "github.com/postfinance/discovery/internal/auth" "github.com/postfinance/discovery/internal/registry" - discoveryv1 "github.com/postfinance/discovery/pkg/discoverypb/postfinance/discovery/v1" discoveryv1connect "github.com/postfinance/discovery/pkg/discoverypb/postfinance/discovery/v1/discoveryv1connect" "github.com/postfinance/store" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "go.uber.org/zap" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/reflection" - "google.golang.org/grpc/status" ) const ( @@ -46,11 +33,11 @@ var static embed.FS // Server represents the discovery server. type Server struct { + ready atomic.Bool wg *sync.WaitGroup backend store.Backend l *zap.SugaredLogger config Config - grpcServer *grpc.Server httpServer *http.Server } @@ -75,88 +62,74 @@ func New(backend store.Backend, l *zap.SugaredLogger, cfg Config) (*Server, erro l: l, wg: &sync.WaitGroup{}, config: cfg, + ready: atomic.Bool{}, } + s.ready.Store(false) + return &s, nil } // Run starts the server and runs until context is canceled. func (s *Server) Run(ctx context.Context) error { - s.wg.Add(2) + var ( + wg = new(sync.WaitGroup) + errC = make(chan error) + err error + ) - errChan := make(chan error) + wg.Add(1) go func() { - if err := s.startGRPC(ctx); err != nil { - errChan <- err - } - }() + defer wg.Done() + s.l.Info("starting http server") - go func() { - if err := s.startHTTP(ctx); err != nil { - errChan <- err - } + errC <- s.startRene(ctx) }() - for { - select { - case err := <-errChan: - return err - case <-ctx.Done(): - err := s.stop() - s.wg.Wait() - - return err - } + select { + case <-ctx.Done(): + case e := <-errC: + err = e } -} -func (s *Server) startGRPC(ctx context.Context) error { - s.l.Infow("starting grpc server") + s.stop() + wg.Wait() - grpcMetrics := grpc_prometheus.NewServerMetrics() - grpcMetrics.EnableHandlingTimeHistogram() + return err +} - panicHandler := func(p interface{}) (err error) { - s.l.Errorw("panic ocured", "trace", string(debug.Stack())) - return status.Errorf(codes.Unknown, "%v", p) - } - opts := []grpc_recovery.Option{ - grpc_recovery.WithRecoveryHandler(panicHandler), +func (s *Server) createMux(api *API) *http.ServeMux { + mux := http.NewServeMux() + + r, ok := s.config.PrometheusRegistry.(prometheus.Gatherer) + if !ok { + panic("interface is not prometheus.Registry") } - tokenHandler := auth.NewTokenHandler(s.config.TokenIssuer, s.config.TokenSecretKey) + mux.HandleFunc("/ready", s.readyHandler()) + mux.Handle("/swagger/", http.FileServer(http.FS(static))) + mux.Handle("/metrics", promhttp.HandlerFor(r, promhttp.HandlerOpts{})) - verifier, err := auth.NewVerifier(s.config.OIDCURL, s.config.OIDCClient, httpClientTimeout, s.config.Transport) - if err != nil { - return err - } + httpPath, handler := discoveryv1connect.NewServerAPIHandler(api) + mux.Handle(httpPath, handler) + httpPath, handler = discoveryv1connect.NewNamespaceAPIHandler(api) + mux.Handle(httpPath, handler) + httpPath, handler = discoveryv1connect.NewTokenAPIHandler(api) + mux.Handle(httpPath, handler) - // Make sure that log statements internal to gRPC library are logged using the zapLogger as well. - // grpc_zap.ReplaceGrpcLoggerV2(s.l.Desugar()) - - s.grpcServer = grpc.NewServer( - grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( - grpc_recovery.StreamServerInterceptor(opts...), - grpcMetrics.StreamServerInterceptor(), - auth.StreamMethodNameInterceptor(), - grpc_auth.StreamServerInterceptor(auth.Func(verifier, tokenHandler, s.l.Named("auth"), s.config.ClaimConfig)), - auth.StreamAuthorizeInterceptor(s.config.OIDCRoles...), - grpc_zap.StreamServerInterceptor(s.l.Desugar(), grpc_zap.WithLevels(customCodeToLevel)), - )), - grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( - grpc_recovery.UnaryServerInterceptor(opts...), - grpcMetrics.UnaryServerInterceptor(), - auth.UnaryMethodNameInterceptor(), - grpc_auth.UnaryServerInterceptor(auth.Func(verifier, tokenHandler, s.l.Named("auth"), s.config.ClaimConfig)), - auth.UnaryAuthorizeInterceptor(s.config.OIDCRoles...), - grpc_zap.UnaryServerInterceptor(s.l.Desugar(), grpc_zap.WithLevels(customCodeToLevel)), - )), - ) + return mux +} - if err := s.config.PrometheusRegistry.Register(grpcMetrics); err != nil { - return err - } +func (s *Server) startRene(ctx context.Context) error { + s.l.Infow("starting http server") + + tokenHandler := auth.NewTokenHandler(s.config.TokenIssuer, s.config.TokenSecretKey) + + // verifier, err := auth.NewVerifier(s.config.OIDCURL, s.config.OIDCClient, httpClientTimeout, s.config.Transport) + // if err != nil { + // return err + // } if err := s.config.PrometheusRegistry.Register(prometheus.NewGaugeFunc( prometheus.GaugeOpts{ @@ -196,61 +169,16 @@ func (s *Server) startGRPC(ctx context.Context) error { tokenHandler: tokenHandler, } - // discoveryv1.RegisterServerAPIServer(s.grpcServer, a) - // discoveryv1.RegisterServiceAPIServer(s.grpcServer, a) - // discoveryv1.RegisterNamespaceAPIServer(s.grpcServer, a) - // discoveryv1.RegisterTokenAPIServer(s.grpcServer, a) - - // grpc reflection support - reflection.Register(s.grpcServer) - - listener, err := net.Listen("tcp", s.config.GRPCListenAddr) - if err != nil { - return err - } - - return s.grpcServer.Serve(listener) -} - -func (s *Server) startHTTP(ctx context.Context) error { - s.l.Infow("starting http server") - - ep, err := getGRPCEndpointFromListenAddr(s.config.GRPCListenAddr) - if err != nil { - return err - } - - gwmux := runtime.NewServeMux() - - discoveryv1connect.NewNamespaceAPIHandler() - - err = discoveryv1.RegisterServiceAPIHandlerFromEndpoint(ctx, gwmux, ep, []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}) - if err != nil { - return err - } - - err = discoveryv1.RegisterServerAPIHandlerFromEndpoint(ctx, gwmux, ep, []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}) - if err != nil { - return err - } - - mux := http.NewServeMux() - - r, ok := s.config.PrometheusRegistry.(prometheus.Gatherer) - if !ok { - panic("interface is not prometheus.Registry") - } - - mux.Handle("/swagger/", http.FileServer(http.FS(static))) - mux.Handle("/metrics", promhttp.HandlerFor(r, promhttp.HandlerOpts{})) - mux.Handle("/", gwmux) + mux := s.createMux(a) s.httpServer = &http.Server{ - Addr: s.config.HTTPListenAddr, + Addr: s.config.ListenAddr, Handler: mux, ReadTimeout: httpClientTimeout, } + s.ready.Store(true) + if err := s.httpServer.ListenAndServe(); err != http.ErrServerClosed { return err } @@ -258,64 +186,128 @@ func (s *Server) startHTTP(ctx context.Context) error { return nil } -func (s *Server) stop() error { - errChan := make(chan error) - - go func() { - defer close(errChan) - - if err := s.stopHTTP(); err != nil { - errChan <- err - } - }() - - go func() { - s.stopGRPC() - }() - - return <-errChan -} - -func (s *Server) stopGRPC() { - defer func() { - if s.wg != nil { - s.wg.Done() - } - }() - defer s.l.Infow("grpc server stopped") - - if s.grpcServer == nil { - return - } - - s.grpcServer.GracefulStop() -} - -func (s *Server) stopHTTP() error { - defer s.wg.Done() - defer s.l.Info("http server stopped") +// func (s *Server) startGRPC(ctx context.Context) error { +// s.l.Infow("starting grpc server") +// +// grpcMetrics := grpc_prometheus.NewServerMetrics() +// grpcMetrics.EnableHandlingTimeHistogram() +// +// panicHandler := func(p interface{}) (err error) { +// s.l.Errorw("panic ocured", "trace", string(debug.Stack())) +// return status.Errorf(codes.Unknown, "%v", p) +// } +// opts := []grpc_recovery.Option{ +// grpc_recovery.WithRecoveryHandler(panicHandler), +// } +// +// tokenHandler := auth.NewTokenHandler(s.config.TokenIssuer, s.config.TokenSecretKey) +// +// verifier, err := auth.NewVerifier(s.config.OIDCURL, s.config.OIDCClient, httpClientTimeout, s.config.Transport) +// if err != nil { +// return err +// } +// +// // Make sure that log statements internal to gRPC library are logged using the zapLogger as well. +// // grpc_zap.ReplaceGrpcLoggerV2(s.l.Desugar()) +// +// s.grpcServer = grpc.NewServer( +// grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( +// grpc_recovery.StreamServerInterceptor(opts...), +// grpcMetrics.StreamServerInterceptor(), +// auth.StreamMethodNameInterceptor(), +// grpc_auth.StreamServerInterceptor(auth.Func(verifier, tokenHandler, s.l.Named("auth"), s.config.ClaimConfig)), +// auth.StreamAuthorizeInterceptor(s.config.OIDCRoles...), +// grpc_zap.StreamServerInterceptor(s.l.Desugar(), grpc_zap.WithLevels(customCodeToLevel)), +// )), +// grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( +// grpc_recovery.UnaryServerInterceptor(opts...), +// grpcMetrics.UnaryServerInterceptor(), +// auth.UnaryMethodNameInterceptor(), +// grpc_auth.UnaryServerInterceptor(auth.Func(verifier, tokenHandler, s.l.Named("auth"), s.config.ClaimConfig)), +// auth.UnaryAuthorizeInterceptor(s.config.OIDCRoles...), +// grpc_zap.UnaryServerInterceptor(s.l.Desugar(), grpc_zap.WithLevels(customCodeToLevel)), +// )), +// ) +// +// if err := s.config.PrometheusRegistry.Register(grpcMetrics); err != nil { +// return err +// } +// +// if err := s.config.PrometheusRegistry.Register(prometheus.NewGaugeFunc( +// prometheus.GaugeOpts{ +// Name: "discovery_replication_factor", +// Help: "A metric with with constant value showing the configured replication factor.", +// }, +// func() float64 { return float64(s.config.NumReplicas) }, +// )); err != nil { +// return err +// } +// +// r, err := registry.New(s.backend, s.config.PrometheusRegistry, s.l, s.config.NumReplicas) +// if err != nil { +// return err +// } +// +// go r.StartCacheUpdater(ctx, cacheSyncInterval) +// go r.StartServiceCounterUpdater(ctx, serviceCounterUpdateInterval) +// +// ns, err := r.ListNamespaces() +// if err != nil { +// return err +// } +// +// dflt := discovery.DefaultNamespace() +// +// if ns.Index(dflt.Name) < 0 { +// s.l.Infow("creating default namespace", "name", dflt.Name) +// +// if _, err := r.RegisterNamespace(*dflt); err != nil { +// return err +// } +// } +// +// a := &API{ +// r: r, +// tokenHandler: tokenHandler, +// } +// +// // discoveryv1.RegisterServerAPIServer(s.grpcServer, a) +// // discoveryv1.RegisterServiceAPIServer(s.grpcServer, a) +// // discoveryv1.RegisterNamespaceAPIServer(s.grpcServer, a) +// // discoveryv1.RegisterTokenAPIServer(s.grpcServer, a) +// +// // grpc reflection support +// reflection.Register(s.grpcServer) +// +// listener, err := net.Listen("tcp", s.config.GRPCListenAddr) +// if err != nil { +// return err +// } +// +// return s.grpcServer.Serve(listener) +// } + +func (s *Server) stop() { + s.ready.Swap(false) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() - if s.httpServer == nil { - return nil + if err := s.httpServer.Shutdown(ctx); err != nil { + s.l.Error("shutdown connect server", slog.Any("err", err)) } - ctx, cancel := context.WithTimeout(context.Background(), httpStopTimeout) - defer cancel() - - return s.httpServer.Shutdown(ctx) + s.l.Info("http server stopped") } -func getGRPCEndpointFromListenAddr(grpcep string) (string, error) { - tcpaddr, err := net.ResolveTCPAddr("tcp", grpcep) - if err != nil { - return "", fmt.Errorf("%s is not a valid listen address: %w", grpcep, err) - } +func (s *Server) readyHandler() func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, _ *http.Request) { + if s.ready.Load() { + w.WriteHeader(http.StatusNoContent) - host := "localhost" + return + } - if tcpaddr.IP != nil && !tcpaddr.IP.IsUnspecified() { - host = tcpaddr.IP.String() + w.WriteHeader(http.StatusServiceUnavailable) } - - return net.JoinHostPort(host, fmt.Sprintf("%d", tcpaddr.Port)), nil }