Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New signed streams #202

Merged
merged 9 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,4 @@ Gemfile.lock
/k6

/coverage.out
admin/ui/build
23 changes: 23 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,29 @@

## master

- HTTP broadcaster is enabled by default. ([@palkan][])

HTTP broadcaster is now enabled by default in addition to Redis (backward-compatibility). It will be the primary default broadcaster in v2.

- Added new top-level `--secret` configuration option. ([@palkan][])

This is a one secret to rule them all! Unless specific secrets are specified, all features relying on secret keys use the global (_application_) secret either directly (signed streams, JWT) or as a secret base to generate auth tokens (HTTP broadcasting and HTTP RPC).

- Added public mode (`--public`). ([@palkan][])

In this mode, AnyCable doesn't enforce connection authentication, enables public streams, and disables HTTP broadcast endpoint authentication. (So, it's a combination of `--noauth` and `--public_streams`).

- Added direct stream subscribing via a dedicated channel. ([@palkan][])

Added a `$pubsub` reserved channel to allow subscribing to streams (signed or unsigned) without relying on channels (similar to Turbo Streams).

Enable public (unsigned) streams support via the `--public_streams` (or `ANYCABLE_PUBLIC_STREAMS=1`) option or provide a secret key to verify signed streams via the `--streams_secret=<val>` (or `ANYCABLE_STREAMS_SECRET=<val>`) option.

- Config options refactoring (renaming):

- Deprecated (but still supported): `--turbo_rails_key` (now `--turbo_streams_secret`), `--cable_ready_key` (now `--cable_ready_secret`), `--jwt_id_key` (now `--jwt_secret`), `--jwt_id_param` (now `--jwt_param`), `--jwt_id_enforce` (now `--enforce_jwt`).
- Deprecated (no longer supported): `--turbo_rails_cleartext`, `--cable_ready_cleartext` (use `--public_streams` instead).

- Allowing embedding AnyCable into existing web applications. ([@palkan][])

You can now set up an AnyCable instance without an HTTP server and mount AnyCable WebSocket/SSE handlers wherever you like.
Expand Down
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ test-conformance-http: tmp/anycable-go-test
\
ANYCABLE_BROADCAST_ADAPTER=http ANYCABLE_HTTP_BROADCAST_SECRET=any_secret \
ANYCABLE_HTTP_RPC_SECRET=rpc_secret ANYCABLE_HTTP_RPC_MOUNT_PATH=/_anycable \
ANYCABLE_HTTP_BROADCAST_URL=http://localhost:8080/_broadcast \
bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token --rpc_host=http://localhost:9292/_anycable" --target-url="ws://localhost:8080/cable" --require=etc/anyt/broadcast_tests/*.rb

test-conformance-nats: tmp/anycable-go-test
Expand All @@ -162,17 +163,18 @@ test-conformance-nats-embedded: tmp/anycable-go-test
ANYCABLE_BROADCAST_ADAPTER=nats ANYCABLE_NATS_SERVERS=nats://127.0.0.1:4242 ANYCABLE_EMBED_NATS=true ANYCABLE_ENATS_ADDR=nats://127.0.0.1:4242 bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable" --require=etc/anyt/broadcast_tests/*.rb

test-conformance-broker-http: tmp/anycable-go-test
ANYCABLE_BROKER=memory ANYCABLE_BROADCAST_ADAPTER=http ANYCABLE_HTTP_BROADCAST_SECRET=any_secret bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable" --require=etc/anyt/**/*.rb
ANYCABLE_BROKER=memory ANYCABLE_BROADCAST_ADAPTER=http bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable" --require=etc/anyt/**/*.rb

test-conformance-broker-redis: tmp/anycable-go-test
ANYCABLE_BROKER=memory ANYCABLE_BROADCAST_ADAPTER=redisx ANYCABLE_HTTP_BROADCAST_SECRET=any_secret ANYCABLE_PUBSUB=redis bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable" --require=etc/anyt/**/*.rb

test-conformance-broker-nats: tmp/anycable-go-test
ANYCABLE_BROKER=nats ANYCABLE_EMBED_NATS=true ANYCABLE_ENATS_ADDR=nats://127.0.0.1:4343 ANYCABLE_PUBSUB=nats ANYCABLE_BROADCAST_ADAPTER=http ANYCABLE_HTTP_BROADCAST_SECRET=any_secret bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable" --require=etc/anyt/**/*.rb
ANYCABLE_BROKER=nats ANYCABLE_EMBED_NATS=true ANYCABLE_ENATS_ADDR=nats://127.0.0.1:4343 ANYCABLE_PUBSUB=nats ANYCABLE_BROADCAST_ADAPTER=http bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable" --require=etc/anyt/**/*.rb

test-conformance-embedded: tmp/anycable-embedded-test
\
ANYCABLE_BROADCAST_ADAPTER=http ANYCABLE_HTTP_BROADCAST_SECRET=any_secret \
ANYCABLE_HTTP_BROADCAST_URL=http://localhost:8080/broadcast \
ANYCABLE_HTTP_RPC_SECRET=rpc_secret ANYCABLE_HTTP_RPC_MOUNT_PATH=/_anycable \
ANYCABLE_RPC_HOST=http://localhost:9292/_anycable \
ANYCABLE_HEADERS=cookie,x-api-token \
Expand Down
71 changes: 56 additions & 15 deletions broadcast/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import (
"strconv"

"github.com/anycable/anycable-go/server"
"github.com/anycable/anycable-go/utils"
"github.com/joomcode/errorx"
)

const (
defaultHTTPPort = 8090
defaultHTTPPath = "/_broadcast"
defaultHTTPPath = "/_broadcast"
broadcastKeyPhrase = "broadcast-cable"
)

// HTTPConfig contains HTTP pubsub adapter configuration
Expand All @@ -24,20 +26,26 @@ type HTTPConfig struct {
Path string
// Secret token to authorize requests
Secret string
// SecretBase is a secret used to generate a token if none provided
SecretBase string
}

// NewHTTPConfig builds a new config for HTTP pub/sub
func NewHTTPConfig() HTTPConfig {
return HTTPConfig{
Port: defaultHTTPPort,
Path: defaultHTTPPath,
}
}

func (c *HTTPConfig) IsSecured() bool {
return c.Secret != "" || c.SecretBase != ""
}

// HTTPBroadcaster represents HTTP broadcaster
type HTTPBroadcaster struct {
port int
path string
conf *HTTPConfig
authHeader string
server *server.HTTPServer
node Handler
Expand All @@ -48,25 +56,45 @@ var _ Broadcaster = (*HTTPBroadcaster)(nil)

// NewHTTPBroadcaster builds a new HTTPSubscriber struct
func NewHTTPBroadcaster(node Handler, config *HTTPConfig, l *slog.Logger) *HTTPBroadcaster {
authHeader := ""

if config.Secret != "" {
authHeader = fmt.Sprintf("Bearer %s", config.Secret)
}

return &HTTPBroadcaster{
node: node,
log: l.With("context", "broadcast").With("provider", "http"),
port: config.Port,
path: config.Path,
authHeader: authHeader,
node: node,
log: l.With("context", "broadcast").With("provider", "http"),
port: config.Port,
path: config.Path,
conf: config,
}
}

func (HTTPBroadcaster) IsFanout() bool {
return false
}

// Prepare configures the broadcaster to make it ready to accept requests
// (i.e., calculates the authentication token, etc.)
func (s *HTTPBroadcaster) Prepare() error {
authHeader := ""

if s.conf.Secret == "" && s.conf.SecretBase != "" {
secret, err := utils.NewMessageVerifier(s.conf.SecretBase).Sign([]byte(broadcastKeyPhrase))

if err != nil {
err = errorx.Decorate(err, "failed to auto-generate authentication key for HTTP broadcaster")
return err
}

s.log.Info("auto-generated authorization secret from the application secret")
s.conf.Secret = string(secret)
}

if s.conf.Secret != "" {
authHeader = fmt.Sprintf("Bearer %s", s.conf.Secret)
}

s.authHeader = authHeader

return nil
}

// Start creates an HTTP server or attaches a handler to the existing one
func (s *HTTPBroadcaster) Start(done chan (error)) error {
server, err := server.ForPort(strconv.Itoa(s.port))
Expand All @@ -75,10 +103,23 @@ func (s *HTTPBroadcaster) Start(done chan (error)) error {
return err
}

err = s.Prepare()
if err != nil {
return err
}

s.server = server
s.server.SetupHandler(s.path, http.HandlerFunc(s.Handler))

s.log.Info(fmt.Sprintf("Accept broadcast requests at %s%s", s.server.Address(), s.path))
var verifiedVia string

if s.authHeader != "" {
verifiedVia = "authorization required"
} else {
verifiedVia = "no authorization"
}

s.log.Info(fmt.Sprintf("Accept broadcast requests at %s%s (%s)", s.server.Address(), s.path, verifiedVia))

go func() {
if err := s.server.StartAndAnnounce("broadcasting HTTP server"); err != nil {
Expand Down
38 changes: 22 additions & 16 deletions broadcast/http_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package broadcast

import (
"context"
"encoding/json"
"log/slog"
"net/http"
Expand All @@ -10,15 +11,28 @@ import (

"github.com/anycable/anycable-go/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestHttpHandler(t *testing.T) {
handler := &mocks.Handler{}
config := HTTPConfig{}
secretConfig := HTTPConfig{Secret: "secret"}
config := NewHTTPConfig()

secretConfig := NewHTTPConfig()
secretConfig.SecretBase = "qwerty"
broadcastKey := "42923a28b760e667fc92f7c6123bb07a282822b329dd2ef48e7aee7830d98485"

broadcaster := NewHTTPBroadcaster(handler, &config, slog.Default())
protectedBroadcaster := NewHTTPBroadcaster(handler, &secretConfig, slog.Default())

done := make(chan (error))

require.NoError(t, broadcaster.Start(done))
defer broadcaster.Shutdown(context.Background()) // nolint:errcheck

require.NoError(t, protectedBroadcaster.Start(done))
defer protectedBroadcaster.Shutdown(context.Background()) // nolint:errcheck

payload, err := json.Marshal(map[string]string{"stream": "any_test", "data": "123_test"})
if err != nil {
t.Fatal(err)
Expand All @@ -31,9 +45,7 @@ func TestHttpHandler(t *testing.T) {

t.Run("Handles broadcasts", func(t *testing.T) {
req, err := http.NewRequest("POST", "/", strings.NewReader(string(payload)))
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

rr := httptest.NewRecorder()
handler := http.HandlerFunc(broadcaster.Handler)
Expand All @@ -44,9 +56,7 @@ func TestHttpHandler(t *testing.T) {

t.Run("Rejects non-POST requests", func(t *testing.T) {
req, err := http.NewRequest("GET", "/", strings.NewReader(string(payload)))
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

rr := httptest.NewRecorder()
handler := http.HandlerFunc(broadcaster.Handler)
Expand All @@ -57,9 +67,7 @@ func TestHttpHandler(t *testing.T) {

t.Run("Rejects when authorization header is missing", func(t *testing.T) {
req, err := http.NewRequest("POST", "/", strings.NewReader(string(payload)))
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

rr := httptest.NewRecorder()
handler := http.HandlerFunc(protectedBroadcaster.Handler)
Expand All @@ -68,13 +76,11 @@ func TestHttpHandler(t *testing.T) {
assert.Equal(t, http.StatusUnauthorized, rr.Code)
})

t.Run("Rejects when authorization header is valid", func(t *testing.T) {
t.Run("Accepts when authorization header is valid", func(t *testing.T) {
req, err := http.NewRequest("POST", "/", strings.NewReader(string(payload)))
req.Header.Set("Authorization", "Bearer secret")
req.Header.Set("Authorization", "Bearer "+broadcastKey)

if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

rr := httptest.NewRecorder()
handler := http.HandlerFunc(protectedBroadcaster.Handler)
Expand Down
36 changes: 29 additions & 7 deletions cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import (
"github.com/anycable/anycable-go/mrb"
"github.com/anycable/anycable-go/node"
"github.com/anycable/anycable-go/pubsub"
"github.com/anycable/anycable-go/rails"
"github.com/anycable/anycable-go/router"
"github.com/anycable/anycable-go/server"
"github.com/anycable/anycable-go/sse"
"github.com/anycable/anycable-go/streams"
"github.com/anycable/anycable-go/telemetry"
"github.com/anycable/anycable-go/utils"
"github.com/anycable/anycable-go/version"
Expand Down Expand Up @@ -162,6 +162,10 @@ func (r *Runner) Run() error {

r.log.Info(fmt.Sprintf("Starting %s %s%s (pid: %d, open file limit: %s, gomaxprocs: %d)", r.name, version.Version(), mrubySupport, os.Getpid(), utils.OpenFileLimit(), numProcs))

if r.config.IsPublic() {
r.log.Warn("Server is running in the public mode")
}

appNode, err := r.runNode()

if err != nil {
Expand Down Expand Up @@ -377,10 +381,23 @@ func (r *Runner) newController(metrics *metricspkg.Metrics) (node.Controller, er
return nil, errorx.Decorate(err, "!!! Failed to initialize controller !!!")
}

ids := []identity.Identifier{}

if r.config.JWT.Enabled() {
identifier := identity.NewJWTIdentifier(&r.config.JWT, r.log)
ids = append(ids, identity.NewJWTIdentifier(&r.config.JWT, r.log))
r.log.Info(fmt.Sprintf("JWT authentication is enabled (param: %s, enforced: %v)", r.config.JWT.Param, r.config.JWT.Force))
}

if r.config.SkipAuth {
ids = append(ids, identity.NewPublicIdentifier())
r.log.Info("connection authentication is disabled")
}

if len(ids) > 1 {
identifier := identity.NewIdentifierPipeline(ids...)
controller = identity.NewIdentifiableController(controller, identifier)
r.log.Info(fmt.Sprintf("JWT identification is enabled (param: %s, enforced: %v)", r.config.JWT.Param, r.config.JWT.Force))
} else if len(ids) == 1 {
controller = identity.NewIdentifiableController(controller, ids[0])
}

if !r.Router().Empty() {
Expand Down Expand Up @@ -480,13 +497,18 @@ func (r *Runner) Instrumenter() metricspkg.Instrumenter {
func (r *Runner) defaultRouter() *router.RouterController {
router := router.NewRouterController(nil)

if r.config.Rails.TurboRailsKey != "" || r.config.Rails.TurboRailsClearText {
turboController := rails.NewTurboController(r.config.Rails.TurboRailsKey, r.log)
if r.config.Streams.PubSubChannel != "" {
streamController := streams.NewStreamsController(&r.config.Streams, r.log)
router.Route(r.config.Streams.PubSubChannel, streamController) // nolint:errcheck
}

if r.config.Streams.Turbo && r.config.Streams.GetTurboSecret() != "" {
turboController := streams.NewTurboController(r.config.Streams.GetTurboSecret(), r.log)
router.Route("Turbo::StreamsChannel", turboController) // nolint:errcheck
}

if r.config.Rails.CableReadyKey != "" || r.config.Rails.CableReadyClearText {
crController := rails.NewCableReadyController(r.config.Rails.CableReadyKey, r.log)
if r.config.Streams.CableReady && r.config.Streams.GetCableReadySecret() != "" {
crController := streams.NewCableReadyController(r.config.Streams.GetCableReadySecret(), r.log)
router.Route("CableReady::Stream", crController) // nolint:errcheck
}

Expand Down
14 changes: 14 additions & 0 deletions cli/embed.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net/http"

"github.com/anycable/anycable-go/broadcast"
"github.com/anycable/anycable-go/node"
"github.com/anycable/anycable-go/utils"
"github.com/anycable/anycable-go/version"
Expand Down Expand Up @@ -40,6 +41,19 @@ func (e *Embedded) SSEHandler(ctx context.Context) (http.Handler, error) {
return sseHandler, nil
}

// HTTPBroadcastHandler returns an HTTP handler to process broadcasting requests
func (e *Embedded) HTTPBroadcastHandler() (http.Handler, error) {
broadcaster := broadcast.NewHTTPBroadcaster(e.n, &e.r.config.HTTPBroadcast, e.r.log)

err := broadcaster.Prepare()

if err != nil {
return nil, err
}

return http.HandlerFunc(broadcaster.Handler), nil
}

// Shutdown stops the AnyCable node gracefully.
func (e *Embedded) Shutdown(ctx context.Context) error {
for _, shutdownable := range e.r.shutdownables {
Expand Down
Loading
Loading