From 58327cbba8f39610f08413e4f20702b58d072d2a Mon Sep 17 00:00:00 2001 From: Dmitrii Kovalkov <34828390+DimasKovas@users.noreply.github.com> Date: Tue, 29 Jul 2025 15:58:31 +0400 Subject: [PATCH] storcon: wait for the migration from the drained node in the draining loop (#12754) ## Problem We have seen some errors in staging when the shard migration was triggered by optimizations, and it was ongoing during draining the node it was migrating from. It happens because the node draining loop only waits for the migrations started by the drain loop itself. The ongoing migrations are ignored. Closes: https://databricks.atlassian.net/browse/LKB-1625 ## Summary of changes - Wait for the shard reconciliation during the drain if it is being migrated from the drained node. --- storage_controller/src/operation_utils.rs | 46 ++++++++++++++++++++--- storage_controller/src/service.rs | 39 ++++++++++--------- storage_controller/src/tenant_shard.rs | 2 - 3 files changed, 62 insertions(+), 25 deletions(-) diff --git a/storage_controller/src/operation_utils.rs b/storage_controller/src/operation_utils.rs index af86010ab7..1060c92832 100644 --- a/storage_controller/src/operation_utils.rs +++ b/storage_controller/src/operation_utils.rs @@ -46,11 +46,31 @@ impl TenantShardDrain { &self, tenants: &BTreeMap, scheduler: &Scheduler, - ) -> Option { - let tenant_shard = tenants.get(&self.tenant_shard_id)?; + ) -> TenantShardDrainAction { + let Some(tenant_shard) = tenants.get(&self.tenant_shard_id) else { + return TenantShardDrainAction::Skip; + }; if *tenant_shard.intent.get_attached() != Some(self.drained_node) { - return None; + // If the intent attached node is not the drained node, check the observed state + // of the shard on the drained node. If it is Attached*, it means the shard is + // beeing migrated from the drained node. The drain loop needs to wait for the + // reconciliation to complete for a smooth draining. + + use pageserver_api::models::LocationConfigMode::*; + + let attach_mode = tenant_shard + .observed + .locations + .get(&self.drained_node) + .and_then(|observed| observed.conf.as_ref().map(|conf| conf.mode)); + + return match (attach_mode, tenant_shard.intent.get_attached()) { + (Some(AttachedSingle | AttachedMulti | AttachedStale), Some(intent_node_id)) => { + TenantShardDrainAction::Reconcile(*intent_node_id) + } + _ => TenantShardDrainAction::Skip, + }; } // Only tenants with a normal (Active) scheduling policy are proactively moved @@ -63,19 +83,19 @@ impl TenantShardDrain { } ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => { // If we have been asked to avoid rescheduling this shard, then do not migrate it during a drain - return None; + return TenantShardDrainAction::Skip; } } match tenant_shard.preferred_secondary(scheduler) { - Some(node) => Some(node), + Some(node) => TenantShardDrainAction::RescheduleToSecondary(node), None => { tracing::warn!( tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), "No eligible secondary while draining {}", self.drained_node ); - None + TenantShardDrainAction::Skip } } } @@ -138,3 +158,17 @@ impl TenantShardDrain { } } } + +/// Action to take when draining a tenant shard. +pub(crate) enum TenantShardDrainAction { + /// The tenant shard is on the draining node. + /// Reschedule the tenant shard to a secondary location. + /// Holds a destination node id to reschedule to. + RescheduleToSecondary(NodeId), + /// The tenant shard is beeing migrated from the draining node. + /// Wait for the reconciliation to complete. + /// Holds the intent attached node id. + Reconcile(NodeId), + /// The tenant shard is not eligible for drainining, skip it. + Skip, +} diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 4c011033af..37380b8fbe 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -79,7 +79,7 @@ use crate::id_lock_map::{ use crate::leadership::Leadership; use crate::metrics; use crate::node::{AvailabilityTransition, Node}; -use crate::operation_utils::{self, TenantShardDrain}; +use crate::operation_utils::{self, TenantShardDrain, TenantShardDrainAction}; use crate::pageserver_client::PageserverClient; use crate::peer_client::GlobalObservedState; use crate::persistence::split_state::SplitState; @@ -1274,7 +1274,7 @@ impl Service { // Always attempt autosplits. Sharding is crucial for bulk ingest performance, so we // must be responsive when new projects begin ingesting and reach the threshold. self.autosplit_tenants().await; - } + }, _ = self.reconcilers_cancel.cancelled() => return } } @@ -8876,6 +8876,9 @@ impl Service { for (_tenant_id, schedule_context, shards) in TenantShardExclusiveIterator::new(tenants, ScheduleMode::Speculative) { + if work.len() >= MAX_OPTIMIZATIONS_PLAN_PER_PASS { + break; + } for shard in shards { if work.len() >= MAX_OPTIMIZATIONS_PLAN_PER_PASS { break; @@ -9640,16 +9643,16 @@ impl Service { tenant_shard_id: tid, }; - let dest_node_id = { + let drain_action = { let locked = self.inner.read().unwrap(); + tid_drain.tenant_shard_eligible_for_drain(&locked.tenants, &locked.scheduler) + }; - match tid_drain - .tenant_shard_eligible_for_drain(&locked.tenants, &locked.scheduler) - { - Some(node_id) => node_id, - None => { - continue; - } + let dest_node_id = match drain_action { + TenantShardDrainAction::RescheduleToSecondary(dest_node_id) => dest_node_id, + TenantShardDrainAction::Reconcile(intent_node_id) => intent_node_id, + TenantShardDrainAction::Skip => { + continue; } }; @@ -9684,14 +9687,16 @@ impl Service { { let mut locked = self.inner.write().unwrap(); let (nodes, tenants, scheduler) = locked.parts_mut(); - let rescheduled = tid_drain.reschedule_to_secondary( - dest_node_id, - tenants, - scheduler, - nodes, - )?; - if let Some(tenant_shard) = rescheduled { + let tenant_shard = match drain_action { + TenantShardDrainAction::RescheduleToSecondary(dest_node_id) => tid_drain + .reschedule_to_secondary(dest_node_id, tenants, scheduler, nodes)?, + TenantShardDrainAction::Reconcile(_) => tenants.get_mut(&tid), + // Note: Unreachable, handled above. + TenantShardDrainAction::Skip => None, + }; + + if let Some(tenant_shard) = tenant_shard { let waiter = self.maybe_configured_reconcile_shard( tenant_shard, nodes, diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs index 3eb54d714d..bf16c642af 100644 --- a/storage_controller/src/tenant_shard.rs +++ b/storage_controller/src/tenant_shard.rs @@ -812,8 +812,6 @@ impl TenantShard { /// if the swap is not possible and leaves the intent state in its original state. /// /// Arguments: - /// `attached_to`: the currently attached location matching the intent state (may be None if the - /// shard is not attached) /// `promote_to`: an optional secondary location of this tenant shard. If set to None, we ask /// the scheduler to recommend a node pub(crate) fn reschedule_to_secondary(