Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-14 17:02:56 +00:00)
storcon: Introduce deletion tombstones to support flaky node scenario (#12096)
## Problem

Removed nodes can re-add themselves on restart if they are not properly tombstoned. We need a mechanism (e.g. a soft-delete flag) to prevent this, especially when the node is unreachable. More details in #12036.

## Summary of changes

- Introduced a `NodeLifecycle` enum to represent node lifecycle states.
- Added a string representation of `NodeLifecycle` to the `nodes` table.
- Implemented node removal using a tombstone mechanism.
- Introduced `/debug/v1/tombstone*` handlers to manage the tombstone state.
Committed by: Alex Chi Z
Parent: 72b09473c1
Commit: 765b76f4cd
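The commit introduces `NodeLifecycle` and stores its string form in the `nodes` table. The actual enum lives in `pageserver_api::controller_api` (see the imports in the diff below); the following is only a rough, self-contained sketch of the shape such a type might take — the variant set, derives, and the `"deleted"` string are assumptions, not taken from this commit (only the `'active'` default is confirmed by the migration).

```rust
// Hypothetical sketch of a lifecycle enum with a string round-trip,
// mirroring how the diff converts it via `String::from(...)` and
// `NodeLifecycle::from_str(...)`. Not the real definition.
use std::str::FromStr;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum NodeLifecycle {
    Active,
    Deleted,
}

impl From<NodeLifecycle> for String {
    fn from(lc: NodeLifecycle) -> String {
        match lc {
            // "active" matches the migration default; "deleted" is assumed.
            NodeLifecycle::Active => "active".to_string(),
            NodeLifecycle::Deleted => "deleted".to_string(),
        }
    }
}

impl FromStr for NodeLifecycle {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "active" => Ok(NodeLifecycle::Active),
            "deleted" => Ok(NodeLifecycle::Deleted),
            other => Err(format!("unknown node lifecycle: {other}")),
        }
    }
}

fn main() {
    // Round-trip through the string form stored in the `lifecycle` column.
    let stored = String::from(NodeLifecycle::Deleted);
    assert_eq!(NodeLifecycle::from_str(&stored), Ok(NodeLifecycle::Deleted));
    // Unknown values surface as errors; the controller turns those into an
    // `expect("Bad lifecycle in DB")` panic when loading from the database.
    assert!(NodeLifecycle::from_str("tombstoned").is_err());
}
```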
@@ -0,0 +1 @@
+ALTER TABLE nodes DROP COLUMN lifecycle;
@@ -0,0 +1 @@
+ALTER TABLE nodes ADD COLUMN lifecycle VARCHAR NOT NULL DEFAULT 'active';
@@ -907,6 +907,42 @@ async fn handle_node_delete(req: Request<Body>) -> Result<Response<Body>, ApiErr
     json_response(StatusCode::OK, state.service.node_delete(node_id).await?)
 }

+async fn handle_tombstone_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
+    check_permissions(&req, Scope::Admin)?;
+
+    let req = match maybe_forward(req).await {
+        ForwardOutcome::Forwarded(res) => {
+            return res;
+        }
+        ForwardOutcome::NotForwarded(req) => req,
+    };
+
+    let state = get_state(&req);
+    let mut nodes = state.service.tombstone_list().await?;
+    nodes.sort_by_key(|n| n.get_id());
+    let api_nodes = nodes.into_iter().map(|n| n.describe()).collect::<Vec<_>>();
+
+    json_response(StatusCode::OK, api_nodes)
+}
+
+async fn handle_tombstone_delete(req: Request<Body>) -> Result<Response<Body>, ApiError> {
+    check_permissions(&req, Scope::Admin)?;
+
+    let req = match maybe_forward(req).await {
+        ForwardOutcome::Forwarded(res) => {
+            return res;
+        }
+        ForwardOutcome::NotForwarded(req) => req,
+    };
+
+    let state = get_state(&req);
+    let node_id: NodeId = parse_request_param(&req, "node_id")?;
+    json_response(
+        StatusCode::OK,
+        state.service.tombstone_delete(node_id).await?,
+    )
+}
+
 async fn handle_node_configure(req: Request<Body>) -> Result<Response<Body>, ApiError> {
     check_permissions(&req, Scope::Admin)?;

@@ -2062,6 +2098,20 @@ pub fn make_router(
         .post("/debug/v1/node/:node_id/drop", |r| {
             named_request_span(r, handle_node_drop, RequestName("debug_v1_node_drop"))
         })
+        .delete("/debug/v1/tombstone/:node_id", |r| {
+            named_request_span(
+                r,
+                handle_tombstone_delete,
+                RequestName("debug_v1_tombstone_delete"),
+            )
+        })
+        .get("/debug/v1/tombstone", |r| {
+            named_request_span(
+                r,
+                handle_tombstone_list,
+                RequestName("debug_v1_tombstone_list"),
+            )
+        })
         .post("/debug/v1/tenant/:tenant_id/import", |r| {
             named_request_span(
                 r,
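The two routes above are admin-scoped debug endpoints on the storage controller. A minimal sketch of how an operator tool might call them follows — the base URL, the `reqwest`/`tokio`/`serde_json` dependencies, and the bearer-token handling are illustrative assumptions, not part of this commit.

```rust
// Hypothetical client for the new debug endpoints, assuming a local storage
// controller and an admin-scoped JWT in STORCON_ADMIN_TOKEN. Requires the
// `reqwest` (json feature), `tokio`, and `serde_json` crates.
use std::env;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let base = "http://127.0.0.1:1234"; // assumed controller address
    let token = env::var("STORCON_ADMIN_TOKEN")?;
    let client = reqwest::Client::new();

    // List nodes that were soft-deleted (tombstoned).
    let tombstones: serde_json::Value = client
        .get(format!("{base}/debug/v1/tombstone"))
        .bearer_auth(&token)
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    println!("tombstoned nodes: {tombstones}");

    // Hard-delete one tombstone once the node is truly gone (example id).
    let node_id = 42;
    client
        .delete(format!("{base}/debug/v1/tombstone/{node_id}"))
        .bearer_auth(&token)
        .send()
        .await?
        .error_for_status()?;

    Ok(())
}
```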
@@ -2,7 +2,7 @@ use std::str::FromStr;
 use std::time::Duration;

 use pageserver_api::controller_api::{
-    AvailabilityZone, NodeAvailability, NodeDescribeResponse, NodeRegisterRequest,
+    AvailabilityZone, NodeAvailability, NodeDescribeResponse, NodeLifecycle, NodeRegisterRequest,
     NodeSchedulingPolicy, TenantLocateResponseShard,
 };
 use pageserver_api::shard::TenantShardId;
@@ -29,6 +29,7 @@ pub(crate) struct Node {

     availability: NodeAvailability,
     scheduling: NodeSchedulingPolicy,
+    lifecycle: NodeLifecycle,

     listen_http_addr: String,
     listen_http_port: u16,
@@ -228,6 +229,7 @@ impl Node {
             listen_pg_addr,
             listen_pg_port,
             scheduling: NodeSchedulingPolicy::Active,
+            lifecycle: NodeLifecycle::Active,
             availability: NodeAvailability::Offline,
             availability_zone_id,
             use_https,
@@ -239,6 +241,7 @@ impl Node {
         NodePersistence {
             node_id: self.id.0 as i64,
             scheduling_policy: self.scheduling.into(),
+            lifecycle: self.lifecycle.into(),
             listen_http_addr: self.listen_http_addr.clone(),
             listen_http_port: self.listen_http_port as i32,
             listen_https_port: self.listen_https_port.map(|x| x as i32),
@@ -263,6 +266,7 @@ impl Node {
             availability: NodeAvailability::Offline,
             scheduling: NodeSchedulingPolicy::from_str(&np.scheduling_policy)
                 .expect("Bad scheduling policy in DB"),
+            lifecycle: NodeLifecycle::from_str(&np.lifecycle).expect("Bad lifecycle in DB"),
             listen_http_addr: np.listen_http_addr,
             listen_http_port: np.listen_http_port as u16,
             listen_https_port: np.listen_https_port.map(|x| x as u16),
@@ -19,7 +19,7 @@ use futures::FutureExt;
 use futures::future::BoxFuture;
 use itertools::Itertools;
 use pageserver_api::controller_api::{
-    AvailabilityZone, MetadataHealthRecord, NodeSchedulingPolicy, PlacementPolicy,
+    AvailabilityZone, MetadataHealthRecord, NodeLifecycle, NodeSchedulingPolicy, PlacementPolicy,
     SafekeeperDescribeResponse, ShardSchedulingPolicy, SkSchedulingPolicy,
 };
 use pageserver_api::models::{ShardImportStatus, TenantConfig};
@@ -102,6 +102,7 @@ pub(crate) enum DatabaseOperation {
     UpdateNode,
     DeleteNode,
     ListNodes,
+    ListTombstones,
     BeginShardSplit,
     CompleteShardSplit,
     AbortShardSplit,
@@ -357,6 +358,8 @@ impl Persistence {
     }

     /// When a node is first registered, persist it before using it for anything
+    /// If the provided node_id already exists, it will be error.
+    /// The common case is when a node marked for deletion wants to register.
     pub(crate) async fn insert_node(&self, node: &Node) -> DatabaseResult<()> {
         let np = &node.to_persistent();
         self.with_measured_conn(DatabaseOperation::InsertNode, move |conn| {
@@ -373,19 +376,41 @@ impl Persistence {

     /// At startup, populate the list of nodes which our shards may be placed on
     pub(crate) async fn list_nodes(&self) -> DatabaseResult<Vec<NodePersistence>> {
-        let nodes: Vec<NodePersistence> = self
+        use crate::schema::nodes::dsl::*;
+
+        let result: Vec<NodePersistence> = self
             .with_measured_conn(DatabaseOperation::ListNodes, move |conn| {
                 Box::pin(async move {
                     Ok(crate::schema::nodes::table
+                        .filter(lifecycle.ne(String::from(NodeLifecycle::Deleted)))
                         .load::<NodePersistence>(conn)
                         .await?)
                 })
             })
             .await?;

-        tracing::info!("list_nodes: loaded {} nodes", nodes.len());
+        tracing::info!("list_nodes: loaded {} nodes", result.len());

-        Ok(nodes)
+        Ok(result)
     }

+    pub(crate) async fn list_tombstones(&self) -> DatabaseResult<Vec<NodePersistence>> {
+        use crate::schema::nodes::dsl::*;
+
+        let result: Vec<NodePersistence> = self
+            .with_measured_conn(DatabaseOperation::ListTombstones, move |conn| {
+                Box::pin(async move {
+                    Ok(crate::schema::nodes::table
+                        .filter(lifecycle.eq(String::from(NodeLifecycle::Deleted)))
+                        .load::<NodePersistence>(conn)
+                        .await?)
+                })
+            })
+            .await?;
+
+        tracing::info!("list_tombstones: loaded {} nodes", result.len());
+
+        Ok(result)
+    }
+
     pub(crate) async fn update_node<V>(
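The two queries above partition the `nodes` table on the `lifecycle` column: `list_nodes` filters with `ne(...)` and `list_tombstones` with `eq(...)` against the same string, so every row shows up in exactly one of the two lists. A small self-contained sketch of that invariant, using a plain `Vec` in place of Diesel and assuming `"deleted"` as the stored string:

```rust
// Self-contained illustration of the list_nodes / list_tombstones split.
// (node_id, lifecycle) pairs stand in for rows of the `nodes` table.
fn main() {
    let rows = vec![(1u64, "active"), (2, "deleted"), (3, "active")];

    // list_nodes: lifecycle != 'deleted'
    let live: Vec<u64> = rows
        .iter()
        .filter(|(_, lc)| *lc != "deleted")
        .map(|(id, _)| *id)
        .collect();

    // list_tombstones: lifecycle == 'deleted'
    let tombstoned: Vec<u64> = rows
        .iter()
        .filter(|(_, lc)| *lc == "deleted")
        .map(|(id, _)| *id)
        .collect();

    assert_eq!(live, vec![1, 3]);
    assert_eq!(tombstoned, vec![2]);
    // Together they cover every row exactly once.
    assert_eq!(live.len() + tombstoned.len(), rows.len());
}
```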
@@ -404,6 +429,7 @@ impl Persistence {
             Box::pin(async move {
                 let updated = diesel::update(nodes)
                     .filter(node_id.eq(input_node_id.0 as i64))
+                    .filter(lifecycle.ne(String::from(NodeLifecycle::Deleted)))
                     .set(values)
                     .execute(conn)
                     .await?;
@@ -447,6 +473,57 @@ impl Persistence {
         .await
     }

+    /// Tombstone is a special state where the node is not deleted from the database,
+    /// but it is not available for usage.
+    /// The main reason for it is to prevent the flaky node to register.
+    pub(crate) async fn set_tombstone(&self, del_node_id: NodeId) -> DatabaseResult<()> {
+        use crate::schema::nodes::dsl::*;
+        self.update_node(
+            del_node_id,
+            lifecycle.eq(String::from(NodeLifecycle::Deleted)),
+        )
+        .await
+    }
+
+    pub(crate) async fn delete_node(&self, del_node_id: NodeId) -> DatabaseResult<()> {
+        use crate::schema::nodes::dsl::*;
+        self.with_measured_conn(DatabaseOperation::DeleteNode, move |conn| {
+            Box::pin(async move {
+                // You can hard delete a node only if it has a tombstone.
+                // So we need to check if the node has lifecycle set to deleted.
+                let node_to_delete = nodes
+                    .filter(node_id.eq(del_node_id.0 as i64))
+                    .first::<NodePersistence>(conn)
+                    .await
+                    .optional()?;
+
+                if let Some(np) = node_to_delete {
+                    let lc = NodeLifecycle::from_str(&np.lifecycle).map_err(|e| {
+                        DatabaseError::Logical(format!(
+                            "Node {} has invalid lifecycle: {}",
+                            del_node_id, e
+                        ))
+                    })?;
+
+                    if lc != NodeLifecycle::Deleted {
+                        return Err(DatabaseError::Logical(format!(
+                            "Node {} was not soft deleted before, cannot hard delete it",
+                            del_node_id
+                        )));
+                    }
+
+                    diesel::delete(nodes)
+                        .filter(node_id.eq(del_node_id.0 as i64))
+                        .execute(conn)
+                        .await?;
+                }
+
+                Ok(())
+            })
+        })
+        .await
+    }
+
     /// At startup, load the high level state for shards, such as their config + policy. This will
     /// be enriched at runtime with state discovered on pageservers.
     ///
@@ -543,21 +620,6 @@ impl Persistence {
         .await
     }

-    pub(crate) async fn delete_node(&self, del_node_id: NodeId) -> DatabaseResult<()> {
-        use crate::schema::nodes::dsl::*;
-        self.with_measured_conn(DatabaseOperation::DeleteNode, move |conn| {
-            Box::pin(async move {
-                diesel::delete(nodes)
-                    .filter(node_id.eq(del_node_id.0 as i64))
-                    .execute(conn)
-                    .await?;
-
-                Ok(())
-            })
-        })
-        .await
-    }
-
     /// When a tenant invokes the /re-attach API, this function is responsible for doing an efficient
     /// batched increment of the generations of all tenants whose generation_pageserver is equal to
     /// the node that called /re-attach.
@@ -571,6 +633,20 @@ impl Persistence {
         let updated = self
             .with_measured_conn(DatabaseOperation::ReAttach, move |conn| {
                 Box::pin(async move {
+                    // Check if the node is not marked as deleted
+                    let deleted_node: i64 = nodes
+                        .filter(node_id.eq(input_node_id.0 as i64))
+                        .filter(lifecycle.eq(String::from(NodeLifecycle::Deleted)))
+                        .count()
+                        .get_result(conn)
+                        .await?;
+                    if deleted_node > 0 {
+                        return Err(DatabaseError::Logical(format!(
+                            "Node {} is marked as deleted, re-attach is not allowed",
+                            input_node_id
+                        )));
+                    }
+
                     let rows_updated = diesel::update(tenant_shards)
                         .filter(generation_pageserver.eq(input_node_id.0 as i64))
                         .set(generation.eq(generation + 1))
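Taken together, `set_tombstone`, the new `delete_node`, the `insert_node` uniqueness constraint, and the re-attach check above enforce a small lifecycle policy: soft-delete first, then hard-delete; a tombstoned node can neither re-register nor re-attach. Below is a self-contained, in-memory sketch of those rules — a `HashMap` stands in for the `nodes` table, and the method names, error strings, and simplified `set_tombstone` behaviour are illustrative only, not the controller's actual API.

```rust
// In-memory model of the tombstone rules, for illustration only.
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Lifecycle {
    Active,
    Deleted, // tombstoned
}

#[derive(Default)]
struct FakeNodesTable {
    rows: HashMap<u64, Lifecycle>,
}

impl FakeNodesTable {
    /// Registration fails while a row (including a tombstone) still exists.
    fn insert_node(&mut self, node_id: u64) -> Result<(), String> {
        if self.rows.contains_key(&node_id) {
            return Err(format!("Node {node_id} already exists (check tombstones)"));
        }
        self.rows.insert(node_id, Lifecycle::Active);
        Ok(())
    }

    /// Soft delete: keep the row, flip lifecycle to Deleted.
    fn set_tombstone(&mut self, node_id: u64) {
        self.rows.insert(node_id, Lifecycle::Deleted);
    }

    /// Hard delete: only allowed once the node has a tombstone.
    fn delete_node(&mut self, node_id: u64) -> Result<(), String> {
        match self.rows.get(&node_id) {
            Some(Lifecycle::Deleted) => {
                self.rows.remove(&node_id);
                Ok(())
            }
            Some(Lifecycle::Active) => {
                Err(format!("Node {node_id} was not soft deleted before"))
            }
            None => Ok(()), // nothing to do
        }
    }

    /// Re-attach is rejected for tombstoned nodes.
    fn re_attach_allowed(&self, node_id: u64) -> bool {
        self.rows.get(&node_id) != Some(&Lifecycle::Deleted)
    }
}

fn main() {
    let mut table = FakeNodesTable::default();
    table.insert_node(7).unwrap();

    table.set_tombstone(7);
    assert!(!table.re_attach_allowed(7)); // flaky node cannot re-attach
    assert!(table.insert_node(7).is_err()); // ...or re-register

    table.delete_node(7).unwrap(); // hard delete now permitted
    assert!(table.insert_node(7).is_ok()); // and a fresh registration works
}
```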
@@ -2048,6 +2124,7 @@ pub(crate) struct NodePersistence {
     pub(crate) listen_pg_port: i32,
     pub(crate) availability_zone_id: String,
     pub(crate) listen_https_port: Option<i32>,
+    pub(crate) lifecycle: String,
 }

 /// Tenant metadata health status that are stored durably.
@@ -33,6 +33,7 @@ diesel::table! {
         listen_pg_port -> Int4,
         availability_zone_id -> Varchar,
         listen_https_port -> Nullable<Int4>,
+        lifecycle -> Varchar,
     }
 }

@@ -166,6 +166,7 @@ enum NodeOperations {
     Register,
     Configure,
     Delete,
+    DeleteTombstone,
 }

 /// The leadership status for the storage controller process.
@@ -6909,7 +6910,7 @@ impl Service {
     /// detaching or deleting it on pageservers. We do not try and re-schedule any
     /// tenants that were on this node.
     pub(crate) async fn node_drop(&self, node_id: NodeId) -> Result<(), ApiError> {
-        self.persistence.delete_node(node_id).await?;
+        self.persistence.set_tombstone(node_id).await?;

         let mut locked = self.inner.write().unwrap();

@@ -7033,9 +7034,10 @@ impl Service {
         // That is safe because in Service::spawn we only use generation_pageserver if it refers to a node
         // that exists.

-        // 2. Actually delete the node from the database and from in-memory state
+        // 2. Actually delete the node from in-memory state and set tombstone to the database
+        // for preventing the node to register again.
         tracing::info!("Deleting node from database");
-        self.persistence.delete_node(node_id).await?;
+        self.persistence.set_tombstone(node_id).await?;

         Ok(())
     }
@@ -7054,6 +7056,35 @@ impl Service {
         Ok(nodes)
     }

+    pub(crate) async fn tombstone_list(&self) -> Result<Vec<Node>, ApiError> {
+        self.persistence
+            .list_tombstones()
+            .await?
+            .into_iter()
+            .map(|np| Node::from_persistent(np, false))
+            .collect::<Result<Vec<_>, _>>()
+            .map_err(ApiError::InternalServerError)
+    }
+
+    pub(crate) async fn tombstone_delete(&self, node_id: NodeId) -> Result<(), ApiError> {
+        let _node_lock = trace_exclusive_lock(
+            &self.node_op_locks,
+            node_id,
+            NodeOperations::DeleteTombstone,
+        )
+        .await;
+
+        if matches!(self.get_node(node_id).await, Err(ApiError::NotFound(_))) {
+            self.persistence.delete_node(node_id).await?;
+            Ok(())
+        } else {
+            Err(ApiError::Conflict(format!(
+                "Node {} is in use, consider using tombstone API first",
+                node_id
+            )))
+        }
+    }
+
     pub(crate) async fn get_node(&self, node_id: NodeId) -> Result<Node, ApiError> {
         self.inner
             .read()
@@ -7224,7 +7255,25 @@ impl Service {
         };

         match registration_status {
-            RegistrationStatus::New => self.persistence.insert_node(&new_node).await?,
+            RegistrationStatus::New => {
+                self.persistence.insert_node(&new_node).await.map_err(|e| {
+                    if matches!(
+                        e,
+                        crate::persistence::DatabaseError::Query(
+                            diesel::result::Error::DatabaseError(
+                                diesel::result::DatabaseErrorKind::UniqueViolation,
+                                _,
+                            )
+                        )
+                    ) {
+                        // The node can be deleted by tombstone API, and not show up in the list of nodes.
+                        // If you see this error, check tombstones first.
+                        ApiError::Conflict(format!("Node {} is already exists", new_node.get_id()))
+                    } else {
+                        ApiError::from(e)
+                    }
+                })?;
+            }
             RegistrationStatus::NeedUpdate => {
                 self.persistence
                     .update_node_on_registration(
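The `map_err` above narrows one specific failure mode: a unique-key violation on `node_id` during registration, which after this change usually means the node still has a tombstone row rather than being genuinely unknown, is reported as a 409 Conflict pointing the operator at the tombstone API instead of a generic 500. A self-contained sketch of that pattern with stand-in error types (the real code matches on `DatabaseError::Query(diesel::result::Error::DatabaseError(UniqueViolation, _))`; the types and message below are illustrative only):

```rust
// Stand-in error types; the real ones are crate::persistence::DatabaseError,
// diesel::result::Error and the controller's ApiError. Illustration only.
#[derive(Debug)]
enum DbError {
    UniqueViolation,
    Other(String),
}

#[derive(Debug, PartialEq)]
enum ApiError {
    Conflict(String),
    InternalServerError(String),
}

fn register_node(node_id: u64, insert_result: Result<(), DbError>) -> Result<(), ApiError> {
    insert_result.map_err(|e| match e {
        // A duplicate key here usually means the node still has a tombstone row.
        DbError::UniqueViolation => ApiError::Conflict(format!(
            "Node {node_id} already exists; check /debug/v1/tombstone first"
        )),
        DbError::Other(msg) => ApiError::InternalServerError(msg),
    })
}

fn main() {
    assert_eq!(
        register_node(42, Err(DbError::UniqueViolation)),
        Err(ApiError::Conflict(
            "Node 42 already exists; check /debug/v1/tombstone first".to_string()
        ))
    );
    assert!(register_node(43, Ok(())).is_ok());
}
```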