From d828041a521efd7da2d3f864f7e00714c88c8c31 Mon Sep 17 00:00:00 2001 From: Hector Hernandez <39923391+hectorhdzg@users.noreply.github.com> Date: Thu, 14 Dec 2023 22:09:35 -0800 Subject: [PATCH] fix(sdk-logs): await async resources in log processors (#4349) --- experimental/CHANGELOG.md | 1 + experimental/packages/sdk-logs/package.json | 1 + .../src/export/BatchLogRecordProcessorBase.ts | 46 ++++++---- .../src/export/SimpleLogRecordProcessor.ts | 53 +++++++++--- .../export/BatchLogRecordProcessor.test.ts | 28 +++++- .../export/SimpleLogRecordProcessor.test.ts | 69 ++++++++++++++- .../common/export/TestExporterWithDelay.ts | 51 +++++++++++ package-lock.json | 85 +++++++++++++++++++ 8 files changed, 298 insertions(+), 36 deletions(-) create mode 100644 experimental/packages/sdk-logs/test/common/export/TestExporterWithDelay.ts diff --git a/experimental/CHANGELOG.md b/experimental/CHANGELOG.md index cbe0140c16..2b4a27e968 100644 --- a/experimental/CHANGELOG.md +++ b/experimental/CHANGELOG.md @@ -31,6 +31,7 @@ All notable changes to experimental packages in this project will be documented ### :bug: (Bug Fix) +* fix(sdk-logs): await async resources in log processors * fix(sdk-logs): avoid map attribute set when count limit exceeded * fix(instrumentation-fetch): only access navigator if it is defined [#4063](https://github.com/open-telemetry/opentelemetry-js/pull/4063) * allows for experimental usage of this instrumentation with non-browser runtimes diff --git a/experimental/packages/sdk-logs/package.json b/experimental/packages/sdk-logs/package.json index d14f8c7116..1d0526a95d 100644 --- a/experimental/packages/sdk-logs/package.json +++ b/experimental/packages/sdk-logs/package.json @@ -75,6 +75,7 @@ "@babel/core": "7.23.6", "@opentelemetry/api": ">=1.4.0 <1.8.0", "@opentelemetry/api-logs": "0.46.0", + "@opentelemetry/resources_1.9.0": "npm:@opentelemetry/resources@1.9.0", "@types/mocha": "10.0.6", "@types/node": "18.6.5", "@types/sinon": "10.0.20", diff --git a/experimental/packages/sdk-logs/src/export/BatchLogRecordProcessorBase.ts b/experimental/packages/sdk-logs/src/export/BatchLogRecordProcessorBase.ts index 027f103c07..bfe6367cf2 100644 --- a/experimental/packages/sdk-logs/src/export/BatchLogRecordProcessorBase.ts +++ b/experimental/packages/sdk-logs/src/export/BatchLogRecordProcessorBase.ts @@ -21,8 +21,9 @@ import { getEnv, globalErrorHandler, unrefTimer, - callWithTimeout, BindOnceFuture, + internal, + callWithTimeout, } from '@opentelemetry/core'; import type { BufferConfig } from '../types'; @@ -163,21 +164,34 @@ export abstract class BatchLogRecordProcessorBase } } - private _export(logRecords: LogRecord[]): Promise { - return new Promise((resolve, reject) => { - this._exporter.export(logRecords, (res: ExportResult) => { - if (res.code !== ExportResultCode.SUCCESS) { - reject( - res.error ?? - new Error( - `BatchLogRecordProcessorBase: log record export failed (status ${res})` - ) - ); - return; - } - resolve(res); - }); - }); + private _export(logRecords: LogRecord[]): Promise { + const doExport = () => + internal + ._export(this._exporter, logRecords) + .then((result: ExportResult) => { + if (result.code !== ExportResultCode.SUCCESS) { + globalErrorHandler( + result.error ?? + new Error( + `BatchLogRecordProcessor: log record export failed (status ${result})` + ) + ); + } + }) + .catch(globalErrorHandler); + + const pendingResources = logRecords + .map(logRecord => logRecord.resource) + .filter(resource => resource.asyncAttributesPending); + + // Avoid scheduling a promise to make the behavior more predictable and easier to test + if (pendingResources.length === 0) { + return doExport(); + } else { + return Promise.all( + pendingResources.map(resource => resource.waitForAsyncAttributes?.()) + ).then(doExport, globalErrorHandler); + } } protected abstract onShutdown(): void; diff --git a/experimental/packages/sdk-logs/src/export/SimpleLogRecordProcessor.ts b/experimental/packages/sdk-logs/src/export/SimpleLogRecordProcessor.ts index b516d55fc7..97134c886b 100644 --- a/experimental/packages/sdk-logs/src/export/SimpleLogRecordProcessor.ts +++ b/experimental/packages/sdk-logs/src/export/SimpleLogRecordProcessor.ts @@ -19,17 +19,19 @@ import { BindOnceFuture, ExportResultCode, globalErrorHandler, + internal, } from '@opentelemetry/core'; - import type { LogRecordExporter } from './LogRecordExporter'; import type { LogRecordProcessor } from '../LogRecordProcessor'; import type { LogRecord } from './../LogRecord'; export class SimpleLogRecordProcessor implements LogRecordProcessor { private _shutdownOnce: BindOnceFuture; + private _unresolvedExports: Set>; constructor(private readonly _exporter: LogRecordExporter) { this._shutdownOnce = new BindOnceFuture(this._shutdown, this); + this._unresolvedExports = new Set>(); } public onEmit(logRecord: LogRecord): void { @@ -37,22 +39,45 @@ export class SimpleLogRecordProcessor implements LogRecordProcessor { return; } - this._exporter.export([logRecord], (res: ExportResult) => { - if (res.code !== ExportResultCode.SUCCESS) { - globalErrorHandler( - res.error ?? - new Error( - `SimpleLogRecordProcessor: log record export failed (status ${res})` - ) - ); - return; + const doExport = () => + internal + ._export(this._exporter, [logRecord]) + .then((result: ExportResult) => { + if (result.code !== ExportResultCode.SUCCESS) { + globalErrorHandler( + result.error ?? + new Error( + `SimpleLogRecordProcessor: log record export failed (status ${result})` + ) + ); + } + }) + .catch(globalErrorHandler); + + // Avoid scheduling a promise to make the behavior more predictable and easier to test + if (logRecord.resource.asyncAttributesPending) { + const exportPromise = logRecord.resource + .waitForAsyncAttributes?.() + .then(() => { + // Using TS Non-null assertion operator because exportPromise could not be null in here + // if waitForAsyncAttributes is not present this code will never be reached + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this._unresolvedExports.delete(exportPromise!); + return doExport(); + }, globalErrorHandler); + + // store the unresolved exports + if (exportPromise != null) { + this._unresolvedExports.add(exportPromise); } - }); + } else { + void doExport(); + } } - public forceFlush(): Promise { - // do nothing as all log records are being exported without waiting - return Promise.resolve(); + public async forceFlush(): Promise { + // await unresolved resources before resolving + await Promise.all(Array.from(this._unresolvedExports)); } public shutdown(): Promise { diff --git a/experimental/packages/sdk-logs/test/common/export/BatchLogRecordProcessor.test.ts b/experimental/packages/sdk-logs/test/common/export/BatchLogRecordProcessor.test.ts index 70859a314c..f6a8e77611 100644 --- a/experimental/packages/sdk-logs/test/common/export/BatchLogRecordProcessor.test.ts +++ b/experimental/packages/sdk-logs/test/common/export/BatchLogRecordProcessor.test.ts @@ -32,16 +32,19 @@ import { import { BatchLogRecordProcessorBase } from '../../../src/export/BatchLogRecordProcessorBase'; import { reconfigureLimits } from '../../../src/config'; import { LoggerProviderSharedState } from '../../../src/internal/LoggerProviderSharedState'; -import { Resource } from '@opentelemetry/resources'; +import { Resource, ResourceAttributes } from '@opentelemetry/resources'; class BatchLogRecordProcessor extends BatchLogRecordProcessorBase { onInit() {} onShutdown() {} } -const createLogRecord = (limits?: LogRecordLimits): LogRecord => { +const createLogRecord = ( + limits?: LogRecordLimits, + resource?: Resource +): LogRecord => { const sharedState = new LoggerProviderSharedState( - Resource.default(), + resource || Resource.default(), Infinity, reconfigureLimits(limits ?? {}) ); @@ -308,6 +311,25 @@ describe('BatchLogRecordProcessorBase', () => { await processor.forceFlush(); assert.strictEqual(exporter.getFinishedLogRecords().length, 1); }); + + it('should wait for pending resource on flush', async () => { + const processor = new BatchLogRecordProcessor(exporter); + const asyncResource = new Resource( + {}, + new Promise(resolve => { + setTimeout(() => resolve({ async: 'fromasync' }), 1); + }) + ); + const logRecord = createLogRecord(undefined, asyncResource); + processor.onEmit(logRecord); + await processor.forceFlush(); + const exportedLogs = exporter.getFinishedLogRecords(); + assert.strictEqual(exportedLogs.length, 1); + assert.strictEqual( + exportedLogs[0].resource.attributes['async'], + 'fromasync' + ); + }); }); describe('shutdown', () => { diff --git a/experimental/packages/sdk-logs/test/common/export/SimpleLogRecordProcessor.test.ts b/experimental/packages/sdk-logs/test/common/export/SimpleLogRecordProcessor.test.ts index 27eacc9195..a19723f4cc 100644 --- a/experimental/packages/sdk-logs/test/common/export/SimpleLogRecordProcessor.test.ts +++ b/experimental/packages/sdk-logs/test/common/export/SimpleLogRecordProcessor.test.ts @@ -21,6 +21,8 @@ import { loggingErrorHandler, setGlobalErrorHandler, } from '@opentelemetry/core'; +import { Resource, ResourceAttributes } from '@opentelemetry/resources'; +import { Resource as Resource190 } from '@opentelemetry/resources_1.9.0'; import { InMemoryLogRecordExporter, @@ -29,12 +31,12 @@ import { LogRecord, } from './../../../src'; import { LoggerProviderSharedState } from '../../../src/internal/LoggerProviderSharedState'; -import { Resource } from '@opentelemetry/resources'; import { reconfigureLimits } from '../../../src/config'; +import { TestExporterWithDelay } from './TestExporterWithDelay'; -const setup = (exporter: LogRecordExporter) => { +const setup = (exporter: LogRecordExporter, resource?: Resource) => { const sharedState = new LoggerProviderSharedState( - Resource.default(), + resource || Resource.default(), Infinity, reconfigureLimits({}) ); @@ -113,4 +115,65 @@ describe('SimpleLogRecordProcessor', () => { assert.ok(shutdownSpy.callCount === 1); }); }); + + describe('force flush', () => { + it('should await unresolved resources', async () => { + const exporter = new InMemoryLogRecordExporter(); + const asyncResource = new Resource( + {}, + new Promise(resolve => { + setTimeout(() => resolve({ async: 'fromasync' }), 1); + }) + ); + const { processor, logRecord } = setup(exporter, asyncResource); + assert.strictEqual(exporter.getFinishedLogRecords().length, 0); + processor.onEmit(logRecord); + + await processor.forceFlush(); + + const exportedLogs = exporter.getFinishedLogRecords(); + assert.strictEqual(exportedLogs.length, 1); + assert.strictEqual( + exportedLogs[0].resource.attributes['async'], + 'fromasync' + ); + }); + + it('should await doExport() and delete from _unresolvedExports', async () => { + const testExporterWithDelay = new TestExporterWithDelay(); + const asyncResource = new Resource( + {}, + new Promise(resolve => { + setTimeout(() => resolve({ async: 'fromasync' }), 1); + }) + ); + const processor = new SimpleLogRecordProcessor(testExporterWithDelay); + const { logRecord } = setup(testExporterWithDelay, asyncResource); + + processor.onEmit(logRecord); + assert.strictEqual(processor['_unresolvedExports'].size, 1); + await processor.forceFlush(); + assert.strictEqual(processor['_unresolvedExports'].size, 0); + const exportedLogRecords = testExporterWithDelay.getFinishedLogRecords(); + assert.strictEqual(exportedLogRecords.length, 1); + }); + }); + + describe('compatibility', () => { + it('should export when using old resource implementation', async () => { + const exporter = new InMemoryLogRecordExporter(); + const { processor, logRecord } = setup( + exporter, + new Resource190({ fromold: 'fromold' }) + ); + assert.strictEqual(exporter.getFinishedLogRecords().length, 0); + processor.onEmit(logRecord); + const exportedLogs = exporter.getFinishedLogRecords(); + assert.strictEqual(exportedLogs.length, 1); + assert.strictEqual( + exportedLogs[0].resource.attributes['fromold'], + 'fromold' + ); + }); + }); }); diff --git a/experimental/packages/sdk-logs/test/common/export/TestExporterWithDelay.ts b/experimental/packages/sdk-logs/test/common/export/TestExporterWithDelay.ts new file mode 100644 index 0000000000..1ce3a20ba0 --- /dev/null +++ b/experimental/packages/sdk-logs/test/common/export/TestExporterWithDelay.ts @@ -0,0 +1,51 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { ExportResult } from '@opentelemetry/core'; +import { InMemoryLogRecordExporter, ReadableLogRecord } from '../../../src'; + +/** + * A test-only exporter that delays during export to mimic a real exporter. + */ +export class TestExporterWithDelay extends InMemoryLogRecordExporter { + private _exporterCreatedLogRecords: ReadableLogRecord[] = []; + + constructor() { + super(); + } + + override export( + logRecords: ReadableLogRecord[], + resultCallback: (result: ExportResult) => void + ): void { + super.export(logRecords, () => setTimeout(resultCallback, 1)); + } + + override shutdown(): Promise { + return super.shutdown().then(() => { + this._exporterCreatedLogRecords = []; + }); + } + + override reset() { + super.reset(); + this._exporterCreatedLogRecords = []; + } + + getExporterCreatedLogRecords(): ReadableLogRecord[] { + return this._exporterCreatedLogRecords; + } +} diff --git a/package-lock.json b/package-lock.json index 867838c7e9..490d403d43 100644 --- a/package-lock.json +++ b/package-lock.json @@ -4713,6 +4713,7 @@ "@babel/core": "7.23.6", "@opentelemetry/api": ">=1.4.0 <1.8.0", "@opentelemetry/api-logs": "0.46.0", + "@opentelemetry/resources_1.9.0": "npm:@opentelemetry/resources@1.9.0", "@types/mocha": "10.0.6", "@types/node": "18.6.5", "@types/sinon": "10.0.20", @@ -4744,6 +4745,56 @@ "@opentelemetry/api-logs": ">=0.39.1" } }, + "experimental/packages/sdk-logs/node_modules/@opentelemetry/api": { + "version": "1.4.1", + "resolved": "https://registry.npmjs.org/@opentelemetry/api/-/api-1.4.1.tgz", + "integrity": "sha512-O2yRJce1GOc6PAy3QxFM4NzFiWzvScDC1/5ihYBL6BUEVdq0XMWN01sppE+H6bBXbaFYipjwFLEWLg5PaSOThA==", + "dev": true, + "engines": { + "node": ">=8.0.0" + } + }, + "experimental/packages/sdk-logs/node_modules/@opentelemetry/resources_1.9.0": { + "name": "@opentelemetry/resources", + "version": "1.9.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/resources/-/resources-1.9.0.tgz", + "integrity": "sha512-zCyien0p3XWarU6zv72c/JZ6QlG5QW/hc61Nh5TSR1K9ndnljzAGrH55x4nfyQdubfoh9QxLNh9FXH0fWK6vcg==", + "dev": true, + "dependencies": { + "@opentelemetry/core": "1.9.0", + "@opentelemetry/semantic-conventions": "1.9.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.0.0 <1.5.0" + } + }, + "experimental/packages/sdk-logs/node_modules/@opentelemetry/resources_1.9.0/node_modules/@opentelemetry/core": { + "version": "1.9.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/core/-/core-1.9.0.tgz", + "integrity": "sha512-Koy1ApRUp5DB5KpOqhDk0JjO9x6QeEkmcePl8qQDsXZGF4MuHUBShXibd+J2tRNckTsvgEHi1uEuUckDgN+c/A==", + "dev": true, + "dependencies": { + "@opentelemetry/semantic-conventions": "1.9.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.0.0 <1.5.0" + } + }, + "experimental/packages/sdk-logs/node_modules/@opentelemetry/semantic-conventions": { + "version": "1.9.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/semantic-conventions/-/semantic-conventions-1.9.0.tgz", + "integrity": "sha512-po7penSfQ/Z8352lRVDpaBrd9znwA5mHGqXR7nDEiVnxkDFkBIhVf/tKeAJDIq/erFpcRowKFeCsr5eqqcSyFQ==", + "dev": true, + "engines": { + "node": ">=14" + } + }, "experimental/packages/sdk-logs/node_modules/@webpack-cli/configtest": { "version": "2.1.1", "dev": true, @@ -42063,6 +42114,7 @@ "@opentelemetry/api-logs": "0.46.0", "@opentelemetry/core": "1.19.0", "@opentelemetry/resources": "1.19.0", + "@opentelemetry/resources_1.9.0": "npm:@opentelemetry/resources@1.9.0", "@types/mocha": "10.0.6", "@types/node": "18.6.5", "@types/sinon": "10.0.20", @@ -42087,6 +42139,39 @@ "webpack-merge": "5.10.0" }, "dependencies": { + "@opentelemetry/api": { + "version": "1.4.1", + "resolved": "https://registry.npmjs.org/@opentelemetry/api/-/api-1.4.1.tgz", + "integrity": "sha512-O2yRJce1GOc6PAy3QxFM4NzFiWzvScDC1/5ihYBL6BUEVdq0XMWN01sppE+H6bBXbaFYipjwFLEWLg5PaSOThA==", + "dev": true + }, + "@opentelemetry/resources_1.9.0": { + "version": "npm:@opentelemetry/resources@1.9.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/resources/-/resources-1.9.0.tgz", + "integrity": "sha512-zCyien0p3XWarU6zv72c/JZ6QlG5QW/hc61Nh5TSR1K9ndnljzAGrH55x4nfyQdubfoh9QxLNh9FXH0fWK6vcg==", + "dev": true, + "requires": { + "@opentelemetry/core": "1.9.0", + "@opentelemetry/semantic-conventions": "1.9.0" + }, + "dependencies": { + "@opentelemetry/core": { + "version": "1.9.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/core/-/core-1.9.0.tgz", + "integrity": "sha512-Koy1ApRUp5DB5KpOqhDk0JjO9x6QeEkmcePl8qQDsXZGF4MuHUBShXibd+J2tRNckTsvgEHi1uEuUckDgN+c/A==", + "dev": true, + "requires": { + "@opentelemetry/semantic-conventions": "1.9.0" + } + } + } + }, + "@opentelemetry/semantic-conventions": { + "version": "1.9.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/semantic-conventions/-/semantic-conventions-1.9.0.tgz", + "integrity": "sha512-po7penSfQ/Z8352lRVDpaBrd9znwA5mHGqXR7nDEiVnxkDFkBIhVf/tKeAJDIq/erFpcRowKFeCsr5eqqcSyFQ==", + "dev": true + }, "@webpack-cli/configtest": { "version": "2.1.1", "dev": true,