diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index aaba9bd933..761fe311c6 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -337,31 +337,41 @@ enum ResidentOrWantedEvicted { } impl ResidentOrWantedEvicted { - fn get(&self) -> Option> { + /// If `Some` is returned, the ResidentOrWantedEvicted has been upgraded back from + /// `ResidentOrWantedEvicted::WantedEvicted` to `ResidentOrWantedEvicted::Resident`. + fn get_and_upgrade(&mut self) -> Option> { match self { ResidentOrWantedEvicted::Resident(strong) => Some(strong.clone()), ResidentOrWantedEvicted::WantedEvicted(weak, _) => match weak.upgrade() { Some(strong) => { LAYER_IMPL_METRICS.inc_raced_wanted_evicted_accesses(); + + *self = ResidentOrWantedEvicted::Resident(strong.clone()); + Some(strong) } None => None, }, } } + /// When eviction is first requested, drop down to holding a [`Weak`]. /// - /// Returns `true` if this was the first time eviction was requested. - fn downgrade(&mut self) -> bool { + /// Returns `Some` if this was the first time eviction was requested. Care should be taken to + /// drop the possibly last strong reference outside of the mutex of + /// heavier_once_cell::OnceCell. + fn downgrade(&mut self) -> Option> { match self { ResidentOrWantedEvicted::Resident(strong) => { let weak = Arc::downgrade(strong); - *self = ResidentOrWantedEvicted::WantedEvicted(weak, strong.version); - // returning the weak is not useful, because the drop could had already ran with - // the replacement above, and that will take care of cleaning the Option we are in - true + let mut temp = ResidentOrWantedEvicted::WantedEvicted(weak, strong.version); + std::mem::swap(self, &mut temp); + match temp { + ResidentOrWantedEvicted::Resident(strong) => Some(strong), + ResidentOrWantedEvicted::WantedEvicted(..) => unreachable!("just swapped"), + } } - ResidentOrWantedEvicted::WantedEvicted(..) => false, + ResidentOrWantedEvicted::WantedEvicted(..) => None, } } } @@ -563,20 +573,22 @@ impl LayerInner { let mut rx = self.status.subscribe(); - let res = - self.wanted_evicted - .compare_exchange(false, true, Ordering::Release, Ordering::Relaxed); + let strong = { + match self.inner.get() { + Some(mut either) => { + self.wanted_evicted.store(true, Ordering::Relaxed); + either.downgrade() + } + None => return Err(EvictionError::NotFound), + } + }; - if res.is_ok() { + if strong.is_some() { + // drop the DownloadedLayer outside of the holding the guard + drop(strong); LAYER_IMPL_METRICS.inc_started_evictions(); } - if self.get().is_none() { - // it was not evictable in the first place - // our store to the wanted_evicted does not matter; it will be reset by next download - return Err(EvictionError::NotFound); - } - match rx.recv().await { Ok(Status::Evicted) => Ok(()), Ok(Status::Downloaded) => Err(EvictionError::Downloaded), @@ -590,7 +602,8 @@ impl LayerInner { // // use however late (compared to the initial expressing of wanted) as the // "outcome" now - match self.get() { + LAYER_IMPL_METRICS.inc_broadcast_lagged(); + match self.inner.get() { Some(_) => Err(EvictionError::Downloaded), None => Ok(()), } @@ -605,6 +618,8 @@ impl LayerInner { allow_download: bool, ctx: Option<&RequestContext>, ) -> Result, DownloadError> { + let mut permit = None; + loop { let download = move || async move { // disable any scheduled but not yet running eviction deletions for this @@ -622,6 +637,8 @@ impl LayerInner { // check if we really need to be downloaded; could have been already downloaded by a // cancelled previous attempt. + // + // FIXME: what if it's a directory? that is currently needs_download == true let needs_download = self .needs_download() .await @@ -670,16 +687,37 @@ impl LayerInner { Ok(ResidentOrWantedEvicted::Resident(res)) }; - let locked = self.inner.get_or_init(download).await?; + let (weak, _permit) = { + // should we be able to give the permit to the `get_or_init`? would make sense. + drop(permit.take()); + let mut locked = self.inner.get_or_init(download).await?; - if let Some(strong) = Self::get_or_apply_evictedness(Some(locked), &self.wanted_evicted) - { - return Ok(strong); - } + if let Some(strong) = locked.get_and_upgrade() { + self.wanted_evicted.store(false, Ordering::Relaxed); + + // error out any `evict_and_wait` + drop(self.status.send(Status::Downloaded)); + + return Ok(strong); + } else { + // path to here: the evict_blocking is stuck on spawn_blocking queue. + // + // reset the contents, deactivating the eviction and causing a + // EvictionCancelled::LostToDownload or EvictionCancelled::VersionCheckFailed. + locked.take_and_deinit() + } + }; + + // unlock first, then drop the weak, but because upgrade failed, we + // know it cannot be a problem. + + assert!( + matches!(weak, ResidentOrWantedEvicted::WantedEvicted(..)), + "unexpected {weak:?}, ResidentOrWantedEvicted::get_and_upgrade has a bug" + ); + + permit = Some(_permit); - // the situation in which we might need to retry is that our init was ready - // immediatedly, but the DownloadedLayer had been dropped BUT failed to complete - // Self::evict_blocking LAYER_IMPL_METRICS.inc_retried_get_or_maybe_download(); } } @@ -812,33 +850,6 @@ impl LayerInner { } } - /// Access the current state without waiting for the file to be downloaded. - /// - /// Requires that we've initialized to state which is respective to the - /// actual residency state. - fn get(&self) -> Option> { - let locked = self.inner.get(); - Self::get_or_apply_evictedness(locked, &self.wanted_evicted) - } - - fn get_or_apply_evictedness( - guard: Option>, - wanted_evicted: &AtomicBool, - ) -> Option> { - if let Some(mut x) = guard { - if let Some(won) = x.get() { - // there are no guarantees that we will always get to observe a concurrent call - // to evict - if wanted_evicted.load(Ordering::Acquire) { - x.downgrade(); - } - return Some(won); - } - } - - None - } - async fn needs_download(&self) -> Result, std::io::Error> { match tokio::fs::metadata(&self.path).await { Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()), @@ -872,7 +883,9 @@ impl LayerInner { fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo { let layer_file_name = self.desc.filename().file_name(); - let remote = self.get().is_none(); + // this is not accurate: we could have the file locally but there was a cancellation + // and now we are not in sync, or we are currently downloading it. + let remote = self.inner.get().is_none(); let access_stats = self.access_stats.as_api_model(reset); @@ -1456,6 +1469,13 @@ impl LayerImplMetrics { .unwrap() .inc(); } + + fn inc_broadcast_lagged(&self) { + self.rare_counters + .get_metric_with_label_values(&["broadcast_lagged"]) + .unwrap() + .inc(); + } } enum EvictionCancelled {