Skip to content

Commit

Permalink
fix(logs): truncate long values, handle bytes, improve attrs
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Feb 29, 2024
1 parent 28bf0fc commit 4a3ee3a
Show file tree
Hide file tree
Showing 15 changed files with 334 additions and 63 deletions.
2 changes: 1 addition & 1 deletion broadcast/legacy_nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (s *LegacyNATSBroadcaster) Start(done chan (error)) error {
}

_, err = nc.Subscribe(s.config.Channel, func(m *nats.Msg) {
s.log.Debug("incoming pubsub message", "data", m.Data)
s.log.Debug("received pubsub message")
s.handler.HandlePubSub(m.Data)
})

Expand Down
2 changes: 1 addition & 1 deletion broadcast/legacy_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (s *LegacyRedisBroadcaster) listen() error {
for {
switch v := psc.Receive().(type) {
case redis.Message:
s.log.Debug("incoming pubsub message", "data", v.Data)
s.log.Debug("received pubsub message")
s.node.HandlePubSub(v.Data)
case redis.Subscription:
s.log.Info("subscribed to Redis channel", "channel", v.Channel)
Expand Down
3 changes: 1 addition & 2 deletions broadcast/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,7 @@ func (s *RedisBroadcaster) autoclaimMessages(blockTime int64) ([]rueidis.XRangeE
func (s *RedisBroadcaster) broadcastXrange(messages []rueidis.XRangeEntry) {
for _, message := range messages {
if payload, pok := message.FieldValues["payload"]; pok {
s.log.Debug("incoming broadcast", "data", payload)

s.log.Debug("received broadcast")
s.node.HandleBroadcast([]byte(payload))

ackRes := s.client.DoMulti(context.Background(),
Expand Down
4 changes: 0 additions & 4 deletions cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,10 +318,6 @@ func (r *Runner) runNode() (*node.Node, error) {
}

for _, broadcaster := range broadcasters {
if broadcaster.IsFanout() && subscriber.IsMultiNode() {
r.log.Warn("Using pub/sub with a distributed broadcaster has no effect")
}

if !broadcaster.IsFanout() && !subscriber.IsMultiNode() {
r.log.Warn("Using a non-distributed broadcaster without a pub/sub enabled; each broadcasted message is only processed by a single node")
}
Expand Down
1 change: 0 additions & 1 deletion cli/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ func NewConfigFromCLI(args []string, opts ...cliOption) (*config.Config, error,

if c.Debug {
c.LogLevel = "debug"
c.LogFormat = "text"
}

if c.Metrics.Port == 0 {
Expand Down
143 changes: 131 additions & 12 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package common

import (
"encoding/json"
"log/slog"

"github.com/anycable/anycable-go/logger"
"github.com/anycable/anycable-go/utils"
)

// Command result status
Expand All @@ -12,6 +16,19 @@ const (
ERROR
)

func StatusName(status int) string {
switch status {
case SUCCESS:
return "success"
case FAILURE:
return "failure"
case ERROR:
return "error"
default:
return "unknown"
}
}

const (
ActionCableV1JSON = "actioncable-v1-json"
ActionCableV1ExtJSON = "actioncable-v1-ext-json"
Expand Down Expand Up @@ -161,6 +178,18 @@ type ConnectResult struct {
Status int
}

func (c *ConnectResult) LogValue() slog.Value {
return slog.GroupValue(
slog.String("status", StatusName(c.Status)),
slog.Any("transmissions", logger.CompactValues(c.Transmissions)),
slog.Any("broadcasts", c.Broadcasts),
slog.String("identifier", c.Identifier),
slog.Int("disconnect_interest", c.DisconnectInterest),
slog.Any("cstate", c.CState),
slog.Any("istate", c.IState),
)
}

// ToCallResult returns the corresponding CallResult
func (c *ConnectResult) ToCallResult() *CallResult {
res := CallResult{Transmissions: c.Transmissions, Broadcasts: c.Broadcasts}
Expand Down Expand Up @@ -190,6 +219,21 @@ type CommandResult struct {
Status int
}

func (c *CommandResult) LogValue() slog.Value {
return slog.GroupValue(
slog.String("status", StatusName(c.Status)),
slog.Any("streams", logger.CompactValues(c.Streams)),
slog.Any("transmissions", logger.CompactValues(c.Transmissions)),
slog.Any("stopped_streams", logger.CompactValues(c.StoppedStreams)),
slog.Bool("stop_all_streams", c.StopAllStreams),
slog.Any("broadcasts", c.Broadcasts),
slog.Bool("disconnect", c.Disconnect),
slog.Int("disconnect_interest", c.DisconnectInterest),
slog.Any("cstate", c.CState),
slog.Any("istate", c.IState),
)
}

// ToCallResult returns the corresponding CallResult
func (c *CommandResult) ToCallResult() *CallResult {
res := CallResult{Transmissions: c.Transmissions, Broadcasts: c.Broadcasts}
Expand All @@ -207,6 +251,10 @@ type HistoryPosition struct {
Offset uint64 `json:"offset"`
}

func (hp *HistoryPosition) LogValue() slog.Value {
return slog.GroupValue(slog.String("epoch", hp.Epoch), slog.Uint64("offset", hp.Offset))
}

// HistoryRequest represents a client's streams state (offsets) or a timestamp since
// which we should return the messages for the current streams
type HistoryRequest struct {
Expand All @@ -216,6 +264,10 @@ type HistoryRequest struct {
Streams map[string]HistoryPosition `json:"streams,omitempty"`
}

func (hr *HistoryRequest) LogValue() slog.Value {
return slog.GroupValue(slog.Int64("since", hr.Since), slog.Any("streams", hr.Streams))
}

// Message represents incoming client message
type Message struct {
Command string `json:"command"`
Expand All @@ -224,12 +276,25 @@ type Message struct {
History HistoryRequest `json:"history,omitempty"`
}

func (m *Message) LogValue() slog.Value {
return slog.GroupValue(
slog.String("command", m.Command),
slog.String("identifier", m.Identifier),
slog.Any("data", logger.CompactAny(m.Data)),
slog.Any("history", m.History),
)
}

// StreamMessageMetadata describes additional information about a stream message
// which can be used to modify delivery behavior
type StreamMessageMetadata struct {
ExcludeSocket string `json:"exclude_socket,omitempty"`
}

func (smm *StreamMessageMetadata) LogValue() slog.Value {
return slog.GroupValue(slog.String("exclude_socket", smm.ExcludeSocket))
}

// StreamMessage represents a pub/sub message to be sent to stream
type StreamMessage struct {
Stream string `json:"stream"`
Expand All @@ -242,6 +307,23 @@ type StreamMessage struct {
Epoch string
}

func (sm *StreamMessage) LogValue() slog.Value {
attrs := []slog.Attr{
slog.String("stream", sm.Stream),
slog.Any("data", logger.CompactValue(sm.Data)),
}

if sm.Epoch != "" {
attrs = append(attrs, slog.Uint64("offset", sm.Offset), slog.String("epoch", sm.Epoch))
}

if sm.Meta != nil {
attrs = append(attrs, slog.Any("meta", sm.Meta))
}

return slog.GroupValue(attrs...)
}

func (sm *StreamMessage) ToReplyFor(identifier string) *Reply {
data := sm.Data

Expand Down Expand Up @@ -276,6 +358,10 @@ type RemoteCommandMessage struct {
Payload json.RawMessage `json:"payload,omitempty"`
}

func (m *RemoteCommandMessage) LogValue() slog.Value {
return slog.GroupValue(slog.String("command", m.Command), slog.Any("payload", m.Payload))
}

func (m *RemoteCommandMessage) ToRemoteDisconnectMessage() (*RemoteDisconnectMessage, error) {
dmsg := RemoteDisconnectMessage{}

Expand All @@ -292,12 +378,20 @@ type RemoteDisconnectMessage struct {
Reconnect bool `json:"reconnect"`
}

func (m *RemoteDisconnectMessage) LogValue() slog.Value {
return slog.GroupValue(slog.String("ids", m.Identifier), slog.Bool("reconnect", m.Reconnect))
}

// PingMessage represents a server ping
type PingMessage struct {
Type string `json:"type"`
Message interface{} `json:"message,omitempty"`
}

func (p *PingMessage) LogValue() slog.Value {
return slog.GroupValue(slog.String("type", p.Type), slog.Any("message", p.Message))
}

func (p *PingMessage) GetType() string {
return PingType
}
Expand All @@ -309,6 +403,10 @@ type DisconnectMessage struct {
Reconnect bool `json:"reconnect"`
}

func (d *DisconnectMessage) LogValue() slog.Value {
return slog.GroupValue(slog.String("type", d.Type), slog.String("reason", d.Reason), slog.Bool("reconnect", d.Reconnect))
}

func (d *DisconnectMessage) GetType() string {
return DisconnectType
}
Expand All @@ -332,6 +430,36 @@ type Reply struct {
RestoredIDs []string `json:"restored_ids,omitempty"`
}

func (r *Reply) LogValue() slog.Value {
attrs := []slog.Attr{}

if r.Type != "" {
attrs = append(attrs, slog.String("type", r.Type))
}

if r.Identifier != "" {
attrs = append(attrs, slog.String("identifier", r.Identifier))
}

if r.Message != nil {
attrs = append(attrs, slog.Any("message", logger.CompactAny(r.Message)))
}

if r.Reason != "" {
attrs = append(attrs, slog.String("reason", r.Reason), slog.Bool("reconnect", r.Reconnect))
}

if r.StreamID != "" {
attrs = append(attrs, slog.String("stream_id", r.StreamID), slog.String("epoch", r.Epoch), slog.Uint64("offset", r.Offset))
}

if r.Sid != "" {
attrs = append(attrs, slog.String("sid", r.Sid), slog.Bool("restored", r.Restored), slog.Any("restored_ids", r.RestoredIDs))
}

return slog.GroupValue(attrs...)
}

func (r *Reply) GetType() string {
return r.Type
}
Expand Down Expand Up @@ -365,24 +493,15 @@ func PubSubMessageFromJSON(raw []byte) (interface{}, error) {

// WelcomeMessage for a session ID
func WelcomeMessage(sid string) string {
return string(toJSON(Reply{Sid: sid, Type: WelcomeType}))
return string(utils.ToJSON(Reply{Sid: sid, Type: WelcomeType}))
}

// ConfirmationMessage returns a subscription confirmation message for a specified identifier
func ConfirmationMessage(identifier string) string {
return string(toJSON(Reply{Identifier: identifier, Type: ConfirmedType}))
return string(utils.ToJSON(Reply{Identifier: identifier, Type: ConfirmedType}))
}

// RejectionMessage returns a subscription rejection message for a specified identifier
func RejectionMessage(identifier string) string {
return string(toJSON(Reply{Identifier: identifier, Type: RejectedType}))
}

func toJSON(msg Reply) []byte {
b, err := json.Marshal(&msg)
if err != nil {
panic("Failed to build JSON 😲")
}

return b
return string(utils.ToJSON(Reply{Identifier: identifier, Type: RejectedType}))
}
2 changes: 1 addition & 1 deletion hub/gate.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (g *Gate) Broadcast(streamMsg *common.StreamMessage) {

ctx := g.log.With("stream", stream)

ctx.Debug("broadcast message", "stream", streamMsg, "data", streamMsg.Data, "offset", streamMsg.Offset, "epoch", streamMsg.Epoch, "meta", streamMsg.Meta)
ctx.Debug("schedule broadcast", "message", streamMsg)

g.mu.RLock()
if _, ok := g.streams[stream]; !ok {
Expand Down
12 changes: 6 additions & 6 deletions hub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,12 @@ func NewHub(poolSize int, l *slog.Logger) *Hub {
register: make(chan HubRegistration, 2048),
sessions: make(map[string]*HubSessionInfo),
identifiers: make(map[string]map[string]bool),
gates: buildGates(ctx, poolSize, l.With("context", "hub")),
gates: buildGates(ctx, poolSize, l),
gatesNum: poolSize,
pool: utils.NewGoPool("remote commands", 256),
doneFn: doneFn,
shutdown: make(chan struct{}),
log: l.With("context", "hub"),
log: l.With("component", "hub"),
}
}

Expand Down Expand Up @@ -267,7 +267,7 @@ func (h *Hub) UnsubscribeSessionFromChannel(session HubSession, targetIdentifier
}
}

h.log.With("sid", sid).Debug("unsubscribed", "channel", targetIdentifier)
h.log.With("sid", sid).Debug("unsubscribed", "identifier", targetIdentifier)
}

func (h *Hub) SubscribeSession(session HubSession, stream string, identifier string) {
Expand All @@ -284,7 +284,7 @@ func (h *Hub) SubscribeSession(session HubSession, stream string, identifier str

h.sessions[sid].AddStream(stream, identifier)

h.log.With("sid", sid).Debug("subscribed", "channel", identifier, "stream", stream)
h.log.With("sid", sid).Debug("subscribed", "identifier", identifier, "stream", stream)
}

func (h *Hub) UnsubscribeSession(session HubSession, stream string, identifier string) {
Expand All @@ -299,7 +299,7 @@ func (h *Hub) UnsubscribeSession(session HubSession, stream string, identifier s
info.RemoveStream(stream, identifier)
}

h.log.With("sid", sid).Debug("unsubscribed", "channel", identifier, "stream", stream)
h.log.With("sid", sid).Debug("unsubscribed", "identifier", identifier, "stream", stream)
}

func (h *Hub) broadcastToStream(streamMsg *common.StreamMessage) {
Expand Down Expand Up @@ -365,7 +365,7 @@ func (h *Hub) Sessions() []HubSession {
func buildGates(ctx context.Context, num int, l *slog.Logger) []*Gate {
gates := make([]*Gate, 0, num)
for i := 0; i < num; i++ {
gates = append(gates, NewGate(ctx, l))
gates = append(gates, NewGate(ctx, l.With("component", "hub", "gate", i)))
}

return gates
Expand Down
Loading

0 comments on commit 4a3ee3a

Please sign in to comment.