Skip to content

Commit

Permalink
Added socket operations
Browse files Browse the repository at this point in the history
  • Loading branch information
danlapid committed Feb 10, 2024
1 parent 4b2a059 commit 52d5ebf
Show file tree
Hide file tree
Showing 22 changed files with 1,298 additions and 112 deletions.
8 changes: 7 additions & 1 deletion include/cppcoro/detail/linux_async_operation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,21 @@ namespace cppcoro
{
auto* operation = static_cast<linux_async_operation*>(ioState);
operation->m_res = operation->m_completeFunc();
if (operation->m_res < 0) {
operation->m_res = -errno;
}
operation->m_awaitingCoroutine.resume();
}

cppcoro::coroutine_handle<> m_awaitingCoroutine;

};

static constexpr int error_operation_cancelled = ECANCELED;
template<typename OPERATION>
class linux_async_operation_cancellable
: protected linux_async_operation_base
{
static constexpr int error_operation_cancelled = ECANCELED;

protected:

Expand Down Expand Up @@ -278,6 +281,9 @@ namespace cppcoro
auto* operation = static_cast<linux_async_operation_cancellable*>(ioState);

operation->m_res = operation->m_completeFunc();
if (operation->m_res < 0) {
operation->m_res = -errno;
}

auto state = operation->m_state.load(std::memory_order_acquire);
if (state == state::started)
Expand Down
26 changes: 21 additions & 5 deletions include/cppcoro/net/socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

#if CPPCORO_OS_WINNT
# include <cppcoro/detail/win32.hpp>
#elif CPPCORO_OS_LINUX
# include <cppcoro/detail/linux.hpp>
#endif

namespace cppcoro
Expand Down Expand Up @@ -81,6 +83,9 @@ namespace cppcoro
static socket create_udpv6(io_service& ioSvc);

socket(socket&& other) noexcept;
socket& operator=(socket&& other) noexcept;
socket(const socket& other) noexcept;
socket& operator=(const socket& other) noexcept;

/// Closes the socket, releasing any associated resources.
///
Expand All @@ -90,7 +95,8 @@ namespace cppcoro
/// disconnect() and wait until the disconnect operation completes.
~socket();

socket& operator=(socket&& other) noexcept;
int close();


#if CPPCORO_OS_WINNT
/// Get the Win32 socket handle assocaited with this socket.
Expand All @@ -104,6 +110,9 @@ namespace cppcoro
/// operation completing synchronously or whether it should suspend the coroutine
/// and wait until the I/O completion event is dispatched to an I/O thread.
bool skip_completion_on_success() noexcept { return m_skipCompletionOnSuccess; }
#elif CPPCORO_OS_LINUX
/// Get the linux fd assocaited with this socket.
cppcoro::detail::linux::fd_t native_handle() noexcept { return m_handle; }
#endif

/// Get the address and port of the local end-point.
Expand Down Expand Up @@ -242,20 +251,27 @@ namespace cppcoro
void close_send();
void close_recv();

private:

friend class socket_accept_operation_impl;
friend class socket_connect_operation_impl;

#if CPPCORO_OS_WINNT
explicit socket(
cppcoro::detail::win32::socket_t handle,
bool skipCompletionOnSuccess) noexcept;
#elif CPPCORO_OS_LINUX
explicit socket(
cppcoro::detail::linux::fd_t handle,
cppcoro::detail::linux::message_queue* mq) noexcept;
#endif
private:

friend class socket_accept_operation_impl;
friend class socket_connect_operation_impl;

#if CPPCORO_OS_WINNT
cppcoro::detail::win32::socket_t m_handle;
bool m_skipCompletionOnSuccess;
#elif CPPCORO_OS_LINUX
cppcoro::detail::linux::fd_t m_handle;
cppcoro::detail::linux::message_queue* m_mq;
#endif

ip_endpoint m_localEndPoint;
Expand Down
85 changes: 82 additions & 3 deletions include/cppcoro/net/socket_accept_operation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,23 @@
#include <cppcoro/cancellation_token.hpp>
#include <cppcoro/cancellation_registration.hpp>

#include <atomic>
#include <optional>

#if CPPCORO_OS_WINNT
# include <cppcoro/detail/win32.hpp>
# include <cppcoro/detail/win32_overlapped_operation.hpp>
#elif CPPCORO_OS_LINUX
# include <cppcoro/detail/linux.hpp>
# include <cppcoro/detail/linux_async_operation.hpp>
#endif

# include <atomic>
# include <optional>

namespace cppcoro
{
namespace net
{
#if CPPCORO_OS_WINNT
class socket;

class socket_accept_operation_impl
Expand Down Expand Up @@ -100,9 +106,82 @@ namespace cppcoro
socket_accept_operation_impl m_impl;

};
#elif CPPCORO_OS_LINUX
class socket;

class socket_accept_operation_impl
{
public:

socket_accept_operation_impl(
socket& listeningSocket,
socket& acceptingSocket) noexcept
: m_listeningSocket(listeningSocket)
, m_acceptingSocket(acceptingSocket)
{}

bool try_start(cppcoro::detail::linux_async_operation_base& operation) noexcept;
void cancel(cppcoro::detail::linux_async_operation_base& operation) noexcept;
void get_result(cppcoro::detail::linux_async_operation_base& operation);

private:
socket& m_listeningSocket;
socket& m_acceptingSocket;
alignas(8) std::uint8_t m_addressBuffer[88];
};

class socket_accept_operation
: public cppcoro::detail::linux_async_operation<socket_accept_operation>
{
public:

socket_accept_operation(
socket& listeningSocket,
socket& acceptingSocket,
cppcoro::detail::linux::message_queue* mq) noexcept
: cppcoro::detail::linux_async_operation<socket_accept_operation>(mq)
, m_impl(listeningSocket, acceptingSocket)
{}

private:

friend class cppcoro::detail::linux_async_operation<socket_accept_operation>;

bool try_start() noexcept { return m_impl.try_start(*this); }
void get_result() { m_impl.get_result(*this); }

socket_accept_operation_impl m_impl;

};

class socket_accept_operation_cancellable
: public cppcoro::detail::linux_async_operation_cancellable<socket_accept_operation_cancellable>
{
public:

socket_accept_operation_cancellable(
socket& listeningSocket,
socket& acceptingSocket,
cppcoro::detail::linux::message_queue* mq,
cancellation_token&& ct) noexcept
: cppcoro::detail::linux_async_operation_cancellable<socket_accept_operation_cancellable>(mq, std::move(ct))
, m_impl(listeningSocket, acceptingSocket)
{}

private:

friend class cppcoro::detail::linux_async_operation_cancellable<socket_accept_operation_cancellable>;

bool try_start() noexcept { return m_impl.try_start(*this); }
void cancel() noexcept { m_impl.cancel(*this); }
void get_result() { m_impl.get_result(*this); }

socket_accept_operation_impl m_impl;

};
#endif
}
}

#endif // CPPCORO_OS_WINNT

#endif
81 changes: 80 additions & 1 deletion include/cppcoro/net/socket_connect_operation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,16 @@
#if CPPCORO_OS_WINNT
# include <cppcoro/detail/win32.hpp>
# include <cppcoro/detail/win32_overlapped_operation.hpp>
#elif CPPCORO_OS_LINUX
# include <cppcoro/detail/linux.hpp>
# include <cppcoro/detail/linux_async_operation.hpp>
#endif

namespace cppcoro
{
namespace net
{
#if CPPCORO_OS_WINNT
class socket;

class socket_connect_operation_impl
Expand Down Expand Up @@ -87,9 +92,83 @@ namespace cppcoro
socket_connect_operation_impl m_impl;

};
#elif CPPCORO_OS_LINUX
class socket;

class socket_connect_operation_impl
{
public:

socket_connect_operation_impl(
socket& socket,
const ip_endpoint& remoteEndPoint) noexcept
: m_socket(socket)
, m_remoteEndPoint(remoteEndPoint)
{}

bool try_start(cppcoro::detail::linux_async_operation_base& operation) noexcept;
void cancel(cppcoro::detail::linux_async_operation_base& operation) noexcept;
void get_result(cppcoro::detail::linux_async_operation_base& operation);

private:

socket& m_socket;
ip_endpoint m_remoteEndPoint;

};

class socket_connect_operation
: public cppcoro::detail::linux_async_operation<socket_connect_operation>
{
public:

socket_connect_operation(
socket& socket,
const ip_endpoint& remoteEndPoint,
cppcoro::detail::linux::message_queue* mq) noexcept
: cppcoro::detail::linux_async_operation<socket_connect_operation>(mq)
, m_impl(socket, remoteEndPoint)
{}

private:

friend class cppcoro::detail::linux_async_operation<socket_connect_operation>;

bool try_start() noexcept { return m_impl.try_start(*this); }
decltype(auto) get_result() { return m_impl.get_result(*this); }

socket_connect_operation_impl m_impl;

};

class socket_connect_operation_cancellable
: public cppcoro::detail::linux_async_operation_cancellable<socket_connect_operation_cancellable>
{
public:

socket_connect_operation_cancellable(
socket& socket,
const ip_endpoint& remoteEndPoint,
cppcoro::detail::linux::message_queue* mq,
cancellation_token&& ct) noexcept
: cppcoro::detail::linux_async_operation_cancellable<socket_connect_operation_cancellable>(mq, std::move(ct))
, m_impl(socket, remoteEndPoint)
{}

private:

friend class cppcoro::detail::linux_async_operation_cancellable<socket_connect_operation_cancellable>;

bool try_start() noexcept { return m_impl.try_start(*this); }
void cancel() noexcept { m_impl.cancel(*this); }
void get_result() { m_impl.get_result(*this); }

socket_connect_operation_impl m_impl;

};
#endif
}
}

#endif // CPPCORO_OS_WINNT

#endif
Loading

0 comments on commit 52d5ebf

Please sign in to comment.