diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs index 16e12f4e02..3604e4a241 100644 --- a/control_plane/src/storage_controller.rs +++ b/control_plane/src/storage_controller.rs @@ -12,13 +12,10 @@ use hyper0::Uri; use nix::unistd::Pid; use pageserver_api::controller_api::{ NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest, - TenantCreateResponse, TenantLocateResponse, TenantShardMigrateRequest, - TenantShardMigrateResponse, + TenantCreateResponse, TenantLocateResponse, }; -use pageserver_api::models::{ - TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo, -}; -use pageserver_api::shard::{ShardStripeSize, TenantShardId}; +use pageserver_api::models::{TimelineCreateRequest, TimelineInfo}; +use pageserver_api::shard::TenantShardId; use pageserver_client::mgmt_api::ResponseErrorMessageExt; use postgres_backend::AuthType; use reqwest::Method; @@ -825,41 +822,6 @@ impl StorageController { .await } - #[instrument(skip(self))] - pub async fn tenant_migrate( - &self, - tenant_shard_id: TenantShardId, - node_id: NodeId, - ) -> anyhow::Result { - self.dispatch( - Method::PUT, - format!("control/v1/tenant/{tenant_shard_id}/migrate"), - Some(TenantShardMigrateRequest { - node_id, - migration_config: None, - }), - ) - .await - } - - #[instrument(skip(self), fields(%tenant_id, %new_shard_count))] - pub async fn tenant_split( - &self, - tenant_id: TenantId, - new_shard_count: u8, - new_stripe_size: Option, - ) -> anyhow::Result { - self.dispatch( - Method::PUT, - format!("control/v1/tenant/{tenant_id}/shard_split"), - Some(TenantShardSplitRequest { - new_shard_count, - new_stripe_size, - }), - ) - .await - } - #[instrument(skip_all, fields(node_id=%req.node_id))] pub async fn node_register(&self, req: NodeRegisterRequest) -> anyhow::Result<()> { self.dispatch::<_, ()>(Method::POST, "control/v1/node".to_string(), Some(req)) diff --git a/control_plane/storcon_cli/src/main.rs b/control_plane/storcon_cli/src/main.rs index 2e2c22c791..c3f157a9cc 100644 --- a/control_plane/storcon_cli/src/main.rs +++ b/control_plane/storcon_cli/src/main.rs @@ -5,12 +5,12 @@ use std::time::Duration; use clap::{Parser, Subcommand}; use futures::StreamExt; use pageserver_api::controller_api::{ - AvailabilityZone, NodeAvailabilityWrapper, NodeConfigureRequest, NodeDescribeResponse, - NodeRegisterRequest, NodeSchedulingPolicy, NodeShardResponse, PlacementPolicy, - SafekeeperDescribeResponse, SafekeeperSchedulingPolicyRequest, ShardSchedulingPolicy, - ShardsPreferredAzsRequest, ShardsPreferredAzsResponse, SkSchedulingPolicy, TenantCreateRequest, - TenantDescribeResponse, TenantPolicyRequest, TenantShardMigrateRequest, - TenantShardMigrateResponse, + AvailabilityZone, MigrationConfig, NodeAvailabilityWrapper, NodeConfigureRequest, + NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy, NodeShardResponse, + PlacementPolicy, SafekeeperDescribeResponse, SafekeeperSchedulingPolicyRequest, + ShardSchedulingPolicy, ShardsPreferredAzsRequest, ShardsPreferredAzsResponse, + SkSchedulingPolicy, TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest, + TenantShardMigrateRequest, TenantShardMigrateResponse, }; use pageserver_api::models::{ EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary, ShardParameters, @@ -112,6 +112,15 @@ enum Command { tenant_shard_id: TenantShardId, #[arg(long)] node: NodeId, + #[arg(long, default_value_t = true, action = clap::ArgAction::Set)] + prewarm: bool, 
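An aside on the new CLI options introduced here: `default_value_t` combined with `clap::ArgAction::Set` produces boolean options that take an explicit value (`--prewarm=false`) rather than acting as presence switches; the companion `--override-scheduler` flag added just below follows the same pattern. A minimal, self-contained sketch of that behaviour (a toy program assuming clap with the `derive` feature, as the CLI already uses — not part of the patch):

```rust
use clap::Parser;

/// Toy argument struct demonstrating value-taking boolean flags.
#[derive(Parser, Debug)]
struct Args {
    /// Defaults to true when omitted; must be set explicitly to turn off,
    /// e.g. `--prewarm=false` (a bare `--prewarm` with no value is an error).
    #[arg(long, default_value_t = true, action = clap::ArgAction::Set)]
    prewarm: bool,

    /// Defaults to false; enable with `--override-scheduler=true`.
    #[arg(long, default_value_t = false, action = clap::ArgAction::Set)]
    override_scheduler: bool,
}

fn main() {
    let defaults = Args::parse_from(["prog"]);
    assert!(defaults.prewarm && !defaults.override_scheduler);

    let explicit = Args::parse_from(["prog", "--prewarm=false", "--override-scheduler=true"]);
    assert!(!explicit.prewarm && explicit.override_scheduler);
}
```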
+ #[arg(long, default_value_t = false, action = clap::ArgAction::Set)] + override_scheduler: bool, + }, + /// Watch the location of a tenant shard evolve, e.g. while expecting it to migrate + TenantShardWatch { + #[arg(long)] + tenant_shard_id: TenantShardId, }, /// Migrate the secondary location for a tenant shard to a specific pageserver. TenantShardMigrateSecondary { @@ -619,19 +628,43 @@ async fn main() -> anyhow::Result<()> { Command::TenantShardMigrate { tenant_shard_id, node, + prewarm, + override_scheduler, } => { - let req = TenantShardMigrateRequest { - node_id: node, - migration_config: None, + let migration_config = MigrationConfig { + prewarm, + override_scheduler, + ..Default::default() }; - storcon_client + let req = TenantShardMigrateRequest { + node_id: node, + origin_node_id: None, + migration_config, + }; + + match storcon_client .dispatch::( Method::PUT, format!("control/v1/tenant/{tenant_shard_id}/migrate"), Some(req), ) - .await?; + .await + { + Err(mgmt_api::Error::ApiError(StatusCode::PRECONDITION_FAILED, msg)) => { + anyhow::bail!( + "Migration to {node} rejected, may require `--force` ({}) ", + msg + ); + } + Err(e) => return Err(e.into()), + Ok(_) => {} + } + + watch_tenant_shard(storcon_client, tenant_shard_id, Some(node)).await?; + } + Command::TenantShardWatch { tenant_shard_id } => { + watch_tenant_shard(storcon_client, tenant_shard_id, None).await?; } Command::TenantShardMigrateSecondary { tenant_shard_id, @@ -639,7 +672,8 @@ async fn main() -> anyhow::Result<()> { } => { let req = TenantShardMigrateRequest { node_id: node, - migration_config: None, + origin_node_id: None, + migration_config: MigrationConfig::default(), }; storcon_client @@ -1105,7 +1139,8 @@ async fn main() -> anyhow::Result<()> { format!("control/v1/tenant/{}/migrate", mv.tenant_shard_id), Some(TenantShardMigrateRequest { node_id: mv.to, - migration_config: None, + origin_node_id: Some(mv.from), + migration_config: MigrationConfig::default(), }), ) .await @@ -1284,3 +1319,68 @@ async fn main() -> anyhow::Result<()> { Ok(()) } + +static WATCH_INTERVAL: Duration = Duration::from_secs(5); + +async fn watch_tenant_shard( + storcon_client: Client, + tenant_shard_id: TenantShardId, + until_migrated_to: Option, +) -> anyhow::Result<()> { + if let Some(until_migrated_to) = until_migrated_to { + println!( + "Waiting for tenant shard {} to be migrated to node {}", + tenant_shard_id, until_migrated_to + ); + } + + loop { + let desc = storcon_client + .dispatch::<(), TenantDescribeResponse>( + Method::GET, + format!("control/v1/tenant/{}", tenant_shard_id.tenant_id), + None, + ) + .await?; + + // Output the current state of the tenant shard + let shard = desc + .shards + .iter() + .find(|s| s.tenant_shard_id == tenant_shard_id) + .ok_or(anyhow::anyhow!("Tenant shard not found"))?; + let summary = format!( + "attached: {} secondary: {} {}", + shard + .node_attached + .map(|n| format!("{}", n)) + .unwrap_or("none".to_string()), + shard + .node_secondary + .iter() + .map(|n| n.to_string()) + .collect::>() + .join(","), + if shard.is_reconciling { + "(reconciler active)" + } else { + "(reconciler idle)" + } + ); + println!("{}", summary); + + // Maybe drop out if we finished migration + if let Some(until_migrated_to) = until_migrated_to { + if shard.node_attached == Some(until_migrated_to) && !shard.is_reconciling { + println!( + "Tenant shard {} is now on node {}", + tenant_shard_id, until_migrated_to + ); + break; + } + } + + tokio::time::sleep(WATCH_INTERVAL).await; + } + Ok(()) +} diff --git 
a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index 2cfe1a85f9..154ab849dd 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -182,20 +182,66 @@ pub struct TenantDescribeResponseShard { #[derive(Serialize, Deserialize, Debug)] pub struct TenantShardMigrateRequest { pub node_id: NodeId, + + /// Optionally, callers may specify the node they are migrating _from_, and the server will + /// reject the request if the shard is no longer attached there: this enables writing safer + /// clients that don't risk fighting with some other movement of the shard. #[serde(default)] - pub migration_config: Option, + pub origin_node_id: Option, + + #[serde(default)] + pub migration_config: MigrationConfig, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] pub struct MigrationConfig { + /// If true, the migration will be executed even if it is to a location with a sub-optimal scheduling + /// score: this is usually not what you want, and if you use this then you'll also need to set the + /// tenant's scheduling policy to Essential or Pause to avoid the optimiser reverting your migration. + /// + /// Default: false + #[serde(default)] + pub override_scheduler: bool, + + /// If true, the migration will be done gracefully by creating a secondary location first and + /// waiting for it to warm up before cutting over. If false, if there is no existing secondary + /// location at the destination, the tenant will be migrated immediately. If the tenant's data + /// can't be downloaded within [`Self::secondary_warmup_timeout`], then the migration will go + /// ahead but run with a cold cache that can severely reduce performance until it warms up. + /// + /// When doing a graceful migration, the migration API returns as soon as it is started. + /// + /// Default: true + #[serde(default = "default_prewarm")] + pub prewarm: bool, + + /// For non-prewarm migrations which will immediately enter a cutover to the new node: how long to wait + /// overall for secondary warmup before cutting over #[serde(default)] #[serde(with = "humantime_serde")] pub secondary_warmup_timeout: Option, + /// For non-prewarm migrations which will immediately enter a cutover to the new node: how long to wait + /// within each secondary download poll call to pageserver. 
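Since the request and config types being defined here rely on several serde defaults (old clients may omit `origin_node_id` and `migration_config` entirely), it may help to see what actually goes over the wire; the remaining timeout field's serde attributes continue immediately after this aside. Below is a self-contained sketch with locally re-declared mirror types — not the real `pageserver_api` definitions (NodeId is simplified to `u64`) — assuming the field names shown in this diff and the serde, serde_json and humantime_serde crates already used by the codebase:

```rust
use std::time::Duration;

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct MigrationConfig {
    #[serde(default)]
    override_scheduler: bool,
    #[serde(default = "default_prewarm")]
    prewarm: bool,
    #[serde(default, with = "humantime_serde")]
    secondary_warmup_timeout: Option<Duration>,
    #[serde(default, with = "humantime_serde")]
    secondary_download_request_timeout: Option<Duration>,
}

fn default_prewarm() -> bool {
    true
}

impl Default for MigrationConfig {
    fn default() -> Self {
        Self {
            override_scheduler: false,
            prewarm: default_prewarm(),
            secondary_warmup_timeout: None,
            secondary_download_request_timeout: None,
        }
    }
}

/// Mirror of the body PUT to control/v1/tenant/{tenant_shard_id}/migrate.
#[derive(Serialize, Deserialize, Debug)]
struct TenantShardMigrateRequest {
    node_id: u64,
    #[serde(default)]
    origin_node_id: Option<u64>,
    #[serde(default)]
    migration_config: MigrationConfig,
}

fn main() {
    // A graceful migration from node 1 to node 2 with an explicit warmup budget:
    let req = TenantShardMigrateRequest {
        node_id: 2,
        origin_node_id: Some(1),
        migration_config: MigrationConfig {
            secondary_warmup_timeout: Some(Duration::from_secs(300)),
            ..Default::default()
        },
    };
    println!("{}", serde_json::to_string_pretty(&req).unwrap());

    // A minimal body still decodes, thanks to the serde defaults:
    let minimal: TenantShardMigrateRequest = serde_json::from_str(r#"{"node_id": 2}"#).unwrap();
    assert!(minimal.migration_config.prewarm);
    assert!(!minimal.migration_config.override_scheduler);
}
```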
#[serde(default)] #[serde(with = "humantime_serde")] pub secondary_download_request_timeout: Option, } +fn default_prewarm() -> bool { + true +} + +impl Default for MigrationConfig { + fn default() -> Self { + Self { + override_scheduler: false, + prewarm: default_prewarm(), + secondary_warmup_timeout: None, + secondary_download_request_timeout: None, + } + } +} + #[derive(Serialize, Clone, Debug)] #[serde(into = "NodeAvailabilityWrapper")] pub enum NodeAvailability { @@ -487,4 +533,43 @@ mod test { err ); } + + /// Check that a minimal migrate request with no config results in the expected default settings + #[test] + fn test_migrate_request_decode_defaults() { + let json = r#"{ + "node_id": 123 + }"#; + + let request: TenantShardMigrateRequest = serde_json::from_str(json).unwrap(); + assert_eq!(request.node_id, NodeId(123)); + assert_eq!(request.origin_node_id, None); + assert!(!request.migration_config.override_scheduler); + assert!(request.migration_config.prewarm); + assert_eq!(request.migration_config.secondary_warmup_timeout, None); + assert_eq!( + request.migration_config.secondary_download_request_timeout, + None + ); + } + + /// Check that a partially specified migration config results in the expected default settings + #[test] + fn test_migration_config_decode_defaults() { + // Specify just one field of the config + let json = r#"{ + }"#; + + let config: MigrationConfig = serde_json::from_str(json).unwrap(); + + // Check each field's expected default value + assert!(!config.override_scheduler); + assert!(config.prewarm); + assert_eq!(config.secondary_warmup_timeout, None); + assert_eq!(config.secondary_download_request_timeout, None); + assert_eq!(config.secondary_warmup_timeout, None); + + // Consistency check that the Default impl agrees with our serde defaults + assert_eq!(MigrationConfig::default(), config); + } } diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index caa2040ce2..6795abf6e9 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -85,7 +85,9 @@ use crate::reconciler::{ attached_location_conf, }; use crate::safekeeper::Safekeeper; -use crate::scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode, Scheduler}; +use crate::scheduler::{ + AttachedShardTag, MaySchedule, ScheduleContext, ScheduleError, ScheduleMode, Scheduler, +}; use crate::tenant_shard::{ IntentState, MigrateAttachment, ObservedState, ObservedStateDelta, ObservedStateLocation, ReconcileNeeded, ReconcileResult, ReconcileWaitError, ReconcilerStatus, ReconcilerWaiter, @@ -5299,12 +5301,93 @@ impl Service { Ok((response, waiters)) } + /// A graceful migration: update the preferred node and let optimisation handle the migration + /// in the background (may take a long time as it will fully warm up a location before cutting over) + /// + /// Our external API calls this a 'prewarm=true' migration, but internally it isn't a special prewarm step: it's + /// just a migration that uses the same graceful procedure as our background scheduling optimisations would use. + fn tenant_shard_migrate_with_prewarm( + &self, + migrate_req: &TenantShardMigrateRequest, + shard: &mut TenantShard, + scheduler: &mut Scheduler, + schedule_context: ScheduleContext, + ) -> Result, ApiError> { + shard.set_preferred_node(Some(migrate_req.node_id)); + + // Generate whatever the initial change to the intent is: this could be creation of a secondary, or + // cutting over to an existing secondary. 
Caller is responsible for validating this before applying it, + // e.g. by checking secondary is warm enough. + Ok(shard.optimize_attachment(scheduler, &schedule_context)) + } + + /// Immediate migration: directly update the intent state and kick off a reconciler + fn tenant_shard_migrate_immediate( + &self, + migrate_req: &TenantShardMigrateRequest, + nodes: &Arc>, + shard: &mut TenantShard, + scheduler: &mut Scheduler, + ) -> Result, ApiError> { + // Non-graceful migration: update the intent state immediately + let old_attached = *shard.intent.get_attached(); + match shard.policy { + PlacementPolicy::Attached(n) => { + // If our new attached node was a secondary, it no longer should be. + shard + .intent + .remove_secondary(scheduler, migrate_req.node_id); + + shard + .intent + .set_attached(scheduler, Some(migrate_req.node_id)); + + // If we were already attached to something, demote that to a secondary + if let Some(old_attached) = old_attached { + if n > 0 { + // Remove other secondaries to make room for the location we'll demote + while shard.intent.get_secondary().len() >= n { + shard.intent.pop_secondary(scheduler); + } + + shard.intent.push_secondary(scheduler, old_attached); + } + } + } + PlacementPolicy::Secondary => { + shard.intent.clear(scheduler); + shard.intent.push_secondary(scheduler, migrate_req.node_id); + } + PlacementPolicy::Detached => { + return Err(ApiError::BadRequest(anyhow::anyhow!( + "Cannot migrate a tenant that is PlacementPolicy::Detached: configure it to an attached policy first" + ))); + } + } + + tracing::info!("Migrating: new intent {:?}", shard.intent); + shard.sequence = shard.sequence.next(); + shard.set_preferred_node(None); // Abort any in-flight graceful migration + Ok(self.maybe_configured_reconcile_shard( + shard, + nodes, + (&migrate_req.migration_config).into(), + )) + } + pub(crate) async fn tenant_shard_migrate( &self, tenant_shard_id: TenantShardId, migrate_req: TenantShardMigrateRequest, ) -> Result { - let waiter = { + // Depending on whether the migration is a change and whether it's graceful or immediate, we might + // get a different outcome to handle + enum MigrationOutcome { + Optimization(Option), + Reconcile(Option), + } + + let outcome = { let mut locked = self.inner.write().unwrap(); let (nodes, tenants, scheduler) = locked.parts_mut(); @@ -5315,71 +5398,139 @@ impl Service { ))); }; + // Migration to unavavailable node requires force flag if !node.is_available() { - // Warn but proceed: the caller may intend to manually adjust the placement of - // a shard even if the node is down, e.g. if intervening during an incident. - tracing::warn!("Migrating to unavailable node {node}"); + if migrate_req.migration_config.override_scheduler { + // Warn but proceed: the caller may intend to manually adjust the placement of + // a shard even if the node is down, e.g. if intervening during an incident. 
+ tracing::warn!("Forcibly migrating to unavailable node {node}"); + } else { + tracing::warn!("Node {node} is unavailable, refusing migration"); + return Err(ApiError::PreconditionFailed( + format!("Node {node} is unavailable").into_boxed_str(), + )); + } } + // Calculate the ScheduleContext for this tenant + let mut schedule_context = ScheduleContext::default(); + for (_shard_id, shard) in + tenants.range(TenantShardId::tenant_range(tenant_shard_id.tenant_id)) + { + schedule_context.avoid(&shard.intent.all_pageservers()); + } + + // Look up the specific shard we will migrate let Some(shard) = tenants.get_mut(&tenant_shard_id) else { return Err(ApiError::NotFound( anyhow::anyhow!("Tenant shard not found").into(), )); }; + // Migration to a node with unfavorable scheduling score requires a force flag, because it might just + // be migrated back by the optimiser. + if let Some(better_node) = shard.find_better_location::( + scheduler, + &schedule_context, + migrate_req.node_id, + &[], + ) { + if !migrate_req.migration_config.override_scheduler { + return Err(ApiError::PreconditionFailed( + "Migration to a worse-scoring node".into(), + )); + } else { + tracing::info!( + "Migrating to a worse-scoring node {} (optimiser would prefer {better_node})", + migrate_req.node_id + ); + } + } + + if let Some(origin_node_id) = migrate_req.origin_node_id { + if shard.intent.get_attached() != &Some(origin_node_id) { + return Err(ApiError::PreconditionFailed( + format!( + "Migration expected to originate from {} but shard is on {:?}", + origin_node_id, + shard.intent.get_attached() + ) + .into(), + )); + } + } + if shard.intent.get_attached() == &Some(migrate_req.node_id) { // No-op case: we will still proceed to wait for reconciliation in case it is // incomplete from an earlier update to the intent. tracing::info!("Migrating: intent is unchanged {:?}", shard.intent); + + // An instruction to migrate to the currently attached node should + // cancel any pending graceful migration + shard.set_preferred_node(None); + + MigrationOutcome::Reconcile(self.maybe_configured_reconcile_shard( + shard, + nodes, + (&migrate_req.migration_config).into(), + )) + } else if migrate_req.migration_config.prewarm { + MigrationOutcome::Optimization(self.tenant_shard_migrate_with_prewarm( + &migrate_req, + shard, + scheduler, + schedule_context, + )?) } else { - let old_attached = *shard.intent.get_attached(); - - match shard.policy { - PlacementPolicy::Attached(n) => { - // If our new attached node was a secondary, it no longer should be. 
- shard - .intent - .remove_secondary(scheduler, migrate_req.node_id); - - shard - .intent - .set_attached(scheduler, Some(migrate_req.node_id)); - - // If we were already attached to something, demote that to a secondary - if let Some(old_attached) = old_attached { - if n > 0 { - // Remove other secondaries to make room for the location we'll demote - while shard.intent.get_secondary().len() >= n { - shard.intent.pop_secondary(scheduler); - } - - shard.intent.push_secondary(scheduler, old_attached); - } - } - } - PlacementPolicy::Secondary => { - shard.intent.clear(scheduler); - shard.intent.push_secondary(scheduler, migrate_req.node_id); - } - PlacementPolicy::Detached => { - return Err(ApiError::BadRequest(anyhow::anyhow!( - "Cannot migrate a tenant that is PlacementPolicy::Detached: configure it to an attached policy first" - ))); - } - } - - tracing::info!("Migrating: new intent {:?}", shard.intent); - shard.sequence = shard.sequence.next(); + MigrationOutcome::Reconcile(self.tenant_shard_migrate_immediate( + &migrate_req, + nodes, + shard, + scheduler, + )?) } - - let reconciler_config = match migrate_req.migration_config { - Some(cfg) => (&cfg).into(), - None => ReconcilerConfig::new(ReconcilerPriority::High), - }; - - self.maybe_configured_reconcile_shard(shard, nodes, reconciler_config) }; + // We may need to validate + apply an optimisation, or we may need to just retrive a reconcile waiter + let waiter = match outcome { + MigrationOutcome::Optimization(Some(optimization)) => { + // Validate and apply the optimization -- this would happen anyway in background reconcile loop, but + // we might as well do it more promptly as this is a direct external request. + let mut validated = self + .optimize_all_validate(vec![(tenant_shard_id, optimization)]) + .await; + if let Some((_shard_id, optimization)) = validated.pop() { + let mut locked = self.inner.write().unwrap(); + let (nodes, tenants, scheduler) = locked.parts_mut(); + let Some(shard) = tenants.get_mut(&tenant_shard_id) else { + // Rare but possible: tenant is removed between generating optimisation and validating it. + return Err(ApiError::NotFound( + anyhow::anyhow!("Tenant shard not found").into(), + )); + }; + + if !shard.apply_optimization(scheduler, optimization) { + // This can happen but is unusual enough to warn on: something else changed in the shard that made the optimisation stale + // and therefore not applied. + tracing::warn!( + "Schedule optimisation generated during graceful migration was not applied, shard changed?" + ); + } + self.maybe_configured_reconcile_shard( + shard, + nodes, + (&migrate_req.migration_config).into(), + ) + } else { + None + } + } + MigrationOutcome::Optimization(None) => None, + MigrationOutcome::Reconcile(waiter) => waiter, + }; + + // Finally, wait for any reconcile we started to complete. In the case of immediate-mode migrations to cold + // locations, this has a good chance of timing out. 
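Before the final wait below, a brief aside recapping the immediate (prewarm=false) path handled above: for `PlacementPolicy::Attached(n)` it boils down to re-pointing the intent at the destination, demoting the old attachment, and popping excess secondaries to stay within the policy. A toy sketch of that juggling with simplified stand-in types (not the controller's `IntentState` or scheduler):

```rust
#[derive(Debug, PartialEq)]
struct Intent {
    attached: Option<u64>,
    secondary: Vec<u64>,
}

// Mirrors the PlacementPolicy::Attached(n) arm of the immediate migration: attach the
// destination, demote the old attachment to a secondary, and pop excess secondaries so
// the total stays within the policy's secondary count `n`.
fn migrate_immediate(intent: &mut Intent, dest: u64, n: usize) {
    // If the destination was a secondary, it no longer should be.
    intent.secondary.retain(|&s| s != dest);
    let old_attached = intent.attached.replace(dest);
    if let Some(old) = old_attached {
        if n > 0 {
            // Make room for the location we are about to demote.
            while intent.secondary.len() >= n {
                intent.secondary.pop();
            }
            intent.secondary.push(old);
        }
    }
}

fn main() {
    // Migrate to a brand-new node: the old attachment on 1 replaces the secondary on 3.
    let mut intent = Intent { attached: Some(1), secondary: vec![3] };
    migrate_immediate(&mut intent, 2, 1);
    assert_eq!(intent, Intent { attached: Some(2), secondary: vec![1] });

    // Migrate onto the existing secondary: it is promoted, the old attachment demoted.
    let mut intent = Intent { attached: Some(1), secondary: vec![3] };
    migrate_immediate(&mut intent, 3, 1);
    assert_eq!(intent, Intent { attached: Some(3), secondary: vec![1] });
}
```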
if let Some(waiter) = waiter { waiter.wait_timeout(RECONCILE_TIMEOUT).await?; } else { @@ -6959,6 +7110,10 @@ impl Service { ShardSchedulingPolicy::Active => { // Ok to do optimization } + ShardSchedulingPolicy::Essential if shard.get_preferred_node().is_some() => { + // Ok to do optimization: we are executing a graceful migration that + // has set preferred_node + } ShardSchedulingPolicy::Essential | ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => { diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs index 27e478043e..96ff70a951 100644 --- a/storage_controller/src/tenant_shard.rs +++ b/storage_controller/src/tenant_shard.rs @@ -132,6 +132,10 @@ pub(crate) struct TenantShard { /// of state that we publish externally in an eventually consistent way. pub(crate) pending_compute_notification: bool, + /// To do a graceful migration, set this field to the destination pageserver, and optimization + /// functions will consider this node the best location and react appropriately. + preferred_node: Option, + // Support/debug tool: if something is going wrong or flapping with scheduling, this may // be set to a non-active state to avoid making changes while the issue is fixed. scheduling_policy: ShardSchedulingPolicy, @@ -555,6 +559,7 @@ impl TenantShard { last_error: Arc::default(), pending_compute_notification: false, scheduling_policy: ShardSchedulingPolicy::default(), + preferred_node: None, } } @@ -809,6 +814,15 @@ impl TenantShard { return None; }; + // If the candidate is our preferred node, then it is better than the current location, as long + // as it is online -- the online check is part of the score calculation we did above, so it's + // important that this check comes after that one. + if let Some(preferred) = self.preferred_node.as_ref() { + if preferred == &candidate { + return Some(true); + } + } + match scheduler.compute_node_score::( current, &self.intent.preferred_az_id, @@ -847,13 +861,22 @@ impl TenantShard { } } - fn find_better_location( + pub(crate) fn find_better_location( &self, scheduler: &mut Scheduler, schedule_context: &ScheduleContext, current: NodeId, hard_exclude: &[NodeId], ) -> Option { + // If we have a migration hint, then that is our better location + if let Some(hint) = self.preferred_node.as_ref() { + if hint == ¤t { + return None; + } + + return Some(*hint); + } + // Look for a lower-scoring location to attach to let Ok(candidate_node) = scheduler.schedule_shard::( hard_exclude, @@ -887,6 +910,13 @@ impl TenantShard { scheduler: &mut Scheduler, schedule_context: &ScheduleContext, ) -> bool { + // Tenant with preferred node: check if it is not already at the preferred node + if let Some(preferred) = self.preferred_node.as_ref() { + if Some(preferred) != self.intent.get_attached().as_ref() { + return true; + } + } + // Sharded tenant: check if any locations have a nonzero affinity score if self.shard.count >= ShardCount(1) { let schedule_context = schedule_context.project_detach(self); @@ -927,6 +957,9 @@ impl TenantShard { /// Optimize attachments: if a shard has a secondary location that is preferable to /// its primary location based on soft constraints, switch that secondary location /// to be attached. 
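One aside on the `is_better_location` change earlier in this hunk (the doc comment for `optimize_attachment` continues just below): the new comment stresses that the preferred-node short-circuit must run only after the candidate's score has been computed, because that score computation is also the online/schedulable check. A toy illustration of that ordering, with stand-in types and a made-up scoring scale (lower is better, as in the scheduler) — not the controller's real scoring code:

```rust
// Toy stand-ins: node ids are u64, scores are u32 and lower is better.
fn is_better_location(
    candidate: u64,
    candidate_score: Option<u32>, // None == candidate unschedulable / offline
    current_score: u32,
    preferred: Option<u64>,
) -> Option<bool> {
    // 1. If the candidate can't even be scored (e.g. it is offline), it is never better.
    let candidate_score = candidate_score?;

    // 2. Only then does the graceful-migration hint win over the plain score comparison.
    if preferred == Some(candidate) {
        return Some(true);
    }

    Some(candidate_score < current_score)
}

fn main() {
    // Hint set and candidate online: better, even with a worse (higher) score.
    assert_eq!(is_better_location(2, Some(10), 5, Some(2)), Some(true));
    // Hint set but candidate offline: the hint does not override the online check.
    assert_eq!(is_better_location(2, None, 5, Some(2)), None);
    // No hint: ordinary score comparison.
    assert_eq!(is_better_location(2, Some(3), 5, None), Some(true));
}
```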
+ /// + /// `schedule_context` should have been populated with all shards in the tenant, including + /// the one we're trying to optimize (this function will subtract its own contribution before making scoring decisions) #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))] pub(crate) fn optimize_attachment( &self, @@ -1055,7 +1088,8 @@ impl TenantShard { // // This should be a transient state, there should always be capacity eventually in our preferred AZ (even if nodes // there are too overloaded for scheduler to suggest them, more should be provisioned eventually). - if self.intent.preferred_az_id.is_some() + if self.preferred_node.is_none() + && self.intent.preferred_az_id.is_some() && scheduler.get_node_az(&replacement) != self.intent.preferred_az_id { tracing::debug!( @@ -1161,6 +1195,27 @@ impl TenantShard { None } + /// Start or abort a graceful migration of this shard to another pageserver. This works on top of the + /// other optimisation functions, to bias them to move to the destination node. + pub(crate) fn set_preferred_node(&mut self, node: Option) { + if let Some(hint) = self.preferred_node.as_ref() { + if Some(hint) != node.as_ref() { + // This is legal but a bit surprising: we expect that administrators wouldn't usually + // change their mind about where to migrate something. + tracing::warn!( + "Changing migration destination from {hint} to {node:?} (current intent {:?})", + self.intent + ); + } + } + + self.preferred_node = node; + } + + pub(crate) fn get_preferred_node(&self) -> Option { + self.preferred_node + } + /// Return true if the optimization was really applied: it will not be applied if the optimization's /// sequence is behind this tenant shard's pub(crate) fn apply_optimization( @@ -1185,6 +1240,14 @@ impl TenantShard { self.intent.demote_attached(scheduler, old_attached_node_id); self.intent .promote_attached(scheduler, new_attached_node_id); + + if let Some(hint) = self.preferred_node.as_ref() { + if hint == &new_attached_node_id { + // The migration target is not a long term pin: once we are done with the migration, clear it. + tracing::info!("Graceful migration to {hint} complete"); + self.preferred_node = None; + } + } } ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary { old_node_id, @@ -1703,6 +1766,10 @@ impl TenantShard { debug_assert!(!self.intent.all_pageservers().contains(&node_id)); + if self.preferred_node == Some(node_id) { + self.preferred_node = None; + } + intent_modified } @@ -1750,6 +1817,7 @@ impl TenantShard { pending_compute_notification: false, delayed_reconcile: false, scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(), + preferred_node: None, }) } @@ -2270,6 +2338,85 @@ pub(crate) mod tests { Ok(()) } + #[test] + /// How the optimisation code handles a shard with a preferred node set; this is an example + /// of the multi-step migration, but driven by a different input. 
+ fn optimize_attachment_multi_preferred_node() -> anyhow::Result<()> { + let nodes = make_test_nodes( + 4, + &[ + AvailabilityZone("az-a".to_string()), + AvailabilityZone("az-a".to_string()), + AvailabilityZone("az-b".to_string()), + AvailabilityZone("az-b".to_string()), + ], + ); + let mut scheduler = Scheduler::new(nodes.values()); + + // Two shards of a tenant that wants to be in AZ A + let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1)); + shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string())); + + // Initially attached in a stable location + shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1))); + shard_a.intent.push_secondary(&mut scheduler, NodeId(3)); + + // Set the preferred node to node 2, an equally high scoring node to its current location + shard_a.preferred_node = Some(NodeId(2)); + + fn make_schedule_context(shard_a: &TenantShard) -> ScheduleContext { + let mut schedule_context = ScheduleContext::default(); + schedule_context.avoid(&shard_a.intent.all_pageservers()); + schedule_context + } + + let schedule_context = make_schedule_context(&shard_a); + let optimization_a_prepare = shard_a.optimize_attachment(&mut scheduler, &schedule_context); + assert_eq!( + optimization_a_prepare, + Some(ScheduleOptimization { + sequence: shard_a.sequence, + action: ScheduleOptimizationAction::CreateSecondary(NodeId(2)) + }) + ); + shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap()); + + // The first step of the optimisation should not have cleared the preferred node + assert_eq!(shard_a.preferred_node, Some(NodeId(2))); + + let schedule_context = make_schedule_context(&shard_a); + let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context); + assert_eq!( + optimization_a_migrate, + Some(ScheduleOptimization { + sequence: shard_a.sequence, + action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment { + old_attached_node_id: NodeId(1), + new_attached_node_id: NodeId(2) + }) + }) + ); + shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap()); + + // The cutover step of the optimisation should have cleared the preferred node + assert_eq!(shard_a.preferred_node, None); + + let schedule_context = make_schedule_context(&shard_a); + let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context); + assert_eq!( + optimization_a_cleanup, + Some(ScheduleOptimization { + sequence: shard_a.sequence, + action: ScheduleOptimizationAction::RemoveSecondary(NodeId(1)) + }) + ); + shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap()); + + shard_a.intent.clear(&mut scheduler); + + Ok(()) + } + #[test] /// Check that multi-step migration works when moving to somewhere that is only better by /// 1 AffinityScore -- this ensures that we don't have a bug like the intermediate secondary diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 61e1ec79ad..4d2b3587e8 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1715,8 +1715,12 @@ class StorageControllerLeadershipStatus(StrEnum): @dataclass class StorageControllerMigrationConfig: - secondary_warmup_timeout: str | None - secondary_download_request_timeout: str | None + # Unlike the API itself, tests default to prewarm=False because it's a simpler API and doesn't + # require the test to go poll for the migration actually completing. 
+ prewarm: bool = False + override_scheduler: bool = False + secondary_warmup_timeout: str | None = None + secondary_download_request_timeout: str | None = None class NeonStorageController(MetricsGetter, LogUtils): @@ -2120,8 +2124,10 @@ class NeonStorageController(MetricsGetter, LogUtils): config: StorageControllerMigrationConfig | None = None, ): payload = {"tenant_shard_id": str(tenant_shard_id), "node_id": dest_ps_id} - if config is not None: - payload["migration_config"] = dataclasses.asdict(config) + if config is None: + config = StorageControllerMigrationConfig() + + payload["migration_config"] = dataclasses.asdict(config) self.request( "PUT", @@ -2129,8 +2135,13 @@ class NeonStorageController(MetricsGetter, LogUtils): json=payload, headers=self.headers(TokenScope.ADMIN), ) - log.info(f"Migrated tenant {tenant_shard_id} to pageserver {dest_ps_id}") - assert self.env.get_tenant_pageserver(tenant_shard_id).id == dest_ps_id + if config.prewarm: + log.info( + f"Started prewarm migration of tenant {tenant_shard_id} to pageserver {dest_ps_id}" + ) + else: + log.info(f"Migrated tenant {tenant_shard_id} to pageserver {dest_ps_id}") + assert self.env.get_tenant_pageserver(tenant_shard_id).id == dest_ps_id def tenant_policy_update(self, tenant_id: TenantId, body: dict[str, Any]): log.info(f"tenant_policy_update({tenant_id}, {body})") diff --git a/test_runner/performance/test_storage_controller_scale.py b/test_runner/performance/test_storage_controller_scale.py index d45db28c78..777b9e2870 100644 --- a/test_runner/performance/test_storage_controller_scale.py +++ b/test_runner/performance/test_storage_controller_scale.py @@ -16,6 +16,7 @@ from fixtures.neon_fixtures import ( NeonPageserver, PageserverAvailability, PageserverSchedulingPolicy, + StorageControllerMigrationConfig, ) from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient from fixtures.pg_version import PgVersion @@ -362,7 +363,10 @@ def test_storage_controller_many_tenants( dest_ps_id = desc["shards"][shard_number]["node_secondary"][0] f = executor.submit( - env.storage_controller.tenant_shard_migrate, tenant_shard_id, dest_ps_id + env.storage_controller.tenant_shard_migrate, + tenant_shard_id, + dest_ps_id, + StorageControllerMigrationConfig(prewarm=False, override_scheduler=True), ) elif op == Operation.TENANT_PASSTHROUGH: # A passthrough read to shard zero diff --git a/test_runner/regress/test_pageserver_secondary.py b/test_runner/regress/test_pageserver_secondary.py index b9e2934505..130db009c9 100644 --- a/test_runner/regress/test_pageserver_secondary.py +++ b/test_runner/regress/test_pageserver_secondary.py @@ -976,7 +976,7 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder): # We can't hydrate everything anyway because of the failpoints. # Implicitly, this also uploads a heatmap from the current attached location. 
config = StorageControllerMigrationConfig( - secondary_warmup_timeout="5s", secondary_download_request_timeout="2s" + secondary_warmup_timeout="5s", secondary_download_request_timeout="2s", prewarm=False ) env.storage_controller.tenant_shard_migrate( TenantShardId(tenant_id, shard_number=0, shard_count=0), ps_secondary.id, config diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index d5acc257b2..b5572ce6a1 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -26,6 +26,7 @@ from fixtures.neon_fixtures import ( PgBin, StorageControllerApiException, StorageControllerLeadershipStatus, + StorageControllerMigrationConfig, last_flush_lsn_upload, ) from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient @@ -765,7 +766,10 @@ def test_storage_controller_stuck_compute_hook( # status is cleared. handle_params["status"] = 423 migrate_fut = executor.submit( - env.storage_controller.tenant_shard_migrate, shard_0_id, dest_ps_id + env.storage_controller.tenant_shard_migrate, + shard_0_id, + dest_ps_id, + config=StorageControllerMigrationConfig(prewarm=False, override_scheduler=True), ) def logged_stuck(): @@ -793,7 +797,10 @@ def test_storage_controller_stuck_compute_hook( # Now, do a migration in the opposite direction handle_params["status"] = 423 migrate_fut = executor.submit( - env.storage_controller.tenant_shard_migrate, shard_0_id, origin_pageserver.id + env.storage_controller.tenant_shard_migrate, + shard_0_id, + origin_pageserver.id, + config=StorageControllerMigrationConfig(prewarm=False, override_scheduler=True), ) def logged_stuck_again(): @@ -1027,7 +1034,11 @@ def test_storage_controller_compute_hook_revert( with pytest.raises(StorageControllerApiException, match="Timeout waiting for shard"): # We expect the controller to give us an error because its reconciliation timed out # waiting for the compute hook. - env.storage_controller.tenant_shard_migrate(tenant_shard_id, pageserver_b.id) + env.storage_controller.tenant_shard_migrate( + tenant_shard_id, + pageserver_b.id, + config=StorageControllerMigrationConfig(prewarm=False, override_scheduler=True), + ) # Although the migration API failed, the hook should still see pageserver B (it remembers what # was posted even when returning an error code) @@ -1068,7 +1079,11 @@ def test_storage_controller_compute_hook_revert( # Migrate B -> A, with a working compute hook: the controller should notify the hook because the # last update it made that was acked (423) by the compute was for node B. 
handle_params["status"] = 200 - env.storage_controller.tenant_shard_migrate(tenant_shard_id, pageserver_a.id) + env.storage_controller.tenant_shard_migrate( + tenant_shard_id, + pageserver_a.id, + config=StorageControllerMigrationConfig(prewarm=False, override_scheduler=True), + ) wait_until(lambda: notified_ps(pageserver_a.id)) @@ -1949,6 +1964,9 @@ def test_storcon_cli(neon_env_builder: NeonEnvBuilder): env.storage_controller.tenant_describe(tenant_id)["shards"][0]["node_attached"] ) ), + # A simple migration where we will ignore scheduling (force=true) and do it immediately (prewarm=false) + "--prewarm=false", + "--override-scheduler=true", ] ) @@ -3865,3 +3883,108 @@ def test_storage_controller_location_conf_equivalence(neon_env_builder: NeonEnvB ) assert reconciles_after_restart == 0 + + +@pytest.mark.parametrize("wrong_az", [True, False]) +def test_storage_controller_graceful_migration(neon_env_builder: NeonEnvBuilder, wrong_az: bool): + """ + Test that the graceful migration API goes through the process of + creating a secondary & waiting for it to warm up before cutting over, when + we use the prewarm=True flag to the API. + """ + + # 2 pageservers in 2 AZs, so that each AZ has a pageserver we can migrate to + neon_env_builder.num_pageservers = 4 + neon_env_builder.num_azs = 2 + + env = neon_env_builder.init_start() + + # Enable secondary location (neon_local disables by default) + env.storage_controller.tenant_policy_update(env.initial_tenant, {"placement": {"Attached": 1}}) + env.storage_controller.reconcile_until_idle() + + initial_desc = env.storage_controller.tenant_describe(env.initial_tenant)["shards"][0] + initial_ps_id = initial_desc["node_attached"] + initial_secondary_id = initial_desc["node_secondary"][0] + initial_ps_az = initial_desc["preferred_az_id"] + initial_ps = [ps for ps in env.pageservers if ps.id == initial_ps_id][0] + + if wrong_az: + dest_ps = [ + ps + for ps in env.pageservers + if ps.id != initial_ps_id + and ps.az_id != initial_ps_az + and ps.id != initial_secondary_id + ][0] + else: + dest_ps = [ + ps + for ps in env.pageservers + if ps.id != initial_ps_id + and ps.az_id == initial_ps_az + and ps.id != initial_secondary_id + ][0] + + log.info( + f"Migrating to {dest_ps.id} in AZ {dest_ps.az_id} (from {initial_ps_id} in AZ {initial_ps_az})" + ) + dest_ps_id = dest_ps.id + + # Set a failpoint so that the migration will block at the point it has a secondary location + for ps in env.pageservers: + ps.http_client().configure_failpoints(("secondary-layer-download-pausable", "pause")) + + # Before migration, our destination has no locations. Guaranteed because any secondary for our + # tenant will be in another AZ. 
+ assert dest_ps.http_client().tenant_list_locations()["tenant_shards"] == [] + + if wrong_az: + # If migrating to the wrong AZ, first check that omitting the override_scheduler flag results in rejection + with pytest.raises(StorageControllerApiException, match="worse-scoring node"): + env.storage_controller.tenant_shard_migrate( + TenantShardId(env.initial_tenant, 0, 0), + dest_ps_id, + config=StorageControllerMigrationConfig(prewarm=True, override_scheduler=False), + ) + + # Turn off ordinary optimisations so that our migration will stay put once complete + env.storage_controller.tenant_policy_update(env.initial_tenant, {"scheduling": "Essential"}) + + # We expect this API call to succeed, and result in a new secondary location on the destination + env.storage_controller.tenant_shard_migrate( + TenantShardId(env.initial_tenant, 0, 0), + dest_ps_id, + config=StorageControllerMigrationConfig(prewarm=True, override_scheduler=wrong_az), + ) + + def secondary_at_dest(): + locs = dest_ps.http_client().tenant_list_locations()["tenant_shards"] + assert len(locs) == 1 + assert locs[0][0] == str(env.initial_tenant) + assert locs[0][1]["mode"] == "Secondary" + + wait_until(secondary_at_dest) + + # Unblock secondary downloads + for ps in env.pageservers: + ps.http_client().configure_failpoints(("secondary-layer-download-pausable", "off")) + + # Pump the reconciler to avoid waiting for background reconciles + env.storage_controller.reconcile_until_idle() + + # We should be attached at the destination + locs = dest_ps.http_client().tenant_list_locations()["tenant_shards"] + assert len(locs) == 1 + assert locs[0][1]["mode"] == "AttachedSingle" + + # Check what was left behind at the origin + if wrong_az: + # We're in Essential scheduling mode, so the end state should be attached at the migration + # destination, with a secondary left behind in the original location + assert ( + initial_ps.http_client().tenant_list_locations()["tenant_shards"][0][1]["mode"] + == "Secondary" + ) + else: + assert initial_ps.http_client().tenant_list_locations()["tenant_shards"] == []
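The new regression test drives the pause via the pageserver's `secondary-layer-download-pausable` failpoint. For readers unfamiliar with the mechanism, here is a rough, self-contained sketch of how a pause-style failpoint behaves with the tikv `fail` crate, which the pageserver's failpoint support builds on (the pageserver's own async wrapper macro differs, so treat this purely as an illustration; it also assumes the crate's `failpoints` feature is enabled at build time):

```rust
use std::{thread, time::Duration};

// A worker that hits a named failpoint before doing its work. While the point is
// configured as "pause", any thread reaching it blocks until it is reconfigured.
fn secondary_download_step() {
    fail::fail_point!("secondary-layer-download-pausable");
    println!("download step ran");
}

fn main() {
    fail::cfg("secondary-layer-download-pausable", "pause").unwrap();

    let worker = thread::spawn(secondary_download_step);
    thread::sleep(Duration::from_millis(100)); // worker is now parked at the failpoint

    // Flip the failpoint off, as the test does via configure_failpoints(..., "off"),
    // and the worker resumes.
    fail::cfg("secondary-layer-download-pausable", "off").unwrap();
    worker.join().unwrap();
}
```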