From 9cfbda3eb98fb2199f8c942cab78c5c09c41916c Mon Sep 17 00:00:00 2001 From: l4mby <38959260+l4mby@users.noreply.github.com> Date: Wed, 24 Jan 2024 14:24:34 +0100 Subject: [PATCH] 142-docs-example-for-cluster-and-load-balancer (#152) * Adds info to readme * Add example with load-balancer * Minor fixes * Adds more comments in readme section * Make create stream arguments optionals * Adds reconnect and superstream example * Update readme with super stream example --------- Co-authored-by: magne --- README.md | 169 +++++++++++++++++++- example/package-lock.json | 73 ++++++++- example/package.json | 5 +- example/src/autoreconnect_example.js | 49 ++++++ example/src/cluster_example.js | 46 ++++++ example/src/superstream_example.js | 48 ++++++ example/tsconfig.json | 2 +- src/client.ts | 4 +- src/requests/create_stream_request.ts | 18 ++- src/requests/create_super_stream_request.ts | 19 +-- 10 files changed, 401 insertions(+), 32 deletions(-) create mode 100644 example/src/autoreconnect_example.js create mode 100644 example/src/cluster_example.js create mode 100644 example/src/superstream_example.js diff --git a/README.md b/README.md index 4c2fa13a..fdf88967 100644 --- a/README.md +++ b/README.md @@ -10,11 +10,18 @@ - [Getting started](#getting-started) - [Usage](#usage) - [Connect](#connect) + - [Connect through TLS/SSL](#connect-through-tls-ssl) - [Basic Publish](#basic-publish) + - [Sub Batch Entry Publishing](#sub-batch-entry-publishing) - [Basic Consuming](#basic-consuming) + - [Single Active Consumer](#single-active-consumer) + - [Clustering](#clustering) + - [Load Balancer](#loadbalancer) - [Running Examples](#running-examples) - [Build from source](#build-from-source) - [MISC](#misc) + - [Super Stream](#super-stream) + - [Filtering](#filtering) ## Overview @@ -70,6 +77,27 @@ const client = await connect({ await client.close() ``` +### Connect through TLS/SSL + +```typescript +const client = await connect({ + hostname: "localhost", + port: 5552, + username: "rabbit", + password: "rabbit", + vhost: "/", + ssl: { + key: "", + cert: "", + ca: "", // Optional + }, +}) + +// ... + +await client.close() +``` + ### Basic Publish ```typescript @@ -93,6 +121,40 @@ await publisher.send(Buffer.from("my message content")) await client.close() ``` +### Sub Batch Entry Publishing + +```typescript +const client = await connect({ + hostname: "localhost", + port: 5552, + username: "rabbit", + password: "rabbit", + vhost: "/", +}) + +const publisher = await client.declarePublisher({ + stream: "stream-name", + publisherRef: "my-publisher", +}) + +const messages = [ + { content: Buffer.from("my message content 1") }, + { content: Buffer.from("my message content 2") }, + { content: Buffer.from("my message content 3") }, + { content: Buffer.from("my message content 4") }, +] + +await publisher.sendSubEntries(messages) +/* + It is also possible to specify a compression when sending sub entries of messages: + e.g: await publisher.sendSubEntries(messages, CompressionType.Gzip) + + The current values for the compression types are CompressionType.None or CompressionType.Gzip +*/ + +await client.close() +``` + ### Basic Consuming ```typescript @@ -104,17 +166,118 @@ const client = await connect({ vhost: "/", }) -const consumerOptions = { stream: "stream-name", offset: Offset.next() } // see docs for various offset types +const consumerOptions = { stream: "stream-name", offset: Offset.next() } +/* + When creating a consumer the offset and the stream name are mandatory parameters. 
+  The offset parameter can be created from one of the following functions:
+  - Offset.first() ---> Start reading from the first available offset.
+  - Offset.next() ---> Start reading from the next offset to be written.
+  - Offset.last() ---> Start reading from the last chunk of messages in the stream.
+  - Offset.offset(x) ---> Start reading from the specified offset. The parameter has to be a bigint.
+  - Offset.timestamp(t) ---> Start reading from the messages stored after the timestamp t.
+
+*/
 const consumer = await client.declareConsumer(consumerOptions, (message: Message) => {
-  console.log(message.content) // it's a Buffer
+  console.log(message.content) // it's a Buffer
 })
+// declareConsumer also works with messages published as sub-entry batches, with or without compression
+
 // ...
 
 await client.close()
 ```
 
+### Single Active Consumer
+
+It is possible to create a consumer as single active.
+For a given consumer reference, only one consumer at a time will be able to consume messages.
+
+```typescript
+const consumerOptions = {
+  stream: "stream-name",
+  offset: Offset.next(),
+  singleActive: true,
+  consumerRef: "my-consumer-ref",
+} // see docs for various offset types
+
+const consumer = await client.declareConsumer(consumerOptions, (message: Message) => {
+  console.log(message.content) // it's a Buffer
+})
+// ...
+```
+
+### Clustering
+
+Every time we create a new producer or a new consumer, a new connection is created.
+In particular, for a producer the connection is created on the leader node of the stream, while a consumer may be served by a replica.
+A minimal sketch is shown below; for more details on running the tests in a cluster, follow the readme under the /cluster folder.
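+
+The following sketch shows a producer and a consumer each getting their own connection against a cluster. It is loosely adapted from `example/src/cluster_example.js` in this repository (that example also configures the address resolver described in the next section); the hostname, port, credentials and stream name are assumptions and have to be adjusted to a node that is actually reachable in your cluster.
+
+```typescript
+// connect to any reachable node of the cluster
+const client = await connect({
+  hostname: "node0",
+  port: 5562,
+  username: "rabbit",
+  password: "rabbit",
+  vhost: "/",
+})
+
+await client.createStream({ stream: "my-cluster-stream" })
+
+// the publisher gets its own connection, opened towards the leader node of the stream
+const publisher = await client.declarePublisher({ stream: "my-cluster-stream" })
+await publisher.send(Buffer.from("hello from the cluster"))
+
+// the consumer gets another dedicated connection, which may land on a replica node
+await client.declareConsumer({ stream: "my-cluster-stream", offset: Offset.first() }, (message) => {
+  console.log(message.content.toString())
+})
+
+await client.close()
+```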
+
+### Load Balancer
+
+With a load balancer in front of the cluster, the client first connects to the address resolver
+and then reaches the target node through it.
+The address resolver returns the leader node for a producer and a replica node for a consumer;
+if the connection lands on the wrong node, the client closes it and retries.
+
+```typescript
+const client = await connect({
+  hostname: "node0",
+  port: 5562,
+  username: "rabbit",
+  password: "rabbit",
+  vhost: "/",
+  addressResolver: { enabled: true },
+})
+
+const streamName = "my-stream"
+await client.createStream({ stream: streamName })
+
+await wait(200) // wait for replicas to be created
+
+// ...
+
+await client.close()
+```
+
+### Super Stream
+
+It is possible to create a super stream directly through the client, but only if you are using the latest (3.13.0-rc) management version.
+Currently we do not support batch publishing and compression on super streams - these features are coming soon.
+
+```typescript
+const client = await connect({
+  hostname: "localhost",
+  port: 5552,
+  username: "rabbit",
+  password: "rabbit",
+  vhost: "/",
+  heartbeat: 0,
+})
+await client.createSuperStream({ streamName: "super-stream-example" })
+await sleep(200) // Waiting for partitions to be created
+
+const routingKeyExtractor = (content, msgOptions) => msgOptions.messageProperties.messageId
+const publisher = await client.declareSuperStreamPublisher({ superStream: "super-stream-example" }, routingKeyExtractor)
+
+await publisher.send(Buffer.from("Test message 1"), { messageProperties: { messageId: "1" } })
+await publisher.send(Buffer.from("Test message 2"), { messageProperties: { messageId: "2" } })
+await publisher.send(Buffer.from("Test message 3"), { messageProperties: { messageId: "3" } })
+
+await client.declareSuperStreamConsumer({ superStream: "super-stream-example" }, (message) => {
+  console.log(`Received message ${message.content.toString()}`)
+})
+
+await sleep(2000)
+
+await client.close()
+```
+
+### Filtering
+
+Work in progress ⚠️
+
 ## Running Examples
 
 the folder /example contains a project that shows some examples on how to use the lib, to run it follow this steps
@@ -129,7 +292,7 @@ npm i
 
 run the docker-compose to launch a rabbit instance already stream enabled
 
 ```shell
-docker-compose up -d 
+docker-compose up -d
 ```
 
 add this line to your host file (on linux `/etc/hosts`) to correctly resolve rabbitmq
diff --git a/example/package-lock.json b/example/package-lock.json
index 18a2ddc9..d6f4f133 100644
--- a/example/package-lock.json
+++ b/example/package-lock.json
@@ -10,7 +10,7 @@
     "license": "ISC",
     "dependencies": {
       "amqplib": "^0.10.3",
-      "rabbitmq-stream-js-client": "^0.1.1"
+      "rabbitmq-stream-js-client": "file:../."
}, "devDependencies": { "typescript": "^4.9.5" @@ -22,6 +22,40 @@ " ..": { "extraneous": true }, + "..": { + "version": "0.1.1", + "license": "ISC", + "dependencies": { + "semver": "^7.5.4" + }, + "devDependencies": { + "@tsconfig/node16": "^1.0.3", + "@types/amqplib": "^0.10.1", + "@types/chai": "^4.3.4", + "@types/chai-as-promised": "^7.1.8", + "@types/chai-spies": "^1.0.6", + "@types/mocha": "^10.0.1", + "@types/node": "^16.18.11", + "@typescript-eslint/eslint-plugin": "^5.50.0", + "@typescript-eslint/parser": "^5.50.0", + "amqplib": "^0.10.3", + "chai": "^4.3.7", + "chai-as-promised": "^7.1.1", + "chai-spies": "^1.1.0", + "cspell": "^6.21.0", + "eslint": "^8.33.0", + "eslint-config-prettier": "^8.6.0", + "eslint-plugin-deprecation": "^1.3.3", + "eslint-plugin-import": "^2.27.5", + "eslint-plugin-no-only-tests": "^3.1.0", + "eslint-plugin-prettier": "^4.2.1", + "got": "^11.8.5", + "mocha": "^10.2.0", + "ts-node": "^10.9.1", + "typescript": "^4.9.5", + "winston": "^3.8.2" + } + }, "node_modules/@acuminous/bitsyntax": { "version": "0.1.2", "resolved": "https://registry.npmjs.org/@acuminous/bitsyntax/-/bitsyntax-0.1.2.tgz", @@ -96,9 +130,8 @@ "integrity": "sha512-FIqgj2EUvTa7R50u0rGsyTftzjYmv/a3hO345bZNrqabNqjtgiDMgmo4mkUjd+nzU5oF3dClKqFIPUKybUyqoQ==" }, "node_modules/rabbitmq-stream-js-client": { - "version": "0.1.1", - "resolved": "https://registry.npmjs.org/rabbitmq-stream-js-client/-/rabbitmq-stream-js-client-0.1.1.tgz", - "integrity": "sha512-0Oq3uV0J8bMJemU7C4bMoidS0ynEEWWAENgzB8HaJp0oP5swByskaUQ3Wjme8WS8BUvl+z4CK6ETGF7/XDcbpw==" + "resolved": "..", + "link": true }, "node_modules/readable-stream": { "version": "1.1.14", @@ -210,9 +243,35 @@ "integrity": "sha512-FIqgj2EUvTa7R50u0rGsyTftzjYmv/a3hO345bZNrqabNqjtgiDMgmo4mkUjd+nzU5oF3dClKqFIPUKybUyqoQ==" }, "rabbitmq-stream-js-client": { - "version": "0.1.1", - "resolved": "https://registry.npmjs.org/rabbitmq-stream-js-client/-/rabbitmq-stream-js-client-0.1.1.tgz", - "integrity": "sha512-0Oq3uV0J8bMJemU7C4bMoidS0ynEEWWAENgzB8HaJp0oP5swByskaUQ3Wjme8WS8BUvl+z4CK6ETGF7/XDcbpw==" + "version": "file:..", + "requires": { + "@tsconfig/node16": "^1.0.3", + "@types/amqplib": "^0.10.1", + "@types/chai": "^4.3.4", + "@types/chai-as-promised": "^7.1.8", + "@types/chai-spies": "^1.0.6", + "@types/mocha": "^10.0.1", + "@types/node": "^16.18.11", + "@typescript-eslint/eslint-plugin": "^5.50.0", + "@typescript-eslint/parser": "^5.50.0", + "amqplib": "^0.10.3", + "chai": "^4.3.7", + "chai-as-promised": "^7.1.1", + "chai-spies": "^1.1.0", + "cspell": "^6.21.0", + "eslint": "^8.33.0", + "eslint-config-prettier": "^8.6.0", + "eslint-plugin-deprecation": "^1.3.3", + "eslint-plugin-import": "^2.27.5", + "eslint-plugin-no-only-tests": "^3.1.0", + "eslint-plugin-prettier": "^4.2.1", + "got": "^11.8.5", + "mocha": "^10.2.0", + "semver": "^7.5.4", + "ts-node": "^10.9.1", + "typescript": "^4.9.5", + "winston": "^3.8.2" + } }, "readable-stream": { "version": "1.1.14", diff --git a/example/package.json b/example/package.json index c5fbf66d..c8186a95 100644 --- a/example/package.json +++ b/example/package.json @@ -5,13 +5,14 @@ "main": "index.js", "scripts": { "test": "echo \"Error: no test specified\" && exit 1", - "start": "node index.js" + "start": "node index.js", + "cluster-example": "node cluster_example.js" }, "author": "", "license": "ISC", "dependencies": { "amqplib": "^0.10.3", - "rabbitmq-stream-js-client": "^0.1.1" + "rabbitmq-stream-js-client": "file:../." 
}, "engines": { "node": "16.x.x" diff --git a/example/src/autoreconnect_example.js b/example/src/autoreconnect_example.js new file mode 100644 index 00000000..fc2a47b4 --- /dev/null +++ b/example/src/autoreconnect_example.js @@ -0,0 +1,49 @@ +const rabbit = require("rabbitmq-stream-js-client") +const { randomUUID } = require("crypto") + +const rabbitUser = process.env.RABBITMQ_USER || "rabbit" +const rabbitPassword = process.env.RABBITMQ_PASSWORD || "rabbit" + +async function main() { + const streamName = `example-${randomUUID()}` + console.log(`Creating stream ${streamName}`) + + let client = undefined + + const connectToRabbit = async () => { + client = await rabbit.connect({ + hostname: "localhost", + port: 5553, + username: rabbitUser, + password: rabbitPassword, + listeners: { + connection_closed: async () => { + await sleep(Math.random() * 3000) + connectToRabbit() + .then(() => console.log("Successfully re-connected to rabbit!")) + .catch((e) => console.error("Error while reconnecting to Rabbit!", e)) + }, + }, + vhost: "/", + heartbeat: 0, + }) + } + + await connectToRabbit() + + await sleep(2000) + + console.log("Closing!") + await client.close() + console.log("Now it should reopen!") + + await sleep(10000) +} + +main() + .then(() => console.log("done!")) + .catch((res) => { + console.log("ERROR ", res) + process.exit(-1) + }) +const sleep = (ms) => new Promise((r) => setTimeout(r, ms)) diff --git a/example/src/cluster_example.js b/example/src/cluster_example.js new file mode 100644 index 00000000..4f0ea6a7 --- /dev/null +++ b/example/src/cluster_example.js @@ -0,0 +1,46 @@ +/* + Run this example only after creating the cluster. + Following the indications at https://github.com/coders51/rabbitmq-stream-js-client/tree/main/cluster +*/ + +const rabbit = require("rabbitmq-stream-js-client") +const { randomUUID } = require("crypto") + +const rabbitUser = process.env.RABBITMQ_USER || "rabbit" +const rabbitPassword = process.env.RABBITMQ_PASSWORD || "rabbit" + +async function main() { + const streamName = `example-${randomUUID()}` + console.log(`Creating stream ${streamName}`) + + const client = await rabbit.connect({ + hostname: "node0", + port: 5562, + username: rabbitUser, + password: rabbitPassword, + vhost: "/", + heartbeat: 0, + addressResolver: { enabled: true, endpoint: { host: "localhost", port: 5553 } }, + }) + await client.createStream({ stream: streamName, arguments: {} }) + await sleep(200) // Waiting for replicas to be created + const publisher = await client.declarePublisher({ stream: streamName }) + + await publisher.send(Buffer.from("Test message")) + + await client.declareConsumer({ stream: streamName, offset: rabbit.Offset.first() }, (message) => { + console.log(`Received message ${message.content.toString()}`) + }) + + await sleep(2000) + + await client.close() +} + +main() + .then(() => console.log("done!")) + .catch((res) => { + console.log("ERROR ", res) + process.exit(-1) + }) +const sleep = (ms) => new Promise((r) => setTimeout(r, ms)) diff --git a/example/src/superstream_example.js b/example/src/superstream_example.js new file mode 100644 index 00000000..5273d6a2 --- /dev/null +++ b/example/src/superstream_example.js @@ -0,0 +1,48 @@ +/* + Run this example only with rabbit management version >= 3.13.0. 
+*/ + +const rabbit = require("rabbitmq-stream-js-client") +const { randomUUID } = require("crypto") + +const rabbitUser = process.env.RABBITMQ_USER || "rabbit" +const rabbitPassword = process.env.RABBITMQ_PASSWORD || "rabbit" + +async function main() { + const superStreamName = `example-${randomUUID()}` + console.log(`Creating super stream ${superStreamName}`) + + const client = await rabbit.connect({ + hostname: "localhost", + port: 5552, + username: rabbitUser, + password: rabbitPassword, + vhost: "/", + heartbeat: 0, + }) + await client.createSuperStream({ streamName: superStreamName }) + await sleep(200) // Waiting for partitions to be created + + const routingKeyExtractor = (content, msgOptions) => msgOptions.messageProperties.messageId + const publisher = await client.declareSuperStreamPublisher({ superStream: superStreamName }, routingKeyExtractor) + + await publisher.send(Buffer.from("Test message 1"), { messageProperties: { messageId: "1" } }) + await publisher.send(Buffer.from("Test message 2"), { messageProperties: { messageId: "2" } }) + await publisher.send(Buffer.from("Test message 3"), { messageProperties: { messageId: "3" } }) + + await client.declareSuperStreamConsumer({ superStream: superStreamName }, (message) => { + console.log(`Received message ${message.content.toString()}`) + }) + + await sleep(2000) + + await client.close() +} + +main() + .then(() => console.log("done!")) + .catch((res) => { + console.log("ERROR ", res) + process.exit(-1) + }) +const sleep = (ms) => new Promise((r) => setTimeout(r, ms)) diff --git a/example/tsconfig.json b/example/tsconfig.json index 41d2c3fc..a265fa5a 100644 --- a/example/tsconfig.json +++ b/example/tsconfig.json @@ -15,6 +15,6 @@ "declaration": true, "allowJs": true }, - "include": ["./*.js"], + "include": ["./*.js", "src/autoreconnect_example.js", "src/cluster_example.js", "src/superstream_example.js"], "exclude": ["node_modules", "**/*.spec.ts"] } diff --git a/src/client.ts b/src/client.ts index 70486fc7..592b9516 100644 --- a/src/client.ts +++ b/src/client.ts @@ -425,7 +425,7 @@ export class Client { }) } - public async createStream(params: { stream: string; arguments: CreateStreamArguments }): Promise { + public async createStream(params: { stream: string; arguments?: CreateStreamArguments }): Promise { this.logger.debug(`Create Stream...`) const res = await this.sendAndWait(new CreateStreamRequest(params)) if (res.code === STREAM_ALREADY_EXISTS_ERROR_CODE) { @@ -452,7 +452,7 @@ export class Client { public async createSuperStream( params: { streamName: string - arguments: CreateStreamArguments + arguments?: CreateStreamArguments }, bindingKeys?: string[], numberOfPartitions = 3 diff --git a/src/requests/create_stream_request.ts b/src/requests/create_stream_request.ts index f70c530d..85b8b221 100644 --- a/src/requests/create_stream_request.ts +++ b/src/requests/create_stream_request.ts @@ -18,21 +18,23 @@ export class CreateStreamRequest extends AbstractRequest { private readonly _arguments: { key: keyof CreateStreamArguments; value: string | number }[] = [] private readonly stream: string - constructor(params: { stream: string; arguments: CreateStreamArguments }) { + constructor(params: { stream: string; arguments?: CreateStreamArguments }) { super() - this._arguments = (Object.keys(params.arguments) as Array).map((key) => { - return { - key, - value: params.arguments[key] ?? 
"", - } - }) + if (params.arguments) { + this._arguments = (Object.keys(params.arguments) as Array).map((key) => { + return { + key, + value: params.arguments![key] ?? "", + } + }) + } this.stream = params.stream } writeContent(writer: DataWriter) { writer.writeString(this.stream) - writer.writeUInt32(this._arguments.length) + writer.writeUInt32(this._arguments?.length ?? 0) this._arguments.forEach(({ key, value }) => { writer.writeString(key) writer.writeString(value.toString()) diff --git a/src/requests/create_super_stream_request.ts b/src/requests/create_super_stream_request.ts index 98f4536e..df300387 100644 --- a/src/requests/create_super_stream_request.ts +++ b/src/requests/create_super_stream_request.ts @@ -7,7 +7,7 @@ export interface CreateSuperStreamParams { streamName: string partitions: string[] bindingKeys: string[] - arguments: CreateStreamArguments + arguments?: CreateStreamArguments } export class CreateSuperStreamRequest extends AbstractRequest { @@ -22,13 +22,14 @@ export class CreateSuperStreamRequest extends AbstractRequest { constructor(params: CreateSuperStreamParams) { super() - this._arguments = (Object.keys(params.arguments) as Array).map((key) => { - return { - key, - value: params.arguments[key] ?? "", - } - }) - + if (params.arguments) { + this._arguments = (Object.keys(params.arguments) as Array).map((key) => { + return { + key, + value: params.arguments![key] ?? "", + } + }) + } this.streamName = params.streamName this.partitions = params.partitions this.bindingKeys = params.bindingKeys @@ -40,7 +41,7 @@ export class CreateSuperStreamRequest extends AbstractRequest { this.partitions.forEach((partition) => writer.writeString(partition)) writer.writeInt32(this.bindingKeys.length) this.bindingKeys.forEach((bindingKey) => writer.writeString(bindingKey)) - writer.writeUInt32(this._arguments.length) + writer.writeUInt32(this._arguments?.length ?? 0) this._arguments.forEach(({ key, value }) => { writer.writeString(key) writer.writeString(value.toString())