- 홈
- Documentation
- 프로바이더
- 프로바이더 스트리밍 내부 구조
프로바이더 스트리밍 내부 구조
이 문서는 @f5-sales-demo/pi-ai에서 토큰/도구 스트리밍이 어떻게 정규화되는지, 그리고 @f5-sales-demo/pi-agent-core 및 coding-agent 세션 이벤트를 통해 어떻게 전파되는지 설명합니다.
엔드-투-엔드 흐름
섹션 제목: “엔드-투-엔드 흐름”streamSimple()(packages/ai/src/stream.ts)는 일반 옵션을 매핑하고 프로바이더 스트림 함수로 디스패치합니다.- 프로바이더 스트림 함수(
anthropic.ts,openai-responses.ts,google.ts)는 프로바이더 네이티브 스트림 이벤트를 통합된AssistantMessageEvent시퀀스로 변환합니다. - 각 프로바이더는 이벤트를
AssistantMessageEventStream(packages/ai/src/utils/event-stream.ts)으로 푸시하며, 이는 델타 이벤트를 스로틀링하고 다음을 노출합니다:- 증분 업데이트를 위한 비동기 이터레이션
- 최종
AssistantMessage를 위한result()
agentLoop(packages/agent/src/agent-loop.ts)는 해당 이벤트를 소비하고, 처리 중인 어시스턴트 상태를 변경하며, 원시assistantMessageEvent를 담은message_update이벤트를 발행합니다.AgentSession(packages/coding-agent/src/session/agent-session.ts)은 에이전트 이벤트를 구독하고, 메시지를 지속하며, 확장 훅을 구동하고, 세션 동작(재시도, 압축, TTSR, 스트리밍 편집 중단 검사)을 적용합니다.
@f5-sales-demo/pi-ai의 통합 스트림 계약
섹션 제목: “@f5-sales-demo/pi-ai의 통합 스트림 계약”모든 프로바이더는 동일한 형태(packages/ai/src/types.ts의 AssistantMessageEvent)로 이벤트를 발행합니다:
start- 콘텐츠 블록 생명주기 트리플릿:
- 텍스트:
text_start→text_delta* →text_end - 사고:
thinking_start→thinking_delta* →thinking_end - 도구 호출:
toolcall_start→toolcall_delta* →toolcall_end
- 텍스트:
- 터미널 이벤트:
done(reason: "stop" | "length" | "toolUse"포함)- 또는
error(reason: "aborted" | "error"포함)
AssistantMessageEventStream이 보장하는 사항:
- 최종 결과는 터미널 이벤트(
done또는error)에 의해 해결됨 - 델타는 일괄 처리/스로틀링됨 (~50ms)
- 버퍼링된 델타는 비-델타 이벤트 전 및 완료 전에 플러시됨
델타 스로틀링 및 조화 동작
섹션 제목: “델타 스로틀링 및 조화 동작”AssistantMessageEventStream은 text_delta, thinking_delta, toolcall_delta를 병합 가능한 이벤트로 처리합니다:
- 버퍼링된 델타는 타입 + contentIndex가 일치할 때만 병합됨
- 병합은 최신
partial스냅샷을 유지함 - 비-델타 이벤트는 즉각적인 플러시를 강제함
이는 TUI/이벤트 소비자를 위해 고빈도 프로바이더 스트림을 부드럽게 처리하지만, 프로바이더 역압력은 아닙니다. 프로바이더는 여전히 전속력으로 생산하며, 로컬 스트림이 버퍼링합니다.
프로바이더 정규화 세부 사항
섹션 제목: “프로바이더 정규화 세부 사항”Anthropic (anthropic-messages)
섹션 제목: “Anthropic (anthropic-messages)”소스: packages/ai/src/providers/anthropic.ts
정규화 포인트:
message_start는 사용량(입력/출력/캐시 토큰)을 초기화함content_block_start는 텍스트/사고/도구 호출 시작으로 매핑됨content_block_delta매핑:text_delta→text_deltathinking_delta→thinking_deltainput_json_delta→toolcall_deltasignature_delta는thinkingSignature만 업데이트함 (이벤트 없음)
content_block_stop은 대응하는*_end를 발행함message_delta.stop_reason은mapStopReason()을 통해 매핑됨
도구 호출 인수 스트리밍:
- 각 도구 블록은 내부
partialJson을 보유함 - 모든 JSON 델타는
partialJson에 추가됨 arguments는parseStreamingJson()을 통해 각 델타마다 재파싱됨toolcall_end는 한 번 더 재파싱한 후partialJson을 제거함
OpenAI Responses (openai-responses)
섹션 제목: “OpenAI Responses (openai-responses)”소스: packages/ai/src/providers/openai-responses.ts
정규화 포인트:
response.output_item.added는 추론/텍스트/함수 호출 블록을 시작함- 추론 요약 이벤트(
response.reasoning_summary_text.delta)는thinking_delta가 됨 - 출력/거부 델타는
text_delta가 됨 response.function_call_arguments.delta는toolcall_delta가 됨response.output_item.done은thinking_end/text_end/toolcall_end를 발행함response.completed는 상태를 중지 이유 및 사용량으로 매핑함
도구 호출 인수 스트리밍:
- Anthropic과 동일한
partialJson누적 패턴 response.function_call_arguments.done만 전송하는 프로바이더도 최종 인수를 채움- 도구 호출 ID는
"<call_id>|<item_id>"로 정규화됨
Google Generative AI (google-generative-ai)
섹션 제목: “Google Generative AI (google-generative-ai)”소스: packages/ai/src/providers/google.ts
정규화 포인트:
candidate.content.parts를 반복함- 텍스트 파트는
isThinkingPart(part)에 의해 사고와 텍스트로 분리됨 - 블록 전환 시 새 블록을 시작하기 전에 이전 블록을 닫음
part.functionCall은 완전한 도구 호출로 처리됨 (start/delta/end가 즉시 발행됨)- 완료 이유는
google-shared.ts의mapStopReason()에 의해 매핑됨
도구 호출 인수 스트리밍:
- 함수 호출 인수는 증분 JSON 텍스트가 아닌 구조화된 객체로 도착함
- 구현은
JSON.stringify(arguments)를 포함하는 하나의 합성toolcall_delta를 발행함 - 이 경로에서 Google에는 부분 JSON 파서가 필요하지 않음
부분 도구 호출 JSON 누적 및 복구
섹션 제목: “부분 도구 호출 JSON 누적 및 복구”Anthropic/OpenAI Responses의 공유 동작은 parseStreamingJson() (packages/ai/src/utils/json-parse.ts)을 사용합니다:
JSON.parse시도- 불완전한 조각에 대해
partial-json파서로 폴백 - 둘 다 실패하면
{}를 반환
시사점:
- 잘못된 형식이거나 잘린 인수 델타는 스트림 처리를 즉시 중단하지 않음
- 처리 중인
arguments는 일시적으로{}가 될 수 있음 - 이후의 유효한 델타는 모든 추가 시마다 파싱이 재시도되므로 구조화된 인수를 복구할 수 있음
- 최종
toolcall_end는 발행 전에 한 번 더 파싱을 시도함
중지 이유 대 전송/런타임 오류
섹션 제목: “중지 이유 대 전송/런타임 오류”프로바이더 중지 이유는 정규화된 stopReason으로 매핑됩니다:
- Anthropic:
end_turn→stop,max_tokens→length,tool_use→toolUse, 안전/거부 케이스→error - OpenAI Responses:
completed→stop,incomplete→length,failed/cancelled→error - Google:
STOP→stop,MAX_TOKENS→length, 안전/금지/잘못된 함수 호출 클래스→error
오류 의미론은 두 단계로 분리됩니다:
- 모델 완료 의미론 (프로바이더가 보고한 완료 이유/상태)
- 전송/런타임 실패 (네트워크/클라이언트/파서/중단 예외)
프로바이더 스트림이 오류를 발생시키거나 실패를 신호하면, 각 프로바이더 래퍼는 이를 캐치하고 다음과 함께 터미널 error 이벤트를 발행합니다:
- 중단 신호가 설정된 경우
stopReason = "aborted" - 그렇지 않으면
stopReason = "error" errorMessage = formatErrorMessageWithRetryAfter(error)
잘못된 형식의 청크 / SSE 파싱 실패 동작
섹션 제목: “잘못된 형식의 청크 / SSE 파싱 실패 동작”이러한 프로바이더 경로에서 청크/SSE 프레이밍은 벤더 SDK 스트림(Anthropic SDK, OpenAI SDK, Google SDK)에 의해 처리됩니다. 이 코드는 여기서 커스텀 SSE 디코더를 구현하지 않습니다.
현재 구현에서 관찰된 동작:
- SDK 레벨에서의 잘못된 형식의 청크/SSE 파싱은 예외 또는 스트림
error이벤트로 나타남 - 프로바이더 래퍼는 이를 통합된 터미널
error이벤트로 변환함 - 스트림 함수 내부에 프로바이더 특정 재개/재시도 없음
- 상위 레벨 재시도는
AgentSession자동 재시도 로직에서 처리됨 (스트림 청크 재생이 아닌 메시지 레벨 재시도)
취소 경계
섹션 제목: “취소 경계”취소는 계층화되어 있습니다:
- AI 프로바이더 요청:
options.signal이 프로바이더 클라이언트 스트림 호출에 전달됨. - 프로바이더 래퍼: 스트림 루프 이후, 중단된 신호는 오류 경로(
"Request was aborted")를 강제함. - 에이전트 루프: 각 프로바이더 이벤트 처리 전에
signal.aborted를 확인하고 최신 부분에서 중단된 어시스턴트 메시지를 합성할 수 있음. - 세션/에이전트 컨트롤:
AgentSession.abort()->agent.abort()-> 공유 중단 컨트롤러 취소.
도구 실행 취소는 모델 스트림 취소와 별개입니다:
- 도구 러너는
AbortSignal.any([agentSignal, steeringAbortSignal])을 사용함 - 스티어링 인터럽트는 이미 생성된 도구 결과를 보존하면서 나머지 도구 실행을 중단할 수 있음
역압력 경계
섹션 제목: “역압력 경계”프로바이더 SDK 스트림과 다운스트림 소비자 사이에는 하드 역압력 메커니즘이 없습니다:
EventStream은 최대 크기 없이 인메모리 큐를 사용함- 스로틀링은 UI 업데이트 속도를 줄이지만 프로바이더 수신을 늦추지 않음
- 소비자가 크게 지연되면 완료될 때까지 대기 중인 이벤트가 증가할 수 있음
현재 설계는 제한된 버퍼 흐름 제어보다 응답성과 단순한 순서를 우선시합니다.
스트림 이벤트가 에이전트/세션 이벤트로 표시되는 방식
섹션 제목: “스트림 이벤트가 에이전트/세션 이벤트로 표시되는 방식”agentLoop.streamAssistantResponse()는 AssistantMessageEvent를 AgentEvent로 연결합니다:
start시: 플레이스홀더 어시스턴트 메시지를 푸시하고message_start를 발행함- 블록 이벤트(
text_*,thinking_*,toolcall_*) 시: 마지막 어시스턴트 메시지를 업데이트하고 원시assistantMessageEvent와 함께message_update를 발행함 - 터미널(
done/error) 시:response.result()에서 최종 메시지를 해결하고message_end를 발행함
AgentSession은 이후 세션 레벨 동작을 위해 해당 이벤트를 소비합니다:
- TTSR은
text_delta및toolcall_delta에 대해message_update.assistantMessageEvent를 감시함 - 스트리밍 편집 가드는
edit호출 시toolcall_delta/toolcall_end를 검사하고 조기에 중단할 수 있음 - 지속성은
message_end에서 완료된 메시지를 씀 - 자동 재시도는 어시스턴트
stopReason === "error"및errorMessage휴리스틱을 검사함
통합 대 프로바이더 특정 책임
섹션 제목: “통합 대 프로바이더 특정 책임”통합 (공통 계약):
- 이벤트 형태 (
AssistantMessageEvent) - 최종 결과 추출 (
done/error) - 델타 스로틀링 + 병합 규칙
- 에이전트/세션 이벤트 전파 모델
프로바이더 특정 (완전히 추상화되지 않음):
- 업스트림 이벤트 분류 및 매핑 로직
- 중지 이유 변환 테이블
- 도구 호출 ID 규칙
- 추론/사고 블록 의미론 및 서명
- 사용량 토큰 의미론 및 가용성 타이밍
- API별 메시지 변환 제약 조건
구현 파일
섹션 제목: “구현 파일”../../ai/src/stream.ts— 프로바이더 디스패치, 옵션 매핑, API 키/세션 배관.../../ai/src/utils/event-stream.ts— 일반 스트림 큐 + 어시스턴트 델타 스로틀링.../../ai/src/utils/json-parse.ts— 스트리밍된 도구 인수를 위한 부분 JSON 파싱.../../ai/src/providers/anthropic.ts— Anthropic 이벤트 변환 및 도구 JSON 델타 누적.../../ai/src/providers/openai-responses.ts— OpenAI Responses 이벤트 변환 및 상태 매핑.../../ai/src/providers/google.ts— Gemini 스트림 청크-블록 변환.../../ai/src/providers/google-shared.ts— Gemini 완료 이유 매핑 및 공유 변환 규칙.../../agent/src/agent-loop.ts— 프로바이더 스트림 소비 및message_update연결.../src/session/agent-session.ts— 스트리밍 업데이트, 중단, 재시도, 지속성의 세션 레벨 처리.