refactor: replace bfj with stream-json
byCedric committed Mar 17, 2024
1 parent d518300 commit 7638f90
Showing 5 changed files with 54 additions and 60 deletions.
Binary file modified bun.lockb
3 changes: 1 addition & 2 deletions package.json
@@ -31,14 +31,13 @@
"build": "expo-module build",
"clean": "expo-module clean",
"lint": "eslint . --ext js,ts,tsx",
"test": "bun --test ./src",
"test": "bun --test",
"typecheck": "expo-module typecheck"
},
"license": "MIT",
"dependencies": {
"@expo/server": "^0.3.1",
"arg": "^5.0.2",
"bfj": "^8.0.0",
"compression": "^1.7.4",
"connect": "^3.7.0",
"express": "^4.18.2",
4 changes: 2 additions & 2 deletions src/data/StatsFileSource.ts
@@ -1,7 +1,7 @@
import assert from 'assert';

import type { PartialStatsEntry, StatsEntry, StatsSource } from './types';
import { appendJsonLine, mapJsonLines, parseJsonLine } from '../utils/ndjson';
import { appendJsonLine, forEachJsonLines, parseJsonLine } from '../utils/ndjson';

export class StatsFileSource implements StatsSource {
constructor(public readonly statsPath: string) {
@@ -27,7 +27,7 @@ export async function listStatsEntries(statsPath: string) {
const bundlePattern = /^\["([^"]+)","([^"]+)","([^"]+)/;
const entries: PartialStatsEntry[] = [];

await mapJsonLines(statsPath, (contents, line) => {
await forEachJsonLines(statsPath, (contents, line) => {
// Skip the stats metadata line
if (line === 1) return;

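For context, a minimal sketch of how the renamed helper is consumed in this file (the callback signature matches the commit; the wrapper function and its return shape are illustrative assumptions, not the real listStatsEntries logic):

import { forEachJsonLines } from '../utils/ndjson';

// Hypothetical helper: collect raw JSON lines, skipping the stats metadata on line 1.
async function collectEntryLines(statsPath: string): Promise<string[]> {
  const lines: string[] = [];
  await forEachJsonLines(statsPath, (contents, line) => {
    if (line === 1) return; // line numbers start at 1
    lines.push(contents);
  });
  return lines;
}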
33 changes: 21 additions & 12 deletions src/utils/__tests__/ndjson.test.ts
@@ -1,16 +1,16 @@
import { describe, expect, it, mock } from 'bun:test';
import path from 'path';

import { mapJsonLines, parseJsonLine } from '../ndjson';
import { forEachJsonLines, parseJsonLine } from '../ndjson';

function fixture(...filePath: string[]) {
return path.join(__dirname, 'fixtures', ...filePath);
}

describe('mapJsonLines', () => {
it('maps each line of file', async () => {
describe('forEachJsonLines', () => {
it('iterates each line of file', async () => {
const lines: string[] = [];
await mapJsonLines(fixture('ndjson.json'), (content) => {
await forEachJsonLines(fixture('ndjson.json'), (content) => {
lines.push(content);
});

@@ -22,15 +22,16 @@ describe('mapJsonLines', () => {
]);
});

it('maps each line with line numbers starting from 1', async () => {
it('iterates each line with line numbers starting from 1', async () => {
const onReadLine = mock();
await mapJsonLines(fixture('ndjson.json'), onReadLine);

expect(onReadLine).not.toHaveBeenCalledWith(expect.any(String), 0);
expect(onReadLine).toHaveBeenCalledWith(expect.any(String), 1);
expect(onReadLine).toHaveBeenCalledWith(expect.any(String), 2);
expect(onReadLine).toHaveBeenCalledWith(expect.any(String), 3);
expect(onReadLine).toHaveBeenCalledWith(expect.any(String), 4);
await forEachJsonLines(fixture('ndjson.json'), onReadLine);

// Callback is invoked with (content, line, reader) => ...
expect(onReadLine).not.toHaveBeenCalledWith(expect.any(String), 0, expect.any(Object));
expect(onReadLine).toHaveBeenCalledWith(expect.any(String), 1, expect.any(Object));
expect(onReadLine).toHaveBeenCalledWith(expect.any(String), 2, expect.any(Object));
expect(onReadLine).toHaveBeenCalledWith(expect.any(String), 3, expect.any(Object));
expect(onReadLine).toHaveBeenCalledWith(expect.any(String), 4, expect.any(Object));
});
});

@@ -47,4 +48,12 @@ describe('parseJsonLine', () => {
'Line 99999 not found in file'
);
});

it('parses a single line from file', async () => {
expect(await parseJsonLine(fixture('stats-brent.json'), 1)).toBeObject();
});

it('parses a single line from file, with large data', async () => {
expect(await parseJsonLine(fixture('stats-brent.json'), 2)).toBeArray();
});
});
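For reference, the ndjson/jsonl format exercised by these tests is one JSON value per line. The actual fixture contents are not shown in this commit; a hypothetical four-line fixture of this shape would satisfy the line-number assertions above:

{"type":"metadata","version":"1.0"}
["bundle-1","ios","index.js"]
["bundle-2","android","index.js"]
{"note":"line numbers start at 1"}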
74 changes: 30 additions & 44 deletions src/utils/ndjson.ts
@@ -1,79 +1,65 @@
import events from 'events';
import fs from 'fs';
import readline from 'readline';
import stream from 'stream';
import { stringer } from 'stream-json/Stringer';
import { disassembler } from 'stream-json/Disassembler';

Check warning on line 6 in src/utils/ndjson.ts (GitHub Actions / core): `stream-json/Disassembler` import should occur before import of `stream-json/Stringer`

/**
* Efficiently map through all lines within the Newline-Delimited JSON (ndjson) file, using streams.
* This won't parse the actual JSON but returns the partial string instead.
* Note, line numbers starts at `1`.
* Iterate through lines of a ndjson/jsonl file using streams.
* This won't parse the actual JSON but invokes the callback for each line.
*
* @note Line numbers starts at `1`
*/
export async function mapJsonLines(
export async function forEachJsonLines(
filePath: string,
callback: (contents: string, line: number) => any
callback: (lineContent: string, lineNumber: number, reader: readline.Interface) => any
) {
const stream = fs.createReadStream(filePath);
const reader = readline.createInterface({ input: stream });
const input = fs.createReadStream(filePath);
const reader = readline.createInterface({ input });
let lineNumber = 1;

reader.on('error', (error) => {
throw error;
});

reader.on('line', (contents) => {
callback(contents, lineNumber++);
callback(contents, lineNumber++, reader);
});

await events.once(reader, 'close');
stream.close();
}

/**
* Efficiently parse a single line from a Newline-Delimited JSON (ndjson) file, using streams.
* Note, line numbers starts at `1`.
* Parse a single line of a jsonl/ndjson file using streams.
* Once the line is found, iteration is stopped and the parsed JSON is returned.
*
* @note Line numbers starts at `1`
*/
export async function parseJsonLine<T = any>(filePath: string, line: number): Promise<T> {
const stream = fs.createReadStream(filePath);
const reader = readline.createInterface({ input: stream });
export async function parseJsonLine<T = any>(filePath: string, lineNumber: number): Promise<T> {
let lineContent = '';

let lineContents;
let lineNumber = 1;

reader.on('error', (error) => {
throw error;
});

reader.on('line', (contents) => {
if (lineNumber++ === line) {
lineContents = contents;
await forEachJsonLines(filePath, (content, line, reader) => {
if (line === lineNumber) {
lineContent = content;
reader.close();
}
});

await events.once(reader, 'close');
stream.close();

if (!lineContents) {
throw new Error(`Line ${line} not found in file: ${filePath}`);
if (!lineContent) {
throw new Error(`Line ${lineNumber} not found in file: ${filePath}`);
}

return JSON.parse(lineContents);
return JSON.parse(lineContent);
}

/** Efficiently append a new line to a Newline-Delimited JSON (ndjson) file, using streams. */
/** Append a single line of json data to a jsonl/ndjson file using streams. */
export async function appendJsonLine(filePath: string, data: unknown): Promise<void> {
// Note(cedric): keep this dependency inlined to avoid loading it in the WebUI
const bfj = require('bfj');
await bfj.write(filePath, data, {
// Force stream to append to file
flags: 'a',
// Ignore all complex data types, which shouldn't exist in the data
buffers: 'ignore',
circular: 'ignore',
iterables: 'ignore',
promises: 'ignore',
// Only enable maps, as the graph dependencies are stored as a map
maps: 'object',
});
const input = stream.Readable.from([data] as any, { objectMode: true });
const output = fs.createWriteStream(filePath, { flags: 'a' });

input.pipe(disassembler()).pipe(stringer()).pipe(output);

await events.once(output, 'finish');
await fs.promises.appendFile(filePath, '\n', 'utf-8');
}
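Taken together, the rewritten module round-trips data like this (a usage sketch based on the signatures above, not part of the commit; the file path and sample entry are hypothetical):

import { appendJsonLine, forEachJsonLines, parseJsonLine } from './src/utils/ndjson';

const file = '/tmp/stats.jsonl'; // hypothetical path

// stream-json disassembles the value into tokens, stringer re-serializes them
// onto the appending write stream, and a trailing newline terminates the line.
await appendJsonLine(file, { name: 'bundle', platform: 'ios' });

// Iterate raw lines; numbering starts at 1 and the reader can be closed early.
await forEachJsonLines(file, (contents, line, reader) => {
  console.log(line, contents);
  if (line === 1) reader.close();
});

// Parse a single line back into a value.
const entry = await parseJsonLine<{ name: string; platform: string }>(file, 1);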
