Skip to content

Commit

Permalink
feat(cctx): track all outbound txs (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
fadeev authored Aug 26, 2023
1 parent 34bb111 commit d748d5b
Show file tree
Hide file tree
Showing 3 changed files with 797 additions and 706 deletions.
200 changes: 124 additions & 76 deletions helpers/tx.ts
Original file line number Diff line number Diff line change
@@ -1,96 +1,144 @@
import { getEndpoints } from "@zetachain/networks";
import axios from "axios";
import moment from "moment";
import ora from "ora";
import clc from "cli-color";
import Spinnies from "spinnies";
import WebSocket from "ws";

export const trackCCTX = (inboundTxHash: string): Promise<void> => {
const API = getEndpoints("cosmos-http", "zeta_testnet")[0]?.url;
if (API === undefined) {
throw new Error("getEndpoints: API endpoint not found");
}

const WSS = getEndpoints("tendermint-ws", "zeta_testnet")[0]?.url;
if (WSS === undefined) {
throw new Error("getEndpoints: WSS endpoint not found");
const getEndpoint = (key: any): string => {
const endpoint = getEndpoints(key, "zeta_testnet")[0]?.url;
if (!endpoint) {
throw new Error(`getEndpoints: ${key} endpoint not found`);
}
return endpoint;
};

return new Promise((resolve, reject) => {
let cctx_index: string;
let latest_status: string;
const fetchCCTX = async (
hash: string,
spinnies: any,
API: string,
cctxList: any
) => {
try {
const url = `${API}/zeta-chain/crosschain/inTxHashToCctx/${hash}`;
const apiResponse = await axios.get(url);
const res = apiResponse?.data?.inTxHashToCctx?.cctx_index;
res.forEach((cctxHash: any) => {
if (cctxHash && !cctxList[cctxHash]) {
cctxList[cctxHash] = [];
}
if (!spinnies.spinners[`spinner-${cctxHash}`]) {
spinnies.add(`spinner-${cctxHash}`, {
text: `${cctxHash}`,
});
}
});
} catch (error) {}
};

const spinner = ora(
"Looking for the cross-chain transaction (CCTX) on ZetaChain..."
).start();
const SUBSCRIBE_MESSAGE = {
id: 1,
jsonrpc: "2.0",
method: "subscribe",
params: ["tm.event='NewBlock'"],
};

const socket = new WebSocket(WSS);
const fetchCCTXData = async (
cctxHash: string,
spinnies: any,
API: string,
cctxList: any
) => {
const url = `${API}/zeta-chain/crosschain/cctx/${cctxHash}`;
const apiResponse = await axios.get(url);
const cctx = apiResponse?.data?.CrossChainTx;
const tx = {
receiver_chainId: cctx.outbound_tx_params[0].receiver_chainId,
sender_chain_id: cctx.inbound_tx_params.sender_chain_id,
status: cctx.cctx_status.status,
status_message: cctx.cctx_status.status_message,
};
const lastCCTX = cctxList[cctxHash][cctxList[cctxHash].length - 1];
const isEmpty = cctxList[cctxHash].length === 0;
if (isEmpty || lastCCTX?.status !== tx.status) {
cctxList[cctxHash].push(tx);
const sender = cctxList[cctxHash]?.[0].sender_chain_id;
const receiver = cctxList[cctxHash]?.[0].receiver_chainId;
const path = cctxList[cctxHash]
.map(
(x: any) =>
`${clc.bold.underline(x.status)} ${
x.status_message && "(" + x.status_message + ")"
}`
)
.join(" → ");
const text = {
text: `${cctxHash}: ${sender}${receiver}: ${path}`,
};
const id = `spinner-${cctxHash}`;
switch (tx.status) {
case "OutboundMined":
case "Reverted":
spinnies.succeed(id, text);
break;
case "Aborted":
spinnies.fail(id, text);
break;
default:
spinnies.update(id, text);
break;
}
}
};

socket.on("open", () => {
const subscribeMessage = {
id: 1,
jsonrpc: "2.0",
method: "subscribe",
params: ["tm.event='NewBlock'"],
};
socket.send(JSON.stringify(subscribeMessage));
});
export const trackCCTX = async (inboundTxHash: string): Promise<void> => {
const spinnies = new Spinnies();

socket.on("message", async (data: any) => {
const jsonData = JSON.parse(data);
const blockHeight = jsonData?.result?.data?.value?.block?.header?.height;
if (!cctx_index) {
try {
const url = `${API}/zeta-chain/crosschain/inTxHashToCctx/${inboundTxHash}`;
const apiResponse = await axios.get(url);
const res = apiResponse?.data?.inTxHashToCctx?.cctx_index;
if (res) {
cctx_index = res;
spinner.succeed(`CCTX hash found: ${cctx_index}\n`);
spinner.start(`Checking status of the CCTX...`);
}
} catch (error) {}
} else {
try {
const url = `${API}/zeta-chain/crosschain/cctx/${cctx_index}`;
const apiResponse = await axios.get(url);
const cctx = apiResponse?.data?.CrossChainTx;
const finalizedBlock =
cctx?.inbound_tx_params?.inbound_tx_finalized_zeta_height;
const pendingBlocks = blockHeight - finalizedBlock;
const { status, status_message } = cctx.cctx_status;
if (status != latest_status) {
latest_status = status;
spinner.info(`Status updated to "${status}": ${status_message}\n`);
const API = getEndpoint("cosmos-http");
const WSS = getEndpoint("tendermint-ws");

// TODO: disabled until https://github.com/zeta-chain/node/issues/766 is fixed
//
// if (status === "PendingOutbound" && pendingBlocks > 100) {
// const time = moment
// .duration(pendingBlocks * 5, "seconds")
// .humanize();
// spinner.warn(
// `CCTX is pending for too long (${pendingBlocks} blocks, about ${time})\n`
// );
// }
spinner.start(`Checking status of the CCTX...`);
if (/^(Aborted|Reverted|OutboundMined)$/.test(status)) {
socket.close();
spinner.succeed("CCTX has been finalized on ZetaChain");
resolve();
return;
}
}
} catch (error) {}
return new Promise((resolve, reject) => {
let cctxList: any = {};
const socket = new WebSocket(WSS);
socket.on("open", () => socket.send(JSON.stringify(SUBSCRIBE_MESSAGE)));
socket.on("message", async () => {
if (Object.keys(cctxList).length === 0) {
spinnies.add(`search`, {
text: `Looking for cross-chain transactions (CCTXs) on ZetaChain...\n`,
});
await fetchCCTX(inboundTxHash, spinnies, API, cctxList);
}
for (const txHash in cctxList) {
await fetchCCTX(txHash, spinnies, API, cctxList);
}
if (Object.keys(cctxList).length > 0) {
if (spinnies.spinners["search"]) {
spinnies.succeed(`search`, {
text: `CCTXs on ZetaChain found.\n`,
});
}
for (const cctxHash in cctxList) {
try {
fetchCCTXData(cctxHash, spinnies, API, cctxList);
} catch (error) {}
}
}
if (
Object.keys(cctxList).length > 0 &&
Object.keys(cctxList)
.map((c: any) => {
const last = cctxList[c][cctxList[c].length - 1];
return last?.status;
})
.filter((s) => !["OutboundMined", "Aborted", "Reverted"].includes(s))
.length === 0
) {
socket.close();
}
});

socket.on("error", (error: any) => {
spinner.fail(`WebSocket error: ${error}`);
reject(error);
});

socket.on("close", () => {
spinner.stop();
resolve();
});
});
Expand Down
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,16 @@
"bech32": "^2.0.0",
"bip39": "^3.1.0",
"bitcoinjs-lib": "^6.1.3",
"cli-color": "^2.0.3",
"ecpair": "^2.1.0",
"ethers": "^5.4.7",
"form-data": "^4.0.0",
"hardhat": "^2.15.0",
"moment": "^2.29.4",
"ora": "5.4.1",
"spinnies": "^0.5.1",
"readline": "^1.3.0",
"tiny-secp256k1": "^2.2.3",
"ws": "^8.13.0"
}
}
}
Loading

0 comments on commit d748d5b

Please sign in to comment.