diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 7a8d4ced6f..4a80708b4a 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1,6 +1,7 @@ //! mod eviction_task; +mod logical_size; pub mod span; pub mod uninit; mod walreceiver; @@ -10,7 +11,6 @@ use bytes::Bytes; use fail::fail_point; use futures::StreamExt; use itertools::Itertools; -use once_cell::sync::OnceCell; use pageserver_api::models::{ DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceEventReason, LayerResidenceStatus, @@ -19,7 +19,7 @@ use pageserver_api::models::{ use remote_storage::GenericRemoteStorage; use serde_with::serde_as; use storage_broker::BrokerClientChannel; -use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError}; +use tokio::sync::{oneshot, watch, TryAcquireError}; use tokio_util::sync::CancellationToken; use tracing::*; use utils::id::TenantTimelineId; @@ -30,7 +30,7 @@ use std::fs; use std::ops::{Deref, Range}; use std::path::{Path, PathBuf}; use std::pin::pin; -use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering}; +use std::sync::atomic::Ordering as AtomicOrdering; use std::sync::{Arc, Mutex, RwLock, Weak}; use std::time::{Duration, Instant, SystemTime}; @@ -40,6 +40,7 @@ use crate::tenant::storage_layer::{ DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer, LayerAccessStats, LayerFileName, RemoteLayer, }; +use crate::tenant::timeline::logical_size::CurrentLogicalSize; use crate::tenant::{ ephemeral_file::is_ephemeral_file, layer_map::{LayerMap, SearchResult}, @@ -81,6 +82,7 @@ use crate::{is_temporary, task_mgr}; pub(super) use self::eviction_task::EvictionTaskTenantState; use self::eviction_task::EvictionTaskTimelineState; +use self::logical_size::LogicalSize; use self::walreceiver::{WalReceiver, WalReceiverConf}; use super::config::TenantConf; @@ -367,126 +369,6 @@ pub struct Timeline { 
initial_logical_size_attempt: Mutex>, } -/// Internal structure to hold all data needed for logical size calculation. -/// -/// Calculation consists of two stages: -/// -/// 1. Initial size calculation. That might take a long time, because it requires -/// reading all layers containing relation sizes at `initial_part_end`. -/// -/// 2. Collecting an incremental part and adding that to the initial size. -/// Increments are appended on walreceiver writing new timeline data, -/// which result in increase or decrease of the logical size. -struct LogicalSize { - /// Size, potentially slow to compute. Calculating this might require reading multiple - /// layers, and even ancestor's layers. - /// - /// NOTE: size at a given LSN is constant, but after a restart we will calculate - /// the initial size at a different LSN. - initial_logical_size: OnceCell<u64>, - - /// Semaphore to track ongoing calculation of `initial_logical_size`. - initial_size_computation: Arc<Semaphore>, - - /// Latest Lsn that has its size uncalculated, could be absent for freshly created timelines. - initial_part_end: Option<Lsn>, - - /// All other size changes after startup, combined together. - /// - /// Size shouldn't ever be negative, but this is signed for two reasons: - /// - /// 1. If we initialized the "baseline" size lazily, while we already - /// process incoming WAL, the incoming WAL records could decrement the - /// variable and temporarily make it negative. (This is just future-proofing; - /// the initialization is currently not done lazily.) - /// - /// 2. If there is a bug and we e.g. forget to increment it in some cases - /// when size grows, but remember to decrement it when it shrinks again, the - /// variable could go negative. In that case, it seems better to at least - /// try to keep tracking it, rather than clamp or overflow it. Note that - /// get_current_logical_size() will clamp the returned value to zero if it's - /// negative, and log an error. 
Could set it permanently to zero or some - /// special value to indicate "broken" instead, but this will do for now. - /// - /// Note that we also expose a copy of this value as a prometheus metric, - /// see `current_logical_size_gauge`. Use the `update_current_logical_size` - /// to modify this, it will also keep the prometheus metric in sync. - size_added_after_initial: AtomicI64, -} - -/// Normalized current size, that the data in pageserver occupies. -#[derive(Debug, Clone, Copy)] -enum CurrentLogicalSize { - /// The size is not yet calculated to the end, this is an intermediate result, - /// constructed from walreceiver increments and normalized: logical data could delete some objects, hence be negative, - /// yet total logical size cannot be below 0. - Approximate(u64), - // Fully calculated logical size, only other future walreceiver increments are changing it, and those changes are - // available for observation without any calculations. - Exact(u64), -} - -impl CurrentLogicalSize { - fn size(&self) -> u64 { - *match self { - Self::Approximate(size) => size, - Self::Exact(size) => size, - } - } -} - -impl LogicalSize { - fn empty_initial() -> Self { - Self { - initial_logical_size: OnceCell::with_value(0), - // initial_logical_size already computed, so, don't admit any calculations - initial_size_computation: Arc::new(Semaphore::new(0)), - initial_part_end: None, - size_added_after_initial: AtomicI64::new(0), - } - } - - fn deferred_initial(compute_to: Lsn) -> Self { - Self { - initial_logical_size: OnceCell::new(), - initial_size_computation: Arc::new(Semaphore::new(1)), - initial_part_end: Some(compute_to), - size_added_after_initial: AtomicI64::new(0), - } - } - - fn current_size(&self) -> anyhow::Result<CurrentLogicalSize> { - let size_increment: i64 = self.size_added_after_initial.load(AtomicOrdering::Acquire); - // ^^^ keep this type explicit so that the casts in this function break if - // we change the type. 
- match self.initial_logical_size.get() { - Some(initial_size) => { - initial_size.checked_add_signed(size_increment) - .with_context(|| format!("Overflow during logical size calculation, initial_size: {initial_size}, size_increment: {size_increment}")) - .map(CurrentLogicalSize::Exact) - } - None => { - let non_negative_size_increment = u64::try_from(size_increment).unwrap_or(0); - Ok(CurrentLogicalSize::Approximate(non_negative_size_increment)) - } - } - } - - fn increment_size(&self, delta: i64) { - self.size_added_after_initial - .fetch_add(delta, AtomicOrdering::SeqCst); - } - - /// Make the value computed by initial logical size computation - /// available for re-use. This doesn't contain the incremental part. - fn initialized_size(&self, lsn: Lsn) -> Option<u64> { - match self.initial_part_end { - Some(v) if v == lsn => self.initial_logical_size.get().copied(), - _ => None, - } - } -} - pub struct WalReceiverInfo { pub wal_source_connconf: PgConnectionConfig, pub last_received_msg_lsn: Lsn, diff --git a/pageserver/src/tenant/timeline/logical_size.rs b/pageserver/src/tenant/timeline/logical_size.rs new file mode 100644 index 0000000000..d9c2bc4cb9 --- /dev/null +++ b/pageserver/src/tenant/timeline/logical_size.rs @@ -0,0 +1,128 @@ +use anyhow::Context; +use once_cell::sync::OnceCell; + +use tokio::sync::Semaphore; +use utils::lsn::Lsn; + +use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering}; +use std::sync::Arc; + +/// Internal structure to hold all data needed for logical size calculation. +/// +/// Calculation consists of two stages: +/// +/// 1. Initial size calculation. That might take a long time, because it requires +/// reading all layers containing relation sizes at `initial_part_end`. +/// +/// 2. Collecting an incremental part and adding that to the initial size. +/// Increments are appended on walreceiver writing new timeline data, +/// which result in increase or decrease of the logical size. 
+pub(super) struct LogicalSize { + /// Size, potentially slow to compute. Calculating this might require reading multiple + /// layers, and even ancestor's layers. + /// + /// NOTE: size at a given LSN is constant, but after a restart we will calculate + /// the initial size at a different LSN. + pub initial_logical_size: OnceCell<u64>, + + /// Semaphore to track ongoing calculation of `initial_logical_size`. + pub initial_size_computation: Arc<Semaphore>, + + /// Latest Lsn that has its size uncalculated, could be absent for freshly created timelines. + pub initial_part_end: Option<Lsn>, + + /// All other size changes after startup, combined together. + /// + /// Size shouldn't ever be negative, but this is signed for two reasons: + /// + /// 1. If we initialized the "baseline" size lazily, while we already + /// process incoming WAL, the incoming WAL records could decrement the + /// variable and temporarily make it negative. (This is just future-proofing; + /// the initialization is currently not done lazily.) + /// + /// 2. If there is a bug and we e.g. forget to increment it in some cases + /// when size grows, but remember to decrement it when it shrinks again, the + /// variable could go negative. In that case, it seems better to at least + /// try to keep tracking it, rather than clamp or overflow it. Note that + /// get_current_logical_size() will clamp the returned value to zero if it's + /// negative, and log an error. Could set it permanently to zero or some + /// special value to indicate "broken" instead, but this will do for now. + /// + /// Note that we also expose a copy of this value as a prometheus metric, + /// see `current_logical_size_gauge`. Use the `update_current_logical_size` + /// to modify this, it will also keep the prometheus metric in sync. + pub size_added_after_initial: AtomicI64, +} + +/// Normalized current size, that the data in pageserver occupies. 
+#[derive(Debug, Clone, Copy)] +pub(super) enum CurrentLogicalSize { + /// The size is not yet calculated to the end, this is an intermediate result, + /// constructed from walreceiver increments and normalized: logical data could delete some objects, hence be negative, + /// yet total logical size cannot be below 0. + Approximate(u64), + // Fully calculated logical size, only other future walreceiver increments are changing it, and those changes are + // available for observation without any calculations. + Exact(u64), +} + +impl CurrentLogicalSize { + pub(super) fn size(&self) -> u64 { + *match self { + Self::Approximate(size) => size, + Self::Exact(size) => size, + } + } +} + +impl LogicalSize { + pub(super) fn empty_initial() -> Self { + Self { + initial_logical_size: OnceCell::with_value(0), + // initial_logical_size already computed, so, don't admit any calculations + initial_size_computation: Arc::new(Semaphore::new(0)), + initial_part_end: None, + size_added_after_initial: AtomicI64::new(0), + } + } + + pub(super) fn deferred_initial(compute_to: Lsn) -> Self { + Self { + initial_logical_size: OnceCell::new(), + initial_size_computation: Arc::new(Semaphore::new(1)), + initial_part_end: Some(compute_to), + size_added_after_initial: AtomicI64::new(0), + } + } + + pub(super) fn current_size(&self) -> anyhow::Result<CurrentLogicalSize> { + let size_increment: i64 = self.size_added_after_initial.load(AtomicOrdering::Acquire); + // ^^^ keep this type explicit so that the casts in this function break if + // we change the type. 
+ match self.initial_logical_size.get() { + Some(initial_size) => { + initial_size.checked_add_signed(size_increment) + .with_context(|| format!("Overflow during logical size calculation, initial_size: {initial_size}, size_increment: {size_increment}")) + .map(CurrentLogicalSize::Exact) + } + None => { + let non_negative_size_increment = u64::try_from(size_increment).unwrap_or(0); + Ok(CurrentLogicalSize::Approximate(non_negative_size_increment)) + } + } + } + + pub(super) fn increment_size(&self, delta: i64) { + self.size_added_after_initial + .fetch_add(delta, AtomicOrdering::SeqCst); + } + + /// Make the value computed by initial logical size computation + /// available for re-use. This doesn't contain the incremental part. + pub(super) fn initialized_size(&self, lsn: Lsn) -> Option<u64> { + match self.initial_part_end { + Some(v) if v == lsn => self.initial_logical_size.get().copied(), + _ => None, + } + } +}