Skip to content

Commit

Permalink
142-docs-example-for-cluster-and-load-balancer (#152)
Browse files Browse the repository at this point in the history
* Adds info to readme

* Add example with load-balancer

* Minor fixes

* Adds more comments in readme section

* Make create stream arguments optionals

* Adds reconnect and superstream example

* Update readme with super stream example

---------

Co-authored-by: magne <[email protected]>
  • Loading branch information
l4mby and magne authored Jan 24, 2024
1 parent 03e0560 commit 9cfbda3
Show file tree
Hide file tree
Showing 10 changed files with 401 additions and 32 deletions.
169 changes: 166 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,18 @@
- [Getting started](#getting-started)
- [Usage](#usage)
- [Connect](#connect)
- [Connect through TLS/SSL](#connect-through-tls-ssl)
- [Basic Publish](#basic-publish)
- [Sub Batch Entry Publishing](#sub-batch-entry-publishing)
- [Basic Consuming](#basic-consuming)
- [Single Active Consumer](#single-active-consumer)
- [Clustering](#clustering)
- [Load Balancer](#loadbalancer)
- [Running Examples](#running-examples)
- [Build from source](#build-from-source)
- [MISC](#misc)
- [Super Stream](#super-stream)
- [Filtering](#filtering)

## Overview

Expand Down Expand Up @@ -70,6 +77,27 @@ const client = await connect({
await client.close()
```

### Connect through TLS/SSL

```typescript
const client = await connect({
hostname: "localhost",
port: 5552,
username: "rabbit",
password: "rabbit",
vhost: "/",
ssl: {
key: "<client_private_key>",
cert: "<client_certificate>",
ca: "<ca>", // Optional
},
})

// ...

await client.close()
```

### Basic Publish

```typescript
Expand All @@ -93,6 +121,40 @@ await publisher.send(Buffer.from("my message content"))
await client.close()
```

### Sub Batch Entry Publishing

```typescript
const client = await connect({
hostname: "localhost",
port: 5552,
username: "rabbit",
password: "rabbit",
vhost: "/",
})

const publisher = await client.declarePublisher({
stream: "stream-name",
publisherRef: "my-publisher",
})

const messages = [
{ content: Buffer.from("my message content 1") },
{ content: Buffer.from("my message content 2") },
{ content: Buffer.from("my message content 3") },
{ content: Buffer.from("my message content 4") },
]

await publisher.sendSubEntries(messages)
/*
It is also possible to specify a compression when sending sub entries of messages:
e.g: await publisher.sendSubEntries(messages, CompressionType.Gzip)
The current values for the compression types are CompressionType.None or CompressionType.Gzip
*/

await client.close()
```

### Basic Consuming

```typescript
Expand All @@ -104,17 +166,118 @@ const client = await connect({
vhost: "/",
})

const consumerOptions = { stream: "stream-name", offset: Offset.next() } // see docs for various offset types
const consumerOptions = { stream: "stream-name", offset: Offset.next() }
/*
When creating a consumer the offset and the stream name are mandatory parameters.
The offset parameter can be created from one of the following functions:
- Offset.first() ---> Start reading from the first available offset.
- Offset.next() ---> Start reading from the next offset to be written.
- Offset.last() ---> Start reading from the last chunk of messages in the stream.
- Offset.offset(x) ---> Start reading from the specified offset. The parameter has to be a bigint.
- Offset.timestamp(t) ---> Start reading from the messages stored after the timestamp t.
*/

const consumer = await client.declareConsumer(consumerOptions, (message: Message) => {
console.log(message.content) // it's a Buffer
console.log(message.content) // it's a Buffer
})

// declareConsumer works even with sub batch entry publishing and compression

// ...

await client.close()
```

### Single Active Consumer

It is possible to create a consumer as single active.
For the given reference only one consumer will be able to consume messages

```typescript
const consumerOptions = {
stream: "stream-name",
offset: Offset.next(),
singleActive: true,
consumerRef: "my-consumer-ref",
} // see docs for various offset types

const consumer = await client.declareConsumer(consumerOptions, (message: Message) => {
console.log(message.content) // it's a Buffer
})
// ...
```

### Clustering

Every time we create a new producer or a new consumer, a new connection is created.
In particular for the producer the connection is created on the node leader.
For more running the tests in a cluster follow the readme under the folder /cluster

### Load Balancer

With the load balancer, what happens is we will connect first to the AddressResolver
and then we will connect to the node through the AddressResolver.
The address resolver is going to give us a node leader for a Producer and a node
replica for the consumer, otherwise it will close the connection and retry.

```typescript
const client = await connect({
hostname: "node0",
port: 5562,
username: "rabbit",
password: "rabbit",
vhost: "/",
addressResolver: { enabled: true },
})

const streamName = "my-stream"
await rabbit.createStream(streamName)

await wait(200) // wait for replicas to be created

// ...

await client.close()
```

### Super Stream

It is possible to create a super stream directly through the client only if you are using the latest (3.13.0-rc) management version.
Currently we do not support batch publishing and compression - that feature is coming soon

```typescript
const client = await rabbit.connect({
hostname: "localhost",
port: 5552,
username: rabbitUser,
password: rabbitPassword,
vhost: "/",
heartbeat: 0,
})
await client.createSuperStream({ streamName: "super-stream-example" })
await sleep(200) // Waiting for partitions to be created

const routingKeyExtractor = (content, msgOptions) => msgOptions.messageProperties.messageId
const publisher = await client.declareSuperStreamPublisher({ superStream: "super-stream-example" }, routingKeyExtractor)

await publisher.send(Buffer.from("Test message 1"), { messageProperties: { messageId: "1" } })
await publisher.send(Buffer.from("Test message 2"), { messageProperties: { messageId: "2" } })
await publisher.send(Buffer.from("Test message 3"), { messageProperties: { messageId: "3" } })

await client.declareSuperStreamConsumer({ superStream: "super-stream-example" }, (message) => {
console.log(`Received message ${message.content.toString()}`)
})

await sleep(2000)

await client.close()
```

### Filtering

Work in progress ⚠️

## Running Examples

the folder /example contains a project that shows some examples on how to use the lib, to run it follow this steps
Expand All @@ -129,7 +292,7 @@ npm i
run the docker-compose to launch a rabbit instance already stream enabled

```shell
docker-compose up -d
docker-compose up -d
```

add this line to your host file (on linux `/etc/hosts`) to correctly resolve rabbitmq
Expand Down
73 changes: 66 additions & 7 deletions example/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions example/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"start": "node index.js"
"start": "node index.js",
"cluster-example": "node cluster_example.js"
},
"author": "",
"license": "ISC",
"dependencies": {
"amqplib": "^0.10.3",
"rabbitmq-stream-js-client": "^0.1.1"
"rabbitmq-stream-js-client": "file:../."
},
"engines": {
"node": "16.x.x"
Expand Down
49 changes: 49 additions & 0 deletions example/src/autoreconnect_example.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
const rabbit = require("rabbitmq-stream-js-client")
const { randomUUID } = require("crypto")

const rabbitUser = process.env.RABBITMQ_USER || "rabbit"
const rabbitPassword = process.env.RABBITMQ_PASSWORD || "rabbit"

async function main() {
const streamName = `example-${randomUUID()}`
console.log(`Creating stream ${streamName}`)

let client = undefined

const connectToRabbit = async () => {
client = await rabbit.connect({
hostname: "localhost",
port: 5553,
username: rabbitUser,
password: rabbitPassword,
listeners: {
connection_closed: async () => {
await sleep(Math.random() * 3000)
connectToRabbit()
.then(() => console.log("Successfully re-connected to rabbit!"))
.catch((e) => console.error("Error while reconnecting to Rabbit!", e))
},
},
vhost: "/",
heartbeat: 0,
})
}

await connectToRabbit()

await sleep(2000)

console.log("Closing!")
await client.close()
console.log("Now it should reopen!")

await sleep(10000)
}

main()
.then(() => console.log("done!"))
.catch((res) => {
console.log("ERROR ", res)
process.exit(-1)
})
const sleep = (ms) => new Promise((r) => setTimeout(r, ms))
Loading

0 comments on commit 9cfbda3

Please sign in to comment.