Skip to content

Commit

Permalink
feat: Initial draft for errors handling between Rust and JS
Browse files Browse the repository at this point in the history
  • Loading branch information
fmarek-kindred committed Sep 18, 2023
1 parent f261ebf commit 8a917ae
Show file tree
Hide file tree
Showing 18 changed files with 2,329 additions and 904 deletions.
1,924 changes: 1,074 additions & 850 deletions cohort_banking_initiator_js/package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions cohort_banking_initiator_js/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
},
"dependencies": {
"@types/pg": "^8.10.2",
"cohort_sdk_client": "file:../cohort_sdk_client",
"cohort_sdk_js": "file:../packages/cohort_sdk_js",
"nodejs-dashboard": "^0.5.1",
"pg": "^8.11.3",
Expand Down
12 changes: 7 additions & 5 deletions cohort_banking_initiator_js/src/banking-app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import { Pond } from "./pond"

import { logger } from "./logger"

import { CapturedItemState, CapturedState, CertificationCandidate, CertificationRequest, TransferRequest, TransferRequestMessage } from "./model"
import { Initiator, JsCertificationRequestPayload, OutOfOrderRequest } from "cohort_sdk_js"
import { CapturedItemState, CapturedState, TransferRequest, TransferRequestMessage } from "./model"
import { SDK_CONFIG as sdkConfig } from "./cfg/config-cohort-sdk"
import { Initiator, JsCertificationRequestPayload, OutOfOrderRequest } from "cohort_sdk_client"

export class BankingApp {
private startedAtMs: number = 0
Expand All @@ -25,13 +25,14 @@ export class BankingApp {

async init() {
this.initiator = await Initiator.init(sdkConfig)

this.queue.onmessage = (event: MessageEvent<TransferRequestMessage>) => {
this.pond.submit(async () => {
try {
const spans = await this.processQueueItem(event)
this.spans.push(spans)
} catch (e) {
logger.error("Failed to process tx: %s", e)
logger.error("%s", e)
}
})
}
Expand Down Expand Up @@ -59,7 +60,7 @@ export class BankingApp {
const subSpans = await this.handleTransaction(event.data.request)
spans.processDetails = subSpans
} catch (e) {
logger.error("Unable to process tx: %s. Error:: %s", JSON.stringify(event.data), e)
logger.error("Unable to process tx: %s. Error: %s", JSON.stringify(event.data), e)
} finally {
this.handledCount++
spans.process = Date.now() - span_s
Expand Down Expand Up @@ -148,7 +149,7 @@ export class BankingApp {
return new CapturedState(Number(result.rows[0].snapshot_version), items)
} catch (e) {
logger.error("BankingApp.loadState(): %s", e)
throw e
throw new Error(`Unable to load state for tx: ${ JSON.stringify(tx) }`, { cause: e })
} finally {
cnn?.release()
}
Expand Down Expand Up @@ -203,6 +204,7 @@ export class BankingApp {

} catch (e) {
logger.error("BankingApp.installOutOfOrder(): %s", e)
throw new Error(`Unable to complete out of order installation of tx: ${ JSON.stringify(tx) }`, { cause: e })
} finally {
cnn?.release()
}
Expand Down
2 changes: 1 addition & 1 deletion cohort_banking_initiator_js/src/cfg/config-cohort-sdk.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { JsInitiatorConfig } from "cohort_sdk_js"
import { JsInitiatorConfig } from "cohort_sdk_client"

const SDK_CONFIG: JsInitiatorConfig = {
backoffOnConflict: {
Expand Down
52 changes: 51 additions & 1 deletion cohort_banking_initiator_js/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { createGeneratorService } from "./load-generator"
import { BankingApp } from "./banking-app"
import { DB_CONFIG } from "./cfg/config-db-pool"
import { Pond } from "./pond"
import { TalosSdkError, SdkErrorKind } from "cohort_sdk_client"

logger.info("App: Cohort JS Application: %d", 111)
logger.info("App: ---------------------")
Expand Down Expand Up @@ -46,6 +47,45 @@ class LaunchParams {
new Promise(async (resolve) => {
const params = LaunchParams.parse(process.argv)

// try {
// new SomeRustServiceClass().testCatchWrapAndThrow()
// } catch (e) {
// console.log(e.message)
// console.log(e)
// console.log(e.cause)
// return
// }

// try {
// new SomeRustServiceClass().example1DirectThrow(100)
// } catch (e) {
// logger.info("- - - - - - - - App caught error - - - - - - - - - - - -")
// if (e instanceof TalosSdkError) {
// if (e.code === 100) {
// logger.error("Caught TalosSdkError with code 100: %s\n%s\n%s", e.message, e, e.cause)
// } else {
// logger.error("Caught TalosSdkError with unexpected code: %s. Error: %s", e.code, e)
// }
// } else {
// logger.error("Caught some generic Error:\ndetails:\n\t%s", e)
// }
// logger.info("- - - - - - - - - - - - - - - - - - - - - - - - - -")
// throw e
// }

// try {
// new SomeRustServiceClass().example2SimulateRustError()
// } catch (e) {
// logger.info("- - - - - - - - App caught error - - - - - - - - - - - -")
// if (e instanceof TalosSdkError) {
// logger.error("Caught TalosSdkError:\n%s\n%s", e.message, e, e.cause)
// } else {
// logger.error("Caught some generic Error:\ndetails:\n\t%s", e)
// }
// logger.info("- - - - - - - - - - - - - - - - - - - - - - - - - -")
// throw e
// }

const database = new Pool(DB_CONFIG)
database.on("error", (e, _) => { logger.error("DBPool.error: Error: %s", e) })
database.on("release", (e, _) => { if (e) { logger.error("DBPool.release: Error: %s", e) } })
Expand All @@ -71,6 +111,16 @@ new Promise(async (resolve) => {
queue,
fnFinish,
)
await app.init()
try {
await app.init()
} catch (e) {
if (e instanceof TalosSdkError) {
const sdkError = e as TalosSdkError
if (sdkError.kind == SdkErrorKind.Messaging) {
logger.error("Unable to connect to kafka....")
}
throw e
}
}
const _worker = createGeneratorService({ channelName: CHANNEL_NAME, count: params.transactionsCount, rate: params.targetRatePerSecond })
})
Loading

0 comments on commit 8a917ae

Please sign in to comment.