diff --git a/proxy/src/auth/backend.rs b/proxy/src/auth/backend.rs index 850d3a1837..9cf45c0eec 100644 --- a/proxy/src/auth/backend.rs +++ b/proxy/src/auth/backend.rs @@ -5,7 +5,7 @@ mod link; pub use link::LinkAuthError; use tokio_postgres::config::AuthKeys; -use crate::proxy::{handle_try_wake, retry_after}; +use crate::proxy::{handle_try_wake, retry_after, LatencyTimer}; use crate::{ auth::{self, ClientCredentials}, config::AuthenticationConfig, @@ -134,13 +134,14 @@ async fn auth_quirks_creds( client: &mut stream::PqStream, allow_cleartext: bool, config: &'static AuthenticationConfig, + latency_timer: &mut LatencyTimer, ) -> auth::Result> { // If there's no project so far, that entails that client doesn't // support SNI or other means of passing the endpoint (project) name. // We now expect to see a very specific payload in the place of password. if creds.project.is_none() { // Password will be checked by the compute node later. - return hacks::password_hack(creds, client).await; + return hacks::password_hack(creds, client, latency_timer).await; } // Password hack should set the project name. @@ -151,11 +152,11 @@ async fn auth_quirks_creds( // Currently, we use it for websocket connections (latency). if allow_cleartext { // Password will be checked by the compute node later. - return hacks::cleartext_hack(client).await; + return hacks::cleartext_hack(client, latency_timer).await; } // Finally, proceed with the main auth flow (SCRAM-based). - classic::authenticate(api, extra, creds, client, config).await + classic::authenticate(api, extra, creds, client, config, latency_timer).await } /// True to its name, this function encapsulates our current auth trade-offs. @@ -167,8 +168,18 @@ async fn auth_quirks( client: &mut stream::PqStream, allow_cleartext: bool, config: &'static AuthenticationConfig, + latency_timer: &mut LatencyTimer, ) -> auth::Result> { - let auth_stuff = auth_quirks_creds(api, extra, creds, client, allow_cleartext, config).await?; + let auth_stuff = auth_quirks_creds( + api, + extra, + creds, + client, + allow_cleartext, + config, + latency_timer, + ) + .await?; let mut num_retries = 0; let mut node = loop { @@ -233,6 +244,7 @@ impl BackendType<'_, ClientCredentials<'_>> { client: &mut stream::PqStream, allow_cleartext: bool, config: &'static AuthenticationConfig, + latency_timer: &mut LatencyTimer, ) -> auth::Result> { use BackendType::*; @@ -245,7 +257,16 @@ impl BackendType<'_, ClientCredentials<'_>> { ); let api = api.as_ref(); - auth_quirks(api, extra, creds, client, allow_cleartext, config).await? + auth_quirks( + api, + extra, + creds, + client, + allow_cleartext, + config, + latency_timer, + ) + .await? } Postgres(api, creds) => { info!( @@ -255,7 +276,16 @@ impl BackendType<'_, ClientCredentials<'_>> { ); let api = api.as_ref(); - auth_quirks(api, extra, creds, client, allow_cleartext, config).await? + auth_quirks( + api, + extra, + creds, + client, + allow_cleartext, + config, + latency_timer, + ) + .await? } // NOTE: this auth backend doesn't use client credentials. Link(url) => { diff --git a/proxy/src/auth/backend/classic.rs b/proxy/src/auth/backend/classic.rs index e7013a4cac..aee0057606 100644 --- a/proxy/src/auth/backend/classic.rs +++ b/proxy/src/auth/backend/classic.rs @@ -4,6 +4,7 @@ use crate::{ compute, config::AuthenticationConfig, console::{self, AuthInfo, ConsoleReqExtra}, + proxy::LatencyTimer, sasl, scram, stream::PqStream, }; @@ -16,6 +17,7 @@ pub(super) async fn authenticate( creds: &ClientCredentials<'_>, client: &mut PqStream, config: &'static AuthenticationConfig, + latency_timer: &mut LatencyTimer, ) -> auth::Result> { info!("fetching user's authentication info"); let info = api.get_auth_info(extra, creds).await?.unwrap_or_else(|| { @@ -36,24 +38,26 @@ pub(super) async fn authenticate( info!("auth endpoint chooses SCRAM"); let scram = auth::Scram(&secret); - let auth_flow = flow.begin(scram).await.map_err(|error| { - warn!(?error, "error sending scram acknowledgement"); - error - })?; - let auth_outcome = tokio::time::timeout( config.scram_protocol_timeout, - auth_flow.authenticate(), + async { + // pause the timer while we communicate with the client + let _paused = latency_timer.pause(); + + flow.begin(scram).await.map_err(|error| { + warn!(?error, "error sending scram acknowledgement"); + error + })?.authenticate().await.map_err(|error| { + warn!(?error, "error processing scram messages"); + error + }) + } ) .await .map_err(|error| { warn!("error processing scram messages error = authentication timed out, execution time exeeded {} seconds", config.scram_protocol_timeout.as_secs()); auth::io::Error::new(auth::io::ErrorKind::TimedOut, error) - })? - .map_err(|error| { - warn!(?error, "error processing scram messages"); - error - })?; + })??; let client_key = match auth_outcome { sasl::Outcome::Success(key) => key, diff --git a/proxy/src/auth/backend/hacks.rs b/proxy/src/auth/backend/hacks.rs index 53958bf337..895683af1b 100644 --- a/proxy/src/auth/backend/hacks.rs +++ b/proxy/src/auth/backend/hacks.rs @@ -1,6 +1,7 @@ use super::{AuthSuccess, ComputeCredentials}; use crate::{ auth::{self, AuthFlow, ClientCredentials}, + proxy::LatencyTimer, stream, }; use tokio::io::{AsyncRead, AsyncWrite}; @@ -12,8 +13,13 @@ use tracing::{info, warn}; /// use this mechanism for websocket connections. pub async fn cleartext_hack( client: &mut stream::PqStream, + latency_timer: &mut LatencyTimer, ) -> auth::Result> { warn!("cleartext auth flow override is enabled, proceeding"); + + // pause the timer while we communicate with the client + let _paused = latency_timer.pause(); + let password = AuthFlow::new(client) .begin(auth::CleartextPassword) .await? @@ -32,8 +38,13 @@ pub async fn cleartext_hack( pub async fn password_hack( creds: &mut ClientCredentials<'_>, client: &mut stream::PqStream, + latency_timer: &mut LatencyTimer, ) -> auth::Result> { warn!("project not specified, resorting to the password hack auth flow"); + + // pause the timer while we communicate with the client + let _paused = latency_timer.pause(); + let payload = AuthFlow::new(client) .begin(auth::PasswordHack) .await? diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index 9a01a27fc4..884aae1651 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -106,17 +106,26 @@ static COMPUTE_CONNECTION_LATENCY: Lazy = Lazy::new(|| { }); pub struct LatencyTimer { - start: Instant, + // time since the stopwatch was started + start: Option, + // accumulated time on the stopwatch + accumulated: std::time::Duration, + // label data protocol: &'static str, cache_miss: bool, pool_miss: bool, outcome: &'static str, } +pub struct LatencyTimerPause<'a> { + timer: &'a mut LatencyTimer, +} + impl LatencyTimer { pub fn new(protocol: &'static str) -> Self { Self { - start: Instant::now(), + start: Some(Instant::now()), + accumulated: std::time::Duration::ZERO, protocol, cache_miss: false, // by default we don't do pooling @@ -126,6 +135,13 @@ impl LatencyTimer { } } + pub fn pause(&mut self) -> LatencyTimerPause<'_> { + // stop the stopwatch and record the time that we have accumulated + let start = self.start.take().expect("latency timer should be started"); + self.accumulated += start.elapsed(); + LatencyTimerPause { timer: self } + } + pub fn cache_miss(&mut self) { self.cache_miss = true; } @@ -139,9 +155,17 @@ impl LatencyTimer { } } +impl Drop for LatencyTimerPause<'_> { + fn drop(&mut self) { + // start the stopwatch again + self.timer.start = Some(Instant::now()); + } +} + impl Drop for LatencyTimer { fn drop(&mut self) { - let duration = self.start.elapsed().as_secs_f64(); + let duration = + self.start.map(|start| start.elapsed()).unwrap_or_default() + self.accumulated; COMPUTE_CONNECTION_LATENCY .with_label_values(&[ self.protocol, @@ -149,7 +173,7 @@ impl Drop for LatencyTimer { bool_to_str(self.pool_miss), self.outcome, ]) - .observe(duration) + .observe(duration.as_secs_f64()) } } @@ -862,10 +886,16 @@ impl Client<'_, S> { application_name: params.get("application_name"), }; - let latency_timer = LatencyTimer::new(mode.protocol_label()); + let mut latency_timer = LatencyTimer::new(mode.protocol_label()); let auth_result = match creds - .authenticate(&extra, &mut stream, mode.allow_cleartext(), config) + .authenticate( + &extra, + &mut stream, + mode.allow_cleartext(), + config, + &mut latency_timer, + ) .await { Ok(auth_result) => auth_result,