mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 02:12:56 +00:00
refactor: split evicting
This commit is contained in:
@@ -6,6 +6,7 @@ use std::ops::Range;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::time::SystemTime;
|
||||
use tracing::Instrument;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::sync::heavier_once_cell;
|
||||
@@ -795,9 +796,6 @@ impl LayerInner {
|
||||
// do nothing now, only when the whole layer is dropped. gc will end up deleting the
|
||||
// whole layer, in case there is no reference cycle.
|
||||
} else if can_evict && evict {
|
||||
// we can remove this right now, but ... we really should not block or do anything.
|
||||
// spawn a task which first does a version check, and that version is also incremented
|
||||
// on get_or_download, so we will not collide?
|
||||
let version = self.version.load(Ordering::Relaxed);
|
||||
|
||||
let span = tracing::info_span!(parent: None, "layer_evict", tenant_id = %self.desc.tenant_id, timeline_id = %self.desc.timeline_id, layer=%self);
|
||||
@@ -807,115 +805,92 @@ impl LayerInner {
|
||||
let this = Arc::downgrade(&self);
|
||||
drop(self);
|
||||
|
||||
let eviction = {
|
||||
let span = tracing::info_span!(parent: span.clone(), "blocking");
|
||||
async move {
|
||||
// the layer is already gone, don't do anything. LayerInner drop has already ran.
|
||||
let Some(this) = this.upgrade() else { return; };
|
||||
crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(move || {
|
||||
let _g = span.entered();
|
||||
|
||||
// deleted or detached timeline, don't do anything.
|
||||
let Some(timeline) = this.timeline.upgrade() else { return; };
|
||||
|
||||
// to avoid starting a new download while we evict, keep holding on to the
|
||||
// permit. note that we will not close the semaphore when done, because it will
|
||||
// be used by the re-download.
|
||||
let _permit = {
|
||||
let maybe_downloaded = this.inner.get();
|
||||
// relaxed ordering: we dont have any other atomics pending
|
||||
if version != this.version.load(Ordering::Relaxed) {
|
||||
// downloadness-state has advanced, we might no longer be the latest eviction
|
||||
// work; don't do anything.
|
||||
return;
|
||||
}
|
||||
|
||||
// free the DownloadedLayer allocation
|
||||
match maybe_downloaded.map(|mut g| g.take_and_deinit()) {
|
||||
Some((taken, permit)) => {
|
||||
assert!(matches!(taken, ResidentOrWantedEvicted::WantedEvicted(_)));
|
||||
permit
|
||||
}
|
||||
None => {
|
||||
unreachable!("we do the version checking for this exact reason")
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if !this.wanted_evicted.load(Ordering::Acquire) {
|
||||
// if there's already interest, should we just early exit? this is not
|
||||
// currently *cleared* on interest, maybe it shouldn't?
|
||||
// FIXME: wanted_evicted cannot be unset right now
|
||||
//
|
||||
// NOTE: us holding the permit prevents a new round of download happening
|
||||
// right now
|
||||
return;
|
||||
}
|
||||
|
||||
let path = this.path.to_owned();
|
||||
|
||||
let capture_mtime_and_delete = tokio::task::spawn_blocking({
|
||||
let span = span.clone();
|
||||
move || {
|
||||
let _e = span.entered();
|
||||
// FIXME: we can now initialize the mtime during first get_or_download,
|
||||
// and track that in-memory for the following? does that help?
|
||||
let m = path.metadata()?;
|
||||
let local_layer_mtime = m.modified()?;
|
||||
std::fs::remove_file(&path)?;
|
||||
Ok::<_, std::io::Error>(local_layer_mtime)
|
||||
}
|
||||
});
|
||||
|
||||
let res = capture_mtime_and_delete.await;
|
||||
|
||||
this.access_stats.record_residence_event(LayerResidenceStatus::Evicted, LayerResidenceEventReason::ResidenceChange);
|
||||
|
||||
drop(this.status.send(Status::Evicted));
|
||||
|
||||
match res {
|
||||
Ok(Ok(local_layer_mtime)) => {
|
||||
let duration =
|
||||
std::time::SystemTime::now().duration_since(local_layer_mtime);
|
||||
match duration {
|
||||
Ok(elapsed) => {
|
||||
timeline
|
||||
.metrics
|
||||
.evictions_with_low_residence_duration
|
||||
.read()
|
||||
.unwrap()
|
||||
.observe(elapsed);
|
||||
tracing::info!(
|
||||
residence_millis = elapsed.as_millis(),
|
||||
"evicted layer after known residence period"
|
||||
);
|
||||
}
|
||||
Err(_) => {
|
||||
tracing::info!("evicted layer after unknown residence period");
|
||||
}
|
||||
}
|
||||
timeline
|
||||
.metrics
|
||||
.resident_physical_size_gauge
|
||||
.sub(this.desc.file_size);
|
||||
}
|
||||
Ok(Err(e)) if e.kind() == std::io::ErrorKind::NotFound => {
|
||||
tracing::info!("failed to evict file from disk, it was already gone");
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
tracing::warn!("failed to evict file from disk: {e:#}");
|
||||
}
|
||||
Err(je) if je.is_cancelled() => unreachable!("unsupported"),
|
||||
Err(je) if je.is_panic() => { /* already logged */ }
|
||||
Err(je) => {
|
||||
tracing::warn!(error = ?je, "unexpected join_error while evicting the file")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
.instrument(span);
|
||||
|
||||
crate::task_mgr::BACKGROUND_RUNTIME.spawn(eviction);
|
||||
let Some(this) = this.upgrade() else { return; };
|
||||
this.evict_blocking(version);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn evict_blocking(&self, version: usize) {
|
||||
// deleted or detached timeline, don't do anything.
|
||||
let Some(timeline) = self.timeline.upgrade() else { return; };
|
||||
|
||||
// to avoid starting a new download while we evict, keep holding on to the
|
||||
// permit. note that we will not close the semaphore when done, because it will
|
||||
// be used by the re-download.
|
||||
let _permit = {
|
||||
let maybe_downloaded = self.inner.get();
|
||||
|
||||
if version != self.version.load(Ordering::Relaxed) {
|
||||
// downloadness-state has advanced, we might no longer be the latest eviction
|
||||
// work; don't do anything.
|
||||
return;
|
||||
}
|
||||
|
||||
// free the DownloadedLayer allocation
|
||||
match maybe_downloaded.map(|mut g| g.take_and_deinit()) {
|
||||
Some((taken, permit)) => {
|
||||
assert!(matches!(taken, ResidentOrWantedEvicted::WantedEvicted(_)));
|
||||
permit
|
||||
}
|
||||
None => {
|
||||
unreachable!("we do the version checking for this exact reason")
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
self.access_stats.record_residence_event(
|
||||
LayerResidenceStatus::Evicted,
|
||||
LayerResidenceEventReason::ResidenceChange,
|
||||
);
|
||||
|
||||
drop(self.status.send(Status::Evicted));
|
||||
|
||||
match capture_mtime_and_remove(&self.path) {
|
||||
Ok(local_layer_mtime) => {
|
||||
let duration = SystemTime::now().duration_since(local_layer_mtime);
|
||||
match duration {
|
||||
Ok(elapsed) => {
|
||||
timeline
|
||||
.metrics
|
||||
.evictions_with_low_residence_duration
|
||||
.read()
|
||||
.unwrap()
|
||||
.observe(elapsed);
|
||||
tracing::info!(
|
||||
residence_millis = elapsed.as_millis(),
|
||||
"evicted layer after known residence period"
|
||||
);
|
||||
}
|
||||
Err(_) => {
|
||||
tracing::info!("evicted layer after unknown residence period");
|
||||
}
|
||||
}
|
||||
timeline
|
||||
.metrics
|
||||
.resident_physical_size_gauge
|
||||
.sub(self.desc.file_size);
|
||||
}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
|
||||
tracing::info!("failed to evict file from disk, it was already gone");
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("failed to evict file from disk: {e:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn capture_mtime_and_remove(path: &Path) -> Result<SystemTime, std::io::Error> {
|
||||
// FIXME: we can now initialize the mtime during first get_or_download,
|
||||
// and track that in-memory for the following? does that help?
|
||||
let m = path.metadata()?;
|
||||
let local_layer_mtime = m.modified()?;
|
||||
std::fs::remove_file(path)?;
|
||||
Ok(local_layer_mtime)
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
|
||||
Reference in New Issue
Block a user