Skip to content

Commit

Permalink
AVRO-3779: using rust bigdecimal (#2302)
Browse files Browse the repository at this point in the history
* AVRO-3779: using rust bigdecimal
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Co-authored-by: Martin Tzvetanov Grigorov <[email protected]>
  • Loading branch information
clesaec authored Oct 9, 2023
1 parent 3ea027a commit c91b887
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 9 deletions.
14 changes: 14 additions & 0 deletions lang/rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lang/rust/avro/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ typed-builder = { default-features = false, version = "0.16.2" }
uuid = { default-features = false, version = "1.4.1", features = ["serde", "std"] }
xz2 = { default-features = false, version = "0.1.7", optional = true }
zstd = { default-features = false, version = "0.12.4+zstd.1.5.2", optional = true }
bigdecimal = "0.4"

[target.'cfg(target_arch = "wasm32")'.dependencies]
quad-rand = { default-features = false, version = "0.2.1" }
Expand Down
85 changes: 83 additions & 2 deletions lang/rust/avro/src/decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,15 @@
// specific language governing permissions and limitations
// under the License.

use crate::{AvroResult, Error};
use crate::{
decode::{decode_len, decode_long},
encode::{encode_bytes, encode_long},
types::Value,
AvroResult, Error,
};
use bigdecimal::BigDecimal;
use num_bigint::{BigInt, Sign};
use std::io::Read;

#[derive(Debug, Clone)]
pub struct Decimal {
Expand Down Expand Up @@ -105,12 +112,47 @@ impl<T: AsRef<[u8]>> From<T> for Decimal {
}
}

pub(crate) fn serialize_big_decimal(decimal: &BigDecimal) -> Vec<u8> {
let mut buffer: Vec<u8> = Vec::new();
let (big_int, exponent): (BigInt, i64) = decimal.as_bigint_and_exponent();
let big_endian_value: Vec<u8> = big_int.to_signed_bytes_be();
encode_bytes(&big_endian_value, &mut buffer);
encode_long(exponent, &mut buffer);

buffer
}

pub(crate) fn deserialize_big_decimal(bytes: &Vec<u8>) -> Result<BigDecimal, Error> {
let mut bytes: &[u8] = bytes.as_slice();
let mut big_decimal_buffer = match decode_len(&mut bytes) {
Ok(size) => vec![0u8; size],
Err(_err) => return Err(Error::BigDecimalLen),
};

bytes
.read_exact(&mut big_decimal_buffer[..])
.map_err(Error::ReadDouble)?;

match decode_long(&mut bytes) {
Ok(Value::Long(scale_value)) => {
let big_int: BigInt = BigInt::from_signed_bytes_be(&big_decimal_buffer);
let decimal = BigDecimal::new(big_int, scale_value);
Ok(decimal)
}
_ => Err(Error::BigDecimalScale),
}
}

#[cfg(test)]
mod tests {
use super::*;
use apache_avro_test_helper::TestResult;
use bigdecimal::{One, Zero};
use pretty_assertions::assert_eq;
use std::convert::TryFrom;
use std::{
convert::TryFrom,
ops::{Div, Mul},
};

#[test]
fn test_decimal_from_bytes_from_ref_decimal() -> TestResult {
Expand All @@ -133,4 +175,43 @@ mod tests {

Ok(())
}

#[test]
fn test_avro_3779_bigdecimal_serial() -> TestResult {
let value: bigdecimal::BigDecimal =
bigdecimal::BigDecimal::from(-1421).div(bigdecimal::BigDecimal::from(2));
let mut current: bigdecimal::BigDecimal = bigdecimal::BigDecimal::one();

for iter in 1..180 {
let result: Vec<u8> = serialize_big_decimal(&current);

let deserialize_big_decimal: Result<bigdecimal::BigDecimal, Error> =
deserialize_big_decimal(&result);
assert!(
deserialize_big_decimal.is_ok(),
"can't deserialize for iter {iter}"
);
assert_eq!(
current,
deserialize_big_decimal.unwrap(),
"not equals for ${iter}"
);
current = current.mul(&value);
}

let result: Vec<u8> = serialize_big_decimal(&BigDecimal::zero());
let deserialize_big_decimal: Result<bigdecimal::BigDecimal, Error> =
deserialize_big_decimal(&result);
assert!(
deserialize_big_decimal.is_ok(),
"can't deserialize for zero"
);
assert_eq!(
BigDecimal::zero(),
deserialize_big_decimal.unwrap(),
"not equals for zero"
);

Ok(())
}
}
12 changes: 9 additions & 3 deletions lang/rust/avro/src/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use crate::{
decimal::Decimal,
decimal::{deserialize_big_decimal, Decimal},
duration::Duration,
schema::{
DecimalSchema, EnumSchema, FixedSchema, Name, Namespace, RecordSchema, ResolvedSchema,
Expand All @@ -36,7 +36,7 @@ use std::{
use uuid::Uuid;

#[inline]
fn decode_long<R: Read>(reader: &mut R) -> AvroResult<Value> {
pub(crate) fn decode_long<R: Read>(reader: &mut R) -> AvroResult<Value> {
zag_i64(reader).map(Value::Long)
}

Expand All @@ -46,7 +46,7 @@ fn decode_int<R: Read>(reader: &mut R) -> AvroResult<Value> {
}

#[inline]
fn decode_len<R: Read>(reader: &mut R) -> AvroResult<usize> {
pub(crate) fn decode_len<R: Read>(reader: &mut R) -> AvroResult<usize> {
let len = zag_i64(reader)?;
safe_len(usize::try_from(len).map_err(|e| Error::ConvertI64ToUsize(e, len))?)
}
Expand Down Expand Up @@ -114,6 +114,12 @@ pub(crate) fn decode_internal<R: Read, S: Borrow<Schema>>(
},
schema => Err(Error::ResolveDecimalSchema(schema.into())),
},
Schema::BigDecimal => {
match decode_internal(&Schema::Bytes, names, enclosing_namespace, reader)? {
Value::Bytes(bytes) => deserialize_big_decimal(&bytes).map(Value::BigDecimal),
value => Err(Error::BytesValue(value.into())),
}
}
Schema::Uuid => Ok(Value::Uuid(
Uuid::from_str(
match decode_internal(&Schema::String, names, enclosing_namespace, reader)? {
Expand Down
9 changes: 7 additions & 2 deletions lang/rust/avro/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use crate::{
decimal::serialize_big_decimal,
schema::{
DecimalSchema, EnumSchema, FixedSchema, Name, Namespace, RecordSchema, ResolvedSchema,
Schema, SchemaKind,
Expand All @@ -40,13 +41,13 @@ pub fn encode(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) -> AvroResul
encode_internal(value, schema, rs.get_names(), &None, buffer)
}

fn encode_bytes<B: AsRef<[u8]> + ?Sized>(s: &B, buffer: &mut Vec<u8>) {
pub(crate) fn encode_bytes<B: AsRef<[u8]> + ?Sized>(s: &B, buffer: &mut Vec<u8>) {
let bytes = s.as_ref();
encode_long(bytes.len() as i64, buffer);
buffer.extend_from_slice(bytes);
}

fn encode_long(i: i64, buffer: &mut Vec<u8>) {
pub(crate) fn encode_long(i: i64, buffer: &mut Vec<u8>) {
zig_i64(i, buffer)
}

Expand Down Expand Up @@ -116,6 +117,10 @@ pub(crate) fn encode_internal<S: Borrow<Schema>>(
&uuid.to_string(),
buffer,
),
Value::BigDecimal(bg) => {
let mut buf: Vec<u8> = serialize_big_decimal(bg);
buffer.append(&mut buf);
}
Value::Bytes(bytes) => match *schema {
Schema::Bytes => encode_bytes(bytes, buffer),
Schema::Fixed { .. } => buffer.extend(bytes),
Expand Down
12 changes: 12 additions & 0 deletions lang/rust/avro/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ pub enum Error {
#[error("expected UUID, got: {0:?}")]
GetUuid(ValueKind),

#[error("expected BigDecimal, got: {0:?}")]
GetBigdecimal(ValueKind),

#[error("Fixed bytes of size 12 expected, got Fixed of size {0}")]
GetDecimalFixedBytes(usize),

Expand Down Expand Up @@ -289,6 +292,15 @@ pub enum Error {
#[error("The decimal precision ({precision}) must be a positive number")]
DecimalPrecisionMuBePositive { precision: usize },

#[error("Unreadable decimal sign")]
BigDecimalSign,

#[error("Unreadable length for decimal inner bytes")]
BigDecimalLen,

#[error("Unreadable decimal scale")]
BigDecimalScale,

#[error("Unexpected `type` {0} variant for `logicalType`")]
GetLogicalTypeVariant(serde_json::Value),

Expand Down
39 changes: 39 additions & 0 deletions lang/rust/avro/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ pub enum Schema {
/// Logical type which represents `Decimal` values. The underlying type is serialized and
/// deserialized as `Schema::Bytes` or `Schema::Fixed`.
Decimal(DecimalSchema),
/// Logical type which represents `Decimal` values without predefined scale.
/// The underlying type is serialized and deserialized as `Schema::Bytes`
BigDecimal,
/// A universally unique identifier, annotating a string.
Uuid,
/// Logical type which represents the number of days since the unix epoch.
Expand Down Expand Up @@ -189,6 +192,7 @@ impl From<&types::Value> for SchemaKind {
Value::Enum(_, _) => Self::Enum,
Value::Fixed(_, _) => Self::Fixed,
Value::Decimal { .. } => Self::Decimal,
Value::BigDecimal(_) => Self::BigDecimal,
Value::Uuid(_) => Self::Uuid,
Value::Date(_) => Self::Date,
Value::TimeMillis(_) => Self::TimeMillis,
Expand Down Expand Up @@ -1359,6 +1363,10 @@ impl Parser {
inner,
}));
}
"big-decimal" => {
logical_verify_type(complex, &[SchemaKind::Bytes], self, enclosing_namespace)?;
return Ok(Schema::BigDecimal);
}
"uuid" => {
logical_verify_type(complex, &[SchemaKind::String], self, enclosing_namespace)?;
return Ok(Schema::Uuid);
Expand Down Expand Up @@ -1909,6 +1917,12 @@ impl Serialize for Schema {
map.serialize_entry("precision", precision)?;
map.end()
}
Schema::BigDecimal => {
let mut map = serializer.serialize_map(None)?;
map.serialize_entry("type", "bytes")?;
map.serialize_entry("logicalType", "big-decimal")?;
map.end()
}
Schema::Uuid => {
let mut map = serializer.serialize_map(None)?;
map.serialize_entry("type", "string")?;
Expand Down Expand Up @@ -5155,6 +5169,31 @@ mod tests {
Ok(())
}

#[test]
fn test_avro_3779_bigdecimal_schema() -> TestResult {
let schema = json!(
{
"type": "record",
"name": "recordWithDecimal",
"fields": [
{
"name": "decimal",
"type": "bytes",
"logicalType": "big-decimal"
}
]
});

let parse_result = Schema::parse(&schema);
assert!(
parse_result.is_ok(),
"parse result must be ok, got: {:?}",
parse_result
);

Ok(())
}

#[test]
fn test_avro_3820_deny_invalid_field_names() -> TestResult {
let schema_str = r#"
Expand Down
Loading

0 comments on commit c91b887

Please sign in to comment.