Skip to content

Commit

Permalink
server-driven updates
Browse files Browse the repository at this point in the history
  • Loading branch information
komar007 committed Sep 1, 2024
1 parent ecc05be commit dc7cd8d
Showing 1 changed file with 20 additions and 17 deletions.
37 changes: 20 additions & 17 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use tokio::{
io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter},
net::{UnixListener, UnixStream as TokioUnixStream},
pin,
sync::RwLock,
time::sleep,
sync::broadcast::{self, Sender},
time::{sleep, sleep_until, Instant},
};

#[macro_use]
Expand Down Expand Up @@ -159,27 +159,30 @@ async fn run_daemon(other_fork: UnixStream) -> Result<(), String> {
.await
.map_err(|e| format!("error reading from fork parent: {e}"))?;

let loads: HashMap<i32, f32> = HashMap::new();
let loads = Arc::new(RwLock::new(loads));
let loads2 = loads.clone();
let (loads, _) = broadcast::channel(1);
let sender = loads.clone();
tokio::spawn(async move {
loop {
measure_pid_loads(&loads2).await;
sleep(UPDATE_INTERVAL).await;
let next = Instant::now() + UPDATE_INTERVAL;
measure_pid_loads(&sender).await;
sleep_until(next).await;
}
});
loop {
match listener.accept().await {
Ok((socket, addr)) => {
info!("accepted from {addr:?}");
tokio::spawn(serve_client(socket, loads.clone()));
tokio::spawn(serve_client(socket, loads.subscribe()));
}
Err(e) => error!("accept function failed: {:?}", e),
}
}
}

async fn serve_client(mut socket: TokioUnixStream, loads: Arc<RwLock<HashMap<i32, f32>>>) {
async fn serve_client(
mut socket: TokioUnixStream,
mut loads: broadcast::Receiver<Arc<HashMap<i32, f32>>>,
) {
let (reader, writer) = socket.split();
let reader = BufReader::new(reader);
let mut writer = BufWriter::new(writer);
Expand All @@ -188,32 +191,32 @@ async fn serve_client(mut socket: TokioUnixStream, loads: Arc<RwLock<HashMap<i32
})
.collect()
.await;
'infinite: loop {
'serving: loop {
let pid_loads: Vec<_> = {
let loads = loads.read().await;
let Ok(loads) = loads.recv().await else {
continue 'serving;
};
pids.iter()
.map(|pid| *loads.get(pid).unwrap_or(&f32::NAN))
.collect()
};
for pid in pid_loads {
if let Err(e) = writer.write_f32(pid).await {
error!("error writing response: {e}");
break 'infinite;
break 'serving;
}
}
if let Err(e) = writer.flush().await {
error!("error flushing stream: {e}");
break;
break 'serving;
}

sleep(UPDATE_INTERVAL).await;
}
if let Err(e) = socket.shutdown().await {
error!("error shutting down: {e}");
}
}

async fn measure_pid_loads(out_loads: &Arc<RwLock<HashMap<i32, f32>>>) {
async fn measure_pid_loads(out_loads: &Sender<Arc<HashMap<i32, f32>>>) {
let mut ticks = HashMap::new();
for _ in 0..2 {
let mut pid_children: HashMap<_, Vec<_>> = Default::default();
Expand All @@ -235,7 +238,7 @@ async fn measure_pid_loads(out_loads: &Arc<RwLock<HashMap<i32, f32>>>) {
loads.insert(stat.pid, load);
}
if !loads.is_empty() {
*out_loads.write().await = get_cumulated(&pid_children, &loads);
let _ = out_loads.send(get_cumulated(&pid_children, &loads).into());
}
sleep(MEASURE_PERIOD).await;
}
Expand Down

0 comments on commit dc7cd8d

Please sign in to comment.