From 87e6117dfda0ea49f5b30fed190ad7ad82949bc2 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 7 Mar 2025 17:02:38 +0000 Subject: [PATCH] storage controller: API-driven graceful migrations (#10913) ## Problem The current migration API does a live migration, but if the destination doesn't already have a secondary, that live migration is unlikely to be able to warm up a tenant properly within its timeout (full warmup of a big tenant can take tens of minutes). Background optimisation code knows how to do this gracefully by creating a secondary first, but we don't currently give a human a way to trigger that. Closes: https://github.com/neondatabase/neon/issues/10540 ## Summary of changes - Add `prefererred_node` parameter to TenantShard, which is respected by optimize_attachment - Modify migration API to have optional prewarm=true mode, in which we set preferred_node and call optimize_attachment, rather than directly modifying intentstate - Require override_scheduler=true flag if migrating somewhere that is a less-than-optimal scheduling location (e.g. wrong AZ) - Add `origin_node_id` to migration API so that callers can ensure they're moving from where they think they're moving from - Add tests for the above The storcon_cli wrapper for this has a 'watch' mode that waits for eventual cutover. This doesn't show the warmth of the secondary evolve because we don't currently have an API for that in the controller, as the passthrough API only targets attached locations, not secondaries. It would be straightforward to add later as a dedicated endpoint for getting secondary status, then extend the storcon_cli to consume that and print a nice progress indicator. --- control_plane/src/storage_controller.rs | 44 +-- control_plane/storcon_cli/src/main.rs | 126 ++++++++- libs/pageserver_api/src/controller_api.rs | 89 +++++- storage_controller/src/service.rs | 255 ++++++++++++++---- storage_controller/src/tenant_shard.rs | 151 ++++++++++- test_runner/fixtures/neon_fixtures.py | 23 +- .../test_storage_controller_scale.py | 6 +- .../regress/test_pageserver_secondary.py | 2 +- .../regress/test_storage_controller.py | 131 ++++++++- 9 files changed, 707 insertions(+), 120 deletions(-) diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs index 16e12f4e02..3604e4a241 100644 --- a/control_plane/src/storage_controller.rs +++ b/control_plane/src/storage_controller.rs @@ -12,13 +12,10 @@ use hyper0::Uri; use nix::unistd::Pid; use pageserver_api::controller_api::{ NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest, - TenantCreateResponse, TenantLocateResponse, TenantShardMigrateRequest, - TenantShardMigrateResponse, + TenantCreateResponse, TenantLocateResponse, }; -use pageserver_api::models::{ - TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo, -}; -use pageserver_api::shard::{ShardStripeSize, TenantShardId}; +use pageserver_api::models::{TimelineCreateRequest, TimelineInfo}; +use pageserver_api::shard::TenantShardId; use pageserver_client::mgmt_api::ResponseErrorMessageExt; use postgres_backend::AuthType; use reqwest::Method; @@ -825,41 +822,6 @@ impl StorageController { .await } - #[instrument(skip(self))] - pub async fn tenant_migrate( - &self, - tenant_shard_id: TenantShardId, - node_id: NodeId, - ) -> anyhow::Result { - self.dispatch( - Method::PUT, - format!("control/v1/tenant/{tenant_shard_id}/migrate"), - Some(TenantShardMigrateRequest { - node_id, - migration_config: None, - }), - ) - .await - } - - #[instrument(skip(self), fields(%tenant_id, %new_shard_count))] - pub async fn tenant_split( - &self, - tenant_id: TenantId, - new_shard_count: u8, - new_stripe_size: Option, - ) -> anyhow::Result { - self.dispatch( - Method::PUT, - format!("control/v1/tenant/{tenant_id}/shard_split"), - Some(TenantShardSplitRequest { - new_shard_count, - new_stripe_size, - }), - ) - .await - } - #[instrument(skip_all, fields(node_id=%req.node_id))] pub async fn node_register(&self, req: NodeRegisterRequest) -> anyhow::Result<()> { self.dispatch::<_, ()>(Method::POST, "control/v1/node".to_string(), Some(req)) diff --git a/control_plane/storcon_cli/src/main.rs b/control_plane/storcon_cli/src/main.rs index 2e2c22c791..c3f157a9cc 100644 --- a/control_plane/storcon_cli/src/main.rs +++ b/control_plane/storcon_cli/src/main.rs @@ -5,12 +5,12 @@ use std::time::Duration; use clap::{Parser, Subcommand}; use futures::StreamExt; use pageserver_api::controller_api::{ - AvailabilityZone, NodeAvailabilityWrapper, NodeConfigureRequest, NodeDescribeResponse, - NodeRegisterRequest, NodeSchedulingPolicy, NodeShardResponse, PlacementPolicy, - SafekeeperDescribeResponse, SafekeeperSchedulingPolicyRequest, ShardSchedulingPolicy, - ShardsPreferredAzsRequest, ShardsPreferredAzsResponse, SkSchedulingPolicy, TenantCreateRequest, - TenantDescribeResponse, TenantPolicyRequest, TenantShardMigrateRequest, - TenantShardMigrateResponse, + AvailabilityZone, MigrationConfig, NodeAvailabilityWrapper, NodeConfigureRequest, + NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy, NodeShardResponse, + PlacementPolicy, SafekeeperDescribeResponse, SafekeeperSchedulingPolicyRequest, + ShardSchedulingPolicy, ShardsPreferredAzsRequest, ShardsPreferredAzsResponse, + SkSchedulingPolicy, TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest, + TenantShardMigrateRequest, TenantShardMigrateResponse, }; use pageserver_api::models::{ EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary, ShardParameters, @@ -112,6 +112,15 @@ enum Command { tenant_shard_id: TenantShardId, #[arg(long)] node: NodeId, + #[arg(long, default_value_t = true, action = clap::ArgAction::Set)] + prewarm: bool, + #[arg(long, default_value_t = false, action = clap::ArgAction::Set)] + override_scheduler: bool, + }, + /// Watch the location of a tenant shard evolve, e.g. while expecting it to migrate + TenantShardWatch { + #[arg(long)] + tenant_shard_id: TenantShardId, }, /// Migrate the secondary location for a tenant shard to a specific pageserver. TenantShardMigrateSecondary { @@ -619,19 +628,43 @@ async fn main() -> anyhow::Result<()> { Command::TenantShardMigrate { tenant_shard_id, node, + prewarm, + override_scheduler, } => { - let req = TenantShardMigrateRequest { - node_id: node, - migration_config: None, + let migration_config = MigrationConfig { + prewarm, + override_scheduler, + ..Default::default() }; - storcon_client + let req = TenantShardMigrateRequest { + node_id: node, + origin_node_id: None, + migration_config, + }; + + match storcon_client .dispatch::( Method::PUT, format!("control/v1/tenant/{tenant_shard_id}/migrate"), Some(req), ) - .await?; + .await + { + Err(mgmt_api::Error::ApiError(StatusCode::PRECONDITION_FAILED, msg)) => { + anyhow::bail!( + "Migration to {node} rejected, may require `--force` ({}) ", + msg + ); + } + Err(e) => return Err(e.into()), + Ok(_) => {} + } + + watch_tenant_shard(storcon_client, tenant_shard_id, Some(node)).await?; + } + Command::TenantShardWatch { tenant_shard_id } => { + watch_tenant_shard(storcon_client, tenant_shard_id, None).await?; } Command::TenantShardMigrateSecondary { tenant_shard_id, @@ -639,7 +672,8 @@ async fn main() -> anyhow::Result<()> { } => { let req = TenantShardMigrateRequest { node_id: node, - migration_config: None, + origin_node_id: None, + migration_config: MigrationConfig::default(), }; storcon_client @@ -1105,7 +1139,8 @@ async fn main() -> anyhow::Result<()> { format!("control/v1/tenant/{}/migrate", mv.tenant_shard_id), Some(TenantShardMigrateRequest { node_id: mv.to, - migration_config: None, + origin_node_id: Some(mv.from), + migration_config: MigrationConfig::default(), }), ) .await @@ -1284,3 +1319,68 @@ async fn main() -> anyhow::Result<()> { Ok(()) } + +static WATCH_INTERVAL: Duration = Duration::from_secs(5); + +async fn watch_tenant_shard( + storcon_client: Client, + tenant_shard_id: TenantShardId, + until_migrated_to: Option, +) -> anyhow::Result<()> { + if let Some(until_migrated_to) = until_migrated_to { + println!( + "Waiting for tenant shard {} to be migrated to node {}", + tenant_shard_id, until_migrated_to + ); + } + + loop { + let desc = storcon_client + .dispatch::<(), TenantDescribeResponse>( + Method::GET, + format!("control/v1/tenant/{}", tenant_shard_id.tenant_id), + None, + ) + .await?; + + // Output the current state of the tenant shard + let shard = desc + .shards + .iter() + .find(|s| s.tenant_shard_id == tenant_shard_id) + .ok_or(anyhow::anyhow!("Tenant shard not found"))?; + let summary = format!( + "attached: {} secondary: {} {}", + shard + .node_attached + .map(|n| format!("{}", n)) + .unwrap_or("none".to_string()), + shard + .node_secondary + .iter() + .map(|n| n.to_string()) + .collect::>() + .join(","), + if shard.is_reconciling { + "(reconciler active)" + } else { + "(reconciler idle)" + } + ); + println!("{}", summary); + + // Maybe drop out if we finished migration + if let Some(until_migrated_to) = until_migrated_to { + if shard.node_attached == Some(until_migrated_to) && !shard.is_reconciling { + println!( + "Tenant shard {} is now on node {}", + tenant_shard_id, until_migrated_to + ); + break; + } + } + + tokio::time::sleep(WATCH_INTERVAL).await; + } + Ok(()) +} diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index 2cfe1a85f9..154ab849dd 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -182,20 +182,66 @@ pub struct TenantDescribeResponseShard { #[derive(Serialize, Deserialize, Debug)] pub struct TenantShardMigrateRequest { pub node_id: NodeId, + + /// Optionally, callers may specify the node they are migrating _from_, and the server will + /// reject the request if the shard is no longer attached there: this enables writing safer + /// clients that don't risk fighting with some other movement of the shard. #[serde(default)] - pub migration_config: Option, + pub origin_node_id: Option, + + #[serde(default)] + pub migration_config: MigrationConfig, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] pub struct MigrationConfig { + /// If true, the migration will be executed even if it is to a location with a sub-optimal scheduling + /// score: this is usually not what you want, and if you use this then you'll also need to set the + /// tenant's scheduling policy to Essential or Pause to avoid the optimiser reverting your migration. + /// + /// Default: false + #[serde(default)] + pub override_scheduler: bool, + + /// If true, the migration will be done gracefully by creating a secondary location first and + /// waiting for it to warm up before cutting over. If false, if there is no existing secondary + /// location at the destination, the tenant will be migrated immediately. If the tenant's data + /// can't be downloaded within [`Self::secondary_warmup_timeout`], then the migration will go + /// ahead but run with a cold cache that can severely reduce performance until it warms up. + /// + /// When doing a graceful migration, the migration API returns as soon as it is started. + /// + /// Default: true + #[serde(default = "default_prewarm")] + pub prewarm: bool, + + /// For non-prewarm migrations which will immediately enter a cutover to the new node: how long to wait + /// overall for secondary warmup before cutting over #[serde(default)] #[serde(with = "humantime_serde")] pub secondary_warmup_timeout: Option, + /// For non-prewarm migrations which will immediately enter a cutover to the new node: how long to wait + /// within each secondary download poll call to pageserver. #[serde(default)] #[serde(with = "humantime_serde")] pub secondary_download_request_timeout: Option, } +fn default_prewarm() -> bool { + true +} + +impl Default for MigrationConfig { + fn default() -> Self { + Self { + override_scheduler: false, + prewarm: default_prewarm(), + secondary_warmup_timeout: None, + secondary_download_request_timeout: None, + } + } +} + #[derive(Serialize, Clone, Debug)] #[serde(into = "NodeAvailabilityWrapper")] pub enum NodeAvailability { @@ -487,4 +533,43 @@ mod test { err ); } + + /// Check that a minimal migrate request with no config results in the expected default settings + #[test] + fn test_migrate_request_decode_defaults() { + let json = r#"{ + "node_id": 123 + }"#; + + let request: TenantShardMigrateRequest = serde_json::from_str(json).unwrap(); + assert_eq!(request.node_id, NodeId(123)); + assert_eq!(request.origin_node_id, None); + assert!(!request.migration_config.override_scheduler); + assert!(request.migration_config.prewarm); + assert_eq!(request.migration_config.secondary_warmup_timeout, None); + assert_eq!( + request.migration_config.secondary_download_request_timeout, + None + ); + } + + /// Check that a partially specified migration config results in the expected default settings + #[test] + fn test_migration_config_decode_defaults() { + // Specify just one field of the config + let json = r#"{ + }"#; + + let config: MigrationConfig = serde_json::from_str(json).unwrap(); + + // Check each field's expected default value + assert!(!config.override_scheduler); + assert!(config.prewarm); + assert_eq!(config.secondary_warmup_timeout, None); + assert_eq!(config.secondary_download_request_timeout, None); + assert_eq!(config.secondary_warmup_timeout, None); + + // Consistency check that the Default impl agrees with our serde defaults + assert_eq!(MigrationConfig::default(), config); + } } diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index caa2040ce2..6795abf6e9 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -85,7 +85,9 @@ use crate::reconciler::{ attached_location_conf, }; use crate::safekeeper::Safekeeper; -use crate::scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode, Scheduler}; +use crate::scheduler::{ + AttachedShardTag, MaySchedule, ScheduleContext, ScheduleError, ScheduleMode, Scheduler, +}; use crate::tenant_shard::{ IntentState, MigrateAttachment, ObservedState, ObservedStateDelta, ObservedStateLocation, ReconcileNeeded, ReconcileResult, ReconcileWaitError, ReconcilerStatus, ReconcilerWaiter, @@ -5299,12 +5301,93 @@ impl Service { Ok((response, waiters)) } + /// A graceful migration: update the preferred node and let optimisation handle the migration + /// in the background (may take a long time as it will fully warm up a location before cutting over) + /// + /// Our external API calls this a 'prewarm=true' migration, but internally it isn't a special prewarm step: it's + /// just a migration that uses the same graceful procedure as our background scheduling optimisations would use. + fn tenant_shard_migrate_with_prewarm( + &self, + migrate_req: &TenantShardMigrateRequest, + shard: &mut TenantShard, + scheduler: &mut Scheduler, + schedule_context: ScheduleContext, + ) -> Result, ApiError> { + shard.set_preferred_node(Some(migrate_req.node_id)); + + // Generate whatever the initial change to the intent is: this could be creation of a secondary, or + // cutting over to an existing secondary. Caller is responsible for validating this before applying it, + // e.g. by checking secondary is warm enough. + Ok(shard.optimize_attachment(scheduler, &schedule_context)) + } + + /// Immediate migration: directly update the intent state and kick off a reconciler + fn tenant_shard_migrate_immediate( + &self, + migrate_req: &TenantShardMigrateRequest, + nodes: &Arc>, + shard: &mut TenantShard, + scheduler: &mut Scheduler, + ) -> Result, ApiError> { + // Non-graceful migration: update the intent state immediately + let old_attached = *shard.intent.get_attached(); + match shard.policy { + PlacementPolicy::Attached(n) => { + // If our new attached node was a secondary, it no longer should be. + shard + .intent + .remove_secondary(scheduler, migrate_req.node_id); + + shard + .intent + .set_attached(scheduler, Some(migrate_req.node_id)); + + // If we were already attached to something, demote that to a secondary + if let Some(old_attached) = old_attached { + if n > 0 { + // Remove other secondaries to make room for the location we'll demote + while shard.intent.get_secondary().len() >= n { + shard.intent.pop_secondary(scheduler); + } + + shard.intent.push_secondary(scheduler, old_attached); + } + } + } + PlacementPolicy::Secondary => { + shard.intent.clear(scheduler); + shard.intent.push_secondary(scheduler, migrate_req.node_id); + } + PlacementPolicy::Detached => { + return Err(ApiError::BadRequest(anyhow::anyhow!( + "Cannot migrate a tenant that is PlacementPolicy::Detached: configure it to an attached policy first" + ))); + } + } + + tracing::info!("Migrating: new intent {:?}", shard.intent); + shard.sequence = shard.sequence.next(); + shard.set_preferred_node(None); // Abort any in-flight graceful migration + Ok(self.maybe_configured_reconcile_shard( + shard, + nodes, + (&migrate_req.migration_config).into(), + )) + } + pub(crate) async fn tenant_shard_migrate( &self, tenant_shard_id: TenantShardId, migrate_req: TenantShardMigrateRequest, ) -> Result { - let waiter = { + // Depending on whether the migration is a change and whether it's graceful or immediate, we might + // get a different outcome to handle + enum MigrationOutcome { + Optimization(Option), + Reconcile(Option), + } + + let outcome = { let mut locked = self.inner.write().unwrap(); let (nodes, tenants, scheduler) = locked.parts_mut(); @@ -5315,71 +5398,139 @@ impl Service { ))); }; + // Migration to unavavailable node requires force flag if !node.is_available() { - // Warn but proceed: the caller may intend to manually adjust the placement of - // a shard even if the node is down, e.g. if intervening during an incident. - tracing::warn!("Migrating to unavailable node {node}"); + if migrate_req.migration_config.override_scheduler { + // Warn but proceed: the caller may intend to manually adjust the placement of + // a shard even if the node is down, e.g. if intervening during an incident. + tracing::warn!("Forcibly migrating to unavailable node {node}"); + } else { + tracing::warn!("Node {node} is unavailable, refusing migration"); + return Err(ApiError::PreconditionFailed( + format!("Node {node} is unavailable").into_boxed_str(), + )); + } } + // Calculate the ScheduleContext for this tenant + let mut schedule_context = ScheduleContext::default(); + for (_shard_id, shard) in + tenants.range(TenantShardId::tenant_range(tenant_shard_id.tenant_id)) + { + schedule_context.avoid(&shard.intent.all_pageservers()); + } + + // Look up the specific shard we will migrate let Some(shard) = tenants.get_mut(&tenant_shard_id) else { return Err(ApiError::NotFound( anyhow::anyhow!("Tenant shard not found").into(), )); }; + // Migration to a node with unfavorable scheduling score requires a force flag, because it might just + // be migrated back by the optimiser. + if let Some(better_node) = shard.find_better_location::( + scheduler, + &schedule_context, + migrate_req.node_id, + &[], + ) { + if !migrate_req.migration_config.override_scheduler { + return Err(ApiError::PreconditionFailed( + "Migration to a worse-scoring node".into(), + )); + } else { + tracing::info!( + "Migrating to a worse-scoring node {} (optimiser would prefer {better_node})", + migrate_req.node_id + ); + } + } + + if let Some(origin_node_id) = migrate_req.origin_node_id { + if shard.intent.get_attached() != &Some(origin_node_id) { + return Err(ApiError::PreconditionFailed( + format!( + "Migration expected to originate from {} but shard is on {:?}", + origin_node_id, + shard.intent.get_attached() + ) + .into(), + )); + } + } + if shard.intent.get_attached() == &Some(migrate_req.node_id) { // No-op case: we will still proceed to wait for reconciliation in case it is // incomplete from an earlier update to the intent. tracing::info!("Migrating: intent is unchanged {:?}", shard.intent); + + // An instruction to migrate to the currently attached node should + // cancel any pending graceful migration + shard.set_preferred_node(None); + + MigrationOutcome::Reconcile(self.maybe_configured_reconcile_shard( + shard, + nodes, + (&migrate_req.migration_config).into(), + )) + } else if migrate_req.migration_config.prewarm { + MigrationOutcome::Optimization(self.tenant_shard_migrate_with_prewarm( + &migrate_req, + shard, + scheduler, + schedule_context, + )?) } else { - let old_attached = *shard.intent.get_attached(); - - match shard.policy { - PlacementPolicy::Attached(n) => { - // If our new attached node was a secondary, it no longer should be. - shard - .intent - .remove_secondary(scheduler, migrate_req.node_id); - - shard - .intent - .set_attached(scheduler, Some(migrate_req.node_id)); - - // If we were already attached to something, demote that to a secondary - if let Some(old_attached) = old_attached { - if n > 0 { - // Remove other secondaries to make room for the location we'll demote - while shard.intent.get_secondary().len() >= n { - shard.intent.pop_secondary(scheduler); - } - - shard.intent.push_secondary(scheduler, old_attached); - } - } - } - PlacementPolicy::Secondary => { - shard.intent.clear(scheduler); - shard.intent.push_secondary(scheduler, migrate_req.node_id); - } - PlacementPolicy::Detached => { - return Err(ApiError::BadRequest(anyhow::anyhow!( - "Cannot migrate a tenant that is PlacementPolicy::Detached: configure it to an attached policy first" - ))); - } - } - - tracing::info!("Migrating: new intent {:?}", shard.intent); - shard.sequence = shard.sequence.next(); + MigrationOutcome::Reconcile(self.tenant_shard_migrate_immediate( + &migrate_req, + nodes, + shard, + scheduler, + )?) } - - let reconciler_config = match migrate_req.migration_config { - Some(cfg) => (&cfg).into(), - None => ReconcilerConfig::new(ReconcilerPriority::High), - }; - - self.maybe_configured_reconcile_shard(shard, nodes, reconciler_config) }; + // We may need to validate + apply an optimisation, or we may need to just retrive a reconcile waiter + let waiter = match outcome { + MigrationOutcome::Optimization(Some(optimization)) => { + // Validate and apply the optimization -- this would happen anyway in background reconcile loop, but + // we might as well do it more promptly as this is a direct external request. + let mut validated = self + .optimize_all_validate(vec![(tenant_shard_id, optimization)]) + .await; + if let Some((_shard_id, optimization)) = validated.pop() { + let mut locked = self.inner.write().unwrap(); + let (nodes, tenants, scheduler) = locked.parts_mut(); + let Some(shard) = tenants.get_mut(&tenant_shard_id) else { + // Rare but possible: tenant is removed between generating optimisation and validating it. + return Err(ApiError::NotFound( + anyhow::anyhow!("Tenant shard not found").into(), + )); + }; + + if !shard.apply_optimization(scheduler, optimization) { + // This can happen but is unusual enough to warn on: something else changed in the shard that made the optimisation stale + // and therefore not applied. + tracing::warn!( + "Schedule optimisation generated during graceful migration was not applied, shard changed?" + ); + } + self.maybe_configured_reconcile_shard( + shard, + nodes, + (&migrate_req.migration_config).into(), + ) + } else { + None + } + } + MigrationOutcome::Optimization(None) => None, + MigrationOutcome::Reconcile(waiter) => waiter, + }; + + // Finally, wait for any reconcile we started to complete. In the case of immediate-mode migrations to cold + // locations, this has a good chance of timing out. if let Some(waiter) = waiter { waiter.wait_timeout(RECONCILE_TIMEOUT).await?; } else { @@ -6959,6 +7110,10 @@ impl Service { ShardSchedulingPolicy::Active => { // Ok to do optimization } + ShardSchedulingPolicy::Essential if shard.get_preferred_node().is_some() => { + // Ok to do optimization: we are executing a graceful migration that + // has set preferred_node + } ShardSchedulingPolicy::Essential | ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => { diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs index 27e478043e..96ff70a951 100644 --- a/storage_controller/src/tenant_shard.rs +++ b/storage_controller/src/tenant_shard.rs @@ -132,6 +132,10 @@ pub(crate) struct TenantShard { /// of state that we publish externally in an eventually consistent way. pub(crate) pending_compute_notification: bool, + /// To do a graceful migration, set this field to the destination pageserver, and optimization + /// functions will consider this node the best location and react appropriately. + preferred_node: Option, + // Support/debug tool: if something is going wrong or flapping with scheduling, this may // be set to a non-active state to avoid making changes while the issue is fixed. scheduling_policy: ShardSchedulingPolicy, @@ -555,6 +559,7 @@ impl TenantShard { last_error: Arc::default(), pending_compute_notification: false, scheduling_policy: ShardSchedulingPolicy::default(), + preferred_node: None, } } @@ -809,6 +814,15 @@ impl TenantShard { return None; }; + // If the candidate is our preferred node, then it is better than the current location, as long + // as it is online -- the online check is part of the score calculation we did above, so it's + // important that this check comes after that one. + if let Some(preferred) = self.preferred_node.as_ref() { + if preferred == &candidate { + return Some(true); + } + } + match scheduler.compute_node_score::( current, &self.intent.preferred_az_id, @@ -847,13 +861,22 @@ impl TenantShard { } } - fn find_better_location( + pub(crate) fn find_better_location( &self, scheduler: &mut Scheduler, schedule_context: &ScheduleContext, current: NodeId, hard_exclude: &[NodeId], ) -> Option { + // If we have a migration hint, then that is our better location + if let Some(hint) = self.preferred_node.as_ref() { + if hint == ¤t { + return None; + } + + return Some(*hint); + } + // Look for a lower-scoring location to attach to let Ok(candidate_node) = scheduler.schedule_shard::( hard_exclude, @@ -887,6 +910,13 @@ impl TenantShard { scheduler: &mut Scheduler, schedule_context: &ScheduleContext, ) -> bool { + // Tenant with preferred node: check if it is not already at the preferred node + if let Some(preferred) = self.preferred_node.as_ref() { + if Some(preferred) != self.intent.get_attached().as_ref() { + return true; + } + } + // Sharded tenant: check if any locations have a nonzero affinity score if self.shard.count >= ShardCount(1) { let schedule_context = schedule_context.project_detach(self); @@ -927,6 +957,9 @@ impl TenantShard { /// Optimize attachments: if a shard has a secondary location that is preferable to /// its primary location based on soft constraints, switch that secondary location /// to be attached. + /// + /// `schedule_context` should have been populated with all shards in the tenant, including + /// the one we're trying to optimize (this function will subtract its own contribution before making scoring decisions) #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))] pub(crate) fn optimize_attachment( &self, @@ -1055,7 +1088,8 @@ impl TenantShard { // // This should be a transient state, there should always be capacity eventually in our preferred AZ (even if nodes // there are too overloaded for scheduler to suggest them, more should be provisioned eventually). - if self.intent.preferred_az_id.is_some() + if self.preferred_node.is_none() + && self.intent.preferred_az_id.is_some() && scheduler.get_node_az(&replacement) != self.intent.preferred_az_id { tracing::debug!( @@ -1161,6 +1195,27 @@ impl TenantShard { None } + /// Start or abort a graceful migration of this shard to another pageserver. This works on top of the + /// other optimisation functions, to bias them to move to the destination node. + pub(crate) fn set_preferred_node(&mut self, node: Option) { + if let Some(hint) = self.preferred_node.as_ref() { + if Some(hint) != node.as_ref() { + // This is legal but a bit surprising: we expect that administrators wouldn't usually + // change their mind about where to migrate something. + tracing::warn!( + "Changing migration destination from {hint} to {node:?} (current intent {:?})", + self.intent + ); + } + } + + self.preferred_node = node; + } + + pub(crate) fn get_preferred_node(&self) -> Option { + self.preferred_node + } + /// Return true if the optimization was really applied: it will not be applied if the optimization's /// sequence is behind this tenant shard's pub(crate) fn apply_optimization( @@ -1185,6 +1240,14 @@ impl TenantShard { self.intent.demote_attached(scheduler, old_attached_node_id); self.intent .promote_attached(scheduler, new_attached_node_id); + + if let Some(hint) = self.preferred_node.as_ref() { + if hint == &new_attached_node_id { + // The migration target is not a long term pin: once we are done with the migration, clear it. + tracing::info!("Graceful migration to {hint} complete"); + self.preferred_node = None; + } + } } ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary { old_node_id, @@ -1703,6 +1766,10 @@ impl TenantShard { debug_assert!(!self.intent.all_pageservers().contains(&node_id)); + if self.preferred_node == Some(node_id) { + self.preferred_node = None; + } + intent_modified } @@ -1750,6 +1817,7 @@ impl TenantShard { pending_compute_notification: false, delayed_reconcile: false, scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(), + preferred_node: None, }) } @@ -2270,6 +2338,85 @@ pub(crate) mod tests { Ok(()) } + #[test] + /// How the optimisation code handles a shard with a preferred node set; this is an example + /// of the multi-step migration, but driven by a different input. + fn optimize_attachment_multi_preferred_node() -> anyhow::Result<()> { + let nodes = make_test_nodes( + 4, + &[ + AvailabilityZone("az-a".to_string()), + AvailabilityZone("az-a".to_string()), + AvailabilityZone("az-b".to_string()), + AvailabilityZone("az-b".to_string()), + ], + ); + let mut scheduler = Scheduler::new(nodes.values()); + + // Two shards of a tenant that wants to be in AZ A + let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1)); + shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string())); + + // Initially attached in a stable location + shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1))); + shard_a.intent.push_secondary(&mut scheduler, NodeId(3)); + + // Set the preferred node to node 2, an equally high scoring node to its current location + shard_a.preferred_node = Some(NodeId(2)); + + fn make_schedule_context(shard_a: &TenantShard) -> ScheduleContext { + let mut schedule_context = ScheduleContext::default(); + schedule_context.avoid(&shard_a.intent.all_pageservers()); + schedule_context + } + + let schedule_context = make_schedule_context(&shard_a); + let optimization_a_prepare = shard_a.optimize_attachment(&mut scheduler, &schedule_context); + assert_eq!( + optimization_a_prepare, + Some(ScheduleOptimization { + sequence: shard_a.sequence, + action: ScheduleOptimizationAction::CreateSecondary(NodeId(2)) + }) + ); + shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap()); + + // The first step of the optimisation should not have cleared the preferred node + assert_eq!(shard_a.preferred_node, Some(NodeId(2))); + + let schedule_context = make_schedule_context(&shard_a); + let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context); + assert_eq!( + optimization_a_migrate, + Some(ScheduleOptimization { + sequence: shard_a.sequence, + action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment { + old_attached_node_id: NodeId(1), + new_attached_node_id: NodeId(2) + }) + }) + ); + shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap()); + + // The cutover step of the optimisation should have cleared the preferred node + assert_eq!(shard_a.preferred_node, None); + + let schedule_context = make_schedule_context(&shard_a); + let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context); + assert_eq!( + optimization_a_cleanup, + Some(ScheduleOptimization { + sequence: shard_a.sequence, + action: ScheduleOptimizationAction::RemoveSecondary(NodeId(1)) + }) + ); + shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap()); + + shard_a.intent.clear(&mut scheduler); + + Ok(()) + } + #[test] /// Check that multi-step migration works when moving to somewhere that is only better by /// 1 AffinityScore -- this ensures that we don't have a bug like the intermediate secondary diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 61e1ec79ad..4d2b3587e8 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1715,8 +1715,12 @@ class StorageControllerLeadershipStatus(StrEnum): @dataclass class StorageControllerMigrationConfig: - secondary_warmup_timeout: str | None - secondary_download_request_timeout: str | None + # Unlike the API itself, tests default to prewarm=False because it's a simpler API and doesn't + # require the test to go poll for the migration actually completing. + prewarm: bool = False + override_scheduler: bool = False + secondary_warmup_timeout: str | None = None + secondary_download_request_timeout: str | None = None class NeonStorageController(MetricsGetter, LogUtils): @@ -2120,8 +2124,10 @@ class NeonStorageController(MetricsGetter, LogUtils): config: StorageControllerMigrationConfig | None = None, ): payload = {"tenant_shard_id": str(tenant_shard_id), "node_id": dest_ps_id} - if config is not None: - payload["migration_config"] = dataclasses.asdict(config) + if config is None: + config = StorageControllerMigrationConfig() + + payload["migration_config"] = dataclasses.asdict(config) self.request( "PUT", @@ -2129,8 +2135,13 @@ class NeonStorageController(MetricsGetter, LogUtils): json=payload, headers=self.headers(TokenScope.ADMIN), ) - log.info(f"Migrated tenant {tenant_shard_id} to pageserver {dest_ps_id}") - assert self.env.get_tenant_pageserver(tenant_shard_id).id == dest_ps_id + if config.prewarm: + log.info( + f"Started prewarm migration of tenant {tenant_shard_id} to pageserver {dest_ps_id}" + ) + else: + log.info(f"Migrated tenant {tenant_shard_id} to pageserver {dest_ps_id}") + assert self.env.get_tenant_pageserver(tenant_shard_id).id == dest_ps_id def tenant_policy_update(self, tenant_id: TenantId, body: dict[str, Any]): log.info(f"tenant_policy_update({tenant_id}, {body})") diff --git a/test_runner/performance/test_storage_controller_scale.py b/test_runner/performance/test_storage_controller_scale.py index d45db28c78..777b9e2870 100644 --- a/test_runner/performance/test_storage_controller_scale.py +++ b/test_runner/performance/test_storage_controller_scale.py @@ -16,6 +16,7 @@ from fixtures.neon_fixtures import ( NeonPageserver, PageserverAvailability, PageserverSchedulingPolicy, + StorageControllerMigrationConfig, ) from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient from fixtures.pg_version import PgVersion @@ -362,7 +363,10 @@ def test_storage_controller_many_tenants( dest_ps_id = desc["shards"][shard_number]["node_secondary"][0] f = executor.submit( - env.storage_controller.tenant_shard_migrate, tenant_shard_id, dest_ps_id + env.storage_controller.tenant_shard_migrate, + tenant_shard_id, + dest_ps_id, + StorageControllerMigrationConfig(prewarm=False, override_scheduler=True), ) elif op == Operation.TENANT_PASSTHROUGH: # A passthrough read to shard zero diff --git a/test_runner/regress/test_pageserver_secondary.py b/test_runner/regress/test_pageserver_secondary.py index b9e2934505..130db009c9 100644 --- a/test_runner/regress/test_pageserver_secondary.py +++ b/test_runner/regress/test_pageserver_secondary.py @@ -976,7 +976,7 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder): # We can't hydrate everything anyway because of the failpoints. # Implicitly, this also uploads a heatmap from the current attached location. config = StorageControllerMigrationConfig( - secondary_warmup_timeout="5s", secondary_download_request_timeout="2s" + secondary_warmup_timeout="5s", secondary_download_request_timeout="2s", prewarm=False ) env.storage_controller.tenant_shard_migrate( TenantShardId(tenant_id, shard_number=0, shard_count=0), ps_secondary.id, config diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index d5acc257b2..b5572ce6a1 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -26,6 +26,7 @@ from fixtures.neon_fixtures import ( PgBin, StorageControllerApiException, StorageControllerLeadershipStatus, + StorageControllerMigrationConfig, last_flush_lsn_upload, ) from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient @@ -765,7 +766,10 @@ def test_storage_controller_stuck_compute_hook( # status is cleared. handle_params["status"] = 423 migrate_fut = executor.submit( - env.storage_controller.tenant_shard_migrate, shard_0_id, dest_ps_id + env.storage_controller.tenant_shard_migrate, + shard_0_id, + dest_ps_id, + config=StorageControllerMigrationConfig(prewarm=False, override_scheduler=True), ) def logged_stuck(): @@ -793,7 +797,10 @@ def test_storage_controller_stuck_compute_hook( # Now, do a migration in the opposite direction handle_params["status"] = 423 migrate_fut = executor.submit( - env.storage_controller.tenant_shard_migrate, shard_0_id, origin_pageserver.id + env.storage_controller.tenant_shard_migrate, + shard_0_id, + origin_pageserver.id, + config=StorageControllerMigrationConfig(prewarm=False, override_scheduler=True), ) def logged_stuck_again(): @@ -1027,7 +1034,11 @@ def test_storage_controller_compute_hook_revert( with pytest.raises(StorageControllerApiException, match="Timeout waiting for shard"): # We expect the controller to give us an error because its reconciliation timed out # waiting for the compute hook. - env.storage_controller.tenant_shard_migrate(tenant_shard_id, pageserver_b.id) + env.storage_controller.tenant_shard_migrate( + tenant_shard_id, + pageserver_b.id, + config=StorageControllerMigrationConfig(prewarm=False, override_scheduler=True), + ) # Although the migration API failed, the hook should still see pageserver B (it remembers what # was posted even when returning an error code) @@ -1068,7 +1079,11 @@ def test_storage_controller_compute_hook_revert( # Migrate B -> A, with a working compute hook: the controller should notify the hook because the # last update it made that was acked (423) by the compute was for node B. handle_params["status"] = 200 - env.storage_controller.tenant_shard_migrate(tenant_shard_id, pageserver_a.id) + env.storage_controller.tenant_shard_migrate( + tenant_shard_id, + pageserver_a.id, + config=StorageControllerMigrationConfig(prewarm=False, override_scheduler=True), + ) wait_until(lambda: notified_ps(pageserver_a.id)) @@ -1949,6 +1964,9 @@ def test_storcon_cli(neon_env_builder: NeonEnvBuilder): env.storage_controller.tenant_describe(tenant_id)["shards"][0]["node_attached"] ) ), + # A simple migration where we will ignore scheduling (force=true) and do it immediately (prewarm=false) + "--prewarm=false", + "--override-scheduler=true", ] ) @@ -3865,3 +3883,108 @@ def test_storage_controller_location_conf_equivalence(neon_env_builder: NeonEnvB ) assert reconciles_after_restart == 0 + + +@pytest.mark.parametrize("wrong_az", [True, False]) +def test_storage_controller_graceful_migration(neon_env_builder: NeonEnvBuilder, wrong_az: bool): + """ + Test that the graceful migration API goes through the process of + creating a secondary & waiting for it to warm up before cutting over, when + we use the prewarm=True flag to the API. + """ + + # 2 pageservers in 2 AZs, so that each AZ has a pageserver we can migrate to + neon_env_builder.num_pageservers = 4 + neon_env_builder.num_azs = 2 + + env = neon_env_builder.init_start() + + # Enable secondary location (neon_local disables by default) + env.storage_controller.tenant_policy_update(env.initial_tenant, {"placement": {"Attached": 1}}) + env.storage_controller.reconcile_until_idle() + + initial_desc = env.storage_controller.tenant_describe(env.initial_tenant)["shards"][0] + initial_ps_id = initial_desc["node_attached"] + initial_secondary_id = initial_desc["node_secondary"][0] + initial_ps_az = initial_desc["preferred_az_id"] + initial_ps = [ps for ps in env.pageservers if ps.id == initial_ps_id][0] + + if wrong_az: + dest_ps = [ + ps + for ps in env.pageservers + if ps.id != initial_ps_id + and ps.az_id != initial_ps_az + and ps.id != initial_secondary_id + ][0] + else: + dest_ps = [ + ps + for ps in env.pageservers + if ps.id != initial_ps_id + and ps.az_id == initial_ps_az + and ps.id != initial_secondary_id + ][0] + + log.info( + f"Migrating to {dest_ps.id} in AZ {dest_ps.az_id} (from {initial_ps_id} in AZ {initial_ps_az})" + ) + dest_ps_id = dest_ps.id + + # Set a failpoint so that the migration will block at the point it has a secondary location + for ps in env.pageservers: + ps.http_client().configure_failpoints(("secondary-layer-download-pausable", "pause")) + + # Before migration, our destination has no locations. Guaranteed because any secondary for our + # tenant will be in another AZ. + assert dest_ps.http_client().tenant_list_locations()["tenant_shards"] == [] + + if wrong_az: + # If migrating to the wrong AZ, first check that omitting force flag results in rejection + with pytest.raises(StorageControllerApiException, match="worse-scoring node"): + env.storage_controller.tenant_shard_migrate( + TenantShardId(env.initial_tenant, 0, 0), + dest_ps_id, + config=StorageControllerMigrationConfig(prewarm=True, override_scheduler=False), + ) + + # Turn off ordinary optimisations so that our migration will stay put once complete + env.storage_controller.tenant_policy_update(env.initial_tenant, {"scheduling": "Essential"}) + + # We expect this API call to succeed, and result in a new secondary location on the destination + env.storage_controller.tenant_shard_migrate( + TenantShardId(env.initial_tenant, 0, 0), + dest_ps_id, + config=StorageControllerMigrationConfig(prewarm=True, override_scheduler=wrong_az), + ) + + def secondary_at_dest(): + locs = dest_ps.http_client().tenant_list_locations()["tenant_shards"] + assert len(locs) == 1 + assert locs[0][0] == str(env.initial_tenant) + assert locs[0][1]["mode"] == "Secondary" + + wait_until(secondary_at_dest) + + # Unblock secondary downloads + for ps in env.pageservers: + ps.http_client().configure_failpoints(("secondary-layer-download-pausable", "off")) + + # Pump the reconciler to avoid waiting for background reconciles + env.storage_controller.reconcile_until_idle() + + # We should be attached at the destination + locs = dest_ps.http_client().tenant_list_locations()["tenant_shards"] + assert len(locs) == 1 + assert locs[0][1]["mode"] == "AttachedSingle" + + # Nothing left behind at the origin + if wrong_az: + # We're in essential scheduling mode, so the end state should be attached in the migration + # destination and a secondary in the original location + assert ( + initial_ps.http_client().tenant_list_locations()["tenant_shards"][0][1]["mode"] + == "Secondary" + ) + else: + assert initial_ps.http_client().tenant_list_locations()["tenant_shards"] == []