From a0f29853b33ecd72615cd23cefe40e998f9088fd Mon Sep 17 00:00:00 2001
From: Joonas Koivunen
Date: Mon, 21 Aug 2023 21:04:14 +0300
Subject: [PATCH] layere: rewrite to heavier_once_cell

---
 pageserver/src/tenant/storage_layer.rs | 495 +++++++++++++------------
 1 file changed, 267 insertions(+), 228 deletions(-)

diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs
index 118e916422..f9d03acff8 100644
--- a/pageserver/src/tenant/storage_layer.rs
+++ b/pageserver/src/tenant/storage_layer.rs
@@ -358,16 +358,13 @@ pub(crate) struct LayerE {
     desc: PersistentLayerDesc,

-    /// Should this be weak? This is probably a runtime cycle which leaks Timelines on
-    /// detaches.
     timeline: Weak<Timeline>,

     access_stats: LayerAccessStats,

-    /// This is a mutex, because we want to be able to
-    /// - `Option::take(&mut self)` to drop the Arc allocation
-    /// - `ResidentDeltaLayer::downgrade(&mut self)`
-    inner: tokio::sync::Mutex<Option<ResidentOrWantedEvicted>>,
+    /// This custom OnceCell is backed by a std mutex, but it is only held for short periods.
+    /// Initialization and deinitialization are done while holding a permit.
+    inner: heavier_once_cell::OnceCell<ResidentOrWantedEvicted>,

     /// Do we want to garbage collect this when `LayerE` is dropped, where garbage collection
     /// means:
@@ -387,13 +384,19 @@ pub(crate) struct LayerE {
     /// Version is to make sure we will in fact only evict a file if no new guard has been created
     /// for it.
     version: AtomicUsize,
+
     have_remote_client: bool,

     /// Allow subscribing to when the layer actually gets evicted.
     ///
-    /// This might never come unless eviction called periodically.
-    #[cfg(test)]
-    evicted: tokio::sync::Notify,
+    /// This might never come unless eviction is called periodically.
+    status: tokio::sync::broadcast::Sender<Status>,
+}
+
+#[derive(Debug, Clone, Copy)]
+enum Status {
+    Evicted,
+    Downloaded,
 }

 impl std::fmt::Display for LayerE {
@@ -423,6 +426,7 @@ impl Drop for LayerE {
             return;
         }

+        // TODO: spawn_blocking?
         let span = tracing::info_span!(parent: None, "layer_drop", tenant_id = %self.layer_desc().tenant_id, timeline_id = %self.layer_desc().timeline_id, layer = %self);

         // SEMITODO: yes, this is sync, could spawn as well..
@@ -481,8 +485,7 @@ impl LayerE {
             wanted_evicted: AtomicBool::new(false),
             inner: Default::default(),
             version: AtomicUsize::new(0),
-            #[cfg(test)]
-            evicted: tokio::sync::Notify::default(),
+            status: tokio::sync::broadcast::channel(1).0,
         }
     }
@@ -512,10 +515,9 @@ impl LayerE {
             access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
             wanted_garbage_collected: AtomicBool::new(false),
             wanted_evicted: AtomicBool::new(false),
-            inner: tokio::sync::Mutex::new(Some(ResidentOrWantedEvicted::Resident(inner))),
+            inner: heavier_once_cell::OnceCell::new(ResidentOrWantedEvicted::Resident(inner)),
             version: AtomicUsize::new(0),
-            #[cfg(test)]
-            evicted: tokio::sync::Notify::default(),
+            status: tokio::sync::broadcast::channel(1).0,
         }
     });
@@ -551,33 +553,55 @@ impl LayerE {
         );
         self.wanted_evicted.store(true, Ordering::Release);

-        let Ok(mut guard) = self.inner.try_lock() else {
+        let Some(guard) = self.inner.get() else {
             // we don't need to wait around if there is a download ongoing, because that might reset the wanted_evicted
             // however it's also possible that we are present and just accessed by someone else.
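For review context: `heavier_once_cell` is not tokio's `OnceCell`. Going by the new doc comment on `inner` above and by the `get`, `get_or_init` and `take_and_deinit` calls later in this patch, its contract is roughly the sketch below. This is a hypothetical stand-in written against those call sites, not the actual utils implementation; the `Inner`/`Guard`/`InitPermit` names and the semaphore-swap on deinit are assumptions.

    // Hypothetical sketch only: assumed semantics of heavier_once_cell::OnceCell,
    // reconstructed from its uses in this patch. The real implementation may differ.
    use std::ops::{Deref, DerefMut};
    use std::sync::{Arc, Mutex, MutexGuard};
    use tokio::sync::Semaphore;

    pub struct OnceCell<T> {
        inner: Mutex<Inner<T>>,
    }

    struct Inner<T> {
        value: Option<T>,
        // replaced on deinit, so a waiting initializer can detect that it raced
        // with an eviction and must retry
        init_semaphore: Arc<Semaphore>,
    }

    impl<T> OnceCell<T> {
        pub fn new(value: T) -> Self {
            Self {
                inner: Mutex::new(Inner {
                    value: Some(value),
                    init_semaphore: Arc::new(Semaphore::new(1)),
                }),
            }
        }

        /// The std mutex is only held for the duration of the check, never across awaits.
        pub fn get(&self) -> Option<Guard<'_, T>> {
            let locked = self.inner.lock().unwrap();
            if locked.value.is_some() {
                Some(Guard(locked))
            } else {
                None
            }
        }

        /// Initialization runs while holding the single permit.
        pub async fn get_or_init<F, Fut>(&self, init: F) -> anyhow::Result<Guard<'_, T>>
        where
            F: FnOnce() -> Fut,
            Fut: std::future::Future<Output = anyhow::Result<T>>,
        {
            let mut init = Some(init);
            loop {
                let sem = {
                    let locked = self.inner.lock().unwrap();
                    if locked.value.is_some() {
                        return Ok(Guard(locked));
                    }
                    locked.init_semaphore.clone()
                };
                let _permit = sem.acquire().await.expect("semaphore is never closed");
                {
                    let locked = self.inner.lock().unwrap();
                    if !Arc::ptr_eq(&locked.init_semaphore, &sem) {
                        // a take_and_deinit swapped the semaphore while we waited
                        continue;
                    }
                    if locked.value.is_some() {
                        return Ok(Guard(locked));
                    }
                }
                // value is None and we hold the permit: we are the initializer
                let value = (init.take().expect("reached at most once"))().await?;
                let mut locked = self.inner.lock().unwrap();
                locked.value = Some(value);
                return Ok(Guard(locked));
            }
        }
    }

    pub struct Guard<'a, T>(MutexGuard<'a, Inner<T>>);

    impl<T> Deref for Guard<'_, T> {
        type Target = T;
        fn deref(&self) -> &T {
            self.0.value.as_ref().expect("guard exists only when initialized")
        }
    }

    impl<T> DerefMut for Guard<'_, T> {
        fn deref_mut(&mut self) -> &mut T {
            self.0.value.as_mut().expect("guard exists only when initialized")
        }
    }

    impl<T> Guard<'_, T> {
        /// Deinitialize; re-initialization stays blocked until the returned permit drops.
        pub fn take_and_deinit(&mut self) -> (T, InitPermit) {
            let value = self.0.value.take().expect("guard exists only when initialized");
            let sem = Arc::new(Semaphore::new(0));
            self.0.init_semaphore = sem.clone();
            (value, InitPermit(sem))
        }
    }

    pub struct InitPermit(Arc<Semaphore>);

    impl Drop for InitPermit {
        fn drop(&mut self) {
            // reopen initialization once the holder (e.g. eviction) is done
            self.0.add_permits(1);
        }
    }

Under these assumed semantics, the eviction path later in this patch can hold the permit while it deletes the file, so a concurrent get_or_download can only begin initializing after the file is actually gone.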
-            return Ok(false);
+            return Err(super::timeline::EvictionError::NotFound);
         };

-        if let Some(either) = guard.as_mut() {
-            // now, this might immediatedly cause the drop fn to run, but that'll only act on
-            // background
-            let weak = either.downgrade();
-
-            let right_away = weak.upgrade().is_none();
-
-            Ok(right_away)
-        } else {
-            // already evicted; the wanted_evicted will be reset by next download
-            Err(super::timeline::EvictionError::FileNotFound)
-        }
+        // now, this might immediately cause the drop fn to run, but that'll only act on
+        // background
+        Ok(
+            Self::get_or_apply_evictedness(Some(guard), &self.wanted_evicted)
+                .map(|_strong| false)
+                .unwrap_or(true),
+        )
     }

-    #[cfg(test)]
-    pub(crate) fn wait_evicted(&self) -> impl std::future::Future<Output = ()> + '_ {
-        // for this to be actually useful, we must be first able to check some status, otherwise
-        // we could wait here for next eviction.
-        //
-        // states => (resident wanted_evicted evicted|wanted_evicted evicted resident)* wanted_garbage_collected? dropped
-        self.evicted.notified()
+    pub(crate) async fn evict_and_wait(&self) -> Result<(), super::timeline::EvictionError> {
+        use tokio::sync::broadcast::error::RecvError;
+
+        self.wanted_evicted.store(true, Ordering::Release);
+
+        let mut rx = self.status.subscribe();
+
+        // why call get instead of looking at the watch? because get will downgrade any
+        // Arc<_> it finds, because we set the wanted_evicted
+        if self.get().is_none() {
+            // it was not evictable in the first place
+            // our store to the wanted_evicted does not matter; it will be reset by next download
+            return Err(super::timeline::EvictionError::NotFound);
+        }
+
+        match rx.recv().await {
+            Ok(Status::Evicted) => Ok(()),
+            Ok(Status::Downloaded) => Err(super::timeline::EvictionError::Downloaded),
+            Err(RecvError::Closed) => {
+                unreachable!("sender cannot be dropped while we are in &self method")
+            }
+            Err(RecvError::Lagged(_)) => {
+                // this is quite unlikely, but we are blocking a lot in the async context, so
+                // we might be missing this because we are stuck on a LIFO slot on a thread
+                // which is busy blocking for a 1TB database create_image_layers.
+                //
+                // use however late (compared to the initial expressing of wanted) as the
+                // "outcome" now
+                match self.get() {
+                    Some(_) => Err(super::timeline::EvictionError::Downloaded),
+                    None => Ok(()),
+                }
+            }
+        }
     }

     /// Delete the layer file when the `self` gets dropped, also schedule a remote index upload
@@ -621,7 +645,6 @@ impl LayerE {
     ) -> anyhow::Result<ResidentLayer> {
         let downloaded = if !allow_download {
             self.get()
-                .await
                 .ok_or_else(|| anyhow::anyhow!("layer {self} is not downloaded"))
         } else {
             self.get_or_download(None).await
         };

         })
     }

-    async fn get(&self) -> Option<Arc<DownloadedLayer>> {
-        let mut locked = self.inner.lock().await;
+    fn get(&self) -> Option<Arc<DownloadedLayer>> {
+        let locked = self.inner.get();

-        Self::get_or_apply_evictedness(&mut locked, &self.wanted_evicted)
+        Self::get_or_apply_evictedness(locked, &self.wanted_evicted)
     }

     fn get_or_apply_evictedness(
-        guard: &mut tokio::sync::MutexGuard<'_, Option<ResidentOrWantedEvicted>>,
+        guard: Option<heavier_once_cell::Guard<'_, ResidentOrWantedEvicted>>,
         wanted_evicted: &AtomicBool,
     ) -> Option<Arc<DownloadedLayer>> {
-        if let Some(x) = &mut **guard {
-            let ret = x.get();
-
-            if let Some(won) = ret {
+        if let Some(mut x) = guard {
+            if let Some(won) = x.get() {
                 // there are no guarantees that we will always get to observe a concurrent call
                 // to evict
                 if wanted_evicted.load(Ordering::Acquire) {
@@ -664,96 +685,92 @@ impl LayerE {
         self: &Arc<Self>,
         ctx: Option<&RequestContext>,
     ) -> anyhow::Result<Arc<DownloadedLayer>> {
-        let mut locked = self.inner.lock().await;
+        let download = move || async move {
+            // disable any scheduled but not yet running eviction deletions for this
+            self.version.fetch_add(1, Ordering::Relaxed);

-        if let Some(strong) = Self::get_or_apply_evictedness(&mut locked, &self.wanted_evicted) {
-            return Ok(strong);
-        }
+            // what to do if we have a concurrent eviction request when we are downloading? eviction
+            // api's use ResidentLayer, so evict could be moved there, or we just reset the state here.
+            self.wanted_evicted.store(false, Ordering::Release);

-        if let Some(ctx) = ctx {
-            use crate::context::DownloadBehavior::*;
-            let b = ctx.download_behavior();
-            match b {
-                Download => {}
-                Warn | Error => {
-                    warn!(
-                        "unexpectedly on-demand downloading remote layer {self} for task kind {:?}",
-                        ctx.task_kind()
-                    );
-                    crate::metrics::UNEXPECTED_ONDEMAND_DOWNLOADS.inc();
+            // no need to make the evict_and_wait wait for the actual download to complete
+            drop(self.status.send(Status::Downloaded));

-                    let really_error = matches!(b, Error)
-                        && !self.conf.ondemand_download_behavior_treat_error_as_warn;
+            // drop the old one, it at most held the weak or had not been initialized ever
+            // locked.take();

-                    if really_error {
-                        // originally this returned
-                        // return Err(PageReconstructError::NeedsDownload(
-                        //     TenantTimelineId::new(self.tenant_id, self.timeline_id),
-                        //     remote_layer.filename(),
-                        // ))
-                        //
-                        // this check is only probablistic, seems like flakyness footgun
-                        anyhow::bail!("refusing to download layer {self} due to RequestContext")
+            // technically the mutex could be dropped here.
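A note on `ResidentOrWantedEvicted`, which `get_or_apply_evictedness` above manipulates but whose definition is outside this hunk: inferred from the `get()` and `downgrade()` calls in this file, it is presumably a strong-or-weak holder along these lines. The sketch is an assumption for review purposes, not quoted from the patch, and `DownloadedLayer` is the struct defined later in this file.

    // Inferred sketch; the actual enum lives elsewhere in storage_layer.rs.
    use std::sync::{Arc, Weak};

    enum ResidentOrWantedEvicted {
        Resident(Arc<DownloadedLayer>),
        WantedEvicted(Weak<DownloadedLayer>),
    }

    impl ResidentOrWantedEvicted {
        /// Strong reference if the value is still alive in either form.
        fn get(&mut self) -> Option<Arc<DownloadedLayer>> {
            match self {
                ResidentOrWantedEvicted::Resident(strong) => Some(strong.clone()),
                ResidentOrWantedEvicted::WantedEvicted(weak) => weak.upgrade(),
            }
        }

        /// Swap Resident out for a Weak in place; this may release the last
        /// strong reference and immediately run DownloadedLayer::drop.
        fn downgrade(&mut self) -> Weak<DownloadedLayer> {
            match self {
                ResidentOrWantedEvicted::Resident(strong) => {
                    let weak = Arc::downgrade(strong);
                    *self = ResidentOrWantedEvicted::WantedEvicted(weak.clone());
                    weak
                }
                ResidentOrWantedEvicted::WantedEvicted(weak) => weak.clone(),
            }
        }
    }

This shape explains the "might immediately cause the drop fn to run" comments: when `wanted_evicted` is set, downgrading the stored variant can release the last strong `Arc`, so the allocation dies as soon as the final accessor finishes.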
+            let Some(timeline) = self.timeline.upgrade() else { anyhow::bail!("timeline has gone already") };
+
+            let task_name = format!("download layer {}", self);
+
+            let can_ever_evict = timeline.remote_client.as_ref().is_some();
+
+            let needs_download = self
+                .needs_download()
+                .await
+                .context("check if layer file is present")?;
+
+            if let Some(reason) = needs_download {
+                if !can_ever_evict {
+                    anyhow::bail!("refusing to attempt downloading {self} because no remote timeline client, reason: {reason}")
+                };
+
+                if self.wanted_garbage_collected.load(Ordering::Acquire) {
+                    // it will fail because we should have already scheduled a delete and an
+                    // index update
+                    tracing::info!(%reason, "downloading a wanted garbage collected layer, this might fail");
+                    // FIXME: we probably do not gc delete until the file goes away...? unsure
+                } else {
+                    tracing::debug!(%reason, "downloading layer");
+                }
+
+                if let Some(ctx) = ctx {
+                    use crate::context::DownloadBehavior::*;
+                    let b = ctx.download_behavior();
+                    match b {
+                        Download => {}
+                        Warn | Error => {
+                            warn!(
+                                "unexpectedly on-demand downloading remote layer {self} for task kind {:?}",
+                                ctx.task_kind()
+                            );
+                            crate::metrics::UNEXPECTED_ONDEMAND_DOWNLOADS.inc();
+
+                            let really_error = matches!(b, Error)
+                                && !self.conf.ondemand_download_behavior_treat_error_as_warn;
+
+                            if really_error {
+                                // originally this returned
+                                // return Err(PageReconstructError::NeedsDownload(
+                                //     TenantTimelineId::new(self.tenant_id, self.timeline_id),
+                                //     remote_layer.filename(),
+                                // ))
+                                //
+                                // this check is only probabilistic, seems like a flakiness footgun
+                                anyhow::bail!("refusing to download layer {self} due to RequestContext")
+                            }
+                        }
+                    }
+                }
-                    }
-                }
-            }
-        }

-        // disable any scheduled but not yet running eviction deletions for this
-        self.version.fetch_add(1, Ordering::Relaxed);
-
-        // what to do if we have a concurrent eviction request when we are downloading? eviction
-        // api's use ResidentLayer, so evict could be moved there, or we just reset the state here.
-        self.wanted_evicted.store(false, Ordering::Release);
-
-        // drop the old one, we only held the weak or it was had not been initialized ever
-        locked.take();
-
-        // technically the mutex could be dropped here and it does seem extra not to have Option
-        // here
-
-        let Some(timeline) = self.timeline.upgrade() else { anyhow::bail!("timeline has gone already") };
-
-        let task_name = format!("download layer {}", self);
-
-        let can_ever_evict = timeline.remote_client.as_ref().is_some();
-
-        let needs_download = self
-            .needs_download()
-            .await
-            .context("check if layer file is present")?;
-
-        if let Some(reason) = needs_download {
-            if !can_ever_evict {
-                anyhow::bail!("refusing to attempt downloading {self} because no remote timeline client, reason: {reason}")
-            };
-
-            if self.wanted_garbage_collected.load(Ordering::Acquire) {
-                // it will fail because we should had already scheduled a delete and an
-                // index update
-                tracing::info!(%reason, "downloading a wanted garbage collected layer, this might fail");
-                // FIXME: we probably do not gc delete until the file goes away...? unsure
-            } else {
-                tracing::debug!(%reason, "downloading layer");
-            }
-
-            let (tx, rx) = tokio::sync::oneshot::channel();
-            // this is sadly needed because of task_mgr::shutdown_tasks, otherwise we cannot
-            // block tenant::mgr::remove_tenant_from_memory.
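The spawn-and-oneshot dance being moved below preserves one key property: the download runs in a detached task and the caller only awaits the oneshot, so cancelling the caller does not cancel the download. A condensed, hypothetical sketch of that pattern, with plain `tokio::spawn` standing in for `crate::task_mgr::spawn` and `do_download` for the remote client call:

    // Hypothetical, condensed sketch of the detached-download pattern; plain
    // tokio::spawn stands in for crate::task_mgr::spawn, and do_download for
    // the real download_layer_file call.
    use tokio::sync::oneshot;

    async fn download_detached() -> anyhow::Result<()> {
        let (tx, rx) = oneshot::channel();

        tokio::spawn(async move {
            // this task is not cancelled when the waiting caller is cancelled
            match do_download().await {
                Ok(_size) => {
                    // metrics are updated here in the real code
                    let _ = tx.send(());
                }
                Err(e) => {
                    // tx is dropped without sending, so rx.await errors below
                    tracing::error!("layer file download failed: {e:?}");
                }
            }
        });

        // the caller only waits on the channel; shutdown or failure both
        // surface as a dropped sender
        if rx.await.is_err() {
            return Err(anyhow::anyhow!("downloading failed, possibly for shutdown"));
        }
        Ok(())
    }

    async fn do_download() -> anyhow::Result<u64> {
        // stand-in for the real remote storage call
        Ok(0)
    }

The oneshot makes `rx.await.is_err()` the single failure signal: an explicit download error and task_mgr shutting the task down both manifest as the sender being dropped.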
-            let this = self.clone();
-            crate::task_mgr::spawn(
-                &tokio::runtime::Handle::current(),
-                TaskKind::RemoteDownloadTask,
-                Some(self.desc.tenant_id),
-                Some(self.desc.timeline_id),
-                &task_name,
-                false,
-                async move {
-                    let client = timeline
-                        .remote_client
-                        .as_ref()
-                        .expect("checked above with can_ever_evict");
-                    let result = client
+                let (tx, rx) = tokio::sync::oneshot::channel();
+                // this is sadly needed because of task_mgr::shutdown_tasks, otherwise we cannot
+                // block tenant::mgr::remove_tenant_from_memory.
+                let this = self.clone();
+                crate::task_mgr::spawn(
+                    &tokio::runtime::Handle::current(),
+                    TaskKind::RemoteDownloadTask,
+                    Some(self.desc.tenant_id),
+                    Some(self.desc.timeline_id),
+                    &task_name,
+                    false,
+                    async move {
+                        let client = timeline
+                            .remote_client
+                            .as_ref()
+                            .expect("checked above with can_ever_evict");
+                        let result = client
                 .download_layer_file(
                     &this.desc.filename(),
                     &crate::tenant::remote_timeline_client::index::LayerFileMetadata::new(
                     ),
                 )
                 .await;

-                    match result {
-                        Ok(size) => {
-                            timeline.metrics.resident_physical_size_gauge.add(size);
-                            let _ = tx.send(());
-                        }
-                        Err(e) => {
-                            // TODO: the temp file might still be around, metrics might be off
-                            tracing::error!("layer file download failed: {e:?}",);
+                        match result {
+                            Ok(size) => {
+                                timeline.metrics.resident_physical_size_gauge.add(size);
+                                let _ = tx.send(());
+                            }
+                            Err(e) => {
+                                // TODO: the temp file might still be around, metrics might be off
+                                tracing::error!("layer file download failed: {e:?}",);
+                            }
                         }

-                    Ok(())
-                }
-                .in_current_span(),
-            );
-            if rx.await.is_err() {
-                return Err(anyhow::anyhow!("downloading failed, possibly for shutdown"));
+                        Ok(())
+                    }
+                    .in_current_span(),
+                );
+                if rx.await.is_err() {
+                    return Err(anyhow::anyhow!("downloading failed, possibly for shutdown"));
                 }
-            // FIXME: we need backoff here so never spiral to download loop
-            anyhow::ensure!(
-                self.needs_download()
-                    .await
-                    .context("test if downloading is still needed")?
-                    .is_none(),
-                "post-condition for downloading: no longer needs downloading"
-            );
-        } else {
-            // the file is present locally and we could even be running without remote
-            // storage
-        }
+                // FIXME: we need backoff here so we never spiral into a download loop
+                anyhow::ensure!(
+                    self.needs_download()
+                        .await
+                        .context("test if downloading is still needed")?
+                        .is_none(),
+                    "post-condition for downloading: no longer needs downloading"
+                );
+            } else {
+                // the file is present locally and we could even be running without remote
+                // storage
+            }

-        // the assumption is that we own the layer residentness, no operator should go in
-        // and delete random files. this would be evident when trying to access the file
-        // Nth time (N>1) while having the VirtualFile evicted in between.
-        //
-        // we could support this by looping on NotFound from the layer access methods, but
-        // it's difficult to implement this so that the operator does not delete
-        // not-yet-uploaded files.
+            // the assumption is that we own the layer residentness, no operator should go in
+            // and delete random files. this would be evident when trying to access the file
+            // Nth time (N>1) while having the VirtualFile evicted in between.
+            //
+            // we could support this by looping on NotFound from the layer access methods, but
+            // it's difficult to implement this so that the operator does not delete
+            // not-yet-uploaded files.

-        let res = Arc::new(DownloadedLayer {
-            owner: Arc::downgrade(self),
-            kind: tokio::sync::OnceCell::default(),
-        });
-
-        *locked = Some(if self.wanted_evicted.load(Ordering::Acquire) {
-            // because we reset wanted_evictness near beginning, this means when we were downloading someone
-            // wanted to evict this layer.
-            //
-            // perhaps the evict should only possible via ResidentLayer because this makes my head
-            // spin. the caller of this function will still get the proper `Arc`.
-            //
-            // the risk is that eviction becomes too flaky.
-            ResidentOrWantedEvicted::WantedEvicted(Arc::downgrade(&res))
-        } else {
-            ResidentOrWantedEvicted::Resident(res.clone())
-        });
-
-        Ok(res)
+            let res = Arc::new(DownloadedLayer {
+                owner: Arc::downgrade(self),
+                kind: tokio::sync::OnceCell::default(),
+            });
+
+            Ok(if self.wanted_evicted.load(Ordering::Acquire) {
+                // because we reset wanted_evictness near beginning, this means when we were downloading someone
+                // wanted to evict this layer.
+                //
+                // perhaps the evict should only be possible via ResidentLayer because this makes my head
+                // spin. the caller of this function will still get the proper `Arc`.
+                //
+                // the risk is that eviction becomes too flaky.
+                ResidentOrWantedEvicted::WantedEvicted(Arc::downgrade(&res))
+            } else {
+                ResidentOrWantedEvicted::Resident(res.clone())
+            })
+        };
+
+        let locked = self.inner.get_or_init(download).await?;
+
+        Ok(
+            Self::get_or_apply_evictedness(Some(locked), &self.wanted_evicted)
+                .expect("It is not none, we just received it"),
+        )
     }

     pub(crate) fn local_path(&self) -> &std::path::Path {
@@ -902,7 +925,7 @@ impl LayerE {
         let can_evict = self.have_remote_client;

         if gc {
-            // do nothing now, only when the whole layer is dropped. gc will end up dropping the
+            // do nothing now, only when the whole layer is dropped. gc will end up deleting the
             // whole layer, in case there is no reference cycle.
         } else if can_evict && evict {
             // we can remove this right now, but ... we really should not block or do anything.
@@ -926,22 +949,37 @@ impl LayerE {
             // deleted or detached timeline, don't do anything.
             let Some(timeline) = this.timeline.upgrade() else { return; };

-            let mut guard = this.inner.lock().await;
-
-            // relaxed ordering: we dont have any other atomics pending
-            if version != this.version.load(Ordering::Relaxed) {
-                // downloadness-state has advanced, we might no longer be the latest eviction
-                // work; don't do anything.
-                return;
-            }
+            // to avoid starting a new download while we evict, keep holding on to the
+            // permit. note that we will not close the semaphore when done, because it will
+            // be used by the re-download.
+            let _permit = {
+                let maybe_downloaded = this.inner.get();
+                // relaxed ordering: we don't have any other atomics pending
+                if version != this.version.load(Ordering::Relaxed) {
+                    // downloadness-state has advanced, we might no longer be the latest eviction
+                    // work; don't do anything.
+                    return;
+                }

-            // free the DownloadedLayer allocation
-            let taken = guard.take();
-            assert!(matches!(taken, None | Some(ResidentOrWantedEvicted::WantedEvicted(_))), "this is what the version is supposed to guard against but we could just undo it and remove version?");
+                // free the DownloadedLayer allocation
+                match maybe_downloaded.map(|mut g| g.take_and_deinit()) {
+                    Some((taken, permit)) => {
+                        assert!(matches!(taken, ResidentOrWantedEvicted::WantedEvicted(_)));
+                        permit
+                    }
+                    None => {
+                        unreachable!("we do the version checking for this exact reason")
+                    }
+                }
+            };

             if !this.wanted_evicted.load(Ordering::Acquire) {
                 // if there's already interest, should we just early exit? this is not
                 // currently *cleared* on interest, maybe it shouldn't?
                // FIXME: wanted_evicted cannot be unset right now
+                //
+                // NOTE: us holding the permit prevents a new round of download happening
+                // right now
                 return;
             }
@@ -962,8 +1000,7 @@ impl LayerE {
             let res = capture_mtime_and_delete.await;

-            #[cfg(test)]
-            this.evicted.notify_waiters();
+            drop(this.status.send(Status::Evicted));

             match res {
                 Ok(Ok(local_layer_mtime)) => {
@@ -1048,12 +1085,8 @@ impl NeedsDownload {
 /// or garbage collection happens.
 #[derive(Clone)]
 pub(crate) struct ResidentLayer {
-    // field order matters: we want the downloaded layer to be dropped before owner, so that ... at
-    // least this is how the code expects it right now. The added spawn carrying a weak should
-    // protect us, but it's theoretically possible for that spawn to keep the LayerE alive and
-    // evict before garbage_collect.
-    _downloaded: Arc<DownloadedLayer>,
     owner: Arc<LayerE>,
+    _downloaded: Arc<DownloadedLayer>,
 }

 impl std::fmt::Display for ResidentLayer {
@@ -1111,6 +1144,16 @@ pub(crate) struct DownloadedLayer {
     kind: tokio::sync::OnceCell<anyhow::Result<LayerKind>>,
 }

+impl std::fmt::Debug for DownloadedLayer {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("DownloadedLayer")
+            // FIXME: this is not useful, always "Weak"
+            .field("owner", &self.owner)
+            .field("kind", &self.kind)
+            .finish()
+    }
+}
+
 impl Drop for DownloadedLayer {
     fn drop(&mut self) {
         if let Some(owner) = self.owner.upgrade() {
@@ -1123,42 +1166,38 @@ impl Drop for DownloadedLayer {

 impl DownloadedLayer {
     async fn get(&self) -> anyhow::Result<&LayerKind> {
-        self.kind
-            .get_or_init(|| async {
-                let Some(owner) = self.owner.upgrade() else {
+        let init = || async {
+            let Some(owner) = self.owner.upgrade() else {
                 anyhow::bail!("Cannot init, the layer has already been dropped");
             };
-                // there is nothing async here, but it should be async
-                if owner.desc.is_delta {
-                    let summary = Some(delta_layer::Summary::expected(
-                        owner.desc.tenant_id,
-                        owner.desc.timeline_id,
-                        owner.desc.key_range.clone(),
-                        owner.desc.lsn_range.clone(),
-                    ));
-                    delta_layer::DeltaLayerInner::load(&owner.path, summary).map(LayerKind::Delta)
-                } else {
-                    let lsn = owner.desc.image_layer_lsn();
-                    let summary = Some(image_layer::Summary::expected(
-                        owner.desc.tenant_id,
-                        owner.desc.timeline_id,
-                        owner.desc.key_range.clone(),
-                        lsn,
-                    ));
-                    image_layer::ImageLayerInner::load(&owner.path, lsn, summary)
-                        .map(LayerKind::Image)
-                }
-                // this should be a permanent failure
-                .context("load layer")
-            })
-            .await
-            .as_ref()
-            .map_err(|e| {
-                // errors are not clonabled, cannot but stringify
-                // test_broken_timeline matches this string
-                anyhow::anyhow!("layer loading failed: {e:#}")
-            })
+            // there is nothing async here, but it should be async
+            if owner.desc.is_delta {
+                let summary = Some(delta_layer::Summary::expected(
+                    owner.desc.tenant_id,
+                    owner.desc.timeline_id,
+                    owner.desc.key_range.clone(),
+                    owner.desc.lsn_range.clone(),
+                ));
+                delta_layer::DeltaLayerInner::load(&owner.path, summary).map(LayerKind::Delta)
+            } else {
+                let lsn = owner.desc.image_layer_lsn();
+                let summary = Some(image_layer::Summary::expected(
+                    owner.desc.tenant_id,
+                    owner.desc.timeline_id,
+                    owner.desc.key_range.clone(),
+                    lsn,
+                ));
+                image_layer::ImageLayerInner::load(&owner.path, lsn, summary).map(LayerKind::Image)
+            }
+            // this will be a permanent failure
+            .context("load layer")
+        };
+        self.kind.get_or_init(init).await.as_ref().map_err(|e| {
+            // errors are not cloneable, so we can only stringify them
+            // test_broken_timeline matches this string
+            anyhow::anyhow!("layer loading failed: {e:#}")
+        })
     }

     async fn
get_value_reconstruct_data(