Skip to content

Commit

Permalink
AVRO-3912: Fix Big decimal deser (apache#2599)
Browse files Browse the repository at this point in the history
* AVRO-3912: Fix Big decimal deser
Co-authored-by: Martin Tzvetanov Grigorov <[email protected]>
  • Loading branch information
clesaec committed Dec 4, 2023
1 parent 1c1294c commit 6107c12
Show file tree
Hide file tree
Showing 8 changed files with 219 additions and 95 deletions.
203 changes: 203 additions & 0 deletions lang/rust/avro/src/bigdecimal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::{
decode::{decode_len, decode_long},
encode::{encode_bytes, encode_long},
types::Value,
Error,
};
use bigdecimal::BigDecimal;
use num_bigint::BigInt;
use std::io::Read;

pub(crate) fn serialize_big_decimal(decimal: &BigDecimal) -> Vec<u8> {
// encode big decimal, without global size
let mut buffer: Vec<u8> = Vec::new();
let (big_int, exponent): (BigInt, i64) = decimal.as_bigint_and_exponent();
let big_endian_value: Vec<u8> = big_int.to_signed_bytes_be();
encode_bytes(&big_endian_value, &mut buffer);
encode_long(exponent, &mut buffer);

// encode global size and content
let mut final_buffer: Vec<u8> = Vec::new();
encode_bytes(&buffer, &mut final_buffer);
final_buffer
}

pub(crate) fn deserialize_big_decimal(bytes: &Vec<u8>) -> Result<BigDecimal, Error> {
let mut bytes: &[u8] = bytes.as_slice();
let mut big_decimal_buffer = match decode_len(&mut bytes) {
Ok(size) => vec![0u8; size],
Err(err) => return Err(Error::BigDecimalLen(Box::new(err))),
};

bytes
.read_exact(&mut big_decimal_buffer[..])
.map_err(Error::ReadDouble)?;

match decode_long(&mut bytes) {
Ok(Value::Long(scale_value)) => {
let big_int: BigInt = BigInt::from_signed_bytes_be(&big_decimal_buffer);
let decimal = BigDecimal::new(big_int, scale_value);
Ok(decimal)
}
_ => Err(Error::BigDecimalScale),
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::{
types::{Record, Value},
Codec, Reader, Schema, Writer,
};
use apache_avro_test_helper::TestResult;
use bigdecimal::{One, Zero};
use pretty_assertions::assert_eq;
use std::{
fs::File,
io::BufReader,
ops::{Div, Mul},
str::FromStr,
};

#[test]
fn test_avro_3779_bigdecimal_serial() -> TestResult {
let value: BigDecimal =
bigdecimal::BigDecimal::from(-1421).div(bigdecimal::BigDecimal::from(2));
let mut current: BigDecimal = BigDecimal::one();

for iter in 1..180 {
let buffer: Vec<u8> = serialize_big_decimal(&current);

let mut as_slice = buffer.as_slice();
decode_long(&mut as_slice)?;

let mut result: Vec<u8> = Vec::new();
result.extend_from_slice(as_slice);

let deserialize_big_decimal: Result<BigDecimal, Error> =
deserialize_big_decimal(&result);
assert!(
deserialize_big_decimal.is_ok(),
"can't deserialize for iter {iter}"
);
assert_eq!(current, deserialize_big_decimal?, "not equals for {iter}");
current = current.mul(&value);
}

let buffer: Vec<u8> = serialize_big_decimal(&BigDecimal::zero());
let mut as_slice = buffer.as_slice();
decode_long(&mut as_slice)?;

let mut result: Vec<u8> = Vec::new();
result.extend_from_slice(as_slice);

let deserialize_big_decimal: Result<BigDecimal, Error> = deserialize_big_decimal(&result);
assert!(
deserialize_big_decimal.is_ok(),
"can't deserialize for zero"
);
assert_eq!(
BigDecimal::zero(),
deserialize_big_decimal?,
"not equals for zero"
);

Ok(())
}

#[test]
fn test_avro_3779_record_with_bg() -> TestResult {
let schema_str = r#"
{
"type": "record",
"name": "test",
"fields": [
{
"name": "field_name",
"type": "bytes",
"logicalType": "big-decimal"
}
]
}
"#;
let schema = Schema::parse_str(schema_str)?;

// build record with big decimal value
let mut record = Record::new(&schema).unwrap();
let val = BigDecimal::new(BigInt::from(12), 2);
record.put("field_name", val.clone());

// write a record
let codec = Codec::Null;
let mut writer = Writer::builder()
.schema(&schema)
.codec(codec)
.writer(Vec::new())
.build();

writer.append(record.clone())?;
writer.flush()?;

// read record
let wrote_data = writer.into_inner()?;
let mut reader = Reader::new(&wrote_data[..])?;

let value = reader.next().unwrap()?;

// extract field value
let big_decimal_value: &Value = match value {
Value::Record(ref fields) => Ok(&fields[0].1),
other => Err(format!("Expected a Value::Record, got: {other:?}")),
}?;

let x1res: &BigDecimal = match big_decimal_value {
Value::BigDecimal(ref s) => Ok(s),
other => Err(format!("Expected Value::BigDecimal, got: {other:?}")),
}?;
assert_eq!(&val, x1res);

Ok(())
}

#[test]
fn test_avro_3779_from_java_file() -> TestResult {
// Open file generated with Java code to ensure compatibility
// with Java big decimal logical type.
let file: File = File::open("./tests/bigdec.avro")?;
let mut reader = Reader::new(BufReader::new(&file))?;
let next_element = reader.next();
assert!(next_element.is_some());
let value = next_element.unwrap()?;
let bg = match value {
Value::Record(ref fields) => Ok(&fields[0].1),
other => Err(format!("Expected a Value::Record, got: {other:?}")),
}?;
let value_big_decimal = match bg {
Value::BigDecimal(val) => Ok(val),
other => Err(format!("Expected a Value::BigDecimal, got: {other:?}")),
}?;

let ref_value = BigDecimal::from_str("2.24")?;
assert_eq!(&ref_value, value_big_decimal);

Ok(())
}
}
85 changes: 2 additions & 83 deletions lang/rust/avro/src/decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use crate::{
decode::{decode_len, decode_long},
encode::{encode_bytes, encode_long},
types::Value,
AvroResult, Error,
};
use bigdecimal::BigDecimal;
use crate::{AvroResult, Error};
use num_bigint::{BigInt, Sign};
use std::io::Read;

#[derive(Debug, Clone)]
pub struct Decimal {
Expand Down Expand Up @@ -112,47 +105,12 @@ impl<T: AsRef<[u8]>> From<T> for Decimal {
}
}

pub(crate) fn serialize_big_decimal(decimal: &BigDecimal) -> Vec<u8> {
let mut buffer: Vec<u8> = Vec::new();
let (big_int, exponent): (BigInt, i64) = decimal.as_bigint_and_exponent();
let big_endian_value: Vec<u8> = big_int.to_signed_bytes_be();
encode_bytes(&big_endian_value, &mut buffer);
encode_long(exponent, &mut buffer);

buffer
}

pub(crate) fn deserialize_big_decimal(bytes: &Vec<u8>) -> Result<BigDecimal, Error> {
let mut bytes: &[u8] = bytes.as_slice();
let mut big_decimal_buffer = match decode_len(&mut bytes) {
Ok(size) => vec![0u8; size],
Err(_err) => return Err(Error::BigDecimalLen),
};

bytes
.read_exact(&mut big_decimal_buffer[..])
.map_err(Error::ReadDouble)?;

match decode_long(&mut bytes) {
Ok(Value::Long(scale_value)) => {
let big_int: BigInt = BigInt::from_signed_bytes_be(&big_decimal_buffer);
let decimal = BigDecimal::new(big_int, scale_value);
Ok(decimal)
}
_ => Err(Error::BigDecimalScale),
}
}

#[cfg(test)]
mod tests {
use super::*;
use apache_avro_test_helper::TestResult;
use bigdecimal::{One, Zero};
use pretty_assertions::assert_eq;
use std::{
convert::TryFrom,
ops::{Div, Mul},
};
use std::convert::TryFrom;

#[test]
fn test_decimal_from_bytes_from_ref_decimal() -> TestResult {
Expand All @@ -175,43 +133,4 @@ mod tests {

Ok(())
}

#[test]
fn test_avro_3779_bigdecimal_serial() -> TestResult {
let value: bigdecimal::BigDecimal =
bigdecimal::BigDecimal::from(-1421).div(bigdecimal::BigDecimal::from(2));
let mut current: bigdecimal::BigDecimal = bigdecimal::BigDecimal::one();

for iter in 1..180 {
let result: Vec<u8> = serialize_big_decimal(&current);

let deserialize_big_decimal: Result<bigdecimal::BigDecimal, Error> =
deserialize_big_decimal(&result);
assert!(
deserialize_big_decimal.is_ok(),
"can't deserialize for iter {iter}"
);
assert_eq!(
current,
deserialize_big_decimal.unwrap(),
"not equals for ${iter}"
);
current = current.mul(&value);
}

let result: Vec<u8> = serialize_big_decimal(&BigDecimal::zero());
let deserialize_big_decimal: Result<bigdecimal::BigDecimal, Error> =
deserialize_big_decimal(&result);
assert!(
deserialize_big_decimal.is_ok(),
"can't deserialize for zero"
);
assert_eq!(
BigDecimal::zero(),
deserialize_big_decimal.unwrap(),
"not equals for zero"
);

Ok(())
}
}
3 changes: 2 additions & 1 deletion lang/rust/avro/src/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
// under the License.

use crate::{
decimal::{deserialize_big_decimal, Decimal},
bigdecimal::deserialize_big_decimal,
decimal::Decimal,
duration::Duration,
schema::{
DecimalSchema, EnumSchema, FixedSchema, Name, Namespace, RecordSchema, ResolvedSchema,
Expand Down
6 changes: 3 additions & 3 deletions lang/rust/avro/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use crate::{
decimal::serialize_big_decimal,
bigdecimal::serialize_big_decimal,
schema::{
DecimalSchema, EnumSchema, FixedSchema, Name, Namespace, RecordSchema, ResolvedSchema,
Schema, SchemaKind, UnionSchema,
Expand Down Expand Up @@ -130,8 +130,8 @@ pub(crate) fn encode_internal<S: Borrow<Schema>>(
buffer,
),
Value::BigDecimal(bg) => {
let mut buf: Vec<u8> = serialize_big_decimal(bg);
buffer.append(&mut buf);
let buf: Vec<u8> = serialize_big_decimal(bg);
buffer.extend_from_slice(buf.as_slice());
}
Value::Bytes(bytes) => match *schema {
Schema::Bytes => encode_bytes(bytes, buffer),
Expand Down
1 change: 1 addition & 0 deletions lang/rust/avro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,7 @@
//! assert_eq!(false, SchemaCompatibility::can_read(&writers_schema, &readers_schema));
//! ```
mod bigdecimal;
mod codec;
mod de;
mod decimal;
Expand Down
3 changes: 2 additions & 1 deletion lang/rust/avro/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

//! Logic handling the intermediate representation of Avro values.
use crate::{
decimal::{deserialize_big_decimal, serialize_big_decimal, Decimal},
bigdecimal::{deserialize_big_decimal, serialize_big_decimal},
decimal::Decimal,
duration::Duration,
schema::{
DecimalSchema, EnumSchema, FixedSchema, Name, Namespace, Precision, RecordField,
Expand Down
Binary file added lang/rust/avro/tests/bigdec.avro
Binary file not shown.
Loading

0 comments on commit 6107c12

Please sign in to comment.