Skip to content

Commit

Permalink
make client more configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
coder543 committed Jan 13, 2019
1 parent 48c1157 commit 5e66ec2
Showing 1 changed file with 37 additions and 12 deletions.
49 changes: 37 additions & 12 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package eventsource
import (
"context"
"io"
"log"
"net/http"
"sync"
"time"
Expand All @@ -11,14 +12,20 @@ import (
// Client wraps an http connection and converts it to an
// event stream.
type Client struct {
flusher http.Flusher
write io.Writer
ctx context.Context
events chan Event
closed bool
waiter sync.WaitGroup
lock sync.Mutex
flushing *time.Timer
flusher http.Flusher
write io.Writer
ctx context.Context
events chan Event
closed bool
waiter sync.WaitGroup
lock sync.Mutex
flushing *time.Timer
flushLatency time.Duration
}

type Options struct {
ChannelSize int
FlushLatency time.Duration
}

// NewClient creates a client wrapping a response writer.
Expand All @@ -27,10 +34,28 @@ type Client struct {
// original http.Request helps determine which headers, but the request it is
// optional.
// Returns nil on error.
func NewClient(w http.ResponseWriter, req *http.Request) *Client {
func NewClient(w http.ResponseWriter, req *http.Request, options ...Options) *Client {
if len(options) > 1 {
log.Panicln("only one Options value may be provided")
}

flushLatency := 100 * time.Millisecond
channelSize := 100

if len(options) == 1 {
options := options[0]
if options.FlushLatency > 0 {
flushLatency = options.FlushLatency
}
if options.ChannelSize > 0 {
channelSize = options.ChannelSize
}
}

c := &Client{
events: make(chan Event, 100),
write: w,
events: make(chan Event, channelSize),
write: w,
flushLatency: flushLatency,
}

// Check to ensure we support flushing
Expand Down Expand Up @@ -122,7 +147,7 @@ func (c *Client) run() {
c.lock.Lock()
io.Copy(c.write, &ev)
if c.flushing == nil {
c.flushing = time.AfterFunc(100*time.Millisecond, c.flush)
c.flushing = time.AfterFunc(c.flushLatency, c.flush)
}
c.lock.Unlock()

Expand Down

0 comments on commit 5e66ec2

Please sign in to comment.