diff --git a/proxy/src/auth/backend/hacks.rs b/proxy/src/auth/backend/hacks.rs index 26cf7a01f2..f7241be4a9 100644 --- a/proxy/src/auth/backend/hacks.rs +++ b/proxy/src/auth/backend/hacks.rs @@ -25,13 +25,16 @@ pub async fn authenticate_cleartext( ctx.set_auth_method(crate::context::AuthMethod::Cleartext); // pause the timer while we communicate with the client - let _paused = ctx.latency_timer.pause(); + let paused = ctx.latency_timer.pause(crate::metrics::Waiting::Client); - let auth_outcome = AuthFlow::new(client) + let auth_flow = AuthFlow::new(client) .begin(auth::CleartextPassword(secret)) - .await? - .authenticate() .await?; + drop(paused); + // cleartext auth is only allowed to the ws/http protocol. + // If we're here, we already received the password in the first message. + // Scram protocol will be executed on the proxy side. + let auth_outcome = auth_flow.authenticate().await?; let keys = match auth_outcome { sasl::Outcome::Success(key) => key, @@ -56,7 +59,7 @@ pub async fn password_hack_no_authentication( ctx.set_auth_method(crate::context::AuthMethod::Cleartext); // pause the timer while we communicate with the client - let _paused = ctx.latency_timer.pause(); + let _paused = ctx.latency_timer.pause(crate::metrics::Waiting::Client); let payload = AuthFlow::new(client) .begin(auth::PasswordHack) diff --git a/proxy/src/auth/flow.rs b/proxy/src/auth/flow.rs index dce73138c6..788381b6c0 100644 --- a/proxy/src/auth/flow.rs +++ b/proxy/src/auth/flow.rs @@ -143,7 +143,7 @@ impl AuthFlow<'_, S, Scram<'_>> { let Scram(secret, ctx) = self.state; // pause the timer while we communicate with the client - let _paused = ctx.latency_timer.pause(); + let _paused = ctx.latency_timer.pause(crate::metrics::Waiting::Client); // Initial client message contains the chosen auth method's name. let msg = self.stream.read_password_message().await?; diff --git a/proxy/src/console/provider/neon.rs b/proxy/src/console/provider/neon.rs index 3088cffa57..3b2e0cc204 100644 --- a/proxy/src/console/provider/neon.rs +++ b/proxy/src/console/provider/neon.rs @@ -74,7 +74,9 @@ impl Api { info!(url = request.url().as_str(), "sending http request"); let start = Instant::now(); + let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Cplane); let response = self.endpoint.execute(request).await?; + drop(pause); info!(duration = ?start.elapsed(), "received http response"); let body = match parse_body::(response).await { Ok(body) => body, @@ -134,7 +136,9 @@ impl Api { info!(url = request.url().as_str(), "sending http request"); let start = Instant::now(); + let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Cplane); let response = self.endpoint.execute(request).await?; + drop(pause); info!(duration = ?start.elapsed(), "received http response"); let body = parse_body::(response).await?; diff --git a/proxy/src/context.rs b/proxy/src/context.rs index 40aa21083f..7ca830cdb4 100644 --- a/proxy/src/context.rs +++ b/proxy/src/context.rs @@ -15,11 +15,12 @@ use crate::{ BranchId, DbName, EndpointId, ProjectId, RoleName, }; +use self::parquet::RequestData; + pub mod parquet; -static LOG_CHAN: OnceCell> = OnceCell::new(); +static LOG_CHAN: OnceCell> = OnceCell::new(); -#[derive(Clone)] /// Context data for a single request to connect to a database. /// /// This data should **not** be used for connection logic, only for observability and limiting purposes. @@ -46,7 +47,7 @@ pub struct RequestMonitoring { // extra // This sender is here to keep the request monitoring channel open while requests are taking place. - sender: Option>, + sender: Option>, pub latency_timer: LatencyTimer, } @@ -172,7 +173,7 @@ impl RequestMonitoring { impl Drop for RequestMonitoring { fn drop(&mut self) { if let Some(tx) = self.sender.take() { - let _: Result<(), _> = tx.send(self.clone()); + let _: Result<(), _> = tx.send(RequestData::from(&*self)); } } } diff --git a/proxy/src/context/parquet.rs b/proxy/src/context/parquet.rs index ba144bb7ba..a2be1c4186 100644 --- a/proxy/src/context/parquet.rs +++ b/proxy/src/context/parquet.rs @@ -74,7 +74,7 @@ pub(crate) const FAILED_UPLOAD_MAX_RETRIES: u32 = 10; // * after each rowgroup write, we check the length of the file and upload to s3 if large enough #[derive(parquet_derive::ParquetRecordWriter)] -struct RequestData { +pub struct RequestData { region: &'static str, protocol: &'static str, /// Must be UTC. The derive macro doesn't like the timezones @@ -99,8 +99,8 @@ struct RequestData { duration_us: u64, } -impl From for RequestData { - fn from(value: RequestMonitoring) -> Self { +impl From<&RequestMonitoring> for RequestData { + fn from(value: &RequestMonitoring) -> Self { Self { session_id: value.session_id, peer_addr: value.peer_addr.to_string(), diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs index 0477176c45..02ebcd6aaa 100644 --- a/proxy/src/metrics.rs +++ b/proxy/src/metrics.rs @@ -7,7 +7,7 @@ use ::metrics::{ use metrics::{register_int_counter, register_int_counter_pair, IntCounter, IntCounterPair}; use once_cell::sync::Lazy; -use tokio::time; +use tokio::time::{self, Instant}; pub static NUM_DB_CONNECTIONS_GAUGE: Lazy = Lazy::new(|| { register_int_counter_pair_vec!( @@ -46,9 +46,9 @@ pub static COMPUTE_CONNECTION_LATENCY: Lazy = Lazy::new(|| { register_histogram_vec!( "proxy_compute_connection_latency_seconds", "Time it took for proxy to establish a connection to the compute endpoint", - // http/ws/tcp, true/false, true/false, success/failure - // 3 * 2 * 2 * 2 = 24 counters - &["protocol", "cache_miss", "pool_miss", "outcome"], + // http/ws/tcp, true/false, true/false, success/failure, client/client_and_cplane + // 3 * 2 * 2 * 2 * 2 = 48 counters + &["protocol", "cache_miss", "pool_miss", "outcome", "excluded"], // largest bucket = 2^16 * 0.5ms = 32s exponential_buckets(0.0005, 2.0, 16).unwrap(), ) @@ -161,12 +161,26 @@ pub static NUM_CANCELLATION_REQUESTS: Lazy = Lazy::new(|| { .unwrap() }); -#[derive(Clone)] +pub enum Waiting { + Cplane, + Client, + Compute, +} + +#[derive(Default)] +struct Accumulated { + cplane: time::Duration, + client: time::Duration, + compute: time::Duration, +} + pub struct LatencyTimer { // time since the stopwatch was started - start: Option, + start: time::Instant, + // time since the stopwatch was stopped + stop: Option, // accumulated time on the stopwatch - pub accumulated: std::time::Duration, + accumulated: Accumulated, // label data protocol: &'static str, cache_miss: bool, @@ -176,13 +190,16 @@ pub struct LatencyTimer { pub struct LatencyTimerPause<'a> { timer: &'a mut LatencyTimer, + start: time::Instant, + waiting_for: Waiting, } impl LatencyTimer { pub fn new(protocol: &'static str) -> Self { Self { - start: Some(time::Instant::now()), - accumulated: std::time::Duration::ZERO, + start: time::Instant::now(), + stop: None, + accumulated: Accumulated::default(), protocol, cache_miss: false, // by default we don't do pooling @@ -192,11 +209,12 @@ impl LatencyTimer { } } - pub fn pause(&mut self) -> LatencyTimerPause<'_> { - // stop the stopwatch and record the time that we have accumulated - let start = self.start.take().expect("latency timer should be started"); - self.accumulated += start.elapsed(); - LatencyTimerPause { timer: self } + pub fn pause(&mut self, waiting_for: Waiting) -> LatencyTimerPause<'_> { + LatencyTimerPause { + timer: self, + start: Instant::now(), + waiting_for, + } } pub fn cache_miss(&mut self) { @@ -209,9 +227,7 @@ impl LatencyTimer { pub fn success(&mut self) { // stop the stopwatch and record the time that we have accumulated - if let Some(start) = self.start.take() { - self.accumulated += start.elapsed(); - } + self.stop = Some(time::Instant::now()); // success self.outcome = "success"; @@ -220,23 +236,42 @@ impl LatencyTimer { impl Drop for LatencyTimerPause<'_> { fn drop(&mut self) { - // start the stopwatch again - self.timer.start = Some(time::Instant::now()); + let dur = self.start.elapsed(); + match self.waiting_for { + Waiting::Cplane => self.timer.accumulated.cplane += dur, + Waiting::Client => self.timer.accumulated.client += dur, + Waiting::Compute => self.timer.accumulated.compute += dur, + } } } impl Drop for LatencyTimer { fn drop(&mut self) { - let duration = - self.start.map(|start| start.elapsed()).unwrap_or_default() + self.accumulated; + let duration = self + .stop + .unwrap_or_else(time::Instant::now) + .duration_since(self.start); + // Excluding cplane communication from the accumulated time. COMPUTE_CONNECTION_LATENCY .with_label_values(&[ self.protocol, bool_to_str(self.cache_miss), bool_to_str(self.pool_miss), self.outcome, + "client", ]) - .observe(duration.as_secs_f64()) + .observe((duration.saturating_sub(self.accumulated.client)).as_secs_f64()); + // Exclude client and cplane communication from the accumulated time. + let accumulated_total = self.accumulated.client + self.accumulated.cplane; + COMPUTE_CONNECTION_LATENCY + .with_label_values(&[ + self.protocol, + bool_to_str(self.cache_miss), + bool_to_str(self.pool_miss), + self.outcome, + "client_and_cplane", + ]) + .observe((duration.saturating_sub(accumulated_total)).as_secs_f64()); } } diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index 7848fc2ac2..ab5bf5d494 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -248,7 +248,7 @@ pub async fn handle_client( let tls = config.tls_config.as_ref(); - let pause = ctx.latency_timer.pause(); + let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Client); let do_handshake = handshake(stream, mode.handshake_tls(tls)); let (mut stream, params) = match tokio::time::timeout(config.handshake_timeout, do_handshake).await?? {