Skip to content
This repository has been archived by the owner on Jan 11, 2021. It is now read-only.

Commit

Permalink
Row-based read support (#68)
Browse files Browse the repository at this point in the history
  • Loading branch information
sadikovi authored and sunchao committed Mar 29, 2018
1 parent f2f0993 commit 67636b0
Show file tree
Hide file tree
Showing 18 changed files with 2,227 additions and 52 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ addons:
- binutils-dev

rust:
- nightly
- nightly-2018-03-26

before_script:
- git clone https://github.com/apache/thrift.git
Expand Down
Binary file added data/nested_lists.snappy.parquet
Binary file not shown.
Binary file added data/nested_maps.snappy.parquet
Binary file not shown.
Binary file added data/nonnullable.impala.parquet
Binary file not shown.
Binary file added data/nullable.impala.parquet
Binary file not shown.
Binary file added data/nulls.snappy.parquet
Binary file not shown.
43 changes: 43 additions & 0 deletions src/bin/parquet-read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
extern crate parquet;

use std::env;
use std::fs::File;
use std::path::Path;
use std::process;

use parquet::file::reader::{FileReader, SerializedFileReader};

/// Binary that prints the records of a Parquet file to stdout.
///
/// Usage: parquet-read <file-path> [num-records]
/// When [num-records] is omitted, every record in the file is printed.
fn main() {
  let args: Vec<String> = env::args().collect();
  if args.len() != 2 && args.len() != 3 {
    // Keep the usage message consistent with the binary name (parquet-read).
    println!("Usage: parquet-read <file-path> [num-records]");
    process::exit(1);
  }

  // Optional cap on the number of records to print; `None` means "all records".
  let mut num_records: Option<usize> = None;
  if args.len() == 3 {
    match args[2].parse() {
      Ok(value) => num_records = Some(value),
      Err(e) => panic!("Error when reading value for [num-records], {}", e)
    }
  }

  let path = Path::new(&args[1]);
  let file = File::open(&path).unwrap();
  let parquet_reader = SerializedFileReader::new(file).unwrap();

  // Use full schema as projected schema
  let iter = parquet_reader.get_row_iter(None).unwrap();

  // Print either the first `n` rows or every row, depending on the cap.
  match num_records {
    Some(n) => for row in iter.take(n) {
      println!("{}", row);
    },
    None => for row in iter {
      println!("{}", row);
    }
  }
}
2 changes: 1 addition & 1 deletion src/bin/dump-schema.rs → src/bin/parquet-schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::schema::printer::{print_parquet_metadata, print_file_metadata};

fn main() {
let args: Vec<_> = env::args().collect();
let args: Vec<String> = env::args().collect();
if args.len() != 2 && args.len() != 3 {
println!("Usage: dump-schema <file-path> <verbose>");
process::exit(1);
Expand Down
13 changes: 13 additions & 0 deletions src/file/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ impl FileMetaData {
/// Returns a borrowed reference to the schema descriptor for this file.
pub fn schema_descr(&self) -> &SchemaDescriptor {
&self.schema_descr
}

/// Returns a cheap clone of the shared schema descriptor pointer, so the
/// descriptor can be held independently of this metadata's lifetime.
pub fn schema_descr_ptr(&self) -> SchemaDescPtr {
self.schema_descr.clone()
}
}

pub type RowGroupMetaDataPtr = Rc<RowGroupMetaData>;
Expand Down Expand Up @@ -129,6 +133,10 @@ impl RowGroupMetaData {
self.schema_descr.as_ref()
}

/// Returns a cheap clone of the shared schema descriptor pointer for this
/// row group, so it can outlive this metadata object.
pub fn schema_descr_ptr(&self) -> SchemaDescPtr {
self.schema_descr.clone()
}

pub fn from_thrift(
schema_descr: SchemaDescPtr, mut rg: RowGroup
) -> Result<RowGroupMetaData> {
Expand Down Expand Up @@ -192,6 +200,11 @@ impl ColumnChunkMetaData {
self.column_descr.as_ref()
}

/// Returns a reference-counted clone of the descriptor for this column,
/// so the descriptor can be shared independently of this metadata's lifetime.
pub fn column_descr_ptr(&self) -> ColumnDescPtr {
self.column_descr.clone()
}

/// All encodings used for this column
pub fn encodings(&self) -> &Vec<Encoding> {
&self.encodings
Expand Down
57 changes: 23 additions & 34 deletions src/file/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ use byteorder::{LittleEndian, ByteOrder};
use thrift::protocol::TCompactInputProtocol;
use parquet_thrift::parquet::FileMetaData as TFileMetaData;
use parquet_thrift::parquet::{PageType, PageHeader};
use schema::types::{self, SchemaDescriptor};
use schema::types::{self, Type as SchemaType, SchemaDescriptor};
use column::page::{Page, PageReader};
use column::reader::{ColumnReader, ColumnReaderImpl};
use compression::{Codec, create_codec};
use record::reader::RowIter;
use util::io::FileChunk;
use util::memory::ByteBufferPtr;

Expand All @@ -50,6 +51,12 @@ pub trait FileReader {
/// the same as this. Otherwise, the row group metadata stored in the row group reader
/// may outlive the file reader.
fn get_row_group(&self, i: usize) -> Result<Box<RowGroupReader>>;

/// Get full iterator of `Row` from a file (over all row groups).
/// Iterator will automatically load the next row group to advance.
/// Projected schema can be a subset of or equal to the file schema, when it is None,
/// full file schema is assumed.
fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter>;
}

/// Parquet row group reader API. With this, user can get metadata information about the
Expand All @@ -66,6 +73,11 @@ pub trait RowGroupReader {

/// Get value reader for the `i`th column chunk
fn get_column_reader(&self, i: usize) -> Result<ColumnReader>;

/// Get iterator of `Row` from this row group.
/// Projected schema can be a subset of or equal to the file schema, when it is None,
/// full file schema is assumed.
fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter>;
}


Expand Down Expand Up @@ -174,6 +186,10 @@ impl FileReader for SerializedFileReader {
let f = self.buf.get_ref().try_clone()?;
Ok(Box::new(SerializedRowGroupReader::new(f, row_group_metadata)))
}

/// Creates a `Row` iterator over all row groups of this file. A `None`
/// projection means the full file schema is used.
fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
RowIter::from_file(projection, self)
}
}

/// A serialized impl for row group reader
Expand All @@ -183,7 +199,7 @@ pub struct SerializedRowGroupReader {
}

impl SerializedRowGroupReader {
pub fn new(file: File, metadata: RowGroupMetaDataPtr ) -> Self {
pub fn new(file: File, metadata: RowGroupMetaDataPtr) -> Self {
let buf = BufReader::new(file);
Self { buf, metadata }
}
Expand Down Expand Up @@ -237,6 +253,10 @@ impl RowGroupReader for SerializedRowGroupReader {
};
Ok(col_reader)
}

/// Creates a `Row` iterator over this row group only. A `None` projection
/// means the full file schema is used.
fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
RowIter::from_row_group(projection, self)
}
}


Expand Down Expand Up @@ -382,9 +402,7 @@ impl PageReader for SerializedPageReader {
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
use std::env;
use std::io::Write;
use util::test_common::{get_temp_file, get_test_file};

#[test]
fn test_file_reader_metadata_size_smaller_than_footer() {
Expand Down Expand Up @@ -577,33 +595,4 @@ mod tests {
}
assert_eq!(page_count, 2);
}

/// Opens `file_name` from the `data` directory (relative to the current
/// working directory) for use in tests.
///
/// Panics with the offending path and the underlying I/O error if the file
/// cannot be opened — a clearer failure than the previous
/// `assert!(file.is_ok())` + `unwrap()` pair, which lost the path.
fn get_test_file(file_name: &str) -> fs::File {
  let mut path_buf = env::current_dir().unwrap();
  path_buf.push("data");
  path_buf.push(file_name);
  match File::open(path_buf.as_path()) {
    Ok(file) => file,
    Err(e) => panic!("Could not open test file {}: {}", path_buf.display(), e)
  }
}

/// Creates (or overwrites) `file_name` under `target/debug/testdata`, writes
/// `content` into it, and returns a fresh handle opened for reading.
///
/// Panics with the offending path and the underlying I/O error on failure —
/// clearer than the previous `assert!(file.is_ok())` + `unwrap()` pair.
fn get_temp_file(file_name: &str, content: &[u8]) -> fs::File {
  // build tmp path to a file in "target/debug/testdata"
  let mut path_buf = env::current_dir().unwrap();
  path_buf.push("target");
  path_buf.push("debug");
  path_buf.push("testdata");
  fs::create_dir_all(&path_buf).unwrap();
  path_buf.push(file_name);

  // write file content and flush to disk before reopening for reading
  let mut tmp_file = File::create(path_buf.as_path()).unwrap();
  tmp_file.write_all(content).unwrap();
  tmp_file.sync_all().unwrap();

  // read file and return file handle
  match File::open(path_buf.as_path()) {
    Ok(file) => file,
    Err(e) => panic!("Could not open temp file {}: {}", path_buf.display(), e)
  }
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ mod parquet_thrift;
#[macro_use]
pub mod util;
pub mod column;
pub mod record;
pub mod compression;

pub mod schema;
Expand Down
Loading

0 comments on commit 67636b0

Please sign in to comment.