diff --git a/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml b/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml
new file mode 100644
index 000000000000..33ade28d21ed
--- /dev/null
+++ b/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml
@@ -0,0 +1,29 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: exporter/elasticsearch
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add attribute-based data stream routing for logs, metrics, and traces
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [33794, 33756]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: |
+  `data_stream.dataset` and `data_stream.namespace` attributes are respected when the corresponding `*_dynamic_index.enabled` config is true.
+
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []
diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md
index 206111c48c20..6c2db78b52da 100644
--- a/exporter/elasticsearchexporter/README.md
+++ b/exporter/elasticsearchexporter/README.md
@@ -14,7 +14,7 @@
 [contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
 
-This exporter supports sending OpenTelemetry logs and traces to [Elasticsearch](https://www.elastic.co/elasticsearch).
+This exporter supports sending logs, metrics, and traces to [Elasticsearch](https://www.elastic.co/elasticsearch).
 
 ## Configuration options
 
@@ -83,39 +83,43 @@ The Elasticsearch exporter supports the common [`sending_queue` settings][export
 
 ### Elasticsearch document routing
 
 Telemetry data will be written to signal specific data streams by default:
-logs to `logs-generic-default`, and traces to `traces-generic-default`.
+logs to `logs-generic-default`, metrics to `metrics-generic-default`, and traces to `traces-generic-default`.
 This can be customised through the following settings:
 
-- `index` (DEPRECATED, please use `logs_index` for logs, `traces_index` for traces): The [index] or [data stream] name to publish events to.
+- `index` (DEPRECATED, please use `logs_index` for logs, `metrics_index` for metrics, `traces_index` for traces): The [index] or [data stream] name to publish events to.
   The default value is `logs-generic-default`.
+
 - `logs_index`: The [index] or [data stream] name to publish events to. The default value is `logs-generic-default`
-- `logs_dynamic_index` (optional):
-  takes resource or log record attribute named `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
-  resulting dynamically prefixed / suffixed indexing based on `logs_index`. (priority: resource attribute > log record attribute)
-  - `enabled`(default=false): Enable/Disable dynamic index for log records
-- `metrics_index`: The [index] or [data stream] name to publish metrics to. The default value is `metrics-generic-default`.
+
+- `logs_dynamic_index` (optional): uses resource, scope, or log record attributes to dynamically construct the index name.
+  - `enabled`(default=false): Enable/disable dynamic index for log records. If `data_stream.dataset` or `data_stream.namespace` exists in attributes (precedence: log record attribute > scope attribute > resource attribute), they will be used to dynamically construct the index name in the form `logs-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if
+    `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exists in attributes (precedence: resource attribute > scope attribute > log record attribute), they will be used to dynamically construct the index name in the form `${elasticsearch.index.prefix}${logs_index}${elasticsearch.index.suffix}`. Otherwise, the index name falls back to `logs-generic-default`, and the `logs_index` config will be ignored. Except when prefix/suffix routing applies, the resulting documents will contain the corresponding `data_stream.*` fields.
+
+- `metrics_index` (optional): The [index] or [data stream] name to publish metrics to. The default value is `metrics-generic-default`.
   ⚠️ Note that metrics support is currently in development.
-- `metrics_dynamic_index` (optional):
-  takes resource attributes named `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
-  resulting dynamically prefixed / suffixed indexing based on `metrics_index`.
+
+- `metrics_dynamic_index` (optional): uses resource, scope, or data point attributes to dynamically construct the index name.
   ⚠️ Note that metrics support is currently in development.
-  - `enabled`(default=false): Enable/Disable dynamic index for metrics
+  - `enabled`(default=true): Enable/disable dynamic index for metrics. If `data_stream.dataset` or `data_stream.namespace` exists in attributes (precedence: data point attribute > scope attribute > resource attribute), they will be used to dynamically construct the index name in the form `metrics-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if
+    `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exists in attributes (precedence: resource attribute > scope attribute > data point attribute), they will be used to dynamically construct the index name in the form `${elasticsearch.index.prefix}${metrics_index}${elasticsearch.index.suffix}`. Otherwise, the index name falls back to `metrics-generic-default`, and the `metrics_index` config will be ignored. Except when prefix/suffix routing applies, the resulting documents will contain the corresponding `data_stream.*` fields.
+
 - `traces_index`: The [index] or [data stream] name to publish traces to. The default value is `traces-generic-default`.
-- `traces_dynamic_index` (optional):
-  takes resource or span attribute named `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
-  resulting dynamically prefixed / suffixed indexing based on `traces_index`. (priority: resource attribute > span attribute)
-  - `enabled`(default=false): Enable/Disable dynamic index for trace spans
-- `logstash_format` (optional): Logstash format compatibility. Traces or Logs data can be written into an index in logstash format.
-  - `enabled`(default=false): Enable/Disable Logstash format compatibility. When `logstash_format.enabled` is `true`, the index name is composed using `traces/logs_index` or `traces/logs_dynamic_index` as prefix and the date,
-    e.g: If `traces/logs_index` or `traces/logs_dynamic_index` is equals to `otlp-generic-default` your index will become `otlp-generic-default-YYYY.MM.DD`.
-    The last string appended belongs to the date when the data is being generated.
+
+- `traces_dynamic_index` (optional): uses resource, scope, or span attributes to dynamically construct the index name.
+  - `enabled`(default=false): Enable/disable dynamic index for trace spans. If `data_stream.dataset` or `data_stream.namespace` exists in attributes (precedence: span attribute > scope attribute > resource attribute), they will be used to dynamically construct the index name in the form `traces-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if
+    `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exists in attributes (precedence: resource attribute > scope attribute > span attribute), they will be used to dynamically construct the index name in the form `${elasticsearch.index.prefix}${traces_index}${elasticsearch.index.suffix}`. Otherwise, the index name falls back to `traces-generic-default`, and the `traces_index` config will be ignored. Except when prefix/suffix routing applies, the resulting documents will contain the corresponding `data_stream.*` fields.
+
+- `logstash_format` (optional): Logstash format compatibility. Logs, metrics, and traces can be written into an index in Logstash format.
+  - `enabled`(default=false): Enable/disable Logstash format compatibility. When `logstash_format.enabled` is `true`, the index name is composed using `(logs|metrics|traces)_index` or `(logs|metrics|traces)_dynamic_index` as the prefix and the date as the suffix,
+    e.g. if `logs_index` or `logs_dynamic_index` is equal to `logs-generic-default`, your index will become `logs-generic-default-YYYY.MM.DD`.
+    The appended date corresponds to when the data was generated.
   - `prefix_separator`(default=`-`): Set a separator between logstash_prefix and date.
   - `date_format`(default=`%Y.%m.%d`): Time format (based on strftime) to generate the second part of the Index name.
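+
+For illustration only (the endpoint and attribute values below are made up), the following configuration enables attribute-based routing for logs:
+
+```yaml
+exporters:
+  elasticsearch:
+    endpoints: [https://elastic.example.com:9200]
+    logs_dynamic_index:
+      enabled: true
+```
+
+With this configuration, a log record whose attributes carry `data_stream.dataset: nginx.access` and whose resource carries `data_stream.namespace: production` is written to the `logs-nginx.access-production` data stream.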
### Elasticsearch ingest pipeline diff --git a/exporter/elasticsearchexporter/attribute.go b/exporter/elasticsearchexporter/attribute.go index 987b13f807bb..369e885e22cc 100644 --- a/exporter/elasticsearchexporter/attribute.go +++ b/exporter/elasticsearchexporter/attribute.go @@ -7,43 +7,23 @@ import "go.opentelemetry.io/collector/pdata/pcommon" // dynamic index attribute key constants const ( - indexPrefix = "elasticsearch.index.prefix" - indexSuffix = "elasticsearch.index.suffix" + indexPrefix = "elasticsearch.index.prefix" + indexSuffix = "elasticsearch.index.suffix" + dataStreamDataset = "data_stream.dataset" + dataStreamNamespace = "data_stream.namespace" + dataStreamType = "data_stream.type" + defaultDataStreamDataset = "generic" + defaultDataStreamNamespace = "default" + defaultDataStreamTypeLogs = "logs" + defaultDataStreamTypeMetrics = "metrics" + defaultDataStreamTypeTraces = "traces" ) -// resource is higher priotized than record attribute -type attrGetter interface { - Attributes() pcommon.Map -} - -// retrieve attribute out of resource, scope, and record (span or log, if not found in resource) -// Deprecated: Use getFromAttributesNew instead. -func getFromAttributes(name string, resource, scope, record attrGetter) string { - var str string - val, exist := resource.Attributes().Get(name) - if !exist { - val, exist = scope.Attributes().Get(name) - if !exist { - val, exist = record.Attributes().Get(name) - if exist { - str = val.AsString() - } - } - if exist { - str = val.AsString() - } - } - if exist { - str = val.AsString() - } - return str -} - -func getFromAttributesNew(name string, defaultValue string, attributeMaps ...pcommon.Map) string { +func getFromAttributes(name string, defaultValue string, attributeMaps ...pcommon.Map) (string, bool) { for _, attributeMap := range attributeMaps { if value, exists := attributeMap.Get(name); exists { - return value.AsString() + return value.AsString(), true } } - return defaultValue + return defaultValue, false } diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go index 8ca137118a0a..c409f175497e 100644 --- a/exporter/elasticsearchexporter/config_test.go +++ b/exporter/elasticsearchexporter/config_test.go @@ -57,12 +57,21 @@ func TestConfig(t *testing.T) { NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers, QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize, }, - Endpoints: []string{"https://elastic.example.com:9200"}, - Index: "", - LogsIndex: "logs-generic-default", + Endpoints: []string{"https://elastic.example.com:9200"}, + Index: "", + LogsIndex: "logs-generic-default", + LogsDynamicIndex: DynamicIndexSetting{ + Enabled: false, + }, MetricsIndex: "metrics-generic-default", - TracesIndex: "trace_index", - Pipeline: "mypipeline", + MetricsDynamicIndex: DynamicIndexSetting{ + Enabled: true, + }, + TracesIndex: "trace_index", + TracesDynamicIndex: DynamicIndexSetting{ + Enabled: false, + }, + Pipeline: "mypipeline", ClientConfig: confighttp.ClientConfig{ Timeout: 2 * time.Minute, MaxIdleConns: &defaultMaxIdleConns, @@ -110,12 +119,21 @@ func TestConfig(t *testing.T) { NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers, QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize, }, - Endpoints: []string{"http://localhost:9200"}, - Index: "", - LogsIndex: "my_log_index", + Endpoints: []string{"http://localhost:9200"}, + Index: "", + LogsIndex: "my_log_index", + LogsDynamicIndex: DynamicIndexSetting{ + Enabled: false, + }, MetricsIndex: 
"metrics-generic-default", - TracesIndex: "traces-generic-default", - Pipeline: "mypipeline", + MetricsDynamicIndex: DynamicIndexSetting{ + Enabled: true, + }, + TracesIndex: "traces-generic-default", + TracesDynamicIndex: DynamicIndexSetting{ + Enabled: false, + }, + Pipeline: "mypipeline", ClientConfig: confighttp.ClientConfig{ Timeout: 2 * time.Minute, MaxIdleConns: &defaultMaxIdleConns, @@ -163,12 +181,21 @@ func TestConfig(t *testing.T) { NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers, QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize, }, - Endpoints: []string{"http://localhost:9200"}, - Index: "", - LogsIndex: "logs-generic-default", + Endpoints: []string{"http://localhost:9200"}, + Index: "", + LogsIndex: "logs-generic-default", + LogsDynamicIndex: DynamicIndexSetting{ + Enabled: false, + }, MetricsIndex: "my_metric_index", - TracesIndex: "traces-generic-default", - Pipeline: "mypipeline", + MetricsDynamicIndex: DynamicIndexSetting{ + Enabled: true, + }, + TracesIndex: "traces-generic-default", + TracesDynamicIndex: DynamicIndexSetting{ + Enabled: false, + }, + Pipeline: "mypipeline", ClientConfig: confighttp.ClientConfig{ Timeout: 2 * time.Minute, MaxIdleConns: &defaultMaxIdleConns, diff --git a/exporter/elasticsearchexporter/data_stream_router.go b/exporter/elasticsearchexporter/data_stream_router.go new file mode 100644 index 000000000000..0368f6a1b958 --- /dev/null +++ b/exporter/elasticsearchexporter/data_stream_router.go @@ -0,0 +1,82 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" + +import ( + "fmt" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +func routeWithDefaults(defaultDSType, defaultDSDataset, defaultDSNamespace string) func( + pcommon.Map, + pcommon.Map, + pcommon.Map, + string, +) string { + return func( + recordAttr pcommon.Map, + scopeAttr pcommon.Map, + resourceAttr pcommon.Map, + fIndex string, + ) string { + // Order: + // 1. read data_stream.* from attributes + // 2. read elasticsearch.index.* from attributes + // 3. use default hardcoded data_stream.* + dataset, datasetExists := getFromAttributes(dataStreamDataset, defaultDSDataset, recordAttr, scopeAttr, resourceAttr) + namespace, namespaceExists := getFromAttributes(dataStreamNamespace, defaultDSNamespace, recordAttr, scopeAttr, resourceAttr) + dataStreamMode := datasetExists || namespaceExists + if !dataStreamMode { + prefix, prefixExists := getFromAttributes(indexPrefix, "", resourceAttr, scopeAttr, recordAttr) + suffix, suffixExists := getFromAttributes(indexSuffix, "", resourceAttr, scopeAttr, recordAttr) + if prefixExists || suffixExists { + return fmt.Sprintf("%s%s%s", prefix, fIndex, suffix) + } + } + recordAttr.PutStr(dataStreamDataset, dataset) + recordAttr.PutStr(dataStreamNamespace, namespace) + recordAttr.PutStr(dataStreamType, defaultDSType) + return fmt.Sprintf("%s-%s-%s", defaultDSType, dataset, namespace) + } +} + +// routeLogRecord returns the name of the index to send the log record to according to data stream routing attributes and prefix/suffix attributes. +// This function may mutate record attributes. 
+func routeLogRecord(
+	record plog.LogRecord,
+	scope pcommon.InstrumentationScope,
+	resource pcommon.Resource,
+	fIndex string,
+) string {
+	route := routeWithDefaults(defaultDataStreamTypeLogs, defaultDataStreamDataset, defaultDataStreamNamespace)
+	return route(record.Attributes(), scope.Attributes(), resource.Attributes(), fIndex)
+}
+
+// routeDataPoint returns the name of the index to send the data point to according to data stream routing attributes.
+// This function may mutate record attributes.
+func routeDataPoint(
+	dataPoint pmetric.NumberDataPoint,
+	scope pcommon.InstrumentationScope,
+	resource pcommon.Resource,
+	fIndex string,
+) string {
+	route := routeWithDefaults(defaultDataStreamTypeMetrics, defaultDataStreamDataset, defaultDataStreamNamespace)
+	return route(dataPoint.Attributes(), scope.Attributes(), resource.Attributes(), fIndex)
+}
+
+// routeSpan returns the name of the index to send the span to according to data stream routing attributes.
+// This function may mutate record attributes.
+func routeSpan(
+	span ptrace.Span,
+	scope pcommon.InstrumentationScope,
+	resource pcommon.Resource,
+	fIndex string,
+) string {
+	route := routeWithDefaults(defaultDataStreamTypeTraces, defaultDataStreamDataset, defaultDataStreamNamespace)
+	return route(span.Attributes(), scope.Attributes(), resource.Attributes(), fIndex)
+}
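To make the precedence rules concrete, here is a small usage sketch (an editor's illustration, not part of the patch; attribute values are invented):

```go
// A log record carries data_stream.dataset, while its resource carries
// data_stream.namespace; for data_stream.* keys the record attribute wins.
record := plog.NewLogRecord()
record.Attributes().PutStr(dataStreamDataset, "nginx.access")

resource := pcommon.NewResource()
resource.Attributes().PutStr(dataStreamNamespace, "production")

scope := pcommon.NewInstrumentationScope()

index := routeLogRecord(record, scope, resource, "logs-generic-default")
// index == "logs-nginx.access-production". As a side effect, the record's
// attributes now also carry data_stream.type="logs" plus the resolved
// dataset and namespace, so the emitted document matches its data stream.
```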
diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go
index 01103adfa12f..6cb64da0983d 100644
--- a/exporter/elasticsearchexporter/exporter.go
+++ b/exporter/elasticsearchexporter/exporter.go
@@ -16,6 +16,8 @@ import (
 	"go.opentelemetry.io/collector/pdata/plog"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.opentelemetry.io/collector/pdata/ptrace"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel"
 )
 
 type elasticsearchExporter struct {
@@ -117,10 +119,7 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs)
 func (e *elasticsearchExporter) pushLogRecord(ctx context.Context, resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) error {
 	fIndex := e.index
 	if e.dynamicIndex {
-		prefix := getFromAttributes(indexPrefix, resource, scope, record)
-		suffix := getFromAttributes(indexSuffix, resource, scope, record)
-
-		fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix)
+		fIndex = routeLogRecord(record, scope, resource, fIndex)
 	}
 
 	if e.logstashFormat.Enabled {
@@ -149,51 +148,84 @@ func (e *elasticsearchExporter) pushMetricsData(
 		resourceMetric := resourceMetrics.At(i)
 		resource := resourceMetric.Resource()
 		scopeMetrics := resourceMetric.ScopeMetrics()
-		for j := 0; j < scopeMetrics.Len(); j++ {
-			scope := scopeMetrics.At(j).Scope()
-			metricSlice := scopeMetrics.At(j).Metrics()
-			if err := e.pushMetricSlice(ctx, resource, metricSlice, scope); err != nil {
-				if ctxErr := ctx.Err(); ctxErr != nil {
-					return ctxErr
+		resourceDocs := make(map[string]map[uint32]objmodel.Document)
+
+		for j := 0; j < scopeMetrics.Len(); j++ {
+			scopeMetrics := scopeMetrics.At(j)
+			scope := scopeMetrics.Scope()
+			for k := 0; k < scopeMetrics.Metrics().Len(); k++ {
+				metric := scopeMetrics.Metrics().At(k)
+
+				// We only support Sum and Gauge metrics at the moment.
+				var dataPoints pmetric.NumberDataPointSlice
+				switch metric.Type() {
+				case pmetric.MetricTypeSum:
+					dataPoints = metric.Sum().DataPoints()
+				case pmetric.MetricTypeGauge:
+					dataPoints = metric.Gauge().DataPoints()
 				}
-				errs = append(errs, err)
+				for l := 0; l < dataPoints.Len(); l++ {
+					dataPoint := dataPoints.At(l)
+					fIndex, err := e.getMetricDataPointIndex(resource, scope, dataPoint)
+					if err != nil {
+						errs = append(errs, err)
+						continue
+					}
+					if _, ok := resourceDocs[fIndex]; !ok {
+						resourceDocs[fIndex] = make(map[uint32]objmodel.Document)
+					}
+					if err := e.model.upsertMetricDataPoint(resourceDocs[fIndex], resource, scope, metric, dataPoint); err != nil {
+						errs = append(errs, err)
+					}
+				}
 			}
 		}
+
+		for fIndex, docs := range resourceDocs {
+			for _, doc := range docs {
+				var (
+					docBytes []byte
+					err      error
+				)
+				docBytes, err = e.model.encodeDocument(doc)
+				if err != nil {
+					errs = append(errs, err)
+					continue
+				}
+				if err := pushDocuments(ctx, fIndex, docBytes, e.bulkIndexer); err != nil {
+					if cerr := ctx.Err(); cerr != nil {
+						return cerr
+					}
+					errs = append(errs, err)
+				}
+			}
+		}
 	}
 
 	return errors.Join(errs...)
 }
 
-func (e *elasticsearchExporter) pushMetricSlice(
-	ctx context.Context,
+func (e *elasticsearchExporter) getMetricDataPointIndex(
 	resource pcommon.Resource,
-	slice pmetric.MetricSlice,
 	scope pcommon.InstrumentationScope,
-) error {
+	dataPoint pmetric.NumberDataPoint,
+) (string, error) {
 	fIndex := e.index
 	if e.dynamicIndex {
-		prefix := getFromAttributesNew(indexPrefix, "", resource.Attributes())
-		suffix := getFromAttributesNew(indexSuffix, "", resource.Attributes())
-
-		fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix)
-	}
-
-	documents, err := e.model.encodeMetrics(resource, slice, scope)
-	if err != nil {
-		return fmt.Errorf("failed to encode a metric event: %w", err)
+		fIndex = routeDataPoint(dataPoint, scope, resource, fIndex)
 	}
 
-	for _, document := range documents {
-		err := pushDocuments(ctx, fIndex, document, e.bulkIndexer)
+	if e.logstashFormat.Enabled {
+		formattedIndex, err := generateIndexWithLogstashFormat(fIndex, &e.logstashFormat, time.Now())
 		if err != nil {
-			return err
+			return "", err
 		}
+		fIndex = formattedIndex
 	}
-
-	return nil
+	return fIndex, nil
 }
 
 func (e *elasticsearchExporter) pushTraceData(
@@ -228,10 +260,7 @@ func (e *elasticsearchExporter) pushTraceData(
 func (e *elasticsearchExporter) pushTraceRecord(ctx context.Context, resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) error {
 	fIndex := e.index
 	if e.dynamicIndex {
-		prefix := getFromAttributes(indexPrefix, resource, scope, span)
-		suffix := getFromAttributes(indexSuffix, resource, scope, span)
-
-		fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix)
+		fIndex = routeSpan(span, scope, resource, fIndex)
 	}
 
 	if e.logstashFormat.Enabled {
diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go
index d2fda9b9aeb9..754cfaa4675f 100644
--- a/exporter/elasticsearchexporter/exporter_test.go
+++ b/exporter/elasticsearchexporter/exporter_test.go
@@ -10,7 +10,6 @@ import (
 	"fmt"
 	"net/http"
 	"runtime"
-	"strings"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -168,7 +167,7 @@ func TestExporterLogs(t *testing.T) {
 		<-done
 	})
 
-	t.Run("publish with dynamic index", func(t *testing.T) {
+	t.Run("publish with dynamic index, prefix_suffix", func(t *testing.T) {
 		rec := newBulkRecorder()
 
 		var (
 			prefix = "resprefix-"
 			index  = "someindex"
 			suffix = "-attrsuffix"
 		)
 		server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs) - data, err := docs[0].Action.MarshalJSON() - assert.NoError(t, err) - - jsonVal := map[string]any{} - err = json.Unmarshal(data, &jsonVal) - assert.NoError(t, err) - - create := jsonVal["create"].(map[string]any) expected := fmt.Sprintf("%s%s%s", prefix, index, suffix) - assert.Equal(t, expected, create["_index"].(string)) + assert.Equal(t, expected, actionJSONToIndex(t, docs[0].Action)) return itemsAllOK(docs) }) @@ -213,20 +204,40 @@ func TestExporterLogs(t *testing.T) { rec.WaitItems(1) }) - t.Run("publish with logstash index format enabled and dynamic index disabled", func(t *testing.T) { + t.Run("publish with dynamic index, data_stream", func(t *testing.T) { rec := newBulkRecorder() server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { rec.Record(docs) - data, err := docs[0].Action.MarshalJSON() - assert.NoError(t, err) + assert.Equal(t, "logs-record.dataset-resource.namespace", actionJSONToIndex(t, docs[0].Action)) - jsonVal := map[string]any{} - err = json.Unmarshal(data, &jsonVal) - assert.NoError(t, err) + return itemsAllOK(docs) + }) - create := jsonVal["create"].(map[string]any) - assert.Contains(t, create["_index"], "not-used-index") + exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.LogsDynamicIndex.Enabled = true + }) + logs := newLogsWithAttributeAndResourceMap( + map[string]string{ + dataStreamDataset: "record.dataset", + }, + map[string]string{ + dataStreamDataset: "resource.dataset", + dataStreamNamespace: "resource.namespace", + }, + ) + logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr("hello world") + mustSendLogs(t, exporter, logs) + + rec.WaitItems(1) + }) + + t.Run("publish with logstash index format enabled and dynamic index disabled", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + + assert.Contains(t, actionJSONToIndex(t, docs[0].Action), "not-used-index") return itemsAllOK(docs) }) @@ -250,17 +261,8 @@ func TestExporterLogs(t *testing.T) { server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { rec.Record(docs) - data, err := docs[0].Action.MarshalJSON() - assert.NoError(t, err) - - jsonVal := map[string]any{} - err = json.Unmarshal(data, &jsonVal) - assert.NoError(t, err) - - create := jsonVal["create"].(map[string]any) expected := fmt.Sprintf("%s%s%s", prefix, index, suffix) - - assert.Equal(t, strings.Contains(create["_index"].(string), expected), true) + assert.Contains(t, actionJSONToIndex(t, docs[0].Action), expected) return itemsAllOK(docs) }) @@ -469,6 +471,168 @@ func TestExporterMetrics(t *testing.T) { rec.WaitItems(2) }) + t.Run("publish with dynamic index, prefix_suffix", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + + expected := "resource.prefix-metrics.index-resource.suffix" + assert.Equal(t, expected, actionJSONToIndex(t, docs[0].Action)) + + return itemsAllOK(docs) + }) + + exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) { + cfg.MetricsIndex = "metrics.index" + }) + metrics := newMetricsWithAttributeAndResourceMap( + map[string]string{ + indexSuffix: "-data.point.suffix", + }, + map[string]string{ + indexPrefix: "resource.prefix-", + indexSuffix: "-resource.suffix", + }, + ) + metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).SetName("my.metric") + mustSendMetrics(t, exporter, metrics) 
+ + rec.WaitItems(1) + }) + + t.Run("publish with dynamic index, data_stream", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + + expected := "metrics-resource.dataset-data.point.namespace" + assert.Equal(t, expected, actionJSONToIndex(t, docs[0].Action)) + + return itemsAllOK(docs) + }) + + exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) { + cfg.MetricsIndex = "metrics.index" + }) + metrics := newMetricsWithAttributeAndResourceMap( + map[string]string{ + dataStreamNamespace: "data.point.namespace", + }, + map[string]string{ + dataStreamDataset: "resource.dataset", + dataStreamNamespace: "resource.namespace", + }, + ) + metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).SetName("my.metric") + mustSendMetrics(t, exporter, metrics) + + rec.WaitItems(1) + }) + + t.Run("publish with metrics grouping", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + return itemsAllOK(docs) + }) + + exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) { + cfg.MetricsIndex = "metrics.index" + cfg.Mapping.Mode = "ecs" + }) + + addToMetricSlice := func(metricSlice pmetric.MetricSlice) { + fooMetric := metricSlice.AppendEmpty() + fooMetric.SetName("metric.foo") + fooDps := fooMetric.SetEmptyGauge().DataPoints() + fooDp := fooDps.AppendEmpty() + fooDp.SetIntValue(1) + fooOtherDp := fooDps.AppendEmpty() + fillResourceAttributeMap(fooOtherDp.Attributes(), map[string]string{ + "dp.attribute": "dp.attribute.value", + }) + fooOtherDp.SetDoubleValue(1.0) + + barMetric := metricSlice.AppendEmpty() + barMetric.SetName("metric.bar") + barDps := barMetric.SetEmptyGauge().DataPoints() + barDp := barDps.AppendEmpty() + barDp.SetDoubleValue(1.0) + barOtherDp := barDps.AppendEmpty() + fillResourceAttributeMap(barOtherDp.Attributes(), map[string]string{ + "dp.attribute": "dp.attribute.value", + }) + barOtherDp.SetDoubleValue(1.0) + barOtherIndexDp := barDps.AppendEmpty() + fillResourceAttributeMap(barOtherIndexDp.Attributes(), map[string]string{ + "dp.attribute": "dp.attribute.value", + dataStreamNamespace: "bar", + }) + barOtherIndexDp.SetDoubleValue(1.0) + + bazMetric := metricSlice.AppendEmpty() + bazMetric.SetName("metric.baz") + bazDps := bazMetric.SetEmptyGauge().DataPoints() + bazDp := bazDps.AppendEmpty() + bazDp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(3600, 0))) + bazDp.SetDoubleValue(1.0) + } + + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics().AppendEmpty() + fillResourceAttributeMap(resourceMetrics.Resource().Attributes(), map[string]string{ + dataStreamNamespace: "resource.namespace", + }) + scopeA := resourceMetrics.ScopeMetrics().AppendEmpty() + addToMetricSlice(scopeA.Metrics()) + + scopeB := resourceMetrics.ScopeMetrics().AppendEmpty() + fillResourceAttributeMap(scopeB.Scope().Attributes(), map[string]string{ + dataStreamDataset: "scope.b", + }) + addToMetricSlice(scopeB.Metrics()) + + mustSendMetrics(t, exporter, metrics) + + rec.WaitItems(8) + + expected := []itemRequest{ + { + Action: []byte(`{"create":{"_index":"metrics-generic-bar"}}`), + Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic","namespace":"bar","type":"metrics"},"dp":{"attribute":"dp.attribute.value"},"metric":{"bar":1}}`), + }, + { + Action: 
[]byte(`{"create":{"_index":"metrics-generic-resource.namespace"}}`), + Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic","namespace":"resource.namespace","type":"metrics"},"dp":{"attribute":"dp.attribute.value"},"metric":{"bar":1,"foo":1}}`), + }, + { + Action: []byte(`{"create":{"_index":"metrics-generic-resource.namespace"}}`), + Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic","namespace":"resource.namespace","type":"metrics"},"metric":{"bar":1,"foo":1}}`), + }, + { + Action: []byte(`{"create":{"_index":"metrics-generic-resource.namespace"}}`), + Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","data_stream":{"dataset":"generic","namespace":"resource.namespace","type":"metrics"},"metric":{"baz":1}}`), + }, + { + Action: []byte(`{"create":{"_index":"metrics-scope.b-bar"}}`), + Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"scope.b","namespace":"bar","type":"metrics"},"dp":{"attribute":"dp.attribute.value"},"metric":{"bar":1}}`), + }, + { + Action: []byte(`{"create":{"_index":"metrics-scope.b-resource.namespace"}}`), + Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"scope.b","namespace":"resource.namespace","type":"metrics"},"dp":{"attribute":"dp.attribute.value"},"metric":{"bar":1,"foo":1}}`), + }, + { + Action: []byte(`{"create":{"_index":"metrics-scope.b-resource.namespace"}}`), + Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"scope.b","namespace":"resource.namespace","type":"metrics"},"metric":{"bar":1,"foo":1}}`), + }, + { + Action: []byte(`{"create":{"_index":"metrics-scope.b-resource.namespace"}}`), + Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","data_stream":{"dataset":"scope.b","namespace":"resource.namespace","type":"metrics"},"metric":{"baz":1}}`), + }, + } + + assertItemsEqual(t, expected, rec.Items(), false) + }) } func TestExporterTraces(t *testing.T) { @@ -486,7 +650,7 @@ func TestExporterTraces(t *testing.T) { rec.WaitItems(2) }) - t.Run("publish with dynamic index", func(t *testing.T) { + t.Run("publish with dynamic index, prefix_suffix", func(t *testing.T) { rec := newBulkRecorder() var ( @@ -531,23 +695,43 @@ func TestExporterTraces(t *testing.T) { rec.WaitItems(1) }) - t.Run("publish with logstash format index", func(t *testing.T) { - var defaultCfg Config + t.Run("publish with dynamic index, data_stream", func(t *testing.T) { rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { rec.Record(docs) - data, err := docs[0].Action.MarshalJSON() - assert.NoError(t, err) + expected := "traces-span.dataset-default" + assert.Equal(t, expected, actionJSONToIndex(t, docs[0].Action)) - jsonVal := map[string]any{} - err = json.Unmarshal(data, &jsonVal) - assert.NoError(t, err) + return itemsAllOK(docs) + }) - create := jsonVal["create"].(map[string]any) + exporter := newTestTracesExporter(t, server.URL, func(cfg *Config) { + cfg.TracesDynamicIndex.Enabled = true + }) - assert.Equal(t, strings.Contains(create["_index"].(string), defaultCfg.TracesIndex), true) + mustSendTraces(t, exporter, newTracesWithAttributeAndResourceMap( + map[string]string{ + dataStreamDataset: "span.dataset", + }, + map[string]string{ + dataStreamDataset: "resource.dataset", + }, + )) + + rec.WaitItems(1) + }) + + t.Run("publish with logstash format index", func(t *testing.T) { + var 
defaultCfg Config + + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + + assert.Contains(t, actionJSONToIndex(t, docs[0].Action), defaultCfg.TracesIndex) return itemsAllOK(docs) }) @@ -574,17 +758,8 @@ func TestExporterTraces(t *testing.T) { server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { rec.Record(docs) - data, err := docs[0].Action.MarshalJSON() - assert.NoError(t, err) - - jsonVal := map[string]any{} - err = json.Unmarshal(data, &jsonVal) - assert.NoError(t, err) - - create := jsonVal["create"].(map[string]any) expected := fmt.Sprintf("%s%s%s", prefix, index, suffix) - - assert.Equal(t, strings.Contains(create["_index"].(string), expected), true) + assert.Contains(t, actionJSONToIndex(t, docs[0].Action), expected) return itemsAllOK(docs) }) @@ -776,3 +951,14 @@ type roundTripperFunc func(*http.Request) (*http.Response, error) func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) } + +func actionJSONToIndex(t *testing.T, actionJSON json.RawMessage) string { + action := struct { + Create struct { + Index string `json:"_index"` + } `json:"create"` + }{} + err := json.Unmarshal(actionJSON, &action) + require.NoError(t, err) + return action.Create.Index +} diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index cc2e27421f39..7826fb59a47e 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" @@ -49,8 +50,17 @@ func createDefaultConfig() component.Config { ClientConfig: httpClientConfig, Index: "", LogsIndex: defaultLogsIndex, - MetricsIndex: defaultMetricsIndex, - TracesIndex: defaultTracesIndex, + LogsDynamicIndex: DynamicIndexSetting{ + Enabled: false, + }, + MetricsIndex: defaultMetricsIndex, + MetricsDynamicIndex: DynamicIndexSetting{ + Enabled: true, + }, + TracesIndex: defaultTracesIndex, + TracesDynamicIndex: DynamicIndexSetting{ + Enabled: false, + }, Retry: RetrySettings{ Enabled: true, MaxRequests: 3, @@ -104,6 +114,7 @@ func createLogsExporter( set, cfg, exporter.pushLogsData, + exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}), exporterhelper.WithStart(exporter.Start), exporterhelper.WithShutdown(exporter.Shutdown), exporterhelper.WithQueue(cf.QueueSettings), @@ -127,6 +138,7 @@ func createMetricsExporter( set, cfg, exporter.pushMetricsData, + exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}), exporterhelper.WithStart(exporter.Start), exporterhelper.WithShutdown(exporter.Shutdown), exporterhelper.WithQueue(cf.QueueSettings), @@ -149,6 +161,7 @@ func createTracesExporter(ctx context.Context, set, cfg, exporter.pushTraceData, + exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}), exporterhelper.WithStart(exporter.Start), exporterhelper.WithShutdown(exporter.Shutdown), exporterhelper.WithQueue(cf.QueueSettings), diff --git a/exporter/elasticsearchexporter/go.mod b/exporter/elasticsearchexporter/go.mod index bbff538ff408..9c49a61022cb 100644 --- a/exporter/elasticsearchexporter/go.mod +++ b/exporter/elasticsearchexporter/go.mod @@ -17,6 +17,7 @@ require ( go.opentelemetry.io/collector/config/confighttp v0.103.0 
go.opentelemetry.io/collector/config/configopaque v1.10.0 go.opentelemetry.io/collector/confmap v0.103.0 + go.opentelemetry.io/collector/consumer v0.103.0 go.opentelemetry.io/collector/exporter v0.103.0 go.opentelemetry.io/collector/extension/auth v0.103.0 go.opentelemetry.io/collector/pdata v1.10.0 @@ -70,7 +71,6 @@ require ( go.opentelemetry.io/collector/config/configtelemetry v0.103.0 // indirect go.opentelemetry.io/collector/config/configtls v0.103.0 // indirect go.opentelemetry.io/collector/config/internal v0.103.0 // indirect - go.opentelemetry.io/collector/consumer v0.103.0 // indirect go.opentelemetry.io/collector/extension v0.103.0 // indirect go.opentelemetry.io/collector/featuregate v1.10.0 // indirect go.opentelemetry.io/collector/receiver v0.103.0 // indirect diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index 626b8f566e1f..6df90e214d78 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -64,8 +64,9 @@ var resourceAttrsToPreserve = map[string]bool{ type mappingModel interface { encodeLog(pcommon.Resource, plog.LogRecord, pcommon.InstrumentationScope) ([]byte, error) - encodeMetrics(pcommon.Resource, pmetric.MetricSlice, pcommon.InstrumentationScope) ([][]byte, error) encodeSpan(pcommon.Resource, ptrace.Span, pcommon.InstrumentationScope) ([]byte, error) + upsertMetricDataPoint(map[uint32]objmodel.Document, pcommon.Resource, pcommon.InstrumentationScope, pmetric.Metric, pmetric.NumberDataPoint) error + encodeDocument(objmodel.Document) ([]byte, error) } // encodeModel tries to keep the event as close to the original open telemetry semantics as is. @@ -169,123 +170,42 @@ func (m *encodeModel) encodeLogECSMode(resource pcommon.Resource, record plog.Lo return document } -func (m *encodeModel) encodeMetrics(resource pcommon.Resource, metrics pmetric.MetricSlice, _ pcommon.InstrumentationScope) ([][]byte, error) { - var baseDoc objmodel.Document - - baseDoc.AddAttributes("", resource.Attributes()) - - // Put all metrics that have the same attributes and timestamp in one document. - docs := map[uint32]*objmodel.Document{} - for i := 0; i < metrics.Len(); i++ { - metric := metrics.At(i) - - var dps pmetric.NumberDataPointSlice - - // Only Gauge and Sum metric types are supported at the moment. 
- switch metric.Type() { - case pmetric.MetricTypeGauge: - dps = metric.Gauge().DataPoints() - case pmetric.MetricTypeSum: - dps = metric.Sum().DataPoints() - } - - for j := 0; j < dps.Len(); j++ { - dp := dps.At(j) - - hash := metricHash(dp.Timestamp(), dp.Attributes()) - doc, docExists := docs[hash] - if !docExists { - doc = baseDoc.Clone() - doc.AddTimestamp("@timestamp", dp.Timestamp()) - doc.AddAttributes("", dp.Attributes()) - - docs[hash] = doc - } - - switch dp.ValueType() { - case pmetric.NumberDataPointValueTypeDouble: - doc.AddAttribute(metric.Name(), pcommon.NewValueDouble(dp.DoubleValue())) - case pmetric.NumberDataPointValueTypeInt: - doc.AddAttribute(metric.Name(), pcommon.NewValueInt(dp.IntValue())) - } - } +func (m *encodeModel) encodeDocument(document objmodel.Document) ([]byte, error) { + if m.dedup { + document.Dedup() + } else if m.dedot { + document.Sort() } - res := make([][]byte, 0, len(docs)) - - for _, doc := range docs { - if m.dedup { - doc.Dedup() - } else if m.dedot { - doc.Sort() - } - - var buf bytes.Buffer - err := doc.Serialize(&buf, m.dedot) - if err != nil { - return nil, err - } - - res = append(res, buf.Bytes()) + var buf bytes.Buffer + err := document.Serialize(&buf, m.dedot) + if err != nil { + return nil, err } - - return res, nil -} - -func metricHash(timestamp pcommon.Timestamp, attributes pcommon.Map) uint32 { - hasher := fnv.New32a() - - timestampBuf := make([]byte, 8) - binary.LittleEndian.PutUint64(timestampBuf, uint64(timestamp)) - hasher.Write(timestampBuf) - - mapHash(hasher, attributes) - - return hasher.Sum32() + return buf.Bytes(), nil } -func mapHash(hasher hash.Hash, m pcommon.Map) { - m.Range(func(k string, v pcommon.Value) bool { - hasher.Write([]byte(k)) - valueHash(hasher, v) - - return true - }) -} - -func valueHash(h hash.Hash, v pcommon.Value) { - switch v.Type() { - case pcommon.ValueTypeEmpty: - h.Write([]byte{0}) - case pcommon.ValueTypeStr: - h.Write([]byte(v.Str())) - case pcommon.ValueTypeBool: - if v.Bool() { - h.Write([]byte{1}) - } else { - h.Write([]byte{0}) - } - case pcommon.ValueTypeDouble: - buf := make([]byte, 8) - binary.LittleEndian.PutUint64(buf, math.Float64bits(v.Double())) - h.Write(buf) - case pcommon.ValueTypeInt: - buf := make([]byte, 8) - binary.LittleEndian.PutUint64(buf, uint64(v.Int())) - h.Write(buf) - case pcommon.ValueTypeBytes: - h.Write(v.Bytes().AsRaw()) - case pcommon.ValueTypeMap: - mapHash(h, v.Map()) - case pcommon.ValueTypeSlice: - sliceHash(h, v.Slice()) +func (m *encodeModel) upsertMetricDataPoint(documents map[uint32]objmodel.Document, resource pcommon.Resource, _ pcommon.InstrumentationScope, metric pmetric.Metric, dp pmetric.NumberDataPoint) error { + hash := metricHash(dp.Timestamp(), dp.Attributes()) + var ( + document objmodel.Document + ok bool + ) + if document, ok = documents[hash]; !ok { + document.AddAttributes("", resource.Attributes()) + document.AddTimestamp("@timestamp", dp.Timestamp()) + document.AddAttributes("", dp.Attributes()) } -} -func sliceHash(h hash.Hash, s pcommon.Slice) { - for i := 0; i < s.Len(); i++ { - valueHash(h, s.At(i)) + switch dp.ValueType() { + case pmetric.NumberDataPointValueTypeDouble: + document.AddAttribute(metric.Name(), pcommon.NewValueDouble(dp.DoubleValue())) + case pmetric.NumberDataPointValueTypeInt: + document.AddAttribute(metric.Name(), pcommon.NewValueInt(dp.IntValue())) } + + documents[hash] = document + return nil } func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) 
([]byte, error) { @@ -479,3 +399,60 @@ func encodeLogTimestampECSMode(document *objmodel.Document, record plog.LogRecor document.AddTimestamp("@timestamp", record.ObservedTimestamp()) } + +// TODO use https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/internal/exp/metrics/identity +func metricHash(timestamp pcommon.Timestamp, attributes pcommon.Map) uint32 { + hasher := fnv.New32a() + + timestampBuf := make([]byte, 8) + binary.LittleEndian.PutUint64(timestampBuf, uint64(timestamp)) + hasher.Write(timestampBuf) + + mapHash(hasher, attributes) + + return hasher.Sum32() +} + +func mapHash(hasher hash.Hash, m pcommon.Map) { + m.Range(func(k string, v pcommon.Value) bool { + hasher.Write([]byte(k)) + valueHash(hasher, v) + + return true + }) +} + +func valueHash(h hash.Hash, v pcommon.Value) { + switch v.Type() { + case pcommon.ValueTypeEmpty: + h.Write([]byte{0}) + case pcommon.ValueTypeStr: + h.Write([]byte(v.Str())) + case pcommon.ValueTypeBool: + if v.Bool() { + h.Write([]byte{1}) + } else { + h.Write([]byte{0}) + } + case pcommon.ValueTypeDouble: + buf := make([]byte, 8) + binary.LittleEndian.PutUint64(buf, math.Float64bits(v.Double())) + h.Write(buf) + case pcommon.ValueTypeInt: + buf := make([]byte, 8) + binary.LittleEndian.PutUint64(buf, uint64(v.Int())) + h.Write(buf) + case pcommon.ValueTypeBytes: + h.Write(v.Bytes().AsRaw()) + case pcommon.ValueTypeMap: + mapHash(h, v.Map()) + case pcommon.ValueTypeSlice: + sliceHash(h, v.Slice()) + } +} + +func sliceHash(h hash.Hash, s pcommon.Slice) { + for i := 0; i < s.Len(); i++ { + valueHash(h, s.At(i)) + } +} diff --git a/exporter/elasticsearchexporter/model_test.go b/exporter/elasticsearchexporter/model_test.go index 26199ba8cca9..675f78af83f7 100644 --- a/exporter/elasticsearchexporter/model_test.go +++ b/exporter/elasticsearchexporter/model_test.go @@ -90,21 +90,40 @@ func TestEncodeMetric(t *testing.T) { model := &encodeModel{ dedot: true, dedup: true, - mode: MappingNone, + mode: MappingECS, + } + + docs := make(map[uint32]objmodel.Document) + + var docsBytes [][]byte + for i := 0; i < metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().Len(); i++ { + err := model.upsertMetricDataPoint(docs, + metrics.ResourceMetrics().At(0).Resource(), + metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope(), + metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0), + metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(i)) + require.NoError(t, err) + } + + for _, doc := range docs { + bytes, err := model.encodeDocument(doc) + require.NoError(t, err) + docsBytes = append(docsBytes, bytes) } - docsBytes, err := model.encodeMetrics(metrics.ResourceMetrics().At(0).Resource(), metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics(), metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope()) + allDocsSorted := docBytesToSortedString(docsBytes) + assert.Equal(t, expectedMetricsEncoded, allDocsSorted) +} + +func docBytesToSortedString(docsBytes [][]byte) string { // Convert the byte arrays to strings and sort the docs to make the test deterministic. - require.NoError(t, err) - docs := make([]string, 0, len(docsBytes)) - for _, docBytes := range docsBytes { - docs = append(docs, string(docBytes)) + docs := make([]string, len(docsBytes)) + for i, docBytes := range docsBytes { + docs[i] = string(docBytes) } sort.Strings(docs) allDocsSorted := strings.Join(docs, "\n") - - // Test that the result matches the expected value. 
- assert.Equal(t, expectedMetricsEncoded, allDocsSorted) + return allDocsSorted } func createTestMetrics(t *testing.T) pmetric.Metrics { diff --git a/exporter/elasticsearchexporter/utils_test.go b/exporter/elasticsearchexporter/utils_test.go index de3d60418b24..f57f16272c24 100644 --- a/exporter/elasticsearchexporter/utils_test.go +++ b/exporter/elasticsearchexporter/utils_test.go @@ -10,6 +10,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "slices" "strings" "sync" "testing" @@ -18,6 +19,7 @@ import ( "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" ) @@ -26,6 +28,29 @@ type itemRequest struct { Document json.RawMessage } +func itemRequestsSortFunc(a, b itemRequest) int { + comp := bytes.Compare(a.Action, b.Action) + if comp == 0 { + return bytes.Compare(a.Document, b.Document) + } + return comp +} + +func assertItemsEqual(t *testing.T, expected, actual []itemRequest, assertOrder bool) { + expectedItems := expected + actualItems := actual + if !assertOrder { + // Make copies to avoid mutating the args + expectedItems = make([]itemRequest, len(expected)) + copy(expectedItems, expected) + slices.SortFunc(expectedItems, itemRequestsSortFunc) + actualItems = make([]itemRequest, len(actual)) + copy(actualItems, actual) + slices.SortFunc(actualItems, itemRequestsSortFunc) + } + assert.Equal(t, expectedItems, actualItems) +} + type itemResponse struct { Status int `json:"status"` } @@ -236,6 +261,16 @@ func newLogsWithAttributeAndResourceMap(attrMp map[string]string, resMp map[stri return logs } +func newMetricsWithAttributeAndResourceMap(attrMp map[string]string, resMp map[string]string) pmetric.Metrics { + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics().AppendEmpty() + + fillResourceAttributeMap(resourceMetrics.Resource().Attributes(), resMp) + fillResourceAttributeMap(resourceMetrics.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty().Attributes(), attrMp) + + return metrics +} + func newTracesWithAttributeAndResourceMap(attrMp map[string]string, resMp map[string]string) ptrace.Traces { traces := ptrace.NewTraces() resourceSpans := traces.ResourceSpans()