From 12f2ecc45dacd0aeb7bca18f273fe11fea83f956 Mon Sep 17 00:00:00 2001 From: lesismal Date: Fri, 26 May 2023 20:29:56 +0800 Subject: [PATCH 1/5] simplify websocket extension --- client.go | 19 +--- examples/nbio/server/server.go | 2 +- examples/pubsub_ws_with_nbio/server/server.go | 16 ++-- extension/jsclient/arpc.js | 95 ++++++++++--------- extension/protocol/websocket/websocket.go | 32 ++++--- websocket.go | 6 -- 6 files changed, 80 insertions(+), 90 deletions(-) delete mode 100644 websocket.go diff --git a/client.go b/client.go index 783e783..3663016 100644 --- a/client.go +++ b/client.go @@ -750,19 +750,6 @@ func (c *Client) run() { } } -func (c *Client) runWebsocket() { - c.mux.Lock() - defer c.mux.Unlock() - if !c.running { - c.running = true - c.initReader() - if c.Handler.AsyncWrite() { - go util.Safe(c.sendLoop) - } - c.Conn.(WebsocketConn).HandleWebsocket(c.recvLoop) - } -} - func (c *Client) initReader() { if c.Handler.BatchRecv() { c.Reader = c.Handler.WrapReader(c.Conn) @@ -958,11 +945,7 @@ func newClientWithConn(conn net.Conn, codec codec.Codec, handler Handler, onStop c.asyncHandlerMap = make(map[uint64]*asyncHandler) c.onStop = onStop - if _, ok := conn.(WebsocketConn); !ok { - c.run() - } else { - c.runWebsocket() - } + c.run() return c } diff --git a/examples/nbio/server/server.go b/examples/nbio/server/server.go index 886a75d..b271542 100644 --- a/examples/nbio/server/server.go +++ b/examples/nbio/server/server.go @@ -28,7 +28,7 @@ var ( stdSvr = arpc.NewServer() nbioSvr = nbio.NewGopher(nbio.Config{}) - pool = taskpool.NewMixedPool(1024*8, 1, 1024*8) + pool = taskpool.New(1024*8, 1024) method = "/echo" ) diff --git a/examples/pubsub_ws_with_nbio/server/server.go b/examples/pubsub_ws_with_nbio/server/server.go index 7c48adc..bcadf1f 100644 --- a/examples/pubsub_ws_with_nbio/server/server.go +++ b/examples/pubsub_ws_with_nbio/server/server.go @@ -18,11 +18,11 @@ package main import ( "context" + "errors" "fmt" "net/http" "os" "os/signal" - "runtime" "time" "github.com/lesismal/arpc" @@ -37,9 +37,11 @@ import ( var ( psServer *pubsub.Server - executePool = taskpool.NewMixedPool(1024*4, 1, 1024*runtime.NumCPU()) + executePool = taskpool.New(1024*4, 1024) keepaliveTime = 60 * time.Second + + upgrader = newUpgrader() ) func onWebsocket(w http.ResponseWriter, r *http.Request) { @@ -48,13 +50,11 @@ func onWebsocket(w http.ResponseWriter, r *http.Request) { // return // } - upgrader := newUpgrader() - conn, err := upgrader.Upgrade(w, r, nil) + wsConn, err := upgrader.Upgrade(w, r, nil) if err != nil { - conn.Close() + wsConn.Close() return } - wsConn := conn.(*websocket.Conn) wsConn.SetReadDeadline(time.Now().Add(keepaliveTime)) } @@ -111,6 +111,10 @@ type WebsocketConn struct { *websocket.Conn } +func (c *WebsocketConn) Read(buf []byte) (int, error) { + return -1, errors.New("unsupported") +} + func (c *WebsocketConn) Write(data []byte) (int, error) { err := c.Conn.WriteMessage(websocket.BinaryMessage, data) if err != nil { diff --git a/extension/jsclient/arpc.js b/extension/jsclient/arpc.js index 76b4629..c4389b8 100644 --- a/extension/jsclient/arpc.js +++ b/extension/jsclient/arpc.js @@ -6,6 +6,8 @@ var _CmdNone = 0; var _CmdRequest = 1; var _CmdResponse = 2; var _CmdNotify = 3; +var _CmdPing = 4; +var _CmdPong = 5; var _HeaderIndexBodyLenBegin = 0; var _HeaderIndexBodyLenEnd = 4; @@ -79,17 +81,21 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { } this.call = function (method, request, timeout, cb, isHttp) { if (this.state == _SOCK_STATE_CLOSED) { + if (typeof (cb) == 'function') { + cb({ data: null, err: _ErrClosed }); + } return new Promise(function (resolve, reject) { resolve({ data: null, err: _ErrClosed }); }); } if (this.state == _SOCK_STATE_CONNECTING) { + if (typeof (cb) == 'function') { + cb({ data: null, err: _ErrReconnecting }); + } return new Promise(function (resolve, reject) { resolve({ data: null, err: _ErrReconnecting }); }); } - this.seqNum++; - var seq = this.seqNum; var session = {}; var p = new Promise(function (resolve, reject) { session.resolve = resolve; @@ -105,41 +111,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { session.resolve({ data: null, err: "timeout" }); }, timeout); } - - var buffer; - if (request) { - var data = this.codec.Marshal(request); - if (data) { - buffer = new Uint8Array(16 + method.length + data.length); - for (var i = 0; i < data.length; i++) { - buffer[16 + method.length + i] = data[i]; - } - } - } else { - buffer = new Uint8Array(16 + method.length); - } - var bodyLen = buffer.length - 16; - for (var i = _HeaderIndexBodyLenBegin; i < _HeaderIndexBodyLenEnd; i++) { - buffer[i] = (bodyLen >> ((i - _HeaderIndexBodyLenBegin) * 8)) & 0xFF; - } - - buffer[_HeaderIndexCmd] = _CmdRequest & 0xFF; - buffer[_HeaderIndexMethodLen] = method.length & 0xFF; - for (var i = _HeaderIndexSeqBegin; i < _HeaderIndexSeqBegin + 4; i++) { - buffer[i] = (seq >> ((i - _HeaderIndexSeqBegin) * 8)) & 0xFF; - } - - var methodBuffer = new TextEncoder("utf-8").encode(method); - for (var i = 0; i < methodBuffer.length; i++) { - buffer[16 + i] = methodBuffer[i]; - } - - if (!isHttp) { - this.ws.send(buffer); - } else { - this.request(buffer, this._onMessage); - } - + this.write(_CmdRequest, method, request, this.seqNum, this._onMessage, isHttp); return p; } @@ -153,10 +125,37 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { if (this.state == _SOCK_STATE_CONNECTING) { return _ErrReconnecting; } - this.seqNum++; + this.write(_CmdNotify, method, notify, function () { }, isHttp); + } + this.ping = function () { + if (client.state == _SOCK_STATE_CLOSED) { + return _ErrClosed; + } + if (client.state == _SOCK_STATE_CONNECTING) { + return _ErrReconnecting; + } + client.write(_CmdPing, "", null, function () { }); + } + this.pong = function () { + if (client.state == _SOCK_STATE_CLOSED) { + return _ErrClosed; + } + if (client.state == _SOCK_STATE_CONNECTING) { + return _ErrReconnecting; + } + client.write(_CmdPong, "", null, function () { }); + } + this.keepalive = function (timeout) { + if (this._keepaliveInited) return; + this._keepaliveInited = true; + if (!timeout) timeout = 1000 * 30; + setInterval(this.ping, timeout); + } + + this.write = function (cmd, method, arg, cb, isHttp) { var buffer; - if (notify) { - var data = this.codec.Marshal(notify); + if (arg) { + var data = this.codec.Marshal(arg); if (data) { buffer = new Uint8Array(16 + method.length + data.length); for (var i = 0; i < data.length; i++) { @@ -166,25 +165,25 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { } else { buffer = new Uint8Array(16 + method.length); } + var bodyLen = buffer.length - 16; for (var i = _HeaderIndexBodyLenBegin; i < _HeaderIndexBodyLenEnd; i++) { buffer[i] = (bodyLen >> ((i - _HeaderIndexBodyLenBegin) * 8)) & 0xFF; } - buffer[_HeaderIndexCmd] = _CmdNotify & 0xFF; + buffer[_HeaderIndexCmd] = cmd & 0xFF; buffer[_HeaderIndexMethodLen] = method.length & 0xFF; + this.seqNum++; for (var i = _HeaderIndexSeqBegin; i < _HeaderIndexSeqBegin + 4; i++) { buffer[i] = (this.seqNum >> ((i - _HeaderIndexSeqBegin) * 8)) & 0xFF; } - var methodBuffer = new TextEncoder("utf-8").encode(method); for (var i = 0; i < methodBuffer.length; i++) { buffer[16 + i] = methodBuffer[i]; } - if (!isHttp) { this.ws.send(buffer); } else { - this.request(buffer, function () { }); + this.request(buffer, cb); } } @@ -244,6 +243,14 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { seq |= headArr[i] << (i - offset - _HeaderIndexSeqBegin); } + switch (cmd) { + case _CmdPing: + client.pong(); + return; + case _CmdPong: + return; + } + if (methodLen == 0) { console.log("[ArpcClient] onMessage: invalid request message with 0 method length, dropped"); return diff --git a/extension/protocol/websocket/websocket.go b/extension/protocol/websocket/websocket.go index 0ce5b54..24eaba5 100644 --- a/extension/protocol/websocket/websocket.go +++ b/extension/protocol/websocket/websocket.go @@ -8,6 +8,7 @@ import ( "errors" "net" "net/http" + "sync/atomic" "time" "github.com/gorilla/websocket" @@ -24,9 +25,11 @@ var ( // Listener . type Listener struct { - addr net.Addr - upgrader *websocket.Upgrader - acceptQueue chan net.Conn + addr net.Addr + upgrader *websocket.Upgrader + chAccept chan net.Conn + chClose chan struct{} + closed uint32 } // Handler . @@ -36,22 +39,20 @@ func (ln *Listener) Handler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusForbidden) return } - defer c.Close() wsc := &Conn{Conn: c, chHandler: make(chan func(), 1)} - ln.acceptQueue <- wsc - timeout := time.NewTimer(time.Second) select { - case handler := <-wsc.chHandler: - timeout.Stop() - handler() - case <-timeout.C: + case ln.chAccept <- wsc: + case <-ln.chClose: } + } // Close . func (ln *Listener) Close() error { - close(ln.acceptQueue) + if atomic.CompareAndSwapUint32(&ln.closed, 0, 1) { + close(ln.chClose) + } return nil } @@ -62,7 +63,7 @@ func (ln *Listener) Addr() net.Addr { // Accept . func (ln *Listener) Accept() (net.Conn, error) { - c := <-ln.acceptQueue + c := <-ln.chAccept if c != nil { return c, nil } @@ -139,9 +140,10 @@ func Listen(addr string, upgrader *websocket.Upgrader) (net.Listener, error) { } } ln := &Listener{ - addr: tcpAddr, - upgrader: upgrader, - acceptQueue: make(chan net.Conn, 4096), + addr: tcpAddr, + upgrader: upgrader, + chAccept: make(chan net.Conn, 4096), + chClose: make(chan struct{}), } return ln, nil } diff --git a/websocket.go b/websocket.go deleted file mode 100644 index 47d6f8c..0000000 --- a/websocket.go +++ /dev/null @@ -1,6 +0,0 @@ -package arpc - -// WebsocketConn defines websocket-conn interface. -type WebsocketConn interface { - HandleWebsocket(func()) -} From 3ee998f3097c59da6cfa21a55c4e9ff51761e173 Mon Sep 17 00:00:00 2001 From: lesismal Date: Fri, 26 May 2023 20:37:38 +0800 Subject: [PATCH 2/5] close ws conn when listener closed --- extension/protocol/websocket/websocket.go | 1 + 1 file changed, 1 insertion(+) diff --git a/extension/protocol/websocket/websocket.go b/extension/protocol/websocket/websocket.go index 24eaba5..caa9424 100644 --- a/extension/protocol/websocket/websocket.go +++ b/extension/protocol/websocket/websocket.go @@ -44,6 +44,7 @@ func (ln *Listener) Handler(w http.ResponseWriter, r *http.Request) { select { case ln.chAccept <- wsc: case <-ln.chClose: + c.Close() } } From 8751369be0d9630c6a2a5f7c5494169f6f9c0eae Mon Sep 17 00:00:00 2001 From: lesismal Date: Fri, 6 Oct 2023 02:38:56 +0800 Subject: [PATCH 3/5] fix js client ping/pong parsing in loop --- examples/httprpc/arpc.js | 76 +++++++++---------- examples/protocols/websocket/jsclient/arpc.js | 76 +++++++++---------- examples/webchat/arpc.js | 76 +++++++++---------- extension/jsclient/arpc.js | 76 +++++++++---------- 4 files changed, 152 insertions(+), 152 deletions(-) diff --git a/examples/httprpc/arpc.js b/examples/httprpc/arpc.js index c4389b8..7b0377a 100644 --- a/examples/httprpc/arpc.js +++ b/examples/httprpc/arpc.js @@ -25,13 +25,13 @@ var _ErrDisconnected = "[error disconnected]"; var _ErrReconnecting = "[error reconnecting]"; function Codec() { - this.Marshal = function (obj) { - if (typeof (obj) == 'string') { + this.Marshal = function(obj) { + if (typeof(obj) == 'string') { return new TextEncoder("utf-8").encode(obj); } return new TextEncoder("utf-8").encode(JSON.stringify(obj)); } - this.Unmarshal = function (data) { + this.Unmarshal = function(data) { try { data = JSON.parse(new TextDecoder("utf-8").decode(data)); return data; @@ -69,45 +69,45 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { this.state = _SOCK_STATE_CONNECTING; - this.handle = function (method, h) { + this.handle = function(method, h) { if (this.handlers[method]) { throw ("handler for [${method}] exists"); } this.handlers[method] = { h: h }; } - this.callHttp = function (method, request, timeout, cb) { + this.callHttp = function(method, request, timeout, cb) { this.call(method, request, timeout, cb, true); } - this.call = function (method, request, timeout, cb, isHttp) { + this.call = function(method, request, timeout, cb, isHttp) { if (this.state == _SOCK_STATE_CLOSED) { - if (typeof (cb) == 'function') { + if (typeof(cb) == 'function') { cb({ data: null, err: _ErrClosed }); } - return new Promise(function (resolve, reject) { + return new Promise(function(resolve, reject) { resolve({ data: null, err: _ErrClosed }); }); } if (this.state == _SOCK_STATE_CONNECTING) { - if (typeof (cb) == 'function') { + if (typeof(cb) == 'function') { cb({ data: null, err: _ErrReconnecting }); } - return new Promise(function (resolve, reject) { + return new Promise(function(resolve, reject) { resolve({ data: null, err: _ErrReconnecting }); }); } var session = {}; - var p = new Promise(function (resolve, reject) { + var p = new Promise(function(resolve, reject) { session.resolve = resolve; }); - if (typeof (cb) == 'function') { + if (typeof(cb) == 'function') { session.resolve = cb; } this.sessionMap[seq] = session; if (timeout > 0) { - session.timer = setTimeout(function () { - delete (client.sessionMap[seq]); + session.timer = setTimeout(function() { + delete(client.sessionMap[seq]); session.resolve({ data: null, err: "timeout" }); }, timeout); } @@ -115,44 +115,44 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { return p; } - this.notifyHttp = function (method, notify) { + this.notifyHttp = function(method, notify) { this.notify(method, notify, true); } - this.notify = function (method, notify, isHttp) { + this.notify = function(method, notify, isHttp) { if (this.state == _SOCK_STATE_CLOSED) { return _ErrClosed; } if (this.state == _SOCK_STATE_CONNECTING) { return _ErrReconnecting; } - this.write(_CmdNotify, method, notify, function () { }, isHttp); + this.write(_CmdNotify, method, notify, function() {}, isHttp); } - this.ping = function () { + this.ping = function() { if (client.state == _SOCK_STATE_CLOSED) { return _ErrClosed; } if (client.state == _SOCK_STATE_CONNECTING) { return _ErrReconnecting; } - client.write(_CmdPing, "", null, function () { }); + client.write(_CmdPing, "", null, function() {}); } - this.pong = function () { + this.pong = function() { if (client.state == _SOCK_STATE_CLOSED) { return _ErrClosed; } if (client.state == _SOCK_STATE_CONNECTING) { return _ErrReconnecting; } - client.write(_CmdPong, "", null, function () { }); + client.write(_CmdPong, "", null, function() {}); } - this.keepalive = function (timeout) { + this.keepalive = function(timeout) { if (this._keepaliveInited) return; this._keepaliveInited = true; if (!timeout) timeout = 1000 * 30; setInterval(this.ping, timeout); } - this.write = function (cmd, method, arg, cb, isHttp) { + this.write = function(cmd, method, arg, cb, isHttp) { var buffer; if (arg) { var data = this.codec.Marshal(arg); @@ -187,24 +187,24 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { } } - this.shutdown = function () { + this.shutdown = function() { this.ws.close(); this.state = _SOCK_STATE_CLOSED; } - this.request = function (data, cb) { + this.request = function(data, cb) { let resolve; - let p = new Promise(function (res) { + let p = new Promise(function(res) { resolve = res; - if (typeof (cb) == 'function') { - resolve = function (ret) { + if (typeof(cb) == 'function') { + resolve = function(ret) { res(ret); cb(ret); } } let r = new XMLHttpRequest(); r.open(this.httpMethod, this.httpUrl, true); - r.onreadystatechange = function () { + r.onreadystatechange = function() { if (r.readyState != 4) { return; } @@ -220,7 +220,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { return p; } - this._onMessage = function (event) { + this._onMessage = function(event) { try { var offset = 0; while (offset < event.data.byteLength) { @@ -246,9 +246,9 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { switch (cmd) { case _CmdPing: client.pong(); - return; + continue; case _CmdPong: - return; + continue; } if (methodLen == 0) { @@ -274,7 +274,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { if (session.timer) { clearTimeout(session.timer); } - delete (client.sessionMap[seq]); + delete(client.sessionMap[seq]); var data = client.codec.Unmarshal(bodyArr); if (isError) { session.resolve({ data: null, err: data }); @@ -296,7 +296,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { } } - this.init = function () { + this.init = function() { console.log("[ArpcClient] init..."); if ('WebSocket' in window) { client.ws = new WebSocket(this.url); @@ -311,14 +311,14 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { client.state = _SOCK_STATE_CONNECTING; - client.ws.onopen = function (event) { + client.ws.onopen = function(event) { client.state = _SOCK_STATE_CONNECTED; console.log("[ArpcClient] websocket onopen"); if (client.onOpen) { client.onOpen(client); } }; - client.ws.onclose = function (event) { + client.ws.onclose = function(event) { console.log("[ArpcClient] websocket onclose"); if (client.onClose) { client.onClose(client); @@ -341,7 +341,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { client.state = _SOCK_STATE_CONNECTING; client.init(); }; - client.ws.onerror = function (event) { + client.ws.onerror = function(event) { console.log("[ArpcClient] websocket onerror"); if (client.onError) { client.onError(client); @@ -355,4 +355,4 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { } catch (e) { console.log("[ArpcClient] init() failed:", e); } -} +} \ No newline at end of file diff --git a/examples/protocols/websocket/jsclient/arpc.js b/examples/protocols/websocket/jsclient/arpc.js index c4389b8..7b0377a 100644 --- a/examples/protocols/websocket/jsclient/arpc.js +++ b/examples/protocols/websocket/jsclient/arpc.js @@ -25,13 +25,13 @@ var _ErrDisconnected = "[error disconnected]"; var _ErrReconnecting = "[error reconnecting]"; function Codec() { - this.Marshal = function (obj) { - if (typeof (obj) == 'string') { + this.Marshal = function(obj) { + if (typeof(obj) == 'string') { return new TextEncoder("utf-8").encode(obj); } return new TextEncoder("utf-8").encode(JSON.stringify(obj)); } - this.Unmarshal = function (data) { + this.Unmarshal = function(data) { try { data = JSON.parse(new TextDecoder("utf-8").decode(data)); return data; @@ -69,45 +69,45 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { this.state = _SOCK_STATE_CONNECTING; - this.handle = function (method, h) { + this.handle = function(method, h) { if (this.handlers[method]) { throw ("handler for [${method}] exists"); } this.handlers[method] = { h: h }; } - this.callHttp = function (method, request, timeout, cb) { + this.callHttp = function(method, request, timeout, cb) { this.call(method, request, timeout, cb, true); } - this.call = function (method, request, timeout, cb, isHttp) { + this.call = function(method, request, timeout, cb, isHttp) { if (this.state == _SOCK_STATE_CLOSED) { - if (typeof (cb) == 'function') { + if (typeof(cb) == 'function') { cb({ data: null, err: _ErrClosed }); } - return new Promise(function (resolve, reject) { + return new Promise(function(resolve, reject) { resolve({ data: null, err: _ErrClosed }); }); } if (this.state == _SOCK_STATE_CONNECTING) { - if (typeof (cb) == 'function') { + if (typeof(cb) == 'function') { cb({ data: null, err: _ErrReconnecting }); } - return new Promise(function (resolve, reject) { + return new Promise(function(resolve, reject) { resolve({ data: null, err: _ErrReconnecting }); }); } var session = {}; - var p = new Promise(function (resolve, reject) { + var p = new Promise(function(resolve, reject) { session.resolve = resolve; }); - if (typeof (cb) == 'function') { + if (typeof(cb) == 'function') { session.resolve = cb; } this.sessionMap[seq] = session; if (timeout > 0) { - session.timer = setTimeout(function () { - delete (client.sessionMap[seq]); + session.timer = setTimeout(function() { + delete(client.sessionMap[seq]); session.resolve({ data: null, err: "timeout" }); }, timeout); } @@ -115,44 +115,44 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { return p; } - this.notifyHttp = function (method, notify) { + this.notifyHttp = function(method, notify) { this.notify(method, notify, true); } - this.notify = function (method, notify, isHttp) { + this.notify = function(method, notify, isHttp) { if (this.state == _SOCK_STATE_CLOSED) { return _ErrClosed; } if (this.state == _SOCK_STATE_CONNECTING) { return _ErrReconnecting; } - this.write(_CmdNotify, method, notify, function () { }, isHttp); + this.write(_CmdNotify, method, notify, function() {}, isHttp); } - this.ping = function () { + this.ping = function() { if (client.state == _SOCK_STATE_CLOSED) { return _ErrClosed; } if (client.state == _SOCK_STATE_CONNECTING) { return _ErrReconnecting; } - client.write(_CmdPing, "", null, function () { }); + client.write(_CmdPing, "", null, function() {}); } - this.pong = function () { + this.pong = function() { if (client.state == _SOCK_STATE_CLOSED) { return _ErrClosed; } if (client.state == _SOCK_STATE_CONNECTING) { return _ErrReconnecting; } - client.write(_CmdPong, "", null, function () { }); + client.write(_CmdPong, "", null, function() {}); } - this.keepalive = function (timeout) { + this.keepalive = function(timeout) { if (this._keepaliveInited) return; this._keepaliveInited = true; if (!timeout) timeout = 1000 * 30; setInterval(this.ping, timeout); } - this.write = function (cmd, method, arg, cb, isHttp) { + this.write = function(cmd, method, arg, cb, isHttp) { var buffer; if (arg) { var data = this.codec.Marshal(arg); @@ -187,24 +187,24 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { } } - this.shutdown = function () { + this.shutdown = function() { this.ws.close(); this.state = _SOCK_STATE_CLOSED; } - this.request = function (data, cb) { + this.request = function(data, cb) { let resolve; - let p = new Promise(function (res) { + let p = new Promise(function(res) { resolve = res; - if (typeof (cb) == 'function') { - resolve = function (ret) { + if (typeof(cb) == 'function') { + resolve = function(ret) { res(ret); cb(ret); } } let r = new XMLHttpRequest(); r.open(this.httpMethod, this.httpUrl, true); - r.onreadystatechange = function () { + r.onreadystatechange = function() { if (r.readyState != 4) { return; } @@ -220,7 +220,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { return p; } - this._onMessage = function (event) { + this._onMessage = function(event) { try { var offset = 0; while (offset < event.data.byteLength) { @@ -246,9 +246,9 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { switch (cmd) { case _CmdPing: client.pong(); - return; + continue; case _CmdPong: - return; + continue; } if (methodLen == 0) { @@ -274,7 +274,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { if (session.timer) { clearTimeout(session.timer); } - delete (client.sessionMap[seq]); + delete(client.sessionMap[seq]); var data = client.codec.Unmarshal(bodyArr); if (isError) { session.resolve({ data: null, err: data }); @@ -296,7 +296,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { } } - this.init = function () { + this.init = function() { console.log("[ArpcClient] init..."); if ('WebSocket' in window) { client.ws = new WebSocket(this.url); @@ -311,14 +311,14 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { client.state = _SOCK_STATE_CONNECTING; - client.ws.onopen = function (event) { + client.ws.onopen = function(event) { client.state = _SOCK_STATE_CONNECTED; console.log("[ArpcClient] websocket onopen"); if (client.onOpen) { client.onOpen(client); } }; - client.ws.onclose = function (event) { + client.ws.onclose = function(event) { console.log("[ArpcClient] websocket onclose"); if (client.onClose) { client.onClose(client); @@ -341,7 +341,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { client.state = _SOCK_STATE_CONNECTING; client.init(); }; - client.ws.onerror = function (event) { + client.ws.onerror = function(event) { console.log("[ArpcClient] websocket onerror"); if (client.onError) { client.onError(client); @@ -355,4 +355,4 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { } catch (e) { console.log("[ArpcClient] init() failed:", e); } -} +} \ No newline at end of file diff --git a/examples/webchat/arpc.js b/examples/webchat/arpc.js index c4389b8..7b0377a 100644 --- a/examples/webchat/arpc.js +++ b/examples/webchat/arpc.js @@ -25,13 +25,13 @@ var _ErrDisconnected = "[error disconnected]"; var _ErrReconnecting = "[error reconnecting]"; function Codec() { - this.Marshal = function (obj) { - if (typeof (obj) == 'string') { + this.Marshal = function(obj) { + if (typeof(obj) == 'string') { return new TextEncoder("utf-8").encode(obj); } return new TextEncoder("utf-8").encode(JSON.stringify(obj)); } - this.Unmarshal = function (data) { + this.Unmarshal = function(data) { try { data = JSON.parse(new TextDecoder("utf-8").decode(data)); return data; @@ -69,45 +69,45 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { this.state = _SOCK_STATE_CONNECTING; - this.handle = function (method, h) { + this.handle = function(method, h) { if (this.handlers[method]) { throw ("handler for [${method}] exists"); } this.handlers[method] = { h: h }; } - this.callHttp = function (method, request, timeout, cb) { + this.callHttp = function(method, request, timeout, cb) { this.call(method, request, timeout, cb, true); } - this.call = function (method, request, timeout, cb, isHttp) { + this.call = function(method, request, timeout, cb, isHttp) { if (this.state == _SOCK_STATE_CLOSED) { - if (typeof (cb) == 'function') { + if (typeof(cb) == 'function') { cb({ data: null, err: _ErrClosed }); } - return new Promise(function (resolve, reject) { + return new Promise(function(resolve, reject) { resolve({ data: null, err: _ErrClosed }); }); } if (this.state == _SOCK_STATE_CONNECTING) { - if (typeof (cb) == 'function') { + if (typeof(cb) == 'function') { cb({ data: null, err: _ErrReconnecting }); } - return new Promise(function (resolve, reject) { + return new Promise(function(resolve, reject) { resolve({ data: null, err: _ErrReconnecting }); }); } var session = {}; - var p = new Promise(function (resolve, reject) { + var p = new Promise(function(resolve, reject) { session.resolve = resolve; }); - if (typeof (cb) == 'function') { + if (typeof(cb) == 'function') { session.resolve = cb; } this.sessionMap[seq] = session; if (timeout > 0) { - session.timer = setTimeout(function () { - delete (client.sessionMap[seq]); + session.timer = setTimeout(function() { + delete(client.sessionMap[seq]); session.resolve({ data: null, err: "timeout" }); }, timeout); } @@ -115,44 +115,44 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { return p; } - this.notifyHttp = function (method, notify) { + this.notifyHttp = function(method, notify) { this.notify(method, notify, true); } - this.notify = function (method, notify, isHttp) { + this.notify = function(method, notify, isHttp) { if (this.state == _SOCK_STATE_CLOSED) { return _ErrClosed; } if (this.state == _SOCK_STATE_CONNECTING) { return _ErrReconnecting; } - this.write(_CmdNotify, method, notify, function () { }, isHttp); + this.write(_CmdNotify, method, notify, function() {}, isHttp); } - this.ping = function () { + this.ping = function() { if (client.state == _SOCK_STATE_CLOSED) { return _ErrClosed; } if (client.state == _SOCK_STATE_CONNECTING) { return _ErrReconnecting; } - client.write(_CmdPing, "", null, function () { }); + client.write(_CmdPing, "", null, function() {}); } - this.pong = function () { + this.pong = function() { if (client.state == _SOCK_STATE_CLOSED) { return _ErrClosed; } if (client.state == _SOCK_STATE_CONNECTING) { return _ErrReconnecting; } - client.write(_CmdPong, "", null, function () { }); + client.write(_CmdPong, "", null, function() {}); } - this.keepalive = function (timeout) { + this.keepalive = function(timeout) { if (this._keepaliveInited) return; this._keepaliveInited = true; if (!timeout) timeout = 1000 * 30; setInterval(this.ping, timeout); } - this.write = function (cmd, method, arg, cb, isHttp) { + this.write = function(cmd, method, arg, cb, isHttp) { var buffer; if (arg) { var data = this.codec.Marshal(arg); @@ -187,24 +187,24 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { } } - this.shutdown = function () { + this.shutdown = function() { this.ws.close(); this.state = _SOCK_STATE_CLOSED; } - this.request = function (data, cb) { + this.request = function(data, cb) { let resolve; - let p = new Promise(function (res) { + let p = new Promise(function(res) { resolve = res; - if (typeof (cb) == 'function') { - resolve = function (ret) { + if (typeof(cb) == 'function') { + resolve = function(ret) { res(ret); cb(ret); } } let r = new XMLHttpRequest(); r.open(this.httpMethod, this.httpUrl, true); - r.onreadystatechange = function () { + r.onreadystatechange = function() { if (r.readyState != 4) { return; } @@ -220,7 +220,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { return p; } - this._onMessage = function (event) { + this._onMessage = function(event) { try { var offset = 0; while (offset < event.data.byteLength) { @@ -246,9 +246,9 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { switch (cmd) { case _CmdPing: client.pong(); - return; + continue; case _CmdPong: - return; + continue; } if (methodLen == 0) { @@ -274,7 +274,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { if (session.timer) { clearTimeout(session.timer); } - delete (client.sessionMap[seq]); + delete(client.sessionMap[seq]); var data = client.codec.Unmarshal(bodyArr); if (isError) { session.resolve({ data: null, err: data }); @@ -296,7 +296,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { } } - this.init = function () { + this.init = function() { console.log("[ArpcClient] init..."); if ('WebSocket' in window) { client.ws = new WebSocket(this.url); @@ -311,14 +311,14 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { client.state = _SOCK_STATE_CONNECTING; - client.ws.onopen = function (event) { + client.ws.onopen = function(event) { client.state = _SOCK_STATE_CONNECTED; console.log("[ArpcClient] websocket onopen"); if (client.onOpen) { client.onOpen(client); } }; - client.ws.onclose = function (event) { + client.ws.onclose = function(event) { console.log("[ArpcClient] websocket onclose"); if (client.onClose) { client.onClose(client); @@ -341,7 +341,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { client.state = _SOCK_STATE_CONNECTING; client.init(); }; - client.ws.onerror = function (event) { + client.ws.onerror = function(event) { console.log("[ArpcClient] websocket onerror"); if (client.onError) { client.onError(client); @@ -355,4 +355,4 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { } catch (e) { console.log("[ArpcClient] init() failed:", e); } -} +} \ No newline at end of file diff --git a/extension/jsclient/arpc.js b/extension/jsclient/arpc.js index c4389b8..7b0377a 100644 --- a/extension/jsclient/arpc.js +++ b/extension/jsclient/arpc.js @@ -25,13 +25,13 @@ var _ErrDisconnected = "[error disconnected]"; var _ErrReconnecting = "[error reconnecting]"; function Codec() { - this.Marshal = function (obj) { - if (typeof (obj) == 'string') { + this.Marshal = function(obj) { + if (typeof(obj) == 'string') { return new TextEncoder("utf-8").encode(obj); } return new TextEncoder("utf-8").encode(JSON.stringify(obj)); } - this.Unmarshal = function (data) { + this.Unmarshal = function(data) { try { data = JSON.parse(new TextDecoder("utf-8").decode(data)); return data; @@ -69,45 +69,45 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { this.state = _SOCK_STATE_CONNECTING; - this.handle = function (method, h) { + this.handle = function(method, h) { if (this.handlers[method]) { throw ("handler for [${method}] exists"); } this.handlers[method] = { h: h }; } - this.callHttp = function (method, request, timeout, cb) { + this.callHttp = function(method, request, timeout, cb) { this.call(method, request, timeout, cb, true); } - this.call = function (method, request, timeout, cb, isHttp) { + this.call = function(method, request, timeout, cb, isHttp) { if (this.state == _SOCK_STATE_CLOSED) { - if (typeof (cb) == 'function') { + if (typeof(cb) == 'function') { cb({ data: null, err: _ErrClosed }); } - return new Promise(function (resolve, reject) { + return new Promise(function(resolve, reject) { resolve({ data: null, err: _ErrClosed }); }); } if (this.state == _SOCK_STATE_CONNECTING) { - if (typeof (cb) == 'function') { + if (typeof(cb) == 'function') { cb({ data: null, err: _ErrReconnecting }); } - return new Promise(function (resolve, reject) { + return new Promise(function(resolve, reject) { resolve({ data: null, err: _ErrReconnecting }); }); } var session = {}; - var p = new Promise(function (resolve, reject) { + var p = new Promise(function(resolve, reject) { session.resolve = resolve; }); - if (typeof (cb) == 'function') { + if (typeof(cb) == 'function') { session.resolve = cb; } this.sessionMap[seq] = session; if (timeout > 0) { - session.timer = setTimeout(function () { - delete (client.sessionMap[seq]); + session.timer = setTimeout(function() { + delete(client.sessionMap[seq]); session.resolve({ data: null, err: "timeout" }); }, timeout); } @@ -115,44 +115,44 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { return p; } - this.notifyHttp = function (method, notify) { + this.notifyHttp = function(method, notify) { this.notify(method, notify, true); } - this.notify = function (method, notify, isHttp) { + this.notify = function(method, notify, isHttp) { if (this.state == _SOCK_STATE_CLOSED) { return _ErrClosed; } if (this.state == _SOCK_STATE_CONNECTING) { return _ErrReconnecting; } - this.write(_CmdNotify, method, notify, function () { }, isHttp); + this.write(_CmdNotify, method, notify, function() {}, isHttp); } - this.ping = function () { + this.ping = function() { if (client.state == _SOCK_STATE_CLOSED) { return _ErrClosed; } if (client.state == _SOCK_STATE_CONNECTING) { return _ErrReconnecting; } - client.write(_CmdPing, "", null, function () { }); + client.write(_CmdPing, "", null, function() {}); } - this.pong = function () { + this.pong = function() { if (client.state == _SOCK_STATE_CLOSED) { return _ErrClosed; } if (client.state == _SOCK_STATE_CONNECTING) { return _ErrReconnecting; } - client.write(_CmdPong, "", null, function () { }); + client.write(_CmdPong, "", null, function() {}); } - this.keepalive = function (timeout) { + this.keepalive = function(timeout) { if (this._keepaliveInited) return; this._keepaliveInited = true; if (!timeout) timeout = 1000 * 30; setInterval(this.ping, timeout); } - this.write = function (cmd, method, arg, cb, isHttp) { + this.write = function(cmd, method, arg, cb, isHttp) { var buffer; if (arg) { var data = this.codec.Marshal(arg); @@ -187,24 +187,24 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { } } - this.shutdown = function () { + this.shutdown = function() { this.ws.close(); this.state = _SOCK_STATE_CLOSED; } - this.request = function (data, cb) { + this.request = function(data, cb) { let resolve; - let p = new Promise(function (res) { + let p = new Promise(function(res) { resolve = res; - if (typeof (cb) == 'function') { - resolve = function (ret) { + if (typeof(cb) == 'function') { + resolve = function(ret) { res(ret); cb(ret); } } let r = new XMLHttpRequest(); r.open(this.httpMethod, this.httpUrl, true); - r.onreadystatechange = function () { + r.onreadystatechange = function() { if (r.readyState != 4) { return; } @@ -220,7 +220,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { return p; } - this._onMessage = function (event) { + this._onMessage = function(event) { try { var offset = 0; while (offset < event.data.byteLength) { @@ -246,9 +246,9 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { switch (cmd) { case _CmdPing: client.pong(); - return; + continue; case _CmdPong: - return; + continue; } if (methodLen == 0) { @@ -274,7 +274,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { if (session.timer) { clearTimeout(session.timer); } - delete (client.sessionMap[seq]); + delete(client.sessionMap[seq]); var data = client.codec.Unmarshal(bodyArr); if (isError) { session.resolve({ data: null, err: data }); @@ -296,7 +296,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { } } - this.init = function () { + this.init = function() { console.log("[ArpcClient] init..."); if ('WebSocket' in window) { client.ws = new WebSocket(this.url); @@ -311,14 +311,14 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { client.state = _SOCK_STATE_CONNECTING; - client.ws.onopen = function (event) { + client.ws.onopen = function(event) { client.state = _SOCK_STATE_CONNECTED; console.log("[ArpcClient] websocket onopen"); if (client.onOpen) { client.onOpen(client); } }; - client.ws.onclose = function (event) { + client.ws.onclose = function(event) { console.log("[ArpcClient] websocket onclose"); if (client.onClose) { client.onClose(client); @@ -341,7 +341,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { client.state = _SOCK_STATE_CONNECTING; client.init(); }; - client.ws.onerror = function (event) { + client.ws.onerror = function(event) { console.log("[ArpcClient] websocket onerror"); if (client.onError) { client.onError(client); @@ -355,4 +355,4 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { } catch (e) { console.log("[ArpcClient] init() failed:", e); } -} +} \ No newline at end of file From 6439d2cc018b6a5f212e2cc21afc0e66f96c3e16 Mon Sep 17 00:00:00 2001 From: lesismal Date: Fri, 6 Oct 2023 02:45:56 +0800 Subject: [PATCH 4/5] typo --- examples/httprpc/arpc.js | 2 +- examples/protocols/websocket/jsclient/arpc.js | 2 +- examples/webchat/arpc.js | 2 +- extension/jsclient/arpc.js | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/httprpc/arpc.js b/examples/httprpc/arpc.js index 7b0377a..638c709 100644 --- a/examples/httprpc/arpc.js +++ b/examples/httprpc/arpc.js @@ -306,7 +306,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { client.ws = new SockJS(this.url); } - // 消息类型,不设置则默认为'text' + // if not set this to `arraybuffer`, it will be 'text' by default. client.ws.binaryType = 'arraybuffer'; client.state = _SOCK_STATE_CONNECTING; diff --git a/examples/protocols/websocket/jsclient/arpc.js b/examples/protocols/websocket/jsclient/arpc.js index 7b0377a..638c709 100644 --- a/examples/protocols/websocket/jsclient/arpc.js +++ b/examples/protocols/websocket/jsclient/arpc.js @@ -306,7 +306,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { client.ws = new SockJS(this.url); } - // 消息类型,不设置则默认为'text' + // if not set this to `arraybuffer`, it will be 'text' by default. client.ws.binaryType = 'arraybuffer'; client.state = _SOCK_STATE_CONNECTING; diff --git a/examples/webchat/arpc.js b/examples/webchat/arpc.js index 7b0377a..638c709 100644 --- a/examples/webchat/arpc.js +++ b/examples/webchat/arpc.js @@ -306,7 +306,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { client.ws = new SockJS(this.url); } - // 消息类型,不设置则默认为'text' + // if not set this to `arraybuffer`, it will be 'text' by default. client.ws.binaryType = 'arraybuffer'; client.state = _SOCK_STATE_CONNECTING; diff --git a/extension/jsclient/arpc.js b/extension/jsclient/arpc.js index 7b0377a..638c709 100644 --- a/extension/jsclient/arpc.js +++ b/extension/jsclient/arpc.js @@ -306,7 +306,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { client.ws = new SockJS(this.url); } - // 消息类型,不设置则默认为'text' + // if not set this to `arraybuffer`, it will be 'text' by default. client.ws.binaryType = 'arraybuffer'; client.state = _SOCK_STATE_CONNECTING; From 9b56db47763b134542dda646a77bb5597b33176d Mon Sep 17 00:00:00 2001 From: lesismal Date: Fri, 6 Oct 2023 14:56:32 +0800 Subject: [PATCH 5/5] fix js client ping/pong dead loop --- examples/httprpc/arpc.js | 9 ++++++--- examples/protocols/websocket/jsclient/arpc.js | 9 ++++++--- examples/webchat/arpc.js | 9 ++++++--- extension/jsclient/arpc.js | 9 ++++++--- 4 files changed, 24 insertions(+), 12 deletions(-) diff --git a/examples/httprpc/arpc.js b/examples/httprpc/arpc.js index 638c709..e1c086b 100644 --- a/examples/httprpc/arpc.js +++ b/examples/httprpc/arpc.js @@ -149,7 +149,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { if (this._keepaliveInited) return; this._keepaliveInited = true; if (!timeout) timeout = 1000 * 30; - setInterval(this.ping, timeout); + this.keepaliveIntervalID = setInterval(this.ping, timeout); } this.write = function(cmd, method, arg, cb, isHttp) { @@ -190,6 +190,9 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { this.shutdown = function() { this.ws.close(); this.state = _SOCK_STATE_CLOSED; + if (!!this.keepaliveIntervalID) { + clearInterval(this.keepaliveIntervalID); + } } this.request = function(data, cb) { @@ -246,9 +249,9 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { switch (cmd) { case _CmdPing: client.pong(); - continue; + return; case _CmdPong: - continue; + return; } if (methodLen == 0) { diff --git a/examples/protocols/websocket/jsclient/arpc.js b/examples/protocols/websocket/jsclient/arpc.js index 638c709..e1c086b 100644 --- a/examples/protocols/websocket/jsclient/arpc.js +++ b/examples/protocols/websocket/jsclient/arpc.js @@ -149,7 +149,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { if (this._keepaliveInited) return; this._keepaliveInited = true; if (!timeout) timeout = 1000 * 30; - setInterval(this.ping, timeout); + this.keepaliveIntervalID = setInterval(this.ping, timeout); } this.write = function(cmd, method, arg, cb, isHttp) { @@ -190,6 +190,9 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { this.shutdown = function() { this.ws.close(); this.state = _SOCK_STATE_CLOSED; + if (!!this.keepaliveIntervalID) { + clearInterval(this.keepaliveIntervalID); + } } this.request = function(data, cb) { @@ -246,9 +249,9 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { switch (cmd) { case _CmdPing: client.pong(); - continue; + return; case _CmdPong: - continue; + return; } if (methodLen == 0) { diff --git a/examples/webchat/arpc.js b/examples/webchat/arpc.js index 638c709..e1c086b 100644 --- a/examples/webchat/arpc.js +++ b/examples/webchat/arpc.js @@ -149,7 +149,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { if (this._keepaliveInited) return; this._keepaliveInited = true; if (!timeout) timeout = 1000 * 30; - setInterval(this.ping, timeout); + this.keepaliveIntervalID = setInterval(this.ping, timeout); } this.write = function(cmd, method, arg, cb, isHttp) { @@ -190,6 +190,9 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { this.shutdown = function() { this.ws.close(); this.state = _SOCK_STATE_CLOSED; + if (!!this.keepaliveIntervalID) { + clearInterval(this.keepaliveIntervalID); + } } this.request = function(data, cb) { @@ -246,9 +249,9 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { switch (cmd) { case _CmdPing: client.pong(); - continue; + return; case _CmdPong: - continue; + return; } if (methodLen == 0) { diff --git a/extension/jsclient/arpc.js b/extension/jsclient/arpc.js index 638c709..e1c086b 100644 --- a/extension/jsclient/arpc.js +++ b/extension/jsclient/arpc.js @@ -149,7 +149,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { if (this._keepaliveInited) return; this._keepaliveInited = true; if (!timeout) timeout = 1000 * 30; - setInterval(this.ping, timeout); + this.keepaliveIntervalID = setInterval(this.ping, timeout); } this.write = function(cmd, method, arg, cb, isHttp) { @@ -190,6 +190,9 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { this.shutdown = function() { this.ws.close(); this.state = _SOCK_STATE_CLOSED; + if (!!this.keepaliveIntervalID) { + clearInterval(this.keepaliveIntervalID); + } } this.request = function(data, cb) { @@ -246,9 +249,9 @@ function ArpcClient(url, codec, httpUrl, httpMethod) { switch (cmd) { case _CmdPing: client.pong(); - continue; + return; case _CmdPong: - continue; + return; } if (methodLen == 0) {