Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor synchronous interface #33

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ before_script:
- go get github.com/onsi/gomega
- go get code.google.com/p/go.tools/cmd/cover
- go install github.com/onsi/ginkgo/ginkgo
script: ginkgo -r --skipMeasurements --cover --trace
script: ginkgo -r --skipMeasurements --cover --trace --race
env:
global:
- PATH=$HOME/gopath/bin:$PATH
38 changes: 38 additions & 0 deletions apns_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,46 @@ import (
. "github.com/onsi/gomega"

"testing"
"time"
)

type mockConn struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is mockConn something that users of this library would find useful when testing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd say no with the implementation as it stands – it's not a terribly generic solution. I figured since Conn is now an interface, users of the library would write their own

connect func() error
read func([]byte) (int, error)
readWithTimeout func([]byte, time.Time) (int, error)
}

func (m *mockConn) Connect() error {
if m.connect != nil {
return m.connect()
}

return nil
}

func (m *mockConn) Read(b []byte) (int, error) {
if m.read != nil {
return m.read(b)
}
return 0, nil
}

func (m *mockConn) Write([]byte) (int, error) {
return 0, nil
}

func (m *mockConn) Close() error {
return nil
}

func (m *mockConn) ReadWithTimeout(b []byte, t time.Time) (int, error) {
if m.readWithTimeout != nil {
return m.readWithTimeout(b, t)
}

return 0, nil
}

func TestApns(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Apns Suite")
Expand Down
207 changes: 40 additions & 167 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,57 +1,33 @@
package apns

import (
"container/list"
"crypto/tls"
"io"
"log"
"sync"
"time"
)

type buffer struct {
size int
*list.List
}

func newBuffer(size int) *buffer {
return &buffer{size, list.New()}
}

func (b *buffer) Add(v interface{}) *list.Element {
e := b.PushBack(v)

if b.Len() > b.size {
b.Remove(b.Front())
}

return e
}

type Client struct {
Conn *Conn
FailedNotifs chan NotificationResult
conn Conn

notifs chan Notification
id uint32
sess Session
sessm sync.Mutex
}

func newClientWithConn(gw string, conn Conn) Client {
c := Client{
Conn: &conn,
FailedNotifs: make(chan NotificationResult),
id: uint32(1),
notifs: make(chan Notification),
}
func newClientWithConn(conn Conn) (Client, error) {
c := Client{conn: conn}

go c.runLoop()
sess := newSession(conn)
err := sess.Connect()
if err != nil {
return c, err
}

return c
return Client{conn, sess, sync.Mutex{}}, nil
}

func NewClientWithCert(gw string, cert tls.Certificate) Client {
func NewClientWithCert(gw string, cert tls.Certificate) (Client, error) {
conn := NewConnWithCert(gw, cert)

return newClientWithConn(gw, conn)
return newClientWithConn(conn)
}

func NewClient(gw string, cert string, key string) (Client, error) {
Expand All @@ -60,7 +36,7 @@ func NewClient(gw string, cert string, key string) (Client, error) {
return Client{}, err
}

return newClientWithConn(gw, conn), nil
return newClientWithConn(conn)
}

func NewClientWithFiles(gw string, certFile string, keyFile string) (Client, error) {
Expand All @@ -69,151 +45,48 @@ func NewClientWithFiles(gw string, certFile string, keyFile string) (Client, err
return Client{}, err
}

return newClientWithConn(gw, conn), nil
return newClientWithConn(conn)
}

func (c *Client) Send(n Notification) error {
c.notifs <- n
return nil
}

func (c *Client) reportFailedPush(v interface{}, err *Error) {
failedNotif, ok := v.(Notification)
if !ok || v == nil {
return
if c.sess.Disconnected() {
c.reconnectAndRequeue()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As is, this waits for the next send before retrying the previously failed notifications. Not sure if that's ideal.

}

select {
case c.FailedNotifs <- NotificationResult{Notif: failedNotif, Err: *err}:
default:
}
return c.sess.Send(n)
}

func (c *Client) requeue(cursor *list.Element) {
// If `cursor` is not nil, this means there are notifications that
// need to be delivered (or redelivered)
for ; cursor != nil; cursor = cursor.Next() {
if n, ok := cursor.Value.(Notification); ok {
go func() { c.notifs <- n }()
}
}
}

func (c *Client) handleError(err *Error, buffer *buffer) *list.Element {
cursor := buffer.Back()

for cursor != nil {
// Get notification
n, _ := cursor.Value.(Notification)
func (c *Client) reconnectAndRequeue() {
c.sessm.Lock()
defer c.sessm.Unlock()

// If the notification, move cursor after the trouble notification
if n.Identifier == err.Identifier {
go c.reportFailedPush(cursor.Value, err)
// Pull off undelivered notifications
notifs := c.sess.RequeueableNotifications()

next := cursor.Next()

buffer.Remove(cursor)
return next
}

cursor = cursor.Prev()
}

return cursor
}
// Reconnect
c.sess = nil

func (c *Client) runLoop() {
sent := newBuffer(50)
cursor := sent.Front()
for c.sess == nil {
sess := newSession(c.conn)

// APNS connection
for {
err := c.Conn.Connect()
err := sess.Connect()
if err != nil {
// TODO Probably want to exponentially backoff...
// TODO retry policy
// TODO connect error channel
// Keep trying to connect
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are cases where retrying is not what we want, such as an invalid or expired certificate.

We need the behaviour to depend on the specific error that occurred in sess.Connect().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed. That's where I think the TODO of implementing retry policy would come into play. That behavior could be passed in by the caller.

time.Sleep(1 * time.Second)
continue
}

// Start reading errors from APNS
errs := readErrs(c.Conn)

c.requeue(cursor)

// Connection open, listen for notifs and errors
for {
var err error
var n Notification

// Check for notifications or errors. There is a chance we'll send notifications
// if we already have an error since `select` will "pseudorandomly" choose a
// ready channels. It turns out to be fine because the connection will already
// be closed and it'll requeue. We could check before we get to this select
// block, but it doesn't seem worth the extra code and complexity.
select {
case err = <-errs:
case n = <-c.notifs:
}

// If there is an error we understand, find the notification that failed,
// move the cursor right after it.
if nErr, ok := err.(*Error); ok {
cursor = c.handleError(nErr, sent)
break
}

if err != nil {
break
}

// Add to list
cursor = sent.Add(n)

// Set identifier if not specified
if n.Identifier == 0 {
n.Identifier = c.id
c.id++
} else if c.id < n.Identifier {
c.id = n.Identifier + 1
}

b, err := n.ToBinary()
if err != nil {
// TODO
continue
}

_, err = c.Conn.Write(b)

if err == io.EOF {
log.Println("EOF trying to write notification")
break
}

if err != nil {
log.Println("err writing to apns", err.Error())
break
}

cursor = cursor.Next()
}
c.sess = sess
}
}

func readErrs(c *Conn) chan error {
errs := make(chan error)

go func() {
p := make([]byte, 6, 6)
_, err := c.Read(p)
if err != nil {
errs <- err
return
}

e := NewError(p)
errs <- &e
}()
for _, n := range notifs {
// TODO handle error from sending
c.sess.Send(n)
}
}

return errs
var newSession = func(c Conn) Session {
return NewSession(c)
}
Loading