mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-22 21:59:59 +00:00
Do tenant manifest validation with index-part (#10007)
This adds some validation of invariants that we want to uphold with respect to the tenant manifest and `index_part.json`:
* The data the manifest has about a timeline must match the data in `index_part.json`. That data might actually change, e.g. when we do reparenting during detach ancestor, but that requires the timeline to be unoffloaded, i.e. removed from the manifest.
* Any timeline mentioned in the manifest must, if its `index_part.json` is present, be archived there. If we unarchive, we first update the tenant manifest to unoffload, and only then update the index part. And one needs to archive before offloading.
* It is legal for timelines to be mentioned in the manifest but have no `index_part.json`: this is a temporary state visible during deletion of the timeline. If the pageserver crashed, an attach of the tenant will clean the state up.
* It is also legal for offloaded timelines to have an `ancestor_retain_lsn` of None while having an `ancestor_timeline_id`. This is for the to-be-added flattening functionality: the plan is to set the former (`ancestor_retain_lsn`) to None once we have flattened a timeline.

Follow-up of #9942. Part of #8088.
This commit is contained in:
@@ -533,8 +533,9 @@ async fn list_timeline_blobs_impl(
|
||||
}
|
||||
|
||||
pub(crate) struct RemoteTenantManifestInfo {
|
||||
pub(crate) latest_generation: Option<Generation>,
|
||||
pub(crate) manifests: Vec<(Generation, ListingObject)>,
|
||||
pub(crate) generation: Generation,
|
||||
pub(crate) manifest: TenantManifest,
|
||||
pub(crate) listing_object: ListingObject,
|
||||
}
|
||||
|
||||
pub(crate) enum ListTenantManifestResult {
|
||||
@@ -543,7 +544,10 @@ pub(crate) enum ListTenantManifestResult {
|
||||
#[allow(dead_code)]
|
||||
unknown_keys: Vec<ListingObject>,
|
||||
},
|
||||
NoErrors(RemoteTenantManifestInfo),
|
||||
NoErrors {
|
||||
latest_generation: Option<RemoteTenantManifestInfo>,
|
||||
manifests: Vec<(Generation, ListingObject)>,
|
||||
},
|
||||
}
|
||||
|
||||
/// Lists the tenant manifests in remote storage and parses the latest one, returning a [`ListTenantManifestResult`] object.
|
||||
@@ -592,14 +596,6 @@ pub(crate) async fn list_tenant_manifests(
|
||||
unknown_keys.push(obj);
|
||||
}
|
||||
|
||||
if manifests.is_empty() {
|
||||
tracing::debug!("No manifest for timeline.");
|
||||
|
||||
return Ok(ListTenantManifestResult::WithErrors {
|
||||
errors,
|
||||
unknown_keys,
|
||||
});
|
||||
}
|
||||
if !unknown_keys.is_empty() {
|
||||
errors.push(((*prefix_str).to_owned(), "unknown keys listed".to_string()));
|
||||
|
||||
@@ -609,6 +605,15 @@ pub(crate) async fn list_tenant_manifests(
|
||||
});
|
||||
}
|
||||
|
||||
if manifests.is_empty() {
|
||||
tracing::debug!("No manifest for timeline.");
|
||||
|
||||
return Ok(ListTenantManifestResult::NoErrors {
|
||||
latest_generation: None,
|
||||
manifests,
|
||||
});
|
||||
}
|
||||
|
||||
// Find the manifest with the highest generation
|
||||
let (latest_generation, latest_listing_object) = manifests
|
||||
.iter()
|
||||
@@ -616,6 +621,8 @@ pub(crate) async fn list_tenant_manifests(
|
||||
.map(|(g, obj)| (*g, obj.clone()))
|
||||
.unwrap();
|
||||
|
||||
manifests.retain(|(gen, _obj)| gen != &latest_generation);
|
||||
|
||||
let manifest_bytes =
|
||||
match download_object_with_retries(remote_client, &latest_listing_object.key).await {
|
||||
Ok(bytes) => bytes,
|
||||
@@ -634,13 +641,15 @@ pub(crate) async fn list_tenant_manifests(
|
||||
};
|
||||
|
||||
match TenantManifest::from_json_bytes(&manifest_bytes) {
|
||||
Ok(_manifest) => {
|
||||
return Ok(ListTenantManifestResult::NoErrors(
|
||||
RemoteTenantManifestInfo {
|
||||
latest_generation: Some(latest_generation),
|
||||
manifests,
|
||||
},
|
||||
));
|
||||
Ok(manifest) => {
|
||||
return Ok(ListTenantManifestResult::NoErrors {
|
||||
latest_generation: Some(RemoteTenantManifestInfo {
|
||||
generation: latest_generation,
|
||||
manifest,
|
||||
listing_object: latest_listing_object,
|
||||
}),
|
||||
manifests,
|
||||
});
|
||||
}
|
||||
Err(parse_error) => errors.push((
|
||||
latest_listing_object.key.get_path().as_str().to_owned(),
|
||||
|
||||
@@ -4,11 +4,13 @@ use std::time::Duration;
|
||||
|
||||
use crate::checks::{
|
||||
list_tenant_manifests, list_timeline_blobs, BlobDataParseResult, ListTenantManifestResult,
|
||||
RemoteTenantManifestInfo,
|
||||
};
|
||||
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
|
||||
use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId, MAX_RETRIES};
|
||||
use futures_util::{StreamExt, TryStreamExt};
|
||||
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use pageserver::tenant::remote_timeline_client::manifest::OffloadedTimelineManifest;
|
||||
use pageserver::tenant::remote_timeline_client::{
|
||||
parse_remote_index_path, parse_remote_tenant_manifest_path, remote_layer_path,
|
||||
};
|
||||
@@ -527,7 +529,7 @@ async fn gc_tenant_manifests(
|
||||
target: &RootTarget,
|
||||
mode: GcMode,
|
||||
tenant_shard_id: TenantShardId,
|
||||
) -> anyhow::Result<GcSummary> {
|
||||
) -> anyhow::Result<(GcSummary, Option<RemoteTenantManifestInfo>)> {
|
||||
let mut gc_summary = GcSummary::default();
|
||||
match list_tenant_manifests(remote_client, tenant_shard_id, target).await? {
|
||||
ListTenantManifestResult::WithErrors {
|
||||
@@ -537,33 +539,35 @@ async fn gc_tenant_manifests(
|
||||
for (_key, error) in errors {
|
||||
tracing::warn!(%tenant_shard_id, "list_tenant_manifests: {error}");
|
||||
}
|
||||
Ok((gc_summary, None))
|
||||
}
|
||||
ListTenantManifestResult::NoErrors(mut manifest_info) => {
|
||||
let Some(latest_gen) = manifest_info.latest_generation else {
|
||||
return Ok(gc_summary);
|
||||
ListTenantManifestResult::NoErrors {
|
||||
latest_generation,
|
||||
mut manifests,
|
||||
} => {
|
||||
let Some(latest_generation) = latest_generation else {
|
||||
return Ok((gc_summary, None));
|
||||
};
|
||||
manifest_info
|
||||
.manifests
|
||||
.sort_by_key(|(generation, _obj)| *generation);
|
||||
manifests.sort_by_key(|(generation, _obj)| *generation);
|
||||
// skip the two latest generations (they don't neccessarily have to be 1 apart from each other)
|
||||
let candidates = manifest_info.manifests.iter().rev().skip(2);
|
||||
let candidates = manifests.iter().rev().skip(2);
|
||||
for (_generation, key) in candidates {
|
||||
maybe_delete_tenant_manifest(
|
||||
remote_client,
|
||||
&min_age,
|
||||
latest_gen,
|
||||
latest_generation.generation,
|
||||
key,
|
||||
mode,
|
||||
&mut gc_summary,
|
||||
)
|
||||
.instrument(
|
||||
info_span!("maybe_delete_tenant_manifest", %tenant_shard_id, ?latest_gen, %key.key),
|
||||
info_span!("maybe_delete_tenant_manifest", %tenant_shard_id, ?latest_generation.generation, %key.key),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Ok((gc_summary, Some(latest_generation)))
|
||||
}
|
||||
}
|
||||
Ok(gc_summary)
|
||||
}
|
||||
|
||||
async fn gc_timeline(
|
||||
@@ -573,6 +577,7 @@ async fn gc_timeline(
|
||||
mode: GcMode,
|
||||
ttid: TenantShardTimelineId,
|
||||
accumulator: &Arc<std::sync::Mutex<TenantRefAccumulator>>,
|
||||
tenant_manifest_info: Arc<Option<RemoteTenantManifestInfo>>,
|
||||
) -> anyhow::Result<GcSummary> {
|
||||
let mut summary = GcSummary::default();
|
||||
let data = list_timeline_blobs(remote_client, ttid, target).await?;
|
||||
@@ -597,6 +602,60 @@ async fn gc_timeline(
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(tenant_manifest_info) = &*tenant_manifest_info {
|
||||
// TODO: this is O(n^2) in the number of offloaded timelines. Do a hashmap lookup instead.
|
||||
let maybe_offloaded = tenant_manifest_info
|
||||
.manifest
|
||||
.offloaded_timelines
|
||||
.iter()
|
||||
.find(|offloaded_timeline| offloaded_timeline.timeline_id == ttid.timeline_id);
|
||||
if let Some(offloaded) = maybe_offloaded {
|
||||
let warnings = validate_index_part_with_offloaded(index_part, offloaded);
|
||||
let warn = if warnings.is_empty() {
|
||||
false
|
||||
} else {
|
||||
// Verify that the manifest hasn't changed. If it has, a potential racing change could have been cause for our troubles.
|
||||
match list_tenant_manifests(remote_client, ttid.tenant_shard_id, target).await? {
|
||||
ListTenantManifestResult::WithErrors {
|
||||
errors,
|
||||
unknown_keys: _,
|
||||
} => {
|
||||
for (_key, error) in errors {
|
||||
tracing::warn!(%ttid, "list_tenant_manifests in gc_timeline: {error}");
|
||||
}
|
||||
true
|
||||
}
|
||||
ListTenantManifestResult::NoErrors {
|
||||
latest_generation,
|
||||
manifests: _,
|
||||
} => {
|
||||
if let Some(new_latest_gen) = latest_generation {
|
||||
let manifest_changed = (
|
||||
new_latest_gen.generation,
|
||||
new_latest_gen.listing_object.last_modified,
|
||||
) == (
|
||||
tenant_manifest_info.generation,
|
||||
tenant_manifest_info.listing_object.last_modified,
|
||||
);
|
||||
if manifest_changed {
|
||||
tracing::debug!(%ttid, "tenant manifest changed since it was loaded, suppressing {} warnings", warnings.len());
|
||||
}
|
||||
manifest_changed
|
||||
} else {
|
||||
// The latest generation is gone. This timeline is in the progress of being deleted?
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
if warn {
|
||||
for warning in warnings {
|
||||
tracing::warn!(%ttid, "{}", warning);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
accumulator.lock().unwrap().update(ttid, index_part);
|
||||
|
||||
for key in candidates {
|
||||
@@ -608,6 +667,35 @@ async fn gc_timeline(
|
||||
Ok(summary)
|
||||
}
|
||||
|
||||
fn validate_index_part_with_offloaded(
|
||||
index_part: &IndexPart,
|
||||
offloaded: &OffloadedTimelineManifest,
|
||||
) -> Vec<String> {
|
||||
let mut warnings = Vec::new();
|
||||
if let Some(archived_at_index_part) = index_part.archived_at {
|
||||
if archived_at_index_part
|
||||
.signed_duration_since(offloaded.archived_at)
|
||||
.num_seconds()
|
||||
!= 0
|
||||
{
|
||||
warnings.push(format!(
|
||||
"index-part archived_at={} differs from manifest archived_at={}",
|
||||
archived_at_index_part, offloaded.archived_at
|
||||
));
|
||||
}
|
||||
} else {
|
||||
warnings.push("Timeline offloaded in manifest but not archived in index-part".to_string());
|
||||
}
|
||||
if index_part.metadata.ancestor_timeline() != offloaded.ancestor_timeline_id {
|
||||
warnings.push(format!(
|
||||
"index-part anestor={:?} differs from manifest ancestor={:?}",
|
||||
index_part.metadata.ancestor_timeline(),
|
||||
offloaded.ancestor_timeline_id
|
||||
));
|
||||
}
|
||||
warnings
|
||||
}
|
||||
|
||||
/// Physical garbage collection: removing unused S3 objects.
|
||||
///
|
||||
/// This is distinct from the garbage collection done inside the pageserver, which operates at a higher level
|
||||
@@ -650,29 +738,38 @@ pub async fn pageserver_physical_gc(
|
||||
let target_ref = ⌖
|
||||
let remote_client_ref = &remote_client;
|
||||
async move {
|
||||
let summaries_from_manifests = match gc_tenant_manifests(
|
||||
let gc_manifest_result = gc_tenant_manifests(
|
||||
remote_client_ref,
|
||||
min_age,
|
||||
target_ref,
|
||||
mode,
|
||||
tenant_shard_id,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(gc_summary) => vec![Ok(GcSummaryOrContent::<TenantShardTimelineId>::GcSummary(
|
||||
gc_summary,
|
||||
))],
|
||||
.await;
|
||||
let (summary_from_manifest, tenant_manifest_opt) = match gc_manifest_result {
|
||||
Ok((gc_summary, tenant_manifest)) => (gc_summary, tenant_manifest),
|
||||
Err(e) => {
|
||||
tracing::warn!(%tenant_shard_id, "Error in gc_tenant_manifests: {e}");
|
||||
Vec::new()
|
||||
(GcSummary::default(), None)
|
||||
}
|
||||
};
|
||||
let tenant_manifest_arc = Arc::new(tenant_manifest_opt);
|
||||
let summary_from_manifest = Ok(GcSummaryOrContent::<(_, _)>::GcSummary(
|
||||
summary_from_manifest,
|
||||
));
|
||||
stream_tenant_timelines(remote_client_ref, target_ref, tenant_shard_id)
|
||||
.await
|
||||
.map(|stream| {
|
||||
stream
|
||||
.map_ok(GcSummaryOrContent::Content)
|
||||
.chain(futures::stream::iter(summaries_from_manifests.into_iter()))
|
||||
.zip(futures::stream::iter(std::iter::repeat(
|
||||
tenant_manifest_arc,
|
||||
)))
|
||||
.map(|(ttid_res, tenant_manifest_arc)| {
|
||||
ttid_res.map(move |ttid| {
|
||||
GcSummaryOrContent::Content((ttid, tenant_manifest_arc))
|
||||
})
|
||||
})
|
||||
.chain(futures::stream::iter([summary_from_manifest].into_iter()))
|
||||
})
|
||||
}
|
||||
});
|
||||
@@ -684,14 +781,17 @@ pub async fn pageserver_physical_gc(
|
||||
// Drain futures for per-shard GC, populating accumulator as a side effect
|
||||
{
|
||||
let timelines = timelines.map_ok(|summary_or_ttid| match summary_or_ttid {
|
||||
GcSummaryOrContent::Content(ttid) => futures::future::Either::Left(gc_timeline(
|
||||
&remote_client,
|
||||
&min_age,
|
||||
&target,
|
||||
mode,
|
||||
ttid,
|
||||
&accumulator,
|
||||
)),
|
||||
GcSummaryOrContent::Content((ttid, tenant_manifest_arc)) => {
|
||||
futures::future::Either::Left(gc_timeline(
|
||||
&remote_client,
|
||||
&min_age,
|
||||
&target,
|
||||
mode,
|
||||
ttid,
|
||||
&accumulator,
|
||||
tenant_manifest_arc,
|
||||
))
|
||||
}
|
||||
GcSummaryOrContent::GcSummary(gc_summary) => {
|
||||
futures::future::Either::Right(futures::future::ok(gc_summary))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user