Skip to content

Commit

Permalink
Pass offload status to managementd
Browse files Browse the repository at this point in the history
  • Loading branch information
hardiesoft committed Nov 6, 2024
1 parent f6c8797 commit b5c2471
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 22 deletions.
2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ authors = ["Cacophony Developers <[email protected]>"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
framebuffer = "0.3.1"
rppal = "0.19.0"
chrono = { version = "0.4.38", features = ["serde"] }
byteorder = "1.4.3"
Expand All @@ -33,7 +32,6 @@ sha256 = "1.5.0"

[target.aarch64-unknown-linux-musl]
# applies to target-specific builds

# rPi3
rustflags = "-C target-cpu=cortex-a53"

Expand Down
27 changes: 23 additions & 4 deletions src/camera_transfer_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,12 +248,25 @@ pub fn enter_camera_transfer_loop(
let crc_from_remote_dup = LittleEndian::read_u16(&header_slice[12..14]);
let crc_from_remote_inv = LittleEndian::read_u16(&header_slice[14..16]);
let crc_from_remote_inv_dup = LittleEndian::read_u16(&header_slice[16..=17]);
let transfer_type_check = transfer_type == transfer_type_dup;
let mut transfer_block = 0;
let is_file_transfer_message = transfer_type_check
&& transfer_type >= CAMERA_BEGIN_FILE_TRANSFER
&& transfer_type <= CAMERA_BEGIN_AND_END_FILE_TRANSFER;
let is_file_transfer_progress_message = transfer_type_check
&& (transfer_type == CAMERA_BEGIN_FILE_TRANSFER
|| transfer_type == CAMERA_END_FILE_TRANSFER);
let header_crc_check = if is_file_transfer_progress_message {
transfer_block = crc_from_remote_dup;

crc_from_remote_inv_dup == crc_from_remote_inv
&& crc_from_remote_inv.not() == crc_from_remote
} else {
crc_from_remote == crc_from_remote_dup
&& crc_from_remote_inv_dup == crc_from_remote_inv
&& crc_from_remote_inv.not() == crc_from_remote
};
let num_bytes_check = num_bytes == num_bytes_dup;
let header_crc_check = crc_from_remote == crc_from_remote_dup
&& crc_from_remote_inv_dup == crc_from_remote_inv
&& crc_from_remote_inv.not() == crc_from_remote;
let transfer_type_check = transfer_type == transfer_type_dup;
if !num_bytes_check || !header_crc_check || !transfer_type_check {
// Just log the *first* time the header integrity check fails in a session.
if !header_integrity_check_has_failed {
Expand Down Expand Up @@ -325,6 +338,12 @@ pub fn enter_camera_transfer_loop(
// Always write the return buffer
spi.write(&return_payload_buf).unwrap();
if crc == crc_from_remote {
if is_file_transfer_progress_message {
recording_state.update_offload_progress(transfer_block);
} else if !is_file_transfer_message && recording_state.is_offloading() {
recording_state.end_offload();
}

match transfer_type {
CAMERA_CONNECT_INFO => {
radiometry_enabled = LittleEndian::read_u32(&chunk[0..4]) == 2;
Expand Down
33 changes: 23 additions & 10 deletions src/dbus_audio.rs → src/dbus_managementd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use rustbus::{get_system_bus_path, DispatchConn, DuplexConn};
use std::{process, thread};
use thread_priority::{ThreadBuilderExt, ThreadPriority};

// TC2-Agent dbus audio service
// TC2-Agent dbus managementd service
type MyHandleEnv<'a, 'b> = HandleEnvironment<RecordingState, ()>;

fn default_handler(
Expand All @@ -19,7 +19,7 @@ fn default_handler(
Ok(None)
}

fn audio_handler(
fn managementd_handler(
recording_state_ctx: &mut RecordingState,
_matches: Matches,
msg: &MarshalledMessage,
Expand All @@ -41,6 +41,20 @@ fn audio_handler(
response.body.push_param(if recording_state_ctx.is_in_audio_mode() { 1 } else { 0 })?;
response.body.push_param(status as u8)?;
Ok(Some(response))
} else if msg.dynheader.member.as_ref().unwrap() == "offloadstatus" {
let mut response = msg.dynheader.make_response();
if let Some((percent_complete, remaining_seconds)) =
recording_state_ctx.get_offload_status()
{
response.body.push_param(1)?;
response.body.push_param(0)?; // percent_complete
response.body.push_param(0)?; // remaining_seconds
} else {
response.body.push_param(0)?;
response.body.push_param(0)?;
response.body.push_param(0)?;
}
Ok(Some(response))
} else {
Ok(None)
}
Expand All @@ -53,14 +67,14 @@ pub enum AudioStatus {
Recording = 4,
}

pub fn setup_dbus_test_audio_recording_service(recording_state: &RecordingState) {
pub fn setup_dbus_managementd_recording_service(recording_state: &RecordingState) {
// set up dbus service for handling messages between managementd and tc2-agent about when
// to make test audio recordings.
// to make test audio recordings, and for getting the status of any file offloads
let recording_state = recording_state.clone();
let session_path = get_system_bus_path().unwrap();
let _dbus_thread = thread::Builder::new().name("dbus-service".to_string()).spawn_with_priority(
ThreadPriority::Max,
move |_| {
let _dbus_thread = thread::Builder::new()
.name("dbus-managementd-service".to_string())
.spawn_with_priority(ThreadPriority::Max, move |_| {
let mut dbus_conn =
DuplexConn::connect_to_bus(session_path, false).unwrap_or_else(|e| {
error!("Error connecting to system DBus: {}", e);
Expand All @@ -82,8 +96,7 @@ pub fn setup_dbus_test_audio_recording_service(recording_state: &RecordingState)

let mut dispatch_conn =
DispatchConn::new(dbus_conn, recording_state, Box::new(default_handler));
dispatch_conn.add_handler("/org/cacophony/TC2Agent", Box::new(audio_handler));
dispatch_conn.add_handler("/org/cacophony/TC2Agent", Box::new(managementd_handler));
dispatch_conn.run().unwrap();
},
);
});
}
8 changes: 4 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod camera_transfer_state;
mod cptv_frame_dispatch;
mod cptv_header;
mod dbus_attiny_i2c;
mod dbus_audio;
mod dbus_managementd;
mod detection_mask;
mod device_config;
mod double_buffer;
Expand Down Expand Up @@ -39,7 +39,7 @@ use simplelog::*;

use crate::camera_transfer_state::enter_camera_transfer_loop;
use crate::dbus_attiny_i2c::exit_if_attiny_version_is_not_as_expected;
use crate::dbus_audio::setup_dbus_test_audio_recording_service;
use crate::dbus_managementd::setup_dbus_managementd_recording_service;
use crate::device_config::watch_local_config_file_changes;
use crate::device_config::DeviceConfig;
use crate::frame_socket_server::spawn_frame_socket_server_thread;
Expand All @@ -50,7 +50,7 @@ use crate::recording_state::RecordingState;
const AUDIO_SHEBANG: u16 = 1;

const EXPECTED_RP2040_FIRMWARE_HASH: &str = include_str!("../_releases/tc2-firmware.sha256");
const EXPECTED_RP2040_FIRMWARE_VERSION: u32 = 14;
const EXPECTED_RP2040_FIRMWARE_VERSION: u32 = 15;
const EXPECTED_ATTINY_FIRMWARE_VERSION: u8 = 1;

const SEGMENT_LENGTH: usize = 9760;
Expand Down Expand Up @@ -96,7 +96,7 @@ fn main() {
});

let mut recording_state = RecordingState::new();
let _dbus_audio_thread = setup_dbus_test_audio_recording_service(&recording_state);
let _dbus_audio_thread = setup_dbus_managementd_recording_service(&recording_state);

let current_config = device_config.unwrap();
let initial_config = current_config.clone();
Expand Down
66 changes: 64 additions & 2 deletions src/recording_state.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::dbus_attiny_i2c::{dbus_write_attiny_command, read_tc2_agent_state};
use crate::dbus_audio::AudioStatus;
use crate::dbus_managementd::AudioStatus;
use log::error;
use rustbus::DuplexConn;
use std::process;
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::atomic::{AtomicU32, AtomicU64, AtomicU8, Ordering};
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
Expand Down Expand Up @@ -68,11 +68,18 @@ impl RecordingModeState {
}
}

struct OffloadProgress {
remaining_offload_bytes: AtomicU32,
total_offload_bytes: AtomicU32,
start_time: AtomicU64,
}

#[derive(Clone)]
pub struct RecordingState {
rp2040_recording_state_inner: Arc<AtomicU8>,
test_recording_state_inner: Arc<AtomicU8>,
recording_mode_state: RecordingModeState,
offload_state: Arc<OffloadProgress>,
}

impl RecordingState {
Expand All @@ -81,6 +88,11 @@ impl RecordingState {
rp2040_recording_state_inner: Arc::new(AtomicU8::new(tc2_agent_state::NOT_READY)),
test_recording_state_inner: Arc::new(AtomicU8::new(0)),
recording_mode_state: RecordingModeState::new(),
offload_state: Arc::new(OffloadProgress {
remaining_offload_bytes: AtomicU32::new(0),
total_offload_bytes: AtomicU32::new(0),
start_time: AtomicU64::new(0),
}),
}
}

Expand All @@ -93,6 +105,56 @@ impl RecordingState {
== tc2_agent_state::RECORDING
}

pub fn is_offloading(&self) -> bool {
self.offload_state.start_time.load(Ordering::Relaxed) != 0
}

pub fn get_offload_status(&self) -> Option<(u32, u32)> {
if self.is_offloading() {
// Return estimated time remaining in seconds
let transfer_start_time = self.offload_state.start_time.load(Ordering::Relaxed);
let now_ms = chrono::Local::now().timestamp_millis() as u64;
let elapsed = now_ms - transfer_start_time;
let remaining_bytes =
self.offload_state.remaining_offload_bytes.load(Ordering::Relaxed);
let total_bytes = self.offload_state.total_offload_bytes.load(Ordering::Relaxed);
let bytes_transferred = total_bytes - remaining_bytes;
let bytes_per_second = bytes_transferred as f32 / ((elapsed as f32) / 1000.0);
let remaining_seconds = remaining_bytes as f32 / bytes_per_second;
let percent_complete = (remaining_bytes as f32 / total_bytes as f32) * 100.0;
Some((percent_complete as u32, remaining_seconds as u32))
} else {
None
}
}

pub fn update_offload_progress(&mut self, block: u16) {
// Block is from 0..2047
// Page is from 0..63
// Page size is 2048 bytes
let pages_remaining = (block as u32 + 1) * 64;
let bytes_remaining = pages_remaining * 2048;
if self.offload_state.start_time.load(Ordering::Relaxed) == 0 {
self.offload_state
.start_time
.store(chrono::Local::now().timestamp_millis() as u64, Ordering::Relaxed);
self.offload_state.total_offload_bytes.store(bytes_remaining, Ordering::Relaxed);
}
let pages_remaining = block as u32 * 64;
let bytes_remaining = pages_remaining * 2048;
let last_remaining = self.offload_state.remaining_offload_bytes.load(Ordering::Relaxed);
if bytes_remaining < last_remaining && last_remaining != 0 {
// Make sure the number can only go down once offload has started.
self.offload_state.remaining_offload_bytes.store(bytes_remaining, Ordering::Relaxed);
}
}

pub fn end_offload(&mut self) {
self.offload_state.start_time.store(0, Ordering::Relaxed);
self.offload_state.total_offload_bytes.store(0, Ordering::Relaxed);
self.offload_state.remaining_offload_bytes.store(0, Ordering::Relaxed);
}

pub fn sync_state_from_attiny(&mut self, conn: &mut DuplexConn) -> u8 {
let state = read_tc2_agent_state(conn);
if let Ok(state) = state {
Expand Down

0 comments on commit b5c2471

Please sign in to comment.