mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-23 06:09:59 +00:00
scrubber: retry when missing index key in the listing (#8873)
Part of #8128, fixes #8872. ## Problem See #8872. ## Summary of changes - Retry `list_timeline_blobs` another time if - there are layer file keys listed but not index. - failed to download index. - Instrument code with `analyze-tenant` and `analyze-timeline` span. - Remove `initdb_archive` check, it could have been deleted. - Return with exit code 1 on fatal error if `--exit-code` parameter is set. Signed-off-by: Yuchen Liang <yuchen@neon.tech>
This commit is contained in:
@@ -1,13 +1,12 @@
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
||||
use anyhow::Context;
|
||||
use itertools::Itertools;
|
||||
use pageserver::tenant::checks::check_valid_layermap;
|
||||
use pageserver::tenant::layer_map::LayerMap;
|
||||
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info, warn};
|
||||
use tracing::{info, warn};
|
||||
use utils::generation::Generation;
|
||||
use utils::id::TimelineId;
|
||||
|
||||
@@ -29,9 +28,8 @@ pub(crate) struct TimelineAnalysis {
|
||||
/// yet.
|
||||
pub(crate) warnings: Vec<String>,
|
||||
|
||||
/// Keys not referenced in metadata: candidates for removal, but NOT NECESSARILY: beware
|
||||
/// of races between reading the metadata and reading the objects.
|
||||
pub(crate) garbage_keys: Vec<String>,
|
||||
/// Objects whose keys were not recognized at all, i.e. not layer files, not indices, and not initdb archive.
|
||||
pub(crate) unknown_keys: Vec<String>,
|
||||
}
|
||||
|
||||
impl TimelineAnalysis {
|
||||
@@ -39,7 +37,7 @@ impl TimelineAnalysis {
|
||||
Self {
|
||||
errors: Vec::new(),
|
||||
warnings: Vec::new(),
|
||||
garbage_keys: Vec::new(),
|
||||
unknown_keys: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -59,7 +57,7 @@ pub(crate) async fn branch_cleanup_and_check_errors(
|
||||
) -> TimelineAnalysis {
|
||||
let mut result = TimelineAnalysis::new();
|
||||
|
||||
info!("Checking timeline {id}");
|
||||
info!("Checking timeline");
|
||||
|
||||
if let Some(s3_active_branch) = s3_active_branch {
|
||||
info!(
|
||||
@@ -80,7 +78,7 @@ pub(crate) async fn branch_cleanup_and_check_errors(
|
||||
match s3_data {
|
||||
Some(s3_data) => {
|
||||
result
|
||||
.garbage_keys
|
||||
.unknown_keys
|
||||
.extend(s3_data.unknown_keys.into_iter().map(|k| k.key.to_string()));
|
||||
|
||||
match s3_data.blob_data {
|
||||
@@ -204,10 +202,10 @@ pub(crate) async fn branch_cleanup_and_check_errors(
|
||||
warn!("Timeline metadata warnings: {0:?}", result.warnings);
|
||||
}
|
||||
|
||||
if !result.garbage_keys.is_empty() {
|
||||
error!(
|
||||
"The following keys should be removed from S3: {0:?}",
|
||||
result.garbage_keys
|
||||
if !result.unknown_keys.is_empty() {
|
||||
warn!(
|
||||
"The following keys are not recognized: {0:?}",
|
||||
result.unknown_keys
|
||||
)
|
||||
}
|
||||
|
||||
@@ -294,10 +292,10 @@ impl TenantObjectListing {
|
||||
pub(crate) struct RemoteTimelineBlobData {
|
||||
pub(crate) blob_data: BlobDataParseResult,
|
||||
|
||||
// Index objects that were not used when loading `blob_data`, e.g. those from old generations
|
||||
/// Index objects that were not used when loading `blob_data`, e.g. those from old generations
|
||||
pub(crate) unused_index_keys: Vec<ListingObject>,
|
||||
|
||||
// Objects whose keys were not recognized at all, i.e. not layer files, not indices
|
||||
/// Objects whose keys were not recognized at all, i.e. not layer files, not indices
|
||||
pub(crate) unknown_keys: Vec<ListingObject>,
|
||||
}
|
||||
|
||||
@@ -329,11 +327,54 @@ pub(crate) fn parse_layer_object_name(name: &str) -> Result<(LayerName, Generati
|
||||
}
|
||||
}
|
||||
|
||||
/// Note (<https://github.com/neondatabase/neon/issues/8872>):
|
||||
/// Since we do not gurantee the order of the listing, we could list layer keys right before
|
||||
/// pageserver `RemoteTimelineClient` deletes the layer files and then the index.
|
||||
/// In the rare case, this would give back a transient error where the index key is missing.
|
||||
///
|
||||
/// To avoid generating false positive, we try streaming the listing for a second time.
|
||||
pub(crate) async fn list_timeline_blobs(
|
||||
remote_client: &GenericRemoteStorage,
|
||||
id: TenantShardTimelineId,
|
||||
root_target: &RootTarget,
|
||||
) -> anyhow::Result<RemoteTimelineBlobData> {
|
||||
let res = list_timeline_blobs_impl(remote_client, id, root_target).await?;
|
||||
match res {
|
||||
ListTimelineBlobsResult::Ready(data) => Ok(data),
|
||||
ListTimelineBlobsResult::MissingIndexPart(_) => {
|
||||
// Retry if index is missing.
|
||||
let data = list_timeline_blobs_impl(remote_client, id, root_target)
|
||||
.await?
|
||||
.into_data();
|
||||
Ok(data)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum ListTimelineBlobsResult {
|
||||
/// Blob data is ready to be intepreted.
|
||||
Ready(RemoteTimelineBlobData),
|
||||
/// List timeline blobs has layer files but is missing [`IndexPart`].
|
||||
MissingIndexPart(RemoteTimelineBlobData),
|
||||
}
|
||||
|
||||
impl ListTimelineBlobsResult {
|
||||
/// Get the inner blob data regardless the status.
|
||||
pub fn into_data(self) -> RemoteTimelineBlobData {
|
||||
match self {
|
||||
ListTimelineBlobsResult::Ready(data) => data,
|
||||
ListTimelineBlobsResult::MissingIndexPart(data) => data,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns [`ListTimelineBlobsResult::MissingIndexPart`] if blob data has layer files
|
||||
/// but is missing [`IndexPart`], otherwise returns [`ListTimelineBlobsResult::Ready`].
|
||||
async fn list_timeline_blobs_impl(
|
||||
remote_client: &GenericRemoteStorage,
|
||||
id: TenantShardTimelineId,
|
||||
root_target: &RootTarget,
|
||||
) -> anyhow::Result<ListTimelineBlobsResult> {
|
||||
let mut s3_layers = HashSet::new();
|
||||
|
||||
let mut errors = Vec::new();
|
||||
@@ -375,30 +416,28 @@ pub(crate) async fn list_timeline_blobs(
|
||||
s3_layers.insert((new_layer, gen));
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::info!("Error parsing key {maybe_layer_name}");
|
||||
errors.push(
|
||||
format!("S3 list response got an object with key {key} that is not a layer name: {e}"),
|
||||
);
|
||||
tracing::info!("Error parsing {maybe_layer_name} as layer name: {e}");
|
||||
unknown_keys.push(obj);
|
||||
}
|
||||
},
|
||||
None => {
|
||||
tracing::warn!("Unknown key {key}");
|
||||
errors.push(format!("S3 list response got an object with odd key {key}"));
|
||||
tracing::info!("S3 listed an unknown key: {key}");
|
||||
unknown_keys.push(obj);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if index_part_keys.is_empty() && s3_layers.is_empty() && initdb_archive {
|
||||
tracing::debug!(
|
||||
"Timeline is empty apart from initdb archive: expected post-deletion state."
|
||||
);
|
||||
return Ok(RemoteTimelineBlobData {
|
||||
if index_part_keys.is_empty() && s3_layers.is_empty() {
|
||||
tracing::debug!("Timeline is empty: expected post-deletion state.");
|
||||
if initdb_archive {
|
||||
tracing::info!("Timeline is post deletion but initdb archive is still present.");
|
||||
}
|
||||
|
||||
return Ok(ListTimelineBlobsResult::Ready(RemoteTimelineBlobData {
|
||||
blob_data: BlobDataParseResult::Relic,
|
||||
unused_index_keys: index_part_keys,
|
||||
unknown_keys: Vec::new(),
|
||||
});
|
||||
unknown_keys,
|
||||
}));
|
||||
}
|
||||
|
||||
// Choose the index_part with the highest generation
|
||||
@@ -424,19 +463,43 @@ pub(crate) async fn list_timeline_blobs(
|
||||
match index_part_object.as_ref() {
|
||||
Some(selected) => index_part_keys.retain(|k| k != selected),
|
||||
None => {
|
||||
errors.push("S3 list response got no index_part.json file".to_string());
|
||||
// It is possible that the branch gets deleted after we got some layer files listed
|
||||
// and we no longer have the index file in the listing.
|
||||
errors.push(
|
||||
"S3 list response got no index_part.json file but still has layer files"
|
||||
.to_string(),
|
||||
);
|
||||
return Ok(ListTimelineBlobsResult::MissingIndexPart(
|
||||
RemoteTimelineBlobData {
|
||||
blob_data: BlobDataParseResult::Incorrect { errors, s3_layers },
|
||||
unused_index_keys: index_part_keys,
|
||||
unknown_keys,
|
||||
},
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(index_part_object_key) = index_part_object.as_ref() {
|
||||
let index_part_bytes =
|
||||
download_object_with_retries(remote_client, &index_part_object_key.key)
|
||||
.await
|
||||
.context("index_part.json download")?;
|
||||
match download_object_with_retries(remote_client, &index_part_object_key.key).await {
|
||||
Ok(index_part_bytes) => index_part_bytes,
|
||||
Err(e) => {
|
||||
// It is possible that the branch gets deleted in-between we list the objects
|
||||
// and we download the index part file.
|
||||
errors.push(format!("failed to download index_part.json: {e}"));
|
||||
return Ok(ListTimelineBlobsResult::MissingIndexPart(
|
||||
RemoteTimelineBlobData {
|
||||
blob_data: BlobDataParseResult::Incorrect { errors, s3_layers },
|
||||
unused_index_keys: index_part_keys,
|
||||
unknown_keys,
|
||||
},
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
match serde_json::from_slice(&index_part_bytes) {
|
||||
Ok(index_part) => {
|
||||
return Ok(RemoteTimelineBlobData {
|
||||
return Ok(ListTimelineBlobsResult::Ready(RemoteTimelineBlobData {
|
||||
blob_data: BlobDataParseResult::Parsed {
|
||||
index_part: Box::new(index_part),
|
||||
index_part_generation,
|
||||
@@ -444,7 +507,7 @@ pub(crate) async fn list_timeline_blobs(
|
||||
},
|
||||
unused_index_keys: index_part_keys,
|
||||
unknown_keys,
|
||||
})
|
||||
}))
|
||||
}
|
||||
Err(index_parse_error) => errors.push(format!(
|
||||
"index_part.json body parsing error: {index_parse_error}"
|
||||
@@ -458,9 +521,9 @@ pub(crate) async fn list_timeline_blobs(
|
||||
);
|
||||
}
|
||||
|
||||
Ok(RemoteTimelineBlobData {
|
||||
Ok(ListTimelineBlobsResult::Ready(RemoteTimelineBlobData {
|
||||
blob_data: BlobDataParseResult::Incorrect { errors, s3_layers },
|
||||
unused_index_keys: index_part_keys,
|
||||
unknown_keys,
|
||||
})
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -41,6 +41,10 @@ struct Cli {
|
||||
#[arg(long)]
|
||||
/// JWT token for authenticating with storage controller. Requires scope 'scrubber' or 'admin'.
|
||||
controller_jwt: Option<String>,
|
||||
|
||||
/// If set to true, the scrubber will exit with error code on fatal error.
|
||||
#[arg(long, default_value_t = false)]
|
||||
exit_code: bool,
|
||||
}
|
||||
|
||||
#[derive(Subcommand, Debug)]
|
||||
@@ -203,6 +207,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
tenant_ids,
|
||||
json,
|
||||
post_to_storcon,
|
||||
cli.exit_code,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -269,6 +274,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
gc_min_age,
|
||||
gc_mode,
|
||||
post_to_storcon,
|
||||
cli.exit_code,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -284,6 +290,7 @@ pub async fn run_cron_job(
|
||||
gc_min_age: humantime::Duration,
|
||||
gc_mode: GcMode,
|
||||
post_to_storcon: bool,
|
||||
exit_code: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
tracing::info!(%gc_min_age, %gc_mode, "Running pageserver-physical-gc");
|
||||
pageserver_physical_gc_cmd(
|
||||
@@ -301,6 +308,7 @@ pub async fn run_cron_job(
|
||||
Vec::new(),
|
||||
true,
|
||||
post_to_storcon,
|
||||
exit_code,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -349,6 +357,7 @@ pub async fn scan_pageserver_metadata_cmd(
|
||||
tenant_shard_ids: Vec<TenantShardId>,
|
||||
json: bool,
|
||||
post_to_storcon: bool,
|
||||
exit_code: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
if controller_client.is_none() && post_to_storcon {
|
||||
return Err(anyhow!("Posting pageserver scan health status to storage controller requires `--controller-api` and `--controller-jwt` to run"));
|
||||
@@ -380,6 +389,9 @@ pub async fn scan_pageserver_metadata_cmd(
|
||||
|
||||
if summary.is_fatal() {
|
||||
tracing::error!("Fatal scrub errors detected");
|
||||
if exit_code {
|
||||
std::process::exit(1);
|
||||
}
|
||||
} else if summary.is_empty() {
|
||||
// Strictly speaking an empty bucket is a valid bucket, but if someone ran the
|
||||
// scrubber they were likely expecting to scan something, and if we see no timelines
|
||||
@@ -391,6 +403,9 @@ pub async fn scan_pageserver_metadata_cmd(
|
||||
.prefix_in_bucket
|
||||
.unwrap_or("<none>".to_string())
|
||||
);
|
||||
if exit_code {
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -12,6 +12,7 @@ use pageserver_api::controller_api::MetadataHealthUpdateRequest;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use serde::Serialize;
|
||||
use tracing::{info_span, Instrument};
|
||||
use utils::id::TenantId;
|
||||
use utils::shard::ShardCount;
|
||||
|
||||
@@ -169,45 +170,54 @@ pub async fn scan_pageserver_metadata(
|
||||
let mut timeline_ids = HashSet::new();
|
||||
let mut timeline_generations = HashMap::new();
|
||||
for (ttid, data) in timelines {
|
||||
if ttid.tenant_shard_id.shard_count == highest_shard_count {
|
||||
// Only analyze `TenantShardId`s with highest shard count.
|
||||
async {
|
||||
if ttid.tenant_shard_id.shard_count == highest_shard_count {
|
||||
// Only analyze `TenantShardId`s with highest shard count.
|
||||
|
||||
// Stash the generation of each timeline, for later use identifying orphan layers
|
||||
if let BlobDataParseResult::Parsed {
|
||||
index_part,
|
||||
index_part_generation,
|
||||
s3_layers: _s3_layers,
|
||||
} = &data.blob_data
|
||||
{
|
||||
if index_part.deleted_at.is_some() {
|
||||
// skip deleted timeline.
|
||||
tracing::info!("Skip analysis of {} b/c timeline is already deleted", ttid);
|
||||
continue;
|
||||
// Stash the generation of each timeline, for later use identifying orphan layers
|
||||
if let BlobDataParseResult::Parsed {
|
||||
index_part,
|
||||
index_part_generation,
|
||||
s3_layers: _s3_layers,
|
||||
} = &data.blob_data
|
||||
{
|
||||
if index_part.deleted_at.is_some() {
|
||||
// skip deleted timeline.
|
||||
tracing::info!(
|
||||
"Skip analysis of {} b/c timeline is already deleted",
|
||||
ttid
|
||||
);
|
||||
return;
|
||||
}
|
||||
timeline_generations.insert(ttid, *index_part_generation);
|
||||
}
|
||||
timeline_generations.insert(ttid, *index_part_generation);
|
||||
|
||||
// Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
|
||||
// reference counts for layers across the tenant.
|
||||
let analysis = branch_cleanup_and_check_errors(
|
||||
remote_client,
|
||||
&ttid,
|
||||
&mut tenant_objects,
|
||||
None,
|
||||
None,
|
||||
Some(data),
|
||||
)
|
||||
.await;
|
||||
summary.update_analysis(&ttid, &analysis);
|
||||
|
||||
timeline_ids.insert(ttid.timeline_id);
|
||||
} else {
|
||||
tracing::info!(
|
||||
"Skip analysis of {} b/c a lower shard count than {}",
|
||||
ttid,
|
||||
highest_shard_count.0,
|
||||
);
|
||||
}
|
||||
|
||||
// Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
|
||||
// reference counts for layers across the tenant.
|
||||
let analysis = branch_cleanup_and_check_errors(
|
||||
remote_client,
|
||||
&ttid,
|
||||
&mut tenant_objects,
|
||||
None,
|
||||
None,
|
||||
Some(data),
|
||||
)
|
||||
.await;
|
||||
summary.update_analysis(&ttid, &analysis);
|
||||
|
||||
timeline_ids.insert(ttid.timeline_id);
|
||||
} else {
|
||||
tracing::info!(
|
||||
"Skip analysis of {} b/c a lower shard count than {}",
|
||||
ttid,
|
||||
highest_shard_count.0,
|
||||
);
|
||||
}
|
||||
.instrument(
|
||||
info_span!("analyze-timeline", shard = %ttid.tenant_shard_id.shard_slug(), timeline = %ttid.timeline_id),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
summary.timeline_count += timeline_ids.len();
|
||||
@@ -278,6 +288,7 @@ pub async fn scan_pageserver_metadata(
|
||||
timelines,
|
||||
highest_shard_count,
|
||||
)
|
||||
.instrument(info_span!("analyze-tenant", tenant = %prev_tenant_id))
|
||||
.await;
|
||||
tenant_id = Some(ttid.tenant_shard_id.tenant_id);
|
||||
highest_shard_count = ttid.tenant_shard_id.shard_count;
|
||||
@@ -306,15 +317,18 @@ pub async fn scan_pageserver_metadata(
|
||||
tenant_timeline_results.push((ttid, data));
|
||||
}
|
||||
|
||||
let tenant_id = tenant_id.expect("Must be set if results are present");
|
||||
|
||||
if !tenant_timeline_results.is_empty() {
|
||||
analyze_tenant(
|
||||
&remote_client,
|
||||
tenant_id.expect("Must be set if results are present"),
|
||||
tenant_id,
|
||||
&mut summary,
|
||||
tenant_objects,
|
||||
tenant_timeline_results,
|
||||
highest_shard_count,
|
||||
)
|
||||
.instrument(info_span!("analyze-tenant", tenant = %tenant_id))
|
||||
.await;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user