# storcon: track pageserver availability zone (#8852)
## Problem

In order to build AZ-aware scheduling, the storage controller needs to know which AZ each pageserver is in.

Related: https://github.com/neondatabase/neon/issues/8848

## Summary of changes

This patch set adds a new nullable column to the `nodes` table: `availability_zone_id`. The node registration request is extended to include the AZ id (pageservers already have this in their `metadata.json` file). If the node is already registered, we update the persistent and in-memory state with the provided AZ. Otherwise, we add the node with the AZ to begin with.

A couple of assumptions are made here:
1. Pageserver AZ ids are stable
2. AZ ids do not change over time

Once all pageservers have a configured AZ, we can remove the optionals in the code and make the database column not nullable.
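As a rough sketch of the wire-level change, the registration payload now carries an optional AZ id. The struct below is illustrative only: the field names are inferred from how `register_req` is used in the diff further down, not copied from the upstream definition, and the derives and the `NodeId` newtype are stand-ins.

```rust
// Illustrative sketch only: the real NodeRegisterRequest lives elsewhere in the
// tree and may carry additional fields or derives.
use serde::{Deserialize, Serialize};

/// Hypothetical stand-in for the node id newtype used by the storage controller.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct NodeId(pub u64);

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeRegisterRequest {
    pub node_id: NodeId,
    pub listen_pg_addr: String,
    pub listen_pg_port: u16,
    pub listen_http_addr: String,
    pub listen_http_port: u16,
    /// New in this change: optional until every pageserver reports an AZ id.
    pub availability_zone_id: Option<String>,
}
```

The optional field mirrors the nullable database column: a node that registers without an AZ only produces a warning in `node_register` (see the service changes below) rather than an error.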
```diff
@@ -0,0 +1 @@
+ALTER TABLE nodes DROP availability_zone_id;
```

```diff
@@ -0,0 +1 @@
+ALTER TABLE nodes ADD availability_zone_id VARCHAR;
```
```diff
@@ -36,6 +36,8 @@ pub(crate) struct Node {
     listen_pg_addr: String,
     listen_pg_port: u16,
 
+    availability_zone_id: Option<String>,
+
     // This cancellation token means "stop any RPCs in flight to this node, and don't start
     // any more". It is not related to process shutdown.
     #[serde(skip)]
@@ -61,6 +63,10 @@ impl Node {
         self.id
     }
 
+    pub(crate) fn get_availability_zone_id(&self) -> Option<&str> {
+        self.availability_zone_id.as_deref()
+    }
+
     pub(crate) fn get_scheduling(&self) -> NodeSchedulingPolicy {
         self.scheduling
     }
@@ -72,7 +78,18 @@ impl Node {
     /// Does this registration request match `self`? This is used when deciding whether a registration
     /// request should be allowed to update an existing record with the same node ID.
     pub(crate) fn registration_match(&self, register_req: &NodeRegisterRequest) -> bool {
-        self.id == register_req.node_id
+        let az_ids_match = {
+            match (
+                self.availability_zone_id.as_deref(),
+                register_req.availability_zone_id.as_deref(),
+            ) {
+                (Some(current_az), Some(register_req_az)) => current_az == register_req_az,
+                _ => true,
+            }
+        };
+
+        az_ids_match
+            && self.id == register_req.node_id
             && self.listen_http_addr == register_req.listen_http_addr
             && self.listen_http_port == register_req.listen_http_port
            && self.listen_pg_addr == register_req.listen_pg_addr
```
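To spell out the matching rule above: AZ ids only need to agree when both the stored node and the registration request actually carry one; if either side is missing an AZ, that is not treated as a mismatch. A standalone restatement of that rule follows, using a hypothetical helper and made-up AZ names; it is not part of the patch.

```rust
/// Restates the AZ comparison from `registration_match` in isolation.
/// `current` is the node's stored AZ, `requested` comes from the registration request.
fn az_ids_match(current: Option<&str>, requested: Option<&str>) -> bool {
    match (current, requested) {
        (Some(current_az), Some(register_req_az)) => current_az == register_req_az,
        // If either side has no AZ configured yet, do not treat it as a mismatch.
        _ => true,
    }
}

fn main() {
    assert!(az_ids_match(Some("eu-west-1a"), Some("eu-west-1a")));
    assert!(!az_ids_match(Some("eu-west-1a"), Some("eu-west-1b")));
    // A node registered before AZ tracking existed can still re-register.
    assert!(az_ids_match(None, Some("eu-west-1a")));
    assert!(az_ids_match(Some("eu-west-1a"), None));
}
```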
```diff
@@ -173,6 +190,7 @@ impl Node {
         listen_http_port: u16,
         listen_pg_addr: String,
         listen_pg_port: u16,
+        availability_zone_id: Option<String>,
     ) -> Self {
         Self {
             id,
@@ -182,6 +200,7 @@ impl Node {
             listen_pg_port,
             scheduling: NodeSchedulingPolicy::Active,
             availability: NodeAvailability::Offline,
+            availability_zone_id,
             cancel: CancellationToken::new(),
         }
     }
@@ -194,6 +213,7 @@ impl Node {
             listen_http_port: self.listen_http_port as i32,
             listen_pg_addr: self.listen_pg_addr.clone(),
             listen_pg_port: self.listen_pg_port as i32,
+            availability_zone_id: self.availability_zone_id.clone(),
         }
     }
 
@@ -208,6 +228,7 @@ impl Node {
             listen_http_port: np.listen_http_port as u16,
             listen_pg_addr: np.listen_pg_addr,
             listen_pg_port: np.listen_pg_port as u16,
+            availability_zone_id: np.availability_zone_id,
             cancel: CancellationToken::new(),
         }
     }
```
```diff
@@ -103,6 +103,7 @@ pub(crate) enum DatabaseOperation {
     ListMetadataHealthOutdated,
     GetLeader,
     UpdateLeader,
+    SetNodeAzId,
 }
 
 #[must_use]
@@ -315,6 +316,31 @@ impl Persistence {
         }
     }
 
+    pub(crate) async fn set_node_availability_zone_id(
+        &self,
+        input_node_id: NodeId,
+        input_az_id: String,
+    ) -> DatabaseResult<()> {
+        use crate::schema::nodes::dsl::*;
+        let updated = self
+            .with_measured_conn(DatabaseOperation::SetNodeAzId, move |conn| {
+                let updated = diesel::update(nodes)
+                    .filter(node_id.eq(input_node_id.0 as i64))
+                    .set((availability_zone_id.eq(input_az_id.clone()),))
+                    .execute(conn)?;
+                Ok(updated)
+            })
+            .await?;
+
+        if updated != 1 {
+            Err(DatabaseError::Logical(format!(
+                "Node {node_id:?} not found for setting az id",
+            )))
+        } else {
+            Ok(())
+        }
+    }
+
     /// At startup, load the high level state for shards, such as their config + policy. This will
     /// be enriched at runtime with state discovered on pageservers.
     pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult<Vec<TenantShardPersistence>> {
@@ -974,6 +1000,7 @@ pub(crate) struct NodePersistence {
     pub(crate) listen_http_port: i32,
     pub(crate) listen_pg_addr: String,
     pub(crate) listen_pg_port: i32,
+    pub(crate) availability_zone_id: Option<String>,
 }
 
 /// Tenant metadata health status that are stored durably.
```
```diff
@@ -528,6 +528,7 @@ pub(crate) mod test_utils {
                 80 + i as u16,
                 format!("pghost-{i}"),
                 5432 + i as u16,
+                None,
             );
             node.set_availability(NodeAvailability::Active(test_utilization::simple(0, 0)));
             assert!(node.is_available());
```
```diff
@@ -25,6 +25,7 @@ diesel::table! {
         listen_http_port -> Int4,
         listen_pg_addr -> Varchar,
         listen_pg_port -> Int4,
+        availability_zone_id -> Nullable<Varchar>,
     }
 }
 
```
```diff
@@ -1257,6 +1257,7 @@ impl Service {
                 123,
                 "".to_string(),
                 123,
+                None,
             );
 
             scheduler.node_upsert(&node);
```
```diff
@@ -4683,29 +4684,84 @@ impl Service {
         )
         .await;
 
-        {
+        if register_req.availability_zone_id.is_none() {
+            tracing::warn!(
+                "Node {} registering without specific availability zone id",
+                register_req.node_id
+            );
+        }
+
+        enum RegistrationStatus {
+            Matched(Node),
+            Mismatched,
+            New,
+        }
+
+        let registration_status = {
             let locked = self.inner.read().unwrap();
             if let Some(node) = locked.nodes.get(&register_req.node_id) {
                 // Note that we do not do a total equality of the struct, because we don't require
                 // the availability/scheduling states to agree for a POST to be idempotent.
                 if node.registration_match(&register_req) {
-                    tracing::info!(
-                        "Node {} re-registered with matching address",
-                        register_req.node_id
-                    );
-                    return Ok(());
+                    RegistrationStatus::Matched(node.clone())
                 } else {
-                    // TODO: decide if we want to allow modifying node addresses without removing and re-adding
-                    // the node. Safest/simplest thing is to refuse it, and usually we deploy with
-                    // a fixed address through the lifetime of a node.
-                    tracing::warn!(
-                        "Node {} tried to register with different address",
-                        register_req.node_id
-                    );
-                    return Err(ApiError::Conflict(
-                        "Node is already registered with different address".to_string(),
-                    ));
+                    RegistrationStatus::Mismatched
                 }
+            } else {
+                RegistrationStatus::New
             }
-        }
+        };
+
+        match registration_status {
+            RegistrationStatus::Matched(node) => {
+                tracing::info!(
+                    "Node {} re-registered with matching address",
+                    register_req.node_id
+                );
+
+                if node.get_availability_zone_id().is_none() {
+                    if let Some(az_id) = register_req.availability_zone_id.clone() {
+                        tracing::info!("Extracting availability zone id from registration request for node {}: {}",
+                            register_req.node_id, az_id);
+
+                        // Persist to the database and update in memory state. See comment below
+                        // on ordering.
+                        self.persistence
+                            .set_node_availability_zone_id(register_req.node_id, az_id)
+                            .await?;
+                        let node_with_az = Node::new(
+                            register_req.node_id,
+                            register_req.listen_http_addr,
+                            register_req.listen_http_port,
+                            register_req.listen_pg_addr,
+                            register_req.listen_pg_port,
+                            register_req.availability_zone_id,
+                        );
+
+                        let mut locked = self.inner.write().unwrap();
+                        let mut new_nodes = (*locked.nodes).clone();
+
+                        locked.scheduler.node_upsert(&node_with_az);
+                        new_nodes.insert(register_req.node_id, node_with_az);
+
+                        locked.nodes = Arc::new(new_nodes);
+                    }
+                }
+
+                return Ok(());
+            }
+            RegistrationStatus::Mismatched => {
+                // TODO: decide if we want to allow modifying node addresses without removing and re-adding
+                // the node. Safest/simplest thing is to refuse it, and usually we deploy with
+                // a fixed address through the lifetime of a node.
+                tracing::warn!(
+                    "Node {} tried to register with different address",
+                    register_req.node_id
+                );
+                return Err(ApiError::Conflict(
+                    "Node is already registered with different address".to_string(),
+                ));
+            }
+            RegistrationStatus::New => {
+                // fallthrough
+            }
+        }
 
```
```diff
@@ -4742,6 +4798,7 @@ impl Service {
             register_req.listen_http_port,
             register_req.listen_pg_addr,
             register_req.listen_pg_port,
+            register_req.availability_zone_id,
         );
 
         // TODO: idempotency if the node already exists in the database
```