diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 40d2a0e0ef..68a26b8098 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -4,6 +4,7 @@ use anyhow::{anyhow, Context, Result}; use hyper::StatusCode; use hyper::{Body, Request, Response, Uri}; use remote_storage::GenericRemoteStorage; +use tokio_util::sync::CancellationToken; use tracing::*; use super::models::{ @@ -86,8 +87,14 @@ fn build_timeline_info( ) -> anyhow::Result { let mut info = build_timeline_info_common(timeline)?; if include_non_incremental_logical_size { + // XXX we should be using spawn_ondemand_logical_size_calculation here. + // Otherwise, if someone deletes the timeline / detaches the tenant while + // we're executing this function, we will outlive the timeline on-disk state. info.current_logical_size_non_incremental = - Some(timeline.get_current_logical_size_non_incremental(info.last_record_lsn)?); + Some(timeline.get_current_logical_size_non_incremental( + info.last_record_lsn, + CancellationToken::new(), + )?); } if include_non_incremental_physical_size { info.current_physical_size_non_incremental = diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 0e334a63df..797ee9f436 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -10,7 +10,7 @@ use crate::keyspace::{KeySpace, KeySpaceAccum}; use crate::repository::*; use crate::tenant::Timeline; use crate::walrecord::NeonWalRecord; -use anyhow::{bail, ensure, Result}; +use anyhow::{bail, ensure, Context, Result}; use bytes::{Buf, Bytes}; use pageserver_api::reltag::{RelTag, SlruKind}; use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM}; @@ -19,6 +19,7 @@ use postgres_ffi::{Oid, TimestampTz, TransactionId}; use serde::{Deserialize, Serialize}; use std::collections::{hash_map, HashMap, HashSet}; use std::ops::Range; +use tokio_util::sync::CancellationToken; use tracing::{debug, trace, warn}; 
use utils::{bin_ser::BeSer, lsn::Lsn}; @@ -33,6 +34,14 @@ pub enum LsnForTimestamp { NoData(Lsn), } +#[derive(Debug, thiserror::Error)] +pub enum CalculateLogicalSizeError { + #[error("cancelled")] + Cancelled, + #[error(transparent)] + Other(#[from] anyhow::Error), +} + /// /// This impl provides all the functionality to store PostgreSQL relations, SLRUs, /// and other special kinds of files, in a versioned key-value store. The @@ -376,14 +385,21 @@ impl Timeline { /// /// Only relation blocks are counted currently. That excludes metadata, /// SLRUs, twophase files etc. - pub fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result { + pub fn get_current_logical_size_non_incremental( + &self, + lsn: Lsn, + cancel: CancellationToken, + ) -> std::result::Result { // Fetch list of database dirs and iterate them let buf = self.get(DBDIR_KEY, lsn)?; - let dbdir = DbDirectory::des(&buf)?; + let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?; let mut total_size: u64 = 0; for (spcnode, dbnode) in dbdir.dbdirs.keys() { for rel in self.list_rels(*spcnode, *dbnode, lsn)? 
{ + if cancel.is_cancelled() { + return Err(CalculateLogicalSizeError::Cancelled); + } let relsize_key = rel_size_to_key(rel); let mut buf = self.get(relsize_key, lsn)?; let relsize = buf.get_u32_le(); diff --git a/pageserver/src/tenant/size.rs b/pageserver/src/tenant/size.rs index 24d9b2a10e..597461ce29 100644 --- a/pageserver/src/tenant/size.rs +++ b/pageserver/src/tenant/size.rs @@ -3,8 +3,11 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; use anyhow::Context; +use tokio::sync::oneshot::error::RecvError; use tokio::sync::Semaphore; +use crate::pgdatadir_mapping::CalculateLogicalSizeError; + use super::Tenant; use utils::id::TimelineId; use utils::lsn::Lsn; @@ -212,11 +215,30 @@ pub(super) async fn gather_inputs( let mut have_any_error = false; while let Some(res) = joinset.join_next().await { - // each of these come with Result, JoinError> + // each of these come with Result, JoinError> // because of spawn + spawn_blocking - let res = res.and_then(|inner| inner); match res { - Ok(TimelineAtLsnSizeResult(timeline, lsn, Ok(size))) => { + Err(join_error) if join_error.is_cancelled() => { + unreachable!("we are not cancelling any of the futures, nor should be"); + } + Err(join_error) => { + // cannot really do anything, as this panic is likely a bug + error!("task that calls spawn_ondemand_logical_size_calculation panicked: {join_error:#}"); + have_any_error = true; + } + Ok(Err(recv_result_error)) => { + // cannot really do anything, as this panic is likely a bug + error!("failed to receive logical size query result: {recv_result_error:#}"); + have_any_error = true; + } + Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Err(error)))) => { + warn!( + timeline_id=%timeline.timeline_id, + "failed to calculate logical size at {lsn}: {error:#}" + ); + have_any_error = true; + } + Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Ok(size)))) => { debug!(timeline_id=%timeline.timeline_id, %lsn, size, "size calculated"); 
logical_size_cache.insert((timeline.timeline_id, lsn), size); @@ -228,21 +250,6 @@ pub(super) async fn gather_inputs( command: Command::Update(size), }); } - Ok(TimelineAtLsnSizeResult(timeline, lsn, Err(error))) => { - warn!( - timeline_id=%timeline.timeline_id, - "failed to calculate logical size at {lsn}: {error:#}" - ); - have_any_error = true; - } - Err(join_error) if join_error.is_cancelled() => { - unreachable!("we are not cancelling any of the futures, nor should be"); - } - Err(join_error) => { - // cannot really do anything, as this panic is likely a bug - error!("logical size query panicked: {join_error:#}"); - have_any_error = true; - } } } @@ -351,7 +358,7 @@ enum LsnKind { struct TimelineAtLsnSizeResult( Arc, utils::lsn::Lsn, - anyhow::Result, + Result, ); #[instrument(skip_all, fields(timeline_id=%timeline.timeline_id, lsn=%lsn))] @@ -359,17 +366,15 @@ async fn calculate_logical_size( limit: Arc, timeline: Arc, lsn: utils::lsn::Lsn, -) -> Result { - let permit = tokio::sync::Semaphore::acquire_owned(limit) +) -> Result { + let _permit = tokio::sync::Semaphore::acquire_owned(limit) .await .expect("global semaphore should not had been closed"); - tokio::task::spawn_blocking(move || { - let _permit = permit; - let size_res = timeline.calculate_logical_size(lsn); - TimelineAtLsnSizeResult(timeline, lsn, size_res) - }) - .await + let size_res = timeline + .spawn_ondemand_logical_size_calculation(lsn) + .await?; + Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res)) } #[test] diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index b7f12609e6..3373c52231 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -6,8 +6,9 @@ use fail::fail_point; use itertools::Itertools; use once_cell::sync::OnceCell; use pageserver_api::models::TimelineState; -use tokio::sync::watch; +use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError}; use tokio::task::spawn_blocking; +use 
tokio_util::sync::CancellationToken; use tracing::*; use std::cmp::{max, min, Ordering}; @@ -36,9 +37,9 @@ use crate::tenant::{ use crate::config::PageServerConf; use crate::keyspace::{KeyPartitioning, KeySpace}; use crate::metrics::TimelineMetrics; -use crate::pgdatadir_mapping::BlockNumber; use crate::pgdatadir_mapping::LsnForTimestamp; use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key}; +use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError}; use crate::tenant_config::TenantConfOpt; use pageserver_api::reltag::RelTag; @@ -176,7 +177,6 @@ pub struct Timeline { /// Current logical size of the "datadir", at the last LSN. current_logical_size: LogicalSize, - initial_size_computation_state: Mutex, /// Information about the last processed message by the WAL receiver, /// or None if WAL receiver has not received anything for this timeline @@ -189,14 +189,6 @@ pub struct Timeline { state: watch::Sender, } -#[derive(Debug, PartialEq, Eq, Clone, Copy)] -enum InitialLogicalSizeComputationState { - NotStarted, - Running, - FailedWillRetryNextTime, - Success, -} - /// Internal structure to hold all data needed for logical size calculation. /// Calculation consists of two parts: /// 1. Initial size calculation. That might take a long time, because it requires @@ -210,6 +202,8 @@ struct LogicalSize { /// /// NOTE: initial size is not a constant and will change between restarts. initial_logical_size: OnceCell, + /// Semaphore to track ongoing calculation of `initial_logical_size`. + initial_size_computation: Arc, /// Latest Lsn that has its size uncalculated, could be absent for freshly created timelines. initial_part_end: Option, /// All other size changes after startup, combined together. 
@@ -260,6 +254,8 @@ impl LogicalSize { fn empty_initial() -> Self { Self { initial_logical_size: OnceCell::with_value(0), + // initial_logical_size already computed, so, don't admit any calculations + initial_size_computation: Arc::new(Semaphore::new(0)), initial_part_end: None, size_added_after_initial: AtomicI64::new(0), } @@ -268,6 +264,7 @@ impl LogicalSize { fn deferred_initial(compute_to: Lsn) -> Self { Self { initial_logical_size: OnceCell::new(), + initial_size_computation: Arc::new(Semaphore::new(1)), initial_part_end: Some(compute_to), size_added_after_initial: AtomicI64::new(0), } @@ -812,9 +809,6 @@ impl Timeline { // initial logical size is 0. LogicalSize::empty_initial() }, - initial_size_computation_state: Mutex::new( - InitialLogicalSizeComputationState::NotStarted, - ), partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))), repartition_threshold: 0, @@ -1231,13 +1225,21 @@ impl Timeline { } fn try_spawn_size_init_task(self: &Arc, init_lsn: Lsn) { - use InitialLogicalSizeComputationState::*; - let mut guard = self.initial_size_computation_state.lock().unwrap(); - match *guard { - Running | Success => return, - NotStarted | FailedWillRetryNextTime => *guard = Running, - } - drop(guard); + let permit = match Arc::clone(&self.current_logical_size.initial_size_computation) + .try_acquire_owned() + { + Ok(permit) => permit, + Err(TryAcquireError::NoPermits) => { + // computation already ongoing or finished with success + return; + } + Err(TryAcquireError::Closed) => unreachable!("we never call close"), + }; + debug_assert!(self + .current_logical_size + .initial_logical_size + .get() + .is_none()); // We need to start the computation task. let self_clone = Arc::clone(self); task_mgr::spawn( @@ -1247,79 +1249,131 @@ impl Timeline { Some(self.timeline_id), "initial size calculation", false, + // NB: don't log errors here, task_mgr will do that. 
async move { - let res = self_clone - .initial_logical_size_calculation_task(init_lsn) - .await; - // task_mgr will log the result - let new_state = match res { - Ok(_) => Success, - Err(_) => FailedWillRetryNextTime, + let calculated_size = match self_clone.logical_size_calculation_task(init_lsn).await + { + Ok(s) => s, + Err(CalculateLogicalSizeError::Cancelled) => { + // Don't make noise, this is a common task. + // In the unlikely case that there is another call to this function, we'll retry + // because initial_logical_size is still None. + info!("initial size calculation cancelled, likely timeline delete / tenant detach"); + return Ok(()); + } + x @ Err(_) => x.context("Failed to calculate logical size")?, }; - let mut state = self_clone.initial_size_computation_state.lock().unwrap(); - if *state != Running { - // Should be unreachable, but no reason to crash the pageserver. Don't touch anything. - error!("expecting initial size computation task to be in state {Running:?}, got {state:?}") - } else { - *state = new_state; - } - res - }, - ); - } - - #[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))] - async fn initial_logical_size_calculation_task( - self: &Arc, - init_lsn: Lsn, - ) -> anyhow::Result<()> { - let mut timeline_state_updates = self.subscribe_for_state_updates(); - let self_calculation = Arc::clone(self); - tokio::select! { - calculation_result = spawn_blocking(move || self_calculation.calculate_logical_size(init_lsn)) => { - let calculated_size = calculation_result - .context("Failed to spawn calculation result task")? - .context("Failed to calculate logical size")?; - match self.current_logical_size.initial_logical_size.set(calculated_size) { + match self_clone + .current_logical_size + .initial_logical_size + .set(calculated_size) + { Ok(()) => (), Err(existing_size) => { - // This shouldn't happen because we use self.initial_size_computation_running to ensure exlusivity here. 
+ // This shouldn't happen because the semaphore is initialized with 1. // But if it happens, just complain & report success so there are no further retries. error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing") } } + // now that `initial_logical_size.is_some()`, reduce permit count to 0 + // so that we prevent future callers from spawning this task + permit.forget(); Ok(()) }, - _ = task_mgr::shutdown_watcher() => { - anyhow::bail!("aborted because task_mgr shutdown requested"); - } - new_event = async { - loop { - match timeline_state_updates.changed().await { - Ok(()) => { - let new_state = *timeline_state_updates.borrow(); - match new_state { - // we're running this job for active timelines only - TimelineState::Active => continue, - TimelineState::Broken | TimelineState::Stopping | TimelineState::Suspended => return Some(new_state), + ); + } + + pub fn spawn_ondemand_logical_size_calculation( + self: &Arc, + lsn: Lsn, + ) -> oneshot::Receiver> { + let (sender, receiver) = oneshot::channel(); + let self_clone = Arc::clone(self); + task_mgr::spawn( + task_mgr::BACKGROUND_RUNTIME.handle(), + task_mgr::TaskKind::InitialLogicalSizeCalculation, + Some(self.tenant_id), + Some(self.timeline_id), + "ondemand logical size calculation", + false, + async move { + let res = self_clone.logical_size_calculation_task(lsn).await; + let _ = sender.send(res).ok(); + Ok(()) // Receiver is responsible for handling errors + }, + ); + receiver + } + + #[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))] + async fn logical_size_calculation_task( + self: &Arc, + init_lsn: Lsn, + ) -> Result { + let mut timeline_state_updates = self.subscribe_for_state_updates(); + let self_calculation = Arc::clone(self); + let cancel = CancellationToken::new(); + + let calculation = async { + let cancel = cancel.child_token(); + spawn_blocking(move || 
self_calculation.calculate_logical_size(init_lsn, cancel)) + .await + .context("Failed to spawn calculation result task")? + }; + let timeline_state_cancellation = async { + loop { + match timeline_state_updates.changed().await { + Ok(()) => { + let new_state = *timeline_state_updates.borrow(); + match new_state { + // we're running this job for active timelines only + TimelineState::Active => continue, + TimelineState::Broken + | TimelineState::Stopping + | TimelineState::Suspended => { + break format!("aborted because timeline became inactive (new state: {new_state:?})") } } - Err(_sender_dropped_error) => return None, + } + Err(_sender_dropped_error) => { + // can't happen, the sender is not dropped as long as the Timeline exists + break "aborted because state watch was dropped".to_string(); } } - } => { - match new_event { - Some(new_state) => anyhow::bail!("aborted because timeline became inactive (new state: {new_state:?})"), - None => anyhow::bail!("aborted because state watch was dropped"), // can't happen, the sender is not dropped as long as the Timeline exists + } + }; + + let taskmgr_shutdown_cancellation = async { + task_mgr::shutdown_watcher().await; + "aborted because task_mgr shutdown requested".to_string() + }; + + tokio::pin!(calculation); + loop { + tokio::select! { + res = &mut calculation => { return res } + reason = timeline_state_cancellation => { + debug!(reason = reason, "cancelling calculation"); + cancel.cancel(); + return calculation.await; } - }, + reason = taskmgr_shutdown_cancellation => { + debug!(reason = reason, "cancelling calculation"); + cancel.cancel(); + return calculation.await; + } + } } } /// Calculate the logical size of the database at the latest LSN. /// /// NOTE: counted incrementally, includes ancestors, this can be a slow operation. 
- pub fn calculate_logical_size(&self, up_to_lsn: Lsn) -> anyhow::Result { + pub fn calculate_logical_size( + &self, + up_to_lsn: Lsn, + cancel: CancellationToken, + ) -> Result { info!( "Calculating logical size for timeline {} at {}", self.timeline_id, up_to_lsn @@ -1360,7 +1414,7 @@ impl Timeline { } else { self.metrics.logical_size_histo.start_timer() }; - let logical_size = self.get_current_logical_size_non_incremental(up_to_lsn)?; + let logical_size = self.get_current_logical_size_non_incremental(up_to_lsn, cancel)?; debug!("calculated logical size: {logical_size}"); timer.stop_and_record(); Ok(logical_size) diff --git a/test_runner/regress/test_timeline_size.py b/test_runner/regress/test_timeline_size.py index 38660cefac..523c946a68 100644 --- a/test_runner/regress/test_timeline_size.py +++ b/test_runner/regress/test_timeline_size.py @@ -265,10 +265,6 @@ def test_timeline_initial_logical_size_calculation_cancellation( log.info( f"try to delete the timeline using {deletion_method}, this should cancel size computation tasks and wait for them to finish" ) - if deletion_method == "timeline_delete": - env.pageserver.allowed_errors.append( - f".*initial size calculation.*{tenant_id}.*{timeline_id}.*aborted because task_mgr shutdown requested" - ) delete_timeline_success: queue.Queue[bool] = queue.Queue(maxsize=1) def delete_timeline_thread_fn():