Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AVRO-3779: using rust bigdecimal #2302

Merged
merged 8 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions lang/rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lang/rust/avro/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ typed-builder = { default-features = false, version = "0.16.0" }
uuid = { default-features = false, version = "1.4.1", features = ["serde", "std"] }
xz2 = { default-features = false, version = "0.1.7", optional = true }
zstd = { default-features = false, version = "0.12.4+zstd.1.5.2", optional = true }
bigdecimal = "0.4"

[target.'cfg(target_arch = "wasm32")'.dependencies]
quad-rand = { default-features = false, version = "0.2.1" }
Expand Down
85 changes: 83 additions & 2 deletions lang/rust/avro/src/decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,15 @@
// specific language governing permissions and limitations
// under the License.

use crate::{AvroResult, Error};
use crate::{
decode::{decode_len, decode_long},
encode::{encode_bytes, encode_long},
types::Value,
AvroResult, Error,
};
use bigdecimal::BigDecimal;
use num_bigint::{BigInt, Sign};
use std::io::Read;

#[derive(Debug, Clone)]
pub struct Decimal {
Expand Down Expand Up @@ -105,12 +112,47 @@ impl<T: AsRef<[u8]>> From<T> for Decimal {
}
}

pub(crate) fn serialize_big_decimal(decimal: &BigDecimal) -> Vec<u8> {
let mut buffer: Vec<u8> = Vec::new();
let (big_int, exponent): (BigInt, i64) = decimal.as_bigint_and_exponent();
let big_endian_value: Vec<u8> = big_int.to_signed_bytes_be();
encode_bytes(&big_endian_value, &mut buffer);
encode_long(exponent, &mut buffer);

buffer
}

pub(crate) fn deserialize_big_decimal(bytes: &Vec<u8>) -> Result<BigDecimal, Error> {
let mut bytes: &[u8] = bytes.as_slice();
let mut big_decimal_buffer = match decode_len(&mut bytes) {
Ok(size) => vec![0u8; size],
Err(_err) => return Err(Error::BigDecimalLen),
};

bytes
.read_exact(&mut big_decimal_buffer[..])
.map_err(Error::ReadDouble)?;

match decode_long(&mut bytes) {
Ok(Value::Long(scale_value)) => {
let big_int: BigInt = BigInt::from_signed_bytes_be(&big_decimal_buffer);
let decimal = BigDecimal::new(big_int, scale_value);
Ok(decimal)
}
_ => Err(Error::BigDecimalScale),
}
}

#[cfg(test)]
mod tests {
use super::*;
use apache_avro_test_helper::TestResult;
use bigdecimal::{One, Zero};
use pretty_assertions::assert_eq;
use std::convert::TryFrom;
use std::{
convert::TryFrom,
ops::{Div, Mul},
};

#[test]
fn test_decimal_from_bytes_from_ref_decimal() -> TestResult {
Expand All @@ -133,4 +175,43 @@ mod tests {

Ok(())
}

#[test]
fn test_avro_3779_bigdecimal_serial() -> TestResult {
let value: bigdecimal::BigDecimal =
bigdecimal::BigDecimal::from(-1421).div(bigdecimal::BigDecimal::from(2));
let mut current: bigdecimal::BigDecimal = bigdecimal::BigDecimal::one();

for iter in 1..180 {
let result: Vec<u8> = serialize_big_decimal(&current);

let deserialize_big_decimal: Result<bigdecimal::BigDecimal, Error> =
deserialize_big_decimal(&result);
assert!(
deserialize_big_decimal.is_ok(),
"can't deserialize for iter {iter}"
);
assert_eq!(
current,
deserialize_big_decimal.unwrap(),
"not equals for ${iter}"
);
current = current.mul(&value);
}

let result: Vec<u8> = serialize_big_decimal(&BigDecimal::zero());
let deserialize_big_decimal: Result<bigdecimal::BigDecimal, Error> =
deserialize_big_decimal(&result);
assert!(
deserialize_big_decimal.is_ok(),
"can't deserialize for zero"
);
assert_eq!(
BigDecimal::zero(),
deserialize_big_decimal.unwrap(),
"not equals for zero"
);

Ok(())
}
}
12 changes: 9 additions & 3 deletions lang/rust/avro/src/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use crate::{
decimal::Decimal,
decimal::{deserialize_big_decimal, Decimal},
duration::Duration,
schema::{
DecimalSchema, EnumSchema, FixedSchema, Name, Namespace, RecordSchema, ResolvedSchema,
Expand All @@ -36,7 +36,7 @@ use std::{
use uuid::Uuid;

#[inline]
fn decode_long<R: Read>(reader: &mut R) -> AvroResult<Value> {
pub(crate) fn decode_long<R: Read>(reader: &mut R) -> AvroResult<Value> {
zag_i64(reader).map(Value::Long)
}

Expand All @@ -46,7 +46,7 @@ fn decode_int<R: Read>(reader: &mut R) -> AvroResult<Value> {
}

#[inline]
fn decode_len<R: Read>(reader: &mut R) -> AvroResult<usize> {
pub(crate) fn decode_len<R: Read>(reader: &mut R) -> AvroResult<usize> {
let len = zag_i64(reader)?;
safe_len(usize::try_from(len).map_err(|e| Error::ConvertI64ToUsize(e, len))?)
}
Expand Down Expand Up @@ -114,6 +114,12 @@ pub(crate) fn decode_internal<R: Read, S: Borrow<Schema>>(
},
schema => Err(Error::ResolveDecimalSchema(schema.into())),
},
Schema::BigDecimal => {
match decode_internal(&Schema::Bytes, names, enclosing_namespace, reader)? {
Value::Bytes(bytes) => deserialize_big_decimal(&bytes).map(Value::BigDecimal),
value => Err(Error::BytesValue(value.into())),
}
}
Schema::Uuid => Ok(Value::Uuid(
Uuid::from_str(
match decode_internal(&Schema::String, names, enclosing_namespace, reader)? {
Expand Down
9 changes: 7 additions & 2 deletions lang/rust/avro/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use crate::{
decimal::serialize_big_decimal,
schema::{
DecimalSchema, EnumSchema, FixedSchema, Name, Namespace, RecordSchema, ResolvedSchema,
Schema, SchemaKind,
Expand All @@ -40,13 +41,13 @@ pub fn encode(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) -> AvroResul
encode_internal(value, schema, rs.get_names(), &None, buffer)
}

fn encode_bytes<B: AsRef<[u8]> + ?Sized>(s: &B, buffer: &mut Vec<u8>) {
pub(crate) fn encode_bytes<B: AsRef<[u8]> + ?Sized>(s: &B, buffer: &mut Vec<u8>) {
let bytes = s.as_ref();
encode_long(bytes.len() as i64, buffer);
buffer.extend_from_slice(bytes);
}

fn encode_long(i: i64, buffer: &mut Vec<u8>) {
pub(crate) fn encode_long(i: i64, buffer: &mut Vec<u8>) {
zig_i64(i, buffer)
}

Expand Down Expand Up @@ -116,6 +117,10 @@ pub(crate) fn encode_internal<S: Borrow<Schema>>(
&uuid.to_string(),
buffer,
),
Value::BigDecimal(bg) => {
let mut buf: Vec<u8> = serialize_big_decimal(bg);
buffer.append(&mut buf);
}
Value::Bytes(bytes) => match *schema {
Schema::Bytes => encode_bytes(bytes, buffer),
Schema::Fixed { .. } => buffer.extend(bytes),
Expand Down
12 changes: 12 additions & 0 deletions lang/rust/avro/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ pub enum Error {
#[error("expected UUID, got: {0:?}")]
GetUuid(ValueKind),

#[error("expected BigDecimal, got: {0:?}")]
GetBigdecimal(ValueKind),

#[error("Fixed bytes of size 12 expected, got Fixed of size {0}")]
GetDecimalFixedBytes(usize),

Expand Down Expand Up @@ -289,6 +292,15 @@ pub enum Error {
#[error("The decimal precision ({precision}) must be a positive number")]
DecimalPrecisionMuBePositive { precision: usize },

#[error("Unreadable decimal sign")]
BigDecimalSign,

#[error("Unreadable length for decimal inner bytes")]
BigDecimalLen,

#[error("Unreadable decimal scale")]
BigDecimalScale,

#[error("Unexpected `type` {0} variant for `logicalType`")]
GetLogicalTypeVariant(serde_json::Value),

Expand Down
39 changes: 39 additions & 0 deletions lang/rust/avro/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ pub enum Schema {
/// Logical type which represents `Decimal` values. The underlying type is serialized and
/// deserialized as `Schema::Bytes` or `Schema::Fixed`.
Decimal(DecimalSchema),
/// Logical type which represents `Decimal` values without predefined scale.
/// The underlying type is serialized and deserialized as `Schema::Bytes`
BigDecimal,
/// A universally unique identifier, annotating a string.
Uuid,
/// Logical type which represents the number of days since the unix epoch.
Expand Down Expand Up @@ -189,6 +192,7 @@ impl From<&types::Value> for SchemaKind {
Value::Enum(_, _) => Self::Enum,
Value::Fixed(_, _) => Self::Fixed,
Value::Decimal { .. } => Self::Decimal,
Value::BigDecimal(_) => Self::BigDecimal,
Value::Uuid(_) => Self::Uuid,
Value::Date(_) => Self::Date,
Value::TimeMillis(_) => Self::TimeMillis,
Expand Down Expand Up @@ -1339,6 +1343,10 @@ impl Parser {
inner,
}));
}
"big-decimal" => {
logical_verify_type(complex, &[SchemaKind::Bytes], self, enclosing_namespace)?;
return Ok(Schema::BigDecimal);
}
"uuid" => {
logical_verify_type(complex, &[SchemaKind::String], self, enclosing_namespace)?;
return Ok(Schema::Uuid);
Expand Down Expand Up @@ -1889,6 +1897,12 @@ impl Serialize for Schema {
map.serialize_entry("precision", precision)?;
map.end()
}
Schema::BigDecimal => {
let mut map = serializer.serialize_map(None)?;
map.serialize_entry("type", "bytes")?;
map.serialize_entry("logicalType", "big-decimal")?;
map.end()
}
Schema::Uuid => {
let mut map = serializer.serialize_map(None)?;
map.serialize_entry("type", "string")?;
Expand Down Expand Up @@ -5135,6 +5149,31 @@ mod tests {
Ok(())
}

#[test]
fn test_avro_3779_bigdecimal_schema() -> TestResult {
let schema = json!(
{
"type": "record",
"name": "recordWithDecimal",
"fields": [
{
"name": "decimal",
"type": "bytes",
"logicalType": "big-decimal"
}
]
});

let parse_result = Schema::parse(&schema);
assert!(
parse_result.is_ok(),
"parse result must be ok, got: {:?}",
parse_result
);

Ok(())
}

#[test]
fn test_avro_3820_deny_invalid_field_names() -> TestResult {
let schema_str = r#"
Expand Down
Loading