From b3a681d121625103fe8a2fddec44bfc7a77bb7bf Mon Sep 17 00:00:00 2001 From: John Spray Date: Mon, 8 Jan 2024 09:19:10 +0000 Subject: [PATCH] s3_scrubber: updates for sharding (#6281) This is a lightweight change to keep the scrubber providing sensible output when using sharding. - The timeline count was wrong when using sharding - When checking for tenant existence, we didn't re-use results between different shards in the same tenant Closes: https://github.com/neondatabase/neon/issues/5929 --- s3_scrubber/src/garbage.rs | 56 +++++++++++++++---- s3_scrubber/src/scan_metadata.rs | 38 +++++++++---- .../regress/test_pageserver_generations.py | 4 +- 3 files changed, 75 insertions(+), 23 deletions(-) diff --git a/s3_scrubber/src/garbage.rs b/s3_scrubber/src/garbage.rs index 7192afb91b..93bb115883 100644 --- a/s3_scrubber/src/garbage.rs +++ b/s3_scrubber/src/garbage.rs @@ -2,7 +2,10 @@ //! S3 objects which are either not referenced by any metadata, or are referenced by a //! control plane tenant/timeline in a deleted state. -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; use anyhow::Context; use aws_sdk_s3::{ @@ -118,6 +121,13 @@ const S3_CONCURRENCY: usize = 32; // How many concurrent API requests to make to the console API. const CONSOLE_CONCURRENCY: usize = 128; +struct ConsoleCache { + /// Set of tenants found in the control plane API + projects: HashMap, + /// Set of tenants for which the control plane API returned 404 + not_found: HashSet, +} + async fn find_garbage_inner( bucket_config: BucketConfig, console_config: ConsoleConfig, @@ -143,23 +153,49 @@ async fn find_garbage_inner( console_projects.len() ); - // TODO(sharding): batch calls into Console so that we only call once for each TenantId, - // rather than checking the same TenantId for multiple TenantShardId + // Because many tenant shards may look up the same TenantId, we maintain a cache. + let console_cache = Arc::new(std::sync::Mutex::new(ConsoleCache { + projects: console_projects, + not_found: HashSet::new(), + })); // Enumerate Tenants in S3, and check if each one exists in Console tracing::info!("Finding all tenants in bucket {}...", bucket_config.bucket); let tenants = stream_tenants(&s3_client, &target); let tenants_checked = tenants.map_ok(|t| { let api_client = cloud_admin_api_client.clone(); - let console_projects = &console_projects; + let console_cache = console_cache.clone(); async move { - match console_projects.get(&t.tenant_id) { + // Check cache before issuing API call + let project_data = { + let cache = console_cache.lock().unwrap(); + let result = cache.projects.get(&t.tenant_id).cloned(); + if result.is_none() && cache.not_found.contains(&t.tenant_id) { + return Ok((t, None)); + } + result + }; + + match project_data { Some(project_data) => Ok((t, Some(project_data.clone()))), - None => api_client - .find_tenant_project(t.tenant_id) - .await - .map_err(|e| anyhow::anyhow!(e)) - .map(|r| (t, r)), + None => { + let project_data = api_client + .find_tenant_project(t.tenant_id) + .await + .map_err(|e| anyhow::anyhow!(e)); + + // Populate cache with result of API call + { + let mut cache = console_cache.lock().unwrap(); + if let Ok(Some(project_data)) = &project_data { + cache.projects.insert(t.tenant_id, project_data.clone()); + } else if let Ok(None) = &project_data { + cache.not_found.insert(t.tenant_id); + } + } + + project_data.map(|r| (t, r)) + } } } }); diff --git a/s3_scrubber/src/scan_metadata.rs b/s3_scrubber/src/scan_metadata.rs index bfde8f0213..4b63bb3884 100644 --- a/s3_scrubber/src/scan_metadata.rs +++ b/s3_scrubber/src/scan_metadata.rs @@ -17,7 +17,9 @@ use utils::id::TenantId; #[derive(Serialize)] pub struct MetadataSummary { - count: usize, + tenant_count: usize, + timeline_count: usize, + timeline_shard_count: usize, with_errors: HashSet, with_warnings: HashSet, with_orphans: HashSet, @@ -87,7 +89,9 @@ impl MinMaxHisto { impl MetadataSummary { fn new() -> Self { Self { - count: 0, + tenant_count: 0, + timeline_count: 0, + timeline_shard_count: 0, with_errors: HashSet::new(), with_warnings: HashSet::new(), with_orphans: HashSet::new(), @@ -112,7 +116,7 @@ impl MetadataSummary { } fn update_data(&mut self, data: &S3TimelineBlobData) { - self.count += 1; + self.timeline_shard_count += 1; if let BlobDataParseResult::Parsed { index_part, index_part_generation: _, @@ -158,16 +162,20 @@ impl MetadataSummary { ); format!( - "Timelines: {0} -With errors: {1} -With warnings: {2} -With orphan layers: {3} + "Tenants: {} +Timelines: {} +Timeline-shards: {} +With errors: {} +With warnings: {} +With orphan layers: {} Index versions: {version_summary} -Timeline size bytes: {4} -Layer size bytes: {5} -Timeline layer count: {6} +Timeline size bytes: {} +Layer size bytes: {} +Timeline layer count: {} ", - self.count, + self.tenant_count, + self.timeline_count, + self.timeline_shard_count, self.with_errors.len(), self.with_warnings.len(), self.with_orphans.len(), @@ -182,7 +190,7 @@ Timeline layer count: {6} } pub fn is_empty(&self) -> bool { - self.count == 0 + self.timeline_shard_count == 0 } } @@ -233,8 +241,12 @@ pub async fn scan_metadata( mut tenant_objects: TenantObjectListing, timelines: Vec<(TenantShardTimelineId, S3TimelineBlobData)>, ) { + summary.tenant_count += 1; + + let mut timeline_ids = HashSet::new(); let mut timeline_generations = HashMap::new(); for (ttid, data) in timelines { + timeline_ids.insert(ttid.timeline_id); // Stash the generation of each timeline, for later use identifying orphan layers if let BlobDataParseResult::Parsed { index_part: _index_part, @@ -252,6 +264,8 @@ pub async fn scan_metadata( summary.update_analysis(&ttid, &analysis); } + summary.timeline_count += timeline_ids.len(); + // Identifying orphan layers must be done on a tenant-wide basis, because individual // shards' layers may be referenced by other shards. // diff --git a/test_runner/regress/test_pageserver_generations.py b/test_runner/regress/test_pageserver_generations.py index 9c2f5786d4..87a4fa01fc 100644 --- a/test_runner/regress/test_pageserver_generations.py +++ b/test_runner/regress/test_pageserver_generations.py @@ -254,7 +254,9 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder): metadata_summary = S3Scrubber( neon_env_builder.test_output_dir, neon_env_builder ).scan_metadata() - assert metadata_summary["count"] == 1 # Scrubber should have seen our timeline + assert metadata_summary["tenant_count"] == 1 # Scrubber should have seen our timeline + assert metadata_summary["timeline_count"] == 1 + assert metadata_summary["timeline_shard_count"] == 1 assert not metadata_summary["with_errors"] assert not metadata_summary["with_warnings"]