From e989a5e4a204f05bf9ebab01182af0acf2b85328 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Fri, 27 Sep 2024 22:08:46 +0300 Subject: [PATCH] neon_local: Use clap derive macros to parse the CLI args (#9103) This is easier to work with. --- control_plane/src/bin/neon_local.rs | 1716 +++++++++++++------------- control_plane/src/branch_mappings.rs | 94 ++ control_plane/src/safekeeper.rs | 4 +- 3 files changed, 949 insertions(+), 865 deletions(-) create mode 100644 control_plane/src/branch_mappings.rs diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index b6532ff2ac..624936620d 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -6,7 +6,7 @@ //! rely on `neon_local` to set up the environment for each test. //! use anyhow::{anyhow, bail, Context, Result}; -use clap::{value_parser, Arg, ArgAction, ArgMatches, Command, ValueEnum}; +use clap::Parser; use compute_api::spec::ComputeMode; use control_plane::endpoint::ComputeControlPlane; use control_plane::local_env::{ @@ -56,10 +56,627 @@ const DEFAULT_PAGESERVER_ID: NodeId = NodeId(1); const DEFAULT_BRANCH_NAME: &str = "main"; project_git_version!(GIT_VERSION); -const DEFAULT_PG_VERSION: &str = "16"; +const DEFAULT_PG_VERSION: u32 = 16; const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/upcall/v1/"; +#[derive(clap::Parser)] +#[command(version = GIT_VERSION, about, name = "Neon CLI")] +struct Cli { + #[command(subcommand)] + command: NeonLocalCmd, +} + +#[derive(clap::Subcommand)] +enum NeonLocalCmd { + Init(InitCmdArgs), + + #[command(subcommand)] + Tenant(TenantCmd), + #[command(subcommand)] + Timeline(TimelineCmd), + #[command(subcommand)] + Pageserver(PageserverCmd), + #[command(subcommand)] + #[clap(alias = "storage_controller")] + StorageController(StorageControllerCmd), + #[command(subcommand)] + #[clap(alias = "storage_broker")] + StorageBroker(StorageBrokerCmd), + #[command(subcommand)] + 
Safekeeper(SafekeeperCmd), + #[command(subcommand)] + Endpoint(EndpointCmd), + #[command(subcommand)] + Mappings(MappingsCmd), + + Start(StartCmdArgs), + Stop(StopCmdArgs), +} + +#[derive(clap::Args)] +#[clap(about = "Initialize a new Neon repository, preparing configs for services to start with")] +struct InitCmdArgs { + #[clap(long, help("How many pageservers to create (default 1)"))] + num_pageservers: Option, + + #[clap(long)] + config: Option, + + #[clap(long, help("Force initialization even if the repository is not empty"))] + #[arg(value_parser)] + #[clap(default_value = "must-not-exist")] + force: InitForceMode, +} + +#[derive(clap::Args)] +#[clap(about = "Start pageserver and safekeepers")] +struct StartCmdArgs { + #[clap(long = "start-timeout", default_value = "10s")] + timeout: humantime::Duration, +} + +#[derive(clap::Args)] +#[clap(about = "Stop pageserver and safekeepers")] +struct StopCmdArgs { + #[arg(value_enum)] + #[clap(long, default_value_t = StopMode::Fast)] + mode: StopMode, +} + +#[derive(Clone, Copy, clap::ValueEnum)] +enum StopMode { + Fast, + Immediate, +} + +#[derive(clap::Subcommand)] +#[clap(about = "Manage tenants")] +enum TenantCmd { + List, + Create(TenantCreateCmdArgs), + SetDefault(TenantSetDefaultCmdArgs), + Config(TenantConfigCmdArgs), + Import(TenantImportCmdArgs), +} + +#[derive(clap::Args)] +struct TenantCreateCmdArgs { + #[clap( + long = "tenant-id", + help = "Tenant id. 
Represented as a hexadecimal string 32 symbols length" + )] + tenant_id: Option, + + #[clap( + long, + help = "Use a specific timeline id when creating a tenant and its initial timeline" + )] + timeline_id: Option, + + #[clap(short = 'c')] + config: Vec, + + #[arg(default_value_t = DEFAULT_PG_VERSION)] + #[clap(long, help = "Postgres version to use for the initial timeline")] + pg_version: u32, + + #[clap( + long, + help = "Use this tenant in future CLI commands where tenant_id is needed, but not specified" + )] + set_default: bool, + + #[clap(long, help = "Number of shards in the new tenant")] + #[arg(default_value_t = 0)] + shard_count: u8, + #[clap(long, help = "Sharding stripe size in pages")] + shard_stripe_size: Option, + + #[clap(long, help = "Placement policy shards in this tenant")] + #[arg(value_parser = parse_placement_policy)] + placement_policy: Option, +} + +fn parse_placement_policy(s: &str) -> anyhow::Result { + Ok(serde_json::from_str::(s)?) +} + +#[derive(clap::Args)] +#[clap( + about = "Set a particular tenant as default in future CLI commands where tenant_id is needed, but not specified" +)] +struct TenantSetDefaultCmdArgs { + #[clap( + long = "tenant-id", + help = "Tenant id. Represented as a hexadecimal string 32 symbols length" + )] + tenant_id: TenantId, +} + +#[derive(clap::Args)] +struct TenantConfigCmdArgs { + #[clap( + long = "tenant-id", + help = "Tenant id. Represented as a hexadecimal string 32 symbols length" + )] + tenant_id: Option, + + #[clap(short = 'c')] + config: Vec, +} + +#[derive(clap::Args)] +#[clap( + about = "Import a tenant that is present in remote storage, and create branches for its timelines" +)] +struct TenantImportCmdArgs { + #[clap( + long = "tenant-id", + help = "Tenant id. 
Represented as a hexadecimal string 32 symbols length" + )] + tenant_id: TenantId, +} + +#[derive(clap::Subcommand)] +#[clap(about = "Manage timelines")] +enum TimelineCmd { + List(TimelineListCmdArgs), + Branch(TimelineBranchCmdArgs), + Create(TimelineCreateCmdArgs), + Import(TimelineImportCmdArgs), +} + +#[derive(clap::Args)] +#[clap(about = "List all timelines available to this pageserver")] +struct TimelineListCmdArgs { + #[clap( + long = "tenant-id", + help = "Tenant id. Represented as a hexadecimal string 32 symbols length" + )] + tenant_shard_id: Option, +} + +#[derive(clap::Args)] +#[clap(about = "Create a new timeline, branching off from another timeline")] +struct TimelineBranchCmdArgs { + #[clap( + long = "tenant-id", + help = "Tenant id. Represented as a hexadecimal string 32 symbols length" + )] + tenant_id: Option, + + #[clap(long, help = "New timeline's ID")] + timeline_id: Option, + + #[clap(long, help = "Human-readable alias for the new timeline")] + branch_name: String, + + #[clap( + long, + help = "Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name." + )] + ancestor_branch_name: Option, + + #[clap( + long, + help = "When using another timeline as base, use a specific Lsn in it instead of the latest one" + )] + ancestor_start_lsn: Option, +} + +#[derive(clap::Args)] +#[clap(about = "Create a new blank timeline")] +struct TimelineCreateCmdArgs { + #[clap( + long = "tenant-id", + help = "Tenant id. 
Represented as a hexadecimal string 32 symbols length" + )] + tenant_id: Option, + + #[clap(long, help = "New timeline's ID")] + timeline_id: Option, + + #[clap(long, help = "Human-readable alias for the new timeline")] + branch_name: String, + + #[arg(default_value_t = DEFAULT_PG_VERSION)] + #[clap(long, help = "Postgres version")] + pg_version: u32, +} + +#[derive(clap::Args)] +#[clap(about = "Import timeline from a basebackup directory")] +struct TimelineImportCmdArgs { + #[clap( + long = "tenant-id", + help = "Tenant id. Represented as a hexadecimal string 32 symbols length" + )] + tenant_id: Option, + + #[clap(long, help = "New timeline's ID")] + timeline_id: TimelineId, + + #[clap(long, help = "Human-readable alias for the new timeline")] + branch_name: String, + + #[clap(long, help = "Basebackup tarfile to import")] + base_tarfile: PathBuf, + + #[clap(long, help = "Lsn the basebackup starts at")] + base_lsn: Lsn, + + #[clap(long, help = "Wal to add after base")] + wal_tarfile: Option, + + #[clap(long, help = "Lsn the basebackup ends at")] + end_lsn: Option, + + #[arg(default_value_t = DEFAULT_PG_VERSION)] + #[clap(long, help = "Postgres version of the backup being imported")] + pg_version: u32, +} + +#[derive(clap::Subcommand)] +#[clap(about = "Manage pageservers")] +enum PageserverCmd { + Status(PageserverStatusCmdArgs), + Start(PageserverStartCmdArgs), + Stop(PageserverStopCmdArgs), + Restart(PageserverRestartCmdArgs), +} + +#[derive(clap::Args)] +#[clap(about = "Show status of a local pageserver")] +struct PageserverStatusCmdArgs { + #[clap(long = "id", help = "pageserver id")] + pageserver_id: Option, +} + +#[derive(clap::Args)] +#[clap(about = "Start local pageserver")] +struct PageserverStartCmdArgs { + #[clap(long = "id", help = "pageserver id")] + pageserver_id: Option, + + #[clap(short = 't', long, help = "timeout until we fail the command")] + #[arg(default_value = "10s")] + start_timeout: humantime::Duration, +} + +#[derive(clap::Args)] 
+#[clap(about = "Stop local pageserver")] +struct PageserverStopCmdArgs { + #[clap(long = "id", help = "pageserver id")] + pageserver_id: Option, + + #[clap( + short = 'm', + help = "If 'immediate', don't flush repository data at shutdown" + )] + #[arg(value_enum, default_value = "fast")] + stop_mode: StopMode, +} + +#[derive(clap::Args)] +#[clap(about = "Restart local pageserver")] +struct PageserverRestartCmdArgs { + #[clap(long = "id", help = "pageserver id")] + pageserver_id: Option, + + #[clap(short = 't', long, help = "timeout until we fail the command")] + #[arg(default_value = "10s")] + start_timeout: humantime::Duration, +} + +#[derive(clap::Subcommand)] +#[clap(about = "Manage storage controller")] +enum StorageControllerCmd { + Start(StorageControllerStartCmdArgs), + Stop(StorageControllerStopCmdArgs), +} + +#[derive(clap::Args)] +#[clap(about = "Start storage controller")] +struct StorageControllerStartCmdArgs { + #[clap(short = 't', long, help = "timeout until we fail the command")] + #[arg(default_value = "10s")] + start_timeout: humantime::Duration, + + #[clap( + long, + help = "Identifier used to distinguish storage controller instances" + )] + #[arg(default_value_t = 1)] + instance_id: u8, + + #[clap( + long, + help = "Base port for the storage controller instance identified by instance-id (defaults to pageserver cplane api)" + )] + base_port: Option, +} + +#[derive(clap::Args)] +#[clap(about = "Stop storage controller")] +struct StorageControllerStopCmdArgs { + #[clap( + short = 'm', + help = "If 'immediate', don't flush repository data at shutdown" + )] + #[arg(value_enum, default_value = "fast")] + stop_mode: StopMode, + + #[clap( + long, + help = "Identifier used to distinguish storage controller instances" + )] + #[arg(default_value_t = 1)] + instance_id: u8, +} + +#[derive(clap::Subcommand)] +#[clap(about = "Manage storage broker")] +enum StorageBrokerCmd { + Start(StorageBrokerStartCmdArgs), + Stop(StorageBrokerStopCmdArgs), +} + 
+#[derive(clap::Args)] +#[clap(about = "Start broker")] +struct StorageBrokerStartCmdArgs { + #[clap(short = 't', long, help = "timeout until we fail the command")] + #[arg(default_value = "10s")] + start_timeout: humantime::Duration, +} + +#[derive(clap::Args)] +#[clap(about = "stop broker")] +struct StorageBrokerStopCmdArgs { + #[clap( + short = 'm', + help = "If 'immediate', don't flush repository data at shutdown" + )] + #[arg(value_enum, default_value = "fast")] + stop_mode: StopMode, +} + +#[derive(clap::Subcommand)] +#[clap(about = "Manage safekeepers")] +enum SafekeeperCmd { + Start(SafekeeperStartCmdArgs), + Stop(SafekeeperStopCmdArgs), + Restart(SafekeeperRestartCmdArgs), +} + +#[derive(clap::Args)] +#[clap(about = "Start local safekeeper")] +struct SafekeeperStartCmdArgs { + #[clap(help = "safekeeper id")] + #[arg(default_value_t = NodeId(1))] + id: NodeId, + + #[clap( + short = 'e', + long = "safekeeper-extra-opt", + help = "Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo" + )] + extra_opt: Vec, + + #[clap(short = 't', long, help = "timeout until we fail the command")] + #[arg(default_value = "10s")] + start_timeout: humantime::Duration, +} + +#[derive(clap::Args)] +#[clap(about = "Stop local safekeeper")] +struct SafekeeperStopCmdArgs { + #[clap(help = "safekeeper id")] + #[arg(default_value_t = NodeId(1))] + id: NodeId, + + #[arg(value_enum, default_value = "fast")] + #[clap( + short = 'm', + help = "If 'immediate', don't flush repository data at shutdown" + )] + stop_mode: StopMode, +} + +#[derive(clap::Args)] +#[clap(about = "Restart local safekeeper")] +struct SafekeeperRestartCmdArgs { + #[clap(help = "safekeeper id")] + #[arg(default_value_t = NodeId(1))] + id: NodeId, + + #[arg(value_enum, default_value = "fast")] + #[clap( + short = 'm', + help = "If 'immediate', don't flush repository data at shutdown" + )] + stop_mode: StopMode, + + #[clap( + short = 'e', + long = "safekeeper-extra-opt", + help = "Additional 
safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo" + )] + extra_opt: Vec, + + #[clap(short = 't', long, help = "timeout until we fail the command")] + #[arg(default_value = "10s")] + start_timeout: humantime::Duration, +} + +#[derive(clap::Subcommand)] +#[clap(about = "Manage Postgres instances")] +enum EndpointCmd { + List(EndpointListCmdArgs), + Create(EndpointCreateCmdArgs), + Start(EndpointStartCmdArgs), + Reconfigure(EndpointReconfigureCmdArgs), + Stop(EndpointStopCmdArgs), +} + +#[derive(clap::Args)] +#[clap(about = "List endpoints")] +struct EndpointListCmdArgs { + #[clap( + long = "tenant-id", + help = "Tenant id. Represented as a hexadecimal string 32 symbols length" + )] + tenant_shard_id: Option, +} + +#[derive(clap::Args)] +#[clap(about = "Create a compute endpoint")] +struct EndpointCreateCmdArgs { + #[clap( + long = "tenant-id", + help = "Tenant id. Represented as a hexadecimal string 32 symbols length" + )] + tenant_id: Option, + + #[clap(help = "Postgres endpoint id")] + endpoint_id: Option, + #[clap(long, help = "Name of the branch the endpoint will run on")] + branch_name: Option, + #[clap( + long, + help = "Specify Lsn on the timeline to start from. 
By default, end of the timeline would be used" + )] + lsn: Option, + #[clap(long)] + pg_port: Option, + #[clap(long)] + http_port: Option, + #[clap(long = "pageserver-id")] + endpoint_pageserver_id: Option, + + #[clap( + long, + help = "Don't do basebackup, create endpoint directory with only config files", + action = clap::ArgAction::Set, + default_value_t = false + )] + config_only: bool, + + #[arg(default_value_t = DEFAULT_PG_VERSION)] + #[clap(long, help = "Postgres version")] + pg_version: u32, + + #[clap( + long, + help = "If set, the node will be a hot replica on the specified timeline", + action = clap::ArgAction::Set, + default_value_t = false + )] + hot_standby: bool, + + #[clap(long, help = "If set, will set up the catalog for neon_superuser")] + update_catalog: bool, + + #[clap( + long, + help = "Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but useful for tests." + )] + allow_multiple: bool, +} + +#[derive(clap::Args)] +#[clap(about = "Start postgres. If the endpoint doesn't exist yet, it is created.")] +struct EndpointStartCmdArgs { + #[clap(help = "Postgres endpoint id")] + endpoint_id: String, + #[clap(long = "pageserver-id")] + endpoint_pageserver_id: Option, + + #[clap(long)] + safekeepers: Option, + + #[clap( + long, + help = "Configure the remote extensions storage proxy gateway to request for extensions." + )] + remote_ext_config: Option, + + #[clap( + long, + help = "If set, will create test user `user` and `neondb` database. Requires `update-catalog = true`" + )] + create_test_user: bool, + + #[clap( + long, + help = "Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but useful for tests." 
+ )] + allow_multiple: bool, + + #[clap(short = 't', long, help = "timeout until we fail the command")] + #[arg(default_value = "10s")] + start_timeout: humantime::Duration, +} + +#[derive(clap::Args)] +#[clap(about = "Reconfigure an endpoint")] +struct EndpointReconfigureCmdArgs { + #[clap( + long = "tenant-id", + help = "Tenant id. Represented as a hexadecimal string 32 symbols length" + )] + tenant_id: Option, + + #[clap(help = "Postgres endpoint id")] + endpoint_id: String, + #[clap(long = "pageserver-id")] + endpoint_pageserver_id: Option, + + #[clap(long)] + safekeepers: Option, +} + +#[derive(clap::Args)] +#[clap(about = "Stop an endpoint")] +struct EndpointStopCmdArgs { + #[clap(help = "Postgres endpoint id")] + endpoint_id: String, + + #[clap( + long, + help = "Also delete data directory (now optional, should be default in future)" + )] + destroy: bool, + + #[clap(long, help = "Postgres shutdown mode, passed to \"pg_ctl -m \"")] + #[arg(value_parser(["smart", "fast", "immediate"]))] + #[arg(default_value = "fast")] + mode: String, +} + +#[derive(clap::Subcommand)] +#[clap(about = "Manage neon_local branch name mappings")] +enum MappingsCmd { + Map(MappingsMapCmdArgs), +} + +#[derive(clap::Args)] +#[clap(about = "Create new mapping which cannot exist already")] +struct MappingsMapCmdArgs { + #[clap( + long, + help = "Tenant id. Represented as a hexadecimal string 32 symbols length" + )] + tenant_id: TenantId, + #[clap( + long, + help = "Timeline id. Represented as a hexadecimal string 32 symbols length" + )] + timeline_id: TimelineId, + #[clap(long, help = "Branch name to give to the timeline")] + branch_name: String, +} + /// /// Timelines tree element used as a value in the HashMap. 
/// @@ -80,19 +697,13 @@ struct TimelineTreeEl { // * Providing CLI api to the pageserver // * TODO: export/import to/from usual postgres fn main() -> Result<()> { - let matches = cli().get_matches(); - - let (sub_name, sub_args) = match matches.subcommand() { - Some(subcommand_data) => subcommand_data, - None => bail!("no subcommand provided"), - }; + let cli = Cli::parse(); // Check for 'neon init' command first. - let subcommand_result = if sub_name == "init" { - handle_init(sub_args).map(|env| Some(Cow::Owned(env))) + let subcommand_result = if let NeonLocalCmd::Init(args) = cli.command { + handle_init(&args).map(|env| Some(Cow::Owned(env))) } else { // all other commands need an existing config - let env = LocalEnv::load_config(&local_env::base_path()).context("Error loading config")?; let original_env = env.clone(); let env = Box::leak(Box::new(env)); @@ -101,19 +712,20 @@ fn main() -> Result<()> { .build() .unwrap(); - let subcommand_result = match sub_name { - "tenant" => rt.block_on(handle_tenant(sub_args, env)), - "timeline" => rt.block_on(handle_timeline(sub_args, env)), - "start" => rt.block_on(handle_start_all(env, get_start_timeout(sub_args))), - "stop" => rt.block_on(handle_stop_all(sub_args, env)), - "pageserver" => rt.block_on(handle_pageserver(sub_args, env)), - "storage_controller" => rt.block_on(handle_storage_controller(sub_args, env)), - "storage_broker" => rt.block_on(handle_storage_broker(sub_args, env)), - "safekeeper" => rt.block_on(handle_safekeeper(sub_args, env)), - "endpoint" => rt.block_on(handle_endpoint(sub_args, env)), - "mappings" => handle_mappings(sub_args, env), - "pg" => bail!("'pg' subcommand has been renamed to 'endpoint'"), - _ => bail!("unexpected subcommand {sub_name}"), + let subcommand_result = match cli.command { + NeonLocalCmd::Init(_) => unreachable!("init was handled earlier already"), + NeonLocalCmd::Start(args) => rt.block_on(handle_start_all(&args, env)), + NeonLocalCmd::Stop(args) => 
rt.block_on(handle_stop_all(&args, env)), + NeonLocalCmd::Tenant(subcmd) => rt.block_on(handle_tenant(&subcmd, env)), + NeonLocalCmd::Timeline(subcmd) => rt.block_on(handle_timeline(&subcmd, env)), + NeonLocalCmd::Pageserver(subcmd) => rt.block_on(handle_pageserver(&subcmd, env)), + NeonLocalCmd::StorageController(subcmd) => { + rt.block_on(handle_storage_controller(&subcmd, env)) + } + NeonLocalCmd::StorageBroker(subcmd) => rt.block_on(handle_storage_broker(&subcmd, env)), + NeonLocalCmd::Safekeeper(subcmd) => rt.block_on(handle_safekeeper(&subcmd, env)), + NeonLocalCmd::Endpoint(subcmd) => rt.block_on(handle_endpoint(&subcmd, env)), + NeonLocalCmd::Mappings(subcmd) => handle_mappings(&subcmd, env), }; if &original_env != env { @@ -263,10 +875,13 @@ async fn get_timeline_infos( .collect()) } -// Helper function to parse --tenant_id option, or get the default from config file -fn get_tenant_id(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result { - if let Some(tenant_id_from_arguments) = parse_tenant_id(sub_match).transpose() { - tenant_id_from_arguments +/// Helper function to get tenant id from an optional --tenant_id option or from the config file +fn get_tenant_id( + tenant_id_arg: Option, + env: &local_env::LocalEnv, +) -> anyhow::Result { + if let Some(tenant_id_from_arguments) = tenant_id_arg { + Ok(tenant_id_from_arguments) } else if let Some(default_id) = env.default_tenant_id { Ok(default_id) } else { @@ -274,13 +889,14 @@ fn get_tenant_id(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::R } } -// Helper function to parse --tenant_id option, for commands that accept a shard suffix +/// Helper function to get tenant-shard ID from an optional --tenant_id option or from the config file, +/// for commands that accept a shard suffix fn get_tenant_shard_id( - sub_match: &ArgMatches, + tenant_shard_id_arg: Option, env: &local_env::LocalEnv, ) -> anyhow::Result { - if let Some(tenant_id_from_arguments) = 
parse_tenant_shard_id(sub_match).transpose() { - tenant_id_from_arguments + if let Some(tenant_id_from_arguments) = tenant_shard_id_arg { + Ok(tenant_id_from_arguments) } else if let Some(default_id) = env.default_tenant_id { Ok(TenantShardId::unsharded(default_id)) } else { @@ -288,41 +904,11 @@ fn get_tenant_shard_id( } } -fn parse_tenant_id(sub_match: &ArgMatches) -> anyhow::Result> { - sub_match - .get_one::("tenant-id") - .map(|tenant_id| TenantId::from_str(tenant_id)) - .transpose() - .context("Failed to parse tenant id from the argument string") -} - -fn parse_tenant_shard_id(sub_match: &ArgMatches) -> anyhow::Result> { - sub_match - .get_one::("tenant-id") - .map(|id_str| TenantShardId::from_str(id_str)) - .transpose() - .context("Failed to parse tenant shard id from the argument string") -} - -fn parse_timeline_id(sub_match: &ArgMatches) -> anyhow::Result> { - sub_match - .get_one::("timeline-id") - .map(|timeline_id| TimelineId::from_str(timeline_id)) - .transpose() - .context("Failed to parse timeline id from the argument string") -} - -fn handle_init(init_match: &ArgMatches) -> anyhow::Result { - let num_pageservers = init_match.get_one::("num-pageservers"); - - let force = init_match.get_one("force").expect("we set a default value"); - +fn handle_init(args: &InitCmdArgs) -> anyhow::Result { // Create the in-memory `LocalEnv` that we'd normally load from disk in `load_config`. - let init_conf: NeonLocalInitConf = if let Some(config_path) = - init_match.get_one::("config") - { + let init_conf: NeonLocalInitConf = if let Some(config_path) = &args.config { // User (likely the Python test suite) provided a description of the environment. 
- if num_pageservers.is_some() { + if args.num_pageservers.is_some() { bail!("Cannot specify both --num-pageservers and --config, use key `pageservers` in the --config file instead"); } // load and parse the file @@ -346,7 +932,7 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result { http_port: DEFAULT_SAFEKEEPER_HTTP_PORT, ..Default::default() }], - pageservers: (0..num_pageservers.copied().unwrap_or(1)) + pageservers: (0..args.num_pageservers.unwrap_or(1)) .map(|i| { let pageserver_id = NodeId(DEFAULT_PAGESERVER_ID.0 + i as u64); let pg_port = DEFAULT_PAGESERVER_PG_PORT + i; @@ -369,7 +955,7 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result { } }; - LocalEnv::init(init_conf, force) + LocalEnv::init(init_conf, &args.force) .context("materialize initial neon_local environment on disk")?; Ok(LocalEnv::load_config(&local_env::base_path()) .expect("freshly written config should be loadable")) @@ -387,19 +973,16 @@ fn get_default_pageserver(env: &local_env::LocalEnv) -> PageServerNode { PageServerNode::from_env(env, ps_conf) } -async fn handle_tenant( - tenant_match: &ArgMatches, - env: &mut local_env::LocalEnv, -) -> anyhow::Result<()> { +async fn handle_tenant(subcmd: &TenantCmd, env: &mut local_env::LocalEnv) -> anyhow::Result<()> { let pageserver = get_default_pageserver(env); - match tenant_match.subcommand() { - Some(("list", _)) => { + match subcmd { + TenantCmd::List => { for t in pageserver.tenant_list().await? 
{ println!("{} {:?}", t.id, t.state); } } - Some(("import", import_match)) => { - let tenant_id = parse_tenant_id(import_match)?.unwrap_or_else(TenantId::generate); + TenantCmd::Import(args) => { + let tenant_id = args.tenant_id; let storage_controller = StorageController::from_env(env); let create_response = storage_controller.tenant_import(tenant_id).await?; @@ -446,31 +1029,14 @@ async fn handle_tenant( env.register_branch_mapping(branch_name, tenant_id, timeline.timeline_id)?; } } - Some(("create", create_match)) => { - let tenant_conf: HashMap<_, _> = create_match - .get_many::("config") - .map(|vals: clap::parser::ValuesRef<'_, String>| { - vals.flat_map(|c| c.split_once(':')).collect() - }) - .unwrap_or_default(); - - let shard_count: u8 = create_match - .get_one::("shard-count") - .cloned() - .unwrap_or(0); - - let shard_stripe_size: Option = - create_match.get_one::("shard-stripe-size").cloned(); - - let placement_policy = match create_match.get_one::("placement-policy") { - Some(s) if !s.is_empty() => serde_json::from_str::(s)?, - _ => PlacementPolicy::Attached(0), - }; + TenantCmd::Create(args) => { + let tenant_conf: HashMap<_, _> = + args.config.iter().flat_map(|c| c.split_once(':')).collect(); let tenant_conf = PageServerNode::parse_config(tenant_conf)?; // If tenant ID was not specified, generate one - let tenant_id = parse_tenant_id(create_match)?.unwrap_or_else(TenantId::generate); + let tenant_id = args.tenant_id.unwrap_or_else(TenantId::generate); // We must register the tenant with the storage controller, so // that when the pageserver restarts, it will be re-attached. 
@@ -478,29 +1044,26 @@ async fn handle_tenant( storage_controller .tenant_create(TenantCreateRequest { // Note that ::unsharded here isn't actually because the tenant is unsharded, its because the - // storage controller expecfs a shard-naive tenant_id in this attribute, and the TenantCreateRequest - // type is used both in storage controller (for creating tenants) and in pageserver (for creating shards) + // storage controller expects a shard-naive tenant_id in this attribute, and the TenantCreateRequest + // type is used both in the storage controller (for creating tenants) and in the pageserver (for + // creating shards) new_tenant_id: TenantShardId::unsharded(tenant_id), generation: None, shard_parameters: ShardParameters { - count: ShardCount::new(shard_count), - stripe_size: shard_stripe_size + count: ShardCount::new(args.shard_count), + stripe_size: args + .shard_stripe_size .map(ShardStripeSize) .unwrap_or(ShardParameters::DEFAULT_STRIPE_SIZE), }, - placement_policy: Some(placement_policy), + placement_policy: args.placement_policy.clone(), config: tenant_conf, }) .await?; println!("tenant {tenant_id} successfully created on the pageserver"); // Create an initial timeline for the new tenant - let new_timeline_id = - parse_timeline_id(create_match)?.unwrap_or(TimelineId::generate()); - let pg_version = create_match - .get_one::("pg-version") - .copied() - .context("Failed to parse postgres version from the argument string")?; + let new_timeline_id = args.timeline_id.unwrap_or(TimelineId::generate()); // FIXME: passing None for ancestor_start_lsn is not kosher in a sharded world: we can't have // different shards picking different start lsns. 
Maybe we have to teach storage controller @@ -513,7 +1076,7 @@ async fn handle_tenant( ancestor_timeline_id: None, ancestor_start_lsn: None, existing_initdb_timeline_id: None, - pg_version: Some(pg_version), + pg_version: Some(args.pg_version), }, ) .await?; @@ -526,23 +1089,19 @@ async fn handle_tenant( println!("Created an initial timeline '{new_timeline_id}' for tenant: {tenant_id}",); - if create_match.get_flag("set-default") { + if args.set_default { println!("Setting tenant {tenant_id} as a default one"); env.default_tenant_id = Some(tenant_id); } } - Some(("set-default", set_default_match)) => { - let tenant_id = - parse_tenant_id(set_default_match)?.context("No tenant id specified")?; - println!("Setting tenant {tenant_id} as a default one"); - env.default_tenant_id = Some(tenant_id); + TenantCmd::SetDefault(args) => { + println!("Setting tenant {} as a default one", args.tenant_id); + env.default_tenant_id = Some(args.tenant_id); } - Some(("config", create_match)) => { - let tenant_id = get_tenant_id(create_match, env)?; - let tenant_conf: HashMap<_, _> = create_match - .get_many::("config") - .map(|vals| vals.flat_map(|c| c.split_once(':')).collect()) - .unwrap_or_default(); + TenantCmd::Config(args) => { + let tenant_id = get_tenant_id(args.tenant_id, env)?; + let tenant_conf: HashMap<_, _> = + args.config.iter().flat_map(|c| c.split_once(':')).collect(); pageserver .tenant_config(tenant_id, tenant_conf) @@ -550,36 +1109,25 @@ async fn handle_tenant( .with_context(|| format!("Tenant config failed for tenant with id {tenant_id}"))?; println!("tenant {tenant_id} successfully configured on the pageserver"); } - - Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name), - None => bail!("no tenant subcommand provided"), } Ok(()) } -async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> { +async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Result<()> { let pageserver = 
get_default_pageserver(env); - match timeline_match.subcommand() { - Some(("list", list_match)) => { + match cmd { + TimelineCmd::List(args) => { // TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the storage controller // where shard 0 is attached, and query there. - let tenant_shard_id = get_tenant_shard_id(list_match, env)?; + let tenant_shard_id = get_tenant_shard_id(args.tenant_shard_id, env)?; let timelines = pageserver.timeline_list(&tenant_shard_id).await?; print_timelines_tree(timelines, env.timeline_name_mappings())?; } - Some(("create", create_match)) => { - let tenant_id = get_tenant_id(create_match, env)?; - let new_branch_name = create_match - .get_one::("branch-name") - .ok_or_else(|| anyhow!("No branch name provided"))?; - - let pg_version = create_match - .get_one::("pg-version") - .copied() - .context("Failed to parse postgres version from the argument string")?; - - let new_timeline_id_opt = parse_timeline_id(create_match)?; + TimelineCmd::Create(args) => { + let tenant_id = get_tenant_id(args.tenant_id, env)?; + let new_branch_name = &args.branch_name; + let new_timeline_id_opt = args.timeline_id; let new_timeline_id = new_timeline_id_opt.unwrap_or(TimelineId::generate()); let storage_controller = StorageController::from_env(env); @@ -588,7 +1136,7 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local ancestor_timeline_id: None, existing_initdb_timeline_id: None, ancestor_start_lsn: None, - pg_version: Some(pg_version), + pg_version: Some(args.pg_version), }; let timeline_info = storage_controller .tenant_timeline_create(tenant_id, create_req) @@ -602,67 +1150,42 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local timeline_info.timeline_id ); } - Some(("import", import_match)) => { - let tenant_id = get_tenant_id(import_match, env)?; - let timeline_id = parse_timeline_id(import_match)?.expect("No timeline id provided"); - let branch_name = import_match 
- .get_one::("branch-name") - .ok_or_else(|| anyhow!("No branch name provided"))?; + TimelineCmd::Import(args) => { + let tenant_id = get_tenant_id(args.tenant_id, env)?; + let timeline_id = args.timeline_id; + let branch_name = &args.branch_name; // Parse base inputs - let base_tarfile = import_match - .get_one::("base-tarfile") - .ok_or_else(|| anyhow!("No base-tarfile provided"))? - .to_owned(); - let base_lsn = Lsn::from_str( - import_match - .get_one::("base-lsn") - .ok_or_else(|| anyhow!("No base-lsn provided"))?, - )?; - let base = (base_lsn, base_tarfile); + let base = (args.base_lsn, args.base_tarfile.clone()); // Parse pg_wal inputs - let wal_tarfile = import_match.get_one::("wal-tarfile").cloned(); - let end_lsn = import_match - .get_one::("end-lsn") - .map(|s| Lsn::from_str(s).unwrap()); + let wal_tarfile = args.wal_tarfile.clone(); + let end_lsn = args.end_lsn; // TODO validate both or none are provided let pg_wal = end_lsn.zip(wal_tarfile); - let pg_version = import_match - .get_one::("pg-version") - .copied() - .context("Failed to parse postgres version from the argument string")?; - println!("Importing timeline into pageserver ..."); pageserver - .timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version) + .timeline_import(tenant_id, timeline_id, base, pg_wal, args.pg_version) .await?; env.register_branch_mapping(branch_name.to_string(), tenant_id, timeline_id)?; println!("Done"); } - Some(("branch", branch_match)) => { - let tenant_id = get_tenant_id(branch_match, env)?; - let new_timeline_id = - parse_timeline_id(branch_match)?.unwrap_or(TimelineId::generate()); - let new_branch_name = branch_match - .get_one::("branch-name") - .ok_or_else(|| anyhow!("No branch name provided"))?; - let ancestor_branch_name = branch_match - .get_one::("ancestor-branch-name") - .map(|s| s.as_str()) - .unwrap_or(DEFAULT_BRANCH_NAME); + TimelineCmd::Branch(args) => { + let tenant_id = get_tenant_id(args.tenant_id, env)?; + let new_timeline_id = 
args.timeline_id.unwrap_or(TimelineId::generate()); + let new_branch_name = &args.branch_name; + let ancestor_branch_name = args + .ancestor_branch_name + .clone() + .unwrap_or(DEFAULT_BRANCH_NAME.to_owned()); let ancestor_timeline_id = env - .get_branch_timeline_id(ancestor_branch_name, tenant_id) + .get_branch_timeline_id(&ancestor_branch_name, tenant_id) .ok_or_else(|| { anyhow!("Found no timeline id for branch name '{ancestor_branch_name}'") })?; - let start_lsn = branch_match - .get_one::("ancestor-start-lsn") - .map(|lsn_str| Lsn::from_str(lsn_str)) - .transpose() - .context("Failed to parse ancestor start Lsn from the request")?; + let start_lsn = args.ancestor_start_lsn; let storage_controller = StorageController::from_env(env); let create_req = TimelineCreateRequest { new_timeline_id, @@ -684,25 +1207,19 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local timeline_info.timeline_id ); } - Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{sub_name}'"), - None => bail!("no tenant subcommand provided"), } Ok(()) } -async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { - let (sub_name, sub_args) = match ep_match.subcommand() { - Some(ep_subcommand_data) => ep_subcommand_data, - None => bail!("no endpoint subcommand provided"), - }; +async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Result<()> { let mut cplane = ComputeControlPlane::load(env.clone())?; - match sub_name { - "list" => { + match subcmd { + EndpointCmd::List(args) => { // TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the storage controller // where shard 0 is attached, and query there. 
- let tenant_shard_id = get_tenant_shard_id(sub_args, env)?; + let tenant_shard_id = get_tenant_shard_id(args.tenant_shard_id, env)?; let timeline_infos = get_timeline_infos(env, &tenant_shard_id) .await .unwrap_or_else(|e| { @@ -766,52 +1283,29 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re println!("{table}"); } - "create" => { - let tenant_id = get_tenant_id(sub_args, env)?; - let branch_name = sub_args - .get_one::("branch-name") - .map(|s| s.as_str()) - .unwrap_or(DEFAULT_BRANCH_NAME); - let endpoint_id = sub_args - .get_one::("endpoint_id") - .map(String::to_string) + EndpointCmd::Create(args) => { + let tenant_id = get_tenant_id(args.tenant_id, env)?; + let branch_name = args + .branch_name + .clone() + .unwrap_or(DEFAULT_BRANCH_NAME.to_owned()); + let endpoint_id = args + .endpoint_id + .clone() .unwrap_or_else(|| format!("ep-{branch_name}")); - let update_catalog = sub_args - .get_one::("update-catalog") - .cloned() - .unwrap_or_default(); - let lsn = sub_args - .get_one::("lsn") - .map(|lsn_str| Lsn::from_str(lsn_str)) - .transpose() - .context("Failed to parse Lsn from the request")?; let timeline_id = env - .get_branch_timeline_id(branch_name, tenant_id) + .get_branch_timeline_id(&branch_name, tenant_id) .ok_or_else(|| anyhow!("Found no timeline id for branch name '{branch_name}'"))?; - let pg_port: Option = sub_args.get_one::("pg-port").copied(); - let http_port: Option = sub_args.get_one::("http-port").copied(); - let pg_version = sub_args - .get_one::("pg-version") - .copied() - .context("Failed to parse postgres version from the argument string")?; - - let hot_standby = sub_args - .get_one::("hot-standby") - .copied() - .unwrap_or(false); - - let allow_multiple = sub_args.get_flag("allow-multiple"); - - let mode = match (lsn, hot_standby) { + let mode = match (args.lsn, args.hot_standby) { (Some(lsn), false) => ComputeMode::Static(lsn), (None, true) => ComputeMode::Replica, (None, false) => ComputeMode::Primary, 
(Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"), }; - match (mode, hot_standby) { + match (mode, args.hot_standby) { (ComputeMode::Static(_), true) => { bail!("Cannot start a node in hot standby mode when it is already configured as a static replica") } @@ -821,7 +1315,7 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re _ => {} } - if !allow_multiple { + if !args.allow_multiple { cplane.check_conflicting_endpoints(mode, tenant_id, timeline_id)?; } @@ -829,34 +1323,21 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re &endpoint_id, tenant_id, timeline_id, - pg_port, - http_port, - pg_version, + args.pg_port, + args.http_port, + args.pg_version, mode, - !update_catalog, + !args.update_catalog, )?; } - "start" => { - let endpoint_id = sub_args - .get_one::("endpoint_id") - .ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?; - - let pageserver_id = - if let Some(id_str) = sub_args.get_one::("endpoint-pageserver-id") { - Some(NodeId( - id_str.parse().context("while parsing pageserver id")?, - )) - } else { - None - }; - - let remote_ext_config = sub_args.get_one::("remote-ext-config"); - - let allow_multiple = sub_args.get_flag("allow-multiple"); + EndpointCmd::Start(args) => { + let endpoint_id = &args.endpoint_id; + let pageserver_id = args.endpoint_pageserver_id; + let remote_ext_config = &args.remote_ext_config; // If --safekeepers argument is given, use only the listed // safekeeper nodes; otherwise all from the env. - let safekeepers = if let Some(safekeepers) = parse_safekeepers(sub_args)? { + let safekeepers = if let Some(safekeepers) = parse_safekeepers(&args.safekeepers)? 
{ safekeepers } else { env.safekeepers.iter().map(|sk| sk.id).collect() @@ -867,12 +1348,7 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re .get(endpoint_id.as_str()) .ok_or_else(|| anyhow::anyhow!("endpoint {endpoint_id} not found"))?; - let create_test_user = sub_args - .get_one::("create-test-user") - .cloned() - .unwrap_or_default(); - - if !allow_multiple { + if !args.allow_multiple { cplane.check_conflicting_endpoints( endpoint.mode, endpoint.tenant_id, @@ -936,72 +1412,61 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re &auth_token, safekeepers, pageservers, - remote_ext_config, + remote_ext_config.as_ref(), stripe_size.0 as usize, - create_test_user, + args.create_test_user, ) .await?; } - "reconfigure" => { - let endpoint_id = sub_args - .get_one::("endpoint_id") - .ok_or_else(|| anyhow!("No endpoint ID provided to reconfigure"))?; + EndpointCmd::Reconfigure(args) => { + let endpoint_id = &args.endpoint_id; let endpoint = cplane .endpoints .get(endpoint_id.as_str()) .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?; - let pageservers = - if let Some(id_str) = sub_args.get_one::("endpoint-pageserver-id") { - let ps_id = NodeId(id_str.parse().context("while parsing pageserver id")?); - let pageserver = PageServerNode::from_env(env, env.get_pageserver_conf(ps_id)?); - vec![( - pageserver.pg_connection_config.host().clone(), - pageserver.pg_connection_config.port(), - )] - } else { - let storage_controller = StorageController::from_env(env); - storage_controller - .tenant_locate(endpoint.tenant_id) - .await? 
- .shards - .into_iter() - .map(|shard| { - ( - Host::parse(&shard.listen_pg_addr) - .expect("Storage controller reported malformed host"), - shard.listen_pg_port, - ) - }) - .collect::>() - }; + let pageservers = if let Some(ps_id) = args.endpoint_pageserver_id { + let pageserver = PageServerNode::from_env(env, env.get_pageserver_conf(ps_id)?); + vec![( + pageserver.pg_connection_config.host().clone(), + pageserver.pg_connection_config.port(), + )] + } else { + let storage_controller = StorageController::from_env(env); + storage_controller + .tenant_locate(endpoint.tenant_id) + .await? + .shards + .into_iter() + .map(|shard| { + ( + Host::parse(&shard.listen_pg_addr) + .expect("Storage controller reported malformed host"), + shard.listen_pg_port, + ) + }) + .collect::>() + }; // If --safekeepers argument is given, use only the listed // safekeeper nodes; otherwise all from the env. - let safekeepers = parse_safekeepers(sub_args)?; + let safekeepers = parse_safekeepers(&args.safekeepers)?; endpoint.reconfigure(pageservers, None, safekeepers).await?; } - "stop" => { - let endpoint_id = sub_args - .get_one::("endpoint_id") - .ok_or_else(|| anyhow!("No endpoint ID was provided to stop"))?; - let destroy = sub_args.get_flag("destroy"); - let mode = sub_args.get_one::("mode").expect("has a default"); - + EndpointCmd::Stop(args) => { + let endpoint_id = &args.endpoint_id; let endpoint = cplane .endpoints - .get(endpoint_id.as_str()) + .get(endpoint_id) .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?; - endpoint.stop(mode, destroy)?; + endpoint.stop(&args.mode, args.destroy)?; } - - _ => bail!("Unexpected endpoint subcommand '{sub_name}'"), } Ok(()) } /// Parse --safekeepers as list of safekeeper ids. 
-fn parse_safekeepers(sub_args: &ArgMatches) -> Result>> { - if let Some(safekeepers_str) = sub_args.get_one::("safekeepers") { +fn parse_safekeepers(safekeepers_str: &Option) -> Result>> { + if let Some(safekeepers_str) = safekeepers_str { let mut safekeepers: Vec = Vec::new(); for sk_id in safekeepers_str.split(',').map(str::trim) { let sk_id = NodeId( @@ -1016,44 +1481,25 @@ fn parse_safekeepers(sub_args: &ArgMatches) -> Result>> { } } -fn handle_mappings(sub_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> { - let (sub_name, sub_args) = match sub_match.subcommand() { - Some(ep_subcommand_data) => ep_subcommand_data, - None => bail!("no mappings subcommand provided"), - }; - - match sub_name { - "map" => { - let branch_name = sub_args - .get_one::("branch-name") - .expect("branch-name argument missing"); - - let tenant_id = sub_args - .get_one::("tenant-id") - .map(|x| TenantId::from_str(x)) - .expect("tenant-id argument missing") - .expect("malformed tenant-id arg"); - - let timeline_id = sub_args - .get_one::("timeline-id") - .map(|x| TimelineId::from_str(x)) - .expect("timeline-id argument missing") - .expect("malformed timeline-id arg"); - - env.register_branch_mapping(branch_name.to_owned(), tenant_id, timeline_id)?; +fn handle_mappings(subcmd: &MappingsCmd, env: &mut local_env::LocalEnv) -> Result<()> { + match subcmd { + MappingsCmd::Map(args) => { + env.register_branch_mapping( + args.branch_name.to_owned(), + args.tenant_id, + args.timeline_id, + )?; Ok(()) } - other => unimplemented!("mappings subcommand {other}"), } } -fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result { - let node_id = if let Some(id_str) = args.get_one::("pageserver-id") { - NodeId(id_str.parse().context("while parsing pageserver id")?) 
- } else { - DEFAULT_PAGESERVER_ID - }; +fn get_pageserver( + env: &local_env::LocalEnv, + pageserver_id_arg: Option, +) -> Result { + let node_id = pageserver_id_arg.unwrap_or(DEFAULT_PAGESERVER_ID); Ok(PageServerNode::from_env( env, @@ -1061,48 +1507,11 @@ fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result &Duration { - let humantime_duration = args - .get_one::("start-timeout") - .expect("invalid value for start-timeout"); - humantime_duration.as_ref() -} - -fn storage_controller_start_args(args: &ArgMatches) -> NeonStorageControllerStartArgs { - let maybe_instance_id = args.get_one::("instance-id"); - - let base_port = args.get_one::("base-port"); - - if maybe_instance_id.is_some() && base_port.is_none() { - panic!("storage-controller start specificied instance-id but did not provide base-port"); - } - - let start_timeout = args - .get_one::("start-timeout") - .expect("invalid value for start-timeout"); - - NeonStorageControllerStartArgs { - instance_id: maybe_instance_id.copied().unwrap_or(1), - base_port: base_port.copied(), - start_timeout: *start_timeout, - } -} - -fn storage_controller_stop_args(args: &ArgMatches) -> NeonStorageControllerStopArgs { - let maybe_instance_id = args.get_one::("instance-id"); - let immediate = args.get_one::("stop-mode").map(|s| s.as_str()) == Some("immediate"); - - NeonStorageControllerStopArgs { - instance_id: maybe_instance_id.copied().unwrap_or(1), - immediate, - } -} - -async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { - match sub_match.subcommand() { - Some(("start", subcommand_args)) => { - if let Err(e) = get_pageserver(env, subcommand_args)? - .start(get_start_timeout(subcommand_args)) +async fn handle_pageserver(subcmd: &PageserverCmd, env: &local_env::LocalEnv) -> Result<()> { + match subcmd { + PageserverCmd::Start(args) => { + if let Err(e) = get_pageserver(env, args.pageserver_id)? 
+ .start(&args.start_timeout) .await { eprintln!("pageserver start failed: {e}"); @@ -1110,34 +1519,36 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> } } - Some(("stop", subcommand_args)) => { - let immediate = subcommand_args - .get_one::("stop-mode") - .map(|s| s.as_str()) - == Some("immediate"); - - if let Err(e) = get_pageserver(env, subcommand_args)?.stop(immediate) { + PageserverCmd::Stop(args) => { + let immediate = match args.stop_mode { + StopMode::Fast => false, + StopMode::Immediate => true, + }; + if let Err(e) = get_pageserver(env, args.pageserver_id)?.stop(immediate) { eprintln!("pageserver stop failed: {}", e); exit(1); } } - Some(("restart", subcommand_args)) => { - let pageserver = get_pageserver(env, subcommand_args)?; + PageserverCmd::Restart(args) => { + let pageserver = get_pageserver(env, args.pageserver_id)?; //TODO what shutdown strategy should we use here? if let Err(e) = pageserver.stop(false) { eprintln!("pageserver stop failed: {}", e); exit(1); } - if let Err(e) = pageserver.start(get_start_timeout(sub_match)).await { + if let Err(e) = pageserver.start(&args.start_timeout).await { eprintln!("pageserver start failed: {e}"); exit(1); } } - Some(("status", subcommand_args)) => { - match get_pageserver(env, subcommand_args)?.check_status().await { + PageserverCmd::Status(args) => { + match get_pageserver(env, args.pageserver_id)? 
+ .check_status() + .await + { Ok(_) => println!("Page server is up and running"), Err(err) => { eprintln!("Page server is not available: {}", err); @@ -1145,34 +1556,42 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> } } } - - Some((sub_name, _)) => bail!("Unexpected pageserver subcommand '{}'", sub_name), - None => bail!("no pageserver subcommand provided"), } Ok(()) } async fn handle_storage_controller( - sub_match: &ArgMatches, + subcmd: &StorageControllerCmd, env: &local_env::LocalEnv, ) -> Result<()> { let svc = StorageController::from_env(env); - match sub_match.subcommand() { - Some(("start", start_match)) => { - if let Err(e) = svc.start(storage_controller_start_args(start_match)).await { + match subcmd { + StorageControllerCmd::Start(args) => { + let start_args = NeonStorageControllerStartArgs { + instance_id: args.instance_id, + base_port: args.base_port, + start_timeout: args.start_timeout, + }; + + if let Err(e) = svc.start(start_args).await { eprintln!("start failed: {e}"); exit(1); } } - Some(("stop", stop_match)) => { - if let Err(e) = svc.stop(storage_controller_stop_args(stop_match)).await { + StorageControllerCmd::Stop(args) => { + let stop_args = NeonStorageControllerStopArgs { + instance_id: args.instance_id, + immediate: match args.stop_mode { + StopMode::Fast => false, + StopMode::Immediate => true, + }, + }; + if let Err(e) = svc.stop(stop_args).await { eprintln!("stop failed: {}", e); exit(1); } } - Some((sub_name, _)) => bail!("Unexpected storage_controller subcommand '{}'", sub_name), - None => bail!("no storage_controller subcommand provided"), } Ok(()) } @@ -1185,111 +1604,77 @@ fn get_safekeeper(env: &local_env::LocalEnv, id: NodeId) -> Result Vec { - init_match - .get_many::("safekeeper-extra-opt") - .into_iter() - .flatten() - .map(|s| s.to_owned()) - .collect() -} +async fn handle_safekeeper(subcmd: &SafekeeperCmd, env: &local_env::LocalEnv) -> Result<()> { + match subcmd { + 
SafekeeperCmd::Start(args) => { + let safekeeper = get_safekeeper(env, args.id)?; -async fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { - let (sub_name, sub_args) = match sub_match.subcommand() { - Some(safekeeper_command_data) => safekeeper_command_data, - None => bail!("no safekeeper subcommand provided"), - }; - - // All the commands take an optional safekeeper name argument - let sk_id = if let Some(id_str) = sub_args.get_one::("id") { - NodeId(id_str.parse().context("while parsing safekeeper id")?) - } else { - DEFAULT_SAFEKEEPER_ID - }; - let safekeeper = get_safekeeper(env, sk_id)?; - - match sub_name { - "start" => { - let extra_opts = safekeeper_extra_opts(sub_args); - - if let Err(e) = safekeeper - .start(extra_opts, get_start_timeout(sub_args)) - .await - { + if let Err(e) = safekeeper.start(&args.extra_opt, &args.start_timeout).await { eprintln!("safekeeper start failed: {}", e); exit(1); } } - "stop" => { - let immediate = - sub_args.get_one::("stop-mode").map(|s| s.as_str()) == Some("immediate"); - + SafekeeperCmd::Stop(args) => { + let safekeeper = get_safekeeper(env, args.id)?; + let immediate = match args.stop_mode { + StopMode::Fast => false, + StopMode::Immediate => true, + }; if let Err(e) = safekeeper.stop(immediate) { eprintln!("safekeeper stop failed: {}", e); exit(1); } } - "restart" => { - let immediate = - sub_args.get_one::("stop-mode").map(|s| s.as_str()) == Some("immediate"); + SafekeeperCmd::Restart(args) => { + let safekeeper = get_safekeeper(env, args.id)?; + let immediate = match args.stop_mode { + StopMode::Fast => false, + StopMode::Immediate => true, + }; if let Err(e) = safekeeper.stop(immediate) { eprintln!("safekeeper stop failed: {}", e); exit(1); } - let extra_opts = safekeeper_extra_opts(sub_args); - if let Err(e) = safekeeper - .start(extra_opts, get_start_timeout(sub_args)) - .await - { + if let Err(e) = safekeeper.start(&args.extra_opt, &args.start_timeout).await { 
eprintln!("safekeeper start failed: {}", e); exit(1); } } - - _ => { - bail!("Unexpected safekeeper subcommand '{}'", sub_name) - } } Ok(()) } -async fn handle_storage_broker(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { - let (sub_name, sub_args) = match sub_match.subcommand() { - Some(broker_command_data) => broker_command_data, - None => bail!("no broker subcommand provided"), - }; - - match sub_name { - "start" => { - if let Err(e) = broker::start_broker_process(env, get_start_timeout(sub_args)).await { +async fn handle_storage_broker(subcmd: &StorageBrokerCmd, env: &local_env::LocalEnv) -> Result<()> { + match subcmd { + StorageBrokerCmd::Start(args) => { + if let Err(e) = broker::start_broker_process(env, &args.start_timeout).await { eprintln!("broker start failed: {e}"); exit(1); } } - "stop" => { + StorageBrokerCmd::Stop(_args) => { + // FIXME: stop_mode unused if let Err(e) = broker::stop_broker_process(env) { eprintln!("broker stop failed: {e}"); exit(1); } } - - _ => bail!("Unexpected broker subcommand '{}'", sub_name), } Ok(()) } async fn handle_start_all( + args: &StartCmdArgs, env: &'static local_env::LocalEnv, - retry_timeout: &Duration, ) -> anyhow::Result<()> { - let Err(errors) = handle_start_all_impl(env, *retry_timeout).await else { - neon_start_status_check(env, retry_timeout) + // FIXME: this was called "retry_timeout", is it right? + let Err(errors) = handle_start_all_impl(env, args.timeout).await else { + neon_start_status_check(env, args.timeout.as_ref()) .await .context("status check after successful startup of all services")?; return Ok(()); @@ -1314,7 +1699,7 @@ async fn handle_start_all( /// Otherwise, returns the list of errors that occurred during startup. 
async fn handle_start_all_impl( env: &'static local_env::LocalEnv, - retry_timeout: Duration, + retry_timeout: humantime::Duration, ) -> Result<(), Vec> { // Endpoints are not started automatically @@ -1334,7 +1719,7 @@ async fn handle_start_all_impl( let storage_controller = StorageController::from_env(env); storage_controller .start(NeonStorageControllerStartArgs::with_default_instance_id( - retry_timeout.into(), + retry_timeout, )) .await .map_err(|e| e.context("start storage_controller")) @@ -1355,7 +1740,7 @@ async fn handle_start_all_impl( js.spawn(async move { let safekeeper = SafekeeperNode::from_env(env, node); safekeeper - .start(vec![], &retry_timeout) + .start(&[], &retry_timeout) .await .map_err(|e| e.context(format!("start safekeeper {}", safekeeper.id))) }); @@ -1435,9 +1820,11 @@ async fn neon_start_status_check( anyhow::bail!("\nNeon passed status check") } -async fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { - let immediate = - sub_match.get_one::("stop-mode").map(|s| s.as_str()) == Some("immediate"); +async fn handle_stop_all(args: &StopCmdArgs, env: &local_env::LocalEnv) -> Result<()> { + let immediate = match args.mode { + StopMode::Fast => false, + StopMode::Immediate => true, + }; try_stop_all(env, immediate).await; @@ -1495,400 +1882,3 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) { } } } - -fn cli() -> Command { - let timeout_arg = Arg::new("start-timeout") - .long("start-timeout") - .short('t') - .global(true) - .help("timeout until we fail the command, e.g. 
30s") - .value_parser(value_parser!(humantime::Duration)) - .default_value("10s") - .required(false); - - let branch_name_arg = Arg::new("branch-name") - .long("branch-name") - .help("Name of the branch to be created or used as an alias for other services") - .required(false); - - let endpoint_id_arg = Arg::new("endpoint_id") - .help("Postgres endpoint id") - .required(false); - - let safekeeper_id_arg = Arg::new("id").help("safekeeper id").required(false); - - // --id, when using a pageserver command - let pageserver_id_arg = Arg::new("pageserver-id") - .long("id") - .global(true) - .help("pageserver id") - .required(false); - // --pageserver-id when using a non-pageserver command - let endpoint_pageserver_id_arg = Arg::new("endpoint-pageserver-id") - .long("pageserver-id") - .required(false); - - let safekeeper_extra_opt_arg = Arg::new("safekeeper-extra-opt") - .short('e') - .long("safekeeper-extra-opt") - .num_args(1) - .action(ArgAction::Append) - .help("Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo") - .required(false); - - let tenant_id_arg = Arg::new("tenant-id") - .long("tenant-id") - .help("Tenant id. Represented as a hexadecimal string 32 symbols length") - .required(false); - - let timeline_id_arg = Arg::new("timeline-id") - .long("timeline-id") - .help("Timeline id. 
Represented as a hexadecimal string 32 symbols length") - .required(false); - - let pg_version_arg = Arg::new("pg-version") - .long("pg-version") - .help("Postgres version to use for the initial tenant") - .required(false) - .value_parser(value_parser!(u32)) - .default_value(DEFAULT_PG_VERSION); - - let pg_port_arg = Arg::new("pg-port") - .long("pg-port") - .required(false) - .value_parser(value_parser!(u16)) - .value_name("pg-port"); - - let http_port_arg = Arg::new("http-port") - .long("http-port") - .required(false) - .value_parser(value_parser!(u16)) - .value_name("http-port"); - - let safekeepers_arg = Arg::new("safekeepers") - .long("safekeepers") - .required(false) - .value_name("safekeepers"); - - let stop_mode_arg = Arg::new("stop-mode") - .short('m') - .value_parser(["fast", "immediate"]) - .default_value("fast") - .help("If 'immediate', don't flush repository data at shutdown") - .required(false) - .value_name("stop-mode"); - - let remote_ext_config_args = Arg::new("remote-ext-config") - .long("remote-ext-config") - .num_args(1) - .help("Configure the remote extensions storage proxy gateway to request for extensions.") - .required(false); - - let lsn_arg = Arg::new("lsn") - .long("lsn") - .help("Specify Lsn on the timeline to start from. 
By default, end of the timeline would be used.") - .required(false); - - let hot_standby_arg = Arg::new("hot-standby") - .value_parser(value_parser!(bool)) - .long("hot-standby") - .help("If set, the node will be a hot replica on the specified timeline") - .required(false); - - let force_arg = Arg::new("force") - .value_parser(value_parser!(InitForceMode)) - .long("force") - .default_value( - InitForceMode::MustNotExist - .to_possible_value() - .unwrap() - .get_name() - .to_owned(), - ) - .help("Force initialization even if the repository is not empty") - .required(false); - - let num_pageservers_arg = Arg::new("num-pageservers") - .value_parser(value_parser!(u16)) - .long("num-pageservers") - .help("How many pageservers to create (default 1)"); - - let update_catalog = Arg::new("update-catalog") - .value_parser(value_parser!(bool)) - .long("update-catalog") - .help("If set, will set up the catalog for neon_superuser") - .required(false); - - let create_test_user = Arg::new("create-test-user") - .value_parser(value_parser!(bool)) - .long("create-test-user") - .help("If set, will create test user `user` and `neondb` database. Requires `update-catalog = true`") - .required(false); - - let allow_multiple = Arg::new("allow-multiple") - .help("Allow multiple primary endpoints running on the same branch. 
Shouldn't be used normally, but useful for tests.") - .long("allow-multiple") - .action(ArgAction::SetTrue) - .required(false); - - let instance_id = Arg::new("instance-id") - .long("instance-id") - .help("Identifier used to distinguish storage controller instances (default 1)") - .value_parser(value_parser!(u8)) - .required(false); - - let base_port = Arg::new("base-port") - .long("base-port") - .help("Base port for the storage controller instance idenfified by instance-id (defaults to pagserver cplane api)") - .value_parser(value_parser!(u16)) - .required(false); - - Command::new("Neon CLI") - .arg_required_else_help(true) - .version(GIT_VERSION) - .subcommand( - Command::new("init") - .about("Initialize a new Neon repository, preparing configs for services to start with") - .arg(num_pageservers_arg.clone()) - .arg( - Arg::new("config") - .long("config") - .required(false) - .value_parser(value_parser!(PathBuf)) - .value_name("config") - ) - .arg(force_arg) - ) - .subcommand( - Command::new("timeline") - .about("Manage timelines") - .arg_required_else_help(true) - .subcommand(Command::new("list") - .about("List all timelines, available to this pageserver") - .arg(tenant_id_arg.clone())) - .subcommand(Command::new("branch") - .about("Create a new timeline, using another timeline as a base, copying its data") - .arg(tenant_id_arg.clone()) - .arg(timeline_id_arg.clone()) - .arg(branch_name_arg.clone()) - .arg(Arg::new("ancestor-branch-name").long("ancestor-branch-name") - .help("Use last Lsn of another timeline (and its data) as base when creating the new timeline. 
The timeline gets resolved by its branch name.").required(false)) - .arg(Arg::new("ancestor-start-lsn").long("ancestor-start-lsn") - .help("When using another timeline as base, use a specific Lsn in it instead of the latest one").required(false))) - .subcommand(Command::new("create") - .about("Create a new blank timeline") - .arg(tenant_id_arg.clone()) - .arg(timeline_id_arg.clone()) - .arg(branch_name_arg.clone()) - .arg(pg_version_arg.clone()) - ) - .subcommand(Command::new("import") - .about("Import timeline from basebackup directory") - .arg(tenant_id_arg.clone()) - .arg(timeline_id_arg.clone()) - .arg(branch_name_arg.clone()) - .arg(Arg::new("base-tarfile") - .long("base-tarfile") - .value_parser(value_parser!(PathBuf)) - .help("Basebackup tarfile to import") - ) - .arg(Arg::new("base-lsn").long("base-lsn") - .help("Lsn the basebackup starts at")) - .arg(Arg::new("wal-tarfile") - .long("wal-tarfile") - .value_parser(value_parser!(PathBuf)) - .help("Wal to add after base") - ) - .arg(Arg::new("end-lsn").long("end-lsn") - .help("Lsn the basebackup ends at")) - .arg(pg_version_arg.clone()) - ) - ).subcommand( - Command::new("tenant") - .arg_required_else_help(true) - .about("Manage tenants") - .subcommand(Command::new("list")) - .subcommand(Command::new("create") - .arg(tenant_id_arg.clone()) - .arg(timeline_id_arg.clone().help("Use a specific timeline id when creating a tenant and its initial timeline")) - .arg(Arg::new("config").short('c').num_args(1).action(ArgAction::Append).required(false)) - .arg(pg_version_arg.clone()) - .arg(Arg::new("set-default").long("set-default").action(ArgAction::SetTrue).required(false) - .help("Use this tenant in future CLI commands where tenant_id is needed, but not specified")) - .arg(Arg::new("shard-count").value_parser(value_parser!(u8)).long("shard-count").action(ArgAction::Set).help("Number of shards in the new tenant (default 1)")) - 
.arg(Arg::new("shard-stripe-size").value_parser(value_parser!(u32)).long("shard-stripe-size").action(ArgAction::Set).help("Sharding stripe size in pages")) - .arg(Arg::new("placement-policy").value_parser(value_parser!(String)).long("placement-policy").action(ArgAction::Set).help("Placement policy shards in this tenant")) - ) - .subcommand(Command::new("set-default").arg(tenant_id_arg.clone().required(true)) - .about("Set a particular tenant as default in future CLI commands where tenant_id is needed, but not specified")) - .subcommand(Command::new("config") - .arg(tenant_id_arg.clone()) - .arg(Arg::new("config").short('c').num_args(1).action(ArgAction::Append).required(false))) - .subcommand(Command::new("import").arg(tenant_id_arg.clone().required(true)) - .about("Import a tenant that is present in remote storage, and create branches for its timelines")) - ) - .subcommand( - Command::new("pageserver") - .arg_required_else_help(true) - .about("Manage pageserver") - .arg(pageserver_id_arg) - .subcommand(Command::new("status")) - .subcommand(Command::new("start") - .about("Start local pageserver") - .arg(timeout_arg.clone()) - ) - .subcommand(Command::new("stop") - .about("Stop local pageserver") - .arg(stop_mode_arg.clone()) - ) - .subcommand(Command::new("restart") - .about("Restart local pageserver") - .arg(timeout_arg.clone()) - ) - ) - .subcommand( - Command::new("storage_controller") - .arg_required_else_help(true) - .about("Manage storage_controller") - .subcommand(Command::new("start").about("Start storage controller") - .arg(timeout_arg.clone()) - .arg(instance_id.clone()) - .arg(base_port)) - .subcommand(Command::new("stop").about("Stop storage controller") - .arg(stop_mode_arg.clone()) - .arg(instance_id)) - ) - .subcommand( - Command::new("storage_broker") - .arg_required_else_help(true) - .about("Manage broker") - .subcommand(Command::new("start") - .about("Start broker") - .arg(timeout_arg.clone()) - ) - .subcommand(Command::new("stop") - .about("Stop 
broker") - .arg(stop_mode_arg.clone()) - ) - ) - .subcommand( - Command::new("safekeeper") - .arg_required_else_help(true) - .about("Manage safekeepers") - .subcommand(Command::new("start") - .about("Start local safekeeper") - .arg(safekeeper_id_arg.clone()) - .arg(safekeeper_extra_opt_arg.clone()) - .arg(timeout_arg.clone()) - ) - .subcommand(Command::new("stop") - .about("Stop local safekeeper") - .arg(safekeeper_id_arg.clone()) - .arg(stop_mode_arg.clone()) - ) - .subcommand(Command::new("restart") - .about("Restart local safekeeper") - .arg(safekeeper_id_arg) - .arg(stop_mode_arg.clone()) - .arg(safekeeper_extra_opt_arg) - .arg(timeout_arg.clone()) - ) - ) - .subcommand( - Command::new("endpoint") - .arg_required_else_help(true) - .about("Manage postgres instances") - .subcommand(Command::new("list").arg(tenant_id_arg.clone())) - .subcommand(Command::new("create") - .about("Create a compute endpoint") - .arg(endpoint_id_arg.clone()) - .arg(branch_name_arg.clone()) - .arg(tenant_id_arg.clone()) - .arg(lsn_arg.clone()) - .arg(pg_port_arg.clone()) - .arg(http_port_arg.clone()) - .arg(endpoint_pageserver_id_arg.clone()) - .arg( - Arg::new("config-only") - .help("Don't do basebackup, create endpoint directory with only config files") - .long("config-only") - .required(false)) - .arg(pg_version_arg.clone()) - .arg(hot_standby_arg.clone()) - .arg(update_catalog) - .arg(allow_multiple.clone()) - ) - .subcommand(Command::new("start") - .about("Start postgres.\n If the endpoint doesn't exist yet, it is created.") - .arg(endpoint_id_arg.clone()) - .arg(endpoint_pageserver_id_arg.clone()) - .arg(safekeepers_arg.clone()) - .arg(remote_ext_config_args) - .arg(create_test_user) - .arg(allow_multiple.clone()) - .arg(timeout_arg.clone()) - ) - .subcommand(Command::new("reconfigure") - .about("Reconfigure the endpoint") - .arg(endpoint_pageserver_id_arg) - .arg(safekeepers_arg) - .arg(endpoint_id_arg.clone()) - .arg(tenant_id_arg.clone()) - ) - .subcommand( - 
Command::new("stop") - .arg(endpoint_id_arg) - .arg( - Arg::new("destroy") - .help("Also delete data directory (now optional, should be default in future)") - .long("destroy") - .action(ArgAction::SetTrue) - .required(false) - ) - .arg( - Arg::new("mode") - .help("Postgres shutdown mode, passed to \"pg_ctl -m \"") - .long("mode") - .action(ArgAction::Set) - .required(false) - .value_parser(["smart", "fast", "immediate"]) - .default_value("fast") - ) - ) - - ) - .subcommand( - Command::new("mappings") - .arg_required_else_help(true) - .about("Manage neon_local branch name mappings") - .subcommand( - Command::new("map") - .about("Create new mapping which cannot exist already") - .arg(branch_name_arg.clone()) - .arg(tenant_id_arg.clone()) - .arg(timeline_id_arg.clone()) - ) - ) - // Obsolete old name for 'endpoint'. We now just print an error if it's used. - .subcommand( - Command::new("pg") - .hide(true) - .arg(Arg::new("ignore-rest").allow_hyphen_values(true).num_args(0..).required(false)) - .trailing_var_arg(true) - ) - .subcommand( - Command::new("start") - .about("Start page server and safekeepers") - .arg(timeout_arg.clone()) - ) - .subcommand( - Command::new("stop") - .about("Stop page server and safekeepers") - .arg(stop_mode_arg) - ) -} - -#[test] -fn verify_cli() { - cli().debug_assert(); -} diff --git a/control_plane/src/branch_mappings.rs b/control_plane/src/branch_mappings.rs new file mode 100644 index 0000000000..e89313df39 --- /dev/null +++ b/control_plane/src/branch_mappings.rs @@ -0,0 +1,94 @@ +//! Branch mappings for convenience + +use std::collections::HashMap; +use std::fs; +use std::path::Path; + +use anyhow::{bail, Context}; +use serde::{Deserialize, Serialize}; + +use utils::id::{TenantId, TenantTimelineId, TimelineId}; + +/// Keep human-readable aliases in memory (and persist them to config XXX), to hide tenant/timeline hex strings from the user. 
+#[derive(PartialEq, Eq, Clone, Debug, Default, Serialize, Deserialize)]
+#[serde(default, deny_unknown_fields)]
+pub struct BranchMappings {
+    /// Default tenant ID to use with the 'neon_local' command line utility, when
+    /// --tenant_id is not explicitly specified. This comes from the branches.
+    pub default_tenant_id: Option<TenantId>,
+
+    // A `HashMap<String, HashMap<TenantId, TimelineId>>` would be more appropriate here,
+    // but deserialization into a generic toml object as `toml::Value::try_from` fails with an error.
+    // https://toml.io/en/v1.0.0 does not contain a concept of "a table inside another table".
+    pub mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
+}
+
+impl BranchMappings {
+    pub fn register_branch_mapping(
+        &mut self,
+        branch_name: String,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+    ) -> anyhow::Result<()> {
+        let existing_values = self.mappings.entry(branch_name.clone()).or_default();
+
+        let existing_ids = existing_values
+            .iter()
+            .find(|(existing_tenant_id, _)| existing_tenant_id == &tenant_id);
+
+        if let Some((_, old_timeline_id)) = existing_ids {
+            if old_timeline_id == &timeline_id {
+                Ok(())
+            } else {
+                bail!("branch '{branch_name}' is already mapped to timeline {old_timeline_id}, cannot map to another timeline {timeline_id}");
+            }
+        } else {
+            existing_values.push((tenant_id, timeline_id));
+            Ok(())
+        }
+    }
+
+    pub fn get_branch_timeline_id(
+        &self,
+        branch_name: &str,
+        tenant_id: TenantId,
+    ) -> Option<TimelineId> {
+        // If it looks like a timeline ID, return it as it is
+        if let Ok(timeline_id) = branch_name.parse::<TimelineId>() {
+            return Some(timeline_id);
+        }
+
+        self.mappings
+            .get(branch_name)?
+            .iter()
+            .find(|(mapped_tenant_id, _)| mapped_tenant_id == &tenant_id)
+            .map(|&(_, timeline_id)| timeline_id)
+            .map(TimelineId::from)
+    }
+
+    pub fn timeline_name_mappings(&self) -> HashMap<TenantTimelineId, String> {
+        self.mappings
+            .iter()
+            .flat_map(|(name, tenant_timelines)| {
+                tenant_timelines.iter().map(|&(tenant_id, timeline_id)| {
+                    (TenantTimelineId::new(tenant_id, timeline_id), name.clone())
+                })
+            })
+            .collect()
+    }
+
+    pub fn persist(&self, path: &Path) -> anyhow::Result<()> {
+        let content = &toml::to_string_pretty(self)?;
+        fs::write(path, content).with_context(|| {
+            format!(
+                "Failed to write branch information into path '{}'",
+                path.display()
+            )
+        })
+    }
+
+    pub fn load(path: &Path) -> anyhow::Result<Self> {
+        let branches_file_contents = fs::read_to_string(path)?;
+        Ok(toml::from_str(branches_file_contents.as_str())?)
+    }
+}
diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs
index 573f1688d5..7a019bce88 100644
--- a/control_plane/src/safekeeper.rs
+++ b/control_plane/src/safekeeper.rs
@@ -113,7 +113,7 @@ impl SafekeeperNode {
     pub async fn start(
         &self,
-        extra_opts: Vec<String>,
+        extra_opts: &[String],
         retry_timeout: &Duration,
     ) -> anyhow::Result<()> {
         print!(
@@ -196,7 +196,7 @@ impl SafekeeperNode {
             ]);
         }
 
-        args.extend(extra_opts);
+        args.extend_from_slice(extra_opts);
 
         background_process::start_process(
             &format!("safekeeper-{id}"),