- Inicio
- Documentation
- Proveedores
- Internos del streaming de proveedores
Internos del streaming de proveedores
Este documento explica cómo el streaming de tokens/herramientas se normaliza en @f5-sales-demo/pi-ai, y luego se propaga a través de @f5-sales-demo/pi-agent-core y los eventos de sesión de coding-agent.
Flujo de extremo a extremo
Sección titulada «Flujo de extremo a extremo»streamSimple()(packages/ai/src/stream.ts) mapea opciones genéricas y despacha a una función de stream del proveedor.- Las funciones de stream del proveedor (
anthropic.ts,openai-responses.ts,google.ts) traducen los eventos de stream nativos del proveedor a la secuencia unificada deAssistantMessageEvent. - Cada proveedor envía eventos a
AssistantMessageEventStream(packages/ai/src/utils/event-stream.ts), que regula los eventos delta y expone:- iteración asíncrona para actualizaciones incrementales
result()para elAssistantMessagefinal
agentLoop(packages/agent/src/agent-loop.ts) consume esos eventos, muta el estado del asistente en curso y emite eventosmessage_updateque transportan elassistantMessageEventsin procesar.AgentSession(packages/coding-agent/src/session/agent-session.ts) se suscribe a los eventos del agente, persiste mensajes, ejecuta hooks de extensión y aplica comportamientos de sesión (reintento, compactación, TTSR, verificaciones de aborto de edición en streaming).
Contrato unificado de stream en @f5-sales-demo/pi-ai
Sección titulada «Contrato unificado de stream en @f5-sales-demo/pi-ai»Todos los proveedores emiten la misma forma (AssistantMessageEvent en packages/ai/src/types.ts):
start- tripletas de ciclo de vida de bloques de contenido:
- texto:
text_start→text_delta* →text_end - pensamiento:
thinking_start→thinking_delta* →thinking_end - llamada a herramienta:
toolcall_start→toolcall_delta* →toolcall_end
- texto:
- evento terminal:
doneconreason: "stop" | "length" | "toolUse"- o
errorconreason: "aborted" | "error"
AssistantMessageEventStream garantiza:
- el resultado final se resuelve mediante el evento terminal (
doneoerror) - los deltas se agrupan/regulan (~50ms)
- los deltas almacenados en búfer se vacían antes de los eventos no-delta y antes de la finalización
Comportamiento de regulación y armonización de deltas
Sección titulada «Comportamiento de regulación y armonización de deltas»AssistantMessageEventStream trata text_delta, thinking_delta y toolcall_delta como eventos combinables:
- los deltas almacenados en búfer se combinan solo cuando type + contentIndex coinciden
- la combinación mantiene la última instantánea
partial - los eventos no-delta fuerzan un vaciado inmediato
Esto suaviza los streams de alta frecuencia del proveedor para consumidores TUI/eventos, pero no es contrapresión del proveedor: los proveedores siguen produciendo a máxima velocidad, mientras el stream local almacena en búfer.
Detalles de normalización por proveedor
Sección titulada «Detalles de normalización por proveedor»Anthropic (anthropic-messages)
Sección titulada «Anthropic (anthropic-messages)»Fuente: packages/ai/src/providers/anthropic.ts
Puntos de normalización:
message_startinicializa el uso (tokens de entrada/salida/caché)content_block_startse mapea a inicios de texto/pensamiento/llamada a herramientacontent_block_deltamapea:text_delta→text_deltathinking_delta→thinking_deltainput_json_delta→toolcall_deltasignature_deltaactualizathinkingSignaturesolamente (sin evento)
content_block_stopemite el*_endcorrespondientemessage_delta.stop_reasonse mapea mediantemapStopReason()
Streaming de argumentos de llamadas a herramientas:
- cada bloque de herramienta lleva un
partialJsoninterno - cada delta JSON se anexa a
partialJson - los
argumentsse reanalizan en cada delta medianteparseStreamingJson() toolcall_endreanaliza una vez más, luego eliminapartialJson
OpenAI Responses (openai-responses)
Sección titulada «OpenAI Responses (openai-responses)»Fuente: packages/ai/src/providers/openai-responses.ts
Puntos de normalización:
response.output_item.addedinicia bloques de razonamiento/texto/llamada a función- los eventos de resumen de razonamiento (
response.reasoning_summary_text.delta) se convierten enthinking_delta - los deltas de salida/rechazo se convierten en
text_delta response.function_call_arguments.deltase convierte entoolcall_deltaresponse.output_item.doneemitethinking_end/text_end/toolcall_endresponse.completedmapea el estado a razón de parada y uso
Streaming de argumentos de llamadas a herramientas:
- mismo patrón de acumulación
partialJsonque Anthropic - los proveedores que envían solo
response.function_call_arguments.doneaún pueblan los argumentos finales - los IDs de llamada a herramienta se normalizan como
"<call_id>|<item_id>"
Google Generative AI (google-generative-ai)
Sección titulada «Google Generative AI (google-generative-ai)»Fuente: packages/ai/src/providers/google.ts
Puntos de normalización:
- itera sobre
candidate.content.parts - las partes de texto se dividen en pensamiento vs texto mediante
isThinkingPart(part) - las transiciones de bloque cierran el bloque anterior antes de iniciar uno nuevo
part.functionCallse trata como una llamada a herramienta completa (start/delta/end se emiten inmediatamente)- la razón de finalización se mapea mediante
mapStopReason()desdegoogle-shared.ts
Streaming de argumentos de llamadas a herramientas:
- los argumentos de llamada a función llegan como objeto estructurado, no como texto JSON incremental
- la implementación emite un
toolcall_deltasintético que contieneJSON.stringify(arguments) - no se necesita un analizador de JSON parcial para Google en esta ruta
Acumulación y recuperación de JSON parcial en llamadas a herramientas
Sección titulada «Acumulación y recuperación de JSON parcial en llamadas a herramientas»El comportamiento compartido para Anthropic/OpenAI Responses utiliza parseStreamingJson() (packages/ai/src/utils/json-parse.ts):
- intenta
JSON.parse - recurre al analizador
partial-jsonpara fragmentos incompletos - si ambos fallan, retorna
{}
Implicaciones:
- los deltas de argumentos malformados o truncados no hacen fallar el procesamiento del stream inmediatamente
- los
argumentsen progreso pueden ser temporalmente{} - deltas válidos posteriores pueden recuperar argumentos estructurados porque el análisis se reintenta en cada anexión
- el
toolcall_endfinal realiza un intento más de análisis antes de la emisión
Razones de parada vs errores de transporte/ejecución
Sección titulada «Razones de parada vs errores de transporte/ejecución»Las razones de parada del proveedor se mapean a stopReason normalizado:
- Anthropic:
end_turn→stop,max_tokens→length,tool_use→toolUse, casos de seguridad/rechazo→error - OpenAI Responses:
completed→stop,incomplete→length,failed/cancelled→error - Google:
STOP→stop,MAX_TOKENS→length, clases de seguridad/prohibido/llamada-a-función-malformada→error
La semántica de errores se divide en dos etapas:
- Semántica de finalización del modelo (razón de finalización/estado reportado por el proveedor)
- Fallo de transporte/ejecución (excepciones de red/cliente/analizador/aborto)
Si el stream del proveedor lanza una excepción o señala fallo, cada wrapper de proveedor captura y emite un evento terminal error con:
stopReason = "aborted"cuando la señal de aborto está activada- de lo contrario
stopReason = "error" errorMessage = formatErrorMessageWithRetryAfter(error)
Comportamiento ante chunks malformados / fallos de análisis SSE
Sección titulada «Comportamiento ante chunks malformados / fallos de análisis SSE»Para estas rutas de proveedores, el enmarcado de chunks/SSE es manejado por los streams del SDK del vendedor (SDK de Anthropic, SDK de OpenAI, SDK de Google). Este código no implementa un decodificador SSE personalizado aquí.
Comportamiento observado en la implementación actual:
- el análisis de chunks/SSE malformados a nivel de SDK se manifiesta como una excepción o evento
errordel stream - el wrapper del proveedor lo convierte en un evento terminal
errorunificado - no hay reanudación/reintento específico del proveedor dentro de la función de stream en sí
- los reintentos de nivel superior se manejan en la lógica de reintento automático de
AgentSession(reintento a nivel de mensaje, no reproducción de chunks del stream)
Límites de cancelación
Sección titulada «Límites de cancelación»La cancelación se organiza en capas:
- Solicitud al proveedor de IA:
options.signalse pasa a la llamada de stream del cliente del proveedor. - Wrapper del proveedor: después del bucle del stream, una señal abortada fuerza la ruta de error (
"Request was aborted"). - Bucle del agente: verifica
signal.abortedantes de manejar cada evento del proveedor y puede sintetizar un mensaje de asistente abortado a partir del parcial más reciente. - Controles de sesión/agente:
AgentSession.abort()->agent.abort()-> cancelación del controlador de aborto compartido.
La cancelación de ejecución de herramientas es independiente de la cancelación del stream del modelo:
- los ejecutores de herramientas usan
AbortSignal.any([agentSignal, steeringAbortSignal]) - las interrupciones de dirección pueden abortar la ejecución restante de herramientas mientras preservan los resultados de herramientas ya producidos
Límites de contrapresión
Sección titulada «Límites de contrapresión»No existe un mecanismo de contrapresión rígido entre el stream del SDK del proveedor y los consumidores posteriores:
EventStreamusa colas en memoria sin tamaño máximo- la regulación reduce la tasa de actualización de la UI pero no ralentiza la ingesta del proveedor
- si los consumidores se retrasan significativamente, los eventos en cola pueden crecer hasta la finalización
El diseño actual favorece la capacidad de respuesta y el ordenamiento simple sobre el control de flujo con búfer acotado.
Cómo los eventos de stream se manifiestan como eventos de agente/sesión
Sección titulada «Cómo los eventos de stream se manifiestan como eventos de agente/sesión»agentLoop.streamAssistantResponse() conecta AssistantMessageEvent con AgentEvent:
- en
start: inserta un mensaje de asistente provisional y emitemessage_start - en eventos de bloque (
text_*,thinking_*,toolcall_*): actualiza el último mensaje del asistente, emitemessage_updatecon elassistantMessageEventsin procesar - en terminal (
done/error): resuelve el mensaje final deresponse.result(), emitemessage_end
AgentSession luego consume esos eventos para comportamientos a nivel de sesión:
- TTSR observa
message_update.assistantMessageEventbuscandotext_deltaytoolcall_delta - la protección de edición en streaming inspecciona
toolcall_delta/toolcall_enden llamadasedity puede abortar anticipadamente - la persistencia escribe mensajes finalizados en
message_end - el reintento automático examina
stopReason === "error"del asistente más heurísticas deerrorMessage
Responsabilidades unificadas vs específicas del proveedor
Sección titulada «Responsabilidades unificadas vs específicas del proveedor»Unificadas (contrato común):
- forma del evento (
AssistantMessageEvent) - extracción del resultado final (
done/error) - regulación de deltas + reglas de combinación
- modelo de propagación de eventos agente/sesión
Específicas del proveedor (no completamente abstraídas):
- taxonomías de eventos upstream y lógica de mapeo
- tablas de traducción de razón de parada
- convenciones de ID de llamada a herramienta
- semántica de bloques de razonamiento/pensamiento y firmas
- semántica de tokens de uso y disponibilidad temporal
- restricciones de conversión de mensajes por API
Archivos de implementación
Sección titulada «Archivos de implementación»../../ai/src/stream.ts— despacho de proveedor, mapeo de opciones, canalización de clave API/sesión.../../ai/src/utils/event-stream.ts— cola genérica de stream + regulación de deltas del asistente.../../ai/src/utils/json-parse.ts— análisis de JSON parcial para argumentos de herramientas en streaming.../../ai/src/providers/anthropic.ts— traducción de eventos de Anthropic y acumulación de deltas JSON de herramientas.../../ai/src/providers/openai-responses.ts— traducción de eventos de OpenAI Responses y mapeo de estados.../../ai/src/providers/google.ts— traducción de chunks de stream de Gemini a bloques.../../ai/src/providers/google-shared.ts— mapeo de razón de finalización de Gemini y reglas de conversión compartidas.../../agent/src/agent-loop.ts— consumo del stream del proveedor y conexión demessage_update.../src/session/agent-session.ts— manejo a nivel de sesión de actualizaciones en streaming, aborto, reintento y persistencia.