Skip to content

Commit

Permalink
core: add label to io_threaded_fallbacks to categorize operations
Browse files Browse the repository at this point in the history
  • Loading branch information
ballard26 committed Oct 19, 2024
1 parent 4a3406c commit f538975
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 39 deletions.
18 changes: 9 additions & 9 deletions src/core/file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ posix_file_impl::stat() noexcept {
struct stat st;
auto ret = ::fstat(fd, &st);
return wrap_syscall(ret, st);
}).then([] (syscall_result_extra<struct stat> ret) {
}, submit_reason::file_operation).then([] (syscall_result_extra<struct stat> ret) {
ret.throw_if_error();
return make_ready_future<struct stat>(ret.extra);
});
Expand All @@ -224,7 +224,7 @@ future<>
posix_file_impl::truncate(uint64_t length) noexcept {
return engine()._thread_pool->submit<syscall_result<int>>([this, length] {
return wrap_syscall<int>(::ftruncate(_fd, length));
}).then([] (syscall_result<int> sr) {
}, submit_reason::file_operation).then([] (syscall_result<int> sr) {
sr.throw_if_error();
return make_ready_future<>();
});
Expand All @@ -234,7 +234,7 @@ future<int>
posix_file_impl::ioctl(uint64_t cmd, void* argp) noexcept {
return engine()._thread_pool->submit<syscall_result<int>>([this, cmd, argp] () mutable {
return wrap_syscall<int>(::ioctl(_fd, cmd, argp));
}).then([] (syscall_result<int> sr) {
}, submit_reason::file_operation).then([] (syscall_result<int> sr) {
sr.throw_if_error();
// Some ioctls require to return a positive integer back.
return make_ready_future<int>(sr.result);
Expand All @@ -255,7 +255,7 @@ future<int>
posix_file_impl::fcntl(int op, uintptr_t arg) noexcept {
return engine()._thread_pool->submit<syscall_result<int>>([this, op, arg] () mutable {
return wrap_syscall<int>(::fcntl(_fd, op, arg));
}).then([] (syscall_result<int> sr) {
}, submit_reason::file_operation).then([] (syscall_result<int> sr) {
sr.throw_if_error();
// Some fcntls require to return a positive integer back.
return make_ready_future<int>(sr.result);
Expand All @@ -277,7 +277,7 @@ posix_file_impl::discard(uint64_t offset, uint64_t length) noexcept {
return engine()._thread_pool->submit<syscall_result<int>>([this, offset, length] () mutable {
return wrap_syscall<int>(::fallocate(_fd, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE,
offset, length));
}).then([] (syscall_result<int> sr) {
}, submit_reason::file_operation).then([] (syscall_result<int> sr) {
sr.throw_if_error();
return make_ready_future<>();
});
Expand All @@ -298,7 +298,7 @@ posix_file_impl::allocate(uint64_t position, uint64_t length) noexcept {
supported = false; // Racy, but harmless. At most we issue an extra call or two.
}
return wrap_syscall<int>(ret);
}).then([] (syscall_result<int> sr) {
}, submit_reason::file_operation).then([] (syscall_result<int> sr) {
sr.throw_if_error();
return make_ready_future<>();
});
Expand Down Expand Up @@ -340,7 +340,7 @@ posix_file_impl::close() noexcept {
try {
return engine()._thread_pool->submit<syscall_result<int>>([fd] {
return wrap_syscall<int>(::close(fd));
});
}, submit_reason::file_operation);
} catch (...) {
report_exception("Running ::close() in reactor thread, submission failed with exception", std::current_exception());
return make_ready_future<syscall_result<int>>(wrap_syscall<int>(::close(fd)));
Expand All @@ -362,7 +362,7 @@ blockdev_file_impl::size() noexcept {
uint64_t size;
int ret = ::ioctl(_fd, BLKGETSIZE64, &size);
return wrap_syscall(ret, size);
}).then([] (syscall_result_extra<uint64_t> ret) {
}, submit_reason::file_operation).then([] (syscall_result_extra<uint64_t> ret) {
ret.throw_if_error();
return make_ready_future<uint64_t>(ret.extra);
});
Expand Down Expand Up @@ -662,7 +662,7 @@ blockdev_file_impl::discard(uint64_t offset, uint64_t length) noexcept {
return engine()._thread_pool->submit<syscall_result<int>>([this, offset, length] () mutable {
uint64_t range[2] { offset, length };
return wrap_syscall<int>(::ioctl(_fd, BLKDISCARD, &range));
}).then([] (syscall_result<int> sr) {
}, submit_reason::file_operation).then([] (syscall_result<int> sr) {
sr.throw_if_error();
return make_ready_future<>();
});
Expand Down
59 changes: 35 additions & 24 deletions src/core/reactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2013,7 +2013,7 @@ reactor::open_file_dma(std::string_view nameref, open_flags flags, file_open_opt
}
close_fd.cancel();
return wrap_syscall(fd, st);
}).then([&options, name = std::move(name), &open_flags] (syscall_result_extra<struct stat> sr) {
}, submit_reason::file_operation).then([&options, name = std::move(name), &open_flags] (syscall_result_extra<struct stat> sr) {
sr.throw_fs_exception_if_error("open failed", name);
return make_file_impl(sr.result, options, open_flags, sr.extra);
}).then([] (shared_ptr<file_impl> impl) {
Expand All @@ -2028,7 +2028,7 @@ reactor::remove_file(std::string_view pathname) noexcept {
return futurize_invoke([this, pathname] {
return _thread_pool->submit<syscall_result<int>>([pathname = sstring(pathname)] {
return wrap_syscall<int>(::remove(pathname.c_str()));
}).then([pathname = sstring(pathname)] (syscall_result<int> sr) {
}, submit_reason::file_operation).then([pathname = sstring(pathname)] (syscall_result<int> sr) {
sr.throw_fs_exception_if_error("remove failed", pathname);
return make_ready_future<>();
});
Expand All @@ -2041,7 +2041,8 @@ reactor::rename_file(std::string_view old_pathname, std::string_view new_pathnam
return futurize_invoke([this, old_pathname, new_pathname] {
return _thread_pool->submit<syscall_result<int>>([old_pathname = sstring(old_pathname), new_pathname = sstring(new_pathname)] {
return wrap_syscall<int>(::rename(old_pathname.c_str(), new_pathname.c_str()));
}).then([old_pathname = sstring(old_pathname), new_pathname = sstring(new_pathname)] (syscall_result<int> sr) {
}, submit_reason::file_operation
).then([old_pathname = sstring(old_pathname), new_pathname = sstring(new_pathname)] (syscall_result<int> sr) {
sr.throw_fs_exception_if_error("rename failed", old_pathname, new_pathname);
return make_ready_future<>();
});
Expand All @@ -2054,7 +2055,7 @@ reactor::link_file(std::string_view oldpath, std::string_view newpath) noexcept
return futurize_invoke([this, oldpath, newpath] {
return _thread_pool->submit<syscall_result<int>>([oldpath = sstring(oldpath), newpath = sstring(newpath)] {
return wrap_syscall<int>(::link(oldpath.c_str(), newpath.c_str()));
}).then([oldpath = sstring(oldpath), newpath = sstring(newpath)] (syscall_result<int> sr) {
}, submit_reason::file_operation).then([oldpath = sstring(oldpath), newpath = sstring(newpath)] (syscall_result<int> sr) {
sr.throw_fs_exception_if_error("link failed", oldpath, newpath);
return make_ready_future<>();
});
Expand All @@ -2068,7 +2069,7 @@ reactor::chmod(std::string_view name, file_permissions permissions) noexcept {
return futurize_invoke([name, mode, this] {
return _thread_pool->submit<syscall_result<int>>([name = sstring(name), mode] {
return wrap_syscall<int>(::chmod(name.c_str(), mode));
}).then([name = sstring(name), mode] (syscall_result<int> sr) {
}, submit_reason::file_operation).then([name = sstring(name), mode] (syscall_result<int> sr) {
if (sr.result == -1) {
auto reason = format("chmod(0{:o}) failed", mode);
sr.throw_fs_exception(reason, fs::path(name));
Expand Down Expand Up @@ -2112,7 +2113,7 @@ reactor::file_type(std::string_view name, follow_symlink follow) noexcept {
auto stat_syscall = follow ? stat : lstat;
auto ret = stat_syscall(name.c_str(), &st);
return wrap_syscall(ret, st);
}).then([name = sstring(name)] (syscall_result_extra<struct stat> sr) {
}, submit_reason::file_operation).then([name = sstring(name)] (syscall_result_extra<struct stat> sr) {
if (long(sr.result) == -1) {
if (sr.error != ENOENT && sr.error != ENOTDIR) {
sr.throw_fs_exception_if_error("stat failed", name);
Expand Down Expand Up @@ -2142,7 +2143,7 @@ future<size_t> reactor::read_directory(int fd, char* buffer, size_t buffer_size)
return _thread_pool->submit<syscall_result<long>>([fd, buffer, buffer_size] () {
auto ret = ::syscall(__NR_getdents64, fd, reinterpret_cast<linux_dirent64*>(buffer), buffer_size);
return wrap_syscall(ret);
}).then([] (syscall_result<long> ret) {
}, submit_reason::file_operation).then([] (syscall_result<long> ret) {
ret.throw_if_error();
return make_ready_future<size_t>(ret.result);
});
Expand All @@ -2155,7 +2156,7 @@ reactor::inotify_add_watch(int fd, std::string_view path, uint32_t flags) {
return _thread_pool->submit<syscall_result<int>>([fd, path = sstring(path), flags] {
auto ret = ::inotify_add_watch(fd, path.c_str(), flags);
return wrap_syscall(ret);
}).then([] (syscall_result<int> ret) {
}, submit_reason::file_operation).then([] (syscall_result<int> ret) {
ret.throw_if_error();
return make_ready_future<int>(ret.result);
});
Expand Down Expand Up @@ -2257,7 +2258,7 @@ reactor::spawn(std::string_view pathname,
return wrap_syscall<int>(::posix_spawn(&child_pid, pathname.c_str(), &actions, &attr,
const_cast<char* const *>(argv.data()),
const_cast<char* const *>(env.data())));
});
}, submit_reason::process_operation);
}).finally([&actions, &attr] {
posix_spawn_file_actions_destroy(&actions);
posix_spawnattr_destroy(&attr);
Expand Down Expand Up @@ -2295,7 +2296,7 @@ static auto next_waitpid_timeout(std::chrono::milliseconds this_timeout) {
future<int> reactor::waitpid(pid_t pid) {
return _thread_pool->submit<syscall_result<int>>([pid] {
return wrap_syscall<int>(syscall(__NR_pidfd_open, pid, O_NONBLOCK));
}).then([pid, this] (syscall_result<int> pidfd) {
}, submit_reason::process_operation).then([pid, this] (syscall_result<int> pidfd) {
if (pidfd.result == -1) {
// pidfd_open() was introduced in linux 5.3, so the pidfd.error could be ENOSYS on
// older kernels. But it could be other error like EMFILE or ENFILE. anyway, we
Expand All @@ -2308,7 +2309,7 @@ future<int> reactor::waitpid(pid_t pid) {
&wait_timeout] {
return _thread_pool->submit<syscall_result<pid_t>>([pid, &wstatus] {
return wrap_syscall<pid_t>(::waitpid(pid, &wstatus, WNOHANG));
}).then([&wstatus, &wait_timeout] (syscall_result<pid_t> ret) mutable {
}, submit_reason::process_operation).then([&wstatus, &wait_timeout] (syscall_result<pid_t> ret) mutable {
if (ret.result == 0) {
wait_timeout = next_waitpid_timeout(wait_timeout);
return ::seastar::sleep(wait_timeout).then([] {
Expand All @@ -2328,7 +2329,7 @@ future<int> reactor::waitpid(pid_t pid) {
return pidfd.readable().then([pid, &wstatus, this] {
return _thread_pool->submit<syscall_result<pid_t>>([pid, &wstatus] {
return wrap_syscall<pid_t>(::waitpid(pid, &wstatus, WNOHANG));
});
}, submit_reason::process_operation);
}).then([&wstatus] (syscall_result<pid_t> ret) {
ret.throw_if_error();
assert(ret.result > 0);
Expand All @@ -2353,7 +2354,7 @@ reactor::file_stat(std::string_view pathname, follow_symlink follow) noexcept {
auto stat_syscall = follow ? stat : lstat;
auto ret = stat_syscall(pathname.c_str(), &st);
return wrap_syscall(ret, st);
}).then([pathname = sstring(pathname)] (syscall_result_extra<struct stat> sr) {
}, submit_reason::file_operation).then([pathname = sstring(pathname)] (syscall_result_extra<struct stat> sr) {
sr.throw_fs_exception_if_error("stat failed", pathname);
struct stat& st = sr.extra;
stat_data sd;
Expand Down Expand Up @@ -2391,7 +2392,7 @@ reactor::file_accessible(std::string_view pathname, access_flags flags) noexcept
auto aflags = std::underlying_type_t<access_flags>(flags);
auto ret = ::access(pathname.c_str(), aflags);
return wrap_syscall(ret);
}).then([pathname = sstring(pathname), flags] (syscall_result<int> sr) {
}, submit_reason::file_operation).then([pathname = sstring(pathname), flags] (syscall_result<int> sr) {
if (sr.result < 0) {
if ((sr.error == ENOENT && flags == access_flags::exists) ||
(sr.error == EACCES && flags != access_flags::exists)) {
Expand All @@ -2413,7 +2414,7 @@ reactor::file_system_at(std::string_view pathname) noexcept {
struct statfs st;
auto ret = statfs(pathname.c_str(), &st);
return wrap_syscall(ret, st);
}).then([pathname = sstring(pathname)] (syscall_result_extra<struct statfs> sr) {
}, submit_reason::file_operation).then([pathname = sstring(pathname)] (syscall_result_extra<struct statfs> sr) {
static std::unordered_map<long int, fs_type> type_mapper = {
{ internal::fs_magic::xfs, fs_type::xfs },
{ internal::fs_magic::ext2, fs_type::ext2 },
Expand All @@ -2440,7 +2441,7 @@ reactor::fstatfs(int fd) noexcept {
struct statfs st;
auto ret = ::fstatfs(fd, &st);
return wrap_syscall(ret, st);
}).then([] (syscall_result_extra<struct statfs> sr) {
}, submit_reason::file_operation).then([] (syscall_result_extra<struct statfs> sr) {
sr.throw_if_error();
struct statfs st = sr.extra;
return make_ready_future<struct statfs>(std::move(st));
Expand All @@ -2455,7 +2456,7 @@ reactor::statvfs(std::string_view pathname) noexcept {
struct statvfs st;
auto ret = ::statvfs(pathname.c_str(), &st);
return wrap_syscall(ret, st);
}).then([pathname = sstring(pathname)] (syscall_result_extra<struct statvfs> sr) {
}, submit_reason::file_operation).then([pathname = sstring(pathname)] (syscall_result_extra<struct statvfs> sr) {
sr.throw_fs_exception_if_error("statvfs failed", pathname);
struct statvfs st = sr.extra;
return make_ready_future<struct statvfs>(std::move(st));
Expand All @@ -2479,7 +2480,7 @@ reactor::open_directory(std::string_view name) noexcept {
}
}
return wrap_syscall(fd, st);
}).then([name = sstring(name), oflags] (syscall_result_extra<struct stat> sr) {
}, submit_reason::file_operation).then([name = sstring(name), oflags] (syscall_result_extra<struct stat> sr) {
sr.throw_fs_exception_if_error("open failed", name);
return make_file_impl(sr.result, file_open_options(), oflags, sr.extra);
}).then([] (shared_ptr<file_impl> file_impl) {
Expand All @@ -2495,7 +2496,7 @@ reactor::make_directory(std::string_view name, file_permissions permissions) noe
return _thread_pool->submit<syscall_result<int>>([name = sstring(name), permissions] {
auto mode = static_cast<mode_t>(permissions);
return wrap_syscall<int>(::mkdir(name.c_str(), mode));
}).then([name = sstring(name)] (syscall_result<int> sr) {
}, submit_reason::file_operation).then([name = sstring(name)] (syscall_result<int> sr) {
sr.throw_fs_exception_if_error("mkdir failed", name);
});
});
Expand All @@ -2508,7 +2509,7 @@ reactor::touch_directory(std::string_view name, file_permissions permissions) no
return _thread_pool->submit<syscall_result<int>>([name = sstring(name), permissions] {
auto mode = static_cast<mode_t>(permissions);
return wrap_syscall<int>(::mkdir(name.c_str(), mode));
}).then([name = sstring(name)] (syscall_result<int> sr) {
}, submit_reason::file_operation).then([name = sstring(name)] (syscall_result<int> sr) {
if (sr.result == -1 && sr.error != EEXIST) {
sr.throw_fs_exception("mkdir failed", fs::path(name));
}
Expand Down Expand Up @@ -2553,7 +2554,7 @@ reactor::fdatasync(int fd) noexcept {
}
return _thread_pool->submit<syscall_result<int>>([fd] {
return wrap_syscall<int>(::fdatasync(fd));
}).then([] (syscall_result<int> sr) {
}, submit_reason::file_operation).then([] (syscall_result<int> sr) {
sr.throw_if_error();
return make_ready_future<>();
});
Expand Down Expand Up @@ -2700,6 +2701,12 @@ void reactor::register_metrics() {

namespace sm = seastar::metrics;

auto io_fallback_counter = [this](const sstring& reason_str, submit_reason r) {
static auto reason_label = sm::label("reason");
return sm::make_counter("io_threaded_fallbacks", std::bind(&thread_pool::count, _thread_pool.get(), r),
sm::description("Total number of io-threaded-fallbacks operations"), { reason_label(reason_str), });
};

_metric_groups.add_group("reactor", {
sm::make_gauge("tasks_pending", std::bind(&reactor::pending_task_count, this), sm::description("Number of pending tasks in the queue")),
// total_operations value:DERIVE:0:U
Expand All @@ -2725,9 +2732,13 @@ void reactor::register_metrics() {
// total_operations value:DERIVE:0:U
sm::make_counter("fsyncs", _fsyncs, sm::description("Total number of fsync operations")),
// total_operations value:DERIVE:0:U
sm::make_counter("io_threaded_fallbacks", std::bind(&thread_pool::operation_count, _thread_pool.get()),
sm::description("Total number of io-threaded-fallbacks operations")),

io_fallback_counter("aio_fallback", submit_reason::aio_fallback),
// total_operations value:DERIVE:0:U
io_fallback_counter("file_operation", submit_reason::file_operation),
// total_operations value:DERIVE:0:U
io_fallback_counter("process_operation", submit_reason::process_operation),
// total_operations value:DERIVE:0:U
io_fallback_counter("unknown", submit_reason::unknown),
});

_metric_groups.add_group("memory", {
Expand Down
2 changes: 1 addition & 1 deletion src/core/reactor_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ void aio_storage_context::schedule_retry() {
return _r._thread_pool->submit<syscall_result<int>>([this] () mutable {
auto r = io_submit(_io_context, _aio_retries.size(), _aio_retries.data());
return wrap_syscall<int>(r);
}).then_wrapped([this] (future<syscall_result<int>> f) {
}, submit_reason::aio_fallback).then_wrapped([this] (future<syscall_result<int>> f) {
// If submit failed, just log the error and exit the loop.
// The next call to submit_work will call schedule_retry again.
if (f.failed()) {
Expand Down
Loading

0 comments on commit f538975

Please sign in to comment.