Skip to content

Commit

Permalink
feat: Implement backpressure handling (#115)
Browse files Browse the repository at this point in the history
* Add drain event
* Add socket.pause() method
* Add socket.resume() method
  • Loading branch information
Rapsssito authored Aug 27, 2021
1 parent 8990fb0 commit 8a90f32
Show file tree
Hide file tree
Showing 19 changed files with 545 additions and 280 deletions.
15 changes: 13 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ React Native TCP socket API for Android, iOS & macOS with **client SSL/TLS suppo
- [Server](#server)
- [SSL Client](#ssl-client)
- [API](#api)
- [TcpSocket](#tcpsocket)
- [Socket](#socket)
- [`createConnection()`](#createconnection)
- [Server](#server-1)
- [`listen()`](#listen)
Expand Down Expand Up @@ -218,7 +218,7 @@ _Note: In order to use self-signed certificates make sure to [update your metro.
## API
Here are listed all methods implemented in `react-native-tcp-socket`, their functionalities are equivalent to those provided by Node's [net](https://nodejs.org/api/net.html) (more info on [#41](https://github.com/Rapsssito/react-native-tcp-socket/issues/41)). However, the **methods whose interface differs from Node are marked in bold**.

### TcpSocket
### Socket
* **Methods:**
* **[`TcpSocket.createConnection(options[, callback])`](#createconnection)**
* [`address()`](https://nodejs.org/api/net.html#net_socket_address)
Expand All @@ -229,17 +229,28 @@ Here are listed all methods implemented in `react-native-tcp-socket`, their func
* [`setNoDelay([noDelay])`](https://nodejs.org/api/net.html#net_socket_setnodelay_nodelay)
* [`setTimeout(timeout[, callback])`](https://nodejs.org/api/net.html#net_socket_settimeout_timeout_callback)
* [`write(data[, encoding][, callback])`](https://nodejs.org/api/net.html#net_socket_write_data_encoding_callback)
* [`pause()`](https://nodejs.org/api/net.html#net_socket_pause)
* `ref()` - _Will not have any effect_
* [`resume()`](https://nodejs.org/api/net.html#net_socket_resume)
* `unref()` - _Will not have any effect_
* **Properties:**
* Inherited from [`Stream.Writable`](https://nodejs.org/api/stream.html#stream_class_stream_writable):
* [`writableNeedDrain`](https://nodejs.org/api/stream.html#stream_writable_writableneeddrain)
* [`remoteAddress`](https://nodejs.org/api/net.html#net_socket_remoteaddress)
* [`remoteFamily`](https://nodejs.org/api/net.html#net_socket_remotefamily)
* [`remotePort`](https://nodejs.org/api/net.html#net_socket_remoteport)
* [`localAddress`](https://nodejs.org/api/net.html#net_socket_localaddress)
* [`localPort`](https://nodejs.org/api/net.html#net_socket_localport)
* **Events:**
* Inherited from [`Stream.Readable`](https://nodejs.org/api/stream.html#stream_class_stream_readable):
* [`'pause'`](https://nodejs.org/api/stream.html#stream_event_pause)
* [`'resume'`](https://nodejs.org/api/stream.html#stream_event_resume)
* [`'close'`](https://nodejs.org/api/net.html#net_event_close_1)
* [`'connect'`](https://nodejs.org/api/net.html#net_event_connect)
* [`'data'`](https://nodejs.org/api/net.html#net_event_data)
* [`'drain'`](https://nodejs.org/api/net.html#net_event_drain)
* [`'error'`](https://nodejs.org/api/net.html#net_event_error_1)
* [`'timeout'`](https://nodejs.org/api/net.html#net_event_timeout)

#### `createConnection()`
`createConnection(options[, callback])` creates a TCP connection using the given [`options`](#createconnection-options).
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package com.asterinet.react.tcpsocket;

import android.util.Base64;

import com.facebook.react.bridge.Arguments;
import com.facebook.react.bridge.ReactContext;
import com.facebook.react.bridge.WritableMap;
import com.facebook.react.modules.core.DeviceEventManagerModule;

import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

import javax.annotation.Nullable;

public class TcpEventListener {

private final DeviceEventManagerModule.RCTDeviceEventEmitter rctEvtEmitter;

public TcpEventListener(final ReactContext reactContext) {
rctEvtEmitter = reactContext.getJSModule(DeviceEventManagerModule.RCTDeviceEventEmitter.class);
}

public void onConnection(int serverId, int clientId, Socket socket) {
WritableMap eventParams = Arguments.createMap();
eventParams.putInt("id", serverId);

WritableMap infoParams = Arguments.createMap();
infoParams.putInt("id", clientId);

WritableMap connectionParams = Arguments.createMap();
InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress();

connectionParams.putString("localAddress", socket.getLocalAddress().getHostAddress());
connectionParams.putInt("localPort", socket.getLocalPort());
connectionParams.putString("remoteAddress", remoteAddress.getAddress().getHostAddress());
connectionParams.putInt("remotePort", socket.getPort());
connectionParams.putString("remoteFamily", remoteAddress.getAddress() instanceof Inet6Address ? "IPv6" : "IPv4");

infoParams.putMap("connection", connectionParams);
eventParams.putMap("info", infoParams);

sendEvent("connection", eventParams);
}

public void onConnect(int id, TcpSocketClient client) {
WritableMap eventParams = Arguments.createMap();
eventParams.putInt("id", id);
WritableMap connectionParams = Arguments.createMap();
Socket socket = client.getSocket();
InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress();

connectionParams.putString("localAddress", socket.getLocalAddress().getHostAddress());
connectionParams.putInt("localPort", socket.getLocalPort());
connectionParams.putString("remoteAddress", remoteAddress.getAddress().getHostAddress());
connectionParams.putInt("remotePort", socket.getPort());
connectionParams.putString("remoteFamily", remoteAddress.getAddress() instanceof Inet6Address ? "IPv6" : "IPv4");
eventParams.putMap("connection", connectionParams);
sendEvent("connect", eventParams);
}

public void onListen(int id, TcpSocketServer server) {
WritableMap eventParams = Arguments.createMap();
eventParams.putInt("id", id);
WritableMap connectionParams = Arguments.createMap();
ServerSocket serverSocket = server.getServerSocket();
InetAddress address = serverSocket.getInetAddress();

connectionParams.putString("localAddress", serverSocket.getInetAddress().getHostAddress());
connectionParams.putInt("localPort", serverSocket.getLocalPort());
connectionParams.putString("localFamily", address instanceof Inet6Address ? "IPv6" : "IPv4");
eventParams.putMap("connection", connectionParams);
sendEvent("listening", eventParams);
}

public void onData(int id, byte[] data) {
WritableMap eventParams = Arguments.createMap();
eventParams.putInt("id", id);
eventParams.putString("data", Base64.encodeToString(data, Base64.NO_WRAP));

sendEvent("data", eventParams);
}

public void onWritten(int id, int msgId, @Nullable String error) {
WritableMap eventParams = Arguments.createMap();
eventParams.putInt("id", id);
eventParams.putInt("msgId", msgId);
eventParams.putString("err", error);

sendEvent("written", eventParams);
}

public void onClose(int id, String error) {
if (error != null) {
onError(id, error);
}
WritableMap eventParams = Arguments.createMap();
eventParams.putInt("id", id);
eventParams.putBoolean("hadError", error != null);

sendEvent("close", eventParams);
}

public void onError(int id, String error) {
WritableMap eventParams = Arguments.createMap();
eventParams.putInt("id", id);
eventParams.putString("error", error);

sendEvent("error", eventParams);
}

private void sendEvent(String eventName, WritableMap params) {
rctEvtEmitter.emit(eventName, params);
}
}
Original file line number Diff line number Diff line change
@@ -1,70 +1,64 @@
package com.asterinet.react.tcpsocket;

import android.os.AsyncTask;
import android.util.Pair;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.net.Socket;
import java.util.Arrays;

/**
* This is a specialized AsyncTask that receives data from a socket in the background, and
* This is a specialized Runnable that receives data from a socket in the background, and
* notifies it's listener when data is received. This is not threadsafe, the listener
* should handle synchronicity.
*/
class TcpReceiverTask extends AsyncTask<Pair<TcpSocketClient, TcpReceiverTask.OnDataReceivedListener>, Void, Void> {
public class TcpReceiverTask implements Runnable {

private final TcpSocketClient clientSocket;
private final TcpEventListener receiverListener;
private boolean paused = false;

public TcpReceiverTask(TcpSocketClient clientSocket, TcpEventListener receiverListener) {
this.clientSocket = clientSocket;
this.receiverListener = receiverListener;
}

/**
* An infinite loop to block and read data from the socket.
*/
@SafeVarargs
@Override
protected final Void doInBackground(Pair<TcpSocketClient, TcpReceiverTask.OnDataReceivedListener>... params) {
if (params.length > 1) {
throw new IllegalArgumentException("This task is only for a single socket/listener pair.");
}

TcpSocketClient clientSocket = params[0].first;
OnDataReceivedListener receiverListener = params[0].second;
public void run() {
int socketId = clientSocket.getId();
Socket socket = clientSocket.getSocket();
byte[] buffer = new byte[8192];
int bufferCount;
byte[] buffer = new byte[16384];
try {
BufferedInputStream in = new BufferedInputStream(socket.getInputStream());
while (!isCancelled() && !socket.isClosed()) {
bufferCount = in.read(buffer);
while (!socket.isClosed()) {
int bufferCount = in.read(buffer);
waitIfPaused();
if (bufferCount > 0) {
receiverListener.onData(socketId, Arrays.copyOfRange(buffer, 0, bufferCount));
} else if (bufferCount == -1) {
clientSocket.destroy();
}
}
} catch (IOException ioe) {
} catch (IOException | InterruptedException ioe) {
if (receiverListener != null && !socket.isClosed()) {
receiverListener.onError(socketId, ioe.getMessage());
}
this.cancel(false);
}
return null;
}

/**
* Listener interface for receive events.
*/
@SuppressWarnings("WeakerAccess")
public interface OnDataReceivedListener {
void onConnection(Integer serverId, Integer clientId, Socket socket);

void onConnect(Integer id, TcpSocketClient client);

void onListen(Integer id, TcpSocketServer server);

void onData(Integer id, byte[] data);
public synchronized void pause() {
paused = true;
}

void onClose(Integer id, String error);
public synchronized void resume() {
paused = false;
notify();
}

void onError(Integer id, String error);
private synchronized void waitIfPaused() throws InterruptedException {
while (paused) {
wait();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@

import android.content.Context;
import android.net.Network;
import android.util.Pair;

import com.facebook.react.bridge.ReadableMap;

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
Expand All @@ -23,21 +21,19 @@
import androidx.annotation.Nullable;

class TcpSocketClient extends TcpSocket {
private final ExecutorService executorService;
private TcpReceiverTask receiverTask;
private final ExecutorService listenExecutor;
private final ExecutorService writeExecutor;
private final TcpEventListener receiverListener;
private final TcpReceiverTask receiverTask;
private Socket socket;
private TcpReceiverTask.OnDataReceivedListener mReceiverListener;

TcpSocketClient(@NonNull final TcpReceiverTask.OnDataReceivedListener receiverListener, @NonNull final Integer id, @Nullable final Socket socket) {
TcpSocketClient(@NonNull final TcpEventListener receiverListener, @NonNull final Integer id, @Nullable final Socket socket) {
super(id);
this.executorService = Executors.newFixedThreadPool(1);
listenExecutor = Executors.newSingleThreadExecutor();
writeExecutor = Executors.newSingleThreadExecutor();
receiverTask = new TcpReceiverTask(this, receiverListener);
this.socket = socket;
receiverTask = new TcpReceiverTask();
mReceiverListener = receiverListener;
}

ExecutorService getExecutorService() {
return this.executorService;
this.receiverListener = receiverListener;
}

public Socket getSocket() {
Expand Down Expand Up @@ -83,43 +79,43 @@ public void connect(@NonNull final Context context, @NonNull final String addres
startListening();
}

@SuppressWarnings("WeakerAccess")
public void startListening() {
//noinspection unchecked
receiverTask.executeOnExecutor(getExecutorService(), new Pair<>(this, mReceiverListener));
listenExecutor.execute(receiverTask);
}

/**
* Sends data from the socket
*
* @param data data to be sent
*/
public void write(final byte[] data) throws IOException {
if (socket == null) {
throw new IOException("Socket is not connected.");
}
OutputStream output = socket.getOutputStream();
output.write(data);
public void write(final int msgId, final byte[] data) {
writeExecutor.execute(new Runnable() {
@Override
public void run() {
try {
socket.getOutputStream().write(data);
receiverListener.onWritten(getId(), msgId, null);
} catch (IOException e) {
receiverListener.onWritten(getId(), msgId, e.toString());
receiverListener.onError(getId(), e.toString());
}
}
});
}

/**
* Shuts down the receiver task, closing the socket.
*/
public void destroy() {
try {
if (receiverTask != null && !receiverTask.isCancelled()) {
// stop the receiving task
receiverTask.cancel(true);
getExecutorService().shutdown();
}
// close the socket
if (socket != null && !socket.isClosed()) {
socket.close();
mReceiverListener.onClose(getId(), null);
receiverListener.onClose(getId(), null);
socket = null;
}
} catch (IOException e) {
mReceiverListener.onClose(getId(), e.getMessage());
receiverListener.onClose(getId(), e.getMessage());
}
}

Expand All @@ -143,4 +139,12 @@ public void setKeepAlive(final boolean enable, final int initialDelay) throws IO
// `initialDelay` is ignored
socket.setKeepAlive(enable);
}

public void pause() {
receiverTask.pause();
}

public void resume() {
receiverTask.resume();
}
}
Loading

0 comments on commit 8a90f32

Please sign in to comment.