From 4b26783c94b582dad20efb49ca2ca842c6f944b9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arpad=20M=C3=BCller?=
Date: Mon, 19 Aug 2024 23:58:47 +0200
Subject: [PATCH] scrubber: remove _generic postfix and two unused functions
 (#8761)

Removes the `_generic` postfix from the `GenericRemoteStorage`-using APIs,
as `remote_storage` is the "default" now, and adds a `_s3` postfix to the
remaining APIs that use the S3 SDK (only in tenant snapshot).

Also removes two unused functions: `list_objects_with_retries` and
`stream_tenants`.

Part of https://github.com/neondatabase/neon/issues/7547
---
 storage_scrubber/src/checks.rs             | 14 ++---
 storage_scrubber/src/find_large_objects.rs |  7 +--
 storage_scrubber/src/garbage.rs            | 26 ++++-----
 storage_scrubber/src/lib.rs                | 51 +++-------------
 storage_scrubber/src/metadata_stream.rs    | 58 +++----------------
 .../src/pageserver_physical_gc.rs          | 19 +++---
 .../src/scan_pageserver_metadata.rs        | 14 ++---
 .../src/scan_safekeeper_metadata.rs        |  9 ++-
 storage_scrubber/src/tenant_snapshot.rs    | 17 +++---
 9 files changed, 67 insertions(+), 148 deletions(-)

diff --git a/storage_scrubber/src/checks.rs b/storage_scrubber/src/checks.rs
index 9063b3c197..b35838bcf7 100644
--- a/storage_scrubber/src/checks.rs
+++ b/storage_scrubber/src/checks.rs
@@ -10,8 +10,8 @@ use utils::generation::Generation;
 use utils::id::TimelineId;
 
 use crate::cloud_admin_api::BranchData;
-use crate::metadata_stream::stream_listing_generic;
-use crate::{download_object_with_retries_generic, RootTarget, TenantShardTimelineId};
+use crate::metadata_stream::stream_listing;
+use crate::{download_object_with_retries, RootTarget, TenantShardTimelineId};
 use futures_util::StreamExt;
 use pageserver::tenant::remote_timeline_client::{parse_remote_index_path, remote_layer_path};
 use pageserver::tenant::storage_layer::LayerName;
@@ -320,17 +320,17 @@ pub(crate) fn parse_layer_object_name(name: &str) -> Result<(LayerName, Generati
     }
 }
 
-pub(crate) async fn list_timeline_blobs_generic(
+pub(crate) async fn list_timeline_blobs(
     remote_client: &GenericRemoteStorage,
     id: TenantShardTimelineId,
-    s3_root: &RootTarget,
+    root_target: &RootTarget,
 ) -> anyhow::Result<RemoteTimelineBlobData> {
     let mut s3_layers = HashSet::new();
 
     let mut errors = Vec::new();
     let mut unknown_keys = Vec::new();
 
-    let mut timeline_dir_target = s3_root.timeline_root(&id);
+    let mut timeline_dir_target = root_target.timeline_root(&id);
     timeline_dir_target.delimiter = String::new();
 
     let mut index_part_keys: Vec<ListingObject> = Vec::new();
@@ -341,7 +341,7 @@ pub(crate) async fn list_timeline_blobs_generic(
         .strip_prefix("/")
         .unwrap_or(&timeline_dir_target.prefix_in_bucket);
 
-    let mut stream = std::pin::pin!(stream_listing_generic(remote_client, &timeline_dir_target));
+    let mut stream = std::pin::pin!(stream_listing(remote_client, &timeline_dir_target));
     while let Some(obj) = stream.next().await {
         let (key, Some(obj)) = obj? else {
             panic!("ListingObject not specified");
@@ -421,7 +421,7 @@ pub(crate) async fn list_timeline_blobs_generic(
 
     if let Some(index_part_object_key) = index_part_object.as_ref() {
         let index_part_bytes =
-            download_object_with_retries_generic(remote_client, &index_part_object_key.key)
+            download_object_with_retries(remote_client, &index_part_object_key.key)
                 .await
                 .context("index_part.json download")?;
 
diff --git a/storage_scrubber/src/find_large_objects.rs b/storage_scrubber/src/find_large_objects.rs
index f5bb7e088a..88e36af560 100644
--- a/storage_scrubber/src/find_large_objects.rs
+++ b/storage_scrubber/src/find_large_objects.rs
@@ -6,7 +6,7 @@ use remote_storage::ListingMode;
 use serde::{Deserialize, Serialize};
 
 use crate::{
-    checks::parse_layer_object_name, init_remote_generic, metadata_stream::stream_tenants_generic,
+    checks::parse_layer_object_name, init_remote, metadata_stream::stream_tenants,
     stream_objects_with_retries, BucketConfig, NodeKind,
 };
 
@@ -50,9 +50,8 @@ pub async fn find_large_objects(
     ignore_deltas: bool,
     concurrency: usize,
 ) -> anyhow::Result<LargeObjectListing> {
-    let (remote_client, target) =
-        init_remote_generic(bucket_config.clone(), NodeKind::Pageserver).await?;
-    let tenants = pin!(stream_tenants_generic(&remote_client, &target));
+    let (remote_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
+    let tenants = pin!(stream_tenants(&remote_client, &target));
 
     let objects_stream = tenants.map_ok(|tenant_shard_id| {
         let mut tenant_root = target.tenant_root(&tenant_shard_id);
diff --git a/storage_scrubber/src/garbage.rs b/storage_scrubber/src/garbage.rs
index d6a73bf366..3e22960f8d 100644
--- a/storage_scrubber/src/garbage.rs
+++ b/storage_scrubber/src/garbage.rs
@@ -19,8 +19,8 @@ use utils::id::TenantId;
 
 use crate::{
     cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData},
-    init_remote_generic, list_objects_with_retries_generic,
-    metadata_stream::{stream_tenant_timelines_generic, stream_tenants_generic},
+    init_remote, list_objects_with_retries,
+    metadata_stream::{stream_tenant_timelines, stream_tenants},
     BucketConfig, ConsoleConfig, NodeKind, TenantShardTimelineId, TraversingDepth,
 };
 
@@ -153,7 +153,7 @@ async fn find_garbage_inner(
     node_kind: NodeKind,
 ) -> anyhow::Result<GarbageList> {
     // Construct clients for S3 and for Console API
-    let (remote_client, target) = init_remote_generic(bucket_config.clone(), node_kind).await?;
+    let (remote_client, target) = init_remote(bucket_config.clone(), node_kind).await?;
     let cloud_admin_api_client = Arc::new(CloudAdminApiClient::new(console_config));
 
     // Build a set of console-known tenants, for quickly eliminating known-active tenants without having
@@ -179,7 +179,7 @@ async fn find_garbage_inner(
 
     // Enumerate Tenants in S3, and check if each one exists in Console
    tracing::info!("Finding all tenants in bucket {}...", bucket_config.bucket);
-    let tenants = stream_tenants_generic(&remote_client, &target);
+    let tenants = stream_tenants(&remote_client, &target);
     let tenants_checked = tenants.map_ok(|t| {
         let api_client = cloud_admin_api_client.clone();
         let console_cache = console_cache.clone();
@@ -237,14 +237,13 @@ async fn find_garbage_inner(
         // Special case: If it's missing in console, check for known bugs that would enable us to conclusively
         // identify it as purge-able anyway
         if console_result.is_none() {
-            let timelines =
-                stream_tenant_timelines_generic(&remote_client, &target, tenant_shard_id)
-                    .await?
-                    .collect::<Vec<_>>()
-                    .await;
+            let timelines = stream_tenant_timelines(&remote_client, &target, tenant_shard_id)
+                .await?
+                .collect::<Vec<_>>()
+                .await;
             if timelines.is_empty() {
                 // No timelines, but a heatmap: the deletion bug where we deleted everything but heatmaps
-                let tenant_objects = list_objects_with_retries_generic(
+                let tenant_objects = list_objects_with_retries(
                     &remote_client,
                     ListingMode::WithDelimiter,
                     &target.tenant_root(&tenant_shard_id),
@@ -265,7 +264,7 @@ async fn find_garbage_inner(
 
                 for timeline_r in timelines {
                     let timeline = timeline_r?;
-                    let timeline_objects = list_objects_with_retries_generic(
+                    let timeline_objects = list_objects_with_retries(
                         &remote_client,
                         ListingMode::WithDelimiter,
                         &target.timeline_root(&timeline),
@@ -331,8 +330,7 @@ async fn find_garbage_inner(
 
     // Construct a stream of all timelines within active tenants
     let active_tenants = tokio_stream::iter(active_tenants.iter().map(Ok));
-    let timelines =
-        active_tenants.map_ok(|t| stream_tenant_timelines_generic(&remote_client, &target, *t));
+    let timelines = active_tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, *t));
     let timelines = timelines.try_buffer_unordered(S3_CONCURRENCY);
     let timelines = timelines.try_flatten();
 
@@ -507,7 +505,7 @@ pub async fn purge_garbage(
     );
 
     let (remote_client, _target) =
-        init_remote_generic(garbage_list.bucket_config.clone(), garbage_list.node_kind).await?;
+        init_remote(garbage_list.bucket_config.clone(), garbage_list.node_kind).await?;
 
     assert_eq!(
         &garbage_list.bucket_config.bucket,
diff --git a/storage_scrubber/src/lib.rs b/storage_scrubber/src/lib.rs
index 3183bc3c64..112f052e07 100644
--- a/storage_scrubber/src/lib.rs
+++ b/storage_scrubber/src/lib.rs
@@ -15,7 +15,7 @@ use std::fmt::Display;
 use std::sync::Arc;
 use std::time::Duration;
 
-use anyhow::{anyhow, Context};
+use anyhow::Context;
 use aws_config::retry::{RetryConfigBuilder, RetryMode};
 use aws_sdk_s3::config::Region;
 use aws_sdk_s3::error::DisplayErrorContext;
@@ -352,7 +352,7 @@ fn make_root_target(
     }
 }
 
-async fn init_remote(
+async fn init_remote_s3(
     bucket_config: BucketConfig,
     node_kind: NodeKind,
 ) -> anyhow::Result<(Arc<Client>, RootTarget)> {
@@ -369,7 +369,7 @@ async fn init_remote(
     Ok((s3_client, s3_root))
 }
 
-async fn init_remote_generic(
+async fn init_remote(
     bucket_config: BucketConfig,
     node_kind: NodeKind,
 ) -> anyhow::Result<(GenericRemoteStorage, RootTarget)> {
@@ -394,45 +394,10 @@ async fn init_remote_generic(
 
     // We already pass the prefix to the remote client above
     let prefix_in_root_target = String::new();
-    let s3_root = make_root_target(bucket_config.bucket, prefix_in_root_target, node_kind);
+    let root_target = make_root_target(bucket_config.bucket, prefix_in_root_target, node_kind);
 
     let client = GenericRemoteStorage::from_config(&storage_config).await?;
-    Ok((client, s3_root))
-}
-
-async fn list_objects_with_retries(
-    s3_client: &Client,
-    s3_target: &S3Target,
-    continuation_token: Option<String>,
-) -> anyhow::Result<ListObjectsV2Output> {
-    for trial in 0..MAX_RETRIES {
-        match s3_client
-            .list_objects_v2()
-            .bucket(&s3_target.bucket_name)
-            .prefix(&s3_target.prefix_in_bucket)
-            .delimiter(&s3_target.delimiter)
-            .set_continuation_token(continuation_token.clone())
-            .send()
-            .await
-        {
-            Ok(response) => return Ok(response),
-            Err(e) => {
-                if trial == MAX_RETRIES - 1 {
-                    return Err(e)
-                        .with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
-                }
-                error!(
-                    "list_objects_v2 query failed: bucket_name={}, prefix={}, delimiter={}, error={}",
-                    s3_target.bucket_name,
-                    s3_target.prefix_in_bucket,
-                    s3_target.delimiter,
-                    DisplayErrorContext(e),
-                );
-                tokio::time::sleep(Duration::from_secs(1)).await;
-            }
-        }
-    }
-    Err(anyhow!("unreachable unless MAX_RETRIES==0"))
+    Ok((client, root_target))
 }
 
 /// Listing possibly large amounts of keys in a streaming fashion.
@@ -479,7 +444,7 @@ fn stream_objects_with_retries<'a>(
 
 /// If you want to list a bounded amount of prefixes or keys. For larger numbers of keys/prefixes,
 /// use [`stream_objects_with_retries`] instead.
-async fn list_objects_with_retries_generic(
+async fn list_objects_with_retries(
     remote_client: &GenericRemoteStorage,
     listing_mode: ListingMode,
     s3_target: &S3Target,
@@ -516,7 +481,7 @@ async fn list_objects_with_retries_generic(
     panic!("MAX_RETRIES is not allowed to be 0");
 }
 
-async fn download_object_with_retries_generic(
+async fn download_object_with_retries(
     remote_client: &GenericRemoteStorage,
     key: &RemotePath,
 ) -> anyhow::Result<Vec<u8>> {
@@ -552,7 +517,7 @@ async fn download_object_with_retries_generic(
     anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
 }
 
-async fn download_object_to_file(
+async fn download_object_to_file_s3(
     s3_client: &Client,
     bucket_name: &str,
     key: &str,
diff --git a/storage_scrubber/src/metadata_stream.rs b/storage_scrubber/src/metadata_stream.rs
index eca774413a..10d77937f1 100644
--- a/storage_scrubber/src/metadata_stream.rs
+++ b/storage_scrubber/src/metadata_stream.rs
@@ -2,20 +2,19 @@ use std::str::FromStr;
 
 use anyhow::{anyhow, Context};
 use async_stream::{stream, try_stream};
-use aws_sdk_s3::Client;
 use futures::StreamExt;
 use remote_storage::{GenericRemoteStorage, ListingMode, ListingObject, RemotePath};
 use tokio_stream::Stream;
 
 use crate::{
-    list_objects_with_retries, list_objects_with_retries_generic, stream_objects_with_retries,
-    RootTarget, S3Target, TenantShardTimelineId,
+    list_objects_with_retries, stream_objects_with_retries, RootTarget, S3Target,
+    TenantShardTimelineId,
 };
 use pageserver_api::shard::TenantShardId;
 use utils::id::{TenantId, TimelineId};
 
 /// Given a remote storage and a target, output a stream of TenantIds discovered via listing prefixes
-pub fn stream_tenants_generic<'a>(
+pub fn stream_tenants<'a>(
     remote_client: &'a GenericRemoteStorage,
     target: &'a RootTarget,
 ) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
@@ -36,44 +35,6 @@ pub fn stream_tenants_generic<'a>(
     }
 }
 
-/// Given an S3 bucket, output a stream of TenantIds discovered via ListObjectsv2
-pub fn stream_tenants<'a>(
-    s3_client: &'a Client,
-    target: &'a RootTarget,
-) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
-    try_stream! {
-        let mut continuation_token = None;
-        let tenants_target = target.tenants_root();
-        loop {
-            let fetch_response =
-                list_objects_with_retries(s3_client, &tenants_target, continuation_token.clone()).await?;
-
-            let new_entry_ids = fetch_response
-                .common_prefixes()
-                .iter()
-                .filter_map(|prefix| prefix.prefix())
-                .filter_map(|prefix| -> Option<&str> {
-                    prefix
-                        .strip_prefix(&tenants_target.prefix_in_bucket)?
-                        .strip_suffix('/')
-                }).map(|entry_id_str| {
-                    entry_id_str
-                        .parse()
-                        .with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
-                });
-
-            for i in new_entry_ids {
-                yield i?;
-            }
-
-            match fetch_response.next_continuation_token {
-                Some(new_token) => continuation_token = Some(new_token),
-                None => break,
-            }
-        }
-    }
-}
-
 pub async fn stream_tenant_shards<'a>(
     remote_client: &'a GenericRemoteStorage,
     target: &'a RootTarget,
@@ -85,12 +46,9 @@ pub async fn stream_tenant_shards<'a>(
     let prefix_str = &strip_prefix.strip_prefix("/").unwrap_or(&strip_prefix);
 
     tracing::info!("Listing shards in {}", shards_target.prefix_in_bucket);
-    let listing = list_objects_with_retries_generic(
-        remote_client,
-        ListingMode::WithDelimiter,
-        &shards_target,
-    )
-    .await?;
+    let listing =
+        list_objects_with_retries(remote_client, ListingMode::WithDelimiter, &shards_target)
+            .await?;
 
     let tenant_shard_ids = listing
         .prefixes
@@ -118,7 +76,7 @@ pub async fn stream_tenant_shards<'a>(
 /// Given a `TenantShardId`, output a stream of the timelines within that tenant, discovered
 /// using a listing. The listing is done before the stream is built, so that this
 /// function can be used to generate concurrency on a stream using buffer_unordered.
-pub async fn stream_tenant_timelines_generic<'a>(
+pub async fn stream_tenant_timelines<'a>(
     remote_client: &'a GenericRemoteStorage,
     target: &'a RootTarget,
     tenant: TenantShardId,
@@ -173,7 +131,7 @@ pub async fn stream_tenant_timelines_generic<'a>(
     })
 }
 
-pub(crate) fn stream_listing_generic<'a>(
+pub(crate) fn stream_listing<'a>(
     remote_client: &'a GenericRemoteStorage,
     target: &'a S3Target,
 ) -> impl Stream<Item = anyhow::Result<(RemotePath, Option<ListingObject>)>> + 'a {
diff --git a/storage_scrubber/src/pageserver_physical_gc.rs b/storage_scrubber/src/pageserver_physical_gc.rs
index 6828081128..88681e38c2 100644
--- a/storage_scrubber/src/pageserver_physical_gc.rs
+++ b/storage_scrubber/src/pageserver_physical_gc.rs
@@ -2,9 +2,9 @@ use std::collections::{BTreeMap, BTreeSet, HashMap};
 use std::sync::Arc;
 use std::time::Duration;
 
-use crate::checks::{list_timeline_blobs_generic, BlobDataParseResult};
-use crate::metadata_stream::{stream_tenant_timelines_generic, stream_tenants_generic};
-use crate::{init_remote_generic, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
+use crate::checks::{list_timeline_blobs, BlobDataParseResult};
+use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
+use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
 use futures_util::{StreamExt, TryStreamExt};
 use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
 use pageserver::tenant::remote_timeline_client::{parse_remote_index_path, remote_layer_path};
@@ -352,7 +352,7 @@ async fn gc_ancestor(
     summary: &mut GcSummary,
 ) -> anyhow::Result<()> {
     // Scan timelines in the ancestor
-    let timelines = stream_tenant_timelines_generic(remote_client, root_target, ancestor).await?;
+    let timelines = stream_tenant_timelines(remote_client, root_target, ancestor).await?;
     let mut timelines = std::pin::pin!(timelines);
 
     // Build a list of keys to retain
     while let Some(ttid) = timelines.next().await {
         let ttid = ttid?;
-        let data = list_timeline_blobs_generic(remote_client, ttid, root_target).await?;
+        let data = list_timeline_blobs(remote_client, ttid, root_target).await?;
         let s3_layers = match data.blob_data {
             BlobDataParseResult::Parsed {
@@ -456,11 +456,10 @@ pub async fn pageserver_physical_gc(
     min_age: Duration,
     mode: GcMode,
 ) -> anyhow::Result<GcSummary> {
-    let (remote_client, target) =
-        init_remote_generic(bucket_config.clone(), NodeKind::Pageserver).await?;
+    let (remote_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
 
     let tenants = if tenant_shard_ids.is_empty() {
-        futures::future::Either::Left(stream_tenants_generic(&remote_client, &target))
+        futures::future::Either::Left(stream_tenants(&remote_client, &target))
     } else {
         futures::future::Either::Right(futures::stream::iter(tenant_shard_ids.into_iter().map(Ok)))
     };
@@ -473,7 +472,7 @@ pub async fn pageserver_physical_gc(
     let accumulator = Arc::new(std::sync::Mutex::new(TenantRefAccumulator::default()));
 
     // Generate a stream of TenantTimelineId
-    let timelines = tenants.map_ok(|t| stream_tenant_timelines_generic(&remote_client, &target, t));
+    let timelines = tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, t));
     let timelines = timelines.try_buffered(CONCURRENCY);
     let timelines = timelines.try_flatten();
 
@@ -487,7 +486,7 @@ pub async fn pageserver_physical_gc(
         accumulator: &Arc<std::sync::Mutex<TenantRefAccumulator>>,
     ) -> anyhow::Result<GcSummary> {
         let mut summary = GcSummary::default();
-        let data = list_timeline_blobs_generic(remote_client, ttid, target).await?;
+        let data = list_timeline_blobs(remote_client, ttid, target).await?;
 
         let (index_part, latest_gen, candidates) = match &data.blob_data {
             BlobDataParseResult::Parsed {
diff --git a/storage_scrubber/src/scan_pageserver_metadata.rs b/storage_scrubber/src/scan_pageserver_metadata.rs
index e89e97ccb6..151ef27672 100644
--- a/storage_scrubber/src/scan_pageserver_metadata.rs
+++ b/storage_scrubber/src/scan_pageserver_metadata.rs
@@ -1,11 +1,11 @@
 use std::collections::{HashMap, HashSet};
 
 use crate::checks::{
-    branch_cleanup_and_check_errors, list_timeline_blobs_generic, BlobDataParseResult,
+    branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult,
     RemoteTimelineBlobData, TenantObjectListing, TimelineAnalysis,
 };
-use crate::metadata_stream::{stream_tenant_timelines_generic, stream_tenants_generic};
-use crate::{init_remote_generic, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
+use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
+use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
 use futures_util::{StreamExt, TryStreamExt};
 use pageserver::tenant::remote_timeline_client::remote_layer_path;
 use pageserver_api::controller_api::MetadataHealthUpdateRequest;
@@ -120,10 +120,10 @@ pub async fn scan_pageserver_metadata(
     bucket_config: BucketConfig,
     tenant_ids: Vec<TenantShardId>,
 ) -> anyhow::Result<MetadataSummary> {
-    let (remote_client, target) = init_remote_generic(bucket_config, NodeKind::Pageserver).await?;
+    let (remote_client, target) = init_remote(bucket_config, NodeKind::Pageserver).await?;
 
     let tenants = if tenant_ids.is_empty() {
-        futures::future::Either::Left(stream_tenants_generic(&remote_client, &target))
+        futures::future::Either::Left(stream_tenants(&remote_client, &target))
     } else {
         futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok)))
     };
@@ -133,7 +133,7 @@ pub async fn scan_pageserver_metadata(
     const CONCURRENCY: usize = 32;
 
     // Generate a stream of TenantTimelineId
-    let timelines = tenants.map_ok(|t| stream_tenant_timelines_generic(&remote_client, &target, t));
+    let timelines = tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, t));
     let timelines = timelines.try_buffered(CONCURRENCY);
     let timelines = timelines.try_flatten();
 
@@ -143,7 +143,7 @@ pub async fn scan_pageserver_metadata(
         target: &RootTarget,
         ttid: TenantShardTimelineId,
     ) -> anyhow::Result<(TenantShardTimelineId, RemoteTimelineBlobData)> {
-        let data = list_timeline_blobs_generic(remote_client, ttid, target).await?;
+        let data = list_timeline_blobs(remote_client, ttid, target).await?;
         Ok((ttid, data))
     }
     let timelines = timelines.map_ok(|ttid| report_on_timeline(&remote_client, &target, ttid));
diff --git a/storage_scrubber/src/scan_safekeeper_metadata.rs b/storage_scrubber/src/scan_safekeeper_metadata.rs
index f20fa27d13..1a9f3d0ef5 100644
--- a/storage_scrubber/src/scan_safekeeper_metadata.rs
+++ b/storage_scrubber/src/scan_safekeeper_metadata.rs
@@ -14,9 +14,8 @@ use utils::{
 };
 
 use crate::{
-    cloud_admin_api::CloudAdminApiClient, init_remote_generic,
-    metadata_stream::stream_listing_generic, BucketConfig, ConsoleConfig, NodeKind, RootTarget,
-    TenantShardTimelineId,
+    cloud_admin_api::CloudAdminApiClient, init_remote, metadata_stream::stream_listing,
+    BucketConfig, ConsoleConfig, NodeKind, RootTarget, TenantShardTimelineId,
 };
 
 /// Generally we should ask safekeepers, but so far we use everywhere default 16MB.
@@ -107,7 +106,7 @@ pub async fn scan_safekeeper_metadata(
     let timelines = client.query(&query, &[]).await?;
     info!("loaded {} timelines", timelines.len());
 
-    let (remote_client, target) = init_remote_generic(bucket_config, NodeKind::Safekeeper).await?;
+    let (remote_client, target) = init_remote(bucket_config, NodeKind::Safekeeper).await?;
     let console_config = ConsoleConfig::from_env()?;
     let cloud_admin_api_client = CloudAdminApiClient::new(console_config);
 
@@ -193,7 +192,7 @@ async fn check_timeline(
         .strip_prefix("/")
         .unwrap_or(&timeline_dir_target.prefix_in_bucket);
 
-    let mut stream = std::pin::pin!(stream_listing_generic(remote_client, &timeline_dir_target));
+    let mut stream = std::pin::pin!(stream_listing(remote_client, &timeline_dir_target));
     while let Some(obj) = stream.next().await {
         let (key, _obj) = obj?;
diff --git a/storage_scrubber/src/tenant_snapshot.rs b/storage_scrubber/src/tenant_snapshot.rs
index fc3a973922..bb4079b5f4 100644
--- a/storage_scrubber/src/tenant_snapshot.rs
+++ b/storage_scrubber/src/tenant_snapshot.rs
@@ -1,10 +1,10 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 
-use crate::checks::{list_timeline_blobs_generic, BlobDataParseResult, RemoteTimelineBlobData};
-use crate::metadata_stream::{stream_tenant_shards, stream_tenant_timelines_generic};
+use crate::checks::{list_timeline_blobs, BlobDataParseResult, RemoteTimelineBlobData};
+use crate::metadata_stream::{stream_tenant_shards, stream_tenant_timelines};
 use crate::{
-    download_object_to_file, init_remote, init_remote_generic, BucketConfig, NodeKind, RootTarget,
+    download_object_to_file_s3, init_remote, init_remote_s3, BucketConfig, NodeKind, RootTarget,
     TenantShardTimelineId,
 };
 use anyhow::Context;
@@ -36,7 +36,8 @@ impl SnapshotDownloader {
         output_path: Utf8PathBuf,
         concurrency: usize,
     ) -> anyhow::Result<Self> {
-        let (s3_client, s3_root) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
+        let (s3_client, s3_root) =
+            init_remote_s3(bucket_config.clone(), NodeKind::Pageserver).await?;
         Ok(Self {
             s3_client,
             s3_root,
@@ -93,7 +94,7 @@ impl SnapshotDownloader {
         let Some(version) = versions.versions.as_ref().and_then(|v| v.first()) else {
             return Err(anyhow::anyhow!("No versions found for {remote_layer_path}"));
         };
-        download_object_to_file(
+        download_object_to_file_s3(
            &self.s3_client,
            &self.bucket_config.bucket,
            &remote_layer_path,
@@ -218,7 +219,7 @@ impl SnapshotDownloader {
 
     pub async fn download(&self) -> anyhow::Result<()> {
         let (remote_client, target) =
-            init_remote_generic(self.bucket_config.clone(), NodeKind::Pageserver).await?;
+            init_remote(self.bucket_config.clone(), NodeKind::Pageserver).await?;
 
         // Generate a stream of TenantShardId
         let shards = stream_tenant_shards(&remote_client, &target, self.tenant_id).await?;
@@ -239,7 +240,7 @@ impl SnapshotDownloader {
 
         for shard in shards.into_iter().filter(|s| s.shard_count == shard_count) {
             // Generate a stream of TenantTimelineId
-            let timelines = stream_tenant_timelines_generic(&remote_client, &target, shard).await?;
+            let timelines = stream_tenant_timelines(&remote_client, &target, shard).await?;
 
             // Generate a stream of S3TimelineBlobData
             async fn load_timeline_index(
                 remote_client: &GenericRemoteStorage,
                 target: &RootTarget,
                 ttid: TenantShardTimelineId,
             ) -> anyhow::Result<(TenantShardTimelineId, RemoteTimelineBlobData)> {
-                let data = list_timeline_blobs_generic(remote_client, ttid, target).await?;
+                let data = list_timeline_blobs(remote_client, ttid, target).await?;
                 Ok((ttid, data))
             }
             let timelines =