mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-10 15:02:56 +00:00
refactor(layer): remove version checking with atomics (#5742)
The `LayerInner::version` never needed to be read in more than one place. Clarified while fixing #5737 of which this is the first step. This decrements possible wrong atomics usage in Layer, but does not really fix anything.
This commit is contained in:
@@ -125,6 +125,7 @@ impl Layer {
|
||||
let inner = Arc::new(DownloadedLayer {
|
||||
owner: owner.clone(),
|
||||
kind: tokio::sync::OnceCell::default(),
|
||||
version: 0,
|
||||
});
|
||||
resident = Some(inner.clone());
|
||||
|
||||
@@ -163,6 +164,7 @@ impl Layer {
|
||||
let inner = Arc::new(DownloadedLayer {
|
||||
owner: owner.clone(),
|
||||
kind: tokio::sync::OnceCell::default(),
|
||||
version: 0,
|
||||
});
|
||||
resident = Some(inner.clone());
|
||||
let access_stats = LayerAccessStats::empty_will_record_residence_event_later();
|
||||
@@ -328,16 +330,17 @@ impl Layer {
|
||||
/// read with [`Layer::get_value_reconstruct_data`].
|
||||
///
|
||||
/// [`LayerMap::search`]: crate::tenant::layer_map::LayerMap::search
|
||||
#[derive(Debug)]
|
||||
enum ResidentOrWantedEvicted {
|
||||
Resident(Arc<DownloadedLayer>),
|
||||
WantedEvicted(Weak<DownloadedLayer>),
|
||||
WantedEvicted(Weak<DownloadedLayer>, usize),
|
||||
}
|
||||
|
||||
impl ResidentOrWantedEvicted {
|
||||
fn get(&self) -> Option<Arc<DownloadedLayer>> {
|
||||
match self {
|
||||
ResidentOrWantedEvicted::Resident(strong) => Some(strong.clone()),
|
||||
ResidentOrWantedEvicted::WantedEvicted(weak) => match weak.upgrade() {
|
||||
ResidentOrWantedEvicted::WantedEvicted(weak, _) => match weak.upgrade() {
|
||||
Some(strong) => {
|
||||
LAYER_IMPL_METRICS.inc_raced_wanted_evicted_accesses();
|
||||
Some(strong)
|
||||
@@ -349,21 +352,16 @@ impl ResidentOrWantedEvicted {
|
||||
/// When eviction is first requested, drop down to holding a [`Weak`].
|
||||
///
|
||||
/// Returns `true` if this was the first time eviction was requested.
|
||||
fn downgrade(&mut self) -> &Weak<DownloadedLayer> {
|
||||
let _was_first = match self {
|
||||
fn downgrade(&mut self) -> bool {
|
||||
match self {
|
||||
ResidentOrWantedEvicted::Resident(strong) => {
|
||||
let weak = Arc::downgrade(strong);
|
||||
*self = ResidentOrWantedEvicted::WantedEvicted(weak);
|
||||
*self = ResidentOrWantedEvicted::WantedEvicted(weak, strong.version);
|
||||
// returning the weak is not useful, because the drop could had already ran with
|
||||
// the replacement above, and that will take care of cleaning the Option we are in
|
||||
true
|
||||
}
|
||||
ResidentOrWantedEvicted::WantedEvicted(_) => false,
|
||||
};
|
||||
|
||||
match self {
|
||||
ResidentOrWantedEvicted::WantedEvicted(ref weak) => weak,
|
||||
_ => unreachable!("just wrote wanted evicted"),
|
||||
ResidentOrWantedEvicted::WantedEvicted(..) => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -398,8 +396,10 @@ struct LayerInner {
|
||||
/// [`LayerInner::on_downloaded_layer_drop`].
|
||||
wanted_evicted: AtomicBool,
|
||||
|
||||
/// Version is to make sure we will in fact only evict a file if no new download has been
|
||||
/// started.
|
||||
/// Version is to make sure we will only evict a specific download of a file.
|
||||
///
|
||||
/// Incremented for each download, stored in `DownloadedLayer::version` or
|
||||
/// `ResidentOrWantedEvicted::WantedEvicted`.
|
||||
version: AtomicUsize,
|
||||
|
||||
/// Allow subscribing to when the layer actually gets evicted.
|
||||
@@ -515,6 +515,14 @@ impl LayerInner {
|
||||
.timeline_path(&timeline.tenant_id, &timeline.timeline_id)
|
||||
.join(desc.filename().to_string());
|
||||
|
||||
let (inner, version) = if let Some(inner) = downloaded {
|
||||
let version = inner.version;
|
||||
let resident = ResidentOrWantedEvicted::Resident(inner);
|
||||
(heavier_once_cell::OnceCell::new(resident), version)
|
||||
} else {
|
||||
(heavier_once_cell::OnceCell::default(), 0)
|
||||
};
|
||||
|
||||
LayerInner {
|
||||
conf,
|
||||
path,
|
||||
@@ -524,12 +532,8 @@ impl LayerInner {
|
||||
access_stats,
|
||||
wanted_garbage_collected: AtomicBool::new(false),
|
||||
wanted_evicted: AtomicBool::new(false),
|
||||
inner: if let Some(inner) = downloaded {
|
||||
heavier_once_cell::OnceCell::new(ResidentOrWantedEvicted::Resident(inner))
|
||||
} else {
|
||||
heavier_once_cell::OnceCell::default()
|
||||
},
|
||||
version: AtomicUsize::new(0),
|
||||
inner,
|
||||
version: AtomicUsize::new(version),
|
||||
status: tokio::sync::broadcast::channel(1).0,
|
||||
consecutive_failures: AtomicUsize::new(0),
|
||||
generation,
|
||||
@@ -604,7 +608,7 @@ impl LayerInner {
|
||||
loop {
|
||||
let download = move || async move {
|
||||
// disable any scheduled but not yet running eviction deletions for this
|
||||
self.version.fetch_add(1, Ordering::Relaxed);
|
||||
let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
// no need to make the evict_and_wait wait for the actual download to complete
|
||||
drop(self.status.send(Status::Downloaded));
|
||||
@@ -655,6 +659,7 @@ impl LayerInner {
|
||||
let res = Arc::new(DownloadedLayer {
|
||||
owner: Arc::downgrade(self),
|
||||
kind: tokio::sync::OnceCell::default(),
|
||||
version: next_version,
|
||||
});
|
||||
|
||||
self.access_stats.record_residence_event(
|
||||
@@ -896,7 +901,7 @@ impl LayerInner {
|
||||
}
|
||||
|
||||
/// `DownloadedLayer` is being dropped, so it calls this method.
|
||||
fn on_downloaded_layer_drop(self: Arc<LayerInner>) {
|
||||
fn on_downloaded_layer_drop(self: Arc<LayerInner>, version: usize) {
|
||||
let gc = self.wanted_garbage_collected.load(Ordering::Acquire);
|
||||
let evict = self.wanted_evicted.load(Ordering::Acquire);
|
||||
let can_evict = self.have_remote_client;
|
||||
@@ -904,15 +909,16 @@ impl LayerInner {
|
||||
if gc {
|
||||
// do nothing now, only in LayerInner::drop
|
||||
} else if can_evict && evict {
|
||||
let version = self.version.load(Ordering::Relaxed);
|
||||
|
||||
let span = tracing::info_span!(parent: None, "layer_evict", tenant_id = %self.desc.tenant_id, timeline_id = %self.desc.timeline_id, layer=%self);
|
||||
let span = tracing::info_span!(parent: None, "layer_evict", tenant_id = %self.desc.tenant_id, timeline_id = %self.desc.timeline_id, layer=%self, %version);
|
||||
|
||||
// downgrade for queueing, in case there's a tear down already ongoing we should not
|
||||
// hold it alive.
|
||||
let this = Arc::downgrade(&self);
|
||||
drop(self);
|
||||
|
||||
// NOTE: this scope *must* never call `self.inner.get` because evict_and_wait might
|
||||
// drop while the `self.inner` is being locked, leading to a deadlock.
|
||||
|
||||
crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(move || {
|
||||
let _g = span.entered();
|
||||
|
||||
@@ -922,19 +928,15 @@ impl LayerInner {
|
||||
LAYER_IMPL_METRICS.inc_eviction_cancelled(EvictionCancelled::LayerGone);
|
||||
return;
|
||||
};
|
||||
this.evict_blocking(version);
|
||||
match this.evict_blocking(version) {
|
||||
Ok(()) => LAYER_IMPL_METRICS.inc_completed_evictions(),
|
||||
Err(reason) => LAYER_IMPL_METRICS.inc_eviction_cancelled(reason),
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn evict_blocking(&self, version: usize) {
|
||||
match self.evict_blocking0(version) {
|
||||
Ok(()) => LAYER_IMPL_METRICS.inc_completed_evictions(),
|
||||
Err(reason) => LAYER_IMPL_METRICS.inc_eviction_cancelled(reason),
|
||||
}
|
||||
}
|
||||
|
||||
fn evict_blocking0(&self, version: usize) -> Result<(), EvictionCancelled> {
|
||||
fn evict_blocking(&self, only_version: usize) -> Result<(), EvictionCancelled> {
|
||||
// deleted or detached timeline, don't do anything.
|
||||
let Some(timeline) = self.timeline.upgrade() else {
|
||||
return Err(EvictionCancelled::TimelineGone);
|
||||
@@ -945,32 +947,34 @@ impl LayerInner {
|
||||
let _permit = {
|
||||
let maybe_downloaded = self.inner.get();
|
||||
|
||||
if version != self.version.load(Ordering::Relaxed) {
|
||||
// downloadness-state has advanced, we might no longer be the latest eviction
|
||||
// work; don't do anything.
|
||||
//
|
||||
// this is possible to get to by having:
|
||||
//
|
||||
// 1. wanted_evicted.store(true)
|
||||
// 2. ResidentOrWantedEvicted::downgrade
|
||||
// 3. DownloadedLayer::drop
|
||||
// 4. LayerInner::get_or_maybe_download
|
||||
// 5. LayerInner::evict_blocking
|
||||
return Err(EvictionCancelled::VersionCheckFailed);
|
||||
}
|
||||
|
||||
// free the DownloadedLayer allocation
|
||||
match maybe_downloaded.map(|mut g| g.take_and_deinit()) {
|
||||
Some((taken, permit)) => {
|
||||
assert!(matches!(taken, ResidentOrWantedEvicted::WantedEvicted(_)));
|
||||
permit
|
||||
let (_weak, permit) = match maybe_downloaded {
|
||||
Some(mut guard) => {
|
||||
if let ResidentOrWantedEvicted::WantedEvicted(_weak, version) = &*guard {
|
||||
if *version == only_version {
|
||||
guard.take_and_deinit()
|
||||
} else {
|
||||
// this was not for us; maybe there's another eviction job
|
||||
// TODO: does it make any sense to stall here? unique versions do not
|
||||
// matter, we only want to make sure not to evict a resident, which we
|
||||
// are not doing.
|
||||
return Err(EvictionCancelled::VersionCheckFailed);
|
||||
}
|
||||
} else {
|
||||
return Err(EvictionCancelled::AlreadyReinitialized);
|
||||
}
|
||||
}
|
||||
None => {
|
||||
unreachable!("we do the version checking for this exact reason")
|
||||
// already deinitialized, perhaps get_or_maybe_download did this and is
|
||||
// currently waiting to reinitialize it
|
||||
return Err(EvictionCancelled::LostToDownload);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
permit
|
||||
};
|
||||
|
||||
// now accesses to inner.get_or_init wait on the semaphore or the `_permit`
|
||||
|
||||
self.access_stats.record_residence_event(
|
||||
LayerResidenceStatus::Evicted,
|
||||
LayerResidenceEventReason::ResidenceChange,
|
||||
@@ -1086,6 +1090,7 @@ impl std::fmt::Display for NeedsDownload {
|
||||
pub(crate) struct DownloadedLayer {
|
||||
owner: Weak<LayerInner>,
|
||||
kind: tokio::sync::OnceCell<anyhow::Result<LayerKind>>,
|
||||
version: usize,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for DownloadedLayer {
|
||||
@@ -1093,6 +1098,7 @@ impl std::fmt::Debug for DownloadedLayer {
|
||||
f.debug_struct("DownloadedLayer")
|
||||
// owner omitted because it is always "Weak"
|
||||
.field("kind", &self.kind)
|
||||
.field("version", &self.version)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
@@ -1100,7 +1106,7 @@ impl std::fmt::Debug for DownloadedLayer {
|
||||
impl Drop for DownloadedLayer {
|
||||
fn drop(&mut self) {
|
||||
if let Some(owner) = self.owner.upgrade() {
|
||||
owner.on_downloaded_layer_drop();
|
||||
owner.on_downloaded_layer_drop(self.version);
|
||||
} else {
|
||||
// no need to do anything, we are shutting down
|
||||
}
|
||||
@@ -1458,6 +1464,9 @@ enum EvictionCancelled {
|
||||
VersionCheckFailed,
|
||||
FileNotFound,
|
||||
RemoveFailed,
|
||||
AlreadyReinitialized,
|
||||
/// Not evicted because of a pending reinitialization
|
||||
LostToDownload,
|
||||
}
|
||||
|
||||
impl EvictionCancelled {
|
||||
@@ -1468,6 +1477,8 @@ impl EvictionCancelled {
|
||||
EvictionCancelled::VersionCheckFailed => "version_check_fail",
|
||||
EvictionCancelled::FileNotFound => "file_not_found",
|
||||
EvictionCancelled::RemoveFailed => "remove_failed",
|
||||
EvictionCancelled::AlreadyReinitialized => "already_reinitialized",
|
||||
EvictionCancelled::LostToDownload => "lost_to_download",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user