Skip to content

Commit

Permalink
Use datafusion macros to simplify error creation
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Mar 7, 2024
1 parent b4053fc commit 79e4642
Show file tree
Hide file tree
Showing 13 changed files with 52 additions and 94 deletions.
14 changes: 4 additions & 10 deletions benches/geoarrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use arrow_schema::DataType;
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion::physical_expr::functions::make_scalar_function;
use datafusion::prelude::SessionContext;
use datafusion_common::DataFusionError;
use datafusion_common::{exec_err, DataFusionError};
use datafusion_expr::{
create_udf, ReturnTypeFunction, ScalarUDF, Signature, TypeSignature, Volatility,
};
Expand Down Expand Up @@ -104,20 +104,14 @@ fn st_geomfromtext(args: &[ArrayRef]) -> DFResult<Arc<dyn Array>> {
pub fn intersects() -> ScalarUDF {
let intersects = |args: &[ArrayRef]| -> datafusion::error::Result<Arc<dyn Array>> {
if args.len() != 2 {
return Err(DataFusionError::Execution(
"st_intersects must have only three args.".to_string(),
));
return exec_err!("st_intersects must have only three args.");
}
let Ok(wkb_array_a) = WKBArray::<i32>::try_from(&args[0] as &dyn Array) else {
return Err(DataFusionError::Execution(
"st_intersects input 0 can not convert to WKBArray<i32>.".to_string(),
));
return exec_err!("st_intersects input 0 can not convert to WKBArray<i32>.");
};

let Ok(wkb_array_b) = WKBArray::<i32>::try_from(&args[1] as &dyn Array) else {
return Err(DataFusionError::Execution(
"st_intersects input 1 can not convert to WKBArray<i32>.".to_string(),
));
return exec_err!("st_intersects input 1 can not convert to WKBArray<i32>.");
};
let result: BooleanArray = wkb_array_a
.iter_geo()
Expand Down
9 changes: 5 additions & 4 deletions src/function/as_ewkt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::DFResult;
use arrow_array::cast::AsArray;
use arrow_array::{GenericBinaryArray, LargeStringArray, OffsetSizeTrait, StringArray};
use arrow_schema::DataType;
use datafusion_common::DataFusionError;
use datafusion_common::{internal_datafusion_err, DataFusionError};
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, TypeSignature, Volatility};
use geozero::{GeozeroGeometry, ToWkt};
use std::any::Any;
Expand Down Expand Up @@ -91,9 +91,10 @@ fn to_ewkt<O: OffsetSizeTrait>(
) -> DFResult<Option<String>> {
let geom = wkb_arr.geos_value(geom_index)?;
let ewkt = match geom {
Some(geom) => Some(geom.to_ewkt(geom.srid()).map_err(|_| {
DataFusionError::Internal("Failed to convert geometry to ewkt".to_string())
})?),
Some(geom) => Some(
geom.to_ewkt(geom.srid())
.map_err(|_| internal_datafusion_err!("Failed to convert geometry to ewkt"))?,
),
None => None,
};
Ok(ewkt)
Expand Down
9 changes: 5 additions & 4 deletions src/function/as_text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::DFResult;
use arrow_array::cast::AsArray;
use arrow_array::{GenericBinaryArray, LargeStringArray, OffsetSizeTrait, StringArray};
use arrow_schema::DataType;
use datafusion_common::DataFusionError;
use datafusion_common::{internal_datafusion_err, DataFusionError};
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, TypeSignature, Volatility};
use geozero::ToWkt;
use std::any::Any;
Expand Down Expand Up @@ -100,9 +100,10 @@ fn to_wkt<O: OffsetSizeTrait>(
}
};
let wkt = match geom {
Some(geom) => Some(geom.to_wkt().map_err(|_| {
DataFusionError::Internal("Failed to convert geometry to wkt".to_string())
})?),
Some(geom) => Some(
geom.to_wkt()
.map_err(|_| internal_datafusion_err!("Failed to convert geometry to wkt"))?,
),
None => None,
};
Ok(wkt)
Expand Down
4 changes: 2 additions & 2 deletions src/function/extent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ mod tests {
.unwrap();
ctx.register_udaf(AggregateUDF::from(ExtentUdaf::new()));
let df = ctx
.sql("select ST_Extent(geom), name from geom_table group by name")
.sql("select ST_Extent(geom), name from geom_table group by name order by name")
.await
.unwrap();
assert_eq!(
Expand All @@ -221,8 +221,8 @@ mod tests {
"+----------------------------------------------+------+
| st_extent(geom_table.geom) | name |
+----------------------------------------------+------+
| {xmin: 2.0, ymin: 3.0, xmax: 7.0, ymax: 8.0} | b |
| {xmin: 0.0, ymin: 1.0, xmax: 5.0, ymax: 6.0} | a |
| {xmin: 2.0, ymin: 3.0, xmax: 7.0, ymax: 8.0} | b |
+----------------------------------------------+------+"
);
}
Expand Down
11 changes: 3 additions & 8 deletions src/function/geom_from_text.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::geo::GeometryArrayBuilder;
use arrow_array::cast::AsArray;
use arrow_schema::DataType;
use datafusion_common::DataFusionError;
use datafusion_common::ScalarValue;
use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError};
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, TypeSignature, Volatility};
use geozero::wkb::WkbDialect;
use geozero::{GeozeroGeometry, ToWkb};
Expand Down Expand Up @@ -50,9 +50,7 @@ impl ScalarUDFImpl for GeomFromTextUdf {
fn invoke(&self, args: &[ColumnarValue]) -> datafusion_common::Result<ColumnarValue> {
let srid = if args.len() == 2 {
let ColumnarValue::Scalar(ScalarValue::Int64(Some(srid))) = &args[1] else {
return Err(DataFusionError::Internal(
"The second arg should be int64".to_string(),
));
return internal_err!("The second arg should be int64");
};
Some(*srid as i32)
} else {
Expand All @@ -68,10 +66,7 @@ impl ScalarUDFImpl for GeomFromTextUdf {
Some(data) => {
let wkt = geozero::wkt::Wkt(data);
let ewkb = wkt.to_ewkb(wkt.dims(), srid).map_err(|e| {
DataFusionError::Internal(format!(
"Failed to convert wkt to ewkb, error: {}",
e
))
internal_datafusion_err!("Failed to convert wkt to ewkb, error: {}", e)
})?;
builder.append_wkb(Some(&ewkb))?;
}
Expand Down
11 changes: 3 additions & 8 deletions src/function/geom_from_wkb.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::geo::GeometryArrayBuilder;
use arrow_array::cast::AsArray;
use arrow_schema::DataType;
use datafusion_common::DataFusionError;
use datafusion_common::ScalarValue;
use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError};
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, TypeSignature, Volatility};
use geozero::wkb::WkbDialect;
use geozero::{GeozeroGeometry, ToWkb};
Expand Down Expand Up @@ -50,9 +50,7 @@ impl ScalarUDFImpl for GeomFromWkbUdf {
fn invoke(&self, args: &[ColumnarValue]) -> datafusion_common::Result<ColumnarValue> {
let srid = if args.len() == 2 {
let ColumnarValue::Scalar(ScalarValue::Int64(Some(srid))) = &args[1] else {
return Err(DataFusionError::Internal(
"The second arg should be int32".to_string(),
));
return internal_err!("The second arg should be int32");
};
Some(*srid as i32)
} else {
Expand All @@ -68,10 +66,7 @@ impl ScalarUDFImpl for GeomFromWkbUdf {
Some(data) => {
let wkb = geozero::wkb::Wkb(data);
let ewkb = wkb.to_ewkb(wkb.dims(), srid).map_err(|e| {
DataFusionError::Internal(format!(
"Failed to convert wkb to ewkb, error: {}",
e
))
internal_datafusion_err!("Failed to convert wkb to ewkb, error: {}", e)
})?;
builder.append_wkb(Some(&ewkb))?;
}
Expand Down
15 changes: 5 additions & 10 deletions src/function/intersects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::DFResult;
use arrow_array::cast::AsArray;
use arrow_array::{Array, BooleanArray, GenericBinaryArray, OffsetSizeTrait};
use arrow_schema::DataType;
use datafusion_common::DataFusionError;
use datafusion_common::{internal_err, DataFusionError};
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
use rayon::prelude::*;
use std::any::Any;
Expand Down Expand Up @@ -59,9 +59,7 @@ impl ScalarUDFImpl for IntersectsUdf {
}
};
if arr0.len() != arr1.len() {
return Err(DataFusionError::Internal(
"Two arrays length is not same".to_string(),
));
return internal_err!("Two arrays length is not same");
}
match (arr0.data_type(), arr1.data_type()) {
(DataType::Binary, DataType::Binary) => {
Expand Down Expand Up @@ -111,12 +109,9 @@ fn intersects<O: OffsetSizeTrait, F: OffsetSizeTrait>(
use geos::Geom;
match (arr0.geos_value(geom_index)?, arr1.geos_value(geom_index)?) {
(Some(geom0), Some(geom1)) => {
let result = geom0.intersects(&geom1).map_err(|e| {
DataFusionError::Internal(format!(
"Failed to do intersects, error: {}",
e
))
})?;
let result = geom0
.intersects(&geom1)
.map_err(|e| internal_err!("Failed to do intersects, error: {}", e))?;
Ok(Some(result))
}
_ => Ok(None),
Expand Down
16 changes: 6 additions & 10 deletions src/function/make_envelope.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::geo::GeometryArrayBuilder;
use arrow_schema::DataType;
use datafusion_common::{DataFusionError, ScalarValue};
use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError, ScalarValue};
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, TypeSignature, Volatility};
use geos::CoordSeq;
use geozero::wkb::WkbDialect;
Expand Down Expand Up @@ -57,9 +57,7 @@ impl ScalarUDFImpl for MakeEnvelopeUdf {
}

fn invoke(&self, args: &[ColumnarValue]) -> datafusion_common::Result<ColumnarValue> {
let error = Err(DataFusionError::Internal(
"The arg should be float64".to_string(),
));
let error = internal_err!("The arg should be float64");
let ColumnarValue::Scalar(ScalarValue::Float64(Some(xmin))) = args[0] else {
return error;
};
Expand All @@ -74,9 +72,7 @@ impl ScalarUDFImpl for MakeEnvelopeUdf {
};
let srid = if args.len() == 5 {
let ColumnarValue::Scalar(ScalarValue::Int64(Some(srid))) = args[4] else {
return Err(DataFusionError::Internal(
"The fifth arg should be int64".to_string(),
));
return internal_err!("The fifth arg should be int64");
};
Some(srid)
} else {
Expand All @@ -90,11 +86,11 @@ impl ScalarUDFImpl for MakeEnvelopeUdf {
&[xmax, ymin],
&[xmin, ymin],
])
.map_err(|_| DataFusionError::Internal("Failed to create coord req".to_string()))?;
.map_err(|_| internal_datafusion_err!("Failed to create coord req"))?;
let exterior = geos::Geometry::create_linear_ring(coords)
.map_err(|_| DataFusionError::Internal("Failed to create exterior".to_string()))?;
.map_err(|_| internal_datafusion_err!("Failed to create exterior"))?;
let mut polygon = geos::Geometry::create_polygon(exterior, vec![])
.map_err(|_| DataFusionError::Internal("Failed to create polygon".to_string()))?;
.map_err(|_| internal_datafusion_err!("Failed to create polygon"))?;

let mut builder = if let Some(srid) = srid {
polygon.set_srid(srid as usize);
Expand Down
10 changes: 3 additions & 7 deletions src/function/translate.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::geo::{GeometryArray, GeometryArrayBuilder};
use arrow_array::cast::AsArray;
use arrow_schema::DataType;
use datafusion_common::{DataFusionError, ScalarValue};
use datafusion_common::{internal_err, DataFusionError, ScalarValue};
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, TypeSignature, Volatility};
use geo::Translate;
use std::any::Any;
Expand Down Expand Up @@ -55,14 +55,10 @@ impl ScalarUDFImpl for TranslateUdf {

fn invoke(&self, args: &[ColumnarValue]) -> datafusion_common::Result<ColumnarValue> {
let ColumnarValue::Scalar(ScalarValue::Float64(Some(x_offset))) = args[1] else {
return Err(DataFusionError::Internal(
"The second arg should be f64 scalar".to_string(),
));
return internal_err!("The second arg should be f64 scalar");
};
let ColumnarValue::Scalar(ScalarValue::Float64(Some(y_offset))) = args[2] else {
return Err(DataFusionError::Internal(
"The third arg should be f64 scalar".to_string(),
));
return internal_err!("The third arg should be f64 scalar");
};

match args[0].data_type() {
Expand Down
12 changes: 5 additions & 7 deletions src/geo/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::geo::dialect::decode_wkb_dialect;
use crate::DFResult;
use arrow_array::types::GenericBinaryType;
use arrow_array::{Array, GenericByteArray, OffsetSizeTrait};
use datafusion_common::DataFusionError;
use datafusion_common::{internal_datafusion_err, DataFusionError};
use geozero::wkb::FromWkb;

pub trait GeometryArray {
Expand All @@ -14,9 +14,8 @@ pub trait GeometryArray {
if let Some(wkb) = self.wkb(geom_index) {
let dialect = decode_wkb_dialect(wkb[0])?;
let mut rdr = std::io::Cursor::new(&wkb[1..]);
let value = geo::Geometry::from_wkb(&mut rdr, dialect).map_err(|e| {
DataFusionError::Internal(format!("Failed to parse wkb, error: {}", e))
})?;
let value = geo::Geometry::from_wkb(&mut rdr, dialect)
.map_err(|e| internal_datafusion_err!("Failed to parse wkb, error: {}", e))?;
Ok(Some(value))
} else {
Ok(None)
Expand All @@ -28,9 +27,8 @@ pub trait GeometryArray {
if let Some(wkb) = self.wkb(geom_index) {
let dialect = decode_wkb_dialect(wkb[0])?;
let mut rdr = std::io::Cursor::new(&wkb[1..]);
let value = geos::Geometry::from_wkb(&mut rdr, dialect).map_err(|e| {
DataFusionError::Internal(format!("Failed to parse wkb, error: {}", e))
})?;
let value = geos::Geometry::from_wkb(&mut rdr, dialect)
.map_err(|e| internal_datafusion_err!("Failed to parse wkb, error: {}", e))?;
Ok(Some(value))
} else {
Ok(None)
Expand Down
14 changes: 4 additions & 10 deletions src/geo/box.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use arrow_array::types::Float64Type;
use arrow_array::{Array, Float64Array, StructArray};
use arrow_buffer::NullBuffer;
use arrow_schema::{DataType, Field};
use datafusion_common::{DataFusionError, ScalarValue};
use datafusion_common::{internal_err, DataFusionError, ScalarValue};
use std::sync::Arc;

#[derive(Debug, Clone)]
Expand All @@ -30,9 +30,7 @@ impl Box2d {

pub fn value(arr: &StructArray, index: usize) -> DFResult<Option<Box2d>> {
if arr.data_type() != &Box2d::data_type() {
return Err(DataFusionError::Internal(
"StructArray data type is not matched".to_string(),
));
return internal_err!("StructArray data type is not matched");
}
if index >= arr.len() || arr.is_null(index) {
return Ok(None);
Expand All @@ -49,9 +47,7 @@ impl TryFrom<&ScalarValue> for Box2d {
fn try_from(value: &ScalarValue) -> Result<Self, Self::Error> {
if let ScalarValue::Struct(arr) = value {
if arr.data_type() != &Box2d::data_type() {
return Err(DataFusionError::Internal(
"ScalarValue data type is not matched".to_string(),
));
return internal_err!("ScalarValue data type is not matched");
}
let xmin = arr.column(0).as_primitive::<Float64Type>().value(0);
let ymin = arr.column(1).as_primitive::<Float64Type>().value(0);
Expand All @@ -64,9 +60,7 @@ impl TryFrom<&ScalarValue> for Box2d {
ymax,
})
} else {
Err(DataFusionError::Internal(
"ScalarValue is not struct".to_string(),
))
internal_err!("ScalarValue is not struct")
}
}
}
Expand Down
14 changes: 5 additions & 9 deletions src/geo/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use arrow_array::builder::UInt8BufferBuilder;
use arrow_array::types::GenericBinaryType;
use arrow_array::{GenericByteArray, OffsetSizeTrait};
use arrow_buffer::{BufferBuilder, NullBufferBuilder, OffsetBuffer};
use datafusion_common::DataFusionError;
use datafusion_common::{internal_datafusion_err, DataFusionError};
use geozero::wkb::{FromWkb, WkbDialect};
use geozero::{GeozeroGeometry, ToWkb};

Expand Down Expand Up @@ -44,9 +44,7 @@ impl<O: OffsetSizeTrait> GeometryArrayBuilder<O> {
if let Some(geom) = geom {
let wkb = geom
.to_wkb_dialect(self.dialect, geom.dims(), geom.srid(), vec![])
.map_err(|e| {
DataFusionError::Internal(format!("Failed to convert to wkb, error: {}", e))
})?;
.map_err(|e| internal_datafusion_err!("Failed to convert to wkb, error: {}", e))?;
self.internal_append_wkb(&wkb);
} else {
self.append_null();
Expand All @@ -60,9 +58,7 @@ impl<O: OffsetSizeTrait> GeometryArrayBuilder<O> {
if let Some(geom) = geom {
let wkb = geom
.to_wkb_dialect(self.dialect, geom.dims(), geom.srid(), vec![])
.map_err(|e| {
DataFusionError::Internal(format!("Failed to convert to wkb, error: {}", e))
})?;
.map_err(|e| internal_datafusion_err!("Failed to convert to wkb, error: {}", e))?;
self.internal_append_wkb(&wkb);
} else {
self.append_null();
Expand Down Expand Up @@ -103,12 +99,12 @@ fn check_wkb(wkb: &[u8], dialect: WkbDialect) -> DFResult<()> {
#[cfg(feature = "geos")]
{
let _ = geos::Geometry::from_wkb(&mut rdr, dialect)
.map_err(|e| DataFusionError::Internal(format!("Failed to parse wkb, error: {}", e)))?;
.map_err(|e| internal_datafusion_err!("Failed to parse wkb, error: {}", e))?;
}
#[cfg(not(feature = "geos"))]
{
let _ = geo::Geometry::from_wkb(&mut rdr, dialect)
.map_err(|e| DataFusionError::Internal(format!("Failed to parse wkb, error: {}", e)))?;
.map_err(|e| internal_datafusion_err!("Failed to parse wkb, error: {}", e))?;
}
Ok(())
}
Expand Down
Loading

0 comments on commit 79e4642

Please sign in to comment.