From 92cc3f4d2aa72b0f559c1fb814bd12e8a7b1965e Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 3 Nov 2024 13:21:21 -0500 Subject: [PATCH 01/20] Add IsArchive Flag To ESConfig Signed-off-by: Mahad Zaryab --- pkg/es/config/config.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index 872a1b5834a..acafb07752d 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -136,6 +136,8 @@ type Configuration struct { Tags TagsAsFields `mapstructure:"tags_as_fields"` // Enabled, if set to true, enables the namespace for storage pointed to by this configuration. Enabled bool `mapstructure:"-"` + // TODO: revisit if this needed + IsArchive bool } // TagsAsFields holds configuration for tag schema. From 8a548a1c85d53b968f77b349637f4c67bd1ae130 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 3 Nov 2024 13:22:28 -0500 Subject: [PATCH 02/20] Change Options To Only Hold One Config Signed-off-by: Mahad Zaryab --- plugin/storage/es/options.go | 38 +++--------------------------------- 1 file changed, 3 insertions(+), 35 deletions(-) diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index 1c4157f58eb..39362082ff6 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -85,8 +85,6 @@ var defaultIndexOptions = config.IndexOptions{ // (e.g. archive) may be underspecified and infer the rest of its parameters from primary. type Options struct { Primary namespaceConfig `mapstructure:",squash"` - - others map[string]*namespaceConfig } type namespaceConfig struct { @@ -95,24 +93,14 @@ type namespaceConfig struct { } // NewOptions creates a new Options struct. -func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options { +func NewOptions(namespace string) *Options { // TODO all default values should be defined via cobra flags defaultConfig := DefaultConfig() options := &Options{ Primary: namespaceConfig{ - Configuration: defaultConfig, - namespace: primaryNamespace, - }, - others: make(map[string]*namespaceConfig, len(otherNamespaces)), - } - - // Other namespaces need to be explicitly enabled. - defaultConfig.Enabled = false - for _, namespace := range otherNamespaces { - options.others[namespace] = &namespaceConfig{ Configuration: defaultConfig, namespace: namespace, - } + }, } return options @@ -127,9 +115,6 @@ func (cfg *namespaceConfig) getTLSFlagsConfig() tlscfg.ClientFlagsConfig { // AddFlags adds flags for Options func (opt *Options) AddFlags(flagSet *flag.FlagSet) { addFlags(flagSet, &opt.Primary) - for _, cfg := range opt.others { - addFlags(flagSet, cfg) - } } func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { @@ -286,7 +271,7 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { nsConfig.namespace+suffixAdaptiveSamplingLookback, nsConfig.AdaptiveSamplingLookback, "How far back to look for the latest adaptive sampling probabilities") - if nsConfig.namespace == archiveNamespace { + if nsConfig.namespace == ArchiveNamespace { flagSet.Bool( nsConfig.namespace+suffixEnabled, nsConfig.Enabled, @@ -305,9 +290,6 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { // InitFromViper initializes Options with properties from viper func (opt *Options) InitFromViper(v *viper.Viper) { initFromViper(&opt.Primary, v) - for _, cfg := range opt.others { - initFromViper(cfg, v) - } } func initFromViper(cfg *namespaceConfig, v *viper.Viper) { @@ -392,20 +374,6 @@ func (opt *Options) GetPrimary() *config.Configuration { return &opt.Primary.Configuration } -// Get returns auxiliary named configuration. -func (opt *Options) Get(namespace string) *config.Configuration { - nsCfg, ok := opt.others[namespace] - if !ok { - nsCfg = &namespaceConfig{} - opt.others[namespace] = nsCfg - } - nsCfg.Configuration.ApplyDefaults(&opt.Primary.Configuration) - if len(nsCfg.Configuration.Servers) == 0 { - nsCfg.Servers = opt.Primary.Servers - } - return &nsCfg.Configuration -} - // stripWhiteSpace removes all whitespace characters from a string func stripWhiteSpace(str string) string { return strings.ReplaceAll(str, " ", "") From dd04a13eb7d5af116b8f8b6155d1fbcf7d3bea3b Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 3 Nov 2024 13:23:45 -0500 Subject: [PATCH 03/20] Change ES Factory To Remove Archive Implementation Signed-off-by: Mahad Zaryab --- plugin/storage/es/factory.go | 119 +++++++++-------------------------- 1 file changed, 31 insertions(+), 88 deletions(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index a204ccd6bc4..8b843fc1dee 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -36,16 +36,15 @@ import ( ) const ( - primaryNamespace = "es" - archiveNamespace = "es-archive" + PrimaryNamespace = "es" + ArchiveNamespace = "es-archive" ) var ( // interface comformance checks - _ storage.Factory = (*Factory)(nil) - _ storage.ArchiveFactory = (*Factory)(nil) - _ io.Closer = (*Factory)(nil) - _ plugin.Configurable = (*Factory)(nil) - _ storage.Purger = (*Factory)(nil) + _ storage.Factory = (*Factory)(nil) + _ io.Closer = (*Factory)(nil) + _ plugin.Configurable = (*Factory)(nil) + _ storage.Purger = (*Factory)(nil) ) // Factory implements storage.Factory for Elasticsearch backend. @@ -58,19 +57,17 @@ type Factory struct { newClientFn func(c *config.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) - primaryConfig *config.Configuration - archiveConfig *config.Configuration + config *config.Configuration - primaryClient atomic.Pointer[es.Client] - archiveClient atomic.Pointer[es.Client] + client atomic.Pointer[es.Client] watchers []*fswatcher.FSWatcher } // NewFactory creates a new Factory. -func NewFactory() *Factory { +func NewFactory(namespace string) *Factory { return &Factory{ - Options: NewOptions(primaryNamespace, archiveNamespace), + Options: NewOptions(namespace), newClientFn: config.NewClient, tracer: otel.GetTracerProvider(), } @@ -78,6 +75,7 @@ func NewFactory() *Factory { func NewFactoryWithConfig( cfg config.Configuration, + namespace string, metricsFactory metrics.Factory, logger *zap.Logger, ) (*Factory, error) { @@ -88,19 +86,12 @@ func NewFactoryWithConfig( defaultConfig := DefaultConfig() cfg.ApplyDefaults(&defaultConfig) - archive := make(map[string]*namespaceConfig) - archive[archiveNamespace] = &namespaceConfig{ - Configuration: cfg, - namespace: archiveNamespace, - } - - f := NewFactory() + f := NewFactory(namespace) f.configureFromOptions(&Options{ Primary: namespaceConfig{ Configuration: cfg, - namespace: primaryNamespace, + namespace: namespace, }, - others: archive, }) err := f.Initialize(metricsFactory, logger) if err != nil { @@ -123,56 +114,32 @@ func (f *Factory) InitFromViper(v *viper.Viper, _ *zap.Logger) { // configureFromOptions configures factory from Options struct. func (f *Factory) configureFromOptions(o *Options) { f.Options = o - f.primaryConfig = f.Options.GetPrimary() - f.archiveConfig = f.Options.Get(archiveNamespace) + f.config = f.Options.GetPrimary() } // Initialize implements storage.Factory. func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { f.metricsFactory, f.logger = metricsFactory, logger - primaryClient, err := f.newClientFn(f.primaryConfig, logger, metricsFactory) + primaryClient, err := f.newClientFn(f.config, logger, metricsFactory) if err != nil { return fmt.Errorf("failed to create primary Elasticsearch client: %w", err) } - f.primaryClient.Store(&primaryClient) + f.client.Store(&primaryClient) - if f.primaryConfig.Authentication.BasicAuthentication.PasswordFilePath != "" { - primaryWatcher, err := fswatcher.New([]string{f.primaryConfig.Authentication.BasicAuthentication.PasswordFilePath}, f.onPrimaryPasswordChange, f.logger) + if f.config.Authentication.BasicAuthentication.PasswordFilePath != "" { + primaryWatcher, err := fswatcher.New([]string{f.config.Authentication.BasicAuthentication.PasswordFilePath}, f.onPrimaryPasswordChange, f.logger) if err != nil { return fmt.Errorf("failed to create watcher for primary ES client's password: %w", err) } f.watchers = append(f.watchers, primaryWatcher) } - if f.archiveConfig.Enabled { - archiveClient, err := f.newClientFn(f.archiveConfig, logger, metricsFactory) - if err != nil { - return fmt.Errorf("failed to create archive Elasticsearch client: %w", err) - } - f.archiveClient.Store(&archiveClient) - - if f.archiveConfig.Authentication.BasicAuthentication.PasswordFilePath != "" { - archiveWatcher, err := fswatcher.New([]string{f.archiveConfig.Authentication.BasicAuthentication.PasswordFilePath}, f.onArchivePasswordChange, f.logger) - if err != nil { - return fmt.Errorf("failed to create watcher for archive ES client's password: %w", err) - } - f.watchers = append(f.watchers, archiveWatcher) - } - } - return nil } func (f *Factory) getPrimaryClient() es.Client { - if c := f.primaryClient.Load(); c != nil { - return *c - } - return nil -} - -func (f *Factory) getArchiveClient() es.Client { - if c := f.archiveClient.Load(); c != nil { + if c := f.client.Load(); c != nil { return *c } return nil @@ -180,39 +147,22 @@ func (f *Factory) getArchiveClient() es.Client { // CreateSpanReader implements storage.Factory func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { - return createSpanReader(f.getPrimaryClient, f.primaryConfig, false, f.metricsFactory, f.logger, f.tracer) + return createSpanReader(f.getPrimaryClient, f.config, f.metricsFactory, f.logger, f.tracer) } // CreateSpanWriter implements storage.Factory func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { - return createSpanWriter(f.getPrimaryClient, f.primaryConfig, false, f.metricsFactory, f.logger) + return createSpanWriter(f.getPrimaryClient, f.config, false, f.metricsFactory, f.logger) } // CreateDependencyReader implements storage.Factory func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { - return createDependencyReader(f.getPrimaryClient, f.primaryConfig, f.logger) -} - -// CreateArchiveSpanReader implements storage.ArchiveFactory -func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) { - if !f.archiveConfig.Enabled { - return nil, nil - } - return createSpanReader(f.getArchiveClient, f.archiveConfig, true, f.metricsFactory, f.logger, f.tracer) -} - -// CreateArchiveSpanWriter implements storage.ArchiveFactory -func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { - if !f.archiveConfig.Enabled { - return nil, nil - } - return createSpanWriter(f.getArchiveClient, f.archiveConfig, true, f.metricsFactory, f.logger) + return createDependencyReader(f.getPrimaryClient, f.config, f.logger) } func createSpanReader( clientFn func() es.Client, cfg *config.Configuration, - archive bool, mFactory metrics.Factory, logger *zap.Logger, tp trace.TracerProvider, @@ -229,7 +179,7 @@ func createSpanReader( ServiceIndex: cfg.Indices.Services, TagDotReplacement: cfg.Tags.DotReplacement, UseReadWriteAliases: cfg.UseReadWriteAliases, - Archive: archive, + Archive: cfg.IsArchive, RemoteReadClusters: cfg.RemoteReadClusters, Logger: logger, MetricsFactory: mFactory, @@ -287,16 +237,16 @@ func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store params := esSampleStore.Params{ Client: f.getPrimaryClient, Logger: f.logger, - IndexPrefix: f.primaryConfig.Indices.IndexPrefix, - IndexDateLayout: f.primaryConfig.Indices.Sampling.DateLayout, - IndexRolloverFrequency: config.RolloverFrequencyAsNegativeDuration(f.primaryConfig.Indices.Sampling.RolloverFrequency), - Lookback: f.primaryConfig.AdaptiveSamplingLookback, - MaxDocCount: f.primaryConfig.MaxDocCount, + IndexPrefix: f.config.Indices.IndexPrefix, + IndexDateLayout: f.config.Indices.Sampling.DateLayout, + IndexRolloverFrequency: config.RolloverFrequencyAsNegativeDuration(f.config.Indices.Sampling.RolloverFrequency), + Lookback: f.config.AdaptiveSamplingLookback, + MaxDocCount: f.config.MaxDocCount, } store := esSampleStore.NewSamplingStore(params) - if f.primaryConfig.CreateIndexTemplates && !f.primaryConfig.UseILM { - mappingBuilder := mappingBuilderFromConfig(f.primaryConfig) + if f.config.CreateIndexTemplates && !f.config.UseILM { + mappingBuilder := mappingBuilderFromConfig(f.config) samplingMapping, err := mappingBuilder.GetSamplingMappings() if err != nil { return nil, err @@ -344,19 +294,12 @@ func (f *Factory) Close() error { errs = append(errs, w.Close()) } errs = append(errs, f.getPrimaryClient().Close()) - if client := f.getArchiveClient(); client != nil { - errs = append(errs, client.Close()) - } return errors.Join(errs...) } func (f *Factory) onPrimaryPasswordChange() { - f.onClientPasswordChange(f.primaryConfig, &f.primaryClient) -} - -func (f *Factory) onArchivePasswordChange() { - f.onClientPasswordChange(f.archiveConfig, &f.archiveClient) + f.onClientPasswordChange(f.config, &f.client) } func (f *Factory) onClientPasswordChange(cfg *config.Configuration, client *atomic.Pointer[es.Client]) { From 209b8348a1ac148bdbb49273c6c5d25065ab0173 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 3 Nov 2024 13:25:05 -0500 Subject: [PATCH 04/20] Change Usages of Factory Signed-off-by: Mahad Zaryab --- cmd/jaeger/internal/extension/jaegerstorage/extension.go | 4 ++-- plugin/storage/factory.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/jaeger/internal/extension/jaegerstorage/extension.go b/cmd/jaeger/internal/extension/jaegerstorage/extension.go index d30dd21c8c5..00e284859a0 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/extension.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/extension.go @@ -130,9 +130,9 @@ func (s *storageExt) Start(_ context.Context, host component.Host) error { case cfg.Cassandra != nil: factory, err = cassandra.NewFactoryWithConfig(*cfg.Cassandra, mf, s.telset.Logger) case cfg.Elasticsearch != nil: - factory, err = es.NewFactoryWithConfig(*cfg.Elasticsearch, mf, s.telset.Logger) + factory, err = es.NewFactoryWithConfig(*cfg.Elasticsearch, es.PrimaryNamespace, mf, s.telset.Logger) case cfg.Opensearch != nil: - factory, err = es.NewFactoryWithConfig(*cfg.Opensearch, mf, s.telset.Logger) + factory, err = es.NewFactoryWithConfig(*cfg.Opensearch, es.PrimaryNamespace, mf, s.telset.Logger) } if err != nil { return fmt.Errorf("failed to initialize storage '%s': %w", storageName, err) diff --git a/plugin/storage/factory.go b/plugin/storage/factory.go index e088bef593f..c1618de36a3 100644 --- a/plugin/storage/factory.go +++ b/plugin/storage/factory.go @@ -118,7 +118,7 @@ func (*Factory) getFactoryOfType(factoryType string) (storage.Factory, error) { case cassandraStorageType: return cassandra.NewFactory(), nil case elasticsearchStorageType, opensearchStorageType: - return es.NewFactory(), nil + return es.NewFactory(es.PrimaryNamespace), nil case memoryStorageType: return memory.NewFactory(), nil case kafkaStorageType: From 89c7c7077b02bfd4ae7864d7d2f728f9f8cda4c8 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 3 Nov 2024 13:59:50 -0500 Subject: [PATCH 05/20] Remove Namespace Parameter For NewFactoryWithConfig Signed-off-by: Mahad Zaryab --- plugin/storage/es/factory.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 8b843fc1dee..0ab3a68c02d 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -75,7 +75,6 @@ func NewFactory(namespace string) *Factory { func NewFactoryWithConfig( cfg config.Configuration, - namespace string, metricsFactory metrics.Factory, logger *zap.Logger, ) (*Factory, error) { @@ -86,13 +85,11 @@ func NewFactoryWithConfig( defaultConfig := DefaultConfig() cfg.ApplyDefaults(&defaultConfig) - f := NewFactory(namespace) - f.configureFromOptions(&Options{ - Primary: namespaceConfig{ - Configuration: cfg, - namespace: namespace, - }, - }) + f := &Factory{ + config: &cfg, + newClientFn: config.NewClient, + tracer: otel.GetTracerProvider(), + } err := f.Initialize(metricsFactory, logger) if err != nil { return nil, err From c6b6274f69a7400f959c9a9a4d2860617a6e0824 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 3 Nov 2024 15:07:54 -0500 Subject: [PATCH 06/20] Fix Build Errors And Naming Signed-off-by: Mahad Zaryab --- .../extension/jaegerstorage/extension.go | 4 +- cmd/query/app/token_propagation_test.go | 2 +- plugin/storage/es/factory.go | 30 ++--- plugin/storage/es/factory_test.go | 124 +++--------------- plugin/storage/es/options_test.go | 40 +----- .../storage/integration/elasticsearch_test.go | 6 +- 6 files changed, 43 insertions(+), 163 deletions(-) diff --git a/cmd/jaeger/internal/extension/jaegerstorage/extension.go b/cmd/jaeger/internal/extension/jaegerstorage/extension.go index 00e284859a0..d30dd21c8c5 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/extension.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/extension.go @@ -130,9 +130,9 @@ func (s *storageExt) Start(_ context.Context, host component.Host) error { case cfg.Cassandra != nil: factory, err = cassandra.NewFactoryWithConfig(*cfg.Cassandra, mf, s.telset.Logger) case cfg.Elasticsearch != nil: - factory, err = es.NewFactoryWithConfig(*cfg.Elasticsearch, es.PrimaryNamespace, mf, s.telset.Logger) + factory, err = es.NewFactoryWithConfig(*cfg.Elasticsearch, mf, s.telset.Logger) case cfg.Opensearch != nil: - factory, err = es.NewFactoryWithConfig(*cfg.Opensearch, es.PrimaryNamespace, mf, s.telset.Logger) + factory, err = es.NewFactoryWithConfig(*cfg.Opensearch, mf, s.telset.Logger) } if err != nil { return fmt.Errorf("failed to initialize storage '%s': %w", storageName, err) diff --git a/cmd/query/app/token_propagation_test.go b/cmd/query/app/token_propagation_test.go index 81ec248e4c6..4ab90d32cd8 100644 --- a/cmd/query/app/token_propagation_test.go +++ b/cmd/query/app/token_propagation_test.go @@ -68,7 +68,7 @@ func runQueryService(t *testing.T, esURL string) *Server { flagsSvc := flags.NewService(ports.QueryAdminHTTP) flagsSvc.Logger = zaptest.NewLogger(t) - f := es.NewFactory() + f := es.NewFactory(es.PrimaryNamespace) v, command := config.Viperize(f.AddFlags) require.NoError(t, command.ParseFlags([]string{ "--es.tls.enabled=false", diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 0ab3a68c02d..91c0ad07b77 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -118,24 +118,24 @@ func (f *Factory) configureFromOptions(o *Options) { func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { f.metricsFactory, f.logger = metricsFactory, logger - primaryClient, err := f.newClientFn(f.config, logger, metricsFactory) + client, err := f.newClientFn(f.config, logger, metricsFactory) if err != nil { - return fmt.Errorf("failed to create primary Elasticsearch client: %w", err) + return fmt.Errorf("failed to create Elasticsearch client: %w", err) } - f.client.Store(&primaryClient) + f.client.Store(&client) if f.config.Authentication.BasicAuthentication.PasswordFilePath != "" { - primaryWatcher, err := fswatcher.New([]string{f.config.Authentication.BasicAuthentication.PasswordFilePath}, f.onPrimaryPasswordChange, f.logger) + watcher, err := fswatcher.New([]string{f.config.Authentication.BasicAuthentication.PasswordFilePath}, f.onPasswordChange, f.logger) if err != nil { - return fmt.Errorf("failed to create watcher for primary ES client's password: %w", err) + return fmt.Errorf("failed to create watcher for ES client's password: %w", err) } - f.watchers = append(f.watchers, primaryWatcher) + f.watchers = append(f.watchers, watcher) } return nil } -func (f *Factory) getPrimaryClient() es.Client { +func (f *Factory) getClient() es.Client { if c := f.client.Load(); c != nil { return *c } @@ -144,17 +144,17 @@ func (f *Factory) getPrimaryClient() es.Client { // CreateSpanReader implements storage.Factory func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { - return createSpanReader(f.getPrimaryClient, f.config, f.metricsFactory, f.logger, f.tracer) + return createSpanReader(f.getClient, f.config, f.metricsFactory, f.logger, f.tracer) } // CreateSpanWriter implements storage.Factory func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { - return createSpanWriter(f.getPrimaryClient, f.config, false, f.metricsFactory, f.logger) + return createSpanWriter(f.getClient, f.config, false, f.metricsFactory, f.logger) } // CreateDependencyReader implements storage.Factory func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { - return createDependencyReader(f.getPrimaryClient, f.config, f.logger) + return createDependencyReader(f.getClient, f.config, f.logger) } func createSpanReader( @@ -232,7 +232,7 @@ func createSpanWriter( func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store, error) { params := esSampleStore.Params{ - Client: f.getPrimaryClient, + Client: f.getClient, Logger: f.logger, IndexPrefix: f.config.Indices.IndexPrefix, IndexDateLayout: f.config.Indices.Sampling.DateLayout, @@ -248,7 +248,7 @@ func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store if err != nil { return nil, err } - if _, err := f.getPrimaryClient().CreateTemplate(params.PrefixedIndexName()).Body(samplingMapping).Do(context.Background()); err != nil { + if _, err := f.getClient().CreateTemplate(params.PrefixedIndexName()).Body(samplingMapping).Do(context.Background()); err != nil { return nil, fmt.Errorf("failed to create template: %w", err) } } @@ -290,12 +290,12 @@ func (f *Factory) Close() error { for _, w := range f.watchers { errs = append(errs, w.Close()) } - errs = append(errs, f.getPrimaryClient().Close()) + errs = append(errs, f.getClient().Close()) return errors.Join(errs...) } -func (f *Factory) onPrimaryPasswordChange() { +func (f *Factory) onPasswordChange() { f.onClientPasswordChange(f.config, &f.client) } @@ -323,7 +323,7 @@ func (f *Factory) onClientPasswordChange(cfg *config.Configuration, client *atom } func (f *Factory) Purge(ctx context.Context) error { - esClient := f.getPrimaryClient() + esClient := f.getClient() _, err := esClient.DeleteIndex("*").Do(ctx) return err } diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index e62c8d39157..62c2a41ef85 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -61,7 +61,7 @@ func (m *mockClientBuilder) NewClient(*escfg.Configuration, *zap.Logger, metrics } func TestElasticsearchFactory(t *testing.T) { - f := NewFactory() + f := NewFactory(PrimaryNamespace) v, command := config.Viperize(f.AddFlags) command.ParseFlags([]string{}) f.InitFromViper(v, zap.NewNop()) @@ -69,15 +69,6 @@ func TestElasticsearchFactory(t *testing.T) { f.newClientFn = (&mockClientBuilder{err: errors.New("made-up error")}).NewClient require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "failed to create primary Elasticsearch client: made-up error") - f.archiveConfig.Enabled = true - f.newClientFn = func(c *escfg.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) { - // to test archive storage error, pretend that primary client creation is successful - // but override newClientFn so it fails for the next invocation - f.newClientFn = (&mockClientBuilder{err: errors.New("made-up error2")}).NewClient - return (&mockClientBuilder{}).NewClient(c, logger, metricsFactory) - } - require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "failed to create archive Elasticsearch client: made-up error2") - f.newClientFn = (&mockClientBuilder{}).NewClient require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) @@ -90,12 +81,6 @@ func TestElasticsearchFactory(t *testing.T) { _, err = f.CreateDependencyReader() require.NoError(t, err) - _, err = f.CreateArchiveSpanReader() - require.NoError(t, err) - - _, err = f.CreateArchiveSpanWriter() - require.NoError(t, err) - _, err = f.CreateSamplingStore(1) require.NoError(t, err) @@ -103,13 +88,12 @@ func TestElasticsearchFactory(t *testing.T) { } func TestElasticsearchTagsFileDoNotExist(t *testing.T) { - f := NewFactory() - f.primaryConfig = &escfg.Configuration{ + f := NewFactory(PrimaryNamespace) + f.config = &escfg.Configuration{ Tags: escfg.TagsAsFields{ File: "fixtures/file-does-not-exist.txt", }, } - f.archiveConfig = &escfg.Configuration{} f.newClientFn = (&mockClientBuilder{}).NewClient require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) defer f.Close() @@ -119,11 +103,10 @@ func TestElasticsearchTagsFileDoNotExist(t *testing.T) { } func TestElasticsearchILMUsedWithoutReadWriteAliases(t *testing.T) { - f := NewFactory() - f.primaryConfig = &escfg.Configuration{ + f := NewFactory(ArchiveNamespace) + f.config = &escfg.Configuration{ UseILM: true, } - f.archiveConfig = &escfg.Configuration{} f.newClientFn = (&mockClientBuilder{}).NewClient require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) defer f.Close() @@ -194,9 +177,8 @@ func TestTagKeysAsFields(t *testing.T) { } func TestCreateTemplateError(t *testing.T) { - f := NewFactory() - f.primaryConfig = &escfg.Configuration{CreateIndexTemplates: true} - f.archiveConfig = &escfg.Configuration{} + f := NewFactory(PrimaryNamespace) + f.config = &escfg.Configuration{CreateIndexTemplates: true} f.newClientFn = (&mockClientBuilder{createTemplateError: errors.New("template-error")}).NewClient err := f.Initialize(metrics.NullFactory, zap.NewNop()) require.NoError(t, err) @@ -212,9 +194,8 @@ func TestCreateTemplateError(t *testing.T) { } func TestILMDisableTemplateCreation(t *testing.T) { - f := NewFactory() - f.primaryConfig = &escfg.Configuration{UseILM: true, UseReadWriteAliases: true, CreateIndexTemplates: true} - f.archiveConfig = &escfg.Configuration{} + f := NewFactory(PrimaryNamespace) + f.config = &escfg.Configuration{UseILM: true, UseReadWriteAliases: true, CreateIndexTemplates: true} f.newClientFn = (&mockClientBuilder{createTemplateError: errors.New("template-error")}).NewClient err := f.Initialize(metrics.NullFactory, zap.NewNop()) defer f.Close() @@ -223,43 +204,13 @@ func TestILMDisableTemplateCreation(t *testing.T) { require.NoError(t, err) // as the createTemplate is not called, CreateSpanWriter should not return an error } -func TestArchiveDisabled(t *testing.T) { - f := NewFactory() - f.archiveConfig = &escfg.Configuration{Enabled: false} - f.newClientFn = (&mockClientBuilder{}).NewClient - w, err := f.CreateArchiveSpanWriter() - assert.Nil(t, w) - require.NoError(t, err) - r, err := f.CreateArchiveSpanReader() - assert.Nil(t, r) - require.NoError(t, err) -} - -func TestArchiveEnabled(t *testing.T) { - f := NewFactory() - f.primaryConfig = &escfg.Configuration{} - f.archiveConfig = &escfg.Configuration{Enabled: true} - f.newClientFn = (&mockClientBuilder{}).NewClient - err := f.Initialize(metrics.NullFactory, zap.NewNop()) - require.NoError(t, err) - defer f.Close() // Ensure resources are cleaned up if initialization is successful - w, err := f.CreateArchiveSpanWriter() - require.NoError(t, err) - assert.NotNil(t, w) - r, err := f.CreateArchiveSpanReader() - require.NoError(t, err) - assert.NotNil(t, r) -} - func TestConfigureFromOptions(t *testing.T) { - f := NewFactory() + f := NewFactory(ArchiveNamespace) o := &Options{ Primary: namespaceConfig{Configuration: escfg.Configuration{Servers: []string{"server"}}}, - others: map[string]*namespaceConfig{"es-archive": {Configuration: escfg.Configuration{Servers: []string{"server2"}}}}, } f.configureFromOptions(o) - assert.Equal(t, o.GetPrimary(), f.primaryConfig) - assert.Equal(t, o.Get(archiveNamespace), f.archiveConfig) + assert.Equal(t, o.GetPrimary(), f.config) } func TestESStorageFactoryWithConfig(t *testing.T) { @@ -324,13 +275,8 @@ func TestESStorageFactoryWithConfigError(t *testing.T) { func TestPasswordFromFile(t *testing.T) { defer testutils.VerifyGoLeaksOnce(t) t.Run("primary client", func(t *testing.T) { - f := NewFactory() - testPasswordFromFile(t, f, f.getPrimaryClient, f.CreateSpanWriter) - }) - - t.Run("archive client", func(t *testing.T) { - f2 := NewFactory() - testPasswordFromFile(t, f2, f2.getArchiveClient, f2.CreateArchiveSpanWriter) + f := NewFactory(PrimaryNamespace) + testPasswordFromFile(t, f, f.getClient, f.CreateSpanWriter) }) t.Run("load token error", func(t *testing.T) { @@ -370,21 +316,7 @@ func testPasswordFromFile(t *testing.T, f *Factory, getClient func() es.Client, pwdFile := filepath.Join(t.TempDir(), "pwd") require.NoError(t, os.WriteFile(pwdFile, []byte(pwd1), 0o600)) - f.primaryConfig = &escfg.Configuration{ - Servers: []string{server.URL}, - LogLevel: "debug", - Authentication: escfg.Authentication{ - BasicAuthentication: escfg.BasicAuthentication{ - Username: "user", - PasswordFilePath: pwdFile, - }, - }, - BulkProcessing: escfg.BulkProcessing{ - MaxBytes: -1, // disable bulk; we want immediate flush - }, - } - f.archiveConfig = &escfg.Configuration{ - Enabled: true, + f.config = &escfg.Configuration{ Servers: []string{server.URL}, LogLevel: "debug", Authentication: escfg.Authentication{ @@ -443,8 +375,7 @@ func testPasswordFromFile(t *testing.T, f *Factory, getClient func() es.Client, func TestFactoryESClientsAreNil(t *testing.T) { f := &Factory{} - assert.Nil(t, f.getPrimaryClient()) - assert.Nil(t, f.getArchiveClient()) + assert.Nil(t, f.getClient()) } func TestPasswordFromFileErrors(t *testing.T) { @@ -457,17 +388,8 @@ func TestPasswordFromFileErrors(t *testing.T) { pwdFile := filepath.Join(t.TempDir(), "pwd") require.NoError(t, os.WriteFile(pwdFile, []byte("first password"), 0o600)) - f := NewFactory() - f.primaryConfig = &escfg.Configuration{ - Servers: []string{server.URL}, - LogLevel: "debug", - Authentication: escfg.Authentication{ - BasicAuthentication: escfg.BasicAuthentication{ - PasswordFilePath: pwdFile, - }, - }, - } - f.archiveConfig = &escfg.Configuration{ + f := NewFactory(PrimaryNamespace) + f.config = &escfg.Configuration{ Servers: []string{server.URL}, LogLevel: "debug", Authentication: escfg.Authentication{ @@ -481,16 +403,10 @@ func TestPasswordFromFileErrors(t *testing.T) { require.NoError(t, f.Initialize(metrics.NullFactory, logger)) defer f.Close() - f.primaryConfig.Servers = []string{} - f.onPrimaryPasswordChange() - assert.Contains(t, buf.String(), "no servers specified") - - f.archiveConfig.Servers = []string{} - buf.Reset() - f.onArchivePasswordChange() + f.config.Servers = []string{} + f.onPasswordChange() assert.Contains(t, buf.String(), "no servers specified") require.NoError(t, os.Remove(pwdFile)) - f.onPrimaryPasswordChange() - f.onArchivePasswordChange() + f.onPasswordChange() } diff --git a/plugin/storage/es/options_test.go b/plugin/storage/es/options_test.go index 33d933b5830..ec527850272 100644 --- a/plugin/storage/es/options_test.go +++ b/plugin/storage/es/options_test.go @@ -34,16 +34,10 @@ func TestOptions(t *testing.T) { assert.Equal(t, 72*time.Hour, primary.MaxSpanAge) assert.False(t, primary.Sniffing.Enabled) assert.False(t, primary.Sniffing.UseHTTPS) - - aux := opts.Get("archive") - assert.Equal(t, primary.Authentication.BasicAuthentication.Username, aux.Authentication.BasicAuthentication.Username) - assert.Equal(t, primary.Authentication.BasicAuthentication.Password, aux.Authentication.BasicAuthentication.Password) - assert.Equal(t, primary.Authentication.BasicAuthentication.PasswordFilePath, aux.Authentication.BasicAuthentication.PasswordFilePath) - assert.Equal(t, primary.Servers, aux.Servers) } func TestOptionsWithFlags(t *testing.T) { - opts := NewOptions("es", "es.aux") + opts := NewOptions("es") v, command := config.Viperize(opts.AddFlags) err := command.ParseFlags([]string{ "--es.server-urls=1.1.1.1, 2.2.2.2", @@ -61,11 +55,6 @@ func TestOptionsWithFlags(t *testing.T) { "--es.index-rollover-frequency-services=day", // a couple overrides "--es.remote-read-clusters=cluster_one,cluster_two", - "--es.aux.server-urls=3.3.3.3, 4.4.4.4", - "--es.aux.max-span-age=24h", - "--es.aux.num-replicas=10", - "--es.aux.index-date-separator=.", - "--es.aux.index-rollover-frequency-spans=hour", "--es.tls.enabled=true", "--es.tls.skip-host-verify=true", "--es.tags-as-fields.all=true", @@ -96,32 +85,11 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, "test,tags", primary.Tags.Include) assert.Equal(t, "20060102", primary.Indices.Services.DateLayout) assert.Equal(t, "2006010215", primary.Indices.Spans.DateLayout) - aux := opts.Get("es.aux") - assert.Equal(t, []string{"3.3.3.3", "4.4.4.4"}, aux.Servers) - assert.Equal(t, "hello", aux.Authentication.BasicAuthentication.Username) - assert.Equal(t, "world", aux.Authentication.BasicAuthentication.Password) - assert.EqualValues(t, 5, aux.Indices.Spans.Shards) - assert.EqualValues(t, 5, aux.Indices.Services.Shards) - assert.EqualValues(t, 5, aux.Indices.Sampling.Shards) - assert.EqualValues(t, 5, aux.Indices.Dependencies.Shards) - assert.EqualValues(t, 10, aux.Indices.Spans.Replicas) - assert.EqualValues(t, 10, aux.Indices.Services.Replicas) - assert.EqualValues(t, 10, aux.Indices.Sampling.Replicas) - assert.EqualValues(t, 10, aux.Indices.Dependencies.Replicas) - assert.Equal(t, 24*time.Hour, aux.MaxSpanAge) - assert.True(t, aux.Sniffing.Enabled) - assert.True(t, aux.Tags.AllAsFields) - assert.Equal(t, "@", aux.Tags.DotReplacement) - assert.Equal(t, "./file.txt", aux.Tags.File) - assert.Equal(t, "test,tags", aux.Tags.Include) - assert.Equal(t, "2006.01.02", aux.Indices.Services.DateLayout) - assert.Equal(t, "2006.01.02.15", aux.Indices.Spans.DateLayout) assert.True(t, primary.UseILM) - assert.Equal(t, "POST", aux.SendGetBodyAs) } func TestEmptyRemoteReadClusters(t *testing.T) { - opts := NewOptions("es", "es.aux") + opts := NewOptions("es") v, command := config.Viperize(opts.AddFlags) err := command.ParseFlags([]string{ "--es.remote-read-clusters=", @@ -134,7 +102,7 @@ func TestEmptyRemoteReadClusters(t *testing.T) { } func TestMaxSpanAgeSetErrorInArchiveMode(t *testing.T) { - opts := NewOptions("es", archiveNamespace) + opts := NewOptions("es") _, command := config.Viperize(opts.AddFlags) flags := []string{"--es-archive.max-span-age=24h"} err := command.ParseFlags(flags) @@ -152,7 +120,7 @@ func TestMaxDocCount(t *testing.T) { } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - opts := NewOptions("es", "es.aux") + opts := NewOptions("es") v, command := config.Viperize(opts.AddFlags) command.ParseFlags(tc.flags) opts.InitFromViper(v) diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 57b0d8a1ed8..8cfe053445a 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -98,7 +98,7 @@ func (s *ESStorageIntegration) esCleanUp(t *testing.T) { func (*ESStorageIntegration) initializeESFactory(t *testing.T, allTagsAsFields bool) *es.Factory { logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) - f := es.NewFactory() + f := es.NewFactory(es.PrimaryNamespace) v, command := config.Viperize(f.AddFlags) args := []string{ fmt.Sprintf("--es.num-shards=%v", 5), @@ -131,10 +131,6 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool) require.NoError(t, err) s.SpanReader, err = f.CreateSpanReader() require.NoError(t, err) - s.ArchiveSpanReader, err = f.CreateArchiveSpanReader() - require.NoError(t, err) - s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter() - require.NoError(t, err) s.DependencyReader, err = f.CreateDependencyReader() require.NoError(t, err) From 92e80700e386283ce580a512210aded0e89e8740 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 3 Nov 2024 15:17:58 -0500 Subject: [PATCH 07/20] Remove Cast To Archive Factory Signed-off-by: Mahad Zaryab --- cmd/query/app/querysvc/query_service.go | 17 ++--------------- cmd/query/app/querysvc/query_service_test.go | 16 +++------------- 2 files changed, 5 insertions(+), 28 deletions(-) diff --git a/cmd/query/app/querysvc/query_service.go b/cmd/query/app/querysvc/query_service.go index 135ecc60bbe..54bb09a25f4 100644 --- a/cmd/query/app/querysvc/query_service.go +++ b/cmd/query/app/querysvc/query_service.go @@ -128,25 +128,12 @@ func (qs QueryService) GetCapabilities() StorageCapabilities { // InitArchiveStorage tries to initialize archive storage reader/writer if storage factory supports them. func (opts *QueryServiceOptions) InitArchiveStorage(storageFactory storage.Factory, logger *zap.Logger) bool { - archiveFactory, ok := storageFactory.(storage.ArchiveFactory) - if !ok { - logger.Info("Archive storage not supported by the factory") - return false - } - reader, err := archiveFactory.CreateArchiveSpanReader() - if errors.Is(err, storage.ErrArchiveStorageNotConfigured) || errors.Is(err, storage.ErrArchiveStorageNotSupported) { - logger.Info("Archive storage not created", zap.String("reason", err.Error())) - return false - } + reader, err := storageFactory.CreateSpanReader() if err != nil { logger.Error("Cannot init archive storage reader", zap.Error(err)) return false } - writer, err := archiveFactory.CreateArchiveSpanWriter() - if errors.Is(err, storage.ErrArchiveStorageNotConfigured) || errors.Is(err, storage.ErrArchiveStorageNotSupported) { - logger.Info("Archive storage not created", zap.String("reason", err.Error())) - return false - } + writer, err := storageFactory.CreateSpanWriter() if err != nil { logger.Error("Cannot init archive storage writer", zap.Error(err)) return false diff --git a/cmd/query/app/querysvc/query_service_test.go b/cmd/query/app/querysvc/query_service_test.go index 582a4568509..8fd11651131 100644 --- a/cmd/query/app/querysvc/query_service_test.go +++ b/cmd/query/app/querysvc/query_service_test.go @@ -315,31 +315,21 @@ func (*fakeStorageFactory1) CreateSpanReader() (spanstore.Reader, error) func (*fakeStorageFactory1) CreateSpanWriter() (spanstore.Writer, error) { return nil, nil } func (*fakeStorageFactory1) CreateDependencyReader() (dependencystore.Reader, error) { return nil, nil } -func (f *fakeStorageFactory2) CreateArchiveSpanReader() (spanstore.Reader, error) { return f.r, f.rErr } -func (f *fakeStorageFactory2) CreateArchiveSpanWriter() (spanstore.Writer, error) { return f.w, f.wErr } +func (f *fakeStorageFactory2) CreateSpanReader() (spanstore.Reader, error) { return f.r, f.rErr } +func (f *fakeStorageFactory2) CreateSpanWriter() (spanstore.Writer, error) { return f.w, f.wErr } var ( - _ storage.Factory = new(fakeStorageFactory1) - _ storage.ArchiveFactory = new(fakeStorageFactory2) + _ storage.Factory = new(fakeStorageFactory1) ) func TestInitArchiveStorageErrors(t *testing.T) { opts := &QueryServiceOptions{} logger := zap.NewNop() - assert.False(t, opts.InitArchiveStorage(new(fakeStorageFactory1), logger)) - assert.False(t, opts.InitArchiveStorage( - &fakeStorageFactory2{rErr: storage.ErrArchiveStorageNotConfigured}, - logger, - )) assert.False(t, opts.InitArchiveStorage( &fakeStorageFactory2{rErr: errors.New("error")}, logger, )) - assert.False(t, opts.InitArchiveStorage( - &fakeStorageFactory2{wErr: storage.ErrArchiveStorageNotConfigured}, - logger, - )) assert.False(t, opts.InitArchiveStorage( &fakeStorageFactory2{wErr: errors.New("error")}, logger, From 535aa2ca2b577a48855c2601c92f575cd6c5cc94 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 3 Nov 2024 15:18:11 -0500 Subject: [PATCH 08/20] Fix Unit Test Signed-off-by: Mahad Zaryab --- plugin/storage/es/factory_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index 62c2a41ef85..429bfa0417e 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -67,7 +67,7 @@ func TestElasticsearchFactory(t *testing.T) { f.InitFromViper(v, zap.NewNop()) f.newClientFn = (&mockClientBuilder{err: errors.New("made-up error")}).NewClient - require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "failed to create primary Elasticsearch client: made-up error") + require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "failed to create Elasticsearch client: made-up error") f.newClientFn = (&mockClientBuilder{}).NewClient require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) @@ -269,7 +269,7 @@ func TestESStorageFactoryWithConfigError(t *testing.T) { } _, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop()) require.Error(t, err) - require.ErrorContains(t, err, "failed to create primary Elasticsearch client") + require.ErrorContains(t, err, "failed to create Elasticsearch client") } func TestPasswordFromFile(t *testing.T) { From 76a3528f042b2a95a503edb6a78a9158ffd44193 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 3 Nov 2024 15:22:26 -0500 Subject: [PATCH 09/20] Remove Archive Reader And Writers From Blackhole And Memory Storage Signed-off-by: Mahad Zaryab --- plugin/storage/blackhole/factory.go | 13 +------------ plugin/storage/blackhole/factory_test.go | 6 ------ plugin/storage/memory/factory.go | 11 ----------- 3 files changed, 1 insertion(+), 29 deletions(-) diff --git a/plugin/storage/blackhole/factory.go b/plugin/storage/blackhole/factory.go index ad149593633..521b20e41cf 100644 --- a/plugin/storage/blackhole/factory.go +++ b/plugin/storage/blackhole/factory.go @@ -14,8 +14,7 @@ import ( ) var ( // interface comformance checks - _ storage.Factory = (*Factory)(nil) - _ storage.ArchiveFactory = (*Factory)(nil) + _ storage.Factory = (*Factory)(nil) ) // Factory implements storage.Factory and creates blackhole storage components. @@ -48,16 +47,6 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { return f.store, nil } -// CreateArchiveSpanReader implements storage.ArchiveFactory -func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) { - return f.store, nil -} - -// CreateArchiveSpanWriter implements storage.ArchiveFactory -func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { - return f.store, nil -} - // CreateDependencyReader implements storage.Factory func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { return f.store, nil diff --git a/plugin/storage/blackhole/factory_test.go b/plugin/storage/blackhole/factory_test.go index 0ef1a783176..134e44047c1 100644 --- a/plugin/storage/blackhole/factory_test.go +++ b/plugin/storage/blackhole/factory_test.go @@ -27,12 +27,6 @@ func TestStorageFactory(t *testing.T) { writer, err := f.CreateSpanWriter() require.NoError(t, err) assert.Equal(t, f.store, writer) - reader, err = f.CreateArchiveSpanReader() - require.NoError(t, err) - assert.Equal(t, f.store, reader) - writer, err = f.CreateArchiveSpanWriter() - require.NoError(t, err) - assert.Equal(t, f.store, writer) depReader, err := f.CreateDependencyReader() require.NoError(t, err) assert.Equal(t, f.store, depReader) diff --git a/plugin/storage/memory/factory.go b/plugin/storage/memory/factory.go index 59214cd5bfa..8e9b90f6bc7 100644 --- a/plugin/storage/memory/factory.go +++ b/plugin/storage/memory/factory.go @@ -23,7 +23,6 @@ import ( var ( // interface comformance checks _ storage.Factory = (*Factory)(nil) - _ storage.ArchiveFactory = (*Factory)(nil) _ storage.SamplingStoreFactory = (*Factory)(nil) _ plugin.Configurable = (*Factory)(nil) _ storage.Purger = (*Factory)(nil) @@ -89,16 +88,6 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { return f.store, nil } -// CreateArchiveSpanReader implements storage.ArchiveFactory -func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) { - return f.store, nil -} - -// CreateArchiveSpanWriter implements storage.ArchiveFactory -func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { - return f.store, nil -} - // CreateDependencyReader implements storage.Factory func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { return f.store, nil From 50795b08d86368c2e49a0dea5691134459ad72ce Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 3 Nov 2024 15:25:55 -0500 Subject: [PATCH 10/20] Fix Lint Checks Signed-off-by: Mahad Zaryab --- cmd/query/app/querysvc/query_service_test.go | 1 + plugin/storage/blackhole/factory.go | 5 ++--- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/query/app/querysvc/query_service_test.go b/cmd/query/app/querysvc/query_service_test.go index 8fd11651131..86e1d220ca4 100644 --- a/cmd/query/app/querysvc/query_service_test.go +++ b/cmd/query/app/querysvc/query_service_test.go @@ -320,6 +320,7 @@ func (f *fakeStorageFactory2) CreateSpanWriter() (spanstore.Writer, error) { ret var ( _ storage.Factory = new(fakeStorageFactory1) + _ storage.Factory = new(fakeStorageFactory2) ) func TestInitArchiveStorageErrors(t *testing.T) { diff --git a/plugin/storage/blackhole/factory.go b/plugin/storage/blackhole/factory.go index 521b20e41cf..7cbdd4ae2eb 100644 --- a/plugin/storage/blackhole/factory.go +++ b/plugin/storage/blackhole/factory.go @@ -13,9 +13,8 @@ import ( "github.com/jaegertracing/jaeger/storage/spanstore" ) -var ( // interface comformance checks - _ storage.Factory = (*Factory)(nil) -) +// interface comformance checks +var _ storage.Factory = (*Factory)(nil) // Factory implements storage.Factory and creates blackhole storage components. type Factory struct { From a4d41b9b811f074d4a840bb95f2819ab1294162c Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 3 Nov 2024 15:30:38 -0500 Subject: [PATCH 11/20] Fix Unit Test Signed-off-by: Mahad Zaryab --- .../internal/extension/jaegerquery/server_test.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/cmd/jaeger/internal/extension/jaegerquery/server_test.go b/cmd/jaeger/internal/extension/jaegerquery/server_test.go index e17875a707a..7fd650610b3 100644 --- a/cmd/jaeger/internal/extension/jaegerquery/server_test.go +++ b/cmd/jaeger/internal/extension/jaegerquery/server_test.go @@ -278,18 +278,6 @@ func TestServerAddArchiveStorage(t *testing.T) { expectedOutput: "", expectedErr: "cannot find archive storage factory: cannot find extension", }, - { - name: "Archive storage not supported", - config: &Config{ - Storage: Storage{ - TracesArchive: "badger", - }, - }, - qSvcOpts: &querysvc.QueryServiceOptions{}, - extension: fakeStorageExt{}, - expectedOutput: "Archive storage not supported by the factory", - expectedErr: "", - }, } for _, tt := range tests { From 11a560d9fb66bd2733fa8348ccb131bd401713cb Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 3 Nov 2024 20:03:49 -0500 Subject: [PATCH 12/20] Fix Unit Tests Signed-off-by: Mahad Zaryab --- cmd/query/app/flags_test.go | 20 ++++---------------- cmd/remote-storage/app/server.go | 2 +- 2 files changed, 5 insertions(+), 17 deletions(-) diff --git a/cmd/query/app/flags_test.go b/cmd/query/app/flags_test.go index c8e623ed07c..de6aa083fb7 100644 --- a/cmd/query/app/flags_test.go +++ b/cmd/query/app/flags_test.go @@ -92,24 +92,12 @@ func TestBuildQueryServiceOptions(t *testing.T) { require.NoError(t, err) assert.NotNil(t, qOpts) - qSvcOpts := qOpts.BuildQueryServiceOptions(&mocks.Factory{}, zap.NewNop()) - assert.NotNil(t, qSvcOpts) - assert.NotNil(t, qSvcOpts.Adjuster) - assert.Nil(t, qSvcOpts.ArchiveSpanReader) - assert.Nil(t, qSvcOpts.ArchiveSpanWriter) - - comboFactory := struct { - *mocks.Factory - *mocks.ArchiveFactory - }{ - &mocks.Factory{}, - &mocks.ArchiveFactory{}, - } + factory := &mocks.Factory{} - comboFactory.ArchiveFactory.On("CreateArchiveSpanReader").Return(&spanstore_mocks.Reader{}, nil) - comboFactory.ArchiveFactory.On("CreateArchiveSpanWriter").Return(&spanstore_mocks.Writer{}, nil) + factory.On("CreateSpanReader").Return(&spanstore_mocks.Reader{}, nil) + factory.On("CreateSpanWriter").Return(&spanstore_mocks.Writer{}, nil) - qSvcOpts = qOpts.BuildQueryServiceOptions(comboFactory, zap.NewNop()) + qSvcOpts := qOpts.BuildQueryServiceOptions(factory, zap.NewNop()) assert.NotNil(t, qSvcOpts) assert.NotNil(t, qSvcOpts.Adjuster) assert.NotNil(t, qSvcOpts.ArchiveSpanReader) diff --git a/cmd/remote-storage/app/server.go b/cmd/remote-storage/app/server.go index 51f88a16699..bf73527a617 100644 --- a/cmd/remote-storage/app/server.go +++ b/cmd/remote-storage/app/server.go @@ -78,7 +78,7 @@ func createGRPCHandler(f storage.Factory, logger *zap.Logger) (*shared.GRPCHandl // borrow code from Query service for archive storage qOpts := &querysvc.QueryServiceOptions{} // when archive storage not initialized (returns false), the reader/writer will be nil - _ = qOpts.InitArchiveStorage(f, logger) + // _ = qOpts.InitArchiveStorage(f, logger) impl.ArchiveSpanReader = func() spanstore.Reader { return qOpts.ArchiveSpanReader } impl.ArchiveSpanWriter = func() spanstore.Writer { return qOpts.ArchiveSpanWriter } From 8c8e8ae04c0d8fbfeda42a1a4f3e3e46fbc89a9b Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 3 Nov 2024 20:14:34 -0500 Subject: [PATCH 13/20] Fix Lint Error And Comment Out Archive Flags Signed-off-by: Mahad Zaryab --- cmd/remote-storage/app/server.go | 3 ++- plugin/storage/integration/elasticsearch_test.go | 7 ++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/cmd/remote-storage/app/server.go b/cmd/remote-storage/app/server.go index bf73527a617..606255649bd 100644 --- a/cmd/remote-storage/app/server.go +++ b/cmd/remote-storage/app/server.go @@ -54,7 +54,7 @@ func NewServer(options *Options, storageFactory storage.Factory, tm *tenancy.Man }, nil } -func createGRPCHandler(f storage.Factory, logger *zap.Logger) (*shared.GRPCHandler, error) { +func createGRPCHandler(f storage.Factory, _ *zap.Logger) (*shared.GRPCHandler, error) { reader, err := f.CreateSpanReader() if err != nil { return nil, err @@ -78,6 +78,7 @@ func createGRPCHandler(f storage.Factory, logger *zap.Logger) (*shared.GRPCHandl // borrow code from Query service for archive storage qOpts := &querysvc.QueryServiceOptions{} // when archive storage not initialized (returns false), the reader/writer will be nil + // TODO: what should we do here? // _ = qOpts.InitArchiveStorage(f, logger) impl.ArchiveSpanReader = func() spanstore.Reader { return qOpts.ArchiveSpanReader } impl.ArchiveSpanWriter = func() spanstore.Writer { return qOpts.ArchiveSpanWriter } diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 8cfe053445a..88171df27d6 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -109,9 +109,10 @@ func (*ESStorageIntegration) initializeESFactory(t *testing.T, allTagsAsFields b fmt.Sprintf("--es.tags-as-fields.all=%v", allTagsAsFields), fmt.Sprintf("--es.bulk.actions=%v", 1), fmt.Sprintf("--es.bulk.flush-interval=%v", time.Nanosecond), - "--es-archive.enabled=true", - fmt.Sprintf("--es-archive.tags-as-fields.all=%v", allTagsAsFields), - fmt.Sprintf("--es-archive.index-prefix=%v", indexPrefix), + // TODO: are these flags being used at the moment? + // "--es-archive.enabled=true", + // fmt.Sprintf("--es-archive.tags-as-fields.all=%v", allTagsAsFields), + // fmt.Sprintf("--es-archive.index-prefix=%v", indexPrefix), } require.NoError(t, command.ParseFlags(args)) f.InitFromViper(v, logger) From bbcdbd718415089096cbb71ce751606be8755092 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 3 Nov 2024 20:35:27 -0500 Subject: [PATCH 14/20] Initialize Archive Factory In Tests Signed-off-by: Mahad Zaryab --- .../storage/integration/elasticsearch_test.go | 53 +++++++++++++------ 1 file changed, 36 insertions(+), 17 deletions(-) diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 88171df27d6..2ddef4e3795 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -50,7 +50,8 @@ type ESStorageIntegration struct { client *elastic.Client v8Client *elasticsearch8.Client - factory *es.Factory + factory *es.Factory + archiveFactory *es.Factory } func (s *ESStorageIntegration) getVersion() (uint, error) { @@ -94,25 +95,37 @@ func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool) func (s *ESStorageIntegration) esCleanUp(t *testing.T) { require.NoError(t, s.factory.Purge(context.Background())) + require.NoError(t, s.archiveFactory.Purge(context.Background())) } -func (*ESStorageIntegration) initializeESFactory(t *testing.T, allTagsAsFields bool) *es.Factory { +func (*ESStorageIntegration) initializeESFactory(t *testing.T, allTagsAsFields bool, isArchive bool) *es.Factory { logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) - f := es.NewFactory(es.PrimaryNamespace) + var namespace string + if isArchive { + namespace = es.ArchiveNamespace + } else { + namespace = es.PrimaryNamespace + } + f := es.NewFactory(namespace) v, command := config.Viperize(f.AddFlags) - args := []string{ - fmt.Sprintf("--es.num-shards=%v", 5), - fmt.Sprintf("--es.num-replicas=%v", 1), - fmt.Sprintf("--es.index-prefix=%v", indexPrefix), - fmt.Sprintf("--es.use-ilm=%v", false), - fmt.Sprintf("--es.service-cache-ttl=%v", 1*time.Second), - fmt.Sprintf("--es.tags-as-fields.all=%v", allTagsAsFields), - fmt.Sprintf("--es.bulk.actions=%v", 1), - fmt.Sprintf("--es.bulk.flush-interval=%v", time.Nanosecond), - // TODO: are these flags being used at the moment? - // "--es-archive.enabled=true", - // fmt.Sprintf("--es-archive.tags-as-fields.all=%v", allTagsAsFields), - // fmt.Sprintf("--es-archive.index-prefix=%v", indexPrefix), + var args []string + if isArchive { + args = []string{ + "--es-archive.enabled=true", + fmt.Sprintf("--es-archive.tags-as-fields.all=%v", allTagsAsFields), + fmt.Sprintf("--es-archive.index-prefix=%v", indexPrefix), + } + } else { + args = []string{ + fmt.Sprintf("--es.num-shards=%v", 5), + fmt.Sprintf("--es.num-replicas=%v", 1), + fmt.Sprintf("--es.index-prefix=%v", indexPrefix), + fmt.Sprintf("--es.use-ilm=%v", false), + fmt.Sprintf("--es.service-cache-ttl=%v", 1*time.Second), + fmt.Sprintf("--es.tags-as-fields.all=%v", allTagsAsFields), + fmt.Sprintf("--es.bulk.actions=%v", 1), + fmt.Sprintf("--es.bulk.flush-interval=%v", time.Nanosecond), + } } require.NoError(t, command.ParseFlags(args)) f.InitFromViper(v, logger) @@ -125,13 +138,19 @@ func (*ESStorageIntegration) initializeESFactory(t *testing.T, allTagsAsFields b } func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool) { - f := s.initializeESFactory(t, allTagsAsFields) + f := s.initializeESFactory(t, allTagsAsFields, false) s.factory = f var err error s.SpanWriter, err = f.CreateSpanWriter() require.NoError(t, err) s.SpanReader, err = f.CreateSpanReader() require.NoError(t, err) + af := s.initializeESFactory(t, allTagsAsFields, true) + s.archiveFactory = af + s.ArchiveSpanReader, err = af.CreateSpanReader() + require.NoError(t, err) + s.ArchiveSpanWriter, err = af.CreateSpanWriter() + require.NoError(t, err) s.DependencyReader, err = f.CreateDependencyReader() require.NoError(t, err) From 6c1fd9599bcfbeb8576a57a3e2b27d2251f18421 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 3 Nov 2024 20:54:58 -0500 Subject: [PATCH 15/20] Set IsArchive Flag In Factory Signed-off-by: Mahad Zaryab --- cmd/query/app/token_propagation_test.go | 2 +- plugin/storage/es/factory.go | 15 +++++++++++---- plugin/storage/es/factory_test.go | 16 ++++++++-------- plugin/storage/es/options.go | 2 +- plugin/storage/factory.go | 2 +- plugin/storage/integration/elasticsearch_test.go | 8 +------- 6 files changed, 23 insertions(+), 22 deletions(-) diff --git a/cmd/query/app/token_propagation_test.go b/cmd/query/app/token_propagation_test.go index 4ab90d32cd8..e81d17ab542 100644 --- a/cmd/query/app/token_propagation_test.go +++ b/cmd/query/app/token_propagation_test.go @@ -68,7 +68,7 @@ func runQueryService(t *testing.T, esURL string) *Server { flagsSvc := flags.NewService(ports.QueryAdminHTTP) flagsSvc.Logger = zaptest.NewLogger(t) - f := es.NewFactory(es.PrimaryNamespace) + f := es.NewFactory(false) v, command := config.Viperize(f.AddFlags) require.NoError(t, command.ParseFlags([]string{ "--es.tls.enabled=false", diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 91c0ad07b77..50015f71808 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -36,8 +36,8 @@ import ( ) const ( - PrimaryNamespace = "es" - ArchiveNamespace = "es-archive" + primaryNamespace = "es" + archiveNamespace = "es-archive" ) var ( // interface comformance checks @@ -65,9 +65,16 @@ type Factory struct { } // NewFactory creates a new Factory. -func NewFactory(namespace string) *Factory { +func NewFactory(isArchive bool) *Factory { + var options *Options + if isArchive { + options = NewOptions(archiveNamespace) + options.Primary.IsArchive = true + } else { + options = NewOptions(primaryNamespace) + } return &Factory{ - Options: NewOptions(namespace), + Options: options, newClientFn: config.NewClient, tracer: otel.GetTracerProvider(), } diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index 429bfa0417e..3888f898757 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -61,7 +61,7 @@ func (m *mockClientBuilder) NewClient(*escfg.Configuration, *zap.Logger, metrics } func TestElasticsearchFactory(t *testing.T) { - f := NewFactory(PrimaryNamespace) + f := NewFactory(false) v, command := config.Viperize(f.AddFlags) command.ParseFlags([]string{}) f.InitFromViper(v, zap.NewNop()) @@ -88,7 +88,7 @@ func TestElasticsearchFactory(t *testing.T) { } func TestElasticsearchTagsFileDoNotExist(t *testing.T) { - f := NewFactory(PrimaryNamespace) + f := NewFactory(false) f.config = &escfg.Configuration{ Tags: escfg.TagsAsFields{ File: "fixtures/file-does-not-exist.txt", @@ -103,7 +103,7 @@ func TestElasticsearchTagsFileDoNotExist(t *testing.T) { } func TestElasticsearchILMUsedWithoutReadWriteAliases(t *testing.T) { - f := NewFactory(ArchiveNamespace) + f := NewFactory(false) f.config = &escfg.Configuration{ UseILM: true, } @@ -177,7 +177,7 @@ func TestTagKeysAsFields(t *testing.T) { } func TestCreateTemplateError(t *testing.T) { - f := NewFactory(PrimaryNamespace) + f := NewFactory(false) f.config = &escfg.Configuration{CreateIndexTemplates: true} f.newClientFn = (&mockClientBuilder{createTemplateError: errors.New("template-error")}).NewClient err := f.Initialize(metrics.NullFactory, zap.NewNop()) @@ -194,7 +194,7 @@ func TestCreateTemplateError(t *testing.T) { } func TestILMDisableTemplateCreation(t *testing.T) { - f := NewFactory(PrimaryNamespace) + f := NewFactory(false) f.config = &escfg.Configuration{UseILM: true, UseReadWriteAliases: true, CreateIndexTemplates: true} f.newClientFn = (&mockClientBuilder{createTemplateError: errors.New("template-error")}).NewClient err := f.Initialize(metrics.NullFactory, zap.NewNop()) @@ -205,7 +205,7 @@ func TestILMDisableTemplateCreation(t *testing.T) { } func TestConfigureFromOptions(t *testing.T) { - f := NewFactory(ArchiveNamespace) + f := NewFactory(false) o := &Options{ Primary: namespaceConfig{Configuration: escfg.Configuration{Servers: []string{"server"}}}, } @@ -275,7 +275,7 @@ func TestESStorageFactoryWithConfigError(t *testing.T) { func TestPasswordFromFile(t *testing.T) { defer testutils.VerifyGoLeaksOnce(t) t.Run("primary client", func(t *testing.T) { - f := NewFactory(PrimaryNamespace) + f := NewFactory(false) testPasswordFromFile(t, f, f.getClient, f.CreateSpanWriter) }) @@ -388,7 +388,7 @@ func TestPasswordFromFileErrors(t *testing.T) { pwdFile := filepath.Join(t.TempDir(), "pwd") require.NoError(t, os.WriteFile(pwdFile, []byte("first password"), 0o600)) - f := NewFactory(PrimaryNamespace) + f := NewFactory(false) f.config = &escfg.Configuration{ Servers: []string{server.URL}, LogLevel: "debug", diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index 39362082ff6..d9c83fd5232 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -271,7 +271,7 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { nsConfig.namespace+suffixAdaptiveSamplingLookback, nsConfig.AdaptiveSamplingLookback, "How far back to look for the latest adaptive sampling probabilities") - if nsConfig.namespace == ArchiveNamespace { + if nsConfig.namespace == archiveNamespace { flagSet.Bool( nsConfig.namespace+suffixEnabled, nsConfig.Enabled, diff --git a/plugin/storage/factory.go b/plugin/storage/factory.go index c1618de36a3..498339d49e4 100644 --- a/plugin/storage/factory.go +++ b/plugin/storage/factory.go @@ -118,7 +118,7 @@ func (*Factory) getFactoryOfType(factoryType string) (storage.Factory, error) { case cassandraStorageType: return cassandra.NewFactory(), nil case elasticsearchStorageType, opensearchStorageType: - return es.NewFactory(es.PrimaryNamespace), nil + return es.NewFactory(false), nil case memoryStorageType: return memory.NewFactory(), nil case kafkaStorageType: diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 2ddef4e3795..93839114679 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -100,13 +100,7 @@ func (s *ESStorageIntegration) esCleanUp(t *testing.T) { func (*ESStorageIntegration) initializeESFactory(t *testing.T, allTagsAsFields bool, isArchive bool) *es.Factory { logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) - var namespace string - if isArchive { - namespace = es.ArchiveNamespace - } else { - namespace = es.PrimaryNamespace - } - f := es.NewFactory(namespace) + f := es.NewFactory(isArchive) v, command := config.Viperize(f.AddFlags) var args []string if isArchive { From 43487cd61fd75e3e60d2d9e317f09bcca1fc3b5b Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 3 Nov 2024 21:08:44 -0500 Subject: [PATCH 16/20] Fix Span Writer In ES Config Signed-off-by: Mahad Zaryab --- plugin/storage/es/factory.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 50015f71808..bc43a28bf7d 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -156,7 +156,7 @@ func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { // CreateSpanWriter implements storage.Factory func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { - return createSpanWriter(f.getClient, f.config, false, f.metricsFactory, f.logger) + return createSpanWriter(f.getClient, f.config, f.metricsFactory, f.logger) } // CreateDependencyReader implements storage.Factory @@ -194,7 +194,6 @@ func createSpanReader( func createSpanWriter( clientFn func() es.Client, cfg *config.Configuration, - archive bool, mFactory metrics.Factory, logger *zap.Logger, ) (spanstore.Writer, error) { @@ -216,7 +215,7 @@ func createSpanWriter( AllTagsAsFields: cfg.Tags.AllAsFields, TagKeysAsFields: tags, TagDotReplacement: cfg.Tags.DotReplacement, - Archive: archive, + Archive: cfg.IsArchive, UseReadWriteAliases: cfg.UseReadWriteAliases, Logger: logger, MetricsFactory: mFactory, From 7f872004c8a990b85369ae14aa59fe894631386a Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Mon, 4 Nov 2024 18:45:32 -0500 Subject: [PATCH 17/20] Expose is_archive Flag Signed-off-by: Mahad Zaryab --- cmd/jaeger/config-elasticsearch.yaml | 1 + cmd/jaeger/config-opensearch.yaml | 1 + pkg/es/config/config.go | 2 +- 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd/jaeger/config-elasticsearch.yaml b/cmd/jaeger/config-elasticsearch.yaml index 0883dc3b395..d3bd9654007 100644 --- a/cmd/jaeger/config-elasticsearch.yaml +++ b/cmd/jaeger/config-elasticsearch.yaml @@ -58,6 +58,7 @@ extensions: elasticsearch: indices: index_prefix: "jaeger-archive" + is_archive: true receivers: otlp: diff --git a/cmd/jaeger/config-opensearch.yaml b/cmd/jaeger/config-opensearch.yaml index 1fe57d42c39..8654863c3d4 100644 --- a/cmd/jaeger/config-opensearch.yaml +++ b/cmd/jaeger/config-opensearch.yaml @@ -58,6 +58,7 @@ extensions: opensearch: indices: index_prefix: "jaeger-archive" + is_archive: true receivers: otlp: diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index efbae632295..7610be8aaef 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -137,7 +137,7 @@ type Configuration struct { // Enabled, if set to true, enables the namespace for storage pointed to by this configuration. Enabled bool `mapstructure:"-"` // TODO: revisit if this needed - IsArchive bool + IsArchive bool `mapstructure:"is_archive"` } // TagsAsFields holds configuration for tag schema. From 7d65eb8acca74f3dc363d8ede8be39560b8e9752 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Fri, 8 Nov 2024 20:36:56 -0500 Subject: [PATCH 18/20] Refactor Cassandra Namespace To Only Hold One Storage Type Signed-off-by: Mahad Zaryab --- cmd/jaeger/config-cassandra.yaml | 1 + plugin/storage/cassandra/factory.go | 99 +++++++------------- plugin/storage/cassandra/factory_test.go | 66 ++++--------- plugin/storage/cassandra/options.go | 40 ++------ plugin/storage/cassandra/options_test.go | 37 +------- plugin/storage/factory.go | 2 +- plugin/storage/integration/cassandra_test.go | 12 +-- 7 files changed, 59 insertions(+), 198 deletions(-) diff --git a/cmd/jaeger/config-cassandra.yaml b/cmd/jaeger/config-cassandra.yaml index 3ead38258c2..a7dffefe2df 100644 --- a/cmd/jaeger/config-cassandra.yaml +++ b/cmd/jaeger/config-cassandra.yaml @@ -52,6 +52,7 @@ extensions: password: "cassandra" tls: insecure: true + is_archive: true receivers: otlp: protocols: diff --git a/plugin/storage/cassandra/factory.go b/plugin/storage/cassandra/factory.go index 4654a2a2c82..f7f95a83465 100644 --- a/plugin/storage/cassandra/factory.go +++ b/plugin/storage/cassandra/factory.go @@ -40,7 +40,6 @@ const ( var ( // interface comformance checks _ storage.Factory = (*Factory)(nil) _ storage.Purger = (*Factory)(nil) - _ storage.ArchiveFactory = (*Factory)(nil) _ storage.SamplingStoreFactory = (*Factory)(nil) _ io.Closer = (*Factory)(nil) _ plugin.Configurable = (*Factory)(nil) @@ -50,22 +49,26 @@ var ( // interface comformance checks type Factory struct { Options *Options - primaryMetricsFactory metrics.Factory - archiveMetricsFactory metrics.Factory - logger *zap.Logger - tracer trace.TracerProvider + metricsFactory metrics.Factory + logger *zap.Logger + tracer trace.TracerProvider - primaryConfig config.SessionBuilder - primarySession cassandra.Session - archiveConfig config.SessionBuilder - archiveSession cassandra.Session + config config.SessionBuilder + session cassandra.Session } // NewFactory creates a new Factory. -func NewFactory() *Factory { +func NewFactory(isArchive bool) *Factory { + var options *Options + if isArchive { + options = NewOptions(archiveStorageConfig) + options.IsArchive = true + } else { + options = NewOptions(primaryStorageConfig) + } return &Factory{ tracer: otel.GetTracerProvider(), - Options: NewOptions(primaryStorageConfig, archiveStorageConfig), + Options: options, } } @@ -75,7 +78,7 @@ func NewFactoryWithConfig( metricsFactory metrics.Factory, logger *zap.Logger, ) (*Factory, error) { - f := NewFactory() + f := NewFactory(opts.IsArchive) // use this to help with testing b := &withConfigBuilder{ f: f, @@ -121,43 +124,30 @@ func (f *Factory) InitFromViper(v *viper.Viper, _ *zap.Logger) { // InitFromOptions initializes factory from options. func (f *Factory) configureFromOptions(o *Options) { f.Options = o - // TODO this is a hack because we do not define defaults in Options - if o.others == nil { - o.others = make(map[string]*NamespaceConfig) - } - f.primaryConfig = o.GetPrimary() - if cfg := f.Options.Get(archiveStorageConfig); cfg != nil { - f.archiveConfig = cfg // this is so stupid - see https://golang.org/doc/faq#nil_error - } + f.config = o.GetPrimary() } // Initialize implements storage.Factory func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { - f.primaryMetricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "cassandra", Tags: nil}) - f.archiveMetricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "cassandra-archive", Tags: nil}) + metricsNamespace := "cassandra" + if f.Options.IsArchive { + metricsNamespace = "cassandra-archive" + } + f.metricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: metricsNamespace, Tags: nil}) f.logger = logger - primarySession, err := f.primaryConfig.NewSession() + primarySession, err := f.config.NewSession() if err != nil { return err } - f.primarySession = primarySession - - if f.archiveConfig != nil { - archiveSession, err := f.archiveConfig.NewSession() - if err != nil { - return err - } - f.archiveSession = archiveSession - } else { - logger.Info("Cassandra archive storage configuration is empty, skipping") - } + f.session = primarySession + return nil } // CreateSpanReader implements storage.Factory func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { - return cSpanStore.NewSpanReader(f.primarySession, f.primaryMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader")) + return cSpanStore.NewSpanReader(f.session, f.metricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader")) } // CreateSpanWriter implements storage.Factory @@ -166,33 +156,13 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { if err != nil { return nil, err } - return cSpanStore.NewSpanWriter(f.primarySession, f.Options.SpanStoreWriteCacheTTL, f.primaryMetricsFactory, f.logger, options...) + return cSpanStore.NewSpanWriter(f.session, f.Options.SpanStoreWriteCacheTTL, f.metricsFactory, f.logger, options...) } // CreateDependencyReader implements storage.Factory func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { - version := cDepStore.GetDependencyVersion(f.primarySession) - return cDepStore.NewDependencyStore(f.primarySession, f.primaryMetricsFactory, f.logger, version) -} - -// CreateArchiveSpanReader implements storage.ArchiveFactory -func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) { - if f.archiveSession == nil { - return nil, storage.ErrArchiveStorageNotConfigured - } - return cSpanStore.NewSpanReader(f.archiveSession, f.archiveMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader")) -} - -// CreateArchiveSpanWriter implements storage.ArchiveFactory -func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { - if f.archiveSession == nil { - return nil, storage.ErrArchiveStorageNotConfigured - } - options, err := writerOptions(f.Options) - if err != nil { - return nil, err - } - return cSpanStore.NewSpanWriter(f.archiveSession, f.Options.SpanStoreWriteCacheTTL, f.archiveMetricsFactory, f.logger, options...) + version := cDepStore.GetDependencyVersion(f.session) + return cDepStore.NewDependencyStore(f.session, f.metricsFactory, f.logger, version) } // CreateLock implements storage.SamplingStoreFactory @@ -203,12 +173,12 @@ func (f *Factory) CreateLock() (distributedlock.Lock, error) { } f.logger.Info("Using unique participantName in the distributed lock", zap.String("participantName", hostId)) - return cLock.NewLock(f.primarySession, hostId), nil + return cLock.NewLock(f.session, hostId), nil } // CreateSamplingStore implements storage.SamplingStoreFactory func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store, error) { - return cSamplingStore.New(f.primarySession, f.primaryMetricsFactory, f.logger), nil + return cSamplingStore.New(f.session, f.metricsFactory, f.logger), nil } func writerOptions(opts *Options) ([]cSpanStore.Option, error) { @@ -244,16 +214,13 @@ var _ io.Closer = (*Factory)(nil) // Close closes the resources held by the factory func (f *Factory) Close() error { - if f.primarySession != nil { - f.primarySession.Close() - } - if f.archiveSession != nil { - f.archiveSession.Close() + if f.session != nil { + f.session.Close() } return nil } func (f *Factory) Purge(_ context.Context) error { - return f.primarySession.Query("TRUNCATE traces").Exec() + return f.session.Query("TRUNCATE traces").Exec() } diff --git a/plugin/storage/cassandra/factory_test.go b/plugin/storage/cassandra/factory_test.go index 9a2fcc486cf..a236ec5b3b2 100644 --- a/plugin/storage/cassandra/factory_test.go +++ b/plugin/storage/cassandra/factory_test.go @@ -39,15 +39,14 @@ func (m *mockSessionBuilder) NewSession() (cassandra.Session, error) { } func TestCassandraFactory(t *testing.T) { - logger, logBuf := testutils.NewLogger() - f := NewFactory() - v, command := config.Viperize(f.AddFlags) - command.ParseFlags([]string{"--cassandra-archive.enabled=true"}) + logger, _ := testutils.NewLogger() + f := NewFactory(false) + v, _ := config.Viperize(f.AddFlags) f.InitFromViper(v, zap.NewNop()) // after InitFromViper, f.primaryConfig points to a real session builder that will fail in unit tests, // so we override it with a mock. - f.primaryConfig = newMockSessionBuilder(nil, errors.New("made-up error")) + f.config = newMockSessionBuilder(nil, errors.New("made-up error")) require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error") var ( @@ -57,13 +56,11 @@ func TestCassandraFactory(t *testing.T) { session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query) session.On("Close").Return() query.On("Exec").Return(nil) - f.primaryConfig = newMockSessionBuilder(session, nil) - f.archiveConfig = newMockSessionBuilder(nil, errors.New("made-up error")) + f.config = newMockSessionBuilder(nil, errors.New("made-up error")) require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error") - f.archiveConfig = nil + f.config = newMockSessionBuilder(session, nil) require.NoError(t, f.Initialize(metrics.NullFactory, logger)) - assert.Contains(t, logBuf.String(), "Cassandra archive storage configuration is empty, skipping") _, err := f.CreateSpanReader() require.NoError(t, err) @@ -74,21 +71,6 @@ func TestCassandraFactory(t *testing.T) { _, err = f.CreateDependencyReader() require.NoError(t, err) - _, err = f.CreateArchiveSpanReader() - require.EqualError(t, err, "archive storage not configured") - - _, err = f.CreateArchiveSpanWriter() - require.EqualError(t, err, "archive storage not configured") - - f.archiveConfig = newMockSessionBuilder(session, nil) - require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) - - _, err = f.CreateArchiveSpanReader() - require.NoError(t, err) - - _, err = f.CreateArchiveSpanWriter() - require.NoError(t, err) - _, err = f.CreateLock() require.NoError(t, err) @@ -99,11 +81,9 @@ func TestCassandraFactory(t *testing.T) { } func TestExclusiveWhitelistBlacklist(t *testing.T) { - logger, logBuf := testutils.NewLogger() - f := NewFactory() + f := NewFactory(false) v, command := config.Viperize(f.AddFlags) command.ParseFlags([]string{ - "--cassandra-archive.enabled=true", "--cassandra.index.tag-whitelist=a,b,c", "--cassandra.index.tag-blacklist=a,b,c", }) @@ -111,7 +91,7 @@ func TestExclusiveWhitelistBlacklist(t *testing.T) { // after InitFromViper, f.primaryConfig points to a real session builder that will fail in unit tests, // so we override it with a mock. - f.primaryConfig = newMockSessionBuilder(nil, errors.New("made-up error")) + f.config = newMockSessionBuilder(nil, errors.New("made-up error")) require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error") var ( @@ -120,22 +100,10 @@ func TestExclusiveWhitelistBlacklist(t *testing.T) { ) session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query) query.On("Exec").Return(nil) - f.primaryConfig = newMockSessionBuilder(session, nil) - f.archiveConfig = newMockSessionBuilder(nil, errors.New("made-up error")) - require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error") - - f.archiveConfig = nil - require.NoError(t, f.Initialize(metrics.NullFactory, logger)) - assert.Contains(t, logBuf.String(), "Cassandra archive storage configuration is empty, skipping") + f.config = newMockSessionBuilder(session, nil) _, err := f.CreateSpanWriter() require.EqualError(t, err, "only one of TagIndexBlacklist and TagIndexWhitelist can be specified") - - f.archiveConfig = &mockSessionBuilder{} - require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) - - _, err = f.CreateArchiveSpanWriter() - require.EqualError(t, err, "only one of TagIndexBlacklist and TagIndexWhitelist can be specified") } func TestWriterOptions(t *testing.T) { @@ -181,13 +149,11 @@ func TestWriterOptions(t *testing.T) { } func TestConfigureFromOptions(t *testing.T) { - f := NewFactory() - o := NewOptions("foo", archiveStorageConfig) - o.others[archiveStorageConfig].Enabled = true + f := NewFactory(false) + o := NewOptions("foo") f.configureFromOptions(o) assert.Equal(t, o, f.Options) - assert.Equal(t, o.GetPrimary(), f.primaryConfig) - assert.Equal(t, o.Get(archiveStorageConfig), f.archiveConfig) + assert.Equal(t, o.GetPrimary(), f.config) } func TestNewFactoryWithConfig(t *testing.T) { @@ -201,7 +167,7 @@ func TestNewFactoryWithConfig(t *testing.T) { }, }, } - f := NewFactory() + f := NewFactory(false) b := &withConfigBuilder{ f: f, opts: opts, @@ -223,7 +189,7 @@ func TestNewFactoryWithConfig(t *testing.T) { }, }, } - f := NewFactory() + f := NewFactory(false) b := &withConfigBuilder{ f: f, opts: opts, @@ -242,14 +208,14 @@ func TestNewFactoryWithConfig(t *testing.T) { } func TestFactory_Purge(t *testing.T) { - f := NewFactory() + f := NewFactory(false) var ( session = &mocks.Session{} query = &mocks.Query{} ) session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query) query.On("Exec").Return(nil) - f.primarySession = session + f.session = session err := f.Purge(context.Background()) require.NoError(t, err) diff --git a/plugin/storage/cassandra/options.go b/plugin/storage/cassandra/options.go index e266436ba29..3c876dfa904 100644 --- a/plugin/storage/cassandra/options.go +++ b/plugin/storage/cassandra/options.go @@ -49,9 +49,10 @@ const ( // (e.g. archive) may be underspecified and infer the rest of its parameters from primary. type Options struct { Primary NamespaceConfig `mapstructure:",squash"` - others map[string]*NamespaceConfig - SpanStoreWriteCacheTTL time.Duration `mapstructure:"span_store_write_cache_ttl"` - Index IndexConfig `mapstructure:"index"` + SpanStoreWriteCacheTTL time.Duration `mapstructure:"span_store_write_cache_ttl"` + Index IndexConfig `mapstructure:"index"` + // TODO: this is just a placeholder + IsArchive bool `mapstructure:"is_archive"` } // IndexConfig configures indexing. @@ -74,31 +75,22 @@ type NamespaceConfig struct { } // NewOptions creates a new Options struct. -func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options { +func NewOptions(namespace string) *Options { // TODO all default values should be defined via cobra flags options := &Options{ Primary: NamespaceConfig{ Configuration: config.DefaultConfiguration(), - namespace: primaryNamespace, + namespace: namespace, Enabled: true, }, - others: make(map[string]*NamespaceConfig, len(otherNamespaces)), SpanStoreWriteCacheTTL: time.Hour * 12, } - - for _, namespace := range otherNamespaces { - options.others[namespace] = &NamespaceConfig{namespace: namespace} - } - return options } // AddFlags adds flags for Options func (opt *Options) AddFlags(flagSet *flag.FlagSet) { addFlags(flagSet, opt.Primary) - for _, cfg := range opt.others { - addFlags(flagSet, *cfg) - } flagSet.Duration(opt.Primary.namespace+suffixSpanStoreWriteCacheTTL, opt.SpanStoreWriteCacheTTL, "The duration to wait before rewriting an existing service or operation name") @@ -206,9 +198,6 @@ func addFlags(flagSet *flag.FlagSet, nsConfig NamespaceConfig) { // InitFromViper initializes Options with properties from viper func (opt *Options) InitFromViper(v *viper.Viper) { opt.Primary.initFromViper(v) - for _, cfg := range opt.others { - cfg.initFromViper(v) - } opt.SpanStoreWriteCacheTTL = v.GetDuration(opt.Primary.namespace + suffixSpanStoreWriteCacheTTL) opt.Index.TagBlackList = stripWhiteSpace(v.GetString(opt.Primary.namespace + suffixIndexTagsBlacklist)) opt.Index.TagWhiteList = stripWhiteSpace(v.GetString(opt.Primary.namespace + suffixIndexTagsWhitelist)) @@ -260,23 +249,6 @@ func (opt *Options) GetPrimary() *config.Configuration { return &opt.Primary.Configuration } -// Get returns auxiliary named configuration. -func (opt *Options) Get(namespace string) *config.Configuration { - nsCfg, ok := opt.others[namespace] - if !ok { - nsCfg = &NamespaceConfig{} - opt.others[namespace] = nsCfg - } - if !nsCfg.Enabled { - return nil - } - nsCfg.Configuration.ApplyDefaults(&opt.Primary.Configuration) - if len(nsCfg.Connection.Servers) == 0 { - nsCfg.Connection.Servers = opt.Primary.Connection.Servers - } - return &nsCfg.Configuration -} - // TagIndexBlacklist returns the list of blacklisted tags func (opt *Options) TagIndexBlacklist() []string { if len(opt.Index.TagBlackList) > 0 { diff --git a/plugin/storage/cassandra/options_test.go b/plugin/storage/cassandra/options_test.go index 8ccd813c838..418554b2666 100644 --- a/plugin/storage/cassandra/options_test.go +++ b/plugin/storage/cassandra/options_test.go @@ -6,10 +6,8 @@ package cassandra import ( "testing" - "time" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "github.com/jaegertracing/jaeger/pkg/config" ) @@ -20,22 +18,10 @@ func TestOptions(t *testing.T) { assert.NotEmpty(t, primary.Schema.Keyspace) assert.NotEmpty(t, primary.Connection.Servers) assert.Equal(t, 2, primary.Connection.ConnectionsPerHost) - - aux := opts.Get("archive") - assert.Nil(t, aux) - - assert.NotNil(t, opts.others["archive"]) - opts.others["archive"].Enabled = true - aux = opts.Get("archive") - require.NotNil(t, aux) - assert.Equal(t, primary.Schema.Keyspace, aux.Schema.Keyspace) - assert.Equal(t, primary.Connection.Servers, aux.Connection.Servers) - assert.Equal(t, primary.Connection.ConnectionsPerHost, aux.Connection.ConnectionsPerHost) - assert.Equal(t, primary.Connection.ReconnectInterval, aux.Connection.ReconnectInterval) } func TestOptionsWithFlags(t *testing.T) { - opts := NewOptions("cas", "cas-aux") + opts := NewOptions("cas") v, command := config.Viperize(opts.AddFlags) command.ParseFlags([]string{ "--cas.keyspace=jaeger", @@ -56,13 +42,6 @@ func TestOptionsWithFlags(t *testing.T) { "--cas.basic.allowed-authenticators=org.apache.cassandra.auth.PasswordAuthenticator,com.datastax.bdp.cassandra.auth.DseAuthenticator", "--cas.username=username", "--cas.password=password", - // enable aux with a couple overrides - "--cas-aux.enabled=true", - "--cas-aux.keyspace=jaeger-archive", - "--cas-aux.servers=3.3.3.3, 4.4.4.4", - "--cas-aux.username=username", - "--cas-aux.password=password", - "--cas-aux.basic.allowed-authenticators=org.apache.cassandra.auth.PasswordAuthenticator,com.ericsson.bss.cassandra.ecaudit.auth.AuditAuthenticator", }) opts.InitFromViper(v) @@ -77,20 +56,6 @@ func TestOptionsWithFlags(t *testing.T) { assert.True(t, opts.Index.Tags) assert.False(t, opts.Index.ProcessTags) assert.True(t, opts.Index.Logs) - - aux := opts.Get("cas-aux") - require.NotNil(t, aux) - assert.Equal(t, "jaeger-archive", aux.Schema.Keyspace) - assert.Equal(t, []string{"3.3.3.3", "4.4.4.4"}, aux.Connection.Servers) - assert.Equal(t, []string{"org.apache.cassandra.auth.PasswordAuthenticator", "com.ericsson.bss.cassandra.ecaudit.auth.AuditAuthenticator"}, aux.Connection.Authenticator.Basic.AllowedAuthenticators) - assert.Equal(t, 42, aux.Connection.ConnectionsPerHost) - assert.Equal(t, 42, aux.Query.MaxRetryAttempts) - assert.Equal(t, 42*time.Second, aux.Query.Timeout) - assert.Equal(t, 42*time.Second, aux.Connection.ReconnectInterval) - assert.Equal(t, 4242, aux.Connection.Port) - assert.Equal(t, "", aux.Query.Consistency, "aux storage does not inherit consistency from primary") - assert.Equal(t, 3, aux.Connection.ProtoVersion) - assert.Equal(t, 42*time.Second, aux.Connection.SocketKeepAlive) } func TestDefaultTlsHostVerify(t *testing.T) { diff --git a/plugin/storage/factory.go b/plugin/storage/factory.go index 498339d49e4..4bb11fe0a56 100644 --- a/plugin/storage/factory.go +++ b/plugin/storage/factory.go @@ -116,7 +116,7 @@ func NewFactory(config FactoryConfig) (*Factory, error) { func (*Factory) getFactoryOfType(factoryType string) (storage.Factory, error) { switch factoryType { case cassandraStorageType: - return cassandra.NewFactory(), nil + return cassandra.NewFactory(false), nil case elasticsearchStorageType, opensearchStorageType: return es.NewFactory(false), nil case memoryStorageType: diff --git a/plugin/storage/integration/cassandra_test.go b/plugin/storage/integration/cassandra_test.go index c8f88d01452..9e4f477b0b3 100644 --- a/plugin/storage/integration/cassandra_test.go +++ b/plugin/storage/integration/cassandra_test.go @@ -42,7 +42,7 @@ func (s *CassandraStorageIntegration) cleanUp(t *testing.T) { func (*CassandraStorageIntegration) initializeCassandraFactory(t *testing.T, flags []string) *cassandra.Factory { logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) - f := cassandra.NewFactory() + f := cassandra.NewFactory(false) v, command := config.Viperize(f.AddFlags) require.NoError(t, command.ParseFlags(flags)) f.InitFromViper(v, logger) @@ -58,12 +58,6 @@ func (s *CassandraStorageIntegration) initializeCassandra(t *testing.T) { "--cassandra.password=" + password, "--cassandra.username=" + username, "--cassandra.keyspace=jaeger_v1_dc1", - "--cassandra-archive.keyspace=jaeger_v1_dc1_archive", - "--cassandra-archive.enabled=true", - "--cassandra-archive.servers=127.0.0.1", - "--cassandra-archive.basic.allowed-authenticators=org.apache.cassandra.auth.PasswordAuthenticator", - "--cassandra-archive.password=" + password, - "--cassandra-archive.username=" + username, }) s.factory = f var err error @@ -71,10 +65,6 @@ func (s *CassandraStorageIntegration) initializeCassandra(t *testing.T) { require.NoError(t, err) s.SpanReader, err = f.CreateSpanReader() require.NoError(t, err) - s.ArchiveSpanReader, err = f.CreateArchiveSpanReader() - require.NoError(t, err) - s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter() - require.NoError(t, err) s.SamplingStore, err = f.CreateSamplingStore(0) require.NoError(t, err) s.initializeDependencyReaderAndWriter(t, f) From c38541627851d01895a7eeed5c62f9c98b9afad1 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Fri, 8 Nov 2024 20:59:17 -0500 Subject: [PATCH 19/20] Fix Cassandra Archive Integration Test Signed-off-by: Mahad Zaryab --- plugin/storage/integration/cassandra_test.go | 26 +++++++++++++++++--- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/plugin/storage/integration/cassandra_test.go b/plugin/storage/integration/cassandra_test.go index 9e4f477b0b3..a7dbece85bf 100644 --- a/plugin/storage/integration/cassandra_test.go +++ b/plugin/storage/integration/cassandra_test.go @@ -21,7 +21,8 @@ import ( type CassandraStorageIntegration struct { StorageIntegration - factory *cassandra.Factory + factory *cassandra.Factory + archiveFactory *cassandra.Factory } func newCassandraStorageIntegration() *CassandraStorageIntegration { @@ -38,11 +39,15 @@ func newCassandraStorageIntegration() *CassandraStorageIntegration { func (s *CassandraStorageIntegration) cleanUp(t *testing.T) { require.NoError(t, s.factory.Purge(context.Background())) + require.NoError(t, s.archiveFactory.Purge(context.Background())) } -func (*CassandraStorageIntegration) initializeCassandraFactory(t *testing.T, flags []string) *cassandra.Factory { +func (*CassandraStorageIntegration) initializeCassandraFactory( + t *testing.T, + flags []string, + isArchive bool) *cassandra.Factory { logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) - f := cassandra.NewFactory(false) + f := cassandra.NewFactory(isArchive) v, command := config.Viperize(f.AddFlags) require.NoError(t, command.ParseFlags(flags)) f.InitFromViper(v, logger) @@ -58,13 +63,26 @@ func (s *CassandraStorageIntegration) initializeCassandra(t *testing.T) { "--cassandra.password=" + password, "--cassandra.username=" + username, "--cassandra.keyspace=jaeger_v1_dc1", - }) + }, false) + af := s.initializeCassandraFactory(t, []string{ + "--cassandra-archive.keyspace=jaeger_v1_dc1_archive", + "--cassandra-archive.enabled=true", + "--cassandra-archive.servers=127.0.0.1", + "--cassandra-archive.basic.allowed-authenticators=org.apache.cassandra.auth.PasswordAuthenticator", + "--cassandra-archive.password=" + password, + "--cassandra-archive.username=" + username, + }, true) s.factory = f + s.archiveFactory = af var err error s.SpanWriter, err = f.CreateSpanWriter() require.NoError(t, err) s.SpanReader, err = f.CreateSpanReader() require.NoError(t, err) + s.ArchiveSpanReader, err = af.CreateSpanReader() + require.NoError(t, err) + s.ArchiveSpanWriter, err = af.CreateSpanWriter() + require.NoError(t, err) s.SamplingStore, err = f.CreateSamplingStore(0) require.NoError(t, err) s.initializeDependencyReaderAndWriter(t, f) From 4194aabd52ad16288de489000560038752c4ffec Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sat, 9 Nov 2024 08:10:30 -0500 Subject: [PATCH 20/20] Fix Linter Signed-off-by: Mahad Zaryab --- plugin/storage/integration/cassandra_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/plugin/storage/integration/cassandra_test.go b/plugin/storage/integration/cassandra_test.go index a7dbece85bf..5c6fae7be26 100644 --- a/plugin/storage/integration/cassandra_test.go +++ b/plugin/storage/integration/cassandra_test.go @@ -45,7 +45,8 @@ func (s *CassandraStorageIntegration) cleanUp(t *testing.T) { func (*CassandraStorageIntegration) initializeCassandraFactory( t *testing.T, flags []string, - isArchive bool) *cassandra.Factory { + isArchive bool, +) *cassandra.Factory { logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) f := cassandra.NewFactory(isArchive) v, command := config.Viperize(f.AddFlags)