From e374d6778ed4da3e7437975b77ef69fdfea80470 Mon Sep 17 00:00:00 2001 From: Yuchen Liang <70461588+yliang412@users.noreply.github.com> Date: Tue, 30 Jul 2024 09:32:00 -0400 Subject: [PATCH] feat(storcon): store scrubber metadata scan result (#8480) Part of #8128, followed by #8502. ## Problem Currently we lack mechanism to alert unhealthy `scan_metadata` status if we start running this scrubber command as part of a cronjob. With the storage controller client introduced to storage scrubber in #8196, it is viable to set up alert by storing health status in the storage controller database. We intentionally do not store the full output to the database as the json blobs potentially makes the table really huge. Instead, only a health status and a timestamp recording the last time metadata health status is posted on a tenant shard. Signed-off-by: Yuchen Liang --- Cargo.lock | 2 + libs/pageserver_api/src/controller_api.rs | 38 +++- libs/utils/src/auth.rs | 16 +- storage_controller/Cargo.toml | 9 +- .../down.sql | 1 + .../up.sql | 14 ++ storage_controller/src/http.rs | 73 ++++++- storage_controller/src/persistence.rs | 180 +++++++++++++++++- storage_controller/src/schema.rs | 12 +- storage_controller/src/service.rs | 74 ++++++- test_runner/fixtures/neon_fixtures.py | 46 +++++ .../regress/test_storage_controller.py | 122 +++++++++++- 12 files changed, 560 insertions(+), 27 deletions(-) create mode 100644 storage_controller/migrations/2024-07-23-191537_create_metadata_health/down.sql create mode 100644 storage_controller/migrations/2024-07-23-191537_create_metadata_health/up.sql diff --git a/Cargo.lock b/Cargo.lock index 2b56095bc8..2186d55e9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1672,6 +1672,7 @@ checksum = "62d6dcd069e7b5fe49a302411f759d4cf1cf2c27fe798ef46fb8baefc053dd2b" dependencies = [ "bitflags 2.4.1", "byteorder", + "chrono", "diesel_derives", "itoa", "pq-sys", @@ -5718,6 +5719,7 @@ dependencies = [ "aws-config", "bytes", "camino", + "chrono", "clap", "control_plane", "diesel", diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index 474f796040..36b1bd95ff 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -1,5 +1,5 @@ use std::str::FromStr; -use std::time::Instant; +use std::time::{Duration, Instant}; /// Request/response types for the storage controller /// API (`/control/v1` prefix). Implemented by the server @@ -294,6 +294,42 @@ pub enum PlacementPolicy { #[derive(Serialize, Deserialize, Debug)] pub struct TenantShardMigrateResponse {} +/// Metadata health record posted from scrubber. +#[derive(Serialize, Deserialize, Debug)] +pub struct MetadataHealthRecord { + pub tenant_shard_id: TenantShardId, + pub healthy: bool, + pub last_scrubbed_at: chrono::DateTime, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct MetadataHealthUpdateRequest { + pub healthy_tenant_shards: Vec, + pub unhealthy_tenant_shards: Vec, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct MetadataHealthUpdateResponse {} + +#[derive(Serialize, Deserialize, Debug)] + +pub struct MetadataHealthListUnhealthyResponse { + pub unhealthy_tenant_shards: Vec, +} + +#[derive(Serialize, Deserialize, Debug)] + +pub struct MetadataHealthListOutdatedRequest { + #[serde(with = "humantime_serde")] + pub not_scrubbed_for: Duration, +} + +#[derive(Serialize, Deserialize, Debug)] + +pub struct MetadataHealthListOutdatedResponse { + pub health_records: Vec, +} + #[cfg(test)] mod test { use super::*; diff --git a/libs/utils/src/auth.rs b/libs/utils/src/auth.rs index a1170a460d..7b735875b7 100644 --- a/libs/utils/src/auth.rs +++ b/libs/utils/src/auth.rs @@ -18,20 +18,20 @@ const STORAGE_TOKEN_ALGORITHM: Algorithm = Algorithm::EdDSA; #[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq)] #[serde(rename_all = "lowercase")] pub enum Scope { - // Provides access to all data for a specific tenant (specified in `struct Claims` below) + /// Provides access to all data for a specific tenant (specified in `struct Claims` below) // TODO: join these two? Tenant, - // Provides blanket access to all tenants on the pageserver plus pageserver-wide APIs. - // Should only be used e.g. for status check/tenant creation/list. + /// Provides blanket access to all tenants on the pageserver plus pageserver-wide APIs. + /// Should only be used e.g. for status check/tenant creation/list. PageServerApi, - // Provides blanket access to all data on the safekeeper plus safekeeper-wide APIs. - // Should only be used e.g. for status check. - // Currently also used for connection from any pageserver to any safekeeper. + /// Provides blanket access to all data on the safekeeper plus safekeeper-wide APIs. + /// Should only be used e.g. for status check. + /// Currently also used for connection from any pageserver to any safekeeper. SafekeeperData, - // The scope used by pageservers in upcalls to storage controller and cloud control plane + /// The scope used by pageservers in upcalls to storage controller and cloud control plane #[serde(rename = "generations_api")] GenerationsApi, - // Allows access to control plane managment API and some storage controller endpoints. + /// Allows access to control plane managment API and some storage controller endpoints. Admin, /// Allows access to storage controller APIs used by the scrubber, to interrogate the state diff --git a/storage_controller/Cargo.toml b/storage_controller/Cargo.toml index b54dea5d47..d14b235046 100644 --- a/storage_controller/Cargo.toml +++ b/storage_controller/Cargo.toml @@ -18,6 +18,7 @@ anyhow.workspace = true aws-config.workspace = true bytes.workspace = true camino.workspace = true +chrono.workspace = true clap.workspace = true fail.workspace = true futures.workspace = true @@ -44,7 +45,12 @@ scopeguard.workspace = true strum.workspace = true strum_macros.workspace = true -diesel = { version = "2.1.4", features = ["serde_json", "postgres", "r2d2"] } +diesel = { version = "2.1.4", features = [ + "serde_json", + "postgres", + "r2d2", + "chrono", +] } diesel_migrations = { version = "2.1.0" } r2d2 = { version = "0.8.10" } @@ -52,4 +58,3 @@ utils = { path = "../libs/utils/" } metrics = { path = "../libs/metrics/" } control_plane = { path = "../control_plane" } workspace_hack = { version = "0.1", path = "../workspace_hack" } - diff --git a/storage_controller/migrations/2024-07-23-191537_create_metadata_health/down.sql b/storage_controller/migrations/2024-07-23-191537_create_metadata_health/down.sql new file mode 100644 index 0000000000..1ecfc8786f --- /dev/null +++ b/storage_controller/migrations/2024-07-23-191537_create_metadata_health/down.sql @@ -0,0 +1 @@ +DROP TABLE metadata_health; \ No newline at end of file diff --git a/storage_controller/migrations/2024-07-23-191537_create_metadata_health/up.sql b/storage_controller/migrations/2024-07-23-191537_create_metadata_health/up.sql new file mode 100644 index 0000000000..fa87eda119 --- /dev/null +++ b/storage_controller/migrations/2024-07-23-191537_create_metadata_health/up.sql @@ -0,0 +1,14 @@ +CREATE TABLE metadata_health ( + tenant_id VARCHAR NOT NULL, + shard_number INTEGER NOT NULL, + shard_count INTEGER NOT NULL, + PRIMARY KEY(tenant_id, shard_number, shard_count), + -- Rely on cascade behavior for delete + FOREIGN KEY(tenant_id, shard_number, shard_count) REFERENCES tenant_shards ON DELETE CASCADE, + healthy BOOLEAN NOT NULL DEFAULT TRUE, + last_scrubbed_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + + +INSERT INTO metadata_health(tenant_id, shard_number, shard_count) +SELECT tenant_id, shard_number, shard_count FROM tenant_shards; diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs index c77918827f..e8513b31eb 100644 --- a/storage_controller/src/http.rs +++ b/storage_controller/src/http.rs @@ -10,7 +10,11 @@ use hyper::header::CONTENT_TYPE; use hyper::{Body, Request, Response}; use hyper::{StatusCode, Uri}; use metrics::{BuildInfo, NeonMetrics}; -use pageserver_api::controller_api::TenantCreateRequest; +use pageserver_api::controller_api::{ + MetadataHealthListOutdatedRequest, MetadataHealthListOutdatedResponse, + MetadataHealthListUnhealthyResponse, MetadataHealthUpdateRequest, MetadataHealthUpdateResponse, + TenantCreateRequest, +}; use pageserver_api::models::{ TenantConfigRequest, TenantLocationConfigRequest, TenantShardSplitRequest, TenantTimeTravelRequest, TimelineCreateRequest, @@ -560,6 +564,51 @@ async fn handle_cancel_node_fill(req: Request) -> Result, A json_response(StatusCode::ACCEPTED, ()) } +async fn handle_metadata_health_update(mut req: Request) -> Result, ApiError> { + check_permissions(&req, Scope::Scrubber)?; + + let update_req = json_request::(&mut req).await?; + let state = get_state(&req); + + state.service.metadata_health_update(update_req).await?; + + json_response(StatusCode::OK, MetadataHealthUpdateResponse {}) +} + +async fn handle_metadata_health_list_unhealthy( + req: Request, +) -> Result, ApiError> { + check_permissions(&req, Scope::Admin)?; + + let state = get_state(&req); + let unhealthy_tenant_shards = state.service.metadata_health_list_unhealthy().await?; + + json_response( + StatusCode::OK, + MetadataHealthListUnhealthyResponse { + unhealthy_tenant_shards, + }, + ) +} + +async fn handle_metadata_health_list_outdated( + mut req: Request, +) -> Result, ApiError> { + check_permissions(&req, Scope::Admin)?; + + let list_outdated_req = json_request::(&mut req).await?; + let state = get_state(&req); + let health_records = state + .service + .metadata_health_list_outdated(list_outdated_req.not_scrubbed_for) + .await?; + + json_response( + StatusCode::OK, + MetadataHealthListOutdatedResponse { health_records }, + ) +} + async fn handle_tenant_shard_split( service: Arc, mut req: Request, @@ -987,6 +1036,28 @@ pub fn make_router( RequestName("control_v1_cancel_node_fill"), ) }) + // Metadata health operations + .post("/control/v1/metadata_health/update", |r| { + named_request_span( + r, + handle_metadata_health_update, + RequestName("control_v1_metadata_health_update"), + ) + }) + .get("/control/v1/metadata_health/unhealthy", |r| { + named_request_span( + r, + handle_metadata_health_list_unhealthy, + RequestName("control_v1_metadata_health_list_unhealthy"), + ) + }) + .post("/control/v1/metadata_health/outdated", |r| { + named_request_span( + r, + handle_metadata_health_list_outdated, + RequestName("control_v1_metadata_health_list_outdated"), + ) + }) // TODO(vlad): endpoint for cancelling drain and fill // Tenant Shard operations .put("/control/v1/tenant/:tenant_shard_id/migrate", |r| { diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index d8f31e86e5..64a3e597ce 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -8,6 +8,7 @@ use self::split_state::SplitState; use diesel::pg::PgConnection; use diesel::prelude::*; use diesel::Connection; +use pageserver_api::controller_api::MetadataHealthRecord; use pageserver_api::controller_api::ShardSchedulingPolicy; use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy}; use pageserver_api::models::TenantConfig; @@ -90,6 +91,10 @@ pub(crate) enum DatabaseOperation { UpdateTenantShard, DeleteTenant, UpdateTenantConfig, + UpdateMetadataHealth, + ListMetadataHealth, + ListMetadataHealthUnhealthy, + ListMetadataHealthOutdated, } #[must_use] @@ -307,15 +312,32 @@ impl Persistence { &self, shards: Vec, ) -> DatabaseResult<()> { - use crate::schema::tenant_shards::dsl::*; + use crate::schema::metadata_health; + use crate::schema::tenant_shards; + + let now = chrono::Utc::now(); + + let metadata_health_records = shards + .iter() + .map(|t| MetadataHealthPersistence { + tenant_id: t.tenant_id.clone(), + shard_number: t.shard_number, + shard_count: t.shard_count, + healthy: true, + last_scrubbed_at: now, + }) + .collect::>(); + self.with_measured_conn( DatabaseOperation::InsertTenantShards, move |conn| -> DatabaseResult<()> { - for tenant in &shards { - diesel::insert_into(tenant_shards) - .values(tenant) - .execute(conn)?; - } + diesel::insert_into(tenant_shards::table) + .values(&shards) + .execute(conn)?; + + diesel::insert_into(metadata_health::table) + .values(&metadata_health_records) + .execute(conn)?; Ok(()) }, ) @@ -329,10 +351,10 @@ impl Persistence { self.with_measured_conn( DatabaseOperation::DeleteTenant, move |conn| -> DatabaseResult<()> { + // `metadata_health` status (if exists) is also deleted based on the cascade behavior. diesel::delete(tenant_shards) .filter(tenant_id.eq(del_tenant_id.to_string())) .execute(conn)?; - Ok(()) }, ) @@ -675,6 +697,94 @@ impl Persistence { ) .await } + + /// Stores all the latest metadata health updates durably. Updates existing entry on conflict. + /// + /// **Correctness:** `metadata_health_updates` should all belong the tenant shards managed by the storage controller. + #[allow(dead_code)] + pub(crate) async fn update_metadata_health_records( + &self, + healthy_records: Vec, + unhealthy_records: Vec, + now: chrono::DateTime, + ) -> DatabaseResult<()> { + use crate::schema::metadata_health::dsl::*; + + self.with_measured_conn( + DatabaseOperation::UpdateMetadataHealth, + move |conn| -> DatabaseResult<_> { + diesel::insert_into(metadata_health) + .values(&healthy_records) + .on_conflict((tenant_id, shard_number, shard_count)) + .do_update() + .set((healthy.eq(true), last_scrubbed_at.eq(now))) + .execute(conn)?; + + diesel::insert_into(metadata_health) + .values(&unhealthy_records) + .on_conflict((tenant_id, shard_number, shard_count)) + .do_update() + .set((healthy.eq(false), last_scrubbed_at.eq(now))) + .execute(conn)?; + Ok(()) + }, + ) + .await + } + + /// Lists all the metadata health records. + #[allow(dead_code)] + pub(crate) async fn list_metadata_health_records( + &self, + ) -> DatabaseResult> { + self.with_measured_conn( + DatabaseOperation::ListMetadataHealth, + move |conn| -> DatabaseResult<_> { + Ok( + crate::schema::metadata_health::table + .load::(conn)?, + ) + }, + ) + .await + } + + /// Lists all the metadata health records that is unhealthy. + #[allow(dead_code)] + pub(crate) async fn list_unhealthy_metadata_health_records( + &self, + ) -> DatabaseResult> { + use crate::schema::metadata_health::dsl::*; + self.with_measured_conn( + DatabaseOperation::ListMetadataHealthUnhealthy, + move |conn| -> DatabaseResult<_> { + Ok(crate::schema::metadata_health::table + .filter(healthy.eq(false)) + .load::(conn)?) + }, + ) + .await + } + + /// Lists all the metadata health records that have not been updated since an `earlier` time. + #[allow(dead_code)] + pub(crate) async fn list_outdated_metadata_health_records( + &self, + earlier: chrono::DateTime, + ) -> DatabaseResult> { + use crate::schema::metadata_health::dsl::*; + + self.with_measured_conn( + DatabaseOperation::ListMetadataHealthOutdated, + move |conn| -> DatabaseResult<_> { + let query = metadata_health.filter(last_scrubbed_at.lt(earlier)); + let res = query.load::(conn)?; + + Ok(res) + }, + ) + .await + } } /// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably @@ -744,3 +854,59 @@ pub(crate) struct NodePersistence { pub(crate) listen_pg_addr: String, pub(crate) listen_pg_port: i32, } + +/// Tenant metadata health status that are stored durably. +#[derive(Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq)] +#[diesel(table_name = crate::schema::metadata_health)] +pub(crate) struct MetadataHealthPersistence { + #[serde(default)] + pub(crate) tenant_id: String, + #[serde(default)] + pub(crate) shard_number: i32, + #[serde(default)] + pub(crate) shard_count: i32, + + pub(crate) healthy: bool, + pub(crate) last_scrubbed_at: chrono::DateTime, +} + +impl MetadataHealthPersistence { + pub fn new( + tenant_shard_id: TenantShardId, + healthy: bool, + last_scrubbed_at: chrono::DateTime, + ) -> Self { + let tenant_id = tenant_shard_id.tenant_id.to_string(); + let shard_number = tenant_shard_id.shard_number.0 as i32; + let shard_count = tenant_shard_id.shard_count.literal() as i32; + + MetadataHealthPersistence { + tenant_id, + shard_number, + shard_count, + healthy, + last_scrubbed_at, + } + } + + #[allow(dead_code)] + pub(crate) fn get_tenant_shard_id(&self) -> Result { + Ok(TenantShardId { + tenant_id: TenantId::from_str(self.tenant_id.as_str())?, + shard_number: ShardNumber(self.shard_number as u8), + shard_count: ShardCount::new(self.shard_count as u8), + }) + } +} + +impl From for MetadataHealthRecord { + fn from(value: MetadataHealthPersistence) -> Self { + MetadataHealthRecord { + tenant_shard_id: value + .get_tenant_shard_id() + .expect("stored tenant id should be valid"), + healthy: value.healthy, + last_scrubbed_at: value.last_scrubbed_at, + } + } +} diff --git a/storage_controller/src/schema.rs b/storage_controller/src/schema.rs index ff37d0fe77..cb5ba3f38b 100644 --- a/storage_controller/src/schema.rs +++ b/storage_controller/src/schema.rs @@ -1,5 +1,15 @@ // @generated automatically by Diesel CLI. +diesel::table! { + metadata_health (tenant_id, shard_number, shard_count) { + tenant_id -> Varchar, + shard_number -> Int4, + shard_count -> Int4, + healthy -> Bool, + last_scrubbed_at -> Timestamptz, + } +} + diesel::table! { nodes (node_id) { node_id -> Int8, @@ -26,4 +36,4 @@ diesel::table! { } } -diesel::allow_tables_to_appear_in_same_query!(nodes, tenant_shards,); +diesel::allow_tables_to_appear_in_same_query!(metadata_health, nodes, tenant_shards,); diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 821f45d0c0..ea515f67da 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -16,7 +16,7 @@ use crate::{ compute_hook::NotifyError, id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, TracingExclusiveGuard}, metrics::LeadershipStatusGroup, - persistence::{AbortShardSplitStatus, TenantFilter}, + persistence::{AbortShardSplitStatus, MetadataHealthPersistence, TenantFilter}, reconciler::{ReconcileError, ReconcileUnits}, scheduler::{MaySchedule, ScheduleContext, ScheduleMode}, tenant_shard::{ @@ -33,11 +33,11 @@ use futures::{stream::FuturesUnordered, StreamExt}; use itertools::Itertools; use pageserver_api::{ controller_api::{ - NodeAvailability, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy, - ShardSchedulingPolicy, TenantCreateRequest, TenantCreateResponse, - TenantCreateResponseShard, TenantDescribeResponse, TenantDescribeResponseShard, - TenantLocateResponse, TenantPolicyRequest, TenantShardMigrateRequest, - TenantShardMigrateResponse, UtilizationScore, + MetadataHealthRecord, MetadataHealthUpdateRequest, NodeAvailability, NodeRegisterRequest, + NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy, TenantCreateRequest, + TenantCreateResponse, TenantCreateResponseShard, TenantDescribeResponse, + TenantDescribeResponseShard, TenantLocateResponse, TenantPolicyRequest, + TenantShardMigrateRequest, TenantShardMigrateResponse, UtilizationScore, }, models::{SecondaryProgress, TenantConfigRequest, TopTenantShardsRequest}, }; @@ -6095,6 +6095,68 @@ impl Service { Ok(()) } + /// Updates scrubber metadata health check results. + pub(crate) async fn metadata_health_update( + &self, + update_req: MetadataHealthUpdateRequest, + ) -> Result<(), ApiError> { + let now = chrono::offset::Utc::now(); + let (healthy_records, unhealthy_records) = { + let locked = self.inner.read().unwrap(); + let healthy_records = update_req + .healthy_tenant_shards + .into_iter() + // Retain only health records associated with tenant shards managed by storage controller. + .filter(|tenant_shard_id| locked.tenants.contains_key(tenant_shard_id)) + .map(|tenant_shard_id| MetadataHealthPersistence::new(tenant_shard_id, true, now)) + .collect(); + let unhealthy_records = update_req + .unhealthy_tenant_shards + .into_iter() + .filter(|tenant_shard_id| locked.tenants.contains_key(tenant_shard_id)) + .map(|tenant_shard_id| MetadataHealthPersistence::new(tenant_shard_id, false, now)) + .collect(); + + (healthy_records, unhealthy_records) + }; + + self.persistence + .update_metadata_health_records(healthy_records, unhealthy_records, now) + .await?; + Ok(()) + } + + /// Lists the tenant shards that has unhealthy metadata status. + pub(crate) async fn metadata_health_list_unhealthy( + &self, + ) -> Result, ApiError> { + let result = self + .persistence + .list_unhealthy_metadata_health_records() + .await? + .iter() + .map(|p| p.get_tenant_shard_id().unwrap()) + .collect(); + + Ok(result) + } + + /// Lists the tenant shards that have not been scrubbed for some duration. + pub(crate) async fn metadata_health_list_outdated( + &self, + not_scrubbed_for: Duration, + ) -> Result, ApiError> { + let earlier = chrono::offset::Utc::now() - not_scrubbed_for; + let result = self + .persistence + .list_outdated_metadata_health_records(earlier) + .await? + .into_iter() + .map(|record| record.into()) + .collect(); + Ok(result) + } + pub(crate) fn get_leadership_status(&self) -> LeadershipStatus { self.inner.read().unwrap().get_leadership_status() } diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index c5fffc2af6..5b2ebea794 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -449,6 +449,7 @@ class TokenScope(str, Enum): GENERATIONS_API = "generations_api" SAFEKEEPER_DATA = "safekeeperdata" TENANT = "tenant" + SCRUBBER = "scrubber" class NeonEnvBuilder: @@ -2586,6 +2587,51 @@ class NeonStorageController(MetricsGetter, LogUtils): time.sleep(backoff) + def metadata_health_update(self, healthy: List[TenantShardId], unhealthy: List[TenantShardId]): + body: Dict[str, Any] = { + "healthy_tenant_shards": [str(t) for t in healthy], + "unhealthy_tenant_shards": [str(t) for t in unhealthy], + } + + self.request( + "POST", + f"{self.env.storage_controller_api}/control/v1/metadata_health/update", + json=body, + headers=self.headers(TokenScope.SCRUBBER), + ) + + def metadata_health_list_unhealthy(self): + response = self.request( + "GET", + f"{self.env.storage_controller_api}/control/v1/metadata_health/unhealthy", + headers=self.headers(TokenScope.ADMIN), + ) + return response.json() + + def metadata_health_list_outdated(self, duration: str): + body: Dict[str, Any] = {"not_scrubbed_for": duration} + + response = self.request( + "POST", + f"{self.env.storage_controller_api}/control/v1/metadata_health/outdated", + json=body, + headers=self.headers(TokenScope.ADMIN), + ) + return response.json() + + def metadata_health_is_healthy(self, outdated_duration: str = "1h") -> bool: + """Metadata is healthy if there is no unhealthy or outdated health records.""" + + unhealthy = self.metadata_health_list_unhealthy() + outdated = self.metadata_health_list_outdated(outdated_duration) + + healthy = ( + len(unhealthy["unhealthy_tenant_shards"]) == 0 and len(outdated["health_records"]) == 0 + ) + if not healthy: + log.info(f"{unhealthy=}, {outdated=}") + return healthy + def step_down(self): log.info("Asking storage controller to step down") response = self.request( diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index da638ac233..eb2cdccdb9 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -3,7 +3,7 @@ import threading import time from collections import defaultdict from datetime import datetime, timezone -from typing import Any, Dict, List, Union +from typing import Any, Dict, List, Optional, Set, Tuple, Union import pytest from fixtures.common_types import TenantId, TenantShardId, TimelineId @@ -1785,6 +1785,126 @@ def test_storage_controller_node_deletion( env.storage_controller.consistency_check() +@pytest.mark.parametrize("shard_count", [None, 2]) +def test_storage_controller_metadata_health( + neon_env_builder: NeonEnvBuilder, + shard_count: Optional[int], +): + """ + Create three tenants A, B, C. + + Phase 1: + - A: Post healthy status. + - B: Post unhealthy status. + - C: No updates. + + Phase 2: + - B: Post healthy status. + - C: Post healthy status. + + Phase 3: + - A: Post unhealthy status. + + Phase 4: + - Delete tenant A, metadata health status should be deleted as well. + """ + + def update_and_query_metadata_health( + env: NeonEnv, + healthy: List[TenantShardId], + unhealthy: List[TenantShardId], + outdated_duration: str = "1h", + ) -> Tuple[Set[str], Set[str]]: + """ + Update metadata health. Then list tenant shards with unhealthy and + outdated metadata health status. + """ + if healthy or unhealthy: + env.storage_controller.metadata_health_update(healthy, unhealthy) + result = env.storage_controller.metadata_health_list_unhealthy() + unhealthy_res = set(result["unhealthy_tenant_shards"]) + result = env.storage_controller.metadata_health_list_outdated(outdated_duration) + outdated_res = set(record["tenant_shard_id"] for record in result["health_records"]) + + return unhealthy_res, outdated_res + + neon_env_builder.enable_pageserver_remote_storage(s3_storage()) + + neon_env_builder.num_pageservers = 2 + env = neon_env_builder.init_start() + + # Mock tenant (`initial_tenant``) with healthy scrubber scan result + tenant_a_shard_ids = ( + env.storage_controller.tenant_shard_split(env.initial_tenant, shard_count=shard_count) + if shard_count is not None + else [TenantShardId(env.initial_tenant, 0, 0)] + ) + + # Mock tenant with unhealthy scrubber scan result + tenant_b, _ = env.neon_cli.create_tenant(shard_count=shard_count) + tenant_b_shard_ids = ( + env.storage_controller.tenant_shard_split(tenant_b, shard_count=shard_count) + if shard_count is not None + else [TenantShardId(tenant_b, 0, 0)] + ) + + # Mock tenant that never gets a health update from scrubber + tenant_c, _ = env.neon_cli.create_tenant(shard_count=shard_count) + + tenant_c_shard_ids = ( + env.storage_controller.tenant_shard_split(tenant_c, shard_count=shard_count) + if shard_count is not None + else [TenantShardId(tenant_c, 0, 0)] + ) + + # Metadata health table also updated as tenant shards are created. + assert env.storage_controller.metadata_health_is_healthy() + + # post "fake" updates to storage controller db + + unhealthy, outdated = update_and_query_metadata_health( + env, healthy=tenant_a_shard_ids, unhealthy=tenant_b_shard_ids + ) + + log.info(f"After Phase 1: {unhealthy=}, {outdated=}") + assert len(unhealthy) == len(tenant_b_shard_ids) + for t in tenant_b_shard_ids: + assert str(t) in unhealthy + assert len(outdated) == 0 + + unhealthy, outdated = update_and_query_metadata_health( + env, healthy=tenant_b_shard_ids + tenant_c_shard_ids, unhealthy=[] + ) + + log.info(f"After Phase 2: {unhealthy=}, {outdated=}") + assert len(unhealthy) == 0 + assert len(outdated) == 0 + + unhealthy, outdated = update_and_query_metadata_health( + env, healthy=[], unhealthy=tenant_a_shard_ids + ) + + log.info(f"After Phase 3: {unhealthy=}, {outdated=}") + assert len(unhealthy) == len(tenant_a_shard_ids) + for t in tenant_a_shard_ids: + assert str(t) in unhealthy + assert len(outdated) == 0 + + # Phase 4: Delete A + env.storage_controller.pageserver_api().tenant_delete(env.initial_tenant) + + # A's unhealthy metadata health status should be deleted as well. + assert env.storage_controller.metadata_health_is_healthy() + + # All shards from B and C are not fresh if set outdated duration to 0 seconds. + unhealthy, outdated = update_and_query_metadata_health( + env, healthy=[], unhealthy=tenant_a_shard_ids, outdated_duration="0s" + ) + assert len(unhealthy) == 0 + for t in tenant_b_shard_ids + tenant_c_shard_ids: + assert str(t) in outdated + + def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder): """ Test the `/control/v1/step_down` storage controller API. Upon receiving such