From af72d4d98bff0b63604190c0c9d30a1b1fc760cb Mon Sep 17 00:00:00 2001 From: David Luna Date: Wed, 17 Jan 2024 16:47:00 +0100 Subject: [PATCH] chore(instrumentation-undici): add instrumentation config --- .../src/internal-types.ts | 20 +- .../src/types.ts | 56 +++++- .../src/undici.ts | 186 +++++++++++++----- .../test/fetch.test.ts | 154 +++++++++++---- .../test/utils/assertSpan.ts | 17 +- .../test/utils/mock-server.ts | 22 ++- 6 files changed, 335 insertions(+), 120 deletions(-) diff --git a/experimental/packages/opentelemetry-instrumentation-undici/src/internal-types.ts b/experimental/packages/opentelemetry-instrumentation-undici/src/internal-types.ts index 89077065c4..4206664c56 100644 --- a/experimental/packages/opentelemetry-instrumentation-undici/src/internal-types.ts +++ b/experimental/packages/opentelemetry-instrumentation-undici/src/internal-types.ts @@ -15,7 +15,7 @@ */ import type { Channel } from 'diagnostics_channel'; -import type { Request, RequestInfo, Response } from 'undici'; +import { UndiciRequest, UnidiciResponse }from './types'; export interface ListenerRecord { name: string; @@ -23,27 +23,13 @@ export interface ListenerRecord { onMessage: (message: any, name: string) => void; } -// type Writeable = { -readonly [P in keyof T]: T[P] }; -// type WriteableRequest = Writeable; - -// TODO: the actual `request` object at runtime have subtle differences -// from the `Request` type declared in `undici`. Type properly -// -// Types declared in the lib -// - have some properties declared as `readonly` but we are changing them -// - omits some properties we need to inspect for the instrumentation -type UndiciRequest = Request & { - origin: RequestInfo; - path: string; -}; - export interface RequestMessage { request: UndiciRequest; } -export interface RequestResponseMessage { +export interface HeadersMessage { request: UndiciRequest; - response: Response; + response: UnidiciResponse; } export interface RequestErrorMessage { diff --git a/experimental/packages/opentelemetry-instrumentation-undici/src/types.ts b/experimental/packages/opentelemetry-instrumentation-undici/src/types.ts index 576e03e6e1..81f899453e 100644 --- a/experimental/packages/opentelemetry-instrumentation-undici/src/types.ts +++ b/experimental/packages/opentelemetry-instrumentation-undici/src/types.ts @@ -14,7 +14,7 @@ * limitations under the License. */ import type { InstrumentationConfig } from '@opentelemetry/instrumentation'; -import type { Span } from '@opentelemetry/api'; +import type { Attributes, Span } from '@opentelemetry/api'; export type UndiciRequestHook = (args: { request: RequestType; @@ -22,8 +22,56 @@ export type UndiciRequestHook = (args: { additionalHeaders: Record; }) => void; -// TODO: This package will instrument HTTP requests made through Undici +// TODO: notes about support +// - `fetch` API is added in node v16.15.0 +// - `undici` supports node >=18 + + +// TODO: `Request` class was added in node v16.15.0, make it work with v14 +// also we do not get that object from the diagnostics channel message but the +// core request from https://github.com/nodejs/undici/blob/main/lib/core/request.js +// which is not typed + + +export interface UndiciRequest { + origin: string; + method: string; + path: string; + /** + * Serialized string of headers in the form `name: value\r\n` + */ + headers: string; + throwOnError: boolean; + completed: boolean; + aborted: boolean; + idempotent: boolean; + contentLength: number | null; + contentType: string | null; + body: any; +} + +export interface UnidiciResponse { + headers: Buffer[]; + statusCode: number; +} + + +// This package will instrument HTTP requests made through `undici` or `fetch` global API // so it seems logical to have similar options than the HTTP instrumentation -export interface UndiciInstrumentationConfig extends InstrumentationConfig { - onRequest?: UndiciRequestHook; +export interface UndiciInstrumentationConfig extends InstrumentationConfig { + /** Not trace all outgoing requests that matched with custom function */ + ignoreRequestHook?: (request: RequestType) => boolean; + /** Function for adding custom attributes after response is handled */ + applyCustomAttributesOnSpan?: (span: Span, request: RequestType, response: Response) => void; + /** Function for adding custom attributes before request is handled */ + requestHook?: (span: Span, request: RequestType) => void; + /** Function for adding custom attributes before a span is started in outgoingRequest */ + startSpanHook?: (request: RequestType) => Attributes; + /** Require parent to create span for outgoing requests */ + requireParentforSpans?: boolean; + /** Map the following HTTP headers to span attributes. */ + headersToSpanAttributes?: { + requestHeaders?: string[]; + responseHeaders?: string[]; + }; } diff --git a/experimental/packages/opentelemetry-instrumentation-undici/src/undici.ts b/experimental/packages/opentelemetry-instrumentation-undici/src/undici.ts index 9f739343da..efad703806 100644 --- a/experimental/packages/opentelemetry-instrumentation-undici/src/undici.ts +++ b/experimental/packages/opentelemetry-instrumentation-undici/src/undici.ts @@ -14,9 +14,10 @@ * limitations under the License. */ import * as diagch from 'diagnostics_channel'; +import { URL } from 'url'; import { SemanticAttributes } from '@opentelemetry/semantic-conventions'; -import { InstrumentationBase } from '@opentelemetry/instrumentation'; +import { InstrumentationBase, safeExecuteInTheMiddle } from '@opentelemetry/instrumentation'; import { Attributes, context, @@ -30,26 +31,9 @@ import { import { VERSION } from './version'; -import { ListenerRecord } from './internal-types'; -import { UndiciInstrumentationConfig } from './types'; - -// Get the content-length from undici response headers. -// `headers` is an Array of buffers: [k, v, k, v, ...]. -// If the header is not present, or has an invalid value, this returns null. -function contentLengthFromResponseHeaders(headers: Buffer[]) { - const name = 'content-length'; - for (let i = 0; i < headers.length; i += 2) { - const k = headers[i]; - if (k.length === name.length && k.toString().toLowerCase() === name) { - const v = Number(headers[i + 1]); - if (!Number.isNaN(Number(v))) { - return v; - } - return undefined; - } - } - return undefined; -} +import { HeadersMessage, ListenerRecord, RequestMessage } from './internal-types'; +import { UndiciInstrumentationConfig, UndiciRequest } from './types'; + // A combination of https://github.com/elastic/apm-agent-nodejs and // https://github.com/gadget-inc/opentelemetry-instrumentations/blob/main/packages/opentelemetry-instrumentation-undici/src/index.ts @@ -58,9 +42,7 @@ export class UndiciInstrumentation extends InstrumentationBase { // unsubscribing. private _channelSubs!: Array; - private _spanFromReq = new WeakMap(); - - private _requestHook: UndiciInstrumentationConfig['onRequest']; + private _spanFromReq = new WeakMap(); constructor(config?: UndiciInstrumentationConfig) { super('@opentelemetry/instrumentation-undici', VERSION, config); @@ -83,29 +65,44 @@ export class UndiciInstrumentation extends InstrumentationBase { } override disable(): void { + if (!this._config.enabled) { + return; + } + this._channelSubs.forEach(sub => sub.channel.unsubscribe(sub.onMessage)); this._channelSubs.length = 0; + this._config.enabled = false; } override enable(): void { if (this._config.enabled) { return; } + this._config.enabled = true; + // This method is called by the `InstrumentationAbstract` constructor before // ours is called. So we need to ensure the property is initalized this._channelSubs = this._channelSubs || []; - this.subscribeToChannel('undici:request:create', this.onRequest.bind(this)); - this.subscribeToChannel('undici:request:headers',this.onHeaders.bind(this)); + this.subscribeToChannel('undici:request:create', this.onRequestCreated.bind(this)); + this.subscribeToChannel('undici:client:sendHeaders',this.onRequestHeaders.bind(this)); + this.subscribeToChannel('undici:request:headers',this.onResponseHeaders.bind(this)); this.subscribeToChannel('undici:request:trailers', this.onDone.bind(this)); this.subscribeToChannel('undici:request:error', this.onError.bind(this)); } override setConfig(config?: UndiciInstrumentationConfig): void { super.setConfig(config); - if (typeof config?.onRequest === 'function') { - this._requestHook = config.onRequest; + + if (config?.enabled) { + this.enable(); + } else { + this.disable(); } } + + private _getConfig(): UndiciInstrumentationConfig { + return this._config as UndiciInstrumentationConfig; + } private subscribeToChannel( diagnosticChannel: string, @@ -120,27 +117,79 @@ export class UndiciInstrumentation extends InstrumentationBase { }); } - private onRequest({ request }: any): void { - // We do not handle instrumenting HTTP CONNECT. See limitation notes above. - if (request.method === 'CONNECT') { + // This is the 1st message we receive for each request (fired after request creation). Here we will + // create the span and populate some atttributes, then link the span to the request for further + // span processing + private onRequestCreated({ request }: RequestMessage): void { + console.log('onRequestCreated') + // Ignore if: + // - instrumentation is disabled + // - ignored by config + // - method is 'CONNECT' (TODO: check for limitations) + const config = this._getConfig(); + const shouldIgnoreReq = safeExecuteInTheMiddle( + () => !config.enabled || request.method === 'CONNECT' || config.ignoreRequestHook?.(request), + (e) => e && this._diag.error('caught ignoreRequestHook error: ', e), + true, + ); + + if (shouldIgnoreReq) { return; } + const requestUrl = new URL(request.origin); + const spanAttributes = { + [SemanticAttributes.HTTP_URL]: request.origin, + [SemanticAttributes.HTTP_METHOD]: request.method, + [SemanticAttributes.HTTP_TARGET]: request.path || '/', + [SemanticAttributes.NET_PEER_NAME]: requestUrl.hostname, + }; + + const rawHeaders = request.headers.split('\r\n'); + const reqHeaders = new Map(rawHeaders.map(h => { + const sepIndex = h.indexOf(':'); + const name = h.substring(0, sepIndex).toLowerCase(); + const val = h.substring(sepIndex + 1).trim(); + return [name, val]; + })); + + let hostAttribute = reqHeaders.get('host'); + + if (!hostAttribute) { + const protocolPorts: Record = { https: '443', http: '80' }; + const defaultPort = protocolPorts[requestUrl.protocol] || ''; + const port = requestUrl.port || defaultPort; + + hostAttribute = requestUrl.hostname; + if (port) { + hostAttribute += `:${port}`; + } + } + spanAttributes[SemanticAttributes.HTTP_HOST] = hostAttribute; + + const userAgent = reqHeaders.get('user-agent'); + if (userAgent) { + spanAttributes[SemanticAttributes.HTTP_USER_AGENT] = userAgent; + } + const span = this.tracer.startSpan(`HTTP ${request.method}`, { kind: SpanKind.CLIENT, - attributes: { - [SemanticAttributes.HTTP_URL]: String(request.origin), - [SemanticAttributes.HTTP_METHOD]: request.method, - [SemanticAttributes.HTTP_TARGET]: request.path, - }, + attributes: spanAttributes, }); + + // TODO: add headers based on config + + // Context propagation const requestContext = trace.setSpan(context.active(), span); const addedHeaders: Record = {}; propagation.inject(requestContext, addedHeaders); - if (this._requestHook) { - this._requestHook({ request, span, additionalHeaders: addedHeaders }); - } + // Execute the request hook if defined + safeExecuteInTheMiddle( + () => this._getConfig().requestHook?.(span, request), + (e) => e && this._diag.error('caught requestHook error: ', e), + true, + ); request.headers += Object.entries(addedHeaders) .map(([k, v]) => `${k}: ${v}\r\n`) @@ -148,31 +197,65 @@ export class UndiciInstrumentation extends InstrumentationBase { this._spanFromReq.set(request, span); } - private onHeaders({ request, response }: any): void { + // This is the 2nd message we recevie for each request. It is fired when connection with + // the remote is stablished and abut to send the first byte. Here do have info about the + // remote addres an port sowe can poupulate some `net.*` attributes into the span + private onRequestHeaders({ request, socket }: any): void { + console.log('onRequestHeaders') + const span = this._spanFromReq.get(request as UndiciRequest); + + if (span) { + const { remoteAddress, remotePort } = socket; + + span.setAttributes({ + [SemanticAttributes.NET_PEER_IP]: remoteAddress, + [SemanticAttributes.NET_PEER_PORT]: remotePort, + }); + } + } + + // This is the 3rd message we get for each request and it's fired when the server + // headers are received, body may not be accessible yet (TODO: check this). + // From the response headers we can set the status and content length + private onResponseHeaders({ request, response }: HeadersMessage): void { + console.log('onResponseHeaders') const span = this._spanFromReq.get(request); if (span !== undefined) { // We are currently *not* capturing response headers, even though the // intake API does allow it, because none of the other `setHttpContext` - // uses currently do. - - const cLen = contentLengthFromResponseHeaders(response.headers); + // uses currently do const attrs: Attributes = { [SemanticAttributes.HTTP_STATUS_CODE]: response.statusCode, }; - if (cLen) { - attrs[SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH] = cLen; + + // Get headers with names lowercased but values intact + const resHeaders = response.headers.map((h, idx) => { + const isName = idx % 2 === 0; + const result = h.toString(); + + return isName ? result.toLowerCase() : result; + }); + + // TODO: capture headers based on config + + const contentLengthIndex = resHeaders.findIndex(h => h === 'content-length'); + const contentLength = Number(contentLengthIndex === -1 ? undefined : resHeaders[contentLengthIndex + 1]); + if (!isNaN(contentLength)) { + attrs[SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH] = contentLength; } + span.setAttributes(attrs); span.setStatus({ - code: - response.statusCode >= 400 ? SpanStatusCode.ERROR : SpanStatusCode.OK, - message: String(response.statusCode), + code: response.statusCode >= 400 ? SpanStatusCode.ERROR : SpanStatusCode.UNSET, }); } } + + // This is the last event we receive if the request went without any errors (TODO: check this) private onDone({ request }: any): void { + console.log('onDone') const span = this._spanFromReq.get(request); if (span !== undefined) { span.end(); @@ -180,6 +263,13 @@ export class UndiciInstrumentation extends InstrumentationBase { } } + // TODO: check this + // This messge si triggered if there is any error in the request + // TODO: in `undici@6.3.0` when request aborted the error type changes from + // a custom error (`RequestAbortedError`) to a built-in `DOMException` so + // - `code` is from DOMEXception (ABORT_ERR: 20) + // - `message` changes + // - stacktrace is smaller and contains node internal frames private onError({ request, error }: any): void { const span = this._spanFromReq.get(request); if (span !== undefined) { diff --git a/experimental/packages/opentelemetry-instrumentation-undici/test/fetch.test.ts b/experimental/packages/opentelemetry-instrumentation-undici/test/fetch.test.ts index 398dc74cfa..35524c794d 100644 --- a/experimental/packages/opentelemetry-instrumentation-undici/test/fetch.test.ts +++ b/experimental/packages/opentelemetry-instrumentation-undici/test/fetch.test.ts @@ -15,12 +15,10 @@ */ import * as assert from 'assert'; -import { SpanKind, context, propagation } from '@opentelemetry/api'; +import { context, propagation } from '@opentelemetry/api'; import { AsyncHooksContextManager } from '@opentelemetry/context-async-hooks'; -import { SemanticAttributes } from '@opentelemetry/semantic-conventions'; import { InMemorySpanExporter, - ReadableSpan, SimpleSpanProcessor, } from '@opentelemetry/sdk-trace-base'; import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node'; @@ -61,6 +59,7 @@ describe('UndiciInstrumentation `fetch` tests', function () { after(function(done) { context.disable(); propagation.disable(); + mockServer.mockListener(undefined); mockServer.stop(done); }); @@ -69,34 +68,69 @@ describe('UndiciInstrumentation `fetch` tests', function () { }); describe('enable()', function () { - before(function () { + beforeEach(function () { instrumentation.enable(); }); - after(function () { + afterEach(function () { instrumentation.disable(); }); - it('should create a rootSpan for GET requests and add propagation headers', async function () { + it.skip('should create valid spans even if the configuration hooks fail', async function () { let spans = memoryExporter.getFinishedSpans(); assert.strictEqual(spans.length, 0); + // Set the bad configuration + instrumentation.setConfig({ + enabled: true, + ignoreRequestHook: () => { + throw new Error('ignoreRequestHook error'); + }, + applyCustomAttributesOnSpan: () => { + throw new Error('ignoreRequestHook error'); + }, + requestHook: () => { + throw new Error('requestHook error'); + }, + startSpanHook: () => { + throw new Error('startSpanHook error'); + }, + }) + const fetchUrl = `${protocol}://${hostname}:${mockServer.port}/?query=test`; const response = await fetch(fetchUrl); spans = memoryExporter.getFinishedSpans(); const span = spans[0]; - assert.ok(span); + assert.ok(span, 'a span is present'); assert.strictEqual(spans.length, 1); - assertSpanAttribs(span, 'HTTP GET', { - // TODO: I guess we want to have parity with HTTP insturmentation - // - there are missing attributes - // - also check if these current values make sense - [SemanticAttributes.HTTP_URL]: `${protocol}://${hostname}:${mockServer.port}`, - [SemanticAttributes.HTTP_METHOD]: 'GET', - [SemanticAttributes.HTTP_STATUS_CODE]: response.status, - [SemanticAttributes.HTTP_TARGET]: '/?query=test', + // console.dir(span, { depth: 9 }); + assertSpan(span, { + hostname: 'localhost', + httpStatusCode: response.status, + httpMethod: 'GET', + pathname: '/', + path: '/?query=test', + resHeaders: response.headers, }); + }); + + it.skip('should create valid spans with empty configuration', async function () { + let spans = memoryExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 0); + + // Empty configuration + instrumentation.setConfig({ enabled: true }); + + const fetchUrl = `${protocol}://${hostname}:${mockServer.port}/?query=test`; + const response = await fetch(fetchUrl); + + spans = memoryExporter.getFinishedSpans(); + const span = spans[0]; + + assert.ok(span, 'a span is present'); + assert.strictEqual(spans.length, 1); + // console.dir(span, { depth: 9 }); assertSpan(span, { hostname: 'localhost', httpStatusCode: response.status, @@ -106,24 +140,76 @@ describe('UndiciInstrumentation `fetch` tests', function () { resHeaders: response.headers, }); }); + + it('should create valid spans with the given configuration configuration', async function () { + let spans = memoryExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 0); + + // Empty configuration + instrumentation.setConfig({ + enabled: true, + ignoreRequestHook: (req) => { + return req.path.indexOf('/ignore/path') !== -1; + }, + requestHook: (span, req) => { + // TODO: maybe an intermediate request with better API + req.headers += `x-requested-with: undici instrumentation\r\n`; + }, + startSpanHook: (request) => { + return { + 'test.request.origin': request.origin, + 'test.request.headers.lengh': request.headers.split('\r\n').length, + }; + }, + headersToSpanAttributes: { + requestHeaders: ['foo-client', 'test.foo.client'], + responseHeaders: ['foo-server', 'test.foo.server'] + } + }); + + // Add some extra headers in the response + mockServer.mockListener((req, res) => { + res.statusCode = 200; + res.setHeader('content-type', 'application/json'); + res.setHeader('foo-server', 'bar'); + res.write(JSON.stringify({ success: true })); + res.end(); + }); + + // Do some requests + await fetch(`${protocol}://${hostname}:${mockServer.port}/ignore/path`); + const reqInit = { + headers: new Headers({ + 'user-agent': 'custom', + 'foo-client': 'bar' + }), + }; + const response = await fetch(`${protocol}://${hostname}:${mockServer.port}/?query=test`, reqInit); + + spans = memoryExporter.getFinishedSpans(); + const span = spans[0]; + console.dir(span, { depth: 9 }); + assert.ok(span, 'a span is present'); + assert.strictEqual(spans.length, 1); + assertSpan(span, { + hostname: 'localhost', + httpStatusCode: response.status, + httpMethod: 'GET', + pathname: '/', + path: '/?query=test', + reqHeaders: reqInit.headers, + resHeaders: response.headers, + }); + assert.strictEqual( + span.attributes['test.foo.client'], + 'bar', + `request headers are captured`, + ); + assert.strictEqual( + span.attributes['test.foo.server'], + 'bar', + `response headers are captured`, + ); + }); }); }); - -function assertSpanAttribs( - span: ReadableSpan, - name: string, - attribs: Record -) { - assert.strictEqual(span.spanContext().traceId.length, 32); - assert.strictEqual(span.spanContext().spanId.length, 16); - assert.strictEqual(span.kind, SpanKind.CLIENT); - assert.strictEqual(span.name, name); - - for (const [key, value] of Object.entries(attribs)) { - assert.strictEqual( - span.attributes[key], - value, - `expected value "${value}" but got "${span.attributes[key]}" for attribute "${key}" ` - ); - } -} diff --git a/experimental/packages/opentelemetry-instrumentation-undici/test/utils/assertSpan.ts b/experimental/packages/opentelemetry-instrumentation-undici/test/utils/assertSpan.ts index b139ad4a7e..7dc5eac866 100644 --- a/experimental/packages/opentelemetry-instrumentation-undici/test/utils/assertSpan.ts +++ b/experimental/packages/opentelemetry-instrumentation-undici/test/utils/assertSpan.ts @@ -37,7 +37,6 @@ export const assertSpan = ( reqHeaders?: Headers; path?: string | null; forceStatus?: SpanStatus; - serverName?: string; noNetPeer?: boolean; // we don't expect net peer info when request throw before being sent error?: Exception; } @@ -63,24 +62,25 @@ export const assertSpan = ( ); assert.strictEqual( span.attributes[SemanticAttributes.HTTP_STATUS_CODE], - validations.httpStatusCode + validations.httpStatusCode, + `attributes['${SemanticAttributes.HTTP_STATUS_CODE}'] is correct`, ); - assert.strictEqual(span.links.length, 0); + assert.strictEqual(span.links.length, 0, 'there are no links'); if (validations.error) { - assert.strictEqual(span.events.length, 1); - assert.strictEqual(span.events[0].name, 'exception'); + assert.strictEqual(span.events.length, 1, 'span contains one error event'); + assert.strictEqual(span.events[0].name, 'exception', 'error event name is correct'); const eventAttributes = span.events[0].attributes; - assert.ok(eventAttributes != null); + assert.ok(eventAttributes != null, 'event has attributes'); assert.deepStrictEqual(Object.keys(eventAttributes), [ 'exception.type', 'exception.message', 'exception.stacktrace', - ]); + ], 'the event attribute names are correct'); } else { - assert.strictEqual(span.events.length, 0); + assert.strictEqual(span.events.length, 0, 'span contains no events'); } const { httpStatusCode } = validations; @@ -91,6 +91,7 @@ export const assertSpan = ( validations.forceStatus || { code: isStatusUnset ? SpanStatusCode.UNSET : SpanStatusCode.ERROR }, + 'span status is correct' ); assert.ok(span.endTime, 'must be finished'); diff --git a/experimental/packages/opentelemetry-instrumentation-undici/test/utils/mock-server.ts b/experimental/packages/opentelemetry-instrumentation-undici/test/utils/mock-server.ts index 52a645ed85..db47cea290 100644 --- a/experimental/packages/opentelemetry-instrumentation-undici/test/utils/mock-server.ts +++ b/experimental/packages/opentelemetry-instrumentation-undici/test/utils/mock-server.ts @@ -15,28 +15,31 @@ */ import * as http from 'http'; + export class MockServer { private _port: number | undefined; private _httpServer: http.Server | undefined; + private _reqListener: http.RequestListener | undefined; get port(): number { return this._port || 0; } + mockListener(handler: http.RequestListener | undefined): void { + this._reqListener = handler; + } + start(cb: (err?: Error) => void) { this._httpServer = http.createServer((req, res) => { - if (req.url === '/timeout') { - setTimeout(() => { - res.end(); - }, 1000); + // Use the mock listener if defined + if (typeof this._reqListener === 'function') { + return this._reqListener(req, res); } + + // If no mock function is provided fallback to a basic response res.statusCode = 200; res.setHeader('content-type', 'application/json'); - res.write( - JSON.stringify({ - success: true, - }) - ); + res.write(JSON.stringify({ success: true })); res.end(); }); @@ -63,6 +66,7 @@ export class MockServer { stop(cb: (err?: Error) => void) { if (this._httpServer) { + this._reqListener = undefined; this._httpServer.close(); cb(); }