Skip to content

Commit

Permalink
update comments
Browse files Browse the repository at this point in the history
  • Loading branch information
xtaci committed Feb 1, 2020
1 parent 4ccc4e5 commit cb07b8c
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 20 deletions.
1 change: 0 additions & 1 deletion aio_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func (p *poller) Wait(chEventNotify chan pollerEvents, die chan struct{}) {
return
}

// note chan swap must not continue unexpected
pe := swapEvents[swapIdx]
pe = pe[:0]
swapIdx = (swapIdx + 1) % len(swapEvents)
Expand Down
2 changes: 1 addition & 1 deletion global.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func WaitIO() (r []OpResult, err error) {
return defaultWatcher.WaitIO()
}

// Read submits an async read request on 'fd' with context 'ctx', using buffer 'buf'
// Read submits an async read request on 'fd' with context 'ctx', using buffer 'buf'.
// 'buf' can be set to nil to use internal buffer.
// 'ctx' is the user-defined value passed through the gaio watcher unchanged.
func Read(ctx interface{}, conn net.Conn, buf []byte) error {
Expand Down
40 changes: 22 additions & 18 deletions watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const (
opDelete
)

// aiocb contains all info for a request
// aiocb contains all info for a single request
type aiocb struct {
l *list.List // list where this request belongs to
elem *list.Element
Expand All @@ -69,7 +69,7 @@ const (
fdWrite = 2
)

// fdDesc contains all info related to fd
// fdDesc contains all data structures associated to fd
type fdDesc struct {
status byte // fd read/write status
readers list.List // all read/write requests
Expand Down Expand Up @@ -122,7 +122,7 @@ type Watcher struct {
dieOnce sync.Once
}

// NewWatcher creates a management object for monitoring file descriptors
// NewWatcher creates a management object for monitoring file descriptors.
// 'bufsize' sets the internal swap buffer size for Read() with nil.
func NewWatcherSize(bufsize int) (*Watcher, error) {
w := new(Watcher)
Expand Down Expand Up @@ -150,7 +150,7 @@ func NewWatcherSize(bufsize int) (*Watcher, error) {
w.swapResults[i] = make([]OpResult, 0, maxEvents)
}

// finalizer for system resources
// watcher finalizer for system resources
runtime.SetFinalizer(w, func(w *Watcher) {
close(w.die)
w.pfd.Close()
Expand Down Expand Up @@ -179,7 +179,7 @@ func (w *Watcher) notifyPending() {
}
}

// WaitIO blocks until any read/write completion, or error
// WaitIO blocks until any read/write completion, or error.
func (w *Watcher) WaitIO() (r []OpResult, err error) {
select {
case r := <-w.chNotifyCompletion:
Expand Down Expand Up @@ -318,11 +318,13 @@ func (w *Watcher) loop() {
gc := make(chan uintptr)

// for timeout operations
// aiocb has non-zero deadline exists in timeouts & queue
// at same time or in neither of them
// aiocb has non-zero deadline exists
// in bothtimeouts & queue at any time
// or in neither of them.
timer := time.NewTimer(0)
var timeouts timedHeap

// release connection related resources
releaseConn := func(ident int) {
if desc, ok := descs[ident]; ok {
// delete from heap
Expand All @@ -347,7 +349,7 @@ func (w *Watcher) loop() {
}
}

// release all resources
// defer function to release all resources
defer func() {
for ident := range descs {
releaseConn(ident)
Expand Down Expand Up @@ -376,7 +378,7 @@ func (w *Watcher) loop() {
continue
}

// new conn
// handling new connection
var desc *fdDesc
if ok {
desc = descs[ident]
Expand All @@ -392,7 +394,7 @@ func (w *Watcher) loop() {
// assign idents
ident = dupfd

// unexpected situation, should notify caller
// unexpected situation, should notify caller if we cannot dup(2)
werr := w.pfd.Watch(ident)
if werr != nil {
select {
Expand All @@ -403,17 +405,17 @@ func (w *Watcher) loop() {
continue
}

// bindings
// file description bindings
desc = &fdDesc{ptr: pcb.ptr}
descs[ident] = desc
connIdents[pcb.ptr] = ident
// as we duplicated succesfuly, we're safe to
// close the original connection
pcb.conn.Close()

// the conn is still useful for GC finalizer
// note finalizer function cannot hold reference to net.Conn
// if not it will never be GC-ed
// the conn is still useful for GC finalizer.
// note finalizer function cannot hold reference to net.Conn,
// if not it will never be GC-ed.
runtime.SetFinalizer(pcb.conn, func(c net.Conn) {
select {
case gc <- reflect.ValueOf(c).Pointer():
Expand All @@ -426,6 +428,7 @@ func (w *Watcher) loop() {
// operations splitted into different buckets
switch pcb.op {
case OpRead:
// try immediately if readable/writable
if desc.readers.Len() == 0 && desc.status&fdRead > 0 {
if w.tryRead(ident, pcb) {
select {
Expand All @@ -441,6 +444,7 @@ func (w *Watcher) loop() {
desc.status &^= fdRead
}
}
// enqueue for poller events
pcb.l = &desc.readers
pcb.elem = pcb.l.PushBack(pcb)
case OpWrite:
Expand All @@ -463,7 +467,7 @@ func (w *Watcher) loop() {
pcb.elem = pcb.l.PushBack(pcb)
}

// timer
// push to heap for timeout operation
if !pcb.deadline.IsZero() {
heap.Push(&timeouts, pcb)
if timeouts.Len() == 1 {
Expand All @@ -472,7 +476,7 @@ func (w *Watcher) loop() {
}
}
pending = pending[:0]
case pe := <-w.chEventNotify:
case pe := <-w.chEventNotify: // poller events
// suppose fd(s) being polled is closed by conn.Close() from outside after chanrecv,
// and a new conn has re-opened with the same handler number(fd). The read and write
// on this fd is fatal.
Expand All @@ -494,7 +498,7 @@ func (w *Watcher) loop() {
pcb := elem.Value.(*aiocb)
if w.tryRead(e.ident, pcb) {
results = append(results, OpResult{Operation: OpRead, Conn: pcb.conn, Buffer: pcb.buffer, Size: pcb.size, Error: pcb.err, Context: pcb.ctx})
// for shared memory, we need to notify WaitIO immediately
// for requests using internal swap buffer, we need to notify WaitIO immediately
if pcb.useSwap {
select {
case w.chNotifyCompletion <- results:
Expand Down Expand Up @@ -558,7 +562,7 @@ func (w *Watcher) loop() {
}
}

case <-timer.C:
case <-timer.C: // timeout heap
for timeouts.Len() > 0 {
now := time.Now()
pcb := timeouts[0]
Expand Down

0 comments on commit cb07b8c

Please sign in to comment.