mirror of https://github.com/neondatabase/neon.git (synced 2026-01-07 13:32:57 +00:00)
eviction: regression test + distinguish layer write from map insert (#4005)
This patch adds a regression test for the threshold-based layer eviction. The test asserts the basic invariant that, if left alone, the residence statuses will stabilize, with some layers resident and some layers evicted. Thereby, we cover both the last-access-time-threshold-based eviction itself and the "imitate access" hacks that we put in recently.

The aggressive `period` and `threshold` values revealed a subtle bug, which is also fixed in this patch. The symptom was that, without the Rust changes of this patch, there would be occasional test failures due to `WARN... unexpectedly downloading` log messages. These log messages were caused by the "imitate access" calls of the eviction task. But the whole point of the "imitate access" hack was to prevent eviction of the layers that we access there. After some digging, I found the root cause, which is the following race condition:

1. Compact: write out an L1 layer from several L0 layers. This records residence event `LayerCreate` with the current timestamp.
2. Eviction: imitate access for the logical size calculation. This accesses the L0 layers, because the L1 layer is not yet in the layer map.
3. Compact: grab the layer map lock, add the new L1 to the layer map, remove the L0s, and release the layer map lock.
4. Eviction: observe the new L1 layer, whose only activity timestamp is the `LayerCreate` event.

The L1 layer had no chance of being accessed until after (3). So, if enough time passes between (1) and (3), then (4) will observe a layer with `now - last_activity > threshold` and evict it.

The fix is to require the first `record_residence_event` to happen while we already hold the layer map lock. The API requires a ref to a `BatchedUpdates` as a witness that we are inside the layer map lock. That is not fool-proof: e.g., new call sites for `insert_historic` could simply forget to record the residence event. It would be nice to prevent this at the type level. In the meantime, we have a rate-limited log message to warn us if such an implementation error sneaks in in the future.

fixes https://github.com/neondatabase/neon/issues/3593
fixes https://github.com/neondatabase/neon/issues/3942

---------

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
committed by GitHub · parent b5d64a1e32 · commit 7dd9553bbb
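To make the witness idea from the commit message concrete, here is a minimal, self-contained sketch. All names in it (`LayerMap`, `Updates`, `AccessStats`, `insert_with_event`) are hypothetical stand-ins, not the actual pageserver types: the point is merely that a witness value can only be obtained by mutably borrowing the map, so accepting a reference to it proves the caller holds the map exclusively at the moment the residence event is timestamped.

// Minimal sketch of the lock-witness pattern. All names here are
// hypothetical stand-ins, not the actual pageserver types.
struct LayerMap {
    layers: Vec<String>,
}

// An `Updates` value can only be obtained via `LayerMap::batch_update`,
// which requires `&mut LayerMap`, i.e., exclusive access to the map.
struct Updates<'a> {
    map: &'a mut LayerMap,
}

impl LayerMap {
    fn batch_update(&mut self) -> Updates<'_> {
        Updates { map: self }
    }
}

impl<'a> Updates<'a> {
    fn insert_historic(&mut self, name: String) {
        self.map.layers.push(name);
    }
}

struct AccessStats;

impl AccessStats {
    // Accepting `&Updates` forces the caller to hold the map exclusively
    // while the residence event is timestamped.
    fn record_residence_event(&self, _witness: &Updates<'_>) {
        // take the timestamp here, while the map is held
    }
}

fn insert_with_event(map: &mut LayerMap, name: String, stats: &AccessStats) {
    let mut updates = map.batch_update();
    stats.record_residence_event(&updates); // timestamp taken under exclusive access
    updates.insert_historic(name);
}

The real API plays the same trick with `BatchedUpdates<'_, L>` handed out by the layer map; as noted above, this is not fool-proof, because a call site can still skip `record_residence_event` entirely, which is what the new rate-limited warning is for.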
@@ -58,6 +58,8 @@ pub mod pageserver_feedback;
 pub mod tracing_span_assert;
 
+pub mod rate_limit;
+
 /// use with fail::cfg("$name", "return(2000)")
 #[macro_export]
 macro_rules! failpoint_sleep_millis_async {
66  libs/utils/src/rate_limit.rs  Normal file
@@ -0,0 +1,66 @@
//! A helper to rate limit operations.

use std::time::{Duration, Instant};

pub struct RateLimit {
    last: Option<Instant>,
    interval: Duration,
}

impl RateLimit {
    pub fn new(interval: Duration) -> Self {
        Self {
            last: None,
            interval,
        }
    }

    /// Call `f` if the rate limit allows.
    /// Don't call it otherwise.
    pub fn call<F: FnOnce()>(&mut self, f: F) {
        let now = Instant::now();
        match self.last {
            Some(last) if now - last <= self.interval => {
                // ratelimit
            }
            _ => {
                self.last = Some(now);
                f();
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicUsize;

    #[test]
    fn basics() {
        use super::RateLimit;
        use std::sync::atomic::Ordering::Relaxed;
        use std::time::Duration;

        let called = AtomicUsize::new(0);
        let mut f = RateLimit::new(Duration::from_millis(100));

        let cl = || {
            called.fetch_add(1, Relaxed);
        };

        f.call(cl);
        assert_eq!(called.load(Relaxed), 1);
        f.call(cl);
        assert_eq!(called.load(Relaxed), 1);
        f.call(cl);
        assert_eq!(called.load(Relaxed), 1);
        std::thread::sleep(Duration::from_millis(100));
        f.call(cl);
        assert_eq!(called.load(Relaxed), 2);
        f.call(cl);
        assert_eq!(called.load(Relaxed), 2);
        std::thread::sleep(Duration::from_millis(100));
        f.call(cl);
        assert_eq!(called.load(Relaxed), 3);
    }
}
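For a sense of how this helper is meant to be used (further down, this diff wires it into a `warn!` in `LayerAccessStats::latest_activity`), here is a minimal usage sketch, with `println!` standing in for the tracing macro:

use std::time::Duration;
use utils::rate_limit::RateLimit;

fn main() {
    let mut limit = RateLimit::new(Duration::from_secs(10));
    for i in 0..5 {
        // Only a call that comes more than 10 seconds after the last
        // executed one runs the closure; the others are dropped.
        limit.call(|| println!("rate-limited message, iteration {i}"));
        std::thread::sleep(Duration::from_secs(3));
    }
}

Note that `call` takes `&mut self`, so concurrent users have to wrap the `RateLimit` in a mutex, which is exactly what the `WARN_RATE_LIMIT` static later in this diff does.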
@@ -13,9 +13,9 @@ use crate::task_mgr::TaskKind;
 use crate::walrecord::NeonWalRecord;
 use anyhow::Result;
 use bytes::Bytes;
-use either::Either;
 use enum_map::EnumMap;
 use enumset::EnumSet;
+use once_cell::sync::Lazy;
 use pageserver_api::models::LayerAccessKind;
 use pageserver_api::models::{
     HistoricLayerInfo, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
@@ -23,8 +23,10 @@ use pageserver_api::models::{
 use std::ops::Range;
 use std::path::PathBuf;
 use std::sync::{Arc, Mutex};
-use std::time::{SystemTime, UNIX_EPOCH};
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use tracing::warn;
 use utils::history_buffer::HistoryBufferWithDropCounter;
+use utils::rate_limit::RateLimit;
 
 use utils::{
     id::{TenantId, TimelineId},
@@ -37,6 +39,8 @@ pub use image_layer::{ImageLayer, ImageLayerWriter};
 pub use inmemory_layer::InMemoryLayer;
 pub use remote_layer::RemoteLayer;
 
+use super::layer_map::BatchedUpdates;
+
 pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
 where
     T: PartialOrd<T>,
@@ -158,41 +162,82 @@ impl LayerAccessStatFullDetails {
 }
 
 impl LayerAccessStats {
-    pub(crate) fn for_loading_layer(status: LayerResidenceStatus) -> Self {
-        let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
-        new.record_residence_event(status, LayerResidenceEventReason::LayerLoad);
-        new
+    /// Create an empty stats object.
+    ///
+    /// The caller is responsible for recording a residence event
+    /// using [`record_residence_event`] before calling `latest_activity`.
+    /// If they don't, [`latest_activity`] will return `None`.
+    pub(crate) fn empty_will_record_residence_event_later() -> Self {
+        LayerAccessStats(Mutex::default())
     }
 
-    pub(crate) fn for_new_layer_file() -> Self {
+    /// Create an empty stats object and record a [`LayerLoad`] event with the given residence status.
+    ///
+    /// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
+    pub(crate) fn for_loading_layer<L>(
+        layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
+        status: LayerResidenceStatus,
+    ) -> Self
+    where
+        L: ?Sized + Layer,
+    {
         let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
         new.record_residence_event(
-            LayerResidenceStatus::Resident,
-            LayerResidenceEventReason::LayerCreate,
+            layer_map_lock_held_witness,
+            status,
+            LayerResidenceEventReason::LayerLoad,
         );
         new
     }
 
     /// Creates a clone of `self` and records `new_status` in the clone.
-    /// The `new_status` is not recorded in `self`
-    pub(crate) fn clone_for_residence_change(
+    ///
+    /// The `new_status` is not recorded in `self`.
+    ///
+    /// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
+    pub(crate) fn clone_for_residence_change<L>(
         &self,
+        layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
         new_status: LayerResidenceStatus,
-    ) -> LayerAccessStats {
+    ) -> LayerAccessStats
+    where
+        L: ?Sized + Layer,
+    {
         let clone = {
             let inner = self.0.lock().unwrap();
             inner.clone()
         };
         let new = LayerAccessStats(Mutex::new(clone));
-        new.record_residence_event(new_status, LayerResidenceEventReason::ResidenceChange);
+        new.record_residence_event(
+            layer_map_lock_held_witness,
+            new_status,
+            LayerResidenceEventReason::ResidenceChange,
+        );
         new
     }
 
-    fn record_residence_event(
+    /// Record a change in layer residency.
+    ///
+    /// Recording the event must happen while holding the layer map lock to
+    /// ensure that latest-activity-threshold-based layer eviction (eviction_task.rs)
+    /// can do an "imitate access" to this layer, before it observes `now-latest_activity() > threshold`.
+    ///
+    /// If we instead recorded the residence event with a timestamp from before grabbing the layer map lock,
+    /// the following race could happen:
+    ///
+    /// - Compact: Write out an L1 layer from several L0 layers. This records residence event LayerCreate with the current timestamp.
+    /// - Eviction: imitate access logical size calculation. This accesses the L0 layers because the L1 layer is not yet in the layer map.
+    /// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock.
+    /// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event.
+    ///
+    pub(crate) fn record_residence_event<L>(
         &self,
+        _layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
         status: LayerResidenceStatus,
         reason: LayerResidenceEventReason,
-    ) {
+    ) where
+        L: ?Sized + Layer,
+    {
         let mut locked = self.0.lock().unwrap();
         locked.iter_mut().for_each(|inner| {
             inner
@@ -255,24 +300,37 @@ impl LayerAccessStats {
         ret
     }
 
-    fn most_recent_access_or_residence_event(
-        &self,
-    ) -> Either<LayerAccessStatFullDetails, LayerResidenceEvent> {
+    /// Get the latest access timestamp, falling back to latest residence event.
+    ///
+    /// This function can only return `None` if there has not yet been a call to the
+    /// [`record_residence_event`] method. That would generally be considered an
+    /// implementation error. This function logs a rate-limited warning in that case.
+    ///
+    /// TODO: use type system to avoid the need for `fallback`.
+    /// The approach in https://github.com/neondatabase/neon/pull/3775
+    /// could be used to enforce that a residence event is recorded
+    /// before a layer is added to the layer map. We could also have
+    /// a layer wrapper type that holds the LayerAccessStats, and ensure
+    /// that that type can only be produced by inserting into the layer map.
+    pub(crate) fn latest_activity(&self) -> Option<SystemTime> {
         let locked = self.0.lock().unwrap();
         let inner = &locked.for_eviction_policy;
         match inner.last_accesses.recent() {
-            Some(a) => Either::Left(*a),
+            Some(a) => Some(a.when),
             None => match inner.last_residence_changes.recent() {
-                Some(e) => Either::Right(e.clone()),
-                None => unreachable!("constructors for LayerAccessStats ensure that there's always a residence change event"),
-            }
-        }
-    }
-
-    pub(crate) fn latest_activity(&self) -> SystemTime {
-        match self.most_recent_access_or_residence_event() {
-            Either::Left(mra) => mra.when,
-            Either::Right(re) => re.timestamp,
+                Some(e) => Some(e.timestamp),
+                None => {
+                    static WARN_RATE_LIMIT: Lazy<Mutex<(usize, RateLimit)>> =
+                        Lazy::new(|| Mutex::new((0, RateLimit::new(Duration::from_secs(10)))));
+                    let mut guard = WARN_RATE_LIMIT.lock().unwrap();
+                    guard.0 += 1;
+                    let occurences = guard.0;
+                    guard.1.call(move || {
+                        warn!(parent: None, occurences, "latest_activity not available, this is an implementation bug, using fallback value");
+                    });
+                    None
+                }
+            },
         }
     }
 }
@@ -57,7 +57,7 @@ use utils::{
 
 use super::{
     DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerFileName, LayerIter,
-    LayerKeyIter, LayerResidenceStatus, PathOrConf,
+    LayerKeyIter, PathOrConf,
 };
 
 ///
@@ -637,7 +637,7 @@ impl DeltaLayer {
             key_range: summary.key_range,
             lsn_range: summary.lsn_range,
             file_size: metadata.len(),
-            access_stats: LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident),
+            access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
             inner: RwLock::new(DeltaLayerInner {
                 loaded: false,
                 file: None,
@@ -808,7 +808,7 @@ impl DeltaLayerWriterInner {
             key_range: self.key_start..key_end,
             lsn_range: self.lsn_range.clone(),
             file_size: metadata.len(),
-            access_stats: LayerAccessStats::for_new_layer_file(),
+            access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
             inner: RwLock::new(DeltaLayerInner {
                 loaded: false,
                 file: None,
@@ -53,7 +53,7 @@ use utils::{
 };
 
 use super::filename::{ImageFileName, LayerFileName};
-use super::{Layer, LayerAccessStatsReset, LayerIter, LayerResidenceStatus, PathOrConf};
+use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf};
 
 ///
 /// Header stored in the beginning of the file
@@ -438,7 +438,7 @@ impl ImageLayer {
             key_range: summary.key_range,
             lsn: summary.lsn,
             file_size: metadata.len(),
-            access_stats: LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident),
+            access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
             inner: RwLock::new(ImageLayerInner {
                 file: None,
                 loaded: false,
@@ -598,7 +598,7 @@ impl ImageLayerWriterInner {
             key_range: self.key_range.clone(),
             lsn: self.lsn,
             file_size: metadata.len(),
-            access_stats: LayerAccessStats::for_new_layer_file(),
+            access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
             inner: RwLock::new(ImageLayerInner {
                 loaded: false,
                 file: None,
@@ -4,6 +4,7 @@
 use crate::config::PageServerConf;
 use crate::context::RequestContext;
 use crate::repository::Key;
+use crate::tenant::layer_map::BatchedUpdates;
 use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
 use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
 use anyhow::{bail, Result};
@@ -246,11 +247,15 @@ impl RemoteLayer {
     }
 
     /// Create a Layer struct representing this layer, after it has been downloaded.
-    pub fn create_downloaded_layer(
+    pub fn create_downloaded_layer<L>(
         &self,
+        layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
         conf: &'static PageServerConf,
         file_size: u64,
-    ) -> Arc<dyn PersistentLayer> {
+    ) -> Arc<dyn PersistentLayer>
+    where
+        L: ?Sized + Layer,
+    {
         if self.is_delta {
             let fname = DeltaFileName {
                 key_range: self.key_range.clone(),
@@ -262,8 +267,10 @@ impl RemoteLayer {
                 self.tenantid,
                 &fname,
                 file_size,
-                self.access_stats
-                    .clone_for_residence_change(LayerResidenceStatus::Resident),
+                self.access_stats.clone_for_residence_change(
+                    layer_map_lock_held_witness,
+                    LayerResidenceStatus::Resident,
+                ),
             ))
         } else {
             let fname = ImageFileName {
@@ -276,8 +283,10 @@ impl RemoteLayer {
                 self.tenantid,
                 &fname,
                 file_size,
-                self.access_stats
-                    .clone_for_residence_change(LayerResidenceStatus::Resident),
+                self.access_stats.clone_for_residence_change(
+                    layer_map_lock_held_witness,
+                    LayerResidenceStatus::Resident,
+                ),
             ))
         }
     }
@@ -11,7 +11,8 @@ use itertools::Itertools;
 use once_cell::sync::OnceCell;
 use pageserver_api::models::{
     DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
-    DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceStatus, TimelineState,
+    DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceEventReason, LayerResidenceStatus,
+    TimelineState,
 };
 use remote_storage::GenericRemoteStorage;
 use storage_broker::BrokerClientChannel;
@@ -1111,7 +1112,7 @@ impl Timeline {
                     &layer_metadata,
                     local_layer
                         .access_stats()
-                        .clone_for_residence_change(LayerResidenceStatus::Evicted),
+                        .clone_for_residence_change(batch_updates, LayerResidenceStatus::Evicted),
                 ),
                 LayerFileName::Delta(delta_name) => RemoteLayer::new_delta(
                     self.tenant_id,
@@ -1120,7 +1121,7 @@ impl Timeline {
                     &layer_metadata,
                     local_layer
                         .access_stats()
-                        .clone_for_residence_change(LayerResidenceStatus::Evicted),
+                        .clone_for_residence_change(batch_updates, LayerResidenceStatus::Evicted),
                 ),
             });
@@ -1489,7 +1490,7 @@ impl Timeline {
                     self.tenant_id,
                     &imgfilename,
                     file_size,
-                    LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident),
+                    LayerAccessStats::for_loading_layer(&updates, LayerResidenceStatus::Resident),
                 );
 
                 trace!("found layer {}", layer.path().display());
@@ -1521,7 +1522,7 @@ impl Timeline {
                     self.tenant_id,
                     &deltafilename,
                     file_size,
-                    LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident),
+                    LayerAccessStats::for_loading_layer(&updates, LayerResidenceStatus::Resident),
                 );
 
                 trace!("found layer {}", layer.path().display());
@@ -1657,7 +1658,10 @@ impl Timeline {
                     self.timeline_id,
                     imgfilename,
                     &remote_layer_metadata,
-                    LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted),
+                    LayerAccessStats::for_loading_layer(
+                        &updates,
+                        LayerResidenceStatus::Evicted,
+                    ),
                 );
                 let remote_layer = Arc::new(remote_layer);
 
@@ -1682,7 +1686,10 @@ impl Timeline {
                     self.timeline_id,
                     deltafilename,
                     &remote_layer_metadata,
-                    LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted),
+                    LayerAccessStats::for_loading_layer(
+                        &updates,
+                        LayerResidenceStatus::Evicted,
+                    ),
                 );
                 let remote_layer = Arc::new(remote_layer);
                 updates.insert_historic(remote_layer);
@@ -2729,11 +2736,16 @@ impl Timeline {
         ])?;
 
         // Add it to the layer map
-        self.layers
-            .write()
-            .unwrap()
-            .batch_update()
-            .insert_historic(Arc::new(new_delta));
+        let l = Arc::new(new_delta);
+        let mut layers = self.layers.write().unwrap();
+        let mut batch_updates = layers.batch_update();
+        l.access_stats().record_residence_event(
+            &batch_updates,
+            LayerResidenceStatus::Resident,
+            LayerResidenceEventReason::LayerCreate,
+        );
+        batch_updates.insert_historic(l);
+        batch_updates.flush();
 
         // update the timeline's physical size
         let sz = new_delta_path.metadata()?.len();
@@ -2938,7 +2950,13 @@ impl Timeline {
             self.metrics
                 .resident_physical_size_gauge
                 .add(metadata.len());
-            updates.insert_historic(Arc::new(l));
+            let l = Arc::new(l);
+            l.access_stats().record_residence_event(
+                &updates,
+                LayerResidenceStatus::Resident,
+                LayerResidenceEventReason::LayerCreate,
+            );
+            updates.insert_historic(l);
         }
         updates.flush();
         drop(layers);
@@ -3371,6 +3389,11 @@ impl Timeline {
 
             new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
             let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
+            x.access_stats().record_residence_event(
+                &updates,
+                LayerResidenceStatus::Resident,
+                LayerResidenceEventReason::LayerCreate,
+            );
             updates.insert_historic(x);
         }
 
@@ -3881,9 +3904,9 @@ impl Timeline {
 
             // Download complete. Replace the RemoteLayer with the corresponding
             // Delta- or ImageLayer in the layer map.
-            let new_layer = remote_layer.create_downloaded_layer(self_clone.conf, *size);
             let mut layers = self_clone.layers.write().unwrap();
             let mut updates = layers.batch_update();
+            let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
             {
                 use crate::tenant::layer_map::Replacement;
                 let l: Arc<dyn PersistentLayer> = remote_layer.clone();
@@ -4155,7 +4178,15 @@ impl Timeline {
                 continue;
             }
 
-            let last_activity_ts = l.access_stats().latest_activity();
+            let last_activity_ts = l
+                .access_stats()
+                .latest_activity()
+                .unwrap_or_else(|| {
+                    // We only use this fallback if there's an implementation error.
+                    // `latest_activity` already does rate-limited warn!() log.
+                    debug!(layer=%l.filename().file_name(), "last_activity returns None, using SystemTime::now");
+                    SystemTime::now()
+                });
 
             resident_layers.push(LocalLayerInfoForDiskUsageEviction {
                 layer: l,
@@ -184,7 +184,14 @@ impl Timeline {
             if hist_layer.is_remote_layer() {
                 continue;
             }
-            let last_activity_ts = hist_layer.access_stats().latest_activity();
+
+            let last_activity_ts = hist_layer.access_stats().latest_activity().unwrap_or_else(|| {
+                // We only use this fallback if there's an implementation error.
+                // `latest_activity` already does rate-limited warn!() log.
+                debug!(layer=%hist_layer.filename().file_name(), "last_activity returns None, using SystemTime::now");
+                SystemTime::now()
+            });
 
             let no_activity_for = match now.duration_since(last_activity_ts) {
                 Ok(d) => d,
                 Err(_e) => {
@@ -292,6 +292,12 @@ def port_distributor(worker_base_port: int) -> PortDistributor:
     return PortDistributor(base_port=worker_base_port, port_number=WORKER_PORT_NUM)
 
 
+@pytest.fixture(scope="session")
+def httpserver_listen_address(port_distributor: PortDistributor):
+    port = port_distributor.get_port()
+    return ("localhost", port)
+
+
 @pytest.fixture(scope="function")
 def default_broker(
     port_distributor: PortDistributor,
@@ -24,13 +24,6 @@ from pytest_httpserver import HTTPServer
 from werkzeug.wrappers.request import Request
 from werkzeug.wrappers.response import Response
 
-
-@pytest.fixture(scope="session")
-def httpserver_listen_address(port_distributor: PortDistributor):
-    port = port_distributor.get_port()
-    return ("localhost", port)
-
-
 # ==============================================================================
 # Storage metrics tests
 # ==============================================================================
179  test_runner/regress/test_threshold_based_eviction.py  Normal file
@@ -0,0 +1,179 @@
import time
from dataclasses import dataclass
from typing import List, Set, Tuple

from fixtures.log_helper import log
from fixtures.neon_fixtures import (
    NeonEnvBuilder,
    PgBin,
    RemoteStorageKind,
    last_flush_lsn_upload,
)
from fixtures.pageserver.http import LayerMapInfo
from fixtures.types import TimelineId
from pytest_httpserver import HTTPServer

# NB: basic config change tests are in test_tenant_conf.py


def test_threshold_based_eviction(
    request,
    httpserver: HTTPServer,
    httpserver_listen_address,
    pg_bin: PgBin,
    neon_env_builder: NeonEnvBuilder,
):
    neon_env_builder.enable_remote_storage(RemoteStorageKind.LOCAL_FS, f"{request.node.name}")

    # Start with metrics collection enabled, so that the eviction task
    # imitates its accesses. We'll use a non-existent endpoint to make it fail.
    # The synthetic size calculation will run regardless.
    host, port = httpserver_listen_address
    neon_env_builder.pageserver_config_override = f"""
        metric_collection_interval="1s"
        synthetic_size_calculation_interval="2s"
        metric_collection_endpoint="http://{host}:{port}/nonexistent"
    """
    metrics_refused_log_line = ".*metrics endpoint refused the sent metrics.*/nonexistent.*"
    env = neon_env_builder.init_start()
    env.pageserver.allowed_errors.append(metrics_refused_log_line)

    tenant_id, timeline_id = env.initial_tenant, env.initial_timeline
    assert isinstance(timeline_id, TimelineId)

    ps_http = env.pageserver.http_client()
    assert ps_http.tenant_config(tenant_id).effective_config["eviction_policy"] == {
        "kind": "NoEviction"
    }

    eviction_threshold = 5
    eviction_period = 1
    ps_http.set_tenant_config(
        tenant_id,
        {
            "eviction_policy": {
                "kind": "LayerAccessThreshold",
                "threshold": f"{eviction_threshold}s",
                "period": f"{eviction_period}s",
            },
        },
    )
    assert ps_http.tenant_config(tenant_id).effective_config["eviction_policy"] == {
        "kind": "LayerAccessThreshold",
        "threshold": f"{eviction_threshold}s",
        "period": f"{eviction_period}s",
    }

    # restart because changing tenant config is not instant
    env.pageserver.stop()
    env.pageserver.start()

    assert ps_http.tenant_config(tenant_id).effective_config["eviction_policy"] == {
        "kind": "LayerAccessThreshold",
        "threshold": f"{eviction_threshold}s",
        "period": f"{eviction_period}s",
    }

    # create a bunch of L1s, only the least of which will need to be resident
    compaction_threshold = 3  # create L1 layers quickly
    ps_http.patch_tenant_config_client_side(
        tenant_id,
        inserts={
            # Disable gc and compaction to avoid on-demand downloads from their side.
            # The only on-demand downloads should be from the eviction task's "imitate access" functions.
            "gc_period": "0s",
            "compaction_period": "0s",
            # low checkpoint_distance so that pgbench creates many layers
            "checkpoint_distance": 1024**2,
            # Low compaction target size to create many L1's with tight key ranges.
            # This is so that the "imitate accesses" don't download all the layers.
            "compaction_target_size": 1 * 1024**2,  # all keys into one L1
            # Turn L0's into L1's fast.
            "compaction_threshold": compaction_threshold,
            # Prevent compaction from collapsing L1 delta layers into image layers. We want many layers here.
            "image_creation_threshold": 100,
            # Much larger so that the synthetic size calculation worker, which is part of metric collection,
            # computes logical size for initdb_lsn every time, instead of some moving lsn as we insert data.
            # This makes the set of downloaded layers predictable,
            # thereby allowing the residence statuses to stabilize below.
            "gc_horizon": 1024**4,
        },
    )

    # create a bunch of layers
    with env.endpoints.create_start("main", tenant_id=tenant_id) as pg:
        pg_bin.run(["pgbench", "-i", "-s", "3", pg.connstr()])
        last_flush_lsn_upload(env, pg, tenant_id, timeline_id)
    # wrap up and shut down safekeepers so that no more layers will be created after the final checkpoint
    for sk in env.safekeepers:
        sk.stop()
    ps_http.timeline_checkpoint(tenant_id, timeline_id)

    # wait for evictions and assert that they stabilize
    @dataclass
    class ByLocalAndRemote:
        remote_layers: Set[str]
        local_layers: Set[str]

    class MapInfoProjection:
        def __init__(self, info: LayerMapInfo):
            self.info = info

        def by_local_and_remote(self) -> ByLocalAndRemote:
            return ByLocalAndRemote(
                remote_layers={
                    layer.layer_file_name for layer in self.info.historic_layers if layer.remote
                },
                local_layers={
                    layer.layer_file_name for layer in self.info.historic_layers if not layer.remote
                },
            )

        def __eq__(self, other):
            if not isinstance(other, MapInfoProjection):
                return False
            return self.by_local_and_remote() == other.by_local_and_remote()

        def __repr__(self) -> str:
            out = ["MapInfoProjection:"]
            for layer in sorted(self.info.historic_layers, key=lambda layer: layer.layer_file_name):
                remote = "R" if layer.remote else "L"
                out += [f" {remote} {layer.layer_file_name}"]
            return "\n".join(out)

    observation_window = 8 * eviction_threshold
    consider_stable_when_no_change_for_seconds = 3 * eviction_threshold
    poll_interval = eviction_threshold / 3
    started_waiting_at = time.time()
    map_info_changes: List[Tuple[float, MapInfoProjection]] = []
    while time.time() - started_waiting_at < observation_window:
        current = (
            time.time(),
            MapInfoProjection(ps_http.layer_map_info(tenant_id, timeline_id)),
        )
        last = map_info_changes[-1] if map_info_changes else (0, None)
        if last[1] is None or current[1] != last[1]:
            map_info_changes.append(current)
            log.info("change in layer map\n before: %s\n after: %s", last, current)
        else:
            stable_for = current[0] - last[0]
            log.info("residencies stable for %s", stable_for)
            if stable_for > consider_stable_when_no_change_for_seconds:
                break
        time.sleep(poll_interval)

    log.info("len(map_info_changes)=%s", len(map_info_changes))

    # TODO: can we be more precise here? E.g., require we're stable _within_ X*threshold,
    # instead of what we do here, i.e., stable _for at least_ X*threshold toward the end of the observation window
    assert (
        stable_for > consider_stable_when_no_change_for_seconds
    ), "layer residencies did not become stable within the observation window"

    post = map_info_changes[-1][1].by_local_and_remote()
    assert len(post.remote_layers) > 0, "some layers should be evicted once it's stabilized"
    assert len(post.local_layers) > 0, "the imitate accesses should keep some layers resident"

    assert env.pageserver.log_contains(
        metrics_refused_log_line
    ), "ensure the metrics collection worker ran"