Skip to content

Commit

Permalink
Use rayon to accelerate computing
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Mar 6, 2024
1 parent 185e618 commit f4f7a73
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 43 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ geo = "0.28"
geos = { version = "8.3", features = ["v3_10_0", "geo"], optional = true }
#geozero = { version = "0.12", features = ["with-wkb"] }
geozero = { git = "https://github.com/lewiszlw/geozero.git", rev = "40e33a714b624b90497ec7032ac5be1d9f02aa13", features = ["with-wkb"] }
rayon = "1.9"

[dev-dependencies]
arrow = "50"
Expand Down
79 changes: 36 additions & 43 deletions src/function/intersects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use arrow_array::{Array, BooleanArray, GenericBinaryArray, OffsetSizeTrait};
use arrow_schema::DataType;
use datafusion_common::DataFusionError;
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
use rayon::prelude::*;
use std::any::Any;
use std::sync::Arc;

Expand Down Expand Up @@ -66,38 +67,22 @@ impl ScalarUDFImpl for IntersectsUdf {
(DataType::Binary, DataType::Binary) => {
let arr0 = arr0.as_binary::<i32>();
let arr1 = arr1.as_binary::<i32>();
let mut bool_vec = vec![];
for i in 0..arr0.len() {
bool_vec.push(intersects::<i32, i32>(arr0, arr1, i)?);
}
Ok(ColumnarValue::Array(Arc::new(BooleanArray::from(bool_vec))))
intersects::<i32, i32>(arr0, arr1)
}
(DataType::LargeBinary, DataType::Binary) => {
let arr0 = arr0.as_binary::<i64>();
let arr1 = arr1.as_binary::<i32>();
let mut bool_vec = vec![];
for i in 0..arr0.len() {
bool_vec.push(intersects::<i64, i32>(arr0, arr1, i)?);
}
Ok(ColumnarValue::Array(Arc::new(BooleanArray::from(bool_vec))))
intersects::<i64, i32>(arr0, arr1)
}
(DataType::Binary, DataType::LargeBinary) => {
let arr0 = arr0.as_binary::<i32>();
let arr1 = arr1.as_binary::<i64>();
let mut bool_vec = vec![];
for i in 0..arr0.len() {
bool_vec.push(intersects::<i32, i64>(arr0, arr1, i)?);
}
Ok(ColumnarValue::Array(Arc::new(BooleanArray::from(bool_vec))))
intersects::<i32, i64>(arr0, arr1)
}
(DataType::LargeBinary, DataType::LargeBinary) => {
let arr0 = arr0.as_binary::<i64>();
let arr1 = arr1.as_binary::<i64>();
let mut bool_vec = vec![];
for i in 0..arr0.len() {
bool_vec.push(intersects::<i64, i64>(arr0, arr1, i)?);
}
Ok(ColumnarValue::Array(Arc::new(BooleanArray::from(bool_vec))))
intersects::<i64, i64>(arr0, arr1)
}
_ => unreachable!(),
}
Expand All @@ -117,30 +102,38 @@ impl Default for IntersectsUdf {
/// Intersection test between the geometries stored in two binary arrays,
/// pairing the entry at index `i` of `arr0` with the entry at index `i` of `arr1`.
///
/// NOTE(review): this listing is a rendered diff whose +/- markers and
/// indentation were lost, so lines of the pre-commit and post-commit versions
/// of this function are interleaved below. The section comments mark which
/// version each run of lines appears to belong to; the split follows from the
/// two different signatures and from the two-argument call sites
/// `intersects::<_, _>(arr0, arr1)` in the caller.
///
/// * Pre-commit: evaluates the single row `geom_index` and returns
///   `DFResult<Option<bool>>`; the caller looped over the rows.
/// * Post-commit: evaluates every index in `0..arr0.len()` through rayon's
///   `into_par_iter` and returns the results as a `ColumnarValue::Array`
///   wrapping a `BooleanArray`.
///
/// In both versions a row yields `None` unless both sides produce `Some`
/// geometry, and any error from reading a geometry is propagated with `?`.
fn intersects<O: OffsetSizeTrait, F: OffsetSizeTrait>(
arr0: &GenericBinaryArray<O>,
arr1: &GenericBinaryArray<F>,
// ---- pre-commit version (removed by this commit): one row per call ----
geom_index: usize,
) -> DFResult<Option<bool>> {
// With the `geos` feature the predicate is evaluated through the GEOS bindings.
#[cfg(feature = "geos")]
{
use datafusion_common::DataFusionError;
use geos::Geom;
match (arr0.geos_value(geom_index)?, arr1.geos_value(geom_index)?) {
(Some(geom0), Some(geom1)) => {
// GEOS reports failures as errors; they are wrapped as DataFusion internal errors.
let result = geom0.intersects(&geom1).map_err(|e| {
DataFusionError::Internal(format!("Failed to do intersects, error: {}", e))
})?;
Ok(Some(result))
// ---- post-commit version (added by this commit): all rows, in parallel ----
) -> DFResult<ColumnarValue> {
// Only `arr0.len()` bounds the iteration; `arr1` is read at the same index.
// NOTE(review): assumes `arr1` has at least `arr0.len()` entries - confirm against the caller.
let bool_vec = (0..arr0.len())
.into_par_iter()
.map(|geom_index| {
// Same per-row logic as before, now inside the closure run by rayon.
#[cfg(feature = "geos")]
{
use datafusion_common::DataFusionError;
use geos::Geom;
match (arr0.geos_value(geom_index)?, arr1.geos_value(geom_index)?) {
(Some(geom0), Some(geom1)) => {
let result = geom0.intersects(&geom1).map_err(|e| {
DataFusionError::Internal(format!(
"Failed to do intersects, error: {}",
e
))
})?;
Ok(Some(result))
}
// Not both `Some`: the row has no boolean result.
_ => Ok(None),
}
}
// ---- remainder of the pre-commit body (removed by this commit) ----
_ => Ok(None),
}
}
// Without the `geos` feature the pure-Rust `geo` crate evaluates the predicate.
#[cfg(not(feature = "geos"))]
{
use geo::Intersects;
match (arr0.geo_value(geom_index)?, arr1.geo_value(geom_index)?) {
(Some(geom0), Some(geom1)) => Ok(Some(geom0.intersects(&geom1))),
_ => Ok(None),
}
}
// ---- remainder of the post-commit body (added by this commit) ----
#[cfg(not(feature = "geos"))]
{
use geo::Intersects;
match (arr0.geo_value(geom_index)?, arr1.geo_value(geom_index)?) {
(Some(geom0), Some(geom1)) => Ok(Some(geom0.intersects(&geom1))),
_ => Ok(None),
}
}
})
// Collecting into `DFResult<Vec<_>>` turns any per-row error into the function's error.
// NOTE(review): with a parallel iterator, which error is returned when several rows fail
// is presumably not deterministic - see rayon's `FromParallelIterator for Result` docs.
.collect::<DFResult<Vec<Option<bool>>>>()?;
// NOTE(review): `None` entries presumably become nulls in the BooleanArray - confirm in arrow docs.
Ok(ColumnarValue::Array(Arc::new(BooleanArray::from(bool_vec))))
}

#[cfg(test)]
Expand Down

0 comments on commit f4f7a73

Please sign in to comment.