mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-14 11:40:38 +00:00
rework initial logical size calculation
This commit is contained in:
@@ -351,7 +351,12 @@ impl TimelineSnapshot {
|
||||
|
||||
let current_exact_logical_size = {
|
||||
let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_id, timeline_id = %t.timeline_id);
|
||||
let size = span.in_scope(|| t.get_current_logical_size(ctx));
|
||||
let size = span.in_scope(|| {
|
||||
t.get_current_logical_size(
|
||||
crate::tenant::timeline::GetLogicalSizePriority::Background,
|
||||
ctx,
|
||||
)
|
||||
});
|
||||
match size {
|
||||
// Only send timeline logical size when it is fully calculated.
|
||||
CurrentLogicalSize::Exact(ref size) => Some(size.into()),
|
||||
|
||||
@@ -337,7 +337,8 @@ async fn build_timeline_info_common(
|
||||
Lsn(0) => None,
|
||||
lsn @ Lsn(_) => Some(lsn),
|
||||
};
|
||||
let current_logical_size = timeline.get_current_logical_size(ctx);
|
||||
let current_logical_size =
|
||||
timeline.get_current_logical_size(tenant::timeline::GetLogicalSizePriority::User, ctx);
|
||||
let current_physical_size = Some(timeline.layer_size_sum().await);
|
||||
let state = timeline.current_state();
|
||||
let remote_consistent_lsn_projected = timeline
|
||||
|
||||
@@ -406,16 +406,14 @@ pub(crate) mod initial_logical_size {
|
||||
use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
use crate::task_mgr::TaskKind;
|
||||
|
||||
pub(crate) struct StartCalculation(IntCounterVec);
|
||||
pub(crate) static START_CALCULATION: Lazy<StartCalculation> = Lazy::new(|| {
|
||||
StartCalculation(
|
||||
register_int_counter_vec!(
|
||||
"pageserver_initial_logical_size_start_calculation",
|
||||
"Incremented each time we start an initial logical size calculation attempt. \
|
||||
The `task_kind` label is for the task kind that caused this attempt.",
|
||||
&["attempt", "task_kind"]
|
||||
The `circumstances` label provides some additional details.",
|
||||
&["attempt", "circumstances"]
|
||||
)
|
||||
.unwrap(),
|
||||
)
|
||||
@@ -463,19 +461,24 @@ pub(crate) mod initial_logical_size {
|
||||
inc_drop_calculation: Option<IntCounter>,
|
||||
}
|
||||
|
||||
#[derive(strum_macros::IntoStaticStr)]
|
||||
pub(crate) enum StartCircumstances {
|
||||
EmptyInitial,
|
||||
SkippedConcurrencyLimiter,
|
||||
AfterBackgroundTasksRateLimit,
|
||||
}
|
||||
|
||||
impl StartCalculation {
|
||||
pub(crate) fn first(&self, causing_task_kind: Option<TaskKind>) -> OngoingCalculationGuard {
|
||||
let task_kind_label: &'static str =
|
||||
causing_task_kind.map(|k| k.into()).unwrap_or_default();
|
||||
self.0.with_label_values(&["first", task_kind_label]);
|
||||
pub(crate) fn first(&self, circumstances: StartCircumstances) -> OngoingCalculationGuard {
|
||||
let circumstances_label: &'static str = circumstances.into();
|
||||
self.0.with_label_values(&["first", circumstances_label]);
|
||||
OngoingCalculationGuard {
|
||||
inc_drop_calculation: Some(DROP_CALCULATION.first.clone()),
|
||||
}
|
||||
}
|
||||
pub(crate) fn retry(&self, causing_task_kind: Option<TaskKind>) -> OngoingCalculationGuard {
|
||||
let task_kind_label: &'static str =
|
||||
causing_task_kind.map(|k| k.into()).unwrap_or_default();
|
||||
self.0.with_label_values(&["retry", task_kind_label]);
|
||||
pub(crate) fn retry(&self, circumstances: StartCircumstances) -> OngoingCalculationGuard {
|
||||
let circumstances_label: &'static str = circumstances.into();
|
||||
self.0.with_label_values(&["retry", circumstances_label]);
|
||||
OngoingCalculationGuard {
|
||||
inc_drop_calculation: Some(DROP_CALCULATION.retry.clone()),
|
||||
}
|
||||
|
||||
@@ -672,7 +672,7 @@ impl Tenant {
|
||||
// as we are no longer loading, signal completion by dropping
|
||||
// the completion while we resume deletion
|
||||
drop(_completion);
|
||||
// do not hold to initial_logical_size_attempt as it will prevent loading from proceeding without timeout
|
||||
// do not hold to initial_logical_size_attempt as it will prevent loading from proceeding without timeout
|
||||
let _ = init_order
|
||||
.as_mut()
|
||||
.and_then(|x| x.initial_logical_size_attempt.take());
|
||||
@@ -689,7 +689,7 @@ impl Tenant {
|
||||
&tenant_clone,
|
||||
preload,
|
||||
tenants,
|
||||
init_order,
|
||||
init_order,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
|
||||
@@ -44,6 +44,7 @@ pub(crate) enum BackgroundLoopKind {
|
||||
Eviction,
|
||||
ConsumptionMetricsCollectMetrics,
|
||||
ConsumptionMetricsSyntheticSizeWorker,
|
||||
InitialLogicalSizeCalculation,
|
||||
}
|
||||
|
||||
impl BackgroundLoopKind {
|
||||
|
||||
@@ -16,23 +16,27 @@ use itertools::Itertools;
|
||||
use pageserver_api::models::{
|
||||
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, LayerMapInfo, TimelineState,
|
||||
};
|
||||
use rand::Rng;
|
||||
use serde_with::serde_as;
|
||||
use storage_broker::BrokerClientChannel;
|
||||
use tokio::{
|
||||
runtime::Handle,
|
||||
sync::{oneshot, watch, TryAcquireError},
|
||||
sync::{oneshot, watch},
|
||||
};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::{id::TenantTimelineId, sync::gate::Gate};
|
||||
|
||||
use std::cmp::{max, min, Ordering};
|
||||
use std::collections::{BinaryHeap, HashMap, HashSet};
|
||||
use std::ops::{Deref, Range};
|
||||
use std::pin::pin;
|
||||
use std::sync::atomic::Ordering as AtomicOrdering;
|
||||
use std::sync::{Arc, Mutex, RwLock, Weak};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
use std::{
|
||||
cmp::{max, min, Ordering},
|
||||
ops::ControlFlow,
|
||||
};
|
||||
|
||||
use crate::context::{
|
||||
AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder,
|
||||
@@ -449,6 +453,11 @@ pub enum LogicalSizeCalculationCause {
|
||||
TenantSizeHandler,
|
||||
}
|
||||
|
||||
pub enum GetLogicalSizePriority {
|
||||
User,
|
||||
Background,
|
||||
}
|
||||
|
||||
#[derive(enumset::EnumSetType)]
|
||||
pub(crate) enum CompactFlags {
|
||||
ForceRepartition,
|
||||
@@ -845,28 +854,6 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
/// Retrieve current logical size of the timeline.
|
||||
///
|
||||
/// The size could be lagging behind the actual number, in case
|
||||
/// the initial size calculation has not been run (gets triggered on the first size access).
|
||||
///
|
||||
/// return size and boolean flag that shows if the size is exact
|
||||
pub(crate) fn get_current_logical_size(
|
||||
self: &Arc<Self>,
|
||||
ctx: &RequestContext,
|
||||
) -> logical_size::CurrentLogicalSize {
|
||||
let current_size = self.current_logical_size.current_size();
|
||||
debug!("Current size: {current_size:?}");
|
||||
|
||||
if let (CurrentLogicalSize::Approximate(_), Some(initial_part_end)) =
|
||||
(current_size, self.current_logical_size.initial_part_end)
|
||||
{
|
||||
self.try_spawn_size_init_task(initial_part_end, ctx);
|
||||
}
|
||||
|
||||
current_size
|
||||
}
|
||||
|
||||
/// Check if more than 'checkpoint_distance' of WAL has been accumulated in
|
||||
/// the in-memory layer, and initiate flushing it if so.
|
||||
///
|
||||
@@ -919,6 +906,7 @@ impl Timeline {
|
||||
self.launch_wal_receiver(ctx, broker_client);
|
||||
self.set_state(TimelineState::Active);
|
||||
self.launch_eviction_task(background_jobs_can_start);
|
||||
self.spawn_initial_logical_size_computation_task(ctx);
|
||||
}
|
||||
|
||||
/// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk and then
|
||||
@@ -1759,39 +1747,60 @@ impl Timeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn try_spawn_size_init_task(self: &Arc<Self>, lsn: Lsn, ctx: &RequestContext) {
|
||||
let state = self.current_state();
|
||||
if matches!(
|
||||
state,
|
||||
TimelineState::Broken { .. } | TimelineState::Stopping
|
||||
) {
|
||||
// Can happen when timeline detail endpoint is used when deletion is ongoing (or its broken).
|
||||
return;
|
||||
/// Retrieve current logical size of the timeline.
|
||||
///
|
||||
/// The size could be lagging behind the actual number, in case
|
||||
/// the initial size calculation has not been run (gets triggered on the first size access).
|
||||
///
|
||||
/// return size and boolean flag that shows if the size is exact
|
||||
pub(crate) fn get_current_logical_size(
|
||||
self: &Arc<Self>,
|
||||
priority: GetLogicalSizePriority,
|
||||
_ctx: &RequestContext,
|
||||
) -> logical_size::CurrentLogicalSize {
|
||||
let current_size = self.current_logical_size.current_size();
|
||||
debug!("Current size: {current_size:?}");
|
||||
|
||||
match (current_size.accuracy(), priority) {
|
||||
(logical_size::Accuracy::Exact, _) => (), // nothing to do
|
||||
(logical_size::Accuracy::Approximate, GetLogicalSizePriority::Background) => {
|
||||
// background task will eventually deliver an exact value, we're in no rush
|
||||
}
|
||||
(logical_size::Accuracy::Approximate, GetLogicalSizePriority::User) => {
|
||||
// background task is not ready, but user is asking for it now;
|
||||
// => make the background task skip the line
|
||||
// (The alternative would be to calculate the size here, but,
|
||||
// it can actually take a long time if the user has a lot of rels.
|
||||
// And we'll inevitable need it again; So, let the background task do the work.)
|
||||
match self
|
||||
.current_logical_size
|
||||
.cancel_wait_for_background_loop_concurrency_limit_semaphore
|
||||
.get()
|
||||
{
|
||||
Some(cancel) => cancel.cancel(),
|
||||
None => {
|
||||
warn!("unexpected: priority_tx not set, logical size calculation will not be prioritized");
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
let permit = match Arc::clone(&self.current_logical_size.initial_size_computation)
|
||||
.try_acquire_owned()
|
||||
{
|
||||
Ok(permit) => permit,
|
||||
Err(TryAcquireError::NoPermits) => {
|
||||
// computation already ongoing or finished with success
|
||||
return;
|
||||
}
|
||||
Err(TryAcquireError::Closed) => unreachable!("we never call close"),
|
||||
};
|
||||
debug_assert!(self
|
||||
.current_logical_size
|
||||
.initial_logical_size
|
||||
.get()
|
||||
.is_none());
|
||||
current_size
|
||||
}
|
||||
|
||||
fn spawn_initial_logical_size_computation_task(self: &Arc<Self>, ctx: &RequestContext) {
|
||||
let Some(initial_part_end) = self.current_logical_size.initial_part_end else {
|
||||
// nothing to do for freshly created timelines;
|
||||
assert_eq!(
|
||||
self.current_logical_size.current_size().accuracy(),
|
||||
logical_size::Accuracy::Exact,
|
||||
);
|
||||
return;
|
||||
};
|
||||
|
||||
let cancel_wait_for_background_loop_concurrency_limit_semaphore = CancellationToken::new();
|
||||
self.current_logical_size.cancel_wait_for_background_loop_concurrency_limit_semaphore.set(cancel_wait_for_background_loop_concurrency_limit_semaphore.clone()).expect("initial logical size calculation task must be spawned exactly once per Timeline object");
|
||||
|
||||
info!(
|
||||
"spawning logical size computation from context of task kind {:?}",
|
||||
ctx.task_kind()
|
||||
);
|
||||
let causing_task_kind = ctx.task_kind();
|
||||
// We need to start the computation task.
|
||||
// It gets a separate context since it will outlive the request that called this function.
|
||||
let self_clone = Arc::clone(self);
|
||||
let background_ctx = ctx.detached_child(
|
||||
TaskKind::InitialLogicalSizeCalculation,
|
||||
@@ -1806,96 +1815,151 @@ impl Timeline {
|
||||
false,
|
||||
// NB: don't log errors here, task_mgr will do that.
|
||||
async move {
|
||||
|
||||
let cancel = task_mgr::shutdown_token();
|
||||
self_clone
|
||||
.initial_logical_size_calculation_task(
|
||||
initial_part_end,
|
||||
cancel_wait_for_background_loop_concurrency_limit_semaphore,
|
||||
cancel,
|
||||
background_ctx,
|
||||
)
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
.in_current_span(),
|
||||
);
|
||||
}
|
||||
|
||||
// in case we were created during pageserver initialization, wait for
|
||||
// initialization to complete before proceeding. startup time init runs on the same
|
||||
// runtime.
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => { return Ok(()); },
|
||||
_ = completion::Barrier::maybe_wait(self_clone.initial_logical_size_can_start.clone()) => {}
|
||||
};
|
||||
async fn initial_logical_size_calculation_task(
|
||||
self: Arc<Self>,
|
||||
initial_part_end: Lsn,
|
||||
skip_concurrency_limiter: CancellationToken,
|
||||
cancel: CancellationToken,
|
||||
background_ctx: RequestContext,
|
||||
) {
|
||||
enum BackgroundCalculationError {
|
||||
Cancelled,
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
let retrying = {
|
||||
let self_clone = self.clone();
|
||||
async move {
|
||||
let mut attempt = 0;
|
||||
loop {
|
||||
attempt += 1;
|
||||
let try_once_res = async {
|
||||
let cancel = task_mgr::shutdown_token();
|
||||
let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit(
|
||||
BackgroundLoopKind::InitialLogicalSizeCalculation,
|
||||
&background_ctx,
|
||||
&cancel,
|
||||
);
|
||||
|
||||
use crate::metrics::initial_logical_size::StartCircumstances;
|
||||
let (_maybe_permit, circumstances) = tokio::select! {
|
||||
res = wait_for_permit => {
|
||||
match res {
|
||||
Ok(permit) => (Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit),
|
||||
Err(RateLimitError::Cancelled) => {
|
||||
return Err(BackgroundCalculationError::Cancelled);
|
||||
}
|
||||
}
|
||||
}
|
||||
() = skip_concurrency_limiter.cancelled() => {
|
||||
// Some action that is part of a end user interaction requested logical size
|
||||
// => break out of the rate limit
|
||||
// TODO: ideally we'd not run on BackgroundRuntime but the requester's runtime;
|
||||
// but then again what happens if they cancel; also, we should just be using
|
||||
// one runtime across the entire process, so, let's leave this for now.
|
||||
(None, StartCircumstances::SkippedConcurrencyLimiter)
|
||||
}
|
||||
};
|
||||
|
||||
// hold off background tasks from starting until all timelines get to try at least
|
||||
// once initial logical size calculation; though retry will rarely be useful.
|
||||
// holding off is done because heavier tasks execute blockingly on the same
|
||||
// runtime.
|
||||
//
|
||||
// dropping this at every outcome is probably better than trying to cling on to it,
|
||||
// delay will be terminated by a timeout regardless.
|
||||
let completion = { self_clone.initial_logical_size_attempt.lock().expect("unexpected initial_logical_size_attempt poisoned").take() };
|
||||
let metrics_guard = if attempt == 1 {
|
||||
crate::metrics::initial_logical_size::START_CALCULATION.first(circumstances)
|
||||
} else {
|
||||
crate::metrics::initial_logical_size::START_CALCULATION.retry(circumstances)
|
||||
};
|
||||
|
||||
let metrics_guard = match &completion {
|
||||
Some(_) => crate::metrics::initial_logical_size::START_CALCULATION.first(Some(causing_task_kind)),
|
||||
None => crate::metrics::initial_logical_size::START_CALCULATION.retry(Some(causing_task_kind)),
|
||||
};
|
||||
|
||||
let calculated_size = match self_clone
|
||||
.logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx)
|
||||
.await
|
||||
{
|
||||
Ok(s) => s,
|
||||
Err(CalculateLogicalSizeError::Cancelled) => {
|
||||
// Don't make noise, this is a common task.
|
||||
// In the unlikely case that there is another call to this function, we'll retry
|
||||
// because initial_logical_size is still None.
|
||||
info!("initial size calculation cancelled, likely timeline delete / tenant detach");
|
||||
return Ok(());
|
||||
}
|
||||
Err(CalculateLogicalSizeError::Other(err)) => {
|
||||
if let Some(e @ PageReconstructError::AncestorStopping(_)) =
|
||||
err.root_cause().downcast_ref()
|
||||
match self_clone
|
||||
.logical_size_calculation_task(
|
||||
initial_part_end,
|
||||
LogicalSizeCalculationCause::Initial,
|
||||
&background_ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
// This can happen if the timeline parent timeline switches to
|
||||
// Stopping state while we're still calculating the initial
|
||||
// timeline size for the child, for example if the tenant is
|
||||
// being detached or the pageserver is shut down. Like with
|
||||
// CalculateLogicalSizeError::Cancelled, don't make noise.
|
||||
info!("initial size calculation failed because the timeline or its ancestor is Stopping, likely because the tenant is being detached: {e:#}");
|
||||
return Ok(());
|
||||
Ok(calculated_size) => Ok((calculated_size, metrics_guard)),
|
||||
Err(CalculateLogicalSizeError::Cancelled) => {
|
||||
Err(BackgroundCalculationError::Cancelled)
|
||||
}
|
||||
Err(CalculateLogicalSizeError::Other(err)) => {
|
||||
if let Some(PageReconstructError::AncestorStopping(_)) =
|
||||
err.root_cause().downcast_ref()
|
||||
{
|
||||
Err(BackgroundCalculationError::Cancelled)
|
||||
} else {
|
||||
Err(BackgroundCalculationError::Other(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
return Err(err.context("Failed to calculate logical size"));
|
||||
}
|
||||
};
|
||||
.await;
|
||||
|
||||
// we cannot query current_logical_size.current_size() to know the current
|
||||
// *negative* value, only truncated to u64.
|
||||
let added = self_clone
|
||||
.current_logical_size
|
||||
.size_added_after_initial
|
||||
.load(AtomicOrdering::Relaxed);
|
||||
|
||||
let sum = calculated_size.saturating_add_signed(added);
|
||||
|
||||
// set the gauge value before it can be set in `update_current_logical_size`.
|
||||
self_clone.metrics.current_logical_size_gauge.set(sum);
|
||||
|
||||
match self_clone
|
||||
.current_logical_size
|
||||
.initial_logical_size
|
||||
.set((calculated_size, metrics_guard.calculation_result_saved()))
|
||||
{
|
||||
Ok(()) => (),
|
||||
Err(_what_we_just_attempted_to_set) => {
|
||||
let (existing_size, _) = self_clone
|
||||
.current_logical_size
|
||||
.initial_logical_size
|
||||
.get()
|
||||
.expect("once_cell set was lost, then get failed, impossible.");
|
||||
// This shouldn't happen because the semaphore is initialized with 1.
|
||||
// But if it happens, just complain & report success so there are no further retries.
|
||||
error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing")
|
||||
match try_once_res {
|
||||
Ok(res) => return ControlFlow::Continue(res),
|
||||
Err(BackgroundCalculationError::Cancelled) => return ControlFlow::Break(()),
|
||||
Err(BackgroundCalculationError::Other(e)) => {
|
||||
warn!(attempt, "initial size calculation failed: {e:?}");
|
||||
// exponential back-off doesn't make sense at these long intervals;
|
||||
// use fixed retry interval with generous jitter instead
|
||||
let sleep_duration = Duration::from_secs(
|
||||
u64::try_from(
|
||||
// 1hour base
|
||||
(60_i64 * 60_i64)
|
||||
// 10min jitter
|
||||
+ rand::thread_rng().gen_range(-10 * 60..10 * 60),
|
||||
)
|
||||
.expect("10min < 1hour"),
|
||||
);
|
||||
tokio::time::sleep(sleep_duration).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
// now that `initial_logical_size.is_some()`, reduce permit count to 0
|
||||
// so that we prevent future callers from spawning this task
|
||||
permit.forget();
|
||||
Ok(())
|
||||
}.in_current_span(),
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
let (calculated_size, metrics_guard) = tokio::select! {
|
||||
res = retrying => {
|
||||
match res {
|
||||
ControlFlow::Continue(calculated_size) => calculated_size,
|
||||
ControlFlow::Break(()) => return,
|
||||
}
|
||||
}
|
||||
_ = cancel.cancelled() => {
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// we cannot query current_logical_size.current_size() to know the current
|
||||
// *negative* value, only truncated to u64.
|
||||
let added = self
|
||||
.current_logical_size
|
||||
.size_added_after_initial
|
||||
.load(AtomicOrdering::Relaxed);
|
||||
|
||||
let sum = calculated_size.saturating_add_signed(added);
|
||||
|
||||
// set the gauge value before it can be set in `update_current_logical_size`.
|
||||
// TODO: shouldn't this simple .add(calculated_size)?
|
||||
self.metrics.current_logical_size_gauge.set(sum);
|
||||
|
||||
self.current_logical_size
|
||||
.initial_logical_size
|
||||
.set((calculated_size, metrics_guard.calculation_result_saved()))
|
||||
.ok()
|
||||
.expect("only this task sets it");
|
||||
}
|
||||
|
||||
pub fn spawn_ondemand_logical_size_calculation(
|
||||
@@ -1933,6 +1997,7 @@ impl Timeline {
|
||||
receiver
|
||||
}
|
||||
|
||||
/// TODO: must be cancellation safe, I think it is (?)
|
||||
#[instrument(skip_all)]
|
||||
async fn logical_size_calculation_task(
|
||||
self: &Arc<Self>,
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
use anyhow::Context;
|
||||
use once_cell::sync::OnceCell;
|
||||
|
||||
use tokio::sync::Semaphore;
|
||||
use once_cell::sync::OnceCell;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Internal structure to hold all data needed for logical size calculation.
|
||||
///
|
||||
@@ -27,9 +26,7 @@ pub(super) struct LogicalSize {
|
||||
u64,
|
||||
crate::metrics::initial_logical_size::FinishedCalculationGuard,
|
||||
)>,
|
||||
|
||||
/// Semaphore to track ongoing calculation of `initial_logical_size`.
|
||||
pub initial_size_computation: Arc<tokio::sync::Semaphore>,
|
||||
pub cancel_wait_for_background_loop_concurrency_limit_semaphore: OnceCell<CancellationToken>,
|
||||
|
||||
/// Latest Lsn that has its size uncalculated, could be absent for freshly created timelines.
|
||||
pub initial_part_end: Option<Lsn>,
|
||||
@@ -69,7 +66,7 @@ pub(crate) enum CurrentLogicalSize {
|
||||
Exact(Exact),
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
||||
pub(crate) enum Accuracy {
|
||||
Approximate,
|
||||
Exact,
|
||||
@@ -112,11 +109,10 @@ impl LogicalSize {
|
||||
Self {
|
||||
initial_logical_size: OnceCell::with_value((0, {
|
||||
crate::metrics::initial_logical_size::START_CALCULATION
|
||||
.first(None)
|
||||
.first(crate::metrics::initial_logical_size::StartCircumstances::EmptyInitial)
|
||||
.calculation_result_saved()
|
||||
})),
|
||||
// initial_logical_size already computed, so, don't admit any calculations
|
||||
initial_size_computation: Arc::new(Semaphore::new(0)),
|
||||
cancel_wait_for_background_loop_concurrency_limit_semaphore: OnceCell::new(),
|
||||
initial_part_end: None,
|
||||
size_added_after_initial: AtomicI64::new(0),
|
||||
}
|
||||
@@ -125,7 +121,7 @@ impl LogicalSize {
|
||||
pub(super) fn deferred_initial(compute_to: Lsn) -> Self {
|
||||
Self {
|
||||
initial_logical_size: OnceCell::new(),
|
||||
initial_size_computation: Arc::new(Semaphore::new(1)),
|
||||
cancel_wait_for_background_loop_concurrency_limit_semaphore: OnceCell::new(),
|
||||
initial_part_end: Some(compute_to),
|
||||
size_added_after_initial: AtomicI64::new(0),
|
||||
}
|
||||
|
||||
@@ -397,7 +397,10 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
// Send the replication feedback message.
|
||||
// Regular standby_status_update fields are put into this message.
|
||||
let current_timeline_size = timeline
|
||||
.get_current_logical_size(&ctx)
|
||||
.get_current_logical_size(
|
||||
crate::tenant::timeline::GetLogicalSizePriority::User,
|
||||
&ctx,
|
||||
)
|
||||
// FIXME: https://github.com/neondatabase/neon/issues/5963
|
||||
.size_dont_care_about_accuracy();
|
||||
let status_update = PageserverFeedback {
|
||||
|
||||
Reference in New Issue
Block a user