Compare commits

...

1 Commit

Author SHA1 Message Date
Alex Chi Z
7bd3e7a803 feat(scrubber): more parallelism for metadata check
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-05-06 17:28:13 +08:00
3 changed files with 357 additions and 171 deletions

View File

@@ -1,4 +1,5 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::SystemTime;
use futures_util::StreamExt;
@@ -55,7 +56,7 @@ impl TimelineAnalysis {
pub(crate) async fn branch_cleanup_and_check_errors(
remote_client: &GenericRemoteStorage,
id: &TenantShardTimelineId,
tenant_objects: &mut TenantObjectListing,
tenant_objects: Arc<tokio::sync::Mutex<TenantObjectListing>>,
s3_active_branch: Option<&BranchData>,
console_branch: Option<BranchData>,
s3_data: Option<RemoteTimelineBlobData>,
@@ -150,7 +151,11 @@ pub(crate) async fn branch_cleanup_and_check_errors(
))
}
if !tenant_objects.check_ref(id.timeline_id, &layer, &metadata) {
if !tenant_objects
.lock()
.await
.check_ref(id.timeline_id, &layer, &metadata)
{
let path = remote_layer_path(
&id.tenant_shard_id.tenant_id,
&id.timeline_id,

View File

@@ -73,8 +73,12 @@ enum Command {
node_kind: NodeKind,
#[arg(short, long, default_value_t = false)]
json: bool,
/// If provided, only these tenants will be listed from the remote storage.
#[arg(long = "tenant-id", num_args = 0..)]
tenant_ids: Vec<TenantShardId>,
/// If provided, we will list all tenants, but then filter with the prefix.
#[arg(long = "tenant-id-prefix")]
tenant_id_prefix: Option<TenantId>,
#[arg(long = "post", default_value_t = false)]
post_to_storcon: bool,
#[arg(long, default_value = None)]
@@ -178,6 +182,7 @@ async fn main() -> anyhow::Result<()> {
Command::ScanMetadata {
json,
tenant_ids,
tenant_id_prefix,
node_kind,
post_to_storcon,
dump_db_connstr,
@@ -186,6 +191,9 @@ async fn main() -> anyhow::Result<()> {
verbose,
} => {
if let NodeKind::Safekeeper = node_kind {
if tenant_id_prefix.is_some() {
bail!("`tenant_id_prefix` is not supported for safekeeper node_kind");
}
let db_or_list = match (timeline_lsns, dump_db_connstr) {
(Some(timeline_lsns), _) => {
let timeline_lsns = serde_json::from_str(&timeline_lsns)
@@ -227,6 +235,7 @@ async fn main() -> anyhow::Result<()> {
bucket_config,
controller_client.as_ref(),
tenant_ids,
tenant_id_prefix,
json,
post_to_storcon,
verbose,
@@ -338,6 +347,7 @@ pub async fn run_cron_job(
bucket_config,
controller_client,
Vec::new(),
None,
true,
post_to_storcon,
false, // default to non-verbose mode
@@ -384,10 +394,12 @@ pub async fn pageserver_physical_gc_cmd(
Ok(())
}
#[allow(clippy::too_many_arguments)]
pub async fn scan_pageserver_metadata_cmd(
bucket_config: BucketConfig,
controller_client: Option<&control_api::Client>,
tenant_shard_ids: Vec<TenantShardId>,
tenant_id_prefix: Option<TenantId>,
json: bool,
post_to_storcon: bool,
verbose: bool,
@@ -398,7 +410,14 @@ pub async fn scan_pageserver_metadata_cmd(
"Posting pageserver scan health status to storage controller requires `--controller-api` and `--controller-jwt` to run"
));
}
match scan_pageserver_metadata(bucket_config.clone(), tenant_shard_ids, verbose).await {
match scan_pageserver_metadata(
bucket_config.clone(),
tenant_shard_ids,
tenant_id_prefix,
verbose,
)
.await
{
Err(e) => {
tracing::error!("Failed: {e}");
Err(e)

View File

@@ -1,5 +1,7 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use futures::SinkExt;
use futures_util::{StreamExt, TryStreamExt};
use pageserver::tenant::remote_timeline_client::remote_layer_path;
use pageserver_api::controller_api::MetadataHealthUpdateRequest;
@@ -7,6 +9,7 @@ use pageserver_api::shard::TenantShardId;
use remote_storage::GenericRemoteStorage;
use serde::Serialize;
use tracing::{Instrument, info_span};
use utils::generation::Generation;
use utils::id::TenantId;
use utils::shard::ShardCount;
@@ -14,10 +17,12 @@ use crate::checks::{
BlobDataParseResult, RemoteTimelineBlobData, TenantObjectListing, TimelineAnalysis,
branch_cleanup_and_check_errors, list_timeline_blobs,
};
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
use crate::metadata_stream::{
stream_tenant_timelines, stream_tenants, stream_tenants_maybe_prefix,
};
use crate::{BucketConfig, NodeKind, RootTarget, TenantShardTimelineId, init_remote};
#[derive(Serialize, Default)]
#[derive(Serialize, Default, Clone)]
pub struct MetadataSummary {
tenant_count: usize,
timeline_count: usize,
@@ -102,13 +107,13 @@ impl MetadataSummary {
format!(
"Tenants: {}
Timelines: {}
Timeline-shards: {}
With errors: {}
With warnings: {}
With orphan layers: {}
Index versions: {version_summary}
",
Timelines: {}
Timeline-shards: {}
With errors: {}
With warnings: {}
With orphan layers: {}
Index versions: {version_summary}
",
self.tenant_count,
self.timeline_count,
self.timeline_shard_count,
@@ -138,24 +143,243 @@ Index versions: {version_summary}
pub async fn scan_pageserver_metadata(
bucket_config: BucketConfig,
tenant_ids: Vec<TenantShardId>,
tenant_id_prefix: Option<TenantId>,
verbose: bool,
) -> anyhow::Result<MetadataSummary> {
let (remote_client, target) = init_remote(bucket_config, NodeKind::Pageserver).await?;
let tenants = if tenant_ids.is_empty() {
futures::future::Either::Left(stream_tenants(&remote_client, &target))
} else {
futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok)))
};
if !tenant_ids.is_empty() && tenant_id_prefix.is_some() {
anyhow::bail!("`tenant_id_prefix` is not supported when `tenant_ids` is provided");
}
// How many tenants to process in parallel. We need to be mindful of pageservers
// accessing the same per tenant prefixes, so use a lower setting than pageservers.
const CONCURRENCY: usize = 32;
let (mut list_tenants_tx, list_tenants_rx) = futures::channel::mpsc::channel(1);
let remote_client_inner = remote_client.clone();
let target_inner = target.clone();
let list_tenants = tokio::spawn(async move {
let mut cnt = 0;
if tenant_ids.is_empty() {
if let Some(tenant_id_prefix) = tenant_id_prefix {
let stream = stream_tenants_maybe_prefix(
&remote_client_inner,
&target_inner,
Some(tenant_id_prefix.to_string()),
);
let mut stream = Box::pin(stream);
while let Some(tenant) = stream.next().await {
let tenant = tenant?;
list_tenants_tx.send(tenant).await?;
cnt += 1;
}
} else {
let stream = stream_tenants(&remote_client_inner, &target_inner);
let mut stream = Box::pin(stream);
while let Some(tenant) = stream.next().await {
let tenant = tenant?;
list_tenants_tx.send(tenant).await?;
cnt += 1;
}
}
} else {
for tenant_id in tenant_ids {
list_tenants_tx.send(tenant_id).await?;
cnt += 1;
}
}
tracing::info!("list_tenants: collected {} tenants", cnt);
Ok::<_, anyhow::Error>(())
});
// Generate a stream of TenantTimelineId
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, t));
let timelines = timelines.try_buffered(CONCURRENCY);
let timelines = timelines.try_flatten();
let (mut list_timelines_tx, list_timelines_rx) = futures::channel::mpsc::channel(1);
let remote_client_inner = remote_client.clone();
let target_inner = target.clone();
let list_timelines = tokio::spawn(async move {
let stream = list_tenants_rx
.map(|tenant_id| {
stream_tenant_timelines(&remote_client_inner, &target_inner, tenant_id)
})
.buffered(8)
.try_flatten();
let mut stream = Box::pin(stream);
while let Some(item) = stream.next().await {
let item = item?;
list_timelines_tx.send(item).await?;
}
Ok::<_, anyhow::Error>(())
});
let (mut read_timelines_tx, read_timelines_rx) = futures::channel::mpsc::channel(1);
let remote_client_inner = remote_client.clone();
let target_inner = target.clone();
let read_timelines = tokio::spawn(async move {
let stream = list_timelines_rx
.map(|ttid| report_on_timeline(&remote_client_inner, &target_inner, ttid))
.buffered(32);
let mut stream = Box::pin(stream);
while let Some(item) = stream.next().await {
let item = item?;
read_timelines_tx.send(item).await?;
}
Ok::<_, anyhow::Error>(())
});
let summary = Arc::new(tokio::sync::Mutex::new(MetadataSummary::new()));
let summary_inner = summary.clone();
let (mut consolidate_tenants_tx, consolidate_tenants_rx) = futures::channel::mpsc::channel(32);
let consolidate_tenants = tokio::spawn(async move {
// We must gather all the TenantShardTimelineId->S3TimelineBlobData for each tenant, because different
// shards in the same tenant might refer to one another's keys if a shard split has happened.
let mut tenant_id = None;
let mut tenant_objects = TenantObjectListing::default();
let mut tenant_timeline_results = Vec::new();
// Iterate through all the timeline results. These are in key-order, so
// all results for the same tenant will be adjacent. We accumulate these,
// and then call `analyze_tenant` to flush, when we see the next tenant ID.
let mut highest_shard_count = ShardCount::MIN;
let mut read_timelines_rx = read_timelines_rx;
while let Some(i) = read_timelines_rx.next().await {
let (ttid, data) = i;
{
let mut guard = summary_inner.lock().await;
guard.update_data(&data);
}
match tenant_id {
Some(prev_tenant_id) => {
if prev_tenant_id != ttid.tenant_shard_id.tenant_id {
// New tenant: analyze this tenant's timelines, clear accumulated tenant_timeline_results
let tenant_objects = std::mem::take(&mut tenant_objects);
let timelines = std::mem::take(&mut tenant_timeline_results);
analyze_tenant(
summary_inner.clone(),
Arc::new(tokio::sync::Mutex::new(tenant_objects)),
timelines,
highest_shard_count,
&mut consolidate_tenants_tx,
)
.await?;
tenant_id = Some(ttid.tenant_shard_id.tenant_id);
highest_shard_count = ttid.tenant_shard_id.shard_count;
} else {
highest_shard_count =
highest_shard_count.max(ttid.tenant_shard_id.shard_count);
}
}
None => {
tenant_id = Some(ttid.tenant_shard_id.tenant_id);
highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
}
}
match &data.blob_data {
BlobDataParseResult::Parsed {
index_part: _,
index_part_generation: _index_part_generation,
s3_layers,
index_part_last_modified_time: _,
index_part_snapshot_time: _,
} => {
tenant_objects.push(ttid, s3_layers.clone());
}
BlobDataParseResult::Relic => (),
BlobDataParseResult::Incorrect {
errors: _,
s3_layers,
} => {
tenant_objects.push(ttid, s3_layers.clone());
}
}
tenant_timeline_results.push((ttid, data));
}
if !tenant_timeline_results.is_empty() {
analyze_tenant(
summary_inner.clone(),
Arc::new(tokio::sync::Mutex::new(tenant_objects)),
tenant_timeline_results,
highest_shard_count,
&mut consolidate_tenants_tx,
)
.await?;
}
Ok::<_, anyhow::Error>(())
});
let remote_client_inner = remote_client.clone();
let summary_inner = summary.clone();
let analyze_tenants = tokio::spawn(async move {
let stream = consolidate_tenants_rx
.map(|(ttid, tenant_objects, data)| {
let remote_client_inner = remote_client_inner.clone();
async move {
let generation = if let BlobDataParseResult::Parsed {
index_part: _,
index_part_generation,
s3_layers: _,
index_part_last_modified_time: _,
index_part_snapshot_time: _,
} = &data.blob_data
{
Some(*index_part_generation)
} else {
None
};
let res = branch_cleanup_and_check_errors(
&remote_client_inner,
&ttid,
tenant_objects.clone(),
None,
None,
Some(data),
)
.await;
(ttid, tenant_objects.clone(), generation, res)
}
})
.buffered(32);
let mut last_tenant = None;
let mut last_tenant_objects = None;
let mut timeline_generations = HashMap::new();
let mut stream = Box::pin(stream);
while let Some((ttid, tenant_objects, generation, res)) = stream.next().await {
if last_tenant != Some(ttid) {
if let Some(tenant_id) = last_tenant {
let timeline_generations = std::mem::take(&mut timeline_generations);
identify_orphans(
tenant_id.tenant_shard_id.tenant_id,
last_tenant_objects.take().unwrap(),
summary_inner.clone(),
&timeline_generations,
)
.await;
}
last_tenant = Some(ttid);
last_tenant_objects = Some(tenant_objects);
}
if let Some(generation) = generation {
timeline_generations.insert(ttid, generation);
}
{
let mut guard = summary_inner.lock().await;
guard.update_analysis(&ttid, &res, verbose);
}
}
if let Some(tenant_id) = last_tenant {
identify_orphans(
tenant_id.tenant_shard_id.tenant_id,
last_tenant_objects.take().unwrap(),
summary_inner.clone(),
&timeline_generations,
)
.await;
}
Ok::<_, anyhow::Error>(())
});
// Generate a stream of S3TimelineBlobData
async fn report_on_timeline(
@@ -163,93 +387,94 @@ pub async fn scan_pageserver_metadata(
target: &RootTarget,
ttid: TenantShardTimelineId,
) -> anyhow::Result<(TenantShardTimelineId, RemoteTimelineBlobData)> {
tracing::info!("listing blobs for timeline: {}", ttid);
let data = list_timeline_blobs(remote_client, ttid, target).await?;
Ok((ttid, data))
}
let timelines = timelines.map_ok(|ttid| report_on_timeline(&remote_client, &target, ttid));
let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
// We must gather all the TenantShardTimelineId->S3TimelineBlobData for each tenant, because different
// shards in the same tenant might refer to one another's keys if a shard split has happened.
let mut tenant_id = None;
let mut tenant_objects = TenantObjectListing::default();
let mut tenant_timeline_results = Vec::new();
// DO NOT call any long-running tasks in this function; always route them through the channel and let
// other tokio tasks handle them.
async fn analyze_tenant(
remote_client: &GenericRemoteStorage,
tenant_id: TenantId,
summary: &mut MetadataSummary,
mut tenant_objects: TenantObjectListing,
summary: Arc<tokio::sync::Mutex<MetadataSummary>>,
tenant_objects: Arc<tokio::sync::Mutex<TenantObjectListing>>,
timelines: Vec<(TenantShardTimelineId, RemoteTimelineBlobData)>,
highest_shard_count: ShardCount,
verbose: bool,
) {
summary.tenant_count += 1;
let mut timeline_ids = HashSet::new();
let mut timeline_generations = HashMap::new();
for (ttid, data) in timelines {
async {
if ttid.tenant_shard_id.shard_count == highest_shard_count {
// Only analyze `TenantShardId`s with highest shard count.
// Stash the generation of each timeline, for later use identifying orphan layers
if let BlobDataParseResult::Parsed {
index_part,
index_part_generation,
s3_layers: _,
index_part_last_modified_time: _,
index_part_snapshot_time: _,
} = &data.blob_data
{
if index_part.deleted_at.is_some() {
// skip deleted timeline.
tracing::info!(
"Skip analysis of {} b/c timeline is already deleted",
ttid
);
return;
}
timeline_generations.insert(ttid, *index_part_generation);
}
// Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
// reference counts for layers across the tenant.
let analysis = branch_cleanup_and_check_errors(
remote_client,
&ttid,
&mut tenant_objects,
None,
None,
Some(data),
)
.await;
summary.update_analysis(&ttid, &analysis, verbose);
timeline_ids.insert(ttid.timeline_id);
} else {
tracing::info!(
"Skip analysis of {} b/c a lower shard count than {}",
ttid,
highest_shard_count.0,
);
}
}
.instrument(
info_span!("analyze-timeline", shard = %ttid.tenant_shard_id.shard_slug(), timeline = %ttid.timeline_id),
)
.await
output_tx: &mut futures::channel::mpsc::Sender<(
TenantShardTimelineId,
Arc<tokio::sync::Mutex<TenantObjectListing>>,
RemoteTimelineBlobData,
)>,
) -> anyhow::Result<()> {
{
let mut guard = summary.lock().await;
guard.tenant_count += 1;
}
summary.timeline_count += timeline_ids.len();
let mut timeline_ids = HashSet::new();
for (ttid, data) in timelines {
async {
if ttid.tenant_shard_id.shard_count == highest_shard_count {
// Only analyze `TenantShardId`s with highest shard count.
// Stash the generation of each timeline, for later use identifying orphan layers
if let BlobDataParseResult::Parsed {
index_part,
index_part_generation: _,
s3_layers: _,
index_part_last_modified_time: _,
index_part_snapshot_time: _,
} = &data.blob_data
{
if index_part.deleted_at.is_some() {
// skip deleted timeline.
tracing::info!("Skip analysis of {} b/c timeline is already deleted", ttid);
return Ok(());
}
}
// Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
// reference counts for layers across the tenant.
output_tx.send((ttid, tenant_objects.clone(), data)).await?;
timeline_ids.insert(ttid.timeline_id);
} else {
tracing::info!(
"Skip analysis of {} b/c a lower shard count than {}",
ttid,
highest_shard_count.0,
);
}
Ok::<_, anyhow::Error>(())
}.instrument(
info_span!("analyze-timeline", shard = %ttid.tenant_shard_id.shard_slug(), timeline = %ttid.timeline_id),
)
.await?;
}
{
let mut guard = summary.lock().await;
guard.timeline_count += timeline_ids.len();
}
Ok(())
}
async fn identify_orphans(
tenant_id: TenantId,
tenant_objects: Arc<tokio::sync::Mutex<TenantObjectListing>>,
summary: Arc<tokio::sync::Mutex<MetadataSummary>>,
timeline_generations: &HashMap<TenantShardTimelineId, Generation>,
) {
// Identifying orphan layers must be done on a tenant-wide basis, because individual
// shards' layers may be referenced by other shards.
//
// Orphan layers are not a corruption, and not an indication of a problem. They are just
// consuming some space in remote storage, and may be cleaned up at leisure.
for (shard_index, timeline_id, layer_file, generation) in tenant_objects.get_orphans() {
let orphans = { tenant_objects.lock().await.get_orphans() };
for (shard_index, timeline_id, layer_file, generation) in orphans {
let ttid = TenantShardTimelineId {
tenant_shard_id: TenantShardId {
tenant_id,
@@ -279,83 +504,20 @@ pub async fn scan_pageserver_metadata(
tracing::info!("Orphan layer detected: {orphan_path}");
summary.notify_timeline_orphan(&ttid);
{
let mut guard = summary.lock().await;
guard.notify_timeline_orphan(&ttid);
}
}
}
// Iterate through all the timeline results. These are in key-order, so
// all results for the same tenant will be adjacent. We accumulate these,
// and then call `analyze_tenant` to flush, when we see the next tenant ID.
let mut summary = MetadataSummary::new();
let mut highest_shard_count = ShardCount::MIN;
while let Some(i) = timelines.next().await {
let (ttid, data) = i?;
summary.update_data(&data);
// TODO: bail out early if any of the tasks fail
list_tenants.await??;
list_timelines.await??;
read_timelines.await??;
consolidate_tenants.await??;
analyze_tenants.await??;
match tenant_id {
Some(prev_tenant_id) => {
if prev_tenant_id != ttid.tenant_shard_id.tenant_id {
// New tenant: analyze this tenant's timelines, clear accumulated tenant_timeline_results
let tenant_objects = std::mem::take(&mut tenant_objects);
let timelines = std::mem::take(&mut tenant_timeline_results);
analyze_tenant(
&remote_client,
prev_tenant_id,
&mut summary,
tenant_objects,
timelines,
highest_shard_count,
verbose,
)
.instrument(info_span!("analyze-tenant", tenant = %prev_tenant_id))
.await;
tenant_id = Some(ttid.tenant_shard_id.tenant_id);
highest_shard_count = ttid.tenant_shard_id.shard_count;
} else {
highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
}
}
None => {
tenant_id = Some(ttid.tenant_shard_id.tenant_id);
highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
}
}
match &data.blob_data {
BlobDataParseResult::Parsed {
index_part: _,
index_part_generation: _index_part_generation,
s3_layers,
index_part_last_modified_time: _,
index_part_snapshot_time: _,
} => {
tenant_objects.push(ttid, s3_layers.clone());
}
BlobDataParseResult::Relic => (),
BlobDataParseResult::Incorrect {
errors: _,
s3_layers,
} => {
tenant_objects.push(ttid, s3_layers.clone());
}
}
tenant_timeline_results.push((ttid, data));
}
if !tenant_timeline_results.is_empty() {
let tenant_id = tenant_id.expect("Must be set if results are present");
analyze_tenant(
&remote_client,
tenant_id,
&mut summary,
tenant_objects,
tenant_timeline_results,
highest_shard_count,
verbose,
)
.instrument(info_span!("analyze-tenant", tenant = %tenant_id))
.await;
}
Ok(summary)
let summary = summary.lock().await;
Ok(summary.clone())
}