Skip to content

Commit 0a15c49

Browse files
MichaelSun48stainless-app[bot]
authored andcommitted
Handle reasoning in openai event translator
1 parent 40ea807 commit 0a15c49

File tree

1 file changed

+116
-14
lines changed

1 file changed

+116
-14
lines changed

src/agentex/lib/adk/providers/_modules/sync_provider.py

Lines changed: 116 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,15 @@
2626
ResponseReasoningSummaryTextDeltaEvent,
2727
)
2828
from agents.models.openai_provider import OpenAIProvider
29+
from openai.types.responses.response_reasoning_summary_text_done_event import ResponseReasoningSummaryTextDoneEvent
30+
from openai.types.responses.response_reasoning_text_delta_event import ResponseReasoningTextDeltaEvent
31+
from openai.types.responses.response_reasoning_text_done_event import ResponseReasoningTextDoneEvent
2932

3033
from agentex import AsyncAgentex
3134
from agentex.lib.utils.logging import make_logger
3235
from agentex.lib.core.tracing.tracer import AsyncTracer
36+
from agentex.types.reasoning_content_delta import ReasoningContentDelta
37+
from agentex.types.reasoning_summary_delta import ReasoningSummaryDelta
3338
from agentex.types.task_message_delta import TextDelta
3439
from agentex.types.task_message_update import (
3540
StreamTaskMessageDone,
@@ -460,22 +465,25 @@ def _extract_tool_response_info(tool_map: dict[str, Any], tool_output_item: Any)
460465
return call_id, tool_name, content
461466

462467

463-
async def convert_openai_to_agentex_events(stream_response):
464-
"""Convert OpenAI streaming events to AgentEx TaskMessageUpdate events.
465-
This function takes an async iterator of OpenAI events and yields AgentEx
466-
TaskMessageUpdate events based on the OpenAI event types.
468+
async def convert_openai_to_agentex_events_with_reasoning(stream_response):
469+
"""Convert OpenAI streaming events to AgentEx TaskMessageUpdate events with reasoning support.
470+
471+
This is an enhanced version of the base converter that includes support for:
472+
- Reasoning content deltas (for o1 models)
473+
- Reasoning summary deltas (for o1 models)
474+
467475
Args:
468476
stream_response: An async iterator of OpenAI streaming events
469477
Yields:
470-
TaskMessageUpdate: AgentEx streaming events (StreamTaskMessageDelta or StreamTaskMessageDone)
478+
TaskMessageUpdate: AgentEx streaming events (StreamTaskMessageDelta, StreamTaskMessageFull, or StreamTaskMessageDone)
471479
"""
472480

473481
tool_map = {}
474482
event_count = 0
475483
message_index = 0 # Track message index for proper sequencing
476484
seen_tool_output = False # Track if we've seen tool output to know when final text starts
477485
item_id_to_index = {} # Map item_id to message index
478-
current_reasoning_summary = "" # Accumulate reasoning summary text
486+
item_id_to_type = {} # Map item_id to content type (text, reasoning_content, reasoning_summary)
479487

480488
async for event in stream_response:
481489
event_count += 1
@@ -495,16 +503,107 @@ async def convert_openai_to_agentex_events(stream_response):
495503
elif isinstance(raw_event, ResponseOutputItemDoneEvent):
496504
item_id = raw_event.item.id
497505
if item_id in item_id_to_index:
498-
# Send done event for this message
499-
yield StreamTaskMessageDone(
500-
type="done",
506+
# Get the message type to decide whether to send done event
507+
message_type = item_id_to_type.get(item_id, "text")
508+
509+
# Don't send done events for reasoning content/summary
510+
# They just end with their last delta
511+
if message_type not in ("reasoning_content", "reasoning_summary"):
512+
yield StreamTaskMessageDone(
513+
type="done",
514+
index=item_id_to_index[item_id],
515+
)
516+
517+
# Skip reasoning summary part added events - we handle them on delta
518+
elif isinstance(raw_event, ResponseReasoningSummaryPartAddedEvent):
519+
pass
520+
521+
# Handle reasoning summary text delta events
522+
elif isinstance(raw_event, ResponseReasoningSummaryTextDeltaEvent):
523+
item_id = raw_event.item_id
524+
summary_index = raw_event.summary_index
525+
526+
# If this is a new item_id we haven't seen, create a new message
527+
if item_id and item_id not in item_id_to_index:
528+
message_index += 1
529+
item_id_to_index[item_id] = message_index
530+
item_id_to_type[item_id] = "reasoning_summary"
531+
532+
# Send a start event for this new reasoning summary message
533+
yield StreamTaskMessageStart(
534+
type="start",
501535
index=item_id_to_index[item_id],
536+
content=TextContent(
537+
type="text",
538+
author="agent",
539+
content="", # Start with empty content
540+
),
502541
)
503542

504-
# Skip reasoning summary events since o1 reasoning tokens are not accessible
505-
elif isinstance(raw_event, (ResponseReasoningSummaryPartAddedEvent,
506-
ResponseReasoningSummaryTextDeltaEvent,
507-
ResponseReasoningSummaryPartDoneEvent)):
543+
# Use the index for this item_id
544+
current_index = item_id_to_index.get(item_id, message_index)
545+
546+
# Yield reasoning summary delta
547+
yield StreamTaskMessageDelta(
548+
type="delta",
549+
index=current_index,
550+
delta=ReasoningSummaryDelta(
551+
type="reasoning_summary",
552+
summary_index=summary_index,
553+
summary_delta=raw_event.delta,
554+
),
555+
)
556+
557+
# Handle reasoning summary text done events
558+
elif isinstance(raw_event, ResponseReasoningSummaryTextDoneEvent):
559+
# We do NOT close the streaming context here
560+
# as there can be multiple reasoning summaries.
561+
# The context will be closed when the entire
562+
# output item is done (ResponseOutputItemDoneEvent)
563+
pass
564+
565+
# Handle reasoning content text delta events
566+
elif isinstance(raw_event, ResponseReasoningTextDeltaEvent):
567+
item_id = raw_event.item_id
568+
content_index = raw_event.content_index
569+
570+
# If this is a new item_id we haven't seen, create a new message
571+
if item_id and item_id not in item_id_to_index:
572+
message_index += 1
573+
item_id_to_index[item_id] = message_index
574+
item_id_to_type[item_id] = "reasoning_content"
575+
576+
# Send a start event for this new reasoning content message
577+
yield StreamTaskMessageStart(
578+
type="start",
579+
index=item_id_to_index[item_id],
580+
content=TextContent(
581+
type="text",
582+
author="agent",
583+
content="", # Start with empty content
584+
),
585+
)
586+
587+
# Use the index for this item_id
588+
current_index = item_id_to_index.get(item_id, message_index)
589+
590+
# Yield reasoning content delta
591+
yield StreamTaskMessageDelta(
592+
type="delta",
593+
index=current_index,
594+
delta=ReasoningContentDelta(
595+
type="reasoning_content",
596+
content_index=content_index,
597+
content_delta=raw_event.delta,
598+
),
599+
)
600+
601+
# Handle reasoning content text done events
602+
elif isinstance(raw_event, ResponseReasoningTextDoneEvent):
603+
# We do NOT close the streaming context here
604+
# as there can be multiple reasoning content texts.
605+
# The context will be closed when the entire
606+
# output item is done (ResponseOutputItemDoneEvent)
508607
pass
509608

510609
# Check if this is a text delta event from OpenAI
@@ -523,6 +622,8 @@ async def convert_openai_to_agentex_events(stream_response):
523622
else:
524623
item_id_to_index[item_id] = message_index
525624

625+
item_id_to_type[item_id] = "text"
626+
526627
# Send a start event with empty content for this new text message
527628
yield StreamTaskMessageStart(
528629
type="start",
@@ -548,7 +649,7 @@ async def convert_openai_to_agentex_events(stream_response):
548649
yield delta_message
549650

550651
elif hasattr(event, 'type') and event.type == 'run_item_stream_event':
551-
# Skip reasoning_item events since o1 reasoning tokens are not accessible via the API
652+
# Skip reasoning_item events - they're handled via raw_response_event above
552653
if hasattr(event, 'item') and event.item.type == 'reasoning_item':
553654
continue
554655

@@ -587,3 +688,4 @@ async def convert_openai_to_agentex_events(stream_response):
587688
index=message_index,
588689
content=tool_response_content,
589690
)
691+

0 commit comments

Comments
 (0)