Skip to content

Commit

Permalink
initial
Browse files Browse the repository at this point in the history
  • Loading branch information
usmanmani1122 committed Dec 17, 2024
1 parent 9e124c3 commit 0192ce0
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 54 deletions.
34 changes: 19 additions & 15 deletions packages/internal/src/node/fs-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ const noPath = /** @type {import('fs').PathLike} */ (
/** @typedef {NonNullable<Awaited<ReturnType<typeof makeFsStreamWriter>>>} FsStreamWriter */
/** @param {string | undefined | null} filePath */
export const makeFsStreamWriter = async filePath => {
if (!filePath) {
return undefined;
}
if (!filePath) return undefined;

const handle = await (filePath !== '-' ? open(filePath, 'a') : undefined);
const isStdOutStream = filePath === '-';

const stream = handle
? createWriteStream(noPath, { fd: handle.fd })
: process.stdout;
const handle = await (isStdOutStream ? undefined : open(filePath, 'a'));

const stream = isStdOutStream
? process.stdout
: createWriteStream(noPath, { autoClose: false, fd: handle.fd });
await fsStreamReady(stream);

let flushed = Promise.resolve();
Expand Down Expand Up @@ -95,20 +95,24 @@ export const makeFsStreamWriter = async filePath => {

const flush = async () => {
await flushed;
await handle?.sync().catch(err => {
if (err.code === 'EINVAL') {
return;
}
throw err;
});
if (!isStdOutStream)
await handle.sync().catch(err => {
if (err.code === 'EINVAL') {
return;
}
throw err;
});
};

const close = async () => {
// TODO: Consider creating a single Error here to use a write rejection
closed = true;
await flush();
// @ts-expect-error calling a possibly missing method
stream.close?.();

if (!isStdOutStream) {
await new Promise(resolve => stream.end(resolve));
await handle?.close();
}
};

stream.on('error', err => updateFlushed(Promise.reject(err)));
Expand Down
148 changes: 148 additions & 0 deletions packages/telemetry/src/block-slog.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/* eslint-env node */

import { makeFsStreamWriter } from '@agoric/internal/src/node/fs-stream.js';
import { makeContextualSlogProcessor } from './context-aware-slog.js';
import getContextFilePersistenceUtils, {
DEFAULT_CONTEXT_FILE,
} from './context-aware-slog-persistent-util.js';
import { serializeSlogObj } from './serialize-slog-obj.js';

/**
* @typedef {import('./context-aware-slog.js').Slog} Slog
* @typedef {ReturnType<ReturnType<typeof makeContextualSlogProcessor>>} ContextSlog
*/

const BLOCK_STREAM_HANDLERS_WINDOW = 5;
const CLEANUP_INTERVAL = 20000;
const DEFAULT_BLOCK_HEIGHT = -1;

/**
* @param {import('./index.js').MakeSlogSenderOptions} options
*/
export const makeSlogSender = async options => {
const { CHAIN_ID } = options.env || {};
if (!options.stateDir)
return console.error(
'Ignoring invocation of slogger "block-slog" without the presence of "stateDir"',
);

let currentBlock = DEFAULT_BLOCK_HEIGHT;
/**
* @type {NodeJS.Timeout}
*/
let cleanupRef;
/**
* @type {{[key: string]: Awaited<ReturnType<typeof makeFsStreamWriter>>}}
*/
const streamHandlers = {};

/**
* @type {{[key: string]: ReturnType<typeof createBlockStream>}}
*/
const streamCreationPromises = {};

const persistenceUtils = getContextFilePersistenceUtils(
process.env.SLOG_CONTEXT_FILE_PATH ||
`${options.stateDir}/${DEFAULT_CONTEXT_FILE}`,
);

/**
* @param {Awaited<ReturnType<typeof makeFsStreamWriter>>} stream
*/
const closeFileStream = async stream => {
if (!stream) return console.error('Trying to close a null stream');

await stream.close();
};

const contextualSlogProcessor = makeContextualSlogProcessor(
{ 'chain-id': CHAIN_ID },
persistenceUtils,
);

/**
* @param {ContextSlog['attributes']['block.height']} blockHeight
* @param {import('./index.js').MakeSlogSenderOptions['stateDir']} directory
* @param {ContextSlog['time']} time
*/
const createBlockStream = async (blockHeight, directory, time) => {
if (blockHeight === undefined)
throw Error('Block Height required for creating the write stream');

const fileName = `${directory}/${blockHeight}-${time}.json`;
const stream = await makeFsStreamWriter(
`${directory}/${blockHeight}-${time}.json`,
);

if (!stream)
throw Error(`Couldn't create a write stream on file "${fileName}"`);

streamHandlers[String(blockHeight)] = stream;
};

const regularCleanup = async () => {
if (currentBlock !== DEFAULT_BLOCK_HEIGHT)
await Promise.all(

Check failure on line 85 in packages/telemetry/src/block-slog.js

View workflow job for this annotation

GitHub Actions / lint-primary

The first `await` appearing in an async function must not be nested
Object.keys(streamHandlers).map(async streamIdentifier => {
if (
Number(streamIdentifier) <
currentBlock - BLOCK_STREAM_HANDLERS_WINDOW ||
Number(streamIdentifier) >
currentBlock + BLOCK_STREAM_HANDLERS_WINDOW
) {
await closeFileStream(streamHandlers[streamIdentifier]);

Check failure on line 93 in packages/telemetry/src/block-slog.js

View workflow job for this annotation

GitHub Actions / lint-primary

The first `await` appearing in an async function must not be nested
delete streamHandlers[streamIdentifier];
delete streamCreationPromises[streamIdentifier];
}
}),
);

cleanupRef = setTimeout(regularCleanup, CLEANUP_INTERVAL);
};

await regularCleanup();

/**
* @param {import('./context-aware-slog.js').Slog} slog
*/
const slogSender = async slog => {
const contextualSlog = contextualSlogProcessor(slog);
const blockHeight = contextualSlog.attributes['block.height'];
const blockHeightString = String(blockHeight);

if (blockHeight !== undefined && currentBlock !== blockHeight) {
if (!(blockHeightString in streamHandlers)) {
if (!(blockHeightString in streamCreationPromises))
streamCreationPromises[blockHeightString] = createBlockStream(
blockHeight,
options.stateDir,
contextualSlog.time,
);

await streamCreationPromises[blockHeightString];

Check failure on line 122 in packages/telemetry/src/block-slog.js

View workflow job for this annotation

GitHub Actions / lint-primary

The first `await` appearing in an async function must not be nested
}

currentBlock = blockHeight;
}

if (currentBlock !== DEFAULT_BLOCK_HEIGHT) {
const stream = streamHandlers[String(currentBlock)];
if (!stream)
throw Error(`Stream not found for block height ${currentBlock}`);

stream.write(serializeSlogObj(contextualSlog) + '\n').catch(() => {});

Check failure on line 133 in packages/telemetry/src/block-slog.js

View workflow job for this annotation

GitHub Actions / lint-primary

Unexpected string concatenation
}
};

return Object.assign(slogSender, {
forceFlush: () => streamHandlers[String(currentBlock)]?.flush(),
shutdown: () => {
clearTimeout(cleanupRef);
return Promise.all(
Object.entries(streamHandlers).map(([, stream]) =>
closeFileStream(stream),
),
);
},
});
};
17 changes: 14 additions & 3 deletions packages/telemetry/src/context-aware-slog-file.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import { makeFsStreamWriter } from '@agoric/internal/src/node/fs-stream.js';
import { makeContextualSlogProcessor } from './context-aware-slog.js';
import getContextFilePersistenceUtils, {
DEFAULT_CONTEXT_FILE,
} from './context-aware-slog-persistent-util.js';
import { serializeSlogObj } from './serialize-slog-obj.js';

/**
Expand All @@ -21,9 +24,17 @@ export const makeSlogSender = async options => {
`Couldn't create a write stream on file "${CONTEXTUAL_SLOGFILE}"`,
);

const contextualSlogProcessor = makeContextualSlogProcessor({
'chain-id': CHAIN_ID,
});
const persistenceUtils = getContextFilePersistenceUtils(
process.env.SLOG_CONTEXT_FILE_PATH ||
`${options.stateDir}/${DEFAULT_CONTEXT_FILE}`,
);

const contextualSlogProcessor = makeContextualSlogProcessor(
{
'chain-id': CHAIN_ID,
},
persistenceUtils,
);

/**
* @param {import('./context-aware-slog.js').Slog} slog
Expand Down
39 changes: 39 additions & 0 deletions packages/telemetry/src/context-aware-slog-persistent-util.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { readFileSync, writeFileSync } from 'fs';
import { serializeSlogObj } from './serialize-slog-obj.js';

export const DEFAULT_CONTEXT_FILE = 'slog-context.json';
const FILE_ENCODING = 'utf8';

/**
* @param {string} filePath
*/
const getContextFilePersistenceUtils = filePath => {
console.warn(`Using file ${filePath} for slogger context`);

return {
/**
* @param {import('./context-aware-slog.js').Context} context
*/
persistContext: context => {
try {
writeFileSync(filePath, serializeSlogObj(context), FILE_ENCODING);
} catch (err) {
console.error('Error writing context to file: ', err);
}
},

/**
* @returns {import('./context-aware-slog.js').Context | null}
*/
restoreContext: () => {
try {
return JSON.parse(readFileSync(filePath, FILE_ENCODING));
} catch (parseErr) {
console.error('Error reading context from file: ', parseErr);
return null;
}
},
};
};

export default getContextFilePersistenceUtils;
40 changes: 4 additions & 36 deletions packages/telemetry/src/otel-context-aware-slog.js
Original file line number Diff line number Diff line change
@@ -1,51 +1,19 @@
/* eslint-env node */

import { logs, SeverityNumber } from '@opentelemetry/api-logs';
import { OTLPLogExporter } from '@opentelemetry/exporter-logs-otlp-http';
import { Resource } from '@opentelemetry/resources';
import {
LoggerProvider,
SimpleLogRecordProcessor,
} from '@opentelemetry/sdk-logs';
import { readFileSync, writeFileSync } from 'fs';
import { makeContextualSlogProcessor } from './context-aware-slog.js';
import getContextFilePersistenceUtils, {
DEFAULT_CONTEXT_FILE,
} from './context-aware-slog-persistent-util.js';
import { getResourceAttributes } from './index.js';
import { serializeSlogObj } from './serialize-slog-obj.js';

const DEFAULT_CONTEXT_FILE = 'slog-context.json';
const FILE_ENCODING = 'utf8';

/**
* @param {string} filePath
*/
export const getContextFilePersistenceUtils = filePath => {
console.warn(`Using file ${filePath} for slogger context`);

return {
/**
* @param {import('./context-aware-slog.js').Context} context
*/
persistContext: context => {
try {
writeFileSync(filePath, serializeSlogObj(context), FILE_ENCODING);
} catch (err) {
console.error('Error writing context to file: ', err);
}
},

/**
* @returns {import('./context-aware-slog.js').Context | null}
*/
restoreContext: () => {
try {
return JSON.parse(readFileSync(filePath, FILE_ENCODING));
} catch (parseErr) {
console.error('Error reading context from file: ', parseErr);
return null;
}
},
};
};

/**
* @param {import('./index.js').MakeSlogSenderOptions} options
*/
Expand Down

0 comments on commit 0192ce0

Please sign in to comment.