diff --git a/.gitignore b/.gitignore index 254ace2..cb06193 100644 --- a/.gitignore +++ b/.gitignore @@ -1,12 +1,11 @@ .history .vscode +dist/ +logs/ anyproxy anyproxy-alpine anyproxy-darwin anyproxy-windows.exe tunneld tunneld-alpine -tunnel/tunneld -tunnel/tunneld-alpine -logs/ -tunnel/logs/ \ No newline at end of file +tunnel/logs/ diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..807c23c --- /dev/null +++ b/Makefile @@ -0,0 +1,13 @@ + +linux: + bash ./scripts/build.sh linux +mac: + bash ./scripts/build.sh mac +windows: + bash ./scripts/build.sh windows +alpine: + bash ./scripts/build.sh alpine +all: + bash ./scripts/build.sh all +clean: + rm -rf dist/* \ No newline at end of file diff --git a/README.md b/README.md index 53a9fb3..34f3226 100644 --- a/README.md +++ b/README.md @@ -2,12 +2,11 @@ anyproxy 是一个部署在Linux系统上的tcp流转发器,可以直接将本地或网络收到的请求发出,也可以将请求转到tunneld或SOCKS或charles等代理。可以代替Proxifier做Linux下的客户端, 也可以配合Proxifier当它的服务端。经过跨平台编译,如果只做网络包的转发可以在windows等平台使用。 -[下载Linux包](http://cloudme.io/anyproxy) 、 [下载Mac包](http://cloudme.io/anyproxy-darwin) 、 -[下载Windows包](http://cloudme.io/anyproxy-windows.exe) 、 [下载alpine包](http://cloudme.io/anyproxy-alpine) +[下载Linux包](http://cloudme.io/anyproxy) 、 [下载Mac包](http://cloudme.io/anyproxy-darwin) 、 [下载Windows包](http://cloudme.io/anyproxy-windows.exe) 提醒:请使用浏览器右键的“链接另存为”下载文件 -tunneld 是一个anyproxy的服务端,部署在服务器上接收anyproxy的请求,并代理发出请求或是转到下一个tunneld。用于跨内网访问资源使用 +tunneld 是一个anyproxy的服务端,部署在服务器上接收anyproxy的请求,并代理发出请求或是转到下一个tunneld。用于跨内网访问资源使用。非anyproxy请求一概拒绝处理 # 路由支持 @@ -28,13 +27,13 @@ tunneld 是一个anyproxy的服务端,部署在服务器上接收anyproxy的 # or +----------+ +----------+ +---------+ +---------+ +----------+ -| Computer | <==> | anyproxy | <==> | tunneld | <==> | tunneld | <==> | Internet | +| Computer | <==> | anyproxy | <==> | tunneld | <==> | socks5 | <==> | Internet | +----------+ +----------+ +---------+ +---------+ +----------+ # or -+----------+ +----------+ +---------+ +---------+ +----------+ -| Computer | <==> | anyproxy | <==> | tunneld | <==> | socks5 | <==> | Internet | -+----------+ +----------+ +---------+ +---------+ +----------+ ++----------+ +---------+ +-----------+ ws +-----------+ +---------+ +| Computer | <==> | Nginx A | <==> | anyproxy S| <==> | anyproxy C| <==> | Nginx B | ++----------+ +---------+ +-----------+ +-----------+ +---------+ ``` # 使用案例 @@ -42,12 +41,14 @@ tunneld 是一个anyproxy的服务端,部署在服务器上接收anyproxy的 `使用iptables将本用户下tcp流转到anyproxy,再进行docker pull操作` -![解决Docker pull问题](examples/docker_pull.png) - > 案例2: 解决相同域名访问网站不同测试环境的问题 `本地通过内网 anyproxy 代理上网,遇到测试服务器域名则跳到外网tunneld转发,网站的nginx根据来源IP进行转发到特定测试环境(有几个环境就需要有几个tunneld服务且IP要不同)` +> 案例3: 解决HTTPS抓包问题 + +`本地将https请求到服务器,服务器解证书后增加特定头部转到anyproxy websocket服务端,本地另起一个anyproxy的websocket客户端接收并将http请求转发到Charles` + # 源码编译 > 安装Go环境并设置GOPROXY @@ -64,7 +65,7 @@ go env -w GOPROXY=https://goproxy.cn,direct ``` git clone https://github.com/keminar/anyproxy.git cd anyproxy -go build anyproxy.git +make all ``` > 本机启动 @@ -77,10 +78,10 @@ sudo -u anyproxy ./anyproxy ./anyproxy -daemon # 示例3. 启动tunneld -./tunneld +./anyproxy -mode tunnel # 示例4. 启动anyproxy并将请求转给tunneld -./anyproxy -p '127.0.0.1:3001' +./anyproxy -p 'tunnel://127.0.0.1:3001' # 示例5. 启动anyproxy并将请求转给socks5 ./anyproxy -p 'socks5://127.0.0.1:10000' @@ -149,7 +150,6 @@ sudo iptables -t nat -D OUTPUT 2 > ~~划线~~ 部分为已实现功能 * ~~可将请求转发到Tunnel服务~~ -* 根据CIDR做不同出口请求 * ~~对域名支持加Host绑定~~ * ~~对域名配置请求出口~~ * ~~增加全局默认出口配置~~ @@ -161,17 +161,17 @@ sudo iptables -t nat -D OUTPUT 2 * ~~可以自定义代理server,如果不可用则用全局的~~ * ~~server多级转发~~ * ~~加域名黑名单功能,不给请求~~ -* 请求Body内容体记录, 涉及安全,可能不会实现 -* 服务间通信http请求完全加密(header+body) -* HTTPS的SNI的支持? * ~~支持转发到socket5服务~~ -* TCP 增加更多协议解析支持,如rtmp,ftp等 -* 与Tunnel的多账户认证,账户可设置有效期 * ~~支持HTTP/1.1 keep-alive 一外链接多次请求不同域名~~ -* HTTP/1.1 keep-alive后端也能复用tcp * ~~修复iptables转发后百度贴吧无法访问的问题~~ -* 转发的mysql的连接请求会一直卡住 * ~~支持windows平台使用~~ +* ~~通过websocket实现内网穿透(必须为http的非CONNECT请求)~~ +* ~~订阅增加邮箱标识,用于辨别在线用户~~ +* ~~与Tunnel功能合并,使用mode区分~~ +* ~~启用ws-listen后的平滑重启问题~~ +* ~~监听配置文件变化重新加载路由~~ +* TCP 增加更多协议解析支持,如rtmp,ftp, socks5, https(SNI)等 +* TCP 转发的mysql的连接请求会一直卡住 # 感谢 diff --git a/anyproxy.go b/anyproxy.go index b9524e9..c1f2151 100644 --- a/anyproxy.go +++ b/anyproxy.go @@ -8,20 +8,25 @@ import ( "net/http" _ "net/http/pprof" "os" - "strings" + "time" "github.com/keminar/anyproxy/config" "github.com/keminar/anyproxy/grace" "github.com/keminar/anyproxy/logging" + "github.com/keminar/anyproxy/nat" "github.com/keminar/anyproxy/proto" "github.com/keminar/anyproxy/utils/conf" "github.com/keminar/anyproxy/utils/daemon" "github.com/keminar/anyproxy/utils/help" + "github.com/keminar/anyproxy/utils/tools" ) var ( gListenAddrPort string gProxyServerSpec string + gWebsocketListen string + gWebsocketConn string + gMode string gHelp bool gDebug int gPprof string @@ -31,10 +36,12 @@ func init() { flag.Usage = help.Usage flag.StringVar(&gListenAddrPort, "l", "", "Address and port to listen on") flag.StringVar(&gProxyServerSpec, "p", "", "Proxy servers to use") + flag.StringVar(&gWebsocketListen, "ws-listen", "", "Websocket address and port to listen on") + flag.StringVar(&gWebsocketConn, "ws-connect", "", "Websocket Address and port to connect") + flag.StringVar(&gMode, "mode", "", "Run mode(proxy, tunnel). proxy mode default") flag.IntVar(&gDebug, "debug", 0, "debug mode (0, 1, 2)") flag.StringVar(&gPprof, "pprof", "", "pprof port, disable if empty") flag.BoolVar(&gHelp, "h", false, "This usage message") - } func main() { @@ -52,20 +59,14 @@ func main() { } cmdName := "anyproxy" - logDir := "./logs/" - if conf.RouterConfig.Log.Dir != "" { - logDir = conf.RouterConfig.Log.Dir - } + logDir := config.IfEmptyThen(conf.RouterConfig.Log.Dir, "./logs/", "") envRunMode := fmt.Sprintf("%s_run_mode", cmdName) fd := logging.ErrlogFd(logDir, cmdName) // 是否后台运行 daemon.Daemonize(envRunMode, fd) gListenAddrPort = config.IfEmptyThen(gListenAddrPort, conf.RouterConfig.Listen, ":3000") - // 支持只输入端口的形式 - if !strings.Contains(gListenAddrPort, ":") { - gListenAddrPort = ":" + gListenAddrPort - } + gListenAddrPort = tools.FillPort(gListenAddrPort) config.SetListenPort(gListenAddrPort) var writer io.Writer @@ -77,23 +78,43 @@ func main() { logging.SetDefaultLogger(logDir, cmdName, true, 3, writer) // 设置代理 - gProxyServerSpec = config.IfEmptyThen(gProxyServerSpec, conf.RouterConfig.Proxy, "") + gProxyServerSpec = config.IfEmptyThen(gProxyServerSpec, conf.RouterConfig.Default.Proxy, "") config.SetProxyServer(gProxyServerSpec) // 调试模式 if len(gPprof) > 0 { go func() { - // 支持只输入端口的形式 - if !strings.Contains(gPprof, ":") { - gPprof = ":" + gPprof - } + gPprof = tools.FillPort(gPprof) //浏览器访问: http://:5001/debug/pprof/ log.Println("Starting pprof debug server ...") // 这里不要使用log.Fatal会在平滑重启时导致进程退出 - // 因为http server现在没办法加入平滑重启,第一次重启会报端口冲突,可以通过重启两次来启动到pprof - log.Println(http.ListenAndServe(gPprof, nil)) + // 因为http server现在没办法一次平滑重启,会报端口冲突,可以通过多次重试来启动pprof + for i := 0; i < 10; i++ { + log.Println(http.ListenAndServe(gPprof, nil)) + time.Sleep(10 * time.Second) + } }() } - server := grace.NewServer(gListenAddrPort, proto.ClientHandler) - server.ListenAndServe() + + // websocket 服务端 + gWebsocketListen = config.IfEmptyThen(gWebsocketListen, conf.RouterConfig.Websocket.Listen, "") + if gWebsocketListen != "" { + gWebsocketListen = tools.FillPort(gWebsocketListen) + go nat.NewServer(&gWebsocketListen) + } + // websocket 客户端 + gWebsocketConn = config.IfEmptyThen(gWebsocketConn, conf.RouterConfig.Websocket.Connect, "") + if gWebsocketConn != "" { + gWebsocketConn = tools.FillPort(gWebsocketConn) + go nat.ConnectServer(&gWebsocketConn) + } + + // 运行模式 + if gMode == "tunnel" { + server := grace.NewServer(gListenAddrPort, proto.ServerHandler) + server.ListenAndServe() + } else { + server := grace.NewServer(gListenAddrPort, proto.ClientHandler) + server.ListenAndServe() + } } diff --git a/conf/router.yaml b/conf/router.yaml index 754390a..e5018d6 100644 --- a/conf/router.yaml +++ b/conf/router.yaml @@ -1,18 +1,30 @@ -log: - dir: ./logs/ -# 使用的DNS服务器 local 当前环境, remote远程, 仅当target使用remote有效 -dns: local -# 默认环境,local 当前环境, remote 远程, deny 禁止 -# auto根据dial选择,local dial失败则remote -target: auto -tcpTarget: remote -# 默认域名比对方案 -match: equal -# 全局代理服务器, 优先级低于启动传参 -proxy: # 监听端口IP, 优先级低于启动传参 listen: -# 域名 +# 日志目录 +log: + dir: ./logs/ +# 监听配置文件变化 +watcher: true +# anyproxy 和 tunnel通信密钥, 必须16位长度 +token: anyproxyproxyany +# 可访问的客户端IP,为空不限制 +#allowIP: + +# 默认操作,可热加载 +default: + # 使用的DNS服务器 local 当前环境, remote远程, 仅当target使用remote有效 + dns: local + # 默认环境,local 当前环境, remote 远程, deny 禁止 + # auto根据dial选择,local dial失败则remote + target: auto + # tcp 请求环境,local 当前环境, remote 远程, deny 禁止 + tcpTarget: remote + # 默认域名比对方案,contain 包含,equal 完全相等, preg 正则 + match: equal + # 全局代理服务器, 优先级低于启动传参 + proxy: + +# 域名,可热加载 hosts: - name: github # contain 包含,equal 完全相等, preg 正则 @@ -21,7 +33,7 @@ hosts: # 如果有用proxy自定义代理可用,target强制当remote使用,proxy代理不可用,target按原逻辑处理 target: remote # 参考全局localDns - dns: local + dns: remote # 支持 http:// , tunnel:// , socks5:// 三种协议,默认 tunnel:// proxy: http://127.0.0.1:8888 - name: golang.org @@ -36,7 +48,22 @@ hosts: target: deny - name: dev.example.com ip: 127.0.0.1 -# anyproxy 和 tunnel通信密钥, 必须16位长度 -#token: anyproxyproxyany -# 可访问的客户端IP,为空不限制 -#allowIP: + +#websocket配置 +websocket: + # 监听端口 + listen: + # ip 端口 + connect: + # connect 域名 + host: + # 用户名 + user: + # 密码 + pass: + # Email用于定位用户 + email: + # 订阅头部信息 + subscribe: + - key: + val: \ No newline at end of file diff --git a/examples/chan/send.go b/examples/chan/send.go new file mode 100644 index 0000000..bf85da4 --- /dev/null +++ b/examples/chan/send.go @@ -0,0 +1,101 @@ +package main + +import "log" + +func main() { + log.Println("test1============") + test1() + log.Println("test2============") + test2() + log.Println("test3============") + test3() + log.Println("test4============") + test4() + log.Println("test5============") + test5() + log.Println("test6============") + test6() + + //结论 ok 为判断通道是否关闭, default为判断通道是否放满或者无数据时都会调用 +} + +func test1() { + send := make(chan int) + close(send) + + select { + case t := <-send: //不用ok + log.Println(t) //被执行 + default: + log.Println("default") + } +} + +func test2() { + send := make(chan int) + close(send) + + select { + case t, ok := <-send: // 使用ok + log.Println(t, ok) //被执行,且ok为false + default: + log.Println("default") + } +} + +func test3() { + send := make(chan int) + close(send) + + select { + case t, ok := <-send: // 使用ok + log.Println(t, ok) //被执行,且ok为false + } +} + +func test4() { + send := make(chan int) + go func() { + // 无close + for i := 0; i < 10; i++ { + send <- i + } + }() + + for i := 0; i < 20; i++ { + select { + case t, ok := <-send: + log.Println(t, ok) + default: + log.Println("send is full or send is empty") //部分被执行 + } + } +} + +func test5() { + send := make(chan int) + go func() { + for i := 0; i < 10; i++ { + send <- i + } + }() + + for i := 0; i < 10; i++ { + select { + case t, ok := <-send: + log.Println(t, ok) //全部执行 + } + } +} + +func test6() { + send := make(chan int, 1) + for i := 0; i < 5; i++ { + select { + case send <- i: + log.Println(i) + default: + log.Println("send is full") //部分被执行 + } + } +} diff --git a/examples/https_capture.png b/examples/https_capture.png new file mode 100644 index 0000000..52ba753 Binary files /dev/null and b/examples/https_capture.png differ diff --git a/examples/nginx_vhost.conf b/examples/nginx_vhost.conf new file mode 100644 index 0000000..21163ec --- /dev/null +++ b/examples/nginx_vhost.conf @@ -0,0 +1,32 @@ +map $http_upgrade $connection_upgrade { + default upgrade; + '' close; +} + +server { + listen 80; + server_name ws.example.com; + + default_type application/octet-stream; + + sendfile on; + tcp_nopush on; + tcp_nodelay on; + gzip on; + gzip_min_length 1000; + gzip_proxied any; + + proxy_next_upstream error; + + location / { + include proxy.conf; + proxy_pass http://127.0.0.1:3002; + keepalive_timeout 65; + proxy_http_version 1.1; + proxy_set_header X-Scheme $scheme; + proxy_set_header Host $http_host; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection $connection_upgrade; + } + access_log logs/n_$HOST.log; +} \ No newline at end of file diff --git a/examples/websocket/client.go b/examples/websocket/client.go new file mode 100644 index 0000000..6142662 --- /dev/null +++ b/examples/websocket/client.go @@ -0,0 +1,73 @@ +// Copyright 2015 The Gorilla WebSocket Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build ignore + +package main + +import ( + "flag" + "log" + "net/url" + "os" + "os/signal" + "time" + + "github.com/gorilla/websocket" +) + +var addr = flag.String("addr", "localhost:8080", "http service address") + +func main() { + flag.Parse() + log.SetFlags(0) + + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt) + + u := url.URL{Scheme: "ws", Host: *addr, Path: "/ws"} + log.Printf("connecting to %s", u.String()) + + c, _, err := websocket.DefaultDialer.Dial(u.String(), nil) + if err != nil { + log.Fatal("dial:", err) + } + defer c.Close() + + done := make(chan struct{}) + + go func() { + defer close(done) + for { + _, message, err := c.ReadMessage() + if err != nil { + log.Println("read:", err) + return + } + log.Printf("recv: %s", message) + } + }() + + for { + select { + case <-done: + return + case <-interrupt: + log.Println("interrupt") + + // Cleanly close the connection by sending a close message and then + // waiting (with timeout) for the server to close the connection. + err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + if err != nil { + log.Println("write close:", err) + return + } + select { + case <-done: + case <-time.After(time.Second): + } + return + } + } +} diff --git a/examples/websocket/conn.go b/examples/websocket/conn.go new file mode 100644 index 0000000..b7dc933 --- /dev/null +++ b/examples/websocket/conn.go @@ -0,0 +1,101 @@ +// Copyright 2013 The Gorilla WebSocket Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package main + +import ( + "log" + "net/http" + "time" + + "github.com/gorilla/websocket" +) + +const ( + // Time allowed to write a message to the peer. + writeWait = 10 * time.Second + + // Time allowed to read the next pong message from the peer. + pongWait = 60 * time.Second + + // Send pings to peer with this period. Must be less than pongWait. + pingPeriod = (pongWait * 9) / 10 + + // Maximum message size allowed from peer. + maxMessageSize = 512 +) + +var ( + newline = []byte{'\n'} + space = []byte{' '} +) + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, +} + +// Client is a middleman between the websocket connection and the hub. +type Client struct { + hub *Hub + + // The websocket connection. + conn *websocket.Conn + + // Buffered channel of outbound messages. + send chan []byte +} + +// writePump pumps messages from the hub to the websocket connection. +// +// A goroutine running writePump is started for each connection. The +// application ensures that there is at most one writer to a connection by +// executing all writes from this goroutine. +func (c *Client) writePump() { + ticker := time.NewTicker(pingPeriod) + defer func() { + ticker.Stop() + c.conn.Close() + }() + for { + select { + case message, ok := <-c.send: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if !ok { + // The hub closed the channel. + c.conn.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + + w, err := c.conn.NextWriter(websocket.BinaryMessage) + if err != nil { + return + } + w.Write(message) + if err := w.Close(); err != nil { + return + } + case <-ticker.C: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { + return + } + } + } +} + +// serveWs handles websocket requests from the peer. +func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Println(err) + return + } + client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)} + client.hub.register <- client + + // Allow collection of memory referenced by the caller by doing all work in + // new goroutines. + go client.writePump() +} diff --git a/examples/websocket/hub.go b/examples/websocket/hub.go new file mode 100644 index 0000000..960b046 --- /dev/null +++ b/examples/websocket/hub.go @@ -0,0 +1,54 @@ +// Copyright 2013 The Gorilla WebSocket Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package main + +// Hub maintains the set of active clients and broadcasts messages to the +// clients. +type Hub struct { + // Registered clients. + clients map[*Client]bool + + // Inbound messages from the clients. + broadcast chan []byte + + // Register requests from the clients. + register chan *Client + + // Unregister requests from clients. + unregister chan *Client +} + +func newHub() *Hub { + return &Hub{ + broadcast: make(chan []byte), + register: make(chan *Client), + unregister: make(chan *Client), + clients: make(map[*Client]bool), + } +} + +func (h *Hub) run() { + for { + select { + case client := <-h.register: + h.clients[client] = true + case client := <-h.unregister: + if _, ok := h.clients[client]; ok { + delete(h.clients, client) + close(client.send) + } + case message := <-h.broadcast: + //fmt.Println(string(message)) + for client := range h.clients { + select { + case client.send <- message: + default: + close(client.send) + delete(h.clients, client) + } + } + } + } +} diff --git a/examples/websocket/main.go b/examples/websocket/main.go new file mode 100644 index 0000000..5a68eac --- /dev/null +++ b/examples/websocket/main.go @@ -0,0 +1,35 @@ +// Copyright 2013 The Gorilla WebSocket Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package main + +import ( + "flag" + "fmt" + "log" + "net/http" + "time" +) + +var addr = flag.String("addr", ":8080", "http service address") + +func main() { + flag.Parse() + hub := newHub() + go hub.run() + go func() { + // 发布消息 + for { + hub.broadcast <- []byte(fmt.Sprintf("test - %d", time.Now().Second())) + time.Sleep(time.Duration(2) * time.Second) + } + }() + http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { + serveWs(hub, w, r) + }) + err := http.ListenAndServe(*addr, nil) + if err != nil { + log.Fatal("ListenAndServe: ", err) + } +} diff --git a/go.mod b/go.mod index baef4c9..5cee9e3 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,8 @@ module github.com/keminar/anyproxy go 1.12 require ( + github.com/fsnotify/fsnotify v1.4.9 + github.com/gorilla/websocket v1.4.2 golang.org/x/net v0.0.0-20200602114024-627f9648deb9 gopkg.in/yaml.v2 v2.3.0 ) diff --git a/go.sum b/go.sum index b555ff7..9fed637 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,13 @@ +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/net v0.0.0-20200602114024-627f9648deb9 h1:pNX+40auqi2JqRfOP1akLGtYcn15TUbkhwuCO3foqqM= golang.org/x/net v0.0.0-20200602114024-627f9648deb9/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd h1:xhmwyvizuTgC2qz7ZlMluP20uW+C3Rm0FD/WLDX8884= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= diff --git a/logging/logger.go b/logging/logger.go index dcfee20..db2944b 100644 --- a/logging/logger.go +++ b/logging/logger.go @@ -31,6 +31,8 @@ func SetDefaultLogger(dir, prefix string, compress bool, reserveDay int, w io.Wr log.SetFlags(log.Lshortfile | log.Ldate | log.Lmicroseconds) case config.LevelDebug: log.SetFlags(log.Llongfile | log.Ldate | log.Lmicroseconds) + case config.LevelDebugBody: + log.SetFlags(log.Lshortfile | log.Ldate | log.Lmicroseconds) default: log.SetFlags(log.Lshortfile | log.LstdFlags) } @@ -38,11 +40,17 @@ func SetDefaultLogger(dir, prefix string, compress bool, reserveDay int, w io.Wr // ErrlogFd 标准输出错误输出文件 func ErrlogFd(logDir string, cmdName string) *os.File { + if _, err := os.Stat(logDir); os.IsNotExist(err) { + err = os.Mkdir(logDir, 0777) + if err != nil { + log.Fatalln("logs dir create error", err.Error()) + } + } errFile := filepath.Join(logDir, fmt.Sprintf("%s.err.log", cmdName)) fd, err := os.OpenFile(errFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0664) if err != nil { //报错并退出 - log.Fatalln(err.Error()) + log.Fatalln("open log file error", err.Error()) } return fd } diff --git a/nat/bridge.go b/nat/bridge.go new file mode 100644 index 0000000..da0cf21 --- /dev/null +++ b/nat/bridge.go @@ -0,0 +1,134 @@ +package nat + +import ( + "io" + "log" + "net" + + "github.com/keminar/anyproxy/config" + "github.com/keminar/anyproxy/utils/trace" +) + +// Bridge 桥接 +type Bridge struct { + bridgeHub *BridgeHub + client *Client + + reqID uint //请求id + conn *net.TCPConn + + // Buffered channel of outbound messages. + send chan []byte +} + +// Unregister 包外面调用取消注册 +func (b *Bridge) Unregister() { + b.bridgeHub.unregister <- b +} + +// 向websocket hub写数据 +func (b *Bridge) Write(p []byte) (n int, err error) { + // 先把p拷贝一份,否则会被外面的CopyBuffer再次修改,因为是引入传递 + body := make([]byte, len(p)) + copy(body, p) + msg := &Message{ID: b.reqID, Body: body} + + if config.DebugLevel >= config.LevelDebugBody { + md5Val, _ := md5Byte(msg.Body) + log.Println("nat_debug_write_chan", msg.ID, md5Val) + } + + cmsg := &CMessage{client: b.client, message: msg} + b.client.hub.broadcast <- cmsg + return len(p), nil +} + +// Open 通知websocket 创建连接 +func (b *Bridge) Open() { + msg := &Message{ID: b.reqID, Method: METHOD_CREATE} + //b.client.send <- msg //注意:不能直接写send会与close有并发安全冲突 + cmsg := &CMessage{client: b.client, message: msg} + b.client.hub.broadcast <- cmsg +} + +// CloseWrite 通知tcp关闭连接 +func (b *Bridge) CloseWrite() { + msg := &Message{ID: b.reqID, Method: METHOD_CLOSE} + cmsg := &CMessage{client: b.client, message: msg} + b.client.hub.broadcast <- cmsg +} + +// WritePump 从websocket hub读数据写到请求http端 +func (b *Bridge) WritePump() (written int64, err error) { + defer func() { + b.conn.CloseWrite() + if config.DebugLevel >= config.LevelDebug { + log.Println("net_debug_write_proxy_close") + } + }() + for { + select { + case message, ok := <-b.send: //ok为判断channel是否关闭 + if !ok { + if config.DebugLevel >= config.LevelDebug { + log.Println("nat_debug_bridge_send_chan_closed") + } + return + } + var nw int + nw, err = b.conn.Write(message) + if config.DebugLevel >= config.LevelDebugBody { + md5Val, _ := md5Byte(message) + log.Println("nat_debug_write_proxy", md5Val, err, "\n", string(message)) + } + if err != nil { + return + } + written += int64(nw) + } + } +} + +// CopyBuffer 传输数据 +func (b *Bridge) CopyBuffer(dst io.Writer, src io.Reader, srcname string) (written int64, err error) { + //如果设置过大会耗内存高,4k比较合理 + size := 4 * 1024 + buf := make([]byte, size) + i := 0 + for { + i++ + if config.DebugLevel >= config.LevelDebug { + log.Printf("%s bridge of %s proxy, n=%d\n", trace.ID(b.reqID), srcname, i) + } + nr, er := src.Read(buf) + if nr > 0 { + if config.DebugLevel >= config.LevelDebugBody { + md5Val, _ := md5Byte(buf[0:nr]) + log.Println("net_debug_copy_buffer", trace.ID(b.reqID), srcname, i, nr, md5Val) + } + nw, ew := dst.Write(buf[0:nr]) + if nw > 0 { + written += int64(nw) + } + if ew != nil { + err = ew + break + } + if nr != nw { + err = io.ErrShortWrite + break + } + } + if er != nil { + if er != io.EOF { + err = er + } + if config.DebugLevel >= config.LevelDebug { + log.Println("nat_debug_read_error", srcname, er) + } + break + } + + } + return written, err +} diff --git a/nat/bridge_hub.go b/nat/bridge_hub.go new file mode 100644 index 0000000..9e3b3d5 --- /dev/null +++ b/nat/bridge_hub.go @@ -0,0 +1,81 @@ +package nat + +import ( + "log" + "net" + + "github.com/keminar/anyproxy/config" +) + +// BridgeHub 桥接组 +type BridgeHub struct { + // Registered clients. + bridges map[*Bridge]bool + + // Inbound messages from the clients. + broadcast chan *Message + + // Register requests from the clients. + register chan *Bridge + + // Unregister requests from clients. + unregister chan *Bridge +} + +func newBridgeHub() *BridgeHub { + // 无缓冲通道,保证并发安全 + return &BridgeHub{ + broadcast: make(chan *Message), + register: make(chan *Bridge), + unregister: make(chan *Bridge), + bridges: make(map[*Bridge]bool), + } +} + +func (h *BridgeHub) run() { + for { + select { + case bridge := <-h.register: + h.bridges[bridge] = true + case bridge := <-h.unregister: + if _, ok := h.bridges[bridge]; ok { + delete(h.bridges, bridge) + close(bridge.send) + } + case message := <-h.broadcast: + if config.DebugLevel >= config.LevelDebug { + log.Println("bridge nums", len(h.bridges)) + } + if config.DebugLevel >= config.LevelDebugBody { + md5Val, _ := md5Byte(message.Body) + log.Println("nat_debug_write_bridge_hub", message.ID, message.Method, md5Val) + } + Exit: + for bridge := range h.bridges { + if bridge.reqID != message.ID { + continue + } + if message.Method == METHOD_CLOSE { + close(bridge.send) + delete(h.bridges, bridge) + break Exit + } + select { + case bridge.send <- message.Body: + break Exit + default: // 当send chan写不进时会走进default,防止某一个send卡着影响整个系统 + log.Println("net_bridge_send_chan_full", message.ID) + close(bridge.send) + delete(h.bridges, bridge) + } + } + } + } +} + +// Register 注册 +func (h *BridgeHub) Register(c *Client, ID uint, conn *net.TCPConn) *Bridge { + b := &Bridge{bridgeHub: h, reqID: ID, conn: conn, send: make(chan []byte, 100), client: c} + h.register <- b + return b +} diff --git a/nat/client.go b/nat/client.go new file mode 100644 index 0000000..02f275b --- /dev/null +++ b/nat/client.go @@ -0,0 +1,158 @@ +package nat + +import ( + "io" + "log" + "net" + "time" + + "github.com/gorilla/websocket" + "github.com/keminar/anyproxy/config" + "github.com/keminar/anyproxy/utils/trace" +) + +var interruptClose bool + +// Client is a middleman between the websocket connection and the hub. +type Client struct { + hub *Hub + + // The websocket connection. + conn *websocket.Conn + + // Buffered channel of outbound messages. + send chan *Message + + // 用户 + User string + + // 订阅特征 + Subscribe []SubscribeMessage +} + +// 写数据到websocket的对端 +func (c *Client) writePump() { + ticker := time.NewTicker(pingPeriod) + defer func() { + ticker.Stop() + c.conn.Close() + }() + for { + select { + case message, ok := <-c.send: //ok为判断channel是否关闭 + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if !ok { + log.Println("nat_debug_client_send_chan_close") + // The hub closed the channel. + c.conn.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + + w, err := c.conn.NextWriter(websocket.BinaryMessage) + if err != nil { + return + } + + if config.DebugLevel >= config.LevelDebugBody { + md5Val, _ := md5Byte(message.Body) + log.Println("nat_debug_write_websocket", message.ID, message.Method, md5Val, "\n", string(message.Body)) + } + msgByte, _ := message.encode() + w.Write(msgByte) + if err := w.Close(); err != nil { + return + } + case <-ticker.C: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { + return + } + } + } +} + +// 服务器从websocket的客户端读取数据 +func (c *Client) serverReadPump() { + defer func() { + c.hub.unregister <- c + c.conn.Close() + }() + c.conn.SetReadDeadline(time.Now().Add(pongWait)) + c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) + for { + _, p, err := c.conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + log.Printf("nat_debug_read_message_error: %v", err) + } + break + } + msg, err := decodeMessage(p) + if err != nil { + log.Printf("nat_debug_decode_message_error: %v", err) + break + } + if config.DebugLevel >= config.LevelDebugBody { + md5Val, _ := md5Byte(msg.Body) + log.Println("nat_debug_read_from_websocket", msg.ID, msg.Method, md5Val) + } + ServerBridge.broadcast <- msg + } +} + +// 本地从websocket服务端取数据 +func (c *Client) localReadPump() { + for { + _, p, err := c.conn.ReadMessage() + if err != nil { + log.Println("nat_local_debug_read_error", err.Error()) + return + } + + msg, err := decodeMessage(p) + if err != nil { + log.Println("nat_local_debug_decode_error", err.Error()) + return + } + if config.DebugLevel >= config.LevelDebugBody { + md5Val, _ := md5Byte(msg.Body) + log.Println("nat_local_read_from_websocket_message", msg.ID, msg.Method, md5Val) + } + + if msg.Method == METHOD_CREATE { + proxConn := dialProxy() //创建本地与本地代理端口之间的连接 + b := LocalBridge.Register(c, msg.ID, proxConn.(*net.TCPConn)) + go func() { + written, err := b.WritePump() + logCopyErr(trace.ID(msg.ID), "nat_local_debug websocket->local", err) + if config.DebugLevel >= config.LevelDebug { + log.Println(trace.ID(msg.ID), "nat debug response size", written) + } + }() + + // 从tcp返回数据到ws + go func() { + defer b.Unregister() + readSize, err := b.CopyBuffer(b, proxConn, "local") + logCopyErr(trace.ID(msg.ID), "nat_local_debug local->websocket", err) + if config.DebugLevel >= config.LevelDebug { + log.Println(trace.ID(msg.ID), "nat debug request body size", readSize) + } + b.CloseWrite() + }() + } else { + LocalBridge.broadcast <- msg + } + } +} + +func logCopyErr(traceID, name string, err error) { + if err == nil { + return + } + if config.DebugLevel >= config.LevelLong { + log.Println(traceID, name, err.Error()) + } else if err != io.EOF { + log.Println(traceID, name, err.Error()) + } +} diff --git a/nat/client_hub.go b/nat/client_hub.go new file mode 100644 index 0000000..31fc51f --- /dev/null +++ b/nat/client_hub.go @@ -0,0 +1,85 @@ +package nat + +import ( + "log" + + "github.com/keminar/anyproxy/config" + "github.com/keminar/anyproxy/proto/http" +) + +// Hub maintains the set of active clients and broadcasts messages to the +// clients. +type Hub struct { + // Registered clients. + clients map[*Client]bool + + // Inbound messages from the clients. + broadcast chan *CMessage + + // Register requests from the clients. + register chan *Client + + // Unregister requests from clients. + unregister chan *Client +} + +func newHub() *Hub { + // 无缓冲通道,保证并发安全 + return &Hub{ + broadcast: make(chan *CMessage), + register: make(chan *Client), + unregister: make(chan *Client), + clients: make(map[*Client]bool), + } +} + +func (h *Hub) run() { + for { + select { + case client := <-h.register: + h.clients[client] = true + case client := <-h.unregister: + if _, ok := h.clients[client]; ok { + delete(h.clients, client) + close(client.send) + } + case cmessage := <-h.broadcast: + if config.DebugLevel >= config.LevelDebug { + log.Println("client nums", len(h.clients)) + } + if config.DebugLevel >= config.LevelDebugBody { + md5Val, _ := md5Byte(cmessage.message.Body) + log.Println("nat_debug_write_client_hub", cmessage.message.ID, cmessage.message.Method, md5Val) + } + // 使用broadcast 无缓冲且不会关闭解决并发问题 + // 如果在外部直接写client.send,会与close()有并发安全冲突 + Exit: + for client := range h.clients { + if client != cmessage.client { + continue + } + select { + case client.send <- cmessage.message: + break Exit + default: // 当send chan写不进时会走进default,防止某一个send卡着影响整个系统 + log.Println("net_client_send_chan_full") + close(client.send) + delete(h.clients, client) + } + } + } + } +} + +// GetClient 获取某一个订阅者 +func (h *Hub) GetClient(header http.Header) *Client { + for client := range h.clients { + for _, s := range client.Subscribe { + val := header.Get(s.Key) + if val != "" && val == s.Val { + return client + } + } + } + return nil +} diff --git a/nat/conn.go b/nat/conn.go new file mode 100644 index 0000000..001e7de --- /dev/null +++ b/nat/conn.go @@ -0,0 +1,159 @@ +package nat + +import ( + "fmt" + "log" + "net" + "net/http" + "strings" + "time" + + "github.com/gorilla/websocket" + "github.com/keminar/anyproxy/utils/conf" + "github.com/keminar/anyproxy/utils/tools" +) + +const ( + // Time allowed to write a message to the peer. + writeWait = 10 * time.Second + + // Time allowed to read the next pong message from the peer. + pongWait = 60 * time.Second + + // Send pings to peer with this period. Must be less than pongWait. + pingPeriod = (pongWait * 9) / 10 +) + +var ( + newline = []byte{'\n'} + space = []byte{' '} +) + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, +} + +// ServerHub 服务端的ws链接信息 +var ServerHub *Hub + +// ServerBridge 服务端的http与ws链接 +var ServerBridge *BridgeHub + +// serverStart 是否开启服务 +var serverStart = false + +// Eable 检查是否可以发送nat请求 +func Eable() bool { + if !serverStart { + return false + } + if len(ServerHub.clients) == 0 { + return false + } + return true +} + +// NewServer 开启服务 +func NewServer(addr *string) { + ServerHub = newHub() + go ServerHub.run() + ServerBridge = newBridgeHub() + go ServerBridge.run() + serverStart = true + + http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { + serveWs(ServerHub, w, r) + }) + + log.Println(fmt.Sprintf("Listening for websocket connections on %s", *addr)) + + for i := 0; i < 1000; i++ { + // 副服务,出错不退出并定时重试。方便主服务做平滑重启 + err := http.ListenAndServe(*addr, nil) + if err != nil { + log.Println(fmt.Sprintf("ListenAndServe: num=%d, err=%v ", i, err)) + } + time.Sleep(10 * time.Second) + } +} + +// serveWs handles websocket requests from the peer. +func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Println(err) + return + } + + // 认证 + var user AuthMessage + err = conn.ReadJSON(&user) + if err != nil { + log.Println(err) + return + } + xtime := time.Now().Unix() + if xtime-user.Xtime > 300 { + conn.WriteMessage(websocket.TextMessage, []byte("xtime err, please check local time")) + return + } + if user.User != conf.RouterConfig.Websocket.User { + conn.WriteMessage(websocket.TextMessage, []byte("user err")) + return + } + + token, err := tools.Md5Str(fmt.Sprintf("%s|%s|%d", user.User, conf.RouterConfig.Websocket.Pass, user.Xtime)) + if err != nil || user.Token != token { + conn.WriteMessage(websocket.TextMessage, []byte("token err")) + return + } + conn.WriteMessage(websocket.TextMessage, []byte("ok")) + + // 订阅 + var subscribe []SubscribeMessage + err = conn.ReadJSON(&subscribe) + if err != nil { + log.Println(err) + return + } + if len(subscribe) == 0 { + conn.WriteMessage(websocket.TextMessage, []byte("subscribe empty err")) + return + } + conn.WriteMessage(websocket.TextMessage, []byte("ok")) + + clientNum := len(hub.clients) + // 注册连接 + client := &Client{hub: hub, conn: conn, send: make(chan *Message, SEND_CHAN_LEN), User: user.User, Subscribe: subscribe} + client.hub.register <- client + clientNum++ //这里不用len计算是因为chan异步不确认谁先执行 + + remote := getIPAdress(r, []string{"X-Real-IP"}) + log.Printf("client email %s ip %s connected, subscribe %v, total client nums %d\n", user.Email, remote, subscribe, clientNum) + + go client.writePump() + go client.serverReadPump() +} + +// getIPAdress 客户端IP +func getIPAdress(req *http.Request, head []string) string { + var ipAddress string + // X-Forwarded-For容易被伪造,最好不用 + if len(head) == 0 { + head = []string{"X-Real-IP"} + } + for _, h := range head { + for _, ip := range strings.Split(req.Header.Get(h), ",") { + ip = strings.TrimSpace(ip) + realIP := net.ParseIP(ip) + if realIP != nil { + ipAddress = ip + } + } + } + if len(ipAddress) == 0 { + ipAddress, _, _ = net.SplitHostPort(req.RemoteAddr) + } + return ipAddress +} diff --git a/nat/handler.go b/nat/handler.go new file mode 100644 index 0000000..b6a86f6 --- /dev/null +++ b/nat/handler.go @@ -0,0 +1,218 @@ +package nat + +import ( + "crypto/md5" + "encoding/hex" + "errors" + "fmt" + "log" + "net" + "net/http" + "net/url" + "os" + "os/signal" + "strings" + "time" + + "github.com/keminar/anyproxy/utils/conf" + "github.com/keminar/anyproxy/utils/tools" + + "github.com/gorilla/websocket" + "github.com/keminar/anyproxy/config" +) + +// ClientHub 客户端的ws信息 +var ClientHub *Hub + +// LocalBridge 客户端的ws与http关系 +var LocalBridge *BridgeHub + +var tempDelay time.Duration + +// ConnectServer 连接到websocket服务 +func ConnectServer(addr *string) { + interruptClose = false + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt) + + ClientHub = newHub() + go ClientHub.run() + LocalBridge = newBridgeHub() + go LocalBridge.run() + + addrs := strings.Split(*addr, "://") + if addrs[0] == "ws" && len(addrs) == 2 { + *addr = addrs[1] + } + for { + connect(addr, interrupt) + if interruptClose { + break + } + } +} + +// 连接本地Proxy服务 +func dialProxy() net.Conn { + connTimeout := time.Duration(5) * time.Second + var err error + localProxy := fmt.Sprintf("%s:%d", "127.0.0.1", config.ListenPort) + proxyConn, err := net.DialTimeout("tcp", localProxy, connTimeout) + if err != nil { + log.Println("dial local proxy", err) + } + log.Printf("local websocket connecting to %s", localProxy) + return proxyConn +} + +// 认证连接并交换数据 +func connect(addr *string, interrupt chan os.Signal) { + u := url.URL{Scheme: "ws", Host: *addr, Path: "/ws"} + log.Printf("connecting to %s", u.String()) + + h := http.Header{} + if conf.RouterConfig.Websocket.Host != "" { + h.Add("Host", conf.RouterConfig.Websocket.Host) + } + c, _, err := websocket.DefaultDialer.Dial(u.String(), h) + if err != nil { + log.Println("dial:", err) + time.Sleep(time.Duration(3) * time.Second) + return + } + defer c.Close() + + w := newClientHandler(c) + err = w.auth(conf.RouterConfig.Websocket.User, conf.RouterConfig.Websocket.Pass, conf.RouterConfig.Websocket.Email) + if err != nil { + log.Println("auth:", err) + + if tempDelay == 0 { + tempDelay = 3 * time.Second + } else { + tempDelay *= 2 + } + if max := 1 * time.Minute; tempDelay > max { + tempDelay = max + } + time.Sleep(tempDelay) + return + } + tempDelay = 0 + err = w.subscribe(conf.RouterConfig.Websocket.Subscribe) + if err != nil { + log.Println("subscribe:", err) + time.Sleep(time.Duration(3) * time.Second) + return + } + log.Println("websocket auth and subscribe ok") + + client := &Client{hub: ClientHub, conn: c, send: make(chan *Message, SEND_CHAN_LEN)} + client.hub.register <- client + defer func() { + client.hub.unregister <- client + }() + + go client.writePump() + done := make(chan struct{}) + go func() { //客户端的client.readRump + defer close(done) + client.localReadPump() + }() + + for { + select { + case <-done: + return + case <-interrupt: + log.Println("interrupt") + interruptClose = true + + // Cleanly close the connection by sending a close message and then + // waiting (with timeout) for the server to close the connection. + err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + if err != nil { + log.Println("write close:", err) + return + } + select { + case <-done: + case <-time.After(time.Second): + } + return + } + } +} + +// ClientHandler 认证助手 +type ClientHandler struct { + c *websocket.Conn +} + +func newClientHandler(c *websocket.Conn) *ClientHandler { + return &ClientHandler{c: c} +} + +// auth 认证 +func (h *ClientHandler) auth(user string, pass string, email string) error { + if user == "" { + return errors.New("user is empty") + } + if email == "" { + return errors.New("email is emtpy") + } + xtime := time.Now().Unix() + token, err := tools.Md5Str(fmt.Sprintf("%s|%s|%d", user, pass, xtime)) + if err != nil { + return err + } + msg := AuthMessage{User: user, Token: token, Xtime: xtime, Email: email} + return h.ask(&msg) +} + +// subscribe 订阅 +func (h *ClientHandler) subscribe(sub []conf.Subscribe) error { + msg := []SubscribeMessage{} + for _, s := range sub { + msg = append(msg, SubscribeMessage{Key: s.Key, Val: s.Val}) + } + return h.ask(&msg) +} + +func (h *ClientHandler) ask(v interface{}) error { + err := h.c.WriteJSON(v) + if err != nil { + return err + } + ticker := time.NewTicker(3 * time.Second) + defer func() { + ticker.Stop() + }() + + send := make(chan []byte) + go func() { + defer close(send) + _, message, _ := h.c.ReadMessage() + send <- message + }() + select { + case message, ok := <-send: //ok为判断channel是否关闭 + if !ok { + return errors.New("fail") + } + if string(message) != "ok" { + return errors.New("fail, " + string(message)) + } + case <-ticker.C: + return errors.New("timeout") + } + return nil +} + +// md5 +func md5Byte(data []byte) (string, error) { + h := md5.New() + h.Write(data) + cipherStr := h.Sum(nil) + return hex.EncodeToString(cipherStr), nil +} diff --git a/nat/message.go b/nat/message.go new file mode 100644 index 0000000..fe83ab6 --- /dev/null +++ b/nat/message.go @@ -0,0 +1,63 @@ +package nat + +import ( + "bytes" + "encoding/gob" +) + +// METHOD_CREATE 创建连接命令 +const METHOD_CREATE = "create" + +// METHOD_CLOSE 关闭连接命令 +const METHOD_CLOSE = "close" + +// SEND_CHAN_LEN 发送通道长度 +const SEND_CHAN_LEN = 200 + +// AuthMessage 认证 +type AuthMessage struct { + User string + Email string + Token string + Xtime int64 +} + +// SubscribeMessage 订阅 +type SubscribeMessage struct { + Key string + Val string +} + +// Message 普通消息体 +type Message struct { + ID uint + Method string + Body []byte +} + +// CMessage 普通消息体的复合类型,标记要向哪个Client发送 +type CMessage struct { + client *Client + message *Message +} + +// 转成二进制 +func (m *Message) encode() ([]byte, error) { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + err := enc.Encode(*m) + return buf.Bytes(), err +} + +// 转成struct +func decodeMessage(data []byte) (*Message, error) { + var buf bytes.Buffer + var m Message + _, err := buf.Write(data) + if err != nil { + return &m, err + } + dec := gob.NewDecoder(&buf) + err = dec.Decode(&m) + return &m, err +} diff --git a/proto/http.go b/proto/http.go index 13f0b5f..7fa606a 100644 --- a/proto/http.go +++ b/proto/http.go @@ -12,6 +12,7 @@ import ( "github.com/keminar/anyproxy/config" "github.com/keminar/anyproxy/crypto" + "github.com/keminar/anyproxy/nat" "github.com/keminar/anyproxy/proto/http" "github.com/keminar/anyproxy/proto/text" "github.com/keminar/anyproxy/utils/trace" @@ -258,6 +259,29 @@ func (that *httpStream) badRequest(err error) { } func (that *httpStream) response() error { + specialHeader := "Anyproxy-Action" + if config.DebugLevel >= config.LevelDebug { + log.Println(trace.ID(that.req.ID), "nat server status:", nat.Eable(), ",special header:", that.Header.Get(specialHeader)) + } + if that.Method != "CONNECT" && nat.Eable() { //CONNECT 请求不支持ws转发 + if that.Header.Get(specialHeader) == "websocket" { + that.Header.Del(specialHeader) + tunnel := newWsTunnel(that.req, that.Header) + if tunnel.getTarget(that.req.DstName) { + // 先将请求头部发出 + tunnel.buffer.Write([]byte(fmt.Sprintf("%s\r\n", that.FirstLine))) + that.Header.Write(tunnel.buffer) + tunnel.buffer.Write([]byte("\r\n")) + // 多读取的body部分 + tunnel.buffer.Write(that.BodyBuf) + ok := tunnel.transfer() + if ok == true { + return nil + } + // 请求不成,则走普通转发 + } + } + } tunnel := newTunnel(that.req) if ip, ok := tunnel.isAllowed(); !ok { err := errors.New(ip + " is not allowed") diff --git a/proto/tunnel.go b/proto/tunnel.go index 9325b15..4ad45b4 100644 --- a/proto/tunnel.go +++ b/proto/tunnel.go @@ -67,7 +67,7 @@ func (s *tunnel) copyBuffer(dst io.Writer, src *tcp.Reader, srcname string) (wri nr, er := src.Read(buf) if nr > 0 { // 如果为HTTP/1.1的Keep-alive情况下 - if srcname == "client" && s.clientUnRead >= 0 { + if srcname == "request" && s.clientUnRead >= 0 { // 之前已读完,说明要建新链接 if s.clientUnRead == 0 { // 关闭与旧的服务器的连接的写 @@ -102,7 +102,6 @@ func (s *tunnel) copyBuffer(dst io.Writer, src *tcp.Reader, srcname string) (wri } } if er != nil { - if er != io.EOF { err = er } else { @@ -115,12 +114,16 @@ func (s *tunnel) copyBuffer(dst io.Writer, src *tcp.Reader, srcname string) (wri break } else if s.curState != stateClosed { // 如果非客户端导致的服务端关闭,则关闭客户端读 - dst.(*net.TCPConn).CloseRead() + // Notice: 如果只是CloseRead(),当在windows上执行时,且是做为订阅端从服务器收到请求再转到charles + // 等服务时,当请求的地址返回足够长的内容时会触发卡住问题。 + // 流程如 curl -> anyproxy(server) -> ws -> anyproxy(windows) -> charles + // 用Close()可以解决卡住,不过客户端会收到use of closed network connection的错误提醒 + dst.(*net.TCPConn).Close() } } } - if srcname == "client" { + if srcname == "request" { // 当客户端断开或出错了,服务端也不用再读了,可以关闭,解决读Server卡住不能到EOF的问题 s.conn.CloseWrite() s.curState = stateClosed @@ -138,29 +141,32 @@ func (s *tunnel) transfer(clientUnRead int) { } s.curState = stateActive s.clientUnRead = clientUnRead - done := make(chan int, 1) + done := make(chan struct{}) //发送请求 go func() { defer func() { - done <- 1 close(done) }() //不能和外层共用err var err error - s.readSize, err = s.copyBuffer(s.conn, s.req.reader, "client") - s.logCopyErr("client->server", err) - log.Println(trace.ID(s.req.ID), "request body size", s.readSize) + s.readSize, err = s.copyBuffer(s.conn, s.req.reader, "request") + s.logCopyErr("request->server", err) + if config.DebugLevel >= config.LevelDebug { + log.Println(trace.ID(s.req.ID), "request body size", s.readSize) + } }() var err error //取返回结果 s.writeSize, err = s.copyBuffer(s.req.conn, tcp.NewReader(s.conn), "server") - s.logCopyErr("server->client", err) + s.logCopyErr("server->request", err) <-done // 不管是不是正常结束,只要server结束了,函数就会返回,然后底层会自动断开与client的连接 - log.Println(trace.ID(s.req.ID), "transfer finished, response size", s.writeSize) + if config.DebugLevel >= config.LevelDebug { + log.Println(trace.ID(s.req.ID), "transfer finished, response size", s.writeSize) + } } func (s *tunnel) logCopyErr(name string, err error) { @@ -212,9 +218,9 @@ func (s *tunnel) lookup(dstName, dstIP string) (string, cache.DialState) { } // 查询配置 -func (s *tunnel) findHost(dstName, dstIP string) conf.Host { +func findHost(dstName, dstIP string) conf.Host { for _, h := range conf.RouterConfig.Hosts { - confMatch := getString(h.Match, conf.RouterConfig.Match, "equal") + confMatch := getString(h.Match, conf.RouterConfig.Default.Match, "equal") switch confMatch { case "equal": if h.Name == dstName || h.Name == dstIP { @@ -249,14 +255,14 @@ func (s *tunnel) handshake(proto string, dstName, dstIP string, dstPort uint16) // http请求,dns解析 dstIP, state = s.lookup(dstName, dstIP) } - host := s.findHost(dstName, dstIP) + host := findHost(dstName, dstIP) var confTarget string if proto == protoTCP { - confTarget = getString(host.Target, conf.RouterConfig.TCPTarget, "auto") + confTarget = getString(host.Target, conf.RouterConfig.Default.TCPTarget, "auto") } else { - confTarget = getString(host.Target, conf.RouterConfig.Target, "auto") + confTarget = getString(host.Target, conf.RouterConfig.Default.Target, "auto") } - confDNS := getString(host.DNS, conf.RouterConfig.DNS, "local") + confDNS := getString(host.DNS, conf.RouterConfig.Default.DNS, "local") // tcp 请求,如果是解析的IP被禁(代理端也无法telnet),不知道域名又无法使用远程dns解析,只能手动换ip // 如golang.org 解析为180.97.235.30 不通,配置改为 216.239.37.1就行 @@ -275,7 +281,7 @@ func (s *tunnel) handshake(proto string, dstName, dstIP string, dstPort uint16) proxyScheme2, proxyServer2, proxyPort2, err := getProxyServer(host.Proxy) if err != nil { // 如果自定义代理不可用,confTarget走原来逻辑 - log.Println(trace.ID(s.req.ID), "host.proxy err", host.Proxy, err) + log.Println(trace.ID(s.req.ID), "host.proxy err", err) } else { proxyScheme = proxyScheme2 proxyServer = proxyServer2 @@ -331,10 +337,12 @@ func (s *tunnel) handshake(proto string, dstName, dstIP string, dstPort uint16) return } } else { - if dstIP == "" { - err = errors.New("dstIP is empty") - } else { + if dstIP != "" { err = s.dail(dstIP, dstPort) + } else if dstName != "" { + err = s.dail(dstName, dstPort) + } else { + err = errors.New("dstName && dstIP is empty") } } if err != nil { diff --git a/proto/websocket.go b/proto/websocket.go new file mode 100644 index 0000000..665d390 --- /dev/null +++ b/proto/websocket.go @@ -0,0 +1,105 @@ +package proto + +import ( + "bytes" + "io" + "log" + + "github.com/keminar/anyproxy/config" + "github.com/keminar/anyproxy/nat" + "github.com/keminar/anyproxy/proto/http" + "github.com/keminar/anyproxy/utils/conf" + "github.com/keminar/anyproxy/utils/trace" +) + +// 转发实体 +type wsTunnel struct { + req *Request + header http.Header + + readSize int64 + writeSize int64 + + buffer *bytes.Buffer +} + +// newTunnel 实例 +func newWsTunnel(req *Request, header http.Header) *wsTunnel { + s := &wsTunnel{ + req: req, + header: header, + buffer: new(bytes.Buffer), + } + return s +} + +// 检查ws转发是否允许 +func (s *wsTunnel) getTarget(dstName string) (ok bool) { + if dstName == "" { + return false + } + host := findHost(dstName, dstName) + var confTarget string + confTarget = getString(host.Target, conf.RouterConfig.Default.Target, "auto") + + if confTarget == "deny" { + return false + } + return true +} + +// transfer 交换数据 +func (s *wsTunnel) transfer() bool { + if config.DebugLevel >= config.LevelLong { + log.Println(trace.ID(s.req.ID), "websocket transfer start") + } + + c := nat.ServerHub.GetClient(s.header) + if c == nil { + // 走旧转发 + log.Println(trace.ID(s.req.ID), "websocket transfer fail") + return false + } + b := nat.ServerBridge.Register(c, s.req.ID, s.req.conn) + defer func() { + b.Unregister() + }() + + // 发送创建连接请求 + b.Open() + var err error + done := make(chan struct{}) + + //发送请求给websocket + go func() { + defer close(done) + b.Write([]byte(s.buffer.String())) + s.readSize, err = b.CopyBuffer(b, s.req.reader, "request") + s.logCopyErr("request->websocket", err) + if config.DebugLevel >= config.LevelDebug { + log.Println(trace.ID(s.req.ID), "request body size", s.readSize) + } + b.CloseWrite() + }() + //取返回结果写入请求端 + s.writeSize, err = b.WritePump() + s.logCopyErr("websocket->request", err) + + <-done + // 不管是不是正常结束,只要server结束了,函数就会返回,然后底层会自动断开与client的连接 + if config.DebugLevel >= config.LevelDebug { + log.Println(trace.ID(s.req.ID), "websocket transfer finished, response size", s.writeSize) + } + return true +} + +func (s *wsTunnel) logCopyErr(name string, err error) { + if err == nil { + return + } + if config.DebugLevel >= config.LevelLong { + log.Println(trace.ID(s.req.ID), name, err.Error()) + } else if err != io.EOF { + log.Println(trace.ID(s.req.ID), name, err.Error()) + } +} diff --git a/scripts/build.sh b/scripts/build.sh index 1c1580d..8370eb8 100644 --- a/scripts/build.sh +++ b/scripts/build.sh @@ -3,30 +3,46 @@ SCRIPT=$(readlink -f $0) ROOT_DIR=$(dirname $SCRIPT)/../ cd $ROOT_DIR +mkdir -p dist/ -# anyproxy -echo "build anyproxy" -# for linux -echo " for linux" -go build -o anyproxy anyproxy.go +# 路径 +GOCMD="go" +GITCMD="git" -# for mac -echo " for mac" -CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build -o anyproxy-darwin anyproxy.go +# 目标文件前缀 +BIN="anyproxy" -# for windows -echo " for windows" -CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build -o anyproxy-windows.exe anyproxy.go +# 版本号 +ARCH="amd64" -# for alpine -echo " for alpine" -go build -tags netgo -o anyproxy-alpine anyproxy.go +#组装变量 +GOBUILD="${GOCMD} build" +VER=`${GITCMD} describe --tags $(${GITCMD} rev-list --tags --max-count=1)` +GOVER=`${GOCMD} version` +COMMIT_SHA1=`${GITCMD} rev-parse HEAD` +HELP_PRE="github.com/keminar/anyproxy/utils/help" +LDFLAGS="-X '${HELP_PRE}.goVersion=${GOVER}'" +LDFLAGS="${LDFLAGS} -X '${HELP_PRE}.gitHash=${COMMIT_SHA1}'" +LDFLAGS="${LDFLAGS} -X '${HELP_PRE}.version=${VER}'" -# tunneld -echo "build tunneld" -echo " for linux" -go build -o tunnel/tunneld tunnel/tunneld.go +# 编译 +echo "build ..." +if [ "$1" == "all" ] || [ "$1" == "linux" ] ;then + echo " for linux" + CGO_ENABLED=0 GOOS=linux GOARCH=${ARCH} ${GOBUILD} -ldflags "$LDFLAGS" -o dist/${BIN}-${ARCH}-${VER} anyproxy.go +fi -# for alpine -echo " for alpine" -go build -tags netgo -o tunnel/tunneld-alpine tunnel/tunneld.go \ No newline at end of file +if [ "$1" == "all" ] || [ "$1" == "mac" ] ;then + echo " for mac" + CGO_ENABLED=0 GOOS=darwin GOARCH=${ARCH} ${GOBUILD} -ldflags "$LDFLAGS" -o dist/${BIN}-darwin-${ARCH}-${VER} anyproxy.go +fi + +if [ "$1" == "all" ] || [ "$1" == "windows" ] ;then + echo " for windows" + CGO_ENABLED=0 GOOS=windows GOARCH=${ARCH} ${GOBUILD} -ldflags "$LDFLAGS" -o dist/${BIN}-windows-${ARCH}-${VER}.exe anyproxy.go +fi + +if [ "$1" == "all" ] || [ "$1" == "alpine" ] ;then + echo " for alpine" + CGO_ENABLED=0 GOOS=linux GOARCH=${ARCH} ${GOBUILD} -tags netgo -ldflags "$LDFLAGS" -o dist/${BIN}-alpine-${ARCH}-${VER} anyproxy.go +fi \ No newline at end of file diff --git a/tunnel/tunneld.go b/tunnel/tunneld.go deleted file mode 100644 index 03d1deb..0000000 --- a/tunnel/tunneld.go +++ /dev/null @@ -1,79 +0,0 @@ -package main - -import ( - "flag" - "fmt" - "io" - "os" - "strings" - - "github.com/keminar/anyproxy/config" - "github.com/keminar/anyproxy/grace" - "github.com/keminar/anyproxy/logging" - "github.com/keminar/anyproxy/proto" - "github.com/keminar/anyproxy/utils/conf" - "github.com/keminar/anyproxy/utils/daemon" - "github.com/keminar/anyproxy/utils/help" -) - -var ( - gListenAddrPort string - gProxyServerSpec string - gHelp bool - gDebug int -) - -func init() { - flag.Usage = help.Usage - flag.StringVar(&gListenAddrPort, "l", "", "Address and port to listen on") - flag.StringVar(&gProxyServerSpec, "p", "", "Proxy servers to use") - flag.IntVar(&gDebug, "d", 0, "debug mode") - flag.BoolVar(&gHelp, "h", false, "This usage message") - -} - -func main() { - flag.Parse() - if gHelp { - flag.Usage() - return - } - - config.SetDebugLevel(gDebug) - conf.LoadAllConfig() - // 检查配置是否存在 - if conf.RouterConfig == nil { - os.Exit(2) - } - - cmdName := "tunneld" - logDir := "./logs/" - if conf.RouterConfig.Log.Dir != "" { - logDir = conf.RouterConfig.Log.Dir - } - envRunMode := fmt.Sprintf("%s_run_mode", cmdName) - fd := logging.ErrlogFd(logDir, cmdName) - // 是否后台运行 - daemon.Daemonize(envRunMode, fd) - - gListenAddrPort = config.IfEmptyThen(gListenAddrPort, conf.RouterConfig.Listen, ":3001") - // 支持只输入端口的形式 - if !strings.Contains(gListenAddrPort, ":") { - gListenAddrPort = ":" + gListenAddrPort - } - - var writer io.Writer - // 前台执行,daemon运行根据环境变量识别 - if daemon.IsForeground(envRunMode) { - // 同时输出到日志和标准输出 - writer = io.Writer(os.Stdout) - } - - logging.SetDefaultLogger(logDir, cmdName, true, 3, writer) - // 设置代理 - gProxyServerSpec = config.IfEmptyThen(gProxyServerSpec, conf.RouterConfig.Proxy, "") - config.SetProxyServer(gProxyServerSpec) - - server := grace.NewServer(gListenAddrPort, proto.ServerHandler) - server.ListenAndServe() -} diff --git a/utils/cache/cache.go b/utils/cache/cache.go index e0efb52..0c75fc1 100644 --- a/utils/cache/cache.go +++ b/utils/cache/cache.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/keminar/anyproxy/config" "github.com/keminar/anyproxy/utils/trace" ) @@ -54,13 +55,19 @@ func (c *resolveLookupCache) Lookup(logID uint, host string) (string, DialState) hit := c.ips[host] if hit != nil { if hit.expires.After(time.Now()) { - log.Println(trace.ID(logID), "lookup(): CACHE_HIT", hit.state) + if config.DebugLevel >= config.LevelDebug { + log.Println(trace.ID(logID), "lookup(): CACHE_HIT", hit.state) + } return hit.ipv4, hit.state } - log.Println(trace.ID(logID), "lookup(): CACHE_EXPIRED") + if config.DebugLevel >= config.LevelDebug { + log.Println(trace.ID(logID), "lookup(): CACHE_EXPIRED") + } delete(c.ips, host) } else { - log.Println(trace.ID(logID), "lookup(): CACHE_MISS") + if config.DebugLevel >= config.LevelDebug { + log.Println(trace.ID(logID), "lookup(): CACHE_MISS") + } } return "", StateNone } diff --git a/utils/conf/config.go b/utils/conf/config.go index cb21a70..3d4526e 100644 --- a/utils/conf/config.go +++ b/utils/conf/config.go @@ -3,6 +3,8 @@ package conf import ( "fmt" "log" + + "github.com/fsnotify/fsnotify" ) // RouterConfig 配置 @@ -10,10 +12,61 @@ var RouterConfig *Router // LoadAllConfig 加载顺序要求,不写成init func LoadAllConfig() { - conf, err := LoadRouterConfig("router") + filePath, err := GetPath("router.yaml") + if err != nil { + log.Println(fmt.Sprintf("config file %s path err:%s", "router", err.Error())) + return + } + conf, err := LoadRouterConfig(filePath) if err != nil { - log.Println(fmt.Sprintf("yaml.load: %s loadYaml err:%s", "router", err.Error())) + log.Println(fmt.Sprintf("config file %s load err:%s", "router", err.Error())) return } RouterConfig = &conf + if conf.Watcher { + go notify(filePath) + } +} + +func notify(filePath string) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + log.Println("config new notify watcher err", err) + return + } + defer watcher.Close() + + done := make(chan bool) + go func() { + defer close(done) + for { + select { + case event, ok := <-watcher.Events: + if !ok { + return + } + if event.Op&fsnotify.Write == fsnotify.Write { + conf, err := LoadRouterConfig(filePath) + if err != nil { + log.Println(fmt.Sprintf("config file %s load err:%s", "router", err.Error())) + } else { + RouterConfig = &conf + log.Println("config file reloaded:", filePath) + } + } + case err, ok := <-watcher.Errors: + if !ok { + return + } + log.Println("config notify watcher error:", err) + } + } + }() + + err = watcher.Add(filePath) + if err != nil { + log.Println("config notify add file err", err) + return + } + <-done } diff --git a/utils/conf/router.go b/utils/conf/router.go index 31c12df..57f1a4e 100644 --- a/utils/conf/router.go +++ b/utils/conf/router.go @@ -11,7 +11,7 @@ import ( // Host 域名 type Host struct { - Name string `yaml:"name"` + Name string `yaml:"name"` //域名关键字 Match string `yaml:"match"` //contain 包含, equal 完全相等, preg 正则 Target string `yaml:"target"` //local 当前环境, remote 远程, deny 禁止, auto根据dial选择 DNS string `yaml:"dns"` //local 当前环境, remote 远程, 仅当target使用remote有效 @@ -24,37 +24,56 @@ type Log struct { Dir string `yaml:"dir"` } +// Subscribe 订阅标志 +type Subscribe struct { + Key string `yaml:"key"` //Header的key + Val string `yaml:"val"` //Header的val +} + +// Websocket 与服务端websocket通信 +type Websocket struct { + Listen string `yaml:"listen"` //websocket 监听 + Connect string `yaml:"connect"` //websocket 连接 + Host string `yaml:"host"` //connect的域名 + User string `yaml:"user"` //认证用户 + Pass string `yaml:"pass"` //密码 + Email string `yaml:"email"` //邮箱 + Subscribe []Subscribe `yaml:"subscribe"` //订阅信息 +} + +// Default 域名 +type Default struct { + Match string `yaml:"match"` //默认域名比对 + Target string `yaml:"target"` //http默认访问策略 + DNS string `yaml:"dns"` //默认的DNS服务器 + Proxy string `yaml:"proxy"` //全局代理服务器 + TCPTarget string `yaml:"tcpTarget"` //tcp默认访问策略 +} + // Router 配置文件模型 type Router struct { - Listen string `yaml:"listen"` //监听端口 - Log Log `yaml:"log"` //日志目录 - Token string `yaml:"token"` //加密值 - DNS string `yaml:"dns"` //默认的DNS服务器 - Target string `yaml:"target"` //http默认访问策略 - TCPTarget string `yaml:"tcpTarget"` //tcp默认访问策略 - Match string `yaml:"match"` //默认域名比对 - Proxy string `yaml:"proxy"` //全局代理服务器 - Hosts []Host `yaml:"hosts"` //域名列表 - AllowIP []string `yaml:"allowIP"` //可以访问的客户端IP + Listen string `yaml:"listen"` //监听端口 + Log Log `yaml:"log"` //日志目录 + Watcher bool `yaml:"watcher"` //是否监听配置文件变化 + Token string `yaml:"token"` //加密值, 和tunnel通信密钥, 必须16位长度 + Default Default `yaml:"default"` //默认配置 + Hosts []Host `yaml:"hosts"` //域名列表 + AllowIP []string `yaml:"allowIP"` //可以访问的客户端IP + Websocket Websocket `yaml:"websocket"` //会话订阅请求信息 } // LoadRouterConfig 加载配置 -func LoadRouterConfig(configName string) (cnf Router, err error) { - configPath, err := getPath(configName + ".yaml") - if err != nil { - return cnf, err - } +func LoadRouterConfig(configPath string) (cnf Router, err error) { data, err := ioutil.ReadFile(configPath) if err != nil { - return cnf, err + return } - t := Router{} - err = yaml.Unmarshal(data, &t) - return t, err + err = yaml.Unmarshal(data, &cnf) + return } // 获取文件路径 -func getPath(filename string) (string, error) { +func GetPath(filename string) (string, error) { // 当前登录用户所在目录 workPath, err := os.Getwd() if err != nil { @@ -66,7 +85,7 @@ func getPath(filename string) (string, error) { if !fileExists(configPath) { configPath = filepath.Join(AppSrcPath, "conf", filename) if !fileExists(configPath) { - return "", errors.New(filename + " not found") + return "", errors.New("conf/" + filename + " not found") } } } diff --git a/utils/help/help.go b/utils/help/help.go index 4838e32..2bde782 100644 --- a/utils/help/help.go +++ b/utils/help/help.go @@ -8,20 +8,27 @@ import ( "time" ) -//VERSION 版本 -const VERSION = "0.9" +var ( + version string + gitHash string + goVersion string +) // Usage 帮助 func Usage() { - fmt.Fprintf(os.Stdout, "%s\n\n", versionString()) - fmt.Fprintf(os.Stdout, "usage: %s -l listenaddress -p proxies \n", os.Args[0]) + fmt.Fprintf(os.Stdout, "%s\n\n", versionString("anyproxy")) + fmt.Fprintf(os.Stdout, "Usage: %s -l listenaddress -p proxies \n", os.Args[0]) fmt.Fprintf(os.Stdout, " Proxies any tcp port transparently using Linux netfilter\n\n") fmt.Fprintf(os.Stdout, "Mandatory\n") - fmt.Fprintf(os.Stdout, " -l=ADDRPORT Address and port to listen on (e.g., :3128 or 127.0.0.1:3128)\n") + fmt.Fprintf(os.Stdout, " -l=ADDRPORT Address and port to listen on (e.g., :3000 or 127.0.0.1:3000)\n") fmt.Fprintf(os.Stdout, "Optional\n") fmt.Fprintf(os.Stdout, " -p=PROXIES Address and ports of upstream proxy servers to use\n") - fmt.Fprintf(os.Stdout, " (e.g., 10.1.1.1:80 will use http proxy, socks5://10.2.2.2:3128 use socks5 proxy\n") + fmt.Fprintf(os.Stdout, " (e.g., 10.1.1.1:80 will use http proxy, socks5://10.2.2.2:3128 use socks5 proxy,\n") + fmt.Fprintf(os.Stdout, " tunnel://10.2.2.2:3001 use tunnel proxy)\n") + fmt.Fprintf(os.Stdout, " -ws-listen Websocket address and port to listen on\n") + fmt.Fprintf(os.Stdout, " -ws-connect Websocket Address and port to connect\n") fmt.Fprintf(os.Stdout, " -daemon Run as a Unix daemon\n") + fmt.Fprintf(os.Stdout, " -mode Run mode(proxy, tunnel). proxy mode default\n") fmt.Fprintf(os.Stdout, " -debug Debug mode (0, 1, 2, 3)\n") fmt.Fprintf(os.Stdout, " -pprof Pprof port, disable if empty\n") fmt.Fprintf(os.Stdout, " -h This usage message\n\n") @@ -51,10 +58,12 @@ func Usage() { fmt.Fprintf(os.Stdout, "Thanks to https://github.com/ryanchapman/go-any-proxy.git\n") } -func versionString() (v string) { +func versionString(name string) (v string) { now := time.Now().Unix() buildNum := strings.ToUpper(strconv.FormatInt(now, 36)) buildDate := time.Unix(now, 0).Format(time.UnixDate) - v = fmt.Sprintf("anyproxy %s (build %v, %v)", VERSION, buildNum, buildDate) + v = fmt.Sprintf("%s %s (build %v, %v)", name, version, buildNum, buildDate) + v += fmt.Sprintf("\nGit Commit Hash: %s", gitHash) + v += fmt.Sprintf("\nGoLang Version: %s", goVersion) return } diff --git a/utils/tools/string.go b/utils/tools/string.go index 1f95606..2c2962d 100644 --- a/utils/tools/string.go +++ b/utils/tools/string.go @@ -1,5 +1,12 @@ package tools +import ( + "crypto/md5" + "encoding/hex" + "strconv" + "strings" +) + // GetPort 从 127.0.0.1:3000 结构中取出3000 func GetPort(addr string) string { for i := len(addr) - 1; i >= 0; i-- { @@ -9,3 +16,21 @@ func GetPort(addr string) string { } return "" } + +func Md5Str(str string) (string, error) { + h := md5.New() + h.Write([]byte(str)) + cipherStr := h.Sum(nil) + return hex.EncodeToString(cipherStr), nil +} + +// 支持只输入端口的形式 +func FillPort(port string) string { + if !strings.Contains(port, ":") { + d, err := strconv.Atoi(port) + if err == nil && strconv.Itoa(d) == port { //说明输入为纯数字 + port = ":" + port + } + } + return port +}