diff --git a/example/proxysip/main_test.go b/example/proxysip/main_test.go index de10730..36a6c4e 100644 --- a/example/proxysip/main_test.go +++ b/example/proxysip/main_test.go @@ -87,7 +87,7 @@ func inviteScenario(t testing.TB, client1, client2 fakes.TestConnection, p *pars //RECEIVE INVITE { res := client2.TestReadConn(t) - inviteReqRec, err := p.Parse(res) + inviteReqRec, err := p.ParseSIP(res) require.Nil(t, err) assert.Equal(t, inviteReqRec.(*sip.Request).StartLine(), inviteReq.(*sip.Request).StartLine()) @@ -110,7 +110,7 @@ func inviteScenario(t testing.TB, client1, client2 fakes.TestConnection, p *pars for { res := client2.TestReadConn(t) - resReceived, err := p.Parse(res) + resReceived, err := p.ParseSIP(res) if req, ok := resReceived.(*sip.Request); ok && req.IsInvite() { continue } @@ -143,12 +143,12 @@ func inviteScenario(t testing.TB, client1, client2 fakes.TestConnection, p *pars t.Log("CLIENT1: Send INVITE") res := client1.TestRequest(t, []byte(inviteReq.String())) t.Log("CLIENT1 INVITE: Got response") - trying, err := p.Parse(res) + trying, err := p.ParseSIP(res) require.Nil(t, err) assert.Equal(t, "SIP/2.0 100 Trying", trying.(*sip.Response).StartLine()) res = client1.TestReadConn(t) - inviteOK, err := p.Parse(res) + inviteOK, err := p.ParseSIP(res) require.Nil(t, err) assert.Equal(t, "SIP/2.0 200 OK", inviteOK.(*sip.Response).StartLine()) } @@ -165,7 +165,7 @@ func inviteScenario(t testing.TB, client1, client2 fakes.TestConnection, p *pars t.Log("CLIENT1: Send BYE") res := client1.TestRequest(t, []byte(byeReq.String())) t.Log("CLIENT1 BYE: Got response") - byeOK, err := p.Parse(res) + byeOK, err := p.ParseSIP(res) require.Nil(t, err) assert.Equal(t, "SIP/2.0 200 OK", byeOK.(*sip.Response).StartLine()) } @@ -362,7 +362,7 @@ func TestRegisterTCP(t *testing.T) { }) res := client1.TestRequest(t, []byte(reg.String())) - res200, err := p.Parse(res) + res200, err := p.ParseSIP(res) require.Nil(t, err) t.Log(res200.String()) assert.Equal(t, "SIP/2.0 200 OK", res200.(*sip.Response).StartLine()) diff --git a/parser/parser.go b/parser/parser.go index a6f0a51..a4faffb 100644 --- a/parser/parser.go +++ b/parser/parser.go @@ -1,4 +1,3 @@ -// Originaly forked from github.com/StefanKopieczek/gossip by @StefanKopieczek package parser import ( @@ -37,24 +36,13 @@ var bufReader = sync.Pool{ }, } -// The buffer size of the parser input channel. -// SipParser is interface for decoding full message into sip message -type SIPParser interface { - Parse(data []byte) (sip.Message, error) -} - -// SIPParserStreamed is parser that allows streamed data for parsing -type SIPParserStreamed interface { - Write(data []byte) (int, error) - Output() chan sip.Message -} - -// Parse a SIP message by creating a parser on the fly. func ParseMessage(msgData []byte) (sip.Message, error) { parser := NewParser() - return parser.Parse(msgData) + return parser.ParseSIP(msgData) } +// Parser is implementation of sip.SIPParser +// It is optimized with faster header parsing type Parser struct { log zerolog.Logger // HeadersParsers uses default list of headers to be parsed. Smaller list parser will be faster @@ -96,8 +84,8 @@ func WithHeadersParsers(m map[string]HeaderParser) ParserOption { } } -// Parse converts data to sip message. Buffer must contain full sip message -func (p *Parser) Parse(data []byte) (msg sip.Message, err error) { +// ParseSIP converts data to sip message. Buffer must contain full sip message +func (p *Parser) ParseSIP(data []byte) (msg sip.Message, err error) { reader := bufReader.Get().(*bytes.Buffer) defer bufReader.Put(reader) reader.Reset() @@ -125,7 +113,7 @@ func (p *Parser) Parse(data []byte) (msg sip.Message, err error) { break } - header, err := p.ParseHeader(line) + header, err := p.parseHeader(line) if err == nil { msg.AppendHeader(header) } else { @@ -160,6 +148,32 @@ func (p *Parser) Parse(data []byte) (msg sip.Message, err error) { return msg, nil } +func (p *Parser) parseHeader(headerText string) (header sip.Header, err error) { + // p.log.Tracef("parsing header \"%s\"", headerText) + + colonIdx := strings.Index(headerText, ":") + if colonIdx == -1 { + err = fmt.Errorf("field name with no value in header: %s", headerText) + return + } + + fieldName := strings.TrimSpace(headerText[:colonIdx]) + lowerFieldName := sip.HeaderToLower(fieldName) + fieldText := strings.TrimSpace(headerText[colonIdx+1:]) + if headerParser, ok := p.headersParsers[lowerFieldName]; ok { + // We have a registered parser for this header type - use it. + // header, err = headerParser(lowerFieldName, fieldText) + header, err = headerParser(lowerFieldName, fieldText) + } else { + // We have no registered parser for this header type, + // so we encapsulate the header data in a GenericHeader struct. + // p.log.Tracef("no parser for header type %s", fieldName) + header = sip.NewHeader(fieldName, fieldText) + } + + return +} + func ParseLine(startLine string) (msg sip.Message, err error) { if isRequest(startLine) { recipient := sip.Uri{} @@ -330,29 +344,3 @@ func ParseStatusLine(statusLine string) ( return } - -func (p *Parser) ParseHeader(headerText string) (header sip.Header, err error) { - // p.log.Tracef("parsing header \"%s\"", headerText) - - colonIdx := strings.Index(headerText, ":") - if colonIdx == -1 { - err = fmt.Errorf("field name with no value in header: %s", headerText) - return - } - - fieldName := strings.TrimSpace(headerText[:colonIdx]) - lowerFieldName := sip.HeaderToLower(fieldName) - fieldText := strings.TrimSpace(headerText[colonIdx+1:]) - if headerParser, ok := p.headersParsers[lowerFieldName]; ok { - // We have a registered parser for this header type - use it. - // header, err = headerParser(lowerFieldName, fieldText) - header, err = headerParser(lowerFieldName, fieldText) - } else { - // We have no registered parser for this header type, - // so we encapsulate the header data in a GenericHeader struct. - // p.log.Tracef("no parser for header type %s", fieldName) - header = sip.NewHeader(fieldName, fieldText) - } - - return -} diff --git a/parser/parser_test.go b/parser/parser_test.go index 0c94a88..c506227 100644 --- a/parser/parser_test.go +++ b/parser/parser_test.go @@ -91,7 +91,7 @@ func TestParseHeaders(t *testing.T) { t.Run("ViaHeader", func(t *testing.T) { branch := sip.GenerateBranch() header := "Via: SIP/2.0/UDP 127.0.0.2:5060;rport;branch=" + branch - h, err := parser.ParseHeader(header) + h, err := parser.parseHeader(header) require.Nil(t, err) hstr := h.String() @@ -102,7 +102,7 @@ func TestParseHeaders(t *testing.T) { t.Run("ToHeader", func(t *testing.T) { header := "To: \"Bob\" ;xxx=xxx;yyyy=yyyy" - h, err := parser.ParseHeader(header) + h, err := parser.parseHeader(header) require.Nil(t, err) hstr := h.String() @@ -112,7 +112,7 @@ func TestParseHeaders(t *testing.T) { t.Run("FromHeader", func(t *testing.T) { header := "From: \"Bob\" " - h, err := parser.ParseHeader(header) + h, err := parser.parseHeader(header) require.Nil(t, err) hstr := h.String() @@ -126,7 +126,7 @@ func TestParseHeaders(t *testing.T) { "Contact: ": "Contact: ", // "m: ;reg-id=1;+sip.instance=\"\"": "Contact: ;reg-id=1;+sip.instance=\"\"", } { - h, err := parser.ParseHeader(header) + h, err := parser.parseHeader(header) require.Nil(t, err) assert.IsType(t, &sip.ContactHeader{}, h) @@ -137,7 +137,7 @@ func TestParseHeaders(t *testing.T) { t.Run("RouteHeader", func(t *testing.T) { header := "Route: " - h, err := parser.ParseHeader(header) + h, err := parser.parseHeader(header) require.Nil(t, err, err) hstr := h.String() @@ -147,7 +147,7 @@ func TestParseHeaders(t *testing.T) { t.Run("RecordRouteHeader", func(t *testing.T) { header := "Record-Route: " - h, err := parser.ParseHeader(header) + h, err := parser.parseHeader(header) require.Nil(t, err, err) hstr := h.String() @@ -157,7 +157,7 @@ func TestParseHeaders(t *testing.T) { t.Run("MaxForwards", func(t *testing.T) { header := "Max-Forwards: 70" - h, err := parser.ParseHeader(header) + h, err := parser.parseHeader(header) require.Nil(t, err, err) exp := sip.MaxForwardsHeader(70) @@ -256,7 +256,7 @@ func TestParseRequest(t *testing.T) { "INVITE sip:10.5.0.10:5060;transport=udp SIP/2.0\r\nContent-Length: 0\r\n\n", "INVITE sip:10.5.0.10:5060;transport=udp SIP/2.0\r\nContent-Length: 10\r\nabcd\nefgh", } { - _, err := parser.Parse([]byte(msgstr)) + _, err := parser.ParseSIP([]byte(msgstr)) assert.Equal(t, ErrLineNoCRLF, err) } }) @@ -301,7 +301,7 @@ func TestParseRequest(t *testing.T) { msgstr := strings.Join(rawMsg, "\r\n") - msg, err := parser.Parse([]byte(msgstr)) + msg, err := parser.ParseSIP([]byte(msgstr)) require.Nil(t, err) from, exists := msg.From() @@ -343,7 +343,7 @@ func TestRegisterRequestFail(t *testing.T) { data := []byte(strings.Join(rawMsg, "\r\n")) parser := NewParser() - msg, err := parser.Parse(data) + msg, err := parser.ParseSIP(data) require.Nil(t, err, err) req := msg.(*sip.Request) @@ -378,7 +378,7 @@ func BenchmarkParser(b *testing.B) { b.ResetTimer() testcase := func(b *testing.B) { for i := 0; i < b.N; i++ { - msg, err := parser.Parse(data) + msg, err := parser.ParseSIP(data) if err != nil { b.Fatal(err) } @@ -393,7 +393,7 @@ func BenchmarkParser(b *testing.B) { b.RunParallel(func(p *testing.PB) { i := 0 for p.Next() { - msg, err := parser.Parse(data) + msg, err := parser.ParseSIP(data) if err != nil { b.Fatal(err) } @@ -466,7 +466,7 @@ func BenchmarkParserNoData(b *testing.B) { b.Run("New", func(b *testing.B) { parser := NewParser() for i := 0; i < b.N; i++ { - parser.Parse(data) + parser.ParseSIP(data) } }) } diff --git a/server_test.go b/server_test.go index ee16360..7748162 100644 --- a/server_test.go +++ b/server_test.go @@ -185,7 +185,7 @@ func TestUDPUAS(t *testing.T) { rstr := req.String() data := client1.TestRequest(t, []byte(rstr)) - res, err := p.Parse(data) + res, err := p.ParseSIP(data) assert.Nil(t, err) assert.Equal(t, "SIP/2.0 200 OK", res.(*sip.Response).StartLine()) @@ -199,7 +199,7 @@ func TestUDPUAS(t *testing.T) { req := createSimpleRequest("NONALLOWED", sender, recipment, "UDP") data := client1.TestRequest(t, []byte(req.String())) - res, err := p.Parse(data) + res, err := p.ParseSIP(data) assert.Nil(t, err) assert.Equal(t, "SIP/2.0 405 Method Not Allowed", res.(*sip.Response).StartLine()) @@ -284,7 +284,7 @@ func TestTCPUAS(t *testing.T) { rstr := req.String() data := client1.TestRequest(t, []byte(rstr)) - res, err := p.Parse(data) + res, err := p.ParseSIP(data) assert.Nil(t, err) assert.Equal(t, "SIP/2.0 200 OK", res.(*sip.Response).StartLine()) } @@ -297,7 +297,7 @@ func TestTCPUAS(t *testing.T) { req := createSimpleRequest("NONALLOWED", sender, recipment, "TCP") data := client1.TestRequest(t, []byte(req.String())) - res, err := p.Parse(data) + res, err := p.ParseSIP(data) assert.Nil(t, err) assert.Equal(t, "SIP/2.0 405 Method Not Allowed", res.(*sip.Response).StartLine()) diff --git a/sip/sip.go b/sip/sip.go index 692be04..c853ba6 100644 --- a/sip/sip.go +++ b/sip/sip.go @@ -20,6 +20,12 @@ const ( RFC3261BranchMagicCookie = "z9hG4bK" ) +// The buffer size of the parser input channel. +// Parser is interface for decoding full message into sip message +type Parser interface { + ParseSIP(data []byte) (Message, error) +} + // GenerateBranch returns random unique branch ID. func GenerateBranch() string { return GenerateBranchN(16) diff --git a/transport/layer.go b/transport/layer.go index 9aaf332..e4375cd 100644 --- a/transport/layer.go +++ b/transport/layer.go @@ -36,7 +36,9 @@ type Layer struct { log zerolog.Logger - parser *parser.Parser + // Parser used by transport layer. It can be overrided before setuping network transports + Parser sip.Parser + // ConnectionReuse will force connection reuse when passing request ConnectionReuse bool } @@ -50,7 +52,7 @@ func NewLayer( transports: make(map[string]Transport), listenPorts: make(map[string][]int), dnsResolver: dnsResolver, - parser: parser.NewParser(), + Parser: parser.NewParser(), ConnectionReuse: true, } @@ -97,7 +99,7 @@ func (l *Layer) ServeUDP(c net.PacketConn) error { return err } - transport := NewUDPTransport(c.LocalAddr().String(), l.parser) + transport := NewUDPTransport(c.LocalAddr().String(), l.Parser) l.addTransport(transport, "udp", port) return transport.Serve(c, l.handleMessage) @@ -110,7 +112,7 @@ func (l *Layer) ServeTCP(c net.Listener) error { return err } - transport := NewTCPTransport(c.Addr().String(), l.parser) + transport := NewTCPTransport(c.Addr().String(), l.Parser) l.addTransport(transport, "tcp", port) return transport.Serve(c, l.handleMessage) @@ -123,7 +125,7 @@ func (l *Layer) ServeWS(c net.Listener) error { return err } - transport := NewWSTransport(c.Addr().String(), l.parser) + transport := NewWSTransport(c.Addr().String(), l.Parser) l.addTransport(transport, "ws", port) return transport.Serve(c, l.handleMessage) @@ -136,7 +138,7 @@ func (l *Layer) ServeTLS(c net.Listener, conf *tls.Config) error { return err } - transport := NewTLSTransport(c.Addr().String(), l.parser, conf) + transport := NewTLSTransport(c.Addr().String(), l.Parser, conf) l.addTransport(transport, "tls", port) return transport.Serve(c, l.handleMessage) @@ -156,7 +158,7 @@ func (l *Layer) ListenAndServe(ctx context.Context, network string, addr string) return ErrNetworkExists } - p := l.parser + p := l.Parser var t Transport switch network { case "udp": @@ -192,7 +194,7 @@ func (l *Layer) ListenAndServeTLS(ctx context.Context, network string, addr stri return ErrNetworkExists } - p := l.parser + p := l.Parser var t Transport switch network { case "tcp", "tls": diff --git a/transport/tcp.go b/transport/tcp.go index 2cf049d..44c38c9 100644 --- a/transport/tcp.go +++ b/transport/tcp.go @@ -8,7 +8,6 @@ import ( "net" "sync" - "github.com/emiago/sipgo/parser" "github.com/emiago/sipgo/sip" "github.com/rs/zerolog" @@ -22,14 +21,14 @@ type TCPTransport struct { addr string transport string listener net.Listener - parser parser.SIPParser + parser sip.Parser handler sip.MessageHandler log zerolog.Logger pool ConnectionPool } -func NewTCPTransport(addr string, par parser.SIPParser) *TCPTransport { +func NewTCPTransport(addr string, par sip.Parser) *TCPTransport { p := &TCPTransport{ addr: addr, parser: par, @@ -215,7 +214,7 @@ func (t *TCPTransport) parse(data []byte, src string) { } } - msg, err := t.parser.Parse(data) //Very expensive operation + msg, err := t.parser.ParseSIP(data) //Very expensive operation if err != nil { t.log.Error().Err(err).Str("data", string(data)).Msg("failed to parse") return diff --git a/transport/tls.go b/transport/tls.go index 184e817..57678b2 100644 --- a/transport/tls.go +++ b/transport/tls.go @@ -5,7 +5,6 @@ import ( "fmt" "net" - "github.com/emiago/sipgo/parser" "github.com/emiago/sipgo/sip" "github.com/rs/zerolog/log" @@ -21,7 +20,7 @@ type TLSTransport struct { tlsConf *tls.Config } -func NewTLSTransport(addr string, par parser.SIPParser, tlsConf *tls.Config) *TLSTransport { +func NewTLSTransport(addr string, par sip.Parser, tlsConf *tls.Config) *TLSTransport { tcptrans := NewTCPTransport(addr, par) tcptrans.transport = TransportTLS //Override transport p := &TLSTransport{ diff --git a/transport/udp.go b/transport/udp.go index e700a6b..cb8703b 100644 --- a/transport/udp.go +++ b/transport/udp.go @@ -6,7 +6,6 @@ import ( "fmt" "net" - "github.com/emiago/sipgo/parser" "github.com/emiago/sipgo/sip" "github.com/rs/zerolog" @@ -30,14 +29,14 @@ type UDPTransport struct { // listener *net.UDPConn addr string listener net.PacketConn - parser parser.SIPParser + parser sip.Parser handler sip.MessageHandler conn *UDPConnection log zerolog.Logger } -func NewUDPTransport(addr string, par parser.SIPParser) *UDPTransport { +func NewUDPTransport(addr string, par sip.Parser) *UDPTransport { p := &UDPTransport{ addr: addr, parser: par, @@ -198,7 +197,7 @@ func (t *UDPTransport) parse(data []byte, src string) { } } - msg, err := t.parser.Parse(data) //Very expensive operation + msg, err := t.parser.ParseSIP(data) //Very expensive operation if err != nil { t.log.Error().Err(err).Str("data", string(data)).Msg("failed to parse") return diff --git a/transport/ws.go b/transport/ws.go index f01b5b7..675f892 100644 --- a/transport/ws.go +++ b/transport/ws.go @@ -9,7 +9,6 @@ import ( "net" "sync" - "github.com/emiago/sipgo/parser" "github.com/emiago/sipgo/sip" "github.com/gobwas/ws" @@ -23,14 +22,14 @@ var () type WSTransport struct { addr string listener net.Listener - parser parser.SIPParser + parser sip.Parser handler sip.MessageHandler log zerolog.Logger pool ConnectionPool } -func NewWSTransport(addr string, par parser.SIPParser) *WSTransport { +func NewWSTransport(addr string, par sip.Parser) *WSTransport { p := &WSTransport{ addr: addr, parser: par, @@ -165,7 +164,7 @@ func (t *WSTransport) parse(data []byte, src string) { } } - msg, err := t.parser.Parse(data) //Very expensive operation + msg, err := t.parser.ParseSIP(data) //Very expensive operation if err != nil { t.log.Error().Err(err).Str("data", string(data)).Msg("failed to parse") return