-
Notifications
You must be signed in to change notification settings - Fork 301
feat(tracing): propagate environment attributes #1726
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
2603852
a9c8ab5
a398d3b
47d75ff
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,10 +1,11 @@ | ||
| """Attribute propagation utilities for Langfuse OpenTelemetry integration. | ||
| This module provides the `propagate_attributes` context manager for setting trace-level | ||
| attributes (user_id, session_id, metadata) that automatically propagate to all child spans | ||
| within the context. | ||
| attributes (user_id, session_id, metadata, environment, etc.) that automatically | ||
| propagate to all child spans within the context. | ||
| """ | ||
|
|
||
| import re | ||
| from typing import Any, Dict, Generator, List, Literal, Optional, TypedDict, Union, cast | ||
|
|
||
| from opentelemetry import ( | ||
|
|
@@ -36,6 +37,7 @@ | |
| "version", | ||
| "tags", | ||
| "trace_name", | ||
| "environment", | ||
| ] | ||
|
|
||
| InternalPropagatedKeys = Literal[ | ||
|
|
@@ -55,6 +57,7 @@ | |
| "version", | ||
| "tags", | ||
| "trace_name", | ||
| "environment", | ||
| "experiment_id", | ||
| "experiment_name", | ||
| "experiment_metadata", | ||
|
|
@@ -99,14 +102,16 @@ def propagate_attributes( | |
| version: Optional[str] = None, | ||
| tags: Optional[List[str]] = None, | ||
| trace_name: Optional[str] = None, | ||
| environment: Optional[str] = None, | ||
| as_baggage: bool = False, | ||
| ) -> _AgnosticContextManager[Any]: | ||
| """Propagate trace-level attributes to all spans created within this context. | ||
| This context manager sets attributes on the currently active span AND automatically | ||
| propagates them to all new child spans created within the context. This is the | ||
| recommended way to set trace-level attributes like user_id, session_id, and metadata | ||
| dimensions that should be consistently applied across all observations in a trace. | ||
| recommended way to set trace-level attributes like user_id, session_id, | ||
| environment, and metadata dimensions that should be consistently applied across | ||
| all observations in a trace. | ||
| **IMPORTANT**: Call this as early as possible within your trace/workflow. Only the | ||
| currently active span and spans created after entering this context will have these | ||
|
|
@@ -134,9 +139,19 @@ def propagate_attributes( | |
| tags: List of tags to categorize the group of observations | ||
| trace_name: Name to assign to the trace. Must be US-ASCII string, ≤200 characters. | ||
| Use this to set a consistent trace name for all spans created within this context. | ||
| environment: Langfuse environment to assign to spans created in this context. | ||
| Must be a lowercase alphanumeric string with optional hyphens or underscores, | ||
| must be ≤40 characters, and must not start with "langfuse". This maps to | ||
| the first-class `langfuse.environment` attribute, not to trace metadata. | ||
| Use it for request-scoped environments, for example when one shared proxy | ||
| handles calls from dev, staging, qa, and prod. A propagated environment | ||
| takes precedence over the local client default configured via | ||
| `Langfuse(environment=...)` or `LANGFUSE_TRACING_ENVIRONMENT` for spans | ||
| created while this propagation context is active. | ||
| as_baggage: If True, propagates attributes using OpenTelemetry baggage for | ||
| cross-process/service propagation. **Security warning**: When enabled, | ||
| attribute values are added to HTTP headers on ALL outbound requests. | ||
| This includes `environment` as the `langfuse_environment` baggage entry. | ||
| Only enable if values are safe to transmit via HTTP headers and you need | ||
| cross-service tracing. Default: False. | ||
|
|
@@ -156,11 +171,12 @@ def propagate_attributes( | |
| with langfuse.propagate_attributes( | ||
| user_id="user_123", | ||
| session_id="session_abc", | ||
| metadata={"experiment": "variant_a", "environment": "production"} | ||
| environment="production", | ||
| metadata={"experiment": "variant_a"} | ||
| ): | ||
| # All spans created here will have user_id, session_id, and metadata | ||
| # All spans created here will have user_id, session_id, environment, and metadata | ||
| with langfuse.start_observation(name="llm_call") as llm_span: | ||
| # This span inherits: user_id, session_id, experiment, environment | ||
| # This span inherits user_id, session_id, environment, and experiment metadata | ||
| ... | ||
| with langfuse.start_generation(name="completion") as gen: | ||
|
|
@@ -193,22 +209,27 @@ def propagate_attributes( | |
| with langfuse.propagate_attributes( | ||
| user_id="user_123", | ||
| session_id="session_abc", | ||
| environment="staging", | ||
| as_baggage=True # Propagate via HTTP headers | ||
| ): | ||
| # Make HTTP request to Service B | ||
| response = requests.get("https://service-b.example.com/api") | ||
| # user_id and session_id are now in HTTP headers | ||
| # user_id, session_id, and environment are now in HTTP headers | ||
| # Service B - downstream service | ||
| # OpenTelemetry will automatically extract baggage from HTTP headers | ||
| # and propagate to spans in Service B | ||
| # and propagate attributes to spans in Service B. If Service B has a local | ||
| # Langfuse environment configured, the propagated environment wins for | ||
| # spans created within this context. | ||
| ``` | ||
| Note: | ||
| - **Validation**: Attribute values (user_id, session_id, version, tags, | ||
| trace_name) must be strings ≤200 characters. Metadata values are | ||
| coerced to strings before the 200 character limit is applied. Invalid | ||
| values will be dropped with a warning logged. | ||
| trace_name) must be strings ≤200 characters. Environment must also match | ||
| Langfuse's environment format: lowercase alphanumeric with optional | ||
| hyphens or underscores, must be ≤40 characters, and it must not start with "langfuse". Metadata | ||
| values are coerced to strings before the 200 character limit is applied. | ||
| Invalid values will be dropped with a warning logged. | ||
| - **OpenTelemetry**: This uses OpenTelemetry context propagation under the hood, | ||
| making it compatible with other OTel-instrumented libraries. | ||
|
|
@@ -222,6 +243,7 @@ def propagate_attributes( | |
| version=version, | ||
| tags=tags, | ||
| trace_name=trace_name, | ||
| environment=environment, | ||
| as_baggage=as_baggage, | ||
| ) | ||
|
|
||
|
|
@@ -235,6 +257,7 @@ def _propagate_attributes( | |
| version: Optional[str] = None, | ||
| tags: Optional[List[str]] = None, | ||
| trace_name: Optional[str] = None, | ||
| environment: Optional[str] = None, | ||
| as_baggage: bool = False, | ||
| experiment: Optional[PropagatedExperimentAttributes] = None, | ||
| ) -> Generator[Any, Any, Any]: | ||
|
|
@@ -247,6 +270,7 @@ def _propagate_attributes( | |
| "version": version, | ||
| "tags": tags, | ||
| "trace_name": trace_name, | ||
| "environment": environment, | ||
| } | ||
|
|
||
| propagated_metadata_attributes: Dict[str, Optional[Dict[str, Any]]] = { | ||
|
|
@@ -327,6 +351,17 @@ def _get_propagated_attributes_from_context( | |
| span_key = _get_span_key_from_baggage_key(baggage_key) | ||
|
|
||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this necessary? I think we can drop as we don't revalidate on read, only validate on write to baggage / context |
||
| if span_key: | ||
| if span_key == LangfuseOtelSpanAttributes.ENVIRONMENT: | ||
| validated_environment = _validate_environment_value( | ||
| value=baggage_value | ||
| ) | ||
|
|
||
| if validated_environment is None: | ||
| continue | ||
|
|
||
| propagated_attributes[span_key] = validated_environment | ||
| continue | ||
|
|
||
| propagated_attributes[span_key] = ( | ||
| baggage_value | ||
| if isinstance(baggage_value, (str, list)) | ||
|
|
@@ -341,6 +376,17 @@ def _get_propagated_attributes_from_context( | |
| if value is None: | ||
| continue | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this necessary? I think we can drop as we don't revalidate on read, only validate on write to baggage / context |
||
|
|
||
| if key == "environment": | ||
| validated_environment = _validate_environment_value(value=value) | ||
|
|
||
| if validated_environment is None: | ||
| continue | ||
|
|
||
| propagated_attributes[LangfuseOtelSpanAttributes.ENVIRONMENT] = ( | ||
| validated_environment | ||
| ) | ||
| continue | ||
|
|
||
| if isinstance(value, dict): | ||
| # Handle metadata | ||
| span_key = _get_propagated_span_key(key) | ||
|
|
@@ -435,6 +481,9 @@ def _set_propagated_attribute( | |
| def _validate_propagated_value( | ||
| *, value: Any, key: str | ||
| ) -> Optional[Union[str, List[str]]]: | ||
| if key == "environment": | ||
| return _validate_environment_value(value=value) | ||
|
|
||
| if isinstance(value, list): | ||
| validated_values = [ | ||
| v for v in value if _validate_string_value(key=key, value=v) | ||
|
|
@@ -473,6 +522,35 @@ def _validate_string_value(*, value: str, key: str) -> bool: | |
| return True | ||
|
|
||
|
|
||
|
hassiebp marked this conversation as resolved.
|
||
| _ENVIRONMENT_VALUE_PATTERN = re.compile(r"^(?!langfuse)[a-z0-9_-]+$") | ||
|
|
||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this could be a simple one check for the environment up in _validate_propagated_value in the key === environment condition to not start with "langfuse" and return None early if fails else fall through to the other checks |
||
|
|
||
| def _validate_environment_value(*, value: Any) -> Optional[str]: | ||
| key = "environment" | ||
|
|
||
| if not isinstance(value, str): | ||
| langfuse_logger.warning( # type: ignore | ||
| f"Propagated attribute '{key}' value is not a string. Dropping value." | ||
| ) | ||
| return None | ||
|
|
||
| if len(value) > 40: | ||
| langfuse_logger.warning( | ||
| f"Propagated attribute '{key}' value is over 40 characters ({len(value)} chars). Dropping value." | ||
| ) | ||
| return None | ||
|
|
||
| if not _ENVIRONMENT_VALUE_PATTERN.fullmatch(value): | ||
| langfuse_logger.warning( | ||
| "Propagated attribute 'environment' must be a lowercase alphanumeric " | ||
| "string with optional hyphens or underscores and must not start with " | ||
| "'langfuse'. Dropping value." | ||
| ) | ||
| return None | ||
|
|
||
| return value | ||
|
|
||
|
|
||
| def _get_propagated_context_key(key: str) -> str: | ||
| return f"langfuse.propagated.{key}" | ||
|
|
||
|
|
@@ -542,6 +620,7 @@ def _get_propagated_span_key(key: str) -> str: | |
| "version": LangfuseOtelSpanAttributes.VERSION, | ||
| "tags": LangfuseOtelSpanAttributes.TRACE_TAGS, | ||
| "trace_name": LangfuseOtelSpanAttributes.TRACE_NAME, | ||
| "environment": LangfuseOtelSpanAttributes.ENVIRONMENT, | ||
| "metadata": LangfuseOtelSpanAttributes.TRACE_METADATA, | ||
| "experiment_id": LangfuseOtelSpanAttributes.EXPERIMENT_ID, | ||
| "experiment_name": LangfuseOtelSpanAttributes.EXPERIMENT_NAME, | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.