Skip to content

Commit

Permalink
Send extracted links via channel
Browse files Browse the repository at this point in the history
This lets extracted links get checked as soon as the first link is
extracted instead of after all links are extracted
  • Loading branch information
TimoFreiberg committed Mar 28, 2021
1 parent efa18ab commit b3a9e91
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 21 deletions.
10 changes: 6 additions & 4 deletions src/bin/lychee/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ async fn run(cfg: &Config, inputs: Vec<Input>) -> Result<i32> {
.accepted(accepted)
.build()?;

let links = collector::collect_links(
let mut links = collector::collect_links(
inputs,
cfg.base_url.clone(),
cfg.skip_missing,
Expand All @@ -129,10 +129,11 @@ async fn run(cfg: &Config, inputs: Vec<Input>) -> Result<i32> {
let pb = match cfg.no_progress {
true => None,
false => {
let bar = ProgressBar::new(links.len() as u64)
.with_style(ProgressStyle::default_bar().template(
let bar = ProgressBar::new_spinner().with_style(ProgressStyle::default_bar().template(
"{spinner:.red.bright} {pos}/{len:.dim} [{elapsed_precise}] {bar:25} {wide_msg}",
));
bar.set_length(0);
bar.set_message("Extracting links");
bar.enable_steady_tick(100);
Some(bar)
}
Expand All @@ -145,8 +146,9 @@ async fn run(cfg: &Config, inputs: Vec<Input>) -> Result<i32> {

let bar = pb.clone();
tokio::spawn(async move {
for link in links {
while let Some(link) = links.recv().await {
if let Some(pb) = &bar {
pb.inc_length(1);
pb.set_message(&link.to_string());
};
send_req.send(link).await.unwrap();
Expand Down
48 changes: 31 additions & 17 deletions src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ pub async fn collect_links(
base_url: Option<String>,
skip_missing_inputs: bool,
max_concurrency: usize,
) -> Result<HashSet<Request>> {
) -> Result<tokio::sync::mpsc::Receiver<Request>> {
let base_url = match base_url {
Some(url) => Some(Url::parse(&url)?),
_ => None,
Expand Down Expand Up @@ -234,18 +234,32 @@ pub async fn collect_links(
}
}

// Note: we could dispatch links to be checked as soon as we get them,
// instead of building a HashSet with all links.
// This optimization would speed up cases where there's
// a lot of inputs and/or the inputs are large (e.g. big files).
let mut collected_links: HashSet<Request> = HashSet::new();

for handle in extract_links_handles {
let links = handle.await?;
collected_links.extend(links);
}
let (links_tx, links_rx) = tokio::sync::mpsc::channel(max_concurrency);
tokio::spawn(async move {
let mut collected_links = HashSet::new();

for handle in extract_links_handles {
// Unwrap should be fine because joining fails:
// * if the Task was dropped (which we don't do)
// * if the Task panicked. Propagating panics is correct here.
let requests = handle
.await
.expect("Awaiting termination of link handle failed");
for request in requests {
if !collected_links.contains(&request) {
collected_links.insert(request.clone());
// Unwrap should be fine because sending fails
// if the receiver was closed - in which case we can't continue anyway
links_tx
.send(request)
.await
.expect("Extractor could not send link to channel");
}
}
}
});

Ok(collected_links)
Ok(links_rx)
}

#[cfg(test)]
Expand Down Expand Up @@ -292,11 +306,11 @@ mod test {
},
];

let responses = collect_links(inputs, None, false, 8).await?;
let links = responses
.into_iter()
.map(|r| r.uri)
.collect::<HashSet<Uri>>();
let mut responses = collect_links(inputs, None, false, 8).await?;
let mut links = HashSet::new();
while let Some(request) = responses.recv().await {
links.insert(request.uri);
}

let mut expected_links: HashSet<Uri> = HashSet::new();
expected_links.insert(website(TEST_STRING));
Expand Down

0 comments on commit b3a9e91

Please sign in to comment.