diff --git a/pkg/sip/client.go b/pkg/sip/client.go index 164a76f0..97f43832 100644 --- a/pkg/sip/client.go +++ b/pkg/sip/client.go @@ -91,7 +91,6 @@ func (c *Client) Stop() { c.sipCli.Close() c.sipCli = nil } - // FIXME: anything else? } func (c *Client) UpdateSIPParticipant(ctx context.Context, req *rpc.InternalUpdateSIPParticipantRequest) (*rpc.InternalUpdateSIPParticipantResponse, error) { diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 948da57a..499be86a 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -29,6 +29,7 @@ import ( "github.com/livekit/sip/pkg/media" "github.com/livekit/sip/pkg/media/rtp" "github.com/livekit/sip/pkg/media/ulaw" + "github.com/livekit/sip/pkg/stats" ) func (s *Server) handleInviteAuth(req *sip.Request, tx sip.ServerTransaction, from, username, password string) (ok bool) { @@ -54,7 +55,7 @@ func (s *Server) handleInviteAuth(req *sip.Request, tx sip.ServerTransaction, fr h := req.GetHeader("Proxy-Authorization") if h == nil { - logger.Infow(fmt.Sprintf("Requesting inbound auth for %s", from), "from", from) + logger.Infow("Requesting inbound auth", "from", from) inviteState.challenge = digest.Challenge{ Realm: UserAgent, Nonce: fmt.Sprintf("%d", time.Now().UnixMicro()), @@ -94,7 +95,7 @@ func (s *Server) handleInviteAuth(req *sip.Request, tx sip.ServerTransaction, fr } func (s *Server) onInvite(req *sip.Request, tx sip.ServerTransaction) { - s.mon.InviteReqRaw(false) + s.mon.InviteReqRaw(stats.Inbound) _ = tx.Respond(sip.NewResponseFromRequest(req, 180, "Ringing", nil)) tag, err := getTagValue(req) @@ -116,26 +117,30 @@ func (s *Server) onInvite(req *sip.Request, tx sip.ServerTransaction) { } src := req.Source() - s.mon.InviteReq(false, from.Address.String(), to.Address.String()) - logger.Infow(fmt.Sprintf("INVITE from %q to %q", from.Address.String(), to.Address.String()), - "tag", tag, "from", from, "to", to) + cmon := s.mon.NewCall(stats.Inbound, from.Address.String(), to.Address.String()) + + cmon.InviteReq() + defer cmon.SessionDur()() + joinDur := cmon.JoinDur() + logger.Infow("INVITE received", "tag", tag, "from", from, "to", to) username, password, err := s.authHandler(from.Address.User, to.Address.User, to.Address.Host, src) if err != nil { - s.mon.InviteError(false, from.Address.String(), to.Address.String(), "no-rule") - logger.Warnw(fmt.Sprintf("Rejecting inbound call to %q, doesn't match any Trunks", to.Address.String()), err, + cmon.InviteError("no-rule") + logger.Warnw("Rejecting inbound, doesn't match any Trunks", err, "tag", tag, "src", src, "from", from, "to", to, "to-host", to.Address.Host) sipErrorResponse(tx, req) return } if !s.handleInviteAuth(req, tx, from.Address.User, username, password) { - s.mon.InviteError(false, from.Address.String(), to.Address.String(), "unauthorized") + cmon.InviteError("unauthorized") // handleInviteAuth will generate the SIP Response as needed return } - s.mon.InviteReq(false, from.Address.String(), to.Address.String()) + cmon.InviteAccept() - call := s.newInboundCall(tag, from, to, src) + call := s.newInboundCall(cmon, tag, from, to, src) + call.joinDur = joinDur call.handleInvite(call.ctx, req, tx, s.conf) } @@ -158,6 +163,7 @@ func (s *Server) onBye(req *sip.Request, tx sip.ServerTransaction) { type inboundCall struct { s *Server + mon *stats.CallMonitor tag string ctx context.Context cancel func() @@ -170,12 +176,15 @@ type inboundCall struct { audioHandler atomic.Pointer[rtp.Handler] dtmf chan byte // buffered; DTMF digits as characters lkRoom *Room // LiveKit room; only active after correct pin is entered + callDur func() time.Duration + joinDur func() time.Duration done atomic.Bool } -func (s *Server) newInboundCall(tag string, from *sip.FromHeader, to *sip.ToHeader, src string) *inboundCall { +func (s *Server) newInboundCall(mon *stats.CallMonitor, tag string, from *sip.FromHeader, to *sip.ToHeader, src string) *inboundCall { c := &inboundCall{ s: s, + mon: mon, tag: tag, from: from, to: to, @@ -191,15 +200,16 @@ func (s *Server) newInboundCall(tag string, from *sip.FromHeader, to *sip.ToHead } func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, tx sip.ServerTransaction, conf *config.Config) { - c.s.mon.CallStart(false, c.from.Address.String(), c.to.Address.String()) - defer c.s.mon.CallEnd(false, c.from.Address.String(), c.to.Address.String()) - defer c.close() + c.mon.CallStart() + defer c.mon.CallEnd() + defer c.close("other") // Send initial request. In the best case scenario, we will immediately get a room name to join. // Otherwise, we could even learn that this number is not allowed and reject the call, or ask for pin if required. roomName, identity, wsUrl, token, requirePin, rejectInvite := c.s.dispatchRuleHandler(ctx, c.from.Address.User, c.to.Address.User, c.to.Address.Host, c.src, "", false) if rejectInvite { logger.Infow("Rejecting inbound call, doesn't match any Dispatch Rules", "from", c.from.Address.User, "to", c.to.Address.User, "to-host", c.to.Address.Host, "src", c.src) sipErrorResponse(tx, req) + c.close("no-dispatch") return } @@ -207,6 +217,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, tx sip answerData, err := c.runMediaConn(req.Body(), conf) if err != nil { sipErrorResponse(tx, req) + c.close("media-failed") return } @@ -227,6 +238,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, tx sip } // Wait for the caller to terminate the call. <-ctx.Done() + c.close("hangup") } func (c *inboundCall) sendBye() { @@ -261,7 +273,7 @@ func (c *inboundCall) runMediaConn(offerData []byte, conf *config.Config) (answe return nil, err } conn := NewMediaConn() - conn.OnRTP(c) + conn.OnRTP(&rtpStatsHandler{mon: c.mon, h: c}) if dst := sdpGetAudioDest(offer); dst != nil { conn.SetDestAddr(dst) } @@ -272,7 +284,7 @@ func (c *inboundCall) runMediaConn(offerData []byte, conf *config.Config) (answe // Encoding pipeline (LK -> SIP) // Need to be created earlier to send the pin prompts. - s := rtp.NewMediaStreamOut[ulaw.Sample](conn, rtpPacketDur) + s := rtp.NewMediaStreamOut[ulaw.Sample](&rtpStatsWriter{mon: c.mon, w: conn}, rtpPacketDur) c.lkRoom.SetOutput(ulaw.Encode(s)) return sdpGenerateAnswer(offer, c.s.signalingIp, conn.LocalAddr().Port) @@ -305,7 +317,7 @@ func (c *inboundCall) pinPrompt(ctx context.Context) { if reject || requirePin || roomName == "" { logger.Infow("Rejecting call", "tag", c.tag, "from", c.from.Address.User, "to", c.to.Address.User, "pin", pin, "noPin", noPin) c.playAudio(ctx, c.s.res.wrongPin) - c.Close() + c.close("wrong-pin") return } c.joinRoom(ctx, roomName, identity, wsUrl, token) @@ -315,7 +327,7 @@ func (c *inboundCall) pinPrompt(ctx context.Context) { pin += string(b) if len(pin) > pinLimit { c.playAudio(ctx, c.s.res.wrongPin) - c.Close() + c.close("wrong-pin") return } } @@ -323,16 +335,21 @@ func (c *inboundCall) pinPrompt(ctx context.Context) { } // close should only be called from handleInvite. -func (c *inboundCall) close() { +func (c *inboundCall) close(reason string) { if !c.done.CompareAndSwap(false, true) { return } - logger.Infow("Closing inbound call", "tag", c.tag, "from", c.from.Address.User, "to", c.to.Address.User) + c.mon.CallTerminate(reason) + logger.Infow("Closing inbound call", "tag", c.tag, "from", c.from.Address.User, "to", c.to.Address.User, "reason", reason) c.sendBye() c.closeMedia() + if c.callDur != nil { + c.callDur() + } c.s.cmu.Lock() delete(c.s.activeCalls, c.tag) c.s.cmu.Unlock() + c.cancel() } func (c *inboundCall) Close() error { @@ -354,13 +371,16 @@ func (c *inboundCall) closeMedia() { } func (c *inboundCall) HandleRTP(p *rtp.Packet) error { - if p.Marker && p.PayloadType == 101 { - c.handleDTMF(p.Payload) - return nil - } - // TODO: Audio data appears to be coming with PayloadType=0, so maybe enforce it? - if h := c.audioHandler.Load(); h != nil { - return (*h).HandleRTP(p) + switch p.PayloadType { + case 101: + if p.Marker { + c.handleDTMF(p.Payload) + } + default: + // TODO: Audio data appears to be coming with PayloadType=0, so maybe enforce it? + if h := c.audioHandler.Load(); h != nil { + return (*h).HandleRTP(p) + } } return nil } @@ -390,10 +410,15 @@ func (c *inboundCall) createLiveKitParticipant(ctx context.Context, roomName, pa } func (c *inboundCall) joinRoom(ctx context.Context, roomName, identity, wsUrl, token string) { + if c.joinDur != nil { + c.joinDur() + } + c.callDur = c.mon.CallDur() logger.Infow("Bridging SIP call", "tag", c.tag, "from", c.from.Address.User, "to", c.to.Address.User, "roomName", roomName, "identity", identity) c.playAudio(ctx, c.s.res.roomJoin) if err := c.createLiveKitParticipant(ctx, roomName, identity, wsUrl, token); err != nil { logger.Errorw("Cannot create LiveKit participant", err, "tag", c.tag) + c.close("participant-failed") } } diff --git a/pkg/sip/media.go b/pkg/sip/media.go index 0de72437..2fb2bfe3 100644 --- a/pkg/sip/media.go +++ b/pkg/sip/media.go @@ -15,7 +15,13 @@ package sip import ( + "strconv" "time" + + "github.com/pion/rtp" + + srtp "github.com/livekit/sip/pkg/media/rtp" + "github.com/livekit/sip/pkg/stats" ) const ( @@ -25,3 +31,47 @@ const ( sampleDurPart = int(time.Second / sampleDur) rtpPacketDur = uint32(sampleRate / sampleDurPart) ) + +type rtpStatsHandler struct { + h srtp.Handler + mon *stats.CallMonitor +} + +func (h *rtpStatsHandler) HandleRTP(p *rtp.Packet) error { + if h.mon != nil { + if typ, ok := rtpPacketType(p); ok { + h.mon.RTPPacketRecv(typ) + } + } + return h.h.HandleRTP(p) +} + +type rtpStatsWriter struct { + w srtp.Writer + mon *stats.CallMonitor +} + +func (h *rtpStatsWriter) WriteRTP(p *rtp.Packet) error { + if h.mon != nil { + if typ, ok := rtpPacketType(p); ok { + h.mon.RTPPacketSend(typ) + } + } + return h.w.WriteRTP(p) +} + +func rtpPacketType(p *rtp.Packet) (string, bool) { + switch p.PayloadType { + case 101: + if p.Marker { + return "dtmf", true + } + default: + if p.PayloadType == 0 { + return "audio", true + } else { + return strconv.Itoa(int(p.PayloadType)), true + } + } + return "", false +} diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go index 1b6c7294..630cdc49 100644 --- a/pkg/sip/outbound.go +++ b/pkg/sip/outbound.go @@ -27,6 +27,7 @@ import ( "github.com/livekit/sip/pkg/media" "github.com/livekit/sip/pkg/media/rtp" "github.com/livekit/sip/pkg/media/ulaw" + "github.com/livekit/sip/pkg/stats" ) type sipOutboundConfig struct { @@ -43,6 +44,7 @@ type outboundCall struct { rtpConn *MediaConn mu sync.RWMutex + mon *stats.CallMonitor mediaRunning bool lkCur lkRoomConfig lkRoom *Room @@ -87,11 +89,11 @@ func (c *Client) newCall(participantId string) *outboundCall { func (c *outboundCall) Close() error { c.mu.Lock() defer c.mu.Unlock() - c.close() + c.close("shutdown") return nil } -func (c *outboundCall) close() { +func (c *outboundCall) close(reason string) { c.rtpConn.OnRTP(nil) c.lkRoom.SetOutput(nil) @@ -107,7 +109,7 @@ func (c *outboundCall) close() { c.lkRoomIn = nil c.lkCur = lkRoomConfig{} - c.stopSIP() + c.stopSIP(reason) c.sipCur = sipOutboundConfig{} // FIXME: remove call from the client map? @@ -130,19 +132,20 @@ func (c *outboundCall) Update(ctx context.Context, sipNew sipOutboundConfig, lkN logger.Infow("Shutdown of outbound SIP call", "roomName", lkNew.roomName, "from", sipNew.from, "to", sipNew.to, "address", sipNew.address) // shutdown the call - c.close() + c.close("shutdown") return nil } + c.startMonitor(sipNew) if err := c.startMedia(conf); err != nil { - c.close() + c.close("media-failed") return fmt.Errorf("start media failed: %w", err) } if err := c.updateRoom(lkNew); err != nil { - c.close() + c.close("join-failed") return fmt.Errorf("update room failed: %w", err) } if err := c.updateSIP(sipNew); err != nil { - c.close() + c.close("invite-failed") return fmt.Errorf("update SIP failed: %w", err) } c.relinkMedia() @@ -151,6 +154,10 @@ func (c *outboundCall) Update(ctx context.Context, sipNew sipOutboundConfig, lkN return nil } +func (c *outboundCall) startMonitor(conf sipOutboundConfig) { + c.mon = c.c.mon.NewCall(stats.Outbound, conf.from, conf.to) +} + func (c *outboundCall) startMedia(conf *config.Config) error { if c.mediaRunning { return nil @@ -190,7 +197,7 @@ func (c *outboundCall) updateSIP(sipNew sipOutboundConfig) error { if c.sipCur == sipNew { return nil } - c.stopSIP() + c.stopSIP("update") if err := c.sipSignal(sipNew); err != nil { return err } @@ -206,12 +213,12 @@ func (c *outboundCall) relinkMedia() { return } // Encoding pipeline (LK -> SIP) - s := rtp.NewMediaStreamOut[ulaw.Sample](c.rtpConn, rtpPacketDur) + s := rtp.NewMediaStreamOut[ulaw.Sample](&rtpStatsWriter{mon: c.mon, w: c.rtpConn}, rtpPacketDur) c.lkRoom.SetOutput(ulaw.Encode(s)) // Decoding pipeline (SIP -> LK) law := ulaw.Decode(c.lkRoomIn) - c.rtpConn.OnRTP(rtp.NewMediaStreamIn(law)) + c.rtpConn.OnRTP(&rtpStatsHandler{mon: c.mon, h: rtp.NewMediaStreamIn(law)}) } func (c *outboundCall) SendDTMF(ctx context.Context, digits string) error { @@ -237,11 +244,15 @@ func sipResponse(tx sip.ClientTransaction) (*sip.Response, error) { } } -func (c *outboundCall) stopSIP() { +func (c *outboundCall) stopSIP(reason string) { if c.sipInviteReq != nil { if err := c.sipBye(); err != nil { logger.Errorw("SIP bye failed", err) } + if c.mon != nil { + c.mon.CallTerminate(reason) + c.mon.CallEnd() + } } c.sipInviteReq = nil c.sipInviteResp = nil @@ -254,21 +265,27 @@ func (c *outboundCall) sipSignal(conf sipOutboundConfig) error { if err != nil { return err } + c.mon.CallStart() + joinDur := c.mon.JoinDur() inviteReq, inviteResp, err := c.sipInvite(offer, conf) if err != nil { + c.mon.CallEnd() logger.Errorw("SIP invite failed", err) return err // TODO: should we retry? maybe new offer will work } err = c.sipAccept(inviteReq, inviteResp) if err != nil { + c.mon.CallEnd() logger.Errorw("SIP accept failed", err) return err } + joinDur() c.sipInviteReq, c.sipInviteResp = inviteReq, inviteResp return nil } func (c *outboundCall) sipAttemptInvite(offer []byte, conf sipOutboundConfig, authHeader string) (*sip.Request, *sip.Response, error) { + c.mon.InviteReq() to := &sip.Uri{User: conf.to, Host: conf.address, Port: 5060} from := &sip.Uri{User: conf.from, Host: c.c.signalingIp, Port: 5060} req := sip.NewRequest(sip.INVITE, to) @@ -286,11 +303,15 @@ func (c *outboundCall) sipAttemptInvite(offer []byte, conf sipOutboundConfig, au tx, err := c.c.sipCli.TransactionRequest(req) if err != nil { + c.mon.InviteError("tx-failed") return nil, nil, err } defer tx.Terminate() resp, err := sipResponse(tx) + if err != nil { + c.mon.InviteError("tx-failed") + } return req, resp, err } @@ -303,8 +324,10 @@ func (c *outboundCall) sipInvite(offer []byte, conf sipOutboundConfig) (*sip.Req } switch resp.StatusCode { default: + c.mon.InviteError(fmt.Sprintf("status-%d", resp.StatusCode)) return nil, nil, fmt.Errorf("Unexpected StatusCode from INVITE response %d", resp.StatusCode) case 400: + c.mon.InviteError("status-400") var reason string if body := resp.Body(); len(body) != 0 { reason = string(body) @@ -316,9 +339,11 @@ func (c *outboundCall) sipInvite(offer []byte, conf sipOutboundConfig) (*sip.Req } return nil, nil, fmt.Errorf("INVITE failed with status %d", resp.StatusCode) case 200: + c.mon.InviteAccept() return req, resp, nil case 407: // auth required + c.mon.InviteError("auth-required") } if conf.user == "" || conf.pass == "" { return nil, nil, fmt.Errorf("Server responded with 407, but no username or password was provided") diff --git a/pkg/stats/monitor.go b/pkg/stats/monitor.go index 6228dc1f..1f0f40f3 100644 --- a/pkg/stats/monitor.go +++ b/pkg/stats/monitor.go @@ -15,18 +15,43 @@ package stats import ( + "time" + "github.com/frostbyte73/core" "github.com/prometheus/client_golang/prometheus" "github.com/livekit/sip/pkg/config" ) +var durBuckets = []float64{ + // TODO +} + +type CallDir bool + +func (d CallDir) String() string { + if d == Inbound { + return "inbound" + } + return "outbound" +} + +const ( + Inbound = CallDir(false) + Outbound = CallDir(true) +) + type Monitor struct { - inviteReqRaw prometheus.Counter - inviteReq *prometheus.CounterVec - inviteAccept *prometheus.CounterVec - inviteErr *prometheus.CounterVec - callsActive *prometheus.GaugeVec + inviteReqRaw prometheus.Counter + inviteReq *prometheus.CounterVec + inviteAccept *prometheus.CounterVec + inviteErr *prometheus.CounterVec + callsActive *prometheus.GaugeVec + callsTerminated *prometheus.CounterVec + packetsRTP *prometheus.CounterVec + durSession *prometheus.HistogramVec + durCall *prometheus.HistogramVec + durJoin *prometheus.HistogramVec started core.Fuse shutdown core.Fuse @@ -44,6 +69,7 @@ func (m *Monitor) Start(conf *config.Config) error { Namespace: "livekit", Subsystem: "sip", Name: "invite_requests_raw", + Help: "Number of unvalidated SIP INVITE requests received", ConstLabels: prometheus.Labels{"node_id": conf.NodeID}, }) @@ -51,31 +77,81 @@ func (m *Monitor) Start(conf *config.Config) error { Namespace: "livekit", Subsystem: "sip", Name: "invite_requests", + Help: "Number of valid SIP INVITE requests received", ConstLabels: prometheus.Labels{"node_id": conf.NodeID}, - }, []string{"type", "from", "to"}) + }, []string{"dir", "from", "to"}) m.inviteAccept = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "livekit", Subsystem: "sip", Name: "invite_accepted", + Help: "Number of accepted SIP INVITE requests (that matched a trunk and passed auth)", ConstLabels: prometheus.Labels{"node_id": conf.NodeID}, - }, []string{"type", "from", "to"}) + }, []string{"dir", "from", "to"}) m.inviteErr = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "livekit", Subsystem: "sip", Name: "invite_error", + Help: "Number of rejected SIP INVITE requests", ConstLabels: prometheus.Labels{"node_id": conf.NodeID}, - }, []string{"type", "from", "to", "reason"}) + }, []string{"dir", "from", "to", "reason"}) m.callsActive = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "livekit", Subsystem: "sip", Name: "calls_active", + Help: "Number of currently active SIP calls", + ConstLabels: prometheus.Labels{"node_id": conf.NodeID}, + }, []string{"dir", "from", "to"}) + + m.callsTerminated = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "livekit", + Subsystem: "sip", + Name: "calls_terminated", + Help: "Number of calls terminated by SIP bridge", + ConstLabels: prometheus.Labels{"node_id": conf.NodeID}, + }, []string{"dir", "from", "to", "reason"}) + + m.packetsRTP = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "livekit", + Subsystem: "sip", + Name: "packets_rtp", + Help: "Number of RTP packets sent or received by SIP bridge", + ConstLabels: prometheus.Labels{"node_id": conf.NodeID}, + }, []string{"dir", "from", "to", "op", "payload"}) + + m.durSession = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "livekit", + Subsystem: "sip", + Name: "dur_session_sec", + Help: "SIP session duration (from INVITE to closed)", + ConstLabels: prometheus.Labels{"node_id": conf.NodeID}, + Buckets: durBuckets, + }, []string{"dir", "from", "to"}) + + m.durCall = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "livekit", + Subsystem: "sip", + Name: "dur_call_sec", + Help: "SIP call duration (from successful pin to closed)", + ConstLabels: prometheus.Labels{"node_id": conf.NodeID}, + Buckets: durBuckets, + }, []string{"dir", "from", "to"}) + + m.durJoin = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "livekit", + Subsystem: "sip", + Name: "dur_join_sec", + Help: "SIP room join duration (from INVITE to mixed room audio)", ConstLabels: prometheus.Labels{"node_id": conf.NodeID}, - }, []string{"type", "from", "to"}) + Buckets: durBuckets, + }, []string{"dir", "from", "to"}) - prometheus.MustRegister(m.inviteReqRaw, m.inviteReq, m.inviteAccept, m.inviteErr, m.callsActive) + prometheus.MustRegister( + m.inviteReqRaw, m.inviteReq, m.inviteAccept, m.inviteErr, m.callsActive, m.callsTerminated, + m.packetsRTP, m.durSession, m.durCall, m.durJoin, + ) m.started.Break() @@ -92,6 +168,11 @@ func (m *Monitor) Stop() { prometheus.Unregister(m.inviteAccept) prometheus.Unregister(m.inviteErr) prometheus.Unregister(m.callsActive) + prometheus.Unregister(m.callsTerminated) + prometheus.Unregister(m.packetsRTP) + prometheus.Unregister(m.durSession) + prometheus.Unregister(m.durCall) + prometheus.Unregister(m.durJoin) } func (m *Monitor) CanAccept() bool { @@ -102,34 +183,73 @@ func (m *Monitor) CanAccept() bool { return true } -func (m *Monitor) InviteReqRaw(outbound bool) { +func (m *Monitor) InviteReqRaw(dir CallDir) { m.inviteReqRaw.Inc() } -func callType(outbound bool) string { - typ := "inbound" - if outbound { - typ = "outbound" +func (m *Monitor) NewCall(dir CallDir, from, to string) *CallMonitor { + return &CallMonitor{ + m: m, + dir: dir, + from: from, + to: to, } - return typ } -func (m *Monitor) InviteReq(outbound bool, from, to string) { - m.inviteReq.With(prometheus.Labels{"type": callType(outbound), "from": from, "to": to}).Inc() +type CallMonitor struct { + m *Monitor + dir CallDir + from, to string +} + +func (c *CallMonitor) labels(l prometheus.Labels) prometheus.Labels { + out := prometheus.Labels{"dir": c.dir.String(), "from": c.from, "to": c.to} + for k, v := range l { + out[k] = v + } + return out +} + +func (c *CallMonitor) InviteReq() { + c.m.inviteReq.With(c.labels(nil)).Inc() +} + +func (c *CallMonitor) InviteAccept() { + c.m.inviteAccept.With(c.labels(nil)).Inc() +} + +func (c *CallMonitor) InviteError(reason string) { + c.m.inviteErr.With(c.labels(prometheus.Labels{"reason": reason})).Inc() +} + +func (c *CallMonitor) CallStart() { + c.m.callsActive.With(c.labels(nil)).Inc() +} + +func (c *CallMonitor) CallEnd() { + c.m.callsActive.With(c.labels(nil)).Dec() +} + +func (c *CallMonitor) CallTerminate(reason string) { + c.m.callsTerminated.With(c.labels(prometheus.Labels{"reason": reason})).Inc() +} + +func (c *CallMonitor) RTPPacketSend(payloadType string) { + c.m.packetsRTP.With(c.labels(prometheus.Labels{"op": "send", "payload": payloadType})).Inc() } -func (m *Monitor) InviteAccept(outbound bool, from, to string) { - m.inviteAccept.With(prometheus.Labels{"type": callType(outbound), "from": from, "to": to}).Inc() +func (c *CallMonitor) RTPPacketRecv(payloadType string) { + c.m.packetsRTP.With(c.labels(prometheus.Labels{"op": "recv", "payload": payloadType})).Inc() } -func (m *Monitor) InviteError(outbound bool, from, to string, reason string) { - m.inviteErr.With(prometheus.Labels{"type": callType(outbound), "from": from, "to": to, "reason": reason}).Inc() +func (c *CallMonitor) SessionDur() func() time.Duration { + return prometheus.NewTimer(c.m.durSession.With(c.labels(nil))).ObserveDuration } -func (m *Monitor) CallStart(outbound bool, from, to string) { - m.callsActive.With(prometheus.Labels{"type": callType(outbound), "from": from, "to": to}).Inc() +func (c *CallMonitor) CallDur() func() time.Duration { + return prometheus.NewTimer(c.m.durCall.With(c.labels(nil))).ObserveDuration } -func (m *Monitor) CallEnd(outbound bool, from, to string) { - m.callsActive.With(prometheus.Labels{"type": callType(outbound), "from": from, "to": to}).Dec() +func (c *CallMonitor) JoinDur() func() time.Duration { + return prometheus.NewTimer(c.m.durJoin.With(c.labels(nil))).ObserveDuration }