From 96a1a2f3909d89c84a0f281625086a4f346783cf Mon Sep 17 00:00:00 2001 From: Bernd Niehues Date: Wed, 21 Feb 2018 10:36:44 +0100 Subject: [PATCH 1/3] [BUGFIX] Forward amqp options when publish to exchange When publishing a message to rabbitmq via amqp the options object was dropped in amqp_ascolatore. The options object is now forwarded to the exchange. --- lib/abstract_ascoltatore.js | 2 +- lib/amqp_ascoltatore.js | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/abstract_ascoltatore.js b/lib/abstract_ascoltatore.js index 0b78491..74bc514 100644 --- a/lib/abstract_ascoltatore.js +++ b/lib/abstract_ascoltatore.js @@ -128,7 +128,7 @@ AbstractAscoltatore.prototype._setPublish = function() { publish = this.publish; } else { publish = function (topic, payload, options, done) { - return f.call(this, topic, payload, done); + return f.call(this, topic, payload, options, done); }; } diff --git a/lib/amqp_ascoltatore.js b/lib/amqp_ascoltatore.js index b278c62..97ceb69 100644 --- a/lib/amqp_ascoltatore.js +++ b/lib/amqp_ascoltatore.js @@ -149,12 +149,12 @@ AMQPAscoltatore.prototype.subscribe = function subscribe(topic, callback, done) debug("registered new subscriber for topic " + topic); }; -AMQPAscoltatore.prototype.publish = function publish(topic, message, done) { +AMQPAscoltatore.prototype.publish = function publish(topic, message, options, done) { this._raiseIfClosed(); debug("new message published to " + topic); - this._exchange.publish(this._pubTopic(topic), String(message)); + this._exchange.publish(this._pubTopic(topic), String(message), options); defer(done); }; From 7d635b53fffabe46a2f7e66c32892bc1d2273d28 Mon Sep 17 00:00:00 2001 From: Bernd Niehues Date: Wed, 21 Feb 2018 12:03:11 +0100 Subject: [PATCH 2/3] [FEATURE] Add exchangeOptions Adding exchangeOptions object (next to exchange in configuration) that enables the user to declare the exchange as durable. --- lib/amqp_ascoltatore.js | 12 ++++++++---- package.json | 1 + 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/lib/amqp_ascoltatore.js b/lib/amqp_ascoltatore.js index 97ceb69..5b5a0e8 100644 --- a/lib/amqp_ascoltatore.js +++ b/lib/amqp_ascoltatore.js @@ -8,6 +8,12 @@ var AbstractAscoltatore = require('./abstract_ascoltatore'); var steed = require("steed")(); var SubsCounter = require("./subs_counter"); var debug = require("debug")("ascoltatori:amqp"); +var _ = require('lodash'); + +var defaultExchangeOptions = { + type: "topic", + confirm: true +}; /** * The AMQPAscoltatore is a class that inherits from AbstractAscoltatore. @@ -80,10 +86,8 @@ AMQPAscoltatore.prototype._startConn = function() { function(callback) { debug("connected"); - that._exchange = conn.exchange(that._opts.exchange, { - type: "topic", - confirm: true - }); + var exchangeOptions = _.assignIn({}, defaultExchangeOptions, that._opts.exchangeOptions || {}); + that._exchange = conn.exchange(that._opts.exchange, exchangeOptions); that._exchange.once("open", wrap(callback)); }, diff --git a/package.json b/package.json index 4ad5b12..5b6c73d 100644 --- a/package.json +++ b/package.json @@ -60,6 +60,7 @@ }, "dependencies": { "debug": "^2.2.0", + "lodash": "^4.0.0", "uuid": "^3.0.0", "qlobber": "~0.7.0", "steed": "^1.1.3" From 4830f2c3d1d65ab892fd1c9ebaa96da408280ea0 Mon Sep 17 00:00:00 2001 From: Bernd Niehues Date: Thu, 22 Feb 2018 12:15:39 +0100 Subject: [PATCH 3/3] [BUGFIX] Forward callback function when publish to exchange When publishing a message to rabbitmq via amqp the callback function was defered independent of the result of exchange publish process. Now the callback is called by this, enhancing reliability of "done" state. --- lib/amqp_ascoltatore.js | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/amqp_ascoltatore.js b/lib/amqp_ascoltatore.js index 5b5a0e8..d8bae7a 100644 --- a/lib/amqp_ascoltatore.js +++ b/lib/amqp_ascoltatore.js @@ -158,8 +158,7 @@ AMQPAscoltatore.prototype.publish = function publish(topic, message, options, do debug("new message published to " + topic); - this._exchange.publish(this._pubTopic(topic), String(message), options); - defer(done); + this._exchange.publish(this._pubTopic(topic), String(message), options, done); }; AMQPAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, done) {