diff --git a/cmd/jaeger/config-cassandra.yaml b/cmd/jaeger/config-cassandra.yaml index 3ead38258c2..a7dffefe2df 100644 --- a/cmd/jaeger/config-cassandra.yaml +++ b/cmd/jaeger/config-cassandra.yaml @@ -52,6 +52,7 @@ extensions: password: "cassandra" tls: insecure: true + is_archive: true receivers: otlp: protocols: diff --git a/cmd/jaeger/config-elasticsearch.yaml b/cmd/jaeger/config-elasticsearch.yaml index 0883dc3b395..d3bd9654007 100644 --- a/cmd/jaeger/config-elasticsearch.yaml +++ b/cmd/jaeger/config-elasticsearch.yaml @@ -58,6 +58,7 @@ extensions: elasticsearch: indices: index_prefix: "jaeger-archive" + is_archive: true receivers: otlp: diff --git a/cmd/jaeger/config-opensearch.yaml b/cmd/jaeger/config-opensearch.yaml index 1fe57d42c39..8654863c3d4 100644 --- a/cmd/jaeger/config-opensearch.yaml +++ b/cmd/jaeger/config-opensearch.yaml @@ -58,6 +58,7 @@ extensions: opensearch: indices: index_prefix: "jaeger-archive" + is_archive: true receivers: otlp: diff --git a/cmd/jaeger/internal/extension/jaegerquery/server_test.go b/cmd/jaeger/internal/extension/jaegerquery/server_test.go index 6fd95566e9f..bbc5353bd31 100644 --- a/cmd/jaeger/internal/extension/jaegerquery/server_test.go +++ b/cmd/jaeger/internal/extension/jaegerquery/server_test.go @@ -279,18 +279,6 @@ func TestServerAddArchiveStorage(t *testing.T) { expectedOutput: "", expectedErr: "cannot find archive storage factory: cannot find extension", }, - { - name: "Archive storage not supported", - config: &Config{ - Storage: Storage{ - TracesArchive: "badger", - }, - }, - qSvcOpts: &querysvc.QueryServiceOptions{}, - extension: fakeStorageExt{}, - expectedOutput: "Archive storage not supported by the factory", - expectedErr: "", - }, } for _, tt := range tests { diff --git a/cmd/query/app/flags_test.go b/cmd/query/app/flags_test.go index 14c6237953b..b0a4f3b1347 100644 --- a/cmd/query/app/flags_test.go +++ b/cmd/query/app/flags_test.go @@ -92,24 +92,12 @@ func TestBuildQueryServiceOptions(t *testing.T) { require.NoError(t, err) assert.NotNil(t, qOpts) - qSvcOpts := qOpts.BuildQueryServiceOptions(&mocks.Factory{}, zap.NewNop()) - assert.NotNil(t, qSvcOpts) - assert.NotNil(t, qSvcOpts.Adjuster) - assert.Nil(t, qSvcOpts.ArchiveSpanReader) - assert.Nil(t, qSvcOpts.ArchiveSpanWriter) - - comboFactory := struct { - *mocks.Factory - *mocks.ArchiveFactory - }{ - &mocks.Factory{}, - &mocks.ArchiveFactory{}, - } + factory := &mocks.Factory{} - comboFactory.ArchiveFactory.On("CreateArchiveSpanReader").Return(&spanstore_mocks.Reader{}, nil) - comboFactory.ArchiveFactory.On("CreateArchiveSpanWriter").Return(&spanstore_mocks.Writer{}, nil) + factory.On("CreateSpanReader").Return(&spanstore_mocks.Reader{}, nil) + factory.On("CreateSpanWriter").Return(&spanstore_mocks.Writer{}, nil) - qSvcOpts = qOpts.BuildQueryServiceOptions(comboFactory, zap.NewNop()) + qSvcOpts := qOpts.BuildQueryServiceOptions(factory, zap.NewNop()) assert.NotNil(t, qSvcOpts) assert.NotNil(t, qSvcOpts.Adjuster) assert.NotNil(t, qSvcOpts.ArchiveSpanReader) diff --git a/cmd/query/app/querysvc/query_service.go b/cmd/query/app/querysvc/query_service.go index 135ecc60bbe..54bb09a25f4 100644 --- a/cmd/query/app/querysvc/query_service.go +++ b/cmd/query/app/querysvc/query_service.go @@ -128,25 +128,12 @@ func (qs QueryService) GetCapabilities() StorageCapabilities { // InitArchiveStorage tries to initialize archive storage reader/writer if storage factory supports them. func (opts *QueryServiceOptions) InitArchiveStorage(storageFactory storage.Factory, logger *zap.Logger) bool { - archiveFactory, ok := storageFactory.(storage.ArchiveFactory) - if !ok { - logger.Info("Archive storage not supported by the factory") - return false - } - reader, err := archiveFactory.CreateArchiveSpanReader() - if errors.Is(err, storage.ErrArchiveStorageNotConfigured) || errors.Is(err, storage.ErrArchiveStorageNotSupported) { - logger.Info("Archive storage not created", zap.String("reason", err.Error())) - return false - } + reader, err := storageFactory.CreateSpanReader() if err != nil { logger.Error("Cannot init archive storage reader", zap.Error(err)) return false } - writer, err := archiveFactory.CreateArchiveSpanWriter() - if errors.Is(err, storage.ErrArchiveStorageNotConfigured) || errors.Is(err, storage.ErrArchiveStorageNotSupported) { - logger.Info("Archive storage not created", zap.String("reason", err.Error())) - return false - } + writer, err := storageFactory.CreateSpanWriter() if err != nil { logger.Error("Cannot init archive storage writer", zap.Error(err)) return false diff --git a/cmd/query/app/querysvc/query_service_test.go b/cmd/query/app/querysvc/query_service_test.go index 582a4568509..86e1d220ca4 100644 --- a/cmd/query/app/querysvc/query_service_test.go +++ b/cmd/query/app/querysvc/query_service_test.go @@ -315,31 +315,22 @@ func (*fakeStorageFactory1) CreateSpanReader() (spanstore.Reader, error) func (*fakeStorageFactory1) CreateSpanWriter() (spanstore.Writer, error) { return nil, nil } func (*fakeStorageFactory1) CreateDependencyReader() (dependencystore.Reader, error) { return nil, nil } -func (f *fakeStorageFactory2) CreateArchiveSpanReader() (spanstore.Reader, error) { return f.r, f.rErr } -func (f *fakeStorageFactory2) CreateArchiveSpanWriter() (spanstore.Writer, error) { return f.w, f.wErr } +func (f *fakeStorageFactory2) CreateSpanReader() (spanstore.Reader, error) { return f.r, f.rErr } +func (f *fakeStorageFactory2) CreateSpanWriter() (spanstore.Writer, error) { return f.w, f.wErr } var ( - _ storage.Factory = new(fakeStorageFactory1) - _ storage.ArchiveFactory = new(fakeStorageFactory2) + _ storage.Factory = new(fakeStorageFactory1) + _ storage.Factory = new(fakeStorageFactory2) ) func TestInitArchiveStorageErrors(t *testing.T) { opts := &QueryServiceOptions{} logger := zap.NewNop() - assert.False(t, opts.InitArchiveStorage(new(fakeStorageFactory1), logger)) - assert.False(t, opts.InitArchiveStorage( - &fakeStorageFactory2{rErr: storage.ErrArchiveStorageNotConfigured}, - logger, - )) assert.False(t, opts.InitArchiveStorage( &fakeStorageFactory2{rErr: errors.New("error")}, logger, )) - assert.False(t, opts.InitArchiveStorage( - &fakeStorageFactory2{wErr: storage.ErrArchiveStorageNotConfigured}, - logger, - )) assert.False(t, opts.InitArchiveStorage( &fakeStorageFactory2{wErr: errors.New("error")}, logger, diff --git a/cmd/query/app/token_propagation_test.go b/cmd/query/app/token_propagation_test.go index 81ec248e4c6..e81d17ab542 100644 --- a/cmd/query/app/token_propagation_test.go +++ b/cmd/query/app/token_propagation_test.go @@ -68,7 +68,7 @@ func runQueryService(t *testing.T, esURL string) *Server { flagsSvc := flags.NewService(ports.QueryAdminHTTP) flagsSvc.Logger = zaptest.NewLogger(t) - f := es.NewFactory() + f := es.NewFactory(false) v, command := config.Viperize(f.AddFlags) require.NoError(t, command.ParseFlags([]string{ "--es.tls.enabled=false", diff --git a/cmd/remote-storage/app/server.go b/cmd/remote-storage/app/server.go index 51f88a16699..606255649bd 100644 --- a/cmd/remote-storage/app/server.go +++ b/cmd/remote-storage/app/server.go @@ -54,7 +54,7 @@ func NewServer(options *Options, storageFactory storage.Factory, tm *tenancy.Man }, nil } -func createGRPCHandler(f storage.Factory, logger *zap.Logger) (*shared.GRPCHandler, error) { +func createGRPCHandler(f storage.Factory, _ *zap.Logger) (*shared.GRPCHandler, error) { reader, err := f.CreateSpanReader() if err != nil { return nil, err @@ -78,7 +78,8 @@ func createGRPCHandler(f storage.Factory, logger *zap.Logger) (*shared.GRPCHandl // borrow code from Query service for archive storage qOpts := &querysvc.QueryServiceOptions{} // when archive storage not initialized (returns false), the reader/writer will be nil - _ = qOpts.InitArchiveStorage(f, logger) + // TODO: what should we do here? + // _ = qOpts.InitArchiveStorage(f, logger) impl.ArchiveSpanReader = func() spanstore.Reader { return qOpts.ArchiveSpanReader } impl.ArchiveSpanWriter = func() spanstore.Writer { return qOpts.ArchiveSpanWriter } diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index f5104a4691b..7610be8aaef 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -136,6 +136,8 @@ type Configuration struct { Tags TagsAsFields `mapstructure:"tags_as_fields"` // Enabled, if set to true, enables the namespace for storage pointed to by this configuration. Enabled bool `mapstructure:"-"` + // TODO: revisit if this needed + IsArchive bool `mapstructure:"is_archive"` } // TagsAsFields holds configuration for tag schema. diff --git a/plugin/storage/blackhole/factory.go b/plugin/storage/blackhole/factory.go index ad149593633..7cbdd4ae2eb 100644 --- a/plugin/storage/blackhole/factory.go +++ b/plugin/storage/blackhole/factory.go @@ -13,10 +13,8 @@ import ( "github.com/jaegertracing/jaeger/storage/spanstore" ) -var ( // interface comformance checks - _ storage.Factory = (*Factory)(nil) - _ storage.ArchiveFactory = (*Factory)(nil) -) +// interface comformance checks +var _ storage.Factory = (*Factory)(nil) // Factory implements storage.Factory and creates blackhole storage components. type Factory struct { @@ -48,16 +46,6 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { return f.store, nil } -// CreateArchiveSpanReader implements storage.ArchiveFactory -func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) { - return f.store, nil -} - -// CreateArchiveSpanWriter implements storage.ArchiveFactory -func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { - return f.store, nil -} - // CreateDependencyReader implements storage.Factory func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { return f.store, nil diff --git a/plugin/storage/blackhole/factory_test.go b/plugin/storage/blackhole/factory_test.go index 0ef1a783176..134e44047c1 100644 --- a/plugin/storage/blackhole/factory_test.go +++ b/plugin/storage/blackhole/factory_test.go @@ -27,12 +27,6 @@ func TestStorageFactory(t *testing.T) { writer, err := f.CreateSpanWriter() require.NoError(t, err) assert.Equal(t, f.store, writer) - reader, err = f.CreateArchiveSpanReader() - require.NoError(t, err) - assert.Equal(t, f.store, reader) - writer, err = f.CreateArchiveSpanWriter() - require.NoError(t, err) - assert.Equal(t, f.store, writer) depReader, err := f.CreateDependencyReader() require.NoError(t, err) assert.Equal(t, f.store, depReader) diff --git a/plugin/storage/cassandra/factory.go b/plugin/storage/cassandra/factory.go index 4654a2a2c82..f7f95a83465 100644 --- a/plugin/storage/cassandra/factory.go +++ b/plugin/storage/cassandra/factory.go @@ -40,7 +40,6 @@ const ( var ( // interface comformance checks _ storage.Factory = (*Factory)(nil) _ storage.Purger = (*Factory)(nil) - _ storage.ArchiveFactory = (*Factory)(nil) _ storage.SamplingStoreFactory = (*Factory)(nil) _ io.Closer = (*Factory)(nil) _ plugin.Configurable = (*Factory)(nil) @@ -50,22 +49,26 @@ var ( // interface comformance checks type Factory struct { Options *Options - primaryMetricsFactory metrics.Factory - archiveMetricsFactory metrics.Factory - logger *zap.Logger - tracer trace.TracerProvider + metricsFactory metrics.Factory + logger *zap.Logger + tracer trace.TracerProvider - primaryConfig config.SessionBuilder - primarySession cassandra.Session - archiveConfig config.SessionBuilder - archiveSession cassandra.Session + config config.SessionBuilder + session cassandra.Session } // NewFactory creates a new Factory. -func NewFactory() *Factory { +func NewFactory(isArchive bool) *Factory { + var options *Options + if isArchive { + options = NewOptions(archiveStorageConfig) + options.IsArchive = true + } else { + options = NewOptions(primaryStorageConfig) + } return &Factory{ tracer: otel.GetTracerProvider(), - Options: NewOptions(primaryStorageConfig, archiveStorageConfig), + Options: options, } } @@ -75,7 +78,7 @@ func NewFactoryWithConfig( metricsFactory metrics.Factory, logger *zap.Logger, ) (*Factory, error) { - f := NewFactory() + f := NewFactory(opts.IsArchive) // use this to help with testing b := &withConfigBuilder{ f: f, @@ -121,43 +124,30 @@ func (f *Factory) InitFromViper(v *viper.Viper, _ *zap.Logger) { // InitFromOptions initializes factory from options. func (f *Factory) configureFromOptions(o *Options) { f.Options = o - // TODO this is a hack because we do not define defaults in Options - if o.others == nil { - o.others = make(map[string]*NamespaceConfig) - } - f.primaryConfig = o.GetPrimary() - if cfg := f.Options.Get(archiveStorageConfig); cfg != nil { - f.archiveConfig = cfg // this is so stupid - see https://golang.org/doc/faq#nil_error - } + f.config = o.GetPrimary() } // Initialize implements storage.Factory func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { - f.primaryMetricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "cassandra", Tags: nil}) - f.archiveMetricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "cassandra-archive", Tags: nil}) + metricsNamespace := "cassandra" + if f.Options.IsArchive { + metricsNamespace = "cassandra-archive" + } + f.metricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: metricsNamespace, Tags: nil}) f.logger = logger - primarySession, err := f.primaryConfig.NewSession() + primarySession, err := f.config.NewSession() if err != nil { return err } - f.primarySession = primarySession - - if f.archiveConfig != nil { - archiveSession, err := f.archiveConfig.NewSession() - if err != nil { - return err - } - f.archiveSession = archiveSession - } else { - logger.Info("Cassandra archive storage configuration is empty, skipping") - } + f.session = primarySession + return nil } // CreateSpanReader implements storage.Factory func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { - return cSpanStore.NewSpanReader(f.primarySession, f.primaryMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader")) + return cSpanStore.NewSpanReader(f.session, f.metricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader")) } // CreateSpanWriter implements storage.Factory @@ -166,33 +156,13 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { if err != nil { return nil, err } - return cSpanStore.NewSpanWriter(f.primarySession, f.Options.SpanStoreWriteCacheTTL, f.primaryMetricsFactory, f.logger, options...) + return cSpanStore.NewSpanWriter(f.session, f.Options.SpanStoreWriteCacheTTL, f.metricsFactory, f.logger, options...) } // CreateDependencyReader implements storage.Factory func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { - version := cDepStore.GetDependencyVersion(f.primarySession) - return cDepStore.NewDependencyStore(f.primarySession, f.primaryMetricsFactory, f.logger, version) -} - -// CreateArchiveSpanReader implements storage.ArchiveFactory -func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) { - if f.archiveSession == nil { - return nil, storage.ErrArchiveStorageNotConfigured - } - return cSpanStore.NewSpanReader(f.archiveSession, f.archiveMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader")) -} - -// CreateArchiveSpanWriter implements storage.ArchiveFactory -func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { - if f.archiveSession == nil { - return nil, storage.ErrArchiveStorageNotConfigured - } - options, err := writerOptions(f.Options) - if err != nil { - return nil, err - } - return cSpanStore.NewSpanWriter(f.archiveSession, f.Options.SpanStoreWriteCacheTTL, f.archiveMetricsFactory, f.logger, options...) + version := cDepStore.GetDependencyVersion(f.session) + return cDepStore.NewDependencyStore(f.session, f.metricsFactory, f.logger, version) } // CreateLock implements storage.SamplingStoreFactory @@ -203,12 +173,12 @@ func (f *Factory) CreateLock() (distributedlock.Lock, error) { } f.logger.Info("Using unique participantName in the distributed lock", zap.String("participantName", hostId)) - return cLock.NewLock(f.primarySession, hostId), nil + return cLock.NewLock(f.session, hostId), nil } // CreateSamplingStore implements storage.SamplingStoreFactory func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store, error) { - return cSamplingStore.New(f.primarySession, f.primaryMetricsFactory, f.logger), nil + return cSamplingStore.New(f.session, f.metricsFactory, f.logger), nil } func writerOptions(opts *Options) ([]cSpanStore.Option, error) { @@ -244,16 +214,13 @@ var _ io.Closer = (*Factory)(nil) // Close closes the resources held by the factory func (f *Factory) Close() error { - if f.primarySession != nil { - f.primarySession.Close() - } - if f.archiveSession != nil { - f.archiveSession.Close() + if f.session != nil { + f.session.Close() } return nil } func (f *Factory) Purge(_ context.Context) error { - return f.primarySession.Query("TRUNCATE traces").Exec() + return f.session.Query("TRUNCATE traces").Exec() } diff --git a/plugin/storage/cassandra/factory_test.go b/plugin/storage/cassandra/factory_test.go index 9a2fcc486cf..a236ec5b3b2 100644 --- a/plugin/storage/cassandra/factory_test.go +++ b/plugin/storage/cassandra/factory_test.go @@ -39,15 +39,14 @@ func (m *mockSessionBuilder) NewSession() (cassandra.Session, error) { } func TestCassandraFactory(t *testing.T) { - logger, logBuf := testutils.NewLogger() - f := NewFactory() - v, command := config.Viperize(f.AddFlags) - command.ParseFlags([]string{"--cassandra-archive.enabled=true"}) + logger, _ := testutils.NewLogger() + f := NewFactory(false) + v, _ := config.Viperize(f.AddFlags) f.InitFromViper(v, zap.NewNop()) // after InitFromViper, f.primaryConfig points to a real session builder that will fail in unit tests, // so we override it with a mock. - f.primaryConfig = newMockSessionBuilder(nil, errors.New("made-up error")) + f.config = newMockSessionBuilder(nil, errors.New("made-up error")) require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error") var ( @@ -57,13 +56,11 @@ func TestCassandraFactory(t *testing.T) { session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query) session.On("Close").Return() query.On("Exec").Return(nil) - f.primaryConfig = newMockSessionBuilder(session, nil) - f.archiveConfig = newMockSessionBuilder(nil, errors.New("made-up error")) + f.config = newMockSessionBuilder(nil, errors.New("made-up error")) require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error") - f.archiveConfig = nil + f.config = newMockSessionBuilder(session, nil) require.NoError(t, f.Initialize(metrics.NullFactory, logger)) - assert.Contains(t, logBuf.String(), "Cassandra archive storage configuration is empty, skipping") _, err := f.CreateSpanReader() require.NoError(t, err) @@ -74,21 +71,6 @@ func TestCassandraFactory(t *testing.T) { _, err = f.CreateDependencyReader() require.NoError(t, err) - _, err = f.CreateArchiveSpanReader() - require.EqualError(t, err, "archive storage not configured") - - _, err = f.CreateArchiveSpanWriter() - require.EqualError(t, err, "archive storage not configured") - - f.archiveConfig = newMockSessionBuilder(session, nil) - require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) - - _, err = f.CreateArchiveSpanReader() - require.NoError(t, err) - - _, err = f.CreateArchiveSpanWriter() - require.NoError(t, err) - _, err = f.CreateLock() require.NoError(t, err) @@ -99,11 +81,9 @@ func TestCassandraFactory(t *testing.T) { } func TestExclusiveWhitelistBlacklist(t *testing.T) { - logger, logBuf := testutils.NewLogger() - f := NewFactory() + f := NewFactory(false) v, command := config.Viperize(f.AddFlags) command.ParseFlags([]string{ - "--cassandra-archive.enabled=true", "--cassandra.index.tag-whitelist=a,b,c", "--cassandra.index.tag-blacklist=a,b,c", }) @@ -111,7 +91,7 @@ func TestExclusiveWhitelistBlacklist(t *testing.T) { // after InitFromViper, f.primaryConfig points to a real session builder that will fail in unit tests, // so we override it with a mock. - f.primaryConfig = newMockSessionBuilder(nil, errors.New("made-up error")) + f.config = newMockSessionBuilder(nil, errors.New("made-up error")) require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error") var ( @@ -120,22 +100,10 @@ func TestExclusiveWhitelistBlacklist(t *testing.T) { ) session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query) query.On("Exec").Return(nil) - f.primaryConfig = newMockSessionBuilder(session, nil) - f.archiveConfig = newMockSessionBuilder(nil, errors.New("made-up error")) - require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error") - - f.archiveConfig = nil - require.NoError(t, f.Initialize(metrics.NullFactory, logger)) - assert.Contains(t, logBuf.String(), "Cassandra archive storage configuration is empty, skipping") + f.config = newMockSessionBuilder(session, nil) _, err := f.CreateSpanWriter() require.EqualError(t, err, "only one of TagIndexBlacklist and TagIndexWhitelist can be specified") - - f.archiveConfig = &mockSessionBuilder{} - require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) - - _, err = f.CreateArchiveSpanWriter() - require.EqualError(t, err, "only one of TagIndexBlacklist and TagIndexWhitelist can be specified") } func TestWriterOptions(t *testing.T) { @@ -181,13 +149,11 @@ func TestWriterOptions(t *testing.T) { } func TestConfigureFromOptions(t *testing.T) { - f := NewFactory() - o := NewOptions("foo", archiveStorageConfig) - o.others[archiveStorageConfig].Enabled = true + f := NewFactory(false) + o := NewOptions("foo") f.configureFromOptions(o) assert.Equal(t, o, f.Options) - assert.Equal(t, o.GetPrimary(), f.primaryConfig) - assert.Equal(t, o.Get(archiveStorageConfig), f.archiveConfig) + assert.Equal(t, o.GetPrimary(), f.config) } func TestNewFactoryWithConfig(t *testing.T) { @@ -201,7 +167,7 @@ func TestNewFactoryWithConfig(t *testing.T) { }, }, } - f := NewFactory() + f := NewFactory(false) b := &withConfigBuilder{ f: f, opts: opts, @@ -223,7 +189,7 @@ func TestNewFactoryWithConfig(t *testing.T) { }, }, } - f := NewFactory() + f := NewFactory(false) b := &withConfigBuilder{ f: f, opts: opts, @@ -242,14 +208,14 @@ func TestNewFactoryWithConfig(t *testing.T) { } func TestFactory_Purge(t *testing.T) { - f := NewFactory() + f := NewFactory(false) var ( session = &mocks.Session{} query = &mocks.Query{} ) session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query) query.On("Exec").Return(nil) - f.primarySession = session + f.session = session err := f.Purge(context.Background()) require.NoError(t, err) diff --git a/plugin/storage/cassandra/options.go b/plugin/storage/cassandra/options.go index e266436ba29..3c876dfa904 100644 --- a/plugin/storage/cassandra/options.go +++ b/plugin/storage/cassandra/options.go @@ -49,9 +49,10 @@ const ( // (e.g. archive) may be underspecified and infer the rest of its parameters from primary. type Options struct { Primary NamespaceConfig `mapstructure:",squash"` - others map[string]*NamespaceConfig - SpanStoreWriteCacheTTL time.Duration `mapstructure:"span_store_write_cache_ttl"` - Index IndexConfig `mapstructure:"index"` + SpanStoreWriteCacheTTL time.Duration `mapstructure:"span_store_write_cache_ttl"` + Index IndexConfig `mapstructure:"index"` + // TODO: this is just a placeholder + IsArchive bool `mapstructure:"is_archive"` } // IndexConfig configures indexing. @@ -74,31 +75,22 @@ type NamespaceConfig struct { } // NewOptions creates a new Options struct. -func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options { +func NewOptions(namespace string) *Options { // TODO all default values should be defined via cobra flags options := &Options{ Primary: NamespaceConfig{ Configuration: config.DefaultConfiguration(), - namespace: primaryNamespace, + namespace: namespace, Enabled: true, }, - others: make(map[string]*NamespaceConfig, len(otherNamespaces)), SpanStoreWriteCacheTTL: time.Hour * 12, } - - for _, namespace := range otherNamespaces { - options.others[namespace] = &NamespaceConfig{namespace: namespace} - } - return options } // AddFlags adds flags for Options func (opt *Options) AddFlags(flagSet *flag.FlagSet) { addFlags(flagSet, opt.Primary) - for _, cfg := range opt.others { - addFlags(flagSet, *cfg) - } flagSet.Duration(opt.Primary.namespace+suffixSpanStoreWriteCacheTTL, opt.SpanStoreWriteCacheTTL, "The duration to wait before rewriting an existing service or operation name") @@ -206,9 +198,6 @@ func addFlags(flagSet *flag.FlagSet, nsConfig NamespaceConfig) { // InitFromViper initializes Options with properties from viper func (opt *Options) InitFromViper(v *viper.Viper) { opt.Primary.initFromViper(v) - for _, cfg := range opt.others { - cfg.initFromViper(v) - } opt.SpanStoreWriteCacheTTL = v.GetDuration(opt.Primary.namespace + suffixSpanStoreWriteCacheTTL) opt.Index.TagBlackList = stripWhiteSpace(v.GetString(opt.Primary.namespace + suffixIndexTagsBlacklist)) opt.Index.TagWhiteList = stripWhiteSpace(v.GetString(opt.Primary.namespace + suffixIndexTagsWhitelist)) @@ -260,23 +249,6 @@ func (opt *Options) GetPrimary() *config.Configuration { return &opt.Primary.Configuration } -// Get returns auxiliary named configuration. -func (opt *Options) Get(namespace string) *config.Configuration { - nsCfg, ok := opt.others[namespace] - if !ok { - nsCfg = &NamespaceConfig{} - opt.others[namespace] = nsCfg - } - if !nsCfg.Enabled { - return nil - } - nsCfg.Configuration.ApplyDefaults(&opt.Primary.Configuration) - if len(nsCfg.Connection.Servers) == 0 { - nsCfg.Connection.Servers = opt.Primary.Connection.Servers - } - return &nsCfg.Configuration -} - // TagIndexBlacklist returns the list of blacklisted tags func (opt *Options) TagIndexBlacklist() []string { if len(opt.Index.TagBlackList) > 0 { diff --git a/plugin/storage/cassandra/options_test.go b/plugin/storage/cassandra/options_test.go index 8ccd813c838..418554b2666 100644 --- a/plugin/storage/cassandra/options_test.go +++ b/plugin/storage/cassandra/options_test.go @@ -6,10 +6,8 @@ package cassandra import ( "testing" - "time" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "github.com/jaegertracing/jaeger/pkg/config" ) @@ -20,22 +18,10 @@ func TestOptions(t *testing.T) { assert.NotEmpty(t, primary.Schema.Keyspace) assert.NotEmpty(t, primary.Connection.Servers) assert.Equal(t, 2, primary.Connection.ConnectionsPerHost) - - aux := opts.Get("archive") - assert.Nil(t, aux) - - assert.NotNil(t, opts.others["archive"]) - opts.others["archive"].Enabled = true - aux = opts.Get("archive") - require.NotNil(t, aux) - assert.Equal(t, primary.Schema.Keyspace, aux.Schema.Keyspace) - assert.Equal(t, primary.Connection.Servers, aux.Connection.Servers) - assert.Equal(t, primary.Connection.ConnectionsPerHost, aux.Connection.ConnectionsPerHost) - assert.Equal(t, primary.Connection.ReconnectInterval, aux.Connection.ReconnectInterval) } func TestOptionsWithFlags(t *testing.T) { - opts := NewOptions("cas", "cas-aux") + opts := NewOptions("cas") v, command := config.Viperize(opts.AddFlags) command.ParseFlags([]string{ "--cas.keyspace=jaeger", @@ -56,13 +42,6 @@ func TestOptionsWithFlags(t *testing.T) { "--cas.basic.allowed-authenticators=org.apache.cassandra.auth.PasswordAuthenticator,com.datastax.bdp.cassandra.auth.DseAuthenticator", "--cas.username=username", "--cas.password=password", - // enable aux with a couple overrides - "--cas-aux.enabled=true", - "--cas-aux.keyspace=jaeger-archive", - "--cas-aux.servers=3.3.3.3, 4.4.4.4", - "--cas-aux.username=username", - "--cas-aux.password=password", - "--cas-aux.basic.allowed-authenticators=org.apache.cassandra.auth.PasswordAuthenticator,com.ericsson.bss.cassandra.ecaudit.auth.AuditAuthenticator", }) opts.InitFromViper(v) @@ -77,20 +56,6 @@ func TestOptionsWithFlags(t *testing.T) { assert.True(t, opts.Index.Tags) assert.False(t, opts.Index.ProcessTags) assert.True(t, opts.Index.Logs) - - aux := opts.Get("cas-aux") - require.NotNil(t, aux) - assert.Equal(t, "jaeger-archive", aux.Schema.Keyspace) - assert.Equal(t, []string{"3.3.3.3", "4.4.4.4"}, aux.Connection.Servers) - assert.Equal(t, []string{"org.apache.cassandra.auth.PasswordAuthenticator", "com.ericsson.bss.cassandra.ecaudit.auth.AuditAuthenticator"}, aux.Connection.Authenticator.Basic.AllowedAuthenticators) - assert.Equal(t, 42, aux.Connection.ConnectionsPerHost) - assert.Equal(t, 42, aux.Query.MaxRetryAttempts) - assert.Equal(t, 42*time.Second, aux.Query.Timeout) - assert.Equal(t, 42*time.Second, aux.Connection.ReconnectInterval) - assert.Equal(t, 4242, aux.Connection.Port) - assert.Equal(t, "", aux.Query.Consistency, "aux storage does not inherit consistency from primary") - assert.Equal(t, 3, aux.Connection.ProtoVersion) - assert.Equal(t, 42*time.Second, aux.Connection.SocketKeepAlive) } func TestDefaultTlsHostVerify(t *testing.T) { diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 2ce06af5fc9..85a6821ba82 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -41,11 +41,10 @@ const ( ) var ( // interface comformance checks - _ storage.Factory = (*Factory)(nil) - _ storage.ArchiveFactory = (*Factory)(nil) - _ io.Closer = (*Factory)(nil) - _ plugin.Configurable = (*Factory)(nil) - _ storage.Purger = (*Factory)(nil) + _ storage.Factory = (*Factory)(nil) + _ io.Closer = (*Factory)(nil) + _ plugin.Configurable = (*Factory)(nil) + _ storage.Purger = (*Factory)(nil) ) // Factory implements storage.Factory for Elasticsearch backend. @@ -58,19 +57,24 @@ type Factory struct { newClientFn func(c *config.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) - primaryConfig *config.Configuration - archiveConfig *config.Configuration + config *config.Configuration - primaryClient atomic.Pointer[es.Client] - archiveClient atomic.Pointer[es.Client] + client atomic.Pointer[es.Client] watchers []*fswatcher.FSWatcher } // NewFactory creates a new Factory. -func NewFactory() *Factory { +func NewFactory(isArchive bool) *Factory { + var options *Options + if isArchive { + options = NewOptions(archiveNamespace) + options.Primary.IsArchive = true + } else { + options = NewOptions(primaryNamespace) + } return &Factory{ - Options: NewOptions(primaryNamespace, archiveNamespace), + Options: options, newClientFn: config.NewClient, tracer: otel.GetTracerProvider(), } @@ -88,20 +92,11 @@ func NewFactoryWithConfig( defaultConfig := DefaultConfig() cfg.ApplyDefaults(&defaultConfig) - archive := make(map[string]*namespaceConfig) - archive[archiveNamespace] = &namespaceConfig{ - Configuration: cfg, - namespace: archiveNamespace, + f := &Factory{ + config: &cfg, + newClientFn: config.NewClient, + tracer: otel.GetTracerProvider(), } - - f := NewFactory() - f.configureFromOptions(&Options{ - Primary: namespaceConfig{ - Configuration: cfg, - namespace: primaryNamespace, - }, - others: archive, - }) err := f.Initialize(metricsFactory, logger) if err != nil { return nil, err @@ -123,56 +118,32 @@ func (f *Factory) InitFromViper(v *viper.Viper, _ *zap.Logger) { // configureFromOptions configures factory from Options struct. func (f *Factory) configureFromOptions(o *Options) { f.Options = o - f.primaryConfig = f.Options.GetPrimary() - f.archiveConfig = f.Options.Get(archiveNamespace) + f.config = f.Options.GetPrimary() } // Initialize implements storage.Factory. func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { f.metricsFactory, f.logger = metricsFactory, logger - primaryClient, err := f.newClientFn(f.primaryConfig, logger, metricsFactory) + client, err := f.newClientFn(f.config, logger, metricsFactory) if err != nil { - return fmt.Errorf("failed to create primary Elasticsearch client: %w", err) - } - f.primaryClient.Store(&primaryClient) - - if f.primaryConfig.Authentication.BasicAuthentication.PasswordFilePath != "" { - primaryWatcher, err := fswatcher.New([]string{f.primaryConfig.Authentication.BasicAuthentication.PasswordFilePath}, f.onPrimaryPasswordChange, f.logger) - if err != nil { - return fmt.Errorf("failed to create watcher for primary ES client's password: %w", err) - } - f.watchers = append(f.watchers, primaryWatcher) + return fmt.Errorf("failed to create Elasticsearch client: %w", err) } + f.client.Store(&client) - if f.archiveConfig.Enabled { - archiveClient, err := f.newClientFn(f.archiveConfig, logger, metricsFactory) + if f.config.Authentication.BasicAuthentication.PasswordFilePath != "" { + watcher, err := fswatcher.New([]string{f.config.Authentication.BasicAuthentication.PasswordFilePath}, f.onPasswordChange, f.logger) if err != nil { - return fmt.Errorf("failed to create archive Elasticsearch client: %w", err) - } - f.archiveClient.Store(&archiveClient) - - if f.archiveConfig.Authentication.BasicAuthentication.PasswordFilePath != "" { - archiveWatcher, err := fswatcher.New([]string{f.archiveConfig.Authentication.BasicAuthentication.PasswordFilePath}, f.onArchivePasswordChange, f.logger) - if err != nil { - return fmt.Errorf("failed to create watcher for archive ES client's password: %w", err) - } - f.watchers = append(f.watchers, archiveWatcher) + return fmt.Errorf("failed to create watcher for ES client's password: %w", err) } + f.watchers = append(f.watchers, watcher) } return nil } -func (f *Factory) getPrimaryClient() es.Client { - if c := f.primaryClient.Load(); c != nil { - return *c - } - return nil -} - -func (f *Factory) getArchiveClient() es.Client { - if c := f.archiveClient.Load(); c != nil { +func (f *Factory) getClient() es.Client { + if c := f.client.Load(); c != nil { return *c } return nil @@ -180,39 +151,22 @@ func (f *Factory) getArchiveClient() es.Client { // CreateSpanReader implements storage.Factory func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { - return createSpanReader(f.getPrimaryClient, f.primaryConfig, false, f.metricsFactory, f.logger, f.tracer) + return createSpanReader(f.getClient, f.config, f.metricsFactory, f.logger, f.tracer) } // CreateSpanWriter implements storage.Factory func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { - return createSpanWriter(f.getPrimaryClient, f.primaryConfig, false, f.metricsFactory, f.logger) + return createSpanWriter(f.getClient, f.config, f.metricsFactory, f.logger) } // CreateDependencyReader implements storage.Factory func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { - return createDependencyReader(f.getPrimaryClient, f.primaryConfig, f.logger) -} - -// CreateArchiveSpanReader implements storage.ArchiveFactory -func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) { - if !f.archiveConfig.Enabled { - return nil, nil - } - return createSpanReader(f.getArchiveClient, f.archiveConfig, true, f.metricsFactory, f.logger, f.tracer) -} - -// CreateArchiveSpanWriter implements storage.ArchiveFactory -func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { - if !f.archiveConfig.Enabled { - return nil, nil - } - return createSpanWriter(f.getArchiveClient, f.archiveConfig, true, f.metricsFactory, f.logger) + return createDependencyReader(f.getClient, f.config, f.logger) } func createSpanReader( clientFn func() es.Client, cfg *config.Configuration, - archive bool, mFactory metrics.Factory, logger *zap.Logger, tp trace.TracerProvider, @@ -229,7 +183,7 @@ func createSpanReader( ServiceIndex: cfg.Indices.Services, TagDotReplacement: cfg.Tags.DotReplacement, UseReadWriteAliases: cfg.UseReadWriteAliases, - Archive: archive, + Archive: cfg.IsArchive, RemoteReadClusters: cfg.RemoteReadClusters, Logger: logger, MetricsFactory: mFactory, @@ -240,7 +194,6 @@ func createSpanReader( func createSpanWriter( clientFn func() es.Client, cfg *config.Configuration, - archive bool, mFactory metrics.Factory, logger *zap.Logger, ) (spanstore.Writer, error) { @@ -262,7 +215,7 @@ func createSpanWriter( AllTagsAsFields: cfg.Tags.AllAsFields, TagKeysAsFields: tags, TagDotReplacement: cfg.Tags.DotReplacement, - Archive: archive, + Archive: cfg.IsArchive, UseReadWriteAliases: cfg.UseReadWriteAliases, Logger: logger, MetricsFactory: mFactory, @@ -285,23 +238,23 @@ func createSpanWriter( func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store, error) { params := esSampleStore.Params{ - Client: f.getPrimaryClient, + Client: f.getClient, Logger: f.logger, - IndexPrefix: f.primaryConfig.Indices.IndexPrefix, - IndexDateLayout: f.primaryConfig.Indices.Sampling.DateLayout, - IndexRolloverFrequency: config.RolloverFrequencyAsNegativeDuration(f.primaryConfig.Indices.Sampling.RolloverFrequency), - Lookback: f.primaryConfig.AdaptiveSamplingLookback, - MaxDocCount: f.primaryConfig.MaxDocCount, + IndexPrefix: f.config.Indices.IndexPrefix, + IndexDateLayout: f.config.Indices.Sampling.DateLayout, + IndexRolloverFrequency: config.RolloverFrequencyAsNegativeDuration(f.config.Indices.Sampling.RolloverFrequency), + Lookback: f.config.AdaptiveSamplingLookback, + MaxDocCount: f.config.MaxDocCount, } store := esSampleStore.NewSamplingStore(params) - if f.primaryConfig.CreateIndexTemplates && !f.primaryConfig.UseILM { - mappingBuilder := mappingBuilderFromConfig(f.primaryConfig) + if f.config.CreateIndexTemplates && !f.config.UseILM { + mappingBuilder := mappingBuilderFromConfig(f.config) samplingMapping, err := mappingBuilder.GetSamplingMappings() if err != nil { return nil, err } - if _, err := f.getPrimaryClient().CreateTemplate(params.PrefixedIndexName()).Body(samplingMapping).Do(context.Background()); err != nil { + if _, err := f.getClient().CreateTemplate(params.PrefixedIndexName()).Body(samplingMapping).Do(context.Background()); err != nil { return nil, fmt.Errorf("failed to create template: %w", err) } } @@ -343,20 +296,13 @@ func (f *Factory) Close() error { for _, w := range f.watchers { errs = append(errs, w.Close()) } - errs = append(errs, f.getPrimaryClient().Close()) - if client := f.getArchiveClient(); client != nil { - errs = append(errs, client.Close()) - } + errs = append(errs, f.getClient().Close()) return errors.Join(errs...) } -func (f *Factory) onPrimaryPasswordChange() { - f.onClientPasswordChange(f.primaryConfig, &f.primaryClient) -} - -func (f *Factory) onArchivePasswordChange() { - f.onClientPasswordChange(f.archiveConfig, &f.archiveClient) +func (f *Factory) onPasswordChange() { + f.onClientPasswordChange(f.config, &f.client) } func (f *Factory) onClientPasswordChange(cfg *config.Configuration, client *atomic.Pointer[es.Client]) { @@ -383,7 +329,7 @@ func (f *Factory) onClientPasswordChange(cfg *config.Configuration, client *atom } func (f *Factory) Purge(ctx context.Context) error { - esClient := f.getPrimaryClient() + esClient := f.getClient() _, err := esClient.DeleteIndex("*").Do(ctx) return err } diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index 4da9a6c6f45..ce227036598 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -61,22 +61,13 @@ func (m *mockClientBuilder) NewClient(*escfg.Configuration, *zap.Logger, metrics } func TestElasticsearchFactory(t *testing.T) { - f := NewFactory() + f := NewFactory(false) v, command := config.Viperize(f.AddFlags) command.ParseFlags([]string{}) f.InitFromViper(v, zap.NewNop()) f.newClientFn = (&mockClientBuilder{err: errors.New("made-up error")}).NewClient - require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "failed to create primary Elasticsearch client: made-up error") - - f.archiveConfig.Enabled = true - f.newClientFn = func(c *escfg.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) { - // to test archive storage error, pretend that primary client creation is successful - // but override newClientFn so it fails for the next invocation - f.newClientFn = (&mockClientBuilder{err: errors.New("made-up error2")}).NewClient - return (&mockClientBuilder{}).NewClient(c, logger, metricsFactory) - } - require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "failed to create archive Elasticsearch client: made-up error2") + require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "failed to create Elasticsearch client: made-up error") f.newClientFn = (&mockClientBuilder{}).NewClient require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) @@ -90,12 +81,6 @@ func TestElasticsearchFactory(t *testing.T) { _, err = f.CreateDependencyReader() require.NoError(t, err) - _, err = f.CreateArchiveSpanReader() - require.NoError(t, err) - - _, err = f.CreateArchiveSpanWriter() - require.NoError(t, err) - _, err = f.CreateSamplingStore(1) require.NoError(t, err) @@ -103,13 +88,12 @@ func TestElasticsearchFactory(t *testing.T) { } func TestElasticsearchTagsFileDoNotExist(t *testing.T) { - f := NewFactory() - f.primaryConfig = &escfg.Configuration{ + f := NewFactory(false) + f.config = &escfg.Configuration{ Tags: escfg.TagsAsFields{ File: "fixtures/file-does-not-exist.txt", }, } - f.archiveConfig = &escfg.Configuration{} f.newClientFn = (&mockClientBuilder{}).NewClient require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) defer f.Close() @@ -119,11 +103,10 @@ func TestElasticsearchTagsFileDoNotExist(t *testing.T) { } func TestElasticsearchILMUsedWithoutReadWriteAliases(t *testing.T) { - f := NewFactory() - f.primaryConfig = &escfg.Configuration{ + f := NewFactory(false) + f.config = &escfg.Configuration{ UseILM: true, } - f.archiveConfig = &escfg.Configuration{} f.newClientFn = (&mockClientBuilder{}).NewClient require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) defer f.Close() @@ -194,9 +177,8 @@ func TestTagKeysAsFields(t *testing.T) { } func TestCreateTemplateError(t *testing.T) { - f := NewFactory() - f.primaryConfig = &escfg.Configuration{CreateIndexTemplates: true} - f.archiveConfig = &escfg.Configuration{} + f := NewFactory(false) + f.config = &escfg.Configuration{CreateIndexTemplates: true} f.newClientFn = (&mockClientBuilder{createTemplateError: errors.New("template-error")}).NewClient err := f.Initialize(metrics.NullFactory, zap.NewNop()) require.NoError(t, err) @@ -212,9 +194,8 @@ func TestCreateTemplateError(t *testing.T) { } func TestILMDisableTemplateCreation(t *testing.T) { - f := NewFactory() - f.primaryConfig = &escfg.Configuration{UseILM: true, UseReadWriteAliases: true, CreateIndexTemplates: true} - f.archiveConfig = &escfg.Configuration{} + f := NewFactory(false) + f.config = &escfg.Configuration{UseILM: true, UseReadWriteAliases: true, CreateIndexTemplates: true} f.newClientFn = (&mockClientBuilder{createTemplateError: errors.New("template-error")}).NewClient err := f.Initialize(metrics.NullFactory, zap.NewNop()) defer f.Close() @@ -223,43 +204,13 @@ func TestILMDisableTemplateCreation(t *testing.T) { require.NoError(t, err) // as the createTemplate is not called, CreateSpanWriter should not return an error } -func TestArchiveDisabled(t *testing.T) { - f := NewFactory() - f.archiveConfig = &escfg.Configuration{Enabled: false} - f.newClientFn = (&mockClientBuilder{}).NewClient - w, err := f.CreateArchiveSpanWriter() - assert.Nil(t, w) - require.NoError(t, err) - r, err := f.CreateArchiveSpanReader() - assert.Nil(t, r) - require.NoError(t, err) -} - -func TestArchiveEnabled(t *testing.T) { - f := NewFactory() - f.primaryConfig = &escfg.Configuration{} - f.archiveConfig = &escfg.Configuration{Enabled: true} - f.newClientFn = (&mockClientBuilder{}).NewClient - err := f.Initialize(metrics.NullFactory, zap.NewNop()) - require.NoError(t, err) - defer f.Close() // Ensure resources are cleaned up if initialization is successful - w, err := f.CreateArchiveSpanWriter() - require.NoError(t, err) - assert.NotNil(t, w) - r, err := f.CreateArchiveSpanReader() - require.NoError(t, err) - assert.NotNil(t, r) -} - func TestConfigureFromOptions(t *testing.T) { - f := NewFactory() + f := NewFactory(false) o := &Options{ Primary: namespaceConfig{Configuration: escfg.Configuration{Servers: []string{"server"}}}, - others: map[string]*namespaceConfig{"es-archive": {Configuration: escfg.Configuration{Servers: []string{"server2"}}}}, } f.configureFromOptions(o) - assert.Equal(t, o.GetPrimary(), f.primaryConfig) - assert.Equal(t, o.Get(archiveNamespace), f.archiveConfig) + assert.Equal(t, o.GetPrimary(), f.config) } func TestESStorageFactoryWithConfig(t *testing.T) { @@ -317,19 +268,14 @@ func TestESStorageFactoryWithConfigError(t *testing.T) { LogLevel: "error", } _, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop()) - require.ErrorContains(t, err, "failed to create primary Elasticsearch client") + require.ErrorContains(t, err, "failed to create Elasticsearch client") } func TestPasswordFromFile(t *testing.T) { defer testutils.VerifyGoLeaksOnce(t) t.Run("primary client", func(t *testing.T) { - f := NewFactory() - testPasswordFromFile(t, f, f.getPrimaryClient, f.CreateSpanWriter) - }) - - t.Run("archive client", func(t *testing.T) { - f2 := NewFactory() - testPasswordFromFile(t, f2, f2.getArchiveClient, f2.CreateArchiveSpanWriter) + f := NewFactory(false) + testPasswordFromFile(t, f, f.getClient, f.CreateSpanWriter) }) t.Run("load token error", func(t *testing.T) { @@ -369,21 +315,7 @@ func testPasswordFromFile(t *testing.T, f *Factory, getClient func() es.Client, pwdFile := filepath.Join(t.TempDir(), "pwd") require.NoError(t, os.WriteFile(pwdFile, []byte(pwd1), 0o600)) - f.primaryConfig = &escfg.Configuration{ - Servers: []string{server.URL}, - LogLevel: "debug", - Authentication: escfg.Authentication{ - BasicAuthentication: escfg.BasicAuthentication{ - Username: "user", - PasswordFilePath: pwdFile, - }, - }, - BulkProcessing: escfg.BulkProcessing{ - MaxBytes: -1, // disable bulk; we want immediate flush - }, - } - f.archiveConfig = &escfg.Configuration{ - Enabled: true, + f.config = &escfg.Configuration{ Servers: []string{server.URL}, LogLevel: "debug", Authentication: escfg.Authentication{ @@ -442,8 +374,7 @@ func testPasswordFromFile(t *testing.T, f *Factory, getClient func() es.Client, func TestFactoryESClientsAreNil(t *testing.T) { f := &Factory{} - assert.Nil(t, f.getPrimaryClient()) - assert.Nil(t, f.getArchiveClient()) + assert.Nil(t, f.getClient()) } func TestPasswordFromFileErrors(t *testing.T) { @@ -456,17 +387,8 @@ func TestPasswordFromFileErrors(t *testing.T) { pwdFile := filepath.Join(t.TempDir(), "pwd") require.NoError(t, os.WriteFile(pwdFile, []byte("first password"), 0o600)) - f := NewFactory() - f.primaryConfig = &escfg.Configuration{ - Servers: []string{server.URL}, - LogLevel: "debug", - Authentication: escfg.Authentication{ - BasicAuthentication: escfg.BasicAuthentication{ - PasswordFilePath: pwdFile, - }, - }, - } - f.archiveConfig = &escfg.Configuration{ + f := NewFactory(false) + f.config = &escfg.Configuration{ Servers: []string{server.URL}, LogLevel: "debug", Authentication: escfg.Authentication{ @@ -480,16 +402,10 @@ func TestPasswordFromFileErrors(t *testing.T) { require.NoError(t, f.Initialize(metrics.NullFactory, logger)) defer f.Close() - f.primaryConfig.Servers = []string{} - f.onPrimaryPasswordChange() - assert.Contains(t, buf.String(), "no servers specified") - - f.archiveConfig.Servers = []string{} - buf.Reset() - f.onArchivePasswordChange() + f.config.Servers = []string{} + f.onPasswordChange() assert.Contains(t, buf.String(), "no servers specified") require.NoError(t, os.Remove(pwdFile)) - f.onPrimaryPasswordChange() - f.onArchivePasswordChange() + f.onPasswordChange() } diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index 1c4157f58eb..d9c83fd5232 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -85,8 +85,6 @@ var defaultIndexOptions = config.IndexOptions{ // (e.g. archive) may be underspecified and infer the rest of its parameters from primary. type Options struct { Primary namespaceConfig `mapstructure:",squash"` - - others map[string]*namespaceConfig } type namespaceConfig struct { @@ -95,24 +93,14 @@ type namespaceConfig struct { } // NewOptions creates a new Options struct. -func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options { +func NewOptions(namespace string) *Options { // TODO all default values should be defined via cobra flags defaultConfig := DefaultConfig() options := &Options{ Primary: namespaceConfig{ - Configuration: defaultConfig, - namespace: primaryNamespace, - }, - others: make(map[string]*namespaceConfig, len(otherNamespaces)), - } - - // Other namespaces need to be explicitly enabled. - defaultConfig.Enabled = false - for _, namespace := range otherNamespaces { - options.others[namespace] = &namespaceConfig{ Configuration: defaultConfig, namespace: namespace, - } + }, } return options @@ -127,9 +115,6 @@ func (cfg *namespaceConfig) getTLSFlagsConfig() tlscfg.ClientFlagsConfig { // AddFlags adds flags for Options func (opt *Options) AddFlags(flagSet *flag.FlagSet) { addFlags(flagSet, &opt.Primary) - for _, cfg := range opt.others { - addFlags(flagSet, cfg) - } } func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { @@ -305,9 +290,6 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { // InitFromViper initializes Options with properties from viper func (opt *Options) InitFromViper(v *viper.Viper) { initFromViper(&opt.Primary, v) - for _, cfg := range opt.others { - initFromViper(cfg, v) - } } func initFromViper(cfg *namespaceConfig, v *viper.Viper) { @@ -392,20 +374,6 @@ func (opt *Options) GetPrimary() *config.Configuration { return &opt.Primary.Configuration } -// Get returns auxiliary named configuration. -func (opt *Options) Get(namespace string) *config.Configuration { - nsCfg, ok := opt.others[namespace] - if !ok { - nsCfg = &namespaceConfig{} - opt.others[namespace] = nsCfg - } - nsCfg.Configuration.ApplyDefaults(&opt.Primary.Configuration) - if len(nsCfg.Configuration.Servers) == 0 { - nsCfg.Servers = opt.Primary.Servers - } - return &nsCfg.Configuration -} - // stripWhiteSpace removes all whitespace characters from a string func stripWhiteSpace(str string) string { return strings.ReplaceAll(str, " ", "") diff --git a/plugin/storage/es/options_test.go b/plugin/storage/es/options_test.go index 33d933b5830..ec527850272 100644 --- a/plugin/storage/es/options_test.go +++ b/plugin/storage/es/options_test.go @@ -34,16 +34,10 @@ func TestOptions(t *testing.T) { assert.Equal(t, 72*time.Hour, primary.MaxSpanAge) assert.False(t, primary.Sniffing.Enabled) assert.False(t, primary.Sniffing.UseHTTPS) - - aux := opts.Get("archive") - assert.Equal(t, primary.Authentication.BasicAuthentication.Username, aux.Authentication.BasicAuthentication.Username) - assert.Equal(t, primary.Authentication.BasicAuthentication.Password, aux.Authentication.BasicAuthentication.Password) - assert.Equal(t, primary.Authentication.BasicAuthentication.PasswordFilePath, aux.Authentication.BasicAuthentication.PasswordFilePath) - assert.Equal(t, primary.Servers, aux.Servers) } func TestOptionsWithFlags(t *testing.T) { - opts := NewOptions("es", "es.aux") + opts := NewOptions("es") v, command := config.Viperize(opts.AddFlags) err := command.ParseFlags([]string{ "--es.server-urls=1.1.1.1, 2.2.2.2", @@ -61,11 +55,6 @@ func TestOptionsWithFlags(t *testing.T) { "--es.index-rollover-frequency-services=day", // a couple overrides "--es.remote-read-clusters=cluster_one,cluster_two", - "--es.aux.server-urls=3.3.3.3, 4.4.4.4", - "--es.aux.max-span-age=24h", - "--es.aux.num-replicas=10", - "--es.aux.index-date-separator=.", - "--es.aux.index-rollover-frequency-spans=hour", "--es.tls.enabled=true", "--es.tls.skip-host-verify=true", "--es.tags-as-fields.all=true", @@ -96,32 +85,11 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, "test,tags", primary.Tags.Include) assert.Equal(t, "20060102", primary.Indices.Services.DateLayout) assert.Equal(t, "2006010215", primary.Indices.Spans.DateLayout) - aux := opts.Get("es.aux") - assert.Equal(t, []string{"3.3.3.3", "4.4.4.4"}, aux.Servers) - assert.Equal(t, "hello", aux.Authentication.BasicAuthentication.Username) - assert.Equal(t, "world", aux.Authentication.BasicAuthentication.Password) - assert.EqualValues(t, 5, aux.Indices.Spans.Shards) - assert.EqualValues(t, 5, aux.Indices.Services.Shards) - assert.EqualValues(t, 5, aux.Indices.Sampling.Shards) - assert.EqualValues(t, 5, aux.Indices.Dependencies.Shards) - assert.EqualValues(t, 10, aux.Indices.Spans.Replicas) - assert.EqualValues(t, 10, aux.Indices.Services.Replicas) - assert.EqualValues(t, 10, aux.Indices.Sampling.Replicas) - assert.EqualValues(t, 10, aux.Indices.Dependencies.Replicas) - assert.Equal(t, 24*time.Hour, aux.MaxSpanAge) - assert.True(t, aux.Sniffing.Enabled) - assert.True(t, aux.Tags.AllAsFields) - assert.Equal(t, "@", aux.Tags.DotReplacement) - assert.Equal(t, "./file.txt", aux.Tags.File) - assert.Equal(t, "test,tags", aux.Tags.Include) - assert.Equal(t, "2006.01.02", aux.Indices.Services.DateLayout) - assert.Equal(t, "2006.01.02.15", aux.Indices.Spans.DateLayout) assert.True(t, primary.UseILM) - assert.Equal(t, "POST", aux.SendGetBodyAs) } func TestEmptyRemoteReadClusters(t *testing.T) { - opts := NewOptions("es", "es.aux") + opts := NewOptions("es") v, command := config.Viperize(opts.AddFlags) err := command.ParseFlags([]string{ "--es.remote-read-clusters=", @@ -134,7 +102,7 @@ func TestEmptyRemoteReadClusters(t *testing.T) { } func TestMaxSpanAgeSetErrorInArchiveMode(t *testing.T) { - opts := NewOptions("es", archiveNamespace) + opts := NewOptions("es") _, command := config.Viperize(opts.AddFlags) flags := []string{"--es-archive.max-span-age=24h"} err := command.ParseFlags(flags) @@ -152,7 +120,7 @@ func TestMaxDocCount(t *testing.T) { } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - opts := NewOptions("es", "es.aux") + opts := NewOptions("es") v, command := config.Viperize(opts.AddFlags) command.ParseFlags(tc.flags) opts.InitFromViper(v) diff --git a/plugin/storage/factory.go b/plugin/storage/factory.go index e088bef593f..4bb11fe0a56 100644 --- a/plugin/storage/factory.go +++ b/plugin/storage/factory.go @@ -116,9 +116,9 @@ func NewFactory(config FactoryConfig) (*Factory, error) { func (*Factory) getFactoryOfType(factoryType string) (storage.Factory, error) { switch factoryType { case cassandraStorageType: - return cassandra.NewFactory(), nil + return cassandra.NewFactory(false), nil case elasticsearchStorageType, opensearchStorageType: - return es.NewFactory(), nil + return es.NewFactory(false), nil case memoryStorageType: return memory.NewFactory(), nil case kafkaStorageType: diff --git a/plugin/storage/integration/cassandra_test.go b/plugin/storage/integration/cassandra_test.go index c8f88d01452..5c6fae7be26 100644 --- a/plugin/storage/integration/cassandra_test.go +++ b/plugin/storage/integration/cassandra_test.go @@ -21,7 +21,8 @@ import ( type CassandraStorageIntegration struct { StorageIntegration - factory *cassandra.Factory + factory *cassandra.Factory + archiveFactory *cassandra.Factory } func newCassandraStorageIntegration() *CassandraStorageIntegration { @@ -38,11 +39,16 @@ func newCassandraStorageIntegration() *CassandraStorageIntegration { func (s *CassandraStorageIntegration) cleanUp(t *testing.T) { require.NoError(t, s.factory.Purge(context.Background())) + require.NoError(t, s.archiveFactory.Purge(context.Background())) } -func (*CassandraStorageIntegration) initializeCassandraFactory(t *testing.T, flags []string) *cassandra.Factory { +func (*CassandraStorageIntegration) initializeCassandraFactory( + t *testing.T, + flags []string, + isArchive bool, +) *cassandra.Factory { logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) - f := cassandra.NewFactory() + f := cassandra.NewFactory(isArchive) v, command := config.Viperize(f.AddFlags) require.NoError(t, command.ParseFlags(flags)) f.InitFromViper(v, logger) @@ -58,22 +64,25 @@ func (s *CassandraStorageIntegration) initializeCassandra(t *testing.T) { "--cassandra.password=" + password, "--cassandra.username=" + username, "--cassandra.keyspace=jaeger_v1_dc1", + }, false) + af := s.initializeCassandraFactory(t, []string{ "--cassandra-archive.keyspace=jaeger_v1_dc1_archive", "--cassandra-archive.enabled=true", "--cassandra-archive.servers=127.0.0.1", "--cassandra-archive.basic.allowed-authenticators=org.apache.cassandra.auth.PasswordAuthenticator", "--cassandra-archive.password=" + password, "--cassandra-archive.username=" + username, - }) + }, true) s.factory = f + s.archiveFactory = af var err error s.SpanWriter, err = f.CreateSpanWriter() require.NoError(t, err) s.SpanReader, err = f.CreateSpanReader() require.NoError(t, err) - s.ArchiveSpanReader, err = f.CreateArchiveSpanReader() + s.ArchiveSpanReader, err = af.CreateSpanReader() require.NoError(t, err) - s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter() + s.ArchiveSpanWriter, err = af.CreateSpanWriter() require.NoError(t, err) s.SamplingStore, err = f.CreateSamplingStore(0) require.NoError(t, err) diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 5aae3614262..3c012c0877b 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -50,7 +50,8 @@ type ESStorageIntegration struct { client *elastic.Client v8Client *elasticsearch8.Client - factory *es.Factory + factory *es.Factory + archiveFactory *es.Factory } func (s *ESStorageIntegration) getVersion() (uint, error) { @@ -94,24 +95,31 @@ func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool) func (s *ESStorageIntegration) esCleanUp(t *testing.T) { require.NoError(t, s.factory.Purge(context.Background())) + require.NoError(t, s.archiveFactory.Purge(context.Background())) } -func (*ESStorageIntegration) initializeESFactory(t *testing.T, allTagsAsFields bool) *es.Factory { +func (*ESStorageIntegration) initializeESFactory(t *testing.T, allTagsAsFields bool, isArchive bool) *es.Factory { logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) - f := es.NewFactory() + f := es.NewFactory(isArchive) v, command := config.Viperize(f.AddFlags) - args := []string{ - fmt.Sprintf("--es.num-shards=%v", 5), - fmt.Sprintf("--es.num-replicas=%v", 1), - fmt.Sprintf("--es.index-prefix=%v", indexPrefix), - fmt.Sprintf("--es.use-ilm=%v", false), - fmt.Sprintf("--es.service-cache-ttl=%v", 1*time.Second), - fmt.Sprintf("--es.tags-as-fields.all=%v", allTagsAsFields), - fmt.Sprintf("--es.bulk.actions=%v", 1), - fmt.Sprintf("--es.bulk.flush-interval=%v", time.Nanosecond), - "--es-archive.enabled=true", - fmt.Sprintf("--es-archive.tags-as-fields.all=%v", allTagsAsFields), - fmt.Sprintf("--es-archive.index-prefix=%v", indexPrefix), + var args []string + if isArchive { + args = []string{ + "--es-archive.enabled=true", + fmt.Sprintf("--es-archive.tags-as-fields.all=%v", allTagsAsFields), + fmt.Sprintf("--es-archive.index-prefix=%v", indexPrefix), + } + } else { + args = []string{ + fmt.Sprintf("--es.num-shards=%v", 5), + fmt.Sprintf("--es.num-replicas=%v", 1), + fmt.Sprintf("--es.index-prefix=%v", indexPrefix), + fmt.Sprintf("--es.use-ilm=%v", false), + fmt.Sprintf("--es.service-cache-ttl=%v", 1*time.Second), + fmt.Sprintf("--es.tags-as-fields.all=%v", allTagsAsFields), + fmt.Sprintf("--es.bulk.actions=%v", 1), + fmt.Sprintf("--es.bulk.flush-interval=%v", time.Nanosecond), + } } require.NoError(t, command.ParseFlags(args)) f.InitFromViper(v, logger) @@ -124,16 +132,18 @@ func (*ESStorageIntegration) initializeESFactory(t *testing.T, allTagsAsFields b } func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool) { - f := s.initializeESFactory(t, allTagsAsFields) + f := s.initializeESFactory(t, allTagsAsFields, false) s.factory = f var err error s.SpanWriter, err = f.CreateSpanWriter() require.NoError(t, err) s.SpanReader, err = f.CreateSpanReader() require.NoError(t, err) - s.ArchiveSpanReader, err = f.CreateArchiveSpanReader() + af := s.initializeESFactory(t, allTagsAsFields, true) + s.archiveFactory = af + s.ArchiveSpanReader, err = af.CreateSpanReader() require.NoError(t, err) - s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter() + s.ArchiveSpanWriter, err = af.CreateSpanWriter() require.NoError(t, err) s.DependencyReader, err = f.CreateDependencyReader() diff --git a/plugin/storage/memory/factory.go b/plugin/storage/memory/factory.go index 59214cd5bfa..8e9b90f6bc7 100644 --- a/plugin/storage/memory/factory.go +++ b/plugin/storage/memory/factory.go @@ -23,7 +23,6 @@ import ( var ( // interface comformance checks _ storage.Factory = (*Factory)(nil) - _ storage.ArchiveFactory = (*Factory)(nil) _ storage.SamplingStoreFactory = (*Factory)(nil) _ plugin.Configurable = (*Factory)(nil) _ storage.Purger = (*Factory)(nil) @@ -89,16 +88,6 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { return f.store, nil } -// CreateArchiveSpanReader implements storage.ArchiveFactory -func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) { - return f.store, nil -} - -// CreateArchiveSpanWriter implements storage.ArchiveFactory -func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { - return f.store, nil -} - // CreateDependencyReader implements storage.Factory func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { return f.store, nil