mirror of
https://github.com/neondatabase/neon.git
synced 2026-07-05 13:10:37 +00:00
carry stats across eviction & on-demand download
This commit is contained in:
committed by
Christian Schwarz
parent
63ad7c4461
commit
8ce21eb7e3
@@ -253,12 +253,20 @@ pub struct LayerAccessStatFullDetails {
|
||||
pub access_kind: LayerAccessKind,
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
|
||||
#[serde(tag = "kind")]
|
||||
pub enum LayerResidenceStatus {
|
||||
Resident { timestamp_millis_since_epoch: u128 },
|
||||
Evicted { timestamp_millis_since_epoch: u128 },
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct LayerAccessStats {
|
||||
pub access_count_by_access_kind: HashMap<LayerAccessKind, u64>,
|
||||
pub task_kind_access_flag: Vec<&'static str>,
|
||||
pub first: Option<LayerAccessStatFullDetails>,
|
||||
pub most_recent: Vec<LayerAccessStatFullDetails>,
|
||||
pub most_recent_residence_changes: Vec<LayerResidenceStatus>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
|
||||
|
||||
@@ -88,21 +88,29 @@ pub enum ValueReconstructResult {
|
||||
Missing,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
#[derive(Default, Debug)]
|
||||
pub struct LayerAccessStats(Mutex<LayerAccessStatsInner>);
|
||||
|
||||
#[derive(Clone)]
|
||||
#[derive(Debug, Clone)]
|
||||
struct LayerAccessStatFullDetails {
|
||||
when: SystemTime,
|
||||
task_kind: TaskKind,
|
||||
access_kind: LayerAccessKind,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct LayerAccessStatsInner {
|
||||
first_access: Option<LayerAccessStatFullDetails>,
|
||||
count_by_access_kind: EnumMap<LayerAccessKind, u64>,
|
||||
task_kind_flag: EnumSet<TaskKind>,
|
||||
last_accesses: VecDeque<LayerAccessStatFullDetails>,
|
||||
last_residence_changes: VecDeque<LayerResidenceStatus>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum LayerResidenceStatus {
|
||||
Resident { timestamp: SystemTime },
|
||||
Evicted { timestamp: SystemTime },
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, strum_macros::EnumString)]
|
||||
@@ -111,6 +119,12 @@ pub enum LayerAccessStatsReset {
|
||||
AllStats,
|
||||
}
|
||||
|
||||
fn system_time_to_millis_since_epoch(ts: &SystemTime) -> u128 {
|
||||
ts.duration_since(UNIX_EPOCH)
|
||||
.expect("better to die in this unlikely case than report false stats")
|
||||
.as_millis()
|
||||
}
|
||||
|
||||
impl LayerAccessStatFullDetails {
|
||||
fn to_api_model(&self) -> pageserver_api::models::LayerAccessStatFullDetails {
|
||||
let Self {
|
||||
@@ -119,16 +133,42 @@ impl LayerAccessStatFullDetails {
|
||||
access_kind,
|
||||
} = self;
|
||||
pageserver_api::models::LayerAccessStatFullDetails {
|
||||
when_millis_since_epoch: when
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.expect("better to die in this unlikely case than report false stats")
|
||||
.as_millis(),
|
||||
when_millis_since_epoch: system_time_to_millis_since_epoch(when),
|
||||
task_kind: task_kind.into(), // into static str, powered by strum_macros
|
||||
access_kind: *access_kind,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LayerResidenceStatus {
|
||||
pub fn evicted() -> Self {
|
||||
LayerResidenceStatus::Evicted {
|
||||
timestamp: SystemTime::now(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn resident() -> Self {
|
||||
LayerResidenceStatus::Resident {
|
||||
timestamp: SystemTime::now(),
|
||||
}
|
||||
}
|
||||
|
||||
fn to_api_model(&self) -> pageserver_api::models::LayerResidenceStatus {
|
||||
match self {
|
||||
LayerResidenceStatus::Resident { timestamp } => {
|
||||
pageserver_api::models::LayerResidenceStatus::Resident {
|
||||
timestamp_millis_since_epoch: system_time_to_millis_since_epoch(timestamp),
|
||||
}
|
||||
}
|
||||
LayerResidenceStatus::Evicted { timestamp } => {
|
||||
pageserver_api::models::LayerResidenceStatus::Evicted {
|
||||
timestamp_millis_since_epoch: system_time_to_millis_since_epoch(timestamp),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub static LAYER_ACCESS_STATS_KILLSWITCH: AtomicBool = AtomicBool::new(false);
|
||||
|
||||
impl Default for LayerAccessStatsInner {
|
||||
@@ -138,12 +178,36 @@ impl Default for LayerAccessStatsInner {
|
||||
count_by_access_kind: EnumMap::default(),
|
||||
task_kind_flag: EnumSet::default(),
|
||||
last_accesses: VecDeque::with_capacity(LayerAccessStats::LAST_ACCESSES_MAX_LEN),
|
||||
last_residence_changes: VecDeque::with_capacity(
|
||||
LayerAccessStats::LAST_RESIDENCE_CHANGES_MAX_LEN,
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LayerAccessStats {
|
||||
const LAST_ACCESSES_MAX_LEN: usize = 16;
|
||||
const LAST_RESIDENCE_CHANGES_MAX_LEN: usize = 16;
|
||||
|
||||
/// Creates a clone of `self` and records `new_status` in the clone.
|
||||
/// The `new_status` is not recorded in `self`
|
||||
pub(crate) fn clone_for_residence_change(
|
||||
&self,
|
||||
new_status: LayerResidenceStatus,
|
||||
) -> LayerAccessStats {
|
||||
let mut clone = {
|
||||
let inner = self.0.lock().unwrap();
|
||||
inner.clone()
|
||||
};
|
||||
|
||||
// make room first to avoid reallocs
|
||||
while clone.last_residence_changes.len() >= Self::LAST_RESIDENCE_CHANGES_MAX_LEN {
|
||||
clone.last_residence_changes.pop_back();
|
||||
}
|
||||
clone.last_residence_changes.push_front(new_status.clone());
|
||||
|
||||
LayerAccessStats(Mutex::new(clone))
|
||||
}
|
||||
|
||||
fn record_access(&self, access_kind: LayerAccessKind, task_kind: TaskKind) {
|
||||
if LAYER_ACCESS_STATS_KILLSWITCH.load(atomic::Ordering::SeqCst) {
|
||||
@@ -184,6 +248,7 @@ impl LayerAccessStats {
|
||||
count_by_access_kind,
|
||||
task_kind_flag,
|
||||
last_accesses,
|
||||
last_residence_changes,
|
||||
} = &*inner;
|
||||
pageserver_api::models::LayerAccessStats {
|
||||
access_count_by_access_kind: count_by_access_kind
|
||||
@@ -196,6 +261,10 @@ impl LayerAccessStats {
|
||||
.collect(),
|
||||
first: first_access.as_ref().map(|a| a.to_api_model()),
|
||||
most_recent: last_accesses.iter().map(|a| a.to_api_model()).collect(),
|
||||
most_recent_residence_changes: last_residence_changes
|
||||
.iter()
|
||||
.map(|s| s.to_api_model())
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -308,6 +377,8 @@ pub trait PersistentLayer: Layer {
|
||||
fn file_size(&self) -> Option<u64>;
|
||||
|
||||
fn info(&self, reset: Option<LayerAccessStatsReset>) -> HistoricLayerInfo;
|
||||
|
||||
fn access_stats(&self) -> &LayerAccessStats;
|
||||
}
|
||||
|
||||
pub fn downcast_remote_layer(
|
||||
|
||||
@@ -447,6 +447,10 @@ impl PersistentLayer for DeltaLayer {
|
||||
|
||||
ret
|
||||
}
|
||||
|
||||
fn access_stats(&self) -> &LayerAccessStats {
|
||||
&self.access_stats
|
||||
}
|
||||
}
|
||||
|
||||
impl DeltaLayer {
|
||||
@@ -578,6 +582,7 @@ impl DeltaLayer {
|
||||
tenant_id: TenantId,
|
||||
filename: &DeltaFileName,
|
||||
file_size: u64,
|
||||
existing_access_stats: Option<LayerAccessStats>,
|
||||
) -> DeltaLayer {
|
||||
DeltaLayer {
|
||||
path_or_conf: PathOrConf::Conf(conf),
|
||||
@@ -586,7 +591,7 @@ impl DeltaLayer {
|
||||
key_range: filename.key_range.clone(),
|
||||
lsn_range: filename.lsn_range.clone(),
|
||||
file_size,
|
||||
access_stats: LayerAccessStats::default(),
|
||||
access_stats: existing_access_stats.unwrap_or_default(),
|
||||
inner: RwLock::new(DeltaLayerInner {
|
||||
loaded: false,
|
||||
file: None,
|
||||
|
||||
@@ -258,6 +258,10 @@ impl PersistentLayer for ImageLayer {
|
||||
|
||||
ret
|
||||
}
|
||||
|
||||
fn access_stats(&self) -> &LayerAccessStats {
|
||||
&self.access_stats
|
||||
}
|
||||
}
|
||||
|
||||
impl ImageLayer {
|
||||
@@ -381,6 +385,7 @@ impl ImageLayer {
|
||||
tenant_id: TenantId,
|
||||
filename: &ImageFileName,
|
||||
file_size: u64,
|
||||
existing_access_stats: Option<LayerAccessStats>,
|
||||
) -> ImageLayer {
|
||||
ImageLayer {
|
||||
path_or_conf: PathOrConf::Conf(conf),
|
||||
@@ -389,7 +394,7 @@ impl ImageLayer {
|
||||
key_range: filename.key_range.clone(),
|
||||
lsn: filename.lsn,
|
||||
file_size,
|
||||
access_stats: LayerAccessStats::default(),
|
||||
access_stats: existing_access_stats.unwrap_or_default(),
|
||||
inner: RwLock::new(ImageLayerInner {
|
||||
loaded: false,
|
||||
file: None,
|
||||
|
||||
@@ -18,7 +18,10 @@ use utils::{
|
||||
|
||||
use super::filename::{DeltaFileName, ImageFileName, LayerFileName};
|
||||
use super::image_layer::ImageLayer;
|
||||
use super::{DeltaLayer, LayerAccessStatsReset, LayerIter, LayerKeyIter, PersistentLayer};
|
||||
use super::{
|
||||
DeltaLayer, LayerAccessStats, LayerAccessStatsReset, LayerIter, LayerKeyIter,
|
||||
LayerResidenceStatus, PersistentLayer,
|
||||
};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RemoteLayer {
|
||||
@@ -35,6 +38,8 @@ pub struct RemoteLayer {
|
||||
|
||||
is_incremental: bool,
|
||||
|
||||
access_stats: LayerAccessStats,
|
||||
|
||||
pub(crate) ongoing_download: Arc<tokio::sync::Semaphore>,
|
||||
}
|
||||
|
||||
@@ -150,7 +155,7 @@ impl PersistentLayer for RemoteLayer {
|
||||
lsn_start: lsn_range.start,
|
||||
lsn_end: lsn_range.end,
|
||||
remote: true,
|
||||
access_stats: None, // remote layer doesn't get accessed
|
||||
access_stats: Some(self.access_stats.to_api_model()),
|
||||
}
|
||||
} else {
|
||||
HistoricLayerInfo::Image {
|
||||
@@ -159,10 +164,14 @@ impl PersistentLayer for RemoteLayer {
|
||||
key_end: key_range.end,
|
||||
lsn_start: lsn_range.start,
|
||||
remote: true,
|
||||
access_stats: None, // remote layer doesn't get accessed
|
||||
access_stats: Some(self.access_stats.to_api_model()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn access_stats(&self) -> &LayerAccessStats {
|
||||
&self.access_stats
|
||||
}
|
||||
}
|
||||
|
||||
impl RemoteLayer {
|
||||
@@ -171,6 +180,7 @@ impl RemoteLayer {
|
||||
timelineid: TimelineId,
|
||||
fname: &ImageFileName,
|
||||
layer_metadata: &LayerFileMetadata,
|
||||
existing_access_stats: Option<LayerAccessStats>,
|
||||
) -> RemoteLayer {
|
||||
RemoteLayer {
|
||||
tenantid,
|
||||
@@ -182,6 +192,7 @@ impl RemoteLayer {
|
||||
file_name: fname.to_owned().into(),
|
||||
layer_metadata: layer_metadata.clone(),
|
||||
ongoing_download: Arc::new(tokio::sync::Semaphore::new(1)),
|
||||
access_stats: existing_access_stats.unwrap_or_default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -190,6 +201,8 @@ impl RemoteLayer {
|
||||
timelineid: TimelineId,
|
||||
fname: &DeltaFileName,
|
||||
layer_metadata: &LayerFileMetadata,
|
||||
|
||||
existing_access_stats: Option<LayerAccessStats>,
|
||||
) -> RemoteLayer {
|
||||
RemoteLayer {
|
||||
tenantid,
|
||||
@@ -201,6 +214,7 @@ impl RemoteLayer {
|
||||
file_name: fname.to_owned().into(),
|
||||
layer_metadata: layer_metadata.clone(),
|
||||
ongoing_download: Arc::new(tokio::sync::Semaphore::new(1)),
|
||||
access_stats: existing_access_stats.unwrap_or_default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -221,6 +235,10 @@ impl RemoteLayer {
|
||||
self.tenantid,
|
||||
&fname,
|
||||
file_size,
|
||||
Some(
|
||||
self.access_stats
|
||||
.clone_for_residence_change(LayerResidenceStatus::resident()),
|
||||
),
|
||||
))
|
||||
} else {
|
||||
let fname = ImageFileName {
|
||||
@@ -233,6 +251,10 @@ impl RemoteLayer {
|
||||
self.tenantid,
|
||||
&fname,
|
||||
file_size,
|
||||
Some(
|
||||
self.access_stats
|
||||
.clone_for_residence_change(LayerResidenceStatus::resident()),
|
||||
),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -71,7 +71,9 @@ use walreceiver::spawn_connection_manager_task;
|
||||
use super::layer_map::BatchedUpdates;
|
||||
use super::remote_timeline_client::index::IndexPart;
|
||||
use super::remote_timeline_client::RemoteTimelineClient;
|
||||
use super::storage_layer::{DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset};
|
||||
use super::storage_layer::{
|
||||
DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset, LayerResidenceStatus,
|
||||
};
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
enum FlushLoopState {
|
||||
@@ -877,12 +879,22 @@ impl Timeline {
|
||||
self.timeline_id,
|
||||
&image_name,
|
||||
&layer_metadata,
|
||||
Some(
|
||||
local_layer
|
||||
.access_stats()
|
||||
.clone_for_residence_change(LayerResidenceStatus::evicted()),
|
||||
),
|
||||
),
|
||||
LayerFileName::Delta(delta_name) => RemoteLayer::new_delta(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
&delta_name,
|
||||
&layer_metadata,
|
||||
Some(
|
||||
local_layer
|
||||
.access_stats()
|
||||
.clone_for_residence_change(LayerResidenceStatus::evicted()),
|
||||
),
|
||||
),
|
||||
#[cfg(test)]
|
||||
LayerFileName::Test(_) => unreachable!(),
|
||||
@@ -1159,6 +1171,7 @@ impl Timeline {
|
||||
self.tenant_id,
|
||||
&imgfilename,
|
||||
file_size,
|
||||
None,
|
||||
);
|
||||
|
||||
trace!("found layer {}", layer.path().display());
|
||||
@@ -1190,6 +1203,7 @@ impl Timeline {
|
||||
self.tenant_id,
|
||||
&deltafilename,
|
||||
file_size,
|
||||
None,
|
||||
);
|
||||
|
||||
trace!("found layer {}", layer.path().display());
|
||||
@@ -1327,6 +1341,7 @@ impl Timeline {
|
||||
self.timeline_id,
|
||||
imgfilename,
|
||||
&remote_layer_metadata,
|
||||
None,
|
||||
);
|
||||
let remote_layer = Arc::new(remote_layer);
|
||||
|
||||
@@ -1351,6 +1366,7 @@ impl Timeline {
|
||||
self.timeline_id,
|
||||
deltafilename,
|
||||
&remote_layer_metadata,
|
||||
None,
|
||||
);
|
||||
let remote_layer = Arc::new(remote_layer);
|
||||
updates.insert_historic(remote_layer);
|
||||
|
||||
Reference in New Issue
Block a user