feat(test_runner): allowed_errors in storage scrubber (#10062)

## Problem

resolve
https://github.com/neondatabase/neon/issues/9988#issuecomment-2528239437

## Summary of changes

* New verbose mode for storage scrubber scan metadata (pageserver) that
contains the error messages.
* Filter allowed_error list from the JSON output to determine the
healthy flag status.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
Alex Chi Z.
2024-12-10 12:00:47 -05:00
committed by GitHub
parent b853f78136
commit aa0554fd1e
4 changed files with 98 additions and 15 deletions

View File

@@ -86,6 +86,8 @@ enum Command {
/// For safekeeper node_kind only, json list of timelines and their lsn info
#[arg(long, default_value = None)]
timeline_lsns: Option<String>,
#[arg(long, default_value_t = false)]
verbose: bool,
},
TenantSnapshot {
#[arg(long = "tenant-id")]
@@ -166,6 +168,7 @@ async fn main() -> anyhow::Result<()> {
dump_db_connstr,
dump_db_table,
timeline_lsns,
verbose,
} => {
if let NodeKind::Safekeeper = node_kind {
let db_or_list = match (timeline_lsns, dump_db_connstr) {
@@ -203,6 +206,7 @@ async fn main() -> anyhow::Result<()> {
tenant_ids,
json,
post_to_storcon,
verbose,
cli.exit_code,
)
.await
@@ -313,6 +317,7 @@ pub async fn run_cron_job(
Vec::new(),
true,
post_to_storcon,
false, // default to non-verbose mode
exit_code,
)
.await?;
@@ -362,12 +367,13 @@ pub async fn scan_pageserver_metadata_cmd(
tenant_shard_ids: Vec<TenantShardId>,
json: bool,
post_to_storcon: bool,
verbose: bool,
exit_code: bool,
) -> anyhow::Result<()> {
if controller_client.is_none() && post_to_storcon {
return Err(anyhow!("Posting pageserver scan health status to storage controller requires `--controller-api` and `--controller-jwt` to run"));
}
match scan_pageserver_metadata(bucket_config.clone(), tenant_shard_ids).await {
match scan_pageserver_metadata(bucket_config.clone(), tenant_shard_ids, verbose).await {
Err(e) => {
tracing::error!("Failed: {e}");
Err(e)

View File

@@ -21,8 +21,12 @@ pub struct MetadataSummary {
tenant_count: usize,
timeline_count: usize,
timeline_shard_count: usize,
with_errors: HashSet<TenantShardTimelineId>,
with_warnings: HashSet<TenantShardTimelineId>,
/// Tenant-shard timeline (key) mapping to errors. The key has to be a string because it will be serialized to a JSON.
/// The key is generated using `TenantShardTimelineId::to_string()`.
with_errors: HashMap<String, Vec<String>>,
/// Tenant-shard timeline (key) mapping to warnings. The key has to be a string because it will be serialized to a JSON.
/// The key is generated using `TenantShardTimelineId::to_string()`.
with_warnings: HashMap<String, Vec<String>>,
with_orphans: HashSet<TenantShardTimelineId>,
indices_by_version: HashMap<usize, usize>,
@@ -52,7 +56,12 @@ impl MetadataSummary {
}
}
fn update_analysis(&mut self, id: &TenantShardTimelineId, analysis: &TimelineAnalysis) {
fn update_analysis(
&mut self,
id: &TenantShardTimelineId,
analysis: &TimelineAnalysis,
verbose: bool,
) {
if analysis.is_healthy() {
self.healthy_tenant_shards.insert(id.tenant_shard_id);
} else {
@@ -61,11 +70,17 @@ impl MetadataSummary {
}
if !analysis.errors.is_empty() {
self.with_errors.insert(*id);
let entry = self.with_errors.entry(id.to_string()).or_default();
if verbose {
entry.extend(analysis.errors.iter().cloned());
}
}
if !analysis.warnings.is_empty() {
self.with_warnings.insert(*id);
let entry = self.with_warnings.entry(id.to_string()).or_default();
if verbose {
entry.extend(analysis.warnings.iter().cloned());
}
}
}
@@ -120,6 +135,7 @@ Index versions: {version_summary}
pub async fn scan_pageserver_metadata(
bucket_config: BucketConfig,
tenant_ids: Vec<TenantShardId>,
verbose: bool,
) -> anyhow::Result<MetadataSummary> {
let (remote_client, target) = init_remote(bucket_config, NodeKind::Pageserver).await?;
@@ -164,6 +180,7 @@ pub async fn scan_pageserver_metadata(
mut tenant_objects: TenantObjectListing,
timelines: Vec<(TenantShardTimelineId, RemoteTimelineBlobData)>,
highest_shard_count: ShardCount,
verbose: bool,
) {
summary.tenant_count += 1;
@@ -203,7 +220,7 @@ pub async fn scan_pageserver_metadata(
Some(data),
)
.await;
summary.update_analysis(&ttid, &analysis);
summary.update_analysis(&ttid, &analysis, verbose);
timeline_ids.insert(ttid.timeline_id);
} else {
@@ -271,10 +288,6 @@ pub async fn scan_pageserver_metadata(
summary.update_data(&data);
match tenant_id {
None => {
tenant_id = Some(ttid.tenant_shard_id.tenant_id);
highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
}
Some(prev_tenant_id) => {
if prev_tenant_id != ttid.tenant_shard_id.tenant_id {
// New tenant: analyze this tenant's timelines, clear accumulated tenant_timeline_results
@@ -287,6 +300,7 @@ pub async fn scan_pageserver_metadata(
tenant_objects,
timelines,
highest_shard_count,
verbose,
)
.instrument(info_span!("analyze-tenant", tenant = %prev_tenant_id))
.await;
@@ -296,6 +310,10 @@ pub async fn scan_pageserver_metadata(
highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
}
}
None => {
tenant_id = Some(ttid.tenant_shard_id.tenant_id);
highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
}
}
match &data.blob_data {
@@ -326,6 +344,7 @@ pub async fn scan_pageserver_metadata(
tenant_objects,
tenant_timeline_results,
highest_shard_count,
verbose,
)
.instrument(info_span!("analyze-tenant", tenant = %tenant_id))
.await;

View File

@@ -4556,6 +4556,7 @@ class StorageScrubber:
def __init__(self, env: NeonEnv, log_dir: Path):
self.env = env
self.log_dir = log_dir
self.allowed_errors: list[str] = []
def scrubber_cli(
self, args: list[str], timeout, extra_env: dict[str, str] | None = None
@@ -4633,19 +4634,70 @@ class StorageScrubber:
if timeline_lsns is not None:
args.append("--timeline-lsns")
args.append(json.dumps(timeline_lsns))
if node_kind == NodeKind.PAGESERVER:
args.append("--verbose")
stdout = self.scrubber_cli(args, timeout=30, extra_env=extra_env)
try:
summary = json.loads(stdout)
# summary does not contain "with_warnings" if node_kind is the safekeeper
no_warnings = "with_warnings" not in summary or not summary["with_warnings"]
healthy = not summary["with_errors"] and no_warnings
healthy = self._check_run_healthy(summary)
return healthy, summary
except:
log.error("Failed to decode JSON output from `scan-metadata`. Dumping stdout:")
log.error(stdout)
raise
def _check_line_allowed(self, line: str) -> bool:
for a in self.allowed_errors:
try:
if re.match(a, line):
return True
except re.error:
log.error(f"Invalid regex: '{a}'")
raise
return False
def _check_line_list_allowed(self, lines: list[str]) -> bool:
for line in lines:
if not self._check_line_allowed(line):
return False
return True
def _check_run_healthy(self, summary: dict[str, Any]) -> bool:
# summary does not contain "with_warnings" if node_kind is the safekeeper
healthy = True
with_warnings = summary.get("with_warnings", None)
if with_warnings is not None:
if isinstance(with_warnings, list):
if len(with_warnings) > 0:
# safekeeper scan_metadata output is a list of tenants
healthy = False
else:
for _, warnings in with_warnings.items():
assert (
len(warnings) > 0
), "with_warnings value should not be empty, running without verbose mode?"
if not self._check_line_list_allowed(warnings):
healthy = False
break
if not healthy:
return healthy
with_errors = summary.get("with_errors", None)
if with_errors is not None:
if isinstance(with_errors, list):
if len(with_errors) > 0:
# safekeeper scan_metadata output is a list of tenants
healthy = False
else:
for _, errors in with_errors.items():
assert (
len(errors) > 0
), "with_errors value should not be empty, running without verbose mode?"
if not self._check_line_list_allowed(errors):
healthy = False
break
return healthy
def tenant_snapshot(self, tenant_id: TenantId, output_path: Path):
stdout = self.scrubber_cli(
["tenant-snapshot", "--tenant-id", str(tenant_id), "--output-path", str(output_path)],

View File

@@ -572,4 +572,10 @@ def test_scrubber_scan_pageserver_metadata(
unhealthy = env.storage_controller.metadata_health_list_unhealthy()["unhealthy_tenant_shards"]
assert len(unhealthy) == 1 and unhealthy[0] == str(tenant_shard_id)
neon_env_builder.disable_scrub_on_exit()
healthy, _ = env.storage_scrubber.scan_metadata()
assert not healthy
env.storage_scrubber.allowed_errors.append(".*not present in remote storage.*")
healthy, _ = env.storage_scrubber.scan_metadata()
assert healthy
neon_env_builder.disable_scrub_on_exit() # We already ran scrubber, no need to do an extra run