Mirror of https://github.com/neondatabase/neon.git, synced 2026-05-15 20:20:38 +00:00

Compare commits: conrad/try ... skyzh/page

1 commit: 7bd3e7a803
@@ -1,4 +1,5 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::SystemTime;

use futures_util::StreamExt;
@@ -55,7 +56,7 @@ impl TimelineAnalysis {
pub(crate) async fn branch_cleanup_and_check_errors(
    remote_client: &GenericRemoteStorage,
    id: &TenantShardTimelineId,
    tenant_objects: &mut TenantObjectListing,
    tenant_objects: Arc<tokio::sync::Mutex<TenantObjectListing>>,
    s3_active_branch: Option<&BranchData>,
    console_branch: Option<BranchData>,
    s3_data: Option<RemoteTimelineBlobData>,
@@ -150,7 +151,11 @@ pub(crate) async fn branch_cleanup_and_check_errors(
                ))
            }

            if !tenant_objects.check_ref(id.timeline_id, &layer, &metadata) {
            if !tenant_objects
                .lock()
                .await
                .check_ref(id.timeline_id, &layer, &metadata)
            {
                let path = remote_layer_path(
                    &id.tenant_shard_id.tenant_id,
                    &id.timeline_id,

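The checks.rs hunks above replace the `&mut TenantObjectListing` parameter with an `Arc<tokio::sync::Mutex<TenantObjectListing>>` and take the lock around `check_ref`, so the listing can be updated from several concurrently spawned tasks rather than through a single `&mut` borrow. A minimal sketch of that sharing pattern, using a hypothetical `ObjectListing` stand-in rather than the scrubber's real type (assumes tokio with the `sync`, `macros`, and `rt-multi-thread` features):

```rust
// Minimal sketch (not the scrubber's real types): replacing `&mut T` with
// `Arc<tokio::sync::Mutex<T>>` so several spawned tasks can update one listing.
use std::collections::HashSet;
use std::sync::Arc;

// Hypothetical stand-in for TenantObjectListing.
#[derive(Default)]
struct ObjectListing {
    referenced: HashSet<String>,
}

impl ObjectListing {
    fn check_ref(&mut self, layer: &str) -> bool {
        self.referenced.insert(layer.to_string())
    }
}

#[tokio::main]
async fn main() {
    let listing = Arc::new(tokio::sync::Mutex::new(ObjectListing::default()));

    let mut tasks = Vec::new();
    for i in 0..4 {
        let listing = listing.clone();
        tasks.push(tokio::spawn(async move {
            // Each task takes the lock only for the brief check, mirroring
            // `tenant_objects.lock().await.check_ref(...)` in the diff.
            let fresh = listing.lock().await.check_ref(&format!("layer-{i}"));
            assert!(fresh);
        }));
    }
    for t in tasks {
        t.await.unwrap();
    }
    println!("layers recorded: {}", listing.lock().await.referenced.len());
}
```

The trade-off mirrored in the diff is that callers now pay a brief lock per check instead of holding exclusive access to the listing for the whole scan.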
@@ -73,8 +73,12 @@ enum Command {
        node_kind: NodeKind,
        #[arg(short, long, default_value_t = false)]
        json: bool,
        /// If provided, only these tenants will be listed from the remote storage.
        #[arg(long = "tenant-id", num_args = 0..)]
        tenant_ids: Vec<TenantShardId>,
        /// If provided, we will list all tenants, but then filter with the prefix.
        #[arg(long = "tenant-id-prefix")]
        tenant_id_prefix: Option<TenantId>,
        #[arg(long = "post", default_value_t = false)]
        post_to_storcon: bool,
        #[arg(long, default_value = None)]
@@ -178,6 +182,7 @@ async fn main() -> anyhow::Result<()> {
        Command::ScanMetadata {
            json,
            tenant_ids,
            tenant_id_prefix,
            node_kind,
            post_to_storcon,
            dump_db_connstr,
@@ -186,6 +191,9 @@ async fn main() -> anyhow::Result<()> {
            verbose,
        } => {
            if let NodeKind::Safekeeper = node_kind {
                if tenant_id_prefix.is_some() {
                    bail!("`tenant_id_prefix` is not supported for safekeeper node_kind");
                }
                let db_or_list = match (timeline_lsns, dump_db_connstr) {
                    (Some(timeline_lsns), _) => {
                        let timeline_lsns = serde_json::from_str(&timeline_lsns)
@@ -227,6 +235,7 @@ async fn main() -> anyhow::Result<()> {
                bucket_config,
                controller_client.as_ref(),
                tenant_ids,
                tenant_id_prefix,
                json,
                post_to_storcon,
                verbose,
@@ -338,6 +347,7 @@ pub async fn run_cron_job(
        bucket_config,
        controller_client,
        Vec::new(),
        None,
        true,
        post_to_storcon,
        false, // default to non-verbose mode
@@ -384,10 +394,12 @@ pub async fn pageserver_physical_gc_cmd(
    Ok(())
}

#[allow(clippy::too_many_arguments)]
pub async fn scan_pageserver_metadata_cmd(
    bucket_config: BucketConfig,
    controller_client: Option<&control_api::Client>,
    tenant_shard_ids: Vec<TenantShardId>,
    tenant_id_prefix: Option<TenantId>,
    json: bool,
    post_to_storcon: bool,
    verbose: bool,
@@ -398,7 +410,14 @@ pub async fn scan_pageserver_metadata_cmd(
            "Posting pageserver scan health status to storage controller requires `--controller-api` and `--controller-jwt` to run"
        ));
    }
    match scan_pageserver_metadata(bucket_config.clone(), tenant_shard_ids, verbose).await {
    match scan_pageserver_metadata(
        bucket_config.clone(),
        tenant_shard_ids,
        tenant_id_prefix,
        verbose,
    )
    .await
    {
        Err(e) => {
            tracing::error!("Failed: {e}");
            Err(e)

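Taken together, the main.rs hunks add a second tenant selector to `ScanMetadata`: explicit `--tenant-id` values or a `--tenant-id-prefix` filter, rejected for safekeepers and, per a later hunk, when both are given. A condensed, hypothetical sketch of just those two flags and the mutual-exclusion check, with `String` standing in for `TenantShardId`/`TenantId` (assumes clap with the `derive` feature and anyhow):

```rust
// Condensed sketch (not the scrubber's full CLI) of the two tenant selectors the
// hunks above add. Field names mirror the diff; types are simplified stand-ins.
use anyhow::bail;
use clap::Parser;

#[derive(Parser, Debug)]
struct ScanMetadataArgs {
    /// If provided, only these tenants will be listed from the remote storage.
    #[arg(long = "tenant-id", num_args = 0..)]
    tenant_ids: Vec<String>,

    /// If provided, we will list all tenants, but then filter with the prefix.
    #[arg(long = "tenant-id-prefix")]
    tenant_id_prefix: Option<String>,
}

fn main() -> anyhow::Result<()> {
    let args = ScanMetadataArgs::parse();
    // Mirrors the bail in scan_pageserver_metadata: the two selectors are exclusive.
    if !args.tenant_ids.is_empty() && args.tenant_id_prefix.is_some() {
        bail!("`tenant_id_prefix` is not supported when `tenant_ids` is provided");
    }
    println!("{args:?}");
    Ok(())
}
```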
@@ -1,5 +1,7 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use futures::SinkExt;
use futures_util::{StreamExt, TryStreamExt};
use pageserver::tenant::remote_timeline_client::remote_layer_path;
use pageserver_api::controller_api::MetadataHealthUpdateRequest;
@@ -7,6 +9,7 @@ use pageserver_api::shard::TenantShardId;
use remote_storage::GenericRemoteStorage;
use serde::Serialize;
use tracing::{Instrument, info_span};
use utils::generation::Generation;
use utils::id::TenantId;
use utils::shard::ShardCount;

@@ -14,10 +17,12 @@ use crate::checks::{
    BlobDataParseResult, RemoteTimelineBlobData, TenantObjectListing, TimelineAnalysis,
    branch_cleanup_and_check_errors, list_timeline_blobs,
};
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
use crate::metadata_stream::{
    stream_tenant_timelines, stream_tenants, stream_tenants_maybe_prefix,
};
use crate::{BucketConfig, NodeKind, RootTarget, TenantShardTimelineId, init_remote};

#[derive(Serialize, Default)]
#[derive(Serialize, Default, Clone)]
pub struct MetadataSummary {
    tenant_count: usize,
    timeline_count: usize,
@@ -102,13 +107,13 @@ impl MetadataSummary {

        format!(
            "Tenants: {}
Timelines: {}
Timeline-shards: {}
With errors: {}
With warnings: {}
With orphan layers: {}
Index versions: {version_summary}
",
Timelines: {}
Timeline-shards: {}
With errors: {}
With warnings: {}
With orphan layers: {}
Index versions: {version_summary}
",
            self.tenant_count,
            self.timeline_count,
            self.timeline_shard_count,
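The `Clone` added to `MetadataSummary`'s derive matters at the end of the new `scan_pageserver_metadata`: once the summary is shared through an `Arc<tokio::sync::Mutex<_>>` by several tasks, it can no longer be moved out of the function, so the final hunk locks it and returns `Ok(summary.clone())` instead. A small sketch of that pattern with a hypothetical `Summary` type (assumes tokio):

```rust
// Sketch: counters updated by concurrent tasks behind Arc<Mutex<_>>, cloned out at
// the end. `Summary` is a stand-in for MetadataSummary.
use std::sync::Arc;

#[derive(Default, Clone, Debug)]
struct Summary {
    tenant_count: usize,
    timeline_count: usize,
}

async fn scan() -> Summary {
    let summary = Arc::new(tokio::sync::Mutex::new(Summary::default()));

    let mut tasks = Vec::new();
    for _ in 0..4 {
        let summary = summary.clone();
        tasks.push(tokio::spawn(async move {
            let mut guard = summary.lock().await;
            guard.tenant_count += 1;
            guard.timeline_count += 3;
        }));
    }
    for t in tasks {
        t.await.unwrap();
    }

    // Equivalent of `let summary = summary.lock().await; Ok(summary.clone())`.
    let guard = summary.lock().await;
    guard.clone()
}

#[tokio::main]
async fn main() {
    println!("{:?}", scan().await);
}
```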
@@ -138,24 +143,243 @@ Index versions: {version_summary}
pub async fn scan_pageserver_metadata(
    bucket_config: BucketConfig,
    tenant_ids: Vec<TenantShardId>,
    tenant_id_prefix: Option<TenantId>,
    verbose: bool,
) -> anyhow::Result<MetadataSummary> {
    let (remote_client, target) = init_remote(bucket_config, NodeKind::Pageserver).await?;

    let tenants = if tenant_ids.is_empty() {
        futures::future::Either::Left(stream_tenants(&remote_client, &target))
    } else {
        futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok)))
    };
    if !tenant_ids.is_empty() && tenant_id_prefix.is_some() {
        anyhow::bail!("`tenant_id_prefix` is not supported when `tenant_ids` is provided");
    }

    // How many tenants to process in parallel. We need to be mindful of pageservers
    // accessing the same per tenant prefixes, so use a lower setting than pageservers.
    const CONCURRENCY: usize = 32;
    let (mut list_tenants_tx, list_tenants_rx) = futures::channel::mpsc::channel(1);
    let remote_client_inner = remote_client.clone();
    let target_inner = target.clone();
    let list_tenants = tokio::spawn(async move {
        let mut cnt = 0;
        if tenant_ids.is_empty() {
            if let Some(tenant_id_prefix) = tenant_id_prefix {
                let stream = stream_tenants_maybe_prefix(
                    &remote_client_inner,
                    &target_inner,
                    Some(tenant_id_prefix.to_string()),
                );
                let mut stream = Box::pin(stream);
                while let Some(tenant) = stream.next().await {
                    let tenant = tenant?;
                    list_tenants_tx.send(tenant).await?;
                    cnt += 1;
                }
            } else {
                let stream = stream_tenants(&remote_client_inner, &target_inner);
                let mut stream = Box::pin(stream);
                while let Some(tenant) = stream.next().await {
                    let tenant = tenant?;
                    list_tenants_tx.send(tenant).await?;
                    cnt += 1;
                }
            }
        } else {
            for tenant_id in tenant_ids {
                list_tenants_tx.send(tenant_id).await?;
                cnt += 1;
            }
        }
        tracing::info!("list_tenants: collected {} tenants", cnt);
        Ok::<_, anyhow::Error>(())
    });

    // Generate a stream of TenantTimelineId
    let timelines = tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, t));
    let timelines = timelines.try_buffered(CONCURRENCY);
    let timelines = timelines.try_flatten();
    let (mut list_timelines_tx, list_timelines_rx) = futures::channel::mpsc::channel(1);
    let remote_client_inner = remote_client.clone();
    let target_inner = target.clone();
    let list_timelines = tokio::spawn(async move {
        let stream = list_tenants_rx
            .map(|tenant_id| {
                stream_tenant_timelines(&remote_client_inner, &target_inner, tenant_id)
            })
            .buffered(8)
            .try_flatten();
        let mut stream = Box::pin(stream);
        while let Some(item) = stream.next().await {
            let item = item?;
            list_timelines_tx.send(item).await?;
        }
        Ok::<_, anyhow::Error>(())
    });

    let (mut read_timelines_tx, read_timelines_rx) = futures::channel::mpsc::channel(1);
    let remote_client_inner = remote_client.clone();
    let target_inner = target.clone();
    let read_timelines = tokio::spawn(async move {
        let stream = list_timelines_rx
            .map(|ttid| report_on_timeline(&remote_client_inner, &target_inner, ttid))
            .buffered(32);
        let mut stream = Box::pin(stream);
        while let Some(item) = stream.next().await {
            let item = item?;
            read_timelines_tx.send(item).await?;
        }
        Ok::<_, anyhow::Error>(())
    });

    let summary = Arc::new(tokio::sync::Mutex::new(MetadataSummary::new()));
    let summary_inner = summary.clone();

    let (mut consolidate_tenants_tx, consolidate_tenants_rx) = futures::channel::mpsc::channel(32);
    let consolidate_tenants = tokio::spawn(async move {
        // We must gather all the TenantShardTimelineId->S3TimelineBlobData for each tenant, because different
        // shards in the same tenant might refer to one anothers' keys if a shard split has happened.

        let mut tenant_id = None;
        let mut tenant_objects = TenantObjectListing::default();
        let mut tenant_timeline_results = Vec::new();

        // Iterate through all the timeline results. These are in key-order, so
        // all results for the same tenant will be adjacent. We accumulate these,
        // and then call `analyze_tenant` to flush, when we see the next tenant ID.
        let mut highest_shard_count = ShardCount::MIN;
        let mut read_timelines_rx = read_timelines_rx;
        while let Some(i) = read_timelines_rx.next().await {
            let (ttid, data) = i;
            {
                let mut guard = summary_inner.lock().await;
                guard.update_data(&data);
            }

            match tenant_id {
                Some(prev_tenant_id) => {
                    if prev_tenant_id != ttid.tenant_shard_id.tenant_id {
                        // New tenant: analyze this tenant's timelines, clear accumulated tenant_timeline_results
                        let tenant_objects = std::mem::take(&mut tenant_objects);
                        let timelines = std::mem::take(&mut tenant_timeline_results);
                        analyze_tenant(
                            summary_inner.clone(),
                            Arc::new(tokio::sync::Mutex::new(tenant_objects)),
                            timelines,
                            highest_shard_count,
                            &mut consolidate_tenants_tx,
                        )
                        .await?;
                        tenant_id = Some(ttid.tenant_shard_id.tenant_id);
                        highest_shard_count = ttid.tenant_shard_id.shard_count;
                    } else {
                        highest_shard_count =
                            highest_shard_count.max(ttid.tenant_shard_id.shard_count);
                    }
                }
                None => {
                    tenant_id = Some(ttid.tenant_shard_id.tenant_id);
                    highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
                }
            }

            match &data.blob_data {
                BlobDataParseResult::Parsed {
                    index_part: _,
                    index_part_generation: _index_part_generation,
                    s3_layers,
                    index_part_last_modified_time: _,
                    index_part_snapshot_time: _,
                } => {
                    tenant_objects.push(ttid, s3_layers.clone());
                }
                BlobDataParseResult::Relic => (),
                BlobDataParseResult::Incorrect {
                    errors: _,
                    s3_layers,
                } => {
                    tenant_objects.push(ttid, s3_layers.clone());
                }
            }
            tenant_timeline_results.push((ttid, data));
        }

        if !tenant_timeline_results.is_empty() {
            analyze_tenant(
                summary_inner.clone(),
                Arc::new(tokio::sync::Mutex::new(tenant_objects)),
                tenant_timeline_results,
                highest_shard_count,
                &mut consolidate_tenants_tx,
            )
            .await?;
        }
        Ok::<_, anyhow::Error>(())
    });

    let remote_client_inner = remote_client.clone();
    let summary_inner = summary.clone();
    let analyze_tenants = tokio::spawn(async move {
        let stream = consolidate_tenants_rx
            .map(|(ttid, tenant_objects, data)| {
                let remote_client_inner = remote_client_inner.clone();
                async move {
                    let generation = if let BlobDataParseResult::Parsed {
                        index_part: _,
                        index_part_generation,
                        s3_layers: _,
                        index_part_last_modified_time: _,
                        index_part_snapshot_time: _,
                    } = &data.blob_data
                    {
                        Some(*index_part_generation)
                    } else {
                        None
                    };

                    let res = branch_cleanup_and_check_errors(
                        &remote_client_inner,
                        &ttid,
                        tenant_objects.clone(),
                        None,
                        None,
                        Some(data),
                    )
                    .await;
                    (ttid, tenant_objects.clone(), generation, res)
                }
            })
            .buffered(32);
        let mut last_tenant = None;
        let mut last_tenant_objects = None;
        let mut timeline_generations = HashMap::new();
        let mut stream = Box::pin(stream);
        while let Some((ttid, tenant_objects, generation, res)) = stream.next().await {
            if last_tenant != Some(ttid) {
                if let Some(tenant_id) = last_tenant {
                    let timeline_generations = std::mem::take(&mut timeline_generations);
                    identify_orphans(
                        tenant_id.tenant_shard_id.tenant_id,
                        last_tenant_objects.take().unwrap(),
                        summary_inner.clone(),
                        &timeline_generations,
                    )
                    .await;
                }
                last_tenant = Some(ttid);
                last_tenant_objects = Some(tenant_objects);
            }
            if let Some(generation) = generation {
                timeline_generations.insert(ttid, generation);
            }
            {
                let mut guard = summary_inner.lock().await;
                guard.update_analysis(&ttid, &res, verbose);
            }
        }

        if let Some(tenant_id) = last_tenant {
            identify_orphans(
                tenant_id.tenant_shard_id.tenant_id,
                last_tenant_objects.take().unwrap(),
                summary_inner.clone(),
                &timeline_generations,
            )
            .await;
        }

        Ok::<_, anyhow::Error>(())
    });

    // Generate a stream of S3TimelineBlobData
    async fn report_on_timeline(
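The hunk above is the core of the change: the scan is restructured from one buffered stream into spawned stages connected by bounded `futures::channel::mpsc` channels: list tenants, list timelines, read timeline blobs, consolidate per tenant, analyze. A stripped-down sketch of that staged-pipeline shape with placeholder work instead of the scrubber's S3 listing (assumes the tokio, futures, and anyhow crates; names are illustrative):

```rust
// Minimal sketch of the staged pipeline used above: each stage is a spawned task,
// stages hand items to each other over small bounded channels, and the middle
// stage fans out with `buffered(N)` for limited concurrency.
use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (mut tenants_tx, tenants_rx) = mpsc::channel::<u32>(1);
    let (mut timelines_tx, mut timelines_rx) = mpsc::channel::<(u32, u32)>(1);

    // Stage 1: enumerate "tenants" (stand-in for listing tenant prefixes in S3).
    let list_tenants = tokio::spawn(async move {
        for tenant in 0..4u32 {
            tenants_tx.send(tenant).await?;
        }
        Ok::<_, anyhow::Error>(())
    });

    // Stage 2: for each tenant, produce its "timelines" with bounded concurrency.
    let list_timelines = tokio::spawn(async move {
        let mut stream = Box::pin(
            tenants_rx
                .map(|tenant| async move {
                    (0..3u32).map(move |t| (tenant, t)).collect::<Vec<_>>()
                })
                .buffered(2),
        );
        while let Some(batch) = stream.next().await {
            for item in batch {
                timelines_tx.send(item).await?;
            }
        }
        Ok::<_, anyhow::Error>(())
    });

    // Stage 3: consume the timeline stream (stand-in for consolidation/analysis).
    let mut count = 0usize;
    while let Some((tenant, timeline)) = timelines_rx.next().await {
        println!("tenant {tenant}, timeline {timeline}");
        count += 1;
    }

    list_tenants.await??;
    list_timelines.await??;
    println!("{count} timelines scanned");
    Ok(())
}
```

The bounded channels give backpressure between stages, which is the same role the `futures::channel::mpsc::channel(1)` / `channel(32)` capacities play in the hunk.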
@@ -163,93 +387,94 @@ pub async fn scan_pageserver_metadata(
        target: &RootTarget,
        ttid: TenantShardTimelineId,
    ) -> anyhow::Result<(TenantShardTimelineId, RemoteTimelineBlobData)> {
        tracing::info!("listing blobs for timeline: {}", ttid);
        let data = list_timeline_blobs(remote_client, ttid, target).await?;
        Ok((ttid, data))
    }
    let timelines = timelines.map_ok(|ttid| report_on_timeline(&remote_client, &target, ttid));
    let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));

    // We must gather all the TenantShardTimelineId->S3TimelineBlobData for each tenant, because different
    // shards in the same tenant might refer to one anothers' keys if a shard split has happened.

    let mut tenant_id = None;
    let mut tenant_objects = TenantObjectListing::default();
    let mut tenant_timeline_results = Vec::new();

    // DO NOT call any long-running tasks in this function; always route them through the channel and let
    // other tokio tasks handle them.
    async fn analyze_tenant(
        remote_client: &GenericRemoteStorage,
        tenant_id: TenantId,
        summary: &mut MetadataSummary,
        mut tenant_objects: TenantObjectListing,
        summary: Arc<tokio::sync::Mutex<MetadataSummary>>,
        tenant_objects: Arc<tokio::sync::Mutex<TenantObjectListing>>,
        timelines: Vec<(TenantShardTimelineId, RemoteTimelineBlobData)>,
        highest_shard_count: ShardCount,
        verbose: bool,
    ) {
        summary.tenant_count += 1;

        let mut timeline_ids = HashSet::new();
        let mut timeline_generations = HashMap::new();
        for (ttid, data) in timelines {
            async {
                if ttid.tenant_shard_id.shard_count == highest_shard_count {
                    // Only analyze `TenantShardId`s with highest shard count.

                    // Stash the generation of each timeline, for later use identifying orphan layers
                    if let BlobDataParseResult::Parsed {
                        index_part,
                        index_part_generation,
                        s3_layers: _,
                        index_part_last_modified_time: _,
                        index_part_snapshot_time: _,
                    } = &data.blob_data
                    {
                        if index_part.deleted_at.is_some() {
                            // skip deleted timeline.
                            tracing::info!(
                                "Skip analysis of {} b/c timeline is already deleted",
                                ttid
                            );
                            return;
                        }
                        timeline_generations.insert(ttid, *index_part_generation);
                    }

                    // Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
                    // reference counts for layers across the tenant.
                    let analysis = branch_cleanup_and_check_errors(
                        remote_client,
                        &ttid,
                        &mut tenant_objects,
                        None,
                        None,
                        Some(data),
                    )
                    .await;
                    summary.update_analysis(&ttid, &analysis, verbose);

                    timeline_ids.insert(ttid.timeline_id);
                } else {
                    tracing::info!(
                        "Skip analysis of {} b/c a lower shard count than {}",
                        ttid,
                        highest_shard_count.0,
                    );
                }
            }
            .instrument(
                info_span!("analyze-timeline", shard = %ttid.tenant_shard_id.shard_slug(), timeline = %ttid.timeline_id),
            )
            .await
        output_tx: &mut futures::channel::mpsc::Sender<(
            TenantShardTimelineId,
            Arc<tokio::sync::Mutex<TenantObjectListing>>,
            RemoteTimelineBlobData,
        )>,
    ) -> anyhow::Result<()> {
        {
            let mut guard = summary.lock().await;
            guard.tenant_count += 1;
        }

        summary.timeline_count += timeline_ids.len();
        let mut timeline_ids = HashSet::new();
        for (ttid, data) in timelines {
            async {
                if ttid.tenant_shard_id.shard_count == highest_shard_count {
                    // Only analyze `TenantShardId`s with highest shard count.

                    // Stash the generation of each timeline, for later use identifying orphan layers

                    if let BlobDataParseResult::Parsed {
                        index_part,
                        index_part_generation: _,
                        s3_layers: _,
                        index_part_last_modified_time: _,
                        index_part_snapshot_time: _,
                    } = &data.blob_data
                    {
                        if index_part.deleted_at.is_some() {
                            // skip deleted timeline.
                            tracing::info!("Skip analysis of {} b/c timeline is already deleted", ttid);
                            return Ok(());
                        }
                    }

                    // Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
                    // reference counts for layers across the tenant.

                    output_tx.send((ttid, tenant_objects.clone(), data)).await?;

                    timeline_ids.insert(ttid.timeline_id);
                } else {
                    tracing::info!(
                        "Skip analysis of {} b/c a lower shard count than {}",
                        ttid,
                        highest_shard_count.0,
                    );
                }
                Ok::<_, anyhow::Error>(())
            }.instrument(
                info_span!("analyze-timeline", shard = %ttid.tenant_shard_id.shard_slug(), timeline = %ttid.timeline_id),
            )
            .await?;
        }

        {
            let mut guard = summary.lock().await;
            guard.timeline_count += timeline_ids.len();
        }

        Ok(())
    }

    async fn identify_orphans(
        tenant_id: TenantId,
        tenant_objects: Arc<tokio::sync::Mutex<TenantObjectListing>>,
        summary: Arc<tokio::sync::Mutex<MetadataSummary>>,
        timeline_generations: &HashMap<TenantShardTimelineId, Generation>,
    ) {
        // Identifying orphan layers must be done on a tenant-wide basis, because individual
        // shards' layers may be referenced by other shards.
        //
        // Orphan layers are not a corruption, and not an indication of a problem. They are just
        // consuming some space in remote storage, and may be cleaned up at leisure.
        for (shard_index, timeline_id, layer_file, generation) in tenant_objects.get_orphans() {

        let orphans = { tenant_objects.lock().await.get_orphans() };
        for (shard_index, timeline_id, layer_file, generation) in orphans {
            let ttid = TenantShardTimelineId {
                tenant_shard_id: TenantShardId {
                    tenant_id,
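For contrast with the channel pipeline, the removed lines in this hunk (`map_ok(...)` followed by `try_buffered(CONCURRENCY)` on a pinned stream) show the combinator style the old code used to bound concurrency over a fallible stream. A self-contained sketch of that pattern with placeholder inputs (assumes the futures, tokio, and anyhow crates):

```rust
// Sketch of the `map_ok` + `try_buffered` pattern: run up to CONCURRENCY fallible
// futures from a fallible stream at once, yielding results in order and stopping
// at the first error. Inputs and the work function are placeholders.
use futures::{StreamExt, TryStreamExt};

async fn inspect(id: u32) -> anyhow::Result<String> {
    // Stand-in for `report_on_timeline`: some async, fallible per-item work.
    Ok(format!("timeline {id} ok"))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    const CONCURRENCY: usize = 4;

    // A fallible stream of ids, like the tenant/timeline streams in the scrubber.
    let ids = futures::stream::iter((0..10u32).map(anyhow::Ok));

    // `map_ok` wraps each Ok item in a future; `try_buffered` drives up to
    // CONCURRENCY of them concurrently.
    let mut results = std::pin::pin!(ids.map_ok(inspect).try_buffered(CONCURRENCY));

    while let Some(line) = results.next().await {
        println!("{}", line?);
    }
    Ok(())
}
```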
@@ -279,83 +504,20 @@ pub async fn scan_pageserver_metadata(

            tracing::info!("Orphan layer detected: {orphan_path}");

            summary.notify_timeline_orphan(&ttid);
            {
                let mut guard = summary.lock().await;
                guard.notify_timeline_orphan(&ttid);
            }
        }
    }

    // Iterate through all the timeline results. These are in key-order, so
    // all results for the same tenant will be adjacent. We accumulate these,
    // and then call `analyze_tenant` to flush, when we see the next tenant ID.
    let mut summary = MetadataSummary::new();
    let mut highest_shard_count = ShardCount::MIN;
    while let Some(i) = timelines.next().await {
        let (ttid, data) = i?;
        summary.update_data(&data);
    // TODO: bail out early if any of the tasks fail
    list_tenants.await??;
    list_timelines.await??;
    read_timelines.await??;
    consolidate_tenants.await??;
    analyze_tenants.await??;

        match tenant_id {
            Some(prev_tenant_id) => {
                if prev_tenant_id != ttid.tenant_shard_id.tenant_id {
                    // New tenant: analyze this tenant's timelines, clear accumulated tenant_timeline_results
                    let tenant_objects = std::mem::take(&mut tenant_objects);
                    let timelines = std::mem::take(&mut tenant_timeline_results);
                    analyze_tenant(
                        &remote_client,
                        prev_tenant_id,
                        &mut summary,
                        tenant_objects,
                        timelines,
                        highest_shard_count,
                        verbose,
                    )
                    .instrument(info_span!("analyze-tenant", tenant = %prev_tenant_id))
                    .await;
                    tenant_id = Some(ttid.tenant_shard_id.tenant_id);
                    highest_shard_count = ttid.tenant_shard_id.shard_count;
                } else {
                    highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
                }
            }
            None => {
                tenant_id = Some(ttid.tenant_shard_id.tenant_id);
                highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
            }
        }

        match &data.blob_data {
            BlobDataParseResult::Parsed {
                index_part: _,
                index_part_generation: _index_part_generation,
                s3_layers,
                index_part_last_modified_time: _,
                index_part_snapshot_time: _,
            } => {
                tenant_objects.push(ttid, s3_layers.clone());
            }
            BlobDataParseResult::Relic => (),
            BlobDataParseResult::Incorrect {
                errors: _,
                s3_layers,
            } => {
                tenant_objects.push(ttid, s3_layers.clone());
            }
        }
        tenant_timeline_results.push((ttid, data));
    }

    if !tenant_timeline_results.is_empty() {
        let tenant_id = tenant_id.expect("Must be set if results are present");
        analyze_tenant(
            &remote_client,
            tenant_id,
            &mut summary,
            tenant_objects,
            tenant_timeline_results,
            highest_shard_count,
            verbose,
        )
        .instrument(info_span!("analyze-tenant", tenant = %tenant_id))
        .await;
    }

    Ok(summary)
    let summary = summary.lock().await;
    Ok(summary.clone())
}
