Skip to content
This repository has been archived by the owner on May 3, 2022. It is now read-only.

WIP - choose from max score candidates #1081

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 12 additions & 38 deletions node/peer.js
Original file line number Diff line number Diff line change
Expand Up @@ -343,37 +343,6 @@ TChannelPeer.prototype.makeOutConnection = function makeOutConnection(socket) {
return conn;
};

TChannelPeer.prototype.outPendingWeightedRandom = function outPendingWeightedRandom() {
// Returns a score in the range from 0 to 1, where it is preferable to use
// a peer with a higher score over one with a lower score.
// This range is divided among an infinite set of subranges corresponding
// to peers with the same number of pending requests.
// So, the range (1/2, 1) is reserved for peers with 0 pending connections.
// The range (1/4, 1/2) is reserved for peers with 1 pending connections.
// The range (1/8, 1/4) is reserved for peers with 2 pending connections.
// Ad nauseam.
// Within each equivalence class, each peer receives a uniform random
// value.
//
// The previous score was a weighted random variable:
// random() ** (1 + pending)
// This had the attribute that a less loaded peer was merely more likely to
// be chosen over a more loaded peer.
// We observed with the introduction of a heap, that a less favored peer
// would have its score less frequently re-evaluated.
// An emergent behavior was that scores would, over time, be squeezed
// toward zero and the least favored peer would remain the least favored
// for ever increasing durations.
//
// This remains true with this algorithm, within each equivalence class.
var self = this;
var pending = self.pendingIdentified + self.countOutPending();
var max = Math.pow(0.5, pending);
var min = max / 2;
var diff = max - min;
return min + diff * self.random();
};

TChannelPeer.prototype.countOutPending = function countOutPending() {
var self = this;
var pending = 0;
Expand Down Expand Up @@ -434,13 +403,18 @@ PreferOutgoingHandler.prototype.getTier = function getTier() {
PreferOutgoingHandler.prototype.getScore = function getScore() {
var self = this;

// space:
// domain, per tier of availability:
// [0.1, 0.4) peers with no identified outgoing connection
// [0.4, 1.0) identified outgoing connections
var random = self.peer.outPendingWeightedRandom();
var qos = self.getTier();
self.lastTier = qos;
switch (qos) {

// within each domain, the score is a function of the number of pending
// requests. 1 for 0 pending, 1/2 for 1 pending, 1/3 for 2 pending, etc.
var pending = self.peer.pendingIdentified + self.peer.countOutPending();
var pendingScore = 1 / (1 + pending);

var tier = self.getTier();
self.lastTier = tier;
switch (tier) {
case TIER_ONLY_INCOMING:
if (!self.peer.channel.destroyed) {
self.peer.connect();
Expand All @@ -449,9 +423,9 @@ PreferOutgoingHandler.prototype.getScore = function getScore() {
case TIER_UNCONNECTED:
/* falls through */
case TIER_FRESH_OUTGOING:
return 0.1 + random * 0.3;
return 0.1 + pendingScore * 0.3;
case TIER_READY_OUTGOING:
return 0.4 + random * 0.6;
return 0.4 + pendingScore * 0.6;
}
};

Expand Down
35 changes: 33 additions & 2 deletions node/peer_heap.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ function PeerHeap() {
// TODO: worth it to keep a tail free list like TimeHeap?
// self.end = 0;
self._stack = [];
// This is a reusable array, used in the non-reentrant chooseEl routine.
self._candidates = [];
}

PeerHeap.prototype.choose = function choose(threshold, filter) {
Expand All @@ -57,12 +59,41 @@ PeerHeap.prototype.choose = function choose(threshold, filter) {

PeerHeap.prototype._chooseEl = function _chooseEl(threshold) {
var self = this;
var i;

var el = self.array[0];
if (el.score <= threshold) { // TODO: why inclusive?
var score = self.array[0].score;

if (score <= threshold) { // TODO: why inclusive?
return null;
}

// Seed the candidate list with the one obvious maximum
self._candidates.push(0);

// Find all elements atop the heap that share the maximum score
for (i = 0; i < self._candidates.length; i++) {
var left = (2 * i) + 1;
if (left >= self.array.length) {
continue;
}
if (self.array[left].score === score) {
self._candidates.push(left);
}
var right = left + 1;
if (right >= self.array.length) {
continue;
}
if (self.array[right].score === score) {
self._candidates.push(right);
}
}

// Choose one candidate randomly
i = Math.floor(Math.random() * self._candidates.length);
var el = self.array[i];

self._candidates.length = 0;

return el;
};

Expand Down
7 changes: 5 additions & 2 deletions node/test/pool-of-servers.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ var parallel = require('run-parallel');
var allocCluster = require('./lib/alloc-cluster.js');

allocCluster.test('request().send() to a pool of servers', {
numPeers: 5
numPeers: 5,
channelOptions: {
choosePeerWithHeap: true
}
}, function t(cluster, assert) {
var client = cluster.channels[0];

Expand Down Expand Up @@ -174,7 +177,7 @@ allocCluster.test('request().send() to a pool of servers', {
}

var keys = Object.keys(byServer);
assert.equal(keys.length, numPeers, 'expected 25 servers');
assert.equal(keys.length, numPeers, 'expected responses from all servers');

for (var k = 0; k < keys.length; k++) {
var count = byServer[keys[k]];
Expand Down