Skip to content

Commit

Permalink
skipping repost,follow,like
Browse files Browse the repository at this point in the history
  • Loading branch information
Bossett committed Sep 3, 2024
1 parent a5c17d1 commit 830da13
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 2 deletions.
5 changes: 5 additions & 0 deletions src/firehoseWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,11 @@ export default async function firehoseWatcher() {
seq: seq,
timeout: env.limits.MAX_FIREHOSE_DELAY,
maxPending: 10000,
ignoreTypes: [
'app.bsky.feed.repost',
'app.bsky.graph.follow',
'app.bsky.feed.like',
],
})

for await (const commit of firehose) {
Expand Down
22 changes: 20 additions & 2 deletions src/lib/firehoseIterable/firehoseIterable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { Commit } from '@atproto/api/dist/client/types/com/atproto/sync/subscrib
import { Subscription } from '@atproto/xrpc-server'

import { parseCBORandCar } from '@/lib/firehoseIterable/parseCBORandCar.js'
import { record } from 'zod'

export default class FirehoseIterable {
private commitQueue: Denque<Commit> = new Denque()
Expand All @@ -15,21 +16,25 @@ export default class FirehoseIterable {
private timeout: number
private seq: number
private maxPending: number
private ignoreTypes: Set<string>

async create({
service,
seq,
timeout,
maxPending,
ignoreTypes,
}: {
service?: string
seq?: number
timeout?: number
maxPending?: number
ignoreTypes?: string[]
} = {}) {
this.service = service || 'wss://bsky.network'
this.timeout = timeout || 10000
this.maxPending = maxPending || 10000
this.ignoreTypes = new Set(ignoreTypes || [])

if (seq && Number.isSafeInteger(seq)) this.seq = seq
else this.seq = 0
Expand Down Expand Up @@ -86,10 +91,23 @@ export default class FirehoseIterable {

do {
while (!this.commitQueue.isEmpty()) {
const commit: any = this.commitQueue.shift()
const commit: Commit | undefined = this.commitQueue.shift()
if (commit === undefined) continue

const now = Date.now()

if (commit === undefined) break
let interestingCommit = true

if (Array.isArray(commit.ops)) {
for (const op of commit.ops) {
if (this.ignoreTypes.has(op.path.split('/')[0])) {
interestingCommit = false
break
}
}
}

if (!interestingCommit) continue

if (!shouldWait && now - this.lastCommitTime > timeout) {
logger.error(`no events received for ${Math.floor(timeout / 1000)}s`)
Expand Down

0 comments on commit 830da13

Please sign in to comment.