Skip to content

Commit

Permalink
feat: implement submit chunking for RFD 0010
Browse files Browse the repository at this point in the history
Signed-off-by: Patrick Casey <[email protected]>
  • Loading branch information
patrickjcasey committed Dec 20, 2024
1 parent a02ceea commit 26d697a
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 38 deletions.
7 changes: 5 additions & 2 deletions hipcheck-common/proto/hipcheck/v1/hipcheck.proto
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,11 @@ enum QueryState {
// Something has gone wrong.
QUERY_STATE_UNSPECIFIED = 0;

// We are submitting a new query.
QUERY_STATE_SUBMIT = 1;
// We are sending a query to a plugin and we are expecting to need to send more chunks
UERY_STATE_SUBMIT_IN_PROGRESS = 4;

// We are completed submitting a new query.
QUERY_STATE_SUBMIT_COMPLETE = 1;

// We are replying to a query and expect more chunks.
QUERY_STATE_REPLY_IN_PROGRESS = 2;
Expand Down
97 changes: 65 additions & 32 deletions hipcheck-common/src/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,20 @@ fn estimate_size(msg: &PluginQuery) -> usize {
}

pub fn chunk_with_size(msg: PluginQuery, max_est_size: usize) -> Result<Vec<PluginQuery>> {
// Chunking only does something on response objects, mostly because
// we don't have a state to represent "SubmitInProgress"
if msg.state == QueryState::Submit as i32 {
return Ok(vec![msg]);
}
// in_progress_state - the state the PluginQuery is in for all queries in the resulting Vec,
// EXCEPT the last one
//
// completion_state - the state the PluginQuery is in if it is the last chunked message
let (in_progress_state, completion_state) = match msg.state() {
// if the message gets chunked, then it must either be a reply or submission that is in process
QueryState::Unspecified => return Err(anyhow!("msg in Unspecified query state")),
QueryState::SubmitInProgress | QueryState::SubmitComplete => {
(QueryState::SubmitInProgress, QueryState::SubmitComplete)
}
QueryState::ReplyInProgress | QueryState::ReplyComplete => {
(QueryState::ReplyInProgress, QueryState::ReplyComplete)
}
};

let mut out: Vec<PluginQuery> = vec![];
let mut base: PluginQuery = msg;
Expand All @@ -58,9 +67,9 @@ pub fn chunk_with_size(msg: PluginQuery, max_est_size: usize) -> Result<Vec<Plug
// For this loop, we want to take at most MAX_SIZE bytes because that's
// all that can fit in a PluginQuery
let mut remaining = max_est_size;
let mut query = PluginQuery {
let mut chunked_query = PluginQuery {
id: base.id,
state: QueryState::ReplyInProgress as i32,
state: in_progress_state as i32,
publisher_name: base.publisher_name.clone(),
plugin_name: base.plugin_name.clone(),
query_name: base.query_name.clone(),
Expand All @@ -71,15 +80,15 @@ pub fn chunk_with_size(msg: PluginQuery, max_est_size: usize) -> Result<Vec<Plug

if remaining > 0 && base.key.bytes().len() > 0 {
// steal from key
query.key = drain_at_most_n_bytes(&mut base.key, remaining)?;
remaining -= query.key.bytes().len();
chunked_query.key = drain_at_most_n_bytes(&mut base.key, remaining)?;
remaining -= chunked_query.key.bytes().len();
made_progress = true;
}

if remaining > 0 && base.output.bytes().len() > 0 {
// steal from output
query.output = drain_at_most_n_bytes(&mut base.output, remaining)?;
remaining -= query.output.bytes().len();
chunked_query.output = drain_at_most_n_bytes(&mut base.output, remaining)?;
remaining -= chunked_query.output.bytes().len();
made_progress = true;
}

Expand All @@ -96,7 +105,7 @@ pub fn chunk_with_size(msg: PluginQuery, max_est_size: usize) -> Result<Vec<Plug
} else if c_bytes <= remaining {
// steal this concern
let concern = base.concern.swap_remove(i);
query.concern.push(concern);
chunked_query.concern.push(concern);
remaining -= c_bytes;
made_progress = true;
}
Expand All @@ -106,9 +115,14 @@ pub fn chunk_with_size(msg: PluginQuery, max_est_size: usize) -> Result<Vec<Plug
l -= 1;
}

out.push(query);
out.push(chunked_query);
}
out.push(base);

// ensure the last message in the chunked messages is set to the appropriate Complete state
if let Some(last) = out.last_mut() {
last.state = completion_state as i32;
}
Ok(out)
}

Expand Down Expand Up @@ -162,7 +176,9 @@ impl QuerySynthesizer {
.map_err(|_| Error::UnspecifiedQueryState)?;
match state {
QueryState::Unspecified => return Err(Error::UnspecifiedQueryState),
QueryState::Submit => return Err(Error::ReceivedSubmitWhenExpectingReplyChunk),
QueryState::SubmitInProgress | QueryState::SubmitComplete => {
return Err(Error::ReceivedSubmitWhenExpectingReplyChunk)
}
QueryState::ReplyInProgress | QueryState::ReplyComplete => {
if state == QueryState::ReplyComplete {
raw.state = QueryState::ReplyComplete.into();
Expand Down Expand Up @@ -209,23 +225,40 @@ mod test {

#[test]
fn test_chunking() {
let query = PluginQuery {
id: 0,
state: QueryState::ReplyComplete as i32,
publisher_name: "".to_owned(),
plugin_name: "".to_owned(),
query_name: "".to_owned(),
// This key will cause the chunk not to occur on a char boundary
key: "aこれは実験です".to_owned(),
output: "".to_owned(),
concern: vec!["< 10".to_owned(), "0123456789".to_owned()],
};
let res = match chunk_with_size(query, 10) {
Ok(r) => r,
Err(e) => {
panic!("{e}");
}
};
assert_eq!(res.len(), 4);
// test both reply and submission chunking
let states = [
(QueryState::SubmitInProgress, QueryState::SubmitComplete),
(QueryState::ReplyInProgress, QueryState::ReplyComplete),
];

for (intermediate_state, final_state) in states.into_iter() {
let query = PluginQuery {
id: 0,
state: final_state as i32,
publisher_name: "".to_owned(),
plugin_name: "".to_owned(),
query_name: "".to_owned(),
// This key will cause the chunk not to occur on a char boundary
key: "aこれは実験です".to_owned(),
output: "".to_owned(),
concern: vec!["< 10".to_owned(), "0123456789".to_owned()],
};
let res = match chunk_with_size(query, 10) {
Ok(r) => r,
Err(e) => {
panic!("{e}");
}
};
// ensure first 3 are ...InProgress
assert_eq!(
res.iter()
.filter(|x| x.state() == intermediate_state)
.count(),
3
);
// ensure last one is ...Complete
assert_eq!(res.last().unwrap().state(), final_state);
assert_eq!(res.len(), 4);
}
}
}
4 changes: 4 additions & 0 deletions hipcheck-common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ pub enum Error {
#[error("unexpected ReplyInProgress state for query")]
UnexpectedReplyInProgress,

/// The `PluginEngine` received a message with the unexpected status `RequestInProgress`
#[error("unexpected RequestInProgress state for query")]
UnexpectedRequestInProgress,

/// The `PluginEngine` received a message with a request-type status when it expected a reply
#[error("remote sent QuerySubmit when reply chunk expected")]
ReceivedSubmitWhenExpectingReplyChunk,
Expand Down
5 changes: 3 additions & 2 deletions hipcheck-common/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ impl TryFrom<QueryState> for QueryDirection {
fn try_from(value: QueryState) -> Result<Self, Self::Error> {
match value {
QueryState::Unspecified => Err(Error::UnspecifiedQueryState),
QueryState::Submit => Ok(QueryDirection::Request),
QueryState::SubmitInProgress => Err(Error::UnexpectedRequestInProgress),
QueryState::SubmitComplete => Ok(QueryDirection::Request),
QueryState::ReplyInProgress => Err(Error::UnexpectedReplyInProgress),
QueryState::ReplyComplete => Ok(QueryDirection::Response),
}
Expand All @@ -39,7 +40,7 @@ impl TryFrom<QueryState> for QueryDirection {
impl From<QueryDirection> for QueryState {
fn from(value: QueryDirection) -> Self {
match value {
QueryDirection::Request => QueryState::Submit,
QueryDirection::Request => QueryState::SubmitComplete,
QueryDirection::Response => QueryState::ReplyComplete,
}
}
Expand Down
1 change: 1 addition & 0 deletions sdk/rust/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ impl From<hipcheck_common::error::Error> for Error {
use hipcheck_common::error::Error::*;
match value {
UnspecifiedQueryState => Error::UnspecifiedQueryState,
UnexpectedRequestInProgress => Error::UnexpectedReplyInProgress,
UnexpectedReplyInProgress => Error::UnexpectedReplyInProgress,
ReceivedSubmitWhenExpectingReplyChunk => Error::ReceivedSubmitWhenExpectingReplyChunk,
MoreAfterQueryComplete { id } => Error::MoreAfterQueryComplete { id },
Expand Down
2 changes: 1 addition & 1 deletion sdk/rust/src/plugin_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ impl HcSessionSocket {
return Ok(HandleAction::ForwardMsgToExistingSession(tx));
}

if query.state() == QueryState::Submit {
if [QueryState::SubmitInProgress, QueryState::SubmitComplete].contains(&query.state()) {
return Ok(HandleAction::CreateSession);
}

Expand Down
2 changes: 1 addition & 1 deletion xtask/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ pub mod buf;
pub mod changelog;
pub mod check;
pub mod ci;
pub mod manifest;
pub mod rfd;
pub mod site;
pub mod manifest;

0 comments on commit 26d697a

Please sign in to comment.