mirror of https://github.com/neondatabase/neon.git, synced 2026-01-08 14:02:55 +00:00
s3_scrubber: updates for sharding (#6281)
This is a lightweight change to keep the scrubber providing sensible output when using sharding:

- The timeline count was wrong when using sharding.
- When checking for tenant existence, we didn't re-use results between different shards in the same tenant.

Closes: https://github.com/neondatabase/neon/issues/5929
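Background for the fix: with sharding, every shard of a tenant stores its own copy of each timeline's metadata in S3, so a naive count over the bucket listing reports one "timeline" per shard. A minimal sketch of the over-counting and of the HashSet deduplication the diff below applies, using simplified stand-in id types (the real scrubber uses `TenantShardTimelineId`):

```rust
use std::collections::HashSet;

// Simplified stand-ins for the scrubber's id types (illustrative only).
type TenantId = u32;
type ShardNumber = u8;
type TimelineId = u32;

fn main() {
    // Hypothetical tenant: four shards, each holding the same two timelines,
    // so a bucket listing yields 4 x 2 = 8 shard-timeline entries.
    let listing: Vec<(TenantId, ShardNumber, TimelineId)> = (0..4u8)
        .flat_map(|shard| [(1, shard, 100), (1, shard, 200)])
        .collect();

    // Counting every entry (what the old single `count` did) over-reports.
    let timeline_shard_count = listing.len();

    // Deduplicating on (tenant, timeline) recovers the logical timeline count.
    let timeline_count: usize = listing
        .iter()
        .map(|(tenant, _shard, timeline)| (*tenant, *timeline))
        .collect::<HashSet<_>>()
        .len();

    assert_eq!(timeline_shard_count, 8);
    assert_eq!(timeline_count, 2);
}
```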
--- a/s3_scrubber/src/garbage.rs
+++ b/s3_scrubber/src/garbage.rs
@@ -2,7 +2,10 @@
 //! S3 objects which are either not referenced by any metadata, or are referenced by a
 //! control plane tenant/timeline in a deleted state.
 
-use std::{collections::HashMap, sync::Arc};
+use std::{
+    collections::{HashMap, HashSet},
+    sync::Arc,
+};
 
 use anyhow::Context;
 use aws_sdk_s3::{
@@ -118,6 +121,13 @@ const S3_CONCURRENCY: usize = 32;
 // How many concurrent API requests to make to the console API.
 const CONSOLE_CONCURRENCY: usize = 128;
 
+struct ConsoleCache {
+    /// Set of tenants found in the control plane API
+    projects: HashMap<TenantId, ProjectData>,
+    /// Set of tenants for which the control plane API returned 404
+    not_found: HashSet<TenantId>,
+}
+
 async fn find_garbage_inner(
     bucket_config: BucketConfig,
     console_config: ConsoleConfig,
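The two fields above give three distinguishable answers when probing the cache for a tenant: the project is known, the control plane already returned 404 for it, or it has never been queried. A minimal standalone sketch of that tri-state lookup, with generic stand-in types rather than the scrubber's `TenantId`/`ProjectData`:

```rust
use std::collections::{HashMap, HashSet};

// Illustrative tri-state result of probing the two cache fields.
enum CacheProbe<V> {
    Found(V), // the control plane previously returned data for this key
    NotFound, // the control plane previously returned 404 for this key
    Unknown,  // never queried: the caller must issue an API call
}

struct Cache<K, V> {
    projects: HashMap<K, V>,
    not_found: HashSet<K>,
}

impl<K: std::hash::Hash + Eq, V: Clone> Cache<K, V> {
    fn probe(&self, key: &K) -> CacheProbe<V> {
        if let Some(v) = self.projects.get(key) {
            CacheProbe::Found(v.clone())
        } else if self.not_found.contains(key) {
            CacheProbe::NotFound
        } else {
            CacheProbe::Unknown
        }
    }
}

fn main() {
    let cache = Cache {
        projects: HashMap::from([("t1", 7u32)]),
        not_found: HashSet::from(["t2"]),
    };
    assert!(matches!(cache.probe(&"t1"), CacheProbe::Found(7)));
    assert!(matches!(cache.probe(&"t2"), CacheProbe::NotFound));
    assert!(matches!(cache.probe(&"t3"), CacheProbe::Unknown));
}
```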
@@ -143,23 +153,49 @@ async fn find_garbage_inner(
         console_projects.len()
     );
 
-    // TODO(sharding): batch calls into Console so that we only call once for each TenantId,
-    // rather than checking the same TenantId for multiple TenantShardId
+    // Because many tenant shards may look up the same TenantId, we maintain a cache.
+    let console_cache = Arc::new(std::sync::Mutex::new(ConsoleCache {
+        projects: console_projects,
+        not_found: HashSet::new(),
+    }));
 
     // Enumerate Tenants in S3, and check if each one exists in Console
     tracing::info!("Finding all tenants in bucket {}...", bucket_config.bucket);
     let tenants = stream_tenants(&s3_client, &target);
     let tenants_checked = tenants.map_ok(|t| {
         let api_client = cloud_admin_api_client.clone();
-        let console_projects = &console_projects;
+        let console_cache = console_cache.clone();
         async move {
-            match console_projects.get(&t.tenant_id) {
+            // Check cache before issuing API call
+            let project_data = {
+                let cache = console_cache.lock().unwrap();
+                let result = cache.projects.get(&t.tenant_id).cloned();
+                if result.is_none() && cache.not_found.contains(&t.tenant_id) {
+                    return Ok((t, None));
+                }
+                result
+            };
+
+            match project_data {
                 Some(project_data) => Ok((t, Some(project_data.clone()))),
-                None => api_client
-                    .find_tenant_project(t.tenant_id)
-                    .await
-                    .map_err(|e| anyhow::anyhow!(e))
-                    .map(|r| (t, r)),
+                None => {
+                    let project_data = api_client
+                        .find_tenant_project(t.tenant_id)
+                        .await
+                        .map_err(|e| anyhow::anyhow!(e));
+
+                    // Populate cache with result of API call
+                    {
+                        let mut cache = console_cache.lock().unwrap();
+                        if let Ok(Some(project_data)) = &project_data {
+                            cache.projects.insert(t.tenant_id, project_data.clone());
+                        } else if let Ok(None) = &project_data {
+                            cache.not_found.insert(t.tenant_id);
+                        }
+                    }
+
+                    project_data.map(|r| (t, r))
+                }
             }
         }
     });
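Two details of the hunk above are worth noting: the mutex guard is scoped to a block and dropped before the `.await`, since a `std::sync::Mutex` guard must not be held across an await point; and two shards of the same tenant can still race past the cache check and both call the API, which is benign because both then store the same result. A minimal sketch of the shared `Arc<Mutex<...>>` cache pattern, using threads instead of the scrubber's async stream and an invented tenant id:

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // Negative cache shared between workers, one clone of the Arc per worker,
    // mirroring the `console_cache.clone()` per stream item above.
    let not_found: Arc<Mutex<HashSet<u64>>> = Arc::new(Mutex::new(HashSet::new()));

    let handles: Vec<_> = (0..4)
        .map(|_| {
            let not_found = Arc::clone(&not_found);
            thread::spawn(move || {
                // Scope the lock tightly, as the diff does: check, record, release.
                let first = not_found.lock().unwrap().insert(42);
                if first {
                    // Only one worker pays for the (simulated) console API call.
                    println!("cache miss: querying console for tenant 42");
                } else {
                    println!("cache hit: tenant 42 already known to be deleted");
                }
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }
}
```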
--- a/s3_scrubber/src/scan_metadata.rs
+++ b/s3_scrubber/src/scan_metadata.rs
@@ -17,7 +17,9 @@ use utils::id::TenantId;
 
 #[derive(Serialize)]
 pub struct MetadataSummary {
-    count: usize,
+    tenant_count: usize,
+    timeline_count: usize,
+    timeline_shard_count: usize,
     with_errors: HashSet<TenantShardTimelineId>,
     with_warnings: HashSet<TenantShardTimelineId>,
     with_orphans: HashSet<TenantShardTimelineId>,
@@ -87,7 +89,9 @@ impl MinMaxHisto {
 impl MetadataSummary {
     fn new() -> Self {
         Self {
-            count: 0,
+            tenant_count: 0,
+            timeline_count: 0,
+            timeline_shard_count: 0,
             with_errors: HashSet::new(),
             with_warnings: HashSet::new(),
             with_orphans: HashSet::new(),
@@ -112,7 +116,7 @@ impl MetadataSummary {
     }
 
     fn update_data(&mut self, data: &S3TimelineBlobData) {
-        self.count += 1;
+        self.timeline_shard_count += 1;
         if let BlobDataParseResult::Parsed {
             index_part,
             index_part_generation: _,
@@ -158,16 +162,20 @@ impl MetadataSummary {
         );
 
         format!(
-            "Timelines: {0}
-With errors: {1}
-With warnings: {2}
-With orphan layers: {3}
+            "Tenants: {}
+Timelines: {}
+Timeline-shards: {}
+With errors: {}
+With warnings: {}
+With orphan layers: {}
 Index versions: {version_summary}
-Timeline size bytes: {4}
-Layer size bytes: {5}
-Timeline layer count: {6}
+Timeline size bytes: {}
+Layer size bytes: {}
+Timeline layer count: {}
 ",
-            self.count,
+            self.tenant_count,
+            self.timeline_count,
+            self.timeline_shard_count,
             self.with_errors.len(),
             self.with_warnings.len(),
             self.with_orphans.len(),
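For illustration, the counter portion of the summary rendered by this format string would read as follows for the hypothetical four-shard, two-timeline tenant sketched earlier (figures invented; the index-version and size-histogram lines are elided):

```
Tenants: 1
Timelines: 2
Timeline-shards: 8
With errors: 0
With warnings: 0
With orphan layers: 0
```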
@@ -182,7 +190,7 @@ Timeline layer count: {6}
     }
 
     pub fn is_empty(&self) -> bool {
-        self.count == 0
+        self.timeline_shard_count == 0
     }
 }
@@ -233,8 +241,12 @@ pub async fn scan_metadata(
     mut tenant_objects: TenantObjectListing,
     timelines: Vec<(TenantShardTimelineId, S3TimelineBlobData)>,
 ) {
+    summary.tenant_count += 1;
+
+    let mut timeline_ids = HashSet::new();
     let mut timeline_generations = HashMap::new();
     for (ttid, data) in timelines {
+        timeline_ids.insert(ttid.timeline_id);
         // Stash the generation of each timeline, for later use identifying orphan layers
         if let BlobDataParseResult::Parsed {
             index_part: _index_part,
@@ -252,6 +264,8 @@ pub async fn scan_metadata(
         summary.update_analysis(&ttid, &analysis);
     }
 
+    summary.timeline_count += timeline_ids.len();
+
     // Identifying orphan layers must be done on a tenant-wide basis, because individual
     // shards' layers may be referenced by other shards.
     //
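Worked example tying the three counters together, for a hypothetical tenant split into four shards holding two timelines each: the scan visits 4 × 2 = 8 shard-timelines, so `timeline_shard_count` ends at 8; the per-tenant pass above deduplicates those into 2 distinct entries in `timeline_ids`, so `timeline_count` increases by 2; and `tenant_count` increases by 1 for the tenant.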
--- a/test_runner/regress/test_pageserver_generations.py
+++ b/test_runner/regress/test_pageserver_generations.py
@@ -254,7 +254,9 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
     metadata_summary = S3Scrubber(
         neon_env_builder.test_output_dir, neon_env_builder
     ).scan_metadata()
-    assert metadata_summary["count"] == 1  # Scrubber should have seen our timeline
+    assert metadata_summary["tenant_count"] == 1  # Scrubber should have seen our timeline
+    assert metadata_summary["timeline_count"] == 1
+    assert metadata_summary["timeline_shard_count"] == 1
     assert not metadata_summary["with_errors"]
     assert not metadata_summary["with_warnings"]