Skip to content

Commit eb66898

Browse files
authored
feat(client): propagate trace attributes onto all child spans on update (#1415)
1 parent 0c11888 commit eb66898

File tree

12 files changed: 3408 additions and 80 deletions.

langfuse/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@
77
from ._client.constants import ObservationTypeLiteral
88
from ._client.get_client import get_client
99
from ._client.observe import observe
10+
from ._client.propagation import propagate_attributes
1011
from ._client.span import (
1112
LangfuseAgent,
1213
LangfuseChain,
@@ -26,6 +27,7 @@
2627
"Langfuse",
2728
"get_client",
2829
"observe",
30+
"propagate_attributes",
2931
"ObservationTypeLiteral",
3032
"LangfuseSpan",
3133
"LangfuseGeneration",

langfuse/_client/attributes.py

Lines changed: 11 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,6 @@
1818
ObservationTypeGenerationLike,
1919
ObservationTypeSpanLike,
2020
)
21-
2221
from langfuse._utils.serializer import EventSerializer
2322
from langfuse.model import PromptClient
2423
from langfuse.types import MapValue, SpanLevel
@@ -60,6 +59,17 @@ class LangfuseOtelSpanAttributes:
6059
# Internal
6160
AS_ROOT = "langfuse.internal.as_root"
6261

62+
# Experiments
63+
EXPERIMENT_ID = "langfuse.experiment.id"
64+
EXPERIMENT_NAME = "langfuse.experiment.name"
65+
EXPERIMENT_DESCRIPTION = "langfuse.experiment.description"
66+
EXPERIMENT_METADATA = "langfuse.experiment.metadata"
67+
EXPERIMENT_DATASET_ID = "langfuse.experiment.dataset.id"
68+
EXPERIMENT_ITEM_ID = "langfuse.experiment.item.id"
69+
EXPERIMENT_ITEM_EXPECTED_OUTPUT = "langfuse.experiment.item.expected_output"
70+
EXPERIMENT_ITEM_METADATA = "langfuse.experiment.item.metadata"
71+
EXPERIMENT_ITEM_ROOT_OBSERVATION_ID = "langfuse.experiment.item.root_observation_id"
72+
6373

6474
def create_trace_attributes(
6575
*,

langfuse/_client/client.py

Lines changed: 89 additions & 67 deletions
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,6 @@
2727

2828
import backoff
2929
import httpx
30-
from opentelemetry import trace
3130
from opentelemetry import trace as otel_trace_api
3231
from opentelemetry.sdk.trace import TracerProvider
3332
from opentelemetry.sdk.trace.id_generator import RandomIdGenerator
@@ -37,8 +36,9 @@
3736
)
3837
from packaging.version import Version
3938

40-
from langfuse._client.attributes import LangfuseOtelSpanAttributes
39+
from langfuse._client.attributes import LangfuseOtelSpanAttributes, _serialize
4140
from langfuse._client.constants import (
41+
LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT,
4242
ObservationTypeGenerationLike,
4343
ObservationTypeLiteral,
4444
ObservationTypeLiteralNoEvent,
@@ -57,6 +57,10 @@
5757
LANGFUSE_TRACING_ENABLED,
5858
LANGFUSE_TRACING_ENVIRONMENT,
5959
)
60+
from langfuse._client.propagation import (
61+
PropagatedExperimentAttributes,
62+
_propagate_attributes,
63+
)
6064
from langfuse._client.resource_manager import LangfuseResourceManager
6165
from langfuse._client.span import (
6266
LangfuseAgent,
@@ -70,7 +74,7 @@
7074
LangfuseSpan,
7175
LangfuseTool,
7276
)
73-
from langfuse._client.utils import run_async_safely
77+
from langfuse._client.utils import get_sha256_hash_hex, run_async_safely
7478
from langfuse._utils import _get_timestamp
7579
from langfuse._utils.parse_error import handle_fern_exception
7680
from langfuse._utils.prompt_cache import PromptCache
@@ -1638,10 +1642,6 @@ def update_current_trace(
16381642
) -> None:
16391643
"""Update the current trace with additional information.
16401644
1641-
This method updates the Langfuse trace that the current span belongs to. It's useful for
1642-
adding trace-level metadata like user ID, session ID, or tags that apply to
1643-
the entire Langfuse trace rather than just a single observation.
1644-
16451645
Args:
16461646
name: Updated name for the Langfuse trace
16471647
user_id: ID of the user who initiated the Langfuse trace
@@ -1653,25 +1653,8 @@ def update_current_trace(
16531653
tags: List of tags to categorize the Langfuse trace
16541654
public: Whether the Langfuse trace should be publicly accessible
16551655
1656-
Example:
1657-
```python
1658-
with langfuse.start_as_current_span(name="handle-request") as span:
1659-
# Get user information
1660-
user = authenticate_user(request)
1661-
1662-
# Update trace with user context
1663-
langfuse.update_current_trace(
1664-
user_id=user.id,
1665-
session_id=request.session_id,
1666-
tags=["production", "web-app"]
1667-
)
1668-
1669-
# Continue processing
1670-
response = process_request(request)
1671-
1672-
# Update span with results
1673-
span.update(output=response)
1674-
```
1656+
See Also:
1657+
:func:`langfuse.propagate_attributes`: Recommended replacement
16751658
"""
16761659
if not self._tracing_enabled:
16771660
langfuse_logger.debug(
@@ -1817,7 +1800,7 @@ def _create_remote_parent_span(
18171800
is_remote=False,
18181801
)
18191802

1820-
return trace.NonRecordingSpan(span_context)
1803+
return otel_trace_api.NonRecordingSpan(span_context)
18211804

18221805
def _is_valid_trace_id(self, trace_id: str) -> bool:
18231806
pattern = r"^[0-9a-f]{32}$"
@@ -2477,7 +2460,7 @@ def run_experiment(
24772460
evaluators: List[EvaluatorFunction] = [],
24782461
run_evaluators: List[RunEvaluatorFunction] = [],
24792462
max_concurrency: int = 50,
2480-
metadata: Optional[Dict[str, Any]] = None,
2463+
metadata: Optional[Dict[str, str]] = None,
24812464
) -> ExperimentResult:
24822465
"""Run an experiment on a dataset with automatic tracing and evaluation.
24832466
@@ -2649,7 +2632,7 @@ def average_accuracy(*, item_results, **kwargs):
26492632
evaluators=evaluators or [],
26502633
run_evaluators=run_evaluators or [],
26512634
max_concurrency=max_concurrency,
2652-
metadata=metadata or {},
2635+
metadata=metadata,
26532636
),
26542637
),
26552638
)
@@ -2665,7 +2648,7 @@ async def _run_experiment_async(
26652648
evaluators: List[EvaluatorFunction],
26662649
run_evaluators: List[RunEvaluatorFunction],
26672650
max_concurrency: int,
2668-
metadata: Dict[str, Any],
2651+
metadata: Optional[Dict[str, Any]] = None,
26692652
) -> ExperimentResult:
26702653
langfuse_logger.debug(
26712654
f"Starting experiment '{name}' run '{run_name}' with {len(data)} items"
@@ -2763,85 +2746,123 @@ async def _process_experiment_item(
27632746
experiment_name: str,
27642747
experiment_run_name: str,
27652748
experiment_description: Optional[str],
2766-
experiment_metadata: Dict[str, Any],
2749+
experiment_metadata: Optional[Dict[str, Any]] = None,
27672750
) -> ExperimentItemResult:
2768-
# Execute task with tracing
27692751
span_name = "experiment-item-run"
27702752

27712753
with self.start_as_current_span(name=span_name) as span:
27722754
try:
2773-
output = await _run_task(task, item)
2774-
27752755
input_data = (
27762756
item.get("input")
27772757
if isinstance(item, dict)
27782758
else getattr(item, "input", None)
27792759
)
27802760

2781-
item_metadata: Dict[str, Any] = {}
2761+
if input_data is None:
2762+
raise ValueError("Experiment Item is missing input. Skipping item.")
2763+
2764+
expected_output = (
2765+
item.get("expected_output")
2766+
if isinstance(item, dict)
2767+
else getattr(item, "expected_output", None)
2768+
)
27822769

2783-
if isinstance(item, dict):
2784-
item_metadata = item.get("metadata", None) or {}
2770+
item_metadata = (
2771+
item.get("metadata")
2772+
if isinstance(item, dict)
2773+
else getattr(item, "metadata", None)
2774+
)
27852775

2786-
final_metadata = {
2776+
final_observation_metadata = {
27872777
"experiment_name": experiment_name,
27882778
"experiment_run_name": experiment_run_name,
2789-
**experiment_metadata,
2779+
**(experiment_metadata or {}),
27902780
}
27912781

2792-
if (
2793-
not isinstance(item, dict)
2794-
and hasattr(item, "dataset_id")
2795-
and hasattr(item, "id")
2796-
):
2797-
final_metadata.update(
2798-
{"dataset_id": item.dataset_id, "dataset_item_id": item.id}
2799-
)
2800-
2801-
if isinstance(item_metadata, dict):
2802-
final_metadata.update(item_metadata)
2803-
2804-
span.update(
2805-
input=input_data,
2806-
output=output,
2807-
metadata=final_metadata,
2808-
)
2809-
2810-
# Get trace ID for linking
28112782
trace_id = span.trace_id
2783+
dataset_id = None
2784+
dataset_item_id = None
28122785
dataset_run_id = None
28132786

28142787
# Link to dataset run if this is a dataset item
28152788
if hasattr(item, "id") and hasattr(item, "dataset_id"):
28162789
try:
2817-
dataset_run_item = self.api.dataset_run_items.create(
2790+
# Use sync API to avoid event loop issues when run_async_safely
2791+
# creates multiple event loops across different threads
2792+
dataset_run_item = await asyncio.to_thread(
2793+
self.api.dataset_run_items.create,
28182794
request=CreateDatasetRunItemRequest(
28192795
runName=experiment_run_name,
28202796
runDescription=experiment_description,
28212797
metadata=experiment_metadata,
28222798
datasetItemId=item.id, # type: ignore
28232799
traceId=trace_id,
28242800
observationId=span.id,
2825-
)
2801+
),
28262802
)
28272803

28282804
dataset_run_id = dataset_run_item.dataset_run_id
28292805

28302806
except Exception as e:
28312807
langfuse_logger.error(f"Failed to create dataset run item: {e}")
28322808

2809+
if (
2810+
not isinstance(item, dict)
2811+
and hasattr(item, "dataset_id")
2812+
and hasattr(item, "id")
2813+
):
2814+
dataset_id = item.dataset_id
2815+
dataset_item_id = item.id
2816+
2817+
final_observation_metadata.update(
2818+
{"dataset_id": dataset_id, "dataset_item_id": dataset_item_id}
2819+
)
2820+
2821+
if isinstance(item_metadata, dict):
2822+
final_observation_metadata.update(item_metadata)
2823+
2824+
experiment_id = dataset_run_id or self._create_observation_id()
2825+
experiment_item_id = (
2826+
dataset_item_id or get_sha256_hash_hex(_serialize(input_data))[:16]
2827+
)
2828+
span._otel_span.set_attributes(
2829+
{
2830+
k: v
2831+
for k, v in {
2832+
LangfuseOtelSpanAttributes.ENVIRONMENT: LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT,
2833+
LangfuseOtelSpanAttributes.EXPERIMENT_DESCRIPTION: experiment_description,
2834+
LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_EXPECTED_OUTPUT: _serialize(
2835+
expected_output
2836+
),
2837+
}.items()
2838+
if v is not None
2839+
}
2840+
)
2841+
2842+
with _propagate_attributes(
2843+
experiment=PropagatedExperimentAttributes(
2844+
experiment_id=experiment_id,
2845+
experiment_name=experiment_run_name,
2846+
experiment_metadata=_serialize(experiment_metadata),
2847+
experiment_dataset_id=dataset_id,
2848+
experiment_item_id=experiment_item_id,
2849+
experiment_item_metadata=_serialize(item_metadata),
2850+
experiment_item_root_observation_id=span.id,
2851+
)
2852+
):
2853+
output = await _run_task(task, item)
2854+
2855+
span.update(
2856+
input=input_data,
2857+
output=output,
2858+
metadata=final_observation_metadata,
2859+
)
2860+
28332861
# Run evaluators
28342862
evaluations = []
28352863

28362864
for evaluator in evaluators:
28372865
try:
2838-
expected_output = None
2839-
2840-
if isinstance(item, dict):
2841-
expected_output = item.get("expected_output")
2842-
elif hasattr(item, "expected_output"):
2843-
expected_output = item.expected_output
2844-
28452866
eval_metadata: Optional[Dict[str, Any]] = None
28462867

28472868
if isinstance(item, dict):
@@ -2862,6 +2883,7 @@ async def _process_experiment_item(
28622883
for evaluation in eval_results:
28632884
self.create_score(
28642885
trace_id=trace_id,
2886+
observation_id=span.id,
28652887
name=evaluation.name,
28662888
value=evaluation.value, # type: ignore
28672889
comment=evaluation.comment,

langfuse/_client/constants.py

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -3,11 +3,13 @@
33
This module defines constants used throughout the Langfuse OpenTelemetry integration.
44
"""
55

6-
from typing import Literal, List, get_args, Union, Any
6+
from typing import Any, List, Literal, Union, get_args
7+
78
from typing_extensions import TypeAlias
89

910
LANGFUSE_TRACER_NAME = "langfuse-sdk"
1011

12+
LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT = "sdk-experiment"
1113

1214
"""Note: this type is used with .__args__ / get_args in some cases and therefore must remain flat"""
1315
ObservationTypeGenerationLike: TypeAlias = Literal[

0 commit comments

Comments
 (0)