Skip to content

Commit

Permalink
implement auto-reconnect to SRS
Browse files Browse the repository at this point in the history
Refs #18
  • Loading branch information
rkusa committed Oct 14, 2019
1 parent e1e6d35 commit 2528794
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 130 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [0.9.1] - 2019-10-14
### Fixed
- Do not SPAM logs when connection to SRS is lost #18

### Changed
- Automatically try to re-connect to SRS if connection to SRS got lost or if DATIS is started before SRS

## [0.9.0] - 2019-08-13
### Added
- Option to enable debug logging (useful when investigating issues; logs into `Saved Games\DCS\Logs\DATIS.log`)
Expand Down
2 changes: 1 addition & 1 deletion datis/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "datis"
version = "0.9.0"
version = "0.9.1"
authors = ["Markus Ast <[email protected]>"]
edition = "2018"

Expand Down
4 changes: 1 addition & 3 deletions datis/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ impl fmt::Display for Error {
"Error: Trying to access undefined lua global or table key: {}",
key
)?,
GcloudTTL(json) => {
write!(f, "Error calling Gcloud TTS service: {}", json.to_string(),)?
}
GcloudTTL(json) => write!(f, "Error calling Gcloud TTS service: {}", json.to_string(),)?,
_ => write!(f, "Error: {}", self.description())?,
}

Expand Down
288 changes: 164 additions & 124 deletions datis/src/srs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::io::{self, BufRead, BufReader, Cursor, Write};
use std::io::{self, Cursor, Write};
use std::net::TcpStream;
use std::time::{Duration, Instant};
use std::{fmt, thread};
Expand Down Expand Up @@ -45,138 +45,58 @@ impl AtisSrsClient {
return Ok(());
}

let mut stream = TcpStream::connect(("127.0.0.1", self.port))?;
stream.set_nodelay(true)?;
stream.set_read_timeout(Some(Duration::from_millis(100)))?;

let name = format!("ATIS {}", self.station.name);
let mut position = self.station.airfield.position.clone();
position.alt += 100.0; // increase sending alt to 100m above ground for LOS
let sync_msg = Message {
client: Some(Client {
client_guid: &self.sguid,
name: &name,
position: position.clone(),
coalition: Coalition::Blue,
radio_info: Some(RadioInfo {
name: "ATIS",
pos: position.clone(),
ptt: false,
radios: vec![Radio {
enc: false,
enc_key: 0,
enc_mode: 0, // no encryption
freq_max: 1.0,
freq_min: 1.0,
freq: self.station.atis_freq as f64,
modulation: 0,
name: "ATIS",
sec_freq: 0.0,
volume: 1.0,
freq_mode: 0, // Cockpit
vol_mode: 0, // Cockpit
expansion: false,
channel: -1,
simul: false,
}],
control: 0, // HOTAS
selected: 0,
unit: &name,
unit_id: 0,
simultaneous_transmission: true,
}),
}),
msg_type: MsgType::Sync,
version: "1.6.0.0",
};

serde_json::to_writer(&stream, &sync_msg)?;
stream.write_all(&[b'\n'])?;

let mut rd = BufReader::new(stream.try_clone().unwrap()); // TODO: unwrap?

// spawn audio broadcast thread
// spawn thread that sends sync messages to SRS
let sguid = self.sguid.clone();
let gcloud_key = self.gcloud_key.clone();
let station = self.station.clone();
let exporter = self.exporter.clone();
let srs_port = self.port + 1;
self.worker.push(Worker::new(move |ctx| {
if let Err(err) = audio_broadcast(ctx, sguid, gcloud_key, station, exporter, srs_port) {
error!("Error starting SRS broadcast: {}", err);
}
}));

// spawn thread that sends an update RPC call to SRS every ~5 seconds
let sguid = self.sguid.clone();
let name = self.station.name.clone();
let srs_sync_port = self.port;
self.worker.push(Worker::new(move |ctx| {
let mut send_update = || -> Result<(), Error> {
// send update
let upd_msg = Message {
client: Some(Client {
client_guid: &sguid,
name: &name,
position: position.clone(),
coalition: Coalition::Blue,
radio_info: None,
}),
msg_type: MsgType::Update,
version: "1.5.6.0",
};

serde_json::to_writer(&mut stream, &upd_msg)?;
stream.write_all(&[b'\n'])?;

Ok(())
};

loop {
if let Err(err) = send_update() {
error!("Error sending update to SRS: {}", err);
match srs_update(&ctx, &sguid, &station, srs_sync_port) {
Ok(false) => return,
Ok(true) => {}
Err(err) => {
error!("Error sending/receiving SRS update message: {}", err);
}
}

// debug!("SRS Update sent");

if ctx.should_stop_timeout(Duration::from_secs(5)) {
// TODO: exponential backoff?
info!("Trying to reconnect update connection in 10 seconds");
if ctx.should_stop_timeout(Duration::from_secs(10)) {
return;
}
}
}));

// spawn audio broadcast thread
let sguid = self.sguid.clone();
let gcloud_key = self.gcloud_key.clone();
let station = self.station.clone();
let exporter = self.exporter.clone();
let srs_voice_port = self.port + 1;
self.worker.push(Worker::new(move |ctx| {
let mut data = Vec::new();

loop {
match rd.read_until(b'\n', &mut data) {
Ok(bytes_read) => {
if bytes_read == 0 {
return;
}

data.clear();
// ignore received messages ...
match audio_broadcast(
&ctx,
&sguid,
&gcloud_key,
&station,
&exporter,
srs_voice_port,
) {
Ok(false) => return,
Ok(true) => {}
Err(err) => {
error!("Error sending ATIS report to SRS: {}", err);
}
Err(err) => match err.kind() {
io::ErrorKind::TimedOut => {}
_ => {
error!(
"Error ({:?}) receiving update from SRS: {}",
err.kind(),
err
);
}
},
}

if ctx.should_stop() {
// TODO: exponential backoff?
info!("Trying to reconnect voice connection in 10 seconds");
if ctx.should_stop_timeout(Duration::from_secs(10)) {
return;
}
}
}));

// TODO: endless loop required?

Ok(())
}

Expand All @@ -199,14 +119,117 @@ impl AtisSrsClient {
}
}

fn srs_update(
ctx: &Context,
sguid: &str,
station: &Station,
srs_sync_port: u16,
) -> Result<bool, Error> {
let mut stream = TcpStream::connect(("127.0.0.1", srs_sync_port))?;
stream.set_nodelay(true)?;
stream.set_read_timeout(Some(Duration::from_millis(100)))?;

let name = format!("ATIS {}", station.name);
let mut position = station.airfield.position.clone();
position.alt += 100.0; // increase sending alt to 100m above ground for LOS

// send initial SYNC message
let sync_msg = Message {
client: Some(Client {
client_guid: &sguid,
name: &name,
position: position.clone(),
coalition: Coalition::Blue,
radio_info: Some(RadioInfo {
name: "ATIS",
pos: position.clone(),
ptt: false,
radios: vec![Radio {
enc: false,
enc_key: 0,
enc_mode: 0, // no encryption
freq_max: 1.0,
freq_min: 1.0,
freq: station.atis_freq as f64,
modulation: 0,
name: "ATIS",
sec_freq: 0.0,
volume: 1.0,
freq_mode: 0, // Cockpit
vol_mode: 0, // Cockpit
expansion: false,
channel: -1,
simul: false,
}],
control: 0, // HOTAS
selected: 0,
unit: &name,
unit_id: 0,
simultaneous_transmission: true,
}),
}),
msg_type: MsgType::Sync,
version: "1.6.0.0",
};

serde_json::to_writer(&stream, &sync_msg)?;
stream.write_all(&[b'\n'])?;

let mut last_update = Instant::now();
let update_interval = Duration::from_secs(5);
let mut sink = io::sink();

loop {
// sends an update RPC call to SRS every ~5 seconds
if last_update.elapsed() > update_interval {
let upd_msg = Message {
client: Some(Client {
client_guid: &sguid,
name: &name,
position: position.clone(),
coalition: Coalition::Blue,
radio_info: None,
}),
msg_type: MsgType::Update,
version: "1.5.6.0",
};

serde_json::to_writer(&mut stream, &upd_msg)?;
stream.write_all(&[b'\n'])?;

last_update = Instant::now();
}

// receive and discard messages from the SRS server
match io::copy(&mut stream, &mut sink) {
Ok(bytes_read) => {
if bytes_read == 0 {
// the connection has been closed by SRS
return Ok(true);
}
}
Err(err) => match err.kind() {
io::ErrorKind::TimedOut => {}
_ => {
return Err(err.into());
}
},
}

if ctx.should_stop() {
return Ok(false);
}
}
}

fn audio_broadcast(
ctx: Context,
sguid: String,
gloud_key: String,
station: Station,
exporter: ReportExporter,
ctx: &Context,
sguid: &str,
gloud_key: &str,
station: &Station,
exporter: &ReportExporter,
srs_port: u16,
) -> Result<(), Error> {
) -> Result<bool, Error> {
let interval = Duration::from_secs(60 * 60); // 60min
let mut interval_start;
let mut report_ix = 0;
Expand All @@ -225,6 +248,8 @@ fn audio_broadcast(

let mut stream = TcpStream::connect(("127.0.0.1", srs_port))?;
stream.set_nodelay(true)?;
stream.set_read_timeout(Some(Duration::from_millis(10)))?;
let mut sink = io::sink();

loop {
let elapsed = Instant::now() - interval_start;
Expand All @@ -248,17 +273,32 @@ fn audio_broadcast(
stream.write_all(&frame)?;
id += 1;

// 32 kBit/s
let secs = (size * 8) as f64 / 1024.0 / 32.0;
// check whether the TCP connection is still alive (by trying to read and it and expect it to time out)
match io::copy(&mut stream, &mut sink) {
Ok(bytes_read) if bytes_read == 0 => {
// the connection has been closed by SRS
warn!("SRS voice connection has been closed, restarting broadcast ...");
return Ok(true);
}
Err(err) => match err.kind() {
io::ErrorKind::TimedOut => {}
_ => {
return Err(err.into());
}
},
_ => {}
}

// wait for the current ~playtime before sending the next package
let secs = (size * 8) as f64 / 1024.0 / 32.0; // 32 kBit/s
let playtime = Duration::from_millis((secs * 1000.0) as u64);
let elapsed = Instant::now() - start;
let elapsed = start.elapsed();
if playtime > elapsed {
thread::sleep(playtime - elapsed);
}

if ctx.should_stop() {
return Ok(());
return Ok(false);
}
}

Expand All @@ -275,7 +315,7 @@ fn audio_broadcast(
}

if ctx.should_stop_timeout(Duration::from_secs(3)) {
return Ok(());
return Ok(false);
}

data = audio.into_inner();
Expand Down
3 changes: 1 addition & 2 deletions datis/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ enum Command {
impl<T> Worker<T> {
pub fn new<F>(f: F) -> Self
where
F: FnOnce(Context) -> T,
F: Send + 'static,
F: Send + 'static + FnOnce(Context) -> T,
T: Send + 'static,
{
let (tx, rx) = channel();
Expand Down

0 comments on commit 2528794

Please sign in to comment.