diff --git a/libs/metrics/src/lib.rs b/libs/metrics/src/lib.rs index 2cf3cdeaa7..8e0dbe6ce4 100644 --- a/libs/metrics/src/lib.rs +++ b/libs/metrics/src/lib.rs @@ -256,7 +256,16 @@ fn update_rusage_metrics() { DISK_IO_BYTES .with_label_values(&["write"]) .set(rusage_stats.ru_oublock * BYTES_IN_BLOCK); - MAXRSS_KB.set(rusage_stats.ru_maxrss); + + // On macOS, the unit of maxrss is bytes; on Linux, it's kilobytes. https://stackoverflow.com/a/59915669 + #[cfg(target_os = "macos")] + { + MAXRSS_KB.set(rusage_stats.ru_maxrss / 1024); + } + #[cfg(not(target_os = "macos"))] + { + MAXRSS_KB.set(rusage_stats.ru_maxrss); + } } fn get_rusage_stats() -> libc::rusage { diff --git a/libs/pageserver_api/src/keyspace.rs b/libs/pageserver_api/src/keyspace.rs index 78e4a3d735..a9e19e8cc7 100644 --- a/libs/pageserver_api/src/keyspace.rs +++ b/libs/pageserver_api/src/keyspace.rs @@ -162,6 +162,10 @@ impl KeySpace { .sum() } + pub fn is_empty(&self) -> bool { + self.total_size() == 0 + } + fn overlaps_at(&self, range: &Range) -> Option { match self.ranges.binary_search_by_key(&range.end, |r| r.start) { Ok(0) => None, diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 66bf21ddec..6ce7f286b3 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -105,31 +105,39 @@ pub(crate) static VEC_READ_NUM_LAYERS_VISITED: Lazy = Lazy::new(|| { }); // Metrics collected on operations on the storage repository. +#[derive( + Clone, Copy, enum_map::Enum, strum_macros::EnumString, strum_macros::Display, IntoStaticStr, +)] +pub(crate) enum GetKind { + Singular, + Vectored, +} pub(crate) struct ReconstructTimeMetrics { - ok: Histogram, - err: Histogram, + singular: Histogram, + vectored: Histogram, } pub(crate) static RECONSTRUCT_TIME: Lazy = Lazy::new(|| { let inner = register_histogram_vec!( "pageserver_getpage_reconstruct_seconds", "Time spent in reconstruct_value (reconstruct a page from deltas)", - &["result"], + &["get_kind"], CRITICAL_OP_BUCKETS.into(), ) .expect("failed to define a metric"); + ReconstructTimeMetrics { - ok: inner.get_metric_with_label_values(&["ok"]).unwrap(), - err: inner.get_metric_with_label_values(&["err"]).unwrap(), + singular: inner.with_label_values(&[GetKind::Singular.into()]), + vectored: inner.with_label_values(&[GetKind::Vectored.into()]), } }); impl ReconstructTimeMetrics { - pub(crate) fn for_result(&self, result: &Result) -> &Histogram { - match result { - Ok(_) => &self.ok, - Err(_) => &self.err, + pub(crate) fn for_get_kind(&self, get_kind: GetKind) -> &Histogram { + match get_kind { + GetKind::Singular => &self.singular, + GetKind::Vectored => &self.vectored, } } } @@ -142,13 +150,33 @@ pub(crate) static MATERIALIZED_PAGE_CACHE_HIT_DIRECT: Lazy = Lazy::n .expect("failed to define a metric") }); -pub(crate) static GET_RECONSTRUCT_DATA_TIME: Lazy = Lazy::new(|| { - register_histogram!( +pub(crate) struct ReconstructDataTimeMetrics { + singular: Histogram, + vectored: Histogram, +} + +impl ReconstructDataTimeMetrics { + pub(crate) fn for_get_kind(&self, get_kind: GetKind) -> &Histogram { + match get_kind { + GetKind::Singular => &self.singular, + GetKind::Vectored => &self.vectored, + } + } +} + +pub(crate) static GET_RECONSTRUCT_DATA_TIME: Lazy = Lazy::new(|| { + let inner = register_histogram_vec!( "pageserver_getpage_get_reconstruct_data_seconds", "Time spent in get_reconstruct_value_data", + &["get_kind"], CRITICAL_OP_BUCKETS.into(), ) - .expect("failed to define a metric") + .expect("failed to define a metric"); + + ReconstructDataTimeMetrics { + singular: inner.with_label_values(&[GetKind::Singular.into()]), + vectored: inner.with_label_values(&[GetKind::Vectored.into()]), + } }); pub(crate) static MATERIALIZED_PAGE_CACHE_HIT: Lazy = Lazy::new(|| { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 22b8a17874..703654a37c 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -86,7 +86,7 @@ use crate::{ use crate::config::PageServerConf; use crate::keyspace::{KeyPartitioning, KeySpace}; use crate::metrics::{ - TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT, + GetKind, TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT, }; use crate::pgdatadir_mapping::CalculateLogicalSizeError; use crate::tenant::config::TenantConfOpt; @@ -797,7 +797,9 @@ impl Timeline { img: cached_page_img, }; - let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer(); + let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME + .for_get_kind(GetKind::Singular) + .start_timer(); let path = self .get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx) .await?; @@ -807,7 +809,7 @@ impl Timeline { let res = self.reconstruct_value(key, lsn, reconstruct_state).await; let elapsed = start.elapsed(); crate::metrics::RECONSTRUCT_TIME - .for_result(&res) + .for_get_kind(GetKind::Singular) .observe(elapsed.as_secs_f64()); if cfg!(feature = "testing") && res.is_err() { @@ -969,9 +971,22 @@ impl Timeline { ) -> Result>, GetVectoredError> { let mut reconstruct_state = ValuesReconstructState::new(); + let get_kind = if keyspace.total_size() == 1 { + GetKind::Singular + } else { + GetKind::Vectored + }; + + let get_data_timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME + .for_get_kind(get_kind) + .start_timer(); self.get_vectored_reconstruct_data(keyspace, lsn, &mut reconstruct_state, ctx) .await?; + get_data_timer.stop_and_record(); + let reconstruct_timer = crate::metrics::RECONSTRUCT_TIME + .for_get_kind(get_kind) + .start_timer(); let mut results: BTreeMap> = BTreeMap::new(); let layers_visited = reconstruct_state.get_layers_visited(); for (key, res) in reconstruct_state.keys { @@ -987,6 +1002,7 @@ impl Timeline { } } } + reconstruct_timer.stop_and_record(); // Note that this is an approximation. Tracking the exact number of layers visited // per key requires virtually unbounded memory usage and is inefficient @@ -3127,55 +3143,61 @@ impl Timeline { unmapped_keyspace.remove_overlapping_with(&keys_done_last_step); completed_keyspace.merge(&keys_done_last_step); - let guard = timeline.layers.read().await; - let layers = guard.layer_map(); + // Do not descent any further if the last layer we visited + // completed all keys in the keyspace it inspected. This is not + // required for correctness, but avoids visiting extra layers + // which turns out to be a perf bottleneck in some cases. + if !unmapped_keyspace.is_empty() { + let guard = timeline.layers.read().await; + let layers = guard.layer_map(); - let in_memory_layer = layers.find_in_memory_layer(|l| { - let start_lsn = l.get_lsn_range().start; - cont_lsn > start_lsn - }); + let in_memory_layer = layers.find_in_memory_layer(|l| { + let start_lsn = l.get_lsn_range().start; + cont_lsn > start_lsn + }); - match in_memory_layer { - Some(l) => { - let lsn_range = l.get_lsn_range().start..cont_lsn; - fringe.update( - ReadableLayer::InMemoryLayer(l), - unmapped_keyspace.clone(), - lsn_range, - ); - } - None => { - for range in unmapped_keyspace.ranges.iter() { - let results = layers.range_search(range.clone(), cont_lsn); + match in_memory_layer { + Some(l) => { + let lsn_range = l.get_lsn_range().start..cont_lsn; + fringe.update( + ReadableLayer::InMemoryLayer(l), + unmapped_keyspace.clone(), + lsn_range, + ); + } + None => { + for range in unmapped_keyspace.ranges.iter() { + let results = layers.range_search(range.clone(), cont_lsn); - results - .found - .into_iter() - .map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| { - ( - ReadableLayer::PersistentLayer(guard.get_from_desc(&layer)), - keyspace_accum.to_keyspace(), - lsn_floor..cont_lsn, - ) - }) - .for_each(|(layer, keyspace, lsn_range)| { - fringe.update(layer, keyspace, lsn_range) - }); + results + .found + .into_iter() + .map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| { + ( + ReadableLayer::PersistentLayer(guard.get_from_desc(&layer)), + keyspace_accum.to_keyspace(), + lsn_floor..cont_lsn, + ) + }) + .for_each(|(layer, keyspace, lsn_range)| { + fringe.update(layer, keyspace, lsn_range) + }); + } } } - } - // It's safe to drop the layer map lock after planning the next round of reads. - // The fringe keeps readable handles for the layers which are safe to read even - // if layers were compacted or flushed. - // - // The more interesting consideration is: "Why is the read algorithm still correct - // if the layer map changes while it is operating?". Doing a vectored read on a - // timeline boils down to pushing an imaginary lsn boundary downwards for each range - // covered by the read. The layer map tells us how to move the lsn downwards for a - // range at *a particular point in time*. It is fine for the answer to be different - // at two different time points. - drop(guard); + // It's safe to drop the layer map lock after planning the next round of reads. + // The fringe keeps readable handles for the layers which are safe to read even + // if layers were compacted or flushed. + // + // The more interesting consideration is: "Why is the read algorithm still correct + // if the layer map changes while it is operating?". Doing a vectored read on a + // timeline boils down to pushing an imaginary lsn boundary downwards for each range + // covered by the read. The layer map tells us how to move the lsn downwards for a + // range at *a particular point in time*. It is fine for the answer to be different + // at two different time points. + drop(guard); + } if let Some((layer_to_read, keyspace_to_read, lsn_range)) = fringe.next_layer() { let next_cont_lsn = lsn_range.start; diff --git a/proxy/src/bin/proxy.rs b/proxy/src/bin/proxy.rs index 724024b072..0411bfc908 100644 --- a/proxy/src/bin/proxy.rs +++ b/proxy/src/bin/proxy.rs @@ -403,7 +403,7 @@ async fn main() -> anyhow::Result<()> { maintenance_tasks.spawn(usage_metrics::task_main(metrics_config)); client_tasks.spawn(usage_metrics::task_backup( &metrics_config.backup_metric_collection_config, - cancellation_token, + cancellation_token.clone(), )); } @@ -423,7 +423,10 @@ async fn main() -> anyhow::Result<()> { let cache = api.caches.endpoints_cache.clone(); let con = regional_redis_client; let span = tracing::info_span!("endpoints_cache"); - maintenance_tasks.spawn(async move { cache.do_read(con).await }.instrument(span)); + maintenance_tasks.spawn( + async move { cache.do_read(con, cancellation_token.clone()).await } + .instrument(span), + ); } } } diff --git a/proxy/src/cache/endpoints.rs b/proxy/src/cache/endpoints.rs index 2aa1986d5e..02511e6ff7 100644 --- a/proxy/src/cache/endpoints.rs +++ b/proxy/src/cache/endpoints.rs @@ -4,6 +4,7 @@ use std::{ atomic::{AtomicBool, Ordering}, Arc, }, + time::Duration, }; use dashmap::DashSet; @@ -13,6 +14,7 @@ use redis::{ }; use serde::Deserialize; use tokio::sync::Mutex; +use tokio_util::sync::CancellationToken; use tracing::info; use crate::{ @@ -111,16 +113,22 @@ impl EndpointsCache { pub async fn do_read( &self, mut con: ConnectionWithCredentialsProvider, + cancellation_token: CancellationToken, ) -> anyhow::Result { let mut last_id = "0-0".to_string(); loop { - self.ready.store(false, Ordering::Release); if let Err(e) = con.connect().await { tracing::error!("error connecting to redis: {:?}", e); - continue; + self.ready.store(false, Ordering::Release); } if let Err(e) = self.read_from_stream(&mut con, &mut last_id).await { tracing::error!("error reading from redis: {:?}", e); + self.ready.store(false, Ordering::Release); + } + if cancellation_token.is_cancelled() { + info!("cancellation token is cancelled, exiting"); + tokio::time::sleep(Duration::from_secs(60 * 60 * 24 * 7)).await; + // 1 week. } tokio::time::sleep(self.config.retry_interval).await; }