storage controller: fix consistency check (#6855)

- Some checks weren't properly returning an error when they failed
- TenantState::to_persistent wasn't setting generation_pageserver properly
- Changes to node scheduling policy weren't being persisted.
Author: John Spray
Date:   2024-02-22 14:10:49 +00:00 (committed by GitHub)
Parent: 9c48b5c4ab
Commit: cf3baf6039

5 changed files with 84 additions and 29 deletions
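
To make the first fix concrete, here is a minimal sketch of an error-returning consistency check, using a simplified record type, eprintln! instead of tracing, and plain anyhow errors in place of the controller's ApiError; the names below (NodeRecord, check_nodes) are illustrative stand-ins, not the storage controller's actual API:

use anyhow::anyhow;

// Stand-in for the persisted node record (NodePersistence in the real code).
#[derive(Debug, PartialEq, Eq)]
struct NodeRecord {
    node_id: i64,
    scheduling_policy: String,
}

// Compare the in-memory view (serialized to its persistent form) against what the
// database returned, and fail loudly on any mismatch instead of only logging it.
fn check_nodes(mut expect_nodes: Vec<NodeRecord>, mut nodes: Vec<NodeRecord>) -> anyhow::Result<()> {
    // Sort both sides by the same key so ordering differences are not reported as drift.
    expect_nodes.sort_by_key(|n| n.node_id);
    nodes.sort_by_key(|n| n.node_id);
    if nodes != expect_nodes {
        eprintln!("Consistency check failed on nodes.");
        // Before this commit the mismatch was only logged; returning an error
        // makes the consistency endpoint report the failure to its caller.
        return Err(anyhow!("Node consistency failure"));
    }
    Ok(())
}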


@@ -325,7 +325,10 @@ async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>,
     }
     let state = get_state(&req);
-    json_response(StatusCode::OK, state.service.node_configure(config_req)?)
+    json_response(
+        StatusCode::OK,
+        state.service.node_configure(config_req).await?,
+    )
 }
 async fn handle_tenant_shard_split(


@@ -10,7 +10,7 @@ use crate::persistence::NodePersistence;
 ///
 /// The persistent subset of the Node is defined in [`crate::persistence::NodePersistence`]: the
 /// implementation of serialization on this type is only for debug dumps.
-#[derive(Clone, Serialize, Eq, PartialEq)]
+#[derive(Clone, Serialize)]
 pub(crate) struct Node {
     pub(crate) id: NodeId,


@@ -6,7 +6,7 @@ use std::time::Duration;
 use self::split_state::SplitState;
 use camino::Utf8Path;
 use camino::Utf8PathBuf;
-use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy};
+use control_plane::attachment_service::NodeSchedulingPolicy;
 use diesel::pg::PgConnection;
 use diesel::prelude::*;
 use diesel::Connection;
@@ -130,24 +130,10 @@ impl Persistence {
     }
     /// At startup, populate the list of nodes which our shards may be placed on
-    pub(crate) async fn list_nodes(&self) -> DatabaseResult<Vec<Node>> {
-        let nodes: Vec<Node> = self
+    pub(crate) async fn list_nodes(&self) -> DatabaseResult<Vec<NodePersistence>> {
+        let nodes: Vec<NodePersistence> = self
             .with_conn(move |conn| -> DatabaseResult<_> {
-                Ok(crate::schema::nodes::table
-                    .load::<NodePersistence>(conn)?
-                    .into_iter()
-                    .map(|n| Node {
-                        id: NodeId(n.node_id as u64),
-                        // At startup we consider a node offline until proven otherwise.
-                        availability: NodeAvailability::Offline,
-                        scheduling: NodeSchedulingPolicy::from_str(&n.scheduling_policy)
-                            .expect("Bad scheduling policy in DB"),
-                        listen_http_addr: n.listen_http_addr,
-                        listen_http_port: n.listen_http_port as u16,
-                        listen_pg_addr: n.listen_pg_addr,
-                        listen_pg_port: n.listen_pg_port as u16,
-                    })
-                    .collect::<Vec<Node>>())
+                Ok(crate::schema::nodes::table.load::<NodePersistence>(conn)?)
             })
             .await?;
@@ -156,6 +142,31 @@ impl Persistence {
         Ok(nodes)
     }
+    pub(crate) async fn update_node(
+        &self,
+        input_node_id: NodeId,
+        input_scheduling: NodeSchedulingPolicy,
+    ) -> DatabaseResult<()> {
+        use crate::schema::nodes::dsl::*;
+        let updated = self
+            .with_conn(move |conn| {
+                let updated = diesel::update(nodes)
+                    .filter(node_id.eq(input_node_id.0 as i64))
+                    .set((scheduling_policy.eq(String::from(input_scheduling)),))
+                    .execute(conn)?;
+                Ok(updated)
+            })
+            .await?;
+        if updated != 1 {
+            Err(DatabaseError::Logical(format!(
+                "Node {node_id:?} not found for update",
+            )))
+        } else {
+            Ok(())
+        }
+    }
     /// At startup, load the high level state for shards, such as their config + policy. This will
     /// be enriched at runtime with state discovered on pageservers.
     pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult<Vec<TenantShardPersistence>> {
@@ -506,7 +517,7 @@ pub(crate) struct TenantShardPersistence {
 }
 /// Parts of [`crate::node::Node`] that are stored durably
-#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable)]
+#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq)]
 #[diesel(table_name = crate::schema::nodes)]
 pub(crate) struct NodePersistence {
     pub(crate) node_id: i64,


@@ -622,7 +622,22 @@ impl Service {
         let (result_tx, result_rx) = tokio::sync::mpsc::unbounded_channel();
         tracing::info!("Loading nodes from database...");
-        let nodes = persistence.list_nodes().await?;
+        let nodes = persistence
+            .list_nodes()
+            .await?
+            .into_iter()
+            .map(|n| Node {
+                id: NodeId(n.node_id as u64),
+                // At startup we consider a node offline until proven otherwise.
+                availability: NodeAvailability::Offline,
+                scheduling: NodeSchedulingPolicy::from_str(&n.scheduling_policy)
+                    .expect("Bad scheduling policy in DB"),
+                listen_http_addr: n.listen_http_addr,
+                listen_http_port: n.listen_http_port as u16,
+                listen_pg_addr: n.listen_pg_addr,
+                listen_pg_port: n.listen_pg_port as u16,
+            })
+            .collect::<Vec<_>>();
         let nodes: HashMap<NodeId, Node> = nodes.into_iter().map(|n| (n.id, n)).collect();
         tracing::info!("Loaded {} nodes from database.", nodes.len());
@@ -2326,7 +2341,11 @@ impl Service {
             .context("Scheduler checks")
             .map_err(ApiError::InternalServerError)?;
-        let expect_nodes = locked.nodes.values().cloned().collect::<Vec<_>>();
+        let expect_nodes = locked
+            .nodes
+            .values()
+            .map(|n| n.to_persistent())
+            .collect::<Vec<_>>();
         let expect_shards = locked
             .tenants
@@ -2338,8 +2357,8 @@ impl Service {
         };
         let mut nodes = self.persistence.list_nodes().await?;
-        expect_nodes.sort_by_key(|n| n.id);
-        nodes.sort_by_key(|n| n.id);
+        expect_nodes.sort_by_key(|n| n.node_id);
+        nodes.sort_by_key(|n| n.node_id);
         if nodes != expect_nodes {
             tracing::error!("Consistency check failed on nodes.");
@@ -2353,6 +2372,9 @@ impl Service {
                 serde_json::to_string(&nodes)
                     .map_err(|e| ApiError::InternalServerError(e.into()))?
             );
+            return Err(ApiError::InternalServerError(anyhow::anyhow!(
+                "Node consistency failure"
+            )));
         }
         let mut shards = self.persistence.list_tenant_shards().await?;
@@ -2363,14 +2385,17 @@ impl Service {
             tracing::error!("Consistency check failed on shards.");
             tracing::error!(
                 "Shards in memory: {}",
-                serde_json::to_string(&expect_nodes)
+                serde_json::to_string(&expect_shards)
                     .map_err(|e| ApiError::InternalServerError(e.into()))?
             );
             tracing::error!(
                 "Shards in database: {}",
-                serde_json::to_string(&nodes)
+                serde_json::to_string(&shards)
                     .map_err(|e| ApiError::InternalServerError(e.into()))?
             );
+            return Err(ApiError::InternalServerError(anyhow::anyhow!(
+                "Shard consistency failure"
+            )));
         }
         Ok(())
@@ -2496,7 +2521,18 @@ impl Service {
         Ok(())
     }
-    pub(crate) fn node_configure(&self, config_req: NodeConfigureRequest) -> Result<(), ApiError> {
+    pub(crate) async fn node_configure(
+        &self,
+        config_req: NodeConfigureRequest,
+    ) -> Result<(), ApiError> {
+        if let Some(scheduling) = config_req.scheduling {
+            // Scheduling is a persistent part of Node: we must write updates to the database before
+            // applying them in memory
+            self.persistence
+                .update_node(config_req.node_id, scheduling)
+                .await?;
+        }
         let mut locked = self.inner.write().unwrap();
         let result_tx = locked.result_tx.clone();
         let compute_hook = locked.compute_hook.clone();
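
The ordering in node_configure above (database write first, then the in-memory update) can be sketched in isolation. The Store and NodeState types below are hypothetical stand-ins for the controller's persistence layer and in-memory node map, not its real API:

// Hypothetical stand-ins for the persistence layer and in-memory node state.
struct Store;
struct NodeState {
    scheduling: String,
}

impl Store {
    // Stand-in for the diesel UPDATE performed by update_node().
    fn update_node(&self, _node_id: u64, _scheduling: &str) -> Result<(), String> {
        Ok(())
    }
}

// Persist first, then mutate memory: if the process dies between the two steps,
// the durable copy is ahead of memory, and the next startup (which reloads nodes
// from the database) still converges on the requested scheduling policy.
fn node_configure(
    store: &Store,
    state: &mut NodeState,
    node_id: u64,
    scheduling: Option<String>,
) -> Result<(), String> {
    if let Some(scheduling) = scheduling {
        store.update_node(node_id, &scheduling)?;
        state.scheduling = scheduling;
    }
    Ok(())
}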


@@ -737,7 +737,12 @@ impl TenantState {
             shard_count: self.tenant_shard_id.shard_count.literal() as i32,
             shard_stripe_size: self.shard.stripe_size.0 as i32,
             generation: self.generation.into().unwrap_or(0) as i32,
-            generation_pageserver: i64::MAX,
+            generation_pageserver: self
+                .intent
+                .get_attached()
+                .map(|n| n.0 as i64)
+                .unwrap_or(i64::MAX),
             placement_policy: serde_json::to_string(&self.policy).unwrap(),
             config: serde_json::to_string(&self.config).unwrap(),
             splitting: SplitState::default(),