diff --git a/backend/backend.go b/backend/backend.go new file mode 100644 index 00000000..cd711b00 --- /dev/null +++ b/backend/backend.go @@ -0,0 +1,73 @@ +package backend + +import ( + "context" + "crypto/rand" + "crypto/rsa" + "crypto/tls" + "crypto/x509" + "encoding/pem" + "io" + "io/ioutil" + "math/big" + "net" + "time" +) + +type QuicBackend interface { + Dial(ctx context.Context, remoteAddress string, port int) (QuicBackendConnection, error) + Listen(ctx context.Context, address string, port int) (QuicBackendConnection, error) + Close() error +} + +type QuicBackendConnection interface { + // LocalAddr returns the local address. + LocalAddr() net.Addr + // RemoteAddr returns the address of the peer. + RemoteAddr() net.Addr + OpenStream(context.Context) (QuicBackendStream, error) + AcceptStream(context.Context) (QuicBackendStream, error) + AcceptConnection(context.Context) (QuicBackendConnection, error) + Close(code int, message string) error +} + +type QuicBackendStream interface { + io.Reader + io.Writer + io.Closer + + ID() uint64 + + AbortRead(code uint64) + AbortWrite(code uint64) + + SetReadDeadline(t time.Time) error + SetWriteDeadline(t time.Time) error +} + +// generateTLSConfig creates a new x509 key/certificate pair and dumps it to the disk +func generateTLSConfig(fileprefix string) *tls.Config { + key, err := rsa.GenerateKey(rand.Reader, 2048) + if err != nil { + panic(err) + } + template := x509.Certificate{SerialNumber: big.NewInt(1)} + certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &key.PublicKey, key) + if err != nil { + panic(err) + } + keyPEM := pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(key)}) + certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certDER}) + + ioutil.WriteFile(fileprefix+"_key.pem", keyPEM, 0777) + ioutil.WriteFile(fileprefix+"_cert.pem", certPEM, 0777) + + tlsCert, err := tls.X509KeyPair(certPEM, keyPEM) + if err != nil { + panic(err) + } + return &tls.Config{ + Certificates: []tls.Certificate{tlsCert}, + NextProtos: []string{"qpep"}, + } +} diff --git a/backend/backend_gen_linux.go b/backend/backend_gen_linux.go new file mode 100644 index 00000000..efc9ecba --- /dev/null +++ b/backend/backend_gen_linux.go @@ -0,0 +1,3 @@ +//go:generate bash -c "cd quicly-go && go generate -v ./..." + +package backend diff --git a/backend/backend_gen_windows.go b/backend/backend_gen_windows.go new file mode 100644 index 00000000..5da3764f --- /dev/null +++ b/backend/backend_gen_windows.go @@ -0,0 +1,3 @@ +//go:generate cmd /c "cd quicly-go && go generate -v ./..." + +package backend diff --git a/backend/backend_quicgo.go b/backend/backend_quicgo.go new file mode 100644 index 00000000..4bf6dd68 --- /dev/null +++ b/backend/backend_quicgo.go @@ -0,0 +1,200 @@ +//go:build !no_quicgo_backend + +package backend + +import ( + "context" + "crypto/tls" + "fmt" + "github.com/Project-Faster/quic-go" + "github.com/parvit/qpep/logger" + "github.com/parvit/qpep/shared" + "net" + "time" +) + +const ( + QUICGO_BACKEND = "quic-go" +) + +var qgoBackend QuicBackend = &quicGoBackend{} + +func init() { + Register(QUICGO_BACKEND, qgoBackend) +} + +type quicGoBackend struct { + connections []QuicBackendConnection +} + +func (q *quicGoBackend) Dial(ctx context.Context, destination string, port int) (QuicBackendConnection, error) { + quicConfig := qgoGetConfiguration() + + var err error + var session quic.Connection + tlsConf := &tls.Config{InsecureSkipVerify: true, NextProtos: []string{"qpep"}} + gatewayPath := fmt.Sprintf("%s:%d", destination, port) + + logger.Info("== Dialing QUIC Session: %s ==\n", gatewayPath) + session, err = quic.DialAddr(gatewayPath, tlsConf, quicConfig) + if err != nil { + logger.Error("Unable to Dial QUIC Session: %v\n", err) + return nil, shared.ErrFailedGatewayConnect + } + + sessionAdapter := &qgoConnectionAdapter{ + context: ctx, + connection: session, + } + + logger.Info("== QUIC Session Dial ==\n") + q.connections = append(q.connections, sessionAdapter) + return sessionAdapter, nil +} + +func (q *quicGoBackend) Listen(ctx context.Context, address string, port int) (QuicBackendConnection, error) { + quicConfig := qgoGetConfiguration() + + tlsConf := generateTLSConfig("server") + + conn, err := quic.ListenAddr(fmt.Sprintf("%s:%d", address, port), tlsConf, quicConfig) + if err != nil { + logger.Error("Failed to listen on QUIC session: %v\n", err) + return nil, shared.ErrFailedGatewayConnect + } + + return &qgoConnectionAdapter{ + context: ctx, + listener: conn, + }, err +} + +func (q *quicGoBackend) Close() error { + for _, conn := range q.connections { + _ = conn.Close(0, "") + } + q.connections = nil + logger.Info("== QUIC Session Closed ==\n") + return nil +} + +func qgoGetConfiguration() *quic.Config { + return &quic.Config{ + MaxIncomingStreams: 1024, + DisablePathMTUDiscovery: true, + + HandshakeIdleTimeout: shared.GetScaledTimeout(10, time.Second), + //KeepAlivePeriod: 1 * time.Second, + + EnableDatagrams: true, + } +} + +type qgoConnectionAdapter struct { + context context.Context + listener quic.Listener + connection quic.Connection +} + +func (c *qgoConnectionAdapter) LocalAddr() net.Addr { + if c.connection != nil { + return c.connection.LocalAddr() + } + if c.listener != nil { + return c.listener.Addr() + } + panic(shared.ErrInvalidBackendOperation) +} + +func (c *qgoConnectionAdapter) RemoteAddr() net.Addr { + if c.connection != nil { + return c.connection.RemoteAddr() + } + if c.listener != nil { + return c.listener.Addr() + } + panic(shared.ErrInvalidBackendOperation) +} + +func (c *qgoConnectionAdapter) AcceptStream(ctx context.Context) (QuicBackendStream, error) { + if c.connection != nil { + stream, err := c.connection.AcceptStream(ctx) + return &qgoStreamAdapter{ + Stream: stream, + }, err + } + panic(shared.ErrInvalidBackendOperation) +} + +func (c *qgoConnectionAdapter) OpenStream(ctx context.Context) (QuicBackendStream, error) { + if c.connection != nil { + stream, err := c.connection.OpenStreamSync(ctx) + return &qgoStreamAdapter{ + Stream: stream, + }, err + } + panic(shared.ErrInvalidBackendOperation) +} + +func (c *qgoConnectionAdapter) AcceptConnection(ctx context.Context) (QuicBackendConnection, error) { + if c.listener != nil { + conn, err := c.listener.Accept(ctx) + if err != nil { + return nil, err + } + cNew := &qgoConnectionAdapter{ + context: ctx, + listener: c.listener, + connection: conn, + } + return cNew, nil + } + panic(shared.ErrInvalidBackendOperation) +} + +func (c *qgoConnectionAdapter) Close(code int, message string) error { + if c.connection != nil { + return c.connection.CloseWithError(quic.ApplicationErrorCode(code), message) + } + if c.listener != nil { + return c.listener.Close() + } + panic(shared.ErrInvalidBackendOperation) +} + +var _ QuicBackendConnection = &qgoConnectionAdapter{} + +type qgoStreamAdapter struct { + quic.Stream + + id *uint64 +} + +func (stream *qgoStreamAdapter) AbortRead(code uint64) { + stream.CancelRead(quic.StreamErrorCode(code)) +} + +func (stream *qgoStreamAdapter) AbortWrite(code uint64) { + stream.CancelWrite(quic.StreamErrorCode(code)) +} + +func (stream *qgoStreamAdapter) ID() uint64 { + if stream.id != nil { + return *stream.id + } + var sendStream quic.SendStream = stream + if sendStream != nil { + stream.id = new(uint64) + *stream.id = uint64(sendStream.StreamID()) + return *stream.id + } + var recvStream quic.ReceiveStream = stream + if recvStream != nil { + stream.id = new(uint64) + *stream.id = uint64(recvStream.StreamID()) + return *stream.id + } + return 0 +} + +var _ QuicBackendStream = &qgoStreamAdapter{} diff --git a/backend/factory.go b/backend/factory.go new file mode 100644 index 00000000..5021b807 --- /dev/null +++ b/backend/factory.go @@ -0,0 +1,35 @@ +package backend + +import ( + _ "github.com/Project-Faster/quic-go" + "strings" +) + +var bcRegister map[string]QuicBackend +var bcList []string +var bcDefaultBackend = QUICGO_BACKEND + +func Register(key string, backend QuicBackend) { + if bcRegister == nil { + bcRegister = make(map[string]QuicBackend) + bcList = make([]string, 0, 8) + } + key = strings.ToLower(key) + if _, ok := bcRegister[key]; !ok { + bcRegister[strings.ToLower(key)] = backend + bcList = append(bcList, key) + return + } +} + +func Get(key string) (QuicBackend, bool) { + val, ok := bcRegister[key] + if !ok { + return bcRegister[bcDefaultBackend], true + } + return val, ok +} + +func List() []string { + return append([]string{}, bcList...) +} diff --git a/go.mod b/go.mod index cb9c6cac..6fc0a091 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,7 @@ module github.com/parvit/qpep go 1.18 -// replace github.com/Project-Faster/quic-go => C:\\home\\dev\\src\\github.com\\parvit\\faster-quic-go +replace github.com/Project-Faster/quicly-go => ./backend/quicly-go require ( github.com/davecgh/go-spew v1.1.1 @@ -12,27 +12,28 @@ require ( github.com/kardianos/service v1.2.1 // indirect github.com/parvit/kardianos-service v0.0.0-20220822101756-89fc969969b8 github.com/rs/cors v1.8.2 - github.com/rs/zerolog v1.28.0 + github.com/rs/zerolog v1.29.1 github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966 github.com/sqweek/dialog v0.0.0-20220504154117-be45b268883a - golang.org/x/net v0.6.0 - golang.org/x/sys v0.6.0 + golang.org/x/net v0.10.0 + golang.org/x/sys v0.8.0 gopkg.in/yaml.v3 v3.0.1 ) require ( bou.ke/monkey v1.0.2 github.com/Project-Faster/quic-go v0.0.0-20230209052722-fd67b0616c6d + github.com/Project-Faster/quicly-go v0.0.0-00010101000000-000000000000 github.com/eclipse/paho.mqtt.golang v1.4.2 github.com/jessevdk/go-flags v1.5.0 github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d github.com/nyaosorg/go-windows-dbg v0.0.0-20210914123807-2acba179a4e5 github.com/segmentio/fasthash v1.0.3 github.com/stretchr/testify v1.8.2 + gonum.org/v1/plot v0.12.0 ) require ( - gioui.org v0.0.0-20210308172011-57750fc8a0a6 // indirect git.sr.ht/~sbinet/gg v0.3.1 // indirect github.com/TheTitanrain/w32 v0.0.0-20180517000239-4f5cfb03fabf // indirect github.com/ajstarks/svgo v0.0.0-20211024235047-1546f124cd8b // indirect @@ -60,16 +61,11 @@ require ( github.com/quic-go/qtls-go1-18 v0.2.0 // indirect github.com/quic-go/qtls-go1-19 v0.2.0 // indirect github.com/quic-go/qtls-go1-20 v0.1.0 // indirect - github.com/stretchr/objx v0.5.0 // indirect golang.org/x/crypto v0.6.0 // indirect golang.org/x/exp v0.0.0-20230206171751-46f607a40771 // indirect - golang.org/x/exp/shiny v0.0.0-20220722155223-a9213eeb770e // indirect golang.org/x/image v0.0.0-20220902085622-e7cb96979f69 // indirect - golang.org/x/mod v0.8.0 // indirect - golang.org/x/sync v0.1.0 // indirect - golang.org/x/text v0.7.0 // indirect - golang.org/x/tools v0.5.0 // indirect - gonum.org/v1/plot v0.12.0 // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect - rsc.io/pdf v0.1.1 // indirect + golang.org/x/mod v0.10.0 // indirect + golang.org/x/sync v0.2.0 // indirect + golang.org/x/text v0.9.0 // indirect + golang.org/x/tools v0.9.1 // indirect ) diff --git a/go.sum b/go.sum index cd3a4ef3..386d703d 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,8 @@ bou.ke/monkey v1.0.2 h1:kWcnsrCNUatbxncxR/ThdYqbytgOIArtYWqcQLQzKLI= bou.ke/monkey v1.0.2/go.mod h1:OqickVX3tNx6t33n1xvtTtu85YN5s6cKwVug+oHMaIA= -dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= -gioui.org v0.0.0-20210308172011-57750fc8a0a6 h1:K72hopUosKG3ntOPNG4OzzbuhxGuVf06fa2la1/H/Ho= -gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8= git.sr.ht/~sbinet/gg v0.3.1 h1:LNhjNn8DerC8f9DHLz6lS0YYul/b602DUxDgGkd/Aik= git.sr.ht/~sbinet/gg v0.3.1/go.mod h1:KGYtlADtqsqANL9ueOFkWymvzUvLMQllU5Ixo+8v3pc= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/Project-Faster/quic-go v0.0.0-20230209052722-fd67b0616c6d h1:gC9IVuYjsQRCtDK1XkpgIOr9ER9qpNDYyHxPZJphMKw= github.com/Project-Faster/quic-go v0.0.0-20230209052722-fd67b0616c6d/go.mod h1:SABQ6Yop6PbpjBXgrM0tFwg24uML+gHDQ/PRXR/CQAU= github.com/TheTitanrain/w32 v0.0.0-20180517000239-4f5cfb03fabf h1:FPsprx82rdrX2jiKyS17BH6IrTmUBYqZa/CXT4uvb+I= @@ -17,7 +13,7 @@ github.com/ajstarks/svgo v0.0.0-20211024235047-1546f124cd8b h1:slYM766cy2nI3BwyR github.com/ajstarks/svgo v0.0.0-20211024235047-1546f124cd8b/go.mod h1:1KcenG0jGWcpt8ov532z81sp/kMMUG485J2InIOyADM= github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= -github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -39,13 +35,13 @@ github.com/getlantern/ops v0.0.0-20190325191751-d70cb0d6f85f h1:wrYrQttPS8FHIRSl github.com/getlantern/ops v0.0.0-20190325191751-d70cb0d6f85f/go.mod h1:D5ao98qkA6pxftxoqzibIBBrLSUli+kYnJqrgBf9cIA= github.com/getlantern/systray v1.2.1 h1:udsC2k98v2hN359VTFShuQW6GGprRprw6kD6539JikI= github.com/getlantern/systray v1.2.1/go.mod h1:AecygODWIsBquJCJFop8MEQcJbWFfw/1yWbVabNgpCM= +github.com/go-fonts/dejavu v0.1.0 h1:JSajPXURYqpr+Cu8U9bt8K+XcACIHWqWrvWCKyeFmVQ= github.com/go-fonts/dejavu v0.1.0/go.mod h1:4Wt4I4OU2Nq9asgDCteaAaWZOV24E+0/Pwo0gppep4g= github.com/go-fonts/latin-modern v0.2.0 h1:5/Tv1Ek/QCr20C6ZOz15vw3g7GELYL98KWr8Hgo+3vk= github.com/go-fonts/latin-modern v0.2.0/go.mod h1:rQVLdDMK+mK1xscDwsqM5J8U2jrRa3T0ecnM9pNujks= github.com/go-fonts/liberation v0.2.0 h1:jAkAWJP4S+OsrPLZM4/eC9iW7CtHy+HBXrEwZXWo5VM= github.com/go-fonts/liberation v0.2.0/go.mod h1:K6qoJYypsmfVjWg8KOVDQhLc8UDgIK2HYqyqAO9z7GY= github.com/go-fonts/stix v0.1.0/go.mod h1:w/c1f0ldAUlJmLBvlbkvVXLAD+tAMqobIIQpmnUIzUY= -github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-latex/latex v0.0.0-20210823091927-c0d11ff05a81 h1:6zl3BbBhdnMkpSj2YY30qV3gDcVBGtFgVsV3+/i+mKQ= github.com/go-latex/latex v0.0.0-20210823091927-c0d11ff05a81/go.mod h1:SX0U8uGpxhq9o2S/CELCSUxEWWAuoCUcVCQWv7G2OCk= github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= @@ -122,8 +118,8 @@ github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/f github.com/rs/cors v1.8.2 h1:KCooALfAYGs415Cwu5ABvv9n9509fSiG5SQJn/AQo4U= github.com/rs/cors v1.8.2/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= -github.com/rs/zerolog v1.28.0 h1:MirSo27VyNi7RJYP3078AA1+Cyzd2GB66qy3aUHvsWY= -github.com/rs/zerolog v1.28.0/go.mod h1:NILgTygv/Uej1ra5XxGf82ZFSLk58MFGAUS2o6usyD0= +github.com/rs/zerolog v1.29.1 h1:cO+d60CHkknCbvzEWxP0S9K6KqyTjrCNUy1LdQLCGPc= +github.com/rs/zerolog v1.29.1/go.mod h1:Le6ESbR7hc+DP6Lt1THiV8CQSdkkNrd3R0XbEgp3ZBU= github.com/ruudk/golang-pdf417 v0.0.0-20181029194003-1af4ab5afa58/go.mod h1:6lfFZQK844Gfx8o5WFuvpxWRwnSoipWe/p622j1v06w= github.com/ruudk/golang-pdf417 v0.0.0-20201230142125-a7e3863a1245/go.mod h1:pQAZKsJ8yyVxGRWYNEm9oFB8ieLgKFnamEyDmSA0BRk= github.com/segmentio/fasthash v1.0.3 h1:EI9+KE1EwvMLBWwjpRDc+fEM+prwxDYbslddQGtrmhM= @@ -134,11 +130,9 @@ github.com/sqweek/dialog v0.0.0-20220504154117-be45b268883a h1:M4NnixT1bCxVc2LC3 github.com/sqweek/dialog v0.0.0-20220504154117-be45b268883a/go.mod h1:/qNPSY91qTz/8TgHEMioAUc6q7+3SOybeKczHMXFcXw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= @@ -147,49 +141,38 @@ github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= -golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp v0.0.0-20191002040644-a1355ae1e2c3/go.mod h1:NOZ3BPKG0ec/BKJQgnvsSFpcKLM5xXVWnvZS97DWHgE= golang.org/x/exp v0.0.0-20230206171751-46f607a40771 h1:xP7rWLUr1e1n2xkK5YB4LI0hPEy3LJC6Wk+D4pGlOJg= golang.org/x/exp v0.0.0-20230206171751-46f607a40771/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= -golang.org/x/exp/shiny v0.0.0-20220722155223-a9213eeb770e h1:pkl1Ko5DrhA4ezwKwdnmO7H1sKmMy9qLuYKRjS7SlmE= -golang.org/x/exp/shiny v0.0.0-20220722155223-a9213eeb770e/go.mod h1:VjAR7z0ngyATZTELrBSkxOOHhhlnVUxDye4mcjx5h/8= -golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= -golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/image v0.0.0-20190910094157-69e4b8554b2a/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/image v0.0.0-20200119044424-58c23975cae1/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/image v0.0.0-20200430140353-33d19683fad8/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= -golang.org/x/image v0.0.0-20200618115811-c13761719519/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/image v0.0.0-20201208152932-35266b937fa6/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/image v0.0.0-20210607152325-775e3b0c77b9/go.mod h1:023OzeP/+EPmXeapQh35lcL3II3LrY8Ic+EFFKVhULM= golang.org/x/image v0.0.0-20210628002857-a66eb6448b8d/go.mod h1:023OzeP/+EPmXeapQh35lcL3II3LrY8Ic+EFFKVhULM= golang.org/x/image v0.0.0-20211028202545-6944b10bf410/go.mod h1:023OzeP/+EPmXeapQh35lcL3II3LrY8Ic+EFFKVhULM= golang.org/x/image v0.0.0-20220902085622-e7cb96979f69 h1:Lj6HJGCSn5AjxRAH2+r35Mir4icalbqku+CLUtjnvXY= golang.org/x/image v0.0.0-20220902085622-e7cb96979f69/go.mod h1:doUCurBvlfPMKfmIpRIywoHmhN3VyhnoFDbvIEWF4hY= -golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o= -golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8= -golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.10.0 h1:lFO9qtOdlre5W1jxS3r/4szv2/6iXxScdzjoBMXNhYk= +golang.org/x/mod v0.10.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/net v0.6.0 h1:L4ZwwTvKW9gr0ZMS1yrHD9GZhIuVjOBBnaKH+SPQK0Q= -golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI= +golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200515095857-1151b9dac4a9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -197,28 +180,26 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201015000850-e3ed0017c211/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210304124612-50617c2ba197/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= -golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= -golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20190927191325-030b2cf1153e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.5.0 h1:+bSpV5HIeWkuvgaMfI3UmKRThoTA5ODJTUd8T17NO+4= -golang.org/x/tools v0.5.0/go.mod h1:N+Kgy78s5I24c24dU8OfWNEotWjutIs8SnJvn5IDq+k= +golang.org/x/tools v0.9.1 h1:8WMNJAz3zrtPmnYC7ISf5dEn3MT0gY7jBJfw27yrrLo= +golang.org/x/tools v0.9.1/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -229,11 +210,8 @@ google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscL gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= -gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las= rsc.io/pdf v0.1.1 h1:k1MczvYDUvJBe93bYd7wrZLLUEcLZAuF824/I4e5Xr4= -rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= diff --git a/logger/logger.go b/logger/logger.go index f6db5640..da129510 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -11,15 +11,14 @@ package logger import ( "fmt" + "io" "os" "path/filepath" "runtime" "time" - log "github.com/rs/zerolog" - stdlog "log" - "github.com/nyaosorg/go-windows-dbg" + log "github.com/rs/zerolog" ) // _log customized logger instance @@ -57,8 +56,9 @@ func SetupLogger(logName string) { log.SetGlobalLevel(log.InfoLevel) log.TimeFieldFormat = time.StampMilli - _log = log.New(_logFile).Level(log.DebugLevel). - With().Timestamp().Logger() + _log = log.New(io.MultiWriter(_logFile, os.Stdout)). + Level(log.InfoLevel). + With().Logger() } // CloseLogger Terminates the current log and resets it to stdout output @@ -73,11 +73,16 @@ func CloseLogger() { _log = log.New(os.Stdout) } +// GetLogger allows external libraries to integrate with the qpep logger +func GetLogger() *log.Logger { + return &_log +} + // Info Outputs a new formatted string with the provided parameters to the logger instance with Info level // Outputs the same data to the OutputDebugString facility if os is Windows and level is set to Debug func Info(format string, values ...interface{}) { - _log.Info().Msgf(format, values...) - stdlog.Printf(format, values...) + _log.Info().Time("time", time.Now()).Msgf(format, values...) + //stdlog.Printf(format, values...) if runtime.GOOS == "windows" && _log.GetLevel() >= log.DebugLevel { _, _ = dbg.Printf(format, values...) return @@ -90,8 +95,8 @@ func Debug(format string, values ...interface{}) { if log.GlobalLevel() != log.DebugLevel { return } - _log.Debug().Msgf(format, values...) - stdlog.Printf(format, values...) + _log.Debug().Time("time", time.Now()).Msgf(format, values...) + //stdlog.Printf(format, values...) if runtime.GOOS == "windows" && _log.GetLevel() >= log.DebugLevel { _, _ = dbg.Printf(format, values...) return @@ -101,8 +106,8 @@ func Debug(format string, values ...interface{}) { // Error Outputs a new formatted string with the provided parameters to the logger instance with Error level // Outputs the same data to the OutputDebugString facility if os is Windows and level is set to Debug func Error(format string, values ...interface{}) { - _log.Error().Msgf(format, values...) - stdlog.Printf(format, values...) + _log.Error().Time("time", time.Now()).Msgf(format, values...) + //stdlog.Printf(format, values...) if runtime.GOOS == "windows" && _log.GetLevel() >= log.DebugLevel { _, _ = dbg.Printf(format, values...) return @@ -113,13 +118,22 @@ func Error(format string, values ...interface{}) { // Outputs the same data to the OutputDebugString facility if os is Windows and level is set to Debug // and then panics with the same formatted string func Panic(format string, values ...interface{}) { - _log.Error().Msgf(format, values...) + _log.Error().Time("time", time.Now()).Msgf(format, values...) if runtime.GOOS == "windows" && _log.GetLevel() >= log.DebugLevel { _, _ = dbg.Printf(format, values...) } panic(fmt.Sprintf(format, values...)) } +func Trace() { + _, file, line, ok := runtime.Caller(1) + if !ok { + Info("[trace][%s:%d]", "", 0) + return + } + Info("[trace][%s:%d]", file, line) +} + // OnError method sends an error log only if the err value in input is not nil func OnError(err error, msg string) { if err == nil { diff --git a/main.go b/main.go index fb9b2cbc..b9170c7c 100644 --- a/main.go +++ b/main.go @@ -3,8 +3,10 @@ package main import ( "github.com/parvit/qpep/logger" "github.com/parvit/qpep/service" + "github.com/parvit/qpep/shared" "os" "runtime/debug" + "runtime/trace" ) func init() { @@ -12,6 +14,9 @@ func init() { } func main() { + f, _ := os.Create("trace.out") + trace.Start(f) + defer func() { if err := recover(); err != nil { logger.Error("PANIC: %v", err) @@ -19,10 +24,16 @@ func main() { } }() + tsk := shared.StartRegion("ServiceMain") retcode := service.ServiceMain() + tsk.End() logger.Info("=== EXIT - code(%d) ===", retcode) logger.CloseLogger() + trace.Stop() + f.Sync() + f.Close() + os.Exit(retcode) } diff --git a/service.go b/service.go index d884ce82..24d709c2 100644 --- a/service.go +++ b/service.go @@ -4,6 +4,8 @@ import ( "context" "github.com/davecgh/go-spew/spew" "github.com/parvit/qpep/version" + "github.com/parvit/qpep/workers/client" + "github.com/parvit/qpep/workers/server" "os" "os/signal" "path/filepath" @@ -15,10 +17,8 @@ import ( service "github.com/parvit/kardianos-service" "github.com/parvit/qpep/api" - "github.com/parvit/qpep/client" "github.com/parvit/qpep/flags" . "github.com/parvit/qpep/logger" - "github.com/parvit/qpep/server" "github.com/parvit/qpep/shared" "github.com/parvit/qpep/windivert" ) diff --git a/service/service.go b/service/service.go index 2e1dae48..790e9180 100644 --- a/service/service.go +++ b/service/service.go @@ -3,8 +3,8 @@ package service import ( "context" "github.com/davecgh/go-spew/spew" - "github.com/parvit/qpep/logger" - "github.com/parvit/qpep/version" + "github.com/parvit/qpep/workers/client" + "github.com/parvit/qpep/workers/server" log "github.com/rs/zerolog" "os" "os/signal" @@ -17,10 +17,10 @@ import ( service "github.com/parvit/kardianos-service" "github.com/parvit/qpep/api" - "github.com/parvit/qpep/client" "github.com/parvit/qpep/flags" - "github.com/parvit/qpep/server" + "github.com/parvit/qpep/logger" "github.com/parvit/qpep/shared" + "github.com/parvit/qpep/version" "github.com/parvit/qpep/windivert" ) diff --git a/shared/debug_watcher.go b/shared/debug_watcher.go index 0230db99..965b01ce 100644 --- a/shared/debug_watcher.go +++ b/shared/debug_watcher.go @@ -1,11 +1,13 @@ package shared import ( + "context" "fmt" "io" "os" "runtime" "runtime/pprof" + "runtime/trace" "time" ) @@ -13,6 +15,10 @@ const ( DEBUG_FILE_FMT = "%s_%v_%s.prof" ) +var ( + traceContext = context.Background() +) + func WatcherCPU() { cpuWatcher(0) } @@ -48,3 +54,7 @@ func heapWatcher(idx int) { heapWatcher(idx + 1) }() } + +func StartRegion(key string) *trace.Region { + return trace.StartRegion(traceContext, key) +} diff --git a/shared/errors.go b/shared/errors.go index 1eddd1a9..c5264feb 100644 --- a/shared/errors.go +++ b/shared/errors.go @@ -4,6 +4,8 @@ import "errors" var ( ErrFailed = errors.New("failed") + ErrInvalidBackendOperation = errors.New("operation not supported by the backend selected") + ErrInvalidBackendSelected = errors.New("backend selected is not supported") ErrFailedGatewayDetect = errors.New("failed to detect the gateway") ErrFailedGatewayConnect = errors.New("failed to connect to the gateway") ErrNoCommand = errors.New("could not create command") diff --git a/shared/qpep_config.go b/shared/qpep_config.go index 66d955c8..aa7a63f8 100644 --- a/shared/qpep_config.go +++ b/shared/qpep_config.go @@ -36,6 +36,7 @@ const ( DEFAULT_CONFIG = ` acks: 10 ackdelay: 25 +backend: quic-go congestion: 4 decimate: 4 decimatetime: 100 @@ -72,6 +73,8 @@ type QPepConfigType struct { GatewayPort int `yaml:"port"` // GatewayAPIPort (yaml:apiport) Port on which the gateway qpep server listens for TCP API requests GatewayAPIPort int `yaml:"apiport"` + // Backend (yaml:backend) Specifies the backend to use for quic connections(available: quic-go and quicly-go) + Backend string `yaml:"backend"` // ListenHost (yaml:listenaddress) Address on which the local instance (client or server) listens for incoming connections // if indicates subnet 0. or 127. it will try to autodetect a good ip available ListenHost string `yaml:"listenaddress"` diff --git a/shared/qpep_header.go b/shared/qpep_header.go index bfa459ef..5602e06d 100644 --- a/shared/qpep_header.go +++ b/shared/qpep_header.go @@ -7,6 +7,7 @@ package shared import ( "encoding/binary" + "github.com/parvit/qpep/logger" "io" "net" ) @@ -112,6 +113,7 @@ func QPepHeaderFromBytes(stream io.Reader) (*QPepHeader, error) { if ipBytesNum != 2 || err != nil { return nil, ErrInvalidHeader } + logger.Info("PREAMBLE: %v", preamble) var sourceIpEnd int if preamble[0] == IPV4 { @@ -138,6 +140,9 @@ func QPepHeaderFromBytes(stream io.Reader) (*QPepHeader, error) { byteInput := make([]byte, flagsEnd) readDataBytes, err := stream.Read(byteInput) + logger.Info("HEADER: %v - (%d/%d/%d/%d/%d/%d) - %v", err, + sourceIpEnd, sourcePortEnd, destIpEnd, destPortEnd, flagsEnd, readDataBytes, + byteInput) if readDataBytes != flagsEnd || err != nil { return nil, ErrInvalidHeaderDataLength } diff --git a/shared/quic_config.go b/shared/quic_config.go index a386ee58..251e7545 100644 --- a/shared/quic_config.go +++ b/shared/quic_config.go @@ -10,13 +10,11 @@ import ( // quic-go library, it is common among the server and client packages func GetQuicConfiguration() *quic.Config { cfg := &quic.Config{ - MaxIncomingStreams: 10240, + MaxIncomingStreams: 1024, DisablePathMTUDiscovery: false, HandshakeIdleTimeout: GetScaledTimeout(10, time.Second), //KeepAlivePeriod: 1 * time.Second, - - EnableDatagrams: true, } // Only used in debug sessions diff --git a/client/client.go b/workers/client/client.go similarity index 99% rename from client/client.go rename to workers/client/client.go index ce71041d..d6065cf8 100644 --- a/client/client.go +++ b/workers/client/client.go @@ -31,7 +31,7 @@ var ( RedirectedInterfaces: []int64{}, QuicStreamTimeout: 2, MultiStream: shared.QPepConfig.MultiStream, MaxConnectionRetries: shared.DEFAULT_REDIRECT_RETRIES, - IdleTimeout: 30 * time.Second, + IdleTimeout: time.Duration(300) * time.Second, WinDivertThreads: 1, Verbose: false, } diff --git a/client/client_impl_linux.go b/workers/client/client_impl_linux.go similarity index 100% rename from client/client_impl_linux.go rename to workers/client/client_impl_linux.go diff --git a/client/client_impl_windows.go b/workers/client/client_impl_windows.go similarity index 100% rename from client/client_impl_windows.go rename to workers/client/client_impl_windows.go diff --git a/client/client_network.go b/workers/client/client_network.go similarity index 75% rename from client/client_network.go rename to workers/client/client_network.go index 2fb90a4d..f38ff796 100644 --- a/client/client_network.go +++ b/workers/client/client_network.go @@ -3,9 +3,7 @@ package client import ( "bufio" "bytes" - "crypto/tls" "fmt" - "github.com/parvit/qpep/logger" "io" "io/ioutil" "net" @@ -16,18 +14,15 @@ import ( "sync" "time" - "github.com/Project-Faster/quic-go" - + "github.com/parvit/qpep/backend" + "github.com/parvit/qpep/logger" "github.com/parvit/qpep/shared" "github.com/parvit/qpep/windivert" "golang.org/x/net/context" ) const ( - BUFFER_SIZE = 512 * 1024 - - ACTIVITY_RX_FLAG = "activity_rx" - ACTIVITY_TX_FLAG = "activity_tx" + BUFFER_SIZE = 32 * 1024 LOCAL_RECONNECTION_RETRIES = 10 ) @@ -38,7 +33,7 @@ var ( newSessionLock sync.RWMutex // quicSession listening quic connection to the server - quicSession quic.Connection + quicSession backend.QuicBackendConnection ) // listenTCPConn method implements the routine that listens to incoming diverted/proxied connections @@ -126,7 +121,7 @@ func handleTCPConn(tcpConn net.Conn) { } logger.Info("Connection flags : %d %d", sessionHeader.Flags, sessionHeader.Flags&shared.QPEP_LOCALSERVER_DESTINATION) - logger.Info("Sending QUIC header to server, SourceAddr: %v / DestAddr: %v", sessionHeader.SourceAddr, sessionHeader.DestAddr) + logger.Info("Sending QPEP header to server, SourceAddr: %v / DestAddr: %v", sessionHeader.SourceAddr, sessionHeader.DestAddr) _, err := quicStream.Write(sessionHeader.ToBytes()) logger.OnError(err, "writing to quic stream") } else { @@ -137,22 +132,20 @@ func handleTCPConn(tcpConn net.Conn) { } //Proxy all stream content from quic to TCP and from TCP to quic - logger.Info("== Stream %d Start ==", quicStream.StreamID()) + logger.Info("== Stream %d Start ==", quicStream.ID()) + go handleTcpToQuic(ctx, &streamWait, quicStream, tcpConn) go handleQuicToTcp(ctx, &streamWait, tcpConn, quicStream) //we exit (and close the TCP connection) once both streams are done copying - logger.Info("== Stream %d Wait ==", quicStream.StreamID()) + logger.Info("== Stream %d Wait ==", quicStream.ID()) streamWait.Wait() - logger.Info("== Stream %d WaitEnd ==", quicStream.StreamID()) + logger.Info("== Stream %d WaitEnd ==", quicStream.ID()) - quicStream.CancelWrite(0) - quicStream.CancelRead(0) quicStream.Close() - tcpConn.Close() - logger.Info("== Stream %d End ==", quicStream.StreamID()) + logger.Info("== Stream %d End ==", quicStream.ID()) if !ClientConfiguration.MultiStream { // destroy the session so a new one is created next time @@ -162,10 +155,10 @@ func handleTCPConn(tcpConn net.Conn) { // getQuicStream method handles the opening or reutilization of the quic session, and launches a new // quic stream for communication -func getQuicStream(ctx context.Context) (quic.Stream, error) { +func getQuicStream(ctx context.Context) (backend.QuicBackendStream, error) { var err error - var quicStream quic.Stream = nil - var localSession quic.Connection = nil + var quicStream backend.QuicBackendStream = nil + var localSession backend.QuicBackendConnection = nil newSessionLock.RLock() localSession = quicSession @@ -188,9 +181,9 @@ func getQuicStream(ctx context.Context) (quic.Stream, error) { // if we allow for multiple streams in a session, try and open on the existing session if ClientConfiguration.MultiStream && localSession != nil { logger.Info("Trying to open on existing session") - quicStream, err = localSession.OpenStream() + quicStream, err = localSession.OpenStream(context.Background()) if err == nil { - logger.Info("Opened a new stream: %d", quicStream.StreamID()) + logger.Info("Opened a new stream: %d", quicStream.ID()) return quicStream, nil } // if we weren't able to open a quicStream on that session (usually inactivity timeout), we can try to open a new session @@ -198,15 +191,15 @@ func getQuicStream(ctx context.Context) (quic.Stream, error) { quicStream = nil newSessionLock.Lock() - quicSession.CloseWithError(quic.ApplicationErrorCode(0), "Stream could not be opened") + quicSession.Close(0, "Stream could not be opened") quicSession = nil newSessionLock.Unlock() - return nil, quic.ErrServerClosed + return nil, shared.ErrFailedGatewayConnect } - //Open a stream to send writtenData on this new session - quicStream, err = quicSession.OpenStreamSync(ctx) + //Dial a stream to send writtenData on this new session + quicStream, err = quicSession.OpenStream(ctx) // if we cannot open a stream on this session, send a TCP RST and let the client decide to try again logger.OnError(err, "Unable to open QUIC stream") if err != nil { @@ -315,7 +308,7 @@ func handleProxyOpenConnection(tcpConn net.Conn) (*http.Request, error) { return req, nil } -func handleProxyedRequest(req *http.Request, header *shared.QPepHeader, tcpConn net.Conn, stream quic.Stream) error { +func handleProxyedRequest(req *http.Request, header *shared.QPepHeader, tcpConn net.Conn, stream backend.QuicBackendStream) error { switch req.Method { case http.MethodDelete: fallthrough @@ -337,6 +330,8 @@ func handleProxyedRequest(req *http.Request, header *shared.QPepHeader, tcpConn panic("Should not happen as the handleProxyOpenConnection method checks the http request") } + logger.Info("HOST: %s", req.Host) + header.DestAddr = &net.TCPAddr{ IP: address, Port: port, @@ -347,8 +342,11 @@ func handleProxyedRequest(req *http.Request, header *shared.QPepHeader, tcpConn } logger.Info("Proxied connection flags : %d %d", header.Flags, header.Flags&shared.QPEP_LOCALSERVER_DESTINATION) - logger.Info("Sending QUIC header to server, SourceAddr: %v / DestAddr: %v", header.SourceAddr, header.DestAddr) - _, err := stream.Write(header.ToBytes()) + logger.Info("Sending QPEP header to server, SourceAddr: %v / DestAddr: %v", header.SourceAddr, header.DestAddr) + + headerData := header.ToBytes() + logger.Info("QPEP header: %v", headerData) + _, err := stream.Write(headerData) if err != nil { _ = tcpConn.Close() logger.Error("Error writing to quic stream: %s", err.Error()) @@ -370,6 +368,8 @@ func handleProxyedRequest(req *http.Request, header *shared.QPepHeader, tcpConn panic("Should not happen as the handleProxyOpenConnection method checks the http request") } + logger.Info("HOST: %s", req.Host) + header.DestAddr = &net.TCPAddr{ IP: address, Port: port, @@ -395,7 +395,7 @@ func handleProxyedRequest(req *http.Request, header *shared.QPepHeader, tcpConn t.Write(tcpConn) logger.Info("Proxied connection") - logger.Info("Sending QUIC header to server, SourceAddr: %v / DestAddr: %v", header.SourceAddr, header.DestAddr) + logger.Info("Sending QPEP header to server, SourceAddr: %v / DestAddr: %v", header.SourceAddr, header.DestAddr) _, err := stream.Write(header.ToBytes()) if err != nil { _ = tcpConn.Close() @@ -410,16 +410,25 @@ func handleProxyedRequest(req *http.Request, header *shared.QPepHeader, tcpConn } // handleTcpToQuic method implements the tcp connection to quic connection side of the connection -func handleTcpToQuic(ctx context.Context, streamWait *sync.WaitGroup, dst quic.Stream, src net.Conn) { +func handleTcpToQuic(ctx context.Context, streamWait *sync.WaitGroup, dst backend.QuicBackendStream, src net.Conn) { + tskKey := fmt.Sprintf("Tcp->Quic:%v", dst.ID()) + tsk := shared.StartRegion(tskKey) defer func() { - _ = recover() - + if err := recover(); err != nil { + logger.Error("ERR: %v", err) + debug.PrintStack() + } + tsk.End() streamWait.Done() - logger.Info("== Stream %v TCP->Quic done ==", dst.StreamID()) + logger.Info("== Stream %v TCP->Quic done ==", dst.ID()) }() setLinger(src) + buf := make([]byte, BUFFER_SIZE) + timeoutCounter := 0 + i := 0 + for { select { case <-ctx.Done(): @@ -427,28 +436,56 @@ func handleTcpToQuic(ctx context.Context, streamWait *sync.WaitGroup, dst quic.S default: } - //logger.Info("[%d] T->Q: %v: %v", dst.StreamID(), activityFlag, *activityFlag) + tm := time.Now().Add(1 * time.Second) + _ = src.SetDeadline(tm) + _ = dst.SetReadDeadline(tm) - written, err := io.Copy(dst, io.LimitReader(src, BUFFER_SIZE)) + tm2 := time.Now().Add(10 * time.Second) + _ = dst.SetWriteDeadline(tm2) - if written == 0 && err != nil { + tsk := shared.StartRegion(fmt.Sprintf("copybuffer.%d.%s", i, tskKey)) + wr, err := io.CopyBuffer(dst, io.LimitReader(src, BUFFER_SIZE), buf) + tsk.End() + if wr == 0 { + timeoutCounter++ + if timeoutCounter > 5 { + return + } + } else { + timeoutCounter = 0 + } + + //logger.Info("[%d] T->Q: %v, %v", dst.ID(), wr, err) + + if err != nil { + if err, ok := err.(net.Error); ok && err.Timeout() { + continue + } return } } - //logger.Info("Finished Copying TCP Conn %s->%s, Stream ID %d\n", src.LocalAddr().String(), src.RemoteAddr().String(), dst.StreamID()) + //logger.Info("Finished Copying TCP Conn %s->%s, Stream ID %d\n", src.LocalAddr().String(), src.RemoteAddr().String(), dst.ID()) } // handleQuicToTcp method implements the quic connection to tcp connection side of the connection -func handleQuicToTcp(ctx context.Context, streamWait *sync.WaitGroup, dst net.Conn, src quic.Stream) { +func handleQuicToTcp(ctx context.Context, streamWait *sync.WaitGroup, dst net.Conn, src backend.QuicBackendStream) { + tskKey := fmt.Sprintf("Quic->Tcp:%v", src.ID()) + tsk := shared.StartRegion(tskKey) defer func() { - _ = recover() - + if err := recover(); err != nil { + logger.Error("ERR: %v", err) + debug.PrintStack() + } + tsk.End() streamWait.Done() - logger.Info("== Stream %v Quic->TCP done ==", src.StreamID()) + logger.Info("== Stream %v Quic->TCP done ==", src.ID()) }() setLinger(dst) + buf := make([]byte, BUFFER_SIZE) + timeoutCounter := 0 + for { select { case <-ctx.Done(): @@ -456,11 +493,29 @@ func handleQuicToTcp(ctx context.Context, streamWait *sync.WaitGroup, dst net.Co default: } - //logger.Info("[%d] Q->T: %v: %v", src.StreamID(), activityFlag, *activityFlag) + tm := time.Now().Add(1 * time.Second) + _ = src.SetReadDeadline(tm) + _ = src.SetWriteDeadline(tm) + + tm2 := time.Now().Add(10 * time.Second) + _ = dst.SetDeadline(tm2) + + wr, err := io.CopyBuffer(dst, io.LimitReader(src, BUFFER_SIZE), buf) + if wr == 0 { + timeoutCounter++ + if timeoutCounter > 5 { + return + } + } else { + timeoutCounter = 0 + } - written, err := io.Copy(dst, io.LimitReader(src, BUFFER_SIZE)) + //logger.Info("[%d] Q->T: %v, %v", src.ID(), wr, err) - if written == 0 && err != nil { + if err != nil { + if err, ok := err.(net.Error); ok && err.Timeout() { + continue + } return } } @@ -509,26 +564,33 @@ func getAddressPortFromHost(host string) (net.IP, int, bool) { proxyable = true break } + if proxyable && port == 0 { + port = 80 + } } return address, int(port), proxyable } +var quicProvider backend.QuicBackend + // openQuicSession implements the quic connection request to the qpep server -func openQuicSession() (quic.Connection, error) { - var err error - var session quic.Connection - tlsConf := &tls.Config{InsecureSkipVerify: true, NextProtos: []string{"qpep"}} - gatewayPath := ClientConfiguration.GatewayHost + ":" + strconv.Itoa(ClientConfiguration.GatewayPort) - quicClientConfig := shared.GetQuicConfiguration() +func openQuicSession() (backend.QuicBackendConnection, error) { + if quicProvider == nil { + var ok bool + quicProvider, ok = backend.Get(shared.QPepConfig.Backend) + if !ok { + panic(shared.ErrInvalidBackendSelected) + } + } - logger.Info("Dialing QUIC Session: %s\n", gatewayPath) - session, err = quic.DialAddr(gatewayPath, tlsConf, quicClientConfig) + session, err := quicProvider.Dial(context.Background(), ClientConfiguration.GatewayHost, ClientConfiguration.GatewayPort) - if err == nil { - logger.Info("QUIC Session Open\n") - return session, nil + logger.Info("== Dialing QUIC Session: %s:%d ==\n", ClientConfiguration.GatewayHost, ClientConfiguration.GatewayPort) + if err != nil { + logger.Error("Unable to Dial QUIC Session: %v\n", err) + return nil, shared.ErrFailedGatewayConnect } + logger.Info("== QUIC Session Dial: %s:%d ==\n", ClientConfiguration.GatewayHost, ClientConfiguration.GatewayPort) - logger.Error("Unable to Open QUIC Session: %v\n", err) - return nil, shared.ErrFailedGatewayConnect + return session, nil } diff --git a/client/client_network_linux_test.go b/workers/client/client_network_linux_test.go similarity index 100% rename from client/client_network_linux_test.go rename to workers/client/client_network_linux_test.go diff --git a/client/client_network_windows_test.go b/workers/client/client_network_windows_test.go similarity index 100% rename from client/client_network_windows_test.go rename to workers/client/client_network_windows_test.go diff --git a/client/client_proxy_listener_darwin.go b/workers/client/client_proxy_listener_darwin.go similarity index 98% rename from client/client_proxy_listener_darwin.go rename to workers/client/client_proxy_listener_darwin.go index a08257b0..9421ade3 100644 --- a/client/client_proxy_listener_darwin.go +++ b/workers/client/client_proxy_listener_darwin.go @@ -51,7 +51,7 @@ func (listener *ClientProxyListener) Close() error { // NewClientProxyListener method instantiates a new ClientProxyListener on a tcp address base listener func NewClientProxyListener(network string, laddr *net.TCPAddr) (net.Listener, error) { - //Open basic TCP listener + //Dial basic TCP listener listener, err := net.ListenTCP(network, laddr) if err != nil { return nil, err diff --git a/client/client_proxy_listener_linux.go b/workers/client/client_proxy_listener_linux.go similarity index 98% rename from client/client_proxy_listener_linux.go rename to workers/client/client_proxy_listener_linux.go index e273e2bb..29a0eb7d 100644 --- a/client/client_proxy_listener_linux.go +++ b/workers/client/client_proxy_listener_linux.go @@ -51,7 +51,7 @@ func (listener *ClientProxyListener) Close() error { // NewClientProxyListener method instantiates a new ClientProxyListener on a tcp address base listener func NewClientProxyListener(network string, laddr *net.TCPAddr) (net.Listener, error) { - //Open basic TCP listener + //Dial basic TCP listener listener, err := net.ListenTCP(network, laddr) if err != nil { return nil, err diff --git a/client/client_proxy_listener_windows.go b/workers/client/client_proxy_listener_windows.go similarity index 98% rename from client/client_proxy_listener_windows.go rename to workers/client/client_proxy_listener_windows.go index e66f0395..aa8f37da 100644 --- a/client/client_proxy_listener_windows.go +++ b/workers/client/client_proxy_listener_windows.go @@ -47,7 +47,7 @@ func (listener *ClientProxyListener) Close() error { // NewClientProxyListener method instantiates a new ClientProxyListener on a tcp address base listener func NewClientProxyListener(network string, laddr *net.TCPAddr) (net.Listener, error) { - //Open basic TCP listener + //Dial basic TCP listener listener, err := net.ListenTCP(network, laddr) if err != nil { return nil, err diff --git a/client/client_proxy_test.go b/workers/client/client_proxy_test.go similarity index 100% rename from client/client_proxy_test.go rename to workers/client/client_proxy_test.go diff --git a/client/client_test.go b/workers/client/client_test.go similarity index 100% rename from client/client_test.go rename to workers/client/client_test.go diff --git a/server/server.go b/workers/server/server.go similarity index 72% rename from server/server.go rename to workers/server/server.go index 6f655e52..bc6b90be 100644 --- a/server/server.go +++ b/workers/server/server.go @@ -2,18 +2,7 @@ package server import ( "context" - "crypto/rand" - "crypto/rsa" - "crypto/tls" - "crypto/x509" - "encoding/pem" - "github.com/Project-Faster/quic-go" - //"github.com/Project-Faster/quic-go/logging" - //"github.com/Project-Faster/quic-go/qlog" - "io/ioutil" - "math/big" "runtime/debug" - "strconv" "time" "github.com/parvit/qpep/api" @@ -27,10 +16,8 @@ var ( ListenHost: "0.0.0.0", ListenPort: 443, APIPort: 444, - IdleTimeout: 30 * time.Second, + IdleTimeout: 3 * time.Second, } - // quicListener instance of the quic server that receives the connections from clients - quicListener quic.Listener ) // ServerConfiguration struct models the parameters necessary for running the quic server @@ -56,7 +43,7 @@ func RunServer(ctx context.Context, cancel context.CancelFunc) { debug.PrintStack() } if quicListener != nil { - quicListener.Close() + quicListener.Close(0, "") } cancel() }() @@ -69,17 +56,10 @@ func RunServer(ctx context.Context, cancel context.CancelFunc) { } defer api.Statistics.Stop() - listenAddr := ServerConfiguration.ListenHost + ":" + strconv.Itoa(ServerConfiguration.ListenPort) - logger.Info("Opening QPEP Server on: %s\n", listenAddr) - var err error - quicListener, err = quic.ListenAddr(listenAddr, generateTLSConfig(), shared.GetQuicConfiguration()) - if err != nil { - logger.Info("Encountered error while binding QUIC listener: %s\n", err) - return - } + logger.Info("Opening QPEP Server on: %s:%d\n", ServerConfiguration.ListenHost, ServerConfiguration.ListenPort) // launches listener - go listenQuicSession() + go listenQuicSession(ServerConfiguration.ListenHost, ServerConfiguration.ListenPort) ctxPerfWatcher, perfWatcherCancel := context.WithCancel(context.Background()) go performanceWatcher(ctxPerfWatcher) @@ -96,33 +76,6 @@ func RunServer(ctx context.Context, cancel context.CancelFunc) { } } -// generateTLSConfig creates a new x509 key/certificate pair and dumps it to the disk -func generateTLSConfig() *tls.Config { - key, err := rsa.GenerateKey(rand.Reader, 2048) - if err != nil { - panic(err) - } - template := x509.Certificate{SerialNumber: big.NewInt(1)} - certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &key.PublicKey, key) - if err != nil { - panic(err) - } - keyPEM := pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(key)}) - certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certDER}) - - ioutil.WriteFile("server_key.pem", keyPEM, 0777) - ioutil.WriteFile("server_cert.pem", certPEM, 0777) - - tlsCert, err := tls.X509KeyPair(certPEM, keyPEM) - if err != nil { - panic(err) - } - return &tls.Config{ - Certificates: []tls.Certificate{tlsCert}, - NextProtos: []string{"qpep"}, - } -} - // performanceWatcher method is a goroutine that checks the current speed of every host every second and // updates the values for the current speed and total number of bytes uploaded / downloaded func performanceWatcher(ctx context.Context) { diff --git a/server/server_network.go b/workers/server/server_network.go similarity index 52% rename from server/server_network.go rename to workers/server/server_network.go index 9831b862..cddbc83b 100644 --- a/server/server_network.go +++ b/workers/server/server_network.go @@ -3,8 +3,8 @@ package server import ( "context" "fmt" - "github.com/Project-Faster/quic-go" "github.com/parvit/qpep/api" + "github.com/parvit/qpep/backend" "github.com/parvit/qpep/logger" "github.com/parvit/qpep/shared" "io" @@ -18,16 +18,37 @@ const ( BUFFER_SIZE = 512 * 1024 ) +var ( + // quicSession listening quic connection to the server + quicProvider backend.QuicBackend + quicListener backend.QuicBackendConnection +) + // listenQuicSession handles accepting the sessions and the launches goroutines to actually serve them -func listenQuicSession() { +func listenQuicSession(address string, port int) { defer func() { if err := recover(); err != nil { logger.Info("PANIC: %v\n", err) debug.PrintStack() } }() + if quicProvider == nil { + var ok bool + quicProvider, ok = backend.Get(shared.QPepConfig.Backend) + if !ok { + panic(shared.ErrInvalidBackendSelected) + } + } + + var err error + quicListener, err = quicProvider.Listen(context.Background(), address, port) + if err != nil { + logger.Error("Unrecoverable error while listening for QUIC connections: %s\n", err) + return + } + for { - quicSession, err := quicListener.Accept(context.Background()) + quicSession, err := quicListener.AcceptConnection(context.Background()) if err != nil { logger.Error("Unrecoverable error while accepting QUIC session: %s\n", err) return @@ -39,7 +60,7 @@ func listenQuicSession() { } // listenQuicConn handles opened quic sessions and accepts connections in goroutines to actually serve them -func listenQuicConn(quicSession quic.Connection) { +func listenQuicConn(quicSession backend.QuicBackendConnection) { defer func() { if err := recover(); err != nil { logger.Info("PANIC: %v\n", err) @@ -55,30 +76,30 @@ func listenQuicConn(quicSession quic.Connection) { return } go func() { - logger.Info("== [%d] Stream Start ==", stream.StreamID()) - handleQuicStream(stream) - logger.Info("== [%d] Stream End ==", stream.StreamID()) + tskKey := fmt.Sprintf("QuicStream:%v", stream.ID()) + tsk := shared.StartRegion(tskKey) + defer tsk.End() + for i := 0; i < 10; i++ { + connCounter := api.Statistics.GetCounter("", api.TOTAL_CONNECTIONS) + if connCounter >= 16 { + logger.Info("== [%d] Stream Queued (current: %d / max: %d) ==", stream.ID(), connCounter, 16) + <-time.After(100 * time.Millisecond) + continue + } + logger.Info("== [%d] Stream Start ==", stream.ID()) + handleQuicStream(stream) + logger.Info("== [%d] Stream End ==", stream.ID()) + return + } + logger.Info("== [%d] Session Rejected for too many connections ==", stream.ID()) + _ = stream.Close() + _ = quicSession.Close(1, "Session Rejected for too many connections") }() } } -func connectionActivityTimer(dst quic.Stream, src net.Conn, flag_rx, flag_tx *bool, cancelFunc context.CancelFunc) { - if flag_tx == nil || flag_rx == nil { - return - } - <-time.After(ServerConfiguration.IdleTimeout) - if !*flag_rx && !*flag_tx { - logger.Info("[%v] connection canceled for inactivity", dst.StreamID()) - cancelFunc() - dst.Close() - src.Close() - return - } - go connectionActivityTimer(dst, src, flag_rx, flag_tx, cancelFunc) -} - // handleQuicStream handles a quic stream connection and bridges to the standard tcp for the common internet -func handleQuicStream(quicStream quic.Stream) { +func handleQuicStream(quicStream backend.QuicBackendStream) { defer func() { if err := recover(); err != nil { logger.Info("PANIC: %v\n", err) @@ -101,33 +122,34 @@ func handleQuicStream(quicStream quic.Stream) { return } - logger.Info("[%d] Connection flags : %d %v", quicStream.StreamID(), qpepHeader.Flags, qpepHeader.Flags&shared.QPEP_LOCALSERVER_DESTINATION != 0) + logger.Info("[%d] Connection flags : %d %v", quicStream.ID(), qpepHeader.Flags, qpepHeader.Flags&shared.QPEP_LOCALSERVER_DESTINATION != 0) // To support the server being behind a private NAT (external gateway address != local listening address) // we dial the listening address when the connection is directed at the non-local API server destAddress := qpepHeader.DestAddr.String() if qpepHeader.Flags&shared.QPEP_LOCALSERVER_DESTINATION != 0 { - logger.Info("[%d] Local connection to server", quicStream.StreamID()) + logger.Info("[%d] Local connection to server", quicStream.ID()) destAddress = fmt.Sprintf("127.0.0.1:%d", qpepHeader.DestAddr.Port) } - logger.Debug("[%d] >> Opening TCP Conn to dest:%s, src:%s\n", quicStream.StreamID(), destAddress, qpepHeader.SourceAddr) + tskKey := fmt.Sprintf("TCP-Dial:%v:%v", quicStream.ID(), destAddress) + tsk := shared.StartRegion(tskKey) + logger.Debug("[%d] >> Opening TCP Conn to dest:%s, src:%s\n", quicStream.ID(), destAddress, qpepHeader.SourceAddr) dial := &net.Dialer{ LocalAddr: &net.TCPAddr{IP: net.ParseIP(ServerConfiguration.ListenHost)}, - Timeout: shared.GetScaledTimeout(10, time.Second), - KeepAlive: shared.GetScaledTimeout(15, time.Second), + Timeout: 10 * time.Second, + KeepAlive: 5 * time.Second, DualStack: true, FallbackDelay: 10 * time.Millisecond, } tcpConn, err := dial.Dial("tcp", destAddress) + tsk.End() if err != nil { - logger.Error("[%d] Unable to open TCP connection from QPEP quicStream: %s\n", quicStream.StreamID(), err) + logger.Error("[%d] Unable to open TCP connection from QPEP quicStream: %s\n", quicStream.ID(), err) quicStream.Close() - - shared.ScaleUpTimeout() return } - logger.Info(">> [%d] Opened TCP Conn %s -> %s\n", quicStream.StreamID(), qpepHeader.SourceAddr, destAddress) + logger.Info(">> [%d] Opened TCP Conn %s -> %s\n", quicStream.ID(), qpepHeader.SourceAddr, destAddress) trackedAddress := qpepHeader.SourceAddr.IP.String() proxyAddress := tcpConn.LocalAddr().String() @@ -146,20 +168,21 @@ func handleQuicStream(quicStream quic.Stream) { go handleQuicToTcp(ctx, &streamWait, srcLimit, tcpConn, quicStream, proxyAddress, trackedAddress) go handleTcpToQuic(ctx, &streamWait, dstLimit, quicStream, tcpConn, trackedAddress) - //go connectionActivityTimer(quicStream, tcpConn, &activityRX, &activityTX, cancel) //we exit (and close the TCP connection) once both streams are done copying or timeout - logger.Info("== Stream %d Wait ==", quicStream.StreamID()) + logger.Info("== Stream %d Wait ==", quicStream.ID()) streamWait.Wait() - logger.Info("== Stream %d WaitEnd ==", quicStream.StreamID()) + logger.Info("== Stream %d WaitEnd ==", quicStream.ID()) - quicStream.Close() tcpConn.Close() + quicStream.Close() } func handleQuicToTcp(ctx context.Context, streamWait *sync.WaitGroup, speedLimit int64, - dst net.Conn, src quic.Stream, proxyAddress, trackedAddress string) { + dst net.Conn, src backend.QuicBackendStream, proxyAddress, trackedAddress string) { + tskKey := fmt.Sprintf("Quic->Tcp:%v", src.ID()) + tsk := shared.StartRegion(tskKey) defer func() { if err := recover(); err != nil { logger.Error("ERR: %v", err) @@ -167,15 +190,21 @@ func handleQuicToTcp(ctx context.Context, streamWait *sync.WaitGroup, speedLimit } api.Statistics.DeleteMappedAddress(proxyAddress) + tsk.End() streamWait.Done() - logger.Info("== [%d] Stream Quic->TCP done ==", src.StreamID()) + logger.Info("== [%d] Stream Quic->TCP done ==", src.ID()) }() api.Statistics.SetMappedAddress(proxyAddress, trackedAddress) setLinger(dst) - written, err := io.Copy(dst, io.LimitReader(src, BUFFER_SIZE*2)) + timeoutCounter := 0 + var tempBuffer = make([]byte, BUFFER_SIZE) + + var wr int64 = 0 + var err error = nil + i := 0 for { select { case <-ctx.Done(): @@ -183,40 +212,70 @@ func handleQuicToTcp(ctx context.Context, streamWait *sync.WaitGroup, speedLimit default: } - //logger.Info("[%d] Q->T: %v: %v", src.StreamID(), activityFlag, *activityFlag) + //logger.Info("[%d] Q->T: %v: %v", src.ID(), activityFlag, *activityFlag) + + tm := time.Now().Add(1 * time.Second) + _ = src.SetReadDeadline(tm) + _ = src.SetWriteDeadline(tm) + + tm2 := time.Now().Add(1 * time.Second) + _ = dst.SetReadDeadline(tm2) + _ = dst.SetWriteDeadline(tm2) + tsk := shared.StartRegion(fmt.Sprintf("copybuffer.%d.%s", i, tskKey)) + i++ if speedLimit == 0 { - written, err = io.Copy(dst, io.LimitReader(src, BUFFER_SIZE)) + wr, err = io.CopyBuffer(dst, io.LimitReader(src, BUFFER_SIZE), tempBuffer) } else { var now = time.Now() - written, err = io.Copy(dst, io.LimitReader(src, speedLimit)) + wr, err = io.Copy(dst, io.LimitReader(src, speedLimit)) var wait = time.Until(now.Add(1 * time.Second)) time.Sleep(wait) } + tsk.End() + + if wr == 0 { + timeoutCounter++ + if timeoutCounter > 5 { + return + } + } else { + timeoutCounter = 0 + } - if written == 0 && err != nil { + if err != nil { + if err, ok := err.(net.Error); ok && err.Timeout() { + continue + } return } } } func handleTcpToQuic(ctx context.Context, streamWait *sync.WaitGroup, speedLimit int64, - dst quic.Stream, src net.Conn, trackedAddress string) { + dst backend.QuicBackendStream, src net.Conn, trackedAddress string) { + tskKey := fmt.Sprintf("Tcp->Quic:%v", dst.ID()) + tsk := shared.StartRegion(tskKey) defer func() { if err := recover(); err != nil { logger.Error("ERR: %v", err) debug.PrintStack() } - + tsk.End() streamWait.Done() - logger.Info("== [%d] Stream TCP->Quic done ==", dst.StreamID()) + logger.Info("== [%d] Stream TCP->Quic done ==", dst.ID()) }() setLinger(src) - written, err := io.Copy(dst, io.LimitReader(src, BUFFER_SIZE*2)) + var tempBuffer = make([]byte, BUFFER_SIZE) + + timeoutCounter := 0 + var wr int64 = 0 + var err error = nil + i := 0 for { select { case <-ctx.Done(): @@ -224,19 +283,42 @@ func handleTcpToQuic(ctx context.Context, streamWait *sync.WaitGroup, speedLimit default: } - //logger.Info("[%d] T->Q: %v: %v", dst.StreamID(), activityFlag, *activityFlag) + //logger.Info("[%d] T->Q: %v: %v", dst.ID(), activityFlag, *activityFlag) + + tm := time.Now().Add(1 * time.Second) + _ = src.SetReadDeadline(tm) + _ = src.SetWriteDeadline(tm) + + tm2 := time.Now().Add(10 * time.Second) + _ = dst.SetReadDeadline(tm2) + _ = dst.SetWriteDeadline(tm2) + tsk := shared.StartRegion(fmt.Sprintf("copybuffer.%d.%s", i, tskKey)) + i++ if speedLimit == 0 { - written, err = io.Copy(dst, io.LimitReader(src, BUFFER_SIZE)) + wr, err = io.CopyBuffer(dst, io.LimitReader(src, BUFFER_SIZE), tempBuffer) } else { var now = time.Now() - written, err = io.Copy(dst, io.LimitReader(src, speedLimit)) + wr, err = io.CopyBuffer(dst, io.LimitReader(src, speedLimit), tempBuffer) var wait = time.Until(now.Add(1 * time.Second)) time.Sleep(wait) } + tsk.End() - if written == 0 && err != nil { + if wr == 0 { + timeoutCounter++ + if timeoutCounter > 5 { + return + } + } else { + timeoutCounter = 0 + } + + if err != nil { + if err, ok := err.(net.Error); ok && err.Timeout() { + continue + } return } } diff --git a/server/server_test.go b/workers/server/server_test.go similarity index 100% rename from server/server_test.go rename to workers/server/server_test.go