From 4a3ee3ad4950db5e13be7d9448e54cc4fed03674 Mon Sep 17 00:00:00 2001 From: Vladimir Dementyev Date: Thu, 29 Feb 2024 15:13:56 -0800 Subject: [PATCH] fix(logs): truncate long values, handle bytes, improve attrs --- broadcast/legacy_nats.go | 2 +- broadcast/legacy_redis.go | 2 +- broadcast/redis.go | 3 +- cli/cli.go | 4 -- cli/options.go | 1 - common/common.go | 143 ++++++++++++++++++++++++++++++++++---- hub/gate.go | 2 +- hub/hub.go | 12 ++-- logger/values.go | 65 +++++++++++++++++ logger/values_test.go | 90 ++++++++++++++++++++++++ node/node.go | 41 ++++++----- node/session.go | 7 +- pubsub/nats.go | 9 ++- pubsub/redis.go | 9 ++- rpc/rpc.go | 7 -- 15 files changed, 334 insertions(+), 63 deletions(-) create mode 100644 logger/values.go create mode 100644 logger/values_test.go diff --git a/broadcast/legacy_nats.go b/broadcast/legacy_nats.go index 8f8c867e..c651659a 100644 --- a/broadcast/legacy_nats.go +++ b/broadcast/legacy_nats.go @@ -56,7 +56,7 @@ func (s *LegacyNATSBroadcaster) Start(done chan (error)) error { } _, err = nc.Subscribe(s.config.Channel, func(m *nats.Msg) { - s.log.Debug("incoming pubsub message", "data", m.Data) + s.log.Debug("received pubsub message") s.handler.HandlePubSub(m.Data) }) diff --git a/broadcast/legacy_redis.go b/broadcast/legacy_redis.go index aae22d54..97e3e689 100644 --- a/broadcast/legacy_redis.go +++ b/broadcast/legacy_redis.go @@ -215,7 +215,7 @@ func (s *LegacyRedisBroadcaster) listen() error { for { switch v := psc.Receive().(type) { case redis.Message: - s.log.Debug("incoming pubsub message", "data", v.Data) + s.log.Debug("received pubsub message") s.node.HandlePubSub(v.Data) case redis.Subscription: s.log.Info("subscribed to Redis channel", "channel", v.Channel) diff --git a/broadcast/redis.go b/broadcast/redis.go index 45129593..49c40006 100644 --- a/broadcast/redis.go +++ b/broadcast/redis.go @@ -252,8 +252,7 @@ func (s *RedisBroadcaster) autoclaimMessages(blockTime int64) ([]rueidis.XRangeE func (s *RedisBroadcaster) broadcastXrange(messages []rueidis.XRangeEntry) { for _, message := range messages { if payload, pok := message.FieldValues["payload"]; pok { - s.log.Debug("incoming broadcast", "data", payload) - + s.log.Debug("received broadcast") s.node.HandleBroadcast([]byte(payload)) ackRes := s.client.DoMulti(context.Background(), diff --git a/cli/cli.go b/cli/cli.go index b8bfdbbb..4f5261d1 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -318,10 +318,6 @@ func (r *Runner) runNode() (*node.Node, error) { } for _, broadcaster := range broadcasters { - if broadcaster.IsFanout() && subscriber.IsMultiNode() { - r.log.Warn("Using pub/sub with a distributed broadcaster has no effect") - } - if !broadcaster.IsFanout() && !subscriber.IsMultiNode() { r.log.Warn("Using a non-distributed broadcaster without a pub/sub enabled; each broadcasted message is only processed by a single node") } diff --git a/cli/options.go b/cli/options.go index 5f6ff811..153d9ba0 100644 --- a/cli/options.go +++ b/cli/options.go @@ -131,7 +131,6 @@ func NewConfigFromCLI(args []string, opts ...cliOption) (*config.Config, error, if c.Debug { c.LogLevel = "debug" - c.LogFormat = "text" } if c.Metrics.Port == 0 { diff --git a/common/common.go b/common/common.go index 8b9fcdde..fe06214b 100644 --- a/common/common.go +++ b/common/common.go @@ -3,6 +3,10 @@ package common import ( "encoding/json" + "log/slog" + + "github.com/anycable/anycable-go/logger" + "github.com/anycable/anycable-go/utils" ) // Command result status @@ -12,6 +16,19 @@ const ( ERROR ) +func StatusName(status int) string { + switch status { + case SUCCESS: + return "success" + case FAILURE: + return "failure" + case ERROR: + return "error" + default: + return "unknown" + } +} + const ( ActionCableV1JSON = "actioncable-v1-json" ActionCableV1ExtJSON = "actioncable-v1-ext-json" @@ -161,6 +178,18 @@ type ConnectResult struct { Status int } +func (c *ConnectResult) LogValue() slog.Value { + return slog.GroupValue( + slog.String("status", StatusName(c.Status)), + slog.Any("transmissions", logger.CompactValues(c.Transmissions)), + slog.Any("broadcasts", c.Broadcasts), + slog.String("identifier", c.Identifier), + slog.Int("disconnect_interest", c.DisconnectInterest), + slog.Any("cstate", c.CState), + slog.Any("istate", c.IState), + ) +} + // ToCallResult returns the corresponding CallResult func (c *ConnectResult) ToCallResult() *CallResult { res := CallResult{Transmissions: c.Transmissions, Broadcasts: c.Broadcasts} @@ -190,6 +219,21 @@ type CommandResult struct { Status int } +func (c *CommandResult) LogValue() slog.Value { + return slog.GroupValue( + slog.String("status", StatusName(c.Status)), + slog.Any("streams", logger.CompactValues(c.Streams)), + slog.Any("transmissions", logger.CompactValues(c.Transmissions)), + slog.Any("stopped_streams", logger.CompactValues(c.StoppedStreams)), + slog.Bool("stop_all_streams", c.StopAllStreams), + slog.Any("broadcasts", c.Broadcasts), + slog.Bool("disconnect", c.Disconnect), + slog.Int("disconnect_interest", c.DisconnectInterest), + slog.Any("cstate", c.CState), + slog.Any("istate", c.IState), + ) +} + // ToCallResult returns the corresponding CallResult func (c *CommandResult) ToCallResult() *CallResult { res := CallResult{Transmissions: c.Transmissions, Broadcasts: c.Broadcasts} @@ -207,6 +251,10 @@ type HistoryPosition struct { Offset uint64 `json:"offset"` } +func (hp *HistoryPosition) LogValue() slog.Value { + return slog.GroupValue(slog.String("epoch", hp.Epoch), slog.Uint64("offset", hp.Offset)) +} + // HistoryRequest represents a client's streams state (offsets) or a timestamp since // which we should return the messages for the current streams type HistoryRequest struct { @@ -216,6 +264,10 @@ type HistoryRequest struct { Streams map[string]HistoryPosition `json:"streams,omitempty"` } +func (hr *HistoryRequest) LogValue() slog.Value { + return slog.GroupValue(slog.Int64("since", hr.Since), slog.Any("streams", hr.Streams)) +} + // Message represents incoming client message type Message struct { Command string `json:"command"` @@ -224,12 +276,25 @@ type Message struct { History HistoryRequest `json:"history,omitempty"` } +func (m *Message) LogValue() slog.Value { + return slog.GroupValue( + slog.String("command", m.Command), + slog.String("identifier", m.Identifier), + slog.Any("data", logger.CompactAny(m.Data)), + slog.Any("history", m.History), + ) +} + // StreamMessageMetadata describes additional information about a stream message // which can be used to modify delivery behavior type StreamMessageMetadata struct { ExcludeSocket string `json:"exclude_socket,omitempty"` } +func (smm *StreamMessageMetadata) LogValue() slog.Value { + return slog.GroupValue(slog.String("exclude_socket", smm.ExcludeSocket)) +} + // StreamMessage represents a pub/sub message to be sent to stream type StreamMessage struct { Stream string `json:"stream"` @@ -242,6 +307,23 @@ type StreamMessage struct { Epoch string } +func (sm *StreamMessage) LogValue() slog.Value { + attrs := []slog.Attr{ + slog.String("stream", sm.Stream), + slog.Any("data", logger.CompactValue(sm.Data)), + } + + if sm.Epoch != "" { + attrs = append(attrs, slog.Uint64("offset", sm.Offset), slog.String("epoch", sm.Epoch)) + } + + if sm.Meta != nil { + attrs = append(attrs, slog.Any("meta", sm.Meta)) + } + + return slog.GroupValue(attrs...) +} + func (sm *StreamMessage) ToReplyFor(identifier string) *Reply { data := sm.Data @@ -276,6 +358,10 @@ type RemoteCommandMessage struct { Payload json.RawMessage `json:"payload,omitempty"` } +func (m *RemoteCommandMessage) LogValue() slog.Value { + return slog.GroupValue(slog.String("command", m.Command), slog.Any("payload", m.Payload)) +} + func (m *RemoteCommandMessage) ToRemoteDisconnectMessage() (*RemoteDisconnectMessage, error) { dmsg := RemoteDisconnectMessage{} @@ -292,12 +378,20 @@ type RemoteDisconnectMessage struct { Reconnect bool `json:"reconnect"` } +func (m *RemoteDisconnectMessage) LogValue() slog.Value { + return slog.GroupValue(slog.String("ids", m.Identifier), slog.Bool("reconnect", m.Reconnect)) +} + // PingMessage represents a server ping type PingMessage struct { Type string `json:"type"` Message interface{} `json:"message,omitempty"` } +func (p *PingMessage) LogValue() slog.Value { + return slog.GroupValue(slog.String("type", p.Type), slog.Any("message", p.Message)) +} + func (p *PingMessage) GetType() string { return PingType } @@ -309,6 +403,10 @@ type DisconnectMessage struct { Reconnect bool `json:"reconnect"` } +func (d *DisconnectMessage) LogValue() slog.Value { + return slog.GroupValue(slog.String("type", d.Type), slog.String("reason", d.Reason), slog.Bool("reconnect", d.Reconnect)) +} + func (d *DisconnectMessage) GetType() string { return DisconnectType } @@ -332,6 +430,36 @@ type Reply struct { RestoredIDs []string `json:"restored_ids,omitempty"` } +func (r *Reply) LogValue() slog.Value { + attrs := []slog.Attr{} + + if r.Type != "" { + attrs = append(attrs, slog.String("type", r.Type)) + } + + if r.Identifier != "" { + attrs = append(attrs, slog.String("identifier", r.Identifier)) + } + + if r.Message != nil { + attrs = append(attrs, slog.Any("message", logger.CompactAny(r.Message))) + } + + if r.Reason != "" { + attrs = append(attrs, slog.String("reason", r.Reason), slog.Bool("reconnect", r.Reconnect)) + } + + if r.StreamID != "" { + attrs = append(attrs, slog.String("stream_id", r.StreamID), slog.String("epoch", r.Epoch), slog.Uint64("offset", r.Offset)) + } + + if r.Sid != "" { + attrs = append(attrs, slog.String("sid", r.Sid), slog.Bool("restored", r.Restored), slog.Any("restored_ids", r.RestoredIDs)) + } + + return slog.GroupValue(attrs...) +} + func (r *Reply) GetType() string { return r.Type } @@ -365,24 +493,15 @@ func PubSubMessageFromJSON(raw []byte) (interface{}, error) { // WelcomeMessage for a session ID func WelcomeMessage(sid string) string { - return string(toJSON(Reply{Sid: sid, Type: WelcomeType})) + return string(utils.ToJSON(Reply{Sid: sid, Type: WelcomeType})) } // ConfirmationMessage returns a subscription confirmation message for a specified identifier func ConfirmationMessage(identifier string) string { - return string(toJSON(Reply{Identifier: identifier, Type: ConfirmedType})) + return string(utils.ToJSON(Reply{Identifier: identifier, Type: ConfirmedType})) } // RejectionMessage returns a subscription rejection message for a specified identifier func RejectionMessage(identifier string) string { - return string(toJSON(Reply{Identifier: identifier, Type: RejectedType})) -} - -func toJSON(msg Reply) []byte { - b, err := json.Marshal(&msg) - if err != nil { - panic("Failed to build JSON 😲") - } - - return b + return string(utils.ToJSON(Reply{Identifier: identifier, Type: RejectedType})) } diff --git a/hub/gate.go b/hub/gate.go index 6ac35036..673d0624 100644 --- a/hub/gate.go +++ b/hub/gate.go @@ -49,7 +49,7 @@ func (g *Gate) Broadcast(streamMsg *common.StreamMessage) { ctx := g.log.With("stream", stream) - ctx.Debug("broadcast message", "stream", streamMsg, "data", streamMsg.Data, "offset", streamMsg.Offset, "epoch", streamMsg.Epoch, "meta", streamMsg.Meta) + ctx.Debug("schedule broadcast", "message", streamMsg) g.mu.RLock() if _, ok := g.streams[stream]; !ok { diff --git a/hub/hub.go b/hub/hub.go index a76cd354..4d5d8047 100644 --- a/hub/hub.go +++ b/hub/hub.go @@ -100,12 +100,12 @@ func NewHub(poolSize int, l *slog.Logger) *Hub { register: make(chan HubRegistration, 2048), sessions: make(map[string]*HubSessionInfo), identifiers: make(map[string]map[string]bool), - gates: buildGates(ctx, poolSize, l.With("context", "hub")), + gates: buildGates(ctx, poolSize, l), gatesNum: poolSize, pool: utils.NewGoPool("remote commands", 256), doneFn: doneFn, shutdown: make(chan struct{}), - log: l.With("context", "hub"), + log: l.With("component", "hub"), } } @@ -267,7 +267,7 @@ func (h *Hub) UnsubscribeSessionFromChannel(session HubSession, targetIdentifier } } - h.log.With("sid", sid).Debug("unsubscribed", "channel", targetIdentifier) + h.log.With("sid", sid).Debug("unsubscribed", "identifier", targetIdentifier) } func (h *Hub) SubscribeSession(session HubSession, stream string, identifier string) { @@ -284,7 +284,7 @@ func (h *Hub) SubscribeSession(session HubSession, stream string, identifier str h.sessions[sid].AddStream(stream, identifier) - h.log.With("sid", sid).Debug("subscribed", "channel", identifier, "stream", stream) + h.log.With("sid", sid).Debug("subscribed", "identifier", identifier, "stream", stream) } func (h *Hub) UnsubscribeSession(session HubSession, stream string, identifier string) { @@ -299,7 +299,7 @@ func (h *Hub) UnsubscribeSession(session HubSession, stream string, identifier s info.RemoveStream(stream, identifier) } - h.log.With("sid", sid).Debug("unsubscribed", "channel", identifier, "stream", stream) + h.log.With("sid", sid).Debug("unsubscribed", "identifier", identifier, "stream", stream) } func (h *Hub) broadcastToStream(streamMsg *common.StreamMessage) { @@ -365,7 +365,7 @@ func (h *Hub) Sessions() []HubSession { func buildGates(ctx context.Context, num int, l *slog.Logger) []*Gate { gates := make([]*Gate, 0, num) for i := 0; i < num; i++ { - gates = append(gates, NewGate(ctx, l)) + gates = append(gates, NewGate(ctx, l.With("component", "hub", "gate", i))) } return gates diff --git a/logger/values.go b/logger/values.go new file mode 100644 index 00000000..1690ebbe --- /dev/null +++ b/logger/values.go @@ -0,0 +1,65 @@ +package logger + +import ( + "fmt" + "log/slog" +) + +const ( + maxValueLength = 100 +) + +type compactValue[T string | []byte] struct { + val T +} + +func (c *compactValue[T]) LogValue() slog.Value { + return slog.StringValue(c.String()) +} + +func (c *compactValue[T]) String() string { + val := string(c.val) + + if len(val) > maxValueLength { + return fmt.Sprintf("%s...(%d)", val[:maxValueLength], len(val)-maxValueLength) + } + + return val +} + +// CompactValue wraps any scalar value to show it in log truncated +func CompactValue[T string | []byte](v T) *compactValue[T] { + return &compactValue[T]{val: v} +} + +func CompactValues[T string | []byte, S []T](v S) []*compactValue[T] { + res := make([]*compactValue[T], len(v)) + for i, val := range v { + res[i] = CompactValue(val) + } + + return res +} + +type compactAny struct { + val interface{} +} + +func (c *compactAny) String() string { + val := fmt.Sprintf("%+v", c.val) + + if len(val) > maxValueLength { + return fmt.Sprintf("%s...(%d)", val[:maxValueLength], len(val)-maxValueLength) + } + + return val +} + +func (c *compactAny) LogValue() slog.Value { + return slog.StringValue(c.String()) +} + +// CompactAny wraps any value to show it in log truncated +func CompactAny(val interface{}) *compactAny { + return &compactAny{val} +} diff --git a/logger/values_test.go b/logger/values_test.go new file mode 100644 index 00000000..6b9b9228 --- /dev/null +++ b/logger/values_test.go @@ -0,0 +1,90 @@ +package logger + +import ( + "bytes" + "log/slog" + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCompactValue_string(t *testing.T) { + shortvalue := "log-no-long" + assert.Equal(t, shortvalue, CompactValue(shortvalue).LogValue().String()) + + longvalue := strings.Repeat("log-long", 50) + + truncated := CompactValue(longvalue).LogValue().String() + assert.Equal(t, "log-longlog-l", truncated[:13]) + assert.Len(t, truncated, maxValueLength+8) +} + +func TestCompactValue_bytes(t *testing.T) { + shortvalue := []byte("log-no-long") + assert.Equal(t, "log-no-long", CompactValue(shortvalue).LogValue().String()) + + longvalue := []byte(strings.Repeat("log-long", 50)) + + truncated := CompactValue(longvalue).LogValue().String() + assert.Equal(t, "log-longlog-l", truncated[:13]) + assert.Len(t, truncated, maxValueLength+8) +} + +func TestCompactValues_string(t *testing.T) { + values := []string{ + "log-no-long", + strings.Repeat("log-long", 50), + } + + compacts := CompactValues(values) + + assert.Equal(t, "log-no-long", compacts[0].LogValue().String()) + + truncated := compacts[1].LogValue().String() + assert.Equal(t, "log-longlog-l", truncated[:13]) + assert.Len(t, truncated, maxValueLength+8) +} + +func TestCompactValues_bytes(t *testing.T) { + values := [][]byte{ + []byte("log-no-long"), + []byte(strings.Repeat("log-long", 50)), + } + + any := slog.Any("t", CompactValues(values)).String() + assert.Contains(t, any, "log-no-long") + + compacts := CompactValues(values) + + assert.Equal(t, "log-no-long", compacts[0].LogValue().String()) + + truncated := compacts[1].LogValue().String() + assert.Equal(t, "log-longlog-l", truncated[:13]) + assert.Len(t, truncated, maxValueLength+8) +} + +func TestCompactAny(t *testing.T) { + value := struct{ val string }{strings.Repeat("log-long", 50)} + compact := CompactAny(value) + + logValue := compact.LogValue() + str := logValue.String() + + assert.Equal(t, "{val:log-long", str[:13]) + assert.Len(t, str, maxValueLength+8) +} + +func TestWithJSONHandler(t *testing.T) { + buf := bytes.Buffer{} + logger := slog.New(slog.NewJSONHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug})) + + str := "short" + longstr := strings.Repeat("long", 500) + + logger.Debug("test", "b", CompactValue([]byte(str)), "l", CompactValues([]string{str, longstr})) + logged := buf.String() + + assert.Contains(t, logged, str) + assert.Less(t, len(logged), 300) +} diff --git a/node/node.go b/node/node.go index 823d1f5b..e1753037 100644 --- a/node/node.go +++ b/node/node.go @@ -12,6 +12,7 @@ import ( "github.com/anycable/anycable-go/broker" "github.com/anycable/anycable-go/common" "github.com/anycable/anycable-go/hub" + "github.com/anycable/anycable-go/logger" "github.com/anycable/anycable-go/metrics" "github.com/anycable/anycable-go/utils" "github.com/anycable/anycable-go/ws" @@ -121,11 +122,6 @@ func NewNode(config *Config, opts ...NodeOption) *Node { n.log = slog.With("context", "node") } - // Ensure nodeid in logs - if n.id != "" { - n.log = n.log.With("nodeid", n.id) - } - n.hub = hub.NewHub(config.HubGopoolSize, n.log) if n.metrics != nil { @@ -190,18 +186,21 @@ func (n *Node) HandleBroadcast(raw []byte) { if err != nil { n.metrics.CounterIncrement(metricsUnknownBroadcast) - n.log.Warn("failed to parse pubsub message", "data", raw, "error", err) + n.log.Warn("failed to parse pubsub message", "data", logger.CompactValue(raw), "error", err) return } switch v := msg.(type) { case common.StreamMessage: + n.log.Debug("handle broadcast message", "payload", &v) n.broker.HandleBroadcast(&v) case []*common.StreamMessage: + n.log.Debug("handle batch-broadcast message", "payload", &v) for _, el := range v { n.broker.HandleBroadcast(el) } case common.RemoteCommandMessage: + n.log.Debug("handle remote command", "command", &v) n.broker.HandleCommand(&v) } } @@ -212,7 +211,7 @@ func (n *Node) HandlePubSub(raw []byte) { if err != nil { n.metrics.CounterIncrement(metricsUnknownBroadcast) - n.log.Warn("failed to parse pubsub message", "data", raw, "error", err) + n.log.Warn("failed to parse pubsub message", "data", logger.CompactValue(raw), "error", err) return } @@ -323,6 +322,8 @@ func (n *Node) Authenticate(s *Session, options ...AuthOption) (*common.ConnectR res, err := n.controller.Authenticate(s.GetID(), s.env) + s.Log.Debug("controller authenticate", "response", res, "err", err) + if err != nil { s.Disconnect("Auth Error", ws.CloseInternalServerErr) return nil, errorx.Decorate(err, "failed to authenticate") @@ -428,6 +429,8 @@ func (n *Node) Subscribe(s *Session, msg *common.Message) (*common.CommandResult res, err := n.controller.Subscribe(s.GetID(), s.env, s.GetIdentifiers(), msg.Identifier) + s.Log.Debug("controller subscribe", "response", res, "err", err) + var confirmed bool if err != nil { // nolint: gocritic @@ -437,9 +440,9 @@ func (n *Node) Subscribe(s *Session, msg *common.Message) (*common.CommandResult } else if res.Status == common.SUCCESS { confirmed = true s.subscriptions.AddChannel(msg.Identifier) - s.Log.Debug("subscribed", "channel", msg.Identifier) + s.Log.Debug("subscribed", "identifier", msg.Identifier) } else { - s.Log.Debug("subscription rejected", "channel", msg.Identifier) + s.Log.Debug("subscription rejected", "identifier", msg.Identifier) } s.smu.Unlock() @@ -479,6 +482,8 @@ func (n *Node) Unsubscribe(s *Session, msg *common.Message) (*common.CommandResu res, err := n.controller.Unsubscribe(s.GetID(), s.env, s.GetIdentifiers(), msg.Identifier) + s.Log.Debug("controller unsubscribe", "response", res, "err", err) + if err != nil { if res == nil || res.Status == common.ERROR { return nil, errorx.Decorate(err, "failed to unsubscribe from %s", msg.Identifier) @@ -489,7 +494,7 @@ func (n *Node) Unsubscribe(s *Session, msg *common.Message) (*common.CommandResu s.subscriptions.RemoveChannel(msg.Identifier) - s.Log.Debug("unsubscribed", "channel", msg.Identifier) + s.Log.Debug("unsubscribed", "identifier", msg.Identifier) } s.smu.Unlock() @@ -526,12 +531,12 @@ func (n *Node) Perform(s *Session, msg *common.Message) (*common.CommandResult, res, err := n.controller.Perform(s.GetID(), s.env, s.GetIdentifiers(), msg.Identifier, data) + s.Log.Debug("controller perform", "response", res, "err", err) + if err != nil { if res == nil || res.Status == common.ERROR { return nil, errorx.Decorate(err, "perform failed for %s", msg.Identifier) } - } else { - s.Log.Debug("perform result", "data", res) } if res != nil { @@ -626,7 +631,7 @@ func (n *Node) retreiveHistory(history *common.HistoryRequest, streams []string) // Broadcast message to stream (locally) func (n *Node) Broadcast(msg *common.StreamMessage) { n.metrics.CounterIncrement(metricsBroadcastMsg) - n.log.Debug("incoming broadcast message", "data", msg) + n.log.Debug("incoming broadcast message", "payload", msg) n.hub.BroadcastMessage(msg) } @@ -634,16 +639,16 @@ func (n *Node) Broadcast(msg *common.StreamMessage) { func (n *Node) ExecuteRemoteCommand(msg *common.RemoteCommandMessage) { // TODO: Add remote commands metrics // n.metrics.CounterIncrement(metricsRemoteCommandsMsg) - n.log.Debug("incoming remote command", "data", msg) - switch msg.Command { // nolint:gocritic case "disconnect": dmsg, err := msg.ToRemoteDisconnectMessage() if err != nil { - n.log.Warn("failed to parse remote disconnect command", "error", err) + n.log.Warn("failed to parse remote disconnect command", "data", msg, "error", err) return } + n.log.Debug("incoming remote command", "command", dmsg) + n.RemoteDisconnect(dmsg) } } @@ -685,9 +690,11 @@ func (n *Node) DisconnectNow(s *Session) error { ) if err != nil { - s.Log.Error("disconnect failed", "error", err) + s.Log.Error("controller disconnect failed", "error", err) } + s.Log.Debug("controller disconnect succeeded") + return err } diff --git a/node/session.go b/node/session.go index 97df3c4b..889f6ebe 100644 --- a/node/session.go +++ b/node/session.go @@ -11,6 +11,7 @@ import ( "github.com/anycable/anycable-go/common" "github.com/anycable/anycable-go/encoders" + "github.com/anycable/anycable-go/logger" "github.com/anycable/anycable-go/metrics" "github.com/anycable/anycable-go/ws" ) @@ -365,7 +366,7 @@ func (s *Session) ReadMessage(message []byte) error { if err := s.executor.HandleCommand(s, command); err != nil { s.metrics.CounterIncrement(metricsFailedCommandReceived) - s.Log.Warn("failed to handle incoming message", "data", message, "error", err) + s.Log.Warn("failed to handle incoming message", "data", logger.CompactValue(message), "error", err) } return nil @@ -390,7 +391,7 @@ func (s *Session) SendJSONTransmission(msg string) { s.sendFrame(b) } } else { - s.Log.Warn("failed to encode transmission", "data", msg, "error", err) + s.Log.Warn("failed to encode transmission", "data", logger.CompactValue(msg), "error", err) } } @@ -677,7 +678,7 @@ func (s *Session) handlePong(msg *common.Message) { defer s.mu.Unlock() if s.pongTimer == nil { - s.Log.Debug("unexpected PONG received") + s.Log.Debug("unexpected pong received") return } diff --git a/pubsub/nats.go b/pubsub/nats.go index f7ce624e..9af3e331 100644 --- a/pubsub/nats.go +++ b/pubsub/nats.go @@ -7,6 +7,7 @@ import ( "sync" "github.com/anycable/anycable-go/common" + "github.com/anycable/anycable-go/logger" nconfig "github.com/anycable/anycable-go/nats" "github.com/anycable/anycable-go/utils" @@ -131,19 +132,21 @@ func (s *NATSSubscriber) Publish(stream string, msg interface{}) { } func (s *NATSSubscriber) handleMessage(m *nats.Msg) { - s.log.With("channel", m.Subject).Debug("received message", "data", m.Data) - msg, err := common.PubSubMessageFromJSON(m.Data) if err != nil { - s.log.Warn("failed to parse pubsub message", "data", m.Data, "error", err) + s.log.Warn("failed to parse pubsub message", "data", logger.CompactValue(m.Data), "error", err) return } switch v := msg.(type) { case common.StreamMessage: + s.log.With("channel", m.Subject).Debug("received broadcast message") s.node.Broadcast(&v) case common.RemoteCommandMessage: + s.log.With("channel", m.Subject).Debug("received remote command") s.node.ExecuteRemoteCommand(&v) + default: + s.log.With("channel", m.Subject).Warn("received unknown message", "data", logger.CompactValue(m.Data)) } } diff --git a/pubsub/redis.go b/pubsub/redis.go index b0305802..e6c2106f 100644 --- a/pubsub/redis.go +++ b/pubsub/redis.go @@ -9,6 +9,7 @@ import ( "time" "github.com/anycable/anycable-go/common" + "github.com/anycable/anycable-go/logger" rconfig "github.com/anycable/anycable-go/redis" "github.com/anycable/anycable-go/utils" "github.com/redis/rueidis" @@ -188,8 +189,6 @@ func (s *RedisSubscriber) runPubSub(done chan (error)) { s.subMu.Lock() defer s.subMu.Unlock() - s.log.Debug("subscription message", "data", m) - if m.Kind == "subscribe" && m.Channel == s.config.InternalChannel { if s.reconnectAttempt > 0 { s.log.Info("reconnected to Redis") @@ -210,19 +209,19 @@ func (s *RedisSubscriber) runPubSub(done chan (error)) { } }, OnMessage: func(m rueidis.PubSubMessage) { - s.log.With("channel", m.Channel).Debug("received message", "data", m.Message) - msg, err := common.PubSubMessageFromJSON([]byte(m.Message)) if err != nil { - s.log.Warn("failed to parse pubsub message", "data", m.Message, "error", err) + s.log.Warn("failed to parse pubsub message", "data", logger.CompactValue(m.Message), "error", err) return } switch v := msg.(type) { case common.StreamMessage: + s.log.With("channel", m.Channel).Debug("received broadcast message") s.node.Broadcast(&v) case common.RemoteCommandMessage: + s.log.With("channel", m.Channel).Debug("received remote command") s.node.ExecuteRemoteCommand(&v) } }, diff --git a/rpc/rpc.go b/rpc/rpc.go index fa4e1505..7a9d4d13 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -292,9 +292,6 @@ func (c *Controller) Authenticate(sid string, env *common.SessionEnv) (*common.C } if r, ok := response.(*pb.ConnectionResponse); ok { - - c.log.With("sid", sid).Debug("authenticate response", "data", r) - reply, err := protocol.ParseConnectResponse(r) return reply, err @@ -390,8 +387,6 @@ func (c *Controller) Disconnect(sid string, env *common.SessionEnv, id string, s } if r, ok := response.(*pb.DisconnectResponse); ok { - c.log.With("sid", sid).Debug("Disconnect response", "data", r) - err = protocol.ParseDisconnectResponse(r) if err != nil { @@ -414,8 +409,6 @@ func (c *Controller) parseCommandResponse(sid string, response interface{}, err } if r, ok := response.(*pb.CommandResponse); ok { - c.log.With("sid", sid).Debug("command response", "data", r) - res, err := protocol.ParseCommandResponse(r) return res, err