-
Notifications
You must be signed in to change notification settings - Fork 544
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
query-frontend: Allow for multiple topics in readConsistencyRoundTripper #10220
Conversation
4caad1c
to
6591673
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work, LGTM!
Please rebase with main
. I've done some small changes to reader metrics, but shouldn't be an issue to merge with your changes.
pkg/mimir/modules.go
Outdated
return t.QueryFrontendTopicOffsetsReader, nil | ||
ingestTopicOffsetsReader := ingest.NewTopicOffsetsReader(kafkaClient, t.Cfg.IngestStorage.KafkaConfig.Topic, getPartitionIDs, t.Cfg.IngestStorage.KafkaConfig.LastProducedOffsetPollInterval, t.Registerer, util_log.Logger) | ||
|
||
t.QueryFrontendTopicOffsetsReaders = map[string]*ingest.TopicOffsetsReader{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure how you will integrate in GEM, but I'm wondering if we should check if the t.QueryFrontendTopicOffsetsReaders
already exists and, if so, just add the entry to it, so that we don't overwrite the map in case GEM initialization happens earlier.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was planning to add the new aggregations topic offsets reader module as dependent on this QueryFrontendTopicOffsetsReaders
module, but it does seem safer to do the check you suggested - added in 68bc274.
pkg/storage/ingest/reader.go
Outdated
|
||
r.Service = services.NewBasicService(r.start, r.run, r.stop) | ||
return r, nil | ||
} | ||
|
||
func (r *PartitionReader) Topic() string { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nit] Is this ever used? I can't see it on a first sight.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're correct - accidentally left this from an abandoned attempt at adding the topic label to the read consistency metrics. Removed in d3c9d1d!
5e0a00c
to
d3c9d1d
Compare
Thanks for the review @pracucci! |
What this PR does
Adjust
readConsistencyRoundTripper
to take a map of header keys to offsets readers, allowing it to support the tracking of the produced offsets of multiple topics.Which issue(s) this PR fixes or relates to
Checklist
CHANGELOG.md
updated - the order of entries should be[CHANGE]
,[FEATURE]
,[ENHANCEMENT]
,[BUGFIX]
.about-versioning.md
updated with experimental features.