From 1ea3f06e705b34b50dffa069f4f469f8d9a8184e Mon Sep 17 00:00:00 2001 From: Mathieu Hofman Date: Mon, 28 Oct 2024 03:50:12 +0000 Subject: [PATCH 1/7] fix(internal): better stream error handling --- packages/internal/src/node/fs-stream.js | 51 ++++++++++++++----------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/packages/internal/src/node/fs-stream.js b/packages/internal/src/node/fs-stream.js index eb7f335f45c..cdb09e1a6ec 100644 --- a/packages/internal/src/node/fs-stream.js +++ b/packages/internal/src/node/fs-stream.js @@ -56,31 +56,35 @@ export const makeFsStreamWriter = async filePath => { let flushed = Promise.resolve(); let closed = false; - const write = async data => { - if (closed) { - throw Error('Stream closed'); - } - - /** @type {Promise} */ - const written = new Promise((resolve, reject) => { - stream.write(data, err => { - if (err) { - reject(err); - } else { - resolve(); - } - }); - }); + const updateFlushed = p => { flushed = flushed.then( - () => written, - async err => - Promise.reject( - written.then( - () => err, - writtenError => AggregateError([err, writtenError]), - ), + () => p, + err => + p.then( + () => Promise.reject(err), + pError => + Promise.reject( + pError !== err ? AggregateError([err, pError]) : err, + ), ), ); + flushed.catch(() => {}); + }; + + const write = async data => { + /** @type {Promise} */ + const written = closed + ? Promise.reject(Error('Stream closed')) + : new Promise((resolve, reject) => { + stream.write(data, err => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); + updateFlushed(written); return written; }; @@ -95,10 +99,13 @@ export const makeFsStreamWriter = async filePath => { }; const close = async () => { + // TODO: Consider creating a single Error here to use a write rejection closed = true; await flush(); stream.close(); }; + stream.on('error', err => updateFlushed(Promise.reject(err))); + return harden({ write, flush, close }); }; From 91089d7273ef3d41555b34d84471120d45602497 Mon Sep 17 00:00:00 2001 From: Mathieu Hofman Date: Mon, 28 Oct 2024 03:51:17 +0000 Subject: [PATCH 2/7] fix(telemetry): silence slogfile write errors They'll be reported on flush --- packages/telemetry/src/slog-file.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/telemetry/src/slog-file.js b/packages/telemetry/src/slog-file.js index 3c091211a5f..e7b49d559bd 100644 --- a/packages/telemetry/src/slog-file.js +++ b/packages/telemetry/src/slog-file.js @@ -11,7 +11,7 @@ export const makeSlogSender = async ({ env: { SLOGFILE } = {} } = {}) => { const slogSender = (slogObj, jsonObj = serializeSlogObj(slogObj)) => { // eslint-disable-next-line prefer-template - void stream.write(jsonObj + '\n'); + stream.write(jsonObj + '\n').catch(() => {}); }; return Object.assign(slogSender, { From d4b8dfa91155789f7ceda5cc3cef06019b9527e7 Mon Sep 17 00:00:00 2001 From: Mathieu Hofman Date: Mon, 28 Oct 2024 03:52:30 +0000 Subject: [PATCH 3/7] fix(telemetry): avoid polluting stdout in ingest-slog --- packages/telemetry/src/ingest-slog-entrypoint.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/telemetry/src/ingest-slog-entrypoint.js b/packages/telemetry/src/ingest-slog-entrypoint.js index 3a10d8463f2..baead039bbe 100755 --- a/packages/telemetry/src/ingest-slog-entrypoint.js +++ b/packages/telemetry/src/ingest-slog-entrypoint.js @@ -78,7 +78,7 @@ async function run() { fs.writeFileSync(progressFileName, JSON.stringify(progress)); }; - console.log(`parsing`, slogFileName); + console.warn(`parsing`, slogFileName); let update = false; const maybeUpdateStats = async now => { @@ -136,7 +136,7 @@ async function run() { } await stats(true); - console.log( + console.warn( `done parsing`, slogFileName, `(${lineCount} lines, ${byteCount} bytes)`, From b4af8296e8af37eecf80449870c18546e4c8856a Mon Sep 17 00:00:00 2001 From: Mathieu Hofman Date: Mon, 28 Oct 2024 03:54:04 +0000 Subject: [PATCH 4/7] feat(internal): fs stream to stdout --- packages/internal/src/node/fs-stream.js | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/packages/internal/src/node/fs-stream.js b/packages/internal/src/node/fs-stream.js index cdb09e1a6ec..8e7ae36a48b 100644 --- a/packages/internal/src/node/fs-stream.js +++ b/packages/internal/src/node/fs-stream.js @@ -1,8 +1,11 @@ import { createWriteStream } from 'node:fs'; +import process from 'node:process'; import { open } from 'node:fs/promises'; /** - * @param {import('fs').ReadStream | import('fs').WriteStream} stream + * @param {import('fs').ReadStream + * | import('fs').WriteStream + * | import('net').Socket} stream * @returns {Promise} */ export const fsStreamReady = stream => @@ -48,9 +51,11 @@ export const makeFsStreamWriter = async filePath => { return undefined; } - const handle = await open(filePath, 'a'); + const handle = await (filePath !== '-' ? open(filePath, 'a') : undefined); - const stream = createWriteStream(noPath, { fd: handle.fd }); + const stream = handle + ? createWriteStream(noPath, { fd: handle.fd }) + : process.stdout; await fsStreamReady(stream); let flushed = Promise.resolve(); @@ -90,7 +95,7 @@ export const makeFsStreamWriter = async filePath => { const flush = async () => { await flushed; - await handle.sync().catch(err => { + await handle?.sync().catch(err => { if (err.code === 'EINVAL') { return; } @@ -102,7 +107,8 @@ export const makeFsStreamWriter = async filePath => { // TODO: Consider creating a single Error here to use a write rejection closed = true; await flush(); - stream.close(); + // @ts-expect-error calling a possibly missing method + stream.close?.(); }; stream.on('error', err => updateFlushed(Promise.reject(err))); From 63367c4aaf9bafbd6553a1f4cb808c96bc90845a Mon Sep 17 00:00:00 2001 From: Mathieu Hofman Date: Mon, 28 Oct 2024 03:55:29 +0000 Subject: [PATCH 5/7] feat(telemetry): ingest-slog explicitly supports `-` for stdin --- packages/telemetry/src/ingest-slog-entrypoint.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/telemetry/src/ingest-slog-entrypoint.js b/packages/telemetry/src/ingest-slog-entrypoint.js index baead039bbe..3d48ae2d1fe 100755 --- a/packages/telemetry/src/ingest-slog-entrypoint.js +++ b/packages/telemetry/src/ingest-slog-entrypoint.js @@ -29,7 +29,7 @@ async function run() { return; } - const [slogFile] = args; + const slogFile = args[0] === '-' ? undefined : args[0]; const slogSender = await makeSlogSender({ serviceName, stateDir: '.', From 62589ca7b6d4aaa9eb7042f95ec7aec633db27f9 Mon Sep 17 00:00:00 2001 From: Mathieu Hofman Date: Mon, 28 Oct 2024 03:56:21 +0000 Subject: [PATCH 6/7] fix(telemetry): ingest-slog avoid writing progress file for stdin --- .../telemetry/src/ingest-slog-entrypoint.js | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/packages/telemetry/src/ingest-slog-entrypoint.js b/packages/telemetry/src/ingest-slog-entrypoint.js index 3d48ae2d1fe..73e54fb700c 100755 --- a/packages/telemetry/src/ingest-slog-entrypoint.js +++ b/packages/telemetry/src/ingest-slog-entrypoint.js @@ -56,12 +56,18 @@ async function run() { const lines = readline.createInterface({ input: slogF }); const slogFileName = slogFile || '*stdin*'; - const progressFileName = `${slogFileName}.ingest-progress`; - if (!fs.existsSync(progressFileName)) { - const progress = { virtualTimeOffset: 0, lastSlogTime: 0 }; - fs.writeFileSync(progressFileName, JSON.stringify(progress)); + const progressFileName = slogFile && `${slogFileName}.ingest-progress`; + const progress = { virtualTimeOffset: 0, lastSlogTime: 0 }; + if (progressFileName) { + if (!fs.existsSync(progressFileName)) { + fs.writeFileSync(progressFileName, JSON.stringify(progress)); + } else { + Object.assign( + progress, + JSON.parse(fs.readFileSync(progressFileName).toString()), + ); + } } - const progress = JSON.parse(fs.readFileSync(progressFileName).toString()); let linesProcessedThisPeriod = 0; let startOfLastPeriod = 0; @@ -75,7 +81,9 @@ async function run() { return; } await slogSender.forceFlush?.(); - fs.writeFileSync(progressFileName, JSON.stringify(progress)); + if (progressFileName) { + fs.writeFileSync(progressFileName, JSON.stringify(progress)); + } }; console.warn(`parsing`, slogFileName); From 21349448b3b9379a9da43218a59a7e7eaf4f5a9e Mon Sep 17 00:00:00 2001 From: Mathieu Hofman Date: Mon, 28 Oct 2024 03:57:47 +0000 Subject: [PATCH 7/7] feat(telemetry): ingest-slog throttle and flush per block --- .../telemetry/src/ingest-slog-entrypoint.js | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/packages/telemetry/src/ingest-slog-entrypoint.js b/packages/telemetry/src/ingest-slog-entrypoint.js index 73e54fb700c..ee6c6fca36e 100755 --- a/packages/telemetry/src/ingest-slog-entrypoint.js +++ b/packages/telemetry/src/ingest-slog-entrypoint.js @@ -11,7 +11,8 @@ import { makeSlogSender } from './make-slog-sender.js'; const LINE_COUNT_TO_FLUSH = 10000; const ELAPSED_MS_TO_FLUSH = 3000; -const MAX_LINE_COUNT_PER_PERIOD = 1000; +const MAX_LINE_COUNT_PER_PERIOD = 10000; +const MAX_BLOCKS_PER_PERIOD = 10; const PROCESSING_PERIOD = 1000; async function run() { @@ -70,6 +71,7 @@ async function run() { } let linesProcessedThisPeriod = 0; + let blocksInThisPeriod = 0; let startOfLastPeriod = 0; let lastTime = Date.now(); @@ -114,9 +116,14 @@ async function run() { continue; } + const isAfterCommit = obj.type === 'cosmic-swingset-after-commit-stats'; + // Maybe wait for the next period to process a bunch of lines. let maybeWait; - if (linesProcessedThisPeriod >= MAX_LINE_COUNT_PER_PERIOD) { + if ( + linesProcessedThisPeriod >= MAX_LINE_COUNT_PER_PERIOD || + blocksInThisPeriod >= MAX_BLOCKS_PER_PERIOD + ) { const delayMS = PROCESSING_PERIOD - (now - startOfLastPeriod); maybeWait = new Promise(resolve => setTimeout(resolve, delayMS)); } @@ -126,8 +133,8 @@ async function run() { if (now - startOfLastPeriod >= PROCESSING_PERIOD) { startOfLastPeriod = now; linesProcessedThisPeriod = 0; + blocksInThisPeriod = 0; } - linesProcessedThisPeriod += 1; if (progress.virtualTimeOffset) { const virtualTime = obj.time + progress.virtualTimeOffset; @@ -141,6 +148,13 @@ async function run() { // Use the original. slogSender(obj); } + + linesProcessedThisPeriod += 1; + if (isAfterCommit) { + blocksInThisPeriod += 1; + lastTime = Date.now(); + await stats(true); + } } await stats(true);