From 34bdb92c26fb4a35adc54a3de39ba56b0476b4ba Mon Sep 17 00:00:00 2001 From: Guocork Date: Tue, 3 Sep 2024 19:15:23 +0800 Subject: [PATCH] consume stream --- controllers/src/timeout_controller.rs | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/controllers/src/timeout_controller.rs b/controllers/src/timeout_controller.rs index 11583c3..8fc49c8 100644 --- a/controllers/src/timeout_controller.rs +++ b/controllers/src/timeout_controller.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use amp_common::resource::Playbook; use amp_resources::playbook::delete; use chrono::{DateTime, Duration, TimeDelta, Utc}; -use futures::{StreamExt, TryStreamExt}; +use futures::{future, StreamExt}; use kube::Client; use kube::{ runtime::{reflector, watcher, WatchStreamExt}, @@ -53,7 +53,7 @@ pub async fn new(ctx: &Arc) { let api = Api::::all(client.clone()); let config = watcher::Config::default(); let (reader, writer) = reflector::store(); - let mut obs = watcher(api, config).reflect(writer).applied_objects().boxed(); + let rf = reflector(writer, watcher(api, config)); tokio::spawn(async move { if let Err(e) = reader.wait_until_ready().await { @@ -70,16 +70,11 @@ pub async fn new(ctx: &Arc) { tokio::time::sleep(std::time::Duration::from_secs(5 * 60)).await; } }); - loop { - match obs.try_next().await { - Ok(Some(s)) => info!("The {} playbook has been changed.", s.spec.title), - Ok(None) => continue, - Err(e) => { - error!("Resolve namespace stream failed: {}", e.to_string()); - continue; - } - } - } + + rf.applied_objects() + .for_each(|_| future::ready(())) + .await; + } async fn handle(playbook: &Playbook, client: &Client) -> anyhow::Result<()> {