From cf3baf60395b500f7632c7afc10a3c81f2a98e40 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Thu, 22 Feb 2024 14:10:49 +0000
Subject: [PATCH] storage controller: fix consistency check (#6855)

- Some checks weren't properly returning an error when they failed
- TenantState::to_persistent wasn't setting generation_pageserver properly
- Changes to node scheduling policy weren't being persisted.
---
 control_plane/attachment_service/src/http.rs  |  5 +-
 control_plane/attachment_service/src/node.rs  |  2 +-
 .../attachment_service/src/persistence.rs     | 49 +++++++++++-------
 .../attachment_service/src/service.rs         | 50 ++++++++++++++++---
 .../attachment_service/src/tenant_state.rs    |  7 ++-
 5 files changed, 84 insertions(+), 29 deletions(-)

diff --git a/control_plane/attachment_service/src/http.rs b/control_plane/attachment_service/src/http.rs
index 15ae2a26b4..f9c4535bd5 100644
--- a/control_plane/attachment_service/src/http.rs
+++ b/control_plane/attachment_service/src/http.rs
@@ -325,7 +325,10 @@ async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>,
     }
 
     let state = get_state(&req);
-    json_response(StatusCode::OK, state.service.node_configure(config_req)?)
+    json_response(
+        StatusCode::OK,
+        state.service.node_configure(config_req).await?,
+    )
 }
 
 async fn handle_tenant_shard_split(
diff --git a/control_plane/attachment_service/src/node.rs b/control_plane/attachment_service/src/node.rs
index 59784249d7..09162701ac 100644
--- a/control_plane/attachment_service/src/node.rs
+++ b/control_plane/attachment_service/src/node.rs
@@ -10,7 +10,7 @@ use crate::persistence::NodePersistence;
 ///
 /// The persistent subset of the Node is defined in [`crate::persistence::NodePersistence`]: the
 /// implementation of serialization on this type is only for debug dumps.
-#[derive(Clone, Serialize, Eq, PartialEq)]
+#[derive(Clone, Serialize)]
 pub(crate) struct Node {
     pub(crate) id: NodeId,
 
diff --git a/control_plane/attachment_service/src/persistence.rs b/control_plane/attachment_service/src/persistence.rs
index 2d0c8a9d15..4f336093cf 100644
--- a/control_plane/attachment_service/src/persistence.rs
+++ b/control_plane/attachment_service/src/persistence.rs
@@ -6,7 +6,7 @@ use std::time::Duration;
 use self::split_state::SplitState;
 use camino::Utf8Path;
 use camino::Utf8PathBuf;
-use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy};
+use control_plane::attachment_service::NodeSchedulingPolicy;
 use diesel::pg::PgConnection;
 use diesel::prelude::*;
 use diesel::Connection;
@@ -130,24 +130,10 @@ impl Persistence {
     }
 
     /// At startup, populate the list of nodes which our shards may be placed on
-    pub(crate) async fn list_nodes(&self) -> DatabaseResult<Vec<Node>> {
-        let nodes: Vec<Node> = self
+    pub(crate) async fn list_nodes(&self) -> DatabaseResult<Vec<NodePersistence>> {
+        let nodes: Vec<NodePersistence> = self
             .with_conn(move |conn| -> DatabaseResult<_> {
-                Ok(crate::schema::nodes::table
-                    .load::<NodePersistence>(conn)?
-                    .into_iter()
-                    .map(|n| Node {
-                        id: NodeId(n.node_id as u64),
-                        // At startup we consider a node offline until proven otherwise.
-                        availability: NodeAvailability::Offline,
-                        scheduling: NodeSchedulingPolicy::from_str(&n.scheduling_policy)
-                            .expect("Bad scheduling policy in DB"),
-                        listen_http_addr: n.listen_http_addr,
-                        listen_http_port: n.listen_http_port as u16,
-                        listen_pg_addr: n.listen_pg_addr,
-                        listen_pg_port: n.listen_pg_port as u16,
-                    })
-                    .collect::<Vec<Node>>())
+                Ok(crate::schema::nodes::table.load::<NodePersistence>(conn)?)
             })
             .await?;
 
@@ -156,6 +142,31 @@ impl Persistence {
         Ok(nodes)
     }
 
+    pub(crate) async fn update_node(
+        &self,
+        input_node_id: NodeId,
+        input_scheduling: NodeSchedulingPolicy,
+    ) -> DatabaseResult<()> {
+        use crate::schema::nodes::dsl::*;
+        let updated = self
+            .with_conn(move |conn| {
+                let updated = diesel::update(nodes)
+                    .filter(node_id.eq(input_node_id.0 as i64))
+                    .set((scheduling_policy.eq(String::from(input_scheduling)),))
+                    .execute(conn)?;
+                Ok(updated)
+            })
+            .await?;
+
+        if updated != 1 {
+            Err(DatabaseError::Logical(format!(
+                "Node {input_node_id:?} not found for update",
+            )))
+        } else {
+            Ok(())
+        }
+    }
+
     /// At startup, load the high level state for shards, such as their config + policy. This will
     /// be enriched at runtime with state discovered on pageservers.
     pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult<Vec<TenantShardPersistence>> {
@@ -506,7 +517,7 @@ pub(crate) struct TenantShardPersistence {
 }
 
 /// Parts of [`crate::node::Node`] that are stored durably
-#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable)]
+#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq)]
 #[diesel(table_name = crate::schema::nodes)]
 pub(crate) struct NodePersistence {
     pub(crate) node_id: i64,
diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs
index 0b9a7d8a69..38249b9223 100644
--- a/control_plane/attachment_service/src/service.rs
+++ b/control_plane/attachment_service/src/service.rs
@@ -622,7 +622,22 @@ impl Service {
         let (result_tx, result_rx) = tokio::sync::mpsc::unbounded_channel();
 
         tracing::info!("Loading nodes from database...");
-        let nodes = persistence.list_nodes().await?;
+        let nodes = persistence
+            .list_nodes()
+            .await?
+            .into_iter()
+            .map(|n| Node {
+                id: NodeId(n.node_id as u64),
+                // At startup we consider a node offline until proven otherwise.
+                availability: NodeAvailability::Offline,
+                scheduling: NodeSchedulingPolicy::from_str(&n.scheduling_policy)
+                    .expect("Bad scheduling policy in DB"),
+                listen_http_addr: n.listen_http_addr,
+                listen_http_port: n.listen_http_port as u16,
+                listen_pg_addr: n.listen_pg_addr,
+                listen_pg_port: n.listen_pg_port as u16,
+            })
+            .collect::<Vec<Node>>();
         let nodes: HashMap<NodeId, Node> = nodes.into_iter().map(|n| (n.id, n)).collect();
         tracing::info!("Loaded {} nodes from database.", nodes.len());
 
@@ -2326,7 +2341,11 @@ impl Service {
             .context("Scheduler checks")
            .map_err(ApiError::InternalServerError)?;
 
-        let expect_nodes = locked.nodes.values().cloned().collect::<Vec<_>>();
+        let expect_nodes = locked
+            .nodes
+            .values()
+            .map(|n| n.to_persistent())
+            .collect::<Vec<_>>();
 
         let expect_shards = locked
             .tenants
@@ -2338,8 +2357,8 @@
         };
 
         let mut nodes = self.persistence.list_nodes().await?;
-        expect_nodes.sort_by_key(|n| n.id);
-        nodes.sort_by_key(|n| n.id);
+        expect_nodes.sort_by_key(|n| n.node_id);
+        nodes.sort_by_key(|n| n.node_id);
 
         if nodes != expect_nodes {
             tracing::error!("Consistency check failed on nodes.");
@@ -2353,6 +2372,9 @@
                 serde_json::to_string(&nodes)
                     .map_err(|e| ApiError::InternalServerError(e.into()))?
             );
+            return Err(ApiError::InternalServerError(anyhow::anyhow!(
+                "Node consistency failure"
+            )));
         }
 
         let mut shards = self.persistence.list_tenant_shards().await?;
@@ -2363,14 +2385,17 @@
             tracing::error!("Consistency check failed on shards.");
             tracing::error!(
                 "Shards in memory: {}",
-                serde_json::to_string(&expect_nodes)
+                serde_json::to_string(&expect_shards)
                    .map_err(|e| ApiError::InternalServerError(e.into()))?
             );
             tracing::error!(
                 "Shards in database: {}",
-                serde_json::to_string(&nodes)
+                serde_json::to_string(&shards)
                    .map_err(|e| ApiError::InternalServerError(e.into()))?
             );
+            return Err(ApiError::InternalServerError(anyhow::anyhow!(
+                "Shard consistency failure"
+            )));
         }
         Ok(())
     }
@@ -2496,7 +2521,18 @@ impl Service {
         Ok(())
     }
 
-    pub(crate) fn node_configure(&self, config_req: NodeConfigureRequest) -> Result<(), ApiError> {
+    pub(crate) async fn node_configure(
+        &self,
+        config_req: NodeConfigureRequest,
+    ) -> Result<(), ApiError> {
+        if let Some(scheduling) = config_req.scheduling {
+            // Scheduling is a persistent part of Node: we must write updates to the database before
+            // applying them in memory
+            self.persistence
+                .update_node(config_req.node_id, scheduling)
+                .await?;
+        }
+
         let mut locked = self.inner.write().unwrap();
         let result_tx = locked.result_tx.clone();
         let compute_hook = locked.compute_hook.clone();
diff --git a/control_plane/attachment_service/src/tenant_state.rs b/control_plane/attachment_service/src/tenant_state.rs
index 3cfffc6c45..02f0171c29 100644
--- a/control_plane/attachment_service/src/tenant_state.rs
+++ b/control_plane/attachment_service/src/tenant_state.rs
@@ -737,7 +737,12 @@ impl TenantState {
             shard_count: self.tenant_shard_id.shard_count.literal() as i32,
             shard_stripe_size: self.shard.stripe_size.0 as i32,
             generation: self.generation.into().unwrap_or(0) as i32,
-            generation_pageserver: i64::MAX,
+            generation_pageserver: self
+                .intent
+                .get_attached()
+                .map(|n| n.0 as i64)
+                .unwrap_or(i64::MAX),
+
             placement_policy: serde_json::to_string(&self.policy).unwrap(),
             config: serde_json::to_string(&self.config).unwrap(),
             splitting: SplitState::default(),
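
Illustrative note (not part of the patch above): after this change the node half of the consistency check compares like with like. The in-memory nodes are converted with Node::to_persistent() and compared against the NodePersistence rows returned by persistence.list_nodes(), which is why the Eq/PartialEq derives move from Node to NodePersistence, and why a failed comparison now returns an error instead of only logging (the first bullet in the commit message). The sketch below shows that comparison in isolation; the struct fields mirror NodePersistence, but the free-standing check_nodes helper and the String error type are simplifications for illustration, not code from the patch.

// Sketch only: mirrors the shape of the node half of the consistency check.
#[derive(Debug, Clone, PartialEq, Eq)]
struct NodePersistence {
    node_id: i64,
    scheduling_policy: String,
    listen_http_addr: String,
    listen_http_port: i32,
    listen_pg_addr: String,
    listen_pg_port: i32,
}

// Hypothetical helper: `expect_nodes` is what memory says (via to_persistent()),
// `nodes` is what the database returned.
fn check_nodes(
    mut expect_nodes: Vec<NodePersistence>,
    mut nodes: Vec<NodePersistence>,
) -> Result<(), String> {
    // Ordering is irrelevant to consistency, so sort both sides by node_id first.
    expect_nodes.sort_by_key(|n| n.node_id);
    nodes.sort_by_key(|n| n.node_id);
    if nodes != expect_nodes {
        // The real code also logs both sides as JSON before failing the request.
        return Err("Node consistency failure".to_string());
    }
    Ok(())
}

In the patch itself the failure is wrapped in ApiError::InternalServerError, so the HTTP handler driving the check surfaces the mismatch to the caller rather than silently succeeding.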