mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 09:52:54 +00:00
dube: timeout individual layer evictions, log progress and record metrics (#6131)
Because of bugs, evictions could hang and pause the disk usage eviction task. One such bug is known and was fixed in #6928. Guard each layer eviction with a modest timeout, conservatively treating timed-out evictions as failures.

In addition, add logging and metrics recording on each eviction iteration:
- log collection completion with its duration and the number of layers
- observe per-tenant collection time in a new histogram
- observe per-tenant layer count in a new histogram
- record metrics for collected, selected and evicted layer counts
- log if eviction takes more than 10s
- log eviction completion with the eviction duration

Additionally, remove dead code for which no dead-code warnings appeared in the earlier PR.

Follow-up to: #6060.
This commit is contained in:
@@ -58,6 +58,7 @@ use utils::{completion, id::TimelineId};
|
||||
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
metrics::disk_usage_based_eviction::METRICS,
|
||||
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
|
||||
tenant::{
|
||||
self,
|
||||
@@ -65,7 +66,6 @@ use crate::{
|
||||
remote_timeline_client::LayerFileMetadata,
|
||||
secondary::SecondaryTenant,
|
||||
storage_layer::{AsLayerDesc, EvictionError, Layer, LayerFileName},
|
||||
Timeline,
|
||||
},
|
||||
};
|
||||
|
||||
@@ -409,13 +409,23 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
"running disk usage based eviction due to pressure"
|
||||
);
|
||||
|
||||
let candidates =
|
||||
let (candidates, collection_time) = {
|
||||
let started_at = std::time::Instant::now();
|
||||
match collect_eviction_candidates(tenant_manager, eviction_order, cancel).await? {
|
||||
EvictionCandidates::Cancelled => {
|
||||
return Ok(IterationOutcome::Cancelled);
|
||||
}
|
||||
EvictionCandidates::Finished(partitioned) => partitioned,
|
||||
};
|
||||
EvictionCandidates::Finished(partitioned) => (partitioned, started_at.elapsed()),
|
||||
}
|
||||
};
|
||||
|
||||
METRICS.layers_collected.inc_by(candidates.len() as u64);
|
||||
|
||||
tracing::info!(
|
||||
elapsed_ms = collection_time.as_millis(),
|
||||
total_layers = candidates.len(),
|
||||
"collection completed"
|
||||
);
|
||||
|
||||
// Debug-log the list of candidates
|
||||
let now = SystemTime::now();
|
||||
@@ -446,9 +456,10 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
// the tenant's min-resident-size threshold, print a warning, and memorize the disk
|
||||
// usage at that point, in 'usage_planned_min_resident_size_respecting'.
|
||||
|
||||
let selection = select_victims(&candidates, usage_pre);
|
||||
let (evicted_amount, usage_planned) =
|
||||
select_victims(&candidates, usage_pre).into_amount_and_planned();
|
||||
|
||||
let (evicted_amount, usage_planned) = selection.into_amount_and_planned();
|
||||
METRICS.layers_selected.inc_by(evicted_amount as u64);
|
||||
|
||||
// phase2: evict layers
|
||||
|
||||
@@ -477,9 +488,15 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
if let Some(next) = next {
|
||||
match next {
|
||||
Ok(Ok(file_size)) => {
|
||||
METRICS.layers_evicted.inc();
|
||||
usage_assumed.add_available_bytes(file_size);
|
||||
}
|
||||
Ok(Err((file_size, EvictionError::NotFound | EvictionError::Downloaded))) => {
|
||||
Ok(Err((
|
||||
file_size,
|
||||
EvictionError::NotFound
|
||||
| EvictionError::Downloaded
|
||||
| EvictionError::Timeout,
|
||||
))) => {
|
||||
evictions_failed.file_sizes += file_size;
|
||||
evictions_failed.count += 1;
|
||||
}
|
||||
@@ -495,7 +512,10 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
|
||||
// calling again when consumed_all is fine as evicted is fused.
|
||||
let Some((_partition, candidate)) = evicted.next() else {
|
||||
consumed_all = true;
|
||||
if !consumed_all {
|
||||
tracing::info!("all evictions started, waiting");
|
||||
consumed_all = true;
|
||||
}
|
||||
continue;
|
||||
};
|
||||
|
||||
@@ -503,11 +523,15 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
EvictionLayer::Attached(layer) => {
|
||||
let file_size = layer.layer_desc().file_size;
|
||||
js.spawn(async move {
|
||||
layer
|
||||
.evict_and_wait()
|
||||
.await
|
||||
.map(|()| file_size)
|
||||
.map_err(|e| (file_size, e))
|
||||
// have a low eviction waiting timeout because our LRU calculations go stale fast;
|
||||
// also individual layer evictions could hang because of bugs and we do not want to
|
||||
// pause disk_usage_based_eviction for such.
|
||||
let timeout = std::time::Duration::from_secs(5);
|
||||
|
||||
match layer.evict_and_wait(timeout).await {
|
||||
Ok(()) => Ok(file_size),
|
||||
Err(e) => Err((file_size, e)),
|
||||
}
|
||||
});
|
||||
}
|
||||
EvictionLayer::Secondary(layer) => {
|
||||
@@ -529,6 +553,30 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
(usage_assumed, evictions_failed)
|
||||
};
|
||||
|
||||
let started_at = std::time::Instant::now();
|
||||
|
||||
let evict_layers = async move {
|
||||
let mut evict_layers = std::pin::pin!(evict_layers);
|
||||
|
||||
let maximum_expected = std::time::Duration::from_secs(10);
|
||||
|
||||
let res = tokio::time::timeout(maximum_expected, &mut evict_layers).await;
|
||||
let tuple = if let Ok(tuple) = res {
|
||||
tuple
|
||||
} else {
|
||||
let elapsed = started_at.elapsed();
|
||||
tracing::info!(elapsed_ms = elapsed.as_millis(), "still ongoing");
|
||||
evict_layers.await
|
||||
};
|
||||
|
||||
let elapsed = started_at.elapsed();
|
||||
tracing::info!(elapsed_ms = elapsed.as_millis(), "completed");
|
||||
tuple
|
||||
};
|
||||
|
||||
let evict_layers =
|
||||
evict_layers.instrument(tracing::info_span!("evict_layers", layers=%evicted_amount));
|
||||
|
||||
let (usage_assumed, evictions_failed) = tokio::select! {
|
||||
tuple = evict_layers => { tuple },
|
||||
_ = cancel.cancelled() => {
|
||||
@@ -763,6 +811,8 @@ async fn collect_eviction_candidates(
|
||||
eviction_order: EvictionOrder,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<EvictionCandidates> {
|
||||
const LOG_DURATION_THRESHOLD: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
|
||||
// get a snapshot of the list of tenants
|
||||
let tenants = tenant::mgr::list_tenants()
|
||||
.await
|
||||
@@ -791,6 +841,8 @@ async fn collect_eviction_candidates(
|
||||
continue;
|
||||
}
|
||||
|
||||
let started_at = std::time::Instant::now();
|
||||
|
||||
// collect layers from all timelines in this tenant
|
||||
//
|
||||
// If one of the timelines becomes `!is_active()` during the iteration,
|
||||
@@ -805,6 +857,7 @@ async fn collect_eviction_candidates(
|
||||
}
|
||||
let info = tl.get_local_layers_for_disk_usage_eviction().await;
|
||||
debug!(tenant_id=%tl.tenant_shard_id.tenant_id, shard_id=%tl.tenant_shard_id.shard_slug(), timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len());
|
||||
|
||||
tenant_candidates.extend(info.resident_layers.into_iter());
|
||||
max_layer_size = max_layer_size.max(info.max_layer_size.unwrap_or(0));
|
||||
|
||||
@@ -870,7 +923,25 @@ async fn collect_eviction_candidates(
|
||||
(partition, candidate)
|
||||
});
|
||||
|
||||
METRICS
|
||||
.tenant_layer_count
|
||||
.observe(tenant_candidates.len() as f64);
|
||||
|
||||
candidates.extend(tenant_candidates);
|
||||
|
||||
let elapsed = started_at.elapsed();
|
||||
METRICS
|
||||
.tenant_collection_time
|
||||
.observe(elapsed.as_secs_f64());
|
||||
|
||||
if elapsed > LOG_DURATION_THRESHOLD {
|
||||
tracing::info!(
|
||||
tenant_id=%tenant.tenant_shard_id().tenant_id,
|
||||
shard_id=%tenant.tenant_shard_id().shard_slug(),
|
||||
elapsed_ms = elapsed.as_millis(),
|
||||
"collection took longer than threshold"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Note: the same tenant ID might be hit twice, if it transitions from attached to
|
||||
@@ -885,11 +956,11 @@ async fn collect_eviction_candidates(
|
||||
},
|
||||
);
|
||||
|
||||
for secondary_tenant in secondary_tenants {
|
||||
for tenant in secondary_tenants {
|
||||
// for secondary tenants we use a sum of on_disk layers and already evicted layers. this is
|
||||
// to prevent repeated disk usage based evictions from completely draining less often
|
||||
// updating secondaries.
|
||||
let (mut layer_info, total_layers) = secondary_tenant.get_layers_for_eviction();
|
||||
let (mut layer_info, total_layers) = tenant.get_layers_for_eviction();
|
||||
|
||||
debug_assert!(
|
||||
total_layers >= layer_info.resident_layers.len(),
|
||||
@@ -897,6 +968,8 @@ async fn collect_eviction_candidates(
|
||||
layer_info.resident_layers.len()
|
||||
);
|
||||
|
||||
let started_at = std::time::Instant::now();
|
||||
|
||||
layer_info
|
||||
.resident_layers
|
||||
.sort_unstable_by_key(|layer_info| std::cmp::Reverse(layer_info.last_activity_ts));
|
||||
@@ -918,9 +991,27 @@ async fn collect_eviction_candidates(
|
||||
)
|
||||
});
|
||||
|
||||
METRICS
|
||||
.tenant_layer_count
|
||||
.observe(tenant_candidates.len() as f64);
|
||||
candidates.extend(tenant_candidates);
|
||||
|
||||
tokio::task::yield_now().await;
|
||||
|
||||
let elapsed = started_at.elapsed();
|
||||
|
||||
METRICS
|
||||
.tenant_collection_time
|
||||
.observe(elapsed.as_secs_f64());
|
||||
|
||||
if elapsed > LOG_DURATION_THRESHOLD {
|
||||
tracing::info!(
|
||||
tenant_id=%tenant.tenant_shard_id().tenant_id,
|
||||
shard_id=%tenant.tenant_shard_id().shard_slug(),
|
||||
elapsed_ms = elapsed.as_millis(),
|
||||
"collection took longer than threshold"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
debug_assert!(MinResidentSizePartition::Above < MinResidentSizePartition::Below,
|
||||
@@ -997,30 +1088,6 @@ impl<U: Usage> VictimSelection<U> {
|
||||
}
|
||||
}
|
||||
|
||||
struct TimelineKey(Arc<Timeline>);
|
||||
|
||||
impl PartialEq for TimelineKey {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
Arc::ptr_eq(&self.0, &other.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Eq for TimelineKey {}
|
||||
|
||||
impl std::hash::Hash for TimelineKey {
|
||||
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
|
||||
Arc::as_ptr(&self.0).hash(state);
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for TimelineKey {
|
||||
type Target = Timeline;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.0.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
/// A totally ordered f32 subset we can use with sorting functions.
|
||||
pub(crate) mod finite_f32 {
|
||||
|
||||
|
||||
@@ -2474,6 +2474,64 @@ pub(crate) mod tenant_throttling {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) mod disk_usage_based_eviction {
|
||||
use super::*;
|
||||
|
||||
pub(crate) struct Metrics {
|
||||
pub(crate) tenant_collection_time: Histogram,
|
||||
pub(crate) tenant_layer_count: Histogram,
|
||||
pub(crate) layers_collected: IntCounter,
|
||||
pub(crate) layers_selected: IntCounter,
|
||||
pub(crate) layers_evicted: IntCounter,
|
||||
}
|
||||
|
||||
impl Default for Metrics {
|
||||
fn default() -> Self {
|
||||
let tenant_collection_time = register_histogram!(
|
||||
"pageserver_disk_usage_based_eviction_tenant_collection_seconds",
|
||||
"Time spent collecting layers from a tenant -- not normalized by collected layer amount",
|
||||
vec![0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0]
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let tenant_layer_count = register_histogram!(
|
||||
"pageserver_disk_usage_based_eviction_tenant_collected_layers",
|
||||
"Amount of layers gathered from a tenant",
|
||||
vec![5.0, 50.0, 500.0, 5000.0, 50000.0]
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let layers_collected = register_int_counter!(
|
||||
"pageserver_disk_usage_based_eviction_collected_layers_total",
|
||||
"Amount of layers collected"
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let layers_selected = register_int_counter!(
|
||||
"pageserver_disk_usage_based_eviction_select_layers_total",
|
||||
"Amount of layers selected"
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let layers_evicted = register_int_counter!(
|
||||
"pageserver_disk_usage_based_eviction_evicted_layers_total",
|
||||
"Amount of layers successfully evicted"
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
Self {
|
||||
tenant_collection_time,
|
||||
tenant_layer_count,
|
||||
layers_collected,
|
||||
layers_selected,
|
||||
layers_evicted,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) static METRICS: Lazy<Metrics> = Lazy::new(Metrics::default);
|
||||
}
|
||||
|
||||
pub fn preinitialize_metrics() {
|
||||
// Python tests need these and on some we do alerting.
|
||||
//
|
||||
@@ -2508,6 +2566,7 @@ pub fn preinitialize_metrics() {
|
||||
Lazy::force(&TENANT_MANAGER);
|
||||
|
||||
Lazy::force(&crate::tenant::storage_layer::layer::LAYER_IMPL_METRICS);
|
||||
Lazy::force(&disk_usage_based_eviction::METRICS);
|
||||
|
||||
// countervecs
|
||||
[&BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT]
|
||||
|
||||
@@ -32,7 +32,7 @@ use remote_storage::GenericRemoteStorage;
|
||||
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::instrument;
|
||||
use utils::{completion::Barrier, fs_ext, id::TimelineId, sync::gate::Gate};
|
||||
use utils::{completion::Barrier, id::TimelineId, sync::gate::Gate};
|
||||
|
||||
enum DownloadCommand {
|
||||
Download(TenantShardId),
|
||||
@@ -121,6 +121,10 @@ impl SecondaryTenant {
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns this secondary tenant's `tenant_shard_id` field by value.
pub(crate) fn tenant_shard_id(&self) -> TenantShardId {
|
||||
self.tenant_shard_id
|
||||
}
|
||||
|
||||
pub(crate) async fn shutdown(&self) {
|
||||
self.cancel.cancel();
|
||||
|
||||
@@ -164,16 +168,17 @@ impl SecondaryTenant {
|
||||
self.detail.lock().unwrap().get_layers_for_eviction(self)
|
||||
}
|
||||
|
||||
/// Cancellation safe, but on cancellation the eviction will go through
|
||||
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline_id, name=%name))]
|
||||
pub(crate) async fn evict_layer(
|
||||
&self,
|
||||
self: &Arc<Self>,
|
||||
conf: &PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
name: LayerFileName,
|
||||
) {
|
||||
debug_assert_current_span_has_tenant_id();
|
||||
|
||||
let _guard = match self.gate.enter() {
|
||||
let guard = match self.gate.enter() {
|
||||
Ok(g) => g,
|
||||
Err(_) => {
|
||||
tracing::debug!("Dropping layer evictions, secondary tenant shutting down",);
|
||||
@@ -187,35 +192,57 @@ impl SecondaryTenant {
|
||||
.timeline_path(&self.tenant_shard_id, &timeline_id)
|
||||
.join(name.file_name());
|
||||
|
||||
// We tolerate ENOENT, because between planning eviction and executing
|
||||
// it, the secondary downloader could have seen an updated heatmap that
|
||||
// resulted in a layer being deleted.
|
||||
// Other local I/O errors are process-fatal: these should never happen.
|
||||
tokio::fs::remove_file(path)
|
||||
.await
|
||||
.or_else(fs_ext::ignore_not_found)
|
||||
.fatal_err("Deleting layer during eviction");
|
||||
let this = self.clone();
|
||||
|
||||
// Update the timeline's state. This does not have to be synchronized with
|
||||
// the download process, because:
|
||||
// - If downloader is racing with us to remove a file (e.g. because it is
|
||||
// removed from heatmap), then our mutual .remove() operations will both
|
||||
// succeed.
|
||||
// - If downloader is racing with us to download the object (this would require
|
||||
// multiple eviction iterations to race with multiple download iterations), then
|
||||
// if we remove it from the state, the worst that happens is the downloader
|
||||
// downloads it again before re-inserting, or we delete the file but it remains
|
||||
// in the state map (in which case it will be downloaded if this secondary
|
||||
// tenant transitions to attached and tries to access it)
|
||||
//
|
||||
// The important assumption here is that the secondary timeline state does not
|
||||
// have to 100% match what is on disk, because it's a best-effort warming
|
||||
// of the cache.
|
||||
let mut detail = self.detail.lock().unwrap();
|
||||
if let Some(timeline_detail) = detail.timelines.get_mut(&timeline_id) {
|
||||
timeline_detail.on_disk_layers.remove(&name);
|
||||
timeline_detail.evicted_at.insert(name, now);
|
||||
}
|
||||
// spawn it to be cancellation safe
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let _guard = guard;
|
||||
// We tolerate ENOENT, because between planning eviction and executing
|
||||
// it, the secondary downloader could have seen an updated heatmap that
|
||||
// resulted in a layer being deleted.
|
||||
// Other local I/O errors are process-fatal: these should never happen.
|
||||
let deleted = std::fs::remove_file(path);
|
||||
|
||||
let not_found = deleted
|
||||
.as_ref()
|
||||
.is_err_and(|x| x.kind() == std::io::ErrorKind::NotFound);
|
||||
|
||||
let deleted = if not_found {
|
||||
false
|
||||
} else {
|
||||
deleted
|
||||
.map(|()| true)
|
||||
.fatal_err("Deleting layer during eviction")
|
||||
};
|
||||
|
||||
if !deleted {
|
||||
// skip updating accounting and putting perhaps later timestamp
|
||||
return;
|
||||
}
|
||||
|
||||
// Update the timeline's state. This does not have to be synchronized with
|
||||
// the download process, because:
|
||||
// - If downloader is racing with us to remove a file (e.g. because it is
|
||||
// removed from heatmap), then our mutual .remove() operations will both
|
||||
// succeed.
|
||||
// - If downloader is racing with us to download the object (this would require
|
||||
// multiple eviction iterations to race with multiple download iterations), then
|
||||
// if we remove it from the state, the worst that happens is the downloader
|
||||
// downloads it again before re-inserting, or we delete the file but it remains
|
||||
// in the state map (in which case it will be downloaded if this secondary
|
||||
// tenant transitions to attached and tries to access it)
|
||||
//
|
||||
// The important assumption here is that the secondary timeline state does not
|
||||
// have to 100% match what is on disk, because it's a best-effort warming
|
||||
// of the cache.
|
||||
let mut detail = this.detail.lock().unwrap();
|
||||
if let Some(timeline_detail) = detail.timelines.get_mut(&timeline_id) {
|
||||
timeline_detail.on_disk_layers.remove(&name);
|
||||
timeline_detail.evicted_at.insert(name, now);
|
||||
}
|
||||
})
|
||||
.await
|
||||
.expect("secondary eviction should not have panicked");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -72,7 +72,7 @@ where
|
||||
/// the same ValueReconstructState struct in the next 'get_value_reconstruct_data'
|
||||
/// call, to collect more records.
|
||||
///
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Default)]
|
||||
pub struct ValueReconstructState {
|
||||
pub records: Vec<(Lsn, NeonWalRecord)>,
|
||||
pub img: Option<(Lsn, Bytes)>,
|
||||
|
||||
@@ -8,7 +8,7 @@ use pageserver_api::shard::ShardIndex;
|
||||
use std::ops::Range;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::time::SystemTime;
|
||||
use std::time::{Duration, SystemTime};
|
||||
use tracing::Instrument;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::sync::heavier_once_cell;
|
||||
@@ -208,10 +208,15 @@ impl Layer {
|
||||
/// If, due to bad luck or blocking of the executor, we miss the actual eviction and the layer is
|
||||
/// re-downloaded, [`EvictionError::Downloaded`] is returned.
|
||||
///
|
||||
/// Timeout is mandatory, because waiting for eviction is only needed for our tests; eviction
|
||||
/// will happen regardless of the future returned by this method completing, unless there is a
|
||||
/// read access (currently including [`Layer::keep_resident`]) before eviction gets to
|
||||
/// complete.
|
||||
///
|
||||
/// Technically cancellation safe, but cancelling might shift the viewpoint of what generation
|
||||
/// of download-evict cycle on retry.
|
||||
pub(crate) async fn evict_and_wait(&self) -> Result<(), EvictionError> {
|
||||
self.0.evict_and_wait().await
|
||||
pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> {
|
||||
self.0.evict_and_wait(timeout).await
|
||||
}
|
||||
|
||||
/// Delete the layer file when the `self` gets dropped, also try to schedule a remote index upload
|
||||
@@ -363,7 +368,7 @@ impl Layer {
|
||||
///
|
||||
/// Does not start local deletion, use [`Self::delete_on_drop`] for that
|
||||
/// separately.
|
||||
#[cfg(feature = "testing")]
|
||||
#[cfg(any(feature = "testing", test))]
|
||||
pub(crate) fn wait_drop(&self) -> impl std::future::Future<Output = ()> + 'static {
|
||||
let mut rx = self.0.status.subscribe();
|
||||
|
||||
@@ -632,7 +637,7 @@ impl LayerInner {
|
||||
|
||||
/// Cancellation safe, however dropping the future and calling this method again might result
|
||||
/// in a new attempt to evict OR join the previously started attempt.
|
||||
pub(crate) async fn evict_and_wait(&self) -> Result<(), EvictionError> {
|
||||
pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> {
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
|
||||
assert!(self.have_remote_client);
|
||||
@@ -652,16 +657,22 @@ impl LayerInner {
|
||||
if strong.is_some() {
|
||||
// drop the DownloadedLayer outside of the holding the guard
|
||||
drop(strong);
|
||||
|
||||
// idea here is that only one evicter should ever get to witness a strong reference,
|
||||
// which means whenever get_or_maybe_download upgrades a weak, it must mark up a
|
||||
// cancelled eviction and signal us, like it currently does.
|
||||
//
|
||||
// a second concurrent evict_and_wait will not see a strong reference.
|
||||
LAYER_IMPL_METRICS.inc_started_evictions();
|
||||
}
|
||||
|
||||
match rx.recv().await {
|
||||
Ok(Status::Evicted) => Ok(()),
|
||||
Ok(Status::Downloaded) => Err(EvictionError::Downloaded),
|
||||
Err(RecvError::Closed) => {
|
||||
match tokio::time::timeout(timeout, rx.recv()).await {
|
||||
Ok(Ok(Status::Evicted)) => Ok(()),
|
||||
Ok(Ok(Status::Downloaded)) => Err(EvictionError::Downloaded),
|
||||
Ok(Err(RecvError::Closed)) => {
|
||||
unreachable!("sender cannot be dropped while we are in &self method")
|
||||
}
|
||||
Err(RecvError::Lagged(_)) => {
|
||||
Ok(Err(RecvError::Lagged(_))) => {
|
||||
// this is quite unlikely, but we are blocking a lot in the async context, so
|
||||
// we might be missing this because we are stuck on a LIFO slot on a thread
|
||||
// which is busy blocking for a 1TB database create_image_layers.
|
||||
@@ -674,6 +685,7 @@ impl LayerInner {
|
||||
None => Ok(()),
|
||||
}
|
||||
}
|
||||
Err(_timeout) => Err(EvictionError::Timeout),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1195,6 +1207,9 @@ pub(crate) enum EvictionError {
|
||||
/// Evictions must always lose to downloads in races, and this time it happened.
|
||||
#[error("layer was downloaded instead")]
|
||||
Downloaded,
|
||||
|
||||
#[error("eviction did not happen within timeout")]
|
||||
Timeout,
|
||||
}
|
||||
|
||||
/// Error internal to the [`LayerInner::get_or_maybe_download`]
|
||||
|
||||
@@ -1,13 +1,173 @@
|
||||
use futures::StreamExt;
|
||||
use pageserver_api::key::CONTROLFILE_KEY;
|
||||
use tokio::task::JoinSet;
|
||||
use tracing::Instrument;
|
||||
use utils::{
|
||||
completion::{self, Completion},
|
||||
id::TimelineId,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
use crate::task_mgr::BACKGROUND_RUNTIME;
|
||||
use crate::tenant::harness::TenantHarness;
|
||||
use crate::{context::DownloadBehavior, task_mgr::BACKGROUND_RUNTIME};
|
||||
use crate::{task_mgr::TaskKind, tenant::harness::TenantHarness};
|
||||
|
||||
/// Used in tests to advance a future to the wanted await point, and no further.
|
||||
const ADVANCE: std::time::Duration = std::time::Duration::from_secs(3600);
|
||||
|
||||
/// Used in tests to indicate forever long timeout; has to be longer than the amount of ADVANCE
|
||||
/// timeout uses to advance futures.
|
||||
const FOREVER: std::time::Duration = std::time::Duration::from_secs(ADVANCE.as_secs() * 24 * 7);
|
||||
|
||||
/// Demonstrate the API and resident -> evicted -> resident -> deleted transitions.
|
||||
#[tokio::test]
|
||||
async fn smoke_test() {
|
||||
let handle = BACKGROUND_RUNTIME.handle();
|
||||
|
||||
let h = TenantHarness::create("smoke_test").unwrap();
|
||||
let span = h.span();
|
||||
let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
|
||||
let (tenant, _) = h.load().await;
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Download);
|
||||
|
||||
let timeline = tenant
|
||||
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let layer = {
|
||||
let mut layers = {
|
||||
let layers = timeline.layers.read().await;
|
||||
layers.resident_layers().collect::<Vec<_>>().await
|
||||
};
|
||||
|
||||
assert_eq!(layers.len(), 1);
|
||||
|
||||
layers.swap_remove(0)
|
||||
};
|
||||
|
||||
// all layers created at pageserver are like `layer`, initialized with strong
|
||||
// Arc<DownloadedLayer>.
|
||||
|
||||
let img_before = {
|
||||
let mut data = ValueReconstructState::default();
|
||||
layer
|
||||
.get_value_reconstruct_data(CONTROLFILE_KEY, Lsn(0x10)..Lsn(0x11), &mut data, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
data.img
|
||||
.take()
|
||||
.expect("tenant harness writes the control file")
|
||||
};
|
||||
|
||||
// important part is evicting the layer, which can be done when there are no more ResidentLayer
|
||||
// instances -- there currently are none, only two `Layer` values, one in the layermap and on
|
||||
// in scope.
|
||||
layer.evict_and_wait(FOREVER).await.unwrap();
|
||||
|
||||
// double-evict returns an error, which is valid if both eviction_task and disk usage based
|
||||
// eviction would both evict the same layer at the same time.
|
||||
|
||||
let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
|
||||
assert!(matches!(e, EvictionError::NotFound));
|
||||
|
||||
// on accesses when the layer is evicted, it will automatically be downloaded.
|
||||
let img_after = {
|
||||
let mut data = ValueReconstructState::default();
|
||||
layer
|
||||
.get_value_reconstruct_data(CONTROLFILE_KEY, Lsn(0x10)..Lsn(0x11), &mut data, &ctx)
|
||||
.instrument(download_span.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
data.img.take().unwrap()
|
||||
};
|
||||
|
||||
assert_eq!(img_before, img_after);
|
||||
|
||||
// evict_and_wait can timeout, but it doesn't cancel the evicting itself
|
||||
//
|
||||
// ZERO for timeout does not work reliably, so first take up all spawn_blocking slots to
|
||||
// artificially slow it down.
|
||||
let helper = SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads(handle).await;
|
||||
|
||||
match layer
|
||||
.evict_and_wait(std::time::Duration::ZERO)
|
||||
.await
|
||||
.unwrap_err()
|
||||
{
|
||||
EvictionError::Timeout => {
|
||||
// expected, but note that the eviction is "still ongoing"
|
||||
helper.release().await;
|
||||
// exhaust spawn_blocking pool to ensure it is now complete
|
||||
SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(handle)
|
||||
.await;
|
||||
}
|
||||
other => unreachable!("{other:?}"),
|
||||
}
|
||||
|
||||
// only way to query if a layer is resident is to acquire a ResidentLayer instance.
|
||||
// Layer::keep_resident never downloads, but it might initialize if the layer file is found
|
||||
// downloaded locally.
|
||||
let none = layer.keep_resident().await.unwrap();
|
||||
assert!(
|
||||
none.is_none(),
|
||||
"Expected none, because eviction removed the local file, found: {none:?}"
|
||||
);
|
||||
|
||||
// plain downloading is rarely needed
|
||||
layer
|
||||
.download_and_keep_resident()
|
||||
.instrument(download_span)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// last important part is deletion on drop: gc and compaction use it for compacted L0 layers
|
||||
// or fully garbage collected layers. deletion means deleting the local file, and scheduling a
|
||||
// deletion of the already unlinked from index_part.json remote file.
|
||||
//
|
||||
// marking a layer to be deleted on drop is irreversible; there is no technical reason against
|
||||
// reversiblity, but currently it is not needed so it is not provided.
|
||||
layer.delete_on_drop();
|
||||
|
||||
let path = layer.local_path().to_owned();
|
||||
|
||||
// wait_drop produces an unconnected to Layer future which will resolve when the
|
||||
// LayerInner::drop has completed.
|
||||
let mut wait_drop = std::pin::pin!(layer.wait_drop());
|
||||
|
||||
// paused time doesn't really work well with timeouts and evict_and_wait, so delay pausing
|
||||
// until here
|
||||
tokio::time::pause();
|
||||
tokio::time::timeout(ADVANCE, &mut wait_drop)
|
||||
.await
|
||||
.expect_err("should had timed out because two strong references exist");
|
||||
|
||||
tokio::fs::metadata(&path)
|
||||
.await
|
||||
.expect("the local layer file still exists");
|
||||
|
||||
let rtc = timeline.remote_client.as_ref().unwrap();
|
||||
|
||||
{
|
||||
let layers = &[layer];
|
||||
let mut g = timeline.layers.write().await;
|
||||
g.finish_gc_timeline(layers);
|
||||
// this just updates the remote_physical_size for demonstration purposes
|
||||
rtc.schedule_gc_update(layers).unwrap();
|
||||
}
|
||||
|
||||
// when strong references are dropped, the file is deleted and remote deletion is scheduled
|
||||
wait_drop.await;
|
||||
|
||||
let e = tokio::fs::metadata(&path)
|
||||
.await
|
||||
.expect_err("the local file is deleted");
|
||||
assert_eq!(e.kind(), std::io::ErrorKind::NotFound);
|
||||
|
||||
rtc.wait_completion().await.unwrap();
|
||||
|
||||
assert_eq!(rtc.get_remote_physical_size(), 0);
|
||||
}
|
||||
|
||||
/// This test demonstrates a previous hang when a eviction and deletion were requested at the same
|
||||
/// time. Now both of them complete per Arc drop semantics.
|
||||
@@ -41,10 +201,10 @@ async fn evict_and_wait_on_wanted_deleted() {
|
||||
let resident = layer.keep_resident().await.unwrap();
|
||||
|
||||
{
|
||||
let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait());
|
||||
let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
|
||||
|
||||
// drive the future to await on the status channel
|
||||
tokio::time::timeout(std::time::Duration::from_secs(3600), &mut evict_and_wait)
|
||||
tokio::time::timeout(ADVANCE, &mut evict_and_wait)
|
||||
.await
|
||||
.expect_err("should had been a timeout since we are holding the layer resident");
|
||||
|
||||
@@ -115,10 +275,10 @@ async fn residency_check_while_evict_and_wait_on_clogged_spawn_blocking() {
|
||||
|
||||
let resident = layer.keep_resident().await.unwrap();
|
||||
|
||||
let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait());
|
||||
let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
|
||||
|
||||
// drive the future to await on the status channel
|
||||
tokio::time::timeout(std::time::Duration::from_secs(3600), &mut evict_and_wait)
|
||||
tokio::time::timeout(ADVANCE, &mut evict_and_wait)
|
||||
.await
|
||||
.expect_err("should had been a timeout since we are holding the layer resident");
|
||||
assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
|
||||
@@ -138,7 +298,7 @@ async fn residency_check_while_evict_and_wait_on_clogged_spawn_blocking() {
|
||||
|
||||
// because the keep_resident check alters wanted evicted without sending a message, we will
|
||||
// never get completed
|
||||
let e = tokio::time::timeout(std::time::Duration::from_secs(3600), &mut evict_and_wait)
|
||||
let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
|
||||
.await
|
||||
.expect("no timeout, because keep_resident re-initialized")
|
||||
.expect_err("eviction should not have succeeded because re-initialized");
|
||||
@@ -158,9 +318,10 @@ async fn residency_check_while_evict_and_wait_on_clogged_spawn_blocking() {
|
||||
.sum::<u64>()
|
||||
);
|
||||
|
||||
let mut second_eviction = std::pin::pin!(layer.evict_and_wait());
|
||||
let mut second_eviction = std::pin::pin!(layer.evict_and_wait(FOREVER));
|
||||
|
||||
tokio::time::timeout(std::time::Duration::from_secs(3600), &mut second_eviction)
|
||||
// advance to the wait on the queue
|
||||
tokio::time::timeout(ADVANCE, &mut second_eviction)
|
||||
.await
|
||||
.expect_err("timeout because spawn_blocking is clogged");
|
||||
|
||||
@@ -171,7 +332,12 @@ async fn residency_check_while_evict_and_wait_on_clogged_spawn_blocking() {
|
||||
|
||||
helper.release().await;
|
||||
|
||||
tokio::time::timeout(std::time::Duration::from_secs(3600), &mut second_eviction)
|
||||
// the second_eviction gets to run here
|
||||
//
|
||||
// synchronize to be *strictly* after the second_eviction spawn_blocking run
|
||||
SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(handle).await;
|
||||
|
||||
tokio::time::timeout(ADVANCE, &mut second_eviction)
|
||||
.await
|
||||
.expect("eviction goes through now that spawn_blocking is unclogged")
|
||||
.expect("eviction should succeed, because version matches");
|
||||
@@ -261,3 +427,49 @@ impl SpawnBlockingPoolHelper {
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
/// Sanity check for `SpawnBlockingPoolHelper` itself: while the helper holds the runtime's
/// blocking threads, a freshly submitted `spawn_blocking` task must stay pending, and once the
/// helper is released the same task must complete.
#[test]
|
||||
fn spawn_blocking_pool_helper_actually_works() {
|
||||
// create a custom runtime for which we know and control how many blocking threads it has
|
||||
//
|
||||
// because the amount is not configurable for our helper, expect the same amount as
|
||||
// BACKGROUND_RUNTIME using the tokio defaults would have.
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
// NOTE(review): 512 is believed to match tokio's default blocking thread limit; confirm
// against the tokio version in use.
.max_blocking_threads(512)
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let handle = rt.handle();
|
||||
|
||||
rt.block_on(async move {
|
||||
// this will not return until all threads are spun up and actually executing the code
|
||||
// waiting on `consumed` to be `SpawnBlockingPoolHelper::release`'d.
|
||||
let consumed = SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads(handle).await;
|
||||
|
||||
println!("consumed");
|
||||
|
||||
// pinned so the join handle can first be polled by reference under a timeout and later
// be moved into the final join below
let mut jh = std::pin::pin!(tokio::task::spawn_blocking(move || {
|
||||
// this will not get to run before we release
|
||||
}));
|
||||
|
||||
println!("spawned");
|
||||
|
||||
// wait up to one second: the join must time out because the blocking pool is still
// held by `consumed`
tokio::time::timeout(std::time::Duration::from_secs(1), &mut jh)
|
||||
.await
|
||||
.expect_err("the task should not have gotten to run yet");
|
||||
|
||||
println!("tried to join");
|
||||
|
||||
consumed.release().await;
|
||||
|
||||
println!("released");
|
||||
|
||||
// after the release the queued task is expected to finish within the timeout: the outer
// `expect` covers the timeout, the inner one the task's join error
tokio::time::timeout(std::time::Duration::from_secs(1), jh)
|
||||
.await
|
||||
.expect("no timeout")
|
||||
.expect("no join error");
|
||||
|
||||
println!("joined");
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1512,10 +1512,14 @@ impl Timeline {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
match local_layer.evict_and_wait().await {
|
||||
// curl has this by default
|
||||
let timeout = std::time::Duration::from_secs(120);
|
||||
|
||||
match local_layer.evict_and_wait(timeout).await {
|
||||
Ok(()) => Ok(Some(true)),
|
||||
Err(EvictionError::NotFound) => Ok(Some(false)),
|
||||
Err(EvictionError::Downloaded) => Ok(Some(false)),
|
||||
Err(EvictionError::Timeout) => Ok(Some(false)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -5157,8 +5161,7 @@ mod tests {
|
||||
let harness =
|
||||
TenantHarness::create("two_layer_eviction_attempts_at_the_same_time").unwrap();
|
||||
|
||||
let ctx = any_context();
|
||||
let tenant = harness.do_try_load(&ctx).await.unwrap();
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let timeline = tenant
|
||||
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
|
||||
.await
|
||||
@@ -5172,8 +5175,10 @@ mod tests {
|
||||
.expect("should had been resident")
|
||||
.drop_eviction_guard();
|
||||
|
||||
let first = async { layer.evict_and_wait().await };
|
||||
let second = async { layer.evict_and_wait().await };
|
||||
let forever = std::time::Duration::from_secs(120);
|
||||
|
||||
let first = layer.evict_and_wait(forever);
|
||||
let second = layer.evict_and_wait(forever);
|
||||
|
||||
let (first, second) = tokio::join!(first, second);
|
||||
|
||||
@@ -5192,12 +5197,6 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds a `RequestContext` for tests, using `TaskKind::UnitTest` and
/// `DownloadBehavior::Error`.
///
/// NOTE(review): the enclosing hunk shrinks from 12 to 6 lines, so this helper appears to be
/// removed by the change shown here — confirm before relying on it.
fn any_context() -> crate::context::RequestContext {
|
||||
// glob imports are scoped to this function body; they supply the names used below
use crate::context::*;
|
||||
use crate::task_mgr::*;
|
||||
RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
|
||||
}
|
||||
|
||||
async fn find_some_layer(timeline: &Timeline) -> Layer {
|
||||
let layers = timeline.layers.read().await;
|
||||
let desc = layers
|
||||
|
||||
@@ -204,6 +204,7 @@ impl Timeline {
|
||||
evicted: usize,
|
||||
errors: usize,
|
||||
not_evictable: usize,
|
||||
timeouts: usize,
|
||||
#[allow(dead_code)]
|
||||
skipped_for_shutdown: usize,
|
||||
}
|
||||
@@ -267,7 +268,11 @@ impl Timeline {
|
||||
let layer = guard.drop_eviction_guard();
|
||||
if no_activity_for > p.threshold {
|
||||
// this could cause a lot of allocations in some cases
|
||||
js.spawn(async move { layer.evict_and_wait().await });
|
||||
js.spawn(async move {
|
||||
layer
|
||||
.evict_and_wait(std::time::Duration::from_secs(5))
|
||||
.await
|
||||
});
|
||||
stats.candidates += 1;
|
||||
}
|
||||
}
|
||||
@@ -280,6 +285,9 @@ impl Timeline {
|
||||
Ok(Err(EvictionError::NotFound | EvictionError::Downloaded)) => {
|
||||
stats.not_evictable += 1;
|
||||
}
|
||||
Ok(Err(EvictionError::Timeout)) => {
|
||||
stats.timeouts += 1;
|
||||
}
|
||||
Err(je) if je.is_cancelled() => unreachable!("not used"),
|
||||
Err(je) if je.is_panic() => {
|
||||
/* already logged */
|
||||
@@ -295,7 +303,8 @@ impl Timeline {
|
||||
stats = join_all => {
|
||||
if stats.candidates == stats.not_evictable {
|
||||
debug!(stats=?stats, "eviction iteration complete");
|
||||
} else if stats.errors > 0 || stats.not_evictable > 0 {
|
||||
} else if stats.errors > 0 || stats.not_evictable > 0 || stats.timeouts > 0 {
|
||||
// reminder: timeouts are not eviction cancellations
|
||||
warn!(stats=?stats, "eviction iteration complete");
|
||||
} else {
|
||||
info!(stats=?stats, "eviction iteration complete");
|
||||
|
||||
Reference in New Issue
Block a user