From c52a9a7ae1563127254530d416a8c569c54750bd Mon Sep 17 00:00:00 2001 From: Paulo Janotti Date: Fri, 13 Sep 2024 15:28:28 -0700 Subject: [PATCH 1/9] [chore] Use assert/assert instead of require/assert (#35182) --- .../receiver_windows_test.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/receiver/windowseventlogreceiver/receiver_windows_test.go b/receiver/windowseventlogreceiver/receiver_windows_test.go index b4e3653864bb..531c53030021 100644 --- a/receiver/windowseventlogreceiver/receiver_windows_test.go +++ b/receiver/windowseventlogreceiver/receiver_windows_test.go @@ -152,7 +152,8 @@ func TestReadWindowsEventLogger(t *testing.T) { err = logger.Info(10, logMessage) require.NoError(t, err) - records := requireExpectedLogRecords(t, sink, src, 1) + records := assertExpectedLogRecords(t, sink, src, 1) + require.Len(t, records, 1) record := records[0] body := record.Body().Map().AsRaw() @@ -205,7 +206,8 @@ func TestReadWindowsEventLoggerRaw(t *testing.T) { err = logger.Info(10, logMessage) require.NoError(t, err) - records := requireExpectedLogRecords(t, sink, src, 1) + records := assertExpectedLogRecords(t, sink, src, 1) + require.Len(t, records, 1) record := records[0] body := record.Body().AsString() bodyStruct := struct { @@ -272,8 +274,8 @@ func TestExcludeProvider(t *testing.T) { require.NoError(t, err) } - records := requireExpectedLogRecords(t, sink, notExcludedSrc, 1) - assert.NotEmpty(t, records) + records := assertExpectedLogRecords(t, sink, notExcludedSrc, 1) + assert.Len(t, records, 1) records = filterAllLogRecordsBySource(t, sink, excludedSrc) assert.Empty(t, records) @@ -319,13 +321,13 @@ func assertEventSourceInstallation(t *testing.T, src string) (uninstallEventSour return } -func requireExpectedLogRecords(t *testing.T, sink *consumertest.LogsSink, expectedEventSrc string, expectedEventCount int) []plog.LogRecord { +func assertExpectedLogRecords(t *testing.T, sink *consumertest.LogsSink, expectedEventSrc string, expectedEventCount int) []plog.LogRecord { var actualLogRecords []plog.LogRecord // logs sometimes take a while to be written, so a substantial wait buffer is needed - require.EventuallyWithT(t, func(c *assert.CollectT) { + assert.EventuallyWithT(t, func(c *assert.CollectT) { actualLogRecords = filterAllLogRecordsBySource(t, sink, expectedEventSrc) - require.Len(c, actualLogRecords, expectedEventCount) + assert.Len(c, actualLogRecords, expectedEventCount) }, 10*time.Second, 250*time.Millisecond) return actualLogRecords From 01c66516afe7328aca8ca3dbb6a55075830f3693 Mon Sep 17 00:00:00 2001 From: Harshith Mente <109957201+joeyyy09@users.noreply.github.com> Date: Mon, 16 Sep 2024 14:50:09 +0530 Subject: [PATCH 2/9] [receiver/kafkareceiver] Add otlp_json support in kafka receiver (#34840) **Description:** The current features dont support otlp_json in receivers/kafkareceivers. Add support for otlp_json which accepts json formated for Otel Collector kafka receiver **Link to tracking Issue:** #33627 **Testing:** Added test files for the same. **Documentation:** --------- Signed-off-by: joeyyy09 Co-authored-by: Ziqi Zhao Co-authored-by: Yuri Shkuro --- .chloggen/otlp_logs.yaml | 29 ++++++++++++++++++++++ receiver/kafkareceiver/README.md | 1 + receiver/kafkareceiver/unmarshaler.go | 12 +++++---- receiver/kafkareceiver/unmarshaler_test.go | 3 +++ 4 files changed, 40 insertions(+), 5 deletions(-) create mode 100644 .chloggen/otlp_logs.yaml diff --git a/.chloggen/otlp_logs.yaml b/.chloggen/otlp_logs.yaml new file mode 100644 index 000000000000..239dacb1b21e --- /dev/null +++ b/.chloggen/otlp_logs.yaml @@ -0,0 +1,29 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: kafkareceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Add support for `otlp_json` encoding to Kafka receiver. The payload is deserialized into OpenTelemetry traces using JSON format." + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [33627] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + This encoding allows the Kafka receiver to handle trace data in JSON format, + enabling integration with systems that export traces as JSON-encoded data. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md index 98d8e56377f8..38c014fe8937 100644 --- a/receiver/kafkareceiver/README.md +++ b/receiver/kafkareceiver/README.md @@ -31,6 +31,7 @@ The following settings can be optionally configured: Only one telemetry type may be used for a given topic. - `encoding` (default = otlp_proto): The encoding of the payload received from kafka. Supports encoding extensions. Tries to load an encoding extension and falls back to internal encodings if no extension was loaded. Available internal encodings: - `otlp_proto`: the payload is deserialized to `ExportTraceServiceRequest`, `ExportLogsServiceRequest` or `ExportMetricsServiceRequest` respectively. + - `otlp_json`: the payload is deserialized to `ExportTraceServiceRequest` `ExportLogsServiceRequest` or `ExportMetricsServiceRequest` respectively using JSON encoding. - `jaeger_proto`: the payload is deserialized to a single Jaeger proto `Span`. - `jaeger_json`: the payload is deserialized to a single Jaeger JSON Span using `jsonpb`. - `zipkin_proto`: the payload is deserialized into a list of Zipkin proto spans. diff --git a/receiver/kafkareceiver/unmarshaler.go b/receiver/kafkareceiver/unmarshaler.go index dbf029e63563..793848d94c8f 100644 --- a/receiver/kafkareceiver/unmarshaler.go +++ b/receiver/kafkareceiver/unmarshaler.go @@ -17,7 +17,6 @@ import ( type TracesUnmarshaler interface { // Unmarshal deserializes the message body into traces. Unmarshal([]byte) (ptrace.Traces, error) - // Encoding of the serialized messages. Encoding() string } @@ -26,7 +25,6 @@ type TracesUnmarshaler interface { type MetricsUnmarshaler interface { // Unmarshal deserializes the message body into traces Unmarshal([]byte) (pmetric.Metrics, error) - // Encoding of the serialized messages Encoding() string } @@ -35,14 +33,12 @@ type MetricsUnmarshaler interface { type LogsUnmarshaler interface { // Unmarshal deserializes the message body into traces. Unmarshal([]byte) (plog.Logs, error) - // Encoding of the serialized messages. Encoding() string } type LogsUnmarshalerWithEnc interface { LogsUnmarshaler - // WithEnc sets the character encoding (UTF-8, GBK, etc.) of the unmarshaler. WithEnc(string) (LogsUnmarshalerWithEnc, error) } @@ -50,6 +46,7 @@ type LogsUnmarshalerWithEnc interface { // defaultTracesUnmarshalers returns map of supported encodings with TracesUnmarshaler. func defaultTracesUnmarshalers() map[string]TracesUnmarshaler { otlpPb := newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding) + otlpJSON := newPdataTracesUnmarshaler(&ptrace.JSONUnmarshaler{}, "otlp_json") jaegerProto := jaegerProtoSpanUnmarshaler{} jaegerJSON := jaegerJSONSpanUnmarshaler{} zipkinProto := newPdataTracesUnmarshaler(zipkinv2.NewProtobufTracesUnmarshaler(false, false), "zipkin_proto") @@ -57,6 +54,7 @@ func defaultTracesUnmarshalers() map[string]TracesUnmarshaler { zipkinThrift := newPdataTracesUnmarshaler(zipkinv1.NewThriftTracesUnmarshaler(), "zipkin_thrift") return map[string]TracesUnmarshaler{ otlpPb.Encoding(): otlpPb, + otlpJSON.Encoding(): otlpJSON, jaegerProto.Encoding(): jaegerProto, jaegerJSON.Encoding(): jaegerJSON, zipkinProto.Encoding(): zipkinProto, @@ -67,20 +65,24 @@ func defaultTracesUnmarshalers() map[string]TracesUnmarshaler { func defaultMetricsUnmarshalers() map[string]MetricsUnmarshaler { otlpPb := newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding) + otlpJSON := newPdataMetricsUnmarshaler(&pmetric.JSONUnmarshaler{}, "otlp_json") return map[string]MetricsUnmarshaler{ - otlpPb.Encoding(): otlpPb, + otlpPb.Encoding(): otlpPb, + otlpJSON.Encoding(): otlpJSON, } } func defaultLogsUnmarshalers(version string, logger *zap.Logger) map[string]LogsUnmarshaler { azureResourceLogs := newAzureResourceLogsUnmarshaler(version, logger) otlpPb := newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding) + otlpJSON := newPdataLogsUnmarshaler(&plog.JSONUnmarshaler{}, "otlp_json") raw := newRawLogsUnmarshaler() text := newTextLogsUnmarshaler() json := newJSONLogsUnmarshaler() return map[string]LogsUnmarshaler{ azureResourceLogs.Encoding(): azureResourceLogs, otlpPb.Encoding(): otlpPb, + otlpJSON.Encoding(): otlpJSON, raw.Encoding(): raw, text.Encoding(): text, json.Encoding(): json, diff --git a/receiver/kafkareceiver/unmarshaler_test.go b/receiver/kafkareceiver/unmarshaler_test.go index fd1f998ee0a7..bb86ab8dfcd5 100644 --- a/receiver/kafkareceiver/unmarshaler_test.go +++ b/receiver/kafkareceiver/unmarshaler_test.go @@ -14,6 +14,7 @@ import ( func TestDefaultTracesUnMarshaler(t *testing.T) { expectedEncodings := []string{ "otlp_proto", + "otlp_json", "jaeger_proto", "jaeger_json", "zipkin_proto", @@ -34,6 +35,7 @@ func TestDefaultTracesUnMarshaler(t *testing.T) { func TestDefaultMetricsUnMarshaler(t *testing.T) { expectedEncodings := []string{ "otlp_proto", + "otlp_json", } marshalers := defaultMetricsUnmarshalers() assert.Equal(t, len(expectedEncodings), len(marshalers)) @@ -49,6 +51,7 @@ func TestDefaultMetricsUnMarshaler(t *testing.T) { func TestDefaultLogsUnMarshaler(t *testing.T) { expectedEncodings := []string{ "otlp_proto", + "otlp_json", "raw", "text", "json", From 8243d20aa980e441975719dbd5978e7a5943a2c4 Mon Sep 17 00:00:00 2001 From: Munir Abdinur Date: Mon, 16 Sep 2024 05:33:29 -0400 Subject: [PATCH 3/9] feat(datadogexporter): adds remap_metrics feature gate (#35025) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit **Description:** Adds a metrics configuration that enables/disables the conversion of OpenTelemetry metrics to Datadog semantics in the Datadog Exporter. This conversion will soon occur in a datadog semantic processor. **Link to tracking Issue:** N/A **Testing:** Unit tests *̶*̶D̶o̶c̶u̶m̶e̶n̶t̶a̶t̶i̶o̶n̶:̶*̶*̶ O̶n̶c̶e̶ t̶h̶i̶s̶ c̶o̶n̶f̶i̶g̶u̶r̶a̶t̶i̶o̶n̶ i̶s̶ G̶A̶ w̶e̶ w̶i̶l̶l̶ u̶p̶d̶a̶t̶e̶ t̶h̶e̶ d̶o̶c̶s̶ h̶e̶r̶e̶:̶ h̶t̶t̶p̶s̶:̶//d̶o̶c̶s̶.d̶a̶t̶a̶d̶o̶g̶h̶q̶.c̶o̶m̶/o̶p̶e̶n̶t̶e̶l̶e̶m̶e̶t̶r̶y̶/c̶o̶l̶l̶e̶c̶t̶o̶r̶_̶e̶x̶p̶o̶r̶t̶e̶r̶/c̶o̶n̶f̶i̶g̶u̶r̶a̶t̶i̶o̶n̶/ --------- Co-authored-by: Yang Song Co-authored-by: Pablo Baeyens --- .chloggen/munir_add-option-to-avoid-remapping.yaml | 13 +++++++++++++ exporter/datadogexporter/factory.go | 12 ++++++++++++ exporter/datadogexporter/metrics_exporter.go | 7 ++++++- 3 files changed, 31 insertions(+), 1 deletion(-) create mode 100644 .chloggen/munir_add-option-to-avoid-remapping.yaml diff --git a/.chloggen/munir_add-option-to-avoid-remapping.yaml b/.chloggen/munir_add-option-to-avoid-remapping.yaml new file mode 100644 index 000000000000..35efaef6a6fd --- /dev/null +++ b/.chloggen/munir_add-option-to-avoid-remapping.yaml @@ -0,0 +1,13 @@ +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: datadogexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Adds exporter.datadogexporter.metricremappingdisabled featuregate which disables renaming OpenTelemetry metrics to match Datadog semantics. This feature gate is only for internal use." + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35025] + +# Optional: A list of users who contributed to the change. This is used to generate the list of contributors in the changelog. +change_logs: [] diff --git a/exporter/datadogexporter/factory.go b/exporter/datadogexporter/factory.go index 0a2381cda977..ec13fbbd63d0 100644 --- a/exporter/datadogexporter/factory.go +++ b/exporter/datadogexporter/factory.go @@ -53,6 +53,13 @@ var metricExportNativeClientFeatureGate = featuregate.GlobalRegistry().MustRegis featuregate.WithRegisterDescription("When enabled, metric export in datadogexporter uses native Datadog client APIs instead of Zorkian APIs."), ) +var metricRemappingDisableddFeatureGate = featuregate.GlobalRegistry().MustRegister( + "exporter.datadogexporter.metricremappingdisabled", + featuregate.StageAlpha, + featuregate.WithRegisterDescription("When enabled the Datadog Exporter remaps OpenTelemetry semantic conventions to Datadog semantic conventions. This feature gate is only for internal use."), + featuregate.WithRegisterReferenceURL("https://docs.datadoghq.com/opentelemetry/schema_semantics/metrics_mapping/"), +) + // noAPMStatsFeatureGate causes the trace consumer to skip APM stats computation. var noAPMStatsFeatureGate = featuregate.GlobalRegistry().MustRegister( "exporter.datadogexporter.DisableAPMStats", @@ -65,6 +72,11 @@ func isMetricExportV2Enabled() bool { return metricExportNativeClientFeatureGate.IsEnabled() } +// isMetricRemappingDisabled returns true if the datadogexporter should generate Datadog-compliant metrics from OpenTelemetry metrics +func isMetricRemappingDisabled() bool { + return metricRemappingDisableddFeatureGate.IsEnabled() +} + func isLogsAgentExporterEnabled() bool { return logsAgentExporterFeatureGate.IsEnabled() } diff --git a/exporter/datadogexporter/metrics_exporter.go b/exporter/datadogexporter/metrics_exporter.go index 6fa87508f40f..c6846807bd77 100644 --- a/exporter/datadogexporter/metrics_exporter.go +++ b/exporter/datadogexporter/metrics_exporter.go @@ -55,7 +55,12 @@ func translatorFromConfig(set component.TelemetrySettings, cfg *Config, attrsTra options := []otlpmetrics.TranslatorOption{ otlpmetrics.WithDeltaTTL(cfg.Metrics.DeltaTTL), otlpmetrics.WithFallbackSourceProvider(sourceProvider), - otlpmetrics.WithRemapping(), + } + + if isMetricRemappingDisabled() { + set.Logger.Warn("Metric remapping is disabled in the Datadog exporter. OpenTelemetry metrics must be mapped to Datadog semantics before metrics are exported to Datadog (ex: via a processor).") + } else { + options = append(options, otlpmetrics.WithRemapping()) } if cfg.Metrics.HistConfig.SendAggregations { From 3d81cbe1be8e260266b10ed36a63408a6da1a85b Mon Sep 17 00:00:00 2001 From: Christos Markou Date: Mon, 16 Sep 2024 12:41:58 +0300 Subject: [PATCH 4/9] [processor/resourcedetection] Move processor.resourcedetection.hostCPUSteppingAsString feature gate to stable (#35202) **Description:** Follows https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/33076 and https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/31165. **Link to tracking Issue:** https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/31136 **Testing:** **Documentation:** /cc @mx-psi Signed-off-by: ChrsMark --- .chloggen/cpu_stepping_fg_stable.yaml | 27 +++++++++++++++++++ .../internal/metadata/resource_int_version.go | 11 -------- .../internal/system/system.go | 19 ++++--------- 3 files changed, 32 insertions(+), 25 deletions(-) create mode 100644 .chloggen/cpu_stepping_fg_stable.yaml delete mode 100644 processor/resourcedetectionprocessor/internal/system/internal/metadata/resource_int_version.go diff --git a/.chloggen/cpu_stepping_fg_stable.yaml b/.chloggen/cpu_stepping_fg_stable.yaml new file mode 100644 index 000000000000..19237453002c --- /dev/null +++ b/.chloggen/cpu_stepping_fg_stable.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: resourcedetectionprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Move `processor.resourcedetection.hostCPUSteppingAsString` feature gate to stable. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31136] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/processor/resourcedetectionprocessor/internal/system/internal/metadata/resource_int_version.go b/processor/resourcedetectionprocessor/internal/system/internal/metadata/resource_int_version.go deleted file mode 100644 index 6337c1e41421..000000000000 --- a/processor/resourcedetectionprocessor/internal/system/internal/metadata/resource_int_version.go +++ /dev/null @@ -1,11 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package metadata // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/system/internal/metadata" - -// SetHostCPUSteppingAsInt sets provided value as "host.cpu.stepping" attribute as int. -func (rb *ResourceBuilder) SetHostCPUSteppingAsInt(val int64) { - if rb.config.HostCPUModelID.Enabled { - rb.res.Attributes().PutInt("host.cpu.stepping", val) - } -} diff --git a/processor/resourcedetectionprocessor/internal/system/system.go b/processor/resourcedetectionprocessor/internal/system/system.go index f3ff1c0c160a..45fc5c8607f2 100644 --- a/processor/resourcedetectionprocessor/internal/system/system.go +++ b/processor/resourcedetectionprocessor/internal/system/system.go @@ -31,12 +31,12 @@ var ( featuregate.WithRegisterToVersion("v0.101.0"), featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/semantic-conventions/issues/495"), ) - hostCPUSteppingAsStringID = "processor.resourcedetection.hostCPUSteppingAsString" - hostCPUSteppingAsStringFeatureGate = featuregate.GlobalRegistry().MustRegister( - hostCPUSteppingAsStringID, - featuregate.StageBeta, + _ = featuregate.GlobalRegistry().MustRegister( + "processor.resourcedetection.hostCPUSteppingAsString", + featuregate.StageStable, featuregate.WithRegisterDescription("Change type of host.cpu.stepping to string."), featuregate.WithRegisterFromVersion("v0.95.0"), + featuregate.WithRegisterToVersion("v0.110.0"), featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/semantic-conventions/issues/664"), ) ) @@ -215,15 +215,6 @@ func setHostCPUInfo(d *Detector, cpuInfo cpu.InfoStat) { } d.rb.SetHostCPUModelName(cpuInfo.ModelName) - if hostCPUSteppingAsStringFeatureGate.IsEnabled() { - d.rb.SetHostCPUStepping(fmt.Sprintf("%d", cpuInfo.Stepping)) - } else { - // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/31136 - d.logger.Info("This attribute will change from int to string. Switch now using the feature gate.", - zap.String("attribute", "host.cpu.stepping"), - zap.String("feature gate", hostCPUSteppingAsStringID), - ) - d.rb.SetHostCPUSteppingAsInt(int64(cpuInfo.Stepping)) - } + d.rb.SetHostCPUStepping(fmt.Sprintf("%d", cpuInfo.Stepping)) d.rb.SetHostCPUCacheL2Size(int64(cpuInfo.CacheSize)) } From bf194612e128c08d699ed20eb4c939dad2d0d893 Mon Sep 17 00:00:00 2001 From: Matthieu MOREL Date: Mon, 16 Sep 2024 12:50:08 +0200 Subject: [PATCH 5/9] [chore]: enable go-require rule from testifylint (#35190) #### Description Testifylint is a linter that provides best practices with the use of testify. This PR enables [go-require](https://github.com/Antonboom/testifylint?tab=readme-ov-file#go-require) rule from [testifylint](https://github.com/Antonboom/testifylint) --------- Signed-off-by: Matthieu MOREL --- .golangci.yml | 1 - Makefile.Common | 2 +- cmd/telemetrygen/internal/e2etest/e2e_test.go | 3 +- .../spanmetricsconnector/connector_test.go | 2 +- exporter/carbonexporter/exporter_test.go | 8 +- .../internal/hostmetadata/metadata_test.go | 6 +- .../logs_exporter_test.go | 2 +- .../log_exporter_test.go | 2 +- .../metrics_exporter_test.go | 2 +- .../trace_exporter_test.go | 2 +- exporter/lokiexporter/exporter_test.go | 12 +-- .../opensearchexporter/integration_test.go | 9 +- .../internal/arrow/exporter_test.go | 18 ++-- .../prometheusexporter/end_to_end_test.go | 3 +- exporter/sapmexporter/exporter_test.go | 4 +- exporter/splunkhecexporter/client_test.go | 14 ++-- exporter/sumologicexporter/sender_test.go | 14 ++-- exporter/syslogexporter/exporter_test.go | 4 +- .../jaegerremotesampling/extension_test.go | 2 +- .../sumologicextension/extension_test.go | 84 +++++++++---------- .../coreinternal/scraperinttest/scraperint.go | 3 +- internal/docker/docker_test.go | 2 +- internal/kubelet/client_test.go | 10 +-- internal/otelarrow/test/e2e_test.go | 15 ++-- pkg/stanza/adapter/integration_test.go | 3 +- pkg/stanza/adapter/receiver_test.go | 4 +- pkg/stanza/fileconsumer/benchmark_test.go | 7 +- .../internal/emittest/sink_test.go | 2 +- pkg/stanza/fileconsumer/rotation_test.go | 16 ++-- pkg/stanza/operator/helper/emitter_test.go | 7 +- pkg/stanza/operator/input/tcp/input_test.go | 9 +- pkg/stanza/operator/input/udp/input_test.go | 7 +- .../transformer/recombine/transformer_test.go | 3 +- .../internal/resourcedetection_test.go | 2 +- .../tailsamplingprocessor/processor_test.go | 10 +-- receiver/apachereceiver/scraper_test.go | 3 +- receiver/apachesparkreceiver/client_test.go | 51 +++++------ receiver/bigipreceiver/client_test.go | 45 +++++----- receiver/bigipreceiver/integration_test.go | 7 +- receiver/couchdbreceiver/client_test.go | 5 +- receiver/elasticsearchreceiver/client_test.go | 5 +- receiver/expvarreceiver/scraper_test.go | 3 +- receiver/flinkmetricsreceiver/client_test.go | 53 ++++++------ .../fluentforwardreceiver/receiver_test.go | 9 +- receiver/haproxyreceiver/scraper_test.go | 37 ++++---- receiver/httpcheckreceiver/scraper_test.go | 3 +- receiver/jaegerreceiver/jaeger_agent_test.go | 2 +- receiver/kafkareceiver/kafka_receiver_test.go | 18 ++-- receiver/nginxreceiver/scraper_test.go | 3 +- receiver/nsxtreceiver/client_test.go | 17 ++-- .../internal/arrow/arrow_test.go | 14 ++-- receiver/otelarrowreceiver/otelarrow_test.go | 26 +++--- .../internal/staleness_end_to_end_test.go | 6 +- receiver/rabbitmqreceiver/client_test.go | 5 +- receiver/riakreceiver/client_test.go | 3 +- receiver/sshcheckreceiver/scraper_test.go | 3 +- .../internal/mockserver/client_mock.go | 9 +- receiver/zookeeperreceiver/scraper_test.go | 9 +- 58 files changed, 329 insertions(+), 301 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index ec43cda68833..86777518d77e 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -136,7 +136,6 @@ linters-settings: disable: - float-compare - formatter - - go-require - require-error - suite-subtest-run - useless-assert diff --git a/Makefile.Common b/Makefile.Common index 2e9b53cfe422..f0b6dea5bb5c 100644 --- a/Makefile.Common +++ b/Makefile.Common @@ -77,7 +77,7 @@ GOTESTSUM := $(TOOLS_BIN_DIR)/gotestsum TESTIFYLINT := $(TOOLS_BIN_DIR)/testifylint GOTESTSUM_OPT?= --rerun-fails=1 -TESTIFYLINT_OPT?= --enable-all --disable=float-compare,formatter,go-require,require-error,suite-subtest-run,useless-assert +TESTIFYLINT_OPT?= --enable-all --disable=float-compare,formatter,require-error,suite-subtest-run,useless-assert # BUILD_TYPE should be one of (dev, release). BUILD_TYPE?=release diff --git a/cmd/telemetrygen/internal/e2etest/e2e_test.go b/cmd/telemetrygen/internal/e2etest/e2e_test.go index 560df3022189..9e8b2dac7af6 100644 --- a/cmd/telemetrygen/internal/e2etest/e2e_test.go +++ b/cmd/telemetrygen/internal/e2etest/e2e_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" @@ -53,7 +54,7 @@ func TestGenerateTraces(t *testing.T) { } go func() { err = traces.Start(cfg) - require.NoError(t, err) + assert.NoError(t, err) }() require.Eventually(t, func() bool { return len(sink.AllTraces()) > 0 diff --git a/connector/spanmetricsconnector/connector_test.go b/connector/spanmetricsconnector/connector_test.go index 0748a4409343..1bb2fc8705a5 100644 --- a/connector/spanmetricsconnector/connector_test.go +++ b/connector/spanmetricsconnector/connector_test.go @@ -659,7 +659,7 @@ func TestConcurrentShutdown(t *testing.T) { for i := 0; i < concurrency; i++ { go func() { err := p.Shutdown(ctx) - require.NoError(t, err) + assert.NoError(t, err) wg.Done() }() } diff --git a/exporter/carbonexporter/exporter_test.go b/exporter/carbonexporter/exporter_test.go index 6098a51eb7cd..b50703e0af82 100644 --- a/exporter/carbonexporter/exporter_test.go +++ b/exporter/carbonexporter/exporter_test.go @@ -142,7 +142,7 @@ func TestConsumeMetrics(t *testing.T) { defer writersWG.Done() <-startCh for j := 0; j < tt.writesPerProducer; j++ { - require.NoError(t, exp.ConsumeMetrics(context.Background(), tt.md)) + assert.NoError(t, exp.ConsumeMetrics(context.Background(), tt.md)) } }() } @@ -332,10 +332,10 @@ func (cs *carbonServer) start(t *testing.T, numExpectedReq int) { // Close is expected to cause error. return } - require.NoError(t, err) + assert.NoError(t, err) go func(conn net.Conn) { defer func() { - require.NoError(t, conn.Close()) + assert.NoError(t, conn.Close()) }() reader := bufio.NewReader(conn) @@ -344,7 +344,7 @@ func (cs *carbonServer) start(t *testing.T, numExpectedReq int) { if errors.Is(err, io.EOF) { return } - require.NoError(t, err) + assert.NoError(t, err) if cs.expectedContainsValue != "" { assert.Contains(t, string(buf), cs.expectedContainsValue) diff --git a/exporter/datadogexporter/internal/hostmetadata/metadata_test.go b/exporter/datadogexporter/internal/hostmetadata/metadata_test.go index 4d2100046e80..24b30b85db1d 100644 --- a/exporter/datadogexporter/internal/hostmetadata/metadata_test.go +++ b/exporter/datadogexporter/internal/hostmetadata/metadata_test.go @@ -190,13 +190,13 @@ func TestPushMetadata(t *testing.T) { assert.Equal(t, "apikey", r.Header.Get("DD-Api-Key")) assert.Equal(t, "otelcontribcol/1.0", r.Header.Get("User-Agent")) reader, err := gzip.NewReader(r.Body) - require.NoError(t, err) + assert.NoError(t, err) body, err := io.ReadAll(reader) - require.NoError(t, err) + assert.NoError(t, err) var recvMetadata payload.HostMetadata err = json.Unmarshal(body, &recvMetadata) - require.NoError(t, err) + assert.NoError(t, err) assert.Equal(t, mockMetadata, recvMetadata) }) diff --git a/exporter/honeycombmarkerexporter/logs_exporter_test.go b/exporter/honeycombmarkerexporter/logs_exporter_test.go index 22f48e8e8c68..3eceacedaf4d 100644 --- a/exporter/honeycombmarkerexporter/logs_exporter_test.go +++ b/exporter/honeycombmarkerexporter/logs_exporter_test.go @@ -124,7 +124,7 @@ func TestExportMarkers(t *testing.T) { decodedBody := map[string]any{} err := json.NewDecoder(req.Body).Decode(&decodedBody) - require.NoError(t, err) + assert.NoError(t, err) assert.Equal(t, len(tt.attributeMap), len(decodedBody)) diff --git a/exporter/loadbalancingexporter/log_exporter_test.go b/exporter/loadbalancingexporter/log_exporter_test.go index baab3aa7af28..0286d5ec4121 100644 --- a/exporter/loadbalancingexporter/log_exporter_test.go +++ b/exporter/loadbalancingexporter/log_exporter_test.go @@ -466,7 +466,7 @@ func TestRollingUpdatesWhenConsumeLogs(t *testing.T) { return case <-ticker.C: go func() { - require.NoError(t, p.ConsumeLogs(ctx, randomLogs())) + assert.NoError(t, p.ConsumeLogs(ctx, randomLogs())) }() } } diff --git a/exporter/loadbalancingexporter/metrics_exporter_test.go b/exporter/loadbalancingexporter/metrics_exporter_test.go index f46f833752ff..45930eadd34c 100644 --- a/exporter/loadbalancingexporter/metrics_exporter_test.go +++ b/exporter/loadbalancingexporter/metrics_exporter_test.go @@ -847,7 +847,7 @@ func TestRollingUpdatesWhenConsumeMetrics(t *testing.T) { return case <-ticker.C: go func() { - require.NoError(t, p.ConsumeMetrics(ctx, randomMetrics(t, 1, 1, 1, 1))) + assert.NoError(t, p.ConsumeMetrics(ctx, randomMetrics(t, 1, 1, 1, 1))) }() } } diff --git a/exporter/loadbalancingexporter/trace_exporter_test.go b/exporter/loadbalancingexporter/trace_exporter_test.go index c2ab967cfc6a..554d97ccaa7e 100644 --- a/exporter/loadbalancingexporter/trace_exporter_test.go +++ b/exporter/loadbalancingexporter/trace_exporter_test.go @@ -569,7 +569,7 @@ func TestRollingUpdatesWhenConsumeTraces(t *testing.T) { return case <-ticker.C: go func() { - require.NoError(t, p.ConsumeTraces(ctx, randomTraces())) + assert.NoError(t, p.ConsumeTraces(ctx, randomTraces())) }() } } diff --git a/exporter/lokiexporter/exporter_test.go b/exporter/lokiexporter/exporter_test.go index a7c1a5cb7fe0..fd18f198e85c 100644 --- a/exporter/lokiexporter/exporter_test.go +++ b/exporter/lokiexporter/exporter_test.go @@ -65,13 +65,13 @@ func TestPushLogData(t *testing.T) { // prepare ts := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) { encPayload, err := io.ReadAll(r.Body) - require.NoError(t, err) + assert.NoError(t, err) decPayload, err := snappy.Decode(nil, encPayload) - require.NoError(t, err) + assert.NoError(t, err) err = proto.Unmarshal(decPayload, actualPushRequest) - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -241,14 +241,14 @@ func TestLogsToLokiRequestWithGroupingByTenant(t *testing.T) { // prepare ts := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) { encPayload, err := io.ReadAll(r.Body) - require.NoError(t, err) + assert.NoError(t, err) decPayload, err := snappy.Decode(nil, encPayload) - require.NoError(t, err) + assert.NoError(t, err) pr := &push.PushRequest{} err = proto.Unmarshal(decPayload, pr) - require.NoError(t, err) + assert.NoError(t, err) actualPushRequestPerTenant[r.Header.Get("X-Scope-OrgID")] = pr })) diff --git a/exporter/opensearchexporter/integration_test.go b/exporter/opensearchexporter/integration_test.go index 22d4d06a0580..70040aa19a34 100644 --- a/exporter/opensearchexporter/integration_test.go +++ b/exporter/opensearchexporter/integration_test.go @@ -12,6 +12,7 @@ import ( "os" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumererror" @@ -112,13 +113,13 @@ func TestOpenSearchTraceExporter(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var err error docs := getReceivedDocuments(r.Body) - require.LessOrEqualf(t, requestCount, len(tc.RequestHandlers), "Test case generated more requests than it has response for.") + assert.LessOrEqualf(t, requestCount, len(tc.RequestHandlers), "Test case generated more requests than it has response for.") tc.RequestHandlers[requestCount].ValidateReceivedDocuments(t, requestCount, docs) w.WriteHeader(200) response, _ := os.ReadFile(tc.RequestHandlers[requestCount].ResponseJSONPath) _, err = w.Write(response) - require.NoError(t, err) + assert.NoError(t, err) requestCount++ })) @@ -242,13 +243,13 @@ func TestOpenSearchLogExporter(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var err error docs := getReceivedDocuments(r.Body) - require.LessOrEqualf(t, requestCount, len(tc.RequestHandlers), "Test case generated more requests than it has response for.") + assert.LessOrEqualf(t, requestCount, len(tc.RequestHandlers), "Test case generated more requests than it has response for.") tc.RequestHandlers[requestCount].ValidateReceivedDocuments(t, requestCount, docs) w.WriteHeader(200) response, _ := os.ReadFile(tc.RequestHandlers[requestCount].ResponseJSONPath) _, err = w.Write(response) - require.NoError(t, err) + assert.NoError(t, err) requestCount++ })) diff --git a/exporter/otelarrowexporter/internal/arrow/exporter_test.go b/exporter/otelarrowexporter/internal/arrow/exporter_test.go index fc749a7f961d..4df1a98939fa 100644 --- a/exporter/otelarrowexporter/internal/arrow/exporter_test.go +++ b/exporter/otelarrowexporter/internal/arrow/exporter_test.go @@ -505,11 +505,11 @@ func TestArrowExporterStreamRace(t *testing.T) { defer wg.Done() // This blocks until the cancelation. _, err := tc.exporter.SendAndWait(callctx, twoTraces) - require.Error(t, err) + assert.Error(t, err) stat, is := status.FromError(err) - require.True(t, is, "is a gRPC status error: %v", err) - require.Equal(t, codes.Canceled, stat.Code()) + assert.True(t, is, "is a gRPC status error: %v", err) + assert.Equal(t, codes.Canceled, stat.Code()) }() } @@ -547,8 +547,8 @@ func TestArrowExporterStreaming(t *testing.T) { defer wg.Done() for data := range channel.sendChannel() { traces, err := testCon.TracesFrom(data) - require.NoError(t, err) - require.Len(t, traces, 1) + assert.NoError(t, err) + assert.Len(t, traces, 1) actualOutput = append(actualOutput, traces[0]) channel.recv <- statusOKFor(data.BatchId) } @@ -606,7 +606,7 @@ func TestArrowExporterHeaders(t *testing.T) { actualOutput = append(actualOutput, nil) } else { _, err := hpd.Write(data.Headers) - require.NoError(t, err) + assert.NoError(t, err) actualOutput = append(actualOutput, md) md = metadata.MD{} } @@ -698,7 +698,7 @@ func TestArrowExporterIsTraced(t *testing.T) { actualOutput = append(actualOutput, nil) } else { _, err := hpd.Write(data.Headers) - require.NoError(t, err) + assert.NoError(t, err) actualOutput = append(actualOutput, md) md = metadata.MD{} } @@ -786,8 +786,8 @@ func TestArrowExporterStreamLifetimeAndShutdown(t *testing.T) { for data := range channel.sendChannel() { traces, err := testCon.TracesFrom(data) - require.NoError(t, err) - require.Len(t, traces, 1) + assert.NoError(t, err) + assert.Len(t, traces, 1) atomic.AddUint64(&actualCount, 1) channel.recv <- statusOKFor(data.BatchId) } diff --git a/exporter/prometheusexporter/end_to_end_test.go b/exporter/prometheusexporter/end_to_end_test.go index f80e717b56a6..cf9b3ef37a4c 100644 --- a/exporter/prometheusexporter/end_to_end_test.go +++ b/exporter/prometheusexporter/end_to_end_test.go @@ -16,6 +16,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/exporter/exportertest" @@ -38,7 +39,7 @@ func TestEndToEndSummarySupport(t *testing.T) { dropWizardServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, _ *http.Request) { // Serve back the metrics as if they were from DropWizard. _, err := rw.Write([]byte(dropWizardResponse)) - require.NoError(t, err) + assert.NoError(t, err) currentScrapeIndex++ if currentScrapeIndex == 8 { // We shall let the Prometheus receiver scrape the DropWizard mock server, at least 8 times. wg.Done() // done scraping dropWizardResponse 8 times diff --git a/exporter/sapmexporter/exporter_test.go b/exporter/sapmexporter/exporter_test.go index e70c3b7835b3..d9d863eaeb13 100644 --- a/exporter/sapmexporter/exporter_test.go +++ b/exporter/sapmexporter/exporter_test.go @@ -365,11 +365,11 @@ func TestCompression(t *testing.T) { assert.EqualValues(t, compression, tt.receivedCompression) payload, err := decompress(r.Body, compression) - require.NoError(t, err) + assert.NoError(t, err) var sapm splunksapm.PostSpansRequest err = sapm.Unmarshal(payload) - require.NoError(t, err) + assert.NoError(t, err) w.WriteHeader(200) tracesReceived = true diff --git a/exporter/splunkhecexporter/client_test.go b/exporter/splunkhecexporter/client_test.go index 595eb89bf9c7..54a0b1c09c0e 100644 --- a/exporter/splunkhecexporter/client_test.go +++ b/exporter/splunkhecexporter/client_test.go @@ -229,7 +229,7 @@ func runMetricsExport(cfg *Config, metrics pmetric.Metrics, expectedBatchesNum i defer s.Close() go func() { if e := s.Serve(listener); e != http.ErrServerClosed { - require.NoError(t, e) + assert.NoError(t, e) } }() @@ -282,7 +282,7 @@ func runTraceExport(testConfig *Config, traces ptrace.Traces, expectedBatchesNum defer s.Close() go func() { if e := s.Serve(listener); e != http.ErrServerClosed { - require.NoError(t, e) + assert.NoError(t, e) } }() @@ -342,7 +342,7 @@ func runLogExport(cfg *Config, ld plog.Logs, expectedBatchesNum int, t *testing. defer s.Close() go func() { if e := s.Serve(listener); e != http.ErrServerClosed { - require.NoError(t, e) + assert.NoError(t, e) } }() @@ -1287,7 +1287,7 @@ func TestErrorReceived(t *testing.T) { defer s.Close() go func() { if e := s.Serve(listener); e != http.ErrServerClosed { - require.NoError(t, e) + assert.NoError(t, e) } }() @@ -1376,7 +1376,7 @@ func TestHeartbeatStartupFailed(t *testing.T) { defer s.Close() go func() { if e := s.Serve(listener); e != http.ErrServerClosed { - require.NoError(t, e) + assert.NoError(t, e) } }() factory := NewFactory() @@ -1415,7 +1415,7 @@ func TestHeartbeatStartupPass_Disabled(t *testing.T) { defer s.Close() go func() { if e := s.Serve(listener); e != http.ErrServerClosed { - require.NoError(t, e) + assert.NoError(t, e) } }() factory := NewFactory() @@ -1450,7 +1450,7 @@ func TestHeartbeatStartupPass(t *testing.T) { defer s.Close() go func() { if e := s.Serve(listener); e != http.ErrServerClosed { - require.NoError(t, e) + assert.NoError(t, e) } }() factory := NewFactory() diff --git a/exporter/sumologicexporter/sender_test.go b/exporter/sumologicexporter/sender_test.go index e55705c37794..3502b775169c 100644 --- a/exporter/sumologicexporter/sender_test.go +++ b/exporter/sumologicexporter/sender_test.go @@ -335,7 +335,7 @@ func TestSendLogsSplitFailedOne(t *testing.T) { `{"id":"1TIRY-KGIVX-TPQRJ","errors":[{"code":"internal.error","message":"Internal server error."}]}`, ) - require.NoError(t, err) + assert.NoError(t, err) body := extractBody(t, req) assert.Equal(t, "Example log", body) @@ -987,7 +987,7 @@ func TestSendCompressGzip(t *testing.T) { res.WriteHeader(200) if _, err := res.Write([]byte("")); err != nil { res.WriteHeader(http.StatusInternalServerError) - assert.FailNow(t, "err: %v", err) + assert.Fail(t, "err: %v", err) return } body := decodeGzip(t, req.Body) @@ -1008,7 +1008,7 @@ func TestSendCompressGzipDeprecated(t *testing.T) { res.WriteHeader(200) if _, err := res.Write([]byte("")); err != nil { res.WriteHeader(http.StatusInternalServerError) - assert.FailNow(t, "err: %v", err) + assert.Fail(t, "err: %v", err) return } body := decodeGzip(t, req.Body) @@ -1029,7 +1029,7 @@ func TestSendCompressZstd(t *testing.T) { res.WriteHeader(200) if _, err := res.Write([]byte("")); err != nil { res.WriteHeader(http.StatusInternalServerError) - assert.FailNow(t, "err: %v", err) + assert.Fail(t, "err: %v", err) return } body := decodeZstd(t, req.Body) @@ -1050,7 +1050,7 @@ func TestSendCompressDeflate(t *testing.T) { res.WriteHeader(200) if _, err := res.Write([]byte("")); err != nil { res.WriteHeader(http.StatusInternalServerError) - assert.FailNow(t, "err: %v", err) + assert.Fail(t, "err: %v", err) return } body := decodeZlib(t, req.Body) @@ -1126,9 +1126,9 @@ func TestSendOTLPHistogram(t *testing.T) { func(_ http.ResponseWriter, req *http.Request) { unmarshaler := pmetric.ProtoUnmarshaler{} body, err := io.ReadAll(req.Body) - require.NoError(t, err) + assert.NoError(t, err) metrics, err := unmarshaler.UnmarshalMetrics(body) - require.NoError(t, err) + assert.NoError(t, err) assert.Equal(t, 3, metrics.MetricCount()) assert.Equal(t, 16, metrics.DataPointCount()) }, diff --git a/exporter/syslogexporter/exporter_test.go b/exporter/syslogexporter/exporter_test.go index db643caf4bb4..146d877b1429 100644 --- a/exporter/syslogexporter/exporter_test.go +++ b/exporter/syslogexporter/exporter_test.go @@ -37,7 +37,7 @@ func exampleLog(t *testing.T) plog.LogRecord { buffer.Body().SetStr(originalForm) timestamp := "2003-08-24T05:14:15-07:00" timeStr, err := time.Parse(time.RFC3339, timestamp) - require.NoError(t, err, "failed to start test syslog server") + assert.NoError(t, err, "failed to start test syslog server") ts := pcommon.NewTimestampFromTime(timeStr) buffer.SetTimestamp(ts) attrMap := map[string]any{"proc_id": "8710", "message": "It's time to make the do-nuts.", @@ -148,7 +148,7 @@ func TestSyslogExportSuccess(t *testing.T) { buffer := exampleLog(t) logs := logRecordsToLogs(buffer) err := test.exp.pushLogsData(context.Background(), logs) - require.NoError(t, err, "could not send message") + assert.NoError(t, err, "could not send message") }() err := test.srv.SetDeadline(time.Now().Add(time.Second * 1)) require.NoError(t, err, "cannot set deadline") diff --git a/extension/jaegerremotesampling/extension_test.go b/extension/jaegerremotesampling/extension_test.go index 072b45268e76..39ace820f1d9 100644 --- a/extension/jaegerremotesampling/extension_test.go +++ b/extension/jaegerremotesampling/extension_test.go @@ -92,7 +92,7 @@ func TestRemote(t *testing.T) { go func() { err = server.Serve(lis) - require.NoError(t, err) + assert.NoError(t, err) }() defer func() { server.Stop() }() diff --git a/extension/sumologicextension/extension_test.go b/extension/sumologicextension/extension_test.go index fd09a1b64f0b..03331154dcd3 100644 --- a/extension/sumologicextension/extension_test.go +++ b/extension/sumologicextension/extension_test.go @@ -100,7 +100,7 @@ func TestBasicStart(t *testing.T) { // register case 1: - require.Equal(t, registerURL, req.URL.Path) + assert.Equal(t, registerURL, req.URL.Path) _, err := w.Write([]byte(`{ "collectorCredentialID": "collectorId", "collectorCredentialKey": "collectorKey", @@ -162,7 +162,7 @@ func TestStoreCredentials(t *testing.T) { // register case 1: - require.Equal(t, registerURL, req.URL.Path) + assert.Equal(t, registerURL, req.URL.Path) _, err := w.Write([]byte(`{ "collectorCredentialID": "collectorId", "collectorCredentialKey": "collectorKey", @@ -317,12 +317,12 @@ func TestStoreCredentials_PreexistingCredentialsAreUsed(t *testing.T) { switch reqNum { // heartbeat case 1: - require.Equal(t, heartbeatURL, req.URL.Path) + assert.Equal(t, heartbeatURL, req.URL.Path) w.WriteHeader(204) // metadata case 2: - require.Equal(t, metadataURL, req.URL.Path) + assert.Equal(t, metadataURL, req.URL.Path) w.WriteHeader(200) // should not produce any more requests @@ -405,7 +405,7 @@ func TestLocalFSCredentialsStore_WorkCorrectlyForMultipleExtensions(t *testing.T // register case 1: - require.Equal(t, registerURL, req.URL.Path) + assert.Equal(t, registerURL, req.URL.Path) _, err := w.Write([]byte(`{ "collectorCredentialID": "collectorId", "collectorCredentialKey": "collectorKey", @@ -509,7 +509,7 @@ func TestRegisterEmptyCollectorName(t *testing.T) { // register case 1: - require.Equal(t, registerURL, req.URL.Path) + assert.Equal(t, registerURL, req.URL.Path) authHeader := req.Header.Get("Authorization") assert.Equal(t, "Bearer dummy_install_token", authHeader, @@ -578,7 +578,7 @@ func TestRegisterEmptyCollectorNameForceRegistration(t *testing.T) { // register case 1: - require.Equal(t, registerURL, req.URL.Path) + assert.Equal(t, registerURL, req.URL.Path) authHeader := req.Header.Get("Authorization") assert.Equal(t, "Bearer dummy_install_token", authHeader, @@ -601,7 +601,7 @@ func TestRegisterEmptyCollectorNameForceRegistration(t *testing.T) { // register again because force registration was set case 3: - require.Equal(t, registerURL, req.URL.Path) + assert.Equal(t, registerURL, req.URL.Path) authHeader := req.Header.Get("Authorization") assert.Equal(t, "Bearer dummy_install_token", authHeader, @@ -672,7 +672,7 @@ func TestCollectorSendsBasicAuthHeadersOnRegistration(t *testing.T) { // register case 1: - require.Equal(t, registerURL, req.URL.Path) + assert.Equal(t, registerURL, req.URL.Path) authHeader := req.Header.Get("Authorization") assert.Equal(t, "Bearer dummy_install_token", authHeader, @@ -776,7 +776,7 @@ func TestCollectorCheckingCredentialsFoundInLocalStorage(t *testing.T) { // heatbeat case 1: - require.NotEqual(t, registerURL, req.URL.Path, + assert.NotEqual(t, registerURL, req.URL.Path, "collector shouldn't call the register API when credentials locally retrieved") assert.Equal(t, heartbeatURL, req.URL.Path) @@ -824,7 +824,7 @@ func TestCollectorCheckingCredentialsFoundInLocalStorage(t *testing.T) { // failing heatbeat case 1: - require.NotEqual(t, registerURL, req.URL.Path, + assert.NotEqual(t, registerURL, req.URL.Path, "collector shouldn't call the register API when credentials locally retrieved") assert.Equal(t, heartbeatURL, req.URL.Path) @@ -840,7 +840,7 @@ func TestCollectorCheckingCredentialsFoundInLocalStorage(t *testing.T) { // successful heatbeat case 2: - require.NotEqual(t, registerURL, req.URL.Path, + assert.NotEqual(t, registerURL, req.URL.Path, "collector shouldn't call the register API when credentials locally retrieved") assert.Equal(t, heartbeatURL, req.URL.Path) @@ -888,7 +888,7 @@ func TestCollectorCheckingCredentialsFoundInLocalStorage(t *testing.T) { // failing heatbeat case 1: - require.NotEqual(t, registerURL, req.URL.Path, + assert.NotEqual(t, registerURL, req.URL.Path, "collector shouldn't call the register API when credentials locally retrieved") assert.Equal(t, heartbeatURL, req.URL.Path) @@ -904,7 +904,7 @@ func TestCollectorCheckingCredentialsFoundInLocalStorage(t *testing.T) { // register case 2: - require.Equal(t, registerURL, req.URL.Path) + assert.Equal(t, registerURL, req.URL.Path) authHeader := req.Header.Get("Authorization") assert.Equal(t, "Bearer dummy_install_token", authHeader, @@ -956,7 +956,7 @@ func TestCollectorCheckingCredentialsFoundInLocalStorage(t *testing.T) { // register case 1: - require.Equal(t, registerURL, req.URL.Path) + assert.Equal(t, registerURL, req.URL.Path) authHeader := req.Header.Get("Authorization") assert.Equal(t, "Bearer dummy_install_token", authHeader, @@ -1047,7 +1047,7 @@ func TestRegisterEmptyCollectorNameWithBackoff(t *testing.T) { // register case reqNum <= retriesLimit: - require.Equal(t, registerURL, req.URL.Path) + assert.Equal(t, registerURL, req.URL.Path) authHeader := req.Header.Get("Authorization") assert.Equal(t, "Bearer dummy_install_token", authHeader, @@ -1113,7 +1113,7 @@ func TestRegisterEmptyCollectorNameUnrecoverableError(t *testing.T) { srv := httptest.NewServer(func() http.HandlerFunc { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { // TODO Add payload verification - verify if collectorName is set properly - require.Equal(t, registerURL, req.URL.Path) + assert.Equal(t, registerURL, req.URL.Path) authHeader := req.Header.Get("Authorization") assert.Equal(t, "Bearer dummy_install_token", authHeader, @@ -1129,7 +1129,7 @@ func TestRegisterEmptyCollectorNameUnrecoverableError(t *testing.T) { } ] }`)) - require.NoError(t, err) + assert.NoError(t, err) }) }()) @@ -1166,7 +1166,7 @@ func TestRegistrationRedirect(t *testing.T) { // register case 1: - require.Equal(t, registerURL, req.URL.Path) + assert.Equal(t, registerURL, req.URL.Path) authHeader := req.Header.Get("Authorization") @@ -1209,7 +1209,7 @@ func TestRegistrationRedirect(t *testing.T) { // should not produce any more requests default: - require.Fail(t, + assert.Fail(t, "extension should not make more than 5 requests to the destination server", ) } @@ -1224,12 +1224,12 @@ func TestRegistrationRedirect(t *testing.T) { // register case 1: - require.Equal(t, registerURL, req.URL.Path) + assert.Equal(t, registerURL, req.URL.Path) http.Redirect(w, req, destSrv.URL, http.StatusMovedPermanently) // should not produce any more requests default: - require.Fail(t, + assert.Fail(t, "extension should not make more than 1 request to the original server", ) } @@ -1398,22 +1398,22 @@ func TestRegistrationRequestPayload(t *testing.T) { switch reqNum { // register case 1: - require.Equal(t, registerURL, req.URL.Path) + assert.Equal(t, registerURL, req.URL.Path) var reqPayload api.OpenRegisterRequestPayload - require.NoError(t, json.NewDecoder(req.Body).Decode(&reqPayload)) - require.True(t, reqPayload.Clobber) - require.Equal(t, hostname, reqPayload.Hostname) - require.Equal(t, "my description", reqPayload.Description) - require.Equal(t, "my category/", reqPayload.Category) - require.EqualValues(t, + assert.NoError(t, json.NewDecoder(req.Body).Decode(&reqPayload)) + assert.True(t, reqPayload.Clobber) + assert.Equal(t, hostname, reqPayload.Hostname) + assert.Equal(t, "my description", reqPayload.Description) + assert.Equal(t, "my category/", reqPayload.Category) + assert.EqualValues(t, map[string]any{ "field1": "value1", "field2": "value2", }, reqPayload.Fields, ) - require.Equal(t, "PST", reqPayload.TimeZone) + assert.Equal(t, "PST", reqPayload.TimeZone) authHeader := req.Header.Get("Authorization") assert.Equal(t, "Bearer dummy_install_token", authHeader, @@ -1425,7 +1425,7 @@ func TestRegistrationRequestPayload(t *testing.T) { "collectorId": "0000000001231231", "collectorName": "otc-test-123456123123" }`)) - require.NoError(t, err) + assert.NoError(t, err) // metadata case 2: assert.Equal(t, metadataURL, req.URL.Path) @@ -1526,24 +1526,24 @@ func TestUpdateMetadataRequestPayload(t *testing.T) { srv := httptest.NewServer(func() http.HandlerFunc { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - require.Equal(t, metadataURL, req.URL.Path) + assert.Equal(t, metadataURL, req.URL.Path) var reqPayload api.OpenMetadataRequestPayload - require.NoError(t, json.NewDecoder(req.Body).Decode(&reqPayload)) - require.NotEmpty(t, reqPayload.HostDetails.Name) - require.NotEmpty(t, reqPayload.HostDetails.OsName) + assert.NoError(t, json.NewDecoder(req.Body).Decode(&reqPayload)) + assert.NotEmpty(t, reqPayload.HostDetails.Name) + assert.NotEmpty(t, reqPayload.HostDetails.OsName) // @sumo-drosiek: It happened to be empty OsVersion on my machine // require.NotEmpty(t, reqPayload.HostDetails.OsVersion) - require.NotEmpty(t, reqPayload.NetworkDetails.HostIPAddress) - require.EqualValues(t, "EKS-1.20.2", reqPayload.HostDetails.Environment) - require.EqualValues(t, "1.0.0", reqPayload.CollectorDetails.RunningVersion) - require.EqualValues(t, "A", reqPayload.TagDetails["team"]) - require.EqualValues(t, "linux", reqPayload.TagDetails["app"]) - require.EqualValues(t, "true", reqPayload.TagDetails["sumo.disco.enabled"]) + assert.NotEmpty(t, reqPayload.NetworkDetails.HostIPAddress) + assert.EqualValues(t, "EKS-1.20.2", reqPayload.HostDetails.Environment) + assert.EqualValues(t, "1.0.0", reqPayload.CollectorDetails.RunningVersion) + assert.EqualValues(t, "A", reqPayload.TagDetails["team"]) + assert.EqualValues(t, "linux", reqPayload.TagDetails["app"]) + assert.EqualValues(t, "true", reqPayload.TagDetails["sumo.disco.enabled"]) _, err := w.Write([]byte(``)) - require.NoError(t, err) + assert.NoError(t, err) }) }()) diff --git a/internal/coreinternal/scraperinttest/scraperint.go b/internal/coreinternal/scraperinttest/scraperint.go index 8987ef145455..a4dc6f4719e4 100644 --- a/internal/coreinternal/scraperinttest/scraperint.go +++ b/internal/coreinternal/scraperinttest/scraperint.go @@ -16,6 +16,7 @@ import ( "unicode" "github.com/docker/go-connections/nat" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go" "go.opentelemetry.io/collector/component" @@ -142,7 +143,7 @@ func (it *IntegrationTest) createContainers(t *testing.T) *ContainerInfo { for _, cr := range it.containerRequests { go func(req testcontainers.ContainerRequest) { var errs error - require.Eventuallyf(t, func() bool { + assert.Eventuallyf(t, func() bool { c, err := testcontainers.GenericContainer( context.Background(), testcontainers.GenericContainerRequest{ diff --git a/internal/docker/docker_test.go b/internal/docker/docker_test.go index 9a8b0bfee436..0166812fb1ab 100644 --- a/internal/docker/docker_test.go +++ b/internal/docker/docker_test.go @@ -184,7 +184,7 @@ func TestEventLoopHandlesError(t *testing.T) { wg.Done() } _, err := w.Write([]byte{}) - require.NoError(t, err) + assert.NoError(t, err) })) defer srv.Close() diff --git a/internal/kubelet/client_test.go b/internal/kubelet/client_test.go index 938bad01a176..f1ac1be5b478 100644 --- a/internal/kubelet/client_test.go +++ b/internal/kubelet/client_test.go @@ -105,9 +105,9 @@ func TestDefaultTLSClient(t *testing.T) { func TestSvcAcctClient(t *testing.T) { server := httptest.NewUnstartedServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { // Check if call is authenticated using token from test file - require.Equal(t, "Bearer s3cr3t", req.Header.Get("Authorization")) + assert.Equal(t, "Bearer s3cr3t", req.Header.Get("Authorization")) _, err := rw.Write([]byte(`OK`)) - require.NoError(t, err) + assert.NoError(t, err) })) cert, err := tls.LoadX509KeyPair(certPath, keyFile) require.NoError(t, err) @@ -174,11 +174,11 @@ func TestNewKubeConfigClient(t *testing.T) { t.Run(tt.name, func(t *testing.T) { server := httptest.NewUnstartedServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { // Check if call is authenticated using provided kubeconfig - require.Equal(t, "Bearer my-token", req.Header.Get("Authorization")) - require.Equal(t, "/api/v1/nodes/nodename/proxy/", req.URL.EscapedPath()) + assert.Equal(t, "Bearer my-token", req.Header.Get("Authorization")) + assert.Equal(t, "/api/v1/nodes/nodename/proxy/", req.URL.EscapedPath()) // Send response to be tested _, err := rw.Write([]byte(`OK`)) - require.NoError(t, err) + assert.NoError(t, err) })) server.StartTLS() defer server.Close() diff --git a/internal/otelarrow/test/e2e_test.go b/internal/otelarrow/test/e2e_test.go index 0fc3971f5038..5d67b56a64b2 100644 --- a/internal/otelarrow/test/e2e_test.go +++ b/internal/otelarrow/test/e2e_test.go @@ -16,7 +16,8 @@ import ( "time" "github.com/open-telemetry/otel-arrow/pkg/datagen" - "github.com/open-telemetry/otel-arrow/pkg/otel/assert" + otel_assert "github.com/open-telemetry/otel-arrow/pkg/otel/assert" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" @@ -175,18 +176,18 @@ func testIntegrationTraces(ctx context.Context, t *testing.T, tp testParams, cfg // Run the receiver, shutdown after exporter does. go func() { defer receiverShutdownWG.Done() - require.NoError(t, receiver.Start(ctx, host)) + assert.NoError(t, receiver.Start(ctx, host)) exporterShutdownWG.Wait() - require.NoError(t, receiver.Shutdown(ctx)) + assert.NoError(t, receiver.Shutdown(ctx)) }() // Run the exporter and wait for clients to finish go func() { defer exporterShutdownWG.Done() - require.NoError(t, exporter.Start(ctx, host)) + assert.NoError(t, exporter.Start(ctx, host)) startWG.Done() startExporterShutdownWG.Wait() - require.NoError(t, exporter.Shutdown(ctx)) + assert.NoError(t, exporter.Shutdown(ctx)) }() // wait for the exporter to start @@ -287,8 +288,8 @@ func standardEnding(t *testing.T, _ testParams, testCon *testConsumer, expect [] for _, td := range testCon.sink.AllTraces() { receivedJSON = append(receivedJSON, ptraceotlp.NewExportRequestFromTraces(td)) } - asserter := assert.NewStdUnitTest(t) - assert.Equiv(asserter, expectJSON, receivedJSON) + asserter := otel_assert.NewStdUnitTest(t) + otel_assert.Equiv(asserter, expectJSON, receivedJSON) rops = map[string]int{} eops = map[string]int{} diff --git a/pkg/stanza/adapter/integration_test.go b/pkg/stanza/adapter/integration_test.go index 565e3e80d889..a088a917c808 100644 --- a/pkg/stanza/adapter/integration_test.go +++ b/pkg/stanza/adapter/integration_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" @@ -159,7 +160,7 @@ func TestEmitterToConsumer(t *testing.T) { go func() { ctx := context.Background() for _, e := range entries { - require.NoError(t, logsReceiver.emitter.Process(ctx, e)) + assert.NoError(t, logsReceiver.emitter.Process(ctx, e)) } }() diff --git a/pkg/stanza/adapter/receiver_test.go b/pkg/stanza/adapter/receiver_test.go index bcfbb457f996..d418ece2bea6 100644 --- a/pkg/stanza/adapter/receiver_test.go +++ b/pkg/stanza/adapter/receiver_test.go @@ -139,12 +139,12 @@ func TestShutdownFlush(t *testing.T) { for { select { case <-closeCh: - require.NoError(t, logsReceiver.Shutdown(context.Background())) + assert.NoError(t, logsReceiver.Shutdown(context.Background())) fmt.Println(">> Shutdown called") return default: err := stanzaReceiver.emitter.Process(context.Background(), entry.New()) - require.NoError(t, err) + assert.NoError(t, err) } consumedLogCount.Add(1) } diff --git a/pkg/stanza/fileconsumer/benchmark_test.go b/pkg/stanza/fileconsumer/benchmark_test.go index efa410b77920..c470fa50e17c 100644 --- a/pkg/stanza/fileconsumer/benchmark_test.go +++ b/pkg/stanza/fileconsumer/benchmark_test.go @@ -12,6 +12,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" @@ -210,12 +211,12 @@ func BenchmarkFileInput(b *testing.B) { // Write the other half of the content while running for i := 0; i < b.N/2; i++ { _, err := f.WriteString(severalLines) - require.NoError(b, err) + assert.NoError(b, err) } // Signal end of file _, err := f.WriteString("\n") - require.NoError(b, err) - require.NoError(b, f.Sync()) + assert.NoError(b, err) + assert.NoError(b, f.Sync()) }(file) } diff --git a/pkg/stanza/fileconsumer/internal/emittest/sink_test.go b/pkg/stanza/fileconsumer/internal/emittest/sink_test.go index 326400b7e3e8..2cea011e7518 100644 --- a/pkg/stanza/fileconsumer/internal/emittest/sink_test.go +++ b/pkg/stanza/fileconsumer/internal/emittest/sink_test.go @@ -202,7 +202,7 @@ func sinkTest(t *testing.T, opts ...SinkOpt) (*Sink, []*Call) { } go func() { for _, c := range testCalls { - require.NoError(t, s.Callback(context.Background(), c.Token, c.Attrs)) + assert.NoError(t, s.Callback(context.Background(), c.Token, c.Attrs)) } }() return s, testCalls diff --git a/pkg/stanza/fileconsumer/rotation_test.go b/pkg/stanza/fileconsumer/rotation_test.go index 51095c549a32..1864f0e7e241 100644 --- a/pkg/stanza/fileconsumer/rotation_test.go +++ b/pkg/stanza/fileconsumer/rotation_test.go @@ -69,16 +69,16 @@ func TestCopyTruncate(t *testing.T) { filetest.WriteString(t, file, getMessage(fn, rotationNum, messageNum)+"\n") time.Sleep(10 * time.Millisecond) } - require.NoError(t, file.Sync()) + assert.NoError(t, file.Sync()) _, err := file.Seek(0, 0) - require.NoError(t, err) + assert.NoError(t, err) dst := filetest.OpenFile(t, fileName(fn, rotationNum)) _, err = io.Copy(dst, file) - require.NoError(t, err) - require.NoError(t, dst.Close()) - require.NoError(t, file.Truncate(0)) + assert.NoError(t, err) + assert.NoError(t, dst.Close()) + assert.NoError(t, file.Truncate(0)) _, err = file.Seek(0, 0) - require.NoError(t, err) + assert.NoError(t, err) } }(fileNum) } @@ -130,8 +130,8 @@ func TestMoveCreate(t *testing.T) { filetest.WriteString(t, file, getMessage(fn, rotationNum, messageNum)+"\n") time.Sleep(10 * time.Millisecond) } - require.NoError(t, file.Close()) - require.NoError(t, os.Rename(baseFileName(fn), fileName(fn, rotationNum))) + assert.NoError(t, file.Close()) + assert.NoError(t, os.Rename(baseFileName(fn), fileName(fn, rotationNum))) } }(fileNum) } diff --git a/pkg/stanza/operator/helper/emitter_test.go b/pkg/stanza/operator/helper/emitter_test.go index e8c11e6d05e1..f17e7f503b2d 100644 --- a/pkg/stanza/operator/helper/emitter_test.go +++ b/pkg/stanza/operator/helper/emitter_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" @@ -27,7 +28,7 @@ func TestLogEmitter(t *testing.T) { in := entry.New() go func() { - require.NoError(t, emitter.Process(context.Background(), in)) + assert.NoError(t, emitter.Process(context.Background(), in)) }() select { @@ -55,7 +56,7 @@ func TestLogEmitterEmitsOnMaxBatchSize(t *testing.T) { go func() { ctx := context.Background() for _, e := range entries { - require.NoError(t, emitter.Process(ctx, e)) + assert.NoError(t, emitter.Process(ctx, e)) } }() @@ -85,7 +86,7 @@ func TestLogEmitterEmitsOnFlushInterval(t *testing.T) { go func() { ctx := context.Background() - require.NoError(t, emitter.Process(ctx, entry)) + assert.NoError(t, emitter.Process(ctx, entry)) }() timeoutChan := time.After(timeout) diff --git a/pkg/stanza/operator/input/tcp/input_test.go b/pkg/stanza/operator/input/tcp/input_test.go index c0b0918a2fa7..7ddee6ced3f9 100644 --- a/pkg/stanza/operator/input/tcp/input_test.go +++ b/pkg/stanza/operator/input/tcp/input_test.go @@ -12,6 +12,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" @@ -435,13 +436,13 @@ func BenchmarkTCPInput(b *testing.B) { done := make(chan struct{}) go func() { conn, err := net.Dial("tcp", tcpInput.listener.Addr().String()) - require.NoError(b, err) + assert.NoError(b, err) defer func() { err := tcpInput.Stop() - require.NoError(b, err, "expected to stop tcp input operator without error") + assert.NoError(b, err, "expected to stop tcp input operator without error") err = conn.Close() - require.NoError(b, err, "expected to close connection without error") + assert.NoError(b, err, "expected to close connection without error") }() message := []byte("message\n") for { @@ -450,7 +451,7 @@ func BenchmarkTCPInput(b *testing.B) { return default: _, err := conn.Write(message) - require.NoError(b, err) + assert.NoError(b, err) } } }() diff --git a/pkg/stanza/operator/input/udp/input_test.go b/pkg/stanza/operator/input/udp/input_test.go index f3862d2a7edb..e7e7366067b0 100644 --- a/pkg/stanza/operator/input/udp/input_test.go +++ b/pkg/stanza/operator/input/udp/input_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" @@ -229,9 +230,9 @@ func BenchmarkUDPInput(b *testing.B) { done := make(chan struct{}) go func() { conn, err := net.Dial("udp", udpInput.connection.LocalAddr().String()) - require.NoError(b, err) + assert.NoError(b, err) defer func() { - require.NoError(b, udpInput.Stop()) + assert.NoError(b, udpInput.Stop()) }() defer conn.Close() message := []byte("message\n") @@ -241,7 +242,7 @@ func BenchmarkUDPInput(b *testing.B) { return default: _, err := conn.Write(message) - require.NoError(b, err) + assert.NoError(b, err) } } }() diff --git a/pkg/stanza/operator/transformer/recombine/transformer_test.go b/pkg/stanza/operator/transformer/recombine/transformer_test.go index 35c412d04d22..0d4e53930018 100644 --- a/pkg/stanza/operator/transformer/recombine/transformer_test.go +++ b/pkg/stanza/operator/transformer/recombine/transformer_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" @@ -898,7 +899,7 @@ func TestTimeoutWhenAggregationKeepHappen(t *testing.T) { ticker.Stop() return case <-ticker.C: - require.NoError(t, recombine.Process(ctx, next)) + assert.NoError(t, recombine.Process(ctx, next)) } } diff --git a/processor/resourcedetectionprocessor/internal/resourcedetection_test.go b/processor/resourcedetectionprocessor/internal/resourcedetection_test.go index 5817b6ad3e7a..4fcba6437995 100644 --- a/processor/resourcedetectionprocessor/internal/resourcedetection_test.go +++ b/processor/resourcedetectionprocessor/internal/resourcedetection_test.go @@ -221,7 +221,7 @@ func TestDetectResource_Parallel(t *testing.T) { go func() { defer wg.Done() detected, _, err := p.Get(context.Background(), http.DefaultClient) - require.NoError(t, err) + assert.NoError(t, err) assert.Equal(t, expectedResourceAttrs, detected.Attributes().AsRaw()) }() } diff --git a/processor/tailsamplingprocessor/processor_test.go b/processor/tailsamplingprocessor/processor_test.go index a567e4bf9a3e..1f1096e88975 100644 --- a/processor/tailsamplingprocessor/processor_test.go +++ b/processor/tailsamplingprocessor/processor_test.go @@ -235,13 +235,13 @@ func TestConcurrentTraceArrival(t *testing.T) { wg.Add(2) concurrencyLimiter <- struct{}{} go func(td ptrace.Traces) { - require.NoError(t, sp.ConsumeTraces(context.Background(), td)) + assert.NoError(t, sp.ConsumeTraces(context.Background(), td)) wg.Done() <-concurrencyLimiter }(batch) concurrencyLimiter <- struct{}{} go func(td ptrace.Traces) { - require.NoError(t, sp.ConsumeTraces(context.Background(), td)) + assert.NoError(t, sp.ConsumeTraces(context.Background(), td)) wg.Done() <-concurrencyLimiter }(batch) @@ -292,12 +292,12 @@ func TestConcurrentArrivalAndEvaluation(t *testing.T) { wg.Add(1) go func(td ptrace.Traces) { for i := 0; i < 10; i++ { - require.NoError(t, tsp.ConsumeTraces(context.Background(), td)) + assert.NoError(t, tsp.ConsumeTraces(context.Background(), td)) } <-evalStarted close(continueEvaluation) for i := 0; i < 10; i++ { - require.NoError(t, tsp.ConsumeTraces(context.Background(), td)) + assert.NoError(t, tsp.ConsumeTraces(context.Background(), td)) } wg.Done() }(batch) @@ -357,7 +357,7 @@ func TestConcurrentTraceMapSize(t *testing.T) { for _, batch := range batches { wg.Add(1) go func(td ptrace.Traces) { - require.NoError(t, sp.ConsumeTraces(context.Background(), td)) + assert.NoError(t, sp.ConsumeTraces(context.Background(), td)) wg.Done() }(batch) } diff --git a/receiver/apachereceiver/scraper_test.go b/receiver/apachereceiver/scraper_test.go index 295c1a6fb8c7..beeb1530f7a5 100644 --- a/receiver/apachereceiver/scraper_test.go +++ b/receiver/apachereceiver/scraper_test.go @@ -13,6 +13,7 @@ import ( "path/filepath" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" @@ -189,7 +190,7 @@ Load15: 0.3 Total Duration: 1501 Scoreboard: S_DD_L_GGG_____W__IIII_C________________W__________________________________.........................____WR______W____W________________________C______________________________________W_W____W______________R_________R________C_________WK_W________K_____W__C__________W___R______............................................................................................................................. `)) - require.NoError(t, err) + assert.NoError(t, err) return } rw.WriteHeader(404) diff --git a/receiver/apachesparkreceiver/client_test.go b/receiver/apachesparkreceiver/client_test.go index 7c81b31322e7..cd0a1f1665d7 100644 --- a/receiver/apachesparkreceiver/client_test.go +++ b/receiver/apachesparkreceiver/client_test.go @@ -13,6 +13,7 @@ import ( "strings" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" @@ -82,7 +83,7 @@ func TestClusterStats(t *testing.T) { w.WriteHeader(http.StatusUnauthorized) } else { _, err := w.Write(data) - require.NoError(t, err) + assert.NoError(t, err) } })) defer ts.Close() @@ -102,11 +103,11 @@ func TestClusterStats(t *testing.T) { var err error if strings.HasSuffix(r.RequestURI, "/metrics/json") { _, err = w.Write([]byte("[{}]")) - require.NoError(t, err) + assert.NoError(t, err) } else { _, err = w.Write(data) } - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -125,9 +126,9 @@ func TestClusterStats(t *testing.T) { var err error if strings.HasSuffix(r.RequestURI, "/metrics/json") { _, err = w.Write(data) - require.NoError(t, err) + assert.NoError(t, err) } - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -165,7 +166,7 @@ func TestApplications(t *testing.T) { w.WriteHeader(http.StatusUnauthorized) } else { _, err := w.Write(data) - require.NoError(t, err) + assert.NoError(t, err) } })) defer ts.Close() @@ -185,11 +186,11 @@ func TestApplications(t *testing.T) { var err error if strings.HasSuffix(r.RequestURI, "applications") { _, err = w.Write([]byte("")) - require.NoError(t, err) + assert.NoError(t, err) } else { _, err = w.Write(data) } - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -208,9 +209,9 @@ func TestApplications(t *testing.T) { var err error if strings.HasSuffix(r.RequestURI, "applications") { _, err = w.Write(data) - require.NoError(t, err) + assert.NoError(t, err) } - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -248,7 +249,7 @@ func TestStageStats(t *testing.T) { w.WriteHeader(http.StatusUnauthorized) } else { _, err := w.Write(data) - require.NoError(t, err) + assert.NoError(t, err) } })) defer ts.Close() @@ -268,11 +269,11 @@ func TestStageStats(t *testing.T) { var err error if strings.HasSuffix(r.RequestURI, "stages") { _, err = w.Write([]byte("")) - require.NoError(t, err) + assert.NoError(t, err) } else { _, err = w.Write(data) } - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -291,9 +292,9 @@ func TestStageStats(t *testing.T) { var err error if strings.HasSuffix(r.RequestURI, "stages") { _, err = w.Write(data) - require.NoError(t, err) + assert.NoError(t, err) } - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -331,7 +332,7 @@ func TestExecutorStats(t *testing.T) { w.WriteHeader(http.StatusUnauthorized) } else { _, err := w.Write(data) - require.NoError(t, err) + assert.NoError(t, err) } })) defer ts.Close() @@ -351,11 +352,11 @@ func TestExecutorStats(t *testing.T) { var err error if strings.HasSuffix(r.RequestURI, "executors") { _, err = w.Write([]byte("")) - require.NoError(t, err) + assert.NoError(t, err) } else { _, err = w.Write(data) } - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -374,9 +375,9 @@ func TestExecutorStats(t *testing.T) { var err error if strings.HasSuffix(r.RequestURI, "executors") { _, err = w.Write(data) - require.NoError(t, err) + assert.NoError(t, err) } - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -414,7 +415,7 @@ func TestJobStats(t *testing.T) { w.WriteHeader(http.StatusUnauthorized) } else { _, err := w.Write(data) - require.NoError(t, err) + assert.NoError(t, err) } })) defer ts.Close() @@ -434,11 +435,11 @@ func TestJobStats(t *testing.T) { var err error if strings.HasSuffix(r.RequestURI, "jobs") { _, err = w.Write([]byte("")) - require.NoError(t, err) + assert.NoError(t, err) } else { _, err = w.Write(data) } - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -457,9 +458,9 @@ func TestJobStats(t *testing.T) { var err error if strings.HasSuffix(r.RequestURI, "jobs") { _, err = w.Write(data) - require.NoError(t, err) + assert.NoError(t, err) } - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() diff --git a/receiver/bigipreceiver/client_test.go b/receiver/bigipreceiver/client_test.go index 4bf162a75135..9d5c12e5b0fd 100644 --- a/receiver/bigipreceiver/client_test.go +++ b/receiver/bigipreceiver/client_test.go @@ -14,6 +14,7 @@ import ( "strings" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" @@ -127,7 +128,7 @@ func TestGetNewToken(t *testing.T) { // Setup test server ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { _, err := w.Write([]byte("[{}]")) - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -147,7 +148,7 @@ func TestGetNewToken(t *testing.T) { // Setup test server ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { _, err := w.Write(data) - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -181,7 +182,7 @@ func TestGetVirtualServers(t *testing.T) { w.WriteHeader(http.StatusUnauthorized) } else { _, err := w.Write(data) - require.NoError(t, err) + assert.NoError(t, err) } })) defer ts.Close() @@ -202,11 +203,11 @@ func TestGetVirtualServers(t *testing.T) { var err error if strings.HasSuffix(r.RequestURI, "stats") { _, err = w.Write([]byte("[{}]")) - require.NoError(t, err) + assert.NoError(t, err) } else { _, err = w.Write(data) } - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -229,7 +230,7 @@ func TestGetVirtualServers(t *testing.T) { } else { _, err = w.Write(data) } - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -249,7 +250,7 @@ func TestGetVirtualServers(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if strings.HasSuffix(r.RequestURI, "stats") { _, err := w.Write(statsData) - require.NoError(t, err) + assert.NoError(t, err) } else { w.WriteHeader(http.StatusUnauthorized) } @@ -279,7 +280,7 @@ func TestGetVirtualServers(t *testing.T) { } else { _, err = w.Write([]byte("[{}]")) } - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -306,7 +307,7 @@ func TestGetVirtualServers(t *testing.T) { } else { _, err = w.Write([]byte("{}")) } - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -335,7 +336,7 @@ func TestGetVirtualServers(t *testing.T) { } else { _, err = w.Write(data) } - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -358,7 +359,7 @@ func TestGetVirtualServers(t *testing.T) { // Setup test server ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { _, err := w.Write([]byte("{}")) - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -404,7 +405,7 @@ func TestGetPools(t *testing.T) { // Setup test server ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { _, err := w.Write([]byte("[{}]")) - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -423,7 +424,7 @@ func TestGetPools(t *testing.T) { // Setup test server ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { _, err := w.Write(data) - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -445,7 +446,7 @@ func TestGetPools(t *testing.T) { // Setup test server ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { _, err := w.Write([]byte("{}")) - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -495,7 +496,7 @@ func TestGetPoolMembers(t *testing.T) { // Setup test server ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { _, err := w.Write([]byte("[{}]")) - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -519,7 +520,7 @@ func TestGetPoolMembers(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if strings.Contains(r.RequestURI, "~Common~dev") { _, err := w.Write(data1) - require.NoError(t, err) + assert.NoError(t, err) } else { w.WriteHeader(http.StatusUnauthorized) } @@ -547,7 +548,7 @@ func TestGetPoolMembers(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if strings.Contains(r.RequestURI, "~Common~dev") { _, err := w.Write([]byte("{}")) - require.NoError(t, err) + assert.NoError(t, err) } else { w.WriteHeader(http.StatusUnauthorized) } @@ -581,7 +582,7 @@ func TestGetPoolMembers(t *testing.T) { } else { _, err = w.Write(data2) } - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -607,7 +608,7 @@ func TestGetPoolMembers(t *testing.T) { // Setup test server ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { _, err := w.Write([]byte("{}")) - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -657,7 +658,7 @@ func TestGetNodes(t *testing.T) { // Setup test server ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { _, err := w.Write([]byte("[{}]")) - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -676,7 +677,7 @@ func TestGetNodes(t *testing.T) { // Setup test server ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { _, err := w.Write(data) - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -698,7 +699,7 @@ func TestGetNodes(t *testing.T) { // Setup test server ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { _, err := w.Write([]byte("{}")) - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() diff --git a/receiver/bigipreceiver/integration_test.go b/receiver/bigipreceiver/integration_test.go index 8cd950ab7547..18537e640b3e 100644 --- a/receiver/bigipreceiver/integration_test.go +++ b/receiver/bigipreceiver/integration_test.go @@ -15,6 +15,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" @@ -85,12 +86,12 @@ func setupMockIControlServer(t *testing.T) *httptest.Server { if strings.HasSuffix(r.RequestURI, loginURISuffix) { var body loginBody err = json.NewDecoder(r.Body).Decode(&body) - require.NoError(t, err) + assert.NoError(t, err) if body.Username == "" || body.Password == "" || r.Method != "POST" { w.WriteHeader(http.StatusUnauthorized) } else { _, err = w.Write(mockLoginResponse) - require.NoError(t, err) + assert.NoError(t, err) } return @@ -122,7 +123,7 @@ func setupMockIControlServer(t *testing.T) *httptest.Server { w.WriteHeader(http.StatusBadRequest) err = nil } - require.NoError(t, err) + assert.NoError(t, err) })) return server diff --git a/receiver/couchdbreceiver/client_test.go b/receiver/couchdbreceiver/client_test.go index 2e7169e70d14..3306e5a14501 100644 --- a/receiver/couchdbreceiver/client_test.go +++ b/receiver/couchdbreceiver/client_test.go @@ -10,6 +10,7 @@ import ( "strings" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/confighttp" @@ -145,13 +146,13 @@ func TestGetNodeStats(t *testing.T) { if strings.Contains(r.URL.Path, "/invalid_json") { w.WriteHeader(200) _, err := w.Write([]byte(`{"}`)) - require.NoError(t, err) + assert.NoError(t, err) return } if strings.Contains(r.URL.Path, "/_stats/couchdb") { w.WriteHeader(200) _, err := w.Write([]byte(`{"key":["value"]}`)) - require.NoError(t, err) + assert.NoError(t, err) return } w.WriteHeader(404) diff --git a/receiver/elasticsearchreceiver/client_test.go b/receiver/elasticsearchreceiver/client_test.go index bf0af5f81645..c9e5a2d49125 100644 --- a/receiver/elasticsearchreceiver/client_test.go +++ b/receiver/elasticsearchreceiver/client_test.go @@ -13,6 +13,7 @@ import ( "strings" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/confighttp" @@ -638,14 +639,14 @@ func newMockServer(t *testing.T, opts ...mockServerOption) *httptest.Server { if req.URL.Path == "/" { rw.WriteHeader(200) _, err := rw.Write(mock.metadata) - require.NoError(t, err) + assert.NoError(t, err) return } for prefix, payload := range mock.prefixes { if strings.HasPrefix(req.URL.Path, prefix) { rw.WriteHeader(200) _, err := rw.Write(payload) - require.NoError(t, err) + assert.NoError(t, err) return } } diff --git a/receiver/expvarreceiver/scraper_test.go b/receiver/expvarreceiver/scraper_test.go index 38d6a22f142f..713a8274e51f 100644 --- a/receiver/expvarreceiver/scraper_test.go +++ b/receiver/expvarreceiver/scraper_test.go @@ -11,6 +11,7 @@ import ( "path/filepath" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/pdata/pmetric" @@ -89,7 +90,7 @@ func newMockServer(tb testing.TB, responseBodyFile string) *httptest.Server { if req.URL.Path == defaultPath { rw.WriteHeader(http.StatusOK) _, err := rw.Write(fileContents) - require.NoError(tb, err) + assert.NoError(tb, err) return } rw.WriteHeader(http.StatusNotFound) diff --git a/receiver/flinkmetricsreceiver/client_test.go b/receiver/flinkmetricsreceiver/client_test.go index d11d9b018d94..01a07ca4a6b4 100644 --- a/receiver/flinkmetricsreceiver/client_test.go +++ b/receiver/flinkmetricsreceiver/client_test.go @@ -14,6 +14,7 @@ import ( "regexp" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" @@ -142,7 +143,7 @@ func TestGetJobmanagerMetrics(t *testing.T) { testFunc: func(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { _, err := w.Write([]byte("{")) - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -159,7 +160,7 @@ func TestGetJobmanagerMetrics(t *testing.T) { jobmanagerMetricValuesData := loadAPIResponseData(t, apiResponses, jobmanagerMetricValues) ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { _, err := w.Write(jobmanagerMetricValuesData) - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -211,7 +212,7 @@ func TestGetTaskmanagersMetrics(t *testing.T) { testFunc: func(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { _, err := w.Write([]byte(`{`)) - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -229,12 +230,12 @@ func TestGetTaskmanagersMetrics(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if match, _ := regexp.MatchString(taskmanagerIDsRegex, r.URL.Path); match { _, err := w.Write(taskmanagerIDs) - require.NoError(t, err) + assert.NoError(t, err) return } _, err := w.Write([]byte("{")) - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -253,13 +254,13 @@ func TestGetTaskmanagersMetrics(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if match, _ := regexp.MatchString(taskmanagerIDsRegex, r.URL.Path); match { _, err := w.Write(taskmanagerIDs) - require.NoError(t, err) + assert.NoError(t, err) return } if match, _ := regexp.MatchString(taskmanagerMetricNamesRegex, r.URL.Path); match { _, err := w.Write(taskmanagerMetricValuesData) - require.NoError(t, err) + assert.NoError(t, err) return } })) @@ -312,7 +313,7 @@ func TestGetJobsMetrics(t *testing.T) { testFunc: func(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { _, err := w.Write([]byte(`{`)) - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -330,11 +331,11 @@ func TestGetJobsMetrics(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.URL.Path == jobsOverviewEndpoint { _, err := w.Write(jobsOverviewData) - require.NoError(t, err) + assert.NoError(t, err) return } _, err := w.Write([]byte(`{`)) - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -353,12 +354,12 @@ func TestGetJobsMetrics(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.URL.Path == jobsOverviewEndpoint { _, err := w.Write(jobsOverviewData) - require.NoError(t, err) + assert.NoError(t, err) return } if match, _ := regexp.MatchString(jobsMetricNamesRegex, r.URL.Path); match { _, err := w.Write(jobsMetricValuesData) - require.NoError(t, err) + assert.NoError(t, err) return } })) @@ -414,7 +415,7 @@ func TestGetSubtasksMetrics(t *testing.T) { testFunc: func(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { _, err := w.Write([]byte("{")) - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -432,11 +433,11 @@ func TestGetSubtasksMetrics(t *testing.T) { jobsData := loadAPIResponseData(t, apiResponses, jobsIDs) if r.URL.Path == jobsEndpoint { _, err := w.Write(jobsData) - require.NoError(t, err) + assert.NoError(t, err) return } _, err := w.Write([]byte("{")) - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -455,16 +456,16 @@ func TestGetSubtasksMetrics(t *testing.T) { jobsWithIDData := loadAPIResponseData(t, apiResponses, jobsWithID) if r.URL.Path == jobsEndpoint { _, err := w.Write(jobsData) - require.NoError(t, err) + assert.NoError(t, err) return } if match, _ := regexp.MatchString(jobsWithIDRegex, r.URL.Path); match { _, err := w.Write(jobsWithIDData) - require.NoError(t, err) + assert.NoError(t, err) return } _, err := w.Write([]byte("{")) - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -484,21 +485,21 @@ func TestGetSubtasksMetrics(t *testing.T) { verticesData := loadAPIResponseData(t, apiResponses, vertices) if r.URL.Path == jobsEndpoint { _, err := w.Write(jobsData) - require.NoError(t, err) + assert.NoError(t, err) return } if match, _ := regexp.MatchString(jobsWithIDRegex, r.URL.Path); match { _, err := w.Write(jobsWithIDData) - require.NoError(t, err) + assert.NoError(t, err) return } if match, _ := regexp.MatchString(verticesRegex, r.URL.Path); match { _, err := w.Write(verticesData) - require.NoError(t, err) + assert.NoError(t, err) return } _, err := w.Write([]byte("{")) - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -519,22 +520,22 @@ func TestGetSubtasksMetrics(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.URL.Path == jobsEndpoint { _, err := w.Write(jobsData) - require.NoError(t, err) + assert.NoError(t, err) return } if match, _ := regexp.MatchString(jobsWithIDRegex, r.URL.Path); match { _, err := w.Write(jobsWithIDData) - require.NoError(t, err) + assert.NoError(t, err) return } if match, _ := regexp.MatchString(verticesRegex, r.URL.Path); match { _, err := w.Write(verticesData) - require.NoError(t, err) + assert.NoError(t, err) return } if match, _ := regexp.MatchString(subtaskMetricNamesRegex, r.URL.Path); match { _, err := w.Write(subtaskMetricValuesData) - require.NoError(t, err) + assert.NoError(t, err) return } })) diff --git a/receiver/fluentforwardreceiver/receiver_test.go b/receiver/fluentforwardreceiver/receiver_test.go index 6831819de337..d90e69296bbc 100644 --- a/receiver/fluentforwardreceiver/receiver_test.go +++ b/receiver/fluentforwardreceiver/receiver_test.go @@ -13,6 +13,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tinylib/msgp/msgp" "go.opentelemetry.io/collector/consumer/consumertest" @@ -51,7 +52,7 @@ func setupServer(t *testing.T) (func() net.Conn, *consumertest.LogsSink, *observ go func() { <-ctx.Done() - require.NoError(t, receiver.Shutdown(ctx)) + assert.NoError(t, receiver.Shutdown(ctx)) }() return connect, next, logObserver, cancel @@ -381,10 +382,10 @@ func TestHighVolume(t *testing.T) { for j := 0; j < totalMessagesPerRoutine; j++ { eventBytes := makeSampleEvent(fmt.Sprintf("tag-%d-%d", num, j)) n, err := conn.Write(eventBytes) - require.NoError(t, err) - require.Equal(t, len(eventBytes), n) + assert.NoError(t, err) + assert.Equal(t, len(eventBytes), n) } - require.NoError(t, conn.Close()) + assert.NoError(t, conn.Close()) wg.Done() }(i) } diff --git a/receiver/haproxyreceiver/scraper_test.go b/receiver/haproxyreceiver/scraper_test.go index 38bb9c2178a2..ed6001750d22 100644 --- a/receiver/haproxyreceiver/scraper_test.go +++ b/receiver/haproxyreceiver/scraper_test.go @@ -11,6 +11,7 @@ import ( "path/filepath" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/receiver/receivertest" @@ -28,22 +29,22 @@ func Test_scraper_readStats(t *testing.T) { go func() { c, err2 := l.Accept() - require.NoError(t, err2) + assert.NoError(t, err2) buf := make([]byte, 512) nr, err2 := c.Read(buf) - require.NoError(t, err2) + assert.NoError(t, err2) data := string(buf[0:nr]) switch data { case "show stat\n": stats, err2 := os.ReadFile(filepath.Join("testdata", "stats.txt")) - require.NoError(t, err2) + assert.NoError(t, err2) _, err2 = c.Write(stats) - require.NoError(t, err2) - require.NoError(t, c.Close()) + assert.NoError(t, err2) + assert.NoError(t, c.Close()) default: - require.Fail(t, fmt.Sprintf("invalid message: %v", data)) + assert.Fail(t, fmt.Sprintf("invalid message: %v", data)) } }() @@ -72,22 +73,22 @@ func Test_scraper_readStatsWithIncompleteValues(t *testing.T) { go func() { c, err2 := l.Accept() - require.NoError(t, err2) + assert.NoError(t, err2) buf := make([]byte, 512) nr, err2 := c.Read(buf) - require.NoError(t, err2) + assert.NoError(t, err2) data := string(buf[0:nr]) switch data { case "show stat\n": stats, err2 := os.ReadFile(filepath.Join("testdata", "30252_stats.txt")) - require.NoError(t, err2) + assert.NoError(t, err2) _, err2 = c.Write(stats) - require.NoError(t, err2) - require.NoError(t, c.Close()) + assert.NoError(t, err2) + assert.NoError(t, c.Close()) default: - require.Fail(t, fmt.Sprintf("invalid message: %v", data)) + assert.Fail(t, fmt.Sprintf("invalid message: %v", data)) } }() @@ -116,22 +117,22 @@ func Test_scraper_readStatsWithNoValues(t *testing.T) { go func() { c, err2 := l.Accept() - require.NoError(t, err2) + assert.NoError(t, err2) buf := make([]byte, 512) nr, err2 := c.Read(buf) - require.NoError(t, err2) + assert.NoError(t, err2) data := string(buf[0:nr]) switch data { case "show stat\n": stats, err2 := os.ReadFile(filepath.Join("testdata", "empty_stats.txt")) - require.NoError(t, err2) + assert.NoError(t, err2) _, err2 = c.Write(stats) - require.NoError(t, err2) - require.NoError(t, c.Close()) + assert.NoError(t, err2) + assert.NoError(t, c.Close()) default: - require.Fail(t, fmt.Sprintf("invalid message: %v", data)) + assert.Fail(t, fmt.Sprintf("invalid message: %v", data)) } }() diff --git a/receiver/httpcheckreceiver/scraper_test.go b/receiver/httpcheckreceiver/scraper_test.go index 1c1afcbe3f78..d85180eed0bb 100644 --- a/receiver/httpcheckreceiver/scraper_test.go +++ b/receiver/httpcheckreceiver/scraper_test.go @@ -10,6 +10,7 @@ import ( "path/filepath" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/confighttp" @@ -27,7 +28,7 @@ func newMockServer(t *testing.T, responseCode int) *httptest.Server { // This could be expanded if the checks for the server include // parsing the response content _, err := rw.Write([]byte(``)) - require.NoError(t, err) + assert.NoError(t, err) })) } diff --git a/receiver/jaegerreceiver/jaeger_agent_test.go b/receiver/jaegerreceiver/jaeger_agent_test.go index 305305c38b32..55da7eed2aad 100644 --- a/receiver/jaegerreceiver/jaeger_agent_test.go +++ b/receiver/jaegerreceiver/jaeger_agent_test.go @@ -119,7 +119,7 @@ func initializeGRPCTestServer(t *testing.T, beforeServe func(server *grpc.Server beforeServe(server) go func() { err := server.Serve(lis) - require.NoError(t, err) + assert.NoError(t, err) }() return server, lis.Addr() } diff --git a/receiver/kafkareceiver/kafka_receiver_test.go b/receiver/kafkareceiver/kafka_receiver_test.go index 61d05a76836b..2bc0f4ea57d8 100644 --- a/receiver/kafkareceiver/kafka_receiver_test.go +++ b/receiver/kafkareceiver/kafka_receiver_test.go @@ -179,7 +179,7 @@ func TestTracesConsumerGroupHandler(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) go func() { - require.NoError(t, c.ConsumeClaim(testSession, groupClaim)) + assert.NoError(t, c.ConsumeClaim(testSession, groupClaim)) wg.Done() }() @@ -223,7 +223,7 @@ func TestTracesConsumerGroupHandler_session_done(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) go func() { - require.NoError(t, c.ConsumeClaim(testSession, groupClaim)) + assert.NoError(t, c.ConsumeClaim(testSession, groupClaim)) wg.Done() }() @@ -255,7 +255,7 @@ func TestTracesConsumerGroupHandler_error_unmarshal(t *testing.T) { } go func() { err := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim) - require.Error(t, err) + assert.Error(t, err) wg.Done() }() groupClaim.messageChan <- &sarama.ConsumerMessage{Value: []byte("!@#")} @@ -519,7 +519,7 @@ func TestMetricsConsumerGroupHandler(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) go func() { - require.NoError(t, c.ConsumeClaim(testSession, groupClaim)) + assert.NoError(t, c.ConsumeClaim(testSession, groupClaim)) wg.Done() }() @@ -562,7 +562,7 @@ func TestMetricsConsumerGroupHandler_session_done(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) go func() { - require.NoError(t, c.ConsumeClaim(testSession, groupClaim)) + assert.NoError(t, c.ConsumeClaim(testSession, groupClaim)) wg.Done() }() @@ -594,7 +594,7 @@ func TestMetricsConsumerGroupHandler_error_unmarshal(t *testing.T) { } go func() { err := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim) - require.Error(t, err) + assert.Error(t, err) wg.Done() }() groupClaim.messageChan <- &sarama.ConsumerMessage{Value: []byte("!@#")} @@ -872,7 +872,7 @@ func TestLogsConsumerGroupHandler(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) go func() { - require.NoError(t, c.ConsumeClaim(testSession, groupClaim)) + assert.NoError(t, c.ConsumeClaim(testSession, groupClaim)) wg.Done() }() @@ -915,7 +915,7 @@ func TestLogsConsumerGroupHandler_session_done(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) go func() { - require.NoError(t, c.ConsumeClaim(testSession, groupClaim)) + assert.NoError(t, c.ConsumeClaim(testSession, groupClaim)) wg.Done() }() @@ -947,7 +947,7 @@ func TestLogsConsumerGroupHandler_error_unmarshal(t *testing.T) { } go func() { err := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim) - require.Error(t, err) + assert.Error(t, err) wg.Done() }() groupClaim.messageChan <- &sarama.ConsumerMessage{Value: []byte("!@#")} diff --git a/receiver/nginxreceiver/scraper_test.go b/receiver/nginxreceiver/scraper_test.go index a4fb307caea5..f8a31ff92457 100644 --- a/receiver/nginxreceiver/scraper_test.go +++ b/receiver/nginxreceiver/scraper_test.go @@ -11,6 +11,7 @@ import ( "path/filepath" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" @@ -108,7 +109,7 @@ server accepts handled requests 16630948 16630946 31070465 Reading: 6 Writing: 179 Waiting: 106 `)) - require.NoError(t, err) + assert.NoError(t, err) return } rw.WriteHeader(404) diff --git a/receiver/nsxtreceiver/client_test.go b/receiver/nsxtreceiver/client_test.go index be7fa5291b47..ed70ae6a1ea0 100644 --- a/receiver/nsxtreceiver/client_test.go +++ b/receiver/nsxtreceiver/client_test.go @@ -12,6 +12,7 @@ import ( "path/filepath" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/confighttp" @@ -309,56 +310,56 @@ func mockServer(t *testing.T) *httptest.Server { if req.URL.Path == "/api/v1/transport-nodes" { rw.WriteHeader(200) _, err = rw.Write(tNodeBytes) - require.NoError(t, err) + assert.NoError(t, err) return } if req.URL.Path == "/api/v1/cluster/nodes" { rw.WriteHeader(200) _, err = rw.Write(cNodeBytes) - require.NoError(t, err) + assert.NoError(t, err) return } if req.URL.Path == fmt.Sprintf("/api/v1/cluster/nodes/%s/network/interfaces", managerNode1) { rw.WriteHeader(200) _, err = rw.Write(mNodeInterfaces) - require.NoError(t, err) + assert.NoError(t, err) return } if req.URL.Path == fmt.Sprintf("/api/v1/transport-nodes/%s/status", transportNode1) { rw.WriteHeader(200) _, err = rw.Write(tNodeStatus) - require.NoError(t, err) + assert.NoError(t, err) return } if req.URL.Path == fmt.Sprintf("/api/v1/transport-nodes/%s/network/interfaces", transportNode1) { rw.WriteHeader(200) _, err = rw.Write(tNodeInterfaces) - require.NoError(t, err) + assert.NoError(t, err) return } if req.URL.Path == fmt.Sprintf("/api/v1/transport-nodes/%s/network/interfaces/%s/stats", transportNode1, transportNodeNic1) { rw.WriteHeader(200) _, err = rw.Write(tNodeInterfaceStats) - require.NoError(t, err) + assert.NoError(t, err) return } if req.URL.Path == fmt.Sprintf("/api/v1/cluster/nodes/%s/network/interfaces/%s/stats", managerNode1, managerNodeNic1) { rw.WriteHeader(200) _, err = rw.Write(mNodeInterfaceStats) - require.NoError(t, err) + assert.NoError(t, err) return } if req.URL.Path == fmt.Sprintf("/api/v1/cluster/nodes/%s/status", managerNode1) { rw.WriteHeader(200) _, err = rw.Write(mNodeStatus) - require.NoError(t, err) + assert.NoError(t, err) return } diff --git a/receiver/otelarrowreceiver/internal/arrow/arrow_test.go b/receiver/otelarrowreceiver/internal/arrow/arrow_test.go index 7a136e918731..845f5e606511 100644 --- a/receiver/otelarrowreceiver/internal/arrow/arrow_test.go +++ b/receiver/otelarrowreceiver/internal/arrow/arrow_test.go @@ -784,7 +784,7 @@ func TestReceiverEOF(t *testing.T) { expectData = append(expectData, td) batch, err := ctc.testProducer.BatchArrowRecordsFromTraces(td) - require.NoError(t, err) + assert.NoError(t, err) batch = copyBatch(batch) @@ -797,7 +797,7 @@ func TestReceiverEOF(t *testing.T) { wg.Add(1) go func() { - require.NoError(t, ctc.wait()) + assert.NoError(t, ctc.wait()) wg.Done() }() @@ -851,7 +851,7 @@ func testReceiverHeaders(t *testing.T, includeMeta bool) { td := testdata.GenerateTraces(2) batch, err := ctc.testProducer.BatchArrowRecordsFromTraces(td) - require.NoError(t, err) + assert.NoError(t, err) batch = copyBatch(batch) @@ -863,7 +863,7 @@ func testReceiverHeaders(t *testing.T, includeMeta bool) { Name: key, Value: val, }) - require.NoError(t, err) + assert.NoError(t, err) } } @@ -879,7 +879,7 @@ func testReceiverHeaders(t *testing.T, includeMeta bool) { wg.Add(1) go func() { - require.NoError(t, ctc.wait()) + assert.NoError(t, ctc.wait()) wg.Done() }() @@ -1243,7 +1243,7 @@ func testReceiverAuthHeaders(t *testing.T, includeMeta bool, dataAuth bool) { td := testdata.GenerateTraces(2) batch, err := ctc.testProducer.BatchArrowRecordsFromTraces(td) - require.NoError(t, err) + assert.NoError(t, err) batch = copyBatch(batch) @@ -1256,7 +1256,7 @@ func testReceiverAuthHeaders(t *testing.T, includeMeta bool, dataAuth bool) { Name: strings.ToLower(key), Value: val, }) - require.NoError(t, err) + assert.NoError(t, err) } } diff --git a/receiver/otelarrowreceiver/otelarrow_test.go b/receiver/otelarrowreceiver/otelarrow_test.go index fb4cb62342eb..e98d242190ad 100644 --- a/receiver/otelarrowreceiver/otelarrow_test.go +++ b/receiver/otelarrowreceiver/otelarrow_test.go @@ -354,12 +354,12 @@ func TestOTelArrowShutdown(t *testing.T) { for time.Since(start) < 5*time.Second { td := testdata.GenerateTraces(1) batch, batchErr := producer.BatchArrowRecordsFromTraces(td) - require.NoError(t, batchErr) - require.NoError(t, stream.Send(batch)) + assert.NoError(t, batchErr) + assert.NoError(t, stream.Send(batch)) } if cooperative { - require.NoError(t, stream.CloseSend()) + assert.NoError(t, stream.CloseSend()) } }() @@ -746,7 +746,7 @@ func TestConcurrentArrowReceiver(t *testing.T) { client := arrowpb.NewArrowTracesServiceClient(cc) stream, err := client.ArrowTraces(ctx, grpc.WaitForReady(true)) - require.NoError(t, err) + assert.NoError(t, err) producer := arrowRecord.NewProducer() var headerBuf bytes.Buffer @@ -762,21 +762,21 @@ func TestConcurrentArrowReceiver(t *testing.T) { Name: "seq", Value: fmt.Sprint(i), }) - require.NoError(t, err) + assert.NoError(t, err) batch, err := producer.BatchArrowRecordsFromTraces(td) - require.NoError(t, err) + assert.NoError(t, err) batch.Headers = headerBuf.Bytes() err = stream.Send(batch) - require.NoError(t, err) + assert.NoError(t, err) resp, err := stream.Recv() - require.NoError(t, err) - require.Equal(t, batch.BatchId, resp.BatchId) - require.Equal(t, arrowpb.StatusCode_OK, resp.StatusCode) + assert.NoError(t, err) + assert.Equal(t, batch.BatchId, resp.BatchId) + assert.Equal(t, arrowpb.StatusCode_OK, resp.StatusCode) } }() } @@ -854,17 +854,17 @@ func TestOTelArrowHalfOpenShutdown(t *testing.T) { } td := testdata.GenerateTraces(1) batch, batchErr := producer.BatchArrowRecordsFromTraces(td) - require.NoError(t, batchErr) + assert.NoError(t, batchErr) sendErr := stream.Send(batch) select { case <-ctx.Done(): if sendErr != nil { - require.ErrorIs(t, sendErr, io.EOF) + assert.ErrorIs(t, sendErr, io.EOF) } return default: - require.NoError(t, sendErr) + assert.NoError(t, sendErr) } } }() diff --git a/receiver/prometheusreceiver/internal/staleness_end_to_end_test.go b/receiver/prometheusreceiver/internal/staleness_end_to_end_test.go index 12c6afb129f1..7835b90b8311 100644 --- a/receiver/prometheusreceiver/internal/staleness_end_to_end_test.go +++ b/receiver/prometheusreceiver/internal/staleness_end_to_end_test.go @@ -84,14 +84,14 @@ jvm_memory_pool_bytes_used{pool="CodeHeap 'non-nmethods'"} %.1f`, float64(i)) prweServer := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, req *http.Request) { // Snappy decode the uploads. payload, rerr := io.ReadAll(req.Body) - require.NoError(t, rerr) + assert.NoError(t, rerr) recv := make([]byte, len(payload)) decoded, derr := snappy.Decode(recv, payload) - require.NoError(t, derr) + assert.NoError(t, derr) writeReq := new(prompb.WriteRequest) - require.NoError(t, proto.Unmarshal(decoded, writeReq)) + assert.NoError(t, proto.Unmarshal(decoded, writeReq)) select { case <-ctx.Done(): diff --git a/receiver/rabbitmqreceiver/client_test.go b/receiver/rabbitmqreceiver/client_test.go index 1f0d94c4bf4f..e6be65b156d6 100644 --- a/receiver/rabbitmqreceiver/client_test.go +++ b/receiver/rabbitmqreceiver/client_test.go @@ -13,6 +13,7 @@ import ( "path/filepath" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" @@ -117,7 +118,7 @@ func TestGetQueuesDetails(t *testing.T) { // Setup test server ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { _, err := w.Write([]byte("{}")) - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() @@ -136,7 +137,7 @@ func TestGetQueuesDetails(t *testing.T) { // Setup test server ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { _, err := w.Write(data) - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() diff --git a/receiver/riakreceiver/client_test.go b/receiver/riakreceiver/client_test.go index 37de4aaba1ab..9ebe35826978 100644 --- a/receiver/riakreceiver/client_test.go +++ b/receiver/riakreceiver/client_test.go @@ -13,6 +13,7 @@ import ( "path/filepath" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/confighttp" @@ -101,7 +102,7 @@ func TestGetStatsDetails(t *testing.T) { // Setup test server ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { _, err := w.Write(data) - require.NoError(t, err) + assert.NoError(t, err) })) defer ts.Close() diff --git a/receiver/sshcheckreceiver/scraper_test.go b/receiver/sshcheckreceiver/scraper_test.go index 6dd0d621504b..592a4a854350 100644 --- a/receiver/sshcheckreceiver/scraper_test.go +++ b/receiver/sshcheckreceiver/scraper_test.go @@ -15,6 +15,7 @@ import ( "time" "github.com/pkg/sftp" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/receiver/receivertest" @@ -68,7 +69,7 @@ func (s *sshServer) runSSHServer(t *testing.T) string { case <-s.done: return default: - require.NoError(t, err) + assert.NoError(t, err) } } _, chans, reqs, err := ssh.NewServerConn(conn, config) diff --git a/receiver/vcenterreceiver/internal/mockserver/client_mock.go b/receiver/vcenterreceiver/internal/mockserver/client_mock.go index dbf589542dc7..a5a071c71f40 100644 --- a/receiver/vcenterreceiver/internal/mockserver/client_mock.go +++ b/receiver/vcenterreceiver/internal/mockserver/client_mock.go @@ -13,6 +13,7 @@ import ( "testing" xj "github.com/basgys/goxml2json" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -38,17 +39,17 @@ func MockServer(t *testing.T, useTLS bool) *httptest.Server { handlerFunc := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // converting to JSON in order to iterate over map keys jsonified, err := xj.Convert(r.Body) - require.NoError(t, err) + assert.NoError(t, err) sr := &soapRequest{} err = json.Unmarshal(jsonified.Bytes(), sr) - require.NoError(t, err) - require.Len(t, sr.Envelope.Body, 1) + assert.NoError(t, err) + assert.Len(t, sr.Envelope.Body, 1) var requestType string for k := range sr.Envelope.Body { requestType = k } - require.NotEmpty(t, requestType) + assert.NotEmpty(t, requestType) body, err := routeBody(t, requestType, sr.Envelope.Body) if errors.Is(err, errNotFound) { diff --git a/receiver/zookeeperreceiver/scraper_test.go b/receiver/zookeeperreceiver/scraper_test.go index 59e94ae21134..4e643fdd5de3 100644 --- a/receiver/zookeeperreceiver/scraper_test.go +++ b/receiver/zookeeperreceiver/scraper_test.go @@ -15,6 +15,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver/receivertest" @@ -362,7 +363,7 @@ func (ms *mockedServer) mockZKServer(t *testing.T, cmdToFileMap map[string]strin case <-ms.quit: return default: - require.NoError(t, err) + assert.NoError(t, err) } } reader := bufio.NewReader(conn) @@ -372,13 +373,13 @@ func (ms *mockedServer) mockZKServer(t *testing.T, cmdToFileMap map[string]strin continue } - require.NoError(t, err) + assert.NoError(t, err) filename := cmdToFileMap[cmd] out, err := os.ReadFile(filepath.Join("testdata", filename)) - require.NoError(t, err) + assert.NoError(t, err) _, err = conn.Write(out) - require.NoError(t, err) + assert.NoError(t, err) conn.Close() } From 1b03346c9bce939742dfe9a3ef7ea9f819c932af Mon Sep 17 00:00:00 2001 From: Christos Markou Date: Mon, 16 Sep 2024 14:12:35 +0300 Subject: [PATCH 6/9] [processor/resourcedetection] Remove processor.resourcedetection.hostCPUModelAndFamilyAsString feature gate (#35203) Description: Follows #33077, https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/29152 and https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/29462. The feature gate has been stable since v0.101.0. Link to tracking Issue: Relates to https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/29025 and to https://github.com/open-telemetry/semantic-conventions/issues/495 /cc @mx-psi --------- Signed-off-by: ChrsMark --- .../remove_host_cpu_model_family_fg.yaml | 27 +++++++++++++++++++ .../internal/system/system.go | 8 ------ 2 files changed, 27 insertions(+), 8 deletions(-) create mode 100644 .chloggen/remove_host_cpu_model_family_fg.yaml diff --git a/.chloggen/remove_host_cpu_model_family_fg.yaml b/.chloggen/remove_host_cpu_model_family_fg.yaml new file mode 100644 index 000000000000..4c908c22df6c --- /dev/null +++ b/.chloggen/remove_host_cpu_model_family_fg.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: resourcedetectionprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Remove `processor.resourcedetection.hostCPUModelAndFamilyAsString` feature gate. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [29025] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] \ No newline at end of file diff --git a/processor/resourcedetectionprocessor/internal/system/system.go b/processor/resourcedetectionprocessor/internal/system/system.go index 45fc5c8607f2..9262ee914732 100644 --- a/processor/resourcedetectionprocessor/internal/system/system.go +++ b/processor/resourcedetectionprocessor/internal/system/system.go @@ -23,14 +23,6 @@ import ( ) var ( - _ = featuregate.GlobalRegistry().MustRegister( - "processor.resourcedetection.hostCPUModelAndFamilyAsString", - featuregate.StageStable, - featuregate.WithRegisterDescription("Change type of host.cpu.model.id and host.cpu.model.family to string."), - featuregate.WithRegisterFromVersion("v0.89.0"), - featuregate.WithRegisterToVersion("v0.101.0"), - featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/semantic-conventions/issues/495"), - ) _ = featuregate.GlobalRegistry().MustRegister( "processor.resourcedetection.hostCPUSteppingAsString", featuregate.StageStable, From 1057f7293f04e9acf618fd6da64494cf24798557 Mon Sep 17 00:00:00 2001 From: Mingxi <71588583+joker-star-l@users.noreply.github.com> Date: Tue, 17 Sep 2024 00:25:55 +0800 Subject: [PATCH 7/9] [exporter/doris] Second PR (with logs Implementation) of New component: Doris Exporter (#35150) **Description:** Second PR of New component: Doris Exporter. Implementation of logs. **Link to tracking Issue:** #33479 **Testing:** Tested via unit test. **Documentation:** No additional documentation. --- .chloggen/doris-logs.yaml | 27 +++ .../dorisexporter/exporter_common_test.go | 19 +++ exporter/dorisexporter/exporter_logs.go | 159 ++++++++++++++++++ exporter/dorisexporter/exporter_logs_test.go | 89 ++++++++++ .../dorisexporter/exporter_traces_test.go | 25 ++- exporter/dorisexporter/factory.go | 13 +- exporter/dorisexporter/sql/logs_ddl.sql | 30 ++++ 7 files changed, 349 insertions(+), 13 deletions(-) create mode 100644 .chloggen/doris-logs.yaml create mode 100644 exporter/dorisexporter/exporter_logs.go create mode 100644 exporter/dorisexporter/exporter_logs_test.go create mode 100644 exporter/dorisexporter/sql/logs_ddl.sql diff --git a/.chloggen/doris-logs.yaml b/.chloggen/doris-logs.yaml new file mode 100644 index 000000000000..70ea260e9dcf --- /dev/null +++ b/.chloggen/doris-logs.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: new_component + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: dorisexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: logs implementation + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [33479] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/dorisexporter/exporter_common_test.go b/exporter/dorisexporter/exporter_common_test.go index aa0a93c695eb..77ecc8164bbf 100644 --- a/exporter/dorisexporter/exporter_common_test.go +++ b/exporter/dorisexporter/exporter_common_test.go @@ -4,6 +4,7 @@ package dorisexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/dorisexporter" import ( + "net" "testing" "time" @@ -51,3 +52,21 @@ func TestStreamLoadUrl(t *testing.T) { url := streamLoadURL("http://doris:8030", "otel", "otel_logs") require.Equal(t, "http://doris:8030/api/otel/otel_logs/_stream_load", url) } + +func findRandomPort() (int, error) { + l, err := net.Listen("tcp", "localhost:0") + + if err != nil { + return 0, err + } + + port := l.Addr().(*net.TCPAddr).Port + + err = l.Close() + + if err != nil { + return 0, err + } + + return port, nil +} diff --git a/exporter/dorisexporter/exporter_logs.go b/exporter/dorisexporter/exporter_logs.go new file mode 100644 index 000000000000..bf854e1a8943 --- /dev/null +++ b/exporter/dorisexporter/exporter_logs.go @@ -0,0 +1,159 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package dorisexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/dorisexporter" + +import ( + "context" + _ "embed" // for SQL file embedding + "encoding/json" + "fmt" + "io" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/plog" + semconv "go.opentelemetry.io/collector/semconv/v1.25.0" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil" +) + +//go:embed sql/logs_ddl.sql +var logsDDL string + +// dLog Log to Doris +type dLog struct { + ServiceName string `json:"service_name"` + Timestamp string `json:"timestamp"` + TraceID string `json:"trace_id"` + SpanID string `json:"span_id"` + SeverityNumber int32 `json:"severity_number"` + SeverityText string `json:"severity_text"` + Body string `json:"body"` + ResourceAttributes map[string]any `json:"resource_attributes"` + LogAttributes map[string]any `json:"log_attributes"` + ScopeName string `json:"scope_name"` + ScopeVersion string `json:"scope_version"` +} + +type logsExporter struct { + *commonExporter +} + +func newLogsExporter(logger *zap.Logger, cfg *Config, set component.TelemetrySettings) *logsExporter { + return &logsExporter{ + commonExporter: newExporter(logger, cfg, set), + } +} + +func (e *logsExporter) start(ctx context.Context, host component.Host) error { + client, err := createDorisHTTPClient(ctx, e.cfg, host, e.TelemetrySettings) + if err != nil { + return err + } + e.client = client + + if !e.cfg.CreateSchema { + return nil + } + + conn, err := createDorisMySQLClient(e.cfg) + if err != nil { + return err + } + defer conn.Close() + + err = createAndUseDatabase(ctx, conn, e.cfg) + if err != nil { + return err + } + + ddl := fmt.Sprintf(logsDDL, e.cfg.Table.Logs, e.cfg.propertiesStr()) + _, err = conn.ExecContext(ctx, ddl) + return err +} + +func (e *logsExporter) shutdown(_ context.Context) error { + if e.client != nil { + e.client.CloseIdleConnections() + } + return nil +} + +func (e *logsExporter) pushLogData(ctx context.Context, ld plog.Logs) error { + logs := make([]*dLog, 0, ld.LogRecordCount()) + + for i := 0; i < ld.ResourceLogs().Len(); i++ { + resourceLogs := ld.ResourceLogs().At(i) + resource := resourceLogs.Resource() + resourceAttributes := resource.Attributes() + serviceName := "" + v, ok := resourceAttributes.Get(semconv.AttributeServiceName) + if ok { + serviceName = v.AsString() + } + + for j := 0; j < resourceLogs.ScopeLogs().Len(); j++ { + scopeLogs := resourceLogs.ScopeLogs().At(j) + + for k := 0; k < scopeLogs.LogRecords().Len(); k++ { + logRecord := scopeLogs.LogRecords().At(k) + + log := &dLog{ + ServiceName: serviceName, + Timestamp: e.formatTime(logRecord.Timestamp().AsTime()), + TraceID: traceutil.TraceIDToHexOrEmptyString(logRecord.TraceID()), + SpanID: traceutil.SpanIDToHexOrEmptyString(logRecord.SpanID()), + SeverityNumber: int32(logRecord.SeverityNumber()), + SeverityText: logRecord.SeverityText(), + Body: logRecord.Body().AsString(), + ResourceAttributes: resourceAttributes.AsRaw(), + LogAttributes: logRecord.Attributes().AsRaw(), + ScopeName: scopeLogs.Scope().Name(), + ScopeVersion: scopeLogs.Scope().Version(), + } + + logs = append(logs, log) + } + + } + + } + + return e.pushLogDataInternal(ctx, logs) +} + +func (e *logsExporter) pushLogDataInternal(ctx context.Context, logs []*dLog) error { + marshal, err := json.Marshal(logs) + if err != nil { + return err + } + + req, err := streamLoadRequest(ctx, e.cfg, e.cfg.Table.Logs, marshal) + if err != nil { + return err + } + + res, err := e.client.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + + body, err := io.ReadAll(res.Body) + if err != nil { + return err + } + + response := streamLoadResponse{} + err = json.Unmarshal(body, &response) + if err != nil { + return err + } + + if !response.success() { + return fmt.Errorf("failed to push log data: %s", response.Message) + } + + return nil +} diff --git a/exporter/dorisexporter/exporter_logs_test.go b/exporter/dorisexporter/exporter_logs_test.go new file mode 100644 index 000000000000..8e28aa1ec73b --- /dev/null +++ b/exporter/dorisexporter/exporter_logs_test.go @@ -0,0 +1,89 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package dorisexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/dorisexporter" + +import ( + "context" + "fmt" + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + semconv "go.opentelemetry.io/collector/semconv/v1.25.0" +) + +func TestPushLogData(t *testing.T) { + port, err := findRandomPort() + require.NoError(t, err) + + config := createDefaultConfig().(*Config) + config.Endpoint = fmt.Sprintf("http://127.0.0.1:%d", port) + config.CreateSchema = false + + err = config.Validate() + require.NoError(t, err) + + exporter := newLogsExporter(nil, config, testTelemetrySettings) + + ctx := context.Background() + + client, err := createDorisHTTPClient(ctx, config, nil, testTelemetrySettings) + require.NoError(t, err) + require.NotNil(t, client) + + exporter.client = client + + defer func() { + _ = exporter.shutdown(ctx) + }() + + server := &http.Server{ + ReadTimeout: 3 * time.Second, + Addr: fmt.Sprintf(":%d", port), + } + + go func() { + http.HandleFunc("/api/otel/otel_logs/_stream_load", func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"Status":"Success"}`)) + }) + err = server.ListenAndServe() + require.Equal(t, http.ErrServerClosed, err) + }() + + err0 := fmt.Errorf("Not Started") + for err0 != nil { // until server started + err0 = exporter.pushLogData(ctx, simpleLogs(10)) + time.Sleep(100 * time.Millisecond) + } + + _ = server.Shutdown(ctx) +} + +func simpleLogs(count int) plog.Logs { + logs := plog.NewLogs() + rl := logs.ResourceLogs().AppendEmpty() + rl.Resource().Attributes().PutStr("service.name", "test-service") + sl := rl.ScopeLogs().AppendEmpty() + sl.Scope().SetName("io.opentelemetry.contrib.doris") + sl.Scope().SetVersion("1.0.0") + sl.Scope().Attributes().PutStr("lib", "doris") + timestamp := time.Now() + for i := 0; i < count; i++ { + r := sl.LogRecords().AppendEmpty() + r.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) + r.SetObservedTimestamp(pcommon.NewTimestampFromTime(timestamp)) + r.SetSeverityNumber(plog.SeverityNumberError2) + r.SetSeverityText("error") + r.Body().SetStr("error message") + r.Attributes().PutStr(semconv.AttributeServiceNamespace, "default") + r.SetFlags(plog.DefaultLogRecordFlags) + r.SetTraceID([16]byte{1, 2, 3, byte(i)}) + r.SetSpanID([8]byte{1, 2, 3, byte(i)}) + } + return logs +} diff --git a/exporter/dorisexporter/exporter_traces_test.go b/exporter/dorisexporter/exporter_traces_test.go index efc1f1bdca5d..aafc08416132 100644 --- a/exporter/dorisexporter/exporter_traces_test.go +++ b/exporter/dorisexporter/exporter_traces_test.go @@ -5,6 +5,7 @@ package dorisexporter // import "github.com/open-telemetry/opentelemetry-collect import ( "context" + "fmt" "net/http" "testing" "time" @@ -16,11 +17,14 @@ import ( ) func TestPushTraceData(t *testing.T) { + port, err := findRandomPort() + require.NoError(t, err) + config := createDefaultConfig().(*Config) - config.Endpoint = "http://127.0.0.1:18030" + config.Endpoint = fmt.Sprintf("http://127.0.0.1:%d", port) config.CreateSchema = false - err := config.Validate() + err = config.Validate() require.NoError(t, err) exporter := newTracesExporter(nil, config, testTelemetrySettings) @@ -38,21 +42,24 @@ func TestPushTraceData(t *testing.T) { }() server := &http.Server{ - ReadTimeout: 5 * time.Second, + ReadTimeout: 3 * time.Second, + Addr: fmt.Sprintf(":%d", port), } + go func() { http.HandleFunc("/api/otel/otel_traces/_stream_load", func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(`{"Status":"Success"}`)) }) - server.Addr = ":18030" - _ = server.ListenAndServe() + err = server.ListenAndServe() + require.Equal(t, http.ErrServerClosed, err) }() - time.Sleep(1 * time.Second) - - err = exporter.pushTraceData(ctx, simpleTraces(10)) - require.NoError(t, err) + err0 := fmt.Errorf("Not Started") + for err0 != nil { // until server started + err0 = exporter.pushTraceData(ctx, simpleTraces(10)) + time.Sleep(100 * time.Millisecond) + } _ = server.Shutdown(ctx) } diff --git a/exporter/dorisexporter/factory.go b/exporter/dorisexporter/factory.go index ea7c7e50a834..be7259cb4a9e 100644 --- a/exporter/dorisexporter/factory.go +++ b/exporter/dorisexporter/factory.go @@ -12,7 +12,6 @@ import ( "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" - "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/dorisexporter/internal/metadata" @@ -51,13 +50,19 @@ func createDefaultConfig() component.Config { } func createLogsExporter(ctx context.Context, set exporter.Settings, cfg component.Config) (exporter.Logs, error) { + c := cfg.(*Config) + exporter := newLogsExporter(set.Logger, c, set.TelemetrySettings) return exporterhelper.NewLogsExporter( ctx, set, cfg, - func(_ context.Context, _ plog.Logs) error { - return nil - }, + exporter.pushLogData, + exporterhelper.WithStart(exporter.start), + exporterhelper.WithShutdown(exporter.shutdown), + // we config the timeout option in http client, so we don't need to set timeout here + exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), + exporterhelper.WithQueue(c.QueueSettings), + exporterhelper.WithRetry(c.BackOffConfig), ) } diff --git a/exporter/dorisexporter/sql/logs_ddl.sql b/exporter/dorisexporter/sql/logs_ddl.sql new file mode 100644 index 000000000000..b69abf71a8b4 --- /dev/null +++ b/exporter/dorisexporter/sql/logs_ddl.sql @@ -0,0 +1,30 @@ +CREATE TABLE IF NOT EXISTS %s +( + service_name VARCHAR(200), + timestamp DATETIME(6), + trace_id VARCHAR(200), + span_id STRING, + severity_number INT, + severity_text STRING, + body STRING, + resource_attributes VARIANT, + log_attributes VARIANT, + scope_name STRING, + scope_version STRING, + INDEX idx_service_name(service_name) USING INVERTED, + INDEX idx_timestamp(timestamp) USING INVERTED, + INDEX idx_trace_id(trace_id) USING INVERTED, + INDEX idx_span_id(span_id) USING INVERTED, + INDEX idx_severity_number(severity_number) USING INVERTED, + INDEX idx_body(body) USING INVERTED PROPERTIES("parser"="unicode", "support_phrase"="true"), + INDEX idx_severity_text(severity_text) USING INVERTED, + INDEX idx_resource_attributes(resource_attributes) USING INVERTED, + INDEX idx_log_attributes(log_attributes) USING INVERTED, + INDEX idx_scope_name(scope_name) USING INVERTED, + INDEX idx_scope_version(scope_version) USING INVERTED +) +ENGINE = OLAP +DUPLICATE KEY(service_name, timestamp) +PARTITION BY RANGE(timestamp) () +DISTRIBUTED BY HASH(trace_id) BUCKETS AUTO +%s; \ No newline at end of file From 7734698a5a8aa60b02051a9583ed5b32a6ffbf32 Mon Sep 17 00:00:00 2001 From: Paulo Janotti Date: Mon, 16 Sep 2024 09:27:17 -0700 Subject: [PATCH 8/9] [chore] Fix "Device or resource busy" error on Windows CI (#35192) **Description:** The error "Device or resource busy" happens intermittently when running the `build-and-test-windows` workflow on CI. Examining a few instances of the error it becomes clear that the concurrent run of component tests trigger concurrent builds of `gotestsum`. This can cause one of the runs to clash with the other and fail to build the `gotestsum` target. The fix is to add a explicit step to build `gotestsum` in the CI workflow. This way the local dev modus operandi is not altered (they can still rely on `make test` to build any tool necessary for the test). If any other tools are added to the `test` target they should also be added to the workflow on Windows. **Link to tracking Issue:** Fix #34691 **Testing:** Validated on repository fork. **Documentation:** N/A --- .github/workflows/build-and-test-windows.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.github/workflows/build-and-test-windows.yml b/.github/workflows/build-and-test-windows.yml index e8b469c413a2..68a088f5469d 100644 --- a/.github/workflows/build-and-test-windows.yml +++ b/.github/workflows/build-and-test-windows.yml @@ -72,6 +72,11 @@ jobs: - name: Ensure required ports in the dynamic range are available run: | & ${{ github.workspace }}\.github\workflows\scripts\win-required-ports.ps1 + - name: Build shared test tools + # If component tests share Makefile targets they need to be added here to avoid + # concurrent component tests clashing when building such targets. This applies + # specifically to Windows, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/34691 + run: make "$(${PWD} -replace '\\', '/')/.tools/gotestsum" - name: Run Unit tests run: make -j2 gotest GROUP=${{ matrix.group }} windows-unittest: From 943c736d161051a1a1c0d527d6aaf1b696c1d8ab Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Mon, 16 Sep 2024 09:29:31 -0700 Subject: [PATCH 9/9] [transformprocessor]: Remove unnecessary data copy when transform sum to/from gauge (#35177) Signed-off-by: Bogdan Drutu --- .chloggen/rm-copy-transformprocessor.yaml | 22 +++++++++++++++++++ .../metrics/func_convert_gauge_to_sum.go | 4 ++-- .../func_convert_gauge_to_sum_datapoint.go | 4 ++-- .../metrics/func_convert_sum_to_gauge.go | 2 +- .../func_convert_sum_to_gauge_datapoint.go | 4 ++-- .../metrics/func_extract_sum_metric.go | 2 +- 6 files changed, 30 insertions(+), 8 deletions(-) create mode 100644 .chloggen/rm-copy-transformprocessor.yaml diff --git a/.chloggen/rm-copy-transformprocessor.yaml b/.chloggen/rm-copy-transformprocessor.yaml new file mode 100644 index 000000000000..074c499d91c7 --- /dev/null +++ b/.chloggen/rm-copy-transformprocessor.yaml @@ -0,0 +1,22 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: transformprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Remove unnecessary data copy when transform sum to/from gauge + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35177] + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/processor/transformprocessor/internal/metrics/func_convert_gauge_to_sum.go b/processor/transformprocessor/internal/metrics/func_convert_gauge_to_sum.go index 3c3a5100dc3c..3fc352f0faca 100644 --- a/processor/transformprocessor/internal/metrics/func_convert_gauge_to_sum.go +++ b/processor/transformprocessor/internal/metrics/func_convert_gauge_to_sum.go @@ -54,8 +54,8 @@ func convertGaugeToSum(stringAggTemp string, monotonic bool) (ottl.ExprFunc[ottl metric.SetEmptySum().SetAggregationTemporality(aggTemp) metric.Sum().SetIsMonotonic(monotonic) - // Setting the data type removed all the data points, so we must copy them back to the metric. - dps.CopyTo(metric.Sum().DataPoints()) + // Setting the data type removed all the data points, so we must move them back to the metric. + dps.MoveAndAppendTo(metric.Sum().DataPoints()) return nil, nil }, nil diff --git a/processor/transformprocessor/internal/metrics/func_convert_gauge_to_sum_datapoint.go b/processor/transformprocessor/internal/metrics/func_convert_gauge_to_sum_datapoint.go index dab12e96d094..61c848ab1b52 100644 --- a/processor/transformprocessor/internal/metrics/func_convert_gauge_to_sum_datapoint.go +++ b/processor/transformprocessor/internal/metrics/func_convert_gauge_to_sum_datapoint.go @@ -50,8 +50,8 @@ func convertDatapointGaugeToSum(stringAggTemp string, monotonic bool) (ottl.Expr metric.SetEmptySum().SetAggregationTemporality(aggTemp) metric.Sum().SetIsMonotonic(monotonic) - // Setting the data type removed all the data points, so we must copy them back to the metric. - dps.CopyTo(metric.Sum().DataPoints()) + // Setting the data type removed all the data points, so we must move them back to the metric. + dps.MoveAndAppendTo(metric.Sum().DataPoints()) return nil, nil }, nil diff --git a/processor/transformprocessor/internal/metrics/func_convert_sum_to_gauge.go b/processor/transformprocessor/internal/metrics/func_convert_sum_to_gauge.go index f4763e65c9e5..212395bd524b 100644 --- a/processor/transformprocessor/internal/metrics/func_convert_sum_to_gauge.go +++ b/processor/transformprocessor/internal/metrics/func_convert_sum_to_gauge.go @@ -30,7 +30,7 @@ func convertSumToGauge() (ottl.ExprFunc[ottlmetric.TransformContext], error) { dps := metric.Sum().DataPoints() // Setting the data type removed all the data points, so we must copy them back to the metric. - dps.CopyTo(metric.SetEmptyGauge().DataPoints()) + dps.MoveAndAppendTo(metric.SetEmptyGauge().DataPoints()) return nil, nil }, nil diff --git a/processor/transformprocessor/internal/metrics/func_convert_sum_to_gauge_datapoint.go b/processor/transformprocessor/internal/metrics/func_convert_sum_to_gauge_datapoint.go index ca2f09c8a121..1943d2d9796a 100644 --- a/processor/transformprocessor/internal/metrics/func_convert_sum_to_gauge_datapoint.go +++ b/processor/transformprocessor/internal/metrics/func_convert_sum_to_gauge_datapoint.go @@ -29,8 +29,8 @@ func convertDatapointSumToGauge() (ottl.ExprFunc[ottldatapoint.TransformContext] dps := metric.Sum().DataPoints() - // Setting the data type removed all the data points, so we must copy them back to the metric. - dps.CopyTo(metric.SetEmptyGauge().DataPoints()) + // Setting the data type removed all the data points, so we must move them back to the metric. + dps.MoveAndAppendTo(metric.SetEmptyGauge().DataPoints()) return nil, nil }, nil diff --git a/processor/transformprocessor/internal/metrics/func_extract_sum_metric.go b/processor/transformprocessor/internal/metrics/func_extract_sum_metric.go index 78b47623cdf3..f002260944ac 100644 --- a/processor/transformprocessor/internal/metrics/func_extract_sum_metric.go +++ b/processor/transformprocessor/internal/metrics/func_extract_sum_metric.go @@ -32,7 +32,7 @@ func createExtractSumMetricFunction(_ ottl.FunctionContext, oArgs ottl.Arguments return extractSumMetric(args.Monotonic) } -// this interface helps unify the logic for extracting data from different histogram types +// SumCountDataPoint interface helps unify the logic for extracting data from different histogram types // all supported metric types' datapoints implement it type SumCountDataPoint interface { Attributes() pcommon.Map