mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-15 12:10:37 +00:00
Compare commits
8 Commits
console_ge
...
problame/i
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
57a7b2e8e3 | ||
|
|
ef1848f002 | ||
|
|
c416fbbab1 | ||
|
|
5f83acb35b | ||
|
|
bc98824858 | ||
|
|
8d427ea169 | ||
|
|
67df3d1624 | ||
|
|
1eba00633d |
@@ -400,7 +400,9 @@ pub struct TimelineInfo {
|
||||
/// The LSN that we are advertizing to safekeepers
|
||||
pub remote_consistent_lsn_visible: Lsn,
|
||||
|
||||
pub current_logical_size: Option<u64>, // is None when timeline is Unloaded
|
||||
pub current_logical_size: u64,
|
||||
pub current_logical_size_is_accurate: bool,
|
||||
|
||||
/// Sum of the size of all layer files.
|
||||
/// If a layer is present in both local FS and S3, it counts only once.
|
||||
pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
|
||||
|
||||
@@ -402,15 +402,11 @@ fn start_pageserver(
|
||||
let (init_remote_done_tx, init_remote_done_rx) = utils::completion::channel();
|
||||
let (init_done_tx, init_done_rx) = utils::completion::channel();
|
||||
|
||||
let (init_logical_size_done_tx, init_logical_size_done_rx) = utils::completion::channel();
|
||||
|
||||
let (background_jobs_can_start, background_jobs_barrier) = utils::completion::channel();
|
||||
|
||||
let order = pageserver::InitializationOrder {
|
||||
initial_tenant_load_remote: Some(init_done_tx),
|
||||
initial_tenant_load: Some(init_remote_done_tx),
|
||||
initial_logical_size_can_start: init_done_rx.clone(),
|
||||
initial_logical_size_attempt: Some(init_logical_size_done_tx),
|
||||
background_jobs_can_start: background_jobs_barrier.clone(),
|
||||
};
|
||||
|
||||
@@ -464,7 +460,7 @@ fn start_pageserver(
|
||||
});
|
||||
|
||||
let WaitForPhaseResult {
|
||||
timeout_remaining: timeout,
|
||||
timeout_remaining: _timeout,
|
||||
skipped: init_load_skipped,
|
||||
} = wait_for_phase("initial_tenant_load", init_load_done, timeout).await;
|
||||
|
||||
@@ -476,20 +472,6 @@ fn start_pageserver(
|
||||
tracing::info!("Cancelled before initial logical sizes completed")
|
||||
});
|
||||
|
||||
let logical_sizes_done = std::pin::pin!(async {
|
||||
init_logical_size_done_rx.wait().await;
|
||||
startup_checkpoint(
|
||||
started_startup_at,
|
||||
"initial_logical_sizes",
|
||||
"Initial logical sizes completed",
|
||||
);
|
||||
});
|
||||
|
||||
let WaitForPhaseResult {
|
||||
timeout_remaining: _,
|
||||
skipped: logical_sizes_skipped,
|
||||
} = wait_for_phase("initial_logical_sizes", logical_sizes_done, timeout).await;
|
||||
|
||||
scopeguard::ScopeGuard::into_inner(guard);
|
||||
|
||||
// allow background jobs to start: we either completed prior stages, or they reached timeout
|
||||
@@ -514,9 +496,6 @@ fn start_pageserver(
|
||||
if let Some(f) = init_load_skipped {
|
||||
f.await;
|
||||
}
|
||||
if let Some(f) = logical_sizes_skipped {
|
||||
f.await;
|
||||
}
|
||||
scopeguard::ScopeGuard::into_inner(guard);
|
||||
|
||||
startup_checkpoint(started_startup_at, "complete", "Startup complete");
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use crate::context::RequestContext;
|
||||
use anyhow::Context;
|
||||
use crate::{context::RequestContext, tenant::timeline::logical_size::CurrentLogicalSize};
|
||||
use chrono::{DateTime, Utc};
|
||||
use consumption_metrics::EventType;
|
||||
use futures::stream::StreamExt;
|
||||
@@ -352,13 +351,16 @@ impl TimelineSnapshot {
|
||||
|
||||
let current_exact_logical_size = {
|
||||
let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_id, timeline_id = %t.timeline_id);
|
||||
let res = span
|
||||
.in_scope(|| t.get_current_logical_size(ctx))
|
||||
.context("get_current_logical_size");
|
||||
match res? {
|
||||
let size = span.in_scope(|| {
|
||||
t.get_current_logical_size(
|
||||
crate::tenant::timeline::GetLogicalSizePriority::Background,
|
||||
ctx,
|
||||
)
|
||||
});
|
||||
match size {
|
||||
// Only send timeline logical size when it is fully calculated.
|
||||
(size, is_exact) if is_exact => Some(size),
|
||||
(_, _) => None,
|
||||
CurrentLogicalSize::Exact(ref size) => Some(size.into()),
|
||||
CurrentLogicalSize::Approximate(_) => None,
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -337,13 +337,8 @@ async fn build_timeline_info_common(
|
||||
Lsn(0) => None,
|
||||
lsn @ Lsn(_) => Some(lsn),
|
||||
};
|
||||
let current_logical_size = match timeline.get_current_logical_size(ctx) {
|
||||
Ok((size, _)) => Some(size),
|
||||
Err(err) => {
|
||||
error!("Timeline info creation failed to get current logical size: {err:?}");
|
||||
None
|
||||
}
|
||||
};
|
||||
let current_logical_size =
|
||||
timeline.get_current_logical_size(tenant::timeline::GetLogicalSizePriority::User, ctx);
|
||||
let current_physical_size = Some(timeline.layer_size_sum().await);
|
||||
let state = timeline.current_state();
|
||||
let remote_consistent_lsn_projected = timeline
|
||||
@@ -366,7 +361,11 @@ async fn build_timeline_info_common(
|
||||
last_record_lsn,
|
||||
prev_record_lsn: Some(timeline.get_prev_record_lsn()),
|
||||
latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(),
|
||||
current_logical_size,
|
||||
current_logical_size: current_logical_size.size_dont_care_about_accuracy(),
|
||||
current_logical_size_is_accurate: match current_logical_size.accuracy() {
|
||||
tenant::timeline::logical_size::Accuracy::Approximate => false,
|
||||
tenant::timeline::logical_size::Accuracy::Exact => true,
|
||||
},
|
||||
current_physical_size,
|
||||
current_logical_size_non_incremental: None,
|
||||
timeline_dir_layer_file_size_sum: None,
|
||||
|
||||
@@ -186,13 +186,6 @@ pub struct InitializationOrder {
|
||||
/// Each initial tenant load task carries this until completion.
|
||||
pub initial_tenant_load: Option<utils::completion::Completion>,
|
||||
|
||||
/// Barrier for when we can start initial logical size calculations.
|
||||
pub initial_logical_size_can_start: utils::completion::Barrier,
|
||||
|
||||
/// Each timeline owns a clone of this to be consumed on the initial logical size calculation
|
||||
/// attempt. It is important to drop this once the attempt has completed.
|
||||
pub initial_logical_size_attempt: Option<utils::completion::Completion>,
|
||||
|
||||
/// Barrier for when we can start any background jobs.
|
||||
///
|
||||
/// This can be broken up later on, but right now there is just one class of a background job.
|
||||
|
||||
@@ -402,6 +402,137 @@ static CURRENT_LOGICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
.expect("failed to define current logical size metric")
|
||||
});
|
||||
|
||||
pub(crate) mod initial_logical_size {
|
||||
use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
pub(crate) struct StartCalculation(IntCounterVec);
|
||||
pub(crate) static START_CALCULATION: Lazy<StartCalculation> = Lazy::new(|| {
|
||||
StartCalculation(
|
||||
register_int_counter_vec!(
|
||||
"pageserver_initial_logical_size_start_calculation",
|
||||
"Incremented each time we start an initial logical size calculation attempt. \
|
||||
The `circumstances` label provides some additional details.",
|
||||
&["attempt", "circumstances"]
|
||||
)
|
||||
.unwrap(),
|
||||
)
|
||||
});
|
||||
|
||||
struct DropCalculation {
|
||||
first: IntCounter,
|
||||
retry: IntCounter,
|
||||
}
|
||||
|
||||
static DROP_CALCULATION: Lazy<DropCalculation> = Lazy::new(|| {
|
||||
let vec = register_int_counter_vec!(
|
||||
"pageserver_initial_logical_size_drop_calculation",
|
||||
"Incremented each time we abort a started size calculation attmpt.",
|
||||
&["attempt"]
|
||||
)
|
||||
.unwrap();
|
||||
DropCalculation {
|
||||
first: vec.with_label_values(&["first"]),
|
||||
retry: vec.with_label_values(&["retry"]),
|
||||
}
|
||||
});
|
||||
|
||||
pub(crate) struct Calculated {
|
||||
pub(crate) births: IntCounter,
|
||||
pub(crate) deaths: IntCounter,
|
||||
}
|
||||
|
||||
pub(crate) static CALCULATED: Lazy<Calculated> = Lazy::new(|| Calculated {
|
||||
births: register_int_counter!(
|
||||
"pageserver_initial_logical_size_finish_calculation",
|
||||
"Incremented every time we finish calculation of initial logical size.\
|
||||
If everything is working well, this should happen at most once per Timeline object."
|
||||
)
|
||||
.unwrap(),
|
||||
deaths: register_int_counter!(
|
||||
"pageserver_initial_logical_size_drop_finished_calculation",
|
||||
"Incremented when we drop a finished initial logical size calculation result.\
|
||||
Mainly useful to turn pageserver_initial_logical_size_finish_calculation into a gauge."
|
||||
)
|
||||
.unwrap(),
|
||||
});
|
||||
|
||||
pub(crate) struct OngoingCalculationGuard {
|
||||
inc_drop_calculation: Option<IntCounter>,
|
||||
}
|
||||
|
||||
#[derive(strum_macros::IntoStaticStr)]
|
||||
pub(crate) enum StartCircumstances {
|
||||
EmptyInitial,
|
||||
SkippedConcurrencyLimiter,
|
||||
AfterBackgroundTasksRateLimit,
|
||||
}
|
||||
|
||||
impl StartCalculation {
|
||||
pub(crate) fn first(&self, circumstances: StartCircumstances) -> OngoingCalculationGuard {
|
||||
let circumstances_label: &'static str = circumstances.into();
|
||||
self.0.with_label_values(&["first", circumstances_label]);
|
||||
OngoingCalculationGuard {
|
||||
inc_drop_calculation: Some(DROP_CALCULATION.first.clone()),
|
||||
}
|
||||
}
|
||||
pub(crate) fn retry(&self, circumstances: StartCircumstances) -> OngoingCalculationGuard {
|
||||
let circumstances_label: &'static str = circumstances.into();
|
||||
self.0.with_label_values(&["retry", circumstances_label]);
|
||||
OngoingCalculationGuard {
|
||||
inc_drop_calculation: Some(DROP_CALCULATION.retry.clone()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for OngoingCalculationGuard {
|
||||
fn drop(&mut self) {
|
||||
if let Some(counter) = self.inc_drop_calculation.take() {
|
||||
counter.inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl OngoingCalculationGuard {
|
||||
pub(crate) fn calculation_result_saved(mut self) -> FinishedCalculationGuard {
|
||||
drop(self.inc_drop_calculation.take());
|
||||
CALCULATED.births.inc();
|
||||
FinishedCalculationGuard {
|
||||
inc_on_drop: CALCULATED.deaths.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct FinishedCalculationGuard {
|
||||
inc_on_drop: IntCounter,
|
||||
}
|
||||
|
||||
impl Drop for FinishedCalculationGuard {
|
||||
fn drop(&mut self) {
|
||||
self.inc_on_drop.inc();
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct Calls {
|
||||
pub(crate) approximate: IntCounter,
|
||||
pub(crate) exact: IntCounter,
|
||||
}
|
||||
|
||||
pub(crate) static CALLS: Lazy<Calls> = Lazy::new(|| {
|
||||
let vec = register_int_counter_vec!(
|
||||
"pageserver_initial_logical_size_calls",
|
||||
"Incremented each time some code asks for incremental logical size.\
|
||||
The label records the accuracy of the result.",
|
||||
&["accuracy"]
|
||||
)
|
||||
.unwrap();
|
||||
Calls {
|
||||
approximate: vec.with_label_values(&["approximate"]),
|
||||
exact: vec.with_label_values(&["exact"]),
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
pub(crate) static TENANT_STATE_METRIC: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"pageserver_tenant_states_count",
|
||||
|
||||
@@ -463,7 +463,6 @@ impl Tenant {
|
||||
index_part: Option<IndexPart>,
|
||||
metadata: TimelineMetadata,
|
||||
ancestor: Option<Arc<Timeline>>,
|
||||
init_order: Option<&InitializationOrder>,
|
||||
_ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let tenant_id = self.tenant_id;
|
||||
@@ -473,7 +472,6 @@ impl Tenant {
|
||||
&metadata,
|
||||
ancestor.clone(),
|
||||
resources,
|
||||
init_order,
|
||||
CreateTimelineCause::Load,
|
||||
)?;
|
||||
let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
|
||||
@@ -672,10 +670,6 @@ impl Tenant {
|
||||
// as we are no longer loading, signal completion by dropping
|
||||
// the completion while we resume deletion
|
||||
drop(_completion);
|
||||
// do not hold to initial_logical_size_attempt as it will prevent loading from proceeding without timeout
|
||||
let _ = init_order
|
||||
.as_mut()
|
||||
.and_then(|x| x.initial_logical_size_attempt.take());
|
||||
let background_jobs_can_start =
|
||||
init_order.as_ref().map(|x| &x.background_jobs_can_start);
|
||||
if let Some(background) = background_jobs_can_start {
|
||||
@@ -689,7 +683,6 @@ impl Tenant {
|
||||
&tenant_clone,
|
||||
preload,
|
||||
tenants,
|
||||
init_order,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
@@ -702,7 +695,7 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
match tenant_clone.attach(init_order, preload, &ctx).await {
|
||||
match tenant_clone.attach(preload, &ctx).await {
|
||||
Ok(()) => {
|
||||
info!("attach finished, activating");
|
||||
tenant_clone.activate(broker_client, None, &ctx);
|
||||
@@ -765,7 +758,6 @@ impl Tenant {
|
||||
///
|
||||
async fn attach(
|
||||
self: &Arc<Tenant>,
|
||||
init_order: Option<InitializationOrder>,
|
||||
preload: Option<TenantPreload>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
@@ -778,7 +770,7 @@ impl Tenant {
|
||||
None => {
|
||||
// Deprecated dev mode: load from local disk state instead of remote storage
|
||||
// https://github.com/neondatabase/neon/issues/5624
|
||||
return self.load_local(init_order, ctx).await;
|
||||
return self.load_local(ctx).await;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -862,7 +854,6 @@ impl Tenant {
|
||||
&index_part.metadata,
|
||||
Some(remote_timeline_client),
|
||||
self.deletion_queue_client.clone(),
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.context("resume_deletion")
|
||||
@@ -987,10 +978,6 @@ impl Tenant {
|
||||
None
|
||||
};
|
||||
|
||||
// we can load remote timelines during init, but they are assumed to be so rare that
|
||||
// initialization order is not passed to here.
|
||||
let init_order = None;
|
||||
|
||||
// timeline loading after attach expects to find metadata file for each metadata
|
||||
save_metadata(self.conf, &self.tenant_id, &timeline_id, &remote_metadata)
|
||||
.await
|
||||
@@ -1003,7 +990,6 @@ impl Tenant {
|
||||
Some(index_part),
|
||||
remote_metadata,
|
||||
ancestor,
|
||||
init_order,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -1243,11 +1229,7 @@ impl Tenant {
|
||||
/// files on disk. Used at pageserver startup.
|
||||
///
|
||||
/// No background tasks are started as part of this routine.
|
||||
async fn load_local(
|
||||
self: &Arc<Tenant>,
|
||||
init_order: Option<InitializationOrder>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
async fn load_local(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
span::debug_assert_current_span_has_tenant_id();
|
||||
|
||||
debug!("loading tenant task");
|
||||
@@ -1273,7 +1255,7 @@ impl Tenant {
|
||||
// Process loadable timelines first
|
||||
for (timeline_id, local_metadata) in scan.sorted_timelines_to_load {
|
||||
if let Err(e) = self
|
||||
.load_local_timeline(timeline_id, local_metadata, init_order.as_ref(), ctx, false)
|
||||
.load_local_timeline(timeline_id, local_metadata, ctx, false)
|
||||
.await
|
||||
{
|
||||
match e {
|
||||
@@ -1307,13 +1289,7 @@ impl Tenant {
|
||||
}
|
||||
Some(local_metadata) => {
|
||||
if let Err(e) = self
|
||||
.load_local_timeline(
|
||||
timeline_id,
|
||||
local_metadata,
|
||||
init_order.as_ref(),
|
||||
ctx,
|
||||
true,
|
||||
)
|
||||
.load_local_timeline(timeline_id, local_metadata, ctx, true)
|
||||
.await
|
||||
{
|
||||
match e {
|
||||
@@ -1341,12 +1317,11 @@ impl Tenant {
|
||||
/// Subroutine of `load_tenant`, to load an individual timeline
|
||||
///
|
||||
/// NB: The parent is assumed to be already loaded!
|
||||
#[instrument(skip(self, local_metadata, init_order, ctx))]
|
||||
#[instrument(skip(self, local_metadata, ctx))]
|
||||
async fn load_local_timeline(
|
||||
self: &Arc<Self>,
|
||||
timeline_id: TimelineId,
|
||||
local_metadata: TimelineMetadata,
|
||||
init_order: Option<&InitializationOrder>,
|
||||
ctx: &RequestContext,
|
||||
found_delete_mark: bool,
|
||||
) -> Result<(), LoadLocalTimelineError> {
|
||||
@@ -1363,7 +1338,6 @@ impl Tenant {
|
||||
&local_metadata,
|
||||
None,
|
||||
self.deletion_queue_client.clone(),
|
||||
init_order,
|
||||
)
|
||||
.await
|
||||
.context("resume deletion")
|
||||
@@ -1380,17 +1354,9 @@ impl Tenant {
|
||||
None
|
||||
};
|
||||
|
||||
self.timeline_init_and_sync(
|
||||
timeline_id,
|
||||
resources,
|
||||
None,
|
||||
local_metadata,
|
||||
ancestor,
|
||||
init_order,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(LoadLocalTimelineError::Load)
|
||||
self.timeline_init_and_sync(timeline_id, resources, None, local_metadata, ancestor, ctx)
|
||||
.await
|
||||
.map_err(LoadLocalTimelineError::Load)
|
||||
}
|
||||
|
||||
pub fn tenant_id(&self) -> TenantId {
|
||||
@@ -2281,7 +2247,6 @@ impl Tenant {
|
||||
new_metadata: &TimelineMetadata,
|
||||
ancestor: Option<Arc<Timeline>>,
|
||||
resources: TimelineResources,
|
||||
init_order: Option<&InitializationOrder>,
|
||||
cause: CreateTimelineCause,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let state = match cause {
|
||||
@@ -2296,9 +2261,6 @@ impl Tenant {
|
||||
CreateTimelineCause::Delete => TimelineState::Stopping,
|
||||
};
|
||||
|
||||
let initial_logical_size_can_start = init_order.map(|x| &x.initial_logical_size_can_start);
|
||||
let initial_logical_size_attempt = init_order.map(|x| &x.initial_logical_size_attempt);
|
||||
|
||||
let pg_version = new_metadata.pg_version();
|
||||
|
||||
let timeline = Timeline::new(
|
||||
@@ -2312,8 +2274,6 @@ impl Tenant {
|
||||
Arc::clone(&self.walredo_mgr),
|
||||
resources,
|
||||
pg_version,
|
||||
initial_logical_size_can_start.cloned(),
|
||||
initial_logical_size_attempt.cloned().flatten(),
|
||||
state,
|
||||
self.cancel.child_token(),
|
||||
);
|
||||
@@ -3104,7 +3064,6 @@ impl Tenant {
|
||||
new_metadata,
|
||||
ancestor,
|
||||
resources,
|
||||
None,
|
||||
CreateTimelineCause::Load,
|
||||
)
|
||||
.context("Failed to create timeline data structure")?;
|
||||
@@ -3703,7 +3662,7 @@ pub(crate) mod harness {
|
||||
match mode {
|
||||
LoadMode::Local => {
|
||||
tenant
|
||||
.load_local(None, ctx)
|
||||
.load_local(ctx)
|
||||
.instrument(info_span!("try_load", tenant_id=%self.tenant_id))
|
||||
.await?;
|
||||
}
|
||||
@@ -3713,7 +3672,7 @@ pub(crate) mod harness {
|
||||
.instrument(info_span!("try_load_preload", tenant_id=%self.tenant_id))
|
||||
.await?;
|
||||
tenant
|
||||
.attach(None, Some(preload), ctx)
|
||||
.attach(Some(preload), ctx)
|
||||
.instrument(info_span!("try_load", tenant_id=%self.tenant_id))
|
||||
.await?;
|
||||
}
|
||||
|
||||
@@ -18,7 +18,6 @@ use crate::{
|
||||
context::RequestContext,
|
||||
task_mgr::{self, TaskKind},
|
||||
tenant::mgr::{TenantSlot, TenantsMapRemoveResult},
|
||||
InitializationOrder,
|
||||
};
|
||||
|
||||
use super::{
|
||||
@@ -391,7 +390,6 @@ impl DeleteTenantFlow {
|
||||
tenant: &Arc<Tenant>,
|
||||
preload: Option<TenantPreload>,
|
||||
tenants: &'static std::sync::RwLock<TenantsMap>,
|
||||
init_order: Option<InitializationOrder>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), DeleteTenantError> {
|
||||
let (_, progress) = completion::channel();
|
||||
@@ -401,10 +399,7 @@ impl DeleteTenantFlow {
|
||||
.await
|
||||
.expect("cant be stopping or broken");
|
||||
|
||||
tenant
|
||||
.attach(init_order, preload, ctx)
|
||||
.await
|
||||
.context("attach")?;
|
||||
tenant.attach(preload, ctx).await.context("attach")?;
|
||||
|
||||
Self::background(
|
||||
guard,
|
||||
|
||||
@@ -44,6 +44,7 @@ pub(crate) enum BackgroundLoopKind {
|
||||
Eviction,
|
||||
ConsumptionMetricsCollectMetrics,
|
||||
ConsumptionMetricsSyntheticSizeWorker,
|
||||
InitialLogicalSizeCalculation,
|
||||
}
|
||||
|
||||
impl BackgroundLoopKind {
|
||||
|
||||
@@ -2,7 +2,7 @@ pub mod delete;
|
||||
mod eviction_task;
|
||||
mod init;
|
||||
pub mod layer_manager;
|
||||
mod logical_size;
|
||||
pub(crate) mod logical_size;
|
||||
pub mod span;
|
||||
pub mod uninit;
|
||||
mod walreceiver;
|
||||
@@ -16,23 +16,27 @@ use itertools::Itertools;
|
||||
use pageserver_api::models::{
|
||||
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, LayerMapInfo, TimelineState,
|
||||
};
|
||||
use rand::Rng;
|
||||
use serde_with::serde_as;
|
||||
use storage_broker::BrokerClientChannel;
|
||||
use tokio::{
|
||||
runtime::Handle,
|
||||
sync::{oneshot, watch, TryAcquireError},
|
||||
sync::{oneshot, watch},
|
||||
};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::{id::TenantTimelineId, sync::gate::Gate};
|
||||
|
||||
use std::cmp::{max, min, Ordering};
|
||||
use std::collections::{BinaryHeap, HashMap, HashSet};
|
||||
use std::ops::{Deref, Range};
|
||||
use std::pin::pin;
|
||||
use std::sync::atomic::Ordering as AtomicOrdering;
|
||||
use std::sync::{Arc, Mutex, RwLock, Weak};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
use std::{
|
||||
cmp::{max, min, Ordering},
|
||||
ops::ControlFlow,
|
||||
};
|
||||
|
||||
use crate::context::{
|
||||
AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder,
|
||||
@@ -294,13 +298,6 @@ pub struct Timeline {
|
||||
|
||||
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
|
||||
|
||||
/// Barrier to wait before doing initial logical size calculation. Used only during startup.
|
||||
initial_logical_size_can_start: Option<completion::Barrier>,
|
||||
|
||||
/// Completion shared between all timelines loaded during startup; used to delay heavier
|
||||
/// background tasks until some logical sizes have been calculated.
|
||||
initial_logical_size_attempt: Mutex<Option<completion::Completion>>,
|
||||
|
||||
/// Load or creation time information about the disk_consistent_lsn and when the loading
|
||||
/// happened. Used for consumption metrics.
|
||||
pub(crate) loaded_at: (Lsn, SystemTime),
|
||||
@@ -449,6 +446,11 @@ pub enum LogicalSizeCalculationCause {
|
||||
TenantSizeHandler,
|
||||
}
|
||||
|
||||
pub enum GetLogicalSizePriority {
|
||||
User,
|
||||
Background,
|
||||
}
|
||||
|
||||
#[derive(enumset::EnumSetType)]
|
||||
pub(crate) enum CompactFlags {
|
||||
ForceRepartition,
|
||||
@@ -845,31 +847,6 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
/// Retrieve current logical size of the timeline.
|
||||
///
|
||||
/// The size could be lagging behind the actual number, in case
|
||||
/// the initial size calculation has not been run (gets triggered on the first size access).
|
||||
///
|
||||
/// return size and boolean flag that shows if the size is exact
|
||||
pub fn get_current_logical_size(
|
||||
self: &Arc<Self>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<(u64, bool)> {
|
||||
let current_size = self.current_logical_size.current_size()?;
|
||||
debug!("Current size: {current_size:?}");
|
||||
|
||||
let mut is_exact = true;
|
||||
let size = current_size.size();
|
||||
if let (CurrentLogicalSize::Approximate(_), Some(initial_part_end)) =
|
||||
(current_size, self.current_logical_size.initial_part_end)
|
||||
{
|
||||
is_exact = false;
|
||||
self.try_spawn_size_init_task(initial_part_end, ctx);
|
||||
}
|
||||
|
||||
Ok((size, is_exact))
|
||||
}
|
||||
|
||||
/// Check if more than 'checkpoint_distance' of WAL has been accumulated in
|
||||
/// the in-memory layer, and initiate flushing it if so.
|
||||
///
|
||||
@@ -922,6 +899,7 @@ impl Timeline {
|
||||
self.launch_wal_receiver(ctx, broker_client);
|
||||
self.set_state(TimelineState::Active);
|
||||
self.launch_eviction_task(background_jobs_can_start);
|
||||
self.spawn_initial_logical_size_computation_task(ctx);
|
||||
}
|
||||
|
||||
/// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk and then
|
||||
@@ -1027,17 +1005,6 @@ impl Timeline {
|
||||
error!("Not activating a Stopping timeline");
|
||||
}
|
||||
(_, new_state) => {
|
||||
if matches!(
|
||||
new_state,
|
||||
TimelineState::Stopping | TimelineState::Broken { .. }
|
||||
) {
|
||||
// drop the completion guard, if any; it might be holding off the completion
|
||||
// forever needlessly
|
||||
self.initial_logical_size_attempt
|
||||
.lock()
|
||||
.unwrap_or_else(|e| e.into_inner())
|
||||
.take();
|
||||
}
|
||||
self.state.send_replace(new_state);
|
||||
}
|
||||
}
|
||||
@@ -1355,8 +1322,6 @@ impl Timeline {
|
||||
walredo_mgr: Arc<super::WalRedoManager>,
|
||||
resources: TimelineResources,
|
||||
pg_version: u32,
|
||||
initial_logical_size_can_start: Option<completion::Barrier>,
|
||||
initial_logical_size_attempt: Option<completion::Completion>,
|
||||
state: TimelineState,
|
||||
cancel: CancellationToken,
|
||||
) -> Arc<Self> {
|
||||
@@ -1456,8 +1421,6 @@ impl Timeline {
|
||||
),
|
||||
delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTimelineFlow::default())),
|
||||
|
||||
initial_logical_size_can_start,
|
||||
initial_logical_size_attempt: Mutex::new(initial_logical_size_attempt),
|
||||
cancel,
|
||||
gate: Gate::new(format!("Timeline<{tenant_id}/{timeline_id}>")),
|
||||
|
||||
@@ -1762,38 +1725,44 @@ impl Timeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn try_spawn_size_init_task(self: &Arc<Self>, lsn: Lsn, ctx: &RequestContext) {
|
||||
let state = self.current_state();
|
||||
if matches!(
|
||||
state,
|
||||
TimelineState::Broken { .. } | TimelineState::Stopping
|
||||
) {
|
||||
// Can happen when timeline detail endpoint is used when deletion is ongoing (or its broken).
|
||||
/// Retrieve current logical size of the timeline.
|
||||
///
|
||||
/// The size could be lagging behind the actual number, in case
|
||||
/// the initial size calculation has not been run (gets triggered on the first size access).
|
||||
///
|
||||
/// return size and boolean flag that shows if the size is exact
|
||||
pub(crate) fn get_current_logical_size(
|
||||
self: &Arc<Self>,
|
||||
priority: GetLogicalSizePriority,
|
||||
_ctx: &RequestContext,
|
||||
) -> logical_size::CurrentLogicalSize {
|
||||
let current_size = self.current_logical_size.current_size();
|
||||
debug!("Current size: {current_size:?}");
|
||||
current_size
|
||||
}
|
||||
|
||||
// if it's not already computed, it computes it _now_
|
||||
pub(crate) async fn get_current_logical_size_wait_exact(
|
||||
self: &Arc<Self>,
|
||||
) -> Result<logical_size::Exact, TimelineCancelled | CalculationError> {
|
||||
self.current_logical_size.initial_logical_size.get_or_try_init(async {
|
||||
// do calcualtion here
|
||||
})
|
||||
}
|
||||
|
||||
fn spawn_initial_logical_size_computation_task(self: &Arc<Self>, ctx: &RequestContext) {
|
||||
let Some(initial_part_end) = self.current_logical_size.initial_part_end else {
|
||||
// nothing to do for freshly created timelines;
|
||||
assert_eq!(
|
||||
self.current_logical_size.current_size().accuracy(),
|
||||
logical_size::Accuracy::Exact,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
let permit = match Arc::clone(&self.current_logical_size.initial_size_computation)
|
||||
.try_acquire_owned()
|
||||
{
|
||||
Ok(permit) => permit,
|
||||
Err(TryAcquireError::NoPermits) => {
|
||||
// computation already ongoing or finished with success
|
||||
return;
|
||||
}
|
||||
Err(TryAcquireError::Closed) => unreachable!("we never call close"),
|
||||
};
|
||||
debug_assert!(self
|
||||
.current_logical_size
|
||||
.initial_logical_size
|
||||
.get()
|
||||
.is_none());
|
||||
|
||||
info!(
|
||||
"spawning logical size computation from context of task kind {:?}",
|
||||
ctx.task_kind()
|
||||
);
|
||||
// We need to start the computation task.
|
||||
// It gets a separate context since it will outlive the request that called this function.
|
||||
let cancel_wait_for_background_loop_concurrency_limit_semaphore = CancellationToken::new();
|
||||
self.current_logical_size.cancel_wait_for_background_loop_concurrency_limit_semaphore.set(cancel_wait_for_background_loop_concurrency_limit_semaphore.clone()).expect("initial logical size calculation task must be spawned exactly once per Timeline object");
|
||||
|
||||
let self_clone = Arc::clone(self);
|
||||
let background_ctx = ctx.detached_child(
|
||||
TaskKind::InitialLogicalSizeCalculation,
|
||||
@@ -1808,89 +1777,131 @@ impl Timeline {
|
||||
false,
|
||||
// NB: don't log errors here, task_mgr will do that.
|
||||
async move {
|
||||
|
||||
let cancel = task_mgr::shutdown_token();
|
||||
self_clone
|
||||
.initial_logical_size_calculation_task(
|
||||
initial_part_end,
|
||||
cancel_wait_for_background_loop_concurrency_limit_semaphore,
|
||||
cancel,
|
||||
background_ctx,
|
||||
)
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
.instrument(info_span!(parent: None, "initial_size_calculation", tenant_id=%self.tenant_id, timeline_id=%self.timeline_id)),
|
||||
);
|
||||
}
|
||||
|
||||
// in case we were created during pageserver initialization, wait for
|
||||
// initialization to complete before proceeding. startup time init runs on the same
|
||||
// runtime.
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => { return Ok(()); },
|
||||
_ = completion::Barrier::maybe_wait(self_clone.initial_logical_size_can_start.clone()) => {}
|
||||
};
|
||||
async fn initial_logical_size_calculation_task(
|
||||
self: Arc<Self>,
|
||||
initial_part_end: Lsn,
|
||||
skip_concurrency_limiter: CancellationToken,
|
||||
cancel: CancellationToken,
|
||||
background_ctx: RequestContext,
|
||||
) {
|
||||
enum BackgroundCalculationError {
|
||||
Cancelled,
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
// hold off background tasks from starting until all timelines get to try at least
|
||||
// once initial logical size calculation; though retry will rarely be useful.
|
||||
// holding off is done because heavier tasks execute blockingly on the same
|
||||
// runtime.
|
||||
//
|
||||
// dropping this at every outcome is probably better than trying to cling on to it,
|
||||
// delay will be terminated by a timeout regardless.
|
||||
let _completion = { self_clone.initial_logical_size_attempt.lock().expect("unexpected initial_logical_size_attempt poisoned").take() };
|
||||
let try_once = |attempt: usize| {
|
||||
let background_ctx = &background_ctx;
|
||||
let self_ref = &self;
|
||||
let skip_concurrency_limiter = &skip_concurrency_limiter;
|
||||
async move {
|
||||
let cancel = task_mgr::shutdown_token();
|
||||
let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit(
|
||||
BackgroundLoopKind::InitialLogicalSizeCalculation,
|
||||
background_ctx,
|
||||
&cancel,
|
||||
);
|
||||
|
||||
let calculated_size = match self_clone
|
||||
.logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx)
|
||||
self.current_logical_size.initial_logical_size.get_or_init(async {
|
||||
// do calcualtion here
|
||||
});
|
||||
|
||||
match self_ref
|
||||
.logical_size_calculation_task(
|
||||
initial_part_end,
|
||||
LogicalSizeCalculationCause::Initial,
|
||||
background_ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(s) => s,
|
||||
Ok(calculated_size) => Ok((calculated_size, metrics_guard)),
|
||||
Err(CalculateLogicalSizeError::Cancelled) => {
|
||||
// Don't make noise, this is a common task.
|
||||
// In the unlikely case that there is another call to this function, we'll retry
|
||||
// because initial_logical_size is still None.
|
||||
info!("initial size calculation cancelled, likely timeline delete / tenant detach");
|
||||
return Ok(());
|
||||
Err(BackgroundCalculationError::Cancelled)
|
||||
}
|
||||
Err(CalculateLogicalSizeError::Other(err)) => {
|
||||
if let Some(e @ PageReconstructError::AncestorStopping(_)) =
|
||||
if let Some(PageReconstructError::AncestorStopping(_)) =
|
||||
err.root_cause().downcast_ref()
|
||||
{
|
||||
// This can happen if the timeline parent timeline switches to
|
||||
// Stopping state while we're still calculating the initial
|
||||
// timeline size for the child, for example if the tenant is
|
||||
// being detached or the pageserver is shut down. Like with
|
||||
// CalculateLogicalSizeError::Cancelled, don't make noise.
|
||||
info!("initial size calculation failed because the timeline or its ancestor is Stopping, likely because the tenant is being detached: {e:#}");
|
||||
return Ok(());
|
||||
Err(BackgroundCalculationError::Cancelled)
|
||||
} else {
|
||||
Err(BackgroundCalculationError::Other(err))
|
||||
}
|
||||
return Err(err.context("Failed to calculate logical size"));
|
||||
}
|
||||
};
|
||||
|
||||
// we cannot query current_logical_size.current_size() to know the current
|
||||
// *negative* value, only truncated to u64.
|
||||
let added = self_clone
|
||||
.current_logical_size
|
||||
.size_added_after_initial
|
||||
.load(AtomicOrdering::Relaxed);
|
||||
|
||||
let sum = calculated_size.saturating_add_signed(added);
|
||||
|
||||
// set the gauge value before it can be set in `update_current_logical_size`.
|
||||
self_clone.metrics.current_logical_size_gauge.set(sum);
|
||||
|
||||
match self_clone
|
||||
.current_logical_size
|
||||
.initial_logical_size
|
||||
.set(calculated_size)
|
||||
{
|
||||
Ok(()) => (),
|
||||
Err(_what_we_just_attempted_to_set) => {
|
||||
let existing_size = self_clone
|
||||
.current_logical_size
|
||||
.initial_logical_size
|
||||
.get()
|
||||
.expect("once_cell set was lost, then get failed, impossible.");
|
||||
// This shouldn't happen because the semaphore is initialized with 1.
|
||||
// But if it happens, just complain & report success so there are no further retries.
|
||||
error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing")
|
||||
}
|
||||
}
|
||||
// now that `initial_logical_size.is_some()`, reduce permit count to 0
|
||||
// so that we prevent future callers from spawning this task
|
||||
permit.forget();
|
||||
Ok(())
|
||||
}.in_current_span(),
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
let retrying = async {
|
||||
let mut attempt = 0;
|
||||
loop {
|
||||
attempt += 1;
|
||||
|
||||
match try_once(attempt).await {
|
||||
Ok(res) => return ControlFlow::Continue(res),
|
||||
Err(BackgroundCalculationError::Cancelled) => return ControlFlow::Break(()),
|
||||
Err(BackgroundCalculationError::Other(e)) => {
|
||||
warn!(attempt, "initial size calculation failed: {e:?}");
|
||||
// exponential back-off doesn't make sense at these long intervals;
|
||||
// use fixed retry interval with generous jitter instead
|
||||
let sleep_duration = Duration::from_secs(
|
||||
u64::try_from(
|
||||
// 1hour base
|
||||
(60_i64 * 60_i64)
|
||||
// 10min jitter
|
||||
+ rand::thread_rng().gen_range(-10 * 60..10 * 60),
|
||||
)
|
||||
.expect("10min < 1hour"),
|
||||
);
|
||||
tokio::time::sleep(sleep_duration).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let (calculated_size, metrics_guard) = tokio::select! {
|
||||
res = retrying => {
|
||||
match res {
|
||||
ControlFlow::Continue(calculated_size) => calculated_size,
|
||||
ControlFlow::Break(()) => return,
|
||||
}
|
||||
}
|
||||
_ = cancel.cancelled() => {
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// we cannot query current_logical_size.current_size() to know the current
|
||||
// *negative* value, only truncated to u64.
|
||||
let added = self
|
||||
.current_logical_size
|
||||
.size_added_after_initial
|
||||
.load(AtomicOrdering::Relaxed);
|
||||
|
||||
let sum = calculated_size.saturating_add_signed(added);
|
||||
|
||||
// set the gauge value before it can be set in `update_current_logical_size`.
|
||||
// TODO: shouldn't this simple .add(calculated_size)?
|
||||
self.metrics.current_logical_size_gauge.set(sum);
|
||||
|
||||
self.current_logical_size
|
||||
.initial_logical_size
|
||||
.set((calculated_size, metrics_guard.calculation_result_saved()))
|
||||
.ok()
|
||||
.expect("only this task sets it");
|
||||
}
|
||||
|
||||
pub fn spawn_ondemand_logical_size_calculation(
|
||||
@@ -1928,6 +1939,7 @@ impl Timeline {
|
||||
receiver
|
||||
}
|
||||
|
||||
/// TODO: must be cancellation safe, I think it is (?)
|
||||
#[instrument(skip_all)]
|
||||
async fn logical_size_calculation_task(
|
||||
self: &Arc<Self>,
|
||||
@@ -2029,16 +2041,14 @@ impl Timeline {
|
||||
// one value while current_logical_size is set to the
|
||||
// other.
|
||||
match logical_size.current_size() {
|
||||
Ok(CurrentLogicalSize::Exact(new_current_size)) => self
|
||||
CurrentLogicalSize::Exact(ref new_current_size) => self
|
||||
.metrics
|
||||
.current_logical_size_gauge
|
||||
.set(new_current_size),
|
||||
Ok(CurrentLogicalSize::Approximate(_)) => {
|
||||
.set(new_current_size.into()),
|
||||
CurrentLogicalSize::Approximate(_) => {
|
||||
// don't update the gauge yet, this allows us not to update the gauge back and
|
||||
// forth between the initial size calculation task.
|
||||
}
|
||||
// this is overflow
|
||||
Err(e) => error!("Failed to compute current logical size for metrics update: {e:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -24,7 +24,6 @@ use crate::{
|
||||
},
|
||||
CreateTimelineCause, DeleteTimelineError, Tenant,
|
||||
},
|
||||
InitializationOrder,
|
||||
};
|
||||
|
||||
use super::{Timeline, TimelineResources};
|
||||
@@ -405,7 +404,6 @@ impl DeleteTimelineFlow {
|
||||
local_metadata: &TimelineMetadata,
|
||||
remote_client: Option<RemoteTimelineClient>,
|
||||
deletion_queue_client: DeletionQueueClient,
|
||||
init_order: Option<&InitializationOrder>,
|
||||
) -> anyhow::Result<()> {
|
||||
// Note: here we even skip populating layer map. Timeline is essentially uninitialized.
|
||||
// RemoteTimelineClient is the only functioning part.
|
||||
@@ -418,7 +416,6 @@ impl DeleteTimelineFlow {
|
||||
remote_client,
|
||||
deletion_queue_client,
|
||||
},
|
||||
init_order,
|
||||
// Important. We dont pass ancestor above because it can be missing.
|
||||
// Thus we need to skip the validation here.
|
||||
CreateTimelineCause::Delete,
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
use anyhow::Context;
|
||||
use once_cell::sync::OnceCell;
|
||||
|
||||
use tokio::sync::Semaphore;
|
||||
use once_cell::sync::OnceCell;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Internal structure to hold all data needed for logical size calculation.
|
||||
///
|
||||
@@ -23,10 +22,11 @@ pub(super) struct LogicalSize {
|
||||
///
|
||||
/// NOTE: size at a given LSN is constant, but after a restart we will calculate
|
||||
/// the initial size at a different LSN.
|
||||
pub initial_logical_size: OnceCell<u64>,
|
||||
|
||||
/// Semaphore to track ongoing calculation of `initial_logical_size`.
|
||||
pub initial_size_computation: Arc<tokio::sync::Semaphore>,
|
||||
pub initial_logical_size: tokio::sync::OnceCell<(
|
||||
u64,
|
||||
crate::metrics::initial_logical_size::FinishedCalculationGuard,
|
||||
)>,
|
||||
pub cancel_wait_for_background_loop_concurrency_limit_semaphore: OnceCell<CancellationToken>,
|
||||
|
||||
/// Latest Lsn that has its size uncalculated, could be absent for freshly created timelines.
|
||||
pub initial_part_end: Option<Lsn>,
|
||||
@@ -56,21 +56,50 @@ pub(super) struct LogicalSize {
|
||||
|
||||
/// Normalized current size, that the data in pageserver occupies.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub(super) enum CurrentLogicalSize {
|
||||
pub(crate) enum CurrentLogicalSize {
|
||||
/// The size is not yet calculated to the end, this is an intermediate result,
|
||||
/// constructed from walreceiver increments and normalized: logical data could delete some objects, hence be negative,
|
||||
/// yet total logical size cannot be below 0.
|
||||
Approximate(u64),
|
||||
Approximate(Approximate),
|
||||
// Fully calculated logical size, only other future walreceiver increments are changing it, and those changes are
|
||||
// available for observation without any calculations.
|
||||
Exact(u64),
|
||||
Exact(Exact),
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
||||
pub(crate) enum Accuracy {
|
||||
Approximate,
|
||||
Exact,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub(crate) struct Approximate(u64);
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub(crate) struct Exact(u64);
|
||||
|
||||
impl From<&Approximate> for u64 {
|
||||
fn from(value: &Approximate) -> Self {
|
||||
value.0
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&Exact> for u64 {
|
||||
fn from(val: &Exact) -> Self {
|
||||
val.0
|
||||
}
|
||||
}
|
||||
|
||||
impl CurrentLogicalSize {
|
||||
pub(super) fn size(&self) -> u64 {
|
||||
*match self {
|
||||
Self::Approximate(size) => size,
|
||||
Self::Exact(size) => size,
|
||||
pub(crate) fn size_dont_care_about_accuracy(&self) -> u64 {
|
||||
match self {
|
||||
Self::Approximate(size) => size.into(),
|
||||
Self::Exact(size) => size.into(),
|
||||
}
|
||||
}
|
||||
pub(crate) fn accuracy(&self) -> Accuracy {
|
||||
match self {
|
||||
Self::Approximate(_) => Accuracy::Approximate,
|
||||
Self::Exact(_) => Accuracy::Exact,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -78,9 +107,12 @@ impl CurrentLogicalSize {
|
||||
impl LogicalSize {
|
||||
pub(super) fn empty_initial() -> Self {
|
||||
Self {
|
||||
initial_logical_size: OnceCell::with_value(0),
|
||||
// initial_logical_size already computed, so, don't admit any calculations
|
||||
initial_size_computation: Arc::new(Semaphore::new(0)),
|
||||
initial_logical_size: OnceCell::with_value((0, {
|
||||
crate::metrics::initial_logical_size::START_CALCULATION
|
||||
.first(crate::metrics::initial_logical_size::StartCircumstances::EmptyInitial)
|
||||
.calculation_result_saved()
|
||||
})),
|
||||
cancel_wait_for_background_loop_concurrency_limit_semaphore: OnceCell::new(),
|
||||
initial_part_end: None,
|
||||
size_added_after_initial: AtomicI64::new(0),
|
||||
}
|
||||
@@ -89,25 +121,29 @@ impl LogicalSize {
|
||||
pub(super) fn deferred_initial(compute_to: Lsn) -> Self {
|
||||
Self {
|
||||
initial_logical_size: OnceCell::new(),
|
||||
initial_size_computation: Arc::new(Semaphore::new(1)),
|
||||
cancel_wait_for_background_loop_concurrency_limit_semaphore: OnceCell::new(),
|
||||
initial_part_end: Some(compute_to),
|
||||
size_added_after_initial: AtomicI64::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn current_size(&self) -> anyhow::Result<CurrentLogicalSize> {
|
||||
pub(super) fn current_size(&self) -> CurrentLogicalSize {
|
||||
let size_increment: i64 = self.size_added_after_initial.load(AtomicOrdering::Acquire);
|
||||
// ^^^ keep this type explicit so that the casts in this function break if
|
||||
// we change the type.
|
||||
match self.initial_logical_size.get() {
|
||||
Some(initial_size) => {
|
||||
initial_size.checked_add_signed(size_increment)
|
||||
Some((initial_size, _)) => {
|
||||
crate::metrics::initial_logical_size::CALLS.exact.inc();
|
||||
CurrentLogicalSize::Exact(Exact(initial_size.checked_add_signed(size_increment)
|
||||
.with_context(|| format!("Overflow during logical size calculation, initial_size: {initial_size}, size_increment: {size_increment}"))
|
||||
.map(CurrentLogicalSize::Exact)
|
||||
.unwrap()))
|
||||
}
|
||||
None => {
|
||||
crate::metrics::initial_logical_size::CALLS
|
||||
.approximate
|
||||
.inc();
|
||||
let non_negative_size_increment = u64::try_from(size_increment).unwrap_or(0);
|
||||
Ok(CurrentLogicalSize::Approximate(non_negative_size_increment))
|
||||
CurrentLogicalSize::Approximate(Approximate(non_negative_size_increment))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -121,7 +157,7 @@ impl LogicalSize {
|
||||
/// available for re-use. This doesn't contain the incremental part.
|
||||
pub(super) fn initialized_size(&self, lsn: Lsn) -> Option<u64> {
|
||||
match self.initial_part_end {
|
||||
Some(v) if v == lsn => self.initial_logical_size.get().copied(),
|
||||
Some(v) if v == lsn => self.initial_logical_size.get().map(|(s, _)| *s),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -206,6 +206,10 @@ pub(super) async fn connection_manager_loop_step(
|
||||
|
||||
if let Some(new_candidate) = connection_manager_state.next_connection_candidate() {
|
||||
info!("Switching to new connection candidate: {new_candidate:?}");
|
||||
tokio::select! {
|
||||
logical_size = connection_manager_state.timeline.get_current_logical_size_wait_exact().await,
|
||||
_ = connection_manager.should_shutdown(),
|
||||
}
|
||||
connection_manager_state
|
||||
.change_connection(new_candidate, ctx)
|
||||
.await
|
||||
|
||||
@@ -396,11 +396,15 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
|
||||
// Send the replication feedback message.
|
||||
// Regular standby_status_update fields are put into this message.
|
||||
let (timeline_logical_size, _) = timeline
|
||||
.get_current_logical_size(&ctx)
|
||||
.context("Status update creation failed to get current logical size")?;
|
||||
let current_timeline_size = timeline
|
||||
.get_current_logical_size(
|
||||
crate::tenant::timeline::GetLogicalSizePriority::User,
|
||||
&ctx,
|
||||
)
|
||||
// FIXME: https://github.com/neondatabase/neon/issues/5963
|
||||
.size_dont_care_about_accuracy();
|
||||
let status_update = PageserverFeedback {
|
||||
current_timeline_size: timeline_logical_size,
|
||||
current_timeline_size,
|
||||
last_received_lsn,
|
||||
disk_consistent_lsn,
|
||||
remote_consistent_lsn,
|
||||
|
||||
Reference in New Issue
Block a user