Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
429 changes: 417 additions & 12 deletions crates/openshell-server/src/grpc/policy.rs

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion crates/openshell-server/src/persistence/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,10 +736,15 @@ ORDER BY created_at_ms DESC
}
let payload = draft_chunk_payload_from_record(&record)?;

// Clear the dedup target once a chunk is decided: new observations for
// the same host|port|binary must surface as a fresh pending chunk
// (possibly carrying new L7 evidence) instead of silently folding
// their hit_count into a row the reviewer already acted on.
let result = sqlx::query(
r"
UPDATE objects
SET status = $3, payload = $4, updated_at_ms = $5
SET status = $3, payload = $4, updated_at_ms = $5,
dedup_key = CASE WHEN $3 = 'pending' THEN dedup_key ELSE NULL END
WHERE object_type = $1 AND id = $2
",
)
Expand Down
7 changes: 6 additions & 1 deletion crates/openshell-server/src/persistence/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -758,10 +758,15 @@ ORDER BY "created_at_ms" DESC
}
let payload = draft_chunk_payload_from_record(&record)?;

// Clear the dedup target once a chunk is decided: new observations for
// the same host|port|binary must surface as a fresh pending chunk
// (possibly carrying new L7 evidence) instead of silently folding
// their hit_count into a row the reviewer already acted on.
let result = sqlx::query(
r#"
UPDATE "objects"
SET "status" = ?3, "payload" = ?4, "updated_at_ms" = ?5
SET "status" = ?3, "payload" = ?4, "updated_at_ms" = ?5,
"dedup_key" = CASE WHEN ?3 = 'pending' THEN "dedup_key" ELSE NULL END
WHERE "object_type" = ?1 AND "id" = ?2
"#,
)
Expand Down
1 change: 1 addition & 0 deletions crates/openshell-supervisor-network/src/l7/graphql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,7 @@ network_policies:
activity_tx: None,
dynamic_credentials: None,
token_grant_resolver: None,
denial_tx: None,
};
let request_info = crate::l7::L7RequestInfo {
action: req.action,
Expand Down
85 changes: 85 additions & 0 deletions crates/openshell-supervisor-network/src/l7/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::l7::{EnforcementMode, L7EndpointConfig, L7Protocol, L7RequestInfo};
use crate::opa::{PolicyGenerationGuard, TunnelPolicyEngine};
use miette::{IntoDiagnostic, Result, miette};
use openshell_core::activity::{ActivitySender, try_record_activity};
use openshell_core::denial::DenialEvent;
use openshell_core::secrets::{self, SecretResolver};
use openshell_ocsf::{
ActionId, ActivityId, DispositionId, Endpoint, HttpActivityBuilder, HttpRequest,
Expand Down Expand Up @@ -51,6 +52,10 @@ pub struct L7EvalContext {
/// Dynamic token grant resolver for endpoint-bound credentials.
pub(crate) token_grant_resolver:
Option<Arc<dyn crate::l7::token_grant_injection::TokenGrantResolver>>,
/// Denial aggregator channel. L7 request denials feed the same
/// observation-driven policy analysis as connect-stage denials, carrying
/// the observed method/path so proposals can be path-aware.
pub(crate) denial_tx: Option<tokio::sync::mpsc::UnboundedSender<DenialEvent>>,
}

#[derive(Default)]
Expand Down Expand Up @@ -464,6 +469,28 @@ fn emit_l7_request_log(
.build();
ocsf_emit!(event);
emit_activity(ctx, decision_str == "deny", "l7_policy");
if decision_str == "deny" {
emit_l7_denial(ctx, request_info, redacted_target, reason);
}
}

/// Feed an L7 request denial to the denial aggregator (if configured) so the
/// observation-driven analysis can propose path-aware rules. The target is
/// already redacted (no query string / credentials), matching what the OCSF
/// log records.
fn emit_l7_denial(ctx: &L7EvalContext, request_info: &L7RequestInfo, path: &str, reason: &str) {
if let Some(tx) = &ctx.denial_tx {
let _ = tx.send(DenialEvent {
host: ctx.host.clone(),
port: ctx.port,
binary: ctx.binary_path.clone(),
ancestors: ctx.ancestors.clone(),
deny_reason: reason.to_string(),
denial_stage: "l7".to_string(),
l7_method: Some(request_info.action.clone()),
l7_path: Some(path.to_string()),
});
}
}

fn emit_activity(ctx: &L7EvalContext, denied: bool, deny_group: &'static str) {
Expand Down Expand Up @@ -774,6 +801,9 @@ where
))
.build();
ocsf_emit!(event);
if decision_str == "deny" {
emit_l7_denial(ctx, &request_info, &redacted_target, &reason);
}
}

// Store the resolved target for the deny response redaction
Expand Down Expand Up @@ -1424,6 +1454,7 @@ network_policies:
activity_tx: None,
dynamic_credentials: Some(fixture.dynamic_credentials()),
token_grant_resolver: Some(fixture.resolver()),
denial_tx: None,
};

(config, tunnel_engine, ctx, fixture)
Expand Down Expand Up @@ -1467,6 +1498,7 @@ network_policies:
activity_tx: None,
dynamic_credentials: Some(fixture.dynamic_credentials()),
token_grant_resolver: Some(fixture.resolver()),
denial_tx: None,
};

(generation_guard, ctx, fixture)
Expand All @@ -1482,6 +1514,53 @@ network_policies:
.count()
}

/// An L7 deny must feed the denial aggregator with the observed
/// method/path so observation-driven analysis can propose path-aware
/// rules; allows must not.
#[test]
fn l7_deny_emits_denial_event_with_method_and_path() {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let ctx = L7EvalContext {
host: "api.example.test".into(),
port: 443,
policy_name: "rest_api".into(),
binary_path: "/usr/bin/gh".into(),
ancestors: vec!["/bin/bash".into()],
cmdline_paths: vec![],
secret_resolver: None,
activity_tx: None,
dynamic_credentials: None,
token_grant_resolver: None,
denial_tx: Some(tx),
};
let request = L7RequestInfo {
action: "GET".into(),
target: "/user".into(),
query_params: std::collections::HashMap::new(),
graphql: None,
};

emit_l7_request_log(
&ctx,
&request,
"/user",
"deny",
"l7",
"GET /user not permitted by policy",
None,
);
let event = rx.try_recv().expect("deny must emit a denial event");
assert_eq!(event.host, "api.example.test");
assert_eq!(event.port, 443);
assert_eq!(event.binary, "/usr/bin/gh");
assert_eq!(event.denial_stage, "l7");
assert_eq!(event.l7_method.as_deref(), Some("GET"));
assert_eq!(event.l7_path.as_deref(), Some("/user"));

emit_l7_request_log(&ctx, &request, "/user", "allow", "l7", "", None);
assert!(rx.try_recv().is_err(), "allow must not emit a denial event");
}

#[test]
fn parse_rejection_detail_adds_l7_hint_for_encoded_slash() {
let detail = parse_rejection_detail(
Expand Down Expand Up @@ -1786,6 +1865,7 @@ network_policies:
activity_tx: None,
dynamic_credentials: None,
token_grant_resolver: None,
denial_tx: None,
};
let request = L7RequestInfo {
action: "WEBSOCKET_TEXT".into(),
Expand Down Expand Up @@ -1844,6 +1924,7 @@ network_policies:
activity_tx: None,
dynamic_credentials: None,
token_grant_resolver: None,
denial_tx: None,
};

let (mut app, mut relay_client) = tokio::io::duplex(8192);
Expand Down Expand Up @@ -1951,6 +2032,7 @@ network_policies:
activity_tx: None,
dynamic_credentials: None,
token_grant_resolver: None,
denial_tx: None,
};

let (mut app, mut relay_client) = tokio::io::duplex(8192);
Expand Down Expand Up @@ -2071,6 +2153,7 @@ network_policies:
activity_tx: None,
dynamic_credentials: None,
token_grant_resolver: None,
denial_tx: None,
};

let (mut app, mut relay_client) = tokio::io::duplex(8192);
Expand Down Expand Up @@ -2244,6 +2327,7 @@ network_policies:
activity_tx: None,
dynamic_credentials: None,
token_grant_resolver: None,
denial_tx: None,
};

let (mut app, mut relay_client) = tokio::io::duplex(8192);
Expand Down Expand Up @@ -2334,6 +2418,7 @@ network_policies:
activity_tx: None,
dynamic_credentials: None,
token_grant_resolver: None,
denial_tx: None,
};

let (mut app, mut relay_client) = tokio::io::duplex(8192);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,7 @@ mod tests {
activity_tx: None,
dynamic_credentials: Some(fixture.dynamic_credentials()),
token_grant_resolver: Some(fixture.resolver()),
denial_tx: None,
};
let req = L7Request {
action: "GET".to_string(),
Expand Down Expand Up @@ -772,6 +773,7 @@ mod tests {
activity_tx: None,
dynamic_credentials: Some(fixture.dynamic_credentials()),
token_grant_resolver: Some(fixture.resolver()),
denial_tx: None,
};
let req = L7Request {
action: "GET".to_string(),
Expand Down
1 change: 1 addition & 0 deletions crates/openshell-supervisor-network/src/l7/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1273,6 +1273,7 @@ network_policies:
activity_tx: None,
dynamic_credentials: None,
token_grant_resolver: None,
denial_tx: None,
};
let (mut client_write, mut relay_read) = tokio::io::duplex(MAX_TEXT_MESSAGE_BYTES + 1024);
let (mut relay_write, mut upstream_read) = tokio::io::duplex(MAX_TEXT_MESSAGE_BYTES + 1024);
Expand Down
6 changes: 6 additions & 0 deletions crates/openshell-supervisor-network/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,7 @@ async fn handle_tcp_connection(
token_grant_resolver: dynamic_credentials
.as_ref()
.map(|_| crate::l7::token_grant_injection::default_resolver()),
denial_tx: denial_tx.clone(),
};

if effective_tls_skip {
Expand Down Expand Up @@ -3215,6 +3216,7 @@ async fn handle_forward_proxy(
token_grant_resolver: dynamic_credentials
.as_ref()
.map(|_| crate::l7::token_grant_injection::default_resolver()),
denial_tx: denial_tx.cloned(),
};
let mut l7_activity_pending = false;

Expand Down Expand Up @@ -4293,6 +4295,7 @@ mod tests {
activity_tx: None,
dynamic_credentials: Some(fixture.dynamic_credentials()),
token_grant_resolver: Some(fixture.resolver()),
denial_tx: None,
};

(ctx, fixture)
Expand Down Expand Up @@ -4351,6 +4354,7 @@ mod tests {
activity_tx: None,
dynamic_credentials: None,
token_grant_resolver: None,
denial_tx: None,
};
(config, tunnel_engine, ctx)
}
Expand Down Expand Up @@ -4519,6 +4523,7 @@ mod tests {
activity_tx: None,
dynamic_credentials: None,
token_grant_resolver: None,
denial_tx: None,
};
let query_params = std::collections::HashMap::new();

Expand Down Expand Up @@ -4562,6 +4567,7 @@ mod tests {
activity_tx: None,
dynamic_credentials: None,
token_grant_resolver: None,
denial_tx: None,
};
let query_params = std::collections::HashMap::new();
let config = websocket_l7_config(crate::l7::L7Protocol::Rest, false);
Expand Down