Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

anycable.toml support #207

Merged
merged 6 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .zed/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Folder-specific settings
//
// For a full list of overridable settings, and general information on folder-specific settings,
// see the documentation: https://zed.dev/docs/configuring-zed#settings-files
{
"file_types": {
"Ruby": ["*.testfile", "*.benchfile", "Gemfile"]
}
}
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## master

- Add `anycable.toml` support. ([@palkan][])

- Print metrics with keys sorted alphabetically. ([@palkan][])

- Upgrade to Go 1.23. ([@palkan][])
Expand Down
38 changes: 33 additions & 5 deletions broadcast/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@ const (
// HTTPConfig contains HTTP pubsub adapter configuration
type HTTPConfig struct {
// Port to listen on
Port int
Port int `toml:"port"`
// Path for HTTP broadast
Path string
Path string `toml:"path"`
// Secret token to authorize requests
Secret string
Secret string `toml:"secret"`
// SecretBase is a secret used to generate a token if none provided
SecretBase string
// AddCORSHeaders enables adding CORS headers (so you can perform broadcast requests from the browser)
// (We mostly need it for Stackblitz)
AddCORSHeaders bool
AddCORSHeaders bool `toml:"cors_headers"`
// CORSHosts contains a list of hostnames for CORS (comma-separated)
CORSHosts string
CORSHosts string `toml:"cors_hosts"`
}

// NewHTTPConfig builds a new config for HTTP pub/sub
Expand All @@ -47,6 +47,34 @@ func (c *HTTPConfig) IsSecured() bool {
return c.Secret != "" || c.SecretBase != ""
}

func (c HTTPConfig) ToToml() string {
var result strings.Builder

result.WriteString("# HTTP server port (can be the same as the main server port)\n")
result.WriteString(fmt.Sprintf("port = %d\n", c.Port))

result.WriteString("# HTTP endpoint path for broadcasts\n")
result.WriteString(fmt.Sprintf("path = \"%s\"\n", c.Path))

result.WriteString("# Secret token to authenticate broadcasting requests\n")
if c.Secret != "" {
result.WriteString(fmt.Sprintf("secret = \"%s\"\n", c.Secret))
} else {
result.WriteString("# secret = \"\"\n")
}

result.WriteString("# Enable CORS headers (allowed origins are used as allowed hosts)\n")
if c.AddCORSHeaders {
result.WriteString("cors_headers = true\n")
} else {
result.WriteString("# cors_headers = false\n")
}

result.WriteString("\n")

return result.String()
}

// HTTPBroadcaster represents HTTP broadcaster
type HTTPBroadcaster struct {
port int
Expand Down
24 changes: 24 additions & 0 deletions broadcast/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"
"testing"

"github.com/BurntSushi/toml"
"github.com/anycable/anycable-go/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -89,3 +90,26 @@ func TestHttpHandler(t *testing.T) {
assert.Equal(t, http.StatusCreated, rr.Code)
})
}

func TestHTTPConfig__ToToml(t *testing.T) {
conf := NewHTTPConfig()
conf.Port = 8080
conf.Path = "/broadcast"
conf.Secret = ""
conf.AddCORSHeaders = true

tomlStr := conf.ToToml()

assert.Contains(t, tomlStr, "port = 8080")
assert.Contains(t, tomlStr, "path = \"/broadcast\"")
assert.Contains(t, tomlStr, "# secret = \"\"")
assert.Contains(t, tomlStr, "cors_headers = true")

// Round-trip test
conf2 := NewHTTPConfig()

_, err := toml.Decode(tomlStr, &conf2)
require.NoError(t, err)

assert.Equal(t, conf, conf2)
}
32 changes: 27 additions & 5 deletions broadcast/legacy_nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,46 @@ package broadcast

import (
"context"
"fmt"
"log/slog"
"strings"

"github.com/nats-io/nats.go"

nconfig "github.com/anycable/anycable-go/nats"
)

type LegacyNATSConfig struct {
Channel string `toml:"channel"`
NATS *nconfig.NATSConfig `toml:"nats"`
}

func NewLegacyNATSConfig() LegacyNATSConfig {
return LegacyNATSConfig{
Channel: "__anycable__",
}
}

func (c LegacyNATSConfig) ToToml() string {
var result strings.Builder
result.WriteString(fmt.Sprintf("channel = \"%s\"\n", c.Channel))

result.WriteString("\n")

return result.String()
}

type LegacyNATSBroadcaster struct {
conn *nats.Conn
handler Handler
config *nconfig.NATSConfig
config *LegacyNATSConfig

log *slog.Logger
}

var _ Broadcaster = (*LegacyNATSBroadcaster)(nil)

func NewLegacyNATSBroadcaster(node Handler, c *nconfig.NATSConfig, l *slog.Logger) *LegacyNATSBroadcaster {
func NewLegacyNATSBroadcaster(node Handler, c *LegacyNATSConfig, l *slog.Logger) *LegacyNATSBroadcaster {
return &LegacyNATSBroadcaster{
config: c,
handler: node,
Expand All @@ -34,7 +56,7 @@ func (LegacyNATSBroadcaster) IsFanout() bool {
func (s *LegacyNATSBroadcaster) Start(done chan (error)) error {
connectOptions := []nats.Option{
nats.RetryOnFailedConnect(true),
nats.MaxReconnects(s.config.MaxReconnectAttempts),
nats.MaxReconnects(s.config.NATS.MaxReconnectAttempts),
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
if err != nil {
s.log.Warn("connection failed", "error", err)
Expand All @@ -45,11 +67,11 @@ func (s *LegacyNATSBroadcaster) Start(done chan (error)) error {
}),
}

if s.config.DontRandomizeServers {
if s.config.NATS.DontRandomizeServers {
connectOptions = append(connectOptions, nats.DontRandomize())
}

nc, err := nats.Connect(s.config.Servers, connectOptions...)
nc, err := nats.Connect(s.config.NATS.Servers, connectOptions...)

if err != nil {
return err
Expand Down
26 changes: 26 additions & 0 deletions broadcast/legacy_nats_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package broadcast

import (
"testing"

"github.com/BurntSushi/toml"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestLegacyNATSConfig__ToToml(t *testing.T) {
conf := NewLegacyNATSConfig()
conf.Channel = "_test_"

tomlStr := conf.ToToml()

assert.Contains(t, tomlStr, "channel = \"_test_\"")

// Round-trip test
conf2 := NewLegacyNATSConfig()

_, err := toml.Decode(tomlStr, &conf2)
require.NoError(t, err)

assert.Equal(t, conf, conf2)
}
34 changes: 27 additions & 7 deletions broadcast/legacy_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,26 @@ import (
"github.com/gomodule/redigo/redis"
)

type LegacyRedisConfig struct {
Channel string `toml:"channel"`
Redis *rconfig.RedisConfig `toml:"redis"`
}

func NewLegacyRedisConfig() LegacyRedisConfig {
return LegacyRedisConfig{
Channel: "__anycable__",
}
}

func (c LegacyRedisConfig) ToToml() string {
var result strings.Builder
result.WriteString(fmt.Sprintf("channel = \"%s\"\n", c.Channel))

result.WriteString("\n")

return result.String()
}

// LegacyRedisBroadcaster contains information about Redis pubsub connection
type LegacyRedisBroadcaster struct {
node Handler
Expand All @@ -33,18 +53,18 @@ type LegacyRedisBroadcaster struct {
}

// NewLegacyRedisBroadcaster returns new RedisSubscriber struct
func NewLegacyRedisBroadcaster(node Handler, config *rconfig.RedisConfig, l *slog.Logger) *LegacyRedisBroadcaster {
func NewLegacyRedisBroadcaster(node Handler, config *LegacyRedisConfig, l *slog.Logger) *LegacyRedisBroadcaster {
return &LegacyRedisBroadcaster{
node: node,
url: config.URL,
sentinels: config.Sentinels,
sentinelDiscoveryInterval: time.Duration(config.SentinelDiscoveryInterval),
url: config.Redis.URL,
sentinels: config.Redis.Sentinels,
sentinelDiscoveryInterval: time.Duration(config.Redis.SentinelDiscoveryInterval),
channel: config.Channel,
pingInterval: time.Duration(config.KeepalivePingInterval),
pingInterval: time.Duration(config.Redis.KeepalivePingInterval),
reconnectAttempt: 0,
maxReconnectAttempts: config.MaxReconnectAttempts,
maxReconnectAttempts: config.Redis.MaxReconnectAttempts,
log: l.With("context", "broadcast").With("provider", "redis"),
tlsVerify: config.TLSVerify,
tlsVerify: config.Redis.TLSVerify,
}
}

Expand Down
26 changes: 26 additions & 0 deletions broadcast/legacy_redis_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package broadcast

import (
"testing"

"github.com/BurntSushi/toml"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestLegacyRedisConfig__ToToml(t *testing.T) {
conf := NewLegacyRedisConfig()
conf.Channel = "_test_"

tomlStr := conf.ToToml()

assert.Contains(t, tomlStr, "channel = \"_test_\"")

// Round-trip test
conf2 := NewLegacyRedisConfig()

_, err := toml.Decode(tomlStr, &conf2)
require.NoError(t, err)

assert.Equal(t, conf, conf2)
}
Loading
Loading