[exporter/elasticsearch] Data stream routing based on data_stream.*
… attributes (open-telemetry#33794)

**Description:**
Taking over from open-telemetry#33755
- Add data stream routing based on `data_stream.*` attributes
- Refine metrics grouping to work with DS routing

**Link to tracking Issue:**
Closes open-telemetry#33755 
Fixes open-telemetry#33756

**Testing:**

See unit tests

**Documentation:**

Updated readme

---------

Co-authored-by: Andrzej Stencel <[email protected]>
Co-authored-by: Andrew Wilkins <[email protected]>
3 people authored Jul 1, 2024
1 parent cbc04f3 commit 0814644
Showing 12 changed files with 655 additions and 274 deletions.
@@ -0,0 +1,29 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: exporter/elasticsearch

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add data stream routing

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33794, 33756]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
  `data_stream.dataset` and `data_stream.namespace` in attributes will be respected when config `*_dynamic_index.enabled` is true.
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
48 changes: 26 additions & 22 deletions exporter/elasticsearchexporter/README.md
@@ -14,7 +14,7 @@
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
<!-- end autogenerated section -->

This exporter supports sending OpenTelemetry logs and traces to [Elasticsearch](https://www.elastic.co/elasticsearch).
This exporter supports sending logs, metrics and traces to [Elasticsearch](https://www.elastic.co/elasticsearch).

## Configuration options

@@ -83,39 +83,43 @@ The Elasticsearch exporter supports the common [`sending_queue` settings][export
### Elasticsearch document routing

Telemetry data will be written to signal specific data streams by default:
logs to `logs-generic-default`, and traces to `traces-generic-default`.
logs to `logs-generic-default`, metrics to `metrics-generic-default`, and traces to `traces-generic-default`.
This can be customised through the following settings:

- `index` (DEPRECATED, please use `logs_index` for logs, `traces_index` for traces): The [index] or [data stream] name to publish events to.
- `index` (DEPRECATED, please use `logs_index` for logs, `metrics_index` for metrics, `traces_index` for traces): The [index] or [data stream] name to publish events to.
The default value is `logs-generic-default`.

- `logs_index`: The [index] or [data stream] name to publish events to. The default value is `logs-generic-default`
- `logs_dynamic_index` (optional):
takes resource or log record attribute named `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
resulting dynamically prefixed / suffixed indexing based on `logs_index`. (priority: resource attribute > log record attribute)
- `enabled`(default=false): Enable/Disable dynamic index for log records
- `metrics_index`: The [index] or [data stream] name to publish metrics to. The default value is `metrics-generic-default`.

- `logs_dynamic_index` (optional): uses resource, scope, or log record attributes to dynamically construct index name.
- `enabled`(default=false): Enable/Disable dynamic index for log records. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: log record attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `logs-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if
`elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > log record attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${logs_index}${elasticsearch.index.suffix}`. Otherwise, the index name falls back to `logs-generic-default`, and `logs_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields.

- `metrics_index` (optional): The [index] or [data stream] name to publish metrics to. The default value is `metrics-generic-default`.
⚠️ Note that metrics support is currently in development.
- `metrics_dynamic_index` (optional):
takes resource attributes named `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
resulting dynamically prefixed / suffixed indexing based on `metrics_index`.

- `metrics_dynamic_index` (optional): uses resource, scope or data point attributes to dynamically construct index name.
⚠️ Note that metrics support is currently in development.
- `enabled`(default=false): Enable/Disable dynamic index for metrics
- `enabled`(default=true): Enable/disable dynamic index for metrics. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: data point attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `metrics-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if
`elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > data point attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${metrics_index}${elasticsearch.index.suffix}`. Otherwise, the index name falls back to `metrics-generic-default`, and `metrics_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields.

- `traces_index`: The [index] or [data stream] name to publish traces to. The default value is `traces-generic-default`.
- `traces_dynamic_index` (optional):
takes resource or span attribute named `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
resulting dynamically prefixed / suffixed indexing based on `traces_index`. (priority: resource attribute > span attribute)
- `enabled`(default=false): Enable/Disable dynamic index for trace spans
- `logstash_format` (optional): Logstash format compatibility. Traces or Logs data can be written into an index in logstash format.
- `enabled`(default=false): Enable/Disable Logstash format compatibility. When `logstash_format.enabled` is `true`, the index name is composed using `traces/logs_index` or `traces/logs_dynamic_index` as prefix and the date,
e.g: If `traces/logs_index` or `traces/logs_dynamic_index` is equals to `otlp-generic-default` your index will become `otlp-generic-default-YYYY.MM.DD`.
The last string appended belongs to the date when the data is being generated.

- `traces_dynamic_index` (optional): uses resource, scope, or span attributes to dynamically construct index name.
- `enabled`(default=false): Enable/Disable dynamic index for trace spans. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: span attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `traces-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if
`elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > span attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${traces_index}${elasticsearch.index.suffix}`. Otherwise, the index name falls back to `traces-generic-default`, and `traces_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields.

- `logstash_format` (optional): Logstash format compatibility. Logs, metrics and traces can be written into an index in Logstash format.
- `enabled`(default=false): Enable/disable Logstash format compatibility. When `logstash_format.enabled` is `true`, the index name is composed using `(logs|metrics|traces)_index` or `(logs|metrics|traces)_dynamic_index` as prefix and the date as suffix,
e.g: If `logs_index` or `logs_dynamic_index` is equal to `logs-generic-default`, your index will become `logs-generic-default-YYYY.MM.DD`.
The last string appended belongs to the date when the data is being generated.
- `prefix_separator`(default=`-`): Set a separator between logstash_prefix and date.
- `date_format`(default=`%Y.%m.%d`): Time format (based on strftime) to generate the second part of the Index name.

### Elasticsearch document mapping

The Elasticsearch exporter supports several document schemas and preprocessing
behaviours, which may be configured throug the following settings:
behaviours, which may be configured through the following settings:

- `mapping`: Events are encoded to JSON. The `mapping` allows users to
configure additional mapping rules.
@@ -142,7 +146,7 @@ behaviours, which may be configured throug the following settings:

In ECS mapping mode, the Elasticsearch Exporter attempts to map fields from
[OpenTelemetry Semantic Conventions][SemConv] (version 1.22.0) to [Elastic Common Schema][ECS].
This mode may be used for compatibility with existing dashboards that work with with ECS.
This mode may be used for compatibility with existing dashboards that work with ECS.

### Elasticsearch ingest pipeline

46 changes: 13 additions & 33 deletions exporter/elasticsearchexporter/attribute.go
@@ -7,43 +7,23 @@ import "go.opentelemetry.io/collector/pdata/pcommon"

// dynamic index attribute key constants
const (
indexPrefix = "elasticsearch.index.prefix"
indexSuffix = "elasticsearch.index.suffix"
indexPrefix = "elasticsearch.index.prefix"
indexSuffix = "elasticsearch.index.suffix"
dataStreamDataset = "data_stream.dataset"
dataStreamNamespace = "data_stream.namespace"
dataStreamType = "data_stream.type"
defaultDataStreamDataset = "generic"
defaultDataStreamNamespace = "default"
defaultDataStreamTypeLogs = "logs"
defaultDataStreamTypeMetrics = "metrics"
defaultDataStreamTypeTraces = "traces"
)

// resource is higher priotized than record attribute
type attrGetter interface {
Attributes() pcommon.Map
}

// retrieve attribute out of resource, scope, and record (span or log, if not found in resource)
// Deprecated: Use getFromAttributesNew instead.
func getFromAttributes(name string, resource, scope, record attrGetter) string {
var str string
val, exist := resource.Attributes().Get(name)
if !exist {
val, exist = scope.Attributes().Get(name)
if !exist {
val, exist = record.Attributes().Get(name)
if exist {
str = val.AsString()
}
}
if exist {
str = val.AsString()
}
}
if exist {
str = val.AsString()
}
return str
}

func getFromAttributesNew(name string, defaultValue string, attributeMaps ...pcommon.Map) string {
func getFromAttributes(name string, defaultValue string, attributeMaps ...pcommon.Map) (string, bool) {
for _, attributeMap := range attributeMaps {
if value, exists := attributeMap.Get(name); exists {
return value.AsString()
return value.AsString(), true
}
}
return defaultValue
return defaultValue, false
}
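
The reworked `getFromAttributes` searches its maps in argument order and reports whether the key was found, so the caller controls precedence and can tell a real attribute from the default. A minimal sketch of that behaviour, using plain `map[string]string` values in place of `pcommon.Map` (the `lookup` name and the sample attributes are hypothetical):

```go
package main

import "fmt"

// lookup is a hypothetical stand-in for getFromAttributes that works on
// plain maps instead of pcommon.Map. It returns the first value found and
// true, or defaultValue and false when no map contains the key.
func lookup(name, defaultValue string, maps ...map[string]string) (string, bool) {
	for _, m := range maps {
		if v, ok := m[name]; ok {
			return v, true
		}
	}
	return defaultValue, false
}

func main() {
	record := map[string]string{}
	resource := map[string]string{"data_stream.dataset": "nginx"}

	// Maps are searched in argument order: record first, then resource.
	v, found := lookup("data_stream.dataset", "generic", record, resource)
	fmt.Println(v, found) // prints "nginx true"

	// A missing key yields the default together with found == false.
	v, found = lookup("data_stream.namespace", "default", record, resource)
	fmt.Println(v, found) // prints "default false"
}
```

The boolean matters for routing: a caller cannot distinguish "attribute absent" from "attribute equal to the default" by comparing strings alone.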
57 changes: 42 additions & 15 deletions exporter/elasticsearchexporter/config_test.go
@@ -57,12 +57,21 @@ func TestConfig(t *testing.T) {
NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers,
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
Endpoints: []string{"https://elastic.example.com:9200"},
Index: "",
LogsIndex: "logs-generic-default",
Endpoints: []string{"https://elastic.example.com:9200"},
Index: "",
LogsIndex: "logs-generic-default",
LogsDynamicIndex: DynamicIndexSetting{
Enabled: false,
},
MetricsIndex: "metrics-generic-default",
TracesIndex: "trace_index",
Pipeline: "mypipeline",
MetricsDynamicIndex: DynamicIndexSetting{
Enabled: true,
},
TracesIndex: "trace_index",
TracesDynamicIndex: DynamicIndexSetting{
Enabled: false,
},
Pipeline: "mypipeline",
ClientConfig: confighttp.ClientConfig{
Timeout: 2 * time.Minute,
MaxIdleConns: &defaultMaxIdleConns,
@@ -110,12 +119,21 @@ func TestConfig(t *testing.T) {
NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers,
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
Endpoints: []string{"http://localhost:9200"},
Index: "",
LogsIndex: "my_log_index",
Endpoints: []string{"http://localhost:9200"},
Index: "",
LogsIndex: "my_log_index",
LogsDynamicIndex: DynamicIndexSetting{
Enabled: false,
},
MetricsIndex: "metrics-generic-default",
TracesIndex: "traces-generic-default",
Pipeline: "mypipeline",
MetricsDynamicIndex: DynamicIndexSetting{
Enabled: true,
},
TracesIndex: "traces-generic-default",
TracesDynamicIndex: DynamicIndexSetting{
Enabled: false,
},
Pipeline: "mypipeline",
ClientConfig: confighttp.ClientConfig{
Timeout: 2 * time.Minute,
MaxIdleConns: &defaultMaxIdleConns,
@@ -163,12 +181,21 @@ func TestConfig(t *testing.T) {
NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers,
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
Endpoints: []string{"http://localhost:9200"},
Index: "",
LogsIndex: "logs-generic-default",
Endpoints: []string{"http://localhost:9200"},
Index: "",
LogsIndex: "logs-generic-default",
LogsDynamicIndex: DynamicIndexSetting{
Enabled: false,
},
MetricsIndex: "my_metric_index",
TracesIndex: "traces-generic-default",
Pipeline: "mypipeline",
MetricsDynamicIndex: DynamicIndexSetting{
Enabled: true,
},
TracesIndex: "traces-generic-default",
TracesDynamicIndex: DynamicIndexSetting{
Enabled: false,
},
Pipeline: "mypipeline",
ClientConfig: confighttp.ClientConfig{
Timeout: 2 * time.Minute,
MaxIdleConns: &defaultMaxIdleConns,
82 changes: 82 additions & 0 deletions exporter/elasticsearchexporter/data_stream_router.go
@@ -0,0 +1,82 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/plog"
	"go.opentelemetry.io/collector/pdata/pmetric"
	"go.opentelemetry.io/collector/pdata/ptrace"
)

// routeWithDefaults returns a routing function bound to the given default data stream type, dataset, and namespace.
// The returned function derives the index name from record, scope, and resource attributes; fIndex is the configured
// index name used as the base for prefix/suffix routing.
func routeWithDefaults(defaultDSType, defaultDSDataset, defaultDSNamespace string) func(
	pcommon.Map,
	pcommon.Map,
	pcommon.Map,
	string,
) string {
	return func(
		recordAttr pcommon.Map,
		scopeAttr pcommon.Map,
		resourceAttr pcommon.Map,
		fIndex string,
	) string {
		// Order:
		// 1. read data_stream.* from attributes
		// 2. read elasticsearch.index.* from attributes
		// 3. use default hardcoded data_stream.*
		dataset, datasetExists := getFromAttributes(dataStreamDataset, defaultDSDataset, recordAttr, scopeAttr, resourceAttr)
		namespace, namespaceExists := getFromAttributes(dataStreamNamespace, defaultDSNamespace, recordAttr, scopeAttr, resourceAttr)
		dataStreamMode := datasetExists || namespaceExists
		if !dataStreamMode {
			prefix, prefixExists := getFromAttributes(indexPrefix, "", resourceAttr, scopeAttr, recordAttr)
			suffix, suffixExists := getFromAttributes(indexSuffix, "", resourceAttr, scopeAttr, recordAttr)
			if prefixExists || suffixExists {
				return fmt.Sprintf("%s%s%s", prefix, fIndex, suffix)
			}
		}
		recordAttr.PutStr(dataStreamDataset, dataset)
		recordAttr.PutStr(dataStreamNamespace, namespace)
		recordAttr.PutStr(dataStreamType, defaultDSType)
		return fmt.Sprintf("%s-%s-%s", defaultDSType, dataset, namespace)
	}
}

// routeLogRecord returns the name of the index to send the log record to according to data stream routing attributes and prefix/suffix attributes.
// This function may mutate record attributes.
func routeLogRecord(
	record plog.LogRecord,
	scope pcommon.InstrumentationScope,
	resource pcommon.Resource,
	fIndex string,
) string {
	route := routeWithDefaults(defaultDataStreamTypeLogs, defaultDataStreamDataset, defaultDataStreamNamespace)
	return route(record.Attributes(), scope.Attributes(), resource.Attributes(), fIndex)
}

// routeDataPoint returns the name of the index to send the data point to according to data stream routing attributes and prefix/suffix attributes.
// This function may mutate data point attributes.
func routeDataPoint(
	dataPoint pmetric.NumberDataPoint,
	scope pcommon.InstrumentationScope,
	resource pcommon.Resource,
	fIndex string,
) string {
	route := routeWithDefaults(defaultDataStreamTypeMetrics, defaultDataStreamDataset, defaultDataStreamNamespace)
	return route(dataPoint.Attributes(), scope.Attributes(), resource.Attributes(), fIndex)
}

// routeSpan returns the name of the index to send the span to according to data stream routing attributes and prefix/suffix attributes.
// This function may mutate span attributes.
func routeSpan(
	span ptrace.Span,
	scope pcommon.InstrumentationScope,
	resource pcommon.Resource,
	fIndex string,
) string {
	route := routeWithDefaults(defaultDataStreamTypeTraces, defaultDataStreamDataset, defaultDataStreamNamespace)
	return route(span.Attributes(), scope.Attributes(), resource.Attributes(), fIndex)
}
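
To make the three routing outcomes concrete, here is a standalone sketch of the same decision order using plain maps rather than `pcommon.Map`. The `attrs` type, the `lookup` and `route` helpers, and the sample attribute values are hypothetical stand-ins written for this illustration; only the ordering and the attribute keys come from the code above.

```go
package main

import "fmt"

// attrs is a hypothetical stand-in for pcommon.Map.
type attrs map[string]string

// lookup returns the first value found for name, searching maps in order.
func lookup(name, def string, maps ...attrs) (string, bool) {
	for _, m := range maps {
		if v, ok := m[name]; ok {
			return v, true
		}
	}
	return def, false
}

// route follows the decision order of routeWithDefaults:
// data_stream.* attributes, then index prefix/suffix, then defaults.
func route(dsType string, record, scope, resource attrs, fIndex string) string {
	dataset, dsOK := lookup("data_stream.dataset", "generic", record, scope, resource)
	namespace, nsOK := lookup("data_stream.namespace", "default", record, scope, resource)
	if !dsOK && !nsOK {
		// Prefix/suffix lookups use the reverse precedence: resource first.
		prefix, pOK := lookup("elasticsearch.index.prefix", "", resource, scope, record)
		suffix, sOK := lookup("elasticsearch.index.suffix", "", resource, scope, record)
		if pOK || sOK {
			return prefix + fIndex + suffix
		}
	}
	// Data stream mode enriches the record with the resolved fields.
	record["data_stream.dataset"] = dataset
	record["data_stream.namespace"] = namespace
	record["data_stream.type"] = dsType
	return dsType + "-" + dataset + "-" + namespace
}

func main() {
	// 1. A data_stream.* attribute is present: data stream naming wins.
	fmt.Println(route("logs", attrs{"data_stream.dataset": "nginx"}, attrs{}, attrs{}, "my_index"))
	// prints "logs-nginx-default"

	// 2. Only a prefix is present: the configured index is decorated.
	fmt.Println(route("logs", attrs{}, attrs{}, attrs{"elasticsearch.index.prefix": "team-a-"}, "my_index"))
	// prints "team-a-my_index"

	// 3. Nothing is present: the hardcoded defaults apply and fIndex is ignored.
	fmt.Println(route("logs", attrs{}, attrs{}, attrs{}, "my_index"))
	// prints "logs-generic-default"
}
```

Note that in the third case the configured index name is not used at all, which matches the README statement that `logs_index` is ignored when dynamic indexing falls back to the defaults.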