Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-15 17:32:56 +00:00
feat(scrubber): post scan_metadata results to storage controller (#8502)
Part of #8128, follow-up to #8480; closes #8421. Enables the scrubber to optionally post metadata scan health results to the storage controller.

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
Committed by: Arpad Müller
Parent: b87a1384f0
Commit: d1d4631c8f
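For context, here is a minimal sketch of how the new flow could be driven end to end. It is not part of this commit: it assumes the scrubber binary is invoked as `storage_scrubber` and that the S3 bucket configuration is already provided via the environment, as the scrubber expects. The flag names (`--post`, `--controller-api`, `--controller-jwt`, `--node-kind`, `--json`) come from the diff below.

import json
import subprocess

def scan_and_post(controller_api: str, controller_jwt: str = "") -> dict:
    # Top-level controller options are passed before the subcommand; an empty JWT
    # mirrors the "no key" convenience default used for development environments.
    args = [
        "storage_scrubber",              # assumed binary name
        "--controller-api", controller_api,
        "--controller-jwt", controller_jwt,
        "scan-metadata",
        "--node-kind", "pageserver",
        "--json",
        "--post",                        # new flag added by this commit
    ]
    out = subprocess.run(args, capture_output=True, text=True, check=True)
    return json.loads(out.stdout)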
@@ -1,3 +1,4 @@
+use std::collections::HashSet;
 use std::str::FromStr;
 use std::time::{Duration, Instant};

@@ -304,8 +305,8 @@ pub struct MetadataHealthRecord {

 #[derive(Serialize, Deserialize, Debug)]
 pub struct MetadataHealthUpdateRequest {
-    pub healthy_tenant_shards: Vec<TenantShardId>,
-    pub unhealthy_tenant_shards: Vec<TenantShardId>,
+    pub healthy_tenant_shards: HashSet<TenantShardId>,
+    pub unhealthy_tenant_shards: HashSet<TenantShardId>,
 }

 #[derive(Serialize, Deserialize, Debug)]
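The request body the scrubber posts is just this struct serialized with serde: each HashSet becomes a JSON array of tenant shard IDs. As a rough sketch (the shard ID string format shown here is illustrative, not taken from this diff), the body looks like:

{
  "healthy_tenant_shards": ["1f359dd625e519a1a4e8d7509690f6fc-0004"],
  "unhealthy_tenant_shards": ["1f359dd625e519a1a4e8d7509690f6fc-0104"]
}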
@@ -40,6 +40,11 @@ impl TimelineAnalysis {
             garbage_keys: Vec::new(),
         }
     }
+
+    /// Whether a timeline is healthy.
+    pub(crate) fn is_healthy(&self) -> bool {
+        self.errors.is_empty() && self.warnings.is_empty()
+    }
 }

 pub(crate) async fn branch_cleanup_and_check_errors(
@@ -32,6 +32,7 @@ use remote_storage::{
 };
 use reqwest::Url;
 use serde::{Deserialize, Serialize};
+use storage_controller_client::control_api;
 use tokio::io::AsyncReadExt;
 use tokio_util::sync::CancellationToken;
 use tracing::error;

@@ -255,6 +256,12 @@ pub struct ControllerClientConfig {
     pub controller_jwt: String,
 }

+impl ControllerClientConfig {
+    pub fn build_client(self) -> control_api::Client {
+        control_api::Client::new(self.controller_api, Some(self.controller_jwt))
+    }
+}
+
 pub struct ConsoleConfig {
     pub token: String,
     pub base_url: Url,
@@ -1,7 +1,8 @@
 use anyhow::{anyhow, bail};
 use camino::Utf8PathBuf;
+use pageserver_api::controller_api::{MetadataHealthUpdateRequest, MetadataHealthUpdateResponse};
 use pageserver_api::shard::TenantShardId;
-use reqwest::Url;
+use reqwest::{Method, Url};
 use storage_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
 use storage_scrubber::pageserver_physical_gc::GcMode;
 use storage_scrubber::scan_pageserver_metadata::scan_metadata;

@@ -61,6 +62,8 @@ enum Command {
         json: bool,
         #[arg(long = "tenant-id", num_args = 0..)]
         tenant_ids: Vec<TenantShardId>,
+        #[arg(long = "post", default_value_t = false)]
+        post_to_storage_controller: bool,
         #[arg(long, default_value = None)]
         /// For safekeeper node_kind only, points to db with debug dump
         dump_db_connstr: Option<String>,
@@ -116,11 +119,20 @@ async fn main() -> anyhow::Result<()> {
         chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
     ));

+    let controller_client_conf = cli.controller_api.map(|controller_api| {
+        ControllerClientConfig {
+            controller_api,
+            // Default to no key: this is a convenience when working in a development environment
+            controller_jwt: cli.controller_jwt.unwrap_or("".to_owned()),
+        }
+    });
+
     match cli.command {
         Command::ScanMetadata {
             json,
             tenant_ids,
             node_kind,
+            post_to_storage_controller,
             dump_db_connstr,
             dump_db_table,
         } => {
@@ -159,6 +171,9 @@ async fn main() -> anyhow::Result<()> {
                 }
                 Ok(())
             } else {
+                if controller_client_conf.is_none() && post_to_storage_controller {
+                    return Err(anyhow!("Posting pageserver scan health status to storage controller requires `--controller-api` and `--controller-jwt` to run"));
+                }
                 match scan_metadata(bucket_config.clone(), tenant_ids).await {
                     Err(e) => {
                         tracing::error!("Failed: {e}");
@@ -170,6 +185,21 @@ async fn main() -> anyhow::Result<()> {
                     } else {
                         println!("{}", summary.summary_string());
                     }
+
+                    if post_to_storage_controller {
+                        if let Some(conf) = controller_client_conf {
+                            let controller_client = conf.build_client();
+                            let body = summary.build_health_update_request();
+                            controller_client
+                                .dispatch::<MetadataHealthUpdateRequest, MetadataHealthUpdateResponse>(
+                                    Method::POST,
+                                    "control/v1/metadata_health/update".to_string(),
+                                    Some(body),
+                                )
+                                .await?;
+                        }
+                    }
+
                     if summary.is_fatal() {
                         Err(anyhow::anyhow!("Fatal scrub errors detected"))
                     } else if summary.is_empty() {
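On the wire, the dispatch above amounts to a single POST to the storage controller. A minimal sketch of the equivalent request, under two assumptions not confirmed by this diff: the controller accepts the JWT as a Bearer token in the Authorization header, and `body` matches the MetadataHealthUpdateRequest shape sketched earlier.

import requests

def post_metadata_health(controller_api: str, controller_jwt: str, body: dict) -> None:
    # Endpoint path taken from the dispatch call in the diff above.
    url = controller_api.rstrip("/") + "/control/v1/metadata_health/update"
    headers = {"Authorization": f"Bearer {controller_jwt}"} if controller_jwt else {}
    resp = requests.post(url, json=body, headers=headers)
    resp.raise_for_status()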
@@ -217,14 +247,6 @@ async fn main() -> anyhow::Result<()> {
             min_age,
             mode,
         } => {
-            let controller_client_conf = cli.controller_api.map(|controller_api| {
-                ControllerClientConfig {
-                    controller_api,
-                    // Default to no key: this is a convenience when working in a development environment
-                    controller_jwt: cli.controller_jwt.unwrap_or("".to_owned()),
-                }
-            });
-
             match (&controller_client_conf, mode) {
                 (Some(_), _) => {
                     // Any mode may run when controller API is set
@@ -567,13 +567,7 @@ pub async fn pageserver_physical_gc(
     }

     // Execute cross-shard GC, using the accumulator's full view of all the shards built in the per-shard GC
-    let Some(controller_client) = controller_client_conf.as_ref().map(|c| {
-        let ControllerClientConfig {
-            controller_api,
-            controller_jwt,
-        } = c;
-        control_api::Client::new(controller_api.clone(), Some(controller_jwt.clone()))
-    }) else {
+    let Some(controller_client) = controller_client_conf.map(|c| c.build_client()) else {
         tracing::info!("Skipping ancestor layer GC, because no `--controller-api` was specified");
         return Ok(summary);
     };
@@ -9,12 +9,13 @@ use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimeline
 use aws_sdk_s3::Client;
 use futures_util::{StreamExt, TryStreamExt};
 use pageserver::tenant::remote_timeline_client::remote_layer_path;
+use pageserver_api::controller_api::MetadataHealthUpdateRequest;
 use pageserver_api::shard::TenantShardId;
 use serde::Serialize;
 use utils::id::TenantId;
 use utils::shard::ShardCount;

-#[derive(Serialize)]
+#[derive(Serialize, Default)]
 pub struct MetadataSummary {
     tenant_count: usize,
     timeline_count: usize,

@@ -23,19 +24,16 @@ pub struct MetadataSummary {
     with_warnings: HashSet<TenantShardTimelineId>,
     with_orphans: HashSet<TenantShardTimelineId>,
     indices_by_version: HashMap<usize, usize>,
+
+    #[serde(skip)]
+    pub(crate) healthy_tenant_shards: HashSet<TenantShardId>,
+    #[serde(skip)]
+    pub(crate) unhealthy_tenant_shards: HashSet<TenantShardId>,
 }

 impl MetadataSummary {
     fn new() -> Self {
-        Self {
-            tenant_count: 0,
-            timeline_count: 0,
-            timeline_shard_count: 0,
-            with_errors: HashSet::new(),
-            with_warnings: HashSet::new(),
-            with_orphans: HashSet::new(),
-            indices_by_version: HashMap::new(),
-        }
+        Self::default()
     }

     fn update_data(&mut self, data: &S3TimelineBlobData) {
@@ -54,6 +52,13 @@ impl MetadataSummary {
     }

     fn update_analysis(&mut self, id: &TenantShardTimelineId, analysis: &TimelineAnalysis) {
+        if analysis.is_healthy() {
+            self.healthy_tenant_shards.insert(id.tenant_shard_id);
+        } else {
+            self.healthy_tenant_shards.remove(&id.tenant_shard_id);
+            self.unhealthy_tenant_shards.insert(id.tenant_shard_id);
+        }
+
         if !analysis.errors.is_empty() {
             self.with_errors.insert(*id);
         }
@@ -101,6 +106,13 @@ Index versions: {version_summary}
     pub fn is_empty(&self) -> bool {
         self.timeline_shard_count == 0
     }
+
+    pub fn build_health_update_request(&self) -> MetadataHealthUpdateRequest {
+        MetadataHealthUpdateRequest {
+            healthy_tenant_shards: self.healthy_tenant_shards.clone(),
+            unhealthy_tenant_shards: self.unhealthy_tenant_shards.clone(),
+        }
+    }
 }

 /// Scan the pageserver metadata in an S3 bucket, reporting errors and statistics.
@@ -4401,10 +4401,11 @@ class StorageScrubber:
         assert stdout is not None
         return stdout

-    def scan_metadata(self) -> Any:
-        stdout = self.scrubber_cli(
-            ["scan-metadata", "--node-kind", "pageserver", "--json"], timeout=30
-        )
+    def scan_metadata(self, post_to_storage_controller: bool = False) -> Any:
+        args = ["scan-metadata", "--node-kind", "pageserver", "--json"]
+        if post_to_storage_controller:
+            args.append("--post")
+        stdout = self.scrubber_cli(args, timeout=30)

         try:
             return json.loads(stdout)
@@ -440,10 +440,12 @@ def test_scrubber_scan_pageserver_metadata(
     assert len(index.layer_metadata) > 0
     it = iter(index.layer_metadata.items())

-    scan_summary = env.storage_scrubber.scan_metadata()
+    scan_summary = env.storage_scrubber.scan_metadata(post_to_storage_controller=True)
     assert not scan_summary["with_warnings"]
     assert not scan_summary["with_errors"]

+    assert env.storage_controller.metadata_health_is_healthy()
+
     # Delete a layer file that is listed in the index.
     layer, metadata = next(it)
     log.info(f"Deleting {timeline_path}/{layer.to_str()}")

@@ -453,7 +455,17 @@ def test_scrubber_scan_pageserver_metadata(
     )
     log.info(f"delete response: {delete_response}")

-    # Check scan summary. Expect it to be a L0 layer so only emit warnings.
+    # Check scan summary without posting to storage controller. Expect it to be a L0 layer so only emit warnings.
     scan_summary = env.storage_scrubber.scan_metadata()
     log.info(f"{pprint.pformat(scan_summary)}")
     assert len(scan_summary["with_warnings"]) > 0
+
+    assert env.storage_controller.metadata_health_is_healthy()
+
+    # Now post to storage controller, expect seeing one unhealthy health record
+    scan_summary = env.storage_scrubber.scan_metadata(post_to_storage_controller=True)
+    log.info(f"{pprint.pformat(scan_summary)}")
+    assert len(scan_summary["with_warnings"]) > 0
+
+    unhealthy = env.storage_controller.metadata_health_list_unhealthy()["unhealthy_tenant_shards"]
+    assert len(unhealthy) == 1 and unhealthy[0] == str(tenant_shard_id)