diff --git a/argon/vm/io/socket/psocket.cpp b/argon/vm/io/socket/psocket.cpp index f692f0bf..0c8931ac 100644 --- a/argon/vm/io/socket/psocket.cpp +++ b/argon/vm/io/socket/psocket.cpp @@ -23,7 +23,7 @@ using namespace argon::vm::loop; using namespace argon::vm::datatype; using namespace argon::vm::io::socket; -CallbackReturnStatus AcceptCallBack(Event *event) { +CallbackStatus AcceptCallBack(Event *event) { sockaddr_storage addr{}; socklen_t addrlen; int remote; @@ -35,24 +35,24 @@ CallbackReturnStatus AcceptCallBack(Event *event) { if (errno != EAGAIN && errno != EWOULDBLOCK) { ErrorFromSocket(); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } - return CallbackReturnStatus::RETRY; + return CallbackStatus::RETRY; } auto *ret = SocketNew(sock->family, sock->type, sock->protocol, remote); if (ret == nullptr) - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; argon::vm::FiberSetAsyncResult(event->fiber, (ArObject *) ret); Release(ret); - return CallbackReturnStatus::SUCCESS; + return CallbackStatus::SUCCESS; } -CallbackReturnStatus ConnectResultCallBack(Event *event) { +CallbackStatus ConnectResultCallBack(Event *event) { auto *sock = (Socket *) event->initiator; socklen_t len = sizeof(int); int error; @@ -64,15 +64,15 @@ CallbackReturnStatus ConnectResultCallBack(Event *event) { if (error == 0) { argon::vm::FiberSetAsyncResult(event->fiber, (ArObject *) sock); - return CallbackReturnStatus::SUCCESS; + return CallbackStatus::SUCCESS; } ErrorFromErrno(error); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } -CallbackReturnStatus ConnectCallBack(Event *event) { +CallbackStatus ConnectCallBack(Event *event) { event->callback = ConnectResultCallBack; if (connect(((const Socket *) event->initiator)->sock, (sockaddr *) event->buffer.data, @@ -82,16 +82,16 @@ CallbackReturnStatus ConnectCallBack(Event *event) { ErrorFromSocket(); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } - return CallbackReturnStatus::RETRY; + return CallbackStatus::RETRY; } - return CallbackReturnStatus::SUCCESS; + return CallbackStatus::SUCCESS; } -CallbackReturnStatus RecvCallBack(Event *event) { +CallbackStatus RecvCallBack(Event *event) { auto *sock = (const Socket *) event->initiator; auto bytes = recv(sock->sock, @@ -105,10 +105,10 @@ CallbackReturnStatus RecvCallBack(Event *event) { ErrorFromSocket(); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } - return CallbackReturnStatus::RETRY; + return CallbackStatus::RETRY; } event->buffer.length = bytes; @@ -117,17 +117,17 @@ CallbackReturnStatus RecvCallBack(Event *event) { if (buffer == nullptr) { argon::vm::memory::Free(event->buffer.data); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } argon::vm::FiberSetAsyncResult(event->fiber, (ArObject *) buffer); Release(buffer); - return CallbackReturnStatus::SUCCESS; + return CallbackStatus::SUCCESS; } -CallbackReturnStatus RecvAllCallBack(Event *event) { +CallbackStatus RecvAllCallBack(Event *event) { auto *sock = (const Socket *) event->initiator; auto delta = event->buffer.allocated - event->buffer.length; @@ -141,10 +141,10 @@ CallbackReturnStatus RecvAllCallBack(Event *event) { ErrorFromSocket(); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } - return CallbackReturnStatus::RETRY; + return CallbackStatus::RETRY; } event->buffer.length += bytes; @@ -154,14 +154,14 @@ CallbackReturnStatus RecvAllCallBack(Event *event) { if (buffer == nullptr) { argon::vm::memory::Free(event->buffer.data); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } argon::vm::FiberSetAsyncResult(event->fiber, (ArObject *) buffer); Release(buffer); - return CallbackReturnStatus::SUCCESS; + return CallbackStatus::SUCCESS; } auto *tmp = (unsigned char *) argon::vm::memory::Realloc(event->buffer.data, @@ -169,17 +169,17 @@ CallbackReturnStatus RecvAllCallBack(Event *event) { if (tmp == nullptr) { argon::vm::memory::Free(event->buffer.data); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } event->buffer.data = tmp; event->buffer.allocated += kRecvAllIncSize; - return CallbackReturnStatus::RETRY; + return CallbackStatus::RETRY; } -CallbackReturnStatus RecvFromCallBack(Event *event) { +CallbackStatus RecvFromCallBack(Event *event) { sockaddr_storage storage{}; socklen_t addrlen = sizeof(sockaddr_storage); @@ -197,10 +197,10 @@ CallbackReturnStatus RecvFromCallBack(Event *event) { ErrorFromSocket(); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } - return CallbackReturnStatus::RETRY; + return CallbackStatus::RETRY; } event->buffer.length = bytes; @@ -209,7 +209,7 @@ CallbackReturnStatus RecvFromCallBack(Event *event) { if (remote_addr == nullptr) { argon::vm::memory::Free(event->buffer.data); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } auto *data = BytesNewHoldBuffer(event->buffer.data, event->buffer.allocated, event->buffer.length, true); @@ -218,7 +218,7 @@ CallbackReturnStatus RecvFromCallBack(Event *event) { Release(remote_addr); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } auto *ret = TupleNew("oo", data, remote_addr); @@ -231,13 +231,13 @@ CallbackReturnStatus RecvFromCallBack(Event *event) { Release(ret); - return CallbackReturnStatus::SUCCESS; + return CallbackStatus::SUCCESS; } - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } -CallbackReturnStatus RecvIntoCallBack(Event *event) { +CallbackStatus RecvIntoCallBack(Event *event) { auto *sock = (const Socket *) event->initiator; auto bytes = recv(sock->sock, @@ -251,10 +251,10 @@ CallbackReturnStatus RecvIntoCallBack(Event *event) { ErrorFromSocket(); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } - return CallbackReturnStatus::RETRY; + return CallbackStatus::RETRY; } event->buffer.length = bytes; @@ -268,13 +268,13 @@ CallbackReturnStatus RecvIntoCallBack(Event *event) { Release(buffer); - return CallbackReturnStatus::SUCCESS; + return CallbackStatus::SUCCESS; } - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } -CallbackReturnStatus RecvRawCallBack(Event *event) { +CallbackStatus RecvRawCallBack(Event *event) { auto *sock = (const Socket *) event->initiator; auto bytes = recv(sock->sock, @@ -288,10 +288,10 @@ CallbackReturnStatus RecvRawCallBack(Event *event) { event->user_callback(event, event->aux, (int) bytes); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } - return CallbackReturnStatus::RETRY; + return CallbackStatus::RETRY; } event->buffer.length = bytes; @@ -299,7 +299,7 @@ CallbackReturnStatus RecvRawCallBack(Event *event) { return event->user_callback(event, event->aux, 0); } -CallbackReturnStatus SendCallBack(Event *event) { +CallbackStatus SendCallBack(Event *event) { auto *sock = (const Socket *) event->initiator; auto bytes = send(sock->sock, @@ -314,10 +314,10 @@ CallbackReturnStatus SendCallBack(Event *event) { ErrorFromSocket(); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } - return CallbackReturnStatus::RETRY; + return CallbackStatus::RETRY; } auto *buffer = IntNew(bytes); @@ -330,13 +330,13 @@ CallbackReturnStatus SendCallBack(Event *event) { Release(buffer); - return CallbackReturnStatus::SUCCESS; + return CallbackStatus::SUCCESS; } - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } -CallbackReturnStatus SendRawCallBack(Event *event) { +CallbackStatus SendRawCallBack(Event *event) { auto *sock = (const Socket *) event->initiator; auto bytes = send(sock->sock, @@ -350,10 +350,10 @@ CallbackReturnStatus SendRawCallBack(Event *event) { event->user_callback(event, event->aux, (int) bytes); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } - return CallbackReturnStatus::RETRY; + return CallbackStatus::RETRY; } event->buffer.length = bytes; @@ -361,7 +361,7 @@ CallbackReturnStatus SendRawCallBack(Event *event) { return event->user_callback(event, event->aux, 0); } -CallbackReturnStatus SendRecvCallBack(Event *event) { +CallbackStatus SendRecvCallBack(Event *event) { auto *sock = (Socket *) event->initiator; auto bytes = send(sock->sock, @@ -373,10 +373,10 @@ CallbackReturnStatus SendRecvCallBack(Event *event) { if (errno != EAGAIN) { ErrorFromSocket(); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } - return CallbackReturnStatus::RETRY; + return CallbackStatus::RETRY; } if (!RecvCB(sock, @@ -385,19 +385,19 @@ CallbackReturnStatus SendRecvCallBack(Event *event) { event->buffer.data, event->buffer.allocated, 0)) - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; - return CallbackReturnStatus::SUCCESS_NO_WAKEUP; + return CallbackStatus::CONTINUE; } -CallbackReturnStatus SendToCallBack(Event *event) { +CallbackStatus SendToCallBack(Event *event) { sockaddr_storage storage{}; socklen_t addrlen; auto *sock = (const Socket *) event->initiator; if (!AddrToSockAddr(event->aux, &storage, &addrlen, sock->family)) - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; auto bytes = sendto(sock->sock, event->buffer.arbuf.buffer, @@ -410,10 +410,10 @@ CallbackReturnStatus SendToCallBack(Event *event) { ErrorFromSocket(); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } - return CallbackReturnStatus::RETRY; + return CallbackStatus::RETRY; } auto *buffer = IntNew(bytes); @@ -425,10 +425,10 @@ CallbackReturnStatus SendToCallBack(Event *event) { Release(buffer); - return CallbackReturnStatus::SUCCESS; + return CallbackStatus::SUCCESS; } - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } bool argon::vm::io::socket::Accept(Socket *sock) { diff --git a/argon/vm/io/socket/socket.h b/argon/vm/io/socket/socket.h index a3d11d9e..112056f8 100644 --- a/argon/vm/io/socket/socket.h +++ b/argon/vm/io/socket/socket.h @@ -21,6 +21,7 @@ #include #include +#include #include #endif diff --git a/argon/vm/io/socket/winsocket.cpp b/argon/vm/io/socket/winsocket.cpp index 7c7531f1..213ac52c 100644 --- a/argon/vm/io/socket/winsocket.cpp +++ b/argon/vm/io/socket/winsocket.cpp @@ -23,17 +23,17 @@ using namespace argon::vm::io::socket; bool LoadWSAExtension(SOCKET socket, GUID guid, void **target); -CallbackReturnStatus RecvAllStarter(Event *event); +CallbackStatus RecvAllStarter(Event *event); // EOL -CallbackReturnStatus AcceptCallBack(Event *event) { +CallbackStatus AcceptCallBack(Event *event) { argon::vm::FiberSetAsyncResult(event->fiber, event->aux); - return CallbackReturnStatus::SUCCESS; + return CallbackStatus::SUCCESS; } -CallbackReturnStatus AcceptStarter(Event *event) { +CallbackStatus AcceptStarter(Event *event) { auto *sock = (const Socket *) event->initiator; auto *remote = (Socket *) event->aux; @@ -52,21 +52,21 @@ CallbackReturnStatus AcceptStarter(Event *event) { if (result != 0 && WSAGetLastError() != WSA_IO_PENDING) { ErrorFromSocket(); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } - return CallbackReturnStatus::SUCCESS; + return CallbackStatus::SUCCESS; } -CallbackReturnStatus ConnectCallBack(Event *event) { +CallbackStatus ConnectCallBack(Event *event) { argon::vm::memory::Free(event->buffer.data); argon::vm::FiberSetAsyncResult(event->fiber, event->initiator); - return CallbackReturnStatus::SUCCESS; + return CallbackStatus::SUCCESS; } -CallbackReturnStatus ConnectStarter(Event *event) { +CallbackStatus ConnectStarter(Event *event) { auto *sock = (const Socket *) event->initiator; event->callback = ConnectCallBack; @@ -84,13 +84,13 @@ CallbackReturnStatus ConnectStarter(Event *event) { ErrorFromSocket(); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } - return CallbackReturnStatus::SUCCESS; + return CallbackStatus::SUCCESS; } -CallbackReturnStatus RecvCallBack(Event *event) { +CallbackStatus RecvCallBack(Event *event) { auto *bytes = BytesNewHoldBuffer( (unsigned char *) event->buffer.wsa.buf, event->buffer.allocated, @@ -99,17 +99,17 @@ CallbackReturnStatus RecvCallBack(Event *event) { if (bytes == nullptr) { argon::vm::memory::Free(event->buffer.wsa.buf); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } argon::vm::FiberSetAsyncResult(event->fiber, (ArObject *) bytes); Release(bytes); - return CallbackReturnStatus::SUCCESS; + return CallbackStatus::SUCCESS; } -CallbackReturnStatus RecvStarter(Event *event) { +CallbackStatus RecvStarter(Event *event) { auto *sock = (const Socket *) event->initiator; event->callback = RecvCallBack; @@ -127,13 +127,13 @@ CallbackReturnStatus RecvStarter(Event *event) { ErrorFromSocket(); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } - return CallbackReturnStatus::SUCCESS; + return CallbackStatus::SUCCESS; } -CallbackReturnStatus RecvAllCallBack(Event *event) { +CallbackStatus RecvAllCallBack(Event *event) { auto delta = event->buffer.allocated - event->buffer.length; if (event->buffer.wsa.len < delta) { @@ -146,14 +146,14 @@ CallbackReturnStatus RecvAllCallBack(Event *event) { if (bytes == nullptr) { argon::vm::memory::Free(event->buffer.data); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } argon::vm::FiberSetAsyncResult(event->fiber, (ArObject *) bytes); Release(bytes); - return CallbackReturnStatus::SUCCESS; + return CallbackStatus::SUCCESS; } auto *tmp = (unsigned char *) argon::vm::memory::Realloc(event->buffer.data, @@ -161,7 +161,7 @@ CallbackReturnStatus RecvAllCallBack(Event *event) { if (tmp == nullptr) { argon::vm::memory::Free(event->buffer.data); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } event->buffer.data = tmp; @@ -173,16 +173,16 @@ CallbackReturnStatus RecvAllCallBack(Event *event) { event->buffer.wsa.buf = (char *) (tmp + event->buffer.length); event->buffer.wsa.len = event->buffer.allocated - event->buffer.length; - if(RecvAllStarter(event) != CallbackReturnStatus::SUCCESS){ + if(RecvAllStarter(event) != CallbackStatus::SUCCESS){ argon::vm::memory::Free(event->buffer.data); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } - return CallbackReturnStatus::RETRY; + return CallbackStatus::RETRY; } -CallbackReturnStatus RecvAllStarter(Event *event) { +CallbackStatus RecvAllStarter(Event *event) { auto *sock = (const Socket *) event->initiator; event->callback = RecvAllCallBack; @@ -200,20 +200,20 @@ CallbackReturnStatus RecvAllStarter(Event *event) { ErrorFromSocket(); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } - return CallbackReturnStatus::SUCCESS; + return CallbackStatus::SUCCESS; } -CallbackReturnStatus RawCallBack(Event *event) { +CallbackStatus RawCallBack(Event *event) { event->buffer.data = (unsigned char *) event->buffer.wsa.buf; event->buffer.length = event->buffer.wsa.len; return event->user_callback(event, event->aux, 0); } -CallbackReturnStatus RecvRawStarter(Event *event) { +CallbackStatus RecvRawStarter(Event *event) { auto *sock = (const Socket *) event->initiator; event->callback = RawCallBack; @@ -229,19 +229,19 @@ CallbackReturnStatus RecvRawStarter(Event *event) { if (result != 0 && WSAGetLastError() != WSA_IO_PENDING) { ErrorFromSocket(); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } - return CallbackReturnStatus::SUCCESS; + return CallbackStatus::SUCCESS; } -CallbackReturnStatus RecvFromCallBack(Event *event) { +CallbackStatus RecvFromCallBack(Event *event) { auto *remote_addr = SockAddrToAddr((sockaddr_storage *) event->buffer.data, ((Socket *) event->initiator)->family); if (remote_addr == nullptr) { argon::vm::memory::Free(event->buffer.wsa.buf); argon::vm::memory::Free(event->buffer.data); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } argon::vm::memory::Free(event->buffer.data); @@ -253,7 +253,7 @@ CallbackReturnStatus RecvFromCallBack(Event *event) { Release(remote_addr); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } auto *ret = TupleNew("oo", data, remote_addr); @@ -266,13 +266,13 @@ CallbackReturnStatus RecvFromCallBack(Event *event) { Release(ret); - return CallbackReturnStatus::SUCCESS; + return CallbackStatus::SUCCESS; } - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } -CallbackReturnStatus RecvFromStarter(Event *event) { +CallbackStatus RecvFromStarter(Event *event) { auto *sock = (const Socket *) event->initiator; event->callback = RecvFromCallBack; @@ -293,13 +293,13 @@ CallbackReturnStatus RecvFromStarter(Event *event) { ErrorFromSocket(); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } - return CallbackReturnStatus::SUCCESS; + return CallbackStatus::SUCCESS; } -CallbackReturnStatus RecvIntoCallBack(Event *event) { +CallbackStatus RecvIntoCallBack(Event *event) { BufferRelease(&event->buffer.arbuf); auto *bytes = IntNew((IntegerUnderlying) event->buffer.wsa.len); @@ -308,13 +308,13 @@ CallbackReturnStatus RecvIntoCallBack(Event *event) { Release(bytes); - return CallbackReturnStatus::SUCCESS; + return CallbackStatus::SUCCESS; } - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } -CallbackReturnStatus RecvIntoStarter(Event *event) { +CallbackStatus RecvIntoStarter(Event *event) { auto *sock = (const Socket *) event->initiator; event->callback = RecvIntoCallBack; @@ -332,13 +332,13 @@ CallbackReturnStatus RecvIntoStarter(Event *event) { ErrorFromSocket(); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } - return CallbackReturnStatus::SUCCESS; + return CallbackStatus::SUCCESS; } -CallbackReturnStatus SendCallBack(Event *event) { +CallbackStatus SendCallBack(Event *event) { BufferRelease(&event->buffer.arbuf); auto *wbytes = IntNew((IntegerUnderlying) event->buffer.wsa.len); @@ -347,25 +347,25 @@ CallbackReturnStatus SendCallBack(Event *event) { Release(wbytes); - return CallbackReturnStatus::SUCCESS; + return CallbackStatus::SUCCESS; } - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } -CallbackReturnStatus SendRecvCallBack(Event *event) { +CallbackStatus SendRecvCallBack(Event *event) { if (!RecvCB((Socket *) event->initiator, event->aux, event->user_callback, (unsigned char *) event->buffer.wsa.buf, event->buffer.allocated, 0)) - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; - return CallbackReturnStatus::SUCCESS_NO_WAKEUP; + return CallbackStatus::CONTINUE; } -CallbackReturnStatus SendCBStarter(Event *event) { +CallbackStatus SendCBStarter(Event *event) { auto *sock = (const Socket *) event->initiator; event->callback = RawCallBack; @@ -381,13 +381,13 @@ CallbackReturnStatus SendCBStarter(Event *event) { if (result != 0 && WSAGetLastError() != WSA_IO_PENDING) { ErrorFromSocket(); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } - return CallbackReturnStatus::SUCCESS; + return CallbackStatus::SUCCESS; } -CallbackReturnStatus SendStarter(Event *event) { +CallbackStatus SendStarter(Event *event) { auto *sock = (const Socket *) event->initiator; event->callback = SendCallBack; @@ -405,13 +405,13 @@ CallbackReturnStatus SendStarter(Event *event) { ErrorFromSocket(); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } - return CallbackReturnStatus::SUCCESS; + return CallbackStatus::SUCCESS; } -CallbackReturnStatus SendRecvStarter(Event *event) { +CallbackStatus SendRecvStarter(Event *event) { auto *sock = (const Socket *) event->initiator; event->callback = SendRecvCallBack; @@ -427,13 +427,13 @@ CallbackReturnStatus SendRecvStarter(Event *event) { if (result != 0 && WSAGetLastError() != WSA_IO_PENDING) { ErrorFromSocket(); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } - return CallbackReturnStatus::SUCCESS; + return CallbackStatus::SUCCESS; } -CallbackReturnStatus SendToCallBack(Event *event) { +CallbackStatus SendToCallBack(Event *event) { BufferRelease(&event->buffer.arbuf); argon::vm::memory::Free(event->buffer.data); @@ -444,13 +444,13 @@ CallbackReturnStatus SendToCallBack(Event *event) { Release(wbytes); - return CallbackReturnStatus::SUCCESS; + return CallbackStatus::SUCCESS; } - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } -CallbackReturnStatus SendToStarter(Event *event) { +CallbackStatus SendToStarter(Event *event) { auto *sock = (const Socket *) event->initiator; event->callback = SendToCallBack; @@ -472,10 +472,10 @@ CallbackReturnStatus SendToStarter(Event *event) { ErrorFromSocket(); - return CallbackReturnStatus::FAILURE; + return CallbackStatus::FAILURE; } - return CallbackReturnStatus::SUCCESS; + return CallbackStatus::SUCCESS; } bool argon::vm::io::socket::Accept(Socket *sock) { diff --git a/argon/vm/loop/eploop.cpp b/argon/vm/loop/eploop.cpp index cefc9cd1..77ee698d 100644 --- a/argon/vm/loop/eploop.cpp +++ b/argon/vm/loop/eploop.cpp @@ -20,14 +20,6 @@ using namespace argon::vm::loop; using namespace argon::vm::datatype; -// Prototypes - -void ProcessOutTrigger(EvLoop *loop); - -void ProcessQueue(EventQueue *queue, EventDirection direction); - -// EOL - EvLoop *argon::vm::loop::EventLoopNew() { EvLoop *evl; @@ -52,7 +44,7 @@ EvLoop *argon::vm::loop::EventLoopNew() { bool argon::vm::loop::EventLoopIOPoll(EvLoop *loop, unsigned long timeout) { epoll_event events[kMaxEvents]; - ProcessOutTrigger(loop); + ProcessOutQueue(loop); auto ret = epoll_wait(loop->handle, events, kMaxEvents, (int) timeout); if (ret < 0) { @@ -67,14 +59,15 @@ bool argon::vm::loop::EventLoopIOPoll(EvLoop *loop, unsigned long timeout) { auto *queue = (EventQueue *) events[i].data.ptr; if (events[i].events & EPOLLIN) - ProcessQueue(queue, EventDirection::IN); + ProcessQueueEvents(loop, queue, EventDirection::IN); if (events[i].events & EPOLLOUT) - ProcessQueue(queue, EventDirection::OUT); + ProcessQueueEvents(loop, queue, EventDirection::OUT); std::unique_lock _(queue->lock); - if (queue->items == 0 && epoll_ctl(loop->handle, EPOLL_CTL_DEL, queue->handle, nullptr) < 0) + if (queue->in_events.Count() == 0 && queue->out_events.Count() == 0 && + epoll_ctl(loop->handle, EPOLL_CTL_DEL, queue->handle, nullptr) < 0) assert(false); // Never get here! } @@ -86,7 +79,7 @@ bool argon::vm::loop::EventLoopAddEvent(EvLoop *loop, EventQueue *queue, Event * std::unique_lock _(queue->lock); - if (queue->items == 0) { + if (queue->in_events.Count() == 0 && queue->out_events.Count() == 0) { ep_event.events = EPOLLOUT | EPOLLIN | EPOLLET; ep_event.data.ptr = queue; @@ -99,7 +92,18 @@ bool argon::vm::loop::EventLoopAddEvent(EvLoop *loop, EventQueue *queue, Event * return false; } - } else if (direction == EventDirection::OUT && queue->out_event.head == nullptr) { + } + + auto is_empty = queue->out_events.Count() == 0; + + event->fiber = vm::GetFiber(); + + if (direction == EventDirection::IN) + queue->in_events.Enqueue(event); + else + queue->out_events.Enqueue(event); + + if (direction == EventDirection::OUT && is_empty) { // Since an event with active EPOLLET flag is used, it is possible that the send callback // is not immediately invoked, to avoid this, the desire to write to a socket is signaled // in a separate queue @@ -111,10 +115,6 @@ bool argon::vm::loop::EventLoopAddEvent(EvLoop *loop, EventQueue *queue, Event * vm::SetFiberStatus(FiberStatus::BLOCKED); - event->fiber = vm::GetFiber(); - - queue->AddEvent(event, direction); - loop->io_count++; loop->cond.notify_one(); @@ -122,47 +122,4 @@ bool argon::vm::loop::EventLoopAddEvent(EvLoop *loop, EventQueue *queue, Event * return true; } -void ProcessOutTrigger(EvLoop *loop) { - std::unique_lock _(loop->out_lock); - - for (EventQueue *queue = loop->out_queues; queue != nullptr; queue = queue->next) - ProcessQueue(queue, EventDirection::OUT); - - loop->out_queues = nullptr; -} - -void ProcessQueue(EventQueue *queue, EventDirection direction) { - Event **head = &queue->in_event.head; - Event *tmp; - - CallbackReturnStatus status; - - if (direction == EventDirection::OUT) - head = &queue->out_event.head; - - do { - thlocal_event = *head; - - if (*head == nullptr) - break; - - status = thlocal_event->callback(thlocal_event); - if (status == CallbackReturnStatus::RETRY) - return; - - thlocal_event->loop->io_count--; - - if (status != CallbackReturnStatus::SUCCESS_NO_WAKEUP) - Spawn(thlocal_event->fiber); - - std::unique_lock _(queue->lock); - - tmp = queue->PopEvent(direction); - - _.unlock(); - - EventDel(tmp); - } while (status != CallbackReturnStatus::FAILURE); -} - #endif \ No newline at end of file diff --git a/argon/vm/loop/event.h b/argon/vm/loop/event.h index 2c517885..2a6644cb 100644 --- a/argon/vm/loop/event.h +++ b/argon/vm/loop/event.h @@ -11,29 +11,21 @@ #include -#undef CONST -#undef FASTCALL -#undef Yield - -#else - -#include - #endif #include namespace argon::vm::loop { - enum class CallbackReturnStatus { + enum class CallbackStatus { + CONTINUE, FAILURE, RETRY, - SUCCESS, - SUCCESS_NO_WAKEUP + SUCCESS }; - using EventCB = CallbackReturnStatus (*)(struct Event *); + using EventCB = CallbackStatus (*)(struct Event *); - using UserCB = CallbackReturnStatus (*)(struct Event *, datatype::ArObject *, int status); + using UserCB = CallbackStatus (*)(struct Event *, datatype::ArObject *, int status); #ifdef _ARGON_PLATFORM_WINDOWS struct Event : OVERLAPPED { @@ -58,14 +50,14 @@ namespace argon::vm::loop { struct { datatype::ArBuffer arbuf; -#ifdef _ARGON_PLATFORM_WINDOWS - WSABUF wsa; -#endif unsigned char *data; datatype::ArSize length; - datatype::ArSize allocated; + +#ifdef _ARGON_PLATFORM_WINDOWS + WSABUF wsa; +#endif } buffer; int flags; diff --git a/argon/vm/loop/evloop.cpp b/argon/vm/loop/evloop.cpp index 901048f7..bd3ad05d 100644 --- a/argon/vm/loop/evloop.cpp +++ b/argon/vm/loop/evloop.cpp @@ -17,11 +17,11 @@ using namespace argon::vm::datatype; EvLoop *default_event_loop = nullptr; -thread_local Event *argon::vm::loop::thlocal_event = nullptr; +thread_local Fiber *argon::vm::loop::evloop_cur_fiber = nullptr; // Prototypes -void TimerTaskDel(TimerTask *ttask); +void TimerTaskDel(EvLoop *loop, TimerTask *task); // Internal @@ -31,10 +31,7 @@ unsigned long long TimeNow() { } void EventLoopDispatcher(EvLoop *loop) { - TimerTask *ttask; - - unsigned long long loop_time; - long timeout; + TimerTask *task; while (!loop->should_stop) { if (loop->io_count == 0) { @@ -50,49 +47,44 @@ void EventLoopDispatcher(EvLoop *loop) { break; } - loop_time = TimeNow(); + auto loop_time = TimeNow(); + auto timeout = (long) kEventTimeout; - timeout = kEventTimeout; - if ((ttask = loop->timer_heap.PeekMin()) != nullptr) { - timeout = (long) (ttask->timeout - loop_time); + if ((task = loop->timer_heap.PeekMin()) != nullptr) { + timeout = (long) (task->timeout - loop_time); if (timeout < 0) timeout = 0; } EventLoopIOPoll(loop, timeout); - while (ttask != nullptr) { - if (loop_time < ttask->timeout) + while (task != nullptr) { + if (loop_time < task->timeout) break; loop->timer_heap.PopMin(); loop->io_count--; - argon::vm::Spawn(ttask->fiber); + argon::vm::Spawn(task->fiber); - TimerTaskDel(ttask); + TimerTaskDel(loop, task); - ttask = loop->timer_heap.PeekMin(); + task = loop->timer_heap.PeekMin(); } } } -void TimerTaskDel(TimerTask *ttask) { - auto *loop = ttask->loop; - +void TimerTaskDel(EvLoop *loop, TimerTask *task) { std::unique_lock _(loop->lock); - if (loop->free_t_task_count + 1 <= kMaxFreeTasks) { - ttask->next = loop->free_t_task; - loop->free_t_task = ttask; - - loop->free_t_task_count++; + if (loop->free_t_tasks.Count() + 1 <= kMaxFreeTasks) { + loop->free_t_tasks.Push(task); return; } - argon::vm::memory::Free(ttask); + argon::vm::memory::Free(task); } // Public @@ -107,19 +99,14 @@ bool argon::vm::loop::EventLoopInit() { } Event *argon::vm::loop::EventNew(EvLoop *loop, ArObject *initiator) { - Event *event = nullptr; + Event *event; if (loop == nullptr) return nullptr; std::unique_lock _(loop->lock); - if (loop->free_events != nullptr) { - event = loop->free_events; - loop->free_events = event->next; - - loop->free_events_count--; - } + event = loop->free_events.Pop(); _.unlock(); @@ -148,7 +135,6 @@ EventQueue *argon::vm::loop::EventQueueNew(EvHandle handle) { new(&(queue->lock))std::mutex(); - queue->items = 0; queue->handle = handle; return queue; @@ -161,46 +147,41 @@ EvLoop *argon::vm::loop::GetEventLoop() { } bool argon::vm::loop::EventLoopSetTimeout(EvLoop *loop, datatype::ArSize timeout) { - auto now = TimeNow(); - TimerTask *ttask = nullptr; + TimerTask *task; if (loop == nullptr) return false; - std::unique_lock _(loop->lock); + auto now = TimeNow(); - if (loop->free_t_task != nullptr) { - ttask = loop->free_t_task; - loop->free_t_task = (TimerTask *) ttask->next; + std::unique_lock _(loop->lock); - loop->free_t_task_count--; - } + task = (TimerTask *) loop->free_t_tasks.Pop(); _.unlock(); - if (ttask == nullptr) { - ttask = (TimerTask *) memory::Alloc(sizeof(TimerTask)); - if (ttask == nullptr) + if (task == nullptr) { + task = (TimerTask *) memory::Alloc(sizeof(TimerTask)); + if (task == nullptr) return false; } - memory::MemoryZero(ttask, sizeof(TimerTask)); + memory::MemoryZero(task, sizeof(TimerTask)); - ttask->loop = loop; - ttask->fiber = vm::GetFiber(); + task->fiber = vm::GetFiber(); - ttask->callback = [](Task *task) { - vm::FiberSetAsyncResult(task->fiber, (ArObject *) Nil); + task->callback = [](Task *_task) { + vm::FiberSetAsyncResult(_task->fiber, (ArObject *) Nil); }; _.lock(); - ttask->id = loop->t_task_id++; - ttask->timeout = now + timeout; + task->id = loop->t_task_id++; + task->timeout = now + timeout; vm::SetFiberStatus(FiberStatus::BLOCKED); - loop->timer_heap.Insert(ttask); + loop->timer_heap.Insert(task); loop->io_count++; @@ -219,11 +200,8 @@ void argon::vm::loop::EventDel(Event *event) { std::unique_lock _(loop->lock); - if (loop->free_events_count + 1 <= kMaxFreeEvents) { - event->next = loop->free_events; - loop->free_events = event; - - loop->free_events_count++; + if (loop->free_events.Count() + 1 <= kMaxFreeEvents) { + loop->free_events.Push(event); return; } @@ -243,8 +221,6 @@ void argon::vm::loop::EventLoopShutdown() { #ifndef _ARGON_PLATFORM_WINDOWS void argon::vm::loop::EventQueueDel(EventQueue **queue) { - assert((*queue)->items == 0); - (*queue)->lock.~mutex(); argon::vm::memory::Free(*queue); @@ -252,55 +228,47 @@ void argon::vm::loop::EventQueueDel(EventQueue **queue) { *queue = nullptr; } -// EventQueue -Event *argon::vm::loop::EventQueue::PopEvent(EventDirection direction) { - Event **head = &this->in_event.head; - Event **tail = &this->in_event.tail; - Event *ret; +void argon::vm::loop::ProcessOutQueue(argon::vm::loop::EvLoop *loop) { + std::unique_lock _(loop->out_lock); - if (direction == EventDirection::OUT) { - head = &this->out_event.head; - tail = &this->out_event.tail; - } + for (EventQueue *queue = loop->out_queues; queue != nullptr; queue = queue->next) + ProcessQueueEvents(loop, queue, EventDirection::OUT); - ret = *head; + loop->out_queues = nullptr; +} - if (ret != nullptr) { - *head = ret->prev; +void argon::vm::loop::ProcessQueueEvents(EvLoop *loop, EventQueue *queue, EventDirection direction) { + auto *ev_queue = &queue->in_events; - if(ret->prev != nullptr) - ret->prev->next = nullptr; + if (direction == EventDirection::OUT) + ev_queue = &queue->out_events; - if (*tail == ret) - *tail = nullptr; + CallbackStatus status; - this->items--; - } + do { + auto *event = ev_queue->GetHead(); + if (event == nullptr) + break; - return ret; -} + evloop_cur_fiber = event->fiber; -void argon::vm::loop::EventQueue::AddEvent(Event *event, EventDirection direction) { - Event **head = &this->in_event.head; - Event **tail = &this->in_event.tail; + status = event->callback(event); + if (status == CallbackStatus::RETRY) + return; - if (direction == EventDirection::OUT) { - head = &this->out_event.head; - tail = &this->out_event.tail; - } + if (status != CallbackStatus::CONTINUE) + Spawn(event->fiber); - event->next = *tail; - event->prev = nullptr; + loop->io_count--; - if (*tail != nullptr) - (*tail)->prev = event; + std::unique_lock _(queue->lock); - *tail = event; + event = ev_queue->Dequeue(); - if (*head == nullptr) - *head = event; + _.unlock(); - this->items++; + EventDel(event); + } while (status != CallbackStatus::FAILURE); } #endif \ No newline at end of file diff --git a/argon/vm/loop/evloop.h b/argon/vm/loop/evloop.h index a1256a0c..d1fb4388 100644 --- a/argon/vm/loop/evloop.h +++ b/argon/vm/loop/evloop.h @@ -13,54 +13,17 @@ #include -#include -#include +#include #include +#include +#include namespace argon::vm::loop { constexpr const unsigned int kEventTimeout = 24; // millisecond constexpr const unsigned int kMaxFreeEvents = 2046; constexpr const unsigned int kMaxFreeTasks = 128; -#ifdef _ARGON_PLATFORM_WINDOWS - - using EvHandle = void *; - -#else - constexpr const unsigned int kMaxEvents = 50; - - using EvHandle = int; - - enum class EventDirection { - IN, - OUT - }; - - struct EventQueue { - std::mutex lock; - - EventQueue *next; - - struct { - Event *head; - Event *tail; - } in_event; - - struct { - Event *head; - Event *tail; - } out_event; - - unsigned int items; - - EvHandle handle; - - Event *PopEvent(EventDirection direction); - - void AddEvent(Event *event, EventDirection direction); - }; - -#endif + struct Event; struct EvLoop { std::mutex lock; @@ -71,19 +34,15 @@ namespace argon::vm::loop { std::condition_variable cond; - MinHeap timer_heap; + support::MinHeap timer_heap; #ifndef _ARGON_PLATFORM_WINDOWS EventQueue *out_queues; #endif - Event *free_events; - - TimerTask *free_t_task; + support::Stack free_events; - datatype::ArSize free_events_count; - - datatype::ArSize free_t_task_count; + support::Stack free_t_tasks; datatype::ArSize t_task_id; @@ -94,7 +53,7 @@ namespace argon::vm::loop { bool should_stop; }; - extern thread_local Event *thlocal_event; + extern thread_local struct Fiber *evloop_cur_fiber; Event *EventNew(EvLoop *loop, datatype::ArObject *initiator); @@ -134,6 +93,10 @@ namespace argon::vm::loop { void EventQueueDel(EventQueue **queue); + void ProcessOutQueue(EvLoop *loop); + + void ProcessQueueEvents(EvLoop *loop, EventQueue *queue, EventDirection direction); + #endif } // namespace argon::vm::loop diff --git a/argon/vm/loop/evqueue.h b/argon/vm/loop/evqueue.h new file mode 100644 index 00000000..50f1d13f --- /dev/null +++ b/argon/vm/loop/evqueue.h @@ -0,0 +1,45 @@ +// This source file is part of the Argon project. +// +// Licensed under the Apache License v2.0 + +#include + +#include + +#include + +#ifndef ARGON_VM_LOOP_EVQUEUE_H_ +#define ARGON_VM_LOOP_EVQUEUE_H_ + +namespace argon::vm::loop { +#ifdef _ARGON_PLATFORM_WINDOWS + + using EvHandle = void *; + +#else + constexpr const unsigned int kMaxEvents = 50; + + using EvHandle = int; + + class Event; + + enum class EventDirection { + IN, + OUT + }; + + struct EventQueue { + std::mutex lock; + + EventQueue *next; + + support::Queue in_events; + + support::Queue out_events; + + EvHandle handle; + }; +#endif +} // namespace argon::vm::loop + +#endif //ARGON_VM_LOOP_EVQUEUE_H_ diff --git a/argon/vm/loop/kqloop.cpp b/argon/vm/loop/kqloop.cpp index 66279039..38ef72ce 100644 --- a/argon/vm/loop/kqloop.cpp +++ b/argon/vm/loop/kqloop.cpp @@ -10,30 +10,17 @@ #include -#include - -#include - #include #include using namespace argon::vm::loop; -using namespace argon::vm::datatype; - -// Prototypes - -void ProcessOutTrigger(EvLoop *loop); - -void ProcessQueue(EventQueue *queue, EventDirection direction); - -// EOL EvLoop *argon::vm::loop::EventLoopNew() { EvLoop *evl; if ((evl = (EvLoop *) argon::vm::memory::Calloc(sizeof(EvLoop))) != nullptr) { if ((evl->handle = kqueue()) < 0) { - ErrorFromErrno(errno); + datatype::ErrorFromErrno(errno); argon::vm::memory::Free(evl); @@ -42,7 +29,6 @@ EvLoop *argon::vm::loop::EventLoopNew() { new(&evl->lock)std::mutex(); new(&evl->out_lock)std::mutex(); - new(&evl->cond)std::condition_variable(); } @@ -54,7 +40,7 @@ bool argon::vm::loop::EventLoopIOPoll(EvLoop *loop, unsigned long timeout) { struct kevent kev[2]; timespec ts{}; - ProcessOutTrigger(loop); + ProcessOutQueue(loop); ts.tv_sec = (long) timeout / 1000; ts.tv_nsec = (long) ((timeout % 1000) * 1000000); @@ -72,13 +58,13 @@ bool argon::vm::loop::EventLoopIOPoll(EvLoop *loop, unsigned long timeout) { auto *queue = (EventQueue *) events[i].udata; if (events[i].filter == EVFILT_READ) - ProcessQueue(queue, EventDirection::IN); + ProcessQueueEvents(loop, queue, EventDirection::IN); else if (events[i].filter == EVFILT_WRITE) - ProcessQueue(queue, EventDirection::OUT); + ProcessQueueEvents(loop, queue, EventDirection::OUT); std::unique_lock _(queue->lock); - if (queue->items == 0) { + if (queue->in_events.Count() == 0 && queue->out_events.Count() == 0) { EV_SET(kev, queue->handle, events[i].filter, EV_DELETE, 0, 0, nullptr); if (kevent(loop->handle, kev, 1, nullptr, 0, nullptr) < 0) @@ -94,7 +80,7 @@ bool argon::vm::loop::EventLoopAddEvent(EvLoop *loop, EventQueue *queue, Event * std::unique_lock _(queue->lock); - if (queue->items == 0) { + if (queue->in_events.Count() == 0 && queue->out_events.Count() == 0) { EV_SET(kev, queue->handle, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, queue); EV_SET(kev + 1, queue->handle, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, queue); @@ -103,13 +89,22 @@ bool argon::vm::loop::EventLoopAddEvent(EvLoop *loop, EventQueue *queue, Event * vm::SetFiberStatus(FiberStatus::RUNNING); - ErrorFromErrno(errno); + datatype::ErrorFromErrno(errno); return false; } } - if (direction == EventDirection::OUT && queue->out_event.head == nullptr) { + auto is_empty = queue->out_events.Count() == 0; + + event->fiber = vm::GetFiber(); + + if (direction == EventDirection::IN) + queue->in_events.Enqueue(event); + else + queue->out_events.Enqueue(event); + + if (direction == EventDirection::OUT && is_empty) { std::unique_lock out_lock(loop->out_lock); queue->next = loop->out_queues; @@ -118,10 +113,6 @@ bool argon::vm::loop::EventLoopAddEvent(EvLoop *loop, EventQueue *queue, Event * vm::SetFiberStatus(FiberStatus::BLOCKED); - event->fiber = vm::GetFiber(); - - queue->AddEvent(event, direction); - loop->io_count++; loop->cond.notify_one(); @@ -129,47 +120,4 @@ bool argon::vm::loop::EventLoopAddEvent(EvLoop *loop, EventQueue *queue, Event * return true; } -void ProcessOutTrigger(EvLoop *loop) { - std::unique_lock _(loop->out_lock); - - for (EventQueue *queue = loop->out_queues; queue != nullptr; queue = queue->next) - ProcessQueue(queue, EventDirection::OUT); - - loop->out_queues = nullptr; -} - -void ProcessQueue(EventQueue *queue, EventDirection direction) { - Event **head = &queue->in_event.head; - Event *tmp; - - CallbackReturnStatus status; - - if (direction == EventDirection::OUT) - head = &queue->out_event.head; - - do { - thlocal_event = *head; - - if (*head == nullptr) - break; - - status = thlocal_event->callback(thlocal_event); - if (status == CallbackReturnStatus::RETRY) - return; - - thlocal_event->loop->io_count--; - - if (status != CallbackReturnStatus::SUCCESS_NO_WAKEUP) - Spawn(thlocal_event->fiber); - - std::unique_lock _(queue->lock); - - tmp = queue->PopEvent(direction); - - _.unlock(); - - EventDel(tmp); - } while (status != CallbackReturnStatus::FAILURE); -} - #endif \ No newline at end of file diff --git a/argon/vm/loop/minheap.h b/argon/vm/loop/support/minheap.h similarity index 97% rename from argon/vm/loop/minheap.h rename to argon/vm/loop/support/minheap.h index 4a428104..47f5a6ef 100644 --- a/argon/vm/loop/minheap.h +++ b/argon/vm/loop/support/minheap.h @@ -2,10 +2,10 @@ // // Licensed under the Apache License v2.0 -#ifndef ARGON_VM_LOOP_MINHEAP_H_ -#define ARGON_VM_LOOP_MINHEAP_H_ +#ifndef ARGON_VM_LOOP_SUPPORT_MINHEAP_H_ +#define ARGON_VM_LOOP_SUPPORT_MINHEAP_H_ -namespace argon::vm::loop { +namespace argon::vm::loop::support { template using heap_less = bool (*)(const T *left, const T *right); @@ -230,6 +230,6 @@ namespace argon::vm::loop { this->HeapVerify(t); } }; -} // namespace argon::vm::loop +} // namespace argon::vm::loop::support -#endif // !ARGON_VM_LOOP_MINHEAP_H_ +#endif // !ARGON_VM_LOOP_SUPPORT_MINHEAP_H_ diff --git a/argon/vm/loop/support/queue.h b/argon/vm/loop/support/queue.h new file mode 100644 index 00000000..29e2d5a8 --- /dev/null +++ b/argon/vm/loop/support/queue.h @@ -0,0 +1,62 @@ +// This source file is part of the Argon project. +// +// Licensed under the Apache License v2.0 + +#ifndef ARGON_VM_LOOP_SUPPORT_QUEUE_H_ +#define ARGON_VM_LOOP_SUPPORT_QUEUE_H_ + +namespace argon::vm::loop::support { + template + class Queue { + T *head_ = nullptr; + T *tail_ = nullptr; + + unsigned int items = 0; + public: + T *Dequeue() { + T *t; + + if (this->head_ == nullptr) + return nullptr; + + t = this->head_; + + if (this->head_->prev != nullptr) { + this->head_->prev->next = nullptr; + this->head_ = this->head_->prev; + } else { + this->head_ = nullptr; + this->tail_ = nullptr; + } + + this->items--; + + return t; + } + + T *GetHead() { + return this->head_; + } + + unsigned int Count() { + return this->items; + } + + void Enqueue(T *t) { + t->next = this->tail_; + t->prev = nullptr; + + if (this->tail_ != nullptr) + this->tail_->prev = t; + + if (this->head_ == nullptr) + this->head_ = t; + + this->tail_ = t; + + this->items++; + } + }; +} // namespace argon::vm::loop::support + +#endif // !ARGON_VM_LOOP_SUPPORT_QUEUE_H_ diff --git a/argon/vm/loop/support/stack.h b/argon/vm/loop/support/stack.h new file mode 100644 index 00000000..ca9c82fc --- /dev/null +++ b/argon/vm/loop/support/stack.h @@ -0,0 +1,40 @@ +// This source file is part of the Argon project. +// +// Licensed under the Apache License v2.0 + +#ifndef ARGON_VM_LOOP_SUPPORT_STACK_H_ +#define ARGON_VM_LOOP_SUPPORT_STACK_H_ + +namespace argon::vm::loop::support { + template + class Stack { + T *stack = nullptr; + + unsigned int items = 0; + public: + T *Pop() { + auto *t = this->stack; + if (t == nullptr) + return nullptr; + + this->stack = t->next; + + this->items--; + + return t; + } + + unsigned int Count() { + return this->items; + } + + void Push(T *t) { + t->next = this->stack; + this->stack = t; + + this->items++; + } + }; +} // namespace argon::vm::loop::support + +#endif // !ARGON_VM_LOOP_SUPPORT_STACK_H_ diff --git a/argon/vm/loop/task.h b/argon/vm/loop/task.h index 28dd3caa..6a86a6d1 100644 --- a/argon/vm/loop/task.h +++ b/argon/vm/loop/task.h @@ -5,6 +5,8 @@ #ifndef ARGON_VM_LOOP_TASK_H_ #define ARGON_VM_LOOP_TASK_H_ +#include + #include namespace argon::vm::loop { @@ -14,9 +16,7 @@ namespace argon::vm::loop { struct Task { Task *next; - struct EvLoop *loop; - - struct Fiber *fiber; + Fiber *fiber; TaskCB callback; }; diff --git a/argon/vm/loop/winloop.cpp b/argon/vm/loop/winloop.cpp index ee396a2c..dc4a6bb3 100644 --- a/argon/vm/loop/winloop.cpp +++ b/argon/vm/loop/winloop.cpp @@ -40,7 +40,7 @@ EvLoop *argon::vm::loop::EventLoopNew() { } bool argon::vm::loop::EventLoopIOPoll(EvLoop *loop, unsigned long timeout) { - auto status = CallbackReturnStatus::SUCCESS; + auto status = CallbackStatus::SUCCESS; Event *event; void *key; @@ -48,33 +48,36 @@ bool argon::vm::loop::EventLoopIOPoll(EvLoop *loop, unsigned long timeout) { DWORD bytes; bool ok = GetQueuedCompletionStatus(loop->handle, &bytes, (PULONG_PTR) &key, (LPOVERLAPPED *) &event, timeout); - - thlocal_event = event; - if (!ok) { if (::GetLastError() == WAIT_TIMEOUT) return false; ErrorFromWinErr(); - if (event->user_callback != nullptr) + if (event->user_callback != nullptr) { + evloop_cur_fiber = event->fiber; + event->user_callback(event, event->aux, -1); + } } else { event->buffer.wsa.len = bytes; - if (event->callback != nullptr) + if (event->callback != nullptr) { + evloop_cur_fiber = event->fiber; + status = event->callback(event); + } else vm::FiberSetAsyncResult(event->fiber, (ArObject *) Nil); // Default: Set initiator as return value } - if (status != CallbackReturnStatus::SUCCESS_NO_WAKEUP && status != CallbackReturnStatus::RETRY) { + if (status != CallbackStatus::CONTINUE && status != CallbackStatus::RETRY) { loop->io_count--; Spawn(event->fiber); } - if (status != CallbackReturnStatus::RETRY) + if (status != CallbackStatus::RETRY) EventDel(event); return true; @@ -85,7 +88,7 @@ bool argon::vm::loop::EventLoopAddEvent(EvLoop *loop, Event *event) { event->fiber = vm::GetFiber(); - if (event->callback(event) == CallbackReturnStatus::FAILURE) { + if (event->callback(event) == CallbackStatus::FAILURE) { argon::vm::SetFiberStatus(FiberStatus::RUNNING); EventDel(event); diff --git a/argon/vm/runtime.cpp b/argon/vm/runtime.cpp index 89802da5..182a0ebd 100644 --- a/argon/vm/runtime.cpp +++ b/argon/vm/runtime.cpp @@ -127,7 +127,7 @@ void VCoreRelease(OSThread *); if (ost_local != nullptr) #define ON_EVENT_DISPATCHER \ - if(loop::thlocal_event != nullptr) + if(loop::evloop_cur_fiber != nullptr) #define PUSH_LCQUEUE(vcore, fiber) \ do { \ @@ -615,8 +615,8 @@ ArObject *argon::vm::GetLastError() { if (ost_local != nullptr) fiber = ost_local->fiber; - else if (loop::thlocal_event != nullptr) - fiber = loop::thlocal_event->fiber; + else if (loop::evloop_cur_fiber != nullptr) + fiber = loop::evloop_cur_fiber; if (fiber != nullptr) { if (fiber->panic == nullptr) @@ -820,8 +820,8 @@ bool argon::vm::CheckLastPanic(const char *id) { if (ost_local != nullptr) last = &ost_local->fiber->panic; - else if (loop::thlocal_event != nullptr) - last = &loop::thlocal_event->fiber->panic; + else if (loop::evloop_cur_fiber != nullptr) + last = &loop::evloop_cur_fiber->panic; if (last == nullptr || !AR_TYPEOF((*last)->object, type_error_)) return false; @@ -866,7 +866,7 @@ bool argon::vm::Initialize(const Config *config) { bool argon::vm::IsPanicking() { ON_ARGON_CONTEXT return ost_local->fiber->panic != nullptr; - ON_EVENT_DISPATCHER return loop::thlocal_event->fiber->panic != nullptr; + ON_EVENT_DISPATCHER return loop::evloop_cur_fiber->panic != nullptr; return panic_global != nullptr; } @@ -925,7 +925,7 @@ bool argon::vm::Spawn(Function *func, ArObject **argv, ArSize argc, OpCodeCallMo Fiber *argon::vm::GetFiber() { ON_ARGON_CONTEXT return ost_local->fiber; - ON_EVENT_DISPATCHER return loop::thlocal_event->fiber; + ON_EVENT_DISPATCHER return loop::evloop_cur_fiber; return nullptr; } @@ -933,7 +933,7 @@ Fiber *argon::vm::GetFiber() { FiberStatus argon::vm::GetFiberStatus() { ON_ARGON_CONTEXT return ost_local->fiber_status; - ON_EVENT_DISPATCHER return loop::thlocal_event->fiber->status; + ON_EVENT_DISPATCHER return loop::evloop_cur_fiber->status; assert(false); } @@ -941,7 +941,7 @@ FiberStatus argon::vm::GetFiberStatus() { Frame *argon::vm::GetFrame() { ON_ARGON_CONTEXT return ost_local->fiber->frame; - ON_EVENT_DISPATCHER return loop::thlocal_event->fiber->frame; + ON_EVENT_DISPATCHER return loop::evloop_cur_fiber->frame; return nullptr; } @@ -962,7 +962,7 @@ void argon::vm::DiscardLastPanic() { } ON_EVENT_DISPATCHER { - PanicCleanup(&loop::thlocal_event->fiber->panic); + PanicCleanup(&loop::evloop_cur_fiber->panic); return; } @@ -975,8 +975,8 @@ void argon::vm::Panic(datatype::ArObject *panic) { if (ost_local != nullptr) fiber = ost_local->fiber; - else if (loop::thlocal_event != nullptr) - fiber = loop::thlocal_event->fiber; + else if (loop::evloop_cur_fiber != nullptr) + fiber = loop::evloop_cur_fiber; if (fiber != nullptr) { if ((fiber->panic = PanicNew(fiber->panic, fiber->frame, panic)) == nullptr) @@ -997,7 +997,7 @@ void argon::vm::SetFiberStatus(FiberStatus status) { return; } - ON_EVENT_DISPATCHER loop::thlocal_event->fiber->status = status; + ON_EVENT_DISPATCHER loop::evloop_cur_fiber->status = status; } void argon::vm::Spawn(argon::vm::Fiber *fiber) { @@ -1020,4 +1020,4 @@ void argon::vm::Yield() { if (has_work) OSTWakeRun(); -} \ No newline at end of file +}