diff --git a/storage_scrubber/src/checks.rs b/storage_scrubber/src/checks.rs index 1b4ff01a17..f759f54d19 100644 --- a/storage_scrubber/src/checks.rs +++ b/storage_scrubber/src/checks.rs @@ -533,8 +533,9 @@ async fn list_timeline_blobs_impl( } pub(crate) struct RemoteTenantManifestInfo { - pub(crate) latest_generation: Option, - pub(crate) manifests: Vec<(Generation, ListingObject)>, + pub(crate) generation: Generation, + pub(crate) manifest: TenantManifest, + pub(crate) listing_object: ListingObject, } pub(crate) enum ListTenantManifestResult { @@ -543,7 +544,10 @@ pub(crate) enum ListTenantManifestResult { #[allow(dead_code)] unknown_keys: Vec, }, - NoErrors(RemoteTenantManifestInfo), + NoErrors { + latest_generation: Option, + manifests: Vec<(Generation, ListingObject)>, + }, } /// Lists the tenant manifests in remote storage and parses the latest one, returning a [`ListTenantManifestResult`] object. @@ -592,14 +596,6 @@ pub(crate) async fn list_tenant_manifests( unknown_keys.push(obj); } - if manifests.is_empty() { - tracing::debug!("No manifest for timeline."); - - return Ok(ListTenantManifestResult::WithErrors { - errors, - unknown_keys, - }); - } if !unknown_keys.is_empty() { errors.push(((*prefix_str).to_owned(), "unknown keys listed".to_string())); @@ -609,6 +605,15 @@ pub(crate) async fn list_tenant_manifests( }); } + if manifests.is_empty() { + tracing::debug!("No manifest for timeline."); + + return Ok(ListTenantManifestResult::NoErrors { + latest_generation: None, + manifests, + }); + } + // Find the manifest with the highest generation let (latest_generation, latest_listing_object) = manifests .iter() @@ -616,6 +621,8 @@ pub(crate) async fn list_tenant_manifests( .map(|(g, obj)| (*g, obj.clone())) .unwrap(); + manifests.retain(|(gen, _obj)| gen != &latest_generation); + let manifest_bytes = match download_object_with_retries(remote_client, &latest_listing_object.key).await { Ok(bytes) => bytes, @@ -634,13 +641,15 @@ pub(crate) async fn 
list_tenant_manifests( }; match TenantManifest::from_json_bytes(&manifest_bytes) { - Ok(_manifest) => { - return Ok(ListTenantManifestResult::NoErrors( - RemoteTenantManifestInfo { - latest_generation: Some(latest_generation), - manifests, - }, - )); + Ok(manifest) => { + return Ok(ListTenantManifestResult::NoErrors { + latest_generation: Some(RemoteTenantManifestInfo { + generation: latest_generation, + manifest, + listing_object: latest_listing_object, + }), + manifests, + }); } Err(parse_error) => errors.push(( latest_listing_object.key.get_path().as_str().to_owned(), diff --git a/storage_scrubber/src/pageserver_physical_gc.rs b/storage_scrubber/src/pageserver_physical_gc.rs index 20cb9c3633..d19b8a5f91 100644 --- a/storage_scrubber/src/pageserver_physical_gc.rs +++ b/storage_scrubber/src/pageserver_physical_gc.rs @@ -4,11 +4,13 @@ use std::time::Duration; use crate::checks::{ list_tenant_manifests, list_timeline_blobs, BlobDataParseResult, ListTenantManifestResult, + RemoteTenantManifestInfo, }; use crate::metadata_stream::{stream_tenant_timelines, stream_tenants}; use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId, MAX_RETRIES}; use futures_util::{StreamExt, TryStreamExt}; use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata; +use pageserver::tenant::remote_timeline_client::manifest::OffloadedTimelineManifest; use pageserver::tenant::remote_timeline_client::{ parse_remote_index_path, parse_remote_tenant_manifest_path, remote_layer_path, }; @@ -527,7 +529,7 @@ async fn gc_tenant_manifests( target: &RootTarget, mode: GcMode, tenant_shard_id: TenantShardId, -) -> anyhow::Result { +) -> anyhow::Result<(GcSummary, Option)> { let mut gc_summary = GcSummary::default(); match list_tenant_manifests(remote_client, tenant_shard_id, target).await? 
{ ListTenantManifestResult::WithErrors { @@ -537,33 +539,35 @@ async fn gc_tenant_manifests( for (_key, error) in errors { tracing::warn!(%tenant_shard_id, "list_tenant_manifests: {error}"); } + Ok((gc_summary, None)) } - ListTenantManifestResult::NoErrors(mut manifest_info) => { - let Some(latest_gen) = manifest_info.latest_generation else { - return Ok(gc_summary); + ListTenantManifestResult::NoErrors { + latest_generation, + mut manifests, + } => { + let Some(latest_generation) = latest_generation else { + return Ok((gc_summary, None)); }; - manifest_info - .manifests - .sort_by_key(|(generation, _obj)| *generation); + manifests.sort_by_key(|(generation, _obj)| *generation); // skip the two latest generations (they don't neccessarily have to be 1 apart from each other) - let candidates = manifest_info.manifests.iter().rev().skip(2); + let candidates = manifests.iter().rev().skip(2); for (_generation, key) in candidates { maybe_delete_tenant_manifest( remote_client, &min_age, - latest_gen, + latest_generation.generation, key, mode, &mut gc_summary, ) .instrument( - info_span!("maybe_delete_tenant_manifest", %tenant_shard_id, ?latest_gen, %key.key), + info_span!("maybe_delete_tenant_manifest", %tenant_shard_id, ?latest_generation.generation, %key.key), ) .await; } + Ok((gc_summary, Some(latest_generation))) } } - Ok(gc_summary) } async fn gc_timeline( @@ -573,6 +577,7 @@ async fn gc_timeline( mode: GcMode, ttid: TenantShardTimelineId, accumulator: &Arc>, + tenant_manifest_info: Arc<Option<RemoteTenantManifestInfo>>, ) -> anyhow::Result { let mut summary = GcSummary::default(); let data = list_timeline_blobs(remote_client, ttid, target).await?; @@ -597,6 +602,60 @@ async fn gc_timeline( } }; + if let Some(tenant_manifest_info) = &*tenant_manifest_info { + // TODO: this is O(n^2) in the number of offloaded timelines. Do a hashmap lookup instead. 
+ let maybe_offloaded = tenant_manifest_info + .manifest + .offloaded_timelines + .iter() + .find(|offloaded_timeline| offloaded_timeline.timeline_id == ttid.timeline_id); + if let Some(offloaded) = maybe_offloaded { + let warnings = validate_index_part_with_offloaded(index_part, offloaded); + let warn = if warnings.is_empty() { + false + } else { + // Verify that the manifest hasn't changed. If it has, a potential racing change could have been the cause of our troubles. + match list_tenant_manifests(remote_client, ttid.tenant_shard_id, target).await? { + ListTenantManifestResult::WithErrors { + errors, + unknown_keys: _, + } => { + for (_key, error) in errors { + tracing::warn!(%ttid, "list_tenant_manifests in gc_timeline: {error}"); + } + true + } + ListTenantManifestResult::NoErrors { + latest_generation, + manifests: _, + } => { + if let Some(new_latest_gen) = latest_generation { + let manifest_unchanged = ( + new_latest_gen.generation, + new_latest_gen.listing_object.last_modified, + ) == ( + tenant_manifest_info.generation, + tenant_manifest_info.listing_object.last_modified, + ); + if !manifest_unchanged { + tracing::debug!(%ttid, "tenant manifest changed since it was loaded, suppressing {} warnings", warnings.len()); + } + manifest_unchanged + } else { + // The latest generation is gone. This timeline is in the process of being deleted? 
+ false + } + } + } + }; + if warn { + for warning in warnings { + tracing::warn!(%ttid, "{}", warning); + } + } + } + } + accumulator.lock().unwrap().update(ttid, index_part); for key in candidates { @@ -608,6 +667,35 @@ async fn gc_timeline( Ok(summary) } +fn validate_index_part_with_offloaded( + index_part: &IndexPart, + offloaded: &OffloadedTimelineManifest, +) -> Vec<String> { + let mut warnings = Vec::new(); + if let Some(archived_at_index_part) = index_part.archived_at { + if archived_at_index_part + .signed_duration_since(offloaded.archived_at) + .num_seconds() + != 0 + { + warnings.push(format!( + "index-part archived_at={} differs from manifest archived_at={}", + archived_at_index_part, offloaded.archived_at + )); + } + } else { + warnings.push("Timeline offloaded in manifest but not archived in index-part".to_string()); + } + if index_part.metadata.ancestor_timeline() != offloaded.ancestor_timeline_id { + warnings.push(format!( + "index-part ancestor={:?} differs from manifest ancestor={:?}", + index_part.metadata.ancestor_timeline(), + offloaded.ancestor_timeline_id + )); + } + warnings +} + /// Physical garbage collection: removing unused S3 objects. 
/// /// This is distinct from the garbage collection done inside the pageserver, which operates at a higher level @@ -650,29 +738,38 @@ pub async fn pageserver_physical_gc( let target_ref = ⌖ let remote_client_ref = &remote_client; async move { - let summaries_from_manifests = match gc_tenant_manifests( + let gc_manifest_result = gc_tenant_manifests( remote_client_ref, min_age, target_ref, mode, tenant_shard_id, ) - .await - { - Ok(gc_summary) => vec![Ok(GcSummaryOrContent::::GcSummary( - gc_summary, - ))], + .await; + let (summary_from_manifest, tenant_manifest_opt) = match gc_manifest_result { + Ok((gc_summary, tenant_manifest)) => (gc_summary, tenant_manifest), Err(e) => { tracing::warn!(%tenant_shard_id, "Error in gc_tenant_manifests: {e}"); - Vec::new() + (GcSummary::default(), None) } }; + let tenant_manifest_arc = Arc::new(tenant_manifest_opt); + let summary_from_manifest = Ok(GcSummaryOrContent::<(_, _)>::GcSummary( + summary_from_manifest, + )); stream_tenant_timelines(remote_client_ref, target_ref, tenant_shard_id) .await .map(|stream| { stream - .map_ok(GcSummaryOrContent::Content) - .chain(futures::stream::iter(summaries_from_manifests.into_iter())) + .zip(futures::stream::iter(std::iter::repeat( + tenant_manifest_arc, + ))) + .map(|(ttid_res, tenant_manifest_arc)| { + ttid_res.map(move |ttid| { + GcSummaryOrContent::Content((ttid, tenant_manifest_arc)) + }) + }) + .chain(futures::stream::iter([summary_from_manifest].into_iter())) }) } }); @@ -684,14 +781,17 @@ pub async fn pageserver_physical_gc( // Drain futures for per-shard GC, populating accumulator as a side effect { let timelines = timelines.map_ok(|summary_or_ttid| match summary_or_ttid { - GcSummaryOrContent::Content(ttid) => futures::future::Either::Left(gc_timeline( - &remote_client, - &min_age, - &target, - mode, - ttid, - &accumulator, - )), + GcSummaryOrContent::Content((ttid, tenant_manifest_arc)) => { + futures::future::Either::Left(gc_timeline( + &remote_client, + &min_age, + &target, + 
mode, + ttid, + &accumulator, + tenant_manifest_arc, + )) + } GcSummaryOrContent::GcSummary(gc_summary) => { futures::future::Either::Right(futures::future::ok(gc_summary)) }