From e13242ad28cb3b3a4cdc15ddfc4f30cbce5a7df9 Mon Sep 17 00:00:00 2001 From: dkay Date: Tue, 16 Jun 2026 19:00:04 -0500 Subject: [PATCH] sandbox,server: surface per-path L7 escalations as fresh draft chunks Post-approval L7 (HTTP method/path) denials were vanishing instead of reaching a reviewer. Wire them through to a fresh, reviewable draft chunk while keeping straggler-flush noise suppressed. - sandbox: wire L7 relay denials into the denial aggregator. L7EvalContext gains a denial_tx channel; every L7 deny (request-log and forward paths) emits a DenialEvent carrying the observed method/path, feeding the same observation-driven analysis as connect-stage denials so mechanistic proposals can be path-aware. - server persistence: clear dedup_key when a chunk is decided (sqlite + postgres). New observations for the same host|port|binary then surface as a fresh pending chunk instead of folding their hit_count, through the status-blind submit upsert, into a row the reviewer already acted on. - server: make the post-approval mechanistic self-reject sweep L7-evidence-aware. A resubmit asking for nothing beyond the union of the approved grants for that endpoint still self-rejects (noise suppression); a submission carrying method/path asks OUTSIDE the approved grants stays pending for review. Path coverage uses a conservative glob matcher (* = one segment, ** trailing only, unknown shapes fall back to exact equality) so ambiguity errs toward surfacing a card. - server: gate the self-reject sweep on a live-policy probe (policy_covers_rule). Approved chunk records outlive the clauses they merged (a temporary grant expiring via RemoveBinary, or a manual --remove-rule); trusting the record alone would auto-reject every future denial for that endpoint, leaving it permanently un-reviewable. Co-Authored-By: Claude Opus 4.8 --- crates/openshell-server/src/grpc/policy.rs | 429 +++++++++++++++++- .../src/persistence/postgres.rs | 7 +- .../src/persistence/sqlite.rs | 7 +- .../src/l7/graphql.rs | 1 + .../src/l7/relay.rs | 85 ++++ .../src/l7/token_grant_injection.rs | 2 + .../src/l7/websocket.rs | 1 + .../openshell-supervisor-network/src/proxy.rs | 6 + 8 files changed, 524 insertions(+), 14 deletions(-) diff --git a/crates/openshell-server/src/grpc/policy.rs b/crates/openshell-server/src/grpc/policy.rs index 2e2210f44..849c7e03f 100644 --- a/crates/openshell-server/src/grpc/policy.rs +++ b/crates/openshell-server/src/grpc/policy.rs @@ -49,7 +49,7 @@ use openshell_ocsf::{ ConfigStateChangeBuilder, OCSF_TARGET, OcsfEvent, SandboxContext, SeverityId, StateId, StatusId, }; use openshell_policy::{ - PolicyMergeOp, ProviderPolicyLayer, compose_effective_policy, merge_policy, + PolicyMergeOp, ProviderPolicyLayer, compose_effective_policy, merge_policy, policy_covers_rule, serialize_sandbox_policy, }; use openshell_prover::{ @@ -759,12 +759,25 @@ async fn supersede_other_pending_chunks_for_endpoint( } /// If the just-submitted mechanistic chunk targets a `(host, port, binary)` -/// already covered by an approved `agent_authored` chunk, auto-reject the -/// mechanistic chunk on arrival. The agent has already handled this access -/// decision; the mechanistic draft would only add approval-queue noise. +/// already covered by an approved chunk, auto-reject the mechanistic chunk +/// on arrival — but only when the approved grants actually cover what it +/// asks for. A connect-level resubmit (no L7 asks) is the classic +/// straggler-flush noise case: denials recorded just before an approval +/// hot-reloaded, flushed just after. Rejecting those keeps the approval +/// queue clean. A submission carrying L7 method/path evidence OUTSIDE the +/// union of the approved chunks' allow rules is different in kind: it is +/// the agent asking for MORE than was granted, and it must surface as a +/// fresh pending chunk for review, never vanish. /// /// `agent_authored` submissions are NEVER self-rejected — that path remains /// open for refinement. Only the mechanistic side is asymmetric. +/// +/// An approved chunk record only counts as covering when the live policy +/// still backs it: records outlive the clauses they merged (a temporary +/// grant expiring through `RemoveBinary`, or a manual `--remove-rule`, +/// deletes the clause but leaves the record "approved"). Trusting the +/// record alone would auto-reject every future denial for that endpoint — +/// the proposal never reaches `pending`, so no reviewer ever sees it again. async fn self_reject_mechanistic_if_already_covered( state: &Arc, sandbox_id: &str, @@ -772,6 +785,8 @@ async fn self_reject_mechanistic_if_already_covered( host: &str, port: i32, binary: &str, + proposed_rule: &NetworkPolicyRule, + current_policy: &ProtoSandboxPolicy, ) { if host.is_empty() || port == 0 || binary.is_empty() { return; @@ -793,18 +808,78 @@ async fn self_reject_mechanistic_if_already_covered( } }; - // If any approved chunk for this sandbox already targets the same - // (host, port, binary), the mechanistic submission is redundant. - let covered_by = approved + let covering: Vec<_> = approved .iter() - .find(|c| c.host == host && c.port == port && c.binary == binary); - let Some(covering) = covered_by else { + .filter(|c| c.host == host && c.port == port && c.binary == binary) + .collect(); + if covering.is_empty() { return; + } + + // Ghost-approval guard: verify the grant is still live at connect level + // before trusting the records. The probe is L4-only (host/port/binary); + // the glob-aware L7 ask comparison below handles method/path scope. + // `policy_covers_rule` errs toward "not covered" on shapes it cannot + // prove (host globs, access presets, path-scoped endpoints), which here + // errs toward surfacing a reviewable card — never a silent swallow. + let connect_probe = NetworkPolicyRule { + name: String::new(), + endpoints: vec![NetworkEndpoint { + host: host.to_string(), + port: u32::try_from(port).unwrap_or(0), + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: binary.to_string(), + ..Default::default() + }], }; + if !policy_covers_rule(current_policy, &connect_probe) { + info!( + sandbox_id = %sandbox_id, + chunk_id = %new_chunk_id, + covering_chunk = %covering[0].id, + host = %host, + port = port, + binary = %binary, + "Approved chunk record is no longer backed by the live policy \ + (clause removed since approval); mechanistic chunk kept pending for review" + ); + return; + } + + // Compare asks against the union of every approved grant for this + // endpoint, not just the first match: a ghost resubmit of the SECOND + // approval must be recognized as covered even though the first approved + // chunk knows nothing about its paths. + let new_asks = l7_asks(proposed_rule); + if !new_asks.is_empty() { + let granted: Vec<(String, String)> = covering + .iter() + .filter_map(|c| NetworkPolicyRule::decode(c.proposed_rule.as_slice()).ok()) + .flat_map(|r| l7_asks(&r)) + .collect(); + let uncovered = new_asks.iter().any(|(method, path)| { + !granted + .iter() + .any(|(gm, gp)| l7_method_covers(gm, method) && l7_path_covers(gp, path)) + }); + if uncovered { + info!( + sandbox_id = %sandbox_id, + chunk_id = %new_chunk_id, + host = %host, + port = port, + binary = %binary, + "Mechanistic chunk carries L7 evidence beyond the approved grant; kept pending for review" + ); + return; + } + } let reason = format!( - "already covered by approved chunk {} (agent_authored or prior auto-approval)", - covering.id + "already covered by approved chunk {} (no method/path evidence beyond the existing grant)", + covering[0].id ); match state .store @@ -820,7 +895,7 @@ async fn self_reject_mechanistic_if_already_covered( info!( sandbox_id = %sandbox_id, chunk_id = %new_chunk_id, - covering_chunk = %covering.id, + covering_chunk = %covering[0].id, host = %host, port = port, binary = %binary, @@ -837,6 +912,49 @@ async fn self_reject_mechanistic_if_already_covered( } } +/// Extract the L7 `(method, path)` asks from a proposed rule's allow rules. +/// Empty for a connect-level rule (no L7 inspection evidence). +fn l7_asks(rule: &NetworkPolicyRule) -> Vec<(String, String)> { + rule.endpoints + .iter() + .flat_map(|ep| ep.rules.iter()) + .filter_map(|r| r.allow.as_ref()) + .filter(|a| !a.method.is_empty() || !a.path.is_empty()) + .map(|a| (a.method.clone(), a.path.clone())) + .collect() +} + +/// Does a granted method pattern cover an asked method? +fn l7_method_covers(granted: &str, asked: &str) -> bool { + granted == "*" || granted.eq_ignore_ascii_case(asked) +} + +/// Does a granted path glob cover an asked path? +/// +/// Segment-wise: `*` matches exactly one segment, `**` matches any +/// remainder but is only honoured as the FINAL segment — the only form the +/// mechanistic mapper and provider profiles emit. Any unrecognized pattern +/// shape falls back to exact string equality, which errs toward surfacing a +/// reviewable card rather than silently swallowing an escalation. +fn l7_path_covers(granted: &str, asked: &str) -> bool { + if granted == "**" || granted == asked { + return true; + } + let g: Vec<&str> = granted.split('/').collect(); + let a: Vec<&str> = asked.split('/').collect(); + // `**` anywhere but the tail is an unsupported shape; exact equality + // already failed above, so treat as not covered. + if g[..g.len().saturating_sub(1)].contains(&"**") { + return false; + } + if g.last() == Some(&"**") { + let prefix = &g[..g.len() - 1]; + return a.len() >= prefix.len() + && prefix.iter().zip(&a).all(|(gs, asg)| *gs == "*" || gs == asg); + } + g.len() == a.len() && g.iter().zip(&a).all(|(gs, asg)| *gs == "*" || gs == asg) +} + /// Internally approve a chunk on the auto-approval path: merge into the /// active policy, flip status to "approved", notify watchers, and emit a /// `CONFIG:APPROVED` audit event carrying `auto=true`, `source=`, @@ -2392,6 +2510,8 @@ pub(super) async fn handle_submit_policy_analysis( &record.host, record.port, &record.binary, + chunk.proposed_rule.as_ref().expect("checked above"), + ¤t_policy, ) .await; } @@ -7709,6 +7829,291 @@ mod tests { ); } + /// Build a test sandbox + a submit closure for the post-decision tests. + /// The closure submits one mechanistic chunk with the given rule and + /// returns the response. + async fn setup_post_decision_sandbox( + sandbox_id: &str, + sandbox_name: &str, + ) -> Arc { + use openshell_core::proto::{SandboxPhase, SandboxSpec}; + + let state = test_server_state().await; + let mut sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: sandbox_id.to_string(), + name: sandbox_name.to_string(), + created_at_ms: 1_000_000, + labels: std::collections::HashMap::new(), + resource_version: 0, + }), + spec: Some(SandboxSpec { + policy: None, + ..Default::default() + }), + ..Default::default() + }; + sandbox.set_phase(SandboxPhase::Ready as i32); + state.store.put_message(&sandbox).await.unwrap(); + state + } + + /// Connect-level rule for `/usr/bin/curl -> api.example.com:443`, with + /// optional L7 method/path allow rules attached to the endpoint. + fn post_decision_rule(l7: &[(&str, &str)]) -> NetworkPolicyRule { + use openshell_core::proto::{L7Allow, NetworkBinary, NetworkEndpoint}; + + NetworkPolicyRule { + name: "allow_example".to_string(), + endpoints: vec![NetworkEndpoint { + host: "api.example.com".to_string(), + port: 443, + rules: l7 + .iter() + .map(|(method, path)| L7Rule { + allow: Some(L7Allow { + method: (*method).to_string(), + path: (*path).to_string(), + ..Default::default() + }), + }) + .collect(), + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/curl".to_string(), + ..Default::default() + }], + } + } + + async fn submit_mechanistic( + state: &Arc, + sandbox_name: &str, + rule: NetworkPolicyRule, + ) -> SubmitPolicyAnalysisResponse { + handle_submit_policy_analysis( + state, + with_user(Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.to_string(), + analysis_mode: "mechanistic".to_string(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_example".to_string(), + proposed_rule: Some(rule), + ..Default::default() + }], + ..Default::default() + })), + ) + .await + .unwrap() + .into_inner() + } + + /// A decided chunk must stop absorbing new mechanistic submissions for + /// the same endpoint. Once the reviewer approves the connect-level + /// proposal, later denials for that host|port|binary carrying L7 + /// method/path evidence beyond the approved grant must surface as a + /// fresh PENDING chunk for review — not fold their `hit_count` invisibly + /// into the decided row (`dedup_key` cleared on decision), and not be + /// swallowed by the post-approval self-reject sweep (which only fires + /// when the approved grants cover the new asks). + #[tokio::test] + async fn mechanistic_submit_after_decision_creates_fresh_pending_chunk() { + let sandbox_name = "mechanistic-post-decision"; + let state = setup_post_decision_sandbox("sb-mech-post-decision", sandbox_name).await; + + let first = submit_mechanistic(&state, sandbox_name, post_decision_rule(&[])).await; + assert_eq!(first.accepted_chunk_ids.len(), 1); + let first_id = first.accepted_chunk_ids[0].clone(); + + handle_approve_draft_chunk( + &state, + Request::new(ApproveDraftChunkRequest { + name: sandbox_name.to_string(), + chunk_id: first_id.clone(), + }), + ) + .await + .unwrap(); + + // Post-approval L7 denial evidence: the agent asked for a write the + // connect-level approval never granted. + let second = submit_mechanistic( + &state, + sandbox_name, + post_decision_rule(&[("POST", "/repos/*/issues")]), + ) + .await; + assert_eq!(second.accepted_chunk_ids.len(), 1); + let second_id = second.accepted_chunk_ids[0].clone(); + assert_ne!( + first_id, second_id, + "a submit after the decision must create a fresh chunk, not fold into the approved one" + ); + + let pending = handle_get_draft_policy( + &state, + with_user(Request::new(GetDraftPolicyRequest { + name: sandbox_name.to_string(), + status_filter: "pending".to_string(), + })), + ) + .await + .unwrap() + .into_inner(); + assert_eq!( + pending.chunks.len(), + 1, + "the post-decision submit with uncovered L7 evidence must be reviewable as a pending chunk" + ); + assert_eq!(pending.chunks[0].id, second_id); + } + + /// The flip side of the previous test: a post-approval mechanistic + /// resubmit that asks for NOTHING beyond the approved grant is straggler + /// noise (denials recorded just before the approval hot-reloaded, + /// flushed just after) and must be self-rejected — no ghost card for the + /// reviewer. Covers both the identical connect-level resubmit and an L7 + /// ask that is a subset of what was already approved. + #[tokio::test] + async fn mechanistic_resubmit_covered_by_approved_grant_self_rejects() { + let sandbox_name = "mechanistic-covered-resubmit"; + let state = setup_post_decision_sandbox("sb-mech-covered", sandbox_name).await; + + let first = submit_mechanistic( + &state, + sandbox_name, + post_decision_rule(&[("GET", "/user")]), + ) + .await; + let first_id = first.accepted_chunk_ids[0].clone(); + handle_approve_draft_chunk( + &state, + Request::new(ApproveDraftChunkRequest { + name: sandbox_name.to_string(), + chunk_id: first_id, + }), + ) + .await + .unwrap(); + + // Identical L7 ask (subset of the grant) and a connect-level + // resubmit: both are covered, both must self-reject. + for rule in [ + post_decision_rule(&[("GET", "/user")]), + post_decision_rule(&[]), + ] { + submit_mechanistic(&state, sandbox_name, rule).await; + } + + let pending = handle_get_draft_policy( + &state, + with_user(Request::new(GetDraftPolicyRequest { + name: sandbox_name.to_string(), + status_filter: "pending".to_string(), + })), + ) + .await + .unwrap() + .into_inner(); + assert!( + pending.chunks.is_empty(), + "covered resubmits must self-reject, not pile up as ghost cards: {:?}", + pending.chunks + ); + } + + /// Approved chunk records outlive the clauses they merged: a temporary + /// grant expiring through `RemoveBinary` (or a manual `--remove-rule`) + /// deletes the clause from the live policy while the record keeps + /// saying "approved". A later denial for the same endpoint must surface + /// as a fresh PENDING chunk — if the ghost record were trusted, the + /// self-reject sweep would swallow every future proposal for that + /// endpoint and no reviewer would ever see it again. + #[tokio::test] + async fn mechanistic_resubmit_after_rule_removal_stays_pending() { + let sandbox_name = "mechanistic-ghost-approval"; + let state = setup_post_decision_sandbox("sb-mech-ghost", sandbox_name).await; + + let first = submit_mechanistic(&state, sandbox_name, post_decision_rule(&[])).await; + let first_id = first.accepted_chunk_ids[0].clone(); + handle_approve_draft_chunk( + &state, + Request::new(ApproveDraftChunkRequest { + name: sandbox_name.to_string(), + chunk_id: first_id.clone(), + }), + ) + .await + .unwrap(); + + // Simulate a temporary-grant expiry: remove the merged clause from + // the live policy via the same merge op the expiry path uses. The + // chunk record stays "approved". + apply_merge_operations_with_retry( + state.store.as_ref(), + "sb-mech-ghost", + None, + &[PolicyMergeOp::RemoveBinary { + rule_name: "allow_example".to_string(), + binary_path: "/usr/bin/curl".to_string(), + }], + ) + .await + .unwrap(); + + // The next denial resubmits mechanistically. With the clause gone, + // the ghost approval must not auto-reject it. + let second = submit_mechanistic(&state, sandbox_name, post_decision_rule(&[])).await; + let second_id = second.accepted_chunk_ids[0].clone(); + assert_ne!( + first_id, second_id, + "post-removal submit must create a fresh chunk, not fold into the ghost" + ); + + let pending = handle_get_draft_policy( + &state, + with_user(Request::new(GetDraftPolicyRequest { + name: sandbox_name.to_string(), + status_filter: "pending".to_string(), + })), + ) + .await + .unwrap() + .into_inner(); + assert_eq!( + pending.chunks.len(), + 1, + "a denial after rule removal must surface as a reviewable pending chunk" + ); + assert_eq!(pending.chunks[0].id, second_id); + } + + /// Glob coverage semantics for the self-reject sweep: `*` is one + /// segment, `**` only as the trailing segment, unknown shapes fall back + /// to exact equality (conservative: surface a card rather than swallow). + #[test] + fn l7_path_covers_glob_semantics() { + // Exact and universal. + assert!(l7_path_covers("/user", "/user")); + assert!(l7_path_covers("**", "/anything/at/all")); + // Single-segment wildcard. + assert!(l7_path_covers("/repos/*/issues", "/repos/myorg/issues")); + assert!(!l7_path_covers("/repos/*/issues", "/repos/myorg/pulls")); + assert!(!l7_path_covers("/repos/*/issues", "/repos/a/b/issues")); + // Trailing `**` covers any remainder, including generalized asks. + assert!(l7_path_covers("/v1/models/**", "/v1/models/abc123")); + assert!(l7_path_covers("/v1/models/**", "/v1/models/*")); + assert!(l7_path_covers("/v1/**", "/v1/models/abc/def")); + assert!(!l7_path_covers("/v1/models/**", "/v2/models/abc")); + // Mid-pattern `**` is an unsupported shape: exact match only. + assert!(!l7_path_covers("/a/**/z", "/a/b/z")); + assert!(l7_path_covers("/a/**/z", "/a/**/z")); + // A grant for a specific id does not cover a generalized ask. + assert!(!l7_path_covers("/v1/models/abc", "/v1/models/*")); + } + /// Undo of an approve must clear any `rejection_reason` left over from a /// prior reject. Without this, the in-sandbox agent reading chunks via /// `policy.local` cannot tell "pending and never rejected" from "pending diff --git a/crates/openshell-server/src/persistence/postgres.rs b/crates/openshell-server/src/persistence/postgres.rs index 529bc38be..8487dcaeb 100644 --- a/crates/openshell-server/src/persistence/postgres.rs +++ b/crates/openshell-server/src/persistence/postgres.rs @@ -736,10 +736,15 @@ ORDER BY created_at_ms DESC } let payload = draft_chunk_payload_from_record(&record)?; + // Clear the dedup target once a chunk is decided: new observations for + // the same host|port|binary must surface as a fresh pending chunk + // (possibly carrying new L7 evidence) instead of silently folding + // their hit_count into a row the reviewer already acted on. let result = sqlx::query( r" UPDATE objects -SET status = $3, payload = $4, updated_at_ms = $5 +SET status = $3, payload = $4, updated_at_ms = $5, + dedup_key = CASE WHEN $3 = 'pending' THEN dedup_key ELSE NULL END WHERE object_type = $1 AND id = $2 ", ) diff --git a/crates/openshell-server/src/persistence/sqlite.rs b/crates/openshell-server/src/persistence/sqlite.rs index 1958b3232..b85d986b8 100644 --- a/crates/openshell-server/src/persistence/sqlite.rs +++ b/crates/openshell-server/src/persistence/sqlite.rs @@ -758,10 +758,15 @@ ORDER BY "created_at_ms" DESC } let payload = draft_chunk_payload_from_record(&record)?; + // Clear the dedup target once a chunk is decided: new observations for + // the same host|port|binary must surface as a fresh pending chunk + // (possibly carrying new L7 evidence) instead of silently folding + // their hit_count into a row the reviewer already acted on. let result = sqlx::query( r#" UPDATE "objects" -SET "status" = ?3, "payload" = ?4, "updated_at_ms" = ?5 +SET "status" = ?3, "payload" = ?4, "updated_at_ms" = ?5, + "dedup_key" = CASE WHEN ?3 = 'pending' THEN "dedup_key" ELSE NULL END WHERE "object_type" = ?1 AND "id" = ?2 "#, ) diff --git a/crates/openshell-supervisor-network/src/l7/graphql.rs b/crates/openshell-supervisor-network/src/l7/graphql.rs index 82c35720e..cdf57806a 100644 --- a/crates/openshell-supervisor-network/src/l7/graphql.rs +++ b/crates/openshell-supervisor-network/src/l7/graphql.rs @@ -804,6 +804,7 @@ network_policies: activity_tx: None, dynamic_credentials: None, token_grant_resolver: None, + denial_tx: None, }; let request_info = crate::l7::L7RequestInfo { action: req.action, diff --git a/crates/openshell-supervisor-network/src/l7/relay.rs b/crates/openshell-supervisor-network/src/l7/relay.rs index 3054a4530..455e538fa 100644 --- a/crates/openshell-supervisor-network/src/l7/relay.rs +++ b/crates/openshell-supervisor-network/src/l7/relay.rs @@ -13,6 +13,7 @@ use crate::l7::{EnforcementMode, L7EndpointConfig, L7Protocol, L7RequestInfo}; use crate::opa::{PolicyGenerationGuard, TunnelPolicyEngine}; use miette::{IntoDiagnostic, Result, miette}; use openshell_core::activity::{ActivitySender, try_record_activity}; +use openshell_core::denial::DenialEvent; use openshell_core::secrets::{self, SecretResolver}; use openshell_ocsf::{ ActionId, ActivityId, DispositionId, Endpoint, HttpActivityBuilder, HttpRequest, @@ -51,6 +52,10 @@ pub struct L7EvalContext { /// Dynamic token grant resolver for endpoint-bound credentials. pub(crate) token_grant_resolver: Option>, + /// Denial aggregator channel. L7 request denials feed the same + /// observation-driven policy analysis as connect-stage denials, carrying + /// the observed method/path so proposals can be path-aware. + pub(crate) denial_tx: Option>, } #[derive(Default)] @@ -464,6 +469,28 @@ fn emit_l7_request_log( .build(); ocsf_emit!(event); emit_activity(ctx, decision_str == "deny", "l7_policy"); + if decision_str == "deny" { + emit_l7_denial(ctx, request_info, redacted_target, reason); + } +} + +/// Feed an L7 request denial to the denial aggregator (if configured) so the +/// observation-driven analysis can propose path-aware rules. The target is +/// already redacted (no query string / credentials), matching what the OCSF +/// log records. +fn emit_l7_denial(ctx: &L7EvalContext, request_info: &L7RequestInfo, path: &str, reason: &str) { + if let Some(tx) = &ctx.denial_tx { + let _ = tx.send(DenialEvent { + host: ctx.host.clone(), + port: ctx.port, + binary: ctx.binary_path.clone(), + ancestors: ctx.ancestors.clone(), + deny_reason: reason.to_string(), + denial_stage: "l7".to_string(), + l7_method: Some(request_info.action.clone()), + l7_path: Some(path.to_string()), + }); + } } fn emit_activity(ctx: &L7EvalContext, denied: bool, deny_group: &'static str) { @@ -774,6 +801,9 @@ where )) .build(); ocsf_emit!(event); + if decision_str == "deny" { + emit_l7_denial(ctx, &request_info, &redacted_target, &reason); + } } // Store the resolved target for the deny response redaction @@ -1424,6 +1454,7 @@ network_policies: activity_tx: None, dynamic_credentials: Some(fixture.dynamic_credentials()), token_grant_resolver: Some(fixture.resolver()), + denial_tx: None, }; (config, tunnel_engine, ctx, fixture) @@ -1467,6 +1498,7 @@ network_policies: activity_tx: None, dynamic_credentials: Some(fixture.dynamic_credentials()), token_grant_resolver: Some(fixture.resolver()), + denial_tx: None, }; (generation_guard, ctx, fixture) @@ -1482,6 +1514,53 @@ network_policies: .count() } + /// An L7 deny must feed the denial aggregator with the observed + /// method/path so observation-driven analysis can propose path-aware + /// rules; allows must not. + #[test] + fn l7_deny_emits_denial_event_with_method_and_path() { + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let ctx = L7EvalContext { + host: "api.example.test".into(), + port: 443, + policy_name: "rest_api".into(), + binary_path: "/usr/bin/gh".into(), + ancestors: vec!["/bin/bash".into()], + cmdline_paths: vec![], + secret_resolver: None, + activity_tx: None, + dynamic_credentials: None, + token_grant_resolver: None, + denial_tx: Some(tx), + }; + let request = L7RequestInfo { + action: "GET".into(), + target: "/user".into(), + query_params: std::collections::HashMap::new(), + graphql: None, + }; + + emit_l7_request_log( + &ctx, + &request, + "/user", + "deny", + "l7", + "GET /user not permitted by policy", + None, + ); + let event = rx.try_recv().expect("deny must emit a denial event"); + assert_eq!(event.host, "api.example.test"); + assert_eq!(event.port, 443); + assert_eq!(event.binary, "/usr/bin/gh"); + assert_eq!(event.denial_stage, "l7"); + assert_eq!(event.l7_method.as_deref(), Some("GET")); + assert_eq!(event.l7_path.as_deref(), Some("/user")); + + emit_l7_request_log(&ctx, &request, "/user", "allow", "l7", "", None); + assert!(rx.try_recv().is_err(), "allow must not emit a denial event"); + } + #[test] fn parse_rejection_detail_adds_l7_hint_for_encoded_slash() { let detail = parse_rejection_detail( @@ -1786,6 +1865,7 @@ network_policies: activity_tx: None, dynamic_credentials: None, token_grant_resolver: None, + denial_tx: None, }; let request = L7RequestInfo { action: "WEBSOCKET_TEXT".into(), @@ -1844,6 +1924,7 @@ network_policies: activity_tx: None, dynamic_credentials: None, token_grant_resolver: None, + denial_tx: None, }; let (mut app, mut relay_client) = tokio::io::duplex(8192); @@ -1951,6 +2032,7 @@ network_policies: activity_tx: None, dynamic_credentials: None, token_grant_resolver: None, + denial_tx: None, }; let (mut app, mut relay_client) = tokio::io::duplex(8192); @@ -2071,6 +2153,7 @@ network_policies: activity_tx: None, dynamic_credentials: None, token_grant_resolver: None, + denial_tx: None, }; let (mut app, mut relay_client) = tokio::io::duplex(8192); @@ -2244,6 +2327,7 @@ network_policies: activity_tx: None, dynamic_credentials: None, token_grant_resolver: None, + denial_tx: None, }; let (mut app, mut relay_client) = tokio::io::duplex(8192); @@ -2334,6 +2418,7 @@ network_policies: activity_tx: None, dynamic_credentials: None, token_grant_resolver: None, + denial_tx: None, }; let (mut app, mut relay_client) = tokio::io::duplex(8192); diff --git a/crates/openshell-supervisor-network/src/l7/token_grant_injection.rs b/crates/openshell-supervisor-network/src/l7/token_grant_injection.rs index 0d7c18e99..259d925fe 100644 --- a/crates/openshell-supervisor-network/src/l7/token_grant_injection.rs +++ b/crates/openshell-supervisor-network/src/l7/token_grant_injection.rs @@ -735,6 +735,7 @@ mod tests { activity_tx: None, dynamic_credentials: Some(fixture.dynamic_credentials()), token_grant_resolver: Some(fixture.resolver()), + denial_tx: None, }; let req = L7Request { action: "GET".to_string(), @@ -772,6 +773,7 @@ mod tests { activity_tx: None, dynamic_credentials: Some(fixture.dynamic_credentials()), token_grant_resolver: Some(fixture.resolver()), + denial_tx: None, }; let req = L7Request { action: "GET".to_string(), diff --git a/crates/openshell-supervisor-network/src/l7/websocket.rs b/crates/openshell-supervisor-network/src/l7/websocket.rs index 31aa35509..ebb3a4656 100644 --- a/crates/openshell-supervisor-network/src/l7/websocket.rs +++ b/crates/openshell-supervisor-network/src/l7/websocket.rs @@ -1273,6 +1273,7 @@ network_policies: activity_tx: None, dynamic_credentials: None, token_grant_resolver: None, + denial_tx: None, }; let (mut client_write, mut relay_read) = tokio::io::duplex(MAX_TEXT_MESSAGE_BYTES + 1024); let (mut relay_write, mut upstream_read) = tokio::io::duplex(MAX_TEXT_MESSAGE_BYTES + 1024); diff --git a/crates/openshell-supervisor-network/src/proxy.rs b/crates/openshell-supervisor-network/src/proxy.rs index d467b022e..b357af002 100644 --- a/crates/openshell-supervisor-network/src/proxy.rs +++ b/crates/openshell-supervisor-network/src/proxy.rs @@ -970,6 +970,7 @@ async fn handle_tcp_connection( token_grant_resolver: dynamic_credentials .as_ref() .map(|_| crate::l7::token_grant_injection::default_resolver()), + denial_tx: denial_tx.clone(), }; if effective_tls_skip { @@ -3215,6 +3216,7 @@ async fn handle_forward_proxy( token_grant_resolver: dynamic_credentials .as_ref() .map(|_| crate::l7::token_grant_injection::default_resolver()), + denial_tx: denial_tx.cloned(), }; let mut l7_activity_pending = false; @@ -4293,6 +4295,7 @@ mod tests { activity_tx: None, dynamic_credentials: Some(fixture.dynamic_credentials()), token_grant_resolver: Some(fixture.resolver()), + denial_tx: None, }; (ctx, fixture) @@ -4351,6 +4354,7 @@ mod tests { activity_tx: None, dynamic_credentials: None, token_grant_resolver: None, + denial_tx: None, }; (config, tunnel_engine, ctx) } @@ -4519,6 +4523,7 @@ mod tests { activity_tx: None, dynamic_credentials: None, token_grant_resolver: None, + denial_tx: None, }; let query_params = std::collections::HashMap::new(); @@ -4562,6 +4567,7 @@ mod tests { activity_tx: None, dynamic_credentials: None, token_grant_resolver: None, + denial_tx: None, }; let query_params = std::collections::HashMap::new(); let config = websocket_l7_config(crate::l7::L7Protocol::Rest, false);