Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add label to io_threaded_fallbacks to categorize operations #108

Open
wants to merge 1 commit into
base: v24.3.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions src/core/file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ posix_file_impl::stat() noexcept {
struct stat st;
auto ret = ::fstat(fd, &st);
return wrap_syscall(ret, st);
}).then([] (syscall_result_extra<struct stat> ret) {
}, submit_reason::file_operation).then([] (syscall_result_extra<struct stat> ret) {
ret.throw_if_error();
return make_ready_future<struct stat>(ret.extra);
});
Expand All @@ -224,7 +224,7 @@ future<>
posix_file_impl::truncate(uint64_t length) noexcept {
return engine()._thread_pool->submit<syscall_result<int>>([this, length] {
return wrap_syscall<int>(::ftruncate(_fd, length));
}).then([] (syscall_result<int> sr) {
}, submit_reason::file_operation).then([] (syscall_result<int> sr) {
sr.throw_if_error();
return make_ready_future<>();
});
Expand All @@ -234,7 +234,7 @@ future<int>
posix_file_impl::ioctl(uint64_t cmd, void* argp) noexcept {
return engine()._thread_pool->submit<syscall_result<int>>([this, cmd, argp] () mutable {
return wrap_syscall<int>(::ioctl(_fd, cmd, argp));
}).then([] (syscall_result<int> sr) {
}, submit_reason::file_operation).then([] (syscall_result<int> sr) {
sr.throw_if_error();
// Some ioctls require to return a positive integer back.
return make_ready_future<int>(sr.result);
Expand All @@ -255,7 +255,7 @@ future<int>
posix_file_impl::fcntl(int op, uintptr_t arg) noexcept {
return engine()._thread_pool->submit<syscall_result<int>>([this, op, arg] () mutable {
return wrap_syscall<int>(::fcntl(_fd, op, arg));
}).then([] (syscall_result<int> sr) {
}, submit_reason::file_operation).then([] (syscall_result<int> sr) {
sr.throw_if_error();
// Some fcntls require to return a positive integer back.
return make_ready_future<int>(sr.result);
Expand All @@ -277,7 +277,7 @@ posix_file_impl::discard(uint64_t offset, uint64_t length) noexcept {
return engine()._thread_pool->submit<syscall_result<int>>([this, offset, length] () mutable {
return wrap_syscall<int>(::fallocate(_fd, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE,
offset, length));
}).then([] (syscall_result<int> sr) {
}, submit_reason::file_operation).then([] (syscall_result<int> sr) {
sr.throw_if_error();
return make_ready_future<>();
});
Expand All @@ -298,7 +298,7 @@ posix_file_impl::allocate(uint64_t position, uint64_t length) noexcept {
supported = false; // Racy, but harmless. At most we issue an extra call or two.
}
return wrap_syscall<int>(ret);
}).then([] (syscall_result<int> sr) {
}, submit_reason::file_operation).then([] (syscall_result<int> sr) {
sr.throw_if_error();
return make_ready_future<>();
});
Expand Down Expand Up @@ -340,7 +340,7 @@ posix_file_impl::close() noexcept {
try {
return engine()._thread_pool->submit<syscall_result<int>>([fd] {
return wrap_syscall<int>(::close(fd));
});
}, submit_reason::file_operation);
} catch (...) {
report_exception("Running ::close() in reactor thread, submission failed with exception", std::current_exception());
return make_ready_future<syscall_result<int>>(wrap_syscall<int>(::close(fd)));
Expand All @@ -362,7 +362,7 @@ blockdev_file_impl::size() noexcept {
uint64_t size;
int ret = ::ioctl(_fd, BLKGETSIZE64, &size);
return wrap_syscall(ret, size);
}).then([] (syscall_result_extra<uint64_t> ret) {
}, submit_reason::file_operation).then([] (syscall_result_extra<uint64_t> ret) {
ret.throw_if_error();
return make_ready_future<uint64_t>(ret.extra);
});
Expand Down Expand Up @@ -662,7 +662,7 @@ blockdev_file_impl::discard(uint64_t offset, uint64_t length) noexcept {
return engine()._thread_pool->submit<syscall_result<int>>([this, offset, length] () mutable {
uint64_t range[2] { offset, length };
return wrap_syscall<int>(::ioctl(_fd, BLKDISCARD, &range));
}).then([] (syscall_result<int> sr) {
}, submit_reason::file_operation).then([] (syscall_result<int> sr) {
sr.throw_if_error();
return make_ready_future<>();
});
Expand Down
59 changes: 35 additions & 24 deletions src/core/reactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2013,7 +2013,7 @@ reactor::open_file_dma(std::string_view nameref, open_flags flags, file_open_opt
}
close_fd.cancel();
return wrap_syscall(fd, st);
}).then([&options, name = std::move(name), &open_flags] (syscall_result_extra<struct stat> sr) {
}, submit_reason::file_operation).then([&options, name = std::move(name), &open_flags] (syscall_result_extra<struct stat> sr) {
sr.throw_fs_exception_if_error("open failed", name);
return make_file_impl(sr.result, options, open_flags, sr.extra);
}).then([] (shared_ptr<file_impl> impl) {
Expand All @@ -2028,7 +2028,7 @@ reactor::remove_file(std::string_view pathname) noexcept {
return futurize_invoke([this, pathname] {
return _thread_pool->submit<syscall_result<int>>([pathname = sstring(pathname)] {
return wrap_syscall<int>(::remove(pathname.c_str()));
}).then([pathname = sstring(pathname)] (syscall_result<int> sr) {
}, submit_reason::file_operation).then([pathname = sstring(pathname)] (syscall_result<int> sr) {
sr.throw_fs_exception_if_error("remove failed", pathname);
return make_ready_future<>();
});
Expand All @@ -2041,7 +2041,8 @@ reactor::rename_file(std::string_view old_pathname, std::string_view new_pathnam
return futurize_invoke([this, old_pathname, new_pathname] {
return _thread_pool->submit<syscall_result<int>>([old_pathname = sstring(old_pathname), new_pathname = sstring(new_pathname)] {
return wrap_syscall<int>(::rename(old_pathname.c_str(), new_pathname.c_str()));
}).then([old_pathname = sstring(old_pathname), new_pathname = sstring(new_pathname)] (syscall_result<int> sr) {
}, submit_reason::file_operation
).then([old_pathname = sstring(old_pathname), new_pathname = sstring(new_pathname)] (syscall_result<int> sr) {
sr.throw_fs_exception_if_error("rename failed", old_pathname, new_pathname);
return make_ready_future<>();
});
Expand All @@ -2054,7 +2055,7 @@ reactor::link_file(std::string_view oldpath, std::string_view newpath) noexcept
return futurize_invoke([this, oldpath, newpath] {
return _thread_pool->submit<syscall_result<int>>([oldpath = sstring(oldpath), newpath = sstring(newpath)] {
return wrap_syscall<int>(::link(oldpath.c_str(), newpath.c_str()));
}).then([oldpath = sstring(oldpath), newpath = sstring(newpath)] (syscall_result<int> sr) {
}, submit_reason::file_operation).then([oldpath = sstring(oldpath), newpath = sstring(newpath)] (syscall_result<int> sr) {
sr.throw_fs_exception_if_error("link failed", oldpath, newpath);
return make_ready_future<>();
});
Expand All @@ -2068,7 +2069,7 @@ reactor::chmod(std::string_view name, file_permissions permissions) noexcept {
return futurize_invoke([name, mode, this] {
return _thread_pool->submit<syscall_result<int>>([name = sstring(name), mode] {
return wrap_syscall<int>(::chmod(name.c_str(), mode));
}).then([name = sstring(name), mode] (syscall_result<int> sr) {
}, submit_reason::file_operation).then([name = sstring(name), mode] (syscall_result<int> sr) {
if (sr.result == -1) {
auto reason = format("chmod(0{:o}) failed", mode);
sr.throw_fs_exception(reason, fs::path(name));
Expand Down Expand Up @@ -2112,7 +2113,7 @@ reactor::file_type(std::string_view name, follow_symlink follow) noexcept {
auto stat_syscall = follow ? stat : lstat;
auto ret = stat_syscall(name.c_str(), &st);
return wrap_syscall(ret, st);
}).then([name = sstring(name)] (syscall_result_extra<struct stat> sr) {
}, submit_reason::file_operation).then([name = sstring(name)] (syscall_result_extra<struct stat> sr) {
if (long(sr.result) == -1) {
if (sr.error != ENOENT && sr.error != ENOTDIR) {
sr.throw_fs_exception_if_error("stat failed", name);
Expand Down Expand Up @@ -2142,7 +2143,7 @@ future<size_t> reactor::read_directory(int fd, char* buffer, size_t buffer_size)
return _thread_pool->submit<syscall_result<long>>([fd, buffer, buffer_size] () {
auto ret = ::syscall(__NR_getdents64, fd, reinterpret_cast<linux_dirent64*>(buffer), buffer_size);
return wrap_syscall(ret);
}).then([] (syscall_result<long> ret) {
}, submit_reason::file_operation).then([] (syscall_result<long> ret) {
ret.throw_if_error();
return make_ready_future<size_t>(ret.result);
});
Expand All @@ -2155,7 +2156,7 @@ reactor::inotify_add_watch(int fd, std::string_view path, uint32_t flags) {
return _thread_pool->submit<syscall_result<int>>([fd, path = sstring(path), flags] {
auto ret = ::inotify_add_watch(fd, path.c_str(), flags);
return wrap_syscall(ret);
}).then([] (syscall_result<int> ret) {
}, submit_reason::file_operation).then([] (syscall_result<int> ret) {
ret.throw_if_error();
return make_ready_future<int>(ret.result);
});
Expand Down Expand Up @@ -2257,7 +2258,7 @@ reactor::spawn(std::string_view pathname,
return wrap_syscall<int>(::posix_spawn(&child_pid, pathname.c_str(), &actions, &attr,
const_cast<char* const *>(argv.data()),
const_cast<char* const *>(env.data())));
});
}, submit_reason::process_operation);
}).finally([&actions, &attr] {
posix_spawn_file_actions_destroy(&actions);
posix_spawnattr_destroy(&attr);
Expand Down Expand Up @@ -2295,7 +2296,7 @@ static auto next_waitpid_timeout(std::chrono::milliseconds this_timeout) {
future<int> reactor::waitpid(pid_t pid) {
return _thread_pool->submit<syscall_result<int>>([pid] {
return wrap_syscall<int>(syscall(__NR_pidfd_open, pid, O_NONBLOCK));
}).then([pid, this] (syscall_result<int> pidfd) {
}, submit_reason::process_operation).then([pid, this] (syscall_result<int> pidfd) {
if (pidfd.result == -1) {
// pidfd_open() was introduced in linux 5.3, so the pidfd.error could be ENOSYS on
// older kernels. But it could be other error like EMFILE or ENFILE. anyway, we
Expand All @@ -2308,7 +2309,7 @@ future<int> reactor::waitpid(pid_t pid) {
&wait_timeout] {
return _thread_pool->submit<syscall_result<pid_t>>([pid, &wstatus] {
return wrap_syscall<pid_t>(::waitpid(pid, &wstatus, WNOHANG));
}).then([&wstatus, &wait_timeout] (syscall_result<pid_t> ret) mutable {
}, submit_reason::process_operation).then([&wstatus, &wait_timeout] (syscall_result<pid_t> ret) mutable {
if (ret.result == 0) {
wait_timeout = next_waitpid_timeout(wait_timeout);
return ::seastar::sleep(wait_timeout).then([] {
Expand All @@ -2328,7 +2329,7 @@ future<int> reactor::waitpid(pid_t pid) {
return pidfd.readable().then([pid, &wstatus, this] {
return _thread_pool->submit<syscall_result<pid_t>>([pid, &wstatus] {
return wrap_syscall<pid_t>(::waitpid(pid, &wstatus, WNOHANG));
});
}, submit_reason::process_operation);
}).then([&wstatus] (syscall_result<pid_t> ret) {
ret.throw_if_error();
assert(ret.result > 0);
Expand All @@ -2353,7 +2354,7 @@ reactor::file_stat(std::string_view pathname, follow_symlink follow) noexcept {
auto stat_syscall = follow ? stat : lstat;
auto ret = stat_syscall(pathname.c_str(), &st);
return wrap_syscall(ret, st);
}).then([pathname = sstring(pathname)] (syscall_result_extra<struct stat> sr) {
}, submit_reason::file_operation).then([pathname = sstring(pathname)] (syscall_result_extra<struct stat> sr) {
sr.throw_fs_exception_if_error("stat failed", pathname);
struct stat& st = sr.extra;
stat_data sd;
Expand Down Expand Up @@ -2391,7 +2392,7 @@ reactor::file_accessible(std::string_view pathname, access_flags flags) noexcept
auto aflags = std::underlying_type_t<access_flags>(flags);
auto ret = ::access(pathname.c_str(), aflags);
return wrap_syscall(ret);
}).then([pathname = sstring(pathname), flags] (syscall_result<int> sr) {
}, submit_reason::file_operation).then([pathname = sstring(pathname), flags] (syscall_result<int> sr) {
if (sr.result < 0) {
if ((sr.error == ENOENT && flags == access_flags::exists) ||
(sr.error == EACCES && flags != access_flags::exists)) {
Expand All @@ -2413,7 +2414,7 @@ reactor::file_system_at(std::string_view pathname) noexcept {
struct statfs st;
auto ret = statfs(pathname.c_str(), &st);
return wrap_syscall(ret, st);
}).then([pathname = sstring(pathname)] (syscall_result_extra<struct statfs> sr) {
}, submit_reason::file_operation).then([pathname = sstring(pathname)] (syscall_result_extra<struct statfs> sr) {
static std::unordered_map<long int, fs_type> type_mapper = {
{ internal::fs_magic::xfs, fs_type::xfs },
{ internal::fs_magic::ext2, fs_type::ext2 },
Expand All @@ -2440,7 +2441,7 @@ reactor::fstatfs(int fd) noexcept {
struct statfs st;
auto ret = ::fstatfs(fd, &st);
return wrap_syscall(ret, st);
}).then([] (syscall_result_extra<struct statfs> sr) {
}, submit_reason::file_operation).then([] (syscall_result_extra<struct statfs> sr) {
sr.throw_if_error();
struct statfs st = sr.extra;
return make_ready_future<struct statfs>(std::move(st));
Expand All @@ -2455,7 +2456,7 @@ reactor::statvfs(std::string_view pathname) noexcept {
struct statvfs st;
auto ret = ::statvfs(pathname.c_str(), &st);
return wrap_syscall(ret, st);
}).then([pathname = sstring(pathname)] (syscall_result_extra<struct statvfs> sr) {
}, submit_reason::file_operation).then([pathname = sstring(pathname)] (syscall_result_extra<struct statvfs> sr) {
sr.throw_fs_exception_if_error("statvfs failed", pathname);
struct statvfs st = sr.extra;
return make_ready_future<struct statvfs>(std::move(st));
Expand All @@ -2479,7 +2480,7 @@ reactor::open_directory(std::string_view name) noexcept {
}
}
return wrap_syscall(fd, st);
}).then([name = sstring(name), oflags] (syscall_result_extra<struct stat> sr) {
}, submit_reason::file_operation).then([name = sstring(name), oflags] (syscall_result_extra<struct stat> sr) {
sr.throw_fs_exception_if_error("open failed", name);
return make_file_impl(sr.result, file_open_options(), oflags, sr.extra);
}).then([] (shared_ptr<file_impl> file_impl) {
Expand All @@ -2495,7 +2496,7 @@ reactor::make_directory(std::string_view name, file_permissions permissions) noe
return _thread_pool->submit<syscall_result<int>>([name = sstring(name), permissions] {
auto mode = static_cast<mode_t>(permissions);
return wrap_syscall<int>(::mkdir(name.c_str(), mode));
}).then([name = sstring(name)] (syscall_result<int> sr) {
}, submit_reason::file_operation).then([name = sstring(name)] (syscall_result<int> sr) {
sr.throw_fs_exception_if_error("mkdir failed", name);
});
});
Expand All @@ -2508,7 +2509,7 @@ reactor::touch_directory(std::string_view name, file_permissions permissions) no
return _thread_pool->submit<syscall_result<int>>([name = sstring(name), permissions] {
auto mode = static_cast<mode_t>(permissions);
return wrap_syscall<int>(::mkdir(name.c_str(), mode));
}).then([name = sstring(name)] (syscall_result<int> sr) {
}, submit_reason::file_operation).then([name = sstring(name)] (syscall_result<int> sr) {
if (sr.result == -1 && sr.error != EEXIST) {
sr.throw_fs_exception("mkdir failed", fs::path(name));
}
Expand Down Expand Up @@ -2553,7 +2554,7 @@ reactor::fdatasync(int fd) noexcept {
}
return _thread_pool->submit<syscall_result<int>>([fd] {
return wrap_syscall<int>(::fdatasync(fd));
}).then([] (syscall_result<int> sr) {
}, submit_reason::file_operation).then([] (syscall_result<int> sr) {
sr.throw_if_error();
return make_ready_future<>();
});
Expand Down Expand Up @@ -2700,6 +2701,12 @@ void reactor::register_metrics() {

namespace sm = seastar::metrics;

auto io_fallback_counter = [this](const sstring& reason_str, submit_reason r) {
static auto reason_label = sm::label("reason");
return sm::make_counter("io_threaded_fallbacks", std::bind(&thread_pool::count, _thread_pool.get(), r),
sm::description("Total number of io-threaded-fallbacks operations"), { reason_label(reason_str), });
};

_metric_groups.add_group("reactor", {
sm::make_gauge("tasks_pending", std::bind(&reactor::pending_task_count, this), sm::description("Number of pending tasks in the queue")),
// total_operations value:DERIVE:0:U
Expand All @@ -2725,9 +2732,13 @@ void reactor::register_metrics() {
// total_operations value:DERIVE:0:U
sm::make_counter("fsyncs", _fsyncs, sm::description("Total number of fsync operations")),
// total_operations value:DERIVE:0:U
sm::make_counter("io_threaded_fallbacks", std::bind(&thread_pool::operation_count, _thread_pool.get()),
sm::description("Total number of io-threaded-fallbacks operations")),

io_fallback_counter("aio_fallback", submit_reason::aio_fallback),
// total_operations value:DERIVE:0:U
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you even understand these comments? I sure don't.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

io_fallback_counter("file_operation", submit_reason::file_operation),
// total_operations value:DERIVE:0:U
io_fallback_counter("process_operation", submit_reason::process_operation),
// total_operations value:DERIVE:0:U
io_fallback_counter("unknown", submit_reason::unknown),
});

_metric_groups.add_group("memory", {
Expand Down
2 changes: 1 addition & 1 deletion src/core/reactor_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ void aio_storage_context::schedule_retry() {
return _r._thread_pool->submit<syscall_result<int>>([this] () mutable {
auto r = io_submit(_io_context, _aio_retries.size(), _aio_retries.data());
return wrap_syscall<int>(r);
}).then_wrapped([this] (future<syscall_result<int>> f) {
}, submit_reason::aio_fallback).then_wrapped([this] (future<syscall_result<int>> f) {
// If submit failed, just log the error and exit the loop.
// The next call to submit_work will call schedule_retry again.
if (f.failed()) {
Expand Down
Loading