maintain access stats for historic layers

This patch adds basic access statistics for historic layers
and exposes them in the management API's `LayerMapInfo`.

We record the accesses in the `{Delta,Image}Layer::load()` function
because it's the common path of
* page_service (`Timline::get_reconstruct_data()`)
* Compaction (`PersistentLayer::iter()` and `PersistentLayer::key_iter()`)

The stats survive residence status changes, and record these as well.

When scraping the layer map endpoint to record its evolution over time,
one must account for stat resets because they are in-memory only and
will reset on pageserver restart.
Use the launch timestamp header added by (#3527) to identify pageserver restarts.

This is PR https://github.com/neondatabase/neon/pull/3496
This commit is contained in:
Christian Schwarz
2023-01-31 11:49:15 +01:00
committed by Christian Schwarz
parent 877a2d70e3
commit 58fa4f0eb7
15 changed files with 619 additions and 30 deletions

100
Cargo.lock generated
View File

@@ -152,6 +152,15 @@ dependencies = [
"syn",
]
[[package]]
name = "atomic-polyfill"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d299f547288d6db8d5c3a2916f7b2f66134b15b8c1ac1c4357dd3b8752af7bb2"
dependencies = [
"critical-section",
]
[[package]]
name = "atty"
version = "0.2.14"
@@ -997,6 +1006,12 @@ dependencies = [
"itertools",
]
[[package]]
name = "critical-section"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6548a0ad5d2549e111e1f6a11a6c2e2d00ce6a3dafe22948d67c2b443f775e52"
[[package]]
name = "crossbeam-channel"
version = "0.5.6"
@@ -1234,6 +1249,47 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "enum-map"
version = "2.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50c25992259941eb7e57b936157961b217a4fc8597829ddef0596d6c3cd86e1a"
dependencies = [
"enum-map-derive",
]
[[package]]
name = "enum-map-derive"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a4da76b3b6116d758c7ba93f7ec6a35d2e2cf24feda76c6e38a375f4d5c59f2"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "enumset"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19be8061a06ab6f3a6cf21106c873578bf01bd42ad15e0311a9c76161cb1c753"
dependencies = [
"enumset_derive",
]
[[package]]
name = "enumset_derive"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03e7b551eba279bf0fa88b83a46330168c1560a52a94f5126f892f0b364ab3e0"
dependencies = [
"darling",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "env_logger"
version = "0.10.0"
@@ -1521,6 +1577,15 @@ version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7"
[[package]]
name = "hash32"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47d60b12902ba28e2730cd37e95b8c9223af2808df9e902d4df49588d1470606"
dependencies = [
"byteorder",
]
[[package]]
name = "hashbrown"
version = "0.12.3"
@@ -1548,6 +1613,18 @@ dependencies = [
"hashbrown 0.12.3",
]
[[package]]
name = "heapless"
version = "0.8.0"
source = "git+https://github.com/japaric/heapless.git?rev=644653bf3b831c6bb4963be2de24804acf5e5001#644653bf3b831c6bb4963be2de24804acf5e5001"
dependencies = [
"atomic-polyfill",
"hash32",
"rustc_version",
"spin 0.9.4",
"stable_deref_trait",
]
[[package]]
name = "heck"
version = "0.4.1"
@@ -2344,6 +2421,8 @@ dependencies = [
"crc32c",
"criterion",
"crossbeam-utils",
"enum-map",
"enumset",
"fail",
"futures",
"git-version",
@@ -2376,6 +2455,8 @@ dependencies = [
"serde_with",
"signal-hook",
"storage_broker",
"strum",
"strum_macros",
"svg_fmt",
"tempfile",
"tenant_size_model",
@@ -2400,6 +2481,7 @@ dependencies = [
"byteorder",
"bytes",
"const_format",
"enum-map",
"postgres_ffi",
"serde",
"serde_with",
@@ -3021,7 +3103,7 @@ dependencies = [
"cc",
"libc",
"once_cell",
"spin",
"spin 0.5.2",
"untrusted",
"web-sys",
"winapi",
@@ -3568,6 +3650,21 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "spin"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f6002a767bff9e83f8eeecf883ecb8011875a21ae8da43bffb817a57e78cc09"
dependencies = [
"lock_api",
]
[[package]]
name = "stable_deref_trait"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
[[package]]
name = "static_assertions"
version = "1.1.0"
@@ -4347,6 +4444,7 @@ dependencies = [
"bytes",
"criterion",
"git-version",
"heapless",
"hex",
"hex-literal",
"hyper",

View File

@@ -38,6 +38,8 @@ comfy-table = "6.1"
const_format = "0.2"
crc32c = "0.6"
crossbeam-utils = "0.8.5"
enum-map = "2.4.2"
enumset = "1.0.12"
fail = "0.5.0"
fs2 = "0.4.3"
futures = "0.3"
@@ -120,6 +122,9 @@ postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", re
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df61437de0feef49ba2ccdbdd94eb8ad6e142" }
## Other git libraries
heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending
## Local libraries
consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" }
metrics = { version = "0.1", path = "./libs/metrics/" }

View File

@@ -13,5 +13,6 @@ bytes.workspace = true
byteorder.workspace = true
utils.workspace = true
postgres_ffi.workspace = true
enum-map.workspace = true
workspace_hack.workspace = true

View File

@@ -1,9 +1,14 @@
use std::num::{NonZeroU64, NonZeroUsize};
use std::{
collections::HashMap,
num::{NonZeroU64, NonZeroUsize},
time::SystemTime,
};
use byteorder::{BigEndian, ReadBytesExt};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::{
history_buffer::HistoryBufferWithDropCounter,
id::{NodeId, TenantId, TimelineId},
lsn::Lsn,
};
@@ -232,6 +237,82 @@ pub struct LayerMapInfo {
pub historic_layers: Vec<HistoricLayerInfo>,
}
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, enum_map::Enum)]
#[repr(usize)]
pub enum LayerAccessKind {
GetValueReconstructData,
Iter,
KeyIter,
Dump,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LayerAccessStatFullDetails {
pub when_millis_since_epoch: u64,
pub task_kind: &'static str,
pub access_kind: LayerAccessKind,
}
/// An event that impacts the layer's residence status.
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LayerResidenceEvent {
/// The time when the event occurred.
/// NB: this timestamp is captured while the residence status changes.
/// So, it might be behind/ahead of the actual residence change by a short amount of time.
///
#[serde(rename = "timestamp_millis_since_epoch")]
#[serde_as(as = "serde_with::TimestampMilliSeconds")]
timestamp: SystemTime,
/// The new residence status of the layer.
status: LayerResidenceStatus,
/// The reason why we had to record this event.
reason: LayerResidenceEventReason,
}
/// The reason for recording a given [`ResidenceEvent`].
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum LayerResidenceEventReason {
/// The layer map is being populated, e.g. during timeline load or attach.
/// This includes [`RemoteLayer`] objects created in [`reconcile_with_remote`].
/// We need to record such events because there is no persistent storage for the events.
LayerLoad,
/// We just created the layer (e.g., freeze_and_flush or compaction).
/// Such layers are always [`LayerResidenceStatus::Resident`].
LayerCreate,
/// We on-demand downloaded or evicted the given layer.
ResidenceChange,
}
/// The residence status of the layer, after the given [`LayerResidenceEvent`].
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum LayerResidenceStatus {
/// Residence status for a layer file that exists locally.
/// It may also exist on the remote, we don't care here.
Resident,
/// Residence status for a layer file that only exists on the remote.
Evicted,
}
impl LayerResidenceEvent {
pub fn new(status: LayerResidenceStatus, reason: LayerResidenceEventReason) -> Self {
Self {
status,
reason,
timestamp: SystemTime::now(),
}
}
}
#[derive(Debug, Clone, Serialize)]
pub struct LayerAccessStats {
pub access_count_by_access_kind: HashMap<LayerAccessKind, u64>,
pub task_kind_access_flag: Vec<&'static str>,
pub first: Option<LayerAccessStatFullDetails>,
pub accesses_history: HistoryBufferWithDropCounter<LayerAccessStatFullDetails, 16>,
pub residence_events_history: HistoryBufferWithDropCounter<LayerResidenceEvent, 16>,
}
#[serde_as]
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "kind")]
@@ -261,6 +342,7 @@ pub enum HistoricLayerInfo {
#[serde_as(as = "DisplayFromStr")]
lsn_end: Lsn,
remote: bool,
access_stats: LayerAccessStats,
},
Image {
layer_file_name: String,
@@ -269,6 +351,7 @@ pub enum HistoricLayerInfo {
#[serde_as(as = "DisplayFromStr")]
lsn_start: Lsn,
remote: bool,
access_stats: LayerAccessStats,
},
}

View File

@@ -11,6 +11,7 @@ async-trait.workspace = true
anyhow.workspace = true
bincode.workspace = true
bytes.workspace = true
heapless.workspace = true
hyper = { workspace = true, features = ["full"] }
routerify.workspace = true
serde.workspace = true

View File

@@ -0,0 +1,161 @@
//! A heapless buffer for events of sorts.
use std::ops;
use heapless::HistoryBuffer;
#[derive(Debug, Clone)]
pub struct HistoryBufferWithDropCounter<T, const L: usize> {
buffer: HistoryBuffer<T, L>,
drop_count: u64,
}
impl<T, const L: usize> HistoryBufferWithDropCounter<T, L> {
pub fn write(&mut self, data: T) {
let len_before = self.buffer.len();
self.buffer.write(data);
let len_after = self.buffer.len();
self.drop_count += u64::from(len_before == len_after);
}
pub fn drop_count(&self) -> u64 {
self.drop_count
}
pub fn map<U, F: Fn(&T) -> U>(&self, f: F) -> HistoryBufferWithDropCounter<U, L> {
let mut buffer = HistoryBuffer::new();
buffer.extend(self.buffer.oldest_ordered().map(f));
HistoryBufferWithDropCounter::<U, L> {
buffer,
drop_count: self.drop_count,
}
}
}
impl<T, const L: usize> Default for HistoryBufferWithDropCounter<T, L> {
fn default() -> Self {
Self {
buffer: HistoryBuffer::default(),
drop_count: 0,
}
}
}
impl<T, const L: usize> ops::Deref for HistoryBufferWithDropCounter<T, L> {
type Target = HistoryBuffer<T, L>;
fn deref(&self) -> &Self::Target {
&self.buffer
}
}
#[derive(serde::Serialize)]
struct SerdeRepr<T> {
buffer: Vec<T>,
drop_count: u64,
}
impl<'a, T, const L: usize> From<&'a HistoryBufferWithDropCounter<T, L>> for SerdeRepr<T>
where
T: Clone + serde::Serialize,
{
fn from(value: &'a HistoryBufferWithDropCounter<T, L>) -> Self {
let HistoryBufferWithDropCounter { buffer, drop_count } = value;
SerdeRepr {
buffer: buffer.iter().cloned().collect(),
drop_count: *drop_count,
}
}
}
impl<T, const L: usize> serde::Serialize for HistoryBufferWithDropCounter<T, L>
where
T: Clone + serde::Serialize,
{
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
SerdeRepr::from(self).serialize(serializer)
}
}
#[cfg(test)]
mod test {
use super::HistoryBufferWithDropCounter;
#[test]
fn test_basics() {
let mut b = HistoryBufferWithDropCounter::<_, 2>::default();
b.write(1);
b.write(2);
b.write(3);
assert!(b.iter().any(|e| *e == 2));
assert!(b.iter().any(|e| *e == 3));
assert!(!b.iter().any(|e| *e == 1));
}
#[test]
fn test_drop_count_works() {
let mut b = HistoryBufferWithDropCounter::<_, 2>::default();
b.write(1);
assert_eq!(b.drop_count(), 0);
b.write(2);
assert_eq!(b.drop_count(), 0);
b.write(3);
assert_eq!(b.drop_count(), 1);
b.write(4);
assert_eq!(b.drop_count(), 2);
}
#[test]
fn test_clone_works() {
let mut b = HistoryBufferWithDropCounter::<_, 2>::default();
b.write(1);
b.write(2);
b.write(3);
assert_eq!(b.drop_count(), 1);
let mut c = b.clone();
assert_eq!(c.drop_count(), 1);
assert!(c.iter().any(|e| *e == 2));
assert!(c.iter().any(|e| *e == 3));
assert!(!c.iter().any(|e| *e == 1));
c.write(4);
assert!(c.iter().any(|e| *e == 4));
assert!(!b.iter().any(|e| *e == 4));
}
#[test]
fn test_map() {
let mut b = HistoryBufferWithDropCounter::<_, 2>::default();
b.write(1);
assert_eq!(b.drop_count(), 0);
{
let c = b.map(|i| i + 10);
assert_eq!(c.oldest_ordered().cloned().collect::<Vec<_>>(), vec![11]);
assert_eq!(c.drop_count(), 0);
}
b.write(2);
assert_eq!(b.drop_count(), 0);
{
let c = b.map(|i| i + 10);
assert_eq!(
c.oldest_ordered().cloned().collect::<Vec<_>>(),
vec![11, 12]
);
assert_eq!(c.drop_count(), 0);
}
b.write(3);
assert_eq!(b.drop_count(), 1);
{
let c = b.map(|i| i + 10);
assert_eq!(
c.oldest_ordered().cloned().collect::<Vec<_>>(),
vec![12, 13]
);
assert_eq!(c.drop_count(), 1);
}
}
}

View File

@@ -52,6 +52,8 @@ pub mod signals;
pub mod fs_ext;
pub mod history_buffer;
/// use with fail::cfg("$name", "return(2000)")
#[macro_export]
macro_rules! failpoint_sleep_millis_async {

View File

@@ -67,6 +67,10 @@ utils.workspace = true
workspace_hack.workspace = true
reqwest.workspace = true
rpds.workspace = true
enum-map.workspace = true
enumset.workspace = true
strum.workspace = true
strum_macros.workspace = true
[dev-dependencies]
criterion.workspace = true

View File

@@ -20,6 +20,7 @@ use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::mgr::TenantMapInsertError;
use crate::tenant::storage_layer::LayerAccessStatsReset;
use crate::tenant::{PageReconstructError, Timeline};
use crate::{config::PageServerConf, tenant::mgr};
use utils::{
@@ -534,10 +535,13 @@ async fn tenant_size_handler(request: Request<Body>) -> Result<Response<Body>, A
async fn layer_map_info_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let reset: LayerAccessStatsReset =
parse_query_param(&request, "reset")?.unwrap_or(LayerAccessStatsReset::NoReset);
check_permission(&request, Some(tenant_id))?;
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
let layer_map_info = timeline.layer_map_info();
let layer_map_info = timeline.layer_map_info(reset);
json_response(StatusCode::OK, layer_map_info)
}

View File

@@ -169,7 +169,14 @@ task_local! {
/// Note that we don't try to limit how many task of a certain kind can be running
/// at the same time.
///
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[derive(
Debug,
// NB: enumset::EnumSetType derives PartialEq, Eq, Clone, Copy
enumset::EnumSetType,
serde::Serialize,
serde::Deserialize,
strum_macros::IntoStaticStr,
)]
pub enum TaskKind {
// Pageserver startup, i.e., `main`
Startup,

View File

@@ -9,13 +9,21 @@ mod remote_layer;
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::repository::{Key, Value};
use crate::task_mgr::TaskKind;
use crate::walrecord::NeonWalRecord;
use anyhow::Result;
use bytes::Bytes;
use pageserver_api::models::HistoricLayerInfo;
use enum_map::EnumMap;
use enumset::EnumSet;
use pageserver_api::models::LayerAccessKind;
use pageserver_api::models::{
HistoricLayerInfo, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
};
use std::ops::Range;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
use utils::history_buffer::HistoryBufferWithDropCounter;
use utils::{
id::{TenantId, TimelineId},
@@ -83,6 +91,149 @@ pub enum ValueReconstructResult {
Missing,
}
#[derive(Debug)]
pub struct LayerAccessStats(Mutex<LayerAccessStatsInner>);
#[derive(Debug, Default, Clone)]
struct LayerAccessStatsInner {
first_access: Option<LayerAccessStatFullDetails>,
count_by_access_kind: EnumMap<LayerAccessKind, u64>,
task_kind_flag: EnumSet<TaskKind>,
last_accesses: HistoryBufferWithDropCounter<LayerAccessStatFullDetails, 16>,
last_residence_changes: HistoryBufferWithDropCounter<LayerResidenceEvent, 16>,
}
#[derive(Debug, Clone)]
struct LayerAccessStatFullDetails {
when: SystemTime,
task_kind: TaskKind,
access_kind: LayerAccessKind,
}
#[derive(Clone, Copy, strum_macros::EnumString)]
pub enum LayerAccessStatsReset {
NoReset,
JustTaskKindFlags,
AllStats,
}
fn system_time_to_millis_since_epoch(ts: &SystemTime) -> u64 {
ts.duration_since(UNIX_EPOCH)
.expect("better to die in this unlikely case than report false stats")
.as_millis()
.try_into()
.expect("64 bits is enough for few more years")
}
impl LayerAccessStatFullDetails {
fn to_api_model(&self) -> pageserver_api::models::LayerAccessStatFullDetails {
let Self {
when,
task_kind,
access_kind,
} = self;
pageserver_api::models::LayerAccessStatFullDetails {
when_millis_since_epoch: system_time_to_millis_since_epoch(when),
task_kind: task_kind.into(), // into static str, powered by strum_macros
access_kind: *access_kind,
}
}
}
impl LayerAccessStats {
pub(crate) fn for_loading_layer(status: LayerResidenceStatus) -> Self {
let new = LayerAccessStats(Mutex::new(LayerAccessStatsInner::default()));
new.record_residence_event(status, LayerResidenceEventReason::LayerLoad);
new
}
pub(crate) fn for_new_layer_file() -> Self {
let new = LayerAccessStats(Mutex::new(LayerAccessStatsInner::default()));
new.record_residence_event(
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
new
}
/// Creates a clone of `self` and records `new_status` in the clone.
/// The `new_status` is not recorded in `self`
pub(crate) fn clone_for_residence_change(
&self,
new_status: LayerResidenceStatus,
) -> LayerAccessStats {
let clone = {
let inner = self.0.lock().unwrap();
inner.clone()
};
let new = LayerAccessStats(Mutex::new(clone));
new.record_residence_event(new_status, LayerResidenceEventReason::ResidenceChange);
new
}
fn record_residence_event(
&self,
status: LayerResidenceStatus,
reason: LayerResidenceEventReason,
) {
let mut inner = self.0.lock().unwrap();
inner
.last_residence_changes
.write(LayerResidenceEvent::new(status, reason));
}
fn record_access(&self, access_kind: LayerAccessKind, task_kind: TaskKind) {
let mut inner = self.0.lock().unwrap();
let this_access = LayerAccessStatFullDetails {
when: SystemTime::now(),
task_kind,
access_kind,
};
inner
.first_access
.get_or_insert_with(|| this_access.clone());
inner.count_by_access_kind[access_kind] += 1;
inner.task_kind_flag |= task_kind;
inner.last_accesses.write(this_access);
}
fn to_api_model(
&self,
reset: LayerAccessStatsReset,
) -> pageserver_api::models::LayerAccessStats {
let mut inner = self.0.lock().unwrap();
let LayerAccessStatsInner {
first_access,
count_by_access_kind,
task_kind_flag,
last_accesses,
last_residence_changes,
} = &*inner;
let ret = pageserver_api::models::LayerAccessStats {
access_count_by_access_kind: count_by_access_kind
.iter()
.map(|(kind, count)| (kind, *count))
.collect(),
task_kind_access_flag: task_kind_flag
.iter()
.map(|task_kind| task_kind.into()) // into static str, powered by strum_macros
.collect(),
first: first_access.as_ref().map(|a| a.to_api_model()),
accesses_history: last_accesses.map(|m| m.to_api_model()),
residence_events_history: last_residence_changes.clone(),
};
match reset {
LayerAccessStatsReset::NoReset => (),
LayerAccessStatsReset::JustTaskKindFlags => {
inner.task_kind_flag.clear();
}
LayerAccessStatsReset::AllStats => {
*inner = LayerAccessStatsInner::default();
}
}
ret
}
}
/// Supertrait of the [`Layer`] trait that captures the bare minimum interface
/// required by [`LayerMap`].
///
@@ -193,7 +344,9 @@ pub trait PersistentLayer: Layer {
/// current_physical_size is computed as the som of this value.
fn file_size(&self) -> Option<u64>;
fn info(&self) -> HistoricLayerInfo;
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo;
fn access_stats(&self) -> &LayerAccessStats;
}
pub fn downcast_remote_layer(

View File

@@ -37,7 +37,7 @@ use crate::virtual_file::VirtualFile;
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{bail, ensure, Context, Result};
use pageserver_api::models::HistoricLayerInfo;
use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs::{self, File};
@@ -55,7 +55,10 @@ use utils::{
lsn::Lsn,
};
use super::{DeltaFileName, Layer, LayerFileName, LayerIter, LayerKeyIter, PathOrConf};
use super::{
DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerFileName, LayerIter,
LayerKeyIter, LayerResidenceStatus, PathOrConf,
};
///
/// Header stored in the beginning of the file
@@ -184,6 +187,8 @@ pub struct DeltaLayer {
pub file_size: u64,
access_stats: LayerAccessStats,
inner: RwLock<DeltaLayerInner>,
}
@@ -251,7 +256,7 @@ impl Layer for DeltaLayer {
return Ok(());
}
let inner = self.load(ctx)?;
let inner = self.load(LayerAccessKind::Dump, ctx)?;
println!(
"index_start_blk: {}, root {}",
@@ -324,7 +329,7 @@ impl Layer for DeltaLayer {
{
// Open the file and lock the metadata in memory
let inner = self.load(ctx)?;
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
// Scan the page versions backwards, starting from `lsn`.
let file = inner.file.as_ref().unwrap();
@@ -415,7 +420,9 @@ impl PersistentLayer for DeltaLayer {
}
fn iter(&self, ctx: &RequestContext) -> Result<LayerIter<'_>> {
let inner = self.load(ctx).context("load delta layer")?;
let inner = self
.load(LayerAccessKind::KeyIter, ctx)
.context("load delta layer")?;
Ok(match DeltaValueIter::new(inner) {
Ok(iter) => Box::new(iter),
Err(err) => Box::new(std::iter::once(Err(err))),
@@ -423,7 +430,7 @@ impl PersistentLayer for DeltaLayer {
}
fn key_iter(&self, ctx: &RequestContext) -> Result<LayerKeyIter<'_>> {
let inner = self.load(ctx)?;
let inner = self.load(LayerAccessKind::KeyIter, ctx)?;
Ok(Box::new(
DeltaKeyIter::new(inner).context("Layer index is corrupted")?,
))
@@ -439,18 +446,25 @@ impl PersistentLayer for DeltaLayer {
Some(self.file_size)
}
fn info(&self) -> HistoricLayerInfo {
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
let layer_file_name = self.filename().file_name();
let lsn_range = self.get_lsn_range();
let access_stats = self.access_stats.to_api_model(reset);
HistoricLayerInfo::Delta {
layer_file_name,
layer_file_size: Some(self.file_size),
lsn_start: lsn_range.start,
lsn_end: lsn_range.end,
remote: false,
access_stats,
}
}
fn access_stats(&self) -> &LayerAccessStats {
&self.access_stats
}
}
impl DeltaLayer {
@@ -495,7 +509,13 @@ impl DeltaLayer {
/// Open the underlying file and read the metadata into memory, if it's
/// not loaded already.
///
fn load(&self, _ctx: &RequestContext) -> Result<RwLockReadGuard<DeltaLayerInner>> {
fn load(
&self,
access_kind: LayerAccessKind,
ctx: &RequestContext,
) -> Result<RwLockReadGuard<DeltaLayerInner>> {
self.access_stats
.record_access(access_kind, ctx.task_kind());
loop {
// Quick exit if already loaded
let inner = self.inner.read().unwrap();
@@ -576,6 +596,7 @@ impl DeltaLayer {
tenant_id: TenantId,
filename: &DeltaFileName,
file_size: u64,
access_stats: LayerAccessStats,
) -> DeltaLayer {
DeltaLayer {
path_or_conf: PathOrConf::Conf(conf),
@@ -584,6 +605,7 @@ impl DeltaLayer {
key_range: filename.key_range.clone(),
lsn_range: filename.lsn_range.clone(),
file_size,
access_stats,
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
@@ -613,6 +635,7 @@ impl DeltaLayer {
key_range: summary.key_range,
lsn_range: summary.lsn_range,
file_size: metadata.len(),
access_stats: LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident),
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
@@ -783,6 +806,7 @@ impl DeltaLayerWriterInner {
key_range: self.key_start..key_end,
lsn_range: self.lsn_range.clone(),
file_size: metadata.len(),
access_stats: LayerAccessStats::for_new_layer_file(),
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,

View File

@@ -27,14 +27,14 @@ use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::storage_layer::{
PersistentLayer, ValueReconstructResult, ValueReconstructState,
LayerAccessStats, PersistentLayer, ValueReconstructResult, ValueReconstructState,
};
use crate::virtual_file::VirtualFile;
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{bail, ensure, Context, Result};
use bytes::Bytes;
use hex;
use pageserver_api::models::HistoricLayerInfo;
use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs::{self, File};
@@ -53,7 +53,7 @@ use utils::{
};
use super::filename::{ImageFileName, LayerFileName};
use super::{Layer, LayerIter, PathOrConf};
use super::{Layer, LayerAccessStatsReset, LayerIter, LayerResidenceStatus, PathOrConf};
///
/// Header stored in the beginning of the file
@@ -112,6 +112,8 @@ pub struct ImageLayer {
// This entry contains an image of all pages as of this LSN
pub lsn: Lsn,
access_stats: LayerAccessStats,
inner: RwLock<ImageLayerInner>,
}
@@ -176,7 +178,7 @@ impl Layer for ImageLayer {
return Ok(());
}
let inner = self.load(ctx)?;
let inner = self.load(LayerAccessKind::Dump, ctx)?;
let file = inner.file.as_ref().unwrap();
let tree_reader =
DiskBtreeReader::<_, KEY_SIZE>::new(inner.index_start_blk, inner.index_root_blk, file);
@@ -203,7 +205,7 @@ impl Layer for ImageLayer {
assert!(lsn_range.start >= self.lsn);
assert!(lsn_range.end >= self.lsn);
let inner = self.load(ctx)?;
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
@@ -258,7 +260,7 @@ impl PersistentLayer for ImageLayer {
Some(self.file_size)
}
fn info(&self) -> HistoricLayerInfo {
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
let layer_file_name = self.filename().file_name();
let lsn_range = self.get_lsn_range();
@@ -267,8 +269,13 @@ impl PersistentLayer for ImageLayer {
layer_file_size: Some(self.file_size),
lsn_start: lsn_range.start,
remote: false,
access_stats: self.access_stats.to_api_model(reset),
}
}
fn access_stats(&self) -> &LayerAccessStats {
&self.access_stats
}
}
impl ImageLayer {
@@ -306,7 +313,13 @@ impl ImageLayer {
/// Open the underlying file and read the metadata into memory, if it's
/// not loaded already.
///
fn load(&self, _ctx: &RequestContext) -> Result<RwLockReadGuard<ImageLayerInner>> {
fn load(
&self,
access_kind: LayerAccessKind,
ctx: &RequestContext,
) -> Result<RwLockReadGuard<ImageLayerInner>> {
self.access_stats
.record_access(access_kind, ctx.task_kind());
loop {
// Quick exit if already loaded
let inner = self.inner.read().unwrap();
@@ -386,6 +399,7 @@ impl ImageLayer {
tenant_id: TenantId,
filename: &ImageFileName,
file_size: u64,
access_stats: LayerAccessStats,
) -> ImageLayer {
ImageLayer {
path_or_conf: PathOrConf::Conf(conf),
@@ -394,6 +408,7 @@ impl ImageLayer {
key_range: filename.key_range.clone(),
lsn: filename.lsn,
file_size,
access_stats,
inner: RwLock::new(ImageLayerInner {
loaded: false,
file: None,
@@ -421,6 +436,7 @@ impl ImageLayer {
key_range: summary.key_range,
lsn: summary.lsn,
file_size: metadata.len(),
access_stats: LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident),
inner: RwLock::new(ImageLayerInner {
file: None,
loaded: false,
@@ -580,6 +596,7 @@ impl ImageLayerWriterInner {
key_range: self.key_range.clone(),
lsn: self.lsn,
file_size: metadata.len(),
access_stats: LayerAccessStats::for_new_layer_file(),
inner: RwLock::new(ImageLayerInner {
loaded: false,
file: None,

View File

@@ -19,7 +19,10 @@ use utils::{
use super::filename::{DeltaFileName, ImageFileName, LayerFileName};
use super::image_layer::ImageLayer;
use super::{DeltaLayer, LayerIter, LayerKeyIter, PersistentLayer};
use super::{
DeltaLayer, LayerAccessStats, LayerAccessStatsReset, LayerIter, LayerKeyIter,
LayerResidenceStatus, PersistentLayer,
};
/// RemoteLayer is a not yet downloaded [`ImageLayer`] or
/// [`crate::storage_layer::DeltaLayer`].
@@ -43,6 +46,8 @@ pub struct RemoteLayer {
is_incremental: bool,
access_stats: LayerAccessStats,
pub(crate) ongoing_download: Arc<tokio::sync::Semaphore>,
}
@@ -155,7 +160,7 @@ impl PersistentLayer for RemoteLayer {
self.layer_metadata.file_size()
}
fn info(&self) -> HistoricLayerInfo {
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
let layer_file_name = self.filename().file_name();
let lsn_range = self.get_lsn_range();
@@ -166,6 +171,7 @@ impl PersistentLayer for RemoteLayer {
lsn_start: lsn_range.start,
lsn_end: lsn_range.end,
remote: true,
access_stats: self.access_stats.to_api_model(reset),
}
} else {
HistoricLayerInfo::Image {
@@ -173,9 +179,14 @@ impl PersistentLayer for RemoteLayer {
layer_file_size: self.layer_metadata.file_size(),
lsn_start: lsn_range.start,
remote: true,
access_stats: self.access_stats.to_api_model(reset),
}
}
}
fn access_stats(&self) -> &LayerAccessStats {
&self.access_stats
}
}
impl RemoteLayer {
@@ -184,6 +195,7 @@ impl RemoteLayer {
timelineid: TimelineId,
fname: &ImageFileName,
layer_metadata: &LayerFileMetadata,
access_stats: LayerAccessStats,
) -> RemoteLayer {
RemoteLayer {
tenantid,
@@ -195,6 +207,7 @@ impl RemoteLayer {
file_name: fname.to_owned().into(),
layer_metadata: layer_metadata.clone(),
ongoing_download: Arc::new(tokio::sync::Semaphore::new(1)),
access_stats,
}
}
@@ -203,6 +216,7 @@ impl RemoteLayer {
timelineid: TimelineId,
fname: &DeltaFileName,
layer_metadata: &LayerFileMetadata,
access_stats: LayerAccessStats,
) -> RemoteLayer {
RemoteLayer {
tenantid,
@@ -214,6 +228,7 @@ impl RemoteLayer {
file_name: fname.to_owned().into(),
layer_metadata: layer_metadata.clone(),
ongoing_download: Arc::new(tokio::sync::Semaphore::new(1)),
access_stats,
}
}
@@ -234,6 +249,8 @@ impl RemoteLayer {
self.tenantid,
&fname,
file_size,
self.access_stats
.clone_for_residence_change(LayerResidenceStatus::Resident),
))
} else {
let fname = ImageFileName {
@@ -246,6 +263,8 @@ impl RemoteLayer {
self.tenantid,
&fname,
file_size,
self.access_stats
.clone_for_residence_change(LayerResidenceStatus::Resident),
))
}
}

View File

@@ -10,7 +10,7 @@ use itertools::Itertools;
use once_cell::sync::OnceCell;
use pageserver_api::models::{
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
DownloadRemoteLayersTaskState, LayerMapInfo, TimelineState,
DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceStatus, TimelineState,
};
use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError};
use tokio_util::sync::CancellationToken;
@@ -30,8 +30,8 @@ use crate::broker_client::is_broker_client_initialized;
use crate::context::{DownloadBehavior, RequestContext};
use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
use crate::tenant::storage_layer::{
DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer, LayerFileName,
RemoteLayer,
DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer,
LayerAccessStats, LayerFileName, RemoteLayer,
};
use crate::tenant::{
ephemeral_file::is_ephemeral_file,
@@ -72,7 +72,7 @@ use walreceiver::spawn_connection_manager_task;
use super::layer_map::BatchedUpdates;
use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient;
use super::storage_layer::{DeltaLayer, ImageLayer, Layer};
use super::storage_layer::{DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset};
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum FlushLoopState {
@@ -835,7 +835,7 @@ impl Timeline {
self.state.subscribe()
}
pub fn layer_map_info(&self) -> LayerMapInfo {
pub fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
let layer_map = self.layers.read().unwrap();
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
if let Some(open_layer) = &layer_map.open_layer {
@@ -847,7 +847,7 @@ impl Timeline {
let mut historic_layers = Vec::new();
for historic_layer in layer_map.iter_historic_layers() {
historic_layers.push(historic_layer.info());
historic_layers.push(historic_layer.info(reset));
}
LayerMapInfo {
@@ -891,12 +891,18 @@ impl Timeline {
self.timeline_id,
&image_name,
&layer_metadata,
local_layer
.access_stats()
.clone_for_residence_change(LayerResidenceStatus::Evicted),
),
LayerFileName::Delta(delta_name) => RemoteLayer::new_delta(
self.tenant_id,
self.timeline_id,
&delta_name,
&layer_metadata,
local_layer
.access_stats()
.clone_for_residence_change(LayerResidenceStatus::Evicted),
),
});
@@ -1172,6 +1178,7 @@ impl Timeline {
self.tenant_id,
&imgfilename,
file_size,
LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident),
);
trace!("found layer {}", layer.path().display());
@@ -1203,6 +1210,7 @@ impl Timeline {
self.tenant_id,
&deltafilename,
file_size,
LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident),
);
trace!("found layer {}", layer.path().display());
@@ -1340,6 +1348,7 @@ impl Timeline {
self.timeline_id,
imgfilename,
&remote_layer_metadata,
LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted),
);
let remote_layer = Arc::new(remote_layer);
@@ -1364,6 +1373,7 @@ impl Timeline {
self.timeline_id,
deltafilename,
&remote_layer_metadata,
LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted),
);
let remote_layer = Arc::new(remote_layer);
updates.insert_historic(remote_layer);