-
Notifications
You must be signed in to change notification settings - Fork 825
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
chore(instrumentation-undici): add instrumentation config
- Loading branch information
1 parent
092b9c1
commit af72d4d
Showing
6 changed files
with
335 additions
and
120 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,9 +14,10 @@ | |
* limitations under the License. | ||
*/ | ||
import * as diagch from 'diagnostics_channel'; | ||
import { URL } from 'url'; | ||
|
||
import { SemanticAttributes } from '@opentelemetry/semantic-conventions'; | ||
import { InstrumentationBase } from '@opentelemetry/instrumentation'; | ||
import { InstrumentationBase, safeExecuteInTheMiddle } from '@opentelemetry/instrumentation'; | ||
import { | ||
Attributes, | ||
context, | ||
|
@@ -30,26 +31,9 @@ import { | |
|
||
import { VERSION } from './version'; | ||
|
||
import { ListenerRecord } from './internal-types'; | ||
import { UndiciInstrumentationConfig } from './types'; | ||
|
||
// Get the content-length from undici response headers. | ||
// `headers` is an Array of buffers: [k, v, k, v, ...]. | ||
// If the header is not present, or has an invalid value, this returns null. | ||
function contentLengthFromResponseHeaders(headers: Buffer[]) { | ||
const name = 'content-length'; | ||
for (let i = 0; i < headers.length; i += 2) { | ||
const k = headers[i]; | ||
if (k.length === name.length && k.toString().toLowerCase() === name) { | ||
const v = Number(headers[i + 1]); | ||
if (!Number.isNaN(Number(v))) { | ||
return v; | ||
} | ||
return undefined; | ||
} | ||
} | ||
return undefined; | ||
} | ||
import { HeadersMessage, ListenerRecord, RequestMessage } from './internal-types'; | ||
import { UndiciInstrumentationConfig, UndiciRequest } from './types'; | ||
|
||
|
||
// A combination of https://github.com/elastic/apm-agent-nodejs and | ||
// https://github.com/gadget-inc/opentelemetry-instrumentations/blob/main/packages/opentelemetry-instrumentation-undici/src/index.ts | ||
|
@@ -58,9 +42,7 @@ export class UndiciInstrumentation extends InstrumentationBase { | |
// unsubscribing. | ||
private _channelSubs!: Array<ListenerRecord>; | ||
|
||
private _spanFromReq = new WeakMap<any, Span>(); | ||
|
||
private _requestHook: UndiciInstrumentationConfig['onRequest']; | ||
private _spanFromReq = new WeakMap<UndiciRequest, Span>(); | ||
|
||
constructor(config?: UndiciInstrumentationConfig) { | ||
super('@opentelemetry/instrumentation-undici', VERSION, config); | ||
|
@@ -83,29 +65,44 @@ export class UndiciInstrumentation extends InstrumentationBase { | |
} | ||
|
||
override disable(): void { | ||
if (!this._config.enabled) { | ||
return; | ||
} | ||
|
||
this._channelSubs.forEach(sub => sub.channel.unsubscribe(sub.onMessage)); | ||
this._channelSubs.length = 0; | ||
this._config.enabled = false; | ||
} | ||
|
||
override enable(): void { | ||
if (this._config.enabled) { | ||
return; | ||
} | ||
this._config.enabled = true; | ||
|
||
// This method is called by the `InstrumentationAbstract` constructor before | ||
// ours is called. So we need to ensure the property is initalized | ||
this._channelSubs = this._channelSubs || []; | ||
this.subscribeToChannel('undici:request:create', this.onRequest.bind(this)); | ||
this.subscribeToChannel('undici:request:headers',this.onHeaders.bind(this)); | ||
this.subscribeToChannel('undici:request:create', this.onRequestCreated.bind(this)); | ||
this.subscribeToChannel('undici:client:sendHeaders',this.onRequestHeaders.bind(this)); | ||
this.subscribeToChannel('undici:request:headers',this.onResponseHeaders.bind(this)); | ||
this.subscribeToChannel('undici:request:trailers', this.onDone.bind(this)); | ||
this.subscribeToChannel('undici:request:error', this.onError.bind(this)); | ||
} | ||
|
||
override setConfig(config?: UndiciInstrumentationConfig): void { | ||
super.setConfig(config); | ||
if (typeof config?.onRequest === 'function') { | ||
this._requestHook = config.onRequest; | ||
|
||
if (config?.enabled) { | ||
this.enable(); | ||
} else { | ||
this.disable(); | ||
} | ||
} | ||
|
||
private _getConfig(): UndiciInstrumentationConfig { | ||
return this._config as UndiciInstrumentationConfig; | ||
} | ||
|
||
private subscribeToChannel( | ||
diagnosticChannel: string, | ||
|
@@ -120,66 +117,159 @@ export class UndiciInstrumentation extends InstrumentationBase { | |
}); | ||
} | ||
|
||
private onRequest({ request }: any): void { | ||
// We do not handle instrumenting HTTP CONNECT. See limitation notes above. | ||
if (request.method === 'CONNECT') { | ||
// This is the 1st message we receive for each request (fired after request creation). Here we will | ||
// create the span and populate some atttributes, then link the span to the request for further | ||
// span processing | ||
private onRequestCreated({ request }: RequestMessage): void { | ||
console.log('onRequestCreated') | ||
// Ignore if: | ||
// - instrumentation is disabled | ||
// - ignored by config | ||
// - method is 'CONNECT' (TODO: check for limitations) | ||
const config = this._getConfig(); | ||
const shouldIgnoreReq = safeExecuteInTheMiddle( | ||
() => !config.enabled || request.method === 'CONNECT' || config.ignoreRequestHook?.(request), | ||
(e) => e && this._diag.error('caught ignoreRequestHook error: ', e), | ||
true, | ||
); | ||
|
||
if (shouldIgnoreReq) { | ||
return; | ||
} | ||
|
||
const requestUrl = new URL(request.origin); | ||
const spanAttributes = { | ||
[SemanticAttributes.HTTP_URL]: request.origin, | ||
[SemanticAttributes.HTTP_METHOD]: request.method, | ||
[SemanticAttributes.HTTP_TARGET]: request.path || '/', | ||
[SemanticAttributes.NET_PEER_NAME]: requestUrl.hostname, | ||
}; | ||
|
||
const rawHeaders = request.headers.split('\r\n'); | ||
const reqHeaders = new Map(rawHeaders.map(h => { | ||
const sepIndex = h.indexOf(':'); | ||
const name = h.substring(0, sepIndex).toLowerCase(); | ||
const val = h.substring(sepIndex + 1).trim(); | ||
return [name, val]; | ||
})); | ||
|
||
let hostAttribute = reqHeaders.get('host'); | ||
|
||
if (!hostAttribute) { | ||
const protocolPorts: Record<string, string> = { https: '443', http: '80' }; | ||
const defaultPort = protocolPorts[requestUrl.protocol] || ''; | ||
const port = requestUrl.port || defaultPort; | ||
|
||
hostAttribute = requestUrl.hostname; | ||
if (port) { | ||
hostAttribute += `:${port}`; | ||
} | ||
} | ||
spanAttributes[SemanticAttributes.HTTP_HOST] = hostAttribute; | ||
|
||
const userAgent = reqHeaders.get('user-agent'); | ||
if (userAgent) { | ||
spanAttributes[SemanticAttributes.HTTP_USER_AGENT] = userAgent; | ||
} | ||
|
||
const span = this.tracer.startSpan(`HTTP ${request.method}`, { | ||
kind: SpanKind.CLIENT, | ||
attributes: { | ||
[SemanticAttributes.HTTP_URL]: String(request.origin), | ||
[SemanticAttributes.HTTP_METHOD]: request.method, | ||
[SemanticAttributes.HTTP_TARGET]: request.path, | ||
}, | ||
attributes: spanAttributes, | ||
}); | ||
|
||
// TODO: add headers based on config | ||
|
||
// Context propagation | ||
const requestContext = trace.setSpan(context.active(), span); | ||
const addedHeaders: Record<string, string> = {}; | ||
propagation.inject(requestContext, addedHeaders); | ||
|
||
if (this._requestHook) { | ||
this._requestHook({ request, span, additionalHeaders: addedHeaders }); | ||
} | ||
// Execute the request hook if defined | ||
safeExecuteInTheMiddle( | ||
() => this._getConfig().requestHook?.(span, request), | ||
(e) => e && this._diag.error('caught requestHook error: ', e), | ||
true, | ||
); | ||
|
||
request.headers += Object.entries(addedHeaders) | ||
.map(([k, v]) => `${k}: ${v}\r\n`) | ||
.join(''); | ||
this._spanFromReq.set(request, span); | ||
} | ||
|
||
private onHeaders({ request, response }: any): void { | ||
// This is the 2nd message we recevie for each request. It is fired when connection with | ||
// the remote is stablished and abut to send the first byte. Here do have info about the | ||
// remote addres an port sowe can poupulate some `net.*` attributes into the span | ||
private onRequestHeaders({ request, socket }: any): void { | ||
console.log('onRequestHeaders') | ||
const span = this._spanFromReq.get(request as UndiciRequest); | ||
|
||
if (span) { | ||
const { remoteAddress, remotePort } = socket; | ||
|
||
span.setAttributes({ | ||
[SemanticAttributes.NET_PEER_IP]: remoteAddress, | ||
[SemanticAttributes.NET_PEER_PORT]: remotePort, | ||
}); | ||
} | ||
} | ||
|
||
// This is the 3rd message we get for each request and it's fired when the server | ||
// headers are received, body may not be accessible yet (TODO: check this). | ||
// From the response headers we can set the status and content length | ||
private onResponseHeaders({ request, response }: HeadersMessage): void { | ||
console.log('onResponseHeaders') | ||
const span = this._spanFromReq.get(request); | ||
|
||
if (span !== undefined) { | ||
// We are currently *not* capturing response headers, even though the | ||
// intake API does allow it, because none of the other `setHttpContext` | ||
// uses currently do. | ||
|
||
const cLen = contentLengthFromResponseHeaders(response.headers); | ||
// uses currently do | ||
const attrs: Attributes = { | ||
[SemanticAttributes.HTTP_STATUS_CODE]: response.statusCode, | ||
}; | ||
if (cLen) { | ||
attrs[SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH] = cLen; | ||
|
||
// Get headers with names lowercased but values intact | ||
const resHeaders = response.headers.map((h, idx) => { | ||
const isName = idx % 2 === 0; | ||
const result = h.toString(); | ||
|
||
return isName ? result.toLowerCase() : result; | ||
}); | ||
|
||
// TODO: capture headers based on config | ||
|
||
const contentLengthIndex = resHeaders.findIndex(h => h === 'content-length'); | ||
const contentLength = Number(contentLengthIndex === -1 ? undefined : resHeaders[contentLengthIndex + 1]); | ||
if (!isNaN(contentLength)) { | ||
attrs[SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH] = contentLength; | ||
} | ||
|
||
span.setAttributes(attrs); | ||
span.setStatus({ | ||
code: | ||
response.statusCode >= 400 ? SpanStatusCode.ERROR : SpanStatusCode.OK, | ||
message: String(response.statusCode), | ||
code: response.statusCode >= 400 ? SpanStatusCode.ERROR : SpanStatusCode.UNSET, | ||
}); | ||
} | ||
} | ||
|
||
|
||
// This is the last event we receive if the request went without any errors (TODO: check this) | ||
private onDone({ request }: any): void { | ||
console.log('onDone') | ||
const span = this._spanFromReq.get(request); | ||
if (span !== undefined) { | ||
span.end(); | ||
this._spanFromReq.delete(request); | ||
} | ||
} | ||
|
||
// TODO: check this | ||
// This messge si triggered if there is any error in the request | ||
// TODO: in `[email protected]` when request aborted the error type changes from | ||
// a custom error (`RequestAbortedError`) to a built-in `DOMException` so | ||
// - `code` is from DOMEXception (ABORT_ERR: 20) | ||
// - `message` changes | ||
// - stacktrace is smaller and contains node internal frames | ||
private onError({ request, error }: any): void { | ||
const span = this._spanFromReq.get(request); | ||
if (span !== undefined) { | ||
|
Oops, something went wrong.