Skip to content

Commit

Permalink
chore: classify mux (#731)
Browse files Browse the repository at this point in the history
  • Loading branch information
jimmywarting authored Jun 25, 2023
1 parent eaf0186 commit 4791f2d
Showing 1 changed file with 96 additions and 99 deletions.
195 changes: 96 additions & 99 deletions lib/mux.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,117 +13,114 @@ var assert = require('assert');
var schedule = (typeof setImmediate === 'function') ?
setImmediate : process.nextTick;

function Mux(downstream) {
this.newStreams = [];
this.oldStreams = [];
this.blocked = false;
this.scheduledRead = false;

this.out = downstream;
var self = this;
downstream.on('drain', function() {
self.blocked = false;
self._readIncoming();
});
}

// There are 2 states we can be in:

// - waiting for outbound capacity, which will be signalled by a
// - 'drain' event on the downstream; or,

// - no packets to send, waiting for an inbound buffer to have
// packets, which will be signalled by a 'readable' event

// If we write all packets available whenever there is outbound
// capacity, we will either run out of outbound capacity (`#write`
// returns false), or run out of packets (all calls to an
// `inbound.read()` have returned null).
class Mux {
constructor (downstream) {
this.newStreams = [];
this.oldStreams = [];
this.blocked = false;
this.scheduledRead = false;

this.out = downstream;
var self = this;
downstream.on('drain', function () {
self.blocked = false;
self._readIncoming();
});
}

Mux.prototype._readIncoming = function() {
// There are 2 states we can be in:
// - waiting for outbound capacity, which will be signalled by a
// - 'drain' event on the downstream; or,
// - no packets to send, waiting for an inbound buffer to have
// packets, which will be signalled by a 'readable' event
// If we write all packets available whenever there is outbound
// capacity, we will either run out of outbound capacity (`#write`
// returns false), or run out of packets (all calls to an
// `inbound.read()` have returned null).
_readIncoming () {

// We may be sent here speculatively, if an incoming stream has
// become readable
if (this.blocked) return;

var accepting = true;
var out = this.out;

// Try to read a chunk from each stream in turn, until all streams
// are empty, or we exhaust our ability to accept chunks.
function roundrobin (streams) {
var s;
while (accepting && (s = streams.shift())) {
var chunk = s.read();
if (chunk !== null) {
accepting = out.write(chunk);
streams.push(s);
}
}
}

// We may be sent here speculatively, if an incoming stream has
// become readable
if (this.blocked) return;
roundrobin(this.newStreams);

// Either we exhausted the new queues, or we ran out of capacity. If
// we ran out of capacity, all the remaining new streams (i.e.,
// those with packets left) become old streams. This effectively
// prioritises streams that keep their buffers close to empty over
// those that are constantly near full.
if (accepting) { // all new queues are exhausted, write as many as
// we can from the old streams
assert.equal(0, this.newStreams.length);
roundrobin(this.oldStreams);
}
else { // ran out of room
assert(this.newStreams.length > 0, "Expect some new streams to remain");
Array.prototype.push.apply(this.oldStreams, this.newStreams);
this.newStreams = [];
}
// We may have exhausted all the old queues, or run out of room;
// either way, all we need to do is record whether we have capacity
// or not, so any speculative reads will know
this.blocked = !accepting;
}

var accepting = true;
var out = this.out;
_scheduleRead () {
var self = this;

// Try to read a chunk from each stream in turn, until all streams
// are empty, or we exhaust our ability to accept chunks.
function roundrobin(streams) {
var s;
while (accepting && (s = streams.shift())) {
var chunk = s.read();
if (chunk !== null) {
accepting = out.write(chunk);
streams.push(s);
}
if (!self.scheduledRead) {
schedule(function () {
self.scheduledRead = false;
self._readIncoming();
});
self.scheduledRead = true;
}
}

roundrobin(this.newStreams);
pipeFrom (readable) {
var self = this;

// Either we exhausted the new queues, or we ran out of capacity. If
// we ran out of capacity, all the remaining new streams (i.e.,
// those with packets left) become old streams. This effectively
// prioritises streams that keep their buffers close to empty over
// those that are constantly near full.

if (accepting) { // all new queues are exhausted, write as many as
// we can from the old streams
assert.equal(0, this.newStreams.length);
roundrobin(this.oldStreams);
}
else { // ran out of room
assert(this.newStreams.length > 0, "Expect some new streams to remain");
Array.prototype.push.apply(this.oldStreams, this.newStreams);
this.newStreams = [];
}
// We may have exhausted all the old queues, or run out of room;
// either way, all we need to do is record whether we have capacity
// or not, so any speculative reads will know
this.blocked = !accepting;
};

Mux.prototype._scheduleRead = function() {
var self = this;

if (!self.scheduledRead) {
schedule(function() {
self.scheduledRead = false;
self._readIncoming();
});
self.scheduledRead = true;
}
};
function enqueue () {
self.newStreams.push(readable);
self._scheduleRead();
}

Mux.prototype.pipeFrom = function(readable) {
var self = this;
function cleanup () {
readable.removeListener('readable', enqueue);
readable.removeListener('error', cleanup);
readable.removeListener('end', cleanup);
readable.removeListener('unpipeFrom', cleanupIfMe);
}
function cleanupIfMe (dest) {
if (dest === self) cleanup();
}

function enqueue() {
self.newStreams.push(readable);
self._scheduleRead();
readable.on('unpipeFrom', cleanupIfMe);
readable.on('end', cleanup);
readable.on('error', cleanup);
readable.on('readable', enqueue);
}

function cleanup() {
readable.removeListener('readable', enqueue);
readable.removeListener('error', cleanup);
readable.removeListener('end', cleanup);
readable.removeListener('unpipeFrom', cleanupIfMe);
}
function cleanupIfMe(dest) {
if (dest === self) cleanup();
unpipeFrom (readable) {
readable.emit('unpipeFrom', this);
}

readable.on('unpipeFrom', cleanupIfMe);
readable.on('end', cleanup);
readable.on('error', cleanup);
readable.on('readable', enqueue);
};

Mux.prototype.unpipeFrom = function(readable) {
readable.emit('unpipeFrom', this);
};
}

module.exports.Mux = Mux;

0 comments on commit 4791f2d

Please sign in to comment.