diff --git a/.gitignore b/.gitignore index de82cf20..b25a761e 100644 --- a/.gitignore +++ b/.gitignore @@ -44,3 +44,4 @@ Gemfile.lock /k6 /coverage.out +admin/ui/build diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f0718fe..12a0716d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,29 @@ ## master +- HTTP broadcaster is enabled by default. ([@palkan][]) + +HTTP broadcaster is now enabled by default in addition to Redis (backward-compatibility). It will be the primary default broadcaster in v2. + +- Added new top-level `--secret` configuration option. ([@palkan][]) + +This is a one secret to rule them all! Unless specific secrets are specified, all features relying on secret keys use the global (_application_) secret either directly (signed streams, JWT) or as a secret base to generate auth tokens (HTTP broadcasting and HTTP RPC). + +- Added public mode (`--public`). ([@palkan][]) + +In this mode, AnyCable doesn't enforce connection authentication, enables public streams, and disables HTTP broadcast endpoint authentication. (So, it's a combination of `--noauth` and `--public_streams`). + +- Added direct stream subscribing via a dedicated channel. ([@palkan][]) + +Added a `$pubsub` reserved channel to allow subscribing to streams (signed or unsigned) without relying on channels (similar to Turbo Streams). + +Enable public (unsigned) streams support via the `--public_streams` (or `ANYCABLE_PUBLIC_STREAMS=1`) option or provide a secret key to verify signed streams via the `--streams_secret=` (or `ANYCABLE_STREAMS_SECRET=`) option. + +- Config options refactoring (renaming): + + - Deprecated (but still supported): `--turbo_rails_key` (now `--turbo_streams_secret`), `--cable_ready_key` (now `--cable_ready_secret`), `--jwt_id_key` (now `--jwt_secret`), `--jwt_id_param` (now `--jwt_param`), `--jwt_id_enforce` (now `--enforce_jwt`). + - Deprecated (no longer supported): `--turbo_rails_cleartext`, `--cable_ready_cleartext` (use `--public_streams` instead). + - Allowing embedding AnyCable into existing web applications. ([@palkan][]) You can now set up an AnyCable instance without an HTTP server and mount AnyCable WebSocket/SSE handlers wherever you like. diff --git a/Makefile b/Makefile index bd6f02fd..869bf6e3 100644 --- a/Makefile +++ b/Makefile @@ -153,6 +153,7 @@ test-conformance-http: tmp/anycable-go-test \ ANYCABLE_BROADCAST_ADAPTER=http ANYCABLE_HTTP_BROADCAST_SECRET=any_secret \ ANYCABLE_HTTP_RPC_SECRET=rpc_secret ANYCABLE_HTTP_RPC_MOUNT_PATH=/_anycable \ + ANYCABLE_HTTP_BROADCAST_URL=http://localhost:8080/_broadcast \ bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token --rpc_host=http://localhost:9292/_anycable" --target-url="ws://localhost:8080/cable" --require=etc/anyt/broadcast_tests/*.rb test-conformance-nats: tmp/anycable-go-test @@ -162,17 +163,18 @@ test-conformance-nats-embedded: tmp/anycable-go-test ANYCABLE_BROADCAST_ADAPTER=nats ANYCABLE_NATS_SERVERS=nats://127.0.0.1:4242 ANYCABLE_EMBED_NATS=true ANYCABLE_ENATS_ADDR=nats://127.0.0.1:4242 bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable" --require=etc/anyt/broadcast_tests/*.rb test-conformance-broker-http: tmp/anycable-go-test - ANYCABLE_BROKER=memory ANYCABLE_BROADCAST_ADAPTER=http ANYCABLE_HTTP_BROADCAST_SECRET=any_secret bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable" --require=etc/anyt/**/*.rb + ANYCABLE_BROKER=memory ANYCABLE_BROADCAST_ADAPTER=http bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable" --require=etc/anyt/**/*.rb test-conformance-broker-redis: tmp/anycable-go-test ANYCABLE_BROKER=memory ANYCABLE_BROADCAST_ADAPTER=redisx ANYCABLE_HTTP_BROADCAST_SECRET=any_secret ANYCABLE_PUBSUB=redis bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable" --require=etc/anyt/**/*.rb test-conformance-broker-nats: tmp/anycable-go-test - ANYCABLE_BROKER=nats ANYCABLE_EMBED_NATS=true ANYCABLE_ENATS_ADDR=nats://127.0.0.1:4343 ANYCABLE_PUBSUB=nats ANYCABLE_BROADCAST_ADAPTER=http ANYCABLE_HTTP_BROADCAST_SECRET=any_secret bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable" --require=etc/anyt/**/*.rb + ANYCABLE_BROKER=nats ANYCABLE_EMBED_NATS=true ANYCABLE_ENATS_ADDR=nats://127.0.0.1:4343 ANYCABLE_PUBSUB=nats ANYCABLE_BROADCAST_ADAPTER=http bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable" --require=etc/anyt/**/*.rb test-conformance-embedded: tmp/anycable-embedded-test \ ANYCABLE_BROADCAST_ADAPTER=http ANYCABLE_HTTP_BROADCAST_SECRET=any_secret \ + ANYCABLE_HTTP_BROADCAST_URL=http://localhost:8080/broadcast \ ANYCABLE_HTTP_RPC_SECRET=rpc_secret ANYCABLE_HTTP_RPC_MOUNT_PATH=/_anycable \ ANYCABLE_RPC_HOST=http://localhost:9292/_anycable \ ANYCABLE_HEADERS=cookie,x-api-token \ diff --git a/broadcast/http.go b/broadcast/http.go index a79ba0b1..ecd5287b 100644 --- a/broadcast/http.go +++ b/broadcast/http.go @@ -9,11 +9,13 @@ import ( "strconv" "github.com/anycable/anycable-go/server" + "github.com/anycable/anycable-go/utils" + "github.com/joomcode/errorx" ) const ( - defaultHTTPPort = 8090 - defaultHTTPPath = "/_broadcast" + defaultHTTPPath = "/_broadcast" + broadcastKeyPhrase = "broadcast-cable" ) // HTTPConfig contains HTTP pubsub adapter configuration @@ -24,20 +26,26 @@ type HTTPConfig struct { Path string // Secret token to authorize requests Secret string + // SecretBase is a secret used to generate a token if none provided + SecretBase string } // NewHTTPConfig builds a new config for HTTP pub/sub func NewHTTPConfig() HTTPConfig { return HTTPConfig{ - Port: defaultHTTPPort, Path: defaultHTTPPath, } } +func (c *HTTPConfig) IsSecured() bool { + return c.Secret != "" || c.SecretBase != "" +} + // HTTPBroadcaster represents HTTP broadcaster type HTTPBroadcaster struct { port int path string + conf *HTTPConfig authHeader string server *server.HTTPServer node Handler @@ -48,18 +56,12 @@ var _ Broadcaster = (*HTTPBroadcaster)(nil) // NewHTTPBroadcaster builds a new HTTPSubscriber struct func NewHTTPBroadcaster(node Handler, config *HTTPConfig, l *slog.Logger) *HTTPBroadcaster { - authHeader := "" - - if config.Secret != "" { - authHeader = fmt.Sprintf("Bearer %s", config.Secret) - } - return &HTTPBroadcaster{ - node: node, - log: l.With("context", "broadcast").With("provider", "http"), - port: config.Port, - path: config.Path, - authHeader: authHeader, + node: node, + log: l.With("context", "broadcast").With("provider", "http"), + port: config.Port, + path: config.Path, + conf: config, } } @@ -67,6 +69,32 @@ func (HTTPBroadcaster) IsFanout() bool { return false } +// Prepare configures the broadcaster to make it ready to accept requests +// (i.e., calculates the authentication token, etc.) +func (s *HTTPBroadcaster) Prepare() error { + authHeader := "" + + if s.conf.Secret == "" && s.conf.SecretBase != "" { + secret, err := utils.NewMessageVerifier(s.conf.SecretBase).Sign([]byte(broadcastKeyPhrase)) + + if err != nil { + err = errorx.Decorate(err, "failed to auto-generate authentication key for HTTP broadcaster") + return err + } + + s.log.Info("auto-generated authorization secret from the application secret") + s.conf.Secret = string(secret) + } + + if s.conf.Secret != "" { + authHeader = fmt.Sprintf("Bearer %s", s.conf.Secret) + } + + s.authHeader = authHeader + + return nil +} + // Start creates an HTTP server or attaches a handler to the existing one func (s *HTTPBroadcaster) Start(done chan (error)) error { server, err := server.ForPort(strconv.Itoa(s.port)) @@ -75,10 +103,23 @@ func (s *HTTPBroadcaster) Start(done chan (error)) error { return err } + err = s.Prepare() + if err != nil { + return err + } + s.server = server s.server.SetupHandler(s.path, http.HandlerFunc(s.Handler)) - s.log.Info(fmt.Sprintf("Accept broadcast requests at %s%s", s.server.Address(), s.path)) + var verifiedVia string + + if s.authHeader != "" { + verifiedVia = "authorization required" + } else { + verifiedVia = "no authorization" + } + + s.log.Info(fmt.Sprintf("Accept broadcast requests at %s%s (%s)", s.server.Address(), s.path, verifiedVia)) go func() { if err := s.server.StartAndAnnounce("broadcasting HTTP server"); err != nil { diff --git a/broadcast/http_test.go b/broadcast/http_test.go index c321b05f..e27c70e5 100644 --- a/broadcast/http_test.go +++ b/broadcast/http_test.go @@ -1,6 +1,7 @@ package broadcast import ( + "context" "encoding/json" "log/slog" "net/http" @@ -10,15 +11,28 @@ import ( "github.com/anycable/anycable-go/mocks" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestHttpHandler(t *testing.T) { handler := &mocks.Handler{} - config := HTTPConfig{} - secretConfig := HTTPConfig{Secret: "secret"} + config := NewHTTPConfig() + + secretConfig := NewHTTPConfig() + secretConfig.SecretBase = "qwerty" + broadcastKey := "42923a28b760e667fc92f7c6123bb07a282822b329dd2ef48e7aee7830d98485" + broadcaster := NewHTTPBroadcaster(handler, &config, slog.Default()) protectedBroadcaster := NewHTTPBroadcaster(handler, &secretConfig, slog.Default()) + done := make(chan (error)) + + require.NoError(t, broadcaster.Start(done)) + defer broadcaster.Shutdown(context.Background()) // nolint:errcheck + + require.NoError(t, protectedBroadcaster.Start(done)) + defer protectedBroadcaster.Shutdown(context.Background()) // nolint:errcheck + payload, err := json.Marshal(map[string]string{"stream": "any_test", "data": "123_test"}) if err != nil { t.Fatal(err) @@ -31,9 +45,7 @@ func TestHttpHandler(t *testing.T) { t.Run("Handles broadcasts", func(t *testing.T) { req, err := http.NewRequest("POST", "/", strings.NewReader(string(payload))) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) rr := httptest.NewRecorder() handler := http.HandlerFunc(broadcaster.Handler) @@ -44,9 +56,7 @@ func TestHttpHandler(t *testing.T) { t.Run("Rejects non-POST requests", func(t *testing.T) { req, err := http.NewRequest("GET", "/", strings.NewReader(string(payload))) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) rr := httptest.NewRecorder() handler := http.HandlerFunc(broadcaster.Handler) @@ -57,9 +67,7 @@ func TestHttpHandler(t *testing.T) { t.Run("Rejects when authorization header is missing", func(t *testing.T) { req, err := http.NewRequest("POST", "/", strings.NewReader(string(payload))) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) rr := httptest.NewRecorder() handler := http.HandlerFunc(protectedBroadcaster.Handler) @@ -68,13 +76,11 @@ func TestHttpHandler(t *testing.T) { assert.Equal(t, http.StatusUnauthorized, rr.Code) }) - t.Run("Rejects when authorization header is valid", func(t *testing.T) { + t.Run("Accepts when authorization header is valid", func(t *testing.T) { req, err := http.NewRequest("POST", "/", strings.NewReader(string(payload))) - req.Header.Set("Authorization", "Bearer secret") + req.Header.Set("Authorization", "Bearer "+broadcastKey) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) rr := httptest.NewRecorder() handler := http.HandlerFunc(protectedBroadcaster.Handler) diff --git a/cli/cli.go b/cli/cli.go index 4f5261d1..7600f132 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -22,10 +22,10 @@ import ( "github.com/anycable/anycable-go/mrb" "github.com/anycable/anycable-go/node" "github.com/anycable/anycable-go/pubsub" - "github.com/anycable/anycable-go/rails" "github.com/anycable/anycable-go/router" "github.com/anycable/anycable-go/server" "github.com/anycable/anycable-go/sse" + "github.com/anycable/anycable-go/streams" "github.com/anycable/anycable-go/telemetry" "github.com/anycable/anycable-go/utils" "github.com/anycable/anycable-go/version" @@ -162,6 +162,10 @@ func (r *Runner) Run() error { r.log.Info(fmt.Sprintf("Starting %s %s%s (pid: %d, open file limit: %s, gomaxprocs: %d)", r.name, version.Version(), mrubySupport, os.Getpid(), utils.OpenFileLimit(), numProcs)) + if r.config.IsPublic() { + r.log.Warn("Server is running in the public mode") + } + appNode, err := r.runNode() if err != nil { @@ -377,10 +381,23 @@ func (r *Runner) newController(metrics *metricspkg.Metrics) (node.Controller, er return nil, errorx.Decorate(err, "!!! Failed to initialize controller !!!") } + ids := []identity.Identifier{} + if r.config.JWT.Enabled() { - identifier := identity.NewJWTIdentifier(&r.config.JWT, r.log) + ids = append(ids, identity.NewJWTIdentifier(&r.config.JWT, r.log)) + r.log.Info(fmt.Sprintf("JWT authentication is enabled (param: %s, enforced: %v)", r.config.JWT.Param, r.config.JWT.Force)) + } + + if r.config.SkipAuth { + ids = append(ids, identity.NewPublicIdentifier()) + r.log.Info("connection authentication is disabled") + } + + if len(ids) > 1 { + identifier := identity.NewIdentifierPipeline(ids...) controller = identity.NewIdentifiableController(controller, identifier) - r.log.Info(fmt.Sprintf("JWT identification is enabled (param: %s, enforced: %v)", r.config.JWT.Param, r.config.JWT.Force)) + } else if len(ids) == 1 { + controller = identity.NewIdentifiableController(controller, ids[0]) } if !r.Router().Empty() { @@ -480,13 +497,18 @@ func (r *Runner) Instrumenter() metricspkg.Instrumenter { func (r *Runner) defaultRouter() *router.RouterController { router := router.NewRouterController(nil) - if r.config.Rails.TurboRailsKey != "" || r.config.Rails.TurboRailsClearText { - turboController := rails.NewTurboController(r.config.Rails.TurboRailsKey, r.log) + if r.config.Streams.PubSubChannel != "" { + streamController := streams.NewStreamsController(&r.config.Streams, r.log) + router.Route(r.config.Streams.PubSubChannel, streamController) // nolint:errcheck + } + + if r.config.Streams.Turbo && r.config.Streams.GetTurboSecret() != "" { + turboController := streams.NewTurboController(r.config.Streams.GetTurboSecret(), r.log) router.Route("Turbo::StreamsChannel", turboController) // nolint:errcheck } - if r.config.Rails.CableReadyKey != "" || r.config.Rails.CableReadyClearText { - crController := rails.NewCableReadyController(r.config.Rails.CableReadyKey, r.log) + if r.config.Streams.CableReady && r.config.Streams.GetCableReadySecret() != "" { + crController := streams.NewCableReadyController(r.config.Streams.GetCableReadySecret(), r.log) router.Route("CableReady::Stream", crController) // nolint:errcheck } diff --git a/cli/embed.go b/cli/embed.go index a7a83ca8..9f607f42 100644 --- a/cli/embed.go +++ b/cli/embed.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" + "github.com/anycable/anycable-go/broadcast" "github.com/anycable/anycable-go/node" "github.com/anycable/anycable-go/utils" "github.com/anycable/anycable-go/version" @@ -40,6 +41,19 @@ func (e *Embedded) SSEHandler(ctx context.Context) (http.Handler, error) { return sseHandler, nil } +// HTTPBroadcastHandler returns an HTTP handler to process broadcasting requests +func (e *Embedded) HTTPBroadcastHandler() (http.Handler, error) { + broadcaster := broadcast.NewHTTPBroadcaster(e.n, &e.r.config.HTTPBroadcast, e.r.log) + + err := broadcaster.Prepare() + + if err != nil { + return nil, err + } + + return http.HandlerFunc(broadcaster.Handler), nil +} + // Shutdown stops the AnyCable node gracefully. func (e *Embedded) Shutdown(ctx context.Context) error { for _, shutdownable := range e.r.shutdownables { diff --git a/cli/options.go b/cli/options.go index 153d9ba0..06b1672c 100644 --- a/cli/options.go +++ b/cli/options.go @@ -59,6 +59,12 @@ func NewConfigFromCLI(args []string, opts ...cliOption) (*config.Config, error, var metricsFilter string var enatsRoutes, enatsGateways string var presets string + var turboRailsKey, cableReadyKey string + var turboRailsClearText, cableReadyClearText bool + var jwtIdKey, jwtIdParam string + var jwtIdEnforce bool + var noRPC bool + var isPublic bool // Print raw version without prefix cli.VersionPrinter = func(cCtx *cli.Context) { @@ -66,21 +72,21 @@ func NewConfigFromCLI(args []string, opts ...cliOption) (*config.Config, error, } flags := []cli.Flag{} - flags = append(flags, serverCLIFlags(&c, &path)...) + flags = append(flags, serverCLIFlags(&c, &path, &isPublic)...) flags = append(flags, sslCLIFlags(&c)...) flags = append(flags, broadcastCLIFlags(&c)...) flags = append(flags, brokerCLIFlags(&c)...) flags = append(flags, redisCLIFlags(&c)...) flags = append(flags, httpBroadcastCLIFlags(&c)...) flags = append(flags, natsCLIFlags(&c)...) - flags = append(flags, rpcCLIFlags(&c, &headers, &cookieFilter)...) + flags = append(flags, rpcCLIFlags(&c, &headers, &cookieFilter, &noRPC)...) flags = append(flags, disconnectorCLIFlags(&c)...) flags = append(flags, logCLIFlags(&c)...) flags = append(flags, metricsCLIFlags(&c, &metricsFilter, &mtags)...) flags = append(flags, wsCLIFlags(&c)...) flags = append(flags, pingCLIFlags(&c)...) - flags = append(flags, jwtCLIFlags(&c)...) - flags = append(flags, signedStreamsCLIFlags(&c)...) + flags = append(flags, jwtCLIFlags(&c, &jwtIdKey, &jwtIdParam, &jwtIdEnforce)...) + flags = append(flags, signedStreamsCLIFlags(&c, &turboRailsKey, &cableReadyKey, &turboRailsClearText, &cableReadyClearText)...) flags = append(flags, statsdCLIFlags(&c)...) flags = append(flags, embeddedNatsCLIFlags(&c, &enatsRoutes, &enatsGateways)...) flags = append(flags, sseCLIFlags(&c)...) @@ -188,6 +194,136 @@ Use shutdown_timeout instead.`) c.SSE.AllowedOrigins = c.WS.AllowedOrigins + if turboRailsKey != "" { + fmt.Println(`DEPRECATION WARNING: turbo_rails_key option is deprecated +and will be removed in the next major release of anycable-go. +Use turbo_streams_secret instead.`) + + c.Streams.TurboSecret = turboRailsKey + + c.Streams.Turbo = true + } + + if turboRailsClearText { + fmt.Println(`DEPRECATION WARNING: turbo_rails_cleartext option is deprecated +and will be removed in the next major release of anycable-go. +It has no effect anymore, use public streams instead.`) + } + + if cableReadyKey != "" { + fmt.Println(`DEPRECATION WARNING: cable_ready_key option is deprecated +and will be removed in the next major release of anycable-go. +Use cable_ready_secret instead.`) + + c.Streams.CableReadySecret = cableReadyKey + + c.Streams.CableReady = true + } + + if cableReadyClearText { + fmt.Println(`DEPRECATION WARNING: cable_ready_cleartext option is deprecated +and will be removed in the next major release of anycable-go. +It has no effect anymore, use public streams instead.`) + } + + if jwtIdKey != "" { + fmt.Println(`DEPRECATION WARNING: jwt_id_key option is deprecated +and will be removed in the next major release of anycable-go. +Use jwt_secret instead.`) + + if c.JWT.Secret == "" { + c.JWT.Secret = jwtIdKey + } + } + + if jwtIdParam != "" { + fmt.Println(`DEPRECATION WARNING: jwt_id_param option is deprecated +and will be removed in the next major release of anycable-go. +Use jwt_param instead.`) + + if c.JWT.Param == "" { + c.JWT.Param = jwtIdParam + } + } + + if jwtIdEnforce { + fmt.Println(`DEPRECATION WARNING: jwt_id_enforce option is deprecated +and will be removed in the next major release of anycable-go. +Use enfore_jwt instead.`) + + c.JWT.Force = true + } + + // Configure RPC + if noRPC { + c.RPC.Implementation = "none" + } + + // Legacy HTTP authentication stuff + if c.HTTPBroadcast.Secret != "" { + fmt.Println(`DEPRECATION WARNING: http_broadcast_secret option is deprecated +and will be removed in the next major release of anycable-go. +Use broadcast_key instead.`) + } + + if c.HTTPBroadcast.Secret == "" { + c.HTTPBroadcast.Secret = c.BroadcastKey + } + + // Fallback secrets + if c.Secret != "" { + if c.Streams.Secret == "" { + c.Streams.Secret = c.Secret + } + + if c.JWT.Secret == "" { + c.JWT.Secret = c.Secret + } + + if c.HTTPBroadcast.Secret == "" { + c.HTTPBroadcast.SecretBase = c.Secret + } + + if c.RPC.Secret == "" { + c.RPC.SecretBase = c.Secret + } + } + + // Nullify none secrets + if c.Streams.Secret == "none" { + c.Streams.Secret = "" + } + + if c.JWT.Secret == "none" { + c.JWT.Secret = "" + } + + if c.RPC.Secret == "none" { + c.RPC.Secret = "" + } + + if c.HTTPBroadcast.Secret == "none" { + c.HTTPBroadcast.Secret = "" + } + + // Configure default HTTP port + if c.HTTPBroadcast.Port == 0 { + if c.HTTPBroadcast.IsSecured() { + c.HTTPBroadcast.Port = c.Port + } else { + c.HTTPBroadcast.Port = 8090 + } + } + + // Configure public mode and other insecure features + if isPublic { + c.SkipAuth = true + c.Streams.Public = true + // Ensure broadcasting is also public + c.HTTPBroadcast.Secret = "" + c.HTTPBroadcast.SecretBase = "" + } + return &c, nil, false } @@ -233,7 +369,7 @@ var ( ) // serverCLIFlags returns base server flags -func serverCLIFlags(c *config.Config, path *string) []cli.Flag { +func serverCLIFlags(c *config.Config, path *string, isPublic *bool) []cli.Flag { return withDefaults(serverCategoryDescription, []cli.Flag{ &cli.StringFlag{ Name: "host", @@ -250,6 +386,33 @@ func serverCLIFlags(c *config.Config, path *string) []cli.Flag { Destination: &c.Port, }, + &cli.StringFlag{ + Name: "secret", + Usage: "A common secret key used by all features by default", + Value: c.Secret, + Destination: &c.Secret, + }, + + &cli.StringFlag{ + Name: "broadcast_key", + Usage: "An authentication key for broadcast requests", + Value: c.BroadcastKey, + Destination: &c.BroadcastKey, + }, + + &cli.BoolFlag{ + Name: "public", + Usage: "[DANGER ZONE] Run server in the public mode allowing all connections and stream subscriptions", + Destination: isPublic, + }, + + &cli.BoolFlag{ + Name: "noauth", + Usage: "[DANGER ZONE] Disable client authentication over RPC", + Value: c.SkipAuth, + Destination: &c.SkipAuth, + }, + &cli.IntFlag{ Name: "max-conn", Usage: "Limit simultaneous server connections (0 – without limit)", @@ -330,6 +493,7 @@ func broadcastCLIFlags(c *config.Config) []cli.Flag { Usage: "The size of the goroutines pool to broadcast messages", Value: c.App.HubGopoolSize, Destination: &c.App.HubGopoolSize, + Hidden: true, }, }) } @@ -386,6 +550,7 @@ func redisCLIFlags(c *config.Config) []cli.Flag { Usage: "Interval to rediscover sentinels in seconds", Value: c.Redis.SentinelDiscoveryInterval, Destination: &c.Redis.SentinelDiscoveryInterval, + Hidden: true, }, &cli.IntFlag{ @@ -393,6 +558,7 @@ func redisCLIFlags(c *config.Config) []cli.Flag { Usage: "Interval to periodically ping Redis to make sure it's alive", Value: c.Redis.KeepalivePingInterval, Destination: &c.Redis.KeepalivePingInterval, + Hidden: true, }, &cli.BoolFlag{ @@ -400,6 +566,7 @@ func redisCLIFlags(c *config.Config) []cli.Flag { Usage: "Verify Redis server TLS certificate (only if URL protocol is rediss://)", Value: c.Redis.TLSVerify, Destination: &c.Redis.TLSVerify, + Hidden: true, }, &cli.BoolFlag{ @@ -407,6 +574,7 @@ func redisCLIFlags(c *config.Config) []cli.Flag { Usage: "Disable client-side caching", Value: c.Redis.DisableCache, Destination: &c.Redis.DisableCache, + Hidden: true, }, }) } @@ -430,8 +598,9 @@ func httpBroadcastCLIFlags(c *config.Config) []cli.Flag { &cli.StringFlag{ Name: "http_broadcast_secret", - Usage: "HTTP pub/sub authorization secret", + Usage: "[Deprecated] HTTP pub/sub authorization secret", Destination: &c.HTTPBroadcast.Secret, + Hidden: true, }, }) } @@ -457,6 +626,7 @@ func natsCLIFlags(c *config.Config) []cli.Flag { Name: "nats_dont_randomize_servers", Usage: "Pass this option to disable NATS servers randomization during (re-)connect", Destination: &c.NATS.DontRandomizeServers, + Hidden: true, }, }) } @@ -476,6 +646,7 @@ func embeddedNatsCLIFlags(c *config.Config, routes *string, gateways *string) [] Usage: "NATS server bind address", Value: c.EmbeddedNats.ServiceAddr, Destination: &c.EmbeddedNats.ServiceAddr, + Hidden: true, }, &cli.StringFlag{ @@ -483,6 +654,7 @@ func embeddedNatsCLIFlags(c *config.Config, routes *string, gateways *string) [] Usage: "NATS cluster service bind address", Value: c.EmbeddedNats.ClusterAddr, Destination: &c.EmbeddedNats.ClusterAddr, + Hidden: true, }, &cli.StringFlag{ @@ -503,6 +675,7 @@ func embeddedNatsCLIFlags(c *config.Config, routes *string, gateways *string) [] Usage: "NATS gateway bind address", Value: c.EmbeddedNats.GatewayAddr, Destination: &c.EmbeddedNats.GatewayAddr, + Hidden: true, }, &cli.StringFlag{ @@ -523,6 +696,7 @@ func embeddedNatsCLIFlags(c *config.Config, routes *string, gateways *string) [] Usage: "Embedded NATS store directory (for JetStream)", Value: c.EmbeddedNats.StoreDir, Destination: &c.EmbeddedNats.StoreDir, + Hidden: true, }, &cli.StringFlag{ @@ -536,18 +710,20 @@ func embeddedNatsCLIFlags(c *config.Config, routes *string, gateways *string) [] Name: "enats_debug", Usage: "Enable NATS server logs", Destination: &c.EmbeddedNats.Debug, + Hidden: true, }, &cli.BoolFlag{ Name: "enats_trace", Usage: "Enable NATS server protocol trace logs", Destination: &c.EmbeddedNats.Trace, + Hidden: true, }, }) } // rpcCLIFlags returns CLI flags for RPC -func rpcCLIFlags(c *config.Config, headers, cookieFilter *string) []cli.Flag { +func rpcCLIFlags(c *config.Config, headers, cookieFilter *string, isNone *bool) []cli.Flag { return withDefaults(rpcCategoryDescription, []cli.Flag{ &cli.StringFlag{ Name: "rpc_host", @@ -556,6 +732,12 @@ func rpcCLIFlags(c *config.Config, headers, cookieFilter *string) []cli.Flag { Destination: &c.RPC.Host, }, + &cli.BoolFlag{ + Name: "norpc", + Usage: "Disable RPC component and run server in the standalone mode", + Destination: isNone, + }, + &cli.IntFlag{ Name: "rpc_concurrency", Usage: "Max number of concurrent RPC request; should be slightly less than the RPC server concurrency", @@ -567,6 +749,7 @@ func rpcCLIFlags(c *config.Config, headers, cookieFilter *string) []cli.Flag { Name: "rpc_enable_tls", Usage: "Enable client-side TLS with the RPC server", Destination: &c.RPC.EnableTLS, + Hidden: true, }, &cli.BoolFlag{ @@ -574,12 +757,14 @@ func rpcCLIFlags(c *config.Config, headers, cookieFilter *string) []cli.Flag { Usage: "Whether to verify the RPC server certificate", Destination: &c.RPC.TLSVerify, Value: true, + Hidden: true, }, &cli.StringFlag{ Name: "rpc_tls_root_ca", Usage: "CA root certificate file path or contents in PEM format (if not set, system CAs will be used)", Destination: &c.RPC.TLSRootCA, + Hidden: true, }, &cli.IntFlag{ @@ -587,6 +772,7 @@ func rpcCLIFlags(c *config.Config, headers, cookieFilter *string) []cli.Flag { Usage: "Override default MaxCallRecvMsgSize for RPC client (bytes)", Value: c.RPC.MaxRecvSize, Destination: &c.RPC.MaxRecvSize, + Hidden: true, }, &cli.IntFlag{ @@ -594,6 +780,7 @@ func rpcCLIFlags(c *config.Config, headers, cookieFilter *string) []cli.Flag { Usage: "Override default MaxCallSendMsgSize for RPC client (bytes)", Value: c.RPC.MaxSendSize, Destination: &c.RPC.MaxSendSize, + Hidden: true, }, &cli.StringFlag{ @@ -614,6 +801,7 @@ func rpcCLIFlags(c *config.Config, headers, cookieFilter *string) []cli.Flag { Usage: "RPC implementation (grpc, http)", Value: c.RPC.Implementation, Destination: &c.RPC.Implementation, + Hidden: true, }, &cli.StringFlag{ @@ -628,6 +816,7 @@ func rpcCLIFlags(c *config.Config, headers, cookieFilter *string) []cli.Flag { Usage: "HTTP RPC timeout (in ms)", Value: c.RPC.RequestTimeout, Destination: &c.RPC.RequestTimeout, + Hidden: true, }, }) } @@ -647,6 +836,7 @@ func disconnectorCLIFlags(c *config.Config) []cli.Flag { Usage: "Max number of Disconnect calls per second", Value: c.DisconnectQueue.Rate, Destination: &c.DisconnectQueue.Rate, + Hidden: true, }, &cli.IntFlag{ @@ -721,6 +911,7 @@ func metricsCLIFlags(c *config.Config, filter *string, mtags *string) []cli.Flag Usage: "DEPRECATED. Specify how often flush metrics logs (in seconds)", Value: c.Metrics.LogInterval, Destination: &c.Metrics.LogInterval, + Hidden: true, }, &cli.StringFlag{ @@ -733,6 +924,7 @@ func metricsCLIFlags(c *config.Config, filter *string, mtags *string) []cli.Flag Name: "metrics_log_formatter", Usage: "Specify the path to custom Ruby formatter script (only supported on MacOS and Linux)", Destination: &c.Metrics.LogFormatter, + Hidden: true, }, &cli.StringFlag{ @@ -776,6 +968,7 @@ func wsCLIFlags(c *config.Config) []cli.Flag { Usage: "WebSocket connection read buffer size", Value: c.WS.ReadBufferSize, Destination: &c.WS.ReadBufferSize, + Hidden: true, }, &cli.IntFlag{ @@ -783,6 +976,7 @@ func wsCLIFlags(c *config.Config) []cli.Flag { Usage: "WebSocket connection write buffer size", Value: c.WS.WriteBufferSize, Destination: &c.WS.WriteBufferSize, + Hidden: true, }, &cli.Int64Flag{ @@ -790,12 +984,14 @@ func wsCLIFlags(c *config.Config) []cli.Flag { Usage: "Maximum size of a message in bytes", Value: c.WS.MaxMessageSize, Destination: &c.WS.MaxMessageSize, + Hidden: true, }, &cli.BoolFlag{ Name: "enable_ws_compression", Usage: "Enable experimental WebSocket per message compression", Destination: &c.WS.EnableCompression, + Hidden: true, }, &cli.StringFlag{ @@ -821,6 +1017,7 @@ func pingCLIFlags(c *config.Config) []cli.Flag { Usage: "Precision for timestamps in ping messages (s, ms, ns)", Value: c.App.PingTimestampPrecision, Destination: &c.App.PingTimestampPrecision, + Hidden: true, }, &cli.IntFlag{ @@ -833,16 +1030,30 @@ func pingCLIFlags(c *config.Config) []cli.Flag { } // jwtCLIFlags returns CLI flags for JWT -func jwtCLIFlags(c *config.Config) []cli.Flag { +func jwtCLIFlags(c *config.Config, jwtIdKey *string, jwtIdParam *string, jwtIdEnforce *bool) []cli.Flag { return withDefaults(jwtCategoryDescription, []cli.Flag{ &cli.StringFlag{ Name: "jwt_id_key", + Destination: jwtIdKey, + Usage: "[Depracated]", + Hidden: true, + }, + + &cli.StringFlag{ + Name: "jwt_secret", Usage: "The encryption key used to verify JWT tokens", Destination: &c.JWT.Secret, }, &cli.StringFlag{ Name: "jwt_id_param", + Destination: jwtIdParam, + Usage: "[Deprecated]", + Hidden: true, + }, + + &cli.StringFlag{ + Name: "jwt_param", Usage: "The name of a query string param or an HTTP header carrying a token", Value: c.JWT.Param, Destination: &c.JWT.Param, @@ -850,6 +1061,13 @@ func jwtCLIFlags(c *config.Config) []cli.Flag { &cli.BoolFlag{ Name: "jwt_id_enforce", + Usage: "[Deprecated]", + Destination: jwtIdEnforce, + Hidden: true, + }, + + &cli.BoolFlag{ + Name: "enforce_jwt", Usage: "Whether to enforce token presence for all connections", Destination: &c.JWT.Force, }, @@ -857,30 +1075,70 @@ func jwtCLIFlags(c *config.Config) []cli.Flag { } // signedStreamsCLIFlags returns misc CLI flags -func signedStreamsCLIFlags(c *config.Config) []cli.Flag { +func signedStreamsCLIFlags(c *config.Config, turboRailsKey *string, cableReadyKey *string, turboRailsClearText *bool, cableReadyCleartext *bool) []cli.Flag { return withDefaults(signedStreamsCategoryDescription, []cli.Flag{ + &cli.StringFlag{ + Name: "streams_secret", + Usage: "Secret you use to sign stream names", + Destination: &c.Streams.Secret, + }, + + &cli.BoolFlag{ + Name: "public_streams", + Usage: "Enable public (unsigned) streams", + Destination: &c.Streams.Public, + }, + + &cli.BoolFlag{ + Name: "turbo_streams", + Usage: "Enable Turbo Streams support", + Destination: &c.Streams.Turbo, + }, + + &cli.BoolFlag{ + Name: "cable_ready", + Usage: "Enable Cable Ready support", + Destination: &c.Streams.CableReady, + }, + &cli.StringFlag{ Name: "turbo_rails_key", - Usage: "Enable Turbo Streams fastlane with the specified signing key", - Destination: &c.Rails.TurboRailsKey, + Usage: "[Deprecated]", + Destination: turboRailsKey, + Hidden: true, + }, + + &cli.StringFlag{ + Name: "turbo_streams_secret", + Usage: "A custom secret to verify Turbo Streams", + Destination: &c.Streams.TurboSecret, }, &cli.BoolFlag{ Name: "turbo_rails_cleartext", - Usage: "Enable Turbo Streams fastlane without stream names signing", - Destination: &c.Rails.TurboRailsClearText, + Usage: "[DEPRECATED] Enable Turbo Streams fastlane without stream names signing", + Destination: turboRailsClearText, + Hidden: true, }, &cli.StringFlag{ Name: "cable_ready_key", - Usage: "Enable CableReady fastlane with the specified signing key", - Destination: &c.Rails.CableReadyKey, + Usage: "[Deprecated]", + Destination: cableReadyKey, + Hidden: true, + }, + + &cli.StringFlag{ + Name: "cable_ready_secret", + Usage: "A custom secret to verify CableReady streams", + Destination: &c.Streams.CableReadySecret, }, &cli.BoolFlag{ Name: "cable_ready_cleartext", - Usage: "Enable Cable Ready fastlane without stream names signing", - Destination: &c.Rails.CableReadyClearText, + Usage: "[DEPRECATED] Enable Cable Ready fastlane without stream names signing", + Destination: cableReadyCleartext, + Hidden: true, }, }) } @@ -904,6 +1162,7 @@ func statsdCLIFlags(c *config.Config) []cli.Flag { Usage: "Statsd client maximum UDP packet size", Value: c.Metrics.Statsd.MaxPacketSize, Destination: &c.Metrics.Statsd.MaxPacketSize, + Hidden: true, }, &cli.StringFlag{ Name: "statsd_tags_format", diff --git a/cli/runner_options.go b/cli/runner_options.go index 90c743ef..1e3b0ff0 100644 --- a/cli/runner_options.go +++ b/cli/runner_options.go @@ -40,6 +40,10 @@ func WithController(fn controllerFactory) Option { // WithDefaultRPCController is an Option to set Runner controller to default rpc.Controller func WithDefaultRPCController() Option { return WithController(func(m *metrics.Metrics, c *config.Config, l *slog.Logger) (node.Controller, error) { + if c.RPC.Implementation == "none" { + return node.NewNullController(l), nil + } + return rpc.NewController(m, &c.RPC, l) }) } diff --git a/cmd/embedded-cable/main.go b/cmd/embedded-cable/main.go index 6b8310c8..85170dbb 100644 --- a/cmd/embedded-cable/main.go +++ b/cmd/embedded-cable/main.go @@ -21,7 +21,6 @@ func main() { cli.WithDefaultRPCController(), cli.WithDefaultBroker(), cli.WithDefaultSubscriber(), - cli.WithDefaultBroadcaster(), cli.WithTelemetry(), cli.WithLogger(logger), } @@ -56,8 +55,16 @@ func main() { os.Exit(1) } + broadcastHandler, err := anycable.HTTPBroadcastHandler() + + if err != nil { + fmt.Printf("%+v\n", err) + os.Exit(1) + } + http.Handle("/cable", wsHandler) http.Handle("/sse", seeHandler) + http.Handle("/broadcast", broadcastHandler) go http.ListenAndServe(":8080", nil) // nolint:errcheck,gosec diff --git a/common/common.go b/common/common.go index fe06214b..caf0d15b 100644 --- a/common/common.go +++ b/common/common.go @@ -505,3 +505,8 @@ func ConfirmationMessage(identifier string) string { func RejectionMessage(identifier string) string { return string(utils.ToJSON(Reply{Identifier: identifier, Type: RejectedType})) } + +// DisconnectionMessage returns a disconnect message with the specified reason and reconnect flag +func DisconnectionMessage(reason string, reconnect bool) string { + return string(utils.ToJSON(DisconnectMessage{Type: DisconnectType, Reason: reason, Reconnect: reconnect})) +} diff --git a/config/config.go b/config/config.go index efe024fc..479a9d22 100644 --- a/config/config.go +++ b/config/config.go @@ -8,11 +8,11 @@ import ( "github.com/anycable/anycable-go/metrics" nconfig "github.com/anycable/anycable-go/nats" "github.com/anycable/anycable-go/node" - "github.com/anycable/anycable-go/rails" rconfig "github.com/anycable/anycable-go/redis" "github.com/anycable/anycable-go/rpc" "github.com/anycable/anycable-go/server" "github.com/anycable/anycable-go/sse" + "github.com/anycable/anycable-go/streams" "github.com/anycable/anycable-go/ws" nanoid "github.com/matoous/go-nanoid" @@ -21,6 +21,9 @@ import ( // Config contains main application configuration type Config struct { ID string + Secret string + BroadcastKey string + SkipAuth bool App node.Config RPC rpc.Config BrokerAdapter string @@ -47,10 +50,10 @@ type Config struct { Debug bool Metrics metrics.Config JWT identity.JWTConfig - Rails rails.Config EmbedNats bool EmbeddedNats enats.Config SSE sse.Config + Streams streams.Config UserPresets []string } @@ -64,7 +67,7 @@ func NewConfig() Config { Port: 8080, Path: []string{"/cable"}, HealthPath: "/health", - BroadcastAdapter: "redis", + BroadcastAdapter: "http,redis", Broker: broker.NewConfig(), Headers: []string{"cookie"}, LogLevel: "info", @@ -79,10 +82,14 @@ func NewConfig() Config { NATS: nconfig.NewNATSConfig(), DisconnectQueue: node.NewDisconnectQueueConfig(), JWT: identity.NewJWTConfig(""), - Rails: rails.NewConfig(), EmbeddedNats: enats.NewConfig(), SSE: sse.NewConfig(), + Streams: streams.NewConfig(), } return config } + +func (c Config) IsPublic() bool { + return c.SkipAuth && c.Streams.Public +} diff --git a/config/presets_test.go b/config/presets_test.go index 4a94f767..81a28458 100644 --- a/config/presets_test.go +++ b/config/presets_test.go @@ -71,7 +71,7 @@ func TestFlyPresets_When_RedisConfigured(t *testing.T) { assert.Equal(t, "0.0.0.0", config.Host) assert.Equal(t, false, config.EmbedNats) - assert.Equal(t, "redis", config.BroadcastAdapter) + assert.Equal(t, "http,redis", config.BroadcastAdapter) assert.Equal(t, "redis", config.PubSubAdapter) assert.Equal(t, "nats://0.0.0.0:4222", config.EmbeddedNats.ServiceAddr) assert.Equal(t, "nats://0.0.0.0:5222", config.EmbeddedNats.ClusterAddr) diff --git a/docs/Readme.md b/docs/Readme.md index 62ed790e..dcf6e8b5 100644 --- a/docs/Readme.md +++ b/docs/Readme.md @@ -11,6 +11,6 @@ * [Apollo GraphQL](apollo.md) * [Binary formats](binary_formats.md) * [JWT identification](jwt_identification.md) -* [Signed streams (Hotwire, CableReady)](signed_streams.md) +* [Signed streams](signed_streams.md) * [Embedded NATS](embedded_nats.md) * [Using as a library](library.md) diff --git a/docs/broadcasting.md b/docs/broadcasting.md new file mode 100644 index 00000000..52f1ef20 --- /dev/null +++ b/docs/broadcasting.md @@ -0,0 +1,199 @@ +# Broadcasting + +Publishing messages from your application to connected clients (aka _broadcasting_) is an essential component of any real-time application. + +AnyCable comes with multiple options on how to broadcast messages. We call them _broadcasters_. Currently, we support HTTP, Redis, and NATS-based broadcasters. + +**NOTE:** The default broadcaster is Redis Pub/Sub for backward-compatibility reasons. This is going to change in v2. + +## HTTP + +> Enable via `--broadcast_adapter=http` (or `ANYCABLE_BROADCAST_ADAPTER=http`). + +HTTP broadcaster has zero-dependencies and, thus, allows you to quickly start using AnyCable, and it's good enough to keep using it at scale. + +By default, HTTP broadcaster accepts publications as POST requests to the `/_broadcast` path of your server\*. The request body MUST contain the publication payload (see below to learn about [the format](#publication-format)). + +Here is a basic cURL example: + +```bash +curl -X POST -H "Content-Type: application/json" -d '{"stream":"my_stream","data":"{\"text\":\"Hello, world!\"}"}' http://localhost:8090/_broadcast +``` + +\* If neither the broadcast key nor the application secret is specified, we configure HTTP broadcaster to use a different port by default (`:8090`) for security reasons. You can handle broadcast requests at the main AnyCable port by specifying it explicitly (via the `http_broadcast_port` option). If the broadcast key is specified or explicitly set to "none" or auto-generated from the application secret (see below), we run it on the main port. You will see the notice in the startup logs telling you how the HTTP broadcaster endpoint was configured: + +```sh +2024-03-06 10:35:39.297 INF Accept broadcast requests at http://localhost:8090/_broadcast (no authorization) nodeid=uE3mZ7 context=broadcast provider=http + +# OR +2024-03-06 10:35:39.297 INF Accept broadcast requests at http://localhost:8080/_broadcast (authorization required) nodeid=uE3mZ7 context=broadcast provider=http +``` + +### Securing HTTP endpoint + +We automatically secure the HTTP broadcaster endpoint if the application broadcast key (`--broadcast_key`) is specified or inferred\* from the application secret (`--secret`) and the server is not running in the public mode (`--public`). + +Every request MUST include an "Authorization" header with the `Bearer ` value: + +```sh +# Run AnyCable +$ anycable-go --broadcast_key=my-secret-key + +2024-03-06 10:35:39.296 INF Starting AnyCable 1.5.0-a7aa9b4 (with mruby 1.2.0 (2015-11-17)) (pid: 57260, open file limit: 122880, gomaxprocs: 8) nodeid=uE3mZ7 +... +2024-03-06 10:35:39.297 INF Accept broadcast requests at http://localhost:8080/_broadcast (authorization required) nodeid=uE3mZ7 context=broadcast provider=http + +# Broadcast a message +$ curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer my-secret-key" -d '{"stream":"my_stream","data":"{\"text\":\"Hello, world!\"}"}' http://localhost:8080/_broadcast -w "%{http_code}" + +201 +``` + +\* When the broadcast key is missing but the application secret is present, we automatically generate a broadcast key using the following formula (in Ruby): + +```ruby +broadcast_key = OpenSSL::HMAC.hexdigest("SHA256", "", "broadcast-cable") +``` + +When using official AnyCable server libraries, you don't need to calculate it yourself (they all use the same inference mechanism). But if you want to publish broadcasts using a custom implementation, you can generate a broadcast key for your secret key as follows: + +```sh +echo -n 'broadcast-cable' | openssl dgst -sha256 -hmac '' | awk '{print $2}' +``` + +## Redis Pub/Sub + +> Enable via `--broadcast_adapter=redis` (or `ANYCABLE_BROADCAST_ADAPTER=redis`). + +This broadcaster uses Redis [Pub/Sub](https://redis.io/topics/pubsub) feature under the hood, and, thus, publications are delivered to all subscribed AnyCable servers simultaneously. + +All broadcast messages are published to a single channel (configured via the `--redis_channel`, defaults to `__anycable__`) as follows: + +```sh +$ redis-cli PUBLISH __anycable__ '{"stream":"my_stream","data":"{\"text\":\"Hello, world!\"}"}' + +(integer) 1 +``` + +Note that since all AnyCable server receive each publication, we cannot use [broker](./broker.md) to provide stream history support when using Redis Pub/Sub. + +See [configuration](./configuration.md#redis-configuration) for available Redis options. + +## Redis X + +> Enable via `--broadcast_adapter=redisx` (or `ANYCABLE_BROADCAST_ADAPTER=redisx`). + +**IMPORTANT:** Redis v6.2+ is required. + +Redis X broadcaster uses [Redis Streams][redis-streams] instead of Publish/Subscribe to _consume_ publications from your application. That gives us the following benefits: + +- **Broker compatibility**. This broadcaster uses a [broker](/anycable-go/broker.md) to store messages in a cache and distribute them within a cluster. This is possible due to the usage of Redis Streams consumer groups. + +- **Better delivery guarantees**. Even if there is no AnyCable server available at the broadcast time, the message will be stored in Redis and delivered to an AnyCable server once it is available. In combination with the [broker feature](./broker.md), you can achieve at-least-once delivery guarantees (compared to at-most-once provided by Redis Pub/Sub). + +To broadcast a message, you publish it to a dedicated Redis stream (configured via the `--redis_channel` option, defaults to `__anycable__`) with the publication JSON provided as the `payload` field value: + +```sh +$ redis-cli XADD __anycable__ "*" payload '{"stream":"my_stream","data":"{\"text\":\"Hello, world!\"}"}' + +"1709754437079-0" +``` + +See [configuration](./configuration.md#redis-configuration) for available Redis options. + +## NATS Pub/Sub + +> Enable via `--broadcast_adapter=nats` (or `ANYCABLE_BROADCAST_ADAPTER=nats`). + +NATS broadcaster uses [NATS publish/subscribe](https://docs.nats.io/nats-concepts/core-nats/pubsub) functionality and supports cluster features out-of-the-box. It works to Redis Pub/Sub: distribute publications to all subscribed AnyCable servers. Thus, it's incompatible with [broker](./broker.md) (stream history support), too. + +To broadcast a message, you publish it to a NATS stream (configured via the `--nats_channel` option, defaults to `__anycable__`) as follows: + +```sh +$ nats pub __anycable__ '{"stream":"my_stream","data":"{\"text\":\"Hello, world!\"}"}' + +12:03:39 Published 60 bytes to "__anycable__" +``` + +NATS Pub/Sub is useful when you want to set up an AnyCable cluster using our [embedded NATS](./embedded_nats.md) feature, so you can avoid having additional infrastructure components. + +See [configuration](./configuration.md#nats-configuration) for available NATS options. + +## Publication format + +AnyCable accepts broadcast messages encoded as JSON and having the following properties: + +```js +{ + "stream": "", // string + "data": "", // string, usually a JSON-encoded object, but not necessarily + "meta": "{}" // object, publication metadata, optional +} +``` + +It's also possible to publish multiple messages at once. For that, you just send them as an array of publications: + +```js +[ + { + "stream": "...", + "data": "...", + }, + { + "stream": "...", + "data": "..." + } +] +``` + +The `meta` field MAY contain additional instructions for servers on how to deliver the publication. Currently, the following fields are supported: + +- `exclude_socket`: you can specify a unique client identifier (returned by the server in the `welcome` message as `sid`) to remove this client from the list of recipients. + +All other meta fields are ignored for now. + +Here is a JSON Schema describing this format: + +```json +{ + "$schema": "http://json-schema.org/draft-07/schema", + "definitions": { + "publication": { + "type": "object", + "properties": { + "stream": { + "type": "string", + "description": "Publication stream name" + }, + "data": { + "type": "string", + "description": "Payload, usually a JSON-encoded object, but not necessarily" + }, + "meta": { + "type": "object", + "description": "Publication metadata, optional", + "properties": { + "exclude_socket": { + "type": "string", + "description": "Unique client identifier to remove this client from the list of recipients" + } + }, + "additionalProperties": true + } + }, + "required": ["stream", "data"] + } + }, + "anyOf": [ + { + "$ref": "#/definitions/publication" + }, + { + "type": "array", + "items":{"$ref": "#/definitions/publication"} + } + ] +} +``` + +[redis-streams]: https://redis.io/docs/data-types/streams-tutorial/ diff --git a/docs/broker.md b/docs/broker.md index 473e0e5e..3cbe2a88 100644 --- a/docs/broker.md +++ b/docs/broker.md @@ -52,7 +52,7 @@ You can use [AnyCable JS client](https://github.com/anycable/anycable-client) li ## Broadcasting messages -Broker is responsible for **registering broadcast messages**. Each message MUST be registered once; thus, we MUST use a broadcasting method which publishes messages to a single node in a cluster (see [Broadcast adapters](../ruby/broadcast_adapters.md)). Currently, `http` and `redisx` adapters are supported. +Broker is responsible for **registering broadcast messages**. Each message MUST be registered once; thus, we MUST use a broadcasting method which publishes messages to a single node in a cluster (see [broadcasting](./broadcasting.md)). Currently, `http` and `redisx` adapters are supported. **NOTE:** When legacy adapters are used, enabling a broker has no effect. diff --git a/docs/configuration.md b/docs/configuration.md index bab8a1af..c7bbc53b 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -22,14 +22,6 @@ Here is the list of the most commonly used configuration parameters. Server host and port (default: `"localhost:8080"`). -**--rpc_host** (`ANYCABLE_RPC_HOST`) - -RPC service address (default: `"localhost:50051"`). - -**--rpc_impl** (`ANYCABLE_RPC_IMPL`) - -RPC implementation to use, HTTP or gRPC (default: inferred from the `rpc_host` value, so no need to specify manually in most cases). See below for more details on [HTTP RPC](#http-rpc). - **--path** (`ANYCABLE_PATH`) WebSocket endpoint path (default: `"/cable"`). @@ -42,14 +34,6 @@ You can also use wildcards (at the end of the paths) or path placeholders: anycable-go --path="/cable,/admin/cable/*,/accounts/{tenant}/cable" ``` -**--headers** (`ANYCABLE_HEADERS`) - -Comma-separated list of headers to proxy to RPC (default: `"cookie"`). - -**--proxy-cookies** (`ANYCABLE_PROXY_COOKIES`) - -Comma-separated list of cookies to proxy to RPC (default: all cookies). - **--allowed_origins** (`ANYCABLE_ALLOWED_ORIGINS`) Comma-separated list of hostnames to check the Origin header against during the WebSocket Upgrade. @@ -57,7 +41,7 @@ Supports wildcards, e.g., `--allowed_origins=*.evilmartians.io,www.evilmartians. **--broadcast_adapter** (`ANYCABLE_BROADCAST_ADAPTER`, default: `redis`) -[Broadcasting adapter](../ruby/broadcast_adapters.md) to use. Available options: `redis` (default), `redisx`, `nats`, and `http`. +[Broadcasting adapter](./broadcasting.md) to use. Available options: `redis` (default), `redisx`, `nats`, and `http`. When HTTP adapter is used, AnyCable-Go accepts broadcasting requests on `:8090/_broadcast`. @@ -71,30 +55,78 @@ You can also enable multiple adapters at once by specifying them separated by co Pub/Sub adapter to use to distribute broadcasted messages within the cluster (when non-distributed broadcasting adapter is used). **Required for broker**. +**--streams_secret** (`ANYCABLE_STREAMS_SECRET`) + +A secret key used to verify [signed_streams](./signed_streams.md). If not set, the `--secret` setting is used (see below). + +## RPC settings + +**--rpc_host** (`ANYCABLE_RPC_HOST`) + +RPC service address (default: `"localhost:50051"`). You can also specify the scheme part to indicate which RPC protocol to use, gRPC or HTTP (gRPC is assumed by default). See below for more details on [HTTP RPC](#http-rpc). + +**--norpc** (`ANYCABLE_NORPC=true`) + +This setting disables the RPC component completely. That means, you can only use AnyCable in a standalone mode (with [JWT authentication](./jwt_identification.md) and [signed streams](./signed_streams.md)). + +**--headers** (`ANYCABLE_HEADERS`) + +Comma-separated list of headers to proxy to RPC (default: `"cookie"`). + +**--proxy-cookies** (`ANYCABLE_PROXY_COOKIES`) + +Comma-separated list of cookies to proxy to RPC (default: all cookies). + +## Security/access settings + +**--secret** (`ANYCABLE_SECRET`) + +A common secret key used by the following components (unless a specific key is specified): [JWT authentication](./jwt_identification.md), [signed streams](./signed_streams.md). + +**--broadcast_key** (`ANYCABLE_BROADCAST_KEY`) + +A secret key used to authenticate broadcast requests. See [broadcasting docs](./broadcasting.md). You can use the special "none" value to disable broadcasting authentication. + +**--noauth** (`ANYCABLE_NOAUTH=true`) + +This setting disables client authentication checks (so, anyone is allowed to connect). Use it with caution. **NOTE**: if you use _enforced_ JWT authentication, the `--noauth` option has no effect. + +**--public_streams** (`ANYCABLE_PUBLIC_STREAMS=true`) + +Setting this value allows direct subscribing to streams using unsigned names (see more in the [signed streams docs](./signed_streams.md)). + +**--public** (`ANYCABLE_PUBLIC=true`) + +This is a shortcut to specify both `--noauth`, `--public_streams` and `--broadcast_key=none`, so you can use AnyCable without any protection. **Do not do this in production**. + +## HTTP API + **--http_broadcast_port** (`ANYCABLE_HTTP_BROADCAST_PORT`, default: `8090`) You can specify on which port to receive broadcasting requests (NOTE: it could be the same port as the main HTTP server listens to). -**--http_broadcast_secret** (`ANYCABLE_HTTP_BROADCAST_SECRET`) - -Authorization secret to protect the broadcasting endpoint (see [Ruby docs](../ruby/broadcast_adapters.md#securing-http-endpoint)). +## Redis configuration **--redis_url** (`ANYCABLE_REDIS_URL` or `REDIS_URL`) -Redis URL for pub/sub (default: `"redis://localhost:6379/5"`). +Redis URL to connect to (default: `"redis://localhost:6379/5"`). Used by the corresponding pub/sub, broadcasting, and broker adapters. **--redis_channel** (`ANYCABLE_REDIS_CHANNEL`) Redis channel for broadcasting (default: `"__anycable__"`). When using the `redisx` adapter, it's used as a name of the Redis stream. +## NATS configuration + **--nats_servers** (`ANYCABLE_NATS_SERVERS`) -The list of [NATS][] servers to connect to (default: `"nats://localhost:4222"`). +The list of [NATS][] servers to connect to (default: `"nats://localhost:4222"`). Used by the corresponding pub/sub, broadcasting, and broker adapters. **--nats_channel** (`ANYCABLE_NATS_CHANNEL`) NATS channel for broadcasting (default: `"__anycable__"`). +## Logging settings + **--log_level** (`ANYCABLE_LOG_LEVEL`) Logging level (default: `"info"`). @@ -103,14 +135,6 @@ Logging level (default: `"info"`). Enable debug mode (more verbose logging). -**--shutdown_timeout** (`ANYCABLE_SHUTDOWN_TIMEOUT`) - -The number of seconds to wait for the server to shutdown gracefully, i.e., disconnect all active sessions and perform the corresponding Disconnect RPC calls (see below). Default: 30. - -**--pong_timeout** (`ANYCABLE_PONG_TIMEOUT`) - -For clients using the [extended Action Cable protocol](../misc/action_cable_protocol.md#action-cable-extended-protocol), the number of seconds to wait for the client to respond to the PING message with the PONG command. The default value is zero, meaning that no PONGs are expected. The recommended value to activate this feature is 10 seconds. Requiring pongs helps to detect broken connections faster. - ## Presets AnyCable-Go comes with a few built-in configuration presets for particular deployments environments, such as Heroku or Fly. The presets are detected and activated automatically. As an indication, you can find a line in the logs: @@ -166,17 +190,6 @@ For example, using the following URL, you can set the ping interval to 10 second ws://localhost:8080/cable?pi=10&ptp=ms ``` -## HTTP RPC - -When using HTTP RPC, you can specify the following additional options: - -- `http_rpc_secret`: a secret token used to authenticate RPC requests. -- `http_rpc_timeout`: timeout for RPC requests (default: 3s). - -You MUST use `rpc_host` configuration option to provide the URL for HTTP RPC, e.g.: `https://my.web.app/anycable`. - -Please, refer to the [RPC over](../ruby/http_rpc.md) documentation for more information about this communication mode. - ## TLS To secure your `anycable-go` server provide the paths to SSL certificate and private key: @@ -193,45 +206,6 @@ If RPC server uses certificate issued by private CA, then you can pass either it If RPC uses self-signed certificate, you can disable RPC server certificate verification by setting `--rpc_tls_verify` (`ANYCABLE_RPC_TLS_VERIFY`) to `false`, but this is insecure, use only in test/development. -## Concurrency settings - -AnyCable-Go uses a single Go gRPC client\* to communicate with AnyCable RPC servers (see [the corresponding PR](https://github.com/anycable/anycable-go/pull/88)). We limit the number of concurrent RPC calls to avoid flooding servers (and getting `ResourceExhausted` exceptions in response). - -\* A single _client_ doesn't necessary mean a single connection; a Go gRPC client could maintain multiple HTTP2 connections, for example, when using [DNS-based load balancing](../deployment/load_balancing). - -We limit the number of concurrent RPC calls at the application level (to prevent RPC servers overload). By default, the concurrency limit is equal to **28**, which is intentionally less than the default RPC size (see [Ruby configuration](../ruby/configuration.md#concurrency-settings)): there is a tiny lag between the times when the response is received by the client and the corresponding worker is returned to the pool. Thus, whenever you update the concurrency settings, make sure that the AnyCable-Go value is _slightly less_ than the AnyCable-RPC one. - -You can change this value via `--rpc_concurrency` (`ANYCABLE_RPC_CONCURRENCY`) parameter. - -## Adaptive concurrency - -

- -AnyCable-Go Pro provides the **adaptive concurrency** feature. When it is enabled, AnyCable-Go automatically adjusts its RPC concurrency limit depending on the two factors: the number of `ResourceExhausted` errors (indicating that the current concurrency limit is greater than RPC servers capacity) and the number of pending RPC calls (indicating the current concurrency is too small to process incoming messages). The first factor (exhausted errors) has a priority (so if we have both a huge backlog and a large number of errors we decrease the concurrency limit). - -You can enable the adaptive concurrency by specifying 0 as the `--rpc_concurrency` value: - -```sh -$ anycable-go --rpc_concurrency=0 - -... - -INFO 2023-02-23T15:26:13.649Z context=rpc RPC controller initialized: \ - localhost:50051 (concurrency: auto (initial=25, min=5, max=100), enable_tls: false, proto_versions: v1) -``` - -You should see the `(concurrency: auto (...))` in the logs. You can also specify the upper and lower bounds for concurrency via the following parameters: - -```sh -$ anycable-go \ - --rpc_concurrency=0 \ - --rpc_concurrency_initial=30 \ - --rpc_concurrency_max=50 \ - --rpc_concurrency_min=5 -``` - -You can also monitor the current concurrency value via the `rpc_capacity_num` metrics. - ## Disconnect settings AnyCable-Go notifies an RPC server about disconnected clients asynchronously with a rate limit. We do that to allow other RPC calls to have higher priority (because _live_ clients are usually more important) and to avoid load spikes during mass disconnects (i.e., when a server restarts). diff --git a/docs/getting_started.md b/docs/getting_started.md index d7c8b08d..ca38d492 100644 --- a/docs/getting_started.md +++ b/docs/getting_started.md @@ -1,6 +1,8 @@ -# Getting Started with AnyCable-Go +# Getting Started with AnyCable -AnyCable-Go is a WebSocket server for AnyCable written in Golang. +AnyCable is a language-agnostic real-time server focused on performance and reliability written in Go. + +> The quickest way to get AnyCable is to use our managed (and free) solution: [plus.anycable.io](https://plus.anycable.io) ## Installation @@ -10,19 +12,10 @@ MacOS users could install it with [Homebrew](https://brew.sh/) ```sh brew install anycable-go - -# or use --HEAD option for edge versions -brew install anycable-go --HEAD ``` Arch Linux users can install [anycable-go package from AUR](https://aur.archlinux.org/packages/anycable-go/). -Of course, you can install it from source too: - -```sh -go get -u -f github.com/anycable/anycable-go/cmd/anycable-go -``` - ### Via NPM For JavaScript projects, there is also an option to install AnyCable-Go via NPM: @@ -36,18 +29,99 @@ yarn add --dev @anycable/anycable-go npx anycable-go ``` -**NOTE:** The version of the NPM package is the same as the version of the AnyCable-Go binary (which is downloaded automatically on the first run). +**NOTE:** The version of the NPM package is the same as the version of the AnyCable server binary (which is downloaded automatically on the first run). ## Usage -Run server: +After installation, you can run AnyCable as follows: ```sh $ anycable-go -=> INFO time context=main Starting AnyCable v1.2.1 (pid: 12902, open files limit: 524288, gomaxprocs: 4) +2024-03-06 13:38:07.545 INF Starting AnyCable 1.5.0-4f16b99 (with mruby 1.2.0 (2015-11-17)) (pid: 8289, open file limit: 122880, gomaxprocs: 8) nodeid=hj2mXN +... +2024-03-06 13:38:56.490 INF RPC controller initialized: localhost:50051 (concurrency: 28, impl: grpc, enable_tls: false, proto_versions: v1) nodeid=FlCtwf context=rpc +``` + +By default, AnyCable tries to connect to a gRPC server listening at `localhost:50051` (the default host for the Ruby gem). + +AnyCable is designed as a logic-less proxy for your real-time features relying on a backend server to authenticate connections, authorize subscriptions and process incoming messages. That's why our default configuration assumes having an RPC server to handle all this logic. + +You can read more about AnyCable RPC in the [corresponding documentation](./rpc.md). + +### Standalone mode (pub/sub only) + +For pure pub/sub functionality, you can use AnyCable in a standalone mode, without any RPC servers. For that, you must configure the following features: + +- [JWT authentication](./jwt_identification.md) or disable authentication completely (`--noauth`). **NOTE:** You can still add minimal protection via the `--allowed_origins` option (see [configuration](./configuration.md#primary-settings)). + +- Enable [signed streams](./signed_streams.md) or allow public streams via the `--public_streams` option. + +There is also a shortcut option `--public` to enable both `--noauth` and `--public_streams` options. **Use it with caution**. + +You can also explicitly disable the RPC component by specifying the `--norpc` option. + +Thus, to run AnyCable real-time server in an insecure standalone mode, use the following command: + +```sh +$ anycable-go --public + +2024-03-06 14:00:12.549 INF Starting AnyCable 1.5.0-4f16b99 (with mruby 1.2.0 (2015-11-17)) (pid: 17817, open file limit: 122880, gomaxprocs: 8) nodeid=wAhWDB +2024-03-06 14:00:12.549 WRN Server is running in the public mode nodeid=wAhWDB +... ``` -By default, `anycable-go` tries to connect to an RPC server listening at `localhost:50051` (the default host for the Ruby gem). You can change this setting by providing `--rpc_host` option or `ANYCABLE_RPC_HOST` env variable (read more about [configuration](./configuration.md)). +To secure access to AnyCable server, specify either the `--jwt_secret` or `--streams_secret` option. There is also the `--secret` shortcut: + +```sh +anycable-go --secret=VERY_SECRET_VALUE --norpc +``` + +Read more about pub/sub mode in the [signed streams documentation](./signed_streams.md). + +### Connecting to AnyCable + +AnyCable uses the [Action Cable protocol][protocol] for client-server communication. We recommend using our official [JavaScript client library][anycable-client] for all JavaScript/TypeScript runtimes: + +```js +import { createCable } from '@anycable/web' + +const cable = createCable(CABLE_URL) + +const subscription = cable.subscribeTo('ChatChannel', { roomId: '42' }) + +const _ = await subscription.perform('speak', { msg: 'Hello' }) + +subscription.on('message', msg => { + if (msg.type === 'typing') { + console.log(`User ${msg.name} is typing`) + } else { + console.log(`${msg.name}: ${msg.text}`) + } +}) +``` + +**Note**: The snippet above assumes having a "ChatChannel" defined in your application (which is connected to AnyCable via RPC). + +You can also use: + +- Third-party Action Cable-compatible clients. + +- EventSource (Server-Sent Events) connections ([more info](./sse.md)). + +- Custom WebSocket clients following the [Action Cable protocol][protocol]. + +AnyCable Pro also supports: + +- Apollo GraphQL WebSocket clients ([more info](./apollo.md)) + +- HTTP streaming (long-polling) ([more info](./long_polling.md)) + +- OCPP WebSocket clients ([more info](./ocpp.md)) + +### Broadcasting messages + +Finally, to broadcast messages to connected clients via the name pub/sub streams, you can use one of the provided [broadcast adapters](./broadcasting.md). -All other configuration parameters have the same default values as the corresponding parameters for the AnyCable RPC server, so you don't need to change them usually. +[anycable-client]: https://github.com/anycable/anycable-client +[protocol]: ../misc/action_cable_protocol.md diff --git a/docs/jwt_identification.md b/docs/jwt_identification.md index 8a8d10fe..deb54ce3 100644 --- a/docs/jwt_identification.md +++ b/docs/jwt_identification.md @@ -1,8 +1,8 @@ -# JWT identification +# JWT authentication AnyCable provides support for [JWT][jwt]-based authentication and identification. -You can pass a properly structured token along the connection request to authorize the connection and set up _identifiers_ (in terms of Action Cable). This approach brings the following benefits: +We use the term "identification", because you can also pass a properly structured information as a part of the token to not only authentication the connection but also set up _identifiers_ (in terms of Action Cable). This approach brings the following benefits: - **Performance**. No RPC call is required during the connection initiation, since we already have identification information. Thus, less load on the RPC server, much faster connection time (at least, 2x faster). - **Usability**. Universal way of dealing with credentials (no need to deal with cookies for web and whatever else for mobile apps). @@ -14,11 +14,12 @@ You can pass a properly structured token along the connection request to authori **NOTE**: Currently, we only support the HMAC signing algorithms. -First, you must enable JWT identification support in `anycable-go` by configuring the following params: +By default, the `--secret` configuration parameter is used as a JWT secret key. If you want to use a custom key for JWT, you can specify it via the `--jwt_secret` (`ANYCABLE_JWT_SECRET`) parameter. -- **--jwt_id_key** (`ANYCABLE_JWT_ID_KEY`): the encryption key used to verify tokens. -- (_Optional_) **--jwt_id_param** (`ANYCABLE_JWT_ID_PARAM`): the name of a query string param or an HTTP header, which carries a token. The header name is prefixed with `X-`. Default: `jid` (and the `X-JID` header correspondingly). -- (_Optional_) **--jwt_id_enforce** (`ANYCABLE_JWT_ID_ENFORCE`): whether to require all connection requests to contain a token. Connections without a token would be rejected right away. If not set, the servers fallbacks to the RPC call (as w/o JWT enabled). Default: false. +Other configuration options are: + +- (_Optional_) **--jwt_param** (`ANYCABLE_ID_PARAM`, default: "jid"): the name of a query string param or an HTTP header, which carries a token. The header name is prefixed with `X-`. +- (_Optional_) **--enforce_jwt** (`ANYCABLE_ENFORCE_JWT`, default: false): whether to require all connection requests to contain a token. Connections without a token would be rejected right away. If not set, the servers fallbacks to the RPC call (if RPC is configured) or would be accepted if authentication is disabled (`--noauth`). A client must provide an identification token either via a query param or via an HTTP header (if possible). For example: diff --git a/docs/pubsub.md b/docs/pubsub.md index d7ac6df0..b5a30b32 100644 --- a/docs/pubsub.md +++ b/docs/pubsub.md @@ -13,7 +13,7 @@ Although, we do not plan to sunset legacy, distributed adapters in the nearest f By default, pub/sub is disabled (since the default broadcast adapter is legacy, fan-out Redis). To enable the pub/sub layer, you must provide the name of the provider via the `--pubsub` option. -You also need to enable a compatible broadcasting adapter. See [Broadcast adapters](/ruby/broadcast_adapters.md). +You also need to enable a compatible broadcasting adapter. See [broadcasting](./broadcasting.md). **NOTE**: It's safe to enable `--pubsub` even if you're still using legacy broadcasting adapters (they do not pass messages through the pub/sub layer). diff --git a/docs/reliable_streams.md b/docs/reliable_streams.md index 0d8c956a..39586464 100644 --- a/docs/reliable_streams.md +++ b/docs/reliable_streams.md @@ -70,7 +70,7 @@ INFO 2023-07-04T02:00:24.386Z consumer=s2IbkM context=broadcast id=s2IbkM provid ... ``` -See [Broadcast adapters](/ruby/broadcast_adapters.md) for more information. +See [broadcasting documentation](./broadcasting.md) for more information. Finally, to re-transmit _registered_ messages within a cluster, you MUST also configure a pub/sub adapter (via the `--pubsub` option). The command will look as follows: diff --git a/docs/rpc.md b/docs/rpc.md new file mode 100644 index 00000000..812ff377 --- /dev/null +++ b/docs/rpc.md @@ -0,0 +1,113 @@ +# AnyCable RPC + +AnyCable allows you to control all the real-time communication logic from your backend application. For that, AnyCable uses a _remote procedure call_ (RPC) mechanism to delegate handling of connection lifecycle events and processing of incoming messages (subscriptions, arbitrary actions). + +Using RPC is required if you design your real-time logic using _Channels_ (like in Rails Action Cable). For primitive pub/sub, you can run AnyCable in a [standalone mode](./getting_started.md#standalone-mode-pubsub-only), i.e., without RPC. + +## RPC over gRPC + +AnyCable is built for performance. Hence, it defaults to gRPC as a transport/protocol for RPC communication. + +By default, AnyCable tries to connect to a gRPC server at `localhost:50051`: + +```sh +$ anycable-go + +2024-03-06 14:09:23.532 INF Starting AnyCable 1.5.0-4f16b99 (with mruby 1.2.0 (2015-11-17)) (pid: 21540, open file limit: 122880, gomaxprocs: 8) nodeid=6VV3mO +... +2024-03-06 14:09:23.533 INF RPC controller initialized: localhost:50051 (concurrency: 28, impl: grpc, enable_tls: false, proto_versions: v1) nodeid=6VV3mO context=rpc +``` + +You can change this setting by providing `--rpc_host` option or `ANYCABLE_RPC_HOST` env variable. + +[AnyCable Ruby][anycable-ruby] library comes with AnyCable gRPC server out-of-the-box. + +For other platforms, you can use definitions for the AnyCable gRPC service ([rpc.proto][proto]) to write your custom RPC server. + +## RPC over HTTP + +AnyCable also supports RPC communication over HTTP. It's a good alternative if you don't want to deal with separate gRPC servers or you are using a platform that doesn't support gRPC (e.g., Heroku, Google Cloud Run). + +To connect to an HTTP RPC server, you must specify the `--rpc_host` (or `ANYCABLE_RPC_HOST`) with the explicit `http://` (or `https://`) scheme: + +```sh +$ anycable-go --rpc_host=http://localhost:3000/_anycable + +2024-03-06 14:21:37.231 INF Starting AnyCable 1.5.0-4f16b99 (with mruby 1.2.0 (2015-11-17)) (pid: 26540, open file limit: 122880, gomaxprocs: 8) nodeid=VkaKtV +... +2024-03-06 14:21:37.232 INF RPC controller initialized: http://localhost:3000/_anycable (concurrency: 28, impl: http, enable_tls: false, proto_versions: v1) nodeid=VkaKtV context=rpc +``` + +[AnyCable Ruby][anycable-ruby] library allows you to mount AnyCable HTTP RPC right into your Rack-compatible web server. + +[AnyCable JS][anycable-server-js] provides HTTP handlers for processing HTTP RPC requests. + +For other platforms, check out our Open API specification with examples on how to implement AnyCable HTTP RPC endpoint yourself: [anycable.spotlight.io](https://anycable.stoplight.io). + +### Configuration and security + +If HTTP RPC endpoint is open to public (which is usually the case, since HTTP RPC is often embedded into the main application web server), it MUST be protected from unauthorized access. + +AnyCable can be configured to pass an authentication key along RPC requests in the `Authorization: Bearer ` header. + +You can either configure the RPC server key explicitly via the `--http_rpc_secret` (or `ANYCABLE_HTTP_RPC_SECRET`) parameter or use the application secret (`--secret`) to generate one using the following formula (in Ruby): + +```ruby +rpc_secret_key = OpenSSL::HMAC.hexdigest("SHA256", "", "rpc-cable") +``` + +Alternatively, using `openssl`: + +```sh +echo -n 'rpc-cable' | openssl dgst -sha256 -hmac '' | awk '{print $2}' +``` + +If you use official AnyCable libraries at the RPC server side, you don't need to worry about these details yourself (the shared application secret is used to generate tokens at both sides). Just make sure both sides share the same application or HTTP RPC secret. + +Other available configuration options: + +- `http_rpc_timeout`: timeout for HTTP RPC requests (default: 3s). + +## Concurrency settings + +AnyCable uses a single Go gRPC client\* to communicate with AnyCable RPC servers (see [the corresponding PR](https://github.com/anycable/anycable-go/pull/88)). We limit the number of concurrent RPC calls to avoid flooding servers (and getting `ResourceExhausted` exceptions in response). + +\* A single _client_ doesn't necessary mean a single connection; a Go gRPC client could maintain multiple HTTP2 connections, for example, when using [DNS-based load balancing](../deployment/load_balancing). + +We limit the number of concurrent RPC calls at the application level (to prevent RPC servers overload). By default, the concurrency limit is equal to **28**, which is intentionally less than the default RPC pool size of **30** (for example, in Ruby gRPC server implementation): there is a tiny lag between the times when the response is received by the client and the corresponding worker is returned to the pool. Thus, whenever you update the concurrency settings, make sure that the AnyCable value is _slightly less_ than one we use by default for AnyCable Ruby gRPC server. + +You can change this value via `--rpc_concurrency` (`ANYCABLE_RPC_CONCURRENCY`) parameter. + +### Adaptive concurrency + +

+ +AnyCable Pro provides the **adaptive concurrency** feature. When it is enabled, AnyCable automatically adjusts its RPC concurrency limit depending on the two factors: the number of `ResourceExhausted` errors (indicating that the current concurrency limit is greater than RPC servers capacity) and the number of pending RPC calls (indicating the current concurrency is too small to process incoming messages). The first factor (exhausted errors) has a priority (so if we have both a huge backlog and a large number of errors we decrease the concurrency limit). + +You can enable the adaptive concurrency by specifying 0 as the `--rpc_concurrency` value: + +```sh +$ anycable-go --rpc_concurrency=0 + +... + +2024-03-06 14:21:37.232 INF RPC controller initialized: \ + localhost:50051 (concurrency: auto (initial=25, min=5, max=100), enable_tls: false, proto_versions: v1) \ + nodeid=VkaKtV context=rpc +``` + +You should see the `(concurrency: auto (...))` in the logs. You can also specify the upper and lower bounds for concurrency via the following parameters: + +```sh +$ anycable-go \ + --rpc_concurrency=0 \ + --rpc_concurrency_initial=30 \ + --rpc_concurrency_max=50 \ + --rpc_concurrency_min=5 +``` + +You can also monitor the current concurrency value via the `rpc_capacity_num` metrics. Read more about [AnyCable instrumentation](./instrumentation.md). + +[proto]: ../misc/rpc_proto.md +[anycable-ruby]: https://github.com/anycable/anycable +[anycable-server-js]: https://github.com/anycable/anycable-serverless-js diff --git a/docs/signed_streams.md b/docs/signed_streams.md index b30824a2..41af76a6 100644 --- a/docs/signed_streams.md +++ b/docs/signed_streams.md @@ -1,74 +1,171 @@ -# Speedy Hotwire and CableReady streams +# Signed streams -AnyCable provides an ability to terminate Hotwire ([Turbo Streams](https://turbo.hotwired.dev/handbook/streams)) and [CableReady](https://cableready.stimulusreflex.com) (v5+) subscriptions at the WS server without _touching_ your Ruby/Rails application (i.e., without performing RPC calls). Thus, you can make subscriptions blazingly fast and reduce the load on the RPC server. +AnyCable allows you to subscribe to _streams_ without using _channels_ (in Action Cable terminology). Channels is a great way to encapsulate business-logic for a given real-time feature, but in many cases all we need is a good old explicit pub/sub. That's where the **signed streams** feature comes into play. -In combination with [JWT identification](./jwt_identification.md), this feature makes it possible to avoid running RPC server at all in case you only need Hotwire/CableReady functionality. +> You read more about the Action Cable abstract design, how it compares to direct pub/sub and what are the pros and cons from this [Any Cables Monthly issue](https://anycable.substack.com/p/any-cables-monthly-18). Don't forget to subscribe! -> 🎥 Check out this [AnyCasts episode](https://anycable.io/blog/anycasts-rails-7-hotwire-and-anycable/) to learn how to use AnyCable with Hotwire Rails application in a RPC-less way. +Signed streams work as follows: + +- Given a stream name, say, "chat/2024", you generate its signed version using a **secret key** (see below on the signing algorithm) + +- On the client side, you subscribe to the "$pubsub" channel and provide the signed stream name as a `signed_stream_name` parameter -## Usage with Hotwire / Turbo Streams +- AnyCable process the subscribe command, verifies the stream name and completes the subscription (if verified). -We assume that you use an Action Cable integration provided by the [turbo-rails][] gem. +For verification, you MUST provide the **secret key** via the `--streams_secret` (`ANYCABLE_STREAMS_SECRET`) parameter for AnyCable. -Whenever you use the `#turbo_stream_from` helper, Rails generates a _signed stream name_ to pass along the subscription request. -In order to _verify_ it at the AnyCable Go side, we need to know the encryption key, so it must be the same for the Rails app and for the AnyCable server. +## Full-stack example: Rails -Specify the verifier key in the Rails config: +Let's consider an example of using signed stream in a Rails application. + +Assume that we want to subscribe a user with ID=17 to their personal notifications channel, "notifications/17". + +First, we need to generate a signed stream name: ```ruby -# config/environments/production.rb -config.turbo.signed_stream_verifier_key = "s3cЯeT" +signed_name = AnyCable::Streams.signed("notifications/17") ``` -For AnyCable Go, you must specify the same key as the value for the `turbo_rails_key` configuration option (`ANYCABLE_TURBO_RAILS_KEY` env var) to activate the fast lane: +Or you can use the `#signed_stream_name` helper in your views -```sh -anycable-go --turbo_rails_key=s3cЯeT +```erb +
"> -# or -ANYCABLE_TURBO_RAILS_KEY=s3cЯeT anycable-go +
+``` + +By default, AnyCable uses `Rails.application.secret_key_base` to sign streams. We recommend configuring a custom secret though (so you can easily rotate values at both ends, the Rails app and AnyCable servers). You can specify it via the `streams_secret` configuration parameter (in `anycable.yml`, credentials, or environment). + +Then, on the client side, you can subscribe to this stream as follows: + +```js +// using @rails/actioncable +let subscription = consumer.subscriptions.create( + {channel: "$pubsub", signed_stream_name: stream}, + { + received: (msg) => { + // handle notification msg + } + } +) + +// using @anycable/web +let channel = cable.streamFromSigned(stream); +channel.on("message", (msg) => { + // handle notification +}) ``` -You should the following line in the logs at the server start: +Now you can broadcast messages to this stream as usual: + +```ruby +ActionCable.server.broadcast "notifications/#{user.id}", payload +``` + +## Public (unsigned) streams + +Sometimes you may want to skip all the signing ceremony and use plain stream names instead. With AnyCable, you can do that by enabling the `--public_streams` option (or `ANYCABLE_PUBLIC_STREAMS=true`) for the AnyCable server: ```sh -... -INFO 2021-09-14T12:49:34.274Z context=main Using channels router: Turbo::StreamsChannel -... +$ anycable-go --public_streams + +# or +$ ANYCABLE_PUBLIC_STREAMS=true anycable-go ``` -## Usage with CableReady +With public streams enabled, you can subscribe to them as follows: + +```js +// using @rails/actioncable +let subscription = consumer.subscriptions.create( + {channel: "$pubsub", stream_name: "notifications/17"}, + { + received: (msg) => { + // handle notification msg + } + } +) + +// using @anycable/web +let channel = cable.streamFrom("notifications/17"); +channel.on("message", (msg) => { + // handle notification +}) +``` -**NOTE:** This feature requires upcoming CableReady v5. Currently, [preview versions](https://rubygems.org/gems/cable_ready) are available. +## Signing algorithm -Whenever you use the `#stream_from` helper, CableReady generates a _signed stream identifier_ to pass along the subscription request. +We use the same algorithm as Rails uses in its [MessageVerifier](https://api.rubyonrails.org/v7.1.3/classes/ActiveSupport/MessageVerifier.html): -In order to _verify_ it at the AnyCable Go side, we need to know the encryption key, so it must be the same for the Rails app and for the AnyCable server. +1. Encode the stream name by first converting it into a JSON string and then encoding in Base64 format. +1. Calculate a HMAC digest using the SHA256 hash function from the secret and the encoded stream name. +1. Concatenate the encoded stream name, a double dash (`--`), and the digest. -Specify the verifier key in the CableReady config: +Here is the Ruby version of the algorithm: ```ruby -# config/initializers/cable_ready.rb -CableReady.configure do |config| - config.verifier_key = "s3cЯeT" -end +encoded = ::Base64.strict_encode64(JSON.dump(stream_name)) +digest = OpenSSL::HMAC.hexdigest("SHA256", SECRET_KEY, encoded) +signed_stream_name = "#{encoded}--#{digest}" ``` -For AnyCable Go, you must specify the same key as the value for the `cable_ready_key` configuration option (`ANYCABLE_CABLE_READY_KEY` env var) to activate the fast lane: +The JavaScript (Node.js) version: -```sh -anycable-go --cable_ready_key=s3cЯeT +```js +import { createHmac } from 'crypto'; -# or -ANYCABLE_CABLE_READY_KEY=s3cЯeT anycable-go +const encoded = Buffer.from(JSON.stringify(stream_name)).toString('base64'); +const digest = createHmac('sha256', SECRET_KEY).update(encoded).digest('hex'); +const signedStreamName = `${encoded}--${digest}`; ``` -You should the following line in the logs at the server start: +The Python version looks as follows: -```sh -... -INFO 2021-09-14T12:52:59.371Z context=main Using channels router: CableReady::Stream -... +```python +import base64 +import json +import hmac +import hashlib + +encoded = base64.b64encode(json.dumps(stream_name).encode('utf-8')).decode('utf-8') +digest = hmac.new(SECRET_KEY.encode('utf-8'), encoded.encode('utf-8'), hashlib.sha256).hexdigest() +signed_stream_name = f"{encoded}--{digest}" +``` + +The PHP version is as follows: + +```php +$encoded = base64_encode(json_encode($stream_name)); +$digest = hash_hmac('sha256', $encoded, $SECRET_KEY); +$signed_stream_name = $encoded . '--' . $digest; +``` + +## Hotwire and CableReady support + +AnyCable provides an ability to terminate Hotwire ([Turbo Streams](https://turbo.hotwired.dev/handbook/streams)) and [CableReady](https://cableready.stimulusreflex.com) (v5+) subscriptions at the WebSocker server using the same signed streams functionality under the hood (and, thus, without performing any RPC calls to authorize subscriptions). + +In combination with [JWT authentication](./jwt_identification.md), this feature makes it possible to avoid run AnyCable in a standalone mode for Hotwire/CableReady applications. + +> 🎥 Check out this [AnyCasts episode](https://anycable.io/blog/anycasts-rails-7-hotwire-and-anycable/) to learn how to use AnyCable with Hotwire Rails application in a RPC-less way. + +You must explicitly enable Turbo Streams or CableReady signed streams support at the AnyCable server side by specifying the `--turbo_streams` (`ANYCABLE_TURBO_STREAMS=true`) or `--cable_ready_streams` (`ANYCABLE_CABLE_READY_STREAMS=true`) option respectively. + +You must also provide the `--streams_secret` corresponding to the secret you use for Turbo/CableReady. You can configure them in your Rails application as follows: + +```ruby +# Turbo configuration + +# config/environments/production.rb +config.turbo.signed_stream_verifier_key = "" + +# CableReady configuration + +# config/initializers/cable_ready.rb +CableReady.configure do |config| + config.verifier_key = "" +end ``` -[turbo-rails]: https://github.com/hotwired/turbo-rails +You can also specify custom secrets for Turbo Streams and CableReady via the `--turbo_streams_secret` and `--cable_ready_secret` parameters respectively. diff --git a/features/sse.testfile b/features/sse.testfile index 3a787f41..917c699f 100644 --- a/features/sse.testfile +++ b/features/sse.testfile @@ -1,5 +1,5 @@ launch :anycable, - "./dist/anycable-go --sse --turbo_rails_cleartext --jwt_id_key=qwerty --broadcast_adapter=http --presets=broker" + "./dist/anycable-go --sse --public_streams --secret=qwerty --broadcast_adapter=http --presets=broker" wait_tcp 8080 @@ -12,15 +12,21 @@ require "uri" require "net/http" require "fiber" -identifier = URI.encode_www_form_component({channel: "Turbo::StreamsChannel", signed_stream_name: stream_name}.to_json) +identifier = URI.encode_www_form_component({channel: "$pubsub", stream_name: stream_name}.to_json) url = "http://localhost:8080/events?jid=#{token}&identifier=#{identifier}" Event = Struct.new(:type, :data, :id, :retry) +# echo -n 'broadcast-cable' | openssl dgst -sha256 -hmac 'qwerty' | awk '{print $2}' +BROADCAST_KEY = "42923a28b760e667fc92f7c6123bb07a282822b329dd2ef48e7aee7830d98485" + def broadcast(stream, data) - uri = URI.parse("http://localhost:8090/_broadcast") - header = {"Content-Type": "application/json"} + uri = URI.parse("http://localhost:8080/_broadcast") + header = { + "Content-Type": "application/json", + "Authorization": "Bearer #{BROADCAST_KEY}" + } data = {stream: stream, data: data.to_json} http = Net::HTTP.new(uri.host, uri.port) request = Net::HTTP::Post.new(uri.request_uri, header) diff --git a/features/standalone.testfile b/features/standalone.testfile new file mode 100644 index 00000000..f9c19a61 --- /dev/null +++ b/features/standalone.testfile @@ -0,0 +1,73 @@ +launch :anycable, + "./dist/anycable-go --secret=s3Krit --norpc" + +wait_tcp 8080 + +payload = {ext: {}.to_json, exp: (Time.now.to_i + 60)} + +token = ::JWT.encode(payload, "s3Krit", "HS256") + +verifier = ActiveSupport::MessageVerifier.new("s3Krit", digest: "SHA256", serializer: JSON) +signed_stream_name = verifier.generate("chat/2023") + +# Authenticated client + subscription +scenario = [ + { + client: { + protocol: "action_cable", + name: "turbo", + connection_options: { + query: { + jid: token + } + }, + actions: [ + { + subscribe: { + channel: "$pubsub", + params: { + signed_stream_name: signed_stream_name + } + } + }, + ] + } + } +] + +TEST_COMMAND = <<~CMD + bundle exec wsdirector ws://localhost:8080/cable -i #{scenario.to_json} +CMD + +run :wsdirector, TEST_COMMAND + +result = stdout(:wsdirector) + +unless result.include?("1 clients, 0 failures") + fail "Unexpected scenario result:\n#{result}" +end + +# Unauthenticated client +scenario = [ + { + receive: { + data: { + type: "disconnect", + reason: "unauthorized", + reconnect: false + } + } + } +] + +TEST_COMMAND = <<~CMD + bundle exec wsdirector ws://localhost:8080/cable -i #{scenario.to_json} +CMD + +run :wsdirector, TEST_COMMAND + +result = stdout(:wsdirector) + +unless result.include?("1 clients, 0 failures") + fail "Unexpected scenario result:\n#{result}" +end diff --git a/features/cable_ready.testfile b/features/streams.testfile similarity index 70% rename from features/cable_ready.testfile rename to features/streams.testfile index 4d48ec35..4ceebfbb 100644 --- a/features/cable_ready.testfile +++ b/features/streams.testfile @@ -1,30 +1,21 @@ launch :anycable, - "./dist/anycable-go --cable_ready_cleartext --jwt_id_key=qwerty " \ + "./dist/anycable-go --public " \ "--metrics_rotate_interval=1 --metrics_log --metrics_log_filter=rpc_call_total,rpc_error_total,rpc_retries_total", capture_output: true wait_tcp 8080 -payload = {ext: {}.to_json, exp: (Time.now.to_i + 60)} - -token = ::JWT.encode(payload, "qwerty", "HS256") - scenario = [ { client: { protocol: "action_cable", - name: "turbo", - connection_options: { - query: { - jid: token - } - }, + name: "streamer", actions: [ { subscribe: { - channel: "CableReady::Stream", + channel: "$pubsub", params: { - signed_stream_id: "stream/2023" + stream_name: "stream/2023" } } }, diff --git a/forspell.dict b/forspell.dict index fcb3ce8f..6b882f6b 100644 --- a/forspell.dict +++ b/forspell.dict @@ -64,3 +64,6 @@ resumable ack caniuse reconnection +norpc +noauth +jid diff --git a/identity/identity.go b/identity/identity.go index 21c55cca..44589838 100644 --- a/identity/identity.go +++ b/identity/identity.go @@ -17,6 +17,28 @@ type Identifier interface { Identify(sid string, env *common.SessionEnv) (*common.ConnectResult, error) } +type IdentifierPipeline struct { + identifiers []Identifier +} + +var _ Identifier = (*IdentifierPipeline)(nil) + +func NewIdentifierPipeline(identifiers ...Identifier) *IdentifierPipeline { + return &IdentifierPipeline{identifiers} +} + +func (p *IdentifierPipeline) Identify(sid string, env *common.SessionEnv) (*common.ConnectResult, error) { + for _, i := range p.identifiers { + res, err := i.Identify(sid, env) + + if err != nil || res != nil { + return res, err + } + } + + return nil, nil +} + type IdentifiableController struct { controller node.Controller identifier Identifier diff --git a/identity/identity_test.go b/identity/identity_test.go index 0ac96d1f..37f5defb 100644 --- a/identity/identity_test.go +++ b/identity/identity_test.go @@ -135,3 +135,41 @@ func TestIdentifiableController(t *testing.T) { controller.AssertCalled(t, "Disconnect", "42", env, "name=jack", []string{"chat"}) }) } + +func TestIdentifierPipeline(t *testing.T) { + mocked := mocks.Identifier{} + pub := NewPublicIdentifier() + + pipe := NewIdentifierPipeline(&mocked, pub) + + env := common.NewSessionEnv("ws://demo.anycable.io/cable", &map[string]string{"cookie": "val=1;"}) + + t.Run("first identifier success", func(t *testing.T) { + mocked.On("Identify", "mock-ok", env).Return( + &common.ConnectResult{Identifier: "mock_welcome"}, + nil, + ) + + res, err := pipe.Identify("mock-ok", env) + + assert.NoError(t, err) + assert.Equal(t, "mock_welcome", res.Identifier) + }) + + t.Run("second identifier success", func(t *testing.T) { + mocked.On("Identify", "mock-nok", env).Return(nil, nil) + + res, err := pipe.Identify("mock-nok", env) + + assert.NoError(t, err) + assert.Equal(t, `{"sid":"mock-nok"}`, res.Identifier) + }) + + t.Run("first identifier fail", func(t *testing.T) { + mocked.On("Identify", "mock-err", env).Return(nil, errors.New("failed")) + + _, err := pipe.Identify("mock-err", env) + + assert.Error(t, err) + }) +} diff --git a/identity/public.go b/identity/public.go new file mode 100644 index 00000000..bef1ed31 --- /dev/null +++ b/identity/public.go @@ -0,0 +1,29 @@ +package identity + +import ( + "fmt" + + "github.com/anycable/anycable-go/common" +) + +// PublicIdentifier identifies all clients and use their sid as the only identifier +type PublicIdentifier struct { +} + +var _ Identifier = (*PublicIdentifier)(nil) + +func NewPublicIdentifier() *PublicIdentifier { + return &PublicIdentifier{} +} + +func (pi *PublicIdentifier) Identify(sid string, env *common.SessionEnv) (*common.ConnectResult, error) { + return &common.ConnectResult{ + Identifier: publicIdentifiers(sid), + Transmissions: []string{actionCableWelcomeMessage(sid)}, + Status: common.SUCCESS, + }, nil +} + +func publicIdentifiers(sid string) string { + return fmt.Sprintf(`{"sid":"%s"}`, sid) +} diff --git a/node/controller.go b/node/controller.go index d93136f5..6123862c 100644 --- a/node/controller.go +++ b/node/controller.go @@ -1,14 +1,67 @@ package node -import "github.com/anycable/anycable-go/common" +import ( + "errors" + "log/slog" + + "github.com/anycable/anycable-go/common" +) // Controller is an interface describing business-logic handler (e.g. RPC) type Controller interface { Start() error Shutdown() error Authenticate(sid string, env *common.SessionEnv) (*common.ConnectResult, error) - Subscribe(sid string, env *common.SessionEnv, id string, channel string) (*common.CommandResult, error) - Unsubscribe(sid string, env *common.SessionEnv, id string, channel string) (*common.CommandResult, error) - Perform(sid string, env *common.SessionEnv, id string, channel string, data string) (*common.CommandResult, error) - Disconnect(sid string, env *common.SessionEnv, id string, subscriptions []string) error + Subscribe(sid string, env *common.SessionEnv, ids string, channel string) (*common.CommandResult, error) + Unsubscribe(sid string, env *common.SessionEnv, ids string, channel string) (*common.CommandResult, error) + Perform(sid string, env *common.SessionEnv, ids string, channel string, data string) (*common.CommandResult, error) + Disconnect(sid string, env *common.SessionEnv, ids string, subscriptions []string) error +} + +type NullController struct { + log *slog.Logger +} + +func NewNullController(l *slog.Logger) *NullController { + return &NullController{l.With("context", "rpc", "impl", "null")} +} + +func (c *NullController) Start() (err error) { + c.log.Info("no RPC configured") + + return +} + +func (c *NullController) Shutdown() (err error) { return } + +func (c *NullController) Authenticate(sid string, env *common.SessionEnv) (*common.ConnectResult, error) { + c.log.Debug("reject connection") + + return &common.ConnectResult{ + Status: common.FAILURE, + Transmissions: []string{common.DisconnectionMessage(common.UNAUTHORIZED_REASON, false)}, + DisconnectInterest: -1, + }, nil +} + +func (c *NullController) Subscribe(sid string, env *common.SessionEnv, ids string, channel string) (*common.CommandResult, error) { + c.log.Debug("reject subscription", "channel", channel) + + return &common.CommandResult{ + Status: common.FAILURE, + Transmissions: []string{common.RejectionMessage(channel)}, + DisconnectInterest: -1, + }, nil +} + +func (c *NullController) Perform(sid string, env *common.SessionEnv, ids string, channel string, data string) (*common.CommandResult, error) { + return nil, errors.New("not implemented") +} + +func (c *NullController) Unsubscribe(sid string, env *common.SessionEnv, ids string, channel string) (*common.CommandResult, error) { + return nil, errors.New("not implemented") +} + +func (c *NullController) Disconnect(sid string, env *common.SessionEnv, ids string, subscriptions []string) error { + return nil } diff --git a/rails/cable_ready.go b/rails/cable_ready.go deleted file mode 100644 index ed056be1..00000000 --- a/rails/cable_ready.go +++ /dev/null @@ -1,116 +0,0 @@ -package rails - -import ( - "encoding/json" - "log/slog" - - "github.com/anycable/anycable-go/common" - "github.com/anycable/anycable-go/node" - "github.com/anycable/anycable-go/utils" -) - -type CableReadyController struct { - verifier *utils.MessageVerifier - log *slog.Logger -} - -var _ node.Controller = (*CableReadyController)(nil) - -func NewCableReadyController(key string, l *slog.Logger) *CableReadyController { - var verifier *utils.MessageVerifier - - if key != "" { - verifier = utils.NewMessageVerifier(key) - } - - return &CableReadyController{verifier, l.With("context", "cable_ready")} -} - -func (c *CableReadyController) Start() error { - return nil -} - -func (c *CableReadyController) Shutdown() error { - return nil -} - -func (c *CableReadyController) Authenticate(sid string, env *common.SessionEnv) (*common.ConnectResult, error) { - return nil, nil -} - -func (c *CableReadyController) Subscribe(sid string, env *common.SessionEnv, id string, channel string) (*common.CommandResult, error) { - params := struct { - SignedStreamID string `json:"identifier"` - }{} - - err := json.Unmarshal([]byte(channel), ¶ms) - - if err != nil { - c.log.With("identifier", channel).Warn("invalid identifier", "error", err) - return nil, err - } - - var stream string - - if c.IsCleartext() { - stream = params.SignedStreamID - - c.log.With("identifier", channel).Debug("unsigned", "stream", stream) - } else { - verified, err := c.verifier.Verified(params.SignedStreamID) - - if err != nil { - c.log.With("identifier", channel).Debug("verification failed", "stream", params.SignedStreamID, "error", err) - - return &common.CommandResult{ - Status: common.FAILURE, - Transmissions: []string{common.RejectionMessage(channel)}, - }, - nil - } - - var ok bool - - stream, ok = verified.(string) - - if !ok { - c.log.With("identifier", channel).Debug("verification failed: stream name is not a string", "stream", verified) - - return &common.CommandResult{ - Status: common.FAILURE, - Transmissions: []string{common.RejectionMessage(channel)}, - }, - nil - } - - c.log.With("identifier", channel).Debug("verified", "stream", stream) - } - - return &common.CommandResult{ - Status: common.SUCCESS, - Transmissions: []string{common.ConfirmationMessage(channel)}, - Streams: []string{stream}, - DisconnectInterest: -1, - }, nil -} - -func (c *CableReadyController) Unsubscribe(sid string, env *common.SessionEnv, id string, channel string) (*common.CommandResult, error) { - return &common.CommandResult{ - Status: common.SUCCESS, - Transmissions: []string{}, - Streams: []string{}, - StopAllStreams: true, - }, nil -} - -func (c *CableReadyController) Perform(sid string, env *common.SessionEnv, id string, channel string, data string) (*common.CommandResult, error) { - return nil, nil -} - -func (c *CableReadyController) Disconnect(sid string, env *common.SessionEnv, id string, subscriptions []string) error { - return nil -} - -func (c *CableReadyController) IsCleartext() bool { - return c.verifier == nil -} diff --git a/rails/cable_ready_test.go b/rails/cable_ready_test.go deleted file mode 100644 index 4d9b6a3e..00000000 --- a/rails/cable_ready_test.go +++ /dev/null @@ -1,88 +0,0 @@ -package rails - -import ( - "fmt" - "log/slog" - "testing" - - "github.com/anycable/anycable-go/common" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestCableReadyController(t *testing.T) { - key := "s3Krit" - // CableReady::Config.instance.verifier_key = 's3Krit' - // CableReady.signed_stream_verifier.generate("stream:2021") - stream := "InN0cmVhbToyMDIxIg==--44f6315dd9faefe713ef5685e114413c1afe8759197a0fc39b15cee75769417e" - - env := common.NewSessionEnv("ws://demo.anycable.io/cable", &map[string]string{"cookie": "val=1;"}) - subject := NewCableReadyController(key, slog.Default()) - - t.Run("Subscribe (success)", func(t *testing.T) { - channel := fmt.Sprintf("{\"channel\":\"CableReady::Stream\",\"identifier\":\"%s\"}", stream) - - res, err := subject.Subscribe("42", env, "name=jack", channel) - - require.NoError(t, err) - require.NotNil(t, res) - require.Equal(t, common.SUCCESS, res.Status) - assert.Equal(t, []string{common.ConfirmationMessage(channel)}, res.Transmissions) - assert.Equal(t, []string{"stream:2021"}, res.Streams) - assert.Equal(t, -1, res.DisconnectInterest) - }) - - t.Run("Subscribe (failure)", func(t *testing.T) { - channel := fmt.Sprintf("{\"channel\":\"CableReady::Stream\",\"identifier\":\"%s\"}", "fake_id") - - res, err := subject.Subscribe("42", env, "name=jack", channel) - - require.NoError(t, err) - require.NotNil(t, res) - require.Equal(t, common.FAILURE, res.Status) - assert.Equal(t, []string{common.RejectionMessage(channel)}, res.Transmissions) - }) - - t.Run("Subscribe (failure + not a string)", func(t *testing.T) { - signed := "WyJjaGF0LzIwMjMiLDE2ODUwMjQwMTdd--5b6661024d4c463c4936cd1542bc9a7672dd8039ac407d0b6c901697190e8aeb" - channel := fmt.Sprintf("{\"channel\":\"CableReady::Stream\",\"identifier\":\"%s\"}", signed) - - res, err := subject.Subscribe("42", env, "name=jack", channel) - - require.NoError(t, err) - require.NotNil(t, res) - require.Equal(t, common.FAILURE, res.Status) - assert.Equal(t, []string{common.RejectionMessage(channel)}, res.Transmissions) - }) - - t.Run("Unsubscribe", func(t *testing.T) { - channel := fmt.Sprintf("{\"channel\":\"CableReady::Stream\",\"identifier\":\"%s\"}", stream) - - res, err := subject.Unsubscribe("42", env, "name=jack", channel) - - require.NoError(t, err) - require.NotNil(t, res) - require.Equal(t, common.SUCCESS, res.Status) - assert.Equal(t, []string{}, res.Transmissions) - assert.Equal(t, []string{}, res.Streams) - assert.Equal(t, true, res.StopAllStreams) - }) -} - -func TestCableReadyControllerWithClearText(t *testing.T) { - env := common.NewSessionEnv("ws://demo.anycable.io/cable", &map[string]string{"cookie": "val=1;"}) - subject := NewCableReadyController("", slog.Default()) - - t.Run("Subscribe (success)", func(t *testing.T) { - channel := "{\"channel\":\"CableReady::Stream\",\"identifier\":\"stream:2023\"}" - - res, err := subject.Subscribe("42", env, "name=jack", channel) - - require.NoError(t, err) - require.NotNil(t, res) - require.Equal(t, common.SUCCESS, res.Status) - assert.Equal(t, []string{common.ConfirmationMessage(channel)}, res.Transmissions) - assert.Equal(t, []string{"stream:2023"}, res.Streams) - assert.Equal(t, -1, res.DisconnectInterest) - }) -} diff --git a/rails/config.go b/rails/config.go deleted file mode 100644 index ded5665b..00000000 --- a/rails/config.go +++ /dev/null @@ -1,12 +0,0 @@ -package rails - -type Config struct { - TurboRailsKey string - TurboRailsClearText bool - CableReadyKey string - CableReadyClearText bool -} - -func NewConfig() Config { - return Config{} -} diff --git a/rails/turbo.go b/rails/turbo.go deleted file mode 100644 index d2853a5a..00000000 --- a/rails/turbo.go +++ /dev/null @@ -1,116 +0,0 @@ -package rails - -import ( - "encoding/json" - "log/slog" - - "github.com/anycable/anycable-go/common" - "github.com/anycable/anycable-go/node" - "github.com/anycable/anycable-go/utils" -) - -type TurboController struct { - verifier *utils.MessageVerifier - log *slog.Logger -} - -var _ node.Controller = (*TurboController)(nil) - -func NewTurboController(key string, l *slog.Logger) *TurboController { - var verifier *utils.MessageVerifier - - if key != "" { - verifier = utils.NewMessageVerifier(key) - } - - return &TurboController{verifier, l.With("context", "turbo")} -} - -func (c *TurboController) Start() error { - return nil -} - -func (c *TurboController) Shutdown() error { - return nil -} - -func (c *TurboController) Authenticate(sid string, env *common.SessionEnv) (*common.ConnectResult, error) { - return nil, nil -} - -func (c *TurboController) Subscribe(sid string, env *common.SessionEnv, id string, channel string) (*common.CommandResult, error) { - params := struct { - SignedStreamID string `json:"signed_stream_name"` - }{} - - err := json.Unmarshal([]byte(channel), ¶ms) - - if err != nil { - c.log.With("identifier", channel).Warn("invalid identifier", "error", err) - return nil, err - } - - var stream string - - if c.IsCleartext() { - stream = params.SignedStreamID - - c.log.With("identifier", channel).Debug("unsigned", "stream", stream) - } else { - verified, err := c.verifier.Verified(params.SignedStreamID) - - if err != nil { - c.log.With("identifier", channel).Debug("verification failed", "stream", params.SignedStreamID, "error", err) - - return &common.CommandResult{ - Status: common.FAILURE, - Transmissions: []string{common.RejectionMessage(channel)}, - }, - nil - } - - var ok bool - - stream, ok = verified.(string) - - if !ok { - c.log.With("identifier", channel).Debug("verification failed: stream name is not a string", "stream", verified) - - return &common.CommandResult{ - Status: common.FAILURE, - Transmissions: []string{common.RejectionMessage(channel)}, - }, - nil - } - - c.log.With("identifier", channel).Debug("verified", "stream", stream) - } - - return &common.CommandResult{ - Status: common.SUCCESS, - Transmissions: []string{common.ConfirmationMessage(channel)}, - Streams: []string{stream}, - DisconnectInterest: -1, - }, nil -} - -func (c *TurboController) Unsubscribe(sid string, env *common.SessionEnv, id string, channel string) (*common.CommandResult, error) { - return &common.CommandResult{ - Status: common.SUCCESS, - Transmissions: []string{}, - Streams: []string{}, - StopAllStreams: true, - }, nil -} - -func (c *TurboController) Perform(sid string, env *common.SessionEnv, id string, channel string, data string) (*common.CommandResult, error) { - return nil, nil -} - -func (c *TurboController) Disconnect(sid string, env *common.SessionEnv, id string, subscriptions []string) error { - return nil -} - -func (c *TurboController) IsCleartext() bool { - return c.verifier == nil -} diff --git a/rails/turbo_test.go b/rails/turbo_test.go deleted file mode 100644 index ec0833a5..00000000 --- a/rails/turbo_test.go +++ /dev/null @@ -1,88 +0,0 @@ -package rails - -import ( - "fmt" - "log/slog" - "testing" - - "github.com/anycable/anycable-go/common" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestTurboController(t *testing.T) { - key := "s3Krit" - // Turbo.signed_stream_verifier_key = 's3Krit' - // Turbo::StreamsChannel.signed_stream_name([:chat, "2021"]) - stream := "ImNoYXQ6MjAyMSI=--f9ee45dbccb1da04d8ceb99cc820207804370ba0d06b46fc3b8b373af1315628" - - env := common.NewSessionEnv("ws://demo.anycable.io/cable", &map[string]string{"cookie": "val=1;"}) - subject := NewTurboController(key, slog.Default()) - - t.Run("Subscribe (success)", func(t *testing.T) { - channel := fmt.Sprintf("{\"channel\":\"Turbo::StreamsChannel\",\"signed_stream_name\":\"%s\"}", stream) - - res, err := subject.Subscribe("42", env, "name=jack", channel) - - require.NoError(t, err) - require.NotNil(t, res) - require.Equal(t, common.SUCCESS, res.Status) - assert.Equal(t, []string{common.ConfirmationMessage(channel)}, res.Transmissions) - assert.Equal(t, []string{"chat:2021"}, res.Streams) - assert.Equal(t, -1, res.DisconnectInterest) - }) - - t.Run("Subscribe (failure)", func(t *testing.T) { - channel := fmt.Sprintf("{\"channel\":\"Turbo::StreamsChannel\",\"signed_stream_name\":\"%s\"}", "fake_id") - - res, err := subject.Subscribe("42", env, "name=jack", channel) - - require.NoError(t, err) - require.NotNil(t, res) - require.Equal(t, common.FAILURE, res.Status) - assert.Equal(t, []string{common.RejectionMessage(channel)}, res.Transmissions) - }) - - t.Run("Subscribe (failure + not a string)", func(t *testing.T) { - signed := "WyJjaGF0LzIwMjMiLDE2ODUwMjQwMTdd--5b6661024d4c463c4936cd1542bc9a7672dd8039ac407d0b6c901697190e8aeb" - channel := fmt.Sprintf("{\"channel\":\"Turbo::StreamsChannel\",\"signed_stream_name\":\"%s\"}", signed) - - res, err := subject.Subscribe("42", env, "name=jack", channel) - - require.NoError(t, err) - require.NotNil(t, res) - require.Equal(t, common.FAILURE, res.Status) - assert.Equal(t, []string{common.RejectionMessage(channel)}, res.Transmissions) - }) - - t.Run("Unsubscribe", func(t *testing.T) { - channel := fmt.Sprintf("{\"channel\":\"Turbo::StreamsChannel\",\"signed_stream_name\":\"%s\"}", stream) - - res, err := subject.Unsubscribe("42", env, "name=jack", channel) - - require.NoError(t, err) - require.NotNil(t, res) - require.Equal(t, common.SUCCESS, res.Status) - assert.Equal(t, []string{}, res.Transmissions) - assert.Equal(t, []string{}, res.Streams) - assert.Equal(t, true, res.StopAllStreams) - }) -} - -func TestTurboControllerClearText(t *testing.T) { - env := common.NewSessionEnv("ws://demo.anycable.io/cable", &map[string]string{"cookie": "val=1;"}) - subject := NewTurboController("", slog.Default()) - - t.Run("Subscribe (success)", func(t *testing.T) { - channel := "{\"channel\":\"Turbo::StreamsChannel\",\"signed_stream_name\":\"chat:2023\"}" - - res, err := subject.Subscribe("42", env, "name=jack", channel) - - require.NoError(t, err) - require.NotNil(t, res) - require.Equal(t, common.SUCCESS, res.Status) - assert.Equal(t, []string{common.ConfirmationMessage(channel)}, res.Transmissions) - assert.Equal(t, []string{"chat:2023"}, res.Streams) - assert.Equal(t, -1, res.DisconnectInterest) - }) -} diff --git a/rpc/config.go b/rpc/config.go index c25fda6a..ee33f1b9 100644 --- a/rpc/config.go +++ b/rpc/config.go @@ -46,7 +46,7 @@ type Config struct { MaxRecvSize int // Max send msg size (bytes) MaxSendSize int - // Underlying implementation (grpc, http) + // Underlying implementation (grpc, http, or none) Implementation string // Alternative dialer implementation DialFun Dialer @@ -54,6 +54,8 @@ type Config struct { Secret string // Timeout for HTTP RPC requests (in ms) RequestTimeout int + // SecretBase is a secret used to generate authentication token + SecretBase string } // NewConfig builds a new config diff --git a/rpc/rpc.go b/rpc/rpc.go index 7a9d4d13..a90f1b51 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -13,6 +13,8 @@ import ( "github.com/anycable/anycable-go/common" "github.com/anycable/anycable-go/metrics" "github.com/anycable/anycable-go/protocol" + "github.com/anycable/anycable-go/utils" + "github.com/joomcode/errorx" pb "github.com/anycable/anycable-go/protos" "google.golang.org/grpc" @@ -44,6 +46,8 @@ const ( metricsRPCPending = "rpc_pending_num" metricsRPCCapacity = "rpc_capacity_num" metricsGRPCActiveConns = "grpc_active_conn_num" + + secretKeyPhrase = "rpc-cable" ) type grpcClientHelper struct { @@ -198,6 +202,19 @@ func (c *Controller) Start() error { switch impl { case "http": var err error + + if c.config.Secret == "" && c.config.SecretBase != "" { + secret, verr := utils.NewMessageVerifier(c.config.SecretBase).Sign([]byte(secretKeyPhrase)) + + if verr != nil { + verr = errorx.Decorate(verr, "failed to auto-generate authentication key for HTTP RPC") + return verr + } + + c.log.Info("auto-generated authorization secret from the application secret") + c.config.Secret = string(secret) + } + dialer, err = NewHTTPDialer(c.config) if err != nil { return err diff --git a/server/server.go b/server/server.go index f41b0f8e..1b583dbd 100644 --- a/server/server.go +++ b/server/server.go @@ -42,7 +42,7 @@ var ( // MaxConn is a default configuration for maximum connections MaxConn int // Default logger - Logger *slog.Logger + Logger *slog.Logger = slog.Default() ) // ForPort creates new or returns the existing server for the specified port diff --git a/streams/config.go b/streams/config.go new file mode 100644 index 00000000..b0760b04 --- /dev/null +++ b/streams/config.go @@ -0,0 +1,49 @@ +// This package provides functionality to directly subscribe to streams +// without using channels (a simplified pub/sub mode) +package streams + +type Config struct { + // Secret is a key used to sign and verify streams + Secret string + + // Public determines if public (unsigned) streams are allowed + Public bool + + // PubSubChannel is the channel name used for direct pub/sub + PubSubChannel string + + // Turbo is a flag to enable Turbo Streams support + Turbo bool + + // TurboSecret is a custom secret key used to verify Turbo Streams + TurboSecret string + + // CableReady is a flag to enable CableReady support + CableReady bool + + // CableReadySecret is a custom secret key used to verify CableReady streams + CableReadySecret string +} + +// NewConfig returns a new Config with the given key +func NewConfig() Config { + return Config{ + PubSubChannel: "$pubsub", + } +} + +func (c Config) GetTurboSecret() string { + if c.TurboSecret != "" { + return c.TurboSecret + } + + return c.Secret +} + +func (c Config) GetCableReadySecret() string { + if c.CableReadySecret != "" { + return c.CableReadySecret + } + + return c.Secret +} diff --git a/streams/controller.go b/streams/controller.go new file mode 100644 index 00000000..0c1213eb --- /dev/null +++ b/streams/controller.go @@ -0,0 +1,189 @@ +package streams + +import ( + "encoding/json" + "errors" + "log/slog" + + "github.com/anycable/anycable-go/common" + "github.com/anycable/anycable-go/node" + "github.com/anycable/anycable-go/utils" + "github.com/joomcode/errorx" +) + +type SubscribeRequest struct { + StreamName string `json:"stream_name"` + SignedStreamName string `json:"signed_stream_name"` +} + +func (r *SubscribeRequest) IsPresent() bool { + return r.StreamName != "" || r.SignedStreamName != "" +} + +type StreamResolver = func(string) (*SubscribeRequest, error) + +type Controller struct { + verifier *utils.MessageVerifier + resolver StreamResolver + log *slog.Logger +} + +var _ node.Controller = (*Controller)(nil) + +func NewController(key string, resolver StreamResolver, l *slog.Logger) *Controller { + var verifier *utils.MessageVerifier + + if key != "" { + verifier = utils.NewMessageVerifier(key) + } + + return &Controller{verifier, resolver, l.With("context", "streams")} +} + +func (c *Controller) Start() error { + return nil +} + +func (c *Controller) Shutdown() error { + return nil +} + +func (c *Controller) Authenticate(sid string, env *common.SessionEnv) (*common.ConnectResult, error) { + return nil, nil +} + +func (c *Controller) Subscribe(sid string, env *common.SessionEnv, ids string, identifier string) (*common.CommandResult, error) { + request, err := c.resolver(identifier) + + if err != nil { + return &common.CommandResult{ + Status: common.FAILURE, + Transmissions: []string{common.RejectionMessage(identifier)}, + }, errorx.Decorate(err, "invalid identifier") + } + + if !request.IsPresent() { + err := errors.New("malformed identifier: no stream name or signed stream") + + return &common.CommandResult{ + Status: common.FAILURE, + Transmissions: []string{common.RejectionMessage(identifier)}, + }, err + } + + var stream string + + if request.StreamName != "" { + stream = request.StreamName + + c.log.With("identifier", identifier).Debug("unsigned", "stream", stream) + } else { + verified, err := c.verifier.Verified(request.SignedStreamName) + + if err != nil { + c.log.With("identifier", identifier).Debug("verification failed", "stream", request.SignedStreamName, "error", err) + + return &common.CommandResult{ + Status: common.FAILURE, + Transmissions: []string{common.RejectionMessage(identifier)}, + }, + nil + } + + var ok bool + stream, ok = verified.(string) + + if !ok { + c.log.With("identifier", identifier).Debug("verification failed: stream name is not a string", "stream", verified) + + return &common.CommandResult{ + Status: common.FAILURE, + Transmissions: []string{common.RejectionMessage(identifier)}, + }, + nil + } + + c.log.With("identifier", identifier).Debug("verified", "stream", stream) + } + + return &common.CommandResult{ + Status: common.SUCCESS, + Transmissions: []string{common.ConfirmationMessage(identifier)}, + Streams: []string{stream}, + DisconnectInterest: -1, + }, nil +} + +func (c *Controller) Unsubscribe(sid string, env *common.SessionEnv, ids string, identifier string) (*common.CommandResult, error) { + return &common.CommandResult{ + Status: common.SUCCESS, + Transmissions: []string{}, + Streams: []string{}, + StopAllStreams: true, + }, nil +} + +func (c *Controller) Perform(sid string, env *common.SessionEnv, ids string, identifier string, data string) (*common.CommandResult, error) { + return nil, nil +} + +func (c *Controller) Disconnect(sid string, env *common.SessionEnv, ids string, subscriptions []string) error { + return nil +} + +func NewStreamsController(conf *Config, l *slog.Logger) *Controller { + key := conf.Secret + allowPublic := conf.Public + + resolver := func(identifier string) (*SubscribeRequest, error) { + var request SubscribeRequest + + if err := json.Unmarshal([]byte(identifier), &request); err != nil { + return nil, err + } + + if !allowPublic && request.StreamName != "" { + return nil, errors.New("public streams are not allowed") + } + + return &request, nil + } + + return NewController(key, resolver, l) +} + +type TurboMessage struct { + SignedStreamName string `json:"signed_stream_name"` +} + +func NewTurboController(key string, l *slog.Logger) *Controller { + resolver := func(identifier string) (*SubscribeRequest, error) { + var msg TurboMessage + + if err := json.Unmarshal([]byte(identifier), &msg); err != nil { + return nil, err + } + + return &SubscribeRequest{SignedStreamName: msg.SignedStreamName}, nil + } + + return NewController(key, resolver, l) +} + +type CableReadyMesssage struct { + Identifier string `json:"identifier"` +} + +func NewCableReadyController(key string, l *slog.Logger) *Controller { + resolver := func(identifier string) (*SubscribeRequest, error) { + var msg CableReadyMesssage + + if err := json.Unmarshal([]byte(identifier), &msg); err != nil { + return nil, err + } + + return &SubscribeRequest{SignedStreamName: msg.Identifier}, nil + } + + return NewController(key, resolver, l) +} diff --git a/streams/controller_test.go b/streams/controller_test.go new file mode 100644 index 00000000..67f8c4c3 --- /dev/null +++ b/streams/controller_test.go @@ -0,0 +1,197 @@ +package streams + +import ( + "fmt" + "log/slog" + "testing" + + "github.com/anycable/anycable-go/common" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + key = "s3Krit" + // Turbo.signed_stream_verifier_key = 's3Krit' + // Turbo::StreamsChannel.signed_stream_name([:chat, "2021"]) + stream = "ImNoYXQ6MjAyMSI=--f9ee45dbccb1da04d8ceb99cc820207804370ba0d06b46fc3b8b373af1315628" +) + +func TestNewController(t *testing.T) { + t.Run("No stream name", func(t *testing.T) { + resolver := func(string) (*SubscribeRequest, error) { + return &SubscribeRequest{}, nil + } + + subject := NewController(key, resolver, slog.Default()) + + require.NotNil(t, subject) + + res, err := subject.Subscribe("42", nil, "name=jack", "") + + require.Error(t, err) + require.NotNil(t, res) + + assert.Equal(t, common.FAILURE, res.Status) + assert.Equal(t, []string{common.RejectionMessage("")}, res.Transmissions) + }) +} + +func TestStreamsController(t *testing.T) { + t.Run("Subscribe - public", func(t *testing.T) { + conf := NewConfig() + conf.Public = true + subject := NewStreamsController(&conf, slog.Default()) + + require.NotNil(t, subject) + + res, err := subject.Subscribe("42", nil, "name=jack", `{"channel":"$pubsub","stream_name":"chat:2024"}`) + + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, common.SUCCESS, res.Status) + assert.Equal(t, []string{common.ConfirmationMessage(`{"channel":"$pubsub","stream_name":"chat:2024"}`)}, res.Transmissions) + assert.Equal(t, []string{"chat:2024"}, res.Streams) + assert.Equal(t, -1, res.DisconnectInterest) + }) + + t.Run("Subscribe - no public allowed", func(t *testing.T) { + conf := NewConfig() + subject := NewStreamsController(&conf, slog.Default()) + + require.NotNil(t, subject) + + res, err := subject.Subscribe("42", nil, "name=jack", `{"channel":"$pubsub","stream_name":"chat:2024"}`) + + require.Error(t, err) + require.NotNil(t, res) + assert.Equal(t, []string{common.RejectionMessage(`{"channel":"$pubsub","stream_name":"chat:2024"}`)}, res.Transmissions) + }) + + t.Run("Subscribe - signed", func(t *testing.T) { + conf := NewConfig() + conf.Secret = key + subject := NewStreamsController(&conf, slog.Default()) + + require.NotNil(t, subject) + + identifier := `{"channel":"$pubsub","signed_stream_name":"` + stream + `"}` + + res, err := subject.Subscribe("42", nil, "name=jack", identifier) + + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, common.SUCCESS, res.Status) + assert.Equal(t, []string{common.ConfirmationMessage(identifier)}, res.Transmissions) + assert.Equal(t, []string{"chat:2021"}, res.Streams) + assert.Equal(t, -1, res.DisconnectInterest) + }) +} + +func TestTurboController(t *testing.T) { + env := common.NewSessionEnv("ws://demo.anycable.io/cable", &map[string]string{"cookie": "val=1;"}) + subject := NewTurboController(key, slog.Default()) + + t.Run("Subscribe (success)", func(t *testing.T) { + channel := fmt.Sprintf("{\"channel\":\"Turbo::StreamsChannel\",\"signed_stream_name\":\"%s\"}", stream) + + res, err := subject.Subscribe("42", env, "name=jack", channel) + + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, common.SUCCESS, res.Status) + assert.Equal(t, []string{common.ConfirmationMessage(channel)}, res.Transmissions) + assert.Equal(t, []string{"chat:2021"}, res.Streams) + assert.Equal(t, -1, res.DisconnectInterest) + }) + + t.Run("Subscribe (failure)", func(t *testing.T) { + channel := fmt.Sprintf("{\"channel\":\"Turbo::StreamsChannel\",\"signed_stream_name\":\"%s\"}", "fake_id") + + res, err := subject.Subscribe("42", env, "name=jack", channel) + + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, common.FAILURE, res.Status) + assert.Equal(t, []string{common.RejectionMessage(channel)}, res.Transmissions) + }) + + t.Run("Subscribe (failure + not a string)", func(t *testing.T) { + signed := "WyJjaGF0LzIwMjMiLDE2ODUwMjQwMTdd--5b6661024d4c463c4936cd1542bc9a7672dd8039ac407d0b6c901697190e8aeb" + channel := fmt.Sprintf("{\"channel\":\"Turbo::StreamsChannel\",\"signed_stream_name\":\"%s\"}", signed) + + res, err := subject.Subscribe("42", env, "name=jack", channel) + + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, common.FAILURE, res.Status) + assert.Equal(t, []string{common.RejectionMessage(channel)}, res.Transmissions) + }) + + t.Run("Unsubscribe", func(t *testing.T) { + channel := fmt.Sprintf("{\"channel\":\"Turbo::StreamsChannel\",\"signed_stream_name\":\"%s\"}", stream) + + res, err := subject.Unsubscribe("42", env, "name=jack", channel) + + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, common.SUCCESS, res.Status) + assert.Equal(t, []string{}, res.Transmissions) + assert.Equal(t, []string{}, res.Streams) + assert.Equal(t, true, res.StopAllStreams) + }) +} + +func TestCableReadyController(t *testing.T) { + env := common.NewSessionEnv("ws://demo.anycable.io/cable", &map[string]string{"cookie": "val=1;"}) + subject := NewCableReadyController(key, slog.Default()) + + t.Run("Subscribe (success)", func(t *testing.T) { + channel := fmt.Sprintf("{\"channel\":\"CableReady::Stream\",\"identifier\":\"%s\"}", stream) + + res, err := subject.Subscribe("42", env, "name=jack", channel) + + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, common.SUCCESS, res.Status) + assert.Equal(t, []string{common.ConfirmationMessage(channel)}, res.Transmissions) + assert.Equal(t, []string{"chat:2021"}, res.Streams) + assert.Equal(t, -1, res.DisconnectInterest) + }) + + t.Run("Subscribe (failure)", func(t *testing.T) { + channel := fmt.Sprintf("{\"channel\":\"CableReady::Stream\",\"identifier\":\"%s\"}", "fake_id") + + res, err := subject.Subscribe("42", env, "name=jack", channel) + + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, common.FAILURE, res.Status) + assert.Equal(t, []string{common.RejectionMessage(channel)}, res.Transmissions) + }) + + t.Run("Subscribe (failure + not a string)", func(t *testing.T) { + signed := "WyJjaGF0LzIwMjMiLDE2ODUwMjQwMTdd--5b6661024d4c463c4936cd1542bc9a7672dd8039ac407d0b6c901697190e8aeb" + channel := fmt.Sprintf("{\"channel\":\"CableReady::Stream\",\"identifier\":\"%s\"}", signed) + + res, err := subject.Subscribe("42", env, "name=jack", channel) + + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, common.FAILURE, res.Status) + assert.Equal(t, []string{common.RejectionMessage(channel)}, res.Transmissions) + }) + + t.Run("Unsubscribe", func(t *testing.T) { + channel := fmt.Sprintf("{\"channel\":\"CableReady::Stream\",\"identifier\":\"%s\"}", stream) + + res, err := subject.Unsubscribe("42", env, "name=jack", channel) + + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, common.SUCCESS, res.Status) + assert.Equal(t, []string{}, res.Transmissions) + assert.Equal(t, []string{}, res.Streams) + assert.Equal(t, true, res.StopAllStreams) + }) +} diff --git a/telemetry/telemetry.go b/telemetry/telemetry.go index 7c2a3a40..b6919203 100644 --- a/telemetry/telemetry.go +++ b/telemetry/telemetry.go @@ -178,11 +178,12 @@ func (t *Tracker) appProperties() map[string]interface{} { props.Set("deploy", guessPlatform()) // Features + props.Set("has-secret", t.config.Secret != "") + props.Set("no-auth", t.config.SkipAuth) props.Set("jwt", t.config.JWT.Enabled()) - props.Set("turbo", t.config.Rails.TurboRailsKey != "") - props.Set("turbo-ct", t.config.Rails.TurboRailsClearText) - props.Set("cr", t.config.Rails.CableReadyKey != "") - props.Set("cr-ct", t.config.Rails.CableReadyClearText) + props.Set("public-streams", t.config.Streams.Public) + props.Set("turbo", t.config.Streams.Turbo) + props.Set("cr", t.config.Streams.CableReady) props.Set("enats", t.config.EmbedNats) props.Set("broadcast", t.config.BroadcastAdapter) props.Set("pubsub", t.config.PubSubAdapter) diff --git a/utils/message_verifier.go b/utils/message_verifier.go index cb9c2366..d62e2125 100644 --- a/utils/message_verifier.go +++ b/utils/message_verifier.go @@ -9,6 +9,8 @@ import ( "errors" "fmt" "strings" + + "github.com/joomcode/errorx" ) type MessageVerifier struct { @@ -27,9 +29,13 @@ func (m *MessageVerifier) Generate(payload interface{}) (string, error) { } encoded := base64.StdEncoding.EncodeToString(payloadJson) - digest := hmac.New(sha256.New, m.key) - digest.Write([]byte(encoded)) - signature := []byte(fmt.Sprintf("%x", digest.Sum(nil))) + + signature, err := m.Sign([]byte(encoded)) + + if err != nil { + return "", err + } + signed := encoded + "--" + string(signature) return signed, nil } @@ -72,8 +78,23 @@ func (m *MessageVerifier) isValid(msg string) bool { data := []byte(parts[0]) digest := []byte(parts[1]) + return m.VerifySignature(data, digest) +} + +func (m *MessageVerifier) Sign(payload []byte) ([]byte, error) { + digest := hmac.New(sha256.New, m.key) + _, err := digest.Write(payload) + + if err != nil { + return nil, errorx.Decorate(err, "failed to sign payload") + } + + return []byte(fmt.Sprintf("%x", digest.Sum(nil))), nil +} + +func (m *MessageVerifier) VerifySignature(payload []byte, digest []byte) bool { h := hmac.New(sha256.New, m.key) - h.Write(data) + h.Write(payload) actual := []byte(fmt.Sprintf("%x", h.Sum(nil)))