Compare commits

2 Commits

Author          SHA1        Message                                                       Date
Conrad Ludgate  cd277a1420  fmt                                                           2023-12-08 15:58:17 +00:00
Conrad Ludgate  ff34746d21  proxy: track control-plane durations per connection request   2023-12-08 12:29:50 +00:00
11 changed files with 130 additions and 91 deletions
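The substantive change is ff34746d21: a LatencyTimer is threaded through the console/control-plane API so that, for every connection request, the proxy records how much time was spent talking to the control plane, separately from the total connection latency. cd277a1420 is a follow-up rustfmt pass. The mechanism is a pair of RAII guards on the timer; a minimal sketch of the pattern (simplified from the diff below, which also tracks total latency and Prometheus label data):

use std::time::{Duration, Instant};

pub struct LatencyTimer {
    // control-plane stopwatch; the real struct also carries a total-latency
    // stopwatch plus protocol/cache_miss/pool_miss/outcome labels
    start_cp: Option<Instant>,
    accumulated_cp: Duration,
}

pub struct LatencyTimerControlPlane<'a> {
    timer: &'a mut LatencyTimer,
}

impl LatencyTimer {
    /// The control-plane clock runs from this call until the guard drops.
    pub fn control_plane(&mut self) -> LatencyTimerControlPlane<'_> {
        self.start_cp = Some(Instant::now());
        LatencyTimerControlPlane { timer: self }
    }
}

impl Drop for LatencyTimerControlPlane<'_> {
    fn drop(&mut self) {
        if let Some(start) = self.timer.start_cp.take() {
            self.timer.accumulated_cp += start.elapsed();
        }
    }
}

Because the guard borrows the timer mutably, the borrow checker guarantees the clock is stopped before the timer can be used again.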

View File

@@ -189,7 +189,7 @@ async fn auth_quirks(
let AuthInfo {
secret,
allowed_ips,
} = api.get_auth_info(extra, &info).await?;
} = api.get_auth_info(extra, &info, latency_timer).await?;
// check allowed list
if !check_peer_addr_is_in_list(&info.inner.peer_addr, &allowed_ips) {
@@ -255,7 +255,9 @@ async fn auth_and_wake_compute(
let mut num_retries = 0;
let mut node = loop {
let wake_res = api.wake_compute(extra, &compute_credentials.info).await;
let wake_res = api
.wake_compute(extra, &compute_credentials.info, latency_timer)
.await;
match handle_try_wake(wake_res, num_retries) {
Err(e) => {
error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node");
@@ -388,12 +390,13 @@ impl BackendType<'_, ComputeUserInfo> {
pub async fn get_allowed_ips(
&self,
extra: &ConsoleReqExtra<'_>,
latency_timer: &mut LatencyTimer,
) -> Result<Arc<Vec<String>>, GetAuthInfoError> {
use BackendType::*;
match self {
Console(api, creds) => api.get_allowed_ips(extra, creds).await,
Console(api, creds) => api.get_allowed_ips(extra, creds, latency_timer).await,
#[cfg(feature = "testing")]
Postgres(api, creds) => api.get_allowed_ips(extra, creds).await,
Postgres(api, creds) => api.get_allowed_ips(extra, creds, latency_timer).await,
Link(_) => Ok(Arc::new(vec![])),
#[cfg(test)]
Test(x) => x.get_allowed_ips(),
@@ -405,13 +408,22 @@ impl BackendType<'_, ComputeUserInfo> {
pub async fn wake_compute(
&self,
extra: &ConsoleReqExtra<'_>,
latency_timer: &mut LatencyTimer,
) -> Result<Option<CachedNodeInfo>, console::errors::WakeComputeError> {
use BackendType::*;
match self {
Console(api, creds) => api.wake_compute(extra, creds).map_ok(Some).await,
Console(api, creds) => {
api.wake_compute(extra, creds, latency_timer)
.map_ok(Some)
.await
}
#[cfg(feature = "testing")]
Postgres(api, creds) => api.wake_compute(extra, creds).map_ok(Some).await,
Postgres(api, creds) => {
api.wake_compute(extra, creds, latency_timer)
.map_ok(Some)
.await
}
Link(_) => Ok(None),
#[cfg(test)]
Test(x) => x.wake_compute().map(Some),

View File

@@ -33,7 +33,7 @@ pub(super) async fn authenticate(
config.scram_protocol_timeout,
async {
// pause the timer while we communicate with the client
let _paused = latency_timer.pause();
let _paused = latency_timer.wait_for_user();
flow.begin(scram).await.map_err(|error| {
warn!(?error, "error sending scram acknowledgement");

View File

@@ -24,7 +24,7 @@ pub async fn authenticate_cleartext(
warn!("cleartext auth flow override is enabled, proceeding");
// pause the timer while we communicate with the client
let _paused = latency_timer.pause();
let _paused = latency_timer.wait_for_user();
let auth_outcome = AuthFlow::new(client)
.begin(auth::CleartextPassword(secret))
@@ -54,7 +54,7 @@ pub async fn password_hack_no_authentication(
warn!("project not specified, resorting to the password hack auth flow");
// pause the timer while we communicate with the client
let _paused = latency_timer.pause();
let _paused = latency_timer.wait_for_user();
let payload = AuthFlow::new(client)
.begin(auth::PasswordHack)
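In these client-facing auth flows the binding is deliberately named _paused rather than _: in Rust, let _ = value drops the value immediately, while let _paused = value keeps the guard alive to the end of the scope, which is what keeps the stopwatch paused for the whole client exchange. A self-contained illustration of that drop-timing rule (hypothetical Guard type):

struct Guard;

impl Drop for Guard {
    fn drop(&mut self) {
        println!("guard dropped");
    }
}

fn main() {
    let _named = Guard; // lives until the end of main
    let _ = Guard;      // the `_` pattern drops the value right here
    println!("end of main");
    // output: "guard dropped", "end of main", "guard dropped"
}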

View File

@@ -6,7 +6,9 @@ use super::messages::MetricsAuxInfo;
use crate::{
auth::backend::ComputeUserInfo,
cache::{timed_lru, TimedLru},
compute, scram,
compute,
proxy::LatencyTimer,
scram,
};
use async_trait::async_trait;
use dashmap::DashMap;
@@ -250,12 +252,14 @@ pub trait Api {
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo,
latency_timer: &mut LatencyTimer,
) -> Result<AuthInfo, errors::GetAuthInfoError>;
async fn get_allowed_ips(
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo,
latency_timer: &mut LatencyTimer,
) -> Result<Arc<Vec<String>>, errors::GetAuthInfoError>;
/// Wake up the compute node and return the corresponding connection info.
@@ -263,6 +267,7 @@ pub trait Api {
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo,
latency_timer: &mut LatencyTimer,
) -> Result<CachedNodeInfo, errors::WakeComputeError>;
}
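Adding latency_timer: &mut LatencyTimer to the trait methods forces every implementation to accept the timer, which is why the mock backend in the next file takes it as _latency_timer and ignores it. A minimal sketch of that shape (the LatencyTimer stub stands in for the type sketched at the top of this compare):

use async_trait::async_trait;

pub struct LatencyTimer; // stub: see the earlier sketch

#[async_trait]
pub trait Api {
    async fn wake_compute(&self, latency_timer: &mut LatencyTimer) -> Result<(), String>;
}

pub struct MockApi;

#[async_trait]
impl Api for MockApi {
    // The leading underscore keeps the signature compatible while
    // silencing the unused-variable lint: a mock has nothing to time.
    async fn wake_compute(&self, _latency_timer: &mut LatencyTimer) -> Result<(), String> {
        Ok(())
    }
}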

View File

@@ -6,6 +6,7 @@ use super::{
errors::{ApiError, GetAuthInfoError, WakeComputeError},
AuthInfo, AuthSecret, CachedNodeInfo, ConsoleReqExtra, NodeInfo,
};
use crate::proxy::LatencyTimer;
use crate::{auth::backend::ComputeUserInfo, compute, error::io_error, scram, url::ApiUrl};
use async_trait::async_trait;
use futures::TryFutureExt;
@@ -146,6 +147,7 @@ impl super::Api for Api {
&self,
_extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo,
_latency_timer: &mut LatencyTimer,
) -> Result<AuthInfo, GetAuthInfoError> {
self.do_get_auth_info(creds).await
}
@@ -154,6 +156,7 @@ impl super::Api for Api {
&self,
_extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo,
_latency_timer: &mut LatencyTimer,
) -> Result<Arc<Vec<String>>, GetAuthInfoError> {
Ok(Arc::new(self.do_get_auth_info(creds).await?.allowed_ips))
}
@@ -163,6 +166,7 @@ impl super::Api for Api {
&self,
_extra: &ConsoleReqExtra<'_>,
_creds: &ComputeUserInfo,
_latency_timer: &mut LatencyTimer,
) -> Result<CachedNodeInfo, WakeComputeError> {
self.do_wake_compute()
.map_ok(CachedNodeInfo::new_uncached)

View File

@@ -5,7 +5,7 @@ use super::{
errors::{ApiError, GetAuthInfoError, WakeComputeError},
ApiCaches, ApiLocks, AuthInfo, AuthSecret, CachedNodeInfo, ConsoleReqExtra, NodeInfo,
};
use crate::proxy::{ALLOWED_IPS_BY_CACHE_OUTCOME, ALLOWED_IPS_NUMBER};
use crate::proxy::{LatencyTimer, ALLOWED_IPS_BY_CACHE_OUTCOME, ALLOWED_IPS_NUMBER};
use crate::{auth::backend::ComputeUserInfo, compute, http, scram};
use async_trait::async_trait;
use futures::TryFutureExt;
@@ -158,7 +158,9 @@ impl super::Api for Api {
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo,
latency_timer: &mut LatencyTimer,
) -> Result<AuthInfo, GetAuthInfoError> {
let _timer = latency_timer.control_plane();
self.do_get_auth_info(extra, creds).await
}
@@ -166,6 +168,7 @@ impl super::Api for Api {
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo,
latency_timer: &mut LatencyTimer,
) -> Result<Arc<Vec<String>>, GetAuthInfoError> {
let key: &str = &creds.endpoint;
if let Some(allowed_ips) = self.caches.allowed_ips.get(key) {
@@ -177,7 +180,11 @@ impl super::Api for Api {
ALLOWED_IPS_BY_CACHE_OUTCOME
.with_label_values(&["miss"])
.inc();
let timer = latency_timer.control_plane();
let allowed_ips = Arc::new(self.do_get_auth_info(extra, creds).await?.allowed_ips);
drop(timer);
self.caches
.allowed_ips
.insert(key.into(), allowed_ips.clone());
@@ -189,6 +196,7 @@ impl super::Api for Api {
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo,
latency_timer: &mut LatencyTimer,
) -> Result<CachedNodeInfo, WakeComputeError> {
let key: &str = &creds.inner.cache_key;
@@ -214,7 +222,10 @@ impl super::Api for Api {
}
}
let timer = latency_timer.control_plane();
let node = self.do_wake_compute(extra, creds).await?;
drop(timer);
let (_, cached) = self.caches.node_info.insert(key.clone(), node);
info!(key = &*key, "created a cache entry for compute node info");
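Note where the guards sit in these caching paths: the control-plane clock starts only after a cache miss is established, wraps just the network call (do_get_auth_info / do_wake_compute), and is stopped with an explicit drop(timer) before the result is written back to the cache, so local bookkeeping is not billed to the control plane. A sketch of the shape, building on the LatencyTimer from the earlier sketch, with a hypothetical fetch_remote standing in for the console request:

use std::collections::HashMap;

async fn fetch_remote(key: &str) -> String {
    key.to_uppercase() // hypothetical stand-in for the console call
}

async fn get_with_cache(
    cache: &mut HashMap<String, String>,
    key: &str,
    latency_timer: &mut LatencyTimer,
) -> String {
    if let Some(hit) = cache.get(key) {
        return hit.clone(); // cache hit: the control-plane clock never starts
    }
    let timer = latency_timer.control_plane();
    let value = fetch_remote(key).await; // only the remote call is measured
    drop(timer); // stop the clock before local bookkeeping
    cache.insert(key.to_string(), value.clone());
    value
}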

View File

@@ -110,6 +110,19 @@ static COMPUTE_CONNECTION_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
.unwrap()
});
static CONTROL_PLANE_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"proxy_compute_connection_control_plane_latency_seconds",
"Time proxy spent talking to control-plane/console while trying to establish a connection to the compute endpoint",
// http/ws/tcp, true/false, true/false, success/failure
// 3 * 2 * 2 * 2 = 24 counters
&["protocol", "cache_miss", "pool_miss", "outcome"],
// largest bucket = 2^15 * 0.5ms ≈ 16.4s
exponential_buckets(0.0005, 2.0, 16).unwrap(),
)
.unwrap()
});
pub static CONSOLE_REQUEST_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"proxy_console_request_latency",
@@ -174,6 +187,10 @@ pub struct LatencyTimer {
start: Option<Instant>,
// accumulated time on the stopwatch
accumulated: std::time::Duration,
// time since the stopwatch was started while talking to control-plane
start_cp: Option<Instant>,
// accumulated time on the stopwatch while talking to control-plane
accumulated_cp: std::time::Duration,
// label data
protocol: &'static str,
cache_miss: bool,
@@ -181,7 +198,11 @@ pub struct LatencyTimer {
outcome: &'static str,
}
pub struct LatencyTimerPause<'a> {
pub struct LatencyTimerUserIO<'a> {
timer: &'a mut LatencyTimer,
}
pub struct LatencyTimerControlPlane<'a> {
timer: &'a mut LatencyTimer,
}
@@ -190,6 +211,8 @@ impl LatencyTimer {
Self {
start: Some(Instant::now()),
accumulated: std::time::Duration::ZERO,
start_cp: None,
accumulated_cp: std::time::Duration::ZERO,
protocol,
cache_miss: false,
// by default we don't do pooling
@@ -199,11 +222,17 @@ impl LatencyTimer {
}
}
pub fn pause(&mut self) -> LatencyTimerPause<'_> {
pub fn control_plane(&mut self) -> LatencyTimerControlPlane<'_> {
// start the control-plane stopwatch
self.start_cp = Some(Instant::now());
LatencyTimerControlPlane { timer: self }
}
pub fn wait_for_user(&mut self) -> LatencyTimerUserIO<'_> {
// stop the stopwatch and record the time that we have accumulated
let start = self.start.take().expect("latency timer should be started");
self.accumulated += start.elapsed();
LatencyTimerPause { timer: self }
LatencyTimerUserIO { timer: self }
}
pub fn cache_miss(&mut self) {
@@ -219,13 +248,25 @@ impl LatencyTimer {
}
}
impl Drop for LatencyTimerPause<'_> {
impl Drop for LatencyTimerUserIO<'_> {
fn drop(&mut self) {
// start the stopwatch again
self.timer.start = Some(Instant::now());
}
}
impl Drop for LatencyTimerControlPlane<'_> {
fn drop(&mut self) {
// stop the control-plane stopwatch and record the time that we have accumulated
let start = self
.timer
.start_cp
.take()
.expect("latency timer should be started");
self.timer.accumulated_cp += start.elapsed();
}
}
impl Drop for LatencyTimer {
fn drop(&mut self) {
let duration =
@@ -237,7 +278,21 @@ impl Drop for LatencyTimer {
bool_to_str(self.pool_miss),
self.outcome,
])
.observe(duration.as_secs_f64())
.observe(duration.as_secs_f64());
let duration_cp = self
.start_cp
.map(|start| start.elapsed())
.unwrap_or_default()
+ self.accumulated_cp;
CONTROL_PLANE_LATENCY
.with_label_values(&[
self.protocol,
bool_to_str(self.cache_miss),
bool_to_str(self.pool_miss),
self.outcome,
])
.observe(duration_cp.as_secs_f64());
}
}
@@ -695,9 +750,13 @@ where
info!("compute node's state has likely changed; requesting a wake-up");
let node_info = loop {
let wake_res = match creds {
auth::BackendType::Console(api, creds) => api.wake_compute(extra, creds).await,
auth::BackendType::Console(api, creds) => {
api.wake_compute(extra, creds, &mut latency_timer).await
}
#[cfg(feature = "testing")]
auth::BackendType::Postgres(api, creds) => api.wake_compute(extra, creds).await,
auth::BackendType::Postgres(api, creds) => {
api.wake_compute(extra, creds, &mut latency_timer).await
}
// nothing to do?
auth::BackendType::Link(_) => return Err(err.into()),
// test backend
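For reference on the new histogram's range: exponential_buckets(0.0005, 2.0, 16) produces upper bounds that double from 0.5ms, so the largest finite bucket is 0.5ms * 2^15 = 16.384s, with the implicit +Inf bucket above it. A quick check of that arithmetic, assuming the prometheus crate's semantics for exponential_buckets(start, factor, count):

fn main() {
    let (start, factor, count) = (0.0005_f64, 2.0_f64, 16);
    // upper bounds: start * factor^0 ..= start * factor^(count - 1)
    let buckets: Vec<f64> = (0..count).map(|i| start * factor.powi(i)).collect();
    assert_eq!(buckets.len(), 16);
    assert_eq!(buckets[0], 0.0005);               // 0.5 ms
    assert!((buckets[15] - 16.384).abs() < 1e-9); // ≈ 16.4 s
    println!("{buckets:?}");
}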

View File

@@ -405,7 +405,7 @@ async fn connect_to_compute(
conn_info: &ConnInfo,
conn_id: uuid::Uuid,
session_id: uuid::Uuid,
latency_timer: LatencyTimer,
mut latency_timer: LatencyTimer,
peer_addr: IpAddr,
) -> anyhow::Result<ClientInner> {
let tls = config.tls_config.as_ref();
@@ -437,13 +437,13 @@ async fn connect_to_compute(
};
// TODO(anna): this is a bit hacky way, consider using console notification listener.
if !config.disable_ip_check_for_http {
let allowed_ips = backend.get_allowed_ips(&extra).await?;
let allowed_ips = backend.get_allowed_ips(&extra, &mut latency_timer).await?;
if !check_peer_addr_is_in_list(&peer_addr, &allowed_ips) {
return Err(auth::AuthError::ip_address_not_allowed().into());
}
}
let node_info = backend
.wake_compute(&extra)
.wake_compute(&extra, &mut latency_timer)
.await?
.context("missing cache entry from wake_compute")?;

View File

@@ -142,7 +142,7 @@ pub(crate) async fn branch_cleanup_and_check_errors(
.collect();
if !orphan_layers.is_empty() {
result.warnings.push(format!(
result.errors.push(format!(
"index_part.json does not contain layers from S3: {:?}",
orphan_layers
.iter()
@@ -170,7 +170,6 @@ pub(crate) async fn branch_cleanup_and_check_errors(
));
}
}
BlobDataParseResult::Relic => {}
BlobDataParseResult::Incorrect(parse_errors) => result.errors.extend(
parse_errors
.into_iter()
@@ -216,8 +215,6 @@ pub(crate) enum BlobDataParseResult {
index_part_generation: Generation,
s3_layers: HashSet<(LayerFileName, Generation)>,
},
/// The remains of a deleted Timeline (i.e. an initdb archive only)
Relic,
Incorrect(Vec<String>),
}
@@ -248,7 +245,6 @@ pub(crate) async fn list_timeline_blobs(
timeline_dir_target.delimiter = String::new();
let mut index_parts: Vec<ObjectIdentifier> = Vec::new();
let mut initdb_archive: bool = false;
let stream = stream_listing(s3_client, &timeline_dir_target);
pin_mut!(stream);
@@ -262,10 +258,6 @@ pub(crate) async fn list_timeline_blobs(
tracing::info!("Index key {key}");
index_parts.push(obj)
}
Some("initdb.tar.zst") => {
tracing::info!("initdb archive {key}");
initdb_archive = true;
}
Some(maybe_layer_name) => match parse_layer_object_name(maybe_layer_name) {
Ok((new_layer, gen)) => {
tracing::info!("Parsed layer key: {} {:?}", new_layer, gen);
@@ -287,16 +279,6 @@ pub(crate) async fn list_timeline_blobs(
}
}
if index_parts.is_empty() && s3_layers.is_empty() && initdb_archive {
tracing::info!(
"Timeline is empty apart from initdb archive: expected post-deletion state."
);
return Ok(S3TimelineBlobData {
blob_data: BlobDataParseResult::Relic,
keys_to_remove: Vec::new(),
});
}
// Choose the index_part with the highest generation
let (index_part_object, index_part_generation) = match index_parts
.iter()

View File

@@ -86,9 +86,7 @@ impl S3Target {
if new_self.prefix_in_bucket.is_empty() {
new_self.prefix_in_bucket = format!("/{}/", new_segment);
} else {
if new_self.prefix_in_bucket.ends_with('/') {
let _ = new_self.prefix_in_bucket.pop();
}
let _ = new_self.prefix_in_bucket.pop();
new_self.prefix_in_bucket =
[&new_self.prefix_in_bucket, new_segment, ""].join(&new_self.delimiter);
}
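The simplification above is sound provided a non-empty prefix_in_bucket always ends with the delimiter, and within this function both branches maintain that invariant: the format!("/{}/", ...) branch ends with '/', and joining with a trailing empty segment re-appends the delimiter, so the unconditional pop() always removes exactly one trailing delimiter. A small illustration of the join trick (hypothetical values):

fn main() {
    // Joining with a trailing "" element appends one final delimiter.
    let prefix = ["tenants/1234", "timelines", ""].join("/");
    assert_eq!(prefix, "tenants/1234/timelines/");
}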

View File

@@ -20,14 +20,6 @@ pub struct MetadataSummary {
with_warnings: HashSet<TenantTimelineId>,
with_garbage: HashSet<TenantTimelineId>,
indices_by_version: HashMap<usize, usize>,
indices_with_generation: usize,
indices_without_generation: usize,
/// Timelines that couldn't even parse metadata and/or object keys: extremely damaged
invalid_count: usize,
/// Timelines with just an initdb archive, left behind after deletion.
relic_count: usize,
layer_count: MinMaxHisto,
timeline_size_bytes: MinMaxHisto,
@@ -47,8 +39,6 @@ impl MinMaxHisto {
fn new() -> Self {
Self {
histo: histogram::Histogram::builder()
// Accommodate tenant sizes up to 32TiB
.maximum_value(32 * 1024 * 1024 * 1024 * 1024)
.build()
.expect("Bad histogram params"),
min: u64::MAX,
@@ -100,10 +90,6 @@ impl MetadataSummary {
with_warnings: HashSet::new(),
with_garbage: HashSet::new(),
indices_by_version: HashMap::new(),
indices_with_generation: 0,
indices_without_generation: 0,
invalid_count: 0,
relic_count: 0,
layer_count: MinMaxHisto::new(),
timeline_size_bytes: MinMaxHisto::new(),
layer_size_bytes: MinMaxHisto::new(),
@@ -125,35 +111,24 @@ impl MetadataSummary {
fn update_data(&mut self, data: &S3TimelineBlobData) {
self.count += 1;
match &data.blob_data {
BlobDataParseResult::Parsed {
index_part,
index_part_generation,
s3_layers: _,
} => {
*self
.indices_by_version
.entry(index_part.get_version())
.or_insert(0) += 1;
if let BlobDataParseResult::Parsed {
index_part,
index_part_generation: _,
s3_layers: _,
} = &data.blob_data
{
*self
.indices_by_version
.entry(index_part.get_version())
.or_insert(0) += 1;
// These statistics exist to track the transition to generations. By early 2024 there should be zero
// generation-less timelines in the field and this check can be removed.
if index_part_generation.is_none() {
self.indices_without_generation += 1;
} else {
self.indices_with_generation += 1;
}
if let Err(e) = self.update_histograms(index_part) {
// Value out of range? Warn that the results are untrustworthy
tracing::warn!(
"Error updating histograms, summary stats may be wrong: {}",
e
);
}
if let Err(e) = self.update_histograms(index_part) {
// Value out of range? Warn that the results are untrustworthy
tracing::warn!(
"Error updating histograms, summary stats may be wrong: {}",
e
);
}
BlobDataParseResult::Incorrect(_) => self.invalid_count += 1,
BlobDataParseResult::Relic => self.relic_count += 1,
}
}
@@ -181,10 +156,7 @@ impl MetadataSummary {
With errors: {1}
With warnings: {2}
With garbage: {3}
Invalid: {9}
Relics: {10}
Index versions: {version_summary}
Indices with/without generations: {7}/{8}
Timeline size bytes: {4}
Layer size bytes: {5}
Timeline layer count: {6}
@@ -196,10 +168,6 @@ Timeline layer count: {6}
self.timeline_size_bytes.oneline(),
self.layer_size_bytes.oneline(),
self.layer_count.oneline(),
self.indices_with_generation,
self.indices_without_generation,
self.invalid_count,
self.relic_count
)
}
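One subtlety in the update_data rewrite above: the exhaustive match on BlobDataParseResult becomes an if let ... Parsed, so besides dropping the Relic arm it also stops counting Incorrect results (invalid_count is removed along with relic_count), and any variant added later will be ignored here without a compile error. A minimal illustration of that trade-off:

enum ParseResult {
    Parsed,
    Incorrect,
}

fn tally(r: &ParseResult) {
    // Exhaustive: adding a variant to ParseResult breaks compilation
    // here, pointing at every site that needs a decision.
    match r {
        ParseResult::Parsed => println!("parsed"),
        ParseResult::Incorrect => println!("invalid"),
    }

    // Non-exhaustive: unlisted or future variants are silently skipped.
    if let ParseResult::Parsed = r {
        println!("parsed (if let)");
    }
}

fn main() {
    tally(&ParseResult::Parsed);
    tally(&ParseResult::Incorrect);
}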