promote to release branch #92

Open · wants to merge 9 commits into release
5 changes: 3 additions & 2 deletions src/cmd/services/m3query/config/config.go
@@ -856,6 +856,7 @@ type PrometheusRemoteBackendConfiguration struct {
PoolSize int `yaml:"poolSize" validate:"min=1"`
Retries int `yaml:"retries" validate:"min=0"`
TickDuration *time.Duration `yaml:"tickDuration"`
EnqueueTimeout *time.Duration `yaml:"enqueueTimeout"`
}

type PrometheusRemoteBackendEndpointHeader struct {
@@ -1007,6 +1008,6 @@ type MultiProcessConfiguration struct {
type QueryShadowingConfiguration struct {
// Query paths like "/api/v1/query" are not included.
// No trailing slash.
ShadowQueryURL string `yaml:"shadowQueryURL"`
QueryShadowingWorkers int `yaml:"queryShadowingWorkers" validate:"nonzero,min=1"`
}
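
For reference, a minimal Go sketch (not part of this diff) of how the new enqueueTimeout knob sits alongside tickDuration in the backend configuration; the durationPtr helper and the concrete values are illustrative only:

```go
package main

import (
	"fmt"
	"time"
)

// Mirrors the fields touched in this hunk; tags are copied from the diff.
type PrometheusRemoteBackendConfiguration struct {
	PoolSize       int            `yaml:"poolSize" validate:"min=1"`
	Retries        int            `yaml:"retries" validate:"min=0"`
	TickDuration   *time.Duration `yaml:"tickDuration"`
	EnqueueTimeout *time.Duration `yaml:"enqueueTimeout"` // added in this PR
}

// durationPtr is a small illustrative helper for taking the address of a duration literal.
func durationPtr(d time.Duration) *time.Duration { return &d }

func main() {
	cfg := PrometheusRemoteBackendConfiguration{
		PoolSize:       8,
		Retries:        2,
		TickDuration:   durationPtr(5 * time.Second),
		EnqueueTimeout: durationPtr(100 * time.Millisecond),
	}
	fmt.Printf("%+v\n", cfg)
}
```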
8 changes: 6 additions & 2 deletions src/query/storage/promremote/options.go
@@ -135,6 +135,7 @@ func NewOptions(
tenantDefault: cfg.TenantDefault,
tenantRules: tenantRules,
tickDuration: cfg.TickDuration,
queueTimeout: cfg.EnqueueTimeout,
}, nil
}

@@ -163,8 +164,11 @@ func validateBackendConfiguration(cfg *config.PrometheusRemoteBackendConfigurati
if cfg.ConnectTimeout != nil && *cfg.ConnectTimeout < 0 {
return errors.New("connectTimeout can't be negative")
}
if cfg.TickDuration != nil && *cfg.TickDuration < 0 {
return errors.New("tickDuration can't be negative")
if cfg.TickDuration != nil && *cfg.TickDuration <= 0 {
return errors.New("tickDuration can't be non positive")
}
if cfg.EnqueueTimeout != nil && *cfg.EnqueueTimeout <= 0 {
return errors.New("enqueueTimeout can't be non positive")
}
requireTenantHeader := strings.TrimSpace(cfg.TenantDefault) != ""
seenNames := map[string]struct{}{}
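
The tightened checks above reject explicit zero values as well as negative ones, while a nil pointer still means the field was simply omitted. A standalone sketch of that rule (not the package's actual helper; validatePositiveDuration is a hypothetical name):

```go
package main

import (
	"fmt"
	"time"
)

// validatePositiveDuration mirrors the rule used above: nil is allowed
// (field omitted), but an explicit value must be strictly positive.
func validatePositiveDuration(name string, d *time.Duration) error {
	if d != nil && *d <= 0 {
		return fmt.Errorf("%s must be positive", name)
	}
	return nil
}

func main() {
	zero := time.Duration(0)
	tick := 5 * time.Second

	fmt.Println(validatePositiveDuration("tickDuration", nil))     // <nil>: omitted, default applies
	fmt.Println(validatePositiveDuration("tickDuration", &tick))   // <nil>: valid
	fmt.Println(validatePositiveDuration("enqueueTimeout", &zero)) // error: rejected
}
```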
167 changes: 120 additions & 47 deletions src/query/storage/promremote/storage.go
@@ -118,6 +118,54 @@ func (wq *WriteQueue) Flush(ctx context.Context, p *promStorage) {
}
}

// deadLetterQueue buffers samples that timed out while being enqueued on the main data queue.
// Samples in the dead letter queue are flushed to the remote endpoint at the next tick.
// If the dead letter queue is full, new samples are dropped to avoid cascading failures and retry storms.
type deadLetterQueue struct {
capacity int
queries []*storage.WriteQuery

sync.Mutex
}

func newDeadLetterQueue(logger *zap.Logger, capacity int) *deadLetterQueue {
logger.Info("Creating a new dead letter queue", zap.Int("capacity", capacity))
return &deadLetterQueue{
capacity: capacity,
queries: make([]*storage.WriteQuery, 0, capacity),
}
}

func (dlq *deadLetterQueue) size() int {
dlq.Lock()
defer dlq.Unlock()
return len(dlq.queries)
}

func (dlq *deadLetterQueue) add(query *storage.WriteQuery) error {
if query == nil {
return nil
}
dlq.Lock()
defer dlq.Unlock()
if len(dlq.queries) < dlq.capacity {
dlq.queries = append(dlq.queries, query)
return nil
}
return errors.New("dead letter queue is full")
}

func (dlq *deadLetterQueue) flush(p *promStorage, ctx context.Context, wg *sync.WaitGroup, pendingQuery map[tenantKey]*WriteQueue) {
dlq.Lock()
defer dlq.Unlock()
p.dlqSize.Update(float64(len(dlq.queries)))
for _, query := range dlq.queries {
p.appendSample(ctx, wg, pendingQuery, query)
}
dlq.queries = dlq.queries[:0] // empty the queue
}
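
A self-contained sketch of this dead letter queue's overflow contract, using simplified stand-in types rather than the real storage.WriteQuery and metrics: add succeeds until capacity is reached, after which the caller is told to drop the sample.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type sample struct{ name string }

// dlq is a simplified stand-in for the deadLetterQueue in this diff.
type dlq struct {
	capacity int
	queries  []*sample
	sync.Mutex
}

func (d *dlq) add(q *sample) error {
	d.Lock()
	defer d.Unlock()
	if len(d.queries) < d.capacity {
		d.queries = append(d.queries, q)
		return nil
	}
	return errors.New("dead letter queue is full")
}

func main() {
	q := &dlq{capacity: 2, queries: make([]*sample, 0, 2)}
	for i := 0; i < 3; i++ {
		if err := q.add(&sample{name: fmt.Sprintf("s%d", i)}); err != nil {
			fmt.Println("dropped:", err) // the third sample overflows and is dropped
		}
	}
}
```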

func validateOptions(opts Options) error {
if opts.poolSize < 1 {
return errors.New("poolSize must be greater than 0")
@@ -155,6 +203,9 @@ func NewStorage(opts Options) (storage.Storage, error) {
queriesWithFixedTenants[tenant] = NewWriteQueue(tenant, opts.queueSize)
}
}
// Size the data queue generously so samples are not dropped while the write loop drains it.
dataQueueCapacity := (opts.retries + 1) * len(opts.tenantRules) * opts.queueSize
opts.logger.Info("Creating data queue", zap.Int("capacity", dataQueueCapacity))
s := &promStorage{
opts: opts,
client: client,
@@ -163,6 +214,7 @@ func NewStorage(opts Options) (storage.Storage, error) {
enqueuedSamples: scope.Counter("enqueued_samples"),
writtenSamples: scope.Counter("written_samples"),
droppedSamples: scope.Counter("dropped_samples"),
failedSamples: scope.Counter("failed_samples"),
inFlightSamples: scope.Gauge("in_flight_samples"),
batchWrites: scope.Counter("batch_writes"),
tickWrites: scope.Counter("tick_writes"),
@@ -171,13 +223,15 @@ func NewStorage(opts Options) (storage.Storage, error) {
retryWrites: scope.Counter("retry_writes"),
dupWrites: scope.Counter("duplicate_writes"),
logger: opts.logger,
dataQueue: make(chan *storage.WriteQuery, len(opts.tenantRules)*opts.queueSize),
dataQueue: make(chan *storage.WriteQuery, dataQueueCapacity),
dataQueueSize: scope.Gauge("data_queue_size"),
dlq: newDeadLetterQueue(opts.logger, dataQueueCapacity),
dlqSize: scope.Gauge("dead_letter_queue_size"),
workerPool: xsync.NewWorkerPool(opts.poolSize),
pendingQuery: queriesWithFixedTenants,
writeLoopDone: make(chan struct{}),
}
s.startAsync()
// Pass queriesWithFixedTenants to the write loop directly; the map is not concurrency safe
// and should only be accessed from that goroutine.
s.startAsync(queriesWithFixedTenants)
opts.logger.Info("Prometheus remote write storage created", zap.Int("num_tenants", len(queriesWithFixedTenants)))
return s, nil
}
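
A worked example of the data queue sizing above, with illustrative numbers only (the real values come from configuration):

```go
package main

import "fmt"

func main() {
	// capacity = (retries + 1) * number of tenant rules * per-tenant queue size
	retries, tenantRules, queueSize := 2, 10, 1000
	capacity := (retries + 1) * tenantRules * queueSize
	fmt.Println(capacity) // 30000
}
```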
@@ -193,6 +247,7 @@ type promStorage struct {
enqueuedSamples tally.Counter
writtenSamples tally.Counter
droppedSamples tally.Counter
failedSamples tally.Counter
inFlightSamples tally.Gauge
inFlightSampleValue atomic.Int64
// writes are # of http requests to downstream remote endpoints
@@ -205,8 +260,9 @@ type promStorage struct {
logger *zap.Logger
dataQueue chan *storage.WriteQuery
dataQueueSize tally.Gauge
dlq *deadLetterQueue
dlqSize tally.Gauge
workerPool xsync.WorkerPool
pendingQuery map[tenantKey]*WriteQueue
writeLoopDone chan struct{}
}

@@ -221,9 +277,34 @@ func (p *promStorage) getTenant(query *storage.WriteQuery) tenantKey {
return tenantKey(p.opts.tenantDefault)
}

func (p *promStorage) flushPendingQueues(ctx context.Context, wg *sync.WaitGroup) int {
func (p *promStorage) appendSample(ctx context.Context, wg *sync.WaitGroup, pendingQuery map[tenantKey]*WriteQueue, query *storage.WriteQuery) {
t := p.getTenant(query)
if _, ok := pendingQuery[t]; !ok {
p.droppedWrites.Inc(1)
p.logger.Error("no pre-defined tenant found, dropping it",
zap.String("tenant", string(t)),
zap.String("defaultTenant", p.opts.tenantDefault),
zap.String("timeseries", query.String()))
return
}
if dataBatch := pendingQuery[t].Add(query); dataBatch != nil {
p.batchWrites.Inc(1)
wg.Add(1)
p.workerPool.Go(func() {
defer wg.Done()
if err := p.writeBatch(ctx, t, dataBatch); err != nil {
p.logger.Error("error writing async batch",
zap.String("tenant", string(t)),
zap.Error(err))
}
})
}
}

func (p *promStorage) flushPendingQueues(ctx context.Context, wg *sync.WaitGroup, pendingQuery map[tenantKey]*WriteQueue) int {
numWrites := 0
for _, queue := range p.pendingQuery {
p.dlq.flush(p, ctx, wg, pendingQuery)
for _, queue := range pendingQuery {
if queue.Len() == 0 {
continue
}
@@ -239,7 +320,7 @@ func (p *promStorage) flushPendingQueues(ctx context.Context, wg *sync.WaitGroup
return numWrites
}

func (p *promStorage) writeLoop() {
func (p *promStorage) writeLoop(pendingQuery map[tenantKey]*WriteQueue) {
// This function ensures that all pending writes are flushed before returning.
ctxForWrites, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -257,35 +338,15 @@ func (p *promStorage) writeLoop() {
// This breaks out select instead of the for loop.
break
}
t := p.getTenant(query)
if _, ok := p.pendingQuery[t]; !ok {
p.droppedWrites.Inc(1)
p.logger.Error("no pre-defined tenant found, dropping it",
zap.String("tenant", string(t)),
zap.String("defaultTenant", p.opts.tenantDefault),
zap.String("timeseries", query.String()))
continue
}
if dataBatch := p.pendingQuery[t].Add(query); dataBatch != nil {
p.batchWrites.Inc(1)
wg.Add(1)
p.workerPool.Go(func() {
defer wg.Done()
if err := p.writeBatch(ctxForWrites, t, dataBatch); err != nil {
p.logger.Error("error writing async batch",
zap.String("tenant", string(t)),
zap.Error(err))
}
})
}
p.appendSample(ctxForWrites, &wg, pendingQuery, query)
break
case <-ticker.C:
p.flushPendingQueues(ctxForWrites, &wg)
p.flushPendingQueues(ctxForWrites, &wg, pendingQuery)
}
}
// At this point, `p.dataQueue` is drained and closed.
p.logger.Info("Draining pending per-tenant write queues")
numWrites := p.flushPendingQueues(ctxForWrites, &wg)
numWrites := p.flushPendingQueues(ctxForWrites, &wg, pendingQuery)
p.logger.Info("Waiting for all async pending writes to finish",
zap.Int("numWrites", numWrites))
// Block until all pending writes are flushed because we don't want to lose any data.
@@ -295,13 +356,13 @@ func (p *promStorage) writeLoop() {
p.writeLoopDone <- struct{}{}
}

func (p *promStorage) startAsync() {
func (p *promStorage) startAsync(pendingQuery map[tenantKey]*WriteQueue) {
p.logger.Info("Start prometheus remote write storage async job",
zap.Int("queueSize", p.opts.queueSize),
zap.Int("poolSize", p.opts.poolSize))
go func() {
p.logger.Info("Starting the write loop")
p.writeLoop()
p.writeLoop(pendingQuery)
}()
}

@@ -356,23 +417,33 @@ func (p *promStorage) Write(_ context.Context, query *storage.WriteQuery) error
queryCopy, err := storage.NewWriteQuery(deepCopy(query.Options()))
if err != nil {
p.droppedSamples.Inc(samples)
p.logger.Error("error copying write", zap.Error(err),
zap.String("write", query.String()))
return err
p.logger.Error("error copying write", zap.Error(err), zap.String("write", query.String()))
return nil
}
query = queryCopy
}
p.dataQueue <- query
// The data is enqueued successfully.
p.enqueuedSamples.Inc(samples)
p.inFlightSamples.Update(float64(p.inFlightSampleValue.Add(samples)))
p.dataQueueSize.Update(float64(len(p.dataQueue)))

select {
case p.dataQueue <- query:
// The data is enqueued successfully.
p.enqueuedSamples.Inc(samples)
p.inFlightSamples.Update(float64(p.inFlightSampleValue.Add(samples)))
p.dataQueueSize.Update(float64(len(p.dataQueue)))
case <-time.After(*p.opts.queueTimeout):
err := p.dlq.add(query)
if err != nil {
p.droppedSamples.Inc(samples)
if rand.Float32() < logSamplingRate {
p.logger.Error("error enqueue samples for prom remote write", zap.Error(err),
zap.String("data", query.String()))
}
}
}
return nil
}
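
A minimal sketch of the bounded-enqueue pattern Write now uses, assuming opts.queueTimeout has been defaulted to a non-nil value by the options layer; the channel of int and the overflow slice stand in for the data queue and the dead letter queue:

```go
package main

import (
	"fmt"
	"time"
)

// enqueueWithTimeout tries to hand a value to the main queue, but gives up
// after timeout and diverts it to an overflow buffer instead of blocking.
func enqueueWithTimeout(queue chan<- int, overflow *[]int, v int, timeout time.Duration) bool {
	select {
	case queue <- v:
		return true // enqueued on the main queue
	case <-time.After(timeout):
		*overflow = append(*overflow, v) // diverted; the caller decides what to drop on overflow
		return false
	}
}

func main() {
	queue := make(chan int, 1)
	var overflow []int

	fmt.Println(enqueueWithTimeout(queue, &overflow, 1, 10*time.Millisecond)) // true
	fmt.Println(enqueueWithTimeout(queue, &overflow, 2, 10*time.Millisecond)) // false: queue is full
	fmt.Println(overflow)                                                     // [2]
}
```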

func (p *promStorage) writeBatch(ctx context.Context, tenant tenantKey, queries []*storage.WriteQuery) error {
logSampling := rand.Float32()
if logSampling < logSamplingRate {
if rand.Float32() < logSamplingRate {
p.logger.Debug("async write batch",
zap.String("tenant", string(tenant)),
zap.Int("size", len(queries)))
@@ -388,7 +459,7 @@ func (p *promStorage) writeBatch(ctx context.Context, tenant tenantKey, queries
p.inFlightSamples.Update(float64(p.inFlightSampleValue.Add(-sampleCount)))
if err != nil {
p.errWrites.Inc(1)
p.droppedSamples.Inc(sampleCount)
p.failedSamples.Inc(sampleCount)
return err
}

Expand All @@ -399,7 +470,7 @@ func (p *promStorage) writeBatch(ctx context.Context, tenant tenantKey, queries
err = p.write(ctx, metrics, endpoint, tenant, bytes.NewReader(encoded))
if err != nil {
p.errWrites.Inc(1)
p.droppedSamples.Inc(sampleCount)
p.failedSamples.Inc(sampleCount)
} else {
p.writtenSamples.Inc(sampleCount)
}
@@ -472,9 +543,11 @@ func (p *promStorage) write(
err = nil
break
}
p.retryWrites.Inc(1)
time.Sleep(backoff)
backoff *= 2
if i > 0 { // only do backoff if we are going to retry
p.retryWrites.Inc(1)
time.Sleep(backoff)
backoff *= 2
}
}
methodDuration := time.Since(start)
metrics.RecordResponse(status, methodDuration)
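
A sketch of the retry shape after the change above: sleep and double the backoff only when another attempt remains, so the final failure returns immediately. withRetries and doRequest are hypothetical names, not the package's API:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

func withRetries(attempts int, initialBackoff time.Duration, doRequest func() error) error {
	backoff := initialBackoff
	var err error
	for i := attempts; i > 0; i-- {
		if err = doRequest(); err == nil {
			return nil
		}
		if i > 1 { // only back off if we are going to retry
			time.Sleep(backoff)
			backoff *= 2
		}
	}
	return err
}

func main() {
	err := withRetries(3, 10*time.Millisecond, func() error {
		return errors.New("remote endpoint unavailable")
	})
	fmt.Println(err) // remote endpoint unavailable
}
```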