-
Notifications
You must be signed in to change notification settings - Fork 0
/
trigger_worker.js
107 lines (93 loc) · 3.45 KB
/
trigger_worker.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
'use strict';
require(!process.env.TEST ? './bootstrap' : './bootstrap.test');
var logger = require('./helpers/logger');
var config = require('./config')(logger);
/**
* @callback startCallback
* @param {Object} config Application configuration
* @param {Object} logger Logger
* @param {Object} rabbit Wascally RabbitMQ broker
*/
/**
* @param {startCallback} f Callback called when starting the worker
* @param {Object} options Options
* @param {Boolean} options.autoSubscribeQueue
* If false, the AMQP queue described by config.amqp.alerting.trigger.queue will have to be subscribed manually.
*/
function getConfiguredApp(f, options) {
var defaultOptions = {
autoSubscribeQueue: true
};
options = _.defaults(options || {}, defaultOptions);
var operators = require('./src/operators')();
_.extend(operators, options.operators || {}); // extend operators
var domain = require('./src/domain')(config, logger, operators);
var MessageHandler = require('./src/message_handler')(logger, config, domain.AlertRepository, domain.MeasurementEntity);
var messageHandler = new MessageHandler();
// Consts
assert(_.isString(config.statwarn.schema.monitoring.create));
var SCHEMA_MONITORING_CREATE = config.statwarn.schema.monitoring.create;
var SCHEMA_ALERTS_TRIGGERED = config.statwarn.schema.alerts.triggered;
// connect on amqp
var rabbit = require('wascally');
// after this call, any new callbacks attached via handle will be wrapped in a try/catch
// that nacks the message on an error
rabbit.nackOnError();
/**
* Event handler called when an action is triggered (event 'action:new' on message_handler)
* @param {Object} data
* @param {Object} data.action Action triggered
* @param {Object} data.alert Alert related to the triggered action
* @param {Object} data.measurement Measurement which caused the trigger
*/
function handleActionTriggered(data) {
rabbit.publish(config.amqp.alerting.trigger.exchange.name, {
routingKey: config.amqp.alerting.trigger.routing_key.prefix + data.alert.alert_id,
body: {
measurement: data.measurement,
action: data.action,
alert: data.alert
},
type: SCHEMA_ALERTS_TRIGGERED
}).catch(function (err) {
logger.err("TriggerWorker: RabbitMQ publish error:", err);
});
}
// Publish triggered actions on RabbitMQ
messageHandler.on('action:new', handleActionTriggered);
rabbit
.handle(SCHEMA_MONITORING_CREATE, messageHandler.handle.bind(messageHandler))['catch'](function (err, msg) {
logger.error(err);
// do something with the error & message
msg.nack();
});
rabbit.configure({
connection: config.amqp,
queues: [{
name: config.amqp.alerting.trigger.queue,
subscribe: options.autoSubscribeQueue
}],
exchanges: [{
name: config.amqp.alerting.trigger.exchange.name,
type: config.amqp.alerting.trigger.exchange.type
}]
}).done(function () {
f(config, logger, rabbit);
}, function (err) {
logger.error("TriggerWorker: RabbitMQ configure error:", err);
// Propagate the error if we can't connect to RabbitMQ
throw err;
});
}
function defaultAppHandler(config, logger, rabbit) {
logger.info('TriggerWorker: Ready to trigger');
}
if (module.parent === null) {
// The worker was directly launched
getConfiguredApp(defaultAppHandler, {
autoSubscribeQueue: true
});
} else {
// The worker was required by another module
module.exports = getConfiguredApp;
}