-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix es integration test 6094 #6157
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -7,6 +7,7 @@ package spanstore | |||
import ( | ||||
"context" | ||||
"fmt" | ||||
"sync" | ||||
"time" | ||||
|
||||
"go.uber.org/zap" | ||||
|
@@ -21,10 +22,11 @@ import ( | |||
) | ||||
|
||||
const ( | ||||
spanType = "span" | ||||
serviceType = "service" | ||||
serviceCacheTTLDefault = 12 * time.Hour | ||||
indexCacheTTLDefault = 48 * time.Hour | ||||
spanType = "span" | ||||
serviceType = "service" | ||||
serviceCacheTTLDefault = 12 * time.Hour | ||||
indexCacheTTLDefault = 48 * time.Hour | ||||
defaultIndexWaitTimeout = 60 * time.Second | ||||
) | ||||
|
||||
type spanWriterMetrics struct { | ||||
|
@@ -42,6 +44,63 @@ type SpanWriter struct { | |||
serviceWriter serviceWriter | ||||
spanConverter dbmodel.FromDomain | ||||
spanServiceIndex spanAndServiceIndexFn | ||||
indexCache sync.Map | ||||
} | ||||
|
||||
func (s *SpanWriter) ensureIndex(ctx context.Context, indexName string) error { | ||||
if _, exists := s.indexCache.Load(indexName); exists { | ||||
return nil | ||||
} | ||||
|
||||
Comment on lines
+51
to
+54
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the following statement looks like a CAS operation, so why do you need this one above? |
||||
_, loaded := s.indexCache.LoadOrStore(indexName, struct{}{}) | ||||
if loaded { | ||||
return nil | ||||
} | ||||
|
||||
exists, err := s.client().IndexExists(indexName).Do(ctx) | ||||
if err != nil { | ||||
return fmt.Errorf("failed to check index existence: %w", err) | ||||
} | ||||
|
||||
if !exists { | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shortcircuit
|
||||
s.logger.Info("Creating index", zap.String("index", indexName)) | ||||
|
||||
// Set specific settings for the test environment | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||||
body := `{ | ||||
"settings": { | ||||
"number_of_shards": 1, | ||||
"number_of_replicas": 0, | ||||
"index.write.wait_for_active_shards": 1 | ||||
} | ||||
}` | ||||
|
||||
_, err = s.client().CreateIndex(indexName).Body(body).Do(ctx) | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the writer should not be responsible for creating indices. The indices are created by ES automatically from templates on writes. The templates are created by the factory: jaeger/plugin/storage/es/factory.go Line 279 in 61ad598
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see, now i understand the core issue better:
Would this be a better approach?
something like, passing context in createSpanWriter and in Create Templates function we can handle the context.. I am taling about these 2 functions, if thinking in correct direction shall i proceed and test? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Visualizing like this (What do you think, shall i proceed with this?) I really think by passing context we can mititgate the issue. Before: After: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, we want the sequence in the After part, we just need to enforce that the templates finished creating before returning the writer. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. made a new PR, #6229 |
||||
if err != nil { | ||||
return fmt.Errorf("failed to create index with settings: %w", err) | ||||
} | ||||
s.logger.Info("Index created with settings", | ||||
zap.String("index", indexName), | ||||
zap.String("settings", body)) | ||||
} | ||||
|
||||
// Wait for index to be ready by checking its existence repeatedly | ||||
deadline := time.Now().Add(defaultIndexWaitTimeout) | ||||
start := time.Now() | ||||
for time.Now().Before(deadline) { | ||||
exists, err := s.client().IndexExists(indexName).Do(ctx) | ||||
if err == nil && exists { | ||||
s.logger.Info("Index is ready", | ||||
zap.String("index", indexName), | ||||
zap.Duration("took", time.Since(start))) | ||||
return nil | ||||
} | ||||
s.logger.Debug("Waiting for index to be ready", | ||||
zap.String("index", indexName), | ||||
zap.Duration("elapsed", time.Since(start))) | ||||
time.Sleep(time.Second) | ||||
} | ||||
|
||||
return fmt.Errorf("timeout waiting for index %s to be ready", indexName) | ||||
} | ||||
|
||||
// SpanWriterParams holds constructor parameters for NewSpanWriter | ||||
|
@@ -121,14 +180,49 @@ func getSpanAndServiceIndexFn(p SpanWriterParams) spanAndServiceIndexFn { | |||
} | ||||
|
||||
// WriteSpan writes a span and its corresponding service:operation in ElasticSearch | ||||
func (s *SpanWriter) WriteSpan(_ context.Context, span *model.Span) error { | ||||
func (s *SpanWriter) WriteSpan(ctx context.Context, span *model.Span) error { | ||||
spanIndexName, serviceIndexName := s.spanServiceIndex(span.StartTime) | ||||
|
||||
// Ensure indices exist before writing | ||||
if err := s.ensureIndex(ctx, spanIndexName); err != nil { | ||||
return fmt.Errorf("failed to ensure span index: %w", err) | ||||
} | ||||
if serviceIndexName != "" { | ||||
if err := s.ensureIndex(ctx, serviceIndexName); err != nil { | ||||
return fmt.Errorf("failed to ensure service index: %w", err) | ||||
} | ||||
} | ||||
|
||||
jsonSpan := s.spanConverter.FromDomainEmbedProcess(span) | ||||
if serviceIndexName != "" { | ||||
s.writeService(serviceIndexName, jsonSpan) | ||||
} | ||||
s.writeSpan(spanIndexName, jsonSpan) | ||||
s.logger.Debug("Wrote span to ES index", zap.String("index", spanIndexName)) | ||||
|
||||
// Write with retries | ||||
var lastErr error | ||||
for i := 0; i < 3; i++ { | ||||
err := s.writeSpanWithResult(ctx, spanIndexName, jsonSpan) | ||||
if err == nil { | ||||
return nil | ||||
} | ||||
lastErr = err | ||||
s.logger.Debug("Retrying span write", | ||||
zap.String("index", spanIndexName), | ||||
zap.Int("attempt", i+1), | ||||
zap.Error(lastErr)) | ||||
time.Sleep(time.Duration(i+1) * 100 * time.Millisecond) | ||||
} | ||||
|
||||
return fmt.Errorf("failed to write span after retries: %w", lastErr) | ||||
} | ||||
|
||||
func (s *SpanWriter) writeSpanWithResult(_ context.Context, indexName string, jsonSpan *dbmodel.Span) error { | ||||
indexService := s.client().Index(). | ||||
Index(indexName). | ||||
Type(spanType). | ||||
BodyJson(jsonSpan) | ||||
|
||||
indexService.Add() | ||||
return nil | ||||
} | ||||
|
||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please delete from 43