diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs
index c6466237bf..719608dd5f 100644
--- a/libs/remote_storage/src/lib.rs
+++ b/libs/remote_storage/src/lib.rs
@@ -19,7 +19,12 @@ mod simulate_failures;
 mod support;
 
 use std::{
-    collections::HashMap, fmt::Debug, num::NonZeroU32, ops::Bound, pin::Pin, sync::Arc,
+    collections::HashMap,
+    fmt::Debug,
+    num::NonZeroU32,
+    ops::Bound,
+    pin::{pin, Pin},
+    sync::Arc,
     time::SystemTime,
 };
 
@@ -28,6 +33,7 @@ use camino::{Utf8Path, Utf8PathBuf};
 
 use bytes::Bytes;
 use futures::{stream::Stream, StreamExt};
+use itertools::Itertools as _;
 use serde::{Deserialize, Serialize};
 use tokio::sync::Semaphore;
 use tokio_util::sync::CancellationToken;
@@ -261,7 +267,7 @@ pub trait RemoteStorage: Send + Sync + 'static {
         max_keys: Option<NonZeroU32>,
         cancel: &CancellationToken,
     ) -> Result<Listing, DownloadError> {
-        let mut stream = std::pin::pin!(self.list_streaming(prefix, mode, max_keys, cancel));
+        let mut stream = pin!(self.list_streaming(prefix, mode, max_keys, cancel));
         let mut combined = stream.next().await.expect("At least one item required")?;
         while let Some(list) = stream.next().await {
             let list = list?;
@@ -324,6 +330,35 @@ pub trait RemoteStorage: Send + Sync + 'static {
         cancel: &CancellationToken,
     ) -> anyhow::Result<()>;
 
+    /// Deletes all objects matching the given prefix.
+    ///
+    /// NB: this uses NoDelimiter and will match partial prefixes. For example, the prefix /a/b will
+    /// delete /a/b, /a/b/*, /a/bc, /a/bc/*, etc.
+    ///
+    /// If the operation fails because of timeout or cancellation, the root cause of the error will
+    /// be set to `TimeoutOrCancel`. In such a situation it is unknown which deletions, if any, went
+    /// through.
+    async fn delete_prefix(
+        &self,
+        prefix: &RemotePath,
+        cancel: &CancellationToken,
+    ) -> anyhow::Result<()> {
+        let mut stream =
+            pin!(self.list_streaming(Some(prefix), ListingMode::NoDelimiter, None, cancel));
+        while let Some(result) = stream.next().await {
+            let keys = match result {
+                Ok(listing) if listing.keys.is_empty() => continue,
+                Ok(listing) => listing.keys.into_iter().map(|o| o.key).collect_vec(),
+                Err(DownloadError::Cancelled) => return Err(TimeoutOrCancel::Cancel.into()),
+                Err(DownloadError::Timeout) => return Err(TimeoutOrCancel::Timeout.into()),
+                Err(err) => return Err(err.into()),
+            };
+            tracing::info!("Deleting {} keys from remote storage", keys.len());
+            self.delete_objects(&keys, cancel).await?;
+        }
+        Ok(())
+    }
+
     /// Copy a remote object inside a bucket from one path to another.
     async fn copy(
         &self,
@@ -488,6 +523,20 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
         }
     }
 
+    /// See [`RemoteStorage::delete_prefix`]
+    pub async fn delete_prefix(
+        &self,
+        prefix: &RemotePath,
+        cancel: &CancellationToken,
+    ) -> anyhow::Result<()> {
+        match self {
+            Self::LocalFs(s) => s.delete_prefix(prefix, cancel).await,
+            Self::AwsS3(s) => s.delete_prefix(prefix, cancel).await,
+            Self::AzureBlob(s) => s.delete_prefix(prefix, cancel).await,
+            Self::Unreliable(s) => s.delete_prefix(prefix, cancel).await,
+        }
+    }
+
     /// See [`RemoteStorage::copy`]
     pub async fn copy_object(
         &self,
diff --git a/libs/remote_storage/tests/common/tests.rs b/libs/remote_storage/tests/common/tests.rs
index e6f33fc3f8..d5da1d48e9 100644
--- a/libs/remote_storage/tests/common/tests.rs
+++ b/libs/remote_storage/tests/common/tests.rs
@@ -199,6 +199,138 @@ async fn list_no_delimiter_works(
     Ok(())
 }
 
+/// Tests that giving a partial prefix returns all matches (e.g. "/foo" yields "/foobar/baz"),
+/// but only with NoDelimiter.
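+///
+/// A rough sketch of the listing call under test (the `storage` handle and object layout are
+/// illustrative, not the fixture's actual contents):
+///
+/// ```ignore
+/// // With objects "folder1/a" and "folder2/b" in remote storage:
+/// let prefix = RemotePath::from_string("fold")?;
+/// let listing = storage
+///     .list(Some(&prefix), ListingMode::NoDelimiter, None, &cancel)
+///     .await?;
+/// assert_eq!(listing.keys.len(), 2); // the partial prefix matches both folders
+/// ```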
+#[test_context(MaybeEnabledStorageWithSimpleTestBlobs)]
+#[tokio::test]
+async fn list_partial_prefix(
+    ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs,
+) -> anyhow::Result<()> {
+    let ctx = match ctx {
+        MaybeEnabledStorageWithSimpleTestBlobs::Enabled(ctx) => ctx,
+        MaybeEnabledStorageWithSimpleTestBlobs::Disabled => return Ok(()),
+        MaybeEnabledStorageWithSimpleTestBlobs::UploadsFailed(e, _) => {
+            anyhow::bail!("S3 init failed: {e:?}")
+        }
+    };
+
+    let cancel = CancellationToken::new();
+    let test_client = Arc::clone(&ctx.enabled.client);
+
+    // Prefix "fold" should match all "folder{i}" directories with NoDelimiter.
+    let objects: HashSet<_> = test_client
+        .list(
+            Some(&RemotePath::from_string("fold")?),
+            ListingMode::NoDelimiter,
+            None,
+            &cancel,
+        )
+        .await?
+        .keys
+        .into_iter()
+        .map(|o| o.key)
+        .collect();
+    assert_eq!(&objects, &ctx.remote_blobs);
+
+    // Prefix "fold" matches nothing with WithDelimiter.
+    let objects: HashSet<_> = test_client
+        .list(
+            Some(&RemotePath::from_string("fold")?),
+            ListingMode::WithDelimiter,
+            None,
+            &cancel,
+        )
+        .await?
+        .keys
+        .into_iter()
+        .map(|o| o.key)
+        .collect();
+    assert!(objects.is_empty());
+
+    // Prefix "" matches everything.
+    let objects: HashSet<_> = test_client
+        .list(
+            Some(&RemotePath::from_string("")?),
+            ListingMode::NoDelimiter,
+            None,
+            &cancel,
+        )
+        .await?
+        .keys
+        .into_iter()
+        .map(|o| o.key)
+        .collect();
+    assert_eq!(&objects, &ctx.remote_blobs);
+
+    // Prefix "" matches nothing with WithDelimiter.
+    let objects: HashSet<_> = test_client
+        .list(
+            Some(&RemotePath::from_string("")?),
+            ListingMode::WithDelimiter,
+            None,
+            &cancel,
+        )
+        .await?
+        .keys
+        .into_iter()
+        .map(|o| o.key)
+        .collect();
+    assert!(objects.is_empty());
+
+    // Prefix "foo" matches nothing.
+    let objects: HashSet<_> = test_client
+        .list(
+            Some(&RemotePath::from_string("foo")?),
+            ListingMode::NoDelimiter,
+            None,
+            &cancel,
+        )
+        .await?
+        .keys
+        .into_iter()
+        .map(|o| o.key)
+        .collect();
+    assert!(objects.is_empty());
+
+    // Prefix "folder2/blob" matches.
+    let objects: HashSet<_> = test_client
+        .list(
+            Some(&RemotePath::from_string("folder2/blob")?),
+            ListingMode::NoDelimiter,
+            None,
+            &cancel,
+        )
+        .await?
+        .keys
+        .into_iter()
+        .map(|o| o.key)
+        .collect();
+    let expect: HashSet<_> = ctx
+        .remote_blobs
+        .iter()
+        .filter(|o| o.get_path().starts_with("folder2"))
+        .cloned()
+        .collect();
+    assert_eq!(&objects, &expect);
+
+    // Prefix "folder2/foo" matches nothing.
+    let objects: HashSet<_> = test_client
+        .list(
+            Some(&RemotePath::from_string("folder2/foo")?),
+            ListingMode::NoDelimiter,
+            None,
+            &cancel,
+        )
+        .await?
+        .keys
+        .into_iter()
+        .map(|o| o.key)
+        .collect();
+    assert!(objects.is_empty());
+
+    Ok(())
+}
+
 #[test_context(MaybeEnabledStorage)]
 #[tokio::test]
 async fn delete_non_exising_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
@@ -265,6 +397,80 @@ async fn delete_objects_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<(
     Ok(())
 }
 
+/// Tests that delete_prefix() will delete all objects matching a prefix, including
+/// partial prefixes (i.e. "/foo" matches "/foobar").
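+///
+/// A minimal usage sketch of the API under test (the `client` handle and path are illustrative):
+///
+/// ```ignore
+/// let prefix = RemotePath::from_string("folder0/blob_")?;
+/// // Removes folder0/blob_0, folder0/blob_1, ... but leaves folder1/* untouched.
+/// client.delete_prefix(&prefix, &cancel).await?;
+/// ```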
+#[test_context(MaybeEnabledStorageWithSimpleTestBlobs)]
+#[tokio::test]
+async fn delete_prefix(ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs) -> anyhow::Result<()> {
+    let ctx = match ctx {
+        MaybeEnabledStorageWithSimpleTestBlobs::Enabled(ctx) => ctx,
+        MaybeEnabledStorageWithSimpleTestBlobs::Disabled => return Ok(()),
+        MaybeEnabledStorageWithSimpleTestBlobs::UploadsFailed(e, _) => {
+            anyhow::bail!("S3 init failed: {e:?}")
+        }
+    };
+
+    let cancel = CancellationToken::new();
+    let test_client = Arc::clone(&ctx.enabled.client);
+
+    /// Asserts that the S3 listing matches the given paths.
+    macro_rules! assert_list {
+        ($expect:expr) => {{
+            let listing = test_client
+                .list(None, ListingMode::NoDelimiter, None, &cancel)
+                .await?
+                .keys
+                .into_iter()
+                .map(|o| o.key)
+                .collect();
+            assert_eq!($expect, listing);
+        }};
+    }
+
+    // We start with the full set of uploaded files.
+    let mut expect = ctx.remote_blobs.clone();
+
+    // Deleting a non-existing prefix should do nothing.
+    test_client
+        .delete_prefix(&RemotePath::from_string("xyz")?, &cancel)
+        .await?;
+    assert_list!(expect);
+
+    // Prefixes are case-sensitive.
+    test_client
+        .delete_prefix(&RemotePath::from_string("Folder")?, &cancel)
+        .await?;
+    assert_list!(expect);
+
+    // Deleting a path which overlaps with an existing object should do nothing. We pick the first
+    // path in the set as our common prefix.
+    let path = expect.iter().next().expect("empty set").clone().join("xyz");
+    test_client.delete_prefix(&path, &cancel).await?;
+    assert_list!(expect);
+
+    // Deleting an exact path should work. We pick the first path in the set.
+    let path = expect.iter().next().expect("empty set").clone();
+    test_client.delete_prefix(&path, &cancel).await?;
+    expect.remove(&path);
+    assert_list!(expect);
+
+    // Deleting a prefix should delete all matching objects.
+    test_client
+        .delete_prefix(&RemotePath::from_string("folder0/blob_")?, &cancel)
+        .await?;
+    expect.retain(|p| !p.get_path().as_str().starts_with("folder0/"));
+    assert_list!(expect);
+
+    // Deleting a common prefix should delete all objects.
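+    // (This assumes the fixture's blobs all live under "folder{i}/..." keys, so the partial
+    // prefix "fold" is expected to match every remaining object.)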
+    test_client
+        .delete_prefix(&RemotePath::from_string("fold")?, &cancel)
+        .await?;
+    expect.clear();
+    assert_list!(expect);
+
+    Ok(())
+}
+
 #[test_context(MaybeEnabledStorage)]
 #[tokio::test]
 async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index 9d9852c525..0567f8f3a7 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -11,6 +11,7 @@ use pageserver_api::shard::{
 };
 use pageserver_api::upcall_api::ReAttachResponseTenant;
 use rand::{distributions::Alphanumeric, Rng};
+use remote_storage::TimeoutOrCancel;
 use std::borrow::Cow;
 use std::cmp::Ordering;
 use std::collections::{BTreeMap, HashMap, HashSet};
@@ -1350,47 +1351,17 @@ impl TenantManager {
         }
     }
 
-    async fn delete_tenant_remote(
-        &self,
-        tenant_shard_id: TenantShardId,
-    ) -> Result<(), DeleteTenantError> {
-        let remote_path = remote_tenant_path(&tenant_shard_id);
-        let mut keys_stream = self.resources.remote_storage.list_streaming(
-            Some(&remote_path),
-            remote_storage::ListingMode::NoDelimiter,
-            None,
-            &self.cancel,
-        );
-        while let Some(chunk) = keys_stream.next().await {
-            let keys = match chunk {
-                Ok(listing) => listing.keys,
-                Err(remote_storage::DownloadError::Cancelled) => {
-                    return Err(DeleteTenantError::Cancelled)
-                }
-                Err(remote_storage::DownloadError::NotFound) => return Ok(()),
-                Err(other) => return Err(DeleteTenantError::Other(anyhow::anyhow!(other))),
-            };
-
-            if keys.is_empty() {
-                tracing::info!("Remote storage already deleted");
-            } else {
-                tracing::info!("Deleting {} keys from remote storage", keys.len());
-                let keys = keys.into_iter().map(|o| o.key).collect::<Vec<_>>();
-                self.resources
-                    .remote_storage
-                    .delete_objects(&keys, &self.cancel)
-                    .await?;
-            }
-        }
-
-        Ok(())
-    }
-
     /// If a tenant is attached, detach it. Then remove its data from remote storage.
     ///
     /// A tenant is considered deleted once it is gone from remote storage. It is the caller's
     /// responsibility to avoid trying to attach the tenant again or use it any way once deletion
     /// has started: this operation is not atomic, and must be retried until it succeeds.
+    ///
+    /// As a special case, if an unsharded tenant ID is given for a sharded tenant, it will remove
+    /// all tenant shards in remote storage (removing all paths with the tenant prefix). The storage
+    /// controller uses this to purge all remote tenant data, including any stale parent shards that
+    /// may remain after splits. Ideally, this special case would be handled elsewhere. See:
+    /// .
     pub(crate) async fn delete_tenant(
         &self,
         tenant_shard_id: TenantShardId,
@@ -1442,25 +1413,29 @@
         // in 500 responses to delete requests.
         // - We keep the `SlotGuard` during this I/O, so that if a concurrent delete request comes in, it will
         //   503/retry, rather than kicking off a wasteful concurrent deletion.
-        match backoff::retry(
-            || async move { self.delete_tenant_remote(tenant_shard_id).await },
-            |e| match e {
-                DeleteTenantError::Cancelled => true,
-                DeleteTenantError::SlotError(_) => {
-                    unreachable!("Remote deletion doesn't touch slots")
-                }
-                _ => false,
+        // NB: this also deletes partial prefixes, i.e. a <tenant_id> path will delete all
+        // <tenant_id>_<shard_id>/* objects. See method comment for why.
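+        // A failed or cancelled attempt surfaces as `TimeoutOrCancel` in the error chain and is
+        // mapped to `DeleteTenantError::Cancelled` below; any other error becomes `Other`.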
+        backoff::retry(
+            || async move {
+                self.resources
+                    .remote_storage
+                    .delete_prefix(&remote_tenant_path(&tenant_shard_id), &self.cancel)
+                    .await
             },
+            |_| false, // backoff::retry handles cancellation
             1,
             3,
             &format!("delete_tenant[tenant_shard_id={tenant_shard_id}]"),
             &self.cancel,
         )
         .await
-        {
-            Some(r) => r,
-            None => Err(DeleteTenantError::Cancelled),
-        }
+        .unwrap_or(Err(TimeoutOrCancel::Cancel.into()))
+        .map_err(|err| {
+            if TimeoutOrCancel::caused_by_cancel(&err) {
+                return DeleteTenantError::Cancelled;
+            }
+            DeleteTenantError::Other(err)
+        })
     }
 
     #[instrument(skip_all, fields(tenant_id=%tenant.get_tenant_shard_id().tenant_id, shard_id=%tenant.get_tenant_shard_id().shard_slug(), new_shard_count=%new_shard_count.literal()))]
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 25e1fb5e1f..ab2c3b5e48 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -2862,17 +2862,12 @@ impl Service {
         let _tenant_lock =
             trace_exclusive_lock(&self.tenant_op_locks, tenant_id, TenantOperations::Delete).await;
 
-        // Detach all shards
-        let (detach_waiters, shard_ids, node) = {
-            let mut shard_ids = Vec::new();
+        // Detach all shards. This also deletes local pageserver shard data.
+        let (detach_waiters, node) = {
             let mut detach_waiters = Vec::new();
             let mut locked = self.inner.write().unwrap();
             let (nodes, tenants, scheduler) = locked.parts_mut();
-            for (tenant_shard_id, shard) in
-                tenants.range_mut(TenantShardId::tenant_range(tenant_id))
-            {
-                shard_ids.push(*tenant_shard_id);
-
+            for (_, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
                 // Update the tenant's intent to remove all attachments
                 shard.policy = PlacementPolicy::Detached;
                 shard
@@ -2892,7 +2887,7 @@ impl Service {
             let node = nodes
                 .get(&node_id)
                 .expect("Pageservers may not be deleted while lock is active");
-            (detach_waiters, shard_ids, node.clone())
+            (detach_waiters, node.clone())
         };
 
         // This reconcile wait can fail in a few ways:
@@ -2907,38 +2902,34 @@ impl Service {
         self.await_waiters(detach_waiters, RECONCILE_TIMEOUT)
             .await?;
 
-        let locations = shard_ids
-            .into_iter()
-            .map(|s| (s, node.clone()))
-            .collect::<Vec<_>>();
-        let results = self.tenant_for_shards_api(
-            locations,
-            |tenant_shard_id, client| async move { client.tenant_delete(tenant_shard_id).await },
-            1,
-            3,
-            RECONCILE_TIMEOUT,
-            &self.cancel,
-        )
-        .await;
-        for result in results {
-            match result {
-                Ok(StatusCode::ACCEPTED) => {
-                    // This should never happen: we waited for detaches to finish above
-                    return Err(ApiError::InternalServerError(anyhow::anyhow!(
-                        "Unexpectedly still attached on {}",
-                        node
-                    )));
-                }
-                Ok(_) => {}
-                Err(mgmt_api::Error::Cancelled) => {
-                    return Err(ApiError::ShuttingDown);
-                }
-                Err(e) => {
-                    // This is unexpected: remote deletion should be infallible, unless the object store
-                    // at large is unavailable.
-                    tracing::error!("Error deleting via node {}: {e}", node);
-                    return Err(ApiError::InternalServerError(anyhow::anyhow!(e)));
-                }
+        // Delete the entire tenant (all shards) from remote storage via a random pageserver.
+        // Passing an unsharded tenant ID will cause the pageserver to remove all remote paths with
+        // the tenant ID prefix, including all shards (even possibly stale ones).
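+        // Any live pageserver can serve this request: the shards were already detached above, and
+        // remote deletion only needs access to object storage.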
+        match node
+            .with_client_retries(
+                |client| async move {
+                    client
+                        .tenant_delete(TenantShardId::unsharded(tenant_id))
+                        .await
+                },
+                &self.config.jwt_token,
+                1,
+                3,
+                RECONCILE_TIMEOUT,
+                &self.cancel,
+            )
+            .await
+            .unwrap_or(Err(mgmt_api::Error::Cancelled))
+        {
+            Ok(_) => {}
+            Err(mgmt_api::Error::Cancelled) => {
+                return Err(ApiError::ShuttingDown);
+            }
+            Err(e) => {
+                // This is unexpected: remote deletion should be infallible, unless the object store
+                // at large is unavailable.
+                tracing::error!("Error deleting via node {node}: {e}");
+                return Err(ApiError::InternalServerError(anyhow::anyhow!(e)));
             }
         }
 
diff --git a/test_runner/fixtures/pageserver/utils.py b/test_runner/fixtures/pageserver/utils.py
index 377a95fbeb..4c4306be9e 100644
--- a/test_runner/fixtures/pageserver/utils.py
+++ b/test_runner/fixtures/pageserver/utils.py
@@ -303,9 +303,10 @@ def assert_prefix_empty(
     remote_storage: Optional[RemoteStorage],
     prefix: Optional[str] = None,
     allowed_postfix: Optional[str] = None,
+    delimiter: str = "/",
 ) -> None:
     assert remote_storage is not None
-    response = list_prefix(remote_storage, prefix)
+    response = list_prefix(remote_storage, prefix, delimiter)
     keys = response["KeyCount"]
     objects: list[ObjectTypeDef] = response.get("Contents", [])
     common_prefixes = response.get("CommonPrefixes", [])
@@ -338,16 +339,18 @@ def assert_prefix_empty(
             if not (allowed_postfix.endswith(key)):
                 filtered_count += 1
 
-    assert (
-        filtered_count == 0
-    ), f"remote dir with prefix {prefix} is not empty after deletion: {objects}"
+    assert filtered_count == 0, f"remote prefix {prefix} is not empty: {objects}"
 
 
 # remote_storage must not be None, but that's easier for callers to make mypy happy
-def assert_prefix_not_empty(remote_storage: Optional[RemoteStorage], prefix: Optional[str] = None):
+def assert_prefix_not_empty(
+    remote_storage: Optional[RemoteStorage],
+    prefix: Optional[str] = None,
+    delimiter: str = "/",
+):
     assert remote_storage is not None
     response = list_prefix(remote_storage, prefix)
-    assert response["KeyCount"] != 0, f"remote dir with prefix {prefix} is empty: {response}"
+    assert response["KeyCount"] != 0, f"remote prefix {prefix} is empty: {response}"
 
 
 def list_prefix(
diff --git a/test_runner/regress/test_tenant_delete.py b/test_runner/regress/test_tenant_delete.py
index 294c1248c5..f486327445 100644
--- a/test_runner/regress/test_tenant_delete.py
+++ b/test_runner/regress/test_tenant_delete.py
@@ -20,6 +20,7 @@ from fixtures.pageserver.utils import (
 )
 from fixtures.remote_storage import RemoteStorageKind, s3_storage
 from fixtures.utils import run_pg_bench_small, wait_until
+from fixtures.workload import Workload
 from requests.exceptions import ReadTimeout
 from werkzeug.wrappers.request import Request
 from werkzeug.wrappers.response import Response
@@ -404,3 +405,57 @@ def test_tenant_delete_scrubber(pg_bin: PgBin, make_httpserver, neon_env_builder
         cloud_admin_api_token=cloud_admin_token,
     )
     assert healthy
+
+
+def test_tenant_delete_stale_shards(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
+    """
+    Deleting a tenant should also delete any stale (pre-split) shards from remote storage.
+    """
+    remote_storage_kind = s3_storage()
+    neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
+
+    env = neon_env_builder.init_start()
+
+    # Create an unsharded tenant.
+    tenant_id, timeline_id = env.create_tenant()
+
+    # Write some data.
+    workload = Workload(env, tenant_id, timeline_id, branch_name="main")
+    workload.init()
+    workload.write_rows(256)
+    workload.validate()
+
+    assert_prefix_not_empty(
+        neon_env_builder.pageserver_remote_storage,
+        prefix="/".join(("tenants", str(tenant_id))),
+    )
+
+    # Upload a heatmap as well.
+    env.pageserver.http_client().tenant_heatmap_upload(tenant_id)
+
+    # Split off a few shards, in two rounds.
+    env.storage_controller.tenant_shard_split(tenant_id, shard_count=4)
+    env.storage_controller.tenant_shard_split(tenant_id, shard_count=16)
+
+    # Delete the tenant. This should also delete data for the unsharded and count=4 parents.
+    env.storage_controller.pageserver_api().tenant_delete(tenant_id=tenant_id)
+
+    assert_prefix_empty(
+        neon_env_builder.pageserver_remote_storage,
+        prefix="/".join(("tenants", str(tenant_id))),
+        delimiter="",  # match partial prefixes, i.e. all shards
+    )
+
+    dirs = list(env.pageserver.tenant_dir(None).glob(f"{tenant_id}*"))
+    assert dirs == [], f"found tenant directories: {dirs}"
+
+    # The initial tenant created by the test harness should still be there.
+    # Only the tenant we deleted should be removed.
+    assert_prefix_not_empty(
+        neon_env_builder.pageserver_remote_storage,
+        prefix="/".join(("tenants", str(env.initial_tenant))),
+    )
+    dirs = list(env.pageserver.tenant_dir(None).glob(f"{env.initial_tenant}*"))
+    assert dirs != [], "missing initial tenant directory"
+
+    env.stop()