Skip to content

Commit

Permalink
feat(nodejs): set copyBuffer to 'true' by default (#18)
Browse files Browse the repository at this point in the history
* feat(nodejs): set copyBuffer to 'true' by default, add type checks for sender options

* feat(nodejs): worker threads example

* version bump

* Update src/sender.js

Co-authored-by: Andrei Pechkurov <[email protected]>

---------

Co-authored-by: Andrei Pechkurov <[email protected]>
  • Loading branch information
glasstiger and puzpuzpuz authored May 26, 2023
1 parent c88ddd1 commit 82ced72
Show file tree
Hide file tree
Showing 14 changed files with 443 additions and 62 deletions.
74 changes: 74 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,77 @@ async function run(): Promise<number> {

run().then(value => console.log(value)).catch(err => console.log(err));
```

### Worker threads example
```javascript
const { Sender } = require("@questdb/nodejs-client");
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

// fake venue
// generates random prices for a ticker for max 5 seconds, then the feed closes
function* venue(ticker) {
let end = false;
setTimeout(() => { end = true; }, rndInt(5000));
while (!end) {
yield {"ticker": ticker, "price": Math.random()};
}
}

// market data feed simulator
// uses the fake venue to deliver price updates to the feed handler (onTick() callback)
async function subscribe(ticker, onTick) {
const feed = venue(workerData.ticker);
let tick;
while (tick = feed.next().value) {
await onTick(tick);
await sleep(rndInt(30));
}
}

async function run() {
if (isMainThread) {
const tickers = ["t1", "t2", "t3", "t4"];
// main thread to start a worker thread for each ticker
for (let ticker in tickers) {
const worker = new Worker(__filename, { workerData: { ticker: ticker } })
.on('error', (err) => { throw err; })
.on('exit', () => { console.log(`${ticker} thread exiting...`); })
.on('message', (msg) => { console.log("Ingested " + msg.count + " prices for ticker " + msg.ticker); });
}
} else {
// it is important that each worker has a dedicated sender object
// threads cannot share the sender because they would write into the same buffer
const sender = new Sender({ bufferSize: 4096 });
await sender.connect({ port: 9009, host: "localhost" });

// subscribe for the market data of the ticker assigned to the worker
// ingest each price update into the database using the sender
let count = 0;
await subscribe(workerData.ticker, async (tick) => {
sender
.table("prices")
.symbol("ticker", tick.ticker)
.floatColumn("price", tick.price)
.atNow();
await sender.flush();
count++;
});

// let the main thread know how many prices were ingested
parentPort.postMessage({"ticker": workerData.ticker, "count": count});

// close the connection to the database
await sender.close();
}
}

function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}

function rndInt(limit) {
return Math.floor((Math.random() * limit) + 1);
}

run().catch((err) => console.log(err));
```
46 changes: 25 additions & 21 deletions docs/Sender.html
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,18 @@ <h5>Parameters:</h5>
Properties of the object:
<ul>
<li>bufferSize: <i>number</i> - Size of the buffer used by the sender to collect rows, provided in bytes. <br>
Optional, defaults to 8192 bytes </li>
<li>copyBuffer: <i>boolean</i> - If true a new buffer will be created for every flush() call and the data to be sent to the server will be copied into this new buffer. <br>
Setting the flag could result in performance degradation, use this flag only if calls to the client cannot be serialised. <br>
Optional, defaults to false </li>
Optional, defaults to 8192 bytes. <br>
If the value passed is not a number, the setting is ignored. </li>
<li>copyBuffer: <i>boolean</i> - By default a new buffer is created for every flush() call, and the data to be sent to the server is copied into this new buffer.
Setting the flag to <i>false</i> results in reusing the same buffer instance for each flush() call. Use this flag only if calls to the client are serialised. <br>
Optional, defaults to <i>true</i>. <br>
If the value passed is not a boolean, the setting is ignored. </li>
<li>jwk: <i>{x: string, y: string, kid: string, kty: string, d: string, crv: string}</i> - JsonWebKey for authentication. <br>
If not provided, client is not authenticated and server might reject the connection depending on configuration.</li>
If not provided, client is not authenticated and server might reject the connection depending on configuration. <br>
No type checks performed on the object passed. </li>
<li>log: <i>(level: 'error'|'warn'|'info'|'debug', message: string) => void</i> - logging function. <br>
If not provided, default logging is used which writes to the console with logging level 'info'.</li>
If not provided, default logging is used which writes to the console with logging level <i>info</i>. <br>
If not a function passed, the setting is ignored. </li>
</ul>
</p></td>
</tr>
Expand Down Expand Up @@ -327,7 +331,7 @@ <h5>Parameters:</h5>

<dt class="tag-source">Source:</dt>
<dd class="tag-source"><ul class="dummy"><li>
<a href="src_sender.js.html">src/sender.js</a>, <a href="src_sender.js.html#line356">line 356</a>
<a href="src_sender.js.html">src/sender.js</a>, <a href="src_sender.js.html#line361">line 361</a>
</li></ul></dd>


Expand Down Expand Up @@ -416,7 +420,7 @@ <h4 class="name" id="atNow"><span class="type-signature"></span>atNow<span class

<dt class="tag-source">Source:</dt>
<dd class="tag-source"><ul class="dummy"><li>
<a href="src_sender.js.html">src/sender.js</a>, <a href="src_sender.js.html#line375">line 375</a>
<a href="src_sender.js.html">src/sender.js</a>, <a href="src_sender.js.html#line380">line 380</a>
</li></ul></dd>


Expand Down Expand Up @@ -576,7 +580,7 @@ <h5>Parameters:</h5>

<dt class="tag-source">Source:</dt>
<dd class="tag-source"><ul class="dummy"><li>
<a href="src_sender.js.html">src/sender.js</a>, <a href="src_sender.js.html#line287">line 287</a>
<a href="src_sender.js.html">src/sender.js</a>, <a href="src_sender.js.html#line292">line 292</a>
</li></ul></dd>


Expand Down Expand Up @@ -687,7 +691,7 @@ <h4 class="name" id="close"><span class="type-signature">(async) </span>close<sp

<dt class="tag-source">Source:</dt>
<dd class="tag-source"><ul class="dummy"><li>
<a href="src_sender.js.html">src/sender.js</a>, <a href="src_sender.js.html#line167">line 167</a>
<a href="src_sender.js.html">src/sender.js</a>, <a href="src_sender.js.html#line172">line 172</a>
</li></ul></dd>


Expand Down Expand Up @@ -882,7 +886,7 @@ <h5>Parameters:</h5>

<dt class="tag-source">Source:</dt>
<dd class="tag-source"><ul class="dummy"><li>
<a href="src_sender.js.html">src/sender.js</a>, <a href="src_sender.js.html#line111">line 111</a>
<a href="src_sender.js.html">src/sender.js</a>, <a href="src_sender.js.html#line116">line 116</a>
</li></ul></dd>


Expand Down Expand Up @@ -1064,7 +1068,7 @@ <h5>Parameters:</h5>

<dt class="tag-source">Source:</dt>
<dd class="tag-source"><ul class="dummy"><li>
<a href="src_sender.js.html">src/sender.js</a>, <a href="src_sender.js.html#line302">line 302</a>
<a href="src_sender.js.html">src/sender.js</a>, <a href="src_sender.js.html#line307">line 307</a>
</li></ul></dd>


Expand Down Expand Up @@ -1175,7 +1179,7 @@ <h4 class="name" id="flush"><span class="type-signature">(async) </span>flush<sp

<dt class="tag-source">Source:</dt>
<dd class="tag-source"><ul class="dummy"><li>
<a href="src_sender.js.html">src/sender.js</a>, <a href="src_sender.js.html#line180">line 180</a>
<a href="src_sender.js.html">src/sender.js</a>, <a href="src_sender.js.html#line185">line 185</a>
</li></ul></dd>


Expand Down Expand Up @@ -1357,7 +1361,7 @@ <h5>Parameters:</h5>

<dt class="tag-source">Source:</dt>
<dd class="tag-source"><ul class="dummy"><li>
<a href="src_sender.js.html">src/sender.js</a>, <a href="src_sender.js.html#line318">line 318</a>
<a href="src_sender.js.html">src/sender.js</a>, <a href="src_sender.js.html#line323">line 323</a>
</li></ul></dd>


Expand Down Expand Up @@ -1468,7 +1472,7 @@ <h4 class="name" id="reset"><span class="type-signature"></span>reset<span class

<dt class="tag-source">Source:</dt>
<dd class="tag-source"><ul class="dummy"><li>
<a href="src_sender.js.html">src/sender.js</a>, <a href="src_sender.js.html#line97">line 97</a>
<a href="src_sender.js.html">src/sender.js</a>, <a href="src_sender.js.html#line102">line 102</a>
</li></ul></dd>


Expand Down Expand Up @@ -1629,7 +1633,7 @@ <h5>Parameters:</h5>

<dt class="tag-source">Source:</dt>
<dd class="tag-source"><ul class="dummy"><li>
<a href="src_sender.js.html">src/sender.js</a>, <a href="src_sender.js.html#line82">line 82</a>
<a href="src_sender.js.html">src/sender.js</a>, <a href="src_sender.js.html#line87">line 87</a>
</li></ul></dd>


Expand Down Expand Up @@ -1789,7 +1793,7 @@ <h5>Parameters:</h5>

<dt class="tag-source">Source:</dt>
<dd class="tag-source"><ul class="dummy"><li>
<a href="src_sender.js.html">src/sender.js</a>, <a href="src_sender.js.html#line270">line 270</a>
<a href="src_sender.js.html">src/sender.js</a>, <a href="src_sender.js.html#line275">line 275</a>
</li></ul></dd>


Expand Down Expand Up @@ -1971,7 +1975,7 @@ <h5>Parameters:</h5>

<dt class="tag-source">Source:</dt>
<dd class="tag-source"><ul class="dummy"><li>
<a href="src_sender.js.html">src/sender.js</a>, <a href="src_sender.js.html#line245">line 245</a>
<a href="src_sender.js.html">src/sender.js</a>, <a href="src_sender.js.html#line250">line 250</a>
</li></ul></dd>


Expand Down Expand Up @@ -2130,7 +2134,7 @@ <h5>Parameters:</h5>

<dt class="tag-source">Source:</dt>
<dd class="tag-source"><ul class="dummy"><li>
<a href="src_sender.js.html">src/sender.js</a>, <a href="src_sender.js.html#line224">line 224</a>
<a href="src_sender.js.html">src/sender.js</a>, <a href="src_sender.js.html#line229">line 229</a>
</li></ul></dd>


Expand Down Expand Up @@ -2312,7 +2316,7 @@ <h5>Parameters:</h5>

<dt class="tag-source">Source:</dt>
<dd class="tag-source"><ul class="dummy"><li>
<a href="src_sender.js.html">src/sender.js</a>, <a href="src_sender.js.html#line338">line 338</a>
<a href="src_sender.js.html">src/sender.js</a>, <a href="src_sender.js.html#line343">line 343</a>
</li></ul></dd>


Expand Down Expand Up @@ -2386,7 +2390,7 @@ <h2><a href="index.html">Home</a></h2><h3>Modules</h3><ul><li><a href="module-@q
<br class="clear">

<footer>
Documentation generated by <a href="https://github.com/jsdoc/jsdoc">JSDoc 3.6.11</a> on Wed May 17 2023 18:51:54 GMT+0100 (British Summer Time)
Documentation generated by <a href="https://github.com/jsdoc/jsdoc">JSDoc 3.6.11</a> on Fri May 26 2023 10:30:37 GMT+0100 (British Summer Time)
</footer>

<script> prettyPrint(); </script>
Expand Down
74 changes: 73 additions & 1 deletion docs/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,78 @@ <h3>TypeScript example</h3>
}

run().then(value => console.log(value)).catch(err => console.log(err));
</code></pre>
<h3>Worker threads example</h3>
<pre class="prettyprint source lang-javascript"><code>const { Sender } = require(&quot;@questdb/nodejs-client&quot;);
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

// fake venue
// generates random prices for a ticker for max 5 seconds, then the feed closes
function* venue(ticker) {
let end = false;
setTimeout(() => { end = true; }, rndInt(5000));
while (!end) {
yield {&quot;ticker&quot;: ticker, &quot;price&quot;: Math.random()};
}
}

// market data feed simulator
// uses the fake venue to deliver price updates to the feed handler (onTick() callback)
async function subscribe(ticker, onTick) {
const feed = venue(workerData.ticker);
let tick;
while (tick = feed.next().value) {
await onTick(tick);
await sleep(rndInt(30));
}
}

async function run() {
if (isMainThread) {
const tickers = [&quot;t1&quot;, &quot;t2&quot;, &quot;t3&quot;, &quot;t4&quot;];
// main thread to start a worker thread for each ticker
for (let ticker in tickers) {
const worker = new Worker(__filename, { workerData: { ticker: ticker } })
.on('error', (err) => { throw err; })
.on('exit', () => { console.log(`${ticker} thread exiting...`); })
.on('message', (msg) => { console.log(&quot;Ingested &quot; + msg.count + &quot; prices for ticker &quot; + msg.ticker); });
}
} else {
// it is important that each worker has a dedicated sender object
// threads cannot share the sender because they would write into the same buffer
const sender = new Sender({ bufferSize: 4096 });
await sender.connect({ port: 9009, host: &quot;localhost&quot; });

// subscribe for the market data of the ticker assigned to the worker
// ingest each price update into the database using the sender
let count = 0;
await subscribe(workerData.ticker, async (tick) => {
sender
.table(&quot;prices&quot;)
.symbol(&quot;ticker&quot;, tick.ticker)
.floatColumn(&quot;price&quot;, tick.price)
.atNow();
await sender.flush();
count++;
});

// let the main thread know how many prices were ingested
parentPort.postMessage({&quot;ticker&quot;: workerData.ticker, &quot;count&quot;: count});

// close the connection to the database
await sender.close();
}
}

function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}

function rndInt(limit) {
return Math.floor((Math.random() * limit) + 1);
}

run().catch((err) => console.log(err));
</code></pre></article>
</section>

Expand All @@ -176,7 +248,7 @@ <h2><a href="index.html">Home</a></h2><h3>Modules</h3><ul><li><a href="module-@q
<br class="clear">

<footer>
Documentation generated by <a href="https://github.com/jsdoc/jsdoc">JSDoc 3.6.11</a> on Wed May 17 2023 18:51:54 GMT+0100 (British Summer Time)
Documentation generated by <a href="https://github.com/jsdoc/jsdoc">JSDoc 3.6.11</a> on Fri May 26 2023 10:30:37 GMT+0100 (British Summer Time)
</footer>

<script> prettyPrint(); </script>
Expand Down
2 changes: 1 addition & 1 deletion docs/index.js.html
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ <h2><a href="index.html">Home</a></h2><h3>Modules</h3><ul><li><a href="module-@q
<br class="clear">

<footer>
Documentation generated by <a href="https://github.com/jsdoc/jsdoc">JSDoc 3.6.11</a> on Wed May 17 2023 18:51:54 GMT+0100 (British Summer Time)
Documentation generated by <a href="https://github.com/jsdoc/jsdoc">JSDoc 3.6.11</a> on Fri May 26 2023 10:30:37 GMT+0100 (British Summer Time)
</footer>

<script> prettyPrint(); </script>
Expand Down
2 changes: 1 addition & 1 deletion docs/module-@questdb_nodejs-client.html
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ <h2><a href="index.html">Home</a></h2><h3>Modules</h3><ul><li><a href="module-@q
<br class="clear">

<footer>
Documentation generated by <a href="https://github.com/jsdoc/jsdoc">JSDoc 3.6.11</a> on Wed May 17 2023 18:51:54 GMT+0100 (British Summer Time)
Documentation generated by <a href="https://github.com/jsdoc/jsdoc">JSDoc 3.6.11</a> on Fri May 26 2023 10:30:37 GMT+0100 (British Summer Time)
</footer>

<script> prettyPrint(); </script>
Expand Down
Loading

0 comments on commit 82ced72

Please sign in to comment.