diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..12d40dc --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "editor.defaultFormatter": "vscode.typescript-language-features" +} \ No newline at end of file diff --git a/package.json b/package.json index a288646..a381298 100644 --- a/package.json +++ b/package.json @@ -12,9 +12,10 @@ "dependencies": { "@solana-program/compute-budget": "^0.5.0", "@solana-program/system": "^0.5.0", - "@solana/promises": "2.0.0-canary-20241004222707", - "@solana/transaction-confirmation": "2.0.0-canary-20241004222707", - "@solana/web3.js": "2.0.0-canary-20241004222707", + "@solana/promises": "2.0.0-canary-20241023093758", + "@solana/signers": "2.0.0-canary-20241023093758", + "@solana/transaction-confirmation": "2.0.0-canary-20241023093758", + "@solana/web3.js": "2.0.0-canary-20241023093758", "axios": "^1.7.3", "bs58": "^6.0.0", "dotenv": "^16.4.5", @@ -22,37 +23,37 @@ "undici": "^6.19.5" }, "overrides": { - "@solana/accounts": "2.0.0-canary-20241004222707", - "@solana/addresses": "2.0.0-canary-20241004222707", - "@solana/assertions": "2.0.0-canary-20241004222707", - "@solana/codecs": "2.0.0-canary-20241004222707", - "@solana/codecs-core": "2.0.0-canary-20241004222707", - "@solana/codecs-data-structures": "2.0.0-canary-20241004222707", - "@solana/codecs-numbers": "2.0.0-canary-20241004222707", - "@solana/codecs-strings": "2.0.0-canary-20241004222707", - "@solana/errors": "2.0.0-canary-20241004222707", - "@solana/fast-stable-stringify": "2.0.0-canary-20241004222707", - "@solana/functional": "2.0.0-canary-20241004222707", - "@solana/instructions": "2.0.0-canary-20241004222707", - "@solana/keys": "2.0.0-canary-20241004222707", - "@solana/programs": "2.0.0-canary-20241004222707", - "@solana/rpc": "2.0.0-canary-20241004222707", - "@solana/rpc-api": "2.0.0-canary-20241004222707", - "@solana/rpc-parsed-types": "2.0.0-canary-20241004222707", - "@solana/rpc-spec": "2.0.0-canary-20241004222707", - "@solana/rpc-spec-types": "2.0.0-canary-20241004222707", - "@solana/rpc-subscriptions": "2.0.0-canary-20241004222707", - "@solana/rpc-subscriptions-api": "2.0.0-canary-20241004222707", - "@solana/rpc-subscriptions-channel-websocket": "2.0.0-canary-20241004222707", - "@solana/rpc-subscriptions-spec": "2.0.0-canary-20241004222707", - "@solana/rpc-subscribable": "2.0.0-canary-20241004222707", - "@solana/rpc-transformers": "2.0.0-canary-20241004222707", - "@solana/rpc-transport-http": "2.0.0-canary-20241004222707", - "@solana/rpc-types": "2.0.0-canary-20241004222707", - "@solana/sysvars": "2.0.0-canary-20241004222707", - "@solana/transaction-confirmation": "2.0.0-canary-20241004222707", - "@solana/transaction-messages": "2.0.0-canary-20241004222707", - "@solana/transactions": "2.0.0-canary-20241004222707" + "@solana/accounts": "2.0.0-canary-20241023093758", + "@solana/addresses": "2.0.0-canary-20241023093758", + "@solana/assertions": "2.0.0-canary-20241023093758", + "@solana/codecs": "2.0.0-canary-20241023093758", + "@solana/codecs-core": "2.0.0-canary-20241023093758", + "@solana/codecs-data-structures": "2.0.0-canary-20241023093758", + "@solana/codecs-numbers": "2.0.0-canary-20241023093758", + "@solana/codecs-strings": "2.0.0-canary-20241023093758", + "@solana/errors": "2.0.0-canary-20241023093758", + "@solana/fast-stable-stringify": "2.0.0-canary-20241023093758", + "@solana/functional": "2.0.0-canary-20241023093758", + "@solana/instructions": "2.0.0-canary-20241023093758", + "@solana/keys": "2.0.0-canary-20241023093758", + "@solana/programs": "2.0.0-canary-20241023093758", + "@solana/rpc": "2.0.0-canary-20241023093758", + "@solana/rpc-api": "2.0.0-canary-20241023093758", + "@solana/rpc-parsed-types": "2.0.0-canary-20241023093758", + "@solana/rpc-spec": "2.0.0-canary-20241023093758", + "@solana/rpc-spec-types": "2.0.0-canary-20241023093758", + "@solana/rpc-subscriptions": "2.0.0-canary-20241023093758", + "@solana/rpc-subscriptions-api": "2.0.0-canary-20241023093758", + "@solana/rpc-subscriptions-channel-websocket": "2.0.0-canary-20241023093758", + "@solana/rpc-subscriptions-spec": "2.0.0-canary-20241023093758", + "@solana/rpc-subscribable": "2.0.0-canary-20241023093758", + "@solana/rpc-transformers": "2.0.0-canary-20241023093758", + "@solana/rpc-transport-http": "2.0.0-canary-20241023093758", + "@solana/rpc-types": "2.0.0-canary-20241023093758", + "@solana/sysvars": "2.0.0-canary-20241023093758", + "@solana/transaction-confirmation": "2.0.0-canary-20241023093758", + "@solana/transaction-messages": "2.0.0-canary-20241023093758", + "@solana/transactions": "2.0.0-canary-20241023093758" }, "devDependencies": { "@types/node": "^20.12.7" diff --git a/ping-thing-client.mjs b/ping-thing-client.mjs index 19d1971..4334e50 100644 --- a/ping-thing-client.mjs +++ b/ping-thing-client.mjs @@ -1,35 +1,38 @@ import { - createDefaultRpcTransport, createTransactionMessage, pipe, - setTransactionMessageFeePayer, + setTransactionMessageFeePayerSigner, setTransactionMessageLifetimeUsingBlockhash, createKeyPairFromBytes, - getAddressFromPublicKey, createSignerFromKeyPair, - signTransaction, appendTransactionMessageInstructions, - sendTransactionWithoutConfirmingFactory, - createSolanaRpcSubscriptions_UNSTABLE, + signTransactionMessageWithSigners, + SOLANA_ERROR__TRANSACTION_ERROR__BLOCKHASH_NOT_FOUND, + isSolanaError, getSignatureFromTransaction, - compileTransaction, - createSolanaRpc, - SOLANA_ERROR__TRANSACTION_ERROR__BLOCKHASH_NOT_FOUND + sendTransactionWithoutConfirmingFactory, + SOLANA_ERROR__BLOCK_HEIGHT_EXCEEDED, // Address, } from "@solana/web3.js"; import dotenv from "dotenv"; import bs58 from "bs58"; import { getSetComputeUnitLimitInstruction } from "@solana-program/compute-budget"; import { getTransferSolInstruction } from "@solana-program/system"; -import { createRecentSignatureConfirmationPromiseFactory } from "@solana/transaction-confirmation"; import { sleep } from "./utils/misc.mjs"; -import { watchBlockhash } from "./utils/blockhash.mjs"; -import { watchSlotSent } from "./utils/slot.mjs"; +import { getLatestBlockhash } from "./utils/blockhash.mjs"; +import { rpc, rpcSubscriptions } from "./utils/rpc.mjs"; +import { getNextSlot } from "./utils/slot.mjs"; import { setMaxListeners } from "events"; import axios from "axios"; +import { createBlockHeightExceedencePromiseFactory, createRecentSignatureConfirmationPromiseFactory } from "@solana/transaction-confirmation"; +import { safeRace } from '@solana/promises'; dotenv.config(); +function safeJSONStringify(val) { + return JSON.stringify(val, (_, val) => typeof val === 'bigint' ? val.toString() : val); +} + const orignalConsoleLog = console.log; console.log = function (...message) { const dateTime = new Date().toUTCString(); @@ -42,10 +45,6 @@ process.on("SIGINT", function () { process.exit(); }); - -const RPC_ENDPOINT = process.env.RPC_ENDPOINT; -const WS_ENDPOINT = process.env.WS_ENDPOINT; - const SLEEP_MS_RPC = process.env.SLEEP_MS_RPC || 2000; const SLEEP_MS_LOOP = process.env.SLEEP_MS_LOOP || 0; const VA_API_KEY = process.env.VA_API_KEY; @@ -56,19 +55,21 @@ const SKIP_VALIDATORS_APP = process.env.SKIP_VALIDATORS_APP || false; if (VERBOSE_LOG) console.log(`Starting script`); -const connection = createSolanaRpc(RPC_ENDPOINT) - -const rpcSubscriptions = createSolanaRpcSubscriptions_UNSTABLE( - WS_ENDPOINT -); - let USER_KEYPAIR; const TX_RETRY_INTERVAL = 2000; -const gBlockhash = { value: null, updated_at: 0, lastValidBlockHeight: 0 }; +setMaxListeners(100); + +const mConfirmRecentSignature = createRecentSignatureConfirmationPromiseFactory({ + rpc, + rpcSubscriptions, +}); +const mThrowOnBlockheightExceedence = createBlockHeightExceedencePromiseFactory({ + rpc, + rpcSubscriptions, +}); +const mSendTransactionWithoutConfirming = sendTransactionWithoutConfirmingFactory({ rpc }); -// Record new slot on `firstShredReceived` -const gSlotSent = { value: null, updated_at: 0 }; async function pingThing() { USER_KEYPAIR = await createKeyPairFromBytes( bs58.decode(process.env.WALLET_PRIVATE_KEYPAIR) @@ -81,114 +82,94 @@ async function pingThing() { const MAX_TRIES = 3; let tryCount = 0; - const feePayer = await getAddressFromPublicKey(USER_KEYPAIR.publicKey); const signer = await createSignerFromKeyPair(USER_KEYPAIR); - + const BASE_TRANSACTION_MESSAGE = pipe( + createTransactionMessage({ version: 0 }), + (tx) => setTransactionMessageFeePayerSigner(signer, tx), + (tx) => + appendTransactionMessageInstructions( + [ + getSetComputeUnitLimitInstruction({ + units: 500, + }), + getTransferSolInstruction({ + source: signer, + destination: signer.address, + amount: 5000, + }), + ], + tx + ) + ); while (true) { await sleep(SLEEP_MS_LOOP); - let blockhash; - let lastValidBlockHeight; let slotSent; let slotLanded; let signature; let txStart; let txSendAttempts = 1; - // Wait fresh data - while (true) { - if ( - Date.now() - gBlockhash.updated_at < 10000 && - Date.now() - gSlotSent.updated_at < 50 - ) { - blockhash = gBlockhash.value; - lastValidBlockHeight = gBlockhash.lastValidBlockHeight; - slotSent = gSlotSent.value; - break; - } - - await sleep(1); - } try { + const pingAbortController = new AbortController(); try { - const transaction = pipe( - createTransactionMessage({ version: 0 }), - (tx) => setTransactionMessageFeePayer(feePayer, tx), - (tx) => - setTransactionMessageLifetimeUsingBlockhash( - { - blockhash: gBlockhash.value, - lastValidBlockHeight: gBlockhash.lastValidBlockHeight, - }, - tx - ), - (tx) => - appendTransactionMessageInstructions( - [ - getSetComputeUnitLimitInstruction({ - units: 500, - }), - getTransferSolInstruction({ - source: signer, - destination: feePayer, - amount: 5000, - }), - ], - tx - ) - ); - const transactionSignedWithFeePayer = await signTransaction( - [USER_KEYPAIR], - compileTransaction(transaction) + const latestBlockhash = await getLatestBlockhash(); + const transactionMessage = setTransactionMessageLifetimeUsingBlockhash( + latestBlockhash, + BASE_TRANSACTION_MESSAGE ); + const transactionSignedWithFeePayer = + await signTransactionMessageWithSigners(transactionMessage); signature = getSignatureFromTransaction(transactionSignedWithFeePayer); - txStart = Date.now(); - console.log(`Sending ${signature}`); - const mSendTransaction = sendTransactionWithoutConfirmingFactory({ - rpc: connection, + let rejectSendLoop; + const sendLoopPromise = new Promise((_, reject) => { + rejectSendLoop = reject; }); - - const getRecentSignatureConfirmationPromise = - createRecentSignatureConfirmationPromiseFactory({ - rpc: connection, - rpcSubscriptions, + let sendAbortController; + function sendTransaction() { + sendAbortController = new AbortController(); + mSendTransactionWithoutConfirming(transactionSignedWithFeePayer, { + abortSignal: sendAbortController.signal, + commitment: COMMITMENT_LEVEL, + maxRetries: 0n, + skipPreflight: true, + }).catch(e => { + if (e instanceof Error && e.name === 'AbortError') { + return; + } else { + rejectSendLoop(e); + } }); - setMaxListeners(100); - const abortController = new AbortController(); - - while (true) { - try { - await mSendTransaction(transactionSignedWithFeePayer, { - commitment: "confirmed", - maxRetries: 0n, - skipPreflight: true, - }); - - await Promise.race([ - getRecentSignatureConfirmationPromise({ - signature, - commitment: "confirmed", - abortSignal: abortController.signal, - }), - sleep(TX_RETRY_INTERVAL * txSendAttempts).then(() => { - throw new Error("Tx Send Timeout"); - }), - ]); - - console.log(`Confirmed tx ${signature}`); - - break; - } catch (e) { - console.log( - `Tx not confirmed after ${ - TX_RETRY_INTERVAL * txSendAttempts++ - }ms, resending` - ); - } } + slotSent = await getNextSlot(); + const sendRetryInterval = setInterval(() => { + sendAbortController.abort(); + console.log(`Tx not confirmed after ${TX_RETRY_INTERVAL * txSendAttempts++}ms, resending`); + sendTransaction(); + }, TX_RETRY_INTERVAL); + pingAbortController.signal.addEventListener('abort', () => { + clearInterval(sendRetryInterval); + sendAbortController.abort(); + }); + txStart = Date.now(); + sendTransaction(); + await safeRace([ + mConfirmRecentSignature({ + abortSignal: pingAbortController.signal, + commitment: COMMITMENT_LEVEL, + signature, + }), + mThrowOnBlockheightExceedence({ + abortSignal: pingAbortController.signal, + commitment: COMMITMENT_LEVEL, + lastValidBlockHeight: latestBlockhash.lastValidBlockHeight, + }), + sendLoopPromise, + ]); + console.log(`Confirmed tx ${signature}`); } catch (e) { // Log and loop if we get a bad blockhash. if (isSolanaError(e, SOLANA_ERROR__TRANSACTION_ERROR__BLOCKHASH_NOT_FOUND)) { @@ -199,7 +180,7 @@ async function pingThing() { // If the transaction expired on the chain. Make a log entry and send // to VA. Otherwise log and loop. - if (e.name === "TransactionExpiredBlockheightExceededError") { + if (isSolanaError(e, SOLANA_ERROR__BLOCK_HEIGHT_EXCEEDED)) { console.log( `ERROR: Blockhash expired/block height exceeded. TX failure sent to VA.` ); @@ -207,12 +188,14 @@ async function pingThing() { console.log(`ERROR: ${e.name}`); console.log(e.message); console.log(e); - console.log(JSON.stringify(e)); + console.log(safeJSONStringify(e)); continue; } // Need to submit a fake signature to pass the import filters signature = FAKE_SIGNATURE; + } finally { + pingAbortController.abort(); } const txEnd = Date.now(); @@ -220,7 +203,7 @@ async function pingThing() { await sleep(SLEEP_MS_RPC); if (signature !== FAKE_SIGNATURE) { // Capture the slotLanded - let txLanded = await connection + let txLanded = await rpc .getTransaction(signature, { commitment: COMMITMENT_LEVEL, maxSupportedTransactionVersion: 255, @@ -246,7 +229,7 @@ async function pingThing() { } // prepare the payload to send to validators.app - const vAPayload = JSON.stringify({ + const vAPayload = safeJSONStringify({ time: txEnd - txStart, signature, transaction_type: "transfer", @@ -279,9 +262,8 @@ async function pingThing() { if (VERBOSE_LOG) { console.log( - `VA Response ${ - vaResponse.status - } ${JSON.stringify(vaResponse.data)}` + `VA Response ${vaResponse.status + } ${safeJSONStringify(vaResponse.data)}` ); } } @@ -296,8 +278,4 @@ async function pingThing() { } } -await Promise.all([ - watchBlockhash(gBlockhash, connection), - watchSlotSent(gSlotSent, rpcSubscriptions), - pingThing(), -]); +await pingThing(); diff --git a/utils/blockhash.mjs b/utils/blockhash.mjs index 5c88dac..027d7cb 100644 --- a/utils/blockhash.mjs +++ b/utils/blockhash.mjs @@ -1,63 +1,118 @@ -import { sleep } from "./misc.mjs"; +import { safeRace } from "@solana/promises"; +import { address } from "@solana/web3.js"; -const MAX_BLOCKHASH_FETCH_ATTEMPTS = process.env.MAX_BLOCKHASH_FETCH_ATTEMPTS || 5; -let attempts = 0; +import { timeout } from "./misc.mjs"; +import { rpc, rpcSubscriptions } from "./rpc.mjs"; -export const watchBlockhash = async (gBlockhash, connection) => { - // const gBlockhash = { value: null, updated_at: 0 }; - while (true) { - try { - // Use a 5 second timeout to avoid hanging the script - const timeoutPromise = new Promise((_, reject) => - setTimeout( - () => - reject( - new Error( - `${new Date().toISOString()} ERROR: Blockhash fetch operation timed out` - ) - ), - 5000 - ) - ); - // Get the latest blockhash from the RPC node and update the global - // blockhash object with the new value and timestamp. If the RPC node - // fails to respond within 5 seconds, the promise will reject and the - // script will log an error. - const latestBlockhash = await Promise.race([ - connection.getLatestBlockhash().send(), - timeoutPromise, - ]); +const MAX_BLOCKHASH_FETCH_ATTEMPTS = + process.env.MAX_BLOCKHASH_FETCH_ATTEMPTS || 5; +const RECENT_BLOCKHASHES_ADDRESS = address( + "SysvarRecentB1ockHashes11111111111111111111" +); - gBlockhash.value = latestBlockhash.value.blockhash; - gBlockhash.lastValidBlockHeight = latestBlockhash.value.lastValidBlockHeight; +async function getDifferenceBetweenSlotHeightAndBlockHeight() { + const { absoluteSlot, blockHeight } = await rpc + .getEpochInfo() + .send({ abortSignal: AbortSignal.any([]) }); + return absoluteSlot - blockHeight; +} - gBlockhash.updated_at = Date.now(); - attempts = 0; - } catch (error) { - gBlockhash.value = null; - gBlockhash.updated_at = 0; +function getLatestBlockhashesFromNotification( + { + context: { slot }, + value: { + data: { + parsed: { + info: blockhashes, + }, + }, + }, + }, + differenceBetweenSlotHeightAndBlockHeight +) { + return blockhashes.map(({ blockhash }) => ({ + blockhash, + lastValidBlockHeight: + slot - differenceBetweenSlotHeightAndBlockHeight + 150n, + })); +} - ++attempts; +let resolveInitialLatestBlockhashes; +let latestBlockhashesPromise = new Promise((resolve) => { + resolveInitialLatestBlockhashes = resolve; +}); - if (error.message.includes("new blockhash")) { - console.log( - `${new Date().toISOString()} ERROR: Unable to obtain a new blockhash` +let usedBlockhashes = new Set(); + +(async () => { + let attempts = 0; + while (true) { + try { + const [ + differenceBetweenSlotHeightAndBlockHeight, + recentBlockhashesNotifications, + ] = await safeRace([ + Promise.all([ + getDifferenceBetweenSlotHeightAndBlockHeight(), + rpcSubscriptions + .accountNotifications(RECENT_BLOCKHASHES_ADDRESS, { + encoding: "jsonParsed", + }) + .subscribe({ abortSignal: AbortSignal.any([]) }), + ]), + // If the RPC node fails to respond within 5 seconds, throw an error. + timeout(5000), + ]); + // Iterate over the notificatons forever, constantly updating the `latestBlockhashes` cache. + for await (const notification of recentBlockhashesNotifications) { + const nextLatestBlockhashes = getLatestBlockhashesFromNotification( + notification, + differenceBetweenSlotHeightAndBlockHeight + ); + const nextUsedBlockhashes = new Set(); + for (const { blockhash } of nextLatestBlockhashes) { + if (usedBlockhashes.has(blockhash)) { + nextUsedBlockhashes.add(blockhash) + } + } + usedBlockhashes = nextUsedBlockhashes; + attempts = 0; + if (resolveInitialLatestBlockhashes) { + resolveInitialLatestBlockhashes(nextLatestBlockhashes); + resolveInitialLatestBlockhashes = undefined; + } else { + latestBlockhashesPromise = Promise.resolve(nextLatestBlockhashes); + } + } + } catch (e) { + if (e.message === "Timeout") { + console.error( + `${new Date().toISOString()} ERROR: Blockhash fetch operation timed out` ); } else { - console.log(`${new Date().toISOString()} ERROR: ${error.name}`); - console.log(error.message); - console.log(error); - console.log(JSON.stringify(error)); + console.error(e); } - } finally { - if (attempts >= MAX_BLOCKHASH_FETCH_ATTEMPTS) { - console.log( + if (++attempts >= MAX_BLOCKHASH_FETCH_ATTEMPTS) { + console.error( `${new Date().toISOString()} ERROR: Max attempts for fetching blockhash reached, exiting` ); process.exit(0); } } + } +})(); - await sleep(5000); +export async function getLatestBlockhash() { + const latestBlockhashes = await latestBlockhashesPromise; + const latestUnusedBlockhash = latestBlockhashes.find( + ({ blockhash }) => !usedBlockhashes.has(blockhash), + ); + if (!latestUnusedBlockhash) { + console.error( + `${new Date().toISOString()} ERROR: Ran out of unused blockhashes before the subscription could replenish them`, + ); + process.exit(0); } -}; \ No newline at end of file + usedBlockhashes.add(latestUnusedBlockhash.blockhash); + return latestUnusedBlockhash; +} diff --git a/utils/misc.mjs b/utils/misc.mjs index 78fc790..6ca2902 100644 --- a/utils/misc.mjs +++ b/utils/misc.mjs @@ -1,2 +1,7 @@ export const sleep = async (duration) => - await new Promise((resolve) => setTimeout(resolve, duration)); \ No newline at end of file + await new Promise((resolve) => setTimeout(resolve, duration)); + +export const timeout = async (duration) => + await new Promise((_, reject) => setTimeout(() => { + reject(new Error('Timeout')); + }, duration)); \ No newline at end of file diff --git a/utils/rpc.mjs b/utils/rpc.mjs new file mode 100644 index 0000000..6237659 --- /dev/null +++ b/utils/rpc.mjs @@ -0,0 +1,11 @@ +import dotenv from 'dotenv'; +import { createSolanaRpc, createSolanaRpcSubscriptions_UNSTABLE } from "@solana/web3.js"; + +dotenv.config(); + +export const rpc = createSolanaRpc( + process.env.RPC_ENDPOINT, +); +export const rpcSubscriptions = createSolanaRpcSubscriptions_UNSTABLE( + process.env.WS_ENDPOINT, +); \ No newline at end of file diff --git a/utils/slot.mjs b/utils/slot.mjs index 7da87b2..0eca7f0 100644 --- a/utils/slot.mjs +++ b/utils/slot.mjs @@ -1,46 +1,57 @@ -import { sleep } from "./misc.mjs"; +import dotenv from 'dotenv'; +import { createCipheriv } from "crypto"; +import { rpcSubscriptions } from "./rpc.mjs"; -import { createSolanaRpcSubscriptions_UNSTABLE } from "@solana/web3.js"; +dotenv.config(); const MAX_SLOT_FETCH_ATTEMPTS = process.env.MAX_SLOT_FETCH_ATTEMPTS || 100; -let attempts = 0; -const abortController = new AbortController(); -export const watchSlotSent = async (gSlotSent, rpcSubscriptions) => { - try { - const slotNotifications = await rpcSubscriptions - .slotsUpdatesNotifications() - .subscribe({ abortSignal: abortController.signal }); - - for await (const notification of slotNotifications) { - if (notification.type === "firstShredReceived") { - gSlotSent.value = notification.slot; - gSlotSent.updated_at = Date.now(); - attempts = 0; - continue; - } - - gSlotSent.value = null; - gSlotSent.updated_at = 0; - - ++attempts; - - if (attempts >= MAX_SLOT_FETCH_ATTEMPTS) { - console.log( - `ERROR: Max attempts for fetching slot type "firstShredReceived" reached, exiting` - ); - process.exit(0); - } - - // If update not received in last 3s, re-subscribe - if (gSlotSent.value !== null) { - while (Date.now() - gSlotSent.updated_at < 3000) { - await sleep(1); +let resolveSlotPromise; +let nextSlotPromise; + +(async () => { + let attempts = 0; + while (true) { + try { + const slotNotifications = await rpcSubscriptions + .slotsUpdatesNotifications() + .subscribe({ abortSignal: AbortSignal.any([]) }); + for await (const { slot, type } of slotNotifications) { + let nextSlot; + switch (type) { + case 'completed': + nextSlot = slot + 1n; + break; + case 'firstShredReceived': + nextSlot = slot; + break + } + if (nextSlot != null) { + attempts = 0; + if (resolveSlotPromise) { + resolveSlotPromise(nextSlot); + } + resolveSlotPromise = undefined; + nextSlotPromise = undefined; + continue; + } + if (++attempts >= MAX_SLOT_FETCH_ATTEMPTS) { + console.log( + `ERROR: Max attempts for fetching slot type "completed" or "firstShredReceived" reached, exiting` + ); + process.exit(0); } } + } catch (e) { + console.log(`ERROR: ${e}`); + ++attempts; } - } catch (e) { - console.log(`ERROR: ${e}`); - ++attempts; } -}; +})(); + +export async function getNextSlot() { + nextSlotPromise ||= new Promise((resolve) => { + resolveSlotPromise = resolve + }); + return await nextSlotPromise; +}