-
Notifications
You must be signed in to change notification settings - Fork 4
/
run.js
139 lines (119 loc) · 3.73 KB
/
run.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
/**
* Entrypoint for starting task step.
*/
const logger = require('./lib/logging.js');
const Sailor = require('./lib/sailor.js').Sailor;
const settings = require('./lib/settings.js');
const { IPC } = require('./lib/ipc.js');
const Q = require('q');
const http = require('http');
const https = require('https');
// Sailor instance created by putOutToSea(); the shutdown helpers below operate on it.
let sailor;
// Promise of the deferred created in putOutToSea(). It is resolved after sailor.run() has completed and is
// awaited by gracefulShutdown() so that shutdown waits for initialization to finish.
let sailorInit;
// Tells the shutdown paths whether there is anything to disconnect: set to true right before sailor.connect(),
// set to false in HOOK_SHUTDOWN mode and as soon as disconnectAndExit() starts its work.
let disconnectRequired;
/**
 * Replaces the global HTTP and HTTPS agents with fresh agents that have keep-alive switched on.
 */
function prepareSandbox() {
    // enable keep alive by default to handle issues like https://github.com/elasticio/elasticio/issues/4874
    for (const transport of [http, https]) {
        transport.globalAgent = new transport.Agent({ keepAlive: true });
    }
}
/**
 * Creates the module-level Sailor and drives it through its start sequence.
 *
 * Sends 'init:started' over ipc first. When settings.HOOK_SHUTDOWN is truthy only the shutdown hook is
 * executed (prepare + runHookShutdown) and the function returns without connecting. Otherwise the sailor
 * is connected, prepared, optionally started up (settings.STARTUP_REQUIRED), its init hook is run and
 * message processing is started; afterwards the init deferred is resolved and 'init:ended' is sent.
 *
 * @param {Object} settings - step settings; handed to the Sailor constructor
 * @param {Object} ipc - object exposing send(message)
 * @returns {Promise<void>}
 */
async function putOutToSea(settings, ipc) {
    ipc.send('init:started');

    const initDeferred = Q.defer();
    sailorInit = initDeferred.promise;
    sailor = new Sailor(settings);

    const shutdownHookOnly = Boolean(settings.HOOK_SHUTDOWN);
    // nothing gets connected in shutdown-hook mode, so there is nothing to disconnect later
    disconnectRequired = !shutdownHookOnly;

    if (shutdownHookOnly) {
        // error reporting is turned into a no-op for this mode
        //eslint-disable-next-line no-empty-function
        sailor.reportError = () => {};
        await sailor.prepare();
        await sailor.runHookShutdown();
        return;
    }

    await sailor.connect();
    await sailor.prepare();
    if (settings.STARTUP_REQUIRED) {
        await sailor.startup();
    }
    await sailor.runHookInit();
    await sailor.run();

    initDeferred.resolve();
    ipc.send('init:ended');
}
/**
 * Disconnects the sailor and terminates the process.
 *
 * Does nothing when no disconnect is required. The flag is cleared before disconnecting, so a second
 * call made while the first one is still in progress returns immediately. Exits with the default code
 * on success and with -1 when disconnecting fails.
 *
 * @returns {Promise<void>}
 */
async function disconnectAndExit() {
    if (disconnectRequired) {
        disconnectRequired = false;
        try {
            logger.info('Disconnecting...');
            await sailor.disconnect();
            logger.info('Successfully disconnected');
            process.exit();
        } catch (disconnectError) {
            logger.error(disconnectError, 'Unable to disconnect');
            process.exit(-1);
        }
    }
}
/**
 * Signal-driven shutdown: waits for initialization, asks the sailor to schedule its shutdown and then
 * disconnects and exits.
 *
 * Returns right away when no disconnect is required; only logs a warning when there is no sailor.
 *
 * @returns {Promise<void>}
 */
async function gracefulShutdown() {
    if (!disconnectRequired) {
        return;
    }

    if (sailor) {
        // we connect to amqp, create channels, start listen a queue on init and interrupting this process
        // with 'disconnect' will lead to undefined behaviour
        logger.trace('Checking/waiting for init before graceful shutdown');
        await sailorInit;
        logger.trace('Waited an init before graceful shutdown');

        await sailor.scheduleShutdown();
        await disconnectAndExit();
        return;
    }

    logger.warn('Something went wrong – sailor is falsy');
}
/**
 * Prepares the sandbox and starts the step.
 *
 * On a startup failure the error is reported through the sailor when a sailor exists and its AMQP
 * connection is not closed, and then handed to logger.criticalErrorAndExit. Reporting is best-effort:
 * if it fails, that failure is logged and the original startup error still reaches
 * logger.criticalErrorAndExit.
 *
 * @param {Object} settings - step settings, passed through to putOutToSea
 * @param {Object} ipc - ipc object, passed through to putOutToSea
 * @returns {Promise<void>}
 */
async function run(settings, ipc) {
    prepareSandbox();
    try {
        await putOutToSea(settings, ipc);
        logger.info('Fully initialized and waiting for messages');
    } catch (e) {
        try {
            if (sailor && !sailor.amqpConnection.closed) {
                await sailor.reportError(e);
            }
        } catch (reportErr) {
            // a failure while reporting must not mask the startup error or prevent the critical exit below
            logger.error(reportErr, 'Unable to report error');
        }
        logger.criticalErrorAndExit('putOutToSea.catch', e);
    }
}
exports.__test__ = {
disconnectOnly: function disconnectOnly() {
if (!disconnectRequired) {
return Promise.resolve();
}
return sailor.disconnect();
},
closeConsumerChannel: function closeConsumerChannel() {
return sailor.amqpConnection.consumerChannel.close();
}
};
// Public API of this module.
Object.assign(exports, { run, putOutToSea });
// Bootstrap the step only when this file is the process entry point.
// `process.mainModule` is deprecated and is undefined when there is no entry script, so it is checked
// before its `filename` is read; otherwise merely requiring this module could throw a TypeError.
if (require.main === module || (process.mainModule && process.mainModule.filename === __filename)) {
    // Both termination signals trigger the same graceful shutdown. The returned promise is not awaited
    // here; a rejection ends up in the 'unhandledRejection' handler registered below.
    process.on('SIGTERM', function onSigterm() {
        logger.info('Received SIGTERM');
        gracefulShutdown();
    });
    process.on('SIGINT', function onSigint() {
        logger.info('Received SIGINT');
        gracefulShutdown();
    });

    // Uncaught exceptions go to the critical-error handler; unhandled rejections are only logged.
    process.on('uncaughtException', logger.criticalErrorAndExit.bind(logger, 'process.uncaughtException'));
    process.on('unhandledRejection', (err) => logger.error(err, 'process.unhandledRejection'));

    // Settings are read from the environment; the IPC object is handed to run() together with them.
    const ipc = new IPC();
    run(settings.readFrom(process.env), ipc);
}