feat(streaming): stream tool call argument deltas in TemporalStreamingModel #355
Open
vkalmathscale wants to merge 6 commits into
Conversation
…gModel
Wire ResponseFunctionCallArgumentsDeltaEvent into the streaming layer introduced in #333, so write-heavy tools (write_file, apply_patch) no longer freeze the UI for the duration of argument generation.
The model now opens a per-function-call streaming context with a ToolRequestContent placeholder, emits ToolRequestDelta updates for each argument delta, and finalizes with a StreamTaskMessageFull containing the parsed arguments on ResponseOutputItemDoneEvent.
Coalescing and mode dispatch are inherited from the existing streaming infrastructure -- no new flags or surface area. ModelResponse output is unchanged; activity determinism is unaffected.
End-of-loop cleanup defensively closes any function-call contexts that didn't see a Done event (truncated stream or mid-stream exception).
Adds two tests covering the happy path (well-formed JSON args -> deltas + parsed Full) and the malformed-args fallback (invalid JSON -> empty dict + WARNING log).
Logging raw_args[:200] could leak partial file contents, PII, or secrets from write_file / apply_patch arguments into production log pipelines. Switch to logging only bounded metadata (tool name + raw arg byte count). The existing malformed-args test still passes since it asserts on the "Failed to parse tool call arguments" prefix, which is preserved.
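The fallback and the bounded log line described in these two commit messages can be illustrated with a minimal sketch. The function name `parse_tool_arguments`, the logger name, and the exact metadata fields are assumptions made for illustration; only the "Failed to parse tool call arguments" prefix and the empty-dict fallback come from the commit messages.

```python
import json
import logging
from typing import Any

# Hypothetical logger name; the real module's logger is not shown in this PR page.
logger = logging.getLogger("tool_args_sketch")


def parse_tool_arguments(tool_name: str, raw_args: str) -> dict[str, Any]:
    """Parse accumulated argument text, falling back to {} on invalid JSON."""
    try:
        parsed = json.loads(raw_args) if raw_args else {}
    except json.JSONDecodeError:
        # Log bounded metadata only (tool name + byte count), never the raw
        # argument text, which may hold file contents, PII, or secrets.
        logger.warning(
            "Failed to parse tool call arguments: tool=%s raw_arg_bytes=%d",
            tool_name,
            len(raw_args.encode("utf-8")),
        )
        return {}
    # Assumption of this sketch: non-object JSON also falls back to {}.
    return parsed if isinstance(parsed, dict) else {}
```

Because the log message keeps the same prefix, a test that asserts on that prefix keeps passing while the raw argument text stays out of the log pipeline.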
danielmillerp approved these changes on May 18, 2026
…call-arg-deltas
# Conflicts:
#	src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py
Summary
`TemporalStreamingModel` already streams text deltas and reasoning summary deltas to Redis via `StreamingTaskMessageContext`, but `ResponseFunctionCallArgumentsDeltaEvent` was being silently buffered into `function_calls_in_progress[...]['arguments']` with no per-delta publish. Consumers only saw the completed tool call surface later (after the activity returned, via downstream hooks if any).

For write-heavy tools (`write_file`, `apply_patch`, anything that puts a 2–20KB string into a single argument), the model spends multiple seconds generating the argument body, and the UI sees nothing until the entire activity finishes. The result is a frozen UI followed by an abrupt jump when the activity returns.

This PR threads tool-call argument deltas through the same streaming machinery used for text and reasoning, riding on the `CoalescingBuffer` + `StreamingMode` infrastructure added in #333. The buffer's merge helpers already key on `tool_call_id` for `ToolRequestDelta`, so coalescing, mode dispatch, and opt-out are inherited from existing infra.

Design
`TemporalStreamingModel` now opens a `streaming_task_message_context` per function call (keyed off the call's `output_index`), with `initial_content=ToolRequestContent(...)` and the model's configured `streaming_mode`. Three event handlers participate:

| Event | Action |
| --- | --- |
| `ResponseOutputItemAddedEvent` (type=function_call) | Open the per-call context and store it on `function_calls_in_progress[output_index]['context']`. |
| `ResponseFunctionCallArgumentsDeltaEvent` | Push a `StreamTaskMessageDelta(delta=ToolRequestDelta(arguments_delta=..., tool_call_id=..., name=...))` into the per-call context. The coalescing buffer merges consecutive deltas with the same `tool_call_id`. |
| `ResponseOutputItemDoneEvent` (type=function_call) | Parse the accumulated arguments (falling back to `{}` on `JSONDecodeError`), emit a final `StreamTaskMessageFull(content=ToolRequestContent(...))`, and close the context. |

End-of-loop cleanup defensively closes any function-call contexts that didn't see a `Done` event (truncated stream or mid-stream exception).

`ModelResponse` output is unchanged: `output_items` still receives the same complete `ResponseFunctionToolCall`. Activity determinism is unaffected; streaming is a side effect.

What this does NOT change
- `StreamingMode` is already the on/off knob. No new flag. `streaming_mode="off"` suppresses tool-arg deltas the same way it suppresses text deltas. `"per_token"` publishes immediately; `"coalesced"` (default) batches at 50ms / 128 chars.
- `TemporalStreamingHooks.on_tool_start` is unchanged. It still fires after the activity returns and still emits a `ToolRequestContent` Full message via the `stream_lifecycle_content` activity. See Caveats.

Caveats
Overlap with `TemporalStreamingHooks.on_tool_start`. Users who pass `TemporalStreamingHooks` to `Runner.run` will now see two persisted `task_message`s per tool call: one created by the model (delta stream + final Full) and one created by the hook (Full only). Both land on the same Redis topic `task:{task_id}` with different `parent_task_message.id`s, so a default UI will render two cards for the same logical tool call.

This needs a follow-up to decide which path owns the canonical `ToolRequest` emission. Options for review discussion:

- Suppress `on_tool_start`'s Full emit when the model is also emitting (auto-detect via a workflow-instance flag, mirroring how `_task_id` / `_trace_id` are threaded today).
- Remove `on_tool_start`'s Full emit entirely in a follow-up major bump (the model becomes the single source of truth for `ToolRequest` events).

Until that follow-up, users who want streamed tool args without duplicate emits should subclass `TemporalStreamingHooks` and override `on_tool_start` to a no-op.

Coalescing windows still apply. With the default 50ms / 128-char window, tool args render in chunks of roughly 50ms granularity rather than per token. This is the same tradeoff already made for text streaming in #333 (perf(streaming): coalesce per-token publishes to Redis (50ms / 128-char window)), and it is the right default for write-heavy tools (the UX value is "watch the artifact appear", not "see each token").
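To make the coalescing tradeoff concrete, here is a minimal sketch of a time-or-size window keyed on `tool_call_id`. The class `CoalescingSketch` and its methods are hypothetical stand-ins, not the real `CoalescingBuffer` API from #333; in particular, this sketch checks the window only when a delta arrives, whereas a real buffer would presumably also flush on a timer.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class CoalescingSketch:
    """Hypothetical stand-in for a coalescing buffer (not the real API)."""

    window_s: float = 0.050  # 50 ms window, as quoted in the PR
    max_chars: int = 128     # 128-char threshold, as quoted in the PR
    published: list[tuple[str, str]] = field(default_factory=list)
    _pending: str = ""
    _tool_call_id: str | None = None
    _window_start: float | None = None

    def add(self, tool_call_id: str, delta: str, now: float) -> None:
        # Deltas for a different call cannot be merged: flush what is held first.
        if self._tool_call_id is not None and tool_call_id != self._tool_call_id:
            self.flush()
        if self._window_start is None:
            self._tool_call_id = tool_call_id
            self._window_start = now
        self._pending += delta
        too_big = len(self._pending) >= self.max_chars
        too_old = now - self._window_start >= self.window_s
        if too_big or too_old:
            self.flush()

    def flush(self) -> None:
        # Publish one merged delta for everything accumulated in this window.
        if self._pending and self._tool_call_id is not None:
            self.published.append((self._tool_call_id, self._pending))
        self._pending = ""
        self._tool_call_id = None
        self._window_start = None
```

Feeding three 10-character deltas at t = 0 ms, 10 ms, and 60 ms yields a single 30-character publish, which is the "chunked" rendering described above.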
Malformed argument JSON. If the model produces invalid JSON for the args (truncated stream, hallucinated structure), the path logs a WARNING and emits `arguments={}` in the final `ToolRequestContent`. The raw delta stream is preserved on the consumer side regardless; only the structured final view falls back.

Test plan
- New tests in `test_streaming_model.py::TestStreamingModelFunctionCallArgsStreaming`:
  - Happy path (well-formed JSON args): an initial `ToolRequestContent`, one `StreamTaskMessageDelta(ToolRequestDelta)` per `ArgumentsDelta` event preserving the delta text, and one final `StreamTaskMessageFull(ToolRequestContent)` with parsed args.
  - Malformed args (invalid JSON): emits `arguments={}` in the final Full and logs a WARNING.
- The `test_streaming_model.py` suite passes (42/42).
- `ruff check` clean on both modified files.
- `streaming_mode="off"` suppresses tool-arg deltas (only the final persisted message exists on close).

cc reviewers familiar with #333's `CoalescingBuffer` design.

Greptile Summary
This PR threads tool-call argument deltas through the existing streaming machinery in `TemporalStreamingModel`, so write-heavy tools like `write_file` publish incremental `ToolRequestDelta` updates to Redis instead of buffering the entire argument body until the activity returns. The implementation follows the established `CoalescingBuffer` + `StreamingMode` pattern from #333.

- A per-call `streaming_task_message_context` is opened on `ResponseOutputItemAddedEvent`, each `ResponseFunctionCallArgumentsDeltaEvent` emits a `StreamTaskMessageDelta(ToolRequestDelta)`, and `ResponseOutputItemDoneEvent` emits a final `StreamTaskMessageFull(ToolRequestContent)` then closes the context with `call_data['context'] = None` in a `finally` block, correctly preventing the orphan-cleanup loop from double-closing.
- When `TemporalStreamingHooks` is also active, two `ToolRequest` task messages will be persisted per call (`on_tool_start` still fires); the PR flags this as a follow-up to resolve ownership of the canonical emission.
- In the new tests, the `close()` call count is not asserted, leaving the double-close guard untested against future regressions.

Confidence Score: 4/5
Safe to merge for the streaming side-effect path; the actual tool-call execution and ModelResponse output are unchanged.
The core event-handling logic is correct and follows established patterns. The `finally: call_data['context'] = None` block properly prevents double-close, and JSON parse failures fall back gracefully. The main gap is that the new tests do not assert `close()` is called exactly once per function call, so the double-close guard has no regression test. The acknowledged `TemporalStreamingHooks` duplicate-emission caveat is a known UX concern, not a data-correctness issue.

The test file would benefit from a `close()` call-count assertion to lock in the double-close prevention; the production model file itself looks correct.

Important Files Changed
Sequence Diagram
%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant API as OpenAI Streaming API
    participant TSM as TemporalStreamingModel
    participant CTX as StreamingContext (per call)
    participant BUF as CoalescingBuffer
    participant REDIS as Redis (task:{task_id})
    API->>TSM: ResponseOutputItemAddedEvent (function_call)
    TSM->>CTX: "open context (initial_content=ToolRequestContent(args={}))"
    CTX->>REDIS: persist initial task_message
    loop N × ResponseFunctionCallArgumentsDeltaEvent
        API->>TSM: ArgumentsDelta(delta, output_index)
        TSM->>TSM: "call_data['arguments'] += delta"
        TSM->>CTX: stream_update(StreamTaskMessageDelta(ToolRequestDelta))
        CTX->>BUF: merge delta (keyed on tool_call_id)
        BUF-->>REDIS: publish (50ms / 128-char window or per-token)
    end
    API->>TSM: ResponseFunctionCallArgumentsDoneEvent
    TSM->>TSM: "call_data['arguments'] = final_args (canonical override)"
    API->>TSM: ResponseOutputItemDoneEvent (function_call)
    TSM->>TSM: "json.loads(raw_args) parsed_args (fallback={} on JSONDecodeError)"
    TSM->>CTX: stream_update(StreamTaskMessageFull(ToolRequestContent(parsed_args)))
    CTX->>REDIS: persist final message
    TSM->>CTX: "close() [finally: call_data['context']=None]"
    API->>TSM: ResponseCompletedEvent
    TSM->>TSM: "output_items = response.output (authoritative list)"
    Note over TSM: Orphan cleanup loop - context=None, no-op for clean calls
Reviews (4): Last reviewed commit: "Merge remote-tracking branch 'origin/nex..."
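The event flow in the sequence diagram, including the `finally` guard and the orphan cleanup, can be sketched in plain Python. Everything here is a simplified assumption for illustration: events are plain dicts with hypothetical `type` values rather than the real OpenAI event classes, `open_context` is a hypothetical async factory, and the context only needs async `stream_update()` and `close()` methods. It is not the code from `temporal_streaming_model.py`.

```python
import json
from typing import Any, Awaitable, Callable


async def run_function_call_stream(
    events: list[dict[str, Any]],
    open_context: Callable[[dict[str, Any]], Awaitable[Any]],
) -> dict[int, dict[str, Any]]:
    """Replay simplified events and stream tool-call argument deltas."""
    calls: dict[int, dict[str, Any]] = {}
    try:
        for ev in events:
            idx = ev["output_index"]
            if ev["type"] == "item_added":
                # Open one context per function call with an empty-args placeholder.
                ctx = await open_context(
                    {"tool_call_id": ev["call_id"], "name": ev["name"], "arguments": {}}
                )
                calls[idx] = {
                    "call_id": ev["call_id"],
                    "name": ev["name"],
                    "arguments": "",
                    "context": ctx,
                }
            elif ev["type"] == "args_delta":
                call = calls[idx]
                call["arguments"] += ev["delta"]
                await call["context"].stream_update(
                    {"kind": "delta", "tool_call_id": call["call_id"],
                     "arguments_delta": ev["delta"]}
                )
            elif ev["type"] == "item_done":
                call = calls[idx]
                try:
                    try:
                        parsed = json.loads(call["arguments"] or "{}")
                    except json.JSONDecodeError:
                        parsed = {}  # structured view falls back to empty args
                    await call["context"].stream_update(
                        {"kind": "full", "tool_call_id": call["call_id"],
                         "arguments": parsed}
                    )
                    await call["context"].close()
                finally:
                    # Clearing the reference keeps the cleanup loop from closing twice.
                    call["context"] = None
    finally:
        # Orphan cleanup: close contexts that never saw a Done event.
        for call in calls.values():
            if call["context"] is not None:
                await call["context"].close()
                call["context"] = None
    return calls
```

Passing `unittest.mock.AsyncMock` objects as contexts makes it straightforward to assert that `close()` is awaited exactly once per call, which is the regression check the review summary asks for.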