Mirror of https://github.com/neondatabase/neon.git
storcon: Introduce deletion tombstones to support flaky node scenario (#12096)
## Problem

Removed nodes can re-add themselves on restart if they are not properly tombstoned. We need a mechanism (e.g. a soft-delete flag) to prevent this, especially when the node is unreachable. More details in #12036.

## Summary of changes

- Introduced a `NodeLifecycle` enum to represent node lifecycle states.
- Added a string representation of `NodeLifecycle` to the `nodes` table.
- Implemented node removal using a tombstone mechanism.
- Introduced `/debug/v1/tombstone*` handlers to manage the tombstone state.
Commit 590301df08 (parent c511786548), committed via GitHub.
@@ -61,10 +61,16 @@ enum Command {
         #[arg(long)]
         scheduling: Option<NodeSchedulingPolicy>,
     },
+    // Set a node's status to deleted.
     NodeDelete {
         #[arg(long)]
         node_id: NodeId,
     },
+    /// Delete a node's tombstone from the storage controller.
+    NodeDeleteTombstone {
+        #[arg(long)]
+        node_id: NodeId,
+    },
     /// Modify a tenant's policies in the storage controller
     TenantPolicy {
         #[arg(long)]
@@ -82,6 +88,8 @@ enum Command {
     },
     /// List nodes known to the storage controller
    Nodes {},
+    /// List soft deleted nodes known to the storage controller
+    NodeTombstones {},
     /// List tenants known to the storage controller
     Tenants {
         /// If this field is set, it will list the tenants on a specific node
@@ -900,6 +908,39 @@ async fn main() -> anyhow::Result<()> {
                 .dispatch::<(), ()>(Method::DELETE, format!("control/v1/node/{node_id}"), None)
                 .await?;
         }
+        Command::NodeDeleteTombstone { node_id } => {
+            storcon_client
+                .dispatch::<(), ()>(
+                    Method::DELETE,
+                    format!("debug/v1/tombstone/{node_id}"),
+                    None,
+                )
+                .await?;
+        }
+        Command::NodeTombstones {} => {
+            let mut resp = storcon_client
+                .dispatch::<(), Vec<NodeDescribeResponse>>(
+                    Method::GET,
+                    "debug/v1/tombstone".to_string(),
+                    None,
+                )
+                .await?;
+
+            resp.sort_by(|a, b| a.listen_http_addr.cmp(&b.listen_http_addr));
+
+            let mut table = comfy_table::Table::new();
+            table.set_header(["Id", "Hostname", "AZ", "Scheduling", "Availability"]);
+            for node in resp {
+                table.add_row([
+                    format!("{}", node.id),
+                    node.listen_http_addr,
+                    node.availability_zone_id,
+                    format!("{:?}", node.scheduling),
+                    format!("{:?}", node.availability),
+                ]);
+            }
+            println!("{table}");
+        }
         Command::TenantSetTimeBasedEviction {
             tenant_id,
             period,
@@ -344,6 +344,35 @@ impl Default for ShardSchedulingPolicy {
     }
 }
 
+#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
+pub enum NodeLifecycle {
+    Active,
+    Deleted,
+}
+
+impl FromStr for NodeLifecycle {
+    type Err = anyhow::Error;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            "active" => Ok(Self::Active),
+            "deleted" => Ok(Self::Deleted),
+            _ => Err(anyhow::anyhow!("Unknown node lifecycle '{s}'")),
+        }
+    }
+}
+
+impl From<NodeLifecycle> for String {
+    fn from(value: NodeLifecycle) -> String {
+        use NodeLifecycle::*;
+        match value {
+            Active => "active",
+            Deleted => "deleted",
+        }
+        .to_string()
+    }
+}
+
 #[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
 pub enum NodeSchedulingPolicy {
     Active,
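To make the string mapping concrete, here is a minimal round-trip sketch using only the `FromStr` and `From<NodeLifecycle> for String` impls added above. The `demo` function and its assertions are illustrative only and are not part of the patch; they assume `NodeLifecycle` is in scope.

```rust
use std::str::FromStr;

// Assumes the NodeLifecycle enum and its conversions from the hunk above are in scope.
fn demo() -> anyhow::Result<()> {
    // Enum -> String: this is the value written to the `lifecycle` column.
    let stored = String::from(NodeLifecycle::Deleted);
    assert_eq!(stored, "deleted");

    // String -> enum: this is what Node::from_persistent does when loading rows.
    let parsed = NodeLifecycle::from_str(&stored)?;
    assert_eq!(parsed, NodeLifecycle::Deleted);

    // Unknown strings are rejected rather than silently defaulted.
    assert!(NodeLifecycle::from_str("tombstoned").is_err());
    Ok(())
}
```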
@@ -0,0 +1 @@
+ALTER TABLE nodes DROP COLUMN lifecycle;

@@ -0,0 +1 @@
+ALTER TABLE nodes ADD COLUMN lifecycle VARCHAR NOT NULL DEFAULT 'active';
@@ -907,6 +907,42 @@ async fn handle_node_delete(req: Request<Body>) -> Result<Response<Body>, ApiError> {
     json_response(StatusCode::OK, state.service.node_delete(node_id).await?)
 }
 
+async fn handle_tombstone_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
+    check_permissions(&req, Scope::Admin)?;
+
+    let req = match maybe_forward(req).await {
+        ForwardOutcome::Forwarded(res) => {
+            return res;
+        }
+        ForwardOutcome::NotForwarded(req) => req,
+    };
+
+    let state = get_state(&req);
+    let mut nodes = state.service.tombstone_list().await?;
+    nodes.sort_by_key(|n| n.get_id());
+    let api_nodes = nodes.into_iter().map(|n| n.describe()).collect::<Vec<_>>();
+
+    json_response(StatusCode::OK, api_nodes)
+}
+
+async fn handle_tombstone_delete(req: Request<Body>) -> Result<Response<Body>, ApiError> {
+    check_permissions(&req, Scope::Admin)?;
+
+    let req = match maybe_forward(req).await {
+        ForwardOutcome::Forwarded(res) => {
+            return res;
+        }
+        ForwardOutcome::NotForwarded(req) => req,
+    };
+
+    let state = get_state(&req);
+    let node_id: NodeId = parse_request_param(&req, "node_id")?;
+    json_response(
+        StatusCode::OK,
+        state.service.tombstone_delete(node_id).await?,
+    )
+}
+
 async fn handle_node_configure(req: Request<Body>) -> Result<Response<Body>, ApiError> {
     check_permissions(&req, Scope::Admin)?;
 
@@ -2062,6 +2098,20 @@ pub fn make_router(
         .post("/debug/v1/node/:node_id/drop", |r| {
             named_request_span(r, handle_node_drop, RequestName("debug_v1_node_drop"))
         })
+        .delete("/debug/v1/tombstone/:node_id", |r| {
+            named_request_span(
+                r,
+                handle_tombstone_delete,
+                RequestName("debug_v1_tombstone_delete"),
+            )
+        })
+        .get("/debug/v1/tombstone", |r| {
+            named_request_span(
+                r,
+                handle_tombstone_list,
+                RequestName("debug_v1_tombstone_list"),
+            )
+        })
         .post("/debug/v1/tenant/:tenant_id/import", |r| {
             named_request_span(
                 r,
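For illustration, the debug endpoints registered above could be exercised like this. This is a minimal sketch, not part of the patch: the base URL, the admin-scoped JWT, and the use of `reqwest`/`serde_json`/`anyhow` are assumptions made for the example.

```rust
use reqwest::Client;

// Hypothetical client code against the new endpoints (admin scope required).
async fn demo(base: &str, admin_jwt: &str, node_id: u64) -> anyhow::Result<()> {
    let client = Client::new();

    // List soft-deleted (tombstoned) nodes.
    let tombstones: serde_json::Value = client
        .get(format!("{base}/debug/v1/tombstone"))
        .bearer_auth(admin_jwt)
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    println!("tombstones: {tombstones}");

    // Hard-delete a tombstone so the node may register again.
    client
        .delete(format!("{base}/debug/v1/tombstone/{node_id}"))
        .bearer_auth(admin_jwt)
        .send()
        .await?
        .error_for_status()?;
    Ok(())
}
```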
@@ -2,7 +2,7 @@ use std::str::FromStr;
 use std::time::Duration;
 
 use pageserver_api::controller_api::{
-    AvailabilityZone, NodeAvailability, NodeDescribeResponse, NodeRegisterRequest,
+    AvailabilityZone, NodeAvailability, NodeDescribeResponse, NodeLifecycle, NodeRegisterRequest,
     NodeSchedulingPolicy, TenantLocateResponseShard,
 };
 use pageserver_api::shard::TenantShardId;

@@ -29,6 +29,7 @@ pub(crate) struct Node {
 
     availability: NodeAvailability,
     scheduling: NodeSchedulingPolicy,
+    lifecycle: NodeLifecycle,
 
     listen_http_addr: String,
     listen_http_port: u16,

@@ -228,6 +229,7 @@ impl Node {
             listen_pg_addr,
             listen_pg_port,
             scheduling: NodeSchedulingPolicy::Active,
+            lifecycle: NodeLifecycle::Active,
             availability: NodeAvailability::Offline,
             availability_zone_id,
             use_https,

@@ -239,6 +241,7 @@ impl Node {
         NodePersistence {
             node_id: self.id.0 as i64,
             scheduling_policy: self.scheduling.into(),
+            lifecycle: self.lifecycle.into(),
             listen_http_addr: self.listen_http_addr.clone(),
             listen_http_port: self.listen_http_port as i32,
             listen_https_port: self.listen_https_port.map(|x| x as i32),

@@ -263,6 +266,7 @@ impl Node {
             availability: NodeAvailability::Offline,
             scheduling: NodeSchedulingPolicy::from_str(&np.scheduling_policy)
                 .expect("Bad scheduling policy in DB"),
+            lifecycle: NodeLifecycle::from_str(&np.lifecycle).expect("Bad lifecycle in DB"),
             listen_http_addr: np.listen_http_addr,
             listen_http_port: np.listen_http_port as u16,
             listen_https_port: np.listen_https_port.map(|x| x as u16),
@@ -19,7 +19,7 @@ use futures::FutureExt;
 use futures::future::BoxFuture;
 use itertools::Itertools;
 use pageserver_api::controller_api::{
-    AvailabilityZone, MetadataHealthRecord, NodeSchedulingPolicy, PlacementPolicy,
+    AvailabilityZone, MetadataHealthRecord, NodeLifecycle, NodeSchedulingPolicy, PlacementPolicy,
     SafekeeperDescribeResponse, ShardSchedulingPolicy, SkSchedulingPolicy,
 };
 use pageserver_api::models::{ShardImportStatus, TenantConfig};
@@ -102,6 +102,7 @@ pub(crate) enum DatabaseOperation {
     UpdateNode,
     DeleteNode,
     ListNodes,
+    ListTombstones,
     BeginShardSplit,
     CompleteShardSplit,
     AbortShardSplit,
@@ -357,6 +358,8 @@ impl Persistence {
     }
 
     /// When a node is first registered, persist it before using it for anything
+    /// If the provided node_id already exists, it is an error.
+    /// The common case for that is a node marked for deletion trying to register again.
     pub(crate) async fn insert_node(&self, node: &Node) -> DatabaseResult<()> {
         let np = &node.to_persistent();
         self.with_measured_conn(DatabaseOperation::InsertNode, move |conn| {
@@ -373,19 +376,41 @@ impl Persistence {
 
     /// At startup, populate the list of nodes which our shards may be placed on
     pub(crate) async fn list_nodes(&self) -> DatabaseResult<Vec<NodePersistence>> {
-        let nodes: Vec<NodePersistence> = self
+        use crate::schema::nodes::dsl::*;
+
+        let result: Vec<NodePersistence> = self
             .with_measured_conn(DatabaseOperation::ListNodes, move |conn| {
                 Box::pin(async move {
                     Ok(crate::schema::nodes::table
+                        .filter(lifecycle.ne(String::from(NodeLifecycle::Deleted)))
                         .load::<NodePersistence>(conn)
                         .await?)
                 })
             })
             .await?;
 
-        tracing::info!("list_nodes: loaded {} nodes", nodes.len());
+        tracing::info!("list_nodes: loaded {} nodes", result.len());
 
-        Ok(nodes)
+        Ok(result)
+    }
+
+    pub(crate) async fn list_tombstones(&self) -> DatabaseResult<Vec<NodePersistence>> {
+        use crate::schema::nodes::dsl::*;
+
+        let result: Vec<NodePersistence> = self
+            .with_measured_conn(DatabaseOperation::ListTombstones, move |conn| {
+                Box::pin(async move {
+                    Ok(crate::schema::nodes::table
+                        .filter(lifecycle.eq(String::from(NodeLifecycle::Deleted)))
+                        .load::<NodePersistence>(conn)
+                        .await?)
+                })
+            })
+            .await?;
+
+        tracing::info!("list_tombstones: loaded {} nodes", result.len());
+
+        Ok(result)
     }
 
     pub(crate) async fn update_node<V>(
@@ -404,6 +429,7 @@ impl Persistence {
             Box::pin(async move {
                 let updated = diesel::update(nodes)
                     .filter(node_id.eq(input_node_id.0 as i64))
+                    .filter(lifecycle.ne(String::from(NodeLifecycle::Deleted)))
                     .set(values)
                     .execute(conn)
                     .await?;
@@ -447,6 +473,57 @@ impl Persistence {
         .await
     }
 
+    /// A tombstone is a special state: the node is not deleted from the database,
+    /// but it is no longer available for use.
+    /// Its main purpose is to prevent a flaky node from re-registering.
+    pub(crate) async fn set_tombstone(&self, del_node_id: NodeId) -> DatabaseResult<()> {
+        use crate::schema::nodes::dsl::*;
+        self.update_node(
+            del_node_id,
+            lifecycle.eq(String::from(NodeLifecycle::Deleted)),
+        )
+        .await
+    }
+
+    pub(crate) async fn delete_node(&self, del_node_id: NodeId) -> DatabaseResult<()> {
+        use crate::schema::nodes::dsl::*;
+        self.with_measured_conn(DatabaseOperation::DeleteNode, move |conn| {
+            Box::pin(async move {
+                // You can hard delete a node only if it has a tombstone,
+                // so check that the node's lifecycle is set to deleted.
+                let node_to_delete = nodes
+                    .filter(node_id.eq(del_node_id.0 as i64))
+                    .first::<NodePersistence>(conn)
+                    .await
+                    .optional()?;
+
+                if let Some(np) = node_to_delete {
+                    let lc = NodeLifecycle::from_str(&np.lifecycle).map_err(|e| {
+                        DatabaseError::Logical(format!(
+                            "Node {} has invalid lifecycle: {}",
+                            del_node_id, e
+                        ))
+                    })?;
+
+                    if lc != NodeLifecycle::Deleted {
+                        return Err(DatabaseError::Logical(format!(
+                            "Node {} was not soft-deleted first, cannot hard-delete it",
+                            del_node_id
+                        )));
+                    }
+
+                    diesel::delete(nodes)
+                        .filter(node_id.eq(del_node_id.0 as i64))
+                        .execute(conn)
+                        .await?;
+                }
+
+                Ok(())
+            })
+        })
+        .await
+    }
+
     /// At startup, load the high level state for shards, such as their config + policy. This will
     /// be enriched at runtime with state discovered on pageservers.
     ///
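Taken together, these two methods define a two-step removal contract at the persistence layer. The sketch below shows the intended call order; the `remove_node_for_good` wrapper is hypothetical, only `set_tombstone` and `delete_node` come from the patch, and the crate-internal types (`Persistence`, `NodeId`, `DatabaseResult`) are assumed to be in scope.

```rust
// Hypothetical driver code illustrating the two-step contract.
async fn remove_node_for_good(
    persistence: &Persistence,
    node_id: NodeId,
) -> DatabaseResult<()> {
    // Step 1: soft delete. The row stays in `nodes` with lifecycle = 'deleted',
    // which hides it from list_nodes() and blocks registration / re-attach.
    persistence.set_tombstone(node_id).await?;

    // Step 2: hard delete. Only permitted once the tombstone exists;
    // otherwise delete_node() returns DatabaseError::Logical.
    persistence.delete_node(node_id).await
}
```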
@@ -543,21 +620,6 @@ impl Persistence {
         .await
     }
 
-    pub(crate) async fn delete_node(&self, del_node_id: NodeId) -> DatabaseResult<()> {
-        use crate::schema::nodes::dsl::*;
-        self.with_measured_conn(DatabaseOperation::DeleteNode, move |conn| {
-            Box::pin(async move {
-                diesel::delete(nodes)
-                    .filter(node_id.eq(del_node_id.0 as i64))
-                    .execute(conn)
-                    .await?;
-
-                Ok(())
-            })
-        })
-        .await
-    }
-
     /// When a tenant invokes the /re-attach API, this function is responsible for doing an efficient
     /// batched increment of the generations of all tenants whose generation_pageserver is equal to
     /// the node that called /re-attach.
@@ -571,6 +633,20 @@ impl Persistence {
         let updated = self
             .with_measured_conn(DatabaseOperation::ReAttach, move |conn| {
                 Box::pin(async move {
+                    // Check that the node is not marked as deleted
+                    let deleted_node: i64 = nodes
+                        .filter(node_id.eq(input_node_id.0 as i64))
+                        .filter(lifecycle.eq(String::from(NodeLifecycle::Deleted)))
+                        .count()
+                        .get_result(conn)
+                        .await?;
+                    if deleted_node > 0 {
+                        return Err(DatabaseError::Logical(format!(
+                            "Node {} is marked as deleted, re-attach is not allowed",
+                            input_node_id
+                        )));
+                    }
+
                     let rows_updated = diesel::update(tenant_shards)
                         .filter(generation_pageserver.eq(input_node_id.0 as i64))
                         .set(generation.eq(generation + 1))
@@ -2048,6 +2124,7 @@ pub(crate) struct NodePersistence {
     pub(crate) listen_pg_port: i32,
     pub(crate) availability_zone_id: String,
     pub(crate) listen_https_port: Option<i32>,
+    pub(crate) lifecycle: String,
 }
 
 /// Tenant metadata health status that are stored durably.
@@ -33,6 +33,7 @@ diesel::table! {
         listen_pg_port -> Int4,
         availability_zone_id -> Varchar,
         listen_https_port -> Nullable<Int4>,
+        lifecycle -> Varchar,
     }
 }
 
@@ -166,6 +166,7 @@ enum NodeOperations {
     Register,
     Configure,
     Delete,
+    DeleteTombstone,
 }
 
 /// The leadership status for the storage controller process.
@@ -6909,7 +6910,7 @@ impl Service {
     /// detaching or deleting it on pageservers. We do not try and re-schedule any
     /// tenants that were on this node.
     pub(crate) async fn node_drop(&self, node_id: NodeId) -> Result<(), ApiError> {
-        self.persistence.delete_node(node_id).await?;
+        self.persistence.set_tombstone(node_id).await?;
 
         let mut locked = self.inner.write().unwrap();
 
@@ -7033,9 +7034,10 @@ impl Service {
         // That is safe because in Service::spawn we only use generation_pageserver if it refers to a node
         // that exists.
 
-        // 2. Actually delete the node from the database and from in-memory state
+        // 2. Actually delete the node from in-memory state and write a tombstone to the database
+        //    to prevent the node from registering again.
         tracing::info!("Deleting node from database");
-        self.persistence.delete_node(node_id).await?;
+        self.persistence.set_tombstone(node_id).await?;
 
         Ok(())
     }
@@ -7054,6 +7056,35 @@ impl Service {
         Ok(nodes)
     }
 
+    pub(crate) async fn tombstone_list(&self) -> Result<Vec<Node>, ApiError> {
+        self.persistence
+            .list_tombstones()
+            .await?
+            .into_iter()
+            .map(|np| Node::from_persistent(np, false))
+            .collect::<Result<Vec<_>, _>>()
+            .map_err(ApiError::InternalServerError)
+    }
+
+    pub(crate) async fn tombstone_delete(&self, node_id: NodeId) -> Result<(), ApiError> {
+        let _node_lock = trace_exclusive_lock(
+            &self.node_op_locks,
+            node_id,
+            NodeOperations::DeleteTombstone,
+        )
+        .await;
+
+        if matches!(self.get_node(node_id).await, Err(ApiError::NotFound(_))) {
+            self.persistence.delete_node(node_id).await?;
+            Ok(())
+        } else {
+            Err(ApiError::Conflict(format!(
+                "Node {} is in use, consider using tombstone API first",
+                node_id
+            )))
+        }
+    }
+
     pub(crate) async fn get_node(&self, node_id: NodeId) -> Result<Node, ApiError> {
         self.inner
             .read()
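The guard in `tombstone_delete` above means the hard delete only succeeds once the node is no longer part of the in-memory cluster view. A sketch of the expected outcomes follows; the `illustrate` helper is hypothetical and assumes the surrounding `Service`, `ApiError`, and `NodeId` types are in scope.

```rust
// Hypothetical illustration of tombstone_delete outcomes.
async fn illustrate(service: &Service, node_id: NodeId) {
    match service.tombstone_delete(node_id).await {
        // Node is already gone from the in-memory view (get_node -> NotFound):
        // the persistent tombstone row is dropped and the node may register again.
        Ok(()) => println!("tombstone for {node_id} removed"),
        // Node is still known to the service: the caller must soft-delete it first.
        Err(ApiError::Conflict(msg)) => println!("refused: {msg}"),
        Err(other) => println!("unexpected error: {other}"),
    }
}
```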
@@ -7224,7 +7255,25 @@ impl Service {
         };
 
         match registration_status {
-            RegistrationStatus::New => self.persistence.insert_node(&new_node).await?,
+            RegistrationStatus::New => {
+                self.persistence.insert_node(&new_node).await.map_err(|e| {
+                    if matches!(
+                        e,
+                        crate::persistence::DatabaseError::Query(
+                            diesel::result::Error::DatabaseError(
+                                diesel::result::DatabaseErrorKind::UniqueViolation,
+                                _,
+                            )
+                        )
+                    ) {
+                        // The node may have been soft-deleted via the tombstone API and therefore
+                        // not show up in the list of nodes. If you see this error, check the tombstones first.
+                        ApiError::Conflict(format!("Node {} already exists", new_node.get_id()))
+                    } else {
+                        ApiError::from(e)
+                    }
+                })?;
+            }
             RegistrationStatus::NeedUpdate => {
                 self.persistence
                     .update_node_on_registration(
@@ -2054,6 +2054,14 @@ class NeonStorageController(MetricsGetter, LogUtils):
             headers=self.headers(TokenScope.ADMIN),
         )
 
+    def tombstone_delete(self, node_id):
+        log.info(f"tombstone_delete({node_id})")
+        self.request(
+            "DELETE",
+            f"{self.api}/debug/v1/tombstone/{node_id}",
+            headers=self.headers(TokenScope.ADMIN),
+        )
+
     def node_drain(self, node_id):
         log.info(f"node_drain({node_id})")
         self.request(

@@ -2110,6 +2118,14 @@ class NeonStorageController(MetricsGetter, LogUtils):
         )
         return response.json()
 
+    def tombstone_list(self):
+        response = self.request(
+            "GET",
+            f"{self.api}/debug/v1/tombstone",
+            headers=self.headers(TokenScope.ADMIN),
+        )
+        return response.json()
+
     def tenant_shard_dump(self):
         """
         Debug listing API: dumps the internal map of tenant shards
@@ -3093,6 +3093,58 @@ def test_storage_controller_ps_restarted_during_drain(neon_env_builder: NeonEnvBuilder):
     wait_until(reconfigure_node_again)
 
 
+def test_ps_unavailable_after_delete(neon_env_builder: NeonEnvBuilder):
+    neon_env_builder.num_pageservers = 3
+
+    env = neon_env_builder.init_start()
+
+    def assert_nodes_count(n: int):
+        nodes = env.storage_controller.node_list()
+        assert len(nodes) == n
+
+    # All nodes are present before deletion
+    assert_nodes_count(3)
+
+    ps = env.pageservers[0]
+    env.storage_controller.node_delete(ps.id)
+
+    # After deletion, the node count must be reduced
+    assert_nodes_count(2)
+
+    # Running pageserver CLI init in a separate thread
+    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
+        log.info("Restarting tombstoned pageserver...")
+        ps.stop()
+        ps_start_fut = executor.submit(lambda: ps.start(await_active=False))
+
+        # After the deleted pageserver restarts, the node count must remain the same
+        assert_nodes_count(2)
+
+        tombstones = env.storage_controller.tombstone_list()
+        assert len(tombstones) == 1 and tombstones[0]["id"] == ps.id
+
+        env.storage_controller.tombstone_delete(ps.id)
+
+        tombstones = env.storage_controller.tombstone_list()
+        assert len(tombstones) == 0
+
+        # Wait for the pageserver start operation to complete.
+        # If it fails with an exception, we try restarting the pageserver since the failure
+        # may be due to the storage controller refusing to register the node.
+        # However, if we get a TimeoutError that means the pageserver is completely hung,
+        # which is an unexpected failure mode that we'll let propagate up.
+        try:
+            ps_start_fut.result(timeout=20)
+        except TimeoutError:
+            raise
+        except Exception:
+            log.info("Restarting deleted pageserver...")
+            ps.restart()
+
+    # Finally, the node can register again once its tombstone is deleted
+    wait_until(lambda: assert_nodes_count(3))
+
+
 def test_storage_controller_timeline_crud_race(neon_env_builder: NeonEnvBuilder):
     """
     The storage controller is meant to handle the case where a timeline CRUD operation races