Skip to content

Commit

Permalink
file record batch reader
Browse files Browse the repository at this point in the history
Signed-off-by: shaoting-huang <[email protected]>
  • Loading branch information
shaoting-huang committed Dec 30, 2024
1 parent e0473b6 commit 7cd3c32
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 6 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ jobs:

- name: Build c++
run: cd cpp && make

- name: Run cpp tests
run: cd cpp/build/Release/test && ./milvus_test

- name: Run tests
run: cd go && make && make test
File renamed without changes.
53 changes: 53 additions & 0 deletions cpp/include/milvus-storage/format/parquet/file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,61 @@
#include "format/reader.h"
#include "parquet/arrow/reader.h"
#include "storage/options.h"
#include "common/config.h"
namespace milvus_storage {

class FileRecordBatchReader : public arrow::RecordBatchReader {
public:
/**
* @brief FileRecordBatchReader reads num of row groups starting from row_group_offset with memory constraints.
*
* @param fs The Arrow filesystem interface.
* @param path Path to the Parquet file.
* @param schema Expected schema of the Parquet file.
* @param buffer_size Memory limit for reading row groups.
* @param row_group_offset The starting row group index to read.
* @param row_group_num The number of row groups to read.
*/
FileRecordBatchReader(arrow::fs::FileSystem& fs,
const std::string& path,
const std::shared_ptr<arrow::Schema>& schema,
const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE,
const size_t row_group_offset = 0,
const size_t row_group_num = std::numeric_limits<size_t>::max());

/**
* @brief Returns the schema of the Parquet file.
*
* @return A shared pointer to the Arrow schema.
*/
std::shared_ptr<arrow::Schema> schema() const;

/**
* @brief Reads the next record batch from the file.
*
* @param out A shared pointer to the output record batch.
* @return Arrow Status indicating success or failure.
*/
arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* out);

/**
* @brief Closes the reader and releases resources.
*
* @return Arrow Status indicating success or failure.
*/
arrow::Status Close();

private:
std::shared_ptr<arrow::Schema> schema_;
std::unique_ptr<parquet::arrow::FileReader> file_reader_;
size_t current_row_group_ = 0;
size_t read_count_ = 0;

int64_t buffer_size_;
std::vector<size_t> row_group_sizes_;
size_t row_group_offset_;
};

class ParquetFileReader : public Reader {
public:
ParquetFileReader(std::unique_ptr<parquet::arrow::FileReader> reader);
Expand Down
68 changes: 68 additions & 0 deletions cpp/src/format/parquet/file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,84 @@
#include <arrow/record_batch.h>
#include <arrow/table_builder.h>
#include <arrow/type_fwd.h>
#include <arrow/util/key_value_metadata.h>
#include <parquet/type_fwd.h>
#include <iterator>
#include <memory>
#include <utility>
#include <vector>
#include "arrow/table.h"
#include "common/macro.h"
#include "common/serde.h"
#include "common/log.h"
#include "common/arrow_util.h"

namespace milvus_storage {

FileRecordBatchReader::FileRecordBatchReader(arrow::fs::FileSystem& fs,
const std::string& path,
const std::shared_ptr<arrow::Schema>& schema,
const int64_t buffer_size,
const size_t row_group_offset,
const size_t row_group_num)
: schema_(schema), row_group_offset_(row_group_offset), buffer_size_(buffer_size) {
auto result = MakeArrowFileReader(fs, path);
if (!result.ok()) {
LOG_STORAGE_ERROR_ << "Error making file reader:" << result.status().ToString();
throw std::runtime_error(result.status().ToString());
}
file_reader_ = std::move(result.value());

auto metadata = file_reader_->parquet_reader()->metadata()->key_value_metadata()->Get(ROW_GROUP_SIZE_META_KEY);
if (!metadata.ok()) {
LOG_STORAGE_ERROR_ << "Metadata not found in file: " << path;
throw std::runtime_error(metadata.status().ToString());
}
auto all_row_group_sizes = PackedMetaSerde::deserialize(metadata.ValueOrDie());
if (row_group_offset >= all_row_group_sizes.size()) {
std::string error_msg =
"Row group offset exceeds total number of row groups. "
"Row group offset: " +
std::to_string(row_group_offset) + ", Total row groups: " + std::to_string(all_row_group_sizes.size());
LOG_STORAGE_ERROR_ << error_msg;
throw std::out_of_range(error_msg);
}
size_t end_offset = std::min(row_group_offset + row_group_num, all_row_group_sizes.size());
row_group_sizes_.assign(all_row_group_sizes.begin() + row_group_offset, all_row_group_sizes.begin() + end_offset);
}

std::shared_ptr<arrow::Schema> FileRecordBatchReader::schema() const { return schema_; }

arrow::Status FileRecordBatchReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* out) {
std::vector<int> rgs_to_read;
size_t buffer_size = 0;

while (current_row_group_ < row_group_sizes_.size() &&
buffer_size + row_group_sizes_[current_row_group_] <= buffer_size_) {
rgs_to_read.push_back(current_row_group_ + row_group_offset_);
buffer_size += row_group_sizes_[current_row_group_];
current_row_group_++;
}

if (rgs_to_read.empty()) {
*out = nullptr;
return arrow::Status::OK();
}

std::shared_ptr<arrow::Table> table = nullptr;
RETURN_NOT_OK(file_reader_->ReadRowGroups(rgs_to_read, &table));
*out = table->CombineChunksToBatch().ValueOrDie();
return arrow::Status::OK();
}

arrow::Status FileRecordBatchReader::Close() {
LOG_STORAGE_DEBUG_ << "FileRecordBatchReader closed after reading " << read_count_ << " times.";
file_reader_ = nullptr;
schema_ = nullptr;
row_group_sizes_.clear();
return arrow::Status::OK();
}

ParquetFileReader::ParquetFileReader(std::unique_ptr<parquet::arrow::FileReader> reader) : reader_(std::move(reader)) {}

Result<std::shared_ptr<arrow::RecordBatch>> GetRecordAtOffset(arrow::RecordBatchReader* reader, int64_t offset) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/format/parquet/file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include "filesystem/fs.h"
#include <boost/variant.hpp>
#include "common/config.h"
#include "packed/utils/serde.h"
#include "common/serde.h"
#include "filesystem/s3/multi_part_upload_s3_fs.h"

namespace milvus_storage {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/packed/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include "common/log.h"
#include "packed/chunk_manager.h"
#include "common/config.h"
#include "packed/utils/serde.h"
#include "common/serde.h"

namespace milvus_storage {

Expand Down
61 changes: 61 additions & 0 deletions cpp/test/format/parquet/file_reader_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2024 Zilliz
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "packed_test_base.h"

namespace milvus_storage {

TEST_F(FileReaderTest, FileRecordBatchReader) {
std::vector<std::shared_ptr<arrow::Field>> fields = {
arrow::field("int32", arrow::int32()),
arrow::field("int64", arrow::int64()),
arrow::field("str", arrow::utf8()),
};
auto schema = arrow::schema(fields);

// exeed row group range, should throw out_of_range
std::string path = file_path_ + "/0";
EXPECT_THROW(FileRecordBatchReader fr(*fs_, path, schema, reader_memory_, 100), std::out_of_range);

// file not exist, should throw runtime_error
path = file_path_ + "/file_not_exist";
EXPECT_THROW(FileRecordBatchReader fr(*fs_, path, schema, reader_memory_), std::runtime_error);

// read all row groups
path = file_path_ + "/0";
MemRecordBatchReader fr(*fs_, path, schema, reader_memory_);
ASSERT_AND_ARROW_ASSIGN(auto m_table, mr.ToTable());
ASSERT_STATUS_OK(mr.Close());

std::set<int> needed_columns = {0, 1, 2};
std::vector<ColumnOffset> column_offsets = {
ColumnOffset(0, 0),
ColumnOffset(0, 1),
ColumnOffset(0, 2),
};
std::vector<std::string> paths = {file_path_ + "/0"};
PackedRecordBatchReader pr(*fs_, paths, schema, column_offsets, needed_columns, reader_memory_);
ASSERT_AND_ARROW_ASSIGN(auto p_table, pr.ToTable());
ASSERT_STATUS_OK(pr.Close());
ASSERT_EQ(m_table->num_rows(), p_table->num_rows());

// read row group 1
path = file_path_ + "/0";
MemRecordBatchReader fr2(*fs_, path, schema, reader_memory_, 1, 1);
ASSERT_AND_ARROW_ASSIGN(auto rg_table, mr2.ToTable());
ASSERT_STATUS_OK(fr2.Close());
ASSERT_GT(m_table->num_rows(), rg_table->num_rows());
}

} // namespace milvus_storage
14 changes: 10 additions & 4 deletions go/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,25 @@ CFLAGS += $(CONAN_CFLAGS)
CXXFLAGS += $(CONAN_CXXFLAGS)
INCLUDE_DIRS = $(CONAN_INCLUDE_DIRS_ARROW) $(MILVUS_STORAGE_INCLUDE_DIR)
CPPFLAGS = $(addprefix -I, $(INCLUDE_DIRS))
LDFLAGS += $(addprefix -L, $(MILVUS_STORAGE_LD_DIR))
LDFLAGS += $(addprefix -L, $(MILVUS_STORAGE_LD_DIR)) -Wl,-rpath,$(MILVUS_STORAGE_LD_DIR)

.EXPORT_ALL_VARIABLES:
.PHONY: build
.PHONY: build test proto

build:
@echo "CPPFLAGS: $(CPPFLAGS)"
@echo "LDFLAGS: $(LDFLAGS)"
CGO_CFLAGS="$(CPPFLAGS)" CGO_LDFLAGS="$(LDFLAGS) -lmilvus-storage" go build ./...

test:
CGO_CFLAGS="$(CPPFLAGS)" CGO_LDFLAGS="$(LDFLAGS) -Wl,-rpath,$(MILVUS_STORAGE_LD_DIR) -lmilvus-storage" go test -timeout 30s ./...
LD_LIBRARY_PATH=$(MILVUS_STORAGE_LD_DIR):$$LD_LIBRARY_PATH \
CGO_CFLAGS="$(CPPFLAGS)" \
CGO_LDFLAGS="$(LDFLAGS) -lmilvus-storage" \
go test -timeout 30s ./...

proto:
mkdir -p proto/manifest_proto
mkdir -p proto/schema_proto
protoc -I="proto" --go_out=paths=source_relative:./proto/manifest_proto proto/manifest.proto
protoc -I="proto" --go_out=paths=source_relative:./proto/schema_proto proto/storage_schema.proto

protoc -I="proto" --go_out=paths=source_relative:./proto/schema_proto proto/storage_schema.proto

0 comments on commit 7cd3c32

Please sign in to comment.