Skip to content

Commit

Permalink
add amortized flushing to client, increase client channel buffer size
Browse files Browse the repository at this point in the history
  • Loading branch information
coder543 committed Jan 7, 2019
1 parent b1f2d9a commit 4d26a50
Showing 1 changed file with 28 additions and 10 deletions.
38 changes: 28 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,20 @@ import (
"io"
"net/http"
"sync"
"time"
)

// Client wraps an http connection and converts it to an
// event stream.
type Client struct {
flush http.Flusher
write io.Writer
close http.CloseNotifier
events chan *Event
closed bool
waiter sync.WaitGroup
flush http.Flusher
write io.Writer
close http.CloseNotifier
events chan *Event
closed bool
waiter sync.WaitGroup
lastFlush time.Time
lastWrite time.Time
}

// NewClient creates a client wrapping a response writer.
Expand All @@ -26,7 +29,7 @@ type Client struct {
// Returns nil on error.
func NewClient(w http.ResponseWriter, req *http.Request) *Client {
c := &Client{
events: make(chan *Event, 1),
events: make(chan *Event, 100),
write: w,
}

Expand Down Expand Up @@ -55,6 +58,7 @@ func NewClient(w http.ResponseWriter, req *http.Request) *Client {
// start the sending thread
c.waiter.Add(1)
go c.run()
go c.flusher()
return c
}

Expand Down Expand Up @@ -84,7 +88,6 @@ func (c *Client) Wait() {

// Worker thread for the client responsible for writing events
func (c *Client) run() {

for {
select {
case ev, ok := <-c.events:
Expand All @@ -97,13 +100,28 @@ func (c *Client) run() {

// send the event
io.Copy(c.write, ev)
c.flush.Flush()
c.lastWrite = time.Now()

case _ = <-c.close.CloseNotify():
case <-c.close.CloseNotify():
c.closed = true
c.waiter.Done()
return
}

}
}

// flusher amortizes flushing costs for high activity SSE channels
func (c *Client) flusher() {
ticker := time.NewTicker(100 * time.Millisecond)

for !c.closed {
<-ticker.C
if c.lastFlush.Before(c.lastWrite) {
c.lastFlush = c.lastWrite
c.flush.Flush()
}
}

ticker.Stop()
}

0 comments on commit 4d26a50

Please sign in to comment.