Skip to content

Commit

Permalink
ws
Browse files Browse the repository at this point in the history
  • Loading branch information
desperatee committed Nov 14, 2024
1 parent 871cb21 commit 6ed1cce
Show file tree
Hide file tree
Showing 10 changed files with 174 additions and 13 deletions.
21 changes: 20 additions & 1 deletion rpc/ws/accountSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,32 @@ type AccountSubscription struct {

func (sw *AccountSubscription) Recv() (*AccountResult, error) {
select {
case d := <-sw.sub.stream:
case d, ok := <-sw.sub.stream:
if !ok {
return nil, ErrSubscriptionClosed
}
return d.(*AccountResult), nil
case err := <-sw.sub.err:
return nil, err
}
}

func (sw *AccountSubscription) Err() <-chan error {
return sw.sub.err
}
func (sw *AccountSubscription) Response() <-chan *AccountResult {
typedChan := make(chan *AccountResult, 1)
go func(ch chan *AccountResult) {
// TODO: will this subscription yield more than one result?
d, ok := <-sw.sub.stream
if !ok {
return
}
ch <- d.(*AccountResult)
}(typedChan)
return typedChan
}

func (sw *AccountSubscription) Unsubscribe() {
sw.sub.Unsubscribe()
}
21 changes: 20 additions & 1 deletion rpc/ws/blockSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,32 @@ type BlockSubscription struct {

func (sw *BlockSubscription) Recv() (*BlockResult, error) {
select {
case d := <-sw.sub.stream:
case d, ok := <-sw.sub.stream:
if !ok {
return nil, ErrSubscriptionClosed
}
return d.(*BlockResult), nil
case err := <-sw.sub.err:
return nil, err
}
}

func (sw *BlockSubscription) Err() <-chan error {
return sw.sub.err
}
func (sw *BlockSubscription) Response() <-chan *BlockResult {
typedChan := make(chan *BlockResult, 1)
go func(ch chan *BlockResult) {
// TODO: will this subscription yield more than one result?
d, ok := <-sw.sub.stream
if !ok {
return
}
ch <- d.(*BlockResult)
}(typedChan)
return typedChan
}

func (sw *BlockSubscription) Unsubscribe() {
sw.sub.Unsubscribe()
}
3 changes: 3 additions & 0 deletions rpc/ws/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package ws

import (
"context"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -32,6 +33,8 @@ import (
"go.uber.org/zap"
)

var ErrSubscriptionClosed = errors.New("subscription closed")

type result interface{}

type Client struct {
Expand Down
27 changes: 23 additions & 4 deletions rpc/ws/logsSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const (

// LogsSubscribe subscribes to transaction logging.
func (cl *Client) LogsSubscribe(
// Filter criteria for the logs to receive results by account type.
// Filter criteria for the logs to receive results by account type.
filter LogsSubscribeFilterType,
commitment rpc.CommitmentType, // (optional)
) (*LogSubscription, error) {
Expand All @@ -59,9 +59,9 @@ func (cl *Client) LogsSubscribe(

// LogsSubscribe subscribes to all transactions that mention the provided Pubkey.
func (cl *Client) LogsSubscribeMentions(
// Subscribe to all transactions that mention the provided Pubkey.
// Subscribe to all transactions that mention the provided Pubkey.
mentions solana.PublicKey,
// (optional)
// (optional)
commitment rpc.CommitmentType,
) (*LogSubscription, error) {
return cl.logsSubscribe(
Expand Down Expand Up @@ -109,13 +109,32 @@ type LogSubscription struct {

func (sw *LogSubscription) Recv() (*LogResult, error) {
select {
case d := <-sw.sub.stream:
case d, ok := <-sw.sub.stream:
if !ok {
return nil, ErrSubscriptionClosed
}
return d.(*LogResult), nil
case err := <-sw.sub.err:
return nil, err
}
}

func (sw *LogSubscription) Err() <-chan error {
return sw.sub.err
}
func (sw *LogSubscription) Response() <-chan *LogResult {
typedChan := make(chan *LogResult, 1)
go func(ch chan *LogResult) {
// TODO: will this subscription yield more than one result?
d, ok := <-sw.sub.stream
if !ok {
return
}
ch <- d.(*LogResult)
}(typedChan)
return typedChan
}

func (sw *LogSubscription) Unsubscribe() {
sw.sub.Unsubscribe()
}
21 changes: 20 additions & 1 deletion rpc/ws/programSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,32 @@ type ProgramSubscription struct {

func (sw *ProgramSubscription) Recv() (*ProgramResult, error) {
select {
case d := <-sw.sub.stream:
case d, ok := <-sw.sub.stream:
if !ok {
return nil, ErrSubscriptionClosed
}
return d.(*ProgramResult), nil
case err := <-sw.sub.err:
return nil, err
}
}

func (sw *ProgramSubscription) Err() <-chan error {
return sw.sub.err
}
func (sw *ProgramSubscription) Response() <-chan *ProgramResult {
typedChan := make(chan *ProgramResult, 1)
go func(ch chan *ProgramResult) {
// TODO: will this subscription yield more than one result?
d, ok := <-sw.sub.stream
if !ok {
return
}
ch <- d.(*ProgramResult)
}(typedChan)
return typedChan
}

func (sw *ProgramSubscription) Unsubscribe() {
sw.sub.Unsubscribe()
}
21 changes: 20 additions & 1 deletion rpc/ws/rootSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,32 @@ type RootSubscription struct {

func (sw *RootSubscription) Recv() (*RootResult, error) {
select {
case d := <-sw.sub.stream:
case d, ok := <-sw.sub.stream:
if !ok {
return nil, ErrSubscriptionClosed
}
return d.(*RootResult), nil
case err := <-sw.sub.err:
return nil, err
}
}

func (sw *RootSubscription) Err() <-chan error {
return sw.sub.err
}
func (sw *RootSubscription) Response() <-chan *RootResult {
typedChan := make(chan *RootResult, 1)
go func(ch chan *RootResult) {
// TODO: will this subscription yield more than one result?
d, ok := <-sw.sub.stream
if !ok {
return
}
ch <- d.(*RootResult)
}(typedChan)
return typedChan
}

func (sw *RootSubscription) Unsubscribe() {
sw.sub.Unsubscribe()
}
10 changes: 8 additions & 2 deletions rpc/ws/signatureSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ type SignatureSubscription struct {

func (sw *SignatureSubscription) Recv() (*SignatureResult, error) {
select {
case d := <-sw.sub.stream:
case d, ok := <-sw.sub.stream:
if !ok {
return nil, ErrSubscriptionClosed
}
return d.(*SignatureResult), nil
case err := <-sw.sub.err:
return nil, err
Expand Down Expand Up @@ -99,7 +102,10 @@ func (sw *SignatureSubscription) RecvWithTimeout(timeout time.Duration) (*Signat
select {
case <-time.After(timeout):
return nil, ErrTimeout
case d := <-sw.sub.stream:
case d, ok := <-sw.sub.stream:
if !ok {
return nil, ErrSubscriptionClosed
}
return d.(*SignatureResult), nil
case err := <-sw.sub.err:
return nil, err
Expand Down
21 changes: 20 additions & 1 deletion rpc/ws/slotSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,32 @@ type SlotSubscription struct {

func (sw *SlotSubscription) Recv() (*SlotResult, error) {
select {
case d := <-sw.sub.stream:
case d, ok := <-sw.sub.stream:
if !ok {
return nil, ErrSubscriptionClosed
}
return d.(*SlotResult), nil
case err := <-sw.sub.err:
return nil, err
}
}

func (sw *SlotSubscription) Err() <-chan error {
return sw.sub.err
}
func (sw *SlotSubscription) Response() <-chan *SlotResult {
typedChan := make(chan *SlotResult, 1)
go func(ch chan *SlotResult) {
// TODO: will this subscription yield more than one result?
d, ok := <-sw.sub.stream
if !ok {
return
}
ch <- d.(*SlotResult)
}(typedChan)
return typedChan
}

func (sw *SlotSubscription) Unsubscribe() {
sw.sub.Unsubscribe()
}
21 changes: 20 additions & 1 deletion rpc/ws/slotsUpdatesSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,32 @@ type SlotsUpdatesSubscription struct {

func (sw *SlotsUpdatesSubscription) Recv() (*SlotsUpdatesResult, error) {
select {
case d := <-sw.sub.stream:
case d, ok := <-sw.sub.stream:
if !ok {
return nil, ErrSubscriptionClosed
}
return d.(*SlotsUpdatesResult), nil
case err := <-sw.sub.err:
return nil, err
}
}

func (sw *SlotsUpdatesSubscription) Err() <-chan error {
return sw.sub.err
}
func (sw *SlotsUpdatesSubscription) Response() <-chan *SlotsUpdatesResult {
typedChan := make(chan *SlotsUpdatesResult, 1)
go func(ch chan *SlotsUpdatesResult) {
// TODO: will this subscription yield more than one result?
d, ok := <-sw.sub.stream
if !ok {
return
}
ch <- d.(*SlotsUpdatesResult)
}(typedChan)
return typedChan
}

func (sw *SlotsUpdatesSubscription) Unsubscribe() {
sw.sub.Unsubscribe()
}
21 changes: 20 additions & 1 deletion rpc/ws/voteSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,32 @@ type VoteSubscription struct {

func (sw *VoteSubscription) Recv() (*VoteResult, error) {
select {
case d := <-sw.sub.stream:
case d, ok := <-sw.sub.stream:
if !ok {
return nil, ErrSubscriptionClosed
}
return d.(*VoteResult), nil
case err := <-sw.sub.err:
return nil, err
}
}

func (sw *VoteSubscription) Err() <-chan error {
return sw.sub.err
}
func (sw *VoteSubscription) Response() <-chan *VoteResult {
typedChan := make(chan *VoteResult, 1)
go func(ch chan *VoteResult) {
// TODO: will this subscription yield more than one result?
d, ok := <-sw.sub.stream
if !ok {
return
}
ch <- d.(*VoteResult)
}(typedChan)
return typedChan
}

func (sw *VoteSubscription) Unsubscribe() {
sw.sub.Unsubscribe()
}

0 comments on commit 6ed1cce

Please sign in to comment.