Skip to content

Commit

Permalink
Merge pull request #8889 from Agoric/8636-fs-slogSender
Browse files Browse the repository at this point in the history
slogSender using positions in file instead of mmap
  • Loading branch information
mergify[bot] authored Feb 14, 2024
2 parents 2ffb5b8 + 515ef12 commit 23a970d
Show file tree
Hide file tree
Showing 12 changed files with 255 additions and 227 deletions.
3 changes: 3 additions & 0 deletions .eslintrc.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ module.exports = {
plugins: ['@typescript-eslint', 'prettier'],
extends: ['@agoric', 'plugin:ava/recommended'],
rules: {
// UNTIL on Endo with https://github.com/endojs/endo/pull/2032
'@endo/no-nullish-coalescing': 'off',

'@typescript-eslint/prefer-ts-expect-error': 'warn',
'@typescript-eslint/no-floating-promises': 'error',
// so that floating-promises can be explicitly permitted with void operator
Expand Down
3 changes: 1 addition & 2 deletions packages/telemetry/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
"@opentelemetry/semantic-conventions": "~1.9.0",
"anylogger": "^0.21.0",
"better-sqlite3": "^9.1.1",
"bufferfromfile": "agoric-labs/BufferFromFile#Agoric-built",
"tmp": "^0.2.1"
},
"devDependencies": {
Expand All @@ -64,6 +63,6 @@
"workerThreads": false
},
"typeCoverage": {
"atLeast": 87.55
"atLeast": 87.14
}
}
237 changes: 151 additions & 86 deletions packages/telemetry/src/flight-recorder.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,10 @@
// @ts-check
/* global Buffer */
/// <reference types="ses" />

// https://github.com/Agoric/agoric-sdk/issues/3742#issuecomment-1028451575
// I'd mmap() a 100MB file, reserve a few bytes for offsets, then use the rest
// as a circular buffer to hold length-prefixed records. The agd process would
// keep writing new events into the RAM window and updating the start/end
// pointers, with some sequencing to make sure the record gets written before
// the pointer is updated. Then, no mattter how abruptly the process is
// terminated, as long as the host computer itself is still running, the on-disk
// file would contain the most recent state, and anybody who reads the file will
// get the most recent state. The host kernel (linux) is under no obligation to
// flush it to disk any particular time, but knows when reads happen, so there's
// no coherency problem, and the speed is unaffected by disk write speeds.

import BufferFromFile from 'bufferfromfile';
import { promises as fsPromises } from 'fs';
import path from 'path';
import fs from 'node:fs';
import fsp from 'node:fs/promises';
import path from 'node:path';
import { serializeSlogObj } from './serialize-slog-obj.js';

const { Fail } = assert;
Expand All @@ -31,12 +21,16 @@ const I_ARENA_START = 4 * BigUint64Array.BYTES_PER_ELEMENT;

const RECORD_HEADER_SIZE = BigUint64Array.BYTES_PER_ELEMENT;

/**
* Initializes a circular buffer with the given size, creating the buffer file if it doesn't exist or is not large enough.
*
* @param {string} bufferFile - the file path for the circular buffer
* @param {number} circularBufferSize - the size of the circular buffer
* @returns {Promise<bigint>} the size of the initialized circular buffer
*/
const initializeCircularBuffer = async (bufferFile, circularBufferSize) => {
if (!circularBufferSize) {
return undefined;
}
// If the file doesn't exist, or is not large enough, create it.
const stbuf = await fsPromises.stat(bufferFile).catch(e => {
const stbuf = await fsp.stat(bufferFile).catch(e => {
if (e.code === 'ENOENT') {
return undefined;
}
Expand All @@ -57,59 +51,29 @@ const initializeCircularBuffer = async (bufferFile, circularBufferSize) => {
header.setBigUint64(I_CIRC_START, 0n);
header.setBigUint64(I_CIRC_END, 0n);

await fsPromises.mkdir(path.dirname(bufferFile), { recursive: true });
await fsPromises.writeFile(bufferFile, headerBuf);
await fsp.mkdir(path.dirname(bufferFile), { recursive: true });
await fsp.writeFile(bufferFile, headerBuf);

if (stbuf && stbuf.size >= circularBufferSize) {
// File is big enough.
return arenaSize;
}

// Increase the file size.
await fsPromises.truncate(bufferFile, circularBufferSize);
await fsp.truncate(bufferFile, circularBufferSize);
return arenaSize;
};

export const makeMemoryMappedCircularBuffer = async ({
circularBufferSize = DEFAULT_CBUF_SIZE,
stateDir = '/tmp',
circularBufferFilename,
}) => {
const filename = circularBufferFilename || `${stateDir}/${DEFAULT_CBUF_FILE}`;
// console.log({ circularBufferFilename, filename });

const newArenaSize = await initializeCircularBuffer(
filename,
circularBufferSize,
);
/** @typedef {Awaited<ReturnType<typeof makeSimpleCircularBuffer>>} CircularBuffer */

/**
* @type {Uint8Array}
* BufferFromFile mmap()s the file into the process address space.
*/
const fileBuf = BufferFromFile(filename).Uint8Array();
const header = new DataView(fileBuf.buffer, 0, I_ARENA_START);

// Detect the arena size from the header, if not initialized.
const hdrArenaSize = header.getBigUint64(I_ARENA_SIZE);
const arenaSize = newArenaSize || hdrArenaSize;

const hdrMagic = header.getBigUint64(I_MAGIC);
SLOG_MAGIC === hdrMagic ||
Fail`${filename} is not a slog buffer; wanted magic ${SLOG_MAGIC}, got ${hdrMagic}`;
arenaSize === hdrArenaSize ||
Fail`${filename} arena size mismatch; wanted ${arenaSize}, got ${hdrArenaSize}`;
const arena = new Uint8Array(
fileBuf.buffer,
header.byteLength,
Number(arenaSize),
);

/**
* @param {Uint8Array} outbuf
* @param {number} [offset] offset relative to the current trailing edge (circStart) of the data
* @returns {IteratorResult<Uint8Array, void>}
*/
/**
*
* @param {bigint} arenaSize
* @param {DataView} header
* @param {(outbuf: Uint8Array, readStart: number, firstReadLength: number) => void} readRecord
* @param {(record: Uint8Array, firstWriteLength: number, circEnd: bigint) => Promise<void>} writeRecord
*/
function finishCircularBuffer(arenaSize, header, readRecord, writeRecord) {
const readCircBuf = (outbuf, offset = 0) => {
offset + outbuf.byteLength <= arenaSize ||
Fail`Reading past end of circular buffer`;
Expand All @@ -132,19 +96,13 @@ export const makeMemoryMappedCircularBuffer = async ({
// The data is contiguous, like ---AAABBB---
return { done: true, value: undefined };
}
outbuf.set(arena.subarray(readStart, readStart + firstReadLength));
if (firstReadLength < outbuf.byteLength) {
outbuf.set(
arena.subarray(0, outbuf.byteLength - firstReadLength),
firstReadLength,
);
}
readRecord(outbuf, readStart, firstReadLength);
return { done: false, value: outbuf };
};

/** @param {Uint8Array} data */
const writeCircBuf = data => {
if (RECORD_HEADER_SIZE + data.byteLength > arena.byteLength) {
/** @type {(data: Uint8Array) => Promise<void>} */
const writeCircBuf = async data => {
if (RECORD_HEADER_SIZE + data.byteLength > arenaSize) {
// The data is too big to fit in the arena, so skip it.
const tooBigRecord = JSON.stringify({
type: 'slog-record-too-big',
Expand All @@ -153,14 +111,17 @@ export const makeMemoryMappedCircularBuffer = async ({
data = new TextEncoder().encode(tooBigRecord);
}

if (RECORD_HEADER_SIZE + data.byteLength > arena.byteLength) {
if (RECORD_HEADER_SIZE + data.byteLength > arenaSize) {
// Silently drop, it just doesn't fit.
return;
}

// Allocate for the data and a header
const record = new Uint8Array(RECORD_HEADER_SIZE + data.byteLength);
// Set the data, after the header
record.set(data, RECORD_HEADER_SIZE);

// Set the size in the header
const lengthPrefix = new DataView(record.buffer);
lengthPrefix.setBigUint64(0, BigInt(data.byteLength));

Expand Down Expand Up @@ -206,31 +167,135 @@ export const makeMemoryMappedCircularBuffer = async ({
);
}

arena.set(record.subarray(0, firstWriteLength), Number(circEnd));
if (firstWriteLength < record.byteLength) {
// Write to the beginning of the arena.
arena.set(record.subarray(firstWriteLength, record.byteLength), 0);
}
header.setBigUint64(
I_CIRC_END,
(circEnd + BigInt(record.byteLength)) % arenaSize,
);

return writeRecord(record, firstWriteLength, circEnd);
};

const writeJSON = (obj, jsonObj = serializeSlogObj(obj)) => {
// Prepend a newline so that the file can be more easily manipulated.
const data = new TextEncoder().encode(`\n${jsonObj}`);
// console.log('have obj', obj, data);
writeCircBuf(data);
return { readCircBuf, writeCircBuf };
}

/**
* @param {{
* circularBufferSize?: number,
* stateDir?: string,
* circularBufferFilename?: string
* }} opts
*/
export const makeSimpleCircularBuffer = async ({
circularBufferSize = DEFAULT_CBUF_SIZE,
stateDir = '/tmp',
circularBufferFilename,
}) => {
const filename = circularBufferFilename || `${stateDir}/${DEFAULT_CBUF_FILE}`;

const newArenaSize = await initializeCircularBuffer(
filename,
circularBufferSize,
);

const file = await fsp.open(filename, 'r+');

const headerBuffer = Buffer.alloc(I_ARENA_START);

await file.read({
buffer: headerBuffer,
length: I_ARENA_START,
position: 0,
});
const header = new DataView(headerBuffer.buffer);

// Detect the arena size from the header, if not initialized.
const hdrArenaSize = header.getBigUint64(I_ARENA_SIZE);
const arenaSize = newArenaSize || hdrArenaSize;

const hdrMagic = header.getBigUint64(I_MAGIC);
SLOG_MAGIC === hdrMagic ||
Fail`${filename} is not a slog buffer; wanted magic ${SLOG_MAGIC}, got ${hdrMagic}`;
arenaSize === hdrArenaSize ||
Fail`${filename} arena size mismatch; wanted ${arenaSize}, got ${hdrArenaSize}`;

/** @type {(outbuf: Uint8Array, readStart: number, firstReadLength: number) => void} */
const readRecord = (outbuf, readStart, firstReadLength) => {
const bytesRead = fs.readSync(file.fd, outbuf, {
length: firstReadLength,
position: Number(readStart) + I_ARENA_START,
});
assert.equal(bytesRead, firstReadLength, 'Too few bytes read');

if (bytesRead < outbuf.byteLength) {
fs.readSync(file.fd, outbuf, {
offset: firstReadLength,
length: outbuf.byteLength - firstReadLength,
position: I_ARENA_START,
});
}
};

/**
* Writes to the file, offset by the header size. Also updates the file header.
*
* @param {Uint8Array} record
* @param {number} firstWriteLength
* @param {bigint} circEnd
*/
const writeRecord = async (record, firstWriteLength, circEnd) => {
await file.write(
record,
// TS saying options bag not available
0,
firstWriteLength,
I_ARENA_START + Number(circEnd),
);
if (firstWriteLength < record.byteLength) {
// Write to the beginning of the arena.
await file.write(
record,
firstWriteLength,
record.byteLength - firstWriteLength,
I_ARENA_START,
);
}

// Write out the updated file header.
// This is somewhat independent of writing the record itself, but it needs
// updating each time a record is written.
await file.write(headerBuffer, undefined, undefined, 0);
};

return { readCircBuf, writeCircBuf, writeJSON };
return finishCircularBuffer(arenaSize, header, readRecord, writeRecord);
};

export const makeSlogSender = async opts => {
const { writeJSON } = await makeMemoryMappedCircularBuffer(opts);
/**
*
* @param {Pick<Awaited<ReturnType<typeof makeSimpleCircularBuffer>>, 'writeCircBuf'>} circBuf
*/
export const makeSlogSenderFromBuffer = ({ writeCircBuf }) => {
/** @type {Promise<void>} */
let toWrite = Promise.resolve();
const writeJSON = (obj, serialized = serializeSlogObj(obj)) => {
// Prepend a newline so that the file can be more easily manipulated.
const data = new TextEncoder().encode(`\n${serialized}`);
// console.log('have obj', obj, data);
toWrite = toWrite.then(() => writeCircBuf(data));
};
return Object.assign(writeJSON, {
forceFlush: async () => {},
forceFlush: async () => {
await toWrite;
},
usesJsonObject: true,
});
};

/**
* Loaded dynamically by makeSlogSender()
*
* @type {import('./index.js').MakeSlogSender}
*/
export const makeSlogSender = async opts => {
const { writeCircBuf } = await makeSimpleCircularBuffer(opts);
return makeSlogSenderFromBuffer({ writeCircBuf });
};
4 changes: 2 additions & 2 deletions packages/telemetry/src/frcat-entrypoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import '@endo/init';

import { makeMemoryMappedCircularBuffer } from './flight-recorder.js';
import { makeSimpleCircularBuffer } from './flight-recorder.js';

const main = async () => {
const files = process.argv.slice(2);
Expand All @@ -14,7 +14,7 @@ const main = async () => {
}

for await (const file of files) {
const { readCircBuf } = await makeMemoryMappedCircularBuffer({
const { readCircBuf } = await makeSimpleCircularBuffer({
circularBufferFilename: file,
circularBufferSize: 0,
});
Expand Down
11 changes: 7 additions & 4 deletions packages/telemetry/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ export * from './make-slog-sender.js';
* shutdown?: () => Promise<void>;
* }} SlogSender
*/
/**
* @typedef {(opts: import('./index.js').MakeSlogSenderOptions) => SlogSender | undefined} MakeSlogSender
*/
/**
* @typedef {MakeSlogSenderCommonOptions & Record<string, unknown>} MakeSlogSenderOptions
* @typedef {object} MakeSlogSenderCommonOptions
Expand All @@ -34,9 +37,9 @@ export const tryFlushSlogSender = async (
await Promise.resolve(slogSender?.forceFlush?.()).catch(err => {
log?.('Failed to flush slog sender', err);
if (err.errors) {
err.errors.forEach(error => {
for (const error of err.errors) {
log?.('nested error:', error);
});
}
}
if (env.SLOGSENDER_FAIL_ON_ERROR) {
throw err;
Expand Down Expand Up @@ -67,12 +70,12 @@ export const getResourceAttributes = ({
}
if (OTEL_RESOURCE_ATTRIBUTES) {
// Allow overriding resource attributes.
OTEL_RESOURCE_ATTRIBUTES.split(',').forEach(kv => {
for (const kv of OTEL_RESOURCE_ATTRIBUTES.split(',')) {
const match = kv.match(/^([^=]*)=(.*)$/);
if (match) {
resourceAttributes[match[1]] = match[2];
}
});
}
}
return resourceAttributes;
};
Expand Down
Loading

0 comments on commit 23a970d

Please sign in to comment.