Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plugin Salsa integration #298

Merged
merged 1 commit into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions hipcheck/src/engine.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#![allow(unused)]

use crate::plugin::{ActivePlugin, HcPluginCore, PluginExecutor, PluginResponse, PluginWithConfig};
use crate::{hc_error, Result};
use serde_json::Value;
use std::sync::{Arc, LazyLock};
use tokio::runtime::Runtime;

// Salsa doesn't natively support async functions, so our recursive `query()` function that
// interacts with plugins (which use async) has to get a handle to the underlying runtime,
// spawn and block on a task to query the plugin, then choose whether to recurse or return.

// Lazily-initialized Tokio runtime shared by all plugin interactions. `expect` states the
// only failure mode (the OS refusing resources for a new runtime) instead of a bare unwrap.
static RUNTIME: LazyLock<Runtime> =
	LazyLock::new(|| Runtime::new().expect("failed to create Tokio runtime"));

// Salsa query group defining the Hipcheck plugin-query engine interface.
// The `query_group` macro generates the storage struct (`HcEngineStorage`)
// and memoization machinery for the functions declared here.
#[salsa::query_group(HcEngineStorage)]
pub trait HcEngine: salsa::Database {
	// Salsa input holding the shared plugin-management core. Must be set
	// (via the macro-generated `set_core`) before `query` is invoked.
	#[salsa::input]
	fn core(&self) -> Arc<HcPluginCore>;

	// Memoized query against plugin `publisher::plugin`, running its endpoint
	// named `query` with JSON input `key`. Implemented by the free `query`
	// function below.
	fn query(&self, publisher: String, plugin: String, query: String, key: Value) -> Result<Value>;
}

/// Salsa-tracked implementation of [`HcEngine::query`].
///
/// Looks up the named plugin in the core, blocks on the shared Tokio runtime
/// to send it the query, and — if the plugin reports it needs more data —
/// recursively resolves each nested request (benefiting from salsa
/// memoization) before resuming the original query with the answer.
///
/// Returns an error if the plugin is unknown or its channel closes mid-query.
fn query(
	db: &dyn HcEngine,
	publisher: String,
	plugin: String,
	query: String,
	key: Value,
) -> Result<Value> {
	let runtime = RUNTIME.handle();
	let core = db.core();
	// Find the plugin
	let Some(p_handle) = core.plugins.get(&plugin) else {
		return Err(hc_error!("No such plugin {}::{}", publisher, plugin));
	};
	// Initiate the query. If remote closed or we got our response immediately,
	// return
	let mut ar = match runtime.block_on(p_handle.query(query, key))? {
		PluginResponse::RemoteClosed => {
			return Err(hc_error!("Plugin channel closed unexpectedly"));
		}
		PluginResponse::Completed(v) => return Ok(v),
		PluginResponse::AwaitingResult(a) => a,
	};
	// Otherwise, the plugin needs more data to continue. Recursively query
	// (with salsa memo-ization) to get the needed data, and resume our
	// current query by providing the plugin the answer.
	loop {
		let answer = db.query(
			ar.publisher.clone(),
			ar.plugin.clone(),
			ar.query.clone(),
			ar.key.clone(),
		)?;
		ar = match runtime.block_on(p_handle.resume_query(ar, answer))? {
			PluginResponse::RemoteClosed => {
				return Err(hc_error!("Plugin channel closed unexpectedly"));
			}
			PluginResponse::Completed(v) => return Ok(v),
			PluginResponse::AwaitingResult(a) => a,
		};
	}
}

// Concrete salsa database implementing the `HcEngine` query group.
#[salsa::database(HcEngineStorage)]
pub struct HcEngineImpl {
	// Query storage
	storage: salsa::Storage<Self>,
}

// Marker impl required by salsa; all behavior comes from the derived storage.
impl salsa::Database for HcEngineImpl {}

impl HcEngineImpl {
// Really HcEngineImpl and HcPluginCore do the same thing right now, except HcPluginCore
// has an async constructor. If we can manipulate salsa to accept async functions, we
// could consider merging the two structs. Although maybe its wise to keep HcPluginCore
// independent of Salsa.
pub fn new(executor: PluginExecutor, plugins: Vec<(PluginWithConfig)>) -> Result<Self> {
let runtime = RUNTIME.handle();
let core = runtime.block_on(HcPluginCore::new(executor, plugins))?;
let mut engine = HcEngineImpl {
storage: Default::default(),
};
engine.set_core(Arc::new(core));
Ok(engine)
}
// TODO - "run" function that takes analysis heirarchy and target, and queries each
// analysis plugin to kick off the execution
}
53 changes: 26 additions & 27 deletions hipcheck/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod command_util;
mod config;
mod context;
mod data;
mod engine;
mod error;
mod git2_log_shim;
mod git2_rustls_transport;
Expand Down Expand Up @@ -37,9 +38,10 @@ use crate::analysis::report_builder::Report;
use crate::analysis::score::score_results;
use crate::cache::HcCache;
use crate::context::Context as _;
use crate::engine::{HcEngine, HcEngineImpl};
use crate::error::Error;
use crate::error::Result;
use crate::plugin::{HcPluginCore, Plugin, PluginExecutor, PluginWithConfig};
use crate::plugin::{Plugin, PluginExecutor, PluginWithConfig};
use crate::session::session::Session;
use crate::setup::{resolve_and_transform_source, SourceType};
use crate::shell::verbosity::Verbosity;
Expand Down Expand Up @@ -640,7 +642,6 @@ fn check_github_token() -> StdResult<(), EnvVarCheckError> {
}

fn cmd_plugin() {
use tokio::runtime::Runtime;
let tgt_dir = "./target/debug";
let entrypoint = pathbuf![tgt_dir, "dummy_rand_data"];
let plugin = Plugin {
Expand All @@ -655,30 +656,29 @@ fn cmd_plugin() {
/* jitter_percent */ 10,
)
.unwrap();
let rt = Runtime::new().unwrap();
rt.block_on(async move {
println!("Started executor");
let mut core = match HcPluginCore::new(
plugin_executor,
vec![PluginWithConfig(plugin, serde_json::json!(null))],
)
.await
{
Ok(c) => c,
Err(e) => {
println!("{e}");
return;
}
};
match core.run().await {
Ok(_) => {
println!("HcCore run completed");
}
Err(e) => {
println!("HcCore run failed with '{e}'");
}
};
});
let engine = match HcEngineImpl::new(
plugin_executor,
vec![PluginWithConfig(plugin, serde_json::json!(null))],
) {
Ok(e) => e,
Err(e) => {
println!("Failed to create engine: {e}");
return;
}
};
let res = match engine.query(
"MITRE".to_owned(),
"rand_data".to_owned(),
"rand_data".to_owned(),
serde_json::json!(7),
) {
Ok(r) => r,
Err(e) => {
println!("Query failed: {e}");
return;
}
};
println!("Result: {res}");
}

fn cmd_ready(config: &CliConfig) {
Expand Down Expand Up @@ -728,7 +728,6 @@ fn cmd_ready(config: &CliConfig) {
Err(e) => println!("{:<17} {}", "Policy Path:", e),
}


match &ready.github_token_check {
Ok(_) => println!("{:<17} Found!", "GitHub Token:"),
Err(e) => println!("{:<17} {}", "GitHub Token:", e),
Expand Down
15 changes: 4 additions & 11 deletions hipcheck/src/plugin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ pub async fn initialize_plugins(
Ok(out)
}

struct ActivePlugin {
#[derive(Debug)]
pub struct ActivePlugin {
next_id: Mutex<usize>,
channel: PluginTransport,
}
Expand Down Expand Up @@ -91,9 +92,10 @@ impl ActivePlugin {
}
}

#[derive(Debug)]
pub struct HcPluginCore {
executor: PluginExecutor,
plugins: HashMap<String, ActivePlugin>,
pub plugins: HashMap<String, ActivePlugin>,
}
impl HcPluginCore {
// When this object is returned, the plugins are all connected but the
Expand Down Expand Up @@ -128,13 +130,4 @@ impl HcPluginCore {
// Now we have a set of started and initialized plugins to interact with
Ok(HcPluginCore { executor, plugins })
}
// @Temporary
pub async fn run(&mut self) -> Result<()> {
let handle = self.plugins.get("rand_data").unwrap();
let resp = handle
.query("rand_data".to_owned(), serde_json::json!(7))
.await?;
println!("Plugin response: {resp:?}");
Ok(())
}
}
4 changes: 4 additions & 0 deletions hipcheck/src/plugin/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub struct Plugin {
}

// Hipcheck-facing version of struct from crate::hipcheck
#[derive(Clone, Debug)]
pub struct Schema {
pub query_name: String,
pub key_schema: Value,
Expand Down Expand Up @@ -118,6 +119,7 @@ impl std::fmt::Display for ConfigError {
}

// State for managing an actively running plugin process
#[derive(Debug)]
pub struct PluginContext {
pub plugin: Plugin,
pub port: u16,
Expand Down Expand Up @@ -273,6 +275,7 @@ impl TryFrom<Query> for PluginQuery {
}
}

#[derive(Debug)]
pub struct MultiplexedQueryReceiver {
rx: Streaming<PluginQuery>,
backlog: HashMap<i32, VecDeque<PluginQuery>>,
Expand Down Expand Up @@ -314,6 +317,7 @@ impl MultiplexedQueryReceiver {

// Encapsulate an "initialized" state of a Plugin with interfaces that abstract
// query chunking to produce whole messages for the Hipcheck engine
#[derive(Debug)]
pub struct PluginTransport {
pub schemas: HashMap<String, Schema>,
pub default_policy_expr: String, // TODO - update with policy_expr type
Expand Down