diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 75aa28233b..031548bbec 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -194,7 +194,7 @@ pub trait RemoteStorage: Send + Sync + 'static { mode: ListingMode, max_keys: Option, cancel: &CancellationToken, - ) -> impl Stream>; + ) -> impl Stream> + Send; async fn list( &self, @@ -351,10 +351,10 @@ impl GenericRemoteStorage> { mode: ListingMode, max_keys: Option, cancel: &'a CancellationToken, - ) -> impl Stream> + 'a { + ) -> impl Stream> + 'a + Send { match self { Self::LocalFs(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)) - as Pin>>>, + as Pin> + Send>>, Self::AwsS3(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)), Self::AzureBlob(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)), Self::Unreliable(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)), diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index 67e5be2955..13f873dcdb 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -114,7 +114,7 @@ impl RemoteStorage for UnreliableWrapper { mode: ListingMode, max_keys: Option, cancel: &CancellationToken, - ) -> impl Stream> { + ) -> impl Stream> + Send { async_stream::stream! { self.attempt(RemoteOp::ListPrefixes(prefix.cloned())) .map_err(DownloadError::Other)?; diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 75c8682c97..5e1f69f4c1 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -1384,34 +1384,32 @@ impl TenantManager { tenant_shard_id: TenantShardId, ) -> Result<(), DeleteTenantError> { let remote_path = remote_tenant_path(&tenant_shard_id); - let keys = match self - .resources - .remote_storage - .list( - Some(&remote_path), - remote_storage::ListingMode::NoDelimiter, - None, - &self.cancel, - ) - .await - { - Ok(listing) => listing.keys, - Err(remote_storage::DownloadError::Cancelled) => { - return Err(DeleteTenantError::Cancelled) - } - Err(remote_storage::DownloadError::NotFound) => return Ok(()), - Err(other) => return Err(DeleteTenantError::Other(anyhow::anyhow!(other))), - }; + let mut keys_stream = self.resources.remote_storage.list_streaming( + Some(&remote_path), + remote_storage::ListingMode::NoDelimiter, + None, + &self.cancel, + ); + while let Some(chunk) = keys_stream.next().await { + let keys = match chunk { + Ok(listing) => listing.keys, + Err(remote_storage::DownloadError::Cancelled) => { + return Err(DeleteTenantError::Cancelled) + } + Err(remote_storage::DownloadError::NotFound) => return Ok(()), + Err(other) => return Err(DeleteTenantError::Other(anyhow::anyhow!(other))), + }; - if keys.is_empty() { - tracing::info!("Remote storage already deleted"); - } else { - tracing::info!("Deleting {} keys from remote storage", keys.len()); - let keys = keys.into_iter().map(|o| o.key).collect::>(); - self.resources - .remote_storage - .delete_objects(&keys, &self.cancel) - .await?; + if keys.is_empty() { + tracing::info!("Remote storage already deleted"); + } else { + tracing::info!("Deleting {} keys from remote storage", keys.len()); + let keys = keys.into_iter().map(|o| o.key).collect::>(); + self.resources + .remote_storage + .delete_objects(&keys, &self.cancel) + .await?; + } } Ok(())