-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] Refactor storage factories to hold one configuration #6156
base: main
Are you sure you want to change the base?
Changes from all commits
92cc3f4
8a548a1
dd04a13
209b834
89c7c70
c6b6274
92e8070
535aa2c
76a3528
50795b0
a4d41b9
11a560d
8c8e8ae
bbcdbd7
6c1fd95
43487cd
c65bd21
7f87200
7d65eb8
c385416
4194aab
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -52,6 +52,7 @@ extensions: | |
password: "cassandra" | ||
tls: | ||
insecure: true | ||
is_archive: true | ||
receivers: | ||
otlp: | ||
protocols: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -58,6 +58,7 @@ extensions: | |
opensearch: | ||
indices: | ||
index_prefix: "jaeger-archive" | ||
is_archive: true | ||
|
||
receivers: | ||
otlp: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -136,6 +136,8 @@ type Configuration struct { | |
Tags TagsAsFields `mapstructure:"tags_as_fields"` | ||
// Enabled, if set to true, enables the namespace for storage pointed to by this configuration. | ||
Enabled bool `mapstructure:"-"` | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this still used? Let's add some comments, because this is not the first time I have this question, and the flag doesn't really make sense to me. |
||
// TODO: revisit if this needed | ||
IsArchive bool `mapstructure:"is_archive"` | ||
} | ||
|
||
// TagsAsFields holds configuration for tag schema. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,7 +40,6 @@ const ( | |
var ( // interface comformance checks | ||
_ storage.Factory = (*Factory)(nil) | ||
_ storage.Purger = (*Factory)(nil) | ||
_ storage.ArchiveFactory = (*Factory)(nil) | ||
_ storage.SamplingStoreFactory = (*Factory)(nil) | ||
_ io.Closer = (*Factory)(nil) | ||
_ plugin.Configurable = (*Factory)(nil) | ||
|
@@ -50,22 +49,26 @@ var ( // interface comformance checks | |
type Factory struct { | ||
Options *Options | ||
|
||
primaryMetricsFactory metrics.Factory | ||
archiveMetricsFactory metrics.Factory | ||
logger *zap.Logger | ||
tracer trace.TracerProvider | ||
metricsFactory metrics.Factory | ||
logger *zap.Logger | ||
tracer trace.TracerProvider | ||
|
||
primaryConfig config.SessionBuilder | ||
primarySession cassandra.Session | ||
archiveConfig config.SessionBuilder | ||
archiveSession cassandra.Session | ||
config config.SessionBuilder | ||
session cassandra.Session | ||
} | ||
|
||
// NewFactory creates a new Factory. | ||
func NewFactory() *Factory { | ||
func NewFactory(isArchive bool) *Factory { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: let's not pass a flag but have a 2nd constructor NewArchiveFactory. Because |
||
var options *Options | ||
if isArchive { | ||
options = NewOptions(archiveStorageConfig) | ||
options.IsArchive = true | ||
} else { | ||
options = NewOptions(primaryStorageConfig) | ||
} | ||
return &Factory{ | ||
tracer: otel.GetTracerProvider(), | ||
Options: NewOptions(primaryStorageConfig, archiveStorageConfig), | ||
Options: options, | ||
} | ||
} | ||
|
||
|
@@ -75,7 +78,7 @@ func NewFactoryWithConfig( | |
metricsFactory metrics.Factory, | ||
logger *zap.Logger, | ||
) (*Factory, error) { | ||
f := NewFactory() | ||
f := NewFactory(opts.IsArchive) | ||
// use this to help with testing | ||
b := &withConfigBuilder{ | ||
f: f, | ||
|
@@ -121,43 +124,30 @@ func (f *Factory) InitFromViper(v *viper.Viper, _ *zap.Logger) { | |
// InitFromOptions initializes factory from options. | ||
func (f *Factory) configureFromOptions(o *Options) { | ||
f.Options = o | ||
// TODO this is a hack because we do not define defaults in Options | ||
if o.others == nil { | ||
o.others = make(map[string]*NamespaceConfig) | ||
} | ||
f.primaryConfig = o.GetPrimary() | ||
if cfg := f.Options.Get(archiveStorageConfig); cfg != nil { | ||
f.archiveConfig = cfg // this is so stupid - see https://golang.org/doc/faq#nil_error | ||
} | ||
f.config = o.GetPrimary() | ||
} | ||
|
||
// Initialize implements storage.Factory | ||
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { | ||
f.primaryMetricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "cassandra", Tags: nil}) | ||
f.archiveMetricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "cassandra-archive", Tags: nil}) | ||
metricsNamespace := "cassandra" | ||
if f.Options.IsArchive { | ||
metricsNamespace = "cassandra-archive" | ||
} | ||
f.metricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: metricsNamespace, Tags: nil}) | ||
f.logger = logger | ||
|
||
primarySession, err := f.primaryConfig.NewSession() | ||
primarySession, err := f.config.NewSession() | ||
if err != nil { | ||
return err | ||
} | ||
f.primarySession = primarySession | ||
|
||
if f.archiveConfig != nil { | ||
archiveSession, err := f.archiveConfig.NewSession() | ||
if err != nil { | ||
return err | ||
} | ||
f.archiveSession = archiveSession | ||
} else { | ||
logger.Info("Cassandra archive storage configuration is empty, skipping") | ||
} | ||
f.session = primarySession | ||
|
||
return nil | ||
} | ||
|
||
// CreateSpanReader implements storage.Factory | ||
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { | ||
return cSpanStore.NewSpanReader(f.primarySession, f.primaryMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader")) | ||
return cSpanStore.NewSpanReader(f.session, f.metricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader")) | ||
} | ||
|
||
// CreateSpanWriter implements storage.Factory | ||
|
@@ -166,33 +156,13 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { | |
if err != nil { | ||
return nil, err | ||
} | ||
return cSpanStore.NewSpanWriter(f.primarySession, f.Options.SpanStoreWriteCacheTTL, f.primaryMetricsFactory, f.logger, options...) | ||
return cSpanStore.NewSpanWriter(f.session, f.Options.SpanStoreWriteCacheTTL, f.metricsFactory, f.logger, options...) | ||
} | ||
|
||
// CreateDependencyReader implements storage.Factory | ||
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { | ||
version := cDepStore.GetDependencyVersion(f.primarySession) | ||
return cDepStore.NewDependencyStore(f.primarySession, f.primaryMetricsFactory, f.logger, version) | ||
} | ||
|
||
// CreateArchiveSpanReader implements storage.ArchiveFactory | ||
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) { | ||
if f.archiveSession == nil { | ||
return nil, storage.ErrArchiveStorageNotConfigured | ||
} | ||
return cSpanStore.NewSpanReader(f.archiveSession, f.archiveMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader")) | ||
} | ||
|
||
// CreateArchiveSpanWriter implements storage.ArchiveFactory | ||
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { | ||
if f.archiveSession == nil { | ||
return nil, storage.ErrArchiveStorageNotConfigured | ||
} | ||
options, err := writerOptions(f.Options) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return cSpanStore.NewSpanWriter(f.archiveSession, f.Options.SpanStoreWriteCacheTTL, f.archiveMetricsFactory, f.logger, options...) | ||
version := cDepStore.GetDependencyVersion(f.session) | ||
return cDepStore.NewDependencyStore(f.session, f.metricsFactory, f.logger, version) | ||
} | ||
|
||
// CreateLock implements storage.SamplingStoreFactory | ||
|
@@ -203,12 +173,12 @@ func (f *Factory) CreateLock() (distributedlock.Lock, error) { | |
} | ||
f.logger.Info("Using unique participantName in the distributed lock", zap.String("participantName", hostId)) | ||
|
||
return cLock.NewLock(f.primarySession, hostId), nil | ||
return cLock.NewLock(f.session, hostId), nil | ||
} | ||
|
||
// CreateSamplingStore implements storage.SamplingStoreFactory | ||
func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store, error) { | ||
return cSamplingStore.New(f.primarySession, f.primaryMetricsFactory, f.logger), nil | ||
return cSamplingStore.New(f.session, f.metricsFactory, f.logger), nil | ||
} | ||
|
||
func writerOptions(opts *Options) ([]cSpanStore.Option, error) { | ||
|
@@ -244,16 +214,13 @@ var _ io.Closer = (*Factory)(nil) | |
|
||
// Close closes the resources held by the factory | ||
func (f *Factory) Close() error { | ||
if f.primarySession != nil { | ||
f.primarySession.Close() | ||
} | ||
if f.archiveSession != nil { | ||
f.archiveSession.Close() | ||
if f.session != nil { | ||
f.session.Close() | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (f *Factory) Purge(_ context.Context) error { | ||
return f.primarySession.Query("TRUNCATE traces").Exec() | ||
return f.session.Query("TRUNCATE traces").Exec() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@yurishkuro any thoughts on how we should handle this case here? previously, this wouldn't initialize the archive storage if the factory didn't implement the
ArchiveStorage
interface but now it always will.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should not be different from all other v1 binaries. If I run
I see that it supports the same
--cassandra.*
and--cassandra-archive.*
flags, so we have enough information to instantiate two separate factories, just like v1 all-in-one/query would have to do.