From 830da13d88ade5d9c7830415fbc42ce5e7c33c0f Mon Sep 17 00:00:00 2001 From: Bossett Date: Tue, 3 Sep 2024 10:32:13 +0000 Subject: [PATCH] skipping repost,follow,like --- src/firehoseWatcher.ts | 5 +++++ src/lib/firehoseIterable/firehoseIterable.ts | 22 ++++++++++++++++++-- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/src/firehoseWatcher.ts b/src/firehoseWatcher.ts index fdeab58..cdd201a 100644 --- a/src/firehoseWatcher.ts +++ b/src/firehoseWatcher.ts @@ -203,6 +203,11 @@ export default async function firehoseWatcher() { seq: seq, timeout: env.limits.MAX_FIREHOSE_DELAY, maxPending: 10000, + ignoreTypes: [ + 'app.bsky.feed.repost', + 'app.bsky.graph.follow', + 'app.bsky.feed.like', + ], }) for await (const commit of firehose) { diff --git a/src/lib/firehoseIterable/firehoseIterable.ts b/src/lib/firehoseIterable/firehoseIterable.ts index d85e6ab..d767fb8 100644 --- a/src/lib/firehoseIterable/firehoseIterable.ts +++ b/src/lib/firehoseIterable/firehoseIterable.ts @@ -7,6 +7,7 @@ import { Commit } from '@atproto/api/dist/client/types/com/atproto/sync/subscrib import { Subscription } from '@atproto/xrpc-server' import { parseCBORandCar } from '@/lib/firehoseIterable/parseCBORandCar.js' +import { record } from 'zod' export default class FirehoseIterable { private commitQueue: Denque = new Denque() @@ -15,21 +16,25 @@ export default class FirehoseIterable { private timeout: number private seq: number private maxPending: number + private ignoreTypes: Set async create({ service, seq, timeout, maxPending, + ignoreTypes, }: { service?: string seq?: number timeout?: number maxPending?: number + ignoreTypes?: string[] } = {}) { this.service = service || 'wss://bsky.network' this.timeout = timeout || 10000 this.maxPending = maxPending || 10000 + this.ignoreTypes = new Set(ignoreTypes || []) if (seq && Number.isSafeInteger(seq)) this.seq = seq else this.seq = 0 @@ -86,10 +91,23 @@ export default class FirehoseIterable { do { while (!this.commitQueue.isEmpty()) { - const commit: any = this.commitQueue.shift() + const commit: Commit | undefined = this.commitQueue.shift() + if (commit === undefined) continue + const now = Date.now() - if (commit === undefined) break + let interestingCommit = true + + if (Array.isArray(commit.ops)) { + for (const op of commit.ops) { + if (this.ignoreTypes.has(op.path.split('/')[0])) { + interestingCommit = false + break + } + } + } + + if (!interestingCommit) continue if (!shouldWait && now - this.lastCommitTime > timeout) { logger.error(`no events received for ${Math.floor(timeout / 1000)}s`)