mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 00:42:54 +00:00
s3_scrubber: revive garbage collection for safekeepers.
- pageserver_id in project details is now optional, fix it - add active_timeline_count guard/stat similar to active_tenant_count - fix safekeeper prefix - count and log deleted keys
This commit is contained in:
@@ -137,7 +137,7 @@ pub struct ProjectData {
|
||||
pub region_id: String,
|
||||
pub platform_id: String,
|
||||
pub user_id: String,
|
||||
pub pageserver_id: u64,
|
||||
pub pageserver_id: Option<u64>,
|
||||
#[serde(deserialize_with = "from_nullable_id")]
|
||||
pub tenant: TenantId,
|
||||
pub safekeepers: Vec<SafekeeperData>,
|
||||
|
||||
@@ -60,6 +60,7 @@ pub struct GarbageList {
|
||||
/// see garbage, we saw some active tenants too. This protects against classes of bugs
|
||||
/// in the scrubber that might otherwise generate a "deleted all" result.
|
||||
active_tenant_count: usize,
|
||||
active_timeline_count: usize,
|
||||
}
|
||||
|
||||
impl GarbageList {
|
||||
@@ -67,6 +68,7 @@ impl GarbageList {
|
||||
Self {
|
||||
items: Vec::new(),
|
||||
active_tenant_count: 0,
|
||||
active_timeline_count: 0,
|
||||
node_kind,
|
||||
bucket_config,
|
||||
}
|
||||
@@ -221,6 +223,7 @@ async fn find_garbage_inner(
|
||||
} else {
|
||||
tracing::debug!("Tenant {tenant_shard_id} is active");
|
||||
active_tenants.push(tenant_shard_id);
|
||||
garbage.active_tenant_count = active_tenants.len();
|
||||
}
|
||||
|
||||
counter += 1;
|
||||
@@ -271,15 +274,29 @@ async fn find_garbage_inner(
|
||||
std::pin::pin!(timelines_checked.try_buffer_unordered(CONSOLE_CONCURRENCY));
|
||||
|
||||
// Update the GarbageList with any timelines which appear not to exist.
|
||||
let mut active_timelines: Vec<TenantShardTimelineId> = vec![];
|
||||
while let Some(result) = timelines_checked.next().await {
|
||||
let (ttid, console_result) = result?;
|
||||
if garbage.maybe_append(GarbageEntity::Timeline(ttid), console_result) {
|
||||
tracing::debug!("Timeline {ttid} is garbage");
|
||||
} else {
|
||||
tracing::debug!("Timeline {ttid} is active");
|
||||
active_timelines.push(ttid);
|
||||
garbage.active_timeline_count = active_timelines.len();
|
||||
}
|
||||
}
|
||||
|
||||
let num_garbage_timelines = garbage
|
||||
.items
|
||||
.iter()
|
||||
.filter(|g| matches!(g.entity, GarbageEntity::Timeline(_)))
|
||||
.count();
|
||||
tracing::info!(
|
||||
"Found {}/{} garbage timelines in active tenants",
|
||||
num_garbage_timelines,
|
||||
active_timelines.len(),
|
||||
);
|
||||
|
||||
Ok(garbage)
|
||||
}
|
||||
|
||||
@@ -344,16 +361,22 @@ pub async fn get_timeline_objects(
|
||||
const MAX_KEYS_PER_DELETE: usize = 1000;
|
||||
|
||||
/// Drain a buffer of keys into DeleteObjects requests
|
||||
///
|
||||
/// If `drain` is true, drains keys completely; otherwise stops when <
|
||||
/// MAX_KEYS_PER_DELETE keys are left.
|
||||
/// The number of keys in each DeleteObjects request that is sent is recorded in `progress_tracker`.
|
||||
async fn do_delete(
|
||||
s3_client: &Arc<Client>,
|
||||
bucket_name: &str,
|
||||
keys: &mut Vec<ObjectIdentifier>,
|
||||
dry_run: bool,
|
||||
drain: bool,
|
||||
progress_tracker: &mut DeletionProgressTracker,
|
||||
) -> anyhow::Result<()> {
|
||||
while (!keys.is_empty() && drain) || (keys.len() >= MAX_KEYS_PER_DELETE) {
|
||||
let request_keys =
|
||||
keys.split_off(keys.len() - (std::cmp::min(MAX_KEYS_PER_DELETE, keys.len())));
|
||||
let num_deleted = request_keys.len();
|
||||
if dry_run {
|
||||
tracing::info!("Dry-run deletion of objects: ");
|
||||
for k in request_keys {
|
||||
@@ -368,12 +391,30 @@ async fn do_delete(
|
||||
.send()
|
||||
.await
|
||||
.context("DeleteObjects request")?;
|
||||
progress_tracker.register(num_deleted);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Simple tracker reporting each 10k deleted keys.
|
||||
#[derive(Default)]
|
||||
struct DeletionProgressTracker {
|
||||
num_deleted: usize,
|
||||
last_reported_num_deleted: usize,
|
||||
}
|
||||
|
||||
impl DeletionProgressTracker {
|
||||
fn register(&mut self, n: usize) {
|
||||
self.num_deleted += n;
|
||||
if self.num_deleted - self.last_reported_num_deleted > 10000 {
|
||||
tracing::info!("progress: deleted {} keys", self.num_deleted);
|
||||
self.last_reported_num_deleted = self.num_deleted;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn purge_garbage(
|
||||
input_path: String,
|
||||
mode: PurgeMode,
|
||||
@@ -394,6 +435,14 @@ pub async fn purge_garbage(
|
||||
if garbage_list.active_tenant_count == 0 {
|
||||
anyhow::bail!("Refusing to purge a garbage list that reports 0 active tenants");
|
||||
}
|
||||
if garbage_list
|
||||
.items
|
||||
.iter()
|
||||
.any(|g| matches!(g.entity, GarbageEntity::Timeline(_)))
|
||||
&& garbage_list.active_timeline_count == 0
|
||||
{
|
||||
anyhow::bail!("Refusing to purge a garbage list containing garbage timelines that reports 0 active timelines");
|
||||
}
|
||||
|
||||
let filtered_items = garbage_list
|
||||
.items
|
||||
@@ -429,6 +478,7 @@ pub async fn purge_garbage(
|
||||
std::pin::pin!(get_objects_results.try_buffer_unordered(S3_CONCURRENCY));
|
||||
|
||||
let mut objects_to_delete = Vec::new();
|
||||
let mut progress_tracker = DeletionProgressTracker::default();
|
||||
while let Some(result) = get_objects_results.next().await {
|
||||
let mut object_list = result?;
|
||||
objects_to_delete.append(&mut object_list);
|
||||
@@ -439,6 +489,7 @@ pub async fn purge_garbage(
|
||||
&mut objects_to_delete,
|
||||
dry_run,
|
||||
false,
|
||||
&mut progress_tracker,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
@@ -450,10 +501,11 @@ pub async fn purge_garbage(
|
||||
&mut objects_to_delete,
|
||||
dry_run,
|
||||
true,
|
||||
&mut progress_tracker,
|
||||
)
|
||||
.await?;
|
||||
|
||||
tracing::info!("Fell through");
|
||||
tracing::info!("{} keys deleted in total", progress_tracker.num_deleted);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -343,7 +343,7 @@ fn init_remote(
|
||||
}),
|
||||
NodeKind::Safekeeper => RootTarget::Safekeeper(S3Target {
|
||||
bucket_name: bucket_config.bucket,
|
||||
prefix_in_bucket: bucket_config.prefix_in_bucket.unwrap_or("wal".to_string()),
|
||||
prefix_in_bucket: bucket_config.prefix_in_bucket.unwrap_or("wal/".to_string()),
|
||||
delimiter,
|
||||
}),
|
||||
};
|
||||
|
||||
@@ -114,7 +114,7 @@ pub async fn stream_tenant_timelines<'a>(
|
||||
let timelines_target = target.timelines_root(&tenant);
|
||||
|
||||
loop {
|
||||
tracing::info!("Listing in {}", tenant);
|
||||
tracing::debug!("Listing in {}", tenant);
|
||||
let fetch_response =
|
||||
list_objects_with_retries(s3_client, &timelines_target, continuation_token.clone())
|
||||
.await;
|
||||
@@ -151,7 +151,7 @@ pub async fn stream_tenant_timelines<'a>(
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!("Yielding for {}", tenant);
|
||||
tracing::debug!("Yielding for {}", tenant);
|
||||
Ok(stream! {
|
||||
for i in timeline_ids {
|
||||
let id = i?;
|
||||
|
||||
Reference in New Issue
Block a user