From f94b6596db59908ae11ee9987360e840a702824e Mon Sep 17 00:00:00 2001 From: LimJiaYan Date: Sun, 20 Oct 2024 02:20:07 +0800 Subject: [PATCH 1/2] Integrate matching service frontend with backend: - Update WebSocket to accept an additional argument: username. - Increase queueing time from 10 seconds to 30 seconds. --- .../controllers/matchingController.js | 164 +++++++++++++++--- backend/matching-service/index.js | 48 +++-- backend/matching-service/package.json | 11 +- .../app/(authenticated)/questions/page.tsx | 153 ++++++++++++++-- 4 files changed, 318 insertions(+), 58 deletions(-) diff --git a/backend/matching-service/controllers/matchingController.js b/backend/matching-service/controllers/matchingController.js index d66cf42841..0d55c69170 100644 --- a/backend/matching-service/controllers/matchingController.js +++ b/backend/matching-service/controllers/matchingController.js @@ -1,5 +1,8 @@ // controllers/matchingController.js const { Kafka } = require('kafkajs'); +const EventEmitter = require('events'); +const QUEUE_TIME = 30000; +const BATCH_INTERVAL = 10000; // Kafka setup const kafka = new Kafka({ @@ -7,55 +10,169 @@ const kafka = new Kafka({ brokers: ['kafka:9092'], // 'kafka' is the service name from docker-compose }); +process.on('SIGTERM', async () => { + console.log('Shutting down...'); + await producer.disconnect(); + await consumer.disconnect(); + process.exit(0); + }); + const producer = kafka.producer(); const consumer = kafka.consumer({ groupId: 'matching-group' }); -// Produce a message to Kafka (used in the POST /produce route) -const produceMessage = async (req, res) => { - const { message } = req.body; +(async () => { try { await producer.connect(); - await producer.send({ - topic: 'test-topic', - messages: [{ value: message }], + await consumer.connect(); + } catch(error) { + console.error(error) + } +})(); + +const eventEmitter = new EventEmitter(); + +const matchmakeUser = async (userId, username, questions) => { + return new Promise((resolve, reject) => { + produceMessage({ + userId: userId, + username: username, + questions: questions + }, false); + eventEmitter.once(`success-${userId}`, (peerUserId, peerUsername) => { + const message = { + status: 'success', + userId: userId, + peerUserId: peerUserId, + peerUsername: peerUsername, + message: `User ${userId} matched with User ${peerUserId} (username: ${peerUsername}).` + }; + + resolve(JSON.stringify(message)); + }); + eventEmitter.once(`dequeue-${userId}`, () => { + const message = { + status: 'dequeue', + userId: userId, + message: `User ${userId} dequeued from matchmaking.` + }; + + resolve(JSON.stringify(message)); + }); + setTimeout(() => { + const message = { + status: 'timeout', + userId: userId, + message: `No matches for ${userId}.` + }; + + reject(JSON.stringify(message)); + }, QUEUE_TIME); + }) +} + +// Produce a message to Kafka (used in the POST /produce route) +const produceMessage = async (request, isRequeue = false) => { + const msg = { + userId: request.userId, + username: request.username, + questions: request.questions, + enqueueTime: isRequeue ? request.enqueueTime : Date.now() + } + const stringifiedMsg = JSON.stringify(msg) + const message = { + topic: 'test-topic', + messages: [ + {value: stringifiedMsg} + ], + } + try { + // await producer.connect(); + await producer.send(message).then(() => { + console.log(`Enqueued message: ${stringifiedMsg}`) }); - await producer.disconnect(); - res.status(200).send(`Message produced: ${message}`); + // await producer.disconnect(); } catch (error) { console.error('Error producing message:', error); - res.status(500).send('Failed to produce message'); } }; // Produce a startup message when the service starts const produceStartupMessage = async () => { try { - await producer.connect(); + // await producer.connect(); const message = 'Hello from producer'; - await producer.send({ - topic: 'test-topic', - messages: [{ value: message }], - }); + // await producer.send({ + // topic: 'test-topic', + // messages: [{ value: message }], + // }); console.log(`Produced startup message: ${message}`); - await producer.disconnect(); + // await producer.disconnect(); } catch (error) { console.error('Error producing startup message:', error); } }; +let batch = []; +const batchProcess = () => { + if (batch.length == 0) { + console.log("No messages to process in this batch cycle."); + return; + } + batch.sort((A, B) => A.questions.length - B.questions.length); + console.log(`sorted batch is`, batch); + let questionDict = new Map(); + let unmatchedUsers = new Map(); + batch.forEach((user) => { + unmatchedUsers.set(user.userId, user); + }); + for (const user of batch) { + if (Date.now() - user.enqueueTime >= QUEUE_TIME) { + // User has timed out. + // TODO: send timeout event emitter. + unmatchedUsers.delete(user.userId); + continue; + } + if (!unmatchedUsers.has(user.userId)) { + // User has already been matched. + continue; + } + + user.questions.forEach((question) => { + const peerUser = questionDict.get(question); + // Note: UserId cannot be 0, + // since 0 is falsy and would break this if-conditional. + if (peerUser && unmatchedUsers.has(peerUser.userId)) { + // Found match!! + eventEmitter.emit(`success-${user.userId}`, peerUser.userId, peerUser.username) + eventEmitter.emit(`success-${peerUser.userId}`, user.userId, user.username) + unmatchedUsers.delete(user.userId); + unmatchedUsers.delete(peerUser.userId); + } else { + // Else, keep looking + questionDict.set(question, user) + } + }) + } + for (const [key, user] of unmatchedUsers) { + produceMessage(user, true) + console.log(`User ${key} returned to queue.`) + } + batch = []; +}; + // Start consuming messages from Kafka const runConsumer = async () => { try { - await consumer.connect(); + // await consumer.connect(); + // await consumer.subscribe({ topic: 'test-topic', fromBeginning: true }); await consumer.subscribe({ topic: 'test-topic', fromBeginning: true }); + setInterval(batchProcess, BATCH_INTERVAL); + await consumer.run({ eachMessage: async ({ topic, partition, message }) => { - console.log({ - partition, - offset: message.offset, - value: message.value.toString(), - }); + const parsedJsonMsg = JSON.parse(message.value); + batch.push(parsedJsonMsg); }, }); } catch (error) { @@ -64,7 +181,6 @@ const runConsumer = async () => { }; module.exports = { - produceMessage, - produceStartupMessage, runConsumer, -}; + matchmakeUser +}; \ No newline at end of file diff --git a/backend/matching-service/index.js b/backend/matching-service/index.js index f0357fb44c..47e9af4b6d 100644 --- a/backend/matching-service/index.js +++ b/backend/matching-service/index.js @@ -1,25 +1,35 @@ -// index.js -const express = require('express'); -const kafkaRoutes = require('./routes/matchingRoutes'); -const { produceStartupMessage, runConsumer } = require('./controllers/matchingController'); +const WebSocket = require("ws"); -// Create an Express app -const app = express(); -const port = process.env.PORT || 3002; +const wss = new WebSocket.Server({port: 3002}); +const { matchmakeUser, runConsumer, dequeueUser} = require("./controllers/matchingController"); -// Middleware to parse JSON bodies -app.use(express.json()); +console.log("Started Websocket server!!!"); -// Use the Kafka routes -app.use('/matching', kafkaRoutes); +runConsumer().catch(console.error); -// Start the Express server and Kafka consumer -app.listen(port, async () => { - console.log(`Matching service running on port ${port}`); +wss.on("connection", (ws) => { + console.log("New Client Connected"); + ws.send("Welcome to websocket server"); - // Produce a message on startup - await produceStartupMessage(); + ws.on('message', async (msg) => { + // console.log(`Received message: ${msg}`); + msg = JSON.parse(msg) + if (msg.event == "enqueue") { + let res; + // console.log(`User ${msg.userId} has been enqueued.`) + try { + res = await matchmakeUser(msg.userId, msg.username, msg.questions) + } catch (failure) { + res = failure + } + ws.send(res) + ws.close() + } else if (msg.event == "dequeue") { + console.log("User has been dequeued") + } + }); - // Start consuming messages - runConsumer().catch(console.error); -}); + ws.on("close", () => { + console.log("Client has disconnected") + }); +}) \ No newline at end of file diff --git a/backend/matching-service/package.json b/backend/matching-service/package.json index 4bb3c68b82..734fe0391f 100644 --- a/backend/matching-service/package.json +++ b/backend/matching-service/package.json @@ -3,14 +3,19 @@ "version": "1.0.0", "main": "index.js", "scripts": { - "test": "echo \"Error: no test specified\" && exit 1" + "test": "echo \"Error: no test specified\" && exit 1", + "dev": "nodemon index.js" }, "dependencies": { "express": "^4.18.2", - "kafkajs": "^2.0.0" + "kafkajs": "^2.0.0", + "ws": "^8.18.0" }, "keywords": [], "author": "", "license": "ISC", - "description": "" + "description": "", + "devDependencies": { + "nodemon": "^3.1.7" + } } \ No newline at end of file diff --git a/frontend/app/(authenticated)/questions/page.tsx b/frontend/app/(authenticated)/questions/page.tsx index c4bb82bb9b..5a7f2c2804 100644 --- a/frontend/app/(authenticated)/questions/page.tsx +++ b/frontend/app/(authenticated)/questions/page.tsx @@ -9,7 +9,7 @@ import { ScrollArea } from "@/components/ui/scroll-area"; import { cn } from "@/lib/utils"; import { Check, ChevronsRight, Flag, MessageSquareText, Plus, X } from "lucide-react"; import { useRouter } from "next/navigation"; -import React, { useCallback, useEffect, useRef, useState } from "react"; +import React, { use, useCallback, useEffect, useRef, useState } from "react"; import { Dialog, DialogContent, DialogDescription, DialogHeader, DialogTitle, DialogTrigger } from "@/components/ui/dialog"; type Question = { @@ -72,6 +72,10 @@ export default function Questions() { const matchTimerRef = useRef(null); const [isMatchFoundDialogOpen, setMatchFoundDialogOpen] = useState(false); const [isMatchFailDialogOpen, setMatchFailDialogOpen] = useState(false); + const [matchResult, setMatchResult] = useState({ id: '', username: ''}) + const timeout = useRef(false); + const selectedQuestionList = React.useRef([]) + const userInfo = useRef({ id: "", username: ""}); // authenticate user else redirect them to login page useEffect(() => { @@ -99,13 +103,7 @@ export default function Questions() { } const data = (await response.json()).data; - - // if needed - // setUsername(data.username); - // setEmail(data.email); - // form.setValue("username", data.username); - // form.setValue("email", data.email); - // userId.current = data.id; + userInfo.current = { id: data.id, username: data.username}; } catch (error) { console.error('Error during authentication:', error); router.push('/login'); // Redirect to login in case of any error @@ -185,6 +183,10 @@ export default function Questions() { const newIsSelectAll = !isSelectAll; setIsSelectAll(newIsSelectAll); + const arr: number[] = newIsSelectAll + ? filteredQuestions.map((f_qns) => f_qns.id) + : []; + // Toggle selection of all questions const updatedQuestions = questionList.map((question) => filteredQuestions.map((f_qns) => f_qns.id).includes(question.id) @@ -194,17 +196,24 @@ export default function Questions() { } : question ); + selectedQuestionList.current = arr; setQuestionList(updatedQuestions); }; // Function to handle individual question selection const handleSelectQuestion = (id: number) => { + const updatedQuestions = questionList.map((question) => question.id === id ? { ...question, selected: !question.selected } : question ); + // Update the selected questions list (collect the IDs of selected questions) + const selectedQuestions = updatedQuestions + .filter((question) => question.selected) // Only keep selected questions + .map((question) => question.id); // Extract their IDs setQuestionList(updatedQuestions); + selectedQuestionList.current = selectedQuestions; }; useEffect(() => { @@ -231,17 +240,129 @@ export default function Questions() { console.log("Selected complexities:", selectedComplexities); }, [selectedComplexities]); // This effect runs every time selectedcomplexities change - const handleMatch = useCallback(() => { + const handleMatch1 = useCallback(() => { setIsMatching(prev => !prev); setIsHovering(false); }, []); + const ws = useRef(null); // WebSocket reference + const handleMatch = useCallback(() => { + setIsMatching(prev => !prev) + setIsHovering(false); + + if (ws.current === null || ws.current.readyState === WebSocket.CLOSED || ws.current.readyState === WebSocket.CLOSING) { + console.log("Connecting to web socket for matching service ...") + // Initialize WebSocket connection if not already matching + ws.current = new WebSocket(process.env.NEXT_PUBLIC_MATCHING_API_URL || 'ws://localhost:3002/matching'); + } + + ws.current.onopen = () => { + console.log("WebSocket connection opened. Now sending msg to WebSocket."); + + const message = { + event: "enqueue", + userId: userInfo.current.id, + username: userInfo.current.username, + questions: selectedQuestionList.current, + }; + + ws.current?.send(JSON.stringify(message)); + console.log("Sent matching request to web socket:", message); + }; + + ws.current.onmessage = (event) => { + if (event.data == "Welcome to websocket server") { + console.log("receive welcome msg from websocket server") + return ; + } + // // Handle successful match + // if (message.startsWith("User") && message.includes("matched with User")) { + // // Extract userId and peerUserId from the message (if needed for display purposes) + // const matchDetails = message.match(/User (\w+) matched with User (\w+) \(username: ([\w\s]+)\)/); + // console.log("match detail", matchDetails); + // const userId = matchDetails[1]; + // const peerUserId = matchDetails[2]; + // const peerUserName = + + // // Set match result in state (adjust as needed) + // setMatchResult({ id: peerUserId, username: "" }); + // setMatchFoundDialogOpen(true); // Open match found dialog + // } else if (message.startsWith("No matches for")) { // Handle timeout/no match found message + // console.log("No matches found for the user."); + // setMatchFailDialogOpen(true); // Open match failed dialog + // } else { // Handle unexpected messages + // console.warn("Unexpected message received:", message); + // setMatchFailDialogOpen(true); + // } + const message = JSON.parse(event.data); + console.log("message receive from websocket", message); + + // Handle different message statuses + switch (message.status) { + case 'success': + console.log(`User ${message.userId} matched with User ${message.peerUserId} (username: ${message.peerUsername}).`); + setMatchResult({ id: message.peerUserId, username: message.peerUsername }); + setMatchFoundDialogOpen(true); + break; + + case 'dequeue': + console.log(`User ${message.userId} dequeued from matchmaking.`); + break; + + case 'timeout': + console.log(`No matches for user ${message.userId}.`); + setMatchFailDialogOpen(true); + break; + + default: + console.warn("Unexpected message received:", message); + setMatchFailDialogOpen(true); + break; + } + } + + ws.current.onclose = () => { + console.log("WebSocket connection closed"); + setIsMatching(false); + }; + + ws.current.onerror = (error) => { + console.error("WebSocket error:", error); + setIsMatching(false); + setMatchFailDialogOpen(true); // Show failure dialog if there's an error + }; + }, []); + + const handleCancel = useCallback(() => { + setIsMatching(false); + if (ws.current === null || ws.current.readyState === WebSocket.CLOSED || ws.current.readyState === WebSocket.CLOSING) { + console.log("Connecting to web socket for matching service ...") + // Initialize WebSocket connection if not already matching + ws.current = new WebSocket(process.env.NEXT_PUBLIC_MATCHING_API_URL || 'ws://localhost:3002/matching'); + console.log(ws.current.readyState) + } + ws.current.onopen = () => { + console.log("WebSocket connection opened"); + + const message = { + event: "dequeue", + userId: userInfo.current.id, + username: userInfo.current.username, + questions: [], + }; + + ws.current?.send(JSON.stringify(message)); + }; + }, []) + useEffect(() => { + timeout.current = false; if (isMatching) { setMatchTime(0); matchTimerRef.current = setInterval(() => { setMatchTime((prevTime) => { if (prevTime >= 32) { // we use 32 so there is buffer + timeout.current = true; clearInterval(matchTimerRef.current as NodeJS.Timeout); setMatchFailDialogOpen(true); // setMatchFoundDialogOpen(true); // use this to open match found dialog @@ -454,15 +575,23 @@ export default function Questions() {