scrubber: remove _generic postfix and two unused functions (#8761)

Removes the `_generic` postfix from the APIs that use
`GenericRemoteStorage`, as `remote_storage` is now the default, and adds
an `_s3` postfix to the remaining APIs that still use the S3 SDK
directly (only in tenant snapshot). Also removes two now-unused
functions: the S3-based `list_objects_with_retries` and
`stream_tenants`.
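
Illustratively, the naming convention after this change: the
backend-agnostic entry point gets the plain name, while the raw-SDK
variant carries the `_s3` postfix. A minimal sketch with hypothetical
stand-in types (not code from this repo):

// Sketch only: `GenericRemoteStorage` and `S3Client` are simplified
// stand-ins for remote_storage::GenericRemoteStorage and aws_sdk_s3::Client.
struct GenericRemoteStorage;
struct S3Client;

// Default path: backend-agnostic, hence no postfix.
fn init_remote() -> GenericRemoteStorage {
    GenericRemoteStorage
}

// SDK-specific path, still needed by tenant snapshot: `_s3` postfix.
fn init_remote_s3() -> S3Client {
    S3Client
}

fn main() {
    let _remote = init_remote();
    let _s3_only = init_remote_s3();
}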

Part of https://github.com/neondatabase/neon/issues/7547
Arpad Müller authored on 2024-08-19 23:58:47 +02:00, committed by GitHub
parent 6949b45e17
commit 4b26783c94
9 changed files with 67 additions and 148 deletions

View File

@@ -10,8 +10,8 @@ use utils::generation::Generation;
 use utils::id::TimelineId;

 use crate::cloud_admin_api::BranchData;
-use crate::metadata_stream::stream_listing_generic;
-use crate::{download_object_with_retries_generic, RootTarget, TenantShardTimelineId};
+use crate::metadata_stream::stream_listing;
+use crate::{download_object_with_retries, RootTarget, TenantShardTimelineId};
 use futures_util::StreamExt;
 use pageserver::tenant::remote_timeline_client::{parse_remote_index_path, remote_layer_path};
 use pageserver::tenant::storage_layer::LayerName;
@@ -320,17 +320,17 @@ pub(crate) fn parse_layer_object_name(name: &str) -> Result<(LayerName, Generati
     }
 }

-pub(crate) async fn list_timeline_blobs_generic(
+pub(crate) async fn list_timeline_blobs(
     remote_client: &GenericRemoteStorage,
     id: TenantShardTimelineId,
-    s3_root: &RootTarget,
+    root_target: &RootTarget,
 ) -> anyhow::Result<RemoteTimelineBlobData> {
     let mut s3_layers = HashSet::new();
     let mut errors = Vec::new();
     let mut unknown_keys = Vec::new();

-    let mut timeline_dir_target = s3_root.timeline_root(&id);
+    let mut timeline_dir_target = root_target.timeline_root(&id);
     timeline_dir_target.delimiter = String::new();

     let mut index_part_keys: Vec<ListingObject> = Vec::new();
@@ -341,7 +341,7 @@ pub(crate) async fn list_timeline_blobs_generic(
         .strip_prefix("/")
         .unwrap_or(&timeline_dir_target.prefix_in_bucket);

-    let mut stream = std::pin::pin!(stream_listing_generic(remote_client, &timeline_dir_target));
+    let mut stream = std::pin::pin!(stream_listing(remote_client, &timeline_dir_target));
     while let Some(obj) = stream.next().await {
         let (key, Some(obj)) = obj? else {
             panic!("ListingObject not specified");
@@ -421,7 +421,7 @@ pub(crate) async fn list_timeline_blobs_generic(
     if let Some(index_part_object_key) = index_part_object.as_ref() {
         let index_part_bytes =
-            download_object_with_retries_generic(remote_client, &index_part_object_key.key)
+            download_object_with_retries(remote_client, &index_part_object_key.key)
                 .await
                 .context("index_part.json download")?;

View File

@@ -6,7 +6,7 @@ use remote_storage::ListingMode;
 use serde::{Deserialize, Serialize};

 use crate::{
-    checks::parse_layer_object_name, init_remote_generic, metadata_stream::stream_tenants_generic,
+    checks::parse_layer_object_name, init_remote, metadata_stream::stream_tenants,
     stream_objects_with_retries, BucketConfig, NodeKind,
 };
@@ -50,9 +50,8 @@ pub async fn find_large_objects(
     ignore_deltas: bool,
     concurrency: usize,
 ) -> anyhow::Result<LargeObjectListing> {
-    let (remote_client, target) =
-        init_remote_generic(bucket_config.clone(), NodeKind::Pageserver).await?;
-    let tenants = pin!(stream_tenants_generic(&remote_client, &target));
+    let (remote_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
+    let tenants = pin!(stream_tenants(&remote_client, &target));
     let objects_stream = tenants.map_ok(|tenant_shard_id| {
         let mut tenant_root = target.tenant_root(&tenant_shard_id);

View File

@@ -19,8 +19,8 @@ use utils::id::TenantId;
 use crate::{
     cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData},
-    init_remote_generic, list_objects_with_retries_generic,
-    metadata_stream::{stream_tenant_timelines_generic, stream_tenants_generic},
+    init_remote, list_objects_with_retries,
+    metadata_stream::{stream_tenant_timelines, stream_tenants},
     BucketConfig, ConsoleConfig, NodeKind, TenantShardTimelineId, TraversingDepth,
 };
@@ -153,7 +153,7 @@ async fn find_garbage_inner(
     node_kind: NodeKind,
 ) -> anyhow::Result<GarbageList> {
     // Construct clients for S3 and for Console API
-    let (remote_client, target) = init_remote_generic(bucket_config.clone(), node_kind).await?;
+    let (remote_client, target) = init_remote(bucket_config.clone(), node_kind).await?;
     let cloud_admin_api_client = Arc::new(CloudAdminApiClient::new(console_config));

     // Build a set of console-known tenants, for quickly eliminating known-active tenants without having
@@ -179,7 +179,7 @@ async fn find_garbage_inner(
     // Enumerate Tenants in S3, and check if each one exists in Console
     tracing::info!("Finding all tenants in bucket {}...", bucket_config.bucket);
-    let tenants = stream_tenants_generic(&remote_client, &target);
+    let tenants = stream_tenants(&remote_client, &target);
     let tenants_checked = tenants.map_ok(|t| {
         let api_client = cloud_admin_api_client.clone();
         let console_cache = console_cache.clone();
@@ -237,14 +237,13 @@ async fn find_garbage_inner(
         // Special case: If it's missing in console, check for known bugs that would enable us to conclusively
         // identify it as purge-able anyway
         if console_result.is_none() {
-            let timelines =
-                stream_tenant_timelines_generic(&remote_client, &target, tenant_shard_id)
-                    .await?
-                    .collect::<Vec<_>>()
-                    .await;
+            let timelines = stream_tenant_timelines(&remote_client, &target, tenant_shard_id)
+                .await?
+                .collect::<Vec<_>>()
+                .await;
             if timelines.is_empty() {
                 // No timelines, but a heatmap: the deletion bug where we deleted everything but heatmaps
-                let tenant_objects = list_objects_with_retries_generic(
+                let tenant_objects = list_objects_with_retries(
                     &remote_client,
                     ListingMode::WithDelimiter,
                     &target.tenant_root(&tenant_shard_id),
@@ -265,7 +264,7 @@ async fn find_garbage_inner(
             for timeline_r in timelines {
                 let timeline = timeline_r?;
-                let timeline_objects = list_objects_with_retries_generic(
+                let timeline_objects = list_objects_with_retries(
                     &remote_client,
                     ListingMode::WithDelimiter,
                     &target.timeline_root(&timeline),
@@ -331,8 +330,7 @@ async fn find_garbage_inner(
     // Construct a stream of all timelines within active tenants
     let active_tenants = tokio_stream::iter(active_tenants.iter().map(Ok));
-    let timelines =
-        active_tenants.map_ok(|t| stream_tenant_timelines_generic(&remote_client, &target, *t));
+    let timelines = active_tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, *t));
     let timelines = timelines.try_buffer_unordered(S3_CONCURRENCY);
     let timelines = timelines.try_flatten();
@@ -507,7 +505,7 @@ pub async fn purge_garbage(
     );

     let (remote_client, _target) =
-        init_remote_generic(garbage_list.bucket_config.clone(), garbage_list.node_kind).await?;
+        init_remote(garbage_list.bucket_config.clone(), garbage_list.node_kind).await?;

     assert_eq!(
         &garbage_list.bucket_config.bucket,

View File

@@ -15,7 +15,7 @@ use std::fmt::Display;
 use std::sync::Arc;
 use std::time::Duration;

-use anyhow::{anyhow, Context};
+use anyhow::Context;
 use aws_config::retry::{RetryConfigBuilder, RetryMode};
 use aws_sdk_s3::config::Region;
 use aws_sdk_s3::error::DisplayErrorContext;
@@ -352,7 +352,7 @@ fn make_root_target(
     }
 }

-async fn init_remote(
+async fn init_remote_s3(
     bucket_config: BucketConfig,
     node_kind: NodeKind,
 ) -> anyhow::Result<(Arc<Client>, RootTarget)> {
@@ -369,7 +369,7 @@ async fn init_remote(
     Ok((s3_client, s3_root))
 }

-async fn init_remote_generic(
+async fn init_remote(
     bucket_config: BucketConfig,
     node_kind: NodeKind,
 ) -> anyhow::Result<(GenericRemoteStorage, RootTarget)> {
@@ -394,45 +394,10 @@ async fn init_remote_generic(
     // We already pass the prefix to the remote client above
     let prefix_in_root_target = String::new();
-    let s3_root = make_root_target(bucket_config.bucket, prefix_in_root_target, node_kind);
+    let root_target = make_root_target(bucket_config.bucket, prefix_in_root_target, node_kind);

     let client = GenericRemoteStorage::from_config(&storage_config).await?;
-    Ok((client, s3_root))
-}
-
-async fn list_objects_with_retries(
-    s3_client: &Client,
-    s3_target: &S3Target,
-    continuation_token: Option<String>,
-) -> anyhow::Result<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output> {
-    for trial in 0..MAX_RETRIES {
-        match s3_client
-            .list_objects_v2()
-            .bucket(&s3_target.bucket_name)
-            .prefix(&s3_target.prefix_in_bucket)
-            .delimiter(&s3_target.delimiter)
-            .set_continuation_token(continuation_token.clone())
-            .send()
-            .await
-        {
-            Ok(response) => return Ok(response),
-            Err(e) => {
-                if trial == MAX_RETRIES - 1 {
-                    return Err(e)
-                        .with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
-                }
-                error!(
-                    "list_objects_v2 query failed: bucket_name={}, prefix={}, delimiter={}, error={}",
-                    s3_target.bucket_name,
-                    s3_target.prefix_in_bucket,
-                    s3_target.delimiter,
-                    DisplayErrorContext(e),
-                );
-                tokio::time::sleep(Duration::from_secs(1)).await;
-            }
-        }
-    }
-    Err(anyhow!("unreachable unless MAX_RETRIES==0"))
+    Ok((client, root_target))
 }

 /// Listing possibly large amounts of keys in a streaming fashion.
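
For context, the surviving generic helpers (`list_objects_with_retries`,
`download_object_with_retries`) keep the same fixed-count retry shape as
the S3-specific function deleted above. A minimal self-contained sketch
of that shape, assuming tokio and anyhow; `fetch` is a hypothetical
stand-in for the actual storage call:

use std::time::Duration;

use anyhow::Context;

const MAX_RETRIES: usize = 8;

// Hypothetical stand-in for a fallible remote-storage request.
async fn fetch(key: &str) -> anyhow::Result<Vec<u8>> {
    Ok(key.as_bytes().to_vec())
}

async fn fetch_with_retries(key: &str) -> anyhow::Result<Vec<u8>> {
    for trial in 0..MAX_RETRIES {
        match fetch(key).await {
            Ok(bytes) => return Ok(bytes),
            Err(e) if trial == MAX_RETRIES - 1 => {
                return Err(e).with_context(|| format!("Failed to fetch {MAX_RETRIES} times"));
            }
            Err(e) => {
                // Log and back off before the next attempt.
                eprintln!("fetch failed (attempt {trial}): {e:#}");
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }
    }
    // Only reachable when the loop body never ran; the generic helpers
    // guard this tail with panic!("MAX_RETRIES is not allowed to be 0")
    // where the deleted S3 version returned Err(anyhow!(..)).
    unreachable!("MAX_RETRIES is not allowed to be 0")
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let bytes = fetch_with_retries("index_part.json").await?;
    println!("downloaded {} bytes", bytes.len());
    Ok(())
}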
@@ -479,7 +444,7 @@ fn stream_objects_with_retries<'a>(
 /// If you want to list a bounded amount of prefixes or keys. For larger numbers of keys/prefixes,
 /// use [`stream_objects_with_retries`] instead.
-async fn list_objects_with_retries_generic(
+async fn list_objects_with_retries(
     remote_client: &GenericRemoteStorage,
     listing_mode: ListingMode,
     s3_target: &S3Target,
@@ -516,7 +481,7 @@ async fn list_objects_with_retries_generic(
     panic!("MAX_RETRIES is not allowed to be 0");
 }

-async fn download_object_with_retries_generic(
+async fn download_object_with_retries(
     remote_client: &GenericRemoteStorage,
     key: &RemotePath,
 ) -> anyhow::Result<Vec<u8>> {
@@ -552,7 +517,7 @@ async fn download_object_with_retries_generic(
     anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
 }

-async fn download_object_to_file(
+async fn download_object_to_file_s3(
     s3_client: &Client,
     bucket_name: &str,
     key: &str,

View File

@@ -2,20 +2,19 @@ use std::str::FromStr;

 use anyhow::{anyhow, Context};
 use async_stream::{stream, try_stream};
-use aws_sdk_s3::Client;
 use futures::StreamExt;
 use remote_storage::{GenericRemoteStorage, ListingMode, ListingObject, RemotePath};
 use tokio_stream::Stream;

 use crate::{
-    list_objects_with_retries, list_objects_with_retries_generic, stream_objects_with_retries,
-    RootTarget, S3Target, TenantShardTimelineId,
+    list_objects_with_retries, stream_objects_with_retries, RootTarget, S3Target,
+    TenantShardTimelineId,
 };
 use pageserver_api::shard::TenantShardId;
 use utils::id::{TenantId, TimelineId};

 /// Given a remote storage and a target, output a stream of TenantIds discovered via listing prefixes
-pub fn stream_tenants_generic<'a>(
+pub fn stream_tenants<'a>(
     remote_client: &'a GenericRemoteStorage,
     target: &'a RootTarget,
 ) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
@@ -36,44 +35,6 @@ pub fn stream_tenants_generic<'a>(
     }
 }

-/// Given an S3 bucket, output a stream of TenantIds discovered via ListObjectsv2
-pub fn stream_tenants<'a>(
-    s3_client: &'a Client,
-    target: &'a RootTarget,
-) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
-    try_stream! {
-        let mut continuation_token = None;
-        let tenants_target = target.tenants_root();
-        loop {
-            let fetch_response =
-                list_objects_with_retries(s3_client, &tenants_target, continuation_token.clone()).await?;
-            let new_entry_ids = fetch_response
-                .common_prefixes()
-                .iter()
-                .filter_map(|prefix| prefix.prefix())
-                .filter_map(|prefix| -> Option<&str> {
-                    prefix
-                        .strip_prefix(&tenants_target.prefix_in_bucket)?
-                        .strip_suffix('/')
-                }).map(|entry_id_str| {
-                    entry_id_str
-                        .parse()
-                        .with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
-                });
-            for i in new_entry_ids {
-                yield i?;
-            }
-            match fetch_response.next_continuation_token {
-                Some(new_token) => continuation_token = Some(new_token),
-                None => break,
-            }
-        }
-    }
-}
-
 pub async fn stream_tenant_shards<'a>(
     remote_client: &'a GenericRemoteStorage,
     target: &'a RootTarget,
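
The deleted function's continuation-token loop is a common pagination
shape for `async_stream::try_stream!`. A self-contained sketch of the
same shape, assuming tokio, futures, and async-stream; `fetch_page` is a
hypothetical stand-in for the paginated `list_objects_v2` call:

use async_stream::try_stream;
use futures::{pin_mut, Stream, StreamExt};

// Hypothetical stand-in for one listing page: entries plus an optional
// continuation token pointing at the next page.
async fn fetch_page(token: Option<u32>) -> anyhow::Result<(Vec<String>, Option<u32>)> {
    let page = token.unwrap_or(0);
    let entries = (0..3).map(|i| format!("tenant-{page}-{i}")).collect();
    Ok((entries, if page < 2 { Some(page + 1) } else { None }))
}

fn stream_entries() -> impl Stream<Item = anyhow::Result<String>> {
    try_stream! {
        let mut token = None;
        loop {
            let (entries, next) = fetch_page(token).await?;
            for entry in entries {
                yield entry;
            }
            match next {
                Some(t) => token = Some(t),
                None => break,
            }
        }
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let stream = stream_entries();
    pin_mut!(stream);
    while let Some(entry) = stream.next().await {
        println!("{}", entry?);
    }
    Ok(())
}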
@@ -85,12 +46,9 @@ pub async fn stream_tenant_shards<'a>(
     let prefix_str = &strip_prefix.strip_prefix("/").unwrap_or(&strip_prefix);

     tracing::info!("Listing shards in {}", shards_target.prefix_in_bucket);
-    let listing = list_objects_with_retries_generic(
-        remote_client,
-        ListingMode::WithDelimiter,
-        &shards_target,
-    )
-    .await?;
+    let listing =
+        list_objects_with_retries(remote_client, ListingMode::WithDelimiter, &shards_target)
+            .await?;

     let tenant_shard_ids = listing
         .prefixes
@@ -118,7 +76,7 @@ pub async fn stream_tenant_shards<'a>(
 /// Given a `TenantShardId`, output a stream of the timelines within that tenant, discovered
 /// using a listing. The listing is done before the stream is built, so that this
 /// function can be used to generate concurrency on a stream using buffer_unordered.
-pub async fn stream_tenant_timelines_generic<'a>(
+pub async fn stream_tenant_timelines<'a>(
     remote_client: &'a GenericRemoteStorage,
     target: &'a RootTarget,
     tenant: TenantShardId,
@@ -173,7 +131,7 @@ pub async fn stream_tenant_timelines_generic<'a>(
     })
 }

-pub(crate) fn stream_listing_generic<'a>(
+pub(crate) fn stream_listing<'a>(
     remote_client: &'a GenericRemoteStorage,
     target: &'a S3Target,
 ) -> impl Stream<Item = anyhow::Result<(RemotePath, Option<ListingObject>)>> + 'a {

View File

@@ -2,9 +2,9 @@ use std::collections::{BTreeMap, BTreeSet, HashMap};
 use std::sync::Arc;
 use std::time::Duration;

-use crate::checks::{list_timeline_blobs_generic, BlobDataParseResult};
-use crate::metadata_stream::{stream_tenant_timelines_generic, stream_tenants_generic};
-use crate::{init_remote_generic, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
+use crate::checks::{list_timeline_blobs, BlobDataParseResult};
+use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
+use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
 use futures_util::{StreamExt, TryStreamExt};
 use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
 use pageserver::tenant::remote_timeline_client::{parse_remote_index_path, remote_layer_path};
@@ -352,7 +352,7 @@ async fn gc_ancestor(
     summary: &mut GcSummary,
 ) -> anyhow::Result<()> {
     // Scan timelines in the ancestor
-    let timelines = stream_tenant_timelines_generic(remote_client, root_target, ancestor).await?;
+    let timelines = stream_tenant_timelines(remote_client, root_target, ancestor).await?;
     let mut timelines = std::pin::pin!(timelines);

     // Build a list of keys to retain
@@ -360,7 +360,7 @@ async fn gc_ancestor(
     while let Some(ttid) = timelines.next().await {
         let ttid = ttid?;
-        let data = list_timeline_blobs_generic(remote_client, ttid, root_target).await?;
+        let data = list_timeline_blobs(remote_client, ttid, root_target).await?;
         let s3_layers = match data.blob_data {
             BlobDataParseResult::Parsed {
@@ -456,11 +456,10 @@ pub async fn pageserver_physical_gc(
     min_age: Duration,
     mode: GcMode,
 ) -> anyhow::Result<GcSummary> {
-    let (remote_client, target) =
-        init_remote_generic(bucket_config.clone(), NodeKind::Pageserver).await?;
+    let (remote_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;

     let tenants = if tenant_shard_ids.is_empty() {
-        futures::future::Either::Left(stream_tenants_generic(&remote_client, &target))
+        futures::future::Either::Left(stream_tenants(&remote_client, &target))
     } else {
         futures::future::Either::Right(futures::stream::iter(tenant_shard_ids.into_iter().map(Ok)))
     };
@@ -473,7 +472,7 @@ pub async fn pageserver_physical_gc(
     let accumulator = Arc::new(std::sync::Mutex::new(TenantRefAccumulator::default()));

     // Generate a stream of TenantTimelineId
-    let timelines = tenants.map_ok(|t| stream_tenant_timelines_generic(&remote_client, &target, t));
+    let timelines = tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, t));
     let timelines = timelines.try_buffered(CONCURRENCY);
     let timelines = timelines.try_flatten();
@@ -487,7 +486,7 @@ pub async fn pageserver_physical_gc(
         accumulator: &Arc<std::sync::Mutex<TenantRefAccumulator>>,
     ) -> anyhow::Result<GcSummary> {
         let mut summary = GcSummary::default();
-        let data = list_timeline_blobs_generic(remote_client, ttid, target).await?;
+        let data = list_timeline_blobs(remote_client, ttid, target).await?;
         let (index_part, latest_gen, candidates) = match &data.blob_data {
             BlobDataParseResult::Parsed {

View File

@@ -1,11 +1,11 @@
 use std::collections::{HashMap, HashSet};

 use crate::checks::{
-    branch_cleanup_and_check_errors, list_timeline_blobs_generic, BlobDataParseResult,
+    branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult,
     RemoteTimelineBlobData, TenantObjectListing, TimelineAnalysis,
 };
-use crate::metadata_stream::{stream_tenant_timelines_generic, stream_tenants_generic};
-use crate::{init_remote_generic, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
+use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
+use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
 use futures_util::{StreamExt, TryStreamExt};
 use pageserver::tenant::remote_timeline_client::remote_layer_path;
 use pageserver_api::controller_api::MetadataHealthUpdateRequest;
@@ -120,10 +120,10 @@ pub async fn scan_pageserver_metadata(
     bucket_config: BucketConfig,
     tenant_ids: Vec<TenantShardId>,
 ) -> anyhow::Result<MetadataSummary> {
-    let (remote_client, target) = init_remote_generic(bucket_config, NodeKind::Pageserver).await?;
+    let (remote_client, target) = init_remote(bucket_config, NodeKind::Pageserver).await?;

     let tenants = if tenant_ids.is_empty() {
-        futures::future::Either::Left(stream_tenants_generic(&remote_client, &target))
+        futures::future::Either::Left(stream_tenants(&remote_client, &target))
     } else {
         futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok)))
     };
@@ -133,7 +133,7 @@ pub async fn scan_pageserver_metadata(
     const CONCURRENCY: usize = 32;

     // Generate a stream of TenantTimelineId
-    let timelines = tenants.map_ok(|t| stream_tenant_timelines_generic(&remote_client, &target, t));
+    let timelines = tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, t));
     let timelines = timelines.try_buffered(CONCURRENCY);
     let timelines = timelines.try_flatten();
@@ -143,7 +143,7 @@ pub async fn scan_pageserver_metadata(
         target: &RootTarget,
         ttid: TenantShardTimelineId,
     ) -> anyhow::Result<(TenantShardTimelineId, RemoteTimelineBlobData)> {
-        let data = list_timeline_blobs_generic(remote_client, ttid, target).await?;
+        let data = list_timeline_blobs(remote_client, ttid, target).await?;
         Ok((ttid, data))
     }
     let timelines = timelines.map_ok(|ttid| report_on_timeline(&remote_client, &target, ttid));

View File

@@ -14,9 +14,8 @@ use utils::{
 };

 use crate::{
-    cloud_admin_api::CloudAdminApiClient, init_remote_generic,
-    metadata_stream::stream_listing_generic, BucketConfig, ConsoleConfig, NodeKind, RootTarget,
-    TenantShardTimelineId,
+    cloud_admin_api::CloudAdminApiClient, init_remote, metadata_stream::stream_listing,
+    BucketConfig, ConsoleConfig, NodeKind, RootTarget, TenantShardTimelineId,
 };

 /// Generally we should ask safekeepers, but so far we use everywhere default 16MB.
@@ -107,7 +106,7 @@ pub async fn scan_safekeeper_metadata(
     let timelines = client.query(&query, &[]).await?;
     info!("loaded {} timelines", timelines.len());

-    let (remote_client, target) = init_remote_generic(bucket_config, NodeKind::Safekeeper).await?;
+    let (remote_client, target) = init_remote(bucket_config, NodeKind::Safekeeper).await?;
     let console_config = ConsoleConfig::from_env()?;
     let cloud_admin_api_client = CloudAdminApiClient::new(console_config);
@@ -193,7 +192,7 @@ async fn check_timeline(
         .strip_prefix("/")
         .unwrap_or(&timeline_dir_target.prefix_in_bucket);

-    let mut stream = std::pin::pin!(stream_listing_generic(remote_client, &timeline_dir_target));
+    let mut stream = std::pin::pin!(stream_listing(remote_client, &timeline_dir_target));
     while let Some(obj) = stream.next().await {
         let (key, _obj) = obj?;

View File

@@ -1,10 +1,10 @@
 use std::collections::HashMap;
 use std::sync::Arc;

-use crate::checks::{list_timeline_blobs_generic, BlobDataParseResult, RemoteTimelineBlobData};
-use crate::metadata_stream::{stream_tenant_shards, stream_tenant_timelines_generic};
+use crate::checks::{list_timeline_blobs, BlobDataParseResult, RemoteTimelineBlobData};
+use crate::metadata_stream::{stream_tenant_shards, stream_tenant_timelines};
 use crate::{
-    download_object_to_file, init_remote, init_remote_generic, BucketConfig, NodeKind, RootTarget,
+    download_object_to_file_s3, init_remote, init_remote_s3, BucketConfig, NodeKind, RootTarget,
     TenantShardTimelineId,
 };
 use anyhow::Context;
@@ -36,7 +36,8 @@ impl SnapshotDownloader {
         output_path: Utf8PathBuf,
         concurrency: usize,
     ) -> anyhow::Result<Self> {
-        let (s3_client, s3_root) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
+        let (s3_client, s3_root) =
+            init_remote_s3(bucket_config.clone(), NodeKind::Pageserver).await?;
         Ok(Self {
             s3_client,
             s3_root,
@@ -93,7 +94,7 @@ impl SnapshotDownloader {
         let Some(version) = versions.versions.as_ref().and_then(|v| v.first()) else {
             return Err(anyhow::anyhow!("No versions found for {remote_layer_path}"));
         };
-        download_object_to_file(
+        download_object_to_file_s3(
             &self.s3_client,
             &self.bucket_config.bucket,
             &remote_layer_path,
@@ -218,7 +219,7 @@ impl SnapshotDownloader {
     pub async fn download(&self) -> anyhow::Result<()> {
         let (remote_client, target) =
-            init_remote_generic(self.bucket_config.clone(), NodeKind::Pageserver).await?;
+            init_remote(self.bucket_config.clone(), NodeKind::Pageserver).await?;

         // Generate a stream of TenantShardId
         let shards = stream_tenant_shards(&remote_client, &target, self.tenant_id).await?;
@@ -239,7 +240,7 @@ impl SnapshotDownloader {
         for shard in shards.into_iter().filter(|s| s.shard_count == shard_count) {
             // Generate a stream of TenantTimelineId
-            let timelines = stream_tenant_timelines_generic(&remote_client, &target, shard).await?;
+            let timelines = stream_tenant_timelines(&remote_client, &target, shard).await?;

             // Generate a stream of S3TimelineBlobData
             async fn load_timeline_index(
@@ -247,7 +248,7 @@ impl SnapshotDownloader {
             target: &RootTarget,
             ttid: TenantShardTimelineId,
         ) -> anyhow::Result<(TenantShardTimelineId, RemoteTimelineBlobData)> {
-            let data = list_timeline_blobs_generic(remote_client, ttid, target).await?;
+            let data = list_timeline_blobs(remote_client, ttid, target).await?;
             Ok((ttid, data))
         }
         let timelines =