diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index cc6583dcf6..b61ef09c46 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::fs; use std::ops::{Deref, Range}; use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicBool, AtomicI64, Ordering as AtomicOrdering}; +use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering}; use std::sync::{Arc, Mutex, MutexGuard, RwLock}; use std::time::{Duration, Instant, SystemTime}; @@ -176,7 +176,7 @@ pub struct Timeline { /// Current logical size of the "datadir", at the last LSN. current_logical_size: LogicalSize, - initial_size_computation_started: AtomicBool, + initial_size_computation_state: Mutex, /// Information about the last processed message by the WAL receiver, /// or None if WAL receiver has not received anything for this timeline @@ -189,6 +189,14 @@ pub struct Timeline { state: watch::Sender, } +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +enum InitialLogicalSizeComputationState { + NotStarted, + Running, + FailedWillRetryNextTime, + Success, +} + /// Internal structure to hold all data needed for logical size calculation. /// Calculation consists of two parts: /// 1. Initial size calculation. That might take a long time, because it requires @@ -804,7 +812,9 @@ impl Timeline { // initial logical size is 0. LogicalSize::empty_initial() }, - initial_size_computation_started: AtomicBool::new(false), + initial_size_computation_state: Mutex::new( + InitialLogicalSizeComputationState::NotStarted, + ), partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))), repartition_threshold: 0, @@ -1221,59 +1231,85 @@ impl Timeline { } fn try_spawn_size_init_task(self: &Arc, init_lsn: Lsn) { - // Atomically check if the timeline size calculation had already started. - // If the flag was not already set, this sets it. - if !self - .initial_size_computation_started - .swap(true, AtomicOrdering::SeqCst) - { - // We need to start the computation task. - let self_clone = Arc::clone(self); - task_mgr::spawn( - task_mgr::BACKGROUND_RUNTIME.handle(), - task_mgr::TaskKind::InitialLogicalSizeCalculation, - Some(self.tenant_id), - Some(self.timeline_id), - "initial size calculation", - false, - async move { - let mut timeline_state_updates = self_clone.subscribe_for_state_updates(); - let self_calculation = Arc::clone(&self_clone); - tokio::select! { - calculation_result = spawn_blocking(move || self_calculation.calculate_logical_size(init_lsn)) => { - let calculated_size = calculation_result - .context("Failed to spawn calculation result task")? - .context("Failed to calculate logical size")?; - match self_clone.current_logical_size.initial_logical_size.set(calculated_size) { - Ok(()) => info!("Successfully calculated initial logical size"), - Err(existing_size) => error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing"), - } - Ok(()) - }, - new_event = async { - loop { - match timeline_state_updates.changed().await { - Ok(()) => { - let new_state = *timeline_state_updates.borrow(); - match new_state { - // we're running this job for active timelines only - TimelineState::Active => continue, - TimelineState::Broken | TimelineState::Stopping | TimelineState::Suspended => return Some(new_state), - } - } - Err(_sender_dropped_error) => return None, - } - } - } => { - match new_event { - Some(new_state) => info!("Timeline became inactive (new state: {new_state:?}), dropping current connections until it reactivates"), - None => info!("Timeline dropped state updates sender, stopping init size calculation"), - } - Ok(()) - }, + use InitialLogicalSizeComputationState::*; + let mut guard = self.initial_size_computation_state.lock().unwrap(); + match *guard { + Running | Success => return, + NotStarted | FailedWillRetryNextTime => *guard = Running, + } + drop(guard); + // We need to start the computation task. + let self_clone = Arc::clone(self); + task_mgr::spawn( + task_mgr::BACKGROUND_RUNTIME.handle(), + task_mgr::TaskKind::InitialLogicalSizeCalculation, + Some(self.tenant_id), + Some(self.timeline_id), + "initial size calculation", + false, + async move { + let res = self_clone + .initial_logical_size_calculation_task(init_lsn) + .await; + // task_mgr will log the result + let new_state = match res { + Ok(_) => Success, + Err(_) => FailedWillRetryNextTime, + }; + let mut state = self_clone.initial_size_computation_state.lock().unwrap(); + if *state != Running { + // Should be unreachable, but no reason to crash the pageserver. Don't touch anything. + error!("expecting initial size computation task to be in state {Running:?}, got {state:?}") + } else { + *state = new_state; + } + res + }, + ); + } + + #[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))] + async fn initial_logical_size_calculation_task( + self: &Arc, + init_lsn: Lsn, + ) -> anyhow::Result<()> { + let mut timeline_state_updates = self.subscribe_for_state_updates(); + let self_calculation = Arc::clone(self); + tokio::select! { + calculation_result = spawn_blocking(move || self_calculation.calculate_logical_size(init_lsn)) => { + let calculated_size = calculation_result + .context("Failed to spawn calculation result task")? + .context("Failed to calculate logical size")?; + match self.current_logical_size.initial_logical_size.set(calculated_size) { + Ok(()) => (), + Err(existing_size) => { + // This shouldn't happen because we use self.initial_size_computation_running to ensure exlusivity here. + // But if it happens, just complain & report success so there are no further retries. + error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing") } - }.instrument(info_span!("initial_logical_size_calculation", tenant = %self.tenant_id, timeline = %self.timeline_id)), - ); + } + Ok(()) + }, + new_event = async { + loop { + match timeline_state_updates.changed().await { + Ok(()) => { + let new_state = *timeline_state_updates.borrow(); + match new_state { + // we're running this job for active timelines only + TimelineState::Active => continue, + TimelineState::Broken | TimelineState::Stopping | TimelineState::Suspended => return Some(new_state), + } + } + Err(_sender_dropped_error) => return None, + } + } + } => { + match new_event { + Some(new_state) => anyhow::bail!("aborted because timeline became inactive (new state: {new_state:?})"), + None => anyhow::bail!("aborted because state watch was dropped"), // can't happen, the sender is not dropped as long as the Timeline exists + } + }, } }