Skip to content

Commit

Permalink
feature: #61 - Port filter on outgoing connections via 'ignored_ports…
Browse files Browse the repository at this point in the history
…' configuration value
  • Loading branch information
parvit committed Sep 24, 2024
1 parent 243723b commit 1e06ca2
Show file tree
Hide file tree
Showing 13 changed files with 137 additions and 29 deletions.
2 changes: 1 addition & 1 deletion service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func (p *QPepService) Logger(errs chan<- error) (kservice.Logger, error) {
// runAsClient method wraps the logic to setup the system as client mode
func runAsClient(execContext context.Context, cancel context.CancelFunc) {
logger.Info("Running Client")
windivert.EnableDiverterLogging(configuration.QPepConfig.General.Verbose)
windivert.EnableDiverterLogging(flags.Globals.Verbose)
go client.RunClient(execContext, cancel)
}

Expand Down
5 changes: 3 additions & 2 deletions shared/configuration/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ var DefaultConfig = QPepConfigType{
Verbose: false,
},
Limits: &LimitsDefinition{
Incoming: nil,
Outgoing: nil,
Incoming: nil,
Outgoing: nil,
IgnoredPorts: []int{},
},
Analytics: &AnalyticsDefinition{
Enabled: false,
Expand Down
3 changes: 3 additions & 0 deletions shared/configuration/definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ type LimitsDefinition struct {
Incoming map[string]string `yaml:"incoming"`
// Outgoing (yaml:destinations) key defines the speed limits for outgoing connections
Outgoing map[string]string `yaml:"outgoing"`
// IgnoredPorts list of network ports to be excluded from redirection
IgnoredPorts []int `yaml:"ignored_ports"`
}

// AnalyticsDefinition struct models the configuration values for the analytics client, by default it
Expand Down Expand Up @@ -143,6 +145,7 @@ func (q *LimitsDefinition) merge(r *LimitsDefinition) {
if r != nil {
q.Incoming = r.Incoming
q.Outgoing = r.Outgoing
q.IgnoredPorts = r.IgnoredPorts
}
}

Expand Down
9 changes: 6 additions & 3 deletions windivert/WinDivertEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ UINT32 allowedGatewayInterface = 0; //!< Allowed interface id to be redirected
* @param numThreads Number of worker threads to use (1-8)
* @return DIVERT_OK if everything ok, an error otherwise
*/
int InitializeWinDivertEngine(char* gatewayHost, char* listenHost, int gatewayPort, int listenPort, int numThreads)
int InitializeWinDivertEngine(char* gatewayHost, char* listenHost, int gatewayPort, int listenPort, int numThreads, int* ranges, int len_ranges)
{
if( gatewayPort < 1 || gatewayPort > 65536 || numThreads < 1 || numThreads > MAX_THREADS ) {
logNativeMessageToGo(0, "Cannot initialize windiver engine with provided data, gateway port:%d, threads:%d", gatewayPort, numThreads);
Expand All @@ -50,8 +50,11 @@ int InitializeWinDivertEngine(char* gatewayHost, char* listenHost, int gatewayPo
InitializeSRWLock(&sharedRWLock);

// The filter for windivert, captures outbound tcp packets which are not directed at the client listening port
char filterOut[256] = "";
snprintf(filterOut, 256, FILTER_OUTBOUND, listenPort);
char filterOut[FILTER_MAX] = "";
int wr = snprintf(filterOut, FILTER_MAX_SIZE, FILTER_OUTBOUND, listenPort );
for( int i=0; i<1024 && i < len_ranges; i++ ) {
wr += snprintf( filterOut+wr, FILTER_MAX_SIZE-wr, FILTER_IGNORE, ranges[i], ranges[i] );
}
logNativeMessageToGo(0, "Filtering outbound with %s", filterOut);

// Open Windivert engine
Expand Down
5 changes: 4 additions & 1 deletion windivert/include/engine.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#pragma once

#define FILTER_OUTBOUND "!impostor and ip.SrcAddr!=127.0.0.1 and ip.DstAddr!=127.0.0.1 and tcp and tcp.DstPort!=%d and tcp.DstPort!=53"
#define FILTER_MAX_SIZE (65 * 1024)
#define FILTER_MAX (1024)
#define FILTER_OUTBOUND "!impostor && ip.SrcAddr!=127.0.0.1 && ip.DstAddr!=127.0.0.1 && tcp && tcp.DstPort!=%d"
#define FILTER_IGNORE " && tcp.DstPort!=%d && tcp.SrcPort!=%d"

#define MAXBUF WINDIVERT_MTU_MAX
#define INET6_ADDRSTRLEN 45
Expand Down
2 changes: 1 addition & 1 deletion windivert/include/windivert_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ enum {
DIVERT_ERROR_NOT_OPEN = 4, //!< Connection is not open so no state available
};

extern int InitializeWinDivertEngine(char* gatewayHost, char* listenHost, int gatewayPort, int listenPort, int numThreads);
extern int InitializeWinDivertEngine(char* gatewayHost, char* listenHost, int gatewayPort, int listenPort, int numThreads, int* ranges, int len_ranges);
extern int CloseWinDivertEngine();
extern void logMessageToGo( char* message );
extern void EnableMessageOutputToGo( int enabled );
Expand Down
6 changes: 5 additions & 1 deletion windivert/windivert_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ import "C"
// * _listenPort_ Packets must have source from this port
// * _numThreads_ Number of threads to use for the packet capturing routines
// * _gatewayInterfaces_ Only accept divert of packets of this interface id
func InitializeWinDivertEngine(gatewayAddr, listenAddr string, gatewayPort, listenPort, numThreads int, gatewayInterfaces int64) int {
func InitializeWinDivertEngine(gatewayAddr, listenAddr string,
gatewayPort, listenPort, numThreads int,
gatewayInterface int64,
ignoredPorts []int) int {

return DIVERT_OK
}

Expand Down
6 changes: 3 additions & 3 deletions windivert/windivert_other_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (s *WinDivertSuite) TestInitializeWinDivertEngine() {
code := InitializeWinDivertEngine(
addr, addr,
configuration.QPepConfig.General.APIPort, 445,
4, 0)
4, 0, []int{})

assert.Equal(t, DIVERT_OK, code)
}
Expand All @@ -57,7 +57,7 @@ func (s *WinDivertSuite) TestInitializeWinDivertEngine_Fail() {
code := InitializeWinDivertEngine(
addr, addr,
0, 0,
4, 0)
4, 0, []int{80})

assert.Equal(t, DIVERT_OK, code) // ok because it's not implemented on linux
}
Expand All @@ -70,7 +70,7 @@ func (s *WinDivertSuite) TestCloseWinDivertEngine() {
code := InitializeWinDivertEngine(
addr, addr,
configuration.QPepConfig.General.APIPort, 445,
4, 0)
4, 0, []int{80})

assert.Equal(t, DIVERT_OK, code)

Expand Down
19 changes: 17 additions & 2 deletions windivert/windivert_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,25 @@ import (
// * _listenPort_ Packets must have source from this port
// * _numThreads_ Number of threads to use for the packet capturing routines
// * _gatewayInterfaces_ Only accept divert of packets of this interface id
func InitializeWinDivertEngine(gatewayAddr, listenAddr string, gatewayPort, listenPort, numThreads int, gatewayInterface int64) int {
// * _portRanges_ List of ports to ignore
func InitializeWinDivertEngine(gatewayAddr, listenAddr string,
gatewayPort, listenPort, numThreads int,
gatewayInterface int64,
ignoredPorts []int) int {

ports := make([]C.int, 0, len(ignoredPorts))
ports = append(ports, C.int(53)) // DNS

for i := 0; i < len(ignoredPorts); i++ {
if ignoredPorts[i] < 0 || ignoredPorts[i] >= 65536 {
return DIVERT_ERROR_FAILED
}
ports = append(ports, C.int(ignoredPorts[i]))
}

gatewayStr := C.CString(gatewayAddr)
listenStr := C.CString(listenAddr)
response := int(C.InitializeWinDivertEngine(gatewayStr, listenStr, C.int(gatewayPort), C.int(listenPort), C.int(numThreads)))
response := int(C.InitializeWinDivertEngine(gatewayStr, listenStr, C.int(gatewayPort), C.int(listenPort), C.int(numThreads), (*C.int)(&ports[0]), C.int(len(ports))))
if response != DIVERT_OK {
return response
}
Expand Down
6 changes: 3 additions & 3 deletions windivert/windivert_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (s *WinDivertSuite) TestInitializeWinDivertEngine() {
code := InitializeWinDivertEngine(
"127.0.0.1", "127.0.0.2",
configuration.QPepConfig.General.APIPort, 445,
4, itFaces[0])
4, itFaces[0], []int{80})

assert.Equal(t, DIVERT_OK, code)
}
Expand All @@ -77,7 +77,7 @@ func (s *WinDivertSuite) TestInitializeWinDivertEngine_Fail() {
code := InitializeWinDivertEngine(
"127.0.0.1", "127.0.0.2",
0, 0,
4, itFaces[0])
4, itFaces[0], []int{80})

assert.NotEqual(t, DIVERT_OK, code)
}
Expand All @@ -90,7 +90,7 @@ func (s *WinDivertSuite) TestCloseWinDivertEngine() {
code := InitializeWinDivertEngine(
"127.0.0.1", "127.0.0.2",
configuration.QPepConfig.General.APIPort, 445,
4, itFaces[0])
4, itFaces[0], []int{80})

assert.Equal(t, DIVERT_OK, code)

Expand Down
3 changes: 2 additions & 1 deletion workers/client/client_impl_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
func initDiverter() bool {
generalConfig := configuration.QPepConfig.General
clientConfig := configuration.QPepConfig.Client
filteredPorts := configuration.QPepConfig.Limits.IgnoredPorts

gatewayHost := clientConfig.GatewayHost
gatewayPort := clientConfig.GatewayPort
Expand All @@ -34,7 +35,7 @@ func initDiverter() bool {
}
}

redirected = gateway.SetConnectionDiverter(true, gatewayHost, listenHost, gatewayPort, listenPort, threads, redirectedInetID)
redirected = gateway.SetConnectionDiverter(true, gatewayHost, listenHost, gatewayPort, listenPort, threads, redirectedInetID, filteredPorts)
return redirected
}

Expand Down
98 changes: 88 additions & 10 deletions workers/client/client_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ var (
newSessionLock sync.RWMutex
// quicSession listening quic connection to the server
quicSession backend.QuicBackendConnection

filteredPorts map[int]struct{} = nil
)

func setLinger(c net.Conn) {
Expand Down Expand Up @@ -87,12 +89,28 @@ func handleTCPConn(tcpConn net.Conn) {
var proxyRequest *http.Request
var errProxy error
if !diverted {
if filteredPorts == nil {
filteredPorts = make(map[int]struct{})
for _, p := range shared.QPepConfig.IgnoredPorts {
filteredPorts[p] = struct{}{}
}
}

// proxy open connection
proxyRequest, errProxy = handleProxyOpenConnection(tcpConn)
if errProxy == errors.ErrProxyCheckRequest {
logger.Info("Checked for proxy usage, closing.")
return
}

// check direct connection
_, port, _ := getAddressPortFromHost(proxyRequest.Host)
if _, ok := filteredPorts[port]; ok {
logger.Info("opening proxy direct connection")
handleProxyedRequest(proxyRequest, nil, tcpConn, nil)
return
}

logger.OnError(errProxy, "opening proxy connection")
}

Expand Down Expand Up @@ -351,6 +369,12 @@ func handleProxyedRequest(req *http.Request, header *protocol.QPepHeader, tcpCon

logger.Info("HOST: %s", req.Host)

// direct
if header == nil {
handleDirectConnection(tcpConn, req, fmt.Sprintf("%s:%d", address, port))
break
}

header.DestAddr = &net.TCPAddr{
IP: address,
Port: port,
Expand Down Expand Up @@ -384,16 +408,6 @@ func handleProxyedRequest(req *http.Request, header *protocol.QPepHeader, tcpCon

logger.Info("HOST: %s", req.Host)

header.DestAddr = &net.TCPAddr{
IP: address,
Port: port,
}

if header.DestAddr.IP.String() == clientConfig.GatewayHost {
header.Flags |= protocol.QPEP_LOCALSERVER_DESTINATION
}
logger.Info("Proxied connection flags : %d %d", header.Flags, header.Flags&protocol.QPEP_LOCALSERVER_DESTINATION)

t := http.Response{
Status: "200 Connection established",
StatusCode: http.StatusOK,
Expand All @@ -408,6 +422,21 @@ func handleProxyedRequest(req *http.Request, header *protocol.QPepHeader, tcpCon

t.Write(tcpConn)

if header == nil {
handleDirectConnection(tcpConn, nil, fmt.Sprintf("%s:%d", address, port))
break
}

header.DestAddr = &net.TCPAddr{
IP: address,
Port: port,
}

if header.DestAddr.IP.String() == ClientConfiguration.GatewayHost {
header.Flags |= shared.QPEP_LOCALSERVER_DESTINATION
}
logger.Info("Proxied connection flags : %d %d", header.Flags, header.Flags&shared.QPEP_LOCALSERVER_DESTINATION)

logger.Info("(Proxied) Sending QPEP header to server, SourceAddr: %v / DestAddr: %v / ID: %v", header.SourceAddr, header.DestAddr, stream.ID())

_, err := stream.Write(header.ToBytes())
Expand Down Expand Up @@ -548,6 +577,55 @@ func handleQuicToTcp(ctx context.Context, streamWait *sync.WaitGroup, dst net.Co
}
}

func handleDirectConnection(conn net.Conn, req *http.Request, dest string) {
defer conn.Close()

logger.Info("Start direct connection: %v -> %v -> %v", conn.RemoteAddr(), conn.LocalAddr(), dest)
defer logger.Info("End direct connection: %v -> %v -> %v", conn.RemoteAddr(), conn.LocalAddr(), dest)

//
dialer := &net.Dialer{
Timeout: 5 * time.Second,
KeepAlive: 3 * time.Second,
DualStack: true,
}

c, err := dialer.Dial("tcp", dest)
if err != nil {
logger.Error("ERROR: %v", err)
return
}
defer c.Close()

if req != nil {
req.Write(c)
}

wg := &sync.WaitGroup{}
wg.Add(2)

go func() {
defer func() {
_ = recover()
wg.Done()
}()
conn.SetDeadline(time.Now().Add(10 * time.Second))
c.SetDeadline(time.Now().Add(10 * time.Second))
_, _ = io.Copy(conn, c)
}()
go func() {
defer func() {
_ = recover()
wg.Done()
}()
conn.SetDeadline(time.Now().Add(10 * time.Second))
c.SetDeadline(time.Now().Add(10 * time.Second))
_, _ = io.Copy(c, conn)
}()

wg.Wait()
}

func checkProxyTestConnection(host string) bool {
return strings.Contains(host, "qpep-client-proxy-check")
}
Expand Down
2 changes: 1 addition & 1 deletion workers/client/client_proxy_listener_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (listener *ClientProxyListener) Addr() net.Addr {
return listener.base.Addr()
}

// Addr method close the listener
// Close method close the listener
func (listener *ClientProxyListener) Close() error {
if listener.base == nil {
return nil
Expand Down

0 comments on commit 1e06ca2

Please sign in to comment.