diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 1d66532d49..2397811628 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -106,6 +106,7 @@ fn main() -> Result<()> { "stop" => rt.block_on(handle_stop_all(sub_args, &env)), "pageserver" => rt.block_on(handle_pageserver(sub_args, &env)), "storage_controller" => rt.block_on(handle_storage_controller(sub_args, &env)), + "storage_broker" => rt.block_on(handle_storage_broker(sub_args, &env)), "safekeeper" => rt.block_on(handle_safekeeper(sub_args, &env)), "endpoint" => rt.block_on(handle_endpoint(sub_args, &env)), "mappings" => handle_mappings(sub_args, &mut env), @@ -1244,6 +1245,32 @@ async fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Ok(()) } +async fn handle_storage_broker(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { + let (sub_name, sub_args) = match sub_match.subcommand() { + Some(broker_command_data) => broker_command_data, + None => bail!("no broker subcommand provided"), + }; + + match sub_name { + "start" => { + if let Err(e) = broker::start_broker_process(env, get_start_timeout(sub_args)).await { + eprintln!("broker start failed: {e}"); + exit(1); + } + } + + "stop" => { + if let Err(e) = broker::stop_broker_process(env) { + eprintln!("broker stop failed: {e}"); + exit(1); + } + } + + _ => bail!("Unexpected broker subcommand '{}'", sub_name), + } + Ok(()) +} + async fn handle_start_all( env: &local_env::LocalEnv, retry_timeout: &Duration, @@ -1671,6 +1698,19 @@ fn cli() -> Command { .arg(stop_mode_arg.clone()) .arg(instance_id)) ) + .subcommand( + Command::new("storage_broker") + .arg_required_else_help(true) + .about("Manage broker") + .subcommand(Command::new("start") + .about("Start broker") + .arg(timeout_arg.clone()) + ) + .subcommand(Command::new("stop") + .about("Stop broker") + .arg(stop_mode_arg.clone()) + ) + ) .subcommand( Command::new("safekeeper") .arg_required_else_help(true)