Skip to content
This repository has been archived by the owner on Aug 27, 2020. It is now read-only.

Commit

Permalink
v0.8.0
Browse files Browse the repository at this point in the history
  • Loading branch information
lzjluzijie committed Jun 5, 2018
2 parents ac32e43 + 74ebdaf commit fb0661e
Show file tree
Hide file tree
Showing 8 changed files with 410 additions and 30 deletions.
66 changes: 39 additions & 27 deletions core/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ type Client struct {
ListenAddr *net.TCPAddr
URL *url.URL

Mux bool
MuxWS *MuxWebSocket

Dialer *websocket.Dialer

CreatedAt time.Time
Expand All @@ -34,6 +37,16 @@ func (client *Client) Listen() (err error) {

defer listener.Close()

if client.Mux {
err := client.OpenMux()
if err != nil {
logger.Debugf(err.Error())
return err
}

go client.MuxWS.ClientListen()
}

for {
conn, err := listener.AcceptTCP()
if err != nil {
Expand All @@ -47,39 +60,53 @@ func (client *Client) Listen() (err error) {
return nil
}

func (client *Client) handleConn(conn *net.TCPConn) (err error) {
defer func() {
if err != nil {
logger.Debugf("Handle connection error: %s", err.Error())
}
}()

func (client *Client) handleConn(conn *net.TCPConn) {
defer conn.Close()

conn.SetLinger(0)

err = handShake(conn)
err := handShake(conn)
if err != nil {
logger.Errorf(err.Error())
return
}

_, host, err := getRequest(conn)
if err != nil {
logger.Errorf(err.Error())
return
}

logger.Debugf("Host: %s", host)

_, err = conn.Write([]byte{0x05, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x08, 0x43})
if err != nil {
logger.Errorf(err.Error())
return
}

ws, err := client.dial(host)
if client.Mux {
client.DialMuxConn(host, conn)
} else {
client.DialWSConn(host, conn)
}

return
}

func (client *Client) DialWSConn(host string, conn *net.TCPConn) {
wsConn, _, err := client.Dialer.Dial(client.URL.String(), map[string][]string{
"WebSocks-Host": {host},
})

if err != nil {
return
}

logger.Debugf("dialed ws for %s", host)

ws := &WebSocket{
conn: wsConn,
}

go func() {
_, err = io.Copy(ws, conn)
if err != nil {
Expand All @@ -91,23 +118,8 @@ func (client *Client) handleConn(conn *net.TCPConn) (err error) {

_, err = io.Copy(conn, ws)
if err != nil {
logger.Debugf(err.Error())
return
}

return
}

func (client *Client) dial(host string) (ws *WebSocket, err error) {
conn, _, err := client.Dialer.Dial(client.URL.String(), map[string][]string{
"WebSocks-Host": {host},
})

if err != nil {
return
}

ws = &WebSocket{
conn: conn,
}
return
}
112 changes: 112 additions & 0 deletions core/mux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package core

import (
"io"
"math/rand"
"net"
"sync"
"sync/atomic"
)

const (
MessageMethodData = iota
MessageMethodDial
)

type Message struct {
Method byte
ConnID uint64
MessageID uint64
Data []byte
}

type MuxConn struct {
ID uint64
muxWS *MuxWebSocket

mutex sync.Mutex
buf []byte
wait chan int

receiveMessageID uint64
sendMessageID *uint64
}

//client use
func NewMuxConn(muxWS *MuxWebSocket) (conn *MuxConn) {
return &MuxConn{
ID: rand.Uint64(),
muxWS: muxWS,
wait: make(chan int),
sendMessageID: new(uint64),
}
}

func (conn *MuxConn) Write(p []byte) (n int, err error) {
m := &Message{
Method: MessageMethodData,
ConnID: conn.ID,
MessageID: conn.SendMessageID(),
Data: p,
}

err = conn.muxWS.SendMessage(m)
if err != nil {
return 0, err
}
return len(p), nil
}

func (conn *MuxConn) Read(p []byte) (n int, err error) {
if len(conn.buf) == 0 {
logger.Debugf("%d buf is 0, waiting", conn.ID)
<-conn.wait
}

conn.mutex.Lock()
logger.Debugf("%d buf: %v", conn.buf)
n = copy(p, conn.buf)
conn.buf = conn.buf[n:]
conn.mutex.Unlock()
return
}

func (conn *MuxConn) HandleMessage(m *Message) (err error) {
logger.Debugf("handle message %d %d", m.ConnID, m.MessageID)
for {
if conn.receiveMessageID == m.MessageID {
conn.mutex.Lock()
conn.buf = append(conn.buf, m.Data...)
conn.receiveMessageID++
close(conn.wait)
conn.wait = make(chan int)
conn.mutex.Unlock()
logger.Debugf("handled message %d %d", m.ConnID, m.MessageID)
return
}
<-conn.wait
}
return
}

func (conn *MuxConn) SendMessageID() (id uint64) {
id = atomic.LoadUint64(conn.sendMessageID)
atomic.AddUint64(conn.sendMessageID, 1)
return
}

func (conn *MuxConn) Run(c *net.TCPConn) {
go func() {
_, err := io.Copy(c, conn)
if err != nil {
logger.Debugf(err.Error())
}
}()

_, err := io.Copy(conn, c)
if err != nil {
logger.Debugf(err.Error())
}

return
}
81 changes: 81 additions & 0 deletions core/muxclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package core

import (
"net"
)

func (muxWS *MuxWebSocket) ClientListen() {
for {
m, err := muxWS.ReceiveMessage()
if err != nil {
logger.Debugf(err.Error())
return
}

//get conn and send message
conn := muxWS.GetMuxConn(m.ConnID)
err = conn.HandleMessage(m)
if err != nil {
logger.Debugf(err.Error())
continue
}
}
}

func (client *Client) OpenMux() (err error) {
wsConn, _, err := client.Dialer.Dial(client.URL.String(), map[string][]string{
"WebSocks-Mux": {"mux"},
})

if err != nil {
return
}

ws := &WebSocket{
conn: wsConn,
}

muxWS := NewMuxWebSocket(ws)
client.MuxWS = muxWS
return
}
func (client *Client) DialMuxConn(host string, conn *net.TCPConn) {
muxConn := NewMuxConn(client.MuxWS)

err := muxConn.DialMessage(host)
if err != nil {
logger.Errorf(err.Error())
err = client.OpenMux()
if err != nil {
logger.Errorf(err.Error())
}
return
}

muxConn.muxWS.PutMuxConn(muxConn)

logger.Debugf("dialed mux for %s", host)

muxConn.Run(conn)
return
}

//client dial remote
func (conn *MuxConn) DialMessage(host string) (err error) {
m := &Message{
Method: MessageMethodDial,
MessageID: 18446744073709551615,
ConnID: conn.ID,
Data: []byte(host),
}

logger.Debugf("dial for %s", host)

err = conn.muxWS.SendMessage(m)
if err != nil {
return
}

logger.Debugf("%d %s", conn.ID, host)
return
}
Loading

0 comments on commit fb0661e

Please sign in to comment.