diff --git a/lkgpt/src/llm.rs b/lkgpt/src/llm.rs index 9e59f44..16d10fd 100644 --- a/lkgpt/src/llm.rs +++ b/lkgpt/src/llm.rs @@ -96,7 +96,8 @@ pub fn run_llm( .build() .unwrap(); - async_runtime.rt.block_on(async { + let rt = async_runtime.rt.clone(); + rt.block_on(async { let mut gpt_resp_stream = llm_channel.client.chat().create_stream(request).await.unwrap(); while let Some(result) = gpt_resp_stream.next().await { diff --git a/lkgpt/src/main.rs b/lkgpt/src/main.rs index 2c7a8eb..2d00626 100644 --- a/lkgpt/src/main.rs +++ b/lkgpt/src/main.rs @@ -2,6 +2,7 @@ mod controls; mod frame_capture; mod llm; +mod room_events; mod server; mod stt; mod tts; @@ -31,7 +32,8 @@ use livekit::{ use bevy::{ app::ScheduleRunnerPlugin, core::Name, core_pipeline::tonemapping::Tonemapping, log::LogPlugin, - prelude::*, render::renderer::RenderDevice, time::common_conditions::on_timer, + prelude::*, render::renderer::RenderDevice, tasks::AsyncComputeTaskPool, + time::common_conditions::on_timer, }; use bevy_panorbit_camera::{PanOrbitCamera, PanOrbitCameraPlugin}; @@ -51,7 +53,8 @@ use serde::{Deserialize, Serialize}; use stt::STT; use crate::{ - controls::WorldControlChannel, llm::LLMChannel, server::RoomData, tts::TTS, video::VideoChannel, + controls::WorldControlChannel, llm::LLMChannel, room_events::handle_room_events, + server::RoomData, tts::TTS, video::VideoChannel, }; pub const LIVEKIT_API_SECRET: &str = "LIVEKIT_API_SECRET"; @@ -69,7 +72,8 @@ pub struct AsyncRuntime { impl FromWorld for AsyncRuntime { fn from_world(_world: &mut World) -> Self { let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap(); - AsyncRuntime { rt: std::sync::Arc::new(rt) } + + Self { rt: std::sync::Arc::new(rt) } } } @@ -140,140 +144,6 @@ pub struct LivekitRoom { room_events: tokio::sync::mpsc::UnboundedReceiver, } -// SYSTEM -pub fn handle_room_events( - async_runtime: Res, - llm_channel: Res, - stt_client: ResMut, - _video_channel: Res, - audio_syncer: ResMut, - mut room_events: ResMut, - _single_frame_data: ResMut, -) { - while let Ok(event) = room_events.room_events.try_recv() { - println!("\n\n🤡received room event {:?}", event); - match event { - RoomEvent::TrackSubscribed { track, publication: _, participant: _user } => { - match track { - RemoteTrack::Audio(audio_track) => { - let audio_rtc_track = audio_track.rtc_track(); - let mut audio_stream = NativeAudioStream::new(audio_rtc_track); - let audio_should_stop = audio_syncer.should_stop.clone(); - let stt_client = stt_client.clone(); - async_runtime.rt.spawn(async move { - while let Some(frame) = audio_stream.next().await { - if audio_should_stop.load(Ordering::Relaxed) { - continue; - } - - let audio_buffer = frame - .data - .iter() - .map(|sample| sample.to_sample::()) - .collect::>(); - - if audio_buffer.is_empty() { - warn!("empty audio frame | {:#?}", audio_buffer); - continue; - } - - if let Err(e) = stt_client.send(audio_buffer) { - error!("Couldn't send audio frame to stt {e}"); - }; - } - }); - }, - RemoteTrack::Video(video_track) => { - let video_rtc_track = video_track.rtc_track(); - let pixel_size = 4; - let mut video_stream = NativeVideoStream::new(video_rtc_track); - - async_runtime.rt.spawn(async move { - // every 10 video frames - let mut i = 0; - while let Some(frame) = video_stream.next().await { - log::error!("🤡received video frame | {:#?}", frame); - // VIDEO FRAME BUFFER (i420_buffer) - let video_frame_buffer = frame.buffer.to_i420(); - let width = video_frame_buffer.width(); - let height = video_frame_buffer.height(); - let rgba_stride = video_frame_buffer.width() * pixel_size; - - let (stride_y, stride_u, stride_v) = video_frame_buffer.strides(); - let (data_y, data_u, data_v) = video_frame_buffer.data(); - - let rgba_buffer = RgbaImage::new(width, height); - let rgba_raw = unsafe { - std::slice::from_raw_parts_mut( - rgba_buffer.as_raw().as_ptr() as *mut u8, - rgba_buffer.len(), - ) - }; - - livekit::webrtc::native::yuv_helper::i420_to_rgba( - data_y, - stride_y, - data_u, - stride_u, - data_v, - stride_v, - rgba_raw, - rgba_stride, - video_frame_buffer.width() as i32, - video_frame_buffer.height() as i32, - ); - - if let Err(e) = rgba_buffer.save(format!("camera/{i}.png")) { - log::error!("Couldn't save video frame {e}"); - }; - i += 1; - } - info!("🤡ended video thread"); - }); - }, - }; - }, - RoomEvent::DataReceived { payload, kind, topic: _, participant: _ } => { - if kind == DataPacketKind::Reliable { - if let Some(payload) = payload.as_ascii() { - let room_text: serde_json::Result = - serde_json::from_str(payload.as_str()); - match room_text { - Ok(room_text) => { - if let Err(e) = - llm_channel.tx.send(format!("[chat]{} ", room_text.message)) - { - error!("Couldn't send the text to gpt {e}") - }; - }, - Err(e) => { - warn!("Couldn't deserialize room text. {e:#?}"); - }, - } - - info!("text from room {:#?}", payload.as_str()); - } - } - }, - // ignoring the participant for now, currently assuming there is only one participant - RoomEvent::TrackMuted { participant: _, publication: _ } => { - audio_syncer.should_stop.store(true, Ordering::Relaxed); - }, - RoomEvent::TrackUnmuted { participant: _, publication: _ } => { - audio_syncer.should_stop.store(false, Ordering::Relaxed); - }, - // RoomEvent::ActiveSpeakersChanged { speakers } => { - // if speakers.is_empty() { - // audio_syncer.should_stop.store(true, Ordering::Relaxed); - // } - // let is_main_participant_muted = speakers.iter().any(|speaker| speaker.name() != "kitt"); - // audio_syncer.should_stop.store(is_main_participant_muted, Ordering::Relaxed); - // } - _ => info!("received room event {:?}", event), - } - } -} - pub struct TracksPublicationData { pub video_src: NativeVideoSource, pub video_pub: LocalTrackPublication, @@ -335,7 +205,7 @@ fn setup_gaussian_cloud( ) { // let remote_file = Some("https://huggingface.co/datasets/cs50victor/splats/resolve/main/train/point_cloud/iteration_7000/point_cloud.gcloud"); // TODO: figure out how to load remote files later - let splat_file = "splats/train/point_cloud/iteration_7000/point_cloud.gcloud"; + let splat_file = "splats/bonsai/point_cloud/iteration_7000/point_cloud.gcloud"; log::info!("loading {}", splat_file); let cloud = asset_server.load(splat_file.to_string()); @@ -350,11 +220,16 @@ fn setup_gaussian_cloud( String::from("main_scene"), ); - commands.spawn((GaussianSplattingBundle { cloud, ..default() }, Name::new("gaussian_cloud"))); + let gs = GaussianSplattingBundle { cloud, ..default() }; + commands.spawn((gs, Name::new("gaussian_cloud"))); commands.spawn(( Camera3dBundle { - transform: Transform::from_translation(Vec3::new(0.0, 1.5, 5.0)), + transform: Transform { + translation: Vec3::new(-0.59989005, -0.88360703, -2.0863006), + rotation: Quat::from_xyzw(-0.97177905, -0.026801618, 0.13693734, -0.1901983), + scale: Vec3::new(1.0, 1.0, 1.0), + }, tonemapping: Tonemapping::None, camera: Camera { target: render_target, ..default() }, ..default() @@ -375,6 +250,7 @@ pub fn sync_bevy_and_server_resources( mut server_state_clone: ResMut, mut set_app_state: ResMut>, scene_controller: Res, + audio_syncer: Res, ) { if !server_state_clone.dirty { let participant_room_name = &(server_state_clone.state.lock().0).clone(); @@ -399,22 +275,46 @@ pub fn sync_bevy_and_server_resources( info!("initializing required bevy resources"); - let tts = async_runtime.rt.block_on(TTS::new(audio_src)).unwrap(); let llm_channel = LLMChannel::new(); let llm_tx = llm_channel.tx.clone(); + let llm_channel_tx = llm_tx.clone(); + + let tts = async_runtime.rt.block_on(TTS::new(audio_src)).unwrap(); + let stt = async_runtime.rt.block_on(STT::new(llm_tx)).unwrap(); + let video_channel = VideoChannel::new(); commands.insert_resource(llm_channel); commands.init_resource::(); - let stt = async_runtime.rt.block_on(STT::new(llm_tx)).unwrap(); - commands.insert_resource(stt); + commands.insert_resource(stt.clone()); commands.init_resource::(); commands.insert_resource(tts); commands.insert_resource(stream_frame_data); - commands.insert_resource(livekit_room); + // commands.insert_resource(livekit_room); set_app_state.set(AppState::Active); + + let audio_syncer = audio_syncer.should_stop.clone(); + let rt = async_runtime.rt.clone(); + async_runtime.rt.spawn(handle_room_events( + rt, + llm_channel_tx, + stt, + video_channel, + audio_syncer, + livekit_room, + 4, + )); + /* + async_runtime: Res, + llm_channel: Res, + stt_client: ResMut, + _video_channel: Res, + audio_syncer: ResMut, + mut room_events: ResMut, + single_frame_data: ResMut, + */ server_state_clone.dirty = true; }, Err(e) => { @@ -488,18 +388,18 @@ fn main() { app.init_resource::(); app.add_event::(); - // app.add_systems(Update, move_camera); + app.add_systems(Update, move_camera); app.add_systems(Update, server::shutdown_bevy_remotely); - app.add_systems( - Update, - handle_room_events - .run_if(resource_exists::()) - .run_if(resource_exists::()) - .run_if(resource_exists::()) - .run_if(resource_exists::()), - ); + // app.add_systems( + // Update, + // room_events::handle_room_events + // .run_if(resource_exists::()) + // .run_if(resource_exists::()) + // .run_if(resource_exists::()) + // .run_if(resource_exists::()), + // ); app.add_systems( Update, @@ -514,13 +414,15 @@ fn main() { sync_bevy_and_server_resources.run_if(on_timer(std::time::Duration::from_secs(2))), ); - // app.add_systems(OnEnter(AppState::Active), setup_gaussian_cloud); + app.add_systems(OnEnter(AppState::Active), setup_gaussian_cloud); app.run(); } fn move_camera(mut camera: Query<&mut Transform, With>) { for mut transform in camera.iter_mut() { - transform.translation.x += 5.0; + transform.translation.x += 0.0005; + transform.translation.y += 0.0005; + transform.translation.z += 0.0005; } } diff --git a/lkgpt/src/room_events.rs b/lkgpt/src/room_events.rs new file mode 100644 index 0000000..26bce41 --- /dev/null +++ b/lkgpt/src/room_events.rs @@ -0,0 +1,206 @@ +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; + +use bevy::ecs::system::{Res, ResMut}; +use futures::{future, FutureExt, StreamExt}; +use image::RgbaImage; +use livekit::{ + track::RemoteTrack, + webrtc::{ + audio_stream::native::NativeAudioStream, video_frame::VideoBuffer, + video_stream::native::NativeVideoStream, + }, + DataPacketKind, RoomEvent, +}; +use log::{error, info, warn}; +use rodio::cpal::Sample; +use tokio::runtime::Runtime; + +use crate::{llm, stt::STT, video, AsyncRuntime, AudioSync, LivekitRoom, RoomText}; + +async fn handle_video(mut video_stream: NativeVideoStream, pixel_size: u32) { + // every 10 video frames + let mut i = 0; + info!("📸 handling video"); + while let Some(frame) = video_stream.next().await { + log::error!("🤡received video frame | {:#?}", frame); + // VIDEO FRAME BUFFER (i420_buffer) + let video_frame_buffer = frame.buffer.to_i420(); + let width = video_frame_buffer.width(); + let height = video_frame_buffer.height(); + let rgba_stride = video_frame_buffer.width() * pixel_size; + + let (stride_y, stride_u, stride_v) = video_frame_buffer.strides(); + let (data_y, data_u, data_v) = video_frame_buffer.data(); + + let rgba_buffer = RgbaImage::new(width, height); + let rgba_raw = unsafe { + std::slice::from_raw_parts_mut( + rgba_buffer.as_raw().as_ptr() as *mut u8, + rgba_buffer.len(), + ) + }; + + livekit::webrtc::native::yuv_helper::i420_to_rgba( + data_y, + stride_y, + data_u, + stride_u, + data_v, + stride_v, + rgba_raw, + rgba_stride, + video_frame_buffer.width() as i32, + video_frame_buffer.height() as i32, + ); + + if let Err(e) = rgba_buffer.save(format!("camera/{i}.png")) { + log::error!("Couldn't save video frame {e}"); + }; + i += 1; + } + info!("🤡ended video thread"); +} + +pub async fn handle_room_events( + async_runtime: Arc, + llm_channel_tx: crossbeam_channel::Sender, + stt_client: STT, + _video_channel: video::VideoChannel, + should_stop: Arc, + mut room_events: LivekitRoom, + pixel_size: u32, +) { + while let Some(event) = room_events.room_events.recv().await { + println!("\n\n🤡received room event {:?}", event); + match event { + RoomEvent::TrackSubscribed { track, publication: _, participant: _user } => { + match track { + RemoteTrack::Audio(audio_track) => { + let audio_rtc_track = audio_track.rtc_track(); + let mut audio_stream = NativeAudioStream::new(audio_rtc_track); + let audio_should_stop = should_stop.clone(); + let stt_client = stt_client.clone(); + let rt = async_runtime.clone(); + + std::thread::spawn(move || { + while let Some(frame) = rt.block_on(audio_stream.next()) { + if audio_should_stop.load(Ordering::Relaxed) { + continue; + } + + let audio_buffer = frame + .data + .iter() + .map(|sample| sample.to_sample::()) + .collect::>(); + + if audio_buffer.is_empty() { + warn!("empty audio frame | {:#?}", audio_buffer); + continue; + } + + if let Err(e) = stt_client.send(audio_buffer) { + error!("Couldn't send audio frame to stt {e}"); + }; + } + error!("audio thread ended"); + }); + }, + RemoteTrack::Video(video_track) => { + let video_rtc_track = video_track.rtc_track(); + let mut video_stream = NativeVideoStream::new(video_rtc_track); + let rt = async_runtime.clone(); + + std::thread::spawn(move || { + // every 10 video frames + let mut i = 0; + info!("📸 handling video"); + loop { + while let Some(Some(frame)) = video_stream.next().now_or_never() { + log::error!("🤡received video frame | {:#?}", frame); + // VIDEO FRAME BUFFER (i420_buffer) + let video_frame_buffer = frame.buffer.to_i420(); + let width = video_frame_buffer.width(); + let height = video_frame_buffer.height(); + let rgba_stride = video_frame_buffer.width() * pixel_size; + + let (stride_y, stride_u, stride_v) = + video_frame_buffer.strides(); + let (data_y, data_u, data_v) = video_frame_buffer.data(); + + let rgba_buffer = RgbaImage::new(width, height); + let rgba_raw = unsafe { + std::slice::from_raw_parts_mut( + rgba_buffer.as_raw().as_ptr() as *mut u8, + rgba_buffer.len(), + ) + }; + + livekit::webrtc::native::yuv_helper::i420_to_rgba( + data_y, + stride_y, + data_u, + stride_u, + data_v, + stride_v, + rgba_raw, + rgba_stride, + video_frame_buffer.width() as i32, + video_frame_buffer.height() as i32, + ); + + if let Err(e) = rgba_buffer.save(format!("camera/{i}.png")) { + log::error!("Couldn't save video frame {e}"); + }; + i += 1; + } + } + info!("🤡ended video thread"); + }); + }, + }; + }, + RoomEvent::DataReceived { payload, kind, topic: _, participant: _ } => { + if kind == DataPacketKind::Reliable { + if let Some(payload) = payload.as_ascii() { + let room_text: serde_json::Result = + serde_json::from_str(payload.as_str()); + match room_text { + Ok(room_text) => { + if let Err(e) = + llm_channel_tx.send(format!("[chat]{} ", room_text.message)) + { + error!("Couldn't send the text to gpt {e}") + }; + }, + Err(e) => { + warn!("Couldn't deserialize room text. {e:#?}"); + }, + } + + info!("text from room {:#?}", payload.as_str()); + } + } + }, + // ignoring the participant for now, currently assuming there is only one participant + RoomEvent::TrackMuted { participant: _, publication: _ } => { + should_stop.store(true, Ordering::Relaxed); + }, + RoomEvent::TrackUnmuted { participant: _, publication: _ } => { + should_stop.store(false, Ordering::Relaxed); + }, + // RoomEvent::ActiveSpeakersChanged { speakers } => { + // if speakers.is_empty() { + // should_stop.store(true, Ordering::Relaxed); + // } + // let is_main_participant_muted = speakers.iter().any(|speaker| speaker.name() != "kitt"); + // should_stop.store(is_main_participant_muted, Ordering::Relaxed); + // } + RoomEvent::ConnectionQualityChanged { quality: _, participant: _ } => {}, + _ => info!("received room event {:?}", event), + } + } +} diff --git a/lkgpt/src/stt.rs b/lkgpt/src/stt.rs index 9fc8b69..f5e0a2f 100644 --- a/lkgpt/src/stt.rs +++ b/lkgpt/src/stt.rs @@ -35,36 +35,36 @@ impl ezsockets::ClientExt for WSClient { async fn on_text(&mut self, text: String) -> Result<(), ezsockets::Error> { let data: Value = serde_json::from_str(&text)?; - let transcript_details = data["channel"]["alternatives"][0].clone(); + let transcript = data["channel"]["alternatives"][0]["transcript"].clone(); - info!("\n\n\nreceived message from deepgram: {}", data); - info!("\n\n\nreceived message from deepgram: {}", transcript_details); - - // if transcript_details!= Value::Null { - // self.to_gpt.send(transcript_details.to_string())?; - // } + if transcript != Value::Null { + info!("🎉 from deepgram {transcript}"); + if let Err(e) = self.llm_channel_tx.send(transcript.to_string()) { + error!("Error sending to LLM: {}", e); + }; + } Ok(()) } async fn on_binary(&mut self, bytes: Vec) -> Result<(), ezsockets::Error> { - info!("received bytes: {bytes:?}"); + info!("received bytes from deepgram: {bytes:?}"); Ok(()) } async fn on_call(&mut self, call: Self::Call) -> Result<(), ezsockets::Error> { - info!("DEEPGRAM ON CALL: {call:?}"); + info!("Deepgram ON CALL: {call:?}"); let () = call; Ok(()) } async fn on_connect(&mut self) -> Result<(), ezsockets::Error> { - info!("DEEPGRAM CONNECTED 🎉"); + info!("Deepgram CONNECTED 🎉"); Ok(()) } async fn on_connect_fail(&mut self, e: WSError) -> Result { - error!("DEEPGRAM connection FAIL 💔 {e}"); + error!("Deepgram CONNECTION FAILED | {e}"); Ok(ClientCloseMode::Reconnect) } @@ -72,12 +72,12 @@ impl ezsockets::ClientExt for WSClient { &mut self, frame: Option, ) -> Result { - error!("DEEPGRAM connection CLOSE 💔 {frame:?}"); + error!("Deepgram CONNECTION CLOSED | {frame:?}"); Ok(ClientCloseMode::Reconnect) } async fn on_disconnect(&mut self) -> Result { - error!("DEEPGRAM disconnect 💔"); + error!("Deepgram disconnected"); Ok(ClientCloseMode::Reconnect) } } @@ -91,15 +91,18 @@ impl STT { heartbeat: Duration::from_secs(11), timeout: Duration::from_secs(30 * 60), // 30 minutes heartbeat_ping_msg_fn: Arc::new(|_t: Duration| { - RawMessage::Text(json!({ - "type": "KeepAlive", - }).to_string()) + RawMessage::Text( + json!({ + "type": "KeepAlive", + }) + .to_string(), + ) }), }) .header("Authorization", &format!("Token {}", deepgram_api_key)) - .query_parameter("model", "enhanced") - // .query_parameter("model", "nova-2-conversationalai") + .query_parameter("model", "nova-2-conversationalai") .query_parameter("smart_format", "true") + .query_parameter("version", "latest") .query_parameter("filler_words", "true"); let (ws_client, _) = diff --git a/lkgpt/src/tts.rs b/lkgpt/src/tts.rs index 05bae6b..2be67ec 100644 --- a/lkgpt/src/tts.rs +++ b/lkgpt/src/tts.rs @@ -12,7 +12,6 @@ use ezsockets::{ use futures::StreamExt; use livekit::webrtc::{audio_frame::AudioFrame, audio_source::native::NativeAudioSource}; use log::{error, info}; -use parking_lot::Mutex; use serde::Serialize; use serde_json::Value; use std::{ @@ -62,7 +61,6 @@ struct RegularMessage { pub struct TTS { ws_client: Client, pub started: Arc, - eleven_labs_api_key: String, } impl TTS { @@ -95,15 +93,10 @@ impl ezsockets::ClientExt for WSClient { if base64_audio != Value::Null { let data = std::borrow::Cow::from(decode_base64_audio(base64_audio.as_str().unwrap())?); - const FRAME_DURATION: Duration = Duration::from_millis(500); // Write 0.5s of audio at a time - let ms = FRAME_DURATION.as_millis() as u32; - let num_channels = self.audio_src.num_channels(); let sample_rate = self.audio_src.sample_rate(); let samples_per_channel = 1_u32; - let num_samples = (sample_rate / 1000 * ms) as usize; - let audio_frame = AudioFrame { data, num_channels, sample_rate, samples_per_channel }; self.audio_src.capture_frame(&audio_frame).await?; @@ -184,7 +177,7 @@ impl TTS { ) }), }) - .header("xi-api-key", eleven_labs_api_key.clone()); + .header("xi-api-key", eleven_labs_api_key); let (ws_client, _) = ezsockets::connect( |_client| WSClient { audio_src, tts_ws_started: started.clone() }, @@ -199,10 +192,10 @@ impl TTS { generation_config: GenerationConfig { chunk_length_schedule: [50] }, })?)?; - Ok(Self { ws_client, started, eleven_labs_api_key }) + Ok(Self { ws_client, started }) } - pub fn start(&mut self) -> anyhow::Result<()> { + pub fn restart(&mut self) -> anyhow::Result<()> { self.started.store(true, Ordering::Relaxed); self.send(" ".to_string())?; Ok(()) @@ -225,7 +218,7 @@ impl TTS { let msg = msg?; if !self.started.load(Ordering::Relaxed) { - self.start()?; + self.restart()?; } info!("sending to eleven labs {msg}"); diff --git a/lkgpt/src/video.rs b/lkgpt/src/video.rs index e62ddee..3b70c88 100644 --- a/lkgpt/src/video.rs +++ b/lkgpt/src/video.rs @@ -16,12 +16,17 @@ pub struct VideoChannel { rx: crossbeam_channel::Receiver>, } -impl FromWorld for VideoChannel { - fn from_world(_: &mut World) -> Self { +impl Default for VideoChannel { + fn default() -> Self { let (tx, rx) = crossbeam_channel::unbounded::>(); Self { tx, rx } } } +impl VideoChannel { + pub fn new() -> Self { + Self::default() + } +} async fn video_stream_handler(mut video: NativeVideoStream) { let mut counter = 0_u8; diff --git a/meet/app/r/[name]/page.tsx b/meet/app/r/[name]/page.tsx index 203bed5..7a45f9e 100644 --- a/meet/app/r/[name]/page.tsx +++ b/meet/app/r/[name]/page.tsx @@ -39,7 +39,7 @@ export default function LivekitRoom({ params }: { params: { name: string } }) { ) : (
-

Kitt2

+ {/*

Kitt2

*/}