Skip to content

Commit

Permalink
Merge pull request #37 from drift-labs/grpc-dlob
Browse files Browse the repository at this point in the history
grpc order subscriber
  • Loading branch information
NourAlharithi authored Dec 7, 2023
2 parents 368ced8 + 4ea2888 commit a6dd602
Show file tree
Hide file tree
Showing 6 changed files with 519 additions and 8 deletions.
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"main": "lib/index.js",
"license": "Apache-2.0",
"dependencies": {
"@coral-xyz/anchor": "^0.29.0",
"@drift-labs/sdk": "2.49.0-beta.12",
"@opentelemetry/api": "^1.1.0",
"@opentelemetry/auto-instrumentations-node": "^0.31.1",
Expand All @@ -13,6 +14,7 @@
"@project-serum/anchor": "^0.19.1-beta.1",
"@project-serum/serum": "^0.13.65",
"@solana/web3.js": "^1.73.3",
"@triton-one/yellowstone-grpc": "^0.3.0",
"@types/redis": "^4.0.11",
"@types/ws": "^8.5.8",
"async-mutex": "^0.4.0",
Expand Down
47 changes: 44 additions & 3 deletions src/dlobProvider.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { DLOB, OrderSubscriber, UserAccount, UserMap } from '@drift-labs/sdk';
import { PublicKey } from '@solana/web3.js';
import { GeyserOrderSubscriber } from './grpc/OrderSubscriberGRPC';

export type DLOBProvider = {
subscribe(): Promise<void>;
Expand Down Expand Up @@ -56,11 +57,51 @@ export function getDLOBProviderFromOrderSubscriber(
return await orderSubscriber.getDLOB(slot);
},
getUniqueAuthorities: () => {
const authorities = new Set<PublicKey>();
const authorities = new Set<string>();
for (const { userAccount } of orderSubscriber.usersAccounts.values()) {
authorities.add(userAccount.authority);
authorities.add(userAccount.authority.toBase58());
}
return Array.from(authorities.values());
const pubkeys = Array.from(authorities).map((a) => new PublicKey(a));
return pubkeys;
},
getUserAccounts: function* () {
for (const [
key,
{ userAccount },
] of orderSubscriber.usersAccounts.entries()) {
yield { userAccount: userAccount, publicKey: new PublicKey(key) };
}
},
getUserAccount: (publicKey) => {
return orderSubscriber.usersAccounts.get(publicKey.toString())
?.userAccount;
},
size(): number {
return orderSubscriber.usersAccounts.size;
},
fetch() {
return orderSubscriber.fetch();
},
};
}

export function getDLOBProviderFromGrpcOrderSubscriber(
orderSubscriber: GeyserOrderSubscriber
): DLOBProvider {
return {
subscribe: async () => {
await orderSubscriber.subscribe();
},
getDLOB: async (slot: number) => {
return await orderSubscriber.getDLOB(slot);
},
getUniqueAuthorities: () => {
const authorities = new Set<string>();
for (const { userAccount } of orderSubscriber.usersAccounts.values()) {
authorities.add(userAccount.authority.toBase58());
}
const pubkeys = Array.from(authorities).map((a) => new PublicKey(a));
return pubkeys;
},
getUserAccounts: function* () {
for (const [
Expand Down
253 changes: 253 additions & 0 deletions src/grpc/OrderSubscriberGRPC.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
import { BorshAccountsCoder } from '@coral-xyz/anchor';
import { Commitment, PublicKey, RpcResponseAndContext } from '@solana/web3.js';
import { Buffer } from 'buffer';
import Client, {
CommitmentLevel,
SubscribeRequest,
SubscribeUpdate,
} from '@triton-one/yellowstone-grpc';
import {
BN,
DLOB,
DriftClient,
UserAccount,
getUserFilter,
getUserWithOrderFilter,
} from '@drift-labs/sdk';
import { ClientDuplexStream } from '@grpc/grpc-js';

type grpcDlobConfig = {
endpoint: string;
token: string;
};

export class GeyserOrderSubscriber {
usersAccounts = new Map<string, { slot: number; userAccount: UserAccount }>();
commitment: Commitment;
driftClient: DriftClient;
config: grpcDlobConfig;
stream: ClientDuplexStream<SubscribeRequest, SubscribeUpdate>;

fetchPromise?: Promise<void>;
fetchPromiseResolver: () => void;
mostRecentSlot: number;

constructor(driftClient: DriftClient, config: grpcDlobConfig) {
this.driftClient = driftClient;
this.config = config;
}

public async subscribe(): Promise<void> {
const client = new Client(this.config.endpoint, this.config.token);
this.stream = await client.subscribe();
const request: SubscribeRequest = {
slots: {},
accounts: {
drift: {
owner: [this.driftClient.program.programId.toBase58()],
filters: [
{
memcmp: {
offset: '0',
bytes: BorshAccountsCoder.accountDiscriminator('User'),
},
},
{
memcmp: {
offset: '4350',
bytes: Uint8Array.from([1]),
},
},
],
account: [],
},
},
transactions: {},
blocks: {},
blocksMeta: {},
accountsDataSlice: [],
commitment: CommitmentLevel.PROCESSED,
entry: {},
};

this.stream.on('data', (chunk: any) => {
if (!chunk.account) {
return;
}
const slot = Number(chunk.account.slot);
this.tryUpdateUserAccount(
new PublicKey(chunk.account.account.pubkey).toBase58(),
'grpc',
chunk.account.account.data,
slot
);
});

return new Promise<void>((resolve, reject) => {
this.stream.write(request, (err) => {
if (err === null || err === undefined) {
resolve();
} else {
reject(err);
}
});
}).catch((reason) => {
console.error(reason);
throw reason;
});
}

async fetch(): Promise<void> {
if (this.fetchPromise) {
return this.fetchPromise;
}

this.fetchPromise = new Promise((resolver) => {
this.fetchPromiseResolver = resolver;
});

try {
const rpcRequestArgs = [
this.driftClient.program.programId.toBase58(),
{
commitment: this.commitment,
filters: [getUserFilter(), getUserWithOrderFilter()],
encoding: 'base64',
withContext: true,
},
];

const rpcJSONResponse: any =
// @ts-ignore
await this.driftClient.connection._rpcRequest(
'getProgramAccounts',
rpcRequestArgs
);

const rpcResponseAndContext: RpcResponseAndContext<
Array<{
pubkey: PublicKey;
account: {
data: [string, string];
};
}>
> = rpcJSONResponse.result;

const slot: number = rpcResponseAndContext.context.slot;

const programAccountSet = new Set<string>();
for (const programAccount of rpcResponseAndContext.value) {
const key = programAccount.pubkey.toString();
programAccountSet.add(key);
this.tryUpdateUserAccount(
key,
'raw',
programAccount.account.data,
slot
);
// give event loop a chance to breathe
await new Promise((resolve) => setTimeout(resolve, 0));
}

for (const key of this.usersAccounts.keys()) {
if (!programAccountSet.has(key)) {
this.usersAccounts.delete(key);
}
// give event loop a chance to breathe
await new Promise((resolve) => setTimeout(resolve, 0));
}
} catch (e) {
console.error(e);
} finally {
this.fetchPromiseResolver();
this.fetchPromise = undefined;
}
}

tryUpdateUserAccount(
key: string,
dataType: 'raw' | 'grpc',
data: string[] | Buffer | UserAccount,
slot: number
): void {
if (!this.mostRecentSlot || slot > this.mostRecentSlot) {
this.mostRecentSlot = slot;
}

const slotAndUserAccount = this.usersAccounts.get(key);
if (!slotAndUserAccount || slotAndUserAccount.slot <= slot) {
// Polling leads to a lot of redundant decoding, so we only decode if data is from a fresh slot
let buffer: Buffer;
if (dataType === 'raw') {
buffer = Buffer.from(data[0], data[1]);
} else {
buffer = data as Buffer;
}

const newLastActiveSlot = new BN(
buffer.subarray(4328, 4328 + 8),
undefined,
'le'
);
if (
slotAndUserAccount &&
slotAndUserAccount.userAccount.lastActiveSlot.gt(newLastActiveSlot)
) {
return;
}

const userAccount =
this.driftClient.program.account.user.coder.accounts.decodeUnchecked(
'User',
buffer
) as UserAccount;

if (userAccount.hasOpenOrder) {
this.usersAccounts.set(key, { slot, userAccount });
} else {
this.usersAccounts.delete(key);
}
}
}

public async getDLOB(slot: number): Promise<DLOB> {
const dlob = new DLOB();
for (const [key, { userAccount }] of this.usersAccounts.entries()) {
const userAccountPubkey = new PublicKey(key);
for (const order of userAccount.orders) {
dlob.insertOrder(order, userAccountPubkey, slot);
}
}
return dlob;
}

public getSlot(): number {
return this.mostRecentSlot ?? 0;
}

public async unsubscribe(): Promise<void> {
return new Promise<void>((resolve, reject) => {
this.stream.write(
{
slots: {},
accounts: {},
transactions: {},
blocks: {},
blocksMeta: {},
accountsDataSlice: [],
entry: {},
},
(err) => {
if (err === null || err === undefined) {
resolve();
} else {
reject(err);
}
}
);
}).catch((reason) => {
console.error(reason);
throw reason;
});
}
}
1 change: 0 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ const WS_FALLBACK_FETCH_INTERVAL = ORDERBOOK_UPDATE_INTERVAL * 10;
const useWebsocket = process.env.USE_WEBSOCKET?.toLowerCase() === 'true';
const useOrderSubscriber =
process.env.USE_ORDER_SUBSCRIBER?.toLowerCase() === 'true';

const rateLimitCallsPerSecond = process.env.RATE_LIMIT_CALLS_PER_SECOND
? parseInt(process.env.RATE_LIMIT_CALLS_PER_SECOND)
: 1;
Expand Down
20 changes: 17 additions & 3 deletions src/publishers/dlobPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@ import { DLOBSubscriberIO } from '../dlob-subscriber/DLOBSubscriberIO';
import { RedisClient } from '../utils/redisClient';
import {
DLOBProvider,
getDLOBProviderFromGrpcOrderSubscriber,
getDLOBProviderFromOrderSubscriber,
getDLOBProviderFromUserMap,
} from '../dlobProvider';
import FEATURE_FLAGS from '../utils/featureFlags';
import { GeyserOrderSubscriber } from '../grpc/OrderSubscriberGRPC';

require('dotenv').config();
const stateCommitment: Commitment = 'processed';
const ORDERBOOK_UPDATE_INTERVAL = 1000;
const ORDERBOOK_UPDATE_INTERVAL = 400;
const WS_FALLBACK_FETCH_INTERVAL = ORDERBOOK_UPDATE_INTERVAL * 10;
const driftEnv = (process.env.ENV || 'devnet') as DriftEnv;
const commitHash = process.env.COMMIT;
Expand All @@ -49,10 +51,15 @@ let driftClient: DriftClient;
const opts = program.opts();
setLogLevel(opts.debug ? 'debug' : 'info');

const endpoint = process.env.ENDPOINT;
const token = process.env.TOKEN;
const endpoint = token
? process.env.ENDPOINT + `/${token}`
: process.env.ENDPOINT;
const wsEndpoint = process.env.WS_ENDPOINT;
const useOrderSubscriber =
process.env.USE_ORDER_SUBSCRIBER?.toLowerCase() === 'true';

const useGrpc = process.env.USE_GRPC?.toLowerCase() === 'true';
const useWebsocket = process.env.USE_WEBSOCKET?.toLowerCase() === 'true';

logger.info(`RPC endpoint: ${endpoint}`);
Expand Down Expand Up @@ -126,7 +133,7 @@ const main = async () => {
bulkAccountLoader = new BulkAccountLoader(
connection,
stateCommitment,
ORDERBOOK_UPDATE_INTERVAL
ORDERBOOK_UPDATE_INTERVAL < 1000 ? 1000 : ORDERBOOK_UPDATE_INTERVAL
);

accountSubscription = {
Expand Down Expand Up @@ -206,6 +213,13 @@ const main = async () => {
slotSource = {
getSlot: () => orderSubscriber.getSlot(),
};
} else if (useGrpc) {
const grpcOrderSubscriber = new GeyserOrderSubscriber(driftClient, {
endpoint: endpoint,
token: token,
});

dlobProvider = getDLOBProviderFromGrpcOrderSubscriber(grpcOrderSubscriber);
} else {
const userMap = new UserMap({
driftClient,
Expand Down
Loading

0 comments on commit a6dd602

Please sign in to comment.