Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions langfuse/_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,11 @@
LangfuseSpan,
LangfuseTool,
)
from langfuse._client.utils import get_sha256_hash_hex, run_async_safely
from langfuse._client.utils import (
get_sha256_hash_hex,
get_string_span_attribute,
run_async_safely,
)
from langfuse._utils import _get_timestamp, json_path
from langfuse._utils.environment import get_common_release_envs
from langfuse._utils.parse_error import handle_fern_exception
Expand Down Expand Up @@ -1820,6 +1824,7 @@ def create_score(
config_id: Optional[str] = None,
metadata: Optional[Any] = None,
timestamp: Optional[datetime] = None,
environment: Optional[str] = None,
) -> None: ...

@overload
Expand All @@ -1840,6 +1845,7 @@ def create_score(
config_id: Optional[str] = None,
metadata: Optional[Any] = None,
timestamp: Optional[datetime] = None,
environment: Optional[str] = None,
) -> None: ...

def create_score(
Expand All @@ -1857,6 +1863,7 @@ def create_score(
config_id: Optional[str] = None,
metadata: Optional[Any] = None,
timestamp: Optional[datetime] = None,
environment: Optional[str] = None,
) -> None:
"""Create a score for a specific trace or observation.

Expand All @@ -1876,6 +1883,14 @@ def create_score(
config_id: Optional ID of a score config defined in Langfuse
metadata: Optional metadata to be attached to the score
timestamp: Optional timestamp for the score (defaults to current UTC time)
environment: Optional environment override for this score. If omitted,
the score uses the client-level environment from
`Langfuse(environment=...)` or `LANGFUSE_TRACING_ENVIRONMENT`.
Langfuse observation wrapper methods pass their resolved span
environment here so scores created via `span.score()` or
`span.score_trace()` stay grouped with the scored observation or
trace, including request-scoped environments propagated with
`propagate_attributes(environment=...)`.

Example:
```python
Expand Down Expand Up @@ -1915,7 +1930,7 @@ def create_score(
dataType=data_type, # type: ignore
comment=comment,
configId=config_id,
environment=self._environment,
environment=environment or self._environment,
metadata=metadata,
)

Expand Down Expand Up @@ -2018,6 +2033,9 @@ def score_current_span(

This method scores the currently active span in the context. It's a convenient
way to score the current operation without needing to know its trace and span IDs.
If the active span has a `langfuse.environment` attribute, including one
set by `propagate_attributes(environment=...)`, the score uses that
environment. Otherwise it uses the client-level environment.

Args:
name: Name of the score (e.g., "relevance", "accuracy")
Expand Down Expand Up @@ -2065,6 +2083,9 @@ def score_current_span(
comment=comment,
config_id=config_id,
metadata=metadata,
environment=get_string_span_attribute(
current_span, LangfuseOtelSpanAttributes.ENVIRONMENT
),
)

@overload
Expand Down Expand Up @@ -2111,6 +2132,9 @@ def score_current_trace(
This method scores the trace of the currently active span. Unlike score_current_span,
this method associates the score with the entire trace rather than a specific span.
It's useful for scoring overall performance or quality of the entire operation.
If the active span has a `langfuse.environment` attribute, including one
set by `propagate_attributes(environment=...)`, the score uses that
environment. Otherwise it uses the client-level environment.

Args:
name: Name of the score (e.g., "user_satisfaction", "overall_quality")
Expand Down Expand Up @@ -2156,6 +2180,9 @@ def score_current_trace(
comment=comment,
config_id=config_id,
metadata=metadata,
environment=get_string_span_attribute(
current_span, LangfuseOtelSpanAttributes.ENVIRONMENT
),
)

def flush(self) -> None:
Expand Down
103 changes: 91 additions & 12 deletions langfuse/_client/propagation.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
"""Attribute propagation utilities for Langfuse OpenTelemetry integration.
This module provides the `propagate_attributes` context manager for setting trace-level
attributes (user_id, session_id, metadata) that automatically propagate to all child spans
within the context.
attributes (user_id, session_id, metadata, environment, etc.) that automatically
propagate to all child spans within the context.
"""

import re
Comment thread
hassiebp marked this conversation as resolved.
from typing import Any, Dict, Generator, List, Literal, Optional, TypedDict, Union, cast

from opentelemetry import (
Expand Down Expand Up @@ -36,6 +37,7 @@
"version",
"tags",
"trace_name",
"environment",
]

InternalPropagatedKeys = Literal[
Expand All @@ -55,6 +57,7 @@
"version",
"tags",
"trace_name",
"environment",
"experiment_id",
"experiment_name",
"experiment_metadata",
Expand Down Expand Up @@ -99,14 +102,16 @@ def propagate_attributes(
version: Optional[str] = None,
tags: Optional[List[str]] = None,
trace_name: Optional[str] = None,
environment: Optional[str] = None,
as_baggage: bool = False,
) -> _AgnosticContextManager[Any]:
"""Propagate trace-level attributes to all spans created within this context.
This context manager sets attributes on the currently active span AND automatically
propagates them to all new child spans created within the context. This is the
recommended way to set trace-level attributes like user_id, session_id, and metadata
dimensions that should be consistently applied across all observations in a trace.
recommended way to set trace-level attributes like user_id, session_id,
environment, and metadata dimensions that should be consistently applied across
all observations in a trace.
**IMPORTANT**: Call this as early as possible within your trace/workflow. Only the
currently active span and spans created after entering this context will have these
Expand Down Expand Up @@ -134,9 +139,19 @@ def propagate_attributes(
tags: List of tags to categorize the group of observations
trace_name: Name to assign to the trace. Must be US-ASCII string, ≤200 characters.
Use this to set a consistent trace name for all spans created within this context.
environment: Langfuse environment to assign to spans created in this context.
Must be a lowercase alphanumeric string with optional hyphens or underscores,
must be ≤40 characters, and must not start with "langfuse". This maps to
the first-class `langfuse.environment` attribute, not to trace metadata.
Use it for request-scoped environments, for example when one shared proxy
handles calls from dev, staging, qa, and prod. A propagated environment
takes precedence over the local client default configured via
`Langfuse(environment=...)` or `LANGFUSE_TRACING_ENVIRONMENT` for spans
created while this propagation context is active.
as_baggage: If True, propagates attributes using OpenTelemetry baggage for
cross-process/service propagation. **Security warning**: When enabled,
attribute values are added to HTTP headers on ALL outbound requests.
This includes `environment` as the `langfuse_environment` baggage entry.
Only enable if values are safe to transmit via HTTP headers and you need
cross-service tracing. Default: False.
Expand All @@ -156,11 +171,12 @@ def propagate_attributes(
with langfuse.propagate_attributes(
user_id="user_123",
session_id="session_abc",
metadata={"experiment": "variant_a", "environment": "production"}
environment="production",
metadata={"experiment": "variant_a"}
):
# All spans created here will have user_id, session_id, and metadata
# All spans created here will have user_id, session_id, environment, and metadata
with langfuse.start_observation(name="llm_call") as llm_span:
# This span inherits: user_id, session_id, experiment, environment
# This span inherits user_id, session_id, environment, and experiment metadata
...
with langfuse.start_generation(name="completion") as gen:
Expand Down Expand Up @@ -193,22 +209,27 @@ def propagate_attributes(
with langfuse.propagate_attributes(
user_id="user_123",
session_id="session_abc",
environment="staging",
as_baggage=True # Propagate via HTTP headers
):
# Make HTTP request to Service B
response = requests.get("https://service-b.example.com/api")
# user_id and session_id are now in HTTP headers
# user_id, session_id, and environment are now in HTTP headers
# Service B - downstream service
# OpenTelemetry will automatically extract baggage from HTTP headers
# and propagate to spans in Service B
# and propagate attributes to spans in Service B. If Service B has a local
# Langfuse environment configured, the propagated environment wins for
# spans created within this context.
```
Note:
- **Validation**: Attribute values (user_id, session_id, version, tags,
trace_name) must be strings ≤200 characters. Metadata values are
coerced to strings before the 200 character limit is applied. Invalid
values will be dropped with a warning logged.
trace_name) must be strings ≤200 characters. Environment must also match
Langfuse's environment format: lowercase alphanumeric with optional
hyphens or underscores, must be ≤40 characters, and it must not start with "langfuse". Metadata
values are coerced to strings before the 200 character limit is applied.
Invalid values will be dropped with a warning logged.
- **OpenTelemetry**: This uses OpenTelemetry context propagation under the hood,
making it compatible with other OTel-instrumented libraries.
Expand All @@ -222,6 +243,7 @@ def propagate_attributes(
version=version,
tags=tags,
trace_name=trace_name,
environment=environment,
as_baggage=as_baggage,
)

Expand All @@ -235,6 +257,7 @@ def _propagate_attributes(
version: Optional[str] = None,
tags: Optional[List[str]] = None,
trace_name: Optional[str] = None,
environment: Optional[str] = None,
as_baggage: bool = False,
experiment: Optional[PropagatedExperimentAttributes] = None,
) -> Generator[Any, Any, Any]:
Expand All @@ -247,6 +270,7 @@ def _propagate_attributes(
"version": version,
"tags": tags,
"trace_name": trace_name,
"environment": environment,
}

propagated_metadata_attributes: Dict[str, Optional[Dict[str, Any]]] = {
Expand Down Expand Up @@ -327,6 +351,17 @@ def _get_propagated_attributes_from_context(
span_key = _get_span_key_from_baggage_key(baggage_key)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this necessary? I think we can drop as we don't revalidate on read, only validate on write to baggage / context

if span_key:
if span_key == LangfuseOtelSpanAttributes.ENVIRONMENT:
validated_environment = _validate_environment_value(
value=baggage_value
)

if validated_environment is None:
continue

propagated_attributes[span_key] = validated_environment
continue

propagated_attributes[span_key] = (
baggage_value
if isinstance(baggage_value, (str, list))
Expand All @@ -341,6 +376,17 @@ def _get_propagated_attributes_from_context(
if value is None:
continue

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this necessary? I think we can drop as we don't revalidate on read, only validate on write to baggage / context


if key == "environment":
validated_environment = _validate_environment_value(value=value)

if validated_environment is None:
continue

propagated_attributes[LangfuseOtelSpanAttributes.ENVIRONMENT] = (
validated_environment
)
continue

if isinstance(value, dict):
# Handle metadata
span_key = _get_propagated_span_key(key)
Expand Down Expand Up @@ -435,6 +481,9 @@ def _set_propagated_attribute(
def _validate_propagated_value(
*, value: Any, key: str
) -> Optional[Union[str, List[str]]]:
if key == "environment":
return _validate_environment_value(value=value)

if isinstance(value, list):
validated_values = [
v for v in value if _validate_string_value(key=key, value=v)
Expand Down Expand Up @@ -473,6 +522,35 @@ def _validate_string_value(*, value: str, key: str) -> bool:
return True


Comment thread
hassiebp marked this conversation as resolved.
_ENVIRONMENT_VALUE_PATTERN = re.compile(r"^(?!langfuse)[a-z0-9_-]+$")

@hassiebp hassiebp Jun 25, 2026

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could be a simple one check for the environment up in _validate_propagated_value in the key === environment condition to not start with "langfuse" and return None early if fails else fall through to the other checks


def _validate_environment_value(*, value: Any) -> Optional[str]:
key = "environment"

if not isinstance(value, str):
langfuse_logger.warning( # type: ignore
f"Propagated attribute '{key}' value is not a string. Dropping value."
)
return None

if len(value) > 40:
langfuse_logger.warning(
f"Propagated attribute '{key}' value is over 40 characters ({len(value)} chars). Dropping value."
)
return None

if not _ENVIRONMENT_VALUE_PATTERN.fullmatch(value):
langfuse_logger.warning(
"Propagated attribute 'environment' must be a lowercase alphanumeric "
"string with optional hyphens or underscores and must not start with "
"'langfuse'. Dropping value."
)
return None

return value


def _get_propagated_context_key(key: str) -> str:
return f"langfuse.propagated.{key}"

Expand Down Expand Up @@ -542,6 +620,7 @@ def _get_propagated_span_key(key: str) -> str:
"version": LangfuseOtelSpanAttributes.VERSION,
"tags": LangfuseOtelSpanAttributes.TRACE_TAGS,
"trace_name": LangfuseOtelSpanAttributes.TRACE_NAME,
"environment": LangfuseOtelSpanAttributes.ENVIRONMENT,
"metadata": LangfuseOtelSpanAttributes.TRACE_METADATA,
"experiment_id": LangfuseOtelSpanAttributes.EXPERIMENT_ID,
"experiment_name": LangfuseOtelSpanAttributes.EXPERIMENT_NAME,
Expand Down
Loading