scrubber: retry when missing index key in the listing (#8873)

Part of #8128, fixes #8872.

## Problem

See #8872.

## Summary of changes

- Retry `list_timeline_blobs` one more time (see the sketches after this list) if:
  - layer file keys are listed but no index is present, or
  - the index download fails.
- Instrument the code with `analyze-tenant` and `analyze-timeline` spans.
- Remove the `initdb_archive` check; the archive may already have been deleted.
- Exit with code 1 on fatal errors if the `--exit-code` flag is set.
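
To make the retry behavior concrete, here is a minimal, self-contained sketch of the retry-once pattern (not the actual scrubber code): `BlobData`, `ListResult`, `list_blobs_once`, and `list_blobs_with_retry` are hypothetical stand-ins for `RemoteTimelineBlobData`, `ListTimelineBlobsResult`, `list_timeline_blobs_impl`, and `list_timeline_blobs` in the diff below, and the error type and listing are simplified.

```rust
/// Simplified stand-in for the parsed listing of a timeline's remote objects.
struct BlobData {
    layer_keys: Vec<String>,
    index_key: Option<String>,
}

/// Result of one listing pass: either the index was found, or layer files
/// exist without it (possibly because the listing raced with a deletion).
enum ListResult {
    Ready(BlobData),
    MissingIndexPart(BlobData),
}

impl ListResult {
    /// Return the inner data regardless of whether the index was found.
    fn into_data(self) -> BlobData {
        match self {
            ListResult::Ready(data) | ListResult::MissingIndexPart(data) => data,
        }
    }
}

/// One (hypothetical) listing pass. The `index_found` flag simulates whether
/// this pass happened to see `index_part.json`.
fn list_blobs_once(index_found: bool) -> Result<ListResult, String> {
    let data = BlobData {
        layer_keys: vec!["layer-A".to_string()],
        index_key: index_found.then(|| "index_part.json".to_string()),
    };
    Ok(if index_found {
        ListResult::Ready(data)
    } else {
        ListResult::MissingIndexPart(data)
    })
}

/// Retry the listing once if layer files were seen without an index:
/// the first pass may have raced with a timeline deletion.
fn list_blobs_with_retry() -> Result<BlobData, String> {
    // First pass "misses" the index, second pass "finds" it, to exercise the retry.
    match list_blobs_once(false)? {
        ListResult::Ready(data) => Ok(data),
        // Second (and final) attempt: report whatever it returns, even if the
        // index is still missing, so the caller can record the error.
        ListResult::MissingIndexPart(_) => Ok(list_blobs_once(true)?.into_data()),
    }
}

fn main() {
    let data = list_blobs_with_retry().expect("listing failed");
    println!(
        "{} layer(s), index present: {}",
        data.layer_keys.len(),
        data.index_key.is_some()
    );
}
```

The second attempt is not retried again; whatever it returns is reported, so a genuinely missing index still surfaces as an error rather than looping.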
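
Likewise, a minimal sketch of the span instrumentation, assuming the `tracing`, `tracing-subscriber`, and `tokio` crates; `analyze_one` is a hypothetical stand-in for the per-timeline analysis future:

```rust
use tracing::{info, info_span, Instrument};

/// Hypothetical stand-in for one unit of per-timeline scrubber work.
async fn analyze_one() {
    // Any event emitted here inherits the fields of the enclosing span.
    info!("checking timeline");
}

#[tokio::main]
async fn main() {
    // Print spans and their fields alongside each event.
    tracing_subscriber::fmt().init();

    for timeline in ["aaaa1111", "bbbb2222"] {
        // Wrap the whole per-timeline future in a span so every log line inside
        // it is tagged with the timeline, mirroring the `analyze-timeline` and
        // `analyze-tenant` spans added in this change.
        analyze_one()
            .instrument(info_span!("analyze-timeline", timeline = %timeline))
            .await;
    }
}
```

Because the span already carries the identifiers, log messages inside it no longer need to interpolate them, which is why `info!("Checking timeline {id}")` becomes `info!("Checking timeline")` in the diff below.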

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
Author: Yuchen Liang
Date: 2024-09-23 17:58:12 -04:00 (committed by GitHub)
Commit: 37aa6fd953 (parent 3ad567290c)
3 changed files with 163 additions and 71 deletions


@@ -1,13 +1,12 @@
 use std::collections::{HashMap, HashSet};

-use anyhow::Context;
 use itertools::Itertools;
 use pageserver::tenant::checks::check_valid_layermap;
 use pageserver::tenant::layer_map::LayerMap;
 use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
 use pageserver_api::shard::ShardIndex;
 use tokio_util::sync::CancellationToken;
-use tracing::{error, info, warn};
+use tracing::{info, warn};
 use utils::generation::Generation;
 use utils::id::TimelineId;
@@ -29,9 +28,8 @@ pub(crate) struct TimelineAnalysis {
     /// yet.
     pub(crate) warnings: Vec<String>,

-    /// Keys not referenced in metadata: candidates for removal, but NOT NECESSARILY: beware
-    /// of races between reading the metadata and reading the objects.
-    pub(crate) garbage_keys: Vec<String>,
+    /// Objects whose keys were not recognized at all, i.e. not layer files, not indices, and not initdb archive.
+    pub(crate) unknown_keys: Vec<String>,
 }

 impl TimelineAnalysis {
@@ -39,7 +37,7 @@ impl TimelineAnalysis {
         Self {
             errors: Vec::new(),
             warnings: Vec::new(),
-            garbage_keys: Vec::new(),
+            unknown_keys: Vec::new(),
         }
     }

@@ -59,7 +57,7 @@ pub(crate) async fn branch_cleanup_and_check_errors(
 ) -> TimelineAnalysis {
     let mut result = TimelineAnalysis::new();

-    info!("Checking timeline {id}");
+    info!("Checking timeline");

     if let Some(s3_active_branch) = s3_active_branch {
         info!(
@@ -80,7 +78,7 @@
     match s3_data {
         Some(s3_data) => {
             result
-                .garbage_keys
+                .unknown_keys
                 .extend(s3_data.unknown_keys.into_iter().map(|k| k.key.to_string()));

             match s3_data.blob_data {
@@ -204,10 +202,10 @@
         warn!("Timeline metadata warnings: {0:?}", result.warnings);
     }

-    if !result.garbage_keys.is_empty() {
-        error!(
-            "The following keys should be removed from S3: {0:?}",
-            result.garbage_keys
+    if !result.unknown_keys.is_empty() {
+        warn!(
+            "The following keys are not recognized: {0:?}",
+            result.unknown_keys
         )
     }

@@ -294,10 +292,10 @@ impl TenantObjectListing {
 pub(crate) struct RemoteTimelineBlobData {
     pub(crate) blob_data: BlobDataParseResult,

-    // Index objects that were not used when loading `blob_data`, e.g. those from old generations
+    /// Index objects that were not used when loading `blob_data`, e.g. those from old generations
     pub(crate) unused_index_keys: Vec<ListingObject>,

-    // Objects whose keys were not recognized at all, i.e. not layer files, not indices
+    /// Objects whose keys were not recognized at all, i.e. not layer files, not indices
     pub(crate) unknown_keys: Vec<ListingObject>,
 }

@@ -329,11 +327,54 @@ pub(crate) fn parse_layer_object_name(name: &str) -> Result<(LayerName, Generati
     }
 }

+/// Note (<https://github.com/neondatabase/neon/issues/8872>):
+/// Since we do not gurantee the order of the listing, we could list layer keys right before
+/// pageserver `RemoteTimelineClient` deletes the layer files and then the index.
+/// In the rare case, this would give back a transient error where the index key is missing.
+///
+/// To avoid generating false positive, we try streaming the listing for a second time.
 pub(crate) async fn list_timeline_blobs(
     remote_client: &GenericRemoteStorage,
     id: TenantShardTimelineId,
     root_target: &RootTarget,
 ) -> anyhow::Result<RemoteTimelineBlobData> {
+    let res = list_timeline_blobs_impl(remote_client, id, root_target).await?;
+    match res {
+        ListTimelineBlobsResult::Ready(data) => Ok(data),
+        ListTimelineBlobsResult::MissingIndexPart(_) => {
+            // Retry if index is missing.
+            let data = list_timeline_blobs_impl(remote_client, id, root_target)
+                .await?
+                .into_data();
+            Ok(data)
+        }
+    }
+}
+
+enum ListTimelineBlobsResult {
+    /// Blob data is ready to be intepreted.
+    Ready(RemoteTimelineBlobData),
+    /// List timeline blobs has layer files but is missing [`IndexPart`].
+    MissingIndexPart(RemoteTimelineBlobData),
+}
+
+impl ListTimelineBlobsResult {
+    /// Get the inner blob data regardless the status.
+    pub fn into_data(self) -> RemoteTimelineBlobData {
+        match self {
+            ListTimelineBlobsResult::Ready(data) => data,
+            ListTimelineBlobsResult::MissingIndexPart(data) => data,
+        }
+    }
+}
+
+/// Returns [`ListTimelineBlobsResult::MissingIndexPart`] if blob data has layer files
+/// but is missing [`IndexPart`], otherwise returns [`ListTimelineBlobsResult::Ready`].
+async fn list_timeline_blobs_impl(
+    remote_client: &GenericRemoteStorage,
+    id: TenantShardTimelineId,
+    root_target: &RootTarget,
+) -> anyhow::Result<ListTimelineBlobsResult> {
     let mut s3_layers = HashSet::new();

     let mut errors = Vec::new();
@@ -375,30 +416,28 @@ pub(crate) async fn list_timeline_blobs(
                     s3_layers.insert((new_layer, gen));
                 }
                 Err(e) => {
-                    tracing::info!("Error parsing key {maybe_layer_name}");
-                    errors.push(
-                        format!("S3 list response got an object with key {key} that is not a layer name: {e}"),
-                    );
+                    tracing::info!("Error parsing {maybe_layer_name} as layer name: {e}");
                     unknown_keys.push(obj);
                 }
             },
             None => {
-                tracing::warn!("Unknown key {key}");
-                errors.push(format!("S3 list response got an object with odd key {key}"));
+                tracing::info!("S3 listed an unknown key: {key}");
                 unknown_keys.push(obj);
             }
         }
     }

-    if index_part_keys.is_empty() && s3_layers.is_empty() && initdb_archive {
-        tracing::debug!(
-            "Timeline is empty apart from initdb archive: expected post-deletion state."
-        );
-        return Ok(RemoteTimelineBlobData {
+    if index_part_keys.is_empty() && s3_layers.is_empty() {
+        tracing::debug!("Timeline is empty: expected post-deletion state.");
+        if initdb_archive {
+            tracing::info!("Timeline is post deletion but initdb archive is still present.");
+        }
+        return Ok(ListTimelineBlobsResult::Ready(RemoteTimelineBlobData {
             blob_data: BlobDataParseResult::Relic,
             unused_index_keys: index_part_keys,
-            unknown_keys: Vec::new(),
-        });
+            unknown_keys,
+        }));
     }

     // Choose the index_part with the highest generation
@@ -424,19 +463,43 @@ pub(crate) async fn list_timeline_blobs(
     match index_part_object.as_ref() {
         Some(selected) => index_part_keys.retain(|k| k != selected),
         None => {
-            errors.push("S3 list response got no index_part.json file".to_string());
+            // It is possible that the branch gets deleted after we got some layer files listed
+            // and we no longer have the index file in the listing.
+            errors.push(
+                "S3 list response got no index_part.json file but still has layer files"
+                    .to_string(),
+            );
+            return Ok(ListTimelineBlobsResult::MissingIndexPart(
+                RemoteTimelineBlobData {
+                    blob_data: BlobDataParseResult::Incorrect { errors, s3_layers },
+                    unused_index_keys: index_part_keys,
+                    unknown_keys,
+                },
+            ));
         }
     }

     if let Some(index_part_object_key) = index_part_object.as_ref() {
         let index_part_bytes =
-            download_object_with_retries(remote_client, &index_part_object_key.key)
-                .await
-                .context("index_part.json download")?;
+            match download_object_with_retries(remote_client, &index_part_object_key.key).await {
+                Ok(index_part_bytes) => index_part_bytes,
+                Err(e) => {
+                    // It is possible that the branch gets deleted in-between we list the objects
+                    // and we download the index part file.
+                    errors.push(format!("failed to download index_part.json: {e}"));
+                    return Ok(ListTimelineBlobsResult::MissingIndexPart(
+                        RemoteTimelineBlobData {
+                            blob_data: BlobDataParseResult::Incorrect { errors, s3_layers },
+                            unused_index_keys: index_part_keys,
+                            unknown_keys,
+                        },
+                    ));
+                }
+            };

         match serde_json::from_slice(&index_part_bytes) {
             Ok(index_part) => {
-                return Ok(RemoteTimelineBlobData {
+                return Ok(ListTimelineBlobsResult::Ready(RemoteTimelineBlobData {
                     blob_data: BlobDataParseResult::Parsed {
                         index_part: Box::new(index_part),
                         index_part_generation,
@@ -444,7 +507,7 @@ pub(crate) async fn list_timeline_blobs(
                     },
                     unused_index_keys: index_part_keys,
                     unknown_keys,
-                })
+                }))
             }
             Err(index_parse_error) => errors.push(format!(
                 "index_part.json body parsing error: {index_parse_error}"
@@ -458,9 +521,9 @@ pub(crate) async fn list_timeline_blobs(
         );
     }

-    Ok(RemoteTimelineBlobData {
+    Ok(ListTimelineBlobsResult::Ready(RemoteTimelineBlobData {
         blob_data: BlobDataParseResult::Incorrect { errors, s3_layers },
         unused_index_keys: index_part_keys,
         unknown_keys,
-    })
+    }))
 }


@@ -41,6 +41,10 @@ struct Cli {
     #[arg(long)]
     /// JWT token for authenticating with storage controller. Requires scope 'scrubber' or 'admin'.
    controller_jwt: Option<String>,
+
+    /// If set to true, the scrubber will exit with error code on fatal error.
+    #[arg(long, default_value_t = false)]
+    exit_code: bool,
 }

 #[derive(Subcommand, Debug)]
@@ -203,6 +207,7 @@ async fn main() -> anyhow::Result<()> {
                 tenant_ids,
                 json,
                 post_to_storcon,
+                cli.exit_code,
             )
             .await
         }
@@ -269,6 +274,7 @@ async fn main() -> anyhow::Result<()> {
                 gc_min_age,
                 gc_mode,
                 post_to_storcon,
+                cli.exit_code,
             )
             .await
         }
@@ -284,6 +290,7 @@ pub async fn run_cron_job(
     gc_min_age: humantime::Duration,
     gc_mode: GcMode,
     post_to_storcon: bool,
+    exit_code: bool,
 ) -> anyhow::Result<()> {
     tracing::info!(%gc_min_age, %gc_mode, "Running pageserver-physical-gc");
     pageserver_physical_gc_cmd(
@@ -301,6 +308,7 @@ pub async fn run_cron_job(
         Vec::new(),
         true,
         post_to_storcon,
+        exit_code,
     )
     .await?;
@@ -349,6 +357,7 @@ pub async fn scan_pageserver_metadata_cmd(
     tenant_shard_ids: Vec<TenantShardId>,
     json: bool,
     post_to_storcon: bool,
+    exit_code: bool,
 ) -> anyhow::Result<()> {
     if controller_client.is_none() && post_to_storcon {
         return Err(anyhow!("Posting pageserver scan health status to storage controller requires `--controller-api` and `--controller-jwt` to run"));
@@ -380,6 +389,9 @@ pub async fn scan_pageserver_metadata_cmd(
     if summary.is_fatal() {
         tracing::error!("Fatal scrub errors detected");
+        if exit_code {
+            std::process::exit(1);
+        }
     } else if summary.is_empty() {
         // Strictly speaking an empty bucket is a valid bucket, but if someone ran the
         // scrubber they were likely expecting to scan something, and if we see no timelines
@@ -391,6 +403,9 @@ pub async fn scan_pageserver_metadata_cmd(
                 .prefix_in_bucket
                 .unwrap_or("<none>".to_string())
         );
+        if exit_code {
+            std::process::exit(1);
+        }
     }

     Ok(())


@@ -12,6 +12,7 @@ use pageserver_api::controller_api::MetadataHealthUpdateRequest;
 use pageserver_api::shard::TenantShardId;
 use remote_storage::GenericRemoteStorage;
 use serde::Serialize;
+use tracing::{info_span, Instrument};
 use utils::id::TenantId;
 use utils::shard::ShardCount;
@@ -169,45 +170,54 @@ pub async fn scan_pageserver_metadata(
     let mut timeline_ids = HashSet::new();
     let mut timeline_generations = HashMap::new();
     for (ttid, data) in timelines {
-        if ttid.tenant_shard_id.shard_count == highest_shard_count {
-            // Only analyze `TenantShardId`s with highest shard count.
-
-            // Stash the generation of each timeline, for later use identifying orphan layers
-            if let BlobDataParseResult::Parsed {
-                index_part,
-                index_part_generation,
-                s3_layers: _s3_layers,
-            } = &data.blob_data
-            {
-                if index_part.deleted_at.is_some() {
-                    // skip deleted timeline.
-                    tracing::info!("Skip analysis of {} b/c timeline is already deleted", ttid);
-                    continue;
-                }
-                timeline_generations.insert(ttid, *index_part_generation);
-            }
-
-            // Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
-            // reference counts for layers across the tenant.
-            let analysis = branch_cleanup_and_check_errors(
-                remote_client,
-                &ttid,
-                &mut tenant_objects,
-                None,
-                None,
-                Some(data),
-            )
-            .await;
-            summary.update_analysis(&ttid, &analysis);
-
-            timeline_ids.insert(ttid.timeline_id);
-        } else {
-            tracing::info!(
-                "Skip analysis of {} b/c a lower shard count than {}",
-                ttid,
-                highest_shard_count.0,
-            );
-        }
+        async {
+            if ttid.tenant_shard_id.shard_count == highest_shard_count {
+                // Only analyze `TenantShardId`s with highest shard count.
+
+                // Stash the generation of each timeline, for later use identifying orphan layers
+                if let BlobDataParseResult::Parsed {
+                    index_part,
+                    index_part_generation,
+                    s3_layers: _s3_layers,
+                } = &data.blob_data
+                {
+                    if index_part.deleted_at.is_some() {
+                        // skip deleted timeline.
+                        tracing::info!(
+                            "Skip analysis of {} b/c timeline is already deleted",
+                            ttid
+                        );
+                        return;
+                    }
+                    timeline_generations.insert(ttid, *index_part_generation);
+                }
+
+                // Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
+                // reference counts for layers across the tenant.
+                let analysis = branch_cleanup_and_check_errors(
+                    remote_client,
+                    &ttid,
+                    &mut tenant_objects,
+                    None,
+                    None,
+                    Some(data),
+                )
+                .await;
+                summary.update_analysis(&ttid, &analysis);
+
+                timeline_ids.insert(ttid.timeline_id);
+            } else {
+                tracing::info!(
+                    "Skip analysis of {} b/c a lower shard count than {}",
+                    ttid,
+                    highest_shard_count.0,
+                );
+            }
+        }
+        .instrument(
+            info_span!("analyze-timeline", shard = %ttid.tenant_shard_id.shard_slug(), timeline = %ttid.timeline_id),
+        )
+        .await
     }

     summary.timeline_count += timeline_ids.len();
@@ -278,6 +288,7 @@ pub async fn scan_pageserver_metadata(
                 timelines,
                 highest_shard_count,
             )
+            .instrument(info_span!("analyze-tenant", tenant = %prev_tenant_id))
             .await;
             tenant_id = Some(ttid.tenant_shard_id.tenant_id);
             highest_shard_count = ttid.tenant_shard_id.shard_count;
@@ -306,15 +317,18 @@ pub async fn scan_pageserver_metadata(
         tenant_timeline_results.push((ttid, data));
     }

+    let tenant_id = tenant_id.expect("Must be set if results are present");
+
     if !tenant_timeline_results.is_empty() {
         analyze_tenant(
             &remote_client,
-            tenant_id.expect("Must be set if results are present"),
+            tenant_id,
             &mut summary,
             tenant_objects,
             tenant_timeline_results,
             highest_shard_count,
         )
+        .instrument(info_span!("analyze-tenant", tenant = %tenant_id))
         .await;
     }