mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-16 04:30:38 +00:00
Merge branch 'problame/async-walredo/better-benchmark' into problame/integrate-tokio-epoll-uring/benchmarking/2024-01-31-prs/async-walredo
This commit is contained in:
@@ -24,6 +24,29 @@
|
||||
//! is called sampling. Apparently the idea here is to detect outliers.
|
||||
//! We're not sure whether the current choice of sampling method makes sense.
|
||||
//! See https://bheisler.github.io/criterion.rs/book/user_guide/command_line_output.html#collecting-samples
|
||||
//!
|
||||
//! # Reference Numbers
|
||||
//!
|
||||
//! 2024-03-20 on i3en.3xlarge
|
||||
//!
|
||||
//! ```text
|
||||
//! short/1 time: [21.612 µs 21.707 µs 21.817 µs]
|
||||
//! short/2 time: [27.216 µs 27.372 µs 27.557 µs]
|
||||
//! short/4 time: [44.398 µs 45.858 µs 47.178 µs]
|
||||
//! short/8 time: [81.236 µs 83.332 µs 85.419 µs]
|
||||
//! short/16 time: [138.29 µs 139.76 µs 141.24 µs]
|
||||
//! short/32 time: [149.67 µs 150.44 µs 151.30 µs]
|
||||
//! short/64 time: [155.31 µs 155.90 µs 156.59 µs]
|
||||
//! short/128 time: [156.33 µs 156.85 µs 157.44 µs]
|
||||
//! medium/1 time: [105.47 µs 105.87 µs 106.36 µs]
|
||||
//! medium/2 time: [157.09 µs 157.68 µs 158.40 µs]
|
||||
//! medium/4 time: [293.69 µs 306.80 µs 318.11 µs]
|
||||
//! medium/8 time: [594.88 µs 614.05 µs 633.18 µs]
|
||||
//! medium/16 time: [848.28 µs 853.06 µs 858.68 µs]
|
||||
//! medium/32 time: [916.31 µs 920.97 µs 926.27 µs]
|
||||
//! medium/64 time: [939.99 µs 945.49 µs 951.30 µs]
|
||||
//! medium/128 time: [918.89 µs 928.75 µs 938.41 µs]
|
||||
//! ```
|
||||
|
||||
use bytes::{Buf, Bytes};
|
||||
use criterion::{BenchmarkId, Criterion};
|
||||
|
||||
@@ -534,7 +534,11 @@ impl<'a> TenantDownloader<'a> {
|
||||
.await
|
||||
.maybe_fatal_err(&context_msg)?;
|
||||
|
||||
tracing::debug!("Wrote local heatmap to {}", heatmap_path);
|
||||
tracing::debug!(
|
||||
"Wrote local heatmap to {}, with {} timelines",
|
||||
heatmap_path,
|
||||
heatmap.timelines.len()
|
||||
);
|
||||
|
||||
// Clean up any local layers that aren't in the heatmap. We do this first for all timelines, on the general
|
||||
// principle that deletions should be done before writes wherever possible, and so that we can use this
|
||||
@@ -547,6 +551,10 @@ impl<'a> TenantDownloader<'a> {
|
||||
// Download the layers in the heatmap
|
||||
for timeline in heatmap.timelines {
|
||||
if self.secondary_state.cancel.is_cancelled() {
|
||||
tracing::debug!(
|
||||
"Cancelled before downloading timeline {}",
|
||||
timeline.timeline_id
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@@ -764,10 +772,13 @@ impl<'a> TenantDownloader<'a> {
|
||||
}
|
||||
};
|
||||
|
||||
tracing::debug!(timeline_id=%timeline.timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len());
|
||||
|
||||
// Download heatmap layers that are not present on local disk, or update their
|
||||
// access time if they are already present.
|
||||
for layer in timeline.layers {
|
||||
if self.secondary_state.cancel.is_cancelled() {
|
||||
tracing::debug!("Cancelled -- dropping out of layer loop");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
|
||||
@@ -300,6 +300,7 @@ where
|
||||
|
||||
let tenant_shard_id = job.get_tenant_shard_id();
|
||||
let barrier = if let Some(barrier) = self.get_running(tenant_shard_id) {
|
||||
tracing::info!("Command already running, waiting for it");
|
||||
barrier
|
||||
} else {
|
||||
let running = self.spawn_now(job);
|
||||
|
||||
@@ -702,181 +702,132 @@ impl LayerInner {
|
||||
allow_download: bool,
|
||||
ctx: Option<&RequestContext>,
|
||||
) -> Result<Arc<DownloadedLayer>, DownloadError> {
|
||||
let mut init_permit = None;
|
||||
let (weak, permit) = {
|
||||
let locked = self
|
||||
.inner
|
||||
.get_or_init_detached()
|
||||
.await
|
||||
.map(|mut guard| guard.get_and_upgrade().ok_or(guard));
|
||||
|
||||
loop {
|
||||
let download = move |permit| {
|
||||
async move {
|
||||
// disable any scheduled but not yet running eviction deletions for this
|
||||
let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed);
|
||||
match locked {
|
||||
// this path could had been a RwLock::read
|
||||
Ok(Ok((strong, upgraded))) if !upgraded => return Ok(strong),
|
||||
Ok(Ok((strong, _))) => {
|
||||
// when upgraded back, the Arc<DownloadedLayer> is still available, but
|
||||
// previously a `evict_and_wait` was received.
|
||||
self.wanted_evicted.store(false, Ordering::Relaxed);
|
||||
|
||||
// no need to make the evict_and_wait wait for the actual download to complete
|
||||
// error out any `evict_and_wait`
|
||||
drop(self.status.send(Status::Downloaded));
|
||||
|
||||
let timeline = self
|
||||
.timeline
|
||||
.upgrade()
|
||||
.ok_or_else(|| DownloadError::TimelineShutdown)?;
|
||||
|
||||
// count cancellations, which currently remain largely unexpected
|
||||
let init_cancelled =
|
||||
scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
|
||||
|
||||
let can_ever_evict = timeline.remote_client.as_ref().is_some();
|
||||
|
||||
// check if we really need to be downloaded; could have been already downloaded by a
|
||||
// cancelled previous attempt.
|
||||
let needs_download = self
|
||||
.needs_download()
|
||||
.await
|
||||
.map_err(DownloadError::PreStatFailed);
|
||||
|
||||
let needs_download = match needs_download {
|
||||
Ok(reason) => reason,
|
||||
Err(e) => {
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
|
||||
let (permit, downloaded) = if let Some(reason) = needs_download {
|
||||
if let NeedsDownload::NotFile(ft) = reason {
|
||||
return Err(DownloadError::NotFile(ft));
|
||||
}
|
||||
|
||||
// only reset this after we've decided we really need to download. otherwise it'd
|
||||
// be impossible to mark cancelled downloads for eviction, like one could imagine
|
||||
// we would like to do for prefetching which was not needed.
|
||||
self.wanted_evicted.store(false, Ordering::Release);
|
||||
|
||||
if !can_ever_evict {
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
return Err(DownloadError::NoRemoteStorage);
|
||||
}
|
||||
|
||||
if let Some(ctx) = ctx {
|
||||
let res = self.check_expected_download(ctx);
|
||||
if let Err(e) = res {
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
|
||||
if !allow_download {
|
||||
// this does look weird, but for LayerInner the "downloading" means also changing
|
||||
// internal once related state ...
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
return Err(DownloadError::DownloadRequired);
|
||||
}
|
||||
|
||||
tracing::info!(%reason, "downloading on-demand");
|
||||
|
||||
let permit = self.spawn_download_and_wait(timeline, permit).await;
|
||||
|
||||
let permit = match permit {
|
||||
Ok(permit) => permit,
|
||||
Err(e) => {
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
|
||||
(permit, true)
|
||||
} else {
|
||||
// the file is present locally, probably by a previous but cancelled call to
|
||||
// get_or_maybe_download. alternatively we might be running without remote storage.
|
||||
LAYER_IMPL_METRICS.inc_init_needed_no_download();
|
||||
|
||||
(permit, false)
|
||||
};
|
||||
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
|
||||
if downloaded {
|
||||
let since_last_eviction = self
|
||||
.last_evicted_at
|
||||
.lock()
|
||||
.unwrap()
|
||||
.take()
|
||||
.map(|ts| ts.elapsed());
|
||||
|
||||
if let Some(since_last_eviction) = since_last_eviction {
|
||||
LAYER_IMPL_METRICS.record_redownloaded_after(since_last_eviction);
|
||||
}
|
||||
}
|
||||
|
||||
let res = Arc::new(DownloadedLayer {
|
||||
owner: Arc::downgrade(self),
|
||||
kind: tokio::sync::OnceCell::default(),
|
||||
version: next_version,
|
||||
});
|
||||
|
||||
self.access_stats.record_residence_event(
|
||||
LayerResidenceStatus::Resident,
|
||||
LayerResidenceEventReason::ResidenceChange,
|
||||
);
|
||||
|
||||
let waiters = self.inner.initializer_count();
|
||||
if waiters > 0 {
|
||||
tracing::info!(
|
||||
waiters,
|
||||
"completing the on-demand download for other tasks"
|
||||
);
|
||||
}
|
||||
|
||||
Ok((ResidentOrWantedEvicted::Resident(res), permit))
|
||||
}
|
||||
.instrument(tracing::info_span!("get_or_maybe_download", layer=%self))
|
||||
};
|
||||
|
||||
if let Some(init_permit) = init_permit.take() {
|
||||
// use the already held initialization permit because it is impossible to hit the
|
||||
// below paths anymore essentially limiting the max loop iterations to 2.
|
||||
let (value, init_permit) = download(init_permit).await?;
|
||||
let mut guard = self.inner.set(value, init_permit);
|
||||
let (strong, _upgraded) = guard
|
||||
.get_and_upgrade()
|
||||
.expect("init creates strong reference, we held the init permit");
|
||||
return Ok(strong);
|
||||
}
|
||||
|
||||
let (weak, permit) = {
|
||||
let mut locked = self.inner.get_or_init(download).await?;
|
||||
|
||||
if let Some((strong, upgraded)) = locked.get_and_upgrade() {
|
||||
if upgraded {
|
||||
// when upgraded back, the Arc<DownloadedLayer> is still available, but
|
||||
// previously a `evict_and_wait` was received.
|
||||
self.wanted_evicted.store(false, Ordering::Relaxed);
|
||||
|
||||
// error out any `evict_and_wait`
|
||||
drop(self.status.send(Status::Downloaded));
|
||||
LAYER_IMPL_METRICS
|
||||
.inc_eviction_cancelled(EvictionCancelled::UpgradedBackOnAccess);
|
||||
}
|
||||
LAYER_IMPL_METRICS
|
||||
.inc_eviction_cancelled(EvictionCancelled::UpgradedBackOnAccess);
|
||||
|
||||
return Ok(strong);
|
||||
} else {
|
||||
}
|
||||
Ok(Err(mut guard)) => {
|
||||
// path to here: the evict_blocking is stuck on spawn_blocking queue.
|
||||
//
|
||||
// reset the contents, deactivating the eviction and causing a
|
||||
// EvictionCancelled::LostToDownload or EvictionCancelled::VersionCheckFailed.
|
||||
locked.take_and_deinit()
|
||||
let (weak, permit) = guard.take_and_deinit();
|
||||
(Some(weak), permit)
|
||||
}
|
||||
};
|
||||
|
||||
// unlock first, then drop the weak, but because upgrade failed, we
|
||||
// know it cannot be a problem.
|
||||
Err(permit) => (None, permit),
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(weak) = weak {
|
||||
// only drop the weak after dropping the heavier_once_cell guard
|
||||
assert!(
|
||||
matches!(weak, ResidentOrWantedEvicted::WantedEvicted(..)),
|
||||
"unexpected {weak:?}, ResidentOrWantedEvicted::get_and_upgrade has a bug"
|
||||
);
|
||||
|
||||
init_permit = Some(permit);
|
||||
|
||||
LAYER_IMPL_METRICS.inc_retried_get_or_maybe_download();
|
||||
}
|
||||
|
||||
async move {
|
||||
// disable any scheduled but not yet running eviction deletions for this
|
||||
let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
// no need to make the evict_and_wait wait for the actual download to complete
|
||||
drop(self.status.send(Status::Downloaded));
|
||||
|
||||
let timeline = self
|
||||
.timeline
|
||||
.upgrade()
|
||||
.ok_or_else(|| DownloadError::TimelineShutdown)?;
|
||||
|
||||
// count cancellations, which currently remain largely unexpected
|
||||
let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
|
||||
|
||||
// check if we really need to be downloaded; could have been already downloaded by a
|
||||
// cancelled previous attempt.
|
||||
let needs_download = self
|
||||
.needs_download()
|
||||
.await
|
||||
.map_err(DownloadError::PreStatFailed);
|
||||
|
||||
let needs_download = match needs_download {
|
||||
Ok(reason) => reason,
|
||||
Err(e) => {
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
|
||||
let Some(reason) = needs_download else {
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
|
||||
// the file is present locally, probably by a previous but cancelled call to
|
||||
// get_or_maybe_download. alternatively we might be running without remote storage.
|
||||
LAYER_IMPL_METRICS.inc_init_needed_no_download();
|
||||
|
||||
let res = self.initialize_after_layer_is_on_disk(next_version, permit, false);
|
||||
return Ok(res);
|
||||
};
|
||||
|
||||
if let NeedsDownload::NotFile(ft) = reason {
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
return Err(DownloadError::NotFile(ft));
|
||||
}
|
||||
|
||||
// only reset this after we've decided we really need to download. otherwise it'd
|
||||
// be impossible to mark cancelled downloads for eviction, like one could imagine
|
||||
// we would like to do for prefetching which was not needed.
|
||||
self.wanted_evicted.store(false, Ordering::Release);
|
||||
|
||||
if timeline.remote_client.as_ref().is_none() {
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
return Err(DownloadError::NoRemoteStorage);
|
||||
}
|
||||
|
||||
if let Some(ctx) = ctx {
|
||||
let res = self.check_expected_download(ctx);
|
||||
if let Err(e) = res {
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
|
||||
if !allow_download {
|
||||
// this does look weird, but for LayerInner the "downloading" means also changing
|
||||
// internal once related state ...
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
return Err(DownloadError::DownloadRequired);
|
||||
}
|
||||
|
||||
tracing::info!(%reason, "downloading on-demand");
|
||||
|
||||
let permit = self.spawn_download_and_wait(timeline, permit).await;
|
||||
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
|
||||
let permit = permit?;
|
||||
|
||||
let res = self.initialize_after_layer_is_on_disk(next_version, permit, true);
|
||||
Ok(res)
|
||||
}
|
||||
.instrument(tracing::info_span!("get_or_maybe_download", layer=%self))
|
||||
.await
|
||||
}
|
||||
|
||||
/// Nag or fail per RequestContext policy
|
||||
@@ -1026,6 +977,59 @@ impl LayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
/// Initializes the `Self::inner` to a "resident" state.
|
||||
///
|
||||
/// Callers are assumed to ensure that the file is actually on disk with `Self::needs_download`
|
||||
/// before calling this method.
|
||||
///
|
||||
/// If this method is ever made async, it needs to be cancellation safe so that no state
|
||||
/// changes are made before we can write to the OnceCell in non-cancellable fashion.
|
||||
fn initialize_after_layer_is_on_disk(
|
||||
self: &Arc<LayerInner>,
|
||||
next_version: usize,
|
||||
permit: heavier_once_cell::InitPermit,
|
||||
downloaded: bool,
|
||||
) -> Arc<DownloadedLayer> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
if downloaded {
|
||||
let since_last_eviction = self
|
||||
.last_evicted_at
|
||||
.lock()
|
||||
.unwrap()
|
||||
.take()
|
||||
.map(|ts| ts.elapsed());
|
||||
if let Some(since_last_eviction) = since_last_eviction {
|
||||
// FIXME: this will not always be recorded correctly until #6028 (the no
|
||||
// download needed branch above)
|
||||
LAYER_IMPL_METRICS.record_redownloaded_after(since_last_eviction);
|
||||
}
|
||||
}
|
||||
|
||||
let res = Arc::new(DownloadedLayer {
|
||||
owner: Arc::downgrade(self),
|
||||
kind: tokio::sync::OnceCell::default(),
|
||||
version: next_version,
|
||||
});
|
||||
|
||||
// FIXME: this might now be double-accounted for !downloaded
|
||||
self.access_stats.record_residence_event(
|
||||
LayerResidenceStatus::Resident,
|
||||
LayerResidenceEventReason::ResidenceChange,
|
||||
);
|
||||
|
||||
let waiters = self.inner.initializer_count();
|
||||
if waiters > 0 {
|
||||
tracing::info!(waiters, "completing the on-demand download for other tasks");
|
||||
}
|
||||
|
||||
let value = ResidentOrWantedEvicted::Resident(res.clone());
|
||||
|
||||
self.inner.set(value, permit);
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
async fn needs_download(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
|
||||
match tokio::fs::metadata(&self.path).await {
|
||||
Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
|
||||
@@ -1690,11 +1694,6 @@ impl LayerImplMetrics {
|
||||
self.rare_counters[RareEvent::RemoveOnDropFailed].inc();
|
||||
}
|
||||
|
||||
/// Expected rare because requires a race with `evict_blocking` and `get_or_maybe_download`.
|
||||
fn inc_retried_get_or_maybe_download(&self) {
|
||||
self.rare_counters[RareEvent::RetriedGetOrMaybeDownload].inc();
|
||||
}
|
||||
|
||||
/// Expected rare because cancellations are unexpected, and failures are unexpected
|
||||
fn inc_download_failed_without_requester(&self) {
|
||||
self.rare_counters[RareEvent::DownloadFailedWithoutRequester].inc();
|
||||
@@ -1779,7 +1778,6 @@ impl DeleteFailed {
|
||||
#[derive(enum_map::Enum)]
|
||||
enum RareEvent {
|
||||
RemoveOnDropFailed,
|
||||
RetriedGetOrMaybeDownload,
|
||||
DownloadFailedWithoutRequester,
|
||||
UpgradedWantedEvicted,
|
||||
InitWithoutDownload,
|
||||
@@ -1793,7 +1791,6 @@ impl RareEvent {
|
||||
|
||||
match self {
|
||||
RemoveOnDropFailed => "remove_on_drop_failed",
|
||||
RetriedGetOrMaybeDownload => "retried_gomd",
|
||||
DownloadFailedWithoutRequester => "download_failed_without",
|
||||
UpgradedWantedEvicted => "raced_wanted_evicted",
|
||||
InitWithoutDownload => "init_needed_no_download",
|
||||
|
||||
@@ -254,6 +254,8 @@ async fn residency_check_while_evict_and_wait_on_clogged_spawn_blocking() {
|
||||
let h = TenantHarness::create("residency_check_while_evict_and_wait_on_clogged_spawn_blocking")
|
||||
.unwrap();
|
||||
let (tenant, ctx) = h.load().await;
|
||||
let span = h.span();
|
||||
let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
|
||||
|
||||
let timeline = tenant
|
||||
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
|
||||
@@ -292,6 +294,7 @@ async fn residency_check_while_evict_and_wait_on_clogged_spawn_blocking() {
|
||||
// because no actual eviction happened, we get to just reinitialize the DownloadedLayer
|
||||
layer
|
||||
.keep_resident()
|
||||
.instrument(download_span)
|
||||
.await
|
||||
.expect("keep_resident should had reinitialized without downloading")
|
||||
.expect("ResidentLayer");
|
||||
|
||||
@@ -432,6 +432,10 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
|
||||
- Eviction of layers on the attached location results in deletion
|
||||
on the secondary location as well.
|
||||
"""
|
||||
|
||||
# For debug of https://github.com/neondatabase/neon/issues/6966
|
||||
neon_env_builder.rust_log_override = "DEBUG"
|
||||
|
||||
neon_env_builder.num_pageservers = 2
|
||||
neon_env_builder.enable_pageserver_remote_storage(
|
||||
remote_storage_kind=RemoteStorageKind.MOCK_S3,
|
||||
|
||||
Reference in New Issue
Block a user