packed row group range read
Signed-off-by: shaoting-huang <[email protected]>
shaoting-huang committed Dec 27, 2024
1 parent e0473b6 commit 136fe37
Showing 9 changed files with 139 additions and 16 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci.yaml
@@ -56,6 +56,9 @@ jobs:

- name: Build c++
run: cd cpp && make

- name: Run cpp tests
run: cd build/Release/test && ./milvus_test

- name: Run tests
run: cd go && make && make test
24 changes: 23 additions & 1 deletion cpp/include/milvus-storage/packed/reader.h
@@ -17,6 +17,7 @@
#include <packed/chunk_manager.h>
#include <packed/column_group.h>
#include "common/config.h"
#include "common/result.h"
#include <parquet/arrow/reader.h>
#include <arrow/filesystem/filesystem.h>
#include <arrow/record_batch.h>
@@ -40,12 +41,21 @@ using RowOffsetMinHeap =

class PackedRecordBatchReader : public arrow::RecordBatchReader {
public:
// Test only
// Read a file with default buffer size
PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::string& path,
const std::shared_ptr<arrow::Schema> schema,
const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE);

// Read a range of row groups from a file with default buffer size
PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::string& path,
const std::shared_ptr<arrow::Schema> schema,
const size_t start_row_group,
const size_t end_row_group,
const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE);

// Read all files with default buffer size
PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::vector<std::string>& paths,
const std::shared_ptr<arrow::Schema> schema,
@@ -57,9 +67,19 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader {

arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) override;

Result<std::shared_ptr<arrow::Table>> ReadRowGroup(int file_index, int row_group_index);

arrow::Status Close() override;

private:
PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::vector<std::string>& paths,
const std::shared_ptr<arrow::Schema> schema,
const std::vector<ColumnOffset>& column_offsets,
const std::set<int>& needed_columns,
const std::vector<size_t>& start_row_groups,
const std::vector<size_t>& end_row_groups,
const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE);
// Advance buffer to fill the expected buffer size
arrow::Status advanceBuffer();
std::vector<const arrow::Array*> collectChunks(int64_t chunksize) const;
@@ -70,6 +90,8 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader {
size_t memory_limit_;
size_t buffer_available_;
std::vector<std::unique_ptr<parquet::arrow::FileReader>> file_readers_;
std::vector<size_t> start_row_groups_;
std::vector<size_t> end_row_groups_;
std::vector<std::shared_ptr<arrow::KeyValueMetadata>> metadata_;
std::vector<std::queue<std::shared_ptr<arrow::Table>>> tables_;
std::vector<ColumnGroupState> column_group_states_;
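As a usage note: below is a minimal sketch of driving the new single-file range constructor, assuming a local filesystem, a hypothetical path "data/packed/0", and an illustrative two-field schema (none of these come from the commit itself). start_row_group is inclusive and end_row_group is exclusive, per the rg >= end_row_groups_[i] check in advanceBuffer() further down.

// Hedged sketch: read only row groups [1, 3) of one packed file.
#include <arrow/api.h>
#include <arrow/filesystem/localfs.h>
#include <packed/reader.h>

arrow::Status ReadRange() {
  arrow::fs::LocalFileSystem fs;
  auto schema = arrow::schema({arrow::field("id", arrow::int64()),
                               arrow::field("value", arrow::utf8())});
  milvus_storage::PackedRecordBatchReader reader(
      fs, "data/packed/0", schema, /*start_row_group=*/1, /*end_row_group=*/3);
  std::shared_ptr<arrow::RecordBatch> batch;
  while (true) {
    ARROW_RETURN_NOT_OK(reader.ReadNext(&batch));
    if (batch == nullptr) break;  // stream exhausted
    // ... consume batch ...
  }
  return reader.Close();
}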
7 changes: 7 additions & 0 deletions cpp/include/milvus-storage/packed/reader_c.h
@@ -27,6 +27,13 @@ typedef void* CFileSystem;

int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size, struct ArrowArrayStream* out);

int OpenWithRowGroupRange(const char* path,
struct ArrowSchema* schema,
const int64_t start_row_group,
const int64_t end_row_group,
const int64_t buffer_size,
struct ArrowArrayStream* out);

#ifdef __cplusplus
}
#endif
66 changes: 59 additions & 7 deletions cpp/src/packed/reader.cpp
@@ -34,18 +34,52 @@ PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
: PackedRecordBatchReader(
fs, std::vector<std::string>{path}, schema, std::vector<ColumnOffset>(), std::set<int>(), buffer_size) {}

PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::string& path,
const std::shared_ptr<arrow::Schema> schema,
const size_t start_row_group,
const size_t end_row_group,
const int64_t buffer_size)
: PackedRecordBatchReader(fs,
std::vector<std::string>{path},
schema,
std::vector<ColumnOffset>(),
std::set<int>(),
std::vector<size_t>(1, start_row_group),
std::vector<size_t>(1, end_row_group),
buffer_size) {}

PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::vector<std::string>& paths,
const std::shared_ptr<arrow::Schema> schema,
const std::vector<ColumnOffset>& column_offsets,
const std::set<int>& needed_columns,
const int64_t buffer_size)
: PackedRecordBatchReader(fs,
paths,
schema,
column_offsets,
needed_columns,
std::vector<size_t>(paths.size(), 0),
std::vector<size_t>(paths.size(), std::numeric_limits<int>::max()),
buffer_size) {}

PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::vector<std::string>& paths,
const std::shared_ptr<arrow::Schema> schema,
const std::vector<ColumnOffset>& column_offsets,
const std::set<int>& needed_columns,
const std::vector<size_t>& start_row_groups,
const std::vector<size_t>& end_row_groups,
const int64_t buffer_size)
: schema_(schema),
buffer_available_(buffer_size),
memory_limit_(buffer_size),
row_limit_(0),
absolute_row_position_(0),
read_count_(0) {
read_count_(0),
start_row_groups_(start_row_groups),
end_row_groups_(end_row_groups) {
auto cols = std::set(needed_columns);
if (cols.empty()) {
for (int i = 0; i < schema->num_fields(); i++) {
@@ -71,18 +105,22 @@ PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
}
file_readers_.emplace_back(std::move(result.value()));
}
if (start_row_groups.size() != paths.size() || end_row_groups.size() != paths.size()) {
throw std::runtime_error("start_row_groups and end_row_groups must match the number of paths");
}

for (int i = 0; i < file_readers_.size(); ++i) {
auto metadata = file_readers_[i]->parquet_reader()->metadata()->key_value_metadata()->Get(ROW_GROUP_SIZE_META_KEY);
if (!metadata.ok()) {
LOG_STORAGE_ERROR_ << "metadata not found in file " << i;
throw std::runtime_error(metadata.status().ToString());
}
row_group_sizes_.push_back(PackedMetaSerde::deserialize(metadata.ValueOrDie()));
auto rg_size = PackedMetaSerde::deserialize(metadata.ValueOrDie());
row_group_sizes_.push_back(rg_size);
end_row_groups_[i] = std::min(end_row_groups[i], static_cast<size_t>(rg_size.size()));
LOG_STORAGE_DEBUG_ << " file " << i << " metadata size: " << file_readers_[i]->parquet_reader()->metadata()->size();
column_group_states_.emplace_back(0, static_cast<int64_t>(start_row_groups_[i]) - 1, 0);
}
// Initialize table states and chunk manager
column_group_states_.resize(file_readers_.size(), ColumnGroupState(0, -1, 0));
chunk_manager_ = std::make_unique<ChunkManager>(needed_column_offsets_, 0);
// tables are referenced by column_offsets, so its size equals the number of paths.
tables_.resize(paths.size(), std::queue<std::shared_ptr<arrow::Table>>());
@@ -98,10 +136,9 @@ arrow::Status PackedRecordBatchReader::advanceBuffer() {
auto advance_row_group = [&](int i) -> int64_t {
auto& reader = file_readers_[i];
int rg = column_group_states_[i].row_group_offset + 1;
int num_row_groups = reader->parquet_reader()->metadata()->num_row_groups();
if (rg >= num_row_groups) {
if (rg >= end_row_groups_[i]) {
// No more row groups. It means we're done or there is an error.
LOG_STORAGE_DEBUG_ << "No more row groups in file " << i << " total row groups " << num_row_groups;
LOG_STORAGE_DEBUG_ << "finished reading file " << i;
return -1;
}
int64_t rg_size = row_group_sizes_[i][rg];
@@ -192,6 +229,21 @@ arrow::Status PackedRecordBatchReader::ReadNext(std::shared_ptr<arrow::RecordBat
return arrow::Status::OK();
}

Result<std::shared_ptr<arrow::Table>> PackedRecordBatchReader::ReadRowGroup(int file_index, int row_group_index) {
if (file_index < 0 || file_index >= file_readers_.size()) {
throw std::out_of_range("Invalid file index");
}

auto reader = file_readers_[file_index]->parquet_reader();
if (row_group_index < 0 || row_group_index >= reader->metadata()->num_row_groups()) {
throw std::out_of_range("Invalid row group index");
}

std::shared_ptr<arrow::Table> table;
RETURN_ARROW_NOT_OK(file_readers_[file_index]->ReadRowGroup(row_group_index, &table));
return std::move(table);
}

arrow::Status PackedRecordBatchReader::Close() {
LOG_STORAGE_DEBUG_ << "PackedRecordBatchReader::Close(), total read " << read_count_ << " times";
for (int i = 0; i < column_group_states_.size(); ++i) {
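Worth noting: ReadRowGroup reports errors through two channels, throwing std::out_of_range for bad indices and returning a non-ok Result for Arrow-level read failures. A hedged sketch of handling both, with reader construction elided and the indices (0, 0) purely illustrative:

// Sketch: fetch file 0 / row group 0 as an arrow::Table.
try {
  auto res = reader.ReadRowGroup(0, 0);
  if (!res.ok()) {
    // Arrow read failure surfaced through the Result
  } else {
    std::shared_ptr<arrow::Table> table = res.value();
    // ... use table ...
  }
} catch (const std::out_of_range& e) {
  // invalid file index or row group index
}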
12 changes: 11 additions & 1 deletion cpp/src/packed/reader_c.cpp
@@ -25,6 +25,15 @@
#include <limits>
#include <memory>

int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size, struct ArrowArrayStream* out) {
return OpenWithRowGroupRange(path, schema, 0, std::numeric_limits<int64_t>::max(), buffer_size, out);
}

int OpenWithRowGroupRange(const char* path,
struct ArrowSchema* schema,
const int64_t start_row_group,
const int64_t end_row_group,
const int64_t buffer_size,
struct ArrowArrayStream* out) {
auto truePath = std::string(path);
auto factory = std::make_shared<milvus_storage::FileSystemFactory>();
auto conf = milvus_storage::StorageConfig();
@@ -36,7 +45,8 @@ int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size
}
auto trueFs = r.value();
auto trueSchema = arrow::ImportSchema(schema).ValueOrDie();
auto reader = std::make_shared<milvus_storage::PackedRecordBatchReader>(*trueFs, path, trueSchema, buffer_size);
auto reader = std::make_shared<milvus_storage::PackedRecordBatchReader>(*trueFs, path, trueSchema, start_row_group,
end_row_group, buffer_size);
auto status = ExportRecordBatchReader(reader, out);
LOG_STORAGE_ERROR_ << "read export done";
if (!status.ok()) {
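From the caller's side, the new C entry point feeds an Arrow C stream. A rough sketch, under the assumption that the caller has already exported a struct ArrowSchema (the path "testdata/0" mirrors the Go test below):

// Sketch: open row groups [1, 2) through the C API and drain the stream.
struct ArrowArrayStream stream;
int rc = OpenWithRowGroupRange("testdata/0", &schema,
                               /*start_row_group=*/1, /*end_row_group=*/2,
                               /*buffer_size=*/10 * 1024 * 1024, &stream);
if (rc == 0) {
  struct ArrowArray array;
  while (stream.get_next(&stream, &array) == 0 && array.release != NULL) {
    // ... consume the batch ...
    array.release(&array);
  }
  stream.release(&stream);
}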
7 changes: 7 additions & 0 deletions cpp/test/packed/packed_test_base.h
@@ -100,6 +100,13 @@ class PackedTestBase : public ::testing::Test {

PackedRecordBatchReader pr(*fs_, paths, new_schema, column_offsets, needed_columns, reader_memory_);
ASSERT_AND_ARROW_ASSIGN(auto table, pr.ToTable());

auto res = pr.ReadRowGroup(0, 0);
ASSERT_TRUE(res.ok());
auto row_group = res.value();
ASSERT_TRUE(row_group->num_rows() > 0);
ASSERT_STATUS_OK(pr.Close());

ValidateTableData(table);
18 changes: 13 additions & 5 deletions go/Makefile
@@ -8,19 +8,27 @@ CFLAGS += $(CONAN_CFLAGS)
CXXFLAGS += $(CONAN_CXXFLAGS)
INCLUDE_DIRS = $(CONAN_INCLUDE_DIRS_ARROW) $(MILVUS_STORAGE_INCLUDE_DIR)
CPPFLAGS = $(addprefix -I, $(INCLUDE_DIRS))
LDFLAGS += $(addprefix -L, $(MILVUS_STORAGE_LD_DIR))
LDFLAGS += $(addprefix -L, $(MILVUS_STORAGE_LD_DIR)) -Wl,-rpath,$(MILVUS_STORAGE_LD_DIR)

.EXPORT_ALL_VARIABLES:
.PHONY: build
.PHONY: build test proto

build:
@echo "CPPFLAGS: $(CPPFLAGS)"
@echo "LDFLAGS: $(LDFLAGS)"
CGO_CFLAGS="$(CPPFLAGS)" CGO_LDFLAGS="$(LDFLAGS) -lmilvus-storage" go build ./...

test:
CGO_CFLAGS="$(CPPFLAGS)" CGO_LDFLAGS="$(LDFLAGS) -Wl,-rpath,$(MILVUS_STORAGE_LD_DIR) -lmilvus-storage" go test -timeout 30s ./...
@echo "CPPFLAGS: $(CPPFLAGS)"
@echo "LDFLAGS: $(LDFLAGS)"
LD_LIBRARY_PATH=$(MILVUS_STORAGE_LD_DIR):$$LD_LIBRARY_PATH \
CGO_CFLAGS="$(CPPFLAGS)" \
CGO_LDFLAGS="$(LDFLAGS) -lmilvus-storage" \
go test -timeout 30s ./...

proto:
@echo "Generating proto files..."
mkdir -p proto/manifest_proto
mkdir -p proto/schema_proto
protoc -I="proto" --go_out=paths=source_relative:./proto/manifest_proto proto/manifest.proto
protoc -I="proto" --go_out=paths=source_relative:./proto/schema_proto proto/storage_schema.proto

protoc -I="proto" --go_out=paths=source_relative:./proto/schema_proto proto/storage_schema.proto
8 changes: 7 additions & 1 deletion go/packed/packed.go
@@ -21,9 +21,11 @@ package packed
#include "arrow/c/helpers.h"
*/
import "C"

import (
"errors"
"fmt"
"math"
"unsafe"

"github.com/apache/arrow/go/v12/arrow"
@@ -32,6 +34,10 @@ import (
)

func Open(path string, schema *arrow.Schema, bufferSize int) (arrio.Reader, error) {
return OpenWithRowGroupRange(path, schema, bufferSize, 0, math.MaxInt32)
}

func OpenWithRowGroupRange(path string, schema *arrow.Schema, bufferSize int, startRowGroup int, endRowGroup int) (arrio.Reader, error) {
// var cSchemaPtr uintptr
// cSchema := cdata.SchemaFromPtr(cSchemaPtr)
var cas cdata.CArrowSchema
@@ -41,7 +47,7 @@ func Open(path string, schema *arrow.Schema, bufferSize int) (arrio.Reader, erro

cPath := C.CString(path)
defer C.free(unsafe.Pointer(cPath))
status := C.Open(cPath, casPtr, C.int64_t(bufferSize), (*C.struct_ArrowArrayStream)(unsafe.Pointer(&cass)))
status := C.OpenWithRowGroupRange(cPath, casPtr, C.int64_t(startRowGroup), C.int64_t(endRowGroup), C.int64_t(bufferSize), (*C.struct_ArrowArrayStream)(unsafe.Pointer(&cass)))
if status != 0 {
return nil, errors.New(fmt.Sprintf("failed to open file: %s, status: %d", path, status))
}
10 changes: 9 additions & 1 deletion go/packed/packed_test.go
@@ -48,7 +48,7 @@ func TestRead(t *testing.T) {
)
}
}
//rec := b.NewRecord()
// rec := b.NewRecord()

path := "testdata/0"
// file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0666)
@@ -66,4 +66,12 @@ func TestRead(t *testing.T) {
assert.NoError(t, err)
defer rr.Release()
assert.Equal(t, int64(300), rr.NumRows())

// test row group range read
reader, err = OpenWithRowGroupRange(path, schema, 10*1024*1024 /* 10MB */, 1, 2)
assert.NoError(t, err)
rr, err = reader.Read()
assert.NoError(t, err)
defer rr.Release()
assert.Greater(t, int64(300), rr.NumRows())
}
