diff --git a/ui/bun.lockb b/ui/bun.lockb index fa5fc39..9855131 100755 Binary files a/ui/bun.lockb and b/ui/bun.lockb differ diff --git a/ui/package.json b/ui/package.json index 25d2426..4cf779c 100644 --- a/ui/package.json +++ b/ui/package.json @@ -14,8 +14,8 @@ "@sundaeswap/prettier-config": "^2.0.13", "@types/bun": "latest", "@types/d3": "^7.4.3", - "@types/msgpack-lite": "^0.1.11", "@types/oboe": "^2.1.4", + "@types/pako": "^2.0.3", "@types/react": "^18.3.12", "@types/react-dom": "^18.3.1", "@vitejs/plugin-react": "^4.3.3", @@ -31,10 +31,12 @@ "@visx/responsive": "^3.12.0", "concat-stream": "^2.0.0", "d3": "^7.9.0", + "debounce": "^2.2.0", "linebyline": "^1.3.0", "msgpack-lite": "^0.1.26", "next": "^15.0.2", "oboe": "^2.1.7", + "pako": "^2.1.0", "react": "^18.3.1", "react-dom": "^18.3.1", "react-force-graph": "^1.44.6", diff --git a/ui/src/app/Test.tsx b/ui/src/app/Test.tsx index 383bfda..12e9762 100644 --- a/ui/src/app/Test.tsx +++ b/ui/src/app/Test.tsx @@ -3,13 +3,13 @@ import { useGraphContext } from "@/contexts/GraphContext/context"; export const Test = () => { - const { topography, transactions, messages, currentTime, maxTime } = useGraphContext(); + const { state: { topography, transactions, messages, currentTime, maxTime } } = useGraphContext(); return (

Nodes: {topography.nodes.size}
Links: {topography.links.size}
- Total Events: {messages.length}
+ Total Events: {messages.size}
Transaction List: {transactions.size}
Current Time: {currentTime}
Max Time: {maxTime} diff --git a/ui/src/app/api/messages/batch/route.ts b/ui/src/app/api/messages/batch/route.ts index 8639fb4..6af8700 100644 --- a/ui/src/app/api/messages/batch/route.ts +++ b/ui/src/app/api/messages/batch/route.ts @@ -1,13 +1,10 @@ import { createReadStream, statSync } from "fs"; -import msgpack from "msgpack-lite"; import { NextResponse } from "next/server"; -import readline from 'readline'; +import readline from "readline"; import { IServerMessage } from "@/components/Graph/types"; import { messagesPath } from "../../utils"; -const MESSAGE_BUFFER_IN_MS = 200; - async function findStartPosition(filePath: string, targetTimestamp: number) { const fileSize = statSync(filePath).size; let left = 0; @@ -29,17 +26,20 @@ async function findStartPosition(filePath: string, targetTimestamp: number) { return bestPosition; } -function getTimestampAtPosition(filePath: string, position: number): Promise { +function getTimestampAtPosition( + filePath: string, + position: number, +): Promise { return new Promise((resolve, reject) => { const stream = createReadStream(filePath, { start: position }); let foundNewLine = false; let adjustedPosition = position; // Read a few bytes to find the newline character - stream.on('data', (chunk) => { + stream.on("data", (chunk) => { const decoded = chunk.toString("utf8"); for (let i = 0; i < decoded.length; i++) { - if (decoded[i] === '\n') { + if (decoded[i] === "\n") { foundNewLine = true; adjustedPosition += i + 1; // Move to the start of the next line break; @@ -49,151 +49,141 @@ function getTimestampAtPosition(filePath: string, position: number): Promise { + stream.on("close", () => { if (foundNewLine) { // Now use readline to get the timestamp from the new line - const lineStream = createReadStream(filePath, { start: adjustedPosition }); + const lineStream = createReadStream(filePath, { + start: adjustedPosition, + }); const rl = readline.createInterface({ input: lineStream, crlfDelay: Infinity, }); - rl.on('line', (line) => { + rl.on("line", (line) => { const message: IServerMessage = JSON.parse(line); const timestamp = message.time / 1_000_000; rl.close(); resolve(timestamp); }); - rl.on('error', (err) => { + rl.on("error", (err) => { reject(err); }); } else { - reject(new Error("Could not find a newline character in the provided range")); + reject( + new Error("Could not find a newline character in the provided range"), + ); } }); - stream.on('error', (err) => { + stream.on("error", (err) => { reject(err); }); }); } -export async function GET(req: Request) { +export async function GET(req: Request, res: Response) { try { const url = new URL(req.url); - const currentTime = parseInt(url.searchParams.get("time") || "", 10); + const startTime = parseInt(url.searchParams.get("startTime") || ""); + const speed = parseInt(url.searchParams.get("speed") || ""); - if (isNaN(currentTime)) { - return new NextResponse("Invalid currentTime parameter", { status: 400 }); + if (isNaN(startTime)) { + return new NextResponse(null, { status: 400, statusText: "Invalid currentTime parameter" }); } - const startPosition = await findStartPosition(messagesPath, currentTime); - const fileStream = createReadStream(messagesPath, { encoding: "utf8", start: startPosition }); + if (isNaN(speed)) { + return new NextResponse(null, { status: 400, statusText: "Invalid speed parameter" }); + } + + const startPosition = await findStartPosition(messagesPath, startTime); + const fileStream = createReadStream(messagesPath, { + encoding: "utf8", + start: startPosition, + }); const rl = readline.createInterface({ input: fileStream, - crlfDelay: Infinity - }) - + crlfDelay: Infinity, + }); + + let initialEventTime: number | null = null; + let interval: Timer | undefined; + const startTimeOnServer = Date.now(); + const eventBuffer: { line: string; timeToSend: number }[] = []; + let rlClosed = false; + const stream = new ReadableStream({ + cancel() { + clearInterval(interval); + rl.close(); + }, + start(controller) { - rl.on("line", (line) => { + const processEventBuffer = () => { + const now = Date.now(); + + while (eventBuffer.length > 0) { + const { line, timeToSend } = eventBuffer[0]; + if (timeToSend <= now) { + // Send the event to the client + controller.enqueue(`data: ${line}\n\n`); + eventBuffer.shift(); + } else { + // Next event isn't ready yet + break; + } + } + + // Close the stream if all events have been sent and the file has been fully read + if (eventBuffer.length === 0 && rlClosed) { + clearInterval(interval); + controller.close(); + } + }; + + interval = setInterval(processEventBuffer, 50); + + const processLine = (line: string) => { try { const message: IServerMessage = JSON.parse(line); - const aboveLowerLimit = message.time / 1_000_000 >= currentTime; - const underUpperLimit = message.time / 1_000_000 < currentTime + MESSAGE_BUFFER_IN_MS; - - // Check if the message falls within the time range - if (aboveLowerLimit && underUpperLimit) { - controller.enqueue(msgpack.encode(message)) - } + const eventTime = message.time; // Timestamp in nanoseconds - // Free up resources if we're already pass the buffer window. - if (message.time / 1_000_000 >= currentTime + MESSAGE_BUFFER_IN_MS) { - rl.close(); - fileStream.destroy(); + if (initialEventTime === null) { + initialEventTime = eventTime; } + + const deltaTime = eventTime - initialEventTime; // Difference in nanoseconds + const adjustedDelay = (deltaTime / 1_000_000) * speed; // Convert to ms and apply multiplier + + const timeToSend = startTimeOnServer + adjustedDelay; + + eventBuffer.push({ line, timeToSend }); } catch (error) { - // Handle JSON parse errors or other issues controller.error(error); } + }; + + rl.on("line", (line) => { + processLine(line); }); - + rl.on("close", () => { - controller.close(); + console.log('rl closed') }); - + rl.on("error", (error) => { controller.error(error); + console.log('rl error') }); - } + }, }); - // const stream = new ReadableStream( - // { - // start(controller) { - // start = performance.now(); - // fileStream.on("data", (chunk) => { - // buffer += chunk; - // let lines = buffer.split("\n"); - // buffer = lines.pop() || ""; // Keep the last incomplete line - - // for (const line of lines) { - // if (!line.trim()) continue; - - // try { - // const message: IServerMessage = JSON.parse(line); - // const aboveLowerLimit = message.time / 1_000_000 >= currentTime - halfRange; - // const underUpperLimit = message.time / 1_000_000 < currentTime + halfRange; - - // // Check if the message falls within the time range - // if (aboveLowerLimit && underUpperLimit && !sent) { - // // Stream the message if it matches the time range - // controller.enqueue(new TextEncoder().encode(JSON.stringify(message) + "\n")); - // sent = true; - // } - // } catch (error) { - // console.error("Error parsing JSON line:", error); - // controller.error(new Error("Error parsing JSON line")); - // fileStream.destroy(); - // break; - // } - // } - // }); - - // fileStream.on("end", () => { - // end = performance.now(); - // console.log(end - start); - // if (buffer.trim()) { - // try { - // const message = JSON.parse(buffer); // Parse the last incomplete line - // controller.enqueue( - // new TextEncoder().encode(JSON.stringify(message) + "\n"), - // ); - // } catch (error) { - // console.error("Error parsing final JSON line:", error); - // controller.error(new Error("Error parsing final JSON line")); - // } - // } - // controller.close(); - // }); - - // fileStream.on("error", (error) => { - // console.error("File stream error:", error); - // controller.error(new Error("File stream error")); - // }); - // }, - // }, - // { - // highWaterMark: 100_000, - // }, - // ); - - // Return a streaming response return new NextResponse(stream, { headers: { - "Content-Type": "application/jsonl", - "Transfer-Encoding": "chunked", + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive", }, }); } catch (e) { diff --git a/ui/src/app/api/topography/route.ts b/ui/src/app/api/topography/route.ts deleted file mode 100644 index 312fa51..0000000 --- a/ui/src/app/api/topography/route.ts +++ /dev/null @@ -1,22 +0,0 @@ -import { parse } from "@iarna/toml"; -import { createReadStream } from 'fs'; -import { NextResponse } from 'next/server'; -import path from 'path'; - -import { ILink, INode } from "@/components/Graph/types"; - -export async function GET(req: Request) { - try { - const filePath = path.resolve(__dirname, "../../../../../../../sim-rs/test_data/realistic.toml"); - const fileStream = createReadStream(filePath, { encoding: "utf8", highWaterMark: 100_000 }); - const result = await parse.stream(fileStream) as unknown as { links: ILink[]; nodes: INode[] }; - - // Return a streaming response - return NextResponse.json(result); - } catch (e) { - return new NextResponse(null, { - status: 500, - statusText: (e as Error)?.message - }) - } -} diff --git a/ui/src/app/api/transactions/last/route.ts b/ui/src/app/api/transactions/last/route.ts deleted file mode 100644 index 03a1a4d..0000000 --- a/ui/src/app/api/transactions/last/route.ts +++ /dev/null @@ -1,75 +0,0 @@ -import { EMessageType, IServerMessage } from "@/components/Graph/types"; -import { closeSync, openSync, readSync, statSync } from "fs"; -import { NextResponse } from "next/server"; -import { messagesPath } from "../../utils"; - -async function getLastTransactionReceived(filePath: string, bufferSize = 1024): Promise { - return new Promise((resolve, reject) => { - const fileSize = statSync(filePath).size; - if (fileSize === 0) { - return reject(new Error("File is empty")); - } - - const fileDescriptor = openSync(filePath, 'r'); - let buffer = Buffer.alloc(bufferSize); - let position = fileSize; - let lastLine = ""; - let foundLastTransactionReceived = false; - - while (position > 0 && !foundLastTransactionReceived) { - // Calculate how many bytes to read - const bytesToRead = Math.min(bufferSize, position); - position -= bytesToRead; - - // Read from the file - readSync(fileDescriptor, new Uint8Array(buffer.buffer), 0, bytesToRead, position); - const chunk = buffer.toString('utf8', 0, bytesToRead); - - // Search for the last newline character - const lines = chunk.split(/\n/).reverse(); - for (const line of lines) { - if (!line) { - continue; - } - - try { - const message: IServerMessage = JSON.parse(line); - if (message.message.type === EMessageType.TransactionReceived) { - lastLine = line; - foundLastTransactionReceived = true; - break; - } - } catch (e) { - console.log(`Could not parse: ${line}`) - } - } - - position -= bytesToRead; - } - - closeSync(fileDescriptor); - - if (!foundLastTransactionReceived && lastLine.length === 0) { - return reject(new Error("Could not find any complete line in the file")); - } - - if (!lastLine) { - reject("Could not find the last transaction.") - } else { - resolve(lastLine.trim()); - } - }); -} - -export async function GET() { - try { - const line = await getLastTransactionReceived(messagesPath); - console.log(line) - const data: IServerMessage = JSON.parse(line); - return NextResponse.json(data); - } catch(e) { - return new NextResponse(null, { - status: 500 - }) - } -} diff --git a/ui/src/app/page.tsx b/ui/src/app/page.tsx index cc19031..5df2625 100644 --- a/ui/src/app/page.tsx +++ b/ui/src/app/page.tsx @@ -1,16 +1,17 @@ -import { Graph } from "@/components/Graph/Graph"; +import { GraphWrapper } from "@/components/Graph/GraphWapper"; import Image from "next/image"; +import { getSetSimulationMaxTime, getSimulationTopography } from "./queries"; export default async function Home() { - // const [messages, topography] = await Promise.all([ - // getMessages(), - // getTopography() - // ]) + const [maxTime, topography] = await Promise.all([ + getSetSimulationMaxTime(), + getSimulationTopography(), + ]); return (

- +