The Responses API still seems to maintain server-side state under ZDR. Even with the store parameter set to false and with reasoning.encrypted_content included, the response contains a reasoning item with id rs_07162bf5faaac0660***

We have 2 OAI instances that we load balance across by making the Responses API stateless. But consecutive requests fail if a request is routed to the other OAI instance.

Below I am using the REPL utility and saving the conversation items in a file-backed session. The second turn fails because the reasoning item was not found.
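
For reference, this is the stateless pattern we expect to work. A minimal sketch against a single endpoint using the plain openai client rather than the agents SDK (the client construction, the gpt-5 deployment name, and the prompts are placeholder assumptions):

from openai import OpenAI

client = OpenAI()  # placeholder; in our setup this is an AsyncAzureOpenAI client

history = [{"role": "user", "content": "Hi"}]

# Turn 1: store=False plus reasoning.encrypted_content should make the
# response fully replayable without any server-side state.
first = client.responses.create(
    model="gpt-5",
    input=history,
    store=False,
    include=["reasoning.encrypted_content"],
)

# Carry the output items (reasoning + message) forward ourselves.
history += [item.model_dump(exclude_unset=True) for item in first.output]
history.append({"role": "user", "content": "Are you sure?"})

# Turn 2: may land on a different instance; with the encrypted reasoning
# replayed in the input, the server should not need to look anything up.
second = client.responses.create(
    model="gpt-5",
    input=history,
    store=False,
    include=["reasoning.encrypted_content"],
)
print(second.output_text)

In our deployment the second call instead fails when it lands on the instance that did not serve the first call, which is what the repro below demonstrates. Full repro script: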
from __future__ import annotations
import asyncio
import json
import random
from pathlib import Path
from typing import TYPE_CHECKING, Any, ClassVar
from uuid import uuid4
from agents import (
    Agent,
    GuardrailFunctionOutput,
    InputGuardrail,
    Model,
    ModelSettings,
    Runner,
    SessionABC,
    WebSearchTool,
    set_default_openai_client,
)
from agents.exceptions import InputGuardrailTripwireTriggered
from agents.result import RunResultBase
from agents.run_context import TContext
from agents.stream_events import (
AgentUpdatedStreamEvent,
RawResponsesStreamEvent,
RunItemStreamEvent,
)
from azure.identity.aio import DefaultAzureCredential, get_bearer_token_provider
from openai import AsyncAzureOpenAI
from openai.types.responses.response_text_delta_event import ResponseTextDeltaEvent
from pydantic import BaseModel

if TYPE_CHECKING:
    from agents import TResponseInputItem
_dict = {}

class InMemorySession(SessionABC):
    """In-memory implementation of SessionABC for conversation history storage.

    This implementation stores conversation items in memory using a class-level
    dictionary. It's suitable for development, testing, and single-instance
    deployments where persistence is not required.
    """

    # Class-level storage shared across all instances
    _storage: ClassVar[dict[str, list[TResponseInputItem]]] = {}
    _storage_dir: ClassVar[Path] = Path("session_data")

    def __init__(self, session_id: str) -> None:
        self.session_id = session_id
        # Create storage directory if it doesn't exist
        self._storage_dir.mkdir(exist_ok=True)
        # Initialize storage for this session if it doesn't exist
        if session_id not in self._storage:
            self._storage[session_id] = []
            # Try to load from file if it exists
            self._load_from_file()

    def _get_file_path(self) -> Path:
        return self._storage_dir / f"{self.session_id}.json"

    def _save_to_file(self) -> None:
        """Save the current session data to a JSON file."""
        try:
            file_path = self._get_file_path()
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(self._storage[self.session_id], f, indent=2, ensure_ascii=False)
        except Exception as e:
            print(f"Error saving session to file: {e}")

    def _load_from_file(self) -> None:
        """Load session data from a JSON file if it exists."""
        try:
            file_path = self._get_file_path()
            if file_path.exists():
                with open(file_path, 'r', encoding='utf-8') as f:
                    self._storage[self.session_id] = json.load(f)
        except Exception as e:
            print(f"Error loading session from file: {e}")
            self._storage[self.session_id] = []

    async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
        items = self._storage.get(self.session_id, [])
        if limit is None:
            return items.copy()
        # Return the latest N items (from the end of the list)
        if limit <= 0:
            return []
        # Get the last N items and return them in chronological order
        return items[-limit:] if len(items) > limit else items.copy()

    async def add_items(self, items: list[TResponseInputItem]) -> None:
        if self.session_id not in self._storage:
            self._storage[self.session_id] = []
        self._storage[self.session_id].extend(items)
        self._save_to_file()

    async def pop_item(self) -> TResponseInputItem | None:
        items = self._storage.get(self.session_id, [])
        if not items:
            return None
        item = items.pop()
        self._save_to_file()
        return item

    async def clear_session(self) -> None:
        """Clear all items for this session."""
        self._storage[self.session_id] = []
        self._save_to_file()

class HomeworkOutput(BaseModel):
    is_homework: bool
    reasoning: str


model_settings = ModelSettings(store=False, response_include=["reasoning.encrypted_content"])

guardrail_agent = Agent(
    name="Guardrail check",
    instructions="Check if the user is asking about homework.",
    output_type=HomeworkOutput,
    model="gpt-5",
)

math_tutor_agent = Agent(
    name="Math Tutor",
    handoff_description="Specialist agent for math questions",
    instructions="You provide help with math problems. Explain your reasoning at each step and include examples",
    model_settings=model_settings,
    model="gpt-5",
)

history_tutor_agent = Agent(
    name="History Tutor",
    handoff_description="Specialist agent for historical questions",
    instructions="You provide assistance with historical queries. Explain important events and context clearly.",
    model_settings=model_settings,
    model="gpt-5",
)

social_studies_tutor_agent = Agent(
    name="Social Studies Tutor",
    handoff_description="Specialist agent for social studies questions",
    instructions="You provide assistance with social studies questions. Explain concepts clearly and provide relevant examples.",
    model_settings=model_settings,
    model="gpt-5",
)


async def homework_guardrail(ctx, agent, input_data):
    result = await Runner.run(guardrail_agent, input_data, context=ctx.context)
    final_output = result.final_output_as(HomeworkOutput)
    return GuardrailFunctionOutput(
        output_info=final_output,
        tripwire_triggered=not final_output.is_homework,
    )


triage_agent = Agent(
    name="Triage Agent",
    instructions="You determine which agent to use based on the user's homework question",
    handoffs=[history_tutor_agent, math_tutor_agent, social_studies_tutor_agent],
    model_settings=model_settings,
    model="gpt-5",
)

async def run_demo_loop(
    session_id: str,
    agent: Agent[Any],
    *,
    stream: bool = True,
    context: TContext | None = None,
) -> None:
    """Run a simple REPL loop with the given agent.

    This utility allows quick manual testing and debugging of an agent from the
    command line. Conversation state is preserved across turns. Enter ``exit``
    or ``quit`` to stop the loop.

    Args:
        session_id: Identifier of the file-backed session holding the history.
        agent: The starting agent to run.
        stream: Whether to stream the agent output.
        context: Additional context information to pass to the runner.
    """
    current_agent = agent
    input_items: list[TResponseInputItem] = []
    while True:
        try:
            user_input = input(" > ")
        except (EOFError, KeyboardInterrupt):
            print()
            break
        if user_input.strip().lower() in {"exit", "quit"}:
            break
        if not user_input:
            continue
        input_items.append({"role": "user", "content": user_input})
        result: RunResultBase
        azure_openai_client = AsyncAzureOpenAI(
            azure_endpoint=str(random.choice([
                # multiple oai instances
            ])),
            api_version="2025-03-01-preview",
            azure_ad_token_provider=get_bearer_token_provider(
                DefaultAzureCredential(),
                "https://cognitiveservices.azure.com/.default",
            ),
            max_retries=3,  # Enable SDK's built-in retry logic with exponential backoff
        )
        set_default_openai_client(azure_openai_client)
        if stream:
            result = Runner.run_streamed(
                current_agent,
                input=user_input,
                context=context,
                session=InMemorySession(session_id),
            )
            async for event in result.stream_events():
                if isinstance(event, RawResponsesStreamEvent):
                    if isinstance(event.data, ResponseTextDeltaEvent):
                        print(event.data.delta, end="", flush=True)
                elif isinstance(event, RunItemStreamEvent):
                    if event.item.type == "tool_call_item":
                        print("\n[tool called]", flush=True)
                    elif event.item.type == "tool_call_output_item":
                        print(f"\n[tool output: {event.item.output}]", flush=True)
                elif isinstance(event, AgentUpdatedStreamEvent):
                    print(f"\n[Agent updated: {event.new_agent.name}]", flush=True)
            print()
        else:
            result = await Runner.run(current_agent, input_items, context=context)
            if result.final_output is not None:
                print(result.final_output)
        current_agent = result.last_agent
        input_items = result.to_input_list()


async def main():
    # Example 1: History question
    session_id = str(uuid4())
    await run_demo_loop("c3776624-2dac-4f7a-9f05-640e9d9aaf53", triage_agent)

if __name__ == "__main__":
    asyncio.run(main())

Contents of the saved session file after the conversation above:

[
  {
    "content": "Hi",
    "role": "user"
  },
  {
    "id": "rs_07162bf5faaac06601691147e32c2c819695b23a1b13fe694f",
    "summary": [],
    "type": "reasoning",
    "encrypted_content": "**KO6h0ugkDkcB7wJA5m_pJdJ_QGwk3uHbk5iar6vbeWeAcTOFmV91AftnLPUrf71g4T0Wco2KCkuW96X6b5WiuqJwIA1nNp5bWy73KLsAa7Wr9jwo6VyvfI2b1Ba33TnoybNsmNjUJzZF2Eu-v_oUsEU2-2m3x-tC-ErOp8acZrVEjE3I6L_uCYLw=="
  },
  {
    "id": "msg_07162bf5faaac06601691147e510bc8196bb5307d3495b8b92",
    "content": [
      {
        "annotations": [],
        "text": "Hi! What homework question can I help with? Tell me the subject (math, history, or social studies) and share the problem—text or a photo works. If you’ve started work, let me know where you’re stuck.",
        "type": "output_text",
        "logprobs": []
      }
    ],
    "role": "assistant",
    "status": "completed",
    "type": "message"
  },
  {
    "content": "Are you sure?",
    "role": "user"
  },
  {
    "id": "rs_07162bf5faaac066016911480cbdd88196bcbc5c5d16e08807",
    "summary": [],
    "type": "reasoning",
    "encrypted_content": "**ZhfZIENHUdaIpcDmVmQd2mZNg88777MCJelhrKVZ_jf4-TbEY3o-G9P5VXAE6lckVs1TIBltcIB1gzQ1i4zbsx6-EWP-bJmLDjXRDduz3RSnl3kQrz0uGFSQLQXT_qNqBsppqKfmQ_OlzmZ_UxD91FxkE33X2oWFNtxEnXh9baR4RsEfFw5aQutpTiKWPtV1KoShYs1xirG1EkPkE="
  },
  {
    "id": "msg_07162bf5faaac066016911480f7c6481969972f62764e29860",
    "content": [
      {
        "annotations": [],
        "text": "Yes—I'm here to help. Share your homework problem and the subject (math, history, or social studies), and I’ll route you to the right tutor. If you meant something else by “Are you sure?”, let me know what you’re asking.",
        "type": "output_text",
        "logprobs": []
      }
    ],
    "role": "assistant",
    "status": "completed",
    "type": "message"
  },
  {
    "content": "Really?",
    "role": "user"
  }
]
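
A diagnostic that may help isolate this (an assumption on my part, not a confirmed workaround): if the "reasoning item was not found" error comes from the server trying to dereference the stored rs_*/msg_* ids on an instance that never saw them, stripping the id fields from the replayed items before the next turn should make the failure disappear. The strip_ids helper below is hypothetical, not part of the agents SDK:

def strip_ids(items: list[dict]) -> list[dict]:
    # Drop server-assigned item ids so another OAI instance cannot try to
    # resolve them; with store=False + encrypted_content, the ids should be
    # unnecessary to rebuild reasoning state (assumption).
    return [{k: v for k, v in item.items() if k != "id"} for item in items]

If the second turn succeeds once the ids are stripped, that would suggest the replayed ids are being treated as references into per-instance state rather than as opaque history, despite store=False.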