diff --git a/src/bin/lychee/main.rs b/src/bin/lychee/main.rs
index 6d108cf2e6..5aa7c8f478 100644
--- a/src/bin/lychee/main.rs
+++ b/src/bin/lychee/main.rs
@@ -118,7 +118,7 @@ async fn run(cfg: &Config, inputs: Vec<Input>) -> Result<i32> {
         .accepted(accepted)
         .build()?;
 
-    let links = collector::collect_links(
+    let mut links = collector::collect_links(
         inputs,
         cfg.base_url.clone(),
         cfg.skip_missing,
@@ -129,10 +129,11 @@ async fn run(cfg: &Config, inputs: Vec<Input>) -> Result<i32> {
     let pb = match cfg.no_progress {
         true => None,
         false => {
-            let bar = ProgressBar::new(links.len() as u64)
-                .with_style(ProgressStyle::default_bar().template(
+            let bar = ProgressBar::new_spinner().with_style(ProgressStyle::default_bar().template(
                 "{spinner:.red.bright} {pos}/{len:.dim} [{elapsed_precise}] {bar:25} {wide_msg}",
             ));
+            bar.set_length(0);
+            bar.set_message("Extracting links");
             bar.enable_steady_tick(100);
             Some(bar)
         }
@@ -145,8 +146,9 @@ async fn run(cfg: &Config, inputs: Vec<Input>) -> Result<i32> {
 
     let bar = pb.clone();
     tokio::spawn(async move {
-        for link in links {
+        while let Some(link) = links.recv().await {
             if let Some(pb) = &bar {
+                pb.inc_length(1);
                 pb.set_message(&link.to_string());
             };
             send_req.send(link).await.unwrap();
diff --git a/src/collector.rs b/src/collector.rs
index 2a9b47f5df..eeddd4b699 100644
--- a/src/collector.rs
+++ b/src/collector.rs
@@ -201,7 +201,7 @@ pub async fn collect_links(
     base_url: Option<String>,
     skip_missing_inputs: bool,
     max_concurrency: usize,
-) -> Result<HashSet<Request>> {
+) -> Result<tokio::sync::mpsc::Receiver<Request>> {
     let base_url = match base_url {
         Some(url) => Some(Url::parse(&url)?),
         _ => None,
@@ -234,18 +234,32 @@ pub async fn collect_links(
         }
     }
 
-    // Note: we could dispatch links to be checked as soon as we get them,
-    // instead of building a HashSet with all links.
-    // This optimization would speed up cases where there's
-    // a lot of inputs and/or the inputs are large (e.g. big files).
-    let mut collected_links: HashSet<Request> = HashSet::new();
-
-    for handle in extract_links_handles {
-        let links = handle.await?;
-        collected_links.extend(links);
-    }
+    let (links_tx, links_rx) = tokio::sync::mpsc::channel(max_concurrency);
+    tokio::spawn(async move {
+        let mut collected_links = HashSet::new();
+
+        for handle in extract_links_handles {
+            // Unwrap should be fine because joining fails:
+            // * if the Task was dropped (which we don't do)
+            // * if the Task panicked. Propagating panics is correct here.
+            let requests = handle
+                .await
+                .expect("Awaiting termination of link handle failed");
+            for request in requests {
+                if !collected_links.contains(&request) {
+                    collected_links.insert(request.clone());
+                    // Unwrap should be fine because sending fails
+                    // if the receiver was closed - in which case we can't continue anyway
+                    links_tx
+                        .send(request)
+                        .await
+                        .expect("Extractor could not send link to channel");
+                }
+            }
+        }
+    });
 
-    Ok(collected_links)
+    Ok(links_rx)
 }
 
 #[cfg(test)]
@@ -292,11 +306,11 @@ mod test {
             },
         ];
 
-        let responses = collect_links(inputs, None, false, 8).await?;
-        let links = responses
-            .into_iter()
-            .map(|r| r.uri)
-            .collect::<HashSet<Uri>>();
+        let mut responses = collect_links(inputs, None, false, 8).await?;
+        let mut links = HashSet::new();
+        while let Some(request) = responses.recv().await {
+            links.insert(request.uri);
+        }
 
         let mut expected_links: HashSet<Uri> = HashSet::new();
         expected_links.insert(website(TEST_STRING));
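
For reviewers who want the streaming pattern in isolation: below is a minimal, self-contained sketch of what this change does conceptually. Producer tasks extract links, one fan-in task deduplicates them and pushes them through a bounded `tokio::sync::mpsc` channel, and the consumer handles each link as soon as it arrives instead of waiting for the full set. This is an illustrative standalone program under an assumed dependency (`tokio = { version = "1", features = ["full"] }`), not lychee's actual code; `extract`, `MAX_CONCURRENCY`, and the sample inputs are made up for the sketch.

```rust
use std::collections::HashSet;
use tokio::sync::mpsc;

// Channel capacity; plays the role of `max_concurrency` in the diff.
const MAX_CONCURRENCY: usize = 8;

// Stand-in for per-input link extraction (lychee spawns one task per input).
async fn extract(input: &'static str) -> Vec<String> {
    input.split_whitespace().map(str::to_owned).collect()
}

#[tokio::main]
async fn main() {
    let inputs = ["https://a https://b", "https://b https://c"];

    // One extraction task per input, mirroring `extract_links_handles`.
    let handles: Vec<_> = inputs
        .into_iter()
        .map(|input| tokio::spawn(extract(input)))
        .collect();

    // Bounded channel: extraction cannot race arbitrarily far ahead
    // of the consumer, just like `channel(max_concurrency)` in the diff.
    let (tx, mut rx) = mpsc::channel(MAX_CONCURRENCY);

    // Fan-in task: await each producer, deduplicate, stream results out.
    tokio::spawn(async move {
        let mut seen = HashSet::new();
        for handle in handles {
            // Propagate producer panics, as the diff's `expect` does.
            let links = handle.await.expect("extraction task panicked");
            for link in links {
                // `insert` returns true only for links not seen before,
                // so the consumer never receives a duplicate.
                if seen.insert(link.clone()) {
                    tx.send(link).await.expect("receiver was closed");
                }
            }
        }
        // `tx` is dropped here; the channel closes and `recv` returns None.
    });

    // The consumer sees links as they are found, not after all inputs
    // finish; this is what lets the progress bar grow incrementally.
    while let Some(link) = rx.recv().await {
        println!("checking {link}");
    }
}
```

The bounded buffer is the key design choice: it caps how many extracted-but-unchecked links sit in the channel at once, so large inputs no longer require materializing every link before the first check starts, while `inc_length` lets the progress bar grow as links are discovered.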