diff --git a/Cargo.lock b/Cargo.lock index 612087c97a..6be08d16b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -152,6 +152,15 @@ dependencies = [ "syn", ] +[[package]] +name = "atomic-polyfill" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d299f547288d6db8d5c3a2916f7b2f66134b15b8c1ac1c4357dd3b8752af7bb2" +dependencies = [ + "critical-section", +] + [[package]] name = "atty" version = "0.2.14" @@ -997,6 +1006,12 @@ dependencies = [ "itertools", ] +[[package]] +name = "critical-section" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6548a0ad5d2549e111e1f6a11a6c2e2d00ce6a3dafe22948d67c2b443f775e52" + [[package]] name = "crossbeam-channel" version = "0.5.6" @@ -1234,6 +1249,47 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "enum-map" +version = "2.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50c25992259941eb7e57b936157961b217a4fc8597829ddef0596d6c3cd86e1a" +dependencies = [ + "enum-map-derive", +] + +[[package]] +name = "enum-map-derive" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a4da76b3b6116d758c7ba93f7ec6a35d2e2cf24feda76c6e38a375f4d5c59f2" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "enumset" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19be8061a06ab6f3a6cf21106c873578bf01bd42ad15e0311a9c76161cb1c753" +dependencies = [ + "enumset_derive", +] + +[[package]] +name = "enumset_derive" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03e7b551eba279bf0fa88b83a46330168c1560a52a94f5126f892f0b364ab3e0" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "env_logger" version = "0.10.0" @@ -1521,6 +1577,15 @@ version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7" +[[package]] +name = "hash32" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47d60b12902ba28e2730cd37e95b8c9223af2808df9e902d4df49588d1470606" +dependencies = [ + "byteorder", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -1548,6 +1613,18 @@ dependencies = [ "hashbrown 0.12.3", ] +[[package]] +name = "heapless" +version = "0.8.0" +source = "git+https://github.com/japaric/heapless.git?rev=644653bf3b831c6bb4963be2de24804acf5e5001#644653bf3b831c6bb4963be2de24804acf5e5001" +dependencies = [ + "atomic-polyfill", + "hash32", + "rustc_version", + "spin 0.9.4", + "stable_deref_trait", +] + [[package]] name = "heck" version = "0.4.1" @@ -2344,6 +2421,8 @@ dependencies = [ "crc32c", "criterion", "crossbeam-utils", + "enum-map", + "enumset", "fail", "futures", "git-version", @@ -2376,6 +2455,8 @@ dependencies = [ "serde_with", "signal-hook", "storage_broker", + "strum", + "strum_macros", "svg_fmt", "tempfile", "tenant_size_model", @@ -2400,6 +2481,7 @@ dependencies = [ "byteorder", "bytes", "const_format", + "enum-map", "postgres_ffi", "serde", "serde_with", @@ -3021,7 +3103,7 @@ dependencies = [ "cc", "libc", "once_cell", - "spin", + "spin 0.5.2", "untrusted", "web-sys", "winapi", @@ -3568,6 +3650,21 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spin" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f6002a767bff9e83f8eeecf883ecb8011875a21ae8da43bffb817a57e78cc09" +dependencies = [ + "lock_api", +] + +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "static_assertions" version = "1.1.0" @@ -4347,6 +4444,7 @@ dependencies = [ "bytes", "criterion", "git-version", + "heapless", "hex", "hex-literal", "hyper", diff --git a/Cargo.toml b/Cargo.toml index c335b64175..9033671f55 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,8 @@ comfy-table = "6.1" const_format = "0.2" crc32c = "0.6" crossbeam-utils = "0.8.5" +enum-map = "2.4.2" +enumset = "1.0.12" fail = "0.5.0" fs2 = "0.4.3" futures = "0.3" @@ -120,6 +122,9 @@ postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", re tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" } tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df61437de0feef49ba2ccdbdd94eb8ad6e142" } +## Other git libraries +heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending + ## Local libraries consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" } metrics = { version = "0.1", path = "./libs/metrics/" } diff --git a/libs/pageserver_api/Cargo.toml b/libs/pageserver_api/Cargo.toml index a3883f07a7..dafb246632 100644 --- a/libs/pageserver_api/Cargo.toml +++ b/libs/pageserver_api/Cargo.toml @@ -13,5 +13,6 @@ bytes.workspace = true byteorder.workspace = true utils.workspace = true postgres_ffi.workspace = true +enum-map.workspace = true workspace_hack.workspace = true diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 6f53f4a01d..c56a48f8ec 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -1,9 +1,14 @@ -use std::num::{NonZeroU64, NonZeroUsize}; +use std::{ + collections::HashMap, + num::{NonZeroU64, NonZeroUsize}, + time::SystemTime, +}; use byteorder::{BigEndian, ReadBytesExt}; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; use utils::{ + history_buffer::HistoryBufferWithDropCounter, id::{NodeId, TenantId, TimelineId}, lsn::Lsn, }; @@ -233,6 +238,82 @@ pub struct LayerMapInfo { pub historic_layers: Vec, } +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, enum_map::Enum)] +#[repr(usize)] +pub enum LayerAccessKind { + GetValueReconstructData, + Iter, + KeyIter, + Dump, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LayerAccessStatFullDetails { + pub when_millis_since_epoch: u64, + pub task_kind: &'static str, + pub access_kind: LayerAccessKind, +} + +/// An event that impacts the layer's residence status. +#[serde_as] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LayerResidenceEvent { + /// The time when the event occurred. + /// NB: this timestamp is captured while the residence status changes. + /// So, it might be behind/ahead of the actual residence change by a short amount of time. + /// + #[serde(rename = "timestamp_millis_since_epoch")] + #[serde_as(as = "serde_with::TimestampMilliSeconds")] + timestamp: SystemTime, + /// The new residence status of the layer. + status: LayerResidenceStatus, + /// The reason why we had to record this event. + reason: LayerResidenceEventReason, +} + +/// The reason for recording a given [`ResidenceEvent`]. +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub enum LayerResidenceEventReason { + /// The layer map is being populated, e.g. during timeline load or attach. + /// This includes [`RemoteLayer`] objects created in [`reconcile_with_remote`]. + /// We need to record such events because there is no persistent storage for the events. + LayerLoad, + /// We just created the layer (e.g., freeze_and_flush or compaction). + /// Such layers are always [`LayerResidenceStatus::Resident`]. + LayerCreate, + /// We on-demand downloaded or evicted the given layer. + ResidenceChange, +} + +/// The residence status of the layer, after the given [`LayerResidenceEvent`]. +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub enum LayerResidenceStatus { + /// Residence status for a layer file that exists locally. + /// It may also exist on the remote, we don't care here. + Resident, + /// Residence status for a layer file that only exists on the remote. + Evicted, +} + +impl LayerResidenceEvent { + pub fn new(status: LayerResidenceStatus, reason: LayerResidenceEventReason) -> Self { + Self { + status, + reason, + timestamp: SystemTime::now(), + } + } +} + +#[derive(Debug, Clone, Serialize)] +pub struct LayerAccessStats { + pub access_count_by_access_kind: HashMap, + pub task_kind_access_flag: Vec<&'static str>, + pub first: Option, + pub accesses_history: HistoryBufferWithDropCounter, + pub residence_events_history: HistoryBufferWithDropCounter, +} + #[serde_as] #[derive(Debug, Clone, Serialize)] #[serde(tag = "kind")] @@ -262,6 +343,7 @@ pub enum HistoricLayerInfo { #[serde_as(as = "DisplayFromStr")] lsn_end: Lsn, remote: bool, + access_stats: LayerAccessStats, }, Image { layer_file_name: String, @@ -270,6 +352,7 @@ pub enum HistoricLayerInfo { #[serde_as(as = "DisplayFromStr")] lsn_start: Lsn, remote: bool, + access_stats: LayerAccessStats, }, } diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index 1ce3155083..92e805ac58 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -11,6 +11,7 @@ async-trait.workspace = true anyhow.workspace = true bincode.workspace = true bytes.workspace = true +heapless.workspace = true hyper = { workspace = true, features = ["full"] } routerify.workspace = true serde.workspace = true diff --git a/libs/utils/src/history_buffer.rs b/libs/utils/src/history_buffer.rs new file mode 100644 index 0000000000..1f07f5560f --- /dev/null +++ b/libs/utils/src/history_buffer.rs @@ -0,0 +1,161 @@ +//! A heapless buffer for events of sorts. + +use std::ops; + +use heapless::HistoryBuffer; + +#[derive(Debug, Clone)] +pub struct HistoryBufferWithDropCounter { + buffer: HistoryBuffer, + drop_count: u64, +} + +impl HistoryBufferWithDropCounter { + pub fn write(&mut self, data: T) { + let len_before = self.buffer.len(); + self.buffer.write(data); + let len_after = self.buffer.len(); + self.drop_count += u64::from(len_before == len_after); + } + pub fn drop_count(&self) -> u64 { + self.drop_count + } + pub fn map U>(&self, f: F) -> HistoryBufferWithDropCounter { + let mut buffer = HistoryBuffer::new(); + buffer.extend(self.buffer.oldest_ordered().map(f)); + HistoryBufferWithDropCounter:: { + buffer, + drop_count: self.drop_count, + } + } +} + +impl Default for HistoryBufferWithDropCounter { + fn default() -> Self { + Self { + buffer: HistoryBuffer::default(), + drop_count: 0, + } + } +} + +impl ops::Deref for HistoryBufferWithDropCounter { + type Target = HistoryBuffer; + + fn deref(&self) -> &Self::Target { + &self.buffer + } +} + +#[derive(serde::Serialize)] +struct SerdeRepr { + buffer: Vec, + drop_count: u64, +} + +impl<'a, T, const L: usize> From<&'a HistoryBufferWithDropCounter> for SerdeRepr +where + T: Clone + serde::Serialize, +{ + fn from(value: &'a HistoryBufferWithDropCounter) -> Self { + let HistoryBufferWithDropCounter { buffer, drop_count } = value; + SerdeRepr { + buffer: buffer.iter().cloned().collect(), + drop_count: *drop_count, + } + } +} + +impl serde::Serialize for HistoryBufferWithDropCounter +where + T: Clone + serde::Serialize, +{ + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + SerdeRepr::from(self).serialize(serializer) + } +} + +#[cfg(test)] +mod test { + use super::HistoryBufferWithDropCounter; + + #[test] + fn test_basics() { + let mut b = HistoryBufferWithDropCounter::<_, 2>::default(); + b.write(1); + b.write(2); + b.write(3); + assert!(b.iter().any(|e| *e == 2)); + assert!(b.iter().any(|e| *e == 3)); + assert!(!b.iter().any(|e| *e == 1)); + } + + #[test] + fn test_drop_count_works() { + let mut b = HistoryBufferWithDropCounter::<_, 2>::default(); + b.write(1); + assert_eq!(b.drop_count(), 0); + b.write(2); + assert_eq!(b.drop_count(), 0); + b.write(3); + assert_eq!(b.drop_count(), 1); + b.write(4); + assert_eq!(b.drop_count(), 2); + } + + #[test] + fn test_clone_works() { + let mut b = HistoryBufferWithDropCounter::<_, 2>::default(); + b.write(1); + b.write(2); + b.write(3); + assert_eq!(b.drop_count(), 1); + let mut c = b.clone(); + assert_eq!(c.drop_count(), 1); + assert!(c.iter().any(|e| *e == 2)); + assert!(c.iter().any(|e| *e == 3)); + assert!(!c.iter().any(|e| *e == 1)); + + c.write(4); + assert!(c.iter().any(|e| *e == 4)); + assert!(!b.iter().any(|e| *e == 4)); + } + + #[test] + fn test_map() { + let mut b = HistoryBufferWithDropCounter::<_, 2>::default(); + + b.write(1); + assert_eq!(b.drop_count(), 0); + { + let c = b.map(|i| i + 10); + assert_eq!(c.oldest_ordered().cloned().collect::>(), vec![11]); + assert_eq!(c.drop_count(), 0); + } + + b.write(2); + assert_eq!(b.drop_count(), 0); + { + let c = b.map(|i| i + 10); + assert_eq!( + c.oldest_ordered().cloned().collect::>(), + vec![11, 12] + ); + assert_eq!(c.drop_count(), 0); + } + + b.write(3); + assert_eq!(b.drop_count(), 1); + { + let c = b.map(|i| i + 10); + assert_eq!( + c.oldest_ordered().cloned().collect::>(), + vec![12, 13] + ); + assert_eq!(c.drop_count(), 1); + } + } +} diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 6d35fd9f7b..9ddd702c72 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -52,6 +52,8 @@ pub mod signals; pub mod fs_ext; +pub mod history_buffer; + /// use with fail::cfg("$name", "return(2000)") #[macro_export] macro_rules! failpoint_sleep_millis_async { diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 66c25e8576..f3ad2c5de6 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -67,6 +67,10 @@ utils.workspace = true workspace_hack.workspace = true reqwest.workspace = true rpds.workspace = true +enum-map.workspace = true +enumset.workspace = true +strum.workspace = true +strum_macros.workspace = true [dev-dependencies] criterion.workspace = true diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index f81f5da84c..134ec1c976 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -19,6 +19,7 @@ use crate::pgdatadir_mapping::LsnForTimestamp; use crate::task_mgr::TaskKind; use crate::tenant::config::TenantConfOpt; use crate::tenant::mgr::TenantMapInsertError; +use crate::tenant::storage_layer::LayerAccessStatsReset; use crate::tenant::{PageReconstructError, Timeline}; use crate::{config::PageServerConf, tenant::mgr}; use utils::{ @@ -529,10 +530,13 @@ async fn tenant_size_handler(request: Request) -> Result, A async fn layer_map_info_handler(request: Request) -> Result, ApiError> { let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; + let reset: LayerAccessStatsReset = + parse_query_param(&request, "reset")?.unwrap_or(LayerAccessStatsReset::NoReset); + check_permission(&request, Some(tenant_id))?; let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?; - let layer_map_info = timeline.layer_map_info(); + let layer_map_info = timeline.layer_map_info(reset); json_response(StatusCode::OK, layer_map_info) } diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index 8e0149046d..c4f213e755 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -169,7 +169,14 @@ task_local! { /// Note that we don't try to limit how many task of a certain kind can be running /// at the same time. /// -#[derive(Debug, PartialEq, Eq, Clone, Copy)] +#[derive( + Debug, + // NB: enumset::EnumSetType derives PartialEq, Eq, Clone, Copy + enumset::EnumSetType, + serde::Serialize, + serde::Deserialize, + strum_macros::IntoStaticStr, +)] pub enum TaskKind { // Pageserver startup, i.e., `main` Startup, diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index d719e19bf9..f7aaebf2af 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -9,13 +9,21 @@ mod remote_layer; use crate::config::PageServerConf; use crate::context::RequestContext; use crate::repository::{Key, Value}; +use crate::task_mgr::TaskKind; use crate::walrecord::NeonWalRecord; use anyhow::Result; use bytes::Bytes; -use pageserver_api::models::HistoricLayerInfo; +use enum_map::EnumMap; +use enumset::EnumSet; +use pageserver_api::models::LayerAccessKind; +use pageserver_api::models::{ + HistoricLayerInfo, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus, +}; use std::ops::Range; use std::path::PathBuf; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; +use std::time::{SystemTime, UNIX_EPOCH}; +use utils::history_buffer::HistoryBufferWithDropCounter; use utils::{ id::{TenantId, TimelineId}, @@ -83,6 +91,149 @@ pub enum ValueReconstructResult { Missing, } +#[derive(Debug)] +pub struct LayerAccessStats(Mutex); + +#[derive(Debug, Default, Clone)] +struct LayerAccessStatsInner { + first_access: Option, + count_by_access_kind: EnumMap, + task_kind_flag: EnumSet, + last_accesses: HistoryBufferWithDropCounter, + last_residence_changes: HistoryBufferWithDropCounter, +} + +#[derive(Debug, Clone)] +struct LayerAccessStatFullDetails { + when: SystemTime, + task_kind: TaskKind, + access_kind: LayerAccessKind, +} + +#[derive(Clone, Copy, strum_macros::EnumString)] +pub enum LayerAccessStatsReset { + NoReset, + JustTaskKindFlags, + AllStats, +} + +fn system_time_to_millis_since_epoch(ts: &SystemTime) -> u64 { + ts.duration_since(UNIX_EPOCH) + .expect("better to die in this unlikely case than report false stats") + .as_millis() + .try_into() + .expect("64 bits is enough for few more years") +} + +impl LayerAccessStatFullDetails { + fn to_api_model(&self) -> pageserver_api::models::LayerAccessStatFullDetails { + let Self { + when, + task_kind, + access_kind, + } = self; + pageserver_api::models::LayerAccessStatFullDetails { + when_millis_since_epoch: system_time_to_millis_since_epoch(when), + task_kind: task_kind.into(), // into static str, powered by strum_macros + access_kind: *access_kind, + } + } +} + +impl LayerAccessStats { + pub(crate) fn for_loading_layer(status: LayerResidenceStatus) -> Self { + let new = LayerAccessStats(Mutex::new(LayerAccessStatsInner::default())); + new.record_residence_event(status, LayerResidenceEventReason::LayerLoad); + new + } + + pub(crate) fn for_new_layer_file() -> Self { + let new = LayerAccessStats(Mutex::new(LayerAccessStatsInner::default())); + new.record_residence_event( + LayerResidenceStatus::Resident, + LayerResidenceEventReason::LayerCreate, + ); + new + } + + /// Creates a clone of `self` and records `new_status` in the clone. + /// The `new_status` is not recorded in `self` + pub(crate) fn clone_for_residence_change( + &self, + new_status: LayerResidenceStatus, + ) -> LayerAccessStats { + let clone = { + let inner = self.0.lock().unwrap(); + inner.clone() + }; + let new = LayerAccessStats(Mutex::new(clone)); + new.record_residence_event(new_status, LayerResidenceEventReason::ResidenceChange); + new + } + + fn record_residence_event( + &self, + status: LayerResidenceStatus, + reason: LayerResidenceEventReason, + ) { + let mut inner = self.0.lock().unwrap(); + inner + .last_residence_changes + .write(LayerResidenceEvent::new(status, reason)); + } + + fn record_access(&self, access_kind: LayerAccessKind, task_kind: TaskKind) { + let mut inner = self.0.lock().unwrap(); + let this_access = LayerAccessStatFullDetails { + when: SystemTime::now(), + task_kind, + access_kind, + }; + inner + .first_access + .get_or_insert_with(|| this_access.clone()); + inner.count_by_access_kind[access_kind] += 1; + inner.task_kind_flag |= task_kind; + inner.last_accesses.write(this_access); + } + fn to_api_model( + &self, + reset: LayerAccessStatsReset, + ) -> pageserver_api::models::LayerAccessStats { + let mut inner = self.0.lock().unwrap(); + let LayerAccessStatsInner { + first_access, + count_by_access_kind, + task_kind_flag, + last_accesses, + last_residence_changes, + } = &*inner; + let ret = pageserver_api::models::LayerAccessStats { + access_count_by_access_kind: count_by_access_kind + .iter() + .map(|(kind, count)| (kind, *count)) + .collect(), + task_kind_access_flag: task_kind_flag + .iter() + .map(|task_kind| task_kind.into()) // into static str, powered by strum_macros + .collect(), + first: first_access.as_ref().map(|a| a.to_api_model()), + accesses_history: last_accesses.map(|m| m.to_api_model()), + residence_events_history: last_residence_changes.clone(), + }; + match reset { + LayerAccessStatsReset::NoReset => (), + LayerAccessStatsReset::JustTaskKindFlags => { + inner.task_kind_flag.clear(); + } + LayerAccessStatsReset::AllStats => { + *inner = LayerAccessStatsInner::default(); + } + } + ret + } +} + /// Supertrait of the [`Layer`] trait that captures the bare minimum interface /// required by [`LayerMap`]. pub trait Layer: Send + Sync { @@ -190,7 +341,9 @@ pub trait PersistentLayer: Layer { /// current_physical_size is computed as the som of this value. fn file_size(&self) -> Option; - fn info(&self) -> HistoricLayerInfo; + fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo; + + fn access_stats(&self) -> &LayerAccessStats; } pub fn downcast_remote_layer( diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 3a23e96325..3b66d5444c 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -37,7 +37,7 @@ use crate::virtual_file::VirtualFile; use crate::{walrecord, TEMP_FILE_SUFFIX}; use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION}; use anyhow::{bail, ensure, Context, Result}; -use pageserver_api::models::HistoricLayerInfo; +use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind}; use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; use std::fs::{self, File}; @@ -55,7 +55,10 @@ use utils::{ lsn::Lsn, }; -use super::{DeltaFileName, Layer, LayerFileName, LayerIter, LayerKeyIter, PathOrConf}; +use super::{ + DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerFileName, LayerIter, + LayerKeyIter, LayerResidenceStatus, PathOrConf, +}; /// /// Header stored in the beginning of the file @@ -185,6 +188,8 @@ pub struct DeltaLayer { pub file_size: u64, + access_stats: LayerAccessStats, + inner: RwLock, } @@ -231,7 +236,7 @@ impl Layer for DeltaLayer { return Ok(()); } - let inner = self.load(ctx)?; + let inner = self.load(LayerAccessKind::Dump, ctx)?; println!( "index_start_blk: {}, root {}", @@ -304,7 +309,7 @@ impl Layer for DeltaLayer { { // Open the file and lock the metadata in memory - let inner = self.load(ctx)?; + let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?; // Scan the page versions backwards, starting from `lsn`. let file = inner.file.as_ref().unwrap(); @@ -395,7 +400,9 @@ impl PersistentLayer for DeltaLayer { } fn iter(&self, ctx: &RequestContext) -> Result> { - let inner = self.load(ctx).context("load delta layer")?; + let inner = self + .load(LayerAccessKind::KeyIter, ctx) + .context("load delta layer")?; Ok(match DeltaValueIter::new(inner) { Ok(iter) => Box::new(iter), Err(err) => Box::new(std::iter::once(Err(err))), @@ -403,7 +410,7 @@ impl PersistentLayer for DeltaLayer { } fn key_iter(&self, ctx: &RequestContext) -> Result> { - let inner = self.load(ctx)?; + let inner = self.load(LayerAccessKind::KeyIter, ctx)?; Ok(Box::new( DeltaKeyIter::new(inner).context("Layer index is corrupted")?, )) @@ -419,18 +426,25 @@ impl PersistentLayer for DeltaLayer { Some(self.file_size) } - fn info(&self) -> HistoricLayerInfo { + fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo { let layer_file_name = self.filename().file_name(); let lsn_range = self.get_lsn_range(); + let access_stats = self.access_stats.to_api_model(reset); + HistoricLayerInfo::Delta { layer_file_name, layer_file_size: Some(self.file_size), lsn_start: lsn_range.start, lsn_end: lsn_range.end, remote: false, + access_stats, } } + + fn access_stats(&self) -> &LayerAccessStats { + &self.access_stats + } } impl DeltaLayer { @@ -475,7 +489,13 @@ impl DeltaLayer { /// Open the underlying file and read the metadata into memory, if it's /// not loaded already. /// - fn load(&self, _ctx: &RequestContext) -> Result> { + fn load( + &self, + access_kind: LayerAccessKind, + ctx: &RequestContext, + ) -> Result> { + self.access_stats + .record_access(access_kind, ctx.task_kind()); loop { // Quick exit if already loaded let inner = self.inner.read().unwrap(); @@ -556,6 +576,7 @@ impl DeltaLayer { tenant_id: TenantId, filename: &DeltaFileName, file_size: u64, + access_stats: LayerAccessStats, ) -> DeltaLayer { DeltaLayer { path_or_conf: PathOrConf::Conf(conf), @@ -564,6 +585,7 @@ impl DeltaLayer { key_range: filename.key_range.clone(), lsn_range: filename.lsn_range.clone(), file_size, + access_stats, inner: RwLock::new(DeltaLayerInner { loaded: false, file: None, @@ -593,6 +615,7 @@ impl DeltaLayer { key_range: summary.key_range, lsn_range: summary.lsn_range, file_size: metadata.len(), + access_stats: LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident), inner: RwLock::new(DeltaLayerInner { loaded: false, file: None, @@ -763,6 +786,7 @@ impl DeltaLayerWriterInner { key_range: self.key_start..key_end, lsn_range: self.lsn_range.clone(), file_size: metadata.len(), + access_stats: LayerAccessStats::for_new_layer_file(), inner: RwLock::new(DeltaLayerInner { loaded: false, file: None, diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index c11f671ffe..4c9ca0221f 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -27,14 +27,14 @@ use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter}; use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader}; use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; use crate::tenant::storage_layer::{ - PersistentLayer, ValueReconstructResult, ValueReconstructState, + LayerAccessStats, PersistentLayer, ValueReconstructResult, ValueReconstructState, }; use crate::virtual_file::VirtualFile; use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX}; use anyhow::{bail, ensure, Context, Result}; use bytes::Bytes; use hex; -use pageserver_api::models::HistoricLayerInfo; +use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind}; use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; use std::fs::{self, File}; @@ -53,7 +53,7 @@ use utils::{ }; use super::filename::{ImageFileName, LayerFileName}; -use super::{Layer, LayerIter, PathOrConf}; +use super::{Layer, LayerAccessStatsReset, LayerIter, LayerResidenceStatus, PathOrConf}; /// /// Header stored in the beginning of the file @@ -112,6 +112,8 @@ pub struct ImageLayer { // This entry contains an image of all pages as of this LSN pub lsn: Lsn, + access_stats: LayerAccessStats, + inner: RwLock, } @@ -155,7 +157,7 @@ impl Layer for ImageLayer { return Ok(()); } - let inner = self.load(ctx)?; + let inner = self.load(LayerAccessKind::Dump, ctx)?; let file = inner.file.as_ref().unwrap(); let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(inner.index_start_blk, inner.index_root_blk, file); @@ -182,7 +184,7 @@ impl Layer for ImageLayer { assert!(lsn_range.start >= self.lsn); assert!(lsn_range.end >= self.lsn); - let inner = self.load(ctx)?; + let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?; let file = inner.file.as_ref().unwrap(); let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file); @@ -237,7 +239,7 @@ impl PersistentLayer for ImageLayer { Some(self.file_size) } - fn info(&self) -> HistoricLayerInfo { + fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo { let layer_file_name = self.filename().file_name(); let lsn_range = self.get_lsn_range(); @@ -246,8 +248,13 @@ impl PersistentLayer for ImageLayer { layer_file_size: Some(self.file_size), lsn_start: lsn_range.start, remote: false, + access_stats: self.access_stats.to_api_model(reset), } } + + fn access_stats(&self) -> &LayerAccessStats { + &self.access_stats + } } impl ImageLayer { @@ -285,7 +292,13 @@ impl ImageLayer { /// Open the underlying file and read the metadata into memory, if it's /// not loaded already. /// - fn load(&self, _ctx: &RequestContext) -> Result> { + fn load( + &self, + access_kind: LayerAccessKind, + ctx: &RequestContext, + ) -> Result> { + self.access_stats + .record_access(access_kind, ctx.task_kind()); loop { // Quick exit if already loaded let inner = self.inner.read().unwrap(); @@ -365,6 +378,7 @@ impl ImageLayer { tenant_id: TenantId, filename: &ImageFileName, file_size: u64, + access_stats: LayerAccessStats, ) -> ImageLayer { ImageLayer { path_or_conf: PathOrConf::Conf(conf), @@ -373,6 +387,7 @@ impl ImageLayer { key_range: filename.key_range.clone(), lsn: filename.lsn, file_size, + access_stats, inner: RwLock::new(ImageLayerInner { loaded: false, file: None, @@ -400,6 +415,7 @@ impl ImageLayer { key_range: summary.key_range, lsn: summary.lsn, file_size: metadata.len(), + access_stats: LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident), inner: RwLock::new(ImageLayerInner { file: None, loaded: false, @@ -559,6 +575,7 @@ impl ImageLayerWriterInner { key_range: self.key_range.clone(), lsn: self.lsn, file_size: metadata.len(), + access_stats: LayerAccessStats::for_new_layer_file(), inner: RwLock::new(ImageLayerInner { loaded: false, file: None, diff --git a/pageserver/src/tenant/storage_layer/remote_layer.rs b/pageserver/src/tenant/storage_layer/remote_layer.rs index 8656ad0289..d8d66622a2 100644 --- a/pageserver/src/tenant/storage_layer/remote_layer.rs +++ b/pageserver/src/tenant/storage_layer/remote_layer.rs @@ -19,7 +19,10 @@ use utils::{ use super::filename::{DeltaFileName, ImageFileName, LayerFileName}; use super::image_layer::ImageLayer; -use super::{DeltaLayer, LayerIter, LayerKeyIter, PersistentLayer}; +use super::{ + DeltaLayer, LayerAccessStats, LayerAccessStatsReset, LayerIter, LayerKeyIter, + LayerResidenceStatus, PersistentLayer, +}; #[derive(Debug)] pub struct RemoteLayer { @@ -36,6 +39,8 @@ pub struct RemoteLayer { is_incremental: bool, + access_stats: LayerAccessStats, + pub(crate) ongoing_download: Arc, } @@ -138,7 +143,7 @@ impl PersistentLayer for RemoteLayer { self.layer_metadata.file_size() } - fn info(&self) -> HistoricLayerInfo { + fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo { let layer_file_name = self.filename().file_name(); let lsn_range = self.get_lsn_range(); @@ -149,6 +154,7 @@ impl PersistentLayer for RemoteLayer { lsn_start: lsn_range.start, lsn_end: lsn_range.end, remote: true, + access_stats: self.access_stats.to_api_model(reset), } } else { HistoricLayerInfo::Image { @@ -156,9 +162,14 @@ impl PersistentLayer for RemoteLayer { layer_file_size: self.layer_metadata.file_size(), lsn_start: lsn_range.start, remote: true, + access_stats: self.access_stats.to_api_model(reset), } } } + + fn access_stats(&self) -> &LayerAccessStats { + &self.access_stats + } } impl RemoteLayer { @@ -167,6 +178,7 @@ impl RemoteLayer { timelineid: TimelineId, fname: &ImageFileName, layer_metadata: &LayerFileMetadata, + access_stats: LayerAccessStats, ) -> RemoteLayer { RemoteLayer { tenantid, @@ -178,6 +190,7 @@ impl RemoteLayer { file_name: fname.to_owned().into(), layer_metadata: layer_metadata.clone(), ongoing_download: Arc::new(tokio::sync::Semaphore::new(1)), + access_stats, } } @@ -186,6 +199,7 @@ impl RemoteLayer { timelineid: TimelineId, fname: &DeltaFileName, layer_metadata: &LayerFileMetadata, + access_stats: LayerAccessStats, ) -> RemoteLayer { RemoteLayer { tenantid, @@ -197,6 +211,7 @@ impl RemoteLayer { file_name: fname.to_owned().into(), layer_metadata: layer_metadata.clone(), ongoing_download: Arc::new(tokio::sync::Semaphore::new(1)), + access_stats, } } @@ -217,6 +232,8 @@ impl RemoteLayer { self.tenantid, &fname, file_size, + self.access_stats + .clone_for_residence_change(LayerResidenceStatus::Resident), )) } else { let fname = ImageFileName { @@ -229,6 +246,8 @@ impl RemoteLayer { self.tenantid, &fname, file_size, + self.access_stats + .clone_for_residence_change(LayerResidenceStatus::Resident), )) } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index fc6f7ad96c..6ae23c584b 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -10,7 +10,7 @@ use itertools::Itertools; use once_cell::sync::OnceCell; use pageserver_api::models::{ DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, - DownloadRemoteLayersTaskState, LayerMapInfo, TimelineState, + DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceStatus, TimelineState, }; use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError}; use tokio_util::sync::CancellationToken; @@ -30,8 +30,8 @@ use crate::broker_client::is_broker_client_initialized; use crate::context::{DownloadBehavior, RequestContext}; use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata}; use crate::tenant::storage_layer::{ - DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer, LayerFileName, - RemoteLayer, + DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer, + LayerAccessStats, LayerFileName, RemoteLayer, }; use crate::tenant::{ ephemeral_file::is_ephemeral_file, @@ -72,7 +72,7 @@ use walreceiver::spawn_connection_manager_task; use super::layer_map::BatchedUpdates; use super::remote_timeline_client::index::IndexPart; use super::remote_timeline_client::RemoteTimelineClient; -use super::storage_layer::{DeltaLayer, ImageLayer, Layer}; +use super::storage_layer::{DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset}; #[derive(Debug, PartialEq, Eq, Clone, Copy)] enum FlushLoopState { @@ -835,7 +835,7 @@ impl Timeline { self.state.subscribe() } - pub fn layer_map_info(&self) -> LayerMapInfo { + pub fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo { let layer_map = self.layers.read().unwrap(); let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1); if let Some(open_layer) = &layer_map.open_layer { @@ -847,7 +847,7 @@ impl Timeline { let mut historic_layers = Vec::new(); for historic_layer in layer_map.iter_historic_layers() { - historic_layers.push(historic_layer.info()); + historic_layers.push(historic_layer.info(reset)); } LayerMapInfo { @@ -891,12 +891,18 @@ impl Timeline { self.timeline_id, &image_name, &layer_metadata, + local_layer + .access_stats() + .clone_for_residence_change(LayerResidenceStatus::Evicted), ), LayerFileName::Delta(delta_name) => RemoteLayer::new_delta( self.tenant_id, self.timeline_id, &delta_name, &layer_metadata, + local_layer + .access_stats() + .clone_for_residence_change(LayerResidenceStatus::Evicted), ), }); @@ -1172,6 +1178,7 @@ impl Timeline { self.tenant_id, &imgfilename, file_size, + LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident), ); trace!("found layer {}", layer.path().display()); @@ -1203,6 +1210,7 @@ impl Timeline { self.tenant_id, &deltafilename, file_size, + LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident), ); trace!("found layer {}", layer.path().display()); @@ -1340,6 +1348,7 @@ impl Timeline { self.timeline_id, imgfilename, &remote_layer_metadata, + LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted), ); let remote_layer = Arc::new(remote_layer); @@ -1364,6 +1373,7 @@ impl Timeline { self.timeline_id, deltafilename, &remote_layer_metadata, + LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted), ); let remote_layer = Arc::new(remote_layer); updates.insert_historic(remote_layer);