Skip to content

Commit

Permalink
feat(nodejs): safer timestamp API
Browse files Browse the repository at this point in the history
  • Loading branch information
puzpuzpuz committed Oct 9, 2023
1 parent 4ba7774 commit 25496d5
Show file tree
Hide file tree
Showing 33 changed files with 3,627 additions and 3,076 deletions.
118 changes: 66 additions & 52 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,133 +1,143 @@
## QuestDB Node.js Client
# QuestDB Node.js Client

## Requirements

The client requires Node.js v16 or newer version.

## Installation
```shell
npm install @questdb/nodejs-client
npm i -s @questdb/nodejs-client
```

## Examples

### Basic API usage

```javascript
const { Sender } = require("@questdb/nodejs-client");
const { Sender } = require('@questdb/nodejs-client');

async function run() {
// create a sender with a 4k buffer
// it is important to size the buffer correctly so messages can fit
const sender = new Sender({bufferSize: 4096});
const sender = new Sender();

// connect to QuestDB
// host and port are required in connect options
await sender.connect({port: 9009, host: "localhost"});
await sender.connect({port: 9009, host: 'localhost'});

// add rows to the buffer of the sender
sender.table("prices").symbol("instrument", "EURUSD")
.floatColumn("bid", 1.0195).floatColumn("ask", 1.0221).atNow();
sender.table("prices").symbol("instrument", "GBPUSD")
.floatColumn("bid", 1.2076).floatColumn("ask", 1.2082).atNow();
sender.table('prices').symbol('instrument', 'EURUSD')
.floatColumn('bid', 1.0195).floatColumn('ask', 1.0221)
.at(Date.now(), 'ms');
sender.table('prices').symbol('instrument', 'GBPUSD')
.floatColumn('bid', 1.2076).floatColumn('ask', 1.2082)
.at(Date.now(), 'ms');

// flush the buffer of the sender, sending the data to QuestDB
// the buffer is cleared after the data is sent and the sender is ready to accept new data
await sender.flush();

// add rows to the buffer again and send it to the server
sender.table("prices").symbol("instrument", "EURUSD")
.floatColumn("bid", 1.0197).floatColumn("ask", 1.0224).atNow();
sender.table('prices').symbol('instrument', 'EURUSD')
.floatColumn('bid', 1.0197).floatColumn('ask', 1.0224)
.at(Date.now(), 'ms');
await sender.flush();

// close the connection after all rows ingested
await sender.close();
return new Promise(resolve => resolve(0));
}

run().then(value => console.log(value)).catch(err => console.log(err));
run()
.then(console.log)
.catch(console.error);
```

### Authentication and secure connection

```javascript
const { Sender } = require("@questdb/nodejs-client");
const { Sender } = require('@questdb/nodejs-client');

async function run() {
// construct a JsonWebKey
const CLIENT_ID = "testapp";
const PRIVATE_KEY = "9b9x5WhJywDEuo1KGQWSPNxtX-6X6R2BRCKhYMMY6n8";
const CLIENT_ID = 'testapp';
const PRIVATE_KEY = '9b9x5WhJywDEuo1KGQWSPNxtX-6X6R2BRCKhYMMY6n8';
const PUBLIC_KEY = {
x: "aultdA0PjhD_cWViqKKyL5chm6H1n-BiZBo_48T-uqc",
y: "__ptaol41JWSpTTL525yVEfzmY8A6Vi_QrW1FjKcHMg"
x: 'aultdA0PjhD_cWViqKKyL5chm6H1n-BiZBo_48T-uqc',
y: '__ptaol41JWSpTTL525yVEfzmY8A6Vi_QrW1FjKcHMg'
};
const JWK = {
...PUBLIC_KEY,
d: PRIVATE_KEY,
kid: CLIENT_ID,
kty: "EC",
crv: "P-256",
kty: 'EC',
crv: 'P-256',
};

// pass the JsonWebKey to the sender
// will use it for authentication
const sender = new Sender({bufferSize: 4096, jwk: JWK});
const sender = new Sender({jwk: JWK});

// connect() takes an optional second argument
// if 'true' passed the connection is secured with TLS encryption
await sender.connect({port: 9009, host: "localhost"}, true);
await sender.connect({port: 9009, host: 'localhost'}, true);

// send the data over the authenticated and secure connection
sender.table("prices").symbol("instrument", "EURUSD")
.floatColumn("bid", 1.0197).floatColumn("ask", 1.0224).atNow();
sender.table('prices').symbol('instrument', 'EURUSD')
.floatColumn('bid', 1.0197).floatColumn('ask', 1.0224)
.at(Date.now(), 'ms');
await sender.flush();

// close the connection after all rows ingested
await sender.close();
return new Promise(resolve => resolve(0));
}

run().then(value => console.log(value)).catch(err => console.log(err));
run().catch(console.error);
```

### TypeScript example

```typescript
import { Sender } from "@questdb/nodejs-client";
import { Sender } from '@questdb/nodejs-client';

async function run(): Promise<number> {
// construct a JsonWebKey
const CLIENT_ID: string = "testapp";
const PRIVATE_KEY: string = "9b9x5WhJywDEuo1KGQWSPNxtX-6X6R2BRCKhYMMY6n8";
const CLIENT_ID: string = 'testapp';
const PRIVATE_KEY: string = '9b9x5WhJywDEuo1KGQWSPNxtX-6X6R2BRCKhYMMY6n8';
const PUBLIC_KEY: { x: string, y: string } = {
x: "aultdA0PjhD_cWViqKKyL5chm6H1n-BiZBo_48T-uqc",
y: "__ptaol41JWSpTTL525yVEfzmY8A6Vi_QrW1FjKcHMg"
x: 'aultdA0PjhD_cWViqKKyL5chm6H1n-BiZBo_48T-uqc',
y: '__ptaol41JWSpTTL525yVEfzmY8A6Vi_QrW1FjKcHMg'
};
const JWK: { x: string, y: string, kid: string, kty: string, d: string, crv: string } = {
...PUBLIC_KEY,
d: PRIVATE_KEY,
kid: CLIENT_ID,
kty: "EC",
crv: "P-256",
kty: 'EC',
crv: 'P-256',
};

// pass the JsonWebKey to the sender
// will use it for authentication
const sender: Sender = new Sender({bufferSize: 4096, jwk: JWK});
const sender: Sender = new Sender({jwk: JWK});

// connect() takes an optional second argument
// if 'true' passed the connection is secured with TLS encryption
await sender.connect({port: 9009, host: "localhost"}, true);
await sender.connect({port: 9009, host: 'localhost'}, true);

// send the data over the authenticated and secure connection
sender.table("prices").symbol("instrument", "EURUSD")
.floatColumn("bid", 1.0197).floatColumn("ask", 1.0224).atNow();
sender.table('prices').symbol('instrument', 'EURUSD')
.floatColumn('bid', 1.0197).floatColumn('ask', 1.0224).at(Date.now(), 'ms');
await sender.flush();

// close the connection after all rows ingested
await sender.close();
return new Promise(resolve => resolve(0));
}

run().then(value => console.log(value)).catch(err => console.log(err));
run().catch(console.error);
```

### Worker threads example

```javascript
const { Sender } = require("@questdb/nodejs-client");
const { Sender } = require('@questdb/nodejs-client');
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

// fake venue
Expand All @@ -136,7 +146,7 @@ function* venue(ticker) {
let end = false;
setTimeout(() => { end = true; }, rndInt(5000));
while (!end) {
yield {"ticker": ticker, "price": Math.random()};
yield {'ticker': ticker, 'price': Math.random()};
}
}

Expand All @@ -153,35 +163,37 @@ async function subscribe(ticker, onTick) {

async function run() {
if (isMainThread) {
const tickers = ["t1", "t2", "t3", "t4"];
const tickers = ['t1', 't2', 't3', 't4'];
// main thread to start a worker thread for each ticker
for (let ticker in tickers) {
const worker = new Worker(__filename, { workerData: { ticker: ticker } })
.on('error', (err) => { throw err; })
.on('exit', () => { console.log(`${ticker} thread exiting...`); })
.on('message', (msg) => { console.log("Ingested " + msg.count + " prices for ticker " + msg.ticker); });
.on('message', (msg) => {
console.log(`Ingested ${msg.count} prices for ticker ${msg.ticker}`);
});
}
} else {
// it is important that each worker has a dedicated sender object
// threads cannot share the sender because they would write into the same buffer
const sender = new Sender({ bufferSize: 4096 });
await sender.connect({ port: 9009, host: "localhost" });
const sender = new Sender();
await sender.connect({ port: 9009, host: 'localhost' });

// subscribe for the market data of the ticker assigned to the worker
// ingest each price update into the database using the sender
let count = 0;
await subscribe(workerData.ticker, async (tick) => {
sender
.table("prices")
.symbol("ticker", tick.ticker)
.floatColumn("price", tick.price)
.atNow();
.table('prices')
.symbol('ticker', tick.ticker)
.floatColumn('price', tick.price)
.at(Date.now(), 'ms');
await sender.flush();
count++;
});

// let the main thread know how many prices were ingested
parentPort.postMessage({"ticker": workerData.ticker, "count": count});
parentPort.postMessage({'ticker': workerData.ticker, 'count': count});

// close the connection to the database
await sender.close();
Expand All @@ -196,5 +208,7 @@ function rndInt(limit) {
return Math.floor((Math.random() * limit) + 1);
}

run().catch((err) => console.log(err));
run()
.then(console.log)
.catch(console.error);
```
Loading

0 comments on commit 25496d5

Please sign in to comment.