Skip to content

Commit

Permalink
fix api body close, fix partial write copybuffer error, add quicgo tr…
Browse files Browse the repository at this point in the history
…acer support, added buffersize configuration, fix quick disconnect on error, fix speedlimit handling, fix tray proxy check, update for icon connection issue, refactor copyBuffer method to backend
  • Loading branch information
parvit committed Aug 8, 2024
1 parent 8351ba4 commit 90d0000
Show file tree
Hide file tree
Showing 17 changed files with 357 additions and 343 deletions.
26 changes: 22 additions & 4 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ import (
"github.com/parvit/qpep/version"
)

func closeBody(req *http.Request) {
if req != nil && req.Body != nil {
req.Body.Close()
}
}

// formatRequest method formats to a string the request in input, if verbose configuration
// is set then also the body of the request is extracted
func formatRequest(r *http.Request) string {
Expand All @@ -30,7 +36,9 @@ func formatRequest(r *http.Request) string {

// apiStatus handles the api path /status , which sends as output a json object
// of type StatusResponse
func apiStatus(w http.ResponseWriter, _ *http.Request, ps httprouter.Params) {
func apiStatus(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
defer closeBody(r)

addr := ps.ByName("addr")

if len(addr) == 0 {
Expand All @@ -54,6 +62,8 @@ func apiStatus(w http.ResponseWriter, _ *http.Request, ps httprouter.Params) {
// apiEcho handles the api path /echo , which sends as output a json object
// of type EchoResponse
func apiEcho(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
defer closeBody(r)

mappedAddr := r.RemoteAddr

if !strings.HasPrefix(r.RemoteAddr, "127.") {
Expand Down Expand Up @@ -98,6 +108,8 @@ func apiEcho(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
// apiVersions handles the api path /versions , which sends as output a json object
// of type VersionsResponse
func apiVersions(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
defer closeBody(r)

server := "N/A"
client := "N/A"
if strings.Contains(r.URL.String(), API_PREFIX_SERVER) {
Expand All @@ -123,7 +135,9 @@ func apiVersions(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
// apiStatisticsHosts handles the api path /statistics/hosts , which responds using a json object
// of type StatsInfoResponse, containing an attribute object StatsInfo of value "Address" for
// every host tracked
func apiStatisticsHosts(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
func apiStatisticsHosts(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
defer closeBody(r)

info := StatsInfoResponse{}
hosts := Statistics.GetHosts()

Expand Down Expand Up @@ -153,7 +167,9 @@ func apiStatisticsHosts(w http.ResponseWriter, _ *http.Request, _ httprouter.Par
// * INFO_UPDATE
// * INFO_PLATFORM
// for the requested address (via the _addr_ parameter) or for the responding system if not specified
func apiStatisticsInfo(w http.ResponseWriter, _ *http.Request, ps httprouter.Params) {
func apiStatisticsInfo(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
defer closeBody(r)

reqAddress := ps.ByName("addr")

lastUpdate := ""
Expand Down Expand Up @@ -207,7 +223,9 @@ func apiStatisticsInfo(w http.ResponseWriter, _ *http.Request, ps httprouter.Par
// * PERF_UP_TOTAL
// * PERF_DW_TOTAL
// for the requested address or for the responding system if not specified
func apiStatisticsData(w http.ResponseWriter, _ *http.Request, ps httprouter.Params) {
func apiStatisticsData(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
defer closeBody(r)

reqAddress := ps.ByName("addr")

currConnections := Statistics.GetCounter(PERF_CONN, reqAddress)
Expand Down
1 change: 1 addition & 0 deletions api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func apiFilter(next httprouter.Handle) httprouter.Handle {

// Request is found for API request
w.Header().Add("Content-Type", "application/json")
w.Header().Set("Connection", "close")
next(w, r, ps)
})
}
Expand Down
86 changes: 86 additions & 0 deletions backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,23 @@ import (
"crypto/tls"
"crypto/x509"
"encoding/pem"
"fmt"
"github.com/parvit/qpep/logger"
"github.com/parvit/qpep/shared"
"hash/crc64"
"io"
"io/ioutil"
"math/big"
"net"
"os"
"time"
)

var (
localPacketCounter = 0
crcTable = crc64.MakeTable(crc64.ISO)
)

type QuicBackend interface {
Dial(ctx context.Context, remoteAddress string, port int, clientCertPath string, ccAlgorithm string, ccSlowstartAlgo string, traceOn bool) (QuicBackendConnection, error)
Listen(ctx context.Context, address string, port int, serverCertPath string, serverKeyPath string, ccAlgorithm string, ccSlowstartAlgo string, traceOn bool) (QuicBackendConnection, error)
Expand Down Expand Up @@ -50,6 +60,82 @@ type QuicBackendStream interface {
IsClosed() bool
}

type ReaderTimeout interface {
io.Reader
SetReadDeadline(time.Time) error
}

type WriterTimeout interface {
io.Writer
SetWriteDeadline(time.Time) error
}

func CopyBuffer(dst WriterTimeout, src ReaderTimeout, buf []byte, timeout time.Duration, debugPrefix string) (written, read int64, err error) {
src.SetReadDeadline(time.Now().Add(timeout))

var nr int
var er error
var nw int
var ew error

nr, er = src.Read(buf)

if nr > 0 {
read += int64(nr)

dumpPacket("rd", debugPrefix, buf, nr)

offset := 0
for written != read {
dst.SetWriteDeadline(time.Now().Add(timeout))
nw, ew = dst.Write(buf[offset:nr])
written += int64(nw)
offset += nw
if ew == nil && nw <= 0 {
nw = 0
ew = io.ErrUnexpectedEOF
err = io.ErrUnexpectedEOF
}
if ew != nil {
if err2, ok := ew.(net.Error); ok && !err2.Timeout() {
continue
}
err = ew
}
}

dumpPacket("wr", debugPrefix, buf, nw)

} else {
if er != nil {
err = er
}
}
if er == io.EOF {
err = nil
}

logger.Debug("[%d][%s] %d,%v wr: %d,%v - %v **", localPacketCounter, debugPrefix, nr, er, nw, ew, err)

localPacketCounter++
return written, read, err
}

func dumpPacket(dmpType, prefix string, buf []byte, nr int) {
if !shared.DEBUG_DUMP_PACKETS {
return
}

dump, derr := os.Create(fmt.Sprintf("%s.%d-rd.bin", prefix, localPacketCounter))
if derr != nil {
panic(derr)
}
dump.Write(buf[0:nr])
dump.Sync()
dump.Close()
logger.Debug("[%d][%s] %s: %d (%v)", localPacketCounter, dump.Name(), dmpType, nr, crc64.Checksum(buf[0:nr], crcTable))
}

// GenerateTLSConfig creates a new x509 key/certificate pair and dumps it to the disk
func GenerateTLSConfig(certfile, keyfile string) tls.Certificate {
key, err := rsa.GenerateKey(rand.Reader, 2048)
Expand Down
82 changes: 77 additions & 5 deletions backend/backend_quicgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"errors"
"fmt"
"github.com/Project-Faster/quic-go"
"github.com/Project-Faster/quic-go/logging"
"github.com/parvit/qpep/logger"
"github.com/parvit/qpep/shared"
"io/ioutil"
Expand All @@ -40,7 +41,7 @@ type quicGoBackend struct {
}

func (q *quicGoBackend) Dial(ctx context.Context, remoteAddress string, port int, clientCertPath string, ccAlgorithm string, ccSlowstartAlgo string, traceOn bool) (QuicBackendConnection, error) {
quicConfig := qgoGetConfiguration()
quicConfig := qgoGetConfiguration(traceOn)

var err error
var session quic.Connection
Expand All @@ -64,7 +65,7 @@ func (q *quicGoBackend) Dial(ctx context.Context, remoteAddress string, port int
}

func (q *quicGoBackend) Listen(ctx context.Context, address string, port int, serverCertPath string, serverKeyPath string, ccAlgorithm string, ccSlowstartAlgo string, traceOn bool) (QuicBackendConnection, error) {
quicConfig := qgoGetConfiguration()
quicConfig := qgoGetConfiguration(traceOn)

tlsConf := loadTLSConfig(serverCertPath, serverKeyPath)

Expand All @@ -89,17 +90,24 @@ func (q *quicGoBackend) Close() error {
return nil
}

func qgoGetConfiguration() *quic.Config {
return &quic.Config{
func qgoGetConfiguration(traceOn bool) *quic.Config {
cfg := &quic.Config{
MaxIncomingStreams: 1024,
DisablePathMTUDiscovery: true,
MaxIdleTimeout: 3 * time.Second,
MaxIdleTimeout: 15 * time.Second,

InitialConnectionReceiveWindow: 10 * 1024 * 1024,

HandshakeIdleTimeout: shared.GetScaledTimeout(10, time.Second),
KeepAlivePeriod: 0,

EnableDatagrams: false,
}
if traceOn {
cfg.Tracer = &qpepQuicTracer{}
}

return cfg
}

type qgoConnectionAdapter struct {
Expand Down Expand Up @@ -242,6 +250,13 @@ func (stream *qgoStreamAdapter) IsClosed() bool {
return false // stream.closedRead || stream.closedWrite
}

func (stream *qgoStreamAdapter) Close() error {
ctx := stream.Stream.Context()
<-ctx.Done()

return stream.Stream.Close()
}

var _ QuicBackendStream = &qgoStreamAdapter{}

// --- Certificate support --- //
Expand Down Expand Up @@ -367,3 +382,60 @@ func parsePrivateKey(der []byte) (crypto.PrivateKey, error) {

return nil, errors.New("tls: failed to parse private key")
}

// --- Tracer --- //
type qpepQuicTracer struct {
logging.NullTracer
}

var tracer = &qpepQuicConnectionTracer{}

func (t *qpepQuicTracer) TracerForConnection(ctx context.Context, p logging.Perspective, odcid logging.ConnectionID) logging.ConnectionTracer {
return tracer
}
func (t *qpepQuicTracer) SentPacket(addr net.Addr, hdr *logging.Header, count logging.ByteCount, frames []logging.Frame) {
logger.Info("[QGO] Sent packet to %s: %s %d", addr, hdr.PacketType(), count)
}
func (t *qpepQuicTracer) DroppedPacket(addr net.Addr, typePkt logging.PacketType, count logging.ByteCount, reason logging.PacketDropReason) {
logger.Info("[QGO] Dropped packet to %s: %d %d - %v", addr, typePkt, count, reason)
}

type qpepQuicConnectionTracer struct {
logging.NullConnectionTracer
}

func (n *qpepQuicConnectionTracer) SentLongHeaderPacket(hdr *logging.ExtendedHeader, count logging.ByteCount, _ *logging.AckFrame, frames []logging.Frame) {
logger.Info("[QGO] Sent packet (long) %v: %d", hdr, count)
}
func (n *qpepQuicConnectionTracer) SentShortHeaderPacket(hdr *logging.ShortHeader, count logging.ByteCount, _ *logging.AckFrame, frames []logging.Frame) {
logger.Info("[QGO] Sent packet (short) %v: %d", hdr, count)
}
func (n *qpepQuicConnectionTracer) ReceivedRetry(hdr *logging.Header) {
logger.Info("[QGO] Retry packet %v", hdr)
}
func (n *qpepQuicConnectionTracer) ReceivedLongHeaderPacket(hdr *logging.ExtendedHeader, count logging.ByteCount, frames []logging.Frame) {
logger.Info("[QGO] Recv packet (long) %v: %d", hdr, count)
}
func (n *qpepQuicConnectionTracer) ReceivedShortHeaderPacket(hdr *logging.ShortHeader, count logging.ByteCount, frames []logging.Frame) {
logger.Info("[QGO] Recv packet (short) %v: %d", hdr, count)
}

func (n *qpepQuicConnectionTracer) BufferedPacket(typePkt logging.PacketType, count logging.ByteCount) {
logger.Info("[QGO] Buffered packet %d: %d", typePkt, count)
}
func (n *qpepQuicConnectionTracer) AcknowledgedPacket(level logging.EncryptionLevel, number logging.PacketNumber) {
logger.Info("[QGO] Ack packet %d", number)
}
func (n *qpepQuicConnectionTracer) LostPacket(level logging.EncryptionLevel, number logging.PacketNumber, reason logging.PacketLossReason) {
logger.Info("[QGO] Lost packet %d - %v", number, reason)
}
func (n *qpepQuicConnectionTracer) UpdatedCongestionState(state logging.CongestionState) {
logger.Info("[QGO] congestion changed to state %v", state)
}
func (n *qpepQuicConnectionTracer) Close() {
logger.Info("[QGO] Close")
}

func (n *qpepQuicConnectionTracer) ClosedConnection(err error) {
logger.Info("[QGO] Close Connection: %v", err)
}
28 changes: 6 additions & 22 deletions qpep-tray/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ func NotifyUser(message, category string, longNotification bool) {
Message: message,
Duration: duration,
Icon: MainIconData,

ActivationType: "background",
}
if err := n.Push(); err != nil {
log.Println("ERR: ", err)
Expand Down Expand Up @@ -267,7 +269,7 @@ func onExit() {
break
}

NotifyUser("Closed", "Info", false)
log.Println("Closed")
}

func startConnectionStatusWatchdog() (context.Context, context.CancelFunc) {
Expand Down Expand Up @@ -337,18 +339,16 @@ func startConnectionStatusWatchdog() (context.Context, context.CancelFunc) {
}
}()

var pubAddress = ""

CHECKLOOP:
for {
select {
case <-ctx.Done():
log.Println("Stopping connection check watchdog")
break CHECKLOOP

case <-time.After(10 * time.Second):
case <-time.After(3 * time.Second):
if !clientActive && !serverActive {
pubAddress = ""
state = stateDisconnected
continue
}

Expand Down Expand Up @@ -386,23 +386,7 @@ func startConnectionStatusWatchdog() (context.Context, context.CancelFunc) {
}

log.Printf("Server Echo: %s %d\n", resp.Address, resp.Port)
pubAddress = resp.Address
}

if len(pubAddress) > 0 {
var status = api.RequestStatus(listenHost, gatewayHost, gatewayAPIPort, pubAddress, clientToServer)
if status == nil {
log.Printf("Server Status: no / invalid response\n")
} else if status.ConnectionCounter < 0 {
log.Printf("Server Status: no connections received\n")
}
if status == nil || status.ConnectionCounter < 0 {
pubAddress = ""
state = stateConnecting
continue
}

log.Printf("Server Status: %s %d\n", status.LastCheck, status.ConnectionCounter)
NotifyUser("Connection established", "Info", false)
state = stateConnected
}
Expand All @@ -419,7 +403,7 @@ func startConnectionStatusWatchdog() (context.Context, context.CancelFunc) {
// "false" value means the proxy is not running correctly
func fakeAPICallCheckProxy() bool {
data, err, _ := shared.RunCommand("powershell.exe", "-ExecutionPolicy", "ByPass", "-Command",
"Invoke-WebRequest -Uri \"http://192.168.1.40:444/qpep-client-proxy-check\" -UseBasicParsing -TimeoutSec 1",
fmt.Sprintf("Invoke-WebRequest -Uri \"http://%s:%d/qpep-client-proxy-check\" -UseBasicParsing -TimeoutSec 1", shared.QPepConfig.ListenHost, shared.QPepConfig.ListenPort),
)
logger.Info("proxy check data: %s", data)
logger.Info("proxy check error: %v", err)
Expand Down
Loading

0 comments on commit 90d0000

Please sign in to comment.