mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-10 23:12:54 +00:00
Allow passing initial timeline id into zenith CLI commands
This commit is contained in:
committed by
Kirill Bulatov
parent
4d0f7fd1e4
commit
c7569dce47
@@ -5,7 +5,7 @@ use std::process::Command;
|
||||
use std::time::Duration;
|
||||
use std::{io, result, thread};
|
||||
|
||||
use anyhow::bail;
|
||||
use anyhow::{bail, Context};
|
||||
use nix::errno::Errno;
|
||||
use nix::sys::signal::{kill, Signal};
|
||||
use nix::unistd::Pid;
|
||||
@@ -99,9 +99,10 @@ impl PageServerNode {
|
||||
|
||||
pub fn init(
|
||||
&self,
|
||||
create_tenant: Option<&str>,
|
||||
create_tenant: Option<ZTenantId>,
|
||||
initial_timeline_id: Option<ZTimelineId>,
|
||||
config_overrides: &[&str],
|
||||
) -> anyhow::Result<()> {
|
||||
) -> anyhow::Result<ZTimelineId> {
|
||||
let mut cmd = Command::new(self.env.pageserver_bin()?);
|
||||
|
||||
let id = format!("id={}", self.env.pageserver.id);
|
||||
@@ -138,19 +139,29 @@ impl PageServerNode {
|
||||
]);
|
||||
}
|
||||
|
||||
if let Some(tenantid) = create_tenant {
|
||||
args.extend(["--create-tenant", tenantid])
|
||||
let create_tenant = create_tenant.map(|id| id.to_string());
|
||||
if let Some(tenant_id) = create_tenant.as_deref() {
|
||||
args.extend(["--create-tenant", tenant_id])
|
||||
}
|
||||
|
||||
let status = fill_rust_env_vars(cmd.args(args))
|
||||
.status()
|
||||
.expect("pageserver init failed");
|
||||
let initial_timeline_id_str = initial_timeline_id.map(|id| id.to_string());
|
||||
if let Some(timeline_id) = initial_timeline_id_str.as_deref() {
|
||||
args.extend(["--initial-timeline-id", timeline_id])
|
||||
}
|
||||
|
||||
if !status.success() {
|
||||
let init_output = fill_rust_env_vars(cmd.args(args))
|
||||
.output()
|
||||
.context("pageserver init failed")?;
|
||||
|
||||
if !init_output.status.success() {
|
||||
bail!("pageserver init failed");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
if let Some(initial_timeline_id) = initial_timeline_id {
|
||||
Ok(initial_timeline_id)
|
||||
} else {
|
||||
extract_initial_timeline_id(init_output.stdout)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn repo_path(&self) -> PathBuf {
|
||||
@@ -325,11 +336,16 @@ impl PageServerNode {
|
||||
.json()?)
|
||||
}
|
||||
|
||||
pub fn tenant_create(&self, tenantid: ZTenantId) -> Result<ZTimelineId> {
|
||||
pub fn tenant_create(
|
||||
&self,
|
||||
tenant_id: ZTenantId,
|
||||
initial_timeline_id: Option<ZTimelineId>,
|
||||
) -> Result<ZTimelineId> {
|
||||
Ok(self
|
||||
.http_request(Method::POST, format!("{}/{}", self.http_base_url, "tenant"))
|
||||
.json(&TenantCreateRequest {
|
||||
tenant_id: tenantid,
|
||||
tenant_id,
|
||||
initial_timeline_id,
|
||||
})
|
||||
.send()?
|
||||
.error_from_body()?
|
||||
@@ -367,3 +383,31 @@ impl PageServerNode {
|
||||
.json()?)
|
||||
}
|
||||
}
|
||||
|
||||
fn extract_initial_timeline_id(init_stdout: Vec<u8>) -> anyhow::Result<ZTimelineId> {
|
||||
let output_string =
|
||||
String::from_utf8(init_stdout).context("Init stdout is not a valid unicode")?;
|
||||
|
||||
let string_with_timeline_id = match output_string.split_once("created initial timeline ") {
|
||||
Some((_, string_with_timeline_id)) => string_with_timeline_id,
|
||||
None => bail!(
|
||||
"Found no line with timeline id in the init output: '{}'",
|
||||
output_string
|
||||
),
|
||||
};
|
||||
|
||||
let timeline_id_str = match string_with_timeline_id.split_once(' ') {
|
||||
Some((timeline_id_str, _)) => timeline_id_str,
|
||||
None => bail!(
|
||||
"Found no timeline id in the init output: '{}'",
|
||||
output_string
|
||||
),
|
||||
};
|
||||
|
||||
timeline_id_str.parse().with_context(|| {
|
||||
format!(
|
||||
"Failed to parse timeline id from string, extracted from the init output: '{}'",
|
||||
timeline_id_str
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -2,7 +2,14 @@
|
||||
|
||||
use std::{env, path::Path, str::FromStr};
|
||||
use tracing::*;
|
||||
use zenith_utils::{auth::JwtAuth, logging, postgres_backend::AuthType, tcp_listener, GIT_VERSION};
|
||||
use zenith_utils::{
|
||||
auth::JwtAuth,
|
||||
logging,
|
||||
postgres_backend::AuthType,
|
||||
tcp_listener,
|
||||
zid::{ZTenantId, ZTimelineId},
|
||||
GIT_VERSION,
|
||||
};
|
||||
|
||||
use anyhow::{bail, Context, Result};
|
||||
|
||||
@@ -52,6 +59,13 @@ fn main() -> Result<()> {
|
||||
.help("Create tenant during init")
|
||||
.requires("init"),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("initial-timeline-id")
|
||||
.long("initial-timeline-id")
|
||||
.takes_value(true)
|
||||
.help("Use a specific timeline id during init and tenant creation")
|
||||
.requires("create-tenant"),
|
||||
)
|
||||
// See `settings.md` for more details on the extra configuration patameters pageserver can process
|
||||
.arg(
|
||||
Arg::new("config-override")
|
||||
@@ -71,7 +85,16 @@ fn main() -> Result<()> {
|
||||
let cfg_file_path = workdir.join("pageserver.toml");
|
||||
|
||||
let init = arg_matches.is_present("init");
|
||||
let create_tenant = arg_matches.value_of("create-tenant");
|
||||
let create_tenant = arg_matches
|
||||
.value_of("create-tenant")
|
||||
.map(ZTenantId::from_str)
|
||||
.transpose()
|
||||
.context("Failed to parse tenant id from the arguments")?;
|
||||
let initial_timeline_id = arg_matches
|
||||
.value_of("initial-timeline-id")
|
||||
.map(ZTimelineId::from_str)
|
||||
.transpose()
|
||||
.context("Failed to parse timeline id from the arguments")?;
|
||||
|
||||
// Set CWD to workdir for non-daemon modes
|
||||
env::set_current_dir(&workdir).with_context(|| {
|
||||
@@ -142,7 +165,8 @@ fn main() -> Result<()> {
|
||||
|
||||
// Create repo and exit if init was requested
|
||||
if init {
|
||||
timelines::init_pageserver(conf, create_tenant).context("Failed to init pageserver")?;
|
||||
timelines::init_pageserver(conf, create_tenant, initial_timeline_id)
|
||||
.context("Failed to init pageserver")?;
|
||||
// write the config file
|
||||
std::fs::write(&cfg_file_path, toml.to_string()).with_context(|| {
|
||||
format!(
|
||||
|
||||
@@ -11,6 +11,7 @@ pub struct TimelineCreateRequest {
|
||||
pub tenant_id: ZTenantId,
|
||||
#[serde(with = "hex")]
|
||||
pub timeline_id: ZTimelineId,
|
||||
#[serde(default)]
|
||||
#[serde(with = "opt_display_serde")]
|
||||
pub ancestor_timeline_id: Option<ZTimelineId>,
|
||||
pub start_lsn: Option<Lsn>,
|
||||
@@ -20,6 +21,9 @@ pub struct TimelineCreateRequest {
|
||||
pub struct TenantCreateRequest {
|
||||
#[serde(with = "hex")]
|
||||
pub tenant_id: ZTenantId,
|
||||
#[serde(default)]
|
||||
#[serde(with = "opt_display_serde")]
|
||||
pub initial_timeline_id: Option<ZTimelineId>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
|
||||
@@ -214,8 +214,12 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
let request_data: TenantCreateRequest = json_request(&mut request).await?;
|
||||
|
||||
let initial_timeline_id = tokio::task::spawn_blocking(move || {
|
||||
let _enter = info_span!("tenant_create", tenant = %request_data.tenant_id).entered();
|
||||
tenant_mgr::create_repository_for_tenant(get_config(&request), request_data.tenant_id)
|
||||
let _enter = info_span!("tenant_create", tenant = %request_data.tenant_id, initial_timeline = ?request_data.initial_timeline_id).entered();
|
||||
tenant_mgr::create_repository_for_tenant(
|
||||
get_config(&request),
|
||||
request_data.tenant_id,
|
||||
request_data.initial_timeline_id,
|
||||
)
|
||||
})
|
||||
.await
|
||||
.map_err(ApiError::from_err)??;
|
||||
|
||||
@@ -179,13 +179,15 @@ pub fn shutdown_all_tenants() {
|
||||
|
||||
pub fn create_repository_for_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenantid: ZTenantId,
|
||||
tenant_id: ZTenantId,
|
||||
initial_timeline_id: Option<ZTimelineId>,
|
||||
) -> Result<ZTimelineId> {
|
||||
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid));
|
||||
let (initial_timeline_id, repo) = timelines::create_repo(conf, tenantid, wal_redo_manager)?;
|
||||
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
|
||||
let (initial_timeline_id, repo) =
|
||||
timelines::create_repo(conf, tenant_id, initial_timeline_id, wal_redo_manager)?;
|
||||
|
||||
match access_tenants().entry(tenantid) {
|
||||
hash_map::Entry::Occupied(_) => bail!("tenant {} already exists", tenantid),
|
||||
match access_tenants().entry(tenant_id) {
|
||||
hash_map::Entry::Occupied(_) => bail!("tenant {} already exists", tenant_id),
|
||||
hash_map::Entry::Vacant(v) => {
|
||||
v.insert(Tenant {
|
||||
state: TenantState::Idle,
|
||||
|
||||
@@ -9,7 +9,6 @@ use std::{
|
||||
fs,
|
||||
path::Path,
|
||||
process::{Command, Stdio},
|
||||
str::FromStr,
|
||||
sync::Arc,
|
||||
};
|
||||
use tracing::*;
|
||||
@@ -150,7 +149,11 @@ pub struct PointInTime {
|
||||
pub lsn: Lsn,
|
||||
}
|
||||
|
||||
pub fn init_pageserver(conf: &'static PageServerConf, create_tenant: Option<&str>) -> Result<()> {
|
||||
pub fn init_pageserver(
|
||||
conf: &'static PageServerConf,
|
||||
create_tenant: Option<ZTenantId>,
|
||||
initial_timeline_id: Option<ZTimelineId>,
|
||||
) -> Result<()> {
|
||||
// Initialize logger
|
||||
// use true as daemonize parameter because otherwise we pollute zenith cli output with a few pages long output of info messages
|
||||
let _log_file = logging::init(LOG_FILE_NAME, true)?;
|
||||
@@ -167,10 +170,10 @@ pub fn init_pageserver(conf: &'static PageServerConf, create_tenant: Option<&str
|
||||
// anymore, but I think that could still happen.
|
||||
let dummy_redo_mgr = Arc::new(crate::walredo::DummyRedoManager {});
|
||||
|
||||
if let Some(tenantid) = create_tenant {
|
||||
let tenantid = ZTenantId::from_str(tenantid)?;
|
||||
println!("initializing tenantid {}", tenantid);
|
||||
create_repo(conf, tenantid, dummy_redo_mgr).context("failed to create repo")?;
|
||||
if let Some(tenant_id) = create_tenant {
|
||||
println!("initializing tenantid {}", tenant_id);
|
||||
create_repo(conf, tenant_id, initial_timeline_id, dummy_redo_mgr)
|
||||
.context("failed to create repo")?;
|
||||
}
|
||||
crashsafe_dir::create_dir_all(conf.tenants_path())?;
|
||||
|
||||
@@ -180,39 +183,40 @@ pub fn init_pageserver(conf: &'static PageServerConf, create_tenant: Option<&str
|
||||
|
||||
pub fn create_repo(
|
||||
conf: &'static PageServerConf,
|
||||
tenantid: ZTenantId,
|
||||
tenant_id: ZTenantId,
|
||||
init_timeline_id: Option<ZTimelineId>,
|
||||
wal_redo_manager: Arc<dyn WalRedoManager + Send + Sync>,
|
||||
) -> Result<(ZTimelineId, Arc<dyn Repository>)> {
|
||||
let repo_dir = conf.tenant_path(&tenantid);
|
||||
let repo_dir = conf.tenant_path(&tenant_id);
|
||||
if repo_dir.exists() {
|
||||
bail!("repo for {} already exists", tenantid)
|
||||
bail!("repo for {} already exists", tenant_id)
|
||||
}
|
||||
|
||||
// top-level dir may exist if we are creating it through CLI
|
||||
crashsafe_dir::create_dir_all(&repo_dir)
|
||||
.with_context(|| format!("could not create directory {}", repo_dir.display()))?;
|
||||
|
||||
crashsafe_dir::create_dir(conf.timelines_path(&tenantid))?;
|
||||
crashsafe_dir::create_dir(conf.timelines_path(&tenant_id))?;
|
||||
|
||||
info!("created directory structure in {}", repo_dir.display());
|
||||
|
||||
// create a new timeline directory
|
||||
let timeline_id = ZTimelineId::generate();
|
||||
let timelinedir = conf.timeline_path(&timeline_id, &tenantid);
|
||||
let timeline_id = init_timeline_id.unwrap_or_else(|| ZTimelineId::generate());
|
||||
let timelinedir = conf.timeline_path(&timeline_id, &tenant_id);
|
||||
|
||||
crashsafe_dir::create_dir(&timelinedir)?;
|
||||
|
||||
let repo = Arc::new(crate::layered_repository::LayeredRepository::new(
|
||||
conf,
|
||||
wal_redo_manager,
|
||||
tenantid,
|
||||
tenant_id,
|
||||
conf.remote_storage_config.is_some(),
|
||||
));
|
||||
|
||||
// Load data into pageserver
|
||||
// TODO To implement zenith import we need to
|
||||
// move data loading out of create_repo()
|
||||
bootstrap_timeline(conf, tenantid, timeline_id, repo.as_ref())?;
|
||||
bootstrap_timeline(conf, tenant_id, timeline_id, repo.as_ref())?;
|
||||
|
||||
Ok((timeline_id, repo))
|
||||
}
|
||||
|
||||
@@ -869,12 +869,16 @@ class ZenithCli:
|
||||
res.stdout.strip().split("\n")))
|
||||
return branches_cli
|
||||
|
||||
def init(self, config_toml: str) -> 'subprocess.CompletedProcess[str]':
|
||||
def init(self,
|
||||
config_toml: str,
|
||||
initial_timeline_id: Optional[uuid.UUID] = None) -> 'subprocess.CompletedProcess[str]':
|
||||
with tempfile.NamedTemporaryFile(mode='w+') as tmp:
|
||||
tmp.write(config_toml)
|
||||
tmp.flush()
|
||||
|
||||
cmd = ['init', f'--config={tmp.name}']
|
||||
if initial_timeline_id:
|
||||
cmd.extend(['--timeline-id', initial_timeline_id.hex])
|
||||
append_pageserver_param_overrides(cmd,
|
||||
self.env.pageserver.remote_storage,
|
||||
self.env.pageserver.config_override)
|
||||
|
||||
@@ -87,6 +87,12 @@ fn main() -> Result<()> {
|
||||
.takes_value(true)
|
||||
.required(false);
|
||||
|
||||
let timeline_id_arg = Arg::new("timeline-id")
|
||||
.long("timeline-id")
|
||||
.help("Timeline id. Represented as a hexadecimal string 32 symbols length")
|
||||
.takes_value(true)
|
||||
.required(false);
|
||||
|
||||
let port_arg = Arg::new("port")
|
||||
.long("port")
|
||||
.required(false)
|
||||
@@ -121,6 +127,7 @@ fn main() -> Result<()> {
|
||||
App::new("init")
|
||||
.about("Initialize a new Zenith repository")
|
||||
.arg(pageserver_config_args.clone())
|
||||
.arg(timeline_id_arg.clone().help("Use a specific timeline id when creating a tenant and its initial timeline"))
|
||||
.arg(
|
||||
Arg::new("config")
|
||||
.long("config")
|
||||
@@ -151,7 +158,10 @@ fn main() -> Result<()> {
|
||||
.setting(AppSettings::ArgRequiredElseHelp)
|
||||
.about("Manage tenants")
|
||||
.subcommand(App::new("list"))
|
||||
.subcommand(App::new("create").arg(tenant_id_arg.clone()))
|
||||
.subcommand(App::new("create")
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(timeline_id_arg.clone().help("Use a specific timeline id when creating a tenant and its initial timeline"))
|
||||
)
|
||||
)
|
||||
.subcommand(
|
||||
App::new("pageserver")
|
||||
@@ -236,8 +246,8 @@ fn main() -> Result<()> {
|
||||
};
|
||||
|
||||
// Check for 'zenith init' command first.
|
||||
let subcmd_result = if sub_name == "init" {
|
||||
handle_init(sub_args)
|
||||
let subcommand_result = if sub_name == "init" {
|
||||
handle_init(sub_args).map(Some)
|
||||
} else {
|
||||
// all other commands need an existing config
|
||||
let mut env = LocalEnv::load_config().context("Error loading config")?;
|
||||
@@ -254,18 +264,21 @@ fn main() -> Result<()> {
|
||||
_ => bail!("unexpected subcommand {}", sub_name),
|
||||
};
|
||||
|
||||
if subcommand_result.is_ok() && original_env != env {
|
||||
eprintln!("Subcommand had changed the config, updating");
|
||||
env.persist_config(&env.base_data_dir)?;
|
||||
if original_env != env {
|
||||
subcommand_result.map(|()| Some(env))
|
||||
} else {
|
||||
subcommand_result.map(|()| None)
|
||||
}
|
||||
|
||||
subcommand_result
|
||||
};
|
||||
if let Err(e) = subcmd_result {
|
||||
eprintln!("command failed: {:#}", e);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
match subcommand_result {
|
||||
Ok(Some(updated_env)) => updated_env.persist_config(&updated_env.base_data_dir)?,
|
||||
Ok(None) => (),
|
||||
Err(e) => {
|
||||
eprintln!("command failed: {:?}", e);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -411,11 +424,8 @@ fn get_timeline_infos(
|
||||
|
||||
// Helper function to parse --tenant_id option, or get the default from config file
|
||||
fn get_tenant_id(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<ZTenantId> {
|
||||
if let Some(tenantid_cmd) = sub_match.value_of("tenant-id") {
|
||||
Ok(
|
||||
ZTenantId::from_str(tenantid_cmd)
|
||||
.context("Failed to parse tenant id from arguments")?,
|
||||
)
|
||||
if let Some(tenant_id_from_arguments) = parse_tenant_id(sub_match).transpose() {
|
||||
tenant_id_from_arguments
|
||||
} else if let Some(tenantid_conf) = env.default_tenant_id {
|
||||
Ok(ZTenantId::from(tenantid_conf))
|
||||
} else {
|
||||
@@ -423,7 +433,25 @@ fn get_tenant_id(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::R
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_init(init_match: &ArgMatches) -> Result<()> {
|
||||
fn parse_tenant_id(sub_match: &ArgMatches) -> anyhow::Result<Option<ZTenantId>> {
|
||||
sub_match
|
||||
.value_of("tenant-id")
|
||||
.map(ZTenantId::from_str)
|
||||
.transpose()
|
||||
.context("Failed to parse tenant id from the argument string")
|
||||
}
|
||||
|
||||
fn parse_timeline_id(sub_match: &ArgMatches) -> anyhow::Result<Option<ZTimelineId>> {
|
||||
sub_match
|
||||
.value_of("timeline-id")
|
||||
.map(ZTimelineId::from_str)
|
||||
.transpose()
|
||||
.context("Failed to parse timeline id from the argument string")
|
||||
}
|
||||
|
||||
fn handle_init(init_match: &ArgMatches) -> Result<LocalEnv> {
|
||||
let initial_timeline_id_arg = parse_timeline_id(init_match)?;
|
||||
|
||||
// Create config file
|
||||
let toml_file: String = if let Some(config_path) = init_match.value_of("config") {
|
||||
// load and parse the file
|
||||
@@ -439,18 +467,28 @@ fn handle_init(init_match: &ArgMatches) -> Result<()> {
|
||||
env.init()
|
||||
.context("Failed to initialize zenith repository")?;
|
||||
|
||||
// default_tenantid was generated by the `env.init()` call above
|
||||
let initial_tenant_id = env.default_tenant_id.unwrap();
|
||||
|
||||
// Call 'pageserver init'.
|
||||
let pageserver = PageServerNode::from_env(&env);
|
||||
if let Err(e) = pageserver.init(
|
||||
// default_tenantid was generated by the `env.init()` call above
|
||||
Some(&ZTenantId::from(env.default_tenant_id.unwrap()).to_string()),
|
||||
&pageserver_config_overrides(init_match),
|
||||
) {
|
||||
eprintln!("pageserver init failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
let initial_timeline_id = pageserver
|
||||
.init(
|
||||
Some(initial_tenant_id),
|
||||
initial_timeline_id_arg,
|
||||
&pageserver_config_overrides(init_match),
|
||||
)
|
||||
.unwrap_or_else(|e| {
|
||||
eprintln!("pageserver init failed: {}", e);
|
||||
exit(1);
|
||||
});
|
||||
|
||||
Ok(())
|
||||
env.branch_name_mappings.insert(
|
||||
DEFAULT_BRANCH_NAME.to_owned(),
|
||||
ZTenantTimelineId::new(initial_tenant_id, initial_timeline_id),
|
||||
);
|
||||
|
||||
Ok(env)
|
||||
}
|
||||
|
||||
fn pageserver_config_overrides(init_match: &ArgMatches) -> Vec<&str> {
|
||||
@@ -470,12 +508,11 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Re
|
||||
}
|
||||
}
|
||||
Some(("create", create_match)) => {
|
||||
let tenant_id = match create_match.value_of("tenant-id") {
|
||||
Some(id) => ZTenantId::from_str(id)?,
|
||||
None => ZTenantId::generate(),
|
||||
};
|
||||
let tenant_id = parse_tenant_id(create_match)?.unwrap_or_else(|| ZTenantId::generate());
|
||||
println!("using tenant id {}", tenant_id);
|
||||
let initial_timeline_id = pageserver.tenant_create(tenant_id)?;
|
||||
let initial_timeline_id_argument = parse_timeline_id(create_match)?;
|
||||
let initial_timeline_id =
|
||||
pageserver.tenant_create(tenant_id, initial_timeline_id_argument)?;
|
||||
env.branch_name_mappings.insert(
|
||||
DEFAULT_BRANCH_NAME.to_owned(),
|
||||
ZTenantTimelineId::new(tenant_id, initial_timeline_id),
|
||||
|
||||
Reference in New Issue
Block a user