Skip to content

Commit

Permalink
Telemetry fixes (#10343)
Browse files Browse the repository at this point in the history
refs: #10300

Incidental

Best reviewed commit-by-commit

## Description

While verifying #10300 I ran into some errors and lack of stdout streaming features. This is what I arrived at to let me process some slog files manually.

### Security Considerations

None

### Scaling Considerations

None production impacting

This adds a new block throttle mechanism to the ingest-slog tool, while relaxing the line based throttle.

### Documentation Considerations

None

### Testing Considerations

Manually tested with the slog sender detailed in #10300 (review).

### Upgrade Considerations

Affects chain software, but only the optional telemetry side. Not consensus affecting.
  • Loading branch information
mergify[bot] authored Oct 28, 2024
2 parents 2ede8c1 + 2134944 commit 7ae1f27
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 40 deletions.
67 changes: 40 additions & 27 deletions packages/internal/src/node/fs-stream.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import { createWriteStream } from 'node:fs';
import process from 'node:process';
import { open } from 'node:fs/promises';

/**
* @param {import('fs').ReadStream | import('fs').WriteStream} stream
* @param {import('fs').ReadStream
* | import('fs').WriteStream
* | import('net').Socket} stream
* @returns {Promise<void>}
*/
export const fsStreamReady = stream =>
Expand Down Expand Up @@ -48,45 +51,51 @@ export const makeFsStreamWriter = async filePath => {
return undefined;
}

const handle = await open(filePath, 'a');
const handle = await (filePath !== '-' ? open(filePath, 'a') : undefined);

const stream = createWriteStream(noPath, { fd: handle.fd });
const stream = handle
? createWriteStream(noPath, { fd: handle.fd })
: process.stdout;
await fsStreamReady(stream);

let flushed = Promise.resolve();
let closed = false;

const write = async data => {
if (closed) {
throw Error('Stream closed');
}

/** @type {Promise<void>} */
const written = new Promise((resolve, reject) => {
stream.write(data, err => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
const updateFlushed = p => {
flushed = flushed.then(
() => written,
async err =>
Promise.reject(
written.then(
() => err,
writtenError => AggregateError([err, writtenError]),
),
() => p,
err =>
p.then(
() => Promise.reject(err),
pError =>
Promise.reject(
pError !== err ? AggregateError([err, pError]) : err,
),
),
);
flushed.catch(() => {});
};

const write = async data => {
/** @type {Promise<void>} */
const written = closed
? Promise.reject(Error('Stream closed'))
: new Promise((resolve, reject) => {
stream.write(data, err => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
updateFlushed(written);
return written;
};

const flush = async () => {
await flushed;
await handle.sync().catch(err => {
await handle?.sync().catch(err => {
if (err.code === 'EINVAL') {
return;
}
Expand All @@ -95,10 +104,14 @@ export const makeFsStreamWriter = async filePath => {
};

const close = async () => {
// TODO: Consider creating a single Error here to use a write rejection
closed = true;
await flush();
stream.close();
// @ts-expect-error calling a possibly missing method
stream.close?.();
};

stream.on('error', err => updateFlushed(Promise.reject(err)));

return harden({ write, flush, close });
};
46 changes: 34 additions & 12 deletions packages/telemetry/src/ingest-slog-entrypoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import { makeSlogSender } from './make-slog-sender.js';

const LINE_COUNT_TO_FLUSH = 10000;
const ELAPSED_MS_TO_FLUSH = 3000;
const MAX_LINE_COUNT_PER_PERIOD = 1000;
const MAX_LINE_COUNT_PER_PERIOD = 10000;
const MAX_BLOCKS_PER_PERIOD = 10;
const PROCESSING_PERIOD = 1000;

async function run() {
Expand All @@ -29,7 +30,7 @@ async function run() {
return;
}

const [slogFile] = args;
const slogFile = args[0] === '-' ? undefined : args[0];
const slogSender = await makeSlogSender({
serviceName,
stateDir: '.',
Expand All @@ -56,14 +57,21 @@ async function run() {
const lines = readline.createInterface({ input: slogF });
const slogFileName = slogFile || '*stdin*';

const progressFileName = `${slogFileName}.ingest-progress`;
if (!fs.existsSync(progressFileName)) {
const progress = { virtualTimeOffset: 0, lastSlogTime: 0 };
fs.writeFileSync(progressFileName, JSON.stringify(progress));
const progressFileName = slogFile && `${slogFileName}.ingest-progress`;
const progress = { virtualTimeOffset: 0, lastSlogTime: 0 };
if (progressFileName) {
if (!fs.existsSync(progressFileName)) {
fs.writeFileSync(progressFileName, JSON.stringify(progress));
} else {
Object.assign(
progress,
JSON.parse(fs.readFileSync(progressFileName).toString()),
);
}
}
const progress = JSON.parse(fs.readFileSync(progressFileName).toString());

let linesProcessedThisPeriod = 0;
let blocksInThisPeriod = 0;
let startOfLastPeriod = 0;

let lastTime = Date.now();
Expand All @@ -75,10 +83,12 @@ async function run() {
return;
}
await slogSender.forceFlush?.();
fs.writeFileSync(progressFileName, JSON.stringify(progress));
if (progressFileName) {
fs.writeFileSync(progressFileName, JSON.stringify(progress));
}
};

console.log(`parsing`, slogFileName);
console.warn(`parsing`, slogFileName);

let update = false;
const maybeUpdateStats = async now => {
Expand Down Expand Up @@ -106,9 +116,14 @@ async function run() {
continue;
}

const isAfterCommit = obj.type === 'cosmic-swingset-after-commit-stats';

// Maybe wait for the next period to process a bunch of lines.
let maybeWait;
if (linesProcessedThisPeriod >= MAX_LINE_COUNT_PER_PERIOD) {
if (
linesProcessedThisPeriod >= MAX_LINE_COUNT_PER_PERIOD ||
blocksInThisPeriod >= MAX_BLOCKS_PER_PERIOD
) {
const delayMS = PROCESSING_PERIOD - (now - startOfLastPeriod);
maybeWait = new Promise(resolve => setTimeout(resolve, delayMS));
}
Expand All @@ -118,8 +133,8 @@ async function run() {
if (now - startOfLastPeriod >= PROCESSING_PERIOD) {
startOfLastPeriod = now;
linesProcessedThisPeriod = 0;
blocksInThisPeriod = 0;
}
linesProcessedThisPeriod += 1;

if (progress.virtualTimeOffset) {
const virtualTime = obj.time + progress.virtualTimeOffset;
Expand All @@ -133,10 +148,17 @@ async function run() {
// Use the original.
slogSender(obj);
}

linesProcessedThisPeriod += 1;
if (isAfterCommit) {
blocksInThisPeriod += 1;
lastTime = Date.now();
await stats(true);
}
}

await stats(true);
console.log(
console.warn(
`done parsing`,
slogFileName,
`(${lineCount} lines, ${byteCount} bytes)`,
Expand Down
2 changes: 1 addition & 1 deletion packages/telemetry/src/slog-file.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export const makeSlogSender = async ({ env: { SLOGFILE } = {} } = {}) => {

const slogSender = (slogObj, jsonObj = serializeSlogObj(slogObj)) => {
// eslint-disable-next-line prefer-template
void stream.write(jsonObj + '\n');
stream.write(jsonObj + '\n').catch(() => {});
};

return Object.assign(slogSender, {
Expand Down

0 comments on commit 7ae1f27

Please sign in to comment.