diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index b732095f8f..897e1a7aad 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -264,7 +264,7 @@ jobs: export REMOTE_STORAGE_S3_BUCKET=neon-github-public-dev export REMOTE_STORAGE_S3_REGION=eu-central-1 # Avoid `$CARGO_FEATURES` since there's no `testing` feature in the e2e tests now - ${cov_prefix} cargo test $CARGO_FLAGS --package remote_storage --test pagination_tests -- s3_pagination_should_work --exact + ${cov_prefix} cargo test $CARGO_FLAGS --package remote_storage --test test_real_s3 - name: Install rust binaries run: | diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index ddce82324c..df5f5896a1 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -152,7 +152,7 @@ pub enum ActivatingFrom { } /// A state of a timeline in pageserver's memory. -#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub enum TimelineState { /// The timeline is recognized by the pageserver but is not yet operational. /// In particular, the walreceiver connection loop is not running for this timeline. @@ -165,7 +165,7 @@ pub enum TimelineState { /// It cannot transition back into any other state. Stopping, /// The timeline is broken and not operational (previous states: Loading or Active). - Broken, + Broken { reason: String, backtrace: String }, } #[serde_as] diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index c081a6d361..c73e647845 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -17,7 +17,7 @@ use tokio::{ io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt}, }; use tracing::*; -use utils::crashsafe::path_with_suffix_extension; +use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty}; use crate::{Download, DownloadError, RemotePath}; @@ -101,19 +101,35 @@ impl RemoteStorage for LocalFs { Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)), None => Cow::Borrowed(&self.storage_root), }; - Ok(get_all_files(path.as_ref(), false) + + let prefixes_to_filter = get_all_files(path.as_ref(), false) .await - .map_err(DownloadError::Other)? - .into_iter() - .map(|path| { - path.strip_prefix(&self.storage_root) - .context("Failed to strip preifix") + .map_err(DownloadError::Other)?; + + let mut prefixes = Vec::with_capacity(prefixes_to_filter.len()); + + // filter out empty directories to mirror s3 behavior. + for prefix in prefixes_to_filter { + if prefix.is_dir() + && is_directory_empty(&prefix) + .await + .map_err(DownloadError::Other)? + { + continue; + } + + prefixes.push( + prefix + .strip_prefix(&self.storage_root) + .context("Failed to strip prefix") .and_then(RemotePath::new) .expect( "We list files for storage root, hence should be able to remote the prefix", - ) - }) - .collect()) + ), + ) + } + + Ok(prefixes) } async fn upload( @@ -291,11 +307,18 @@ impl RemoteStorage for LocalFs { async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> { let file_path = path.with_base(&self.storage_root); - if file_path.exists() && file_path.is_file() { - Ok(fs::remove_file(file_path).await?) 
- } else { - bail!("File {file_path:?} either does not exist or is not a file") + if !file_path.exists() { + // See https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html + // > If there isn't a null version, Amazon S3 does not remove any objects but will still respond that the command was successful. + return Ok(()); } + + if !file_path.is_file() { + anyhow::bail!("{file_path:?} is not a file"); + } + Ok(fs::remove_file(file_path) + .await + .map_err(|e| anyhow::anyhow!(e))?) } } @@ -320,7 +343,7 @@ where let file_type = dir_entry.file_type().await?; let entry_path = dir_entry.path(); if file_type.is_symlink() { - debug!("{entry_path:?} us a symlink, skipping") + debug!("{entry_path:?} is a symlink, skipping") } else if file_type.is_dir() { if recursive { paths.extend(get_all_files(&entry_path, true).await?.into_iter()) @@ -595,15 +618,11 @@ mod fs_tests { storage.delete(&upload_target).await?; assert!(storage.list().await?.is_empty()); - match storage.delete(&upload_target).await { - Ok(()) => panic!("Should not allow deleting non-existing storage files"), - Err(e) => { - let error_string = e.to_string(); - assert!(error_string.contains("does not exist")); - let expected_path = upload_target.with_base(&storage.storage_root); - assert!(error_string.contains(expected_path.to_str().unwrap())); - } - } + storage + .delete(&upload_target) + .await + .expect("Should allow deleting non-existing storage files"); + Ok(()) } diff --git a/libs/remote_storage/tests/pagination_tests.rs b/libs/remote_storage/tests/test_real_s3.rs similarity index 79% rename from libs/remote_storage/tests/pagination_tests.rs rename to libs/remote_storage/tests/test_real_s3.rs index 86a6888f98..48ed8f686c 100644 --- a/libs/remote_storage/tests/pagination_tests.rs +++ b/libs/remote_storage/tests/test_real_s3.rs @@ -7,6 +7,7 @@ use std::sync::Arc; use std::time::UNIX_EPOCH; use anyhow::Context; +use once_cell::sync::OnceCell; use remote_storage::{ GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, S3Config, }; @@ -14,8 +15,12 @@ use test_context::{test_context, AsyncTestContext}; use tokio::task::JoinSet; use tracing::{debug, error, info}; +static LOGGING_DONE: OnceCell<()> = OnceCell::new(); + const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE"; +const BASE_PREFIX: &str = "test/"; + /// Tests that S3 client can list all prefixes, even if the response come paginated and requires multiple S3 queries. /// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 cred env vars specified. /// See the client creation in [`create_s3_client`] for details on the required env vars. @@ -38,20 +43,20 @@ const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_ /// /// Lastly, the test attempts to clean up and remove all uploaded S3 files. /// If any errors appear during the clean up, they get logged, but the test is not failed or stopped until clean up is finished. 
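The doc comment above describes forcing paginated `ListObjectsV2` responses by uploading more blobs than fit into a single page. A rough, std-only sketch of that paging contract, where the `list_all` name and the numeric continuation marker are illustrative stand-ins rather than the `remote_storage` API:

```rust
// Toy model of the paging behavior the test exercises: the server returns at most
// `max_keys` entries per response plus a continuation marker, and the caller loops
// until the marker is exhausted.
fn list_all(all_keys: &[String], max_keys: usize) -> Vec<String> {
    let mut collected = Vec::new();
    let mut continuation = Some(0usize);
    while let Some(start) = continuation {
        let page: Vec<String> = all_keys.iter().skip(start).take(max_keys).cloned().collect();
        let next = start + page.len();
        continuation = (next < all_keys.len()).then_some(next);
        collected.extend(page);
    }
    collected
}

fn main() {
    // 1 + 2 * max_keys uploads, as in the test setup, guarantees at least three pages.
    let keys: Vec<String> = (0..21).map(|i| format!("test/prefix-{i}")).collect();
    assert_eq!(list_all(&keys, 10), keys);
}
```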
-#[test_context(MaybeEnabledS3)] +#[test_context(MaybeEnabledS3WithTestBlobs)] #[tokio::test] -async fn s3_pagination_should_work(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> { +async fn s3_pagination_should_work(ctx: &mut MaybeEnabledS3WithTestBlobs) -> anyhow::Result<()> { let ctx = match ctx { - MaybeEnabledS3::Enabled(ctx) => ctx, - MaybeEnabledS3::Disabled => return Ok(()), - MaybeEnabledS3::UploadsFailed(e, _) => anyhow::bail!("S3 init failed: {e:?}"), + MaybeEnabledS3WithTestBlobs::Enabled(ctx) => ctx, + MaybeEnabledS3WithTestBlobs::Disabled => return Ok(()), + MaybeEnabledS3WithTestBlobs::UploadsFailed(e, _) => anyhow::bail!("S3 init failed: {e:?}"), }; - let test_client = Arc::clone(&ctx.client_with_excessive_pagination); + let test_client = Arc::clone(&ctx.enabled.client); let expected_remote_prefixes = ctx.remote_prefixes.clone(); - let base_prefix = - RemotePath::new(Path::new(ctx.base_prefix_str)).context("common_prefix construction")?; + let base_prefix = RemotePath::new(Path::new(ctx.enabled.base_prefix)) + .context("common_prefix construction")?; let root_remote_prefixes = test_client .list_prefixes(None) .await @@ -83,27 +88,91 @@ async fn s3_pagination_should_work(ctx: &mut MaybeEnabledS3) -> anyhow::Result<( Ok(()) } +#[test_context(MaybeEnabledS3)] +#[tokio::test] +async fn s3_delete_non_exising_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> { + let ctx = match ctx { + MaybeEnabledS3::Enabled(ctx) => ctx, + MaybeEnabledS3::Disabled => return Ok(()), + }; + + let path = RemotePath::new(&PathBuf::from(format!( + "{}/for_sure_there_is_nothing_there_really", + ctx.base_prefix, + ))) + .with_context(|| "RemotePath conversion")?; + + ctx.client.delete(&path).await.expect("should succeed"); + + Ok(()) +} + +fn ensure_logging_ready() { + LOGGING_DONE.get_or_init(|| { + utils::logging::init( + utils::logging::LogFormat::Test, + utils::logging::TracingErrorLayerEnablement::Disabled, + ) + .expect("logging init failed"); + }); +} + +struct EnabledS3 { + client: Arc, + base_prefix: &'static str, +} + +impl EnabledS3 { + async fn setup(max_keys_in_list_response: Option) -> Self { + let client = create_s3_client(max_keys_in_list_response) + .context("S3 client creation") + .expect("S3 client creation failed"); + + EnabledS3 { + client, + base_prefix: BASE_PREFIX, + } + } +} + enum MaybeEnabledS3 { + Enabled(EnabledS3), + Disabled, +} + +#[async_trait::async_trait] +impl AsyncTestContext for MaybeEnabledS3 { + async fn setup() -> Self { + ensure_logging_ready(); + + if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() { + info!( + "`{}` env variable is not set, skipping the test", + ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME + ); + return Self::Disabled; + } + + Self::Enabled(EnabledS3::setup(None).await) + } +} + +enum MaybeEnabledS3WithTestBlobs { Enabled(S3WithTestBlobs), Disabled, UploadsFailed(anyhow::Error, S3WithTestBlobs), } struct S3WithTestBlobs { - client_with_excessive_pagination: Arc, - base_prefix_str: &'static str, + enabled: EnabledS3, remote_prefixes: HashSet, remote_blobs: HashSet, } #[async_trait::async_trait] -impl AsyncTestContext for MaybeEnabledS3 { +impl AsyncTestContext for MaybeEnabledS3WithTestBlobs { async fn setup() -> Self { - utils::logging::init( - utils::logging::LogFormat::Test, - utils::logging::TracingErrorLayerEnablement::Disabled, - ) - .expect("logging init failed"); + ensure_logging_ready(); if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() { info!( "`{}` env variable is not set, skipping the test", @@ 
-115,23 +184,14 @@ impl AsyncTestContext for MaybeEnabledS3 { let max_keys_in_list_response = 10; let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap()); - let client_with_excessive_pagination = create_s3_client(max_keys_in_list_response) - .context("S3 client creation") - .expect("S3 client creation failed"); + let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await; - let base_prefix_str = "test/"; - match upload_s3_data( - &client_with_excessive_pagination, - base_prefix_str, - upload_tasks_count, - ) - .await - { + match upload_s3_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await { ControlFlow::Continue(uploads) => { info!("Remote objects created successfully"); + Self::Enabled(S3WithTestBlobs { - client_with_excessive_pagination, - base_prefix_str, + enabled, remote_prefixes: uploads.prefixes, remote_blobs: uploads.blobs, }) @@ -139,8 +199,7 @@ impl AsyncTestContext for MaybeEnabledS3 { ControlFlow::Break(uploads) => Self::UploadsFailed( anyhow::anyhow!("One or multiple blobs failed to upload to S3"), S3WithTestBlobs { - client_with_excessive_pagination, - base_prefix_str, + enabled, remote_prefixes: uploads.prefixes, remote_blobs: uploads.blobs, }, @@ -152,13 +211,15 @@ impl AsyncTestContext for MaybeEnabledS3 { match self { Self::Disabled => {} Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => { - cleanup(&ctx.client_with_excessive_pagination, ctx.remote_blobs).await; + cleanup(&ctx.enabled.client, ctx.remote_blobs).await; } } } } -fn create_s3_client(max_keys_per_list_response: i32) -> anyhow::Result> { +fn create_s3_client( + max_keys_per_list_response: Option, +) -> anyhow::Result> { let remote_storage_s3_bucket = env::var("REMOTE_STORAGE_S3_BUCKET") .context("`REMOTE_STORAGE_S3_BUCKET` env var is not set, but real S3 tests are enabled")?; let remote_storage_s3_region = env::var("REMOTE_STORAGE_S3_REGION") @@ -176,7 +237,7 @@ fn create_s3_client(max_keys_per_list_response: i32) -> anyhow::Result io::Result; @@ -15,10 +17,19 @@ where } } +pub async fn is_directory_empty(path: impl AsRef) -> anyhow::Result { + let mut dir = tokio::fs::read_dir(&path) + .await + .context(format!("read_dir({})", path.as_ref().display()))?; + Ok(dir.next_entry().await?.is_none()) +} + #[cfg(test)] mod test { use std::path::PathBuf; + use crate::fs_ext::is_directory_empty; + #[test] fn is_empty_dir() { use super::PathExt; @@ -42,4 +53,26 @@ mod test { std::fs::remove_file(&file_path).unwrap(); assert!(file_path.is_empty_dir().is_err()); } + + #[tokio::test] + async fn is_empty_dir_async() { + let dir = tempfile::tempdir().unwrap(); + let dir_path = dir.path(); + + // test positive case + assert!( + is_directory_empty(dir_path).await.expect("test failure"), + "new tempdir should be empty" + ); + + // invoke on a file to ensure it returns an error + let file_path: PathBuf = dir_path.join("testfile"); + let f = std::fs::File::create(&file_path).unwrap(); + drop(f); + assert!(is_directory_empty(&file_path).await.is_err()); + + // do it again on a path, we know to be nonexistent + std::fs::remove_file(&file_path).unwrap(); + assert!(is_directory_empty(file_path).await.is_err()); + } } diff --git a/libs/utils/src/http/error.rs b/libs/utils/src/http/error.rs index 4eff16b6a3..f9c06453df 100644 --- a/libs/utils/src/http/error.rs +++ b/libs/utils/src/http/error.rs @@ -21,7 +21,7 @@ pub enum ApiError { Conflict(String), #[error("Precondition failed: {0}")] - PreconditionFailed(&'static str), + PreconditionFailed(Box), #[error(transparent)] 
InternalServerError(anyhow::Error), diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index 40e4f035ae..50614653be 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -215,7 +215,7 @@ paths: schema: $ref: "#/components/schemas/NotFoundError" "412": - description: Tenant is missing + description: Tenant is missing, or timeline has children content: application/json: schema: diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 22dedbe5b2..ac230e5f4a 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -183,9 +183,10 @@ impl From for ApiError { use crate::tenant::DeleteTimelineError::*; match value { NotFound => ApiError::NotFound(anyhow::anyhow!("timeline not found")), - HasChildren => ApiError::BadRequest(anyhow::anyhow!( - "Cannot delete timeline which has child timelines" - )), + HasChildren(children) => ApiError::PreconditionFailed( + format!("Cannot delete timeline which has child timelines: {children:?}") + .into_boxed_str(), + ), Other(e) => ApiError::InternalServerError(e), } } @@ -197,9 +198,9 @@ impl From for ApiError { match value { // Report Precondition failed so client can distinguish between // "tenant is missing" case from "timeline is missing" - Tenant(GetTenantError::NotFound(..)) => { - ApiError::PreconditionFailed("Requested tenant is missing") - } + Tenant(GetTenantError::NotFound(..)) => ApiError::PreconditionFailed( + "Requested tenant is missing".to_owned().into_boxed_str(), + ), Tenant(t) => ApiError::from(t), Timeline(t) => ApiError::from(t), } @@ -494,7 +495,8 @@ async fn timeline_delete_handler( .instrument(info_span!("timeline_delete", tenant = %tenant_id, timeline = %timeline_id)) .await?; - json_response(StatusCode::OK, ()) + // FIXME: needs to be an error for console to retry it. Ideally Accepted should be used and retried until 404. 
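The FIXME above notes that 202 Accepted only helps if the caller keeps polling until the timeline is gone. A sketch of that client-side loop, with a hypothetical `wait_for_deletion` helper standing in for repeated GETs of the timeline detail endpoint; only the status-code semantics are carried over from the handler:

```rust
use std::{thread::sleep, time::Duration};

// Poll until the server answers 404 (deletion finished); 200/202 mean "keep waiting".
fn wait_for_deletion(mut get_status: impl FnMut() -> u16, attempts: u32) -> Result<(), String> {
    for _ in 0..attempts {
        match get_status() {
            404 => return Ok(()),                           // gone: deletion complete
            200 | 202 => sleep(Duration::from_millis(500)), // still present / still deleting
            other => return Err(format!("unexpected status {other}")),
        }
    }
    Err("timed out waiting for timeline deletion".into())
}

fn main() {
    // Simulated server: still present for two polls, then gone.
    let mut calls = 0;
    let probe = move || -> u16 {
        calls += 1;
        if calls < 3 { 200 } else { 404 }
    };
    assert!(wait_for_deletion(probe, 10).is_ok());
}
```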
+ json_response(StatusCode::ACCEPTED, ()) } async fn tenant_detach_handler( diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index 4df0e4e6f2..d8db12a113 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -257,6 +257,9 @@ pub enum TaskKind { // task that handles attaching a tenant Attach, + // Used mostly for background deletion from s3 + TimelineDeletionWorker, + // task that handhes metrics collection MetricsCollection, diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 29086cae86..d23f1cb96f 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -18,6 +18,7 @@ use remote_storage::DownloadError; use remote_storage::GenericRemoteStorage; use storage_broker::BrokerClientChannel; use tokio::sync::watch; +use tokio::sync::OwnedMutexGuard; use tokio::task::JoinSet; use tracing::*; use utils::completion; @@ -444,7 +445,7 @@ pub enum DeleteTimelineError { #[error("NotFound")] NotFound, #[error("HasChildren")] - HasChildren, + HasChildren(Vec), #[error(transparent)] Other(#[from] anyhow::Error), } @@ -568,7 +569,7 @@ impl Tenant { .with_context(|| { format!("creating broken timeline data for {tenant_id}/{timeline_id}") })?; - broken_timeline.set_state(TimelineState::Broken); + broken_timeline.set_broken(e.to_string()); timelines_accessor.insert(timeline_id, broken_timeline); return Err(e); } @@ -763,7 +764,7 @@ impl Tenant { ); remote_index_and_client.insert(timeline_id, (index_part, client)); } - MaybeDeletedIndexPart::Deleted => { + MaybeDeletedIndexPart::Deleted(_) => { info!("timeline {} is deleted, skipping", timeline_id); continue; } @@ -1113,9 +1114,9 @@ impl Tenant { /// Subroutine of `load_tenant`, to load an individual timeline /// /// NB: The parent is assumed to be already loaded! - #[instrument(skip_all, fields(timeline_id))] + #[instrument(skip(self, local_metadata, init_order, ctx))] async fn load_local_timeline( - &self, + self: &Arc, timeline_id: TimelineId, local_metadata: TimelineMetadata, init_order: Option<&InitializationOrder>, @@ -1132,12 +1133,20 @@ impl Tenant { ) }); - let remote_startup_data = match &remote_client { + let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() { + let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false) + .with_context(|| anyhow::anyhow!("cannot find ancestor timeline {ancestor_timeline_id} for timeline {timeline_id}"))?; + Some(ancestor_timeline) + } else { + None + }; + + let (remote_startup_data, remote_client) = match remote_client { Some(remote_client) => match remote_client.download_index_file().await { Ok(index_part) => { let index_part = match index_part { MaybeDeletedIndexPart::IndexPart(index_part) => index_part, - MaybeDeletedIndexPart::Deleted => { + MaybeDeletedIndexPart::Deleted(index_part) => { // TODO: we won't reach here if remote storage gets de-configured after start of the deletion operation. // Example: // start deletion operation @@ -1148,37 +1157,59 @@ impl Tenant { // // We don't really anticipate remote storage to be de-configured, so, for now, this is fine. // Also, maybe we'll remove that option entirely in the future, see https://github.com/neondatabase/neon/issues/4099. 
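This is also why `MaybeDeletedIndexPart::Deleted` now carries the `IndexPart` instead of being a unit variant: the resumed deletion still needs the layer list and the recorded `deleted_at`. A simplified serde model of that tombstone pattern, where the struct and field types are illustrative (the real index uses `chrono::NaiveDateTime`, not an epoch integer):

```rust
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
struct IndexPartSketch {
    layers: Vec<String>,
    deleted_at: Option<u64>, // epoch seconds; None = live timeline
}

enum MaybeDeleted {
    Live(IndexPartSketch),
    // Carrying the whole index part is what lets load resume the deletion:
    // the layer list is still needed to finish removing remote files.
    Deleted(IndexPartSketch),
}

fn classify(raw: &str) -> anyhow::Result<MaybeDeleted> {
    let part: IndexPartSketch = serde_json::from_str(raw)?;
    Ok(if part.deleted_at.is_some() {
        MaybeDeleted::Deleted(part)
    } else {
        MaybeDeleted::Live(part)
    })
}

fn main() -> anyhow::Result<()> {
    let raw = r#"{"layers":["000000-0000A0"],"deleted_at":1683000000}"#;
    match classify(raw)? {
        MaybeDeleted::Deleted(p) => println!("resume deletion of {} layers", p.layers.len()),
        MaybeDeleted::Live(_) => println!("load normally"),
    }
    Ok(())
}
```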
- info!("is_deleted is set on remote, resuming removal of local data originally done by timeline deletion handler"); - std::fs::remove_dir_all( - self.conf.timeline_path(&timeline_id, &self.tenant_id), - ) - .context("remove_dir_all")?; + info!("is_deleted is set on remote, resuming removal of timeline data originally done by timeline deletion handler"); + + remote_client + .init_upload_queue_stopped_to_continue_deletion(&index_part)?; + + let timeline = self + .create_timeline_data( + timeline_id, + &local_metadata, + ancestor, + Some(remote_client), + init_order, + ) + .context("create_timeline_data")?; + + let guard = Arc::clone(&timeline.delete_lock).lock_owned().await; + + // Note: here we even skip populating layer map. Timeline is essentially uninitialized. + // RemoteTimelineClient is the only functioning part. + timeline.set_state(TimelineState::Stopping); + // We meed to do this because when console retries delete request we shouldnt answer with 404 + // because 404 means successful deletion. + // FIXME consider TimelineState::Deleting. + let mut locked = self.timelines.lock().unwrap(); + locked.insert(timeline_id, Arc::clone(&timeline)); + + Tenant::schedule_delete_timeline( + Arc::clone(self), + timeline_id, + timeline, + guard, + ); return Ok(()); } }; let remote_metadata = index_part.parse_metadata().context("parse_metadata")?; - Some(RemoteStartupData { - index_part, - remote_metadata, - }) + ( + Some(RemoteStartupData { + index_part, + remote_metadata, + }), + Some(remote_client), + ) } Err(DownloadError::NotFound) => { info!("no index file was found on the remote"); - None + (None, Some(remote_client)) } Err(e) => return Err(anyhow::anyhow!(e)), }, - None => None, - }; - - let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() { - let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false) - .with_context(|| anyhow::anyhow!("cannot find ancestor timeline {ancestor_timeline_id} for timeline {timeline_id}"))?; - Some(ancestor_timeline) - } else { - None + None => (None, remote_client), }; self.timeline_init_and_sync( @@ -1511,13 +1542,118 @@ impl Tenant { } /// Shuts down a timeline's tasks, removes its in-memory structures, and deletes its - /// data from disk. - /// - /// This doesn't currently delete all data from S3, but sets a flag in its - /// index_part.json file to mark it as deleted. - pub async fn delete_timeline( + /// data from both disk and s3. + async fn delete_timeline( &self, timeline_id: TimelineId, + timeline: Arc, + ) -> anyhow::Result<()> { + { + // Grab the layer_removal_cs lock, and actually perform the deletion. + // + // This lock prevents prevents GC or compaction from running at the same time. + // The GC task doesn't register itself with the timeline it's operating on, + // so it might still be running even though we called `shutdown_tasks`. + // + // Note that there are still other race conditions between + // GC, compaction and timeline deletion. See + // https://github.com/neondatabase/neon/issues/2671 + // + // No timeout here, GC & Compaction should be responsive to the + // `TimelineState::Stopping` change. + info!("waiting for layer_removal_cs.lock()"); + let layer_removal_guard = timeline.layer_removal_cs.lock().await; + info!("got layer_removal_cs.lock(), deleting layer files"); + + // NB: storage_sync upload tasks that reference these layers have been cancelled + // by the caller. 
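The directory removal in the hunk that continues below tolerates a missing directory, because the deletion may legitimately run a second time after a retry or a restart. A std-only sketch of that idempotent pattern:

```rust
use std::{fs, io, path::Path};

// Treat "already gone" as success, mirroring the NotFound arm in the hunk below.
fn remove_dir_all_tolerant(path: &Path) -> io::Result<()> {
    match fs::remove_dir_all(path) {
        Err(e) if e.kind() == io::ErrorKind::NotFound => {
            eprintln!("{} not found, proceeding anyway", path.display());
            Ok(())
        }
        res => res,
    }
}

fn main() -> io::Result<()> {
    let dir = std::env::temp_dir().join("timeline-delete-demo");
    fs::create_dir_all(&dir)?;
    remove_dir_all_tolerant(&dir)?; // removes the directory
    remove_dir_all_tolerant(&dir)   // second run: already gone, still Ok
}
```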
+ + let local_timeline_directory = self + .conf + .timeline_path(&timeline.timeline_id, &self.tenant_id); + + fail::fail_point!("timeline-delete-before-rm", |_| { + Err(anyhow::anyhow!("failpoint: timeline-delete-before-rm"))? + }); + + // NB: This need not be atomic because the deleted flag in the IndexPart + // will be observed during tenant/timeline load. The deletion will be resumed there. + // + // For configurations without remote storage, we tolerate that we're not crash-safe here. + // The timeline may come up Active but with missing layer files, in such setups. + // See https://github.com/neondatabase/neon/pull/3919#issuecomment-1531726720 + match std::fs::remove_dir_all(&local_timeline_directory) { + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + // This can happen if we're called a second time, e.g., + // because of a previous failure/cancellation at/after + // failpoint timeline-delete-after-rm. + // + // It can also happen if we race with tenant detach, because, + // it doesn't grab the layer_removal_cs lock. + // + // For now, log and continue. + // warn! level is technically not appropriate for the + // first case because we should expect retries to happen. + // But the error is so rare, it seems better to get attention if it happens. + let tenant_state = self.current_state(); + warn!( + timeline_dir=?local_timeline_directory, + ?tenant_state, + "timeline directory not found, proceeding anyway" + ); + // continue with the rest of the deletion + } + res => res.with_context(|| { + format!( + "Failed to remove local timeline directory '{}'", + local_timeline_directory.display() + ) + })?, + } + + info!("finished deleting layer files, releasing layer_removal_cs.lock()"); + drop(layer_removal_guard); + } + + fail::fail_point!("timeline-delete-after-rm", |_| { + Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm"))? + }); + + { + // Remove the timeline from the map. + let mut timelines = self.timelines.lock().unwrap(); + let children_exist = timelines + .iter() + .any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id)); + // XXX this can happen because `branch_timeline` doesn't check `TimelineState::Stopping`. + // We already deleted the layer files, so it's probably best to panic. + // (Ideally, above remove_dir_all is atomic so we don't see this timeline after a restart) + if children_exist { + panic!("Timeline grew children while we removed layer files"); + } + + timelines.remove(&timeline_id).expect( + "timeline that we were deleting was concurrently removed from 'timelines' map", + ); + + drop(timelines); + } + + let remote_client = match &timeline.remote_client { + Some(remote_client) => remote_client, + None => return Ok(()), + }; + + remote_client.delete_all().await?; + + Ok(()) + } + + /// Removes timeline-related in-memory data and schedules removal from remote storage. + #[instrument(skip(self, _ctx))] + pub async fn prepare_and_schedule_delete_timeline( + self: Arc, + timeline_id: TimelineId, _ctx: &RequestContext, ) -> Result<(), DeleteTimelineError> { timeline::debug_assert_current_span_has_tenant_and_timeline_id(); @@ -1527,18 +1663,25 @@ impl Tenant { // // Also grab the Timeline's delete_lock to prevent another deletion from starting. 
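The `delete_lock` is now an `Arc<tokio::sync::Mutex<bool>>` so its guard can outlive the request handler. A minimal sketch of that owned-guard pattern, assuming `tokio = { version = "1", features = ["full"] }`; the deletion work itself is elided:

```rust
use std::sync::Arc;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    // `true` means "deletion already completed", mirroring the bool inside delete_lock.
    let delete_lock = Arc::new(Mutex::new(false));

    // First deletion request: take the lock as an owned, 'static guard.
    let guard = Arc::clone(&delete_lock)
        .try_lock_owned()
        .expect("no other deletion is running");

    // A concurrent request fails to lock and can report "deletion already in progress".
    assert!(delete_lock.clone().try_lock_owned().is_err());

    // The owned guard moves into the background deletion task and is released
    // only when that task finishes.
    let worker = tokio::spawn(async move {
        let mut done = guard;
        // ... actual deletion work would happen here ...
        *done = true;
    });
    worker.await.unwrap();

    assert!(*delete_lock.lock().await);
}
```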
let timeline; - let mut delete_lock_guard; + let delete_lock_guard; { let mut timelines = self.timelines.lock().unwrap(); // Ensure that there are no child timelines **attached to that pageserver**, // because detach removes files, which will break child branches - let children_exist = timelines + let children: Vec = timelines .iter() - .any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id)); + .filter_map(|(id, entry)| { + if entry.get_ancestor_timeline_id() == Some(timeline_id) { + Some(*id) + } else { + None + } + }) + .collect(); - if children_exist { - return Err(DeleteTimelineError::HasChildren); + if !children.is_empty() { + return Err(DeleteTimelineError::HasChildren(children)); } let timeline_entry = match timelines.entry(timeline_id) { @@ -1553,11 +1696,15 @@ impl Tenant { // XXX: We should perhaps return an HTTP "202 Accepted" to signal that the caller // needs to poll until the operation has finished. But for now, we return an // error, because the control plane knows to retry errors. - delete_lock_guard = timeline.delete_lock.try_lock().map_err(|_| { - DeleteTimelineError::Other(anyhow::anyhow!( - "timeline deletion is already in progress" - )) - })?; + + delete_lock_guard = + Arc::clone(&timeline.delete_lock) + .try_lock_owned() + .map_err(|_| { + DeleteTimelineError::Other(anyhow::anyhow!( + "timeline deletion is already in progress" + )) + })?; // If another task finished the deletion just before we acquired the lock, // return success. @@ -1626,102 +1773,43 @@ impl Tenant { } } } - - { - // Grab the layer_removal_cs lock, and actually perform the deletion. - // - // This lock prevents prevents GC or compaction from running at the same time. - // The GC task doesn't register itself with the timeline it's operating on, - // so it might still be running even though we called `shutdown_tasks`. - // - // Note that there are still other race conditions between - // GC, compaction and timeline deletion. See - // https://github.com/neondatabase/neon/issues/2671 - // - // No timeout here, GC & Compaction should be responsive to the - // `TimelineState::Stopping` change. - info!("waiting for layer_removal_cs.lock()"); - let layer_removal_guard = timeline.layer_removal_cs.lock().await; - info!("got layer_removal_cs.lock(), deleting layer files"); - - // NB: storage_sync upload tasks that reference these layers have been cancelled - // by the caller. - - let local_timeline_directory = self.conf.timeline_path(&timeline_id, &self.tenant_id); - - fail::fail_point!("timeline-delete-before-rm", |_| { - Err(anyhow::anyhow!("failpoint: timeline-delete-before-rm"))? - }); - - // NB: This need not be atomic because the deleted flag in the IndexPart - // will be observed during tenant/timeline load. The deletion will be resumed there. - // - // For configurations without remote storage, we tolerate that we're not crash-safe here. - // The timeline may come up Active but with missing layer files, in such setups. - // See https://github.com/neondatabase/neon/pull/3919#issuecomment-1531726720 - match std::fs::remove_dir_all(&local_timeline_directory) { - Err(e) if e.kind() == std::io::ErrorKind::NotFound => { - // This can happen if we're called a second time, e.g., - // because of a previous failure/cancellation at/after - // failpoint timeline-delete-after-rm. - // - // It can also happen if we race with tenant detach, because, - // it doesn't grab the layer_removal_cs lock. - // - // For now, log and continue. - // warn! 
level is technically not appropriate for the - // first case because we should expect retries to happen. - // But the error is so rare, it seems better to get attention if it happens. - let tenant_state = self.current_state(); - warn!( - timeline_dir=?local_timeline_directory, - ?tenant_state, - "timeline directory not found, proceeding anyway" - ); - // continue with the rest of the deletion - } - res => res.with_context(|| { - format!( - "Failed to remove local timeline directory '{}'", - local_timeline_directory.display() - ) - })?, - } - - info!("finished deleting layer files, releasing layer_removal_cs.lock()"); - drop(layer_removal_guard); - } - - fail::fail_point!("timeline-delete-after-rm", |_| { - Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm"))? - }); - - // Remove the timeline from the map. - { - let mut timelines = self.timelines.lock().unwrap(); - - let children_exist = timelines - .iter() - .any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id)); - // XXX this can happen because `branch_timeline` doesn't check `TimelineState::Stopping`. - // We already deleted the layer files, so it's probably best to panic. - // (Ideally, above remove_dir_all is atomic so we don't see this timeline after a restart) - if children_exist { - panic!("Timeline grew children while we removed layer files"); - } - - timelines.remove(&timeline_id).expect( - "timeline that we were deleting was concurrently removed from 'timelines' map", - ); - } - - // All done! Mark the deletion as completed and release the delete_lock - *delete_lock_guard = true; - drop(delete_lock_guard); + self.schedule_delete_timeline(timeline_id, timeline, delete_lock_guard); Ok(()) } + fn schedule_delete_timeline( + self: Arc, + timeline_id: TimelineId, + timeline: Arc, + _guard: OwnedMutexGuard, + ) { + let tenant_id = self.tenant_id; + let timeline_clone = Arc::clone(&timeline); + + task_mgr::spawn( + task_mgr::BACKGROUND_RUNTIME.handle(), + TaskKind::TimelineDeletionWorker, + Some(self.tenant_id), + Some(timeline_id), + "timeline_delete", + false, + async move { + if let Err(err) = self.delete_timeline(timeline_id, timeline).await { + error!("Error: {err:#}"); + timeline_clone.set_broken(err.to_string()) + }; + Ok(()) + } + .instrument({ + let span = + tracing::info_span!(parent: None, "delete_timeline", tenant_id=%tenant_id, timeline_id=%timeline_id); + span.follows_from(Span::current()); + span + }), + ); + } + pub fn current_state(&self) -> TenantState { self.state.borrow().clone() } @@ -1764,9 +1852,9 @@ impl Tenant { if activating { let timelines_accessor = self.timelines.lock().unwrap(); - let not_broken_timelines = timelines_accessor + let timelines_to_activate = timelines_accessor .values() - .filter(|timeline| timeline.current_state() != TimelineState::Broken); + .filter(|timeline| !(timeline.is_broken() || timeline.is_stopping())); // Spawn gc and compaction loops. The loops will shut themselves // down when they notice that the tenant is inactive. 
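Since `TimelineState::Broken` is now a struct variant carrying `reason` and `backtrace`, equality checks give way to `matches!` with `{ .. }`, as in the filter above. A self-contained illustration of that pattern, including how the variant serializes; the enum and helper names are stand-ins for the real types:

```rust
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
enum TimelineStateSketch {
    Loading,
    Active,
    Stopping,
    Broken { reason: String, backtrace: String },
}

// `==` no longer works for Broken without spelling out its fields; match the shape instead.
fn is_broken(s: &TimelineStateSketch) -> bool {
    matches!(s, TimelineStateSketch::Broken { .. })
}

fn main() {
    let states = vec![
        TimelineStateSketch::Active,
        TimelineStateSketch::Broken { reason: "test".into(), backtrace: String::new() },
        TimelineStateSketch::Stopping,
    ];
    // Only healthy, non-stopping timelines get activated.
    let to_activate: Vec<_> = states
        .iter()
        .filter(|s| !(is_broken(s) || **s == TimelineStateSketch::Stopping))
        .collect();
    assert_eq!(to_activate, vec![&TimelineStateSketch::Active]);
    // The struct variant round-trips through the HTTP API as nested JSON.
    println!("{}", serde_json::to_string(&states[1]).unwrap());
}
```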
@@ -1774,7 +1862,7 @@ impl Tenant { let mut activated_timelines = 0; - for timeline in not_broken_timelines { + for timeline in timelines_to_activate { timeline.activate(broker_client.clone(), background_jobs_can_start, ctx); activated_timelines += 1; } @@ -1925,7 +2013,7 @@ impl Tenant { let timelines_accessor = self.timelines.lock().unwrap(); let not_broken_timelines = timelines_accessor .values() - .filter(|timeline| timeline.current_state() != TimelineState::Broken); + .filter(|timeline| !timeline.is_broken()); for timeline in not_broken_timelines { timeline.set_state(TimelineState::Stopping); } @@ -3758,7 +3846,7 @@ mod tests { make_some_layers(newtline.as_ref(), Lsn(0x60)).await?; - tline.set_state(TimelineState::Broken); + tline.set_broken("test".to_owned()); tenant .gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx) diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index a1638e4a95..7e123c3fbd 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -396,7 +396,9 @@ pub async fn delete_timeline( ctx: &RequestContext, ) -> Result<(), DeleteTimelineError> { let tenant = get_tenant(tenant_id, true).await?; - tenant.delete_timeline(timeline_id, ctx).await?; + tenant + .prepare_and_schedule_delete_timeline(timeline_id, ctx) + .await?; Ok(()) } diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index c4640307d0..2936e7a4af 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -210,13 +210,15 @@ use chrono::{NaiveDateTime, Utc}; pub use download::{is_temp_download_file, list_remote_timelines}; use scopeguard::ScopeGuard; +use std::collections::{HashMap, VecDeque}; +use std::path::Path; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Mutex}; -use remote_storage::{DownloadError, GenericRemoteStorage}; +use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath}; use std::ops::DerefMut; use tokio::runtime::Runtime; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, instrument, warn}; use tracing::{info_span, Instrument}; use utils::lsn::Lsn; @@ -225,7 +227,9 @@ use crate::metrics::{ RemoteTimelineClientMetricsCallTrackSize, REMOTE_ONDEMAND_DOWNLOADED_BYTES, REMOTE_ONDEMAND_DOWNLOADED_LAYERS, }; +use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id; use crate::tenant::remote_timeline_client::index::LayerFileMetadata; +use crate::tenant::upload_queue::Delete; use crate::{ config::PageServerConf, task_mgr, @@ -259,7 +263,7 @@ const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3; pub enum MaybeDeletedIndexPart { IndexPart(IndexPart), - Deleted, + Deleted(IndexPart), } /// Errors that can arise when calling [`RemoteTimelineClient::stop`]. @@ -361,11 +365,42 @@ impl RemoteTimelineClient { Ok(()) } + /// Initialize the queue in stopped state. Used in startup path + /// to continue deletion operation interrupted by pageserver crash or restart. + pub fn init_upload_queue_stopped_to_continue_deletion( + &self, + index_part: &IndexPart, + ) -> anyhow::Result<()> { + // FIXME: consider newtype for DeletedIndexPart. 
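One possible shape for the newtype suggested by the FIXME above, so that "this index part is known to be deleted" is enforced at construction time rather than re-checked with `ok_or`. This is entirely hypothetical and not part of the patch:

```rust
#[derive(Debug)]
struct IndexPartSketch {
    deleted_at: Option<u64>,
}

/// Can only be constructed from an index part whose `deleted_at` is set.
struct DeletedIndexPart(IndexPartSketch);

impl DeletedIndexPart {
    fn try_from_part(part: IndexPartSketch) -> Result<Self, IndexPartSketch> {
        if part.deleted_at.is_some() {
            Ok(DeletedIndexPart(part))
        } else {
            Err(part) // still live; hand the value back unchanged
        }
    }

    fn deleted_at(&self) -> u64 {
        self.0.deleted_at.expect("checked at construction")
    }
}

fn main() {
    let deleted = DeletedIndexPart::try_from_part(IndexPartSketch { deleted_at: Some(1) })
        .expect("marker is set");
    assert_eq!(deleted.deleted_at(), 1);
    assert!(DeletedIndexPart::try_from_part(IndexPartSketch { deleted_at: None }).is_err());
}
```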
+ let deleted_at = index_part.deleted_at.ok_or(anyhow::anyhow!( + "bug: it is responsibility of the caller to provide index part from MaybeDeletedIndexPart::Deleted" + ))?; + + { + let mut upload_queue = self.upload_queue.lock().unwrap(); + upload_queue.initialize_with_current_remote_index_part(index_part)?; + self.update_remote_physical_size_gauge(Some(index_part)); + } + // also locks upload queue, without dropping the guard above it will be a deadlock + self.stop().expect("initialized line above"); + + let mut upload_queue = self.upload_queue.lock().unwrap(); + + upload_queue + .stopped_mut() + .expect("stopped above") + .deleted_at = SetDeletedFlagProgress::Successful(deleted_at); + + Ok(()) + } + pub fn last_uploaded_consistent_lsn(&self) -> Option { match &*self.upload_queue.lock().unwrap() { UploadQueue::Uninitialized => None, UploadQueue::Initialized(q) => Some(q.last_uploaded_consistent_lsn), - UploadQueue::Stopped(q) => Some(q.last_uploaded_consistent_lsn), + UploadQueue::Stopped(q) => { + Some(q.upload_queue_for_deletion.last_uploaded_consistent_lsn) + } } } @@ -420,7 +455,7 @@ impl RemoteTimelineClient { .await?; if index_part.deleted_at.is_some() { - Ok(MaybeDeletedIndexPart::Deleted) + Ok(MaybeDeletedIndexPart::Deleted(index_part)) } else { Ok(MaybeDeletedIndexPart::IndexPart(index_part)) } @@ -622,7 +657,11 @@ impl RemoteTimelineClient { // schedule the actual deletions for name in names { - let op = UploadOp::Delete(RemoteOpFileKind::Layer, name.clone()); + let op = UploadOp::Delete(Delete { + file_kind: RemoteOpFileKind::Layer, + layer_file_name: name.clone(), + scheduled_from_timeline_delete: false, + }); self.calls_unfinished_metric_begin(&op); upload_queue.queued_operations.push_back(op); info!("scheduled layer file deletion {}", name.file_name()); @@ -639,18 +678,11 @@ impl RemoteTimelineClient { /// Wait for all previously scheduled uploads/deletions to complete /// pub async fn wait_completion(self: &Arc) -> anyhow::Result<()> { - let (sender, mut receiver) = tokio::sync::watch::channel(()); - let barrier_op = UploadOp::Barrier(sender); - - { + let mut receiver = { let mut guard = self.upload_queue.lock().unwrap(); let upload_queue = guard.initialized_mut()?; - upload_queue.queued_operations.push_back(barrier_op); - // Don't count this kind of operation! - - // Launch the task immediately, if possible - self.launch_queued_tasks(upload_queue); - } + self.schedule_barrier(upload_queue) + }; if receiver.changed().await.is_err() { anyhow::bail!("wait_completion aborted because upload queue was stopped"); @@ -658,6 +690,22 @@ impl RemoteTimelineClient { Ok(()) } + fn schedule_barrier( + self: &Arc, + upload_queue: &mut UploadQueueInitialized, + ) -> tokio::sync::watch::Receiver<()> { + let (sender, receiver) = tokio::sync::watch::channel(()); + let barrier_op = UploadOp::Barrier(sender); + + upload_queue.queued_operations.push_back(barrier_op); + // Don't count this kind of operation! + + // Launch the task immediately, if possible + self.launch_queued_tasks(upload_queue); + + receiver + } + /// Set the deleted_at field in the remote index file. /// /// This fails if the upload queue has not been `stop()`ed. @@ -665,6 +713,7 @@ impl RemoteTimelineClient { /// The caller is responsible for calling `stop()` AND for waiting /// for any ongoing upload tasks to finish after `stop()` has succeeded. /// Check method [`RemoteTimelineClient::stop`] for details. 
+ #[instrument(skip_all)] pub(crate) async fn persist_index_part_with_deleted_flag( self: &Arc, ) -> Result<(), PersistIndexPartWithDeletedFlagError> { @@ -674,15 +723,7 @@ impl RemoteTimelineClient { // We must be in stopped state because otherwise // we can have inprogress index part upload that can overwrite the file // with missing is_deleted flag that we going to set below - let stopped = match &mut *locked { - UploadQueue::Uninitialized => { - return Err(anyhow::anyhow!("is not Stopped but Uninitialized").into()) - } - UploadQueue::Initialized(_) => { - return Err(anyhow::anyhow!("is not Stopped but Initialized").into()) - } - UploadQueue::Stopped(stopped) => stopped, - }; + let stopped = locked.stopped_mut()?; match stopped.deleted_at { SetDeletedFlagProgress::NotRunning => (), // proceed @@ -696,27 +737,17 @@ impl RemoteTimelineClient { let deleted_at = Utc::now().naive_utc(); stopped.deleted_at = SetDeletedFlagProgress::InProgress(deleted_at); - let mut index_part = IndexPart::new( - stopped.latest_files.clone(), - stopped.last_uploaded_consistent_lsn, - stopped - .latest_metadata - .to_bytes() - .context("serialize metadata")?, - ); + let mut index_part = IndexPart::try_from(&stopped.upload_queue_for_deletion) + .context("IndexPart serialize")?; index_part.deleted_at = Some(deleted_at); index_part }; let undo_deleted_at = scopeguard::guard(Arc::clone(self), |self_clone| { let mut locked = self_clone.upload_queue.lock().unwrap(); - let stopped = match &mut *locked { - UploadQueue::Uninitialized | UploadQueue::Initialized(_) => unreachable!( - "there's no way out of Stopping, and we checked it's Stopping above: {:?}", - locked.as_str(), - ), - UploadQueue::Stopped(stopped) => stopped, - }; + let stopped = locked + .stopped_mut() + .expect("there's no way out of Stopping, and we checked it's Stopping above"); stopped.deleted_at = SetDeletedFlagProgress::NotRunning; }); @@ -751,13 +782,10 @@ impl RemoteTimelineClient { ScopeGuard::into_inner(undo_deleted_at); { let mut locked = self.upload_queue.lock().unwrap(); - let stopped = match &mut *locked { - UploadQueue::Uninitialized | UploadQueue::Initialized(_) => unreachable!( - "there's no way out of Stopping, and we checked it's Stopping above: {:?}", - locked.as_str(), - ), - UploadQueue::Stopped(stopped) => stopped, - }; + + let stopped = locked + .stopped_mut() + .expect("there's no way out of Stopping, and we checked it's Stopping above"); stopped.deleted_at = SetDeletedFlagProgress::Successful( index_part_with_deleted_at .deleted_at @@ -768,6 +796,92 @@ impl RemoteTimelineClient { Ok(()) } + /// Prerequisites: UploadQueue should be in stopped state and deleted_at should be successfuly set. + /// The function deletes layer files one by one, then lists the prefix to see if we leaked something + /// deletes leaked files if any and proceeds with deletion of index file at the end. 
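An in-memory model of the ordering documented above: data objects first, then any leaked files, and `index_part.json` last so that a retried deletion arriving at another pageserver still finds the marker. The `BTreeMap` stands in for the remote bucket and all names are illustrative:

```rust
use std::collections::BTreeMap;

const INDEX_NAME: &str = "index_part.json";

fn delete_all(storage: &mut BTreeMap<String, Vec<u8>>, prefix: &str) {
    // Steps 1 + 2: delete every object under the prefix except the index file.
    let data_objects: Vec<String> = storage
        .keys()
        .filter(|k| k.starts_with(prefix) && !k.ends_with(INDEX_NAME))
        .cloned()
        .collect();
    for key in data_objects {
        storage.remove(&key);
    }
    // Step 3: the index file goes last.
    storage.remove(&format!("{prefix}/{INDEX_NAME}"));
}

fn main() {
    let mut storage = BTreeMap::from([
        ("tl1/000A".to_string(), vec![0u8]),
        ("tl1/000B".to_string(), vec![0u8]),
        (format!("tl1/{INDEX_NAME}"), vec![0u8]),
        ("tl2/000C".to_string(), vec![0u8]), // other timeline, untouched
    ]);
    delete_all(&mut storage, "tl1");
    assert_eq!(storage.len(), 1);
    assert!(storage.contains_key("tl2/000C"));
}
```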
+ pub(crate) async fn delete_all(self: &Arc) -> anyhow::Result<()> { + debug_assert_current_span_has_tenant_and_timeline_id(); + + let (mut receiver, deletions_queued) = { + let mut deletions_queued = 0; + + let mut locked = self.upload_queue.lock().unwrap(); + let stopped = locked.stopped_mut()?; + + if !matches!(stopped.deleted_at, SetDeletedFlagProgress::Successful(_)) { + anyhow::bail!("deleted_at is not set") + } + + debug_assert!(stopped.upload_queue_for_deletion.no_pending_work()); + + stopped + .upload_queue_for_deletion + .queued_operations + .reserve(stopped.upload_queue_for_deletion.latest_files.len()); + + // schedule the actual deletions + for name in stopped.upload_queue_for_deletion.latest_files.keys() { + let op = UploadOp::Delete(Delete { + file_kind: RemoteOpFileKind::Layer, + layer_file_name: name.clone(), + scheduled_from_timeline_delete: true, + }); + self.calls_unfinished_metric_begin(&op); + stopped + .upload_queue_for_deletion + .queued_operations + .push_back(op); + + info!("scheduled layer file deletion {}", name.file_name()); + deletions_queued += 1; + } + + self.launch_queued_tasks(&mut stopped.upload_queue_for_deletion); + + ( + self.schedule_barrier(&mut stopped.upload_queue_for_deletion), + deletions_queued, + ) + }; + + receiver.changed().await?; + + // Do not delete index part yet, it is needed for possible retry. If we remove it first + // and retry will arrive to different pageserver there wont be any traces of it on remote storage + let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id); + let timeline_storage_path = self.conf.remote_path(&timeline_path)?; + + let remaining = self + .storage_impl + .list_prefixes(Some(&timeline_storage_path)) + .await?; + + let remaining: Vec = remaining + .into_iter() + .filter(|p| p.object_name() != Some(IndexPart::FILE_NAME)) + .collect(); + + if !remaining.is_empty() { + warn!( + "Found {} files not bound to index_file.json, proceeding with their deletion", + remaining.len() + ); + for file in remaining { + warn!("Removing {}", file.object_name().unwrap_or_default()); + self.storage_impl.delete(&file).await?; + } + } + + let index_file_path = timeline_storage_path.join(Path::new(IndexPart::FILE_NAME)); + + debug!("deleting index part"); + self.storage_impl.delete(&index_file_path).await?; + + info!(deletions_queued, "done deleting, including index_part.json"); + + Ok(()) + } + /// /// Pick next tasks from the queue, and start as many of them as possible without violating /// the ordering constraints. @@ -786,7 +900,7 @@ impl RemoteTimelineClient { // have finished. upload_queue.inprogress_tasks.is_empty() } - UploadOp::Delete(_, _) => { + UploadOp::Delete(_) => { // Wait for preceding uploads to finish. Concurrent deletions are OK, though. 
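The `schedule_barrier` + `receiver.changed().await` pair used above boils down to a `tokio::sync::watch` channel acting as a completion barrier. A minimal sketch of that mechanism in isolation, with the queued work reduced to a single spawned task:

```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (sender, mut receiver) = watch::channel(());

    let worker = tokio::spawn(async move {
        // ... operations queued before the barrier run here ...
        // Reaching the barrier op: notify whoever is waiting.
        let _ = sender.send(());
    });

    // The waiting side blocks until the barrier has been processed
    // (or errors out if the queue was stopped and the sender dropped).
    receiver
        .changed()
        .await
        .expect("queue stopped before reaching the barrier");

    worker.await.unwrap();
    println!("all operations before the barrier have completed");
}
```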
upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len() } @@ -817,7 +931,7 @@ impl RemoteTimelineClient { UploadOp::UploadMetadata(_, _) => { upload_queue.num_inprogress_metadata_uploads += 1; } - UploadOp::Delete(_, _) => { + UploadOp::Delete(_) => { upload_queue.num_inprogress_deletions += 1; } UploadOp::Barrier(sender) => { @@ -891,7 +1005,6 @@ impl RemoteTimelineClient { unreachable!("we never launch an upload task if the queue is uninitialized, and once it is initialized, we never go back") } } - self.calls_unfinished_metric_end(&task.op); return; } @@ -937,16 +1050,16 @@ impl RemoteTimelineClient { } res } - UploadOp::Delete(metric_file_kind, ref layer_file_name) => { + UploadOp::Delete(delete) => { let path = &self .conf .timeline_path(&self.timeline_id, &self.tenant_id) - .join(layer_file_name.file_name()); + .join(delete.layer_file_name.file_name()); delete::delete_layer(self.conf, &self.storage_impl, path) .measure_remote_op( self.tenant_id, self.timeline_id, - *metric_file_kind, + delete.file_kind, RemoteOpKind::Delete, Arc::clone(&self.metrics), ) @@ -1012,11 +1125,24 @@ impl RemoteTimelineClient { let mut upload_queue_guard = self.upload_queue.lock().unwrap(); let upload_queue = match upload_queue_guard.deref_mut() { UploadQueue::Uninitialized => panic!("callers are responsible for ensuring this is only called on an initialized queue"), - UploadQueue::Stopped(_) => { + UploadQueue::Stopped(stopped) => { + // Special care is needed for deletions, if it was an earlier deletion (not scheduled from deletion) + // then stop() took care of it so we just return. + // For deletions that come from delete_all we still want to maintain metrics, launch following tasks, etc. + match &task.op { + UploadOp::Delete(delete) if delete.scheduled_from_timeline_delete => Some(&mut stopped.upload_queue_for_deletion), + _ => None + } + }, + UploadQueue::Initialized(qi) => { Some(qi) } + }; + + let upload_queue = match upload_queue { + Some(upload_queue) => upload_queue, + None => { info!("another concurrent task already stopped the queue"); return; - }, // nothing to do - UploadQueue::Initialized(qi) => { qi } + } }; upload_queue.inprogress_tasks.remove(&task.task_id); @@ -1029,7 +1155,7 @@ impl RemoteTimelineClient { upload_queue.num_inprogress_metadata_uploads -= 1; upload_queue.last_uploaded_consistent_lsn = lsn; // XXX monotonicity check? } - UploadOp::Delete(_, _) => { + UploadOp::Delete(_) => { upload_queue.num_inprogress_deletions -= 1; } UploadOp::Barrier(_) => unreachable!(), @@ -1063,8 +1189,8 @@ impl RemoteTimelineClient { reason: "metadata uploads are tiny", }, ), - UploadOp::Delete(file_kind, _) => ( - *file_kind, + UploadOp::Delete(delete) => ( + delete.file_kind, RemoteOpKind::Delete, DontTrackSize { reason: "should we track deletes? positive or negative sign?", @@ -1111,32 +1237,36 @@ impl RemoteTimelineClient { info!("another concurrent task already shut down the queue"); Ok(()) } - UploadQueue::Initialized(UploadQueueInitialized { - latest_files, - latest_metadata, - last_uploaded_consistent_lsn, - .. - }) => { + UploadQueue::Initialized(initialized) => { info!("shutting down upload queue"); // Replace the queue with the Stopped state, taking ownership of the old // Initialized queue. We will do some checks on it, and then drop it. 
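A reduced illustration of the state swap described above: `std::mem::replace` installs the `Stopped` state, with a cloned copy of the queue preserved for deletion, and hands back the old `Initialized` value for the consistency checks that follow. Types here are simplified stand-ins:

```rust
enum QueueSketch {
    Initialized { pending: Vec<String> },
    Stopped { preserved_for_deletion: Vec<String> },
}

fn stop(slot: &mut QueueSketch) -> Option<QueueSketch> {
    let preserved = match slot {
        // Clone rather than take: the old queue is still inspected below.
        QueueSketch::Initialized { pending } => pending.clone(),
        QueueSketch::Stopped { .. } => return None, // already stopped
    };
    let old = std::mem::replace(
        slot,
        QueueSketch::Stopped { preserved_for_deletion: preserved },
    );
    Some(old)
}

fn main() {
    let mut q = QueueSketch::Initialized { pending: vec!["layer-1".into()] };

    let old = stop(&mut q).expect("was initialized");
    // Consistency checks against the old queue would go here, then it is dropped.
    drop(old);

    match &q {
        QueueSketch::Stopped { preserved_for_deletion } => {
            assert_eq!(preserved_for_deletion.len(), 1)
        }
        QueueSketch::Initialized { .. } => unreachable!("stop() replaced the state"),
    }
    assert!(stop(&mut q).is_none(), "stopping twice is a no-op");
}
```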
let qi = { - // take or clone what we need - let latest_files = std::mem::take(latest_files); - let last_uploaded_consistent_lsn = *last_uploaded_consistent_lsn; - // this could be Copy - let latest_metadata = latest_metadata.clone(); - - let stopped = UploadQueueStopped { - latest_files, - last_uploaded_consistent_lsn, - latest_metadata, - deleted_at: SetDeletedFlagProgress::NotRunning, + // Here we preserve working version of the upload queue for possible use during deletions. + // In-place replace of Initialized to Stopped can be done with the help of https://github.com/Sgeo/take_mut + // but for this use case it doesnt really makes sense to bring unsafe code only for this usage point. + // Deletion is not really perf sensitive so there shouldnt be any problems with cloning a fraction of it. + let upload_queue_for_deletion = UploadQueueInitialized { + task_counter: 0, + latest_files: initialized.latest_files.clone(), + latest_files_changes_since_metadata_upload_scheduled: 0, + latest_metadata: initialized.latest_metadata.clone(), + last_uploaded_consistent_lsn: initialized.last_uploaded_consistent_lsn, + num_inprogress_layer_uploads: 0, + num_inprogress_metadata_uploads: 0, + num_inprogress_deletions: 0, + inprogress_tasks: HashMap::default(), + queued_operations: VecDeque::default(), }; - let upload_queue = - std::mem::replace(&mut *guard, UploadQueue::Stopped(stopped)); + let upload_queue = std::mem::replace( + &mut *guard, + UploadQueue::Stopped(UploadQueueStopped { + upload_queue_for_deletion, + deleted_at: SetDeletedFlagProgress::NotRunning, + }), + ); if let UploadQueue::Initialized(qi) = upload_queue { qi } else { @@ -1144,8 +1274,6 @@ impl RemoteTimelineClient { } }; - assert!(qi.latest_files.is_empty(), "do not use this anymore"); - // consistency check assert_eq!( qi.num_inprogress_layer_uploads @@ -1408,7 +1536,7 @@ mod tests { // Download back the index.json, and check that the list of files is correct let index_part = match runtime.block_on(client.download_index_file())? { MaybeDeletedIndexPart::IndexPart(index_part) => index_part, - MaybeDeletedIndexPart::Deleted => panic!("unexpectedly got deleted index part"), + MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"), }; assert_file_list( diff --git a/pageserver/src/tenant/remote_timeline_client/index.rs b/pageserver/src/tenant/remote_timeline_client/index.rs index 7a06e57a6b..c3f6dcadec 100644 --- a/pageserver/src/tenant/remote_timeline_client/index.rs +++ b/pageserver/src/tenant/remote_timeline_client/index.rs @@ -7,9 +7,11 @@ use std::collections::{HashMap, HashSet}; use chrono::NaiveDateTime; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; +use utils::bin_ser::SerializeError; use crate::tenant::metadata::TimelineMetadata; use crate::tenant::storage_layer::LayerFileName; +use crate::tenant::upload_queue::UploadQueueInitialized; use utils::lsn::Lsn; @@ -115,6 +117,21 @@ impl IndexPart { } } +impl TryFrom<&UploadQueueInitialized> for IndexPart { + type Error = SerializeError; + + fn try_from(upload_queue: &UploadQueueInitialized) -> Result { + let disk_consistent_lsn = upload_queue.latest_metadata.disk_consistent_lsn(); + let metadata_bytes = upload_queue.latest_metadata.to_bytes()?; + + Ok(Self::new( + upload_queue.latest_files.clone(), + disk_consistent_lsn, + metadata_bytes, + )) + } +} + /// Serialized form of [`LayerFileMetadata`]. 
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default)] pub struct IndexLayerMetadata { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 2a50a26a23..71f83bf127 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -239,7 +239,7 @@ pub struct Timeline { /// Prevent two tasks from deleting the timeline at the same time. If held, the /// timeline is being deleted. If 'true', the timeline has already been deleted. - pub delete_lock: tokio::sync::Mutex, + pub delete_lock: Arc>, eviction_task_timeline_state: tokio::sync::Mutex, @@ -815,8 +815,7 @@ impl Timeline { // above. Rewrite it. let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await); // Is the timeline being deleted? - let state = *self.state.borrow(); - if state == TimelineState::Stopping { + if self.is_stopping() { return Err(anyhow::anyhow!("timeline is Stopping").into()); } @@ -955,14 +954,17 @@ impl Timeline { (st, TimelineState::Loading) => { error!("ignoring transition from {st:?} into Loading state"); } - (TimelineState::Broken, _) => { - error!("Ignoring state update {new_state:?} for broken tenant"); + (TimelineState::Broken { .. }, new_state) => { + error!("Ignoring state update {new_state:?} for broken timeline"); } (TimelineState::Stopping, TimelineState::Active) => { error!("Not activating a Stopping timeline"); } (_, new_state) => { - if matches!(new_state, TimelineState::Stopping | TimelineState::Broken) { + if matches!( + new_state, + TimelineState::Stopping | TimelineState::Broken { .. } + ) { // drop the copmletion guard, if any; it might be holding off the completion // forever needlessly self.initial_logical_size_attempt @@ -975,14 +977,31 @@ impl Timeline { } } + pub fn set_broken(&self, reason: String) { + let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture()); + let broken_state = TimelineState::Broken { + reason, + backtrace: backtrace_str, + }; + self.set_state(broken_state) + } + pub fn current_state(&self) -> TimelineState { - *self.state.borrow() + self.state.borrow().clone() + } + + pub fn is_broken(&self) -> bool { + matches!(&*self.state.borrow(), TimelineState::Broken { .. }) } pub fn is_active(&self) -> bool { self.current_state() == TimelineState::Active } + pub fn is_stopping(&self) -> bool { + self.current_state() == TimelineState::Stopping + } + pub fn subscribe_for_state_updates(&self) -> watch::Receiver { self.state.subscribe() } @@ -993,7 +1012,7 @@ impl Timeline { ) -> Result<(), TimelineState> { let mut receiver = self.state.subscribe(); loop { - let current_state = *receiver.borrow_and_update(); + let current_state = receiver.borrow().clone(); match current_state { TimelineState::Loading => { receiver @@ -1460,7 +1479,7 @@ impl Timeline { eviction_task_timeline_state: tokio::sync::Mutex::new( EvictionTaskTimelineState::default(), ), - delete_lock: tokio::sync::Mutex::new(false), + delete_lock: Arc::new(tokio::sync::Mutex::new(false)), initial_logical_size_can_start, initial_logical_size_attempt: Mutex::new(initial_logical_size_attempt), @@ -2101,11 +2120,11 @@ impl Timeline { loop { match timeline_state_updates.changed().await { Ok(()) => { - let new_state = *timeline_state_updates.borrow(); + let new_state = timeline_state_updates.borrow().clone(); match new_state { // we're running this job for active timelines only TimelineState::Active => continue, - TimelineState::Broken + TimelineState::Broken { .. 
} | TimelineState::Stopping | TimelineState::Loading => { break format!("aborted because timeline became inactive (new state: {new_state:?})") @@ -3792,9 +3811,7 @@ impl Timeline { let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await); // Is the timeline being deleted? - let state = *self.state.borrow(); - if state == TimelineState::Stopping { - // there's a global allowed_error for this + if self.is_stopping() { anyhow::bail!("timeline is Stopping"); } diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index e235fab425..a5d0af32fe 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -153,7 +153,7 @@ pub(super) async fn connection_manager_loop_step( match new_state { // we're already active as walreceiver, no need to reactivate TimelineState::Active => continue, - TimelineState::Broken | TimelineState::Stopping => { + TimelineState::Broken { .. } | TimelineState::Stopping => { debug!("timeline entered terminal state {new_state:?}, stopping wal connection manager loop"); return ControlFlow::Break(()); } diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index 8f5faff627..a62cc99adf 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -76,6 +76,12 @@ pub(crate) struct UploadQueueInitialized { pub(crate) queued_operations: VecDeque, } +impl UploadQueueInitialized { + pub(super) fn no_pending_work(&self) -> bool { + self.inprogress_tasks.is_empty() && self.queued_operations.is_empty() + } +} + #[derive(Clone, Copy)] pub(super) enum SetDeletedFlagProgress { NotRunning, @@ -84,9 +90,7 @@ pub(super) enum SetDeletedFlagProgress { } pub(super) struct UploadQueueStopped { - pub(super) latest_files: HashMap, - pub(super) last_uploaded_consistent_lsn: Lsn, - pub(super) latest_metadata: TimelineMetadata, + pub(super) upload_queue_for_deletion: UploadQueueInitialized, pub(super) deleted_at: SetDeletedFlagProgress, } @@ -187,6 +191,15 @@ impl UploadQueue { UploadQueue::Initialized(x) => Ok(x), } } + + pub(crate) fn stopped_mut(&mut self) -> anyhow::Result<&mut UploadQueueStopped> { + match self { + UploadQueue::Initialized(_) | UploadQueue::Uninitialized => { + anyhow::bail!("queue is in state {}", self.as_str()) + } + UploadQueue::Stopped(stopped) => Ok(stopped), + } + } } /// An in-progress upload or delete task. @@ -199,6 +212,13 @@ pub(crate) struct UploadTask { pub(crate) op: UploadOp, } +#[derive(Debug)] +pub(crate) struct Delete { + pub(crate) file_kind: RemoteOpFileKind, + pub(crate) layer_file_name: LayerFileName, + pub(crate) scheduled_from_timeline_delete: bool, +} + #[derive(Debug)] pub(crate) enum UploadOp { /// Upload a layer file @@ -207,8 +227,8 @@ pub(crate) enum UploadOp { /// Upload the metadata file UploadMetadata(IndexPart, Lsn), - /// Delete a file. - Delete(RemoteOpFileKind, LayerFileName), + /// Delete a layer file + Delete(Delete), /// Barrier. 
     Barrier(tokio::sync::watch::Sender<()>),
@@ -226,7 +246,12 @@ impl std::fmt::Display for UploadOp {
                 )
             }
             UploadOp::UploadMetadata(_, lsn) => write!(f, "UploadMetadata(lsn: {})", lsn),
-            UploadOp::Delete(_, path) => write!(f, "Delete({})", path.file_name()),
+            UploadOp::Delete(delete) => write!(
+                f,
+                "Delete(path: {}, scheduled_from_timeline_delete: {})",
+                delete.layer_file_name.file_name(),
+                delete.scheduled_from_timeline_delete
+            ),
             UploadOp::Barrier(_) => write!(f, "Barrier"),
         }
     }
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 551faa116e..a8610e24df 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -663,6 +663,8 @@ class NeonEnvBuilder:
         else:
             raise RuntimeError(f"Unknown storage type: {remote_storage_kind}")
 
+        self.remote_storage_kind = remote_storage_kind
+
     def enable_local_fs_remote_storage(self, force_enable: bool = True):
         """
         Sets up the pageserver to use the local fs at the `test_dir/local_fs_remote_storage` path.
diff --git a/test_runner/fixtures/pageserver/utils.py b/test_runner/fixtures/pageserver/utils.py
index d7ffa633fd..83880abc77 100644
--- a/test_runner/fixtures/pageserver/utils.py
+++ b/test_runner/fixtures/pageserver/utils.py
@@ -2,7 +2,7 @@
 import time
 from typing import Any, Dict, Optional
 
 from fixtures.log_helper import log
-from fixtures.pageserver.http import PageserverHttpClient
+from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
 from fixtures.types import Lsn, TenantId, TimelineId
 
@@ -92,6 +92,41 @@ def wait_until_tenant_state(
     )
 
 
+def wait_until_timeline_state(
+    pageserver_http: PageserverHttpClient,
+    tenant_id: TenantId,
+    timeline_id: TimelineId,
+    expected_state: str,
+    iterations: int,
+    period: float = 1.0,
+) -> Dict[str, Any]:
+    """
+    Does not use `wait_until` for debugging purposes
+    """
+    for i in range(iterations):
+        try:
+            timeline = pageserver_http.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)
+            log.debug(f"Timeline {tenant_id}/{timeline_id} data: {timeline}")
+            if isinstance(timeline["state"], str):
+                if timeline["state"] == expected_state:
+                    return timeline
+            elif isinstance(timeline, Dict):
+                if timeline["state"].get(expected_state):
+                    return timeline
+
+        except Exception as e:
+            log.debug(f"Timeline {tenant_id}/{timeline_id} state retrieval failure: {e}")
+
+        if i == iterations - 1:
+            # do not sleep last time, we already know that we failed
+            break
+        time.sleep(period)
+
+    raise Exception(
+        f"Timeline {tenant_id}/{timeline_id} did not become {expected_state} within {iterations * period} seconds"
+    )
+
+
 def wait_until_tenant_active(
     pageserver_http: PageserverHttpClient,
     tenant_id: TenantId,
@@ -156,3 +191,21 @@ def wait_for_upload_queue_empty(
         if all(m.value == 0 for m in tl):
             return
         time.sleep(0.2)
+
+
+def assert_timeline_detail_404(
+    pageserver_http: PageserverHttpClient,
+    tenant_id: TenantId,
+    timeline_id: TimelineId,
+):
+    """Asserts that timeline_detail returns 404, or dumps the detail."""
+    try:
+        data = pageserver_http.timeline_detail(tenant_id, timeline_id)
+        log.error(f"detail {data}")
+    except PageserverApiException as e:
+        log.error(e)
+        if e.status_code == 404:
+            return
+        else:
+            raise
+    raise Exception("detail succeeded (it should return 404)")
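The two fixtures above are what the deletion tests below lean on: timeline deletion now returns before the work is finished, so tests either wait for the timeline to reach a terminal state or poll `timeline_detail` until it returns 404. A condensed sketch of the two patterns, with hypothetical helper names wrapping what the tests below do inline:

    from fixtures.pageserver.http import PageserverHttpClient
    from fixtures.pageserver.utils import assert_timeline_detail_404, wait_until_timeline_state
    from fixtures.types import TenantId, TimelineId
    from fixtures.utils import wait_until


    def delete_and_wait_gone(ps_http: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId):
        # Happy path: the call returns quickly and the detail endpoint soon reports 404.
        ps_http.timeline_delete(tenant_id, timeline_id)
        wait_until(2, 0.5, lambda: assert_timeline_detail_404(ps_http, tenant_id, timeline_id))


    def broken_reason(ps_http: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId) -> str:
        # Failpoint path: deletion fails partway, the timeline is parked in Broken,
        # and the structured state says why.
        timeline_info = wait_until_timeline_state(
            pageserver_http=ps_http,
            tenant_id=tenant_id,
            timeline_id=timeline_id,
            expected_state="Broken",
            iterations=2,
        )
        return timeline_info["state"]["Broken"]["reason"]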
diff --git a/test_runner/regress/test_import.py b/test_runner/regress/test_import.py
index 77030288f0..5c3948b027 100644
--- a/test_runner/regress/test_import.py
+++ b/test_runner/regress/test_import.py
@@ -79,6 +79,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
     # Set up pageserver for import
     neon_env_builder.enable_local_fs_remote_storage()
     env = neon_env_builder.init_start()
+
     client = env.pageserver.http_client()
     client.tenant_create(tenant)
 
@@ -145,6 +146,11 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
     )
 
     # NOTE: delete can easily come before upload operations are completed
+    # https://github.com/neondatabase/neon/issues/4326
+    env.pageserver.allowed_errors.append(
+        ".*files not bound to index_file.json, proceeding with their deletion.*"
+    )
+
     client.timeline_delete(tenant, timeline)
 
     # Importing correct backup works
diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py
index 742dbfff95..11ac9e2555 100644
--- a/test_runner/regress/test_remote_storage.py
+++ b/test_runner/regress/test_remote_storage.py
@@ -20,6 +20,7 @@ from fixtures.neon_fixtures import (
 )
 from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
 from fixtures.pageserver.utils import (
+    assert_timeline_detail_404,
     wait_for_last_record_lsn,
     wait_for_upload,
     wait_until_tenant_active,
@@ -182,7 +183,7 @@ def test_remote_storage_backup_and_restore(
     wait_until_tenant_active(
         pageserver_http=client,
         tenant_id=tenant_id,
-        iterations=5,
+        iterations=10,  # make it longer for real_s3 tests when unreliable wrapper is involved
     )
 
     detail = client.timeline_detail(tenant_id, timeline_id)
@@ -598,8 +599,23 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
     )
 
     client.timeline_delete(tenant_id, timeline_id)
+    env.pageserver.allowed_errors.append(f".*Timeline {tenant_id}/{timeline_id} was not found.*")
+    env.pageserver.allowed_errors.append(
+        ".*files not bound to index_file.json, proceeding with their deletion.*"
+    )
+
+    wait_until(2, 0.5, lambda: assert_timeline_detail_404(client, tenant_id, timeline_id))
+
     assert not timeline_path.exists()
 
+    # to please mypy
+    assert isinstance(env.remote_storage, LocalFsStorage)
+    remote_timeline_path = (
+        env.remote_storage.root / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
+    )
+
+    assert not list(remote_timeline_path.iterdir())
+
     # timeline deletion should kill ongoing uploads, so, the metric will be gone
     assert get_queued_count(file_kind="index", op_kind="upload") is None
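The local_fs check above and the S3 prefix checks in test_timeline_delete.py below spell out the same remote layout, tenants/<tenant_id>/timelines/<timeline_id>. If that assumption ever needs a single home, a small helper could capture it (names here are illustrative only):

    from pathlib import Path

    from fixtures.types import TenantId, TimelineId


    def remote_timeline_prefix(tenant_id: TenantId, timeline_id: TimelineId) -> str:
        """Key prefix of a timeline's data, shared by the local_fs layout and S3 object keys."""
        return f"tenants/{tenant_id}/timelines/{timeline_id}"


    def local_fs_timeline_path(storage_root: Path, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
        # Equivalent to env.remote_storage.root / "tenants" / ... as written inline above.
        return storage_root / remote_timeline_prefix(tenant_id, timeline_id)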
diff --git a/test_runner/regress/test_timeline_delete.py b/test_runner/regress/test_timeline_delete.py
index be79538843..28b15d03ca 100644
--- a/test_runner/regress/test_timeline_delete.py
+++ b/test_runner/regress/test_timeline_delete.py
@@ -3,6 +3,7 @@ import queue
 import shutil
 import threading
 from pathlib import Path
+from typing import Optional
 
 import pytest
 import requests
@@ -11,13 +12,16 @@ from fixtures.neon_fixtures import (
     NeonEnv,
     NeonEnvBuilder,
     RemoteStorageKind,
+    S3Storage,
     available_remote_storages,
 )
 from fixtures.pageserver.http import PageserverApiException
 from fixtures.pageserver.utils import (
+    assert_timeline_detail_404,
     wait_for_last_record_lsn,
     wait_for_upload,
     wait_until_tenant_active,
+    wait_until_timeline_state,
 )
 from fixtures.types import Lsn, TenantId, TimelineId
 from fixtures.utils import query_scalar, wait_until
@@ -68,7 +72,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
 
         ps_http.timeline_delete(env.initial_tenant, parent_timeline_id)
 
-    assert exc.value.status_code == 400
+    assert exc.value.status_code == 412
 
     timeline_path = (
         env.repo_dir / "tenants" / str(env.initial_tenant) / "timelines" / str(leaf_timeline_id)
@@ -130,13 +134,25 @@ def test_delete_timeline_post_rm_failure(
     env = neon_env_builder.init_start()
     assert env.initial_timeline
 
+    env.pageserver.allowed_errors.append(".*Error: failpoint: timeline-delete-after-rm")
+    env.pageserver.allowed_errors.append(".*Ignoring state update Stopping for broken timeline")
+
     ps_http = env.pageserver.http_client()
 
     failpoint_name = "timeline-delete-after-rm"
     ps_http.configure_failpoints((failpoint_name, "return"))
 
-    with pytest.raises(PageserverApiException, match=f"failpoint: {failpoint_name}"):
-        ps_http.timeline_delete(env.initial_tenant, env.initial_timeline)
+    ps_http.timeline_delete(env.initial_tenant, env.initial_timeline)
+
+    timeline_info = wait_until_timeline_state(
+        pageserver_http=ps_http,
+        tenant_id=env.initial_tenant,
+        timeline_id=env.initial_timeline,
+        expected_state="Broken",
+        iterations=2,  # effectively try immediately and retry once in one second
+    )
+
+    assert timeline_info["state"]["Broken"]["reason"] == "failpoint: timeline-delete-after-rm"
 
     at_failpoint_log_message = f".*{env.initial_timeline}.*at failpoint {failpoint_name}.*"
     env.pageserver.allowed_errors.append(at_failpoint_log_message)
@@ -148,11 +164,14 @@ def test_delete_timeline_post_rm_failure(
     ps_http.configure_failpoints((failpoint_name, "off"))
 
     # this should succeed
+    # this also checks that delete can be retried even when timeline is in Broken state
     ps_http.timeline_delete(env.initial_tenant, env.initial_timeline, timeout=2)
-    # the second call will try to transition the timeline into Stopping state, but it's already in that state
-    env.pageserver.allowed_errors.append(
-        f".*{env.initial_timeline}.*Ignoring new state, equal to the existing one: Stopping"
-    )
+
+    with pytest.raises(PageserverApiException) as e:
+        ps_http.timeline_detail(env.initial_tenant, env.initial_timeline)
+
+    assert e.value.status_code == 404
+
+    env.pageserver.allowed_errors.append(f".*NotFound: Timeline.*{env.initial_timeline}.*")
     env.pageserver.allowed_errors.append(
         f".*{env.initial_timeline}.*timeline directory not found, proceeding anyway.*"
     )
@@ -230,6 +249,12 @@ def test_timeline_resurrection_on_attach(
     # delete new timeline
     ps_http.timeline_delete(tenant_id=tenant_id, timeline_id=branch_timeline_id)
 
+    env.pageserver.allowed_errors.append(
+        f".*Timeline {tenant_id}/{branch_timeline_id} was not found.*"
+    )
+
+    wait_until(2, 0.5, lambda: assert_timeline_detail_404(ps_http, tenant_id, branch_timeline_id))
+
     ##### Stop the pageserver instance, erase all its data
     env.endpoints.stop_all()
     env.pageserver.stop()
@@ -252,12 +277,31 @@ def test_timeline_resurrection_on_attach(
     assert all([tl["state"] == "Active" for tl in timelines])
 
 
+def assert_prefix_empty(neon_env_builder: NeonEnvBuilder, prefix: Optional[str] = None):
+    # For local_fs we would need to properly handle empty directories, which we currently don't,
+    # so for simplicity stick to the s3 api.
+    assert neon_env_builder.remote_storage_kind in (
+        RemoteStorageKind.MOCK_S3,
+        RemoteStorageKind.REAL_S3,
+    )
+    # For mypy
+    assert isinstance(neon_env_builder.remote_storage, S3Storage)
+
+    # Note that this doesn't use pagination, so the list is not guaranteed to be exhaustive.
+    response = neon_env_builder.remote_storage_client.list_objects_v2(
+        Bucket=neon_env_builder.remote_storage.bucket_name,
+        Prefix=prefix or neon_env_builder.remote_storage.prefix_in_bucket or "",
+    )
+    objects = response.get("Contents")
+    assert (
+        response["KeyCount"] == 0
+    ), f"remote dir with prefix {prefix} is not empty after deletion: {objects}"
+
+
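As its comment notes, `assert_prefix_empty` inspects a single `list_objects_v2` page (at most 1000 keys), which is enough for these tests but not exhaustive. If an exhaustive check ever becomes necessary, boto3's paginator can walk every page; a sketch using the same module imports as above (the helper name is hypothetical):

    def assert_prefix_empty_exhaustive(neon_env_builder: NeonEnvBuilder, prefix: Optional[str] = None):
        # Same precondition as assert_prefix_empty above: only the S3-backed kinds are supported.
        assert isinstance(neon_env_builder.remote_storage, S3Storage)

        paginator = neon_env_builder.remote_storage_client.get_paginator("list_objects_v2")
        pages = paginator.paginate(
            Bucket=neon_env_builder.remote_storage.bucket_name,
            Prefix=prefix or neon_env_builder.remote_storage.prefix_in_bucket or "",
        )
        # Walk every page so the check cannot be fooled by pagination.
        leftover = [obj["Key"] for page in pages for obj in page.get("Contents", [])]
        assert not leftover, f"remote prefix {prefix} is not empty after deletion: {leftover}"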
 def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuilder):
     """
     When deleting a timeline, if we succeed in setting the deleted flag remotely
     but fail to delete the local state,
     restarting the pageserver should resume the deletion of the local state.
-    (Deletion of the state in S3 is not implemented yet.)
     """
 
     neon_env_builder.enable_remote_storage(
@@ -293,11 +337,17 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
         env.repo_dir / "tenants" / str(env.initial_tenant) / "timelines" / str(leaf_timeline_id)
     )
 
-    with pytest.raises(
-        PageserverApiException,
-        match="failpoint: timeline-delete-before-rm",
-    ):
-        ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id)
+    ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id)
+
+    timeline_info = wait_until_timeline_state(
+        pageserver_http=ps_http,
+        tenant_id=env.initial_tenant,
+        timeline_id=leaf_timeline_id,
+        expected_state="Broken",
+        iterations=2,  # effectively try immediately and retry once in one second
+    )
+
+    assert timeline_info["state"]["Broken"]["reason"] == "failpoint: timeline-delete-before-rm"
 
     assert leaf_timeline_path.exists(), "the failpoint didn't work"
 
@@ -305,7 +355,14 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
     env.pageserver.start()
 
     # Wait for tenant to finish loading.
-    wait_until_tenant_active(ps_http, tenant_id=env.initial_tenant, iterations=10, period=0.5)
+    wait_until_tenant_active(ps_http, tenant_id=env.initial_tenant, iterations=10, period=1)
+
+    env.pageserver.allowed_errors.append(
+        f".*Timeline {env.initial_tenant}/{leaf_timeline_id} was not found.*"
+    )
+    wait_until(
+        2, 0.5, lambda: assert_timeline_detail_404(ps_http, env.initial_tenant, leaf_timeline_id)
+    )
 
     assert (
         not leaf_timeline_path.exists()
@@ -317,6 +374,50 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
     }, "other timelines should not have been affected"
     assert all([tl["state"] == "Active" for tl in timelines])
 
+    assert_prefix_empty(
+        neon_env_builder,
+        prefix="/".join(
+            (
+                "tenants",
+                str(env.initial_tenant),
+                "timelines",
+                str(leaf_timeline_id),
+            )
+        ),
+    )
+
+    assert env.initial_timeline is not None
+
+    for timeline_id in (intermediate_timeline_id, env.initial_timeline):
+        ps_http.timeline_delete(env.initial_tenant, timeline_id)
+
+        env.pageserver.allowed_errors.append(
+            f".*Timeline {env.initial_tenant}/{timeline_id} was not found.*"
+        )
+        wait_until(
+            2, 0.5, lambda: assert_timeline_detail_404(ps_http, env.initial_tenant, timeline_id)
+        )
+
+        assert_prefix_empty(
+            neon_env_builder,
+            prefix="/".join(
+                (
+                    "tenants",
+                    str(env.initial_tenant),
+                    "timelines",
+                    str(timeline_id),
+                )
+            ),
+        )
+
+    # for some reason the check above doesn't immediately take effect for the below.
+    # Assume it is mock server inconsistency and check twice.
+    wait_until(
+        2,
+        0.5,
+        lambda: assert_prefix_empty(neon_env_builder),
+    )
+
 
 def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
     neon_env_builder: NeonEnvBuilder,
@@ -457,3 +558,87 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
         ps_http.timeline_detail(env.initial_tenant, child_timeline_id)
 
     assert exc.value.status_code == 404
+
+
+@pytest.mark.parametrize(
+    "remote_storage_kind",
+    list(
+        filter(
+            lambda s: s in (RemoteStorageKind.MOCK_S3, RemoteStorageKind.REAL_S3),
+            available_remote_storages(),
+        )
+    ),
+)
+def test_timeline_delete_works_for_remote_smoke(
+    neon_env_builder: NeonEnvBuilder,
+    remote_storage_kind: RemoteStorageKind,
+):
+    neon_env_builder.enable_remote_storage(
+        remote_storage_kind=remote_storage_kind,
+        test_name="test_timeline_delete_works_for_remote_smoke",
+    )
+
+    env = neon_env_builder.init_start()
+
+    ps_http = env.pageserver.http_client()
+    pg = env.endpoints.create_start("main")
+
+    tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
+    main_timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
+
+    assert tenant_id == env.initial_tenant
+    assert main_timeline_id == env.initial_timeline
+
+    timeline_ids = [env.initial_timeline]
+    for i in range(2):
+        branch_timeline_id = env.neon_cli.create_branch(f"new{i}", "main")
+        pg = env.endpoints.create_start(f"new{i}")
+
+        with pg.cursor() as cur:
+            cur.execute("CREATE TABLE f (i integer);")
+            cur.execute("INSERT INTO f VALUES (generate_series(1,1000));")
+            current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
+
+        # wait until pageserver receives that data
+        wait_for_last_record_lsn(ps_http, tenant_id, branch_timeline_id, current_lsn)
+
+        # run checkpoint manually to be sure that data landed in remote storage
+        ps_http.timeline_checkpoint(tenant_id, branch_timeline_id)
+
+        # wait until pageserver successfully uploaded a checkpoint to remote storage
+        log.info("waiting for checkpoint upload")
+        wait_for_upload(ps_http, tenant_id, branch_timeline_id, current_lsn)
+        log.info("upload of checkpoint is done")
+        timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
+
+        timeline_ids.append(timeline_id)
+
+    for timeline_id in reversed(timeline_ids):
+        # note that we need to finish the previous deletion before scheduling the next one,
+        # otherwise we can get a "HasChildren" error if deletion is not fast enough (real_s3)
+        ps_http.timeline_delete(tenant_id=tenant_id, timeline_id=timeline_id)
+
+        env.pageserver.allowed_errors.append(
+            f".*Timeline {env.initial_tenant}/{timeline_id} was not found.*"
+        )
+        wait_until(2, 0.5, lambda: assert_timeline_detail_404(ps_http, tenant_id, timeline_id))
+
+        assert_prefix_empty(
+            neon_env_builder,
+            prefix="/".join(
+                (
+                    "tenants",
+                    str(env.initial_tenant),
+                    "timelines",
+                    str(timeline_id),
+                )
+            ),
+        )
+
+    # for some reason the check above doesn't immediately take effect for the below.
+    # Assume it is mock server inconsistency and check twice.
+    wait_until(
+        2,
+        0.5,
+        lambda: assert_prefix_empty(neon_env_builder),
+    )
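Several tests in this file now repeat the same four steps: schedule the delete, allow the eventual "was not found" log line, poll `timeline_detail` for 404, and check that the remote prefix is empty. A sketch of folding those steps into one helper (the name is hypothetical and not part of this change):

    def timeline_delete_wait_completed(ps_http, env, neon_env_builder, tenant_id, timeline_id):
        # Deletion is asynchronous now: schedule it, then wait for both the local 404
        # and the remote prefix to become empty.
        ps_http.timeline_delete(tenant_id=tenant_id, timeline_id=timeline_id)
        env.pageserver.allowed_errors.append(f".*Timeline {tenant_id}/{timeline_id} was not found.*")
        wait_until(2, 0.5, lambda: assert_timeline_detail_404(ps_http, tenant_id, timeline_id))
        assert_prefix_empty(
            neon_env_builder,
            prefix="/".join(("tenants", str(tenant_id), "timelines", str(timeline_id))),
        )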