chore(layer): restore logging, doc changes (#5766)

Some of the log messages were lost with the #4938. This PR adds some of
them back, most notably:

- starting to on-demand download
- successful completion of on-demand download
- ability to see when there were many waiters for the layer download
- "unexpectedly on-demand downloading ..." is now `info!`

Additionally some rare events are logged as error, which should never
happen.
This commit is contained in:
Joonas Koivunen
2023-11-02 21:05:33 +02:00
committed by GitHub
parent 4c7fa12a2a
commit 27bdbf5e36
2 changed files with 67 additions and 14 deletions

View File

@@ -1,4 +1,7 @@
use std::sync::{Arc, Mutex, MutexGuard};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex, MutexGuard,
};
use tokio::sync::Semaphore;
/// Custom design like [`tokio::sync::OnceCell`] but using [`OwnedSemaphorePermit`] instead of
@@ -10,6 +13,7 @@ use tokio::sync::Semaphore;
/// [`OwnedSemaphorePermit`]: tokio::sync::OwnedSemaphorePermit
pub struct OnceCell<T> {
inner: Mutex<Inner<T>>,
initializers: AtomicUsize,
}
impl<T> Default for OnceCell<T> {
@@ -17,6 +21,7 @@ impl<T> Default for OnceCell<T> {
fn default() -> Self {
Self {
inner: Default::default(),
initializers: AtomicUsize::new(0),
}
}
}
@@ -49,6 +54,7 @@ impl<T> OnceCell<T> {
init_semaphore: Arc::new(sem),
value: Some(value),
}),
initializers: AtomicUsize::new(0),
}
}
@@ -71,7 +77,11 @@ impl<T> OnceCell<T> {
guard.init_semaphore.clone()
};
let permit = sem.acquire_owned().await;
let permit = {
// increment the count for the duration of queued
let _guard = CountWaitingInitializers::start(self);
sem.acquire_owned().await
};
match permit {
Ok(permit) => {
@@ -100,12 +110,13 @@ impl<T> OnceCell<T> {
///
/// If the inner has already been initialized.
pub fn set(&self, value: T, _permit: InitPermit) -> Guard<'_, T> {
// cannot assert that this permit is for self.inner.semaphore
let guard = self.inner.lock().unwrap();
// cannot assert that this permit is for self.inner.semaphore, but we can assert it cannot
// give more permits right now.
if guard.init_semaphore.try_acquire().is_ok() {
drop(guard);
panic!("semaphore is of wrong origin");
panic!("permit is of wrong origin");
}
Self::set0(value, guard)
@@ -130,6 +141,28 @@ impl<T> OnceCell<T> {
None
}
}
/// Return the number of [`Self::get_or_init`] calls waiting for initialization to complete.
pub fn initializer_count(&self) -> usize {
self.initializers.load(Ordering::Relaxed)
}
}
/// DropGuard counter for queued tasks waiting to initialize, mainly accessible for the
/// initializing task for example at the end of initialization.
struct CountWaitingInitializers<'a, T>(&'a OnceCell<T>);
impl<'a, T> CountWaitingInitializers<'a, T> {
fn start(target: &'a OnceCell<T>) -> Self {
target.initializers.fetch_add(1, Ordering::Relaxed);
CountWaitingInitializers(target)
}
}
impl<'a, T> Drop for CountWaitingInitializers<'a, T> {
fn drop(&mut self) {
self.0.initializers.fetch_sub(1, Ordering::Relaxed);
}
}
/// Uninteresting guard object to allow short-lived access to inspect or clone the held,

View File

@@ -411,6 +411,10 @@ struct LayerInner {
version: AtomicUsize,
/// Allow subscribing to when the layer actually gets evicted.
///
/// If in future we need to implement "wait until layer instances are gone and done", carrying
/// this over to the gc spawn_blocking from LayerInner::drop will do the trick, and adding a
/// method for "wait_gc" which will wait to this being closed.
status: tokio::sync::broadcast::Sender<Status>,
/// Counter for exponential backoff with the download
@@ -561,6 +565,8 @@ impl LayerInner {
}
}
/// Cancellation safe, however dropping the future and calling this method again might result
/// in a new attempt to evict OR join the previously started attempt.
pub(crate) async fn evict_and_wait(
&self,
_: &RemoteTimelineClient,
@@ -609,8 +615,8 @@ impl LayerInner {
}
}
/// Should be cancellation safe, but cancellation is troublesome together with the spawned
/// download.
/// Cancellation safe.
#[tracing::instrument(skip_all, fields(layer=%self))]
async fn get_or_maybe_download(
self: &Arc<Self>,
allow_download: bool,
@@ -654,8 +660,6 @@ impl LayerInner {
return Err(DownloadError::NoRemoteStorage);
}
tracing::debug!(%reason, "downloading layer");
if let Some(ctx) = ctx {
self.check_expected_download(ctx)?;
}
@@ -666,6 +670,8 @@ impl LayerInner {
return Err(DownloadError::DownloadRequired);
}
tracing::info!(%reason, "downloading on-demand");
self.spawn_download_and_wait(timeline, permit).await?
} else {
// the file is present locally, probably by a previous but cancelled call to
@@ -686,6 +692,11 @@ impl LayerInner {
LayerResidenceEventReason::ResidenceChange,
);
let waiters = self.inner.initializer_count();
if waiters > 0 {
tracing::info!(waiters, "completing the on-demand download for other tasks");
}
Ok((ResidentOrWantedEvicted::Resident(res), permit))
};
@@ -746,8 +757,8 @@ impl LayerInner {
match b {
Download => Ok(()),
Warn | Error => {
tracing::warn!(
"unexpectedly on-demand downloading remote layer {self} for task kind {:?}",
tracing::info!(
"unexpectedly on-demand downloading for task kind {:?}",
ctx.task_kind()
);
crate::metrics::UNEXPECTED_ONDEMAND_DOWNLOADS.inc();
@@ -779,6 +790,7 @@ impl LayerInner {
// block tenant::mgr::remove_tenant_from_memory.
let this: Arc<Self> = self.clone();
crate::task_mgr::spawn(
&tokio::runtime::Handle::current(),
crate::task_mgr::TaskKind::RemoteDownloadTask,
@@ -787,6 +799,7 @@ impl LayerInner {
&task_name,
false,
async move {
let client = timeline
.remote_client
.as_ref()
@@ -848,6 +861,7 @@ impl LayerInner {
}
self.consecutive_failures.store(0, Ordering::Relaxed);
tracing::info!("on-demand download successful");
Ok(permit)
}
@@ -1040,11 +1054,14 @@ impl LayerInner {
Ok(())
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
tracing::info!("failed to evict file from disk, it was already gone");
tracing::error!(
layer_size = %self.desc.file_size,
"failed to evict layer from disk, it was already gone (metrics will be inaccurate)"
);
Err(EvictionCancelled::FileNotFound)
}
Err(e) => {
tracing::warn!("failed to evict file from disk: {e:#}");
tracing::error!("failed to evict file from disk: {e:#}");
Err(EvictionCancelled::RemoveFailed)
}
};
@@ -1124,6 +1141,8 @@ impl std::fmt::Display for NeedsDownload {
/// Existence of `DownloadedLayer` means that we have the file locally, and can later evict it.
pub(crate) struct DownloadedLayer {
owner: Weak<LayerInner>,
// Use tokio OnceCell as we do not need to deinitialize this, it'll just get dropped with the
// DownloadedLayer
kind: tokio::sync::OnceCell<anyhow::Result<LayerKind>>,
version: usize,
}
@@ -1167,7 +1186,6 @@ impl DownloadedLayer {
"these are the same, just avoiding the upgrade"
);
// there is nothing async here, but it should be async
let res = if owner.desc.is_delta {
let summary = Some(delta_layer::Summary::expected(
owner.desc.tenant_id,
@@ -1266,6 +1284,8 @@ impl std::fmt::Debug for ResidentLayer {
impl ResidentLayer {
/// Release the eviction guard, converting back into a plain [`Layer`].
///
/// You can access the [`Layer`] also by using `as_ref`.
pub(crate) fn drop_eviction_guard(self) -> Layer {
self.into()
}
@@ -1321,7 +1341,7 @@ impl AsRef<Layer> for ResidentLayer {
}
}
/// Allow slimming down if we don't want the `2*usize` with eviction candidates?
/// Drop the eviction guard.
impl From<ResidentLayer> for Layer {
fn from(value: ResidentLayer) -> Self {
value.owner