Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][collector] Switch to OTEL's http/grpc server #6277

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 12 additions & 31 deletions cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package app
import (
"context"
"fmt"
"io"
"net/http"
"time"

Expand Down Expand Up @@ -47,13 +46,10 @@ type Collector struct {
tenancyMgr *tenancy.Manager

// state, read only
hServer *http.Server
grpcServer *grpc.Server
otlpReceiver receiver.Traces
zipkinReceiver receiver.Traces
tlsGRPCCertWatcherCloser io.Closer
tlsHTTPCertWatcherCloser io.Closer
tlsZipkinCertWatcherCloser io.Closer
hServer *http.Server
grpcServer *grpc.Server
otlpReceiver receiver.Traces
zipkinReceiver receiver.Traces
}

// CollectorParams to construct a new Jaeger Collector.
Expand Down Expand Up @@ -103,24 +99,24 @@ func (c *Collector) Start(options *flags.CollectorOptions) error {
c.spanHandlers = handlerBuilder.BuildHandlers(c.spanProcessor)

grpcServer, err := server.StartGRPCServer(&server.GRPCServerParams{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest changing server.GRPCServerParams to directly embed configgrpc struct, so that you don't have to do any field copying here

HostPort: options.GRPC.HostPort,
HostPort: options.GRPC.NetAddr.Endpoint,
Handler: c.spanHandlers.GRPCHandler,
TLSConfig: options.GRPC.TLS,
TLSConfig: options.GRPC.TLSSetting,
SamplingProvider: c.samplingProvider,
Logger: c.logger,
MaxReceiveMessageLength: options.GRPC.MaxReceiveMessageLength,
MaxConnectionAge: options.GRPC.MaxConnectionAge,
MaxConnectionAgeGrace: options.GRPC.MaxConnectionAgeGrace,
MaxReceiveMessageLength: options.GRPC.MaxRecvMsgSizeMiB,
MaxConnectionAge: options.GRPC.Keepalive.ServerParameters.MaxConnectionAge,
MaxConnectionAgeGrace: options.GRPC.Keepalive.ServerParameters.MaxConnectionAgeGrace,
})
if err != nil {
return fmt.Errorf("could not start gRPC server: %w", err)
}
c.grpcServer = grpcServer

httpServer, err := server.StartHTTPServer(&server.HTTPServerParams{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

HostPort: options.HTTP.HostPort,
HostPort: options.HTTP.Endpoint,
Handler: c.spanHandlers.JaegerBatchesHandler,
TLSConfig: options.HTTP.TLS,
TLSConfig: options.HTTP.TLSSetting,
HealthCheck: c.hCheck,
MetricsFactory: c.metricsFactory,
SamplingProvider: c.samplingProvider,
Expand All @@ -131,11 +127,7 @@ func (c *Collector) Start(options *flags.CollectorOptions) error {
}
c.hServer = httpServer

c.tlsGRPCCertWatcherCloser = &options.GRPC.TLS
c.tlsHTTPCertWatcherCloser = &options.HTTP.TLS
c.tlsZipkinCertWatcherCloser = &options.Zipkin.TLS

if options.Zipkin.HTTPHostPort == "" {
if options.Zipkin.Endpoint == "" {
c.logger.Info("Not listening for Zipkin HTTP traffic, port not configured")
} else {
zipkinReceiver, err := handler.StartZipkinReceiver(options, c.logger, c.spanProcessor, c.tenancyMgr)
Expand Down Expand Up @@ -209,17 +201,6 @@ func (c *Collector) Close() error {
}
}

// watchers actually never return errors from Close
if c.tlsGRPCCertWatcherCloser != nil {
_ = c.tlsGRPCCertWatcherCloser.Close()
}
if c.tlsHTTPCertWatcherCloser != nil {
_ = c.tlsHTTPCertWatcherCloser.Close()
}
if c.tlsZipkinCertWatcherCloser != nil {
_ = c.tlsZipkinCertWatcherCloser.Close()
}

return nil
}

Expand Down
20 changes: 10 additions & 10 deletions cmd/collector/app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ var _ (io.Closer) = (*Collector)(nil)

func optionsForEphemeralPorts() *flags.CollectorOptions {
collectorOpts := &flags.CollectorOptions{}
collectorOpts.GRPC.HostPort = ":0"
collectorOpts.HTTP.HostPort = ":0"
collectorOpts.GRPC.NetAddr.Endpoint = ":0"
collectorOpts.HTTP.Endpoint = ":0"
collectorOpts.OTLP.Enabled = true
collectorOpts.OTLP.GRPC.HostPort = ":0"
collectorOpts.OTLP.HTTP.HostPort = ":0"
collectorOpts.Zipkin.HTTPHostPort = ":0"
collectorOpts.OTLP.GRPC.NetAddr.Endpoint = ":0"
collectorOpts.OTLP.HTTP.Endpoint = ":0"
collectorOpts.Zipkin.Endpoint = ":0"
return collectorOpts
}

Expand Down Expand Up @@ -112,23 +112,23 @@ func TestCollector_StartErrors(t *testing.T) {
var options *flags.CollectorOptions

options = optionsForEphemeralPorts()
options.GRPC.HostPort = ":-1"
options.GRPC.NetAddr.Endpoint = ":-1"
run("gRPC", options, "could not start gRPC server")

options = optionsForEphemeralPorts()
options.HTTP.HostPort = ":-1"
options.HTTP.Endpoint = ":-1"
run("HTTP", options, "could not start HTTP server")

options = optionsForEphemeralPorts()
options.Zipkin.HTTPHostPort = ":-1"
options.Zipkin.Endpoint = ":-1"
run("Zipkin", options, "could not start Zipkin receiver")

options = optionsForEphemeralPorts()
options.OTLP.GRPC.HostPort = ":-1"
options.OTLP.GRPC.NetAddr.Endpoint = ":-1"
run("OTLP/GRPC", options, "could not start OTLP receiver")

options = optionsForEphemeralPorts()
options.OTLP.HTTP.HostPort = ":-1"
options.OTLP.HTTP.Endpoint = ":-1"
run("OTLP/HTTP", options, "could not start OTLP receiver")
}

Expand Down
94 changes: 28 additions & 66 deletions cmd/collector/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"time"

"github.com/spf13/viper"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/internal/flags"
Expand Down Expand Up @@ -106,26 +108,17 @@ type CollectorOptions struct {
// NumWorkers is the number of internal workers in a collector
NumWorkers int
// HTTP section defines options for HTTP server
HTTP HTTPOptions
HTTP confighttp.ServerConfig
// GRPC section defines options for gRPC server
GRPC GRPCOptions
GRPC configgrpc.ServerConfig
// OTLP section defines options for servers accepting OpenTelemetry OTLP format
OTLP struct {
Enabled bool
GRPC GRPCOptions
HTTP HTTPOptions
GRPC configgrpc.ServerConfig
HTTP confighttp.ServerConfig
}
// Zipkin section defines options for Zipkin HTTP server
Zipkin struct {
// HTTPHostPort is the host:port address that the Zipkin collector service listens in on for http requests
HTTPHostPort string
// TLS configures secure transport for Zipkin endpoint to collect spans
TLS tlscfg.Options
// CORS allows CORS requests , sets the values for Allowed Headers and Allowed Origins.
CORS corscfg.Options
// KeepAlive configures allow Keep-Alive for Zipkin HTTP server
KeepAlive bool
}
Zipkin confighttp.ServerConfig
// CollectorTags is the string representing collector tags to append to each and every span
CollectorTags map[string]string
// SpanSizeMetricsEnabled determines whether to enable metrics based on processed span size
Expand All @@ -137,39 +130,6 @@ type serverFlagsConfig struct {
tls tlscfg.ServerFlagsConfig
}

// HTTPOptions defines options for an HTTP server
type HTTPOptions struct {
// HostPort is the host:port address that the server listens on
HostPort string
// TLS configures secure transport for HTTP endpoint
TLS tlscfg.Options
// ReadTimeout sets the respective parameter of http.Server
ReadTimeout time.Duration
// ReadHeaderTimeout sets the respective parameter of http.Server
ReadHeaderTimeout time.Duration
// IdleTimeout sets the respective parameter of http.Server
IdleTimeout time.Duration
// CORS allows CORS requests , sets the values for Allowed Headers and Allowed Origins.
CORS corscfg.Options
}

// GRPCOptions defines options for a gRPC server
type GRPCOptions struct {
// HostPort is the host:port address that the collector service listens in on for gRPC requests
HostPort string
// TLS configures secure transport for gRPC endpoint to collect spans
TLS tlscfg.Options
// MaxReceiveMessageLength is the maximum message size receivable by the gRPC Collector.
MaxReceiveMessageLength int
// MaxConnectionAge is a duration for the maximum amount of time a connection may exist.
// See gRPC's keepalive.ServerParameters#MaxConnectionAge.
MaxConnectionAge time.Duration
// MaxConnectionAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed.
// See gRPC's keepalive.ServerParameters#MaxConnectionAgeGrace.
MaxConnectionAgeGrace time.Duration
// Tenancy configures tenancy for endpoints that collect spans
Tenancy tenancy.Options
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
}

// AddFlags adds flags for CollectorOptions
func AddFlags(flagSet *flag.FlagSet) {
Expand Down Expand Up @@ -223,30 +183,30 @@ func addGRPCFlags(flagSet *flag.FlagSet, cfg serverFlagsConfig, defaultHostPort
cfg.tls.AddFlags(flagSet)
}

func (opts *HTTPOptions) initFromViper(v *viper.Viper, _ *zap.Logger, cfg serverFlagsConfig) error {
opts.HostPort = ports.FormatHostPort(v.GetString(cfg.prefix + "." + flagSuffixHostPort))
func initHTTPFromViper(v *viper.Viper, _ *zap.Logger, opts *confighttp.ServerConfig,cfg serverFlagsConfig) error {
opts.Endpoint = ports.FormatHostPort(v.GetString(cfg.prefix + "." + flagSuffixHostPort))
opts.IdleTimeout = v.GetDuration(cfg.prefix + "." + flagSuffixHTTPIdleTimeout)
opts.ReadTimeout = v.GetDuration(cfg.prefix + "." + flagSuffixHTTPReadTimeout)
opts.ReadHeaderTimeout = v.GetDuration(cfg.prefix + "." + flagSuffixHTTPReadHeaderTimeout)
tlsOpts, err := cfg.tls.InitFromViper(v)
if err != nil {
return fmt.Errorf("failed to parse HTTP TLS options: %w", err)
}
opts.TLS = tlsOpts
opts.TLSSetting = tlsOpts.ToOtelServerConfig()
return nil
}

func (opts *GRPCOptions) initFromViper(v *viper.Viper, _ *zap.Logger, cfg serverFlagsConfig) error {
opts.HostPort = ports.FormatHostPort(v.GetString(cfg.prefix + "." + flagSuffixHostPort))
opts.MaxReceiveMessageLength = v.GetInt(cfg.prefix + "." + flagSuffixGRPCMaxReceiveMessageLength)
opts.MaxConnectionAge = v.GetDuration(cfg.prefix + "." + flagSuffixGRPCMaxConnectionAge)
opts.MaxConnectionAgeGrace = v.GetDuration(cfg.prefix + "." + flagSuffixGRPCMaxConnectionAgeGrace)
func initGRPCFromViper(v *viper.Viper, _ *zap.Logger, opts *configgrpc.ServerConfig,cfg serverFlagsConfig) error {
opts.NetAddr.Endpoint = ports.FormatHostPort(v.GetString(cfg.prefix + "." + flagSuffixHostPort))
opts.MaxRecvMsgSizeMiB = v.GetInt(cfg.prefix + "." + flagSuffixGRPCMaxReceiveMessageLength) * 1024 * 1024
opts.Keepalive.ServerParameters.MaxConnectionAge = v.GetDuration(cfg.prefix + "." + flagSuffixGRPCMaxConnectionAge)
opts.Keepalive.ServerParameters.MaxConnectionAgeGrace = v.GetDuration(cfg.prefix + "." + flagSuffixGRPCMaxConnectionAgeGrace)
tlsOpts, err := cfg.tls.InitFromViper(v)
if err != nil {
return fmt.Errorf("failed to parse gRPC TLS options: %w", err)
}
opts.TLS = tlsOpts
opts.Tenancy = tenancy.InitFromViper(v)
opts.TLSSetting = tlsOpts.ToOtelServerConfig()
// opts.Tenancy = tenancy.InitFromViper(v)
chahatsagarmain marked this conversation as resolved.
Show resolved Hide resolved

return nil
}
Expand All @@ -259,31 +219,33 @@ func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper, logger *zap.Logger)
cOpts.DynQueueSizeMemory = v.GetUint(flagDynQueueSizeMemory) * 1024 * 1024 // we receive in MiB and store in bytes
cOpts.SpanSizeMetricsEnabled = v.GetBool(flagSpanSizeMetricsEnabled)

if err := cOpts.HTTP.initFromViper(v, logger, httpServerFlagsCfg); err != nil {
if err := initHTTPFromViper(v, logger, &cOpts.HTTP,httpServerFlagsCfg); err != nil {
return cOpts, fmt.Errorf("failed to parse HTTP server options: %w", err)
}

if err := cOpts.GRPC.initFromViper(v, logger, grpcServerFlagsCfg); err != nil {
if err := initGRPCFromViper(v, logger, &cOpts.GRPC,grpcServerFlagsCfg); err != nil {
return cOpts, fmt.Errorf("failed to parse gRPC server options: %w", err)
}

cOpts.OTLP.Enabled = v.GetBool(flagCollectorOTLPEnabled)
if err := cOpts.OTLP.HTTP.initFromViper(v, logger, otlpServerFlagsCfg.HTTP); err != nil {
if err := initHTTPFromViper(v, logger, &cOpts.OTLP.HTTP,otlpServerFlagsCfg.HTTP); err != nil {
return cOpts, fmt.Errorf("failed to parse OTLP/HTTP server options: %w", err)
}
cOpts.OTLP.HTTP.CORS = corsOTLPFlags.InitFromViper(v)
if err := cOpts.OTLP.GRPC.initFromViper(v, logger, otlpServerFlagsCfg.GRPC); err != nil {
corsOpts := corsOTLPFlags.InitFromViper(v)
cOpts.OTLP.HTTP.CORS = corsOpts.ToOTELCorsConfig()
Comment on lines +240 to +241
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why isn't it part of initHTTPFromViper()?

if err := initGRPCFromViper(v, logger, &cOpts.GRPC,otlpServerFlagsCfg.GRPC); err != nil {
return cOpts, fmt.Errorf("failed to parse OTLP/gRPC server options: %w", err)
}

cOpts.Zipkin.KeepAlive = v.GetBool(flagZipkinKeepAliveEnabled)
cOpts.Zipkin.HTTPHostPort = ports.FormatHostPort(v.GetString(flagZipkinHTTPHostPort))
// cOpts.Zipkin. = v.GetBool(flagZipkinKeepAliveEnabled)
cOpts.Zipkin.Endpoint = ports.FormatHostPort(v.GetString(flagZipkinHTTPHostPort))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should't this be simply initHTTPFromViper(..., &cOpts.Zipkin, ...)?

tlsZipkin, err := tlsZipkinFlagsConfig.InitFromViper(v)
if err != nil {
return cOpts, fmt.Errorf("failed to parse Zipkin TLS options: %w", err)
}
cOpts.Zipkin.TLS = tlsZipkin
cOpts.Zipkin.CORS = corsZipkinFlags.InitFromViper(v)
cOpts.Zipkin.TLSSetting = tlsZipkin.ToOtelServerConfig()
corsOpts = corsZipkinFlags.InitFromViper(v)
cOpts.Zipkin.CORS = corsOpts.ToOTELCorsConfig()

return cOpts, nil
}
Loading
Loading