From 9e871318a0bfb530877247724d6fe2c62b857269 Mon Sep 17 00:00:00 2001
From: Joonas Koivunen
Date: Thu, 20 Jul 2023 13:14:13 +0300
Subject: [PATCH] Wait detaches or ignores on pageserver shutdown (#4678)

Adds a barrier that is held for the duration of `Tenant::shutdown`.
`pageserver_shutdown` will join (await) this barrier; `detach`es and
`ignore`s will not.

Fixes #4429.

---------

Co-authored-by: Christian Schwarz
---
 libs/pageserver_api/src/models.rs     |  18 ++-
 libs/utils/src/completion.rs          |  16 +++
 libs/utils/src/tracing_span_assert.rs |   4 +-
 pageserver/Cargo.toml                 |   1 +
 pageserver/src/tenant.rs              |  43 +++---
 pageserver/src/tenant/mgr.rs          | 198 +++++++++++++++++++++++---
 workspace_hack/Cargo.toml             |   2 +-
 7 files changed, 242 insertions(+), 40 deletions(-)

diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index 4c6529ffab..2f4c21326e 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize};
 use serde_with::{serde_as, DisplayFromStr};
 use strum_macros;
 use utils::{
+    completion,
     history_buffer::HistoryBufferWithDropCounter,
     id::{NodeId, TenantId, TimelineId},
     lsn::Lsn,
@@ -76,7 +77,12 @@ pub enum TenantState {
     /// system is being shut down.
     ///
     /// Transitions out of this state are possible through `set_broken()`.
-    Stopping,
+    Stopping {
+        // Because of https://github.com/serde-rs/serde/issues/2105 this has to be a named field,
+        // otherwise it will not be skipped during deserialization
+        #[serde(skip)]
+        progress: completion::Barrier,
+    },
     /// The tenant is recognized by the pageserver, but can no longer be used for
     /// any operations.
     ///
@@ -118,7 +124,7 @@ impl TenantState {
             // Why is Stopping a Maybe case? Because, during pageserver shutdown,
             // we set the Stopping state irrespective of whether the tenant
             // has finished attaching or not.
-            Self::Stopping => Maybe,
+            Self::Stopping { .. } => Maybe,
         }
     }
@@ -928,7 +934,13 @@ mod tests {
                 "Activating",
             ),
             (line!(), TenantState::Active, "Active"),
-            (line!(), TenantState::Stopping, "Stopping"),
+            (
+                line!(),
+                TenantState::Stopping {
+                    progress: utils::completion::Barrier::default(),
+                },
+                "Stopping",
+            ),
             (
                 line!(),
                 TenantState::Broken {
diff --git a/libs/utils/src/completion.rs b/libs/utils/src/completion.rs
index 2cdaee548e..e2e84dd0ee 100644
--- a/libs/utils/src/completion.rs
+++ b/libs/utils/src/completion.rs
@@ -12,6 +12,13 @@ pub struct Completion(mpsc::Sender<()>);
 #[derive(Clone)]
 pub struct Barrier(Arc<Mutex<mpsc::Receiver<()>>>);
 
+impl Default for Barrier {
+    fn default() -> Self {
+        let (_, rx) = channel();
+        rx
+    }
+}
+
 impl Barrier {
     pub async fn wait(self) {
         self.0.lock().await.recv().await;
@@ -24,6 +31,15 @@
     }
 }
 
+impl PartialEq for Barrier {
+    fn eq(&self, other: &Self) -> bool {
+        // we don't use dyn so this is good
+        Arc::ptr_eq(&self.0, &other.0)
+    }
+}
+
+impl Eq for Barrier {}
+
 /// Create new Guard and Barrier pair.
 pub fn channel() -> (Completion, Barrier) {
     let (tx, rx) = mpsc::channel::<()>(1);
diff --git a/libs/utils/src/tracing_span_assert.rs b/libs/utils/src/tracing_span_assert.rs
index 926bfc3188..db17f7d8cd 100644
--- a/libs/utils/src/tracing_span_assert.rs
+++ b/libs/utils/src/tracing_span_assert.rs
@@ -164,9 +164,7 @@ fn tracing_subscriber_configured() -> bool {
     tracing::dispatcher::get_default(|d| {
         // it is possible that this closure will not be invoked, but the current implementation
         // always invokes it
-        noop_configured = d
-            .downcast_ref::<tracing::subscriber::NoSubscriber>()
-            .is_some();
+        noop_configured = d.is::<tracing::subscriber::NoSubscriber>();
     });
 
     !noop_configured
diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml
index 9381ed0bfa..27e90ea97d 100644
--- a/pageserver/Cargo.toml
+++ b/pageserver/Cargo.toml
@@ -82,6 +82,7 @@ strum_macros.workspace = true
 criterion.workspace = true
 hex-literal.workspace = true
 tempfile.workspace = true
+tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time", "test-util"] }
 
 [[bench]]
 name = "bench_layer_map"
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 142118bf6e..379db0720f 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -281,7 +281,7 @@ pub enum DeleteTimelineError {
 }
 
 pub enum SetStoppingError {
-    AlreadyStopping,
+    AlreadyStopping(completion::Barrier),
     Broken,
 }
 
@@ -318,10 +318,6 @@ impl std::fmt::Display for WaitToBecomeActiveError {
     }
 }
 
-pub(crate) enum ShutdownError {
-    AlreadyStopping,
-}
-
 struct DeletionGuard(OwnedMutexGuard);
 
 impl DeletionGuard {
@@ -1721,7 +1717,7 @@ impl Tenant {
         self.state.send_modify(|current_state| {
             use pageserver_api::models::ActivatingFrom;
             match &*current_state {
-                TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping => {
+                TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => {
                     panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state);
                 }
                 TenantState::Loading => {
@@ -1785,7 +1781,16 @@ impl Tenant {
     /// - detach + ignore (freeze_and_flush == false)
     ///
     /// This will attempt to shutdown even if tenant is broken.
-    pub(crate) async fn shutdown(&self, freeze_and_flush: bool) -> Result<(), ShutdownError> {
+    ///
+    /// `shutdown_progress` is a [`completion::Barrier`] for the shutdown initiated by this call.
+    /// If the tenant is already shutting down, we return a clone of the first shutdown call's
+    /// `Barrier` as an `Err`. This not-first caller can use the returned barrier to join with
+    /// the ongoing shutdown.
+    async fn shutdown(
+        &self,
+        shutdown_progress: completion::Barrier,
+        freeze_and_flush: bool,
+    ) -> Result<(), completion::Barrier> {
         span::debug_assert_current_span_has_tenant_id();
 
         // Set tenant (and its timelines) to Stopping state.
         //
         // But the tenant background loops are joined-on in our caller.
         // It's messed up.
         // we just ignore the failure to stop
-        match self.set_stopping().await {
+
+        match self.set_stopping(shutdown_progress).await {
             Ok(()) => {}
             Err(SetStoppingError::Broken) => {
                 // assume that this is acceptable
             }
-            Err(SetStoppingError::AlreadyStopping) => return Err(ShutdownError::AlreadyStopping),
+            Err(SetStoppingError::AlreadyStopping(other)) => {
+                // give caller the option to wait for this shutdown
+                return Err(other);
+            }
         };
 
         if freeze_and_flush {
@@ -1841,7 +1850,7 @@
     /// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
     ///
     /// This function is not cancel-safe!
-    async fn set_stopping(&self) -> Result<(), SetStoppingError> {
+    async fn set_stopping(&self, progress: completion::Barrier) -> Result<(), SetStoppingError> {
         let mut rx = self.state.subscribe();
 
         // cannot stop before we're done activating, so wait out until we're done activating
@@ -1853,7 +1862,7 @@
                 );
                 false
             }
-            TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping {} => true,
+            TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
         })
         .await
         .expect("cannot drop self.state while on a &self method");
@@ -1868,7 +1877,7 @@
             // FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines
             // are created after the transition to Stopping. That's harmless, as the Timelines
             // won't be accessible to anyone afterwards, because the Tenant is in Stopping state.
-            *current_state = TenantState::Stopping;
+            *current_state = TenantState::Stopping { progress };
             // Continue stopping outside the closure. We need to grab timelines.lock()
             // and we plan to turn it into a tokio::sync::Mutex in a future patch.
             true
@@ -1880,9 +1889,9 @@
                 err = Some(SetStoppingError::Broken);
                 false
             }
-            TenantState::Stopping => {
+            TenantState::Stopping { progress } => {
                 info!("Tenant is already in Stopping state");
-                err = Some(SetStoppingError::AlreadyStopping);
+                err = Some(SetStoppingError::AlreadyStopping(progress.clone()));
                 false
             }
         });
@@ -1926,7 +1935,7 @@
                 );
                 false
             }
-            TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping {} => true,
+            TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
         })
         .await
         .expect("cannot drop self.state while on a &self method");
@@ -1949,7 +1958,7 @@
                 warn!("Tenant is already in Broken state");
             }
             // This is the only "expected" path, any other path is a bug.
-            TenantState::Stopping => {
+            TenantState::Stopping { .. } => {
                 warn!(
                     "Marking Stopping tenant as Broken state, reason: {}",
                     reason
@@ -1982,7 +1991,7 @@
             TenantState::Active { .. } => {
                 return Ok(());
             }
-            TenantState::Broken { .. } | TenantState::Stopping => {
+            TenantState::Broken { .. } | TenantState::Stopping { .. } => {
                 // There's no chance the tenant can transition back into ::Active
                 return Err(WaitToBecomeActiveError::WillNotBecomeActive {
                     tenant_id: self.tenant_id,
diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index 2cc881ed5e..4b97871f35 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -233,11 +233,17 @@ pub fn schedule_local_tenant_processing(
 /// That could be easily misinterpreted by control plane, the consumer of the
 /// management API. For example, it could attach the tenant on a different pageserver.
 /// We would then be in split-brain once this pageserver restarts.
-#[instrument]
+#[instrument(skip_all)]
 pub async fn shutdown_all_tenants() {
+    shutdown_all_tenants0(&TENANTS).await
+}
+
+async fn shutdown_all_tenants0(tenants: &tokio::sync::RwLock<TenantsMap>) {
+    use utils::completion;
+
     // Prevent new tenants from being created.
     let tenants_to_shut_down = {
-        let mut m = TENANTS.write().await;
+        let mut m = tenants.write().await;
         match &mut *m {
             TenantsMap::Initializing => {
                 *m = TenantsMap::ShuttingDown(HashMap::default());
@@ -262,14 +268,41 @@
     for (tenant_id, tenant) in tenants_to_shut_down {
         join_set.spawn(
             async move {
-                let freeze_and_flush = true;
+                // ordering shouldn't matter for this, either we store true right away or never
+                let ordering = std::sync::atomic::Ordering::Relaxed;
+                let joined_other = std::sync::atomic::AtomicBool::new(false);
 
-                match tenant.shutdown(freeze_and_flush).await {
-                    Ok(()) => debug!("tenant successfully stopped"),
-                    Err(super::ShutdownError::AlreadyStopping) => {
-                        warn!("tenant was already shutting down")
+                let mut shutdown = std::pin::pin!(async {
+                    let freeze_and_flush = true;
+
+                    let res = {
+                        let (_guard, shutdown_progress) = completion::channel();
+                        tenant.shutdown(shutdown_progress, freeze_and_flush).await
+                    };
+
+                    if let Err(other_progress) = res {
+                        // join the other shutdown in progress
+                        joined_other.store(true, ordering);
+                        other_progress.wait().await;
                     }
-                }
+                });
+
+                // in practice we might not have a lot of time to go, since systemd is going to
+                // SIGKILL us at 10s, but we can try. delete tenant might take a while, so put out
+                // a warning.
+                let warning = std::time::Duration::from_secs(5);
+                let mut warning = std::pin::pin!(tokio::time::sleep(warning));
+
+                tokio::select! {
+                    _ = &mut shutdown => {},
+                    _ = &mut warning => {
+                        let joined_other = joined_other.load(ordering);
+                        warn!(%joined_other, "waiting for the shutdown to complete");
+                        shutdown.await;
+                    }
+                };
+
+                debug!("tenant successfully stopped");
             }
             .instrument(info_span!("shutdown", %tenant_id)),
         );
@@ -413,6 +446,15 @@ pub async fn detach_tenant(
     conf: &'static PageServerConf,
     tenant_id: TenantId,
     detach_ignored: bool,
+) -> Result<(), TenantStateError> {
+    detach_tenant0(conf, &TENANTS, tenant_id, detach_ignored).await
+}
+
+async fn detach_tenant0(
+    conf: &'static PageServerConf,
+    tenants: &tokio::sync::RwLock<TenantsMap>,
+    tenant_id: TenantId,
+    detach_ignored: bool,
 ) -> Result<(), TenantStateError> {
     let local_files_cleanup_operation = |tenant_id_to_clean| async move {
         let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
@@ -425,7 +467,8 @@
     };
 
     let removal_result =
-        remove_tenant_from_memory(tenant_id, local_files_cleanup_operation(tenant_id)).await;
+        remove_tenant_from_memory(tenants, tenant_id, local_files_cleanup_operation(tenant_id))
+            .await;
 
     // Ignored tenants are not present in memory and will bail the removal from memory operation.
     // Before returning the error, check for ignored tenant removal case — we only need to clean its local files then.
@@ -472,7 +515,15 @@ pub async fn ignore_tenant(
     conf: &'static PageServerConf,
     tenant_id: TenantId,
 ) -> Result<(), TenantStateError> {
-    remove_tenant_from_memory(tenant_id, async {
+    ignore_tenant0(conf, &TENANTS, tenant_id).await
+}
+
+async fn ignore_tenant0(
+    conf: &'static PageServerConf,
+    tenants: &tokio::sync::RwLock<TenantsMap>,
+    tenant_id: TenantId,
+) -> Result<(), TenantStateError> {
+    remove_tenant_from_memory(tenants, tenant_id, async {
         let ignore_mark_file = conf.tenant_ignore_mark_file_path(&tenant_id);
         fs::File::create(&ignore_mark_file)
             .await
@@ -597,18 +648,21 @@ where
 /// If the cleanup fails, tenant will stay in memory in [`TenantState::Broken`] state, and another removal
 /// operation would be needed to remove it.
 async fn remove_tenant_from_memory<V, F>(
+    tenants: &tokio::sync::RwLock<TenantsMap>,
     tenant_id: TenantId,
     tenant_cleanup: F,
 ) -> Result<V, TenantStateError>
 where
     F: std::future::Future<Output = anyhow::Result<V>>,
 {
+    use utils::completion;
+
     // It's important to keep the tenant in memory after the final cleanup, to avoid cleanup races.
     // The exclusive lock here ensures we don't miss the tenant state updates before trying another removal.
     // tenant-wide cleanup operations may take some time (removing the entire tenant directory), we want to
     // avoid holding the lock for the entire process.
     let tenant = {
-        TENANTS
+        tenants
             .write()
             .await
             .get(&tenant_id)
             .ok_or(TenantStateError::NotFound(tenant_id))?
     };
 
+    // allow pageserver shutdown to wait for our completion
+    let (_guard, progress) = completion::channel();
+
+    // whenever we remove a tenant from memory, we don't want to flush and wait for upload
     let freeze_and_flush = false;
 
     // shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
     // that we can continue safely to cleanup.
-    match tenant.shutdown(freeze_and_flush).await {
+    match tenant.shutdown(progress, freeze_and_flush).await {
         Ok(()) => {}
-        Err(super::ShutdownError::AlreadyStopping) => {
-            return Err(TenantStateError::IsStopping(tenant_id))
+        Err(_other) => {
+            // if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
+            // wait for it but return an error right away because these are distinct requests.
+            return Err(TenantStateError::IsStopping(tenant_id));
         }
     }
 
     match tenant_cleanup
         .await
         .with_context(|| format!("Failed to run cleanup for tenant {tenant_id}"))
     {
         Ok(hook_value) => {
-            let mut tenants_accessor = TENANTS.write().await;
+            let mut tenants_accessor = tenants.write().await;
             if tenants_accessor.remove(&tenant_id).is_none() {
                 warn!("Tenant {tenant_id} got removed from memory before operation finished");
             }
             Ok(hook_value)
         }
         Err(e) => {
-            let tenants_accessor = TENANTS.read().await;
+            let tenants_accessor = tenants.read().await;
             match tenants_accessor.get(&tenant_id) {
                 Some(tenant) => {
                     tenant.set_broken(e.to_string()).await;
@@ -756,3 +816,109 @@ pub async fn immediate_compact(
 
     Ok(wait_task_done)
 }
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+    use std::sync::Arc;
+    use tracing::{info_span, Instrument};
+
+    use super::{super::harness::TenantHarness, TenantsMap};
+
+    #[tokio::test(start_paused = true)]
+    async fn shutdown_joins_remove_tenant_from_memory() {
+        // the test is a bit ugly with the lockstep between the spawned tasks. the aim is to make
+        // sure `shutdown_all_tenants0` per-tenant processing joins in any active
+        // remove_tenant_from_memory calls, which is enforced by making the operation last until
+        // we've run `shutdown_all_tenants0` for a long time.
+
+        let (t, _ctx) = TenantHarness::create("shutdown_joins_detach")
+            .unwrap()
+            .load()
+            .await;
+
+        // harness loads it to active, which is forced and nothing is running on the tenant
+
+        let id = t.tenant_id();
+
+        // tenant harness configures the logging and we cannot escape it
+        let _e = info_span!("testing", tenant_id = %id).entered();
+
+        let tenants = HashMap::from([(id, t.clone())]);
+        let tenants = Arc::new(tokio::sync::RwLock::new(TenantsMap::Open(tenants)));
+
+        let (until_cleanup_completed, can_complete_cleanup) = utils::completion::channel();
+        let (until_cleanup_started, cleanup_started) = utils::completion::channel();
+
+        // start a "detaching operation", which will take a while, until can_complete_cleanup
+        let cleanup_task = {
+            let jh = tokio::spawn({
+                let tenants = tenants.clone();
+                async move {
+                    let cleanup = async move {
+                        drop(until_cleanup_started);
+                        can_complete_cleanup.wait().await;
+                        anyhow::Ok(())
+                    };
+                    super::remove_tenant_from_memory(&tenants, id, cleanup).await
+                }
+                .instrument(info_span!("foobar", tenant_id = %id))
+            });
+
+            // now the long cleanup should be in place, with the stopping state
+            cleanup_started.wait().await;
+            jh
+        };
+
+        let mut cleanup_progress = std::pin::pin!(t
+            .shutdown(utils::completion::Barrier::default(), false)
+            .await
+            .unwrap_err()
+            .wait());
+
+        let mut shutdown_task = {
+            let (until_shutdown_started, shutdown_started) = utils::completion::channel();
+
+            let shutdown_task = tokio::spawn(async move {
+                drop(until_shutdown_started);
+                super::shutdown_all_tenants0(&tenants).await;
+            });
+
+            shutdown_started.wait().await;
+            shutdown_task
+        };
+
+        // if the join is removed from shutdown_all_tenants0, the shutdown_task should always
+        // get to complete within the timeout and fail the test. it is expected to continue awaiting
+        // until completion or SIGKILL during normal shutdown.
+        //
+        // the timeout is long to cover anything that shutdown_task could be doing, but it is
+        // handled instantly because we use tokio's time pausing in this test. 100s is much more than
+        // what we get from systemd on shutdown (10s).
+        let long_time = std::time::Duration::from_secs(100);
+        tokio::select! {
+            _ = &mut shutdown_task => unreachable!("shutdown must continue, until_cleanup_completed is not dropped"),
+            _ = &mut cleanup_progress => unreachable!("cleanup progress must continue, until_cleanup_completed is not dropped"),
+            _ = tokio::time::sleep(long_time) => {},
+        }
+
+        // allow the remove_tenant_from_memory and thus eventually the shutdown to continue
+        drop(until_cleanup_completed);
+
+        let (je, ()) = tokio::join!(shutdown_task, cleanup_progress);
+        je.expect("Tenant::shutdown should not have panicked");
+        cleanup_task
+            .await
+            .expect("no panicking")
+            .expect("remove_tenant_from_memory failed");
+
+        futures::future::poll_immediate(
+            t.shutdown(utils::completion::Barrier::default(), false)
+                .await
+                .unwrap_err()
+                .wait(),
+        )
+        .await
+        .expect("the stopping progress must still be complete");
+    }
+}
diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml
index 63a65d3889..3f47ef062f 100644
--- a/workspace_hack/Cargo.toml
+++ b/workspace_hack/Cargo.toml
@@ -46,7 +46,7 @@ scopeguard = { version = "1" }
 serde = { version = "1", features = ["alloc", "derive"] }
 serde_json = { version = "1", features = ["raw_value"] }
 socket2 = { version = "0.4", default-features = false, features = ["all"] }
-tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "sync", "time"] }
+tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] }
 tokio-rustls = { version = "0.23" }
 tokio-util = { version = "0.7", features = ["codec", "io"] }
 toml_datetime = { version = "0.6", default-features = false, features = ["serde"] }
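
The patch leans on `utils::completion`: `channel()` hands out a guard (`Completion`) together with a `Barrier`, and `Barrier::wait()` only returns once every clone of the guard has been dropped. Below is a minimal, standalone sketch of that pattern for readers who have not seen the crate; the names mirror `utils::completion`, but it is an illustrative reimplementation rather than the patched code, and it assumes tokio 1.x with the `macros`, `rt-multi-thread`, `sync`, and `time` features enabled.

// Sketch only: a minimal reimplementation of the guard/barrier idea behind
// `utils::completion`, not the patched crate itself.
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};

#[derive(Clone)]
pub struct Completion(mpsc::Sender<()>);

#[derive(Clone)]
pub struct Barrier(Arc<Mutex<mpsc::Receiver<()>>>);

impl Barrier {
    pub async fn wait(self) {
        // recv() returns None once every `Completion` (sender) has been dropped.
        self.0.lock().await.recv().await;
    }
}

pub fn channel() -> (Completion, Barrier) {
    let (tx, rx) = mpsc::channel::<()>(1);
    (Completion(tx), Barrier(Arc::new(Mutex::new(rx))))
}

#[tokio::main]
async fn main() {
    // The party doing the work holds the guard for as long as it is busy.
    let (guard, progress) = channel();

    // A second party joins the ongoing work by awaiting a clone of the barrier.
    let waiter = tokio::spawn({
        let progress = progress.clone();
        async move {
            progress.wait().await;
            println!("joined: the first shutdown has finished");
        }
    });

    // Simulate the first party finishing its work, which releases the waiter.
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    drop(guard);

    waiter.await.unwrap();
}

In the patch itself, the `_guard` created in `shutdown_all_tenants0` and `remove_tenant_from_memory` plays the role of `guard` here, and the `Barrier` returned as `Err` from `Tenant::shutdown` is what a concurrent pageserver shutdown awaits, while `detach`/`ignore` instead map it to `TenantStateError::IsStopping` without waiting.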