mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 10:22:56 +00:00
storcon: reset transient node policies on re-attach
This commit is contained in:
@@ -442,13 +442,15 @@ impl Persistence {
|
||||
#[tracing::instrument(skip_all, fields(node_id))]
|
||||
pub(crate) async fn re_attach(
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
input_node_id: NodeId,
|
||||
) -> DatabaseResult<HashMap<TenantShardId, Generation>> {
|
||||
use crate::schema::nodes::dsl::scheduling_policy;
|
||||
use crate::schema::nodes::dsl::*;
|
||||
use crate::schema::tenant_shards::dsl::*;
|
||||
let updated = self
|
||||
.with_measured_conn(DatabaseOperation::ReAttach, move |conn| {
|
||||
let rows_updated = diesel::update(tenant_shards)
|
||||
.filter(generation_pageserver.eq(node_id.0 as i64))
|
||||
.filter(generation_pageserver.eq(input_node_id.0 as i64))
|
||||
.set(generation.eq(generation + 1))
|
||||
.execute(conn)?;
|
||||
|
||||
@@ -457,9 +459,23 @@ impl Persistence {
|
||||
// TODO: UPDATE+SELECT in one query
|
||||
|
||||
let updated = tenant_shards
|
||||
.filter(generation_pageserver.eq(node_id.0 as i64))
|
||||
.filter(generation_pageserver.eq(input_node_id.0 as i64))
|
||||
.select(TenantShardPersistence::as_select())
|
||||
.load(conn)?;
|
||||
|
||||
// If the node went through a drain and restart phase before re-attaching,
|
||||
// then reset it's node scheduling policy to active.
|
||||
diesel::update(nodes)
|
||||
.filter(node_id.eq(input_node_id.0 as i64))
|
||||
.filter(
|
||||
scheduling_policy
|
||||
.eq(String::from(NodeSchedulingPolicy::PauseForRestart))
|
||||
.or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Draining)))
|
||||
.or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Filling))),
|
||||
)
|
||||
.set(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Active)))
|
||||
.execute(conn)?;
|
||||
|
||||
Ok(updated)
|
||||
})
|
||||
.await?;
|
||||
|
||||
@@ -1600,15 +1600,32 @@ impl Service {
|
||||
// Setting a node active unblocks any Reconcilers that might write to the location config API,
|
||||
// but those requests will not be accepted by the node until it has finished processing
|
||||
// the re-attach response.
|
||||
//
|
||||
// Additionally, reset the nodes scheduling policy to match the conditional update done
|
||||
// in [`Persistence::re_attach`].
|
||||
if let Some(node) = nodes.get(&reattach_req.node_id) {
|
||||
if !node.is_available() {
|
||||
let reset_scheduling = matches!(
|
||||
node.get_scheduling(),
|
||||
NodeSchedulingPolicy::PauseForRestart
|
||||
| NodeSchedulingPolicy::Draining
|
||||
| NodeSchedulingPolicy::Filling
|
||||
);
|
||||
|
||||
if !node.is_available() || reset_scheduling {
|
||||
let mut new_nodes = (**nodes).clone();
|
||||
if let Some(node) = new_nodes.get_mut(&reattach_req.node_id) {
|
||||
node.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
|
||||
if !node.is_available() {
|
||||
node.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
|
||||
}
|
||||
|
||||
if reset_scheduling {
|
||||
node.set_scheduling(NodeSchedulingPolicy::Active);
|
||||
}
|
||||
|
||||
scheduler.node_upsert(node);
|
||||
let new_nodes = Arc::new(new_nodes);
|
||||
*nodes = new_nodes;
|
||||
}
|
||||
let new_nodes = Arc::new(new_nodes);
|
||||
*nodes = new_nodes;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user