proxy: exclude client latencies in metrics (#5688)

## Problem

In #5539, I moved the connect_to_compute latency to start counting
before authentication - this is because authentication will perform some
calls to the control plane in order to get credentials and to eagerly
wake a compute server. It felt important to include these times in the
latency metric as these are times we should definitely care about
reducing.

What is not interesting to record in this metric is the roundtrip time
during authentication when we wait for the client to respond.

## Summary of changes

Implement a mechanism to pause the latency timer, resuming on drop of
the pause struct. We pause the timer right before we send the
authentication message to the client, and we resume the timer right
after we complete the authentication flow.
This commit is contained in:
Conrad Ludgate
2023-10-27 18:17:39 +01:00
committed by GitHub
parent c13e932c3b
commit 493b47e1da
4 changed files with 99 additions and 24 deletions

View File

@@ -5,7 +5,7 @@ mod link;
pub use link::LinkAuthError;
use tokio_postgres::config::AuthKeys;
use crate::proxy::{handle_try_wake, retry_after};
use crate::proxy::{handle_try_wake, retry_after, LatencyTimer};
use crate::{
auth::{self, ClientCredentials},
config::AuthenticationConfig,
@@ -134,13 +134,14 @@ async fn auth_quirks_creds(
client: &mut stream::PqStream<impl AsyncRead + AsyncWrite + Unpin>,
allow_cleartext: bool,
config: &'static AuthenticationConfig,
latency_timer: &mut LatencyTimer,
) -> auth::Result<AuthSuccess<ComputeCredentials>> {
// If there's no project so far, that entails that client doesn't
// support SNI or other means of passing the endpoint (project) name.
// We now expect to see a very specific payload in the place of password.
if creds.project.is_none() {
// Password will be checked by the compute node later.
return hacks::password_hack(creds, client).await;
return hacks::password_hack(creds, client, latency_timer).await;
}
// Password hack should set the project name.
@@ -151,11 +152,11 @@ async fn auth_quirks_creds(
// Currently, we use it for websocket connections (latency).
if allow_cleartext {
// Password will be checked by the compute node later.
return hacks::cleartext_hack(client).await;
return hacks::cleartext_hack(client, latency_timer).await;
}
// Finally, proceed with the main auth flow (SCRAM-based).
classic::authenticate(api, extra, creds, client, config).await
classic::authenticate(api, extra, creds, client, config, latency_timer).await
}
/// True to its name, this function encapsulates our current auth trade-offs.
@@ -167,8 +168,18 @@ async fn auth_quirks(
client: &mut stream::PqStream<impl AsyncRead + AsyncWrite + Unpin>,
allow_cleartext: bool,
config: &'static AuthenticationConfig,
latency_timer: &mut LatencyTimer,
) -> auth::Result<AuthSuccess<CachedNodeInfo>> {
let auth_stuff = auth_quirks_creds(api, extra, creds, client, allow_cleartext, config).await?;
let auth_stuff = auth_quirks_creds(
api,
extra,
creds,
client,
allow_cleartext,
config,
latency_timer,
)
.await?;
let mut num_retries = 0;
let mut node = loop {
@@ -233,6 +244,7 @@ impl BackendType<'_, ClientCredentials<'_>> {
client: &mut stream::PqStream<impl AsyncRead + AsyncWrite + Unpin>,
allow_cleartext: bool,
config: &'static AuthenticationConfig,
latency_timer: &mut LatencyTimer,
) -> auth::Result<AuthSuccess<CachedNodeInfo>> {
use BackendType::*;
@@ -245,7 +257,16 @@ impl BackendType<'_, ClientCredentials<'_>> {
);
let api = api.as_ref();
auth_quirks(api, extra, creds, client, allow_cleartext, config).await?
auth_quirks(
api,
extra,
creds,
client,
allow_cleartext,
config,
latency_timer,
)
.await?
}
Postgres(api, creds) => {
info!(
@@ -255,7 +276,16 @@ impl BackendType<'_, ClientCredentials<'_>> {
);
let api = api.as_ref();
auth_quirks(api, extra, creds, client, allow_cleartext, config).await?
auth_quirks(
api,
extra,
creds,
client,
allow_cleartext,
config,
latency_timer,
)
.await?
}
// NOTE: this auth backend doesn't use client credentials.
Link(url) => {

View File

@@ -4,6 +4,7 @@ use crate::{
compute,
config::AuthenticationConfig,
console::{self, AuthInfo, ConsoleReqExtra},
proxy::LatencyTimer,
sasl, scram,
stream::PqStream,
};
@@ -16,6 +17,7 @@ pub(super) async fn authenticate(
creds: &ClientCredentials<'_>,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
config: &'static AuthenticationConfig,
latency_timer: &mut LatencyTimer,
) -> auth::Result<AuthSuccess<ComputeCredentials>> {
info!("fetching user's authentication info");
let info = api.get_auth_info(extra, creds).await?.unwrap_or_else(|| {
@@ -36,24 +38,26 @@ pub(super) async fn authenticate(
info!("auth endpoint chooses SCRAM");
let scram = auth::Scram(&secret);
let auth_flow = flow.begin(scram).await.map_err(|error| {
warn!(?error, "error sending scram acknowledgement");
error
})?;
let auth_outcome = tokio::time::timeout(
config.scram_protocol_timeout,
auth_flow.authenticate(),
async {
// pause the timer while we communicate with the client
let _paused = latency_timer.pause();
flow.begin(scram).await.map_err(|error| {
warn!(?error, "error sending scram acknowledgement");
error
})?.authenticate().await.map_err(|error| {
warn!(?error, "error processing scram messages");
error
})
}
)
.await
.map_err(|error| {
warn!("error processing scram messages error = authentication timed out, execution time exeeded {} seconds", config.scram_protocol_timeout.as_secs());
auth::io::Error::new(auth::io::ErrorKind::TimedOut, error)
})?
.map_err(|error| {
warn!(?error, "error processing scram messages");
error
})?;
})??;
let client_key = match auth_outcome {
sasl::Outcome::Success(key) => key,

View File

@@ -1,6 +1,7 @@
use super::{AuthSuccess, ComputeCredentials};
use crate::{
auth::{self, AuthFlow, ClientCredentials},
proxy::LatencyTimer,
stream,
};
use tokio::io::{AsyncRead, AsyncWrite};
@@ -12,8 +13,13 @@ use tracing::{info, warn};
/// use this mechanism for websocket connections.
pub async fn cleartext_hack(
client: &mut stream::PqStream<impl AsyncRead + AsyncWrite + Unpin>,
latency_timer: &mut LatencyTimer,
) -> auth::Result<AuthSuccess<ComputeCredentials>> {
warn!("cleartext auth flow override is enabled, proceeding");
// pause the timer while we communicate with the client
let _paused = latency_timer.pause();
let password = AuthFlow::new(client)
.begin(auth::CleartextPassword)
.await?
@@ -32,8 +38,13 @@ pub async fn cleartext_hack(
pub async fn password_hack(
creds: &mut ClientCredentials<'_>,
client: &mut stream::PqStream<impl AsyncRead + AsyncWrite + Unpin>,
latency_timer: &mut LatencyTimer,
) -> auth::Result<AuthSuccess<ComputeCredentials>> {
warn!("project not specified, resorting to the password hack auth flow");
// pause the timer while we communicate with the client
let _paused = latency_timer.pause();
let payload = AuthFlow::new(client)
.begin(auth::PasswordHack)
.await?

View File

@@ -106,17 +106,26 @@ static COMPUTE_CONNECTION_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
});
pub struct LatencyTimer {
start: Instant,
// time since the stopwatch was started
start: Option<Instant>,
// accumulated time on the stopwatch
accumulated: std::time::Duration,
// label data
protocol: &'static str,
cache_miss: bool,
pool_miss: bool,
outcome: &'static str,
}
pub struct LatencyTimerPause<'a> {
timer: &'a mut LatencyTimer,
}
impl LatencyTimer {
pub fn new(protocol: &'static str) -> Self {
Self {
start: Instant::now(),
start: Some(Instant::now()),
accumulated: std::time::Duration::ZERO,
protocol,
cache_miss: false,
// by default we don't do pooling
@@ -126,6 +135,13 @@ impl LatencyTimer {
}
}
pub fn pause(&mut self) -> LatencyTimerPause<'_> {
// stop the stopwatch and record the time that we have accumulated
let start = self.start.take().expect("latency timer should be started");
self.accumulated += start.elapsed();
LatencyTimerPause { timer: self }
}
pub fn cache_miss(&mut self) {
self.cache_miss = true;
}
@@ -139,9 +155,17 @@ impl LatencyTimer {
}
}
impl Drop for LatencyTimerPause<'_> {
fn drop(&mut self) {
// start the stopwatch again
self.timer.start = Some(Instant::now());
}
}
impl Drop for LatencyTimer {
fn drop(&mut self) {
let duration = self.start.elapsed().as_secs_f64();
let duration =
self.start.map(|start| start.elapsed()).unwrap_or_default() + self.accumulated;
COMPUTE_CONNECTION_LATENCY
.with_label_values(&[
self.protocol,
@@ -149,7 +173,7 @@ impl Drop for LatencyTimer {
bool_to_str(self.pool_miss),
self.outcome,
])
.observe(duration)
.observe(duration.as_secs_f64())
}
}
@@ -862,10 +886,16 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
application_name: params.get("application_name"),
};
let latency_timer = LatencyTimer::new(mode.protocol_label());
let mut latency_timer = LatencyTimer::new(mode.protocol_label());
let auth_result = match creds
.authenticate(&extra, &mut stream, mode.allow_cleartext(), config)
.authenticate(
&extra,
&mut stream,
mode.allow_cleartext(),
config,
&mut latency_timer,
)
.await
{
Ok(auth_result) => auth_result,