Skip to content

Commit

Permalink
Add driveFetch instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
ggodlewski committed Nov 23, 2023
1 parent e2c8731 commit 4be5671
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 5 deletions.
2 changes: 1 addition & 1 deletion apps/ui/src/components/ZipkinViewer.vue
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ export default {
data() {
return {
traces: [],
ZIPKIN_URL
ZIPKIN_URL: ZIPKIN_URL.startsWith('http://localhost:9411') ? ZIPKIN_URL + '/zipkin' : ZIPKIN_URL
};
},
created() {
Expand Down
2 changes: 0 additions & 2 deletions src/containers/job/JobManagerContainer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import {GoogleFolderContainer} from '../google_folder/GoogleFolderContainer';
import {TransformContainer} from '../transform/TransformContainer';

import {fileURLToPath} from 'url';
import {WatchChangesContainer} from '../changes/WatchChangesContainer';
import {GoogleFile} from '../../model/GoogleFile';
import {UserConfigService} from '../google_folder/UserConfigService';
import {MarkdownTreeProcessor} from '../transform/MarkdownTreeProcessor';
import {WorkerPool} from './WorkerPool';
Expand Down
7 changes: 5 additions & 2 deletions src/google/driveFetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {Readable} from 'stream';
import {SimpleFile} from '../model/GoogleFile';
import opentelemetry from '@opentelemetry/api';
import {QuotaLimiter} from './QuotaLimiter';
import {instrumentFunction} from '../telemetry';

async function handleReadable(obj): Promise<string> {
if (obj instanceof Readable) {
Expand Down Expand Up @@ -141,7 +142,8 @@ async function driveRequest(quotaLimiter: QuotaLimiter, accessToken: string, met
}

if (!quotaLimiter) {
const response = await fetch(url, {
const fetchInstrumented = instrumentFunction(fetch, 1);
const response = await fetchInstrumented(url, {
method,
headers: {
Authorization: 'Bearer ' + accessToken,
Expand All @@ -161,7 +163,8 @@ async function driveRequest(quotaLimiter: QuotaLimiter, accessToken: string, met
return await new Promise<Response>(async (resolve, reject) => { /* eslint-disable-line no-async-promise-executor */
const job = async () => {
try {
const response = await fetch(url, {
const fetchInstrumented = instrumentFunction(fetch, 1);
const response = await fetchInstrumented(url, {
method,
headers: {
Authorization: 'Bearer ' + accessToken,
Expand Down
41 changes: 41 additions & 0 deletions src/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,47 @@ export class ClassInstrumentation extends InstrumentationBase {
}
}

export function instrumentFunction<K>(func, telemetryParamCount = 0, telemetryParamOffset = 0) {
return async function (...args) {
if (!process.env.ZIPKIN_URL) {
return await func(...args);
}

const stackTrace = new Error().stack.split('\n').splice(2);
const tracer = opentelemetry.trace.getTracer(
provider.resource.attributes[SemanticResourceAttributes.SERVICE_NAME].toString(),
'1.0'
);

let spanName = func.name;
if (telemetryParamCount) {
spanName += '(';
for (let i = telemetryParamOffset; i < telemetryParamCount + telemetryParamOffset; i++) {
if (i > 0) {
spanName += ', ';
}
spanName += args[0];
}
spanName += ')';
}

return tracer.startActiveSpan(
spanName,
{ kind: SpanKind.INTERNAL },
async (span) => {
try {
return await func(...args);
} catch (err) {
err.stack = [err.message].concat(stackTrace).join('\n');
span.recordException(err);
throw err;
} finally {
span.end();
}
});
};
}

export async function addTelemetry(serviceName: string, mainDir: string) {
if (!process.env.ZIPKIN_URL) {
return;
Expand Down

0 comments on commit 4be5671

Please sign in to comment.