From 2528794517dd623ce20f063cd74ef332dfe063a4 Mon Sep 17 00:00:00 2001 From: Markus Ast Date: Mon, 14 Oct 2019 09:08:46 +0200 Subject: [PATCH] implement auto-reconnect to SRS Refs #18 --- CHANGELOG.md | 7 ++ datis/Cargo.toml | 2 +- datis/src/error.rs | 4 +- datis/src/srs.rs | 288 +++++++++++++++++++++++++------------------- datis/src/worker.rs | 3 +- 5 files changed, 174 insertions(+), 130 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 31ea2fa..79a63ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## [0.9.1] - 2019-10-14 +### Fixed +- Do not SPAM logs when connection to SRS is lost #18 + +### Changed +- Automatically try to re-connect to SRS if connection to SRS got lost or if DATIS is started before SRS + ## [0.9.0] - 2019-08-13 ### Added - Option to enable debug logging (useful when investigating issues; logs into `Saved Games\DCS\Logs\DATIS.log`) diff --git a/datis/Cargo.toml b/datis/Cargo.toml index 8aaec15..683e2c3 100644 --- a/datis/Cargo.toml +++ b/datis/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "datis" -version = "0.9.0" +version = "0.9.1" authors = ["Markus Ast "] edition = "2018" diff --git a/datis/src/error.rs b/datis/src/error.rs index 86f37ad..1083119 100644 --- a/datis/src/error.rs +++ b/datis/src/error.rs @@ -38,9 +38,7 @@ impl fmt::Display for Error { "Error: Trying to access undefined lua global or table key: {}", key )?, - GcloudTTL(json) => { - write!(f, "Error calling Gcloud TTS service: {}", json.to_string(),)? - } + GcloudTTL(json) => write!(f, "Error calling Gcloud TTS service: {}", json.to_string(),)?, _ => write!(f, "Error: {}", self.description())?, } diff --git a/datis/src/srs.rs b/datis/src/srs.rs index e5719c6..a090e97 100644 --- a/datis/src/srs.rs +++ b/datis/src/srs.rs @@ -1,4 +1,4 @@ -use std::io::{self, BufRead, BufReader, Cursor, Write}; +use std::io::{self, Cursor, Write}; use std::net::TcpStream; use std::time::{Duration, Instant}; use std::{fmt, thread}; @@ -45,138 +45,58 @@ impl AtisSrsClient { return Ok(()); } - let mut stream = TcpStream::connect(("127.0.0.1", self.port))?; - stream.set_nodelay(true)?; - stream.set_read_timeout(Some(Duration::from_millis(100)))?; - - let name = format!("ATIS {}", self.station.name); - let mut position = self.station.airfield.position.clone(); - position.alt += 100.0; // increase sending alt to 100m above ground for LOS - let sync_msg = Message { - client: Some(Client { - client_guid: &self.sguid, - name: &name, - position: position.clone(), - coalition: Coalition::Blue, - radio_info: Some(RadioInfo { - name: "ATIS", - pos: position.clone(), - ptt: false, - radios: vec![Radio { - enc: false, - enc_key: 0, - enc_mode: 0, // no encryption - freq_max: 1.0, - freq_min: 1.0, - freq: self.station.atis_freq as f64, - modulation: 0, - name: "ATIS", - sec_freq: 0.0, - volume: 1.0, - freq_mode: 0, // Cockpit - vol_mode: 0, // Cockpit - expansion: false, - channel: -1, - simul: false, - }], - control: 0, // HOTAS - selected: 0, - unit: &name, - unit_id: 0, - simultaneous_transmission: true, - }), - }), - msg_type: MsgType::Sync, - version: "1.6.0.0", - }; - - serde_json::to_writer(&stream, &sync_msg)?; - stream.write_all(&[b'\n'])?; - - let mut rd = BufReader::new(stream.try_clone().unwrap()); // TODO: unwrap? - - // spawn audio broadcast thread + // spawn thread that sends sync messages to SRS let sguid = self.sguid.clone(); - let gcloud_key = self.gcloud_key.clone(); let station = self.station.clone(); - let exporter = self.exporter.clone(); - let srs_port = self.port + 1; - self.worker.push(Worker::new(move |ctx| { - if let Err(err) = audio_broadcast(ctx, sguid, gcloud_key, station, exporter, srs_port) { - error!("Error starting SRS broadcast: {}", err); - } - })); - - // spawn thread that sends an update RPC call to SRS every ~5 seconds - let sguid = self.sguid.clone(); - let name = self.station.name.clone(); + let srs_sync_port = self.port; self.worker.push(Worker::new(move |ctx| { - let mut send_update = || -> Result<(), Error> { - // send update - let upd_msg = Message { - client: Some(Client { - client_guid: &sguid, - name: &name, - position: position.clone(), - coalition: Coalition::Blue, - radio_info: None, - }), - msg_type: MsgType::Update, - version: "1.5.6.0", - }; - - serde_json::to_writer(&mut stream, &upd_msg)?; - stream.write_all(&[b'\n'])?; - - Ok(()) - }; - loop { - if let Err(err) = send_update() { - error!("Error sending update to SRS: {}", err); + match srs_update(&ctx, &sguid, &station, srs_sync_port) { + Ok(false) => return, + Ok(true) => {} + Err(err) => { + error!("Error sending/receiving SRS update message: {}", err); + } } - // debug!("SRS Update sent"); - - if ctx.should_stop_timeout(Duration::from_secs(5)) { + // TODO: exponential backoff? + info!("Trying to reconnect update connection in 10 seconds"); + if ctx.should_stop_timeout(Duration::from_secs(10)) { return; } } })); + // spawn audio broadcast thread + let sguid = self.sguid.clone(); + let gcloud_key = self.gcloud_key.clone(); + let station = self.station.clone(); + let exporter = self.exporter.clone(); + let srs_voice_port = self.port + 1; self.worker.push(Worker::new(move |ctx| { - let mut data = Vec::new(); - loop { - match rd.read_until(b'\n', &mut data) { - Ok(bytes_read) => { - if bytes_read == 0 { - return; - } - - data.clear(); - // ignore received messages ... + match audio_broadcast( + &ctx, + &sguid, + &gcloud_key, + &station, + &exporter, + srs_voice_port, + ) { + Ok(false) => return, + Ok(true) => {} + Err(err) => { + error!("Error sending ATIS report to SRS: {}", err); } - Err(err) => match err.kind() { - io::ErrorKind::TimedOut => {} - _ => { - error!( - "Error ({:?}) receiving update from SRS: {}", - err.kind(), - err - ); - } - }, } - if ctx.should_stop() { + // TODO: exponential backoff? + info!("Trying to reconnect voice connection in 10 seconds"); + if ctx.should_stop_timeout(Duration::from_secs(10)) { return; } } })); - - // TODO: endless loop required? - Ok(()) } @@ -199,14 +119,117 @@ impl AtisSrsClient { } } +fn srs_update( + ctx: &Context, + sguid: &str, + station: &Station, + srs_sync_port: u16, +) -> Result { + let mut stream = TcpStream::connect(("127.0.0.1", srs_sync_port))?; + stream.set_nodelay(true)?; + stream.set_read_timeout(Some(Duration::from_millis(100)))?; + + let name = format!("ATIS {}", station.name); + let mut position = station.airfield.position.clone(); + position.alt += 100.0; // increase sending alt to 100m above ground for LOS + + // send initial SYNC message + let sync_msg = Message { + client: Some(Client { + client_guid: &sguid, + name: &name, + position: position.clone(), + coalition: Coalition::Blue, + radio_info: Some(RadioInfo { + name: "ATIS", + pos: position.clone(), + ptt: false, + radios: vec![Radio { + enc: false, + enc_key: 0, + enc_mode: 0, // no encryption + freq_max: 1.0, + freq_min: 1.0, + freq: station.atis_freq as f64, + modulation: 0, + name: "ATIS", + sec_freq: 0.0, + volume: 1.0, + freq_mode: 0, // Cockpit + vol_mode: 0, // Cockpit + expansion: false, + channel: -1, + simul: false, + }], + control: 0, // HOTAS + selected: 0, + unit: &name, + unit_id: 0, + simultaneous_transmission: true, + }), + }), + msg_type: MsgType::Sync, + version: "1.6.0.0", + }; + + serde_json::to_writer(&stream, &sync_msg)?; + stream.write_all(&[b'\n'])?; + + let mut last_update = Instant::now(); + let update_interval = Duration::from_secs(5); + let mut sink = io::sink(); + + loop { + // sends an update RPC call to SRS every ~5 seconds + if last_update.elapsed() > update_interval { + let upd_msg = Message { + client: Some(Client { + client_guid: &sguid, + name: &name, + position: position.clone(), + coalition: Coalition::Blue, + radio_info: None, + }), + msg_type: MsgType::Update, + version: "1.5.6.0", + }; + + serde_json::to_writer(&mut stream, &upd_msg)?; + stream.write_all(&[b'\n'])?; + + last_update = Instant::now(); + } + + // receive and discard messages from the SRS server + match io::copy(&mut stream, &mut sink) { + Ok(bytes_read) => { + if bytes_read == 0 { + // the connection has been closed by SRS + return Ok(true); + } + } + Err(err) => match err.kind() { + io::ErrorKind::TimedOut => {} + _ => { + return Err(err.into()); + } + }, + } + + if ctx.should_stop() { + return Ok(false); + } + } +} + fn audio_broadcast( - ctx: Context, - sguid: String, - gloud_key: String, - station: Station, - exporter: ReportExporter, + ctx: &Context, + sguid: &str, + gloud_key: &str, + station: &Station, + exporter: &ReportExporter, srs_port: u16, -) -> Result<(), Error> { +) -> Result { let interval = Duration::from_secs(60 * 60); // 60min let mut interval_start; let mut report_ix = 0; @@ -225,6 +248,8 @@ fn audio_broadcast( let mut stream = TcpStream::connect(("127.0.0.1", srs_port))?; stream.set_nodelay(true)?; + stream.set_read_timeout(Some(Duration::from_millis(10)))?; + let mut sink = io::sink(); loop { let elapsed = Instant::now() - interval_start; @@ -248,17 +273,32 @@ fn audio_broadcast( stream.write_all(&frame)?; id += 1; - // 32 kBit/s - let secs = (size * 8) as f64 / 1024.0 / 32.0; + // check whether the TCP connection is still alive (by trying to read and it and expect it to time out) + match io::copy(&mut stream, &mut sink) { + Ok(bytes_read) if bytes_read == 0 => { + // the connection has been closed by SRS + warn!("SRS voice connection has been closed, restarting broadcast ..."); + return Ok(true); + } + Err(err) => match err.kind() { + io::ErrorKind::TimedOut => {} + _ => { + return Err(err.into()); + } + }, + _ => {} + } + // wait for the current ~playtime before sending the next package + let secs = (size * 8) as f64 / 1024.0 / 32.0; // 32 kBit/s let playtime = Duration::from_millis((secs * 1000.0) as u64); - let elapsed = Instant::now() - start; + let elapsed = start.elapsed(); if playtime > elapsed { thread::sleep(playtime - elapsed); } if ctx.should_stop() { - return Ok(()); + return Ok(false); } } @@ -275,7 +315,7 @@ fn audio_broadcast( } if ctx.should_stop_timeout(Duration::from_secs(3)) { - return Ok(()); + return Ok(false); } data = audio.into_inner(); diff --git a/datis/src/worker.rs b/datis/src/worker.rs index 16262a5..6063b9d 100644 --- a/datis/src/worker.rs +++ b/datis/src/worker.rs @@ -20,8 +20,7 @@ enum Command { impl Worker { pub fn new(f: F) -> Self where - F: FnOnce(Context) -> T, - F: Send + 'static, + F: Send + 'static + FnOnce(Context) -> T, T: Send + 'static, { let (tx, rx) = channel();