Commit 06013d9: packed row group range read

Signed-off-by: shaoting-huang <[email protected]>
shaoting-huang committed Dec 26, 2024
1 parent f325184 commit 06013d9
Showing 6 changed files with 99 additions and 5 deletions.
19 changes: 19 additions & 0 deletions cpp/include/milvus-storage/packed/reader.h
@@ -47,11 +47,28 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader {
const std::shared_ptr<arrow::Schema> schema,
const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE);

// Test-only convenience: read a single file restricted to row groups [start_row_group, end_row_group]
PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::string& path,
const std::shared_ptr<arrow::Schema> schema,
const size_t start_row_group,
const size_t end_row_group,
const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE);

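// Reads every row group of each path; delegates to the ranged constructor below with a full per-file range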
PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::vector<std::string>& paths,
const std::shared_ptr<arrow::Schema> schema,
const std::vector<ColumnOffset>& column_offsets,
const std::set<int>& needed_columns,
const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE);

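// Restricts path i to row groups [start_row_groups[i], end_row_groups[i]]; end bounds are clamped to each file's actual row-group count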
PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::vector<std::string>& paths,
const std::shared_ptr<arrow::Schema> schema,
const std::vector<ColumnOffset>& column_offsets,
const std::set<int>& needed_columns,
const std::vector<size_t>& start_row_groups,
const std::vector<size_t>& end_row_groups,
const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE);

std::shared_ptr<arrow::Schema> schema() const override;
@@ -73,6 +90,8 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader {
size_t memory_limit_;
size_t buffer_available_;
std::vector<std::unique_ptr<parquet::arrow::FileReader>> file_readers_;
std::vector<size_t> start_row_groups_;
std::vector<size_t> end_row_groups_;
std::vector<std::shared_ptr<arrow::KeyValueMetadata>> metadata_;
std::vector<std::queue<std::shared_ptr<arrow::Table>>> tables_;
std::vector<ColumnGroupState> column_group_states_;
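
A minimal usage sketch of the new single-path range overload (the test-only convenience constructor above). The local filesystem, file path, and schema argument are illustrative assumptions, not part of this commit, and error handling is pared down:

#include <arrow/filesystem/localfs.h>
#include <arrow/record_batch.h>
#include <arrow/status.h>
#include "milvus-storage/packed/reader.h"

// Sketch: read only row groups [1, 2] of one packed file.
arrow::Status ReadRowGroupRange(const std::shared_ptr<arrow::Schema>& schema) {
  arrow::fs::LocalFileSystem fs;
  milvus_storage::PackedRecordBatchReader reader(
      fs, "testdata/0", schema, /*start_row_group=*/1, /*end_row_group=*/2);
  std::shared_ptr<arrow::RecordBatch> batch;
  while (true) {
    ARROW_RETURN_NOT_OK(reader.ReadNext(&batch));
    if (batch == nullptr) break;  // stream exhausted
    // batch holds rows drawn only from row groups 1..2
  }
  return arrow::Status::OK();
}
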
7 changes: 7 additions & 0 deletions cpp/include/milvus-storage/packed/reader_c.h
@@ -27,6 +27,13 @@ typedef void* CFileSystem;

int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size, struct ArrowArrayStream* out);

int OpenWithRowGroupRange(const char* path,
struct ArrowSchema* schema,
const int64_t start_row_group,
const int64_t end_row_group,
const int64_t buffer_size,
struct ArrowArrayStream* out);

#ifdef __cplusplus
}
#endif
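
A hedged sketch of calling the new C entry point from C++ and importing the resulting stream through Arrow's C data bridge; the path and buffer size are placeholders, not part of this commit:

#include <arrow/c/bridge.h>
#include <arrow/record_batch.h>
#include "milvus-storage/packed/reader_c.h"

// Sketch: open row groups [1, 2] via the C ABI, then return a C++ reader.
arrow::Result<std::shared_ptr<arrow::RecordBatchReader>> OpenRange(
    const std::shared_ptr<arrow::Schema>& schema) {
  struct ArrowSchema c_schema;
  ARROW_RETURN_NOT_OK(arrow::ExportSchema(*schema, &c_schema));
  struct ArrowArrayStream c_stream;
  int rc = OpenWithRowGroupRange("testdata/0", &c_schema,
                                 /*start_row_group=*/1, /*end_row_group=*/2,
                                 /*buffer_size=*/10 * 1024 * 1024, &c_stream);
  if (rc != 0) {
    return arrow::Status::IOError("OpenWithRowGroupRange failed, rc=", rc);
  }
  return arrow::ImportRecordBatchReader(&c_stream);
}
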
48 changes: 46 additions & 2 deletions cpp/src/packed/reader.cpp
@@ -34,18 +34,52 @@ PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
: PackedRecordBatchReader(
fs, std::vector<std::string>{path}, schema, std::vector<ColumnOffset>(), std::set<int>(), buffer_size) {}

PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::string& path,
const std::shared_ptr<arrow::Schema> schema,
const size_t start_row_group,
const size_t end_row_group,
const int64_t buffer_size)
: PackedRecordBatchReader(fs,
std::vector<std::string>{path},
schema,
std::vector<ColumnOffset>(),
std::set<int>(),
std::vector<size_t>(1, start_row_group),
std::vector<size_t>(1, end_row_group),
buffer_size) {}

PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::vector<std::string>& paths,
const std::shared_ptr<arrow::Schema> schema,
const std::vector<ColumnOffset>& column_offsets,
const std::set<int>& needed_columns,
const int64_t buffer_size)
: PackedRecordBatchReader(fs,
paths,
schema,
column_offsets,
needed_columns,
std::vector<size_t>(paths.size(), 0),
std::vector<size_t>(paths.size(), std::numeric_limits<int>::max()),
buffer_size) {}

PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::vector<std::string>& paths,
const std::shared_ptr<arrow::Schema> schema,
const std::vector<ColumnOffset>& column_offsets,
const std::set<int>& needed_columns,
const std::vector<size_t>& start_row_groups,
const std::vector<size_t>& end_row_groups,
const int64_t buffer_size)
: schema_(schema),
buffer_available_(buffer_size),
memory_limit_(buffer_size),
row_limit_(0),
absolute_row_position_(0),
read_count_(0) {
read_count_(0),
start_row_groups_(start_row_groups),
end_row_groups_(end_row_groups) {
auto cols = std::set(needed_columns);
if (cols.empty()) {
for (int i = 0; i < schema->num_fields(); i++) {
@@ -71,14 +105,19 @@ PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
}
file_readers_.emplace_back(std::move(result.value()));
}
if (start_row_groups.size() != paths.size() || end_row_groups.size() != paths.size()) {
throw std::runtime_error("start_row_groups and end_row_groups must match the number of paths");
}

for (int i = 0; i < file_readers_.size(); ++i) {
auto metadata = file_readers_[i]->parquet_reader()->metadata()->key_value_metadata()->Get(ROW_GROUP_SIZE_META_KEY);
if (!metadata.ok()) {
LOG_STORAGE_ERROR_ << "metadata not found in file " << i;
throw std::runtime_error(metadata.status().ToString());
}
row_group_sizes_.push_back(PackedMetaSerde::deserialize(metadata.ValueOrDie()));
auto rg_size = PackedMetaSerde::deserialize(metadata.ValueOrDie());
row_group_sizes_.push_back(rg_size);
end_row_groups_[i] = std::min(end_row_groups[i], static_cast<size_t>(rg_size.size()));
LOG_STORAGE_DEBUG_ << " file " << i << " metadata size: " << file_readers_[i]->parquet_reader()->metadata()->size();
}
// Initialize table states and chunk manager
@@ -98,6 +137,11 @@ arrow::Status PackedRecordBatchReader::advanceBuffer() {
auto advance_row_group = [&](int i) -> int64_t {
auto& reader = file_readers_[i];
int rg = column_group_states_[i].row_group_offset + 1;
if (rg < start_row_groups_[i] || rg > end_row_groups_[i]) {
LOG_STORAGE_DEBUG_ << "Row group " << rg << " is out of range [" << start_row_groups_[i] << ", "
<< end_row_groups_[i] << "]";
return -1;
}
int num_row_groups = reader->parquet_reader()->metadata()->num_row_groups();
if (rg >= num_row_groups) {
// No more row groups. It means we're done or there is an error.
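
The full multi-file constructor takes one inclusive [start, end] pair per path, and each end bound is clamped to that file's actual row-group count during construction. A hedged sketch with invented paths and ranges; per the constructor above, an empty needed-column set falls back to all columns, and the delegating overloads likewise pass an empty offset vector:

#include "milvus-storage/packed/reader.h"

// Sketch: row groups 0..3 from the first file, 2..5 from the second.
void ReadRanges(arrow::fs::FileSystem& fs,
                const std::shared_ptr<arrow::Schema>& schema) {
  std::vector<std::string> paths = {"testdata/0", "testdata/1"};
  std::vector<milvus_storage::ColumnOffset> offsets;  // empty -> default layout
  std::set<int> needed_columns;                       // empty -> all columns
  std::vector<size_t> starts = {0, 2};
  std::vector<size_t> ends = {3, 5};  // clamped per file if too large
  milvus_storage::PackedRecordBatchReader reader(
      fs, paths, schema, offsets, needed_columns, starts, ends,
      /*buffer_size=*/16 * 1024 * 1024);
  // iterate with reader.ReadNext(...) as with any arrow::RecordBatchReader
}
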
12 changes: 11 additions & 1 deletion cpp/src/packed/reader_c.cpp
@@ -25,6 +25,15 @@
#include <limits>
#include <memory>

int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size, struct ArrowArrayStream* out) {
return OpenWithRowGroupRange(path, schema, 0, std::numeric_limits<int64_t>::max(), buffer_size, out);
}

int OpenWithRowGroupRange(const char* path,
struct ArrowSchema* schema,
const int64_t start_row_group,
const int64_t end_row_group,
const int64_t buffer_size,
struct ArrowArrayStream* out) {
auto truePath = std::string(path);
auto factory = std::make_shared<milvus_storage::FileSystemFactory>();
auto conf = milvus_storage::StorageConfig();
@@ -36,7 +45,8 @@ int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size, struct ArrowArrayStream* out) {
}
auto trueFs = r.value();
auto trueSchema = arrow::ImportSchema(schema).ValueOrDie();
auto reader = std::make_shared<milvus_storage::PackedRecordBatchReader>(*trueFs, path, trueSchema, buffer_size);
auto reader = std::make_shared<milvus_storage::PackedRecordBatchReader>(*trueFs, path, trueSchema, start_row_group,
end_row_group, buffer_size);
auto status = ExportRecordBatchReader(reader, out);
LOG_STORAGE_DEBUG_ << "read export done";
if (!status.ok()) {
8 changes: 7 additions & 1 deletion go/packed/packed.go
@@ -21,9 +21,11 @@ package packed
#include "arrow/c/helpers.h"
*/
import "C"

import (
"errors"
"fmt"
"math"
"unsafe"

"github.com/apache/arrow/go/v12/arrow"
@@ -32,6 +34,10 @@ import (
)

func Open(path string, schema *arrow.Schema, bufferSize int) (arrio.Reader, error) {
return OpenWithRowGroupRange(path, schema, bufferSize, 0, math.MaxInt)
}

func OpenWithRowGroupRange(path string, schema *arrow.Schema, bufferSize int, startRowGroup int, endRowGroup int) (arrio.Reader, error) {
// var cSchemaPtr uintptr
// cSchema := cdata.SchemaFromPtr(cSchemaPtr)
var cas cdata.CArrowSchema
@@ -41,7 +47,7 @@ func Open(path string, schema *arrow.Schema, bufferSize int) (arrio.Reader, error) {

cPath := C.CString(path)
defer C.free(unsafe.Pointer(cPath))
status := C.Open(cPath, casPtr, C.int64_t(bufferSize), (*C.struct_ArrowArrayStream)(unsafe.Pointer(&cass)))
status := C.OpenWithRowGroupRange(cPath, casPtr, C.int64_t(startRowGroup), C.int64_t(endRowGroup), C.int64_t(bufferSize), (*C.struct_ArrowArrayStream)(unsafe.Pointer(&cass)))
if status != 0 {
return nil, errors.New(fmt.Sprintf("failed to open file: %s, status: %d", path, status))
}
Expand Down
10 changes: 9 additions & 1 deletion go/packed/packed_test.go
@@ -48,7 +48,7 @@ func TestRead(t *testing.T) {
)
}
}
//rec := b.NewRecord()
// rec := b.NewRecord()

path := "testdata/0"
// file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0666)
Expand All @@ -66,4 +66,12 @@ func TestRead(t *testing.T) {
assert.NoError(t, err)
defer rr.Release()
assert.Equal(t, int64(300), rr.NumRows())

// test row group range read
reader, err = OpenWithRowGroupRange(path, schema, 10*1024*1024 /* 10MB */, 1, 2)
assert.NoError(t, err)
rr, err = reader.Read()
assert.NoError(t, err)
defer rr.Release()
assert.Equal(t, int64(300), rr.NumRows())
}
