콘텐츠로 이동

프로바이더 스트리밍 내부 구조

이 문서는 @f5-sales-demo/pi-ai에서 토큰/도구 스트리밍이 어떻게 정규화되는지, 그리고 @f5-sales-demo/pi-agent-corecoding-agent 세션 이벤트를 통해 어떻게 전파되는지 설명합니다.

  1. streamSimple() (packages/ai/src/stream.ts)는 일반 옵션을 매핑하고 프로바이더 스트림 함수로 디스패치합니다.
  2. 프로바이더 스트림 함수(anthropic.ts, openai-responses.ts, google.ts)는 프로바이더 네이티브 스트림 이벤트를 통합된 AssistantMessageEvent 시퀀스로 변환합니다.
  3. 각 프로바이더는 이벤트를 AssistantMessageEventStream (packages/ai/src/utils/event-stream.ts)으로 푸시하며, 이는 델타 이벤트를 스로틀링하고 다음을 노출합니다:
    • 증분 업데이트를 위한 비동기 이터레이션
    • 최종 AssistantMessage를 위한 result()
  4. agentLoop (packages/agent/src/agent-loop.ts)는 해당 이벤트를 소비하고, 처리 중인 어시스턴트 상태를 변경하며, 원시 assistantMessageEvent를 담은 message_update 이벤트를 발행합니다.
  5. AgentSession (packages/coding-agent/src/session/agent-session.ts)은 에이전트 이벤트를 구독하고, 메시지를 지속하며, 확장 훅을 구동하고, 세션 동작(재시도, 압축, TTSR, 스트리밍 편집 중단 검사)을 적용합니다.

@f5-sales-demo/pi-ai의 통합 스트림 계약

섹션 제목: “@f5-sales-demo/pi-ai의 통합 스트림 계약”

모든 프로바이더는 동일한 형태(packages/ai/src/types.tsAssistantMessageEvent)로 이벤트를 발행합니다:

  • start
  • 콘텐츠 블록 생명주기 트리플릿:
    • 텍스트: text_starttext_delta* → text_end
    • 사고: thinking_startthinking_delta* → thinking_end
    • 도구 호출: toolcall_starttoolcall_delta* → toolcall_end
  • 터미널 이벤트:
    • done (reason: "stop" | "length" | "toolUse" 포함)
    • 또는 error (reason: "aborted" | "error" 포함)

AssistantMessageEventStream이 보장하는 사항:

  • 최종 결과는 터미널 이벤트(done 또는 error)에 의해 해결됨
  • 델타는 일괄 처리/스로틀링됨 (~50ms)
  • 버퍼링된 델타는 비-델타 이벤트 전 및 완료 전에 플러시됨

AssistantMessageEventStreamtext_delta, thinking_delta, toolcall_delta를 병합 가능한 이벤트로 처리합니다:

  • 버퍼링된 델타는 타입 + contentIndex가 일치할 때만 병합됨
  • 병합은 최신 partial 스냅샷을 유지함
  • 비-델타 이벤트는 즉각적인 플러시를 강제함

이는 TUI/이벤트 소비자를 위해 고빈도 프로바이더 스트림을 부드럽게 처리하지만, 프로바이더 역압력은 아닙니다. 프로바이더는 여전히 전속력으로 생산하며, 로컬 스트림이 버퍼링합니다.

소스: packages/ai/src/providers/anthropic.ts

정규화 포인트:

  • message_start는 사용량(입력/출력/캐시 토큰)을 초기화함
  • content_block_start는 텍스트/사고/도구 호출 시작으로 매핑됨
  • content_block_delta 매핑:
    • text_deltatext_delta
    • thinking_deltathinking_delta
    • input_json_deltatoolcall_delta
    • signature_deltathinkingSignature만 업데이트함 (이벤트 없음)
  • content_block_stop은 대응하는 *_end를 발행함
  • message_delta.stop_reasonmapStopReason()을 통해 매핑됨

도구 호출 인수 스트리밍:

  • 각 도구 블록은 내부 partialJson을 보유함
  • 모든 JSON 델타는 partialJson에 추가됨
  • argumentsparseStreamingJson()을 통해 각 델타마다 재파싱됨
  • toolcall_end는 한 번 더 재파싱한 후 partialJson을 제거함

소스: packages/ai/src/providers/openai-responses.ts

정규화 포인트:

  • response.output_item.added는 추론/텍스트/함수 호출 블록을 시작함
  • 추론 요약 이벤트(response.reasoning_summary_text.delta)는 thinking_delta가 됨
  • 출력/거부 델타는 text_delta가 됨
  • response.function_call_arguments.deltatoolcall_delta가 됨
  • response.output_item.donethinking_end / text_end / toolcall_end를 발행함
  • response.completed는 상태를 중지 이유 및 사용량으로 매핑함

도구 호출 인수 스트리밍:

  • Anthropic과 동일한 partialJson 누적 패턴
  • response.function_call_arguments.done만 전송하는 프로바이더도 최종 인수를 채움
  • 도구 호출 ID는 "<call_id>|<item_id>"로 정규화됨

Google Generative AI (google-generative-ai)

섹션 제목: “Google Generative AI (google-generative-ai)”

소스: packages/ai/src/providers/google.ts

정규화 포인트:

  • candidate.content.parts를 반복함
  • 텍스트 파트는 isThinkingPart(part)에 의해 사고와 텍스트로 분리됨
  • 블록 전환 시 새 블록을 시작하기 전에 이전 블록을 닫음
  • part.functionCall은 완전한 도구 호출로 처리됨 (start/delta/end가 즉시 발행됨)
  • 완료 이유는 google-shared.tsmapStopReason()에 의해 매핑됨

도구 호출 인수 스트리밍:

  • 함수 호출 인수는 증분 JSON 텍스트가 아닌 구조화된 객체로 도착함
  • 구현은 JSON.stringify(arguments)를 포함하는 하나의 합성 toolcall_delta를 발행함
  • 이 경로에서 Google에는 부분 JSON 파서가 필요하지 않음

부분 도구 호출 JSON 누적 및 복구

섹션 제목: “부분 도구 호출 JSON 누적 및 복구”

Anthropic/OpenAI Responses의 공유 동작은 parseStreamingJson() (packages/ai/src/utils/json-parse.ts)을 사용합니다:

  1. JSON.parse 시도
  2. 불완전한 조각에 대해 partial-json 파서로 폴백
  3. 둘 다 실패하면 {}를 반환

시사점:

  • 잘못된 형식이거나 잘린 인수 델타는 스트림 처리를 즉시 중단하지 않음
  • 처리 중인 arguments는 일시적으로 {}가 될 수 있음
  • 이후의 유효한 델타는 모든 추가 시마다 파싱이 재시도되므로 구조화된 인수를 복구할 수 있음
  • 최종 toolcall_end는 발행 전에 한 번 더 파싱을 시도함

프로바이더 중지 이유는 정규화된 stopReason으로 매핑됩니다:

  • Anthropic: end_turnstop, max_tokenslength, tool_usetoolUse, 안전/거부 케이스→error
  • OpenAI Responses: completedstop, incompletelength, failed/cancellederror
  • Google: STOPstop, MAX_TOKENSlength, 안전/금지/잘못된 함수 호출 클래스→error

오류 의미론은 두 단계로 분리됩니다:

  1. 모델 완료 의미론 (프로바이더가 보고한 완료 이유/상태)
  2. 전송/런타임 실패 (네트워크/클라이언트/파서/중단 예외)

프로바이더 스트림이 오류를 발생시키거나 실패를 신호하면, 각 프로바이더 래퍼는 이를 캐치하고 다음과 함께 터미널 error 이벤트를 발행합니다:

  • 중단 신호가 설정된 경우 stopReason = "aborted"
  • 그렇지 않으면 stopReason = "error"
  • errorMessage = formatErrorMessageWithRetryAfter(error)

잘못된 형식의 청크 / SSE 파싱 실패 동작

섹션 제목: “잘못된 형식의 청크 / SSE 파싱 실패 동작”

이러한 프로바이더 경로에서 청크/SSE 프레이밍은 벤더 SDK 스트림(Anthropic SDK, OpenAI SDK, Google SDK)에 의해 처리됩니다. 이 코드는 여기서 커스텀 SSE 디코더를 구현하지 않습니다.

현재 구현에서 관찰된 동작:

  • SDK 레벨에서의 잘못된 형식의 청크/SSE 파싱은 예외 또는 스트림 error 이벤트로 나타남
  • 프로바이더 래퍼는 이를 통합된 터미널 error 이벤트로 변환함
  • 스트림 함수 내부에 프로바이더 특정 재개/재시도 없음
  • 상위 레벨 재시도는 AgentSession 자동 재시도 로직에서 처리됨 (스트림 청크 재생이 아닌 메시지 레벨 재시도)

취소는 계층화되어 있습니다:

  • AI 프로바이더 요청: options.signal이 프로바이더 클라이언트 스트림 호출에 전달됨.
  • 프로바이더 래퍼: 스트림 루프 이후, 중단된 신호는 오류 경로("Request was aborted")를 강제함.
  • 에이전트 루프: 각 프로바이더 이벤트 처리 전에 signal.aborted를 확인하고 최신 부분에서 중단된 어시스턴트 메시지를 합성할 수 있음.
  • 세션/에이전트 컨트롤: AgentSession.abort() -> agent.abort() -> 공유 중단 컨트롤러 취소.

도구 실행 취소는 모델 스트림 취소와 별개입니다:

  • 도구 러너는 AbortSignal.any([agentSignal, steeringAbortSignal])을 사용함
  • 스티어링 인터럽트는 이미 생성된 도구 결과를 보존하면서 나머지 도구 실행을 중단할 수 있음

프로바이더 SDK 스트림과 다운스트림 소비자 사이에는 하드 역압력 메커니즘이 없습니다:

  • EventStream은 최대 크기 없이 인메모리 큐를 사용함
  • 스로틀링은 UI 업데이트 속도를 줄이지만 프로바이더 수신을 늦추지 않음
  • 소비자가 크게 지연되면 완료될 때까지 대기 중인 이벤트가 증가할 수 있음

현재 설계는 제한된 버퍼 흐름 제어보다 응답성과 단순한 순서를 우선시합니다.

스트림 이벤트가 에이전트/세션 이벤트로 표시되는 방식

섹션 제목: “스트림 이벤트가 에이전트/세션 이벤트로 표시되는 방식”

agentLoop.streamAssistantResponse()AssistantMessageEventAgentEvent로 연결합니다:

  • start 시: 플레이스홀더 어시스턴트 메시지를 푸시하고 message_start를 발행함
  • 블록 이벤트(text_*, thinking_*, toolcall_*) 시: 마지막 어시스턴트 메시지를 업데이트하고 원시 assistantMessageEvent와 함께 message_update를 발행함
  • 터미널(done/error) 시: response.result()에서 최종 메시지를 해결하고 message_end를 발행함

AgentSession은 이후 세션 레벨 동작을 위해 해당 이벤트를 소비합니다:

  • TTSR은 text_deltatoolcall_delta에 대해 message_update.assistantMessageEvent를 감시함
  • 스트리밍 편집 가드는 edit 호출 시 toolcall_delta/toolcall_end를 검사하고 조기에 중단할 수 있음
  • 지속성은 message_end에서 완료된 메시지를 씀
  • 자동 재시도는 어시스턴트 stopReason === "error"errorMessage 휴리스틱을 검사함

통합 (공통 계약):

  • 이벤트 형태 (AssistantMessageEvent)
  • 최종 결과 추출 (done/error)
  • 델타 스로틀링 + 병합 규칙
  • 에이전트/세션 이벤트 전파 모델

프로바이더 특정 (완전히 추상화되지 않음):

  • 업스트림 이벤트 분류 및 매핑 로직
  • 중지 이유 변환 테이블
  • 도구 호출 ID 규칙
  • 추론/사고 블록 의미론 및 서명
  • 사용량 토큰 의미론 및 가용성 타이밍
  • API별 메시지 변환 제약 조건