Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions datadog_lambda/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ def _resolve_env(self, key, default=None, cast=None, depends_on_tracing=False):
data_streams_enabled = _get_env(
"DD_DATA_STREAMS_ENABLED", "false", as_bool, depends_on_tracing=True
)
# EventBridge bus name used as the DSM `exchange` tag. The bus name is not
# present in the inbound event, so it must be provided explicitly to allow
# the consume checkpoint to pair with the EventBridge produce checkpoint.
dsm_exchange_name = _get_env("DD_DSM_EXCHANGE_NAME")
appsec_enabled = _get_env("DD_APPSEC_ENABLED", "false", as_bool)
sca_enabled = _get_env("DD_APPSEC_SCA_ENABLED", "false", as_bool)

Expand Down
64 changes: 62 additions & 2 deletions datadog_lambda/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,43 @@ def _dsm_set_checkpoint(context_json, event_type, arn):
)


def _dsm_set_eventbridge_checkpoint(context_json, detail_type):
"""Set a DSM consume checkpoint for an EventBridge event.

Unlike the SQS/SNS/Kinesis helper, the EventBridge edge tags include an
`exchange` tag (the bus name) to mirror the produce-side tags so the
consume node pairs with the produce node. The bus name is not present in
the inbound event, so it is sourced from `DD_DSM_EXCHANGE_NAME` when set.
The public `set_consume_checkpoint` helper cannot emit an `exchange` tag,
so the lower-level processor API is used directly.
"""
if not config.data_streams_enabled:
return

if not detail_type:
return

try:
from ddtrace.internal.datastreams import data_streams_processor
from ddtrace.internal.datastreams.processor import PROPAGATION_KEY_BASE_64

processor = data_streams_processor()
if not processor:
return

carrier_get = lambda k: context_json and context_json.get(k) # noqa: E731
processor.decode_pathway_b64(carrier_get(PROPAGATION_KEY_BASE_64))

tags = ["direction:in", "topic:" + detail_type, "type:eventbridge"]
if config.dsm_exchange_name:
tags.append("exchange:" + config.dsm_exchange_name)
processor.set_checkpoint(tags)
except Exception as e:
logger.debug(
f"DSM:Failed to set consume checkpoint for eventbridge {detail_type}: {e}"
)


def _convert_xray_trace_id(xray_trace_id):
"""
Convert X-Ray trace id (hex)'s last 63 bits to a Datadog trace id (int).
Expand Down Expand Up @@ -353,12 +390,32 @@ def _extract_context_from_eventbridge_sqs_event(event):
This is only possible if first record in `Records` contains a
`body` field which contains the EventBridge `detail` as a JSON string.
"""
first_record = event.get("Records")[0]
records = event.get("Records")
first_record = records[0]
body_str = first_record.get("body")
body = json.loads(body_str)
detail = body.get("detail")
# If `detail` is missing this is not an EventBridge -> SQS event; raising
# here lets the caller fall back to the regular SQS extraction path before
# any DSM checkpoint is set, avoiding double counting for plain SQS events.
dd_context = detail.get("_datadog")

# The event has been confirmed as EventBridge -> SQS. Set a consume
# checkpoint for every record in the batch. The message is consumed from
# the SQS queue, so it follows SQS conventions (type:sqs, topic:queue ARN).
if config.data_streams_enabled:
for record in records:
try:
record_body = json.loads(record.get("body"))
record_context = (record_body.get("detail") or {}).get("_datadog")
_dsm_set_checkpoint(
record_context, "sqs", record.get("eventSourceARN", "")
)
Comment on lines +411 to +413

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Avoid double-counting EventBridge SQS checkpoints

When DSM is enabled, this emits checkpoints as soon as the SQS body has a detail field, but the helper can still return an incomplete context when the EventBridge payload has no _datadog trace headers or only DSM propagation. In that case extract_context_from_sqs_or_sns_event_or_context does not return from the EventBridge branch and falls through to the regular SQS path, which emits another _dsm_set_checkpoint(None, "sqs", source_arn), so plain/unenriched EventBridge-to-SQS messages are counted twice in DSM.

Useful? React with 👍 / 👎.

except Exception:
logger.debug(
"Failed to set DSM checkpoint for an EventBridge to SQS record."
)

if is_step_function_event(dd_context):
try:
return extract_context_from_step_functions(dd_context, None)
Expand All @@ -379,8 +436,11 @@ def extract_context_from_eventbridge_event(event, lambda_context):
that header.
"""
try:
detail = event.get("detail")
detail = event.get("detail") or {}
dd_context = detail.get("_datadog")

_dsm_set_eventbridge_checkpoint(dd_context, event.get("detail-type"))

if not dd_context:
return extract_context_from_lambda_context(lambda_context)

Expand Down
153 changes: 153 additions & 0 deletions tests/test_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
_dsm_set_checkpoint,
extract_context_from_kinesis_event,
extract_context_from_sqs_or_sns_event_or_context,
extract_context_from_eventbridge_event,
)

from datadog_lambda.trigger import parse_event_source
Expand Down Expand Up @@ -3646,3 +3647,155 @@ def test_kinesis_data_streams_disabled(self):
arn = "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream"

_dsm_set_checkpoint(context_json, event_type, arn)

# EVENTBRIDGE -> SQS TESTS

@staticmethod
def _eventbridge_sqs_record(queue_arn, pathway_ctx):
body = {
"detail-type": "MyDetailType",
"source": "my.event.source",
"detail": {
"_datadog": {
# Complete trace context so the extractor returns early and
# does not fall through to the regular SQS path.
"x-datadog-trace-id": "12345",
"x-datadog-parent-id": "67890",
"x-datadog-sampling-priority": "1",
"dd-pathway-ctx-base64": pathway_ctx,
}
},
}
return {
"eventSourceARN": queue_arn,
"eventSource": "aws:sqs",
"body": json.dumps(body),
}

def test_eventbridge_sqs_context_propagated(self):
queue_arn = "arn:aws:sqs:us-east-1:123456789012:eb-queue"
event = {"Records": [self._eventbridge_sqs_record(queue_arn, "12345")]}

extract_context_from_sqs_or_sns_event_or_context(
event, self.lambda_context, parse_event_source(event)
)

# EventBridge -> SQS is consumed from the queue, so it uses SQS tags.
self.assertEqual(self.mock_checkpoint.call_count, 1)
args, _ = self.mock_checkpoint.call_args
self.assertEqual(args[0], "sqs")
self.assertEqual(args[1], queue_arn)
carrier_get = args[2]
self.assertEqual(carrier_get("dd-pathway-ctx-base64"), "12345")

def test_eventbridge_sqs_checkpoints_all_records(self):
arn1 = "arn:aws:sqs:us-east-1:123456789012:eb-queue"
arn2 = "arn:aws:sqs:us-east-1:123456789012:eb-queue-2"
event = {
"Records": [
self._eventbridge_sqs_record(arn1, "ctx-1"),
self._eventbridge_sqs_record(arn2, "ctx-2"),
]
}

extract_context_from_sqs_or_sns_event_or_context(
event, self.lambda_context, parse_event_source(event)
)

self.assertEqual(self.mock_checkpoint.call_count, 2)
first_args, _ = self.mock_checkpoint.call_args_list[0]
second_args, _ = self.mock_checkpoint.call_args_list[1]
self.assertEqual((first_args[0], first_args[1]), ("sqs", arn1))
self.assertEqual(first_args[2]("dd-pathway-ctx-base64"), "ctx-1")
self.assertEqual((second_args[0], second_args[1]), ("sqs", arn2))
self.assertEqual(second_args[2]("dd-pathway-ctx-base64"), "ctx-2")

@patch("datadog_lambda.config.Config.data_streams_enabled", False)
def test_eventbridge_sqs_data_streams_disabled(self):
queue_arn = "arn:aws:sqs:us-east-1:123456789012:eb-queue"
event = {"Records": [self._eventbridge_sqs_record(queue_arn, "12345")]}

extract_context_from_sqs_or_sns_event_or_context(
event, self.lambda_context, parse_event_source(event)
)

self.mock_checkpoint.assert_not_called()


class TestEventBridgeDSMLogic(unittest.TestCase):
def setUp(self):
self.lambda_context = get_mock_context()
self.mock_processor = Mock()
processor_patcher = patch(
"ddtrace.internal.datastreams.data_streams_processor",
return_value=self.mock_processor,
)
processor_patcher.start()
self.addCleanup(processor_patcher.stop)
config_patcher = patch(
"datadog_lambda.config.Config.data_streams_enabled", True
)
config_patcher.start()
self.addCleanup(config_patcher.stop)

@staticmethod
def _eventbridge_event(detail_type="MyDetailType", pathway_ctx="12345"):
return {
"detail-type": detail_type,
"source": "my.event.source",
"detail": {"_datadog": {"dd-pathway-ctx-base64": pathway_ctx}},
}

def test_eventbridge_context_propagated(self):
event = self._eventbridge_event()

extract_context_from_eventbridge_event(event, self.lambda_context)

self.mock_processor.decode_pathway_b64.assert_called_once_with("12345")
self.mock_processor.set_checkpoint.assert_called_once()
(tags,), _ = self.mock_processor.set_checkpoint.call_args
self.assertIn("direction:in", tags)
self.assertIn("type:eventbridge", tags)
self.assertIn("topic:MyDetailType", tags)
self.assertFalse(any(t.startswith("exchange:") for t in tags))

@patch("datadog_lambda.config.Config.dsm_exchange_name", "my-event-bus")
def test_eventbridge_exchange_tag_from_env(self):
event = self._eventbridge_event()

extract_context_from_eventbridge_event(event, self.lambda_context)

(tags,), _ = self.mock_processor.set_checkpoint.call_args
self.assertIn("exchange:my-event-bus", tags)
self.assertIn("topic:MyDetailType", tags)
self.assertIn("type:eventbridge", tags)

def test_eventbridge_no_detail_type_skips_checkpoint(self):
event = self._eventbridge_event(detail_type=None)

extract_context_from_eventbridge_event(event, self.lambda_context)

self.mock_processor.set_checkpoint.assert_not_called()

def test_eventbridge_no_dd_context_still_checkpoints(self):
event = {"detail-type": "MyDetailType", "detail": {}}

extract_context_from_eventbridge_event(event, self.lambda_context)

self.mock_processor.decode_pathway_b64.assert_called_once_with(None)
self.mock_processor.set_checkpoint.assert_called_once()

def test_eventbridge_missing_detail_still_checkpoints(self):
event = {"detail-type": "MyDetailType"}

extract_context_from_eventbridge_event(event, self.lambda_context)

self.mock_processor.set_checkpoint.assert_called_once()

@patch("datadog_lambda.config.Config.data_streams_enabled", False)
def test_eventbridge_data_streams_disabled(self):
event = self._eventbridge_event()

extract_context_from_eventbridge_event(event, self.lambda_context)

self.mock_processor.set_checkpoint.assert_not_called()
Loading