diff --git a/datadog_lambda/config.py b/datadog_lambda/config.py index ce4924af..a4e8a817 100644 --- a/datadog_lambda/config.py +++ b/datadog_lambda/config.py @@ -89,6 +89,10 @@ def _resolve_env(self, key, default=None, cast=None, depends_on_tracing=False): data_streams_enabled = _get_env( "DD_DATA_STREAMS_ENABLED", "false", as_bool, depends_on_tracing=True ) + # EventBridge bus name used as the DSM `exchange` tag. The bus name is not + # present in the inbound event, so it must be provided explicitly to allow + # the consume checkpoint to pair with the EventBridge produce checkpoint. + dsm_exchange_name = _get_env("DD_DSM_EXCHANGE_NAME") appsec_enabled = _get_env("DD_APPSEC_ENABLED", "false", as_bool) sca_enabled = _get_env("DD_APPSEC_SCA_ENABLED", "false", as_bool) diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index b3f79a96..ae0994a8 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -82,6 +82,43 @@ def _dsm_set_checkpoint(context_json, event_type, arn): ) +def _dsm_set_eventbridge_checkpoint(context_json, detail_type): + """Set a DSM consume checkpoint for an EventBridge event. + + Unlike the SQS/SNS/Kinesis helper, the EventBridge edge tags include an + `exchange` tag (the bus name) to mirror the produce-side tags so the + consume node pairs with the produce node. The bus name is not present in + the inbound event, so it is sourced from `DD_DSM_EXCHANGE_NAME` when set. + The public `set_consume_checkpoint` helper cannot emit an `exchange` tag, + so the lower-level processor API is used directly. + """ + if not config.data_streams_enabled: + return + + if not detail_type: + return + + try: + from ddtrace.internal.datastreams import data_streams_processor + from ddtrace.internal.datastreams.processor import PROPAGATION_KEY_BASE_64 + + processor = data_streams_processor() + if not processor: + return + + carrier_get = lambda k: context_json and context_json.get(k) # noqa: E731 + processor.decode_pathway_b64(carrier_get(PROPAGATION_KEY_BASE_64)) + + tags = ["direction:in", "topic:" + detail_type, "type:eventbridge"] + if config.dsm_exchange_name: + tags.append("exchange:" + config.dsm_exchange_name) + processor.set_checkpoint(tags) + except Exception as e: + logger.debug( + f"DSM:Failed to set consume checkpoint for eventbridge {detail_type}: {e}" + ) + + def _convert_xray_trace_id(xray_trace_id): """ Convert X-Ray trace id (hex)'s last 63 bits to a Datadog trace id (int). @@ -353,12 +390,32 @@ def _extract_context_from_eventbridge_sqs_event(event): This is only possible if first record in `Records` contains a `body` field which contains the EventBridge `detail` as a JSON string. """ - first_record = event.get("Records")[0] + records = event.get("Records") + first_record = records[0] body_str = first_record.get("body") body = json.loads(body_str) detail = body.get("detail") + # If `detail` is missing this is not an EventBridge -> SQS event; raising + # here lets the caller fall back to the regular SQS extraction path before + # any DSM checkpoint is set, avoiding double counting for plain SQS events. dd_context = detail.get("_datadog") + # The event has been confirmed as EventBridge -> SQS. Set a consume + # checkpoint for every record in the batch. The message is consumed from + # the SQS queue, so it follows SQS conventions (type:sqs, topic:queue ARN). + if config.data_streams_enabled: + for record in records: + try: + record_body = json.loads(record.get("body")) + record_context = (record_body.get("detail") or {}).get("_datadog") + _dsm_set_checkpoint( + record_context, "sqs", record.get("eventSourceARN", "") + ) + except Exception: + logger.debug( + "Failed to set DSM checkpoint for an EventBridge to SQS record." + ) + if is_step_function_event(dd_context): try: return extract_context_from_step_functions(dd_context, None) @@ -379,8 +436,11 @@ def extract_context_from_eventbridge_event(event, lambda_context): that header. """ try: - detail = event.get("detail") + detail = event.get("detail") or {} dd_context = detail.get("_datadog") + + _dsm_set_eventbridge_checkpoint(dd_context, event.get("detail-type")) + if not dd_context: return extract_context_from_lambda_context(lambda_context) diff --git a/tests/test_tracing.py b/tests/test_tracing.py index fc18f6e5..ae38702e 100644 --- a/tests/test_tracing.py +++ b/tests/test_tracing.py @@ -46,6 +46,7 @@ _dsm_set_checkpoint, extract_context_from_kinesis_event, extract_context_from_sqs_or_sns_event_or_context, + extract_context_from_eventbridge_event, ) from datadog_lambda.trigger import parse_event_source @@ -3646,3 +3647,155 @@ def test_kinesis_data_streams_disabled(self): arn = "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream" _dsm_set_checkpoint(context_json, event_type, arn) + + # EVENTBRIDGE -> SQS TESTS + + @staticmethod + def _eventbridge_sqs_record(queue_arn, pathway_ctx): + body = { + "detail-type": "MyDetailType", + "source": "my.event.source", + "detail": { + "_datadog": { + # Complete trace context so the extractor returns early and + # does not fall through to the regular SQS path. + "x-datadog-trace-id": "12345", + "x-datadog-parent-id": "67890", + "x-datadog-sampling-priority": "1", + "dd-pathway-ctx-base64": pathway_ctx, + } + }, + } + return { + "eventSourceARN": queue_arn, + "eventSource": "aws:sqs", + "body": json.dumps(body), + } + + def test_eventbridge_sqs_context_propagated(self): + queue_arn = "arn:aws:sqs:us-east-1:123456789012:eb-queue" + event = {"Records": [self._eventbridge_sqs_record(queue_arn, "12345")]} + + extract_context_from_sqs_or_sns_event_or_context( + event, self.lambda_context, parse_event_source(event) + ) + + # EventBridge -> SQS is consumed from the queue, so it uses SQS tags. + self.assertEqual(self.mock_checkpoint.call_count, 1) + args, _ = self.mock_checkpoint.call_args + self.assertEqual(args[0], "sqs") + self.assertEqual(args[1], queue_arn) + carrier_get = args[2] + self.assertEqual(carrier_get("dd-pathway-ctx-base64"), "12345") + + def test_eventbridge_sqs_checkpoints_all_records(self): + arn1 = "arn:aws:sqs:us-east-1:123456789012:eb-queue" + arn2 = "arn:aws:sqs:us-east-1:123456789012:eb-queue-2" + event = { + "Records": [ + self._eventbridge_sqs_record(arn1, "ctx-1"), + self._eventbridge_sqs_record(arn2, "ctx-2"), + ] + } + + extract_context_from_sqs_or_sns_event_or_context( + event, self.lambda_context, parse_event_source(event) + ) + + self.assertEqual(self.mock_checkpoint.call_count, 2) + first_args, _ = self.mock_checkpoint.call_args_list[0] + second_args, _ = self.mock_checkpoint.call_args_list[1] + self.assertEqual((first_args[0], first_args[1]), ("sqs", arn1)) + self.assertEqual(first_args[2]("dd-pathway-ctx-base64"), "ctx-1") + self.assertEqual((second_args[0], second_args[1]), ("sqs", arn2)) + self.assertEqual(second_args[2]("dd-pathway-ctx-base64"), "ctx-2") + + @patch("datadog_lambda.config.Config.data_streams_enabled", False) + def test_eventbridge_sqs_data_streams_disabled(self): + queue_arn = "arn:aws:sqs:us-east-1:123456789012:eb-queue" + event = {"Records": [self._eventbridge_sqs_record(queue_arn, "12345")]} + + extract_context_from_sqs_or_sns_event_or_context( + event, self.lambda_context, parse_event_source(event) + ) + + self.mock_checkpoint.assert_not_called() + + +class TestEventBridgeDSMLogic(unittest.TestCase): + def setUp(self): + self.lambda_context = get_mock_context() + self.mock_processor = Mock() + processor_patcher = patch( + "ddtrace.internal.datastreams.data_streams_processor", + return_value=self.mock_processor, + ) + processor_patcher.start() + self.addCleanup(processor_patcher.stop) + config_patcher = patch( + "datadog_lambda.config.Config.data_streams_enabled", True + ) + config_patcher.start() + self.addCleanup(config_patcher.stop) + + @staticmethod + def _eventbridge_event(detail_type="MyDetailType", pathway_ctx="12345"): + return { + "detail-type": detail_type, + "source": "my.event.source", + "detail": {"_datadog": {"dd-pathway-ctx-base64": pathway_ctx}}, + } + + def test_eventbridge_context_propagated(self): + event = self._eventbridge_event() + + extract_context_from_eventbridge_event(event, self.lambda_context) + + self.mock_processor.decode_pathway_b64.assert_called_once_with("12345") + self.mock_processor.set_checkpoint.assert_called_once() + (tags,), _ = self.mock_processor.set_checkpoint.call_args + self.assertIn("direction:in", tags) + self.assertIn("type:eventbridge", tags) + self.assertIn("topic:MyDetailType", tags) + self.assertFalse(any(t.startswith("exchange:") for t in tags)) + + @patch("datadog_lambda.config.Config.dsm_exchange_name", "my-event-bus") + def test_eventbridge_exchange_tag_from_env(self): + event = self._eventbridge_event() + + extract_context_from_eventbridge_event(event, self.lambda_context) + + (tags,), _ = self.mock_processor.set_checkpoint.call_args + self.assertIn("exchange:my-event-bus", tags) + self.assertIn("topic:MyDetailType", tags) + self.assertIn("type:eventbridge", tags) + + def test_eventbridge_no_detail_type_skips_checkpoint(self): + event = self._eventbridge_event(detail_type=None) + + extract_context_from_eventbridge_event(event, self.lambda_context) + + self.mock_processor.set_checkpoint.assert_not_called() + + def test_eventbridge_no_dd_context_still_checkpoints(self): + event = {"detail-type": "MyDetailType", "detail": {}} + + extract_context_from_eventbridge_event(event, self.lambda_context) + + self.mock_processor.decode_pathway_b64.assert_called_once_with(None) + self.mock_processor.set_checkpoint.assert_called_once() + + def test_eventbridge_missing_detail_still_checkpoints(self): + event = {"detail-type": "MyDetailType"} + + extract_context_from_eventbridge_event(event, self.lambda_context) + + self.mock_processor.set_checkpoint.assert_called_once() + + @patch("datadog_lambda.config.Config.data_streams_enabled", False) + def test_eventbridge_data_streams_disabled(self): + event = self._eventbridge_event() + + extract_context_from_eventbridge_event(event, self.lambda_context) + + self.mock_processor.set_checkpoint.assert_not_called()