- ホーム
- Documentation
- プロバイダー
- プロバイダーストリーミング内部実装
プロバイダーストリーミング内部実装
このドキュメントでは、@f5-sales-demo/pi-ai においてトークン/ツールストリーミングがどのように正規化され、@f5-sales-demo/pi-agent-core および coding-agent セッションイベントを通じて伝播されるかを説明します。
エンドツーエンドのフロー
Section titled “エンドツーエンドのフロー”streamSimple()(packages/ai/src/stream.ts)は汎用オプションをマップし、プロバイダーストリーム関数にディスパッチします。- プロバイダーストリーム関数(
anthropic.ts、openai-responses.ts、google.ts)は、プロバイダーネイティブのストリームイベントを統一されたAssistantMessageEventシーケンスに変換します。 - 各プロバイダーはイベントを
AssistantMessageEventStream(packages/ai/src/utils/event-stream.ts)にプッシュします。これはデルタイベントをスロットリングし、以下を公開します:- インクリメンタル更新のための非同期イテレーション
- 最終的な
AssistantMessageのためのresult()
agentLoop(packages/agent/src/agent-loop.ts)はこれらのイベントを消費し、処理中のアシスタント状態を変更して、生のassistantMessageEventを含むmessage_updateイベントを発行します。AgentSession(packages/coding-agent/src/session/agent-session.ts)はエージェントイベントをサブスクライブし、メッセージを永続化し、拡張フックを駆動し、セッション動作(リトライ、コンパクション、TTSR、ストリーミング編集中断チェック)を適用します。
@f5-sales-demo/pi-ai における統一ストリームコントラクト
Section titled “@f5-sales-demo/pi-ai における統一ストリームコントラクト”すべてのプロバイダーは同一の形状(packages/ai/src/types.ts の AssistantMessageEvent)を出力します:
start- コンテンツブロックのライフサイクルトリプレット:
- テキスト:
text_start→text_delta* →text_end - シンキング:
thinking_start→thinking_delta* →thinking_end - ツールコール:
toolcall_start→toolcall_delta* →toolcall_end
- テキスト:
- ターミナルイベント:
done(reason: "stop" | "length" | "toolUse"を含む)- または
error(reason: "aborted" | "error"を含む)
AssistantMessageEventStream が保証する内容:
- 最終結果はターミナルイベント(
doneまたはerror)によって解決される - デルタはバッチ処理/スロットリングされる(約50ms)
- バッファリングされたデルタは非デルタイベントの前および完了前にフラッシュされる
デルタスロットリングと調和動作
Section titled “デルタスロットリングと調和動作”AssistantMessageEventStream は text_delta、thinking_delta、toolcall_delta をマージ可能なイベントとして扱います:
- バッファリングされたデルタは type + contentIndex が一致する場合のみマージされる
- マージでは最新の
partialスナップショットが保持される - 非デルタイベントは即時フラッシュを強制する
これにより、TUI/イベントコンシューマーに対して高頻度のプロバイダーストリームが平滑化されますが、プロバイダーのバックプレッシャーではありません:プロバイダーは依然としてフルスピードで生産しており、ローカルストリームがバッファリングします。
プロバイダー正規化の詳細
Section titled “プロバイダー正規化の詳細”Anthropic (anthropic-messages)
Section titled “Anthropic (anthropic-messages)”ソース:packages/ai/src/providers/anthropic.ts
正規化ポイント:
message_startは使用量(入力/出力/キャッシュトークン)を初期化するcontent_block_startはテキスト/シンキング/ツールコール開始にマップされるcontent_block_deltaのマッピング:text_delta→text_deltathinking_delta→thinking_deltainput_json_delta→toolcall_deltasignature_deltaはthinkingSignatureのみを更新する(イベントなし)
content_block_stopは対応する*_endを発行するmessage_delta.stop_reasonはmapStopReason()を介してマップされる
ツールコール引数ストリーミング:
- 各ツールブロックは内部
partialJsonを保持する - 各JSONデルタは
partialJsonに追記される argumentsはデルタごとにparseStreamingJson()を介して再パースされるtoolcall_endはもう一度パースしてからpartialJsonを除去する
OpenAI Responses (openai-responses)
Section titled “OpenAI Responses (openai-responses)”ソース:packages/ai/src/providers/openai-responses.ts
正規化ポイント:
response.output_item.addedはリーズニング/テキスト/ファンクションコールブロックを開始する- リーズニングサマリーイベント(
response.reasoning_summary_text.delta)はthinking_deltaになる - 出力/拒否デルタは
text_deltaになる response.function_call_arguments.deltaはtoolcall_deltaになるresponse.output_item.doneはthinking_end/text_end/toolcall_endを発行するresponse.completedはステータスをストップ理由と使用量にマップする
ツールコール引数ストリーミング:
- Anthropic と同じ
partialJson蓄積パターン response.function_call_arguments.doneのみを送信するプロバイダーでも最終引数を設定できる- ツールコールIDは
"<call_id>|<item_id>"として正規化される
Google Generative AI (google-generative-ai)
Section titled “Google Generative AI (google-generative-ai)”ソース:packages/ai/src/providers/google.ts
正規化ポイント:
candidate.content.partsをイテレートする- テキストパーツは
isThinkingPart(part)によってシンキングとテキストに分割される - ブロック遷移は新しいブロックを開始する前に前のブロックを閉じる
part.functionCallは完全なツールコールとして扱われる(start/delta/end が即座に発行される)- フィニッシュ理由は
google-shared.tsのmapStopReason()によってマップされる
ツールコール引数ストリーミング:
- ファンクションコール引数はインクリメンタルなJSONテキストではなく、構造化オブジェクトとして届く
- 実装は
JSON.stringify(arguments)を含む1つの合成toolcall_deltaを発行する - このパスではGoogleに対して部分的なJSONパーサーは不要
ツールコール部分JSONの蓄積とリカバリー
Section titled “ツールコール部分JSONの蓄積とリカバリー”Anthropic/OpenAI Responses の共通動作では parseStreamingJson()(packages/ai/src/utils/json-parse.ts)を使用します:
JSON.parseを試みる- 不完全なフラグメントに対して
partial-jsonパーサーにフォールバックする - 両方が失敗した場合、
{}を返す
影響:
- 不正または切り捨てられた引数デルタは即座にストリーム処理をクラッシュさせない
- 処理中の
argumentsは一時的に{}になる可能性がある - 後続の有効なデルタは、すべての追記でパースが再試行されるため、構造化された引数をリカバリーできる
- 最終的な
toolcall_endは発行前にもう一度パースを試みる
ストップ理由とトランスポート/ランタイムエラー
Section titled “ストップ理由とトランスポート/ランタイムエラー”プロバイダーのストップ理由は正規化された stopReason にマップされます:
- Anthropic:
end_turn→stop、max_tokens→length、tool_use→toolUse、安全性/拒否ケース→error - OpenAI Responses:
completed→stop、incomplete→length、failed/cancelled→error - Google:
STOP→stop、MAX_TOKENS→length、安全性/禁止/不正なファンクションコールクラス→error
エラーセマンティクスは2段階に分かれています:
- モデル完了セマンティクス(プロバイダーが報告したフィニッシュ理由/ステータス)
- トランスポート/ランタイム障害(ネットワーク/クライアント/パーサー/中断例外)
プロバイダーストリームがスローまたは障害を通知した場合、各プロバイダーラッパーはこれをキャッチし、以下を含むターミナル error イベントを発行します:
- 中断シグナルが設定されている場合:
stopReason = "aborted" - それ以外の場合:
stopReason = "error" errorMessage = formatErrorMessageWithRetryAfter(error)
不正なチャンク / SSEパース失敗の動作
Section titled “不正なチャンク / SSEパース失敗の動作”これらのプロバイダーパスでは、チャンク/SSEフレーミングはベンダーSDKストリーム(Anthropic SDK、OpenAI SDK、Google SDK)によって処理されます。このコードではカスタムSSEデコーダーは実装していません。
現在の実装における観察された動作:
- SDKレベルでの不正なチャンク/SSEパースは、例外またはストリームの
errorイベントとして表面化する - プロバイダーラッパーはそれを統一されたターミナル
errorイベントに変換する - ストリーム関数自体にはプロバイダー固有の再開/リトライは存在しない
- より高いレベルのリトライは
AgentSessionの自動リトライロジックで処理される(メッセージレベルのリトライであり、ストリームチャンクの再生ではない)
キャンセルの境界
Section titled “キャンセルの境界”キャンセルは階層化されています:
- AIプロバイダーリクエスト:
options.signalはプロバイダークライアントのストリームコールに渡される。 - プロバイダーラッパー:ストリームループの後、中断されたシグナルはエラーパス(
"Request was aborted")を強制する。 - エージェントループ:各プロバイダーイベントを処理する前に
signal.abortedを確認し、最新のパーシャルから中断されたアシスタントメッセージを合成できる。 - セッション/エージェントコントロール:
AgentSession.abort()→agent.abort()→ 共有中断コントローラーのキャンセル。
ツール実行のキャンセルはモデルストリームのキャンセルとは別です:
- ツールランナーは
AbortSignal.any([agentSignal, steeringAbortSignal])を使用する - ステアリング割り込みは、既に生成されたツール結果を保持しながら残りのツール実行を中断できる
バックプレッシャーの境界
Section titled “バックプレッシャーの境界”プロバイダーSDKストリームとダウンストリームコンシューマーの間にはハードなバックプレッシャーメカニズムはありません:
EventStreamは最大サイズのないインメモリキューを使用する- スロットリングはUIの更新レートを低下させるが、プロバイダーの取り込みを遅くしない
- コンシューマーが大幅に遅延した場合、キューに入れられたイベントは完了まで増加し続ける可能性がある
現在の設計は、バウンデッドバッファのフロー制御よりも応答性とシンプルな順序付けを優先しています。
ストリームイベントがエージェント/セッションイベントとして表面化する方法
Section titled “ストリームイベントがエージェント/セッションイベントとして表面化する方法”agentLoop.streamAssistantResponse() は AssistantMessageEvent を AgentEvent にブリッジします:
start時:プレースホルダーのアシスタントメッセージをプッシュし、message_startを発行する- ブロックイベント(
text_*、thinking_*、toolcall_*)時:最後のアシスタントメッセージを更新し、生のassistantMessageEventを含むmessage_updateを発行する - ターミナル(
done/error)時:response.result()から最終メッセージを解決し、message_endを発行する
AgentSession はこれらのイベントをセッションレベルの動作のために消費します:
- TTSRは
text_deltaとtoolcall_deltaのためにmessage_update.assistantMessageEventを監視する - ストリーミング編集ガードは
editコールでのtoolcall_delta/toolcall_endを検査し、早期に中断できる - 永続化は
message_endで完成したメッセージを書き込む - 自動リトライはアシスタントの
stopReason === "error"とerrorMessageヒューリスティックを検査する
統一とプロバイダー固有の責務
Section titled “統一とプロバイダー固有の責務”統一(共通コントラクト):
- イベント形状(
AssistantMessageEvent) - 最終結果の抽出(
done/error) - デルタスロットリング + マージルール
- エージェント/セッションイベント伝播モデル
プロバイダー固有(完全には抽象化されていない):
- アップストリームのイベント分類とマッピングロジック
- ストップ理由の変換テーブル
- ツールコールIDの規約
- リーズニング/シンキングブロックのセマンティクスとシグネチャ
- 使用量トークンのセマンティクスと利用可能タイミング
- APIごとのメッセージ変換制約
実装ファイル
Section titled “実装ファイル”../../ai/src/stream.ts— プロバイダーディスパッチ、オプションマッピング、APIキー/セッションの配管。../../ai/src/utils/event-stream.ts— 汎用ストリームキューとアシスタントデルタスロットリング。../../ai/src/utils/json-parse.ts— ストリーミングされたツール引数の部分的なJSONパース。../../ai/src/providers/anthropic.ts— AnthropicイベントのTranslationとツールJSONデルタの蓄積。../../ai/src/providers/openai-responses.ts— OpenAI Responsesイベントの変換とステータスマッピング。../../ai/src/providers/google.ts— Geminiストリームチャンクからブロックへの変換。../../ai/src/providers/google-shared.ts— Geminiフィニッシュ理由マッピングと共有変換ルール。../../agent/src/agent-loop.ts— プロバイダーストリームの消費とmessage_updateのブリッジ。../src/session/agent-session.ts— ストリーミング更新、中断、リトライ、永続化のセッションレベルの処理。