concurrency-limit low-priority initial logical size calculation [v2] (#6000)

Problem
-------

Before this PR, there was no concurrency limit on initial logical size
computations.

While logical size computations are lazy in theory, in practice
(production), they happen in a short timeframe after restart.

This means that on a PS with 20k tenants, we'd have up to 20k concurrent
initial logical size calculation requests.

This is self-inflicted needless overload.

This hasn't been a problem so far because the `.await` points on the
logical size calculation path never return `Pending`, hence we have a
natural concurrency limit of the number of executor threads.
But, as soon as we return `Pending` somewhere in the logical size
calculation path, other concurrent tasks get scheduled by tokio.
If these other tasks are also logical size calculations, they eventually
pound on the same bottleneck.

For example, in #5479, we want to switch the VirtualFile descriptor
cache to a `tokio::sync::RwLock`, which makes us return `Pending`, and
without measures like this patch, after PS restart, VirtualFile
descriptor cache thrashes heavily for 2 hours until all the logical size
calculations have been computed and the degree of concurrency /
concurrent VirtualFile operations is down to regular levels.
See the *Experiment* section below for details.

<!-- Experiments (see below) show that plain #5479 causes heavy
thrashing of the VirtualFile descriptor cache.
The high degree of concurrency is too much for 
In the case of #5479 the VirtualFile descriptor cache size starts
thrashing heavily.


-->

Background
----------

Before this PR, initial logical size calculation was spawned lazily on
first call to `Timeline::get_current_logical_size()`.

In practice (prod), the lazy calculation is triggered by
`WalReceiverConnectionHandler` if the timeline is active according to
storage broker, or by the first iteration of consumption metrics worker
after restart (`MetricsCollection`).

The spawns by walreceiver are high-priority because logical size is
needed by Safekeepers (via walreceiver `PageserverFeedback`) to enforce
the project logical size limit.
The spawns by metrics collection are not on the user-critical path and
hence low-priority. [^consumption_metrics_slo]

[^consumption_metrics_slo]: We can't delay metrics collection
indefintely because there are TBD internal SLOs tied to metrics
collection happening in a timeline manner
(https://github.com/neondatabase/cloud/issues/7408). But let's ignore
that in this issue.

The ratio of walreceiver-initiated spawns vs
consumption-metrics-initiated spawns can be reconstructed from logs
(`spawning logical size computation from context of task kind {:?}"`).
PR #5995 and #6018 adds metrics for this.

First investigation of the ratio lead to the discovery that walreceiver
spawns 75% of init logical size computations.
That's because of two bugs:
- In Safekeepers: https://github.com/neondatabase/neon/issues/5993
- In interaction between Pageservers and Safekeepers:
https://github.com/neondatabase/neon/issues/5962

The safekeeper bug is likely primarily responsible but we don't have the
data yet. The metrics will hopefully provide some insights.

When assessing production-readiness of this PR, please assume that
neither of these bugs are fixed yet.


Changes In This PR
------------------

With this PR, initial logical size calculation is reworked as follows:

First, all initial logical size calculation task_mgr tasks are started
early, as part of timeline activation, and run a retry loop with long
back-off until success. This removes the lazy computation; it was
needless complexity because in practice, we compute all logical sizes
anyways, because consumption metrics collects it.

Second, within the initial logical size calculation task, each attempt
queues behind the background loop concurrency limiter semaphore. This
fixes the performance issue that we pointed out in the "Problem" section
earlier.

Third, there is a twist to queuing behind the background loop
concurrency limiter semaphore. Logical size is needed by Safekeepers
(via walreceiver `PageserverFeedback`) to enforce the project logical
size limit. However, we currently do open walreceiver connections even
before we have an exact logical size. That's bad, and I'll build on top
of this PR to fix that
(https://github.com/neondatabase/neon/issues/5963). But, for the
purposes of this PR, we don't want to introduce a regression, i.e., we
don't want to provide an exact value later than before this PR. The
solution is to introduce a priority-boosting mechanism
(`GetLogicalSizePriority`), allowing callers of
`Timeline::get_current_logical_size` to specify how urgently they need
an exact value. The effect of specifying high urgency is that the
initial logical size calculation task for the timeline will skip the
concurrency limiting semaphore. This should yield effectively the same
behavior as we had before this PR with lazy spawning.

Last, the priority-boosting mechanism obsoletes the `init_order`'s grace
period for initial logical size calculations. It's a separate commit to
reduce the churn during review. We can drop that commit if people think
it's too much churn, and commit it later once we know this PR here
worked as intended.

Experiment With #5479 
---------------------

I validated this PR combined with #5479 to assess whether we're making
forward progress towards asyncification.

The setup is an `i3en.3xlarge` instance with 20k tenants, each with one
timeline that has 9 layers.
All tenants are inactive, i.e., not known to SKs nor storage broker.
This means all initial logical size calculations are spawned by
consumption metrics `MetricsCollection` task kind.
The consumption metrics worker starts requesting logical sizes at low
priority immediately after restart. This is achieved by deleting the
consumption metrics cache file on disk before starting
PS.[^consumption_metrics_cache_file]

[^consumption_metrics_cache_file] Consumption metrics worker persists
its interval across restarts to achieve persistent reporting intervals
across PS restarts; delete the state file on disk to get predictable
(and I believe worst-case in terms of concurrency during PS restart)
behavior.

Before this patch, all of these timelines would all do their initial
logical size calculation in parallel, leading to extreme thrashing in
page cache and virtual file cache.

With this patch, the virtual file cache thrashing is reduced
significantly (from 80k `open`-system-calls/second to ~500
`open`-system-calls/second during loading).


### Critique

The obvious critique with above experiment is that there's no skipping
of the semaphore, i.e., the priority-boosting aspect of this PR is not
exercised.

If even just 1% of our 20k tenants in the setup were active in
SK/storage_broker, then 200 logical size calculations would skip the
limiting semaphore immediately after restart and run concurrently.

Further critique: given the two bugs wrt timeline inactive vs active
state that were mentioned in the Background section, we could have 75%
of our 20k tenants being (falsely) active on restart.

So... (next section)

This Doesn't Make Us Ready For Async VirtualFile
------------------------------------------------

This PR is a step towards asynchronous `VirtualFile`, aka, #5479 or even
#4744.

But it doesn't yet enable us to ship #5479.

The reason is that this PR doesn't limit the amount of high-priority
logical size computations.
If there are many high-priority logical size calculations requested,
we'll fall over like we did if #5479 is applied without this PR.
And currently, at very least due to the bugs mentioned in the Background
section, we run thousands of high-priority logical size calculations on
PS startup in prod.

So, at a minimum, we need to fix these bugs.

Then we can ship #5479 and #4744, and things will likely be fine under
normal operation.

But in high-traffic situations, overload problems will still be more
likely to happen, e.g., VirtualFile cache descriptor thrashing.
The solution candidates for that are orthogonal to this PR though:
* global concurrency limiting
* per-tenant rate limiting => #5899
* load shedding
* scaling bottleneck resources (fd cache size (neondatabase/cloud#8351),
page cache size(neondatabase/cloud#8351), spread load across more PSes,
etc)

Conclusion
----------

Even with the remarks from in the previous section, we should merge this
PR because:
1. it's an improvement over the status quo (esp. if the aforementioned
bugs wrt timeline active / inactive are fixed)
2. it prepares the way for
https://github.com/neondatabase/neon/pull/6010
3. it gets us close to shipping #5479 and #4744
This commit is contained in:
Christian Schwarz
2023-12-04 18:22:26 +01:00
committed by GitHub
parent 7403d55013
commit c7f1143e57
16 changed files with 306 additions and 290 deletions

View File

@@ -402,15 +402,11 @@ fn start_pageserver(
let (init_remote_done_tx, init_remote_done_rx) = utils::completion::channel();
let (init_done_tx, init_done_rx) = utils::completion::channel();
let (init_logical_size_done_tx, init_logical_size_done_rx) = utils::completion::channel();
let (background_jobs_can_start, background_jobs_barrier) = utils::completion::channel();
let order = pageserver::InitializationOrder {
initial_tenant_load_remote: Some(init_done_tx),
initial_tenant_load: Some(init_remote_done_tx),
initial_logical_size_can_start: init_done_rx.clone(),
initial_logical_size_attempt: Some(init_logical_size_done_tx),
background_jobs_can_start: background_jobs_barrier.clone(),
};
@@ -464,7 +460,7 @@ fn start_pageserver(
});
let WaitForPhaseResult {
timeout_remaining: timeout,
timeout_remaining: _timeout,
skipped: init_load_skipped,
} = wait_for_phase("initial_tenant_load", init_load_done, timeout).await;
@@ -472,26 +468,6 @@ fn start_pageserver(
scopeguard::ScopeGuard::into_inner(guard);
let guard = scopeguard::guard_on_success((), |_| {
tracing::info!("Cancelled before initial logical sizes completed")
});
let logical_sizes_done = std::pin::pin!(async {
init_logical_size_done_rx.wait().await;
startup_checkpoint(
started_startup_at,
"initial_logical_sizes",
"Initial logical sizes completed",
);
});
let WaitForPhaseResult {
timeout_remaining: _,
skipped: logical_sizes_skipped,
} = wait_for_phase("initial_logical_sizes", logical_sizes_done, timeout).await;
scopeguard::ScopeGuard::into_inner(guard);
// allow background jobs to start: we either completed prior stages, or they reached timeout
// and were skipped. It is important that we do not let them block background jobs indefinitely,
// because things like consumption metrics for billing are blocked by this barrier.
@@ -514,9 +490,6 @@ fn start_pageserver(
if let Some(f) = init_load_skipped {
f.await;
}
if let Some(f) = logical_sizes_skipped {
f.await;
}
scopeguard::ScopeGuard::into_inner(guard);
startup_checkpoint(started_startup_at, "complete", "Startup complete");

View File

@@ -351,7 +351,12 @@ impl TimelineSnapshot {
let current_exact_logical_size = {
let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
let size = span.in_scope(|| t.get_current_logical_size(ctx));
let size = span.in_scope(|| {
t.get_current_logical_size(
crate::tenant::timeline::GetLogicalSizePriority::Background,
ctx,
)
});
match size {
// Only send timeline logical size when it is fully calculated.
CurrentLogicalSize::Exact(ref size) => Some(size.into()),

View File

@@ -338,7 +338,8 @@ async fn build_timeline_info_common(
Lsn(0) => None,
lsn @ Lsn(_) => Some(lsn),
};
let current_logical_size = timeline.get_current_logical_size(ctx);
let current_logical_size =
timeline.get_current_logical_size(tenant::timeline::GetLogicalSizePriority::User, ctx);
let current_physical_size = Some(timeline.layer_size_sum().await);
let state = timeline.current_state();
let remote_consistent_lsn_projected = timeline

View File

@@ -186,13 +186,6 @@ pub struct InitializationOrder {
/// Each initial tenant load task carries this until completion.
pub initial_tenant_load: Option<utils::completion::Completion>,
/// Barrier for when we can start initial logical size calculations.
pub initial_logical_size_can_start: utils::completion::Barrier,
/// Each timeline owns a clone of this to be consumed on the initial logical size calculation
/// attempt. It is important to drop this once the attempt has completed.
pub initial_logical_size_attempt: Option<utils::completion::Completion>,
/// Barrier for when we can start any background jobs.
///
/// This can be broken up later on, but right now there is just one class of a background job.

View File

@@ -407,16 +407,14 @@ pub(crate) mod initial_logical_size {
use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec};
use once_cell::sync::Lazy;
use crate::task_mgr::TaskKind;
pub(crate) struct StartCalculation(IntCounterVec);
pub(crate) static START_CALCULATION: Lazy<StartCalculation> = Lazy::new(|| {
StartCalculation(
register_int_counter_vec!(
"pageserver_initial_logical_size_start_calculation",
"Incremented each time we start an initial logical size calculation attempt. \
The `task_kind` label is for the task kind that caused this attempt.",
&["attempt", "task_kind"]
The `circumstances` label provides some additional details.",
&["attempt", "circumstances"]
)
.unwrap(),
)
@@ -464,19 +462,24 @@ pub(crate) mod initial_logical_size {
inc_drop_calculation: Option<IntCounter>,
}
#[derive(strum_macros::IntoStaticStr)]
pub(crate) enum StartCircumstances {
EmptyInitial,
SkippedConcurrencyLimiter,
AfterBackgroundTasksRateLimit,
}
impl StartCalculation {
pub(crate) fn first(&self, causing_task_kind: Option<TaskKind>) -> OngoingCalculationGuard {
let task_kind_label: &'static str =
causing_task_kind.map(|k| k.into()).unwrap_or_default();
self.0.with_label_values(&["first", task_kind_label]);
pub(crate) fn first(&self, circumstances: StartCircumstances) -> OngoingCalculationGuard {
let circumstances_label: &'static str = circumstances.into();
self.0.with_label_values(&["first", circumstances_label]);
OngoingCalculationGuard {
inc_drop_calculation: Some(DROP_CALCULATION.first.clone()),
}
}
pub(crate) fn retry(&self, causing_task_kind: Option<TaskKind>) -> OngoingCalculationGuard {
let task_kind_label: &'static str =
causing_task_kind.map(|k| k.into()).unwrap_or_default();
self.0.with_label_values(&["retry", task_kind_label]);
pub(crate) fn retry(&self, circumstances: StartCircumstances) -> OngoingCalculationGuard {
let circumstances_label: &'static str = circumstances.into();
self.0.with_label_values(&["retry", circumstances_label]);
OngoingCalculationGuard {
inc_drop_calculation: Some(DROP_CALCULATION.retry.clone()),
}

View File

@@ -282,6 +282,10 @@ impl Timeline {
}
/// Get a list of all existing relations in given tablespace and database.
///
/// # Cancel-Safety
///
/// This method is cancellation-safe.
pub async fn list_rels(
&self,
spcnode: Oid,
@@ -630,6 +634,10 @@ impl Timeline {
///
/// Only relation blocks are counted currently. That excludes metadata,
/// SLRUs, twophase files etc.
///
/// # Cancel-Safety
///
/// This method is cancellation-safe.
pub async fn get_current_logical_size_non_incremental(
&self,
lsn: Lsn,

View File

@@ -472,7 +472,6 @@ impl Tenant {
index_part: Option<IndexPart>,
metadata: TimelineMetadata,
ancestor: Option<Arc<Timeline>>,
init_order: Option<&InitializationOrder>,
_ctx: &RequestContext,
) -> anyhow::Result<()> {
let tenant_id = self.tenant_shard_id;
@@ -482,7 +481,6 @@ impl Tenant {
&metadata,
ancestor.clone(),
resources,
init_order,
CreateTimelineCause::Load,
)?;
let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
@@ -683,10 +681,6 @@ impl Tenant {
// as we are no longer loading, signal completion by dropping
// the completion while we resume deletion
drop(_completion);
// do not hold to initial_logical_size_attempt as it will prevent loading from proceeding without timeout
let _ = init_order
.as_mut()
.and_then(|x| x.initial_logical_size_attempt.take());
let background_jobs_can_start =
init_order.as_ref().map(|x| &x.background_jobs_can_start);
if let Some(background) = background_jobs_can_start {
@@ -700,7 +694,6 @@ impl Tenant {
&tenant_clone,
preload,
tenants,
init_order,
&ctx,
)
.await
@@ -713,7 +706,7 @@ impl Tenant {
}
}
match tenant_clone.attach(init_order, preload, &ctx).await {
match tenant_clone.attach(preload, &ctx).await {
Ok(()) => {
info!("attach finished, activating");
tenant_clone.activate(broker_client, None, &ctx);
@@ -776,7 +769,6 @@ impl Tenant {
///
async fn attach(
self: &Arc<Tenant>,
init_order: Option<InitializationOrder>,
preload: Option<TenantPreload>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
@@ -789,7 +781,7 @@ impl Tenant {
None => {
// Deprecated dev mode: load from local disk state instead of remote storage
// https://github.com/neondatabase/neon/issues/5624
return self.load_local(init_order, ctx).await;
return self.load_local(ctx).await;
}
};
@@ -884,7 +876,6 @@ impl Tenant {
&index_part.metadata,
Some(remote_timeline_client),
self.deletion_queue_client.clone(),
None,
)
.await
.context("resume_deletion")
@@ -1009,10 +1000,6 @@ impl Tenant {
None
};
// we can load remote timelines during init, but they are assumed to be so rare that
// initialization order is not passed to here.
let init_order = None;
// timeline loading after attach expects to find metadata file for each metadata
save_metadata(
self.conf,
@@ -1030,7 +1017,6 @@ impl Tenant {
Some(index_part),
remote_metadata,
ancestor,
init_order,
ctx,
)
.await
@@ -1272,11 +1258,7 @@ impl Tenant {
/// files on disk. Used at pageserver startup.
///
/// No background tasks are started as part of this routine.
async fn load_local(
self: &Arc<Tenant>,
init_order: Option<InitializationOrder>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
async fn load_local(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
span::debug_assert_current_span_has_tenant_id();
debug!("loading tenant task");
@@ -1302,7 +1284,7 @@ impl Tenant {
// Process loadable timelines first
for (timeline_id, local_metadata) in scan.sorted_timelines_to_load {
if let Err(e) = self
.load_local_timeline(timeline_id, local_metadata, init_order.as_ref(), ctx, false)
.load_local_timeline(timeline_id, local_metadata, ctx, false)
.await
{
match e {
@@ -1336,13 +1318,7 @@ impl Tenant {
}
Some(local_metadata) => {
if let Err(e) = self
.load_local_timeline(
timeline_id,
local_metadata,
init_order.as_ref(),
ctx,
true,
)
.load_local_timeline(timeline_id, local_metadata, ctx, true)
.await
{
match e {
@@ -1370,12 +1346,11 @@ impl Tenant {
/// Subroutine of `load_tenant`, to load an individual timeline
///
/// NB: The parent is assumed to be already loaded!
#[instrument(skip(self, local_metadata, init_order, ctx))]
#[instrument(skip(self, local_metadata, ctx))]
async fn load_local_timeline(
self: &Arc<Self>,
timeline_id: TimelineId,
local_metadata: TimelineMetadata,
init_order: Option<&InitializationOrder>,
ctx: &RequestContext,
found_delete_mark: bool,
) -> Result<(), LoadLocalTimelineError> {
@@ -1392,7 +1367,6 @@ impl Tenant {
&local_metadata,
None,
self.deletion_queue_client.clone(),
init_order,
)
.await
.context("resume deletion")
@@ -1409,17 +1383,9 @@ impl Tenant {
None
};
self.timeline_init_and_sync(
timeline_id,
resources,
None,
local_metadata,
ancestor,
init_order,
ctx,
)
.await
.map_err(LoadLocalTimelineError::Load)
self.timeline_init_and_sync(timeline_id, resources, None, local_metadata, ancestor, ctx)
.await
.map_err(LoadLocalTimelineError::Load)
}
pub(crate) fn tenant_id(&self) -> TenantId {
@@ -2314,7 +2280,6 @@ impl Tenant {
new_metadata: &TimelineMetadata,
ancestor: Option<Arc<Timeline>>,
resources: TimelineResources,
init_order: Option<&InitializationOrder>,
cause: CreateTimelineCause,
) -> anyhow::Result<Arc<Timeline>> {
let state = match cause {
@@ -2329,9 +2294,6 @@ impl Tenant {
CreateTimelineCause::Delete => TimelineState::Stopping,
};
let initial_logical_size_can_start = init_order.map(|x| &x.initial_logical_size_can_start);
let initial_logical_size_attempt = init_order.map(|x| &x.initial_logical_size_attempt);
let pg_version = new_metadata.pg_version();
let timeline = Timeline::new(
@@ -2345,8 +2307,6 @@ impl Tenant {
Arc::clone(&self.walredo_mgr),
resources,
pg_version,
initial_logical_size_can_start.cloned(),
initial_logical_size_attempt.cloned().flatten(),
state,
self.cancel.child_token(),
);
@@ -3168,7 +3128,6 @@ impl Tenant {
new_metadata,
ancestor,
resources,
None,
CreateTimelineCause::Load,
)
.context("Failed to create timeline data structure")?;
@@ -3843,7 +3802,7 @@ pub(crate) mod harness {
match mode {
LoadMode::Local => {
tenant
.load_local(None, ctx)
.load_local(ctx)
.instrument(info_span!("try_load", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))
.await?;
}
@@ -3853,7 +3812,7 @@ pub(crate) mod harness {
.instrument(info_span!("try_load_preload", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))
.await?;
tenant
.attach(None, Some(preload), ctx)
.attach(Some(preload), ctx)
.instrument(info_span!("try_load", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))
.await?;
}

View File

@@ -15,7 +15,6 @@ use crate::{
context::RequestContext,
task_mgr::{self, TaskKind},
tenant::mgr::{TenantSlot, TenantsMapRemoveResult},
InitializationOrder,
};
use super::{
@@ -390,7 +389,6 @@ impl DeleteTenantFlow {
tenant: &Arc<Tenant>,
preload: Option<TenantPreload>,
tenants: &'static std::sync::RwLock<TenantsMap>,
init_order: Option<InitializationOrder>,
ctx: &RequestContext,
) -> Result<(), DeleteTenantError> {
let (_, progress) = completion::channel();
@@ -400,10 +398,7 @@ impl DeleteTenantFlow {
.await
.expect("cant be stopping or broken");
tenant
.attach(init_order, preload, ctx)
.await
.context("attach")?;
tenant.attach(preload, ctx).await.context("attach")?;
Self::background(
guard,

View File

@@ -230,6 +230,10 @@ impl Layer {
///
/// It is up to the caller to collect more data from the previous layer and
/// perform WAL redo, if necessary.
///
/// # Cancellation-Safety
///
/// This method is cancellation-safe.
pub(crate) async fn get_value_reconstruct_data(
&self,
key: Key,

View File

@@ -44,6 +44,7 @@ pub(crate) enum BackgroundLoopKind {
Eviction,
ConsumptionMetricsCollectMetrics,
ConsumptionMetricsSyntheticSizeWorker,
InitialLogicalSizeCalculation,
}
impl BackgroundLoopKind {

View File

@@ -20,23 +20,27 @@ use pageserver_api::{
},
shard::TenantShardId,
};
use rand::Rng;
use serde_with::serde_as;
use storage_broker::BrokerClientChannel;
use tokio::{
runtime::Handle,
sync::{oneshot, watch, TryAcquireError},
sync::{oneshot, watch},
};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::{id::TenantTimelineId, sync::gate::Gate};
use std::cmp::{max, min, Ordering};
use std::collections::{BinaryHeap, HashMap, HashSet};
use std::ops::{Deref, Range};
use std::pin::pin;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::{Arc, Mutex, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
use std::{
cmp::{max, min, Ordering},
ops::ControlFlow,
};
use crate::context::{
AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder,
@@ -298,13 +302,6 @@ pub struct Timeline {
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
/// Barrier to wait before doing initial logical size calculation. Used only during startup.
initial_logical_size_can_start: Option<completion::Barrier>,
/// Completion shared between all timelines loaded during startup; used to delay heavier
/// background tasks until some logical sizes have been calculated.
initial_logical_size_attempt: Mutex<Option<completion::Completion>>,
/// Load or creation time information about the disk_consistent_lsn and when the loading
/// happened. Used for consumption metrics.
pub(crate) loaded_at: (Lsn, SystemTime),
@@ -453,6 +450,11 @@ pub enum LogicalSizeCalculationCause {
TenantSizeHandler,
}
pub enum GetLogicalSizePriority {
User,
Background,
}
#[derive(enumset::EnumSetType)]
pub(crate) enum CompactFlags {
ForceRepartition,
@@ -489,6 +491,9 @@ impl Timeline {
/// an ancestor branch, for example, or waste a lot of cycles chasing the
/// non-existing key.
///
/// # Cancel-Safety
///
/// This method is cancellation-safe.
pub async fn get(
&self,
key: Key,
@@ -849,46 +854,6 @@ impl Timeline {
}
}
/// Retrieve current logical size of the timeline.
///
/// The size could be lagging behind the actual number, in case
/// the initial size calculation has not been run (gets triggered on the first size access).
///
/// return size and boolean flag that shows if the size is exact
pub(crate) fn get_current_logical_size(
self: &Arc<Self>,
ctx: &RequestContext,
) -> logical_size::CurrentLogicalSize {
let current_size = self.current_logical_size.current_size();
debug!("Current size: {current_size:?}");
if let (CurrentLogicalSize::Approximate(_), Some(initial_part_end)) =
(current_size, self.current_logical_size.initial_part_end)
{
self.try_spawn_size_init_task(initial_part_end, ctx);
}
if let CurrentLogicalSize::Approximate(_) = &current_size {
if ctx.task_kind() == TaskKind::WalReceiverConnectionHandler {
let first = self
.current_logical_size
.did_return_approximate_to_walreceiver
.compare_exchange(
false,
true,
AtomicOrdering::Relaxed,
AtomicOrdering::Relaxed,
)
.is_ok();
if first {
crate::metrics::initial_logical_size::TIMELINES_WHERE_WALRECEIVER_GOT_APPROXIMATE_SIZE.inc();
}
}
}
current_size
}
/// Check if more than 'checkpoint_distance' of WAL has been accumulated in
/// the in-memory layer, and initiate flushing it if so.
///
@@ -938,6 +903,7 @@ impl Timeline {
background_jobs_can_start: Option<&completion::Barrier>,
ctx: &RequestContext,
) {
self.spawn_initial_logical_size_computation_task(ctx);
self.launch_wal_receiver(ctx, broker_client);
self.set_state(TimelineState::Active);
self.launch_eviction_task(background_jobs_can_start);
@@ -1051,17 +1017,6 @@ impl Timeline {
error!("Not activating a Stopping timeline");
}
(_, new_state) => {
if matches!(
new_state,
TimelineState::Stopping | TimelineState::Broken { .. }
) {
// drop the completion guard, if any; it might be holding off the completion
// forever needlessly
self.initial_logical_size_attempt
.lock()
.unwrap_or_else(|e| e.into_inner())
.take();
}
self.state.send_replace(new_state);
}
}
@@ -1383,8 +1338,6 @@ impl Timeline {
walredo_mgr: Arc<super::WalRedoManager>,
resources: TimelineResources,
pg_version: u32,
initial_logical_size_can_start: Option<completion::Barrier>,
initial_logical_size_attempt: Option<completion::Completion>,
state: TimelineState,
cancel: CancellationToken,
) -> Arc<Self> {
@@ -1484,8 +1437,6 @@ impl Timeline {
),
delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTimelineFlow::default())),
initial_logical_size_can_start,
initial_logical_size_attempt: Mutex::new(initial_logical_size_attempt),
cancel,
gate: Gate::new(format!("Timeline<{tenant_shard_id}/{timeline_id}>")),
@@ -1797,39 +1748,91 @@ impl Timeline {
Ok(())
}
fn try_spawn_size_init_task(self: &Arc<Self>, lsn: Lsn, ctx: &RequestContext) {
let state = self.current_state();
if matches!(
state,
TimelineState::Broken { .. } | TimelineState::Stopping
) {
// Can happen when timeline detail endpoint is used when deletion is ongoing (or its broken).
return;
/// Retrieve current logical size of the timeline.
///
/// The size could be lagging behind the actual number, in case
/// the initial size calculation has not been run (gets triggered on the first size access).
///
/// return size and boolean flag that shows if the size is exact
pub(crate) fn get_current_logical_size(
self: &Arc<Self>,
priority: GetLogicalSizePriority,
ctx: &RequestContext,
) -> logical_size::CurrentLogicalSize {
let current_size = self.current_logical_size.current_size();
debug!("Current size: {current_size:?}");
match (current_size.accuracy(), priority) {
(logical_size::Accuracy::Exact, _) => (), // nothing to do
(logical_size::Accuracy::Approximate, GetLogicalSizePriority::Background) => {
// background task will eventually deliver an exact value, we're in no rush
}
(logical_size::Accuracy::Approximate, GetLogicalSizePriority::User) => {
// background task is not ready, but user is asking for it now;
// => make the background task skip the line
// (The alternative would be to calculate the size here, but,
// it can actually take a long time if the user has a lot of rels.
// And we'll inevitable need it again; So, let the background task do the work.)
match self
.current_logical_size
.cancel_wait_for_background_loop_concurrency_limit_semaphore
.get()
{
Some(cancel) => cancel.cancel(),
None => {
let state = self.current_state();
if matches!(
state,
TimelineState::Broken { .. } | TimelineState::Stopping
) {
// Can happen when timeline detail endpoint is used when deletion is ongoing (or its broken).
// Don't make noise.
} else {
warn!("unexpected: cancel_wait_for_background_loop_concurrency_limit_semaphore not set, priority-boosting of logical size calculation will not work");
}
}
};
}
}
let permit = match Arc::clone(&self.current_logical_size.initial_size_computation)
.try_acquire_owned()
{
Ok(permit) => permit,
Err(TryAcquireError::NoPermits) => {
// computation already ongoing or finished with success
return;
if let CurrentLogicalSize::Approximate(_) = &current_size {
if ctx.task_kind() == TaskKind::WalReceiverConnectionHandler {
let first = self
.current_logical_size
.did_return_approximate_to_walreceiver
.compare_exchange(
false,
true,
AtomicOrdering::Relaxed,
AtomicOrdering::Relaxed,
)
.is_ok();
if first {
crate::metrics::initial_logical_size::TIMELINES_WHERE_WALRECEIVER_GOT_APPROXIMATE_SIZE.inc();
}
}
Err(TryAcquireError::Closed) => unreachable!("we never call close"),
};
debug_assert!(self
.current_logical_size
.initial_logical_size
.get()
.is_none());
}
current_size
}
fn spawn_initial_logical_size_computation_task(self: &Arc<Self>, ctx: &RequestContext) {
let Some(initial_part_end) = self.current_logical_size.initial_part_end else {
// nothing to do for freshly created timelines;
assert_eq!(
self.current_logical_size.current_size().accuracy(),
logical_size::Accuracy::Exact,
);
return;
};
let cancel_wait_for_background_loop_concurrency_limit_semaphore = CancellationToken::new();
let token = cancel_wait_for_background_loop_concurrency_limit_semaphore.clone();
self.current_logical_size
.cancel_wait_for_background_loop_concurrency_limit_semaphore.set(token)
.expect("initial logical size calculation task must be spawned exactly once per Timeline object");
info!(
"spawning logical size computation from context of task kind {:?}",
ctx.task_kind()
);
let causing_task_kind = ctx.task_kind();
// We need to start the computation task.
// It gets a separate context since it will outlive the request that called this function.
let self_clone = Arc::clone(self);
let background_ctx = ctx.detached_child(
TaskKind::InitialLogicalSizeCalculation,
@@ -1844,96 +1847,152 @@ impl Timeline {
false,
// NB: don't log errors here, task_mgr will do that.
async move {
let cancel = task_mgr::shutdown_token();
self_clone
.initial_logical_size_calculation_task(
initial_part_end,
cancel_wait_for_background_loop_concurrency_limit_semaphore,
cancel,
background_ctx,
)
.await;
Ok(())
}
.instrument(info_span!(parent: None, "initial_size_calculation", tenant_id=%self.tenant_shard_id.tenant_id, timeline_id=%self.timeline_id)),
);
}
// in case we were created during pageserver initialization, wait for
// initialization to complete before proceeding. startup time init runs on the same
// runtime.
tokio::select! {
_ = cancel.cancelled() => { return Ok(()); },
_ = completion::Barrier::maybe_wait(self_clone.initial_logical_size_can_start.clone()) => {}
async fn initial_logical_size_calculation_task(
self: Arc<Self>,
initial_part_end: Lsn,
skip_concurrency_limiter: CancellationToken,
cancel: CancellationToken,
background_ctx: RequestContext,
) {
enum BackgroundCalculationError {
Cancelled,
Other(anyhow::Error),
}
let try_once = |attempt: usize| {
let background_ctx = &background_ctx;
let self_ref = &self;
let skip_concurrency_limiter = &skip_concurrency_limiter;
async move {
let cancel = task_mgr::shutdown_token();
let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit(
BackgroundLoopKind::InitialLogicalSizeCalculation,
background_ctx,
&cancel,
);
use crate::metrics::initial_logical_size::StartCircumstances;
let (_maybe_permit, circumstances) = tokio::select! {
res = wait_for_permit => {
match res {
Ok(permit) => (Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit),
Err(RateLimitError::Cancelled) => {
return Err(BackgroundCalculationError::Cancelled);
}
}
}
() = skip_concurrency_limiter.cancelled() => {
// Some action that is part of a end user interaction requested logical size
// => break out of the rate limit
// TODO: ideally we'd not run on BackgroundRuntime but the requester's runtime;
// but then again what happens if they cancel; also, we should just be using
// one runtime across the entire process, so, let's leave this for now.
(None, StartCircumstances::SkippedConcurrencyLimiter)
}
};
// hold off background tasks from starting until all timelines get to try at least
// once initial logical size calculation; though retry will rarely be useful.
// holding off is done because heavier tasks execute blockingly on the same
// runtime.
//
// dropping this at every outcome is probably better than trying to cling on to it,
// delay will be terminated by a timeout regardless.
let completion = { self_clone.initial_logical_size_attempt.lock().expect("unexpected initial_logical_size_attempt poisoned").take() };
let metrics_guard = match &completion {
Some(_) => crate::metrics::initial_logical_size::START_CALCULATION.first(Some(causing_task_kind)),
None => crate::metrics::initial_logical_size::START_CALCULATION.retry(Some(causing_task_kind)),
let metrics_guard = if attempt == 1 {
crate::metrics::initial_logical_size::START_CALCULATION.first(circumstances)
} else {
crate::metrics::initial_logical_size::START_CALCULATION.retry(circumstances)
};
let calculated_size = match self_clone
.logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx)
match self_ref
.logical_size_calculation_task(
initial_part_end,
LogicalSizeCalculationCause::Initial,
background_ctx,
)
.await
{
Ok(s) => s,
Ok(calculated_size) => Ok((calculated_size, metrics_guard)),
Err(CalculateLogicalSizeError::Cancelled) => {
// Don't make noise, this is a common task.
// In the unlikely case that there is another call to this function, we'll retry
// because initial_logical_size is still None.
info!("initial size calculation cancelled, likely timeline delete / tenant detach");
return Ok(());
Err(BackgroundCalculationError::Cancelled)
}
Err(CalculateLogicalSizeError::Other(err)) => {
if let Some(e @ PageReconstructError::AncestorStopping(_)) =
if let Some(PageReconstructError::AncestorStopping(_)) =
err.root_cause().downcast_ref()
{
// This can happen if the timeline parent timeline switches to
// Stopping state while we're still calculating the initial
// timeline size for the child, for example if the tenant is
// being detached or the pageserver is shut down. Like with
// CalculateLogicalSizeError::Cancelled, don't make noise.
info!("initial size calculation failed because the timeline or its ancestor is Stopping, likely because the tenant is being detached: {e:#}");
return Ok(());
Err(BackgroundCalculationError::Cancelled)
} else {
Err(BackgroundCalculationError::Other(err))
}
return Err(err.context("Failed to calculate logical size"));
}
};
// we cannot query current_logical_size.current_size() to know the current
// *negative* value, only truncated to u64.
let added = self_clone
.current_logical_size
.size_added_after_initial
.load(AtomicOrdering::Relaxed);
let sum = calculated_size.saturating_add_signed(added);
// set the gauge value before it can be set in `update_current_logical_size`.
self_clone.metrics.current_logical_size_gauge.set(sum);
match self_clone
.current_logical_size
.initial_logical_size
.set((calculated_size, metrics_guard.calculation_result_saved()))
{
Ok(()) => (),
Err(_what_we_just_attempted_to_set) => {
let (existing_size, _) = self_clone
.current_logical_size
.initial_logical_size
.get()
.expect("once_cell set was lost, then get failed, impossible.");
// This shouldn't happen because the semaphore is initialized with 1.
// But if it happens, just complain & report success so there are no further retries.
error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing")
}
}
// now that `initial_logical_size.is_some()`, reduce permit count to 0
// so that we prevent future callers from spawning this task
permit.forget();
Ok(())
}.in_current_span(),
);
}
};
let retrying = async {
let mut attempt = 0;
loop {
attempt += 1;
match try_once(attempt).await {
Ok(res) => return ControlFlow::Continue(res),
Err(BackgroundCalculationError::Cancelled) => return ControlFlow::Break(()),
Err(BackgroundCalculationError::Other(e)) => {
warn!(attempt, "initial size calculation failed: {e:?}");
// exponential back-off doesn't make sense at these long intervals;
// use fixed retry interval with generous jitter instead
let sleep_duration = Duration::from_secs(
u64::try_from(
// 1hour base
(60_i64 * 60_i64)
// 10min jitter
+ rand::thread_rng().gen_range(-10 * 60..10 * 60),
)
.expect("10min < 1hour"),
);
tokio::time::sleep(sleep_duration).await;
}
}
}
};
let (calculated_size, metrics_guard) = tokio::select! {
res = retrying => {
match res {
ControlFlow::Continue(calculated_size) => calculated_size,
ControlFlow::Break(()) => return,
}
}
_ = cancel.cancelled() => {
return;
}
};
// we cannot query current_logical_size.current_size() to know the current
// *negative* value, only truncated to u64.
let added = self
.current_logical_size
.size_added_after_initial
.load(AtomicOrdering::Relaxed);
let sum = calculated_size.saturating_add_signed(added);
// set the gauge value before it can be set in `update_current_logical_size`.
self.metrics.current_logical_size_gauge.set(sum);
self.current_logical_size
.initial_logical_size
.set((calculated_size, metrics_guard.calculation_result_saved()))
.ok()
.expect("only this task sets it");
}
pub fn spawn_ondemand_logical_size_calculation(
@@ -1971,6 +2030,9 @@ impl Timeline {
receiver
}
/// # Cancel-Safety
///
/// This method is cancellation-safe.
#[instrument(skip_all)]
async fn logical_size_calculation_task(
self: &Arc<Self>,
@@ -2008,6 +2070,10 @@ impl Timeline {
///
/// NOTE: counted incrementally, includes ancestors. This can be a slow operation,
/// especially if we need to download remote layers.
///
/// # Cancel-Safety
///
/// This method is cancellation-safe.
pub async fn calculate_logical_size(
&self,
up_to_lsn: Lsn,
@@ -2123,6 +2189,10 @@ impl Timeline {
///
/// This function takes the current timeline's locked LayerMap as an argument,
/// so callers can avoid potential race conditions.
///
/// # Cancel-Safety
///
/// This method is cancellation-safe.
async fn get_reconstruct_data(
&self,
key: Key,
@@ -2371,6 +2441,9 @@ impl Timeline {
}
}
/// # Cancel-safety
///
/// This method is cancellation-safe.
async fn lookup_cached_page(
&self,
key: &Key,

View File

@@ -21,7 +21,6 @@ use crate::{
},
CreateTimelineCause, DeleteTimelineError, Tenant,
},
InitializationOrder,
};
use super::{Timeline, TimelineResources};
@@ -407,7 +406,6 @@ impl DeleteTimelineFlow {
local_metadata: &TimelineMetadata,
remote_client: Option<RemoteTimelineClient>,
deletion_queue_client: DeletionQueueClient,
init_order: Option<&InitializationOrder>,
) -> anyhow::Result<()> {
// Note: here we even skip populating layer map. Timeline is essentially uninitialized.
// RemoteTimelineClient is the only functioning part.
@@ -420,7 +418,6 @@ impl DeleteTimelineFlow {
remote_client,
deletion_queue_client,
},
init_order,
// Important. We dont pass ancestor above because it can be missing.
// Thus we need to skip the validation here.
CreateTimelineCause::Delete,

View File

@@ -1,11 +1,10 @@
use anyhow::Context;
use once_cell::sync::OnceCell;
use tokio::sync::Semaphore;
use once_cell::sync::OnceCell;
use tokio_util::sync::CancellationToken;
use utils::lsn::Lsn;
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering as AtomicOrdering};
use std::sync::Arc;
/// Internal structure to hold all data needed for logical size calculation.
///
@@ -28,8 +27,12 @@ pub(super) struct LogicalSize {
crate::metrics::initial_logical_size::FinishedCalculationGuard,
)>,
/// Semaphore to track ongoing calculation of `initial_logical_size`.
pub initial_size_computation: Arc<tokio::sync::Semaphore>,
/// Cancellation for the best-effort logical size calculation.
///
/// The token is kept in a once-cell so that we can error out if a higher priority
/// request comes in *before* we have started the normal logical size calculation.
pub(crate) cancel_wait_for_background_loop_concurrency_limit_semaphore:
OnceCell<CancellationToken>,
/// Latest Lsn that has its size uncalculated, could be absent for freshly created timelines.
pub initial_part_end: Option<Lsn>,
@@ -72,7 +75,7 @@ pub(crate) enum CurrentLogicalSize {
Exact(Exact),
}
#[derive(Debug, Copy, Clone)]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum Accuracy {
Approximate,
Exact,
@@ -115,11 +118,10 @@ impl LogicalSize {
Self {
initial_logical_size: OnceCell::with_value((0, {
crate::metrics::initial_logical_size::START_CALCULATION
.first(None)
.first(crate::metrics::initial_logical_size::StartCircumstances::EmptyInitial)
.calculation_result_saved()
})),
// initial_logical_size already computed, so, don't admit any calculations
initial_size_computation: Arc::new(Semaphore::new(0)),
cancel_wait_for_background_loop_concurrency_limit_semaphore: OnceCell::new(),
initial_part_end: None,
size_added_after_initial: AtomicI64::new(0),
did_return_approximate_to_walreceiver: AtomicBool::new(false),
@@ -129,7 +131,7 @@ impl LogicalSize {
pub(super) fn deferred_initial(compute_to: Lsn) -> Self {
Self {
initial_logical_size: OnceCell::new(),
initial_size_computation: Arc::new(Semaphore::new(1)),
cancel_wait_for_background_loop_concurrency_limit_semaphore: OnceCell::new(),
initial_part_end: Some(compute_to),
size_added_after_initial: AtomicI64::new(0),
did_return_approximate_to_walreceiver: AtomicBool::new(false),

View File

@@ -397,7 +397,10 @@ pub(super) async fn handle_walreceiver_connection(
// Send the replication feedback message.
// Regular standby_status_update fields are put into this message.
let current_timeline_size = timeline
.get_current_logical_size(&ctx)
.get_current_logical_size(
crate::tenant::timeline::GetLogicalSizePriority::User,
&ctx,
)
// FIXME: https://github.com/neondatabase/neon/issues/5963
.size_dont_care_about_accuracy();
let status_update = PageserverFeedback {

View File

@@ -384,7 +384,7 @@ def test_download_remote_layers_api(
env.pageserver.allowed_errors.extend(
[
".*download failed: downloading evicted layer file failed.*",
f".*initial size calculation.*{tenant_id}.*{timeline_id}.*Failed to calculate logical size",
f".*initial_size_calculation.*{tenant_id}.*{timeline_id}.*initial size calculation failed: downloading evicted layer file failed",
]
)

View File

@@ -106,7 +106,6 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder, generations: bool)
# Initial tenant load should reflect the delay we injected
("initial_tenant_load", lambda t, p: t >= (tenant_load_delay_ms / 1000.0) and t >= p),
# Subsequent steps should occur in expected order
("initial_logical_sizes", lambda t, p: t > 0 and t >= p),
("background_jobs_can_start", lambda t, p: t > 0 and t >= p),
("complete", lambda t, p: t > 0 and t >= p),
]