diff --git a/cohort_banking_replicator_js/src/main.ts b/cohort_banking_replicator_js/src/main.ts index ae072f1f..531b55ee 100644 --- a/cohort_banking_replicator_js/src/main.ts +++ b/cohort_banking_replicator_js/src/main.ts @@ -17,7 +17,7 @@ new Promise(async (_resolve) => { try { await replicator.run( async () => await database.getSnapshot(), - async (_, params) => await database.install(params, { delayMs: 2, maxAttempts: 50 }) + async (data) => await database.install(data, { delayMs: 2, maxAttempts: 50 }) ) logger.info("Replciator is running ...") } catch (e) { diff --git a/cohort_sdk_client/README.md b/cohort_sdk_client/README.md index 03730afc..b95390ad 100644 --- a/cohort_sdk_client/README.md +++ b/cohort_sdk_client/README.md @@ -16,7 +16,7 @@ The example of SDK usage can be found in `cohort_banking_initiator_js` and in `c The SDK allows users to create certification transaction initiator and certification transaction replicator in JavaScript either as a single app or as two separate applications. # The Initiator API -The role of Initiator in Talos ecosystem is to send certification request to Talos Certifier and receive response. +The role of the Initiator is to validate transactions locally, submit updates for certification and apply submitted updates out-of-order. ```TypeScript // Create and use initiator @@ -51,7 +51,7 @@ These functions are known as Initiator callbacks. The details of how to create a new request is delegated to you. The shape of the callback function looks like this: ```TypeScript -const fnMakeNewRequestCallback = async () => { ... }; +async (): Promise ``` - It receives no parameters. - It returns `JsCertificationCandidateCallbackResponse` @@ -118,7 +118,7 @@ If your business transaction requires a certification from Talos, it is expected The shape of the callback function looks like this: ```TypeScript -const fnOooInstaller = async (oooRequest: JsOutOfOrderRequest): Promise => { ... }; +async (oooRequest: JsOutOfOrderRequest): Promise; ``` - It receives `JsOutOfOrderRequest` @@ -237,4 +237,68 @@ See structure defined in `JsInitiatorConfig` # The Replicator API -TODO: ... +The role of Replicator in Talos ecosystem is to listen to certification decisions and apply transactions in the same order as they were certified. + +```TypeScript +const replicator = await Replicator.init(...) + +const fnSnapshotProviderCallback = async () => {...} +const fnStatemapInstallerCallback = async () => {...} +// this will run indefinitely +await replicator.run(fnSnapshotProviderCallback, fnTransactionInstaller) +``` +The replicator is a server running in the background. It connects to Kafka and recieves all messages from certification topic. Regardless of the order messages were received, the replicator will re-order them and feed into your `fnStatemapInstallerCallback` in the correct order. Your role here is to implement both callbacks. + +## About "Snapshot Provider Callback" +The callback has the shape of simple getter function which returns the "global state" - the latest version number which you have in your DB. The signature of the function looks like this: + +```TypeScript +async (): Promise +``` +- It receives no parameters. +- It returns the snapshot version as number + +This callback is invoked only once during startup of Replicator server. It fetches current snapshot. Replicator uses current snapshot version as a point in time marker which tells Replictor not to react to any messages older than that version. + +## About "Statemap Installer Callback" +This statemap installer callback looks like this: + +```TypeScript +async (data: JsStatemapAndSnapshot): Promise +``` +- It receives `JsStatemapAndSnapshot` +- There is no return value, if function completes successfully SDK will proceed to handling the next transaction. + +```TypeScript +export interface JsStatemapAndSnapshot { + statemap: Array + version: number +} + +export interface JsStatemapItem { + action: string + version: number + safepoint?: number + payload: any +} +``` +The purpose of statemap installer callback is to "catch-up" on changes made by other* cohorts. + +In the deployment where there is no Talos present, microservices typically implement eventual consistency model by utilising messaging broker. For example, Kafka. If changes are made to bank account in Cohort 1 that change will eventually propagate to Cohort 2 through messaging middleware. + +In the deployment with Talos, the replicator, specifically this callback, is responsible for updating the database so that your cohort has the up-to-date view of bank account (or any other shared object). The async messaging is abstracted away from you by SDK. You just provide a handler how you want your database to be updated. + +However, there are few rules which Talos protocols expects you to implement. + +- Once you are done updating business objects in your database, it is very important to update the global stapshot. The replicator, specifically this callback, is the only place within cohort which is responsible for writing into snapshots table. +- The version number in the snapshots table should only be incremented. If it happens that callabck is invoked with older version, then no change should be made to database. +- The change to business objects and to snapshots table should be atomic (in a single DB transaction with isolation level matching repeatable read or stricter). +- When callback is invoked, it is possible that your business objects are already updated. In this case, the job of callback is to update the snapshot table only. + - This may happen if replicator and initiator belong to the same cohort, so initiator's out of order installer may have executed before the replicator and updated our business objects. However, installer should never write to snapshots table. + - When replicator belong to different cohort, it is just catching up on the changes made by other cohorts, hence it may not encounter the state when business objects is already updated. Unless there was some contingency like unexpected restart. +- And finally, when updating business obbjects, alo update their versions so that they match with snapshot version. + +What value to write into snapshot table? Use this attribute: `JsStatemapAndSnapshot.version` +What version to set into business objects? Use this attribute: `JsStatemapAndSnapshot.version` + +## About Statemap \ No newline at end of file diff --git a/cohort_sdk_client/src/replicator.ts b/cohort_sdk_client/src/replicator.ts index 231932b0..2d67aa61 100644 --- a/cohort_sdk_client/src/replicator.ts +++ b/cohort_sdk_client/src/replicator.ts @@ -19,9 +19,22 @@ export class Replicator { constructor(readonly impl: InternalReplicator) {} - async run(snapshotProviderCallback: () => Promise, statemapInstallerCallback: (err: Error | null, value: JsStatemapAndSnapshot) => any) { + async run(snapshotProviderCallback: () => Promise, statemapInstallerCallback: (data: JsStatemapAndSnapshot) => Promise) { try { - await this.impl.run(snapshotProviderCallback, statemapInstallerCallback) + // This will hide the 'error' parameter from callback (it comes from NAPI). + const adaptedStatemapInstallerCallback = async (error: Error | null, data: JsStatemapAndSnapshot): Promise => { + if (error) { + throw new TalosSdkError( + SdkErrorKind.Internal, + "Call from native code into 'statemapInstallerCallback' function provided by JS has failed at the NAPI layer. See the 'cause' field for more details.", + { cause: error } + ) + } else { + return await statemapInstallerCallback(data) + } + } + + await this.impl.run(snapshotProviderCallback, adaptedStatemapInstallerCallback) } catch(e) { const reason: string = e.message if (isSdkError(reason)) { diff --git a/packages/cohort_sdk_js/src/installer/mod.rs b/packages/cohort_sdk_js/src/installer/mod.rs index 650e6c5f..4fb7239c 100644 --- a/packages/cohort_sdk_js/src/installer/mod.rs +++ b/packages/cohort_sdk_js/src/installer/mod.rs @@ -85,7 +85,9 @@ impl InternalReplicator { pub async fn run( &self, #[napi(ts_arg_type = "() => Promise")] snapshot_provider_callback: ThreadsafeFunction<()>, - statemap_installer_callback: ThreadsafeFunction, + #[napi(ts_arg_type = "(error: Error | null, data: JsStatemapAndSnapshot) => Promise")] statemap_installer_callback: ThreadsafeFunction< + JsStatemapAndSnapshot, + >, ) -> napi::Result<()> { let kafka_consumer = KafkaConsumer::new(&self.kafka_config.clone().into()); let brokers = &self.kafka_config.brokers.clone();