Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable cohort to pass on_commit messages for messenger #94

Merged
merged 4 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions cohort_banking_initiator_js/src/banking-app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export class BankingApp {
private pond: Pond,
private database: Pool,
private queue: BroadcastChannel,
private onFinishListener: (appRef: BankingApp) => any) {}
private onFinishListener: (appRef: BankingApp) => any) { }

async init() {
this.initiator = await Initiator.init(sdkConfig)
Expand Down Expand Up @@ -133,6 +133,21 @@ export class BankingApp {
writeset: [tx.from, tx.to],
readvers: state.items.map(i => i.version),
statemaps: [{ "TRANSFER": tx }],
onCommit: {
publish: {
kafka: [
{
topic: "test.transfer.feedback.js",
fmarek-kindred marked this conversation as resolved.
Show resolved Hide resolved
value: {
"from_account": tx.from,
"to_account": tx.to,
"amount": tx.amount
}
},
]
}
}

},
snapshot: state.snapshotVersion,
timeoutMs: 0,
Expand All @@ -143,14 +158,16 @@ export class BankingApp {
let cnn: PoolClient
try {
cnn = await this.database.connect()
const result = await cnn.query({ name: "get-state", text:
`SELECT
const result = await cnn.query({
name: "get-state", text:
`SELECT
ba."number" as "id", ba."version" as "version", cs."version" AS snapshot_version
FROM
bank_accounts ba, cohort_snapshot cs
WHERE
ba."number" = $1 OR ba."number" = $2`,
values: [tx.from, tx.to] }
values: [tx.from, tx.to]
}
)

if (result.rowCount != 2) {
Expand All @@ -163,7 +180,7 @@ export class BankingApp {
} catch (e) {
// This print here is important, without it the original reason is lost when using NAPI 2.10.
logger.error("BankingApp.loadState(): %s", e)
throw new Error(`Unable to load state for tx: ${ JSON.stringify(tx) }. Reason: ${e.message}`, { cause: e })
throw new Error(`Unable to load state for tx: ${JSON.stringify(tx)}. Reason: ${e.message}`, { cause: e })
} finally {
cnn?.release()
}
Expand Down Expand Up @@ -219,7 +236,7 @@ export class BankingApp {
} catch (e) {
// This print here is important, without it the original reason is lost when using NAPI 2.10.
logger.error("BankingApp.installOutOfOrder(): %s", e)
throw new Error(`Unable to complete out of order installation of tx: ${ JSON.stringify(tx) }`, { cause: e })
throw new Error(`Unable to complete out of order installation of tx: ${JSON.stringify(tx)}`, { cause: e })
} finally {
cnn?.release()
}
Expand Down
32 changes: 29 additions & 3 deletions cohort_sdk_client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,31 @@ export interface JsCertificationCandidate {
writeset: Array<string>
readvers: Array<number>
statemaps?: Array<Record<string, any>>
onCommit?: JsCandidateOnCommitActions
}

export interface JsKafkaAction {
cluster?: string
/** Topic to publish the payload */
topic: string
/** Key encoding to be used. Defaults to `text/plain`. */
keyEncoding?: string
/** Key for the message to publish. */
key?: string
/** Optional if the message should be published to a specific partition. */
partition?: number
/** Optional headers while publishing. */
headers?: Record<string, string>
/** Key encoding to be used. Defaults to `application/json`. */
valueEncoding?: string
/** Payload to publish. */
value: any
}
export interface JsCandidateOnCommitPublishActions {
kafka: Array<JsKafkaAction>
}
export interface JsCandidateOnCommitActions {
publish?: JsCandidateOnCommitPublishActions
}
```

Expand All @@ -113,10 +138,11 @@ export interface JsCertificationCandidate {
Before SDK can issue a certification request to Talos Certifier it needs some details from you. You will have to query your local database to fetch the following:
1. Identifiers and version numbers of all objects involved in your transaction. These are known as `readset`, `writeset` and `readvers`.
2. The copy of your transaction as one serializable object. It makes sense to describe your transaction as JSON object and serialise it to string. This is known as `statemap`.
3. Any additional message to be published for candidate requests with committed decision outcome, can added to `onCommit` field. Currently the SDK supports only publishing to **Kafka**.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we want to say "can be added to" ?


Above mentioned reads, writes and statemap fields together are known as certification candidate details. You may ask whether statemap is optional? Indeed, as you are passing the arrow function to `fnOooInstaller` callback you have the context of your request. From the perspective of Initiator app, the answer is "yes, it is optional". However, the statemap will also be received by your Replicator app. Replicator may be implemented as a separate process. Replicator will know what needs to be updated in the database by reading statemap.
Above mentioned reads, writes and statemap fields together are known as certification candidate details. You may ask whether statemap is optional? Indeed, as you are passing the arrow function to `fnOooInstaller` callback you have the context of your request. From the perspective of Initiator app, the answer is "yes, it is optional". However, the statemap will also be received by your Replicator app. Replicator may be implemented as a separate process. Replicator will know what needs to be updated in the database by reading statemap.

Read about `statemap` in the end of this document. See section "About Statemap".
Read about `statemap` in the end of this document. See section [About Statemap](#about-statemap).

### About "JsCertificationRequestPayload"

Expand All @@ -134,7 +160,7 @@ Most likely you will want to retry your request. The SDK implements retry with i

## About "Out of Order Install Callback"

If your business transaction requires a certification from Talos, it is expected that you will not do any changes to objects taking part in your transaction (you will not update database records) until the decision is received from Talos. Only after certification decision is received you will proceed with business transaction. Typically, this is going to be some database update, for example, you will update balances of relevant bank accounts, hence "transfer" money between them. This step is done inside "Out of Order Install Callback". SDK will invoke this callback only when Talos approved your transaction, in other words, when Talos checks that there are no conflicting requests to update your objects.
If your business transaction requires a certification from Talos, it is expected that you will not do any changes to objects taking part in your transaction (you will not update database records) until the decision is received from Talos. Only after certification decision is received you will proceed with business transaction. Typically, this is going to be some database update, for example, you will update balances of relevant bank accounts, hence "transfer" money between them. This step is done inside "Out of Order Install Callback". SDK will invoke this callback only when Talos approved your transaction, in other words, when Talos checks that there are no conflicting requests to update your objects.

<em>What is the benefit of having out of order callback if its responsibility overlaps with "Statemap Installer Callback" found in Replicator?
You may wish not to implement this callback and rely on Replicator to do all your DB changes. Just keep in mind that Replicator will do it "later". How much later will depends on the overall load on the replicator and other dependent transactions which are still in-flight. If you did not implement out of order callback then it is possible to finish the call to `let response = await initiator.certify(...)`, have "go ahead" decision from Talos in the response variable, but your DB will not see this change. If, at that point, you returned response to user via HTTP and user went to query DB via another endpoint, it could be that user will not see the change yet (Replicator may still be processing the backlog of other transactions). On the other hand, with out of order callback in place, once the call to `let response = await initiator.certify(...)` finished, your DB is already updated and you may rely on that change in your following logic.
Expand Down
1 change: 1 addition & 0 deletions examples/agent_client/examples/agent_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ impl Generator<CertificationRequest> for RequestGenerator {
snapshot: 5,
writeset: Vec::from(["3".to_string()]),
statemap: None,
on_commit: None,
};

CertificationRequest {
Expand Down
29 changes: 27 additions & 2 deletions packages/cohort_banking/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@ use banking_common::{
};
use cohort_sdk::{
cohort::Cohort,
model::{ClientErrorKind, Config},
model::{
callback::{CandidateOnCommitActions, CandidateOnCommitPublishActions, KafkaAction},
ClientErrorKind, Config,
},
};

use opentelemetry_api::{
global,
metrics::{Counter, Unit},
};
use serde_json::json;
use talos_agent::messaging::api::Decision;

use crate::{
Expand Down Expand Up @@ -64,19 +68,39 @@ impl BankingApp {
#[async_trait]
impl Handler<TransferRequest> for BankingApp {
async fn handle(&self, request: TransferRequest) -> Result<(), String> {
log::debug!("processig new banking transfer request: {:?}", request);
log::debug!("processing new banking transfer request: {:?}", request);

let statemap = vec![HashMap::from([(
BusinessActionType::TRANSFER.to_string(),
TransferRequest::new(request.from.clone(), request.to.clone(), request.amount).json(),
)])];

let on_commit_action = CandidateOnCommitActions {
publish: Some(CandidateOnCommitPublishActions {
kafka: vec![KafkaAction {
headers: None,
key: None,
partition: None,
topic: "test.transfer.feedback".to_string(),
value: json!({
"from_account": request.from,
"to_account": request.to,
"amount": request.amount
}),
cluster: Default::default(),
key_encoding: Default::default(),
value_encoding: Default::default(),
}],
}),
};

let certification_request = CertificationRequest {
timeout_ms: 0,
candidate: CandidateData {
readset: vec![request.from.clone(), request.to.clone()],
writeset: vec![request.from, request.to],
statemap: Some(statemap),
on_commit: Some(on_commit_action),
},
};

Expand All @@ -85,6 +109,7 @@ impl Handler<TransferRequest> for BankingApp {
database: Arc::clone(&self.database),
single_query_strategy,
};

let request_payload_callback = || state_provider.get_certification_candidate(certification_request.clone());

let oo_inst = OutOfOrderInstallerImpl {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ impl CertificationCandidateProviderImpl {
writeset: request.candidate.writeset,
statemaps: request.candidate.statemap,
readvers: state.items.into_iter().map(|x| x.version).collect(),
on_commit: request.candidate.on_commit,
};

Ok(CertificationCandidateCallbackResponse::Proceed(CertificationRequestPayload {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ impl QueueProcessor {
threads: u64,
item_handler: Arc<H>,
) -> Vec<JoinHandle<()>> {
let item_handler = Arc::new(item_handler);
let mut tasks = Vec::<JoinHandle<()>>::new();

for thread_number in 1..=threads {
Expand Down
4 changes: 3 additions & 1 deletion packages/cohort_banking/src/model/requests.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use cohort_sdk::model::callback::CandidateOnCommitActions;
use serde_json::Value;
use std::collections::HashMap;

Expand All @@ -6,7 +7,8 @@ pub struct CandidateData {
pub readset: Vec<String>,
pub writeset: Vec<String>,
pub statemap: Option<Vec<HashMap<String, Value>>>,
// The "snapshot" is intentionally messing here. We will compute it ourselves before feeding this data to Talos
// The "snapshot" is intentionally missing here. We will compute it ourselves before feeding this data to Talos
pub on_commit: Option<CandidateOnCommitActions>,
}

#[derive(Clone)]
Expand Down
7 changes: 7 additions & 0 deletions packages/cohort_sdk/src/cohort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use opentelemetry_api::{
global,
metrics::{Counter, Histogram, Unit},
};
use serde_json::Value;
use talos_agent::{
agent::{
core::{AgentServices, TalosAgentImpl},
Expand Down Expand Up @@ -448,6 +449,11 @@ impl Cohort {
let (snapshot, readvers) = Self::select_snapshot_and_readvers(request.snapshot, request.candidate.readvers);

let xid = uuid::Uuid::new_v4().to_string();
let on_commit: Option<Box<Value>> = match request.candidate.on_commit {
Some(value) => serde_json::to_value(value).ok().map(|x| x.into()),
None => None,
};

let agent_request = CertificationRequest {
message_key: xid.clone(),
candidate: CandidateData {
Expand All @@ -457,6 +463,7 @@ impl Cohort {
writeset: request.candidate.writeset,
readvers,
snapshot,
on_commit,
},
timeout: if request.timeout_ms > 0 {
Some(Duration::from_millis(request.timeout_ms))
Expand Down
43 changes: 43 additions & 0 deletions packages/cohort_sdk/src/model/callback.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_json::Value;

#[derive(Debug, PartialEq)]
Expand All @@ -16,12 +17,54 @@ pub struct CertificationRequestPayload {
pub timeout_ms: u64,
}

fn default_text_plain_encoding() -> String {
"text/plain".to_string()
}

fn default_application_json_encoding() -> String {
"application/json".to_string()
}

#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct KafkaAction {
#[serde(default)]
pub cluster: String,
/// Topic to publish the payload
pub topic: String,
/// Key encoding to be used. Defaults to `text/plain`.
#[serde(default = "default_text_plain_encoding")]
pub key_encoding: String,
/// Key for the message to publish.
pub key: Option<String>,
/// Optional if the message should be published to a specific partition.
pub partition: Option<i32>,
/// Optional headers while publishing.
pub headers: Option<HashMap<String, String>>,
/// Key encoding to be used. Defaults to `application/json`.
#[serde(default = "default_application_json_encoding")]
pub value_encoding: String,
/// Payload to publish.
pub value: serde_json::Value,
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct CandidateOnCommitPublishActions {
pub kafka: Vec<KafkaAction>,
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct CandidateOnCommitActions {
pub publish: Option<CandidateOnCommitPublishActions>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct CertificationCandidate {
pub readset: Vec<String>,
pub writeset: Vec<String>,
pub readvers: Vec<u64>,
pub statemaps: Option<Vec<HashMap<String, Value>>>,
pub on_commit: Option<CandidateOnCommitActions>,
}

#[derive(Debug, Clone)]
Expand Down
Loading