Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions datadog_lambda/durable.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@ def extract_durable_function_tags(event):
execution_name, execution_id = parsed
# Use the number of operations to determine if it's the first invocation. This is
# what the durable execution SDK does to determine the replay status.
operations = event.get("InitialExecutionState", {}).get("Operations", [])
is_first_invocation = len(operations) == 1
operations = event.get("InitialExecutionState", {}).get("Operations")
operation_count = (
len(operations) if isinstance(operations, (list, dict)) else 0
)
is_first_invocation = operation_count == 1
return {
"aws_lambda.durable_function.execution_name": execution_name,
"aws_lambda.durable_function.execution_id": execution_id,
Expand Down
242 changes: 222 additions & 20 deletions datadog_lambda/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
DD_TRACE_JAVA_TRACE_ID_PADDING = "00000000"
HIGHER_64_BITS = "HIGHER_64_BITS"
LOWER_64_BITS = "LOWER_64_BITS"
_TRACE_CHECKPOINT_PREFIX = "_datadog_"


def _dsm_set_checkpoint(context_json, event_type, arn):
Expand Down Expand Up @@ -546,6 +547,121 @@ def extract_context_from_step_functions(event, lambda_context):
return extract_context_from_lambda_context(lambda_context)


def _durable_operations(event):
if not isinstance(event, dict):
return []

operations = event.get("InitialExecutionState", {}).get("Operations")
if isinstance(operations, list):
return operations
if not isinstance(operations, dict):
return []

numeric_keys = []
other_keys = []
for key, value in operations.items():
if not isinstance(value, dict):
continue
try:
numeric_keys.append((int(key), value))
except (TypeError, ValueError):
other_keys.append((str(key), value))

numeric_keys.sort(key=lambda item: item[0])
other_keys.sort(key=lambda item: item[0])
return [value for _, value in numeric_keys + other_keys]


def _extract_context_from_durable_checkpoint(operation):
    """Extract a trace context from a checkpoint operation's step result.

    Reads ``operation["StepDetails"]["Result"]``. A string result is decoded
    as JSON first. The resulting dict is handed to ``propagator.extract``.

    Returns ``None`` when the operation or its ``StepDetails`` is not a dict,
    when a string result cannot be decoded, or when the (decoded) result is
    not a dict.
    """
    details = operation.get("StepDetails") if isinstance(operation, dict) else None
    if not isinstance(details, dict):
        return None

    carrier = details.get("Result")
    if isinstance(carrier, str):
        # Best effort: an undecodable result simply yields no context.
        try:
            carrier = json.loads(carrier)
        except Exception:
            return None

    return propagator.extract(carrier) if isinstance(carrier, dict) else None


def _extract_context_from_durable_input_payload(operation):
if not isinstance(operation, dict):
return None

execution_details = operation.get("ExecutionDetails")
if not isinstance(execution_details, dict):
return None

input_payload = execution_details.get("InputPayload")
if isinstance(input_payload, str):
try:
input_payload = json.loads(input_payload)
except Exception:
return None

if not isinstance(input_payload, dict):
return None

headers = input_payload.get("headers")
if isinstance(headers, dict):
return propagator.extract(headers)

dd_data = input_payload.get("_datadog")
if isinstance(dd_data, dict):
return propagator.extract(dd_data)

return None


def extract_context_from_durable_execution(event):
    """Extract a trace context from a durable execution event.

    The event must be a dict whose ``"DurableExecutionArn"`` is a string and
    which carries at least one operation; otherwise ``None`` is returned.

    Operations whose ``Name`` is ``_TRACE_CHECKPOINT_PREFIX`` followed by an
    integer are treated as trace checkpoints. Among the checkpoints that
    yield a complete context, the one with the highest number is returned
    (on equal numbers the later operation wins). If no checkpoint qualifies,
    the input payload of the first operation is tried instead.

    Returns ``None`` when neither source yields a complete context.
    """
    if not (
        isinstance(event, dict) and isinstance(event.get("DurableExecutionArn"), str)
    ):
        return None

    operations = _durable_operations(event)
    if not operations:
        return None

    prefix_length = len(_TRACE_CHECKPOINT_PREFIX)
    latest_context = None
    latest_number = -1
    for op in operations:
        if not isinstance(op, dict):
            continue

        op_name = op.get("Name")
        if not (
            isinstance(op_name, str) and op_name.startswith(_TRACE_CHECKPOINT_PREFIX)
        ):
            continue

        try:
            checkpoint_number = int(op_name[prefix_length:])
        except (TypeError, ValueError):
            continue

        # Only checkpoints at least as recent as the current best can replace it.
        if checkpoint_number >= latest_number:
            candidate = _extract_context_from_durable_checkpoint(op)
            if _is_context_complete(candidate):
                latest_context, latest_number = candidate, checkpoint_number

    if latest_context is not None:
        return latest_context

    # No usable checkpoint: fall back to the context carried by the first
    # operation's input payload.
    upstream = _extract_context_from_durable_input_payload(operations[0])
    return upstream if _is_context_complete(upstream) else None


def extract_context_custom_extractor(extractor, event, lambda_context):
"""
Extract Datadog trace context using a custom trace extractor function
Expand Down Expand Up @@ -633,29 +749,34 @@ def extract_dd_trace_context(
global dd_trace_context
trace_context_source = None
event_source = parse_event_source(event)
context = None

if extractor is not None:
context = extract_context_custom_extractor(extractor, event, lambda_context)
elif isinstance(event, (set, dict)) and "request" in event:
context = extract_context_from_request_header_or_context(
event, lambda_context, event_source
)
elif isinstance(event, (set, dict)) and "headers" in event:
context = extract_context_from_http_event_or_context(
event, lambda_context, event_source, decode_authorizer_context
)
elif event_source.equals(EventTypes.SNS) or event_source.equals(EventTypes.SQS):
context = extract_context_from_sqs_or_sns_event_or_context(
event, lambda_context, event_source
)
elif event_source.equals(EventTypes.EVENTBRIDGE):
context = extract_context_from_eventbridge_event(event, lambda_context)
elif event_source.equals(EventTypes.KINESIS):
context = extract_context_from_kinesis_event(event, lambda_context)
elif event_source.equals(EventTypes.STEPFUNCTIONS):
context = extract_context_from_step_functions(event, lambda_context)
else:
context = extract_context_from_lambda_context(lambda_context)
elif isinstance(event, (set, dict)) and "DurableExecutionArn" in event:
context = extract_context_from_durable_execution(event)

if context is None:
if isinstance(event, (set, dict)) and "request" in event:
context = extract_context_from_request_header_or_context(
event, lambda_context, event_source
)
elif isinstance(event, (set, dict)) and "headers" in event:
context = extract_context_from_http_event_or_context(
event, lambda_context, event_source, decode_authorizer_context
)
elif event_source.equals(EventTypes.SNS) or event_source.equals(EventTypes.SQS):
context = extract_context_from_sqs_or_sns_event_or_context(
event, lambda_context, event_source
)
elif event_source.equals(EventTypes.EVENTBRIDGE):
context = extract_context_from_eventbridge_event(event, lambda_context)
elif event_source.equals(EventTypes.KINESIS):
context = extract_context_from_kinesis_event(event, lambda_context)
elif event_source.equals(EventTypes.STEPFUNCTIONS):
context = extract_context_from_step_functions(event, lambda_context)
else:
context = extract_context_from_lambda_context(lambda_context)

if _is_context_complete(context):
logger.debug("Extracted Datadog trace context from event or context")
Expand Down Expand Up @@ -798,12 +919,93 @@ def set_dd_trace_py_root(trace_context_source, merge_xray_traces):
)


def _durable_execution_start_ns(event):
    """Return the first operation's ``StartTimestamp`` scaled by 1,000,000.

    The timestamp is taken from the first entry returned by
    ``_durable_operations(event)``. It may be an integer, a float, or a
    string holding either (surrounding whitespace is ignored); fractional
    values are truncated.

    NOTE(review): the scaling implies the timestamp is in epoch milliseconds
    and the result in nanoseconds -- confirm against the event schema.

    Returns ``None`` when there are no operations, the first operation is
    not a dict, or the timestamp is missing, non-numeric, NaN or infinite.
    """
    operations = _durable_operations(event)
    if not operations:
        return None

    first_operation = operations[0]
    if not isinstance(first_operation, dict):
        return None

    start_timestamp = first_operation.get("StartTimestamp")
    if isinstance(start_timestamp, str):
        start_timestamp = start_timestamp.strip()

    # OverflowError is raised by int() for infinite floats (e.g. the string
    # "inf" or a JSON ``Infinity`` literal); treat it like any other
    # unusable value instead of letting it propagate.
    unusable = (TypeError, ValueError, OverflowError)
    try:
        start_ms = int(start_timestamp)
    except unusable:
        try:
            # Handles float-formatted strings such as "1700000000000.0".
            start_ms = int(float(start_timestamp))
        except unusable:
            return None

    return start_ms * 1000000


def create_inferred_span_from_durable_execution_event(
    event, context, durable_function_tags
):
    """Create the inferred span for the first invocation of a durable execution.

    No span is created (``None`` is returned) when ``durable_function_tags``
    is empty, when its ``aws_lambda.durable_function.first_invocation`` tag
    is not ``"true"``, or when ``_durable_execution_start_ns(event)`` cannot
    provide a start time.

    Otherwise an ``aws.durable.execution_init`` span is started, tagged with
    the execution name, id and ARN, and its ``start_ns`` is overwritten with
    the computed start time. The service name comes from the
    ``DD_DURABLE_EXECUTION_SERVICE`` environment variable and defaults to
    ``aws.durable-execution``.

    Returns whatever ``tracer.trace`` returned.
    """
    if not durable_function_tags:
        return None
    first_invocation = durable_function_tags.get(
        "aws_lambda.durable_function.first_invocation"
    )
    if first_invocation != "true":
        return None

    start_ns = _durable_execution_start_ns(event)
    if start_ns is None:
        return None

    operation = "aws.durable.execution_init"
    service = os.environ.get("DD_DURABLE_EXECUTION_SERVICE", "aws.durable-execution")
    name = durable_function_tags.get("aws_lambda.durable_function.execution_name")
    identifier = durable_function_tags.get("aws_lambda.durable_function.execution_id")
    arn = event.get("DurableExecutionArn")

    span_tags = {
        "operation_name": operation,
        "resource_names": name,
        "request_id": context.aws_request_id if context else None,
        "service": service,
        "service.name": service,
        "span.type": "serverless",
        "resource.name": name,
        "span.kind": "server",
        "durable.execution_arn": arn,
        "durable.execution_name": name,
        "durable.execution_id": identifier,
    }
    InferredSpanInfo.set_tags(span_tags, tag_source="self", synchronicity="async")

    tracer.set_tags(_dd_origin)
    span = tracer.trace(
        operation,
        service=service,
        resource=name,
        span_type="serverless",
    )
    if span:
        span.set_tags(span_tags)
        span.set_metric(InferredSpanInfo.METRIC, 1.0)
        # Replace the tracer-assigned start with the computed start time.
        span.start_ns = start_ns
    return span


def create_inferred_span(
event,
context,
event_source: _EventSource = None,
decode_authorizer_context: bool = True,
durable_function_tags=None,
):
if durable_function_tags:
logger.debug("Durable execution event detected. Inferring a span")
return create_inferred_span_from_durable_execution_event(
event, context, durable_function_tags
)

if event_source is None:
event_source = parse_event_source(event)
try:
Expand Down
6 changes: 5 additions & 1 deletion datadog_lambda/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,11 @@ def _before(self, event, context):
set_dd_trace_py_root(trace_context_source, config.merge_xray_traces)
if config.make_inferred_span:
self.inferred_span = create_inferred_span(
event, context, event_source, config.decode_authorizer_context
event,
context,
event_source,
config.decode_authorizer_context,
self.durable_function_tags,
)

if config.appsec_enabled:
Expand Down
21 changes: 21 additions & 0 deletions tests/test_durable.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,27 @@ def test_sets_first_invocation_false_when_multiple_operations(self):
},
)

def test_sets_first_invocation_false_when_operations_is_a_map(self):
    """A dict-shaped ``Operations`` with two entries is not a first invocation."""
    operations_by_key = {
        "0": {"Type": "EXECUTION"},
        "1": {"Type": "STEP"},
    }
    durable_event = {
        "DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-func:1/durable-execution/my-execution/550e8400-e29b-41d4-a716-446655440004",
        "CheckpointToken": "some-token",
        "InitialExecutionState": {"Operations": operations_by_key},
    }
    expected_tags = {
        "aws_lambda.durable_function.execution_name": "my-execution",
        "aws_lambda.durable_function.execution_id": "550e8400-e29b-41d4-a716-446655440004",
        "aws_lambda.durable_function.first_invocation": "false",
    }

    actual_tags = extract_durable_function_tags(durable_event)

    self.assertEqual(actual_tags, expected_tags)

def test_returns_empty_dict_for_regular_lambda_event(self):
event = {
"body": '{"key": "value"}',
Expand Down
Loading
Loading