Skip to content

Commit

Permalink
fix: Add more types to Replicator.run()
Browse files Browse the repository at this point in the history
  • Loading branch information
fmarek-kindred committed Oct 13, 2023
1 parent 5658656 commit 2383364
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 8 deletions.
2 changes: 1 addition & 1 deletion cohort_banking_replicator_js/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ new Promise(async (_resolve) => {
try {
await replicator.run(
async () => await database.getSnapshot(),
async (_, params) => await database.install(params, { delayMs: 2, maxAttempts: 50 })
async (data) => await database.install(data, { delayMs: 2, maxAttempts: 50 })
)
logger.info("Replciator is running ...")
} catch (e) {
Expand Down
72 changes: 68 additions & 4 deletions cohort_sdk_client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ The example of SDK usage can be found in `cohort_banking_initiator_js` and in `c
The SDK allows users to create certification transaction initiator and certification transaction replicator in JavaScript either as a single app or as two separate applications.

# The Initiator API
The role of Initiator in Talos ecosystem is to send certification request to Talos Certifier and receive response.
The role of the Initiator is to validate transactions locally, submit updates for certification and apply submitted updates out-of-order.

```TypeScript
// Create and use initiator
Expand Down Expand Up @@ -51,7 +51,7 @@ These functions are known as Initiator callbacks.

The details of how to create a new request is delegated to you. The shape of the callback function looks like this:
```TypeScript
const fnMakeNewRequestCallback = async () => { ... };
async (): Promise<JsCertificationCandidateCallbackResponse>
```
- It receives no parameters.
- It returns `JsCertificationCandidateCallbackResponse`
Expand Down Expand Up @@ -118,7 +118,7 @@ If your business transaction requires a certification from Talos, it is expected

The shape of the callback function looks like this:
```TypeScript
const fnOooInstaller = async (oooRequest: JsOutOfOrderRequest): Promise<JsCertificationCandidateCallbackResponse> => { ... };
async (oooRequest: JsOutOfOrderRequest): Promise<JsCertificationCandidateCallbackResponse>;
```
- It receives `JsOutOfOrderRequest`
Expand Down Expand Up @@ -237,4 +237,68 @@ See structure defined in `JsInitiatorConfig`
# The Replicator API
TODO: ...
The role of Replicator in Talos ecosystem is to listen to certification decisions and apply transactions in the same order as they were certified.
```TypeScript
const replicator = await Replicator.init(...)

const fnSnapshotProviderCallback = async () => {...}
const fnStatemapInstallerCallback = async () => {...}
// this will run indefinitely
await replicator.run(fnSnapshotProviderCallback, fnTransactionInstaller)
```
The replicator is a server running in the background. It connects to Kafka and recieves all messages from certification topic. Regardless of the order messages were received, the replicator will re-order them and feed into your `fnStatemapInstallerCallback` in the correct order. Your role here is to implement both callbacks.
## About "Snapshot Provider Callback"
The callback has the shape of simple getter function which returns the "global state" - the latest version number which you have in your DB. The signature of the function looks like this:
```TypeScript
async (): Promise<number>
```
- It receives no parameters.
- It returns the snapshot version as number
This callback is invoked only once during startup of Replicator server. It fetches current snapshot. Replicator uses current snapshot version as a point in time marker which tells Replictor not to react to any messages older than that version.
## About "Statemap Installer Callback"
This statemap installer callback looks like this:
```TypeScript
async (data: JsStatemapAndSnapshot): Promise<void>
```
- It receives `JsStatemapAndSnapshot`
- There is no return value, if function completes successfully SDK will proceed to handling the next transaction.
```TypeScript
export interface JsStatemapAndSnapshot {
statemap: Array<JsStatemapItem>
version: number
}

export interface JsStatemapItem {
action: string
version: number
safepoint?: number
payload: any
}
```
The purpose of statemap installer callback is to "catch-up" on changes made by other* cohorts.
In the deployment where there is no Talos present, microservices typically implement eventual consistency model by utilising messaging broker. For example, Kafka. If changes are made to bank account in Cohort 1 that change will eventually propagate to Cohort 2 through messaging middleware.
In the deployment with Talos, the replicator, specifically this callback, is responsible for updating the database so that your cohort has the up-to-date view of bank account (or any other shared object). The async messaging is abstracted away from you by SDK. You just provide a handler how you want your database to be updated.
However, there are few rules which Talos protocols expects you to implement.
- Once you are done updating business objects in your database, it is very important to update the global stapshot. The replicator, specifically this callback, is the only place within cohort which is responsible for writing into snapshots table.
- The version number in the snapshots table should only be incremented. If it happens that callabck is invoked with older version, then no change should be made to database.
- The change to business objects and to snapshots table should be atomic (in a single DB transaction with isolation level matching repeatable read or stricter).
- When callback is invoked, it is possible that your business objects are already updated. In this case, the job of callback is to update the snapshot table only.
- This may happen if replicator and initiator belong to the same cohort, so initiator's out of order installer may have executed before the replicator and updated our business objects. However, installer should never write to snapshots table.
- When replicator belong to different cohort, it is just catching up on the changes made by other cohorts, hence it may not encounter the state when business objects is already updated. Unless there was some contingency like unexpected restart.
- And finally, when updating business obbjects, alo update their versions so that they match with snapshot version.
What value to write into snapshot table? Use this attribute: `JsStatemapAndSnapshot.version`
What version to set into business objects? Use this attribute: `JsStatemapAndSnapshot.version`
## About Statemap
17 changes: 15 additions & 2 deletions cohort_sdk_client/src/replicator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,22 @@ export class Replicator {

constructor(readonly impl: InternalReplicator) {}

async run(snapshotProviderCallback: () => Promise<number>, statemapInstallerCallback: (err: Error | null, value: JsStatemapAndSnapshot) => any) {
async run(snapshotProviderCallback: () => Promise<number>, statemapInstallerCallback: (data: JsStatemapAndSnapshot) => Promise<void>) {
try {
await this.impl.run(snapshotProviderCallback, statemapInstallerCallback)
// This will hide the 'error' parameter from callback (it comes from NAPI).
const adaptedStatemapInstallerCallback = async (error: Error | null, data: JsStatemapAndSnapshot): Promise<void> => {
if (error) {
throw new TalosSdkError(
SdkErrorKind.Internal,
"Call from native code into 'statemapInstallerCallback' function provided by JS has failed at the NAPI layer. See the 'cause' field for more details.",
{ cause: error }
)
} else {
return await statemapInstallerCallback(data)
}
}

await this.impl.run(snapshotProviderCallback, adaptedStatemapInstallerCallback)
} catch(e) {
const reason: string = e.message
if (isSdkError(reason)) {
Expand Down
4 changes: 3 additions & 1 deletion packages/cohort_sdk_js/src/installer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ impl InternalReplicator {
pub async fn run(
&self,
#[napi(ts_arg_type = "() => Promise<number>")] snapshot_provider_callback: ThreadsafeFunction<()>,
statemap_installer_callback: ThreadsafeFunction<JsStatemapAndSnapshot>,
#[napi(ts_arg_type = "(error: Error | null, data: JsStatemapAndSnapshot) => Promise<void>")] statemap_installer_callback: ThreadsafeFunction<
JsStatemapAndSnapshot,
>,
) -> napi::Result<()> {
let kafka_consumer = KafkaConsumer::new(&self.kafka_config.clone().into());
let brokers = &self.kafka_config.brokers.clone();
Expand Down

0 comments on commit 2383364

Please sign in to comment.