From 204bb8faa32975cdbf3546e0a78f3387b511e589 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arpad=20M=C3=BCller?=
Date: Mon, 22 Jul 2024 15:49:30 +0200
Subject: [PATCH] Start using remote_storage in S3 scrubber for PurgeGarbage
 (#7932)

Starts using the `remote_storage` crate in the S3 scrubber for the
`PurgeGarbage` subcommand.

The `remote_storage` crate is generic over various backends, so using it
gives us the ability to run the scrubber against all supported backends.

Start with the `PurgeGarbage` subcommand, as it doesn't use
`stream_tenants`.

Part of #7547.
---
 libs/remote_storage/src/azure_blob.rs |  6 ++
 libs/remote_storage/src/lib.rs        | 10 +++
 libs/remote_storage/src/s3_bucket.rs  |  4 ++
 storage_scrubber/src/garbage.rs       | 99 +++++++++++++--------------
 storage_scrubber/src/lib.rs           | 51 ++++++++++++--
 5 files changed, 114 insertions(+), 56 deletions(-)

diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs
index 8e590b17c4..d0146238da 100644
--- a/libs/remote_storage/src/azure_blob.rs
+++ b/libs/remote_storage/src/azure_blob.rs
@@ -40,6 +40,7 @@ use crate::{
 
 pub struct AzureBlobStorage {
     client: ContainerClient,
+    container_name: String,
     prefix_in_container: Option<String>,
     max_keys_per_list_response: Option<NonZeroU32>,
     concurrency_limiter: ConcurrencyLimiter,
@@ -85,6 +86,7 @@ impl AzureBlobStorage {
 
         Ok(AzureBlobStorage {
             client,
+            container_name: azure_config.container_name.to_owned(),
             prefix_in_container: azure_config.prefix_in_container.to_owned(),
             max_keys_per_list_response,
             concurrency_limiter: ConcurrencyLimiter::new(azure_config.concurrency_limit.get()),
@@ -238,6 +240,10 @@ impl AzureBlobStorage {
             _ = cancel.cancelled() => Err(Cancelled),
         }
     }
+
+    pub fn container_name(&self) -> &str {
+        &self.container_name
+    }
 }
 
 fn to_azure_metadata(metadata: StorageMetadata) -> Metadata {
diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs
index 3381c4296f..3ee7d15a76 100644
--- a/libs/remote_storage/src/lib.rs
+++ b/libs/remote_storage/src/lib.rs
@@ -504,6 +504,16 @@ impl GenericRemoteStorage {
             None => self.download(from, cancel).await,
         }
     }
+
+    /// The name of the bucket/container/etc.
+    pub fn bucket_name(&self) -> Option<&str> {
+        match self {
+            Self::LocalFs(_s) => None,
+            Self::AwsS3(s) => Some(s.bucket_name()),
+            Self::AzureBlob(s) => Some(s.container_name()),
+            Self::Unreliable(_s) => None,
+        }
+    }
 }
 
 /// Extra set of key-value pairs that contain arbitrary metadata about the storage entry.
diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs
index b65d8b7e9e..056646a01e 100644
--- a/libs/remote_storage/src/s3_bucket.rs
+++ b/libs/remote_storage/src/s3_bucket.rs
@@ -386,6 +386,10 @@ impl S3Bucket {
         }
         Ok(())
     }
+
+    pub fn bucket_name(&self) -> &str {
+        &self.bucket_name
+    }
 }
 
 pin_project_lite::pin_project! {
diff --git a/storage_scrubber/src/garbage.rs b/storage_scrubber/src/garbage.rs
index 0450851988..c7e21d7e26 100644
--- a/storage_scrubber/src/garbage.rs
+++ b/storage_scrubber/src/garbage.rs
@@ -8,21 +8,19 @@ use std::{
 };
 
 use anyhow::Context;
-use aws_sdk_s3::{
-    types::{Delete, ObjectIdentifier},
-    Client,
-};
 use futures_util::TryStreamExt;
 use pageserver_api::shard::TenantShardId;
+use remote_storage::{GenericRemoteStorage, ListingMode, RemotePath};
 use serde::{Deserialize, Serialize};
 use tokio_stream::StreamExt;
+use tokio_util::sync::CancellationToken;
 use utils::id::TenantId;
 
 use crate::{
     cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData},
-    init_remote,
-    metadata_stream::{stream_listing, stream_tenant_timelines, stream_tenants},
-    BucketConfig, ConsoleConfig, NodeKind, RootTarget, TenantShardTimelineId, TraversingDepth,
+    init_remote, init_remote_generic,
+    metadata_stream::{stream_tenant_timelines, stream_tenants},
+    BucketConfig, ConsoleConfig, NodeKind, TenantShardTimelineId, TraversingDepth,
 };
 
 #[derive(Serialize, Deserialize, Debug)]
@@ -324,41 +322,45 @@ impl std::fmt::Display for PurgeMode {
 }
 
 pub async fn get_tenant_objects(
-    s3_client: &Arc<Client>,
-    target: RootTarget,
+    s3_client: &GenericRemoteStorage,
     tenant_shard_id: TenantShardId,
-) -> anyhow::Result<Vec<ObjectIdentifier>> {
+) -> anyhow::Result<Vec<RemotePath>> {
     tracing::debug!("Listing objects in tenant {tenant_shard_id}");
+    let tenant_root = super::remote_tenant_path(&tenant_shard_id);
+
     // TODO: apply extra validation based on object modification time. Don't purge
     // tenants where any timeline's index_part.json has been touched recently.
 
-    let mut tenant_root = target.tenant_root(&tenant_shard_id);
-
-    // Remove delimiter, so that object listing lists all keys in the prefix and not just
-    // common prefixes.
-    tenant_root.delimiter = String::new();
-
-    let key_stream = stream_listing(s3_client, &tenant_root);
-    key_stream.try_collect().await
+    let list = s3_client
+        .list(
+            Some(&tenant_root),
+            ListingMode::NoDelimiter,
+            None,
+            &CancellationToken::new(),
+        )
+        .await?;
+    Ok(list.keys)
 }
 
 pub async fn get_timeline_objects(
-    s3_client: &Arc<Client>,
-    target: RootTarget,
+    s3_client: &GenericRemoteStorage,
     ttid: TenantShardTimelineId,
-) -> anyhow::Result<Vec<ObjectIdentifier>> {
+) -> anyhow::Result<Vec<RemotePath>> {
     tracing::debug!("Listing objects in timeline {ttid}");
-    let mut timeline_root = target.timeline_root(&ttid);
+    let timeline_root = super::remote_timeline_path_id(&ttid);
 
     // TODO: apply extra validation based on object modification time. Don't purge
     // timelines whose index_part.json has been touched recently.
 
-    // Remove delimiter, so that object listing lists all keys in the prefix and not just
-    // common prefixes.
-    timeline_root.delimiter = String::new();
-    let key_stream = stream_listing(s3_client, &timeline_root);
-
-    key_stream.try_collect().await
+    let list = s3_client
+        .list(
+            Some(&timeline_root),
+            ListingMode::NoDelimiter,
+            None,
+            &CancellationToken::new(),
+        )
+        .await?;
+    Ok(list.keys)
 }
 
 const MAX_KEYS_PER_DELETE: usize = 1000;
@@ -369,16 +371,17 @@ const MAX_KEYS_PER_DELETE: usize = 1000;
 /// MAX_KEYS_PER_DELETE keys are left.
 /// `num_deleted` returns number of deleted keys.
 async fn do_delete(
-    s3_client: &Arc<Client>,
-    bucket_name: &str,
-    keys: &mut Vec<ObjectIdentifier>,
+    remote_client: &GenericRemoteStorage,
+    keys: &mut Vec<RemotePath>,
     dry_run: bool,
     drain: bool,
     progress_tracker: &mut DeletionProgressTracker,
 ) -> anyhow::Result<()> {
+    let cancel = CancellationToken::new();
     while (!keys.is_empty() && drain) || (keys.len() >= MAX_KEYS_PER_DELETE) {
         let request_keys =
             keys.split_off(keys.len() - (std::cmp::min(MAX_KEYS_PER_DELETE, keys.len())));
+
         let num_deleted = request_keys.len();
         if dry_run {
             tracing::info!("Dry-run deletion of objects: ");
@@ -386,14 +389,10 @@ async fn do_delete(
                 tracing::info!("  {k:?}");
             }
         } else {
-            let delete_request = s3_client
-                .delete_objects()
-                .bucket(bucket_name)
-                .delete(Delete::builder().set_objects(Some(request_keys)).build()?);
-            delete_request
-                .send()
+            remote_client
+                .delete_objects(&request_keys, &cancel)
                 .await
-                .context("DeleteObjects request")?;
+                .context("deletion request")?;
             progress_tracker.register(num_deleted);
         }
     }
@@ -431,8 +430,13 @@ pub async fn purge_garbage(
         input_path
     );
 
-    let (s3_client, target) =
-        init_remote(garbage_list.bucket_config.clone(), garbage_list.node_kind).await?;
+    let remote_client =
+        init_remote_generic(garbage_list.bucket_config.clone(), garbage_list.node_kind).await?;
+
+    assert_eq!(
+        &garbage_list.bucket_config.bucket,
+        remote_client.bucket_name().unwrap()
+    );
 
     // Sanity checks on the incoming list
     if garbage_list.active_tenant_count == 0 {
@@ -464,16 +468,13 @@ pub async fn purge_garbage(
     let items = tokio_stream::iter(filtered_items.map(Ok));
 
     let get_objects_results = items.map_ok(|i| {
-        let s3_client = s3_client.clone();
-        let target = target.clone();
+        let remote_client = remote_client.clone();
         async move {
             match i.entity {
                 GarbageEntity::Tenant(tenant_id) => {
-                    get_tenant_objects(&s3_client, target, tenant_id).await
-                }
-                GarbageEntity::Timeline(ttid) => {
-                    get_timeline_objects(&s3_client, target, ttid).await
+                    get_tenant_objects(&remote_client, tenant_id).await
                 }
+                GarbageEntity::Timeline(ttid) => get_timeline_objects(&remote_client, ttid).await,
             }
         }
     });
@@ -487,8 +488,7 @@ pub async fn purge_garbage(
         objects_to_delete.append(&mut object_list);
         if objects_to_delete.len() >= MAX_KEYS_PER_DELETE {
             do_delete(
-                &s3_client,
-                &garbage_list.bucket_config.bucket,
+                &remote_client,
                 &mut objects_to_delete,
                 dry_run,
                 false,
@@ -499,8 +499,7 @@ pub async fn purge_garbage(
     }
 
     do_delete(
-        &s3_client,
-        &garbage_list.bucket_config.bucket,
+        &remote_client,
         &mut objects_to_delete,
         dry_run,
         true,
diff --git a/storage_scrubber/src/lib.rs b/storage_scrubber/src/lib.rs
index a0b6d7ea30..5c64e7e459 100644
--- a/storage_scrubber/src/lib.rs
+++ b/storage_scrubber/src/lib.rs
@@ -22,9 +22,13 @@ use aws_sdk_s3::Client;
 
 use camino::{Utf8Path, Utf8PathBuf};
 use clap::ValueEnum;
+use pageserver::tenant::remote_timeline_client::{remote_tenant_path, remote_timeline_path};
 use pageserver::tenant::TENANTS_SEGMENT_NAME;
 use pageserver_api::shard::TenantShardId;
-use remote_storage::RemotePath;
+use remote_storage::{
+    GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, S3Config,
+    DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
+};
 use reqwest::Url;
 use serde::{Deserialize, Serialize};
 use tokio::io::AsyncReadExt;
@@ -215,6 +219,10 @@ impl RootTarget {
     }
 }
 
+pub fn remote_timeline_path_id(id: &TenantShardTimelineId) -> RemotePath {
+    remote_timeline_path(&id.tenant_shard_id, &id.timeline_id)
+}
+
 #[derive(Debug, Clone, Serialize, Deserialize)]
 #[serde(deny_unknown_fields)]
 pub struct BucketConfig {
@@ -296,7 +304,7 @@ pub fn init_logging(file_name: &str) -> Option<WorkerGuard> {
     }
 }
 
-pub async fn init_s3_client(bucket_region: Region) -> Client {
+async fn init_s3_client(bucket_region: Region) -> Client {
     let config = aws_config::defaults(aws_config::BehaviorVersion::v2024_03_28())
         .region(bucket_region)
         .load()
@@ -304,6 +312,13 @@ pub async fn init_s3_client(bucket_region: Region) -> Client {
     Client::new(&config)
 }
 
+fn default_prefix_in_bucket(node_kind: NodeKind) -> &'static str {
+    match node_kind {
+        NodeKind::Pageserver => "pageserver/v1/",
+        NodeKind::Safekeeper => "wal/",
+    }
+}
+
 async fn init_remote(
     bucket_config: BucketConfig,
     node_kind: NodeKind,
@@ -311,18 +326,17 @@ async fn init_remote(
     let bucket_region = Region::new(bucket_config.region);
     let delimiter = "/".to_string();
     let s3_client = Arc::new(init_s3_client(bucket_region).await);
+    let default_prefix = default_prefix_in_bucket(node_kind).to_string();
 
     let s3_root = match node_kind {
         NodeKind::Pageserver => RootTarget::Pageserver(S3Target {
            bucket_name: bucket_config.bucket,
-            prefix_in_bucket: bucket_config
-                .prefix_in_bucket
-                .unwrap_or("pageserver/v1".to_string()),
+            prefix_in_bucket: bucket_config.prefix_in_bucket.unwrap_or(default_prefix),
             delimiter,
         }),
         NodeKind::Safekeeper => RootTarget::Safekeeper(S3Target {
             bucket_name: bucket_config.bucket,
-            prefix_in_bucket: bucket_config.prefix_in_bucket.unwrap_or("wal/".to_string()),
+            prefix_in_bucket: bucket_config.prefix_in_bucket.unwrap_or(default_prefix),
             delimiter,
         }),
     };
@@ -330,6 +344,31 @@ async fn init_remote(
     Ok((s3_client, s3_root))
 }
 
+async fn init_remote_generic(
+    bucket_config: BucketConfig,
+    node_kind: NodeKind,
+) -> anyhow::Result<GenericRemoteStorage> {
+    let endpoint = env::var("AWS_ENDPOINT_URL").ok();
+    let default_prefix = default_prefix_in_bucket(node_kind).to_string();
+    let prefix_in_bucket = Some(bucket_config.prefix_in_bucket.unwrap_or(default_prefix));
+    let storage = S3Config {
+        bucket_name: bucket_config.bucket,
+        bucket_region: bucket_config.region,
+        prefix_in_bucket,
+        endpoint,
+        concurrency_limit: DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
+            .try_into()
+            .unwrap(),
+        max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
+        upload_storage_class: None,
+    };
+    let storage_config = RemoteStorageConfig {
+        storage: RemoteStorageKind::AwsS3(storage),
+        timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
+    };
+    GenericRemoteStorage::from_config(&storage_config).await
+}
+
 async fn list_objects_with_retries(
     s3_client: &Client,
     s3_target: &S3Target,
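
Usage sketch (not part of the patch): a minimal illustration of the pattern the scrubber adopts here, building a `GenericRemoteStorage` from an `S3Config` and then listing and deleting keys under a prefix. It assumes the field names and call signatures exactly as they appear in the diff (`from_config`, `list` with `ListingMode::NoDelimiter`, `delete_objects`); the helper names `storage_for_bucket` and `purge_prefix` are illustrative only.

// A sketch mirroring init_remote_generic and the list-then-delete pattern above.
// Helper names are made up; the remote_storage calls are the ones the patch uses.
use anyhow::Context;
use remote_storage::{
    GenericRemoteStorage, ListingMode, RemotePath, RemoteStorageConfig, RemoteStorageKind,
    S3Config, DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
};
use tokio_util::sync::CancellationToken;

// Build a GenericRemoteStorage for one S3 bucket (mirrors init_remote_generic).
async fn storage_for_bucket(
    bucket_name: String,
    bucket_region: String,
    prefix_in_bucket: Option<String>,
) -> anyhow::Result<GenericRemoteStorage> {
    let storage_config = RemoteStorageConfig {
        storage: RemoteStorageKind::AwsS3(S3Config {
            bucket_name,
            bucket_region,
            prefix_in_bucket,
            // Honour a custom endpoint (e.g. a local S3 stand-in), like the patch does.
            endpoint: std::env::var("AWS_ENDPOINT_URL").ok(),
            concurrency_limit: DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
                .try_into()
                .unwrap(),
            max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
            upload_storage_class: None,
        }),
        timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
    };
    GenericRemoteStorage::from_config(&storage_config).await
}

// List every key under `prefix` (NoDelimiter recurses past '/') and delete them,
// the same pattern get_tenant_objects + do_delete use in the patch.
async fn purge_prefix(client: &GenericRemoteStorage, prefix: &RemotePath) -> anyhow::Result<()> {
    let cancel = CancellationToken::new();
    let listing = client
        .list(Some(prefix), ListingMode::NoDelimiter, None, &cancel)
        .await?;
    client
        .delete_objects(&listing.keys, &cancel)
        .await
        .context("deletion request")?;
    Ok(())
}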