Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Fix aio io_getevents (#593)
Browse files Browse the repository at this point in the history
Signed-off-by: zh Wang <[email protected]>

Signed-off-by: zh Wang <[email protected]>
  • Loading branch information
hhy3 authored Dec 9, 2022
1 parent a8abbf7 commit 532b2b2
Showing 1 changed file with 54 additions and 39 deletions.
93 changes: 54 additions & 39 deletions thirdparty/DiskANN/src/linux_aligned_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace {

void execute_io(io_context_t ctx, uint64_t maxnr, int fd,
const std::vector<AlignedRead> &read_reqs,
uint64_t n_retries = 3) {
uint64_t n_retries = 10) {
#ifdef DEBUG
for (auto &req : read_reqs) {
assert(IS_ALIGNED(req.len, 512));
Expand All @@ -28,15 +28,13 @@ namespace {
#endif

// break-up requests into chunks of size maxnr each
uint64_t n_iters = ROUND_UP(read_reqs.size(), maxnr) / maxnr;
for (uint64_t iter = 0; iter < n_iters; iter++) {
uint64_t n_ops =
std::min((uint64_t) read_reqs.size() - (iter * maxnr),
(uint64_t) maxnr);
int64_t n_iters = ROUND_UP(read_reqs.size(), maxnr) / maxnr;
for (int64_t iter = 0; iter < n_iters; iter++) {
int64_t n_ops = std::min(read_reqs.size() - (iter * maxnr), maxnr);
std::vector<iocb_t *> cbs(n_ops, nullptr);
std::vector<io_event_t> evts(n_ops);
std::vector<struct iocb> cb(n_ops);
for (uint64_t j = 0; j < n_ops; j++) {
for (int64_t j = 0; j < n_ops; j++) {
io_prep_pread(cb.data() + j, fd, read_reqs[j + iter * maxnr].buf,
read_reqs[j + iter * maxnr].len,
read_reqs[j + iter * maxnr].offset);
Expand All @@ -49,41 +47,58 @@ namespace {
cbs[i] = cb.data() + i;
}

uint64_t n_tries = 0;
while (n_tries < n_retries) {
n_tries++;
// issue reads
int64_t ret = io_submit(ctx, (int64_t) n_ops, cbs.data());
// if requests didn't get accepted
if (ret != (int64_t) n_ops) {
LOG(WARNING) << "io_submit() failed; returned " << ret
<< ", expected=" << n_ops << ", ernno=" << -ret << "="
<< ::strerror(-ret) << ", try #" << n_tries + 1
<< ", ctx: " << ctx;
continue;
int64_t ret;
int64_t num_submitted = 0, submit_retry = 0;
while (num_submitted < n_ops) {
while ((ret = io_submit(ctx, n_ops - num_submitted,
cbs.data() + num_submitted)) < 0) {
if (-ret != EINTR) {
std::stringstream err;
err << "Unknown error occur in io_submit, errno: " << -ret << ", "
<< strerror(-ret);
throw diskann::ANNException(err.str(), -1, __FUNCSIG__, __FILE__,
__LINE__);
}
}
num_submitted += ret;
if (num_submitted < n_ops) {
submit_retry++;
if (submit_retry <= n_retries) {
LOG(WARNING) << "io_submit() failed; submit: " << num_submitted
<< ", expected: " << n_ops
<< ", retry: " << submit_retry;
} else {
std::stringstream err;
err << "io_submit failed after retried " << n_retries << " times";
throw diskann::ANNException(err.str(), -1, __FUNCSIG__, __FILE__,
__LINE__);
}
}
// wait on io_getevents
ret = io_getevents(ctx, (int64_t) n_ops, (int64_t) n_ops, evts.data(),
nullptr);
// if requests didn't complete
if (ret != (int64_t) n_ops) {
LOG(WARNING) << "io_getevents() failed; returned " << ret
<< ", expected=" << n_ops << ", ernno=" << -ret << "="
<< ::strerror(-ret) << ", try #" << n_tries + 1;
continue;
};
break;
}
if (n_tries == n_retries) {
LOG(WARNING) << "Aio failed, using pread instead";
for (int j = 0; j < n_ops; ++j) {
size_t len = read_reqs[j + iter * maxnr].len;
auto ret = pread(fd, read_reqs[j + iter * maxnr].buf, len,
read_reqs[j + iter * maxnr].offset);
if (ret != read_reqs[j + iter * maxnr].len) {

int64_t num_read = 0, read_retry = 0;
while (num_read < n_ops) {
while ((ret = io_getevents(ctx, n_ops - num_read, n_ops - num_read,
evts.data() + num_read, nullptr)) < 0) {
if (-ret != EINTR) {
std::stringstream err;
err << "Unknown error occur in io_getevents, errno: " << -ret
<< ", " << strerror(-ret);
throw diskann::ANNException(err.str(), -1, __FUNCSIG__, __FILE__,
__LINE__);
}
}
num_read += ret;
if (num_read < n_ops) {
read_retry++;
if (read_retry <= n_retries) {
LOG(WARNING) << "io_getevents() failed; read: " << num_read
<< ", expected: " << n_ops
<< ", retry: " << read_retry;
} else {
std::stringstream err;
err << "pread() failed; returned " << ret << ", expected=" << len
<< ", ernno=" << errno << "=" << ::strerror(-ret);
err << "io_getevents failed after retried " << n_retries
<< " times";
throw diskann::ANNException(err.str(), -1, __FUNCSIG__, __FILE__,
__LINE__);
}
Expand Down

0 comments on commit 532b2b2

Please sign in to comment.