Skip to content

Commit

Permalink
feat: inital handling of plugin startup and context management
Browse files Browse the repository at this point in the history
  • Loading branch information
j-lanson committed Aug 13, 2024
1 parent 55a5fba commit ab57345
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 0 deletions.
2 changes: 2 additions & 0 deletions hipcheck/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ mod git2_log_shim;
mod git2_rustls_transport;
mod log_bridge;
mod metric;
#[allow(unused)]
mod plugin;
mod report;
mod session;
mod setup;
Expand Down
79 changes: 79 additions & 0 deletions hipcheck/src/plugin/manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use crate::hipcheck::plugin_client::PluginClient;
use crate::plugin::{HcPluginClient, Plugin, PluginContext};
use crate::{hc_error, Result};
use std::process::Command;

const MAX_SPAWN_ATTEMPTS: usize = 5;
const MAX_CONN_ATTEMPTS: usize = 4;

pub struct PluginExecutor {}
impl PluginExecutor {
pub fn new() -> Self {
PluginExecutor {}
}
fn get_available_port(&mut self) -> Result<u16> {
for i in 40000..u16::MAX {
if let Ok(bind) = std::net::TcpListener::bind(format!("127.0.0.1:{i}")) {
drop(bind);
return Ok(i);
}
}
Err(hc_error!("Failed to find available port"))
}
pub async fn start_plugin(&mut self, plugin: &Plugin) -> Result<PluginContext> {
// Plugin startup design has inherent TOCTOU flaws since we tell the plugin
// which port we expect it to bind to. We can try to ensure the port we pass
// on the cmdline is not already in use, but it is still possible for that
// port to become unavailable between our check and the plugin's bind attempt.
// Hence the need for subsequent attempts if we get unlucky
let mut spawn_attempts: usize = 0;
while spawn_attempts < MAX_SPAWN_ATTEMPTS {
// Find free port for process. Don't use spawn_attempts, since
let port = self.get_available_port()?;
let port_str = port.to_string();
// Spawn plugin process
let Ok(mut proc) = Command::new(&plugin.entrypoint)
.args(["--port", port_str.as_str()])
.spawn()
else {
spawn_attempts += 1;
continue;
};
// Potential timing race condition if we try to connect before the plugin
// has had a chance to bind to its port. We try up to MAX_CONN_ATTEMPTS
// times
let mut conn_attempts = 0;
let mut opt_grpc: Option<HcPluginClient> = None;
while conn_attempts < MAX_CONN_ATTEMPTS {
if let Ok(grpc) =
PluginClient::connect(format!("http://127.0.0.1:{port_str}")).await
{
opt_grpc = Some(grpc);
break;
} else {
conn_attempts += 1;
}
}
// If opt_grpc is None, we did not manage to connect to the plugin. Kill it
// and try again
let Some(grpc) = opt_grpc else {
if let Err(e) = proc.kill() {
println!("Failed to kill child process for plugin: {e}");
}
spawn_attempts += 1;
continue;
};
// We now have an open gRPC connection to our plugin process
return Ok(PluginContext {
plugin: plugin.clone(),
port,
grpc,
proc,
});
}
Err(hc_error!(
"Reached max spawn attempts for plugin {}",
plugin.name
))
}
}
13 changes: 13 additions & 0 deletions hipcheck/src/plugin/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
mod manager;
mod types;

use crate::plugin::manager::*;
pub use crate::plugin::types::*;

pub fn dummy() {
let plugin = Plugin {
name: "dummy".to_owned(),
entrypoint: "./dummy".to_owned(),
};
let manager = PluginExecutor::new();
}
147 changes: 147 additions & 0 deletions hipcheck/src/plugin/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
use crate::hipcheck::plugin_client::PluginClient;
use crate::hipcheck::{
Configuration, ConfigurationResult as PluginConfigResult, ConfigurationStatus, Empty,
Schema as PluginSchema,
};
use crate::{hc_error, Result};
use serde_json::Value;
use std::ops::Not;
use std::process::Child;
use tonic::transport::Channel;

pub type HcPluginClient = PluginClient<Channel>;

#[derive(Clone, Debug)]
pub struct Plugin {
pub name: String,
pub entrypoint: String,
}

// Hipcheck-facing version of struct from crate::hipcheck
pub struct Schema {
pub query_name: String,
pub key_schema: Value,
pub output_schema: Value,
}
impl TryFrom<PluginSchema> for Schema {
type Error = crate::error::Error;
fn try_from(value: PluginSchema) -> Result<Self> {
let key_schema: Value = serde_json::from_str(value.key_schema.as_str())?;
let output_schema: Value = serde_json::from_str(value.output_schema.as_str())?;
Ok(Schema {
query_name: value.query_name,
key_schema,
output_schema,
})
}
}

// Hipcheck-facing version of struct from crate::hipcheck
pub struct ConfigurationResult {
pub status: ConfigurationStatus,
pub message: Option<String>,
}
impl TryFrom<PluginConfigResult> for ConfigurationResult {
type Error = crate::error::Error;
fn try_from(value: PluginConfigResult) -> Result<Self> {
let status: ConfigurationStatus = value.status.try_into()?;
let message = value.message.is_empty().not().then_some(value.message);
Ok(ConfigurationResult { status, message })
}
}
// hipcheck::ConfigurationStatus has an enum that captures both error and success
// scenarios. The below code allows interpreting the struct as a Rust Result. If
// the success variant was the status, Ok(()) is returned, otherwise the code
// is stuffed into a custom error type enum that equals the protoc-generated one
// minus the success variant.
impl ConfigurationResult {
pub fn as_result(&self) -> std::result::Result<(), ConfigError> {
let Ok(error) = self.status.try_into() else {
return Ok(());
};
Err(ConfigError::new(error, self.message.clone()))
}
}
pub enum ConfigErrorType {
Unknown = 0,
MissingRequiredConfig = 2,
UnrecognizedConfig = 3,
InvalidConfigValue = 4,
}
impl TryFrom<ConfigurationStatus> for ConfigErrorType {
type Error = crate::error::Error;
fn try_from(value: ConfigurationStatus) -> Result<Self> {
use ConfigErrorType::*;
use ConfigurationStatus::*;
Ok(match value as i32 {
x if x == ErrorUnknown as i32 => Unknown,
x if x == ErrorMissingRequiredConfiguration as i32 => MissingRequiredConfig,
x if x == ErrorUnrecognizedConfiguration as i32 => UnrecognizedConfig,
x if x == ErrorInvalidConfigurationValue as i32 => InvalidConfigValue,
_ => {
return Err(hc_error!("status value is not an error"));
}
})
}
}
pub struct ConfigError {
error: ConfigErrorType,
message: Option<String>,
}
impl ConfigError {
pub fn new(error: ConfigErrorType, message: Option<String>) -> Self {
ConfigError { error, message }
}
}

// State for managing an actively running plugin process
pub struct PluginContext {
pub plugin: Plugin,
pub port: u16,
pub grpc: HcPluginClient,
pub proc: Child,
}
// Redefinition of `grpc` field's functions with more useful types, additional
// error & sanity checking
impl PluginContext {
pub async fn get_query_schemas(&mut self) -> Result<Vec<Schema>> {
let mut res = self.grpc.get_query_schemas(Empty {}).await?;
let stream = res.get_mut();
let mut schemas = vec![];
loop {
let opt_msg = match stream.message().await {
Err(e) => return Err(hc_error!("{}", e)),
Ok(om) => om,
};
match opt_msg {
None => {
break;
}
Some(msg) => {
schemas.push(msg.try_into()?);
}
}
}
Ok(schemas)
}
pub async fn set_configuration(&mut self, conf: &Value) -> Result<ConfigurationResult> {
let conf_query = Configuration {
configuration: serde_json::to_string(&conf)?,
};
let res = self.grpc.set_configuration(conf_query).await?;
res.into_inner().try_into()
}
// TODO - the String in the result should be replaced with a structured
// type once the policy expression code is integrated
pub async fn get_default_policy_expression(&mut self) -> Result<String> {
let mut res = self.grpc.get_default_policy_expression(Empty {}).await?;
Ok(res.get_ref().policy_expression.to_owned())
}
}
impl Drop for PluginContext {
fn drop(&mut self) {
if let Err(e) = self.proc.kill() {
println!("Failed to kill child: {e}");
}
}
}

0 comments on commit ab57345

Please sign in to comment.