Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable cohort to pass on_commit messages for messenger #94

Merged
merged 4 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions cohort_banking_initiator_js/src/banking-app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export class BankingApp {
private pond: Pond,
private database: Pool,
private queue: BroadcastChannel,
private onFinishListener: (appRef: BankingApp) => any) {}
private onFinishListener: (appRef: BankingApp) => any) { }

async init() {
this.initiator = await Initiator.init(sdkConfig)
Expand Down Expand Up @@ -133,6 +133,22 @@ export class BankingApp {
writeset: [tx.from, tx.to],
readvers: state.items.map(i => i.version),
statemaps: [{ "TRANSFER": tx }],
onCommit: {
publish: {
kafka: [
{
topic: "test.transfer.feedback.js",
fmarek-kindred marked this conversation as resolved.
Show resolved Hide resolved
value: {
"from_account": tx.from,
"to_account": tx.to,
"amount": tx.amount
}
},
]
}
}

// TODO: GK - Add on commit action
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this supported? If we support action why dont we add them to interface too?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good point, messenger is written in a way that it can accept any valid json, but will only process the supported types.

But on the other end, it would be nice if the sender is made aware on what they can send. Let me think on it and see how I can add this in at the cohort end. Thank for pointing this out!

},
snapshot: state.snapshotVersion,
timeoutMs: 0,
Expand All @@ -143,14 +159,16 @@ export class BankingApp {
let cnn: PoolClient
try {
cnn = await this.database.connect()
const result = await cnn.query({ name: "get-state", text:
`SELECT
const result = await cnn.query({
name: "get-state", text:
`SELECT
ba."number" as "id", ba."version" as "version", cs."version" AS snapshot_version
FROM
bank_accounts ba, cohort_snapshot cs
WHERE
ba."number" = $1 OR ba."number" = $2`,
values: [tx.from, tx.to] }
values: [tx.from, tx.to]
}
)

if (result.rowCount != 2) {
Expand All @@ -163,7 +181,7 @@ export class BankingApp {
} catch (e) {
// This print here is important, without it the original reason is lost when using NAPI 2.10.
logger.error("BankingApp.loadState(): %s", e)
throw new Error(`Unable to load state for tx: ${ JSON.stringify(tx) }. Reason: ${e.message}`, { cause: e })
throw new Error(`Unable to load state for tx: ${JSON.stringify(tx)}. Reason: ${e.message}`, { cause: e })
} finally {
cnn?.release()
}
Expand Down Expand Up @@ -219,7 +237,7 @@ export class BankingApp {
} catch (e) {
// This print here is important, without it the original reason is lost when using NAPI 2.10.
logger.error("BankingApp.installOutOfOrder(): %s", e)
throw new Error(`Unable to complete out of order installation of tx: ${ JSON.stringify(tx) }`, { cause: e })
throw new Error(`Unable to complete out of order installation of tx: ${JSON.stringify(tx)}`, { cause: e })
} finally {
cnn?.release()
}
Expand Down
1 change: 1 addition & 0 deletions examples/agent_client/examples/agent_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ impl Generator<CertificationRequest> for RequestGenerator {
snapshot: 5,
writeset: Vec::from(["3".to_string()]),
statemap: None,
on_commit: None,
};

CertificationRequest {
Expand Down
20 changes: 19 additions & 1 deletion packages/cohort_banking/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use opentelemetry_api::{
global,
metrics::{Counter, Unit},
};
use serde_json::json;
use talos_agent::messaging::api::Decision;

use crate::{
Expand Down Expand Up @@ -64,19 +65,35 @@ impl BankingApp {
#[async_trait]
impl Handler<TransferRequest> for BankingApp {
async fn handle(&self, request: TransferRequest) -> Result<(), String> {
log::debug!("processig new banking transfer request: {:?}", request);
log::debug!("processing new banking transfer request: {:?}", request);

let statemap = vec![HashMap::from([(
BusinessActionType::TRANSFER.to_string(),
TransferRequest::new(request.from.clone(), request.to.clone(), request.amount).json(),
)])];

let on_commit_value = json!({
"publish": {
"kafka": [
{
"topic": "test.transfer.feedback",
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hardcoded topic as it's just the example app, and to minimise refactor.

"value": {
"from_account": request.from,
"to_account": request.to,
"amount": request.amount
}
},
],
}
});

let certification_request = CertificationRequest {
timeout_ms: 0,
candidate: CandidateData {
readset: vec![request.from.clone(), request.to.clone()],
writeset: vec![request.from, request.to],
statemap: Some(statemap),
on_commit: Some(on_commit_value),
},
};

Expand All @@ -85,6 +102,7 @@ impl Handler<TransferRequest> for BankingApp {
database: Arc::clone(&self.database),
single_query_strategy,
};

let request_payload_callback = || state_provider.get_certification_candidate(certification_request.clone());

let oo_inst = OutOfOrderInstallerImpl {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ impl CertificationCandidateProviderImpl {
writeset: request.candidate.writeset,
statemaps: request.candidate.statemap,
readvers: state.items.into_iter().map(|x| x.version).collect(),
on_commit: request.candidate.on_commit.map(|x| x.into()),
};

Ok(CertificationCandidateCallbackResponse::Proceed(CertificationRequestPayload {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ impl QueueProcessor {
threads: u64,
item_handler: Arc<H>,
) -> Vec<JoinHandle<()>> {
let item_handler = Arc::new(item_handler);
let mut tasks = Vec::<JoinHandle<()>>::new();

for thread_number in 1..=threads {
Expand Down
3 changes: 2 additions & 1 deletion packages/cohort_banking/src/model/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ pub struct CandidateData {
pub readset: Vec<String>,
pub writeset: Vec<String>,
pub statemap: Option<Vec<HashMap<String, Value>>>,
// The "snapshot" is intentionally messing here. We will compute it ourselves before feeding this data to Talos
// The "snapshot" is intentionally missing here. We will compute it ourselves before feeding this data to Talos
pub on_commit: Option<Value>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

#[derive(Clone)]
Expand Down
1 change: 1 addition & 0 deletions packages/cohort_sdk/src/cohort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ impl Cohort {
writeset: request.candidate.writeset,
readvers,
snapshot,
on_commit: request.candidate.on_commit,
},
timeout: if request.timeout_ms > 0 {
Some(Duration::from_millis(request.timeout_ms))
Expand Down
1 change: 1 addition & 0 deletions packages/cohort_sdk/src/model/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub struct CertificationCandidate {
pub writeset: Vec<String>,
pub readvers: Vec<u64>,
pub statemaps: Option<Vec<HashMap<String, Value>>>,
pub on_commit: Option<Box<Value>>,
}

#[derive(Debug, Clone)]
Expand Down
2 changes: 2 additions & 0 deletions packages/cohort_sdk_js/src/initiator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ pub struct JsCertificationCandidate {
pub writeset: Vec<String>,
pub readvers: Vec<i64>,
pub statemaps: Option<Vec<HashMap<String, Value>>>,
pub on_commit: Option<Value>,
}

impl From<JsCertificationCandidate> for CertificationCandidate {
Expand All @@ -82,6 +83,7 @@ impl From<JsCertificationCandidate> for CertificationCandidate {
writeset: val.writeset,
readvers: val.readvers.iter().map(|v| *v as u64).collect(),
statemaps: val.statemaps,
on_commit: val.on_commit.map(Box::new),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions packages/talos_agent/src/agent/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ mod tests {
snapshot: 1_u64,
writeset: Vec::<String>::new(),
statemap: None,
on_commit: None,
}
}

Expand Down
1 change: 1 addition & 0 deletions packages/talos_agent/src/agent/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ mod tests {
snapshot: 0,
writeset: vec![String::from("1"), String::from("2"), String::from("3")],
statemap: None,
on_commit: None,
},
timeout: Some(Duration::from_secs(1)),
},
Expand Down
2 changes: 2 additions & 0 deletions packages/talos_agent/src/agent/state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ mod tests {
snapshot: 1_u64,
writeset: Vec::<String>::new(),
statemap: None,
on_commit: None,
},
},
Arc::new(Box::new(tx_answer)),
Expand Down Expand Up @@ -668,6 +669,7 @@ mod tests {
snapshot: 0,
writeset: Vec::<String>::new(),
statemap: None,
on_commit: None,
},
},
};
Expand Down
1 change: 1 addition & 0 deletions packages/talos_agent/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub struct CandidateData {
pub snapshot: u64,
pub writeset: Vec<String>,
pub statemap: Option<StateMap>,
pub on_commit: Option<Box<Value>>,
}

/// The data input from client to agent
Expand Down
5 changes: 5 additions & 0 deletions packages/talos_agent/src/messaging/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::api::{CandidateData, StateMap, TalosType};
use crate::messaging::errors::MessagingError;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use strum::{Display, EnumString};

pub static HEADER_MESSAGE_TYPE: &str = "messageType";
Expand All @@ -27,6 +28,8 @@ pub struct CandidateMessage {
pub writeset: Vec<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub statemap: Option<StateMap>,
#[serde(skip_serializing_if = "Option::is_none")]
pub on_commit: Option<Box<Value>>,
pub published_at: i128,
}

Expand All @@ -41,6 +44,7 @@ impl CandidateMessage {
snapshot: candidate.snapshot,
writeset: candidate.writeset,
statemap: candidate.statemap,
on_commit: candidate.on_commit,
published_at,
}
}
Expand Down Expand Up @@ -139,6 +143,7 @@ mod tests {
snapshot: 1_u64,
writeset: vec!["1".to_string()],
statemap: None,
on_commit: None,
},
0,
);
Expand Down
1 change: 1 addition & 0 deletions packages/talos_agent/src/messaging/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ mod tests_publisher {
snapshot: 2_u64,
writeset: vec!["1".to_string()],
statemap: None,
on_commit: None,
published_at: 0,
})
.unwrap();
Expand Down