From e3687056920782a2a5f5a43fac2406cc23abd097 Mon Sep 17 00:00:00 2001 From: John Spray Date: Wed, 1 Nov 2023 17:09:49 +0000 Subject: [PATCH] control_plane: implement `cargo neon migrate` --- control_plane/src/bin/neon_local.rs | 54 +++++-- control_plane/src/lib.rs | 1 + control_plane/src/pageserver.rs | 25 +++- control_plane/src/tenant_migration.rs | 202 ++++++++++++++++++++++++++ 4 files changed, 268 insertions(+), 14 deletions(-) create mode 100644 control_plane/src/tenant_migration.rs diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 8ad78abf56..5e4d3a85e9 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -13,6 +13,7 @@ use control_plane::endpoint::ComputeControlPlane; use control_plane::local_env::LocalEnv; use control_plane::pageserver::{PageServerNode, PAGESERVER_REMOTE_STORAGE_DIR}; use control_plane::safekeeper::SafekeeperNode; +use control_plane::tenant_migration::migrate_tenant; use control_plane::{broker, local_env}; use pageserver_api::models::TimelineInfo; use pageserver_api::{ @@ -451,6 +452,15 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an .with_context(|| format!("Tenant config failed for tenant with id {tenant_id}"))?; println!("tenant {tenant_id} successfully configured on the pageserver"); } + Some(("migrate", matches)) => { + let tenant_id = get_tenant_id(matches, env)?; + let new_pageserver = get_pageserver(env, matches)?; + let new_pageserver_id = new_pageserver.conf.id; + + migrate_tenant(env, tenant_id, new_pageserver)?; + println!("tenant {tenant_id} migrated to {}", new_pageserver_id); + } + Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name), None => bail!("no tenant subcommand provided"), } @@ -885,20 +895,20 @@ fn handle_mappings(sub_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Res } } +fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result { + let node_id = if let Some(id_str) = args.get_one::("pageserver-id") { + NodeId(id_str.parse().context("while parsing pageserver id")?) + } else { + DEFAULT_PAGESERVER_ID + }; + + Ok(PageServerNode::from_env( + env, + env.get_pageserver_conf(node_id)?, + )) +} + fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { - fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result { - let node_id = if let Some(id_str) = args.get_one::("pageserver-id") { - NodeId(id_str.parse().context("while parsing pageserver id")?) - } else { - DEFAULT_PAGESERVER_ID - }; - - Ok(PageServerNode::from_env( - env, - env.get_pageserver_conf(node_id)?, - )) - } - match sub_match.subcommand() { Some(("start", subcommand_args)) => { if let Err(e) = get_pageserver(env, subcommand_args)? @@ -935,6 +945,20 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul } } + Some(("migrate", subcommand_args)) => { + let pageserver = get_pageserver(env, subcommand_args)?; + //TODO what shutdown strategy should we use here? + if let Err(e) = pageserver.stop(false) { + eprintln!("pageserver stop failed: {}", e); + exit(1); + } + + if let Err(e) = pageserver.start(&pageserver_config_overrides(subcommand_args)) { + eprintln!("pageserver start failed: {e}"); + exit(1); + } + } + Some(("status", subcommand_args)) => { match get_pageserver(env, subcommand_args)?.check_status() { Ok(_) => println!("Page server is up and running"), @@ -1327,6 +1351,10 @@ fn cli() -> Command { .subcommand(Command::new("config") .arg(tenant_id_arg.clone()) .arg(Arg::new("config").short('c').num_args(1).action(ArgAction::Append).required(false))) + .subcommand(Command::new("migrate") + .about("Migrate a tenant from one pageserver to another") + .arg(tenant_id_arg.clone()) + .arg(pageserver_id_arg.clone())) ) .subcommand( Command::new("pageserver") diff --git a/control_plane/src/lib.rs b/control_plane/src/lib.rs index bb79d36bfc..52a0e20429 100644 --- a/control_plane/src/lib.rs +++ b/control_plane/src/lib.rs @@ -14,3 +14,4 @@ pub mod local_env; pub mod pageserver; pub mod postgresql_conf; pub mod safekeeper; +pub mod tenant_migration; diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 50d83cca77..505f18fade 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -15,7 +15,9 @@ use std::{io, result}; use anyhow::{bail, Context}; use camino::Utf8PathBuf; -use pageserver_api::models::{self, TenantInfo, TimelineInfo}; +use pageserver_api::models::{ + self, LocationConfig, TenantInfo, TenantLocationConfigRequest, TimelineInfo, +}; use postgres_backend::AuthType; use postgres_connection::{parse_host_port, PgConnectionConfig}; use reqwest::blocking::{Client, RequestBuilder, Response}; @@ -508,6 +510,27 @@ impl PageServerNode { Ok(()) } + pub fn location_config( + &self, + tenant_id: TenantId, + config: LocationConfig, + ) -> anyhow::Result<()> { + let req_body = TenantLocationConfigRequest { tenant_id, config }; + + self.http_request( + Method::PUT, + format!( + "{}/tenant/{}/location_config", + self.http_base_url, tenant_id + ), + )? + .json(&req_body) + .send()? + .error_from_body()?; + + Ok(()) + } + pub fn timeline_list(&self, tenant_id: &TenantId) -> anyhow::Result> { let timeline_infos: Vec = self .http_request( diff --git a/control_plane/src/tenant_migration.rs b/control_plane/src/tenant_migration.rs new file mode 100644 index 0000000000..d28d1f9fe8 --- /dev/null +++ b/control_plane/src/tenant_migration.rs @@ -0,0 +1,202 @@ +//! +//! Functionality for migrating tenants across pageservers: unlike most of neon_local, this code +//! isn't scoped to a particular physical service, as it needs to update compute endpoints to +//! point to the new pageserver. +//! +use crate::local_env::LocalEnv; +use crate::{ + attachment_service::AttachmentService, endpoint::ComputeControlPlane, + pageserver::PageServerNode, +}; +use pageserver_api::models::{ + LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig, +}; +use std::collections::HashMap; +use std::time::Duration; +use utils::{ + generation::Generation, + id::{TenantId, TimelineId}, + lsn::Lsn, +}; + +/// Given an attached pageserver, retrieve the LSN for all timelines +fn get_lsns( + tenant_id: TenantId, + pageserver: &PageServerNode, +) -> anyhow::Result> { + let timelines = pageserver.timeline_list(&tenant_id)?; + Ok(timelines + .into_iter() + .map(|t| (t.timeline_id, t.last_record_lsn)) + .collect()) +} + +/// Wait for the timeline LSNs on `pageserver` to catch up with or overtake +/// `baseline`. +fn await_lsn( + tenant_id: TenantId, + pageserver: &PageServerNode, + baseline: HashMap, +) -> anyhow::Result<()> { + loop { + let latest = match get_lsns(tenant_id, pageserver) { + Ok(l) => l, + Err(e) => { + println!( + "🕑 Can't get LSNs on pageserver {} yet, waiting ({e})", + pageserver.conf.id + ); + std::thread::sleep(Duration::from_millis(500)); + continue; + } + }; + + let mut any_behind: bool = false; + for (timeline_id, baseline_lsn) in &baseline { + match latest.get(timeline_id) { + Some(latest_lsn) => { + println!("🕑 LSN origin {baseline_lsn} vs destination {latest_lsn}"); + if latest_lsn < baseline_lsn { + any_behind = true; + } + } + None => { + // Expected timeline isn't yet visible on migration destination. + // (IRL we would have to account for timeline deletion, but this + // is just test helper) + any_behind = true; + } + } + } + + if !any_behind { + println!("✅ LSN caught up. Proceeding..."); + break; + } else { + std::thread::sleep(Duration::from_millis(500)); + } + } + + Ok(()) +} + +/// This function spans multiple services, to demonstrate live migration of a tenant +/// between pageservers: +/// - Coordinate attach/secondary/detach on pageservers +/// - call into attachment_service for generations +/// - reconfigure compute endpoints to point to new attached pageserver +pub fn migrate_tenant( + env: &LocalEnv, + tenant_id: TenantId, + dest_ps: PageServerNode, +) -> anyhow::Result<()> { + // Get a new generation + let attachment_service = AttachmentService::from_env(env); + + let previous = attachment_service.inspect(tenant_id)?; + let mut baseline_lsns = None; + if let Some((generation, origin_ps_id)) = &previous { + let origin_ps = PageServerNode::from_env(env, env.get_pageserver_conf(*origin_ps_id)?); + + if origin_ps_id == &dest_ps.conf.id { + println!("🔁 Already attached to {origin_ps_id}, freshening..."); + let gen = attachment_service.attach_hook(tenant_id, dest_ps.conf.id)?; + let dest_conf = LocationConfig { + mode: LocationConfigMode::AttachedSingle, + generation: gen.map(Generation::new), + secondary_conf: None, + tenant_conf: TenantConfig::default(), + }; + dest_ps.location_config(tenant_id, dest_conf)?; + println!("✅ Migration complete"); + return Ok(()); + } + + println!("🔁 Switching origin pageserver {origin_ps_id} to stale mode"); + + let stale_conf = LocationConfig { + mode: LocationConfigMode::AttachedStale, + generation: Some(Generation::new(*generation)), + secondary_conf: None, + tenant_conf: TenantConfig::default(), + }; + origin_ps.location_config(tenant_id, stale_conf)?; + + baseline_lsns = Some(get_lsns(tenant_id, &origin_ps)?); + } + + let gen = attachment_service.attach_hook(tenant_id, dest_ps.conf.id)?; + let dest_conf = LocationConfig { + mode: LocationConfigMode::AttachedMulti, + generation: gen.map(Generation::new), + secondary_conf: None, + tenant_conf: TenantConfig::default(), + }; + + println!("🔁 Attaching to pageserver {}", dest_ps.conf.id); + dest_ps.location_config(tenant_id, dest_conf)?; + + if let Some(baseline) = baseline_lsns { + println!("🕑 Waiting for LSN to catch up..."); + await_lsn(tenant_id, &dest_ps, baseline)?; + } + + let cplane = ComputeControlPlane::load(env.clone())?; + for (endpoint_name, endpoint) in &cplane.endpoints { + if endpoint.tenant_id == tenant_id { + println!( + "🔁 Reconfiguring endpoint {} to use pageserver {}", + endpoint_name, dest_ps.conf.id + ); + endpoint.reconfigure(Some(dest_ps.conf.id))?; + } + } + + for other_ps_conf in &env.pageservers { + if other_ps_conf.id == dest_ps.conf.id { + continue; + } + + let other_ps = PageServerNode::from_env(env, other_ps_conf); + let other_ps_tenants = other_ps.tenant_list()?; + + // Check if this tenant is attached + let found = other_ps_tenants + .into_iter() + .map(|t| t.id) + .any(|i| i == tenant_id); + if !found { + continue; + } + + // Downgrade to a secondary location + let secondary_conf = LocationConfig { + mode: LocationConfigMode::Secondary, + generation: None, + secondary_conf: Some(LocationConfigSecondary { warm: true }), + tenant_conf: TenantConfig::default(), + }; + + println!( + "💤 Switching to secondary mode on pageserver {}", + other_ps.conf.id + ); + other_ps.location_config(tenant_id, secondary_conf)?; + } + + println!( + "🔁 Switching to AttachedSingle mode on pageserver {}", + dest_ps.conf.id + ); + let dest_conf = LocationConfig { + mode: LocationConfigMode::AttachedSingle, + generation: gen.map(Generation::new), + secondary_conf: None, + tenant_conf: TenantConfig::default(), + }; + dest_ps.location_config(tenant_id, dest_conf)?; + + println!("✅ Migration complete"); + + Ok(()) +}