diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index c9b4ca265..bfd8267fc 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -85,7 +85,11 @@ LangfuseSpan, LangfuseTool, ) -from langfuse._client.utils import get_sha256_hash_hex, run_async_safely +from langfuse._client.utils import ( + get_sha256_hash_hex, + get_string_span_attribute, + run_async_safely, +) from langfuse._utils import _get_timestamp, json_path from langfuse._utils.environment import get_common_release_envs from langfuse._utils.parse_error import handle_fern_exception @@ -1820,6 +1824,7 @@ def create_score( config_id: Optional[str] = None, metadata: Optional[Any] = None, timestamp: Optional[datetime] = None, + environment: Optional[str] = None, ) -> None: ... @overload @@ -1840,6 +1845,7 @@ def create_score( config_id: Optional[str] = None, metadata: Optional[Any] = None, timestamp: Optional[datetime] = None, + environment: Optional[str] = None, ) -> None: ... def create_score( @@ -1857,6 +1863,7 @@ def create_score( config_id: Optional[str] = None, metadata: Optional[Any] = None, timestamp: Optional[datetime] = None, + environment: Optional[str] = None, ) -> None: """Create a score for a specific trace or observation. @@ -1876,6 +1883,14 @@ def create_score( config_id: Optional ID of a score config defined in Langfuse metadata: Optional metadata to be attached to the score timestamp: Optional timestamp for the score (defaults to current UTC time) + environment: Optional environment override for this score. If omitted, + the score uses the client-level environment from + `Langfuse(environment=...)` or `LANGFUSE_TRACING_ENVIRONMENT`. + Langfuse observation wrapper methods pass their resolved span + environment here so scores created via `span.score()` or + `span.score_trace()` stay grouped with the scored observation or + trace, including request-scoped environments propagated with + `propagate_attributes(environment=...)`. Example: ```python @@ -1915,7 +1930,7 @@ def create_score( dataType=data_type, # type: ignore comment=comment, configId=config_id, - environment=self._environment, + environment=environment or self._environment, metadata=metadata, ) @@ -2018,6 +2033,9 @@ def score_current_span( This method scores the currently active span in the context. It's a convenient way to score the current operation without needing to know its trace and span IDs. + If the active span has a `langfuse.environment` attribute, including one + set by `propagate_attributes(environment=...)`, the score uses that + environment. Otherwise it uses the client-level environment. Args: name: Name of the score (e.g., "relevance", "accuracy") @@ -2065,6 +2083,9 @@ def score_current_span( comment=comment, config_id=config_id, metadata=metadata, + environment=get_string_span_attribute( + current_span, LangfuseOtelSpanAttributes.ENVIRONMENT + ), ) @overload @@ -2111,6 +2132,9 @@ def score_current_trace( This method scores the trace of the currently active span. Unlike score_current_span, this method associates the score with the entire trace rather than a specific span. It's useful for scoring overall performance or quality of the entire operation. + If the active span has a `langfuse.environment` attribute, including one + set by `propagate_attributes(environment=...)`, the score uses that + environment. Otherwise it uses the client-level environment. Args: name: Name of the score (e.g., "user_satisfaction", "overall_quality") @@ -2156,6 +2180,9 @@ def score_current_trace( comment=comment, config_id=config_id, metadata=metadata, + environment=get_string_span_attribute( + current_span, LangfuseOtelSpanAttributes.ENVIRONMENT + ), ) def flush(self) -> None: diff --git a/langfuse/_client/propagation.py b/langfuse/_client/propagation.py index 7afa81cbe..cf06dcd33 100644 --- a/langfuse/_client/propagation.py +++ b/langfuse/_client/propagation.py @@ -1,10 +1,11 @@ """Attribute propagation utilities for Langfuse OpenTelemetry integration. This module provides the `propagate_attributes` context manager for setting trace-level -attributes (user_id, session_id, metadata) that automatically propagate to all child spans -within the context. +attributes (user_id, session_id, metadata, environment, etc.) that automatically +propagate to all child spans within the context. """ +import re from typing import Any, Dict, Generator, List, Literal, Optional, TypedDict, Union, cast from opentelemetry import ( @@ -36,6 +37,7 @@ "version", "tags", "trace_name", + "environment", ] InternalPropagatedKeys = Literal[ @@ -55,6 +57,7 @@ "version", "tags", "trace_name", + "environment", "experiment_id", "experiment_name", "experiment_metadata", @@ -99,14 +102,16 @@ def propagate_attributes( version: Optional[str] = None, tags: Optional[List[str]] = None, trace_name: Optional[str] = None, + environment: Optional[str] = None, as_baggage: bool = False, ) -> _AgnosticContextManager[Any]: """Propagate trace-level attributes to all spans created within this context. This context manager sets attributes on the currently active span AND automatically propagates them to all new child spans created within the context. This is the - recommended way to set trace-level attributes like user_id, session_id, and metadata - dimensions that should be consistently applied across all observations in a trace. + recommended way to set trace-level attributes like user_id, session_id, + environment, and metadata dimensions that should be consistently applied across + all observations in a trace. **IMPORTANT**: Call this as early as possible within your trace/workflow. Only the currently active span and spans created after entering this context will have these @@ -134,9 +139,19 @@ def propagate_attributes( tags: List of tags to categorize the group of observations trace_name: Name to assign to the trace. Must be US-ASCII string, ≤200 characters. Use this to set a consistent trace name for all spans created within this context. + environment: Langfuse environment to assign to spans created in this context. + Must be a lowercase alphanumeric string with optional hyphens or underscores, + must be ≤40 characters, and must not start with "langfuse". This maps to + the first-class `langfuse.environment` attribute, not to trace metadata. + Use it for request-scoped environments, for example when one shared proxy + handles calls from dev, staging, qa, and prod. A propagated environment + takes precedence over the local client default configured via + `Langfuse(environment=...)` or `LANGFUSE_TRACING_ENVIRONMENT` for spans + created while this propagation context is active. as_baggage: If True, propagates attributes using OpenTelemetry baggage for cross-process/service propagation. **Security warning**: When enabled, attribute values are added to HTTP headers on ALL outbound requests. + This includes `environment` as the `langfuse_environment` baggage entry. Only enable if values are safe to transmit via HTTP headers and you need cross-service tracing. Default: False. @@ -156,11 +171,12 @@ def propagate_attributes( with langfuse.propagate_attributes( user_id="user_123", session_id="session_abc", - metadata={"experiment": "variant_a", "environment": "production"} + environment="production", + metadata={"experiment": "variant_a"} ): - # All spans created here will have user_id, session_id, and metadata + # All spans created here will have user_id, session_id, environment, and metadata with langfuse.start_observation(name="llm_call") as llm_span: - # This span inherits: user_id, session_id, experiment, environment + # This span inherits user_id, session_id, environment, and experiment metadata ... with langfuse.start_generation(name="completion") as gen: @@ -193,22 +209,27 @@ def propagate_attributes( with langfuse.propagate_attributes( user_id="user_123", session_id="session_abc", + environment="staging", as_baggage=True # Propagate via HTTP headers ): # Make HTTP request to Service B response = requests.get("https://service-b.example.com/api") - # user_id and session_id are now in HTTP headers + # user_id, session_id, and environment are now in HTTP headers # Service B - downstream service # OpenTelemetry will automatically extract baggage from HTTP headers - # and propagate to spans in Service B + # and propagate attributes to spans in Service B. If Service B has a local + # Langfuse environment configured, the propagated environment wins for + # spans created within this context. ``` Note: - **Validation**: Attribute values (user_id, session_id, version, tags, - trace_name) must be strings ≤200 characters. Metadata values are - coerced to strings before the 200 character limit is applied. Invalid - values will be dropped with a warning logged. + trace_name) must be strings ≤200 characters. Environment must also match + Langfuse's environment format: lowercase alphanumeric with optional + hyphens or underscores, must be ≤40 characters, and it must not start with "langfuse". Metadata + values are coerced to strings before the 200 character limit is applied. + Invalid values will be dropped with a warning logged. - **OpenTelemetry**: This uses OpenTelemetry context propagation under the hood, making it compatible with other OTel-instrumented libraries. @@ -222,6 +243,7 @@ def propagate_attributes( version=version, tags=tags, trace_name=trace_name, + environment=environment, as_baggage=as_baggage, ) @@ -235,6 +257,7 @@ def _propagate_attributes( version: Optional[str] = None, tags: Optional[List[str]] = None, trace_name: Optional[str] = None, + environment: Optional[str] = None, as_baggage: bool = False, experiment: Optional[PropagatedExperimentAttributes] = None, ) -> Generator[Any, Any, Any]: @@ -247,6 +270,7 @@ def _propagate_attributes( "version": version, "tags": tags, "trace_name": trace_name, + "environment": environment, } propagated_metadata_attributes: Dict[str, Optional[Dict[str, Any]]] = { @@ -327,6 +351,17 @@ def _get_propagated_attributes_from_context( span_key = _get_span_key_from_baggage_key(baggage_key) if span_key: + if span_key == LangfuseOtelSpanAttributes.ENVIRONMENT: + validated_environment = _validate_environment_value( + value=baggage_value + ) + + if validated_environment is None: + continue + + propagated_attributes[span_key] = validated_environment + continue + propagated_attributes[span_key] = ( baggage_value if isinstance(baggage_value, (str, list)) @@ -341,6 +376,17 @@ def _get_propagated_attributes_from_context( if value is None: continue + if key == "environment": + validated_environment = _validate_environment_value(value=value) + + if validated_environment is None: + continue + + propagated_attributes[LangfuseOtelSpanAttributes.ENVIRONMENT] = ( + validated_environment + ) + continue + if isinstance(value, dict): # Handle metadata span_key = _get_propagated_span_key(key) @@ -435,6 +481,9 @@ def _set_propagated_attribute( def _validate_propagated_value( *, value: Any, key: str ) -> Optional[Union[str, List[str]]]: + if key == "environment": + return _validate_environment_value(value=value) + if isinstance(value, list): validated_values = [ v for v in value if _validate_string_value(key=key, value=v) @@ -473,6 +522,35 @@ def _validate_string_value(*, value: str, key: str) -> bool: return True +_ENVIRONMENT_VALUE_PATTERN = re.compile(r"^(?!langfuse)[a-z0-9_-]+$") + + +def _validate_environment_value(*, value: Any) -> Optional[str]: + key = "environment" + + if not isinstance(value, str): + langfuse_logger.warning( # type: ignore + f"Propagated attribute '{key}' value is not a string. Dropping value." + ) + return None + + if len(value) > 40: + langfuse_logger.warning( + f"Propagated attribute '{key}' value is over 40 characters ({len(value)} chars). Dropping value." + ) + return None + + if not _ENVIRONMENT_VALUE_PATTERN.fullmatch(value): + langfuse_logger.warning( + "Propagated attribute 'environment' must be a lowercase alphanumeric " + "string with optional hyphens or underscores and must not start with " + "'langfuse'. Dropping value." + ) + return None + + return value + + def _get_propagated_context_key(key: str) -> str: return f"langfuse.propagated.{key}" @@ -542,6 +620,7 @@ def _get_propagated_span_key(key: str) -> str: "version": LangfuseOtelSpanAttributes.VERSION, "tags": LangfuseOtelSpanAttributes.TRACE_TAGS, "trace_name": LangfuseOtelSpanAttributes.TRACE_NAME, + "environment": LangfuseOtelSpanAttributes.ENVIRONMENT, "metadata": LangfuseOtelSpanAttributes.TRACE_METADATA, "experiment_id": LangfuseOtelSpanAttributes.EXPERIMENT_ID, "experiment_name": LangfuseOtelSpanAttributes.EXPERIMENT_NAME, diff --git a/langfuse/_client/span.py b/langfuse/_client/span.py index c3d291b3a..828c7590a 100644 --- a/langfuse/_client/span.py +++ b/langfuse/_client/span.py @@ -51,6 +51,7 @@ ObservationTypeSpanLike, get_observation_types_list, ) +from langfuse._client.utils import get_string_span_attribute from langfuse.api import MapValue, ScoreDataType from langfuse.logger import langfuse_logger from langfuse.types import SpanLevel @@ -105,7 +106,10 @@ def __init__( input: Input data for the span (any JSON-serializable object) output: Output data from the span (any JSON-serializable object) metadata: Additional metadata to associate with the span - environment: The tracing environment + environment: Local tracing environment fallback. If the underlying + OpenTelemetry span already has `langfuse.environment` from + propagated context or baggage, that propagated value takes + precedence over this fallback. release: Release identifier for the application version: Version identifier for the code or component level: Importance level of the span (info, warning, error) @@ -127,7 +131,12 @@ def __init__( self.trace_id = self._langfuse_client._get_otel_trace_id(otel_span) self.id = self._langfuse_client._get_otel_span_id(otel_span) - self._environment = environment or self._langfuse_client._environment + existing_environment = get_string_span_attribute( + self._otel_span, LangfuseOtelSpanAttributes.ENVIRONMENT + ) + self._environment = ( + existing_environment or environment or self._langfuse_client._environment + ) if self._environment is not None: self._otel_span.set_attribute( LangfuseOtelSpanAttributes.ENVIRONMENT, self._environment @@ -334,6 +343,9 @@ def score( This method creates a score associated with this specific span (observation). Scores can represent any kind of evaluation, feedback, or quality metric. + The score uses this span wrapper's resolved environment. That means a + request-scoped environment propagated via ``propagate_attributes`` takes + precedence over the client-level environment when this score is created. Args: name: Name of the score (e.g., "relevance", "accuracy") @@ -371,6 +383,7 @@ def score( config_id=config_id, timestamp=timestamp, metadata=metadata, + environment=self._environment, ) @overload @@ -423,7 +436,9 @@ def score_trace( This method creates a score associated with the entire trace that this span belongs to, rather than the specific span. This is useful for overall - evaluations that apply to the complete trace. + evaluations that apply to the complete trace. The score uses this span + wrapper's resolved environment, including any request-scoped environment + propagated via ``propagate_attributes``. Args: name: Name of the score (e.g., "user_satisfaction", "overall_quality") @@ -460,6 +475,7 @@ def score_trace( config_id=config_id, timestamp=timestamp, metadata=metadata, + environment=self._environment, ) def _set_processed_span_attributes( diff --git a/langfuse/_client/utils.py b/langfuse/_client/utils.py index ccce3d9ef..9ef9a767d 100644 --- a/langfuse/_client/utils.py +++ b/langfuse/_client/utils.py @@ -9,13 +9,37 @@ import json import threading from hashlib import sha256 -from typing import Any, Coroutine +from typing import Any, Coroutine, Optional from opentelemetry import trace as otel_trace_api from opentelemetry.sdk import util from opentelemetry.sdk.trace import ReadableSpan +def get_string_span_attribute( + otel_span: otel_trace_api.Span, attribute_key: str +) -> Optional[str]: + """Return a string-valued attribute from a recording OpenTelemetry span. + + OpenTelemetry's public ``Span`` type does not guarantee an ``attributes`` + mapping, but the SDK spans wrapped by Langfuse expose one while recording. + This helper intentionally returns ``None`` for non-recording spans, spans + without an attribute mapping, missing attributes, and non-string values. + Callers use that ``None`` to fall back to their existing local defaults. + """ + + if not otel_span.is_recording(): + return None + + attributes = getattr(otel_span, "attributes", None) + if attributes is None or not hasattr(attributes, "get"): + return None + + value = attributes.get(attribute_key) + + return value if isinstance(value, str) else None + + def span_formatter(span: ReadableSpan) -> str: parent_id = ( otel_trace_api.format_span_id(span.parent.span_id) if span.parent else None diff --git a/tests/unit/test_propagate_attributes.py b/tests/unit/test_propagate_attributes.py index 109b88a37..0ef92ac10 100644 --- a/tests/unit/test_propagate_attributes.py +++ b/tests/unit/test_propagate_attributes.py @@ -1850,6 +1850,207 @@ def test_baggage_survives_context_isolation(self, langfuse_client, memory_export ) +class TestPropagateAttributesEnvironment(TestPropagateAttributesBase): + """Tests for first-class Langfuse environment propagation.""" + + def _capture_score_events(self, monkeypatch, langfuse_client): + """Capture score ingestion events before they reach the background queue.""" + + score_events = [] + assert langfuse_client._resources is not None + + def capture_score_event(event, *, force_sample=False): + score_events.append(event) + + monkeypatch.setattr( + langfuse_client._resources, + "add_score_task", + capture_score_event, + ) + + return score_events + + def test_environment_propagates_to_child_spans( + self, langfuse_client, memory_exporter + ): + """Verify environment propagates as langfuse.environment, not metadata.""" + with langfuse_client.start_as_current_observation(name="parent-span"): + with propagate_attributes(environment="staging"): + child = langfuse_client.start_observation(name="child-span") + child.end() + + child_span = self.get_span_by_name(memory_exporter, "child-span") + self.verify_span_attribute( + child_span, LangfuseOtelSpanAttributes.ENVIRONMENT, "staging" + ) + self.verify_missing_attribute( + child_span, f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.environment" + ) + + def test_environment_with_baggage(self, langfuse_client, memory_exporter): + """Verify environment is written to baggage and extracted onto spans.""" + from opentelemetry import baggage + from opentelemetry import context as otel_context + + with langfuse_client.start_as_current_observation(name="parent-span"): + with propagate_attributes(environment="qa", as_baggage=True): + current_context = otel_context.get_current() + baggage_entries = baggage.get_all(context=current_context) + + assert baggage_entries["langfuse_environment"] == "qa" + + child = langfuse_client.start_observation(name="child-span") + child.end() + + child_span = self.get_span_by_name(memory_exporter, "child-span") + self.verify_span_attribute( + child_span, LangfuseOtelSpanAttributes.ENVIRONMENT, "qa" + ) + + def test_environment_overrides_client_default_inside_context( + self, langfuse_client, memory_exporter + ): + """Propagated environment wins over the local client environment.""" + langfuse_client._environment = "proxy-prod" + + with propagate_attributes(environment="staging"): + with langfuse_client.start_as_current_observation(name="request-span"): + pass + + with langfuse_client.start_as_current_observation(name="local-span"): + pass + + request_span = self.get_span_by_name(memory_exporter, "request-span") + self.verify_span_attribute( + request_span, LangfuseOtelSpanAttributes.ENVIRONMENT, "staging" + ) + + local_span = self.get_span_by_name(memory_exporter, "local-span") + self.verify_span_attribute( + local_span, LangfuseOtelSpanAttributes.ENVIRONMENT, "proxy-prod" + ) + + def test_environment_baggage_overrides_client_default_after_context_attach( + self, langfuse_client, memory_exporter + ): + """Simulate cross-process extraction where caller environment beats proxy default.""" + from opentelemetry import context as otel_context + + langfuse_client._environment = "proxy-prod" + + with propagate_attributes(environment="dev", as_baggage=True): + context_with_baggage = otel_context.get_current() + + token = otel_context.attach(context_with_baggage) + try: + with langfuse_client.start_as_current_observation(name="proxy-request"): + child = langfuse_client.start_observation(name="proxy-child") + child.end() + finally: + otel_context.detach(token) + + proxy_request = self.get_span_by_name(memory_exporter, "proxy-request") + self.verify_span_attribute( + proxy_request, LangfuseOtelSpanAttributes.ENVIRONMENT, "dev" + ) + + proxy_child = self.get_span_by_name(memory_exporter, "proxy-child") + self.verify_span_attribute( + proxy_child, LangfuseOtelSpanAttributes.ENVIRONMENT, "dev" + ) + + def test_span_score_uses_propagated_environment(self, monkeypatch, langfuse_client): + """Score events created from a span use the span's resolved environment.""" + score_events = self._capture_score_events(monkeypatch, langfuse_client) + langfuse_client._environment = "proxy-prod" + + with propagate_attributes(environment="dev"): + with langfuse_client.start_as_current_observation( + name="request-span" + ) as span: + span.score(name="quality", value=1.0, data_type="NUMERIC") + + assert len(score_events) == 1 + assert score_events[0]["body"].environment == "dev" + + def test_span_score_trace_uses_propagated_environment( + self, monkeypatch, langfuse_client + ): + """Trace scores created from a span use the span's resolved environment.""" + score_events = self._capture_score_events(monkeypatch, langfuse_client) + langfuse_client._environment = "proxy-prod" + + with propagate_attributes(environment="staging"): + with langfuse_client.start_as_current_observation( + name="request-span" + ) as span: + span.score_trace(name="overall-quality", value=0.95) + + assert len(score_events) == 1 + assert score_events[0]["body"].environment == "staging" + + def test_current_score_helpers_use_propagated_environment( + self, monkeypatch, langfuse_client + ): + """Current-span and current-trace scores use the active span environment.""" + score_events = self._capture_score_events(monkeypatch, langfuse_client) + langfuse_client._environment = "proxy-prod" + + with propagate_attributes(environment="qa"): + with langfuse_client.start_as_current_observation(name="request-span"): + langfuse_client.score_current_span( + name="span-quality", value=0.9, data_type="NUMERIC" + ) + langfuse_client.score_current_trace( + name="trace-quality", value=0.8, data_type="NUMERIC" + ) + + assert [event["body"].environment for event in score_events] == ["qa", "qa"] + + def test_environment_exactly_40_chars_is_accepted( + self, langfuse_client, memory_exporter + ): + """Verify environment accepts Langfuse's 40-character public limit.""" + environment_40 = "e" * 40 + + with langfuse_client.start_as_current_observation(name="parent-span"): + with propagate_attributes(environment=environment_40): + child = langfuse_client.start_observation(name="child-span") + child.end() + + child_span = self.get_span_by_name(memory_exporter, "child-span") + self.verify_span_attribute( + child_span, LangfuseOtelSpanAttributes.ENVIRONMENT, environment_40 + ) + + @pytest.mark.parametrize( + "environment", + [ + "Production", + "langfuse-prod", + "prod.us", + "", + "p" * 41, + "prod\n", + "\nprod", + 123, + ], + ) + def test_invalid_environment_is_dropped( + self, langfuse_client, memory_exporter, environment + ): + """Invalid propagated environments do not set langfuse.environment.""" + with langfuse_client.start_as_current_observation(name="parent-span"): + with propagate_attributes(environment=environment): # type: ignore[arg-type] + child = langfuse_client.start_observation(name="child-span") + child.end() + + child_span = self.get_span_by_name(memory_exporter, "child-span") + self.verify_missing_attribute( + child_span, LangfuseOtelSpanAttributes.ENVIRONMENT + ) + + class TestPropagateAttributesVersion(TestPropagateAttributesBase): """Tests for version parameter propagation."""