use bascule_core::ceremony::CeremonyGrant; use bascule_core::scope::SessionScope; use bascule_core::session::{OperatorIdentity, Session, SessionState}; use chrono::Utc; use dashmap::DashMap; use sqlx::PgPool; use uuid::Uuid; /// Dual-store session manager: DashMap (hot cache) + PostgreSQL (persistence). pub struct SessionManager { sessions: DashMap, db_pool: Option, } impl SessionManager { /// Create a new session manager. Pass None for db_pool in tests or Phase 1 mode. pub fn new(db_pool: Option) -> Self { Self { sessions: DashMap::new(), db_pool, } } /// Create a session from a ceremony grant. pub async fn create_session(&self, grant: &CeremonyGrant) -> anyhow::Result { let session_id = Uuid::new_v4(); let now = Utc::now(); let session = Session { session_id, ceremony_id: grant.ceremony_id, identity: grant.requestor.clone(), scope: grant.granted_scope.clone(), state: SessionState::Active, mutations_used: 0, valid_from: now, expires_at: now + grant.session_lifetime, }; // Insert into hot cache self.sessions.insert(session_id, session.clone()); // Persist to PG if let Some(pool) = &self.db_pool { let (sub, email) = identity_parts(&grant.requestor); let scope_json = serde_json::to_value(&grant.granted_scope)?; sqlx::query( r#" INSERT INTO bascule.sessions (session_id, ceremony_id, operator_sub, operator_email, scope, state, mutations_used, mutation_budget, valid_from, expires_at) VALUES ($1, $2, $3, $4, $5, 'active', 0, $6, $7, $8) "#, ) .bind(session_id) .bind(grant.ceremony_id) .bind(&sub) .bind(&email) .bind(&scope_json) .bind(grant.granted_scope.mutation_budget.map(|b| b as i32)) .bind(session.valid_from) .bind(session.expires_at) .execute(pool) .await?; } tracing::info!( session_id = %session_id, ceremony_id = %grant.ceremony_id, "Session created" ); Ok(session) } /// Look up a session by ID. Checks DashMap first, then PG. pub fn get_session(&self, session_id: &Uuid) -> Option { self.sessions.get(session_id).map(|s| s.clone()) } /// End a session explicitly. Updates both stores. pub async fn end_session(&self, session_id: &Uuid) -> Option { let session = self.sessions.get_mut(session_id).map(|mut s| { s.state = SessionState::Terminated; s.clone() }); if session.is_some() { if let Some(pool) = &self.db_pool { let _ = sqlx::query( "UPDATE bascule.sessions SET state = 'terminated', terminated_at = NOW() WHERE session_id = $1", ) .bind(session_id) .execute(pool) .await; } tracing::info!(%session_id, "Session terminated"); } session } /// Record a mutation against a session. Updates both stores. /// Returns the new mutation count. pub async fn record_mutation(&self, session_id: &Uuid) -> Option { let count = self.sessions.get_mut(session_id).map(|mut s| { s.mutations_used += 1; s.mutations_used }); if let (Some(count), Some(pool)) = (count, &self.db_pool) { let _ = sqlx::query( "UPDATE bascule.sessions SET mutations_used = $2 WHERE session_id = $1", ) .bind(session_id) .bind(count as i32) .execute(pool) .await; } count } /// Restore active sessions from PG into DashMap (crash recovery). pub async fn restore_from_db(&self) -> anyhow::Result { let pool = match &self.db_pool { Some(p) => p, None => return Ok(0), }; let rows = sqlx::query_as::<_, SessionRow>( r#" SELECT session_id, ceremony_id, operator_sub, operator_email, scope, state, mutations_used, mutation_budget, valid_from, expires_at FROM bascule.sessions WHERE state = 'active' AND expires_at > NOW() "#, ) .fetch_all(pool) .await?; let count = rows.len(); for row in rows { let scope: SessionScope = serde_json::from_value(row.scope)?; let session = Session { session_id: row.session_id, ceremony_id: row.ceremony_id, identity: OperatorIdentity::Oidc { issuer: String::new(), subject: row.operator_sub, email: row.operator_email, }, scope, state: SessionState::Active, mutations_used: row.mutations_used as u32, valid_from: row.valid_from, expires_at: row.expires_at, }; self.sessions.insert(session.session_id, session); } if count > 0 { tracing::info!(count, "Restored active sessions from database"); } Ok(count) } /// Background reaper for expired sessions (runs every 30 seconds). pub async fn run_reaper(&self) { let mut interval = tokio::time::interval(std::time::Duration::from_secs(30)); loop { interval.tick().await; let now = Utc::now(); let mut expired = 0u32; for mut entry in self.sessions.iter_mut() { if entry.state == SessionState::Active && now >= entry.expires_at { entry.state = SessionState::Expired; expired += 1; } } if expired > 0 { tracing::info!(expired, "Reaped expired sessions"); // Update PG for expired sessions if let Some(pool) = &self.db_pool { let _ = sqlx::query( "UPDATE bascule.sessions SET state = 'expired' WHERE state = 'active' AND expires_at < NOW()", ) .execute(pool) .await; } } } } /// Create a default read-only scope for the given namespaces. pub fn default_read_scope(namespaces: &[String]) -> SessionScope { use bascule_core::scope::{ ChangePathway, GlobalScope, NamespaceScope, ScopeRule, Verb, }; let ns_scopes = namespaces .iter() .map(|ns| NamespaceScope { namespace: ns.clone(), rules: vec![ScopeRule { api_groups: vec!["".into(), "apps".into(), "batch".into()], resources: vec!["*".into()], verbs: vec![Verb::Get, Verb::List, Verb::Logs], }], workload_profiles: vec![], denied_capabilities: vec![], }) .collect(); SessionScope { namespaces: ns_scopes, global: GlobalScope::default(), pathways: vec![ChangePathway::DryRunOnly], mutation_budget: Some(0), can_delegate: false, } } } #[derive(sqlx::FromRow)] struct SessionRow { session_id: Uuid, ceremony_id: Uuid, operator_sub: String, operator_email: String, scope: serde_json::Value, #[allow(dead_code)] state: String, mutations_used: i32, #[allow(dead_code)] mutation_budget: Option, valid_from: chrono::DateTime, expires_at: chrono::DateTime, } fn identity_parts(identity: &OperatorIdentity) -> (String, String) { match identity { OperatorIdentity::Oidc { subject, email, .. } => (subject.clone(), email.clone()), OperatorIdentity::Spiffe { svid_uri } => (svid_uri.clone(), String::new()), } }