Skip to content

Commit

Permalink
without queue
Browse files Browse the repository at this point in the history
  • Loading branch information
Bossett committed Sep 5, 2024
1 parent 13cbafd commit c744a6a
Showing 1 changed file with 3 additions and 26 deletions.
29 changes: 3 additions & 26 deletions src/util/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,7 @@ import {
} from '../lexicon/types/com/atproto/sync/subscribeRepos'
import { Database } from '../db'

import { Mutex } from 'async-mutex'

const includedRecords = new Set(['app.bsky.feed.post'])
let runningEvents = 0
const mutex = new Mutex()
const delay = (ms: number) => new Promise((res) => setTimeout(res, ms))

export abstract class FirehoseSubscriptionBase {
public sub: Subscription<RepoEvent>
Expand All @@ -38,8 +33,6 @@ export abstract class FirehoseSubscriptionBase {
abstract handleEvent(evt: RepoEvent): Promise<void>

async run(subscriptionReconnectDelay: number) {
await mutex.runExclusive(() => (runningEvents = 0))

try {
for await (const evt of this.sub) {
const commit = evt as Commit
Expand All @@ -49,25 +42,9 @@ export abstract class FirehoseSubscriptionBase {
const [collection] = commit.ops[0].path.split('/')

if (includedRecords.has(collection)) {
while (runningEvents > 256) {
await delay(10)
}

await mutex.runExclusive(async () => {
runningEvents++
})

this.handleEvent(evt)
.catch((err) => {
console.log(`err in handleEvent ${err}`)
})
.finally(async () => {
await mutex.runExclusive(async () => {
if (runningEvents > 0) runningEvents--
})
})

// no longer awaiting this
this.handleEvent(evt).catch((err) => {
console.log(`err in handleEvent ${err}`)
}) // no longer awaiting this
}
// update stored cursor every 1000 events or so
if (isCommit(evt) && evt.seq % 1000 === 0) {
Expand Down

0 comments on commit c744a6a

Please sign in to comment.