layere: rewrite to heavier_once_cell

Joonas Koivunen
2023-08-21 21:04:14 +03:00
parent c4cdf747f8
commit a0f29853b3


@@ -358,16 +358,13 @@ pub(crate) struct LayerE {
desc: PersistentLayerDesc,
/// Should this be weak? This is probably a runtime cycle which leaks Timelines on
/// detaches.
timeline: Weak<Timeline>,
access_stats: LayerAccessStats,
/// This is a mutex, because we want to be able to
/// - `Option::take(&mut self)` to drop the Arc allocation
/// - `ResidentDeltaLayer::downgrade(&mut self)`
inner: tokio::sync::Mutex<Option<ResidentOrWantedEvicted>>,
/// This custom OnceCell is backed by a std mutex, which is only held for short periods.
/// Initialization and deinitialization are done while holding a permit.
inner: heavier_once_cell::OnceCell<ResidentOrWantedEvicted>,
/// Do we want to garbage collect this when `LayerE` is dropped, where garbage collection
/// means:
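For context (an editor's sketch, not code from this commit): judging by the call sites in this diff, `get()`, `get_or_init()`, `Default`, and a guard whose `take_and_deinit()` hands back a permit, the `heavier_once_cell::OnceCell` has roughly the following shape. All names and signatures below are assumptions inferred from usage.

```rust
use std::sync::{Arc, Mutex, MutexGuard};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

/// Sketch of a `heavier_once_cell`-style cell: reads go through a std
/// mutex held only briefly, while (de)initialization is serialized by a
/// semaphore permit so only one init can run at a time.
pub struct OnceCell<T> {
    value: Mutex<Option<T>>,
    init_semaphore: Arc<Semaphore>,
}

impl<T> Default for OnceCell<T> {
    fn default() -> Self {
        Self {
            value: Mutex::new(None),
            init_semaphore: Arc::new(Semaphore::new(1)),
        }
    }
}

impl<T> OnceCell<T> {
    /// Creates an already-initialized cell.
    pub fn new(value: T) -> Self {
        Self {
            value: Mutex::new(Some(value)),
            init_semaphore: Arc::new(Semaphore::new(1)),
        }
    }

    /// Returns a guard over the value if the cell is initialized.
    pub fn get(&self) -> Option<Guard<'_, T>> {
        let locked = self.value.lock().unwrap();
        if locked.is_some() {
            Some(Guard { locked, cell: self })
        } else {
            None
        }
    }

    /// Runs `init` under the permit unless a value already exists.
    pub async fn get_or_init<F, Fut, E>(&self, init: F) -> Result<Guard<'_, T>, E>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<T, E>>,
    {
        let permit = self.init_semaphore.clone().acquire_owned().await.unwrap();
        // re-check under the permit: a racing caller may have initialized
        // (or a racing eviction may have deinitialized) in the meantime
        if self.value.lock().unwrap().is_none() {
            let new_value = init().await?;
            *self.value.lock().unwrap() = Some(new_value);
        }
        drop(permit);
        Ok(self.get().expect("initialized while holding the permit"))
    }
}

pub struct Guard<'a, T> {
    locked: MutexGuard<'a, Option<T>>,
    cell: &'a OnceCell<T>,
}

impl<T> std::ops::Deref for Guard<'_, T> {
    type Target = T;
    fn deref(&self) -> &T {
        self.locked.as_ref().expect("guard exists only when Some")
    }
}

impl<T> std::ops::DerefMut for Guard<'_, T> {
    fn deref_mut(&mut self) -> &mut T {
        self.locked.as_mut().expect("guard exists only when Some")
    }
}

impl<T> Guard<'_, T> {
    /// Takes the value out and returns it together with the init permit;
    /// holding the permit blocks re-initialization until it is dropped.
    pub fn take_and_deinit(&mut self) -> (T, OwnedSemaphorePermit) {
        let permit = self
            .cell
            .init_semaphore
            .clone()
            .try_acquire_owned()
            .expect("sketch simplification: no concurrent init holds the permit");
        let value = self.locked.take().expect("guard exists only when Some");
        (value, permit)
    }
}
```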
@@ -387,13 +384,19 @@ pub(crate) struct LayerE {
/// Version is to make sure we will in fact only evict a file if no new guard has been created
/// for it.
version: AtomicUsize,
have_remote_client: bool,
/// Allow subscribing to when the layer actually gets evicted.
///
/// This might never come unless eviction is called periodically.
#[cfg(test)]
evicted: tokio::sync::Notify,
status: tokio::sync::broadcast::Sender<Status>,
}
#[derive(Debug, Clone, Copy)]
enum Status {
Evicted,
Downloaded,
}
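A note on the new `status` field (editor's illustration, not part of the commit): `tokio::sync::broadcast::channel(1)` retains only the most recent transition per subscriber, so a slow receiver observes `RecvError::Lagged` instead of an unbounded backlog, and `drop(tx.send(..))` deliberately ignores the "no receivers" error. A self-contained demonstration:

```rust
use tokio::sync::broadcast::{self, error::RecvError};

#[derive(Debug, Clone, Copy)]
enum Status {
    Evicted,
    Downloaded,
}

#[tokio::main]
async fn main() {
    // capacity 1: only the most recent transition is retained per
    // subscriber; a slow receiver sees `Lagged` instead of a backlog
    let (tx, mut rx) = broadcast::channel(1);

    // `.ok()` mirrors the diff's `drop(self.status.send(..))`: sending
    // with no receivers is an error that is deliberately ignored
    tx.send(Status::Downloaded).ok();
    tx.send(Status::Evicted).ok();

    // the first message was overwritten; recv reports how many we missed
    assert!(matches!(rx.recv().await, Err(RecvError::Lagged(1))));
    // the retained, most recent transition is still delivered
    assert!(matches!(rx.recv().await, Ok(Status::Evicted)));
}
```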
impl std::fmt::Display for LayerE {
@@ -423,6 +426,7 @@ impl Drop for LayerE {
return;
}
// TODO: spawn_blocking?
let span = tracing::info_span!(parent: None, "layer_drop", tenant_id = %self.layer_desc().tenant_id, timeline_id = %self.layer_desc().timeline_id, layer = %self);
// SEMITODO: yes, this is sync, could spawn as well..
@@ -481,8 +485,7 @@ impl LayerE {
wanted_evicted: AtomicBool::new(false),
inner: Default::default(),
version: AtomicUsize::new(0),
#[cfg(test)]
evicted: tokio::sync::Notify::default(),
status: tokio::sync::broadcast::channel(1).0,
}
}
@@ -512,10 +515,9 @@ impl LayerE {
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
wanted_garbage_collected: AtomicBool::new(false),
wanted_evicted: AtomicBool::new(false),
inner: tokio::sync::Mutex::new(Some(ResidentOrWantedEvicted::Resident(inner))),
inner: heavier_once_cell::OnceCell::new(ResidentOrWantedEvicted::Resident(inner)),
version: AtomicUsize::new(0),
#[cfg(test)]
evicted: tokio::sync::Notify::default(),
status: tokio::sync::broadcast::channel(1).0,
}
});
@@ -551,33 +553,55 @@ impl LayerE {
);
self.wanted_evicted.store(true, Ordering::Release);
let Ok(mut guard) = self.inner.try_lock() else {
let Some(guard) = self.inner.get() else {
// we don't need to wait around if there is a download ongoing, because that might reset the wanted_evicted
// however it's also possible that we are present and just accessed by someone else.
return Ok(false);
return Err(super::timeline::EvictionError::NotFound);
};
if let Some(either) = guard.as_mut() {
// now, this might immediately cause the drop fn to run, but that'll only act in the
// background
let weak = either.downgrade();
let right_away = weak.upgrade().is_none();
Ok(right_away)
} else {
// already evicted; the wanted_evicted will be reset by next download
Err(super::timeline::EvictionError::FileNotFound)
}
// now, this might immediately cause the drop fn to run, but that'll only act in the
// background
Ok(
Self::get_or_apply_evictedness(Some(guard), &self.wanted_evicted)
.map(|_strong| false)
.unwrap_or(true),
)
}
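The downgrade-then-upgrade probe used above can be puzzling at first; it is plain standard-library `Arc`/`Weak` mechanics (editor's demonstration):

```rust
use std::sync::{Arc, Weak};

fn main() {
    let strong = Arc::new(String::from("resident layer data"));

    // downgrade: keep only a weak reference, then give up our strong one
    let weak: Weak<String> = Arc::downgrade(&strong);
    drop(strong);

    // if the upgrade fails, the value was dropped the moment the last
    // strong reference went away: eviction took effect "right away";
    // otherwise some other holder is still keeping the value alive
    let right_away = weak.upgrade().is_none();
    assert!(right_away);
}
```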
#[cfg(test)]
pub(crate) fn wait_evicted(&self) -> impl std::future::Future<Output = ()> + '_ {
// for this to actually be useful, we must first be able to check some status, otherwise
// we could wait here for the next eviction.
//
// states => (resident wanted_evicted evicted|wanted_evicted evicted resident)* wanted_garbage_collected? dropped
self.evicted.notified()
pub(crate) async fn evict_and_wait(&self) -> Result<(), super::timeline::EvictionError> {
use tokio::sync::broadcast::error::RecvError;
self.wanted_evicted.store(true, Ordering::Release);
let mut rx = self.status.subscribe();
// why call get instead of looking at the status channel? because get will downgrade any
// Arc<_> it finds, since we set wanted_evicted
if self.get().is_none() {
// it was not evictable in the first place
// our store to the wanted_evicted does not matter; it will be reset by next download
return Err(super::timeline::EvictionError::NotFound);
}
match rx.recv().await {
Ok(Status::Evicted) => Ok(()),
Ok(Status::Downloaded) => Err(super::timeline::EvictionError::Downloaded),
Err(RecvError::Closed) => {
unreachable!("sender cannot be dropped while we are in &self method")
}
Err(RecvError::Lagged(_)) => {
// this is quite unlikely, but we are blocking a lot in the async context, so
// we might be missing this because we are stuck on a LIFO slot on a thread
// which is busy blocking for a 1TB database create_image_layers.
//
// use the current state, however late (compared to when eviction was first
// requested), as the "outcome" now
match self.get() {
Some(_) => Err(super::timeline::EvictionError::Downloaded),
None => Ok(()),
}
}
}
}
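The ordering in `evict_and_wait` matters: subscribe first, then sample the current state, then `recv()`, so no transition can slip through between the check and the wait; the `Lagged` arm falls back to re-sampling. A distilled sketch of that pattern with a hypothetical `currently_resident` closure (names assumed, not from this commit):

```rust
use std::time::Duration;
use tokio::sync::broadcast::{self, error::RecvError};

#[derive(Debug, Clone, Copy)]
enum Status {
    Evicted,
    Downloaded,
}

/// Hypothetical helper distilling `evict_and_wait`: subscribing before
/// sampling the state means every later transition is either delivered
/// or reported as `Lagged`, so nothing can slip through the gap.
async fn wait_for_eviction(
    status: &broadcast::Sender<Status>,
    currently_resident: impl Fn() -> bool,
) -> Result<(), &'static str> {
    let mut rx = status.subscribe();
    if !currently_resident() {
        return Err("not found: nothing to evict");
    }
    match rx.recv().await {
        Ok(Status::Evicted) => Ok(()),
        Ok(Status::Downloaded) => Err("layer was downloaded instead"),
        Err(RecvError::Closed) => Err("layer was dropped"),
        // we missed transitions: fall back to sampling the current state
        Err(RecvError::Lagged(_)) => {
            if currently_resident() {
                Err("layer was downloaded instead")
            } else {
                Ok(())
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, _keepalive) = broadcast::channel(1);
    let sender = tx.clone();
    tokio::spawn(async move {
        // small delay so the waiter below has subscribed (demo only)
        tokio::time::sleep(Duration::from_millis(10)).await;
        drop(sender.send(Status::Evicted));
    });
    assert_eq!(wait_for_eviction(&tx, || true).await, Ok(()));
}
```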
/// Delete the layer file when the `self` gets dropped, also schedule a remote index upload
@@ -621,7 +645,6 @@ impl LayerE {
) -> anyhow::Result<ResidentLayer> {
let downloaded = if !allow_download {
self.get()
.await
.ok_or_else(|| anyhow::anyhow!("layer {self} is not downloaded"))
} else {
self.get_or_download(None).await
@@ -633,20 +656,18 @@ impl LayerE {
})
}
async fn get(&self) -> Option<Arc<DownloadedLayer>> {
let mut locked = self.inner.lock().await;
fn get(&self) -> Option<Arc<DownloadedLayer>> {
let locked = self.inner.get();
Self::get_or_apply_evictedness(&mut locked, &self.wanted_evicted)
Self::get_or_apply_evictedness(locked, &self.wanted_evicted)
}
fn get_or_apply_evictedness(
guard: &mut tokio::sync::MutexGuard<'_, Option<ResidentOrWantedEvicted>>,
guard: Option<heavier_once_cell::Guard<'_, ResidentOrWantedEvicted>>,
wanted_evicted: &AtomicBool,
) -> Option<Arc<DownloadedLayer>> {
if let Some(x) = &mut **guard {
let ret = x.get();
if let Some(won) = ret {
if let Some(mut x) = guard {
if let Some(won) = x.get() {
// there are no guarantees that we will always get to observe a concurrent call
// to evict
if wanted_evicted.load(Ordering::Acquire) {
@@ -664,96 +685,92 @@ impl LayerE {
self: &Arc<Self>,
ctx: Option<&RequestContext>,
) -> anyhow::Result<Arc<DownloadedLayer>> {
let mut locked = self.inner.lock().await;
let download = move || async move {
// disable any scheduled but not yet running eviction deletions for this
self.version.fetch_add(1, Ordering::Relaxed);
if let Some(strong) = Self::get_or_apply_evictedness(&mut locked, &self.wanted_evicted) {
return Ok(strong);
}
// what to do if we have a concurrent eviction request when we are downloading? eviction
// APIs use ResidentLayer, so evict could be moved there, or we just reset the state here.
self.wanted_evicted.store(false, Ordering::Release);
if let Some(ctx) = ctx {
use crate::context::DownloadBehavior::*;
let b = ctx.download_behavior();
match b {
Download => {}
Warn | Error => {
warn!(
"unexpectedly on-demand downloading remote layer {self} for task kind {:?}",
ctx.task_kind()
);
crate::metrics::UNEXPECTED_ONDEMAND_DOWNLOADS.inc();
// no need to make the evict_and_wait wait for the actual download to complete
drop(self.status.send(Status::Downloaded));
let really_error = matches!(b, Error)
&& !self.conf.ondemand_download_behavior_treat_error_as_warn;
// drop the old one; at most it held the weak, or it had never been initialized
// locked.take();
if really_error {
// originally this returned
// return Err(PageReconstructError::NeedsDownload(
// TenantTimelineId::new(self.tenant_id, self.timeline_id),
// remote_layer.filename(),
// ))
//
// this check is only probabilistic, seems like a flakiness footgun
anyhow::bail!("refusing to download layer {self} due to RequestContext")
// technically the mutex could be dropped here.
let Some(timeline) = self.timeline.upgrade() else { anyhow::bail!("timeline has gone already") };
let task_name = format!("download layer {}", self);
let can_ever_evict = timeline.remote_client.as_ref().is_some();
let needs_download = self
.needs_download()
.await
.context("check if layer file is present")?;
if let Some(reason) = needs_download {
if !can_ever_evict {
anyhow::bail!("refusing to attempt downloading {self} because no remote timeline client, reason: {reason}")
};
if self.wanted_garbage_collected.load(Ordering::Acquire) {
// it will fail because we should have already scheduled a delete and an
// index update
tracing::info!(%reason, "downloading a wanted garbage collected layer, this might fail");
// FIXME: we probably do not gc delete until the file goes away...? unsure
} else {
tracing::debug!(%reason, "downloading layer");
}
if let Some(ctx) = ctx {
use crate::context::DownloadBehavior::*;
let b = ctx.download_behavior();
match b {
Download => {}
Warn | Error => {
warn!(
"unexpectedly on-demand downloading remote layer {self} for task kind {:?}",
ctx.task_kind()
);
crate::metrics::UNEXPECTED_ONDEMAND_DOWNLOADS.inc();
let really_error = matches!(b, Error)
&& !self.conf.ondemand_download_behavior_treat_error_as_warn;
if really_error {
// originally this returned
// return Err(PageReconstructError::NeedsDownload(
// TenantTimelineId::new(self.tenant_id, self.timeline_id),
// remote_layer.filename(),
// ))
//
// this check is only probabilistic, seems like a flakiness footgun
anyhow::bail!("refusing to download layer {self} due to RequestContext")
}
}
}
}
}
}
// disable any scheduled but not yet running eviction deletions for this
self.version.fetch_add(1, Ordering::Relaxed);
// what to do if we have a concurrent eviction request when we are downloading? eviction
// APIs use ResidentLayer, so evict could be moved there, or we just reset the state here.
self.wanted_evicted.store(false, Ordering::Release);
// drop the old one; we only held the weak, or it had never been initialized
locked.take();
// technically the mutex could be dropped here and it does seem extra not to have Option
// here
let Some(timeline) = self.timeline.upgrade() else { anyhow::bail!("timeline has gone already") };
let task_name = format!("download layer {}", self);
let can_ever_evict = timeline.remote_client.as_ref().is_some();
let needs_download = self
.needs_download()
.await
.context("check if layer file is present")?;
if let Some(reason) = needs_download {
if !can_ever_evict {
anyhow::bail!("refusing to attempt downloading {self} because no remote timeline client, reason: {reason}")
};
if self.wanted_garbage_collected.load(Ordering::Acquire) {
// it will fail because we should have already scheduled a delete and an
// index update
tracing::info!(%reason, "downloading a wanted garbage collected layer, this might fail");
// FIXME: we probably do not gc delete until the file goes away...? unsure
} else {
tracing::debug!(%reason, "downloading layer");
}
let (tx, rx) = tokio::sync::oneshot::channel();
// this is sadly needed because of task_mgr::shutdown_tasks, otherwise we cannot
// block tenant::mgr::remove_tenant_from_memory.
let this = self.clone();
crate::task_mgr::spawn(
&tokio::runtime::Handle::current(),
TaskKind::RemoteDownloadTask,
Some(self.desc.tenant_id),
Some(self.desc.timeline_id),
&task_name,
false,
async move {
let client = timeline
.remote_client
.as_ref()
.expect("checked above with can_ever_evict");
let result = client
let (tx, rx) = tokio::sync::oneshot::channel();
// this is sadly needed because of task_mgr::shutdown_tasks, otherwise we cannot
// block tenant::mgr::remove_tenant_from_memory.
let this = self.clone();
crate::task_mgr::spawn(
&tokio::runtime::Handle::current(),
TaskKind::RemoteDownloadTask,
Some(self.desc.tenant_id),
Some(self.desc.timeline_id),
&task_name,
false,
async move {
let client = timeline
.remote_client
.as_ref()
.expect("checked above with can_ever_evict");
let result = client
.download_layer_file(
&this.desc.filename(),
&crate::tenant::remote_timeline_client::index::LayerFileMetadata::new(
@@ -762,64 +779,70 @@ impl LayerE {
)
.await;
match result {
Ok(size) => {
timeline.metrics.resident_physical_size_gauge.add(size);
let _ = tx.send(());
}
Err(e) => {
// TODO: the temp file might still be around, metrics might be off
tracing::error!("layer file download failed: {e:?}",);
match result {
Ok(size) => {
timeline.metrics.resident_physical_size_gauge.add(size);
let _ = tx.send(());
}
Err(e) => {
// TODO: the temp file might still be around, metrics might be off
tracing::error!("layer file download failed: {e:?}",);
}
}
Ok(())
}
Ok(())
.in_current_span(),
);
if rx.await.is_err() {
return Err(anyhow::anyhow!("downloading failed, possibly due to shutdown"));
}
.in_current_span(),
);
if rx.await.is_err() {
return Err(anyhow::anyhow!("downloading failed, possibly due to shutdown"));
// FIXME: we need backoff here so we never spiral into a download loop
anyhow::ensure!(
self.needs_download()
.await
.context("test if downloading is still needed")?
.is_none(),
"post-condition for downloading: no longer needs downloading"
);
} else {
// the file is present locally and we could even be running without remote
// storage
}
// FIXME: we need backoff here so we never spiral into a download loop
anyhow::ensure!(
self.needs_download()
.await
.context("test if downloading is still needed")?
.is_none(),
"post-condition for downloading: no longer needs downloading"
);
} else {
// the file is present locally and we could even be running without remote
// storage
}
// the assumption is that we own the layer's residency; no operator should go in
// and delete random files. this would be evident when trying to access the file
// the Nth time (N>1) while having the VirtualFile evicted in between.
//
// we could support this by looping on NotFound from the layer access methods, but
// it's difficult to implement this so that the operator does not delete
// not-yet-uploaded files.
let res = Arc::new(DownloadedLayer {
owner: Arc::downgrade(self),
kind: tokio::sync::OnceCell::default(),
});
*locked = Some(if self.wanted_evicted.load(Ordering::Acquire) {
// because we reset wanted_evicted near the beginning, this means that while we were
// downloading, someone wanted to evict this layer.
// the assumption is that we own the layer's residency; no operator should go in
// and delete random files. this would be evident when trying to access the file
// the Nth time (N>1) while having the VirtualFile evicted in between.
//
// perhaps evicting should only be possible via ResidentLayer because this makes my head
// spin. the caller of this function will still get the proper `Arc<DownloadedLayer>`.
//
// the risk is that eviction becomes too flaky.
ResidentOrWantedEvicted::WantedEvicted(Arc::downgrade(&res))
} else {
ResidentOrWantedEvicted::Resident(res.clone())
});
// we could support this by looping on NotFound from the layer access methods, but
// it's difficult to implement this so that the operator does not delete
// not-yet-uploaded files.
Ok(res)
let res = Arc::new(DownloadedLayer {
owner: Arc::downgrade(self),
kind: tokio::sync::OnceCell::default(),
});
Ok(if self.wanted_evicted.load(Ordering::Acquire) {
// because we reset wanted_evicted near the beginning, this means that while we were
// downloading, someone wanted to evict this layer.
//
// perhaps evicting should only be possible via ResidentLayer because this makes my head
// spin. the caller of this function will still get the proper `Arc<DownloadedLayer>`.
//
// the risk is that eviction becomes too flaky.
ResidentOrWantedEvicted::WantedEvicted(Arc::downgrade(&res))
} else {
ResidentOrWantedEvicted::Resident(res.clone())
})
};
let locked = self.inner.get_or_init(download).await?;
Ok(
Self::get_or_apply_evictedness(Some(locked), &self.wanted_evicted)
.expect("It is not none, we just received it"),
)
}
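The net effect of routing `get_or_download` through `get_or_init` is single-flight: concurrent callers race, exactly one executes the download closure, and the rest wait for its result. The same property can be seen with the stock `tokio::sync::OnceCell` (editor's stand-in; the commit's cell additionally supports deinitialization):

```rust
use std::sync::Arc;
use tokio::sync::OnceCell;

#[tokio::main]
async fn main() {
    let cell: Arc<OnceCell<String>> = Arc::new(OnceCell::new());

    let mut tasks = Vec::new();
    for i in 0..4 {
        let cell = cell.clone();
        tasks.push(tokio::spawn(async move {
            cell.get_or_init(|| async move {
                // exactly one of the four closures ever executes; the
                // other callers wait and then share the stored value
                println!("task {i} performs the download");
                String::from("downloaded layer")
            })
            .await
            .clone()
        }));
    }
    for task in tasks {
        assert_eq!(task.await.unwrap(), "downloaded layer");
    }
}
```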
pub(crate) fn local_path(&self) -> &std::path::Path {
@@ -902,7 +925,7 @@ impl LayerE {
let can_evict = self.have_remote_client;
if gc {
// do nothing now, only when the whole layer is dropped. gc will end up dropping the
// do nothing now, only when the whole layer is dropped. gc will end up deleting the
// whole layer, in case there is no reference cycle.
} else if can_evict && evict {
// we can remove this right now, but ... we really should not block or do anything.
@@ -926,22 +949,37 @@ impl LayerE {
// deleted or detached timeline, don't do anything.
let Some(timeline) = this.timeline.upgrade() else { return; };
let mut guard = this.inner.lock().await;
// relaxed ordering: we don't have any other atomics pending
if version != this.version.load(Ordering::Relaxed) {
// download state has advanced, we might no longer be the latest eviction
// work; don't do anything.
return;
}
// to avoid starting a new download while we evict, keep holding on to the
// permit. note that we will not close the semaphore when done, because it will
// be used by the re-download.
let _permit = {
let maybe_downloaded = this.inner.get();
// relaxed ordering: we don't have any other atomics pending
if version != this.version.load(Ordering::Relaxed) {
// download state has advanced, we might no longer be the latest eviction
// work; don't do anything.
return;
}
// free the DownloadedLayer allocation
let taken = guard.take();
assert!(matches!(taken, None | Some(ResidentOrWantedEvicted::WantedEvicted(_))), "this is what the version is supposed to guard against but we could just undo it and remove version?");
// free the DownloadedLayer allocation
match maybe_downloaded.map(|mut g| g.take_and_deinit()) {
Some((taken, permit)) => {
assert!(matches!(taken, ResidentOrWantedEvicted::WantedEvicted(_)));
permit
}
None => {
unreachable!("we do the version checking for this exact reason")
}
}
};
if !this.wanted_evicted.load(Ordering::Acquire) {
// if there's already interest, should we just early exit? this is not
// currently *cleared* on interest, maybe it shouldn't?
// FIXME: wanted_evicted cannot be unset right now
//
// NOTE: us holding the permit prevents a new round of download happening
// right now
return;
}
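The `_permit` dance above is the crux of the rewrite: `take_and_deinit` returns the initialization permit together with the value, and holding it across the file deletion means a concurrent `get_or_download` parks inside `get_or_init` instead of racing the eviction. Reduced to a bare semaphore (editor's sketch):

```rust
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // one permit plays the role of heavier_once_cell's init permit
    let init_permit = Arc::new(Semaphore::new(1));

    let evict = {
        let sem = init_permit.clone();
        tokio::spawn(async move {
            let _permit = sem.acquire_owned().await.unwrap();
            // "delete the file" while holding the permit; a concurrent
            // download is parked in acquire() until this task returns
            tokio::time::sleep(Duration::from_millis(50)).await;
            println!("eviction finished");
        })
    };

    let download = {
        let sem = init_permit.clone();
        tokio::spawn(async move {
            // whichever task wins the permit runs alone; the loser can
            // only proceed once the winner has completely finished
            let _permit = sem.acquire_owned().await.unwrap();
            println!("download may run");
        })
    };

    evict.await.unwrap();
    download.await.unwrap();
}
```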
@@ -962,8 +1000,7 @@ impl LayerE {
let res = capture_mtime_and_delete.await;
#[cfg(test)]
this.evicted.notify_waiters();
drop(this.status.send(Status::Evicted));
match res {
Ok(Ok(local_layer_mtime)) => {
@@ -1048,12 +1085,8 @@ impl NeedsDownload {
/// or garbage collection happens.
#[derive(Clone)]
pub(crate) struct ResidentLayer {
// field order matters: we want the downloaded layer to be dropped before owner, so that ... at
// least this is how the code expects it right now. The added spawn carrying a weak should
// protect us, but it's theoretically possible for that spawn to keep the LayerE alive and
// evict before garbage_collect.
_downloaded: Arc<DownloadedLayer>,
owner: Arc<LayerE>,
_downloaded: Arc<DownloadedLayer>,
}
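The field-order comment is about Rust's drop order: struct fields are dropped in declaration order (RFC 1857), so swapping `owner` and `_downloaded` changes which one is torn down first. A minimal demonstration (editor's illustration):

```rust
struct Noisy(&'static str);

impl Drop for Noisy {
    fn drop(&mut self) {
        println!("dropping {}", self.0);
    }
}

// fields are dropped in declaration order, top to bottom
#[allow(dead_code)]
struct ResidentLayerLike {
    owner: Noisy,
    _downloaded: Noisy,
}

fn main() {
    let _layer = ResidentLayerLike {
        owner: Noisy("owner"),
        _downloaded: Noisy("_downloaded"),
    };
    // prints "dropping owner", then "dropping _downloaded"
}
```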
impl std::fmt::Display for ResidentLayer {
@@ -1111,6 +1144,16 @@ pub(crate) struct DownloadedLayer {
kind: tokio::sync::OnceCell<anyhow::Result<LayerKind>>,
}
impl std::fmt::Debug for DownloadedLayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DownloadedLayer")
// FIXME: this is not useful, always "Weak"
.field("owner", &self.owner)
.field("kind", &self.kind)
.finish()
}
}
impl Drop for DownloadedLayer {
fn drop(&mut self) {
if let Some(owner) = self.owner.upgrade() {
@@ -1123,42 +1166,38 @@ impl Drop for DownloadedLayer {
impl DownloadedLayer {
async fn get(&self) -> anyhow::Result<&LayerKind> {
self.kind
.get_or_init(|| async {
let Some(owner) = self.owner.upgrade() else {
let init = || async {
let Some(owner) = self.owner.upgrade() else {
anyhow::bail!("Cannot init, the layer has already been dropped");
};
// there is nothing async here, but it should be async
if owner.desc.is_delta {
let summary = Some(delta_layer::Summary::expected(
owner.desc.tenant_id,
owner.desc.timeline_id,
owner.desc.key_range.clone(),
owner.desc.lsn_range.clone(),
));
delta_layer::DeltaLayerInner::load(&owner.path, summary).map(LayerKind::Delta)
} else {
let lsn = owner.desc.image_layer_lsn();
let summary = Some(image_layer::Summary::expected(
owner.desc.tenant_id,
owner.desc.timeline_id,
owner.desc.key_range.clone(),
lsn,
));
image_layer::ImageLayerInner::load(&owner.path, lsn, summary)
.map(LayerKind::Image)
}
// this should be a permanent failure
.context("load layer")
})
.await
.as_ref()
.map_err(|e| {
// errors are not cloneable, so we can only stringify them
// test_broken_timeline matches this string
anyhow::anyhow!("layer loading failed: {e:#}")
})
// there is nothing async here, but it should be async
if owner.desc.is_delta {
let summary = Some(delta_layer::Summary::expected(
owner.desc.tenant_id,
owner.desc.timeline_id,
owner.desc.key_range.clone(),
owner.desc.lsn_range.clone(),
));
delta_layer::DeltaLayerInner::load(&owner.path, summary).map(LayerKind::Delta)
} else {
let lsn = owner.desc.image_layer_lsn();
let summary = Some(image_layer::Summary::expected(
owner.desc.tenant_id,
owner.desc.timeline_id,
owner.desc.key_range.clone(),
lsn,
));
image_layer::ImageLayerInner::load(&owner.path, lsn, summary).map(LayerKind::Image)
}
// this will be a permanent failure
.context("load layer")
};
self.kind.get_or_init(init).await.as_ref().map_err(|e| {
// errors are not cloneable, so we can only stringify them
// test_broken_timeline matches this string
anyhow::anyhow!("layer loading failed: {e:#}")
})
}
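Because `anyhow::Error` is not `Clone`, the `kind` cell stores the whole `Result` and every caller re-wraps the borrowed error by formatting it. A self-contained sketch of that caching pattern (hypothetical `Loader`, using the stock `tokio::sync::OnceCell`):

```rust
use anyhow::Context;
use tokio::sync::OnceCell;

struct Loader {
    // the cell caches the Result itself: a permanent failure is
    // computed once and then served to every caller
    kind: OnceCell<anyhow::Result<String>>,
}

impl Loader {
    async fn get(&self) -> anyhow::Result<&String> {
        self.kind
            .get_or_init(|| async {
                // stand-in for DeltaLayerInner/ImageLayerInner::load
                Err(anyhow::anyhow!("disk said no")).context("load layer")
            })
            .await
            .as_ref()
            // anyhow::Error is not Clone, so stringify the borrowed error
            .map_err(|e| anyhow::anyhow!("layer loading failed: {e:#}"))
    }
}

#[tokio::main]
async fn main() {
    let loader = Loader { kind: OnceCell::new() };
    // both calls observe the same cached failure
    assert!(loader.get().await.is_err());
    assert!(loader.get().await.is_err());
}
```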
async fn get_value_reconstruct_data(