This repository has been archived by the owner on Aug 10, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
index.js
113 lines (84 loc) · 2.3 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
'use strict';
const faker = require('faker');
const nats = require('nats');
const {
Readable,
Writable
} = require('stream');
const {
Provider
} = require('../../');
const {
User
} = require('../schema/user');
// All output in this file goes through this alias of the global console.
const logger = console;
// Number of users the generator yields; the output sink stops the timer once
// it has counted this many.
const MAX_USERS = 100000;
// Label passed to both logger.time() and logger.timeEnd() for the benchmark.
const TIMER_LABEL = `Pushed and received ${MAX_USERS} users in`;
/**
 * Runs the streaming benchmark on a connected transport.
 *
 * Builds two providers on the same transport, pipes MAX_USERS generated users
 * into provider A, counts the entities provider B pipes back out, and closes
 * the transport shortly after the last one has been counted. SIGINT/SIGTERM
 * also close the transport.
 *
 * @param {object} transport - Connected transport handed to both providers;
 *   must expose a `close()` method.
 * @returns {void}
 */
function onTransportConnected(transport) {
  logger.log('Connected to broker');

  // Single shutdown routine shared by the signal hooks and normal completion.
  const closeTransport = () => transport.close();

  process.on('SIGINT', closeTransport);
  process.on('SIGTERM', closeTransport);

  const userProviderA = new Provider({
    schema: User,
    transport,
    options: {
      // Set higher highWaterMark to speed up streaming
      highWaterMark: 1000,
      // Do not acknowledge create messages to speed up streaming
      noAckStream: true,
      timeout: 5000,
    },
  });

  const userProviderB = new Provider({
    schema: User,
    transport,
    options: {
      timeout: 5000,
    },
  });

  // Generates MAX_USERS users
  const userGenerator = (function* () {
    for (let x = 0; x < MAX_USERS; ++x) {
      yield { name: faker.name.findName() };
    }
  })();

  let receivedUsers = 0;

  logger.time(TIMER_LABEL);

  const inputStream = new Readable({
    objectMode: true,
    highWaterMark: 1000,
    read() {
      const result = userGenerator.next();

      // Once the generator is exhausted nothing more is pushed. The stream is
      // deliberately not ended here, so the pipe into provider A stays open.
      // NOTE(review): presumably provider A must stay writable until every
      // message has been received - confirm before adding push(null).
      if (result.done) {
        return;
      }

      this.push(result.value);
    },
  });

  const outputStream = new Writable({
    objectMode: true,
    highWaterMark: 1000,
    write(_0, _1, callback) {
      receivedUsers++;

      // Every generated user has come back: stop the timer and shut down.
      // The shutdown is scheduled directly instead of emitting a synthetic
      // 'close' event on the (still open) input stream.
      if (receivedUsers === MAX_USERS) {
        logger.timeEnd(TIMER_LABEL);
        // NOTE(review): the 10 ms delay is kept from the original; presumably
        // it lets in-flight work settle before the connection closes.
        setTimeout(closeTransport, 10);
      }

      callback();
    },
  });

  // Provider B pipes created entities from the bus into output stream
  userProviderB.pipe(outputStream);

  // Entities are piped into provider A, that generate create events and
  // publishes them on to the bus
  inputStream.pipe(userProviderA);
}
// Bootstrap: open the broker connection and wire up its lifecycle listeners.
// The braces keep the connection handle out of the surrounding scope.
{
  const broker = nats.connect();

  // Listeners are registered in this order: error, reconnect, connect.
  const listeners = [
    // Transport errors go straight to the logger.
    ['error', logger.error],
    ['reconnect', () => logger.log('Transport reconnected')],
    // Start the benchmark once the connection is up.
    ['connect', () => onTransportConnected(broker)],
  ];

  for (const [eventName, listener] of listeners) {
    broker.on(eventName, listener);
  }
}