proxy: add new dimension to exclude cplane latency (#7011)

## Problem

Currently cplane communication is a part of the latency monitoring. It
doesn't allow to setup the proper alerting based on proxy latency.

## Summary of changes

Added dimension to exclude cplane latency.
This commit is contained in:
Anna Khanova
2024-03-13 16:50:05 +04:00
committed by GitHub
parent 0554bee022
commit b0aff04157
7 changed files with 79 additions and 36 deletions

View File

@@ -25,13 +25,16 @@ pub async fn authenticate_cleartext(
ctx.set_auth_method(crate::context::AuthMethod::Cleartext);
// pause the timer while we communicate with the client
let _paused = ctx.latency_timer.pause();
let paused = ctx.latency_timer.pause(crate::metrics::Waiting::Client);
let auth_outcome = AuthFlow::new(client)
let auth_flow = AuthFlow::new(client)
.begin(auth::CleartextPassword(secret))
.await?
.authenticate()
.await?;
drop(paused);
// cleartext auth is only allowed to the ws/http protocol.
// If we're here, we already received the password in the first message.
// Scram protocol will be executed on the proxy side.
let auth_outcome = auth_flow.authenticate().await?;
let keys = match auth_outcome {
sasl::Outcome::Success(key) => key,
@@ -56,7 +59,7 @@ pub async fn password_hack_no_authentication(
ctx.set_auth_method(crate::context::AuthMethod::Cleartext);
// pause the timer while we communicate with the client
let _paused = ctx.latency_timer.pause();
let _paused = ctx.latency_timer.pause(crate::metrics::Waiting::Client);
let payload = AuthFlow::new(client)
.begin(auth::PasswordHack)

View File

@@ -143,7 +143,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AuthFlow<'_, S, Scram<'_>> {
let Scram(secret, ctx) = self.state;
// pause the timer while we communicate with the client
let _paused = ctx.latency_timer.pause();
let _paused = ctx.latency_timer.pause(crate::metrics::Waiting::Client);
// Initial client message contains the chosen auth method's name.
let msg = self.stream.read_password_message().await?;

View File

@@ -74,7 +74,9 @@ impl Api {
info!(url = request.url().as_str(), "sending http request");
let start = Instant::now();
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Cplane);
let response = self.endpoint.execute(request).await?;
drop(pause);
info!(duration = ?start.elapsed(), "received http response");
let body = match parse_body::<GetRoleSecret>(response).await {
Ok(body) => body,
@@ -134,7 +136,9 @@ impl Api {
info!(url = request.url().as_str(), "sending http request");
let start = Instant::now();
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Cplane);
let response = self.endpoint.execute(request).await?;
drop(pause);
info!(duration = ?start.elapsed(), "received http response");
let body = parse_body::<WakeCompute>(response).await?;

View File

@@ -15,11 +15,12 @@ use crate::{
BranchId, DbName, EndpointId, ProjectId, RoleName,
};
use self::parquet::RequestData;
pub mod parquet;
static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestMonitoring>> = OnceCell::new();
static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestData>> = OnceCell::new();
#[derive(Clone)]
/// Context data for a single request to connect to a database.
///
/// This data should **not** be used for connection logic, only for observability and limiting purposes.
@@ -46,7 +47,7 @@ pub struct RequestMonitoring {
// extra
// This sender is here to keep the request monitoring channel open while requests are taking place.
sender: Option<mpsc::UnboundedSender<RequestMonitoring>>,
sender: Option<mpsc::UnboundedSender<RequestData>>,
pub latency_timer: LatencyTimer,
}
@@ -172,7 +173,7 @@ impl RequestMonitoring {
impl Drop for RequestMonitoring {
fn drop(&mut self) {
if let Some(tx) = self.sender.take() {
let _: Result<(), _> = tx.send(self.clone());
let _: Result<(), _> = tx.send(RequestData::from(&*self));
}
}
}

View File

@@ -74,7 +74,7 @@ pub(crate) const FAILED_UPLOAD_MAX_RETRIES: u32 = 10;
// * after each rowgroup write, we check the length of the file and upload to s3 if large enough
#[derive(parquet_derive::ParquetRecordWriter)]
struct RequestData {
pub struct RequestData {
region: &'static str,
protocol: &'static str,
/// Must be UTC. The derive macro doesn't like the timezones
@@ -99,8 +99,8 @@ struct RequestData {
duration_us: u64,
}
impl From<RequestMonitoring> for RequestData {
fn from(value: RequestMonitoring) -> Self {
impl From<&RequestMonitoring> for RequestData {
fn from(value: &RequestMonitoring) -> Self {
Self {
session_id: value.session_id,
peer_addr: value.peer_addr.to_string(),

View File

@@ -7,7 +7,7 @@ use ::metrics::{
use metrics::{register_int_counter, register_int_counter_pair, IntCounter, IntCounterPair};
use once_cell::sync::Lazy;
use tokio::time;
use tokio::time::{self, Instant};
pub static NUM_DB_CONNECTIONS_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
register_int_counter_pair_vec!(
@@ -46,9 +46,9 @@ pub static COMPUTE_CONNECTION_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"proxy_compute_connection_latency_seconds",
"Time it took for proxy to establish a connection to the compute endpoint",
// http/ws/tcp, true/false, true/false, success/failure
// 3 * 2 * 2 * 2 = 24 counters
&["protocol", "cache_miss", "pool_miss", "outcome"],
// http/ws/tcp, true/false, true/false, success/failure, client/client_and_cplane
// 3 * 2 * 2 * 2 * 2 = 48 counters
&["protocol", "cache_miss", "pool_miss", "outcome", "excluded"],
// largest bucket = 2^16 * 0.5ms = 32s
exponential_buckets(0.0005, 2.0, 16).unwrap(),
)
@@ -161,12 +161,26 @@ pub static NUM_CANCELLATION_REQUESTS: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});
#[derive(Clone)]
pub enum Waiting {
Cplane,
Client,
Compute,
}
#[derive(Default)]
struct Accumulated {
cplane: time::Duration,
client: time::Duration,
compute: time::Duration,
}
pub struct LatencyTimer {
// time since the stopwatch was started
start: Option<time::Instant>,
start: time::Instant,
// time since the stopwatch was stopped
stop: Option<time::Instant>,
// accumulated time on the stopwatch
pub accumulated: std::time::Duration,
accumulated: Accumulated,
// label data
protocol: &'static str,
cache_miss: bool,
@@ -176,13 +190,16 @@ pub struct LatencyTimer {
pub struct LatencyTimerPause<'a> {
timer: &'a mut LatencyTimer,
start: time::Instant,
waiting_for: Waiting,
}
impl LatencyTimer {
pub fn new(protocol: &'static str) -> Self {
Self {
start: Some(time::Instant::now()),
accumulated: std::time::Duration::ZERO,
start: time::Instant::now(),
stop: None,
accumulated: Accumulated::default(),
protocol,
cache_miss: false,
// by default we don't do pooling
@@ -192,11 +209,12 @@ impl LatencyTimer {
}
}
pub fn pause(&mut self) -> LatencyTimerPause<'_> {
// stop the stopwatch and record the time that we have accumulated
let start = self.start.take().expect("latency timer should be started");
self.accumulated += start.elapsed();
LatencyTimerPause { timer: self }
pub fn pause(&mut self, waiting_for: Waiting) -> LatencyTimerPause<'_> {
LatencyTimerPause {
timer: self,
start: Instant::now(),
waiting_for,
}
}
pub fn cache_miss(&mut self) {
@@ -209,9 +227,7 @@ impl LatencyTimer {
pub fn success(&mut self) {
// stop the stopwatch and record the time that we have accumulated
if let Some(start) = self.start.take() {
self.accumulated += start.elapsed();
}
self.stop = Some(time::Instant::now());
// success
self.outcome = "success";
@@ -220,23 +236,42 @@ impl LatencyTimer {
impl Drop for LatencyTimerPause<'_> {
fn drop(&mut self) {
// start the stopwatch again
self.timer.start = Some(time::Instant::now());
let dur = self.start.elapsed();
match self.waiting_for {
Waiting::Cplane => self.timer.accumulated.cplane += dur,
Waiting::Client => self.timer.accumulated.client += dur,
Waiting::Compute => self.timer.accumulated.compute += dur,
}
}
}
impl Drop for LatencyTimer {
fn drop(&mut self) {
let duration =
self.start.map(|start| start.elapsed()).unwrap_or_default() + self.accumulated;
let duration = self
.stop
.unwrap_or_else(time::Instant::now)
.duration_since(self.start);
// Excluding cplane communication from the accumulated time.
COMPUTE_CONNECTION_LATENCY
.with_label_values(&[
self.protocol,
bool_to_str(self.cache_miss),
bool_to_str(self.pool_miss),
self.outcome,
"client",
])
.observe(duration.as_secs_f64())
.observe((duration.saturating_sub(self.accumulated.client)).as_secs_f64());
// Exclude client and cplane communication from the accumulated time.
let accumulated_total = self.accumulated.client + self.accumulated.cplane;
COMPUTE_CONNECTION_LATENCY
.with_label_values(&[
self.protocol,
bool_to_str(self.cache_miss),
bool_to_str(self.pool_miss),
self.outcome,
"client_and_cplane",
])
.observe((duration.saturating_sub(accumulated_total)).as_secs_f64());
}
}

View File

@@ -248,7 +248,7 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
let tls = config.tls_config.as_ref();
let pause = ctx.latency_timer.pause();
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Client);
let do_handshake = handshake(stream, mode.handshake_tls(tls));
let (mut stream, params) =
match tokio::time::timeout(config.handshake_timeout, do_handshake).await?? {