-
Notifications
You must be signed in to change notification settings - Fork 0
/
transport.js
119 lines (96 loc) · 2.58 KB
/
transport.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
'use strict';
const hooks = require('async-hooks');
/**
 * Registry and facade for named transports.
 *
 * Keeps a name -> transport map, forwards transport events and errors to the
 * application event bus, and routes outgoing messages to the transport named
 * in `message.transport`.
 */
class Transport {
  constructor() {
    // NOTE(review): not read anywhere in this class; presumably consumed by
    // the host application's loader to decide whether to call init() — confirm.
    this.initRequired = true;
    this.transports = {};
    // Wrap these methods with async-hooks.
    // NOTE(review): the catchEmit/catchSend methods below look like the
    // library's per-method error handlers — confirm against async-hooks docs.
    hooks(this, 'emit', 'start', 'stop', 'send');
  }

  /**
   * Wires this instance to the application.
   *
   * @param {object} options
   * @param {object} options.app - Application object; must expose `did` and `emit`.
   * @param {object} options.logger - Logger factory; `logger.get('transport')` is used.
   * @param {object} options.transports - Collection iterated with
   *   `forEach((transport, name) => ...)`; presumably a Map of name -> transport
   *   (an array would register transports under their numeric index).
   */
  init({ app, logger, transports }) {
    this.app = app;
    this.logger = logger.get('transport');
    transports.forEach((transport, name) => this.addTransport(name, transport));
    app.did('start', () => this.start());
    app.did('stop', () => this.stop());
  }

  /**
   * Registers a transport under `name` and installs `emit` / `error`
   * callbacks on it so that its events and failures are routed through
   * this instance.
   *
   * @param {string} name - Key used for lookup and as the event path prefix.
   * @param {object} transport - Transport instance; it is mutated in place.
   */
  addTransport(name, transport) {
    this.transports[name] = transport;
    transport.emit = (path, ...args) =>
      this.handleEvent(`${name}/${path}`, ...args);
    transport.error = error => this.handleError(error, name);
  }

  /**
   * Looks up a registered transport.
   *
   * Only own properties of the registry are considered, so names such as
   * 'constructor' or 'toString' are not resolved to values inherited from
   * Object.prototype.
   *
   * @param {string} name
   * @returns {object|undefined} The transport, or undefined when not registered.
   */
  get(name) {
    return Object.prototype.hasOwnProperty.call(this.transports, name)
      ? this.transports[name]
      : undefined;
  }

  /**
   * Logs an incoming transport event and forwards it through `emit`.
   *
   * @param {string} path - Event path in the form `<transport name>/<event>`.
   * @param {...*} args - Event payload, passed through unchanged.
   * @returns {Promise<void>}
   */
  async handleEvent(path, ...args) {
    this.logger.info({ path }, 'Transport Event');
    await this.emit(path, ...args);
  }

  /**
   * Re-emits a transport event on the application as `transport/<path>`.
   * This method is wrapped by async-hooks in the constructor.
   *
   * @param {string} path
   * @param {...*} args
   */
  emit(path, ...args) {
    this.app.emit(`transport/${path}`, ...args);
  }

  /**
   * Logs a failure to emit an event.
   * NOTE(review): presumably called by async-hooks when `emit` fails — confirm.
   *
   * @param {Error} error
   * @param {string} path
   */
  catchEmit(error, path) {
    this.logError('Failed to emit event', error, { path });
  }

  /**
   * Logs an error reported by a transport and re-emits it on the application
   * as `error/transport/<transport>`.
   *
   * @param {Error} error
   * @param {string} transport - Name of the transport that failed.
   */
  handleError(error, transport) {
    this.logError('Transport failed', error, { transport });
    this.app.emit(`error/transport/${transport}`, error);
  }

  /**
   * Starts every registered transport concurrently and logs each address.
   *
   * `transport.start()` may return a promise or a plain value; a synchronous
   * throw is turned into a rejection of the returned promise.
   *
   * @returns {Promise<Array>} Values produced by each transport's `start()`,
   *   in registration order.
   */
  start() {
    const promises = Object.entries(this.transports).map(
      async ([name, transport]) => {
        const address = await transport.start();
        this.logger.info(
          {
            transport: name,
            status: 'listening',
            address
          },
          `Transport ${name} is listening at: ${address}`
        );
        return address;
      }
    );
    return Promise.all(promises);
  }

  /**
   * Stops every registered transport concurrently and logs each stop.
   *
   * `transport.stop()` may return a promise or a plain value.
   *
   * @returns {Promise<Array<undefined>>}
   */
  stop() {
    const promises = Object.entries(this.transports).map(
      async ([name, transport]) => {
        await transport.stop();
        this.logger.info(
          {
            transport: name,
            status: 'stopped'
          },
          `Transport ${name} has been stopped`
        );
      }
    );
    return Promise.all(promises);
  }

  /**
   * Sends a message through the transport named in `message.transport`.
   *
   * @param {object} message - Must be truthy and carry a `transport` name.
   * @returns {*} Whatever the selected transport's `send` returns.
   * @throws {Error} When the message is empty or the transport is not registered.
   */
  send(message) {
    if (!message) {
      throw new Error('Can not send empty message');
    }
    const transport = this.get(message.transport);
    if (!transport) {
      throw new Error(`Transport '${message.transport}' not found`);
    }
    return transport.send(message);
  }

  /**
   * Logs a failed send and rethrows the original error.
   * NOTE(review): presumably called by async-hooks when `send` fails — confirm.
   *
   * @param {Error} error
   * @param {object} [message] - May be empty: `send` rejects falsy messages,
   *   and that failure must not be masked by a TypeError raised here.
   * @throws {Error} Always rethrows `error`.
   */
  catchSend(error, message) {
    this.logError('Failed to send message', error, {
      transport: message ? message.transport : undefined
    });
    throw error;
  }

  /**
   * Writes an error entry to the logger with the error's stack attached.
   *
   * @param {string} msg - Human-readable prefix for the log line.
   * @param {*} error - Usually an Error; null/undefined is tolerated.
   * @param {object} [data] - Extra structured fields for the log entry.
   */
  logError(msg, error, data) {
    this.logger.error(
      {
        ...data,
        // Reading `.stack` off null/undefined would make the logger itself throw.
        stack: error == null ? undefined : error.stack
      },
      `${msg}: ${error}`
    );
  }
}
// CommonJS export: the Transport class is this module's only export.
module.exports = Transport;