From d1d4631c8f0eda39b9558ab542bf04ca7c4c5604 Mon Sep 17 00:00:00 2001
From: Yuchen Liang <70461588+yliang412@users.noreply.github.com>
Date: Tue, 30 Jul 2024 11:07:34 -0400
Subject: [PATCH] feat(scrubber): post `scan_metadata` results to storage
 controller (#8502)

Part of #8128, follow-up to #8480. Closes #8421.

Enable the scrubber to optionally post metadata scan health results to the
storage controller.

Signed-off-by: Yuchen Liang
---
 libs/pageserver_api/src/controller_api.rs    |  5 ++-
 storage_scrubber/src/checks.rs               |  5 +++
 storage_scrubber/src/lib.rs                  |  7 ++++
 storage_scrubber/src/main.rs                 | 40 ++++++++++++++-----
 .../src/pageserver_physical_gc.rs            |  8 +---
 .../src/scan_pageserver_metadata.rs          | 32 ++++++++++-----
 test_runner/fixtures/neon_fixtures.py        |  9 +++--
 test_runner/regress/test_storage_scrubber.py | 16 +++++++-
 8 files changed, 88 insertions(+), 34 deletions(-)

diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs
index 36b1bd95ff..a5b452da83 100644
--- a/libs/pageserver_api/src/controller_api.rs
+++ b/libs/pageserver_api/src/controller_api.rs
@@ -1,3 +1,4 @@
+use std::collections::HashSet;
 use std::str::FromStr;
 use std::time::{Duration, Instant};
 
@@ -304,8 +305,8 @@ pub struct MetadataHealthRecord {
 
 #[derive(Serialize, Deserialize, Debug)]
 pub struct MetadataHealthUpdateRequest {
-    pub healthy_tenant_shards: Vec<TenantShardId>,
-    pub unhealthy_tenant_shards: Vec<TenantShardId>,
+    pub healthy_tenant_shards: HashSet<TenantShardId>,
+    pub unhealthy_tenant_shards: HashSet<TenantShardId>,
 }
 
 #[derive(Serialize, Deserialize, Debug)]
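A note on the request type above: `MetadataHealthUpdateRequest` derives `Serialize`/`Deserialize` with no field attributes, so the body the scrubber posts is just two JSON arrays keyed by the field names; switching the fields from `Vec` to `HashSet` deduplicates shard ids without changing that shape. Below is a minimal, self-contained sketch of building and serializing such a request, with a plain `String` standing in for the real `TenantShardId` and a made-up shard id string (serde with derive and serde_json assumed as dependencies):

    use std::collections::HashSet;

    use serde::{Deserialize, Serialize};

    // Plain-string stand-in for pageserver_api::shard::TenantShardId, purely for illustration.
    type TenantShardId = String;

    #[derive(Serialize, Deserialize, Debug)]
    struct MetadataHealthUpdateRequest {
        healthy_tenant_shards: HashSet<TenantShardId>,
        unhealthy_tenant_shards: HashSet<TenantShardId>,
    }

    fn main() {
        let mut healthy = HashSet::new();
        // The scrubber visits each timeline separately, so the same shard may be recorded
        // several times; the HashSet keeps exactly one entry per shard in the request body.
        healthy.insert("1f359dd625e519a1a4e8d7509690f6fc-0104".to_string());
        healthy.insert("1f359dd625e519a1a4e8d7509690f6fc-0104".to_string());

        let request = MetadataHealthUpdateRequest {
            healthy_tenant_shards: healthy,
            unhealthy_tenant_shards: HashSet::new(),
        };
        assert_eq!(request.healthy_tenant_shards.len(), 1);
        println!("{}", serde_json::to_string_pretty(&request).unwrap());
    }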
diff --git a/storage_scrubber/src/checks.rs b/storage_scrubber/src/checks.rs
index a35a58aedd..5aa9e88c40 100644
--- a/storage_scrubber/src/checks.rs
+++ b/storage_scrubber/src/checks.rs
@@ -40,6 +40,11 @@ impl TimelineAnalysis {
             garbage_keys: Vec::new(),
         }
     }
+
+    /// Whether a timeline is healthy.
+    pub(crate) fn is_healthy(&self) -> bool {
+        self.errors.is_empty() && self.warnings.is_empty()
+    }
 }
 
 pub(crate) async fn branch_cleanup_and_check_errors(
diff --git a/storage_scrubber/src/lib.rs b/storage_scrubber/src/lib.rs
index c7900f9b02..e0f154def3 100644
--- a/storage_scrubber/src/lib.rs
+++ b/storage_scrubber/src/lib.rs
@@ -32,6 +32,7 @@ use remote_storage::{
 };
 use reqwest::Url;
 use serde::{Deserialize, Serialize};
+use storage_controller_client::control_api;
 use tokio::io::AsyncReadExt;
 use tokio_util::sync::CancellationToken;
 use tracing::error;
@@ -255,6 +256,12 @@ pub struct ControllerClientConfig {
     pub controller_jwt: String,
 }
 
+impl ControllerClientConfig {
+    pub fn build_client(self) -> control_api::Client {
+        control_api::Client::new(self.controller_api, Some(self.controller_jwt))
+    }
+}
+
 pub struct ConsoleConfig {
     pub token: String,
     pub base_url: Url,
diff --git a/storage_scrubber/src/main.rs b/storage_scrubber/src/main.rs
index 346829b7c9..4c804c00c1 100644
--- a/storage_scrubber/src/main.rs
+++ b/storage_scrubber/src/main.rs
@@ -1,7 +1,8 @@
 use anyhow::{anyhow, bail};
 use camino::Utf8PathBuf;
+use pageserver_api::controller_api::{MetadataHealthUpdateRequest, MetadataHealthUpdateResponse};
 use pageserver_api::shard::TenantShardId;
-use reqwest::Url;
+use reqwest::{Method, Url};
 use storage_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
 use storage_scrubber::pageserver_physical_gc::GcMode;
 use storage_scrubber::scan_pageserver_metadata::scan_metadata;
@@ -61,6 +62,8 @@ enum Command {
         json: bool,
         #[arg(long = "tenant-id", num_args = 0..)]
         tenant_ids: Vec<TenantShardId>,
+        #[arg(long = "post", default_value_t = false)]
+        post_to_storage_controller: bool,
         #[arg(long, default_value = None)]
         /// For safekeeper node_kind only, points to db with debug dump
         dump_db_connstr: Option<String>,
@@ -116,11 +119,20 @@ async fn main() -> anyhow::Result<()> {
         chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
     ));
 
+    let controller_client_conf = cli.controller_api.map(|controller_api| {
+        ControllerClientConfig {
+            controller_api,
+            // Default to no key: this is a convenience when working in a development environment
+            controller_jwt: cli.controller_jwt.unwrap_or("".to_owned()),
+        }
+    });
+
     match cli.command {
         Command::ScanMetadata {
             json,
             tenant_ids,
             node_kind,
+            post_to_storage_controller,
             dump_db_connstr,
             dump_db_table,
         } => {
@@ -159,6 +171,9 @@ async fn main() -> anyhow::Result<()> {
                 }
                 Ok(())
             } else {
+                if controller_client_conf.is_none() && post_to_storage_controller {
+                    return Err(anyhow!("Posting pageserver scan health status to storage controller requires `--controller-api` and `--controller-jwt` to run"));
+                }
                 match scan_metadata(bucket_config.clone(), tenant_ids).await {
                     Err(e) => {
                         tracing::error!("Failed: {e}");
@@ -170,6 +185,21 @@ async fn main() -> anyhow::Result<()> {
                         } else {
                             println!("{}", summary.summary_string());
                         }
+
+                        if post_to_storage_controller {
+                            if let Some(conf) = controller_client_conf {
+                                let controller_client = conf.build_client();
+                                let body = summary.build_health_update_request();
+                                controller_client
+                                    .dispatch::<MetadataHealthUpdateRequest, MetadataHealthUpdateResponse>(
+                                        Method::POST,
+                                        "control/v1/metadata_health/update".to_string(),
+                                        Some(body),
+                                    )
+                                    .await?;
+                            }
+                        }
+
                         if summary.is_fatal() {
                             Err(anyhow::anyhow!("Fatal scrub errors detected"))
                         } else if summary.is_empty() {
@@ -217,14 +247,6 @@ async fn main() -> anyhow::Result<()> {
             min_age,
             mode,
         } => {
-            let controller_client_conf = cli.controller_api.map(|controller_api| {
-                ControllerClientConfig {
-                    controller_api,
-                    // Default to no key: this is a convenience when working in a development environment
-                    controller_jwt: cli.controller_jwt.unwrap_or("".to_owned()),
-                }
-            });
-
             match (&controller_client_conf, mode) {
                 (Some(_), _) => {
                     // Any mode may run when controller API is set
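Taken together, `scan-metadata --node-kind pageserver --post` now pushes the scan verdict to the storage controller in the same run, provided `--controller-api` (and, outside a development setup, `--controller-jwt`) is supplied. For exercising the same endpoint without the scrubber, a rough equivalent using plain reqwest might look like the sketch below; the controller base URL and the bearer-token header are assumptions, while the path and body field names come from this patch (tokio, serde_json, and reqwest with its `json` feature assumed as dependencies):

    use reqwest::Client;
    use serde_json::json;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Assumed controller base URL; the path is the one used by the scrubber.
        let url = "http://127.0.0.1:1234/control/v1/metadata_health/update";
        let body = json!({
            // Shard ids elided; the scrubber fills these from its scan results.
            "healthy_tenant_shards": [],
            "unhealthy_tenant_shards": []
        });

        let resp = Client::new()
            .post(url)
            // Assumption: the controller accepts a standard bearer token.
            .bearer_auth(std::env::var("CONTROLLER_JWT")?)
            .json(&body)
            .send()
            .await?
            .error_for_status()?;
        println!("controller replied: {}", resp.status());
        Ok(())
    }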
diff --git a/storage_scrubber/src/pageserver_physical_gc.rs b/storage_scrubber/src/pageserver_physical_gc.rs
index e977fd49f7..69896caa82 100644
--- a/storage_scrubber/src/pageserver_physical_gc.rs
+++ b/storage_scrubber/src/pageserver_physical_gc.rs
@@ -567,13 +567,7 @@ pub async fn pageserver_physical_gc(
     }
 
     // Execute cross-shard GC, using the accumulator's full view of all the shards built in the per-shard GC
-    let Some(controller_client) = controller_client_conf.as_ref().map(|c| {
-        let ControllerClientConfig {
-            controller_api,
-            controller_jwt,
-        } = c;
-        control_api::Client::new(controller_api.clone(), Some(controller_jwt.clone()))
-    }) else {
+    let Some(controller_client) = controller_client_conf.map(|c| c.build_client()) else {
         tracing::info!("Skipping ancestor layer GC, because no `--controller-api` was specified");
         return Ok(summary);
     };
diff --git a/storage_scrubber/src/scan_pageserver_metadata.rs b/storage_scrubber/src/scan_pageserver_metadata.rs
index fbd60f93bb..dc410bde41 100644
--- a/storage_scrubber/src/scan_pageserver_metadata.rs
+++ b/storage_scrubber/src/scan_pageserver_metadata.rs
@@ -9,12 +9,13 @@
 use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
 use aws_sdk_s3::Client;
 use futures_util::{StreamExt, TryStreamExt};
 use pageserver::tenant::remote_timeline_client::remote_layer_path;
+use pageserver_api::controller_api::MetadataHealthUpdateRequest;
 use pageserver_api::shard::TenantShardId;
 use serde::Serialize;
 use utils::id::TenantId;
 use utils::shard::ShardCount;
 
-#[derive(Serialize)]
+#[derive(Serialize, Default)]
 pub struct MetadataSummary {
     tenant_count: usize,
@@ -21,22 +22,19 @@ pub struct MetadataSummary {
     timeline_count: usize,
     timeline_shard_count: usize,
     with_errors: HashSet<TenantShardTimelineId>,
     with_warnings: HashSet<TenantShardTimelineId>,
     with_orphans: HashSet<TenantShardTimelineId>,
     indices_by_version: HashMap<usize, usize>,
+
+    #[serde(skip)]
+    pub(crate) healthy_tenant_shards: HashSet<TenantShardId>,
+    #[serde(skip)]
+    pub(crate) unhealthy_tenant_shards: HashSet<TenantShardId>,
 }
 
 impl MetadataSummary {
     fn new() -> Self {
-        Self {
-            tenant_count: 0,
-            timeline_count: 0,
-            timeline_shard_count: 0,
-            with_errors: HashSet::new(),
-            with_warnings: HashSet::new(),
-            with_orphans: HashSet::new(),
-            indices_by_version: HashMap::new(),
-        }
+        Self::default()
     }
 
     fn update_data(&mut self, data: &S3TimelineBlobData) {
@@ -54,6 +52,13 @@
     }
 
     fn update_analysis(&mut self, id: &TenantShardTimelineId, analysis: &TimelineAnalysis) {
+        if analysis.is_healthy() {
+            self.healthy_tenant_shards.insert(id.tenant_shard_id);
+        } else {
+            self.healthy_tenant_shards.remove(&id.tenant_shard_id);
+            self.unhealthy_tenant_shards.insert(id.tenant_shard_id);
+        }
+
         if !analysis.errors.is_empty() {
             self.with_errors.insert(*id);
         }
@@ -101,6 +106,13 @@ Index versions: {version_summary}
     pub fn is_empty(&self) -> bool {
         self.timeline_shard_count == 0
     }
+
+    pub fn build_health_update_request(&self) -> MetadataHealthUpdateRequest {
+        MetadataHealthUpdateRequest {
+            healthy_tenant_shards: self.healthy_tenant_shards.clone(),
+            unhealthy_tenant_shards: self.unhealthy_tenant_shards.clone(),
+        }
+    }
 }
 
 /// Scan the pageserver metadata in an S3 bucket, reporting errors and statistics.
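A note on the bookkeeping added to `update_analysis` above: checks run per timeline but health is reported per tenant shard, so a shard moves into the unhealthy set as soon as any one of its timelines has errors or warnings, and nothing later takes it back out. A toy sketch of that rule, with a stand-in shard id type rather than the real `TenantShardId`:

    use std::collections::HashSet;

    // Toy shard identifier standing in for TenantShardId.
    type TenantShardId = u32;

    #[derive(Default)]
    struct HealthSets {
        healthy: HashSet<TenantShardId>,
        unhealthy: HashSet<TenantShardId>,
    }

    impl HealthSets {
        // Mirrors the bookkeeping in MetadataSummary::update_analysis: any timeline with
        // errors or warnings marks its shard unhealthy, and that marking is never undone.
        fn record_timeline(&mut self, shard: TenantShardId, timeline_is_healthy: bool) {
            if timeline_is_healthy {
                self.healthy.insert(shard);
            } else {
                self.healthy.remove(&shard);
                self.unhealthy.insert(shard);
            }
        }
    }

    fn main() {
        let mut sets = HealthSets::default();
        // Shard 7 has two timelines: the first passes all checks, the second has a warning.
        sets.record_timeline(7, true);
        sets.record_timeline(7, false);
        assert!(sets.unhealthy.contains(&7));
        assert!(!sets.healthy.contains(&7));
        println!("unhealthy shards: {:?}", sets.unhealthy);
    }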
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 5b2ebea794..0c33dec784 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -4401,10 +4401,11 @@ class StorageScrubber:
         assert stdout is not None
         return stdout
 
-    def scan_metadata(self) -> Any:
-        stdout = self.scrubber_cli(
-            ["scan-metadata", "--node-kind", "pageserver", "--json"], timeout=30
-        )
+    def scan_metadata(self, post_to_storage_controller: bool = False) -> Any:
+        args = ["scan-metadata", "--node-kind", "pageserver", "--json"]
+        if post_to_storage_controller:
+            args.append("--post")
+        stdout = self.scrubber_cli(args, timeout=30)
 
         try:
             return json.loads(stdout)
diff --git a/test_runner/regress/test_storage_scrubber.py b/test_runner/regress/test_storage_scrubber.py
index a45430ca86..fadf438788 100644
--- a/test_runner/regress/test_storage_scrubber.py
+++ b/test_runner/regress/test_storage_scrubber.py
@@ -440,10 +440,12 @@ def test_scrubber_scan_pageserver_metadata(
     assert len(index.layer_metadata) > 0
     it = iter(index.layer_metadata.items())
 
-    scan_summary = env.storage_scrubber.scan_metadata()
+    scan_summary = env.storage_scrubber.scan_metadata(post_to_storage_controller=True)
     assert not scan_summary["with_warnings"]
     assert not scan_summary["with_errors"]
 
+    assert env.storage_controller.metadata_health_is_healthy()
+
     # Delete a layer file that is listed in the index.
     layer, metadata = next(it)
     log.info(f"Deleting {timeline_path}/{layer.to_str()}")
@@ -453,7 +455,17 @@ def test_scrubber_scan_pageserver_metadata(
     )
     log.info(f"delete response: {delete_response}")
 
-    # Check scan summary. Expect it to be a L0 layer so only emit warnings.
+    # Check scan summary without posting to storage controller. Expect it to be an L0 layer, so only warnings are emitted.
     scan_summary = env.storage_scrubber.scan_metadata()
     log.info(f"{pprint.pformat(scan_summary)}")
     assert len(scan_summary["with_warnings"]) > 0
+
+    assert env.storage_controller.metadata_health_is_healthy()
+
+    # Now post to storage controller, expecting one unhealthy health record.
+    scan_summary = env.storage_scrubber.scan_metadata(post_to_storage_controller=True)
+    log.info(f"{pprint.pformat(scan_summary)}")
+    assert len(scan_summary["with_warnings"]) > 0
+
+    unhealthy = env.storage_controller.metadata_health_list_unhealthy()["unhealthy_tenant_shards"]
+    assert len(unhealthy) == 1 and unhealthy[0] == str(tenant_shard_id)
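The final assertions are the interesting part: the deleted layer only produces warnings, yet the shard still shows up in the controller's unhealthy list, because `TimelineAnalysis::is_healthy()` treats warnings exactly like errors. A toy check of that rule, using a stand-in struct that mirrors the one in storage_scrubber/src/checks.rs (the warning text is made up):

    // Stand-in mirroring the shape of TimelineAnalysis in storage_scrubber/src/checks.rs.
    struct TimelineAnalysis {
        errors: Vec<String>,
        warnings: Vec<String>,
    }

    impl TimelineAnalysis {
        // Same rule as the patch: warnings count against health just like errors.
        fn is_healthy(&self) -> bool {
            self.errors.is_empty() && self.warnings.is_empty()
        }
    }

    fn main() {
        let warned = TimelineAnalysis {
            errors: Vec::new(),
            // Made-up warning text; any warning at all is enough to flip the verdict.
            warnings: vec!["layer referenced by index was not found in remote storage".to_string()],
        };
        assert!(!warned.is_healthy());
        println!("healthy: {}", warned.is_healthy());
    }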