Skip to content

Streaming

Real-time response as events arrive. The stream() method returns a StreamHandle.

Fluent Callbacks with .on() and .done()

Register typed callbacks, then consume with .done():

ts
import {
  Claude,
  EVENT_TEXT,
  EVENT_TOOL_USE,
  EVENT_RESULT,
  EVENT_ERROR,
  EVENT_SYSTEM,
} from '@scottwalker/claude-connector'

const claude = new Claude()

const result = await claude.stream('Refactor the auth module')
  .on(EVENT_TEXT, (text) => {
    process.stdout.write(text)
  })
  .on(EVENT_TOOL_USE, (event) => {
    console.log(`\n[Tool: ${event.toolName}]`)
    console.log(JSON.stringify(event.toolInput, null, 2))
  })
  .on(EVENT_RESULT, (event) => {
    console.log(`\nDone in ${event.durationMs}ms`)
    console.log(`Tokens: ${event.usage.inputTokens}→${event.usage.outputTokens}`)
  })
  .on(EVENT_ERROR, (event) => {
    console.error(`Error: ${event.message}`)
  })
  .on(EVENT_SYSTEM, (event) => {
    console.log(`[System/${event.subtype}]`, event.data)
  })
  .done()

// result is StreamResultEvent — available after stream completes
console.log(`Session: ${result.sessionId}`)
console.log(`Cost: $${result.cost}`)

Collect All Text with .text()

One-liner to get the full text response:

ts
const text = await claude.stream('Summarize README.md').text()
console.log(text)

With callbacks still firing:

ts
import { Claude, EVENT_TOOL_USE } from '@scottwalker/claude-connector'

const text = await claude.stream('Summarize README.md')
  .on(EVENT_TOOL_USE, (e) => console.log(`[${e.toolName}]`))
  .text()

Pipe to stdout with .pipe()

Pipe text directly to any writable. Returns the result when done:

ts
// Pipe to stdout
const result = await claude.stream('Explain the auth flow').pipe(process.stdout)
console.log(`\nCost: $${result.cost}`)

// Pipe to stderr
await claude.stream('Find bugs').pipe(process.stderr)

// Pipe to any object with .write()
const chunks: string[] = []
await claude.stream('Analyze').pipe({
  write(chunk: string) { chunks.push(chunk) },
})

Async Iteration (for await)

StreamHandle implements AsyncIterable<StreamEvent> — use for await for full control:

ts
import {
  Claude,
  EVENT_TEXT,
  EVENT_TOOL_USE,
  EVENT_RESULT,
  EVENT_ERROR,
  EVENT_SYSTEM,
} from '@scottwalker/claude-connector'

const claude = new Claude()

for await (const event of claude.stream('Analyze the codebase')) {
  switch (event.type) {
    case EVENT_TEXT:
      process.stdout.write(event.text)
      break
    case EVENT_TOOL_USE:
      console.log(`[${event.toolName}]`, event.toolInput)
      break
    case EVENT_RESULT:
      console.log(`\nTokens: ${event.usage.inputTokens}→${event.usage.outputTokens}`)
      break
    case EVENT_ERROR:
      console.error(event.message)
      break
    case EVENT_SYSTEM:
      // system events (init, stderr, etc.)
      break
  }
}

Collect Stream into a String

ts
import { Claude, EVENT_TEXT } from '@scottwalker/claude-connector'

const claude = new Claude()

let fullText = ''

for await (const event of claude.stream('Summarize README.md')) {
  if (event.type === EVENT_TEXT) fullText += event.text
}

console.log(fullText)

Stream Events Reference

EventConstantCallback SignatureDescription
textEVENT_TEXT(text: string)Incremental text chunk
tool_useEVENT_TOOL_USE(event: { toolName, toolInput })Tool invocation
resultEVENT_RESULT(event: { text, sessionId, usage, cost, durationMs })Final result (always last)
errorEVENT_ERROR(event: { message, code? })Error during execution
systemEVENT_SYSTEM(event: { subtype, data })System/internal event

Progress Tracking

ts
import { EVENT_TEXT, EVENT_TOOL_USE } from '@scottwalker/claude-connector'

let charCount = 0
let toolCount = 0

const result = await claude.stream('Rewrite the test suite')
  .on(EVENT_TEXT, (text) => {
    charCount += text.length
    process.stdout.write(text)
  })
  .on(EVENT_TOOL_USE, (event) => {
    toolCount++
    process.stderr.write(`\r[Tools used: ${toolCount}] ${event.toolName}`)
  })
  .done()

console.log(`\nOutput: ${charCount} chars, ${toolCount} tools, ${result.durationMs}ms`)

Tool Activity Logger

ts
import { EVENT_TEXT, EVENT_TOOL_USE } from '@scottwalker/claude-connector'

const tools: Array<{ name: string; timestamp: number }> = []
const startTime = Date.now()

const result = await claude.stream('Fix all TypeScript errors in src/')
  .on(EVENT_TEXT, (text) => process.stdout.write(text))
  .on(EVENT_TOOL_USE, (event) => {
    tools.push({ name: event.toolName, timestamp: Date.now() - startTime })
  })
  .done()

console.log('\n\nTool timeline:')
for (const t of tools) {
  console.log(`  +${t.timestamp}ms  ${t.name}`)
}

Token Budget Monitoring

ts
import { EVENT_TEXT, EVENT_RESULT } from '@scottwalker/claude-connector'

const MAX_COST = 1.00 // $1 limit

const result = await claude.stream('Analyze the entire repo', { maxBudget: MAX_COST })
  .on(EVENT_TEXT, (text) => process.stdout.write(text))
  .on(EVENT_RESULT, (event) => {
    const pct = ((event.cost ?? 0) / MAX_COST * 100).toFixed(1)
    console.log(`\nBudget: $${event.cost} / $${MAX_COST} (${pct}%)`)
  })
  .done()

Timeout and Abort

ts
import { Claude, EVENT_TEXT } from '@scottwalker/claude-connector'

const claude = new Claude({ useSdk: false })

// Abort after 30 seconds
const timer = setTimeout(() => claude.abort(), 30_000)

try {
  const result = await claude.stream('Analyze everything')
    .on(EVENT_TEXT, (t) => process.stdout.write(t))
    .done()

  clearTimeout(timer)
  console.log(`\nCompleted in ${result.durationMs}ms`)
} catch (err) {
  console.log('\nAborted or failed:', (err as Error).message)
}

Task Events

Track subagent lifecycle with task_started, task_progress, and task_notification events:

ts
import {
  Claude,
  EVENT_TEXT,
  EVENT_TASK_STARTED,
  EVENT_TASK_PROGRESS,
  EVENT_TASK_NOTIFICATION,
} from '@scottwalker/claude-connector'

const claude = new Claude()

const result = await claude.stream('Refactor the entire src/ directory')
  .on(EVENT_TEXT, (text) => process.stdout.write(text))
  .on(EVENT_TASK_STARTED, (event) => {
    console.log(`\n[Task started: ${event.description}] id=${event.taskId}`)
  })
  .on(EVENT_TASK_PROGRESS, (event) => {
    console.log(`\n[Task ${event.taskId}] ${event.description}`)
    console.log(`  Tokens: ${event.usage.totalTokens}, Tools: ${event.usage.toolUses}`)
    if (event.summary) console.log(`  Summary: ${event.summary}`)
  })
  .on(EVENT_TASK_NOTIFICATION, (event) => {
    console.log(`\n[Task ${event.taskId} ${event.status}] ${event.summary}`)
  })
  .done()

Per-Query Abort with signal

Cancel a specific stream without affecting other queries:

ts
import { Claude, EVENT_TEXT } from '@scottwalker/claude-connector'

const claude = new Claude()
const controller = new AbortController()

// Abort after 15 seconds
setTimeout(() => controller.abort(), 15_000)

try {
  const result = await claude.stream('Analyze the full codebase', {
    signal: controller.signal,
  })
    .on(EVENT_TEXT, (t) => process.stdout.write(t))
    .done()
} catch (err) {
  console.log('\nStream aborted')
}

Parallel Streams

Run multiple streams simultaneously:

ts
const claude = new Claude({ useSdk: false })

const streams = [
  claude.stream('Review src/auth.ts').text(),
  claude.stream('Review src/db.ts').text(),
  claude.stream('Review src/api.ts').text(),
]

const [auth, db, api] = await Promise.all(streams)

console.log('Auth review:', auth.slice(0, 100))
console.log('DB review:', db.slice(0, 100))
console.log('API review:', api.slice(0, 100))

Released under the MIT License.