From fe491463f35af8dbff1156b9f476aba42d41248d Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Mon, 3 Nov 2025 11:21:33 +0100 Subject: [PATCH 1/2] feat(core): add propagateAttributes function (#676) --- package.json | 3 +- .../src/experiment/ExperimentManager.ts | 133 +- packages/core/package.json | 3 + packages/core/src/constants.ts | 13 + packages/core/src/index.ts | 1 + packages/core/src/propagation.ts | 694 ++++++++ packages/core/src/utils.ts | 43 + packages/otel/src/span-processor.ts | 8 +- packages/tracing/src/index.ts | 4 + packages/tracing/src/spanWrapper.ts | 6 - pnpm-lock.yaml | 4 + tests/e2e/datasets.e2e.test.ts | 2 +- tests/e2e/experiments.e2e.test.ts | 2 +- ...experiment-propagation.integration.test.ts | 616 +++++++ .../propagation.integration.test.ts | 1505 +++++++++++++++++ 15 files changed, 2994 insertions(+), 43 deletions(-) create mode 100644 packages/core/src/propagation.ts create mode 100644 tests/integration/experiment-propagation.integration.test.ts create mode 100644 tests/integration/propagation.integration.test.ts diff --git a/package.json b/package.json index ecc48ec9..f7fb6b42 100644 --- a/package.json +++ b/package.json @@ -70,5 +70,6 @@ }, "resolutions": { "ml-spectra-processing": "14.14.0" - } + }, + "packageManager": "pnpm@9.15.0+sha512.76e2379760a4328ec4415815bcd6628dee727af3779aaa4c914e3944156c4299921a89f976381ee107d41f12cfa4b66681ca9c718f0668fa0831ed4c6d8ba56c" } diff --git a/packages/client/src/experiment/ExperimentManager.ts b/packages/client/src/experiment/ExperimentManager.ts index 555ec365..0daa072a 100644 --- a/packages/client/src/experiment/ExperimentManager.ts +++ b/packages/client/src/experiment/ExperimentManager.ts @@ -1,4 +1,13 @@ -import { DatasetItem, getGlobalLogger } from "@langfuse/core"; +import { + DatasetItem, + getGlobalLogger, + propagateAttributes, + serializeValue, + createExperimentId, + createExperimentItemId, + LangfuseOtelSpanAttributes, + LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT, +} from "@langfuse/core"; import { startActiveObservation } from "@langfuse/tracing"; import { ProxyTracerProvider, trace } from "@opentelemetry/api"; @@ -349,53 +358,112 @@ export class ExperimentManager { task: ExperimentTask; evaluators?: Evaluator[]; }): Promise> { - const { item, evaluators = [], task, experimentMetadata = {} } = params; + const { item, evaluators = [], task, experimentMetadata } = params; + + const { output, traceId, observationId, datasetRunId } = + await startActiveObservation("experiment-item-run", async (span) => { + // Extract experiment data + const input = item.input; + const expectedOutput = item.expectedOutput; + const itemMetadata = item.metadata; + const datasetId = "datasetId" in item ? item.datasetId : undefined; + const datasetItemId = "id" in item ? item.id : undefined; + const traceId = span.traceId; + const observationId = span.id; + + // Validate input is present + if (input === undefined) { + throw new Error("Experiment item is missing input. Skipping item."); + } + + let datasetRunId: string | undefined = undefined; + + if (datasetItemId) { + try { + const result = await this.langfuseClient.api.datasetRunItems.create( + { + runName: params.experimentRunName, + runDescription: params.experimentDescription, + metadata: params.experimentMetadata, + datasetItemId, + traceId, + observationId, + }, + ); + + datasetRunId = result.datasetRunId; + } catch (err) { + this.logger.error("Linking dataset run item failed", err); + } + } - const { output, traceId, observationId } = await startActiveObservation( - "experiment-item-run", - async (span) => { - const output = await task(item); + // Generate IDs + const experimentItemId = + datasetItemId || (await createExperimentItemId(input)); + const experimentId = datasetRunId || (await createExperimentId()); + + // Set non-propagated experiment attributes directly on root span + const rootSpanAttributes: Record = { + [LangfuseOtelSpanAttributes.ENVIRONMENT]: + LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT, + }; + if (params.experimentDescription) { + rootSpanAttributes[ + LangfuseOtelSpanAttributes.EXPERIMENT_DESCRIPTION + ] = params.experimentDescription; + } + + if (expectedOutput !== undefined) { + const serialized = serializeValue(expectedOutput); + if (serialized) { + rootSpanAttributes[ + LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_EXPECTED_OUTPUT + ] = serialized; + } + } + + span.otelSpan.setAttributes(rootSpanAttributes); + + // Propagate experiment context to all child spans + const output = await propagateAttributes( + { + _internalExperiment: { + experimentId, + experimentName: params.experimentRunName, + experimentMetadata: serializeValue(experimentMetadata), + experimentDatasetId: datasetId, + experimentItemId, + experimentItemMetadata: serializeValue(itemMetadata), + experimentItemRootObservationId: span.id, + }, + }, + async () => await task(item), + ); span.update({ - input: item.input, + input, output, metadata: { experiment_name: params.experimentName, experiment_run_name: params.experimentRunName, ...experimentMetadata, - ...(item.metadata ?? {}), - ...("id" in item && "datasetId" in item + ...(itemMetadata ?? {}), + ...(datasetId && datasetItemId ? { - dataset_id: item["datasetId"], - dataset_item_id: item["id"], + dataset_id: datasetId, + dataset_item_id: datasetItemId, } : {}), }, }); - return { output, traceId: span.traceId, observationId: span.id }; - }, - ); - - let datasetRunId: string | undefined = undefined; - - if ("id" in item) { - await this.langfuseClient.api.datasetRunItems - .create({ - runName: params.experimentRunName, - runDescription: params.experimentDescription, - metadata: params.experimentMetadata, - datasetItemId: item.id, + return { + output, traceId, observationId, - }) - .then((result) => { - datasetRunId = result.datasetRunId; - }) - .catch((err) => - this.logger.error("Linking dataset run item failed", err), - ); - } + datasetRunId, + }; + }); const evalPromises: Promise[] = evaluators.map( async (evaluator) => { @@ -434,6 +502,7 @@ export class ExperimentManager { for (const ev of evals) { this.langfuseClient.score.create({ traceId, + observationId, ...ev, }); } diff --git a/packages/core/package.json b/packages/core/package.json index 93fcb0c3..11212624 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -27,6 +27,9 @@ "files": [ "dist" ], + "peerDependencies": { + "@opentelemetry/api": "^1.9.0" + }, "devDependencies": { "@types/node": "^24.1.0" } diff --git a/packages/core/src/constants.ts b/packages/core/src/constants.ts index c5c48809..fe36d949 100644 --- a/packages/core/src/constants.ts +++ b/packages/core/src/constants.ts @@ -4,6 +4,8 @@ export const LANGFUSE_TRACER_NAME = "langfuse-sdk"; export const LANGFUSE_SDK_VERSION = packageJson.version; export const LANGFUSE_SDK_NAME = "javascript"; +export const LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT = "sdk-experiment"; + // From Langfuse platform: web/src/features/otel/server/attributes.ts export enum LangfuseOtelSpanAttributes { // Langfuse-Trace attributes @@ -41,6 +43,17 @@ export enum LangfuseOtelSpanAttributes { // Internal AS_ROOT = "langfuse.internal.as_root", + // Experiment attributes + EXPERIMENT_ID = "langfuse.experiment.id", + EXPERIMENT_NAME = "langfuse.experiment.name", + EXPERIMENT_DESCRIPTION = "langfuse.experiment.description", + EXPERIMENT_METADATA = "langfuse.experiment.metadata", + EXPERIMENT_DATASET_ID = "langfuse.experiment.dataset.id", + EXPERIMENT_ITEM_ID = "langfuse.experiment.item.id", + EXPERIMENT_ITEM_EXPECTED_OUTPUT = "langfuse.experiment.item.expected_output", + EXPERIMENT_ITEM_METADATA = "langfuse.experiment.item.metadata", + EXPERIMENT_ITEM_ROOT_OBSERVATION_ID = "langfuse.experiment.item.root_observation_id", + // Compatibility - Map properties that were documented in https://langfuse.com/docs/opentelemetry/get-started#property-mapping, // but have a new assignment TRACE_COMPAT_USER_ID = "langfuse.user.id", diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 4b7a12a4..80d29274 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -9,3 +9,4 @@ export { LangfuseAPIClient } from "./api/Client.js"; export * from "./utils.js"; export * from "./types.js"; export * from "./media.js"; +export * from "./propagation.js"; diff --git a/packages/core/src/propagation.ts b/packages/core/src/propagation.ts new file mode 100644 index 00000000..1e43c902 --- /dev/null +++ b/packages/core/src/propagation.ts @@ -0,0 +1,694 @@ +/** + * Attribute propagation utilities for Langfuse OpenTelemetry integration. + * + * This module provides the `propagateAttributes` function for setting trace-level + * attributes (userId, sessionId, metadata) that automatically propagate to all child spans + * within the context. + */ + +import { + context as otelContextApi, + trace as otelTraceApi, + propagation, + Context, + createContextKey, +} from "@opentelemetry/api"; + +import { + LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT, + LangfuseOtelSpanAttributes, +} from "./constants.js"; +import { getGlobalLogger } from "./logger/index.js"; + +type CorrelatedKey = "userId" | "sessionId" | "metadata" | "version" | "tags"; + +const experimentKeys = [ + "experimentId", + "experimentName", + "experimentMetadata", + "experimentDatasetId", + "experimentItemId", + "experimentItemMetadata", + "experimentItemRootObservationId", +] as const; +type ExperimentKey = (typeof experimentKeys)[number]; +type PropagatedKey = CorrelatedKey | ExperimentKey; + +type PropagatedExperimentAttributes = { + experimentId: string; + experimentName: string; + experimentMetadata?: string; // serialized JSON + experimentDatasetId?: string; + experimentItemId: string; + experimentItemMetadata?: string; // serialized JSON + experimentItemRootObservationId: string; +}; + +export const LangfuseOtelContextKeys: Record = { + userId: createContextKey("langfuse_user_id"), + sessionId: createContextKey("langfuse_session_id"), + metadata: createContextKey("langfuse_metadata"), + version: createContextKey("langfuse_version"), + tags: createContextKey("langfuse_tags"), + + // Experiments + experimentId: createContextKey("langfuse_experiment_id"), + experimentName: createContextKey("langfuse_experiment_name"), + experimentMetadata: createContextKey("langfuse_experiment_metadata"), + experimentDatasetId: createContextKey("langfuse_experiment_dataset_id"), + experimentItemId: createContextKey("langfuse_experiment_item_id"), + experimentItemMetadata: createContextKey("langfuse_experiment_item_metadata"), + experimentItemRootObservationId: createContextKey( + "langfuse_experiment_item_root_observation_id", + ), +}; + +const LANGFUSE_BAGGAGE_PREFIX = "langfuse_"; +const LANGFUSE_BAGGAGE_TAGS_SEPARATOR = ","; + +/** + * Parameters for propagateAttributes function. + * + * @public + */ +export interface PropagateAttributesParams { + /** + * User identifier to associate with all spans in this context. + * Must be a string ≤200 characters. Use this to track which user + * generated each trace and enable e.g. per-user cost/performance analysis. + */ + userId?: string; + + /** + * Session identifier to associate with all spans in this context. + * Must be a string ≤200 characters. Use this to group related traces + * within a user session (e.g., a conversation thread, multi-turn interaction). + */ + sessionId?: string; + + /** + * Additional key-value metadata to propagate to all spans. + * - Keys and values must be strings + * - All values must be ≤200 characters + * - Use for dimensions like internal correlating identifiers + * - AVOID: large payloads, sensitive data, non-string values (will be dropped with warning) + */ + metadata?: Record; + + /** + * Version identifier for parts of your application that are independently versioned, e.g. agents + */ + version?: string; + + /** + * List of tags to categorize the group of observations + */ + tags?: string[]; + + /** + * If true, propagates attributes using OpenTelemetry baggage for + * cross-process/service propagation. + * + * **Security warning**: When enabled, attribute values are added to HTTP headers + * on ALL outbound requests. Only enable if values are safe to transmit via HTTP + * headers and you need cross-service tracing. + * + * @defaultValue false + */ + asBaggage?: boolean; + + /** + * **INTERNAL USE ONLY** - For Langfuse experiment framework. + * + * This parameter is used internally by the Langfuse experiment system to propagate + * experiment context to child spans. It should NOT be used by external code. + * + * @internal + */ + _internalExperiment?: PropagatedExperimentAttributes; +} + +/** + * Propagate trace-level attributes to all spans created within this context. + * + * This function sets attributes on the currently active span AND automatically + * propagates them to all new child spans created within the callback. This is the + * recommended way to set trace-level attributes like userId, sessionId, and metadata + * dimensions that should be consistently applied across all observations in a trace. + * + * **IMPORTANT**: Call this as early as possible within your trace/workflow. Only the + * currently active span and spans created after entering this context will have these + * attributes. Pre-existing spans will NOT be retroactively updated. + * + * **Why this matters**: Langfuse aggregation queries (e.g., total cost by userId, + * filtering by sessionId) only include observations that have the attribute set. + * If you call `propagateAttributes` late in your workflow, earlier spans won't be + * included in aggregations for that attribute. + * + * @param params - Configuration for attributes to propagate + * @param fn - Callback function (sync or async) within which attributes are propagated + * @returns The result of the callback function + * + * @example + * Basic usage with user and session tracking: + * + * ```typescript + * import { startActiveObservation, propagateAttributes } from '@langfuse/tracing'; + * + * // Set attributes early in the trace + * await startActiveObservation('user_workflow', async (span) => { + * await propagateAttributes({ + * userId: 'user_123', + * sessionId: 'session_abc', + * metadata: { experiment: 'variant_a', environment: 'production' } + * }, async () => { + * // All spans created here will have userId, sessionId, and metadata + * const llmSpan = startObservation('llm_call', { input: 'Hello' }); + * // This span inherits: userId, sessionId, experiment, environment + * llmSpan.end(); + * + * const gen = startObservation('completion', {}, { asType: 'generation' }); + * // This span also inherits all attributes + * gen.end(); + * }); + * }); + * ``` + * + * @example + * Late propagation (anti-pattern): + * + * ```typescript + * await startActiveObservation('workflow', async (span) => { + * // These spans WON'T have userId + * const earlySpan = startObservation('early_work', { input: 'data' }); + * earlySpan.end(); + * + * // Set attributes in the middle + * await propagateAttributes({ userId: 'user_123' }, async () => { + * // Only spans created AFTER this point will have userId + * const lateSpan = startObservation('late_work', { input: 'more' }); + * lateSpan.end(); + * }); + * + * // Result: Aggregations by userId will miss "early_work" span + * }); + * ``` + * + * @example + * Cross-service propagation with baggage (advanced): + * + * ```typescript + * import fetch from 'node-fetch'; + * + * // Service A - originating service + * await startActiveObservation('api_request', async () => { + * await propagateAttributes({ + * userId: 'user_123', + * sessionId: 'session_abc', + * asBaggage: true // Propagate via HTTP headers + * }, async () => { + * // Make HTTP request to Service B + * const response = await fetch('https://service-b.example.com/api'); + * // userId and sessionId are now in HTTP headers + * }); + * }); + * + * // Service B - downstream service + * // OpenTelemetry will automatically extract baggage from HTTP headers + * // and propagate to spans in Service B + * ``` + * + * @remarks + * - **Validation**: All attribute values (userId, sessionId, metadata values) + * must be strings ≤200 characters. Invalid values will be dropped with a + * warning logged. Ensure values meet constraints before calling. + * - **OpenTelemetry**: This uses OpenTelemetry context propagation under the hood, + * making it compatible with other OTel-instrumented libraries. + * - **Baggage Security**: When `asBaggage=true`, attribute values are added to HTTP + * headers on outbound requests. Only use for non-sensitive values and when you + * need cross-service tracing. + * + * @public + */ +export function propagateAttributes< + A extends unknown[], + F extends (...args: A) => ReturnType, +>(params: PropagateAttributesParams, fn: F): ReturnType { + let context = otelContextApi.active(); + + const span = otelTraceApi.getActiveSpan(); + const asBaggage = params.asBaggage ?? false; + + const { userId, sessionId, metadata, version, tags, _internalExperiment } = + params; + + // Validate and set userId + if (userId) { + if (isValidPropagatedString({ value: userId, attributeName: "userId" })) { + context = setPropagatedAttribute({ + key: "userId", + value: userId, + context, + span, + asBaggage, + }); + } + } + + // Validate and set sessionId + if (sessionId) { + if ( + isValidPropagatedString({ + value: sessionId, + attributeName: "sessionId", + }) + ) { + context = setPropagatedAttribute({ + key: "sessionId", + value: sessionId, + context, + span, + asBaggage, + }); + } + } + + // Validate and set version + if (version) { + if ( + isValidPropagatedString({ + value: version, + attributeName: "version", + }) + ) { + context = setPropagatedAttribute({ + key: "version", + value: version, + context, + span, + asBaggage, + }); + } + } + + // Validate and set tags + if (tags && tags.length > 0) { + const validTags = tags.filter((tag) => + isValidPropagatedString({ + value: tag, + attributeName: "tag", + }), + ); + + if (validTags.length > 0) { + context = setPropagatedAttribute({ + key: "tags", + value: validTags, + context, + span, + asBaggage, + }); + } + } + + // Validate and set metadata + if (metadata) { + // Filter metadata to only include valid string values + const validatedMetadata: Record = {}; + + for (const [key, value] of Object.entries(metadata)) { + if ( + isValidPropagatedString({ + value: value, + attributeName: `metadata.${key}`, + }) + ) { + validatedMetadata[key] = value; + } + } + + if (Object.keys(validatedMetadata).length > 0) { + context = setPropagatedAttribute({ + key: "metadata", + value: validatedMetadata, + context, + span, + asBaggage, + }); + } + } + + // Handle experiment attributes + if (_internalExperiment) { + for (const [key, value] of Object.entries(_internalExperiment)) { + if (value !== undefined) { + // Experiment attributes are already serialized, no validation needed + context = setPropagatedAttribute({ + key: key as ExperimentKey, + value, + context, + span, + asBaggage, + }); + } + } + } + + // Execute callback in the new context + return otelContextApi.with(context, fn); +} + +export function getPropagatedAttributesFromContext( + context: Context, +): Record { + const propagatedAttributes: Record = {}; + + // Handle baggage + const baggage = propagation.getBaggage(context); + + if (baggage) { + baggage.getAllEntries().forEach(([baggageKey, baggageEntry]) => { + if (baggageKey.startsWith(LANGFUSE_BAGGAGE_PREFIX)) { + const spanKey = getSpanKeyFromBaggageKey(baggageKey); + + if (spanKey) { + const isMergedTags = + baggageKey == getBaggageKeyForPropagatedKey("tags"); + + propagatedAttributes[spanKey] = isMergedTags + ? baggageEntry.value.split(LANGFUSE_BAGGAGE_TAGS_SEPARATOR) + : baggageEntry.value; + } + } + }); + } + + // Handle OTEL context values + const userId = context.getValue(LangfuseOtelContextKeys["userId"]); + if (userId && typeof userId === "string") { + const spanKey = getSpanKeyForPropagatedKey("userId"); + + propagatedAttributes[spanKey] = userId; + } + + const sessionId = context.getValue(LangfuseOtelContextKeys["sessionId"]); + if (sessionId && typeof sessionId === "string") { + const spanKey = getSpanKeyForPropagatedKey("sessionId"); + + propagatedAttributes[spanKey] = sessionId; + } + + const version = context.getValue(LangfuseOtelContextKeys["version"]); + if (version && typeof version === "string") { + const spanKey = getSpanKeyForPropagatedKey("version"); + + propagatedAttributes[spanKey] = version; + } + + const tags = context.getValue(LangfuseOtelContextKeys["tags"]); + if (tags && Array.isArray(tags)) { + const spanKey = getSpanKeyForPropagatedKey("tags"); + + propagatedAttributes[spanKey] = tags; + } + + const metadata = context.getValue(LangfuseOtelContextKeys["metadata"]); + if (metadata && typeof metadata === "object" && metadata !== null) { + for (const [k, v] of Object.entries(metadata)) { + const spanKey = `${LangfuseOtelSpanAttributes.TRACE_METADATA}.${k}`; + + propagatedAttributes[spanKey] = String(v); + } + } + + // Extract experiment attributes + for (const key of experimentKeys) { + const contextKey = LangfuseOtelContextKeys[key]; + const value = context.getValue(contextKey); + + if (value && typeof value === "string") { + const spanKey = getSpanKeyForPropagatedKey(key); + propagatedAttributes[spanKey] = value; + } + } + + // add environment if propagation is for experiment + if ( + propagatedAttributes[ + getSpanKeyForPropagatedKey("experimentItemRootObservationId") + ] + ) { + propagatedAttributes[LangfuseOtelSpanAttributes.ENVIRONMENT] = + LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT; + } + + return propagatedAttributes; +} + +type SetPropagatedAttributeParams = { + context: Context; + span: ReturnType; + asBaggage: boolean; +} & ( + | { + key: "userId" | "sessionId" | "version" | ExperimentKey; + value: string; + } + | { + key: "metadata"; + value: Record; + } + | { + key: "tags"; + value: string[]; + } +); + +function setPropagatedAttribute(params: SetPropagatedAttributeParams): Context { + const { key, value, span, asBaggage } = params; + + let context = params.context; + let mergedMetadata: Record = + key === "metadata" ? getContextMergedMetadata(context, value) : {}; + let mergedTags = key === "tags" ? getContextMergedTags(context, value) : []; + + // Get the context key for this attribute + const contextKey = getContextKeyForPropagatedKey(key); + + // Set in context + if (key === "metadata") { + context = context.setValue(contextKey, mergedMetadata); + } else if (key === "tags") { + context = context.setValue(contextKey, mergedTags); + } else { + context = context.setValue(contextKey, value); + } + + // Set on current span + if (span && span.isRecording()) { + if (key === "metadata") { + for (const [k, v] of Object.entries(mergedMetadata)) { + span.setAttribute( + `${LangfuseOtelSpanAttributes.TRACE_METADATA}.${k}`, + v, + ); + } + } else if (key === "tags") { + const spanKey = getSpanKeyForPropagatedKey(key); + span.setAttribute(spanKey, mergedTags); + } else { + const spanKey = getSpanKeyForPropagatedKey(key); + span.setAttribute(spanKey, value); + } + } + + // Set on baggage + if (asBaggage) { + const baggageKey = getBaggageKeyForPropagatedKey(key); + let baggage = + propagation.getBaggage(context) || propagation.createBaggage(); + + if (key === "metadata") { + for (const [k, v] of Object.entries(mergedMetadata)) { + baggage = baggage.setEntry(`${baggageKey}_${k}`, { value: v }); + } + } else if (key === "tags") { + baggage = baggage.setEntry(baggageKey, { + value: mergedTags.join(LANGFUSE_BAGGAGE_TAGS_SEPARATOR), + }); + } else { + baggage = baggage.setEntry(baggageKey, { value }); + } + + context = propagation.setBaggage(context, baggage); + } + + return context; +} + +function getContextMergedTags(context: Context, newTags: string[]): string[] { + const existingTags = context.getValue(LangfuseOtelContextKeys["tags"]); + + if (existingTags && Array.isArray(existingTags)) { + return [...new Set([...existingTags, ...newTags])]; + } else { + return newTags; + } +} + +function getContextMergedMetadata( + context: Context, + newMetadata: Record, +): Record { + const existingMetadata = context.getValue( + LangfuseOtelContextKeys["metadata"], + ); + + if ( + existingMetadata && + typeof existingMetadata === "object" && + existingMetadata !== null && + !Array.isArray(existingMetadata) + ) { + return { ...(existingMetadata as Record), ...newMetadata }; + } else { + return newMetadata; + } +} + +function isValidPropagatedString(params: { + value: string; + attributeName: string; +}): boolean { + const logger = getGlobalLogger(); + const { value, attributeName } = params; + + if (typeof value !== "string") { + logger.warn( + `Propagated attribute '${attributeName}' must be a string. Dropping value.`, + ); + return false; + } + + if (value.length > 200) { + logger.warn( + `Propagated attribute '${attributeName}' value is over 200 characters (${value.length} chars). Dropping value.`, + ); + + return false; + } + + return true; +} + +function getContextKeyForPropagatedKey(key: PropagatedKey): symbol { + return LangfuseOtelContextKeys[key]; +} + +function getSpanKeyForPropagatedKey(key: PropagatedKey): string { + switch (key) { + case "userId": + return LangfuseOtelSpanAttributes.TRACE_USER_ID; + case "sessionId": + return LangfuseOtelSpanAttributes.TRACE_SESSION_ID; + case "version": + return LangfuseOtelSpanAttributes.VERSION; + case "metadata": + return LangfuseOtelSpanAttributes.TRACE_METADATA; + case "tags": + return LangfuseOtelSpanAttributes.TRACE_TAGS; + case "experimentId": + return LangfuseOtelSpanAttributes.EXPERIMENT_ID; + case "experimentName": + return LangfuseOtelSpanAttributes.EXPERIMENT_NAME; + case "experimentMetadata": + return LangfuseOtelSpanAttributes.EXPERIMENT_METADATA; + case "experimentDatasetId": + return LangfuseOtelSpanAttributes.EXPERIMENT_DATASET_ID; + case "experimentItemId": + return LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_ID; + case "experimentItemMetadata": + return LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_METADATA; + case "experimentItemRootObservationId": + return LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_ROOT_OBSERVATION_ID; + default: { + const fallback: never = key; + + throw Error("Unhandled propagated key", fallback); + } + } +} + +function getBaggageKeyForPropagatedKey(key: PropagatedKey): string { + // baggage keys must be snake case for correct cross service propagation + // second service might run Python SDK that is expecting snake case keys + switch (key) { + case "userId": + return `${LANGFUSE_BAGGAGE_PREFIX}user_id`; + case "sessionId": + return `${LANGFUSE_BAGGAGE_PREFIX}session_id`; + case "version": + return `${LANGFUSE_BAGGAGE_PREFIX}version`; + case "metadata": + return `${LANGFUSE_BAGGAGE_PREFIX}metadata`; + case "tags": + return `${LANGFUSE_BAGGAGE_PREFIX}tags`; + case "experimentId": + return `${LANGFUSE_BAGGAGE_PREFIX}experiment_id`; + case "experimentName": + return `${LANGFUSE_BAGGAGE_PREFIX}experiment_name`; + case "experimentMetadata": + return `${LANGFUSE_BAGGAGE_PREFIX}experiment_metadata`; + case "experimentDatasetId": + return `${LANGFUSE_BAGGAGE_PREFIX}experiment_dataset_id`; + case "experimentItemId": + return `${LANGFUSE_BAGGAGE_PREFIX}experiment_item_id`; + case "experimentItemMetadata": + return `${LANGFUSE_BAGGAGE_PREFIX}experiment_item_metadata`; + case "experimentItemRootObservationId": + return `${LANGFUSE_BAGGAGE_PREFIX}experiment_item_root_observation_id`; + default: { + const fallback: never = key; + + throw Error("Unhandled propagated key", fallback); + } + } +} + +function getSpanKeyFromBaggageKey(baggageKey: string): string | undefined { + if (!baggageKey.startsWith(LANGFUSE_BAGGAGE_PREFIX)) return; + + const suffix = baggageKey.slice(LANGFUSE_BAGGAGE_PREFIX.length); + + // Metadata keys have format: langfuse_metadata_{key_name} + if (suffix.startsWith("metadata_")) { + const metadataKey = suffix.slice("metadata_".length); + + return `${LangfuseOtelSpanAttributes.TRACE_METADATA}.${metadataKey}`; + } + + switch (suffix) { + case "user_id": + return getSpanKeyForPropagatedKey("userId"); + case "session_id": + return getSpanKeyForPropagatedKey("sessionId"); + case "version": + return getSpanKeyForPropagatedKey("version"); + case "tags": + return getSpanKeyForPropagatedKey("tags"); + case "experiment_id": + return getSpanKeyForPropagatedKey("experimentId"); + case "experiment_name": + return getSpanKeyForPropagatedKey("experimentName"); + case "experiment_metadata": + return getSpanKeyForPropagatedKey("experimentMetadata"); + case "experiment_dataset_id": + return getSpanKeyForPropagatedKey("experimentDatasetId"); + case "experiment_item_id": + return getSpanKeyForPropagatedKey("experimentItemId"); + case "experiment_item_metadata": + return getSpanKeyForPropagatedKey("experimentItemMetadata"); + case "experiment_item_root_observation_id": + return getSpanKeyForPropagatedKey("experimentItemRootObservationId"); + } +} diff --git a/packages/core/src/utils.ts b/packages/core/src/utils.ts index 5e15d2ce..a271f0d4 100644 --- a/packages/core/src/utils.ts +++ b/packages/core/src/utils.ts @@ -85,3 +85,46 @@ export function base64Decode(input: string): string { const bytes = base64ToBytes(input); return new TextDecoder().decode(bytes); } + +/** + * Generate a random experiment ID (16 hex characters from 8 random bytes). + * @internal + */ +export async function createExperimentId(): Promise { + const randomBytes = new Uint8Array(8); + crypto.getRandomValues(randomBytes); + + return Array.from(randomBytes) + .map((b) => b.toString(16).padStart(2, "0")) + .join(""); +} + +/** + * Generate experiment item ID from input hash (first 16 hex chars of SHA-256). + * Skips serialization if input is already a string. + * @internal + */ +export async function createExperimentItemId(input: any): Promise { + const serialized = serializeValue(input); + const data = new TextEncoder().encode(serialized); + + const hashBuffer = await crypto.subtle.digest("SHA-256", data); + const hashArray = Array.from(new Uint8Array(hashBuffer)); + const hashHex = hashArray + .map((b) => b.toString(16).padStart(2, "0")) + .join(""); + + return hashHex.slice(0, 16); +} + +/** + * Serialize a value to JSON string, handling undefined/null. + * Skips serialization if value is already a string. + * @internal + */ +export function serializeValue(value: any): string | undefined { + if (value === undefined || value === null) return undefined; + if (typeof value === "string") return value; + + return JSON.stringify(value); +} diff --git a/packages/otel/src/span-processor.ts b/packages/otel/src/span-processor.ts index 59b5eef0..615bcb1c 100644 --- a/packages/otel/src/span-processor.ts +++ b/packages/otel/src/span-processor.ts @@ -6,7 +6,9 @@ import { LangfuseOtelSpanAttributes, getEnv, base64Encode, + getPropagatedAttributesFromContext, } from "@langfuse/core"; +import { Context } from "@opentelemetry/api"; import { hrTimeToMilliseconds } from "@opentelemetry/core"; import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http"; import { @@ -303,17 +305,19 @@ export class LangfuseSpanProcessor implements SpanProcessor { } /** - * Called when a span is started. Adds environment and release attributes to the span. + * Called when a span is started. Adds environment, release, and propagated attributes to the span. * * @param span - The span that was started * @param parentContext - The parent context * * @override */ - public onStart(span: Span, parentContext: any): void { + public onStart(span: Span, parentContext: Context): void { + // Set propagated attributes, environment and release attributes span.setAttributes({ [LangfuseOtelSpanAttributes.ENVIRONMENT]: this.environment, [LangfuseOtelSpanAttributes.RELEASE]: this.release, + ...getPropagatedAttributesFromContext(parentContext), }); return this.processor.onStart(span, parentContext); diff --git a/packages/tracing/src/index.ts b/packages/tracing/src/index.ts index e169abb1..09c03d4d 100644 --- a/packages/tracing/src/index.ts +++ b/packages/tracing/src/index.ts @@ -63,6 +63,10 @@ export { getLangfuseTracerProvider, getLangfuseTracer, } from "./tracerProvider.js"; +export { + propagateAttributes, + type PropagateAttributesParams, +} from "@langfuse/core"; export { LangfuseOtelSpanAttributes } from "@langfuse/core"; diff --git a/packages/tracing/src/spanWrapper.ts b/packages/tracing/src/spanWrapper.ts index b4abab13..b783fd53 100644 --- a/packages/tracing/src/spanWrapper.ts +++ b/packages/tracing/src/spanWrapper.ts @@ -193,12 +193,6 @@ abstract class LangfuseBaseObservation { /** * Updates the parent trace with new attributes. - * - * This sets trace-level attributes that apply to the entire trace, - * not just this specific observation. - * - * @param attributes - Trace attributes to set - * @returns This observation for method chaining */ public updateTrace(attributes: LangfuseTraceAttributes) { this.otelSpan.setAttributes(createTraceAttributes(attributes)); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index c8e7575e..7904d96a 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -140,6 +140,10 @@ importers: version: 4.2.6 packages/core: + dependencies: + '@opentelemetry/api': + specifier: ^1.9.0 + version: 1.9.0 devDependencies: '@types/node': specifier: ^24.1.0 diff --git a/tests/e2e/datasets.e2e.test.ts b/tests/e2e/datasets.e2e.test.ts index 0a6ed857..7428495d 100644 --- a/tests/e2e/datasets.e2e.test.ts +++ b/tests/e2e/datasets.e2e.test.ts @@ -27,7 +27,7 @@ describe("Langfuse Datasets E2E", () => { }); it("create and get dataset, name only, special character", async () => { - const datasetName = nanoid() + "+ 7/"; + const datasetName = nanoid() + "+ 7?"; await langfuse.api.datasets.create({ name: datasetName }); const getDataset = await langfuse.dataset.get(datasetName); diff --git a/tests/e2e/experiments.e2e.test.ts b/tests/e2e/experiments.e2e.test.ts index b31c42c5..ab94d275 100644 --- a/tests/e2e/experiments.e2e.test.ts +++ b/tests/e2e/experiments.e2e.test.ts @@ -505,7 +505,7 @@ describe("Langfuse Datasets E2E", () => { await testEnv.spanProcessor.forceFlush(); await waitForServerIngestion(1000); - expect(result.itemResults).toHaveLength(3); + expect(result.itemResults).toHaveLength(2); // Should handle missing fields gracefully result.itemResults.forEach((item) => { expect(item.traceId).toBeDefined(); diff --git a/tests/integration/experiment-propagation.integration.test.ts b/tests/integration/experiment-propagation.integration.test.ts new file mode 100644 index 00000000..2ef4345c --- /dev/null +++ b/tests/integration/experiment-propagation.integration.test.ts @@ -0,0 +1,616 @@ +/** + * Comprehensive tests for experiment attribute propagation. + * + * This test suite verifies that experiment context (experiment ID, dataset info, + * item metadata) automatically propagates to all child spans within an experiment run. + */ + +import { LangfuseClient } from "@langfuse/client"; +import { + LangfuseOtelSpanAttributes, + LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT, +} from "@langfuse/core"; +import { startObservation, startActiveObservation } from "@langfuse/tracing"; +import { trace as otelTrace } from "@opentelemetry/api"; +import { describe, it, expect, beforeEach, afterEach } from "vitest"; + +import { + setupTestEnvironment, + teardownTestEnvironment, + waitForSpanExport, + type TestEnvironment, +} from "./helpers/testSetup.js"; + +describe("Experiment Attribute Propagation", () => { + let testEnv: TestEnvironment; + let langfuse: LangfuseClient; + + beforeEach(async () => { + testEnv = await setupTestEnvironment(); + langfuse = new LangfuseClient({ + publicKey: "test-pk", + secretKey: "test-sk", + baseUrl: "http://localhost:3000", + }); + }); + + afterEach(async () => { + await teardownTestEnvironment(testEnv); + }); + + describe("Basic Experiment Propagation", () => { + it("should propagate experiment attributes to child spans", async () => { + await langfuse.experiment.run({ + name: "test-experiment", + data: [{ input: "test-input" }], + task: async ({ input }) => { + // Create child span + const child = startObservation("child-operation", { input }); + child.end(); + return "output"; + }, + }); + + await waitForSpanExport(testEnv.mockExporter, 2); // root + child + const spans = testEnv.mockExporter.exportedSpans; + + const rootSpan = spans.find((s) => s.name === "experiment-item-run"); + const childSpan = spans.find((s) => s.name === "child-operation"); + + // Root span should have experiment attributes + expect( + rootSpan?.attributes[LangfuseOtelSpanAttributes.EXPERIMENT_ID], + ).toBeDefined(); + expect( + rootSpan?.attributes[LangfuseOtelSpanAttributes.EXPERIMENT_NAME], + ).toBeDefined(); + expect( + rootSpan?.attributes[LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_ID], + ).toBeDefined(); + + // Child span should inherit experiment attributes + expect( + childSpan?.attributes[LangfuseOtelSpanAttributes.EXPERIMENT_ID], + ).toBe(rootSpan?.attributes[LangfuseOtelSpanAttributes.EXPERIMENT_ID]); + expect( + childSpan?.attributes[LangfuseOtelSpanAttributes.EXPERIMENT_NAME], + ).toBe(rootSpan?.attributes[LangfuseOtelSpanAttributes.EXPERIMENT_NAME]); + expect( + childSpan?.attributes[LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_ID], + ).toBe( + rootSpan?.attributes[LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_ID], + ); + }); + + it("should propagate experiment metadata to child spans", async () => { + const experimentMetadata = { model: "gpt-4", temperature: "0.7" }; + + await langfuse.experiment.run({ + name: "metadata-test", + metadata: experimentMetadata, + data: [{ input: "test" }], + task: async () => { + const child = startObservation("child"); + child.end(); + return "output"; + }, + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const childSpan = spans.find((s) => s.name === "child"); + + expect( + childSpan?.attributes[LangfuseOtelSpanAttributes.EXPERIMENT_METADATA], + ).toBeDefined(); + const metadata = JSON.parse( + childSpan?.attributes[ + LangfuseOtelSpanAttributes.EXPERIMENT_METADATA + ] as string, + ); + expect(metadata).toEqual(experimentMetadata); + }); + + it("should propagate experiment item metadata to child spans", async () => { + const itemMetadata = { source: "user-input", priority: "high" }; + + await langfuse.experiment.run({ + name: "item-metadata-test", + data: [{ input: "test", metadata: itemMetadata }], + task: async () => { + const child = startObservation("child"); + child.end(); + return "output"; + }, + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const childSpan = spans.find((s) => s.name === "child"); + + expect( + childSpan?.attributes[ + LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_METADATA + ], + ).toBeDefined(); + const metadata = JSON.parse( + childSpan?.attributes[ + LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_METADATA + ] as string, + ); + expect(metadata).toEqual(itemMetadata); + }); + }); + + describe("Nested Spans", () => { + it("should propagate to multiple levels of nested child spans", async () => { + await langfuse.experiment.run({ + name: "nested-test", + data: [{ input: "test" }], + task: async () => { + await startActiveObservation("level-1", async () => { + await startActiveObservation("level-2", async () => { + const level3 = startObservation("level-3"); + level3.end(); + }); + }); + return "output"; + }, + }); + + await waitForSpanExport(testEnv.mockExporter, 4); // root + 3 children + const spans = testEnv.mockExporter.exportedSpans; + + const rootSpan = spans.find((s) => s.name === "experiment-item-run"); + const experimentId = + rootSpan?.attributes[LangfuseOtelSpanAttributes.EXPERIMENT_ID]; + + // All nested spans should have the same experiment ID + const level1 = spans.find((s) => s.name === "level-1"); + const level2 = spans.find((s) => s.name === "level-2"); + const level3 = spans.find((s) => s.name === "level-3"); + + expect(level1?.attributes[LangfuseOtelSpanAttributes.EXPERIMENT_ID]).toBe( + experimentId, + ); + expect(level2?.attributes[LangfuseOtelSpanAttributes.EXPERIMENT_ID]).toBe( + experimentId, + ); + expect(level3?.attributes[LangfuseOtelSpanAttributes.EXPERIMENT_ID]).toBe( + experimentId, + ); + }); + }); + + describe("Non-Propagated Attributes", () => { + it("should set description only on root span, not child spans", async () => { + const description = "Test experiment description"; + + await langfuse.experiment.run({ + name: "description-test", + description, + data: [{ input: "test" }], + task: async () => { + const child = startObservation("child"); + child.end(); + return "output"; + }, + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + + const rootSpan = spans.find((s) => s.name === "experiment-item-run"); + const childSpan = spans.find((s) => s.name === "child"); + + // Root span should have description + expect( + rootSpan?.attributes[LangfuseOtelSpanAttributes.EXPERIMENT_DESCRIPTION], + ).toBe(description); + + // Child span should NOT have description + expect( + childSpan?.attributes[ + LangfuseOtelSpanAttributes.EXPERIMENT_DESCRIPTION + ], + ).toBeUndefined(); + }); + + it("should set expectedOutput only on root span, not child spans", async () => { + await langfuse.experiment.run({ + name: "expected-output-test", + data: [{ input: "France", expectedOutput: "Paris" }], + task: async () => { + const child = startObservation("child"); + child.end(); + return "output"; + }, + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + + const rootSpan = spans.find((s) => s.name === "experiment-item-run"); + const childSpan = spans.find((s) => s.name === "child"); + + // Root span should have expected output + // serializeValue passes strings through unchanged for efficiency + expect( + rootSpan?.attributes[ + LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_EXPECTED_OUTPUT + ], + ).toBe("Paris"); + + // Child span should NOT have expected output + expect( + childSpan?.attributes[ + LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_EXPECTED_OUTPUT + ], + ).toBeUndefined(); + }); + }); + + describe("Multiple Experiment Items", () => { + it("should not leak attributes between experiment items", async () => { + const items = [ + { input: "item1", metadata: { index: "1" } }, + { input: "item2", metadata: { index: "2" } }, + ]; + + const experimentIds: string[] = []; + const itemIds: string[] = []; + + await langfuse.experiment.run({ + name: "no-leakage-test", + data: items, + task: async (item) => { + await startActiveObservation("process-item", async (span) => { + experimentIds.push( + span.otelSpan.attributes[ + LangfuseOtelSpanAttributes.EXPERIMENT_ID + ] as string, + ); + itemIds.push( + span.otelSpan.attributes[ + LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_ID + ] as string, + ); + }); + return `output-${item.input}`; + }, + }); + + await waitForSpanExport(testEnv.mockExporter, 4); // 2 roots + 2 children + + // Each item should have different item IDs + expect(itemIds[0]).not.toBe(itemIds[1]); + + // Each item should have different experiment IDs (randomly generated) + expect(experimentIds[0]).not.toBe(experimentIds[1]); + }); + }); + + describe("Experiment ID Generation", () => { + it("should generate experiment item ID from input hash for non-dataset items", async () => { + const input = "test-input-for-hashing"; + + await langfuse.experiment.run({ + name: "id-generation-test", + data: [{ input }], + task: async () => { + return "output"; + }, + }); + + await waitForSpanExport(testEnv.mockExporter, 1); + const spans = testEnv.mockExporter.exportedSpans; + const rootSpan = spans[0]; + + const experimentItemId = + rootSpan.attributes[LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_ID]; + + // Should be 16 hex characters (8 bytes) + expect(experimentItemId).toMatch(/^[0-9a-f]{16}$/); + }); + + it("should use dataset item ID when available", async () => { + const datasetItemId = "dataset-item-123"; + + await langfuse.experiment.run({ + name: "dataset-id-test", + data: [ + { + input: "test", + id: datasetItemId, + datasetId: "dataset-123", + } as any, + ], + task: async () => { + return "output"; + }, + }); + + await waitForSpanExport(testEnv.mockExporter, 1); + const spans = testEnv.mockExporter.exportedSpans; + const rootSpan = spans[0]; + + const experimentItemId = + rootSpan.attributes[LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_ID]; + + // Should use the dataset item ID directly + expect(experimentItemId).toBe(datasetItemId); + }); + }); + + describe("Root Observation ID Propagation", () => { + it("should propagate the root observation ID to child spans", async () => { + let rootObservationId: string | undefined; + + await langfuse.experiment.run({ + name: "root-id-test", + data: [{ input: "test" }], + task: async () => { + await startActiveObservation("child", async (span) => { + rootObservationId = span.otelSpan.attributes[ + LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_ROOT_OBSERVATION_ID + ] as string; + }); + return "output"; + }, + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + + const rootSpan = spans.find((s) => s.name === "experiment-item-run"); + + // Root observation ID should match the root span's ID + expect(rootObservationId).toBe(rootSpan?.spanContext().spanId); + }); + }); + + describe("Error Handling", () => { + it("should propagate attributes even when task throws error", async () => { + try { + await langfuse.experiment.run({ + name: "error-test", + data: [{ input: "test" }], + task: async () => { + const child = startObservation("child-before-error"); + child.end(); + + throw new Error("Task failed"); + }, + }); + } catch { + // Expected error + } + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + + const childSpan = spans.find((s) => s.name === "child-before-error"); + + // Child span should still have experiment attributes + expect( + childSpan?.attributes[LangfuseOtelSpanAttributes.EXPERIMENT_ID], + ).toBeDefined(); + expect( + childSpan?.attributes[LangfuseOtelSpanAttributes.EXPERIMENT_NAME], + ).toBeDefined(); + }); + }); + + describe("Concurrent Experiments", () => { + it("should not mix attributes between concurrent experiments", async () => { + const experiment1Promise = langfuse.experiment.run({ + name: "concurrent-exp-1", + data: [{ input: "input1" }], + task: async () => { + await new Promise((resolve) => setTimeout(resolve, 50)); + const child = startObservation("child-exp1"); + child.end(); + return "output1"; + }, + }); + + const experiment2Promise = langfuse.experiment.run({ + name: "concurrent-exp-2", + data: [{ input: "input2" }], + task: async () => { + await new Promise((resolve) => setTimeout(resolve, 50)); + const child = startObservation("child-exp2"); + child.end(); + return "output2"; + }, + }); + + await Promise.all([experiment1Promise, experiment2Promise]); + + await waitForSpanExport(testEnv.mockExporter, 4); // 2 roots + 2 children + const spans = testEnv.mockExporter.exportedSpans; + + const child1 = spans.find((s) => s.name === "child-exp1"); + const child2 = spans.find((s) => s.name === "child-exp2"); + + const exp1Name = + child1?.attributes[LangfuseOtelSpanAttributes.EXPERIMENT_NAME]; + const exp2Name = + child2?.attributes[LangfuseOtelSpanAttributes.EXPERIMENT_NAME]; + + // Each child should have the correct experiment name + expect(exp1Name).toContain("concurrent-exp-1"); + expect(exp2Name).toContain("concurrent-exp-2"); + expect(exp1Name).not.toBe(exp2Name); + }); + }); + + describe("Serialization", () => { + it("should serialize complex metadata correctly", async () => { + const complexMetadata = { + nested: { key: "value" }, + array: [1, 2, 3], + boolean: true, + number: 42, + }; + + await langfuse.experiment.run({ + name: "serialization-test", + metadata: complexMetadata, + data: [{ input: "test" }], + task: async () => { + const child = startObservation("child"); + child.end(); + return "output"; + }, + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const childSpan = spans.find((s) => s.name === "child"); + + const metadataAttr = + childSpan?.attributes[LangfuseOtelSpanAttributes.EXPERIMENT_METADATA]; + expect(metadataAttr).toBeDefined(); + + const parsed = JSON.parse(metadataAttr as string); + expect(parsed).toEqual(complexMetadata); + }); + + it("should handle string metadata without double serialization", async () => { + const stringMetadata = '{"already":"serialized"}'; + + await langfuse.experiment.run({ + name: "string-metadata-test", + metadata: stringMetadata as any, + data: [{ input: "test" }], + task: async () => { + const child = startObservation("child"); + child.end(); + return "output"; + }, + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const childSpan = spans.find((s) => s.name === "child"); + + const metadataAttr = + childSpan?.attributes[LangfuseOtelSpanAttributes.EXPERIMENT_METADATA]; + + // Should not be double-serialized + expect(metadataAttr).toBe(stringMetadata); + }); + }); + + describe("Environment Attribute", () => { + it("should set experiment environment on ALL spans including root", async () => { + await langfuse.experiment.run({ + name: "environment-test", + data: [{ input: "test" }], + task: async () => { + await startActiveObservation("level-1", async () => { + await startActiveObservation("level-2", async () => { + const level3 = startObservation("level-3"); + level3.end(); + }); + }); + return "output"; + }, + }); + + await waitForSpanExport(testEnv.mockExporter, 4); // root + 3 children + const spans = testEnv.mockExporter.exportedSpans; + + const rootSpan = spans.find((s) => s.name === "experiment-item-run"); + const level1 = spans.find((s) => s.name === "level-1"); + const level2 = spans.find((s) => s.name === "level-2"); + const level3 = spans.find((s) => s.name === "level-3"); + + // ALL spans should have the experiment environment attribute + expect(rootSpan?.attributes[LangfuseOtelSpanAttributes.ENVIRONMENT]).toBe( + LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT, + ); + expect(level1?.attributes[LangfuseOtelSpanAttributes.ENVIRONMENT]).toBe( + LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT, + ); + expect(level2?.attributes[LangfuseOtelSpanAttributes.ENVIRONMENT]).toBe( + LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT, + ); + expect(level3?.attributes[LangfuseOtelSpanAttributes.ENVIRONMENT]).toBe( + LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT, + ); + }); + + it("should set experiment environment value to 'sdk-experiment'", async () => { + await langfuse.experiment.run({ + name: "environment-value-test", + data: [{ input: "test" }], + task: async () => { + const child = startObservation("child"); + child.end(); + return "output"; + }, + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const childSpan = spans.find((s) => s.name === "child"); + + // Verify the exact value + expect( + childSpan?.attributes[LangfuseOtelSpanAttributes.ENVIRONMENT], + ).toBe("sdk-experiment"); + }); + }); + + describe("Dataset Attributes", () => { + it("should propagate dataset ID when using dataset items", async () => { + const datasetId = "dataset-abc-123"; + + await langfuse.experiment.run({ + name: "dataset-test", + data: [ + { + input: "test", + id: "item-1", + datasetId, + } as any, + ], + task: async () => { + const child = startObservation("child"); + child.end(); + return "output"; + }, + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const childSpan = spans.find((s) => s.name === "child"); + + expect( + childSpan?.attributes[LangfuseOtelSpanAttributes.EXPERIMENT_DATASET_ID], + ).toBe(datasetId); + }); + + it("should not have dataset ID for non-dataset experiments", async () => { + await langfuse.experiment.run({ + name: "non-dataset-test", + data: [{ input: "test" }], + task: async () => { + const child = startObservation("child"); + child.end(); + return "output"; + }, + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const childSpan = spans.find((s) => s.name === "child"); + + expect( + childSpan?.attributes[LangfuseOtelSpanAttributes.EXPERIMENT_DATASET_ID], + ).toBeUndefined(); + }); + }); +}); diff --git a/tests/integration/propagation.integration.test.ts b/tests/integration/propagation.integration.test.ts new file mode 100644 index 00000000..70e6910d --- /dev/null +++ b/tests/integration/propagation.integration.test.ts @@ -0,0 +1,1505 @@ +/** + * Comprehensive tests for propagateAttributes functionality. + * + * This module tests the propagateAttributes function that allows setting + * trace-level attributes (userId, sessionId, version, metadata) that automatically propagate + * to all child spans within the context. + */ + +import { + LangfuseOtelContextKeys, + LangfuseOtelSpanAttributes, + getPropagatedAttributesFromContext, +} from "@langfuse/core"; +import { propagateAttributes, startObservation } from "@langfuse/tracing"; +import { + context as otelContext, + trace as otelTrace, + propagation, + ROOT_CONTEXT, +} from "@opentelemetry/api"; +import { describe, it, expect, beforeEach, afterEach } from "vitest"; + +import { + setupTestEnvironment, + teardownTestEnvironment, + waitForSpanExport, + type TestEnvironment, +} from "./helpers/testSetup.js"; + +describe("propagateAttributes", () => { + let testEnv: TestEnvironment; + + beforeEach(async () => { + testEnv = await setupTestEnvironment(); + }); + + afterEach(async () => { + await teardownTestEnvironment(testEnv); + }); + + describe("Basic Propagation", () => { + it("should propagate userId to child spans", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes({ userId: "user_123" }, () => { + const child1 = startObservation("child-1"); + child1.end(); + + const child2 = startObservation("child-2"); + child2.end(); + }); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 3); + const spans = testEnv.mockExporter.exportedSpans; + const child1 = spans.find((s) => s.name === "child-1"); + const child2 = spans.find((s) => s.name === "child-2"); + + expect(child1?.attributes[LangfuseOtelSpanAttributes.TRACE_USER_ID]).toBe( + "user_123", + ); + expect(child2?.attributes[LangfuseOtelSpanAttributes.TRACE_USER_ID]).toBe( + "user_123", + ); + }); + + it("should propagate sessionId to child spans", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes({ sessionId: "session_abc" }, () => { + const child1 = startObservation("child-1"); + child1.end(); + + const child2 = startObservation("child-2"); + child2.end(); + }); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 3); + const spans = testEnv.mockExporter.exportedSpans; + const child1 = spans.find((s) => s.name === "child-1"); + const child2 = spans.find((s) => s.name === "child-2"); + + expect( + child1?.attributes[LangfuseOtelSpanAttributes.TRACE_SESSION_ID], + ).toBe("session_abc"); + expect( + child2?.attributes[LangfuseOtelSpanAttributes.TRACE_SESSION_ID], + ).toBe("session_abc"); + }); + + it("should propagate version to child spans", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes({ version: "v1.2.3" }, () => { + const child1 = startObservation("child-1"); + child1.end(); + + const child2 = startObservation("child-2"); + child2.end(); + }); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 3); + const spans = testEnv.mockExporter.exportedSpans; + const child1 = spans.find((s) => s.name === "child-1"); + const child2 = spans.find((s) => s.name === "child-2"); + + expect(child1?.attributes[LangfuseOtelSpanAttributes.VERSION]).toBe( + "v1.2.3", + ); + expect(child2?.attributes[LangfuseOtelSpanAttributes.VERSION]).toBe( + "v1.2.3", + ); + }); + + it("should propagate metadata to child spans", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes( + { + metadata: { experiment: "variant_a", version: "1.0" }, + }, + () => { + const child1 = startObservation("child-1"); + child1.end(); + + const child2 = startObservation("child-2"); + child2.end(); + }, + ); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 3); + const spans = testEnv.mockExporter.exportedSpans; + const child1 = spans.find((s) => s.name === "child-1"); + const child2 = spans.find((s) => s.name === "child-2"); + + expect( + child1?.attributes[ + `${LangfuseOtelSpanAttributes.TRACE_METADATA}.experiment` + ], + ).toBe("variant_a"); + expect( + child1?.attributes[ + `${LangfuseOtelSpanAttributes.TRACE_METADATA}.version` + ], + ).toBe("1.0"); + expect( + child2?.attributes[ + `${LangfuseOtelSpanAttributes.TRACE_METADATA}.experiment` + ], + ).toBe("variant_a"); + expect( + child2?.attributes[ + `${LangfuseOtelSpanAttributes.TRACE_METADATA}.version` + ], + ).toBe("1.0"); + }); + + it("should propagate all attributes together", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes( + { + userId: "user_123", + sessionId: "session_abc", + version: "v2.0.0", + metadata: { experiment: "test", env: "prod" }, + }, + () => { + const child = startObservation("child"); + child.end(); + }, + ); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const child = spans.find((s) => s.name === "child"); + + expect(child?.attributes[LangfuseOtelSpanAttributes.TRACE_USER_ID]).toBe( + "user_123", + ); + expect( + child?.attributes[LangfuseOtelSpanAttributes.TRACE_SESSION_ID], + ).toBe("session_abc"); + expect(child?.attributes[LangfuseOtelSpanAttributes.VERSION]).toBe( + "v2.0.0", + ); + expect( + child?.attributes[ + `${LangfuseOtelSpanAttributes.TRACE_METADATA}.experiment` + ], + ).toBe("test"); + expect( + child?.attributes[`${LangfuseOtelSpanAttributes.TRACE_METADATA}.env`], + ).toBe("prod"); + }); + + it("should maintain return value", async () => { + const tracer = otelTrace.getTracer("test"); + + const returnValue = await propagateAttributes( + { userId: "user_123" }, + async () => { + return await tracer.startActiveSpan("parent", async (parentSpan) => { + const child1 = startObservation("child-1"); + child1.end(); + + const child2 = startObservation("child-2"); + child2.end(); + parentSpan.end(); + + return "hello"; + }); + }, + ); + + expect(returnValue).toBe("hello"); + + await waitForSpanExport(testEnv.mockExporter, 3); + const spans = testEnv.mockExporter.exportedSpans; + const child1 = spans.find((s) => s.name === "child-1"); + const child2 = spans.find((s) => s.name === "child-2"); + + expect(child1?.attributes[LangfuseOtelSpanAttributes.TRACE_USER_ID]).toBe( + "user_123", + ); + expect(child2?.attributes[LangfuseOtelSpanAttributes.TRACE_USER_ID]).toBe( + "user_123", + ); + }); + + it("should propagate all attributes together (async)", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + await propagateAttributes( + { + userId: "user_123", + sessionId: "session_abc", + metadata: { experiment: "test", env: "prod" }, + }, + async () => { + await new Promise((resolve) => setTimeout(resolve)); + const child = startObservation("child"); + child.end(); + }, + ); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const child = spans.find((s) => s.name === "child"); + + expect(child?.attributes[LangfuseOtelSpanAttributes.TRACE_USER_ID]).toBe( + "user_123", + ); + expect( + child?.attributes[LangfuseOtelSpanAttributes.TRACE_SESSION_ID], + ).toBe("session_abc"); + expect( + child?.attributes[ + `${LangfuseOtelSpanAttributes.TRACE_METADATA}.experiment` + ], + ).toBe("test"); + expect( + child?.attributes[`${LangfuseOtelSpanAttributes.TRACE_METADATA}.env`], + ).toBe("prod"); + }); + }); + + describe("Tags Propagation", () => { + it("should propagate tags to child spans", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes({ tags: ["production", "experiment-a"] }, () => { + const child1 = startObservation("child-1"); + child1.end(); + + const child2 = startObservation("child-2"); + child2.end(); + }); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 3); + const spans = testEnv.mockExporter.exportedSpans; + const child1 = spans.find((s) => s.name === "child-1"); + const child2 = spans.find((s) => s.name === "child-2"); + + expect(child1?.attributes[LangfuseOtelSpanAttributes.TRACE_TAGS]).toEqual( + ["production", "experiment-a"], + ); + expect(child2?.attributes[LangfuseOtelSpanAttributes.TRACE_TAGS]).toEqual( + ["production", "experiment-a"], + ); + }); + + it("should merge tags from multiple propagateAttributes calls", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes({ tags: ["tag1", "tag2"] }, () => { + propagateAttributes({ tags: ["tag3", "tag4"] }, () => { + const child = startObservation("child"); + child.end(); + }); + }); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const child = spans.find((s) => s.name === "child"); + + // Child should have all four tags merged + expect(child?.attributes[LangfuseOtelSpanAttributes.TRACE_TAGS]).toEqual( + expect.arrayContaining(["tag1", "tag2", "tag3", "tag4"]), + ); + expect( + (child?.attributes[LangfuseOtelSpanAttributes.TRACE_TAGS] as string[]) + ?.length, + ).toBe(4); + }); + + it("should deduplicate tags when merging", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes({ tags: ["tag1", "tag2"] }, () => { + propagateAttributes({ tags: ["tag2", "tag3"] }, () => { + const child = startObservation("child"); + child.end(); + }); + }); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const child = spans.find((s) => s.name === "child"); + + // Should have unique tags only: tag1, tag2, tag3 + expect(child?.attributes[LangfuseOtelSpanAttributes.TRACE_TAGS]).toEqual( + expect.arrayContaining(["tag1", "tag2", "tag3"]), + ); + expect( + (child?.attributes[LangfuseOtelSpanAttributes.TRACE_TAGS] as string[]) + ?.length, + ).toBe(3); + }); + + it("should merge tags across nested contexts", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes({ tags: ["outer", "shared"] }, () => { + const spanOuter1 = startObservation("span-outer-1"); + spanOuter1.end(); + + propagateAttributes({ tags: ["inner", "shared"] }, () => { + const spanInner = startObservation("span-inner"); + spanInner.end(); + }); + + const spanOuter2 = startObservation("span-outer-2"); + spanOuter2.end(); + }); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 4); + const spans = testEnv.mockExporter.exportedSpans; + const spanOuter1 = spans.find((s) => s.name === "span-outer-1"); + const spanInner = spans.find((s) => s.name === "span-inner"); + const spanOuter2 = spans.find((s) => s.name === "span-outer-2"); + + // spanOuter1: ["outer", "shared"] + expect( + spanOuter1?.attributes[LangfuseOtelSpanAttributes.TRACE_TAGS], + ).toEqual(expect.arrayContaining(["outer", "shared"])); + expect( + ( + spanOuter1?.attributes[ + LangfuseOtelSpanAttributes.TRACE_TAGS + ] as string[] + )?.length, + ).toBe(2); + + // spanInner: ["outer", "shared", "inner"] - "shared" deduplicated + expect( + spanInner?.attributes[LangfuseOtelSpanAttributes.TRACE_TAGS], + ).toEqual(expect.arrayContaining(["outer", "shared", "inner"])); + expect( + ( + spanInner?.attributes[ + LangfuseOtelSpanAttributes.TRACE_TAGS + ] as string[] + )?.length, + ).toBe(3); + + // spanOuter2: ["outer", "shared"] - restored to outer context + expect( + spanOuter2?.attributes[LangfuseOtelSpanAttributes.TRACE_TAGS], + ).toEqual(expect.arrayContaining(["outer", "shared"])); + expect( + ( + spanOuter2?.attributes[ + LangfuseOtelSpanAttributes.TRACE_TAGS + ] as string[] + )?.length, + ).toBe(2); + }); + + it("should handle empty tags array", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes({ tags: ["tag1"] }, () => { + propagateAttributes({ tags: [] }, () => { + const child = startObservation("child"); + child.end(); + }); + }); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const child = spans.find((s) => s.name === "child"); + + // Should still have tag1 from outer context + expect(child?.attributes[LangfuseOtelSpanAttributes.TRACE_TAGS]).toEqual([ + "tag1", + ]); + }); + + it("should propagate tags in baggage mode", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes( + { tags: ["tag1", "tag2", "tag3"], asBaggage: true }, + () => { + const currentContext = otelContext.active(); + const baggage = propagation.getBaggage(currentContext); + + expect(baggage).toBeDefined(); + const entries = Array.from(baggage!.getAllEntries()); + const tagsEntry = entries.find(([key]) => key === "langfuse_tags"); + + expect(tagsEntry).toBeDefined(); + // Tags should be comma-separated in baggage + expect(tagsEntry?.[1].value).toBe("tag1,tag2,tag3"); + + const child = startObservation("child"); + child.end(); + }, + ); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const child = spans.find((s) => s.name === "child"); + + expect(child?.attributes[LangfuseOtelSpanAttributes.TRACE_TAGS]).toEqual([ + "tag1", + "tag2", + "tag3", + ]); + }); + + it("should merge tags in baggage mode", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes({ tags: ["tag1"], asBaggage: true }, () => { + propagateAttributes({ tags: ["tag2"], asBaggage: true }, () => { + const currentContext = otelContext.active(); + const baggage = propagation.getBaggage(currentContext); + + expect(baggage).toBeDefined(); + const entries = Array.from(baggage!.getAllEntries()); + const tagsEntry = entries.find(([key]) => key === "langfuse_tags"); + + expect(tagsEntry).toBeDefined(); + // Merged tags should be comma-separated + expect(tagsEntry?.[1].value).toBe("tag1,tag2"); + + const child = startObservation("child"); + child.end(); + }); + }); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const child = spans.find((s) => s.name === "child"); + + expect(child?.attributes[LangfuseOtelSpanAttributes.TRACE_TAGS]).toEqual( + expect.arrayContaining(["tag1", "tag2"]), + ); + }); + + it("should drop tags over 200 characters", async () => { + const tracer = otelTrace.getTracer("test"); + const longTag = "x".repeat(201); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes({ tags: ["valid-tag", longTag] }, () => { + const child = startObservation("child"); + child.end(); + }); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const child = spans.find((s) => s.name === "child"); + + expect(child?.attributes[LangfuseOtelSpanAttributes.TRACE_TAGS]).toEqual([ + "valid-tag", + ]); + }); + + it("should propagate tags with other attributes", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes( + { + userId: "user123", + sessionId: "session456", + tags: ["production", "test"], + metadata: { env: "prod" }, + }, + () => { + const child = startObservation("child"); + child.end(); + }, + ); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const child = spans.find((s) => s.name === "child"); + + expect(child?.attributes[LangfuseOtelSpanAttributes.TRACE_USER_ID]).toBe( + "user123", + ); + expect( + child?.attributes[LangfuseOtelSpanAttributes.TRACE_SESSION_ID], + ).toBe("session456"); + expect(child?.attributes[LangfuseOtelSpanAttributes.TRACE_TAGS]).toEqual([ + "production", + "test", + ]); + expect( + child?.attributes[`${LangfuseOtelSpanAttributes.TRACE_METADATA}.env`], + ).toBe("prod"); + }); + }); + + describe("Metadata Merging", () => { + it("should merge metadata from multiple propagateAttributes calls", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes({ metadata: { key1: "value1" } }, () => { + propagateAttributes({ metadata: { key2: "value2" } }, () => { + const child = startObservation("child"); + child.end(); + }); + }); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const child = spans.find((s) => s.name === "child"); + + // Child should have both key1 and key2 + expect( + child?.attributes[`${LangfuseOtelSpanAttributes.TRACE_METADATA}.key1`], + ).toBe("value1"); + expect( + child?.attributes[`${LangfuseOtelSpanAttributes.TRACE_METADATA}.key2`], + ).toBe("value2"); + }); + + it("should allow metadata values to be overwritten by subsequent calls", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes({ metadata: { key1: "value1" } }, () => { + propagateAttributes({ metadata: { key1: "value2" } }, () => { + const child = startObservation("child"); + child.end(); + }); + }); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const child = spans.find((s) => s.name === "child"); + + // Newer value should override + expect( + child?.attributes[`${LangfuseOtelSpanAttributes.TRACE_METADATA}.key1`], + ).toBe("value2"); + }); + + it("should preserve existing metadata when adding new keys", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes({ metadata: { existing: "value" } }, () => { + const child1 = startObservation("child-1"); + child1.end(); + + propagateAttributes({ metadata: { new: "value2" } }, () => { + const child2 = startObservation("child-2"); + child2.end(); + }); + }); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 3); + const spans = testEnv.mockExporter.exportedSpans; + const child1 = spans.find((s) => s.name === "child-1"); + const child2 = spans.find((s) => s.name === "child-2"); + + // child1 should only have "existing" + expect( + child1?.attributes[ + `${LangfuseOtelSpanAttributes.TRACE_METADATA}.existing` + ], + ).toBe("value"); + expect( + child1?.attributes[`${LangfuseOtelSpanAttributes.TRACE_METADATA}.new`], + ).toBeUndefined(); + + // child2 should have both "existing" and "new" + expect( + child2?.attributes[ + `${LangfuseOtelSpanAttributes.TRACE_METADATA}.existing` + ], + ).toBe("value"); + expect( + child2?.attributes[`${LangfuseOtelSpanAttributes.TRACE_METADATA}.new`], + ).toBe("value2"); + }); + + it("should merge metadata across nested contexts", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes( + { metadata: { level: "outer", shared: "outer" } }, + () => { + const spanOuter1 = startObservation("span-outer-1"); + spanOuter1.end(); + + propagateAttributes( + { metadata: { shared: "inner", extra: "inner" } }, + () => { + const spanInner = startObservation("span-inner"); + spanInner.end(); + }, + ); + + // Back to outer context + const spanOuter2 = startObservation("span-outer-2"); + spanOuter2.end(); + }, + ); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 4); + const spans = testEnv.mockExporter.exportedSpans; + const spanOuter1 = spans.find((s) => s.name === "span-outer-1"); + const spanInner = spans.find((s) => s.name === "span-inner"); + const spanOuter2 = spans.find((s) => s.name === "span-outer-2"); + + // spanOuter1: {level: "outer", shared: "outer"} + expect( + spanOuter1?.attributes[ + `${LangfuseOtelSpanAttributes.TRACE_METADATA}.level` + ], + ).toBe("outer"); + expect( + spanOuter1?.attributes[ + `${LangfuseOtelSpanAttributes.TRACE_METADATA}.shared` + ], + ).toBe("outer"); + expect( + spanOuter1?.attributes[ + `${LangfuseOtelSpanAttributes.TRACE_METADATA}.extra` + ], + ).toBeUndefined(); + + // spanInner: {level: "outer", shared: "inner", extra: "inner"} + expect( + spanInner?.attributes[ + `${LangfuseOtelSpanAttributes.TRACE_METADATA}.level` + ], + ).toBe("outer"); + expect( + spanInner?.attributes[ + `${LangfuseOtelSpanAttributes.TRACE_METADATA}.shared` + ], + ).toBe("inner"); + expect( + spanInner?.attributes[ + `${LangfuseOtelSpanAttributes.TRACE_METADATA}.extra` + ], + ).toBe("inner"); + + // spanOuter2: {level: "outer", shared: "outer"} (restored) + expect( + spanOuter2?.attributes[ + `${LangfuseOtelSpanAttributes.TRACE_METADATA}.level` + ], + ).toBe("outer"); + expect( + spanOuter2?.attributes[ + `${LangfuseOtelSpanAttributes.TRACE_METADATA}.shared` + ], + ).toBe("outer"); + expect( + spanOuter2?.attributes[ + `${LangfuseOtelSpanAttributes.TRACE_METADATA}.extra` + ], + ).toBeUndefined(); + }); + + it("should merge metadata from multiple sequential calls in same context", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes({ metadata: { key1: "value1" } }, () => { + propagateAttributes({ metadata: { key2: "value2" } }, () => { + propagateAttributes({ metadata: { key3: "value3" } }, () => { + const child = startObservation("child"); + child.end(); + }); + }); + }); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const child = spans.find((s) => s.name === "child"); + + // All three keys should be present + expect( + child?.attributes[`${LangfuseOtelSpanAttributes.TRACE_METADATA}.key1`], + ).toBe("value1"); + expect( + child?.attributes[`${LangfuseOtelSpanAttributes.TRACE_METADATA}.key2`], + ).toBe("value2"); + expect( + child?.attributes[`${LangfuseOtelSpanAttributes.TRACE_METADATA}.key3`], + ).toBe("value3"); + }); + + it("should handle empty metadata object in merge", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes({ metadata: { key1: "value1" } }, () => { + propagateAttributes({ metadata: {} }, () => { + const child = startObservation("child"); + child.end(); + }); + }); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const child = spans.find((s) => s.name === "child"); + + // key1 should still be present + expect( + child?.attributes[`${LangfuseOtelSpanAttributes.TRACE_METADATA}.key1`], + ).toBe("value1"); + }); + + it("should handle undefined metadata in subsequent calls", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes({ metadata: { key1: "value1" } }, () => { + propagateAttributes({ userId: "user123" }, () => { + const child = startObservation("child"); + child.end(); + }); + }); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const child = spans.find((s) => s.name === "child"); + + // Both metadata and userId should be present + expect( + child?.attributes[`${LangfuseOtelSpanAttributes.TRACE_METADATA}.key1`], + ).toBe("value1"); + expect(child?.attributes[LangfuseOtelSpanAttributes.TRACE_USER_ID]).toBe( + "user123", + ); + }); + + it("should merge metadata after some keys were dropped due to validation", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes( + { + metadata: { valid: "ok", invalid: "x".repeat(201) }, + }, + () => { + propagateAttributes({ metadata: { additional: "value" } }, () => { + const child = startObservation("child"); + child.end(); + }); + }, + ); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const child = spans.find((s) => s.name === "child"); + + // valid and additional should be present, invalid should be dropped + expect( + child?.attributes[`${LangfuseOtelSpanAttributes.TRACE_METADATA}.valid`], + ).toBe("ok"); + expect( + child?.attributes[ + `${LangfuseOtelSpanAttributes.TRACE_METADATA}.additional` + ], + ).toBe("value"); + expect( + child?.attributes[ + `${LangfuseOtelSpanAttributes.TRACE_METADATA}.invalid` + ], + ).toBeUndefined(); + }); + + it("should merge metadata while updating other attributes", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes( + { userId: "user1", metadata: { key1: "value1" } }, + () => { + propagateAttributes( + { userId: "user2", metadata: { key2: "value2" } }, + () => { + const child = startObservation("child"); + child.end(); + }, + ); + }, + ); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const child = spans.find((s) => s.name === "child"); + + // userId should be user2 (overwritten), both metadata keys should be present + expect(child?.attributes[LangfuseOtelSpanAttributes.TRACE_USER_ID]).toBe( + "user2", + ); + expect( + child?.attributes[`${LangfuseOtelSpanAttributes.TRACE_METADATA}.key1`], + ).toBe("value1"); + expect( + child?.attributes[`${LangfuseOtelSpanAttributes.TRACE_METADATA}.key2`], + ).toBe("value2"); + }); + + it("should merge metadata when only metadata is being updated", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes( + { + userId: "user1", + sessionId: "session1", + metadata: { key1: "value1" }, + }, + () => { + propagateAttributes({ metadata: { key2: "value2" } }, () => { + const child = startObservation("child"); + child.end(); + }); + }, + ); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const child = spans.find((s) => s.name === "child"); + + // All attributes should be present with merged metadata + expect(child?.attributes[LangfuseOtelSpanAttributes.TRACE_USER_ID]).toBe( + "user1", + ); + expect( + child?.attributes[LangfuseOtelSpanAttributes.TRACE_SESSION_ID], + ).toBe("session1"); + expect( + child?.attributes[`${LangfuseOtelSpanAttributes.TRACE_METADATA}.key1`], + ).toBe("value1"); + expect( + child?.attributes[`${LangfuseOtelSpanAttributes.TRACE_METADATA}.key2`], + ).toBe("value2"); + }); + }); + + describe("Validation", () => { + it("should drop userId over 200 characters", async () => { + const tracer = otelTrace.getTracer("test"); + const longUserId = "x".repeat(201); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes({ userId: longUserId }, () => { + const child = startObservation("child"); + child.end(); + }); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const child = spans.find((s) => s.name === "child"); + + expect( + child?.attributes[LangfuseOtelSpanAttributes.TRACE_USER_ID], + ).toBeUndefined(); + }); + + it("should accept userId exactly 200 characters", async () => { + const tracer = otelTrace.getTracer("test"); + const userId200 = "x".repeat(200); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes({ userId: userId200 }, () => { + const child = startObservation("child"); + child.end(); + }); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const child = spans.find((s) => s.name === "child"); + + expect(child?.attributes[LangfuseOtelSpanAttributes.TRACE_USER_ID]).toBe( + userId200, + ); + }); + + it("should drop sessionId over 200 characters", async () => { + const tracer = otelTrace.getTracer("test"); + const longSessionId = "y".repeat(201); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes({ sessionId: longSessionId }, () => { + const child = startObservation("child"); + child.end(); + }); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const child = spans.find((s) => s.name === "child"); + + expect( + child?.attributes[LangfuseOtelSpanAttributes.TRACE_SESSION_ID], + ).toBeUndefined(); + }); + + it("should drop version over 200 characters", async () => { + const tracer = otelTrace.getTracer("test"); + const longVersion = "v".repeat(201); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes({ version: longVersion }, () => { + const child = startObservation("child"); + child.end(); + }); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const child = spans.find((s) => s.name === "child"); + + expect( + child?.attributes[LangfuseOtelSpanAttributes.VERSION], + ).toBeUndefined(); + }); + + it("should accept version exactly 200 characters", async () => { + const tracer = otelTrace.getTracer("test"); + const version200 = "v".repeat(200); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes({ version: version200 }, () => { + const child = startObservation("child"); + child.end(); + }); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const child = spans.find((s) => s.name === "child"); + + expect(child?.attributes[LangfuseOtelSpanAttributes.VERSION]).toBe( + version200, + ); + }); + + it("should drop metadata values over 200 characters", async () => { + const tracer = otelTrace.getTracer("test"); + const longValue = "z".repeat(201); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes( + { + metadata: { key: longValue }, + }, + () => { + const child = startObservation("child"); + child.end(); + }, + ); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const child = spans.find((s) => s.name === "child"); + + expect( + child?.attributes[`${LangfuseOtelSpanAttributes.TRACE_METADATA}.key`], + ).toBeUndefined(); + }); + + it("should drop non-string userId", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes({ userId: 12345 as any }, () => { + const child = startObservation("child"); + child.end(); + }); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const child = spans.find((s) => s.name === "child"); + + expect( + child?.attributes[LangfuseOtelSpanAttributes.TRACE_USER_ID], + ).toBeUndefined(); + }); + + it("should keep valid metadata and drop invalid", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes( + { + metadata: { + valid_key: "valid_value", + invalid_key: "x".repeat(201), + another_valid: "ok", + }, + }, + () => { + const child = startObservation("child"); + child.end(); + }, + ); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const child = spans.find((s) => s.name === "child"); + + expect( + child?.attributes[ + `${LangfuseOtelSpanAttributes.TRACE_METADATA}.valid_key` + ], + ).toBe("valid_value"); + expect( + child?.attributes[ + `${LangfuseOtelSpanAttributes.TRACE_METADATA}.another_valid` + ], + ).toBe("ok"); + expect( + child?.attributes[ + `${LangfuseOtelSpanAttributes.TRACE_METADATA}.invalid_key` + ], + ).toBeUndefined(); + }); + }); + + describe("Baggage Propagation", () => { + it("should merge metadata in baggage mode", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes( + { metadata: { key1: "value1" }, asBaggage: true }, + () => { + propagateAttributes( + { metadata: { key2: "value2" }, asBaggage: true }, + () => { + const currentContext = otelContext.active(); + const baggage = propagation.getBaggage(currentContext); + + expect(baggage).toBeDefined(); + const entries = Array.from(baggage!.getAllEntries()); + const baggageKeys = entries.map(([key]) => key); + + // Both metadata keys should be in baggage + expect(baggageKeys).toContain("langfuse_metadata_key1"); + expect(baggageKeys).toContain("langfuse_metadata_key2"); + + const key1Entry = entries.find( + ([key]) => key === "langfuse_metadata_key1", + ); + expect(key1Entry?.[1].value).toBe("value1"); + + const key2Entry = entries.find( + ([key]) => key === "langfuse_metadata_key2", + ); + expect(key2Entry?.[1].value).toBe("value2"); + + // Child span should also have both metadata keys + const child = startObservation("child"); + child.end(); + }, + ); + }, + ); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const child = spans.find((s) => s.name === "child"); + + expect( + child?.attributes[`${LangfuseOtelSpanAttributes.TRACE_METADATA}.key1`], + ).toBe("value1"); + expect( + child?.attributes[`${LangfuseOtelSpanAttributes.TRACE_METADATA}.key2`], + ).toBe("value2"); + }); + + it("should set baggage when asBaggage=true", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes( + { + userId: "user_123", + sessionId: "session_abc", + version: "v2.0", + metadata: { env: "test", region: "us-east" }, + asBaggage: true, + }, + () => { + // Get current context and inspect baggage + const currentContext = otelContext.active(); + const baggage = propagation.getBaggage(currentContext); + + expect(baggage).toBeDefined(); + const entries = Array.from(baggage!.getAllEntries()); + + // Check baggage keys exist + const baggageKeys = entries.map(([key]) => key); + expect(baggageKeys).toContain("langfuse_user_id"); + expect(baggageKeys).toContain("langfuse_session_id"); + expect(baggageKeys).toContain("langfuse_version"); + expect(baggageKeys).toContain("langfuse_metadata_env"); + expect(baggageKeys).toContain("langfuse_metadata_region"); + + // Check baggage values + const userIdEntry = entries.find( + ([key]) => key === "langfuse_user_id", + ); + expect(userIdEntry?.[1].value).toBe("user_123"); + + const sessionIdEntry = entries.find( + ([key]) => key === "langfuse_session_id", + ); + expect(sessionIdEntry?.[1].value).toBe("session_abc"); + + const versionEntry = entries.find( + ([key]) => key === "langfuse_version", + ); + expect(versionEntry?.[1].value).toBe("v2.0"); + + const envEntry = entries.find( + ([key]) => key === "langfuse_metadata_env", + ); + expect(envEntry?.[1].value).toBe("test"); + }, + ); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 1); + }); + + it("should propagate attributes from baggage to child spans", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes( + { + userId: "baggage_user", + sessionId: "baggage_session", + version: "v1.0-baggage", + metadata: { source: "baggage" }, + asBaggage: true, + }, + () => { + const child = startObservation("child"); + child.end(); + }, + ); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 2); + const spans = testEnv.mockExporter.exportedSpans; + const child = spans.find((s) => s.name === "child"); + + expect(child?.attributes[LangfuseOtelSpanAttributes.TRACE_USER_ID]).toBe( + "baggage_user", + ); + expect( + child?.attributes[LangfuseOtelSpanAttributes.TRACE_SESSION_ID], + ).toBe("baggage_session"); + expect(child?.attributes[LangfuseOtelSpanAttributes.VERSION]).toBe( + "v1.0-baggage", + ); + expect( + child?.attributes[ + `${LangfuseOtelSpanAttributes.TRACE_METADATA}.source` + ], + ).toBe("baggage"); + }); + + it("should not set baggage when asBaggage=false (default)", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes( + { + userId: "user_123", + sessionId: "session_abc", + }, + () => { + const currentContext = otelContext.active(); + const baggage = propagation.getBaggage(currentContext); + + expect(baggage).toBeUndefined(); + }, + ); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 1); + }); + }); + + describe("Nesting and Context Isolation", () => { + it("should allow nested contexts with different values", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes({ userId: "user1" }, () => { + const span1 = startObservation("span-1"); + span1.end(); + + propagateAttributes({ userId: "user2" }, () => { + const span2 = startObservation("span-2"); + span2.end(); + }); + }); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 3); + const spans = testEnv.mockExporter.exportedSpans; + const span1 = spans.find((s) => s.name === "span-1"); + const span2 = spans.find((s) => s.name === "span-2"); + + expect(span1?.attributes[LangfuseOtelSpanAttributes.TRACE_USER_ID]).toBe( + "user1", + ); + expect(span2?.attributes[LangfuseOtelSpanAttributes.TRACE_USER_ID]).toBe( + "user2", + ); + }); + + it("should restore outer context after inner context exits", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + propagateAttributes({ userId: "user1" }, () => { + const span1 = startObservation("span-1"); + span1.end(); + + propagateAttributes({ userId: "user2" }, () => { + const span2 = startObservation("span-2"); + span2.end(); + }); + + // Back to outer context + const span3 = startObservation("span-3"); + span3.end(); + }); + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 4); + const spans = testEnv.mockExporter.exportedSpans; + const span1 = spans.find((s) => s.name === "span-1"); + const span2 = spans.find((s) => s.name === "span-2"); + const span3 = spans.find((s) => s.name === "span-3"); + + expect(span1?.attributes[LangfuseOtelSpanAttributes.TRACE_USER_ID]).toBe( + "user1", + ); + expect(span2?.attributes[LangfuseOtelSpanAttributes.TRACE_USER_ID]).toBe( + "user2", + ); + expect(span3?.attributes[LangfuseOtelSpanAttributes.TRACE_USER_ID]).toBe( + "user1", + ); + }); + + it("should not propagate to spans outside context", async () => { + const tracer = otelTrace.getTracer("test"); + + await tracer.startActiveSpan("parent", async (parentSpan) => { + // Span before propagation + const beforeSpan = startObservation("before"); + beforeSpan.end(); + + propagateAttributes({ userId: "user_123" }, () => { + const insideSpan = startObservation("inside"); + insideSpan.end(); + }); + + // Span after propagation context exits + const afterSpan = startObservation("after"); + afterSpan.end(); + + parentSpan.end(); + }); + + await waitForSpanExport(testEnv.mockExporter, 4); + const spans = testEnv.mockExporter.exportedSpans; + const beforeSpan = spans.find((s) => s.name === "before"); + const insideSpan = spans.find((s) => s.name === "inside"); + const afterSpan = spans.find((s) => s.name === "after"); + + expect( + beforeSpan?.attributes[LangfuseOtelSpanAttributes.TRACE_USER_ID], + ).toBeUndefined(); + expect( + insideSpan?.attributes[LangfuseOtelSpanAttributes.TRACE_USER_ID], + ).toBe("user_123"); + expect( + afterSpan?.attributes[LangfuseOtelSpanAttributes.TRACE_USER_ID], + ).toBeUndefined(); + }); + }); + + describe("getPropagatedAttributesFromContext", () => { + it("should read userId from context", () => { + const context = ROOT_CONTEXT.setValue( + LangfuseOtelContextKeys["userId"], + "test_user", + ); + const attributes = getPropagatedAttributesFromContext(context); + + expect(attributes[LangfuseOtelSpanAttributes.TRACE_USER_ID]).toBe( + "test_user", + ); + }); + + it("should read sessionId from context", () => { + const context = ROOT_CONTEXT.setValue( + LangfuseOtelContextKeys["sessionId"], + "test_session", + ); + const attributes = getPropagatedAttributesFromContext(context); + + expect(attributes[LangfuseOtelSpanAttributes.TRACE_SESSION_ID]).toBe( + "test_session", + ); + }); + + it("should read version from context", () => { + const context = ROOT_CONTEXT.setValue( + LangfuseOtelContextKeys["version"], + "v3.1.4", + ); + const attributes = getPropagatedAttributesFromContext(context); + + expect(attributes[LangfuseOtelSpanAttributes.VERSION]).toBe("v3.1.4"); + }); + + it("should read metadata from context", () => { + const context = ROOT_CONTEXT.setValue( + LangfuseOtelContextKeys["metadata"], + { + key1: "value1", + key2: "value2", + }, + ); + const attributes = getPropagatedAttributesFromContext(context); + + expect( + attributes[`${LangfuseOtelSpanAttributes.TRACE_METADATA}.key1`], + ).toBe("value1"); + expect( + attributes[`${LangfuseOtelSpanAttributes.TRACE_METADATA}.key2`], + ).toBe("value2"); + }); + + it("should read attributes from baggage", () => { + let baggage = propagation.createBaggage(); + baggage = baggage.setEntry("langfuse_user_id", { value: "baggage_user" }); + baggage = baggage.setEntry("langfuse_session_id", { + value: "baggage_session", + }); + baggage = baggage.setEntry("langfuse_version", { value: "v2.5.1" }); + baggage = baggage.setEntry("langfuse_metadata_env", { value: "prod" }); + + const context = propagation.setBaggage(ROOT_CONTEXT, baggage); + const attributes = getPropagatedAttributesFromContext(context); + + expect(attributes[LangfuseOtelSpanAttributes.TRACE_USER_ID]).toBe( + "baggage_user", + ); + expect(attributes[LangfuseOtelSpanAttributes.TRACE_SESSION_ID]).toBe( + "baggage_session", + ); + expect(attributes[LangfuseOtelSpanAttributes.VERSION]).toBe("v2.5.1"); + expect( + attributes[`${LangfuseOtelSpanAttributes.TRACE_METADATA}.env`], + ).toBe("prod"); + }); + + it("should return empty object for context with no propagated attributes", () => { + const attributes = getPropagatedAttributesFromContext(ROOT_CONTEXT); + + expect(Object.keys(attributes)).toHaveLength(0); + }); + }); +}); From f22827501c25e6f0a3a61f6d8b709b37a4197585 Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Mon, 3 Nov 2025 11:24:09 +0100 Subject: [PATCH 2/2] chore: release v4.4.0 --- package.json | 2 +- packages/client/package.json | 2 +- packages/core/package.json | 2 +- packages/langchain/package.json | 2 +- packages/openai/package.json | 2 +- packages/otel/package.json | 2 +- packages/tracing/package.json | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) diff --git a/package.json b/package.json index f7fb6b42..b66cb3a9 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "langfuse-js", - "version": "4.3.0", + "version": "4.4.0", "description": "Langfuse JavaScript / TypeScript SDK", "author": "Langfuse", "license": "MIT", diff --git a/packages/client/package.json b/packages/client/package.json index 1af3412b..b8681668 100644 --- a/packages/client/package.json +++ b/packages/client/package.json @@ -1,6 +1,6 @@ { "name": "@langfuse/client", - "version": "4.3.0", + "version": "4.4.0", "description": "Langfuse API client for universal JavaScript environments", "type": "module", "sideEffects": false, diff --git a/packages/core/package.json b/packages/core/package.json index 11212624..5caf5f0f 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -1,6 +1,6 @@ { "name": "@langfuse/core", - "version": "4.3.0", + "version": "4.4.0", "description": "Core functions and utilities for Langfuse packages", "type": "module", "sideEffects": false, diff --git a/packages/langchain/package.json b/packages/langchain/package.json index b982ff1d..8fc5d567 100644 --- a/packages/langchain/package.json +++ b/packages/langchain/package.json @@ -1,6 +1,6 @@ { "name": "@langfuse/langchain", - "version": "4.3.0", + "version": "4.4.0", "description": "Langfuse integration for LangChain", "type": "module", "sideEffects": false, diff --git a/packages/openai/package.json b/packages/openai/package.json index 70b300d4..35876589 100644 --- a/packages/openai/package.json +++ b/packages/openai/package.json @@ -1,6 +1,6 @@ { "name": "@langfuse/openai", - "version": "4.3.0", + "version": "4.4.0", "description": "Langfuse integration for OpenAI SDK", "type": "module", "sideEffects": false, diff --git a/packages/otel/package.json b/packages/otel/package.json index b53a4bb3..dbb8432f 100644 --- a/packages/otel/package.json +++ b/packages/otel/package.json @@ -1,6 +1,6 @@ { "name": "@langfuse/otel", - "version": "4.3.0", + "version": "4.4.0", "author": "Langfuse", "license": "MIT", "engines": { diff --git a/packages/tracing/package.json b/packages/tracing/package.json index e48a60bd..1fa4ed07 100644 --- a/packages/tracing/package.json +++ b/packages/tracing/package.json @@ -1,6 +1,6 @@ { "name": "@langfuse/tracing", - "version": "4.3.0", + "version": "4.4.0", "author": "Langfuse", "license": "MIT", "engines": {