diff --git a/hipcheck-common/proto/hipcheck/v1/hipcheck.proto b/hipcheck-common/proto/hipcheck/v1/hipcheck.proto index 63f043c8..41b42775 100644 --- a/hipcheck-common/proto/hipcheck/v1/hipcheck.proto +++ b/hipcheck-common/proto/hipcheck/v1/hipcheck.proto @@ -192,8 +192,11 @@ enum QueryState { // Something has gone wrong. QUERY_STATE_UNSPECIFIED = 0; - // We are submitting a new query. - QUERY_STATE_SUBMIT = 1; + // We are sending a query to a plugin and we are expecting to need to send more chunks + QUERY_STATE_SUBMIT_IN_PROGRESS = 4; + + // We are completed submitting a new query. + QUERY_STATE_SUBMIT_COMPLETE = 1; // We are replying to a query and expect more chunks. QUERY_STATE_REPLY_IN_PROGRESS = 2; diff --git a/hipcheck-common/src/chunk.rs b/hipcheck-common/src/chunk.rs index 7d7967ab..c67fe5d4 100644 --- a/hipcheck-common/src/chunk.rs +++ b/hipcheck-common/src/chunk.rs @@ -36,11 +36,20 @@ fn estimate_size(msg: &PluginQuery) -> usize { } pub fn chunk_with_size(msg: PluginQuery, max_est_size: usize) -> Result> { - // Chunking only does something on response objects, mostly because - // we don't have a state to represent "SubmitInProgress" - if msg.state == QueryState::Submit as i32 { - return Ok(vec![msg]); - } + // in_progress_state - the state the PluginQuery is in for all queries in the resulting Vec, + // EXCEPT the last one + // + // completion_state - the state the PluginQuery is in if it is the last chunked message + let (in_progress_state, completion_state) = match msg.state() { + // if the message gets chunked, then it must either be a reply or submission that is in process + QueryState::Unspecified => return Err(anyhow!("msg in Unspecified query state")), + QueryState::SubmitInProgress | QueryState::SubmitComplete => { + (QueryState::SubmitInProgress, QueryState::SubmitComplete) + } + QueryState::ReplyInProgress | QueryState::ReplyComplete => { + (QueryState::ReplyInProgress, QueryState::ReplyComplete) + } + }; let mut out: Vec = vec![]; let mut base: PluginQuery = msg; @@ -58,9 +67,9 @@ pub fn chunk_with_size(msg: PluginQuery, max_est_size: usize) -> Result Result 0 && base.key.bytes().len() > 0 { // steal from key - query.key = drain_at_most_n_bytes(&mut base.key, remaining)?; - remaining -= query.key.bytes().len(); + chunked_query.key = drain_at_most_n_bytes(&mut base.key, remaining)?; + remaining -= chunked_query.key.bytes().len(); made_progress = true; } if remaining > 0 && base.output.bytes().len() > 0 { // steal from output - query.output = drain_at_most_n_bytes(&mut base.output, remaining)?; - remaining -= query.output.bytes().len(); + chunked_query.output = drain_at_most_n_bytes(&mut base.output, remaining)?; + remaining -= chunked_query.output.bytes().len(); made_progress = true; } @@ -96,7 +105,7 @@ pub fn chunk_with_size(msg: PluginQuery, max_est_size: usize) -> Result Result return Err(Error::UnspecifiedQueryState), - QueryState::Submit => return Err(Error::ReceivedSubmitWhenExpectingReplyChunk), + QueryState::SubmitInProgress | QueryState::SubmitComplete => { + return Err(Error::ReceivedSubmitWhenExpectingReplyChunk) + } QueryState::ReplyInProgress | QueryState::ReplyComplete => { if state == QueryState::ReplyComplete { raw.state = QueryState::ReplyComplete.into(); @@ -209,23 +225,40 @@ mod test { #[test] fn test_chunking() { - let query = PluginQuery { - id: 0, - state: QueryState::ReplyComplete as i32, - publisher_name: "".to_owned(), - plugin_name: "".to_owned(), - query_name: "".to_owned(), - // This key will cause the chunk not to occur on a char boundary - key: "aこれは実験です".to_owned(), - output: "".to_owned(), - concern: vec!["< 10".to_owned(), "0123456789".to_owned()], - }; - let res = match chunk_with_size(query, 10) { - Ok(r) => r, - Err(e) => { - panic!("{e}"); - } - }; - assert_eq!(res.len(), 4); + // test both reply and submission chunking + let states = [ + (QueryState::SubmitInProgress, QueryState::SubmitComplete), + (QueryState::ReplyInProgress, QueryState::ReplyComplete), + ]; + + for (intermediate_state, final_state) in states.into_iter() { + let query = PluginQuery { + id: 0, + state: final_state as i32, + publisher_name: "".to_owned(), + plugin_name: "".to_owned(), + query_name: "".to_owned(), + // This key will cause the chunk not to occur on a char boundary + key: "aこれは実験です".to_owned(), + output: "".to_owned(), + concern: vec!["< 10".to_owned(), "0123456789".to_owned()], + }; + let res = match chunk_with_size(query, 10) { + Ok(r) => r, + Err(e) => { + panic!("{e}"); + } + }; + // ensure first 3 are ...InProgress + assert_eq!( + res.iter() + .filter(|x| x.state() == intermediate_state) + .count(), + 3 + ); + // ensure last one is ...Complete + assert_eq!(res.last().unwrap().state(), final_state); + assert_eq!(res.len(), 4); + } } } diff --git a/hipcheck-common/src/error.rs b/hipcheck-common/src/error.rs index 5fcbb3a0..c2dad06d 100644 --- a/hipcheck-common/src/error.rs +++ b/hipcheck-common/src/error.rs @@ -11,6 +11,10 @@ pub enum Error { #[error("unexpected ReplyInProgress state for query")] UnexpectedReplyInProgress, + /// The `PluginEngine` received a message with the unexpected status `RequestInProgress` + #[error("unexpected RequestInProgress state for query")] + UnexpectedRequestInProgress, + /// The `PluginEngine` received a message with a request-type status when it expected a reply #[error("remote sent QuerySubmit when reply chunk expected")] ReceivedSubmitWhenExpectingReplyChunk, diff --git a/hipcheck-common/src/types.rs b/hipcheck-common/src/types.rs index 6944959a..34219f46 100644 --- a/hipcheck-common/src/types.rs +++ b/hipcheck-common/src/types.rs @@ -29,7 +29,8 @@ impl TryFrom for QueryDirection { fn try_from(value: QueryState) -> Result { match value { QueryState::Unspecified => Err(Error::UnspecifiedQueryState), - QueryState::Submit => Ok(QueryDirection::Request), + QueryState::SubmitInProgress => Err(Error::UnexpectedRequestInProgress), + QueryState::SubmitComplete => Ok(QueryDirection::Request), QueryState::ReplyInProgress => Err(Error::UnexpectedReplyInProgress), QueryState::ReplyComplete => Ok(QueryDirection::Response), } @@ -39,7 +40,7 @@ impl TryFrom for QueryDirection { impl From for QueryState { fn from(value: QueryDirection) -> Self { match value { - QueryDirection::Request => QueryState::Submit, + QueryDirection::Request => QueryState::SubmitComplete, QueryDirection::Response => QueryState::ReplyComplete, } } diff --git a/sdk/rust/src/error.rs b/sdk/rust/src/error.rs index d823dcea..1e5340a8 100644 --- a/sdk/rust/src/error.rs +++ b/sdk/rust/src/error.rs @@ -71,6 +71,7 @@ impl From for Error { use hipcheck_common::error::Error::*; match value { UnspecifiedQueryState => Error::UnspecifiedQueryState, + UnexpectedRequestInProgress => Error::UnexpectedReplyInProgress, UnexpectedReplyInProgress => Error::UnexpectedReplyInProgress, ReceivedSubmitWhenExpectingReplyChunk => Error::ReceivedSubmitWhenExpectingReplyChunk, MoreAfterQueryComplete { id } => Error::MoreAfterQueryComplete { id }, diff --git a/sdk/rust/src/plugin_engine.rs b/sdk/rust/src/plugin_engine.rs index a2b14aca..705fa65a 100644 --- a/sdk/rust/src/plugin_engine.rs +++ b/sdk/rust/src/plugin_engine.rs @@ -431,7 +431,7 @@ impl HcSessionSocket { return Ok(HandleAction::ForwardMsgToExistingSession(tx)); } - if query.state() == QueryState::Submit { + if [QueryState::SubmitInProgress, QueryState::SubmitComplete].contains(&query.state()) { return Ok(HandleAction::CreateSession); } diff --git a/xtask/src/task/mod.rs b/xtask/src/task/mod.rs index f944fc57..67b77ddb 100644 --- a/xtask/src/task/mod.rs +++ b/xtask/src/task/mod.rs @@ -6,6 +6,6 @@ pub mod buf; pub mod changelog; pub mod check; pub mod ci; +pub mod manifest; pub mod rfd; pub mod site; -pub mod manifest;