-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP][collector] Switch to OTEL's http/grpc server #6277
base: main
Are you sure you want to change the base?
Changes from 4 commits
641ddf2
9bb39e2
4bf8eaa
31588ef
624bb69
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,7 +6,6 @@ package app | |
import ( | ||
"context" | ||
"fmt" | ||
"io" | ||
"net/http" | ||
"time" | ||
|
||
|
@@ -47,13 +46,10 @@ type Collector struct { | |
tenancyMgr *tenancy.Manager | ||
|
||
// state, read only | ||
hServer *http.Server | ||
grpcServer *grpc.Server | ||
otlpReceiver receiver.Traces | ||
zipkinReceiver receiver.Traces | ||
tlsGRPCCertWatcherCloser io.Closer | ||
tlsHTTPCertWatcherCloser io.Closer | ||
tlsZipkinCertWatcherCloser io.Closer | ||
hServer *http.Server | ||
grpcServer *grpc.Server | ||
otlpReceiver receiver.Traces | ||
zipkinReceiver receiver.Traces | ||
} | ||
|
||
// CollectorParams to construct a new Jaeger Collector. | ||
|
@@ -103,24 +99,24 @@ func (c *Collector) Start(options *flags.CollectorOptions) error { | |
c.spanHandlers = handlerBuilder.BuildHandlers(c.spanProcessor) | ||
|
||
grpcServer, err := server.StartGRPCServer(&server.GRPCServerParams{ | ||
HostPort: options.GRPC.HostPort, | ||
HostPort: options.GRPC.NetAddr.Endpoint, | ||
Handler: c.spanHandlers.GRPCHandler, | ||
TLSConfig: options.GRPC.TLS, | ||
TLSConfig: options.GRPC.TLSSetting, | ||
SamplingProvider: c.samplingProvider, | ||
Logger: c.logger, | ||
MaxReceiveMessageLength: options.GRPC.MaxReceiveMessageLength, | ||
MaxConnectionAge: options.GRPC.MaxConnectionAge, | ||
MaxConnectionAgeGrace: options.GRPC.MaxConnectionAgeGrace, | ||
MaxReceiveMessageLength: options.GRPC.MaxRecvMsgSizeMiB, | ||
MaxConnectionAge: options.GRPC.Keepalive.ServerParameters.MaxConnectionAge, | ||
MaxConnectionAgeGrace: options.GRPC.Keepalive.ServerParameters.MaxConnectionAgeGrace, | ||
}) | ||
if err != nil { | ||
return fmt.Errorf("could not start gRPC server: %w", err) | ||
} | ||
c.grpcServer = grpcServer | ||
|
||
httpServer, err := server.StartHTTPServer(&server.HTTPServerParams{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same as above |
||
HostPort: options.HTTP.HostPort, | ||
HostPort: options.HTTP.Endpoint, | ||
Handler: c.spanHandlers.JaegerBatchesHandler, | ||
TLSConfig: options.HTTP.TLS, | ||
TLSConfig: options.HTTP.TLSSetting, | ||
HealthCheck: c.hCheck, | ||
MetricsFactory: c.metricsFactory, | ||
SamplingProvider: c.samplingProvider, | ||
|
@@ -131,11 +127,7 @@ func (c *Collector) Start(options *flags.CollectorOptions) error { | |
} | ||
c.hServer = httpServer | ||
|
||
c.tlsGRPCCertWatcherCloser = &options.GRPC.TLS | ||
c.tlsHTTPCertWatcherCloser = &options.HTTP.TLS | ||
c.tlsZipkinCertWatcherCloser = &options.Zipkin.TLS | ||
|
||
if options.Zipkin.HTTPHostPort == "" { | ||
if options.Zipkin.Endpoint == "" { | ||
c.logger.Info("Not listening for Zipkin HTTP traffic, port not configured") | ||
} else { | ||
zipkinReceiver, err := handler.StartZipkinReceiver(options, c.logger, c.spanProcessor, c.tenancyMgr) | ||
|
@@ -209,17 +201,6 @@ func (c *Collector) Close() error { | |
} | ||
} | ||
|
||
// watchers actually never return errors from Close | ||
if c.tlsGRPCCertWatcherCloser != nil { | ||
_ = c.tlsGRPCCertWatcherCloser.Close() | ||
} | ||
if c.tlsHTTPCertWatcherCloser != nil { | ||
_ = c.tlsHTTPCertWatcherCloser.Close() | ||
} | ||
if c.tlsZipkinCertWatcherCloser != nil { | ||
_ = c.tlsZipkinCertWatcherCloser.Close() | ||
} | ||
|
||
return nil | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,6 +10,8 @@ import ( | |
"time" | ||
|
||
"github.com/spf13/viper" | ||
"go.opentelemetry.io/collector/config/configgrpc" | ||
"go.opentelemetry.io/collector/config/confighttp" | ||
"go.uber.org/zap" | ||
|
||
"github.com/jaegertracing/jaeger/cmd/internal/flags" | ||
|
@@ -106,26 +108,17 @@ type CollectorOptions struct { | |
// NumWorkers is the number of internal workers in a collector | ||
NumWorkers int | ||
// HTTP section defines options for HTTP server | ||
HTTP HTTPOptions | ||
HTTP confighttp.ServerConfig | ||
// GRPC section defines options for gRPC server | ||
GRPC GRPCOptions | ||
GRPC configgrpc.ServerConfig | ||
// OTLP section defines options for servers accepting OpenTelemetry OTLP format | ||
OTLP struct { | ||
Enabled bool | ||
GRPC GRPCOptions | ||
HTTP HTTPOptions | ||
GRPC configgrpc.ServerConfig | ||
HTTP confighttp.ServerConfig | ||
} | ||
// Zipkin section defines options for Zipkin HTTP server | ||
Zipkin struct { | ||
// HTTPHostPort is the host:port address that the Zipkin collector service listens in on for http requests | ||
HTTPHostPort string | ||
// TLS configures secure transport for Zipkin endpoint to collect spans | ||
TLS tlscfg.Options | ||
// CORS allows CORS requests , sets the values for Allowed Headers and Allowed Origins. | ||
CORS corscfg.Options | ||
// KeepAlive configures allow Keep-Alive for Zipkin HTTP server | ||
KeepAlive bool | ||
} | ||
Zipkin confighttp.ServerConfig | ||
// CollectorTags is the string representing collector tags to append to each and every span | ||
CollectorTags map[string]string | ||
// SpanSizeMetricsEnabled determines whether to enable metrics based on processed span size | ||
|
@@ -137,39 +130,6 @@ type serverFlagsConfig struct { | |
tls tlscfg.ServerFlagsConfig | ||
} | ||
|
||
// HTTPOptions defines options for an HTTP server | ||
type HTTPOptions struct { | ||
// HostPort is the host:port address that the server listens on | ||
HostPort string | ||
// TLS configures secure transport for HTTP endpoint | ||
TLS tlscfg.Options | ||
// ReadTimeout sets the respective parameter of http.Server | ||
ReadTimeout time.Duration | ||
// ReadHeaderTimeout sets the respective parameter of http.Server | ||
ReadHeaderTimeout time.Duration | ||
// IdleTimeout sets the respective parameter of http.Server | ||
IdleTimeout time.Duration | ||
// CORS allows CORS requests , sets the values for Allowed Headers and Allowed Origins. | ||
CORS corscfg.Options | ||
} | ||
|
||
// GRPCOptions defines options for a gRPC server | ||
type GRPCOptions struct { | ||
// HostPort is the host:port address that the collector service listens in on for gRPC requests | ||
HostPort string | ||
// TLS configures secure transport for gRPC endpoint to collect spans | ||
TLS tlscfg.Options | ||
// MaxReceiveMessageLength is the maximum message size receivable by the gRPC Collector. | ||
MaxReceiveMessageLength int | ||
// MaxConnectionAge is a duration for the maximum amount of time a connection may exist. | ||
// See gRPC's keepalive.ServerParameters#MaxConnectionAge. | ||
MaxConnectionAge time.Duration | ||
// MaxConnectionAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed. | ||
// See gRPC's keepalive.ServerParameters#MaxConnectionAgeGrace. | ||
MaxConnectionAgeGrace time.Duration | ||
// Tenancy configures tenancy for endpoints that collect spans | ||
Tenancy tenancy.Options | ||
yurishkuro marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
// AddFlags adds flags for CollectorOptions | ||
func AddFlags(flagSet *flag.FlagSet) { | ||
|
@@ -223,30 +183,30 @@ func addGRPCFlags(flagSet *flag.FlagSet, cfg serverFlagsConfig, defaultHostPort | |
cfg.tls.AddFlags(flagSet) | ||
} | ||
|
||
func (opts *HTTPOptions) initFromViper(v *viper.Viper, _ *zap.Logger, cfg serverFlagsConfig) error { | ||
opts.HostPort = ports.FormatHostPort(v.GetString(cfg.prefix + "." + flagSuffixHostPort)) | ||
func initHTTPFromViper(v *viper.Viper, _ *zap.Logger, opts *confighttp.ServerConfig,cfg serverFlagsConfig) error { | ||
opts.Endpoint = ports.FormatHostPort(v.GetString(cfg.prefix + "." + flagSuffixHostPort)) | ||
opts.IdleTimeout = v.GetDuration(cfg.prefix + "." + flagSuffixHTTPIdleTimeout) | ||
opts.ReadTimeout = v.GetDuration(cfg.prefix + "." + flagSuffixHTTPReadTimeout) | ||
opts.ReadHeaderTimeout = v.GetDuration(cfg.prefix + "." + flagSuffixHTTPReadHeaderTimeout) | ||
tlsOpts, err := cfg.tls.InitFromViper(v) | ||
if err != nil { | ||
return fmt.Errorf("failed to parse HTTP TLS options: %w", err) | ||
} | ||
opts.TLS = tlsOpts | ||
opts.TLSSetting = tlsOpts.ToOtelServerConfig() | ||
return nil | ||
} | ||
|
||
func (opts *GRPCOptions) initFromViper(v *viper.Viper, _ *zap.Logger, cfg serverFlagsConfig) error { | ||
opts.HostPort = ports.FormatHostPort(v.GetString(cfg.prefix + "." + flagSuffixHostPort)) | ||
opts.MaxReceiveMessageLength = v.GetInt(cfg.prefix + "." + flagSuffixGRPCMaxReceiveMessageLength) | ||
opts.MaxConnectionAge = v.GetDuration(cfg.prefix + "." + flagSuffixGRPCMaxConnectionAge) | ||
opts.MaxConnectionAgeGrace = v.GetDuration(cfg.prefix + "." + flagSuffixGRPCMaxConnectionAgeGrace) | ||
func initGRPCFromViper(v *viper.Viper, _ *zap.Logger, opts *configgrpc.ServerConfig,cfg serverFlagsConfig) error { | ||
opts.NetAddr.Endpoint = ports.FormatHostPort(v.GetString(cfg.prefix + "." + flagSuffixHostPort)) | ||
opts.MaxRecvMsgSizeMiB = v.GetInt(cfg.prefix + "." + flagSuffixGRPCMaxReceiveMessageLength) * 1024 * 1024 | ||
opts.Keepalive.ServerParameters.MaxConnectionAge = v.GetDuration(cfg.prefix + "." + flagSuffixGRPCMaxConnectionAge) | ||
opts.Keepalive.ServerParameters.MaxConnectionAgeGrace = v.GetDuration(cfg.prefix + "." + flagSuffixGRPCMaxConnectionAgeGrace) | ||
tlsOpts, err := cfg.tls.InitFromViper(v) | ||
if err != nil { | ||
return fmt.Errorf("failed to parse gRPC TLS options: %w", err) | ||
} | ||
opts.TLS = tlsOpts | ||
opts.Tenancy = tenancy.InitFromViper(v) | ||
opts.TLSSetting = tlsOpts.ToOtelServerConfig() | ||
// opts.Tenancy = tenancy.InitFromViper(v) | ||
chahatsagarmain marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
return nil | ||
} | ||
|
@@ -259,31 +219,33 @@ func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper, logger *zap.Logger) | |
cOpts.DynQueueSizeMemory = v.GetUint(flagDynQueueSizeMemory) * 1024 * 1024 // we receive in MiB and store in bytes | ||
cOpts.SpanSizeMetricsEnabled = v.GetBool(flagSpanSizeMetricsEnabled) | ||
|
||
if err := cOpts.HTTP.initFromViper(v, logger, httpServerFlagsCfg); err != nil { | ||
if err := initHTTPFromViper(v, logger, &cOpts.HTTP,httpServerFlagsCfg); err != nil { | ||
return cOpts, fmt.Errorf("failed to parse HTTP server options: %w", err) | ||
} | ||
|
||
if err := cOpts.GRPC.initFromViper(v, logger, grpcServerFlagsCfg); err != nil { | ||
if err := initGRPCFromViper(v, logger, &cOpts.GRPC,grpcServerFlagsCfg); err != nil { | ||
return cOpts, fmt.Errorf("failed to parse gRPC server options: %w", err) | ||
} | ||
|
||
cOpts.OTLP.Enabled = v.GetBool(flagCollectorOTLPEnabled) | ||
if err := cOpts.OTLP.HTTP.initFromViper(v, logger, otlpServerFlagsCfg.HTTP); err != nil { | ||
if err := initHTTPFromViper(v, logger, &cOpts.OTLP.HTTP,otlpServerFlagsCfg.HTTP); err != nil { | ||
return cOpts, fmt.Errorf("failed to parse OTLP/HTTP server options: %w", err) | ||
} | ||
cOpts.OTLP.HTTP.CORS = corsOTLPFlags.InitFromViper(v) | ||
if err := cOpts.OTLP.GRPC.initFromViper(v, logger, otlpServerFlagsCfg.GRPC); err != nil { | ||
corsOpts := corsOTLPFlags.InitFromViper(v) | ||
cOpts.OTLP.HTTP.CORS = corsOpts.ToOTELCorsConfig() | ||
Comment on lines
+240
to
+241
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why isn't it part of initHTTPFromViper()? |
||
if err := initGRPCFromViper(v, logger, &cOpts.GRPC,otlpServerFlagsCfg.GRPC); err != nil { | ||
return cOpts, fmt.Errorf("failed to parse OTLP/gRPC server options: %w", err) | ||
} | ||
|
||
cOpts.Zipkin.KeepAlive = v.GetBool(flagZipkinKeepAliveEnabled) | ||
cOpts.Zipkin.HTTPHostPort = ports.FormatHostPort(v.GetString(flagZipkinHTTPHostPort)) | ||
// cOpts.Zipkin. = v.GetBool(flagZipkinKeepAliveEnabled) | ||
cOpts.Zipkin.Endpoint = ports.FormatHostPort(v.GetString(flagZipkinHTTPHostPort)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should't this be simply |
||
tlsZipkin, err := tlsZipkinFlagsConfig.InitFromViper(v) | ||
if err != nil { | ||
return cOpts, fmt.Errorf("failed to parse Zipkin TLS options: %w", err) | ||
} | ||
cOpts.Zipkin.TLS = tlsZipkin | ||
cOpts.Zipkin.CORS = corsZipkinFlags.InitFromViper(v) | ||
cOpts.Zipkin.TLSSetting = tlsZipkin.ToOtelServerConfig() | ||
corsOpts = corsZipkinFlags.InitFromViper(v) | ||
cOpts.Zipkin.CORS = corsOpts.ToOTELCorsConfig() | ||
|
||
return cOpts, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest changing server.GRPCServerParams to directly embed configgrpc struct, so that you don't have to do any field copying here