storcon: wait for the migration from the drained node in the draining loop (#12754)

## Problem
We have seen some errors in staging when the shard migration was
triggered by optimizations, and it was ongoing during draining the node
it was migrating from. It happens because the node draining loop only
waits for the migrations started by the drain loop itself. The ongoing
migrations are ignored.

Closes: https://databricks.atlassian.net/browse/LKB-1625

## Summary of changes
- Wait for the shard reconciliation during the drain if it is being
migrated from the drained node.
This commit is contained in:
Dmitrii Kovalkov
2025-07-29 15:58:31 +04:00
committed by GitHub
parent 568927a8a0
commit 58327cbba8
3 changed files with 62 additions and 25 deletions

View File

@@ -46,11 +46,31 @@ impl TenantShardDrain {
&self,
tenants: &BTreeMap<TenantShardId, TenantShard>,
scheduler: &Scheduler,
) -> Option<NodeId> {
let tenant_shard = tenants.get(&self.tenant_shard_id)?;
) -> TenantShardDrainAction {
let Some(tenant_shard) = tenants.get(&self.tenant_shard_id) else {
return TenantShardDrainAction::Skip;
};
if *tenant_shard.intent.get_attached() != Some(self.drained_node) {
return None;
// If the intent attached node is not the drained node, check the observed state
// of the shard on the drained node. If it is Attached*, it means the shard is
// beeing migrated from the drained node. The drain loop needs to wait for the
// reconciliation to complete for a smooth draining.
use pageserver_api::models::LocationConfigMode::*;
let attach_mode = tenant_shard
.observed
.locations
.get(&self.drained_node)
.and_then(|observed| observed.conf.as_ref().map(|conf| conf.mode));
return match (attach_mode, tenant_shard.intent.get_attached()) {
(Some(AttachedSingle | AttachedMulti | AttachedStale), Some(intent_node_id)) => {
TenantShardDrainAction::Reconcile(*intent_node_id)
}
_ => TenantShardDrainAction::Skip,
};
}
// Only tenants with a normal (Active) scheduling policy are proactively moved
@@ -63,19 +83,19 @@ impl TenantShardDrain {
}
ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
// If we have been asked to avoid rescheduling this shard, then do not migrate it during a drain
return None;
return TenantShardDrainAction::Skip;
}
}
match tenant_shard.preferred_secondary(scheduler) {
Some(node) => Some(node),
Some(node) => TenantShardDrainAction::RescheduleToSecondary(node),
None => {
tracing::warn!(
tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
"No eligible secondary while draining {}", self.drained_node
);
None
TenantShardDrainAction::Skip
}
}
}
@@ -138,3 +158,17 @@ impl TenantShardDrain {
}
}
}
/// Action to take when draining a tenant shard.
pub(crate) enum TenantShardDrainAction {
/// The tenant shard is on the draining node.
/// Reschedule the tenant shard to a secondary location.
/// Holds a destination node id to reschedule to.
RescheduleToSecondary(NodeId),
/// The tenant shard is beeing migrated from the draining node.
/// Wait for the reconciliation to complete.
/// Holds the intent attached node id.
Reconcile(NodeId),
/// The tenant shard is not eligible for drainining, skip it.
Skip,
}

View File

@@ -79,7 +79,7 @@ use crate::id_lock_map::{
use crate::leadership::Leadership;
use crate::metrics;
use crate::node::{AvailabilityTransition, Node};
use crate::operation_utils::{self, TenantShardDrain};
use crate::operation_utils::{self, TenantShardDrain, TenantShardDrainAction};
use crate::pageserver_client::PageserverClient;
use crate::peer_client::GlobalObservedState;
use crate::persistence::split_state::SplitState;
@@ -1274,7 +1274,7 @@ impl Service {
// Always attempt autosplits. Sharding is crucial for bulk ingest performance, so we
// must be responsive when new projects begin ingesting and reach the threshold.
self.autosplit_tenants().await;
}
},
_ = self.reconcilers_cancel.cancelled() => return
}
}
@@ -8876,6 +8876,9 @@ impl Service {
for (_tenant_id, schedule_context, shards) in
TenantShardExclusiveIterator::new(tenants, ScheduleMode::Speculative)
{
if work.len() >= MAX_OPTIMIZATIONS_PLAN_PER_PASS {
break;
}
for shard in shards {
if work.len() >= MAX_OPTIMIZATIONS_PLAN_PER_PASS {
break;
@@ -9640,16 +9643,16 @@ impl Service {
tenant_shard_id: tid,
};
let dest_node_id = {
let drain_action = {
let locked = self.inner.read().unwrap();
tid_drain.tenant_shard_eligible_for_drain(&locked.tenants, &locked.scheduler)
};
match tid_drain
.tenant_shard_eligible_for_drain(&locked.tenants, &locked.scheduler)
{
Some(node_id) => node_id,
None => {
continue;
}
let dest_node_id = match drain_action {
TenantShardDrainAction::RescheduleToSecondary(dest_node_id) => dest_node_id,
TenantShardDrainAction::Reconcile(intent_node_id) => intent_node_id,
TenantShardDrainAction::Skip => {
continue;
}
};
@@ -9684,14 +9687,16 @@ impl Service {
{
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, scheduler) = locked.parts_mut();
let rescheduled = tid_drain.reschedule_to_secondary(
dest_node_id,
tenants,
scheduler,
nodes,
)?;
if let Some(tenant_shard) = rescheduled {
let tenant_shard = match drain_action {
TenantShardDrainAction::RescheduleToSecondary(dest_node_id) => tid_drain
.reschedule_to_secondary(dest_node_id, tenants, scheduler, nodes)?,
TenantShardDrainAction::Reconcile(_) => tenants.get_mut(&tid),
// Note: Unreachable, handled above.
TenantShardDrainAction::Skip => None,
};
if let Some(tenant_shard) = tenant_shard {
let waiter = self.maybe_configured_reconcile_shard(
tenant_shard,
nodes,

View File

@@ -812,8 +812,6 @@ impl TenantShard {
/// if the swap is not possible and leaves the intent state in its original state.
///
/// Arguments:
/// `attached_to`: the currently attached location matching the intent state (may be None if the
/// shard is not attached)
/// `promote_to`: an optional secondary location of this tenant shard. If set to None, we ask
/// the scheduler to recommend a node
pub(crate) fn reschedule_to_secondary(