Commit

Reduce memory usage and improve perf when reading VCF headers (#631) (#632)

Co-authored-by: George Powley <[email protected]>
github-actions[bot] and gspowley authored Dec 14, 2023
1 parent 7c10e3a commit 3e7e9a1
Showing 2 changed files with 79 additions and 146 deletions.
221 changes: 77 additions & 144 deletions libtiledbvcf/src/dataset/tiledbvcfdataset.cc
@@ -1030,8 +1030,8 @@ std::shared_ptr<tiledb::Array> TileDBVCFDataset::open_data_array(
std::unordered_map<uint32_t, SafeBCFHdr> TileDBVCFDataset::fetch_vcf_headers_v4(
const std::vector<SampleAndId>& samples,
std::unordered_map<std::string, size_t>* lookup_map,
const bool all_samples,
const bool first_sample) const {
bool all_samples,
bool first_sample) const {
LOG_DEBUG(
"[fetch_vcf_headers_v4] start all_samples={} first_sample={} "
"(VmRSS = {})",
@@ -1062,175 +1062,108 @@ std::unordered_map<uint32_t, SafeBCFHdr> TileDBVCFDataset::fetch_vcf_headers_v4(
throw std::runtime_error(
"Cannot fetch TileDB-VCF vcf headers; Array object unexpectedly null");

Query query(*ctx_, *vcf_header_array_);
// Use a managed query to fetch the headers. Note that the previous
// implementation used the TILEDB_ROW_MAJOR layout to read the results in
// sorted order. However, reading results in sorted order is not required
// because we are adding the results to a map. The managed query uses the
// TILEDB_UNORDERED layout which is more efficient.
auto mq =
std::make_unique<ManagedQuery>(vcf_header_array_, "fetch_vcf_headers_v4");
mq->select_columns({"sample", "header"});

// By default all samples are read.
if (!samples.empty()) {
// If all samples but we have a sample list we know its sorted and can use
// the min/max
if (all_samples) {
query.add_range(
0, samples[0].sample_name, samples[samples.size() - 1].sample_name);
} else {
for (const auto& sample : samples) {
query.add_range(0, sample.sample_name, sample.sample_name);
}
// If samples are provided, only read those samples
for (const auto& sample : samples) {
mq->select_point("sample", sample.sample_name);
}
} else if (all_samples) {
// When no samples are passed grab the first one
auto non_empty_domain = vcf_header_array_->non_empty_domain_var(0);
if (!non_empty_domain.first.empty())
query.add_range(0, non_empty_domain.first, non_empty_domain.second);
} else if (first_sample) {
// When no samples are passed grab the first one
// Only read the first sample, based on the non-empty domain. We handle the
// case where the first sample was deleted below.
auto non_empty_domain = vcf_header_array_->non_empty_domain_var(0);
if (!non_empty_domain.first.empty())
query.add_range(0, non_empty_domain.first, non_empty_domain.first);
if (!non_empty_domain.first.empty()) {
mq->select_point("sample", non_empty_domain.first);
} else if (non_empty_domain.second.empty()) {
// If the lower and upper bounds of the non-empty domain are empty, then
// the array is empty or the array contains an annotation VCF with an
// empty sample name. In either case, we will read the VCF headers for
// "all" samples to avoid an infinite loop looking for the first sample.
first_sample = false;
}
}
query.set_layout(TILEDB_ROW_MAJOR);

uint64_t header_offset_element = 0;
uint64_t header_data_element = 0;
uint64_t sample_offset_element = 0;
uint64_t sample_data_element = 0;
#if TILEDB_VERSION_MAJOR == 2 && TILEDB_VERSION_MINOR < 2
std::pair<uint64_t, uint64_t> header_est_size =
query.est_result_size_var("header");
header_offset_element =
std::max(header_est_size.first, static_cast<uint64_t>(1));
header_data_element =
std::max(header_est_size.second / sizeof(char), static_cast<uint64_t>(1));

// Sample estimate
std::pair<uint64_t, uint64_t> sample_est_size =
query.est_result_size_var("sample");
sample_offset_element =
std::max(sample_est_size.first, static_cast<uint64_t>(1));
sample_data_element =
std::max(sample_est_size.second / sizeof(char), static_cast<uint64_t>(1));
#else
std::array<uint64_t, 2> header_est_size = query.est_result_size_var("header");
header_offset_element =
std::max(header_est_size[0] / sizeof(uint64_t), static_cast<uint64_t>(1));
header_data_element =
std::max(header_est_size[1] / sizeof(char), static_cast<uint64_t>(1));

// Sample estimate
std::array<uint64_t, 2> sample_est_size = query.est_result_size_var("sample");
sample_offset_element =
std::max(sample_est_size[0] / sizeof(uint64_t), static_cast<uint64_t>(1));
sample_data_element =
std::max(sample_est_size[1] / sizeof(char), static_cast<uint64_t>(1));
#endif

std::vector<uint64_t> offsets(header_offset_element);
std::vector<char> data(header_data_element);
std::vector<uint64_t> sample_offsets(sample_offset_element);
std::vector<char> sample_data(sample_data_element);
LOG_DEBUG(
"[fetch_vcf_headers_v4] allocate done (VmRSS = {})",
utils::memory_usage_str());

Query::Status status;
uint32_t sample_idx = 0;
while (!mq->is_complete()) {
mq->submit();
auto num_rows = mq->results()->num_rows();

// Handle the case where the first sample was deleted - create a new
// query that reads all samples and exit the loop after processing the
// first sample.
if (first_sample && num_rows == 0) {
LOG_DEBUG(
"[fetch_vcf_headers_v4] the first sample was deleted, resubmitting");
mq = std::make_unique<ManagedQuery>(
vcf_header_array_, "fetch_vcf_headers_v4");
mq->select_columns({"sample", "header"});
}

do {
// Always reset buffer to avoid issue with core library and REST not using
// original buffer sizes
query.set_buffer("header", offsets, data);
query.set_buffer("sample", sample_offsets, sample_data);

status = query.submit();

LOG_DEBUG(
"[fetch_vcf_headers_v4] query done (VmRSS = {})",
utils::memory_usage_str());

auto result_el = query.result_buffer_elements();
uint64_t num_offsets = result_el["header"].first;
uint64_t num_chars = result_el["header"].second;
uint64_t num_samples_offsets = result_el["sample"].first;
uint64_t num_samples_chars = result_el["sample"].second;

bool has_results = num_chars != 0;

if (status == Query::Status::INCOMPLETE && !has_results) {
// If there are no results, double the size of the buffer and then
// resubmit the query.

if (num_chars == 0)
data.resize(data.size() * 2);

if (num_offsets == 0)
offsets.resize(offsets.size() * 2);

if (num_samples_chars == 0)
sample_data.resize(sample_data.size() * 2);

if (num_samples_offsets == 0)
sample_offsets.resize(sample_offsets.size() * 2);

} else if (has_results) {
// Parse the samples.

for (size_t offset_idx = 0; offset_idx < num_offsets; ++offset_idx) {
// Get sample
char* sample_beg = sample_data.data() + sample_offsets[offset_idx];
uint64_t sample_end = offset_idx == num_samples_offsets - 1 ?
num_samples_chars :
sample_offsets[offset_idx + 1];
uint64_t sample_size = sample_end - sample_offsets[offset_idx];
std::string sample(sample_beg, sample_size);

char* beg_hdr = data.data() + offsets[offset_idx];
uint64_t end =
offset_idx == num_offsets - 1 ? num_chars : offsets[offset_idx + 1];
uint64_t start = offsets[offset_idx];
uint64_t hdr_size = end - start;
for (unsigned int i = 0; i < num_rows; i++) {
// Create a string from the string_view to guarantee null termination
// as required by htslib APIs.
auto hdr_str = std::string(mq->string_view("header", i));
auto sample = std::string(mq->string_view("sample", i));

std::string hdr_str(beg_hdr, hdr_size);
bcf_hdr_t* hdr = bcf_hdr_init("r");
if (!hdr) {
throw std::runtime_error(
"Error fetching VCF header data; error allocating VCF header.");
}

bcf_hdr_t* hdr = bcf_hdr_init("r");
if (!hdr)
throw std::runtime_error(
"Error fetching VCF header data; error allocating VCF header.");
if (0 != bcf_hdr_parse(hdr, const_cast<char*>(hdr_str.c_str()))) {
throw std::runtime_error(
"TileDBVCFDataset::fetch_vcf_headers_v4: Error parsing the BCF "
"header for sample " +
sample + ".");
}

if (0 != bcf_hdr_parse(hdr, const_cast<char*>(hdr_str.c_str()))) {
if (!sample.empty()) {
if (0 != bcf_hdr_add_sample(hdr, sample.c_str())) {
throw std::runtime_error(
"TileDBVCFDataset::fetch_vcf_headers_v4: Error parsing the BCF "
"header for sample " +
"TileDBVCFDataset::fetch_vcf_headers_v4: Error adding sample "
"to "
"BCF header for sample " +
sample + ".");
}
}

if (!sample.empty()) {
if (0 != bcf_hdr_add_sample(hdr, sample.c_str())) {
throw std::runtime_error(
"TileDBVCFDataset::fetch_vcf_headers_v4: Error adding sample "
"to "
"BCF header for sample " +
sample + ".");
}
}
if (bcf_hdr_sync(hdr) < 0) {
throw std::runtime_error(
"Error in bcftools: failed to update VCF header.");
}

if (bcf_hdr_sync(hdr) < 0)
throw std::runtime_error(
"Error in bcftools: failed to update VCF header.");
result.emplace(
std::make_pair(sample_idx, SafeBCFHdr(hdr, bcf_hdr_destroy)));
if (lookup_map != nullptr) {
(*lookup_map)[sample] = sample_idx;
}

result.emplace(
std::make_pair(sample_idx, SafeBCFHdr(hdr, bcf_hdr_destroy)));
if (lookup_map != nullptr)
(*lookup_map)[sample] = sample_idx;
++sample_idx;

++sample_idx;
// Exit the query loop if we only want the first sample.
if (first_sample) {
goto done;
}
}
} while (status == Query::Status::INCOMPLETE);
}

done:
if (tiledb_stats_enabled_)
tiledb::Stats::enable();
LOG_DEBUG(
"[fetch_vcf_headers_v4] done (VmRSS = {})", utils::memory_usage_str());
return result;
} // namespace vcf
}

std::unordered_map<uint32_t, SafeBCFHdr> TileDBVCFDataset::fetch_vcf_headers(
const std::vector<SampleAndId>& samples) const {
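For orientation, the read path introduced by this commit reduces to the loop below. This is a minimal sketch assembled only from the ManagedQuery calls visible in the diff above (select_columns, select_point, is_complete, submit, results()->num_rows(), string_view); the header_array handle, the sample name "HG00096", and the function name are placeholders, not part of this commit.

// Minimal sketch of the incremental ManagedQuery read pattern used above.
// Assumes `header_array` is a std::shared_ptr<tiledb::Array> opened elsewhere
// and that ManagedQuery is the class from this repository; error handling omitted.
#include <memory>
#include <string>
#include <unordered_map>

std::unordered_map<std::string, std::string> read_headers(
    std::shared_ptr<tiledb::Array> header_array) {
  auto mq = std::make_unique<ManagedQuery>(header_array, "example_read");
  mq->select_columns({"sample", "header"});
  // Optional: restrict the read to a single sample (hypothetical name).
  mq->select_point("sample", std::string("HG00096"));

  std::unordered_map<std::string, std::string> headers;
  while (!mq->is_complete()) {
    mq->submit();  // fetch the next (possibly partial) batch of results
    auto num_rows = mq->results()->num_rows();
    for (unsigned int i = 0; i < num_rows; i++) {
      // Copy out of the string_view so the data outlives the next submit().
      std::string sample(mq->string_view("sample", i));
      std::string header(mq->string_view("header", i));
      headers.emplace(std::move(sample), std::move(header));
    }
  }
  return headers;
}

Compared with the removed Query path, there is no result-size estimation, no manually sized offset/data buffers, and no resize-and-resubmit loop on incomplete results; the managed query handles incomplete batches internally using the TILEDB_UNORDERED layout, which is where the memory and performance improvement comes from.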
4 changes: 2 additions & 2 deletions libtiledbvcf/src/stats/column_buffer.h
@@ -47,9 +47,9 @@ using namespace tiledb;
*
*/
class ColumnBuffer {
inline static const size_t DEFAULT_ALLOC_BYTES = 1 << 26; // 64 MiB
inline static const size_t DEFAULT_ALLOC_BYTES = 1 << 27; // 128 MiB
inline static const std::string CONFIG_KEY_INIT_BYTES =
"soma.init_buffer_bytes";
"vcf.init_buffer_bytes";

public:
//===================================================================
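The second file doubles the default per-column allocation from 64 MiB to 128 MiB (1 << 27 = 134,217,728 bytes, matching the comment) and renames the override key from "soma.init_buffer_bytes" to "vcf.init_buffer_bytes" so the key matches this project. If smaller buffers are needed, for example on memory-constrained workers, the key can in principle be supplied through a TileDB config; which config object TileDB-VCF actually reads the key from is an assumption here, so treat the snippet as illustrative only.

// Illustrative only: overriding the initial ColumnBuffer allocation size.
// The key name "vcf.init_buffer_bytes" comes from the diff above; which config
// object TileDB-VCF reads it from is an assumption, not confirmed by this commit.
#include <tiledb/tiledb>

int main() {
  tiledb::Config cfg;
  cfg.set("vcf.init_buffer_bytes", "33554432");  // 32 MiB instead of the 128 MiB default
  tiledb::Context ctx(cfg);
  // ... construct the TileDB-VCF dataset/reader with this context or config ...
  return 0;
}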
