diff --git a/Cargo.toml b/Cargo.toml index 45b20b4..fdb4b99 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,6 @@ authors = ["Cacophony Developers "] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -framebuffer = "0.3.1" rppal = "0.19.0" chrono = { version = "0.4.38", features = ["serde"] } byteorder = "1.4.3" @@ -33,7 +32,6 @@ sha256 = "1.5.0" [target.aarch64-unknown-linux-musl] # applies to target-specific builds - # rPi3 rustflags = "-C target-cpu=cortex-a53" diff --git a/src/camera_transfer_state.rs b/src/camera_transfer_state.rs index 827f493..4093a1a 100644 --- a/src/camera_transfer_state.rs +++ b/src/camera_transfer_state.rs @@ -248,12 +248,25 @@ pub fn enter_camera_transfer_loop( let crc_from_remote_dup = LittleEndian::read_u16(&header_slice[12..14]); let crc_from_remote_inv = LittleEndian::read_u16(&header_slice[14..16]); let crc_from_remote_inv_dup = LittleEndian::read_u16(&header_slice[16..=17]); + let transfer_type_check = transfer_type == transfer_type_dup; + let mut transfer_block = 0; + let is_file_transfer_message = transfer_type_check + && transfer_type >= CAMERA_BEGIN_FILE_TRANSFER + && transfer_type <= CAMERA_BEGIN_AND_END_FILE_TRANSFER; + let is_file_transfer_progress_message = transfer_type_check + && (transfer_type == CAMERA_BEGIN_FILE_TRANSFER + || transfer_type == CAMERA_END_FILE_TRANSFER); + let header_crc_check = if is_file_transfer_progress_message { + transfer_block = crc_from_remote_dup; + crc_from_remote_inv_dup == crc_from_remote_inv + && crc_from_remote_inv.not() == crc_from_remote + } else { + crc_from_remote == crc_from_remote_dup + && crc_from_remote_inv_dup == crc_from_remote_inv + && crc_from_remote_inv.not() == crc_from_remote + }; let num_bytes_check = num_bytes == num_bytes_dup; - let header_crc_check = crc_from_remote == crc_from_remote_dup - && crc_from_remote_inv_dup == crc_from_remote_inv - && crc_from_remote_inv.not() == crc_from_remote; - let transfer_type_check = transfer_type == transfer_type_dup; if !num_bytes_check || !header_crc_check || !transfer_type_check { // Just log the *first* time the header integrity check fails in a session. if !header_integrity_check_has_failed { @@ -325,6 +338,12 @@ pub fn enter_camera_transfer_loop( // Always write the return buffer spi.write(&return_payload_buf).unwrap(); if crc == crc_from_remote { + if is_file_transfer_progress_message { + recording_state.update_offload_progress(transfer_block); + } else if !is_file_transfer_message && recording_state.is_offloading() { + recording_state.end_offload(); + } + match transfer_type { CAMERA_CONNECT_INFO => { radiometry_enabled = LittleEndian::read_u32(&chunk[0..4]) == 2; diff --git a/src/dbus_audio.rs b/src/dbus_managementd.rs similarity index 74% rename from src/dbus_audio.rs rename to src/dbus_managementd.rs index 7f81fe8..8bd3991 100644 --- a/src/dbus_audio.rs +++ b/src/dbus_managementd.rs @@ -7,7 +7,7 @@ use rustbus::{get_system_bus_path, DispatchConn, DuplexConn}; use std::{process, thread}; use thread_priority::{ThreadBuilderExt, ThreadPriority}; -// TC2-Agent dbus audio service +// TC2-Agent dbus managementd service type MyHandleEnv<'a, 'b> = HandleEnvironment; fn default_handler( @@ -19,7 +19,7 @@ fn default_handler( Ok(None) } -fn audio_handler( +fn managementd_handler( recording_state_ctx: &mut RecordingState, _matches: Matches, msg: &MarshalledMessage, @@ -41,6 +41,20 @@ fn audio_handler( response.body.push_param(if recording_state_ctx.is_in_audio_mode() { 1 } else { 0 })?; response.body.push_param(status as u8)?; Ok(Some(response)) + } else if msg.dynheader.member.as_ref().unwrap() == "offloadstatus" { + let mut response = msg.dynheader.make_response(); + if let Some((percent_complete, remaining_seconds)) = + recording_state_ctx.get_offload_status() + { + response.body.push_param(1)?; + response.body.push_param(0)?; // percent_complete + response.body.push_param(0)?; // remaining_seconds + } else { + response.body.push_param(0)?; + response.body.push_param(0)?; + response.body.push_param(0)?; + } + Ok(Some(response)) } else { Ok(None) } @@ -53,14 +67,14 @@ pub enum AudioStatus { Recording = 4, } -pub fn setup_dbus_test_audio_recording_service(recording_state: &RecordingState) { +pub fn setup_dbus_managementd_recording_service(recording_state: &RecordingState) { // set up dbus service for handling messages between managementd and tc2-agent about when - // to make test audio recordings. + // to make test audio recordings, and for getting the status of any file offloads let recording_state = recording_state.clone(); let session_path = get_system_bus_path().unwrap(); - let _dbus_thread = thread::Builder::new().name("dbus-service".to_string()).spawn_with_priority( - ThreadPriority::Max, - move |_| { + let _dbus_thread = thread::Builder::new() + .name("dbus-managementd-service".to_string()) + .spawn_with_priority(ThreadPriority::Max, move |_| { let mut dbus_conn = DuplexConn::connect_to_bus(session_path, false).unwrap_or_else(|e| { error!("Error connecting to system DBus: {}", e); @@ -82,8 +96,7 @@ pub fn setup_dbus_test_audio_recording_service(recording_state: &RecordingState) let mut dispatch_conn = DispatchConn::new(dbus_conn, recording_state, Box::new(default_handler)); - dispatch_conn.add_handler("/org/cacophony/TC2Agent", Box::new(audio_handler)); + dispatch_conn.add_handler("/org/cacophony/TC2Agent", Box::new(managementd_handler)); dispatch_conn.run().unwrap(); - }, - ); + }); } diff --git a/src/main.rs b/src/main.rs index 3c2713f..b7a84ec 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,7 @@ mod camera_transfer_state; mod cptv_frame_dispatch; mod cptv_header; mod dbus_attiny_i2c; -mod dbus_audio; +mod dbus_managementd; mod detection_mask; mod device_config; mod double_buffer; @@ -39,7 +39,7 @@ use simplelog::*; use crate::camera_transfer_state::enter_camera_transfer_loop; use crate::dbus_attiny_i2c::exit_if_attiny_version_is_not_as_expected; -use crate::dbus_audio::setup_dbus_test_audio_recording_service; +use crate::dbus_managementd::setup_dbus_managementd_recording_service; use crate::device_config::watch_local_config_file_changes; use crate::device_config::DeviceConfig; use crate::frame_socket_server::spawn_frame_socket_server_thread; @@ -50,7 +50,7 @@ use crate::recording_state::RecordingState; const AUDIO_SHEBANG: u16 = 1; const EXPECTED_RP2040_FIRMWARE_HASH: &str = include_str!("../_releases/tc2-firmware.sha256"); -const EXPECTED_RP2040_FIRMWARE_VERSION: u32 = 14; +const EXPECTED_RP2040_FIRMWARE_VERSION: u32 = 15; const EXPECTED_ATTINY_FIRMWARE_VERSION: u8 = 1; const SEGMENT_LENGTH: usize = 9760; @@ -96,7 +96,7 @@ fn main() { }); let mut recording_state = RecordingState::new(); - let _dbus_audio_thread = setup_dbus_test_audio_recording_service(&recording_state); + let _dbus_audio_thread = setup_dbus_managementd_recording_service(&recording_state); let current_config = device_config.unwrap(); let initial_config = current_config.clone(); diff --git a/src/recording_state.rs b/src/recording_state.rs index f96231e..1bf1691 100644 --- a/src/recording_state.rs +++ b/src/recording_state.rs @@ -1,9 +1,9 @@ use crate::dbus_attiny_i2c::{dbus_write_attiny_command, read_tc2_agent_state}; -use crate::dbus_audio::AudioStatus; +use crate::dbus_managementd::AudioStatus; use log::error; use rustbus::DuplexConn; use std::process; -use std::sync::atomic::{AtomicU8, Ordering}; +use std::sync::atomic::{AtomicU32, AtomicU64, AtomicU8, Ordering}; use std::sync::Arc; use std::thread::sleep; use std::time::Duration; @@ -68,11 +68,18 @@ impl RecordingModeState { } } +struct OffloadProgress { + remaining_offload_bytes: AtomicU32, + total_offload_bytes: AtomicU32, + start_time: AtomicU64, +} + #[derive(Clone)] pub struct RecordingState { rp2040_recording_state_inner: Arc, test_recording_state_inner: Arc, recording_mode_state: RecordingModeState, + offload_state: Arc, } impl RecordingState { @@ -81,6 +88,11 @@ impl RecordingState { rp2040_recording_state_inner: Arc::new(AtomicU8::new(tc2_agent_state::NOT_READY)), test_recording_state_inner: Arc::new(AtomicU8::new(0)), recording_mode_state: RecordingModeState::new(), + offload_state: Arc::new(OffloadProgress { + remaining_offload_bytes: AtomicU32::new(0), + total_offload_bytes: AtomicU32::new(0), + start_time: AtomicU64::new(0), + }), } } @@ -93,6 +105,56 @@ impl RecordingState { == tc2_agent_state::RECORDING } + pub fn is_offloading(&self) -> bool { + self.offload_state.start_time.load(Ordering::Relaxed) != 0 + } + + pub fn get_offload_status(&self) -> Option<(u32, u32)> { + if self.is_offloading() { + // Return estimated time remaining in seconds + let transfer_start_time = self.offload_state.start_time.load(Ordering::Relaxed); + let now_ms = chrono::Local::now().timestamp_millis() as u64; + let elapsed = now_ms - transfer_start_time; + let remaining_bytes = + self.offload_state.remaining_offload_bytes.load(Ordering::Relaxed); + let total_bytes = self.offload_state.total_offload_bytes.load(Ordering::Relaxed); + let bytes_transferred = total_bytes - remaining_bytes; + let bytes_per_second = bytes_transferred as f32 / ((elapsed as f32) / 1000.0); + let remaining_seconds = remaining_bytes as f32 / bytes_per_second; + let percent_complete = (remaining_bytes as f32 / total_bytes as f32) * 100.0; + Some((percent_complete as u32, remaining_seconds as u32)) + } else { + None + } + } + + pub fn update_offload_progress(&mut self, block: u16) { + // Block is from 0..2047 + // Page is from 0..63 + // Page size is 2048 bytes + let pages_remaining = (block as u32 + 1) * 64; + let bytes_remaining = pages_remaining * 2048; + if self.offload_state.start_time.load(Ordering::Relaxed) == 0 { + self.offload_state + .start_time + .store(chrono::Local::now().timestamp_millis() as u64, Ordering::Relaxed); + self.offload_state.total_offload_bytes.store(bytes_remaining, Ordering::Relaxed); + } + let pages_remaining = block as u32 * 64; + let bytes_remaining = pages_remaining * 2048; + let last_remaining = self.offload_state.remaining_offload_bytes.load(Ordering::Relaxed); + if bytes_remaining < last_remaining && last_remaining != 0 { + // Make sure the number can only go down once offload has started. + self.offload_state.remaining_offload_bytes.store(bytes_remaining, Ordering::Relaxed); + } + } + + pub fn end_offload(&mut self) { + self.offload_state.start_time.store(0, Ordering::Relaxed); + self.offload_state.total_offload_bytes.store(0, Ordering::Relaxed); + self.offload_state.remaining_offload_bytes.store(0, Ordering::Relaxed); + } + pub fn sync_state_from_attiny(&mut self, conn: &mut DuplexConn) -> u8 { let state = read_tc2_agent_state(conn); if let Ok(state) = state {