Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .flake8
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
[flake8]
max-line-length = 100
max-line-length = 100
extend-ignore = E203,E701
59 changes: 59 additions & 0 deletions datadog_lambda/durable.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
# Copyright 2019 Datadog, Inc.
import logging
import re
import ujson as json

logger = logging.getLogger(__name__)

_TRACE_CHECKPOINT_PREFIX = "_datadog_"


def _parse_durable_execution_arn(arn):
"""
Expand Down Expand Up @@ -56,6 +59,62 @@ def extract_durable_function_tags(event):
VALID_DURABLE_STATUSES = {"SUCCEEDED", "FAILED", "PENDING"}


def _extract_context_from_durable_checkpoint(operation):
# Checkpoint data is written by the dd-trace-py in Datadog style
# (x-datadog-* headers). Extraction goes through the standard
# propagator.extract path, which honors DD_TRACE_PROPAGATION_STYLE_EXTRACT.
# The default extract list (datadog, tracecontext, baggage) already
# includes datadog. Customers who override the extract list MUST keep
# datadog in it.
Comment on lines +63 to +68
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we document this somewhere? Do we have a workaround so compatibility doesn't depend on customer config?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will put this in the public documentation. the alternative was my previous commit which introduces some unnecessary complexity.

if not isinstance(operation, dict):
return None

step_details = operation.get("StepDetails")
if not isinstance(step_details, dict):
return None

result = step_details.get("Result")
if isinstance(result, str):
try:
result = json.loads(result)
except Exception:
return None

if not isinstance(result, dict):
return None

from datadog_lambda.tracing import propagator

return propagator.extract(result)


def extract_context_from_durable_execution(event):
operations = event.get("InitialExecutionState", {}).get("Operations")
if isinstance(operations, dict):
operations = list(operations.values())
if not isinstance(operations, list) or not operations:
return None

highest = -1
best_operation = None
for operation in operations:
if not isinstance(operation, dict):
continue
name = operation.get("Name")
if not isinstance(name, str) or not name.startswith(_TRACE_CHECKPOINT_PREFIX):
continue
suffix = name[len(_TRACE_CHECKPOINT_PREFIX) :]
try:
number = int(suffix)
except (TypeError, ValueError):
continue
if number > highest:
highest = number
best_operation = operation

return _extract_context_from_durable_checkpoint(best_operation)


def extract_durable_execution_status(response, event):
if not isinstance(event, dict) or "DurableExecutionArn" not in event:
return None
Expand Down
4 changes: 4 additions & 0 deletions datadog_lambda/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
EventTypes,
EventSubtypes,
)
from datadog_lambda.durable import extract_context_from_durable_execution

if config.otel_enabled:
from opentelemetry.trace import set_tracer_provider
Expand Down Expand Up @@ -633,6 +634,7 @@ def extract_dd_trace_context(
global dd_trace_context
trace_context_source = None
event_source = parse_event_source(event)
context = None

if extractor is not None:
context = extract_context_custom_extractor(extractor, event, lambda_context)
Expand All @@ -654,6 +656,8 @@ def extract_dd_trace_context(
context = extract_context_from_kinesis_event(event, lambda_context)
elif event_source.equals(EventTypes.STEPFUNCTIONS):
context = extract_context_from_step_functions(event, lambda_context)
elif isinstance(event, dict) and "DurableExecutionArn" in event:
context = extract_context_from_durable_execution(event)
else:
context = extract_context_from_lambda_context(lambda_context)

Expand Down
38 changes: 38 additions & 0 deletions tests/test_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,44 @@ def test_with_w3c_trace_headers(self):
headers, {"headers": headers}
)

@with_trace_propagation_style("datadog")
def test_extracts_durable_trace_context_from_latest_checkpoint_operation_map(self):
lambda_ctx = get_mock_context()
headers = {
TraceHeader.TRACE_ID: "123",
TraceHeader.PARENT_ID: "321",
TraceHeader.SAMPLING_PRIORITY: "1",
}

event = {
"DurableExecutionArn": "arn:aws:lambda:us-east-2:123456789012:function:demo:1/durable-execution/demo/abc",
"CheckpointToken": "token",
"InitialExecutionState": {
"Operations": {
"0": {"Type": "EXECUTION"},
"1": {
"Name": "_datadog_0",
"StepDetails": {
"Result": {
TraceHeader.TRACE_ID: "999",
TraceHeader.PARENT_ID: "888",
TraceHeader.SAMPLING_PRIORITY: "1",
}
},
},
"2": {
"Name": "_datadog_1",
"StepDetails": {"Result": headers},
},
}
},
}

ctx, source, _ = extract_dd_trace_context(event, lambda_ctx)

self.assertEqual(source, "event")
self.assertEqual(ctx, Context(trace_id=123, span_id=321, sampling_priority=1))

@with_trace_propagation_style("datadog")
def test_with_extractor_function(self):
def extractor_foo(event, context):
Expand Down
Loading