Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to using "go.uber.org/zap" for structured logging. #820

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions api_signaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ import (
"encoding/json"
"errors"
"fmt"
"log"
"net/url"
"sort"
"strings"
"time"

"github.com/golang-jwt/jwt/v4"
"github.com/pion/sdp/v3"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -265,7 +265,12 @@ func NewErrorDetail(code string, message string, details interface{}) *Error {
if details != nil {
var err error
if rawDetails, err = json.Marshal(details); err != nil {
log.Printf("Could not marshal details %+v for error %s with %s: %s", details, code, message, err)
zap.L().Error("Could not marshal error details",
zap.String("code", code),
zap.String("message", message),
zap.Any("details", details),
zap.Error(err),
)
return NewError("internal_error", "Could not marshal error details")
}
}
Expand Down
12 changes: 8 additions & 4 deletions async_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
*/
package signaling

import "sync"
import (
"sync"

"go.uber.org/zap"
)

type AsyncBackendRoomEventListener interface {
ProcessBackendRoomRequest(message *AsyncMessage)
Expand Down Expand Up @@ -60,13 +64,13 @@ type AsyncEvents interface {
PublishSessionMessage(sessionId string, backend *Backend, message *AsyncMessage) error
}

func NewAsyncEvents(url string) (AsyncEvents, error) {
client, err := NewNatsClient(url)
func NewAsyncEvents(log *zap.Logger, url string) (AsyncEvents, error) {
client, err := NewNatsClient(log, url)
if err != nil {
return nil, err
}

return NewAsyncEventsNats(client)
return NewAsyncEventsNats(log, client)
}

type asyncBackendRoomSubscriber struct {
Expand Down
60 changes: 40 additions & 20 deletions async_events_nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
package signaling

import (
"log"
"sync"
"time"

"github.com/nats-io/nats.go"
"go.uber.org/zap"
)

func GetSubjectForBackendRoomId(roomId string, backend *Backend) string {
Expand Down Expand Up @@ -58,6 +58,7 @@ func GetSubjectForSessionId(sessionId string, backend *Backend) string {
}

type asyncSubscriberNats struct {
log *zap.Logger
key string
client NatsClient

Expand All @@ -68,14 +69,17 @@ type asyncSubscriberNats struct {
processMessage func(*nats.Msg)
}

func newAsyncSubscriberNats(key string, client NatsClient) (*asyncSubscriberNats, error) {
func newAsyncSubscriberNats(log *zap.Logger, key string, client NatsClient) (*asyncSubscriberNats, error) {
receiver := make(chan *nats.Msg, 64)
sub, err := client.Subscribe(key, receiver)
if err != nil {
return nil, err
}

result := &asyncSubscriberNats{
log: log.With(
zap.String("key", key),
),
key: key,
client: client,

Expand All @@ -89,7 +93,9 @@ func newAsyncSubscriberNats(key string, client NatsClient) (*asyncSubscriberNats
func (s *asyncSubscriberNats) run() {
defer func() {
if err := s.subscription.Unsubscribe(); err != nil {
log.Printf("Error unsubscribing %s: %s", s.key, err)
s.log.Error("Error unsubscribing",
zap.Error(err),
)
}
}()

Expand All @@ -115,8 +121,8 @@ type asyncBackendRoomSubscriberNats struct {
asyncBackendRoomSubscriber
}

func newAsyncBackendRoomSubscriberNats(key string, client NatsClient) (*asyncBackendRoomSubscriberNats, error) {
sub, err := newAsyncSubscriberNats(key, client)
func newAsyncBackendRoomSubscriberNats(log *zap.Logger, key string, client NatsClient) (*asyncBackendRoomSubscriberNats, error) {
sub, err := newAsyncSubscriberNats(log, key, client)
if err != nil {
return nil, err
}
Expand All @@ -132,7 +138,10 @@ func newAsyncBackendRoomSubscriberNats(key string, client NatsClient) (*asyncBac
func (s *asyncBackendRoomSubscriberNats) doProcessMessage(msg *nats.Msg) {
var message AsyncMessage
if err := s.client.Decode(msg, &message); err != nil {
log.Printf("Could not decode NATS message %+v, %s", msg, err)
s.log.Error("Could not decode NATS message",
zap.Any("message", msg),
zap.Error(err),
)
return
}

Expand All @@ -144,8 +153,8 @@ type asyncRoomSubscriberNats struct {
*asyncSubscriberNats
}

func newAsyncRoomSubscriberNats(key string, client NatsClient) (*asyncRoomSubscriberNats, error) {
sub, err := newAsyncSubscriberNats(key, client)
func newAsyncRoomSubscriberNats(log *zap.Logger, key string, client NatsClient) (*asyncRoomSubscriberNats, error) {
sub, err := newAsyncSubscriberNats(log, key, client)
if err != nil {
return nil, err
}
Expand All @@ -161,7 +170,10 @@ func newAsyncRoomSubscriberNats(key string, client NatsClient) (*asyncRoomSubscr
func (s *asyncRoomSubscriberNats) doProcessMessage(msg *nats.Msg) {
var message AsyncMessage
if err := s.client.Decode(msg, &message); err != nil {
log.Printf("Could not decode nats message %+v, %s", msg, err)
s.log.Error("Could not decode NATS message",
zap.Any("message", msg),
zap.Error(err),
)
return
}

Expand All @@ -173,8 +185,8 @@ type asyncUserSubscriberNats struct {
asyncUserSubscriber
}

func newAsyncUserSubscriberNats(key string, client NatsClient) (*asyncUserSubscriberNats, error) {
sub, err := newAsyncSubscriberNats(key, client)
func newAsyncUserSubscriberNats(log *zap.Logger, key string, client NatsClient) (*asyncUserSubscriberNats, error) {
sub, err := newAsyncSubscriberNats(log, key, client)
if err != nil {
return nil, err
}
Expand All @@ -190,7 +202,10 @@ func newAsyncUserSubscriberNats(key string, client NatsClient) (*asyncUserSubscr
func (s *asyncUserSubscriberNats) doProcessMessage(msg *nats.Msg) {
var message AsyncMessage
if err := s.client.Decode(msg, &message); err != nil {
log.Printf("Could not decode nats message %+v, %s", msg, err)
s.log.Error("Could not decode NATS message",
zap.Any("message", msg),
zap.Error(err),
)
return
}

Expand All @@ -202,8 +217,8 @@ type asyncSessionSubscriberNats struct {
asyncSessionSubscriber
}

func newAsyncSessionSubscriberNats(key string, client NatsClient) (*asyncSessionSubscriberNats, error) {
sub, err := newAsyncSubscriberNats(key, client)
func newAsyncSessionSubscriberNats(log *zap.Logger, key string, client NatsClient) (*asyncSessionSubscriberNats, error) {
sub, err := newAsyncSubscriberNats(log, key, client)
if err != nil {
return nil, err
}
Expand All @@ -219,14 +234,18 @@ func newAsyncSessionSubscriberNats(key string, client NatsClient) (*asyncSession
func (s *asyncSessionSubscriberNats) doProcessMessage(msg *nats.Msg) {
var message AsyncMessage
if err := s.client.Decode(msg, &message); err != nil {
log.Printf("Could not decode nats message %+v, %s", msg, err)
s.log.Error("Could not decode NATS message",
zap.Any("message", msg),
zap.Error(err),
)
return
}

s.processAsyncSessionMessage(&message)
}

type asyncEventsNats struct {
log *zap.Logger
mu sync.Mutex
client NatsClient

Expand All @@ -236,8 +255,9 @@ type asyncEventsNats struct {
sessionSubscriptions map[string]*asyncSessionSubscriberNats
}

func NewAsyncEventsNats(client NatsClient) (AsyncEvents, error) {
func NewAsyncEventsNats(log *zap.Logger, client NatsClient) (AsyncEvents, error) {
events := &asyncEventsNats{
log: log,
client: client,

backendRoomSubscriptions: make(map[string]*asyncBackendRoomSubscriberNats),
Expand Down Expand Up @@ -298,7 +318,7 @@ func (e *asyncEventsNats) RegisterBackendRoomListener(roomId string, backend *Ba
sub, found := e.backendRoomSubscriptions[key]
if !found {
var err error
if sub, err = newAsyncBackendRoomSubscriberNats(key, e.client); err != nil {
if sub, err = newAsyncBackendRoomSubscriberNats(e.log, key, e.client); err != nil {
return err
}

Expand Down Expand Up @@ -332,7 +352,7 @@ func (e *asyncEventsNats) RegisterRoomListener(roomId string, backend *Backend,
sub, found := e.roomSubscriptions[key]
if !found {
var err error
if sub, err = newAsyncRoomSubscriberNats(key, e.client); err != nil {
if sub, err = newAsyncRoomSubscriberNats(e.log, key, e.client); err != nil {
return err
}

Expand Down Expand Up @@ -366,7 +386,7 @@ func (e *asyncEventsNats) RegisterUserListener(roomId string, backend *Backend,
sub, found := e.userSubscriptions[key]
if !found {
var err error
if sub, err = newAsyncUserSubscriberNats(key, e.client); err != nil {
if sub, err = newAsyncUserSubscriberNats(e.log, key, e.client); err != nil {
return err
}

Expand Down Expand Up @@ -400,7 +420,7 @@ func (e *asyncEventsNats) RegisterSessionListener(sessionId string, backend *Bac
sub, found := e.sessionSubscriptions[key]
if !found {
var err error
if sub, err = newAsyncSessionSubscriberNats(key, e.client); err != nil {
if sub, err = newAsyncSessionSubscriberNats(e.log, key, e.client); err != nil {
return err
}

Expand Down
6 changes: 4 additions & 2 deletions async_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,17 @@ func getAsyncEventsForTest(t *testing.T) AsyncEvents {

func getRealAsyncEventsForTest(t *testing.T) AsyncEvents {
url := startLocalNatsServer(t)
events, err := NewAsyncEvents(url)
log := GetLoggerForTest(t)
events, err := NewAsyncEvents(log, url)
if err != nil {
require.NoError(t, err)
}
return events
}

func getLoopbackAsyncEventsForTest(t *testing.T) AsyncEvents {
events, err := NewAsyncEvents(NatsLoopbackUrl)
log := GetLoggerForTest(t)
events, err := NewAsyncEvents(log, NatsLoopbackUrl)
if err != nil {
require.NoError(t, err)
}
Expand Down
Loading
Loading