diff --git a/storage_scrubber/src/metadata_stream.rs b/storage_scrubber/src/metadata_stream.rs
index c702c0c312..54812ffc94 100644
--- a/storage_scrubber/src/metadata_stream.rs
+++ b/storage_scrubber/src/metadata_stream.rs
@@ -4,7 +4,7 @@ use anyhow::{anyhow, Context};
 use async_stream::{stream, try_stream};
 use aws_sdk_s3::{types::ObjectIdentifier, Client};
 use futures::StreamExt;
-use remote_storage::{GenericRemoteStorage, ListingMode};
+use remote_storage::{GenericRemoteStorage, ListingMode, ListingObject, RemotePath};
 use tokio_stream::Stream;
 
 use crate::{
@@ -276,3 +276,33 @@ pub(crate) fn stream_listing<'a>(
         }
     }
 }
+
+pub(crate) fn stream_listing_generic<'a>(
+    remote_client: &'a GenericRemoteStorage,
+    target: &'a S3Target,
+) -> impl Stream<Item = anyhow::Result<(RemotePath, Option<ListingObject>)>> + 'a {
+    let listing_mode = if target.delimiter.is_empty() {
+        ListingMode::NoDelimiter
+    } else {
+        ListingMode::WithDelimiter
+    };
+    try_stream! {
+        let mut objects_stream = std::pin::pin!(stream_objects_with_retries(
+            remote_client,
+            listing_mode,
+            target,
+        ));
+        while let Some(list) = objects_stream.next().await {
+            let list = list?;
+            if target.delimiter.is_empty() {
+                for key in list.keys {
+                    yield (key.key.clone(), Some(key));
+                }
+            } else {
+                for key in list.prefixes {
+                    yield (key, None);
+                }
+            }
+        }
+    }
+}
diff --git a/storage_scrubber/src/scan_safekeeper_metadata.rs b/storage_scrubber/src/scan_safekeeper_metadata.rs
index 553adf8f46..08a4541c5c 100644
--- a/storage_scrubber/src/scan_safekeeper_metadata.rs
+++ b/storage_scrubber/src/scan_safekeeper_metadata.rs
@@ -1,10 +1,10 @@
 use std::{collections::HashSet, str::FromStr, sync::Arc};
 
-use aws_sdk_s3::Client;
 use futures::stream::{StreamExt, TryStreamExt};
 use once_cell::sync::OnceCell;
 use pageserver_api::shard::TenantShardId;
 use postgres_ffi::{XLogFileName, PG_TLI};
+use remote_storage::GenericRemoteStorage;
 use serde::Serialize;
 use tokio_postgres::types::PgLsn;
 use tracing::{error, info, trace};
@@ -14,8 +14,9 @@ use utils::{
 };
 
 use crate::{
-    cloud_admin_api::CloudAdminApiClient, init_remote, metadata_stream::stream_listing,
-    BucketConfig, ConsoleConfig, NodeKind, RootTarget, TenantShardTimelineId,
+    cloud_admin_api::CloudAdminApiClient, init_remote_generic,
+    metadata_stream::stream_listing_generic, BucketConfig, ConsoleConfig, NodeKind, RootTarget,
+    TenantShardTimelineId,
 };
 
 /// Generally we should ask safekeepers, but so far we use everywhere default 16MB.
@@ -106,7 +107,7 @@ pub async fn scan_safekeeper_metadata(
     let timelines = client.query(&query, &[]).await?;
     info!("loaded {} timelines", timelines.len());
 
-    let (s3_client, target) = init_remote(bucket_config, NodeKind::Safekeeper).await?;
+    let (remote_client, target) = init_remote_generic(bucket_config, NodeKind::Safekeeper).await?;
     let console_config = ConsoleConfig::from_env()?;
     let cloud_admin_api_client = CloudAdminApiClient::new(console_config);
 
@@ -119,7 +120,7 @@ pub async fn scan_safekeeper_metadata(
         let backup_lsn: Lsn = Lsn(u64::from(backup_lsn_pg));
         let ttid = TenantTimelineId::new(tenant_id, timeline_id);
         check_timeline(
-            &s3_client,
+            &remote_client,
             &target,
             &cloud_admin_api_client,
             ttid,
@@ -156,7 +157,7 @@ struct TimelineCheckResult {
 /// errors are logged to stderr; returns Ok(true) if timeline is consistent,
 /// Ok(false) if not, Err if failed to check.
 async fn check_timeline(
-    s3_client: &Client,
+    remote_client: &GenericRemoteStorage,
     root: &RootTarget,
     api_client: &CloudAdminApiClient,
     ttid: TenantTimelineId,
@@ -187,12 +188,13 @@ async fn check_timeline(
     // we need files, so unset it.
     timeline_dir_target.delimiter = String::new();
 
-    let mut stream = std::pin::pin!(stream_listing(s3_client, &timeline_dir_target));
+    let mut stream = std::pin::pin!(stream_listing_generic(remote_client, &timeline_dir_target));
     while let Some(obj) = stream.next().await {
-        let obj = obj?;
-        let key = obj.key();
+        let (key, _obj) = obj?;
 
         let seg_name = key
+            .get_path()
+            .as_str()
            .strip_prefix(&timeline_dir_target.prefix_in_bucket)
            .expect("failed to extract segment name");
        expected_segfiles.remove(seg_name);
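
For context, here is a minimal sketch (not part of the diff) of how a caller can drive the new `stream_listing_generic` helper in both listing modes. `walk_target` is a hypothetical function name, and the sketch assumes it lives inside the scrubber crate, where `S3Target` and the helper are visible:

```rust
use futures::StreamExt;
use remote_storage::GenericRemoteStorage;

use crate::{metadata_stream::stream_listing_generic, S3Target};

/// Hypothetical: list one level of prefixes, then every object under the target.
async fn walk_target(
    remote_client: &GenericRemoteStorage,
    mut target: S3Target,
) -> anyhow::Result<()> {
    // Pass 1: with a delimiter set, the helper yields common prefixes only
    // (objects at that level are skipped by the helper), so the
    // ListingObject side of the tuple is always None.
    {
        let mut prefixes = std::pin::pin!(stream_listing_generic(remote_client, &target));
        while let Some(entry) = prefixes.next().await {
            let (path, meta) = entry?;
            assert!(meta.is_none());
            println!("prefix: {}", path.get_path().as_str());
        }
    }

    // Pass 2: unset the delimiter, as check_timeline does, to walk files.
    target.delimiter = String::new();
    let mut files = std::pin::pin!(stream_listing_generic(remote_client, &target));
    while let Some(entry) = files.next().await {
        let (path, meta) = entry?;
        // In NoDelimiter mode every entry carries its ListingObject metadata.
        let _meta = meta.expect("object listings carry metadata");
        println!("object: {}", path.get_path().as_str());
    }
    Ok(())
}
```

The `Option<ListingObject>` in the yielded tuple encodes the listing mode: delimited listings return bare prefixes with no per-object metadata (`None`), while undelimited listings pair each `RemotePath` with its `ListingObject`. That is why `check_timeline` clears the delimiter before scanning: it needs the individual segment files, not the directory-like prefixes.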