bascule-workspace/bascule-gateway/src/main.rs
Tyler J King 6c30ae3181 bascule-gateway: BASCULE_DEMO_AUDIT=1 startup synthesizer
Adds an env-gated startup hook that submits one synthetic AuditEvent
(notarize=true) to the AuditPipeline. The flush_loop then submits the
leaf to QM via CreateAnchor on its next cycle, demonstrating the
bascule→QM integration end-to-end without requiring real OIDC sessions
(genesis hasn't lifted the realm yet).

Default off — only triggers if BASCULE_DEMO_AUDIT=1 in pod env. Leaves
no permanent test surface in normal deployments. Slated for removal
once OIDC sessions can drive the path through the auth filter chain;
keeping it default-off makes that removal a no-op for production.

Signed-off-by: Tyler J King <tking@guildhouse.dev>
2026-04-25 06:05:22 -04:00

311 lines
11 KiB
Rust

mod audit_pipeline;
mod auth;
mod breach;
mod ceremony;
mod config;
mod executor;
mod filter;
mod governance_ceremony;
mod http_ceremony;
mod migrations;
mod server;
mod session_manager;
use std::sync::Arc;
use crate::config::BasculeConfig;
use crate::executor::ExecutorRegistry;
use crate::filter::FilterChain;
use crate::server::BasculeGatewayService;
use crate::session_manager::SessionManager;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Workspace has both ring and aws-lc-rs rustls features.
rustls::crypto::ring::default_provider()
.install_default()
.expect("Failed to install rustls crypto provider");
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.json()
.init();
let config = BasculeConfig::from_env()?;
tracing::info!(listen_addr = %config.listen_addr, "Starting Bascule Gateway");
// 1. Load accord
let accord = match std::fs::read_to_string(&config.accord_path) {
Ok(yaml) => {
let accord = accord_core::schema::Accord::load(&yaml)?;
tracing::info!(version = %accord.metadata.version, "Accord loaded");
Arc::new(accord)
}
Err(e) => {
tracing::warn!(path = %config.accord_path, error = %e,
"Accord file not found — OPA policy will deny all unclassified operations");
// Minimal empty accord
let empty_yaml = r#"
apiVersion: guildhouse.io/v1alpha1
kind: Accord
metadata:
name: empty
version: "0.0.0"
previousVersionHash: "none"
authorizingCeremony: bootstrap
effectiveAt: "2025-01-01T00:00:00Z"
expiresAt: "2099-01-01T00:00:00Z"
spec:
trustDomain: guildhouse.local
policy:
bundleHash: "none"
bundlePath: "/policies"
classifications: []
ceremonies: []
ledger:
fidelity: always_notarize
notarize: []
logOnly: []
# sampled omitted — Option<SampledConfig> default None (struct shape:
# {events: [...], sample_rate: N}), `sampled: []` would mis-parse.
reconciliation:
defaultWindow: "24h"
onExpiry: alert
driftResponses: []
controllers: []
roles: []
"#;
Arc::new(accord_core::schema::Accord::load(empty_yaml)
.expect("empty accord must parse"))
}
};
// 2. Connect to database (optional — degrade gracefully for dev without PG)
let db_pool = match sqlx::PgPool::connect(&config.database_url()).await {
Ok(pool) => {
tracing::info!("PostgreSQL connected");
// 3. Run migrations
migrations::run_migrations(&pool).await?;
Some(pool)
}
Err(e) => {
tracing::warn!("PostgreSQL not available ({e}) — running in memory-only mode");
None
}
};
// 4. Create OPA client
let opa_client = Arc::new(accord_opa::OpaClient::new(&config.opa_url));
match opa_client.health_check().await {
Ok(true) => tracing::info!("OPA sidecar is healthy"),
_ => tracing::warn!("OPA sidecar not available — policy filter will deny all requests"),
}
// 5. Build the Kubernetes client (in-cluster or from kubeconfig)
let kube_client = kube::Client::try_default().await?;
tracing::info!("Kubernetes client initialized");
// 6. Session manager (dual-store: DashMap + PG)
let session_manager = Arc::new(SessionManager::new(db_pool.clone()));
if db_pool.is_some() {
let restored = session_manager.restore_from_db().await?;
if restored > 0 {
tracing::info!(restored, "Restored sessions from database");
}
}
// 7. Ceremony manager
let ceremony_manager = if let Some(pool) = &db_pool {
Some(Arc::new(ceremony::CeremonyManager::new(
pool.clone(),
config.session_lifetime_secs,
)))
} else {
None
};
// 8. Audit pipeline
let audit_pipeline = if let Some(pool) = &db_pool {
let pipeline = Arc::new(audit_pipeline::AuditPipeline::new(
pool.clone(),
config.audit_batch_size,
config.qm_endpoint.clone(),
config.cluster_id.clone(),
));
let _flush_handle = pipeline.clone().start_flush_loop(
std::time::Duration::from_secs(config.audit_flush_interval_secs),
);
// F.4 demo entry: synthesize one notarize=true audit event at
// startup so the flush_loop has something to submit to QM. This
// is gated on BASCULE_DEMO_AUDIT=1 (off by default). It's the
// only way to exercise the bascule→QM CreateAnchor path until
// genesis lands the OIDC realm and real operator sessions can
// flow through the auth filter. Remove (or leave as a no-op
// default-off ship hatch) once OIDC works end-to-end.
if std::env::var("BASCULE_DEMO_AUDIT").as_deref() == Ok("1") {
use bascule_core::audit::{AuditEvent, ExecutionResult, ExecutionStatus, PolicyDecision};
use bascule_core::command::{ChangeClassification, CommandRecord};
use bascule_core::session::OperatorIdentity;
let demo_event = AuditEvent {
event_id: uuid::Uuid::new_v4(),
session_id: uuid::Uuid::new_v4(),
operator_identity: OperatorIdentity::Oidc {
issuer: "https://auth.guildhouse.dev/realms/ffc-hetzner-nur01".into(),
subject: "f4-demo".into(),
email: "f4-demo@guildhouse.dev".into(),
},
timestamp: chrono::Utc::now(),
command: CommandRecord {
verb: "demo".into(),
namespace: Some("bascule".into()),
resource_type: Some("audit_event".into()),
resource_name: Some("f4-demo".into()),
parameters: serde_json::json!({"phase": "F.4"}),
},
classification: ChangeClassification::Mutative,
policy_decision: PolicyDecision::allow_all_stub(),
execution_result: ExecutionResult {
status: ExecutionStatus::Success,
summary: "F.4 demo event — bascule-to-QM CreateAnchor integration proof".into(),
resources_affected: 0,
mutations_applied: 0,
},
target_resources: vec![],
target_profile_hash: None,
};
let pipeline_clone = pipeline.clone();
tokio::spawn(async move {
tracing::info!("BASCULE_DEMO_AUDIT=1 — submitting one synthetic audit event for F.4 demo");
pipeline_clone.submit(&demo_event, true).await;
});
}
pipeline
} else {
// No PG — create a pipeline with a lazy pool (will error on submit)
Arc::new(audit_pipeline::AuditPipeline::new(
sqlx::PgPool::connect_lazy("postgresql://unused:unused@localhost/unused")?,
config.audit_batch_size,
config.qm_endpoint.clone(),
config.cluster_id.clone(),
))
};
// 9. Build executor registry
let executor_registry = Arc::new(ExecutorRegistry::new(kube_client.clone()));
// 10. Build filter chain
let auth_provider = Arc::new(auth::OidcAuthProvider::new(
&config.oidc_issuer,
&config.oidc_audience,
));
let filter_chain = Arc::new(FilterChain::new(
auth_provider.clone(),
session_manager.clone(),
executor_registry,
opa_client,
accord,
audit_pipeline,
));
// 11. Spawn background tasks
let reaper_manager = session_manager.clone();
tokio::spawn(async move {
reaper_manager.run_reaper().await;
});
// 11b. Spawn posture polling task for breach detection
let breach_manager = session_manager.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
let breach_response = governance_types::BreachResponse::ReducePosture {
target_level: governance_types::PostureLevel::Lockdown,
};
tracing::info!("Posture breach polling loop started (30s interval)");
loop {
interval.tick().await;
let level = server::read_posture_level().await;
breach_manager
.on_posture_change(level.to_wire(), &breach_response)
.await;
}
});
if let Some(cm) = &ceremony_manager {
let cm = cm.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
loop {
interval.tick().await;
cm.reap_expired_ceremonies().await;
}
});
}
// 12. Governance ceremony service (in-memory store + expiry loop)
let gov_ceremony_store: Arc<dyn bascule_core::ceremony_store::CeremonyStore> =
Arc::new(bascule_core::ceremony_store::InMemoryCeremonyStore::new());
let gov_ceremony_svc = Arc::new(governance_ceremony::GovernanceCeremonyService::new(
gov_ceremony_store.clone(),
));
{
let svc = gov_ceremony_svc.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
loop {
interval.tick().await;
let expired = svc.expire_pending().await;
if expired > 0 {
tracing::info!(expired, "Governance ceremonies expired");
}
}
});
}
// 13. Build gRPC server
let service = BasculeGatewayService::new(
filter_chain,
session_manager,
auth_provider,
ceremony_manager,
);
let addr = config.listen_addr.parse()?;
tracing::info!(%addr, "Bascule Gateway listening");
// Spawn HTTP ceremony server on separate port
let http_state = http_ceremony::CeremonyHttpState {
store: gov_ceremony_store,
};
let http_app = http_ceremony::ceremony_router(http_state);
let http_addr: std::net::SocketAddr = config
.http_listen_addr()
.parse()
.unwrap_or_else(|_| "0.0.0.0:8443".parse().unwrap());
tracing::info!(%http_addr, "Ceremony HTTP server listening");
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(http_addr).await.unwrap();
axum::serve(listener, http_app).await.unwrap();
});
tonic::transport::Server::builder()
.add_service(
bascule_proto::bascule_v1::bascule_gateway_server::BasculeGatewayServer::new(service),
)
.add_service(
bascule_proto::bascule_v1::ceremony_service_server::CeremonyServiceServer::from_arc(
gov_ceremony_svc,
),
)
.serve(addr)
.await?;
Ok(())
}