fix(layer): remove the need to repair internal state (#7030)

## Problem

The current implementation of struct Layer supports canceled read
requests, but those will leave the internal state such that a following
`Layer::keep_resident` call will need to repair the state. In
pathological cases seen during generation numbers resetting in staging
or with too many in-progress on-demand downloads, this repair activity
will need to wait for the download to complete, which stalls disk
usage-based eviction. Similar stalls have been observed in staging near
disk-full situations, where downloads failed because the disk was full.

Fixes #6028, i.e. the "layer is present on filesystem but not evictable"
problems, by:
1. not canceling pending evictions by a canceled
`LayerInner::get_or_maybe_download`
2. completing post-download initialization of the `LayerInner::inner`
from the download task

Not canceling evictions as in case (1) above and always initializing as in
case (2) mean that `LayerInner::inner` always holds up-to-date information,
so the old `Layer::keep_resident` never has to wait for downloads to
complete. Finally, `Layer::keep_resident` is replaced with
`Layer::is_likely_resident`. These fix #7145.

## Summary of changes

- add a new test showing that a canceled get_or_maybe_download should
not cancel the eviction
- switch to using a `watch` internally rather than a `broadcast` to
avoid hanging eviction while a download is ongoing
- doc changes for new semantics and cleanup
- fix `Layer::keep_resident` to use just `self.0.inner.get()` as truth
as `Layer::is_likely_resident`
- remove `LayerInner::wanted_evicted` boolean as no longer needed

Builds upon: #7185. Cc: #5331.
This commit is contained in:
Joonas Koivunen
2024-03-21 03:19:08 +02:00
committed by GitHub
parent a95c41f463
commit 2206e14c26
7 changed files with 1088 additions and 331 deletions

View File

@@ -2,7 +2,6 @@
//! page server.
use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
use futures::stream::StreamExt;
use itertools::Itertools;
use pageserver_api::key::Key;
use pageserver_api::models::ShardParameters;
@@ -1662,9 +1661,9 @@ impl TenantManager {
.layers
.read()
.await
.resident_layers()
.collect::<Vec<_>>()
.await;
.likely_resident_layers()
.collect::<Vec<_>>();
for layer in timeline_layers {
let relative_path = layer
.local_path()

View File

@@ -32,6 +32,9 @@ use utils::generation::Generation;
#[cfg(test)]
mod tests;
#[cfg(test)]
mod failpoints;
/// A Layer contains all data in a "rectangle" consisting of a range of keys and
/// range of LSNs.
///
@@ -46,7 +49,41 @@ mod tests;
/// An image layer is a snapshot of all the data in a key-range, at a single
/// LSN.
///
/// This type models the on-disk layers, which can be evicted and on-demand downloaded.
/// This type models the on-disk layers, which can be evicted and on-demand downloaded. As a
/// general goal, read accesses should always win eviction and eviction should not wait for
/// download.
///
/// ### State transitions
///
/// The internal state of `Layer` is composed of most importantly the on-filesystem state and the
/// [`ResidentOrWantedEvicted`] enum. On-filesystem state can be either present (fully downloaded,
/// right size) or deleted.
///
/// Reads will always win requests to evict until `wait_for_turn_and_evict` has acquired the
/// `heavier_once_cell::InitPermit` and has started to `evict_blocking`. Before the
/// `heavier_once_cell::InitPermit` has been acquired, any read request
/// (`get_or_maybe_download`) can "re-initialize" using the existing downloaded file and thus
/// cancel the eviction.
///
/// ```text
/// +-----------------+ get_or_maybe_download +--------------------------------+
/// | not initialized |--------------------------->| Resident(Arc<DownloadedLayer>) |
/// | ENOENT | /->| |
/// +-----------------+ | +--------------------------------+
/// ^ | | ^
/// | get_or_maybe_download | | | get_or_maybe_download, either:
/// evict_blocking | /-------------------------/ | | - upgrade weak to strong
/// | | | | - re-initialize without download
/// | | evict_and_wait | |
/// +-----------------+ v |
/// | not initialized | on_downloaded_layer_drop +--------------------------------------+
/// | file is present |<---------------------------| WantedEvicted(Weak<DownloadedLayer>) |
/// +-----------------+ +--------------------------------------+
/// ```
///
/// ### Unsupported
///
/// - Evicting by the operator deleting files from the filesystem
///
/// [`InMemoryLayer`]: super::inmemory_layer::InMemoryLayer
#[derive(Clone)]
@@ -211,8 +248,7 @@ impl Layer {
///
/// Timeout is mandatory, because waiting for eviction is only needed for our tests; eviction
/// will happen regardless the future returned by this method completing unless there is a
/// read access (currently including [`Layer::keep_resident`]) before eviction gets to
/// complete.
/// read access before eviction gets to complete.
///
/// Technically cancellation safe, but cancelling might shift the viewpoint to a different
/// generation of the download-evict cycle on retry.
@@ -307,21 +343,28 @@ impl Layer {
/// Assuming the layer is already downloaded, returns a guard which will prohibit eviction
/// while the guard exists.
///
/// Returns None if the layer is currently evicted.
pub(crate) async fn keep_resident(&self) -> anyhow::Result<Option<ResidentLayer>> {
let downloaded = match self.0.get_or_maybe_download(false, None).await {
Ok(d) => d,
// technically there are a lot of possible errors, but in practice it should only be
// DownloadRequired which is tripped up. could work to improve this situation
// statically later.
Err(DownloadError::DownloadRequired) => return Ok(None),
Err(e) => return Err(e.into()),
};
/// Returns None if the layer is currently evicted or becoming evicted.
#[cfg(test)]
pub(crate) async fn keep_resident(&self) -> Option<ResidentLayer> {
let downloaded = self.0.inner.get().and_then(|rowe| rowe.get())?;
Ok(Some(ResidentLayer {
Some(ResidentLayer {
downloaded,
owner: self.clone(),
}))
})
}
/// Weak indicator of is the layer resident or not. Good enough for eviction, which can deal
/// with `EvictionError::NotFound`.
///
/// Returns `true` if this layer might be resident, or `false`, if it most likely evicted or
/// will be unless a read happens soon.
pub(crate) fn is_likely_resident(&self) -> bool {
self.0
.inner
.get()
.map(|rowe| rowe.is_likely_resident())
.unwrap_or(false)
}
/// Downloads if necessary and creates a guard, which will keep this layer from being evicted.
@@ -371,11 +414,11 @@ impl Layer {
/// separately.
#[cfg(any(feature = "testing", test))]
pub(crate) fn wait_drop(&self) -> impl std::future::Future<Output = ()> + 'static {
let mut rx = self.0.status.subscribe();
let mut rx = self.0.status.as_ref().unwrap().subscribe();
async move {
loop {
if let Err(tokio::sync::broadcast::error::RecvError::Closed) = rx.recv().await {
if rx.changed().await.is_err() {
break;
}
}
@@ -397,6 +440,32 @@ enum ResidentOrWantedEvicted {
}
impl ResidentOrWantedEvicted {
/// Non-mutating access to a DownloadedLayer, if possible.
///
/// This is not used on the read path (anything that calls
/// [`LayerInner::get_or_maybe_download`]) because it was decided that reads always win
/// evictions, and part of that winning is using [`ResidentOrWantedEvicted::get_and_upgrade`].
#[cfg(test)]
fn get(&self) -> Option<Arc<DownloadedLayer>> {
match self {
ResidentOrWantedEvicted::Resident(strong) => Some(strong.clone()),
ResidentOrWantedEvicted::WantedEvicted(weak, _) => weak.upgrade(),
}
}
/// Best-effort query for residency right now, not as strong guarantee as receiving a strong
/// reference from `ResidentOrWantedEvicted::get`.
fn is_likely_resident(&self) -> bool {
match self {
ResidentOrWantedEvicted::Resident(_) => true,
ResidentOrWantedEvicted::WantedEvicted(weak, _) => weak.strong_count() > 0,
}
}
/// Upgrades any weak to strong if possible.
///
/// Returns a strong reference if possible, along with a boolean telling if an upgrade
/// happened.
fn get_and_upgrade(&mut self) -> Option<(Arc<DownloadedLayer>, bool)> {
match self {
ResidentOrWantedEvicted::Resident(strong) => Some((strong.clone(), false)),
@@ -417,7 +486,7 @@ impl ResidentOrWantedEvicted {
///
/// Returns `Some` if this was the first time eviction was requested. Care should be taken to
/// drop the possibly last strong reference outside of the mutex of
/// heavier_once_cell::OnceCell.
/// [`heavier_once_cell::OnceCell`].
fn downgrade(&mut self) -> Option<Arc<DownloadedLayer>> {
match self {
ResidentOrWantedEvicted::Resident(strong) => {
@@ -445,6 +514,9 @@ struct LayerInner {
desc: PersistentLayerDesc,
/// Timeline access is needed for remote timeline client and metrics.
///
/// There should not be an access to timeline for any reason without entering the
/// [`Timeline::gate`] at the same time.
timeline: Weak<Timeline>,
/// Cached knowledge of [`Timeline::remote_client`] being `Some`.
@@ -453,27 +525,38 @@ struct LayerInner {
access_stats: LayerAccessStats,
/// This custom OnceCell is backed by std mutex, but only held for short time periods.
/// Initialization and deinitialization are done while holding a permit.
///
/// Filesystem changes (download, evict) are only done while holding a permit which the
/// `heavier_once_cell` provides.
///
/// A number of fields in `Layer` are meant to only be updated when holding the InitPermit, but
/// possibly read while not holding it.
inner: heavier_once_cell::OnceCell<ResidentOrWantedEvicted>,
/// Do we want to delete locally and remotely this when `LayerInner` is dropped
wanted_deleted: AtomicBool,
/// Do we want to evict this layer as soon as possible? After being set to `true`, all accesses
/// will try to downgrade [`ResidentOrWantedEvicted`], which will eventually trigger
/// [`LayerInner::on_downloaded_layer_drop`].
wanted_evicted: AtomicBool,
/// Version is to make sure we will only evict a specific download of a file.
/// Version is to make sure we will only evict a specific initialization of the downloaded file.
///
/// Incremented for each download, stored in `DownloadedLayer::version` or
/// Incremented for each initialization, stored in `DownloadedLayer::version` or
/// `ResidentOrWantedEvicted::WantedEvicted`.
version: AtomicUsize,
/// Allow subscribing to when the layer actually gets evicted.
status: tokio::sync::broadcast::Sender<Status>,
/// Allow subscribing to when the layer actually gets evicted, a non-cancellable download
/// starts, or completes.
///
/// Updates must only be posted while holding the InitPermit or the heavier_once_cell::Guard.
/// Holding the InitPermit is the only time we can do state transitions, but we also need to
/// cancel a pending eviction on upgrading a [`ResidentOrWantedEvicted::WantedEvicted`] back to
/// [`ResidentOrWantedEvicted::Resident`] on access.
///
/// The sender is wrapped in an Option to facilitate moving it out on [`LayerInner::drop`].
status: Option<tokio::sync::watch::Sender<Status>>,
/// Counter for exponential backoff with the download
/// Counter for exponential backoff with the download.
///
/// This is atomic only for the purposes of having additional data only accessed while holding
/// the InitPermit.
consecutive_failures: AtomicUsize,
/// The generation of this Layer.
@@ -491,7 +574,13 @@ struct LayerInner {
/// a shard split since the layer was originally written.
shard: ShardIndex,
/// When the Layer was last evicted but has not been downloaded since.
///
/// This is used solely for updating metrics. See [`LayerImplMetrics::redownload_after`].
last_evicted_at: std::sync::Mutex<Option<std::time::Instant>>,
#[cfg(test)]
failpoints: std::sync::Mutex<Vec<failpoints::Failpoint>>,
}
impl std::fmt::Display for LayerInner {
@@ -508,16 +597,16 @@ impl AsLayerDesc for LayerInner {
#[derive(Debug, Clone, Copy)]
enum Status {
Resident,
Evicted,
Downloaded,
Downloading,
}
impl Drop for LayerInner {
fn drop(&mut self) {
if !*self.wanted_deleted.get_mut() {
// should we try to evict if the last wish was for eviction?
// feels like there's some hazard of overcrowding near shutdown near by, but we don't
// run drops during shutdown (yet)
// should we try to evict if the last wish was for eviction? seems more like a hazard
// than a clear win.
return;
}
@@ -528,9 +617,9 @@ impl Drop for LayerInner {
let file_size = self.layer_desc().file_size;
let timeline = self.timeline.clone();
let meta = self.metadata();
let status = self.status.clone();
let status = self.status.take();
crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(move || {
Self::spawn_blocking(move || {
let _g = span.entered();
// carry this until we are finished for [`Layer::wait_drop`] support
@@ -605,12 +694,16 @@ impl LayerInner {
.timeline_path(&timeline.tenant_shard_id, &timeline.timeline_id)
.join(desc.filename().to_string());
let (inner, version) = if let Some(inner) = downloaded {
let (inner, version, init_status) = if let Some(inner) = downloaded {
let version = inner.version;
let resident = ResidentOrWantedEvicted::Resident(inner);
(heavier_once_cell::OnceCell::new(resident), version)
(
heavier_once_cell::OnceCell::new(resident),
version,
Status::Resident,
)
} else {
(heavier_once_cell::OnceCell::default(), 0)
(heavier_once_cell::OnceCell::default(), 0, Status::Evicted)
};
LayerInner {
@@ -621,14 +714,15 @@ impl LayerInner {
have_remote_client: timeline.remote_client.is_some(),
access_stats,
wanted_deleted: AtomicBool::new(false),
wanted_evicted: AtomicBool::new(false),
inner,
version: AtomicUsize::new(version),
status: tokio::sync::broadcast::channel(1).0,
status: Some(tokio::sync::watch::channel(init_status).0),
consecutive_failures: AtomicUsize::new(0),
generation,
shard,
last_evicted_at: std::sync::Mutex::default(),
#[cfg(test)]
failpoints: Default::default(),
}
}
@@ -644,20 +738,34 @@ impl LayerInner {
/// Cancellation safe, however dropping the future and calling this method again might result
/// in a new attempt to evict OR join the previously started attempt.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all, ret, err(level = tracing::Level::DEBUG), fields(layer=%self))]
pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> {
use tokio::sync::broadcast::error::RecvError;
assert!(self.have_remote_client);
let mut rx = self.status.subscribe();
let mut rx = self.status.as_ref().unwrap().subscribe();
{
let current = rx.borrow_and_update();
match &*current {
Status::Resident => {
// we might get lucky and evict this; continue
}
Status::Evicted | Status::Downloading => {
// it is already evicted
return Err(EvictionError::NotFound);
}
}
}
let strong = {
match self.inner.get() {
Some(mut either) => {
self.wanted_evicted.store(true, Ordering::Relaxed);
either.downgrade()
Some(mut either) => either.downgrade(),
None => {
// we already have a scheduled eviction, which just has not gotten to run yet.
// it might still race with a read access, but that could also get cancelled,
// so let's say this is not evictable.
return Err(EvictionError::NotFound);
}
None => return Err(EvictionError::NotFound),
}
};
@@ -673,26 +781,26 @@ impl LayerInner {
LAYER_IMPL_METRICS.inc_started_evictions();
}
match tokio::time::timeout(timeout, rx.recv()).await {
Ok(Ok(Status::Evicted)) => Ok(()),
Ok(Ok(Status::Downloaded)) => Err(EvictionError::Downloaded),
Ok(Err(RecvError::Closed)) => {
unreachable!("sender cannot be dropped while we are in &self method")
}
Ok(Err(RecvError::Lagged(_))) => {
// this is quite unlikely, but we are blocking a lot in the async context, so
// we might be missing this because we are stuck on a LIFO slot on a thread
// which is busy blocking for a 1TB database create_image_layers.
//
// use however late (compared to the initial expressing of wanted) as the
// "outcome" now
LAYER_IMPL_METRICS.inc_broadcast_lagged();
match self.inner.get() {
Some(_) => Err(EvictionError::Downloaded),
None => Ok(()),
}
}
Err(_timeout) => Err(EvictionError::Timeout),
let changed = rx.changed();
let changed = tokio::time::timeout(timeout, changed).await;
let Ok(changed) = changed else {
return Err(EvictionError::Timeout);
};
let _: () = changed.expect("cannot be closed, because we are holding a strong reference");
let current = rx.borrow_and_update();
match &*current {
// the easiest case
Status::Evicted => Ok(()),
// it surely was evicted in between, but then there was a new access now; we can't know
// if it'll succeed so lets just call it evicted
Status::Downloading => Ok(()),
// either the download which was started after eviction completed already, or it was
// never evicted
Status::Resident => Err(EvictionError::Downloaded),
}
}
@@ -702,38 +810,38 @@ impl LayerInner {
allow_download: bool,
ctx: Option<&RequestContext>,
) -> Result<Arc<DownloadedLayer>, DownloadError> {
// get_or_init_detached can:
// - be fast (mutex lock) OR uncontested semaphore permit acquire
// - be slow (wait for semaphore permit or closing)
let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
let (weak, permit) = {
// get_or_init_detached can:
// - be fast (mutex lock) OR uncontested semaphore permit acquire
// - be slow (wait for semaphore permit or closing)
let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
let locked = self
.inner
.get_or_init_detached()
.await
.map(|mut guard| guard.get_and_upgrade().ok_or(guard));
scopeguard::ScopeGuard::into_inner(init_cancelled);
match locked {
// this path could had been a RwLock::read
Ok(Ok((strong, upgraded))) if !upgraded => return Ok(strong),
Ok(Ok((strong, _))) => {
// when upgraded back, the Arc<DownloadedLayer> is still available, but
// previously a `evict_and_wait` was received.
self.wanted_evicted.store(false, Ordering::Relaxed);
// error out any `evict_and_wait`
drop(self.status.send(Status::Downloaded));
// previously a `evict_and_wait` was received. this is the only place when we
// send out an update without holding the InitPermit.
//
// note that we also have dropped the Guard; this is fine, because we just made
// a state change and are holding a strong reference to be returned.
self.status.as_ref().unwrap().send_replace(Status::Resident);
LAYER_IMPL_METRICS
.inc_eviction_cancelled(EvictionCancelled::UpgradedBackOnAccess);
return Ok(strong);
}
Ok(Err(guard)) => {
// path to here: the evict_blocking is stuck on spawn_blocking queue.
//
// reset the contents, deactivating the eviction and causing a
// EvictionCancelled::LostToDownload or EvictionCancelled::VersionCheckFailed.
// path to here: we won the eviction, the file should still be on the disk.
let (weak, permit) = guard.take_and_deinit();
(Some(weak), permit)
}
@@ -741,8 +849,6 @@ impl LayerInner {
}
};
scopeguard::ScopeGuard::into_inner(init_cancelled);
if let Some(weak) = weak {
// only drop the weak after dropping the heavier_once_cell guard
assert!(
@@ -759,8 +865,11 @@ impl LayerInner {
// count cancellations, which currently remain largely unexpected
let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
// check if we really need to be downloaded; could have been already downloaded by a
// cancelled previous attempt.
// check if we really need to be downloaded: this can happen if a read access won the
// semaphore before eviction.
//
// if we are cancelled while doing this `stat` the `self.inner` will be uninitialized. a
// pending eviction will try to evict even upon finding an uninitialized `self.inner`.
let needs_download = self
.needs_download()
.await
@@ -771,13 +880,20 @@ impl LayerInner {
let needs_download = needs_download?;
let Some(reason) = needs_download else {
// the file is present locally, probably by a previous but cancelled call to
// get_or_maybe_download. alternatively we might be running without remote storage.
// the file is present locally because eviction has not had a chance to run yet
#[cfg(test)]
self.failpoint(failpoints::FailpointKind::AfterDeterminingLayerNeedsNoDownload)
.await?;
LAYER_IMPL_METRICS.inc_init_needed_no_download();
return Ok(self.initialize_after_layer_is_on_disk(permit));
};
// we must download; getting cancelled before spawning the download is not an issue as
// any still running eviction would not find anything to evict.
if let NeedsDownload::NotFile(ft) = reason {
return Err(DownloadError::NotFile(ft));
}
@@ -791,8 +907,7 @@ impl LayerInner {
}
if !allow_download {
// this does look weird, but for LayerInner the "downloading" means also changing
// internal once related state ...
// this is only used from tests, but it is hard to test without the boolean
return Err(DownloadError::DownloadRequired);
}
@@ -851,11 +966,22 @@ impl LayerInner {
.enter()
.map_err(|_| DownloadError::DownloadCancelled)?;
tokio::task::spawn(
Self::spawn(
async move {
let _guard = guard;
drop(this.status.send(Status::Downloaded));
// now that we have committed to downloading, send out an update to:
// - unhang any pending eviction
// - break out of evict_and_wait
this.status
.as_ref()
.unwrap()
.send_replace(Status::Downloading);
#[cfg(test)]
this.failpoint(failpoints::FailpointKind::WaitBeforeDownloading)
.await
.unwrap();
let res = this.download_and_init(timeline, permit).await;
@@ -887,6 +1013,8 @@ impl LayerInner {
Some(remote_storage::DownloadError::Cancelled) => {
Err(DownloadError::DownloadCancelled)
}
// FIXME: this is not embedding the error because historically it would had
// been output to compute, however that is no longer the case.
_ => Err(DownloadError::DownloadFailed),
}
}
@@ -985,18 +1113,9 @@ impl LayerInner {
) -> Arc<DownloadedLayer> {
debug_assert_current_span_has_tenant_and_timeline_id();
// disable any scheduled but not yet running eviction deletions for this
// disable any scheduled but not yet running eviction deletions for this initialization
let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed);
// only reset this after we've decided we really need to download. otherwise it'd
// be impossible to mark cancelled downloads for eviction, like one could imagine
// we would like to do for prefetching which was not needed.
self.wanted_evicted.store(false, Ordering::Release);
// re-send the notification we've already sent when we started to download, just so
// evict_and_wait does not need to wait for the download to complete. note that this is
// sent when initializing after finding the file on the disk.
drop(self.status.send(Status::Downloaded));
self.status.as_ref().unwrap().send_replace(Status::Resident);
let res = Arc::new(DownloadedLayer {
owner: Arc::downgrade(self),
@@ -1049,9 +1168,11 @@ impl LayerInner {
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
let layer_file_name = self.desc.filename().file_name();
// this is not accurate: we could have the file locally but there was a cancellation
// and now we are not in sync, or we are currently downloading it.
let remote = self.inner.get().is_none();
let resident = self
.inner
.get()
.map(|rowe| rowe.is_likely_resident())
.unwrap_or(false);
let access_stats = self.access_stats.as_api_model(reset);
@@ -1063,7 +1184,7 @@ impl LayerInner {
layer_file_size: self.desc.file_size,
lsn_start: lsn_range.start,
lsn_end: lsn_range.end,
remote,
remote: !resident,
access_stats,
}
} else {
@@ -1073,94 +1194,195 @@ impl LayerInner {
layer_file_name,
layer_file_size: self.desc.file_size,
lsn_start: lsn,
remote,
remote: !resident,
access_stats,
}
}
}
/// `DownloadedLayer` is being dropped, so it calls this method.
fn on_downloaded_layer_drop(self: Arc<LayerInner>, version: usize) {
let evict = self.wanted_evicted.load(Ordering::Acquire);
fn on_downloaded_layer_drop(self: Arc<LayerInner>, only_version: usize) {
let can_evict = self.have_remote_client;
if can_evict && evict {
let span = tracing::info_span!(parent: None, "layer_evict", tenant_id = %self.desc.tenant_shard_id.tenant_id, shard_id = %self.desc.tenant_shard_id.shard_slug(), timeline_id = %self.desc.timeline_id, layer=%self, %version);
// we cannot know without inspecting LayerInner::inner if we should evict or not, even
// though here it is very likely
let span = tracing::info_span!(parent: None, "layer_evict", tenant_id = %self.desc.tenant_shard_id.tenant_id, shard_id = %self.desc.tenant_shard_id.shard_slug(), timeline_id = %self.desc.timeline_id, layer=%self, version=%only_version);
// downgrade for queueing, in case there's a tear down already ongoing we should not
// hold it alive.
let this = Arc::downgrade(&self);
drop(self);
// NOTE: this scope *must* never call `self.inner.get` because evict_and_wait might
// drop while the `self.inner` is being locked, leading to a deadlock.
crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(move || {
let _g = span.entered();
// if LayerInner is already dropped here, do nothing because the delete on drop
// has already ran while we were in queue
let Some(this) = this.upgrade() else {
LAYER_IMPL_METRICS.inc_eviction_cancelled(EvictionCancelled::LayerGone);
return;
};
match this.evict_blocking(version) {
Ok(()) => LAYER_IMPL_METRICS.inc_completed_evictions(),
Err(reason) => LAYER_IMPL_METRICS.inc_eviction_cancelled(reason),
}
if !can_evict {
// it would be nice to assert this case out, but we are in drop
span.in_scope(|| {
tracing::error!("bug in struct Layer: ResidentOrWantedEvicted has been downgraded while we have no remote storage");
});
return;
}
// NOTE: this scope *must* never call `self.inner.get` because evict_and_wait might
// drop while the `self.inner` is being locked, leading to a deadlock.
let start_evicting = async move {
#[cfg(test)]
self.failpoint(failpoints::FailpointKind::WaitBeforeStartingEvicting)
.await
.expect("failpoint should not have errored");
tracing::debug!("eviction started");
let res = self.wait_for_turn_and_evict(only_version).await;
// metrics: ignore the Ok branch, it is not done yet
if let Err(e) = res {
tracing::debug!(res=?Err::<(), _>(&e), "eviction completed");
LAYER_IMPL_METRICS.inc_eviction_cancelled(e);
}
};
Self::spawn(start_evicting.instrument(span));
}
fn evict_blocking(&self, only_version: usize) -> Result<(), EvictionCancelled> {
// deleted or detached timeline, don't do anything.
let Some(timeline) = self.timeline.upgrade() else {
return Err(EvictionCancelled::TimelineGone);
};
async fn wait_for_turn_and_evict(
self: Arc<LayerInner>,
only_version: usize,
) -> Result<(), EvictionCancelled> {
fn is_good_to_continue(status: &Status) -> Result<(), EvictionCancelled> {
use Status::*;
match status {
Resident => Ok(()),
Evicted => Err(EvictionCancelled::UnexpectedEvictedState),
Downloading => Err(EvictionCancelled::LostToDownload),
}
}
let timeline = self
.timeline
.upgrade()
.ok_or(EvictionCancelled::TimelineGone)?;
let mut rx = self
.status
.as_ref()
.expect("LayerInner cannot be dropped, holding strong ref")
.subscribe();
is_good_to_continue(&rx.borrow_and_update())?;
let Ok(_gate) = timeline.gate.enter() else {
return Err(EvictionCancelled::TimelineGone);
};
// to avoid starting a new download while we evict, keep holding on to the
// permit.
let _permit = {
let maybe_downloaded = self.inner.get();
let permit = {
// we cannot just `std::fs::remove_file` because there might already be an
// get_or_maybe_download which will inspect filesystem and reinitialize. filesystem
// operations must be done while holding the heavier_once_cell::InitPermit
let mut wait = std::pin::pin!(self.inner.get_or_init_detached());
let (_weak, permit) = match maybe_downloaded {
Some(guard) => {
if let ResidentOrWantedEvicted::WantedEvicted(_weak, version) = &*guard {
if *version == only_version {
guard.take_and_deinit()
} else {
// this was not for us; maybe there's another eviction job
// TODO: does it make any sense to stall here? unique versions do not
// matter, we only want to make sure not to evict a resident, which we
// are not doing.
return Err(EvictionCancelled::VersionCheckFailed);
}
} else {
return Err(EvictionCancelled::AlreadyReinitialized);
let waited = loop {
// we must race to the Downloading starting, otherwise we would have to wait until the
// completion of the download. waiting for download could be long and hinder our
// efforts to alert on "hanging" evictions.
tokio::select! {
res = &mut wait => break res,
_ = rx.changed() => {
is_good_to_continue(&rx.borrow_and_update())?;
// two possibilities for Status::Resident:
// - the layer was found locally from disk by a read
// - we missed a bunch of updates and now the layer is
// again downloaded -- assume we'll fail later on with
// version check or AlreadyReinitialized
}
}
None => {
// already deinitialized, perhaps get_or_maybe_download did this and is
// currently waiting to reinitialize it
return Err(EvictionCancelled::LostToDownload);
};
// re-check now that we have the guard or permit; all updates should have happened
// while holding the permit.
is_good_to_continue(&rx.borrow_and_update())?;
// the term deinitialize is used here, because clearing out the Weak will eventually
// lead to deallocating the reference counted value, and the value we
// `Guard::take_and_deinit` is likely to be the last because the Weak is never cloned.
let (_weak, permit) = match waited {
Ok(guard) => {
match &*guard {
ResidentOrWantedEvicted::WantedEvicted(_weak, version)
if *version == only_version =>
{
tracing::debug!(version, "deinitializing matching WantedEvicted");
let (weak, permit) = guard.take_and_deinit();
(Some(weak), permit)
}
ResidentOrWantedEvicted::WantedEvicted(_, version) => {
// if we were not doing the version check, we would need to try to
// upgrade the weak here to see if it really is dropped. version check
// is done instead assuming that it is cheaper.
tracing::debug!(
version,
only_version,
"version mismatch, not deinitializing"
);
return Err(EvictionCancelled::VersionCheckFailed);
}
ResidentOrWantedEvicted::Resident(_) => {
return Err(EvictionCancelled::AlreadyReinitialized);
}
}
}
Err(permit) => {
tracing::debug!("continuing after cancelled get_or_maybe_download or eviction");
(None, permit)
}
};
permit
};
// now accesses to inner.get_or_init wait on the semaphore or the `_permit`
let span = tracing::Span::current();
self.access_stats.record_residence_event(
LayerResidenceStatus::Evicted,
LayerResidenceEventReason::ResidenceChange,
);
let spawned_at = std::time::Instant::now();
let res = match capture_mtime_and_remove(&self.path) {
// this is on purpose a detached spawn; we don't need to wait for it
//
// eviction completion reporting is the only thing hinging on this, and it can be just as
// well from a spawn_blocking thread.
//
// important to note that now that we've acquired the permit we have made sure the evicted
// file is either the exact `WantedEvicted` we wanted to evict, or uninitialized in case
// there are multiple evictions. The rest is not cancellable, and we've now committed to
// evicting.
//
// If spawn_blocking has a queue and maximum number of threads are in use, we could stall
// reads. We will need to add cancellation for that if necessary.
Self::spawn_blocking(move || {
let _span = span.entered();
let res = self.evict_blocking(&timeline, &permit);
let waiters = self.inner.initializer_count();
if waiters > 0 {
LAYER_IMPL_METRICS.inc_evicted_with_waiters();
}
let completed_in = spawned_at.elapsed();
LAYER_IMPL_METRICS.record_time_to_evict(completed_in);
match res {
Ok(()) => LAYER_IMPL_METRICS.inc_completed_evictions(),
Err(e) => LAYER_IMPL_METRICS.inc_eviction_cancelled(e),
}
tracing::debug!(?res, elapsed_ms=%completed_in.as_millis(), %waiters, "eviction completed");
});
Ok(())
}
/// This is blocking only to do just one spawn_blocking hop compared to multiple via tokio::fs.
fn evict_blocking(
&self,
timeline: &Timeline,
_permit: &heavier_once_cell::InitPermit,
) -> Result<(), EvictionCancelled> {
// now accesses to `self.inner.get_or_init*` wait on the semaphore or the `_permit`
match capture_mtime_and_remove(&self.path) {
Ok(local_layer_mtime) => {
let duration = SystemTime::now().duration_since(local_layer_mtime);
match duration {
@@ -1184,33 +1406,60 @@ impl LayerInner {
timeline
.metrics
.resident_physical_size_sub(self.desc.file_size);
Ok(())
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
tracing::error!(
layer_size = %self.desc.file_size,
"failed to evict layer from disk, it was already gone (metrics will be inaccurate)"
"failed to evict layer from disk, it was already gone"
);
Err(EvictionCancelled::FileNotFound)
return Err(EvictionCancelled::FileNotFound);
}
Err(e) => {
// FIXME: this should probably be an abort
tracing::error!("failed to evict file from disk: {e:#}");
Err(EvictionCancelled::RemoveFailed)
return Err(EvictionCancelled::RemoveFailed);
}
};
}
// we are still holding the permit, so no new spawn_download_and_wait can happen
drop(self.status.send(Status::Evicted));
self.access_stats.record_residence_event(
LayerResidenceStatus::Evicted,
LayerResidenceEventReason::ResidenceChange,
);
self.status.as_ref().unwrap().send_replace(Status::Evicted);
*self.last_evicted_at.lock().unwrap() = Some(std::time::Instant::now());
res
Ok(())
}
fn metadata(&self) -> LayerFileMetadata {
LayerFileMetadata::new(self.desc.file_size, self.generation, self.shard)
}
/// Needed to use entered runtime in tests, but otherwise use BACKGROUND_RUNTIME.
///
/// Synchronizing with spawned tasks is very complicated otherwise.
fn spawn<F>(fut: F)
where
F: std::future::Future<Output = ()> + Send + 'static,
{
#[cfg(test)]
tokio::task::spawn(fut);
#[cfg(not(test))]
crate::task_mgr::BACKGROUND_RUNTIME.spawn(fut);
}
/// Needed to use entered runtime in tests, but otherwise use BACKGROUND_RUNTIME.
fn spawn_blocking<F>(f: F)
where
F: FnOnce() + Send + 'static,
{
#[cfg(test)]
tokio::task::spawn_blocking(f);
#[cfg(not(test))]
crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(f);
}
}
fn capture_mtime_and_remove(path: &Utf8Path) -> Result<SystemTime, std::io::Error> {
@@ -1254,6 +1503,10 @@ pub(crate) enum DownloadError {
DownloadCancelled,
#[error("pre-condition: stat before download failed")]
PreStatFailed(#[source] std::io::Error),
#[cfg(test)]
#[error("failpoint: {0:?}")]
Failpoint(failpoints::FailpointKind),
}
#[derive(Debug, PartialEq)]
@@ -1300,6 +1553,7 @@ impl Drop for DownloadedLayer {
owner.on_downloaded_layer_drop(self.version);
} else {
// no need to do anything, we are shutting down
LAYER_IMPL_METRICS.inc_eviction_cancelled(EvictionCancelled::LayerGone);
}
}
}
@@ -1540,6 +1794,7 @@ pub(crate) struct LayerImplMetrics {
rare_counters: enum_map::EnumMap<RareEvent, IntCounter>,
inits_cancelled: metrics::core::GenericCounter<metrics::core::AtomicU64>,
redownload_after: metrics::Histogram,
time_to_evict: metrics::Histogram,
}
impl Default for LayerImplMetrics {
@@ -1635,6 +1890,13 @@ impl Default for LayerImplMetrics {
.unwrap()
};
let time_to_evict = metrics::register_histogram!(
"pageserver_layer_eviction_held_permit_seconds",
"Time eviction held the permit.",
vec![0.001, 0.010, 0.100, 0.500, 1.000, 5.000]
)
.unwrap();
Self {
started_evictions,
completed_evictions,
@@ -1647,6 +1909,7 @@ impl Default for LayerImplMetrics {
rare_counters,
inits_cancelled,
redownload_after,
time_to_evict,
}
}
}
@@ -1708,10 +1971,6 @@ impl LayerImplMetrics {
self.rare_counters[RareEvent::PermanentLoadingFailure].inc();
}
fn inc_broadcast_lagged(&self) {
self.rare_counters[RareEvent::EvictAndWaitLagged].inc();
}
fn inc_init_cancelled(&self) {
self.inits_cancelled.inc()
}
@@ -1719,9 +1978,22 @@ impl LayerImplMetrics {
fn record_redownloaded_after(&self, duration: std::time::Duration) {
self.redownload_after.observe(duration.as_secs_f64())
}
/// This would be bad if it ever happened, or mean extreme disk pressure. We should probably
/// instead cancel eviction if we would have read waiters. We cannot however separate reads
/// from other evictions, so this could have noise as well.
fn inc_evicted_with_waiters(&self) {
self.rare_counters[RareEvent::EvictedWithWaiters].inc();
}
/// Recorded at least initially as the permit is now acquired in async context before
/// spawn_blocking action.
fn record_time_to_evict(&self, duration: std::time::Duration) {
self.time_to_evict.observe(duration.as_secs_f64())
}
}
#[derive(enum_map::Enum)]
#[derive(Debug, Clone, Copy, enum_map::Enum)]
enum EvictionCancelled {
LayerGone,
TimelineGone,
@@ -1733,6 +2005,7 @@ enum EvictionCancelled {
LostToDownload,
/// After eviction, there was a new layer access which cancelled the eviction.
UpgradedBackOnAccess,
UnexpectedEvictedState,
}
impl EvictionCancelled {
@@ -1746,6 +2019,7 @@ impl EvictionCancelled {
EvictionCancelled::AlreadyReinitialized => "already_reinitialized",
EvictionCancelled::LostToDownload => "lost_to_download",
EvictionCancelled::UpgradedBackOnAccess => "upgraded_back_on_access",
EvictionCancelled::UnexpectedEvictedState => "unexpected_evicted_state",
}
}
}
@@ -1773,7 +2047,7 @@ enum RareEvent {
UpgradedWantedEvicted,
InitWithoutDownload,
PermanentLoadingFailure,
EvictAndWaitLagged,
EvictedWithWaiters,
}
impl RareEvent {
@@ -1787,7 +2061,7 @@ impl RareEvent {
UpgradedWantedEvicted => "raced_wanted_evicted",
InitWithoutDownload => "init_needed_no_download",
PermanentLoadingFailure => "permanent_loading_failure",
EvictAndWaitLagged => "broadcast_lagged",
EvictedWithWaiters => "evicted_with_waiters",
}
}
}

View File

@@ -0,0 +1,119 @@
//! failpoints for unit tests, implying `#[cfg(test)]`.
//!
//! These are not accessible over http.
use super::*;
impl Layer {
/// Enable a failpoint from a unit test.
pub(super) fn enable_failpoint(&self, failpoint: Failpoint) {
self.0.failpoints.lock().unwrap().push(failpoint);
}
}
impl LayerInner {
/// Query if this failpoint is enabled, as in, arrive at a failpoint.
///
/// Calls to this method need to be `#[cfg(test)]` guarded.
pub(super) async fn failpoint(&self, kind: FailpointKind) -> Result<(), FailpointHit> {
let fut = {
let mut fps = self.failpoints.lock().unwrap();
// find the *last* failpoint for cases in which we need to use multiple for the same
// thing (two blocked evictions)
let fp = fps.iter_mut().rfind(|x| x.kind() == kind);
let Some(fp) = fp else {
return Ok(());
};
fp.hit()
};
fut.await
}
}
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum FailpointKind {
/// Failpoint acts as an accurate cancelled by drop here; see the only site of use.
AfterDeterminingLayerNeedsNoDownload,
/// Failpoint for stalling eviction starting
WaitBeforeStartingEvicting,
/// Failpoint hit in the spawned task
WaitBeforeDownloading,
}
pub(crate) enum Failpoint {
AfterDeterminingLayerNeedsNoDownload,
WaitBeforeStartingEvicting(
Option<utils::completion::Completion>,
utils::completion::Barrier,
),
WaitBeforeDownloading(
Option<utils::completion::Completion>,
utils::completion::Barrier,
),
}
impl Failpoint {
fn kind(&self) -> FailpointKind {
match self {
Failpoint::AfterDeterminingLayerNeedsNoDownload => {
FailpointKind::AfterDeterminingLayerNeedsNoDownload
}
Failpoint::WaitBeforeStartingEvicting(..) => FailpointKind::WaitBeforeStartingEvicting,
Failpoint::WaitBeforeDownloading(..) => FailpointKind::WaitBeforeDownloading,
}
}
fn hit(&mut self) -> impl std::future::Future<Output = Result<(), FailpointHit>> + 'static {
use futures::future::FutureExt;
// use boxed futures to avoid Either hurdles
match self {
Failpoint::AfterDeterminingLayerNeedsNoDownload => {
let kind = self.kind();
async move { Err(FailpointHit(kind)) }.boxed()
}
Failpoint::WaitBeforeStartingEvicting(arrival, b)
| Failpoint::WaitBeforeDownloading(arrival, b) => {
// first one signals arrival
drop(arrival.take());
let b = b.clone();
async move {
tracing::trace!("waiting on a failpoint barrier");
b.wait().await;
tracing::trace!("done waiting on a failpoint barrier");
Ok(())
}
.boxed()
}
}
}
}
impl std::fmt::Display for FailpointKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Debug::fmt(self, f)
}
}
#[derive(Debug)]
pub(crate) struct FailpointHit(FailpointKind);
impl std::fmt::Display for FailpointHit {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Debug::fmt(self, f)
}
}
impl std::error::Error for FailpointHit {}
impl From<FailpointHit> for DownloadError {
fn from(value: FailpointHit) -> Self {
DownloadError::Failpoint(value.0)
}
}

View File

@@ -1,14 +1,13 @@
use futures::StreamExt;
use pageserver_api::key::CONTROLFILE_KEY;
use tokio::task::JoinSet;
use tracing::Instrument;
use utils::{
completion::{self, Completion},
id::TimelineId,
};
use super::failpoints::{Failpoint, FailpointKind};
use super::*;
use crate::{context::DownloadBehavior, task_mgr::BACKGROUND_RUNTIME};
use crate::context::DownloadBehavior;
use crate::{task_mgr::TaskKind, tenant::harness::TenantHarness};
/// Used in tests to advance a future to wanted await point, and not futher.
@@ -21,7 +20,7 @@ const FOREVER: std::time::Duration = std::time::Duration::from_secs(ADVANCE.as_s
/// Demonstrate the API and resident -> evicted -> resident -> deleted transitions.
#[tokio::test]
async fn smoke_test() {
let handle = BACKGROUND_RUNTIME.handle();
let handle = tokio::runtime::Handle::current();
let h = TenantHarness::create("smoke_test").unwrap();
let span = h.span();
@@ -38,7 +37,7 @@ async fn smoke_test() {
let layer = {
let mut layers = {
let layers = timeline.layers.read().await;
layers.resident_layers().collect::<Vec<_>>().await
layers.likely_resident_layers().collect::<Vec<_>>()
};
assert_eq!(layers.len(), 1);
@@ -88,7 +87,7 @@ async fn smoke_test() {
//
// ZERO for timeout does not work reliably, so first take up all spawn_blocking slots to
// artificially slow it down.
let helper = SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads(handle).await;
let helper = SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads(&handle).await;
match layer
.evict_and_wait(std::time::Duration::ZERO)
@@ -99,7 +98,7 @@ async fn smoke_test() {
// expected, but note that the eviction is "still ongoing"
helper.release().await;
// exhaust spawn_blocking pool to ensure it is now complete
SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(handle)
SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle)
.await;
}
other => unreachable!("{other:?}"),
@@ -108,7 +107,7 @@ async fn smoke_test() {
// only way to query if a layer is resident is to acquire a ResidentLayer instance.
// Layer::keep_resident never downloads, but it might initialize if the layer file is found
// downloaded locally.
let none = layer.keep_resident().await.unwrap();
let none = layer.keep_resident().await;
assert!(
none.is_none(),
"Expected none, because eviction removed the local file, found: {none:?}"
@@ -167,6 +166,7 @@ async fn smoke_test() {
rtc.wait_completion().await.unwrap();
assert_eq!(rtc.get_remote_physical_size(), 0);
assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
}
/// This test demonstrates a previous hang when a eviction and deletion were requested at the same
@@ -174,7 +174,7 @@ async fn smoke_test() {
#[tokio::test(start_paused = true)]
async fn evict_and_wait_on_wanted_deleted() {
// this is the runtime on which Layer spawns the blocking tasks on
let handle = BACKGROUND_RUNTIME.handle();
let handle = tokio::runtime::Handle::current();
let h = TenantHarness::create("evict_and_wait_on_wanted_deleted").unwrap();
utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
@@ -188,7 +188,7 @@ async fn evict_and_wait_on_wanted_deleted() {
let layer = {
let mut layers = {
let layers = timeline.layers.read().await;
layers.resident_layers().collect::<Vec<_>>().await
layers.likely_resident_layers().collect::<Vec<_>>()
};
assert_eq!(layers.len(), 1);
@@ -213,11 +213,11 @@ async fn evict_and_wait_on_wanted_deleted() {
drop(resident);
// make sure the eviction task gets to run
SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(handle).await;
SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
let resident = layer.keep_resident().await;
assert!(
matches!(resident, Ok(None)),
resident.is_none(),
"keep_resident should not have re-initialized: {resident:?}"
);
@@ -235,24 +235,408 @@ async fn evict_and_wait_on_wanted_deleted() {
layers.finish_gc_timeline(&[layer]);
}
SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(handle).await;
SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
assert_eq!(1, LAYER_IMPL_METRICS.started_deletes.get());
assert_eq!(1, LAYER_IMPL_METRICS.completed_deletes.get());
assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
}
/// This test shows that ensures we are able to read the layer while the layer eviction has been
/// started but not completed due to spawn_blocking pool being blocked.
///
/// Here `Layer::keep_resident` is used to "simulate" reads, because it cannot download.
#[tokio::test(start_paused = true)]
async fn residency_check_while_evict_and_wait_on_clogged_spawn_blocking() {
// this is the runtime on which Layer spawns the blocking tasks on
let handle = BACKGROUND_RUNTIME.handle();
let h = TenantHarness::create("residency_check_while_evict_and_wait_on_clogged_spawn_blocking")
/// This test ensures we are able to read the layer while the layer eviction has been
/// started but not completed.
#[test]
fn read_wins_pending_eviction() {
let rt = tokio::runtime::Builder::new_current_thread()
.max_blocking_threads(1)
.enable_all()
.start_paused(true)
.build()
.unwrap();
rt.block_on(async move {
// this is the runtime on which Layer spawns the blocking tasks on
let handle = tokio::runtime::Handle::current();
let h = TenantHarness::create("read_wins_pending_eviction").unwrap();
let (tenant, ctx) = h.load().await;
let span = h.span();
let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
let timeline = tenant
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
.await
.unwrap();
let layer = {
let mut layers = {
let layers = timeline.layers.read().await;
layers.likely_resident_layers().collect::<Vec<_>>()
};
assert_eq!(layers.len(), 1);
layers.swap_remove(0)
};
// setup done
let resident = layer.keep_resident().await.unwrap();
let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
// drive the future to await on the status channel
tokio::time::timeout(ADVANCE, &mut evict_and_wait)
.await
.expect_err("should had been a timeout since we are holding the layer resident");
assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
let (completion, barrier) = utils::completion::channel();
let (arrival, arrived_at_barrier) = utils::completion::channel();
layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
Some(arrival),
barrier,
));
// now the eviction cannot proceed because the threads are consumed while completion exists
drop(resident);
arrived_at_barrier.wait().await;
assert!(!layer.is_likely_resident());
// because no actual eviction happened, we get to just reinitialize the DownloadedLayer
layer
.0
.get_or_maybe_download(false, None)
.instrument(download_span)
.await
.expect("should had reinitialized without downloading");
assert!(layer.is_likely_resident());
// reinitialization notifies of new resident status, which should error out all evict_and_wait
let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
.await
.expect("no timeout, because get_or_maybe_download re-initialized")
.expect_err("eviction should not have succeeded because re-initialized");
// works as intended: evictions lose to "downloads"
assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
// this is not wrong: the eviction is technically still "on the way" as it's still queued
// because of a failpoint
assert_eq!(
0,
LAYER_IMPL_METRICS
.cancelled_evictions
.values()
.map(|ctr| ctr.get())
.sum::<u64>()
);
drop(completion);
tokio::time::sleep(ADVANCE).await;
SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(&handle, 1)
.await;
assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
// now we finally can observe the original eviction failing
// it would had been possible to observe it earlier, but here it is guaranteed to have
// happened.
assert_eq!(
1,
LAYER_IMPL_METRICS
.cancelled_evictions
.values()
.map(|ctr| ctr.get())
.sum::<u64>()
);
assert_eq!(
1,
LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::AlreadyReinitialized].get()
);
assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
});
}
/// Use failpoint to delay an eviction starting to get a VersionCheckFailed.
#[test]
fn multiple_pending_evictions_in_order() {
let name = "multiple_pending_evictions_in_order";
let in_order = true;
multiple_pending_evictions_scenario(name, in_order);
}
/// Use failpoint to reorder later eviction before first to get a UnexpectedEvictedState.
#[test]
fn multiple_pending_evictions_out_of_order() {
let name = "multiple_pending_evictions_out_of_order";
let in_order = false;
multiple_pending_evictions_scenario(name, in_order);
}
fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
let rt = tokio::runtime::Builder::new_current_thread()
.max_blocking_threads(1)
.enable_all()
.start_paused(true)
.build()
.unwrap();
rt.block_on(async move {
// this is the runtime on which Layer spawns the blocking tasks on
let handle = tokio::runtime::Handle::current();
let h = TenantHarness::create(name).unwrap();
let (tenant, ctx) = h.load().await;
let span = h.span();
let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
let timeline = tenant
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
.await
.unwrap();
let layer = {
let mut layers = {
let layers = timeline.layers.read().await;
layers.likely_resident_layers().collect::<Vec<_>>()
};
assert_eq!(layers.len(), 1);
layers.swap_remove(0)
};
// setup done
let resident = layer.keep_resident().await.unwrap();
let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
// drive the future to await on the status channel
tokio::time::timeout(ADVANCE, &mut evict_and_wait)
.await
.expect_err("should had been a timeout since we are holding the layer resident");
assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
let (completion1, barrier) = utils::completion::channel();
let mut completion1 = Some(completion1);
let (arrival, arrived_at_barrier) = utils::completion::channel();
layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
Some(arrival),
barrier,
));
// now the eviction cannot proceed because we are simulating arbitrary long delay for the
// eviction task start.
drop(resident);
assert!(!layer.is_likely_resident());
arrived_at_barrier.wait().await;
// because no actual eviction happened, we get to just reinitialize the DownloadedLayer
layer
.0
.get_or_maybe_download(false, None)
.instrument(download_span)
.await
.expect("should had reinitialized without downloading");
assert!(layer.is_likely_resident());
// reinitialization notifies of new resident status, which should error out all evict_and_wait
let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
.await
.expect("no timeout, because get_or_maybe_download re-initialized")
.expect_err("eviction should not have succeeded because re-initialized");
// works as intended: evictions lose to "downloads"
assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
// this is not wrong: the eviction is technically still "on the way" as it's still queued
// because of a failpoint
assert_eq!(
0,
LAYER_IMPL_METRICS
.cancelled_evictions
.values()
.map(|ctr| ctr.get())
.sum::<u64>()
);
assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
// configure another failpoint for the second eviction -- evictions are per initialization,
// so now that we've reinitialized the inner, we get to run two of them at the same time.
let (completion2, barrier) = utils::completion::channel();
let (arrival, arrived_at_barrier) = utils::completion::channel();
layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
Some(arrival),
barrier,
));
let mut second_eviction = std::pin::pin!(layer.evict_and_wait(FOREVER));
// advance to the wait on the queue
tokio::time::timeout(ADVANCE, &mut second_eviction)
.await
.expect_err("timeout because failpoint is blocking");
arrived_at_barrier.wait().await;
assert_eq!(2, LAYER_IMPL_METRICS.started_evictions.get());
let mut release_earlier_eviction = |expected_reason| {
assert_eq!(
0,
LAYER_IMPL_METRICS.cancelled_evictions[expected_reason].get(),
);
drop(completion1.take().unwrap());
let handle = &handle;
async move {
tokio::time::sleep(ADVANCE).await;
SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(
handle, 1,
)
.await;
assert_eq!(
1,
LAYER_IMPL_METRICS.cancelled_evictions[expected_reason].get(),
);
}
};
if in_order {
release_earlier_eviction(EvictionCancelled::VersionCheckFailed).await;
}
// release the later eviction which is for the current version
drop(completion2);
tokio::time::sleep(ADVANCE).await;
SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(&handle, 1)
.await;
if !in_order {
release_earlier_eviction(EvictionCancelled::UnexpectedEvictedState).await;
}
tokio::time::timeout(ADVANCE, &mut second_eviction)
.await
.expect("eviction goes through now that spawn_blocking is unclogged")
.expect("eviction should succeed, because version matches");
assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
// ensure the cancelled are unchanged
assert_eq!(
1,
LAYER_IMPL_METRICS
.cancelled_evictions
.values()
.map(|ctr| ctr.get())
.sum::<u64>()
);
assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
});
}
/// The test ensures with a failpoint that a pending eviction is not cancelled by what is currently
/// a `Layer::keep_resident` call.
///
/// This matters because cancelling the eviction would leave us in a state where the file is on
/// disk but the layer internal state says it has not been initialized. Futhermore, it allows us to
/// have non-repairing `Layer::is_likely_resident`.
#[tokio::test(start_paused = true)]
async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
let handle = tokio::runtime::Handle::current();
let h =
TenantHarness::create("cancelled_get_or_maybe_download_does_not_cancel_eviction").unwrap();
let (tenant, ctx) = h.load().await;
let timeline = tenant
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
.await
.unwrap();
let layer = {
let mut layers = {
let layers = timeline.layers.read().await;
layers.likely_resident_layers().collect::<Vec<_>>()
};
assert_eq!(layers.len(), 1);
layers.swap_remove(0)
};
// this failpoint will simulate the `get_or_maybe_download` becoming cancelled (by returning an
// Err) at the right time as in "during" the `LayerInner::needs_download`.
layer.enable_failpoint(Failpoint::AfterDeterminingLayerNeedsNoDownload);
let (completion, barrier) = utils::completion::channel();
let (arrival, arrived_at_barrier) = utils::completion::channel();
layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
Some(arrival),
barrier,
));
tokio::time::timeout(ADVANCE, layer.evict_and_wait(FOREVER))
.await
.expect_err("should had advanced to waiting on channel");
arrived_at_barrier.wait().await;
// simulate a cancelled read which is cancelled before it gets to re-initialize
let e = layer
.0
.get_or_maybe_download(false, None)
.await
.unwrap_err();
assert!(
matches!(
e,
DownloadError::Failpoint(FailpointKind::AfterDeterminingLayerNeedsNoDownload)
),
"{e:?}"
);
assert!(
layer.0.needs_download().await.unwrap().is_none(),
"file is still on disk"
);
// release the eviction task
drop(completion);
tokio::time::sleep(ADVANCE).await;
SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
// failpoint is still enabled, but it is not hit
let e = layer
.0
.get_or_maybe_download(false, None)
.await
.unwrap_err();
assert!(matches!(e, DownloadError::DownloadRequired), "{e:?}");
// failpoint is not counted as cancellation either
assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
}
#[tokio::test(start_paused = true)]
async fn evict_and_wait_does_not_wait_for_download() {
// let handle = tokio::runtime::Handle::current();
let h = TenantHarness::create("evict_and_wait_does_not_wait_for_download").unwrap();
let (tenant, ctx) = h.load().await;
let span = h.span();
let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
@@ -265,7 +649,7 @@ async fn residency_check_while_evict_and_wait_on_clogged_spawn_blocking() {
let layer = {
let mut layers = {
let layers = timeline.layers.read().await;
layers.resident_layers().collect::<Vec<_>>().await
layers.likely_resident_layers().collect::<Vec<_>>()
};
assert_eq!(layers.len(), 1);
@@ -273,91 +657,76 @@ async fn residency_check_while_evict_and_wait_on_clogged_spawn_blocking() {
layers.swap_remove(0)
};
// setup done
let resident = layer.keep_resident().await.unwrap();
// kind of forced setup: start an eviction but do not allow it progress until we are
// downloading
let (eviction_can_continue, barrier) = utils::completion::channel();
let (arrival, eviction_arrived) = utils::completion::channel();
layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
Some(arrival),
barrier,
));
let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
// drive the future to await on the status channel
// use this once-awaited other_evict to synchronize with the eviction
let other_evict = layer.evict_and_wait(FOREVER);
tokio::time::timeout(ADVANCE, &mut evict_and_wait)
.await
.expect_err("should had been a timeout since we are holding the layer resident");
assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
.expect_err("should had advanced");
eviction_arrived.wait().await;
drop(eviction_can_continue);
other_evict.await.unwrap();
// clog up BACKGROUND_RUNTIME spawn_blocking
let helper = SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads(handle).await;
// now the layer is evicted, and the "evict_and_wait" is waiting on the receiver
assert!(!layer.is_likely_resident());
// now the eviction cannot proceed because the threads are consumed while completion exists
drop(resident);
// following new evict_and_wait will fail until we've completed the download
let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
assert!(matches!(e, EvictionError::NotFound), "{e:?}");
// because no actual eviction happened, we get to just reinitialize the DownloadedLayer
layer
.keep_resident()
.instrument(download_span)
.await
.expect("keep_resident should had reinitialized without downloading")
.expect("ResidentLayer");
let (download_can_continue, barrier) = utils::completion::channel();
let (arrival, _download_arrived) = utils::completion::channel();
layer.enable_failpoint(Failpoint::WaitBeforeDownloading(Some(arrival), barrier));
// because the keep_resident check alters wanted evicted without sending a message, we will
// never get completed
let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
.await
.expect("no timeout, because keep_resident re-initialized")
.expect_err("eviction should not have succeeded because re-initialized");
let mut download = std::pin::pin!(layer
.0
.get_or_maybe_download(true, None)
.instrument(download_span));
// works as intended: evictions lose to "downloads"
assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
// this is not wrong: the eviction is technically still "on the way" as it's still queued
// because spawn_blocking is clogged up
assert_eq!(
0,
LAYER_IMPL_METRICS
.cancelled_evictions
.values()
.map(|ctr| ctr.get())
.sum::<u64>()
assert!(
!layer.is_likely_resident(),
"during download layer is evicted"
);
let mut second_eviction = std::pin::pin!(layer.evict_and_wait(FOREVER));
// advance to the wait on the queue
tokio::time::timeout(ADVANCE, &mut second_eviction)
tokio::time::timeout(ADVANCE, &mut download)
.await
.expect_err("timeout because spawn_blocking is clogged");
.expect_err("should had timed out because of failpoint");
// in this case we don't leak started evictions, but I think there is still a chance of that
// happening, because we could have upgrades race multiple evictions while only one of them
// happens?
assert_eq!(2, LAYER_IMPL_METRICS.started_evictions.get());
// now we finally get to continue, and because the latest state is downloading, we deduce that
// original eviction succeeded
evict_and_wait.await.unwrap();
helper.release().await;
// however a new evict_and_wait will fail
let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
assert!(matches!(e, EvictionError::NotFound), "{e:?}");
// the second_eviction gets to run here
//
// synchronize to be *strictly* after the second_eviction spawn_blocking run
SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(handle).await;
assert!(!layer.is_likely_resident());
tokio::time::timeout(ADVANCE, &mut second_eviction)
.await
.expect("eviction goes through now that spawn_blocking is unclogged")
.expect("eviction should succeed, because version matches");
drop(download_can_continue);
download.await.expect("download should had succeeded");
assert!(layer.is_likely_resident());
assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
// only now can we evict
layer.evict_and_wait(FOREVER).await.unwrap();
}
// now we finally can observe the original spawn_blocking failing
// it would had been possible to observe it earlier, but here it is guaranteed to have
// happened.
assert_eq!(
1,
LAYER_IMPL_METRICS
.cancelled_evictions
.values()
.map(|ctr| ctr.get())
.sum::<u64>()
);
#[test]
fn layer_size() {
assert_eq!(std::mem::size_of::<LayerAccessStats>(), 2040);
assert_eq!(std::mem::size_of::<PersistentLayerDesc>(), 104);
assert_eq!(std::mem::size_of::<LayerInner>(), 2328);
// it also has the utf8 path
}
struct SpawnBlockingPoolHelper {
@@ -374,31 +743,41 @@ impl SpawnBlockingPoolHelper {
///
/// This should be no issue nowdays, because nextest runs each test in it's own process.
async fn consume_all_spawn_blocking_threads(handle: &tokio::runtime::Handle) -> Self {
let (completion, barrier) = completion::channel();
let (tx, mut rx) = tokio::sync::mpsc::channel(8);
let default_max_blocking_threads = 512;
let assumed_max_blocking_threads = 512;
Self::consume_all_spawn_blocking_threads0(handle, default_max_blocking_threads).await
}
async fn consume_all_spawn_blocking_threads0(
handle: &tokio::runtime::Handle,
threads: usize,
) -> Self {
assert_ne!(threads, 0);
let (completion, barrier) = completion::channel();
let (started, starts_completed) = completion::channel();
let mut blocking_tasks = JoinSet::new();
for _ in 0..assumed_max_blocking_threads {
for _ in 0..threads {
let barrier = barrier.clone();
let tx = tx.clone();
let started = started.clone();
blocking_tasks.spawn_blocking_on(
move || {
tx.blocking_send(()).unwrap();
drop(tx);
drop(started);
tokio::runtime::Handle::current().block_on(barrier.wait());
},
handle,
);
}
drop(started);
starts_completed.wait().await;
drop(barrier);
for _ in 0..assumed_max_blocking_threads {
rx.recv().await.unwrap();
}
tracing::trace!("consumed all threads");
SpawnBlockingPoolHelper {
awaited_by_spawn_blocking_tasks: completion,
@@ -418,13 +797,22 @@ impl SpawnBlockingPoolHelper {
while let Some(res) = blocking_tasks.join_next().await {
res.expect("none of the tasks should had panicked");
}
tracing::trace!("released all threads");
}
/// In the tests it is used as an easy way of making sure something scheduled on the target
/// runtimes `spawn_blocking` has completed, because it must've been scheduled and completed
/// before our tasks have a chance to schedule and complete.
async fn consume_and_release_all_of_spawn_blocking_threads(handle: &tokio::runtime::Handle) {
Self::consume_all_spawn_blocking_threads(handle)
Self::consume_and_release_all_of_spawn_blocking_threads0(handle, 512).await
}
async fn consume_and_release_all_of_spawn_blocking_threads0(
handle: &tokio::runtime::Handle,
threads: usize,
) {
Self::consume_all_spawn_blocking_threads0(handle, threads)
.await
.release()
.await
@@ -438,7 +826,7 @@ fn spawn_blocking_pool_helper_actually_works() {
// because the amount is not configurable for our helper, expect the same amount as
// BACKGROUND_RUNTIME using the tokio defaults would have.
let rt = tokio::runtime::Builder::new_current_thread()
.max_blocking_threads(512)
.max_blocking_threads(1)
.enable_all()
.build()
.unwrap();
@@ -448,7 +836,8 @@ fn spawn_blocking_pool_helper_actually_works() {
rt.block_on(async move {
// this will not return until all threads are spun up and actually executing the code
// waiting on `consumed` to be `SpawnBlockingPoolHelper::release`'d.
let consumed = SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads(handle).await;
let consumed =
SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads0(handle, 1).await;
println!("consumed");

View File

@@ -13,7 +13,6 @@ use bytes::Bytes;
use camino::Utf8Path;
use enumset::EnumSet;
use fail::fail_point;
use futures::stream::StreamExt;
use once_cell::sync::Lazy;
use pageserver_api::{
key::AUX_FILES_KEY,
@@ -2442,7 +2441,7 @@ impl Timeline {
let guard = self.layers.read().await;
let resident = guard.resident_layers().map(|layer| {
let resident = guard.likely_resident_layers().map(|layer| {
let last_activity_ts = layer.access_stats().latest_activity_or_now();
HeatMapLayer::new(
@@ -2452,7 +2451,7 @@ impl Timeline {
)
});
let layers = resident.collect().await;
let layers = resident.collect();
Some(HeatMapTimeline::new(self.timeline_id, layers))
}
@@ -4302,7 +4301,7 @@ impl Timeline {
let mut max_layer_size: Option<u64> = None;
let resident_layers = guard
.resident_layers()
.likely_resident_layers()
.map(|layer| {
let file_size = layer.layer_desc().file_size;
max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
@@ -4315,8 +4314,7 @@ impl Timeline {
relative_last_activity: finite_f32::FiniteF32::ZERO,
}
})
.collect()
.await;
.collect();
DiskUsageEvictionInfo {
max_layer_size,
@@ -4713,7 +4711,6 @@ mod tests {
.keep_resident()
.await
.expect("no download => no downloading errors")
.expect("should had been resident")
.drop_eviction_guard();
let forever = std::time::Duration::from_secs(120);
@@ -4724,7 +4721,7 @@ mod tests {
let (first, second) = tokio::join!(first, second);
let res = layer.keep_resident().await;
assert!(matches!(res, Ok(None)), "{res:?}");
assert!(res.is_none(), "{res:?}");
match (first, second) {
(Ok(()), Ok(())) => {

View File

@@ -225,24 +225,18 @@ impl Timeline {
{
let guard = self.layers.read().await;
let layers = guard.layer_map();
for hist_layer in layers.iter_historic_layers() {
let hist_layer = guard.get_from_desc(&hist_layer);
for layer in layers.iter_historic_layers() {
let layer = guard.get_from_desc(&layer);
// guard against eviction while we inspect it; it might be that eviction_task and
// disk_usage_eviction_task both select the same layers to be evicted, and
// seemingly free up double the space. both succeeding is of no consequence.
let guard = match hist_layer.keep_resident().await {
Ok(Some(l)) => l,
Ok(None) => continue,
Err(e) => {
// these should not happen, but we cannot make them statically impossible right
// now.
tracing::warn!(layer=%hist_layer, "failed to keep the layer resident: {e:#}");
continue;
}
};
let last_activity_ts = hist_layer.access_stats().latest_activity_or_now();
if !layer.is_likely_resident() {
continue;
}
let last_activity_ts = layer.access_stats().latest_activity_or_now();
let no_activity_for = match now.duration_since(last_activity_ts) {
Ok(d) => d,
@@ -265,9 +259,8 @@ impl Timeline {
continue;
}
};
let layer = guard.drop_eviction_guard();
if no_activity_for > p.threshold {
// this could cause a lot of allocations in some cases
js.spawn(async move {
layer
.evict_and_wait(std::time::Duration::from_secs(5))

View File

@@ -1,5 +1,4 @@
use anyhow::{bail, ensure, Context, Result};
use futures::StreamExt;
use pageserver_api::shard::TenantShardId;
use std::{collections::HashMap, sync::Arc};
use tracing::trace;
@@ -241,29 +240,16 @@ impl LayerManager {
layer.delete_on_drop();
}
pub(crate) fn resident_layers(&self) -> impl futures::stream::Stream<Item = Layer> + '_ {
pub(crate) fn likely_resident_layers(&self) -> impl Iterator<Item = Layer> + '_ {
// for small layer maps, we most likely have all resident, but for larger more are likely
// to be evicted assuming lots of layers correlated with longer lifespan.
let layers = self
.layer_map()
.iter_historic_layers()
.map(|desc| self.get_from_desc(&desc));
let layers = futures::stream::iter(layers);
layers.filter_map(|layer| async move {
// TODO(#6028): this query does not really need to see the ResidentLayer
match layer.keep_resident().await {
Ok(Some(layer)) => Some(layer.drop_eviction_guard()),
Ok(None) => None,
Err(e) => {
// these should not happen, but we cannot make them statically impossible right
// now.
tracing::warn!(%layer, "failed to keep the layer resident: {e:#}");
None
}
}
self.layer_map().iter_historic_layers().filter_map(|desc| {
self.layer_fmgr
.0
.get(&desc.key())
.filter(|l| l.is_likely_resident())
.cloned()
})
}