Skip to content

Commit

Permalink
consume stream
Browse files Browse the repository at this point in the history
  • Loading branch information
Guocork committed Sep 3, 2024
1 parent efd8311 commit 34bdb92
Showing 1 changed file with 7 additions and 12 deletions.
19 changes: 7 additions & 12 deletions controllers/src/timeout_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;
use amp_common::resource::Playbook;
use amp_resources::playbook::delete;
use chrono::{DateTime, Duration, TimeDelta, Utc};
use futures::{StreamExt, TryStreamExt};
use futures::{future, StreamExt};
use kube::Client;
use kube::{
runtime::{reflector, watcher, WatchStreamExt},
Expand Down Expand Up @@ -53,7 +53,7 @@ pub async fn new(ctx: &Arc<Context>) {
let api = Api::<Playbook>::all(client.clone());
let config = watcher::Config::default();
let (reader, writer) = reflector::store();
let mut obs = watcher(api, config).reflect(writer).applied_objects().boxed();
let rf = reflector(writer, watcher(api, config));

tokio::spawn(async move {
if let Err(e) = reader.wait_until_ready().await {
Expand All @@ -70,16 +70,11 @@ pub async fn new(ctx: &Arc<Context>) {
tokio::time::sleep(std::time::Duration::from_secs(5 * 60)).await;
}
});
loop {
match obs.try_next().await {
Ok(Some(s)) => info!("The {} playbook has been changed.", s.spec.title),
Ok(None) => continue,
Err(e) => {
error!("Resolve namespace stream failed: {}", e.to_string());
continue;
}
}
}

rf.applied_objects()
.for_each(|_| future::ready(()))
.await;

}

async fn handle(playbook: &Playbook, client: &Client) -> anyhow::Result<()> {
Expand Down

0 comments on commit 34bdb92

Please sign in to comment.