diff --git a/control_plane/attachment_service/src/http.rs b/control_plane/attachment_service/src/http.rs
index 38785d3a98..d6c8fa084b 100644
--- a/control_plane/attachment_service/src/http.rs
+++ b/control_plane/attachment_service/src/http.rs
@@ -333,6 +333,22 @@ async fn handle_tenant_drop(req: Request
) -> Result, ApiErr
json_response(StatusCode::OK, state.service.tenant_drop(tenant_id).await?)
}
+async fn handle_tenants_dump(req: Request) -> Result, ApiError> {
+ let state = get_state(&req);
+ state.service.tenants_dump()
+}
+
+async fn handle_scheduler_dump(req: Request) -> Result, ApiError> {
+ let state = get_state(&req);
+ state.service.scheduler_dump()
+}
+
+async fn handle_consistency_check(req: Request) -> Result, ApiError> {
+ let state = get_state(&req);
+
+ json_response(StatusCode::OK, state.service.consistency_check().await?)
+}
+
/// Status endpoint is just used for checking that our HTTP listener is up
async fn handle_status(_req: Request) -> Result, ApiError> {
json_response(StatusCode::OK, ())
@@ -421,6 +437,13 @@ pub fn make_router(
.post("/debug/v1/node/:node_id/drop", |r| {
request_span(r, handle_node_drop)
})
+ .get("/debug/v1/tenant", |r| request_span(r, handle_tenants_dump))
+ .get("/debug/v1/scheduler", |r| {
+ request_span(r, handle_scheduler_dump)
+ })
+ .post("/debug/v1/consistency_check", |r| {
+ request_span(r, handle_consistency_check)
+ })
.get("/control/v1/tenant/:tenant_id/locate", |r| {
tenant_service_handler(r, handle_tenant_locate)
})
diff --git a/control_plane/attachment_service/src/lib.rs b/control_plane/attachment_service/src/lib.rs
index 1a2b001392..e950a57e57 100644
--- a/control_plane/attachment_service/src/lib.rs
+++ b/control_plane/attachment_service/src/lib.rs
@@ -12,7 +12,7 @@ mod schema;
pub mod service;
mod tenant_state;
-#[derive(Clone, Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize, Debug)]
enum PlacementPolicy {
/// Cheapest way to attach a tenant: just one pageserver, no secondary
Single,
@@ -23,7 +23,7 @@ enum PlacementPolicy {
Detached,
}
-#[derive(Ord, PartialOrd, Eq, PartialEq, Copy, Clone)]
+#[derive(Ord, PartialOrd, Eq, PartialEq, Copy, Clone, Serialize)]
struct Sequence(u64);
impl Sequence {
diff --git a/control_plane/attachment_service/src/node.rs b/control_plane/attachment_service/src/node.rs
index 47f61702d8..59784249d7 100644
--- a/control_plane/attachment_service/src/node.rs
+++ b/control_plane/attachment_service/src/node.rs
@@ -1,9 +1,16 @@
use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy};
+use serde::Serialize;
use utils::id::NodeId;
use crate::persistence::NodePersistence;
-#[derive(Clone)]
+/// Represents the in-memory description of a Node.
+///
+/// Scheduling statistics are maintened separately in [`crate::scheduler`].
+///
+/// The persistent subset of the Node is defined in [`crate::persistence::NodePersistence`]: the
+/// implementation of serialization on this type is only for debug dumps.
+#[derive(Clone, Serialize, Eq, PartialEq)]
pub(crate) struct Node {
pub(crate) id: NodeId,
diff --git a/control_plane/attachment_service/src/persistence.rs b/control_plane/attachment_service/src/persistence.rs
index c5829cae88..2d0c8a9d15 100644
--- a/control_plane/attachment_service/src/persistence.rs
+++ b/control_plane/attachment_service/src/persistence.rs
@@ -477,7 +477,7 @@ impl Persistence {
}
/// Parts of [`crate::tenant_state::TenantState`] that are stored durably
-#[derive(Queryable, Selectable, Insertable, Serialize, Deserialize, Clone)]
+#[derive(Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq)]
#[diesel(table_name = crate::schema::tenant_shards)]
pub(crate) struct TenantShardPersistence {
#[serde(default)]
diff --git a/control_plane/attachment_service/src/scheduler.rs b/control_plane/attachment_service/src/scheduler.rs
index 7a99118312..39d8d0a260 100644
--- a/control_plane/attachment_service/src/scheduler.rs
+++ b/control_plane/attachment_service/src/scheduler.rs
@@ -1,4 +1,5 @@
-use crate::node::Node;
+use crate::{node::Node, tenant_state::TenantState};
+use serde::Serialize;
use std::collections::HashMap;
use utils::{http::error::ApiError, id::NodeId};
@@ -17,6 +18,7 @@ impl From for ApiError {
}
}
+#[derive(Serialize, Eq, PartialEq)]
struct SchedulerNode {
/// How many shards are currently scheduled on this node, via their [`crate::tenant_state::IntentState`].
shard_count: usize,
@@ -26,6 +28,12 @@ struct SchedulerNode {
may_schedule: bool,
}
+/// This type is responsible for selecting which node is used when a tenant shard needs to choose a pageserver
+/// on which to run.
+///
+/// The type has no persistent state of its own: this is all populated at startup. The Serialize
+/// impl is only for debug dumps.
+#[derive(Serialize)]
pub(crate) struct Scheduler {
nodes: HashMap,
}
@@ -48,6 +56,77 @@ impl Scheduler {
}
}
+ /// For debug/support: check that our internal statistics are in sync with the state of
+ /// the nodes & tenant shards.
+ ///
+ /// If anything is inconsistent, log details and return an error.
+ pub(crate) fn consistency_check<'a>(
+ &self,
+ nodes: impl Iterator- ,
+ shards: impl Iterator
- ,
+ ) -> anyhow::Result<()> {
+ let mut expect_nodes: HashMap = HashMap::new();
+ for node in nodes {
+ expect_nodes.insert(
+ node.id,
+ SchedulerNode {
+ shard_count: 0,
+ may_schedule: node.may_schedule(),
+ },
+ );
+ }
+
+ for shard in shards {
+ if let Some(node_id) = shard.intent.get_attached() {
+ match expect_nodes.get_mut(node_id) {
+ Some(node) => node.shard_count += 1,
+ None => anyhow::bail!(
+ "Tenant {} references nonexistent node {}",
+ shard.tenant_shard_id,
+ node_id
+ ),
+ }
+ }
+
+ for node_id in shard.intent.get_secondary() {
+ match expect_nodes.get_mut(node_id) {
+ Some(node) => node.shard_count += 1,
+ None => anyhow::bail!(
+ "Tenant {} references nonexistent node {}",
+ shard.tenant_shard_id,
+ node_id
+ ),
+ }
+ }
+ }
+
+ for (node_id, expect_node) in &expect_nodes {
+ let Some(self_node) = self.nodes.get(node_id) else {
+ anyhow::bail!("Node {node_id} not found in Self")
+ };
+
+ if self_node != expect_node {
+ tracing::error!("Inconsistency detected in scheduling state for node {node_id}");
+ tracing::error!("Expected state: {}", serde_json::to_string(expect_node)?);
+ tracing::error!("Self state: {}", serde_json::to_string(self_node)?);
+
+ anyhow::bail!("Inconsistent state on {node_id}");
+ }
+ }
+
+ if expect_nodes.len() != self.nodes.len() {
+ // We just checked that all the expected nodes are present. If the lengths don't match,
+ // it means that we have nodes in Self that are unexpected.
+ for node_id in self.nodes.keys() {
+ if !expect_nodes.contains_key(node_id) {
+ anyhow::bail!("Node {node_id} found in Self but not in expected nodes");
+ }
+ }
+ }
+
+ Ok(())
+ }
+
/// Increment the reference count of a node. This reference count is used to guide scheduling
/// decisions, not for memory management: it represents one tenant shard whose IntentState targets
/// this node.
@@ -90,6 +169,12 @@ impl Scheduler {
}
}
+ pub(crate) fn node_remove(&mut self, node_id: NodeId) {
+ if self.nodes.remove(&node_id).is_none() {
+ tracing::warn!(node_id=%node_id, "Removed non-existent node from scheduler");
+ }
+ }
+
pub(crate) fn schedule_shard(
&mut self,
hard_exclude: &[NodeId],
diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs
index b1e66ebdad..0fe758e731 100644
--- a/control_plane/attachment_service/src/service.rs
+++ b/control_plane/attachment_service/src/service.rs
@@ -6,6 +6,7 @@ use std::{
time::{Duration, Instant},
};
+use anyhow::Context;
use control_plane::attachment_service::{
AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse, NodeAvailability,
NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, TenantCreateResponse,
@@ -44,10 +45,7 @@ use utils::{
use crate::{
compute_hook::{self, ComputeHook},
node::Node,
- persistence::{
- split_state::SplitState, DatabaseError, NodePersistence, Persistence,
- TenantShardPersistence,
- },
+ persistence::{split_state::SplitState, DatabaseError, Persistence, TenantShardPersistence},
reconciler::attached_location_conf,
scheduler::Scheduler,
tenant_state::{
@@ -505,7 +503,9 @@ impl Service {
// after when pageservers start up and register.
let mut node_ids = HashSet::new();
for tsp in &tenant_shard_persistence {
- node_ids.insert(tsp.generation_pageserver);
+ if tsp.generation_pageserver != i64::MAX {
+ node_ids.insert(tsp.generation_pageserver);
+ }
}
for node_id in node_ids {
tracing::info!("Creating node {} in scheduler for tests", node_id);
@@ -1460,6 +1460,11 @@ impl Service {
// TODO: should use the ID last published to compute_hook, rather than the intent: the intent might
// point to somewhere we haven't attached yet.
let Some(node_id) = shard.intent.get_attached() else {
+ tracing::warn!(
+ tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
+ "Shard not scheduled (policy {:?}), cannot generate pass-through URL",
+ shard.policy
+ );
return Err(ApiError::Conflict(
"Cannot call timeline API on non-attached tenant".to_string(),
));
@@ -1972,6 +1977,104 @@ impl Service {
Ok(())
}
+ /// For debug/support: a full JSON dump of TenantStates. Returns a response so that
+ /// we don't have to make TenantState clonable in the return path.
+ pub(crate) fn tenants_dump(&self) -> Result, ApiError> {
+ let serialized = {
+ let locked = self.inner.read().unwrap();
+ let result = locked.tenants.values().collect::>();
+ serde_json::to_string(&result).map_err(|e| ApiError::InternalServerError(e.into()))?
+ };
+
+ hyper::Response::builder()
+ .status(hyper::StatusCode::OK)
+ .header(hyper::header::CONTENT_TYPE, "application/json")
+ .body(hyper::Body::from(serialized))
+ .map_err(|e| ApiError::InternalServerError(e.into()))
+ }
+
+ /// Check the consistency of in-memory state vs. persistent state, and check that the
+ /// scheduler's statistics are up to date.
+ ///
+ /// These consistency checks expect an **idle** system. If changes are going on while
+ /// we run, then we can falsely indicate a consistency issue. This is sufficient for end-of-test
+ /// checks, but not suitable for running continuously in the background in the field.
+ pub(crate) async fn consistency_check(&self) -> Result<(), ApiError> {
+ let (mut expect_nodes, mut expect_shards) = {
+ let locked = self.inner.read().unwrap();
+
+ locked
+ .scheduler
+ .consistency_check(locked.nodes.values(), locked.tenants.values())
+ .context("Scheduler checks")
+ .map_err(ApiError::InternalServerError)?;
+
+ let expect_nodes = locked.nodes.values().cloned().collect::>();
+
+ let expect_shards = locked
+ .tenants
+ .values()
+ .map(|t| t.to_persistent())
+ .collect::>();
+
+ (expect_nodes, expect_shards)
+ };
+
+ let mut nodes = self.persistence.list_nodes().await?;
+ expect_nodes.sort_by_key(|n| n.id);
+ nodes.sort_by_key(|n| n.id);
+
+ if nodes != expect_nodes {
+ tracing::error!("Consistency check failed on nodes.");
+ tracing::error!(
+ "Nodes in memory: {}",
+ serde_json::to_string(&expect_nodes)
+ .map_err(|e| ApiError::InternalServerError(e.into()))?
+ );
+ tracing::error!(
+ "Nodes in database: {}",
+ serde_json::to_string(&nodes)
+ .map_err(|e| ApiError::InternalServerError(e.into()))?
+ );
+ }
+
+ let mut shards = self.persistence.list_tenant_shards().await?;
+ shards.sort_by_key(|tsp| (tsp.tenant_id.clone(), tsp.shard_number, tsp.shard_count));
+ expect_shards.sort_by_key(|tsp| (tsp.tenant_id.clone(), tsp.shard_number, tsp.shard_count));
+
+ if shards != expect_shards {
+ tracing::error!("Consistency check failed on shards.");
+ tracing::error!(
+ "Shards in memory: {}",
+ serde_json::to_string(&expect_nodes)
+ .map_err(|e| ApiError::InternalServerError(e.into()))?
+ );
+ tracing::error!(
+ "Shards in database: {}",
+ serde_json::to_string(&nodes)
+ .map_err(|e| ApiError::InternalServerError(e.into()))?
+ );
+ }
+
+ Ok(())
+ }
+
+ /// For debug/support: a JSON dump of the [`Scheduler`]. Returns a response so that
+ /// we don't have to make TenantState clonable in the return path.
+ pub(crate) fn scheduler_dump(&self) -> Result, ApiError> {
+ let serialized = {
+ let locked = self.inner.read().unwrap();
+ serde_json::to_string(&locked.scheduler)
+ .map_err(|e| ApiError::InternalServerError(e.into()))?
+ };
+
+ hyper::Response::builder()
+ .status(hyper::StatusCode::OK)
+ .header(hyper::header::CONTENT_TYPE, "application/json")
+ .body(hyper::Body::from(serialized))
+ .map_err(|e| ApiError::InternalServerError(e.into()))
+ }
+
/// This is for debug/support only: we simply drop all state for a tenant, without
/// detaching or deleting it on pageservers. We do not try and re-schedule any
/// tenants that were on this node.
@@ -1990,19 +2093,21 @@ impl Service {
nodes.remove(&node_id);
locked.nodes = Arc::new(nodes);
+ locked.scheduler.node_remove(node_id);
+
Ok(())
}
- pub(crate) async fn node_list(&self) -> Result, ApiError> {
- // It is convenient to avoid taking the big lock and converting Node to a serializable
- // structure, by fetching from storage instead of reading in-memory state.
- let nodes = self
- .persistence
- .list_nodes()
- .await?
- .into_iter()
- .map(|n| n.to_persistent())
- .collect();
+ pub(crate) async fn node_list(&self) -> Result, ApiError> {
+ let nodes = {
+ self.inner
+ .read()
+ .unwrap()
+ .nodes
+ .values()
+ .cloned()
+ .collect::>()
+ };
Ok(nodes)
}
diff --git a/control_plane/attachment_service/src/tenant_state.rs b/control_plane/attachment_service/src/tenant_state.rs
index b0ddb83f06..4ec6fdca67 100644
--- a/control_plane/attachment_service/src/tenant_state.rs
+++ b/control_plane/attachment_service/src/tenant_state.rs
@@ -1,11 +1,12 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
-use crate::metrics;
+use crate::{metrics, persistence::TenantShardPersistence};
use control_plane::attachment_service::NodeAvailability;
use pageserver_api::{
models::{LocationConfig, LocationConfigMode, TenantConfig},
shard::{ShardIdentity, TenantShardId},
};
+use serde::Serialize;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{instrument, Instrument};
@@ -27,6 +28,20 @@ use crate::{
service, PlacementPolicy, Sequence,
};
+/// Serialization helper
+fn read_mutex_content
(v: &std::sync::Mutex, serializer: S) -> Result
+where
+ S: serde::ser::Serializer,
+ T: Clone + std::fmt::Display,
+{
+ serializer.collect_str(&v.lock().unwrap())
+}
+
+/// In-memory state for a particular tenant shard.
+///
+/// This struct implement Serialize for debugging purposes, but is _not_ persisted
+/// itself: see [`crate::persistence`] for the subset of tenant shard state that is persisted.
+#[derive(Serialize)]
pub(crate) struct TenantState {
pub(crate) tenant_shard_id: TenantShardId,
@@ -61,6 +76,7 @@ pub(crate) struct TenantState {
/// If a reconcile task is currently in flight, it may be joined here (it is
/// only safe to join if either the result has been received or the reconciler's
/// cancellation token has been fired)
+ #[serde(skip)]
pub(crate) reconciler: Option,
/// If a tenant is being split, then all shards with that TenantId will have a
@@ -70,16 +86,19 @@ pub(crate) struct TenantState {
/// Optionally wait for reconciliation to complete up to a particular
/// sequence number.
+ #[serde(skip)]
pub(crate) waiter: std::sync::Arc>,
/// Indicates sequence number for which we have encountered an error reconciling. If
/// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
/// and callers should stop waiting for `waiter` and propagate the error.
+ #[serde(skip)]
pub(crate) error_waiter: std::sync::Arc>,
/// The most recent error from a reconcile on this tenant
/// TODO: generalize to an array of recent events
/// TOOD: use a ArcSwap instead of mutex for faster reads?
+ #[serde(serialize_with = "read_mutex_content")]
pub(crate) last_error: std::sync::Arc>,
/// If we have a pending compute notification that for some reason we weren't able to send,
@@ -89,7 +108,7 @@ pub(crate) struct TenantState {
pub(crate) pending_compute_notification: bool,
}
-#[derive(Default, Clone, Debug)]
+#[derive(Default, Clone, Debug, Serialize)]
pub(crate) struct IntentState {
attached: Option,
secondary: Vec,
@@ -194,7 +213,7 @@ impl Drop for IntentState {
}
}
-#[derive(Default, Clone)]
+#[derive(Default, Clone, Serialize)]
pub(crate) struct ObservedState {
pub(crate) locations: HashMap,
}
@@ -208,7 +227,7 @@ pub(crate) struct ObservedState {
/// what it is (e.g. we failed partway through configuring it)
/// * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
/// and that configuration will still be present unless something external interfered.
-#[derive(Clone)]
+#[derive(Clone, Serialize)]
pub(crate) struct ObservedStateLocation {
/// If None, it means we do not know the status of this shard's location on this node, but
/// we know that we might have some state on this node.
@@ -661,4 +680,18 @@ impl TenantState {
debug_assert!(!self.intent.all_pageservers().contains(&node_id));
}
+
+ pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
+ TenantShardPersistence {
+ tenant_id: self.tenant_shard_id.tenant_id.to_string(),
+ shard_number: self.tenant_shard_id.shard_number.0 as i32,
+ shard_count: self.tenant_shard_id.shard_count.literal() as i32,
+ shard_stripe_size: self.shard.stripe_size.0 as i32,
+ generation: self.generation.into().unwrap_or(0) as i32,
+ generation_pageserver: i64::MAX,
+ placement_policy: serde_json::to_string(&self.policy).unwrap(),
+ config: serde_json::to_string(&self.config).unwrap(),
+ splitting: SplitState::default(),
+ }
+ }
}
diff --git a/control_plane/src/attachment_service.rs b/control_plane/src/attachment_service.rs
index 14bfda47c3..4a1d316fe7 100644
--- a/control_plane/src/attachment_service.rs
+++ b/control_plane/src/attachment_service.rs
@@ -113,7 +113,7 @@ pub struct TenantShardMigrateRequest {
pub node_id: NodeId,
}
-#[derive(Serialize, Deserialize, Clone, Copy)]
+#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq)]
pub enum NodeAvailability {
// Normal, happy state
Active,
@@ -137,7 +137,7 @@ impl FromStr for NodeAvailability {
/// FIXME: this is a duplicate of the type in the attachment_service crate, because the
/// type needs to be defined with diesel traits in there.
-#[derive(Serialize, Deserialize, Clone, Copy)]
+#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq)]
pub enum NodeSchedulingPolicy {
Active,
Filling,
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index b347ff44e9..cbf6e0e4de 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -2100,6 +2100,17 @@ class NeonAttachmentService(MetricsGetter):
log.info(f"Migrated tenant {tenant_shard_id} to pageserver {dest_ps_id}")
assert self.env.get_tenant_pageserver(tenant_shard_id).id == dest_ps_id
+ def consistency_check(self):
+ """
+ Throw an exception if the service finds any inconsistencies in its state
+ """
+ response = self.request(
+ "POST",
+ f"{self.env.attachment_service_api}/debug/v1/consistency_check",
+ )
+ response.raise_for_status()
+ log.info("Attachment service passed consistency check")
+
def __enter__(self) -> "NeonAttachmentService":
return self
diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py
index 5676727a2e..99b2ceb8bc 100644
--- a/test_runner/regress/test_sharding.py
+++ b/test_runner/regress/test_sharding.py
@@ -83,6 +83,8 @@ def test_sharding_smoke(
)
assert timelines == {env.initial_timeline, timeline_b}
+ env.attachment_service.consistency_check()
+
def test_sharding_split_unsharded(
neon_env_builder: NeonEnvBuilder,
@@ -113,6 +115,8 @@ def test_sharding_split_unsharded(
workload.validate()
+ env.attachment_service.consistency_check()
+
def test_sharding_split_smoke(
neon_env_builder: NeonEnvBuilder,
@@ -278,3 +282,5 @@ def test_sharding_split_smoke(
)
is None
)
+
+ env.attachment_service.consistency_check()
diff --git a/test_runner/regress/test_sharding_service.py b/test_runner/regress/test_sharding_service.py
index 248d992851..d2334c7776 100644
--- a/test_runner/regress/test_sharding_service.py
+++ b/test_runner/regress/test_sharding_service.py
@@ -51,13 +51,13 @@ def test_sharding_service_smoke(
# The pageservers we started should have registered with the sharding service on startup
nodes = env.attachment_service.node_list()
assert len(nodes) == 2
- assert set(n["node_id"] for n in nodes) == {env.pageservers[0].id, env.pageservers[1].id}
+ assert set(n["id"] for n in nodes) == {env.pageservers[0].id, env.pageservers[1].id}
# Starting an additional pageserver should register successfully
env.pageservers[2].start()
nodes = env.attachment_service.node_list()
assert len(nodes) == 3
- assert set(n["node_id"] for n in nodes) == {ps.id for ps in env.pageservers}
+ assert set(n["id"] for n in nodes) == {ps.id for ps in env.pageservers}
# Use a multiple of pageservers to get nice even number of shards on each one
tenant_shard_count = len(env.pageservers) * 4
@@ -127,6 +127,8 @@ def test_sharding_service_smoke(
assert counts[env.pageservers[0].id] == tenant_shard_count // 2
assert counts[env.pageservers[2].id] == tenant_shard_count // 2
+ env.attachment_service.consistency_check()
+
def test_node_status_after_restart(
neon_env_builder: NeonEnvBuilder,
@@ -159,6 +161,8 @@ def test_node_status_after_restart(
# should have had its availabilty state set to Active.
env.attachment_service.tenant_create(TenantId.generate())
+ env.attachment_service.consistency_check()
+
def test_sharding_service_passthrough(
neon_env_builder: NeonEnvBuilder,
@@ -184,6 +188,8 @@ def test_sharding_service_passthrough(
}
assert status["state"]["slug"] == "Active"
+ env.attachment_service.consistency_check()
+
def test_sharding_service_restart(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
@@ -216,6 +222,8 @@ def test_sharding_service_restart(neon_env_builder: NeonEnvBuilder):
assert tenant_a not in observed
assert tenant_b in observed
+ env.attachment_service.consistency_check()
+
def test_sharding_service_onboarding(
neon_env_builder: NeonEnvBuilder,
@@ -318,6 +326,8 @@ def test_sharding_service_onboarding(
dest_ps.stop()
dest_ps.start()
+ env.attachment_service.consistency_check()
+
def test_sharding_service_compute_hook(
httpserver: HTTPServer,
@@ -388,6 +398,8 @@ def test_sharding_service_compute_hook(
wait_until(10, 1, received_restart_notification)
+ env.attachment_service.consistency_check()
+
def test_sharding_service_debug_apis(neon_env_builder: NeonEnvBuilder):
"""
@@ -401,13 +413,47 @@ def test_sharding_service_debug_apis(neon_env_builder: NeonEnvBuilder):
tenant_id = TenantId.generate()
env.attachment_service.tenant_create(tenant_id, shard_count=2, shard_stripe_size=8192)
+ # Check that the consistency check passes on a freshly setup system
+ env.attachment_service.consistency_check()
+
# These APIs are intentionally not implemented as methods on NeonAttachmentService, as
# they're just for use in unanticipated circumstances.
- env.attachment_service.request(
+
+ # Initial tenant (1 shard) and the one we just created (2 shards) should be visible
+ response = env.attachment_service.request(
+ "GET", f"{env.attachment_service_api}/debug/v1/tenant"
+ )
+ response.raise_for_status()
+ assert len(response.json()) == 3
+
+ # Scheduler should report the expected nodes and shard counts
+ response = env.attachment_service.request(
+ "GET", f"{env.attachment_service_api}/debug/v1/scheduler"
+ )
+ response.raise_for_status()
+ # Two nodes, in a dict of node_id->node
+ assert len(response.json()["nodes"]) == 2
+ assert sum(v["shard_count"] for v in response.json()["nodes"].values()) == 3
+ assert all(v["may_schedule"] for v in response.json()["nodes"].values())
+
+ response = env.attachment_service.request(
"POST", f"{env.attachment_service_api}/debug/v1/node/{env.pageservers[1].id}/drop"
)
+ response.raise_for_status()
assert len(env.attachment_service.node_list()) == 1
- env.attachment_service.request(
+ response = env.attachment_service.request(
"POST", f"{env.attachment_service_api}/debug/v1/tenant/{tenant_id}/drop"
)
+ response.raise_for_status()
+
+ # Tenant drop should be reflected in dump output
+ response = env.attachment_service.request(
+ "GET", f"{env.attachment_service_api}/debug/v1/tenant"
+ )
+ response.raise_for_status()
+ assert len(response.json()) == 1
+
+ # Check that the 'drop' APIs didn't leave things in a state that would fail a consistency check: they're
+ # meant to be unclean wrt the pageserver state, but not leave a broken storage controller behind.
+ env.attachment_service.consistency_check()