feat(storcon): store scrubber metadata scan result (#8480)

Part of #8128, followed by #8502.

## Problem

Currently we lack mechanism to alert unhealthy `scan_metadata` status if
we start running this scrubber command as part of a cronjob. With the
storage controller client introduced to storage scrubber in #8196, it is
viable to set up alert by storing health status in the storage
controller database.

We intentionally do not store the full output to the database as the
json blobs potentially makes the table really huge. Instead, only a
health status and a timestamp recording the last time metadata health
status is posted on a tenant shard.

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
This commit is contained in:
Yuchen Liang
2024-07-30 09:32:00 -04:00
committed by Arpad Müller
parent 5702e1cb46
commit b87a1384f0
12 changed files with 560 additions and 27 deletions

2
Cargo.lock generated
View File

@@ -1672,6 +1672,7 @@ checksum = "62d6dcd069e7b5fe49a302411f759d4cf1cf2c27fe798ef46fb8baefc053dd2b"
dependencies = [
"bitflags 2.4.1",
"byteorder",
"chrono",
"diesel_derives",
"itoa",
"pq-sys",
@@ -5718,6 +5719,7 @@ dependencies = [
"aws-config",
"bytes",
"camino",
"chrono",
"clap",
"control_plane",
"diesel",

View File

@@ -1,5 +1,5 @@
use std::str::FromStr;
use std::time::Instant;
use std::time::{Duration, Instant};
/// Request/response types for the storage controller
/// API (`/control/v1` prefix). Implemented by the server
@@ -294,6 +294,42 @@ pub enum PlacementPolicy {
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantShardMigrateResponse {}
/// Metadata health record posted from scrubber.
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthRecord {
pub tenant_shard_id: TenantShardId,
pub healthy: bool,
pub last_scrubbed_at: chrono::DateTime<chrono::Utc>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthUpdateRequest {
pub healthy_tenant_shards: Vec<TenantShardId>,
pub unhealthy_tenant_shards: Vec<TenantShardId>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthUpdateResponse {}
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListUnhealthyResponse {
pub unhealthy_tenant_shards: Vec<TenantShardId>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListOutdatedRequest {
#[serde(with = "humantime_serde")]
pub not_scrubbed_for: Duration,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListOutdatedResponse {
pub health_records: Vec<MetadataHealthRecord>,
}
#[cfg(test)]
mod test {
use super::*;

View File

@@ -18,20 +18,20 @@ const STORAGE_TOKEN_ALGORITHM: Algorithm = Algorithm::EdDSA;
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum Scope {
// Provides access to all data for a specific tenant (specified in `struct Claims` below)
/// Provides access to all data for a specific tenant (specified in `struct Claims` below)
// TODO: join these two?
Tenant,
// Provides blanket access to all tenants on the pageserver plus pageserver-wide APIs.
// Should only be used e.g. for status check/tenant creation/list.
/// Provides blanket access to all tenants on the pageserver plus pageserver-wide APIs.
/// Should only be used e.g. for status check/tenant creation/list.
PageServerApi,
// Provides blanket access to all data on the safekeeper plus safekeeper-wide APIs.
// Should only be used e.g. for status check.
// Currently also used for connection from any pageserver to any safekeeper.
/// Provides blanket access to all data on the safekeeper plus safekeeper-wide APIs.
/// Should only be used e.g. for status check.
/// Currently also used for connection from any pageserver to any safekeeper.
SafekeeperData,
// The scope used by pageservers in upcalls to storage controller and cloud control plane
/// The scope used by pageservers in upcalls to storage controller and cloud control plane
#[serde(rename = "generations_api")]
GenerationsApi,
// Allows access to control plane managment API and some storage controller endpoints.
/// Allows access to control plane managment API and some storage controller endpoints.
Admin,
/// Allows access to storage controller APIs used by the scrubber, to interrogate the state

View File

@@ -18,6 +18,7 @@ anyhow.workspace = true
aws-config.workspace = true
bytes.workspace = true
camino.workspace = true
chrono.workspace = true
clap.workspace = true
fail.workspace = true
futures.workspace = true
@@ -44,7 +45,12 @@ scopeguard.workspace = true
strum.workspace = true
strum_macros.workspace = true
diesel = { version = "2.1.4", features = ["serde_json", "postgres", "r2d2"] }
diesel = { version = "2.1.4", features = [
"serde_json",
"postgres",
"r2d2",
"chrono",
] }
diesel_migrations = { version = "2.1.0" }
r2d2 = { version = "0.8.10" }
@@ -52,4 +58,3 @@ utils = { path = "../libs/utils/" }
metrics = { path = "../libs/metrics/" }
control_plane = { path = "../control_plane" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }

View File

@@ -0,0 +1 @@
DROP TABLE metadata_health;

View File

@@ -0,0 +1,14 @@
CREATE TABLE metadata_health (
tenant_id VARCHAR NOT NULL,
shard_number INTEGER NOT NULL,
shard_count INTEGER NOT NULL,
PRIMARY KEY(tenant_id, shard_number, shard_count),
-- Rely on cascade behavior for delete
FOREIGN KEY(tenant_id, shard_number, shard_count) REFERENCES tenant_shards ON DELETE CASCADE,
healthy BOOLEAN NOT NULL DEFAULT TRUE,
last_scrubbed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
INSERT INTO metadata_health(tenant_id, shard_number, shard_count)
SELECT tenant_id, shard_number, shard_count FROM tenant_shards;

View File

@@ -10,7 +10,11 @@ use hyper::header::CONTENT_TYPE;
use hyper::{Body, Request, Response};
use hyper::{StatusCode, Uri};
use metrics::{BuildInfo, NeonMetrics};
use pageserver_api::controller_api::TenantCreateRequest;
use pageserver_api::controller_api::{
MetadataHealthListOutdatedRequest, MetadataHealthListOutdatedResponse,
MetadataHealthListUnhealthyResponse, MetadataHealthUpdateRequest, MetadataHealthUpdateResponse,
TenantCreateRequest,
};
use pageserver_api::models::{
TenantConfigRequest, TenantLocationConfigRequest, TenantShardSplitRequest,
TenantTimeTravelRequest, TimelineCreateRequest,
@@ -560,6 +564,51 @@ async fn handle_cancel_node_fill(req: Request<Body>) -> Result<Response<Body>, A
json_response(StatusCode::ACCEPTED, ())
}
async fn handle_metadata_health_update(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Scrubber)?;
let update_req = json_request::<MetadataHealthUpdateRequest>(&mut req).await?;
let state = get_state(&req);
state.service.metadata_health_update(update_req).await?;
json_response(StatusCode::OK, MetadataHealthUpdateResponse {})
}
async fn handle_metadata_health_list_unhealthy(
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
let state = get_state(&req);
let unhealthy_tenant_shards = state.service.metadata_health_list_unhealthy().await?;
json_response(
StatusCode::OK,
MetadataHealthListUnhealthyResponse {
unhealthy_tenant_shards,
},
)
}
async fn handle_metadata_health_list_outdated(
mut req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
let list_outdated_req = json_request::<MetadataHealthListOutdatedRequest>(&mut req).await?;
let state = get_state(&req);
let health_records = state
.service
.metadata_health_list_outdated(list_outdated_req.not_scrubbed_for)
.await?;
json_response(
StatusCode::OK,
MetadataHealthListOutdatedResponse { health_records },
)
}
async fn handle_tenant_shard_split(
service: Arc<Service>,
mut req: Request<Body>,
@@ -987,6 +1036,28 @@ pub fn make_router(
RequestName("control_v1_cancel_node_fill"),
)
})
// Metadata health operations
.post("/control/v1/metadata_health/update", |r| {
named_request_span(
r,
handle_metadata_health_update,
RequestName("control_v1_metadata_health_update"),
)
})
.get("/control/v1/metadata_health/unhealthy", |r| {
named_request_span(
r,
handle_metadata_health_list_unhealthy,
RequestName("control_v1_metadata_health_list_unhealthy"),
)
})
.post("/control/v1/metadata_health/outdated", |r| {
named_request_span(
r,
handle_metadata_health_list_outdated,
RequestName("control_v1_metadata_health_list_outdated"),
)
})
// TODO(vlad): endpoint for cancelling drain and fill
// Tenant Shard operations
.put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {

View File

@@ -8,6 +8,7 @@ use self::split_state::SplitState;
use diesel::pg::PgConnection;
use diesel::prelude::*;
use diesel::Connection;
use pageserver_api::controller_api::MetadataHealthRecord;
use pageserver_api::controller_api::ShardSchedulingPolicy;
use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
use pageserver_api::models::TenantConfig;
@@ -90,6 +91,10 @@ pub(crate) enum DatabaseOperation {
UpdateTenantShard,
DeleteTenant,
UpdateTenantConfig,
UpdateMetadataHealth,
ListMetadataHealth,
ListMetadataHealthUnhealthy,
ListMetadataHealthOutdated,
}
#[must_use]
@@ -307,15 +312,32 @@ impl Persistence {
&self,
shards: Vec<TenantShardPersistence>,
) -> DatabaseResult<()> {
use crate::schema::tenant_shards::dsl::*;
use crate::schema::metadata_health;
use crate::schema::tenant_shards;
let now = chrono::Utc::now();
let metadata_health_records = shards
.iter()
.map(|t| MetadataHealthPersistence {
tenant_id: t.tenant_id.clone(),
shard_number: t.shard_number,
shard_count: t.shard_count,
healthy: true,
last_scrubbed_at: now,
})
.collect::<Vec<_>>();
self.with_measured_conn(
DatabaseOperation::InsertTenantShards,
move |conn| -> DatabaseResult<()> {
for tenant in &shards {
diesel::insert_into(tenant_shards)
.values(tenant)
.execute(conn)?;
}
diesel::insert_into(tenant_shards::table)
.values(&shards)
.execute(conn)?;
diesel::insert_into(metadata_health::table)
.values(&metadata_health_records)
.execute(conn)?;
Ok(())
},
)
@@ -329,10 +351,10 @@ impl Persistence {
self.with_measured_conn(
DatabaseOperation::DeleteTenant,
move |conn| -> DatabaseResult<()> {
// `metadata_health` status (if exists) is also deleted based on the cascade behavior.
diesel::delete(tenant_shards)
.filter(tenant_id.eq(del_tenant_id.to_string()))
.execute(conn)?;
Ok(())
},
)
@@ -675,6 +697,94 @@ impl Persistence {
)
.await
}
/// Stores all the latest metadata health updates durably. Updates existing entry on conflict.
///
/// **Correctness:** `metadata_health_updates` should all belong the tenant shards managed by the storage controller.
#[allow(dead_code)]
pub(crate) async fn update_metadata_health_records(
&self,
healthy_records: Vec<MetadataHealthPersistence>,
unhealthy_records: Vec<MetadataHealthPersistence>,
now: chrono::DateTime<chrono::Utc>,
) -> DatabaseResult<()> {
use crate::schema::metadata_health::dsl::*;
self.with_measured_conn(
DatabaseOperation::UpdateMetadataHealth,
move |conn| -> DatabaseResult<_> {
diesel::insert_into(metadata_health)
.values(&healthy_records)
.on_conflict((tenant_id, shard_number, shard_count))
.do_update()
.set((healthy.eq(true), last_scrubbed_at.eq(now)))
.execute(conn)?;
diesel::insert_into(metadata_health)
.values(&unhealthy_records)
.on_conflict((tenant_id, shard_number, shard_count))
.do_update()
.set((healthy.eq(false), last_scrubbed_at.eq(now)))
.execute(conn)?;
Ok(())
},
)
.await
}
/// Lists all the metadata health records.
#[allow(dead_code)]
pub(crate) async fn list_metadata_health_records(
&self,
) -> DatabaseResult<Vec<MetadataHealthPersistence>> {
self.with_measured_conn(
DatabaseOperation::ListMetadataHealth,
move |conn| -> DatabaseResult<_> {
Ok(
crate::schema::metadata_health::table
.load::<MetadataHealthPersistence>(conn)?,
)
},
)
.await
}
/// Lists all the metadata health records that is unhealthy.
#[allow(dead_code)]
pub(crate) async fn list_unhealthy_metadata_health_records(
&self,
) -> DatabaseResult<Vec<MetadataHealthPersistence>> {
use crate::schema::metadata_health::dsl::*;
self.with_measured_conn(
DatabaseOperation::ListMetadataHealthUnhealthy,
move |conn| -> DatabaseResult<_> {
Ok(crate::schema::metadata_health::table
.filter(healthy.eq(false))
.load::<MetadataHealthPersistence>(conn)?)
},
)
.await
}
/// Lists all the metadata health records that have not been updated since an `earlier` time.
#[allow(dead_code)]
pub(crate) async fn list_outdated_metadata_health_records(
&self,
earlier: chrono::DateTime<chrono::Utc>,
) -> DatabaseResult<Vec<MetadataHealthPersistence>> {
use crate::schema::metadata_health::dsl::*;
self.with_measured_conn(
DatabaseOperation::ListMetadataHealthOutdated,
move |conn| -> DatabaseResult<_> {
let query = metadata_health.filter(last_scrubbed_at.lt(earlier));
let res = query.load::<MetadataHealthPersistence>(conn)?;
Ok(res)
},
)
.await
}
}
/// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
@@ -744,3 +854,59 @@ pub(crate) struct NodePersistence {
pub(crate) listen_pg_addr: String,
pub(crate) listen_pg_port: i32,
}
/// Tenant metadata health status that are stored durably.
#[derive(Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq)]
#[diesel(table_name = crate::schema::metadata_health)]
pub(crate) struct MetadataHealthPersistence {
#[serde(default)]
pub(crate) tenant_id: String,
#[serde(default)]
pub(crate) shard_number: i32,
#[serde(default)]
pub(crate) shard_count: i32,
pub(crate) healthy: bool,
pub(crate) last_scrubbed_at: chrono::DateTime<chrono::Utc>,
}
impl MetadataHealthPersistence {
pub fn new(
tenant_shard_id: TenantShardId,
healthy: bool,
last_scrubbed_at: chrono::DateTime<chrono::Utc>,
) -> Self {
let tenant_id = tenant_shard_id.tenant_id.to_string();
let shard_number = tenant_shard_id.shard_number.0 as i32;
let shard_count = tenant_shard_id.shard_count.literal() as i32;
MetadataHealthPersistence {
tenant_id,
shard_number,
shard_count,
healthy,
last_scrubbed_at,
}
}
#[allow(dead_code)]
pub(crate) fn get_tenant_shard_id(&self) -> Result<TenantShardId, hex::FromHexError> {
Ok(TenantShardId {
tenant_id: TenantId::from_str(self.tenant_id.as_str())?,
shard_number: ShardNumber(self.shard_number as u8),
shard_count: ShardCount::new(self.shard_count as u8),
})
}
}
impl From<MetadataHealthPersistence> for MetadataHealthRecord {
fn from(value: MetadataHealthPersistence) -> Self {
MetadataHealthRecord {
tenant_shard_id: value
.get_tenant_shard_id()
.expect("stored tenant id should be valid"),
healthy: value.healthy,
last_scrubbed_at: value.last_scrubbed_at,
}
}
}

View File

@@ -1,5 +1,15 @@
// @generated automatically by Diesel CLI.
diesel::table! {
metadata_health (tenant_id, shard_number, shard_count) {
tenant_id -> Varchar,
shard_number -> Int4,
shard_count -> Int4,
healthy -> Bool,
last_scrubbed_at -> Timestamptz,
}
}
diesel::table! {
nodes (node_id) {
node_id -> Int8,
@@ -26,4 +36,4 @@ diesel::table! {
}
}
diesel::allow_tables_to_appear_in_same_query!(nodes, tenant_shards,);
diesel::allow_tables_to_appear_in_same_query!(metadata_health, nodes, tenant_shards,);

View File

@@ -16,7 +16,7 @@ use crate::{
compute_hook::NotifyError,
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, TracingExclusiveGuard},
metrics::LeadershipStatusGroup,
persistence::{AbortShardSplitStatus, TenantFilter},
persistence::{AbortShardSplitStatus, MetadataHealthPersistence, TenantFilter},
reconciler::{ReconcileError, ReconcileUnits},
scheduler::{MaySchedule, ScheduleContext, ScheduleMode},
tenant_shard::{
@@ -33,11 +33,11 @@ use futures::{stream::FuturesUnordered, StreamExt};
use itertools::Itertools;
use pageserver_api::{
controller_api::{
NodeAvailability, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
ShardSchedulingPolicy, TenantCreateRequest, TenantCreateResponse,
TenantCreateResponseShard, TenantDescribeResponse, TenantDescribeResponseShard,
TenantLocateResponse, TenantPolicyRequest, TenantShardMigrateRequest,
TenantShardMigrateResponse, UtilizationScore,
MetadataHealthRecord, MetadataHealthUpdateRequest, NodeAvailability, NodeRegisterRequest,
NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy, TenantCreateRequest,
TenantCreateResponse, TenantCreateResponseShard, TenantDescribeResponse,
TenantDescribeResponseShard, TenantLocateResponse, TenantPolicyRequest,
TenantShardMigrateRequest, TenantShardMigrateResponse, UtilizationScore,
},
models::{SecondaryProgress, TenantConfigRequest, TopTenantShardsRequest},
};
@@ -6095,6 +6095,68 @@ impl Service {
Ok(())
}
/// Updates scrubber metadata health check results.
pub(crate) async fn metadata_health_update(
&self,
update_req: MetadataHealthUpdateRequest,
) -> Result<(), ApiError> {
let now = chrono::offset::Utc::now();
let (healthy_records, unhealthy_records) = {
let locked = self.inner.read().unwrap();
let healthy_records = update_req
.healthy_tenant_shards
.into_iter()
// Retain only health records associated with tenant shards managed by storage controller.
.filter(|tenant_shard_id| locked.tenants.contains_key(tenant_shard_id))
.map(|tenant_shard_id| MetadataHealthPersistence::new(tenant_shard_id, true, now))
.collect();
let unhealthy_records = update_req
.unhealthy_tenant_shards
.into_iter()
.filter(|tenant_shard_id| locked.tenants.contains_key(tenant_shard_id))
.map(|tenant_shard_id| MetadataHealthPersistence::new(tenant_shard_id, false, now))
.collect();
(healthy_records, unhealthy_records)
};
self.persistence
.update_metadata_health_records(healthy_records, unhealthy_records, now)
.await?;
Ok(())
}
/// Lists the tenant shards that has unhealthy metadata status.
pub(crate) async fn metadata_health_list_unhealthy(
&self,
) -> Result<Vec<TenantShardId>, ApiError> {
let result = self
.persistence
.list_unhealthy_metadata_health_records()
.await?
.iter()
.map(|p| p.get_tenant_shard_id().unwrap())
.collect();
Ok(result)
}
/// Lists the tenant shards that have not been scrubbed for some duration.
pub(crate) async fn metadata_health_list_outdated(
&self,
not_scrubbed_for: Duration,
) -> Result<Vec<MetadataHealthRecord>, ApiError> {
let earlier = chrono::offset::Utc::now() - not_scrubbed_for;
let result = self
.persistence
.list_outdated_metadata_health_records(earlier)
.await?
.into_iter()
.map(|record| record.into())
.collect();
Ok(result)
}
pub(crate) fn get_leadership_status(&self) -> LeadershipStatus {
self.inner.read().unwrap().get_leadership_status()
}

View File

@@ -449,6 +449,7 @@ class TokenScope(str, Enum):
GENERATIONS_API = "generations_api"
SAFEKEEPER_DATA = "safekeeperdata"
TENANT = "tenant"
SCRUBBER = "scrubber"
class NeonEnvBuilder:
@@ -2586,6 +2587,51 @@ class NeonStorageController(MetricsGetter, LogUtils):
time.sleep(backoff)
def metadata_health_update(self, healthy: List[TenantShardId], unhealthy: List[TenantShardId]):
body: Dict[str, Any] = {
"healthy_tenant_shards": [str(t) for t in healthy],
"unhealthy_tenant_shards": [str(t) for t in unhealthy],
}
self.request(
"POST",
f"{self.env.storage_controller_api}/control/v1/metadata_health/update",
json=body,
headers=self.headers(TokenScope.SCRUBBER),
)
def metadata_health_list_unhealthy(self):
response = self.request(
"GET",
f"{self.env.storage_controller_api}/control/v1/metadata_health/unhealthy",
headers=self.headers(TokenScope.ADMIN),
)
return response.json()
def metadata_health_list_outdated(self, duration: str):
body: Dict[str, Any] = {"not_scrubbed_for": duration}
response = self.request(
"POST",
f"{self.env.storage_controller_api}/control/v1/metadata_health/outdated",
json=body,
headers=self.headers(TokenScope.ADMIN),
)
return response.json()
def metadata_health_is_healthy(self, outdated_duration: str = "1h") -> bool:
"""Metadata is healthy if there is no unhealthy or outdated health records."""
unhealthy = self.metadata_health_list_unhealthy()
outdated = self.metadata_health_list_outdated(outdated_duration)
healthy = (
len(unhealthy["unhealthy_tenant_shards"]) == 0 and len(outdated["health_records"]) == 0
)
if not healthy:
log.info(f"{unhealthy=}, {outdated=}")
return healthy
def step_down(self):
log.info("Asking storage controller to step down")
response = self.request(

View File

@@ -3,7 +3,7 @@ import threading
import time
from collections import defaultdict
from datetime import datetime, timezone
from typing import Any, Dict, List, Union
from typing import Any, Dict, List, Optional, Set, Tuple, Union
import pytest
from fixtures.common_types import TenantId, TenantShardId, TimelineId
@@ -1785,6 +1785,126 @@ def test_storage_controller_node_deletion(
env.storage_controller.consistency_check()
@pytest.mark.parametrize("shard_count", [None, 2])
def test_storage_controller_metadata_health(
neon_env_builder: NeonEnvBuilder,
shard_count: Optional[int],
):
"""
Create three tenants A, B, C.
Phase 1:
- A: Post healthy status.
- B: Post unhealthy status.
- C: No updates.
Phase 2:
- B: Post healthy status.
- C: Post healthy status.
Phase 3:
- A: Post unhealthy status.
Phase 4:
- Delete tenant A, metadata health status should be deleted as well.
"""
def update_and_query_metadata_health(
env: NeonEnv,
healthy: List[TenantShardId],
unhealthy: List[TenantShardId],
outdated_duration: str = "1h",
) -> Tuple[Set[str], Set[str]]:
"""
Update metadata health. Then list tenant shards with unhealthy and
outdated metadata health status.
"""
if healthy or unhealthy:
env.storage_controller.metadata_health_update(healthy, unhealthy)
result = env.storage_controller.metadata_health_list_unhealthy()
unhealthy_res = set(result["unhealthy_tenant_shards"])
result = env.storage_controller.metadata_health_list_outdated(outdated_duration)
outdated_res = set(record["tenant_shard_id"] for record in result["health_records"])
return unhealthy_res, outdated_res
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_start()
# Mock tenant (`initial_tenant``) with healthy scrubber scan result
tenant_a_shard_ids = (
env.storage_controller.tenant_shard_split(env.initial_tenant, shard_count=shard_count)
if shard_count is not None
else [TenantShardId(env.initial_tenant, 0, 0)]
)
# Mock tenant with unhealthy scrubber scan result
tenant_b, _ = env.neon_cli.create_tenant(shard_count=shard_count)
tenant_b_shard_ids = (
env.storage_controller.tenant_shard_split(tenant_b, shard_count=shard_count)
if shard_count is not None
else [TenantShardId(tenant_b, 0, 0)]
)
# Mock tenant that never gets a health update from scrubber
tenant_c, _ = env.neon_cli.create_tenant(shard_count=shard_count)
tenant_c_shard_ids = (
env.storage_controller.tenant_shard_split(tenant_c, shard_count=shard_count)
if shard_count is not None
else [TenantShardId(tenant_c, 0, 0)]
)
# Metadata health table also updated as tenant shards are created.
assert env.storage_controller.metadata_health_is_healthy()
# post "fake" updates to storage controller db
unhealthy, outdated = update_and_query_metadata_health(
env, healthy=tenant_a_shard_ids, unhealthy=tenant_b_shard_ids
)
log.info(f"After Phase 1: {unhealthy=}, {outdated=}")
assert len(unhealthy) == len(tenant_b_shard_ids)
for t in tenant_b_shard_ids:
assert str(t) in unhealthy
assert len(outdated) == 0
unhealthy, outdated = update_and_query_metadata_health(
env, healthy=tenant_b_shard_ids + tenant_c_shard_ids, unhealthy=[]
)
log.info(f"After Phase 2: {unhealthy=}, {outdated=}")
assert len(unhealthy) == 0
assert len(outdated) == 0
unhealthy, outdated = update_and_query_metadata_health(
env, healthy=[], unhealthy=tenant_a_shard_ids
)
log.info(f"After Phase 3: {unhealthy=}, {outdated=}")
assert len(unhealthy) == len(tenant_a_shard_ids)
for t in tenant_a_shard_ids:
assert str(t) in unhealthy
assert len(outdated) == 0
# Phase 4: Delete A
env.storage_controller.pageserver_api().tenant_delete(env.initial_tenant)
# A's unhealthy metadata health status should be deleted as well.
assert env.storage_controller.metadata_health_is_healthy()
# All shards from B and C are not fresh if set outdated duration to 0 seconds.
unhealthy, outdated = update_and_query_metadata_health(
env, healthy=[], unhealthy=tenant_a_shard_ids, outdated_duration="0s"
)
assert len(unhealthy) == 0
for t in tenant_b_shard_ids + tenant_c_shard_ids:
assert str(t) in outdated
def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder):
"""
Test the `/control/v1/step_down` storage controller API. Upon receiving such