diff --git a/rpc/ws/accountSubscribe.go b/rpc/ws/accountSubscribe.go index c64f272c..2ec33c24 100644 --- a/rpc/ws/accountSubscribe.go +++ b/rpc/ws/accountSubscribe.go @@ -85,13 +85,32 @@ type AccountSubscription struct { func (sw *AccountSubscription) Recv() (*AccountResult, error) { select { - case d := <-sw.sub.stream: + case d, ok := <-sw.sub.stream: + if !ok { + return nil, ErrSubscriptionClosed + } return d.(*AccountResult), nil case err := <-sw.sub.err: return nil, err } } +func (sw *AccountSubscription) Err() <-chan error { + return sw.sub.err +} +func (sw *AccountSubscription) Response() <-chan *AccountResult { + typedChan := make(chan *AccountResult, 1) + go func(ch chan *AccountResult) { + // TODO: will this subscription yield more than one result? + d, ok := <-sw.sub.stream + if !ok { + return + } + ch <- d.(*AccountResult) + }(typedChan) + return typedChan +} + func (sw *AccountSubscription) Unsubscribe() { sw.sub.Unsubscribe() } diff --git a/rpc/ws/blockSubscribe.go b/rpc/ws/blockSubscribe.go index e713c09c..f9a00956 100644 --- a/rpc/ws/blockSubscribe.go +++ b/rpc/ws/blockSubscribe.go @@ -143,13 +143,32 @@ type BlockSubscription struct { func (sw *BlockSubscription) Recv() (*BlockResult, error) { select { - case d := <-sw.sub.stream: + case d, ok := <-sw.sub.stream: + if !ok { + return nil, ErrSubscriptionClosed + } return d.(*BlockResult), nil case err := <-sw.sub.err: return nil, err } } +func (sw *BlockSubscription) Err() <-chan error { + return sw.sub.err +} +func (sw *BlockSubscription) Response() <-chan *BlockResult { + typedChan := make(chan *BlockResult, 1) + go func(ch chan *BlockResult) { + // TODO: will this subscription yield more than one result? + d, ok := <-sw.sub.stream + if !ok { + return + } + ch <- d.(*BlockResult) + }(typedChan) + return typedChan +} + func (sw *BlockSubscription) Unsubscribe() { sw.sub.Unsubscribe() } diff --git a/rpc/ws/client.go b/rpc/ws/client.go index 4d54bb19..92e9e973 100644 --- a/rpc/ws/client.go +++ b/rpc/ws/client.go @@ -19,6 +19,7 @@ package ws import ( "context" + "errors" "fmt" "io" "net/http" @@ -32,6 +33,8 @@ import ( "go.uber.org/zap" ) +var ErrSubscriptionClosed = errors.New("subscription closed") + type result interface{} type Client struct { diff --git a/rpc/ws/logsSubscribe.go b/rpc/ws/logsSubscribe.go index b23973a6..abd6ff01 100644 --- a/rpc/ws/logsSubscribe.go +++ b/rpc/ws/logsSubscribe.go @@ -47,7 +47,7 @@ const ( // LogsSubscribe subscribes to transaction logging. func (cl *Client) LogsSubscribe( -// Filter criteria for the logs to receive results by account type. + // Filter criteria for the logs to receive results by account type. filter LogsSubscribeFilterType, commitment rpc.CommitmentType, // (optional) ) (*LogSubscription, error) { @@ -59,9 +59,9 @@ func (cl *Client) LogsSubscribe( // LogsSubscribe subscribes to all transactions that mention the provided Pubkey. func (cl *Client) LogsSubscribeMentions( -// Subscribe to all transactions that mention the provided Pubkey. + // Subscribe to all transactions that mention the provided Pubkey. mentions solana.PublicKey, -// (optional) + // (optional) commitment rpc.CommitmentType, ) (*LogSubscription, error) { return cl.logsSubscribe( @@ -109,13 +109,32 @@ type LogSubscription struct { func (sw *LogSubscription) Recv() (*LogResult, error) { select { - case d := <-sw.sub.stream: + case d, ok := <-sw.sub.stream: + if !ok { + return nil, ErrSubscriptionClosed + } return d.(*LogResult), nil case err := <-sw.sub.err: return nil, err } } +func (sw *LogSubscription) Err() <-chan error { + return sw.sub.err +} +func (sw *LogSubscription) Response() <-chan *LogResult { + typedChan := make(chan *LogResult, 1) + go func(ch chan *LogResult) { + // TODO: will this subscription yield more than one result? + d, ok := <-sw.sub.stream + if !ok { + return + } + ch <- d.(*LogResult) + }(typedChan) + return typedChan +} + func (sw *LogSubscription) Unsubscribe() { sw.sub.Unsubscribe() } diff --git a/rpc/ws/programSubscribe.go b/rpc/ws/programSubscribe.go index ab0e43f6..adafb6dc 100644 --- a/rpc/ws/programSubscribe.go +++ b/rpc/ws/programSubscribe.go @@ -88,13 +88,32 @@ type ProgramSubscription struct { func (sw *ProgramSubscription) Recv() (*ProgramResult, error) { select { - case d := <-sw.sub.stream: + case d, ok := <-sw.sub.stream: + if !ok { + return nil, ErrSubscriptionClosed + } return d.(*ProgramResult), nil case err := <-sw.sub.err: return nil, err } } +func (sw *ProgramSubscription) Err() <-chan error { + return sw.sub.err +} +func (sw *ProgramSubscription) Response() <-chan *ProgramResult { + typedChan := make(chan *ProgramResult, 1) + go func(ch chan *ProgramResult) { + // TODO: will this subscription yield more than one result? + d, ok := <-sw.sub.stream + if !ok { + return + } + ch <- d.(*ProgramResult) + }(typedChan) + return typedChan +} + func (sw *ProgramSubscription) Unsubscribe() { sw.sub.Unsubscribe() } diff --git a/rpc/ws/rootSubscribe.go b/rpc/ws/rootSubscribe.go index 2d519dca..dc0c3ff4 100644 --- a/rpc/ws/rootSubscribe.go +++ b/rpc/ws/rootSubscribe.go @@ -44,13 +44,32 @@ type RootSubscription struct { func (sw *RootSubscription) Recv() (*RootResult, error) { select { - case d := <-sw.sub.stream: + case d, ok := <-sw.sub.stream: + if !ok { + return nil, ErrSubscriptionClosed + } return d.(*RootResult), nil case err := <-sw.sub.err: return nil, err } } +func (sw *RootSubscription) Err() <-chan error { + return sw.sub.err +} +func (sw *RootSubscription) Response() <-chan *RootResult { + typedChan := make(chan *RootResult, 1) + go func(ch chan *RootResult) { + // TODO: will this subscription yield more than one result? + d, ok := <-sw.sub.stream + if !ok { + return + } + ch <- d.(*RootResult) + }(typedChan) + return typedChan +} + func (sw *RootSubscription) Unsubscribe() { sw.sub.Unsubscribe() } diff --git a/rpc/ws/signatureSubscribe.go b/rpc/ws/signatureSubscribe.go index d9a5c844..7e2f5eaa 100644 --- a/rpc/ws/signatureSubscribe.go +++ b/rpc/ws/signatureSubscribe.go @@ -69,7 +69,10 @@ type SignatureSubscription struct { func (sw *SignatureSubscription) Recv() (*SignatureResult, error) { select { - case d := <-sw.sub.stream: + case d, ok := <-sw.sub.stream: + if !ok { + return nil, ErrSubscriptionClosed + } return d.(*SignatureResult), nil case err := <-sw.sub.err: return nil, err @@ -99,7 +102,10 @@ func (sw *SignatureSubscription) RecvWithTimeout(timeout time.Duration) (*Signat select { case <-time.After(timeout): return nil, ErrTimeout - case d := <-sw.sub.stream: + case d, ok := <-sw.sub.stream: + if !ok { + return nil, ErrSubscriptionClosed + } return d.(*SignatureResult), nil case err := <-sw.sub.err: return nil, err diff --git a/rpc/ws/slotSubscribe.go b/rpc/ws/slotSubscribe.go index d79aed36..2df8ee74 100644 --- a/rpc/ws/slotSubscribe.go +++ b/rpc/ws/slotSubscribe.go @@ -47,13 +47,32 @@ type SlotSubscription struct { func (sw *SlotSubscription) Recv() (*SlotResult, error) { select { - case d := <-sw.sub.stream: + case d, ok := <-sw.sub.stream: + if !ok { + return nil, ErrSubscriptionClosed + } return d.(*SlotResult), nil case err := <-sw.sub.err: return nil, err } } +func (sw *SlotSubscription) Err() <-chan error { + return sw.sub.err +} +func (sw *SlotSubscription) Response() <-chan *SlotResult { + typedChan := make(chan *SlotResult, 1) + go func(ch chan *SlotResult) { + // TODO: will this subscription yield more than one result? + d, ok := <-sw.sub.stream + if !ok { + return + } + ch <- d.(*SlotResult) + }(typedChan) + return typedChan +} + func (sw *SlotSubscription) Unsubscribe() { sw.sub.Unsubscribe() } diff --git a/rpc/ws/slotsUpdatesSubscribe.go b/rpc/ws/slotsUpdatesSubscribe.go index 989b81fc..0e78505a 100644 --- a/rpc/ws/slotsUpdatesSubscribe.go +++ b/rpc/ws/slotsUpdatesSubscribe.go @@ -79,13 +79,32 @@ type SlotsUpdatesSubscription struct { func (sw *SlotsUpdatesSubscription) Recv() (*SlotsUpdatesResult, error) { select { - case d := <-sw.sub.stream: + case d, ok := <-sw.sub.stream: + if !ok { + return nil, ErrSubscriptionClosed + } return d.(*SlotsUpdatesResult), nil case err := <-sw.sub.err: return nil, err } } +func (sw *SlotsUpdatesSubscription) Err() <-chan error { + return sw.sub.err +} +func (sw *SlotsUpdatesSubscription) Response() <-chan *SlotsUpdatesResult { + typedChan := make(chan *SlotsUpdatesResult, 1) + go func(ch chan *SlotsUpdatesResult) { + // TODO: will this subscription yield more than one result? + d, ok := <-sw.sub.stream + if !ok { + return + } + ch <- d.(*SlotsUpdatesResult) + }(typedChan) + return typedChan +} + func (sw *SlotsUpdatesSubscription) Unsubscribe() { sw.sub.Unsubscribe() } diff --git a/rpc/ws/voteSubscribe.go b/rpc/ws/voteSubscribe.go index 28db269b..cdab0c78 100644 --- a/rpc/ws/voteSubscribe.go +++ b/rpc/ws/voteSubscribe.go @@ -61,13 +61,32 @@ type VoteSubscription struct { func (sw *VoteSubscription) Recv() (*VoteResult, error) { select { - case d := <-sw.sub.stream: + case d, ok := <-sw.sub.stream: + if !ok { + return nil, ErrSubscriptionClosed + } return d.(*VoteResult), nil case err := <-sw.sub.err: return nil, err } } +func (sw *VoteSubscription) Err() <-chan error { + return sw.sub.err +} +func (sw *VoteSubscription) Response() <-chan *VoteResult { + typedChan := make(chan *VoteResult, 1) + go func(ch chan *VoteResult) { + // TODO: will this subscription yield more than one result? + d, ok := <-sw.sub.stream + if !ok { + return + } + ch <- d.(*VoteResult) + }(typedChan) + return typedChan +} + func (sw *VoteSubscription) Unsubscribe() { sw.sub.Unsubscribe() }