Skip to content

Commit

Permalink
fix: recreate read stream on error (#135)
Browse files Browse the repository at this point in the history
  • Loading branch information
mierak authored Nov 7, 2024
1 parent d1087e7 commit 67771d6
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 39 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ All notable changes to this project will be documented in this file.
- Scrolloff issues in Playlists pane after rename/move
- Few typos in UI and internal messages
- Click to select and rendering issues in SongInfo and Decoder modals
- Read stream not being emptied after encountering error while reading MPD's response

### Deprecated

Expand Down
10 changes: 10 additions & 0 deletions src/mpd/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,12 @@ impl<'name> Client<'name> {
pub fn send<'cmd>(&mut self, command: &'cmd str) -> Result<ProtoClient<'cmd, '_, Self>, MpdError> {
ProtoClient::new(command, self)
}

fn clear_read_buf(&mut self) -> Result<()> {
log::trace!("Reinitialized read buffer");
self.rx = BufReader::new(self.stream.try_clone()?);
Ok(())
}
}

impl<'name> SocketClient for Client<'name> {
Expand All @@ -226,4 +232,8 @@ impl<'name> SocketClient for Client<'name> {
fn read(&mut self) -> &mut impl BufRead {
&mut self.rx
}

fn clear_read_buf(&mut self) -> Result<()> {
self.clear_read_buf()
}
}
116 changes: 79 additions & 37 deletions src/mpd/proto_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub trait SocketClient {
fn reconnect(&mut self) -> MpdResult<&impl SocketClient>;
fn write(&mut self, bytes: &[u8]) -> std::io::Result<()>;
fn read(&mut self) -> &mut impl BufRead;
fn clear_read_buf(&mut self) -> Result<()>;
}

impl<'cmd, 'client, C: SocketClient> ProtoClient<'cmd, 'client, C> {
Expand All @@ -50,6 +51,7 @@ impl<'cmd, 'client, C: SocketClient> ProtoClient<'cmd, 'client, C> {
}

fn execute(&mut self, command: &str) -> Result<&mut Self, MpdError> {
trace!(command = self.command; "Executing command");
if let Err(e) = self.client.write([command, "\n"].concat().as_bytes()) {
if e.kind() == std::io::ErrorKind::BrokenPipe {
self.client.reconnect()?;
Expand All @@ -65,16 +67,28 @@ impl<'cmd, 'client, C: SocketClient> ProtoClient<'cmd, 'client, C> {

pub(super) fn read_ok(mut self) -> Result<(), MpdError> {
trace!(command = self.command; "Reading command");
let read = self.client.read();
match Self::read_line(read) {
match self.read_line() {
Ok(MpdLine::Ok) => Ok(()),
Ok(MpdLine::Value(val)) => Err(MpdError::Generic(format!("Expected 'OK' but got '{val}'"))),
Err(MpdError::ClientClosed) => {
self.client.reconnect()?;
self.execute(self.command)?;
self.read_ok()
}
Err(e) => Err(e),
Err(e) => {
self.client.clear_read_buf()?;
Err(e)
}
}
}

fn next<V: FromMpd>(&mut self, v: &mut V, val: String) -> Result<(), MpdError> {
match v.next(val) {
Ok(val) => Ok(val),
Err(e) => {
self.client.clear_read_buf()?;
Err(e)
}
}
}

Expand All @@ -84,17 +98,19 @@ impl<'cmd, 'client, C: SocketClient> ProtoClient<'cmd, 'client, C> {
{
trace!(command = self.command; "Reading command");
let mut result = V::default();
let read = self.client.read();
loop {
match Self::read_line(read) {
match self.read_line() {
Ok(MpdLine::Ok) => return Ok(result),
Ok(MpdLine::Value(val)) => result.next(val)?,
Ok(MpdLine::Value(val)) => self.next(&mut result, val)?,
Err(MpdError::ClientClosed) => {
self.client.reconnect()?;
self.execute(self.command)?;
return self.read_response::<V>();
}
Err(e) => return Err(e),
Err(e) => {
self.client.clear_read_buf()?;
return Err(e);
}
};
}
}
Expand All @@ -106,20 +122,22 @@ impl<'cmd, 'client, C: SocketClient> ProtoClient<'cmd, 'client, C> {
trace!(command = self.command; "Reading command");
let mut result = V::default();
let mut found_any = false;
let read = self.client.read();
loop {
match Self::read_line(read) {
match self.read_line() {
Ok(MpdLine::Ok) => return if found_any { Ok(Some(result)) } else { Ok(None) },
Ok(MpdLine::Value(val)) => {
found_any = true;
result.next(val)?;
self.next(&mut result, val)?;
}
Err(MpdError::ClientClosed) => {
self.client.reconnect()?;
self.execute(self.command)?;
return self.read_opt_response::<V>();
}
Err(e) => return Err(e),
Err(e) => {
self.client.clear_read_buf()?;
return Err(e);
}
}
}
}
Expand All @@ -134,28 +152,35 @@ impl<'cmd, 'client, C: SocketClient> ProtoClient<'cmd, 'client, C> {
self.execute(&format!("{} {}", self.command, buf.len()))?;
self._read_bin(&mut buf)
}
Err(e) => Err(e),
Err(e) => {
self.client.clear_read_buf()?;
Err(e)
}
};
loop {
self.execute(&format!("{} {}", self.command, buf.len()))?;
if let Some(response) = self._read_bin(&mut buf)? {
if buf.len() >= response.size_total as usize || response.bytes_read == 0 {
trace!( len = buf.len();"Finshed reading binary response");
break;
match self._read_bin(&mut buf) {
Ok(Some(response)) => {
if buf.len() >= response.size_total as usize || response.bytes_read == 0 {
trace!( len = buf.len();"Finshed reading binary response");
break;
}
}
Ok(None) => return Ok(None),
Err(e) => {
self.client.clear_read_buf()?;
return Err(e);
}
} else {
return Ok(None);
}
}
Ok(Some(buf))
}

fn _read_bin(&mut self, binary_buf: &mut Vec<u8>) -> Result<Option<BinaryMpdResponse>, MpdError> {
let mut result = BinaryMpdResponse::default();
let read = self.client.read();
{
loop {
match Self::read_line(read)? {
match self.read_line()? {
MpdLine::Ok => {
log::warn!("Expected binary data but got 'OK'");
return Ok(None);
Expand All @@ -179,16 +204,19 @@ impl<'cmd, 'client, C: SocketClient> ProtoClient<'cmd, 'client, C> {
};
}
}

let read = self.client.read();
let mut handle = read.take(result.bytes_read);
let _ = handle.read_to_end(binary_buf)?;
let _ = read.read_line(&mut String::new()); // MPD prints an empty new line at the end of binary response
match Self::read_line(read)? {
match self.read_line()? {
MpdLine::Ok => Ok(Some(result)),
MpdLine::Value(val) => Err(MpdError::Generic(format!("Expected 'OK' but got '{val}'"))),
}
}

fn read_line<R: BufRead>(read: &mut R) -> Result<MpdLine, MpdError> {
fn read_line(&mut self) -> Result<MpdLine, MpdError> {
let read = self.client.read();
let mut line = String::new();

let bytes_read = match read.read_line(&mut line) {
Expand Down Expand Up @@ -260,48 +288,60 @@ mod tests {
fn read(&mut self) -> &mut impl std::io::BufRead {
&mut self.read
}

fn clear_read_buf(&mut self) -> anyhow::Result<()> {
Ok(())
}
}

mod read_mpd_line {

use std::io::Cursor;
use std::io::{BufReader, Cursor};

use crate::tests::fixtures::mpd_client::{client, TestMpdClient};
use rstest::rstest;

use crate::mpd::{
errors::{ErrorCode, MpdError, MpdFailureResponse},
proto_client::{tests::TestClient, MpdLine, ProtoClient},
proto_client::{MpdLine, ProtoClient},
};

#[test]
fn returns_ok() {
let result = ProtoClient::<TestClient>::read_line(&mut Cursor::new(b"OK enenene"));
#[rstest]
fn returns_ok(mut client: TestMpdClient) {
client.set_read_content(Box::new(Cursor::new(b"OK enenene")));
let mut client = ProtoClient::new("", &mut client).unwrap();
let result = client.read_line();

assert_eq!(Ok(MpdLine::Ok), result);
}

#[test]
fn returns_ok_for_list_ok() {
let result = ProtoClient::<TestClient>::read_line(&mut Cursor::new(b"list_OK enenene"));
#[rstest]
fn returns_ok_for_list_ok(mut client: TestMpdClient) {
client.set_read_content(Box::new(Cursor::new(b"list_OK enenene")));
let mut client = ProtoClient::new("", &mut client).unwrap();
let result = client.read_line();

assert_eq!(Ok(MpdLine::Ok), result);
}

#[test]
fn returns_mpd_err() {
#[rstest]
fn returns_mpd_err(mut client: TestMpdClient) {
let err = MpdFailureResponse {
code: ErrorCode::PlayerSync,
command_list_index: 2,
command: "some_cmd".to_string(),
message: "error message boi".to_string(),
};

let result =
ProtoClient::<TestClient>::read_line(&mut Cursor::new(b"ACK [55@2] {some_cmd} error message boi"));
client.set_read_content(Box::new(Cursor::new(b"ACK [55@2] {some_cmd} error message boi")));
let mut client = ProtoClient::new("", &mut client).unwrap();
let result = client.read_line();

assert_eq!(Err(MpdError::Mpd(err)), result);
}

#[test]
fn returns_client_closed_on_broken_pipe() {
#[rstest]
fn returns_client_closed_on_broken_pipe(mut client: TestMpdClient) {
struct Mock;
impl std::io::BufRead for Mock {
fn consume(&mut self, _amt: usize) {}
Expand All @@ -315,7 +355,9 @@ mod tests {
}
}

let result = ProtoClient::<TestClient>::read_line(&mut Mock);
client.set_read(BufReader::new(Box::new(Mock)));
let mut client = ProtoClient::new("", &mut client).unwrap();
let result = client.read_line();

assert_eq!(Err(MpdError::ClientClosed), result);
}
Expand Down
39 changes: 37 additions & 2 deletions src/tests/fixtures/mpd_client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use std::{collections::HashMap, ops::AddAssign, time::Duration};
use std::{
collections::HashMap,
io::{BufRead, BufReader, Cursor},
ops::AddAssign,
time::Duration,
};

use itertools::Itertools;
use rstest::fixture;
Expand All @@ -10,6 +15,7 @@ use crate::mpd::{
},
errors::MpdError,
mpd_client::{Filter, MpdClient, QueueMoveTarget, SaveMode, SingleOrRange, Tag, ValueChange},
proto_client::SocketClient,
};

#[fixture]
Expand Down Expand Up @@ -65,6 +71,7 @@ pub fn client() -> TestMpdClient {
volume: Volume::new(100),
status: Status::default(),
calls: HashMap::default(),
rx: BufReader::new(Box::new(Cursor::new(String::new()))),
}
}

Expand All @@ -73,7 +80,6 @@ pub struct TestPlaylist {
pub name: String,
}

#[derive(Default)]
pub struct TestMpdClient {
pub songs: Vec<Song>,
pub queue: Vec<usize>,
Expand All @@ -82,6 +88,17 @@ pub struct TestMpdClient {
pub volume: Volume,
pub status: Status,
pub calls: HashMap<String, u32>,
pub rx: BufReader<Box<dyn BufRead>>,
}

impl TestMpdClient {
pub fn set_read_content(&mut self, content: Box<dyn BufRead>) {
self.rx = BufReader::new(Box::new(content));
}

pub fn set_read(&mut self, read: BufReader<Box<dyn BufRead>>) {
self.rx = read;
}
}

type MpdResult<T> = Result<T, MpdError>;
Expand Down Expand Up @@ -499,3 +516,21 @@ impl MpdClient for TestMpdClient {
todo!("Not yet implemented")
}
}

impl SocketClient for TestMpdClient {
fn reconnect(&mut self) -> MpdResult<&impl SocketClient> {
Ok(self)
}

fn write(&mut self, _bytes: &[u8]) -> std::io::Result<()> {
Ok(())
}

fn read(&mut self) -> &mut impl BufRead {
&mut self.rx
}

fn clear_read_buf(&mut self) -> anyhow::Result<()> {
Ok(())
}
}

0 comments on commit 67771d6

Please sign in to comment.