-
Notifications
You must be signed in to change notification settings - Fork 47
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor synchronous interface #33
Changes from 20 commits
2414e77
992f8f2
ff65474
5fefc9a
3c1ae0e
bbb3e83
a9286ca
92b1603
4b752d3
e4d4232
29e6c72
2935eb1
fc87338
e4a1787
7dd714e
571f185
1af1192
cc9e4f3
d06b28b
d01f235
70205b7
d245d82
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,57 +1,33 @@ | ||
package apns | ||
|
||
import ( | ||
"container/list" | ||
"crypto/tls" | ||
"io" | ||
"log" | ||
"sync" | ||
"time" | ||
) | ||
|
||
type buffer struct { | ||
size int | ||
*list.List | ||
} | ||
|
||
func newBuffer(size int) *buffer { | ||
return &buffer{size, list.New()} | ||
} | ||
|
||
func (b *buffer) Add(v interface{}) *list.Element { | ||
e := b.PushBack(v) | ||
|
||
if b.Len() > b.size { | ||
b.Remove(b.Front()) | ||
} | ||
|
||
return e | ||
} | ||
|
||
type Client struct { | ||
Conn *Conn | ||
FailedNotifs chan NotificationResult | ||
conn Conn | ||
|
||
notifs chan Notification | ||
id uint32 | ||
sess Session | ||
sessm sync.Mutex | ||
} | ||
|
||
func newClientWithConn(gw string, conn Conn) Client { | ||
c := Client{ | ||
Conn: &conn, | ||
FailedNotifs: make(chan NotificationResult), | ||
id: uint32(1), | ||
notifs: make(chan Notification), | ||
} | ||
func newClientWithConn(conn Conn) (Client, error) { | ||
c := Client{conn: conn} | ||
|
||
go c.runLoop() | ||
sess := newSession(conn) | ||
err := sess.Connect() | ||
if err != nil { | ||
return c, err | ||
} | ||
|
||
return c | ||
return Client{conn, sess, sync.Mutex{}}, nil | ||
} | ||
|
||
func NewClientWithCert(gw string, cert tls.Certificate) Client { | ||
func NewClientWithCert(gw string, cert tls.Certificate) (Client, error) { | ||
conn := NewConnWithCert(gw, cert) | ||
|
||
return newClientWithConn(gw, conn) | ||
return newClientWithConn(conn) | ||
} | ||
|
||
func NewClient(gw string, cert string, key string) (Client, error) { | ||
|
@@ -60,7 +36,7 @@ func NewClient(gw string, cert string, key string) (Client, error) { | |
return Client{}, err | ||
} | ||
|
||
return newClientWithConn(gw, conn), nil | ||
return newClientWithConn(conn) | ||
} | ||
|
||
func NewClientWithFiles(gw string, certFile string, keyFile string) (Client, error) { | ||
|
@@ -69,151 +45,48 @@ func NewClientWithFiles(gw string, certFile string, keyFile string) (Client, err | |
return Client{}, err | ||
} | ||
|
||
return newClientWithConn(gw, conn), nil | ||
return newClientWithConn(conn) | ||
} | ||
|
||
func (c *Client) Send(n Notification) error { | ||
c.notifs <- n | ||
return nil | ||
} | ||
|
||
func (c *Client) reportFailedPush(v interface{}, err *Error) { | ||
failedNotif, ok := v.(Notification) | ||
if !ok || v == nil { | ||
return | ||
if c.sess.Disconnected() { | ||
c.reconnectAndRequeue() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As is, this waits for the next send before retrying the previously failed notifications. Not sure if that's ideal. |
||
} | ||
|
||
select { | ||
case c.FailedNotifs <- NotificationResult{Notif: failedNotif, Err: *err}: | ||
default: | ||
} | ||
return c.sess.Send(n) | ||
} | ||
|
||
func (c *Client) requeue(cursor *list.Element) { | ||
// If `cursor` is not nil, this means there are notifications that | ||
// need to be delivered (or redelivered) | ||
for ; cursor != nil; cursor = cursor.Next() { | ||
if n, ok := cursor.Value.(Notification); ok { | ||
go func() { c.notifs <- n }() | ||
} | ||
} | ||
} | ||
|
||
func (c *Client) handleError(err *Error, buffer *buffer) *list.Element { | ||
cursor := buffer.Back() | ||
|
||
for cursor != nil { | ||
// Get notification | ||
n, _ := cursor.Value.(Notification) | ||
func (c *Client) reconnectAndRequeue() { | ||
c.sessm.Lock() | ||
defer c.sessm.Unlock() | ||
|
||
// If the notification, move cursor after the trouble notification | ||
if n.Identifier == err.Identifier { | ||
go c.reportFailedPush(cursor.Value, err) | ||
// Pull off undelivered notifications | ||
notifs := c.sess.RequeueableNotifications() | ||
|
||
next := cursor.Next() | ||
|
||
buffer.Remove(cursor) | ||
return next | ||
} | ||
|
||
cursor = cursor.Prev() | ||
} | ||
|
||
return cursor | ||
} | ||
// Reconnect | ||
c.sess = nil | ||
|
||
func (c *Client) runLoop() { | ||
sent := newBuffer(50) | ||
cursor := sent.Front() | ||
for c.sess == nil { | ||
sess := newSession(c.conn) | ||
|
||
// APNS connection | ||
for { | ||
err := c.Conn.Connect() | ||
err := sess.Connect() | ||
if err != nil { | ||
// TODO Probably want to exponentially backoff... | ||
// TODO retry policy | ||
// TODO connect error channel | ||
// Keep trying to connect | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are cases where retrying is not what we want, such as an invalid or expired certificate. We need the behaviour to depend on the specific error that occurred in sess.Connect(). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Indeed. That's where I think the |
||
time.Sleep(1 * time.Second) | ||
continue | ||
} | ||
|
||
// Start reading errors from APNS | ||
errs := readErrs(c.Conn) | ||
|
||
c.requeue(cursor) | ||
|
||
// Connection open, listen for notifs and errors | ||
for { | ||
var err error | ||
var n Notification | ||
|
||
// Check for notifications or errors. There is a chance we'll send notifications | ||
// if we already have an error since `select` will "pseudorandomly" choose a | ||
// ready channels. It turns out to be fine because the connection will already | ||
// be closed and it'll requeue. We could check before we get to this select | ||
// block, but it doesn't seem worth the extra code and complexity. | ||
select { | ||
case err = <-errs: | ||
case n = <-c.notifs: | ||
} | ||
|
||
// If there is an error we understand, find the notification that failed, | ||
// move the cursor right after it. | ||
if nErr, ok := err.(*Error); ok { | ||
cursor = c.handleError(nErr, sent) | ||
break | ||
} | ||
|
||
if err != nil { | ||
break | ||
} | ||
|
||
// Add to list | ||
cursor = sent.Add(n) | ||
|
||
// Set identifier if not specified | ||
if n.Identifier == 0 { | ||
n.Identifier = c.id | ||
c.id++ | ||
} else if c.id < n.Identifier { | ||
c.id = n.Identifier + 1 | ||
} | ||
|
||
b, err := n.ToBinary() | ||
if err != nil { | ||
// TODO | ||
continue | ||
} | ||
|
||
_, err = c.Conn.Write(b) | ||
|
||
if err == io.EOF { | ||
log.Println("EOF trying to write notification") | ||
break | ||
} | ||
|
||
if err != nil { | ||
log.Println("err writing to apns", err.Error()) | ||
break | ||
} | ||
|
||
cursor = cursor.Next() | ||
} | ||
c.sess = sess | ||
} | ||
} | ||
|
||
func readErrs(c *Conn) chan error { | ||
errs := make(chan error) | ||
|
||
go func() { | ||
p := make([]byte, 6, 6) | ||
_, err := c.Read(p) | ||
if err != nil { | ||
errs <- err | ||
return | ||
} | ||
|
||
e := NewError(p) | ||
errs <- &e | ||
}() | ||
for _, n := range notifs { | ||
// TODO handle error from sending | ||
c.sess.Send(n) | ||
} | ||
} | ||
|
||
return errs | ||
var newSession = func(c Conn) Session { | ||
return NewSession(c) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is mockConn something that users of this library would find useful when testing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd say no with the implementation as it stands – it's not a terribly generic solution. I figured since
Conn
is now an interface, users of the library would write their own