diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f0718fe..198377a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,14 @@ ## master +- Added direct stream subscribing via a dedicated channel. ([@palkan][]) + +Added a `$pubsub` reserver channel to allow subscribing to streams (signed or unsigned) without relying on channels (similar to Turbo Streams). + +Enable public (unsigned) streams support via the `--public_streams` (or `ANYCABLE_PUBLIC_STREAMS=1`) option or provide a secret key to verify signed streams via the `--streams_secret=` (or `ANYCABLE_STREAMS_SECRET=`) option. + +- The `--turbo_rails_key` and `--cable_ready_key` options are deprecated in favor of the new `--streams_secret` option. The `--turbo_rails_key` and `--cable_ready_cleartext` are no longer supported (use `--public_streams` and the `$pubsub` channel instead). + - Allowing embedding AnyCable into existing web applications. ([@palkan][]) You can now set up an AnyCable instance without an HTTP server and mount AnyCable WebSocket/SSE handlers wherever you like. diff --git a/cli/cli.go b/cli/cli.go index 4f5261d1..6afb621e 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -22,10 +22,10 @@ import ( "github.com/anycable/anycable-go/mrb" "github.com/anycable/anycable-go/node" "github.com/anycable/anycable-go/pubsub" - "github.com/anycable/anycable-go/rails" "github.com/anycable/anycable-go/router" "github.com/anycable/anycable-go/server" "github.com/anycable/anycable-go/sse" + "github.com/anycable/anycable-go/streams" "github.com/anycable/anycable-go/telemetry" "github.com/anycable/anycable-go/utils" "github.com/anycable/anycable-go/version" @@ -480,13 +480,18 @@ func (r *Runner) Instrumenter() metricspkg.Instrumenter { func (r *Runner) defaultRouter() *router.RouterController { router := router.NewRouterController(nil) - if r.config.Rails.TurboRailsKey != "" || r.config.Rails.TurboRailsClearText { - turboController := rails.NewTurboController(r.config.Rails.TurboRailsKey, r.log) + if r.config.Streams.PubSubChannel != "" { + streamController := streams.NewStreamsController(&r.config.Streams, r.log) + router.Route(r.config.Streams.PubSubChannel, streamController) // nolint:errcheck + } + + if r.config.Streams.Turbo && r.config.Streams.Secret != "" { + turboController := streams.NewTurboController(r.config.Streams.Secret, r.log) router.Route("Turbo::StreamsChannel", turboController) // nolint:errcheck } - if r.config.Rails.CableReadyKey != "" || r.config.Rails.CableReadyClearText { - crController := rails.NewCableReadyController(r.config.Rails.CableReadyKey, r.log) + if r.config.Streams.CableReady && r.config.Streams.Secret != "" { + crController := streams.NewCableReadyController(r.config.Streams.Secret, r.log) router.Route("CableReady::Stream", crController) // nolint:errcheck } diff --git a/cli/options.go b/cli/options.go index 153d9ba0..c9a92b13 100644 --- a/cli/options.go +++ b/cli/options.go @@ -188,6 +188,42 @@ Use shutdown_timeout instead.`) c.SSE.AllowedOrigins = c.WS.AllowedOrigins + if c.Rails.TurboRailsKey != "" { + fmt.Println(`DEPRECATION WARNING: turbo_rails_key option is deprecated +and will be removed in the next major release of anycable-go. +Use streams_secret instead.`) + + if c.Streams.Secret == "" { + c.Streams.Secret = c.Rails.TurboRailsKey + } + + c.Streams.Turbo = true + } + + if c.Rails.TurboRailsClearText { + fmt.Println(`DEPRECATION WARNING: turbo_rails_cleartext option is deprecated +and will be removed in the next major release of anycable-go. +It has no effect anymore, use public streams instead.`) + } + + if c.Rails.CableReadyKey != "" { + fmt.Println(`DEPRECATION WARNING: cable_ready_key option is deprecated +and will be removed in the next major release of anycable-go. +Use streams_secret instead.`) + + if c.Streams.Secret == "" { + c.Streams.Secret = c.Rails.CableReadyKey + } + + c.Streams.CableReady = true + } + + if c.Rails.CableReadyClearText { + fmt.Println(`DEPRECATION WARNING: cable_ready_cleartext option is deprecated +and will be removed in the next major release of anycable-go. +It has no effect anymore, use public streams instead.`) + } + return &c, nil, false } @@ -721,6 +757,7 @@ func metricsCLIFlags(c *config.Config, filter *string, mtags *string) []cli.Flag Usage: "DEPRECATED. Specify how often flush metrics logs (in seconds)", Value: c.Metrics.LogInterval, Destination: &c.Metrics.LogInterval, + Hidden: true, }, &cli.StringFlag{ @@ -859,28 +896,56 @@ func jwtCLIFlags(c *config.Config) []cli.Flag { // signedStreamsCLIFlags returns misc CLI flags func signedStreamsCLIFlags(c *config.Config) []cli.Flag { return withDefaults(signedStreamsCategoryDescription, []cli.Flag{ + &cli.StringFlag{ + Name: "streams_secret", + Usage: "Secret you use to sign stream names", + Destination: &c.Streams.Secret, + }, + + &cli.BoolFlag{ + Name: "public_streams", + Usage: "Enable public (unsigned) streams", + Destination: &c.Streams.Public, + }, + + &cli.BoolFlag{ + Name: "turbo_streams", + Usage: "Enable Turbo Streams support", + Destination: &c.Streams.Turbo, + }, + + &cli.BoolFlag{ + Name: "cable_ready", + Usage: "Enable Cable Ready support", + Destination: &c.Streams.CableReady, + }, + &cli.StringFlag{ Name: "turbo_rails_key", - Usage: "Enable Turbo Streams fastlane with the specified signing key", + Usage: "[DEPRECATED] Enable Turbo Streams fastlane with the specified signing key", Destination: &c.Rails.TurboRailsKey, + Hidden: true, }, &cli.BoolFlag{ Name: "turbo_rails_cleartext", - Usage: "Enable Turbo Streams fastlane without stream names signing", + Usage: "[DEPRECATED] Enable Turbo Streams fastlane without stream names signing", Destination: &c.Rails.TurboRailsClearText, + Hidden: true, }, &cli.StringFlag{ Name: "cable_ready_key", - Usage: "Enable CableReady fastlane with the specified signing key", + Usage: "[DEPRECATED] Enable CableReady fastlane with the specified signing key", Destination: &c.Rails.CableReadyKey, + Hidden: true, }, &cli.BoolFlag{ Name: "cable_ready_cleartext", - Usage: "Enable Cable Ready fastlane without stream names signing", + Usage: "[DEPRECATED] Enable Cable Ready fastlane without stream names signing", Destination: &c.Rails.CableReadyClearText, + Hidden: true, }, }) } diff --git a/config/config.go b/config/config.go index efe024fc..f1ed0085 100644 --- a/config/config.go +++ b/config/config.go @@ -13,6 +13,7 @@ import ( "github.com/anycable/anycable-go/rpc" "github.com/anycable/anycable-go/server" "github.com/anycable/anycable-go/sse" + "github.com/anycable/anycable-go/streams" "github.com/anycable/anycable-go/ws" nanoid "github.com/matoous/go-nanoid" @@ -51,6 +52,7 @@ type Config struct { EmbedNats bool EmbeddedNats enats.Config SSE sse.Config + Streams streams.Config UserPresets []string } @@ -82,6 +84,7 @@ func NewConfig() Config { Rails: rails.NewConfig(), EmbeddedNats: enats.NewConfig(), SSE: sse.NewConfig(), + Streams: streams.NewConfig(), } return config diff --git a/docs/getting_started.md b/docs/getting_started.md index d7c8b08d..398c59ed 100644 --- a/docs/getting_started.md +++ b/docs/getting_started.md @@ -45,9 +45,38 @@ Run server: ```sh $ anycable-go -=> INFO time context=main Starting AnyCable v1.2.1 (pid: 12902, open files limit: 524288, gomaxprocs: 4) +=> INFO time context=main Starting AnyCable v1.4.8 (pid: 12902, open files limit: 524288, gomaxprocs: 4) ``` -By default, `anycable-go` tries to connect to an RPC server listening at `localhost:50051` (the default host for the Ruby gem). You can change this setting by providing `--rpc_host` option or `ANYCABLE_RPC_HOST` env variable (read more about [configuration](./configuration.md)). +By default, `anycable-go` tries to connect to a gRPC server listening at `localhost:50051` (the default host for the Ruby gem). + +You can change this setting by providing `--rpc_host` option or `ANYCABLE_RPC_HOST` env variable (read more about [configuration](./configuration.md)). All other configuration parameters have the same default values as the corresponding parameters for the AnyCable RPC server, so you don't need to change them usually. + +### Standalone mode (pub/sub only) + +AnyCable is designed as a logic-less proxy for your real-time features relying on a backend server to authenticate connections, authorize subscriptions and process incoming messages. That's why our default configuration assumes having an RPC server to handle all this logic. + +For pure pub/sub functionality, you can use AnyCable in a standalone mode, without any RPC servers. For that, you must configure the following features: + +- [JWT authentication](./jwt_identification.md) or disable authentication completely (`--noauth`). **NOTE:** You can still add minimal protection via the `--allowed_origins` option (see [configuration](./configuration.md#primary-settings)). + +- Enable [signed streams](./signed_streams.md) or allow public streams via the `--public_streams` option. + +There is also a shortcut option `--public` to enable both `--noauth` and `--public_streams` options. **Use it with caution**. + +Thus, to run AnyCable real-time server in an insecure standalone mode, use the following command: + +```sh +$ anycable-go --public + +... +``` + +To secure access to AnyCable server, specify either the `--jwt_secret` or `--streams_secret` option. There is also the `--secret` shortcut: + +```sh +$ anycable-go --secret=VERY_SECRET_VALUE + +``` diff --git a/docs/signed_streams.md b/docs/signed_streams.md index b30824a2..64a1e4f3 100644 --- a/docs/signed_streams.md +++ b/docs/signed_streams.md @@ -1,74 +1,169 @@ -# Speedy Hotwire and CableReady streams +# Signed streams -AnyCable provides an ability to terminate Hotwire ([Turbo Streams](https://turbo.hotwired.dev/handbook/streams)) and [CableReady](https://cableready.stimulusreflex.com) (v5+) subscriptions at the WS server without _touching_ your Ruby/Rails application (i.e., without performing RPC calls). Thus, you can make subscriptions blazingly fast and reduce the load on the RPC server. +AnyCable allows you to subscribe to _streams_ without using _channels_ (in Action Cable terminology). Channels is a great way to encapsulate business-logic for a given real-time feature, but in many cases all we need is a good old explicit pub/sub. That's where the **signed streams** feature comes into play. -In combination with [JWT identification](./jwt_identification.md), this feature makes it possible to avoid running RPC server at all in case you only need Hotwire/CableReady functionality. +> You read more about the Action Cable abstract design, how it compares to direct pub/sub and what are the pros and cons from this [Any Cables Monthly issue](https://anycable.substack.com/p/any-cables-monthly-18). Don't forget to subscribe! -> 🎥 Check out this [AnyCasts episode](https://anycable.io/blog/anycasts-rails-7-hotwire-and-anycable/) to learn how to use AnyCable with Hotwire Rails application in a RPC-less way. +Signed streams work as follows: + +- Given a stream name, say, "chat/2024", you generate its signed version using a **secret key** (see below on the signing algorithm) + +- On the client side, you subscribe to the "$pubsub" channel and provide the signed stream name as a `signed_stream_name` parameter -## Usage with Hotwire / Turbo Streams +- AnyCable process the subscribe command, verifies the stream name and completes the subscription (if verified). -We assume that you use an Action Cable integration provided by the [turbo-rails][] gem. +For verification, you MUST provide the **secret key** via the `--streams_secret` (`ANYCABLE_STREAMS_SECRET`) parameter for AnyCable. -Whenever you use the `#turbo_stream_from` helper, Rails generates a _signed stream name_ to pass along the subscription request. -In order to _verify_ it at the AnyCable Go side, we need to know the encryption key, so it must be the same for the Rails app and for the AnyCable server. +## Full-stack example: Rails -Specify the verifier key in the Rails config: +Let's consider an example of using signed stream in a Rails application. + +Assume that we want to subscribe a user with ID=17 to their personal notifications channel, "notifications/17". + +First, we need to generate a signed stream name: ```ruby -# config/environments/production.rb -config.turbo.signed_stream_verifier_key = "s3cЯeT" +signed_name = AnyCable.signed_stream_name("notifications/17") ``` -For AnyCable Go, you must specify the same key as the value for the `turbo_rails_key` configuration option (`ANYCABLE_TURBO_RAILS_KEY` env var) to activate the fast lane: +Or you can use the `#signed_stream_name` helper in your views -```sh -anycable-go --turbo_rails_key=s3cЯeT +```erb +
"> -# or -ANYCABLE_TURBO_RAILS_KEY=s3cЯeT anycable-go +
+``` + +By default, AnyCable uses `Rails.application.secret_key_base` to sign streams. We recommend configuring a custom secret though (so you can easily rotate values at both ends, the Rails app and AnyCable servers). You can specify it via the `streams_secret` configuration parameter (in `anycable.yml`, credentials, or environment). + +Then, on the client side, you can subscribe to this stream as follows: + +```js +// using @rails/actioncable +let subscription = consumer.subscriptions.create( + {channel: "$pubsub", signed_stream_name: stream}, + { + received: (msg) => { + // handle notification msg + } + } +) + +// using @anycable/web +let channel = cable.streamFromSigned(stream); +channel.on("message", (msg) => { + // handle notification +}) +``` + +Now you can broadcast messages to this stream as usual: + +```ruby +ActionCable.server.broadcast "notifications/#{user.id}", payload ``` -You should the following line in the logs at the server start: +## Public (unsigned) streams + +Sometimes you may want to skip all the signing ceremony and use plain stream names instead. With AnyCable, you can do that by enabling the `--public_streams` option (or `ANYCABLE_PUBLIC_STREAMS=true`) for the AnyCable server: ```sh -... -INFO 2021-09-14T12:49:34.274Z context=main Using channels router: Turbo::StreamsChannel -... +$ anycable-go --public_streams + +# or +$ ANYCABLE_PUBLIC_STREAMS=true anycable-go ``` -## Usage with CableReady +With public streams enabled, you can subscribe to them as follows: + +```js +// using @rails/actioncable +let subscription = consumer.subscriptions.create( + {channel: "$pubsub", stream_name: "notifications/17"}, + { + received: (msg) => { + // handle notification msg + } + } +) + +// using @anycable/web +let channel = cable.streamFrom("notifications/17"); +channel.on("message", (msg) => { + // handle notification +}) +``` -**NOTE:** This feature requires upcoming CableReady v5. Currently, [preview versions](https://rubygems.org/gems/cable_ready) are available. +## Signing algorithm -Whenever you use the `#stream_from` helper, CableReady generates a _signed stream identifier_ to pass along the subscription request. +We use the same algorithm as Rails uses in its [MessageVerifier](https://api.rubyonrails.org/v7.1.3/classes/ActiveSupport/MessageVerifier.html): -In order to _verify_ it at the AnyCable Go side, we need to know the encryption key, so it must be the same for the Rails app and for the AnyCable server. +1. Encode the stream name by first converting it into a JSON string and then encoding in Base64 format. +1. Calculate a HMAC digest using the SHA256 hash function from the secret and the encoded stream name. +1. Concatenate the encoded stream name, a double dash (`--`), and the digest. -Specify the verifier key in the CableReady config: +Here is the Ruby version of the alrogithm: ```ruby -# config/initializers/cable_ready.rb -CableReady.configure do |config| - config.verifier_key = "s3cЯeT" -end +encoded = ::Base64.strict_encode64(JSON.dump(stream_name)) +digest = OpenSSL::HMAC.hexdigest("SHA256", SECRET_KEY, encoded) +signed_stream_name = "#{encoded}--#{digest}" ``` -For AnyCable Go, you must specify the same key as the value for the `cable_ready_key` configuration option (`ANYCABLE_CABLE_READY_KEY` env var) to activate the fast lane: +The JavaScript (Node.js) version: -```sh -anycable-go --cable_ready_key=s3cЯeT +```js +import { createHmac } from 'crypto'; -# or -ANYCABLE_CABLE_READY_KEY=s3cЯeT anycable-go +const encoded = Buffer.from(JSON.stringify(stream_name)).toString('base64'); +const digest = createHmac('sha256', SECRET_KEY).update(encoded).digest('hex'); +const signedStreamName = `${encoded}--${digest}`; ``` -You should the following line in the logs at the server start: +The Python version looks as follows: -```sh -... -INFO 2021-09-14T12:52:59.371Z context=main Using channels router: CableReady::Stream -... +```python +import base64 +import json +import hmac +import hashlib + +encoded = base64.b64encode(json.dumps(stream_name).encode('utf-8')).decode('utf-8') +digest = hmac.new(SECRET_KEY.encode('utf-8'), encoded.encode('utf-8'), hashlib.sha256).hexdigest() +signed_stream_name = f"{encoded}--{digest}" +``` + +The PHP version is as follows: + +```php +$encoded = base64_encode(json_encode($stream_name)); +$digest = hash_hmac('sha256', $encoded, $SECRET_KEY); +$signed_stream_name = $encoded . '--' . $digest; ``` -[turbo-rails]: https://github.com/hotwired/turbo-rails +## Hotwire and CableReady support + +AnyCable provides an ability to terminate Hotwire ([Turbo Streams](https://turbo.hotwired.dev/handbook/streams)) and [CableReady](https://cableready.stimulusreflex.com) (v5+) subscriptions at the WebSocker server using the same signed streams functionality under the hood (and, thus, without performing any RPC calls to authorize subscriptions). + +In combination with [JWT authentication](./jwt_identification.md), this feature makes it possible to avoid run AnyCable in a standalone mode for Hotwire/CableReady applications. + +> 🎥 Check out this [AnyCasts episode](https://anycable.io/blog/anycasts-rails-7-hotwire-and-anycable/) to learn how to use AnyCable with Hotwire Rails application in a RPC-less way. + +You must explicitly enable Turbo Streams or CableReady signed streams support at the AnyCable server side by specifying the `--turbo_streams` (`ANYCABLE_TURBO_STREAMS=true`) or `--cable_ready_streams` (`ANYCABLE_CABLE_READY_STREAMS=true`) option respectively. + +You must also provide the `--streams_secret` corresponding to the secret you use for Turbo/CableReady. You can configure them in your Rails application as follows: + +```ruby +# Turbo configuration + +# config/environments/production.rb +config.turbo.signed_stream_verifier_key = "" + +# CableReady configuration + +# config/initializers/cable_ready.rb +CableReady.configure do |config| + config.verifier_key = "" +end +``` diff --git a/features/sse.testfile b/features/sse.testfile index 3a787f41..1df47d4a 100644 --- a/features/sse.testfile +++ b/features/sse.testfile @@ -1,5 +1,5 @@ launch :anycable, - "./dist/anycable-go --sse --turbo_rails_cleartext --jwt_id_key=qwerty --broadcast_adapter=http --presets=broker" + "./dist/anycable-go --sse --public_streams --jwt_id_key=qwerty --broadcast_adapter=http --presets=broker" wait_tcp 8080 @@ -12,7 +12,7 @@ require "uri" require "net/http" require "fiber" -identifier = URI.encode_www_form_component({channel: "Turbo::StreamsChannel", signed_stream_name: stream_name}.to_json) +identifier = URI.encode_www_form_component({channel: "$pubsub", stream_name: stream_name}.to_json) url = "http://localhost:8080/events?jid=#{token}&identifier=#{identifier}" diff --git a/features/cable_ready.testfile b/features/streams.testfile similarity index 85% rename from features/cable_ready.testfile rename to features/streams.testfile index 4d48ec35..84e71a96 100644 --- a/features/cable_ready.testfile +++ b/features/streams.testfile @@ -1,5 +1,5 @@ launch :anycable, - "./dist/anycable-go --cable_ready_cleartext --jwt_id_key=qwerty " \ + "./dist/anycable-go --public_streams --jwt_id_key=qwerty " \ "--metrics_rotate_interval=1 --metrics_log --metrics_log_filter=rpc_call_total,rpc_error_total,rpc_retries_total", capture_output: true @@ -13,7 +13,7 @@ scenario = [ { client: { protocol: "action_cable", - name: "turbo", + name: "streamer", connection_options: { query: { jid: token @@ -22,9 +22,9 @@ scenario = [ actions: [ { subscribe: { - channel: "CableReady::Stream", + channel: "$pubsub", params: { - signed_stream_id: "stream/2023" + stream_name: "stream/2023" } } }, diff --git a/features/turbo_rpc_less.testfile b/features/turbo_rpc_less.testfile index 343f3e6b..cb4f5501 100644 --- a/features/turbo_rpc_less.testfile +++ b/features/turbo_rpc_less.testfile @@ -1,5 +1,5 @@ launch :anycable, - "./dist/anycable-go --turbo_rails_key=s3Krit --jwt_id_key=qwerty " \ + "./dist/anycable-go --streams_secret=s3Krit --turbo_streams --jwt_id_key=qwerty " \ "--metrics_rotate_interval=1 --metrics_log --metrics_log_filter=rpc_call_total,rpc_error_total,rpc_retries_total", capture_output: true diff --git a/rails/cable_ready.go b/rails/cable_ready.go deleted file mode 100644 index ed056be1..00000000 --- a/rails/cable_ready.go +++ /dev/null @@ -1,116 +0,0 @@ -package rails - -import ( - "encoding/json" - "log/slog" - - "github.com/anycable/anycable-go/common" - "github.com/anycable/anycable-go/node" - "github.com/anycable/anycable-go/utils" -) - -type CableReadyController struct { - verifier *utils.MessageVerifier - log *slog.Logger -} - -var _ node.Controller = (*CableReadyController)(nil) - -func NewCableReadyController(key string, l *slog.Logger) *CableReadyController { - var verifier *utils.MessageVerifier - - if key != "" { - verifier = utils.NewMessageVerifier(key) - } - - return &CableReadyController{verifier, l.With("context", "cable_ready")} -} - -func (c *CableReadyController) Start() error { - return nil -} - -func (c *CableReadyController) Shutdown() error { - return nil -} - -func (c *CableReadyController) Authenticate(sid string, env *common.SessionEnv) (*common.ConnectResult, error) { - return nil, nil -} - -func (c *CableReadyController) Subscribe(sid string, env *common.SessionEnv, id string, channel string) (*common.CommandResult, error) { - params := struct { - SignedStreamID string `json:"identifier"` - }{} - - err := json.Unmarshal([]byte(channel), ¶ms) - - if err != nil { - c.log.With("identifier", channel).Warn("invalid identifier", "error", err) - return nil, err - } - - var stream string - - if c.IsCleartext() { - stream = params.SignedStreamID - - c.log.With("identifier", channel).Debug("unsigned", "stream", stream) - } else { - verified, err := c.verifier.Verified(params.SignedStreamID) - - if err != nil { - c.log.With("identifier", channel).Debug("verification failed", "stream", params.SignedStreamID, "error", err) - - return &common.CommandResult{ - Status: common.FAILURE, - Transmissions: []string{common.RejectionMessage(channel)}, - }, - nil - } - - var ok bool - - stream, ok = verified.(string) - - if !ok { - c.log.With("identifier", channel).Debug("verification failed: stream name is not a string", "stream", verified) - - return &common.CommandResult{ - Status: common.FAILURE, - Transmissions: []string{common.RejectionMessage(channel)}, - }, - nil - } - - c.log.With("identifier", channel).Debug("verified", "stream", stream) - } - - return &common.CommandResult{ - Status: common.SUCCESS, - Transmissions: []string{common.ConfirmationMessage(channel)}, - Streams: []string{stream}, - DisconnectInterest: -1, - }, nil -} - -func (c *CableReadyController) Unsubscribe(sid string, env *common.SessionEnv, id string, channel string) (*common.CommandResult, error) { - return &common.CommandResult{ - Status: common.SUCCESS, - Transmissions: []string{}, - Streams: []string{}, - StopAllStreams: true, - }, nil -} - -func (c *CableReadyController) Perform(sid string, env *common.SessionEnv, id string, channel string, data string) (*common.CommandResult, error) { - return nil, nil -} - -func (c *CableReadyController) Disconnect(sid string, env *common.SessionEnv, id string, subscriptions []string) error { - return nil -} - -func (c *CableReadyController) IsCleartext() bool { - return c.verifier == nil -} diff --git a/rails/cable_ready_test.go b/rails/cable_ready_test.go deleted file mode 100644 index 4d9b6a3e..00000000 --- a/rails/cable_ready_test.go +++ /dev/null @@ -1,88 +0,0 @@ -package rails - -import ( - "fmt" - "log/slog" - "testing" - - "github.com/anycable/anycable-go/common" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestCableReadyController(t *testing.T) { - key := "s3Krit" - // CableReady::Config.instance.verifier_key = 's3Krit' - // CableReady.signed_stream_verifier.generate("stream:2021") - stream := "InN0cmVhbToyMDIxIg==--44f6315dd9faefe713ef5685e114413c1afe8759197a0fc39b15cee75769417e" - - env := common.NewSessionEnv("ws://demo.anycable.io/cable", &map[string]string{"cookie": "val=1;"}) - subject := NewCableReadyController(key, slog.Default()) - - t.Run("Subscribe (success)", func(t *testing.T) { - channel := fmt.Sprintf("{\"channel\":\"CableReady::Stream\",\"identifier\":\"%s\"}", stream) - - res, err := subject.Subscribe("42", env, "name=jack", channel) - - require.NoError(t, err) - require.NotNil(t, res) - require.Equal(t, common.SUCCESS, res.Status) - assert.Equal(t, []string{common.ConfirmationMessage(channel)}, res.Transmissions) - assert.Equal(t, []string{"stream:2021"}, res.Streams) - assert.Equal(t, -1, res.DisconnectInterest) - }) - - t.Run("Subscribe (failure)", func(t *testing.T) { - channel := fmt.Sprintf("{\"channel\":\"CableReady::Stream\",\"identifier\":\"%s\"}", "fake_id") - - res, err := subject.Subscribe("42", env, "name=jack", channel) - - require.NoError(t, err) - require.NotNil(t, res) - require.Equal(t, common.FAILURE, res.Status) - assert.Equal(t, []string{common.RejectionMessage(channel)}, res.Transmissions) - }) - - t.Run("Subscribe (failure + not a string)", func(t *testing.T) { - signed := "WyJjaGF0LzIwMjMiLDE2ODUwMjQwMTdd--5b6661024d4c463c4936cd1542bc9a7672dd8039ac407d0b6c901697190e8aeb" - channel := fmt.Sprintf("{\"channel\":\"CableReady::Stream\",\"identifier\":\"%s\"}", signed) - - res, err := subject.Subscribe("42", env, "name=jack", channel) - - require.NoError(t, err) - require.NotNil(t, res) - require.Equal(t, common.FAILURE, res.Status) - assert.Equal(t, []string{common.RejectionMessage(channel)}, res.Transmissions) - }) - - t.Run("Unsubscribe", func(t *testing.T) { - channel := fmt.Sprintf("{\"channel\":\"CableReady::Stream\",\"identifier\":\"%s\"}", stream) - - res, err := subject.Unsubscribe("42", env, "name=jack", channel) - - require.NoError(t, err) - require.NotNil(t, res) - require.Equal(t, common.SUCCESS, res.Status) - assert.Equal(t, []string{}, res.Transmissions) - assert.Equal(t, []string{}, res.Streams) - assert.Equal(t, true, res.StopAllStreams) - }) -} - -func TestCableReadyControllerWithClearText(t *testing.T) { - env := common.NewSessionEnv("ws://demo.anycable.io/cable", &map[string]string{"cookie": "val=1;"}) - subject := NewCableReadyController("", slog.Default()) - - t.Run("Subscribe (success)", func(t *testing.T) { - channel := "{\"channel\":\"CableReady::Stream\",\"identifier\":\"stream:2023\"}" - - res, err := subject.Subscribe("42", env, "name=jack", channel) - - require.NoError(t, err) - require.NotNil(t, res) - require.Equal(t, common.SUCCESS, res.Status) - assert.Equal(t, []string{common.ConfirmationMessage(channel)}, res.Transmissions) - assert.Equal(t, []string{"stream:2023"}, res.Streams) - assert.Equal(t, -1, res.DisconnectInterest) - }) -} diff --git a/rails/config.go b/rails/config.go index ded5665b..cc3297c8 100644 --- a/rails/config.go +++ b/rails/config.go @@ -1,3 +1,4 @@ +// Deprecated package rails type Config struct { diff --git a/rails/turbo.go b/rails/turbo.go deleted file mode 100644 index d2853a5a..00000000 --- a/rails/turbo.go +++ /dev/null @@ -1,116 +0,0 @@ -package rails - -import ( - "encoding/json" - "log/slog" - - "github.com/anycable/anycable-go/common" - "github.com/anycable/anycable-go/node" - "github.com/anycable/anycable-go/utils" -) - -type TurboController struct { - verifier *utils.MessageVerifier - log *slog.Logger -} - -var _ node.Controller = (*TurboController)(nil) - -func NewTurboController(key string, l *slog.Logger) *TurboController { - var verifier *utils.MessageVerifier - - if key != "" { - verifier = utils.NewMessageVerifier(key) - } - - return &TurboController{verifier, l.With("context", "turbo")} -} - -func (c *TurboController) Start() error { - return nil -} - -func (c *TurboController) Shutdown() error { - return nil -} - -func (c *TurboController) Authenticate(sid string, env *common.SessionEnv) (*common.ConnectResult, error) { - return nil, nil -} - -func (c *TurboController) Subscribe(sid string, env *common.SessionEnv, id string, channel string) (*common.CommandResult, error) { - params := struct { - SignedStreamID string `json:"signed_stream_name"` - }{} - - err := json.Unmarshal([]byte(channel), ¶ms) - - if err != nil { - c.log.With("identifier", channel).Warn("invalid identifier", "error", err) - return nil, err - } - - var stream string - - if c.IsCleartext() { - stream = params.SignedStreamID - - c.log.With("identifier", channel).Debug("unsigned", "stream", stream) - } else { - verified, err := c.verifier.Verified(params.SignedStreamID) - - if err != nil { - c.log.With("identifier", channel).Debug("verification failed", "stream", params.SignedStreamID, "error", err) - - return &common.CommandResult{ - Status: common.FAILURE, - Transmissions: []string{common.RejectionMessage(channel)}, - }, - nil - } - - var ok bool - - stream, ok = verified.(string) - - if !ok { - c.log.With("identifier", channel).Debug("verification failed: stream name is not a string", "stream", verified) - - return &common.CommandResult{ - Status: common.FAILURE, - Transmissions: []string{common.RejectionMessage(channel)}, - }, - nil - } - - c.log.With("identifier", channel).Debug("verified", "stream", stream) - } - - return &common.CommandResult{ - Status: common.SUCCESS, - Transmissions: []string{common.ConfirmationMessage(channel)}, - Streams: []string{stream}, - DisconnectInterest: -1, - }, nil -} - -func (c *TurboController) Unsubscribe(sid string, env *common.SessionEnv, id string, channel string) (*common.CommandResult, error) { - return &common.CommandResult{ - Status: common.SUCCESS, - Transmissions: []string{}, - Streams: []string{}, - StopAllStreams: true, - }, nil -} - -func (c *TurboController) Perform(sid string, env *common.SessionEnv, id string, channel string, data string) (*common.CommandResult, error) { - return nil, nil -} - -func (c *TurboController) Disconnect(sid string, env *common.SessionEnv, id string, subscriptions []string) error { - return nil -} - -func (c *TurboController) IsCleartext() bool { - return c.verifier == nil -} diff --git a/rails/turbo_test.go b/rails/turbo_test.go deleted file mode 100644 index ec0833a5..00000000 --- a/rails/turbo_test.go +++ /dev/null @@ -1,88 +0,0 @@ -package rails - -import ( - "fmt" - "log/slog" - "testing" - - "github.com/anycable/anycable-go/common" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestTurboController(t *testing.T) { - key := "s3Krit" - // Turbo.signed_stream_verifier_key = 's3Krit' - // Turbo::StreamsChannel.signed_stream_name([:chat, "2021"]) - stream := "ImNoYXQ6MjAyMSI=--f9ee45dbccb1da04d8ceb99cc820207804370ba0d06b46fc3b8b373af1315628" - - env := common.NewSessionEnv("ws://demo.anycable.io/cable", &map[string]string{"cookie": "val=1;"}) - subject := NewTurboController(key, slog.Default()) - - t.Run("Subscribe (success)", func(t *testing.T) { - channel := fmt.Sprintf("{\"channel\":\"Turbo::StreamsChannel\",\"signed_stream_name\":\"%s\"}", stream) - - res, err := subject.Subscribe("42", env, "name=jack", channel) - - require.NoError(t, err) - require.NotNil(t, res) - require.Equal(t, common.SUCCESS, res.Status) - assert.Equal(t, []string{common.ConfirmationMessage(channel)}, res.Transmissions) - assert.Equal(t, []string{"chat:2021"}, res.Streams) - assert.Equal(t, -1, res.DisconnectInterest) - }) - - t.Run("Subscribe (failure)", func(t *testing.T) { - channel := fmt.Sprintf("{\"channel\":\"Turbo::StreamsChannel\",\"signed_stream_name\":\"%s\"}", "fake_id") - - res, err := subject.Subscribe("42", env, "name=jack", channel) - - require.NoError(t, err) - require.NotNil(t, res) - require.Equal(t, common.FAILURE, res.Status) - assert.Equal(t, []string{common.RejectionMessage(channel)}, res.Transmissions) - }) - - t.Run("Subscribe (failure + not a string)", func(t *testing.T) { - signed := "WyJjaGF0LzIwMjMiLDE2ODUwMjQwMTdd--5b6661024d4c463c4936cd1542bc9a7672dd8039ac407d0b6c901697190e8aeb" - channel := fmt.Sprintf("{\"channel\":\"Turbo::StreamsChannel\",\"signed_stream_name\":\"%s\"}", signed) - - res, err := subject.Subscribe("42", env, "name=jack", channel) - - require.NoError(t, err) - require.NotNil(t, res) - require.Equal(t, common.FAILURE, res.Status) - assert.Equal(t, []string{common.RejectionMessage(channel)}, res.Transmissions) - }) - - t.Run("Unsubscribe", func(t *testing.T) { - channel := fmt.Sprintf("{\"channel\":\"Turbo::StreamsChannel\",\"signed_stream_name\":\"%s\"}", stream) - - res, err := subject.Unsubscribe("42", env, "name=jack", channel) - - require.NoError(t, err) - require.NotNil(t, res) - require.Equal(t, common.SUCCESS, res.Status) - assert.Equal(t, []string{}, res.Transmissions) - assert.Equal(t, []string{}, res.Streams) - assert.Equal(t, true, res.StopAllStreams) - }) -} - -func TestTurboControllerClearText(t *testing.T) { - env := common.NewSessionEnv("ws://demo.anycable.io/cable", &map[string]string{"cookie": "val=1;"}) - subject := NewTurboController("", slog.Default()) - - t.Run("Subscribe (success)", func(t *testing.T) { - channel := "{\"channel\":\"Turbo::StreamsChannel\",\"signed_stream_name\":\"chat:2023\"}" - - res, err := subject.Subscribe("42", env, "name=jack", channel) - - require.NoError(t, err) - require.NotNil(t, res) - require.Equal(t, common.SUCCESS, res.Status) - assert.Equal(t, []string{common.ConfirmationMessage(channel)}, res.Transmissions) - assert.Equal(t, []string{"chat:2023"}, res.Streams) - assert.Equal(t, -1, res.DisconnectInterest) - }) -} diff --git a/streams/config.go b/streams/config.go new file mode 100644 index 00000000..96648840 --- /dev/null +++ b/streams/config.go @@ -0,0 +1,27 @@ +// This package provides functionality to directly subscribe to streams +// without using channels (a simplified pub/sub mode) +package streams + +type Config struct { + // Secret is a key used to sign and verify streams + Secret string + + // Public determines if public (unsigned) streams are allowed + Public bool + + // PubSubChannel is the channel name used for direct pub/sub + PubSubChannel string + + // Turbo is a flag to enable Turbo Streams support + Turbo bool + + // CableReady is a flag to enable CableReady support + CableReady bool +} + +// NewConfig returns a new Config with the given key +func NewConfig() Config { + return Config{ + PubSubChannel: "$pubsub", + } +} diff --git a/streams/controller.go b/streams/controller.go new file mode 100644 index 00000000..0c1213eb --- /dev/null +++ b/streams/controller.go @@ -0,0 +1,189 @@ +package streams + +import ( + "encoding/json" + "errors" + "log/slog" + + "github.com/anycable/anycable-go/common" + "github.com/anycable/anycable-go/node" + "github.com/anycable/anycable-go/utils" + "github.com/joomcode/errorx" +) + +type SubscribeRequest struct { + StreamName string `json:"stream_name"` + SignedStreamName string `json:"signed_stream_name"` +} + +func (r *SubscribeRequest) IsPresent() bool { + return r.StreamName != "" || r.SignedStreamName != "" +} + +type StreamResolver = func(string) (*SubscribeRequest, error) + +type Controller struct { + verifier *utils.MessageVerifier + resolver StreamResolver + log *slog.Logger +} + +var _ node.Controller = (*Controller)(nil) + +func NewController(key string, resolver StreamResolver, l *slog.Logger) *Controller { + var verifier *utils.MessageVerifier + + if key != "" { + verifier = utils.NewMessageVerifier(key) + } + + return &Controller{verifier, resolver, l.With("context", "streams")} +} + +func (c *Controller) Start() error { + return nil +} + +func (c *Controller) Shutdown() error { + return nil +} + +func (c *Controller) Authenticate(sid string, env *common.SessionEnv) (*common.ConnectResult, error) { + return nil, nil +} + +func (c *Controller) Subscribe(sid string, env *common.SessionEnv, ids string, identifier string) (*common.CommandResult, error) { + request, err := c.resolver(identifier) + + if err != nil { + return &common.CommandResult{ + Status: common.FAILURE, + Transmissions: []string{common.RejectionMessage(identifier)}, + }, errorx.Decorate(err, "invalid identifier") + } + + if !request.IsPresent() { + err := errors.New("malformed identifier: no stream name or signed stream") + + return &common.CommandResult{ + Status: common.FAILURE, + Transmissions: []string{common.RejectionMessage(identifier)}, + }, err + } + + var stream string + + if request.StreamName != "" { + stream = request.StreamName + + c.log.With("identifier", identifier).Debug("unsigned", "stream", stream) + } else { + verified, err := c.verifier.Verified(request.SignedStreamName) + + if err != nil { + c.log.With("identifier", identifier).Debug("verification failed", "stream", request.SignedStreamName, "error", err) + + return &common.CommandResult{ + Status: common.FAILURE, + Transmissions: []string{common.RejectionMessage(identifier)}, + }, + nil + } + + var ok bool + stream, ok = verified.(string) + + if !ok { + c.log.With("identifier", identifier).Debug("verification failed: stream name is not a string", "stream", verified) + + return &common.CommandResult{ + Status: common.FAILURE, + Transmissions: []string{common.RejectionMessage(identifier)}, + }, + nil + } + + c.log.With("identifier", identifier).Debug("verified", "stream", stream) + } + + return &common.CommandResult{ + Status: common.SUCCESS, + Transmissions: []string{common.ConfirmationMessage(identifier)}, + Streams: []string{stream}, + DisconnectInterest: -1, + }, nil +} + +func (c *Controller) Unsubscribe(sid string, env *common.SessionEnv, ids string, identifier string) (*common.CommandResult, error) { + return &common.CommandResult{ + Status: common.SUCCESS, + Transmissions: []string{}, + Streams: []string{}, + StopAllStreams: true, + }, nil +} + +func (c *Controller) Perform(sid string, env *common.SessionEnv, ids string, identifier string, data string) (*common.CommandResult, error) { + return nil, nil +} + +func (c *Controller) Disconnect(sid string, env *common.SessionEnv, ids string, subscriptions []string) error { + return nil +} + +func NewStreamsController(conf *Config, l *slog.Logger) *Controller { + key := conf.Secret + allowPublic := conf.Public + + resolver := func(identifier string) (*SubscribeRequest, error) { + var request SubscribeRequest + + if err := json.Unmarshal([]byte(identifier), &request); err != nil { + return nil, err + } + + if !allowPublic && request.StreamName != "" { + return nil, errors.New("public streams are not allowed") + } + + return &request, nil + } + + return NewController(key, resolver, l) +} + +type TurboMessage struct { + SignedStreamName string `json:"signed_stream_name"` +} + +func NewTurboController(key string, l *slog.Logger) *Controller { + resolver := func(identifier string) (*SubscribeRequest, error) { + var msg TurboMessage + + if err := json.Unmarshal([]byte(identifier), &msg); err != nil { + return nil, err + } + + return &SubscribeRequest{SignedStreamName: msg.SignedStreamName}, nil + } + + return NewController(key, resolver, l) +} + +type CableReadyMesssage struct { + Identifier string `json:"identifier"` +} + +func NewCableReadyController(key string, l *slog.Logger) *Controller { + resolver := func(identifier string) (*SubscribeRequest, error) { + var msg CableReadyMesssage + + if err := json.Unmarshal([]byte(identifier), &msg); err != nil { + return nil, err + } + + return &SubscribeRequest{SignedStreamName: msg.Identifier}, nil + } + + return NewController(key, resolver, l) +} diff --git a/streams/controller_test.go b/streams/controller_test.go new file mode 100644 index 00000000..67f8c4c3 --- /dev/null +++ b/streams/controller_test.go @@ -0,0 +1,197 @@ +package streams + +import ( + "fmt" + "log/slog" + "testing" + + "github.com/anycable/anycable-go/common" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + key = "s3Krit" + // Turbo.signed_stream_verifier_key = 's3Krit' + // Turbo::StreamsChannel.signed_stream_name([:chat, "2021"]) + stream = "ImNoYXQ6MjAyMSI=--f9ee45dbccb1da04d8ceb99cc820207804370ba0d06b46fc3b8b373af1315628" +) + +func TestNewController(t *testing.T) { + t.Run("No stream name", func(t *testing.T) { + resolver := func(string) (*SubscribeRequest, error) { + return &SubscribeRequest{}, nil + } + + subject := NewController(key, resolver, slog.Default()) + + require.NotNil(t, subject) + + res, err := subject.Subscribe("42", nil, "name=jack", "") + + require.Error(t, err) + require.NotNil(t, res) + + assert.Equal(t, common.FAILURE, res.Status) + assert.Equal(t, []string{common.RejectionMessage("")}, res.Transmissions) + }) +} + +func TestStreamsController(t *testing.T) { + t.Run("Subscribe - public", func(t *testing.T) { + conf := NewConfig() + conf.Public = true + subject := NewStreamsController(&conf, slog.Default()) + + require.NotNil(t, subject) + + res, err := subject.Subscribe("42", nil, "name=jack", `{"channel":"$pubsub","stream_name":"chat:2024"}`) + + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, common.SUCCESS, res.Status) + assert.Equal(t, []string{common.ConfirmationMessage(`{"channel":"$pubsub","stream_name":"chat:2024"}`)}, res.Transmissions) + assert.Equal(t, []string{"chat:2024"}, res.Streams) + assert.Equal(t, -1, res.DisconnectInterest) + }) + + t.Run("Subscribe - no public allowed", func(t *testing.T) { + conf := NewConfig() + subject := NewStreamsController(&conf, slog.Default()) + + require.NotNil(t, subject) + + res, err := subject.Subscribe("42", nil, "name=jack", `{"channel":"$pubsub","stream_name":"chat:2024"}`) + + require.Error(t, err) + require.NotNil(t, res) + assert.Equal(t, []string{common.RejectionMessage(`{"channel":"$pubsub","stream_name":"chat:2024"}`)}, res.Transmissions) + }) + + t.Run("Subscribe - signed", func(t *testing.T) { + conf := NewConfig() + conf.Secret = key + subject := NewStreamsController(&conf, slog.Default()) + + require.NotNil(t, subject) + + identifier := `{"channel":"$pubsub","signed_stream_name":"` + stream + `"}` + + res, err := subject.Subscribe("42", nil, "name=jack", identifier) + + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, common.SUCCESS, res.Status) + assert.Equal(t, []string{common.ConfirmationMessage(identifier)}, res.Transmissions) + assert.Equal(t, []string{"chat:2021"}, res.Streams) + assert.Equal(t, -1, res.DisconnectInterest) + }) +} + +func TestTurboController(t *testing.T) { + env := common.NewSessionEnv("ws://demo.anycable.io/cable", &map[string]string{"cookie": "val=1;"}) + subject := NewTurboController(key, slog.Default()) + + t.Run("Subscribe (success)", func(t *testing.T) { + channel := fmt.Sprintf("{\"channel\":\"Turbo::StreamsChannel\",\"signed_stream_name\":\"%s\"}", stream) + + res, err := subject.Subscribe("42", env, "name=jack", channel) + + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, common.SUCCESS, res.Status) + assert.Equal(t, []string{common.ConfirmationMessage(channel)}, res.Transmissions) + assert.Equal(t, []string{"chat:2021"}, res.Streams) + assert.Equal(t, -1, res.DisconnectInterest) + }) + + t.Run("Subscribe (failure)", func(t *testing.T) { + channel := fmt.Sprintf("{\"channel\":\"Turbo::StreamsChannel\",\"signed_stream_name\":\"%s\"}", "fake_id") + + res, err := subject.Subscribe("42", env, "name=jack", channel) + + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, common.FAILURE, res.Status) + assert.Equal(t, []string{common.RejectionMessage(channel)}, res.Transmissions) + }) + + t.Run("Subscribe (failure + not a string)", func(t *testing.T) { + signed := "WyJjaGF0LzIwMjMiLDE2ODUwMjQwMTdd--5b6661024d4c463c4936cd1542bc9a7672dd8039ac407d0b6c901697190e8aeb" + channel := fmt.Sprintf("{\"channel\":\"Turbo::StreamsChannel\",\"signed_stream_name\":\"%s\"}", signed) + + res, err := subject.Subscribe("42", env, "name=jack", channel) + + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, common.FAILURE, res.Status) + assert.Equal(t, []string{common.RejectionMessage(channel)}, res.Transmissions) + }) + + t.Run("Unsubscribe", func(t *testing.T) { + channel := fmt.Sprintf("{\"channel\":\"Turbo::StreamsChannel\",\"signed_stream_name\":\"%s\"}", stream) + + res, err := subject.Unsubscribe("42", env, "name=jack", channel) + + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, common.SUCCESS, res.Status) + assert.Equal(t, []string{}, res.Transmissions) + assert.Equal(t, []string{}, res.Streams) + assert.Equal(t, true, res.StopAllStreams) + }) +} + +func TestCableReadyController(t *testing.T) { + env := common.NewSessionEnv("ws://demo.anycable.io/cable", &map[string]string{"cookie": "val=1;"}) + subject := NewCableReadyController(key, slog.Default()) + + t.Run("Subscribe (success)", func(t *testing.T) { + channel := fmt.Sprintf("{\"channel\":\"CableReady::Stream\",\"identifier\":\"%s\"}", stream) + + res, err := subject.Subscribe("42", env, "name=jack", channel) + + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, common.SUCCESS, res.Status) + assert.Equal(t, []string{common.ConfirmationMessage(channel)}, res.Transmissions) + assert.Equal(t, []string{"chat:2021"}, res.Streams) + assert.Equal(t, -1, res.DisconnectInterest) + }) + + t.Run("Subscribe (failure)", func(t *testing.T) { + channel := fmt.Sprintf("{\"channel\":\"CableReady::Stream\",\"identifier\":\"%s\"}", "fake_id") + + res, err := subject.Subscribe("42", env, "name=jack", channel) + + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, common.FAILURE, res.Status) + assert.Equal(t, []string{common.RejectionMessage(channel)}, res.Transmissions) + }) + + t.Run("Subscribe (failure + not a string)", func(t *testing.T) { + signed := "WyJjaGF0LzIwMjMiLDE2ODUwMjQwMTdd--5b6661024d4c463c4936cd1542bc9a7672dd8039ac407d0b6c901697190e8aeb" + channel := fmt.Sprintf("{\"channel\":\"CableReady::Stream\",\"identifier\":\"%s\"}", signed) + + res, err := subject.Subscribe("42", env, "name=jack", channel) + + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, common.FAILURE, res.Status) + assert.Equal(t, []string{common.RejectionMessage(channel)}, res.Transmissions) + }) + + t.Run("Unsubscribe", func(t *testing.T) { + channel := fmt.Sprintf("{\"channel\":\"CableReady::Stream\",\"identifier\":\"%s\"}", stream) + + res, err := subject.Unsubscribe("42", env, "name=jack", channel) + + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, common.SUCCESS, res.Status) + assert.Equal(t, []string{}, res.Transmissions) + assert.Equal(t, []string{}, res.Streams) + assert.Equal(t, true, res.StopAllStreams) + }) +}