diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index a153f1a01e..dd40ba9e1c 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -19,6 +19,22 @@ pub enum TenantState { Broken, } +/// A state of a timeline in pageserver's memory. +#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub enum TimelineState { + /// Timeline is fully operational, its background jobs are running. + Active, + /// A timeline is recognized by pageserver, but not yet ready to operate. + /// The status indicates, that the timeline could eventually go back to Active automatically: + /// for example, if the owning tenant goes back to Active again. + Suspended, + /// A timeline is recognized by pageserver, but not yet ready to operate and not allowed to + /// automatically become Active after certain events: only a management call can change this status. + Paused, + /// A timeline is recognized by the pageserver, but no longer used for any operations, as failed to get activated. + Broken, +} + #[serde_as] #[derive(Serialize, Deserialize)] pub struct TimelineCreateRequest { @@ -160,6 +176,8 @@ pub struct TimelineInfo { pub remote_consistent_lsn: Option, pub awaits_download: bool, + pub state: TimelineState, + // Some of the above fields are duplicated in 'local' and 'remote', for backwards- // compatility with older clients. pub local: LocalTimelineInfo, diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index 626cc07429..89609f5674 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -618,6 +618,7 @@ components: - last_record_lsn - disk_consistent_lsn - awaits_download + - state properties: timeline_id: type: string @@ -660,6 +661,8 @@ components: type: integer awaits_download: type: boolean + state: + type: string # These 'local' and 'remote' fields just duplicate some of the fields # above. They are kept for backwards-compatibility. They can be removed, diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 489adbb2cf..8ec7604b8a 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -129,6 +129,7 @@ async fn build_timeline_info( } }; let current_physical_size = Some(timeline.get_physical_size()); + let state = timeline.current_state(); let info = TimelineInfo { tenant_id: timeline.tenant_id, @@ -158,6 +159,7 @@ async fn build_timeline_info( remote_consistent_lsn, awaits_download, + state, // Duplicate some fields in 'local' and 'remote' fields, for backwards-compatility // with the control plane. @@ -294,7 +296,7 @@ async fn timeline_detail_handler(request: Request) -> Result) -> Result format!("{}", lsn), + LsnForTimestamp::Present(lsn) => format!("{lsn}"), LsnForTimestamp::Future(_lsn) => "future".into(), LsnForTimestamp::Past(_lsn) => "past".into(), LsnForTimestamp::NoData(_lsn) => "nodata".into(), @@ -788,16 +789,16 @@ async fn timeline_gc_handler(mut request: Request) -> Result) -> Result) -> Result Result> { - tenant_mgr::get_tenant(tenant_id, true).and_then(|tenant| tenant.get_timeline(timeline_id)) + tenant_mgr::get_tenant(tenant_id, true) + .and_then(|tenant| tenant.get_timeline(timeline_id, true)) } /// diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 69c89a80b4..84833e9c40 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -11,7 +11,8 @@ //! parent timeline, and the last LSN that has been written to disk. //! 
-use anyhow::{bail, ensure, Context}; +use anyhow::{bail, Context}; +use pageserver_api::models::TimelineState; use tokio::sync::watch; use tracing::*; use utils::crashsafe::path_with_suffix_extension; @@ -189,6 +190,7 @@ impl UninitializedTimeline<'_> { "Failed to remove uninit mark file for timeline {tenant_id}/{timeline_id}" ) })?; + new_timeline.set_state(TimelineState::Active); v.insert(Arc::clone(&new_timeline)); new_timeline.launch_wal_receiver(); } @@ -338,18 +340,26 @@ impl Tenant { /// Get Timeline handle for given Neon timeline ID. /// This function is idempotent. It doesn't change internal state in any way. - pub fn get_timeline(&self, timeline_id: TimelineId) -> anyhow::Result> { - self.timelines - .lock() - .unwrap() - .get(&timeline_id) - .with_context(|| { - format!( - "Timeline {} was not found for tenant {}", - timeline_id, self.tenant_id - ) - }) - .map(Arc::clone) + pub fn get_timeline( + &self, + timeline_id: TimelineId, + active_only: bool, + ) -> anyhow::Result> { + let timelines_accessor = self.timelines.lock().unwrap(); + let timeline = timelines_accessor.get(&timeline_id).with_context(|| { + format!("Timeline {}/{} was not found", self.tenant_id, timeline_id) + })?; + + if active_only && !timeline.is_active() { + anyhow::bail!( + "Timeline {}/{} is not active, state: {:?}", + self.tenant_id, + timeline_id, + timeline.current_state() + ) + } else { + Ok(Arc::clone(timeline)) + } } /// Lists timelines the tenant contains. @@ -372,6 +382,11 @@ impl Tenant { initdb_lsn: Lsn, pg_version: u32, ) -> anyhow::Result { + anyhow::ensure!( + self.is_active(), + "Cannot create empty timelines on inactive tenant" + ); + let timelines = self.timelines.lock().unwrap(); let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id, &timelines)?; drop(timelines); @@ -408,9 +423,14 @@ impl Tenant { mut ancestor_start_lsn: Option, pg_version: u32, ) -> anyhow::Result>> { + anyhow::ensure!( + self.is_active(), + "Cannot create timelines on inactive tenant" + ); + let new_timeline_id = new_timeline_id.unwrap_or_else(TimelineId::generate); - if self.get_timeline(new_timeline_id).is_ok() { + if self.get_timeline(new_timeline_id, false).is_ok() { debug!("timeline {new_timeline_id} already exists"); return Ok(None); } @@ -418,7 +438,7 @@ impl Tenant { let loaded_timeline = match ancestor_timeline_id { Some(ancestor_timeline_id) => { let ancestor_timeline = self - .get_timeline(ancestor_timeline_id) + .get_timeline(ancestor_timeline_id, false) .context("Cannot branch off the timeline that's not present in pageserver")?; if let Some(lsn) = ancestor_start_lsn.as_mut() { @@ -470,6 +490,11 @@ impl Tenant { pitr: Duration, checkpoint_before_gc: bool, ) -> anyhow::Result { + anyhow::ensure!( + self.is_active(), + "Cannot run GC iteration on inactive tenant" + ); + let timeline_str = target_timeline_id .map(|x| x.to_string()) .unwrap_or_else(|| "-".to_string()); @@ -486,6 +511,11 @@ impl Tenant { /// Also it can be explicitly requested per timeline through page server /// api's 'compact' command. pub fn compaction_iteration(&self) -> anyhow::Result<()> { + anyhow::ensure!( + self.is_active(), + "Cannot run compaction iteration on inactive tenant" + ); + // Scan through the hashmap and collect a list of all the timelines, // while holding the lock. Then drop the lock and actually perform the // compactions. 
We don't want to block everything else while the @@ -493,6 +523,7 @@ impl Tenant { let timelines = self.timelines.lock().unwrap(); let timelines_to_compact = timelines .iter() + .filter(|(_, timeline)| timeline.is_active()) .map(|(timeline_id, timeline)| (*timeline_id, timeline.clone())) .collect::>(); drop(timelines); @@ -515,13 +546,13 @@ impl Tenant { // checkpoints. We don't want to block everything else while the // checkpoint runs. let timelines = self.timelines.lock().unwrap(); - let timelines_to_compact = timelines + let timelines_to_checkpoint = timelines .iter() .map(|(timeline_id, timeline)| (*timeline_id, Arc::clone(timeline))) .collect::>(); drop(timelines); - for (timeline_id, timeline) in &timelines_to_compact { + for (timeline_id, timeline) in &timelines_to_checkpoint { let _entered = info_span!("checkpoint", timeline = %timeline_id, tenant = %self.tenant_id) .entered(); @@ -543,7 +574,7 @@ impl Tenant { .iter() .any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id)); - ensure!( + anyhow::ensure!( !children_exist, "Cannot delete timeline which has child timelines" ); @@ -552,7 +583,10 @@ impl Tenant { Entry::Vacant(_) => bail!("timeline not found"), }; - let layer_removal_guard = timeline_entry.get().layer_removal_guard()?; + let timeline = timeline_entry.get(); + timeline.set_state(TimelineState::Paused); + + let layer_removal_guard = timeline.layer_removal_guard()?; let local_timeline_directory = self.conf.timeline_path(&timeline_id, &self.tenant_id); std::fs::remove_dir_all(&local_timeline_directory).with_context(|| { @@ -569,58 +603,6 @@ impl Tenant { Ok(()) } - pub fn init_attach_timelines( - &self, - timelines: HashMap, - ) -> anyhow::Result<()> { - let sorted_timelines = if timelines.len() == 1 { - timelines.into_iter().collect() - } else if !timelines.is_empty() { - tree_sort_timelines(timelines)? - } else { - warn!("No timelines to attach received"); - return Ok(()); - }; - - let mut timelines_accessor = self.timelines.lock().unwrap(); - for (timeline_id, metadata) in sorted_timelines { - info!( - "Attaching timeline {} pg_version {}", - timeline_id, - metadata.pg_version() - ); - - if timelines_accessor.contains_key(&timeline_id) { - warn!( - "Timeline {}/{} already exists in the tenant map, skipping its initialization", - self.tenant_id, timeline_id - ); - continue; - } else { - let ancestor = metadata - .ancestor_timeline() - .and_then(|ancestor_timeline_id| timelines_accessor.get(&ancestor_timeline_id)) - .cloned(); - let timeline = UninitializedTimeline { - owning_tenant: self, - timeline_id, - raw_timeline: Some(( - self.create_timeline_data(timeline_id, metadata, ancestor) - .with_context(|| { - format!("Failed to initialize timeline {timeline_id}") - })?, - TimelineUninitMark::dummy(), - )), - }; - let initialized_timeline = - timeline.initialize_with_lock(&mut timelines_accessor, true)?; - timelines_accessor.insert(timeline_id, initialized_timeline); - } - } - - Ok(()) - } - /// Allows to retrieve remote timeline index from the tenant. Used in walreceiver to grab remote consistent lsn. pub fn get_remote_index(&self) -> &RemoteIndex { &self.remote_index @@ -661,10 +643,30 @@ impl Tenant { } (_, new_state) => { self.state.send_replace(new_state); - if self.should_run_tasks() { - // Spawn gc and compaction loops. The loops will shut themselves - // down when they notice that the tenant is inactive. 
- crate::tenant_tasks::start_background_loops(self.tenant_id); + + let timelines_accessor = self.timelines.lock().unwrap(); + let not_broken_timelines = timelines_accessor + .values() + .filter(|timeline| timeline.current_state() != TimelineState::Broken); + match new_state { + TenantState::Active { + background_jobs_running, + } => { + if background_jobs_running { + // Spawn gc and compaction loops. The loops will shut themselves + // down when they notice that the tenant is inactive. + crate::tenant_tasks::start_background_loops(self.tenant_id); + } + + for timeline in not_broken_timelines { + timeline.set_state(TimelineState::Active); + } + } + TenantState::Paused | TenantState::Broken => { + for timeline in not_broken_timelines { + timeline.set_state(TimelineState::Suspended); + } + } } } } @@ -993,6 +995,7 @@ impl Tenant { timelines .iter() + .filter(|(_, timeline)| timeline.is_active()) .map(|(timeline_id, timeline_entry)| { // This is unresolved question for now, how to do gc in presence of remote timelines // especially when this is combined with branching. @@ -1026,7 +1029,7 @@ impl Tenant { for timeline_id in timeline_ids { // Timeline is known to be local and loaded. let timeline = self - .get_timeline(timeline_id) + .get_timeline(timeline_id, false) .with_context(|| format!("Timeline {timeline_id} was not found"))?; // If target_timeline is specified, ignore all other timelines @@ -1111,7 +1114,7 @@ impl Tenant { // Step 2 is to avoid initializing the new branch using data removed by past GC iterations // or in-queue GC iterations. - let src_timeline = self.get_timeline(src).with_context(|| { + let src_timeline = self.get_timeline(src, false).with_context(|| { format!( "No ancestor {} found for timeline {}/{}", src, self.tenant_id, dst @@ -1381,6 +1384,68 @@ impl Tenant { Ok(uninit_mark) } + + pub(super) fn init_attach_timelines( + &self, + timelines: HashMap, + ) -> anyhow::Result<()> { + let sorted_timelines = if timelines.len() == 1 { + timelines.into_iter().collect() + } else if !timelines.is_empty() { + tree_sort_timelines(timelines)? 
+ } else { + warn!("No timelines to attach received"); + return Ok(()); + }; + + let tenant_id = self.tenant_id; + let mut timelines_accessor = self.timelines.lock().unwrap(); + for (timeline_id, metadata) in sorted_timelines { + info!( + "Attaching timeline {}/{} pg_version {}", + tenant_id, + timeline_id, + metadata.pg_version() + ); + + if timelines_accessor.contains_key(&timeline_id) { + warn!("Timeline {tenant_id}/{timeline_id} already exists in the tenant map, skipping its initialization"); + continue; + } + + let ancestor = metadata + .ancestor_timeline() + .and_then(|ancestor_timeline_id| timelines_accessor.get(&ancestor_timeline_id)) + .cloned(); + let dummy_timeline = self + .create_timeline_data(timeline_id, metadata.clone(), ancestor.clone()) + .with_context(|| { + format!("Failed to crate dummy timeline data for {tenant_id}/{timeline_id}") + })?; + let timeline = UninitializedTimeline { + owning_tenant: self, + timeline_id, + raw_timeline: Some((dummy_timeline, TimelineUninitMark::dummy())), + }; + match timeline.initialize_with_lock(&mut timelines_accessor, true) { + Ok(initialized_timeline) => { + timelines_accessor.insert(timeline_id, initialized_timeline); + } + Err(e) => { + error!("Failed to initialize timeline {tenant_id}/{timeline_id}: {e:?}"); + let broken_timeline = self + .create_timeline_data(timeline_id, metadata, ancestor) + .with_context(|| { + format!("Failed to crate broken timeline data for {tenant_id}/{timeline_id}") + })?; + broken_timeline.set_state(TimelineState::Broken); + timelines_accessor.insert(timeline_id, Arc::new(broken_timeline)); + } + } + } + + Ok(()) + } } /// Create the cluster temporarily in 'initdbpath' directory inside the repository @@ -1608,6 +1673,9 @@ pub mod harness { timelines_to_load.insert(timeline_id, timeline_metadata); } tenant.init_attach_timelines(timelines_to_load)?; + tenant.set_state(TenantState::Active { + background_jobs_running: false, + }); Ok(tenant) } @@ -1767,7 +1835,7 @@ mod tests { // Branch the history, modify relation differently on the new timeline tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x30)))?; let newtline = tenant - .get_timeline(NEW_TIMELINE_ID) + .get_timeline(NEW_TIMELINE_ID, true) .expect("Should have a local timeline"); let new_writer = newtline.writer(); new_writer.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))?; @@ -1923,7 +1991,7 @@ mod tests { tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?; let newtline = tenant - .get_timeline(NEW_TIMELINE_ID) + .get_timeline(NEW_TIMELINE_ID, true) .expect("Should have a local timeline"); // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50 tenant.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?; @@ -1942,7 +2010,7 @@ mod tests { tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?; let newtline = tenant - .get_timeline(NEW_TIMELINE_ID) + .get_timeline(NEW_TIMELINE_ID, true) .expect("Should have a local timeline"); make_some_layers(newtline.as_ref(), Lsn(0x60))?; @@ -1974,7 +2042,7 @@ mod tests { let tenant = harness.load(); tenant - .get_timeline(TIMELINE_ID) + .get_timeline(TIMELINE_ID, true) .expect("cannot load timeline"); Ok(()) @@ -1997,7 +2065,7 @@ mod tests { tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?; let newtline = tenant - .get_timeline(NEW_TIMELINE_ID) + .get_timeline(NEW_TIMELINE_ID, true) .expect("Should have a local timeline"); make_some_layers(newtline.as_ref(), Lsn(0x60))?; @@ 
-2009,11 +2077,11 @@ mod tests { // check that both, child and ancestor are loaded let _child_tline = tenant - .get_timeline(NEW_TIMELINE_ID) + .get_timeline(NEW_TIMELINE_ID, true) .expect("cannot get child timeline loaded"); let _ancestor_tline = tenant - .get_timeline(TIMELINE_ID) + .get_timeline(TIMELINE_ID, true) .expect("cannot get ancestor timeline loaded"); Ok(()) @@ -2267,7 +2335,7 @@ mod tests { let new_tline_id = TimelineId::generate(); tenant.branch_timeline(tline_id, new_tline_id, Some(lsn))?; tline = tenant - .get_timeline(new_tline_id) + .get_timeline(new_tline_id, true) .expect("Should have the branched timeline"); tline_id = new_tline_id; @@ -2330,7 +2398,7 @@ mod tests { let new_tline_id = TimelineId::generate(); tenant.branch_timeline(tline_id, new_tline_id, Some(lsn))?; tline = tenant - .get_timeline(new_tline_id) + .get_timeline(new_tline_id, true) .expect("Should have the branched timeline"); tline_id = new_tline_id; diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index ccd094b65a..194ca0d857 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -5,6 +5,8 @@ use bytes::Bytes; use fail::fail_point; use itertools::Itertools; use once_cell::sync::OnceCell; +use pageserver_api::models::TimelineState; +use tokio::sync::watch; use tokio::task::spawn_blocking; use tracing::*; @@ -160,6 +162,8 @@ pub struct Timeline { /// Relation size cache pub rel_size_cache: RwLock>, + + state: watch::Sender, } /// Internal structure to hold all data needed for logical size calculation. @@ -416,9 +420,11 @@ impl Timeline { /// those functions with an LSN that has been processed yet is an error. /// pub async fn wait_lsn(&self, lsn: Lsn) -> anyhow::Result<()> { + anyhow::ensure!(self.is_active(), "Cannot wait for Lsn on inactive timeline"); + // This should never be called from the WAL receiver, because that could lead // to a deadlock. 
- ensure!( + anyhow::ensure!( task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnection), "wait_lsn cannot be called in WAL receiver" ); @@ -635,6 +641,35 @@ impl Timeline { } Ok(()) } + + pub fn set_state(&self, new_state: TimelineState) { + match (self.current_state(), new_state) { + (equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => { + debug!("Ignoring new state, equal to the existing one: {equal_state_2:?}"); + } + (TimelineState::Broken, _) => { + error!("Ignoring state update {new_state:?} for broken tenant"); + } + (TimelineState::Paused, TimelineState::Active) => { + debug!("Not activating a paused timeline"); + } + (_, new_state) => { + self.state.send_replace(new_state); + } + } + } + + pub fn current_state(&self) -> TimelineState { + *self.state.borrow() + } + + pub fn is_active(&self) -> bool { + self.current_state() == TimelineState::Active + } + + pub fn subscribe_for_state_updates(&self) -> watch::Receiver { + self.state.subscribe() + } } // Private functions @@ -688,8 +723,9 @@ impl Timeline { walredo_mgr: Arc, upload_layers: bool, pg_version: u32, - ) -> Timeline { + ) -> Self { let disk_consistent_lsn = metadata.disk_consistent_lsn(); + let (state, _) = watch::channel(TimelineState::Suspended); let mut result = Timeline { conf, @@ -746,6 +782,7 @@ impl Timeline { last_received_wal: Mutex::new(None), rel_size_cache: RwLock::new(HashMap::new()), + state, }; result.repartition_threshold = result.get_checkpoint_distance() / 10; result @@ -883,8 +920,6 @@ impl Timeline { } fn try_spawn_size_init_task(self: &Arc, init_lsn: Lsn) { - let timeline_id = self.timeline_id; - // Atomically check if the timeline size calculation had already started. // If the flag was not already set, this sets it. if !self @@ -901,17 +936,42 @@ impl Timeline { "initial size calculation", false, async move { - let calculated_size = self_clone.calculate_logical_size(init_lsn)?; - let result = spawn_blocking(move || { - self_clone.current_logical_size.initial_logical_size.set(calculated_size) - }).await?; - match result { - Ok(()) => info!("Successfully calculated initial logical size"), - Err(existing_size) => error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing"), + let mut timeline_state_updates = self_clone.subscribe_for_state_updates(); + let self_calculation = Arc::clone(&self_clone); + tokio::select! { + calculation_result = spawn_blocking(move || self_calculation.calculate_logical_size(init_lsn)) => { + let calculated_size = calculation_result + .context("Failed to spawn calculation result task")? 
+ .context("Failed to calculate logical size")?; + match self_clone.current_logical_size.initial_logical_size.set(calculated_size) { + Ok(()) => info!("Successfully calculated initial logical size"), + Err(existing_size) => error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing"), + } + Ok(()) + }, + new_event = async { + loop { + match timeline_state_updates.changed().await { + Ok(()) => { + let new_state = *timeline_state_updates.borrow(); + match new_state { + // we're running this job for active timelines only + TimelineState::Active => continue, + TimelineState::Broken | TimelineState::Paused | TimelineState::Suspended => return Some(new_state), + } + } + Err(_sender_dropped_error) => return None, + } + } + } => { + match new_event { + Some(new_state) => info!("Timeline became inactive (new state: {new_state:?}), dropping current connections until it reactivates"), + None => info!("Timeline dropped state updates sender, stopping init size calculation"), + } + Ok(()) + }, } - Ok(()) - } - .instrument(info_span!("initial_logical_size_calculation", timeline = %timeline_id)) + }.instrument(info_span!("initial_logical_size_calculation", tenant = %self.tenant_id, timeline = %self.timeline_id)), ); } } @@ -1356,7 +1416,7 @@ impl Timeline { false, )?; - if self.upload_layers.load(atomic::Ordering::Relaxed) { + if self.can_upload_layers() { storage_sync::schedule_layer_upload( self.tenant_id, self.timeline_id, @@ -1826,7 +1886,7 @@ impl Timeline { } drop(layers); - if self.upload_layers.load(atomic::Ordering::Relaxed) { + if self.can_upload_layers() { storage_sync::schedule_layer_upload( self.tenant_id, self.timeline_id, @@ -1930,7 +1990,7 @@ impl Timeline { /// obsolete. /// pub(super) fn gc(&self) -> anyhow::Result { - let mut result: GcResult = Default::default(); + let mut result: GcResult = GcResult::default(); let now = SystemTime::now(); fail_point!("before-timeline-gc"); @@ -2110,7 +2170,7 @@ impl Timeline { fail_point!("after-timeline-gc-removed-layers"); } - if self.upload_layers.load(atomic::Ordering::Relaxed) { + if self.can_upload_layers() { storage_sync::schedule_layer_delete( self.tenant_id, self.timeline_id, @@ -2199,6 +2259,11 @@ impl Timeline { } } } + + fn can_upload_layers(&self) -> bool { + self.upload_layers.load(atomic::Ordering::Relaxed) + && self.current_state() != TimelineState::Broken + } } /// Helper function for get_reconstruct_data() to add the path of layers traversed diff --git a/pageserver/src/tenant_tasks.rs b/pageserver/src/tenant_tasks.rs index 030055df6d..23ce9dc699 100644 --- a/pageserver/src/tenant_tasks.rs +++ b/pageserver/src/tenant_tasks.rs @@ -175,7 +175,7 @@ async fn wait_for_active_tenant( } state => { debug!("Not running the task loop, tenant is not active with background jobs enabled: {state:?}"); - tokio::time::sleep(wait).await; + continue; } } } diff --git a/pageserver/src/walreceiver/connection_manager.rs b/pageserver/src/walreceiver/connection_manager.rs index 2380caaff1..53dd2d8eac 100644 --- a/pageserver/src/walreceiver/connection_manager.rs +++ b/pageserver/src/walreceiver/connection_manager.rs @@ -12,6 +12,7 @@ use std::{ collections::{hash_map, HashMap}, num::NonZeroU64, + ops::ControlFlow, sync::Arc, time::Duration, }; @@ -26,7 +27,8 @@ use etcd_broker::{ subscription_key::SubscriptionKey, subscription_value::SkTimelineInfo, BrokerSubscription, BrokerUpdate, Client, }; -use tokio::select; +use pageserver_api::models::TimelineState; +use tokio::{select, 
sync::watch}; use tracing::*; use crate::{ @@ -58,10 +60,7 @@ pub fn spawn_connection_manager_task( TaskKind::WalReceiverManager, Some(tenant_id), Some(timeline_id), - &format!( - "walreceiver for tenant {} timeline {}", - timeline.tenant_id, timeline.timeline_id - ), + &format!("walreceiver for timeline {tenant_id}/{timeline_id}"), false, async move { info!("WAL receiver broker started, connecting to etcd"); @@ -75,19 +74,21 @@ pub fn spawn_connection_manager_task( select! { _ = task_mgr::shutdown_watcher() => { info!("WAL receiver shutdown requested, shutting down"); - // Kill current connection, if any - if let Some(wal_connection) = walreceiver_state.wal_connection.take() - { - wal_connection.connection_task.shutdown().await; - } + walreceiver_state.shutdown().await; return Ok(()); }, - - _ = connection_manager_loop_step( + loop_step_result = connection_manager_loop_step( &broker_loop_prefix, &mut etcd_client, &mut walreceiver_state, - ) => {}, + ) => match loop_step_result { + ControlFlow::Continue(()) => continue, + ControlFlow::Break(()) => { + info!("Connection manager loop ended, shutting down"); + walreceiver_state.shutdown().await; + return Ok(()); + } + }, } } } @@ -104,7 +105,17 @@ async fn connection_manager_loop_step( broker_prefix: &str, etcd_client: &mut Client, walreceiver_state: &mut WalreceiverState, -) { +) -> ControlFlow<(), ()> { + let mut timeline_state_updates = walreceiver_state.timeline.subscribe_for_state_updates(); + + match wait_for_active_timeline(&mut timeline_state_updates).await { + ControlFlow::Continue(()) => {} + ControlFlow::Break(()) => { + info!("Timeline dropped state updates sender before becoming active, stopping wal connection manager loop"); + return ControlFlow::Break(()); + } + } + let id = TenantTimelineId { tenant_id: walreceiver_state.timeline.tenant_id, timeline_id: walreceiver_state.timeline.timeline_id, @@ -129,10 +140,12 @@ async fn connection_manager_loop_step( // - change connection if the rules decide so, or if the current connection dies // - receive updates from broker // - this might change the current desired connection + // - timeline state changes to something that does not allow walreceiver to run concurrently select! 
{ broker_connection_result = &mut broker_subscription.watcher_handle => { + info!("Broker connection was closed from the other side, ending current broker loop step"); cleanup_broker_connection(broker_connection_result, walreceiver_state); - return; + return ControlFlow::Continue(()); }, Some(wal_connection_update) = async { @@ -185,11 +198,36 @@ async fn connection_manager_loop_step( (&mut broker_subscription.watcher_handle).await, walreceiver_state, ); - return; + return ControlFlow::Continue(()); } } }, + new_event = async { + loop { + match timeline_state_updates.changed().await { + Ok(()) => { + let new_state = walreceiver_state.timeline.current_state(); + match new_state { + // we're already active as walreceiver, no need to reactivate + TimelineState::Active => continue, + TimelineState::Broken | TimelineState::Paused | TimelineState::Suspended => return ControlFlow::Continue(new_state), + } + } + Err(_sender_dropped_error) => return ControlFlow::Break(()), + } + } + } => match new_event { + ControlFlow::Continue(new_state) => { + info!("Timeline became inactive (new state: {new_state:?}), dropping current connections until it reactivates"); + return ControlFlow::Continue(()); + } + ControlFlow::Break(()) => { + info!("Timeline dropped state updates sender, stopping wal connection manager loop"); + return ControlFlow::Break(()); + } + }, + _ = async { tokio::time::sleep(time_until_next_retry.unwrap()).await }, if time_until_next_retry.is_some() => {} } @@ -216,6 +254,34 @@ async fn connection_manager_loop_step( } } +async fn wait_for_active_timeline( + timeline_state_updates: &mut watch::Receiver, +) -> ControlFlow<(), ()> { + let current_state = *timeline_state_updates.borrow(); + if current_state == TimelineState::Active { + return ControlFlow::Continue(()); + } + + loop { + match timeline_state_updates.changed().await { + Ok(()) => { + let new_state = *timeline_state_updates.borrow(); + match new_state { + TimelineState::Active => { + debug!("Timeline state changed to active, continuing the walreceiver connection manager"); + return ControlFlow::Continue(()); + } + state => { + debug!("Not running the walreceiver connection manager, timeline is not active: {state:?}"); + continue; + } + } + } + Err(_sender_dropped_error) => return ControlFlow::Break(()), + } + } +} + fn cleanup_broker_connection( broker_connection_result: Result, tokio::task::JoinError>, walreceiver_state: &mut WalreceiverState, @@ -723,6 +789,12 @@ impl WalreceiverState { self.wal_connection_retries.remove(&node_id); } } + + async fn shutdown(mut self) { + if let Some(wal_connection) = self.wal_connection.take() { + wal_connection.connection_task.shutdown().await; + } + } } #[derive(Debug, PartialEq, Eq)] diff --git a/test_runner/regress/test_broken_timeline.py b/test_runner/regress/test_broken_timeline.py index 101cce9ffc..b747af4d09 100644 --- a/test_runner/regress/test_broken_timeline.py +++ b/test_runner/regress/test_broken_timeline.py @@ -70,18 +70,14 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder): # But all others are broken # First timeline would not get loaded into pageserver due to corrupt metadata file - with pytest.raises( - Exception, match=f"Timeline {timeline1} was not found for tenant {tenant1}" - ) as err: + with pytest.raises(Exception, match=f"Timeline {tenant1}/{timeline1} was not found") as err: pg1.start() log.info(f"compute startup failed eagerly for timeline with corrupt metadata: {err}") # Second timeline has no ancestors, only the metadata file and no layer files # We 
don't have the remote storage enabled, which means timeline is in an incorrect state, # it's not loaded at all - with pytest.raises( - Exception, match=f"Timeline {timeline2} was not found for tenant {tenant2}" - ) as err: + with pytest.raises(Exception, match=f"Timeline {tenant2}/{timeline2} was not found") as err: pg2.start() log.info(f"compute startup failed eagerly for timeline with corrupt metadata: {err}") diff --git a/test_runner/regress/test_timeline_delete.py b/test_runner/regress/test_timeline_delete.py index de05d445ed..4a78a2746e 100644 --- a/test_runner/regress/test_timeline_delete.py +++ b/test_runner/regress/test_timeline_delete.py @@ -65,7 +65,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv): # check 404 with pytest.raises( NeonPageserverApiException, - match=f"Timeline {leaf_timeline_id} was not found for tenant {env.initial_tenant}", + match=f"Timeline {env.initial_tenant}/{leaf_timeline_id} was not found", ): ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id)
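
A note on the state-propagation pattern this change uses throughout: `Timeline` now owns a `tokio::sync::watch` sender for its `TimelineState`, `set_state` applies a few transition guards (Broken is terminal, Paused cannot be re-activated implicitly) before calling `send_replace`, and long-running tasks (initial size calculation, the walreceiver connection manager) hold a `watch::Receiver` and `select!` on `changed()` so they stop on their own once the timeline leaves `Active`. Below is a minimal, self-contained sketch of that mechanism, not pageserver code: `State`, `StateCell`, and `run_when_active` are illustrative names invented for the example, and the sleep-based work loop is a stand-in for the real background jobs.

// Cargo.toml (for this sketch only): tokio = { version = "1", features = ["full"] }
use std::time::Duration;
use tokio::sync::watch;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    Active,
    Suspended,
    Paused,
    Broken,
}

struct StateCell {
    state: watch::Sender<State>,
}

impl StateCell {
    fn new() -> Self {
        // New timelines start out Suspended, as in the diff.
        let (state, _) = watch::channel(State::Suspended);
        Self { state }
    }

    fn current_state(&self) -> State {
        *self.state.borrow()
    }

    // Mirrors the transition guards in Timeline::set_state:
    // Broken is terminal, and Paused is never re-activated implicitly.
    fn set_state(&self, new_state: State) {
        match (self.current_state(), new_state) {
            (old, new) if old == new => {}
            (State::Broken, _) => {}
            (State::Paused, State::Active) => {}
            (_, new) => {
                // send_replace notifies every subscribed receiver.
                self.state.send_replace(new);
            }
        }
    }

    fn subscribe(&self) -> watch::Receiver<State> {
        self.state.subscribe()
    }
}

// A background job that waits for Active, then runs until it observes any
// other state (or until the sender is dropped), like the walreceiver
// connection manager and the size-calculation task in the diff.
async fn run_when_active(mut updates: watch::Receiver<State>) {
    while *updates.borrow() != State::Active {
        if updates.changed().await.is_err() {
            // Sender dropped before the timeline ever became active.
            return;
        }
    }

    loop {
        tokio::select! {
            // Stand-in for one unit of real work (compaction, GC, ...).
            _ = tokio::time::sleep(Duration::from_millis(100)) => {
                println!("did one unit of work");
            }
            changed = updates.changed() => match changed {
                Ok(()) if *updates.borrow() == State::Active => continue,
                Ok(()) => {
                    println!("state left Active, stopping");
                    return;
                }
                Err(_sender_dropped) => {
                    println!("state sender dropped, stopping");
                    return;
                }
            },
        }
    }
}

#[tokio::main]
async fn main() {
    let cell = StateCell::new();
    let job = tokio::spawn(run_when_active(cell.subscribe()));

    cell.set_state(State::Active);
    tokio::time::sleep(Duration::from_millis(250)).await;
    // Suspending makes the background job exit on its own.
    cell.set_state(State::Suspended);
    job.await.unwrap();
}

The design point worth noting: the sender side never tracks its subscribers. Each background loop owns its own receiver and decides for itself when to exit, which is what lets `set_state` remain a cheap, synchronous call in the patch above.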