From f5db655447b59366875adde2fc1176bf27dcd313 Mon Sep 17 00:00:00 2001 From: John Spray Date: Wed, 24 Jul 2024 08:17:28 +0100 Subject: [PATCH] pageserver: simplify LayerAccessStats (#8431) ## Problem LayerAccessStats contains a lot of detail that we don't use: short histories of most recent accesses, specifics on what kind of task accessed a layer, etc. This is all stored inside a Mutex, which is locked every time something accesses a layer. ## Summary of changes - Store timestamps at a very low resolution (to the nearest second), sufficient for use on the timescales of eviction. - Pack access time and last residence change time into a single u64 - Use the high bits of the u64 for other flags, including the new layer visibility concept. - Simplify the external-facing model for access stats to just include what we now track. Note that the `HistoryBufferWithDropCounter` is removed here because it is no longer used. I do not dislike this type, we just happen not to use it for anything else at present. Co-authored-by: Christian Schwarz --- Cargo.lock | 46 --- Cargo.toml | 3 - libs/pageserver_api/src/models.rs | 78 +---- libs/utils/Cargo.toml | 1 - libs/utils/src/history_buffer.rs | 196 ----------- libs/utils/src/lib.rs | 2 - pageserver/src/tenant/storage_layer.rs | 311 +++++++----------- .../src/tenant/storage_layer/delta_layer.rs | 14 +- .../src/tenant/storage_layer/image_layer.rs | 13 +- pageserver/src/tenant/storage_layer/layer.rs | 54 +-- .../src/tenant/storage_layer/layer/tests.rs | 51 ++- pageserver/src/tenant/timeline.rs | 4 +- .../src/tenant/timeline/eviction_task.rs | 2 +- .../regress/test_disk_usage_eviction.py | 9 + .../regress/test_threshold_based_eviction.py | 4 +- 15 files changed, 209 insertions(+), 579 deletions(-) delete mode 100644 libs/utils/src/history_buffer.rs diff --git a/Cargo.lock b/Cargo.lock index df9efbf7cc..2b56095bc8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -261,15 +261,6 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba" -[[package]] -name = "atomic-polyfill" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c314e70d181aa6053b26e3f7fbf86d1dfff84f816a6175b967666b3506ef7289" -dependencies = [ - "critical-section", -] - [[package]] name = "atomic-take" version = "1.1.0" @@ -1451,12 +1442,6 @@ dependencies = [ "itertools", ] -[[package]] -name = "critical-section" -version = "1.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6548a0ad5d2549e111e1f6a11a6c2e2d00ce6a3dafe22948d67c2b443f775e52" - [[package]] name = "crossbeam-channel" version = "0.5.8" @@ -2282,15 +2267,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "hash32" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47d60b12902ba28e2730cd37e95b8c9223af2808df9e902d4df49588d1470606" -dependencies = [ - "byteorder", -] - [[package]] name = "hashbrown" version = "0.12.3" @@ -2339,18 +2315,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "heapless" -version = "0.8.0" -source = "git+https://github.com/japaric/heapless.git?rev=644653bf3b831c6bb4963be2de24804acf5e5001#644653bf3b831c6bb4963be2de24804acf5e5001" -dependencies = [ - "atomic-polyfill", - "hash32", - "rustc_version", - "spin 0.9.8", - "stable_deref_trait", -] - [[package]] name = "heck" version = "0.4.1" @@ -5691,9 +5655,6 @@ name = "spin" version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" -dependencies = [ - "lock_api", -] [[package]] name = "spki" @@ -5715,12 +5676,6 @@ dependencies = [ "der 0.7.8", ] -[[package]] -name = "stable_deref_trait" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" - [[package]] name = "static_assertions" version = "1.1.0" @@ -6817,7 +6772,6 @@ dependencies = [ "criterion", "fail", "futures", - "heapless", "hex", "hex-literal", "humantime", diff --git a/Cargo.toml b/Cargo.toml index 615f5472ec..7749378114 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -204,9 +204,6 @@ postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" } tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" } -## Other git libraries -heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending - ## Local libraries compute_api = { version = "0.1", path = "./libs/compute_api/" } consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" } diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 231a604b47..591c45d908 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -5,7 +5,6 @@ pub mod utilization; pub use utilization::PageserverUtilization; use std::{ - borrow::Cow, collections::HashMap, io::{BufRead, Read}, num::{NonZeroU64, NonZeroUsize}, @@ -20,7 +19,6 @@ use serde::{Deserialize, Serialize}; use serde_with::serde_as; use utils::{ completion, - history_buffer::HistoryBufferWithDropCounter, id::{NodeId, TenantId, TimelineId}, lsn::Lsn, serde_system_time, @@ -726,58 +724,7 @@ pub struct LayerMapInfo { pub historic_layers: Vec, } -#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, enum_map::Enum)] -#[repr(usize)] -pub enum LayerAccessKind { - GetValueReconstructData, - Iter, - KeyIter, - Dump, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct LayerAccessStatFullDetails { - pub when_millis_since_epoch: u64, - pub task_kind: Cow<'static, str>, - pub access_kind: LayerAccessKind, -} - -/// An event that impacts the layer's residence status. -#[serde_as] -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct LayerResidenceEvent { - /// The time when the event occurred. - /// NB: this timestamp is captured while the residence status changes. - /// So, it might be behind/ahead of the actual residence change by a short amount of time. - /// - #[serde(rename = "timestamp_millis_since_epoch")] - #[serde_as(as = "serde_with::TimestampMilliSeconds")] - pub timestamp: SystemTime, - /// The new residence status of the layer. - pub status: LayerResidenceStatus, - /// The reason why we had to record this event. - pub reason: LayerResidenceEventReason, -} - -/// The reason for recording a given [`LayerResidenceEvent`]. -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] -pub enum LayerResidenceEventReason { - /// The layer map is being populated, e.g. during timeline load or attach. - /// This includes [`RemoteLayer`] objects created in [`reconcile_with_remote`]. - /// We need to record such events because there is no persistent storage for the events. - /// - // https://github.com/rust-lang/rust/issues/74481 - /// [`RemoteLayer`]: ../../tenant/storage_layer/struct.RemoteLayer.html - /// [`reconcile_with_remote`]: ../../tenant/struct.Timeline.html#method.reconcile_with_remote - LayerLoad, - /// We just created the layer (e.g., freeze_and_flush or compaction). - /// Such layers are always [`LayerResidenceStatus::Resident`]. - LayerCreate, - /// We on-demand downloaded or evicted the given layer. - ResidenceChange, -} - -/// The residence status of the layer, after the given [`LayerResidenceEvent`]. +/// The residence status of a layer #[derive(Debug, Clone, Copy, Serialize, Deserialize)] pub enum LayerResidenceStatus { /// Residence status for a layer file that exists locally. @@ -787,23 +734,16 @@ pub enum LayerResidenceStatus { Evicted, } -impl LayerResidenceEvent { - pub fn new(status: LayerResidenceStatus, reason: LayerResidenceEventReason) -> Self { - Self { - status, - reason, - timestamp: SystemTime::now(), - } - } -} - +#[serde_as] #[derive(Debug, Clone, Serialize, Deserialize)] pub struct LayerAccessStats { - pub access_count_by_access_kind: HashMap, - pub task_kind_access_flag: Vec>, - pub first: Option, - pub accesses_history: HistoryBufferWithDropCounter, - pub residence_events_history: HistoryBufferWithDropCounter, + #[serde_as(as = "serde_with::TimestampMilliSeconds")] + pub access_time: SystemTime, + + #[serde_as(as = "serde_with::TimestampMilliSeconds")] + pub residence_time: SystemTime, + + pub visible: bool, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index 261ca2cc1a..ec05f849cf 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -20,7 +20,6 @@ bincode.workspace = true bytes.workspace = true camino.workspace = true chrono.workspace = true -heapless.workspace = true hex = { workspace = true, features = ["serde"] } humantime.workspace = true hyper = { workspace = true, features = ["full"] } diff --git a/libs/utils/src/history_buffer.rs b/libs/utils/src/history_buffer.rs deleted file mode 100644 index bd35e2bad6..0000000000 --- a/libs/utils/src/history_buffer.rs +++ /dev/null @@ -1,196 +0,0 @@ -//! A heapless buffer for events of sorts. - -use std::ops; - -use heapless::HistoryBuffer; - -#[derive(Debug, Clone)] -pub struct HistoryBufferWithDropCounter { - buffer: HistoryBuffer, - drop_count: u64, -} - -impl HistoryBufferWithDropCounter { - pub fn write(&mut self, data: T) { - let len_before = self.buffer.len(); - self.buffer.write(data); - let len_after = self.buffer.len(); - self.drop_count += u64::from(len_before == len_after); - } - pub fn drop_count(&self) -> u64 { - self.drop_count - } - pub fn map U>(&self, f: F) -> HistoryBufferWithDropCounter { - let mut buffer = HistoryBuffer::new(); - buffer.extend(self.buffer.oldest_ordered().map(f)); - HistoryBufferWithDropCounter:: { - buffer, - drop_count: self.drop_count, - } - } -} - -impl Default for HistoryBufferWithDropCounter { - fn default() -> Self { - Self { - buffer: HistoryBuffer::default(), - drop_count: 0, - } - } -} - -impl ops::Deref for HistoryBufferWithDropCounter { - type Target = HistoryBuffer; - - fn deref(&self) -> &Self::Target { - &self.buffer - } -} - -#[derive(serde::Serialize, serde::Deserialize)] -struct SerdeRepr { - buffer: Vec, - buffer_size: usize, - drop_count: u64, -} - -impl<'a, T, const L: usize> From<&'a HistoryBufferWithDropCounter> for SerdeRepr -where - T: Clone + serde::Serialize, -{ - fn from(value: &'a HistoryBufferWithDropCounter) -> Self { - let HistoryBufferWithDropCounter { buffer, drop_count } = value; - SerdeRepr { - buffer: buffer.iter().cloned().collect(), - buffer_size: L, - drop_count: *drop_count, - } - } -} - -impl serde::Serialize for HistoryBufferWithDropCounter -where - T: Clone + serde::Serialize, -{ - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - SerdeRepr::from(self).serialize(serializer) - } -} - -impl<'de, T, const L: usize> serde::de::Deserialize<'de> for HistoryBufferWithDropCounter -where - T: Clone + serde::Deserialize<'de>, -{ - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - let SerdeRepr { - buffer: des_buffer, - drop_count, - buffer_size, - } = SerdeRepr::::deserialize(deserializer)?; - if buffer_size != L { - use serde::de::Error; - return Err(D::Error::custom(format!( - "invalid buffer_size, expecting {L} got {buffer_size}" - ))); - } - let mut buffer = HistoryBuffer::new(); - buffer.extend(des_buffer); - Ok(HistoryBufferWithDropCounter { buffer, drop_count }) - } -} - -#[cfg(test)] -mod test { - use super::HistoryBufferWithDropCounter; - - #[test] - fn test_basics() { - let mut b = HistoryBufferWithDropCounter::::default(); - b.write(1); - b.write(2); - b.write(3); - assert!(b.iter().any(|e| *e == 2)); - assert!(b.iter().any(|e| *e == 3)); - assert!(!b.iter().any(|e| *e == 1)); - - // round-trip serde - let round_tripped: HistoryBufferWithDropCounter = - serde_json::from_str(&serde_json::to_string(&b).unwrap()).unwrap(); - assert_eq!( - round_tripped.iter().cloned().collect::>(), - b.iter().cloned().collect::>() - ); - } - - #[test] - fn test_drop_count_works() { - let mut b = HistoryBufferWithDropCounter::<_, 2>::default(); - b.write(1); - assert_eq!(b.drop_count(), 0); - b.write(2); - assert_eq!(b.drop_count(), 0); - b.write(3); - assert_eq!(b.drop_count(), 1); - b.write(4); - assert_eq!(b.drop_count(), 2); - } - - #[test] - fn test_clone_works() { - let mut b = HistoryBufferWithDropCounter::<_, 2>::default(); - b.write(1); - b.write(2); - b.write(3); - assert_eq!(b.drop_count(), 1); - let mut c = b.clone(); - assert_eq!(c.drop_count(), 1); - assert!(c.iter().any(|e| *e == 2)); - assert!(c.iter().any(|e| *e == 3)); - assert!(!c.iter().any(|e| *e == 1)); - - c.write(4); - assert!(c.iter().any(|e| *e == 4)); - assert!(!b.iter().any(|e| *e == 4)); - } - - #[test] - fn test_map() { - let mut b = HistoryBufferWithDropCounter::<_, 2>::default(); - - b.write(1); - assert_eq!(b.drop_count(), 0); - { - let c = b.map(|i| i + 10); - assert_eq!(c.oldest_ordered().cloned().collect::>(), vec![11]); - assert_eq!(c.drop_count(), 0); - } - - b.write(2); - assert_eq!(b.drop_count(), 0); - { - let c = b.map(|i| i + 10); - assert_eq!( - c.oldest_ordered().cloned().collect::>(), - vec![11, 12] - ); - assert_eq!(c.drop_count(), 0); - } - - b.write(3); - assert_eq!(b.drop_count(), 1); - { - let c = b.map(|i| i + 10); - assert_eq!( - c.oldest_ordered().cloned().collect::>(), - vec![12, 13] - ); - assert_eq!(c.drop_count(), 1); - } - } -} diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 9ad1752fb7..a46d68ef33 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -59,8 +59,6 @@ pub mod signals; pub mod fs_ext; -pub mod history_buffer; - pub mod measured_stream; pub mod serde_percent; diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 3404308e56..f931341aca 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -10,29 +10,18 @@ pub mod merge_iterator; use crate::context::{AccessStatsBehavior, RequestContext}; use crate::repository::Value; -use crate::task_mgr::TaskKind; use crate::walrecord::NeonWalRecord; use bytes::Bytes; -use enum_map::EnumMap; -use enumset::EnumSet; -use once_cell::sync::Lazy; use pageserver_api::key::Key; use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum}; -use pageserver_api::models::{ - LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus, -}; -use std::borrow::Cow; use std::cmp::{Ordering, Reverse}; use std::collections::hash_map::Entry; use std::collections::{BinaryHeap, HashMap}; use std::ops::Range; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use tracing::warn; -use utils::history_buffer::HistoryBufferWithDropCounter; -use utils::rate_limit::RateLimit; -use utils::{id::TimelineId, lsn::Lsn}; +use utils::lsn::Lsn; pub use delta_layer::{DeltaLayer, DeltaLayerWriter, ValueRef}; pub use image_layer::{ImageLayer, ImageLayerWriter}; @@ -75,9 +64,9 @@ where /// call, to collect more records. /// #[derive(Debug, Default)] -pub struct ValueReconstructState { - pub records: Vec<(Lsn, NeonWalRecord)>, - pub img: Option<(Lsn, Bytes)>, +pub(crate) struct ValueReconstructState { + pub(crate) records: Vec<(Lsn, NeonWalRecord)>, + pub(crate) img: Option<(Lsn, Bytes)>, } #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] @@ -462,7 +451,7 @@ pub enum ValueReconstructResult { /// than an authoritative value, so that we do not have to update it synchronously when changing the visibility /// of layers (for example when creating a branch that makes some previously covered layers visible). It should /// be used for cache management but not for correctness-critical checks. -#[derive(Default, Debug, Clone)] +#[derive(Default, Debug, Clone, PartialEq, Eq)] pub(crate) enum LayerVisibilityHint { /// A Visible layer might be read while serving a read, because there is not an image layer between it /// and a readable LSN (the tip of the branch or a child's branch point) @@ -478,95 +467,72 @@ pub(crate) enum LayerVisibilityHint { Uninitialized, } -#[derive(Debug)] -pub struct LayerAccessStats(Mutex); - -/// This struct holds two instances of [`LayerAccessStatsInner`]. -/// Accesses are recorded to both instances. -/// The `for_scraping_api`instance can be reset from the management API via [`LayerAccessStatsReset`]. -/// The `for_eviction_policy` is never reset. -#[derive(Debug, Default, Clone)] -struct LayerAccessStatsLocked { - for_scraping_api: LayerAccessStatsInner, - for_eviction_policy: LayerAccessStatsInner, - visibility: LayerVisibilityHint, -} - -impl LayerAccessStatsLocked { - fn iter_mut(&mut self) -> impl Iterator { - [&mut self.for_scraping_api, &mut self.for_eviction_policy].into_iter() - } -} - -#[derive(Debug, Default, Clone)] -struct LayerAccessStatsInner { - first_access: Option, - count_by_access_kind: EnumMap, - task_kind_flag: EnumSet, - last_accesses: HistoryBufferWithDropCounter, - last_residence_changes: HistoryBufferWithDropCounter, -} - -#[derive(Debug, Clone, Copy)] -pub(crate) struct LayerAccessStatFullDetails { - pub(crate) when: SystemTime, - pub(crate) task_kind: TaskKind, - pub(crate) access_kind: LayerAccessKind, -} +pub(crate) struct LayerAccessStats(std::sync::atomic::AtomicU64); #[derive(Clone, Copy, strum_macros::EnumString)] -pub enum LayerAccessStatsReset { +pub(crate) enum LayerAccessStatsReset { NoReset, - JustTaskKindFlags, AllStats, } -fn system_time_to_millis_since_epoch(ts: &SystemTime) -> u64 { - ts.duration_since(UNIX_EPOCH) - .expect("better to die in this unlikely case than report false stats") - .as_millis() - .try_into() - .expect("64 bits is enough for few more years") -} +impl Default for LayerAccessStats { + fn default() -> Self { + // Default value is to assume resident since creation time, and visible. + let (_mask, mut value) = Self::to_low_res_timestamp(Self::RTIME_SHIFT, SystemTime::now()); + value |= 0x1 << Self::VISIBILITY_SHIFT; -impl LayerAccessStatFullDetails { - fn as_api_model(&self) -> pageserver_api::models::LayerAccessStatFullDetails { - let Self { - when, - task_kind, - access_kind, - } = self; - pageserver_api::models::LayerAccessStatFullDetails { - when_millis_since_epoch: system_time_to_millis_since_epoch(when), - task_kind: Cow::Borrowed(task_kind.into()), // into static str, powered by strum_macros - access_kind: *access_kind, - } + Self(std::sync::atomic::AtomicU64::new(value)) } } +// Efficient store of two very-low-resolution timestamps and some bits. Used for storing last access time and +// last residence change time. impl LayerAccessStats { - /// Create an empty stats object. - /// - /// The caller is responsible for recording a residence event - /// using [`record_residence_event`] before calling `latest_activity`. - /// If they don't, [`latest_activity`] will return `None`. - /// - /// [`record_residence_event`]: Self::record_residence_event - /// [`latest_activity`]: Self::latest_activity - pub(crate) fn empty_will_record_residence_event_later() -> Self { - LayerAccessStats(Mutex::default()) + // How many high bits to drop from a u32 timestamp? + // - Only storing up to a u32 timestamp will work fine until 2038 (if this code is still in use + // after that, this software has been very successful!) + // - Dropping the top bit is implicitly safe because unix timestamps are meant to be + // stored in an i32, so they never used it. + // - Dropping the next two bits is safe because this code is only running on systems in + // years >= 2024, and these bits have been 1 since 2021 + // + // Therefore we may store only 28 bits for a timestamp with one second resolution. We do + // this truncation to make space for some flags in the high bits of our u64. + const TS_DROP_HIGH_BITS: u32 = u32::count_ones(Self::TS_ONES) + 1; + const TS_MASK: u32 = 0x1f_ff_ff_ff; + const TS_ONES: u32 = 0x60_00_00_00; + + const ATIME_SHIFT: u32 = 0; + const RTIME_SHIFT: u32 = 32 - Self::TS_DROP_HIGH_BITS; + const VISIBILITY_SHIFT: u32 = 64 - 2 * Self::TS_DROP_HIGH_BITS; + + fn write_bits(&self, mask: u64, value: u64) -> u64 { + self.0 + .fetch_update( + // TODO: decide what orderings are correct + std::sync::atomic::Ordering::Relaxed, + std::sync::atomic::Ordering::Relaxed, + |v| Some((v & !mask) | (value & mask)), + ) + .expect("Inner function is infallible") } - /// Create an empty stats object and record a [`LayerLoad`] event with the given residence status. - /// - /// See [`record_residence_event`] for why you need to do this while holding the layer map lock. - /// - /// [`LayerLoad`]: LayerResidenceEventReason::LayerLoad - /// [`record_residence_event`]: Self::record_residence_event - pub(crate) fn for_loading_layer(status: LayerResidenceStatus) -> Self { - let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default())); - new.record_residence_event(status, LayerResidenceEventReason::LayerLoad); - new + fn to_low_res_timestamp(shift: u32, time: SystemTime) -> (u64, u64) { + // Drop the low three bits of the timestamp, for an ~8s accuracy + let timestamp = time.duration_since(UNIX_EPOCH).unwrap().as_secs() & (Self::TS_MASK as u64); + + ((Self::TS_MASK as u64) << shift, timestamp << shift) + } + + fn read_low_res_timestamp(&self, shift: u32) -> Option { + let read = self.0.load(std::sync::atomic::Ordering::Relaxed); + + let ts_bits = (read & ((Self::TS_MASK as u64) << shift)) >> shift; + if ts_bits == 0 { + None + } else { + Some(UNIX_EPOCH + Duration::from_secs(ts_bits | (Self::TS_ONES as u64))) + } } /// Record a change in layer residency. @@ -582,123 +548,64 @@ impl LayerAccessStats { /// - Eviction: imitate access logical size calculation. This accesses the L0 layers because the L1 layer is not yet in the layer map. /// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock. /// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event. - /// - pub(crate) fn record_residence_event( - &self, - status: LayerResidenceStatus, - reason: LayerResidenceEventReason, - ) { - let mut locked = self.0.lock().unwrap(); - locked.iter_mut().for_each(|inner| { - inner - .last_residence_changes - .write(LayerResidenceEvent::new(status, reason)) - }); + pub(crate) fn record_residence_event_at(&self, now: SystemTime) { + let (mask, value) = Self::to_low_res_timestamp(Self::RTIME_SHIFT, now); + self.write_bits(mask, value); } - fn record_access(&self, access_kind: LayerAccessKind, ctx: &RequestContext) { + pub(crate) fn record_residence_event(&self) { + self.record_residence_event_at(SystemTime::now()) + } + + pub(crate) fn record_access_at(&self, now: SystemTime) { + let (mut mask, mut value) = Self::to_low_res_timestamp(Self::ATIME_SHIFT, now); + + // A layer which is accessed must be visible. + mask |= 0x1 << Self::VISIBILITY_SHIFT; + value |= 0x1 << Self::VISIBILITY_SHIFT; + + self.write_bits(mask, value); + } + + pub(crate) fn record_access(&self, ctx: &RequestContext) { if ctx.access_stats_behavior() == AccessStatsBehavior::Skip { return; } - let this_access = LayerAccessStatFullDetails { - when: SystemTime::now(), - task_kind: ctx.task_kind(), - access_kind, - }; - - let mut locked = self.0.lock().unwrap(); - locked.iter_mut().for_each(|inner| { - inner.first_access.get_or_insert(this_access); - inner.count_by_access_kind[access_kind] += 1; - inner.task_kind_flag |= ctx.task_kind(); - inner.last_accesses.write(this_access); - }); - - // We may access a layer marked as Covered, if a new branch was created that depends on - // this layer, and background updates to layer visibility didn't notice it yet - if !matches!(locked.visibility, LayerVisibilityHint::Visible) { - locked.visibility = LayerVisibilityHint::Visible; - } + self.record_access_at(SystemTime::now()) } fn as_api_model( &self, reset: LayerAccessStatsReset, ) -> pageserver_api::models::LayerAccessStats { - let mut locked = self.0.lock().unwrap(); - let inner = &mut locked.for_scraping_api; - let LayerAccessStatsInner { - first_access, - count_by_access_kind, - task_kind_flag, - last_accesses, - last_residence_changes, - } = inner; let ret = pageserver_api::models::LayerAccessStats { - access_count_by_access_kind: count_by_access_kind - .iter() - .map(|(kind, count)| (kind, *count)) - .collect(), - task_kind_access_flag: task_kind_flag - .iter() - .map(|task_kind| Cow::Borrowed(task_kind.into())) // into static str, powered by strum_macros - .collect(), - first: first_access.as_ref().map(|a| a.as_api_model()), - accesses_history: last_accesses.map(|m| m.as_api_model()), - residence_events_history: last_residence_changes.clone(), + access_time: self + .read_low_res_timestamp(Self::ATIME_SHIFT) + .unwrap_or(UNIX_EPOCH), + residence_time: self + .read_low_res_timestamp(Self::RTIME_SHIFT) + .unwrap_or(UNIX_EPOCH), + visible: matches!(self.visibility(), LayerVisibilityHint::Visible), }; match reset { - LayerAccessStatsReset::NoReset => (), - LayerAccessStatsReset::JustTaskKindFlags => { - inner.task_kind_flag.clear(); - } + LayerAccessStatsReset::NoReset => {} LayerAccessStatsReset::AllStats => { - *inner = LayerAccessStatsInner::default(); + self.write_bits((Self::TS_MASK as u64) << Self::ATIME_SHIFT, 0x0); + self.write_bits((Self::TS_MASK as u64) << Self::RTIME_SHIFT, 0x0); } } ret } - /// Get the latest access timestamp, falling back to latest residence event, further falling - /// back to `SystemTime::now` for a usable timestamp for eviction. - pub(crate) fn latest_activity_or_now(&self) -> SystemTime { - self.latest_activity().unwrap_or_else(SystemTime::now) - } - - /// Get the latest access timestamp, falling back to latest residence event. - /// - /// This function can only return `None` if there has not yet been a call to the - /// [`record_residence_event`] method. That would generally be considered an - /// implementation error. This function logs a rate-limited warning in that case. - /// - /// TODO: use type system to avoid the need for `fallback`. - /// The approach in - /// could be used to enforce that a residence event is recorded - /// before a layer is added to the layer map. We could also have - /// a layer wrapper type that holds the LayerAccessStats, and ensure - /// that that type can only be produced by inserting into the layer map. - /// - /// [`record_residence_event`]: Self::record_residence_event - fn latest_activity(&self) -> Option { - let locked = self.0.lock().unwrap(); - let inner = &locked.for_eviction_policy; - match inner.last_accesses.recent() { - Some(a) => Some(a.when), - None => match inner.last_residence_changes.recent() { - Some(e) => Some(e.timestamp), - None => { - static WARN_RATE_LIMIT: Lazy> = - Lazy::new(|| Mutex::new((0, RateLimit::new(Duration::from_secs(10))))); - let mut guard = WARN_RATE_LIMIT.lock().unwrap(); - guard.0 += 1; - let occurences = guard.0; - guard.1.call(move || { - warn!(parent: None, occurences, "latest_activity not available, this is an implementation bug, using fallback value"); - }); - None - } - }, + /// Get the latest access timestamp, falling back to latest residence event. The latest residence event + /// will be this Layer's construction time, if its residence hasn't changed since then. + pub(crate) fn latest_activity(&self) -> SystemTime { + if let Some(t) = self.read_low_res_timestamp(Self::ATIME_SHIFT) { + t + } else { + self.read_low_res_timestamp(Self::RTIME_SHIFT) + .expect("Residence time is set on construction") } } @@ -707,38 +614,46 @@ impl LayerAccessStats { /// This indicates whether the layer has been used for some purpose that would motivate /// us to keep it on disk, such as for serving a getpage request. fn accessed(&self) -> bool { - let locked = self.0.lock().unwrap(); - let inner = &locked.for_eviction_policy; - // Consider it accessed if the most recent access is more recent than // the most recent change in residence status. match ( - inner.last_accesses.recent(), - inner.last_residence_changes.recent(), + self.read_low_res_timestamp(Self::ATIME_SHIFT), + self.read_low_res_timestamp(Self::RTIME_SHIFT), ) { (None, _) => false, (Some(_), None) => true, - (Some(a), Some(r)) => a.when >= r.timestamp, + (Some(a), Some(r)) => a >= r, } } pub(crate) fn set_visibility(&self, visibility: LayerVisibilityHint) { - self.0.lock().unwrap().visibility = visibility; + let value = match visibility { + LayerVisibilityHint::Visible => 0x1 << Self::VISIBILITY_SHIFT, + LayerVisibilityHint::Covered | LayerVisibilityHint::Uninitialized => 0x0, + }; + + self.write_bits(0x1 << Self::VISIBILITY_SHIFT, value); } pub(crate) fn visibility(&self) -> LayerVisibilityHint { - self.0.lock().unwrap().visibility.clone() + let read = self.0.load(std::sync::atomic::Ordering::Relaxed); + match (read >> Self::VISIBILITY_SHIFT) & 0x1 { + 1 => LayerVisibilityHint::Visible, + 0 => LayerVisibilityHint::Covered, + _ => unreachable!(), + } } } /// Get a layer descriptor from a layer. -pub trait AsLayerDesc { +pub(crate) trait AsLayerDesc { /// Get the layer descriptor. fn layer_desc(&self) -> &PersistentLayerDesc; } pub mod tests { use pageserver_api::shard::TenantShardId; + use utils::id::TimelineId; use super::*; diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index c73059c34a..586a7b7836 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -52,7 +52,7 @@ use camino::{Utf8Path, Utf8PathBuf}; use futures::StreamExt; use itertools::Itertools; use pageserver_api::keyspace::KeySpace; -use pageserver_api::models::{ImageCompressionAlgorithm, LayerAccessKind}; +use pageserver_api::models::ImageCompressionAlgorithm; use pageserver_api::shard::TenantShardId; use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; @@ -265,7 +265,7 @@ impl DeltaLayer { return Ok(()); } - let inner = self.load(LayerAccessKind::Dump, ctx).await?; + let inner = self.load(ctx).await?; inner.dump(ctx).await } @@ -298,12 +298,8 @@ impl DeltaLayer { /// Open the underlying file and read the metadata into memory, if it's /// not loaded already. /// - async fn load( - &self, - access_kind: LayerAccessKind, - ctx: &RequestContext, - ) -> Result<&Arc> { - self.access_stats.record_access(access_kind, ctx); + async fn load(&self, ctx: &RequestContext) -> Result<&Arc> { + self.access_stats.record_access(ctx); // Quick exit if already loaded self.inner .get_or_try_init(|| self.load_inner(ctx)) @@ -356,7 +352,7 @@ impl DeltaLayer { summary.lsn_range, metadata.len(), ), - access_stats: LayerAccessStats::empty_will_record_residence_event_later(), + access_stats: Default::default(), inner: OnceCell::new(), }) } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 19e4e9e2e9..e5e7f71928 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -49,7 +49,6 @@ use camino::{Utf8Path, Utf8PathBuf}; use hex; use itertools::Itertools; use pageserver_api::keyspace::KeySpace; -use pageserver_api::models::LayerAccessKind; use pageserver_api::shard::{ShardIdentity, TenantShardId}; use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; @@ -228,7 +227,7 @@ impl ImageLayer { return Ok(()); } - let inner = self.load(LayerAccessKind::Dump, ctx).await?; + let inner = self.load(ctx).await?; inner.dump(ctx).await?; @@ -255,12 +254,8 @@ impl ImageLayer { /// Open the underlying file and read the metadata into memory, if it's /// not loaded already. /// - async fn load( - &self, - access_kind: LayerAccessKind, - ctx: &RequestContext, - ) -> Result<&ImageLayerInner> { - self.access_stats.record_access(access_kind, ctx); + async fn load(&self, ctx: &RequestContext) -> Result<&ImageLayerInner> { + self.access_stats.record_access(ctx); self.inner .get_or_try_init(|| self.load_inner(ctx)) .await @@ -312,7 +307,7 @@ impl ImageLayer { metadata.len(), ), // Now we assume image layer ALWAYS covers the full range. This may change in the future. lsn: summary.lsn, - access_stats: LayerAccessStats::empty_will_record_residence_event_later(), + access_stats: Default::default(), inner: OnceCell::new(), }) } diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 25d8ee6b2b..1db3e7c675 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -1,9 +1,7 @@ use anyhow::Context; use camino::{Utf8Path, Utf8PathBuf}; use pageserver_api::keyspace::KeySpace; -use pageserver_api::models::{ - HistoricLayerInfo, LayerAccessKind, LayerResidenceEventReason, LayerResidenceStatus, -}; +use pageserver_api::models::HistoricLayerInfo; use pageserver_api::shard::{ShardIdentity, ShardIndex, TenantShardId}; use std::ops::Range; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; @@ -160,13 +158,10 @@ impl Layer { metadata.file_size, ); - let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted); - let owner = Layer(Arc::new(LayerInner::new( conf, timeline, local_path, - access_stats, desc, None, metadata.generation, @@ -193,8 +188,6 @@ impl Layer { metadata.file_size, ); - let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident); - let mut resident = None; let owner = Layer(Arc::new_cyclic(|owner| { @@ -209,7 +202,6 @@ impl Layer { conf, timeline, local_path, - access_stats, desc, Some(inner), metadata.generation, @@ -245,13 +237,6 @@ impl Layer { version: 0, }); resident = Some(inner.clone()); - let access_stats = LayerAccessStats::empty_will_record_residence_event_later(); - access_stats.record_residence_event( - LayerResidenceStatus::Resident, - LayerResidenceEventReason::LayerCreate, - ); - // Newly created layers are marked visible by default: the usual case is that they were created to be read. - access_stats.set_visibility(super::LayerVisibilityHint::Visible); let local_path = local_layer_path( conf, @@ -261,16 +246,22 @@ impl Layer { &timeline.generation, ); - LayerInner::new( + let layer = LayerInner::new( conf, timeline, local_path, - access_stats, desc, Some(inner), timeline.generation, timeline.get_shard_index(), - ) + ); + + // Newly created layers are marked visible by default: the usual case is that they were created to be read. + layer + .access_stats + .set_visibility(super::LayerVisibilityHint::Visible); + + layer })); let downloaded = resident.expect("just initialized"); @@ -334,9 +325,7 @@ impl Layer { use anyhow::ensure; let layer = self.0.get_or_maybe_download(true, Some(ctx)).await?; - self.0 - .access_stats - .record_access(LayerAccessKind::GetValueReconstructData, ctx); + self.0.access_stats.record_access(ctx); if self.layer_desc().is_delta { ensure!(lsn_range.start >= self.layer_desc().lsn_range.start); @@ -370,9 +359,7 @@ impl Layer { other => GetVectoredError::Other(anyhow::anyhow!(other)), })?; - self.0 - .access_stats - .record_access(LayerAccessKind::GetValueReconstructData, ctx); + self.0.access_stats.record_access(ctx); layer .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, &self.0, ctx) @@ -788,7 +775,6 @@ impl LayerInner { conf: &'static PageServerConf, timeline: &Arc, local_path: Utf8PathBuf, - access_stats: LayerAccessStats, desc: PersistentLayerDesc, downloaded: Option>, generation: Generation, @@ -823,7 +809,7 @@ impl LayerInner { path: local_path, desc, timeline: Arc::downgrade(timeline), - access_stats, + access_stats: Default::default(), wanted_deleted: AtomicBool::new(false), inner, version: AtomicUsize::new(version), @@ -1178,10 +1164,7 @@ impl LayerInner { LAYER_IMPL_METRICS.record_redownloaded_after(since_last_eviction); } - self.access_stats.record_residence_event( - LayerResidenceStatus::Resident, - LayerResidenceEventReason::ResidenceChange, - ); + self.access_stats.record_residence_event(); Ok(self.initialize_after_layer_is_on_disk(permit)) } @@ -1535,10 +1518,7 @@ impl LayerInner { } } - self.access_stats.record_residence_event( - LayerResidenceStatus::Evicted, - LayerResidenceEventReason::ResidenceChange, - ); + self.access_stats.record_residence_event(); self.status.as_ref().unwrap().send_replace(Status::Evicted); @@ -1864,9 +1844,7 @@ impl ResidentLayer { // this is valid because the DownloadedLayer::kind is a OnceCell, not a // Mutex, so we cannot go and deinitialize the value with OnceCell::take // while it's being held. - owner - .access_stats - .record_access(LayerAccessKind::KeyIter, ctx); + owner.access_stats.record_access(ctx); delta_layer::DeltaLayerInner::load_keys(d, ctx) .await diff --git a/pageserver/src/tenant/storage_layer/layer/tests.rs b/pageserver/src/tenant/storage_layer/layer/tests.rs index 66a4493218..d5d2f748a9 100644 --- a/pageserver/src/tenant/storage_layer/layer/tests.rs +++ b/pageserver/src/tenant/storage_layer/layer/tests.rs @@ -1,3 +1,5 @@ +use std::time::UNIX_EPOCH; + use pageserver_api::key::CONTROLFILE_KEY; use tokio::task::JoinSet; use utils::{ @@ -7,7 +9,7 @@ use utils::{ use super::failpoints::{Failpoint, FailpointKind}; use super::*; -use crate::context::DownloadBehavior; +use crate::{context::DownloadBehavior, tenant::storage_layer::LayerVisibilityHint}; use crate::{task_mgr::TaskKind, tenant::harness::TenantHarness}; /// Used in tests to advance a future to wanted await point, and not futher. @@ -826,9 +828,9 @@ async fn eviction_cancellation_on_drop() { #[test] #[cfg(target_arch = "x86_64")] fn layer_size() { - assert_eq!(std::mem::size_of::(), 2048); + assert_eq!(std::mem::size_of::(), 8); assert_eq!(std::mem::size_of::(), 104); - assert_eq!(std::mem::size_of::(), 2352); + assert_eq!(std::mem::size_of::(), 312); // it also has the utf8 path } @@ -968,3 +970,46 @@ fn spawn_blocking_pool_helper_actually_works() { println!("joined"); }); } + +/// Drop the low bits from a time, to emulate the precision loss in LayerAccessStats +fn lowres_time(hires: SystemTime) -> SystemTime { + let ts = hires.duration_since(UNIX_EPOCH).unwrap().as_secs(); + UNIX_EPOCH + Duration::from_secs(ts) +} + +#[test] +fn access_stats() { + let access_stats = LayerAccessStats::default(); + // Default is visible + assert_eq!(access_stats.visibility(), LayerVisibilityHint::Visible); + + access_stats.set_visibility(LayerVisibilityHint::Covered); + assert_eq!(access_stats.visibility(), LayerVisibilityHint::Covered); + access_stats.set_visibility(LayerVisibilityHint::Visible); + assert_eq!(access_stats.visibility(), LayerVisibilityHint::Visible); + + let rtime = UNIX_EPOCH + Duration::from_secs(2000000000); + access_stats.record_residence_event_at(rtime); + assert_eq!(access_stats.latest_activity(), lowres_time(rtime)); + + let atime = UNIX_EPOCH + Duration::from_secs(2100000000); + access_stats.record_access_at(atime); + assert_eq!(access_stats.latest_activity(), lowres_time(atime)); + + // Setting visibility doesn't clobber access time + access_stats.set_visibility(LayerVisibilityHint::Covered); + assert_eq!(access_stats.latest_activity(), lowres_time(atime)); + access_stats.set_visibility(LayerVisibilityHint::Visible); + assert_eq!(access_stats.latest_activity(), lowres_time(atime)); +} + +#[test] +fn access_stats_2038() { + // The access stats structure uses a timestamp representation that will run out + // of bits in 2038. One year before that, this unit test will start failing. + + let one_year_from_now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + + Duration::from_secs(3600 * 24 * 365); + + assert!(one_year_from_now.as_secs() < (2 << 31)); +} diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 4bfcdc43e8..82e8ff02ca 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3155,7 +3155,7 @@ impl Timeline { let guard = self.layers.read().await; let resident = guard.likely_resident_layers().map(|layer| { - let last_activity_ts = layer.access_stats().latest_activity_or_now(); + let last_activity_ts = layer.access_stats().latest_activity(); HeatMapLayer::new( layer.layer_desc().layer_name(), @@ -5582,7 +5582,7 @@ impl Timeline { let file_size = layer.layer_desc().file_size; max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size))); - let last_activity_ts = layer.access_stats().latest_activity_or_now(); + let last_activity_ts = layer.access_stats().latest_activity(); EvictionCandidate { layer: layer.into(), diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 972ac48cda..fec66aabc1 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -225,7 +225,7 @@ impl Timeline { continue; } - let last_activity_ts = layer.access_stats().latest_activity_or_now(); + let last_activity_ts = layer.access_stats().latest_activity(); let no_activity_for = match now.duration_since(last_activity_ts) { Ok(d) => d, diff --git a/test_runner/regress/test_disk_usage_eviction.py b/test_runner/regress/test_disk_usage_eviction.py index 930fb14947..91c7b97fdd 100644 --- a/test_runner/regress/test_disk_usage_eviction.py +++ b/test_runner/regress/test_disk_usage_eviction.py @@ -21,6 +21,10 @@ from fixtures.utils import human_bytes, wait_until GLOBAL_LRU_LOG_LINE = "tenant_min_resident_size-respecting LRU would not relieve pressure, evicting more following global LRU policy" +# access times in the pageserver are stored at a very low resolution: to generate meaningfully different +# values, tests must inject sleeps +ATIME_RESOLUTION = 2 + @pytest.mark.parametrize("config_level_override", [None, 400]) def test_min_resident_size_override_handling( @@ -546,6 +550,7 @@ def test_partial_evict_tenant(eviction_env: EvictionEnv, order: EvictionOrder): (tenant_id, timeline_id) = warm # make picked tenant more recently used than the other one + time.sleep(ATIME_RESOLUTION) env.warm_up_tenant(tenant_id) # Build up enough pressure to require evictions from both tenants, @@ -622,6 +627,10 @@ def test_fast_growing_tenant(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, or for scale in [1, 1, 1, 4]: timelines.append((pgbench_init_tenant(layer_size, scale, env, pg_bin), scale)) + # Eviction times are stored at a low resolution. We must ensure that the time between + # tenants is long enough for the pageserver to distinguish them. + time.sleep(ATIME_RESOLUTION) + env.neon_cli.safekeeper_stop() for (tenant_id, timeline_id), scale in timelines: diff --git a/test_runner/regress/test_threshold_based_eviction.py b/test_runner/regress/test_threshold_based_eviction.py index 7bf49a0874..b62398d427 100644 --- a/test_runner/regress/test_threshold_based_eviction.py +++ b/test_runner/regress/test_threshold_based_eviction.py @@ -52,8 +52,8 @@ def test_threshold_based_eviction( "kind": "NoEviction" } - eviction_threshold = 5 - eviction_period = 1 + eviction_threshold = 10 + eviction_period = 2 ps_http.set_tenant_config( tenant_id, {