Start using remote_storage in S3 scrubber for PurgeGarbage (#7932)

Starts using the `remote_storage` crate in the S3 scrubber for the
`PurgeGarbage` subcommand.

The `remote_storage` crate is generic over storage backends, so
switching to it lets us run the scrubber against all supported
backends.

Start with the `PurgeGarbage` subcommand as it doesn't use
`stream_tenants`.

Part of #7547.
Author:    Arpad Müller
Date:      2024-07-22 15:49:30 +02:00
Committer: GitHub
Parent:    8d948f2e07
Commit:    204bb8faa3

5 changed files with 114 additions and 56 deletions
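
For orientation before the diffs: a minimal sketch of what the switch buys, using only the calls that appear in this commit (`list` with `ListingMode::NoDelimiter`, then `delete_objects`). The helper name `purge_prefix` and its exact shape are illustrative, not code from the PR.

```rust
use remote_storage::{GenericRemoteStorage, ListingMode, RemotePath};
use tokio_util::sync::CancellationToken;

// Hypothetical helper, for illustration only: list every key under a prefix
// with the backend-agnostic client and delete them in one call.
async fn purge_prefix(
    remote_client: &GenericRemoteStorage,
    prefix: &RemotePath,
) -> anyhow::Result<usize> {
    let cancel = CancellationToken::new();

    // NoDelimiter returns all keys under the prefix instead of stopping at
    // "directory"-style common prefixes.
    let listing = remote_client
        .list(Some(prefix), ListingMode::NoDelimiter, None, &cancel)
        .await?;

    let keys: Vec<RemotePath> = listing.keys;
    if !keys.is_empty() {
        // The scrubber's do_delete additionally splits this into chunks of at
        // most MAX_KEYS_PER_DELETE before issuing the request.
        remote_client.delete_objects(&keys, &cancel).await?;
    }
    Ok(keys.len())
}
```

The real code paths below keep listing and deletion separate, accumulating keys from many tenants and timelines and flushing them in batches of at most `MAX_KEYS_PER_DELETE`.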


@@ -40,6 +40,7 @@ use crate::{
 pub struct AzureBlobStorage {
     client: ContainerClient,
     container_name: String,
+    prefix_in_container: Option<String>,
     max_keys_per_list_response: Option<NonZeroU32>,
     concurrency_limiter: ConcurrencyLimiter,
@@ -85,6 +86,7 @@ impl AzureBlobStorage {
         Ok(AzureBlobStorage {
             client,
             container_name: azure_config.container_name.to_owned(),
+            prefix_in_container: azure_config.prefix_in_container.to_owned(),
             max_keys_per_list_response,
             concurrency_limiter: ConcurrencyLimiter::new(azure_config.concurrency_limit.get()),
@@ -238,6 +240,10 @@ impl AzureBlobStorage {
             _ = cancel.cancelled() => Err(Cancelled),
         }
     }
+
+    pub fn container_name(&self) -> &str {
+        &self.container_name
+    }
 }
 
 fn to_azure_metadata(metadata: StorageMetadata) -> Metadata {


@@ -504,6 +504,16 @@ impl GenericRemoteStorage {
             None => self.download(from, cancel).await,
         }
     }
+
+    /// The name of the bucket/container/etc.
+    pub fn bucket_name(&self) -> Option<&str> {
+        match self {
+            Self::LocalFs(_s) => None,
+            Self::AwsS3(s) => Some(s.bucket_name()),
+            Self::AzureBlob(s) => Some(s.container_name()),
+            Self::Unreliable(_s) => None,
+        }
+    }
 }
 
 /// Extra set of key-value pairs that contain arbitrary metadata about the storage entry.


@@ -386,6 +386,10 @@ impl S3Bucket {
         }
         Ok(())
     }
+
+    pub fn bucket_name(&self) -> &str {
+        &self.bucket_name
+    }
 }
 
 pin_project_lite::pin_project! {


@@ -8,21 +8,19 @@ use std::{
 };
 
 use anyhow::Context;
-use aws_sdk_s3::{
-    types::{Delete, ObjectIdentifier},
-    Client,
-};
 use futures_util::TryStreamExt;
 use pageserver_api::shard::TenantShardId;
+use remote_storage::{GenericRemoteStorage, ListingMode, RemotePath};
 use serde::{Deserialize, Serialize};
 use tokio_stream::StreamExt;
+use tokio_util::sync::CancellationToken;
 use utils::id::TenantId;
 
 use crate::{
     cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData},
-    init_remote,
-    metadata_stream::{stream_listing, stream_tenant_timelines, stream_tenants},
-    BucketConfig, ConsoleConfig, NodeKind, RootTarget, TenantShardTimelineId, TraversingDepth,
+    init_remote, init_remote_generic,
+    metadata_stream::{stream_tenant_timelines, stream_tenants},
+    BucketConfig, ConsoleConfig, NodeKind, TenantShardTimelineId, TraversingDepth,
 };
 
 #[derive(Serialize, Deserialize, Debug)]
@@ -324,41 +322,45 @@ impl std::fmt::Display for PurgeMode {
 }
 
 pub async fn get_tenant_objects(
-    s3_client: &Arc<Client>,
-    target: RootTarget,
+    s3_client: &GenericRemoteStorage,
     tenant_shard_id: TenantShardId,
-) -> anyhow::Result<Vec<ObjectIdentifier>> {
+) -> anyhow::Result<Vec<RemotePath>> {
     tracing::debug!("Listing objects in tenant {tenant_shard_id}");
+    let tenant_root = super::remote_tenant_path(&tenant_shard_id);
     // TODO: apply extra validation based on object modification time. Don't purge
     // tenants where any timeline's index_part.json has been touched recently.
-    let mut tenant_root = target.tenant_root(&tenant_shard_id);
-
-    // Remove delimiter, so that object listing lists all keys in the prefix and not just
-    // common prefixes.
-    tenant_root.delimiter = String::new();
-    let key_stream = stream_listing(s3_client, &tenant_root);
-    key_stream.try_collect().await
+    let list = s3_client
+        .list(
+            Some(&tenant_root),
+            ListingMode::NoDelimiter,
+            None,
+            &CancellationToken::new(),
+        )
+        .await?;
+    Ok(list.keys)
 }
 
 pub async fn get_timeline_objects(
-    s3_client: &Arc<Client>,
-    target: RootTarget,
+    s3_client: &GenericRemoteStorage,
     ttid: TenantShardTimelineId,
-) -> anyhow::Result<Vec<ObjectIdentifier>> {
+) -> anyhow::Result<Vec<RemotePath>> {
     tracing::debug!("Listing objects in timeline {ttid}");
-    let mut timeline_root = target.timeline_root(&ttid);
+    let timeline_root = super::remote_timeline_path_id(&ttid);
 
     // TODO: apply extra validation based on object modification time. Don't purge
     // timelines whose index_part.json has been touched recently.
-
-    // Remove delimiter, so that object listing lists all keys in the prefix and not just
-    // common prefixes.
-    timeline_root.delimiter = String::new();
-    let key_stream = stream_listing(s3_client, &timeline_root);
-    key_stream.try_collect().await
+    let list = s3_client
+        .list(
+            Some(&timeline_root),
+            ListingMode::NoDelimiter,
+            None,
+            &CancellationToken::new(),
+        )
+        .await?;
+    Ok(list.keys)
 }
 
 const MAX_KEYS_PER_DELETE: usize = 1000;
@@ -369,16 +371,17 @@ const MAX_KEYS_PER_DELETE: usize = 1000;
 /// MAX_KEYS_PER_DELETE keys are left.
 /// `num_deleted` returns number of deleted keys.
 async fn do_delete(
-    s3_client: &Arc<Client>,
-    bucket_name: &str,
-    keys: &mut Vec<ObjectIdentifier>,
+    remote_client: &GenericRemoteStorage,
+    keys: &mut Vec<RemotePath>,
     dry_run: bool,
     drain: bool,
    progress_tracker: &mut DeletionProgressTracker,
 ) -> anyhow::Result<()> {
+    let cancel = CancellationToken::new();
     while (!keys.is_empty() && drain) || (keys.len() >= MAX_KEYS_PER_DELETE) {
         let request_keys =
             keys.split_off(keys.len() - (std::cmp::min(MAX_KEYS_PER_DELETE, keys.len())));
         let num_deleted = request_keys.len();
         if dry_run {
             tracing::info!("Dry-run deletion of objects: ");
@@ -386,14 +389,10 @@ async fn do_delete(
                 tracing::info!(" {k:?}");
             }
         } else {
-            let delete_request = s3_client
-                .delete_objects()
-                .bucket(bucket_name)
-                .delete(Delete::builder().set_objects(Some(request_keys)).build()?);
-
-            delete_request
-                .send()
+            remote_client
+                .delete_objects(&request_keys, &cancel)
                 .await
-                .context("DeleteObjects request")?;
+                .context("deletion request")?;
             progress_tracker.register(num_deleted);
         }
     }
@@ -431,8 +430,13 @@ pub async fn purge_garbage(
         input_path
     );
 
-    let (s3_client, target) =
-        init_remote(garbage_list.bucket_config.clone(), garbage_list.node_kind).await?;
+    let remote_client =
+        init_remote_generic(garbage_list.bucket_config.clone(), garbage_list.node_kind).await?;
+
+    assert_eq!(
+        &garbage_list.bucket_config.bucket,
+        remote_client.bucket_name().unwrap()
+    );
 
     // Sanity checks on the incoming list
     if garbage_list.active_tenant_count == 0 {
@@ -464,16 +468,13 @@ pub async fn purge_garbage(
     let items = tokio_stream::iter(filtered_items.map(Ok));
 
     let get_objects_results = items.map_ok(|i| {
-        let s3_client = s3_client.clone();
-        let target = target.clone();
+        let remote_client = remote_client.clone();
         async move {
             match i.entity {
                 GarbageEntity::Tenant(tenant_id) => {
-                    get_tenant_objects(&s3_client, target, tenant_id).await
-                }
-                GarbageEntity::Timeline(ttid) => {
-                    get_timeline_objects(&s3_client, target, ttid).await
+                    get_tenant_objects(&remote_client, tenant_id).await
                 }
+                GarbageEntity::Timeline(ttid) => get_timeline_objects(&remote_client, ttid).await,
             }
         }
     });
@@ -487,8 +488,7 @@ pub async fn purge_garbage(
         objects_to_delete.append(&mut object_list);
         if objects_to_delete.len() >= MAX_KEYS_PER_DELETE {
             do_delete(
-                &s3_client,
-                &garbage_list.bucket_config.bucket,
+                &remote_client,
                 &mut objects_to_delete,
                 dry_run,
                 false,
@@ -499,8 +499,7 @@ pub async fn purge_garbage(
     }
 
     do_delete(
-        &s3_client,
-        &garbage_list.bucket_config.bucket,
+        &remote_client,
         &mut objects_to_delete,
         dry_run,
         true,


@@ -22,9 +22,13 @@ use aws_sdk_s3::Client;
 use camino::{Utf8Path, Utf8PathBuf};
 use clap::ValueEnum;
+use pageserver::tenant::remote_timeline_client::{remote_tenant_path, remote_timeline_path};
 use pageserver::tenant::TENANTS_SEGMENT_NAME;
 use pageserver_api::shard::TenantShardId;
-use remote_storage::RemotePath;
+use remote_storage::{
+    GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, S3Config,
+    DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
+};
 use reqwest::Url;
 use serde::{Deserialize, Serialize};
 use tokio::io::AsyncReadExt;
@@ -215,6 +219,10 @@ impl RootTarget {
     }
 }
 
+pub fn remote_timeline_path_id(id: &TenantShardTimelineId) -> RemotePath {
+    remote_timeline_path(&id.tenant_shard_id, &id.timeline_id)
+}
+
 #[derive(Debug, Clone, Serialize, Deserialize)]
 #[serde(deny_unknown_fields)]
 pub struct BucketConfig {
@@ -296,7 +304,7 @@ pub fn init_logging(file_name: &str) -> Option<WorkerGuard> {
     }
 }
 
-pub async fn init_s3_client(bucket_region: Region) -> Client {
+async fn init_s3_client(bucket_region: Region) -> Client {
     let config = aws_config::defaults(aws_config::BehaviorVersion::v2024_03_28())
         .region(bucket_region)
         .load()
@@ -304,6 +312,13 @@ pub async fn init_s3_client(bucket_region: Region) -> Client {
     Client::new(&config)
 }
 
+fn default_prefix_in_bucket(node_kind: NodeKind) -> &'static str {
+    match node_kind {
+        NodeKind::Pageserver => "pageserver/v1/",
+        NodeKind::Safekeeper => "wal/",
+    }
+}
+
 async fn init_remote(
     bucket_config: BucketConfig,
     node_kind: NodeKind,
@@ -311,18 +326,17 @@ async fn init_remote(
     let bucket_region = Region::new(bucket_config.region);
     let delimiter = "/".to_string();
     let s3_client = Arc::new(init_s3_client(bucket_region).await);
+    let default_prefix = default_prefix_in_bucket(node_kind).to_string();
     let s3_root = match node_kind {
         NodeKind::Pageserver => RootTarget::Pageserver(S3Target {
             bucket_name: bucket_config.bucket,
-            prefix_in_bucket: bucket_config
-                .prefix_in_bucket
-                .unwrap_or("pageserver/v1".to_string()),
+            prefix_in_bucket: bucket_config.prefix_in_bucket.unwrap_or(default_prefix),
             delimiter,
         }),
         NodeKind::Safekeeper => RootTarget::Safekeeper(S3Target {
             bucket_name: bucket_config.bucket,
-            prefix_in_bucket: bucket_config.prefix_in_bucket.unwrap_or("wal/".to_string()),
+            prefix_in_bucket: bucket_config.prefix_in_bucket.unwrap_or(default_prefix),
             delimiter,
         }),
     };
@@ -330,6 +344,31 @@ async fn init_remote(
     Ok((s3_client, s3_root))
 }
 
+async fn init_remote_generic(
+    bucket_config: BucketConfig,
+    node_kind: NodeKind,
+) -> anyhow::Result<GenericRemoteStorage> {
+    let endpoint = env::var("AWS_ENDPOINT_URL").ok();
+    let default_prefix = default_prefix_in_bucket(node_kind).to_string();
+    let prefix_in_bucket = Some(bucket_config.prefix_in_bucket.unwrap_or(default_prefix));
+    let storage = S3Config {
+        bucket_name: bucket_config.bucket,
+        bucket_region: bucket_config.region,
+        prefix_in_bucket,
+        endpoint,
+        concurrency_limit: DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
+            .try_into()
+            .unwrap(),
+        max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
+        upload_storage_class: None,
+    };
+    let storage_config = RemoteStorageConfig {
+        storage: RemoteStorageKind::AwsS3(storage),
+        timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
+    };
+    GenericRemoteStorage::from_config(&storage_config).await
+}
+
 async fn list_objects_with_retries(
     s3_client: &Client,
     s3_target: &S3Target,