Compare commits

...

9 Commits

Author SHA1 Message Date
Joonas Koivunen
e0aa2e2244 last_tenant_id for measuring progress 2023-12-21 13:45:29 +00:00
Joonas Koivunen
3432805e25 work around the histogram limitations
this was not enough, the same confusing error message continued to
appear.
2023-12-21 13:45:29 +00:00
Joonas Koivunen
2c23ed4873 scan_metadata: rewrite main loop with progress (compile fail) 2023-12-21 13:45:29 +00:00
Joonas Koivunen
a8f7398518 ttid: make fields pub 2023-12-21 13:45:29 +00:00
Joonas Koivunen
9684fb2ba5 silence needless warning 2023-12-21 13:45:29 +00:00
Joonas Koivunen
3ae40556f8 fix: do not use stream for listing timelines 2023-12-21 13:45:29 +00:00
Joonas Koivunen
22a7b68b23 fix some of the logging 2023-12-21 13:40:33 +00:00
Joonas Koivunen
e98c48322d feat: run specific tenants only 2023-12-21 13:40:32 +00:00
Joonas Koivunen
a2d05f8d94 fix: use similar adaptive configuration, with retries 2023-12-15 21:43:05 +00:00
7 changed files with 296 additions and 103 deletions

2
Cargo.lock generated
View File

@@ -4281,12 +4281,14 @@ dependencies = [
"async-stream",
"aws-config",
"aws-sdk-s3",
"aws-smithy-async",
"bincode",
"bytes",
"chrono",
"clap",
"crc32c",
"either",
"futures",
"futures-util",
"hex",
"histogram",

View File

@@ -6,6 +6,7 @@ license.workspace = true
[dependencies]
aws-sdk-s3.workspace = true
aws-smithy-async.workspace = true
either.workspace = true
tokio-rustls.workspace = true
anyhow.workspace = true
@@ -39,3 +40,5 @@ tracing-subscriber.workspace = true
clap.workspace = true
tracing-appender = "0.2"
histogram = "0.7"
futures.workspace = true

View File

@@ -2,7 +2,7 @@ use std::collections::HashSet;
use anyhow::Context;
use aws_sdk_s3::{types::ObjectIdentifier, Client};
use tracing::{error, info, warn};
use utils::generation::Generation;
use crate::cloud_admin_api::BranchData;
@@ -47,12 +47,13 @@ pub(crate) fn branch_cleanup_and_check_errors(
) -> TimelineAnalysis {
let mut result = TimelineAnalysis::new();
info!("Checking timeline {id}");
tracing::trace!("Checking timeline {id}");
if let Some(s3_active_branch) = s3_active_branch {
info!(
tracing::trace!(
"Checking console status for timeline for branch {:?}/{:?}",
s3_active_branch.project_id, s3_active_branch.id
s3_active_branch.project_id,
s3_active_branch.id
);
match console_branch {
Some(_) => {result.errors.push(format!("Timeline has deleted branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
@@ -83,10 +84,10 @@ pub(crate) fn branch_cleanup_and_check_errors(
}
if &index_part.get_version() != IndexPart::KNOWN_VERSIONS.last().unwrap() {
result.warnings.push(format!(
"index_part.json version is not latest: {}",
index_part.get_version()
))
// result.warnings.push(format!(
// "index_part.json version is not latest: {}",
// index_part.get_version()
// ))
}
if index_part.metadata.disk_consistent_lsn()
@@ -101,7 +102,7 @@ pub(crate) fn branch_cleanup_and_check_errors(
if index_part.layer_metadata.is_empty() {
// not an error, can happen for branches with zero writes, but notice that
info!("index_part.json has no layers");
tracing::trace!("index_part.json has no layers");
}
for (layer, metadata) in index_part.layer_metadata {
@@ -185,17 +186,17 @@ pub(crate) fn branch_cleanup_and_check_errors(
}
if result.errors.is_empty() {
info!("No check errors found");
tracing::trace!("No check errors found");
} else {
warn!("Timeline metadata errors: {0:?}", result.errors);
tracing::info!("Timeline metadata errors: {:?}", result.errors);
}
if !result.warnings.is_empty() {
warn!("Timeline metadata warnings: {0:?}", result.warnings);
tracing::info!("Timeline metadata warnings: {:?}", result.warnings);
}
if !result.garbage_keys.is_empty() {
error!(
tracing::info!(
"The following keys should be removed from S3: {0:?}",
result.garbage_keys
)
@@ -260,20 +261,20 @@ pub(crate) async fn list_timeline_blobs(
let blob_name = key.strip_prefix(&timeline_dir_target.prefix_in_bucket);
match blob_name {
Some(name) if name.starts_with("index_part.json") => {
tracing::info!("Index key {key}");
tracing::trace!("Index key {key}");
index_parts.push(obj)
}
Some("initdb.tar.zst") => {
tracing::info!("initdb archive {key}");
tracing::trace!("initdb archive {key}");
initdb_archive = true;
}
Some(maybe_layer_name) => match parse_layer_object_name(maybe_layer_name) {
Ok((new_layer, gen)) => {
tracing::info!("Parsed layer key: {} {:?}", new_layer, gen);
tracing::trace!("Parsed layer key: {} {:?}", new_layer, gen);
s3_layers.insert((new_layer, gen));
}
Err(e) => {
tracing::info!("Error parsing key {maybe_layer_name}");
tracing::trace!("Error parsing key {maybe_layer_name}");
errors.push(
format!("S3 list response got an object with key {key} that is not a layer name: {e}"),
);
@@ -281,7 +282,7 @@ pub(crate) async fn list_timeline_blobs(
}
},
None => {
tracing::info!("Peculiar key {}", key);
tracing::trace!("Peculiar key {}", key);
errors.push(format!("S3 list response got an object with odd key {key}"));
keys_to_remove.push(key.to_string());
}
@@ -289,7 +290,7 @@ pub(crate) async fn list_timeline_blobs(
}
if index_parts.is_empty() && s3_layers.is_empty() && initdb_archive {
tracing::info!(
tracing::trace!(
"Timeline is empty apart from initdb archive: expected post-deletion state."
);
return Ok(S3TimelineBlobData {

View File

@@ -15,10 +15,12 @@ use anyhow::Context;
use aws_config::environment::EnvironmentVariableCredentialsProvider;
use aws_config::imds::credentials::ImdsCredentialsProvider;
use aws_config::meta::credentials::CredentialsProviderChain;
use aws_config::retry::RetryConfig;
use aws_config::sso::SsoCredentialsProvider;
use aws_config::BehaviorVersion;
use aws_sdk_s3::config::Region;
use aws_sdk_s3::config::{AsyncSleep, Region, SharedAsyncSleep};
use aws_sdk_s3::{Client, Config};
use aws_smithy_async::rt::sleep::TokioSleep;
use clap::ValueEnum;
use pageserver::tenant::TENANTS_SEGMENT_NAME;
@@ -55,8 +57,8 @@ pub struct S3Target {
/// TenantShardTimelineIds in one place.
#[derive(Serialize, Deserialize, Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub struct TenantShardTimelineId {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
pub tenant_shard_id: TenantShardId,
pub timeline_id: TimelineId,
}
impl TenantShardTimelineId {
@@ -277,9 +279,13 @@ pub fn init_s3_client(account_id: Option<String>, bucket_region: Region) -> Clie
)
};
let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
let mut builder = Config::builder()
.behavior_version(BehaviorVersion::v2023_11_09())
.region(bucket_region)
.retry_config(RetryConfig::adaptive().with_max_attempts(1))
.sleep_impl(SharedAsyncSleep::from(sleep_impl))
.credentials_provider(credentials_provider);
if let Ok(endpoint) = env::var("AWS_ENDPOINT_URL") {
@@ -334,7 +340,7 @@ async fn list_objects_with_retries(
{
Ok(response) => return Ok(response),
Err(e) => {
error!("list_objects_v2 query failed: {e}");
error!("list_objects_v2 query failed: {e:?}");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
@@ -372,7 +378,7 @@ async fn download_object_with_retries(
.await
{
Ok(bytes_read) => {
tracing::info!("Downloaded {bytes_read} bytes for object object with key {key}");
tracing::trace!("Downloaded {bytes_read} bytes for object object with key {key}");
return Ok(body_buf);
}
Err(e) => {

View File

@@ -1,3 +1,4 @@
use pageserver_api::shard::TenantShardId;
use s3_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
use s3_scrubber::scan_metadata::scan_metadata;
use s3_scrubber::{init_logging, BucketConfig, ConsoleConfig, NodeKind, TraversingDepth};
@@ -34,6 +35,8 @@ enum Command {
ScanMetadata {
#[arg(short, long, default_value_t = false)]
json: bool,
#[arg(long = "tenant-id", num_args = 0..)]
tenant_ids: Vec<TenantShardId>,
},
}
@@ -57,35 +60,37 @@ async fn main() -> anyhow::Result<()> {
));
match cli.command {
Command::ScanMetadata { json } => match scan_metadata(bucket_config.clone()).await {
Err(e) => {
tracing::error!("Failed: {e}");
Err(e)
}
Ok(summary) => {
if json {
println!("{}", serde_json::to_string(&summary).unwrap())
} else {
println!("{}", summary.summary_string());
Command::ScanMetadata { json, tenant_ids } => {
match scan_metadata(bucket_config.clone(), tenant_ids).await {
Err(e) => {
tracing::error!("Failed: {e}");
Err(e)
}
if summary.is_fatal() {
Err(anyhow::anyhow!("Fatal scrub errors detected"))
} else if summary.is_empty() {
// Strictly speaking an empty bucket is a valid bucket, but if someone ran the
// scrubber they were likely expecting to scan something, and if we see no timelines
// at all then it's likely due to some configuration issues like a bad prefix
Err(anyhow::anyhow!(
"No timelines found in bucket {} prefix {}",
bucket_config.bucket,
bucket_config
.prefix_in_bucket
.unwrap_or("<none>".to_string())
))
} else {
Ok(())
Ok(summary) => {
if json {
println!("{}", serde_json::to_string(&summary).unwrap())
} else {
println!("{}", summary.summary_string());
}
if summary.is_fatal() {
Err(anyhow::anyhow!("Fatal scrub errors detected"))
} else if summary.is_empty() {
// Strictly speaking an empty bucket is a valid bucket, but if someone ran the
// scrubber they were likely expecting to scan something, and if we see no timelines
// at all then it's likely due to some configuration issues like a bad prefix
Err(anyhow::anyhow!(
"No timelines found in bucket {} prefix {}",
bucket_config.bucket,
bucket_config
.prefix_in_bucket
.unwrap_or("<none>".to_string())
))
} else {
Ok(())
}
}
}
},
}
Command::FindGarbage {
node_kind,
depth,

View File

@@ -48,17 +48,17 @@ pub fn stream_tenants<'a>(
/// Given a TenantShardId, output a stream of the timelines within that tenant, discovered
/// using ListObjectsv2. The listing is done before the stream is built, so that this
/// function can be used to generate concurrency on a stream using buffer_unordered.
pub async fn stream_tenant_timelines<'a>(
s3_client: &'a Client,
target: &'a RootTarget,
pub async fn get_tenant_timelines(
s3_client: &Client,
target: &RootTarget,
tenant: TenantShardId,
) -> anyhow::Result<impl Stream<Item = Result<TenantShardTimelineId, anyhow::Error>> + 'a> {
let mut timeline_ids: Vec<Result<TimelineId, anyhow::Error>> = Vec::new();
) -> Vec<anyhow::Result<TimelineId>> {
let mut timeline_ids = Vec::new();
let mut continuation_token = None;
let timelines_target = target.timelines_root(&tenant);
loop {
tracing::info!("Listing in {}", tenant);
tracing::trace!("Listing in {}", tenant);
let fetch_response =
list_objects_with_retries(s3_client, &timelines_target, continuation_token.clone())
.await;
@@ -95,9 +95,19 @@ pub async fn stream_tenant_timelines<'a>(
}
}
tracing::info!("Yielding for {}", tenant);
timeline_ids
}
pub async fn stream_tenant_timelines<'a>(
client: &'a Client,
target: &'a RootTarget,
tenant: TenantShardId,
) -> anyhow::Result<impl Stream<Item = Result<TenantShardTimelineId, anyhow::Error>> + 'a> {
let timelines = get_tenant_timelines(client, target, tenant).await;
// FIXME: futures is not yet imported so have to keep doing it like this:
Ok(stream! {
for i in timeline_ids {
for i in timelines {
let id = i?;
yield Ok(TenantShardTimelineId::new(tenant, id));
}

View File

@@ -1,16 +1,21 @@
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use crate::checks::{
branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData,
TimelineAnalysis,
};
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
use aws_sdk_s3::Client;
use futures_util::{pin_mut, StreamExt, TryStreamExt};
use crate::metadata_stream::stream_tenants;
use crate::{init_remote, BucketConfig, NodeKind, TenantShardTimelineId};
use futures_util::StreamExt;
use histogram::Histogram;
use pageserver::tenant::IndexPart;
use pageserver_api::shard::TenantShardId;
use serde::Serialize;
use tracing::Instrument;
#[derive(Serialize)]
pub struct MetadataSummary {
@@ -103,7 +108,7 @@ impl MetadataSummary {
total_size += meta.file_size;
self.layer_size_bytes.sample(meta.file_size)?;
}
self.timeline_size_bytes.sample(total_size)?;
self.timeline_size_bytes.sample(total_size / 1024)?;
Ok(())
}
@@ -156,7 +161,7 @@ With errors: {1}
With warnings: {2}
With garbage: {3}
Index versions: {version_summary}
Timeline size bytes: {4}
Timeline size KiB: {4}
Layer size bytes: {5}
Timeline layer count: {6}
",
@@ -179,43 +184,204 @@ Timeline layer count: {6}
}
}
/// Scan the pageserver metadata in an S3 bucket, reporting errors and statistics.
pub async fn scan_metadata(bucket_config: BucketConfig) -> anyhow::Result<MetadataSummary> {
let (s3_client, target) = init_remote(bucket_config, NodeKind::Pageserver)?;
let tenants = stream_tenants(&s3_client, &target);
// How many tenants to process in parallel. We need to be mindful of pageservers
// accessing the same per tenant prefixes, so use a lower setting than pageservers.
const CONCURRENCY: usize = 32;
// Generate a stream of TenantTimelineId
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t));
let timelines = timelines.try_buffer_unordered(CONCURRENCY);
let timelines = timelines.try_flatten();
// Generate a stream of S3TimelineBlobData
async fn report_on_timeline(
s3_client: &Client,
target: &RootTarget,
ttid: TenantShardTimelineId,
) -> anyhow::Result<(TenantShardTimelineId, S3TimelineBlobData)> {
let data = list_timeline_blobs(s3_client, ttid, target).await?;
Ok((ttid, data))
}
let timelines = timelines.map_ok(|ttid| report_on_timeline(&s3_client, &target, ttid));
let timelines = timelines.try_buffer_unordered(CONCURRENCY);
let mut summary = MetadataSummary::new();
pin_mut!(timelines);
while let Some(i) = timelines.next().await {
let (ttid, data) = i?;
summary.update_data(&data);
let analysis = branch_cleanup_and_check_errors(&ttid, &target, None, None, Some(data));
summary.update_analysis(&ttid, &analysis);
}
Ok(summary)
#[derive(Debug)]
enum Either<A, B> {
Left(A),
Right(B),
}
/// Scan the pageserver metadata in an S3 bucket, reporting errors and statistics.
pub async fn scan_metadata(
bucket_config: BucketConfig,
tenant_ids: Vec<TenantShardId>,
) -> anyhow::Result<MetadataSummary> {
let (s3_client, target) = init_remote(bucket_config, NodeKind::Pageserver)?;
let target = Arc::new(target);
let tenants = if tenant_ids.is_empty() {
futures::future::Either::Left(stream_tenants(&s3_client, &target))
} else {
futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok)))
};
let tenants = tenants.fuse();
let mut tenants = std::pin::pin!(tenants);
let mut js = tokio::task::JoinSet::new();
let mut consumed_all = false;
let summary = MetadataSummary::new();
// have timeline and timeline blob listings fight over the same semaphore
let timeline_listings = Arc::new(tokio::sync::Semaphore::new(50));
let blob_listings = timeline_listings.clone();
let spawned_tenants = AtomicUsize::new(0);
let spawned_timelines = Arc::new(AtomicUsize::new(0));
let completed_tenants = AtomicUsize::new(0);
let completed_timelines = AtomicUsize::new(0);
let last_tenant_id = std::cell::RefCell::new(None);
let s3_client = s3_client.clone();
let target = target.clone();
let summary = std::sync::Mutex::new(summary);
let scan_tenants = async {
let timeline_listings = timeline_listings;
let blob_listings = blob_listings;
// used to control whether to receive more tenants
let mut more_tenants = true;
loop {
let next_start = tokio::select! {
next_tenant = tenants.next(), if !consumed_all && more_tenants => {
match next_tenant {
Some(Ok(tenant_id)) => Either::Left(tenant_id),
Some(Err(e)) => {
consumed_all = true;
tracing::error!("tenant streaming failed with: {e:?}");
continue;
}
None => {
consumed_all = true;
continue;
}
}
},
next = js.join_next(), if !js.is_empty() => {
more_tenants = js.len() < 10;
match next.unwrap() {
Ok(Either::Left((tenant_id, timelines))) => {
completed_tenants.fetch_add(1, Ordering::Relaxed);
Either::Right((tenant_id, timelines))
}
Ok(Either::Right(Some((ttid, data)))) => {
completed_timelines.fetch_add(1, Ordering::Relaxed);
let ttid: TenantShardTimelineId = ttid;
{
let _e = tracing::info_span!("analysis", tenant_shard_id=%ttid.tenant_shard_id, timeline_id=%ttid.timeline_id).entered();
let summary = &mut summary.lock().unwrap();
summary.update_data(&data);
let analysis = branch_cleanup_and_check_errors(&ttid, &target, None, None, Some(data));
summary.update_analysis(&ttid, &analysis);
}
continue;
}
Ok(Either::Right(None)) => {
completed_timelines.fetch_add(1, Ordering::Relaxed);
continue;
}
Err(je) if je.is_cancelled() => unreachable!("not used"),
Err(je) if je.is_panic() => {
continue;
},
Err(je) => {
tracing::error!("unknown join error: {je:?}");
continue;
}
}
},
else => break,
};
let s3_client = s3_client.clone();
let target = target.clone();
let timeline_listings = timeline_listings.clone();
let blob_listings = blob_listings.clone();
match next_start {
Either::Left(tenant_shard_id) => {
let span = tracing::info_span!("get_timelines", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug());
js.spawn(
async move {
let _permit = timeline_listings.acquire().await;
let timelines = crate::metadata_stream::get_tenant_timelines(
&s3_client,
&target,
tenant_shard_id,
)
.await;
Either::Left((tenant_shard_id, timelines))
}
.instrument(span),
);
more_tenants = js.len() < 1000;
spawned_tenants.fetch_add(1, Ordering::Relaxed);
*last_tenant_id.borrow_mut() = Some(tenant_shard_id);
}
Either::Right((tenant_shard_id, timelines)) => {
for timeline_id in timelines {
let timeline_id = match timeline_id {
Ok(timeline_id) => timeline_id,
Err(e) => {
tracing::error!("failed to fetch a timeline: {e:?}");
continue;
}
};
let s3_client = s3_client.clone();
let target = target.clone();
let blob_listings = blob_listings.clone();
let span = tracing::info_span!("list_timelines_blobs", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id);
js.spawn(
async move {
let _permit = blob_listings.acquire().await;
let ttid = TenantShardTimelineId::new(tenant_shard_id, timeline_id);
match list_timeline_blobs(&s3_client, ttid, &target).await {
Ok(data) => Either::Right(Some((ttid, data))),
Err(e) => {
tracing::error!("listing failed {e:?}");
Either::Right(None)
}
}
}
.instrument(span),
);
spawned_timelines.fetch_add(1, Ordering::Relaxed);
tokio::task::yield_now().await;
}
}
}
}
};
let started_at = std::time::Instant::now();
{
let mut scan_tenants = std::pin::pin!(scan_tenants);
loop {
let res =
tokio::time::timeout(std::time::Duration::from_secs(1), &mut scan_tenants).await;
let spawned_tenants = spawned_tenants.load(Ordering::Relaxed);
let completed_tenants = completed_tenants.load(Ordering::Relaxed);
let spawned_timelines = spawned_timelines.load(Ordering::Relaxed);
let completed_timelines = completed_timelines.load(Ordering::Relaxed);
match res {
Ok(()) => {
tracing::info!("progress tenants: {completed_tenants:>6} / {spawned_tenants:<6}, timelines: {completed_timelines:>6} / {spawned_timelines:<6} after {:?}", started_at.elapsed());
break;
}
Err(_timeout) => {
tracing::info!("progress tenants: {completed_tenants:>6} / {spawned_tenants:<6}, timelines: {completed_timelines:>6} / {spawned_timelines:<6}, last tenant: {:?}", &*last_tenant_id.borrow());
}
}
}
}
Ok(summary.into_inner().unwrap())
}