Skip to content

Commit

Permalink
add back websocket support as env var toggle
Browse files Browse the repository at this point in the history
  • Loading branch information
wphan committed Nov 27, 2023
1 parent 4380a3d commit 666488e
Showing 1 changed file with 62 additions and 27 deletions.
89 changes: 62 additions & 27 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import {
UserStatsMap,
DLOBSubscriber,
BulkAccountLoader,
DriftClientSubscriptionConfig,
SlotSubscriber,
SlotSource,
} from '@drift-labs/sdk';

import { logger, setLogLevel } from './utils/logger';
Expand Down Expand Up @@ -53,6 +56,7 @@ const sdkConfig = initialize({ env: process.env.ENV });
const stateCommitment: Commitment = 'processed';
const serverPort = process.env.PORT || 6969;
export const ORDERBOOK_UPDATE_INTERVAL = 1000;
const useWebsocket = process.env.USE_WEBSOCKET?.toLowerCase() === 'true';

const rateLimitCallsPerSecond = process.env.RATE_LIMIT_CALLS_PER_SECOND
? parseInt(process.env.RATE_LIMIT_CALLS_PER_SECOND)
Expand Down Expand Up @@ -117,6 +121,7 @@ const endpoint = process.env.ENDPOINT;
const wsEndpoint = process.env.WS_ENDPOINT;
logger.info(`RPC endpoint: ${endpoint}`);
logger.info(`WS endpoint: ${wsEndpoint}`);
logger.info(`useWebsocket: ${useWebsocket}`);
logger.info(`DriftEnv: ${driftEnv}`);
logger.info(`Commit: ${commitHash}`);

Expand Down Expand Up @@ -171,62 +176,92 @@ const main = async () => {
commitment: stateCommitment,
});

const bulkAccountLoader = new BulkAccountLoader(
connection,
stateCommitment,
ORDERBOOK_UPDATE_INTERVAL
);
// only set when polling
let bulkAccountLoader: BulkAccountLoader | undefined;

// only set when using websockets
let slotSubscriber: SlotSubscriber | undefined;

let accountSubscription: DriftClientSubscriptionConfig;
let slotSource: SlotSource;

if (!useWebsocket) {
bulkAccountLoader = new BulkAccountLoader(
connection,
stateCommitment,
ORDERBOOK_UPDATE_INTERVAL
);

accountSubscription = {
type: 'polling',
accountLoader: bulkAccountLoader,
};
slotSource = {
getSlot: () => bulkAccountLoader!.getSlot(),
};
} else {
accountSubscription = {
type: 'websocket',
commitment: stateCommitment,
};
slotSubscriber = new SlotSubscriber(connection);
await slotSubscriber.subscribe();
slotSource = {
getSlot: () => slotSubscriber!.getSlot(),
};
}

driftClient = new DriftClient({
connection,
wallet,
programID: clearingHousePublicKey,
accountSubscription: {
type: 'polling',
accountLoader: bulkAccountLoader,
},
accountSubscription,
env: driftEnv,
userStats: true,
});

const dlobCoder = DLOBOrdersCoder.create();

const lamportsBalance = await connection.getBalance(wallet.publicKey);
logger.info(
`DriftClient ProgramId: ${driftClient.program.programId.toBase58()}`
);
logger.info(`Wallet pubkey: ${wallet.publicKey.toBase58()}`);
logger.info(` . SOL balance: ${lamportsBalance / 10 ** 9}`);

await driftClient.subscribe();
driftClient.eventEmitter.on('error', (e) => {
logger.info('clearing house error');
logger.error(e);
});

setInterval(async () => {
lastSlotReceived = bulkAccountLoader.getSlot();
lastSlotReceived = slotSource.getSlot();
}, ORDERBOOK_UPDATE_INTERVAL);

logger.info(`Initializing userMap...`);
const initUserMapStart = Date.now();
const userMap = new UserMap(
driftClient,
driftClient.userAccountSubscriptionConfig,
false
);
await userMap.subscribe();
const userStatsMap = new UserStatsMap(driftClient, {
type: 'polling',
accountLoader: new BulkAccountLoader(connection, stateCommitment, 0),
});
logger.info(`userMap initialized in ${Date.now() - initUserMapStart} ms`);

logger.info(`Initializing userStatsMap...`);
const initUserStatsMapStart = Date.now();
const userStatsMap = new UserStatsMap(driftClient, accountSubscription);
await userStatsMap.subscribe();
logger.info(
`userStatsMap initialized in ${Date.now() - initUserStatsMapStart} ms`
);

logger.info(`Initializing DLOBSubscriber...`);
const initDlobSubscriberStart = Date.now();
const dlobSubscriber = new DLOBSubscriber({
driftClient,
dlobSource: userMap,
slotSource: bulkAccountLoader,
slotSource,
updateFrequency: ORDERBOOK_UPDATE_INTERVAL,
});
await dlobSubscriber.subscribe();
logger.info(
`DLOBSubscriber initialized in ${Date.now() - initDlobSubscriberStart} ms`
);

MARKET_SUBSCRIBERS = await initializeAllMarketSubscribers(driftClient);

Expand All @@ -253,7 +288,7 @@ const main = async () => {
// object with userAccount key and orders object serialized
const orders: Array<any> = [];
const oracles: Array<any> = [];
const slot = bulkAccountLoader.getSlot();
const slot = slotSource.getSlot();

for (const market of driftClient.getPerpMarketAccounts()) {
const oracle = driftClient.getOracleDataForPerpMarket(
Expand Down Expand Up @@ -297,7 +332,7 @@ const main = async () => {
app.get('/orders/json', async (_req, res, next) => {
try {
// object with userAccount key and orders object serialized
const slot = bulkAccountLoader.getSlot();
const slot = slotSource.getSlot();
const orders: Array<any> = [];
const oracles: Array<any> = [];
for (const market of driftClient.getPerpMarketAccounts()) {
Expand Down Expand Up @@ -460,7 +495,7 @@ const main = async () => {

res.end(
JSON.stringify({
slot: bulkAccountLoader.getSlot(),
slot: slotSource.getSlot(),
data: dlobCoder.encode(dlobOrders).toString('base64'),
})
);
Expand Down Expand Up @@ -549,7 +584,7 @@ const main = async () => {
.getDLOB()
.getRestingLimitBids(
normedMarketIndex,
bulkAccountLoader.getSlot(),
slotSource.getSlot(),
normedMarketType,
oracle
)
Expand All @@ -560,7 +595,7 @@ const main = async () => {
.getDLOB()
.getRestingLimitAsks(
normedMarketIndex,
bulkAccountLoader.getSlot(),
slotSource.getSlot(),
normedMarketType,
oracle
)
Expand Down

0 comments on commit 666488e

Please sign in to comment.