From 1d792d0592d39b6f17abc97601e21b5d0589475f Mon Sep 17 00:00:00 2001 From: jiahao6635 Date: Thu, 25 Jan 2024 01:36:42 +0800 Subject: [PATCH 1/3] Modification of joint investigation issues --- src/context.rs | 3 ++- src/handlers/file.rs | 5 +---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/context.rs b/src/context.rs index 77e04ae..0f87802 100644 --- a/src/context.rs +++ b/src/context.rs @@ -16,6 +16,7 @@ use crate::config::Config; use amp_client::client::Client; use amp_common::scm::client::Client as ScmClient; use amp_common::scm::driver::github; +use amp_common::scm::driver::github::constants::GITHUB_ENDPOINT; use std::sync::Arc; /// The core type through which handler functions can access common API state. @@ -36,7 +37,7 @@ pub struct Context { impl Context { pub async fn new(config: Config) -> anyhow::Result { let client = Arc::new(Client::new(&config.amp_server, config.auth_token.clone())); - let github_client = Arc::new(ScmClient::new(github::default())); + let github_client = Arc::new(ScmClient::new(github::new(GITHUB_ENDPOINT, config.auth_token.clone()))); Ok(Context { config, client, github_client }) } } diff --git a/src/handlers/file.rs b/src/handlers/file.rs index d4b5bcd..8e2cd72 100644 --- a/src/handlers/file.rs +++ b/src/handlers/file.rs @@ -67,10 +67,7 @@ pub async fn get(State(ctx): State>, Path((id, path)): Path<(Uuid, )] pub async fn create( State(ctx): State>, - - Path(id): Path, - Path(path): Path, - + Path((id, path)): Path<(Uuid, String)>, Json(req): Json, ) -> Result { Ok((StatusCode::CREATED, Json(FileService::create(ctx, id, path, req.content).await?))) From be2edf539a50b15baf11fe4b16e907cc0bc2a82b Mon Sep 17 00:00:00 2001 From: jiahao6635 Date: Fri, 26 Jan 2024 00:14:58 +0800 Subject: [PATCH 2/3] Implement the log interface to return Sse --- Cargo.lock | 2 ++ Cargo.toml | 2 ++ src/handlers/logger.rs | 27 +++++++++++++++++++++------ src/services/logger.rs | 8 ++++++-- 4 files changed, 31 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5007f7e..ef0ab80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1820,6 +1820,8 @@ dependencies = [ "dotenv", "futures", "headers", + "log", + "reqwest-eventsource", "serde", "serde_json", "serde_yaml", diff --git a/Cargo.toml b/Cargo.toml index 0591e40..551c898 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ amp-common = { git = "https://github.com/amphitheatre-app/common", tag = "v0.7.7 anyhow = "1.0" axum = { version = "0.7.4" } clap = { version = "4.4.12", features = ["derive", "env"] } +reqwest-eventsource = "0.5.0" dotenv = "0.15.0" futures = "0.3" headers = "0.4" @@ -37,3 +38,4 @@ utoipa-swagger-ui = { version = "6.0.0", features = ["axum"] } uuid = { version = "1.6.1", features = ["serde", "v4", "fast-rng", "macro-diagnostics"] } url = "2.4.1" chrono = "0.4.31" +log = "0.4.20" diff --git a/src/handlers/logger.rs b/src/handlers/logger.rs index c79b927..d98b50f 100644 --- a/src/handlers/logger.rs +++ b/src/handlers/logger.rs @@ -12,15 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::convert::Infallible; use std::sync::Arc; use axum::extract::{Path, State}; -use axum::http::StatusCode; -use axum::response::IntoResponse; +use axum::response::sse::{Event, KeepAlive, Sse}; +use futures::{Stream, StreamExt}; use uuid::Uuid; use crate::context::Context; -use crate::errors::Result; +use crate::errors::{ApiError, Result}; use crate::services::LoggerService; // The Logging Service Handlers. @@ -37,7 +38,21 @@ use crate::services::LoggerService; ), tag = "Logging" )] -pub async fn logs(Path(id): Path, State(ctx): State>) -> Result { - LoggerService::logs(ctx, id).await; - Ok(StatusCode::OK) +pub async fn logs( + Path(id): Path, + State(ctx): State>, +) -> Result>>, ApiError> { + let event_source = LoggerService::logs(ctx, id).await?; + + let stream = event_source + .map(|line| { + if let Ok(reqwest_eventsource::Event::Message(message)) = line { + Event::default().data(message.event) + } else { + Event::default() + } + }) + .map(Ok); + + Ok(Sse::new(stream).keep_alive(KeepAlive::default())) } diff --git a/src/services/logger.rs b/src/services/logger.rs index 281e61a..07fea3c 100644 --- a/src/services/logger.rs +++ b/src/services/logger.rs @@ -12,15 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +use reqwest_eventsource::EventSource; use std::sync::Arc; use uuid::Uuid; use crate::context::Context; +use crate::errors::ApiError; pub struct LoggerService; impl LoggerService { - pub async fn logs(_ctx: Arc, _id: Uuid) { - unreachable!() + pub async fn logs(ctx: Arc, id: Uuid) -> Result { + let playbook = ctx.client.playbooks().get(&id.to_string()).map_err(ApiError::NotFoundPlaybook)?; + + Ok(ctx.client.actors().logs(&id.to_string(), &playbook.title)) } } From 39e8b2d2be98d0828dabd309d8472c794594ed78 Mon Sep 17 00:00:00 2001 From: jiahao6635 Date: Fri, 26 Jan 2024 00:21:22 +0800 Subject: [PATCH 3/3] Implement the log interface to return Sse --- Cargo.lock | 1 - 1 file changed, 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 9d07ae5..f6aeaa5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1820,7 +1820,6 @@ dependencies = [ "dotenv", "futures", "headers", - "log", "reqwest-eventsource", "serde", "serde_json",