diff --git a/control_plane/src/attachment_service.rs b/control_plane/src/attachment_service.rs index fcefe0e431..822ac7d8a6 100644 --- a/control_plane/src/attachment_service.rs +++ b/control_plane/src/attachment_service.rs @@ -9,6 +9,7 @@ pub struct AttachmentService { env: LocalEnv, listen: String, path: PathBuf, + client: reqwest::blocking::Client, } const COMMAND: &str = "attachment_service"; @@ -24,6 +25,16 @@ pub struct AttachHookResponse { pub gen: Option, } +#[derive(Serialize, Deserialize)] +pub struct InspectRequest { + pub tenant_id: TenantId, +} + +#[derive(Serialize, Deserialize)] +pub struct InspectResponse { + pub attachment: Option<(u32, NodeId)>, +} + impl AttachmentService { pub fn from_env(env: &LocalEnv) -> Self { let path = env.base_data_dir.join("attachments.json"); @@ -42,6 +53,9 @@ impl AttachmentService { env: env.clone(), path, listen, + client: reqwest::blocking::ClientBuilder::new() + .build() + .expect("Failed to construct http client"), } } @@ -84,16 +98,13 @@ impl AttachmentService { .unwrap() .join("attach-hook") .unwrap(); - let client = reqwest::blocking::ClientBuilder::new() - .build() - .expect("Failed to construct http client"); let request = AttachHookRequest { tenant_id, node_id: Some(pageserver_id), }; - let response = client.post(url).json(&request).send()?; + let response = self.client.post(url).json(&request).send()?; if response.status() != StatusCode::OK { return Err(anyhow!("Unexpected status {}", response.status())); } @@ -101,4 +112,26 @@ impl AttachmentService { let response = response.json::()?; Ok(response.gen) } + + pub fn inspect(&self, tenant_id: TenantId) -> anyhow::Result> { + use hyper::StatusCode; + + let url = self + .env + .control_plane_api + .clone() + .unwrap() + .join("inspect") + .unwrap(); + + let request = InspectRequest { tenant_id }; + + let response = self.client.post(url).json(&request).send()?; + if response.status() != StatusCode::OK { + return Err(anyhow!("Unexpected status {}", response.status())); + } + + let response = response.json::()?; + Ok(response.attachment) + } } diff --git a/control_plane/src/bin/attachment_service.rs b/control_plane/src/bin/attachment_service.rs index 3205703c7d..ddd50ff51d 100644 --- a/control_plane/src/bin/attachment_service.rs +++ b/control_plane/src/bin/attachment_service.rs @@ -32,7 +32,9 @@ use pageserver_api::control_api::{ ValidateResponseTenant, }; -use control_plane::attachment_service::{AttachHookRequest, AttachHookResponse}; +use control_plane::attachment_service::{ + AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse, +}; #[derive(Parser)] #[command(author, version, about, long_about = None)] @@ -255,12 +257,28 @@ async fn handle_attach_hook(mut req: Request) -> Result, Ap ) } +async fn handle_inspect(mut req: Request) -> Result, ApiError> { + let inspect_req = json_request::(&mut req).await?; + + let state = get_state(&req).inner.clone(); + let locked = state.write().await; + let tenant_state = locked.tenants.get(&inspect_req.tenant_id); + + json_response( + StatusCode::OK, + InspectResponse { + attachment: tenant_state.and_then(|s| s.pageserver.map(|ps| (s.generation, ps))), + }, + ) +} + fn make_router(persistent_state: PersistentState) -> RouterBuilder { endpoint::make_router() .data(Arc::new(State::new(persistent_state))) .post("/re-attach", |r| request_span(r, handle_re_attach)) .post("/validate", |r| request_span(r, handle_validate)) .post("/attach-hook", |r| request_span(r, handle_attach_hook)) + .post("/inspect", |r| request_span(r, handle_inspect)) } #[tokio::main] diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index bc68eeaa8a..87ea519a9e 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -11,13 +11,14 @@ use compute_api::spec::ComputeMode; use control_plane::attachment_service::AttachmentService; use control_plane::endpoint::ComputeControlPlane; use control_plane::local_env::LocalEnv; -use control_plane::pageserver::PageServerNode; +use control_plane::pageserver::{PageServerNode, PAGESERVER_REMOTE_STORAGE_DIR}; use control_plane::safekeeper::SafekeeperNode; +use control_plane::tenant_migration::migrate_tenant; use control_plane::{broker, local_env}; use pageserver_api::models::TimelineInfo; use pageserver_api::{ - DEFAULT_HTTP_LISTEN_ADDR as DEFAULT_PAGESERVER_HTTP_ADDR, - DEFAULT_PG_LISTEN_ADDR as DEFAULT_PAGESERVER_PG_ADDR, + DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT, + DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT, }; use postgres_backend::AuthType; use safekeeper_api::{ @@ -46,8 +47,8 @@ const DEFAULT_PG_VERSION: &str = "15"; const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/"; -fn default_conf() -> String { - format!( +fn default_conf(num_pageservers: u16) -> String { + let mut template = format!( r#" # Default built-in configuration, defined in main.rs control_plane_api = '{DEFAULT_PAGESERVER_CONTROL_PLANE_API}' @@ -55,21 +56,33 @@ control_plane_api = '{DEFAULT_PAGESERVER_CONTROL_PLANE_API}' [broker] listen_addr = '{DEFAULT_BROKER_ADDR}' -[[pageservers]] -id = {DEFAULT_PAGESERVER_ID} -listen_pg_addr = '{DEFAULT_PAGESERVER_PG_ADDR}' -listen_http_addr = '{DEFAULT_PAGESERVER_HTTP_ADDR}' -pg_auth_type = '{trust_auth}' -http_auth_type = '{trust_auth}' - [[safekeepers]] id = {DEFAULT_SAFEKEEPER_ID} pg_port = {DEFAULT_SAFEKEEPER_PG_PORT} http_port = {DEFAULT_SAFEKEEPER_HTTP_PORT} "#, - trust_auth = AuthType::Trust, - ) + ); + + for i in 0..num_pageservers { + let pageserver_id = NodeId(DEFAULT_PAGESERVER_ID.0 + i as u64); + let pg_port = DEFAULT_PAGESERVER_PG_PORT + i; + let http_port = DEFAULT_PAGESERVER_HTTP_PORT + i; + + template += &format!( + r#" +[[pageservers]] +id = {pageserver_id} +listen_pg_addr = '127.0.0.1:{pg_port}' +listen_http_addr = '127.0.0.1:{http_port}' +pg_auth_type = '{trust_auth}' +http_auth_type = '{trust_auth}' +"#, + trust_auth = AuthType::Trust, + ) + } + + template } /// @@ -295,6 +308,9 @@ fn parse_timeline_id(sub_match: &ArgMatches) -> anyhow::Result anyhow::Result { + let num_pageservers = init_match + .get_one::("num-pageservers") + .expect("num-pageservers arg has a default"); // Create config file let toml_file: String = if let Some(config_path) = init_match.get_one::("config") { // load and parse the file @@ -306,7 +322,7 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result { })? } else { // Built-in default config - default_conf() + default_conf(*num_pageservers) }; let pg_version = init_match @@ -320,6 +336,9 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result { env.init(pg_version, force) .context("Failed to initialize neon repository")?; + // Create remote storage location for default LocalFs remote storage + std::fs::create_dir_all(env.base_data_dir.join(PAGESERVER_REMOTE_STORAGE_DIR))?; + // Initialize pageserver, create initial tenant and timeline. for ps_conf in &env.pageservers { PageServerNode::from_env(&env, ps_conf) @@ -433,6 +452,15 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an .with_context(|| format!("Tenant config failed for tenant with id {tenant_id}"))?; println!("tenant {tenant_id} successfully configured on the pageserver"); } + Some(("migrate", matches)) => { + let tenant_id = get_tenant_id(matches, env)?; + let new_pageserver = get_pageserver(env, matches)?; + let new_pageserver_id = new_pageserver.conf.id; + + migrate_tenant(env, tenant_id, new_pageserver)?; + println!("tenant {tenant_id} migrated to {}", new_pageserver_id); + } + Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name), None => bail!("no tenant subcommand provided"), } @@ -867,20 +895,20 @@ fn handle_mappings(sub_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Res } } +fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result { + let node_id = if let Some(id_str) = args.get_one::("pageserver-id") { + NodeId(id_str.parse().context("while parsing pageserver id")?) + } else { + DEFAULT_PAGESERVER_ID + }; + + Ok(PageServerNode::from_env( + env, + env.get_pageserver_conf(node_id)?, + )) +} + fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { - fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result { - let node_id = if let Some(id_str) = args.get_one::("pageserver-id") { - NodeId(id_str.parse().context("while parsing pageserver id")?) - } else { - DEFAULT_PAGESERVER_ID - }; - - Ok(PageServerNode::from_env( - env, - env.get_pageserver_conf(node_id)?, - )) - } - match sub_match.subcommand() { Some(("start", subcommand_args)) => { if let Err(e) = get_pageserver(env, subcommand_args)? @@ -917,6 +945,20 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul } } + Some(("migrate", subcommand_args)) => { + let pageserver = get_pageserver(env, subcommand_args)?; + //TODO what shutdown strategy should we use here? + if let Err(e) = pageserver.stop(false) { + eprintln!("pageserver stop failed: {}", e); + exit(1); + } + + if let Err(e) = pageserver.start(&pageserver_config_overrides(subcommand_args)) { + eprintln!("pageserver start failed: {e}"); + exit(1); + } + } + Some(("status", subcommand_args)) => { match get_pageserver(env, subcommand_args)?.check_status() { Ok(_) => println!("Page server is up and running"), @@ -1224,6 +1266,13 @@ fn cli() -> Command { .help("Force initialization even if the repository is not empty") .required(false); + let num_pageservers_arg = Arg::new("num-pageservers") + .value_parser(value_parser!(u16)) + .long("num-pageservers") + .help("How many pageservers to create (default 1)") + .required(false) + .default_value("1"); + Command::new("Neon CLI") .arg_required_else_help(true) .version(GIT_VERSION) @@ -1231,6 +1280,7 @@ fn cli() -> Command { Command::new("init") .about("Initialize a new Neon repository, preparing configs for services to start with") .arg(pageserver_config_args.clone()) + .arg(num_pageservers_arg.clone()) .arg( Arg::new("config") .long("config") @@ -1301,6 +1351,10 @@ fn cli() -> Command { .subcommand(Command::new("config") .arg(tenant_id_arg.clone()) .arg(Arg::new("config").short('c').num_args(1).action(ArgAction::Append).required(false))) + .subcommand(Command::new("migrate") + .about("Migrate a tenant from one pageserver to another") + .arg(tenant_id_arg.clone()) + .arg(pageserver_id_arg.clone())) ) .subcommand( Command::new("pageserver") diff --git a/control_plane/src/lib.rs b/control_plane/src/lib.rs index bb79d36bfc..52a0e20429 100644 --- a/control_plane/src/lib.rs +++ b/control_plane/src/lib.rs @@ -14,3 +14,4 @@ pub mod local_env; pub mod pageserver; pub mod postgresql_conf; pub mod safekeeper; +pub mod tenant_migration; diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 0746dde4ef..e13a234e89 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -15,7 +15,9 @@ use std::{io, result}; use anyhow::{bail, Context}; use camino::Utf8PathBuf; -use pageserver_api::models::{self, TenantInfo, TimelineInfo}; +use pageserver_api::models::{ + self, LocationConfig, TenantInfo, TenantLocationConfigRequest, TimelineInfo, +}; use postgres_backend::AuthType; use postgres_connection::{parse_host_port, PgConnectionConfig}; use reqwest::blocking::{Client, RequestBuilder, Response}; @@ -31,6 +33,9 @@ use utils::{ use crate::local_env::PageServerConf; use crate::{background_process, local_env::LocalEnv}; +/// Directory within .neon which will be used by default for LocalFs remote storage. +pub const PAGESERVER_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/pageserver"; + #[derive(Error, Debug)] pub enum PageserverHttpError { #[error("Reqwest error: {0}")] @@ -98,8 +103,10 @@ impl PageServerNode { } } - // pageserver conf overrides defined by neon_local configuration. - fn neon_local_overrides(&self) -> Vec { + /// Merge overrides provided by the user on the command line with our default overides derived from neon_local configuration. + /// + /// These all end up on the command line of the `pageserver` binary. + fn neon_local_overrides(&self, cli_overrides: &[&str]) -> Vec { let id = format!("id={}", self.conf.id); // FIXME: the paths should be shell-escaped to handle paths with spaces, quotas etc. let pg_distrib_dir_param = format!( @@ -132,12 +139,25 @@ impl PageServerNode { )); } + if !cli_overrides + .iter() + .any(|c| c.starts_with("remote_storage")) + { + overrides.push(format!( + "remote_storage={{local_path='../{PAGESERVER_REMOTE_STORAGE_DIR}'}}" + )); + } + if self.conf.http_auth_type != AuthType::Trust || self.conf.pg_auth_type != AuthType::Trust { // Keys are generated in the toplevel repo dir, pageservers' workdirs // are one level below that, so refer to keys with ../ overrides.push("auth_validation_public_key_path='../auth_public_key.pem'".to_owned()); } + + // Apply the user-provided overrides + overrides.extend(cli_overrides.iter().map(|&c| c.to_owned())); + overrides } @@ -203,9 +223,6 @@ impl PageServerNode { } fn start_node(&self, config_overrides: &[&str], update_config: bool) -> anyhow::Result { - let mut overrides = self.neon_local_overrides(); - overrides.extend(config_overrides.iter().map(|&c| c.to_owned())); - let datadir = self.repo_path(); print!( "Starting pageserver node {} at '{}' in {:?}", @@ -248,8 +265,7 @@ impl PageServerNode { ) -> Vec> { let mut args = vec![Cow::Borrowed("-D"), Cow::Borrowed(datadir_path_str)]; - let mut overrides = self.neon_local_overrides(); - overrides.extend(config_overrides.iter().map(|&c| c.to_owned())); + let overrides = self.neon_local_overrides(config_overrides); for config_override in overrides { args.push(Cow::Borrowed("-c")); args.push(Cow::Owned(config_override)); @@ -501,6 +517,27 @@ impl PageServerNode { Ok(()) } + pub fn location_config( + &self, + tenant_id: TenantId, + config: LocationConfig, + ) -> anyhow::Result<()> { + let req_body = TenantLocationConfigRequest { tenant_id, config }; + + self.http_request( + Method::PUT, + format!( + "{}/tenant/{}/location_config", + self.http_base_url, tenant_id + ), + )? + .json(&req_body) + .send()? + .error_from_body()?; + + Ok(()) + } + pub fn timeline_list(&self, tenant_id: &TenantId) -> anyhow::Result> { let timeline_infos: Vec = self .http_request( diff --git a/control_plane/src/tenant_migration.rs b/control_plane/src/tenant_migration.rs new file mode 100644 index 0000000000..d28d1f9fe8 --- /dev/null +++ b/control_plane/src/tenant_migration.rs @@ -0,0 +1,202 @@ +//! +//! Functionality for migrating tenants across pageservers: unlike most of neon_local, this code +//! isn't scoped to a particular physical service, as it needs to update compute endpoints to +//! point to the new pageserver. +//! +use crate::local_env::LocalEnv; +use crate::{ + attachment_service::AttachmentService, endpoint::ComputeControlPlane, + pageserver::PageServerNode, +}; +use pageserver_api::models::{ + LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig, +}; +use std::collections::HashMap; +use std::time::Duration; +use utils::{ + generation::Generation, + id::{TenantId, TimelineId}, + lsn::Lsn, +}; + +/// Given an attached pageserver, retrieve the LSN for all timelines +fn get_lsns( + tenant_id: TenantId, + pageserver: &PageServerNode, +) -> anyhow::Result> { + let timelines = pageserver.timeline_list(&tenant_id)?; + Ok(timelines + .into_iter() + .map(|t| (t.timeline_id, t.last_record_lsn)) + .collect()) +} + +/// Wait for the timeline LSNs on `pageserver` to catch up with or overtake +/// `baseline`. +fn await_lsn( + tenant_id: TenantId, + pageserver: &PageServerNode, + baseline: HashMap, +) -> anyhow::Result<()> { + loop { + let latest = match get_lsns(tenant_id, pageserver) { + Ok(l) => l, + Err(e) => { + println!( + "🕑 Can't get LSNs on pageserver {} yet, waiting ({e})", + pageserver.conf.id + ); + std::thread::sleep(Duration::from_millis(500)); + continue; + } + }; + + let mut any_behind: bool = false; + for (timeline_id, baseline_lsn) in &baseline { + match latest.get(timeline_id) { + Some(latest_lsn) => { + println!("🕑 LSN origin {baseline_lsn} vs destination {latest_lsn}"); + if latest_lsn < baseline_lsn { + any_behind = true; + } + } + None => { + // Expected timeline isn't yet visible on migration destination. + // (IRL we would have to account for timeline deletion, but this + // is just test helper) + any_behind = true; + } + } + } + + if !any_behind { + println!("✅ LSN caught up. Proceeding..."); + break; + } else { + std::thread::sleep(Duration::from_millis(500)); + } + } + + Ok(()) +} + +/// This function spans multiple services, to demonstrate live migration of a tenant +/// between pageservers: +/// - Coordinate attach/secondary/detach on pageservers +/// - call into attachment_service for generations +/// - reconfigure compute endpoints to point to new attached pageserver +pub fn migrate_tenant( + env: &LocalEnv, + tenant_id: TenantId, + dest_ps: PageServerNode, +) -> anyhow::Result<()> { + // Get a new generation + let attachment_service = AttachmentService::from_env(env); + + let previous = attachment_service.inspect(tenant_id)?; + let mut baseline_lsns = None; + if let Some((generation, origin_ps_id)) = &previous { + let origin_ps = PageServerNode::from_env(env, env.get_pageserver_conf(*origin_ps_id)?); + + if origin_ps_id == &dest_ps.conf.id { + println!("🔁 Already attached to {origin_ps_id}, freshening..."); + let gen = attachment_service.attach_hook(tenant_id, dest_ps.conf.id)?; + let dest_conf = LocationConfig { + mode: LocationConfigMode::AttachedSingle, + generation: gen.map(Generation::new), + secondary_conf: None, + tenant_conf: TenantConfig::default(), + }; + dest_ps.location_config(tenant_id, dest_conf)?; + println!("✅ Migration complete"); + return Ok(()); + } + + println!("🔁 Switching origin pageserver {origin_ps_id} to stale mode"); + + let stale_conf = LocationConfig { + mode: LocationConfigMode::AttachedStale, + generation: Some(Generation::new(*generation)), + secondary_conf: None, + tenant_conf: TenantConfig::default(), + }; + origin_ps.location_config(tenant_id, stale_conf)?; + + baseline_lsns = Some(get_lsns(tenant_id, &origin_ps)?); + } + + let gen = attachment_service.attach_hook(tenant_id, dest_ps.conf.id)?; + let dest_conf = LocationConfig { + mode: LocationConfigMode::AttachedMulti, + generation: gen.map(Generation::new), + secondary_conf: None, + tenant_conf: TenantConfig::default(), + }; + + println!("🔁 Attaching to pageserver {}", dest_ps.conf.id); + dest_ps.location_config(tenant_id, dest_conf)?; + + if let Some(baseline) = baseline_lsns { + println!("🕑 Waiting for LSN to catch up..."); + await_lsn(tenant_id, &dest_ps, baseline)?; + } + + let cplane = ComputeControlPlane::load(env.clone())?; + for (endpoint_name, endpoint) in &cplane.endpoints { + if endpoint.tenant_id == tenant_id { + println!( + "🔁 Reconfiguring endpoint {} to use pageserver {}", + endpoint_name, dest_ps.conf.id + ); + endpoint.reconfigure(Some(dest_ps.conf.id))?; + } + } + + for other_ps_conf in &env.pageservers { + if other_ps_conf.id == dest_ps.conf.id { + continue; + } + + let other_ps = PageServerNode::from_env(env, other_ps_conf); + let other_ps_tenants = other_ps.tenant_list()?; + + // Check if this tenant is attached + let found = other_ps_tenants + .into_iter() + .map(|t| t.id) + .any(|i| i == tenant_id); + if !found { + continue; + } + + // Downgrade to a secondary location + let secondary_conf = LocationConfig { + mode: LocationConfigMode::Secondary, + generation: None, + secondary_conf: Some(LocationConfigSecondary { warm: true }), + tenant_conf: TenantConfig::default(), + }; + + println!( + "💤 Switching to secondary mode on pageserver {}", + other_ps.conf.id + ); + other_ps.location_config(tenant_id, secondary_conf)?; + } + + println!( + "🔁 Switching to AttachedSingle mode on pageserver {}", + dest_ps.conf.id + ); + let dest_conf = LocationConfig { + mode: LocationConfigMode::AttachedSingle, + generation: gen.map(Generation::new), + secondary_conf: None, + tenant_conf: TenantConfig::default(), + }; + dest_ps.location_config(tenant_id, dest_conf)?; + + println!("✅ Migration complete"); + + Ok(()) +} diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 4057f620a0..b45b0a12c0 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1871,6 +1871,8 @@ def append_pageserver_param_overrides( params_to_update.append( f"--pageserver-config-override=remote_storage={remote_storage_toml_table}" ) + else: + params_to_update.append('--pageserver-config-override=remote_storage=""') env_overrides = os.getenv("NEON_PAGESERVER_OVERRIDES") if env_overrides is not None: