From c7569dce472182016e7e2925c5fc8a9e93c407f0 Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Thu, 24 Feb 2022 20:35:41 +0200 Subject: [PATCH] Allow passing initial timeline id into zenith CLI commands --- control_plane/src/storage.rs | 68 +++++++++++++--- pageserver/src/bin/pageserver.rs | 30 ++++++- pageserver/src/http/models.rs | 4 + pageserver/src/http/routes.rs | 8 +- pageserver/src/tenant_mgr.rs | 12 +-- pageserver/src/timelines.rs | 32 ++++---- test_runner/fixtures/zenith_fixtures.py | 6 +- zenith/src/main.rs | 101 ++++++++++++++++-------- 8 files changed, 192 insertions(+), 69 deletions(-) diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index 9d5a88784d..e18be05cea 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -5,7 +5,7 @@ use std::process::Command; use std::time::Duration; use std::{io, result, thread}; -use anyhow::bail; +use anyhow::{bail, Context}; use nix::errno::Errno; use nix::sys::signal::{kill, Signal}; use nix::unistd::Pid; @@ -99,9 +99,10 @@ impl PageServerNode { pub fn init( &self, - create_tenant: Option<&str>, + create_tenant: Option, + initial_timeline_id: Option, config_overrides: &[&str], - ) -> anyhow::Result<()> { + ) -> anyhow::Result { let mut cmd = Command::new(self.env.pageserver_bin()?); let id = format!("id={}", self.env.pageserver.id); @@ -138,19 +139,29 @@ impl PageServerNode { ]); } - if let Some(tenantid) = create_tenant { - args.extend(["--create-tenant", tenantid]) + let create_tenant = create_tenant.map(|id| id.to_string()); + if let Some(tenant_id) = create_tenant.as_deref() { + args.extend(["--create-tenant", tenant_id]) } - let status = fill_rust_env_vars(cmd.args(args)) - .status() - .expect("pageserver init failed"); + let initial_timeline_id_str = initial_timeline_id.map(|id| id.to_string()); + if let Some(timeline_id) = initial_timeline_id_str.as_deref() { + args.extend(["--initial-timeline-id", timeline_id]) + } - if !status.success() { + let init_output = fill_rust_env_vars(cmd.args(args)) + .output() + .context("pageserver init failed")?; + + if !init_output.status.success() { bail!("pageserver init failed"); } - Ok(()) + if let Some(initial_timeline_id) = initial_timeline_id { + Ok(initial_timeline_id) + } else { + extract_initial_timeline_id(init_output.stdout) + } } pub fn repo_path(&self) -> PathBuf { @@ -325,11 +336,16 @@ impl PageServerNode { .json()?) } - pub fn tenant_create(&self, tenantid: ZTenantId) -> Result { + pub fn tenant_create( + &self, + tenant_id: ZTenantId, + initial_timeline_id: Option, + ) -> Result { Ok(self .http_request(Method::POST, format!("{}/{}", self.http_base_url, "tenant")) .json(&TenantCreateRequest { - tenant_id: tenantid, + tenant_id, + initial_timeline_id, }) .send()? .error_from_body()? @@ -367,3 +383,31 @@ impl PageServerNode { .json()?) } } + +fn extract_initial_timeline_id(init_stdout: Vec) -> anyhow::Result { + let output_string = + String::from_utf8(init_stdout).context("Init stdout is not a valid unicode")?; + + let string_with_timeline_id = match output_string.split_once("created initial timeline ") { + Some((_, string_with_timeline_id)) => string_with_timeline_id, + None => bail!( + "Found no line with timeline id in the init output: '{}'", + output_string + ), + }; + + let timeline_id_str = match string_with_timeline_id.split_once(' ') { + Some((timeline_id_str, _)) => timeline_id_str, + None => bail!( + "Found no timeline id in the init output: '{}'", + output_string + ), + }; + + timeline_id_str.parse().with_context(|| { + format!( + "Failed to parse timeline id from string, extracted from the init output: '{}'", + timeline_id_str + ) + }) +} diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 2fa772af58..83b128dd74 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -2,7 +2,14 @@ use std::{env, path::Path, str::FromStr}; use tracing::*; -use zenith_utils::{auth::JwtAuth, logging, postgres_backend::AuthType, tcp_listener, GIT_VERSION}; +use zenith_utils::{ + auth::JwtAuth, + logging, + postgres_backend::AuthType, + tcp_listener, + zid::{ZTenantId, ZTimelineId}, + GIT_VERSION, +}; use anyhow::{bail, Context, Result}; @@ -52,6 +59,13 @@ fn main() -> Result<()> { .help("Create tenant during init") .requires("init"), ) + .arg( + Arg::new("initial-timeline-id") + .long("initial-timeline-id") + .takes_value(true) + .help("Use a specific timeline id during init and tenant creation") + .requires("create-tenant"), + ) // See `settings.md` for more details on the extra configuration patameters pageserver can process .arg( Arg::new("config-override") @@ -71,7 +85,16 @@ fn main() -> Result<()> { let cfg_file_path = workdir.join("pageserver.toml"); let init = arg_matches.is_present("init"); - let create_tenant = arg_matches.value_of("create-tenant"); + let create_tenant = arg_matches + .value_of("create-tenant") + .map(ZTenantId::from_str) + .transpose() + .context("Failed to parse tenant id from the arguments")?; + let initial_timeline_id = arg_matches + .value_of("initial-timeline-id") + .map(ZTimelineId::from_str) + .transpose() + .context("Failed to parse timeline id from the arguments")?; // Set CWD to workdir for non-daemon modes env::set_current_dir(&workdir).with_context(|| { @@ -142,7 +165,8 @@ fn main() -> Result<()> { // Create repo and exit if init was requested if init { - timelines::init_pageserver(conf, create_tenant).context("Failed to init pageserver")?; + timelines::init_pageserver(conf, create_tenant, initial_timeline_id) + .context("Failed to init pageserver")?; // write the config file std::fs::write(&cfg_file_path, toml.to_string()).with_context(|| { format!( diff --git a/pageserver/src/http/models.rs b/pageserver/src/http/models.rs index 7f95c64527..04ccb9708e 100644 --- a/pageserver/src/http/models.rs +++ b/pageserver/src/http/models.rs @@ -11,6 +11,7 @@ pub struct TimelineCreateRequest { pub tenant_id: ZTenantId, #[serde(with = "hex")] pub timeline_id: ZTimelineId, + #[serde(default)] #[serde(with = "opt_display_serde")] pub ancestor_timeline_id: Option, pub start_lsn: Option, @@ -20,6 +21,9 @@ pub struct TimelineCreateRequest { pub struct TenantCreateRequest { #[serde(with = "hex")] pub tenant_id: ZTenantId, + #[serde(default)] + #[serde(with = "opt_display_serde")] + pub initial_timeline_id: Option, } #[derive(Serialize)] diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index f332e59135..45b0c3d4be 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -214,8 +214,12 @@ async fn tenant_create_handler(mut request: Request) -> Result, ) -> Result { - let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid)); - let (initial_timeline_id, repo) = timelines::create_repo(conf, tenantid, wal_redo_manager)?; + let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id)); + let (initial_timeline_id, repo) = + timelines::create_repo(conf, tenant_id, initial_timeline_id, wal_redo_manager)?; - match access_tenants().entry(tenantid) { - hash_map::Entry::Occupied(_) => bail!("tenant {} already exists", tenantid), + match access_tenants().entry(tenant_id) { + hash_map::Entry::Occupied(_) => bail!("tenant {} already exists", tenant_id), hash_map::Entry::Vacant(v) => { v.insert(Tenant { state: TenantState::Idle, diff --git a/pageserver/src/timelines.rs b/pageserver/src/timelines.rs index 8b4dc57342..b97ab045c7 100644 --- a/pageserver/src/timelines.rs +++ b/pageserver/src/timelines.rs @@ -9,7 +9,6 @@ use std::{ fs, path::Path, process::{Command, Stdio}, - str::FromStr, sync::Arc, }; use tracing::*; @@ -150,7 +149,11 @@ pub struct PointInTime { pub lsn: Lsn, } -pub fn init_pageserver(conf: &'static PageServerConf, create_tenant: Option<&str>) -> Result<()> { +pub fn init_pageserver( + conf: &'static PageServerConf, + create_tenant: Option, + initial_timeline_id: Option, +) -> Result<()> { // Initialize logger // use true as daemonize parameter because otherwise we pollute zenith cli output with a few pages long output of info messages let _log_file = logging::init(LOG_FILE_NAME, true)?; @@ -167,10 +170,10 @@ pub fn init_pageserver(conf: &'static PageServerConf, create_tenant: Option<&str // anymore, but I think that could still happen. let dummy_redo_mgr = Arc::new(crate::walredo::DummyRedoManager {}); - if let Some(tenantid) = create_tenant { - let tenantid = ZTenantId::from_str(tenantid)?; - println!("initializing tenantid {}", tenantid); - create_repo(conf, tenantid, dummy_redo_mgr).context("failed to create repo")?; + if let Some(tenant_id) = create_tenant { + println!("initializing tenantid {}", tenant_id); + create_repo(conf, tenant_id, initial_timeline_id, dummy_redo_mgr) + .context("failed to create repo")?; } crashsafe_dir::create_dir_all(conf.tenants_path())?; @@ -180,39 +183,40 @@ pub fn init_pageserver(conf: &'static PageServerConf, create_tenant: Option<&str pub fn create_repo( conf: &'static PageServerConf, - tenantid: ZTenantId, + tenant_id: ZTenantId, + init_timeline_id: Option, wal_redo_manager: Arc, ) -> Result<(ZTimelineId, Arc)> { - let repo_dir = conf.tenant_path(&tenantid); + let repo_dir = conf.tenant_path(&tenant_id); if repo_dir.exists() { - bail!("repo for {} already exists", tenantid) + bail!("repo for {} already exists", tenant_id) } // top-level dir may exist if we are creating it through CLI crashsafe_dir::create_dir_all(&repo_dir) .with_context(|| format!("could not create directory {}", repo_dir.display()))?; - crashsafe_dir::create_dir(conf.timelines_path(&tenantid))?; + crashsafe_dir::create_dir(conf.timelines_path(&tenant_id))?; info!("created directory structure in {}", repo_dir.display()); // create a new timeline directory - let timeline_id = ZTimelineId::generate(); - let timelinedir = conf.timeline_path(&timeline_id, &tenantid); + let timeline_id = init_timeline_id.unwrap_or_else(|| ZTimelineId::generate()); + let timelinedir = conf.timeline_path(&timeline_id, &tenant_id); crashsafe_dir::create_dir(&timelinedir)?; let repo = Arc::new(crate::layered_repository::LayeredRepository::new( conf, wal_redo_manager, - tenantid, + tenant_id, conf.remote_storage_config.is_some(), )); // Load data into pageserver // TODO To implement zenith import we need to // move data loading out of create_repo() - bootstrap_timeline(conf, tenantid, timeline_id, repo.as_ref())?; + bootstrap_timeline(conf, tenant_id, timeline_id, repo.as_ref())?; Ok((timeline_id, repo)) } diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index 9345c7f238..c283bea48e 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -869,12 +869,16 @@ class ZenithCli: res.stdout.strip().split("\n"))) return branches_cli - def init(self, config_toml: str) -> 'subprocess.CompletedProcess[str]': + def init(self, + config_toml: str, + initial_timeline_id: Optional[uuid.UUID] = None) -> 'subprocess.CompletedProcess[str]': with tempfile.NamedTemporaryFile(mode='w+') as tmp: tmp.write(config_toml) tmp.flush() cmd = ['init', f'--config={tmp.name}'] + if initial_timeline_id: + cmd.extend(['--timeline-id', initial_timeline_id.hex]) append_pageserver_param_overrides(cmd, self.env.pageserver.remote_storage, self.env.pageserver.config_override) diff --git a/zenith/src/main.rs b/zenith/src/main.rs index fb0b230c2c..34cab4b381 100644 --- a/zenith/src/main.rs +++ b/zenith/src/main.rs @@ -87,6 +87,12 @@ fn main() -> Result<()> { .takes_value(true) .required(false); + let timeline_id_arg = Arg::new("timeline-id") + .long("timeline-id") + .help("Timeline id. Represented as a hexadecimal string 32 symbols length") + .takes_value(true) + .required(false); + let port_arg = Arg::new("port") .long("port") .required(false) @@ -121,6 +127,7 @@ fn main() -> Result<()> { App::new("init") .about("Initialize a new Zenith repository") .arg(pageserver_config_args.clone()) + .arg(timeline_id_arg.clone().help("Use a specific timeline id when creating a tenant and its initial timeline")) .arg( Arg::new("config") .long("config") @@ -151,7 +158,10 @@ fn main() -> Result<()> { .setting(AppSettings::ArgRequiredElseHelp) .about("Manage tenants") .subcommand(App::new("list")) - .subcommand(App::new("create").arg(tenant_id_arg.clone())) + .subcommand(App::new("create") + .arg(tenant_id_arg.clone()) + .arg(timeline_id_arg.clone().help("Use a specific timeline id when creating a tenant and its initial timeline")) + ) ) .subcommand( App::new("pageserver") @@ -236,8 +246,8 @@ fn main() -> Result<()> { }; // Check for 'zenith init' command first. - let subcmd_result = if sub_name == "init" { - handle_init(sub_args) + let subcommand_result = if sub_name == "init" { + handle_init(sub_args).map(Some) } else { // all other commands need an existing config let mut env = LocalEnv::load_config().context("Error loading config")?; @@ -254,18 +264,21 @@ fn main() -> Result<()> { _ => bail!("unexpected subcommand {}", sub_name), }; - if subcommand_result.is_ok() && original_env != env { - eprintln!("Subcommand had changed the config, updating"); - env.persist_config(&env.base_data_dir)?; + if original_env != env { + subcommand_result.map(|()| Some(env)) + } else { + subcommand_result.map(|()| None) } - - subcommand_result }; - if let Err(e) = subcmd_result { - eprintln!("command failed: {:#}", e); - exit(1); - } + match subcommand_result { + Ok(Some(updated_env)) => updated_env.persist_config(&updated_env.base_data_dir)?, + Ok(None) => (), + Err(e) => { + eprintln!("command failed: {:?}", e); + exit(1); + } + } Ok(()) } @@ -411,11 +424,8 @@ fn get_timeline_infos( // Helper function to parse --tenant_id option, or get the default from config file fn get_tenant_id(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result { - if let Some(tenantid_cmd) = sub_match.value_of("tenant-id") { - Ok( - ZTenantId::from_str(tenantid_cmd) - .context("Failed to parse tenant id from arguments")?, - ) + if let Some(tenant_id_from_arguments) = parse_tenant_id(sub_match).transpose() { + tenant_id_from_arguments } else if let Some(tenantid_conf) = env.default_tenant_id { Ok(ZTenantId::from(tenantid_conf)) } else { @@ -423,7 +433,25 @@ fn get_tenant_id(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::R } } -fn handle_init(init_match: &ArgMatches) -> Result<()> { +fn parse_tenant_id(sub_match: &ArgMatches) -> anyhow::Result> { + sub_match + .value_of("tenant-id") + .map(ZTenantId::from_str) + .transpose() + .context("Failed to parse tenant id from the argument string") +} + +fn parse_timeline_id(sub_match: &ArgMatches) -> anyhow::Result> { + sub_match + .value_of("timeline-id") + .map(ZTimelineId::from_str) + .transpose() + .context("Failed to parse timeline id from the argument string") +} + +fn handle_init(init_match: &ArgMatches) -> Result { + let initial_timeline_id_arg = parse_timeline_id(init_match)?; + // Create config file let toml_file: String = if let Some(config_path) = init_match.value_of("config") { // load and parse the file @@ -439,18 +467,28 @@ fn handle_init(init_match: &ArgMatches) -> Result<()> { env.init() .context("Failed to initialize zenith repository")?; + // default_tenantid was generated by the `env.init()` call above + let initial_tenant_id = env.default_tenant_id.unwrap(); + // Call 'pageserver init'. let pageserver = PageServerNode::from_env(&env); - if let Err(e) = pageserver.init( - // default_tenantid was generated by the `env.init()` call above - Some(&ZTenantId::from(env.default_tenant_id.unwrap()).to_string()), - &pageserver_config_overrides(init_match), - ) { - eprintln!("pageserver init failed: {}", e); - exit(1); - } + let initial_timeline_id = pageserver + .init( + Some(initial_tenant_id), + initial_timeline_id_arg, + &pageserver_config_overrides(init_match), + ) + .unwrap_or_else(|e| { + eprintln!("pageserver init failed: {}", e); + exit(1); + }); - Ok(()) + env.branch_name_mappings.insert( + DEFAULT_BRANCH_NAME.to_owned(), + ZTenantTimelineId::new(initial_tenant_id, initial_timeline_id), + ); + + Ok(env) } fn pageserver_config_overrides(init_match: &ArgMatches) -> Vec<&str> { @@ -470,12 +508,11 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Re } } Some(("create", create_match)) => { - let tenant_id = match create_match.value_of("tenant-id") { - Some(id) => ZTenantId::from_str(id)?, - None => ZTenantId::generate(), - }; + let tenant_id = parse_tenant_id(create_match)?.unwrap_or_else(|| ZTenantId::generate()); println!("using tenant id {}", tenant_id); - let initial_timeline_id = pageserver.tenant_create(tenant_id)?; + let initial_timeline_id_argument = parse_timeline_id(create_match)?; + let initial_timeline_id = + pageserver.tenant_create(tenant_id, initial_timeline_id_argument)?; env.branch_name_mappings.insert( DEFAULT_BRANCH_NAME.to_owned(), ZTenantTimelineId::new(tenant_id, initial_timeline_id),