Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Refactor storage factories to hold one configuration #6156

Draft
wants to merge 21 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/jaeger/config-cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ extensions:
password: "cassandra"
tls:
insecure: true
is_archive: true
receivers:
otlp:
protocols:
Expand Down
1 change: 1 addition & 0 deletions cmd/jaeger/config-elasticsearch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ extensions:
elasticsearch:
indices:
index_prefix: "jaeger-archive"
is_archive: true

receivers:
otlp:
Expand Down
1 change: 1 addition & 0 deletions cmd/jaeger/config-opensearch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ extensions:
opensearch:
indices:
index_prefix: "jaeger-archive"
is_archive: true

receivers:
otlp:
Expand Down
12 changes: 0 additions & 12 deletions cmd/jaeger/internal/extension/jaegerquery/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,18 +279,6 @@ func TestServerAddArchiveStorage(t *testing.T) {
expectedOutput: "",
expectedErr: "cannot find archive storage factory: cannot find extension",
},
{
name: "Archive storage not supported",
config: &Config{
Storage: Storage{
TracesArchive: "badger",
},
},
qSvcOpts: &querysvc.QueryServiceOptions{},
extension: fakeStorageExt{},
expectedOutput: "Archive storage not supported by the factory",
expectedErr: "",
},
}

for _, tt := range tests {
Expand Down
20 changes: 4 additions & 16 deletions cmd/query/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,24 +92,12 @@ func TestBuildQueryServiceOptions(t *testing.T) {
require.NoError(t, err)
assert.NotNil(t, qOpts)

qSvcOpts := qOpts.BuildQueryServiceOptions(&mocks.Factory{}, zap.NewNop())
assert.NotNil(t, qSvcOpts)
assert.NotNil(t, qSvcOpts.Adjuster)
assert.Nil(t, qSvcOpts.ArchiveSpanReader)
assert.Nil(t, qSvcOpts.ArchiveSpanWriter)

comboFactory := struct {
*mocks.Factory
*mocks.ArchiveFactory
}{
&mocks.Factory{},
&mocks.ArchiveFactory{},
}
factory := &mocks.Factory{}

comboFactory.ArchiveFactory.On("CreateArchiveSpanReader").Return(&spanstore_mocks.Reader{}, nil)
comboFactory.ArchiveFactory.On("CreateArchiveSpanWriter").Return(&spanstore_mocks.Writer{}, nil)
factory.On("CreateSpanReader").Return(&spanstore_mocks.Reader{}, nil)
factory.On("CreateSpanWriter").Return(&spanstore_mocks.Writer{}, nil)

qSvcOpts = qOpts.BuildQueryServiceOptions(comboFactory, zap.NewNop())
qSvcOpts := qOpts.BuildQueryServiceOptions(factory, zap.NewNop())
assert.NotNil(t, qSvcOpts)
assert.NotNil(t, qSvcOpts.Adjuster)
assert.NotNil(t, qSvcOpts.ArchiveSpanReader)
Expand Down
17 changes: 2 additions & 15 deletions cmd/query/app/querysvc/query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,25 +128,12 @@ func (qs QueryService) GetCapabilities() StorageCapabilities {

// InitArchiveStorage tries to initialize archive storage reader/writer if storage factory supports them.
func (opts *QueryServiceOptions) InitArchiveStorage(storageFactory storage.Factory, logger *zap.Logger) bool {
archiveFactory, ok := storageFactory.(storage.ArchiveFactory)
if !ok {
logger.Info("Archive storage not supported by the factory")
return false
}
reader, err := archiveFactory.CreateArchiveSpanReader()
if errors.Is(err, storage.ErrArchiveStorageNotConfigured) || errors.Is(err, storage.ErrArchiveStorageNotSupported) {
logger.Info("Archive storage not created", zap.String("reason", err.Error()))
return false
}
reader, err := storageFactory.CreateSpanReader()
if err != nil {
logger.Error("Cannot init archive storage reader", zap.Error(err))
return false
}
writer, err := archiveFactory.CreateArchiveSpanWriter()
if errors.Is(err, storage.ErrArchiveStorageNotConfigured) || errors.Is(err, storage.ErrArchiveStorageNotSupported) {
logger.Info("Archive storage not created", zap.String("reason", err.Error()))
return false
}
writer, err := storageFactory.CreateSpanWriter()
if err != nil {
logger.Error("Cannot init archive storage writer", zap.Error(err))
return false
Expand Down
17 changes: 4 additions & 13 deletions cmd/query/app/querysvc/query_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,31 +315,22 @@ func (*fakeStorageFactory1) CreateSpanReader() (spanstore.Reader, error)
func (*fakeStorageFactory1) CreateSpanWriter() (spanstore.Writer, error) { return nil, nil }
func (*fakeStorageFactory1) CreateDependencyReader() (dependencystore.Reader, error) { return nil, nil }

func (f *fakeStorageFactory2) CreateArchiveSpanReader() (spanstore.Reader, error) { return f.r, f.rErr }
func (f *fakeStorageFactory2) CreateArchiveSpanWriter() (spanstore.Writer, error) { return f.w, f.wErr }
func (f *fakeStorageFactory2) CreateSpanReader() (spanstore.Reader, error) { return f.r, f.rErr }
func (f *fakeStorageFactory2) CreateSpanWriter() (spanstore.Writer, error) { return f.w, f.wErr }

var (
_ storage.Factory = new(fakeStorageFactory1)
_ storage.ArchiveFactory = new(fakeStorageFactory2)
_ storage.Factory = new(fakeStorageFactory1)
_ storage.Factory = new(fakeStorageFactory2)
)

func TestInitArchiveStorageErrors(t *testing.T) {
opts := &QueryServiceOptions{}
logger := zap.NewNop()

assert.False(t, opts.InitArchiveStorage(new(fakeStorageFactory1), logger))
assert.False(t, opts.InitArchiveStorage(
&fakeStorageFactory2{rErr: storage.ErrArchiveStorageNotConfigured},
logger,
))
assert.False(t, opts.InitArchiveStorage(
&fakeStorageFactory2{rErr: errors.New("error")},
logger,
))
assert.False(t, opts.InitArchiveStorage(
&fakeStorageFactory2{wErr: storage.ErrArchiveStorageNotConfigured},
logger,
))
assert.False(t, opts.InitArchiveStorage(
&fakeStorageFactory2{wErr: errors.New("error")},
logger,
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/token_propagation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func runQueryService(t *testing.T, esURL string) *Server {
flagsSvc := flags.NewService(ports.QueryAdminHTTP)
flagsSvc.Logger = zaptest.NewLogger(t)

f := es.NewFactory()
f := es.NewFactory(false)
v, command := config.Viperize(f.AddFlags)
require.NoError(t, command.ParseFlags([]string{
"--es.tls.enabled=false",
Expand Down
5 changes: 3 additions & 2 deletions cmd/remote-storage/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func NewServer(options *Options, storageFactory storage.Factory, tm *tenancy.Man
}, nil
}

func createGRPCHandler(f storage.Factory, logger *zap.Logger) (*shared.GRPCHandler, error) {
func createGRPCHandler(f storage.Factory, _ *zap.Logger) (*shared.GRPCHandler, error) {
reader, err := f.CreateSpanReader()
if err != nil {
return nil, err
Expand All @@ -78,7 +78,8 @@ func createGRPCHandler(f storage.Factory, logger *zap.Logger) (*shared.GRPCHandl
// borrow code from Query service for archive storage
qOpts := &querysvc.QueryServiceOptions{}
// when archive storage not initialized (returns false), the reader/writer will be nil
_ = qOpts.InitArchiveStorage(f, logger)
// TODO: what should we do here?
// _ = qOpts.InitArchiveStorage(f, logger)
Comment on lines +81 to +82
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro any thoughts on how we should handle this case here? previously, this wouldn't initialize the archive storage if the factory didn't implement the ArchiveStorage interface but now it always will.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be different from all other v1 binaries. If I run

SPAN_STORAGE_TYPE=cassandra go run ./cmd/remote-storage help

I see that it supports the same --cassandra.* and --cassandra-archive.* flags, so we have enough information to instantiate two separate factories, just like v1 all-in-one/query would have to do.

impl.ArchiveSpanReader = func() spanstore.Reader { return qOpts.ArchiveSpanReader }
impl.ArchiveSpanWriter = func() spanstore.Writer { return qOpts.ArchiveSpanWriter }

Expand Down
2 changes: 2 additions & 0 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ type Configuration struct {
Tags TagsAsFields `mapstructure:"tags_as_fields"`
// Enabled, if set to true, enables the namespace for storage pointed to by this configuration.
Enabled bool `mapstructure:"-"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this still used? Let's add some comments, because this is not the first time I have this question, and the flag doesn't really make sense to me.

// TODO: revisit if this needed
IsArchive bool `mapstructure:"is_archive"`
}

// TagsAsFields holds configuration for tag schema.
Expand Down
16 changes: 2 additions & 14 deletions plugin/storage/blackhole/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@ import (
"github.com/jaegertracing/jaeger/storage/spanstore"
)

var ( // interface comformance checks
_ storage.Factory = (*Factory)(nil)
_ storage.ArchiveFactory = (*Factory)(nil)
)
// interface comformance checks
var _ storage.Factory = (*Factory)(nil)

// Factory implements storage.Factory and creates blackhole storage components.
type Factory struct {
Expand Down Expand Up @@ -48,16 +46,6 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
return f.store, nil
}

// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
return f.store, nil
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
return f.store, nil
}

// CreateDependencyReader implements storage.Factory
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return f.store, nil
Expand Down
6 changes: 0 additions & 6 deletions plugin/storage/blackhole/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@ func TestStorageFactory(t *testing.T) {
writer, err := f.CreateSpanWriter()
require.NoError(t, err)
assert.Equal(t, f.store, writer)
reader, err = f.CreateArchiveSpanReader()
require.NoError(t, err)
assert.Equal(t, f.store, reader)
writer, err = f.CreateArchiveSpanWriter()
require.NoError(t, err)
assert.Equal(t, f.store, writer)
depReader, err := f.CreateDependencyReader()
require.NoError(t, err)
assert.Equal(t, f.store, depReader)
Expand Down
99 changes: 33 additions & 66 deletions plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ const (
var ( // interface comformance checks
_ storage.Factory = (*Factory)(nil)
_ storage.Purger = (*Factory)(nil)
_ storage.ArchiveFactory = (*Factory)(nil)
_ storage.SamplingStoreFactory = (*Factory)(nil)
_ io.Closer = (*Factory)(nil)
_ plugin.Configurable = (*Factory)(nil)
Expand All @@ -50,22 +49,26 @@ var ( // interface comformance checks
type Factory struct {
Options *Options

primaryMetricsFactory metrics.Factory
archiveMetricsFactory metrics.Factory
logger *zap.Logger
tracer trace.TracerProvider
metricsFactory metrics.Factory
logger *zap.Logger
tracer trace.TracerProvider

primaryConfig config.SessionBuilder
primarySession cassandra.Session
archiveConfig config.SessionBuilder
archiveSession cassandra.Session
config config.SessionBuilder
session cassandra.Session
}

// NewFactory creates a new Factory.
func NewFactory() *Factory {
func NewFactory(isArchive bool) *Factory {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: let's not pass a flag but have a 2nd constructor NewArchiveFactory. Because NewFactory(true) has poor readability.

var options *Options
if isArchive {
options = NewOptions(archiveStorageConfig)
options.IsArchive = true
} else {
options = NewOptions(primaryStorageConfig)
}
return &Factory{
tracer: otel.GetTracerProvider(),
Options: NewOptions(primaryStorageConfig, archiveStorageConfig),
Options: options,
}
}

Expand All @@ -75,7 +78,7 @@ func NewFactoryWithConfig(
metricsFactory metrics.Factory,
logger *zap.Logger,
) (*Factory, error) {
f := NewFactory()
f := NewFactory(opts.IsArchive)
// use this to help with testing
b := &withConfigBuilder{
f: f,
Expand Down Expand Up @@ -121,43 +124,30 @@ func (f *Factory) InitFromViper(v *viper.Viper, _ *zap.Logger) {
// InitFromOptions initializes factory from options.
func (f *Factory) configureFromOptions(o *Options) {
f.Options = o
// TODO this is a hack because we do not define defaults in Options
if o.others == nil {
o.others = make(map[string]*NamespaceConfig)
}
f.primaryConfig = o.GetPrimary()
if cfg := f.Options.Get(archiveStorageConfig); cfg != nil {
f.archiveConfig = cfg // this is so stupid - see https://golang.org/doc/faq#nil_error
}
f.config = o.GetPrimary()
}

// Initialize implements storage.Factory
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.primaryMetricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "cassandra", Tags: nil})
f.archiveMetricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "cassandra-archive", Tags: nil})
metricsNamespace := "cassandra"
if f.Options.IsArchive {
metricsNamespace = "cassandra-archive"
}
f.metricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: metricsNamespace, Tags: nil})
f.logger = logger

primarySession, err := f.primaryConfig.NewSession()
primarySession, err := f.config.NewSession()
if err != nil {
return err
}
f.primarySession = primarySession

if f.archiveConfig != nil {
archiveSession, err := f.archiveConfig.NewSession()
if err != nil {
return err
}
f.archiveSession = archiveSession
} else {
logger.Info("Cassandra archive storage configuration is empty, skipping")
}
f.session = primarySession

return nil
}

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
return cSpanStore.NewSpanReader(f.primarySession, f.primaryMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader"))
return cSpanStore.NewSpanReader(f.session, f.metricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader"))
}

// CreateSpanWriter implements storage.Factory
Expand All @@ -166,33 +156,13 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
if err != nil {
return nil, err
}
return cSpanStore.NewSpanWriter(f.primarySession, f.Options.SpanStoreWriteCacheTTL, f.primaryMetricsFactory, f.logger, options...)
return cSpanStore.NewSpanWriter(f.session, f.Options.SpanStoreWriteCacheTTL, f.metricsFactory, f.logger, options...)
}

// CreateDependencyReader implements storage.Factory
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
version := cDepStore.GetDependencyVersion(f.primarySession)
return cDepStore.NewDependencyStore(f.primarySession, f.primaryMetricsFactory, f.logger, version)
}

// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
if f.archiveSession == nil {
return nil, storage.ErrArchiveStorageNotConfigured
}
return cSpanStore.NewSpanReader(f.archiveSession, f.archiveMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader"))
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
if f.archiveSession == nil {
return nil, storage.ErrArchiveStorageNotConfigured
}
options, err := writerOptions(f.Options)
if err != nil {
return nil, err
}
return cSpanStore.NewSpanWriter(f.archiveSession, f.Options.SpanStoreWriteCacheTTL, f.archiveMetricsFactory, f.logger, options...)
version := cDepStore.GetDependencyVersion(f.session)
return cDepStore.NewDependencyStore(f.session, f.metricsFactory, f.logger, version)
}

// CreateLock implements storage.SamplingStoreFactory
Expand All @@ -203,12 +173,12 @@ func (f *Factory) CreateLock() (distributedlock.Lock, error) {
}
f.logger.Info("Using unique participantName in the distributed lock", zap.String("participantName", hostId))

return cLock.NewLock(f.primarySession, hostId), nil
return cLock.NewLock(f.session, hostId), nil
}

// CreateSamplingStore implements storage.SamplingStoreFactory
func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store, error) {
return cSamplingStore.New(f.primarySession, f.primaryMetricsFactory, f.logger), nil
return cSamplingStore.New(f.session, f.metricsFactory, f.logger), nil
}

func writerOptions(opts *Options) ([]cSpanStore.Option, error) {
Expand Down Expand Up @@ -244,16 +214,13 @@ var _ io.Closer = (*Factory)(nil)

// Close closes the resources held by the factory
func (f *Factory) Close() error {
if f.primarySession != nil {
f.primarySession.Close()
}
if f.archiveSession != nil {
f.archiveSession.Close()
if f.session != nil {
f.session.Close()
}

return nil
}

func (f *Factory) Purge(_ context.Context) error {
return f.primarySession.Query("TRUNCATE traces").Exec()
return f.session.Query("TRUNCATE traces").Exec()
}
Loading
Loading