diff --git a/thirdparty/DiskANN/src/linux_aligned_file_reader.cpp b/thirdparty/DiskANN/src/linux_aligned_file_reader.cpp index cb2b75607..9ec8f5093 100644 --- a/thirdparty/DiskANN/src/linux_aligned_file_reader.cpp +++ b/thirdparty/DiskANN/src/linux_aligned_file_reader.cpp @@ -16,7 +16,7 @@ namespace { void execute_io(io_context_t ctx, uint64_t maxnr, int fd, const std::vector &read_reqs, - uint64_t n_retries = 3) { + uint64_t n_retries = 10) { #ifdef DEBUG for (auto &req : read_reqs) { assert(IS_ALIGNED(req.len, 512)); @@ -28,15 +28,13 @@ namespace { #endif // break-up requests into chunks of size maxnr each - uint64_t n_iters = ROUND_UP(read_reqs.size(), maxnr) / maxnr; - for (uint64_t iter = 0; iter < n_iters; iter++) { - uint64_t n_ops = - std::min((uint64_t) read_reqs.size() - (iter * maxnr), - (uint64_t) maxnr); + int64_t n_iters = ROUND_UP(read_reqs.size(), maxnr) / maxnr; + for (int64_t iter = 0; iter < n_iters; iter++) { + int64_t n_ops = std::min(read_reqs.size() - (iter * maxnr), maxnr); std::vector cbs(n_ops, nullptr); std::vector evts(n_ops); std::vector cb(n_ops); - for (uint64_t j = 0; j < n_ops; j++) { + for (int64_t j = 0; j < n_ops; j++) { io_prep_pread(cb.data() + j, fd, read_reqs[j + iter * maxnr].buf, read_reqs[j + iter * maxnr].len, read_reqs[j + iter * maxnr].offset); @@ -49,41 +47,58 @@ namespace { cbs[i] = cb.data() + i; } - uint64_t n_tries = 0; - while (n_tries < n_retries) { - n_tries++; - // issue reads - int64_t ret = io_submit(ctx, (int64_t) n_ops, cbs.data()); - // if requests didn't get accepted - if (ret != (int64_t) n_ops) { - LOG(WARNING) << "io_submit() failed; returned " << ret - << ", expected=" << n_ops << ", ernno=" << -ret << "=" - << ::strerror(-ret) << ", try #" << n_tries + 1 - << ", ctx: " << ctx; - continue; + int64_t ret; + int64_t num_submitted = 0, submit_retry = 0; + while (num_submitted < n_ops) { + while ((ret = io_submit(ctx, n_ops - num_submitted, + cbs.data() + num_submitted)) < 0) { + if (-ret != EINTR) { + std::stringstream err; + err << "Unknown error occur in io_submit, errno: " << -ret << ", " + << strerror(-ret); + throw diskann::ANNException(err.str(), -1, __FUNCSIG__, __FILE__, + __LINE__); + } + } + num_submitted += ret; + if (num_submitted < n_ops) { + submit_retry++; + if (submit_retry <= n_retries) { + LOG(WARNING) << "io_submit() failed; submit: " << num_submitted + << ", expected: " << n_ops + << ", retry: " << submit_retry; + } else { + std::stringstream err; + err << "io_submit failed after retried " << n_retries << " times"; + throw diskann::ANNException(err.str(), -1, __FUNCSIG__, __FILE__, + __LINE__); + } } - // wait on io_getevents - ret = io_getevents(ctx, (int64_t) n_ops, (int64_t) n_ops, evts.data(), - nullptr); - // if requests didn't complete - if (ret != (int64_t) n_ops) { - LOG(WARNING) << "io_getevents() failed; returned " << ret - << ", expected=" << n_ops << ", ernno=" << -ret << "=" - << ::strerror(-ret) << ", try #" << n_tries + 1; - continue; - }; - break; } - if (n_tries == n_retries) { - LOG(WARNING) << "Aio failed, using pread instead"; - for (int j = 0; j < n_ops; ++j) { - size_t len = read_reqs[j + iter * maxnr].len; - auto ret = pread(fd, read_reqs[j + iter * maxnr].buf, len, - read_reqs[j + iter * maxnr].offset); - if (ret != read_reqs[j + iter * maxnr].len) { + + int64_t num_read = 0, read_retry = 0; + while (num_read < n_ops) { + while ((ret = io_getevents(ctx, n_ops - num_read, n_ops - num_read, + evts.data() + num_read, nullptr)) < 0) { + if (-ret != EINTR) { + std::stringstream err; + err << "Unknown error occur in io_getevents, errno: " << -ret + << ", " << strerror(-ret); + throw diskann::ANNException(err.str(), -1, __FUNCSIG__, __FILE__, + __LINE__); + } + } + num_read += ret; + if (num_read < n_ops) { + read_retry++; + if (read_retry <= n_retries) { + LOG(WARNING) << "io_getevents() failed; read: " << num_read + << ", expected: " << n_ops + << ", retry: " << read_retry; + } else { std::stringstream err; - err << "pread() failed; returned " << ret << ", expected=" << len - << ", ernno=" << errno << "=" << ::strerror(-ret); + err << "io_getevents failed after retried " << n_retries + << " times"; throw diskann::ANNException(err.str(), -1, __FUNCSIG__, __FILE__, __LINE__); }