From 5928cb33c553913c28c7857f126fbab9d3537ff6 Mon Sep 17 00:00:00 2001
From: Kirill Bulatov
Date: Fri, 21 Oct 2022 18:51:48 +0300
Subject: [PATCH] Introduce timeline state (#2651)

Similar to https://github.com/neondatabase/neon/pull/2395, introduces a state field in Timeline that's possible to subscribe to.

Adjusts
* walreceiver to not have any connections if the timeline is not Active
* remote storage sync to not schedule uploads if the timeline is Broken
* timeline creation to refuse creating timelines if a tenant/timeline is broken
* timelines' states to switch automatically based on the tenant state

Does not adjust the timelines' gc, checkpointing and layer flush behaviour much, since it's not safe to cancel these processes abruptly and task_mgr::shutdown_tasks already does a similar thing.

---
 libs/pageserver_api/src/models.rs | 18 ++
 pageserver/src/http/openapi_spec.yml | 3 +
 pageserver/src/http/routes.rs | 31 ++-
 pageserver/src/page_service.rs | 3 +-
 pageserver/src/tenant.rs | 240 +++++++++++-------
 pageserver/src/tenant/timeline.rs | 101 ++++++--
 pageserver/src/tenant_tasks.rs | 2 +-
 .../src/walreceiver/connection_manager.rs | 104 ++++++--
 test_runner/regress/test_broken_timeline.py | 8 +-
 test_runner/regress/test_timeline_delete.py | 2 +-
 10 files changed, 367 insertions(+), 145 deletions(-)

diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index a153f1a01e..dd40ba9e1c 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -19,6 +19,22 @@ pub enum TenantState { Broken, } +/// A state of a timeline in pageserver's memory. +#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub enum TimelineState { + /// The timeline is fully operational; its background jobs are running. + Active, + /// A timeline is recognized by the pageserver, but not yet ready to operate. + /// The status indicates that the timeline could eventually go back to Active automatically: + /// for example, if the owning tenant goes back to Active again. + Suspended, + /// A timeline is recognized by the pageserver, but not yet ready to operate and not allowed to + /// automatically become Active after certain events: only a management call can change this status. + Paused, + /// A timeline is recognized by the pageserver, but no longer used for any operations, as it failed to get activated. + Broken, +} + #[serde_as] #[derive(Serialize, Deserialize)] pub struct TimelineCreateRequest { @@ -160,6 +176,8 @@ pub struct TimelineInfo { pub remote_consistent_lsn: Option<Lsn>, pub awaits_download: bool, + pub state: TimelineState, + // Some of the above fields are duplicated in 'local' and 'remote', for backwards- // compatility with older clients. pub local: LocalTimelineInfo, diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index 626cc07429..89609f5674 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -618,6 +618,7 @@ components: - last_record_lsn - disk_consistent_lsn - awaits_download + - state properties: timeline_id: type: string @@ -660,6 +661,8 @@ components: type: integer awaits_download: type: boolean + state: + type: string # These 'local' and 'remote' fields just duplicate some of the fields # above. They are kept for backwards-compatibility.
They can be removed, diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 489adbb2cf..8ec7604b8a 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -129,6 +129,7 @@ async fn build_timeline_info( } }; let current_physical_size = Some(timeline.get_physical_size()); + let state = timeline.current_state(); let info = TimelineInfo { tenant_id: timeline.tenant_id, @@ -158,6 +159,7 @@ async fn build_timeline_info( remote_consistent_lsn, awaits_download, + state, // Duplicate some fields in 'local' and 'remote' fields, for backwards-compatility // with the control plane. @@ -294,7 +296,7 @@ async fn timeline_detail_handler(request: Request) -> Result) -> Result format!("{}", lsn), + LsnForTimestamp::Present(lsn) => format!("{lsn}"), LsnForTimestamp::Future(_lsn) => "future".into(), LsnForTimestamp::Past(_lsn) => "past".into(), LsnForTimestamp::NoData(_lsn) => "nodata".into(), @@ -788,16 +789,16 @@ async fn timeline_gc_handler(mut request: Request) -> Result) -> Result) -> Result Result> { - tenant_mgr::get_tenant(tenant_id, true).and_then(|tenant| tenant.get_timeline(timeline_id)) + tenant_mgr::get_tenant(tenant_id, true) + .and_then(|tenant| tenant.get_timeline(timeline_id, true)) } /// diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 69c89a80b4..84833e9c40 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -11,7 +11,8 @@ //! parent timeline, and the last LSN that has been written to disk. //! -use anyhow::{bail, ensure, Context}; +use anyhow::{bail, Context}; +use pageserver_api::models::TimelineState; use tokio::sync::watch; use tracing::*; use utils::crashsafe::path_with_suffix_extension; @@ -189,6 +190,7 @@ impl UninitializedTimeline<'_> { "Failed to remove uninit mark file for timeline {tenant_id}/{timeline_id}" ) })?; + new_timeline.set_state(TimelineState::Active); v.insert(Arc::clone(&new_timeline)); new_timeline.launch_wal_receiver(); } @@ -338,18 +340,26 @@ impl Tenant { /// Get Timeline handle for given Neon timeline ID. /// This function is idempotent. It doesn't change internal state in any way. - pub fn get_timeline(&self, timeline_id: TimelineId) -> anyhow::Result> { - self.timelines - .lock() - .unwrap() - .get(&timeline_id) - .with_context(|| { - format!( - "Timeline {} was not found for tenant {}", - timeline_id, self.tenant_id - ) - }) - .map(Arc::clone) + pub fn get_timeline( + &self, + timeline_id: TimelineId, + active_only: bool, + ) -> anyhow::Result> { + let timelines_accessor = self.timelines.lock().unwrap(); + let timeline = timelines_accessor.get(&timeline_id).with_context(|| { + format!("Timeline {}/{} was not found", self.tenant_id, timeline_id) + })?; + + if active_only && !timeline.is_active() { + anyhow::bail!( + "Timeline {}/{} is not active, state: {:?}", + self.tenant_id, + timeline_id, + timeline.current_state() + ) + } else { + Ok(Arc::clone(timeline)) + } } /// Lists timelines the tenant contains. 
@@ -372,6 +382,11 @@ impl Tenant { initdb_lsn: Lsn, pg_version: u32, ) -> anyhow::Result { + anyhow::ensure!( + self.is_active(), + "Cannot create empty timelines on inactive tenant" + ); + let timelines = self.timelines.lock().unwrap(); let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id, &timelines)?; drop(timelines); @@ -408,9 +423,14 @@ impl Tenant { mut ancestor_start_lsn: Option, pg_version: u32, ) -> anyhow::Result>> { + anyhow::ensure!( + self.is_active(), + "Cannot create timelines on inactive tenant" + ); + let new_timeline_id = new_timeline_id.unwrap_or_else(TimelineId::generate); - if self.get_timeline(new_timeline_id).is_ok() { + if self.get_timeline(new_timeline_id, false).is_ok() { debug!("timeline {new_timeline_id} already exists"); return Ok(None); } @@ -418,7 +438,7 @@ impl Tenant { let loaded_timeline = match ancestor_timeline_id { Some(ancestor_timeline_id) => { let ancestor_timeline = self - .get_timeline(ancestor_timeline_id) + .get_timeline(ancestor_timeline_id, false) .context("Cannot branch off the timeline that's not present in pageserver")?; if let Some(lsn) = ancestor_start_lsn.as_mut() { @@ -470,6 +490,11 @@ impl Tenant { pitr: Duration, checkpoint_before_gc: bool, ) -> anyhow::Result { + anyhow::ensure!( + self.is_active(), + "Cannot run GC iteration on inactive tenant" + ); + let timeline_str = target_timeline_id .map(|x| x.to_string()) .unwrap_or_else(|| "-".to_string()); @@ -486,6 +511,11 @@ impl Tenant { /// Also it can be explicitly requested per timeline through page server /// api's 'compact' command. pub fn compaction_iteration(&self) -> anyhow::Result<()> { + anyhow::ensure!( + self.is_active(), + "Cannot run compaction iteration on inactive tenant" + ); + // Scan through the hashmap and collect a list of all the timelines, // while holding the lock. Then drop the lock and actually perform the // compactions. We don't want to block everything else while the @@ -493,6 +523,7 @@ impl Tenant { let timelines = self.timelines.lock().unwrap(); let timelines_to_compact = timelines .iter() + .filter(|(_, timeline)| timeline.is_active()) .map(|(timeline_id, timeline)| (*timeline_id, timeline.clone())) .collect::>(); drop(timelines); @@ -515,13 +546,13 @@ impl Tenant { // checkpoints. We don't want to block everything else while the // checkpoint runs. 
let timelines = self.timelines.lock().unwrap(); - let timelines_to_compact = timelines + let timelines_to_checkpoint = timelines .iter() .map(|(timeline_id, timeline)| (*timeline_id, Arc::clone(timeline))) .collect::>(); drop(timelines); - for (timeline_id, timeline) in &timelines_to_compact { + for (timeline_id, timeline) in &timelines_to_checkpoint { let _entered = info_span!("checkpoint", timeline = %timeline_id, tenant = %self.tenant_id) .entered(); @@ -543,7 +574,7 @@ impl Tenant { .iter() .any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id)); - ensure!( + anyhow::ensure!( !children_exist, "Cannot delete timeline which has child timelines" ); @@ -552,7 +583,10 @@ impl Tenant { Entry::Vacant(_) => bail!("timeline not found"), }; - let layer_removal_guard = timeline_entry.get().layer_removal_guard()?; + let timeline = timeline_entry.get(); + timeline.set_state(TimelineState::Paused); + + let layer_removal_guard = timeline.layer_removal_guard()?; let local_timeline_directory = self.conf.timeline_path(&timeline_id, &self.tenant_id); std::fs::remove_dir_all(&local_timeline_directory).with_context(|| { @@ -569,58 +603,6 @@ impl Tenant { Ok(()) } - pub fn init_attach_timelines( - &self, - timelines: HashMap, - ) -> anyhow::Result<()> { - let sorted_timelines = if timelines.len() == 1 { - timelines.into_iter().collect() - } else if !timelines.is_empty() { - tree_sort_timelines(timelines)? - } else { - warn!("No timelines to attach received"); - return Ok(()); - }; - - let mut timelines_accessor = self.timelines.lock().unwrap(); - for (timeline_id, metadata) in sorted_timelines { - info!( - "Attaching timeline {} pg_version {}", - timeline_id, - metadata.pg_version() - ); - - if timelines_accessor.contains_key(&timeline_id) { - warn!( - "Timeline {}/{} already exists in the tenant map, skipping its initialization", - self.tenant_id, timeline_id - ); - continue; - } else { - let ancestor = metadata - .ancestor_timeline() - .and_then(|ancestor_timeline_id| timelines_accessor.get(&ancestor_timeline_id)) - .cloned(); - let timeline = UninitializedTimeline { - owning_tenant: self, - timeline_id, - raw_timeline: Some(( - self.create_timeline_data(timeline_id, metadata, ancestor) - .with_context(|| { - format!("Failed to initialize timeline {timeline_id}") - })?, - TimelineUninitMark::dummy(), - )), - }; - let initialized_timeline = - timeline.initialize_with_lock(&mut timelines_accessor, true)?; - timelines_accessor.insert(timeline_id, initialized_timeline); - } - } - - Ok(()) - } - /// Allows to retrieve remote timeline index from the tenant. Used in walreceiver to grab remote consistent lsn. pub fn get_remote_index(&self) -> &RemoteIndex { &self.remote_index @@ -661,10 +643,30 @@ impl Tenant { } (_, new_state) => { self.state.send_replace(new_state); - if self.should_run_tasks() { - // Spawn gc and compaction loops. The loops will shut themselves - // down when they notice that the tenant is inactive. - crate::tenant_tasks::start_background_loops(self.tenant_id); + + let timelines_accessor = self.timelines.lock().unwrap(); + let not_broken_timelines = timelines_accessor + .values() + .filter(|timeline| timeline.current_state() != TimelineState::Broken); + match new_state { + TenantState::Active { + background_jobs_running, + } => { + if background_jobs_running { + // Spawn gc and compaction loops. The loops will shut themselves + // down when they notice that the tenant is inactive. 
+ crate::tenant_tasks::start_background_loops(self.tenant_id); + } + + for timeline in not_broken_timelines { + timeline.set_state(TimelineState::Active); + } + } + TenantState::Paused | TenantState::Broken => { + for timeline in not_broken_timelines { + timeline.set_state(TimelineState::Suspended); + } + } } } } @@ -993,6 +995,7 @@ impl Tenant { timelines .iter() + .filter(|(_, timeline)| timeline.is_active()) .map(|(timeline_id, timeline_entry)| { // This is unresolved question for now, how to do gc in presence of remote timelines // especially when this is combined with branching. @@ -1026,7 +1029,7 @@ impl Tenant { for timeline_id in timeline_ids { // Timeline is known to be local and loaded. let timeline = self - .get_timeline(timeline_id) + .get_timeline(timeline_id, false) .with_context(|| format!("Timeline {timeline_id} was not found"))?; // If target_timeline is specified, ignore all other timelines @@ -1111,7 +1114,7 @@ impl Tenant { // Step 2 is to avoid initializing the new branch using data removed by past GC iterations // or in-queue GC iterations. - let src_timeline = self.get_timeline(src).with_context(|| { + let src_timeline = self.get_timeline(src, false).with_context(|| { format!( "No ancestor {} found for timeline {}/{}", src, self.tenant_id, dst ) @@ -1381,6 +1384,68 @@ impl Tenant { Ok(uninit_mark) } + + pub(super) fn init_attach_timelines( + &self, + timelines: HashMap<TimelineId, TimelineMetadata>, + ) -> anyhow::Result<()> { + let sorted_timelines = if timelines.len() == 1 { + timelines.into_iter().collect() + } else if !timelines.is_empty() { + tree_sort_timelines(timelines)? + } else { + warn!("No timelines to attach received"); + return Ok(()); + }; + + let tenant_id = self.tenant_id; + let mut timelines_accessor = self.timelines.lock().unwrap(); + for (timeline_id, metadata) in sorted_timelines { + info!( + "Attaching timeline {}/{} pg_version {}", + tenant_id, + timeline_id, + metadata.pg_version() + ); + + if timelines_accessor.contains_key(&timeline_id) { + warn!("Timeline {tenant_id}/{timeline_id} already exists in the tenant map, skipping its initialization"); + continue; + } + + let ancestor = metadata + .ancestor_timeline() + .and_then(|ancestor_timeline_id| timelines_accessor.get(&ancestor_timeline_id)) + .cloned(); + let dummy_timeline = self + .create_timeline_data(timeline_id, metadata.clone(), ancestor.clone()) + .with_context(|| { + format!("Failed to create dummy timeline data for {tenant_id}/{timeline_id}") + })?; + let timeline = UninitializedTimeline { + owning_tenant: self, + timeline_id, + raw_timeline: Some((dummy_timeline, TimelineUninitMark::dummy())), + }; + match timeline.initialize_with_lock(&mut timelines_accessor, true) { + Ok(initialized_timeline) => { + timelines_accessor.insert(timeline_id, initialized_timeline); + } + Err(e) => { + error!("Failed to initialize timeline {tenant_id}/{timeline_id}: {e:?}"); + let broken_timeline = self + .create_timeline_data(timeline_id, metadata, ancestor) + .with_context(|| { + format!("Failed to create broken timeline data for {tenant_id}/{timeline_id}") + })?; + broken_timeline.set_state(TimelineState::Broken); + timelines_accessor.insert(timeline_id, Arc::new(broken_timeline)); + } + } + } + + Ok(()) + } } /// Create the cluster temporarily in 'initdbpath' directory inside the repository @@ -1608,6 +1673,9 @@ pub mod harness { timelines_to_load.insert(timeline_id, timeline_metadata); } tenant.init_attach_timelines(timelines_to_load)?; + tenant.set_state(TenantState::Active { + background_jobs_running: false, + }); Ok(tenant) }
@@ -1767,7 +1835,7 @@ mod tests { // Branch the history, modify relation differently on the new timeline tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x30)))?; let newtline = tenant - .get_timeline(NEW_TIMELINE_ID) + .get_timeline(NEW_TIMELINE_ID, true) .expect("Should have a local timeline"); let new_writer = newtline.writer(); new_writer.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))?; @@ -1923,7 +1991,7 @@ mod tests { tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?; let newtline = tenant - .get_timeline(NEW_TIMELINE_ID) + .get_timeline(NEW_TIMELINE_ID, true) .expect("Should have a local timeline"); // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50 tenant.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?; @@ -1942,7 +2010,7 @@ mod tests { tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?; let newtline = tenant - .get_timeline(NEW_TIMELINE_ID) + .get_timeline(NEW_TIMELINE_ID, true) .expect("Should have a local timeline"); make_some_layers(newtline.as_ref(), Lsn(0x60))?; @@ -1974,7 +2042,7 @@ mod tests { let tenant = harness.load(); tenant - .get_timeline(TIMELINE_ID) + .get_timeline(TIMELINE_ID, true) .expect("cannot load timeline"); Ok(()) @@ -1997,7 +2065,7 @@ mod tests { tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?; let newtline = tenant - .get_timeline(NEW_TIMELINE_ID) + .get_timeline(NEW_TIMELINE_ID, true) .expect("Should have a local timeline"); make_some_layers(newtline.as_ref(), Lsn(0x60))?; @@ -2009,11 +2077,11 @@ mod tests { // check that both, child and ancestor are loaded let _child_tline = tenant - .get_timeline(NEW_TIMELINE_ID) + .get_timeline(NEW_TIMELINE_ID, true) .expect("cannot get child timeline loaded"); let _ancestor_tline = tenant - .get_timeline(TIMELINE_ID) + .get_timeline(TIMELINE_ID, true) .expect("cannot get ancestor timeline loaded"); Ok(()) @@ -2267,7 +2335,7 @@ mod tests { let new_tline_id = TimelineId::generate(); tenant.branch_timeline(tline_id, new_tline_id, Some(lsn))?; tline = tenant - .get_timeline(new_tline_id) + .get_timeline(new_tline_id, true) .expect("Should have the branched timeline"); tline_id = new_tline_id; @@ -2330,7 +2398,7 @@ mod tests { let new_tline_id = TimelineId::generate(); tenant.branch_timeline(tline_id, new_tline_id, Some(lsn))?; tline = tenant - .get_timeline(new_tline_id) + .get_timeline(new_tline_id, true) .expect("Should have the branched timeline"); tline_id = new_tline_id; diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index ccd094b65a..194ca0d857 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -5,6 +5,8 @@ use bytes::Bytes; use fail::fail_point; use itertools::Itertools; use once_cell::sync::OnceCell; +use pageserver_api::models::TimelineState; +use tokio::sync::watch; use tokio::task::spawn_blocking; use tracing::*; @@ -160,6 +162,8 @@ pub struct Timeline { /// Relation size cache pub rel_size_cache: RwLock>, + + state: watch::Sender, } /// Internal structure to hold all data needed for logical size calculation. @@ -416,9 +420,11 @@ impl Timeline { /// those functions with an LSN that has been processed yet is an error. /// pub async fn wait_lsn(&self, lsn: Lsn) -> anyhow::Result<()> { + anyhow::ensure!(self.is_active(), "Cannot wait for Lsn on inactive timeline"); + // This should never be called from the WAL receiver, because that could lead // to a deadlock. 
- ensure!( + anyhow::ensure!( task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnection), "wait_lsn cannot be called in WAL receiver" ); @@ -635,6 +641,35 @@ impl Timeline { } Ok(()) } + + pub fn set_state(&self, new_state: TimelineState) { + match (self.current_state(), new_state) { + (equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => { + debug!("Ignoring new state, equal to the existing one: {equal_state_2:?}"); + } + (TimelineState::Broken, _) => { + error!("Ignoring state update {new_state:?} for broken timeline"); + } + (TimelineState::Paused, TimelineState::Active) => { + debug!("Not activating a paused timeline"); + } + (_, new_state) => { + self.state.send_replace(new_state); + } + } + } + + pub fn current_state(&self) -> TimelineState { + *self.state.borrow() + } + + pub fn is_active(&self) -> bool { + self.current_state() == TimelineState::Active + } + + pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> { + self.state.subscribe() + } } // Private functions @@ -688,8 +723,9 @@ impl Timeline { walredo_mgr: Arc, upload_layers: bool, pg_version: u32, - ) -> Timeline { + ) -> Self { let disk_consistent_lsn = metadata.disk_consistent_lsn(); + let (state, _) = watch::channel(TimelineState::Suspended); let mut result = Timeline { conf, @@ -746,6 +782,7 @@ impl Timeline { last_received_wal: Mutex::new(None), rel_size_cache: RwLock::new(HashMap::new()), + state, }; result.repartition_threshold = result.get_checkpoint_distance() / 10; result @@ -883,8 +920,6 @@ impl Timeline { } fn try_spawn_size_init_task(self: &Arc, init_lsn: Lsn) { - let timeline_id = self.timeline_id; - // Atomically check if the timeline size calculation had already started. // If the flag was not already set, this sets it. if !self @@ -901,17 +936,42 @@ impl Timeline { "initial size calculation", false, async move { - let calculated_size = self_clone.calculate_logical_size(init_lsn)?; - let result = spawn_blocking(move || { - self_clone.current_logical_size.initial_logical_size.set(calculated_size) - }).await?; - match result { - Ok(()) => info!("Successfully calculated initial logical size"), - Err(existing_size) => error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing"), + let mut timeline_state_updates = self_clone.subscribe_for_state_updates(); + let self_calculation = Arc::clone(&self_clone); + tokio::select! { + calculation_result = spawn_blocking(move || self_calculation.calculate_logical_size(init_lsn)) => { + let calculated_size = calculation_result + .context("Failed to spawn calculation result task")?
+ .context("Failed to calculate logical size")?; + match self_clone.current_logical_size.initial_logical_size.set(calculated_size) { + Ok(()) => info!("Successfully calculated initial logical size"), + Err(existing_size) => error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing"), + } + Ok(()) + }, + new_event = async { + loop { + match timeline_state_updates.changed().await { + Ok(()) => { + let new_state = *timeline_state_updates.borrow(); + match new_state { + // we're running this job for active timelines only + TimelineState::Active => continue, + TimelineState::Broken | TimelineState::Paused | TimelineState::Suspended => return Some(new_state), + } + } + Err(_sender_dropped_error) => return None, + } + } + } => { + match new_event { + Some(new_state) => info!("Timeline became inactive (new state: {new_state:?}), dropping current connections until it reactivates"), + None => info!("Timeline dropped state updates sender, stopping init size calculation"), + } + Ok(()) + }, } - Ok(()) - } - .instrument(info_span!("initial_logical_size_calculation", timeline = %timeline_id)) + }.instrument(info_span!("initial_logical_size_calculation", tenant = %self.tenant_id, timeline = %self.timeline_id)), ); } } @@ -1356,7 +1416,7 @@ impl Timeline { false, )?; - if self.upload_layers.load(atomic::Ordering::Relaxed) { + if self.can_upload_layers() { storage_sync::schedule_layer_upload( self.tenant_id, self.timeline_id, @@ -1826,7 +1886,7 @@ impl Timeline { } drop(layers); - if self.upload_layers.load(atomic::Ordering::Relaxed) { + if self.can_upload_layers() { storage_sync::schedule_layer_upload( self.tenant_id, self.timeline_id, @@ -1930,7 +1990,7 @@ impl Timeline { /// obsolete. /// pub(super) fn gc(&self) -> anyhow::Result { - let mut result: GcResult = Default::default(); + let mut result: GcResult = GcResult::default(); let now = SystemTime::now(); fail_point!("before-timeline-gc"); @@ -2110,7 +2170,7 @@ impl Timeline { fail_point!("after-timeline-gc-removed-layers"); } - if self.upload_layers.load(atomic::Ordering::Relaxed) { + if self.can_upload_layers() { storage_sync::schedule_layer_delete( self.tenant_id, self.timeline_id, @@ -2199,6 +2259,11 @@ impl Timeline { } } } + + fn can_upload_layers(&self) -> bool { + self.upload_layers.load(atomic::Ordering::Relaxed) + && self.current_state() != TimelineState::Broken + } } /// Helper function for get_reconstruct_data() to add the path of layers traversed diff --git a/pageserver/src/tenant_tasks.rs b/pageserver/src/tenant_tasks.rs index 030055df6d..23ce9dc699 100644 --- a/pageserver/src/tenant_tasks.rs +++ b/pageserver/src/tenant_tasks.rs @@ -175,7 +175,7 @@ async fn wait_for_active_tenant( } state => { debug!("Not running the task loop, tenant is not active with background jobs enabled: {state:?}"); - tokio::time::sleep(wait).await; + continue; } } } diff --git a/pageserver/src/walreceiver/connection_manager.rs b/pageserver/src/walreceiver/connection_manager.rs index 2380caaff1..53dd2d8eac 100644 --- a/pageserver/src/walreceiver/connection_manager.rs +++ b/pageserver/src/walreceiver/connection_manager.rs @@ -12,6 +12,7 @@ use std::{ collections::{hash_map, HashMap}, num::NonZeroU64, + ops::ControlFlow, sync::Arc, time::Duration, }; @@ -26,7 +27,8 @@ use etcd_broker::{ subscription_key::SubscriptionKey, subscription_value::SkTimelineInfo, BrokerSubscription, BrokerUpdate, Client, }; -use tokio::select; +use pageserver_api::models::TimelineState; +use tokio::{select, 
sync::watch}; use tracing::*; use crate::{ @@ -58,10 +60,7 @@ pub fn spawn_connection_manager_task( TaskKind::WalReceiverManager, Some(tenant_id), Some(timeline_id), - &format!( - "walreceiver for tenant {} timeline {}", - timeline.tenant_id, timeline.timeline_id - ), + &format!("walreceiver for timeline {tenant_id}/{timeline_id}"), false, async move { info!("WAL receiver broker started, connecting to etcd"); @@ -75,19 +74,21 @@ pub fn spawn_connection_manager_task( select! { _ = task_mgr::shutdown_watcher() => { info!("WAL receiver shutdown requested, shutting down"); - // Kill current connection, if any - if let Some(wal_connection) = walreceiver_state.wal_connection.take() - { - wal_connection.connection_task.shutdown().await; - } + walreceiver_state.shutdown().await; return Ok(()); }, - - _ = connection_manager_loop_step( + loop_step_result = connection_manager_loop_step( &broker_loop_prefix, &mut etcd_client, &mut walreceiver_state, - ) => {}, + ) => match loop_step_result { + ControlFlow::Continue(()) => continue, + ControlFlow::Break(()) => { + info!("Connection manager loop ended, shutting down"); + walreceiver_state.shutdown().await; + return Ok(()); + } + }, } } } @@ -104,7 +105,17 @@ async fn connection_manager_loop_step( broker_prefix: &str, etcd_client: &mut Client, walreceiver_state: &mut WalreceiverState, -) { +) -> ControlFlow<(), ()> { + let mut timeline_state_updates = walreceiver_state.timeline.subscribe_for_state_updates(); + + match wait_for_active_timeline(&mut timeline_state_updates).await { + ControlFlow::Continue(()) => {} + ControlFlow::Break(()) => { + info!("Timeline dropped state updates sender before becoming active, stopping wal connection manager loop"); + return ControlFlow::Break(()); + } + } + let id = TenantTimelineId { tenant_id: walreceiver_state.timeline.tenant_id, timeline_id: walreceiver_state.timeline.timeline_id, @@ -129,10 +140,12 @@ async fn connection_manager_loop_step( // - change connection if the rules decide so, or if the current connection dies // - receive updates from broker // - this might change the current desired connection + // - timeline state changes to something that does not allow walreceiver to run concurrently select! 
{ broker_connection_result = &mut broker_subscription.watcher_handle => { + info!("Broker connection was closed from the other side, ending current broker loop step"); cleanup_broker_connection(broker_connection_result, walreceiver_state); - return; + return ControlFlow::Continue(()); }, Some(wal_connection_update) = async { @@ -185,11 +198,36 @@ async fn connection_manager_loop_step( (&mut broker_subscription.watcher_handle).await, walreceiver_state, ); - return; + return ControlFlow::Continue(()); } } }, + new_event = async { + loop { + match timeline_state_updates.changed().await { + Ok(()) => { + let new_state = walreceiver_state.timeline.current_state(); + match new_state { + // we're already active as walreceiver, no need to reactivate + TimelineState::Active => continue, + TimelineState::Broken | TimelineState::Paused | TimelineState::Suspended => return ControlFlow::Continue(new_state), + } + } + Err(_sender_dropped_error) => return ControlFlow::Break(()), + } + } + } => match new_event { + ControlFlow::Continue(new_state) => { + info!("Timeline became inactive (new state: {new_state:?}), dropping current connections until it reactivates"); + return ControlFlow::Continue(()); + } + ControlFlow::Break(()) => { + info!("Timeline dropped state updates sender, stopping wal connection manager loop"); + return ControlFlow::Break(()); + } + }, + _ = async { tokio::time::sleep(time_until_next_retry.unwrap()).await }, if time_until_next_retry.is_some() => {} } @@ -216,6 +254,34 @@ async fn connection_manager_loop_step( } } +async fn wait_for_active_timeline( + timeline_state_updates: &mut watch::Receiver, +) -> ControlFlow<(), ()> { + let current_state = *timeline_state_updates.borrow(); + if current_state == TimelineState::Active { + return ControlFlow::Continue(()); + } + + loop { + match timeline_state_updates.changed().await { + Ok(()) => { + let new_state = *timeline_state_updates.borrow(); + match new_state { + TimelineState::Active => { + debug!("Timeline state changed to active, continuing the walreceiver connection manager"); + return ControlFlow::Continue(()); + } + state => { + debug!("Not running the walreceiver connection manager, timeline is not active: {state:?}"); + continue; + } + } + } + Err(_sender_dropped_error) => return ControlFlow::Break(()), + } + } +} + fn cleanup_broker_connection( broker_connection_result: Result, tokio::task::JoinError>, walreceiver_state: &mut WalreceiverState, @@ -723,6 +789,12 @@ impl WalreceiverState { self.wal_connection_retries.remove(&node_id); } } + + async fn shutdown(mut self) { + if let Some(wal_connection) = self.wal_connection.take() { + wal_connection.connection_task.shutdown().await; + } + } } #[derive(Debug, PartialEq, Eq)] diff --git a/test_runner/regress/test_broken_timeline.py b/test_runner/regress/test_broken_timeline.py index 101cce9ffc..b747af4d09 100644 --- a/test_runner/regress/test_broken_timeline.py +++ b/test_runner/regress/test_broken_timeline.py @@ -70,18 +70,14 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder): # But all others are broken # First timeline would not get loaded into pageserver due to corrupt metadata file - with pytest.raises( - Exception, match=f"Timeline {timeline1} was not found for tenant {tenant1}" - ) as err: + with pytest.raises(Exception, match=f"Timeline {tenant1}/{timeline1} was not found") as err: pg1.start() log.info(f"compute startup failed eagerly for timeline with corrupt metadata: {err}") # Second timeline has no ancestors, only the metadata file and no layer files # We 
don't have the remote storage enabled, which means timeline is in an incorrect state, # it's not loaded at all - with pytest.raises( - Exception, match=f"Timeline {timeline2} was not found for tenant {tenant2}" - ) as err: + with pytest.raises(Exception, match=f"Timeline {tenant2}/{timeline2} was not found") as err: pg2.start() log.info(f"compute startup failed eagerly for timeline with corrupt metadata: {err}") diff --git a/test_runner/regress/test_timeline_delete.py b/test_runner/regress/test_timeline_delete.py index de05d445ed..4a78a2746e 100644 --- a/test_runner/regress/test_timeline_delete.py +++ b/test_runner/regress/test_timeline_delete.py @@ -65,7 +65,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv): # check 404 with pytest.raises( NeonPageserverApiException, - match=f"Timeline {leaf_timeline_id} was not found for tenant {env.initial_tenant}", + match=f"Timeline {env.initial_tenant}/{leaf_timeline_id} was not found", ): ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id)
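
For readers who want to see the core mechanism of this patch in isolation: the Timeline state is backed by a tokio::sync::watch channel, so background jobs can subscribe to it and wind down on their own when the timeline leaves Active, instead of being aborted mid-operation. Below is a minimal, self-contained sketch of that pattern. The names (TimelineState, set_state, subscribe_for_state_updates, background_job) mirror the patch for readability, but the snippet is an illustrative approximation under simplified assumptions, not pageserver code: the real implementation also treats Paused specially, runs jobs through task_mgr, and derives timeline states from the owning tenant's state.

// Requires tokio = { version = "1", features = ["full"] } in Cargo.toml.
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::watch;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TimelineState {
    Active,
    Suspended,
    Paused,
    Broken,
}

struct Timeline {
    state: watch::Sender<TimelineState>,
}

impl Timeline {
    fn new() -> Self {
        // New timelines start out Suspended and are activated later, as in the patch.
        let (state, _) = watch::channel(TimelineState::Suspended);
        Self { state }
    }

    fn current_state(&self) -> TimelineState {
        *self.state.borrow()
    }

    fn set_state(&self, new_state: TimelineState) {
        match (self.current_state(), new_state) {
            // No-op transitions and transitions out of Broken are ignored.
            (old, new) if old == new => {}
            (TimelineState::Broken, _) => {}
            // send_replace stores the new value even when no receivers exist yet.
            (_, new) => {
                self.state.send_replace(new);
            }
        }
    }

    fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> {
        self.state.subscribe()
    }
}

// A background job that only works while the timeline is Active and stops
// cleanly, between work items, once the state changes or the timeline is dropped.
async fn background_job(timeline: Arc<Timeline>) {
    let mut state_updates = timeline.subscribe_for_state_updates();
    loop {
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_millis(10)) => {
                // One unit of work (gc, compaction, upload scheduling, ...) would go here.
            }
            changed = state_updates.changed() => match changed {
                Ok(()) if *state_updates.borrow() == TimelineState::Active => continue,
                Ok(()) => break,               // timeline left Active: wind the job down
                Err(_sender_dropped) => break, // timeline is gone
            },
        }
    }
}

#[tokio::main]
async fn main() {
    let timeline = Arc::new(Timeline::new());
    timeline.set_state(TimelineState::Active);

    let job = tokio::spawn(background_job(Arc::clone(&timeline)));

    // Suspending the timeline makes the job exit on its own instead of being aborted.
    tokio::time::sleep(Duration::from_millis(50)).await;
    timeline.set_state(TimelineState::Suspended);
    job.await.unwrap();
}

A watch channel fits this use case because subscribers only ever need the latest state; transitions that happen while a job is busy can be coalesced without consequence, which would not hold for a broadcast or mpsc channel.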