2727
2828import backoff
2929import httpx
30- from opentelemetry import trace
3130from opentelemetry import trace as otel_trace_api
3231from opentelemetry .sdk .trace import TracerProvider
3332from opentelemetry .sdk .trace .id_generator import RandomIdGenerator
3736)
3837from packaging .version import Version
3938
40- from langfuse ._client .attributes import LangfuseOtelSpanAttributes
39+ from langfuse ._client .attributes import LangfuseOtelSpanAttributes , _serialize
4140from langfuse ._client .constants import (
41+ LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT ,
4242 ObservationTypeGenerationLike ,
4343 ObservationTypeLiteral ,
4444 ObservationTypeLiteralNoEvent ,
5757 LANGFUSE_TRACING_ENABLED ,
5858 LANGFUSE_TRACING_ENVIRONMENT ,
5959)
60+ from langfuse ._client .propagation import (
61+ PropagatedExperimentAttributes ,
62+ _propagate_attributes ,
63+ )
6064from langfuse ._client .resource_manager import LangfuseResourceManager
6165from langfuse ._client .span import (
6266 LangfuseAgent ,
7074 LangfuseSpan ,
7175 LangfuseTool ,
7276)
73- from langfuse ._client .utils import run_async_safely
77+ from langfuse ._client .utils import get_sha256_hash_hex , run_async_safely
7478from langfuse ._utils import _get_timestamp
7579from langfuse ._utils .parse_error import handle_fern_exception
7680from langfuse ._utils .prompt_cache import PromptCache
@@ -1638,10 +1642,6 @@ def update_current_trace(
16381642 ) -> None :
16391643 """Update the current trace with additional information.
16401644
1641- This method updates the Langfuse trace that the current span belongs to. It's useful for
1642- adding trace-level metadata like user ID, session ID, or tags that apply to
1643- the entire Langfuse trace rather than just a single observation.
1644-
16451645 Args:
16461646 name: Updated name for the Langfuse trace
16471647 user_id: ID of the user who initiated the Langfuse trace
@@ -1653,25 +1653,8 @@ def update_current_trace(
16531653 tags: List of tags to categorize the Langfuse trace
16541654 public: Whether the Langfuse trace should be publicly accessible
16551655
1656- Example:
1657- ```python
1658- with langfuse.start_as_current_span(name="handle-request") as span:
1659- # Get user information
1660- user = authenticate_user(request)
1661-
1662- # Update trace with user context
1663- langfuse.update_current_trace(
1664- user_id=user.id,
1665- session_id=request.session_id,
1666- tags=["production", "web-app"]
1667- )
1668-
1669- # Continue processing
1670- response = process_request(request)
1671-
1672- # Update span with results
1673- span.update(output=response)
1674- ```
1656+ See Also:
1657+ :func:`langfuse.propagate_attributes`: Recommended replacement
16751658 """
16761659 if not self ._tracing_enabled :
16771660 langfuse_logger .debug (
@@ -1817,7 +1800,7 @@ def _create_remote_parent_span(
18171800 is_remote = False ,
18181801 )
18191802
1820- return trace .NonRecordingSpan (span_context )
1803+ return otel_trace_api .NonRecordingSpan (span_context )
18211804
18221805 def _is_valid_trace_id (self , trace_id : str ) -> bool :
18231806 pattern = r"^[0-9a-f]{32}$"
@@ -2477,7 +2460,7 @@ def run_experiment(
24772460 evaluators : List [EvaluatorFunction ] = [],
24782461 run_evaluators : List [RunEvaluatorFunction ] = [],
24792462 max_concurrency : int = 50 ,
2480- metadata : Optional [Dict [str , Any ]] = None ,
2463+ metadata : Optional [Dict [str , str ]] = None ,
24812464 ) -> ExperimentResult :
24822465 """Run an experiment on a dataset with automatic tracing and evaluation.
24832466
@@ -2649,7 +2632,7 @@ def average_accuracy(*, item_results, **kwargs):
26492632 evaluators = evaluators or [],
26502633 run_evaluators = run_evaluators or [],
26512634 max_concurrency = max_concurrency ,
2652- metadata = metadata or {} ,
2635+ metadata = metadata ,
26532636 ),
26542637 ),
26552638 )
@@ -2665,7 +2648,7 @@ async def _run_experiment_async(
26652648 evaluators : List [EvaluatorFunction ],
26662649 run_evaluators : List [RunEvaluatorFunction ],
26672650 max_concurrency : int ,
2668- metadata : Dict [str , Any ],
2651+ metadata : Optional [ Dict [str , Any ]] = None ,
26692652 ) -> ExperimentResult :
26702653 langfuse_logger .debug (
26712654 f"Starting experiment '{ name } ' run '{ run_name } ' with { len (data )} items"
@@ -2763,85 +2746,123 @@ async def _process_experiment_item(
27632746 experiment_name : str ,
27642747 experiment_run_name : str ,
27652748 experiment_description : Optional [str ],
2766- experiment_metadata : Dict [str , Any ],
2749+ experiment_metadata : Optional [ Dict [str , Any ]] = None ,
27672750 ) -> ExperimentItemResult :
2768- # Execute task with tracing
27692751 span_name = "experiment-item-run"
27702752
27712753 with self .start_as_current_span (name = span_name ) as span :
27722754 try :
2773- output = await _run_task (task , item )
2774-
27752755 input_data = (
27762756 item .get ("input" )
27772757 if isinstance (item , dict )
27782758 else getattr (item , "input" , None )
27792759 )
27802760
2781- item_metadata : Dict [str , Any ] = {}
2761+ if input_data is None :
2762+ raise ValueError ("Experiment Item is missing input. Skipping item." )
2763+
2764+ expected_output = (
2765+ item .get ("expected_output" )
2766+ if isinstance (item , dict )
2767+ else getattr (item , "expected_output" , None )
2768+ )
27822769
2783- if isinstance (item , dict ):
2784- item_metadata = item .get ("metadata" , None ) or {}
2770+ item_metadata = (
2771+ item .get ("metadata" )
2772+ if isinstance (item , dict )
2773+ else getattr (item , "metadata" , None )
2774+ )
27852775
2786- final_metadata = {
2776+ final_observation_metadata = {
27872777 "experiment_name" : experiment_name ,
27882778 "experiment_run_name" : experiment_run_name ,
2789- ** experiment_metadata ,
2779+ ** ( experiment_metadata or {}) ,
27902780 }
27912781
2792- if (
2793- not isinstance (item , dict )
2794- and hasattr (item , "dataset_id" )
2795- and hasattr (item , "id" )
2796- ):
2797- final_metadata .update (
2798- {"dataset_id" : item .dataset_id , "dataset_item_id" : item .id }
2799- )
2800-
2801- if isinstance (item_metadata , dict ):
2802- final_metadata .update (item_metadata )
2803-
2804- span .update (
2805- input = input_data ,
2806- output = output ,
2807- metadata = final_metadata ,
2808- )
2809-
2810- # Get trace ID for linking
28112782 trace_id = span .trace_id
2783+ dataset_id = None
2784+ dataset_item_id = None
28122785 dataset_run_id = None
28132786
28142787 # Link to dataset run if this is a dataset item
28152788 if hasattr (item , "id" ) and hasattr (item , "dataset_id" ):
28162789 try :
2817- dataset_run_item = self .api .dataset_run_items .create (
2790+ # Use sync API to avoid event loop issues when run_async_safely
2791+ # creates multiple event loops across different threads
2792+ dataset_run_item = await asyncio .to_thread (
2793+ self .api .dataset_run_items .create ,
28182794 request = CreateDatasetRunItemRequest (
28192795 runName = experiment_run_name ,
28202796 runDescription = experiment_description ,
28212797 metadata = experiment_metadata ,
28222798 datasetItemId = item .id , # type: ignore
28232799 traceId = trace_id ,
28242800 observationId = span .id ,
2825- )
2801+ ),
28262802 )
28272803
28282804 dataset_run_id = dataset_run_item .dataset_run_id
28292805
28302806 except Exception as e :
28312807 langfuse_logger .error (f"Failed to create dataset run item: { e } " )
28322808
2809+ if (
2810+ not isinstance (item , dict )
2811+ and hasattr (item , "dataset_id" )
2812+ and hasattr (item , "id" )
2813+ ):
2814+ dataset_id = item .dataset_id
2815+ dataset_item_id = item .id
2816+
2817+ final_observation_metadata .update (
2818+ {"dataset_id" : dataset_id , "dataset_item_id" : dataset_item_id }
2819+ )
2820+
2821+ if isinstance (item_metadata , dict ):
2822+ final_observation_metadata .update (item_metadata )
2823+
2824+ experiment_id = dataset_run_id or self ._create_observation_id ()
2825+ experiment_item_id = (
2826+ dataset_item_id or get_sha256_hash_hex (_serialize (input_data ))[:16 ]
2827+ )
2828+ span ._otel_span .set_attributes (
2829+ {
2830+ k : v
2831+ for k , v in {
2832+ LangfuseOtelSpanAttributes .ENVIRONMENT : LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT ,
2833+ LangfuseOtelSpanAttributes .EXPERIMENT_DESCRIPTION : experiment_description ,
2834+ LangfuseOtelSpanAttributes .EXPERIMENT_ITEM_EXPECTED_OUTPUT : _serialize (
2835+ expected_output
2836+ ),
2837+ }.items ()
2838+ if v is not None
2839+ }
2840+ )
2841+
2842+ with _propagate_attributes (
2843+ experiment = PropagatedExperimentAttributes (
2844+ experiment_id = experiment_id ,
2845+ experiment_name = experiment_run_name ,
2846+ experiment_metadata = _serialize (experiment_metadata ),
2847+ experiment_dataset_id = dataset_id ,
2848+ experiment_item_id = experiment_item_id ,
2849+ experiment_item_metadata = _serialize (item_metadata ),
2850+ experiment_item_root_observation_id = span .id ,
2851+ )
2852+ ):
2853+ output = await _run_task (task , item )
2854+
2855+ span .update (
2856+ input = input_data ,
2857+ output = output ,
2858+ metadata = final_observation_metadata ,
2859+ )
2860+
28332861 # Run evaluators
28342862 evaluations = []
28352863
28362864 for evaluator in evaluators :
28372865 try :
2838- expected_output = None
2839-
2840- if isinstance (item , dict ):
2841- expected_output = item .get ("expected_output" )
2842- elif hasattr (item , "expected_output" ):
2843- expected_output = item .expected_output
2844-
28452866 eval_metadata : Optional [Dict [str , Any ]] = None
28462867
28472868 if isinstance (item , dict ):
@@ -2862,6 +2883,7 @@ async def _process_experiment_item(
28622883 for evaluation in eval_results :
28632884 self .create_score (
28642885 trace_id = trace_id ,
2886+ observation_id = span .id ,
28652887 name = evaluation .name ,
28662888 value = evaluation .value , # type: ignore
28672889 comment = evaluation .comment ,
0 commit comments