diff --git a/cohort_banking_initiator_js/src/banking-app.ts b/cohort_banking_initiator_js/src/banking-app.ts index b95d6632..aa928b2d 100644 --- a/cohort_banking_initiator_js/src/banking-app.ts +++ b/cohort_banking_initiator_js/src/banking-app.ts @@ -21,7 +21,7 @@ export class BankingApp { private pond: Pond, private database: Pool, private queue: BroadcastChannel, - private onFinishListener: (appRef: BankingApp) => any) {} + private onFinishListener: (appRef: BankingApp) => any) { } async init() { this.initiator = await Initiator.init(sdkConfig) @@ -133,6 +133,21 @@ export class BankingApp { writeset: [tx.from, tx.to], readvers: state.items.map(i => i.version), statemaps: [{ "TRANSFER": tx }], + onCommit: { + publish: { + kafka: [ + { + topic: "test.transfer.feedback.js", + value: { + "from_account": tx.from, + "to_account": tx.to, + "amount": tx.amount + } + }, + ] + } + } + }, snapshot: state.snapshotVersion, timeoutMs: 0, @@ -143,14 +158,16 @@ export class BankingApp { let cnn: PoolClient try { cnn = await this.database.connect() - const result = await cnn.query({ name: "get-state", text: - `SELECT + const result = await cnn.query({ + name: "get-state", text: + `SELECT ba."number" as "id", ba."version" as "version", cs."version" AS snapshot_version FROM bank_accounts ba, cohort_snapshot cs WHERE ba."number" = $1 OR ba."number" = $2`, - values: [tx.from, tx.to] } + values: [tx.from, tx.to] + } ) if (result.rowCount != 2) { @@ -163,7 +180,7 @@ export class BankingApp { } catch (e) { // This print here is important, without it the original reason is lost when using NAPI 2.10. logger.error("BankingApp.loadState(): %s", e) - throw new Error(`Unable to load state for tx: ${ JSON.stringify(tx) }. Reason: ${e.message}`, { cause: e }) + throw new Error(`Unable to load state for tx: ${JSON.stringify(tx)}. 
Reason: ${e.message}`, { cause: e }) } finally { cnn?.release() } @@ -219,7 +236,7 @@ export class BankingApp { } catch (e) { // This print here is important, without it the original reason is lost when using NAPI 2.10. logger.error("BankingApp.installOutOfOrder(): %s", e) - throw new Error(`Unable to complete out of order installation of tx: ${ JSON.stringify(tx) }`, { cause: e }) + throw new Error(`Unable to complete out of order installation of tx: ${JSON.stringify(tx)}`, { cause: e }) } finally { cnn?.release() } diff --git a/cohort_sdk_client/README.md b/cohort_sdk_client/README.md index ba398895..225365a6 100644 --- a/cohort_sdk_client/README.md +++ b/cohort_sdk_client/README.md @@ -105,6 +105,31 @@ export interface JsCertificationCandidate { writeset: Array readvers: Array statemaps?: Array> + onCommit?: JsCandidateOnCommitActions +} + +export interface JsKafkaAction { + cluster?: string + /** Topic to publish the payload */ + topic: string + /** Key encoding to be used. Defaults to `text/plain`. */ + keyEncoding?: string + /** Key for the message to publish. */ + key?: string + /** Optional if the message should be published to a specific partition. */ + partition?: number + /** Optional headers while publishing. */ + headers?: Record + /** Value encoding to be used. Defaults to `application/json`. */ + valueEncoding?: string + /** Payload to publish. */ + value: any +} +export interface JsCandidateOnCommitPublishActions { + kafka: Array +} +export interface JsCandidateOnCommitActions { + publish?: JsCandidateOnCommitPublishActions } ``` @@ -113,10 +138,11 @@ export interface JsCertificationCandidate { Before SDK can issue a certification request to Talos Certifier it needs some details from you. You will have to query your local database to fetch the following: 1. Identifiers and version numbers of all objects involved in your transaction. These are known as `readset`, `writeset` and `readvers`. 2. The copy of your transaction as one serializable object. 
It makes sense to describe your transaction as JSON object and serialise it to string. This is known as `statemap`. +3. Any additional message to be published for candidate requests with committed decision outcome, can be added to `onCommit` field. Currently the SDK supports only publishing to **Kafka**. -Above mentioned reads, writes and statemap fields together are known as certification candidate details. You may ask whether statemap is optional? Indeed, as you are passing the arrow function to `fnOooInstaller` callback you have the context of your request. From the perspective of Initiator app, the answer is "yes, it is optional". However, the statemap will also be received by your Replicator app. Replicator may be implemented as a separate process. Replicator will know what needs to be updated in the database by reading statemap. +Above mentioned reads, writes and statemap fields together are known as certification candidate details. You may ask whether statemap is optional? Indeed, as you are passing the arrow function to `fnOooInstaller` callback you have the context of your request. From the perspective of Initiator app, the answer is "yes, it is optional". However, the statemap will also be received by your Replicator app. Replicator may be implemented as a separate process. Replicator will know what needs to be updated in the database by reading statemap. -Read about `statemap` in the end of this document. See section "About Statemap". +Read about `statemap` in the end of this document. See section [About Statemap](#about-statemap). ### About "JsCertificationRequestPayload" @@ -134,7 +160,7 @@ Most likely you will want to retry your request. The SDK implements retry with i ## About "Out of Order Install Callback" -If your business transaction requires a certification from Talos, it is expected that you will not do any changes to objects taking part in your transaction (you will not update database records) until the decision is received from Talos. 
Only after certification decision is received you will proceed with business transaction. Typically, this is going to be some database update, for example, you will update balances of relevant bank accounts, hence "transfer" money between them. This step is done inside "Out of Order Install Callback". SDK will invoke this callback only when Talos approved your transaction, in other words, when Talos checks that there are no conflicting requests to update your objects. +If your business transaction requires a certification from Talos, it is expected that you will not do any changes to objects taking part in your transaction (you will not update database records) until the decision is received from Talos. Only after certification decision is received you will proceed with business transaction. Typically, this is going to be some database update, for example, you will update balances of relevant bank accounts, hence "transfer" money between them. This step is done inside "Out of Order Install Callback". SDK will invoke this callback only when Talos approved your transaction, in other words, when Talos checks that there are no conflicting requests to update your objects. What is the benefit of having out of order callback if its responsibility overlaps with "Statemap Installer Callback" found in Replicator? You may wish not to implement this callback and rely on Replicator to do all your DB changes. Just keep in mind that Replicator will do it "later". How much later will depends on the overall load on the replicator and other dependent transactions which are still in-flight. If you did not implement out of order callback then it is possible to finish the call to `let response = await initiator.certify(...)`, have "go ahead" decision from Talos in the response variable, but your DB will not see this change. 
If, at that point, you returned response to user via HTTP and user went to query DB via another endpoint, it could be that user will not see the change yet (Replicator may still be processing the backlog of other transactions). On the other hand, with out of order callback in place, once the call to `let response = await initiator.certify(...)` finished, your DB is already updated and you may rely on that change in your following logic. diff --git a/examples/agent_client/examples/agent_client.rs b/examples/agent_client/examples/agent_client.rs index 85cfca46..9f741b6f 100644 --- a/examples/agent_client/examples/agent_client.rs +++ b/examples/agent_client/examples/agent_client.rs @@ -297,6 +297,7 @@ impl Generator for RequestGenerator { snapshot: 5, writeset: Vec::from(["3".to_string()]), statemap: None, + on_commit: None, }; CertificationRequest { diff --git a/packages/cohort_banking/src/app.rs b/packages/cohort_banking/src/app.rs index 6113ff5f..0e180571 100644 --- a/packages/cohort_banking/src/app.rs +++ b/packages/cohort_banking/src/app.rs @@ -7,13 +7,17 @@ use banking_common::{ }; use cohort_sdk::{ cohort::Cohort, - model::{ClientErrorKind, Config}, + model::{ + callback::{CandidateOnCommitActions, CandidateOnCommitPublishActions, KafkaAction}, + ClientErrorKind, Config, + }, }; use opentelemetry_api::{ global, metrics::{Counter, Unit}, }; +use serde_json::json; use talos_agent::messaging::api::Decision; use crate::{ @@ -64,19 +68,39 @@ impl BankingApp { #[async_trait] impl Handler for BankingApp { async fn handle(&self, request: TransferRequest) -> Result<(), String> { - log::debug!("processig new banking transfer request: {:?}", request); + log::debug!("processing new banking transfer request: {:?}", request); let statemap = vec![HashMap::from([( BusinessActionType::TRANSFER.to_string(), TransferRequest::new(request.from.clone(), request.to.clone(), request.amount).json(), )])]; + let on_commit_action = CandidateOnCommitActions { + publish: 
Some(CandidateOnCommitPublishActions { + kafka: vec![KafkaAction { + headers: None, + key: None, + partition: None, + topic: "test.transfer.feedback".to_string(), + value: json!({ + "from_account": request.from, + "to_account": request.to, + "amount": request.amount + }), + cluster: Default::default(), + key_encoding: Default::default(), + value_encoding: Default::default(), + }], + }), + }; + let certification_request = CertificationRequest { timeout_ms: 0, candidate: CandidateData { readset: vec![request.from.clone(), request.to.clone()], writeset: vec![request.from, request.to], statemap: Some(statemap), + on_commit: Some(on_commit_action), }, }; @@ -85,6 +109,7 @@ impl Handler for BankingApp { database: Arc::clone(&self.database), single_query_strategy, }; + let request_payload_callback = || state_provider.get_certification_candidate(certification_request.clone()); let oo_inst = OutOfOrderInstallerImpl { diff --git a/packages/cohort_banking/src/callbacks/certification_candidate_provider.rs b/packages/cohort_banking/src/callbacks/certification_candidate_provider.rs index 3f906202..5fd5f89e 100644 --- a/packages/cohort_banking/src/callbacks/certification_candidate_provider.rs +++ b/packages/cohort_banking/src/callbacks/certification_candidate_provider.rs @@ -155,6 +155,7 @@ impl CertificationCandidateProviderImpl { writeset: request.candidate.writeset, statemaps: request.candidate.statemap, readvers: state.items.into_iter().map(|x| x.version).collect(), + on_commit: request.candidate.on_commit, }; Ok(CertificationCandidateCallbackResponse::Proceed(CertificationRequestPayload { diff --git a/packages/cohort_banking/src/examples_support/queue_processor.rs b/packages/cohort_banking/src/examples_support/queue_processor.rs index 16064cc5..16a31430 100644 --- a/packages/cohort_banking/src/examples_support/queue_processor.rs +++ b/packages/cohort_banking/src/examples_support/queue_processor.rs @@ -22,7 +22,6 @@ impl QueueProcessor { threads: u64, item_handler: Arc, ) -> 
Vec> { - let item_handler = Arc::new(item_handler); let mut tasks = Vec::>::new(); for thread_number in 1..=threads { diff --git a/packages/cohort_banking/src/model/requests.rs b/packages/cohort_banking/src/model/requests.rs index 31fe2fee..7ece9386 100644 --- a/packages/cohort_banking/src/model/requests.rs +++ b/packages/cohort_banking/src/model/requests.rs @@ -1,3 +1,4 @@ +use cohort_sdk::model::callback::CandidateOnCommitActions; use serde_json::Value; use std::collections::HashMap; @@ -6,7 +7,8 @@ pub struct CandidateData { pub readset: Vec, pub writeset: Vec, pub statemap: Option>>, - // The "snapshot" is intentionally messing here. We will compute it ourselves before feeding this data to Talos + // The "snapshot" is intentionally missing here. We will compute it ourselves before feeding this data to Talos + pub on_commit: Option, } #[derive(Clone)] diff --git a/packages/cohort_sdk/src/cohort.rs b/packages/cohort_sdk/src/cohort.rs index 72b451f1..2003607a 100644 --- a/packages/cohort_sdk/src/cohort.rs +++ b/packages/cohort_sdk/src/cohort.rs @@ -8,6 +8,7 @@ use opentelemetry_api::{ global, metrics::{Counter, Histogram, Unit}, }; +use serde_json::Value; use talos_agent::{ agent::{ core::{AgentServices, TalosAgentImpl}, @@ -448,6 +449,11 @@ impl Cohort { let (snapshot, readvers) = Self::select_snapshot_and_readvers(request.snapshot, request.candidate.readvers); let xid = uuid::Uuid::new_v4().to_string(); + let on_commit: Option> = match request.candidate.on_commit { + Some(value) => serde_json::to_value(value).ok().map(|x| x.into()), + None => None, + }; + let agent_request = CertificationRequest { message_key: xid.clone(), candidate: CandidateData { @@ -457,6 +463,7 @@ impl Cohort { writeset: request.candidate.writeset, readvers, snapshot, + on_commit, }, timeout: if request.timeout_ms > 0 { Some(Duration::from_millis(request.timeout_ms)) diff --git a/packages/cohort_sdk/src/model/callback.rs b/packages/cohort_sdk/src/model/callback.rs index b2689780..2511b841 
100644 --- a/packages/cohort_sdk/src/model/callback.rs +++ b/packages/cohort_sdk/src/model/callback.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use async_trait::async_trait; +use serde::{Deserialize, Serialize}; use serde_json::Value; #[derive(Debug, PartialEq)] @@ -16,12 +17,54 @@ pub struct CertificationRequestPayload { pub timeout_ms: u64, } +fn default_text_plain_encoding() -> String { + "text/plain".to_string() +} + +fn default_application_json_encoding() -> String { + "application/json".to_string() +} + +#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct KafkaAction { + #[serde(default)] + pub cluster: String, + /// Topic to publish the payload + pub topic: String, + /// Key encoding to be used. Defaults to `text/plain`. + #[serde(default = "default_text_plain_encoding")] + pub key_encoding: String, + /// Key for the message to publish. + pub key: Option, + /// Optional if the message should be published to a specific partition. + pub partition: Option, + /// Optional headers while publishing. + pub headers: Option>, + /// Value encoding to be used. Defaults to `application/json`. + #[serde(default = "default_application_json_encoding")] + pub value_encoding: String, + /// Payload to publish. 
+ pub value: serde_json::Value, +} + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +pub struct CandidateOnCommitPublishActions { + pub kafka: Vec, +} + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +pub struct CandidateOnCommitActions { + pub publish: Option, +} + #[derive(Debug, Clone, PartialEq)] pub struct CertificationCandidate { pub readset: Vec, pub writeset: Vec, pub readvers: Vec, pub statemaps: Option>>, + pub on_commit: Option, } #[derive(Debug, Clone)] diff --git a/packages/cohort_sdk_js/src/initiator/mod.rs b/packages/cohort_sdk_js/src/initiator/mod.rs index a25278ea..b198e5d0 100644 --- a/packages/cohort_sdk_js/src/initiator/mod.rs +++ b/packages/cohort_sdk_js/src/initiator/mod.rs @@ -3,8 +3,8 @@ use crate::sdk_errors::SdkErrorContainer; use async_trait::async_trait; use cohort_sdk::cohort::Cohort; use cohort_sdk::model::callback::{ - CertificationCandidate, CertificationCandidateCallbackResponse, CertificationRequestPayload, OutOfOrderInstallOutcome, OutOfOrderInstallRequest, - OutOfOrderInstaller, + CandidateOnCommitActions, CandidateOnCommitPublishActions, CertificationCandidate, CertificationCandidateCallbackResponse, CertificationRequestPayload, + KafkaAction, OutOfOrderInstallOutcome, OutOfOrderInstallRequest, OutOfOrderInstaller, }; use cohort_sdk::model::{CertificationResponse, ClientError, Config, ResponseMetadata}; use napi::bindgen_prelude::FromNapiValue; @@ -67,12 +67,73 @@ impl From for Config { } } +// #[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] +// #[serde(rename_all = "camelCase")] +#[napi(object)] +pub struct JsKafkaAction { + pub cluster: Option, + /// Topic to publish the payload + pub topic: String, + /// Key encoding to be used. Defaults to `text/plain`. + pub key_encoding: Option, + /// Key for the message to publish. + pub key: Option, + /// Optional if the message should be published to a specific partition. + pub partition: Option, + /// Optional headers while publishing. 
+ pub headers: Option>, + /// Key encoding to be used. Defaults to `application/json`. + pub value_encoding: Option, + /// Payload to publish. + pub value: serde_json::Value, +} + +// #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +#[napi(object)] +pub struct JsCandidateOnCommitPublishActions { + pub kafka: Vec, +} + +// #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +#[napi(object)] +pub struct JSCandidateOnCommitActions { + pub publish: Option, +} + #[napi(object)] pub struct JsCertificationCandidate { pub readset: Vec, pub writeset: Vec, pub readvers: Vec, pub statemaps: Option>>, + pub on_commit: Option, +} + +impl From for CandidateOnCommitPublishActions { + fn from(val: JsCandidateOnCommitPublishActions) -> Self { + let kafka_actions = val + .kafka + .into_iter() + .map(|action| KafkaAction { + cluster: action.cluster.unwrap_or_default(), + headers: action.headers, + key: action.key, + key_encoding: action.key_encoding.unwrap_or_default(), + partition: action.partition, + topic: action.topic, + value: action.value, + value_encoding: action.value_encoding.unwrap_or_default(), + }) + .collect(); + CandidateOnCommitPublishActions { kafka: kafka_actions } + } +} +impl From for CandidateOnCommitActions { + fn from(val: JSCandidateOnCommitActions) -> Self { + CandidateOnCommitActions { + publish: val.publish.map(|x| x.into()), + } + } } impl From for CertificationCandidate { @@ -82,6 +143,7 @@ impl From for CertificationCandidate { writeset: val.writeset, readvers: val.readvers.iter().map(|v| *v as u64).collect(), statemaps: val.statemaps, + on_commit: val.on_commit.map(|x| x.into()), } } } diff --git a/packages/talos_agent/src/agent/core.rs b/packages/talos_agent/src/agent/core.rs index 48f4e18f..91a71756 100644 --- a/packages/talos_agent/src/agent/core.rs +++ b/packages/talos_agent/src/agent/core.rs @@ -246,6 +246,7 @@ mod tests { snapshot: 1_u64, writeset: Vec::::new(), statemap: None, + on_commit: None, } } diff --git 
a/packages/talos_agent/src/agent/errors.rs b/packages/talos_agent/src/agent/errors.rs index 23959bca..48138289 100644 --- a/packages/talos_agent/src/agent/errors.rs +++ b/packages/talos_agent/src/agent/errors.rs @@ -100,6 +100,7 @@ mod tests { snapshot: 0, writeset: vec![String::from("1"), String::from("2"), String::from("3")], statemap: None, + on_commit: None, }, timeout: Some(Duration::from_secs(1)), }, diff --git a/packages/talos_agent/src/agent/state_manager.rs b/packages/talos_agent/src/agent/state_manager.rs index 960271d5..0c53ba25 100644 --- a/packages/talos_agent/src/agent/state_manager.rs +++ b/packages/talos_agent/src/agent/state_manager.rs @@ -337,6 +337,7 @@ mod tests { snapshot: 1_u64, writeset: Vec::::new(), statemap: None, + on_commit: None, }, }, Arc::new(Box::new(tx_answer)), @@ -668,6 +669,7 @@ mod tests { snapshot: 0, writeset: Vec::::new(), statemap: None, + on_commit: None, }, }, }; diff --git a/packages/talos_agent/src/api.rs b/packages/talos_agent/src/api.rs index f05cc22b..7eb86439 100644 --- a/packages/talos_agent/src/api.rs +++ b/packages/talos_agent/src/api.rs @@ -21,6 +21,7 @@ pub struct CandidateData { pub snapshot: u64, pub writeset: Vec, pub statemap: Option, + pub on_commit: Option>, } /// The data input from client to agent diff --git a/packages/talos_agent/src/messaging/api.rs b/packages/talos_agent/src/messaging/api.rs index 6bb98d24..9ef8d77f 100644 --- a/packages/talos_agent/src/messaging/api.rs +++ b/packages/talos_agent/src/messaging/api.rs @@ -2,6 +2,7 @@ use crate::api::{CandidateData, StateMap, TalosType}; use crate::messaging::errors::MessagingError; use async_trait::async_trait; use serde::{Deserialize, Serialize}; +use serde_json::Value; use strum::{Display, EnumString}; pub static HEADER_MESSAGE_TYPE: &str = "messageType"; @@ -27,6 +28,8 @@ pub struct CandidateMessage { pub writeset: Vec, #[serde(skip_serializing_if = "Option::is_none")] pub statemap: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub 
on_commit: Option>, pub published_at: i128, } @@ -41,6 +44,7 @@ impl CandidateMessage { snapshot: candidate.snapshot, writeset: candidate.writeset, statemap: candidate.statemap, + on_commit: candidate.on_commit, published_at, } } @@ -139,6 +143,7 @@ mod tests { snapshot: 1_u64, writeset: vec!["1".to_string()], statemap: None, + on_commit: None, }, 0, ); diff --git a/packages/talos_agent/src/messaging/kafka.rs b/packages/talos_agent/src/messaging/kafka.rs index 2b87fed8..6123afd3 100644 --- a/packages/talos_agent/src/messaging/kafka.rs +++ b/packages/talos_agent/src/messaging/kafka.rs @@ -332,6 +332,7 @@ mod tests_publisher { snapshot: 2_u64, writeset: vec!["1".to_string()], statemap: None, + on_commit: None, published_at: 0, }) .unwrap();