diff --git a/include/cppcoro/detail/linux.hpp b/include/cppcoro/detail/linux.hpp index 562e8f31..3346bfd2 100644 --- a/include/cppcoro/detail/linux.hpp +++ b/include/cppcoro/detail/linux.hpp @@ -87,6 +87,9 @@ namespace cppcoro { using callback_type = void(io_state* state); callback_type* m_callback; + io_state(callback_type* callback) noexcept + : m_callback(callback) + {} }; class message_queue @@ -99,6 +102,8 @@ namespace cppcoro public: message_queue(); ~message_queue(); + void add_fd_watch(int fd, void* cb, uint32_t events); + void remove_fd_watch(int fd); bool enqueue_message(void* message, message_type type); bool dequeue_message(void*& message, message_type& type, bool wait); }; @@ -106,6 +111,12 @@ namespace cppcoro safe_fd create_event_fd(); safe_fd create_timer_fd(); safe_fd create_epoll_fd(); + + struct safe_file_data + { + safe_fd fd; + message_queue* mq; + }; } // namespace linux } // namespace detail } // namespace cppcoro diff --git a/include/cppcoro/detail/linux_async_operation.hpp b/include/cppcoro/detail/linux_async_operation.hpp new file mode 100644 index 00000000..ec143245 --- /dev/null +++ b/include/cppcoro/detail/linux_async_operation.hpp @@ -0,0 +1,317 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright (c) Lewis Baker +// Licenced under MIT license. See LICENSE.txt for details. +/////////////////////////////////////////////////////////////////////////////// +#ifndef CPPCORO_DETAIL_LINUX_ASYNC_OPERATION_HPP_INCLUDED +#define CPPCORO_DETAIL_LINUX_ASYNC_OPERATION_HPP_INCLUDED + +#include +#include +#include + +#include + +#include +#include +#include +#include + +namespace cppcoro +{ + namespace detail + { + class linux_async_operation_base + : protected detail::linux::io_state + { + public: + + linux_async_operation_base( + detail::linux::io_state::callback_type* callback, + detail::linux::message_queue* mq) noexcept + : detail::linux::io_state(callback) + , m_mq(mq) + {} + + std::size_t get_result() + { + if (m_res < 0) + { + throw std::system_error{ + -m_res, + std::system_category() + }; + } + + return m_res; + } + + std::int32_t m_res; + std::function m_completeFunc; + detail::linux::message_queue* m_mq; + }; + + template + class linux_async_operation + : protected linux_async_operation_base + { + protected: + + linux_async_operation(detail::linux::message_queue* mq) noexcept + : linux_async_operation_base( + &linux_async_operation::on_operation_completed, mq) + {} + + public: + + bool await_ready() const noexcept { return false; } + + CPPCORO_NOINLINE + bool await_suspend(cppcoro::coroutine_handle<> awaitingCoroutine) + { + static_assert(std::is_base_of_v); + + m_awaitingCoroutine = awaitingCoroutine; + return static_cast(this)->try_start(); + } + + decltype(auto) await_resume() + { + return static_cast(this)->get_result(); + } + + private: + + static void on_operation_completed( + detail::linux::io_state* ioState) noexcept + { + auto* operation = static_cast(ioState); + operation->m_res = operation->m_completeFunc(); + operation->m_awaitingCoroutine.resume(); + } + + cppcoro::coroutine_handle<> m_awaitingCoroutine; + + }; + + template + class linux_async_operation_cancellable + : protected linux_async_operation_base + { + static constexpr int error_operation_cancelled = ECANCELED; + + protected: + + linux_async_operation_cancellable( + detail::linux::message_queue* mq, + cancellation_token&& ct) noexcept + : linux_async_operation_base( + &linux_async_operation_cancellable::on_operation_completed, mq) + , m_state(ct.is_cancellation_requested() ? state::completed : state::not_started) + , m_cancellationToken(std::move(ct)) + { + m_res = -error_operation_cancelled; + } + + linux_async_operation_cancellable( + linux_async_operation_cancellable&& other) noexcept + : linux_async_operation_base(std::move(other)) + , m_state(other.m_state.load(std::memory_order_relaxed)) + , m_cancellationToken(std::move(other.m_cancellationToken)) + { + assert(m_res == other.m_res); + } + + public: + + bool await_ready() const noexcept + { + return m_state.load(std::memory_order_relaxed) == state::completed; + } + + CPPCORO_NOINLINE + bool await_suspend(cppcoro::coroutine_handle<> awaitingCoroutine) + { + static_assert(std::is_base_of_v); + + m_awaitingCoroutine = awaitingCoroutine; + + // TRICKY: Register cancellation callback before starting the operation + // in case the callback registration throws due to insufficient + // memory. We need to make sure that the logic that occurs after + // starting the operation is noexcept, otherwise we run into the + // problem of not being able to cancel the started operation and + // the dilemma of what to do with the exception. + // + // However, doing this means that the cancellation callback may run + // prior to returning below so in the case that cancellation may + // occur we defer setting the state to 'started' until after + // the operation has finished starting. The cancellation callback + // will only attempt to request cancellation of the operation with + // CancelIoEx() once the state has been set to 'started'. + const bool canBeCancelled = m_cancellationToken.can_be_cancelled(); + if (canBeCancelled) + { + m_cancellationCallback.emplace( + std::move(m_cancellationToken), + [this] { this->on_cancellation_requested(); }); + } + else + { + m_state.store(state::started, std::memory_order_relaxed); + } + + // Now start the operation. + const bool willCompleteAsynchronously = static_cast(this)->try_start(); + if (!willCompleteAsynchronously) + { + // Operation completed synchronously, resume awaiting coroutine immediately. + return false; + } + + if (canBeCancelled) + { + // Need to flag that the operation has finished starting now. + + // However, the operation may have completed concurrently on + // another thread, transitioning directly from not_started -> complete. + // Or it may have had the cancellation callback execute and transition + // from not_started -> cancellation_requested. We use a compare-exchange + // to determine a winner between these potential racing cases. + state oldState = state::not_started; + if (!m_state.compare_exchange_strong( + oldState, + state::started, + std::memory_order_release, + std::memory_order_acquire)) + { + if (oldState == state::cancellation_requested) + { + // Request the operation be cancelled. + // Note that it may have already completed on a background + // thread by now so this request for cancellation may end up + // being ignored. + static_cast(this)->cancel(); + + if (!m_state.compare_exchange_strong( + oldState, + state::started, + std::memory_order_release, + std::memory_order_acquire)) + { + assert(oldState == state::completed); + return false; + } + } + else + { + assert(oldState == state::completed); + return false; + } + } + } + + return true; + } + + decltype(auto) await_resume() + { + // Free memory used by the cancellation callback now that the operation + // has completed rather than waiting until the operation object destructs. + // eg. If the operation is passed to when_all() then the operation object + // may not be destructed until all of the operations complete. + m_cancellationCallback.reset(); + + if (m_res == -error_operation_cancelled) + { + throw operation_cancelled{}; + } + + return static_cast(this)->get_result(); + } + + private: + + enum class state + { + not_started, + started, + cancellation_requested, + completed + }; + + void on_cancellation_requested() noexcept + { + auto oldState = m_state.load(std::memory_order_acquire); + if (oldState == state::not_started) + { + // This callback is running concurrently with await_suspend(). + // The call to start the operation may not have returned yet so + // we can't safely request cancellation of it. Instead we try to + // notify the await_suspend() thread by transitioning the state + // to state::cancellation_requested so that the await_suspend() + // thread can request cancellation after it has finished starting + // the operation. + const bool transferredCancelResponsibility = + m_state.compare_exchange_strong( + oldState, + state::cancellation_requested, + std::memory_order_release, + std::memory_order_acquire); + if (transferredCancelResponsibility) + { + return; + } + } + + // No point requesting cancellation if the operation has already completed. + if (oldState != state::completed) + { + static_cast(this)->cancel(); + } + m_mq->enqueue_message(reinterpret_cast(m_awaitingCoroutine.address()), + detail::linux::RESUME_TYPE); + } + + static void on_operation_completed( + detail::linux::io_state* ioState) noexcept + { + auto* operation = static_cast(ioState); + + operation->m_res = operation->m_completeFunc(); + + auto state = operation->m_state.load(std::memory_order_acquire); + if (state == state::started) + { + operation->m_state.store(state::completed, std::memory_order_relaxed); + operation->m_awaitingCoroutine.resume(); + } + else + { + // We are racing with await_suspend() call suspending. + // Try to mark it as completed using an atomic exchange and look + // at the previous value to determine whether the coroutine suspended + // first (in which case we resume it now) or we marked it as completed + // first (in which case await_suspend() will return false and immediately + // resume the coroutine). + state = operation->m_state.exchange( + state::completed, + std::memory_order_acq_rel); + if (state == state::started) + { + // The await_suspend() method returned (or will return) 'true' and so + // we need to resume the coroutine. + operation->m_awaitingCoroutine.resume(); + } + } + } + + std::atomic m_state; + cppcoro::cancellation_token m_cancellationToken; + std::optional m_cancellationCallback; + cppcoro::coroutine_handle<> m_awaitingCoroutine; + + }; + } +} + +#endif diff --git a/include/cppcoro/file.hpp b/include/cppcoro/file.hpp index 1366af82..41c422da 100644 --- a/include/cppcoro/file.hpp +++ b/include/cppcoro/file.hpp @@ -13,6 +13,8 @@ #if CPPCORO_OS_WINNT # include +#elif CPPCORO_OS_LINUX +# include #endif #include @@ -46,6 +48,18 @@ namespace cppcoro file_buffering_mode bufferingMode); detail::win32::safe_handle m_fileHandle; +#elif CPPCORO_OS_LINUX + file(detail::linux::safe_file_data&& fileData) noexcept; + + static detail::linux::safe_file_data open( + int fileAccess, + io_service& ioService, + const std::filesystem::path& path, + file_open_mode openMode, + file_share_mode shareMode, + file_buffering_mode bufferingMode); + + detail::linux::safe_file_data m_fileData; #endif }; diff --git a/include/cppcoro/file_read_operation.hpp b/include/cppcoro/file_read_operation.hpp index 4116c2fe..6d4bec0f 100644 --- a/include/cppcoro/file_read_operation.hpp +++ b/include/cppcoro/file_read_operation.hpp @@ -15,9 +15,14 @@ #if CPPCORO_OS_WINNT # include # include +#elif CPPCORO_OS_LINUX +# include +# include +#endif namespace cppcoro { +#if CPPCORO_OS_WINNT class file_read_operation_impl { public: @@ -92,7 +97,86 @@ namespace cppcoro file_read_operation_impl m_impl; }; - +#elif CPPCORO_OS_LINUX + class file_read_operation_impl + { + public: + + file_read_operation_impl( + int fd, + std::uint64_t fileOffset, + void* buffer, + std::size_t byteCount) noexcept + : m_fd(fd) + , m_offset(fileOffset) + , m_buffer(buffer) + , m_byteCount(byteCount) + {} + + bool try_start(cppcoro::detail::linux_async_operation_base& operation) noexcept; + void cancel(cppcoro::detail::linux_async_operation_base& operation) noexcept; + + private: + + int m_fd; + std::uint64_t m_offset; + void* m_buffer; + std::size_t m_byteCount; + + }; + + class file_read_operation + : public cppcoro::detail::linux_async_operation + { + public: + + file_read_operation( + int fd, + detail::linux::message_queue* mq, + std::uint64_t fileOffset, + void* buffer, + std::size_t byteCount) noexcept + : cppcoro::detail::linux_async_operation(mq) + , m_impl(fd, fileOffset, buffer, byteCount) + {} + + private: + + friend class cppcoro::detail::linux_async_operation; + + bool try_start() noexcept { return m_impl.try_start(*this); } + + file_read_operation_impl m_impl; + + }; + + class file_read_operation_cancellable + : public cppcoro::detail::linux_async_operation_cancellable + { + public: + + file_read_operation_cancellable( + int fd, + detail::linux::message_queue* mq, + std::uint64_t fileOffset, + void* buffer, + std::size_t byteCount, + cancellation_token&& ct) noexcept + : cppcoro::detail::linux_async_operation_cancellable( + mq, std::move(ct)) + , m_impl(fd, fileOffset, buffer, byteCount) + {} + + private: + + friend class cppcoro::detail::linux_async_operation_cancellable; + + bool try_start() noexcept { return m_impl.try_start(*this); } + void cancel() noexcept { m_impl.cancel(*this); } + + file_read_operation_impl m_impl; + + }; #endif } diff --git a/include/cppcoro/file_write_operation.hpp b/include/cppcoro/file_write_operation.hpp index 40f68541..09550d01 100644 --- a/include/cppcoro/file_write_operation.hpp +++ b/include/cppcoro/file_write_operation.hpp @@ -15,9 +15,14 @@ #if CPPCORO_OS_WINNT # include # include +#elif CPPCORO_OS_LINUX +# include +# include +#endif namespace cppcoro { +#if CPPCORO_OS_WINNT class file_write_operation_impl { public: @@ -91,8 +96,89 @@ namespace cppcoro file_write_operation_impl m_impl; }; + #elif CPPCORO_OS_LINUX + + class file_write_operation_impl + { + public: + + file_write_operation_impl( + int fd, + std::uint64_t fileOffset, + const void* buffer, + std::size_t byteCount) noexcept + : m_fd(fd) + , m_offset(fileOffset) + , m_buffer(buffer) + , m_byteCount(byteCount) + {} + + bool try_start(cppcoro::detail::linux_async_operation_base& operation) noexcept; + void cancel(cppcoro::detail::linux_async_operation_base& operation) noexcept; + + private: + + int m_fd; + std::uint64_t m_offset; + const void* m_buffer; + std::size_t m_byteCount; + + }; + + class file_write_operation + : public cppcoro::detail::linux_async_operation + { + public: + + file_write_operation( + int fd, + detail::linux::message_queue* mq, + std::uint64_t fileOffset, + const void* buffer, + std::size_t byteCount) noexcept + : cppcoro::detail::linux_async_operation(mq) + , m_impl(fd, fileOffset, buffer, byteCount) + {} + + private: + + friend class cppcoro::detail::linux_async_operation; + + bool try_start() noexcept { return m_impl.try_start(*this); } + + file_write_operation_impl m_impl; + + }; + + class file_write_operation_cancellable + : public cppcoro::detail::linux_async_operation_cancellable + { + public: + + file_write_operation_cancellable( + int fd, + detail::linux::message_queue* mq, + std::uint64_t fileOffset, + const void* buffer, + std::size_t byteCount, + cancellation_token&& ct) noexcept + : cppcoro::detail::linux_async_operation_cancellable( + mq, std::move(ct)) + , m_impl(fd, fileOffset, buffer, byteCount) + {} + + private: + + friend class cppcoro::detail::linux_async_operation_cancellable; + + bool try_start() noexcept { return m_impl.try_start(*this); } + void cancel() noexcept { m_impl.cancel(*this); } + + file_write_operation_impl m_impl; + + }; +#endif } -#endif // CPPCORO_OS_WINNT #endif diff --git a/include/cppcoro/io_service.hpp b/include/cppcoro/io_service.hpp index 4fe60e80..d4d58275 100644 --- a/include/cppcoro/io_service.hpp +++ b/include/cppcoro/io_service.hpp @@ -137,6 +137,8 @@ namespace cppcoro #if CPPCORO_OS_WINNT detail::win32::handle_t native_iocp_handle() noexcept; void ensure_winsock_initialised(); +#elif CPPCORO_OS_LINUX + detail::linux::message_queue* get_mq() noexcept; #endif private: diff --git a/include/cppcoro/read_only_file.hpp b/include/cppcoro/read_only_file.hpp index 25271428..f9ba4d21 100644 --- a/include/cppcoro/read_only_file.hpp +++ b/include/cppcoro/read_only_file.hpp @@ -51,6 +51,8 @@ namespace cppcoro #if CPPCORO_OS_WINNT read_only_file(detail::win32::safe_handle&& fileHandle) noexcept; +#elif CPPCORO_OS_LINUX + read_only_file(detail::linux::safe_file_data&& fileData) noexcept; #endif }; diff --git a/include/cppcoro/read_write_file.hpp b/include/cppcoro/read_write_file.hpp index de9e5374..854089c6 100644 --- a/include/cppcoro/read_write_file.hpp +++ b/include/cppcoro/read_write_file.hpp @@ -58,6 +58,8 @@ namespace cppcoro #if CPPCORO_OS_WINNT read_write_file(detail::win32::safe_handle&& fileHandle) noexcept; +#elif CPPCORO_OS_LINUX + read_write_file(detail::linux::safe_file_data&& fileData) noexcept; #endif }; diff --git a/include/cppcoro/write_only_file.hpp b/include/cppcoro/write_only_file.hpp index 1af66575..fd8373e5 100644 --- a/include/cppcoro/write_only_file.hpp +++ b/include/cppcoro/write_only_file.hpp @@ -57,6 +57,8 @@ namespace cppcoro #if CPPCORO_OS_WINNT write_only_file(detail::win32::safe_handle&& fileHandle) noexcept; +#elif CPPCORO_OS_LINUX + write_only_file(detail::linux::safe_file_data&& fileData) noexcept; #endif }; diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index af914442..e20f51ab 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -159,14 +159,33 @@ if(WIN32) elseif(CMAKE_SYSTEM_NAME MATCHES "Linux") set(linuxDetailIncludes linux.hpp + linux_async_operation.hpp ) list(TRANSFORM linuxDetailIncludes PREPEND "${PROJECT_SOURCE_DIR}/include/cppcoro/detail/") list(APPEND detailIncludes ${linuxDetailIncludes}) - list(APPEND sources + set(linuxSources linux.cpp io_service.cpp + file.cpp + readable_file.cpp + writable_file.cpp + read_only_file.cpp + write_only_file.cpp + read_write_file.cpp + file_read_operation.cpp + file_write_operation.cpp + # socket_helpers.cpp + # socket.cpp + # socket_accept_operation.cpp + # socket_connect_operation.cpp + # socket_disconnect_operation.cpp + # socket_send_operation.cpp + # socket_send_to_operation.cpp + # socket_recv_operation.cpp + # socket_recv_from_operation.cpp ) + list(APPEND sources ${linuxSources}) endif() add_library(cppcoro diff --git a/lib/file.cpp b/lib/file.cpp index 36911673..9ace9fb1 100644 --- a/lib/file.cpp +++ b/lib/file.cpp @@ -36,9 +36,23 @@ std::uint64_t cppcoro::file::size() const } return size.QuadPart; +#elif CPPCORO_OS_LINUX + struct stat sb; + if (fstat(m_fileData.fd.fd(), &sb) < 0) + { + throw std::system_error + { + errno, + std::system_category(), + "error getting file size: fstat" + }; + } + + return sb.st_size; #endif } +#if CPPCORO_OS_WINNT cppcoro::file::file(detail::win32::safe_handle&& fileHandle) noexcept : m_fileHandle(std::move(fileHandle)) { @@ -166,3 +180,79 @@ cppcoro::detail::win32::safe_handle cppcoro::file::open( return std::move(fileHandle); } + +#elif CPPCORO_OS_LINUX + +cppcoro::file::file(detail::linux::safe_file_data &&fileData) noexcept + : m_fileData(std::move(fileData)) +{ +} + +cppcoro::detail::linux::safe_file_data cppcoro::file::open( + int fileAccess, + io_service &ioService, + const std::filesystem::path &path, + cppcoro::file_open_mode openMode, + cppcoro::file_share_mode shareMode, + cppcoro::file_buffering_mode bufferingMode) +{ + int flags = fileAccess; + + if ((bufferingMode & file_buffering_mode::temporary) == file_buffering_mode::temporary) + { + // TODO + } + if ((bufferingMode & file_buffering_mode::unbuffered) == file_buffering_mode::unbuffered) + { + // TODO + } + + if ((shareMode & file_share_mode::read) == file_share_mode::read) + { + // TODO + } + if ((shareMode & file_share_mode::write) == file_share_mode::write) + { + // TODO + } + if ((shareMode & file_share_mode::delete_) == file_share_mode::delete_) + { + // TODO + } + + switch (openMode) + { + case file_open_mode::create_or_open: + flags |= O_CREAT; + break; + case file_open_mode::create_always: + flags |= O_CREAT | O_TRUNC; + break; + case file_open_mode::create_new: + flags |= O_EXCL; + break; + case file_open_mode::open_existing: + // Default. + break; + case file_open_mode::truncate_existing: + flags |= O_TRUNC; + break; + } + + cppcoro::detail::linux::safe_fd fd( + ::open(path.c_str(), flags | O_NONBLOCK, S_IRWXU | S_IRWXG)); + if (fd.fd() < 0) + { + throw std::system_error + { + errno, + std::system_category(), + "error opening file: open" + }; + } + + //posix_fadvise(fd.get(), 0, 0, advice); + + return {std::move(fd), ioService.get_mq()}; +} +#endif diff --git a/lib/file_read_operation.cpp b/lib/file_read_operation.cpp index 49c0e635..d5153609 100644 --- a/lib/file_read_operation.cpp +++ b/lib/file_read_operation.cpp @@ -50,4 +50,28 @@ void cppcoro::file_read_operation_impl::cancel( (void)::CancelIoEx(m_fileHandle, operation.get_overlapped()); } -#endif // CPPCORO_OS_WINNT +#elif CPPCORO_OS_LINUX + +bool cppcoro::file_read_operation_impl::try_start( + cppcoro::detail::linux_async_operation_base& operation) noexcept +{ + auto seek_res = lseek(m_fd, m_offset, SEEK_SET); + if (seek_res < 0) { + operation.m_res = -errno; + return false; + } + operation.m_completeFunc = [=]() { + int res = read(m_fd, m_buffer, m_byteCount); + operation.m_mq->remove_fd_watch(m_fd); + return res; + }; + operation.m_mq->add_fd_watch(m_fd, reinterpret_cast(&operation), EPOLLIN); + return true; +} + +void cppcoro::file_read_operation_impl::cancel( + cppcoro::detail::linux_async_operation_base& operation) noexcept +{ + operation.m_mq->remove_fd_watch(m_fd); +} +#endif diff --git a/lib/file_write_operation.cpp b/lib/file_write_operation.cpp index 300d709d..d164e4bb 100644 --- a/lib/file_write_operation.cpp +++ b/lib/file_write_operation.cpp @@ -50,4 +50,28 @@ void cppcoro::file_write_operation_impl::cancel( (void)::CancelIoEx(m_fileHandle, operation.get_overlapped()); } -#endif // CPPCORO_OS_WINNT +#elif CPPCORO_OS_LINUX + +bool cppcoro::file_write_operation_impl::try_start( + cppcoro::detail::linux_async_operation_base& operation) noexcept +{ + auto seek_res = lseek(m_fd, m_offset, SEEK_SET); + if (seek_res < 0) { + operation.m_res = -errno; + return false; + } + operation.m_completeFunc = [=]() { + int res = write(m_fd, m_buffer, m_byteCount); + operation.m_mq->remove_fd_watch(m_fd); + return res; + }; + operation.m_mq->add_fd_watch(m_fd, reinterpret_cast(&operation), EPOLLOUT); + return true; +} + +void cppcoro::file_write_operation_impl::cancel( + cppcoro::detail::linux_async_operation_base& operation) noexcept +{ + operation.m_mq->remove_fd_watch(m_fd); +} +#endif diff --git a/lib/io_service.cpp b/lib/io_service.cpp index fb41700e..a128900d 100644 --- a/lib/io_service.cpp +++ b/lib/io_service.cpp @@ -512,7 +512,12 @@ void cppcoro::io_service::ensure_winsock_initialised() } } -#endif // CPPCORO_OS_WINNT +#elif CPPCORO_OS_LINUX +cppcoro::detail::linux::message_queue* cppcoro::io_service::get_mq() noexcept +{ + return &m_mq; +} +#endif void cppcoro::io_service::schedule_impl(schedule_operation* operation) noexcept { @@ -876,13 +881,13 @@ void cppcoro::io_service::timer_thread_state::run() noexcept if (status == 0 || status == -1 || (status == 1 && ev.data.fd == m_wakeupfd.fd())) { uint64_t count; - read(m_wakeupfd.fd(), &count, sizeof(uint64_t)); + (void)read(m_wakeupfd.fd(), &count, sizeof(uint64_t)); waitEvent = true; } else if (status == 1 && ev.data.fd == m_timerfd.fd()) { uint64_t count; - read(m_timerfd.fd(), &count, sizeof(uint64_t)); + (void)read(m_timerfd.fd(), &count, sizeof(uint64_t)); timerEvent = true; } #endif diff --git a/lib/linux.cpp b/lib/linux.cpp index 13dd0069..bd01e5a9 100644 --- a/lib/linux.cpp +++ b/lib/linux.cpp @@ -27,16 +27,7 @@ namespace cppcoro m_epollfd = safe_fd{create_epoll_fd()}; m_ev.data.fd = m_pipefd[0]; m_ev.events = EPOLLIN; - - if(epoll_ctl(m_epollfd.fd(), EPOLL_CTL_ADD, m_pipefd[0], &m_ev) == -1) - { - throw std::system_error - { - static_cast(errno), - std::system_category(), - "Error creating io_service: epoll ctl pipe" - }; - } + add_fd_watch(m_pipefd[0], reinterpret_cast(m_pipefd[0]), EPOLLIN); } message_queue::~message_queue() @@ -45,6 +36,40 @@ namespace cppcoro assert(close(m_pipefd[1]) == 0); } + void message_queue::add_fd_watch(int fd, void* cb, uint32_t events){ + struct epoll_event ev = {0}; + ev.data.ptr = cb; + ev.events = events; + if(epoll_ctl(m_epollfd.fd(), EPOLL_CTL_ADD, fd, &ev) == -1) + { + if (errno == EPERM) { + // epoll returns EPERM on regular files because they are + // always ready for read/write, we can just queue the callback to run + enqueue_message(cb, CALLBACK_TYPE); + } else { + throw std::system_error + { + static_cast(errno), + std::system_category(), + "message_queue: add_fd_watch failed" + }; + } + } + } + void message_queue::remove_fd_watch(int fd){ + if(epoll_ctl(m_epollfd.fd(), EPOLL_CTL_DEL, fd, NULL) == -1) + { + if (errno != EPERM) { + throw std::system_error + { + static_cast(errno), + std::system_category(), + "message_queue: remove_fd_watch failed" + }; + } + } + } + bool message_queue::enqueue_message(void* msg, message_type type) { message qmsg; @@ -87,25 +112,31 @@ namespace cppcoro }; } - message qmsg; - ssize_t status = read(m_pipefd[0], (char*)&qmsg, sizeof(message)); + if (ev.data.fd == m_pipefd[0]) { + message qmsg; + ssize_t status = read(m_pipefd[0], (char*)&qmsg, sizeof(message)); - if(status == -1) - { - if (errno == EINTR || errno == EAGAIN) { - return false; - } - throw std::system_error + if(status == -1) { - static_cast(errno), - std::system_category(), - "Error retrieving message from message queue: mq_receive" - }; - } + if (errno == EINTR || errno == EAGAIN) { + return false; + } + throw std::system_error + { + static_cast(errno), + std::system_category(), + "Error retrieving message from message queue: mq_receive" + }; + } - msg = qmsg.m_ptr; - type = qmsg.m_type; - return true; + msg = qmsg.m_ptr; + type = qmsg.m_type; + return true; + } else { + msg = ev.data.ptr; + type = CALLBACK_TYPE; + return true; + } } safe_fd create_event_fd() diff --git a/lib/read_only_file.cpp b/lib/read_only_file.cpp index cdff78df..995d6306 100644 --- a/lib/read_only_file.cpp +++ b/lib/read_only_file.cpp @@ -33,4 +33,28 @@ cppcoro::read_only_file::read_only_file( { } +#elif CPPCORO_OS_LINUX +#include + +cppcoro::read_only_file cppcoro::read_only_file::open( + io_service& ioService, + const std::filesystem::path& path, + file_share_mode shareMode, + file_buffering_mode bufferingMode) +{ + return read_only_file(file::open( + O_RDONLY, + ioService, + path, + file_open_mode::open_existing, + shareMode, + bufferingMode)); +} + +cppcoro::read_only_file::read_only_file( + detail::linux::safe_file_data&& fileData) noexcept + : file(std::move(fileData)) + , readable_file(detail::linux::safe_file_data{}) +{ +} #endif diff --git a/lib/read_write_file.cpp b/lib/read_write_file.cpp index e9e816d0..f7c838cd 100644 --- a/lib/read_write_file.cpp +++ b/lib/read_write_file.cpp @@ -35,4 +35,30 @@ cppcoro::read_write_file::read_write_file( { } +#elif CPPCORO_OS_LINUX +#include + +cppcoro::read_write_file cppcoro::read_write_file::open( + io_service& ioService, + const std::filesystem::path& path, + file_open_mode openMode, + file_share_mode shareMode, + file_buffering_mode bufferingMode) +{ + return read_write_file(file::open( + O_RDWR, + ioService, + path, + openMode, + shareMode, + bufferingMode)); +} + +cppcoro::read_write_file::read_write_file( + detail::linux::safe_file_data&& fileData) noexcept + : file(std::move(fileData)) + , readable_file(detail::linux::safe_file_data{}) + , writable_file(detail::linux::safe_file_data{}) +{ +} #endif diff --git a/lib/readable_file.cpp b/lib/readable_file.cpp index 12b90f2b..c5e00024 100644 --- a/lib/readable_file.cpp +++ b/lib/readable_file.cpp @@ -32,5 +32,32 @@ cppcoro::file_read_operation_cancellable cppcoro::readable_file::read( byteCount, std::move(ct)); } +#elif CPPCORO_OS_LINUX +cppcoro::file_read_operation cppcoro::readable_file::read( + std::uint64_t offset, + void* buffer, + std::size_t byteCount) const noexcept +{ + return file_read_operation( + m_fileData.fd.fd(), + m_fileData.mq, + offset, + buffer, + byteCount); +} +cppcoro::file_read_operation_cancellable cppcoro::readable_file::read( + std::uint64_t offset, + void* buffer, + std::size_t byteCount, + cancellation_token ct) const noexcept +{ + return file_read_operation_cancellable( + m_fileData.fd.fd(), + m_fileData.mq, + offset, + buffer, + byteCount, + std::move(ct)); +} #endif diff --git a/lib/writable_file.cpp b/lib/writable_file.cpp index 3c413c25..6055a926 100644 --- a/lib/writable_file.cpp +++ b/lib/writable_file.cpp @@ -72,4 +72,48 @@ cppcoro::file_write_operation_cancellable cppcoro::writable_file::write( }; } +#elif CPPCORO_OS_LINUX +#include + +void cppcoro::writable_file::set_size( + std::uint64_t fileSize) +{ + if (ftruncate64(m_fileData.fd.fd(), fileSize) < 0) + { + throw std::system_error + { + errno, + std::system_category(), + "error setting file size: ftruncate64" + }; + } +} + +cppcoro::file_write_operation cppcoro::writable_file::write( + std::uint64_t offset, + const void* buffer, + std::size_t byteCount) noexcept +{ + return file_write_operation( + m_fileData.fd.fd(), + m_fileData.mq, + offset, + buffer, + byteCount); +} + +cppcoro::file_write_operation_cancellable cppcoro::writable_file::write( + std::uint64_t offset, + const void* buffer, + std::size_t byteCount, + cancellation_token ct) noexcept +{ + return file_write_operation_cancellable( + m_fileData.fd.fd(), + m_fileData.mq, + offset, + buffer, + byteCount, + std::move(ct)); +} #endif diff --git a/lib/write_only_file.cpp b/lib/write_only_file.cpp index 25540583..c30eb097 100644 --- a/lib/write_only_file.cpp +++ b/lib/write_only_file.cpp @@ -34,4 +34,29 @@ cppcoro::write_only_file::write_only_file( { } +#elif CPPCORO_OS_LINUX +#include + +cppcoro::write_only_file cppcoro::write_only_file::open( + io_service& ioService, + const std::filesystem::path& path, + file_open_mode openMode, + file_share_mode shareMode, + file_buffering_mode bufferingMode) +{ + return write_only_file(file::open( + O_WRONLY, + ioService, + path, + openMode, + shareMode, + bufferingMode)); +} + +cppcoro::write_only_file::write_only_file( + detail::linux::safe_file_data&& fileData) noexcept + : file(std::move(fileData)) + , writable_file(detail::linux::safe_file_data{}) +{ +} #endif diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 34698e79..16593c2e 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -50,6 +50,8 @@ else() list(APPEND tests scheduling_operator_tests.cpp io_service_tests.cpp + file_tests.cpp + # socket_tests.cpp ) endif() # let more time for some tests