2626 ResponseReasoningSummaryTextDeltaEvent ,
2727)
2828from agents .models .openai_provider import OpenAIProvider
29+ from openai .types .responses .response_reasoning_summary_text_done_event import ResponseReasoningSummaryTextDoneEvent
30+ from openai .types .responses .response_reasoning_text_delta_event import ResponseReasoningTextDeltaEvent
31+ from openai .types .responses .response_reasoning_text_done_event import ResponseReasoningTextDoneEvent
2932
3033from agentex import AsyncAgentex
3134from agentex .lib .utils .logging import make_logger
3235from agentex .lib .core .tracing .tracer import AsyncTracer
36+ from agentex .types .reasoning_content_delta import ReasoningContentDelta
37+ from agentex .types .reasoning_summary_delta import ReasoningSummaryDelta
3338from agentex .types .task_message_delta import TextDelta
3439from agentex .types .task_message_update import (
3540 StreamTaskMessageDone ,
@@ -460,22 +465,25 @@ def _extract_tool_response_info(tool_map: dict[str, Any], tool_output_item: Any)
460465 return call_id , tool_name , content
461466
462467
463- async def convert_openai_to_agentex_events (stream_response ):
464- """Convert OpenAI streaming events to AgentEx TaskMessageUpdate events.
465- This function takes an async iterator of OpenAI events and yields AgentEx
466- TaskMessageUpdate events based on the OpenAI event types.
468+ async def convert_openai_to_agentex_events_with_reasoning (stream_response ):
469+ """Convert OpenAI streaming events to AgentEx TaskMessageUpdate events with reasoning support.
470+
471+ This is an enhanced version of the base converter that includes support for:
472+ - Reasoning content deltas (for o1 models)
473+ - Reasoning summary deltas (for o1 models)
474+
467475 Args:
468476 stream_response: An async iterator of OpenAI streaming events
469477 Yields:
470- TaskMessageUpdate: AgentEx streaming events (StreamTaskMessageDelta or StreamTaskMessageDone)
478+ TaskMessageUpdate: AgentEx streaming events (StreamTaskMessageDelta, StreamTaskMessageFull, or StreamTaskMessageDone)
471479 """
472480
473481 tool_map = {}
474482 event_count = 0
475483 message_index = 0 # Track message index for proper sequencing
476484 seen_tool_output = False # Track if we've seen tool output to know when final text starts
477485 item_id_to_index = {} # Map item_id to message index
478- current_reasoning_summary = "" # Accumulate reasoning summary text
486+ item_id_to_type = {} # Map item_id to content type ( text, reasoning_content, reasoning_summary)
479487
480488 async for event in stream_response :
481489 event_count += 1
@@ -495,16 +503,107 @@ async def convert_openai_to_agentex_events(stream_response):
495503 elif isinstance (raw_event , ResponseOutputItemDoneEvent ):
496504 item_id = raw_event .item .id
497505 if item_id in item_id_to_index :
498- # Send done event for this message
499- yield StreamTaskMessageDone (
500- type = "done" ,
506+ # Get the message type to decide whether to send done event
507+ message_type = item_id_to_type .get (item_id , "text" )
508+
509+ # Don't send done events for reasoning content/summary
510+ # They just end with their last delta
511+ if message_type not in ("reasoning_content" , "reasoning_summary" ):
512+ yield StreamTaskMessageDone (
513+ type = "done" ,
514+ index = item_id_to_index [item_id ],
515+ )
516+
517+ # Skip reasoning summary part added events - we handle them on delta
518+ elif isinstance (raw_event , ResponseReasoningSummaryPartAddedEvent ):
519+ pass
520+
521+ # Handle reasoning summary text delta events
522+ elif isinstance (raw_event , ResponseReasoningSummaryTextDeltaEvent ):
523+ item_id = raw_event .item_id
524+ summary_index = raw_event .summary_index
525+
526+ # If this is a new item_id we haven't seen, create a new message
527+ if item_id and item_id not in item_id_to_index :
528+ message_index += 1
529+ item_id_to_index [item_id ] = message_index
530+ item_id_to_type [item_id ] = "reasoning_summary"
531+
532+ # Send a start event for this new reasoning summary message
533+ yield StreamTaskMessageStart (
534+ type = "start" ,
501535 index = item_id_to_index [item_id ],
536+ content = TextContent (
537+ type = "text" ,
538+ author = "agent" ,
539+ content = "" , # Start with empty content
540+ ),
502541 )
503542
504- # Skip reasoning summary events since o1 reasoning tokens are not accessible
505- elif isinstance (raw_event , (ResponseReasoningSummaryPartAddedEvent ,
506- ResponseReasoningSummaryTextDeltaEvent ,
507- ResponseReasoningSummaryPartDoneEvent )):
543+ # Use the index for this item_id
544+ current_index = item_id_to_index .get (item_id , message_index )
545+
546+ # Yield reasoning summary delta
547+ yield StreamTaskMessageDelta (
548+ type = "delta" ,
549+ index = current_index ,
550+ delta = ReasoningSummaryDelta (
551+ type = "reasoning_summary" ,
552+ summary_index = summary_index ,
553+ summary_delta = raw_event .delta ,
554+ ),
555+ )
556+
557+ # Handle reasoning summary text done events
558+ elif isinstance (raw_event , ResponseReasoningSummaryTextDoneEvent ):
559+ # We do NOT close the streaming context here
560+ # as there can be multiple reasoning summaries.
561+ # The context will be closed when the entire
562+ # output item is done (ResponseOutputItemDoneEvent)
563+ pass
564+
565+ # Handle reasoning content text delta events
566+ elif isinstance (raw_event , ResponseReasoningTextDeltaEvent ):
567+ item_id = raw_event .item_id
568+ content_index = raw_event .content_index
569+
570+ # If this is a new item_id we haven't seen, create a new message
571+ if item_id and item_id not in item_id_to_index :
572+ message_index += 1
573+ item_id_to_index [item_id ] = message_index
574+ item_id_to_type [item_id ] = "reasoning_content"
575+
576+ # Send a start event for this new reasoning content message
577+ yield StreamTaskMessageStart (
578+ type = "start" ,
579+ index = item_id_to_index [item_id ],
580+ content = TextContent (
581+ type = "text" ,
582+ author = "agent" ,
583+ content = "" , # Start with empty content
584+ ),
585+ )
586+
587+ # Use the index for this item_id
588+ current_index = item_id_to_index .get (item_id , message_index )
589+
590+ # Yield reasoning content delta
591+ yield StreamTaskMessageDelta (
592+ type = "delta" ,
593+ index = current_index ,
594+ delta = ReasoningContentDelta (
595+ type = "reasoning_content" ,
596+ content_index = content_index ,
597+ content_delta = raw_event .delta ,
598+ ),
599+ )
600+
601+ # Handle reasoning content text done events
602+ elif isinstance (raw_event , ResponseReasoningTextDoneEvent ):
603+ # We do NOT close the streaming context here
604+ # as there can be multiple reasoning content texts.
605+ # The context will be closed when the entire
606+ # output item is done (ResponseOutputItemDoneEvent)
508607 pass
509608
510609 # Check if this is a text delta event from OpenAI
@@ -523,6 +622,8 @@ async def convert_openai_to_agentex_events(stream_response):
523622 else :
524623 item_id_to_index [item_id ] = message_index
525624
625+ item_id_to_type [item_id ] = "text"
626+
526627 # Send a start event with empty content for this new text message
527628 yield StreamTaskMessageStart (
528629 type = "start" ,
@@ -548,7 +649,7 @@ async def convert_openai_to_agentex_events(stream_response):
548649 yield delta_message
549650
550651 elif hasattr (event , 'type' ) and event .type == 'run_item_stream_event' :
551- # Skip reasoning_item events since o1 reasoning tokens are not accessible via the API
652+ # Skip reasoning_item events - they're handled via raw_response_event above
552653 if hasattr (event , 'item' ) and event .item .type == 'reasoning_item' :
553654 continue
554655
@@ -587,3 +688,4 @@ async def convert_openai_to_agentex_events(stream_response):
587688 index = message_index ,
588689 content = tool_response_content ,
589690 )
691+
0 commit comments