Skip to content

Commit

Permalink
feat(nodejs): introduce safer timestamp API (#21)
Browse files Browse the repository at this point in the history
* feat(nodejs): safer timestamp API

* Update CI configs

* Address review comments
  • Loading branch information
puzpuzpuz authored Oct 9, 2023
1 parent 4ba7774 commit 828afd1
Show file tree
Hide file tree
Showing 35 changed files with 3,673 additions and 3,091 deletions.
21 changes: 7 additions & 14 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
name: build

on:
push:
on: [push]

jobs:
build:
name: Build @questdb/nodejs-questdb-client
test:
name: Build with Node.js ${{ matrix.node-version }}
runs-on: ubuntu-latest
strategy:
matrix:
node-version: [16, 20]
steps:
- name: Checkout repository
uses: actions/checkout@v3
Expand All @@ -16,7 +18,7 @@ jobs:
- name: Setup node
uses: actions/setup-node@v3
with:
node-version: 16
node-version: ${{ matrix.node-version }}

- name: Install dependencies
run: npm ci
Expand All @@ -26,12 +28,3 @@ jobs:

- name: Run linter
run: npm run eslint

- name: Publish @questdb/nodejs-questdb-client to npm
if: github.ref == 'refs/heads/main'
uses: JS-DevTools/npm-publish@v1
with:
token: ${{ secrets.CI_TOKEN }}
access: public
check-version: true
package: package.json
38 changes: 38 additions & 0 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
name: build

on:
push:
branches:
- main

jobs:
build:
name: Publish @questdb/nodejs-questdb-client to npm
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v3
with:
submodules: recursive

- name: Setup node
uses: actions/setup-node@v3
with:
node-version: 20

- name: Install dependencies
run: npm ci

- name: Run tests
run: npm test

- name: Run linter
run: npm run eslint

- name: Publish
uses: JS-DevTools/npm-publish@v2
with:
token: ${{ secrets.CI_TOKEN }}
access: public
strategy: all
package: package.json
118 changes: 66 additions & 52 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,133 +1,143 @@
## QuestDB Node.js Client
# QuestDB Node.js Client

## Requirements

The client requires Node.js v16 or newer version.

## Installation
```shell
npm install @questdb/nodejs-client
npm i -s @questdb/nodejs-client
```

## Examples

### Basic API usage

```javascript
const { Sender } = require("@questdb/nodejs-client");
const { Sender } = require('@questdb/nodejs-client');

async function run() {
// create a sender with a 4k buffer
// it is important to size the buffer correctly so messages can fit
const sender = new Sender({bufferSize: 4096});
const sender = new Sender();

// connect to QuestDB
// host and port are required in connect options
await sender.connect({port: 9009, host: "localhost"});
await sender.connect({port: 9009, host: 'localhost'});

// add rows to the buffer of the sender
sender.table("prices").symbol("instrument", "EURUSD")
.floatColumn("bid", 1.0195).floatColumn("ask", 1.0221).atNow();
sender.table("prices").symbol("instrument", "GBPUSD")
.floatColumn("bid", 1.2076).floatColumn("ask", 1.2082).atNow();
sender.table('prices').symbol('instrument', 'EURUSD')
.floatColumn('bid', 1.0195).floatColumn('ask', 1.0221)
.at(Date.now(), 'ms');
sender.table('prices').symbol('instrument', 'GBPUSD')
.floatColumn('bid', 1.2076).floatColumn('ask', 1.2082)
.at(Date.now(), 'ms');

// flush the buffer of the sender, sending the data to QuestDB
// the buffer is cleared after the data is sent and the sender is ready to accept new data
await sender.flush();

// add rows to the buffer again and send it to the server
sender.table("prices").symbol("instrument", "EURUSD")
.floatColumn("bid", 1.0197).floatColumn("ask", 1.0224).atNow();
sender.table('prices').symbol('instrument', 'EURUSD')
.floatColumn('bid', 1.0197).floatColumn('ask', 1.0224)
.at(Date.now(), 'ms');
await sender.flush();

// close the connection after all rows ingested
await sender.close();
return new Promise(resolve => resolve(0));
}

run().then(value => console.log(value)).catch(err => console.log(err));
run()
.then(console.log)
.catch(console.error);
```

### Authentication and secure connection

```javascript
const { Sender } = require("@questdb/nodejs-client");
const { Sender } = require('@questdb/nodejs-client');

async function run() {
// construct a JsonWebKey
const CLIENT_ID = "testapp";
const PRIVATE_KEY = "9b9x5WhJywDEuo1KGQWSPNxtX-6X6R2BRCKhYMMY6n8";
const CLIENT_ID = 'testapp';
const PRIVATE_KEY = '9b9x5WhJywDEuo1KGQWSPNxtX-6X6R2BRCKhYMMY6n8';
const PUBLIC_KEY = {
x: "aultdA0PjhD_cWViqKKyL5chm6H1n-BiZBo_48T-uqc",
y: "__ptaol41JWSpTTL525yVEfzmY8A6Vi_QrW1FjKcHMg"
x: 'aultdA0PjhD_cWViqKKyL5chm6H1n-BiZBo_48T-uqc',
y: '__ptaol41JWSpTTL525yVEfzmY8A6Vi_QrW1FjKcHMg'
};
const JWK = {
...PUBLIC_KEY,
d: PRIVATE_KEY,
kid: CLIENT_ID,
kty: "EC",
crv: "P-256",
kty: 'EC',
crv: 'P-256',
};

// pass the JsonWebKey to the sender
// will use it for authentication
const sender = new Sender({bufferSize: 4096, jwk: JWK});
const sender = new Sender({jwk: JWK});

// connect() takes an optional second argument
// if 'true' passed the connection is secured with TLS encryption
await sender.connect({port: 9009, host: "localhost"}, true);
await sender.connect({port: 9009, host: 'localhost'}, true);

// send the data over the authenticated and secure connection
sender.table("prices").symbol("instrument", "EURUSD")
.floatColumn("bid", 1.0197).floatColumn("ask", 1.0224).atNow();
sender.table('prices').symbol('instrument', 'EURUSD')
.floatColumn('bid', 1.0197).floatColumn('ask', 1.0224)
.at(Date.now(), 'ms');
await sender.flush();

// close the connection after all rows ingested
await sender.close();
return new Promise(resolve => resolve(0));
}

run().then(value => console.log(value)).catch(err => console.log(err));
run().catch(console.error);
```

### TypeScript example

```typescript
import { Sender } from "@questdb/nodejs-client";
import { Sender } from '@questdb/nodejs-client';

async function run(): Promise<number> {
// construct a JsonWebKey
const CLIENT_ID: string = "testapp";
const PRIVATE_KEY: string = "9b9x5WhJywDEuo1KGQWSPNxtX-6X6R2BRCKhYMMY6n8";
const CLIENT_ID: string = 'testapp';
const PRIVATE_KEY: string = '9b9x5WhJywDEuo1KGQWSPNxtX-6X6R2BRCKhYMMY6n8';
const PUBLIC_KEY: { x: string, y: string } = {
x: "aultdA0PjhD_cWViqKKyL5chm6H1n-BiZBo_48T-uqc",
y: "__ptaol41JWSpTTL525yVEfzmY8A6Vi_QrW1FjKcHMg"
x: 'aultdA0PjhD_cWViqKKyL5chm6H1n-BiZBo_48T-uqc',
y: '__ptaol41JWSpTTL525yVEfzmY8A6Vi_QrW1FjKcHMg'
};
const JWK: { x: string, y: string, kid: string, kty: string, d: string, crv: string } = {
...PUBLIC_KEY,
d: PRIVATE_KEY,
kid: CLIENT_ID,
kty: "EC",
crv: "P-256",
kty: 'EC',
crv: 'P-256',
};

// pass the JsonWebKey to the sender
// will use it for authentication
const sender: Sender = new Sender({bufferSize: 4096, jwk: JWK});
const sender: Sender = new Sender({jwk: JWK});

// connect() takes an optional second argument
// if 'true' passed the connection is secured with TLS encryption
await sender.connect({port: 9009, host: "localhost"}, true);
await sender.connect({port: 9009, host: 'localhost'}, true);

// send the data over the authenticated and secure connection
sender.table("prices").symbol("instrument", "EURUSD")
.floatColumn("bid", 1.0197).floatColumn("ask", 1.0224).atNow();
sender.table('prices').symbol('instrument', 'EURUSD')
.floatColumn('bid', 1.0197).floatColumn('ask', 1.0224).at(Date.now(), 'ms');
await sender.flush();

// close the connection after all rows ingested
await sender.close();
return new Promise(resolve => resolve(0));
}

run().then(value => console.log(value)).catch(err => console.log(err));
run().catch(console.error);
```

### Worker threads example

```javascript
const { Sender } = require("@questdb/nodejs-client");
const { Sender } = require('@questdb/nodejs-client');
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

// fake venue
Expand All @@ -136,7 +146,7 @@ function* venue(ticker) {
let end = false;
setTimeout(() => { end = true; }, rndInt(5000));
while (!end) {
yield {"ticker": ticker, "price": Math.random()};
yield {'ticker': ticker, 'price': Math.random()};
}
}

Expand All @@ -153,35 +163,37 @@ async function subscribe(ticker, onTick) {

async function run() {
if (isMainThread) {
const tickers = ["t1", "t2", "t3", "t4"];
const tickers = ['t1', 't2', 't3', 't4'];
// main thread to start a worker thread for each ticker
for (let ticker in tickers) {
const worker = new Worker(__filename, { workerData: { ticker: ticker } })
.on('error', (err) => { throw err; })
.on('exit', () => { console.log(`${ticker} thread exiting...`); })
.on('message', (msg) => { console.log("Ingested " + msg.count + " prices for ticker " + msg.ticker); });
.on('message', (msg) => {
console.log(`Ingested ${msg.count} prices for ticker ${msg.ticker}`);
});
}
} else {
// it is important that each worker has a dedicated sender object
// threads cannot share the sender because they would write into the same buffer
const sender = new Sender({ bufferSize: 4096 });
await sender.connect({ port: 9009, host: "localhost" });
const sender = new Sender();
await sender.connect({ port: 9009, host: 'localhost' });

// subscribe for the market data of the ticker assigned to the worker
// ingest each price update into the database using the sender
let count = 0;
await subscribe(workerData.ticker, async (tick) => {
sender
.table("prices")
.symbol("ticker", tick.ticker)
.floatColumn("price", tick.price)
.atNow();
.table('prices')
.symbol('ticker', tick.ticker)
.floatColumn('price', tick.price)
.at(Date.now(), 'ms');
await sender.flush();
count++;
});

// let the main thread know how many prices were ingested
parentPort.postMessage({"ticker": workerData.ticker, "count": count});
parentPort.postMessage({'ticker': workerData.ticker, 'count': count});

// close the connection to the database
await sender.close();
Expand All @@ -196,5 +208,7 @@ function rndInt(limit) {
return Math.floor((Math.random() * limit) + 1);
}

run().catch((err) => console.log(err));
run()
.then(console.log)
.catch(console.error);
```
Loading

0 comments on commit 828afd1

Please sign in to comment.