mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-03 21:40:39 +00:00
scrubber: use adaptive config with retries, check subset of tenants (#6219)
The tool still needs a lot of work. These are the easiest fix and feature: - use similar adaptive config with s3 as remote_storage, use retries - process only particular tenants Tenants need to be from the correct region, they are not deduplicated, but the feature is useful for re-checking small amount of tenants after a large run.
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -4405,12 +4405,14 @@ dependencies = [
|
||||
"async-stream",
|
||||
"aws-config",
|
||||
"aws-sdk-s3",
|
||||
"aws-smithy-async",
|
||||
"bincode",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"clap",
|
||||
"crc32c",
|
||||
"either",
|
||||
"futures",
|
||||
"futures-util",
|
||||
"hex",
|
||||
"histogram",
|
||||
|
||||
@@ -6,6 +6,7 @@ license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
aws-sdk-s3.workspace = true
|
||||
aws-smithy-async.workspace = true
|
||||
either.workspace = true
|
||||
tokio-rustls.workspace = true
|
||||
anyhow.workspace = true
|
||||
@@ -39,3 +40,5 @@ tracing-subscriber.workspace = true
|
||||
clap.workspace = true
|
||||
tracing-appender = "0.2"
|
||||
histogram = "0.7"
|
||||
|
||||
futures.workspace = true
|
||||
|
||||
@@ -16,10 +16,12 @@ use aws_config::environment::EnvironmentVariableCredentialsProvider;
|
||||
use aws_config::imds::credentials::ImdsCredentialsProvider;
|
||||
use aws_config::meta::credentials::CredentialsProviderChain;
|
||||
use aws_config::profile::ProfileFileCredentialsProvider;
|
||||
use aws_config::retry::RetryConfig;
|
||||
use aws_config::sso::SsoCredentialsProvider;
|
||||
use aws_config::BehaviorVersion;
|
||||
use aws_sdk_s3::config::Region;
|
||||
use aws_sdk_s3::config::{AsyncSleep, Region, SharedAsyncSleep};
|
||||
use aws_sdk_s3::{Client, Config};
|
||||
use aws_smithy_async::rt::sleep::TokioSleep;
|
||||
|
||||
use clap::ValueEnum;
|
||||
use pageserver::tenant::TENANTS_SEGMENT_NAME;
|
||||
@@ -283,9 +285,13 @@ pub fn init_s3_client(account_id: Option<String>, bucket_region: Region) -> Clie
|
||||
)
|
||||
};
|
||||
|
||||
let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
|
||||
|
||||
let mut builder = Config::builder()
|
||||
.behavior_version(BehaviorVersion::v2023_11_09())
|
||||
.region(bucket_region)
|
||||
.retry_config(RetryConfig::adaptive().with_max_attempts(3))
|
||||
.sleep_impl(SharedAsyncSleep::from(sleep_impl))
|
||||
.credentials_provider(credentials_provider);
|
||||
|
||||
if let Ok(endpoint) = env::var("AWS_ENDPOINT_URL") {
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use s3_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
|
||||
use s3_scrubber::scan_metadata::scan_metadata;
|
||||
use s3_scrubber::{init_logging, BucketConfig, ConsoleConfig, NodeKind, TraversingDepth};
|
||||
@@ -34,6 +35,8 @@ enum Command {
|
||||
ScanMetadata {
|
||||
#[arg(short, long, default_value_t = false)]
|
||||
json: bool,
|
||||
#[arg(long = "tenant-id", num_args = 0..)]
|
||||
tenant_ids: Vec<TenantShardId>,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -57,35 +60,37 @@ async fn main() -> anyhow::Result<()> {
|
||||
));
|
||||
|
||||
match cli.command {
|
||||
Command::ScanMetadata { json } => match scan_metadata(bucket_config.clone()).await {
|
||||
Err(e) => {
|
||||
tracing::error!("Failed: {e}");
|
||||
Err(e)
|
||||
}
|
||||
Ok(summary) => {
|
||||
if json {
|
||||
println!("{}", serde_json::to_string(&summary).unwrap())
|
||||
} else {
|
||||
println!("{}", summary.summary_string());
|
||||
Command::ScanMetadata { json, tenant_ids } => {
|
||||
match scan_metadata(bucket_config.clone(), tenant_ids).await {
|
||||
Err(e) => {
|
||||
tracing::error!("Failed: {e}");
|
||||
Err(e)
|
||||
}
|
||||
if summary.is_fatal() {
|
||||
Err(anyhow::anyhow!("Fatal scrub errors detected"))
|
||||
} else if summary.is_empty() {
|
||||
// Strictly speaking an empty bucket is a valid bucket, but if someone ran the
|
||||
// scrubber they were likely expecting to scan something, and if we see no timelines
|
||||
// at all then it's likely due to some configuration issues like a bad prefix
|
||||
Err(anyhow::anyhow!(
|
||||
"No timelines found in bucket {} prefix {}",
|
||||
bucket_config.bucket,
|
||||
bucket_config
|
||||
.prefix_in_bucket
|
||||
.unwrap_or("<none>".to_string())
|
||||
))
|
||||
} else {
|
||||
Ok(())
|
||||
Ok(summary) => {
|
||||
if json {
|
||||
println!("{}", serde_json::to_string(&summary).unwrap())
|
||||
} else {
|
||||
println!("{}", summary.summary_string());
|
||||
}
|
||||
if summary.is_fatal() {
|
||||
Err(anyhow::anyhow!("Fatal scrub errors detected"))
|
||||
} else if summary.is_empty() {
|
||||
// Strictly speaking an empty bucket is a valid bucket, but if someone ran the
|
||||
// scrubber they were likely expecting to scan something, and if we see no timelines
|
||||
// at all then it's likely due to some configuration issues like a bad prefix
|
||||
Err(anyhow::anyhow!(
|
||||
"No timelines found in bucket {} prefix {}",
|
||||
bucket_config.bucket,
|
||||
bucket_config
|
||||
.prefix_in_bucket
|
||||
.unwrap_or("<none>".to_string())
|
||||
))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
Command::FindGarbage {
|
||||
node_kind,
|
||||
depth,
|
||||
|
||||
@@ -187,10 +187,17 @@ Timeline layer count: {6}
|
||||
}
|
||||
|
||||
/// Scan the pageserver metadata in an S3 bucket, reporting errors and statistics.
|
||||
pub async fn scan_metadata(bucket_config: BucketConfig) -> anyhow::Result<MetadataSummary> {
|
||||
pub async fn scan_metadata(
|
||||
bucket_config: BucketConfig,
|
||||
tenant_ids: Vec<TenantShardId>,
|
||||
) -> anyhow::Result<MetadataSummary> {
|
||||
let (s3_client, target) = init_remote(bucket_config, NodeKind::Pageserver)?;
|
||||
|
||||
let tenants = stream_tenants(&s3_client, &target);
|
||||
let tenants = if tenant_ids.is_empty() {
|
||||
futures::future::Either::Left(stream_tenants(&s3_client, &target))
|
||||
} else {
|
||||
futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok)))
|
||||
};
|
||||
|
||||
// How many tenants to process in parallel. We need to be mindful of pageservers
|
||||
// accessing the same per tenant prefixes, so use a lower setting than pageservers.
|
||||
|
||||
Reference in New Issue
Block a user