From 9fabdda2dcaa67536bdec9e65303d22674dbb9b1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arpad=20M=C3=BCller?=
Date: Tue, 30 Jul 2024 11:00:37 +0200
Subject: [PATCH] scrubber: add remote_storage based listing APIs and use them
 in find-large-objects (#8541)

Add two new functions `stream_objects_with_retries` and
`stream_tenants_generic` and use them in the `find-large-objects`
subcommand, migrating it to `remote_storage`.

Also adds the `size` field to the `ListingObject` struct.

Part of #7547
---
 libs/remote_storage/src/azure_blob.rs      |  3 +-
 libs/remote_storage/src/lib.rs             |  1 +
 libs/remote_storage/src/local_fs.rs        |  2 +
 libs/remote_storage/src/s3_bucket.rs       |  5 +-
 storage_scrubber/src/find_large_objects.rs | 44 +++++------
 storage_scrubber/src/garbage.rs            |  2 +-
 storage_scrubber/src/lib.rs                | 90 +++++++++++++++++----
 storage_scrubber/src/metadata_stream.rs    | 33 +++++++-
 8 files changed, 133 insertions(+), 47 deletions(-)

diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs
index 6ca4ae43f2..3c77d5a227 100644
--- a/libs/remote_storage/src/azure_blob.rs
+++ b/libs/remote_storage/src/azure_blob.rs
@@ -355,7 +355,8 @@ impl RemoteStorage for AzureBlobStorage {
             .blobs()
             .map(|k| ListingObject{
                 key: self.name_to_relative_path(&k.name),
-                last_modified: k.properties.last_modified.into()
+                last_modified: k.properties.last_modified.into(),
+                size: k.properties.content_length,
             }
             );
diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs
index 031548bbec..794e696769 100644
--- a/libs/remote_storage/src/lib.rs
+++ b/libs/remote_storage/src/lib.rs
@@ -153,6 +153,7 @@ pub enum ListingMode {
 pub struct ListingObject {
     pub key: RemotePath,
     pub last_modified: SystemTime,
+    pub size: u64,
 }
 
 #[derive(Default)]
diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs
index bc6b10aa51..99b4aa4061 100644
--- a/libs/remote_storage/src/local_fs.rs
+++ b/libs/remote_storage/src/local_fs.rs
@@ -368,6 +368,7 @@ impl RemoteStorage for LocalFs {
                     key: k.clone(),
                     // LocalFs is just for testing, so just specify a dummy time
                     last_modified: SystemTime::now(),
+                    size: 0,
                 })
             }
         })
@@ -411,6 +412,7 @@ impl RemoteStorage for LocalFs {
                     key: RemotePath::from_string(&relative_key).unwrap(),
                     // LocalFs is just for testing
                     last_modified: SystemTime::now(),
+                    size: 0,
                 });
             }
         }
diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs
index 412f307445..1f25da813d 100644
--- a/libs/remote_storage/src/s3_bucket.rs
+++ b/libs/remote_storage/src/s3_bucket.rs
@@ -565,9 +565,12 @@ impl RemoteStorage for S3Bucket {
                 }
             };
 
+            let size = object.size.unwrap_or(0) as u64;
+
             result.keys.push(ListingObject{
                 key,
-                last_modified
+                last_modified,
+                size,
             });
             if let Some(mut mk) = max_keys {
                 assert!(mk > 0);
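Note on the `remote_storage` side of the patch: each backend now fills in `size` (Azure from `content_length`, S3 from the list response, LocalFs with a dummy 0), so callers can filter or aggregate on object size straight from a `Listing` page without extra per-object requests. The helper below is only an illustration of that, written against the `Listing`/`ListingObject` fields shown above; `large_objects` and `min_size` are names invented for the example, not part of the patch.

    use remote_storage::{Listing, ListingObject};

    /// Illustrative helper: pick the objects of one listing page that are at
    /// least `min_size` bytes and report how many bytes they account for.
    fn large_objects(listing: &Listing, min_size: u64) -> (Vec<&ListingObject>, u64) {
        let large: Vec<&ListingObject> = listing
            .keys
            .iter()
            .filter(|obj| obj.size >= min_size)
            .collect();
        let total_bytes: u64 = large.iter().map(|obj| obj.size).sum();
        (large, total_bytes)
    }
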
diff --git a/storage_scrubber/src/find_large_objects.rs b/storage_scrubber/src/find_large_objects.rs
index 2ef802229d..f5bb7e088a 100644
--- a/storage_scrubber/src/find_large_objects.rs
+++ b/storage_scrubber/src/find_large_objects.rs
@@ -1,10 +1,13 @@
+use std::pin::pin;
+
 use futures::{StreamExt, TryStreamExt};
 use pageserver::tenant::storage_layer::LayerName;
+use remote_storage::ListingMode;
 use serde::{Deserialize, Serialize};
 
 use crate::{
-    checks::parse_layer_object_name, init_remote, list_objects_with_retries,
-    metadata_stream::stream_tenants, BucketConfig, NodeKind,
+    checks::parse_layer_object_name, init_remote_generic, metadata_stream::stream_tenants_generic,
+    stream_objects_with_retries, BucketConfig, NodeKind,
 };
 
 #[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
@@ -47,45 +50,38 @@ pub async fn find_large_objects(
     ignore_deltas: bool,
     concurrency: usize,
 ) -> anyhow::Result {
-    let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
-    let tenants = std::pin::pin!(stream_tenants(&s3_client, &target));
+    let (remote_client, target) =
+        init_remote_generic(bucket_config.clone(), NodeKind::Pageserver).await?;
+    let tenants = pin!(stream_tenants_generic(&remote_client, &target));
 
     let objects_stream = tenants.map_ok(|tenant_shard_id| {
         let mut tenant_root = target.tenant_root(&tenant_shard_id);
-        let s3_client = s3_client.clone();
+        let remote_client = remote_client.clone();
         async move {
             let mut objects = Vec::new();
             let mut total_objects_ctr = 0u64;
             // We want the objects and not just common prefixes
             tenant_root.delimiter.clear();
-            let mut continuation_token = None;
-            loop {
-                let fetch_response =
-                    list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone())
-                        .await?;
-                for obj in fetch_response.contents().iter().filter(|o| {
-                    if let Some(obj_size) = o.size {
-                        min_size as i64 <= obj_size
-                    } else {
-                        false
-                    }
-                }) {
-                    let key = obj.key().expect("couldn't get key").to_owned();
+            let mut objects_stream = pin!(stream_objects_with_retries(
+                &remote_client,
+                ListingMode::NoDelimiter,
+                &tenant_root
+            ));
+            while let Some(listing) = objects_stream.next().await {
+                let listing = listing?;
+                for obj in listing.keys.iter().filter(|obj| min_size <= obj.size) {
+                    let key = obj.key.to_string();
                     let kind = LargeObjectKind::from_key(&key);
                     if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
                         continue;
                     }
                     objects.push(LargeObject {
                         key,
-                        size: obj.size.unwrap() as u64,
+                        size: obj.size,
                         kind,
                     })
                 }
-                total_objects_ctr += fetch_response.contents().len() as u64;
-                match fetch_response.next_continuation_token {
-                    Some(new_token) => continuation_token = Some(new_token),
-                    None => break,
-                }
+                total_objects_ctr += listing.keys.len() as u64;
             }
 
             Ok((tenant_shard_id, objects, total_objects_ctr))
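The find-large-objects hunk above ends before the code that drives the per-tenant futures produced by `tenants.map_ok(...)`, so the role of the `concurrency` argument is not visible here. One common way to apply it is `TryStreamExt::try_buffer_unordered`; the sketch below shows that pattern under that assumption, with a stand-in stream instead of real tenants (`drive_with_concurrency` and the numbers are illustrative only, not scrubber code).

    use anyhow::Result;
    use futures::{stream, TryStreamExt};

    /// Illustrative driver: run at most `concurrency` per-tenant futures at once
    /// and fold their results as they complete.
    async fn drive_with_concurrency(concurrency: usize) -> Result<u64> {
        // Stand-in for `tenants.map_ok(|tenant_shard_id| async move { ... })`:
        // every Ok item is itself a future returning a per-tenant object count.
        let per_tenant = stream::iter((0..8u64).map(anyhow::Ok)).map_ok(|tenant| async move {
            // ... the listing work for one tenant would happen here ...
            Ok::<u64, anyhow::Error>(tenant * 10)
        });

        // Keep at most `concurrency` futures in flight, yielding results as they finish.
        let mut results = std::pin::pin!(per_tenant.try_buffer_unordered(concurrency));
        let mut total = 0u64;
        while let Some(count) = results.try_next().await? {
            total += count;
        }
        Ok(total)
    }
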
diff --git a/storage_scrubber/src/garbage.rs b/storage_scrubber/src/garbage.rs
index 78ecfc7232..73479c3658 100644
--- a/storage_scrubber/src/garbage.rs
+++ b/storage_scrubber/src/garbage.rs
@@ -510,7 +510,7 @@ pub async fn purge_garbage(
         input_path
     );
 
-    let remote_client =
+    let (remote_client, _target) =
         init_remote_generic(garbage_list.bucket_config.clone(), garbage_list.node_kind).await?;
 
     assert_eq!(
diff --git a/storage_scrubber/src/lib.rs b/storage_scrubber/src/lib.rs
index 5c64e7e459..c7900f9b02 100644
--- a/storage_scrubber/src/lib.rs
+++ b/storage_scrubber/src/lib.rs
@@ -22,16 +22,18 @@
 use aws_sdk_s3::Client;
 use camino::{Utf8Path, Utf8PathBuf};
 use clap::ValueEnum;
+use futures::{Stream, StreamExt};
 use pageserver::tenant::remote_timeline_client::{remote_tenant_path, remote_timeline_path};
 use pageserver::tenant::TENANTS_SEGMENT_NAME;
 use pageserver_api::shard::TenantShardId;
 use remote_storage::{
-    GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, S3Config,
-    DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
+    GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorageConfig, RemoteStorageKind,
+    S3Config, DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
 };
 use reqwest::Url;
 use serde::{Deserialize, Serialize};
 use tokio::io::AsyncReadExt;
+use tokio_util::sync::CancellationToken;
 use tracing::error;
 use tracing_appender::non_blocking::WorkerGuard;
 use tracing_subscriber::{fmt, prelude::*, EnvFilter};
@@ -319,27 +321,35 @@ fn default_prefix_in_bucket(node_kind: NodeKind) -> &'static str {
     }
 }
 
+fn make_root_target(
+    bucket_name: String,
+    prefix_in_bucket: String,
+    node_kind: NodeKind,
+) -> RootTarget {
+    let s3_target = S3Target {
+        bucket_name,
+        prefix_in_bucket,
+        delimiter: "/".to_string(),
+    };
+    match node_kind {
+        NodeKind::Pageserver => RootTarget::Pageserver(s3_target),
+        NodeKind::Safekeeper => RootTarget::Safekeeper(s3_target),
+    }
+}
+
 async fn init_remote(
     bucket_config: BucketConfig,
     node_kind: NodeKind,
 ) -> anyhow::Result<(Arc<Client>, RootTarget)> {
     let bucket_region = Region::new(bucket_config.region);
-    let delimiter = "/".to_string();
     let s3_client = Arc::new(init_s3_client(bucket_region).await);
     let default_prefix = default_prefix_in_bucket(node_kind).to_string();
-    let s3_root = match node_kind {
-        NodeKind::Pageserver => RootTarget::Pageserver(S3Target {
-            bucket_name: bucket_config.bucket,
-            prefix_in_bucket: bucket_config.prefix_in_bucket.unwrap_or(default_prefix),
-            delimiter,
-        }),
-        NodeKind::Safekeeper => RootTarget::Safekeeper(S3Target {
-            bucket_name: bucket_config.bucket,
-            prefix_in_bucket: bucket_config.prefix_in_bucket.unwrap_or(default_prefix),
-            delimiter,
-        }),
-    };
+    let s3_root = make_root_target(
+        bucket_config.bucket,
+        bucket_config.prefix_in_bucket.unwrap_or(default_prefix),
+        node_kind,
+    );
 
     Ok((s3_client, s3_root))
 }
 
@@ -347,12 +357,12 @@ async fn init_remote(
 async fn init_remote_generic(
     bucket_config: BucketConfig,
     node_kind: NodeKind,
-) -> anyhow::Result<GenericRemoteStorage> {
+) -> anyhow::Result<(GenericRemoteStorage, RootTarget)> {
     let endpoint = env::var("AWS_ENDPOINT_URL").ok();
     let default_prefix = default_prefix_in_bucket(node_kind).to_string();
     let prefix_in_bucket = Some(bucket_config.prefix_in_bucket.unwrap_or(default_prefix));
     let storage = S3Config {
-        bucket_name: bucket_config.bucket,
+        bucket_name: bucket_config.bucket.clone(),
         bucket_region: bucket_config.region,
         prefix_in_bucket,
         endpoint,
@@ -366,7 +376,13 @@ async fn init_remote_generic(
         storage: RemoteStorageKind::AwsS3(storage),
         timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
     };
-    GenericRemoteStorage::from_config(&storage_config).await
+
+    // We already pass the prefix to the remote client above
+    let prefix_in_root_target = String::new();
+    let s3_root = make_root_target(bucket_config.bucket, prefix_in_root_target, node_kind);
+
+    let client = GenericRemoteStorage::from_config(&storage_config).await?;
+    Ok((client, s3_root))
 }
 
 async fn list_objects_with_retries(
@@ -404,6 +420,44 @@ async fn list_objects_with_retries(
     Err(anyhow!("unreachable unless MAX_RETRIES==0"))
 }
 
+fn stream_objects_with_retries<'a>(
+    storage_client: &'a GenericRemoteStorage,
+    listing_mode: ListingMode,
+    s3_target: &'a S3Target,
+) -> impl Stream<Item = anyhow::Result<Listing>> + 'a {
+    async_stream::stream! {
+        let mut trial = 0;
+        let cancel = CancellationToken::new();
+        let prefix_str = &s3_target
+            .prefix_in_bucket
+            .strip_prefix("/")
+            .unwrap_or(&s3_target.prefix_in_bucket);
+        let prefix = RemotePath::from_string(prefix_str)?;
+        let mut list_stream =
+            storage_client.list_streaming(Some(&prefix), listing_mode, None, &cancel);
+        while let Some(res) = list_stream.next().await {
+            if let Err(err) = res {
+                let yield_err = if err.is_permanent() {
+                    true
+                } else {
+                    let backoff_time = 1 << trial.max(5);
+                    tokio::time::sleep(Duration::from_secs(backoff_time)).await;
+                    trial += 1;
+                    trial == MAX_RETRIES - 1
+                };
+                if yield_err {
+                    yield Err(err)
+                        .with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
+                    break;
+                }
+            } else {
+                trial = 0;
+                yield res.map_err(anyhow::Error::from);
+            }
+        }
+    }
+}
+
 async fn download_object_with_retries(
     s3_client: &Client,
     bucket_name: &str,
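The new `stream_objects_with_retries` yields one `Listing` page at a time and keeps transient `DownloadError`s internal: it sleeps between attempts with a growing backoff and only surfaces an error once it is permanent or the retry budget (`MAX_RETRIES`) is spent, at which point the stream ends. A minimal consumer inside `storage_scrubber` could look like the sketch below; `count_objects` is a hypothetical name, not part of the patch.

    use futures::StreamExt;
    use remote_storage::{GenericRemoteStorage, ListingMode};

    /// Hypothetical consumer of the helper added above: count every object
    /// under a target prefix, one listing page at a time.
    async fn count_objects(
        storage_client: &GenericRemoteStorage,
        target: &S3Target,
    ) -> anyhow::Result<u64> {
        let mut listings = std::pin::pin!(stream_objects_with_retries(
            storage_client,
            ListingMode::NoDelimiter,
            target,
        ));
        let mut count = 0u64;
        while let Some(listing) = listings.next().await {
            // Retries already happened inside the stream, so an error here is final.
            count += listing?.keys.len() as u64;
        }
        Ok(count)
    }
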
diff --git a/storage_scrubber/src/metadata_stream.rs b/storage_scrubber/src/metadata_stream.rs
index c05874f556..91dba3c992 100644
--- a/storage_scrubber/src/metadata_stream.rs
+++ b/storage_scrubber/src/metadata_stream.rs
@@ -1,12 +1,41 @@
-use anyhow::Context;
+use std::str::FromStr;
+
+use anyhow::{anyhow, Context};
 use async_stream::{stream, try_stream};
 use aws_sdk_s3::{types::ObjectIdentifier, Client};
+use futures::StreamExt;
+use remote_storage::{GenericRemoteStorage, ListingMode};
 use tokio_stream::Stream;
 
-use crate::{list_objects_with_retries, RootTarget, S3Target, TenantShardTimelineId};
+use crate::{
+    list_objects_with_retries, stream_objects_with_retries, RootTarget, S3Target,
+    TenantShardTimelineId,
+};
 use pageserver_api::shard::TenantShardId;
 use utils::id::{TenantId, TimelineId};
 
+/// Given a remote storage and a target, output a stream of TenantIds discovered via listing prefixes
+pub fn stream_tenants_generic<'a>(
+    remote_client: &'a GenericRemoteStorage,
+    target: &'a RootTarget,
+) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
+    try_stream! {
+        let tenants_target = target.tenants_root();
+        let mut tenants_stream =
+            std::pin::pin!(stream_objects_with_retries(remote_client, ListingMode::WithDelimiter, &tenants_target));
+        while let Some(chunk) = tenants_stream.next().await {
+            let chunk = chunk?;
+            let entry_ids = chunk.prefixes.iter()
+                .map(|prefix| prefix.get_path().file_name().ok_or_else(|| anyhow!("no final component in path '{prefix}'")));
+            for dir_name_res in entry_ids {
+                let dir_name = dir_name_res?;
+                let id = TenantShardId::from_str(dir_name)?;
+                yield id;
+            }
+        }
+    }
+}
+
 /// Given an S3 bucket, output a stream of TenantIds discovered via ListObjectsv2
 pub fn stream_tenants<'a>(
     s3_client: &'a Client,
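Taken together with `init_remote_generic` now returning the root target as well as the client, the scrubber can walk tenant prefixes without touching the raw S3 client. The sketch below shows that end-to-end use, mirroring what `find-large-objects` does above; `list_tenant_shards` is an invented name for illustration, assumed to live inside `storage_scrubber`.

    use futures::StreamExt;
    use pageserver_api::shard::TenantShardId;

    /// Hypothetical caller: list every tenant shard visible under the
    /// pageserver prefix via the new generic entry points.
    async fn list_tenant_shards(bucket_config: BucketConfig) -> anyhow::Result<Vec<TenantShardId>> {
        let (remote_client, target) =
            init_remote_generic(bucket_config, NodeKind::Pageserver).await?;
        let mut tenants = std::pin::pin!(stream_tenants_generic(&remote_client, &target));
        let mut shards = Vec::new();
        while let Some(tenant_shard_id) = tenants.next().await {
            shards.push(tenant_shard_id?);
        }
        Ok(shards)
    }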