From d1789c953c4f881395d1afab948ec9effbcce0bc Mon Sep 17 00:00:00 2001 From: xtaci Date: Thu, 15 Jul 2021 10:20:38 +0800 Subject: [PATCH] recycle pcb in next waitio --- watcher.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/watcher.go b/watcher.go index 66f43fc..8379112 100644 --- a/watcher.go +++ b/watcher.go @@ -51,6 +51,7 @@ type watcher struct { pendingCreate []*aiocb pendingProcessing []*aiocb // swaped pending pendingMutex sync.Mutex + recycles []*aiocb // IO-completion events to user chResults chan *aiocb @@ -179,15 +180,21 @@ func (w *watcher) notifyPending() { // WaitIO blocks until any read/write completion, or error. // An internal 'buf' returned or 'r []OpResult' are safe to use BEFORE next call to WaitIO(). func (w *watcher) WaitIO() (r []OpResult, err error) { + // recycle previous aiocb + for k := range w.recycles { + aiocbPool.Put(w.recycles[k]) + } + w.recycles = w.recycles[:0] + for { select { case pcb := <-w.chResults: r = append(r, OpResult{Operation: pcb.op, Conn: pcb.conn, IsSwapBuffer: pcb.useSwap, Buffer: pcb.buffer, Size: pcb.size, Error: pcb.err, Context: pcb.ctx}) - aiocbPool.Put(pcb) + w.recycles = append(w.recycles, pcb) for len(w.chResults) > 0 { pcb := <-w.chResults r = append(r, OpResult{Operation: pcb.op, Conn: pcb.conn, IsSwapBuffer: pcb.useSwap, Buffer: pcb.buffer, Size: pcb.size, Error: pcb.err, Context: pcb.ctx}) - aiocbPool.Put(pcb) + w.recycles = append(w.recycles, pcb) } atomic.CompareAndSwapInt32(&w.shouldSwap, 0, 1)