All files / MonkeeWrench/src/app/api/stream route.ts

100% Statements 39/39
83.33% Branches 5/6
71.42% Functions 5/7
100% Lines 39/39

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 471x 1x   1x 2x     2x   2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x   2x 2x 1x 1x 1x 2x 2x 1x 1x 1x 1x 2x   2x   2x 2x 2x 2x 2x 2x 2x 2x  
import {EVENTS, bus} from '@/lib/events'
import {requireSession} from '@/lib/guard'
 
/**
 * Server-Sent Events endpoint that relays proposal events from the in-process
 * `bus` to the connected client.
 *
 * Each event is written as one SSE frame: `data: {"type": ..., "payload": ...}`
 * followed by a blank line. A `hello` frame is sent as soon as the stream
 * starts, then one frame per `EVENTS.PROPOSAL_UPDATED` /
 * `EVENTS.PROPOSAL_CREATED` emission.
 *
 * `requireSession()` is awaited before anything is subscribed.
 * NOTE(review): presumably it rejects when there is no valid session; confirm
 * in '@/lib/guard'.
 *
 * @returns A streaming `Response` with `text/event-stream` headers.
 */
export const GET = async () => {
  await requireSession()

  // TextEncoder is stateless, so one instance serves every frame of this stream.
  const encoder = new TextEncoder()

  // Set by start(); detaches this stream's listeners from the bus.
  // Reset to undefined once it has run so repeated calls are no-ops.
  let cleanup: (() => void) | undefined

  const stream = new ReadableStream<Uint8Array>({
    start(controller: ReadableStreamDefaultController<Uint8Array>) {
      const send = (type: string, payload: unknown) => {
        // Serialise outside the try block: a payload that cannot be
        // stringified still surfaces to the emitter, exactly as before.
        const frame = encoder.encode(
          `data: ${JSON.stringify({type, payload})}\n\n`,
        )
        try {
          controller.enqueue(frame)
        } catch {
          // enqueue throws once the stream is closed or errored. Unsubscribe
          // so the dead stream stops receiving events, rather than letting
          // the error escape into whoever emitted on the bus.
          cleanup?.()
        }
      }
      const onUpdate = (p: unknown) => send(EVENTS.PROPOSAL_UPDATED, p)
      const onCreate = (p: unknown) => send(EVENTS.PROPOSAL_CREATED, p)

      bus.on(EVENTS.PROPOSAL_UPDATED, onUpdate)
      bus.on(EVENTS.PROPOSAL_CREATED, onCreate)

      // Register the teardown BEFORE the first send so that a failure of the
      // initial tick can already unsubscribe the listeners added above.
      cleanup = () => {
        cleanup = undefined
        bus.off(EVENTS.PROPOSAL_UPDATED, onUpdate)
        bus.off(EVENTS.PROPOSAL_CREATED, onCreate)
      }

      // Initial tick
      send('hello', {})
    },
    cancel() {
      // Runs when the consumer cancels the stream (e.g. the client goes away).
      try {
        cleanup?.()
      } catch {
        // Best-effort teardown: a failure to unsubscribe must not turn the
        // cancellation itself into an error.
      }
    },
  })

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache, no-transform',
      Connection: 'keep-alive',
    },
  })
}