Skip to content

Commit

Permalink
No more Borrow
Browse files Browse the repository at this point in the history
  • Loading branch information
vigna committed Nov 12, 2024
1 parent dab068e commit bfdb136
Show file tree
Hide file tree
Showing 12 changed files with 40 additions and 46 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Change Log

## [0.2.0] - 2024-08-09

### Improved

* Argument specifying a thread pool are now simply references.

## [0.1.4] - 2024-08-09

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "webgraph"
version = "0.1.4"
version = "0.2.0"
edition = "2021"
description = "A Rust port of the WebGraph framework (http://webgraph.di.unimi.it/)."
repository = "https://github.com/vigna/webgraph-rs/"
Expand Down
5 changes: 2 additions & 3 deletions src/algo/llp/gap_cost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
* SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
*/

use std::borrow::Borrow;

use crate::traits::*;
use dsi_progress_logger::prelude::*;
use lender::prelude::*;
use rayon::ThreadPool;
use sux::prelude::*;

/// Computes the gap cost, that is, the sum of the costs of the logarithms
Expand All @@ -21,7 +20,7 @@ pub(crate) fn compute_log_gap_cost<G: SequentialGraph + Sync>(
graph: &G,
arc_granularity: usize,
deg_cumul: &(impl Succ<Input = usize, Output = usize> + Send + Sync),
thread_pool: impl Borrow<rayon::ThreadPool>,
thread_pool: &ThreadPool,
pr: Option<&mut ProgressLogger>,
) -> f64 {
graph.par_apply(
Expand Down
2 changes: 1 addition & 1 deletion src/cli/from/arcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ pub fn from_csv(args: CliArgs) -> Result<()> {
&g,
args.num_nodes,
args.ca.into(),
thread_pool,
&thread_pool,
dir,
&target_endianness.unwrap_or_else(|| BE::NAME.into()),
)
Expand Down
6 changes: 3 additions & 3 deletions src/cli/transform/simplify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ where
&sorted,
graph.num_nodes(),
args.ca.into(),
thread_pool,
&thread_pool,
dir,
&target_endianness,
)?;
Expand Down Expand Up @@ -258,7 +258,7 @@ where
&sorted,
graph.num_nodes(),
args.ca.into(),
thread_pool,
&thread_pool,
dir,
&target_endianness,
)?;
Expand All @@ -282,7 +282,7 @@ where
&sorted,
seq_graph.num_nodes(),
args.ca.into(),
thread_pool,
&thread_pool,
dir,
&target_endianness,
)?;
Expand Down
2 changes: 1 addition & 1 deletion src/cli/transform/transpose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ where
&sorted,
sorted.num_nodes(),
args.ca.into(),
thread_pool,
&thread_pool,
dir,
&target_endianness.unwrap_or_else(|| E::NAME.into()),
)?;
Expand Down
24 changes: 9 additions & 15 deletions src/graphs/bvgraph/comp/impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use anyhow::{ensure, Context, Result};
use dsi_bitstream::prelude::*;
use dsi_progress_logger::prelude::*;
use lender::prelude::*;
use std::borrow::Borrow;
use rayon::ThreadPool;

use std::fs::File;
use std::io::{BufReader, BufWriter};
use std::path::{Path, PathBuf};
Expand Down Expand Up @@ -198,7 +199,7 @@ impl BvComp<()> {
graph: &G,
num_nodes: usize,
compression_flags: CompFlags,
threads: impl Borrow<rayon::ThreadPool>,
threads: &ThreadPool,
tmp_dir: P,
endianness: &str,
) -> Result<u64>
Expand All @@ -214,9 +215,7 @@ impl BvComp<()> {
// compress the transposed graph
Self::parallel_iter::<BigEndian, _>(
basename,
graph
.split_iter(threads.borrow().current_num_threads())
.into_iter(),
graph.split_iter(threads.current_num_threads()).into_iter(),
num_nodes,
compression_flags,
threads,
Expand All @@ -231,9 +230,7 @@ impl BvComp<()> {
// compress the transposed graph
Self::parallel_iter::<LittleEndian, _>(
basename,
graph
.split_iter(threads.borrow().current_num_threads())
.into_iter(),
graph.split_iter(threads.current_num_threads()).into_iter(),
num_nodes,
compression_flags,
threads,
Expand All @@ -249,7 +246,7 @@ impl BvComp<()> {
basename: impl AsRef<Path> + Send + Sync,
graph: &(impl SequentialGraph + SplitLabeling),
compression_flags: CompFlags,
threads: impl Borrow<rayon::ThreadPool>,
threads: &ThreadPool,
tmp_dir: impl AsRef<Path>,
) -> Result<u64>
where
Expand All @@ -258,9 +255,7 @@ impl BvComp<()> {
{
Self::parallel_iter(
basename,
graph
.split_iter(threads.borrow().current_num_threads())
.into_iter(),
graph.split_iter(threads.current_num_threads()).into_iter(),
graph.num_nodes(),
compression_flags,
threads,
Expand All @@ -278,14 +273,13 @@ impl BvComp<()> {
iter: impl Iterator<Item = L>,
num_nodes: usize,
compression_flags: CompFlags,
threads: impl Borrow<rayon::ThreadPool>,
threads: &ThreadPool,
tmp_dir: impl AsRef<Path>,
) -> Result<u64>
where
BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodeWrite<E>,
BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
{
let thread_pool = threads.borrow();
let tmp_dir = tmp_dir.as_ref();
let basename = basename.as_ref();

Expand All @@ -296,7 +290,7 @@ impl BvComp<()> {

let thread_path = |thread_id: usize| tmp_dir.join(format!("{:016x}.bitstream", thread_id));

thread_pool.in_place_scope(|s| {
threads.in_place_scope(|s| {
let cp_flags = &compression_flags;

for (thread_id, mut thread_lender) in iter.enumerate() {
Expand Down
6 changes: 3 additions & 3 deletions src/graphs/bvgraph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@

//! An implementation of the Bv format.
//!
//! The format has been described by Paolo Boldi and Sebastiano Vigna in [“The
//! The format has been described by Paolo Boldi and Sebastiano Vigna in “[The
//! WebGraph Framework I: Compression
//! Techniques](http://vigna.di.unimi.it/papers.php#BoVWFI), in *Proc. of the
//! Techniques](http://vigna.di.unimi.it/papers.php#BoVWFI), in *Proc. of the
//! 13th international conference on World Wide Web*, WWW 2004, pages 595-602,
//! ACM [DOI
//! ACM. [DOI
//! 10.1145/988672.988752](https://dl.acm.org/doi/10.1145/988672.988752).
//!
//! The implementation is compatible with the [Java
Expand Down
11 changes: 5 additions & 6 deletions src/traits/labels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ use core::{
use dsi_progress_logger::prelude::*;
use impl_tools::autoimpl;
use lender::*;
use std::borrow::Borrow;
use rayon::ThreadPool;

use sux::traits::Succ;

/// A labeling that can be accessed sequentially.
Expand Down Expand Up @@ -105,7 +106,7 @@ pub trait SequentialLabeling {
func: F,
fold: R,
node_granularity: usize,
thread_pool: impl Borrow<rayon::ThreadPool>,
thread_pool: &ThreadPool,
pl: Option<&mut ProgressLogger>,
) -> A
where
Expand All @@ -117,7 +118,6 @@ pub trait SequentialLabeling {
let pl_lock = pl.map(std::sync::Mutex::new);
let num_nodes = self.num_nodes();
let num_scoped_threads = thread_pool
.borrow()
.current_num_threads()
.min(num_nodes / node_granularity)
.max(1);
Expand All @@ -126,7 +126,7 @@ pub trait SequentialLabeling {

// create a channel to receive the result
let (tx, rx) = std::sync::mpsc::channel();
thread_pool.borrow().in_place_scope(|scope| {
thread_pool.in_place_scope(|scope| {
for _ in 0..num_scoped_threads {
// create some references so that we can share them across threads
let pl_lock = &pl_lock;
Expand Down Expand Up @@ -187,7 +187,7 @@ pub trait SequentialLabeling {
fold: R,
arc_granularity: usize,
deg_cumul: &(impl Succ<Input = usize, Output = usize> + Send + Sync),
thread_pool: impl Borrow<rayon::ThreadPool>,
thread_pool: &ThreadPool,
pl: Option<&mut ProgressLogger>,
) -> A
where
Expand All @@ -196,7 +196,6 @@ pub trait SequentialLabeling {
T: Send,
A: Default + Send,
{
let thread_pool = thread_pool.borrow();
let pl_lock = pl.map(std::sync::Mutex::new);
let num_nodes = self.num_nodes();
let num_arcs = self.num_arcs_hint().unwrap();
Expand Down
10 changes: 4 additions & 6 deletions src/transform/perm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@
* SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
*/

use std::borrow::Borrow;

use crate::graphs::arc_list_graph;
use crate::prelude::sort_pairs::{BatchIterator, KMergeIters};
use crate::prelude::*;
use anyhow::{ensure, Context, Result};
use dsi_progress_logger::prelude::*;
use lender::*;
use rayon::ThreadPool;
use sux::traits::BitFieldSlice;
use tempfile::Builder;

Expand Down Expand Up @@ -75,7 +74,7 @@ pub fn permute_split<S, P>(
graph: &S,
perm: &P,
batch_size: usize,
threads: impl Borrow<rayon::ThreadPool>,
threads: &ThreadPool,
) -> Result<Left<arc_list_graph::ArcListGraph<KMergeIters<BatchIterator<()>, ()>>>>
where
S: SequentialGraph + SplitLabeling,
Expand All @@ -90,11 +89,10 @@ where
// get a premuted view
let pgraph = PermutedGraph { graph, perm };

let pool = threads.borrow();
let num_threads = pool.current_num_threads();
let num_threads = threads.current_num_threads();
let mut dirs = vec![];

let edges = pool.in_place_scope(|scope| {
let edges = threads.in_place_scope(|scope| {
let (tx, rx) = std::sync::mpsc::channel();

for (thread_id, iter) in pgraph.split_iter(num_threads).enumerate() {
Expand Down
10 changes: 4 additions & 6 deletions src/transform/simplify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
* SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
*/

use std::borrow::Borrow;

use crate::graphs::{
arc_list_graph, no_selfloops_graph::NoSelfLoopsGraph, union_graph::UnionGraph,
};
Expand All @@ -16,6 +14,7 @@ use anyhow::{Context, Result};
use dsi_progress_logger::prelude::*;
use itertools::{Dedup, Itertools};
use lender::*;
use rayon::ThreadPool;
use tempfile::Builder;

use super::transpose;
Expand Down Expand Up @@ -111,18 +110,17 @@ pub fn simplify(
pub fn simplify_split<S>(
graph: &S,
batch_size: usize,
threads: impl Borrow<rayon::ThreadPool>,
threads: &ThreadPool,
) -> Result<Left<arc_list_graph::ArcListGraph<itertools::Dedup<KMergeIters<BatchIterator<()>, ()>>>>>
where
S: SequentialGraph + SplitLabeling,
{
let pool = threads.borrow();
let num_threads = pool.current_num_threads();
let num_threads = threads.current_num_threads();
let (tx, rx) = std::sync::mpsc::channel();

let mut dirs = vec![];

pool.in_place_scope(|scope| {
threads.in_place_scope(|scope| {
let mut thread_id = 0;
#[allow(clippy::explicit_counter_loop)] // enumerate requires some extra bounds here
for iter in graph.split_iter(num_threads) {
Expand Down
2 changes: 1 addition & 1 deletion tests/test_par_bvcomp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ fn _test_par_bvcomp(basename: &str) -> Result<()> {
&tmp_basename,
&graph,
comp_flags,
rayon::ThreadPoolBuilder::new()
&rayon::ThreadPoolBuilder::new()
.num_threads(thread_num)
.build()
.expect("Failed to create thread pool"),
Expand Down

0 comments on commit bfdb136

Please sign in to comment.