From 12fff4abc107ccdeffcb2d692abed51aeb712f78 Mon Sep 17 00:00:00 2001 From: Ruchir28 Date: Thu, 23 May 2024 12:01:53 +0530 Subject: [PATCH 1/3] Added Mediasoup for WebRTC calls --- apps/frontend/package.json | 1 + apps/frontend/src/App.tsx | 4 + apps/frontend/src/hooks/useSfu.ts | 93 ++++ apps/frontend/src/screens/Meet.tsx | 125 +++++ apps/frontend/src/utils/MessageDispatcher.ts | 21 + apps/frontend/src/utils/SfuService.ts | 499 +++++++++++++++++++ apps/sfu/package.json | 22 + apps/sfu/src/Room.ts | 467 +++++++++++++++++ apps/sfu/src/RoomManager.ts | 48 ++ apps/sfu/src/User.ts | 40 ++ apps/sfu/src/index.ts | 94 ++++ apps/sfu/tsconfig.json | 109 ++++ 12 files changed, 1523 insertions(+) create mode 100644 apps/frontend/src/hooks/useSfu.ts create mode 100644 apps/frontend/src/screens/Meet.tsx create mode 100644 apps/frontend/src/utils/MessageDispatcher.ts create mode 100644 apps/frontend/src/utils/SfuService.ts create mode 100644 apps/sfu/package.json create mode 100644 apps/sfu/src/Room.ts create mode 100644 apps/sfu/src/RoomManager.ts create mode 100644 apps/sfu/src/User.ts create mode 100644 apps/sfu/src/index.ts create mode 100644 apps/sfu/tsconfig.json diff --git a/apps/frontend/package.json b/apps/frontend/package.json index 51d2eef1..7d207831 100644 --- a/apps/frontend/package.json +++ b/apps/frontend/package.json @@ -21,6 +21,7 @@ "clsx": "^2.1.0", "history": "^5.3.0", "lucide-react": "^0.372.0", + "mediasoup-client": "^3.7.8", "react": "^18.2.0", "react-confetti": "^6.1.0", "react-dom": "^18.2.0", diff --git a/apps/frontend/src/App.tsx b/apps/frontend/src/App.tsx index e0a1abfa..b28e2658 100644 --- a/apps/frontend/src/App.tsx +++ b/apps/frontend/src/App.tsx @@ -8,6 +8,7 @@ import { RecoilRoot } from 'recoil'; import { useUser } from '@repo/store/useUser'; import { Loader } from './components/Loader'; import { Layout } from './layout'; +import Meet from './screens/Meet'; function App() { return ( @@ -35,6 +36,9 @@ function AuthApp() { path="/game/:gameId" element={} />} /> + }>} + /> ); diff --git a/apps/frontend/src/hooks/useSfu.ts b/apps/frontend/src/hooks/useSfu.ts new file mode 100644 index 00000000..6056819a --- /dev/null +++ b/apps/frontend/src/hooks/useSfu.ts @@ -0,0 +1,93 @@ +import { useEffect, useRef, useState } from 'react'; +import { SfuService, UserConsumer } from '../utils/SfuService'; +import { Producer } from 'mediasoup-client/lib/types'; +import { User } from '../../../../packages/store/src/atoms/user'; + +export const useSfu = ( + roomId: string, + user: User | null, +) => { + + const sfuService = useRef(null); + const [videoProducer, setVideoProducer] = useState(); + const [audioProducer, setAudioProducer] = useState(); + const [consumers, setConsumers] = useState([]); + + const handleNewConsumer = (consumer: UserConsumer) => { + setConsumers((prevConsumers) => [ + ...prevConsumers.filter((c) => c.id !== consumer.id), + consumer, + ]); + }; + + const handleConsumerClosed = (userId: string) => { + setConsumers((prevConsumers) => { + const new_consumers = prevConsumers.filter((consumer) => consumer.id !== userId); + return new_consumers; + }); + }; + + const handleTransportConnected = () => { + startConsuming(); + startProducing(); + } + + useEffect(() => { + + if(!roomId || !user) { + console.error('RoomId or User not provided'); + return; + } + + if (!sfuService.current) { + sfuService.current = new SfuService(roomId, user); + } + + sfuService.current.on(SfuService.NEW_CONSUMER_EVENT, handleNewConsumer); + sfuService.current.on(SfuService.CONSUMER_CLOSED_EVENT, handleConsumerClosed); + sfuService.current.on(SfuService.TRANSPORT_CONNECTED_EVENT, handleTransportConnected); + + return () => { + if (sfuService.current) { + console.log('CALLED CLOSE'); + sfuService.current.cleanUp(); + sfuService.current.off(SfuService.NEW_CONSUMER_EVENT, handleNewConsumer); + sfuService.current.off(SfuService.CONSUMER_CLOSED_EVENT, handleConsumerClosed); + sfuService.current.off(SfuService.TRANSPORT_CONNECTED_EVENT,handleTransportConnected); + sfuService.current = null; + } + }; + }, [roomId, user ? user.token : undefined]); + + + const startProducing = async () => { + if (!sfuService.current) { + console.error('Sfu Service not initialized'); + return; + } + try { + const { audioProducer, videoProducer } = + await sfuService.current.createProducer(); + if (audioProducer) { + setAudioProducer(audioProducer); + } + if (videoProducer) { + setVideoProducer(videoProducer); + } + } catch (error) { + console.error('Error creating producer', error); + } + }; + + const startConsuming = async () => { + if (!sfuService.current) return; + const consumer = await sfuService.current.createConsumer(); + setConsumers([...consumer]); + }; + + return { + videoProducer, + audioProducer, + consumers, + }; +}; diff --git a/apps/frontend/src/screens/Meet.tsx b/apps/frontend/src/screens/Meet.tsx new file mode 100644 index 00000000..2068729d --- /dev/null +++ b/apps/frontend/src/screens/Meet.tsx @@ -0,0 +1,125 @@ +import { useUser } from '@repo/store/useUser'; +import { useSfu } from '../hooks/useSfu'; +import { useEffect, useRef, useState } from 'react'; +import { useParams, useNavigate } from 'react-router-dom'; + +const Meet = () => { + + const user = useUser(); + + const videoRef = useRef(null); + const navigate = useNavigate(); + // consumers videos ref + + const {roomId} = useParams(); + + useEffect(() => { + if(!user) { + navigate('/login'); + } + },[user]); + + const { + videoProducer, + audioProducer, + consumers, + } = useSfu(roomId!, user); + + + useEffect(() => { + if (!videoRef.current) return; + const stream = new MediaStream(); + if(videoProducer && videoProducer.track) { + stream.addTrack(videoProducer.track); + } + if(audioProducer && audioProducer.track) { + stream.addTrack(audioProducer.track); + } + videoRef.current.srcObject = stream; + }, [videoProducer,audioProducer]); + + if (!user) { + return
Redirecting to Login ...
; + } + + return ( +
+

Meeting

+
+

Producer

+ { + audioProducer?.pause(); + videoProducer?.pause(); + }} + onPlay={() => { + videoProducer?.resume(); + audioProducer?.resume(); + }} + /> +
+
+

Consumers

+ {consumers.map((consumer, index) => ( + console.log('CONSUMER in here', consumer), +
+

Consumer {index + 1} is active

+ { + consumer.audioConsumer?.pause(); + consumer.videoConsumer?.pause(); + }} + onPlay={() => { + consumer.audioConsumer?.resume(); + consumer.videoConsumer?.resume(); + }} + /> +
+ ))} +
+
+ ); +}; + +const VideoPlayer: React.FC<{ + videoTrack: MediaStreamTrack | null; + audioTrack: MediaStreamTrack | null; + onPlay: () => void; + onPause: () => void; +}> = (props) => { + const videoRef = useRef(null); + + useEffect(() => { + if (videoRef.current) { + const stream = new MediaStream(); + if (props.audioTrack) { + stream.addTrack(props.audioTrack); + } + if (props.videoTrack) { + stream.addTrack(props.videoTrack); + } + videoRef.current.srcObject = stream; + videoRef.current.onplay = props.onPlay; + videoRef.current.onpause = props.onPause; + } + }); + + return ( +
+ +
+ ); +}; + +export default Meet; diff --git a/apps/frontend/src/utils/MessageDispatcher.ts b/apps/frontend/src/utils/MessageDispatcher.ts new file mode 100644 index 00000000..b94e5240 --- /dev/null +++ b/apps/frontend/src/utils/MessageDispatcher.ts @@ -0,0 +1,21 @@ +class MessageDispatcher { + private handlers: Map void)[]> = new Map(); + + registerHandler(type: string, handler: (payload: any) => void): () => void { + const existing = this.handlers.get(type) || []; + existing.push(handler); + this.handlers.set(type, existing); + // Return a function to deregister the handler + return () => this.handlers.set(type, existing.filter((h) => h !== handler)); + } + + dispatch(event: MessageEvent) { + const message = JSON.parse(event.data); + console.log('Received message:', message); + const handler = this.handlers.get(message.type); + if (handler) { + handler.forEach((h) => h(message.payload)); + } + } + } + export default MessageDispatcher; \ No newline at end of file diff --git a/apps/frontend/src/utils/SfuService.ts b/apps/frontend/src/utils/SfuService.ts new file mode 100644 index 00000000..18bf43d3 --- /dev/null +++ b/apps/frontend/src/utils/SfuService.ts @@ -0,0 +1,499 @@ +import { + Consumer, + Device, + Producer, + Transport, +} from 'mediasoup-client/lib/types'; +import MessageDispatcher from './MessageDispatcher'; +import { EventEmitter } from 'events'; +import { User } from '../../../../packages/store/src/atoms/user'; + +export interface UserConsumer { + id: string; + videoConsumer: Consumer<{userId: string}> | null; + audioConsumer: Consumer<{userId: string}> | null; +} + +export class SfuService extends EventEmitter { + roomId: string; + webSocket: WebSocket; + device: Device; + senderTransport: Transport<{userId: string}> | null; + receiverTransport: Transport<{userId: string}> | null; + videoProducer: Producer<{userId: string}> | null; + audioProducer: Producer<{userId: string}> | null; + producersToConsume: {id: string, userId: string}[]; + consumers: Map; + user: { id: string; token: string; name: string }; + messageDispatcher: MessageDispatcher; + + + constructor( + roomId: string, + user: User, + ) { + super(); + this.roomId = roomId; + this.webSocket = new WebSocket(`ws://localhost:8081?token=${user.token}`); + this.device = new Device(); + this.senderTransport = null; + this.receiverTransport = null; + this.videoProducer = null; + this.audioProducer = null; + this.consumers = new Map(); + this.user = user; + this.messageDispatcher = new MessageDispatcher(); + this.producersToConsume = []; + this.addHandlers(); + } + + static NEW_CONSUMER_EVENT = 'newConsumer'; + static CONSUMER_CLOSED_EVENT = 'consumerClosed'; + static TRANSPORT_CONNECTED_EVENT = 'transportConnected'; + + + addHandlers() { + this.webSocket.onmessage = (event) => { + this.messageDispatcher.dispatch(event); + }; + this.webSocket.onopen = () => { + console.log('Connected to the server'); + this.webSocket.send( + JSON.stringify({ + type: 'JOIN_ROOM', + payload: { + roomId: this.roomId, + }, + }), + ); + }; + + this.messageDispatcher.registerHandler( + 'WEBRTC_TRANSPORT', + async (payload) => { + const { id, iceParameters, iceCandidates, dtlsParameters } = + payload.sender; + + // Load this device + await this.device.load({ + routerRtpCapabilities: payload.routerRtpCapabilities, + }); + + const senderTransport = this.device.createSendTransport({ + id, + iceParameters, + iceCandidates, + dtlsParameters, + appData: { + userId: this.user.id, + }, + }); + + const receiverTransPort = this.device.createRecvTransport({ + id: payload.receiver.id, + iceParameters: payload.receiver.iceParameters, + iceCandidates: payload.receiver.iceCandidates, + dtlsParameters: payload.receiver.dtlsParameters, + appData: { + userId: this.user.id, + }, + }); + this.producersToConsume = payload.producers; // producers to server as consumers in client + this.senderTransport = senderTransport; + this.receiverTransport = receiverTransPort; + this.setUpSenderTransport(); + this.setUpReceiverTransport(); + this.emit(SfuService.TRANSPORT_CONNECTED_EVENT); + }, + ); + + this.messageDispatcher.registerHandler("ROOM_ERROR", (payload) => { + console.error("Room error", payload.message); + }); + + + + this.messageDispatcher.registerHandler( + 'WEBRTC_NEW_PRODUCER', + async (payload) => { + try { + console.log('New producer to consume', payload.producerId); + this.producersToConsume.push({ + id: payload.producerId, + userId: payload.userId, + }); + const consumer = await this.createSingleConsumer({ + id: payload.producerId, + userId: payload.userId, + }); + const consumerUserId = consumer.appData.userId as string; + const userConsumer = this.consumers.get(consumerUserId) ?? { + id: consumerUserId, + audioConsumer: null, + videoConsumer: null, + }; + + if (consumer.kind === 'audio') { + userConsumer.audioConsumer = consumer; + } else { + userConsumer.videoConsumer = consumer; + } + + this.consumers.set(consumerUserId, userConsumer); + this.emit(SfuService.NEW_CONSUMER_EVENT, userConsumer); + } catch (error) { + console.error('Failed to create consumer:', error); + } + }, + ); + + this.messageDispatcher.registerHandler( + 'WEBRTC_USER_DISCONNECTED', + (payload) => { + const userId = payload.userId as string; + const userConsumer = this.consumers.get(userId); + + console.log('User disconnected', userId, userConsumer); + + // Close both consumers + if (userConsumer?.audioConsumer) { + userConsumer.audioConsumer.close(); + } + if (userConsumer?.videoConsumer) { + userConsumer.videoConsumer.close(); + } + this.consumers.delete(userId); + this.emit(SfuService.CONSUMER_CLOSED_EVENT, userId); + }, + ); + + this.webSocket.onclose = () => { + this.cleanUp(); + }; + } + + cleanUp = () => { + // Close WebSocket connection + if (this.webSocket.readyState === WebSocket.OPEN) { + this.webSocket.close(); + } + + // Stop producer if it exists + if (this.videoProducer) { + this.videoProducer.close(); + this.videoProducer = null; + } + + if (this.audioProducer) { + this.audioProducer.close(); + this.audioProducer = null; + } + + // Stop all consumers + this.consumers.forEach((consumer) => { + if (consumer.audioConsumer) { + consumer.audioConsumer.close(); + } + if (consumer.videoConsumer) { + consumer.videoConsumer.close(); + } + }); + this.consumers = new Map(); + + // Close sender transport if it exists + if (this.senderTransport) { + this.senderTransport.close(); + this.senderTransport = null; + } + + // Close receiver transport if it exists + if (this.receiverTransport) { + this.receiverTransport.close(); + this.receiverTransport = null; + } + }; + + setUpSenderTransport = async () => { + if (!this.senderTransport) { + throw new Error('Sender transport is not created'); + } + + this.senderTransport!.on( + 'connect', + async ({ dtlsParameters }, callback, errback) => { + // Here we must communicate our local parameters to our remote transport. + try { + this.webSocket.send( + JSON.stringify({ + type: 'WEBRTC_CONNECT', + payload: { + transportId: this.senderTransport!.id, + dtlsParameters, + }, + }), + ); + + // Done in the server, tell our transport. + const deregisterHandler = this.messageDispatcher.registerHandler( + 'WEBRTC_CONNECT_RESPONSE', + (payload: Record) => { + if (payload.transportId === this.senderTransport!.id) { + if(payload.success === true) { + callback(); + } else { + errback(new Error('Failed to connect transport')); + } + deregisterHandler(); + } + }, + ); + } catch (error) { + // Something was wrong in server side. + errback(error as Error); + } + }, + ); + + this.senderTransport!.on( + 'produce', + async ({ kind, rtpParameters, appData }, callback, errback) => { + try { + this.webSocket.send( + JSON.stringify({ + type: 'WEBRTC_PRODUCER', + payload: { + transportId: this.senderTransport!.id, + kind, + rtpParameters, + appData: { + ...appData, + userId: this.user.id, + }, + }, + }), + ); + const deregisterHandler = this.messageDispatcher.registerHandler( + 'WEBRTC_PRODUCER_RESPONSE', + (payload: Record) => { + if (payload.transportId === this.senderTransport!.id) { + if(payload.success) { + callback({ id: payload.producerId }); + } else { + errback(new Error('Failed to create producer')); + } + deregisterHandler(); + } + }, + ); + } catch (error) { + errback(error as Error); + } + }, + ); + }; + + setUpReceiverTransport = async () => { + if (!this.receiverTransport) { + throw new Error('Receiver transport is not created'); + } + + this.receiverTransport!.on( + 'connect', + async ({ dtlsParameters }, callback, errback) => { + // Here we must communicate our local parameters to our remote transport. + try { + this.webSocket.send( + JSON.stringify({ + type: 'WEBRTC_CONNECT', + payload: { + transportId: this.receiverTransport!.id, + dtlsParameters, + }, + }), + ); + // Done in the server, tell our transport. + const deregisterHandler = this.messageDispatcher.registerHandler( + 'WEBRTC_CONNECT_RESPONSE', + (payload: Record) => { + if (payload.transportId === this.receiverTransport!.id) { + if(payload.success === true) { + callback(); + } else { + errback(new Error('Failed to connect transport')); + } + deregisterHandler(); + } + }, + ); + } catch (error) { + // Something was wrong in server side. + errback(error as Error); + } + }, + ); + }; + + createProducer = async () => { + try { + if (!this.senderTransport) { + throw new Error('Sender transport is not created'); + } + + const stream = await navigator.mediaDevices.getUserMedia({ + video: true, + audio: true, + }); + + const videoTrack = stream.getVideoTracks()[0]; + const audioTrack = stream.getAudioTracks()[0]; + + const params = { + encodings: [ + { + scalabilityMode: 'L1T3' + } + ], + codecOptions: { + videoGoogleStartBitrate: 1000, + }, + }; + const videoProducer = await this.senderTransport.produce({ + ...params, + track: videoTrack, + appData: { + userId: this.user.id, + }, + }); + + this.videoProducer = videoProducer; + + try { + const audioProducer = await this.senderTransport.produce({ + track: audioTrack, + appData: { + userId: this.user.id, + }, + + }); + this.audioProducer = audioProducer; + } catch (error) { + console.log('Error creating audio producer', error); + } + + return { + videoProducer: this.videoProducer, + audioProducer: this.audioProducer, + }; + } catch (error) { + console.log('Error creating producer', error); + return { + videoProducer: this.videoProducer, + audioProducer: this.audioProducer, + } + } + }; + + createConsumer = async () => { + if (!this.receiverTransport) { + throw new Error('Receiver transport is not created'); + } + console.log("PRODUCERS TO CONSUME", this.producersToConsume); + const consumersPromises = this.producersToConsume.map( + async (producerInfo) => { + try { + const consumer = await this.createSingleConsumer(producerInfo); + return consumer; + } catch(error) { + console.error(`Error creating consumer for ${producerInfo.userId}`, error); + return null; + } + }, + ); + + const allConsumers = await Promise.all(consumersPromises); + + allConsumers.forEach((consumer) => { + + if(!consumer) { + console.log('Consumer is null'); + return; + } + + const consumerUserId = consumer.appData.userId as string; + + const userConsumer = this.consumers.get(consumerUserId) ?? { + id: consumerUserId, + videoConsumer: null, + audioConsumer: null, + }; + + if (consumer.kind == 'audio') { + userConsumer.audioConsumer = consumer; + } else { + userConsumer.videoConsumer = consumer; + } + + this.consumers.set(consumerUserId, userConsumer); + }); + + return this.consumers.values(); + }; + + createSingleConsumer = async ({id, userId} : {id: string,userId:string}) : Promise> => { + console.log("Creating consumer for producer", id, userId); + const consumer = new Promise>((resolve, reject) => { + this.webSocket.send( + JSON.stringify({ + type: 'WEBRTC_CONSUMER', + payload: { + transportId: this.receiverTransport!.id, + producerId: id, + rtpCapabilities: this.device.rtpCapabilities, + }, + }), + ); + + const deregisterHandler = this.messageDispatcher.registerHandler( + 'WEBRTC_CONSUMER_RESPONSE', + async (payload: Record) => { + try { + if (payload.transportId === this.receiverTransport!.id && payload.producerId === id) { + if(!payload.success) { + reject(new Error('Failed to create consumer')); + } + const consumer = await this.receiverTransport!.consume({ + id: payload.consumerId, + producerId: payload.producerId, + rtpParameters: payload.rtpParameters, + kind: payload.kind, + appData: { + userId: userId, + }, + }); + // send ack to sever for resuming consumer on server side + this.webSocket.send( + JSON.stringify({ + type: 'WEBRTC_RESUME_CONSUMER', + payload: { + consumerId: consumer.id, + }, + }), + ); + deregisterHandler(); + resolve(consumer); + } + } catch (error) { + reject(error); + deregisterHandler(); + } + }, + ); + + setTimeout(() => { + reject(new Error('Consumer creation timeout for ' + id)); + deregisterHandler(); + },10000); + }); + return consumer; + }; +} diff --git a/apps/sfu/package.json b/apps/sfu/package.json new file mode 100644 index 00000000..43f915a0 --- /dev/null +++ b/apps/sfu/package.json @@ -0,0 +1,22 @@ +{ + "name": "sfu", + "version": "1.0.0", + "description": "", + "main": "index.js", + "scripts": { + "build": "npx esbuild src/index.ts --bundle --platform=node --outfile=dist/index.js", + "start": "MEDIASOUP_WORKER_BIN=\"$(node -p 'path.resolve(__dirname, \"../../node_modules/mediasoup/worker/out/Release/mediasoup-worker\")')\" node dist/index.js", + "dev": "npm run build && npm run start" + }, + "author": "", + "license": "ISC", + "dependencies": { + "crypto": "^1.0.1", + "jsonwebtoken": "^9.0.2", + "mediasoup": "^3.14.6", + "ws": "^8.17.0" + }, + "devDependencies": { + "cross-env": "^7.0.3" + } +} diff --git a/apps/sfu/src/Room.ts b/apps/sfu/src/Room.ts new file mode 100644 index 00000000..e3e1b2cc --- /dev/null +++ b/apps/sfu/src/Room.ts @@ -0,0 +1,467 @@ +import { + AppData, + Consumer, + Producer, + Router, + WebRtcTransport, +} from 'mediasoup/node/lib/types'; +import User from './User'; +import RoomManager from './RoomManager'; + + +class Room { + + router: Router; + + id: string; + + users: User[]; + + transports: WebRtcTransport<{userId: string}>[]; + producers: Producer<{ userId: string }>[]; + consumers: Consumer<{ userId: string }>[]; + + constructor(router: Router, id: string) { + this.router = router; + this.users = []; + this.transports = []; + this.producers = []; + this.consumers = []; + this.id = id; + } + + async addPeer(user: User) { + if(this.users.find(u => u.id === user.id)) { + console.log('User already exists in the room'); + user.ws.send( + JSON.stringify({ + type: 'ROOM_ERROR', + payload: { + message: 'User already exists in the room', + success: false, + }, + }), + ); + return; + } + this.addRoomEventHandlers(user); + // create a transport for each user + let senderTransport = await this.createWebRtcTransport(user.id); + let receiverTransport = await this.createWebRtcTransport(user.id); + const { + id: senderId, + iceParameters: senderIceParameters, + iceCandidates: senderIceCandidates, + dtlsParameters: senderDtlsParameters, + } = senderTransport; + const { + id: receiverId, + iceParameters: receiverIceParameters, + iceCandidates: receiverIceCandidates, + dtlsParameters: receiverDtlsParameters, + } = receiverTransport; + // Send the transport information to the client + user.ws.send( + JSON.stringify({ + type: 'WEBRTC_TRANSPORT', + payload: { + sender: { + id: senderId, + iceParameters: senderIceParameters, + iceCandidates: senderIceCandidates, + dtlsParameters: senderDtlsParameters, + }, + receiver: { + id: receiverId, + iceParameters: receiverIceParameters, + iceCandidates: receiverIceCandidates, + dtlsParameters: receiverDtlsParameters, + }, + routerRtpCapabilities: this.router.rtpCapabilities, + producers: this.producers.map((producer) => { + return { + id: producer.id, + userId: producer.appData.userId, + }; + }), + }, + }), + ); + this.transports.push(senderTransport); + this.transports.push(receiverTransport); + this.users.push(user); + } + + addRoomEventHandlers(user: User) { + console.log('Adding event handlers for user', user.id); + user.ws.on('message', (data) => { + const message = JSON.parse(data.toString()); + switch (message.type) { + case 'LEAVE_ROOM': + this.removeUser(user); + break; + case 'WEBRTC_CONNECT': + this.connectPeer(user, message.payload); + break; + case 'WEBRTC_PRODUCER': + this.initiliseProducer(user, message.payload); + break; + case 'WEBRTC_CONSUMER': + this.initiliseConsumer(user, message.payload); + break; + case 'WEBRTC_LIST_PRODUCERS': + this.listProducers(user); + break; + case 'WEBRTC_RESUME_CONSUMER': + this.resumeConsumer(user, message.payload); + break; + } + console.log( + 'HERE', + this.producers.map((p) => { + return { + id: p.id, + appData: p.appData, + }; + }), + ); + }); + + user.ws.on('close', () => { + this.removeUser(user); + }); + } + resumeConsumer(user: User, payload: any) { + const consumer = this.consumers.find((c) => c.id === payload.consumerId); + if (!consumer) { + user.ws.send( + JSON.stringify({ + type: 'WEBRTC_RESUME_CONSUMER_RESPONSE', + payload: { + message: 'Consumer not found', + success: false, + }, + }), + ); + return; + } + consumer.resume(); + user.ws.send( + JSON.stringify({ + type: 'WEBRTC_RESUME_CONSUMER_RESPONSE', + payload: { + message: 'Consumer resumed', + success: true, + }, + }), + ); + } + + listProducers(user: User) { + user.ws.send( + JSON.stringify({ + type: 'WEBRTC_PRODUCERS', + payload: { + producers: this.producers.map((producer) => { + return { + id: producer.id, + userId: producer.appData.userId, + }; + }), + }, + }), + ); + } + + async initiliseConsumer(user: User, payload: any) { + try { + const { transportId, producerId, rtpCapabilities } = payload; + const consumerTransport = this.transports.find(transport => transport.id === transportId); + if (!consumerTransport) { + user.ws.send( + JSON.stringify({ + type: 'WEBRTC_CONSUMER_RESPONSE', + payload: { + message: 'Transport not found', + success: false + }, + }), + ); + return; + } + + const producer = this.producers.find((p) => p.id === producerId); + + if (!producer) { + user.ws.send( + JSON.stringify({ + type: 'WEBRTC_CONSUMER_RESPONSE', + payload: { + message: 'Producer not found', + success: false, + }, + }), + ); + return; + } + + const consumer = await consumerTransport.consume({ + producerId, + rtpCapabilities, + paused: true, + appData: { + userId: producer.appData.userId, + }, + }); + + this.consumers.push(consumer); + + user.ws.send( + JSON.stringify({ + type: 'WEBRTC_CONSUMER_RESPONSE', + payload: { + consumerId: consumer.id, + producerId: producerId, + kind: consumer.kind, + rtpParameters: consumer.rtpParameters, + transportId: consumerTransport.id, + appData: consumer.appData, + success: true, + }, + }), + ); + } catch (error) { + console.error(error); + user.ws.send( + JSON.stringify({ + type: 'WEBRTC_CONSUMER_RESPONSE', + payload: { + message: 'Failed to create consumer', + success: false, + }, + }), + ); + } + } + + async initiliseProducer(user: User, payload: any) { + try { + const { transportId, kind, rtpParameters, appData } = payload; + const transport = this.transports.find(transport => transport.id === transportId); + if (!transport) { + user.ws.send( + JSON.stringify({ + type: 'WEBRTC_PRODUCER_RESPONSE', + payload: { + message: 'Transport not found', + success: false, + }, + }), + ); + return; + } + + if (appData && appData.userId !== user.id) { + console.log('Invalid user id', appData.userId, user.id); + user.ws.send( + JSON.stringify({ + type: 'WEBRTC_PRODUCER_RESPONSE', + payload: { + message: 'Invalid user id', + success: false, + }, + }), + ); + return; + } + + // create producer + const producer = await transport.produce({ + kind, + rtpParameters, + appData, + }); + // send producer id to the client + user.ws.send( + JSON.stringify({ + type: 'WEBRTC_PRODUCER_RESPONSE', + payload: { + producerId: producer.id, + userId: producer.appData.userId, + transportId, + success: true, + }, + }), + ); + this.producers.push(producer); + + // notify all the clients about the new producer + this.users.forEach((u) => { + if (u.id !== user.id) { + console.log('sending to', user.id); + u.ws.send( + JSON.stringify({ + type: 'WEBRTC_NEW_PRODUCER', + payload: { + producerId: producer.id, + userId: producer.appData.userId, + }, + }), + ); + } + }); + } catch (error) { + console.error(error); + user.ws.send( + JSON.stringify({ + type: 'WEBRTC_PRODUCER_RESPONSE', + payload: { + message: 'Failed to create producer', + transportId: payload.transportId, + success: false, + }, + }), + ); + } + } + + connectPeer(user: User, payload: any) { + const { transportId, dtlsParameters } = payload; + const transport = this.transports.find(transport => transport.id === transportId); + // establish connection between the client and the server + try { + if (!transport) { + throw new Error('Transport not found'); + } + transport.connect({ dtlsParameters }).then(() => { + user.ws.send( + JSON.stringify({ + type: 'WEBRTC_CONNECT_RESPONSE', + payload: { + transportId, + success: true, + }, + }), + ); + }); + } catch (error) { + console.error(error); + user.ws.send( + JSON.stringify({ + type: 'WEBRTC_CONNECT_RESPONSE', + payload: { + message: 'Failed to connect to client', + transportId, + success: false, + }, + }), + ); + } + } + + async createWebRtcTransport(userId: string) { + // Server will establish peer to peer connection with each client + // and hence we need to create a transport for each client + const transport = await this.router.createWebRtcTransport({ + preferUdp: true, + listenIps: [ + { + ip: '127.0.0.1', + announcedIp: undefined, + }, + ], + appData: { + userId: userId + } + }); + return transport; + } + + removeUser(user: User) { + this.users = this.users.filter((u) => u.id !== user.id); + + const transportsToRemove = Array.from(this.transports.values()).filter((transport) => { + return transport.appData.userId === user.id; + }); + + const producersToRemove = this.producers.filter( + (p) => p.appData.userId === user.id, + ); + + const consumersToRemove = this.consumers.filter( + (c) => c.appData.userId === user.id, + ); + + console.log( + 'Removing producers', + `User ${user.id} disconnected. Removing ${producersToRemove.length} producers`, + producersToRemove.map((p) => { + return { + id: p.id, + appData: p.appData, + }; + }), + consumersToRemove.map((c) => { + return { + id: c.id, + appData: c.appData, + }; + }), + transportsToRemove.map((t) => { + return { + id: t.id, + appData: t.appData, + }; + }) + ); + + // close the producers + producersToRemove.forEach((prd) => { + prd.close(); + }); + + // close the consumers + consumersToRemove.forEach((c) => { + c.close(); + }); + + // close the transports + transportsToRemove.forEach((t) => { + // Close the transport and remove it from the map + t.close(); + }); + + // remove the producer from the list + this.producers = this.producers.filter((p) => p.appData.userId !== user.id); + + // remove the consumers for the user + this.consumers = this.consumers.filter((c) => c.appData.userId !== user.id); + + // remove the transports for the user + this.transports = this.transports.filter((t) => t.appData.userId !== user.id); + + + // notify all the clients about the removed producer + this.users.forEach((u) => { + if (u.id !== user.id) { + u.ws.send( + JSON.stringify({ + type: 'WEBRTC_USER_DISCONNECTED', + payload: { + userId: user.id, + }, + }), + ); + } + }); + + if(this.users.length === 0) { + console.log('No users left in the room. Closing the room'); + this.router.close(); + RoomManager.getInstance().deleteRoom(this.id); + } + + } +} + +export default Room; diff --git a/apps/sfu/src/RoomManager.ts b/apps/sfu/src/RoomManager.ts new file mode 100644 index 00000000..aa95dc03 --- /dev/null +++ b/apps/sfu/src/RoomManager.ts @@ -0,0 +1,48 @@ +import { createRouter } from "."; +import Room from "./Room"; + +class RoomManager { + + static instance: RoomManager; + + static getInstance() { + if (!RoomManager.instance) { + console.log('Creating new RoomManager instance'); + RoomManager.instance = new RoomManager(); + } + return RoomManager.instance; + } + + rooms: Map; + + constructor() { + this.rooms = new Map(); + } + + async createRoom(roomId: string) { + const router = await createRouter(); + const room = new Room(router, roomId); + this.rooms.set(roomId, room); + return room; + } + + async getOrCreateRoom(roomId: string) { + let room = this.rooms.get(roomId); + if (!room) { + console.log(`Creating new room ${roomId}`); + room = await this.createRoom(roomId); + } + return room; + } + + getRoom(roomId: string) { + return this.rooms.get(roomId); + } + + deleteRoom(roomId: string) { + console.log(`Deleting room ${roomId}`); + this.rooms.delete(roomId); + } +} + +export default RoomManager; \ No newline at end of file diff --git a/apps/sfu/src/User.ts b/apps/sfu/src/User.ts new file mode 100644 index 00000000..e787b482 --- /dev/null +++ b/apps/sfu/src/User.ts @@ -0,0 +1,40 @@ +import { createRouter } from "./index"; +import Room from "./Room"; +import { WebSocket } from "ws"; +import { randomUUID } from 'crypto'; +import RoomManager from "./RoomManager"; + + +class User { + id: string; + ws: WebSocket; + sid: string; + constructor(id: string, ws: WebSocket) { + this.id = id; + this.ws = ws; + this.sid = randomUUID(); + this.handler(); + } + + + + handler() { + this.ws.on('message', async (data) => { + const message = JSON.parse(data.toString()); + switch (message.type) { + case 'JOIN_ROOM': + const roomId = message.payload.roomId; + const roomManager = RoomManager.getInstance(); + const room = await roomManager.getOrCreateRoom(roomId); + room.addPeer(this); + console.log(`User ${this.id} joined room ${roomId}`); + break; + } + }); + + } + + } + + export default User; + diff --git a/apps/sfu/src/index.ts b/apps/sfu/src/index.ts new file mode 100644 index 00000000..2e5867b3 --- /dev/null +++ b/apps/sfu/src/index.ts @@ -0,0 +1,94 @@ +import { WebSocketServer, WebSocket} from 'ws'; +import url from 'url'; +import jwt from 'jsonwebtoken'; +import {createWorker} from 'mediasoup'; +import { MediaKind } from 'mediasoup/node/lib/RtpParameters'; +import User from './User'; +import { cpus } from 'os'; +import { Router } from 'mediasoup/node/lib/Router'; +import { Worker } from 'mediasoup/node/lib/Worker'; +import { AppData } from 'mediasoup/node/lib/types'; + + +const wss = new WebSocketServer({ port: 8081 }); + +const JWT_SECRET = process.env.JWT_SECRET || 'your_secret_key'; + +export const extractUserId = (token: string) => { + const decoded = jwt.verify(token, JWT_SECRET) as { userId: string }; + return decoded.userId; +}; + +const mediaCodecs = [ + { + kind: 'audio' as MediaKind, + mimeType: 'audio/opus', + clockRate: 48000, + channels: 2, + }, + { + kind: 'video' as MediaKind, + mimeType: 'video/H264', + clockRate: 90000, + parameters: { + 'packetization-mode': 1, + 'profile-level-id': '42e01f', + 'level-asymmetry-allowed': 1, + }, + } +]; + +const numCPUs = cpus().length; +const workers: Worker[] = []; + + + +async function createWorkers() { + // Each worker runs as a separate process on a single cpu core + for (let i = 0; i < numCPUs; i++) { + workers.push(await createWorker({ + rtcMinPort: 40000, + rtcMaxPort: 49999, + logLevel: 'warn', + })); + } +} + +createWorkers().then(() => { + console.log(`${workers.length} Workers created`); +}).catch((err) => { + console.error('Error creating workers', err); + process.exit(1); +});; + +let workerIndex = 0; + +export async function createRouter() { + const worker = workers[workerIndex]; + const router = await worker.createRouter({ mediaCodecs }); + workerIndex = (workerIndex + 1) % workers.length; + return router; +} + +const users: { [key: string]: User } = {}; + +wss.on('connection', async function connection(ws, req) { + //@ts-ignore + const token: string = url.parse(req.url, true).query.token; + + console.log(token); + + const userId = extractUserId(token); + + let user = new User(userId, ws); + + users[user.sid] = user; + + user.ws.on('close', () => { + delete users[user.sid]; + console.log(`User ${userId} for ${user.sid} disconnected`); + }); +}); + +console.log('done'); + diff --git a/apps/sfu/tsconfig.json b/apps/sfu/tsconfig.json new file mode 100644 index 00000000..bd51ee2e --- /dev/null +++ b/apps/sfu/tsconfig.json @@ -0,0 +1,109 @@ +{ + "compilerOptions": { + /* Visit https://aka.ms/tsconfig to read more about this file */ + + /* Projects */ + // "incremental": true, /* Save .tsbuildinfo files to allow for incremental compilation of projects. */ + // "composite": true, /* Enable constraints that allow a TypeScript project to be used with project references. */ + // "tsBuildInfoFile": "./.tsbuildinfo", /* Specify the path to .tsbuildinfo incremental compilation file. */ + // "disableSourceOfProjectReferenceRedirect": true, /* Disable preferring source files instead of declaration files when referencing composite projects. */ + // "disableSolutionSearching": true, /* Opt a project out of multi-project reference checking when editing. */ + // "disableReferencedProjectLoad": true, /* Reduce the number of projects loaded automatically by TypeScript. */ + + /* Language and Environment */ + "target": "es2016", /* Set the JavaScript language version for emitted JavaScript and include compatible library declarations. */ + // "lib": [], /* Specify a set of bundled library declaration files that describe the target runtime environment. */ + // "jsx": "preserve", /* Specify what JSX code is generated. */ + // "experimentalDecorators": true, /* Enable experimental support for legacy experimental decorators. */ + // "emitDecoratorMetadata": true, /* Emit design-type metadata for decorated declarations in source files. */ + // "jsxFactory": "", /* Specify the JSX factory function used when targeting React JSX emit, e.g. 'React.createElement' or 'h'. */ + // "jsxFragmentFactory": "", /* Specify the JSX Fragment reference used for fragments when targeting React JSX emit e.g. 'React.Fragment' or 'Fragment'. */ + // "jsxImportSource": "", /* Specify module specifier used to import the JSX factory functions when using 'jsx: react-jsx*'. */ + // "reactNamespace": "", /* Specify the object invoked for 'createElement'. This only applies when targeting 'react' JSX emit. */ + // "noLib": true, /* Disable including any library files, including the default lib.d.ts. */ + // "useDefineForClassFields": true, /* Emit ECMAScript-standard-compliant class fields. */ + // "moduleDetection": "auto", /* Control what method is used to detect module-format JS files. */ + + /* Modules */ + "module": "commonjs", /* Specify what module code is generated. */ + "rootDir": "./src", /* Specify the root folder within your source files. */ + // "moduleResolution": "node10", /* Specify how TypeScript looks up a file from a given module specifier. */ + // "baseUrl": "./", /* Specify the base directory to resolve non-relative module names. */ + // "paths": {}, /* Specify a set of entries that re-map imports to additional lookup locations. */ + // "rootDirs": [], /* Allow multiple folders to be treated as one when resolving modules. */ + // "typeRoots": [], /* Specify multiple folders that act like './node_modules/@types'. */ + // "types": [], /* Specify type package names to be included without being referenced in a source file. */ + // "allowUmdGlobalAccess": true, /* Allow accessing UMD globals from modules. */ + // "moduleSuffixes": [], /* List of file name suffixes to search when resolving a module. */ + // "allowImportingTsExtensions": true, /* Allow imports to include TypeScript file extensions. Requires '--moduleResolution bundler' and either '--noEmit' or '--emitDeclarationOnly' to be set. */ + // "resolvePackageJsonExports": true, /* Use the package.json 'exports' field when resolving package imports. */ + // "resolvePackageJsonImports": true, /* Use the package.json 'imports' field when resolving imports. */ + // "customConditions": [], /* Conditions to set in addition to the resolver-specific defaults when resolving imports. */ + // "resolveJsonModule": true, /* Enable importing .json files. */ + // "allowArbitraryExtensions": true, /* Enable importing files with any extension, provided a declaration file is present. */ + // "noResolve": true, /* Disallow 'import's, 'require's or ''s from expanding the number of files TypeScript should add to a project. */ + + /* JavaScript Support */ + // "allowJs": true, /* Allow JavaScript files to be a part of your program. Use the 'checkJS' option to get errors from these files. */ + // "checkJs": true, /* Enable error reporting in type-checked JavaScript files. */ + // "maxNodeModuleJsDepth": 1, /* Specify the maximum folder depth used for checking JavaScript files from 'node_modules'. Only applicable with 'allowJs'. */ + + /* Emit */ + // "declaration": true, /* Generate .d.ts files from TypeScript and JavaScript files in your project. */ + // "declarationMap": true, /* Create sourcemaps for d.ts files. */ + // "emitDeclarationOnly": true, /* Only output d.ts files and not JavaScript files. */ + // "sourceMap": true, /* Create source map files for emitted JavaScript files. */ + // "inlineSourceMap": true, /* Include sourcemap files inside the emitted JavaScript. */ + // "outFile": "./", /* Specify a file that bundles all outputs into one JavaScript file. If 'declaration' is true, also designates a file that bundles all .d.ts output. */ + "outDir": "./dist", /* Specify an output folder for all emitted files. */ + // "removeComments": true, /* Disable emitting comments. */ + // "noEmit": true, /* Disable emitting files from a compilation. */ + // "importHelpers": true, /* Allow importing helper functions from tslib once per project, instead of including them per-file. */ + // "importsNotUsedAsValues": "remove", /* Specify emit/checking behavior for imports that are only used for types. */ + // "downlevelIteration": true, /* Emit more compliant, but verbose and less performant JavaScript for iteration. */ + // "sourceRoot": "", /* Specify the root path for debuggers to find the reference source code. */ + // "mapRoot": "", /* Specify the location where debugger should locate map files instead of generated locations. */ + // "inlineSources": true, /* Include source code in the sourcemaps inside the emitted JavaScript. */ + // "emitBOM": true, /* Emit a UTF-8 Byte Order Mark (BOM) in the beginning of output files. */ + // "newLine": "crlf", /* Set the newline character for emitting files. */ + // "stripInternal": true, /* Disable emitting declarations that have '@internal' in their JSDoc comments. */ + // "noEmitHelpers": true, /* Disable generating custom helper functions like '__extends' in compiled output. */ + // "noEmitOnError": true, /* Disable emitting files if any type checking errors are reported. */ + // "preserveConstEnums": true, /* Disable erasing 'const enum' declarations in generated code. */ + // "declarationDir": "./", /* Specify the output directory for generated declaration files. */ + // "preserveValueImports": true, /* Preserve unused imported values in the JavaScript output that would otherwise be removed. */ + + /* Interop Constraints */ + // "isolatedModules": true, /* Ensure that each file can be safely transpiled without relying on other imports. */ + // "verbatimModuleSyntax": true, /* Do not transform or elide any imports or exports not marked as type-only, ensuring they are written in the output file's format based on the 'module' setting. */ + // "allowSyntheticDefaultImports": true, /* Allow 'import x from y' when a module doesn't have a default export. */ + "esModuleInterop": true, /* Emit additional JavaScript to ease support for importing CommonJS modules. This enables 'allowSyntheticDefaultImports' for type compatibility. */ + // "preserveSymlinks": true, /* Disable resolving symlinks to their realpath. This correlates to the same flag in node. */ + "forceConsistentCasingInFileNames": true, /* Ensure that casing is correct in imports. */ + + /* Type Checking */ + "strict": true, /* Enable all strict type-checking options. */ + // "noImplicitAny": true, /* Enable error reporting for expressions and declarations with an implied 'any' type. */ + // "strictNullChecks": true, /* When type checking, take into account 'null' and 'undefined'. */ + // "strictFunctionTypes": true, /* When assigning functions, check to ensure parameters and the return values are subtype-compatible. */ + // "strictBindCallApply": true, /* Check that the arguments for 'bind', 'call', and 'apply' methods match the original function. */ + // "strictPropertyInitialization": true, /* Check for class properties that are declared but not set in the constructor. */ + // "noImplicitThis": true, /* Enable error reporting when 'this' is given the type 'any'. */ + // "useUnknownInCatchVariables": true, /* Default catch clause variables as 'unknown' instead of 'any'. */ + // "alwaysStrict": true, /* Ensure 'use strict' is always emitted. */ + // "noUnusedLocals": true, /* Enable error reporting when local variables aren't read. */ + // "noUnusedParameters": true, /* Raise an error when a function parameter isn't read. */ + // "exactOptionalPropertyTypes": true, /* Interpret optional property types as written, rather than adding 'undefined'. */ + // "noImplicitReturns": true, /* Enable error reporting for codepaths that do not explicitly return in a function. */ + // "noFallthroughCasesInSwitch": true, /* Enable error reporting for fallthrough cases in switch statements. */ + // "noUncheckedIndexedAccess": true, /* Add 'undefined' to a type when accessed using an index. */ + // "noImplicitOverride": true, /* Ensure overriding members in derived classes are marked with an override modifier. */ + // "noPropertyAccessFromIndexSignature": true, /* Enforces using indexed accessors for keys declared using an indexed type. */ + // "allowUnusedLabels": true, /* Disable error reporting for unused labels. */ + // "allowUnreachableCode": true, /* Disable error reporting for unreachable code. */ + + /* Completeness */ + // "skipDefaultLibCheck": true, /* Skip type checking .d.ts files that are included with TypeScript. */ + "skipLibCheck": true /* Skip type checking all .d.ts files. */ + } +} From 3bc7bfaa1418ea3db9ac84a10873ec5d6b2c25d4 Mon Sep 17 00:00:00 2001 From: Ruchir28 Date: Fri, 24 May 2024 15:59:34 +0530 Subject: [PATCH 2/3] Added Types for Websocket Messages --- apps/frontend/package.json | 1 + apps/frontend/src/hooks/useSfu.ts | 8 +- apps/frontend/src/screens/Meet.tsx | 84 ++-- apps/frontend/src/utils/MessageDispatcher.ts | 21 - .../src/utils/SfuMessageDispatcher.ts | 26 ++ apps/frontend/src/utils/SfuService.ts | 261 ++++++------ apps/sfu/package.json | 3 +- apps/sfu/src/Room.ts | 393 +++++++----------- apps/sfu/src/User.ts | 4 +- apps/sfu/src/index.ts | 32 +- apps/sfu/tsconfig.json | 108 +---- packages/common/package.json | 14 + packages/common/sfu/ClientMessageTypes.ts | 136 ++++++ packages/common/sfu/ServerMessageTypes.ts | 69 +++ packages/common/sfu/index.ts | 2 + packages/common/tsconfig.json | 3 + packages/store/package.json | 3 +- 17 files changed, 615 insertions(+), 553 deletions(-) delete mode 100644 apps/frontend/src/utils/MessageDispatcher.ts create mode 100644 apps/frontend/src/utils/SfuMessageDispatcher.ts create mode 100644 packages/common/package.json create mode 100644 packages/common/sfu/ClientMessageTypes.ts create mode 100644 packages/common/sfu/ServerMessageTypes.ts create mode 100644 packages/common/sfu/index.ts create mode 100644 packages/common/tsconfig.json diff --git a/apps/frontend/package.json b/apps/frontend/package.json index 7d207831..fb122dd4 100644 --- a/apps/frontend/package.json +++ b/apps/frontend/package.json @@ -16,6 +16,7 @@ "@radix-ui/react-slot": "^1.0.2", "@repo/store": "*", "@repo/ui": "*", + "@repo/common" : "*", "chess.js": "^1.0.0-beta.8", "class-variance-authority": "^0.7.0", "clsx": "^2.1.0", diff --git a/apps/frontend/src/hooks/useSfu.ts b/apps/frontend/src/hooks/useSfu.ts index 6056819a..0bee5ce8 100644 --- a/apps/frontend/src/hooks/useSfu.ts +++ b/apps/frontend/src/hooks/useSfu.ts @@ -1,7 +1,8 @@ import { useEffect, useRef, useState } from 'react'; import { SfuService, UserConsumer } from '../utils/SfuService'; import { Producer } from 'mediasoup-client/lib/types'; -import { User } from '../../../../packages/store/src/atoms/user'; +import { User } from '@repo/store/user'; +import { CustomAppData } from '@repo/common/sfu'; export const useSfu = ( roomId: string, @@ -9,8 +10,8 @@ export const useSfu = ( ) => { const sfuService = useRef(null); - const [videoProducer, setVideoProducer] = useState(); - const [audioProducer, setAudioProducer] = useState(); + const [videoProducer, setVideoProducer] = useState>(); + const [audioProducer, setAudioProducer] = useState>(); const [consumers, setConsumers] = useState([]); const handleNewConsumer = (consumer: UserConsumer) => { @@ -49,7 +50,6 @@ export const useSfu = ( return () => { if (sfuService.current) { - console.log('CALLED CLOSE'); sfuService.current.cleanUp(); sfuService.current.off(SfuService.NEW_CONSUMER_EVENT, handleNewConsumer); sfuService.current.off(SfuService.CONSUMER_CLOSED_EVENT, handleConsumerClosed); diff --git a/apps/frontend/src/screens/Meet.tsx b/apps/frontend/src/screens/Meet.tsx index 2068729d..4ae8b519 100644 --- a/apps/frontend/src/screens/Meet.tsx +++ b/apps/frontend/src/screens/Meet.tsx @@ -1,83 +1,72 @@ import { useUser } from '@repo/store/useUser'; import { useSfu } from '../hooks/useSfu'; -import { useEffect, useRef, useState } from 'react'; +import { useEffect, useRef } from 'react'; import { useParams, useNavigate } from 'react-router-dom'; const Meet = () => { - const user = useUser(); const videoRef = useRef(null); const navigate = useNavigate(); // consumers videos ref - const {roomId} = useParams(); + const { roomId } = useParams(); useEffect(() => { - if(!user) { + if (!user) { navigate('/login'); } - },[user]); - - const { - videoProducer, - audioProducer, - consumers, - } = useSfu(roomId!, user); + }, [user]); + const { videoProducer, audioProducer, consumers } = useSfu(roomId!, user); useEffect(() => { if (!videoRef.current) return; const stream = new MediaStream(); - if(videoProducer && videoProducer.track) { + if (videoProducer && videoProducer.track) { stream.addTrack(videoProducer.track); } - if(audioProducer && audioProducer.track) { + if (audioProducer && audioProducer.track) { stream.addTrack(audioProducer.track); } videoRef.current.srcObject = stream; - }, [videoProducer,audioProducer]); + }, [videoProducer, audioProducer]); if (!user) { return
Redirecting to Login ...
; } return ( -
-

Meeting

-
-

Producer

- { - audioProducer?.pause(); - videoProducer?.pause(); - }} - onPlay={() => { - videoProducer?.resume(); - audioProducer?.resume(); - }} - /> -
-
-

Consumers

- {consumers.map((consumer, index) => ( - console.log('CONSUMER in here', consumer), -
-

Consumer {index + 1} is active

+
+
+

Meeting

+
+
+ {[ + { type: 'Producer', track: videoProducer, audio: audioProducer }, + ...consumers.map((consumer, index) => ({ + type: `Consumer ${index + 1}`, + track: consumer.videoConsumer, + audio: consumer.audioConsumer, + })), + ].map((user, index) => ( +
{ - consumer.audioConsumer?.pause(); - consumer.videoConsumer?.pause(); + user.audio?.pause(); + user.track?.pause(); }} onPlay={() => { - consumer.audioConsumer?.resume(); - consumer.videoConsumer?.resume(); + user.track?.resume(); + user.audio?.resume(); }} /> +

{user.type}

))}
@@ -106,14 +95,14 @@ const VideoPlayer: React.FC<{ videoRef.current.onplay = props.onPlay; videoRef.current.onpause = props.onPause; } - }); + }, [props.audioTrack, props.videoTrack, props.onPlay, props.onPause]); return ( -
+