Skip to content

Commit

Permalink
chore: working event stream and animator
Browse files Browse the repository at this point in the history
  • Loading branch information
cjkoepke committed Nov 23, 2024
1 parent e5b0d90 commit 13093ee
Show file tree
Hide file tree
Showing 21 changed files with 636 additions and 841 deletions.
Binary file modified ui/bun.lockb
Binary file not shown.
4 changes: 3 additions & 1 deletion ui/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
"@sundaeswap/prettier-config": "^2.0.13",
"@types/bun": "latest",
"@types/d3": "^7.4.3",
"@types/msgpack-lite": "^0.1.11",
"@types/oboe": "^2.1.4",
"@types/pako": "^2.0.3",
"@types/react": "^18.3.12",
"@types/react-dom": "^18.3.1",
"@vitejs/plugin-react": "^4.3.3",
Expand All @@ -31,10 +31,12 @@
"@visx/responsive": "^3.12.0",
"concat-stream": "^2.0.0",
"d3": "^7.9.0",
"debounce": "^2.2.0",
"linebyline": "^1.3.0",
"msgpack-lite": "^0.1.26",
"next": "^15.0.2",
"oboe": "^2.1.7",
"pako": "^2.1.0",
"react": "^18.3.1",
"react-dom": "^18.3.1",
"react-force-graph": "^1.44.6",
Expand Down
4 changes: 2 additions & 2 deletions ui/src/app/Test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
import { useGraphContext } from "@/contexts/GraphContext/context";

export const Test = () => {
const { topography, transactions, messages, currentTime, maxTime } = useGraphContext();
const { state: { topography, transactions, messages, currentTime, maxTime } } = useGraphContext();

return (
<p>
Nodes: {topography.nodes.size}<br/>
Links: {topography.links.size}<br/>
Total Events: {messages.length}<br/>
Total Events: {messages.size}<br/>
Transaction List: {transactions.size}<br/>
Current Time: {currentTime}<br/>
Max Time: {maxTime}
Expand Down
194 changes: 92 additions & 102 deletions ui/src/app/api/messages/batch/route.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
import { createReadStream, statSync } from "fs";
import msgpack from "msgpack-lite";
import { NextResponse } from "next/server";
import readline from 'readline';
import readline from "readline";

import { IServerMessage } from "@/components/Graph/types";
import { messagesPath } from "../../utils";

const MESSAGE_BUFFER_IN_MS = 200;

async function findStartPosition(filePath: string, targetTimestamp: number) {
const fileSize = statSync(filePath).size;
let left = 0;
Expand All @@ -29,17 +26,20 @@ async function findStartPosition(filePath: string, targetTimestamp: number) {
return bestPosition;
}

function getTimestampAtPosition(filePath: string, position: number): Promise<number> {
function getTimestampAtPosition(
filePath: string,
position: number,
): Promise<number> {
return new Promise((resolve, reject) => {
const stream = createReadStream(filePath, { start: position });
let foundNewLine = false;
let adjustedPosition = position;

// Read a few bytes to find the newline character
stream.on('data', (chunk) => {
stream.on("data", (chunk) => {
const decoded = chunk.toString("utf8");
for (let i = 0; i < decoded.length; i++) {
if (decoded[i] === '\n') {
if (decoded[i] === "\n") {
foundNewLine = true;
adjustedPosition += i + 1; // Move to the start of the next line
break;
Expand All @@ -49,151 +49,141 @@ function getTimestampAtPosition(filePath: string, position: number): Promise<num
stream.close(); // Stop reading once the newline is found
});

stream.on('close', () => {
stream.on("close", () => {
if (foundNewLine) {
// Now use readline to get the timestamp from the new line
const lineStream = createReadStream(filePath, { start: adjustedPosition });
const lineStream = createReadStream(filePath, {
start: adjustedPosition,
});
const rl = readline.createInterface({
input: lineStream,
crlfDelay: Infinity,
});

rl.on('line', (line) => {
rl.on("line", (line) => {
const message: IServerMessage = JSON.parse(line);
const timestamp = message.time / 1_000_000;
rl.close();
resolve(timestamp);
});

rl.on('error', (err) => {
rl.on("error", (err) => {
reject(err);
});
} else {
reject(new Error("Could not find a newline character in the provided range"));
reject(
new Error("Could not find a newline character in the provided range"),
);
}
});

stream.on('error', (err) => {
stream.on("error", (err) => {
reject(err);
});
});
}

export async function GET(req: Request) {
export async function GET(req: Request, res: Response) {
try {
const url = new URL(req.url);
const currentTime = parseInt(url.searchParams.get("time") || "", 10);
const startTime = parseInt(url.searchParams.get("startTime") || "");
const speed = parseInt(url.searchParams.get("speed") || "");

if (isNaN(currentTime)) {
return new NextResponse("Invalid currentTime parameter", { status: 400 });
if (isNaN(startTime)) {
return new NextResponse(null, { status: 400, statusText: "Invalid currentTime parameter" });
}

const startPosition = await findStartPosition(messagesPath, currentTime);
const fileStream = createReadStream(messagesPath, { encoding: "utf8", start: startPosition });
if (isNaN(speed)) {
return new NextResponse(null, { status: 400, statusText: "Invalid speed parameter" });
}

const startPosition = await findStartPosition(messagesPath, startTime);
const fileStream = createReadStream(messagesPath, {
encoding: "utf8",
start: startPosition,
});
const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity
})

crlfDelay: Infinity,
});

let initialEventTime: number | null = null;
let interval: Timer | undefined;
const startTimeOnServer = Date.now();
const eventBuffer: { line: string; timeToSend: number }[] = [];
let rlClosed = false;

const stream = new ReadableStream({
cancel() {
clearInterval(interval);
rl.close();
},

start(controller) {
rl.on("line", (line) => {
const processEventBuffer = () => {
const now = Date.now();

while (eventBuffer.length > 0) {
const { line, timeToSend } = eventBuffer[0];
if (timeToSend <= now) {
// Send the event to the client
controller.enqueue(`data: ${line}\n\n`);
eventBuffer.shift();
} else {
// Next event isn't ready yet
break;
}
}

// Close the stream if all events have been sent and the file has been fully read
if (eventBuffer.length === 0 && rlClosed) {
clearInterval(interval);
controller.close();
}
};

interval = setInterval(processEventBuffer, 50);

const processLine = (line: string) => {
try {
const message: IServerMessage = JSON.parse(line);
const aboveLowerLimit = message.time / 1_000_000 >= currentTime;
const underUpperLimit = message.time / 1_000_000 < currentTime + MESSAGE_BUFFER_IN_MS;

// Check if the message falls within the time range
if (aboveLowerLimit && underUpperLimit) {
controller.enqueue(msgpack.encode(message))
}
const eventTime = message.time; // Timestamp in nanoseconds

// Free up resources if we're already pass the buffer window.
if (message.time / 1_000_000 >= currentTime + MESSAGE_BUFFER_IN_MS) {
rl.close();
fileStream.destroy();
if (initialEventTime === null) {
initialEventTime = eventTime;
}

const deltaTime = eventTime - initialEventTime; // Difference in nanoseconds
const adjustedDelay = (deltaTime / 1_000_000) * speed; // Convert to ms and apply multiplier

const timeToSend = startTimeOnServer + adjustedDelay;

eventBuffer.push({ line, timeToSend });
} catch (error) {
// Handle JSON parse errors or other issues
controller.error(error);
}
};

rl.on("line", (line) => {
processLine(line);
});

rl.on("close", () => {
controller.close();
console.log('rl closed')
});

rl.on("error", (error) => {
controller.error(error);
console.log('rl error')
});
}
},
});

// const stream = new ReadableStream(
// {
// start(controller) {
// start = performance.now();
// fileStream.on("data", (chunk) => {
// buffer += chunk;
// let lines = buffer.split("\n");
// buffer = lines.pop() || ""; // Keep the last incomplete line

// for (const line of lines) {
// if (!line.trim()) continue;

// try {
// const message: IServerMessage = JSON.parse(line);
// const aboveLowerLimit = message.time / 1_000_000 >= currentTime - halfRange;
// const underUpperLimit = message.time / 1_000_000 < currentTime + halfRange;

// // Check if the message falls within the time range
// if (aboveLowerLimit && underUpperLimit && !sent) {
// // Stream the message if it matches the time range
// controller.enqueue(new TextEncoder().encode(JSON.stringify(message) + "\n"));
// sent = true;
// }
// } catch (error) {
// console.error("Error parsing JSON line:", error);
// controller.error(new Error("Error parsing JSON line"));
// fileStream.destroy();
// break;
// }
// }
// });

// fileStream.on("end", () => {
// end = performance.now();
// console.log(end - start);
// if (buffer.trim()) {
// try {
// const message = JSON.parse(buffer); // Parse the last incomplete line
// controller.enqueue(
// new TextEncoder().encode(JSON.stringify(message) + "\n"),
// );
// } catch (error) {
// console.error("Error parsing final JSON line:", error);
// controller.error(new Error("Error parsing final JSON line"));
// }
// }
// controller.close();
// });

// fileStream.on("error", (error) => {
// console.error("File stream error:", error);
// controller.error(new Error("File stream error"));
// });
// },
// },
// {
// highWaterMark: 100_000,
// },
// );

// Return a streaming response
return new NextResponse(stream, {
headers: {
"Content-Type": "application/jsonl",
"Transfer-Encoding": "chunked",
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});
} catch (e) {
Expand Down
22 changes: 0 additions & 22 deletions ui/src/app/api/topography/route.ts

This file was deleted.

Loading

0 comments on commit 13093ee

Please sign in to comment.