Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Linux Socket Operations #80

Merged
merged 1 commit into from
Mar 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion include/cppcoro/detail/linux_async_operation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,21 @@ namespace cppcoro
{
auto* operation = static_cast<linux_async_operation*>(ioState);
operation->m_res = operation->m_completeFunc();
if (operation->m_res < 0) {
operation->m_res = -errno;
}
operation->m_awaitingCoroutine.resume();
}

cppcoro::coroutine_handle<> m_awaitingCoroutine;

};

static constexpr int error_operation_cancelled = ECANCELED;
template<typename OPERATION>
class linux_async_operation_cancellable
: protected linux_async_operation_base
{
static constexpr int error_operation_cancelled = ECANCELED;

protected:

Expand Down Expand Up @@ -278,6 +281,9 @@ namespace cppcoro
auto* operation = static_cast<linux_async_operation_cancellable*>(ioState);

operation->m_res = operation->m_completeFunc();
if (operation->m_res < 0) {
operation->m_res = -errno;
}

auto state = operation->m_state.load(std::memory_order_acquire);
if (state == state::started)
Expand Down
26 changes: 21 additions & 5 deletions include/cppcoro/net/socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

#if CPPCORO_OS_WINNT
# include <cppcoro/detail/win32.hpp>
#elif CPPCORO_OS_LINUX
# include <cppcoro/detail/linux.hpp>
#endif

namespace cppcoro
Expand Down Expand Up @@ -81,6 +83,9 @@ namespace cppcoro
static socket create_udpv6(io_service& ioSvc);

socket(socket&& other) noexcept;
socket& operator=(socket&& other) noexcept;
socket(const socket& other) noexcept;
socket& operator=(const socket& other) noexcept;

/// Closes the socket, releasing any associated resources.
///
Expand All @@ -90,7 +95,8 @@ namespace cppcoro
/// disconnect() and wait until the disconnect operation completes.
~socket();

socket& operator=(socket&& other) noexcept;
int close();


#if CPPCORO_OS_WINNT
/// Get the Win32 socket handle assocaited with this socket.
Expand All @@ -104,6 +110,9 @@ namespace cppcoro
/// operation completing synchronously or whether it should suspend the coroutine
/// and wait until the I/O completion event is dispatched to an I/O thread.
bool skip_completion_on_success() noexcept { return m_skipCompletionOnSuccess; }
#elif CPPCORO_OS_LINUX
/// Get the linux fd assocaited with this socket.
cppcoro::detail::linux::fd_t native_handle() noexcept { return m_handle; }
#endif

/// Get the address and port of the local end-point.
Expand Down Expand Up @@ -242,20 +251,27 @@ namespace cppcoro
void close_send();
void close_recv();

private:

friend class socket_accept_operation_impl;
friend class socket_connect_operation_impl;

#if CPPCORO_OS_WINNT
explicit socket(
cppcoro::detail::win32::socket_t handle,
bool skipCompletionOnSuccess) noexcept;
#elif CPPCORO_OS_LINUX
explicit socket(
cppcoro::detail::linux::fd_t handle,
cppcoro::detail::linux::message_queue* mq) noexcept;
#endif
private:

friend class socket_accept_operation_impl;
friend class socket_connect_operation_impl;

#if CPPCORO_OS_WINNT
cppcoro::detail::win32::socket_t m_handle;
bool m_skipCompletionOnSuccess;
#elif CPPCORO_OS_LINUX
cppcoro::detail::linux::fd_t m_handle;
cppcoro::detail::linux::message_queue* m_mq;
#endif

ip_endpoint m_localEndPoint;
Expand Down
85 changes: 82 additions & 3 deletions include/cppcoro/net/socket_accept_operation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,23 @@
#include <cppcoro/cancellation_token.hpp>
#include <cppcoro/cancellation_registration.hpp>

#include <atomic>
#include <optional>

#if CPPCORO_OS_WINNT
# include <cppcoro/detail/win32.hpp>
# include <cppcoro/detail/win32_overlapped_operation.hpp>
#elif CPPCORO_OS_LINUX
# include <cppcoro/detail/linux.hpp>
# include <cppcoro/detail/linux_async_operation.hpp>
#endif

# include <atomic>
# include <optional>

namespace cppcoro
{
namespace net
{
#if CPPCORO_OS_WINNT
class socket;

class socket_accept_operation_impl
Expand Down Expand Up @@ -100,9 +106,82 @@ namespace cppcoro
socket_accept_operation_impl m_impl;

};
#elif CPPCORO_OS_LINUX
class socket;

class socket_accept_operation_impl
{
public:

socket_accept_operation_impl(
socket& listeningSocket,
socket& acceptingSocket) noexcept
: m_listeningSocket(listeningSocket)
, m_acceptingSocket(acceptingSocket)
{}

bool try_start(cppcoro::detail::linux_async_operation_base& operation) noexcept;
void cancel(cppcoro::detail::linux_async_operation_base& operation) noexcept;
void get_result(cppcoro::detail::linux_async_operation_base& operation);

private:
socket& m_listeningSocket;
socket& m_acceptingSocket;
alignas(8) std::uint8_t m_addressBuffer[88];
};

class socket_accept_operation
: public cppcoro::detail::linux_async_operation<socket_accept_operation>
{
public:

socket_accept_operation(
socket& listeningSocket,
socket& acceptingSocket,
cppcoro::detail::linux::message_queue* mq) noexcept
: cppcoro::detail::linux_async_operation<socket_accept_operation>(mq)
, m_impl(listeningSocket, acceptingSocket)
{}

private:

friend class cppcoro::detail::linux_async_operation<socket_accept_operation>;

bool try_start() noexcept { return m_impl.try_start(*this); }
void get_result() { m_impl.get_result(*this); }

socket_accept_operation_impl m_impl;

};

class socket_accept_operation_cancellable
: public cppcoro::detail::linux_async_operation_cancellable<socket_accept_operation_cancellable>
{
public:

socket_accept_operation_cancellable(
socket& listeningSocket,
socket& acceptingSocket,
cppcoro::detail::linux::message_queue* mq,
cancellation_token&& ct) noexcept
: cppcoro::detail::linux_async_operation_cancellable<socket_accept_operation_cancellable>(mq, std::move(ct))
, m_impl(listeningSocket, acceptingSocket)
{}

private:

friend class cppcoro::detail::linux_async_operation_cancellable<socket_accept_operation_cancellable>;

bool try_start() noexcept { return m_impl.try_start(*this); }
void cancel() noexcept { m_impl.cancel(*this); }
void get_result() { m_impl.get_result(*this); }

socket_accept_operation_impl m_impl;

};
#endif
}
}

#endif // CPPCORO_OS_WINNT

#endif
81 changes: 80 additions & 1 deletion include/cppcoro/net/socket_connect_operation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,16 @@
#if CPPCORO_OS_WINNT
# include <cppcoro/detail/win32.hpp>
# include <cppcoro/detail/win32_overlapped_operation.hpp>
#elif CPPCORO_OS_LINUX
# include <cppcoro/detail/linux.hpp>
# include <cppcoro/detail/linux_async_operation.hpp>
#endif

namespace cppcoro
{
namespace net
{
#if CPPCORO_OS_WINNT
class socket;

class socket_connect_operation_impl
Expand Down Expand Up @@ -87,9 +92,83 @@ namespace cppcoro
socket_connect_operation_impl m_impl;

};
#elif CPPCORO_OS_LINUX
class socket;

class socket_connect_operation_impl
{
public:

socket_connect_operation_impl(
socket& socket,
const ip_endpoint& remoteEndPoint) noexcept
: m_socket(socket)
, m_remoteEndPoint(remoteEndPoint)
{}

bool try_start(cppcoro::detail::linux_async_operation_base& operation) noexcept;
void cancel(cppcoro::detail::linux_async_operation_base& operation) noexcept;
void get_result(cppcoro::detail::linux_async_operation_base& operation);

private:

socket& m_socket;
ip_endpoint m_remoteEndPoint;

};

class socket_connect_operation
: public cppcoro::detail::linux_async_operation<socket_connect_operation>
{
public:

socket_connect_operation(
socket& socket,
const ip_endpoint& remoteEndPoint,
cppcoro::detail::linux::message_queue* mq) noexcept
: cppcoro::detail::linux_async_operation<socket_connect_operation>(mq)
, m_impl(socket, remoteEndPoint)
{}

private:

friend class cppcoro::detail::linux_async_operation<socket_connect_operation>;

bool try_start() noexcept { return m_impl.try_start(*this); }
decltype(auto) get_result() { return m_impl.get_result(*this); }

socket_connect_operation_impl m_impl;

};

class socket_connect_operation_cancellable
: public cppcoro::detail::linux_async_operation_cancellable<socket_connect_operation_cancellable>
{
public:

socket_connect_operation_cancellable(
socket& socket,
const ip_endpoint& remoteEndPoint,
cppcoro::detail::linux::message_queue* mq,
cancellation_token&& ct) noexcept
: cppcoro::detail::linux_async_operation_cancellable<socket_connect_operation_cancellable>(mq, std::move(ct))
, m_impl(socket, remoteEndPoint)
{}

private:

friend class cppcoro::detail::linux_async_operation_cancellable<socket_connect_operation_cancellable>;

bool try_start() noexcept { return m_impl.try_start(*this); }
void cancel() noexcept { m_impl.cancel(*this); }
void get_result() { m_impl.get_result(*this); }

socket_connect_operation_impl m_impl;

};
#endif
}
}

#endif // CPPCORO_OS_WINNT

#endif
Loading
Loading