Skip to content

Commit

Permalink
Add ST_Split
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Mar 27, 2024
1 parent da433e8 commit be4df2e
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 0 deletions.
4 changes: 4 additions & 0 deletions src/function/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ mod intersects;
#[cfg(feature = "geos")]
mod make_envelope;
#[cfg(feature = "geos")]
mod split;
#[cfg(feature = "geos")]
mod srid;
mod translate;

Expand All @@ -34,5 +36,7 @@ pub use intersects::*;
#[cfg(feature = "geos")]
pub use make_envelope::*;
#[cfg(feature = "geos")]
pub use split::*;
#[cfg(feature = "geos")]
pub use srid::*;
pub use translate::*;
154 changes: 154 additions & 0 deletions src/function/split.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
use crate::geo::{GeometryArray, GeometryArrayBuilder};
use crate::DFResult;
use arrow_array::cast::AsArray;
use arrow_array::{BooleanArray, GenericBinaryArray, OffsetSizeTrait};
use arrow_schema::DataType;
use datafusion_common::{internal_datafusion_err, internal_err};
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
use geos::Geom;
use rayon::iter::IntoParallelIterator;
use std::any::Any;
use std::sync::Arc;

#[derive(Debug)]
pub struct SplitUdf {
signature: Signature,
aliases: Vec<String>,
}

impl SplitUdf {
pub fn new() -> Self {
Self {
signature: Signature::uniform(
2,
vec![DataType::Binary, DataType::LargeBinary],
Volatility::Immutable,
),
aliases: vec!["st_split".to_string()],
}
}
}

impl ScalarUDFImpl for SplitUdf {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"ST_Split"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
Ok(arg_types[0].clone())
}

fn invoke(&self, args: &[ColumnarValue]) -> datafusion_common::Result<ColumnarValue> {
let (arr0, arr1) = match (args[0].clone(), args[1].clone()) {
(ColumnarValue::Array(arr0), ColumnarValue::Array(arr1)) => (arr0, arr1),
(ColumnarValue::Array(arr0), ColumnarValue::Scalar(scalar)) => {
(arr0.clone(), scalar.to_array_of_size(arr0.len())?)
}
(ColumnarValue::Scalar(scalar), ColumnarValue::Array(arr1)) => {
(scalar.to_array_of_size(arr1.len())?, arr1)
}
(ColumnarValue::Scalar(scalar0), ColumnarValue::Scalar(scalar1)) => {
(scalar0.to_array_of_size(1)?, scalar1.to_array_of_size(1)?)
}
};
if arr0.len() != arr1.len() {
return internal_err!("Two arrays length is not same");
}

match (arr0.data_type(), arr1.data_type()) {
(DataType::Binary, DataType::Binary) => {
let arr0 = arr0.as_binary::<i32>();
let arr1 = arr1.as_binary::<i32>();
split::<i32, i32>(arr0, arr1)
}
(DataType::LargeBinary, DataType::Binary) => {
let arr0 = arr0.as_binary::<i64>();
let arr1 = arr1.as_binary::<i32>();
split::<i64, i32>(arr0, arr1)
}
(DataType::Binary, DataType::LargeBinary) => {
let arr0 = arr0.as_binary::<i32>();
let arr1 = arr1.as_binary::<i64>();
split::<i32, i64>(arr0, arr1)
}
(DataType::LargeBinary, DataType::LargeBinary) => {
let arr0 = arr0.as_binary::<i64>();
let arr1 = arr1.as_binary::<i64>();
split::<i64, i64>(arr0, arr1)
}
_ => unreachable!(),
}
}

fn aliases(&self) -> &[String] {
&self.aliases
}
}

impl Default for SplitUdf {
fn default() -> Self {
Self::new()
}
}

fn split<O: OffsetSizeTrait, F: OffsetSizeTrait>(
arr0: &GenericBinaryArray<O>,
arr1: &GenericBinaryArray<F>,
) -> DFResult<ColumnarValue> {
let geom_vec = (0..arr0.geom_len())
.into_par_iter()
.map(
|geom_index| match (arr0.geos_value(geom_index)?, arr1.geos_value(geom_index)?) {
(Some(geom0), Some(geom1)) => {
let boundary = geom0.boundary().map_err(|e| {
internal_datafusion_err!("Failed to do boundary, error: {}", e)
})?;
let union = boundary.union(&geom1).map_err(|e| {
internal_datafusion_err!("Failed to do union, error: {}", e)
})?;
let (result, ..) = union.polygonize_full().map_err(|e| {
internal_datafusion_err!("Failed to do polygonize_full, error: {}", e)
})?;

Ok(Some(result))
}
_ => Ok(None),
},
)
.collect::<DFResult<Vec<Option<geos::Geometry>>>>()?;
let builder = GeometryArrayBuilder::<O>::from(&geom_vec);
Ok(ColumnarValue::Array(Arc::new(builder.build())))
}

#[cfg(test)]
mod tests {
use crate::function::{GeomFromTextUdf, SplitUdf};
use arrow::util::pretty::pretty_format_batches;
use datafusion::prelude::SessionContext;
use datafusion_expr::ScalarUDF;

#[tokio::test]
async fn split() {
let ctx = SessionContext::new();
ctx.register_udf(ScalarUDF::from(GeomFromTextUdf::new()));
ctx.register_udf(ScalarUDF::from(SplitUdf::new()));
let df = ctx
.sql("select ST_Split(ST_GeomFromText('LINESTRING ( 0 0, 1 1, 2 2 )'), ST_GeomFromText('POINT(1 1)'))")
.await
.unwrap();
assert_eq!(
pretty_format_batches(&df.collect().await.unwrap())
.unwrap()
.to_string(),
""
);
}
}
13 changes: 13 additions & 0 deletions src/geo/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,3 +190,16 @@ impl<O: OffsetSizeTrait> From<&[Option<geo::GeometryCollection>]> for GeometryAr
geo_vec.as_slice().into()
}
}

#[cfg(feature = "geos")]
impl<O: OffsetSizeTrait> From<&[Option<geos::Geometry>]> for GeometryArrayBuilder<O> {
fn from(value: &[Option<geos::Geometry>]) -> Self {
let mut builder = GeometryArrayBuilder::<O>::new(WkbDialect::Ewkb, value.len());
for geom in value {
builder
.append_geos_geometry(geom)
.expect("geometry data is valid");
}
builder
}
}

0 comments on commit be4df2e

Please sign in to comment.