From eb3a8be9334533a57b5b337e3c040d6bb1ae57b4 Mon Sep 17 00:00:00 2001 From: Dmitry Rodionov Date: Wed, 10 May 2023 11:27:12 +0300 Subject: [PATCH] keep track of timeline deletion status in IndexPart to prevent timeline resurrection (#3919) Before this patch, the following sequence would lead to the resurrection of a deleted timeline: - create timeline - wait for its index part to reach s3 - delete timeline - wait an arbitrary amount of time, including 0 seconds - detach tenant - attach tenant - the timeline is there and Active again This happens because we only kept track of the deletion in the tenant dir (by deleting the timeline dir) but not in S3. The solution is to turn the deleted timeline's IndexPart into a tombstone. The deletion status of the timeline is expressed in the `deleted_at: Option<NaiveDateTime>` field of IndexPart. It's `None` while the timeline is alive and `Some(deletion time stamp)` if it is deleted. We change the timeline deletion handler to upload this tombstoned IndexPart. The handler does not return success if the upload fails. Coincidentally, this fixes the long-standing TODO about the `std::fs::remove_dir_all` not being atomic. It need not be atomic anymore because we set the `deleted_at=Some()` before starting the `remove_dir_all`. The tombstone is in the IndexPart only, not in the `metadata`. So, we only have the tombstone and the `remove_dir_all` benefits mentioned above if remote storage is configured. This was a conscious trade-off because there's no good format evolution story for the current metadata file format. The introduction of this additional step into `delete_timeline` was painful because delete_timeline needs to be 1. cancel-safe 2. idempotent 3. safe to call concurrently These are mostly self-inflicted limitations that can be avoided by using request-coalescing. PR https://github.com/neondatabase/neon/pull/4159 will do that. 
fixes https://github.com/neondatabase/neon/issues/3560 refs https://github.com/neondatabase/neon/issues/3889 (part of tenant relocation) Co-authored-by: Joonas Koivunen Co-authored-by: Christian Schwarz --- libs/remote_storage/src/local_fs.rs | 9 + pageserver/src/tenant.rs | 169 ++++++-- pageserver/src/tenant/metadata.rs | 3 +- .../src/tenant/remote_timeline_client.rs | 210 +++++++++- .../tenant/remote_timeline_client/index.rs | 11 +- .../tenant/remote_timeline_client/upload.rs | 4 + pageserver/src/tenant/upload_queue.rs | 19 +- test_runner/fixtures/pageserver/http.py | 4 +- test_runner/fixtures/pageserver/utils.py | 4 +- test_runner/regress/test_import.py | 2 + test_runner/regress/test_timeline_delete.py | 385 +++++++++++++++++- 11 files changed, 746 insertions(+), 74 deletions(-) diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index d7b46731cd..c081a6d361 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -128,6 +128,15 @@ impl RemoteStorage for LocalFs { // We need this dance with sort of durable rename (without fsyncs) // to prevent partial uploads. This was really hit when pageserver shutdown // cancelled the upload and partial file was left on the fs + // NOTE: Because the temp file suffix is always the same, this operation is racy. + // Two concurrent operations can lead to the following sequence: + // T1: write(temp) + // T2: write(temp) -> overwrites the content + // T1: rename(temp, dst) -> succeeds + // T2: rename(temp, dst) -> fails, temp no longer exists + // This can be solved by supplying a unique temp suffix every time, but this situation + // is not normal in the first place; the error can help (and helped at least once) + // to discover bugs in upper level synchronization. 
let temp_file_path = path_with_suffix_extension(&target_file_path, LOCAL_FS_TEMP_FILE_SUFFIX); let mut destination = io::BufWriter::new( diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index d69d5e4b45..1c6006493e 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -58,6 +58,8 @@ use crate::task_mgr::TaskKind; use crate::tenant::config::TenantConfOpt; use crate::tenant::metadata::load_metadata; use crate::tenant::remote_timeline_client::index::IndexPart; +use crate::tenant::remote_timeline_client::MaybeDeletedIndexPart; +use crate::tenant::remote_timeline_client::PersistIndexPartWithDeletedFlagError; use crate::tenant::storage_layer::DeltaLayer; use crate::tenant::storage_layer::ImageLayer; use crate::tenant::storage_layer::Layer; @@ -698,16 +700,9 @@ impl Tenant { .await .context("download index file")?; - let remote_metadata = index_part.parse_metadata().context("parse metadata")?; - debug!("finished index part download"); - Result::<_, anyhow::Error>::Ok(( - timeline_id, - client, - index_part, - remote_metadata, - )) + Result::<_, anyhow::Error>::Ok((timeline_id, client, index_part)) } .map(move |res| { res.with_context(|| format!("download index part for timeline {timeline_id}")) @@ -716,17 +711,26 @@ impl Tenant { ); } // Wait for all the download tasks to complete & collect results. 
- let mut remote_clients = HashMap::new(); - let mut index_parts = HashMap::new(); + let mut remote_index_and_client = HashMap::new(); let mut timeline_ancestors = HashMap::new(); while let Some(result) = part_downloads.join_next().await { // NB: we already added timeline_id as context to the error let result: Result<_, anyhow::Error> = result.context("joinset task join")?; - let (timeline_id, client, index_part, remote_metadata) = result?; + let (timeline_id, client, index_part) = result?; debug!("successfully downloaded index part for timeline {timeline_id}"); - timeline_ancestors.insert(timeline_id, remote_metadata); - index_parts.insert(timeline_id, index_part); - remote_clients.insert(timeline_id, client); + match index_part { + MaybeDeletedIndexPart::IndexPart(index_part) => { + timeline_ancestors.insert( + timeline_id, + index_part.parse_metadata().context("parse_metadata")?, + ); + remote_index_and_client.insert(timeline_id, (index_part, client)); + } + MaybeDeletedIndexPart::Deleted => { + info!("timeline {} is deleted, skipping", timeline_id); + continue; + } + } } // For every timeline, download the metadata file, scan the local directory, @@ -734,12 +738,16 @@ impl Tenant { // layer file. let sorted_timelines = tree_sort_timelines(timeline_ancestors)?; for (timeline_id, remote_metadata) in sorted_timelines { + let (index_part, remote_client) = remote_index_and_client + .remove(&timeline_id) + .expect("just put it in above"); + // TODO again handle early failure self.load_remote_timeline( timeline_id, - index_parts.remove(&timeline_id).unwrap(), + index_part, remote_metadata, - remote_clients.remove(&timeline_id).unwrap(), + remote_client, &ctx, ) .await @@ -1045,21 +1053,13 @@ impl Tenant { /// Subroutine of `load_tenant`, to load an individual timeline /// /// NB: The parent is assumed to be already loaded! 
- #[instrument(skip(self, local_metadata, ctx), fields(timeline_id=%timeline_id))] + #[instrument(skip_all, fields(timeline_id))] async fn load_local_timeline( &self, timeline_id: TimelineId, local_metadata: TimelineMetadata, ctx: &RequestContext, ) -> anyhow::Result<()> { - let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() { - let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false) - .with_context(|| anyhow::anyhow!("cannot find ancestor timeline {ancestor_timeline_id} for timeline {timeline_id}"))?; - Some(ancestor_timeline) - } else { - None - }; - let remote_client = self.remote_storage.as_ref().map(|remote_storage| { RemoteTimelineClient::new( remote_storage.clone(), @@ -1072,6 +1072,29 @@ impl Tenant { let remote_startup_data = match &remote_client { Some(remote_client) => match remote_client.download_index_file().await { Ok(index_part) => { + let index_part = match index_part { + MaybeDeletedIndexPart::IndexPart(index_part) => index_part, + MaybeDeletedIndexPart::Deleted => { + // TODO: we won't reach here if remote storage gets de-configured after start of the deletion operation. + // Example: + // start deletion operation + // finishes upload of index part + // pageserver crashes + // remote storage gets de-configured + // pageserver starts + // + // We don't really anticipate remote storage to be de-configured, so, for now, this is fine. + // Also, maybe we'll remove that option entirely in the future, see https://github.com/neondatabase/neon/issues/4099. 
+ info!("is_deleted is set on remote, resuming removal of local data originally done by timeline deletion handler"); + std::fs::remove_dir_all( + self.conf.timeline_path(&timeline_id, &self.tenant_id), + ) + .context("remove_dir_all")?; + + return Ok(()); + } + }; + let remote_metadata = index_part.parse_metadata().context("parse_metadata")?; Some(RemoteStartupData { index_part, @@ -1087,6 +1110,14 @@ impl Tenant { None => None, }; + let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() { + let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false) + .with_context(|| anyhow::anyhow!("cannot find ancestor timeline {ancestor_timeline_id} for timeline {timeline_id}"))?; + Some(ancestor_timeline) + } else { + None + }; + self.timeline_init_and_sync( timeline_id, remote_client, @@ -1334,6 +1365,8 @@ impl Tenant { timeline_id: TimelineId, _ctx: &RequestContext, ) -> Result<(), DeleteTimelineError> { + timeline::debug_assert_current_span_has_tenant_and_timeline_id(); + // Transition the timeline into TimelineState::Stopping. // This should prevent new operations from starting. let timeline = { @@ -1374,9 +1407,44 @@ impl Tenant { timeline.walreceiver.stop().await; debug!("wal receiver shutdown confirmed"); + // Prevent new uploads from starting. + if let Some(remote_client) = timeline.remote_client.as_ref() { + let res = remote_client.stop(); + match res { + Ok(()) => {} + Err(e) => match e { + remote_timeline_client::StopError::QueueUninitialized => { + // This case shouldn't happen currently because the + // load and attach code bails out if _any_ of the timeline fails to fetch its IndexPart. + // That is, before we declare the Tenant as Active. + // But we only allow calls to delete_timeline on Active tenants. 
+ return Err(DeleteTimelineError::Other(anyhow::anyhow!("upload queue is uninitialized, likely the timeline was in Broken state prior to this call because it failed to fetch IndexPart during load or attach, check the logs"))); + } + }, + } + } + + // Stop & wait for the remaining timeline tasks, including upload tasks. + // NB: This and other delete_timeline calls do not run as a task_mgr task, + // so, they are not affected by this shutdown_tasks() call. info!("waiting for timeline tasks to shutdown"); task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(timeline_id)).await; + // Mark timeline as deleted in S3 so we won't pick it up next time + // during attach or pageserver restart. + // See comment in persist_index_part_with_deleted_flag. + if let Some(remote_client) = timeline.remote_client.as_ref() { + match remote_client.persist_index_part_with_deleted_flag().await { + // If we (now, or already) marked it successfully as deleted, we can proceed + Ok(()) | Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(_)) => (), + // Bail out otherwise + Err(e @ PersistIndexPartWithDeletedFlagError::AlreadyInProgress(_)) + | Err(e @ PersistIndexPartWithDeletedFlagError::Other(_)) => { + return Err(DeleteTimelineError::Other(anyhow::anyhow!(e))); + } + } + } + { // Grab the layer_removal_cs lock, and actually perform the deletion. // @@ -1400,19 +1468,54 @@ impl Tenant { // by the caller. let local_timeline_directory = self.conf.timeline_path(&timeline_id, &self.tenant_id); - // XXX make this atomic so that, if we crash-mid-way, the timeline won't be picked up - // with some layers missing. - std::fs::remove_dir_all(&local_timeline_directory).with_context(|| { - format!( - "Failed to remove local timeline directory '{}'", - local_timeline_directory.display() - ) - })?; + + fail::fail_point!("timeline-delete-before-rm", |_| { + Err(anyhow::anyhow!("failpoint: timeline-delete-before-rm"))? 
+ }); + + // NB: This need not be atomic because the deleted flag in the IndexPart + // will be observed during tenant/timeline load. The deletion will be resumed there. + // + // For configurations without remote storage, we tolerate that we're not crash-safe here. + // The timeline may come up Active but with missing layer files, in such setups. + // See https://github.com/neondatabase/neon/pull/3919#issuecomment-1531726720 + match std::fs::remove_dir_all(&local_timeline_directory) { + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + // This can happen if we're called a second time, e.g., + // because of a previous failure/cancellation at/after + // failpoint timeline-delete-after-rm. + // + // It can also happen if we race with tenant detach, because, + // it doesn't grab the layer_removal_cs lock. + // + // For now, log and continue. + // warn! level is technically not appropriate for the + // first case because we should expect retries to happen. + // But the error is so rare, it seems better to get attention if it happens. + let tenant_state = self.current_state(); + warn!( + timeline_dir=?local_timeline_directory, + ?tenant_state, + "timeline directory not found, proceeding anyway" + ); + // continue with the rest of the deletion + } + res => res.with_context(|| { + format!( + "Failed to remove local timeline directory '{}'", + local_timeline_directory.display() + ) + })?, + } info!("finished deleting layer files, releasing layer_removal_cs.lock()"); drop(layer_removal_guard); } + fail::fail_point!("timeline-delete-after-rm", |_| { + Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm"))? + }); + // Remove the timeline from the map. 
let mut timelines = self.timelines.lock().unwrap(); let children_exist = timelines diff --git a/pageserver/src/tenant/metadata.rs b/pageserver/src/tenant/metadata.rs index 297cccbe30..1ea61fa26b 100644 --- a/pageserver/src/tenant/metadata.rs +++ b/pageserver/src/tenant/metadata.rs @@ -12,6 +12,7 @@ use std::io::Write; use anyhow::{bail, ensure, Context}; use serde::{Deserialize, Serialize}; use tracing::info_span; +use utils::bin_ser::SerializeError; use utils::{ bin_ser::BeSer, id::{TenantId, TimelineId}, @@ -182,7 +183,7 @@ impl TimelineMetadata { } } - pub fn to_bytes(&self) -> anyhow::Result> { + pub fn to_bytes(&self) -> Result, SerializeError> { let body_bytes = self.body.ser()?; let metadata_size = METADATA_HDR_SIZE + body_bytes.len(); let hdr = TimelineMetadataHeader { diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index c42824a8b5..96aabd7945 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -204,8 +204,11 @@ mod download; pub mod index; mod upload; +use anyhow::Context; +use chrono::{NaiveDateTime, Utc}; // re-export these pub use download::{is_temp_download_file, list_remote_timelines}; +use scopeguard::ScopeGuard; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Mutex}; @@ -213,7 +216,7 @@ use std::sync::{Arc, Mutex}; use remote_storage::{DownloadError, GenericRemoteStorage}; use std::ops::DerefMut; use tokio::runtime::Runtime; -use tracing::{debug, info, warn}; +use tracing::{debug, error, info, warn}; use tracing::{info_span, Instrument}; use utils::lsn::Lsn; @@ -240,6 +243,7 @@ use utils::id::{TenantId, TimelineId}; use self::index::IndexPart; use super::storage_layer::LayerFileName; +use super::upload_queue::SetDeletedFlagProgress; // Occasional network issues and such can cause remote operations to fail, and // that's expected. If a download fails, we log it at info-level, and retry. 
@@ -253,6 +257,30 @@ const FAILED_DOWNLOAD_RETRIES: u32 = 10; // retries. Uploads and deletions are retried forever, though. const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3; +pub enum MaybeDeletedIndexPart { + IndexPart(IndexPart), + Deleted, +} + +/// Errors that can arise when calling [`RemoteTimelineClient::stop`]. +#[derive(Debug, thiserror::Error)] +pub enum StopError { + /// Returned if the upload queue was never initialized. + /// See [`RemoteTimelineClient::init_upload_queue`] and [`RemoteTimelineClient::init_upload_queue_for_empty_remote`]. + #[error("queue is not initialized")] + QueueUninitialized, +} + +#[derive(Debug, thiserror::Error)] +pub enum PersistIndexPartWithDeletedFlagError { + #[error("another task is already setting the deleted_flag, started at {0:?}")] + AlreadyInProgress(NaiveDateTime), + #[error("the deleted_flag was already set, value is {0:?}")] + AlreadyDeleted(NaiveDateTime), + #[error(transparent)] + Other(#[from] anyhow::Error), +} + /// A client for accessing a timeline's data in remote storage. /// /// This takes care of managing the number of connections, and balancing them @@ -367,7 +395,7 @@ impl RemoteTimelineClient { // /// Download index file - pub async fn download_index_file(&self) -> Result { + pub async fn download_index_file(&self) -> Result { let _unfinished_gauge_guard = self.metrics.call_begin( &RemoteOpFileKind::Index, &RemoteOpKind::Download, @@ -376,7 +404,7 @@ impl RemoteTimelineClient { }, ); - download::download_index_part( + let index_part = download::download_index_part( self.conf, &self.storage_impl, self.tenant_id, @@ -389,7 +417,13 @@ impl RemoteTimelineClient { RemoteOpKind::Download, Arc::clone(&self.metrics), ) - .await + .await?; + + if index_part.deleted_at.is_some() { + Ok(MaybeDeletedIndexPart::Deleted) + } else { + Ok(MaybeDeletedIndexPart::IndexPart(index_part)) + } } /// Download a (layer) file from `path`, into local filesystem. 
@@ -624,6 +658,116 @@ impl RemoteTimelineClient { Ok(()) } + /// Set the deleted_at field in the remote index file. + /// + /// This fails if the upload queue has not been `stop()`ed. + /// + /// The caller is responsible for calling `stop()` AND for waiting + /// for any ongoing upload tasks to finish after `stop()` has succeeded. + /// Check method [`RemoteTimelineClient::stop`] for details. + pub(crate) async fn persist_index_part_with_deleted_flag( + self: &Arc<Self>, + ) -> Result<(), PersistIndexPartWithDeletedFlagError> { + let index_part_with_deleted_at = { + let mut locked = self.upload_queue.lock().unwrap(); + + // We must be in stopped state because otherwise + // we can have an in-progress index part upload that can overwrite the file + // with the deleted_at flag missing, which we are going to set below + let stopped = match &mut *locked { + UploadQueue::Uninitialized => { + return Err(anyhow::anyhow!("is not Stopped but Uninitialized").into()) + } + UploadQueue::Initialized(_) => { + return Err(anyhow::anyhow!("is not Stopped but Initialized").into()) + } + UploadQueue::Stopped(stopped) => stopped, + }; + + match stopped.deleted_at { + SetDeletedFlagProgress::NotRunning => (), // proceed + SetDeletedFlagProgress::InProgress(at) => { + return Err(PersistIndexPartWithDeletedFlagError::AlreadyInProgress(at)); + } + SetDeletedFlagProgress::Successful(at) => { + return Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(at)); + } + }; + let deleted_at = Utc::now().naive_utc(); + stopped.deleted_at = SetDeletedFlagProgress::InProgress(deleted_at); + + let mut index_part = IndexPart::new( + stopped.latest_files.clone(), + stopped.last_uploaded_consistent_lsn, + stopped + .latest_metadata + .to_bytes() + .context("serialize metadata")?, + ); + index_part.deleted_at = Some(deleted_at); + index_part + }; + + let undo_deleted_at = scopeguard::guard(Arc::clone(self), |self_clone| { + let mut locked = self_clone.upload_queue.lock().unwrap(); + let stopped = match &mut *locked { 
+ UploadQueue::Uninitialized | UploadQueue::Initialized(_) => unreachable!( + "there's no way out of Stopping, and we checked it's Stopping above: {:?}", + locked.as_str(), + ), + UploadQueue::Stopped(stopped) => stopped, + }; + stopped.deleted_at = SetDeletedFlagProgress::NotRunning; + }); + + // Have a failpoint that can use the `pause` failpoint action. + // We don't want to block the executor thread, hence, spawn_blocking + await. + #[cfg(feature = "testing")] + tokio::task::spawn_blocking({ + let current = tracing::Span::current(); + move || { + let _entered = current.entered(); + tracing::info!( + "at failpoint persist_index_part_with_deleted_flag_after_set_before_upload_pause" + ); + fail::fail_point!( + "persist_index_part_with_deleted_flag_after_set_before_upload_pause" + ); + } + }) + .await + .expect("spawn_blocking"); + + upload::upload_index_part( + self.conf, + &self.storage_impl, + self.tenant_id, + self.timeline_id, + &index_part_with_deleted_at, + ) + .await?; + + // all good, disarm the guard and mark as success + ScopeGuard::into_inner(undo_deleted_at); + { + let mut locked = self.upload_queue.lock().unwrap(); + let stopped = match &mut *locked { + UploadQueue::Uninitialized | UploadQueue::Initialized(_) => unreachable!( + "there's no way out of Stopping, and we checked it's Stopping above: {:?}", + locked.as_str(), + ), + UploadQueue::Stopped(stopped) => stopped, + }; + stopped.deleted_at = SetDeletedFlagProgress::Successful( + index_part_with_deleted_at + .deleted_at + .expect("we set it above"), + ); + } + + Ok(()) + } + /// /// Pick next tasks from the queue, and start as many of them as possible without violating /// the ordering constraints. @@ -741,8 +885,13 @@ impl RemoteTimelineClient { // upload finishes or times out soon enough. 
if task_mgr::is_shutdown_requested() { info!("upload task cancelled by shutdown request"); + match self.stop() { + Ok(()) => {} + Err(StopError::QueueUninitialized) => { + unreachable!("we never launch an upload task if the queue is uninitialized, and once it is initialized, we never go back") + } + } self.calls_unfinished_metric_end(&task.op); - self.stop(); return; } @@ -946,32 +1095,48 @@ impl RemoteTimelineClient { self.metrics.call_end(&file_kind, &op_kind, track_bytes); } - fn stop(&self) { + /// Close the upload queue for new operations and cancel queued operations. + /// In-progress operations will still be running after this function returns. + /// Use `task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(timeline_id))` + /// to wait for them to complete, after calling this function. + pub fn stop(&self) -> Result<(), StopError> { // Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue // into stopped state, thereby dropping all off the queued *ops* which haven't become *tasks* yet. // The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business. let mut guard = self.upload_queue.lock().unwrap(); - match &*guard { - UploadQueue::Uninitialized => panic!( - "callers are responsible for ensuring this is only called on initialized queue" - ), + match &mut *guard { + UploadQueue::Uninitialized => Err(StopError::QueueUninitialized), UploadQueue::Stopped(_) => { // nothing to do info!("another concurrent task already shut down the queue"); + Ok(()) } - UploadQueue::Initialized(qi) => { + UploadQueue::Initialized(UploadQueueInitialized { + latest_files, + latest_metadata, + last_uploaded_consistent_lsn, + .. + }) => { info!("shutting down upload queue"); // Replace the queue with the Stopped state, taking ownership of the old // Initialized queue. We will do some checks on it, and then drop it. 
let qi = { - let last_uploaded_consistent_lsn = qi.last_uploaded_consistent_lsn; - let upload_queue = std::mem::replace( - &mut *guard, - UploadQueue::Stopped(UploadQueueStopped { - last_uploaded_consistent_lsn, - }), - ); + // take or clone what we need + let latest_files = std::mem::take(latest_files); + let last_uploaded_consistent_lsn = *last_uploaded_consistent_lsn; + // this could be Copy + let latest_metadata = latest_metadata.clone(); + + let stopped = UploadQueueStopped { + latest_files, + last_uploaded_consistent_lsn, + latest_metadata, + deleted_at: SetDeletedFlagProgress::NotRunning, + }; + + let upload_queue = + std::mem::replace(&mut *guard, UploadQueue::Stopped(stopped)); if let UploadQueue::Initialized(qi) = upload_queue { qi } else { @@ -979,6 +1144,8 @@ impl RemoteTimelineClient { } }; + assert!(qi.latest_files.is_empty(), "do not use this anymore"); + // consistency check assert_eq!( qi.num_inprogress_layer_uploads @@ -1002,6 +1169,7 @@ impl RemoteTimelineClient { // We're done. drop(guard); + Ok(()) } } } @@ -1240,7 +1408,11 @@ mod tests { } // Download back the index.json, and check that the list of files is correct - let index_part = runtime.block_on(client.download_index_file())?; + let index_part = match runtime.block_on(client.download_index_file())? 
{ + MaybeDeletedIndexPart::IndexPart(index_part) => index_part, + MaybeDeletedIndexPart::Deleted => panic!("unexpectedly got deleted index part"), + }; + assert_file_list( &index_part.timeline_layers, &[ diff --git a/pageserver/src/tenant/remote_timeline_client/index.rs b/pageserver/src/tenant/remote_timeline_client/index.rs index 9c84f8e977..7a06e57a6b 100644 --- a/pageserver/src/tenant/remote_timeline_client/index.rs +++ b/pageserver/src/tenant/remote_timeline_client/index.rs @@ -4,6 +4,7 @@ use std::collections::{HashMap, HashSet}; +use chrono::NaiveDateTime; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; @@ -55,6 +56,10 @@ pub struct IndexPart { #[serde(default)] version: usize, + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + pub deleted_at: Option, + /// Layer names, which are stored on the remote storage. /// /// Additional metadata can might exist in `layer_metadata`. @@ -78,7 +83,7 @@ impl IndexPart { /// used to understand later versions. /// /// Version is currently informative only. 
- const LATEST_VERSION: usize = 1; + const LATEST_VERSION: usize = 2; pub const FILE_NAME: &'static str = "index_part.json"; pub fn new( @@ -101,6 +106,7 @@ impl IndexPart { layer_metadata, disk_consistent_lsn, metadata_bytes, + deleted_at: None, } } @@ -156,6 +162,7 @@ mod tests { ]), disk_consistent_lsn: "0/16960E8".parse::().unwrap(), metadata_bytes: [113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0].to_vec(), + deleted_at: None, }; let part = serde_json::from_str::(example).unwrap(); @@ -192,6 +199,7 @@ mod tests { ]), disk_consistent_lsn: "0/16960E8".parse::().unwrap(), metadata_bytes: 
[112,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0].to_vec(), + deleted_at: None, }; let part = serde_json::from_str::(example).unwrap(); @@ -236,6 +244,7 @@ mod tests { 0, 0, ] .to_vec(), + deleted_at: None, }; let empty_layers_parsed = serde_json::from_str::(empty_layers_json).unwrap(); diff --git a/pageserver/src/tenant/remote_timeline_client/upload.rs b/pageserver/src/tenant/remote_timeline_client/upload.rs index 699121ccd9..b520bb4b0c 100644 --- a/pageserver/src/tenant/remote_timeline_client/upload.rs +++ b/pageserver/src/tenant/remote_timeline_client/upload.rs @@ -19,9 +19,12 @@ pub(super) async fn upload_index_part<'a>( timeline_id: TimelineId, index_part: &'a IndexPart, ) -> anyhow::Result<()> { + tracing::trace!("uploading new index part"); + fail_point!("before-upload-index", |_| { bail!("failpoint before-upload-index") }); + let index_part_bytes = serde_json::to_vec(&index_part) .context("Failed to serialize index part file into bytes")?; let index_part_size = 
index_part_bytes.len(); @@ -31,6 +34,7 @@ pub(super) async fn upload_index_part<'a>( .metadata_path(timeline_id, tenant_id) .with_file_name(IndexPart::FILE_NAME); let storage_path = conf.remote_path(&index_part_path)?; + storage .upload_storage_object(Box::new(index_part_bytes), index_part_size, &storage_path) .await diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index 08bc1f219d..8f5faff627 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -7,6 +7,7 @@ use crate::tenant::remote_timeline_client::index::LayerFileMetadata; use std::collections::{HashMap, VecDeque}; use std::fmt::Debug; +use chrono::NaiveDateTime; use std::sync::Arc; use tracing::info; @@ -18,14 +19,14 @@ use utils::lsn::Lsn; // that many upload queues in a running pageserver, and most of them are initialized // anyway. #[allow(clippy::large_enum_variant)] -pub(crate) enum UploadQueue { +pub(super) enum UploadQueue { Uninitialized, Initialized(UploadQueueInitialized), Stopped(UploadQueueStopped), } impl UploadQueue { - fn as_str(&self) -> &'static str { + pub fn as_str(&self) -> &'static str { match self { UploadQueue::Uninitialized => "Uninitialized", UploadQueue::Initialized(_) => "Initialized", @@ -75,8 +76,18 @@ pub(crate) struct UploadQueueInitialized { pub(crate) queued_operations: VecDeque, } -pub(crate) struct UploadQueueStopped { - pub(crate) last_uploaded_consistent_lsn: Lsn, +#[derive(Clone, Copy)] +pub(super) enum SetDeletedFlagProgress { + NotRunning, + InProgress(NaiveDateTime), + Successful(NaiveDateTime), +} + +pub(super) struct UploadQueueStopped { + pub(super) latest_files: HashMap, + pub(super) last_uploaded_consistent_lsn: Lsn, + pub(super) latest_metadata: TimelineMetadata, + pub(super) deleted_at: SetDeletedFlagProgress, } impl UploadQueue { diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index 0c4ed60c8d..388e834b56 100644 --- 
a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -314,9 +314,9 @@ class PageserverHttpClient(requests.Session): assert isinstance(res_json, dict) return res_json - def timeline_delete(self, tenant_id: TenantId, timeline_id: TimelineId): + def timeline_delete(self, tenant_id: TenantId, timeline_id: TimelineId, **kwargs): res = self.delete( - f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}" + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}", **kwargs ) self.verbose_error(res) res_json = res.json() diff --git a/test_runner/fixtures/pageserver/utils.py b/test_runner/fixtures/pageserver/utils.py index 7f8bb40bda..c558387413 100644 --- a/test_runner/fixtures/pageserver/utils.py +++ b/test_runner/fixtures/pageserver/utils.py @@ -87,7 +87,9 @@ def wait_until_tenant_state( time.sleep(period) - raise Exception(f"Tenant {tenant_id} did not become {expected_state} in {iterations} seconds") + raise Exception( + f"Tenant {tenant_id} did not become {expected_state} within {iterations * period} seconds" + ) def wait_until_tenant_active( diff --git a/test_runner/regress/test_import.py b/test_runner/regress/test_import.py index 137ce457bc..77030288f0 100644 --- a/test_runner/regress/test_import.py +++ b/test_runner/regress/test_import.py @@ -143,6 +143,8 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build assert env.pageserver.log_contains( ".*WARN.*ignored .* unexpected bytes after the tar archive.*" ) + + # NOTE: delete can easily come before upload operations are completed client.timeline_delete(tenant, timeline) # Importing correct backup works diff --git a/test_runner/regress/test_timeline_delete.py b/test_runner/regress/test_timeline_delete.py index cf607f4f7b..7135b621cb 100644 --- a/test_runner/regress/test_timeline_delete.py +++ b/test_runner/regress/test_timeline_delete.py @@ -1,8 +1,26 @@ +import os +import queue +import shutil +import 
threading +from pathlib import Path + import pytest -from fixtures.neon_fixtures import NeonEnv +import requests +from fixtures.log_helper import log +from fixtures.neon_fixtures import ( + NeonEnv, + NeonEnvBuilder, + RemoteStorageKind, + available_remote_storages, +) from fixtures.pageserver.http import PageserverApiException -from fixtures.types import TenantId, TimelineId -from fixtures.utils import wait_until +from fixtures.pageserver.utils import ( + wait_for_last_record_lsn, + wait_for_upload, + wait_until_tenant_active, +) +from fixtures.types import Lsn, TenantId, TimelineId +from fixtures.utils import query_scalar, wait_until def test_timeline_delete(neon_simple_env: NeonEnv): @@ -39,23 +57,17 @@ def test_timeline_delete(neon_simple_env: NeonEnv): "test_ancestor_branch_delete_branch1", "test_ancestor_branch_delete_parent" ) - ps_http = env.pageserver.http_client() + timeline_path = ( + env.repo_dir / "tenants" / str(env.initial_tenant) / "timelines" / str(parent_timeline_id) + ) + with pytest.raises( PageserverApiException, match="Cannot delete timeline which has child timelines" ) as exc: - timeline_path = ( - env.repo_dir - / "tenants" - / str(env.initial_tenant) - / "timelines" - / str(parent_timeline_id) - ) assert timeline_path.exists() ps_http.timeline_delete(env.initial_tenant, parent_timeline_id) - assert not timeline_path.exists() - assert exc.value.status_code == 400 timeline_path = ( @@ -87,3 +99,350 @@ def test_timeline_delete(neon_simple_env: NeonEnv): ) assert exc.value.status_code == 404 + + # Check that we didn't pick up the timeline again after restart. 
+ # See https://github.com/neondatabase/neon/issues/3560 + env.pageserver.stop(immediate=True) + env.pageserver.start() + + with pytest.raises( + PageserverApiException, + match=f"Timeline {env.initial_tenant}/{leaf_timeline_id} was not found", + ) as exc: + ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id) + + +# cover the two cases: remote storage configured vs not configured +@pytest.mark.parametrize("remote_storage_kind", [None, RemoteStorageKind.LOCAL_FS]) +def test_delete_timeline_post_rm_failure( + neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind +): + """ + If there is a failure after removing the timeline directory, the delete operation + should be retryable. + """ + + if remote_storage_kind is not None: + neon_env_builder.enable_remote_storage( + remote_storage_kind, "test_delete_timeline_post_rm_failure" + ) + + env = neon_env_builder.init_start() + assert env.initial_timeline + + ps_http = env.pageserver.http_client() + + failpoint_name = "timeline-delete-after-rm" + ps_http.configure_failpoints((failpoint_name, "return")) + + with pytest.raises(PageserverApiException, match=f"failpoint: {failpoint_name}"): + ps_http.timeline_delete(env.initial_tenant, env.initial_timeline) + + at_failpoint_log_message = f".*{env.initial_timeline}.*at failpoint {failpoint_name}.*" + env.pageserver.allowed_errors.append(at_failpoint_log_message) + env.pageserver.allowed_errors.append( + f".*DELETE.*{env.initial_timeline}.*InternalServerError.*{failpoint_name}" + ) + + # retry without failpoint, it should succeed + ps_http.configure_failpoints((failpoint_name, "off")) + + # this should succeed + ps_http.timeline_delete(env.initial_tenant, env.initial_timeline, timeout=2) + # the second call will try to transition the timeline into Stopping state, but it's already in that state + env.pageserver.allowed_errors.append( + f".*{env.initial_timeline}.*Ignoring new state, equal to the existing one: Stopping" + ) + 
env.pageserver.allowed_errors.append( + f".*{env.initial_timeline}.*timeline directory not found, proceeding anyway.*" + ) + + +@pytest.mark.parametrize("remote_storage_kind", available_remote_storages()) +@pytest.mark.parametrize("fill_branch", [True, False]) +def test_timeline_resurrection_on_attach( + neon_env_builder: NeonEnvBuilder, + remote_storage_kind: RemoteStorageKind, + fill_branch: bool, +): + """ + After deleting a timeline it should never appear again. + This test ensures that this invariant holds for detach+attach. + Original issue: https://github.com/neondatabase/neon/issues/3560 + """ + + neon_env_builder.enable_remote_storage( + remote_storage_kind=remote_storage_kind, + test_name="test_timeline_resurrection_on_attach", + ) + + ##### First start, insert data and upload it to the remote storage + env = neon_env_builder.init_start() + + ps_http = env.pageserver.http_client() + pg = env.endpoints.create_start("main") + + tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0]) + main_timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0]) + + with pg.cursor() as cur: + cur.execute("CREATE TABLE f (i integer);") + cur.execute("INSERT INTO f VALUES (generate_series(1,1000));") + current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()")) + + # wait until pageserver receives that data + wait_for_last_record_lsn(ps_http, tenant_id, main_timeline_id, current_lsn) + + # run checkpoint manually to be sure that data landed in remote storage + ps_http.timeline_checkpoint(tenant_id, main_timeline_id) + + # wait until pageserver successfully uploaded a checkpoint to remote storage + log.info("waiting for checkpoint upload") + wait_for_upload(ps_http, tenant_id, main_timeline_id, current_lsn) + log.info("upload of checkpoint is done") + + branch_timeline_id = env.neon_cli.create_branch("new", "main") + + # Two variants of this test: + # - In fill_branch=True, the deleted branch has layer files. 
+ # - In fill_branch=False, it doesn't, it just has the metadata file. + # A broken implementation is conceivable that tries to "optimize" handling of empty branches, e.g., + # by skipping IndexPart uploads if the layer file set doesn't change. That would be wrong, catch those. + if fill_branch: + with env.endpoints.create_start("new") as new_pg: + with new_pg.cursor() as cur: + cur.execute("INSERT INTO f VALUES (generate_series(1,1000));") + current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()")) + + # wait until pageserver receives that data + wait_for_last_record_lsn(ps_http, tenant_id, branch_timeline_id, current_lsn) + + # run checkpoint manually to be sure that data landed in remote storage + ps_http.timeline_checkpoint(tenant_id, branch_timeline_id) + + # wait until pageserver successfully uploaded a checkpoint to remote storage + log.info("waiting for checkpoint upload") + wait_for_upload(ps_http, tenant_id, branch_timeline_id, current_lsn) + log.info("upload of checkpoint is done") + else: + pass + + # delete new timeline + ps_http.timeline_delete(tenant_id=tenant_id, timeline_id=branch_timeline_id) + + ##### Stop the pageserver instance, erase all its data + env.endpoints.stop_all() + env.pageserver.stop() + + dir_to_clear = Path(env.repo_dir) / "tenants" + shutil.rmtree(dir_to_clear) + os.mkdir(dir_to_clear) + + ##### Second start, restore the data and ensure that we see only the timeline that wasn't deleted + env.pageserver.start() + + ps_http.tenant_attach(tenant_id=tenant_id) + + wait_until_tenant_active(ps_http, tenant_id=tenant_id, iterations=10, period=0.5) + + timelines = ps_http.timeline_list(tenant_id=tenant_id) + assert {TimelineId(tl["timeline_id"]) for tl in timelines} == { + main_timeline_id + }, "the deleted timeline should not have been resurrected" + assert all([tl["state"] == "Active" for tl in timelines]) + + +def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuilder): + """ + When deleting a
timeline, if we succeed in setting the deleted flag remotely + but fail to delete the local state, restarting the pageserver should resume + the deletion of the local state. + (Deletion of the state in S3 is not implemented yet.) + """ + + neon_env_builder.enable_remote_storage( + remote_storage_kind=RemoteStorageKind.MOCK_S3, + test_name="test_timeline_delete_fail_before_local_delete", + ) + + env = neon_env_builder.init_start() + + env.pageserver.allowed_errors.append(".*failpoint: timeline-delete-before-rm") + env.pageserver.allowed_errors.append( + ".*Ignoring new state, equal to the existing one: Stopping" + ) + env.pageserver.allowed_errors.append( + ".*during shutdown: cannot flush frozen layers when flush_loop is not running, state is Exited" + ) + + ps_http = env.pageserver.http_client() + ps_http.configure_failpoints(("timeline-delete-before-rm", "return")) + + # construct pair of branches + intermediate_timeline_id = env.neon_cli.create_branch( + "test_timeline_delete_fail_before_local_delete" + ) + + leaf_timeline_id = env.neon_cli.create_branch( + "test_timeline_delete_fail_before_local_delete1", + "test_timeline_delete_fail_before_local_delete", + ) + + leaf_timeline_path = ( + env.repo_dir / "tenants" / str(env.initial_tenant) / "timelines" / str(leaf_timeline_id) + ) + + with pytest.raises( + PageserverApiException, + match="failpoint: timeline-delete-before-rm", + ): + ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id) + + assert leaf_timeline_path.exists(), "the failpoint didn't work" + + env.pageserver.stop() + env.pageserver.start() + + # Wait for tenant to finish loading. 
+ wait_until_tenant_active(ps_http, tenant_id=env.initial_tenant, iterations=10, period=0.5) + + assert ( + not leaf_timeline_path.exists() + ), "timeline load procedure should have resumed the deletion interrupted by the failpoint" + timelines = ps_http.timeline_list(env.initial_tenant) + assert {TimelineId(tl["timeline_id"]) for tl in timelines} == { + intermediate_timeline_id, + env.initial_timeline, + }, "other timelines should not have been affected" + assert all([tl["state"] == "Active" for tl in timelines]) + + +def test_concurrent_timeline_delete_if_first_stuck_at_index_upload( + neon_env_builder: NeonEnvBuilder, +): + """ + If we're stuck uploading the index file with the deleted_at flag, + eventually console will hang up and retry. + If we're still stuck at the retry time, ensure that the retry + fails with status 500, signalling to console that it should retry + later. + Ideally, timeline_delete should return 202 Accepted and require + console to poll for completion, but, that would require changing + the API contract.
+ """ + + neon_env_builder.enable_remote_storage( + remote_storage_kind=RemoteStorageKind.MOCK_S3, + test_name="test_concurrent_timeline_delete_if_first_stuck_at_index_upload", + ) + + env = neon_env_builder.init_start() + + child_timeline_id = env.neon_cli.create_branch("child", "main") + + ps_http = env.pageserver.http_client() + + # make the first call sleep practically forever + failpoint_name = "persist_index_part_with_deleted_flag_after_set_before_upload_pause" + ps_http.configure_failpoints((failpoint_name, "pause")) + + def first_call(result_queue): + try: + log.info("first call start") + ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=10) + log.info("first call success") + result_queue.put("success") + except Exception: + log.exception("first call failed") + result_queue.put("failure, see log for stack trace") + + first_call_result: queue.Queue[str] = queue.Queue() + first_call_thread = threading.Thread(target=first_call, args=(first_call_result,)) + first_call_thread.start() + + try: + + def first_call_hit_failpoint(): + assert env.pageserver.log_contains( + f".*{child_timeline_id}.*at failpoint {failpoint_name}" + ) + + wait_until(50, 0.1, first_call_hit_failpoint) + + # make the second call and assert behavior + log.info("second call start") + error_msg_re = "another task is already setting the deleted_flag, started at" + with pytest.raises(PageserverApiException, match=error_msg_re) as second_call_err: + ps_http.timeline_delete(env.initial_tenant, child_timeline_id) + assert second_call_err.value.status_code == 500 + env.pageserver.allowed_errors.append(f".*{child_timeline_id}.*{error_msg_re}.*") + # the second call will try to transition the timeline into Stopping state as well + env.pageserver.allowed_errors.append( + f".*{child_timeline_id}.*Ignoring new state, equal to the existing one: Stopping" + ) + log.info("second call failed as expected") + + # by now we know that the second call failed, let's ensure the first call will 
finish + ps_http.configure_failpoints((failpoint_name, "off")) + + result = first_call_result.get() + assert result == "success" + + finally: + log.info("joining first call thread") + # in any case, make sure the lifetime of the thread is bounded to this test + first_call_thread.join() + + +def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder): + """ + If the client hangs up before we start the index part upload but after we mark it + deleted in local memory, a subsequent delete_timeline call should be able to do + another delete timeline operation. + + This tests cancel safety up to the given failpoint. + """ + neon_env_builder.enable_remote_storage( + remote_storage_kind=RemoteStorageKind.MOCK_S3, + test_name="test_delete_timeline_client_hangup", + ) + + env = neon_env_builder.init_start() + + child_timeline_id = env.neon_cli.create_branch("child", "main") + + ps_http = env.pageserver.http_client() + + failpoint_name = "persist_index_part_with_deleted_flag_after_set_before_upload_pause" + ps_http.configure_failpoints((failpoint_name, "pause")) + + with pytest.raises(requests.exceptions.Timeout): + ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2) + + # make sure the timeout was due to the failpoint + at_failpoint_log_message = f".*{child_timeline_id}.*at failpoint {failpoint_name}.*" + + def hit_failpoint(): + assert env.pageserver.log_contains(at_failpoint_log_message) + + wait_until(50, 0.1, hit_failpoint) + + # we log this error if a client hangs up + # might as well use it as another indicator that the test works + hangup_log_message = f".*DELETE.*{child_timeline_id}.*request was dropped before completing" + env.pageserver.allowed_errors.append(hangup_log_message) + + def got_hangup_log_message(): + assert env.pageserver.log_contains(hangup_log_message) + + wait_until(50, 0.1, got_hangup_log_message) + + # ok, retry without failpoint, it should succeed + ps_http.configure_failpoints((failpoint_name, "off")) + + # 
this should succeed + ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2) + # the second call will try to transition the timeline into Stopping state, but it's already in that state + env.pageserver.allowed_errors.append( + f".*{child_timeline_id}.*Ignoring new state, equal to the existing one: Stopping" + )