diff --git a/cohort_banking_initiator_js/README.md b/cohort_banking_initiator_js/README.md index d26f3303..5ce97f59 100644 --- a/cohort_banking_initiator_js/README.md +++ b/cohort_banking_initiator_js/README.md @@ -145,6 +145,6 @@ npm start ``` cd $TALOS/cohort_banking_initiator_js -# Genrate 1000 transation requests at the rate of 500 TPS -npm start count=1000 rate=500 +# Genrate 1000 transation requests at the rate of 500 TPS, pick accounts from dataset containing 10k bank accounts +npm start count=1000 rate=500 accounts=10000 ``` \ No newline at end of file diff --git a/cohort_banking_initiator_js/package-lock.json b/cohort_banking_initiator_js/package-lock.json index 074d0c3c..4a6abc15 100644 --- a/cohort_banking_initiator_js/package-lock.json +++ b/cohort_banking_initiator_js/package-lock.json @@ -9,7 +9,7 @@ "version": "0.0.1", "license": "UNLICENSED", "dependencies": { - "@kindredgroup/cohort_sdk_client": "^0.0.1-0ed84cd", + "@kindredgroup/cohort_sdk_client": "^0.0.1", "pg": "^8.11.3", "winston": "^3.10.0" }, @@ -117,47 +117,31 @@ } }, "node_modules/@kindredgroup/cohort_sdk_client": { - "version": "0.0.1-0ed84cd", - "resolved": "https://npm.pkg.github.com/download/@kindredgroup/cohort_sdk_client/0.0.1-0ed84cd/3ea570a59cc0546ce4e9bdc86b95250e3510e73c", - "integrity": "sha512-fN28eSsnZVA4fKrkS8qAAxqe6aAyeRHPZyvTsCmgPijBz4BaLd1DFe0oBxigeKYfyqqHRnUhiyhfXVH2jZXJZw==", - "license": "UNLICENSED", + "version": "0.0.1", + "resolved": "http://127.0.0.1:4873/@kindredgroup/cohort_sdk_client/-/cohort_sdk_client-0.0.1.tgz", + "integrity": "sha512-a9igrozgr4eXnToh3SuaRUiB0Ub1HNO8CtbolMOnxk45DAWGCQ2cFjNnlI68+cuXnk01Iv9C+OeFS88PdzJceA==", "dependencies": { - "@kindredgroup/cohort_sdk_js": "^0.0.1-0ed84cd", + "@kindredgroup/cohort_sdk_js": "^0.0.1", "@types/pg": "^8.10.2", "ts-node": "^10.9.1", "winston": "^3.10.0" - }, - "optionalDependencies": { - "cohort_sdk_js-darwin-arm64": "0.0.1", - "cohort_sdk_js-darwin-universal": "0.0.1", - "cohort_sdk_js-darwin-x64": "0.0.1", - "cohort_sdk_js-linux-x64-gnu": "0.0.1", - "cohort_sdk_js-win32-x64-msvc": "0.0.1" } }, "node_modules/@kindredgroup/cohort_sdk_js": { - "version": "0.0.1-0ed84cd", - "resolved": "https://npm.pkg.github.com/download/@kindredgroup/cohort_sdk_js/0.0.1-0ed84cd/c7074b83176e3f6220d4455dd1584035f5932b1f", - "integrity": "sha512-zyfOzX4x6jsUb3uFpWw3Nn34yvi9wbfKs2VnNgw/Y8rvTyGtwtAslv6JaE6E9tb+yut4UEhB4jX/Ki0kJo63ew==", + "version": "0.0.1", + "resolved": "http://127.0.0.1:4873/@kindredgroup/cohort_sdk_js/-/cohort_sdk_js-0.0.1.tgz", + "integrity": "sha512-MQ72nLF/eoyCpJkIvsDYwLxnmEijG8nTCWnHoivHaMY93zWZc70g7B1lJMGNxv5of4nfEhg/7hazA53noSYt2A==", "hasInstallScript": true, - "license": "UNLICENSED", "dependencies": { "@napi-rs/cli": "^2.16.0" }, "engines": { "node": ">= 10" - }, - "optionalDependencies": { - "cohort_sdk_js-darwin-arm64": "0.0.1", - "cohort_sdk_js-darwin-universal": "0.0.1", - "cohort_sdk_js-darwin-x64": "0.0.1", - "cohort_sdk_js-linux-x64-gnu": "0.0.1", - "cohort_sdk_js-win32-x64-msvc": "0.0.1" } }, "node_modules/@napi-rs/cli": { "version": "2.16.3", - "resolved": "https://registry.npmjs.org/@napi-rs/cli/-/cli-2.16.3.tgz", + "resolved": "http://127.0.0.1:4873/@napi-rs/cli/-/cli-2.16.3.tgz", "integrity": "sha512-3mLNPlbbOhpbIUKicLrJtIearlHXUuXL3UeueYyRRplpVMNkdn8xCyzY6PcYZi3JXR8bmCOiWgkVmLnrSL7DKw==", "bin": { "napi": "scripts/index.js" @@ -1716,37 +1700,27 @@ } }, "@kindredgroup/cohort_sdk_client": { - "version": "0.0.1-0ed84cd", - "resolved": "https://npm.pkg.github.com/download/@kindredgroup/cohort_sdk_client/0.0.1-0ed84cd/3ea570a59cc0546ce4e9bdc86b95250e3510e73c", - "integrity": "sha512-fN28eSsnZVA4fKrkS8qAAxqe6aAyeRHPZyvTsCmgPijBz4BaLd1DFe0oBxigeKYfyqqHRnUhiyhfXVH2jZXJZw==", + "version": "0.0.1", + "resolved": "http://127.0.0.1:4873/@kindredgroup/cohort_sdk_client/-/cohort_sdk_client-0.0.1.tgz", + "integrity": "sha512-a9igrozgr4eXnToh3SuaRUiB0Ub1HNO8CtbolMOnxk45DAWGCQ2cFjNnlI68+cuXnk01Iv9C+OeFS88PdzJceA==", "requires": { - "@kindredgroup/cohort_sdk_js": "^0.0.1-0ed84cd", + "@kindredgroup/cohort_sdk_js": "^0.0.1", "@types/pg": "^8.10.2", - "cohort_sdk_js-darwin-arm64": "0.0.1", - "cohort_sdk_js-darwin-universal": "0.0.1", - "cohort_sdk_js-darwin-x64": "0.0.1", - "cohort_sdk_js-linux-x64-gnu": "0.0.1", - "cohort_sdk_js-win32-x64-msvc": "0.0.1", "ts-node": "^10.9.1", "winston": "^3.10.0" } }, "@kindredgroup/cohort_sdk_js": { - "version": "0.0.1-0ed84cd", - "resolved": "https://npm.pkg.github.com/download/@kindredgroup/cohort_sdk_js/0.0.1-0ed84cd/c7074b83176e3f6220d4455dd1584035f5932b1f", - "integrity": "sha512-zyfOzX4x6jsUb3uFpWw3Nn34yvi9wbfKs2VnNgw/Y8rvTyGtwtAslv6JaE6E9tb+yut4UEhB4jX/Ki0kJo63ew==", + "version": "0.0.1", + "resolved": "http://127.0.0.1:4873/@kindredgroup/cohort_sdk_js/-/cohort_sdk_js-0.0.1.tgz", + "integrity": "sha512-MQ72nLF/eoyCpJkIvsDYwLxnmEijG8nTCWnHoivHaMY93zWZc70g7B1lJMGNxv5of4nfEhg/7hazA53noSYt2A==", "requires": { - "@napi-rs/cli": "^2.16.0", - "cohort_sdk_js-darwin-arm64": "0.0.1", - "cohort_sdk_js-darwin-universal": "0.0.1", - "cohort_sdk_js-darwin-x64": "0.0.1", - "cohort_sdk_js-linux-x64-gnu": "0.0.1", - "cohort_sdk_js-win32-x64-msvc": "0.0.1" + "@napi-rs/cli": "^2.16.0" } }, "@napi-rs/cli": { "version": "2.16.3", - "resolved": "https://registry.npmjs.org/@napi-rs/cli/-/cli-2.16.3.tgz", + "resolved": "http://127.0.0.1:4873/@napi-rs/cli/-/cli-2.16.3.tgz", "integrity": "sha512-3mLNPlbbOhpbIUKicLrJtIearlHXUuXL3UeueYyRRplpVMNkdn8xCyzY6PcYZi3JXR8bmCOiWgkVmLnrSL7DKw==" }, "@tsconfig/node10": { diff --git a/cohort_banking_initiator_js/package.json b/cohort_banking_initiator_js/package.json index 871f7b16..341a928b 100644 --- a/cohort_banking_initiator_js/package.json +++ b/cohort_banking_initiator_js/package.json @@ -19,7 +19,7 @@ "typescript": "^5.1.6" }, "dependencies": { - "@kindredgroup/cohort_sdk_client": "^0.0.1-0ed84cd", + "@kindredgroup/cohort_sdk_client": "^0.0.1", "pg": "^8.11.3", "winston": "^3.10.0" } diff --git a/cohort_banking_initiator_js/src/banking-app.ts b/cohort_banking_initiator_js/src/banking-app.ts index 1395ed35..b95d6632 100644 --- a/cohort_banking_initiator_js/src/banking-app.ts +++ b/cohort_banking_initiator_js/src/banking-app.ts @@ -99,12 +99,13 @@ export class BankingApp { async () => { const s = Date.now() const newRequest = await this.createNewRequest(tx) as any + logger.info("%s", JSON.stringify(newRequest, null, 2)) const n = Date.now() stateEnd = n - span_s stateDuration = n - s return { newRequest } }, - async (_e, request: OutOfOrderRequest) => { + async (request: OutOfOrderRequest) => { const s = Date.now() const r = await this.installOutOfOrder(tx, request) as any const n = Date.now() diff --git a/cohort_banking_initiator_js/src/load-generator.ts b/cohort_banking_initiator_js/src/load-generator.ts index 275c4a02..142ba738 100644 --- a/cohort_banking_initiator_js/src/load-generator.ts +++ b/cohort_banking_initiator_js/src/load-generator.ts @@ -73,8 +73,8 @@ export function createGeneratorService(settings: any): Worker { } if (!isMainThread) { - const { count, channelName, rate } = workerData.settings - const generator = new LoadGenerator(100_000, 100) + const { count, channelName, rate, accounts } = workerData.settings + const generator = new LoadGenerator(accounts, 100) logger.info("Load generator will generate: %d transactions at the reate of %d TPS", count, rate.toFixed(2)) new Promise(async () => { diff --git a/cohort_banking_initiator_js/src/main.ts b/cohort_banking_initiator_js/src/main.ts index 644d409a..00f82bf0 100644 --- a/cohort_banking_initiator_js/src/main.ts +++ b/cohort_banking_initiator_js/src/main.ts @@ -22,6 +22,7 @@ const printMetrics = (spans: Array) => { class LaunchParams { transactionsCount: number = 10_000 targetRatePerSecond: number = 1_000 + totalAccounts: number = 100_000 static parse(args: string[]): LaunchParams { const params = new LaunchParams() @@ -37,6 +38,8 @@ class LaunchParams { params.transactionsCount = parseInt(arg.replaceAll("count=", "")) } else if (arg.startsWith("rate")) { params.targetRatePerSecond = parseInt(arg.replaceAll("rate=", "")) + } else if (arg.startsWith("accounts")) { + params.totalAccounts = parseInt(arg.replaceAll("accounts=", "")) } } @@ -86,5 +89,10 @@ new Promise(async (resolve) => { } throw e } - const _worker = createGeneratorService({ channelName: CHANNEL_NAME, count: params.transactionsCount, rate: params.targetRatePerSecond }) + const _worker = createGeneratorService({ + channelName: CHANNEL_NAME, + count: params.transactionsCount, + rate: params.targetRatePerSecond, + accounts: params.totalAccounts, + }) }) diff --git a/cohort_banking_replicator_js/src/main.ts b/cohort_banking_replicator_js/src/main.ts index ae072f1f..ec78c9dd 100644 --- a/cohort_banking_replicator_js/src/main.ts +++ b/cohort_banking_replicator_js/src/main.ts @@ -1,4 +1,4 @@ -import { Replicator, TalosSdkError } from "@kindredgroup/cohort_sdk_client" +import { JsStatemapAndSnapshot, Replicator, TalosSdkError } from "@kindredgroup/cohort_sdk_client" import { Database } from "./database" import { KAFKA_CONFIG } from "./cfg/config-kafka" @@ -17,7 +17,7 @@ new Promise(async (_resolve) => { try { await replicator.run( async () => await database.getSnapshot(), - async (_, params) => await database.install(params, { delayMs: 2, maxAttempts: 50 }) + async (data: JsStatemapAndSnapshot) => await database.install(data, { delayMs: 2, maxAttempts: 50 }) ) logger.info("Replciator is running ...") } catch (e) { diff --git a/cohort_sdk_client/README.md b/cohort_sdk_client/README.md index f9dfb0cd..c8f3d1f2 100644 --- a/cohort_sdk_client/README.md +++ b/cohort_sdk_client/README.md @@ -1 +1,305 @@ -# The JavaScript binding for Cohort SDK \ No newline at end of file +# The JavaScript binding for Cohort SDK + +## About +This module contains JavaScript SDK for creating Talos Cohort Initiator or Replicator applications in JavaScript. It is bundled as NPM package and available in [GitHub Packages NPM repository](https://github.com/orgs/kindredgroup/packages?repo_name=talos). + +Logically it contains two parts: the internal implementation and public SDK. The internal implementation is made as a separate NPM module `@kindredgroup/cohort_sdk_js` and provided as dependency to `cohort_sdk_client`. + +## Cohort SDK JS + The "cohort_sdk_js" module is a bridge between Rust and JavaScript utilising Node-API - an API for building native Addons. In our case the addon is written in Rust. So, the actual Talos Cohort SDK can be found in `packages/cohort_sdk` package. The `cohort_sdk_js` should be considered an internal project as it is not designed to be consumed directly by JS apps. Applications should use `cohort_sdk_client`. + + ## Cohort SDK Client +The "cohort_sdk_client" module is a wrapper around `cohort_sdk_js` which makes use of SDK a bit easier as it smoothens some rough edges sticking out of pure Node-API for native bindings. + +The example of SDK usage can be found in `cohort_banking_initiator_js` and in `cohort_banking_replicator_js` modules. + +The SDK allows users to create certification transaction initiator and certification transaction replicator in JavaScript either as a single app or as two separate applications. + +# The Initiator API +The role of the Initiator is to validate transactions locally, submit updates for certification and apply submitted updates out-of-order. + +```TypeScript +// Create and use initiator +const initiator = await Initiator.init(sdkConfig) + +const fnMakeNewRequestCallback = async (...) => { + // some logic how to create a new certification request + // based on the current state of the app +}; + +const fnOooInstaller = async (...) => { + // some logic how to install certified transaction + // into local database. +} + +const response = await initiator.certify(fnMakeNewRequestCallback, fnOooInstaller) + +if (response.decision === Decision.Committed) { + // do something +} + +``` + +The `Initiator.certify` method accepts two callback functions: + +1. The factory to create a new certification request, let's call it "New Request Callback". +2. The function to update database once transaction was certified by Talos. This callback will not be invoked if Talos did not certify the transaction. + +These functions are known as Initiator callbacks. + +## About "New Request Callback" + +The details of how to create a new request is delegated to you. The shape of the callback function looks like this: +```TypeScript +async (): Promise +``` +- It receives no parameters. +- It returns `JsCertificationCandidateCallbackResponse` + +```TypeScript +export interface JsCertificationCandidateCallbackResponse { + cancellationReason?: string + newRequest?: JsCertificationRequestPayload +} + +export interface JsCertificationRequestPayload { + candidate: JsCertificationCandidate + snapshot: number + timeoutMs: number +} + +export interface JsCertificationCandidate { + readset: Array + writeset: Array + readvers: Array + statemaps?: Array> +} +``` + +### About "JsCertificationCandidate" + +Before SDK can issue a certification request to Talos Certifier it needs some details from you. You will have to query your local database to fetch the following: +1. Identifiers and version numbers of all objects involved in your transaction. These are known as `readset`, `writeset` and `readvers`. +2. The copy of your transaction as one serializable object. It makes sense to describe your transaction as JSON object and serialise it to string. This is known as `statemap`. + +Above mentioned reads, writes and statemap fields together are known as certification candidate details. Talos and SDK does not use statemap. The statemap is given back to you along with the response to your certification request. When you receive response, you will use details encoded in statemap to understand what transaction has been certified just now and what to do next. In the example app, which simulates a banking transaction to transfer money between two accounts, we define the statemap as action with some parameters: + +```json +[ + { + "TRANSFER": { + "from": "account 1", + "to": "account 2", + "amount": "100.00" + } + } +] +``` +This is free form object. When Talos certifies the transaction, we know that the request to transfer 100.00 from account1 to account2 can go ahead, as installer function has all the necessary details to implement this transfer. +You may ask whether statemap is optional? Indeed, as you are passing the arrow function to `fnOooInstaller` callback you have the context of your request. So, from the perspective of Initiator app, the answer is "yes, it is optional". However, the statemap will also be received by your Replicator app. Replicator may be implemented as a separate process but it still needs access to the statemap. + +### About "JsCertificationRequestPayload" + +Talos is making a certification decision purely based on two major pieces of data. 1) the decisions it made to previous requests issued by you or by other cohorts 2) the state of objects in your database. In order to have a full picture, Talos Certifier needs to know not only _the state of your objects taking part in the current transaction_ but also the _global state of your cohort_. The global state is known as the snapshot version. So, you need to pass your objects (candidate) and your current global state (snapshot). Check this structure `JsCertificationRequestPayload`. + +The response to your certification request will be asynchronously received by SDK. However, the async aspect of it is hidden from you for simplicity of usage. Your call to `await .certify(...)` will block until request is available or until it times out. The optional value of `JsCertificationRequestPayload.timeoutMs` attribute allows you to specify how long you are willing to wait for that specific response. If not provided, then value will be taken from `JsInitiatorConfig.timeoutMs`. + +### About "JsCertificationCandidateCallbackResponse" + +What happens when you need to certify some transaction but: +- the data you have passed to Talos resulted in abort decision because it was outdated? +- there was a short network disruption communicating with either Kafka or your local DB (during execution of "New Request Callback")? + +Most likely you will want to retry your request. The SDK implements retry with incremental random backoff logic. It is configurable via `JsInitiatorConfig.retry*` settings. Upon failure of "current" attempt the "New Request Callback" which you have provided to `.certify(here, ...)` will be invoked again (after some short delay). As your database is used in the concurrent fashion by multiple instances of your cohort and by other processes in your application, it is very likely that state of your objects or the global state changed while SDK was processing this current attempt. So, the retry mechanism inside SDK will make sure that request will be re-issued, however, you, as cohort developer, should make sure that you are providing the latest information about your state to Talos. So, once attempt to certify fails, your "New Request Callback" will be invoked again, and you will load a new fresh state from DB. By default, this "invoke Talos -> fail -> sleep -> reload -> invoke Talos" loop may go on until we got successful response from Talos, until we timed out or until we exhausted max number of attempts. The SDK allows you to tap in and cancel a certification request by providing any reason in `JsCertificationCandidateCallbackResponse.cancellationReason`. Why would you want to cancel the transaction? It depends on the state of your objects, what if object was deleted from DB or became a read-only etc. It depends on your application. + +## About "Out of Order Install Callback" + +If your business transaction requires a certification from Talos, it is expected that you will not do any changes to objects taking part in your transaction (you will not update database records) until the decision is received from Talos. Only after certification decision is received you will proceed with business transaction. Typically, this is going to be some database update, for example, you will update balances of relevant bank accounts, hence "transfer" money between them. This step is done inside "Out of Order Install Callback". SDK will invoke this callback only when Talos approved your transaction, in other words, when Talos checks that there are no conflicting requests to update your objects. + +The shape of the callback function looks like this: +```TypeScript +async (oooRequest: JsOutOfOrderRequest): Promise; +``` + +- It receives `JsOutOfOrderRequest` +- It returns `JsOutOfOrderInstallOutcome` + +What do you need to do inside "Out of Order Install Callback"? +Two things: +1. Implement your business transaction, for example, transfer the money. +2. Update state of your objects, by bumping up their version numbers. (What version number to set is given in the parameters, see `OutOfOrderRequest.newVersion`. You will set this version number to all affected objects). + +Parameters to this callback: + +```TypeScript +export interface OutOfOrderRequest { + xid: string + safepoint: number + newVersion: number +} +``` + +- `OutOfOrderRequest.xid` - the unique ID of certification transaction attempt +- `newVersion` - when certified your objects state should be set to this new version. IF YOUR OBJECTS ARE ALREADY ON VERSION BIGGER OR EQUAL TO THIS, YOU SHOULD NOT UPDATE DB AT ALL. (In other words - if your objects were updated by some other transaction then you are risking overriding these changes, hence you should not update your objects.) +- `safepoint` - it is expected that your global state did not move past this version. IF YOUR COHORT SNAPSHOT IS SMALLER THAN THIS, YOU SHOULD NOT UPDATE DB AT ALL (In other words - if your global state has not reached the point when it is safe to update it, then no updates should be made.) + +What response to return from "Out of Order Install Callback"? + +```TypeScript +export const enum JsOutOfOrderInstallOutcome { + Installed, + InstalledAlready, + SafepointCondition +} +``` + +- `JsOutOfOrderInstallOutcome.Installed` if you successfully updated your objects +- `JsOutOfOrderInstallOutcome.InstalledAlready` if your objects have been updated by some other transaction and you made no changes +- `JsOutOfOrderInstallOutcome.SafepointCondition` if your global state hasn't reached the safe point. + +## The Initiator API Errors + +Some things may go wrong when working with Initiator API. Possible errors can be detected by checking `TalosSdkError` + +```TypeScript + try { + const tx = ... + const resp = initiator.certify(...) + } catch (e) { + if (e instanceof TalosSdkError) { + const sdkError = e as TalosSdkError + logger.error("Unable to process tx: %s. ", JSON.stringify(tx), + "\nTalosSdkError.message: " + sdkError.message + "\nTalosSdkError.kind: " + sdkError.kind + "\nTalosSdkError.name: " + sdkError.name + "\nTalosSdkError.cause: " + sdkError.cause + "\nTalosSdkError.stack: " + sdkError.stack) + } else { + logger.error("Unable to process tx: %s. Error: %s", JSON.stringify(tx), e) + } + } +``` + +Where `TalosSdkError.kind` is defined as: + +```TypeScript +export const enum SdkErrorKind { + Certification, + CertificationTimeout, + Cancelled, + Messaging, + Persistence, + Internal, + OutOfOrderCallbackFailed, + OutOfOrderSnapshotTimeout +} +``` + +| Kind | Description | +| ------------- | ------------- | +| `Certification` | Error during certification | +| `CertificationTimeout` | The timeout as configured | +| `Cancelled` | You have cancelled the certification retry attempt via `JsCertificationCandidateCallbackResponse.cancellationReason`| +| `Messaging` | Error communicating with Kafka broker | +| `Persistence` | Error communicating with database | +| `Internal` | Some unexpected SDK error | +| `OutOfOrderCallbackFailed` | Error when invoking out of order installer callback | +| `OutOfOrderSnapshotTimeout` | You have indicated that out of order install was not successful because of `JsOutOfOrderInstallOutcome.SafepointCondition` and we exhausted all retry attempts. | + +## The Initiator API Configuration + +See structure defined in `JsInitiatorConfig` + +| Parameter | Description | Example, default | +| ---------- | ----------- | ---------------- | +| `retryAttemptsMax` | This parameter comes into play during re-try logic implemented in SDK. It controls many times to re-try the certification request. | 10 | +| `retryBackoff.minMs` | The re-try logic implemented in SDK uses exponential backoff strategy. The delay will grow anywhere between min and max values. | 20 | +| `retryBackoff.maxMs` | | 1500 | +| `retryOoAttemptsMax` | This parameter comes into play during re-try logic implemented in SDK. How many times to re-try installing statemap. The difference between this parameter and `retryAttemptsMax` is that this retry is implemented for each certification attempt.| 10 | +| `retryOoBackoff.minMs` | The re-try logic implemented in SDK uses exponential backoff strategy. The delay will grow anywhere between min and max values. | 20 | +| `retryOoBackoff.maxMs` | | 1000 | +| `backoffOnConflict.minMs` | The re-try logic implemented in SDK uses exponential backoff strategy. The delay will grow anywhere between min and max values. | 1 | +| `backoffOnConflict.maxMs` | | 1500 | +| `snapshotWaitTimeoutMs` | This parameter comes into play during retry logic implemented in SDK. When Talos certifier aborts the transaction (if conflict is detected) then you should wait until your global state reaches the safe point. This is number of millis to wait until `OutOfOrderSnapshotTimeout` is raised. | 10000 | +| `agent` | Cohort Agent Name, this must be unique per kafka consumer instance. Passed in the kafka mesage header `certAgent`. Used to correlate response message with certification request message issued by same process. | | +| `cohort` | Cohort name. This param, and `agent` param are passed to Talos in the candidate message payload and returned in the decision message. | | +| `bufferSize` | The size of internal buffer in certification messages. | 10000 | +| `timeoutMs` | The number of millis SDK should wait for response from Talos before giving up. | | +| `kafka.brokers` | The array of kafka brokers. | `["127.0.0.1:9092"]` | +| `kafka.topic` | The certification topic name. The same as used by Talos Certifier. | | +| `kafka.clientId` | The client id of connecting cohort. | | +| `kafka.groupId` | TODO: Explain why it should not be set for cohort. | | +| `kafka.username` | Kafka auth | | +| `kafka.password` | Kafka auth | | +| `kafka.producerConfigOverrides` | This is `Map` All keys and values are passed directly to underlying kafka library when configuring produser. | `{ "enable.auto.commit": "false" }` | +| `kafka.consumerConfigOverrides` | This is `Map` All keys and values are passed directly to underlying kafka library when configuring consumer. | | +| `kafka.producerSendTimeoutMs` | The `queue_timeout` parameter controls how long to retry for if the librdkafka producer queue is full. | 10 | +| `kafka.logLevel` | The verbocity level of kafka library. One of "alert", "critical", "debug", "emerg", "error", "info", "notice", "warning". | "info" | + + +# The Replicator API +The role of Replicator in Talos ecosystem is to listen to certification decisions and apply transactions in the same order as they were certified. + +```TypeScript +const replicator = await Replicator.init(...) + +const fnSnapshotProviderCallback = async () => {...} +const fnStatemapInstallerCallback = async () => {...} +// this will run indefinitely +await replicator.run(fnSnapshotProviderCallback, fnTransactionInstaller) +``` +The replicator is a server running in the background. It connects to Kafka and recieves all messages from certification topic. Regardless of the order messages were received, the replicator will re-order them and feed into your `fnStatemapInstallerCallback` in the correct order. Your role here is to implement both callbacks. + +## About "Snapshot Provider Callback" +The callback has the shape of simple getter function which returns the "global state" - the latest version number which you have in your DB. The signature of the function looks like this: + +```TypeScript +async (): Promise +``` +- It receives no parameters. +- It returns the snapshot version as number + +This callback is invoked only once during startup of Replicator server. It fetches current snapshot. Replicator uses current snapshot version as a point in time marker which tells Replictor not to react to any messages older than that version. + +## About "Statemap Installer Callback" +This statemap installer callback looks like this: + +```TypeScript +async (data: JsStatemapAndSnapshot): Promise +``` +- It receives `JsStatemapAndSnapshot` +- There is no return value, if function completes successfully SDK will proceed to handling the next transaction. + +```TypeScript +export interface JsStatemapAndSnapshot { + statemap: Array + version: number +} + +export interface JsStatemapItem { + action: string + version: number + safepoint?: number + payload: any +} +``` +The purpose of statemap installer callback is to "catch-up" on changes made by other cohorts (including your cohort). + +In the deployment where there is no Talos present, microservices typically implement eventual consistency model by utilising messaging broker. For example, Kafka. If changes are made to bank account in Cohort 1 that change will eventually propagate to Cohort 2 through messaging middleware. + +In the deployment with Talos, the replicator, specifically this callback, is responsible for updating the database so that your cohort has the up-to-date view of bank account (or any other shared object). The async messaging is abstracted away from you by SDK. You just provide a handler how you want your database to be updated. + +However, there are few rules which Talos protocol expects you to implement. + +- Once you are done updating business objects in your database, it is very important to update the global snapshot. The replicator, specifically this callback, is the only place within cohort which is responsible for writing into snapshot table. +- The version number in the snapshot table should only be incremented. If it happens that callback is invoked with older version, then no change should be made to database. +- The change to business objects and to snapshot table should be atomic (in a single DB transaction with isolation level matching repeatable read or stricter). +- When callback is invoked, it is possible that your business objects are already updated. In this case, the job of callback is to update the snapshot table only. + - This may happen if replicator and initiator belong to the same cohort, for example, out of order installer in initiator may have executed and updated our business objects before the replicator. However, installer should never write to snapshot table. + - When replicator belong to different cohort, it is just catching up on the changes made by other cohorts, hence it may not encounter the state when business objects are updated already. Unless there was some contingency, like unexpected restart. +- When updating business objects, also update their versions so that versions match with snapshot version. + +What value to write into snapshot table? Use this attribute: `JsStatemapAndSnapshot.version` +What version to set into business objects? Use this attribute: `JsStatemapAndSnapshot.version` + +## About Statemap \ No newline at end of file diff --git a/cohort_sdk_client/package-lock.json b/cohort_sdk_client/package-lock.json index e3c65dc7..578cb52d 100644 --- a/cohort_sdk_client/package-lock.json +++ b/cohort_sdk_client/package-lock.json @@ -9,7 +9,7 @@ "version": "0.0.1", "license": "UNLICENSED", "dependencies": { - "@kindredgroup/cohort_sdk_js": "^0.0.1-b4185b0", + "@kindredgroup/cohort_sdk_js": "^0.0.1", "@types/pg": "^8.10.2", "ts-node": "^10.9.1", "winston": "^3.10.0" @@ -17,13 +17,6 @@ "devDependencies": { "@types/node": "^20.5.1", "typescript": "^5.1.6" - }, - "optionalDependencies": { - "cohort_sdk_js-darwin-arm64": "0.0.1", - "cohort_sdk_js-darwin-universal": "0.0.1", - "cohort_sdk_js-darwin-x64": "0.0.1", - "cohort_sdk_js-linux-x64-gnu": "0.0.1", - "cohort_sdk_js-win32-x64-msvc": "0.0.1" } }, "../../sd/package": { @@ -122,23 +115,15 @@ } }, "node_modules/@kindredgroup/cohort_sdk_js": { - "version": "0.0.1-b4185b0", - "resolved": "https://npm.pkg.github.com/download/@kindredgroup/cohort_sdk_js/0.0.1-b4185b0/e842a75d8f0b30ea907594e004035595ef2f9b8c", - "integrity": "sha512-gOKF0P41vVwMdnnxgWJJsVyoS1YekRD9HW2oQWnZdIw0KcwQ65fonV8G2BlbZ+kK6z9/7C8pz7UGZpqy7UJKdA==", + "version": "0.0.1", + "resolved": "http://localhost:4873/@kindredgroup/cohort_sdk_js/-/cohort_sdk_js-0.0.1.tgz", + "integrity": "sha512-MQ72nLF/eoyCpJkIvsDYwLxnmEijG8nTCWnHoivHaMY93zWZc70g7B1lJMGNxv5of4nfEhg/7hazA53noSYt2A==", "hasInstallScript": true, - "license": "UNLICENSED", "dependencies": { "@napi-rs/cli": "^2.16.0" }, "engines": { "node": ">= 10" - }, - "optionalDependencies": { - "cohort_sdk_js-darwin-arm64": "0.0.1", - "cohort_sdk_js-darwin-universal": "0.0.1", - "cohort_sdk_js-darwin-x64": "0.0.1", - "cohort_sdk_js-linux-x64-gnu": "0.0.1", - "cohort_sdk_js-win32-x64-msvc": "0.0.1" } }, "node_modules/@napi-rs/cli": { @@ -661,16 +646,11 @@ } }, "@kindredgroup/cohort_sdk_js": { - "version": "0.0.1-b4185b0", - "resolved": "https://npm.pkg.github.com/download/@kindredgroup/cohort_sdk_js/0.0.1-b4185b0/e842a75d8f0b30ea907594e004035595ef2f9b8c", - "integrity": "sha512-gOKF0P41vVwMdnnxgWJJsVyoS1YekRD9HW2oQWnZdIw0KcwQ65fonV8G2BlbZ+kK6z9/7C8pz7UGZpqy7UJKdA==", + "version": "0.0.1", + "resolved": "http://localhost:4873/@kindredgroup/cohort_sdk_js/-/cohort_sdk_js-0.0.1.tgz", + "integrity": "sha512-MQ72nLF/eoyCpJkIvsDYwLxnmEijG8nTCWnHoivHaMY93zWZc70g7B1lJMGNxv5of4nfEhg/7hazA53noSYt2A==", "requires": { - "@napi-rs/cli": "^2.16.0", - "cohort_sdk_js-darwin-arm64": "0.0.1", - "cohort_sdk_js-darwin-universal": "0.0.1", - "cohort_sdk_js-darwin-x64": "0.0.1", - "cohort_sdk_js-linux-x64-gnu": "0.0.1", - "cohort_sdk_js-win32-x64-msvc": "0.0.1" + "@napi-rs/cli": "^2.16.0" } }, "@napi-rs/cli": { diff --git a/cohort_sdk_client/package.json b/cohort_sdk_client/package.json index b435feb1..a42e63a1 100644 --- a/cohort_sdk_client/package.json +++ b/cohort_sdk_client/package.json @@ -16,16 +16,9 @@ "build": "npx tsc -d" }, "dependencies": { - "@kindredgroup/cohort_sdk_js": "^0.0.1-b4185b0", + "@kindredgroup/cohort_sdk_js": "^0.0.1", "@types/pg": "^8.10.2", "ts-node": "^10.9.1", "winston": "^3.10.0" - }, - "optionalDependencies": { - "cohort_sdk_js-darwin-arm64": "0.0.1", - "cohort_sdk_js-darwin-universal": "0.0.1", - "cohort_sdk_js-darwin-x64": "0.0.1", - "cohort_sdk_js-linux-x64-gnu": "0.0.1", - "cohort_sdk_js-win32-x64-msvc": "0.0.1" } } diff --git a/cohort_sdk_client/src/initiator.ts b/cohort_sdk_client/src/initiator.ts index 4b452d86..57477ba7 100644 --- a/cohort_sdk_client/src/initiator.ts +++ b/cohort_sdk_client/src/initiator.ts @@ -1,4 +1,4 @@ -import { InternalInitiator, JsInitiatorConfig, OutOfOrderRequest, SdkErrorKind, JsCertificationResponse } from "@kindredgroup/cohort_sdk_js" +import { InternalInitiator, JsInitiatorConfig, OutOfOrderRequest, SdkErrorKind, JsCertificationResponse, JsCertificationCandidateCallbackResponse, JsOutOfOrderInstallOutcome } from "@kindredgroup/cohort_sdk_js" import { isSdkError } from "./internal" import { TalosSdkError } from "." @@ -19,9 +19,22 @@ export class Initiator { constructor(readonly impl: InternalInitiator) {} - async certify(makeNewRequestCallback: () => Promise, oooCallback: (err: Error | null, value: OutOfOrderRequest) => any): Promise { + async certify(makeNewRequestCallback: () => Promise, oooInstallCallback: (oooRequest: OutOfOrderRequest) => Promise): Promise { try { - return await this.impl.certify(makeNewRequestCallback, oooCallback) + // This will hide the 'error' parameter from callback (it comes from NAPI). + const adaptedOooInstallCallback = async (error: Error | null, oooRequest: OutOfOrderRequest): Promise => { + if (error) { + throw new TalosSdkError( + SdkErrorKind.Internal, + "Call from native code into 'oooInstallCallback' function provided by JS has failed at the NAPI layer. See the 'cause' field for more details.", + { cause: error } + ) + } else { + return await oooInstallCallback(oooRequest) + } + } + + return await this.impl.certify(makeNewRequestCallback, adaptedOooInstallCallback) } catch (e) { const reason: string = e.message if (isSdkError(reason)) { diff --git a/cohort_sdk_client/src/replicator.ts b/cohort_sdk_client/src/replicator.ts index 231932b0..2d67aa61 100644 --- a/cohort_sdk_client/src/replicator.ts +++ b/cohort_sdk_client/src/replicator.ts @@ -19,9 +19,22 @@ export class Replicator { constructor(readonly impl: InternalReplicator) {} - async run(snapshotProviderCallback: () => Promise, statemapInstallerCallback: (err: Error | null, value: JsStatemapAndSnapshot) => any) { + async run(snapshotProviderCallback: () => Promise, statemapInstallerCallback: (data: JsStatemapAndSnapshot) => Promise) { try { - await this.impl.run(snapshotProviderCallback, statemapInstallerCallback) + // This will hide the 'error' parameter from callback (it comes from NAPI). + const adaptedStatemapInstallerCallback = async (error: Error | null, data: JsStatemapAndSnapshot): Promise => { + if (error) { + throw new TalosSdkError( + SdkErrorKind.Internal, + "Call from native code into 'statemapInstallerCallback' function provided by JS has failed at the NAPI layer. See the 'cause' field for more details.", + { cause: error } + ) + } else { + return await statemapInstallerCallback(data) + } + } + + await this.impl.run(snapshotProviderCallback, adaptedStatemapInstallerCallback) } catch(e) { const reason: string = e.message if (isSdkError(reason)) { diff --git a/packages/cohort_sdk/src/cohort.rs b/packages/cohort_sdk/src/cohort.rs index 00ccbdd2..72b451f1 100644 --- a/packages/cohort_sdk/src/cohort.rs +++ b/packages/cohort_sdk/src/cohort.rs @@ -332,7 +332,7 @@ impl Cohort { // The loop exits when either of the below conditions are met. // 1. When commit decision is received from talos agent/certifier. - // 2. When an clientabort is requested. + // 2. When an client abort is requested. // 3. When all retries are exhausted. let final_result = loop { // Await for snapshot and build the certification request payload. diff --git a/packages/cohort_sdk_js/src/initiator/mod.rs b/packages/cohort_sdk_js/src/initiator/mod.rs index f9806130..a25278ea 100644 --- a/packages/cohort_sdk_js/src/initiator/mod.rs +++ b/packages/cohort_sdk_js/src/initiator/mod.rs @@ -182,8 +182,10 @@ impl InternalInitiator { #[napi] pub async fn certify( &self, - #[napi(ts_arg_type = "() => Promise")] make_new_request_callback: ThreadsafeFunction<()>, - ooo_callback: ThreadsafeFunction, + #[napi(ts_arg_type = "() => Promise")] make_new_request_callback: ThreadsafeFunction<()>, + #[napi(ts_arg_type = "(error: Error | null, ooRequest: OutOfOrderRequest) => Promise")] ooo_callback: ThreadsafeFunction< + OutOfOrderRequest, + >, ) -> napi::Result { let new_request_provider = NewRequestProvider { make_new_request_callback }; let ooo_impl = OutOfOrderInstallerImpl { ooo_callback }; diff --git a/packages/cohort_sdk_js/src/installer/mod.rs b/packages/cohort_sdk_js/src/installer/mod.rs index 650e6c5f..4fb7239c 100644 --- a/packages/cohort_sdk_js/src/installer/mod.rs +++ b/packages/cohort_sdk_js/src/installer/mod.rs @@ -85,7 +85,9 @@ impl InternalReplicator { pub async fn run( &self, #[napi(ts_arg_type = "() => Promise")] snapshot_provider_callback: ThreadsafeFunction<()>, - statemap_installer_callback: ThreadsafeFunction, + #[napi(ts_arg_type = "(error: Error | null, data: JsStatemapAndSnapshot) => Promise")] statemap_installer_callback: ThreadsafeFunction< + JsStatemapAndSnapshot, + >, ) -> napi::Result<()> { let kafka_consumer = KafkaConsumer::new(&self.kafka_config.clone().into()); let brokers = &self.kafka_config.brokers.clone();