Skip to content

Commit

Permalink
Incorporate Dasynq changes (v0.02)
Browse files Browse the repository at this point in the history
  • Loading branch information
davmac314 committed Nov 9, 2016
1 parent bb6855c commit 807ce78
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 18 deletions.
2 changes: 2 additions & 0 deletions src/dasynq/dasynq-epoll.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ template <class Base> class EpollLoop : public Base
throw std::system_error(std::make_error_code(std::errc::not_supported));
}

// flags specifies which watch to remove; ignored if the loop doesn't support
// separate read/write watches.
void removeFdWatch(int fd, int flags) noexcept
{
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, nullptr);
Expand Down
2 changes: 2 additions & 0 deletions src/dasynq/dasynq-kqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ template <class Base> class KqueueLoop : public Base
}
}

// flags specifies which watch to remove; ignored if the loop doesn't support
// separate read/write watches.
void removeFdWatch(int fd, int flags)
{
removeFilter((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd);
Expand Down
47 changes: 29 additions & 18 deletions src/dasynq/dasynq.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ namespace dprivate {
// that is, watcher should not be disabled until all watched event types are queued.
constexpr static int multi_watch = 4;

// Represents a queued event notification
// Represents a queued event notification. Various event watchers derive from this type.
class BaseWatcher
{
template <typename T_Mutex, typename Traits> friend class EventDispatch;
Expand Down Expand Up @@ -169,8 +169,10 @@ namespace dprivate {

protected:
int watch_fd;
int watch_flags;
int event_flags;

// These flags are protected by the loop's internal lock:
int watch_flags; // events being watched
int event_flags; // events pending (queued)

BaseFdWatcher() noexcept : BaseWatcher(WatchType::FD) { }

Expand Down Expand Up @@ -303,9 +305,6 @@ namespace dprivate {
};

// This class serves as the base class (mixin) for the AEN mechanism class.
// Note that EventDispatch, here, and EventLoop (below) are really two sides of one coin;
// they do not work independently. The mixin pattern that we use to avoid dynamic dispatch
// forces them to be two seperate classes, however.
//
// The EventDispatch class maintains the queued event data structures. It inserts watchers
// into the queue when eventes are received (receiveXXX methods).
Expand All @@ -314,7 +313,7 @@ namespace dprivate {
friend class EventLoop<T_Mutex>;

// queue data structure/pointer
BaseWatcher * first;
BaseWatcher * first = nullptr;

using BaseSignalWatcher = dasynq::dprivate::BaseSignalWatcher<T_Mutex>;
using BaseFdWatcher = dasynq::dprivate::BaseFdWatcher<T_Mutex>;
Expand Down Expand Up @@ -753,7 +752,9 @@ template <typename T_Mutex> class EventLoop

Rearm processFdRearm(BaseFdWatcher * bfw, Rearm rearmType, bool is_multi_watch)
{
// Called with lock held
// Called with lock held;
// bdfw->event_flags contains only with pending (queued) events

if (is_multi_watch) {
BaseBidiFdWatcher * bdfw = static_cast<BaseBidiFdWatcher *>(bfw);

Expand All @@ -772,20 +773,23 @@ template <typename T_Mutex> class EventLoop
}
}
else {
// TODO this will need flags for such a loop, since it can't
// otherwise distinguish which channel watch to remove
loop_mech.removeFdWatch_nolock(bdfw->watch_fd, bdfw->watch_flags);
loop_mech.removeFdWatch_nolock(bdfw->watch_fd, IN_EVENTS);
}
}
else if (rearmType == Rearm::DISARM) {
// Nothing more to do
}
else if (rearmType == Rearm::REARM) {
bdfw->watch_flags |= IN_EVENTS;

if (! LoopTraits::has_separate_rw_fd_watches) {
int watch_flags = bdfw->watch_flags;
// If this is a BidiFdWatch (multiwatch) then we do not want to re-enable a
// channel that has an event pending (is already queued):
watch_flags &= ~(bdfw->event_flags);
loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
static_cast<BaseWatcher *>(bdfw),
(bdfw->watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
(watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
}
else {
loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
Expand All @@ -807,6 +811,7 @@ template <typename T_Mutex> class EventLoop
}
}

// Process re-arm for the secondary (output) watcher in a Bi-direction Fd watcher.
Rearm processSecondaryRearm(BaseBidiFdWatcher * bdfw, Rearm rearmType)
{
// Called with lock held
Expand All @@ -815,9 +820,7 @@ template <typename T_Mutex> class EventLoop
bdfw->watch_flags &= ~OUT_EVENTS;

if (LoopTraits::has_separate_rw_fd_watches) {
// TODO this will need flags for such a loop, since it can't
// otherwise distinguish which channel watch to remove
loop_mech.removeFdWatch_nolock(bdfw->watch_fd, bdfw->watch_flags);
loop_mech.removeFdWatch_nolock(bdfw->watch_fd, OUT_EVENTS);
return bdfw->read_removed ? Rearm::REMOVE : Rearm::NOOP;
}
else {
Expand All @@ -836,10 +839,15 @@ template <typename T_Mutex> class EventLoop
}
else if (rearmType == Rearm::REARM) {
bdfw->watch_flags |= OUT_EVENTS;

if (! LoopTraits::has_separate_rw_fd_watches) {
int watch_flags = bdfw->watch_flags;
// If this is a BidiFdWatch (multiwatch) then we do not want to re-enable a
// channel that has an event pending (is already queued):
watch_flags &= ~(bdfw->event_flags);
loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
static_cast<BaseWatcher *>(bdfw),
(bdfw->watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
(watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
}
else {
loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
Expand Down Expand Up @@ -913,9 +921,11 @@ template <typename T_Mutex> class EventLoop
// The primary watcher for a multi-watch watcher is queued for
// read events.
rearmType = bbfw->readReady(*this, bfw->watch_fd);
bbfw->event_flags &= ~IN_EVENTS;
}
else {
rearmType = bfw->fdEvent(*this, bfw->watch_fd, bfw->event_flags);
bfw->event_flags = 0;
}
break;
}
Expand All @@ -926,6 +936,7 @@ template <typename T_Mutex> class EventLoop
}
case WatchType::SECONDARYFD: {
rearmType = bbfw->writeReady(*this, bbfw->watch_fd);
bbfw->event_flags &= ~OUT_EVENTS;
break;
}
default: ;
Expand Down Expand Up @@ -1088,7 +1099,6 @@ class FdWatcher : private dprivate::BaseFdWatcher<typename EventLoop::mutex_t>
// In a single threaded environment, it is safe to delete the watcher after
// calling this method as long as the handler (if it is active) accesses no
// internal state and returns Rearm::REMOVED.
// TODO: implement REMOVED, or correct above statement.
void deregister(EventLoop &eloop) noexcept
{
eloop.deregister(this, this->watch_fd);
Expand Down Expand Up @@ -1192,6 +1202,8 @@ class BidiFdWatcher : private dprivate::BaseBidiFdWatcher<typename EventLoop::mu
this->outWatcher.BaseWatcher::init();
this->watch_fd = fd;
this->watch_flags = flags | dprivate::multi_watch;
this->read_removed = false;
this->write_removed = false;
eloop.registerFd(this, fd, flags);
}

Expand All @@ -1208,7 +1220,6 @@ class BidiFdWatcher : private dprivate::BaseBidiFdWatcher<typename EventLoop::mu
// In a single threaded environment, it is safe to delete the watcher after
// calling this method as long as the handler (if it is active) accesses no
// internal state and returns Rearm::REMOVED.
// TODO: implement REMOVED, or correct above statement.
void deregister(EventLoop &eloop) noexcept
{
eloop.deregister(this, this->watch_fd);
Expand Down

0 comments on commit 807ce78

Please sign in to comment.