From bb5fc9bf58053419ca81ad925a7126d2bcfb34b8 Mon Sep 17 00:00:00 2001 From: zh Wang Date: Mon, 20 Feb 2023 10:26:23 +0800 Subject: [PATCH] [Cherry-Pick]Fix aio read (#697) Signed-off-by: zh Wang --- .../DiskANN/src/linux_aligned_file_reader.cpp | 84 ++++++++++++------- 1 file changed, 55 insertions(+), 29 deletions(-) diff --git a/thirdparty/DiskANN/src/linux_aligned_file_reader.cpp b/thirdparty/DiskANN/src/linux_aligned_file_reader.cpp index 627fd348c..9b3c9ddf2 100644 --- a/thirdparty/DiskANN/src/linux_aligned_file_reader.cpp +++ b/thirdparty/DiskANN/src/linux_aligned_file_reader.cpp @@ -16,7 +16,7 @@ namespace { void execute_io(io_context_t ctx, uint64_t maxnr, int fd, const std::vector &read_reqs, - uint64_t n_retries = 0) { + uint64_t n_retries = 10) { #ifdef DEBUG for (auto &req : read_reqs) { assert(IS_ALIGNED(req.len, 512)); @@ -28,14 +28,13 @@ namespace { #endif // break-up requests into chunks of size maxnr each - uint64_t n_iters = ROUND_UP(read_reqs.size(), maxnr) / maxnr; - for (uint64_t iter = 0; iter < n_iters; iter++) { - uint64_t n_ops = std::min((uint64_t) read_reqs.size() - (iter * maxnr), - (uint64_t) maxnr); + int64_t n_iters = ROUND_UP(read_reqs.size(), maxnr) / maxnr; + for (int64_t iter = 0; iter < n_iters; iter++) { + int64_t n_ops = std::min(read_reqs.size() - (iter * maxnr), maxnr); std::vector cbs(n_ops, nullptr); std::vector evts(n_ops); std::vector cb(n_ops); - for (uint64_t j = 0; j < n_ops; j++) { + for (int64_t j = 0; j < n_ops; j++) { io_prep_pread(cb.data() + j, fd, read_reqs[j + iter * maxnr].buf, read_reqs[j + iter * maxnr].len, read_reqs[j + iter * maxnr].offset); @@ -48,33 +47,60 @@ namespace { cbs[i] = cb.data() + i; } - uint64_t n_tries = 0; - while (n_tries <= n_retries) { - // issue reads - int64_t ret = io_submit(ctx, (int64_t) n_ops, cbs.data()); - // if requests didn't get accepted - if (ret != (int64_t) n_ops) { - std::stringstream err; - err << "io_submit() failed; returned " << ret - << ", expected=" << n_ops << ", ernno=" << errno << "=" - << ::strerror(-ret) << ", try #" << n_tries + 1 - << ", ctx: " << ctx; - throw diskann::ANNException(err.str(), -1, __FUNCSIG__, __FILE__, - __LINE__); - } else { - // wait on io_getevents - ret = io_getevents(ctx, (int64_t) n_ops, (int64_t) n_ops, evts.data(), - nullptr); - // if requests didn't complete - if (ret != (int64_t) n_ops) { + int64_t ret; + int64_t num_submitted = 0, submit_retry = 0; + while (num_submitted < n_ops) { + while ((ret = io_submit(ctx, n_ops - num_submitted, + cbs.data() + num_submitted)) < 0) { + if (-ret != EINTR) { std::stringstream err; - err << "io_getevents() failed; returned " << ret - << ", expected=" << n_ops << ", ernno=" << errno << "=" - << ::strerror(-ret) << ", try #" << n_tries + 1; + err << "Unknown error occur in io_submit, errno: " << -ret << ", " + << strerror(-ret); throw diskann::ANNException(err.str(), -1, __FUNCSIG__, __FILE__, __LINE__); + } + } + num_submitted += ret; + if (num_submitted < n_ops) { + submit_retry++; + if (submit_retry <= n_retries) { + LOG(WARNING) << "io_submit() failed; submit: " << num_submitted + << ", expected: " << n_ops + << ", retry: " << submit_retry; + } else { + std::stringstream err; + err << "io_submit failed after retried " << n_retries << " times"; + throw diskann::ANNException(err.str(), -1, __FUNCSIG__, __FILE__, + __LINE__); + } + } + } + + int64_t num_read = 0, read_retry = 0; + while (num_read < n_ops) { + while ((ret = io_getevents(ctx, n_ops - num_read, n_ops - num_read, + evts.data() + num_read, nullptr)) < 0) { + if (-ret != EINTR) { + std::stringstream err; + err << "Unknown error occur in io_getevents, errno: " << -ret + << ", " << strerror(-ret); + throw diskann::ANNException(err.str(), -1, __FUNCSIG__, __FILE__, + __LINE__); + } + } + num_read += ret; + if (num_read < n_ops) { + read_retry++; + if (read_retry <= n_retries) { + LOG(WARNING) << "io_getevents() failed; read: " << num_read + << ", expected: " << n_ops + << ", retry: " << read_retry; } else { - break; + std::stringstream err; + err << "io_getevents failed after retried " << n_retries + << " times"; + throw diskann::ANNException(err.str(), -1, __FUNCSIG__, __FILE__, + __LINE__); } } }