diff --git a/lang/rust/avro/src/codec.rs b/lang/rust/avro/src/codec.rs index a394cad2545..9ec9d88c690 100644 --- a/lang/rust/avro/src/codec.rs +++ b/lang/rust/avro/src/codec.rs @@ -21,18 +21,6 @@ use libflate::deflate::{Decoder, Encoder}; use std::io::{Read, Write}; use strum_macros::{EnumIter, EnumString, IntoStaticStr}; -#[cfg(feature = "bzip")] -use bzip2::{ - read::{BzDecoder, BzEncoder}, - Compression, -}; -#[cfg(feature = "snappy")] -extern crate crc32fast; -#[cfg(feature = "snappy")] -use crc32fast::Hasher; -#[cfg(feature = "xz")] -use xz2::read::{XzDecoder, XzEncoder}; - /// The compression codec used to compress blocks. #[derive(Clone, Copy, Debug, Eq, PartialEq, EnumIter, EnumString, IntoStaticStr)] #[strum(serialize_all = "kebab_case")] @@ -49,15 +37,16 @@ pub enum Codec { /// CRC32 checksum of the uncompressed data in the block. Snappy, #[cfg(feature = "zstandard")] - Zstandard, + /// The `Zstandard` codec uses Facebook's [Zstandard](https://facebook.github.io/zstd/) + Zstandard(zstandard::ZstandardSettings), #[cfg(feature = "bzip")] /// The `BZip2` codec uses [BZip2](https://sourceware.org/bzip2/) /// compression library. - Bzip2, + Bzip2(bzip::Bzip2Settings), #[cfg(feature = "xz")] /// The `Xz` codec uses [Xz utils](https://tukaani.org/xz/) /// compression library. - Xz, + Xz(xz::XzSettings), } impl From for Value { @@ -87,7 +76,7 @@ impl Codec { .compress(&stream[..], &mut encoded[..]) .map_err(Error::SnappyCompress)?; - let mut hasher = Hasher::new(); + let mut hasher = crc32fast::Hasher::new(); hasher.update(&stream[..]); let checksum = hasher.finalize(); let checksum_as_bytes = checksum.to_be_bytes(); @@ -98,22 +87,26 @@ impl Codec { *stream = encoded; } #[cfg(feature = "zstandard")] - Codec::Zstandard => { - let mut encoder = zstd::Encoder::new(Vec::new(), 0).unwrap(); + Codec::Zstandard(settings) => { + let mut encoder = + zstd::Encoder::new(Vec::new(), settings.compression_level as i32).unwrap(); encoder.write_all(stream).map_err(Error::ZstdCompress)?; *stream = encoder.finish().unwrap(); } #[cfg(feature = "bzip")] - Codec::Bzip2 => { - let mut encoder = BzEncoder::new(&stream[..], Compression::best()); + Codec::Bzip2(settings) => { + use bzip2::read::BzEncoder; + + let mut encoder = BzEncoder::new(&stream[..], settings.compression()); let mut buffer = Vec::new(); encoder.read_to_end(&mut buffer).unwrap(); *stream = buffer; } #[cfg(feature = "xz")] - Codec::Xz => { - let compression_level = 9; - let mut encoder = XzEncoder::new(&stream[..], compression_level); + Codec::Xz(settings) => { + use xz2::read::XzEncoder; + + let mut encoder = XzEncoder::new(&stream[..], settings.compression_level as u32); let mut buffer = Vec::new(); encoder.read_to_end(&mut buffer).unwrap(); *stream = buffer; @@ -148,7 +141,7 @@ impl Codec { last_four.copy_from_slice(&stream[(stream.len() - 4)..]); let expected: u32 = u32::from_be_bytes(last_four); - let mut hasher = Hasher::new(); + let mut hasher = crc32fast::Hasher::new(); hasher.update(&decoded); let actual = hasher.finalize(); @@ -158,21 +151,30 @@ impl Codec { decoded } #[cfg(feature = "zstandard")] - Codec::Zstandard => { + Codec::Zstandard(_settings) => { + use std::io::BufReader; + use zstd::zstd_safe; + let mut decoded = Vec::new(); - let mut decoder = zstd::Decoder::new(&stream[..]).unwrap(); + let buffer_size = zstd_safe::DCtx::in_size(); + let buffer = BufReader::with_capacity(buffer_size, &stream[..]); + let mut decoder = zstd::Decoder::new(buffer).unwrap(); std::io::copy(&mut decoder, &mut decoded).map_err(Error::ZstdDecompress)?; decoded } #[cfg(feature = "bzip")] - Codec::Bzip2 => { + Codec::Bzip2(_) => { + use bzip2::read::BzDecoder; + let mut decoder = BzDecoder::new(&stream[..]); let mut decoded = Vec::new(); decoder.read_to_end(&mut decoded).unwrap(); decoded } #[cfg(feature = "xz")] - Codec::Xz => { + Codec::Xz(_) => { + use xz2::read::XzDecoder; + let mut decoder = XzDecoder::new(&stream[..]); let mut decoded: Vec = Vec::new(); decoder.read_to_end(&mut decoded).unwrap(); @@ -183,6 +185,72 @@ impl Codec { } } +#[cfg(feature = "bzip")] +pub mod bzip { + use bzip2::Compression; + + #[derive(Clone, Copy, Eq, PartialEq, Debug)] + pub struct Bzip2Settings { + pub compression_level: u8, + } + + impl Bzip2Settings { + pub fn new(compression_level: u8) -> Self { + Self { compression_level } + } + + pub(crate) fn compression(&self) -> Compression { + Compression::new(self.compression_level as u32) + } + } + + impl Default for Bzip2Settings { + fn default() -> Self { + Bzip2Settings::new(Compression::best().level() as u8) + } + } +} + +#[cfg(feature = "zstandard")] +pub mod zstandard { + #[derive(Clone, Copy, Eq, PartialEq, Debug)] + pub struct ZstandardSettings { + pub compression_level: u8, + } + + impl ZstandardSettings { + pub fn new(compression_level: u8) -> Self { + Self { compression_level } + } + } + + impl Default for ZstandardSettings { + fn default() -> Self { + Self::new(0) + } + } +} + +#[cfg(feature = "xz")] +pub mod xz { + #[derive(Clone, Copy, Eq, PartialEq, Debug)] + pub struct XzSettings { + pub compression_level: u8, + } + + impl XzSettings { + pub fn new(compression_level: u8) -> Self { + Self { compression_level } + } + } + + impl Default for XzSettings { + fn default() -> Self { + XzSettings::new(9) + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -216,19 +284,19 @@ mod tests { #[cfg(feature = "zstandard")] #[test] fn zstd_compress_and_decompress() -> TestResult { - compress_and_decompress(Codec::Zstandard) + compress_and_decompress(Codec::Zstandard(zstandard::ZstandardSettings::default())) } #[cfg(feature = "bzip")] #[test] fn bzip_compress_and_decompress() -> TestResult { - compress_and_decompress(Codec::Bzip2) + compress_and_decompress(Codec::Bzip2(bzip::Bzip2Settings::default())) } #[cfg(feature = "xz")] #[test] fn xz_compress_and_decompress() -> TestResult { - compress_and_decompress(Codec::Xz) + compress_and_decompress(Codec::Xz(xz::XzSettings::default())) } fn compress_and_decompress(codec: Codec) -> TestResult { @@ -250,13 +318,19 @@ mod tests { assert_eq!(<&str>::from(Codec::Snappy), "snappy"); #[cfg(feature = "zstandard")] - assert_eq!(<&str>::from(Codec::Zstandard), "zstandard"); + assert_eq!( + <&str>::from(Codec::Zstandard(zstandard::ZstandardSettings::default())), + "zstandard" + ); #[cfg(feature = "bzip")] - assert_eq!(<&str>::from(Codec::Bzip2), "bzip2"); + assert_eq!( + <&str>::from(Codec::Bzip2(bzip::Bzip2Settings::default())), + "bzip2" + ); #[cfg(feature = "xz")] - assert_eq!(<&str>::from(Codec::Xz), "xz"); + assert_eq!(<&str>::from(Codec::Xz(xz::XzSettings::default())), "xz"); } #[test] @@ -270,13 +344,22 @@ mod tests { assert_eq!(Codec::from_str("snappy").unwrap(), Codec::Snappy); #[cfg(feature = "zstandard")] - assert_eq!(Codec::from_str("zstandard").unwrap(), Codec::Zstandard); + assert_eq!( + Codec::from_str("zstandard").unwrap(), + Codec::Zstandard(zstandard::ZstandardSettings::default()) + ); #[cfg(feature = "bzip")] - assert_eq!(Codec::from_str("bzip2").unwrap(), Codec::Bzip2); + assert_eq!( + Codec::from_str("bzip2").unwrap(), + Codec::Bzip2(bzip::Bzip2Settings::default()) + ); #[cfg(feature = "xz")] - assert_eq!(Codec::from_str("xz").unwrap(), Codec::Xz); + assert_eq!( + Codec::from_str("xz").unwrap(), + Codec::Xz(xz::XzSettings::default()) + ); assert!(Codec::from_str("not a codec").is_err()); } diff --git a/lang/rust/avro/src/lib.rs b/lang/rust/avro/src/lib.rs index e00b9d83081..7f7f57f3bfd 100644 --- a/lang/rust/avro/src/lib.rs +++ b/lang/rust/avro/src/lib.rs @@ -867,6 +867,12 @@ pub use crate::{ serde_avro_slice, serde_avro_slice_opt, }, }; +#[cfg(feature = "bzip")] +pub use codec::bzip::Bzip2Settings; +#[cfg(feature = "xz")] +pub use codec::xz::XzSettings; +#[cfg(feature = "zstandard")] +pub use codec::zstandard::ZstandardSettings; pub use codec::Codec; pub use de::from_value; pub use decimal::Decimal; diff --git a/lang/rust/avro/src/reader.rs b/lang/rust/avro/src/reader.rs index 121f8e25758..39c3473c83e 100644 --- a/lang/rust/avro/src/reader.rs +++ b/lang/rust/avro/src/reader.rs @@ -75,8 +75,6 @@ impl<'r, R: Read> Block<'r, R> { /// Try to read the header and to set the writer `Schema`, the `Codec` and the marker based on /// its content. fn read_header(&mut self) -> AvroResult<()> { - let meta_schema = Schema::map(Schema::Bytes); - let mut buf = [0u8; 4]; self.reader .read_exact(&mut buf) @@ -86,12 +84,16 @@ impl<'r, R: Read> Block<'r, R> { return Err(Error::HeaderMagic); } + let meta_schema = Schema::map(Schema::Bytes); if let Value::Map(metadata) = decode(&meta_schema, &mut self.reader)? { self.read_writer_schema(&metadata)?; self.codec = read_codec(&metadata)?; for (key, value) in metadata { - if key == "avro.schema" || key == "avro.codec" { + if key == "avro.schema" + || key == "avro.codec" + || key == "avro.codec.compression_level" + { // already processed } else if key.starts_with("avro.") { warn!("Ignoring unknown metadata key: {}", key); @@ -262,16 +264,48 @@ fn read_codec(metadata: &HashMap) -> AvroResult { }) .map(|codec_res| match codec_res { Ok(codec) => match Codec::from_str(codec) { - Ok(codec) => Ok(codec), + Ok(codec) => match codec { + #[cfg(feature = "bzip")] + Codec::Bzip2(_) => { + use crate::Bzip2Settings; + if let Some(Value::Bytes(bytes)) = + metadata.get("avro.codec.compression_level") + { + Ok(Codec::Bzip2(Bzip2Settings::new(bytes[0]))) + } else { + Ok(codec) + } + } + #[cfg(feature = "xz")] + Codec::Xz(_) => { + use crate::XzSettings; + if let Some(Value::Bytes(bytes)) = + metadata.get("avro.codec.compression_level") + { + Ok(Codec::Xz(XzSettings::new(bytes[0]))) + } else { + Ok(codec) + } + } + #[cfg(feature = "zstandard")] + Codec::Zstandard(_) => { + use crate::ZstandardSettings; + if let Some(Value::Bytes(bytes)) = + metadata.get("avro.codec.compression_level") + { + Ok(Codec::Zstandard(ZstandardSettings::new(bytes[0]))) + } else { + Ok(codec) + } + } + _ => Ok(codec), + }, Err(_) => Err(Error::CodecNotSupported(codec.to_owned())), }, Err(err) => Err(err), }); - match result { - Some(res) => res, - None => Ok(Codec::Null), - } + result.unwrap_or_else(|| Ok(Codec::Null)) } /// Main interface for reading Avro formatted values. diff --git a/lang/rust/avro/src/writer.rs b/lang/rust/avro/src/writer.rs index 02938d694fd..976193a812a 100644 --- a/lang/rust/avro/src/writer.rs +++ b/lang/rust/avro/src/writer.rs @@ -369,6 +369,30 @@ impl<'a, W: Write> Writer<'a, W> { let mut metadata = HashMap::with_capacity(2); metadata.insert("avro.schema", Value::Bytes(schema_bytes)); metadata.insert("avro.codec", self.codec.into()); + match self.codec { + #[cfg(feature = "bzip")] + Codec::Bzip2(settings) => { + metadata.insert( + "avro.codec.compression_level", + Value::Bytes(vec![settings.compression_level]), + ); + } + #[cfg(feature = "xz")] + Codec::Xz(settings) => { + metadata.insert( + "avro.codec.compression_level", + Value::Bytes(vec![settings.compression_level]), + ); + } + #[cfg(feature = "zstandard")] + Codec::Zstandard(settings) => { + metadata.insert( + "avro.codec.compression_level", + Value::Bytes(vec![settings.compression_level]), + ); + } + _ => {} + } for (k, v) in &self.user_metadata { metadata.insert(k.as_str(), v.clone()); diff --git a/lang/rust/avro/tests/codecs.rs b/lang/rust/avro/tests/codecs.rs new file mode 100644 index 00000000000..5017d338966 --- /dev/null +++ b/lang/rust/avro/tests/codecs.rs @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use apache_avro::{ + types::{Record, Value}, + Codec, Reader, Schema, Writer, +}; +use apache_avro_test_helper::TestResult; + +#[test] +fn avro_4032_null_codec_settings() -> TestResult { + avro_4032_codec_settings(Codec::Null) +} +#[test] +fn avro_4032_deflate_codec_settings() -> TestResult { + avro_4032_codec_settings(Codec::Deflate) +} + +#[test] +#[cfg(feature = "bzip")] +fn avro_4032_bzip_codec_settings() -> TestResult { + use apache_avro::Bzip2Settings; + use bzip2::Compression; + let codec = Codec::Bzip2(Bzip2Settings::new(Compression::fast().level() as u8)); + avro_4032_codec_settings(codec) +} + +#[test] +#[cfg(feature = "xz")] +fn avro_4032_xz_codec_settings() -> TestResult { + use apache_avro::XzSettings; + let codec = Codec::Xz(XzSettings::new(8)); + avro_4032_codec_settings(codec) +} + +#[test] +#[cfg(feature = "zstandard")] +fn avro_4032_zstandard_codec_settings() -> TestResult { + use apache_avro::ZstandardSettings; + let compression_level = 13; + let codec = Codec::Zstandard(ZstandardSettings::new(compression_level)); + avro_4032_codec_settings(codec) +} + +fn avro_4032_codec_settings(codec: Codec) -> TestResult { + let schema = Schema::parse_str( + r#" + { + "type": "record", + "name": "Test", + "fields": [ + {"name": "f1", "type": "int"}, + {"name": "f2", "type": "string"} + ] + }"#, + )?; + + let mut writer = Writer::with_codec(&schema, Vec::new(), codec); + let mut record = Record::new(writer.schema()).unwrap(); + record.put("f1", 27_i32); + record.put("f2", "foo"); + writer.append(record)?; + let input = writer.into_inner()?; + let mut reader = Reader::new(&input[..])?; + assert_eq!( + reader.next().unwrap()?, + Value::Record(vec![ + ("f1".to_string(), Value::Int(27)), + ("f2".to_string(), Value::String("foo".to_string())), + ]) + ); + assert!(reader.next().is_none()); + + Ok(()) +}