From 8f27c577483f47b248694d20409ae976eadc16a0 Mon Sep 17 00:00:00 2001 From: John Spray Date: Sun, 5 Nov 2023 14:35:23 +0000 Subject: [PATCH] neon_local: create tenants with multiple shards --- control_plane/src/bin/neon_local.rs | 95 +++++++++++++++------------ control_plane/src/pageserver.rs | 21 +++--- control_plane/src/tenant_migration.rs | 15 +++++ 3 files changed, 79 insertions(+), 52 deletions(-) diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index fd6b743686..caed316ede 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -15,9 +15,9 @@ use control_plane::pageserver::{PageServerNode, PAGESERVER_REMOTE_STORAGE_DIR}; use control_plane::safekeeper::SafekeeperNode; use control_plane::tenant_migration::migrate_tenant; use control_plane::{broker, local_env}; -use pageserver_api::models::TimelineInfo; +use pageserver_api::models::{LocationConfig, LocationConfigMode, TimelineInfo}; use pageserver_api::{ - shard, DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT, + DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT, DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT, }; use postgres_backend::AuthType; @@ -30,6 +30,7 @@ use std::path::PathBuf; use std::process::exit; use std::str::FromStr; use storage_broker::DEFAULT_LISTEN_ADDR as DEFAULT_BROKER_ADDR; +use utils::generation::Generation; use utils::{ auth::{Claims, Scope}, id::{NodeId, TenantId, TenantTimelineId, TimelineId}, @@ -376,6 +377,8 @@ fn pageserver_config_overrides(init_match: &ArgMatches) -> Vec<&str> { fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> anyhow::Result<()> { match tenant_match.subcommand() { Some(("list", _)) => { + // TODO: make command aware of multiple pageservers + let pageserver = get_default_pageserver(env); for t in pageserver.tenant_list()? { println!("{} {:?}", t.id, t.state); } @@ -386,23 +389,17 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an .map(|vals| vals.flat_map(|c| c.split_once(':')).collect()) .unwrap_or_default(); - let shard_count = create_match.get_many::("shard-count").unwrap_or(1); + let shard_count: u8 = create_match + .get_one::("shard-count") + .cloned() + .unwrap_or(1); // If tenant ID was not specified, generate one let tenant_id = parse_tenant_id(create_match)?.unwrap_or_else(TenantId::generate); - // TODO: per-shard generations - let generation = if env.control_plane_api.is_some() { - // We must register the tenant with the attachment service, so - // that when the pageserver restarts, it will be re-attached. - let attachment_service = AttachmentService::from_env(env); - attachment_service.attach_hook(tenant_id, pageserver.conf.id)? - } else { - None - }; - // We will create an initial timeline for the new tenant - let new_timeline_id = parse_timeline_id(create_match)?; + let new_timeline_id = + parse_timeline_id(create_match)?.unwrap_or(TimelineId::generate()); let pg_version = create_match .get_one::("pg-version") .copied() @@ -411,52 +408,63 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an // TODO: implement ability for one pageserver to hold multiple // shards for the same tenant. Until then, we must place each // shard on a different pageserver. - assert!(env.pageservers.len() >= shard_count); + assert!(env.pageservers.len() >= shard_count as usize); - for shard in 0..shard_count { - let ps_conf = env.pageservers.get(shard as usize).unwrap(); + for shard_number in 0..shard_count { + let ps_conf = env.pageservers.get(shard_number as usize).unwrap(); let pageserver = PageServerNode::from_env(env, ps_conf); - // TODO: shard-aware tenant creation. Currently tenant creation on the + // TODO: per-shard generations + let generation = if env.control_plane_api.is_some() { + // We must register the tenant with the attachment service, so + // that when the pageserver restarts, it will be re-attached. + let attachment_service = AttachmentService::from_env(env); + attachment_service.attach_hook(tenant_id, pageserver.conf.id)? + } else { + None + }; + + // TODO: shard-aware POST /v1/tenant. Currently tenant creation on the // pageserver is a no-op, but we shouldn't skip the command entirely. - // pageserver.location_conf... + let tenant_conf = PageServerNode::build_config(tenant_conf.clone())?; - // pageserver.tenant_create( - // tenant_id, - // shard_number, - // shard_count, - // generation, - // tenant_conf, - // )?; - - todo!(); + let location_conf = LocationConfig { + shard_count, + shard_number, + shard_stripe_size: 32000, + mode: LocationConfigMode::AttachedSingle, + generation: generation.map(Generation::new), + secondary_conf: None, + tenant_conf, + }; + pageserver.location_config(tenant_id, location_conf)?; println!( "tenant {tenant_id} successfully created on pageserver {}", pageserver.conf.id ); + } - let timeline_info = pageserver.timeline_create( + for shard_number in 0..shard_count { + let ps_conf = env.pageservers.get(shard_number as usize).unwrap(); + let pageserver = PageServerNode::from_env(env, ps_conf); + pageserver.timeline_create( tenant_id, - new_timeline_id, + Some(new_timeline_id), None, None, Some(pg_version), )?; - let new_timeline_id = timeline_info.timeline_id; - let last_record_lsn = timeline_info.last_record_lsn; - - env.register_branch_mapping( - DEFAULT_BRANCH_NAME.to_string(), - tenant_id, - new_timeline_id, - )?; - - println!( - "Created an initial timeline '{new_timeline_id}' at Lsn {last_record_lsn} for tenant: {tenant_id}", - ); } + env.register_branch_mapping( + DEFAULT_BRANCH_NAME.to_string(), + tenant_id, + new_timeline_id, + )?; + + println!("Created an initial timeline '{new_timeline_id}' for tenant: {tenant_id}",); + if create_match.get_flag("set-default") { println!("Setting tenant {tenant_id} as a default one"); env.default_tenant_id = Some(tenant_id); @@ -475,6 +483,8 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an .map(|vals| vals.flat_map(|c| c.split_once(':')).collect()) .unwrap_or_default(); + // TODO: make command aware of multiple pageservers + let pageserver = get_default_pageserver(env); pageserver .tenant_config(tenant_id, tenant_conf) .with_context(|| format!("Tenant config failed for tenant with id {tenant_id}"))?; @@ -1373,6 +1383,7 @@ fn cli() -> Command { .arg(pg_version_arg.clone()) .arg(Arg::new("set-default").long("set-default").action(ArgAction::SetTrue).required(false) .help("Use this tenant in future CLI commands where tenant_id is needed, but not specified")) + .arg(Arg::new("shard-count").value_parser(value_parser!(u8)).long("shard-count").action(ArgAction::Set).help("Number of shards in the new tenant (default 1)")) ) .subcommand(Command::new("set-default").arg(tenant_id_arg.clone().required(true)) .about("Set a particular tenant as default in future CLI commands where tenant_id is needed, but not specified")) diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index e13a234e89..157977ea9a 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -338,15 +338,8 @@ impl PageServerNode { .json()?) } - pub fn tenant_create( - &self, - new_tenant_id: TenantId, - generation: Option, - settings: HashMap<&str, &str>, - ) -> anyhow::Result { - let mut settings = settings.clone(); - - let config = models::TenantConfig { + pub fn build_config(mut settings: HashMap<&str, &str>) -> anyhow::Result { + Ok(models::TenantConfig { checkpoint_distance: settings .remove("checkpoint_distance") .map(|x| x.parse::()) @@ -405,8 +398,16 @@ impl PageServerNode { .map(|x| x.parse::()) .transpose() .context("Failed to parse 'gc_feedback' as bool")?, - }; + }) + } + pub fn tenant_create( + &self, + new_tenant_id: TenantId, + generation: Option, + settings: HashMap<&str, &str>, + ) -> anyhow::Result { + let config = Self::build_config(settings.clone())?; let request = models::TenantCreateRequest { new_tenant_id, generation, diff --git a/control_plane/src/tenant_migration.rs b/control_plane/src/tenant_migration.rs index d28d1f9fe8..7dc3838b7a 100644 --- a/control_plane/src/tenant_migration.rs +++ b/control_plane/src/tenant_migration.rs @@ -102,6 +102,9 @@ pub fn migrate_tenant( println!("🔁 Already attached to {origin_ps_id}, freshening..."); let gen = attachment_service.attach_hook(tenant_id, dest_ps.conf.id)?; let dest_conf = LocationConfig { + shard_count: 0, + shard_number: 0, + shard_stripe_size: 0, mode: LocationConfigMode::AttachedSingle, generation: gen.map(Generation::new), secondary_conf: None, @@ -115,6 +118,9 @@ pub fn migrate_tenant( println!("🔁 Switching origin pageserver {origin_ps_id} to stale mode"); let stale_conf = LocationConfig { + shard_count: 0, + shard_number: 0, + shard_stripe_size: 0, mode: LocationConfigMode::AttachedStale, generation: Some(Generation::new(*generation)), secondary_conf: None, @@ -127,6 +133,9 @@ pub fn migrate_tenant( let gen = attachment_service.attach_hook(tenant_id, dest_ps.conf.id)?; let dest_conf = LocationConfig { + shard_count: 0, + shard_number: 0, + shard_stripe_size: 0, mode: LocationConfigMode::AttachedMulti, generation: gen.map(Generation::new), secondary_conf: None, @@ -171,6 +180,9 @@ pub fn migrate_tenant( // Downgrade to a secondary location let secondary_conf = LocationConfig { + shard_count: 0, + shard_number: 0, + shard_stripe_size: 0, mode: LocationConfigMode::Secondary, generation: None, secondary_conf: Some(LocationConfigSecondary { warm: true }), @@ -189,6 +201,9 @@ pub fn migrate_tenant( dest_ps.conf.id ); let dest_conf = LocationConfig { + shard_count: 0, + shard_number: 0, + shard_stripe_size: 0, mode: LocationConfigMode::AttachedSingle, generation: gen.map(Generation::new), secondary_conf: None,