diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs index 3c77d5a227..cb7479f6cd 100644 --- a/libs/remote_storage/src/azure_blob.rs +++ b/libs/remote_storage/src/azure_blob.rs @@ -383,6 +383,48 @@ impl RemoteStorage for AzureBlobStorage { } } + async fn head_object( + &self, + key: &RemotePath, + cancel: &CancellationToken, + ) -> Result { + let kind = RequestKind::Head; + let _permit = self.permit(kind, cancel).await?; + + let started_at = start_measuring_requests(kind); + + let blob_client = self.client.blob_client(self.relative_path_to_name(key)); + let properties_future = blob_client.get_properties().into_future(); + + let properties_future = tokio::time::timeout(self.timeout, properties_future); + + let res = tokio::select! { + res = properties_future => res, + _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()), + }; + + if let Ok(inner) = &res { + // do not incl. timeouts as errors in metrics but cancellations + let started_at = ScopeGuard::into_inner(started_at); + crate::metrics::BUCKET_METRICS + .req_seconds + .observe_elapsed(kind, inner, started_at); + } + + let data = match res { + Ok(Ok(data)) => Ok(data), + Ok(Err(sdk)) => Err(to_download_error(sdk)), + Err(_timeout) => Err(DownloadError::Timeout), + }?; + + let properties = data.blob.properties; + Ok(ListingObject { + key: key.to_owned(), + last_modified: SystemTime::from(properties.last_modified), + size: properties.content_length, + }) + } + async fn upload( &self, from: impl Stream> + Send + Sync + 'static, diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 2c9e298f79..cc1d3e0ae4 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -150,7 +150,7 @@ pub enum ListingMode { NoDelimiter, } -#[derive(PartialEq, Eq, Debug)] +#[derive(PartialEq, Eq, Debug, Clone)] pub struct ListingObject { pub key: RemotePath, pub last_modified: SystemTime, @@ -215,6 +215,13 @@ pub trait RemoteStorage: Send + Sync + 'static { Ok(combined) } + /// Obtain metadata information about an object. + async fn head_object( + &self, + key: &RemotePath, + cancel: &CancellationToken, + ) -> Result; + /// Streams the local file contents into remote into the remote storage entry. /// /// If the operation fails because of timeout or cancellation, the root cause of the error will be @@ -363,6 +370,20 @@ impl GenericRemoteStorage> { } } + // See [`RemoteStorage::head_object`]. + pub async fn head_object( + &self, + key: &RemotePath, + cancel: &CancellationToken, + ) -> Result { + match self { + Self::LocalFs(s) => s.head_object(key, cancel).await, + Self::AwsS3(s) => s.head_object(key, cancel).await, + Self::AzureBlob(s) => s.head_object(key, cancel).await, + Self::Unreliable(s) => s.head_object(key, cancel).await, + } + } + /// See [`RemoteStorage::upload`] pub async fn upload( &self, @@ -598,6 +619,7 @@ impl ConcurrencyLimiter { RequestKind::Delete => &self.write, RequestKind::Copy => &self.write, RequestKind::TimeTravel => &self.write, + RequestKind::Head => &self.read, } } diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index 99b4aa4061..c3ef18cab1 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -445,6 +445,20 @@ impl RemoteStorage for LocalFs { } } + async fn head_object( + &self, + key: &RemotePath, + _cancel: &CancellationToken, + ) -> Result { + let target_file_path = key.with_base(&self.storage_root); + let metadata = file_metadata(&target_file_path).await?; + Ok(ListingObject { + key: key.clone(), + last_modified: metadata.modified()?, + size: metadata.len(), + }) + } + async fn upload( &self, data: impl Stream> + Send + Sync, diff --git a/libs/remote_storage/src/metrics.rs b/libs/remote_storage/src/metrics.rs index bbb51590f3..f1aa4c433b 100644 --- a/libs/remote_storage/src/metrics.rs +++ b/libs/remote_storage/src/metrics.rs @@ -13,6 +13,7 @@ pub(crate) enum RequestKind { List = 3, Copy = 4, TimeTravel = 5, + Head = 6, } use scopeguard::ScopeGuard; @@ -27,6 +28,7 @@ impl RequestKind { List => "list_objects", Copy => "copy_object", TimeTravel => "time_travel_recover", + Head => "head_object", } } const fn as_index(&self) -> usize { @@ -34,7 +36,8 @@ impl RequestKind { } } -pub(crate) struct RequestTyped([C; 6]); +const REQUEST_KIND_COUNT: usize = 7; +pub(crate) struct RequestTyped([C; REQUEST_KIND_COUNT]); impl RequestTyped { pub(crate) fn get(&self, kind: RequestKind) -> &C { @@ -43,8 +46,8 @@ impl RequestTyped { fn build_with(mut f: impl FnMut(RequestKind) -> C) -> Self { use RequestKind::*; - let mut it = [Get, Put, Delete, List, Copy, TimeTravel].into_iter(); - let arr = std::array::from_fn::(|index| { + let mut it = [Get, Put, Delete, List, Copy, TimeTravel, Head].into_iter(); + let arr = std::array::from_fn::(|index| { let next = it.next().unwrap(); assert_eq!(index, next.as_index()); f(next) diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 1f25da813d..11f6598cbf 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -23,7 +23,7 @@ use aws_config::{ use aws_sdk_s3::{ config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep}, error::SdkError, - operation::get_object::GetObjectError, + operation::{get_object::GetObjectError, head_object::HeadObjectError}, types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion, StorageClass}, Client, }; @@ -604,6 +604,78 @@ impl RemoteStorage for S3Bucket { } } + async fn head_object( + &self, + key: &RemotePath, + cancel: &CancellationToken, + ) -> Result { + let kind = RequestKind::Head; + let _permit = self.permit(kind, cancel).await?; + + let started_at = start_measuring_requests(kind); + + let head_future = self + .client + .head_object() + .bucket(self.bucket_name()) + .key(self.relative_path_to_s3_object(key)) + .send(); + + let head_future = tokio::time::timeout(self.timeout, head_future); + + let res = tokio::select! { + res = head_future => res, + _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()), + }; + + let res = res.map_err(|_e| DownloadError::Timeout)?; + + // do not incl. timeouts as errors in metrics but cancellations + let started_at = ScopeGuard::into_inner(started_at); + crate::metrics::BUCKET_METRICS + .req_seconds + .observe_elapsed(kind, &res, started_at); + + let data = match res { + Ok(object_output) => object_output, + Err(SdkError::ServiceError(e)) if matches!(e.err(), HeadObjectError::NotFound(_)) => { + // Count this in the AttemptOutcome::Ok bucket, because 404 is not + // an error: we expect to sometimes fetch an object and find it missing, + // e.g. when probing for timeline indices. + crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed( + kind, + AttemptOutcome::Ok, + started_at, + ); + return Err(DownloadError::NotFound); + } + Err(e) => { + crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed( + kind, + AttemptOutcome::Err, + started_at, + ); + + return Err(DownloadError::Other( + anyhow::Error::new(e).context("s3 head object"), + )); + } + }; + + let (Some(last_modified), Some(size)) = (data.last_modified, data.content_length) else { + return Err(DownloadError::Other(anyhow!( + "head_object doesn't contain last_modified or content_length" + )))?; + }; + Ok(ListingObject { + key: key.to_owned(), + last_modified: SystemTime::try_from(last_modified).map_err(|e| { + DownloadError::Other(anyhow!("can't convert time '{last_modified}': {e}")) + })?, + size: size as u64, + }) + } + async fn upload( &self, from: impl Stream> + Send + Sync + 'static, diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index 13f873dcdb..c7eb634af3 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -30,6 +30,7 @@ pub struct UnreliableWrapper { #[derive(Debug, Hash, Eq, PartialEq)] enum RemoteOp { ListPrefixes(Option), + HeadObject(RemotePath), Upload(RemotePath), Download(RemotePath), Delete(RemotePath), @@ -137,6 +138,16 @@ impl RemoteStorage for UnreliableWrapper { self.inner.list(prefix, mode, max_keys, cancel).await } + async fn head_object( + &self, + key: &RemotePath, + cancel: &CancellationToken, + ) -> Result { + self.attempt(RemoteOp::HeadObject(key.clone())) + .map_err(DownloadError::Other)?; + self.inner.head_object(key, cancel).await + } + async fn upload( &self, data: impl Stream> + Send + Sync + 'static, diff --git a/storage_scrubber/src/checks.rs b/storage_scrubber/src/checks.rs index 35ec69fd50..9063b3c197 100644 --- a/storage_scrubber/src/checks.rs +++ b/storage_scrubber/src/checks.rs @@ -1,22 +1,22 @@ use std::collections::{HashMap, HashSet}; use anyhow::Context; -use aws_sdk_s3::Client; use pageserver::tenant::layer_map::LayerMap; use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata; use pageserver_api::shard::ShardIndex; +use tokio_util::sync::CancellationToken; use tracing::{error, info, warn}; use utils::generation::Generation; use utils::id::TimelineId; use crate::cloud_admin_api::BranchData; -use crate::metadata_stream::stream_listing; -use crate::{download_object_with_retries, RootTarget, TenantShardTimelineId}; +use crate::metadata_stream::stream_listing_generic; +use crate::{download_object_with_retries_generic, RootTarget, TenantShardTimelineId}; use futures_util::StreamExt; use pageserver::tenant::remote_timeline_client::{parse_remote_index_path, remote_layer_path}; use pageserver::tenant::storage_layer::LayerName; use pageserver::tenant::IndexPart; -use remote_storage::RemotePath; +use remote_storage::{GenericRemoteStorage, ListingObject, RemotePath}; pub(crate) struct TimelineAnalysis { /// Anomalies detected @@ -48,13 +48,12 @@ impl TimelineAnalysis { } pub(crate) async fn branch_cleanup_and_check_errors( - s3_client: &Client, - target: &RootTarget, + remote_client: &GenericRemoteStorage, id: &TenantShardTimelineId, tenant_objects: &mut TenantObjectListing, s3_active_branch: Option<&BranchData>, console_branch: Option, - s3_data: Option, + s3_data: Option, ) -> TimelineAnalysis { let mut result = TimelineAnalysis::new(); @@ -78,7 +77,9 @@ pub(crate) async fn branch_cleanup_and_check_errors( match s3_data { Some(s3_data) => { - result.garbage_keys.extend(s3_data.unknown_keys); + result + .garbage_keys + .extend(s3_data.unknown_keys.into_iter().map(|k| k.key.to_string())); match s3_data.blob_data { BlobDataParseResult::Parsed { @@ -143,11 +144,8 @@ pub(crate) async fn branch_cleanup_and_check_errors( // HEAD request used here to address a race condition when an index was uploaded concurrently // with our scan. We check if the object is uploaded to S3 after taking the listing snapshot. - let response = s3_client - .head_object() - .bucket(target.bucket_name()) - .key(path.get_path().as_str()) - .send() + let response = remote_client + .head_object(&path, &CancellationToken::new()) .await; if response.is_err() { @@ -284,14 +282,14 @@ impl TenantObjectListing { } #[derive(Debug)] -pub(crate) struct S3TimelineBlobData { +pub(crate) struct RemoteTimelineBlobData { pub(crate) blob_data: BlobDataParseResult, // Index objects that were not used when loading `blob_data`, e.g. those from old generations - pub(crate) unused_index_keys: Vec, + pub(crate) unused_index_keys: Vec, // Objects whose keys were not recognized at all, i.e. not layer files, not indices - pub(crate) unknown_keys: Vec, + pub(crate) unknown_keys: Vec, } #[derive(Debug)] @@ -322,11 +320,11 @@ pub(crate) fn parse_layer_object_name(name: &str) -> Result<(LayerName, Generati } } -pub(crate) async fn list_timeline_blobs( - s3_client: &Client, +pub(crate) async fn list_timeline_blobs_generic( + remote_client: &GenericRemoteStorage, id: TenantShardTimelineId, s3_root: &RootTarget, -) -> anyhow::Result { +) -> anyhow::Result { let mut s3_layers = HashSet::new(); let mut errors = Vec::new(); @@ -335,19 +333,25 @@ pub(crate) async fn list_timeline_blobs( let mut timeline_dir_target = s3_root.timeline_root(&id); timeline_dir_target.delimiter = String::new(); - let mut index_part_keys: Vec = Vec::new(); + let mut index_part_keys: Vec = Vec::new(); let mut initdb_archive: bool = false; - let mut stream = std::pin::pin!(stream_listing(s3_client, &timeline_dir_target)); - while let Some(obj) = stream.next().await { - let obj = obj?; - let key = obj.key(); + let prefix_str = &timeline_dir_target + .prefix_in_bucket + .strip_prefix("/") + .unwrap_or(&timeline_dir_target.prefix_in_bucket); - let blob_name = key.strip_prefix(&timeline_dir_target.prefix_in_bucket); + let mut stream = std::pin::pin!(stream_listing_generic(remote_client, &timeline_dir_target)); + while let Some(obj) = stream.next().await { + let (key, Some(obj)) = obj? else { + panic!("ListingObject not specified"); + }; + + let blob_name = key.get_path().as_str().strip_prefix(prefix_str); match blob_name { Some(name) if name.starts_with("index_part.json") => { tracing::debug!("Index key {key}"); - index_part_keys.push(key.to_owned()) + index_part_keys.push(obj) } Some("initdb.tar.zst") => { tracing::debug!("initdb archive {key}"); @@ -358,7 +362,7 @@ pub(crate) async fn list_timeline_blobs( } Some(maybe_layer_name) => match parse_layer_object_name(maybe_layer_name) { Ok((new_layer, gen)) => { - tracing::debug!("Parsed layer key: {} {:?}", new_layer, gen); + tracing::debug!("Parsed layer key: {new_layer} {gen:?}"); s3_layers.insert((new_layer, gen)); } Err(e) => { @@ -366,13 +370,13 @@ pub(crate) async fn list_timeline_blobs( errors.push( format!("S3 list response got an object with key {key} that is not a layer name: {e}"), ); - unknown_keys.push(key.to_string()); + unknown_keys.push(obj); } }, None => { - tracing::warn!("Unknown key {}", key); + tracing::warn!("Unknown key {key}"); errors.push(format!("S3 list response got an object with odd key {key}")); - unknown_keys.push(key.to_string()); + unknown_keys.push(obj); } } } @@ -381,7 +385,7 @@ pub(crate) async fn list_timeline_blobs( tracing::debug!( "Timeline is empty apart from initdb archive: expected post-deletion state." ); - return Ok(S3TimelineBlobData { + return Ok(RemoteTimelineBlobData { blob_data: BlobDataParseResult::Relic, unused_index_keys: index_part_keys, unknown_keys: Vec::new(), @@ -395,13 +399,13 @@ pub(crate) async fn list_timeline_blobs( // Stripping the index key to the last part, because RemotePath doesn't // like absolute paths, and depending on prefix_in_bucket it's possible // for the keys we read back to start with a slash. - let basename = key.rsplit_once('/').unwrap().1; + let basename = key.key.get_path().as_str().rsplit_once('/').unwrap().1; parse_remote_index_path(RemotePath::from_string(basename).unwrap()).map(|g| (key, g)) }) .max_by_key(|i| i.1) .map(|(k, g)| (k.clone(), g)) { - Some((key, gen)) => (Some(key), gen), + Some((key, gen)) => (Some::(key.to_owned()), gen), None => { // Legacy/missing case: one or zero index parts, which did not have a generation (index_part_keys.pop(), Generation::none()) @@ -416,17 +420,14 @@ pub(crate) async fn list_timeline_blobs( } if let Some(index_part_object_key) = index_part_object.as_ref() { - let index_part_bytes = download_object_with_retries( - s3_client, - &timeline_dir_target.bucket_name, - index_part_object_key, - ) - .await - .context("index_part.json download")?; + let index_part_bytes = + download_object_with_retries_generic(remote_client, &index_part_object_key.key) + .await + .context("index_part.json download")?; match serde_json::from_slice(&index_part_bytes) { Ok(index_part) => { - return Ok(S3TimelineBlobData { + return Ok(RemoteTimelineBlobData { blob_data: BlobDataParseResult::Parsed { index_part: Box::new(index_part), index_part_generation, @@ -448,7 +449,7 @@ pub(crate) async fn list_timeline_blobs( ); } - Ok(S3TimelineBlobData { + Ok(RemoteTimelineBlobData { blob_data: BlobDataParseResult::Incorrect { errors, s3_layers }, unused_index_keys: index_part_keys, unknown_keys, diff --git a/storage_scrubber/src/lib.rs b/storage_scrubber/src/lib.rs index 1fc94cc174..3183bc3c64 100644 --- a/storage_scrubber/src/lib.rs +++ b/storage_scrubber/src/lib.rs @@ -452,23 +452,26 @@ fn stream_objects_with_retries<'a>( let mut list_stream = storage_client.list_streaming(Some(&prefix), listing_mode, None, &cancel); while let Some(res) = list_stream.next().await { - if let Err(err) = res { - let yield_err = if err.is_permanent() { - true - } else { - let backoff_time = 1 << trial.max(5); - tokio::time::sleep(Duration::from_secs(backoff_time)).await; - trial += 1; - trial == MAX_RETRIES - 1 - }; - if yield_err { - yield Err(err) - .with_context(|| format!("Failed to list objects {MAX_RETRIES} times")); - break; + match res { + Err(err) => { + let yield_err = if err.is_permanent() { + true + } else { + let backoff_time = 1 << trial.max(5); + tokio::time::sleep(Duration::from_secs(backoff_time)).await; + trial += 1; + trial == MAX_RETRIES - 1 + }; + if yield_err { + yield Err(err) + .with_context(|| format!("Failed to list objects {MAX_RETRIES} times")); + break; + } + } + Ok(res) => { + trial = 0; + yield Ok(res); } - } else { - trial = 0; - yield res.map_err(anyhow::Error::from); } } } @@ -513,41 +516,35 @@ async fn list_objects_with_retries_generic( panic!("MAX_RETRIES is not allowed to be 0"); } -async fn download_object_with_retries( - s3_client: &Client, - bucket_name: &str, - key: &str, +async fn download_object_with_retries_generic( + remote_client: &GenericRemoteStorage, + key: &RemotePath, ) -> anyhow::Result> { - for _ in 0..MAX_RETRIES { - let mut body_buf = Vec::new(); - let response_stream = match s3_client - .get_object() - .bucket(bucket_name) - .key(key) - .send() - .await - { + let cancel = CancellationToken::new(); + for trial in 0..MAX_RETRIES { + let mut buf = Vec::new(); + let download = match remote_client.download(key, &cancel).await { Ok(response) => response, Err(e) => { error!("Failed to download object for key {key}: {e}"); - tokio::time::sleep(Duration::from_secs(1)).await; + let backoff_time = 1 << trial.max(5); + tokio::time::sleep(Duration::from_secs(backoff_time)).await; continue; } }; - match response_stream - .body - .into_async_read() - .read_to_end(&mut body_buf) + match tokio_util::io::StreamReader::new(download.download_stream) + .read_to_end(&mut buf) .await { Ok(bytes_read) => { tracing::debug!("Downloaded {bytes_read} bytes for object {key}"); - return Ok(body_buf); + return Ok(buf); } Err(e) => { error!("Failed to stream object body for key {key}: {e}"); - tokio::time::sleep(Duration::from_secs(1)).await; + let backoff_time = 1 << trial.max(5); + tokio::time::sleep(Duration::from_secs(backoff_time)).await; } } } diff --git a/storage_scrubber/src/metadata_stream.rs b/storage_scrubber/src/metadata_stream.rs index 54812ffc94..eca774413a 100644 --- a/storage_scrubber/src/metadata_stream.rs +++ b/storage_scrubber/src/metadata_stream.rs @@ -2,14 +2,14 @@ use std::str::FromStr; use anyhow::{anyhow, Context}; use async_stream::{stream, try_stream}; -use aws_sdk_s3::{types::ObjectIdentifier, Client}; +use aws_sdk_s3::Client; use futures::StreamExt; use remote_storage::{GenericRemoteStorage, ListingMode, ListingObject, RemotePath}; use tokio_stream::Stream; use crate::{ - list_objects_with_retries, stream_objects_with_retries, RootTarget, S3Target, - TenantShardTimelineId, + list_objects_with_retries, list_objects_with_retries_generic, stream_objects_with_retries, + RootTarget, S3Target, TenantShardTimelineId, }; use pageserver_api::shard::TenantShardId; use utils::id::{TenantId, TimelineId}; @@ -75,53 +75,38 @@ pub fn stream_tenants<'a>( } pub async fn stream_tenant_shards<'a>( - s3_client: &'a Client, + remote_client: &'a GenericRemoteStorage, target: &'a RootTarget, tenant_id: TenantId, ) -> anyhow::Result> + 'a> { - let mut tenant_shard_ids: Vec> = Vec::new(); - let mut continuation_token = None; let shards_target = target.tenant_shards_prefix(&tenant_id); - loop { - tracing::info!("Listing in {}", shards_target.prefix_in_bucket); - let fetch_response = - list_objects_with_retries(s3_client, &shards_target, continuation_token.clone()).await; - let fetch_response = match fetch_response { - Err(e) => { - tenant_shard_ids.push(Err(e)); - break; - } - Ok(r) => r, - }; + let strip_prefix = target.tenants_root().prefix_in_bucket; + let prefix_str = &strip_prefix.strip_prefix("/").unwrap_or(&strip_prefix); - let new_entry_ids = fetch_response - .common_prefixes() - .iter() - .filter_map(|prefix| prefix.prefix()) - .filter_map(|prefix| -> Option<&str> { - prefix - .strip_prefix(&target.tenants_root().prefix_in_bucket)? - .strip_suffix('/') - }) - .map(|entry_id_str| { - let first_part = entry_id_str.split('/').next().unwrap(); + tracing::info!("Listing shards in {}", shards_target.prefix_in_bucket); + let listing = list_objects_with_retries_generic( + remote_client, + ListingMode::WithDelimiter, + &shards_target, + ) + .await?; - first_part - .parse::() - .with_context(|| format!("Incorrect entry id str: {first_part}")) - }); + let tenant_shard_ids = listing + .prefixes + .iter() + .map(|prefix| prefix.get_path().as_str()) + .filter_map(|prefix| -> Option<&str> { prefix.strip_prefix(prefix_str) }) + .map(|entry_id_str| { + let first_part = entry_id_str.split('/').next().unwrap(); - for i in new_entry_ids { - tenant_shard_ids.push(i); - } - - match fetch_response.next_continuation_token { - Some(new_token) => continuation_token = Some(new_token), - None => break, - } - } + first_part + .parse::() + .with_context(|| format!("Incorrect entry id str: {first_part}")) + }) + .collect::>(); + tracing::debug!("Yielding {} shards for {tenant_id}", tenant_shard_ids.len()); Ok(stream! { for i in tenant_shard_ids { let id = i?; @@ -130,65 +115,6 @@ pub async fn stream_tenant_shards<'a>( }) } -/// Given a TenantShardId, output a stream of the timelines within that tenant, discovered -/// using ListObjectsv2. The listing is done before the stream is built, so that this -/// function can be used to generate concurrency on a stream using buffer_unordered. -pub async fn stream_tenant_timelines<'a>( - s3_client: &'a Client, - target: &'a RootTarget, - tenant: TenantShardId, -) -> anyhow::Result> + 'a> { - let mut timeline_ids: Vec> = Vec::new(); - let mut continuation_token = None; - let timelines_target = target.timelines_root(&tenant); - - loop { - tracing::debug!("Listing in {}", tenant); - let fetch_response = - list_objects_with_retries(s3_client, &timelines_target, continuation_token.clone()) - .await; - let fetch_response = match fetch_response { - Err(e) => { - timeline_ids.push(Err(e)); - break; - } - Ok(r) => r, - }; - - let new_entry_ids = fetch_response - .common_prefixes() - .iter() - .filter_map(|prefix| prefix.prefix()) - .filter_map(|prefix| -> Option<&str> { - prefix - .strip_prefix(&timelines_target.prefix_in_bucket)? - .strip_suffix('/') - }) - .map(|entry_id_str| { - entry_id_str - .parse::() - .with_context(|| format!("Incorrect entry id str: {entry_id_str}")) - }); - - for i in new_entry_ids { - timeline_ids.push(i); - } - - match fetch_response.next_continuation_token { - Some(new_token) => continuation_token = Some(new_token), - None => break, - } - } - - tracing::debug!("Yielding for {}", tenant); - Ok(stream! { - for i in timeline_ids { - let id = i?; - yield Ok(TenantShardTimelineId::new(tenant, id)); - } - }) -} - /// Given a `TenantShardId`, output a stream of the timelines within that tenant, discovered /// using a listing. The listing is done before the stream is built, so that this /// function can be used to generate concurrency on a stream using buffer_unordered. @@ -200,6 +126,11 @@ pub async fn stream_tenant_timelines_generic<'a>( let mut timeline_ids: Vec> = Vec::new(); let timelines_target = target.timelines_root(&tenant); + let prefix_str = &timelines_target + .prefix_in_bucket + .strip_prefix("/") + .unwrap_or(&timelines_target.prefix_in_bucket); + let mut objects_stream = std::pin::pin!(stream_objects_with_retries( remote_client, ListingMode::WithDelimiter, @@ -220,11 +151,7 @@ pub async fn stream_tenant_timelines_generic<'a>( .prefixes .iter() .filter_map(|prefix| -> Option<&str> { - prefix - .get_path() - .as_str() - .strip_prefix(&timelines_target.prefix_in_bucket)? - .strip_suffix('/') + prefix.get_path().as_str().strip_prefix(prefix_str) }) .map(|entry_id_str| { entry_id_str @@ -237,7 +164,7 @@ pub async fn stream_tenant_timelines_generic<'a>( } } - tracing::debug!("Yielding for {}", tenant); + tracing::debug!("Yielding {} timelines for {}", timeline_ids.len(), tenant); Ok(stream! { for i in timeline_ids { let id = i?; @@ -246,37 +173,6 @@ pub async fn stream_tenant_timelines_generic<'a>( }) } -pub(crate) fn stream_listing<'a>( - s3_client: &'a Client, - target: &'a S3Target, -) -> impl Stream> + 'a { - try_stream! { - let mut continuation_token = None; - loop { - let fetch_response = - list_objects_with_retries(s3_client, target, continuation_token.clone()).await?; - - if target.delimiter.is_empty() { - for object_key in fetch_response.contents().iter().filter_map(|object| object.key()) - { - let object_id = ObjectIdentifier::builder().key(object_key).build()?; - yield object_id; - } - } else { - for prefix in fetch_response.common_prefixes().iter().filter_map(|p| p.prefix()) { - let object_id = ObjectIdentifier::builder().key(prefix).build()?; - yield object_id; - } - } - - match fetch_response.next_continuation_token { - Some(new_token) => continuation_token = Some(new_token), - None => break, - } - } - } -} - pub(crate) fn stream_listing_generic<'a>( remote_client: &'a GenericRemoteStorage, target: &'a S3Target, diff --git a/storage_scrubber/src/pageserver_physical_gc.rs b/storage_scrubber/src/pageserver_physical_gc.rs index 20d9bd6dd4..6828081128 100644 --- a/storage_scrubber/src/pageserver_physical_gc.rs +++ b/storage_scrubber/src/pageserver_physical_gc.rs @@ -1,11 +1,10 @@ use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::sync::Arc; -use std::time::{Duration, SystemTime}; +use std::time::Duration; -use crate::checks::{list_timeline_blobs, BlobDataParseResult}; -use crate::metadata_stream::{stream_tenant_timelines, stream_tenants}; -use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId}; -use aws_sdk_s3::Client; +use crate::checks::{list_timeline_blobs_generic, BlobDataParseResult}; +use crate::metadata_stream::{stream_tenant_timelines_generic, stream_tenants_generic}; +use crate::{init_remote_generic, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId}; use futures_util::{StreamExt, TryStreamExt}; use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata; use pageserver::tenant::remote_timeline_client::{parse_remote_index_path, remote_layer_path}; @@ -13,10 +12,11 @@ use pageserver::tenant::storage_layer::LayerName; use pageserver::tenant::IndexPart; use pageserver_api::controller_api::TenantDescribeResponse; use pageserver_api::shard::{ShardIndex, TenantShardId}; -use remote_storage::RemotePath; +use remote_storage::{GenericRemoteStorage, ListingObject, RemotePath}; use reqwest::Method; use serde::Serialize; use storage_controller_client::control_api; +use tokio_util::sync::CancellationToken; use tracing::{info_span, Instrument}; use utils::generation::Generation; use utils::id::{TenantId, TenantTimelineId}; @@ -240,38 +240,13 @@ impl TenantRefAccumulator { } } -async fn is_old_enough( - s3_client: &Client, - bucket_config: &BucketConfig, - min_age: &Duration, - key: &str, - summary: &mut GcSummary, -) -> bool { +fn is_old_enough(min_age: &Duration, key: &ListingObject, summary: &mut GcSummary) -> bool { // Validation: we will only GC indices & layers after a time threshold (e.g. one week) so that during an incident // it is easier to read old data for analysis, and easier to roll back shard splits without having to un-delete any objects. - let age: Duration = match s3_client - .head_object() - .bucket(&bucket_config.bucket) - .key(key) - .send() - .await - { - Ok(response) => match response.last_modified { - None => { - tracing::warn!("Missing last_modified"); - summary.remote_storage_errors += 1; - return false; - } - Some(last_modified) => match SystemTime::try_from(last_modified).map(|t| t.elapsed()) { - Ok(Ok(e)) => e, - Err(_) | Ok(Err(_)) => { - tracing::warn!("Bad last_modified time: {last_modified:?}"); - return false; - } - }, - }, - Err(e) => { - tracing::warn!("Failed to HEAD {key}: {e}"); + let age = match key.last_modified.elapsed() { + Ok(e) => e, + Err(_) => { + tracing::warn!("Bad last_modified time: {:?}", key.last_modified); summary.remote_storage_errors += 1; return false; } @@ -289,17 +264,30 @@ async fn is_old_enough( old_enough } +/// Same as [`is_old_enough`], but doesn't require a [`ListingObject`] passed to it. +async fn check_is_old_enough( + remote_client: &GenericRemoteStorage, + key: &RemotePath, + min_age: &Duration, + summary: &mut GcSummary, +) -> Option { + let listing_object = remote_client + .head_object(key, &CancellationToken::new()) + .await + .ok()?; + Some(is_old_enough(min_age, &listing_object, summary)) +} + async fn maybe_delete_index( - s3_client: &Client, - bucket_config: &BucketConfig, + remote_client: &GenericRemoteStorage, min_age: &Duration, latest_gen: Generation, - key: &str, + obj: &ListingObject, mode: GcMode, summary: &mut GcSummary, ) { // Validation: we will only delete things that parse cleanly - let basename = key.rsplit_once('/').unwrap().1; + let basename = obj.key.get_path().file_name().unwrap(); let candidate_generation = match parse_remote_index_path(RemotePath::from_string(basename).unwrap()) { Some(g) => g, @@ -328,7 +316,7 @@ async fn maybe_delete_index( return; } - if !is_old_enough(s3_client, bucket_config, min_age, key, summary).await { + if !is_old_enough(min_age, obj, summary) { return; } @@ -338,11 +326,8 @@ async fn maybe_delete_index( } // All validations passed: erase the object - match s3_client - .delete_object() - .bucket(&bucket_config.bucket) - .key(key) - .send() + match remote_client + .delete(&obj.key, &CancellationToken::new()) .await { Ok(_) => { @@ -358,8 +343,7 @@ async fn maybe_delete_index( #[allow(clippy::too_many_arguments)] async fn gc_ancestor( - s3_client: &Client, - bucket_config: &BucketConfig, + remote_client: &GenericRemoteStorage, root_target: &RootTarget, min_age: &Duration, ancestor: TenantShardId, @@ -368,7 +352,7 @@ async fn gc_ancestor( summary: &mut GcSummary, ) -> anyhow::Result<()> { // Scan timelines in the ancestor - let timelines = stream_tenant_timelines(s3_client, root_target, ancestor).await?; + let timelines = stream_tenant_timelines_generic(remote_client, root_target, ancestor).await?; let mut timelines = std::pin::pin!(timelines); // Build a list of keys to retain @@ -376,7 +360,7 @@ async fn gc_ancestor( while let Some(ttid) = timelines.next().await { let ttid = ttid?; - let data = list_timeline_blobs(s3_client, ttid, root_target).await?; + let data = list_timeline_blobs_generic(remote_client, ttid, root_target).await?; let s3_layers = match data.blob_data { BlobDataParseResult::Parsed { @@ -427,7 +411,8 @@ async fn gc_ancestor( // We apply a time threshold to GCing objects that are un-referenced: this preserves our ability // to roll back a shard split if we have to, by avoiding deleting ancestor layers right away - if !is_old_enough(s3_client, bucket_config, min_age, &key, summary).await { + let path = RemotePath::from_string(key.strip_prefix("/").unwrap_or(&key)).unwrap(); + if check_is_old_enough(remote_client, &path, min_age, summary).await != Some(true) { continue; } @@ -437,13 +422,7 @@ async fn gc_ancestor( } // All validations passed: erase the object - match s3_client - .delete_object() - .bucket(&bucket_config.bucket) - .key(&key) - .send() - .await - { + match remote_client.delete(&path, &CancellationToken::new()).await { Ok(_) => { tracing::info!("Successfully deleted unreferenced ancestor layer {key}"); summary.ancestor_layers_deleted += 1; @@ -477,10 +456,11 @@ pub async fn pageserver_physical_gc( min_age: Duration, mode: GcMode, ) -> anyhow::Result { - let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?; + let (remote_client, target) = + init_remote_generic(bucket_config.clone(), NodeKind::Pageserver).await?; let tenants = if tenant_shard_ids.is_empty() { - futures::future::Either::Left(stream_tenants(&s3_client, &target)) + futures::future::Either::Left(stream_tenants_generic(&remote_client, &target)) } else { futures::future::Either::Right(futures::stream::iter(tenant_shard_ids.into_iter().map(Ok))) }; @@ -493,14 +473,13 @@ pub async fn pageserver_physical_gc( let accumulator = Arc::new(std::sync::Mutex::new(TenantRefAccumulator::default())); // Generate a stream of TenantTimelineId - let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t)); + let timelines = tenants.map_ok(|t| stream_tenant_timelines_generic(&remote_client, &target, t)); let timelines = timelines.try_buffered(CONCURRENCY); let timelines = timelines.try_flatten(); // Generate a stream of S3TimelineBlobData async fn gc_timeline( - s3_client: &Client, - bucket_config: &BucketConfig, + remote_client: &GenericRemoteStorage, min_age: &Duration, target: &RootTarget, mode: GcMode, @@ -508,7 +487,7 @@ pub async fn pageserver_physical_gc( accumulator: &Arc>, ) -> anyhow::Result { let mut summary = GcSummary::default(); - let data = list_timeline_blobs(s3_client, ttid, target).await?; + let data = list_timeline_blobs_generic(remote_client, ttid, target).await?; let (index_part, latest_gen, candidates) = match &data.blob_data { BlobDataParseResult::Parsed { @@ -533,17 +512,9 @@ pub async fn pageserver_physical_gc( accumulator.lock().unwrap().update(ttid, index_part); for key in candidates { - maybe_delete_index( - s3_client, - bucket_config, - min_age, - latest_gen, - &key, - mode, - &mut summary, - ) - .instrument(info_span!("maybe_delete_index", %ttid, ?latest_gen, key)) - .await; + maybe_delete_index(remote_client, min_age, latest_gen, &key, mode, &mut summary) + .instrument(info_span!("maybe_delete_index", %ttid, ?latest_gen, %key.key)) + .await; } Ok(summary) @@ -554,15 +525,7 @@ pub async fn pageserver_physical_gc( // Drain futures for per-shard GC, populating accumulator as a side effect { let timelines = timelines.map_ok(|ttid| { - gc_timeline( - &s3_client, - bucket_config, - &min_age, - &target, - mode, - ttid, - &accumulator, - ) + gc_timeline(&remote_client, &min_age, &target, mode, ttid, &accumulator) }); let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY)); @@ -586,8 +549,7 @@ pub async fn pageserver_physical_gc( for ancestor_shard in ancestor_shards { gc_ancestor( - &s3_client, - bucket_config, + &remote_client, &target, &min_age, ancestor_shard, diff --git a/storage_scrubber/src/scan_pageserver_metadata.rs b/storage_scrubber/src/scan_pageserver_metadata.rs index 2409b7b132..e89e97ccb6 100644 --- a/storage_scrubber/src/scan_pageserver_metadata.rs +++ b/storage_scrubber/src/scan_pageserver_metadata.rs @@ -1,16 +1,16 @@ use std::collections::{HashMap, HashSet}; use crate::checks::{ - branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData, - TenantObjectListing, TimelineAnalysis, + branch_cleanup_and_check_errors, list_timeline_blobs_generic, BlobDataParseResult, + RemoteTimelineBlobData, TenantObjectListing, TimelineAnalysis, }; -use crate::metadata_stream::{stream_tenant_timelines, stream_tenants}; -use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId}; -use aws_sdk_s3::Client; +use crate::metadata_stream::{stream_tenant_timelines_generic, stream_tenants_generic}; +use crate::{init_remote_generic, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId}; use futures_util::{StreamExt, TryStreamExt}; use pageserver::tenant::remote_timeline_client::remote_layer_path; use pageserver_api::controller_api::MetadataHealthUpdateRequest; use pageserver_api::shard::TenantShardId; +use remote_storage::GenericRemoteStorage; use serde::Serialize; use utils::id::TenantId; use utils::shard::ShardCount; @@ -36,7 +36,7 @@ impl MetadataSummary { Self::default() } - fn update_data(&mut self, data: &S3TimelineBlobData) { + fn update_data(&mut self, data: &RemoteTimelineBlobData) { self.timeline_shard_count += 1; if let BlobDataParseResult::Parsed { index_part, @@ -120,10 +120,10 @@ pub async fn scan_pageserver_metadata( bucket_config: BucketConfig, tenant_ids: Vec, ) -> anyhow::Result { - let (s3_client, target) = init_remote(bucket_config, NodeKind::Pageserver).await?; + let (remote_client, target) = init_remote_generic(bucket_config, NodeKind::Pageserver).await?; let tenants = if tenant_ids.is_empty() { - futures::future::Either::Left(stream_tenants(&s3_client, &target)) + futures::future::Either::Left(stream_tenants_generic(&remote_client, &target)) } else { futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok))) }; @@ -133,20 +133,20 @@ pub async fn scan_pageserver_metadata( const CONCURRENCY: usize = 32; // Generate a stream of TenantTimelineId - let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t)); + let timelines = tenants.map_ok(|t| stream_tenant_timelines_generic(&remote_client, &target, t)); let timelines = timelines.try_buffered(CONCURRENCY); let timelines = timelines.try_flatten(); // Generate a stream of S3TimelineBlobData async fn report_on_timeline( - s3_client: &Client, + remote_client: &GenericRemoteStorage, target: &RootTarget, ttid: TenantShardTimelineId, - ) -> anyhow::Result<(TenantShardTimelineId, S3TimelineBlobData)> { - let data = list_timeline_blobs(s3_client, ttid, target).await?; + ) -> anyhow::Result<(TenantShardTimelineId, RemoteTimelineBlobData)> { + let data = list_timeline_blobs_generic(remote_client, ttid, target).await?; Ok((ttid, data)) } - let timelines = timelines.map_ok(|ttid| report_on_timeline(&s3_client, &target, ttid)); + let timelines = timelines.map_ok(|ttid| report_on_timeline(&remote_client, &target, ttid)); let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY)); // We must gather all the TenantShardTimelineId->S3TimelineBlobData for each tenant, because different @@ -157,12 +157,11 @@ pub async fn scan_pageserver_metadata( let mut tenant_timeline_results = Vec::new(); async fn analyze_tenant( - s3_client: &Client, - target: &RootTarget, + remote_client: &GenericRemoteStorage, tenant_id: TenantId, summary: &mut MetadataSummary, mut tenant_objects: TenantObjectListing, - timelines: Vec<(TenantShardTimelineId, S3TimelineBlobData)>, + timelines: Vec<(TenantShardTimelineId, RemoteTimelineBlobData)>, highest_shard_count: ShardCount, ) { summary.tenant_count += 1; @@ -191,8 +190,7 @@ pub async fn scan_pageserver_metadata( // Apply checks to this timeline shard's metadata, and in the process update `tenant_objects` // reference counts for layers across the tenant. let analysis = branch_cleanup_and_check_errors( - s3_client, - target, + remote_client, &ttid, &mut tenant_objects, None, @@ -273,8 +271,7 @@ pub async fn scan_pageserver_metadata( let tenant_objects = std::mem::take(&mut tenant_objects); let timelines = std::mem::take(&mut tenant_timeline_results); analyze_tenant( - &s3_client, - &target, + &remote_client, prev_tenant_id, &mut summary, tenant_objects, @@ -311,8 +308,7 @@ pub async fn scan_pageserver_metadata( if !tenant_timeline_results.is_empty() { analyze_tenant( - &s3_client, - &target, + &remote_client, tenant_id.expect("Must be set if results are present"), &mut summary, tenant_objects, diff --git a/storage_scrubber/src/scan_safekeeper_metadata.rs b/storage_scrubber/src/scan_safekeeper_metadata.rs index 08a4541c5c..f20fa27d13 100644 --- a/storage_scrubber/src/scan_safekeeper_metadata.rs +++ b/storage_scrubber/src/scan_safekeeper_metadata.rs @@ -188,6 +188,11 @@ async fn check_timeline( // we need files, so unset it. timeline_dir_target.delimiter = String::new(); + let prefix_str = &timeline_dir_target + .prefix_in_bucket + .strip_prefix("/") + .unwrap_or(&timeline_dir_target.prefix_in_bucket); + let mut stream = std::pin::pin!(stream_listing_generic(remote_client, &timeline_dir_target)); while let Some(obj) = stream.next().await { let (key, _obj) = obj?; @@ -195,7 +200,7 @@ async fn check_timeline( let seg_name = key .get_path() .as_str() - .strip_prefix(&timeline_dir_target.prefix_in_bucket) + .strip_prefix(prefix_str) .expect("failed to extract segment name"); expected_segfiles.remove(seg_name); } diff --git a/storage_scrubber/src/tenant_snapshot.rs b/storage_scrubber/src/tenant_snapshot.rs index 1866e6ec80..fc3a973922 100644 --- a/storage_scrubber/src/tenant_snapshot.rs +++ b/storage_scrubber/src/tenant_snapshot.rs @@ -1,10 +1,11 @@ use std::collections::HashMap; use std::sync::Arc; -use crate::checks::{list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData}; -use crate::metadata_stream::{stream_tenant_shards, stream_tenant_timelines}; +use crate::checks::{list_timeline_blobs_generic, BlobDataParseResult, RemoteTimelineBlobData}; +use crate::metadata_stream::{stream_tenant_shards, stream_tenant_timelines_generic}; use crate::{ - download_object_to_file, init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId, + download_object_to_file, init_remote, init_remote_generic, BucketConfig, NodeKind, RootTarget, + TenantShardTimelineId, }; use anyhow::Context; use async_stream::stream; @@ -15,6 +16,7 @@ use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata; use pageserver::tenant::storage_layer::LayerName; use pageserver::tenant::IndexPart; use pageserver_api::shard::TenantShardId; +use remote_storage::GenericRemoteStorage; use utils::generation::Generation; use utils::id::TenantId; @@ -215,11 +217,11 @@ impl SnapshotDownloader { } pub async fn download(&self) -> anyhow::Result<()> { - let (s3_client, target) = - init_remote(self.bucket_config.clone(), NodeKind::Pageserver).await?; + let (remote_client, target) = + init_remote_generic(self.bucket_config.clone(), NodeKind::Pageserver).await?; // Generate a stream of TenantShardId - let shards = stream_tenant_shards(&s3_client, &target, self.tenant_id).await?; + let shards = stream_tenant_shards(&remote_client, &target, self.tenant_id).await?; let shards: Vec = shards.try_collect().await?; // Only read from shards that have the highest count: avoids redundantly downloading @@ -237,18 +239,19 @@ impl SnapshotDownloader { for shard in shards.into_iter().filter(|s| s.shard_count == shard_count) { // Generate a stream of TenantTimelineId - let timelines = stream_tenant_timelines(&s3_client, &self.s3_root, shard).await?; + let timelines = stream_tenant_timelines_generic(&remote_client, &target, shard).await?; // Generate a stream of S3TimelineBlobData async fn load_timeline_index( - s3_client: &Client, + remote_client: &GenericRemoteStorage, target: &RootTarget, ttid: TenantShardTimelineId, - ) -> anyhow::Result<(TenantShardTimelineId, S3TimelineBlobData)> { - let data = list_timeline_blobs(s3_client, ttid, target).await?; + ) -> anyhow::Result<(TenantShardTimelineId, RemoteTimelineBlobData)> { + let data = list_timeline_blobs_generic(remote_client, ttid, target).await?; Ok((ttid, data)) } - let timelines = timelines.map_ok(|ttid| load_timeline_index(&s3_client, &target, ttid)); + let timelines = + timelines.map_ok(|ttid| load_timeline_index(&remote_client, &target, ttid)); let mut timelines = std::pin::pin!(timelines.try_buffered(8)); while let Some(i) = timelines.next().await { @@ -278,7 +281,7 @@ impl SnapshotDownloader { for (ttid, layers) in ancestor_layers.into_iter() { tracing::info!( - "Downloading {} layers from ancvestor timeline {ttid}...", + "Downloading {} layers from ancestor timeline {ttid}...", layers.len() ); diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index ba6fbc003a..9aa275d343 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4643,6 +4643,7 @@ class StorageScrubber: ] args = base_args + args + log.info(f"Invoking scrubber command {args} with env: {env}") (output_path, stdout, status_code) = subprocess_capture( self.log_dir, args,