Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework the websocket client with new sync mode + more intuitive usage patterns + add daemon service #113

Merged
merged 7 commits into from
Mar 16, 2024
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.18

require (
github.com/google/go-querystring v1.1.0
github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.1
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/samber/mo v1.11.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8=
github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
Expand Down
36 changes: 25 additions & 11 deletions pkg/httpclient/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/google/go-querystring/query"
"github.com/google/uuid"

"github.com/chia-network/go-chia-libs/pkg/config"
"github.com/chia-network/go-chia-libs/pkg/rpcinterface"
Expand Down Expand Up @@ -309,6 +310,8 @@ func (c *HTTPClient) httpClientForService(service rpcinterface.ServiceType) (*ht
)

switch service {
case rpcinterface.ServiceDaemon:
return nil, fmt.Errorf("daemon RPC calls must be made with the websocket client")
case rpcinterface.ServiceFullNode:
if c.nodeClient == nil {
c.nodeClient, err = c.generateHTTPClientForService(rpcinterface.ServiceFullNode)
Expand Down Expand Up @@ -376,26 +379,37 @@ func (c *HTTPClient) httpClientForService(service rpcinterface.ServiceType) (*ht

// The following are here to satisfy the interface, but are not used by the HTTP client

// SubscribeSelf subscribes to events in response to requests from this service
// Not applicable on the HTTP connection
// SubscribeSelf does not apply to the HTTP Client
func (c *HTTPClient) SubscribeSelf() error {
return nil
return fmt.Errorf("subscriptions are not supported on the HTTP client - websockets are required for subscriptions")
}

// Subscribe adds a subscription to events from a particular service
// Subscribe does not apply to the HTTP Client
// Not applicable on the HTTP connection
func (c *HTTPClient) Subscribe(service string) error {
return nil
return fmt.Errorf("subscriptions are not supported on the HTTP client - websockets are required for subscriptions")
}

// ListenSync Listens for async responses over the connection in a synchronous fashion, blocking anything else
// Not applicable on the HTTP connection
func (c *HTTPClient) ListenSync(handler rpcinterface.WebsocketResponseHandler) error {
return nil
// AddHandler does not apply to HTTP Client
func (c *HTTPClient) AddHandler(handler rpcinterface.WebsocketResponseHandler) (uuid.UUID, error) {
return uuid.Nil, fmt.Errorf("handlers are not supported on the HTTP client - reponses are returned directly from the calling functions")
}

// AddDisconnectHandler Not applicable to the HTTP client
// RemoveHandler does not apply to HTTP Client
func (c *HTTPClient) RemoveHandler(handlerID uuid.UUID) {}

// AddDisconnectHandler does not apply to the HTTP Client
func (c *HTTPClient) AddDisconnectHandler(onDisconnect rpcinterface.DisconnectHandler) {}

// AddReconnectHandler Not applicable to the HTTP client
// AddReconnectHandler does not apply to the HTTP Client
func (c *HTTPClient) AddReconnectHandler(onReconnect rpcinterface.ReconnectHandler) {}

// SetSyncMode does not apply to the HTTP Client
func (c *HTTPClient) SetSyncMode() error {
return nil
}

// SetAsyncMode does not apply to the HTTP Client
func (c *HTTPClient) SetAsyncMode() error {
return fmt.Errorf("async mode is not supported on the HTTP client")
}
46 changes: 23 additions & 23 deletions pkg/rpc/client.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package rpc

import (
"log"
"net/http"

"github.com/google/uuid"

"github.com/chia-network/go-chia-libs/pkg/config"
"github.com/chia-network/go-chia-libs/pkg/httpclient"
"github.com/chia-network/go-chia-libs/pkg/rpcinterface"
"github.com/chia-network/go-chia-libs/pkg/types"
"github.com/chia-network/go-chia-libs/pkg/websocketclient"
)

Expand All @@ -18,15 +18,14 @@ type Client struct {
activeClient rpcinterface.Client

// Services for the different chia services
DaemonService *DaemonService
FullNodeService *FullNodeService
WalletService *WalletService
FarmerService *FarmerService
HarvesterService *HarvesterService
CrawlerService *CrawlerService
DataLayerService *DataLayerService
TimelordService *TimelordService

websocketHandlers []rpcinterface.WebsocketResponseHandler
}

// ConnectionMode specifies the method used to connect to the server (HTTP or Websocket)
Expand Down Expand Up @@ -64,6 +63,7 @@ func NewClient(connectionMode ConnectionMode, configOption rpcinterface.ConfigOp
c.activeClient = activeClient

// Init Services
c.DaemonService = &DaemonService{client: c}
c.FullNodeService = &FullNodeService{client: c}
c.WalletService = &WalletService{client: c}
c.FarmerService = &FarmerService{client: c}
Expand Down Expand Up @@ -103,16 +103,13 @@ func (c *Client) Subscribe(service string) error {
// This is expected to NOT be used in conjunction with ListenSync
// This will run in the background, and allow other things to happen in the foreground
// while ListenSync will take over the foreground process
func (c *Client) AddHandler(handler rpcinterface.WebsocketResponseHandler) error {
c.websocketHandlers = append(c.websocketHandlers, handler)

go func() {
err := c.ListenSync(c.handlerProxy)
if err != nil {
log.Printf("Error calling ListenSync: %s\n", err.Error())
}
}()
return nil
func (c *Client) AddHandler(handler rpcinterface.WebsocketResponseHandler) (uuid.UUID, error) {
return c.activeClient.AddHandler(handler)
}

// RemoveHandler removes the handler from the list of active response handlers
func (c *Client) RemoveHandler(handlerID uuid.UUID) {
c.activeClient.RemoveHandler(handlerID)
}

// AddDisconnectHandler the function to call when the client is disconnected
Expand All @@ -125,15 +122,18 @@ func (c *Client) AddReconnectHandler(onReconnect rpcinterface.ReconnectHandler)
c.activeClient.AddReconnectHandler(onReconnect)
}

// handlerProxy matches the websocketRespHandler signature to send requests back to any registered handlers
// Here to support multiple handlers for a single event in the future
func (c *Client) handlerProxy(resp *types.WebsocketResponse, err error) {
for _, handler := range c.websocketHandlers {
handler(resp, err)
}
// SetSyncMode sets the client to wait for responses before returning
// This is default (and only option) for HTTP client
// Websocket client defaults to async mode
func (c *Client) SetSyncMode() error {
return c.activeClient.SetSyncMode()
}

// ListenSync Listens for async responses over the connection in a synchronous fashion, blocking anything else
func (c *Client) ListenSync(handler rpcinterface.WebsocketResponseHandler) error {
return c.activeClient.ListenSync(handler)
// SetAsyncMode sets the client to async mode
// This does not apply to the HTTP client
// For the websocket client, this is the default mode and means that RPC function calls return immediate with empty
// versions of the structs that would otherwise contain the response, and you should have an async handler defined
// to receive the response
func (c *Client) SetAsyncMode() error {
return c.activeClient.SetAsyncMode()
}
15 changes: 12 additions & 3 deletions pkg/rpc/clientoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/chia-network/go-chia-libs/pkg/config"
"github.com/chia-network/go-chia-libs/pkg/httpclient"
"github.com/chia-network/go-chia-libs/pkg/rpcinterface"
"github.com/chia-network/go-chia-libs/pkg/websocketclient"
)

// WithAutoConfig automatically loads chia config from CHIA_ROOT
Expand All @@ -23,6 +24,13 @@ func WithManualConfig(cfg config.ChiaConfig) rpcinterface.ConfigOptionFunc {
}
}

// WithSyncWebsocket is a helper to making the client and calling SetSyncMode to set the client to sync mode by default
func WithSyncWebsocket() rpcinterface.ClientOptionFunc {
return func(c rpcinterface.Client) error {
return c.SetSyncMode()
}
}

// WithBaseURL sets the host for RPC requests
func WithBaseURL(url *url.URL) rpcinterface.ClientOptionFunc {
return func(c rpcinterface.Client) error {
Expand All @@ -46,11 +54,12 @@ func WithCache(validTime time.Duration) rpcinterface.ClientOptionFunc {
// WithTimeout sets the timeout for the requests
func WithTimeout(timeout time.Duration) rpcinterface.ClientOptionFunc {
return func(c rpcinterface.Client) error {
typed, ok := c.(*httpclient.HTTPClient)
if ok {
switch typed := c.(type) {
case *httpclient.HTTPClient:
typed.Timeout = timeout
case *websocketclient.WebsocketClient:
typed.Timeout = timeout
}

return nil
}
}
39 changes: 39 additions & 0 deletions pkg/rpc/daemon.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package rpc

import (
"net/http"

"github.com/chia-network/go-chia-libs/pkg/rpcinterface"
)

// DaemonService encapsulates direct daemon RPC methods
type DaemonService struct {
client *Client
}

// NewRequest returns a new request specific to the crawler service
func (s *DaemonService) NewRequest(rpcEndpoint rpcinterface.Endpoint, opt interface{}) (*rpcinterface.Request, error) {
return s.client.NewRequest(rpcinterface.ServiceDaemon, rpcEndpoint, opt)
}

// Do is just a shortcut to the client's Do method
func (s *DaemonService) Do(req *rpcinterface.Request, v interface{}) (*http.Response, error) {
return s.client.Do(req, v)
}

// GetNetworkInfo gets the network name and prefix from the full node
func (s *DaemonService) GetNetworkInfo(opts *GetNetworkInfoOptions) (*GetNetworkInfoResponse, *http.Response, error) {
request, err := s.NewRequest("get_network_info", opts)
if err != nil {
return nil, nil, err
}

r := &GetNetworkInfoResponse{}

resp, err := s.Do(request, r)
if err != nil {
return nil, resp, err
}

return r, resp, nil
}
63 changes: 55 additions & 8 deletions pkg/rpc/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func main() {

Websockets function asynchronously and as such, there are a few implementation differences compared to using the simpler HTTP request/response pattern. You must define a handler function to process responses received over the websocket connection, and you must also specifically subscribe to the events the handler should receive.

#### Handler Function
#### Handler Functions

Handler functions must use the following signature: `func handlerFunc(data *types.WebsocketResponse, err error)`. The function will be passed the data that was received from the websocket and an error.

Expand Down Expand Up @@ -106,13 +106,44 @@ func gotResponse(data *types.WebsocketResponse, err error) {
}
```

You may also use a blocking/synchronous handler function, if listening to websocket responses is all your main process is doing:
#### Synchronous Mode

If you want websockets to behave more like request/response style calls, you can enable sync mode.

To make all calls sync by default, you can set an option on the client:

```go
package main

import (
"log"
"fmt"

"github.com/chia-network/go-chia-libs/pkg/rpc"
)

func main() {
client, err := rpc.NewClient(rpc.ConnectionModeWebsocket, rpc.WithAutoConfig(), rpc.WithSyncWebsocket())
if err != nil {
// error happened
}

netInfo, _, err := client.DaemonService.GetNetworkInfo(&rpc.GetNetworkInfoOptions{})
if err != nil {
// error happened
}

// netInfo has the actual network information, since we're running in sync mode
fmt.Println(netInfo.NetworkName.OrEmpty())
}
```

You can also temporarily enable synchronous mode and then turn it back off

```go
package main

import (
"fmt"

"github.com/chia-network/go-chia-libs/pkg/rpc"
"github.com/chia-network/go-chia-libs/pkg/types"
Expand All @@ -121,19 +152,35 @@ import (
func main() {
client, err := rpc.NewClient(rpc.ConnectionModeWebsocket, rpc.WithAutoConfig())
if err != nil {
log.Fatalln(err.Error())
// error happened
}

client.ListenSync(gotResponse)
client.AddHandler(gotAsyncResponse)

// Other application logic here
client.SetSyncMode()

netInfo, _, err := client.DaemonService.GetNetworkInfo(&rpc.GetNetworkInfoOptions{})
if err != nil {
// error happened
}
fmt.Println(netInfo.NetworkName.OrEmpty())

client.SetAsyncMode()
}

func gotResponse(data *types.WebsocketResponse, err error) {
log.Printf("Received a `%s` command response\n", data.Command)
func gotAsyncResponse(data *types.WebsocketResponse, err error) {
log.Printf("Received a `%s` async command response\n", data.Command)
}
```

The output of this program will look something like the following. Note that both the async handler AND the sync response
variables saw the event and were able to handle it.

```shell
Received a `get_network_info` command response
mainnet
```

#### Subscribing to Events

There are two helper functions to subscribe to events that come over the websocket.
Expand Down
22 changes: 20 additions & 2 deletions pkg/rpcinterface/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package rpcinterface
import (
"net/http"
"net/url"

"github.com/google/uuid"
)

// Client defines the interface for a client
Expand All @@ -19,8 +21,13 @@ type Client interface {
SubscribeSelf() error
// Subscribe adds a subscription to events from a particular service
Subscribe(service string) error
// ListenSync Listens for async responses over the connection in a synchronous fashion, blocking anything else
ListenSync(handler WebsocketResponseHandler) error

// AddHandler adds a handler function that will be called when a message is received over the websocket
// Does not apply to HTTP client
AddHandler(handler WebsocketResponseHandler) (uuid.UUID, error)

// RemoveHandler removes the handler from the active websocket handlers
RemoveHandler(handlerID uuid.UUID)

// AddDisconnectHandler adds a function to call if the connection is disconnected
// Applies to websocket connections
Expand All @@ -29,4 +36,15 @@ type Client interface {
// AddReconnectHandler adds a function to call if the connection is reconnected
// Applies to websocket connections
AddReconnectHandler(onReconnect ReconnectHandler)

// SetSyncMode enforces synchronous request/response behavior
// This is default for HTTP client, but websocket default is async, so this forces a different mode
// Note that anything received by the websocket in sync mode that is not the current expected response
// will be ignored
SetSyncMode() error

// SetAsyncMode sets the client to async mode
// This is not supported for the HTTP client, but will set the websocket client back to async mode
// if it was set to sync mode temporarily
SetAsyncMode() error
}
Loading