mirror of https://github.com/neondatabase/neon.git, synced 2025-12-26 07:39:58 +00:00
Support tenant manifests in the scrubber (#9942)
Support tenant manifests in the storage scrubber:

* list the manifests and order them by generation
* delete all manifests except for the two most recent generations
* for the latest manifest: try parsing it

I've tested this patch by running it against a staging bucket; it successfully deleted stale manifests (and avoided deleting the latest two generations).

In follow-up work, we might want to also check some invariants of the manifest, as mentioned in #8088.

Part of #9386
Part of #8088

---------

Co-authored-by: Christian Schwarz <christian@neon.tech>
pageserver/src/tenant/remote_timeline_client.rs
@@ -2564,9 +2564,9 @@ pub fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
 }
 
 /// Given the key of a tenant manifest, parse out the generation number
-pub(crate) fn parse_remote_tenant_manifest_path(path: RemotePath) -> Option<Generation> {
+pub fn parse_remote_tenant_manifest_path(path: RemotePath) -> Option<Generation> {
     static RE: OnceLock<Regex> = OnceLock::new();
-    let re = RE.get_or_init(|| Regex::new(r".+tenant-manifest-([0-9a-f]{8}).json").unwrap());
+    let re = RE.get_or_init(|| Regex::new(r".*tenant-manifest-([0-9a-f]{8}).json").unwrap());
     re.captures(path.get_path().as_str())
         .and_then(|c| c.get(1))
         .and_then(|m| Generation::parse_suffix(m.as_str()))
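Note: two things change in this hunk. The helper becomes `pub` so the scrubber crate can call it, and the regex anchor is relaxed from `.+` to `.*`, so a key with nothing before the `tenant-manifest-` stem (for example a bare basename, which the GC code below re-parses) still matches. A minimal standalone sketch of the parsing behavior, assuming the `regex` crate and using a plain hex parse in place of the pageserver's `Generation::parse_suffix`:

// Standalone sketch (assumes the `regex` crate); a plain hex parse stands in
// for `Generation::parse_suffix`, which is internal to the pageserver.
use regex::Regex;

fn parse_manifest_generation(key: &str) -> Option<u32> {
    // Same pattern as the patched code above.
    let re = Regex::new(r".*tenant-manifest-([0-9a-f]{8}).json").unwrap();
    re.captures(key)
        .and_then(|c| c.get(1))
        .and_then(|m| u32::from_str_radix(m.as_str(), 16).ok())
}

fn main() {
    // A full key parses, as it did before the change...
    let full = "tenants/1234/tenant-manifest-0000002a.json";
    assert_eq!(parse_manifest_generation(full), Some(42));
    // ...and with `.*` instead of `.+`, a bare basename now parses too.
    let basename = "tenant-manifest-0000002a.json";
    assert_eq!(parse_manifest_generation(basename), Some(42));
}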
pageserver/src/tenant/remote_timeline_client/manifest.rs
@@ -43,7 +43,7 @@ impl TenantManifest {
             offloaded_timelines: vec![],
         }
     }
-    pub(crate) fn from_json_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
+    pub fn from_json_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
         serde_json::from_slice::<Self>(bytes)
     }
 
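Note: the visibility bump matters because the scrubber now downloads and parses manifests itself. A rough sketch of what the call amounts to, using a hypothetical two-field struct in place of the real `TenantManifest` (assumes `serde` with the derive feature and `serde_json`):

// Hypothetical, trimmed-down stand-in for TenantManifest (the real struct has
// more fields). Assumes serde (derive feature) and serde_json.
use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct ManifestSketch {
    version: usize,
    #[serde(default)]
    offloaded_timelines: Vec<serde_json::Value>,
}

fn from_json_bytes(bytes: &[u8]) -> Result<ManifestSketch, serde_json::Error> {
    serde_json::from_slice::<ManifestSketch>(bytes)
}

fn main() {
    // A well-formed manifest parses...
    assert!(from_json_bytes(br#"{"version": 1, "offloaded_timelines": []}"#).is_ok());
    // ...and garbage yields the serde error that the scrubber records as a parse error.
    assert!(from_json_bytes(b"not json").is_err());
}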
storage_scrubber/src/checks.rs
@@ -4,17 +4,21 @@ use itertools::Itertools;
 use pageserver::tenant::checks::check_valid_layermap;
 use pageserver::tenant::layer_map::LayerMap;
 use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
+use pageserver::tenant::remote_timeline_client::manifest::TenantManifest;
 use pageserver_api::shard::ShardIndex;
 use tokio_util::sync::CancellationToken;
 use tracing::{info, warn};
 use utils::generation::Generation;
 use utils::id::TimelineId;
+use utils::shard::TenantShardId;
 
 use crate::cloud_admin_api::BranchData;
 use crate::metadata_stream::stream_listing;
 use crate::{download_object_with_retries, RootTarget, TenantShardTimelineId};
 use futures_util::StreamExt;
-use pageserver::tenant::remote_timeline_client::{parse_remote_index_path, remote_layer_path};
+use pageserver::tenant::remote_timeline_client::{
+    parse_remote_index_path, parse_remote_tenant_manifest_path, remote_layer_path,
+};
 use pageserver::tenant::storage_layer::LayerName;
 use pageserver::tenant::IndexPart;
 use remote_storage::{GenericRemoteStorage, ListingObject, RemotePath};
@@ -527,3 +531,132 @@ async fn list_timeline_blobs_impl(
         unknown_keys,
     }))
 }
+
+pub(crate) struct RemoteTenantManifestInfo {
+    pub(crate) latest_generation: Option<Generation>,
+    pub(crate) manifests: Vec<(Generation, ListingObject)>,
+}
+
+pub(crate) enum ListTenantManifestResult {
+    WithErrors {
+        errors: Vec<(String, String)>,
+        #[allow(dead_code)]
+        unknown_keys: Vec<ListingObject>,
+    },
+    NoErrors(RemoteTenantManifestInfo),
+}
+
+/// Lists the tenant manifests in remote storage and parses the latest one, returning a [`ListTenantManifestResult`] object.
+pub(crate) async fn list_tenant_manifests(
+    remote_client: &GenericRemoteStorage,
+    tenant_id: TenantShardId,
+    root_target: &RootTarget,
+) -> anyhow::Result<ListTenantManifestResult> {
+    let mut errors = Vec::new();
+    let mut unknown_keys = Vec::new();
+
+    let mut tenant_root_target = root_target.tenant_root(&tenant_id);
+    let original_prefix = tenant_root_target.prefix_in_bucket.clone();
+    const TENANT_MANIFEST_STEM: &str = "tenant-manifest";
+    tenant_root_target.prefix_in_bucket += TENANT_MANIFEST_STEM;
+    tenant_root_target.delimiter = String::new();
+
+    let mut manifests: Vec<(Generation, ListingObject)> = Vec::new();
+
+    let prefix_str = &original_prefix
+        .strip_prefix("/")
+        .unwrap_or(&original_prefix);
+
+    let mut stream = std::pin::pin!(stream_listing(remote_client, &tenant_root_target));
+    'outer: while let Some(obj) = stream.next().await {
+        let (key, Some(obj)) = obj? else {
+            panic!("ListingObject not specified");
+        };
+
+        'err: {
+            // TODO a let chain would be nicer here.
+            let Some(name) = key.object_name() else {
+                break 'err;
+            };
+            if !name.starts_with(TENANT_MANIFEST_STEM) {
+                break 'err;
+            }
+            let Some(generation) = parse_remote_tenant_manifest_path(key.clone()) else {
+                break 'err;
+            };
+            tracing::debug!("tenant manifest {key}");
+            manifests.push((generation, obj));
+            continue 'outer;
+        }
+        tracing::info!("Listed an unknown key: {key}");
+        unknown_keys.push(obj);
+    }
+
+    if manifests.is_empty() {
+        tracing::debug!("No manifest for timeline.");
+
+        return Ok(ListTenantManifestResult::WithErrors {
+            errors,
+            unknown_keys,
+        });
+    }
+    if !unknown_keys.is_empty() {
+        errors.push(((*prefix_str).to_owned(), "unknown keys listed".to_string()));
+
+        return Ok(ListTenantManifestResult::WithErrors {
+            errors,
+            unknown_keys,
+        });
+    }
+
+    // Find the manifest with the highest generation
+    let (latest_generation, latest_listing_object) = manifests
+        .iter()
+        .max_by_key(|i| i.0)
+        .map(|(g, obj)| (*g, obj.clone()))
+        .unwrap();
+
+    let manifest_bytes =
+        match download_object_with_retries(remote_client, &latest_listing_object.key).await {
+            Ok(bytes) => bytes,
+            Err(e) => {
+                // It is possible that the tenant gets deleted between our listing of the
+                // objects and our download of the manifest file.
+                errors.push((
+                    latest_listing_object.key.get_path().as_str().to_owned(),
+                    format!("failed to download tenant-manifest.json: {e}"),
+                ));
+                return Ok(ListTenantManifestResult::WithErrors {
+                    errors,
+                    unknown_keys,
+                });
+            }
+        };
+
+    match TenantManifest::from_json_bytes(&manifest_bytes) {
+        Ok(_manifest) => {
+            return Ok(ListTenantManifestResult::NoErrors(
+                RemoteTenantManifestInfo {
+                    latest_generation: Some(latest_generation),
+                    manifests,
+                },
+            ));
+        }
+        Err(parse_error) => errors.push((
+            latest_listing_object.key.get_path().as_str().to_owned(),
+            format!("tenant-manifest.json body parsing error: {parse_error}"),
+        )),
+    }
+
+    if errors.is_empty() {
+        errors.push((
+            (*prefix_str).to_owned(),
+            "Unexpected: no errors did not lead to a successfully parsed blob return".to_string(),
+        ));
+    }
+
+    Ok(ListTenantManifestResult::WithErrors {
+        errors,
+        unknown_keys,
+    })
+}
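Note: every failure path above funnels into `WithErrors`, and `NoErrors` is returned only once the newest manifest both downloads and parses. The selection of that newest manifest is a plain `max_by_key`; a self-contained sketch with `u32` standing in for `Generation` and `&str` for the listing object:

// Sketch of the "latest generation wins" selection, with u32 standing in for
// Generation and &str for the listing object.
fn main() {
    let manifests: Vec<(u32, &str)> = vec![
        (3, "tenant-manifest-00000003.json"),
        (7, "tenant-manifest-00000007.json"),
        (5, "tenant-manifest-00000005.json"),
    ];
    // The original only reaches this point when the list is non-empty,
    // which is what makes its unwrap() safe.
    let (latest_generation, latest_key) = manifests
        .iter()
        .max_by_key(|i| i.0)
        .map(|(g, k)| (*g, *k))
        .unwrap();
    assert_eq!(latest_generation, 7);
    assert_eq!(latest_key, "tenant-manifest-00000007.json");
}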
storage_scrubber/src/pageserver_physical_gc.rs
@@ -2,12 +2,16 @@ use std::collections::{BTreeMap, BTreeSet, HashMap};
 use std::sync::Arc;
 use std::time::Duration;
 
-use crate::checks::{list_timeline_blobs, BlobDataParseResult};
+use crate::checks::{
+    list_tenant_manifests, list_timeline_blobs, BlobDataParseResult, ListTenantManifestResult,
+};
 use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
 use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId, MAX_RETRIES};
 use futures_util::{StreamExt, TryStreamExt};
 use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
-use pageserver::tenant::remote_timeline_client::{parse_remote_index_path, remote_layer_path};
+use pageserver::tenant::remote_timeline_client::{
+    parse_remote_index_path, parse_remote_tenant_manifest_path, remote_layer_path,
+};
 use pageserver::tenant::storage_layer::LayerName;
 use pageserver::tenant::IndexPart;
 use pageserver_api::controller_api::TenantDescribeResponse;
@@ -25,6 +29,7 @@ use utils::id::{TenantId, TenantTimelineId};
 #[derive(Serialize, Default)]
 pub struct GcSummary {
     indices_deleted: usize,
+    tenant_manifests_deleted: usize,
     remote_storage_errors: usize,
     controller_api_errors: usize,
     ancestor_layers_deleted: usize,
@@ -34,12 +39,14 @@ impl GcSummary {
     fn merge(&mut self, other: Self) {
         let Self {
             indices_deleted,
+            tenant_manifests_deleted,
             remote_storage_errors,
             ancestor_layers_deleted,
             controller_api_errors,
         } = other;
 
         self.indices_deleted += indices_deleted;
+        self.tenant_manifests_deleted += tenant_manifests_deleted;
         self.remote_storage_errors += remote_storage_errors;
         self.ancestor_layers_deleted += ancestor_layers_deleted;
         self.controller_api_errors += controller_api_errors;
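Note on the `merge` pattern: destructuring `other` with no `..` rest pattern means that adding a counter such as `tenant_manifests_deleted` to the struct without also merging it becomes a compile error instead of a silently dropped statistic. A minimal sketch:

// Minimal sketch of the exhaustive-destructure merge pattern used above.
#[derive(Default)]
struct Summary {
    indices_deleted: usize,
    tenant_manifests_deleted: usize,
}

impl Summary {
    fn merge(&mut self, other: Self) {
        // No `..` rest pattern: forgetting a field here fails to compile.
        let Self {
            indices_deleted,
            tenant_manifests_deleted,
        } = other;
        self.indices_deleted += indices_deleted;
        self.tenant_manifests_deleted += tenant_manifests_deleted;
    }
}

fn main() {
    let mut a = Summary::default();
    a.merge(Summary {
        indices_deleted: 2,
        tenant_manifests_deleted: 3,
    });
    assert_eq!(a.indices_deleted, 2);
    assert_eq!(a.tenant_manifests_deleted, 3);
}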
@@ -352,6 +359,69 @@ async fn maybe_delete_index(
     }
 }
 
+async fn maybe_delete_tenant_manifest(
+    remote_client: &GenericRemoteStorage,
+    min_age: &Duration,
+    latest_gen: Generation,
+    obj: &ListingObject,
+    mode: GcMode,
+    summary: &mut GcSummary,
+) {
+    // Validation: we will only delete things that parse cleanly
+    let basename = obj.key.get_path().file_name().unwrap();
+    let Some(candidate_generation) =
+        parse_remote_tenant_manifest_path(RemotePath::from_string(basename).unwrap())
+    else {
+        // A strange key: we will not delete this because we don't understand it.
+        tracing::warn!("Bad index key");
+        return;
+    };
+
+    // Validation: we will only delete manifests more than one generation old, and in fact we
+    // should never be called with such recent generations.
+    if candidate_generation >= latest_gen {
+        tracing::warn!("Deletion candidate is >= latest generation, this is a bug!");
+        return;
+    } else if candidate_generation.next() == latest_gen {
+        tracing::warn!("Deletion candidate is >= latest generation - 1, this is a bug!");
+        return;
+    }
+
+    if !is_old_enough(min_age, obj, summary) {
+        return;
+    }
+
+    if matches!(mode, GcMode::DryRun) {
+        tracing::info!("Dry run: would delete this key");
+        return;
+    }
+
+    // All validations passed: erase the object
+    let cancel = CancellationToken::new();
+    match backoff::retry(
+        || remote_client.delete(&obj.key, &cancel),
+        |_| false,
+        3,
+        MAX_RETRIES as u32,
+        "maybe_delete_tenant_manifest",
+        &cancel,
+    )
+    .await
+    {
+        None => {
+            unreachable!("Using a dummy cancellation token");
+        }
+        Some(Ok(_)) => {
+            tracing::info!("Successfully deleted tenant manifest");
+            summary.tenant_manifests_deleted += 1;
+        }
+        Some(Err(e)) => {
+            tracing::warn!("Failed to delete tenant manifest: {e}");
+            summary.remote_storage_errors += 1;
+        }
+    }
+}
+
 #[allow(clippy::too_many_arguments)]
 async fn gc_ancestor(
     remote_client: &GenericRemoteStorage,
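Note: before deleting anything, the helper re-parses the basename, refuses any candidate at or directly below the latest generation, and only then applies the `min_age` and dry-run gates. The generation check reduces to the following, sketched with `u32` standing in for `Generation` (whose `next()` is assumed to increment by one):

// Sketch of the generation gate, with u32 standing in for Generation.
fn may_delete(candidate: u32, latest: u32) -> bool {
    if candidate >= latest {
        return false; // a correct caller never offers this
    }
    if candidate + 1 == latest {
        return false; // the generation just before the latest is kept too
    }
    true
}

fn main() {
    assert!(!may_delete(7, 7)); // the latest itself
    assert!(!may_delete(6, 7)); // latest - 1
    assert!(may_delete(5, 7)); // two or more generations old
}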
@@ -451,44 +521,51 @@ async fn gc_ancestor(
     Ok(())
 }
 
-/// Physical garbage collection: removing unused S3 objects.
-///
-/// This is distinct from the garbage collection done inside the pageserver, which operates at a higher level
-/// (keys, layers). This type of garbage collection is about removing:
-/// - Objects that were uploaded but never referenced in the remote index (e.g. because of a shutdown between
-///   uploading a layer and uploading an index)
-/// - Index objects from historic generations
-///
-/// This type of GC is not necessary for correctness: rather it serves to reduce wasted storage capacity, and
-/// make sure that object listings don't get slowed down by large numbers of garbage objects.
-pub async fn pageserver_physical_gc(
-    bucket_config: &BucketConfig,
-    controller_client: Option<&control_api::Client>,
-    tenant_shard_ids: Vec<TenantShardId>,
-    min_age: Duration,
-    mode: GcMode,
-) -> anyhow::Result<GcSummary> {
-    let (remote_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
-
-    let tenants = if tenant_shard_ids.is_empty() {
-        futures::future::Either::Left(stream_tenants(&remote_client, &target))
-    } else {
-        futures::future::Either::Right(futures::stream::iter(tenant_shard_ids.into_iter().map(Ok)))
-    };
-
-    // How many tenants to process in parallel. We need to be mindful of pageservers
-    // accessing the same per tenant prefixes, so use a lower setting than pageservers.
-    const CONCURRENCY: usize = 32;
-
-    // Accumulate information about each tenant for cross-shard GC step we'll do at the end
-    let accumulator = Arc::new(std::sync::Mutex::new(TenantRefAccumulator::default()));
-
-    // Generate a stream of TenantTimelineId
-    let timelines = tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, t));
-    let timelines = timelines.try_buffered(CONCURRENCY);
-    let timelines = timelines.try_flatten();
-
-    // Generate a stream of S3TimelineBlobData
+async fn gc_tenant_manifests(
+    remote_client: &GenericRemoteStorage,
+    min_age: Duration,
+    target: &RootTarget,
+    mode: GcMode,
+    tenant_shard_id: TenantShardId,
+) -> anyhow::Result<GcSummary> {
+    let mut gc_summary = GcSummary::default();
+    match list_tenant_manifests(remote_client, tenant_shard_id, target).await? {
+        ListTenantManifestResult::WithErrors {
+            errors,
+            unknown_keys: _,
+        } => {
+            for (_key, error) in errors {
+                tracing::warn!(%tenant_shard_id, "list_tenant_manifests: {error}");
+            }
+        }
+        ListTenantManifestResult::NoErrors(mut manifest_info) => {
+            let Some(latest_gen) = manifest_info.latest_generation else {
+                return Ok(gc_summary);
+            };
+            manifest_info
+                .manifests
+                .sort_by_key(|(generation, _obj)| *generation);
+            // Skip the two latest generations (they don't necessarily have to be 1 apart from each other)
+            let candidates = manifest_info.manifests.iter().rev().skip(2);
+            for (_generation, key) in candidates {
+                maybe_delete_tenant_manifest(
+                    remote_client,
+                    &min_age,
+                    latest_gen,
+                    key,
+                    mode,
+                    &mut gc_summary,
+                )
+                .instrument(
+                    info_span!("maybe_delete_tenant_manifest", %tenant_shard_id, ?latest_gen, %key.key),
+                )
+                .await;
+            }
+        }
+    }
+    Ok(gc_summary)
+}
 
 async fn gc_timeline(
     remote_client: &GenericRemoteStorage,
     min_age: &Duration,
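Note: the retention rule from the commit message lives in the `sort_by_key` plus `rev().skip(2)` pair. Generations are not necessarily consecutive, so the two newest manifests are skipped positionally rather than by arithmetic on generation numbers. A self-contained sketch:

// Sketch of the retention rule: sort ascending by generation, then walk from
// newest to oldest and skip two before collecting deletion candidates.
fn main() {
    let mut manifests = vec![(4u32, "d"), (1, "a"), (3, "c"), (2, "b")];
    manifests.sort_by_key(|(generation, _obj)| *generation);
    let candidates: Vec<_> = manifests.iter().rev().skip(2).cloned().collect();
    // Generations 4 and 3 survive; 2 and 1 become deletion candidates.
    assert_eq!(candidates, vec![(2, "b"), (1, "a")]);
}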
@@ -531,12 +608,93 @@ pub async fn pageserver_physical_gc(
     Ok(summary)
 }
 
+/// Physical garbage collection: removing unused S3 objects.
+///
+/// This is distinct from the garbage collection done inside the pageserver, which operates at a higher level
+/// (keys, layers). This type of garbage collection is about removing:
+/// - Objects that were uploaded but never referenced in the remote index (e.g. because of a shutdown between
+///   uploading a layer and uploading an index)
+/// - Index objects and tenant manifests from historic generations
+///
+/// This type of GC is not necessary for correctness: rather it serves to reduce wasted storage capacity, and
+/// make sure that object listings don't get slowed down by large numbers of garbage objects.
+pub async fn pageserver_physical_gc(
+    bucket_config: &BucketConfig,
+    controller_client: Option<&control_api::Client>,
+    tenant_shard_ids: Vec<TenantShardId>,
+    min_age: Duration,
+    mode: GcMode,
+) -> anyhow::Result<GcSummary> {
+    let (remote_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
+
+    let remote_client = Arc::new(remote_client);
+    let tenants = if tenant_shard_ids.is_empty() {
+        futures::future::Either::Left(stream_tenants(&remote_client, &target))
+    } else {
+        futures::future::Either::Right(futures::stream::iter(tenant_shard_ids.into_iter().map(Ok)))
+    };
+
+    // How many tenants to process in parallel. We need to be mindful of pageservers
+    // accessing the same per tenant prefixes, so use a lower setting than pageservers.
+    const CONCURRENCY: usize = 32;
+
+    // Accumulate information about each tenant for cross-shard GC step we'll do at the end
+    let accumulator = Arc::new(std::sync::Mutex::new(TenantRefAccumulator::default()));
+
+    // Generate a stream of TenantTimelineId
+    enum GcSummaryOrContent<T> {
+        Content(T),
+        GcSummary(GcSummary),
+    }
+    let timelines = tenants.map_ok(|tenant_shard_id| {
+        let target_ref = &target;
+        let remote_client_ref = &remote_client;
+        async move {
+            let summaries_from_manifests = match gc_tenant_manifests(
+                remote_client_ref,
+                min_age,
+                target_ref,
+                mode,
+                tenant_shard_id,
+            )
+            .await
+            {
+                Ok(gc_summary) => vec![Ok(GcSummaryOrContent::<TenantShardTimelineId>::GcSummary(
+                    gc_summary,
+                ))],
+                Err(e) => {
+                    tracing::warn!(%tenant_shard_id, "Error in gc_tenant_manifests: {e}");
+                    Vec::new()
+                }
+            };
+            stream_tenant_timelines(remote_client_ref, target_ref, tenant_shard_id)
+                .await
+                .map(|stream| {
+                    stream
+                        .map_ok(GcSummaryOrContent::Content)
+                        .chain(futures::stream::iter(summaries_from_manifests.into_iter()))
+                })
+        }
+    });
+    let timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
+    let timelines = timelines.try_flatten();
+
     let mut summary = GcSummary::default();
 
     // Drain futures for per-shard GC, populating accumulator as a side effect
     {
-        let timelines = timelines.map_ok(|ttid| {
-            gc_timeline(&remote_client, &min_age, &target, mode, ttid, &accumulator)
+        let timelines = timelines.map_ok(|summary_or_ttid| match summary_or_ttid {
+            GcSummaryOrContent::Content(ttid) => futures::future::Either::Left(gc_timeline(
+                &remote_client,
+                &min_age,
+                &target,
+                mode,
+                ttid,
+                &accumulator,
+            )),
+            GcSummaryOrContent::GcSummary(gc_summary) => {
+                futures::future::Either::Right(futures::future::ok(gc_summary))
+            }
         });
         let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
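Note: `GcSummaryOrContent` is how manifest GC piggybacks on the existing per-timeline pipeline. Each tenant's stream now yields its timeline ids plus one ready-made summary, and the drain loop either runs `gc_timeline` or passes the summary through. A toy sketch of that shape, assuming only the `futures` crate (with `u32` standing in for `GcSummary`):

// Toy model of the merged stream: per-tenant streams carry both work items
// and a precomputed summary, drained by one loop. Assumes the `futures` crate.
use futures::stream::{self, StreamExt};

enum GcSummaryOrContent<T> {
    Content(T),
    GcSummary(u32), // u32 stands in for the real GcSummary
}

fn main() {
    futures::executor::block_on(async {
        // One inner stream per tenant: its timeline ids, then its manifest-GC summary.
        let per_tenant = vec![
            stream::iter(vec![
                GcSummaryOrContent::Content("ttid-a"),
                GcSummaryOrContent::GcSummary(1),
            ]),
            stream::iter(vec![
                GcSummaryOrContent::Content("ttid-b"),
                GcSummaryOrContent::GcSummary(2),
            ]),
        ];
        let mut merged = stream::iter(per_tenant).flatten();
        let mut deleted = 0u32;
        while let Some(item) = merged.next().await {
            match item {
                // The real code runs gc_timeline() for these.
                GcSummaryOrContent::Content(ttid) => println!("would gc timeline {ttid}"),
                // The real code merges these via GcSummary::merge().
                GcSummaryOrContent::GcSummary(n) => deleted += n,
            }
        }
        assert_eq!(deleted, 3);
    });
}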
test_runner/regress/test_timeline_archive.py
@@ -835,3 +835,117 @@ def test_timeline_retain_lsn(
     with env.endpoints.create_start("test_archived_branch", tenant_id=tenant_id) as endpoint:
         sum = endpoint.safe_psql("SELECT sum(key) from foo where v < 51200")
         assert sum == pre_branch_sum
+
+
+def test_timeline_offload_generations(neon_env_builder: NeonEnvBuilder):
+    """
+    Test for scrubber deleting old generations of manifests
+    """
+    remote_storage_kind = s3_storage()
+    neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
+
+    env = neon_env_builder.init_start()
+    ps_http = env.pageserver.http_client()
+
+    # Turn off gc and compaction loops: we want to issue them manually for better reliability
+    tenant_id, root_timeline_id = env.create_tenant(
+        conf={
+            "gc_period": "0s",
+            "compaction_period": "0s",
+            "checkpoint_distance": f"{1024 ** 2}",
+        }
+    )
+
+    # Create a branch and archive it
+    child_timeline_id = env.create_branch("test_archived_branch_persisted", tenant_id)
+
+    with env.endpoints.create_start(
+        "test_archived_branch_persisted", tenant_id=tenant_id
+    ) as endpoint:
+        endpoint.safe_psql_many(
+            [
+                "CREATE TABLE foo(key serial primary key, t text default 'data_content')",
+                "INSERT INTO foo SELECT FROM generate_series(1,512)",
+            ]
+        )
+        sum = endpoint.safe_psql("SELECT sum(key) from foo where key % 3 = 2")
+        last_flush_lsn_upload(env, endpoint, tenant_id, child_timeline_id)
+
+    assert_prefix_not_empty(
+        neon_env_builder.pageserver_remote_storage,
+        prefix=f"tenants/{str(tenant_id)}/",
+    )
+    assert_prefix_empty(
+        neon_env_builder.pageserver_remote_storage,
+        prefix=f"tenants/{str(tenant_id)}/tenant-manifest",
+    )
+
+    ps_http.timeline_archival_config(
+        tenant_id,
+        child_timeline_id,
+        state=TimelineArchivalState.ARCHIVED,
+    )
+
+    def timeline_offloaded_api(timeline_id: TimelineId) -> bool:
+        # TODO add a proper API to check if a timeline has been offloaded or not
+        return not any(
+            timeline["timeline_id"] == str(timeline_id)
+            for timeline in ps_http.timeline_list(tenant_id=tenant_id)
+        )
+
+    def child_offloaded():
+        ps_http.timeline_offload(tenant_id=tenant_id, timeline_id=child_timeline_id)
+        assert timeline_offloaded_api(child_timeline_id)
+
+    wait_until(child_offloaded)
+
+    assert timeline_offloaded_api(child_timeline_id)
+    assert not timeline_offloaded_api(root_timeline_id)
+
+    # Reboot the pageserver a bunch of times, do unoffloads, offloads
+    for i in range(5):
+        env.pageserver.stop()
+        env.pageserver.start()
+
+        assert timeline_offloaded_api(child_timeline_id)
+        assert not timeline_offloaded_api(root_timeline_id)
+
+        ps_http.timeline_archival_config(
+            tenant_id,
+            child_timeline_id,
+            state=TimelineArchivalState.UNARCHIVED,
+        )
+
+        assert not timeline_offloaded_api(child_timeline_id)
+
+        if i % 2 == 0:
+            with env.endpoints.create_start(
+                "test_archived_branch_persisted", tenant_id=tenant_id
+            ) as endpoint:
+                sum_again = endpoint.safe_psql("SELECT sum(key) from foo where key % 3 = 2")
+                assert sum == sum_again
+
+        ps_http.timeline_archival_config(
+            tenant_id,
+            child_timeline_id,
+            state=TimelineArchivalState.ARCHIVED,
+        )
+        wait_until(child_offloaded)
+
+    #
+    # Now ensure that scrubber runs will clean up old generations' manifests.
+    #
+
+    # Sleep some amount larger than min_age_secs
+    time.sleep(3)
+
+    # Ensure that min_age_secs has a deletion impeding effect
+    gc_summary = env.storage_scrubber.pageserver_physical_gc(min_age_secs=3600, mode="full")
+    assert gc_summary["remote_storage_errors"] == 0
+    assert gc_summary["indices_deleted"] == 0
+    assert gc_summary["tenant_manifests_deleted"] == 0
+
+    gc_summary = env.storage_scrubber.pageserver_physical_gc(min_age_secs=1, mode="full")
+    assert gc_summary["remote_storage_errors"] == 0
+    assert gc_summary["indices_deleted"] > 0
+    assert gc_summary["tenant_manifests_deleted"] > 0