storage controller: debug observability endpoints and self-test (#6820)

This PR stacks on https://github.com/neondatabase/neon/pull/6814

Observability:
- Because we only persist a subset of our state, and our external API is
pretty high level, it can be hard to get at the detail of what's going
on internally (e.g. the IntentState of a shard).
- Add debug endpoints for getting a full dump of all TenantState and
SchedulerNode objects
- Enrich the /control/v1/node listing endpoint to include full in-memory
detail of `Node` rather than just the `NodePersistence` subset

Consistency checks:
- The storage controller maintains separate in-memory and on-disk
states, by design. To catch subtle bugs, it is useful to occasionally
cross-check these.
- The Scheduler maintains reference counts for shard->node
relationships, which could drift if there was a bug in IntentState:
exhausively cross check them in tests.
This commit is contained in:
John Spray
2024-02-19 20:29:23 +00:00
committed by GitHub
parent 4f7704af24
commit 0c105ef352
11 changed files with 346 additions and 30 deletions

View File

@@ -333,6 +333,22 @@ async fn handle_tenant_drop(req: Request<Body>) -> Result<Response<Body>, ApiErr
json_response(StatusCode::OK, state.service.tenant_drop(tenant_id).await?)
}
async fn handle_tenants_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
let state = get_state(&req);
state.service.tenants_dump()
}
async fn handle_scheduler_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
let state = get_state(&req);
state.service.scheduler_dump()
}
async fn handle_consistency_check(req: Request<Body>) -> Result<Response<Body>, ApiError> {
let state = get_state(&req);
json_response(StatusCode::OK, state.service.consistency_check().await?)
}
/// Status endpoint is just used for checking that our HTTP listener is up
async fn handle_status(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
json_response(StatusCode::OK, ())
@@ -421,6 +437,13 @@ pub fn make_router(
.post("/debug/v1/node/:node_id/drop", |r| {
request_span(r, handle_node_drop)
})
.get("/debug/v1/tenant", |r| request_span(r, handle_tenants_dump))
.get("/debug/v1/scheduler", |r| {
request_span(r, handle_scheduler_dump)
})
.post("/debug/v1/consistency_check", |r| {
request_span(r, handle_consistency_check)
})
.get("/control/v1/tenant/:tenant_id/locate", |r| {
tenant_service_handler(r, handle_tenant_locate)
})

View File

@@ -12,7 +12,7 @@ mod schema;
pub mod service;
mod tenant_state;
#[derive(Clone, Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize, Debug)]
enum PlacementPolicy {
/// Cheapest way to attach a tenant: just one pageserver, no secondary
Single,
@@ -23,7 +23,7 @@ enum PlacementPolicy {
Detached,
}
#[derive(Ord, PartialOrd, Eq, PartialEq, Copy, Clone)]
#[derive(Ord, PartialOrd, Eq, PartialEq, Copy, Clone, Serialize)]
struct Sequence(u64);
impl Sequence {

View File

@@ -1,9 +1,16 @@
use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy};
use serde::Serialize;
use utils::id::NodeId;
use crate::persistence::NodePersistence;
#[derive(Clone)]
/// Represents the in-memory description of a Node.
///
/// Scheduling statistics are maintened separately in [`crate::scheduler`].
///
/// The persistent subset of the Node is defined in [`crate::persistence::NodePersistence`]: the
/// implementation of serialization on this type is only for debug dumps.
#[derive(Clone, Serialize, Eq, PartialEq)]
pub(crate) struct Node {
pub(crate) id: NodeId,

View File

@@ -477,7 +477,7 @@ impl Persistence {
}
/// Parts of [`crate::tenant_state::TenantState`] that are stored durably
#[derive(Queryable, Selectable, Insertable, Serialize, Deserialize, Clone)]
#[derive(Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq)]
#[diesel(table_name = crate::schema::tenant_shards)]
pub(crate) struct TenantShardPersistence {
#[serde(default)]

View File

@@ -1,4 +1,5 @@
use crate::node::Node;
use crate::{node::Node, tenant_state::TenantState};
use serde::Serialize;
use std::collections::HashMap;
use utils::{http::error::ApiError, id::NodeId};
@@ -17,6 +18,7 @@ impl From<ScheduleError> for ApiError {
}
}
#[derive(Serialize, Eq, PartialEq)]
struct SchedulerNode {
/// How many shards are currently scheduled on this node, via their [`crate::tenant_state::IntentState`].
shard_count: usize,
@@ -26,6 +28,12 @@ struct SchedulerNode {
may_schedule: bool,
}
/// This type is responsible for selecting which node is used when a tenant shard needs to choose a pageserver
/// on which to run.
///
/// The type has no persistent state of its own: this is all populated at startup. The Serialize
/// impl is only for debug dumps.
#[derive(Serialize)]
pub(crate) struct Scheduler {
nodes: HashMap<NodeId, SchedulerNode>,
}
@@ -48,6 +56,77 @@ impl Scheduler {
}
}
/// For debug/support: check that our internal statistics are in sync with the state of
/// the nodes & tenant shards.
///
/// If anything is inconsistent, log details and return an error.
pub(crate) fn consistency_check<'a>(
&self,
nodes: impl Iterator<Item = &'a Node>,
shards: impl Iterator<Item = &'a TenantState>,
) -> anyhow::Result<()> {
let mut expect_nodes: HashMap<NodeId, SchedulerNode> = HashMap::new();
for node in nodes {
expect_nodes.insert(
node.id,
SchedulerNode {
shard_count: 0,
may_schedule: node.may_schedule(),
},
);
}
for shard in shards {
if let Some(node_id) = shard.intent.get_attached() {
match expect_nodes.get_mut(node_id) {
Some(node) => node.shard_count += 1,
None => anyhow::bail!(
"Tenant {} references nonexistent node {}",
shard.tenant_shard_id,
node_id
),
}
}
for node_id in shard.intent.get_secondary() {
match expect_nodes.get_mut(node_id) {
Some(node) => node.shard_count += 1,
None => anyhow::bail!(
"Tenant {} references nonexistent node {}",
shard.tenant_shard_id,
node_id
),
}
}
}
for (node_id, expect_node) in &expect_nodes {
let Some(self_node) = self.nodes.get(node_id) else {
anyhow::bail!("Node {node_id} not found in Self")
};
if self_node != expect_node {
tracing::error!("Inconsistency detected in scheduling state for node {node_id}");
tracing::error!("Expected state: {}", serde_json::to_string(expect_node)?);
tracing::error!("Self state: {}", serde_json::to_string(self_node)?);
anyhow::bail!("Inconsistent state on {node_id}");
}
}
if expect_nodes.len() != self.nodes.len() {
// We just checked that all the expected nodes are present. If the lengths don't match,
// it means that we have nodes in Self that are unexpected.
for node_id in self.nodes.keys() {
if !expect_nodes.contains_key(node_id) {
anyhow::bail!("Node {node_id} found in Self but not in expected nodes");
}
}
}
Ok(())
}
/// Increment the reference count of a node. This reference count is used to guide scheduling
/// decisions, not for memory management: it represents one tenant shard whose IntentState targets
/// this node.
@@ -90,6 +169,12 @@ impl Scheduler {
}
}
pub(crate) fn node_remove(&mut self, node_id: NodeId) {
if self.nodes.remove(&node_id).is_none() {
tracing::warn!(node_id=%node_id, "Removed non-existent node from scheduler");
}
}
pub(crate) fn schedule_shard(
&mut self,
hard_exclude: &[NodeId],

View File

@@ -6,6 +6,7 @@ use std::{
time::{Duration, Instant},
};
use anyhow::Context;
use control_plane::attachment_service::{
AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse, NodeAvailability,
NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, TenantCreateResponse,
@@ -44,10 +45,7 @@ use utils::{
use crate::{
compute_hook::{self, ComputeHook},
node::Node,
persistence::{
split_state::SplitState, DatabaseError, NodePersistence, Persistence,
TenantShardPersistence,
},
persistence::{split_state::SplitState, DatabaseError, Persistence, TenantShardPersistence},
reconciler::attached_location_conf,
scheduler::Scheduler,
tenant_state::{
@@ -505,7 +503,9 @@ impl Service {
// after when pageservers start up and register.
let mut node_ids = HashSet::new();
for tsp in &tenant_shard_persistence {
node_ids.insert(tsp.generation_pageserver);
if tsp.generation_pageserver != i64::MAX {
node_ids.insert(tsp.generation_pageserver);
}
}
for node_id in node_ids {
tracing::info!("Creating node {} in scheduler for tests", node_id);
@@ -1460,6 +1460,11 @@ impl Service {
// TODO: should use the ID last published to compute_hook, rather than the intent: the intent might
// point to somewhere we haven't attached yet.
let Some(node_id) = shard.intent.get_attached() else {
tracing::warn!(
tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
"Shard not scheduled (policy {:?}), cannot generate pass-through URL",
shard.policy
);
return Err(ApiError::Conflict(
"Cannot call timeline API on non-attached tenant".to_string(),
));
@@ -1972,6 +1977,104 @@ impl Service {
Ok(())
}
/// For debug/support: a full JSON dump of TenantStates. Returns a response so that
/// we don't have to make TenantState clonable in the return path.
pub(crate) fn tenants_dump(&self) -> Result<hyper::Response<hyper::Body>, ApiError> {
let serialized = {
let locked = self.inner.read().unwrap();
let result = locked.tenants.values().collect::<Vec<_>>();
serde_json::to_string(&result).map_err(|e| ApiError::InternalServerError(e.into()))?
};
hyper::Response::builder()
.status(hyper::StatusCode::OK)
.header(hyper::header::CONTENT_TYPE, "application/json")
.body(hyper::Body::from(serialized))
.map_err(|e| ApiError::InternalServerError(e.into()))
}
/// Check the consistency of in-memory state vs. persistent state, and check that the
/// scheduler's statistics are up to date.
///
/// These consistency checks expect an **idle** system. If changes are going on while
/// we run, then we can falsely indicate a consistency issue. This is sufficient for end-of-test
/// checks, but not suitable for running continuously in the background in the field.
pub(crate) async fn consistency_check(&self) -> Result<(), ApiError> {
let (mut expect_nodes, mut expect_shards) = {
let locked = self.inner.read().unwrap();
locked
.scheduler
.consistency_check(locked.nodes.values(), locked.tenants.values())
.context("Scheduler checks")
.map_err(ApiError::InternalServerError)?;
let expect_nodes = locked.nodes.values().cloned().collect::<Vec<_>>();
let expect_shards = locked
.tenants
.values()
.map(|t| t.to_persistent())
.collect::<Vec<_>>();
(expect_nodes, expect_shards)
};
let mut nodes = self.persistence.list_nodes().await?;
expect_nodes.sort_by_key(|n| n.id);
nodes.sort_by_key(|n| n.id);
if nodes != expect_nodes {
tracing::error!("Consistency check failed on nodes.");
tracing::error!(
"Nodes in memory: {}",
serde_json::to_string(&expect_nodes)
.map_err(|e| ApiError::InternalServerError(e.into()))?
);
tracing::error!(
"Nodes in database: {}",
serde_json::to_string(&nodes)
.map_err(|e| ApiError::InternalServerError(e.into()))?
);
}
let mut shards = self.persistence.list_tenant_shards().await?;
shards.sort_by_key(|tsp| (tsp.tenant_id.clone(), tsp.shard_number, tsp.shard_count));
expect_shards.sort_by_key(|tsp| (tsp.tenant_id.clone(), tsp.shard_number, tsp.shard_count));
if shards != expect_shards {
tracing::error!("Consistency check failed on shards.");
tracing::error!(
"Shards in memory: {}",
serde_json::to_string(&expect_nodes)
.map_err(|e| ApiError::InternalServerError(e.into()))?
);
tracing::error!(
"Shards in database: {}",
serde_json::to_string(&nodes)
.map_err(|e| ApiError::InternalServerError(e.into()))?
);
}
Ok(())
}
/// For debug/support: a JSON dump of the [`Scheduler`]. Returns a response so that
/// we don't have to make TenantState clonable in the return path.
pub(crate) fn scheduler_dump(&self) -> Result<hyper::Response<hyper::Body>, ApiError> {
let serialized = {
let locked = self.inner.read().unwrap();
serde_json::to_string(&locked.scheduler)
.map_err(|e| ApiError::InternalServerError(e.into()))?
};
hyper::Response::builder()
.status(hyper::StatusCode::OK)
.header(hyper::header::CONTENT_TYPE, "application/json")
.body(hyper::Body::from(serialized))
.map_err(|e| ApiError::InternalServerError(e.into()))
}
/// This is for debug/support only: we simply drop all state for a tenant, without
/// detaching or deleting it on pageservers. We do not try and re-schedule any
/// tenants that were on this node.
@@ -1990,19 +2093,21 @@ impl Service {
nodes.remove(&node_id);
locked.nodes = Arc::new(nodes);
locked.scheduler.node_remove(node_id);
Ok(())
}
pub(crate) async fn node_list(&self) -> Result<Vec<NodePersistence>, ApiError> {
// It is convenient to avoid taking the big lock and converting Node to a serializable
// structure, by fetching from storage instead of reading in-memory state.
let nodes = self
.persistence
.list_nodes()
.await?
.into_iter()
.map(|n| n.to_persistent())
.collect();
pub(crate) async fn node_list(&self) -> Result<Vec<Node>, ApiError> {
let nodes = {
self.inner
.read()
.unwrap()
.nodes
.values()
.cloned()
.collect::<Vec<_>>()
};
Ok(nodes)
}

View File

@@ -1,11 +1,12 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use crate::metrics;
use crate::{metrics, persistence::TenantShardPersistence};
use control_plane::attachment_service::NodeAvailability;
use pageserver_api::{
models::{LocationConfig, LocationConfigMode, TenantConfig},
shard::{ShardIdentity, TenantShardId},
};
use serde::Serialize;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{instrument, Instrument};
@@ -27,6 +28,20 @@ use crate::{
service, PlacementPolicy, Sequence,
};
/// Serialization helper
fn read_mutex_content<S, T>(v: &std::sync::Mutex<T>, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
T: Clone + std::fmt::Display,
{
serializer.collect_str(&v.lock().unwrap())
}
/// In-memory state for a particular tenant shard.
///
/// This struct implement Serialize for debugging purposes, but is _not_ persisted
/// itself: see [`crate::persistence`] for the subset of tenant shard state that is persisted.
#[derive(Serialize)]
pub(crate) struct TenantState {
pub(crate) tenant_shard_id: TenantShardId,
@@ -61,6 +76,7 @@ pub(crate) struct TenantState {
/// If a reconcile task is currently in flight, it may be joined here (it is
/// only safe to join if either the result has been received or the reconciler's
/// cancellation token has been fired)
#[serde(skip)]
pub(crate) reconciler: Option<ReconcilerHandle>,
/// If a tenant is being split, then all shards with that TenantId will have a
@@ -70,16 +86,19 @@ pub(crate) struct TenantState {
/// Optionally wait for reconciliation to complete up to a particular
/// sequence number.
#[serde(skip)]
pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
/// Indicates sequence number for which we have encountered an error reconciling. If
/// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
/// and callers should stop waiting for `waiter` and propagate the error.
#[serde(skip)]
pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
/// The most recent error from a reconcile on this tenant
/// TODO: generalize to an array of recent events
/// TOOD: use a ArcSwap instead of mutex for faster reads?
#[serde(serialize_with = "read_mutex_content")]
pub(crate) last_error: std::sync::Arc<std::sync::Mutex<String>>,
/// If we have a pending compute notification that for some reason we weren't able to send,
@@ -89,7 +108,7 @@ pub(crate) struct TenantState {
pub(crate) pending_compute_notification: bool,
}
#[derive(Default, Clone, Debug)]
#[derive(Default, Clone, Debug, Serialize)]
pub(crate) struct IntentState {
attached: Option<NodeId>,
secondary: Vec<NodeId>,
@@ -194,7 +213,7 @@ impl Drop for IntentState {
}
}
#[derive(Default, Clone)]
#[derive(Default, Clone, Serialize)]
pub(crate) struct ObservedState {
pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
}
@@ -208,7 +227,7 @@ pub(crate) struct ObservedState {
/// what it is (e.g. we failed partway through configuring it)
/// * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
/// and that configuration will still be present unless something external interfered.
#[derive(Clone)]
#[derive(Clone, Serialize)]
pub(crate) struct ObservedStateLocation {
/// If None, it means we do not know the status of this shard's location on this node, but
/// we know that we might have some state on this node.
@@ -661,4 +680,18 @@ impl TenantState {
debug_assert!(!self.intent.all_pageservers().contains(&node_id));
}
pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
TenantShardPersistence {
tenant_id: self.tenant_shard_id.tenant_id.to_string(),
shard_number: self.tenant_shard_id.shard_number.0 as i32,
shard_count: self.tenant_shard_id.shard_count.literal() as i32,
shard_stripe_size: self.shard.stripe_size.0 as i32,
generation: self.generation.into().unwrap_or(0) as i32,
generation_pageserver: i64::MAX,
placement_policy: serde_json::to_string(&self.policy).unwrap(),
config: serde_json::to_string(&self.config).unwrap(),
splitting: SplitState::default(),
}
}
}

View File

@@ -113,7 +113,7 @@ pub struct TenantShardMigrateRequest {
pub node_id: NodeId,
}
#[derive(Serialize, Deserialize, Clone, Copy)]
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq)]
pub enum NodeAvailability {
// Normal, happy state
Active,
@@ -137,7 +137,7 @@ impl FromStr for NodeAvailability {
/// FIXME: this is a duplicate of the type in the attachment_service crate, because the
/// type needs to be defined with diesel traits in there.
#[derive(Serialize, Deserialize, Clone, Copy)]
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq)]
pub enum NodeSchedulingPolicy {
Active,
Filling,

View File

@@ -2100,6 +2100,17 @@ class NeonAttachmentService(MetricsGetter):
log.info(f"Migrated tenant {tenant_shard_id} to pageserver {dest_ps_id}")
assert self.env.get_tenant_pageserver(tenant_shard_id).id == dest_ps_id
def consistency_check(self):
"""
Throw an exception if the service finds any inconsistencies in its state
"""
response = self.request(
"POST",
f"{self.env.attachment_service_api}/debug/v1/consistency_check",
)
response.raise_for_status()
log.info("Attachment service passed consistency check")
def __enter__(self) -> "NeonAttachmentService":
return self

View File

@@ -83,6 +83,8 @@ def test_sharding_smoke(
)
assert timelines == {env.initial_timeline, timeline_b}
env.attachment_service.consistency_check()
def test_sharding_split_unsharded(
neon_env_builder: NeonEnvBuilder,
@@ -113,6 +115,8 @@ def test_sharding_split_unsharded(
workload.validate()
env.attachment_service.consistency_check()
def test_sharding_split_smoke(
neon_env_builder: NeonEnvBuilder,
@@ -278,3 +282,5 @@ def test_sharding_split_smoke(
)
is None
)
env.attachment_service.consistency_check()

View File

@@ -51,13 +51,13 @@ def test_sharding_service_smoke(
# The pageservers we started should have registered with the sharding service on startup
nodes = env.attachment_service.node_list()
assert len(nodes) == 2
assert set(n["node_id"] for n in nodes) == {env.pageservers[0].id, env.pageservers[1].id}
assert set(n["id"] for n in nodes) == {env.pageservers[0].id, env.pageservers[1].id}
# Starting an additional pageserver should register successfully
env.pageservers[2].start()
nodes = env.attachment_service.node_list()
assert len(nodes) == 3
assert set(n["node_id"] for n in nodes) == {ps.id for ps in env.pageservers}
assert set(n["id"] for n in nodes) == {ps.id for ps in env.pageservers}
# Use a multiple of pageservers to get nice even number of shards on each one
tenant_shard_count = len(env.pageservers) * 4
@@ -127,6 +127,8 @@ def test_sharding_service_smoke(
assert counts[env.pageservers[0].id] == tenant_shard_count // 2
assert counts[env.pageservers[2].id] == tenant_shard_count // 2
env.attachment_service.consistency_check()
def test_node_status_after_restart(
neon_env_builder: NeonEnvBuilder,
@@ -159,6 +161,8 @@ def test_node_status_after_restart(
# should have had its availabilty state set to Active.
env.attachment_service.tenant_create(TenantId.generate())
env.attachment_service.consistency_check()
def test_sharding_service_passthrough(
neon_env_builder: NeonEnvBuilder,
@@ -184,6 +188,8 @@ def test_sharding_service_passthrough(
}
assert status["state"]["slug"] == "Active"
env.attachment_service.consistency_check()
def test_sharding_service_restart(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
@@ -216,6 +222,8 @@ def test_sharding_service_restart(neon_env_builder: NeonEnvBuilder):
assert tenant_a not in observed
assert tenant_b in observed
env.attachment_service.consistency_check()
def test_sharding_service_onboarding(
neon_env_builder: NeonEnvBuilder,
@@ -318,6 +326,8 @@ def test_sharding_service_onboarding(
dest_ps.stop()
dest_ps.start()
env.attachment_service.consistency_check()
def test_sharding_service_compute_hook(
httpserver: HTTPServer,
@@ -388,6 +398,8 @@ def test_sharding_service_compute_hook(
wait_until(10, 1, received_restart_notification)
env.attachment_service.consistency_check()
def test_sharding_service_debug_apis(neon_env_builder: NeonEnvBuilder):
"""
@@ -401,13 +413,47 @@ def test_sharding_service_debug_apis(neon_env_builder: NeonEnvBuilder):
tenant_id = TenantId.generate()
env.attachment_service.tenant_create(tenant_id, shard_count=2, shard_stripe_size=8192)
# Check that the consistency check passes on a freshly setup system
env.attachment_service.consistency_check()
# These APIs are intentionally not implemented as methods on NeonAttachmentService, as
# they're just for use in unanticipated circumstances.
env.attachment_service.request(
# Initial tenant (1 shard) and the one we just created (2 shards) should be visible
response = env.attachment_service.request(
"GET", f"{env.attachment_service_api}/debug/v1/tenant"
)
response.raise_for_status()
assert len(response.json()) == 3
# Scheduler should report the expected nodes and shard counts
response = env.attachment_service.request(
"GET", f"{env.attachment_service_api}/debug/v1/scheduler"
)
response.raise_for_status()
# Two nodes, in a dict of node_id->node
assert len(response.json()["nodes"]) == 2
assert sum(v["shard_count"] for v in response.json()["nodes"].values()) == 3
assert all(v["may_schedule"] for v in response.json()["nodes"].values())
response = env.attachment_service.request(
"POST", f"{env.attachment_service_api}/debug/v1/node/{env.pageservers[1].id}/drop"
)
response.raise_for_status()
assert len(env.attachment_service.node_list()) == 1
env.attachment_service.request(
response = env.attachment_service.request(
"POST", f"{env.attachment_service_api}/debug/v1/tenant/{tenant_id}/drop"
)
response.raise_for_status()
# Tenant drop should be reflected in dump output
response = env.attachment_service.request(
"GET", f"{env.attachment_service_api}/debug/v1/tenant"
)
response.raise_for_status()
assert len(response.json()) == 1
# Check that the 'drop' APIs didn't leave things in a state that would fail a consistency check: they're
# meant to be unclean wrt the pageserver state, but not leave a broken storage controller behind.
env.attachment_service.consistency_check()