Skip to content
This repository has been archived by the owner on Jan 11, 2021. It is now read-only.

Commit

Permalink
Add LZ4 compression (#98)
Browse files Browse the repository at this point in the history
  • Loading branch information
sunchao authored Apr 25, 2018
1 parent 8ef823d commit 97806c9
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ flate2 = "0.2"
thrift = "0.0.4"
x86intrin = "0.4.3"
chrono = "0.4"
lz4 = "1.22"

[dev-dependencies]
lazy_static = "1"
Expand Down
18 changes: 18 additions & 0 deletions benches/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,15 @@ compress!(compress_snappy_double, Compression::SNAPPY, 5);
compress!(compress_snappy_fixed, Compression::SNAPPY, 6);
compress!(compress_snappy_int96, Compression::SNAPPY, 7);

compress!(compress_lz4_binary, Compression::LZ4, 0);
compress!(compress_lz4_int32, Compression::LZ4, 1);
compress!(compress_lz4_int64, Compression::LZ4, 2);
compress!(compress_lz4_boolean, Compression::LZ4, 3);
compress!(compress_lz4_float, Compression::LZ4, 4);
compress!(compress_lz4_double, Compression::LZ4, 5);
compress!(compress_lz4_fixed, Compression::LZ4, 6);
compress!(compress_lz4_int96, Compression::LZ4, 7);

decompress!(decompress_brotli_binary, Compression::BROTLI, 0);
decompress!(decompress_brotli_int32, Compression::BROTLI, 1);
decompress!(decompress_brotli_int64, Compression::BROTLI, 2);
Expand All @@ -163,3 +172,12 @@ decompress!(decompress_snappy_float, Compression::SNAPPY, 4);
decompress!(decompress_snappy_double, Compression::SNAPPY, 5);
decompress!(decompress_snappy_fixed, Compression::SNAPPY, 6);
decompress!(decompress_snappy_int96, Compression::SNAPPY, 7);

decompress!(decompress_lz4_binary, Compression::LZ4, 0);
decompress!(decompress_lz4_int32, Compression::LZ4, 1);
decompress!(decompress_lz4_int64, Compression::LZ4, 2);
decompress!(decompress_lz4_boolean, Compression::LZ4, 3);
decompress!(decompress_lz4_float, Compression::LZ4, 4);
decompress!(decompress_lz4_double, Compression::LZ4, 5);
decompress!(decompress_lz4_fixed, Compression::LZ4, 6);
decompress!(decompress_lz4_int96, Compression::LZ4, 7);
55 changes: 55 additions & 0 deletions src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use flate2::Compression;
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use snap::{decompress_len, Decoder, Encoder};
use lz4;

/// Parquet compression codec interface.
pub trait Codec {
Expand All @@ -71,6 +72,7 @@ pub fn create_codec(codec: CodecType) -> Result<Option<Box<Codec>>> {
CodecType::BROTLI => Ok(Some(Box::new(BrotliCodec::new()))),
CodecType::GZIP => Ok(Some(Box::new(GZipCodec::new()))),
CodecType::SNAPPY => Ok(Some(Box::new(SnappyCodec::new()))),
CodecType::LZ4 => Ok(Some(Box::new(LZ4Codec::new()))),
CodecType::UNCOMPRESSED => Ok(None),
_ => Err(nyi_err!("The codec type {} is not supported yet", codec))
}
Expand Down Expand Up @@ -168,6 +170,54 @@ impl Codec for BrotliCodec {
}


const LZ4_BUFFER_SIZE: usize = 4096;

/// Codec for LZ4 compression algorithm.
pub struct LZ4Codec {}

impl LZ4Codec {
/// Creates new LZ4 compression codec.
fn new() -> Self {
Self {}
}
}

impl Codec for LZ4Codec {
fn decompress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<usize> {
let mut decoder = lz4::Decoder::new(input_buf)?;
let mut buffer: [u8; LZ4_BUFFER_SIZE] = [0; LZ4_BUFFER_SIZE];
let mut total_len = 0;
loop {
let len = decoder.read(&mut buffer)?;
if len == 0 {
break;
}
total_len += len;
output_buf.write_all(&buffer[0..len])?;
}
Ok(total_len)
}

fn compress(&mut self, input_buf: &[u8]) -> Result<Vec<u8>> {
let mut encoder = lz4::EncoderBuilder::new().build(Vec::new())?;
let mut from = 0;
loop {
let to = ::std::cmp::min(from + LZ4_BUFFER_SIZE, input_buf.len());
encoder.write_all(&input_buf[from..to])?;
from += LZ4_BUFFER_SIZE;
if from >= input_buf.len() {
break;
}
}
let (v, result) = encoder.finish();
match result {
Ok(_) => Ok(v),
e => Err(general_err!("Error when finishing compressing with LZ4: {:?}", e))
}
}
}


#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -223,4 +273,9 @@ mod tests {
fn test_codec_brotli() {
test_codec(CodecType::BROTLI);
}

#[test]
fn test_codec_lz4() {
test_codec(CodecType::LZ4);
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ extern crate rand;
extern crate x86intrin;
extern crate parquet_format;
extern crate chrono;
extern crate lz4;

#[macro_use]
pub mod errors;
Expand Down

0 comments on commit 97806c9

Please sign in to comment.