Skip to content
This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

Commit

Permalink
Merge branch 'wildcard_and_separator_options'
Browse files Browse the repository at this point in the history
Conflicts:
	test/redis_ascoltatore_spec.js
  • Loading branch information
mcollina committed Apr 17, 2014
2 parents 626d593 + 94062d3 commit ac26d4c
Show file tree
Hide file tree
Showing 23 changed files with 267 additions and 178 deletions.
57 changes: 55 additions & 2 deletions lib/abstract_ascoltatore.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,31 @@ var EventEmitter = require('events').EventEmitter;
*
* @api public
*/
function AbstractAscoltatore() {
function AbstractAscoltatore(settings, nativeSettings) {
EventEmitter.call(this);


settings = settings || {};
nativeSettings = nativeSettings || {};

this._separator = settings.separator || '/',
this._wildcardOne = settings.wildcardOne || '+',
this._wildcardSome = settings.wildcardSome || '*';
this._nativeSettings = nativeSettings;

if (nativeSettings.separator &&
(this._separator !== nativeSettings.separator)) {
this._reInSeparator = new RegExp('\\' + this._separator, 'g');
this._reOutSeparator = new RegExp('\\' + nativeSettings.separator, 'g');
}
if (nativeSettings.wildcardOne &&
(this._wildcardOne !== nativeSettings.wildcardOne)) {
this._reInWildcardOne = new RegExp('\\' + this._wildcardOne, 'g');
}
if (nativeSettings.wildcardSome &&
(this._wildcardSome !== nativeSettings.wildcardSome)) {
this._reInWildcardSome = new RegExp('\\' + this._wildcardSome, 'g');
}

this._setPublish();

this._ready = false;
Expand Down Expand Up @@ -164,6 +186,37 @@ AbstractAscoltatore.prototype.registerDomain = function(domain) {
this._ascoltatore.registerDomain(domain);
};

AbstractAscoltatore.prototype._subTopic = function(topic) {
if (this._reInSeparator) {
topic = topic.replace(this._reInSeparator,
this._nativeSettings.separator);
}
if (this._reInWildcardSome) {
topic = topic.replace(this._reInWildcardSome,
this._nativeSettings.wildcardSome);
}
if (this._reInWildcardOne) {
topic = topic.replace(this._reInWildcardOne,
this._nativeSettings.wildcardOne);
}
return topic;
};

AbstractAscoltatore.prototype._recvTopic = function(topic) {
if (this._reOutSeparator) {
topic = topic.replace(this._reOutSeparator, this._separator);
}
return topic;
};

AbstractAscoltatore.prototype._pubTopic = function(topic) {
if (this._reInSeparator) {
topic = topic.replace(this._reInSeparator,
this._nativeSettings.separator);
}
return topic;
};

/**
* Exports the AbstractAscoltatore;
*
Expand Down
22 changes: 14 additions & 8 deletions lib/amqp_ascoltatore.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@ var debug = require("debug")("ascoltatori:amqp");
*/

function AMQPAscoltatore(opts) {
AbstractAscoltatore.call(this);
AbstractAscoltatore.call(this, opts, {
separator: '.',
wildcardOne: '*',
wildcardSome: '#'
});

this._opts = opts || {};
this._opts.amqp = this._opts.amqp || require("amqp");
this._ascoltatore = new TrieAscoltatore();
this._ascoltatore = new TrieAscoltatore(opts);

this._subs_counter = new SubsCounter();
this._startConn();
Expand Down Expand Up @@ -95,7 +99,8 @@ AMQPAscoltatore.prototype._startConn = function() {
}, function(message, headers, deliveryInfo) {
that._queue.shift();

var topic = deliveryInfo.routingKey.replace(/\./g, "/");
var topic = that._recvTopic(deliveryInfo.routingKey);

debug("new message received from queue on topic " + topic);

that._ascoltatore.publish(topic, message.data.toString());
Expand Down Expand Up @@ -133,7 +138,7 @@ AMQPAscoltatore.prototype.subscribe = function subscribe(topic, callback, done)
}, 5);
});

this._queue.bind(this._exchange, topic.replace(/\//g, ".").replace(/\*/g, "#").replace(/\+/g, '*'));
this._queue.bind(this._exchange, this._subTopic(topic));
} else {
util.defer(done);
}
Expand All @@ -148,7 +153,7 @@ AMQPAscoltatore.prototype.publish = function publish(topic, message, done) {

debug("new message published to " + topic);

this._exchange.publish(topic.replace(/\//g, "."), String(message));
this._exchange.publish(this._pubTopic(topic), String(message));
util.defer(done);
};

Expand All @@ -158,18 +163,19 @@ AMQPAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, do

debug("deregistered subscriber for topic " + topic);

this._ascoltatore.unsubscribe(topic, callback);

if (!this._subs_counter.include(topic)) {
this._queue.once("queueUnbindOk", function() {
debug("queue unbound to topic " + topic);
util.defer(done);
});
this._queue.unbind(this._exchange, topic.replace(/\//g, ".").replace(/\*/g, "#").replace(/\+/g, '*'));

this._queue.unbind(this._exchange, this._subTopic(topic));
} else {
util.defer(done);
}

this._ascoltatore.unsubscribe(topic, callback);

return this;
};

Expand Down
8 changes: 4 additions & 4 deletions lib/behave_like_an_ascoltatore.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ module.exports = function() {
callback = null;

callback = function(topic) {
expect(topic).to.equal("hello/42/there");
expect(topic).to.equal(["hello", "42", "there"].join(that.separator));
done();
};

Expand All @@ -141,7 +141,7 @@ module.exports = function() {
callback2 = null;

callback1 = function(topic) {
expect(topic).to.equal("hello/42/there");
expect(topic).to.equal(["hello", "42", "there"].join(that.separator));
done();
};

Expand All @@ -162,7 +162,7 @@ module.exports = function() {
count = 0;

callback = function(topic) {
expect(topic).to.equal("hello/42");
expect(topic).to.equal(["hello", "42"].join(that.separator));
count += 1;
if (count === 2) {
done();
Expand All @@ -179,7 +179,7 @@ module.exports = function() {
it("should publish the topic name", function(done) {
var that = this;
that.instance.sub("hello/*", function(topic) {
expect(topic).to.equal("hello/42");
expect(topic).to.equal(["hello", "42"].join(that.separator));
done();
}, function() {
that.instance.pub("hello/42");
Expand Down
13 changes: 8 additions & 5 deletions lib/event_emitter2_ascoltatore.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ var ascoltatori = require('./ascoltatori');
* @api public
*/
function EventEmitter2Ascoltatore(settings) {
AbstractAscoltatore.call(this);
AbstractAscoltatore.call(this, settings, {
wildcardOne: '*',
wildcardSome: '**'
});

this._event = new EventEmitter2({
delimiter: '/',
delimiter: this._separator,
wildcard: true
});

Expand All @@ -39,15 +42,15 @@ EventEmitter2Ascoltatore.prototype.subscribe = function subscribe(topic, callbac
this._raiseIfClosed();
debug("registered new subscriber for topic " + topic);

this._event.on(topic.replace(/\*/g, '**').replace(/\+/g, '*').replace(/^\//g, ''), callback);
this._event.on(this._subTopic(topic).replace(/^\//g, ''), callback);
defer(done);
};

EventEmitter2Ascoltatore.prototype.publish = function (topic, message, options, done) {
this._raiseIfClosed();
debug("new message published to " + topic);

this._event.emit(topic.replace(/^\//g, ''), topic, message, options);
this._event.emit(this._pubTopic(topic).replace(/^\//g, ''), topic, message, options);

defer(done);
};
Expand All @@ -57,7 +60,7 @@ EventEmitter2Ascoltatore.prototype.unsubscribe = function unsubscribe(topic, cal

debug("deregistered subscriber for topic " + topic);

this._event.off(topic, callback);
this._event.off(this._subTopic(topic).replace(/^\//g, ''), callback);

defer(done);
};
Expand Down
24 changes: 16 additions & 8 deletions lib/filesystem_ascoltatore.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@ var debug = require('debug')('ascoltatori:filesystem');
*/
function FileSystemAscoltatore(opts)
{
AbstractAscoltatore.call(this);
AbstractAscoltatore.call(this, opts, {
separator: '.'
});

opts = opts || {};

var QlobberFSQ = (opts.qlobber_fsq || require('qlobber-fsq')).QlobberFSQ;

opts.separator = '.';
opts.wildcard_one = '+';
opts.wildcard_some = '*';
opts.separator = this._nativeSettings.separator;
opts.wildcard_one = this._wildcardOne;
opts.wildcard_some = this._wildcardSome;
opts.dedup = false;

this._dehnd = '__filesystem_ascoltatore' + crypto.randomBytes(16).toString('base64');
Expand All @@ -43,10 +45,12 @@ FileSystemAscoltatore.prototype = Object.create(AbstractAscoltatore.prototype);

FileSystemAscoltatore.prototype.subscribe = function (topic, callback, done)
{
var ths = this;

function cb(data, info, cb2)
{
data = JSON.parse(data);
callback(info.topic.replace(/\./g, '\/'), data.message, data.options);
callback(ths._recvTopic(info.topic), data.message, data.options);
cb2();
}

Expand All @@ -58,17 +62,21 @@ FileSystemAscoltatore.prototype.subscribe = function (topic, callback, done)

callback[this._dehnd] = callback[this._dehnd] || f;

this._fsq.subscribe(topic.replace(/\//g, '.'), callback[this._dehnd], done);
this._fsq.subscribe(this._subTopic(topic),
callback[this._dehnd],
done);
};

FileSystemAscoltatore.prototype.unsubscribe = function (topic, callback, done)
{
this._fsq.unsubscribe(topic.replace(/\//g, '.'), callback[this._dehnd] || callback, done);
this._fsq.unsubscribe(this._subTopic(topic),
callback[this._dehnd] || callback,
done);
};

FileSystemAscoltatore.prototype.publish = function (topic, message, options, done)
{
this._fsq.publish(topic.replace(/\//g, '.'), JSON.stringify({
this._fsq.publish(this._pubTopic(topic), JSON.stringify({
message: message,
options: options
}), done);
Expand Down
2 changes: 1 addition & 1 deletion lib/mongo_ascoltatore.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var MongoAscoltatore = function(opts) {
this._pubsubCollection = opts.pubsubCollection || 'pubsub';
this.mongoOpts = this._opts.mongo || {};

this._ascoltatore = new TrieAscoltatore();
this._ascoltatore = new TrieAscoltatore(opts);

this.channels = {};
this._closed = false;
Expand Down
16 changes: 10 additions & 6 deletions lib/mqtt_ascoltatore.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,19 @@ var SubsCounter = require("./subs_counter");
* @param {Object} opts The options object
*/
function MQTTAscoltatore(opts) {
AbstractAscoltatore.call(this);
AbstractAscoltatore.call(this, opts, {
separator: '/',
wildcardOne: '+',
wildcardSome: '#'
});

this._opts = opts || {};
this._opts.keepalive = this._opts.keepalive || 3000;
this._opts.mqtt = this._opts.mqtt || require("mqtt");

this._subs_counter = new SubsCounter();

this._ascoltatore = new TrieAscoltatore();
this._ascoltatore = new TrieAscoltatore(opts);
this._startConn();
}

Expand Down Expand Up @@ -71,7 +75,7 @@ MQTTAscoltatore.prototype._startConn = function() {
// we need to skip out this callback, so we do not
// break the client when an exception occurs
util.defer(function() {
that._ascoltatore.publish(topic, payload, packet);
that._ascoltatore.publish(that._recvTopic(topic), payload, packet);
});
});
this._client.on('error', function(e) {
Expand All @@ -93,7 +97,7 @@ MQTTAscoltatore.prototype.subscribe = function subscribe(topic, callback, done)
var opts = {
qos: 1
};
this._client.subscribe(topic.replace(/\*/g, "#"), opts, function() {
this._client.subscribe(this._subTopic(topic), opts, function() {
debug("registered new subscriber for topic " + topic);
util.defer(done);
});
Expand All @@ -108,7 +112,7 @@ MQTTAscoltatore.prototype.subscribe = function subscribe(topic, callback, done)
MQTTAscoltatore.prototype.publish = function publish(topic, message, options, done) {
this._raiseIfClosed();

this._client.publish(topic, String(message), {
this._client.publish(this._pubTopic(topic), String(message), {
qos: (options && (options.qos !== undefined)) ? options.qos : 1
}, function() {
debug("new message published to " + topic);
Expand All @@ -135,7 +139,7 @@ MQTTAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, do
}

debug("deregistering subscriber for topic " + topic);
this._client.unsubscribe(topic.replace(/\*/g, "#"), newDone);
this._client.unsubscribe(this._subTopic(topic), newDone);
};

MQTTAscoltatore.prototype.close = function close(done) {
Expand Down
10 changes: 8 additions & 2 deletions lib/prefix_acoltatore.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ function PrefixAscoltatore(prefix, ascoltatore) {
DecoratorAscoltatore.call(this, ascoltatore || new TrieAscoltatore());

this._prefix = prefix;
this._separator = this._ascoltatore._separator;
}

/**
Expand All @@ -38,7 +39,7 @@ PrefixAscoltatore.prototype.wrapCallback = function(callback, next) {

PrefixAscoltatore.prototype._localToParent = function(topic) {
var newTopic = this._prefix;
newTopic += (topic.indexOf('/') !== 0) ? '/' : '';
newTopic += (topic.indexOf(this._separator) !== 0) ? this._separator : '';
newTopic += topic;
debug("rewriting local topic " + topic + " into " + newTopic);
return newTopic;
Expand All @@ -49,7 +50,12 @@ PrefixAscoltatore.prototype.wrapTopic = function(topic, next) {
};

PrefixAscoltatore.prototype._parentToLocal = function(topic) {
var newTopic = topic.replace(new RegExp("^" + this._prefix + "/"), "");
var newTopic;
if (topic.lastIndexOf(this._prefix + this._separator, 0) === 0) {
newTopic = topic.substr(this._prefix.length + 1);
} else {
newTopic = topic;
}
debug("rewriting remote topic " + topic + " into " + newTopic);
return newTopic;
};
Expand Down
Loading

0 comments on commit ac26d4c

Please sign in to comment.