From 58fa4f0eb7f237e5d327566819eb4317b951554f Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 31 Jan 2023 11:49:15 +0100 Subject: [PATCH] maintain access stats for historic layers This patch adds basic access statistics for historic layers and exposes them in the management API's `LayerMapInfo`. We record the accesses in the `{Delta,Image}Layer::load()` function because it's the common path of * page_service (`Timeline::get_reconstruct_data()`) * Compaction (`PersistentLayer::iter()` and `PersistentLayer::key_iter()`) The stats survive residence status changes, and record these as well. When scraping the layer map endpoint to record its evolution over time, one must account for stat resets because they are in-memory only and will reset on pageserver restart. Use the launch timestamp header added by (#3527) to identify pageserver restarts. This is PR https://github.com/neondatabase/neon/pull/3496 --- Cargo.lock | 100 ++++++++++- Cargo.toml | 5 + libs/pageserver_api/Cargo.toml | 1 + libs/pageserver_api/src/models.rs | 85 ++++++++- libs/utils/Cargo.toml | 1 + libs/utils/src/history_buffer.rs | 161 ++++++++++++++++++ libs/utils/src/lib.rs | 2 + pageserver/Cargo.toml | 4 + pageserver/src/http/routes.rs | 6 +- pageserver/src/task_mgr.rs | 9 +- pageserver/src/tenant/storage_layer.rs | 159 ++++++++++++++++- .../src/tenant/storage_layer/delta_layer.rs | 40 ++++- .../src/tenant/storage_layer/image_layer.rs | 31 +++- .../src/tenant/storage_layer/remote_layer.rs | 23 ++- pageserver/src/tenant/timeline.rs | 22 ++- 15 files changed, 619 insertions(+), 30 deletions(-) create mode 100644 libs/utils/src/history_buffer.rs diff --git a/Cargo.lock b/Cargo.lock index 612087c97a..6be08d16b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -152,6 +152,15 @@ dependencies = [ "syn", ] +[[package]] +name = "atomic-polyfill" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d299f547288d6db8d5c3a2916f7b2f66134b15b8c1ac1c4357dd3b8752af7bb2"
+dependencies = [ + "critical-section", +] + [[package]] name = "atty" version = "0.2.14" @@ -997,6 +1006,12 @@ dependencies = [ "itertools", ] +[[package]] +name = "critical-section" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6548a0ad5d2549e111e1f6a11a6c2e2d00ce6a3dafe22948d67c2b443f775e52" + [[package]] name = "crossbeam-channel" version = "0.5.6" @@ -1234,6 +1249,47 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "enum-map" +version = "2.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50c25992259941eb7e57b936157961b217a4fc8597829ddef0596d6c3cd86e1a" +dependencies = [ + "enum-map-derive", +] + +[[package]] +name = "enum-map-derive" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a4da76b3b6116d758c7ba93f7ec6a35d2e2cf24feda76c6e38a375f4d5c59f2" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "enumset" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19be8061a06ab6f3a6cf21106c873578bf01bd42ad15e0311a9c76161cb1c753" +dependencies = [ + "enumset_derive", +] + +[[package]] +name = "enumset_derive" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03e7b551eba279bf0fa88b83a46330168c1560a52a94f5126f892f0b364ab3e0" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "env_logger" version = "0.10.0" @@ -1521,6 +1577,15 @@ version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7" +[[package]] +name = "hash32" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47d60b12902ba28e2730cd37e95b8c9223af2808df9e902d4df49588d1470606" +dependencies = [ + "byteorder", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ 
-1548,6 +1613,18 @@ dependencies = [ "hashbrown 0.12.3", ] +[[package]] +name = "heapless" +version = "0.8.0" +source = "git+https://github.com/japaric/heapless.git?rev=644653bf3b831c6bb4963be2de24804acf5e5001#644653bf3b831c6bb4963be2de24804acf5e5001" +dependencies = [ + "atomic-polyfill", + "hash32", + "rustc_version", + "spin 0.9.4", + "stable_deref_trait", +] + [[package]] name = "heck" version = "0.4.1" @@ -2344,6 +2421,8 @@ dependencies = [ "crc32c", "criterion", "crossbeam-utils", + "enum-map", + "enumset", "fail", "futures", "git-version", @@ -2376,6 +2455,8 @@ dependencies = [ "serde_with", "signal-hook", "storage_broker", + "strum", + "strum_macros", "svg_fmt", "tempfile", "tenant_size_model", @@ -2400,6 +2481,7 @@ dependencies = [ "byteorder", "bytes", "const_format", + "enum-map", "postgres_ffi", "serde", "serde_with", @@ -3021,7 +3103,7 @@ dependencies = [ "cc", "libc", "once_cell", - "spin", + "spin 0.5.2", "untrusted", "web-sys", "winapi", @@ -3568,6 +3650,21 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spin" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f6002a767bff9e83f8eeecf883ecb8011875a21ae8da43bffb817a57e78cc09" +dependencies = [ + "lock_api", +] + +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "static_assertions" version = "1.1.0" @@ -4347,6 +4444,7 @@ dependencies = [ "bytes", "criterion", "git-version", + "heapless", "hex", "hex-literal", "hyper", diff --git a/Cargo.toml b/Cargo.toml index c335b64175..9033671f55 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,8 @@ comfy-table = "6.1" const_format = "0.2" crc32c = "0.6" crossbeam-utils = "0.8.5" +enum-map = "2.4.2" 
+enumset = "1.0.12" fail = "0.5.0" fs2 = "0.4.3" futures = "0.3" @@ -120,6 +122,9 @@ postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", re tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" } tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df61437de0feef49ba2ccdbdd94eb8ad6e142" } +## Other git libraries +heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending + ## Local libraries consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" } metrics = { version = "0.1", path = "./libs/metrics/" } diff --git a/libs/pageserver_api/Cargo.toml b/libs/pageserver_api/Cargo.toml index a3883f07a7..dafb246632 100644 --- a/libs/pageserver_api/Cargo.toml +++ b/libs/pageserver_api/Cargo.toml @@ -13,5 +13,6 @@ bytes.workspace = true byteorder.workspace = true utils.workspace = true postgres_ffi.workspace = true +enum-map.workspace = true workspace_hack.workspace = true diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 8827235d90..9cdcf3a173 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -1,9 +1,14 @@ -use std::num::{NonZeroU64, NonZeroUsize}; +use std::{ + collections::HashMap, + num::{NonZeroU64, NonZeroUsize}, + time::SystemTime, +}; use byteorder::{BigEndian, ReadBytesExt}; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; use utils::{ + history_buffer::HistoryBufferWithDropCounter, id::{NodeId, TenantId, TimelineId}, lsn::Lsn, }; @@ -232,6 +237,82 @@ pub struct LayerMapInfo { pub historic_layers: Vec, } +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, enum_map::Enum)] +#[repr(usize)] +pub enum LayerAccessKind { + GetValueReconstructData, + Iter, + KeyIter, + Dump, +} + 
+#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LayerAccessStatFullDetails { + pub when_millis_since_epoch: u64, + pub task_kind: &'static str, + pub access_kind: LayerAccessKind, +} + +/// An event that impacts the layer's residence status. +#[serde_as] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LayerResidenceEvent { + /// The time when the event occurred. + /// NB: this timestamp is captured while the residence status changes. + /// So, it might be behind/ahead of the actual residence change by a short amount of time. + /// + #[serde(rename = "timestamp_millis_since_epoch")] + #[serde_as(as = "serde_with::TimestampMilliSeconds")] + timestamp: SystemTime, + /// The new residence status of the layer. + status: LayerResidenceStatus, + /// The reason why we had to record this event. + reason: LayerResidenceEventReason, +} + +/// The reason for recording a given [`LayerResidenceEvent`]. +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub enum LayerResidenceEventReason { + /// The layer map is being populated, e.g. during timeline load or attach. + /// This includes [`RemoteLayer`] objects created in [`reconcile_with_remote`]. + /// We need to record such events because there is no persistent storage for the events. + LayerLoad, + /// We just created the layer (e.g., freeze_and_flush or compaction). + /// Such layers are always [`LayerResidenceStatus::Resident`]. + LayerCreate, + /// We on-demand downloaded or evicted the given layer. + ResidenceChange, +} + +/// The residence status of the layer, after the given [`LayerResidenceEvent`]. +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub enum LayerResidenceStatus { + /// Residence status for a layer file that exists locally. + /// It may also exist on the remote, we don't care here. + Resident, + /// Residence status for a layer file that only exists on the remote.
+ Evicted, +} + +impl LayerResidenceEvent { + pub fn new(status: LayerResidenceStatus, reason: LayerResidenceEventReason) -> Self { + Self { + status, + reason, + timestamp: SystemTime::now(), + } + } +} + +#[derive(Debug, Clone, Serialize)] +pub struct LayerAccessStats { + pub access_count_by_access_kind: HashMap, + pub task_kind_access_flag: Vec<&'static str>, + pub first: Option, + pub accesses_history: HistoryBufferWithDropCounter, + pub residence_events_history: HistoryBufferWithDropCounter, +} + #[serde_as] #[derive(Debug, Clone, Serialize)] #[serde(tag = "kind")] @@ -261,6 +342,7 @@ pub enum HistoricLayerInfo { #[serde_as(as = "DisplayFromStr")] lsn_end: Lsn, remote: bool, + access_stats: LayerAccessStats, }, Image { layer_file_name: String, @@ -269,6 +351,7 @@ pub enum HistoricLayerInfo { #[serde_as(as = "DisplayFromStr")] lsn_start: Lsn, remote: bool, + access_stats: LayerAccessStats, }, } diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index 1ce3155083..92e805ac58 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -11,6 +11,7 @@ async-trait.workspace = true anyhow.workspace = true bincode.workspace = true bytes.workspace = true +heapless.workspace = true hyper = { workspace = true, features = ["full"] } routerify.workspace = true serde.workspace = true diff --git a/libs/utils/src/history_buffer.rs b/libs/utils/src/history_buffer.rs new file mode 100644 index 0000000000..1f07f5560f --- /dev/null +++ b/libs/utils/src/history_buffer.rs @@ -0,0 +1,161 @@ +//! A heapless buffer for events of sorts. 
+ +use std::ops; + +use heapless::HistoryBuffer; + +#[derive(Debug, Clone)] +pub struct HistoryBufferWithDropCounter { + buffer: HistoryBuffer, + drop_count: u64, +} + +impl HistoryBufferWithDropCounter { + pub fn write(&mut self, data: T) { + let len_before = self.buffer.len(); + self.buffer.write(data); + let len_after = self.buffer.len(); + self.drop_count += u64::from(len_before == len_after); + } + pub fn drop_count(&self) -> u64 { + self.drop_count + } + pub fn map U>(&self, f: F) -> HistoryBufferWithDropCounter { + let mut buffer = HistoryBuffer::new(); + buffer.extend(self.buffer.oldest_ordered().map(f)); + HistoryBufferWithDropCounter:: { + buffer, + drop_count: self.drop_count, + } + } +} + +impl Default for HistoryBufferWithDropCounter { + fn default() -> Self { + Self { + buffer: HistoryBuffer::default(), + drop_count: 0, + } + } +} + +impl ops::Deref for HistoryBufferWithDropCounter { + type Target = HistoryBuffer; + + fn deref(&self) -> &Self::Target { + &self.buffer + } +} + +#[derive(serde::Serialize)] +struct SerdeRepr { + buffer: Vec, + drop_count: u64, +} + +impl<'a, T, const L: usize> From<&'a HistoryBufferWithDropCounter> for SerdeRepr +where + T: Clone + serde::Serialize, +{ + fn from(value: &'a HistoryBufferWithDropCounter) -> Self { + let HistoryBufferWithDropCounter { buffer, drop_count } = value; + SerdeRepr { + buffer: buffer.oldest_ordered().cloned().collect(), + drop_count: *drop_count, + } + } +} + +impl serde::Serialize for HistoryBufferWithDropCounter +where + T: Clone + serde::Serialize, +{ + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + SerdeRepr::from(self).serialize(serializer) + } +} + +#[cfg(test)] +mod test { + use super::HistoryBufferWithDropCounter; + + #[test] + fn test_basics() { + let mut b = HistoryBufferWithDropCounter::<_, 2>::default(); + b.write(1); + b.write(2); + b.write(3); + assert!(b.iter().any(|e| *e == 2)); + assert!(b.iter().any(|e| *e == 3)); + assert!(!b.iter().any(|e| *e == 1));
+ } + + #[test] + fn test_drop_count_works() { + let mut b = HistoryBufferWithDropCounter::<_, 2>::default(); + b.write(1); + assert_eq!(b.drop_count(), 0); + b.write(2); + assert_eq!(b.drop_count(), 0); + b.write(3); + assert_eq!(b.drop_count(), 1); + b.write(4); + assert_eq!(b.drop_count(), 2); + } + + #[test] + fn test_clone_works() { + let mut b = HistoryBufferWithDropCounter::<_, 2>::default(); + b.write(1); + b.write(2); + b.write(3); + assert_eq!(b.drop_count(), 1); + let mut c = b.clone(); + assert_eq!(c.drop_count(), 1); + assert!(c.iter().any(|e| *e == 2)); + assert!(c.iter().any(|e| *e == 3)); + assert!(!c.iter().any(|e| *e == 1)); + + c.write(4); + assert!(c.iter().any(|e| *e == 4)); + assert!(!b.iter().any(|e| *e == 4)); + } + + #[test] + fn test_map() { + let mut b = HistoryBufferWithDropCounter::<_, 2>::default(); + + b.write(1); + assert_eq!(b.drop_count(), 0); + { + let c = b.map(|i| i + 10); + assert_eq!(c.oldest_ordered().cloned().collect::>(), vec![11]); + assert_eq!(c.drop_count(), 0); + } + + b.write(2); + assert_eq!(b.drop_count(), 0); + { + let c = b.map(|i| i + 10); + assert_eq!( + c.oldest_ordered().cloned().collect::>(), + vec![11, 12] + ); + assert_eq!(c.drop_count(), 0); + } + + b.write(3); + assert_eq!(b.drop_count(), 1); + { + let c = b.map(|i| i + 10); + assert_eq!( + c.oldest_ordered().cloned().collect::>(), + vec![12, 13] + ); + assert_eq!(c.drop_count(), 1); + } + } +} diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 6d35fd9f7b..9ddd702c72 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -52,6 +52,8 @@ pub mod signals; pub mod fs_ext; +pub mod history_buffer; + /// use with fail::cfg("$name", "return(2000)") #[macro_export] macro_rules! 
failpoint_sleep_millis_async { diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 66c25e8576..f3ad2c5de6 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -67,6 +67,10 @@ utils.workspace = true workspace_hack.workspace = true reqwest.workspace = true rpds.workspace = true +enum-map.workspace = true +enumset.workspace = true +strum.workspace = true +strum_macros.workspace = true [dev-dependencies] criterion.workspace = true diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 7279ce0f63..229cf96ee3 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -20,6 +20,7 @@ use crate::pgdatadir_mapping::LsnForTimestamp; use crate::task_mgr::TaskKind; use crate::tenant::config::TenantConfOpt; use crate::tenant::mgr::TenantMapInsertError; +use crate::tenant::storage_layer::LayerAccessStatsReset; use crate::tenant::{PageReconstructError, Timeline}; use crate::{config::PageServerConf, tenant::mgr}; use utils::{ @@ -534,10 +535,13 @@ async fn tenant_size_handler(request: Request) -> Result, A async fn layer_map_info_handler(request: Request) -> Result, ApiError> { let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; + let reset: LayerAccessStatsReset = + parse_query_param(&request, "reset")?.unwrap_or(LayerAccessStatsReset::NoReset); + check_permission(&request, Some(tenant_id))?; let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?; - let layer_map_info = timeline.layer_map_info(); + let layer_map_info = timeline.layer_map_info(reset); json_response(StatusCode::OK, layer_map_info) } diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index 8e0149046d..c4f213e755 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -169,7 +169,14 @@ task_local! 
{ /// Note that we don't try to limit how many task of a certain kind can be running /// at the same time. /// -#[derive(Debug, PartialEq, Eq, Clone, Copy)] +#[derive( + Debug, + // NB: enumset::EnumSetType derives PartialEq, Eq, Clone, Copy + enumset::EnumSetType, + serde::Serialize, + serde::Deserialize, + strum_macros::IntoStaticStr, +)] pub enum TaskKind { // Pageserver startup, i.e., `main` Startup, diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 1867d489c8..e85359af16 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -9,13 +9,21 @@ mod remote_layer; use crate::config::PageServerConf; use crate::context::RequestContext; use crate::repository::{Key, Value}; +use crate::task_mgr::TaskKind; use crate::walrecord::NeonWalRecord; use anyhow::Result; use bytes::Bytes; -use pageserver_api::models::HistoricLayerInfo; +use enum_map::EnumMap; +use enumset::EnumSet; +use pageserver_api::models::LayerAccessKind; +use pageserver_api::models::{ + HistoricLayerInfo, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus, +}; use std::ops::Range; use std::path::PathBuf; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; +use std::time::{SystemTime, UNIX_EPOCH}; +use utils::history_buffer::HistoryBufferWithDropCounter; use utils::{ id::{TenantId, TimelineId}, @@ -83,6 +91,149 @@ pub enum ValueReconstructResult { Missing, } +#[derive(Debug)] +pub struct LayerAccessStats(Mutex); + +#[derive(Debug, Default, Clone)] +struct LayerAccessStatsInner { + first_access: Option, + count_by_access_kind: EnumMap, + task_kind_flag: EnumSet, + last_accesses: HistoryBufferWithDropCounter, + last_residence_changes: HistoryBufferWithDropCounter, +} + +#[derive(Debug, Clone)] +struct LayerAccessStatFullDetails { + when: SystemTime, + task_kind: TaskKind, + access_kind: LayerAccessKind, +} + +#[derive(Clone, Copy, strum_macros::EnumString)] +pub enum LayerAccessStatsReset { + NoReset, 
+ JustTaskKindFlags, + AllStats, +} + +fn system_time_to_millis_since_epoch(ts: &SystemTime) -> u64 { + ts.duration_since(UNIX_EPOCH) + .expect("better to die in this unlikely case than report false stats") + .as_millis() + .try_into() + .expect("64 bits is enough for few more years") +} + +impl LayerAccessStatFullDetails { + fn to_api_model(&self) -> pageserver_api::models::LayerAccessStatFullDetails { + let Self { + when, + task_kind, + access_kind, + } = self; + pageserver_api::models::LayerAccessStatFullDetails { + when_millis_since_epoch: system_time_to_millis_since_epoch(when), + task_kind: task_kind.into(), // into static str, powered by strum_macros + access_kind: *access_kind, + } + } +} + +impl LayerAccessStats { + pub(crate) fn for_loading_layer(status: LayerResidenceStatus) -> Self { + let new = LayerAccessStats(Mutex::new(LayerAccessStatsInner::default())); + new.record_residence_event(status, LayerResidenceEventReason::LayerLoad); + new + } + + pub(crate) fn for_new_layer_file() -> Self { + let new = LayerAccessStats(Mutex::new(LayerAccessStatsInner::default())); + new.record_residence_event( + LayerResidenceStatus::Resident, + LayerResidenceEventReason::LayerCreate, + ); + new + } + + /// Creates a clone of `self` and records `new_status` in the clone. 
+ /// The `new_status` is not recorded in `self` + pub(crate) fn clone_for_residence_change( + &self, + new_status: LayerResidenceStatus, + ) -> LayerAccessStats { + let clone = { + let inner = self.0.lock().unwrap(); + inner.clone() + }; + let new = LayerAccessStats(Mutex::new(clone)); + new.record_residence_event(new_status, LayerResidenceEventReason::ResidenceChange); + new + } + + fn record_residence_event( + &self, + status: LayerResidenceStatus, + reason: LayerResidenceEventReason, + ) { + let mut inner = self.0.lock().unwrap(); + inner + .last_residence_changes + .write(LayerResidenceEvent::new(status, reason)); + } + + fn record_access(&self, access_kind: LayerAccessKind, task_kind: TaskKind) { + let mut inner = self.0.lock().unwrap(); + let this_access = LayerAccessStatFullDetails { + when: SystemTime::now(), + task_kind, + access_kind, + }; + inner + .first_access + .get_or_insert_with(|| this_access.clone()); + inner.count_by_access_kind[access_kind] += 1; + inner.task_kind_flag |= task_kind; + inner.last_accesses.write(this_access); + } + fn to_api_model( + &self, + reset: LayerAccessStatsReset, + ) -> pageserver_api::models::LayerAccessStats { + let mut inner = self.0.lock().unwrap(); + let LayerAccessStatsInner { + first_access, + count_by_access_kind, + task_kind_flag, + last_accesses, + last_residence_changes, + } = &*inner; + let ret = pageserver_api::models::LayerAccessStats { + access_count_by_access_kind: count_by_access_kind + .iter() + .map(|(kind, count)| (kind, *count)) + .collect(), + task_kind_access_flag: task_kind_flag + .iter() + .map(|task_kind| task_kind.into()) // into static str, powered by strum_macros + .collect(), + first: first_access.as_ref().map(|a| a.to_api_model()), + accesses_history: last_accesses.map(|m| m.to_api_model()), + residence_events_history: last_residence_changes.clone(), + }; + match reset { + LayerAccessStatsReset::NoReset => (), + LayerAccessStatsReset::JustTaskKindFlags => { + inner.task_kind_flag.clear(); + 
} + LayerAccessStatsReset::AllStats => { + *inner = LayerAccessStatsInner::default(); + } + } + ret + } +} + /// Supertrait of the [`Layer`] trait that captures the bare minimum interface /// required by [`LayerMap`]. /// @@ -193,7 +344,9 @@ pub trait PersistentLayer: Layer { /// current_physical_size is computed as the som of this value. fn file_size(&self) -> Option; - fn info(&self) -> HistoricLayerInfo; + fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo; + + fn access_stats(&self) -> &LayerAccessStats; } pub fn downcast_remote_layer( diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 142a8ce913..9b322faa65 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -37,7 +37,7 @@ use crate::virtual_file::VirtualFile; use crate::{walrecord, TEMP_FILE_SUFFIX}; use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION}; use anyhow::{bail, ensure, Context, Result}; -use pageserver_api::models::HistoricLayerInfo; +use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind}; use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; use std::fs::{self, File}; @@ -55,7 +55,10 @@ use utils::{ lsn::Lsn, }; -use super::{DeltaFileName, Layer, LayerFileName, LayerIter, LayerKeyIter, PathOrConf}; +use super::{ + DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerFileName, LayerIter, + LayerKeyIter, LayerResidenceStatus, PathOrConf, +}; /// /// Header stored in the beginning of the file @@ -184,6 +187,8 @@ pub struct DeltaLayer { pub file_size: u64, + access_stats: LayerAccessStats, + inner: RwLock, } @@ -251,7 +256,7 @@ impl Layer for DeltaLayer { return Ok(()); } - let inner = self.load(ctx)?; + let inner = self.load(LayerAccessKind::Dump, ctx)?; println!( "index_start_blk: {}, root {}", @@ -324,7 +329,7 @@ impl Layer for DeltaLayer { { // Open the file and lock the metadata in 
memory - let inner = self.load(ctx)?; + let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?; // Scan the page versions backwards, starting from `lsn`. let file = inner.file.as_ref().unwrap(); @@ -415,7 +420,9 @@ impl PersistentLayer for DeltaLayer { } fn iter(&self, ctx: &RequestContext) -> Result> { - let inner = self.load(ctx).context("load delta layer")?; + let inner = self + .load(LayerAccessKind::Iter, ctx) + .context("load delta layer")?; Ok(match DeltaValueIter::new(inner) { Ok(iter) => Box::new(iter), Err(err) => Box::new(std::iter::once(Err(err))), @@ -423,7 +430,7 @@ impl PersistentLayer for DeltaLayer { } fn key_iter(&self, ctx: &RequestContext) -> Result> { - let inner = self.load(ctx)?; + let inner = self.load(LayerAccessKind::KeyIter, ctx)?; Ok(Box::new( DeltaKeyIter::new(inner).context("Layer index is corrupted")?, )) @@ -439,18 +446,25 @@ impl PersistentLayer for DeltaLayer { Some(self.file_size) } - fn info(&self) -> HistoricLayerInfo { + fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo { let layer_file_name = self.filename().file_name(); let lsn_range = self.get_lsn_range(); + let access_stats = self.access_stats.to_api_model(reset); + HistoricLayerInfo::Delta { layer_file_name, layer_file_size: Some(self.file_size), lsn_start: lsn_range.start, lsn_end: lsn_range.end, remote: false, + access_stats, } } + + fn access_stats(&self) -> &LayerAccessStats { + &self.access_stats + } } impl DeltaLayer { @@ -495,7 +509,13 @@ impl DeltaLayer { /// Open the underlying file and read the metadata into memory, if it's /// not loaded already.
/// - fn load(&self, _ctx: &RequestContext) -> Result> { + fn load( + &self, + access_kind: LayerAccessKind, + ctx: &RequestContext, + ) -> Result> { + self.access_stats + .record_access(access_kind, ctx.task_kind()); loop { // Quick exit if already loaded let inner = self.inner.read().unwrap(); @@ -576,6 +596,7 @@ impl DeltaLayer { tenant_id: TenantId, filename: &DeltaFileName, file_size: u64, + access_stats: LayerAccessStats, ) -> DeltaLayer { DeltaLayer { path_or_conf: PathOrConf::Conf(conf), @@ -584,6 +605,7 @@ impl DeltaLayer { key_range: filename.key_range.clone(), lsn_range: filename.lsn_range.clone(), file_size, + access_stats, inner: RwLock::new(DeltaLayerInner { loaded: false, file: None, @@ -613,6 +635,7 @@ impl DeltaLayer { key_range: summary.key_range, lsn_range: summary.lsn_range, file_size: metadata.len(), + access_stats: LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident), inner: RwLock::new(DeltaLayerInner { loaded: false, file: None, @@ -783,6 +806,7 @@ impl DeltaLayerWriterInner { key_range: self.key_start..key_end, lsn_range: self.lsn_range.clone(), file_size: metadata.len(), + access_stats: LayerAccessStats::for_new_layer_file(), inner: RwLock::new(DeltaLayerInner { loaded: false, file: None, diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 8eacaeb936..86c1aee619 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -27,14 +27,14 @@ use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter}; use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader}; use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; use crate::tenant::storage_layer::{ - PersistentLayer, ValueReconstructResult, ValueReconstructState, + LayerAccessStats, PersistentLayer, ValueReconstructResult, ValueReconstructState, }; use crate::virtual_file::VirtualFile; use 
crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX}; use anyhow::{bail, ensure, Context, Result}; use bytes::Bytes; use hex; -use pageserver_api::models::HistoricLayerInfo; +use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind}; use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; use std::fs::{self, File}; @@ -53,7 +53,7 @@ use utils::{ }; use super::filename::{ImageFileName, LayerFileName}; -use super::{Layer, LayerIter, PathOrConf}; +use super::{Layer, LayerAccessStatsReset, LayerIter, LayerResidenceStatus, PathOrConf}; /// /// Header stored in the beginning of the file @@ -112,6 +112,8 @@ pub struct ImageLayer { // This entry contains an image of all pages as of this LSN pub lsn: Lsn, + access_stats: LayerAccessStats, + inner: RwLock, } @@ -176,7 +178,7 @@ impl Layer for ImageLayer { return Ok(()); } - let inner = self.load(ctx)?; + let inner = self.load(LayerAccessKind::Dump, ctx)?; let file = inner.file.as_ref().unwrap(); let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(inner.index_start_blk, inner.index_root_blk, file); @@ -203,7 +205,7 @@ impl Layer for ImageLayer { assert!(lsn_range.start >= self.lsn); assert!(lsn_range.end >= self.lsn); - let inner = self.load(ctx)?; + let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?; let file = inner.file.as_ref().unwrap(); let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file); @@ -258,7 +260,7 @@ impl PersistentLayer for ImageLayer { Some(self.file_size) } - fn info(&self) -> HistoricLayerInfo { + fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo { let layer_file_name = self.filename().file_name(); let lsn_range = self.get_lsn_range(); @@ -267,8 +269,13 @@ impl PersistentLayer for ImageLayer { layer_file_size: Some(self.file_size), lsn_start: lsn_range.start, remote: false, + access_stats: self.access_stats.to_api_model(reset), } } + + fn access_stats(&self) -> &LayerAccessStats { + 
&self.access_stats + } } impl ImageLayer { @@ -306,7 +313,13 @@ impl ImageLayer { /// Open the underlying file and read the metadata into memory, if it's /// not loaded already. /// - fn load(&self, _ctx: &RequestContext) -> Result> { + fn load( + &self, + access_kind: LayerAccessKind, + ctx: &RequestContext, + ) -> Result> { + self.access_stats + .record_access(access_kind, ctx.task_kind()); loop { // Quick exit if already loaded let inner = self.inner.read().unwrap(); @@ -386,6 +399,7 @@ impl ImageLayer { tenant_id: TenantId, filename: &ImageFileName, file_size: u64, + access_stats: LayerAccessStats, ) -> ImageLayer { ImageLayer { path_or_conf: PathOrConf::Conf(conf), @@ -394,6 +408,7 @@ impl ImageLayer { key_range: filename.key_range.clone(), lsn: filename.lsn, file_size, + access_stats, inner: RwLock::new(ImageLayerInner { loaded: false, file: None, @@ -421,6 +436,7 @@ impl ImageLayer { key_range: summary.key_range, lsn: summary.lsn, file_size: metadata.len(), + access_stats: LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident), inner: RwLock::new(ImageLayerInner { file: None, loaded: false, @@ -580,6 +596,7 @@ impl ImageLayerWriterInner { key_range: self.key_range.clone(), lsn: self.lsn, file_size: metadata.len(), + access_stats: LayerAccessStats::for_new_layer_file(), inner: RwLock::new(ImageLayerInner { loaded: false, file: None, diff --git a/pageserver/src/tenant/storage_layer/remote_layer.rs b/pageserver/src/tenant/storage_layer/remote_layer.rs index 8a12d67c49..7391875d0c 100644 --- a/pageserver/src/tenant/storage_layer/remote_layer.rs +++ b/pageserver/src/tenant/storage_layer/remote_layer.rs @@ -19,7 +19,10 @@ use utils::{ use super::filename::{DeltaFileName, ImageFileName, LayerFileName}; use super::image_layer::ImageLayer; -use super::{DeltaLayer, LayerIter, LayerKeyIter, PersistentLayer}; +use super::{ + DeltaLayer, LayerAccessStats, LayerAccessStatsReset, LayerIter, LayerKeyIter, + LayerResidenceStatus, PersistentLayer, +}; /// 
RemoteLayer is a not yet downloaded [`ImageLayer`] or /// [`crate::storage_layer::DeltaLayer`]. @@ -43,6 +46,8 @@ pub struct RemoteLayer { is_incremental: bool, + access_stats: LayerAccessStats, + pub(crate) ongoing_download: Arc, } @@ -155,7 +160,7 @@ impl PersistentLayer for RemoteLayer { self.layer_metadata.file_size() } - fn info(&self) -> HistoricLayerInfo { + fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo { let layer_file_name = self.filename().file_name(); let lsn_range = self.get_lsn_range(); @@ -166,6 +171,7 @@ impl PersistentLayer for RemoteLayer { lsn_start: lsn_range.start, lsn_end: lsn_range.end, remote: true, + access_stats: self.access_stats.to_api_model(reset), } } else { HistoricLayerInfo::Image { @@ -173,9 +179,14 @@ impl PersistentLayer for RemoteLayer { layer_file_size: self.layer_metadata.file_size(), lsn_start: lsn_range.start, remote: true, + access_stats: self.access_stats.to_api_model(reset), } } } + + fn access_stats(&self) -> &LayerAccessStats { + &self.access_stats + } } impl RemoteLayer { @@ -184,6 +195,7 @@ impl RemoteLayer { timelineid: TimelineId, fname: &ImageFileName, layer_metadata: &LayerFileMetadata, + access_stats: LayerAccessStats, ) -> RemoteLayer { RemoteLayer { tenantid, @@ -195,6 +207,7 @@ impl RemoteLayer { file_name: fname.to_owned().into(), layer_metadata: layer_metadata.clone(), ongoing_download: Arc::new(tokio::sync::Semaphore::new(1)), + access_stats, } } @@ -203,6 +216,7 @@ impl RemoteLayer { timelineid: TimelineId, fname: &DeltaFileName, layer_metadata: &LayerFileMetadata, + access_stats: LayerAccessStats, ) -> RemoteLayer { RemoteLayer { tenantid, @@ -214,6 +228,7 @@ impl RemoteLayer { file_name: fname.to_owned().into(), layer_metadata: layer_metadata.clone(), ongoing_download: Arc::new(tokio::sync::Semaphore::new(1)), + access_stats, } } @@ -234,6 +249,8 @@ impl RemoteLayer { self.tenantid, &fname, file_size, + self.access_stats + .clone_for_residence_change(LayerResidenceStatus::Resident), )) 
} else { let fname = ImageFileName { @@ -246,6 +263,8 @@ impl RemoteLayer { self.tenantid, &fname, file_size, + self.access_stats + .clone_for_residence_change(LayerResidenceStatus::Resident), )) } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index fc6f7ad96c..6ae23c584b 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -10,7 +10,7 @@ use itertools::Itertools; use once_cell::sync::OnceCell; use pageserver_api::models::{ DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, - DownloadRemoteLayersTaskState, LayerMapInfo, TimelineState, + DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceStatus, TimelineState, }; use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError}; use tokio_util::sync::CancellationToken; @@ -30,8 +30,8 @@ use crate::broker_client::is_broker_client_initialized; use crate::context::{DownloadBehavior, RequestContext}; use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata}; use crate::tenant::storage_layer::{ - DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer, LayerFileName, - RemoteLayer, + DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer, + LayerAccessStats, LayerFileName, RemoteLayer, }; use crate::tenant::{ ephemeral_file::is_ephemeral_file, @@ -72,7 +72,7 @@ use walreceiver::spawn_connection_manager_task; use super::layer_map::BatchedUpdates; use super::remote_timeline_client::index::IndexPart; use super::remote_timeline_client::RemoteTimelineClient; -use super::storage_layer::{DeltaLayer, ImageLayer, Layer}; +use super::storage_layer::{DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset}; #[derive(Debug, PartialEq, Eq, Clone, Copy)] enum FlushLoopState { @@ -835,7 +835,7 @@ impl Timeline { self.state.subscribe() } - pub fn layer_map_info(&self) -> LayerMapInfo { + pub fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo { let layer_map = 
self.layers.read().unwrap(); let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1); if let Some(open_layer) = &layer_map.open_layer { @@ -847,7 +847,7 @@ impl Timeline { let mut historic_layers = Vec::new(); for historic_layer in layer_map.iter_historic_layers() { - historic_layers.push(historic_layer.info()); + historic_layers.push(historic_layer.info(reset)); } LayerMapInfo { @@ -891,12 +891,18 @@ impl Timeline { self.timeline_id, &image_name, &layer_metadata, + local_layer + .access_stats() + .clone_for_residence_change(LayerResidenceStatus::Evicted), ), LayerFileName::Delta(delta_name) => RemoteLayer::new_delta( self.tenant_id, self.timeline_id, &delta_name, &layer_metadata, + local_layer + .access_stats() + .clone_for_residence_change(LayerResidenceStatus::Evicted), ), }); @@ -1172,6 +1178,7 @@ impl Timeline { self.tenant_id, &imgfilename, file_size, + LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident), ); trace!("found layer {}", layer.path().display()); @@ -1203,6 +1210,7 @@ impl Timeline { self.tenant_id, &deltafilename, file_size, + LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident), ); trace!("found layer {}", layer.path().display()); @@ -1340,6 +1348,7 @@ impl Timeline { self.timeline_id, imgfilename, &remote_layer_metadata, + LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted), ); let remote_layer = Arc::new(remote_layer); @@ -1364,6 +1373,7 @@ impl Timeline { self.timeline_id, deltafilename, &remote_layer_metadata, + LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted), ); let remote_layer = Arc::new(remote_layer); updates.insert_historic(remote_layer);