mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-05 04:22:56 +00:00
pageserver: simplify LayerAccessStats (#8431)
## Problem

LayerAccessStats contains a lot of detail that we don't use: short histories of most recent accesses, specifics on what kind of task accessed a layer, etc. This is all stored inside a Mutex, which is locked every time something accesses a layer.

## Summary of changes

- Store timestamps at a very low resolution (to the nearest second), sufficient for use on the timescales of eviction.
- Pack access time and last residence change time into a single u64.
- Use the high bits of the u64 for other flags, including the new layer visibility concept.
- Simplify the external-facing model for access stats to just include what we now track.

Note that the `HistoryBufferWithDropCounter` is removed here because it is no longer used. I do not dislike this type, we just happen not to use it for anything else at present.

Co-authored-by: Christian Schwarz <christian@neon.tech>
This commit is contained in:
46
Cargo.lock
generated
46
Cargo.lock
generated
@@ -261,15 +261,6 @@ version = "0.5.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba"
|
||||
|
||||
[[package]]
|
||||
name = "atomic-polyfill"
|
||||
version = "1.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c314e70d181aa6053b26e3f7fbf86d1dfff84f816a6175b967666b3506ef7289"
|
||||
dependencies = [
|
||||
"critical-section",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "atomic-take"
|
||||
version = "1.1.0"
|
||||
@@ -1451,12 +1442,6 @@ dependencies = [
|
||||
"itertools",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "critical-section"
|
||||
version = "1.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6548a0ad5d2549e111e1f6a11a6c2e2d00ce6a3dafe22948d67c2b443f775e52"
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-channel"
|
||||
version = "0.5.8"
|
||||
@@ -2282,15 +2267,6 @@ dependencies = [
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hash32"
|
||||
version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "47d60b12902ba28e2730cd37e95b8c9223af2808df9e902d4df49588d1470606"
|
||||
dependencies = [
|
||||
"byteorder",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hashbrown"
|
||||
version = "0.12.3"
|
||||
@@ -2339,18 +2315,6 @@ dependencies = [
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "heapless"
|
||||
version = "0.8.0"
|
||||
source = "git+https://github.com/japaric/heapless.git?rev=644653bf3b831c6bb4963be2de24804acf5e5001#644653bf3b831c6bb4963be2de24804acf5e5001"
|
||||
dependencies = [
|
||||
"atomic-polyfill",
|
||||
"hash32",
|
||||
"rustc_version",
|
||||
"spin 0.9.8",
|
||||
"stable_deref_trait",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "heck"
|
||||
version = "0.4.1"
|
||||
@@ -5691,9 +5655,6 @@ name = "spin"
|
||||
version = "0.9.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
|
||||
dependencies = [
|
||||
"lock_api",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "spki"
|
||||
@@ -5715,12 +5676,6 @@ dependencies = [
|
||||
"der 0.7.8",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "stable_deref_trait"
|
||||
version = "1.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
|
||||
|
||||
[[package]]
|
||||
name = "static_assertions"
|
||||
version = "1.1.0"
|
||||
@@ -6817,7 +6772,6 @@ dependencies = [
|
||||
"criterion",
|
||||
"fail",
|
||||
"futures",
|
||||
"heapless",
|
||||
"hex",
|
||||
"hex-literal",
|
||||
"humantime",
|
||||
|
||||
@@ -204,9 +204,6 @@ postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git",
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
|
||||
## Other git libraries
|
||||
heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending
|
||||
|
||||
## Local libraries
|
||||
compute_api = { version = "0.1", path = "./libs/compute_api/" }
|
||||
consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" }
|
||||
|
||||
@@ -5,7 +5,6 @@ pub mod utilization;
|
||||
pub use utilization::PageserverUtilization;
|
||||
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
collections::HashMap,
|
||||
io::{BufRead, Read},
|
||||
num::{NonZeroU64, NonZeroUsize},
|
||||
@@ -20,7 +19,6 @@ use serde::{Deserialize, Serialize};
|
||||
use serde_with::serde_as;
|
||||
use utils::{
|
||||
completion,
|
||||
history_buffer::HistoryBufferWithDropCounter,
|
||||
id::{NodeId, TenantId, TimelineId},
|
||||
lsn::Lsn,
|
||||
serde_system_time,
|
||||
@@ -726,58 +724,7 @@ pub struct LayerMapInfo {
|
||||
pub historic_layers: Vec<HistoricLayerInfo>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, enum_map::Enum)]
|
||||
#[repr(usize)]
|
||||
pub enum LayerAccessKind {
|
||||
GetValueReconstructData,
|
||||
Iter,
|
||||
KeyIter,
|
||||
Dump,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct LayerAccessStatFullDetails {
|
||||
pub when_millis_since_epoch: u64,
|
||||
pub task_kind: Cow<'static, str>,
|
||||
pub access_kind: LayerAccessKind,
|
||||
}
|
||||
|
||||
/// An event that impacts the layer's residence status.
|
||||
#[serde_as]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct LayerResidenceEvent {
|
||||
/// The time when the event occurred.
|
||||
/// NB: this timestamp is captured while the residence status changes.
|
||||
/// So, it might be behind/ahead of the actual residence change by a short amount of time.
|
||||
///
|
||||
#[serde(rename = "timestamp_millis_since_epoch")]
|
||||
#[serde_as(as = "serde_with::TimestampMilliSeconds")]
|
||||
pub timestamp: SystemTime,
|
||||
/// The new residence status of the layer.
|
||||
pub status: LayerResidenceStatus,
|
||||
/// The reason why we had to record this event.
|
||||
pub reason: LayerResidenceEventReason,
|
||||
}
|
||||
|
||||
/// The reason for recording a given [`LayerResidenceEvent`].
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
||||
pub enum LayerResidenceEventReason {
|
||||
/// The layer map is being populated, e.g. during timeline load or attach.
|
||||
/// This includes [`RemoteLayer`] objects created in [`reconcile_with_remote`].
|
||||
/// We need to record such events because there is no persistent storage for the events.
|
||||
///
|
||||
// https://github.com/rust-lang/rust/issues/74481
|
||||
/// [`RemoteLayer`]: ../../tenant/storage_layer/struct.RemoteLayer.html
|
||||
/// [`reconcile_with_remote`]: ../../tenant/struct.Timeline.html#method.reconcile_with_remote
|
||||
LayerLoad,
|
||||
/// We just created the layer (e.g., freeze_and_flush or compaction).
|
||||
/// Such layers are always [`LayerResidenceStatus::Resident`].
|
||||
LayerCreate,
|
||||
/// We on-demand downloaded or evicted the given layer.
|
||||
ResidenceChange,
|
||||
}
|
||||
|
||||
/// The residence status of the layer, after the given [`LayerResidenceEvent`].
|
||||
/// The residence status of a layer
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
||||
pub enum LayerResidenceStatus {
|
||||
/// Residence status for a layer file that exists locally.
|
||||
@@ -787,23 +734,16 @@ pub enum LayerResidenceStatus {
|
||||
Evicted,
|
||||
}
|
||||
|
||||
impl LayerResidenceEvent {
|
||||
pub fn new(status: LayerResidenceStatus, reason: LayerResidenceEventReason) -> Self {
|
||||
Self {
|
||||
status,
|
||||
reason,
|
||||
timestamp: SystemTime::now(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct LayerAccessStats {
|
||||
pub access_count_by_access_kind: HashMap<LayerAccessKind, u64>,
|
||||
pub task_kind_access_flag: Vec<Cow<'static, str>>,
|
||||
pub first: Option<LayerAccessStatFullDetails>,
|
||||
pub accesses_history: HistoryBufferWithDropCounter<LayerAccessStatFullDetails, 16>,
|
||||
pub residence_events_history: HistoryBufferWithDropCounter<LayerResidenceEvent, 16>,
|
||||
#[serde_as(as = "serde_with::TimestampMilliSeconds")]
|
||||
pub access_time: SystemTime,
|
||||
|
||||
#[serde_as(as = "serde_with::TimestampMilliSeconds")]
|
||||
pub residence_time: SystemTime,
|
||||
|
||||
pub visible: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
|
||||
@@ -20,7 +20,6 @@ bincode.workspace = true
|
||||
bytes.workspace = true
|
||||
camino.workspace = true
|
||||
chrono.workspace = true
|
||||
heapless.workspace = true
|
||||
hex = { workspace = true, features = ["serde"] }
|
||||
humantime.workspace = true
|
||||
hyper = { workspace = true, features = ["full"] }
|
||||
|
||||
@@ -1,196 +0,0 @@
|
||||
//! A heapless buffer for events of sorts.
|
||||
|
||||
use std::ops;
|
||||
|
||||
use heapless::HistoryBuffer;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct HistoryBufferWithDropCounter<T, const L: usize> {
|
||||
buffer: HistoryBuffer<T, L>,
|
||||
drop_count: u64,
|
||||
}
|
||||
|
||||
impl<T, const L: usize> HistoryBufferWithDropCounter<T, L> {
|
||||
pub fn write(&mut self, data: T) {
|
||||
let len_before = self.buffer.len();
|
||||
self.buffer.write(data);
|
||||
let len_after = self.buffer.len();
|
||||
self.drop_count += u64::from(len_before == len_after);
|
||||
}
|
||||
pub fn drop_count(&self) -> u64 {
|
||||
self.drop_count
|
||||
}
|
||||
pub fn map<U, F: Fn(&T) -> U>(&self, f: F) -> HistoryBufferWithDropCounter<U, L> {
|
||||
let mut buffer = HistoryBuffer::new();
|
||||
buffer.extend(self.buffer.oldest_ordered().map(f));
|
||||
HistoryBufferWithDropCounter::<U, L> {
|
||||
buffer,
|
||||
drop_count: self.drop_count,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, const L: usize> Default for HistoryBufferWithDropCounter<T, L> {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
buffer: HistoryBuffer::default(),
|
||||
drop_count: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, const L: usize> ops::Deref for HistoryBufferWithDropCounter<T, L> {
|
||||
type Target = HistoryBuffer<T, L>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.buffer
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize, serde::Deserialize)]
|
||||
struct SerdeRepr<T> {
|
||||
buffer: Vec<T>,
|
||||
buffer_size: usize,
|
||||
drop_count: u64,
|
||||
}
|
||||
|
||||
impl<'a, T, const L: usize> From<&'a HistoryBufferWithDropCounter<T, L>> for SerdeRepr<T>
|
||||
where
|
||||
T: Clone + serde::Serialize,
|
||||
{
|
||||
fn from(value: &'a HistoryBufferWithDropCounter<T, L>) -> Self {
|
||||
let HistoryBufferWithDropCounter { buffer, drop_count } = value;
|
||||
SerdeRepr {
|
||||
buffer: buffer.iter().cloned().collect(),
|
||||
buffer_size: L,
|
||||
drop_count: *drop_count,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, const L: usize> serde::Serialize for HistoryBufferWithDropCounter<T, L>
|
||||
where
|
||||
T: Clone + serde::Serialize,
|
||||
{
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
SerdeRepr::from(self).serialize(serializer)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de, T, const L: usize> serde::de::Deserialize<'de> for HistoryBufferWithDropCounter<T, L>
|
||||
where
|
||||
T: Clone + serde::Deserialize<'de>,
|
||||
{
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
let SerdeRepr {
|
||||
buffer: des_buffer,
|
||||
drop_count,
|
||||
buffer_size,
|
||||
} = SerdeRepr::<T>::deserialize(deserializer)?;
|
||||
if buffer_size != L {
|
||||
use serde::de::Error;
|
||||
return Err(D::Error::custom(format!(
|
||||
"invalid buffer_size, expecting {L} got {buffer_size}"
|
||||
)));
|
||||
}
|
||||
let mut buffer = HistoryBuffer::new();
|
||||
buffer.extend(des_buffer);
|
||||
Ok(HistoryBufferWithDropCounter { buffer, drop_count })
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::HistoryBufferWithDropCounter;
|
||||
|
||||
#[test]
|
||||
fn test_basics() {
|
||||
let mut b = HistoryBufferWithDropCounter::<usize, 2>::default();
|
||||
b.write(1);
|
||||
b.write(2);
|
||||
b.write(3);
|
||||
assert!(b.iter().any(|e| *e == 2));
|
||||
assert!(b.iter().any(|e| *e == 3));
|
||||
assert!(!b.iter().any(|e| *e == 1));
|
||||
|
||||
// round-trip serde
|
||||
let round_tripped: HistoryBufferWithDropCounter<usize, 2> =
|
||||
serde_json::from_str(&serde_json::to_string(&b).unwrap()).unwrap();
|
||||
assert_eq!(
|
||||
round_tripped.iter().cloned().collect::<Vec<_>>(),
|
||||
b.iter().cloned().collect::<Vec<_>>()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_drop_count_works() {
|
||||
let mut b = HistoryBufferWithDropCounter::<_, 2>::default();
|
||||
b.write(1);
|
||||
assert_eq!(b.drop_count(), 0);
|
||||
b.write(2);
|
||||
assert_eq!(b.drop_count(), 0);
|
||||
b.write(3);
|
||||
assert_eq!(b.drop_count(), 1);
|
||||
b.write(4);
|
||||
assert_eq!(b.drop_count(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_clone_works() {
|
||||
let mut b = HistoryBufferWithDropCounter::<_, 2>::default();
|
||||
b.write(1);
|
||||
b.write(2);
|
||||
b.write(3);
|
||||
assert_eq!(b.drop_count(), 1);
|
||||
let mut c = b.clone();
|
||||
assert_eq!(c.drop_count(), 1);
|
||||
assert!(c.iter().any(|e| *e == 2));
|
||||
assert!(c.iter().any(|e| *e == 3));
|
||||
assert!(!c.iter().any(|e| *e == 1));
|
||||
|
||||
c.write(4);
|
||||
assert!(c.iter().any(|e| *e == 4));
|
||||
assert!(!b.iter().any(|e| *e == 4));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_map() {
|
||||
let mut b = HistoryBufferWithDropCounter::<_, 2>::default();
|
||||
|
||||
b.write(1);
|
||||
assert_eq!(b.drop_count(), 0);
|
||||
{
|
||||
let c = b.map(|i| i + 10);
|
||||
assert_eq!(c.oldest_ordered().cloned().collect::<Vec<_>>(), vec![11]);
|
||||
assert_eq!(c.drop_count(), 0);
|
||||
}
|
||||
|
||||
b.write(2);
|
||||
assert_eq!(b.drop_count(), 0);
|
||||
{
|
||||
let c = b.map(|i| i + 10);
|
||||
assert_eq!(
|
||||
c.oldest_ordered().cloned().collect::<Vec<_>>(),
|
||||
vec![11, 12]
|
||||
);
|
||||
assert_eq!(c.drop_count(), 0);
|
||||
}
|
||||
|
||||
b.write(3);
|
||||
assert_eq!(b.drop_count(), 1);
|
||||
{
|
||||
let c = b.map(|i| i + 10);
|
||||
assert_eq!(
|
||||
c.oldest_ordered().cloned().collect::<Vec<_>>(),
|
||||
vec![12, 13]
|
||||
);
|
||||
assert_eq!(c.drop_count(), 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -59,8 +59,6 @@ pub mod signals;
|
||||
|
||||
pub mod fs_ext;
|
||||
|
||||
pub mod history_buffer;
|
||||
|
||||
pub mod measured_stream;
|
||||
|
||||
pub mod serde_percent;
|
||||
|
||||
@@ -10,29 +10,18 @@ pub mod merge_iterator;
|
||||
|
||||
use crate::context::{AccessStatsBehavior, RequestContext};
|
||||
use crate::repository::Value;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::walrecord::NeonWalRecord;
|
||||
use bytes::Bytes;
|
||||
use enum_map::EnumMap;
|
||||
use enumset::EnumSet;
|
||||
use once_cell::sync::Lazy;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
|
||||
use pageserver_api::models::{
|
||||
LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
|
||||
};
|
||||
use std::borrow::Cow;
|
||||
use std::cmp::{Ordering, Reverse};
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::{BinaryHeap, HashMap};
|
||||
use std::ops::Range;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
use tracing::warn;
|
||||
use utils::history_buffer::HistoryBufferWithDropCounter;
|
||||
use utils::rate_limit::RateLimit;
|
||||
|
||||
use utils::{id::TimelineId, lsn::Lsn};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
pub use delta_layer::{DeltaLayer, DeltaLayerWriter, ValueRef};
|
||||
pub use image_layer::{ImageLayer, ImageLayerWriter};
|
||||
@@ -75,9 +64,9 @@ where
|
||||
/// call, to collect more records.
|
||||
///
|
||||
#[derive(Debug, Default)]
|
||||
pub struct ValueReconstructState {
|
||||
pub records: Vec<(Lsn, NeonWalRecord)>,
|
||||
pub img: Option<(Lsn, Bytes)>,
|
||||
pub(crate) struct ValueReconstructState {
|
||||
pub(crate) records: Vec<(Lsn, NeonWalRecord)>,
|
||||
pub(crate) img: Option<(Lsn, Bytes)>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
|
||||
@@ -462,7 +451,7 @@ pub enum ValueReconstructResult {
|
||||
/// than an authoritative value, so that we do not have to update it synchronously when changing the visibility
|
||||
/// of layers (for example when creating a branch that makes some previously covered layers visible). It should
|
||||
/// be used for cache management but not for correctness-critical checks.
|
||||
#[derive(Default, Debug, Clone)]
|
||||
#[derive(Default, Debug, Clone, PartialEq, Eq)]
|
||||
pub(crate) enum LayerVisibilityHint {
|
||||
/// A Visible layer might be read while serving a read, because there is not an image layer between it
|
||||
/// and a readable LSN (the tip of the branch or a child's branch point)
|
||||
@@ -478,95 +467,72 @@ pub(crate) enum LayerVisibilityHint {
|
||||
Uninitialized,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct LayerAccessStats(Mutex<LayerAccessStatsLocked>);
|
||||
|
||||
/// This struct holds two instances of [`LayerAccessStatsInner`].
|
||||
/// Accesses are recorded to both instances.
|
||||
/// The `for_scraping_api` instance can be reset from the management API via [`LayerAccessStatsReset`].
|
||||
/// The `for_eviction_policy` is never reset.
|
||||
#[derive(Debug, Default, Clone)]
|
||||
struct LayerAccessStatsLocked {
|
||||
for_scraping_api: LayerAccessStatsInner,
|
||||
for_eviction_policy: LayerAccessStatsInner,
|
||||
visibility: LayerVisibilityHint,
|
||||
}
|
||||
|
||||
impl LayerAccessStatsLocked {
|
||||
fn iter_mut(&mut self) -> impl Iterator<Item = &mut LayerAccessStatsInner> {
|
||||
[&mut self.for_scraping_api, &mut self.for_eviction_policy].into_iter()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
struct LayerAccessStatsInner {
|
||||
first_access: Option<LayerAccessStatFullDetails>,
|
||||
count_by_access_kind: EnumMap<LayerAccessKind, u64>,
|
||||
task_kind_flag: EnumSet<TaskKind>,
|
||||
last_accesses: HistoryBufferWithDropCounter<LayerAccessStatFullDetails, 16>,
|
||||
last_residence_changes: HistoryBufferWithDropCounter<LayerResidenceEvent, 16>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub(crate) struct LayerAccessStatFullDetails {
|
||||
pub(crate) when: SystemTime,
|
||||
pub(crate) task_kind: TaskKind,
|
||||
pub(crate) access_kind: LayerAccessKind,
|
||||
}
|
||||
pub(crate) struct LayerAccessStats(std::sync::atomic::AtomicU64);
|
||||
|
||||
#[derive(Clone, Copy, strum_macros::EnumString)]
|
||||
pub enum LayerAccessStatsReset {
|
||||
pub(crate) enum LayerAccessStatsReset {
|
||||
NoReset,
|
||||
JustTaskKindFlags,
|
||||
AllStats,
|
||||
}
|
||||
|
||||
fn system_time_to_millis_since_epoch(ts: &SystemTime) -> u64 {
|
||||
ts.duration_since(UNIX_EPOCH)
|
||||
.expect("better to die in this unlikely case than report false stats")
|
||||
.as_millis()
|
||||
.try_into()
|
||||
.expect("64 bits is enough for few more years")
|
||||
}
|
||||
impl Default for LayerAccessStats {
|
||||
fn default() -> Self {
|
||||
// Default value is to assume resident since creation time, and visible.
|
||||
let (_mask, mut value) = Self::to_low_res_timestamp(Self::RTIME_SHIFT, SystemTime::now());
|
||||
value |= 0x1 << Self::VISIBILITY_SHIFT;
|
||||
|
||||
impl LayerAccessStatFullDetails {
|
||||
fn as_api_model(&self) -> pageserver_api::models::LayerAccessStatFullDetails {
|
||||
let Self {
|
||||
when,
|
||||
task_kind,
|
||||
access_kind,
|
||||
} = self;
|
||||
pageserver_api::models::LayerAccessStatFullDetails {
|
||||
when_millis_since_epoch: system_time_to_millis_since_epoch(when),
|
||||
task_kind: Cow::Borrowed(task_kind.into()), // into static str, powered by strum_macros
|
||||
access_kind: *access_kind,
|
||||
}
|
||||
Self(std::sync::atomic::AtomicU64::new(value))
|
||||
}
|
||||
}
|
||||
|
||||
// Efficient store of two very-low-resolution timestamps and some bits. Used for storing last access time and
|
||||
// last residence change time.
|
||||
impl LayerAccessStats {
|
||||
/// Create an empty stats object.
|
||||
///
|
||||
/// The caller is responsible for recording a residence event
|
||||
/// using [`record_residence_event`] before calling `latest_activity`.
|
||||
/// If they don't, [`latest_activity`] will return `None`.
|
||||
///
|
||||
/// [`record_residence_event`]: Self::record_residence_event
|
||||
/// [`latest_activity`]: Self::latest_activity
|
||||
pub(crate) fn empty_will_record_residence_event_later() -> Self {
|
||||
LayerAccessStats(Mutex::default())
|
||||
// How many high bits to drop from a u32 timestamp?
|
||||
// - Only storing up to a u32 timestamp will work fine until 2038 (if this code is still in use
|
||||
// after that, this software has been very successful!)
|
||||
// - Dropping the top bit is implicitly safe because unix timestamps are meant to be
|
||||
// stored in an i32, so they never used it.
|
||||
// - Dropping the next two bits is safe because this code is only running on systems in
|
||||
// years >= 2024, and these bits have been 1 since 2021
|
||||
//
|
||||
// Therefore we may store only 29 bits for a timestamp with one second resolution. We do
|
||||
// this truncation to make space for some flags in the high bits of our u64.
|
||||
const TS_DROP_HIGH_BITS: u32 = u32::count_ones(Self::TS_ONES) + 1;
|
||||
const TS_MASK: u32 = 0x1f_ff_ff_ff;
|
||||
const TS_ONES: u32 = 0x60_00_00_00;
|
||||
|
||||
const ATIME_SHIFT: u32 = 0;
|
||||
const RTIME_SHIFT: u32 = 32 - Self::TS_DROP_HIGH_BITS;
|
||||
const VISIBILITY_SHIFT: u32 = 64 - 2 * Self::TS_DROP_HIGH_BITS;
|
||||
|
||||
fn write_bits(&self, mask: u64, value: u64) -> u64 {
|
||||
self.0
|
||||
.fetch_update(
|
||||
// TODO: decide what orderings are correct
|
||||
std::sync::atomic::Ordering::Relaxed,
|
||||
std::sync::atomic::Ordering::Relaxed,
|
||||
|v| Some((v & !mask) | (value & mask)),
|
||||
)
|
||||
.expect("Inner function is infallible")
|
||||
}
|
||||
|
||||
/// Create an empty stats object and record a [`LayerLoad`] event with the given residence status.
|
||||
///
|
||||
/// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
|
||||
///
|
||||
/// [`LayerLoad`]: LayerResidenceEventReason::LayerLoad
|
||||
/// [`record_residence_event`]: Self::record_residence_event
|
||||
pub(crate) fn for_loading_layer(status: LayerResidenceStatus) -> Self {
|
||||
let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
|
||||
new.record_residence_event(status, LayerResidenceEventReason::LayerLoad);
|
||||
new
|
||||
fn to_low_res_timestamp(shift: u32, time: SystemTime) -> (u64, u64) {
|
||||
// Keep only the TS_MASK bits of the whole-second Unix timestamp (one-second resolution)
|
||||
let timestamp = time.duration_since(UNIX_EPOCH).unwrap().as_secs() & (Self::TS_MASK as u64);
|
||||
|
||||
((Self::TS_MASK as u64) << shift, timestamp << shift)
|
||||
}
|
||||
|
||||
fn read_low_res_timestamp(&self, shift: u32) -> Option<SystemTime> {
|
||||
let read = self.0.load(std::sync::atomic::Ordering::Relaxed);
|
||||
|
||||
let ts_bits = (read & ((Self::TS_MASK as u64) << shift)) >> shift;
|
||||
if ts_bits == 0 {
|
||||
None
|
||||
} else {
|
||||
Some(UNIX_EPOCH + Duration::from_secs(ts_bits | (Self::TS_ONES as u64)))
|
||||
}
|
||||
}
|
||||
|
||||
/// Record a change in layer residency.
|
||||
@@ -582,123 +548,64 @@ impl LayerAccessStats {
|
||||
/// - Eviction: imitate access logical size calculation. This accesses the L0 layers because the L1 layer is not yet in the layer map.
|
||||
/// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock.
|
||||
/// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event.
|
||||
///
|
||||
pub(crate) fn record_residence_event(
|
||||
&self,
|
||||
status: LayerResidenceStatus,
|
||||
reason: LayerResidenceEventReason,
|
||||
) {
|
||||
let mut locked = self.0.lock().unwrap();
|
||||
locked.iter_mut().for_each(|inner| {
|
||||
inner
|
||||
.last_residence_changes
|
||||
.write(LayerResidenceEvent::new(status, reason))
|
||||
});
|
||||
pub(crate) fn record_residence_event_at(&self, now: SystemTime) {
|
||||
let (mask, value) = Self::to_low_res_timestamp(Self::RTIME_SHIFT, now);
|
||||
self.write_bits(mask, value);
|
||||
}
|
||||
|
||||
fn record_access(&self, access_kind: LayerAccessKind, ctx: &RequestContext) {
|
||||
pub(crate) fn record_residence_event(&self) {
|
||||
self.record_residence_event_at(SystemTime::now())
|
||||
}
|
||||
|
||||
pub(crate) fn record_access_at(&self, now: SystemTime) {
|
||||
let (mut mask, mut value) = Self::to_low_res_timestamp(Self::ATIME_SHIFT, now);
|
||||
|
||||
// A layer which is accessed must be visible.
|
||||
mask |= 0x1 << Self::VISIBILITY_SHIFT;
|
||||
value |= 0x1 << Self::VISIBILITY_SHIFT;
|
||||
|
||||
self.write_bits(mask, value);
|
||||
}
|
||||
|
||||
pub(crate) fn record_access(&self, ctx: &RequestContext) {
|
||||
if ctx.access_stats_behavior() == AccessStatsBehavior::Skip {
|
||||
return;
|
||||
}
|
||||
|
||||
let this_access = LayerAccessStatFullDetails {
|
||||
when: SystemTime::now(),
|
||||
task_kind: ctx.task_kind(),
|
||||
access_kind,
|
||||
};
|
||||
|
||||
let mut locked = self.0.lock().unwrap();
|
||||
locked.iter_mut().for_each(|inner| {
|
||||
inner.first_access.get_or_insert(this_access);
|
||||
inner.count_by_access_kind[access_kind] += 1;
|
||||
inner.task_kind_flag |= ctx.task_kind();
|
||||
inner.last_accesses.write(this_access);
|
||||
});
|
||||
|
||||
// We may access a layer marked as Covered, if a new branch was created that depends on
|
||||
// this layer, and background updates to layer visibility didn't notice it yet
|
||||
if !matches!(locked.visibility, LayerVisibilityHint::Visible) {
|
||||
locked.visibility = LayerVisibilityHint::Visible;
|
||||
}
|
||||
self.record_access_at(SystemTime::now())
|
||||
}
|
||||
|
||||
fn as_api_model(
|
||||
&self,
|
||||
reset: LayerAccessStatsReset,
|
||||
) -> pageserver_api::models::LayerAccessStats {
|
||||
let mut locked = self.0.lock().unwrap();
|
||||
let inner = &mut locked.for_scraping_api;
|
||||
let LayerAccessStatsInner {
|
||||
first_access,
|
||||
count_by_access_kind,
|
||||
task_kind_flag,
|
||||
last_accesses,
|
||||
last_residence_changes,
|
||||
} = inner;
|
||||
let ret = pageserver_api::models::LayerAccessStats {
|
||||
access_count_by_access_kind: count_by_access_kind
|
||||
.iter()
|
||||
.map(|(kind, count)| (kind, *count))
|
||||
.collect(),
|
||||
task_kind_access_flag: task_kind_flag
|
||||
.iter()
|
||||
.map(|task_kind| Cow::Borrowed(task_kind.into())) // into static str, powered by strum_macros
|
||||
.collect(),
|
||||
first: first_access.as_ref().map(|a| a.as_api_model()),
|
||||
accesses_history: last_accesses.map(|m| m.as_api_model()),
|
||||
residence_events_history: last_residence_changes.clone(),
|
||||
access_time: self
|
||||
.read_low_res_timestamp(Self::ATIME_SHIFT)
|
||||
.unwrap_or(UNIX_EPOCH),
|
||||
residence_time: self
|
||||
.read_low_res_timestamp(Self::RTIME_SHIFT)
|
||||
.unwrap_or(UNIX_EPOCH),
|
||||
visible: matches!(self.visibility(), LayerVisibilityHint::Visible),
|
||||
};
|
||||
match reset {
|
||||
LayerAccessStatsReset::NoReset => (),
|
||||
LayerAccessStatsReset::JustTaskKindFlags => {
|
||||
inner.task_kind_flag.clear();
|
||||
}
|
||||
LayerAccessStatsReset::NoReset => {}
|
||||
LayerAccessStatsReset::AllStats => {
|
||||
*inner = LayerAccessStatsInner::default();
|
||||
self.write_bits((Self::TS_MASK as u64) << Self::ATIME_SHIFT, 0x0);
|
||||
self.write_bits((Self::TS_MASK as u64) << Self::RTIME_SHIFT, 0x0);
|
||||
}
|
||||
}
|
||||
ret
|
||||
}
|
||||
|
||||
/// Get the latest access timestamp, falling back to latest residence event, further falling
|
||||
/// back to `SystemTime::now` for a usable timestamp for eviction.
|
||||
pub(crate) fn latest_activity_or_now(&self) -> SystemTime {
|
||||
self.latest_activity().unwrap_or_else(SystemTime::now)
|
||||
}
|
||||
|
||||
/// Get the latest access timestamp, falling back to latest residence event.
|
||||
///
|
||||
/// This function can only return `None` if there has not yet been a call to the
|
||||
/// [`record_residence_event`] method. That would generally be considered an
|
||||
/// implementation error. This function logs a rate-limited warning in that case.
|
||||
///
|
||||
/// TODO: use type system to avoid the need for `fallback`.
|
||||
/// The approach in <https://github.com/neondatabase/neon/pull/3775>
|
||||
/// could be used to enforce that a residence event is recorded
|
||||
/// before a layer is added to the layer map. We could also have
|
||||
/// a layer wrapper type that holds the LayerAccessStats, and ensure
|
||||
/// that that type can only be produced by inserting into the layer map.
|
||||
///
|
||||
/// [`record_residence_event`]: Self::record_residence_event
|
||||
fn latest_activity(&self) -> Option<SystemTime> {
|
||||
let locked = self.0.lock().unwrap();
|
||||
let inner = &locked.for_eviction_policy;
|
||||
match inner.last_accesses.recent() {
|
||||
Some(a) => Some(a.when),
|
||||
None => match inner.last_residence_changes.recent() {
|
||||
Some(e) => Some(e.timestamp),
|
||||
None => {
|
||||
static WARN_RATE_LIMIT: Lazy<Mutex<(usize, RateLimit)>> =
|
||||
Lazy::new(|| Mutex::new((0, RateLimit::new(Duration::from_secs(10)))));
|
||||
let mut guard = WARN_RATE_LIMIT.lock().unwrap();
|
||||
guard.0 += 1;
|
||||
let occurences = guard.0;
|
||||
guard.1.call(move || {
|
||||
warn!(parent: None, occurences, "latest_activity not available, this is an implementation bug, using fallback value");
|
||||
});
|
||||
None
|
||||
}
|
||||
},
|
||||
/// Get the latest access timestamp, falling back to latest residence event. The latest residence event
|
||||
/// will be this Layer's construction time, if its residence hasn't changed since then.
|
||||
pub(crate) fn latest_activity(&self) -> SystemTime {
|
||||
if let Some(t) = self.read_low_res_timestamp(Self::ATIME_SHIFT) {
|
||||
t
|
||||
} else {
|
||||
self.read_low_res_timestamp(Self::RTIME_SHIFT)
|
||||
.expect("Residence time is set on construction")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -707,38 +614,46 @@ impl LayerAccessStats {
|
||||
/// This indicates whether the layer has been used for some purpose that would motivate
|
||||
/// us to keep it on disk, such as for serving a getpage request.
|
||||
fn accessed(&self) -> bool {
|
||||
let locked = self.0.lock().unwrap();
|
||||
let inner = &locked.for_eviction_policy;
|
||||
|
||||
// Consider it accessed if the most recent access is more recent than
|
||||
// the most recent change in residence status.
|
||||
match (
|
||||
inner.last_accesses.recent(),
|
||||
inner.last_residence_changes.recent(),
|
||||
self.read_low_res_timestamp(Self::ATIME_SHIFT),
|
||||
self.read_low_res_timestamp(Self::RTIME_SHIFT),
|
||||
) {
|
||||
(None, _) => false,
|
||||
(Some(_), None) => true,
|
||||
(Some(a), Some(r)) => a.when >= r.timestamp,
|
||||
(Some(a), Some(r)) => a >= r,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn set_visibility(&self, visibility: LayerVisibilityHint) {
|
||||
self.0.lock().unwrap().visibility = visibility;
|
||||
let value = match visibility {
|
||||
LayerVisibilityHint::Visible => 0x1 << Self::VISIBILITY_SHIFT,
|
||||
LayerVisibilityHint::Covered | LayerVisibilityHint::Uninitialized => 0x0,
|
||||
};
|
||||
|
||||
self.write_bits(0x1 << Self::VISIBILITY_SHIFT, value);
|
||||
}
|
||||
|
||||
pub(crate) fn visibility(&self) -> LayerVisibilityHint {
|
||||
self.0.lock().unwrap().visibility.clone()
|
||||
let read = self.0.load(std::sync::atomic::Ordering::Relaxed);
|
||||
match (read >> Self::VISIBILITY_SHIFT) & 0x1 {
|
||||
1 => LayerVisibilityHint::Visible,
|
||||
0 => LayerVisibilityHint::Covered,
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a layer descriptor from a layer.
|
||||
pub trait AsLayerDesc {
|
||||
pub(crate) trait AsLayerDesc {
|
||||
/// Get the layer descriptor.
|
||||
fn layer_desc(&self) -> &PersistentLayerDesc;
|
||||
}
|
||||
|
||||
pub mod tests {
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use utils::id::TimelineId;
|
||||
|
||||
use super::*;
|
||||
|
||||
|
||||
@@ -52,7 +52,7 @@ use camino::{Utf8Path, Utf8PathBuf};
|
||||
use futures::StreamExt;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::{ImageCompressionAlgorithm, LayerAccessKind};
|
||||
use pageserver_api::models::ImageCompressionAlgorithm;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -265,7 +265,7 @@ impl DeltaLayer {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let inner = self.load(LayerAccessKind::Dump, ctx).await?;
|
||||
let inner = self.load(ctx).await?;
|
||||
|
||||
inner.dump(ctx).await
|
||||
}
|
||||
@@ -298,12 +298,8 @@ impl DeltaLayer {
|
||||
/// Open the underlying file and read the metadata into memory, if it's
|
||||
/// not loaded already.
|
||||
///
|
||||
async fn load(
|
||||
&self,
|
||||
access_kind: LayerAccessKind,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<&Arc<DeltaLayerInner>> {
|
||||
self.access_stats.record_access(access_kind, ctx);
|
||||
async fn load(&self, ctx: &RequestContext) -> Result<&Arc<DeltaLayerInner>> {
|
||||
self.access_stats.record_access(ctx);
|
||||
// Quick exit if already loaded
|
||||
self.inner
|
||||
.get_or_try_init(|| self.load_inner(ctx))
|
||||
@@ -356,7 +352,7 @@ impl DeltaLayer {
|
||||
summary.lsn_range,
|
||||
metadata.len(),
|
||||
),
|
||||
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
|
||||
access_stats: Default::default(),
|
||||
inner: OnceCell::new(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -49,7 +49,6 @@ use camino::{Utf8Path, Utf8PathBuf};
|
||||
use hex;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::LayerAccessKind;
|
||||
use pageserver_api::shard::{ShardIdentity, TenantShardId};
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -228,7 +227,7 @@ impl ImageLayer {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let inner = self.load(LayerAccessKind::Dump, ctx).await?;
|
||||
let inner = self.load(ctx).await?;
|
||||
|
||||
inner.dump(ctx).await?;
|
||||
|
||||
@@ -255,12 +254,8 @@ impl ImageLayer {
|
||||
/// Open the underlying file and read the metadata into memory, if it's
|
||||
/// not loaded already.
|
||||
///
|
||||
async fn load(
|
||||
&self,
|
||||
access_kind: LayerAccessKind,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<&ImageLayerInner> {
|
||||
self.access_stats.record_access(access_kind, ctx);
|
||||
async fn load(&self, ctx: &RequestContext) -> Result<&ImageLayerInner> {
|
||||
self.access_stats.record_access(ctx);
|
||||
self.inner
|
||||
.get_or_try_init(|| self.load_inner(ctx))
|
||||
.await
|
||||
@@ -312,7 +307,7 @@ impl ImageLayer {
|
||||
metadata.len(),
|
||||
), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
|
||||
lsn: summary.lsn,
|
||||
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
|
||||
access_stats: Default::default(),
|
||||
inner: OnceCell::new(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
use anyhow::Context;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::{
|
||||
HistoricLayerInfo, LayerAccessKind, LayerResidenceEventReason, LayerResidenceStatus,
|
||||
};
|
||||
use pageserver_api::models::HistoricLayerInfo;
|
||||
use pageserver_api::shard::{ShardIdentity, ShardIndex, TenantShardId};
|
||||
use std::ops::Range;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
@@ -160,13 +158,10 @@ impl Layer {
|
||||
metadata.file_size,
|
||||
);
|
||||
|
||||
let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted);
|
||||
|
||||
let owner = Layer(Arc::new(LayerInner::new(
|
||||
conf,
|
||||
timeline,
|
||||
local_path,
|
||||
access_stats,
|
||||
desc,
|
||||
None,
|
||||
metadata.generation,
|
||||
@@ -193,8 +188,6 @@ impl Layer {
|
||||
metadata.file_size,
|
||||
);
|
||||
|
||||
let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident);
|
||||
|
||||
let mut resident = None;
|
||||
|
||||
let owner = Layer(Arc::new_cyclic(|owner| {
|
||||
@@ -209,7 +202,6 @@ impl Layer {
|
||||
conf,
|
||||
timeline,
|
||||
local_path,
|
||||
access_stats,
|
||||
desc,
|
||||
Some(inner),
|
||||
metadata.generation,
|
||||
@@ -245,13 +237,6 @@ impl Layer {
|
||||
version: 0,
|
||||
});
|
||||
resident = Some(inner.clone());
|
||||
let access_stats = LayerAccessStats::empty_will_record_residence_event_later();
|
||||
access_stats.record_residence_event(
|
||||
LayerResidenceStatus::Resident,
|
||||
LayerResidenceEventReason::LayerCreate,
|
||||
);
|
||||
// Newly created layers are marked visible by default: the usual case is that they were created to be read.
|
||||
access_stats.set_visibility(super::LayerVisibilityHint::Visible);
|
||||
|
||||
let local_path = local_layer_path(
|
||||
conf,
|
||||
@@ -261,16 +246,22 @@ impl Layer {
|
||||
&timeline.generation,
|
||||
);
|
||||
|
||||
LayerInner::new(
|
||||
let layer = LayerInner::new(
|
||||
conf,
|
||||
timeline,
|
||||
local_path,
|
||||
access_stats,
|
||||
desc,
|
||||
Some(inner),
|
||||
timeline.generation,
|
||||
timeline.get_shard_index(),
|
||||
)
|
||||
);
|
||||
|
||||
// Newly created layers are marked visible by default: the usual case is that they were created to be read.
|
||||
layer
|
||||
.access_stats
|
||||
.set_visibility(super::LayerVisibilityHint::Visible);
|
||||
|
||||
layer
|
||||
}));
|
||||
|
||||
let downloaded = resident.expect("just initialized");
|
||||
@@ -334,9 +325,7 @@ impl Layer {
|
||||
use anyhow::ensure;
|
||||
|
||||
let layer = self.0.get_or_maybe_download(true, Some(ctx)).await?;
|
||||
self.0
|
||||
.access_stats
|
||||
.record_access(LayerAccessKind::GetValueReconstructData, ctx);
|
||||
self.0.access_stats.record_access(ctx);
|
||||
|
||||
if self.layer_desc().is_delta {
|
||||
ensure!(lsn_range.start >= self.layer_desc().lsn_range.start);
|
||||
@@ -370,9 +359,7 @@ impl Layer {
|
||||
other => GetVectoredError::Other(anyhow::anyhow!(other)),
|
||||
})?;
|
||||
|
||||
self.0
|
||||
.access_stats
|
||||
.record_access(LayerAccessKind::GetValueReconstructData, ctx);
|
||||
self.0.access_stats.record_access(ctx);
|
||||
|
||||
layer
|
||||
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, &self.0, ctx)
|
||||
@@ -788,7 +775,6 @@ impl LayerInner {
|
||||
conf: &'static PageServerConf,
|
||||
timeline: &Arc<Timeline>,
|
||||
local_path: Utf8PathBuf,
|
||||
access_stats: LayerAccessStats,
|
||||
desc: PersistentLayerDesc,
|
||||
downloaded: Option<Arc<DownloadedLayer>>,
|
||||
generation: Generation,
|
||||
@@ -823,7 +809,7 @@ impl LayerInner {
|
||||
path: local_path,
|
||||
desc,
|
||||
timeline: Arc::downgrade(timeline),
|
||||
access_stats,
|
||||
access_stats: Default::default(),
|
||||
wanted_deleted: AtomicBool::new(false),
|
||||
inner,
|
||||
version: AtomicUsize::new(version),
|
||||
@@ -1178,10 +1164,7 @@ impl LayerInner {
|
||||
LAYER_IMPL_METRICS.record_redownloaded_after(since_last_eviction);
|
||||
}
|
||||
|
||||
self.access_stats.record_residence_event(
|
||||
LayerResidenceStatus::Resident,
|
||||
LayerResidenceEventReason::ResidenceChange,
|
||||
);
|
||||
self.access_stats.record_residence_event();
|
||||
|
||||
Ok(self.initialize_after_layer_is_on_disk(permit))
|
||||
}
|
||||
@@ -1535,10 +1518,7 @@ impl LayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
self.access_stats.record_residence_event(
|
||||
LayerResidenceStatus::Evicted,
|
||||
LayerResidenceEventReason::ResidenceChange,
|
||||
);
|
||||
self.access_stats.record_residence_event();
|
||||
|
||||
self.status.as_ref().unwrap().send_replace(Status::Evicted);
|
||||
|
||||
@@ -1864,9 +1844,7 @@ impl ResidentLayer {
|
||||
// this is valid because the DownloadedLayer::kind is a OnceCell, not a
|
||||
// Mutex<OnceCell>, so we cannot go and deinitialize the value with OnceCell::take
|
||||
// while it's being held.
|
||||
owner
|
||||
.access_stats
|
||||
.record_access(LayerAccessKind::KeyIter, ctx);
|
||||
owner.access_stats.record_access(ctx);
|
||||
|
||||
delta_layer::DeltaLayerInner::load_keys(d, ctx)
|
||||
.await
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
use std::time::UNIX_EPOCH;
|
||||
|
||||
use pageserver_api::key::CONTROLFILE_KEY;
|
||||
use tokio::task::JoinSet;
|
||||
use utils::{
|
||||
@@ -7,7 +9,7 @@ use utils::{
|
||||
|
||||
use super::failpoints::{Failpoint, FailpointKind};
|
||||
use super::*;
|
||||
use crate::context::DownloadBehavior;
|
||||
use crate::{context::DownloadBehavior, tenant::storage_layer::LayerVisibilityHint};
|
||||
use crate::{task_mgr::TaskKind, tenant::harness::TenantHarness};
|
||||
|
||||
/// Used in tests to advance a future to wanted await point, and not futher.
|
||||
@@ -826,9 +828,9 @@ async fn eviction_cancellation_on_drop() {
|
||||
#[test]
|
||||
#[cfg(target_arch = "x86_64")]
|
||||
fn layer_size() {
|
||||
assert_eq!(std::mem::size_of::<LayerAccessStats>(), 2048);
|
||||
assert_eq!(std::mem::size_of::<LayerAccessStats>(), 8);
|
||||
assert_eq!(std::mem::size_of::<PersistentLayerDesc>(), 104);
|
||||
assert_eq!(std::mem::size_of::<LayerInner>(), 2352);
|
||||
assert_eq!(std::mem::size_of::<LayerInner>(), 312);
|
||||
// it also has the utf8 path
|
||||
}
|
||||
|
||||
@@ -968,3 +970,46 @@ fn spawn_blocking_pool_helper_actually_works() {
|
||||
println!("joined");
|
||||
});
|
||||
}
|
||||
|
||||
/// Drop the low bits from a time, to emulate the precision loss in LayerAccessStats
|
||||
fn lowres_time(hires: SystemTime) -> SystemTime {
|
||||
let ts = hires.duration_since(UNIX_EPOCH).unwrap().as_secs();
|
||||
UNIX_EPOCH + Duration::from_secs(ts)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn access_stats() {
|
||||
let access_stats = LayerAccessStats::default();
|
||||
// Default is visible
|
||||
assert_eq!(access_stats.visibility(), LayerVisibilityHint::Visible);
|
||||
|
||||
access_stats.set_visibility(LayerVisibilityHint::Covered);
|
||||
assert_eq!(access_stats.visibility(), LayerVisibilityHint::Covered);
|
||||
access_stats.set_visibility(LayerVisibilityHint::Visible);
|
||||
assert_eq!(access_stats.visibility(), LayerVisibilityHint::Visible);
|
||||
|
||||
let rtime = UNIX_EPOCH + Duration::from_secs(2000000000);
|
||||
access_stats.record_residence_event_at(rtime);
|
||||
assert_eq!(access_stats.latest_activity(), lowres_time(rtime));
|
||||
|
||||
let atime = UNIX_EPOCH + Duration::from_secs(2100000000);
|
||||
access_stats.record_access_at(atime);
|
||||
assert_eq!(access_stats.latest_activity(), lowres_time(atime));
|
||||
|
||||
// Setting visibility doesn't clobber access time
|
||||
access_stats.set_visibility(LayerVisibilityHint::Covered);
|
||||
assert_eq!(access_stats.latest_activity(), lowres_time(atime));
|
||||
access_stats.set_visibility(LayerVisibilityHint::Visible);
|
||||
assert_eq!(access_stats.latest_activity(), lowres_time(atime));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn access_stats_2038() {
|
||||
// The access stats structure uses a timestamp representation that will run out
|
||||
// of bits in 2038. One year before that, this unit test will start failing.
|
||||
|
||||
let one_year_from_now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap()
|
||||
+ Duration::from_secs(3600 * 24 * 365);
|
||||
|
||||
assert!(one_year_from_now.as_secs() < (2 << 31));
|
||||
}
|
||||
|
||||
@@ -3155,7 +3155,7 @@ impl Timeline {
|
||||
let guard = self.layers.read().await;
|
||||
|
||||
let resident = guard.likely_resident_layers().map(|layer| {
|
||||
let last_activity_ts = layer.access_stats().latest_activity_or_now();
|
||||
let last_activity_ts = layer.access_stats().latest_activity();
|
||||
|
||||
HeatMapLayer::new(
|
||||
layer.layer_desc().layer_name(),
|
||||
@@ -5582,7 +5582,7 @@ impl Timeline {
|
||||
let file_size = layer.layer_desc().file_size;
|
||||
max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
|
||||
|
||||
let last_activity_ts = layer.access_stats().latest_activity_or_now();
|
||||
let last_activity_ts = layer.access_stats().latest_activity();
|
||||
|
||||
EvictionCandidate {
|
||||
layer: layer.into(),
|
||||
|
||||
@@ -225,7 +225,7 @@ impl Timeline {
|
||||
continue;
|
||||
}
|
||||
|
||||
let last_activity_ts = layer.access_stats().latest_activity_or_now();
|
||||
let last_activity_ts = layer.access_stats().latest_activity();
|
||||
|
||||
let no_activity_for = match now.duration_since(last_activity_ts) {
|
||||
Ok(d) => d,
|
||||
|
||||
@@ -21,6 +21,10 @@ from fixtures.utils import human_bytes, wait_until
|
||||
|
||||
GLOBAL_LRU_LOG_LINE = "tenant_min_resident_size-respecting LRU would not relieve pressure, evicting more following global LRU policy"
|
||||
|
||||
# access times in the pageserver are stored at a very low resolution: to generate meaningfully different
|
||||
# values, tests must inject sleeps
|
||||
ATIME_RESOLUTION = 2
|
||||
|
||||
|
||||
@pytest.mark.parametrize("config_level_override", [None, 400])
|
||||
def test_min_resident_size_override_handling(
|
||||
@@ -546,6 +550,7 @@ def test_partial_evict_tenant(eviction_env: EvictionEnv, order: EvictionOrder):
|
||||
(tenant_id, timeline_id) = warm
|
||||
|
||||
# make picked tenant more recently used than the other one
|
||||
time.sleep(ATIME_RESOLUTION)
|
||||
env.warm_up_tenant(tenant_id)
|
||||
|
||||
# Build up enough pressure to require evictions from both tenants,
|
||||
@@ -622,6 +627,10 @@ def test_fast_growing_tenant(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, or
|
||||
for scale in [1, 1, 1, 4]:
|
||||
timelines.append((pgbench_init_tenant(layer_size, scale, env, pg_bin), scale))
|
||||
|
||||
# Eviction times are stored at a low resolution. We must ensure that the time between
|
||||
# tenants is long enough for the pageserver to distinguish them.
|
||||
time.sleep(ATIME_RESOLUTION)
|
||||
|
||||
env.neon_cli.safekeeper_stop()
|
||||
|
||||
for (tenant_id, timeline_id), scale in timelines:
|
||||
|
||||
@@ -52,8 +52,8 @@ def test_threshold_based_eviction(
|
||||
"kind": "NoEviction"
|
||||
}
|
||||
|
||||
eviction_threshold = 5
|
||||
eviction_period = 1
|
||||
eviction_threshold = 10
|
||||
eviction_period = 2
|
||||
ps_http.set_tenant_config(
|
||||
tenant_id,
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user