diff --git a/Cargo.lock b/Cargo.lock index 0284b5d..f6aeaa5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1820,6 +1820,7 @@ dependencies = [ "dotenv", "futures", "headers", + "reqwest-eventsource", "serde", "serde_json", "serde_yaml", diff --git a/Cargo.toml b/Cargo.toml index c3b2a49..f358749 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ amp-common = { git = "https://github.com/amphitheatre-app/common", tag = "v0.7.7 anyhow = "1.0" axum = { version = "0.7.4" } clap = { version = "4.4.12", features = ["derive", "env"] } +reqwest-eventsource = "0.5.0" dotenv = "0.15.0" futures = "0.3" headers = "0.4" diff --git a/src/context.rs b/src/context.rs index 77e04ae..0f87802 100644 --- a/src/context.rs +++ b/src/context.rs @@ -16,6 +16,7 @@ use crate::config::Config; use amp_client::client::Client; use amp_common::scm::client::Client as ScmClient; use amp_common::scm::driver::github; +use amp_common::scm::driver::github::constants::GITHUB_ENDPOINT; use std::sync::Arc; /// The core type through which handler functions can access common API state. @@ -36,7 +37,7 @@ pub struct Context { impl Context { pub async fn new(config: Config) -> anyhow::Result { let client = Arc::new(Client::new(&config.amp_server, config.auth_token.clone())); - let github_client = Arc::new(ScmClient::new(github::default())); + let github_client = Arc::new(ScmClient::new(github::new(GITHUB_ENDPOINT, config.auth_token.clone()))); Ok(Context { config, client, github_client }) } } diff --git a/src/handlers/file.rs b/src/handlers/file.rs index d4b5bcd..8e2cd72 100644 --- a/src/handlers/file.rs +++ b/src/handlers/file.rs @@ -67,10 +67,7 @@ pub async fn get(State(ctx): State>, Path((id, path)): Path<(Uuid, )] pub async fn create( State(ctx): State>, - - Path(id): Path, - Path(path): Path, - + Path((id, path)): Path<(Uuid, String)>, Json(req): Json, ) -> Result { Ok((StatusCode::CREATED, Json(FileService::create(ctx, id, path, req.content).await?))) diff --git a/src/handlers/logger.rs b/src/handlers/logger.rs index c79b927..d98b50f 100644 --- a/src/handlers/logger.rs +++ b/src/handlers/logger.rs @@ -12,15 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::convert::Infallible; use std::sync::Arc; use axum::extract::{Path, State}; -use axum::http::StatusCode; -use axum::response::IntoResponse; +use axum::response::sse::{Event, KeepAlive, Sse}; +use futures::{Stream, StreamExt}; use uuid::Uuid; use crate::context::Context; -use crate::errors::Result; +use crate::errors::{ApiError, Result}; use crate::services::LoggerService; // The Logging Service Handlers. @@ -37,7 +38,21 @@ use crate::services::LoggerService; ), tag = "Logging" )] -pub async fn logs(Path(id): Path, State(ctx): State>) -> Result { - LoggerService::logs(ctx, id).await; - Ok(StatusCode::OK) +pub async fn logs( + Path(id): Path, + State(ctx): State>, +) -> Result>>, ApiError> { + let event_source = LoggerService::logs(ctx, id).await?; + + let stream = event_source + .map(|line| { + if let Ok(reqwest_eventsource::Event::Message(message)) = line { + Event::default().data(message.event) + } else { + Event::default() + } + }) + .map(Ok); + + Ok(Sse::new(stream).keep_alive(KeepAlive::default())) } diff --git a/src/services/logger.rs b/src/services/logger.rs index 281e61a..07fea3c 100644 --- a/src/services/logger.rs +++ b/src/services/logger.rs @@ -12,15 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +use reqwest_eventsource::EventSource; use std::sync::Arc; use uuid::Uuid; use crate::context::Context; +use crate::errors::ApiError; pub struct LoggerService; impl LoggerService { - pub async fn logs(_ctx: Arc, _id: Uuid) { - unreachable!() + pub async fn logs(ctx: Arc, id: Uuid) -> Result { + let playbook = ctx.client.playbooks().get(&id.to_string()).map_err(ApiError::NotFoundPlaybook)?; + + Ok(ctx.client.actors().logs(&id.to_string(), &playbook.title)) } }