//! Connection request monitoring contexts use chrono::Utc; use once_cell::sync::OnceCell; use smol_str::SmolStr; use std::net::IpAddr; use tokio::sync::mpsc; use tracing::{field::display, info_span, Span}; use uuid::Uuid; use crate::{ console::messages::{ColdStartInfo, MetricsAuxInfo}, error::ErrorKind, metrics::{LatencyTimer, ENDPOINT_ERRORS_BY_KIND, ERROR_BY_KIND}, BranchId, DbName, EndpointId, ProjectId, RoleName, }; use self::parquet::RequestData; pub mod parquet; static LOG_CHAN: OnceCell> = OnceCell::new(); /// Context data for a single request to connect to a database. /// /// This data should **not** be used for connection logic, only for observability and limiting purposes. /// All connection logic should instead use strongly typed state machines, not a bunch of Options. pub struct RequestMonitoring { pub peer_addr: IpAddr, pub session_id: Uuid, pub protocol: &'static str, first_packet: chrono::DateTime, region: &'static str, pub span: Span, // filled in as they are discovered project: Option, branch: Option, endpoint_id: Option, dbname: Option, user: Option, application: Option, error_kind: Option, pub(crate) auth_method: Option, success: bool, cold_start_info: Option, // extra // This sender is here to keep the request monitoring channel open while requests are taking place. sender: Option>, pub latency_timer: LatencyTimer, } #[derive(Clone, Debug)] pub enum AuthMethod { // aka link aka passwordless Web, ScramSha256, ScramSha256Plus, Cleartext, } impl RequestMonitoring { pub fn new( session_id: Uuid, peer_addr: IpAddr, protocol: &'static str, region: &'static str, ) -> Self { let span = info_span!( "connect_request", %protocol, ?session_id, %peer_addr, ep = tracing::field::Empty, ); Self { peer_addr, session_id, protocol, first_packet: Utc::now(), region, span, project: None, branch: None, endpoint_id: None, dbname: None, user: None, application: None, error_kind: None, auth_method: None, success: false, cold_start_info: None, sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()), latency_timer: LatencyTimer::new(protocol), } } #[cfg(test)] pub fn test() -> Self { RequestMonitoring::new(Uuid::now_v7(), [127, 0, 0, 1].into(), "test", "test") } pub fn console_application_name(&self) -> String { format!( "{}/{}", self.application.as_deref().unwrap_or_default(), self.protocol ) } pub fn set_cold_start_info(&mut self, info: ColdStartInfo) { self.cold_start_info = Some(info); } pub fn set_project(&mut self, x: MetricsAuxInfo) { self.set_endpoint_id(x.endpoint_id); self.branch = Some(x.branch_id); self.project = Some(x.project_id); self.cold_start_info = x.cold_start_info; } pub fn set_project_id(&mut self, project_id: ProjectId) { self.project = Some(project_id); } pub fn set_endpoint_id(&mut self, endpoint_id: EndpointId) { self.span.record("ep", display(&endpoint_id)); crate::metrics::CONNECTING_ENDPOINTS .with_label_values(&[self.protocol]) .measure(&endpoint_id); self.endpoint_id = Some(endpoint_id); } pub fn set_application(&mut self, app: Option) { self.application = app.or_else(|| self.application.clone()); } pub fn set_dbname(&mut self, dbname: DbName) { self.dbname = Some(dbname); } pub fn set_user(&mut self, user: RoleName) { self.user = Some(user); } pub fn set_auth_method(&mut self, auth_method: AuthMethod) { self.auth_method = Some(auth_method); } pub fn set_error_kind(&mut self, kind: ErrorKind) { ERROR_BY_KIND .with_label_values(&[kind.to_metric_label()]) .inc(); if let Some(ep) = &self.endpoint_id { ENDPOINT_ERRORS_BY_KIND .with_label_values(&[kind.to_metric_label()]) .measure(ep); } self.error_kind = Some(kind); } pub fn set_success(&mut self) { self.success = true; } pub fn log(self) {} } impl Drop for RequestMonitoring { fn drop(&mut self) { if let Some(tx) = self.sender.take() { let _: Result<(), _> = tx.send(RequestData::from(&*self)); } } }