-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
97 lines (79 loc) · 2.82 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
const EventSource = require('eventsource');
const A = require('async');
/**
 * Scenario engine that opens Server-Sent Events (SSE) connections and
 * reports what happens on them as counters through an event emitter.
 *
 * Supported flow steps (each entry of `spec.flow`):
 *   - `'open'`          open an EventSource against `script.config.target`
 *   - `{ log: '...' }`  print a templated message with `console.log`
 *   - `{ think: ... }`  pause, delegated to `helpers.createThink`
 *   - `'close'`         close the EventSource stored on the context
 * Any other step is ignored.
 */
class SSEEngine {
  /**
   * @param {object} script - Test script. `config.target` is the SSE URL and
   *   `config.engines.sse` (optional) is passed to EventSource as its options.
   * @param {object} events - Event emitter, stored on the instance.
   * @param {object} helpers - Must provide `template(str, context)` and
   *   `createThink(step, defaults)`.
   */
  constructor(script, events, helpers) {
    this.script = script;
    this.events = events;
    this.helpers = helpers;
    this.target = script.config?.target;
    this.eventSourceConfig = script.config?.engines?.['sse'] || {};
  }

  /**
   * Builds the function that runs one virtual user through `spec.flow`.
   *
   * @param {object} spec - Scenario spec. `flow` is the list of steps;
   *   `onMessage` (optional) names a processor function called for every
   *   `message` event; `onEvent` (optional) is a list of
   *   `{ eventName, handler }` pairs binding named events to processor
   *   functions.
   * @param {object} events - Emitter receiving `'started'` and `'counter'`
   *   events.
   * @returns {function(object, function): void} `vu(initialContext, vuDone)`;
   *   `vuDone(err, context)` is called once the flow has finished.
   */
  createScenario(spec, events) {
    const self = this;

    // Resolve a processor function by name; undefined when it is missing.
    // TODO: Warn if no processor function
    const findProcessorFn = (name) => {
      const candidate = self.script.config?.processor?.[name];
      return typeof candidate === 'function' ? candidate : undefined;
    };

    return function vu(initialContext, vuDone) {
      // async.waterfall invokes the first task with only a callback. Seeding
      // the chain with the context gives every real step the same
      // (context, next) signature, so a flow may start with any step type and
      // 'open' may appear anywhere in the flow.
      const steps = [(next) => next(null, initialContext)];

      for (const step of spec.flow) {
        if (step === 'open') {
          steps.push(function open(context, next) {
            // TODO: need to wait for state here / handle connection error, e.g. with invalid URL
            const es = new EventSource(self.target, self.eventSourceConfig);

            es.on('error', (err) => {
              const metric = err?.status ? `sse.error.${err.status}` : 'sse.error';
              events.emit('counter', metric, 1);
            });

            es.on('message', (_msg) => {
              events.emit('counter', 'sse.message', 1);
            });

            if (spec.onMessage) {
              const onMessage = findProcessorFn(spec.onMessage);
              if (onMessage) {
                es.on('message', (msg) => {
                  onMessage.call(null, msg, context, events);
                });
              }
            }

            if (spec.onEvent) {
              for (const handlerSpec of spec.onEvent) {
                const onEvent = findProcessorFn(handlerSpec.handler);
                if (onEvent) {
                  es.on(handlerSpec.eventName, (e) => {
                    onEvent.call(null, e, context, events);
                  });
                }
              }
            }

            es.on('open', () => {
              events.emit('counter', 'sse.open', 1);
            });

            // Later steps (notably 'close') find the connection here.
            context.es = es;
            events.emit('started');
            return next(null, context);
          });
        }

        if (step.log) {
          steps.push(function log(context, callback) {
            console.log(self.helpers.template(step.log, context));
            return process.nextTick(function () { callback(null, context); });
          });
        }

        if (step.think) {
          steps.push(self.helpers.createThink(step, self.script.config?.defaults?.think || {}));
        }

        if (step === 'close') {
          steps.push(function close(context, next) {
            // Optional chaining: 'close' without a prior 'open' is a no-op.
            context.es?.close();
            return next(null, context);
          });
        }
      }

      A.waterfall(steps, (err, context) => {
        vuDone(err, context);
      });
    };
  }
}
// The engine class is this module's sole export.
module.exports = SSEEngine;