Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-05 12:32:54 +00:00
scrubber: add remote_storage based listing APIs and use them in find-large-objects (#8541)
Add two new functions, `stream_objects_with_retries` and `stream_tenants_generic`, and use them in the `find-large-objects` subcommand, migrating it to `remote_storage`. Also add the `size` field to the `ListingObject` struct. Part of #7547
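
For orientation, here is a minimal sketch (mine, not part of the patch) of the remote_storage API that the new helpers wrap, assuming `GenericRemoteStorage::list_streaming` behaves as the calls in this diff suggest; the `size` field read below is the one added to `ListingObject` here:

    use anyhow::Result;
    use futures::StreamExt;
    use remote_storage::{GenericRemoteStorage, ListingMode, RemotePath};
    use tokio_util::sync::CancellationToken;

    // Walk a prefix with the streaming listing API and print each object's size.
    async fn print_object_sizes(client: &GenericRemoteStorage, prefix: &RemotePath) -> Result<()> {
        let cancel = CancellationToken::new();
        let mut batches = std::pin::pin!(client.list_streaming(
            Some(prefix),
            ListingMode::NoDelimiter,
            None, // no cap on keys per response
            &cancel,
        ));
        while let Some(batch) = batches.next().await {
            for obj in batch?.keys {
                println!("{}\t{} bytes", obj.key, obj.size);
            }
        }
        Ok(())
    }
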
@@ -355,7 +355,8 @@ impl RemoteStorage for AzureBlobStorage {
             .blobs()
             .map(|k| ListingObject{
                 key: self.name_to_relative_path(&k.name),
-                last_modified: k.properties.last_modified.into()
+                last_modified: k.properties.last_modified.into(),
+                size: k.properties.content_length,
             }
             );

@@ -153,6 +153,7 @@ pub enum ListingMode {
 pub struct ListingObject {
     pub key: RemotePath,
     pub last_modified: SystemTime,
+    pub size: u64,
 }
 
 #[derive(Default)]

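A tiny illustration (not from the patch) of what the new field enables for callers that already hold a `Listing`:

    use remote_storage::Listing;

    /// Total payload bytes in one listing batch, using the new `size` field.
    fn batch_bytes(listing: &Listing) -> u64 {
        listing.keys.iter().map(|o| o.size).sum()
    }
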
@@ -368,6 +368,7 @@ impl RemoteStorage for LocalFs {
                     key: k.clone(),
                     // LocalFs is just for testing, so just specify a dummy time
                     last_modified: SystemTime::now(),
+                    size: 0,
                 })
             }
         })

@@ -411,6 +412,7 @@ impl RemoteStorage for LocalFs {
                     key: RemotePath::from_string(&relative_key).unwrap(),
                     // LocalFs is just for testing
                     last_modified: SystemTime::now(),
+                    size: 0,
                 });
             }
         }

@@ -565,9 +565,12 @@ impl RemoteStorage for S3Bucket {
                 }
             };
 
+            let size = object.size.unwrap_or(0) as u64;
+
             result.keys.push(ListingObject{
                 key,
-                last_modified
+                last_modified,
+                size,
             });
             if let Some(mut mk) = max_keys {
                 assert!(mk > 0);

@@ -1,10 +1,13 @@
+use std::pin::pin;
+
 use futures::{StreamExt, TryStreamExt};
 use pageserver::tenant::storage_layer::LayerName;
+use remote_storage::ListingMode;
 use serde::{Deserialize, Serialize};
 
 use crate::{
-    checks::parse_layer_object_name, init_remote, list_objects_with_retries,
-    metadata_stream::stream_tenants, BucketConfig, NodeKind,
+    checks::parse_layer_object_name, init_remote_generic, metadata_stream::stream_tenants_generic,
+    stream_objects_with_retries, BucketConfig, NodeKind,
 };
 
 #[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]

@@ -47,45 +50,38 @@ pub async fn find_large_objects(
     ignore_deltas: bool,
     concurrency: usize,
 ) -> anyhow::Result<LargeObjectListing> {
-    let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
-    let tenants = std::pin::pin!(stream_tenants(&s3_client, &target));
+    let (remote_client, target) =
+        init_remote_generic(bucket_config.clone(), NodeKind::Pageserver).await?;
+    let tenants = pin!(stream_tenants_generic(&remote_client, &target));
 
     let objects_stream = tenants.map_ok(|tenant_shard_id| {
         let mut tenant_root = target.tenant_root(&tenant_shard_id);
-        let s3_client = s3_client.clone();
+        let remote_client = remote_client.clone();
         async move {
             let mut objects = Vec::new();
             let mut total_objects_ctr = 0u64;
             // We want the objects and not just common prefixes
             tenant_root.delimiter.clear();
-            let mut continuation_token = None;
-            loop {
-                let fetch_response =
-                    list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone())
-                        .await?;
-                for obj in fetch_response.contents().iter().filter(|o| {
-                    if let Some(obj_size) = o.size {
-                        min_size as i64 <= obj_size
-                    } else {
-                        false
-                    }
-                }) {
-                    let key = obj.key().expect("couldn't get key").to_owned();
+            let mut objects_stream = pin!(stream_objects_with_retries(
+                &remote_client,
+                ListingMode::NoDelimiter,
+                &tenant_root
+            ));
+            while let Some(listing) = objects_stream.next().await {
+                let listing = listing?;
+                for obj in listing.keys.iter().filter(|obj| min_size <= obj.size) {
+                    let key = obj.key.to_string();
                     let kind = LargeObjectKind::from_key(&key);
                     if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
                         continue;
                     }
                     objects.push(LargeObject {
                         key,
-                        size: obj.size.unwrap() as u64,
+                        size: obj.size,
                         kind,
                     })
                 }
-                total_objects_ctr += fetch_response.contents().len() as u64;
-                match fetch_response.next_continuation_token {
-                    Some(new_token) => continuation_token = Some(new_token),
-                    None => break,
-                }
+                total_objects_ctr += listing.keys.len() as u64;
             }
 
             Ok((tenant_shard_id, objects, total_objects_ctr))

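The code that drains these per-tenant futures sits outside this hunk; a sketch of the likely pattern, using the function's `concurrency` argument (everything past `objects_stream` is my own guess, not the patch):

    use futures::TryStreamExt;

    // Continuation sketch inside find_large_objects: drive the per-tenant futures
    // with bounded concurrency and fold the results together.
    let mut results = std::pin::pin!(objects_stream.try_buffer_unordered(concurrency));
    let mut all_objects = Vec::new();
    let mut total_objects = 0u64;
    while let Some((_tenant_shard_id, objects, tenant_total)) = results.try_next().await? {
        all_objects.extend(objects);
        total_objects += tenant_total;
    }
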
@@ -510,7 +510,7 @@ pub async fn purge_garbage(
         input_path
     );
 
-    let remote_client =
+    let (remote_client, _target) =
         init_remote_generic(garbage_list.bucket_config.clone(), garbage_list.node_kind).await?;
 
     assert_eq!(

@@ -22,16 +22,18 @@ use aws_sdk_s3::Client;
 use camino::{Utf8Path, Utf8PathBuf};
 use clap::ValueEnum;
+use futures::{Stream, StreamExt};
 use pageserver::tenant::remote_timeline_client::{remote_tenant_path, remote_timeline_path};
 use pageserver::tenant::TENANTS_SEGMENT_NAME;
 use pageserver_api::shard::TenantShardId;
 use remote_storage::{
-    GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, S3Config,
-    DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
+    GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorageConfig, RemoteStorageKind,
+    S3Config, DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
 };
 use reqwest::Url;
 use serde::{Deserialize, Serialize};
 use tokio::io::AsyncReadExt;
 use tokio_util::sync::CancellationToken;
 use tracing::error;
 use tracing_appender::non_blocking::WorkerGuard;
 use tracing_subscriber::{fmt, prelude::*, EnvFilter};

@@ -319,27 +321,35 @@ fn default_prefix_in_bucket(node_kind: NodeKind) -> &'static str {
     }
 }
 
+fn make_root_target(
+    bucket_name: String,
+    prefix_in_bucket: String,
+    node_kind: NodeKind,
+) -> RootTarget {
+    let s3_target = S3Target {
+        bucket_name,
+        prefix_in_bucket,
+        delimiter: "/".to_string(),
+    };
+    match node_kind {
+        NodeKind::Pageserver => RootTarget::Pageserver(s3_target),
+        NodeKind::Safekeeper => RootTarget::Safekeeper(s3_target),
+    }
+}
+
 async fn init_remote(
     bucket_config: BucketConfig,
     node_kind: NodeKind,
 ) -> anyhow::Result<(Arc<Client>, RootTarget)> {
     let bucket_region = Region::new(bucket_config.region);
-    let delimiter = "/".to_string();
     let s3_client = Arc::new(init_s3_client(bucket_region).await);
     let default_prefix = default_prefix_in_bucket(node_kind).to_string();
 
-    let s3_root = match node_kind {
-        NodeKind::Pageserver => RootTarget::Pageserver(S3Target {
-            bucket_name: bucket_config.bucket,
-            prefix_in_bucket: bucket_config.prefix_in_bucket.unwrap_or(default_prefix),
-            delimiter,
-        }),
-        NodeKind::Safekeeper => RootTarget::Safekeeper(S3Target {
-            bucket_name: bucket_config.bucket,
-            prefix_in_bucket: bucket_config.prefix_in_bucket.unwrap_or(default_prefix),
-            delimiter,
-        }),
-    };
+    let s3_root = make_root_target(
+        bucket_config.bucket,
+        bucket_config.prefix_in_bucket.unwrap_or(default_prefix),
+        node_kind,
+    );
 
     Ok((s3_client, s3_root))
 }

@@ -347,12 +357,12 @@ async fn init_remote(
 async fn init_remote_generic(
     bucket_config: BucketConfig,
     node_kind: NodeKind,
-) -> anyhow::Result<GenericRemoteStorage> {
+) -> anyhow::Result<(GenericRemoteStorage, RootTarget)> {
     let endpoint = env::var("AWS_ENDPOINT_URL").ok();
     let default_prefix = default_prefix_in_bucket(node_kind).to_string();
     let prefix_in_bucket = Some(bucket_config.prefix_in_bucket.unwrap_or(default_prefix));
     let storage = S3Config {
-        bucket_name: bucket_config.bucket,
+        bucket_name: bucket_config.bucket.clone(),
         bucket_region: bucket_config.region,
         prefix_in_bucket,
         endpoint,

@@ -366,7 +376,13 @@ async fn init_remote_generic(
         storage: RemoteStorageKind::AwsS3(storage),
         timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
     };
-    GenericRemoteStorage::from_config(&storage_config).await
+
+    // We already pass the prefix to the remote client above
+    let prefix_in_root_target = String::new();
+    let s3_root = make_root_target(bucket_config.bucket, prefix_in_root_target, node_kind);
+
+    let client = GenericRemoteStorage::from_config(&storage_config).await?;
+    Ok((client, s3_root))
 }
 
 async fn list_objects_with_retries(

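One subtlety worth spelling out (my gloss, not text from the patch): the prefix is now configured on the `GenericRemoteStorage` itself, so the `RootTarget` returned alongside it is built with an empty prefix, and paths derived from it are resolved relative to the client's configured prefix. A toy illustration with made-up values:

    // Hypothetical values, for illustration only.
    // Client config: prefix_in_bucket = Some("some/prefix".to_string())
    let listing_path = RemotePath::from_string("tenants/")?;
    // The client prepends its configured prefix, so the bucket keys actually
    // listed start with "some/prefix/tenants/".
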
@@ -404,6 +420,44 @@ async fn list_objects_with_retries(
     Err(anyhow!("unreachable unless MAX_RETRIES==0"))
 }
 
+fn stream_objects_with_retries<'a>(
+    storage_client: &'a GenericRemoteStorage,
+    listing_mode: ListingMode,
+    s3_target: &'a S3Target,
+) -> impl Stream<Item = Result<Listing, anyhow::Error>> + 'a {
+    async_stream::stream! {
+        let mut trial = 0;
+        let cancel = CancellationToken::new();
+        let prefix_str = &s3_target
+            .prefix_in_bucket
+            .strip_prefix("/")
+            .unwrap_or(&s3_target.prefix_in_bucket);
+        let prefix = RemotePath::from_string(prefix_str)?;
+        let mut list_stream =
+            storage_client.list_streaming(Some(&prefix), listing_mode, None, &cancel);
+        while let Some(res) = list_stream.next().await {
+            if let Err(err) = res {
+                let yield_err = if err.is_permanent() {
+                    true
+                } else {
+                    let backoff_time = 1 << trial.max(5);
+                    tokio::time::sleep(Duration::from_secs(backoff_time)).await;
+                    trial += 1;
+                    trial == MAX_RETRIES - 1
+                };
+                if yield_err {
+                    yield Err(err)
+                        .with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
+                    break;
+                }
+            } else {
+                trial = 0;
+                yield res.map_err(anyhow::Error::from);
+            }
+        }
+    }
+}
+
 async fn download_object_with_retries(
     s3_client: &Client,
     bucket_name: &str,

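For reference, the new helper as consumed elsewhere in this same patch (condensed from the find-large-objects hunk above, reusing the variable names from that code):

    let mut objects_stream = pin!(stream_objects_with_retries(
        &remote_client,
        ListingMode::NoDelimiter,
        &tenant_root
    ));
    while let Some(listing) = objects_stream.next().await {
        let listing = listing?;
        for obj in &listing.keys {
            // Each ListingObject now carries `key`, `last_modified`, and `size`.
            let _ = (&obj.key, obj.size);
        }
    }
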
@@ -1,12 +1,41 @@
-use anyhow::Context;
-use async_stream::stream;
+use std::str::FromStr;
+
+use anyhow::{anyhow, Context};
+use async_stream::{stream, try_stream};
 use aws_sdk_s3::{types::ObjectIdentifier, Client};
+use futures::StreamExt;
+use remote_storage::{GenericRemoteStorage, ListingMode};
 use tokio_stream::Stream;
 
-use crate::{list_objects_with_retries, RootTarget, S3Target, TenantShardTimelineId};
+use crate::{
+    list_objects_with_retries, stream_objects_with_retries, RootTarget, S3Target,
+    TenantShardTimelineId,
+};
 use pageserver_api::shard::TenantShardId;
 use utils::id::{TenantId, TimelineId};
 
+/// Given a remote storage and a target, output a stream of TenantIds discovered via listing prefixes
+pub fn stream_tenants_generic<'a>(
+    remote_client: &'a GenericRemoteStorage,
+    target: &'a RootTarget,
+) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
+    try_stream! {
+        let tenants_target = target.tenants_root();
+        let mut tenants_stream =
+            std::pin::pin!(stream_objects_with_retries(remote_client, ListingMode::WithDelimiter, &tenants_target));
+        while let Some(chunk) = tenants_stream.next().await {
+            let chunk = chunk?;
+            let entry_ids = chunk.prefixes.iter()
+                .map(|prefix| prefix.get_path().file_name().ok_or_else(|| anyhow!("no final component in path '{prefix}'")));
+            for dir_name_res in entry_ids {
+                let dir_name = dir_name_res?;
+                let id = TenantShardId::from_str(dir_name)?;
+                yield id;
+            }
+        }
+    }
+}
+
 /// Given an S3 bucket, output a stream of TenantIds discovered via ListObjectsv2
 pub fn stream_tenants<'a>(
     s3_client: &'a Client,

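And a condensed usage sketch for the companion helper (collecting into a Vec is my own variation; the find-large-objects hunk above instead feeds the stream into map_ok):

    use futures::TryStreamExt;

    let (remote_client, target) =
        init_remote_generic(bucket_config.clone(), NodeKind::Pageserver).await?;
    let tenant_ids: Vec<TenantShardId> = stream_tenants_generic(&remote_client, &target)
        .try_collect()
        .await?;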