diff --git a/control_plane/attachment_service/src/http.rs b/control_plane/attachment_service/src/http.rs
index 15ae2a26b4..f9c4535bd5 100644
--- a/control_plane/attachment_service/src/http.rs
+++ b/control_plane/attachment_service/src/http.rs
@@ -325,7 +325,10 @@ async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
}
let state = get_state(&req);
- json_response(StatusCode::OK, state.service.node_configure(config_req)?)
+ json_response(
+ StatusCode::OK,
+ state.service.node_configure(config_req).await?,
+ )
}
async fn handle_tenant_shard_split(
diff --git a/control_plane/attachment_service/src/node.rs b/control_plane/attachment_service/src/node.rs
index 59784249d7..09162701ac 100644
--- a/control_plane/attachment_service/src/node.rs
+++ b/control_plane/attachment_service/src/node.rs
@@ -10,7 +10,7 @@ use crate::persistence::NodePersistence;
///
/// The persistent subset of the Node is defined in [`crate::persistence::NodePersistence`]: the
/// implementation of serialization on this type is only for debug dumps.
-#[derive(Clone, Serialize, Eq, PartialEq)]
+#[derive(Clone, Serialize)]
pub(crate) struct Node {
pub(crate) id: NodeId,
diff --git a/control_plane/attachment_service/src/persistence.rs b/control_plane/attachment_service/src/persistence.rs
index 2d0c8a9d15..4f336093cf 100644
--- a/control_plane/attachment_service/src/persistence.rs
+++ b/control_plane/attachment_service/src/persistence.rs
@@ -6,7 +6,7 @@ use std::time::Duration;
use self::split_state::SplitState;
use camino::Utf8Path;
use camino::Utf8PathBuf;
-use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy};
+use control_plane::attachment_service::NodeSchedulingPolicy;
use diesel::pg::PgConnection;
use diesel::prelude::*;
use diesel::Connection;
@@ -130,24 +130,10 @@ impl Persistence {
}
/// At startup, populate the list of nodes which our shards may be placed on
- pub(crate) async fn list_nodes(&self) -> DatabaseResult<Vec<Node>> {
- let nodes: Vec<Node> = self
+ pub(crate) async fn list_nodes(&self) -> DatabaseResult<Vec<NodePersistence>> {
+ let nodes: Vec<NodePersistence> = self
.with_conn(move |conn| -> DatabaseResult<_> {
- Ok(crate::schema::nodes::table
- .load::<NodePersistence>(conn)?
- .into_iter()
- .map(|n| Node {
- id: NodeId(n.node_id as u64),
- // At startup we consider a node offline until proven otherwise.
- availability: NodeAvailability::Offline,
- scheduling: NodeSchedulingPolicy::from_str(&n.scheduling_policy)
- .expect("Bad scheduling policy in DB"),
- listen_http_addr: n.listen_http_addr,
- listen_http_port: n.listen_http_port as u16,
- listen_pg_addr: n.listen_pg_addr,
- listen_pg_port: n.listen_pg_port as u16,
- })
- .collect::<Vec<Node>>())
+ Ok(crate::schema::nodes::table.load::<NodePersistence>(conn)?)
})
.await?;
@@ -156,6 +142,31 @@ impl Persistence {
Ok(nodes)
}
+ pub(crate) async fn update_node(
+ &self,
+ input_node_id: NodeId,
+ input_scheduling: NodeSchedulingPolicy,
+ ) -> DatabaseResult<()> {
+ use crate::schema::nodes::dsl::*;
+ let updated = self
+ .with_conn(move |conn| {
+ let updated = diesel::update(nodes)
+ .filter(node_id.eq(input_node_id.0 as i64))
+ .set((scheduling_policy.eq(String::from(input_scheduling)),))
+ .execute(conn)?;
+ Ok(updated)
+ })
+ .await?;
+
+ if updated != 1 {
+ Err(DatabaseError::Logical(format!(
+ "Node {input_node_id:?} not found for update",
+ )))
+ } else {
+ Ok(())
+ }
+ }
+
/// At startup, load the high level state for shards, such as their config + policy. This will
/// be enriched at runtime with state discovered on pageservers.
pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult<Vec<TenantShardPersistence>> {
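The new update_node is a single diesel UPDATE filtered on node_id, and any row count other than exactly one is reported as DatabaseError::Logical. A minimal caller sketch (hypothetical, not part of this diff; it assumes a Persistence handle, a `policy: NodeSchedulingPolicy` value, and an enclosing function returning DatabaseResult<()>):

    // Hypothetical usage sketch: persist a scheduling-policy change and treat
    // the "no such node" case separately from connection/query failures.
    match persistence.update_node(NodeId(1), policy).await {
        Ok(()) => tracing::info!("scheduling policy persisted"),
        Err(DatabaseError::Logical(msg)) => tracing::warn!("node missing: {msg}"),
        Err(other) => return Err(other),
    }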
@@ -506,7 +517,7 @@ pub(crate) struct TenantShardPersistence {
}
/// Parts of [`crate::node::Node`] that are stored durably
-#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable)]
+#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq)]
#[diesel(table_name = crate::schema::nodes)]
pub(crate) struct NodePersistence {
pub(crate) node_id: i64,
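Moving the Eq/PartialEq derives from Node to NodePersistence matches the consistency check in service.rs below, which now compares the persisted form directly. A rough sketch of that comparison (hypothetical variable names, assuming Node::to_persistent() as used below):

    // Vec<T> only supports `!=` when T: PartialEq, hence the derives here.
    let nodes: Vec<NodePersistence> = persistence.list_nodes().await?;
    let expect_nodes: Vec<NodePersistence> =
        in_memory_nodes.values().map(|n| n.to_persistent()).collect();
    if nodes != expect_nodes {
        tracing::error!("Consistency check failed on nodes.");
    }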
diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs
index 0b9a7d8a69..38249b9223 100644
--- a/control_plane/attachment_service/src/service.rs
+++ b/control_plane/attachment_service/src/service.rs
@@ -622,7 +622,22 @@ impl Service {
let (result_tx, result_rx) = tokio::sync::mpsc::unbounded_channel();
tracing::info!("Loading nodes from database...");
- let nodes = persistence.list_nodes().await?;
+ let nodes = persistence
+ .list_nodes()
+ .await?
+ .into_iter()
+ .map(|n| Node {
+ id: NodeId(n.node_id as u64),
+ // At startup we consider a node offline until proven otherwise.
+ availability: NodeAvailability::Offline,
+ scheduling: NodeSchedulingPolicy::from_str(&n.scheduling_policy)
+ .expect("Bad scheduling policy in DB"),
+ listen_http_addr: n.listen_http_addr,
+ listen_http_port: n.listen_http_port as u16,
+ listen_pg_addr: n.listen_pg_addr,
+ listen_pg_port: n.listen_pg_port as u16,
+ })
+ .collect::<Vec<Node>>();
let nodes: HashMap<NodeId, Node> = nodes.into_iter().map(|n| (n.id, n)).collect();
tracing::info!("Loaded {} nodes from database.", nodes.len());
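The consistency check in the next hunk relies on Node::to_persistent(), which is not part of this diff. A plausible sketch, assuming it is simply the inverse of the mapping above (dropping the non-durable availability field), that NodeSchedulingPolicy is Copy and converts to String as in update_node, and that the port columns are stored as i32:

    // Sketch only: inverse of the startup NodePersistence -> Node mapping.
    impl Node {
        pub(crate) fn to_persistent(&self) -> NodePersistence {
            NodePersistence {
                node_id: self.id.0 as i64,
                scheduling_policy: String::from(self.scheduling),
                listen_http_addr: self.listen_http_addr.clone(),
                listen_http_port: self.listen_http_port as i32,
                listen_pg_addr: self.listen_pg_addr.clone(),
                listen_pg_port: self.listen_pg_port as i32,
            }
        }
    }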
@@ -2326,7 +2341,11 @@ impl Service {
.context("Scheduler checks")
.map_err(ApiError::InternalServerError)?;
- let expect_nodes = locked.nodes.values().cloned().collect::<Vec<Node>>();
+ let expect_nodes = locked
+ .nodes
+ .values()
+ .map(|n| n.to_persistent())
+ .collect::<Vec<NodePersistence>>();
let expect_shards = locked
.tenants
@@ -2338,8 +2357,8 @@ impl Service {
};
let mut nodes = self.persistence.list_nodes().await?;
- expect_nodes.sort_by_key(|n| n.id);
- nodes.sort_by_key(|n| n.id);
+ expect_nodes.sort_by_key(|n| n.node_id);
+ nodes.sort_by_key(|n| n.node_id);
if nodes != expect_nodes {
tracing::error!("Consistency check failed on nodes.");
@@ -2353,6 +2372,9 @@ impl Service {
serde_json::to_string(&nodes)
.map_err(|e| ApiError::InternalServerError(e.into()))?
);
+ return Err(ApiError::InternalServerError(anyhow::anyhow!(
+ "Node consistency failure"
+ )));
}
let mut shards = self.persistence.list_tenant_shards().await?;
@@ -2363,14 +2385,17 @@ impl Service {
tracing::error!("Consistency check failed on shards.");
tracing::error!(
"Shards in memory: {}",
- serde_json::to_string(&expect_nodes)
+ serde_json::to_string(&expect_shards)
.map_err(|e| ApiError::InternalServerError(e.into()))?
);
tracing::error!(
"Shards in database: {}",
- serde_json::to_string(&nodes)
+ serde_json::to_string(&shards)
.map_err(|e| ApiError::InternalServerError(e.into()))?
);
+ return Err(ApiError::InternalServerError(anyhow::anyhow!(
+ "Shard consistency failure"
+ )));
}
Ok(())
@@ -2496,7 +2521,18 @@ impl Service {
Ok(())
}
- pub(crate) fn node_configure(&self, config_req: NodeConfigureRequest) -> Result<(), ApiError> {
+ pub(crate) async fn node_configure(
+ &self,
+ config_req: NodeConfigureRequest,
+ ) -> Result<(), ApiError> {
+ if let Some(scheduling) = config_req.scheduling {
+ // Scheduling is a persistent part of Node: we must write updates to the database before
+ // applying them in memory
+ self.persistence
+ .update_node(config_req.node_id, scheduling)
+ .await?;
+ }
+
let mut locked = self.inner.write().unwrap();
let result_tx = locked.result_tx.clone();
let compute_hook = locked.compute_hook.clone();
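The ordering here is deliberate: the scheduling change is written to the database before the in-memory node map is touched, so a crash between the two steps leaves the durable state ahead of memory, and the next startup (which reloads nodes from the database, as above) converges on the persisted value. A hypothetical call-site sketch; only node_id and scheduling appear in this diff, so the `availability: None` field and the `new_policy` value are assumptions:

    // Illustrative only: change just the scheduling policy of node 1.
    service
        .node_configure(NodeConfigureRequest {
            node_id: NodeId(1),
            availability: None,
            scheduling: Some(new_policy),
        })
        .await?;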
diff --git a/control_plane/attachment_service/src/tenant_state.rs b/control_plane/attachment_service/src/tenant_state.rs
index 3cfffc6c45..02f0171c29 100644
--- a/control_plane/attachment_service/src/tenant_state.rs
+++ b/control_plane/attachment_service/src/tenant_state.rs
@@ -737,7 +737,12 @@ impl TenantState {
shard_count: self.tenant_shard_id.shard_count.literal() as i32,
shard_stripe_size: self.shard.stripe_size.0 as i32,
generation: self.generation.into().unwrap_or(0) as i32,
- generation_pageserver: i64::MAX,
+ generation_pageserver: self
+ .intent
+ .get_attached()
+ .map(|n| n.0 as i64)
+ .unwrap_or(i64::MAX),
+
placement_policy: serde_json::to_string(&self.policy).unwrap(),
config: serde_json::to_string(&self.config).unwrap(),
splitting: SplitState::default(),
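The generation_pageserver change means the persisted row now records which pageserver the intent is attached to, keeping i64::MAX as the fallback when the intent has no attached location. A small illustration of the new mapping (sketch only; NodeId is the u64 newtype used above):

    let attached: Option<NodeId> = Some(NodeId(3));
    let detached: Option<NodeId> = None;
    assert_eq!(attached.map(|n| n.0 as i64).unwrap_or(i64::MAX), 3);
    assert_eq!(detached.map(|n| n.0 as i64).unwrap_or(i64::MAX), i64::MAX);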