diff --git a/README.md b/README.md index 4d192a3..71a54bc 100644 --- a/README.md +++ b/README.md @@ -286,6 +286,8 @@ For details of each metric type, see [Prometheus documentation](http://prometheu - `type`: metric type (required) - `desc`: description of this metric (required) - `key`: key name of record for instrumentation (**optional**) +- `retention`: time in seconds to remove a metric after not being updated (optional). See [Retention](#retention) +- `retention_check_interval`: time in seconds to check for expired metrics (optional). Has no effect when `retention` not set. See [Retention](#retention) - ``: additional labels for this metric (optional). See [Labels](#labels) If key is empty, the metric values is treated as 1, so the counter increments by 1 on each record regardless of contents of the record. @@ -310,6 +312,8 @@ If key is empty, the metric values is treated as 1, so the counter increments by - `type`: metric type (required) - `desc`: description of metric (required) - `key`: key name of record for instrumentation (required) +- `retention`: time in seconds to remove a metric after not being updated (optional). See [Retention](#retention) +- `retention_check_interval`: time in seconds to check for expired metrics (optional). Has no effect when `retention` not set. See [Retention](#retention) - ``: additional labels for this metric (optional). See [Labels](#labels) ### summary type @@ -332,6 +336,8 @@ If key is empty, the metric values is treated as 1, so the counter increments by - `type`: metric type (required) - `desc`: description of metric (required) - `key`: key name of record for instrumentation (required) +- `retention`: time in seconds to remove a metric after not being updated (optional). See [Retention](#retention) +- `retention_check_interval`: time in seconds to check for expired metrics (optional). Has no effect when `retention` not set. See [Retention](#retention) - ``: additional labels for this metric (optional). See [Labels](#labels) ### histogram type @@ -356,6 +362,8 @@ If key is empty, the metric values is treated as 1, so the counter increments by - `desc`: description of metric (required) - `key`: key name of record for instrumentation (required) - `buckets`: buckets of record for instrumentation (optional) +- `retention`: time in seconds to remove a metric after not being updated (optional). See [Retention](#retention) +- `retention_check_interval`: time in seconds to check for expired metrics (optional). Has no effect when `retention` not set. See [Retention](#retention) - ``: additional labels for this metric (optional). See [Labels](#labels) ## Labels @@ -430,6 +438,33 @@ Prometheus output/filter plugin can have multiple metric section. Top-level labe In this case, `message_foo_counter` has `tag`, `hostname`, `key` and `data_type` labels. +## Retention + +By default metrics with all encountered label combinations are preserved until the next restart of fluentd. +Even if a label combination did not receive any update for a long time. +That behavior is not always desirable e.g. when the contents of of fields change for good and the metric becomes idle. +For these metrics you can set `retention` and `retention_check_interval` like this: + +``` + + name message_foo_counter + type counter + desc The total number of foo in message. + key foo + retention 3600 # 1h + retention_check_interval 1800 # 30m + + bar ${bar} + + +``` + +If `${bar}` was `baz` one time but after that no records with that value were processed, then after one hour the metric +`foo{bar="baz"}` might be removed. +When this actually happens depends on `retention_check_interval` (default 60). +It causes a background thread to check every 30 minutes for expired metrics. +So worst case the metrics are removed 30 minutes after expiration. +You can set this value as low as `1`, but that may put more stress on your CPU. ## Try plugin with nginx diff --git a/lib/fluent/plugin/filter_prometheus.rb b/lib/fluent/plugin/filter_prometheus.rb index ccdfe78..0c47c05 100644 --- a/lib/fluent/plugin/filter_prometheus.rb +++ b/lib/fluent/plugin/filter_prometheus.rb @@ -7,6 +7,8 @@ class PrometheusFilter < Fluent::Plugin::Filter include Fluent::Plugin::PrometheusLabelParser include Fluent::Plugin::Prometheus + helpers :thread + def initialize super @registry = ::Prometheus::Client.registry @@ -22,6 +24,17 @@ def configure(conf) @metrics = Fluent::Plugin::Prometheus.parse_metrics_elements(conf, @registry, labels) end + def start + super + Fluent::Plugin::Prometheus.start_retention_threads( + @metrics, + @registry, + method(:thread_create), + method(:thread_current_running?), + @log + ) + end + def filter(tag, time, record) instrument_single(tag, time, record, @metrics) record diff --git a/lib/fluent/plugin/out_prometheus.rb b/lib/fluent/plugin/out_prometheus.rb index cdaae4d..ceb1fe7 100644 --- a/lib/fluent/plugin/out_prometheus.rb +++ b/lib/fluent/plugin/out_prometheus.rb @@ -7,6 +7,8 @@ class PrometheusOutput < Fluent::Plugin::Output include Fluent::Plugin::PrometheusLabelParser include Fluent::Plugin::Prometheus + helpers :thread + def initialize super @registry = ::Prometheus::Client.registry @@ -22,6 +24,17 @@ def configure(conf) @metrics = Fluent::Plugin::Prometheus.parse_metrics_elements(conf, @registry, labels) end + def start + super + Fluent::Plugin::Prometheus.start_retention_threads( + @metrics, + @registry, + method(:thread_create), + method(:thread_current_running?), + @log + ) + end + def process(tag, es) instrument(tag, es, @metrics) end diff --git a/lib/fluent/plugin/prometheus.rb b/lib/fluent/plugin/prometheus.rb index 45d0b44..cf23d35 100644 --- a/lib/fluent/plugin/prometheus.rb +++ b/lib/fluent/plugin/prometheus.rb @@ -1,6 +1,7 @@ require 'prometheus/client' require 'prometheus/client/formats/text' require 'fluent/plugin/prometheus/placeholder_expander' +require 'fluent/plugin/prometheus/data_store' module Fluent module Plugin @@ -81,6 +82,17 @@ def self.parse_metrics_elements(conf, registry, labels = {}) metrics end + def self.start_retention_threads(metrics, registry, thread_create, thread_running, log) + metrics.select { |metric| metric.has_retention? }.each do |metric| + thread_create.call("prometheus_retention_#{metric.name}".to_sym) do + while thread_running.call() + metric.remove_expired_metrics(registry, log) + sleep(metric.retention_check_interval) + end + end + end + end + def self.placeholder_expander(log) Fluent::Plugin::Prometheus::ExpandBuilder.new(log: log) end @@ -97,6 +109,11 @@ def stringify_keys(hash_to_stringify) end.to_h end + def initialize + super + ::Prometheus::Client.config.data_store = Fluent::Plugin::Prometheus::DataStore.new + end + def configure(conf) super @placeholder_values = {} @@ -151,6 +168,8 @@ class Metric attr_reader :name attr_reader :key attr_reader :desc + attr_reader :retention + attr_reader :retention_check_interval def initialize(element, registry, labels) ['name', 'desc'].each do |key| @@ -162,6 +181,11 @@ def initialize(element, registry, labels) @name = element['name'] @key = element['key'] @desc = element['desc'] + @retention = element['retention'].to_i + @retention_check_interval = element.fetch('retention_check_interval', 60).to_i + if has_retention? + @last_modified_store = LastModifiedStore.new + end @base_labels = Fluent::Plugin::Prometheus.parse_labels_elements(element) @base_labels = labels.merge(@base_labels) @@ -192,6 +216,74 @@ def self.get(registry, name, type, docstring) metric end + + def set_value?(value) + if value + return true + end + false + end + + def instrument(record, expander) + value = self.value(record) + if self.set_value?(value) + labels = labels(record, expander) + set_value(value, labels) + if has_retention? + @last_modified_store.set_last_updated(labels) + end + end + end + + def has_retention? + @retention > 0 + end + + def remove_expired_metrics(registry, log) + if has_retention? + metric = registry.get(@name) + + expiration_time = Time.now - @retention + expired_label_sets = @last_modified_store.get_labels_not_modified_since(expiration_time) + + expired_label_sets.each { |expired_label_set| + log.debug "Metric #{@name} with labels #{expired_label_set} expired. Removing..." + metric.remove(expired_label_set) # this method is supplied by the require at the top of this method + @last_modified_store.remove(expired_label_set) + } + else + log.warn('remove_expired_metrics should not be called when retention is not set for this metric!') + end + end + + class LastModifiedStore + def initialize + @internal_store = Hash.new + @lock = Monitor.new + end + + def synchronize + @lock.synchronize { yield } + end + + def set_last_updated(labels) + synchronize do + @internal_store[labels] = Time.now + end + end + + def remove(labels) + synchronize do + @internal_store.delete(labels) + end + end + + def get_labels_not_modified_since(time) + synchronize do + @internal_store.select { |k, v| v < time }.keys + end + end + end end class Gauge < Metric @@ -208,16 +300,17 @@ def initialize(element, registry, labels) end end - def instrument(record, expander) + def value(record) if @key.is_a?(String) - value = record[@key] + record[@key] else - value = @key.call(record) - end - if value - @gauge.set(value, labels: labels(record, expander)) + @key.call(record) end end + + def set_value(value, labels) + @gauge.set(value, labels: labels) + end end class Counter < Metric @@ -230,20 +323,22 @@ def initialize(element, registry, labels) end end - def instrument(record, expander) - # use record value of the key if key is specified, otherwise just increment + def value(record) if @key.nil? - value = 1 + 1 elsif @key.is_a?(String) - value = record[@key] + record[@key] else - value = @key.call(record) + @key.call(record) end + end - # ignore if record value is nil - return if value.nil? + def set_value?(value) + !value.nil? + end - @counter.increment(by: value, labels: labels(record, expander)) + def set_value(value, labels) + @counter.increment(by: value, labels: labels) end end @@ -261,16 +356,17 @@ def initialize(element, registry, labels) end end - def instrument(record, expander) + def value(record) if @key.is_a?(String) - value = record[@key] + record[@key] else - value = @key.call(record) - end - if value - @summary.observe(value, labels: labels(record, expander)) + @key.call(record) end end + + def set_value(value, labels) + @summary.observe(value, labels: labels) + end end class Histogram < Metric @@ -294,16 +390,17 @@ def initialize(element, registry, labels) end end - def instrument(record, expander) + def value(record) if @key.is_a?(String) - value = record[@key] + record[@key] else - value = @key.call(record) - end - if value - @histogram.observe(value, labels: labels(record, expander)) + @key.call(record) end end + + def set_value(value, labels) + @histogram.observe(value, labels: labels) + end end end end diff --git a/lib/fluent/plugin/prometheus/data_store.rb b/lib/fluent/plugin/prometheus/data_store.rb new file mode 100644 index 0000000..d89d4d7 --- /dev/null +++ b/lib/fluent/plugin/prometheus/data_store.rb @@ -0,0 +1,82 @@ +# The default Prometheus client data store has no means of removing values. +# For the "retention" feature we need to be able to remove metrics with specific labels after some time of inactivity. +# By patching the Metric class and using our own DataStore we implement that missing feature. +module Prometheus + module Client + class Metric + def remove(labels) + label_set = label_set_for(labels) + @store.remove(labels: label_set) + end + end + end +end + +module Fluent + module Plugin + module Prometheus + # Stores all the data in simple hashes, one per metric. Each of these metrics + # synchronizes access to their hash, but multiple metrics can run observations + # concurrently. + class DataStore + class InvalidStoreSettingsError < StandardError; end + + def for_metric(metric_name, metric_type:, metric_settings: {}) + # We don't need `metric_type` or `metric_settings` for this particular store + validate_metric_settings(metric_settings: metric_settings) + MetricStore.new + end + + private + + def validate_metric_settings(metric_settings:) + unless metric_settings.empty? + raise InvalidStoreSettingsError, + "Synchronized doesn't allow any metric_settings" + end + end + + class MetricStore + def initialize + @internal_store = Hash.new { |hash, key| hash[key] = 0.0 } + @lock = Monitor.new + end + + def synchronize + @lock.synchronize { yield } + end + + def set(labels:, val:) + synchronize do + @internal_store[labels] = val.to_f + end + end + + def increment(labels:, by: 1) + synchronize do + @internal_store[labels] += by + end + end + + def get(labels:) + synchronize do + @internal_store[labels] + end + end + + def remove(labels:) + synchronize do + @internal_store.delete(labels) + end + end + + def all_values + synchronize { @internal_store.dup } + end + end + + private_constant :MetricStore + end + end + end +end diff --git a/spec/fluent/plugin/filter_prometheus_spec.rb b/spec/fluent/plugin/filter_prometheus_spec.rb index f98c8c6..2ae2209 100644 --- a/spec/fluent/plugin/filter_prometheus_spec.rb +++ b/spec/fluent/plugin/filter_prometheus_spec.rb @@ -45,4 +45,38 @@ it_behaves_like 'instruments record' end + + describe '#run with retention' do + let(:message) { { "foo" => 100, "bar" => 100, "baz" => 100, "qux" => 10 } } + + context 'config with retention 1' do + let(:config) { + BASE_CONFIG + %( + + name simple + type counter + desc Something foo. + key foo + + bar ${bar} + baz ${baz} + qux ${qux} + + retention 1 + retention_check_interval 1 + + ) + } + + it 'expires metric after max 2s' do + expect(registry.metrics.map(&:name)).not_to eq([:simple]) + driver.run(default_tag: tag) { + driver.feed(event_time, message) + expect(registry.metrics[0].get(labels: { :bar => 100, :baz => 100, :qux => 10 })).to eq(100) + sleep(2) + expect(registry.metrics[0].get(labels: { :bar => 100, :baz => 100, :qux => 10 })).to eq(0.0) + } + end + end + end end diff --git a/spec/fluent/plugin/out_prometheus_spec.rb b/spec/fluent/plugin/out_prometheus_spec.rb index 111fc00..170491d 100644 --- a/spec/fluent/plugin/out_prometheus_spec.rb +++ b/spec/fluent/plugin/out_prometheus_spec.rb @@ -65,4 +65,39 @@ it_behaves_like 'instruments record' end + + describe '#run with retention' do + let(:message) { { "foo" => 100, "bar" => 100, "baz" => 100, "qux" => 10 } } + let(:labels) { { :bar => 100, :baz => 100, :qux => 10 } } + + context 'config with retention 1' do + let(:config) { + BASE_CONFIG + %( + + name simple + type counter + desc Something foo. + key foo + + bar ${bar} + baz ${baz} + qux ${qux} + + retention 1 + retention_check_interval 1 + + ) + } + + it 'expires metric after max 2s' do + expect(registry.metrics.map(&:name)).not_to eq([:simple]) + driver.run(default_tag: tag) { + driver.feed(event_time, message) + expect(registry.metrics[0].get(labels: labels)).to eq(100) + sleep(2) + expect(registry.metrics[0].get(labels: labels)).to eq(0.0) + } + end + end + end end