From d7a8e0b1aea44608e9075cdbfd8b8b5535d98348 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 15 Dec 2023 13:34:32 +0000 Subject: [PATCH] eliminate one workaround, convert sk stuff to async as well --- Cargo.lock | 1 + control_plane/Cargo.toml | 1 + control_plane/src/attachment_service.rs | 7 +- control_plane/src/background_process.rs | 35 +++++----- control_plane/src/bin/neon_local.rs | 47 ++++++++----- control_plane/src/broker.rs | 7 +- control_plane/src/pageserver.rs | 91 ++++++++++++------------- control_plane/src/safekeeper.rs | 40 ++++++----- 8 files changed, 126 insertions(+), 103 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6695b3cac9..c4b6ecdf5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1245,6 +1245,7 @@ name = "control_plane" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "camino", "clap", "comfy-table", diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml index d1487d0c53..face604241 100644 --- a/control_plane/Cargo.toml +++ b/control_plane/Cargo.toml @@ -6,6 +6,7 @@ license.workspace = true [dependencies] anyhow.workspace = true +async-trait.workspace = true camino.workspace = true clap.workspace = true comfy-table.workspace = true diff --git a/control_plane/src/attachment_service.rs b/control_plane/src/attachment_service.rs index 528d733793..731c05809e 100644 --- a/control_plane/src/attachment_service.rs +++ b/control_plane/src/attachment_service.rs @@ -64,7 +64,7 @@ impl AttachmentService { .expect("non-Unicode path") } - pub fn start(&self) -> anyhow::Result { + pub async fn start(&self) -> anyhow::Result { let path_str = self.path.to_string_lossy(); background_process::start_process( @@ -73,10 +73,11 @@ impl AttachmentService { &self.env.attachment_service_bin(), ["-l", &self.listen, "-p", &path_str], [], - background_process::InitialPidFile::Create(&self.pid_file()), + background_process::InitialPidFile::Create(self.pid_file()), // TODO: a real status check - || Ok(true), + || async move { anyhow::Ok(true) }, ) + .await } pub fn stop(&self, immediate: bool) -> anyhow::Result<()> { diff --git a/control_plane/src/background_process.rs b/control_plane/src/background_process.rs index 26fc08fc8f..f97722400d 100644 --- a/control_plane/src/background_process.rs +++ b/control_plane/src/background_process.rs @@ -44,15 +44,15 @@ const NOTICE_AFTER_RETRIES: u64 = 50; /// Argument to `start_process`, to indicate whether it should create pidfile or if the process creates /// it itself. -pub enum InitialPidFile<'t> { +pub enum InitialPidFile { /// Create a pidfile, to allow future CLI invocations to manipulate the process. - Create(&'t Utf8Path), + Create(Utf8PathBuf), /// The process will create the pidfile itself, need to wait for that event. - Expect(&'t Utf8Path), + Expect(Utf8PathBuf), } /// Start a background child process using the parameters given. -pub fn start_process( +pub async fn start_process( process_name: &str, datadir: &Path, command: &Path, @@ -62,7 +62,8 @@ pub fn start_process( process_status_check: F, ) -> anyhow::Result where - F: Fn() -> anyhow::Result, + F: Fn() -> Fut, + Fut: std::future::Future>, AI: IntoIterator, A: AsRef, // Not generic AsRef, otherwise empty `envs` prevents type inference @@ -89,7 +90,7 @@ where let filled_cmd = fill_remote_storage_secrets_vars(fill_rust_env_vars(background_command)); filled_cmd.envs(envs); - let pid_file_to_check = match initial_pid_file { + let pid_file_to_check = match &initial_pid_file { InitialPidFile::Create(path) => { pre_exec_create_pidfile(filled_cmd, path); path @@ -107,7 +108,7 @@ where ); for retries in 0..RETRIES { - match process_started(pid, Some(pid_file_to_check), &process_status_check) { + match process_started(pid, &pid_file_to_check, &process_status_check).await { Ok(true) => { println!("\n{process_name} started, pid: {pid}"); return Ok(spawned_process); @@ -316,22 +317,20 @@ where cmd } -fn process_started( +async fn process_started( pid: Pid, - pid_file_to_check: Option<&Utf8Path>, + pid_file_to_check: &Utf8Path, status_check: &F, ) -> anyhow::Result where - F: Fn() -> anyhow::Result, + F: Fn() -> Fut, + Fut: std::future::Future>, { - match status_check() { - Ok(true) => match pid_file_to_check { - Some(pid_file_path) => match pid_file::read(pid_file_path)? { - PidFileRead::NotExist => Ok(false), - PidFileRead::LockedByOtherProcess(pid_in_file) => Ok(pid_in_file == pid), - PidFileRead::NotHeldByAnyProcess(_) => Ok(false), - }, - None => Ok(true), + match status_check().await { + Ok(true) => match pid_file::read(pid_file_to_check)? { + PidFileRead::NotExist => Ok(false), + PidFileRead::LockedByOtherProcess(pid_in_file) => Ok(pid_in_file == pid), + PidFileRead::NotHeldByAnyProcess(_) => Ok(false), }, Ok(false) => Ok(false), Err(e) => anyhow::bail!("process failed to start: {e}"), diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 5dd6370dac..03e69010f7 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -128,11 +128,11 @@ fn main() -> Result<()> { let subcommand_result = match sub_name { "tenant" => rt.block_on(handle_tenant(sub_args, &mut env)), "timeline" => rt.block_on(handle_timeline(sub_args, &mut env)), - "start" => handle_start_all(sub_args, &env), + "start" => rt.block_on(handle_start_all(sub_args, &env)), "stop" => handle_stop_all(sub_args, &env), "pageserver" => rt.block_on(handle_pageserver(sub_args, &env)), - "attachment_service" => handle_attachment_service(sub_args, &env), - "safekeeper" => handle_safekeeper(sub_args, &env), + "attachment_service" => rt.block_on(handle_attachment_service(sub_args, &env)), + "safekeeper" => rt.block_on(handle_safekeeper(sub_args, &env)), "endpoint" => rt.block_on(handle_endpoint(sub_args, &env)), "mappings" => handle_mappings(sub_args, &mut env), "pg" => bail!("'pg' subcommand has been renamed to 'endpoint'"), @@ -560,7 +560,9 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local let mut cplane = ComputeControlPlane::load(env.clone())?; println!("Importing timeline into pageserver ..."); - pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version).await?; + pageserver + .timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version) + .await?; env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?; println!("Creating endpoint for imported timeline ..."); @@ -904,6 +906,7 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Some(("start", subcommand_args)) => { if let Err(e) = get_pageserver(env, subcommand_args)? .start(&pageserver_config_overrides(subcommand_args)) + .await { eprintln!("pageserver start failed: {e}"); exit(1); @@ -930,7 +933,10 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> exit(1); } - if let Err(e) = pageserver.start(&pageserver_config_overrides(subcommand_args)) { + if let Err(e) = pageserver + .start(&pageserver_config_overrides(subcommand_args)) + .await + { eprintln!("pageserver start failed: {e}"); exit(1); } @@ -944,7 +950,10 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> exit(1); } - if let Err(e) = pageserver.start(&pageserver_config_overrides(subcommand_args)) { + if let Err(e) = pageserver + .start(&pageserver_config_overrides(subcommand_args)) + .await + { eprintln!("pageserver start failed: {e}"); exit(1); } @@ -966,11 +975,14 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Ok(()) } -fn handle_attachment_service(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { +async fn handle_attachment_service( + sub_match: &ArgMatches, + env: &local_env::LocalEnv, +) -> Result<()> { let svc = AttachmentService::from_env(env); match sub_match.subcommand() { Some(("start", _start_match)) => { - if let Err(e) = svc.start() { + if let Err(e) = svc.start().await { eprintln!("start failed: {e}"); exit(1); } @@ -1011,7 +1023,7 @@ fn safekeeper_extra_opts(init_match: &ArgMatches) -> Vec { .collect() } -fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { +async fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { let (sub_name, sub_args) = match sub_match.subcommand() { Some(safekeeper_command_data) => safekeeper_command_data, None => bail!("no safekeeper subcommand provided"), @@ -1029,7 +1041,7 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul "start" => { let extra_opts = safekeeper_extra_opts(sub_args); - if let Err(e) = safekeeper.start(extra_opts) { + if let Err(e) = safekeeper.start(extra_opts).await { eprintln!("safekeeper start failed: {}", e); exit(1); } @@ -1055,7 +1067,7 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul } let extra_opts = safekeeper_extra_opts(sub_args); - if let Err(e) = safekeeper.start(extra_opts) { + if let Err(e) = safekeeper.start(extra_opts).await { eprintln!("safekeeper start failed: {}", e); exit(1); } @@ -1068,15 +1080,15 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul Ok(()) } -fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<()> { +async fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<()> { // Endpoints are not started automatically - broker::start_broker_process(env)?; + broker::start_broker_process(env).await?; // Only start the attachment service if the pageserver is configured to need it if env.control_plane_api.is_some() { let attachment_service = AttachmentService::from_env(env); - if let Err(e) = attachment_service.start() { + if let Err(e) = attachment_service.start().await { eprintln!("attachment_service start failed: {:#}", e); try_stop_all(env, true); exit(1); @@ -1085,7 +1097,10 @@ fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow for ps_conf in &env.pageservers { let pageserver = PageServerNode::from_env(env, ps_conf); - if let Err(e) = pageserver.start(&pageserver_config_overrides(sub_match)) { + if let Err(e) = pageserver + .start(&pageserver_config_overrides(sub_match)) + .await + { eprintln!("pageserver {} start failed: {:#}", ps_conf.id, e); try_stop_all(env, true); exit(1); @@ -1094,7 +1109,7 @@ fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow for node in env.safekeepers.iter() { let safekeeper = SafekeeperNode::from_env(env, node); - if let Err(e) = safekeeper.start(vec![]) { + if let Err(e) = safekeeper.start(vec![]).await { eprintln!("safekeeper {} start failed: {:#}", safekeeper.id, e); try_stop_all(env, false); exit(1); diff --git a/control_plane/src/broker.rs b/control_plane/src/broker.rs index 6be865cc2e..7b6943f9c1 100644 --- a/control_plane/src/broker.rs +++ b/control_plane/src/broker.rs @@ -11,7 +11,7 @@ use camino::Utf8PathBuf; use crate::{background_process, local_env}; -pub fn start_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> { +pub async fn start_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> { let broker = &env.broker; let listen_addr = &broker.listen_addr; @@ -26,8 +26,8 @@ pub fn start_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> { &env.storage_broker_bin(), args, [], - background_process::InitialPidFile::Create(&storage_broker_pid_file_path(env)), - || { + background_process::InitialPidFile::Create(storage_broker_pid_file_path(env)), + || async { let url = broker.client_url(); let status_url = url.join("status").with_context(|| { format!("Failed to append /status path to broker endpoint {url}") @@ -42,6 +42,7 @@ pub fn start_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> { } }, ) + .await .context("Failed to spawn storage_broker subprocess")?; Ok(()) } diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index e01f28e07b..3e7e335a68 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -149,8 +149,8 @@ impl PageServerNode { .expect("non-Unicode path") } - pub fn start(&self, config_overrides: &[&str]) -> anyhow::Result { - self.start_node(config_overrides, false) + pub async fn start(&self, config_overrides: &[&str]) -> anyhow::Result { + self.start_node(config_overrides, false).await } fn pageserver_init(&self, config_overrides: &[&str]) -> anyhow::Result<()> { @@ -191,53 +191,52 @@ impl PageServerNode { Ok(()) } - fn start_node(&self, config_overrides: &[&str], update_config: bool) -> anyhow::Result { + async fn start_node( + &self, + config_overrides: &[&str], + update_config: bool, + ) -> anyhow::Result { // TODO: using a thread here because start_process() is not async but we need to call check_status() - std::thread::scope(move |s| { - s.spawn(move || { - let datadir = self.repo_path(); - print!( - "Starting pageserver node {} at '{}' in {:?}", - self.conf.id, - self.pg_connection_config.raw_address(), - datadir - ); - io::stdout().flush().context("flush stdout")?; + let datadir = self.repo_path(); + print!( + "Starting pageserver node {} at '{}' in {:?}", + self.conf.id, + self.pg_connection_config.raw_address(), + datadir + ); + io::stdout().flush().context("flush stdout")?; - let datadir_path_str = datadir.to_str().with_context(|| { - format!( - "Cannot start pageserver node {} in path that has no string representation: {:?}", - self.conf.id, datadir, - ) - })?; - let mut args = self.pageserver_basic_args(config_overrides, datadir_path_str); - if update_config { - args.push(Cow::Borrowed("--update-config")); + let datadir_path_str = datadir.to_str().with_context(|| { + format!( + "Cannot start pageserver node {} in path that has no string representation: {:?}", + self.conf.id, datadir, + ) + })?; + let mut args = self.pageserver_basic_args(config_overrides, datadir_path_str); + if update_config { + args.push(Cow::Borrowed("--update-config")); + } + background_process::start_process( + "pageserver", + &datadir, + &self.env.pageserver_bin(), + args.iter().map(Cow::as_ref), + self.pageserver_env_variables()?, + background_process::InitialPidFile::Expect(self.pid_file()), + || async { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let st = rt.block_on(self.check_status()); + match st { + Ok(()) => Ok(true), + Err(mgmt_api::Error::ReceiveBody(_)) => Ok(false), + Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")), } - background_process::start_process( - "pageserver", - &datadir, - &self.env.pageserver_bin(), - args.iter().map(Cow::as_ref), - self.pageserver_env_variables()?, - background_process::InitialPidFile::Expect(&self.pid_file()), - || { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - let st = rt.block_on(self.check_status()); - match st { - Ok(()) => Ok(true), - Err(mgmt_api::Error::ReceiveBody(_)) => Ok(false), - Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")), - } - }, - ) - }) - .join() - .unwrap() - }) + }, + ) + .await } fn pageserver_basic_args<'a>( diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index a8baa0ac53..4026ef0eb9 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -13,7 +13,6 @@ use std::{io, result}; use anyhow::Context; use camino::Utf8PathBuf; use postgres_connection::PgConnectionConfig; -use reqwest::blocking::{Client, RequestBuilder, Response}; use reqwest::{IntoUrl, Method}; use thiserror::Error; use utils::{http::error::HttpErrorBody, id::NodeId}; @@ -34,12 +33,14 @@ pub enum SafekeeperHttpError { type Result = result::Result; +#[async_trait::async_trait] pub trait ResponseErrorMessageExt: Sized { - fn error_from_body(self) -> Result; + async fn error_from_body(self) -> Result; } -impl ResponseErrorMessageExt for Response { - fn error_from_body(self) -> Result { +#[async_trait::async_trait] +impl ResponseErrorMessageExt for reqwest::Response { + async fn error_from_body(self) -> Result { let status = self.status(); if !(status.is_client_error() || status.is_server_error()) { return Ok(self); @@ -48,7 +49,7 @@ impl ResponseErrorMessageExt for Response { // reqwest does not export its error construction utility functions, so let's craft the message ourselves let url = self.url().to_owned(); Err(SafekeeperHttpError::Response( - match self.json::() { + match self.json::().await { Ok(err_body) => format!("Error: {}", err_body.msg), Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url), }, @@ -69,7 +70,7 @@ pub struct SafekeeperNode { pub pg_connection_config: PgConnectionConfig, pub env: LocalEnv, - pub http_client: Client, + pub http_client: reqwest::Client, pub http_base_url: String, } @@ -80,7 +81,7 @@ impl SafekeeperNode { conf: conf.clone(), pg_connection_config: Self::safekeeper_connection_config(conf.pg_port), env: env.clone(), - http_client: Client::new(), + http_client: reqwest::Client::new(), http_base_url: format!("http://127.0.0.1:{}/v1", conf.http_port), } } @@ -103,7 +104,7 @@ impl SafekeeperNode { .expect("non-Unicode path") } - pub fn start(&self, extra_opts: Vec) -> anyhow::Result { + pub async fn start(&self, extra_opts: Vec) -> anyhow::Result { print!( "Starting safekeeper at '{}' in '{}'", self.pg_connection_config.raw_address(), @@ -191,13 +192,16 @@ impl SafekeeperNode { &self.env.safekeeper_bin(), &args, [], - background_process::InitialPidFile::Expect(&self.pid_file()), - || match self.check_status() { - Ok(()) => Ok(true), - Err(SafekeeperHttpError::Transport(_)) => Ok(false), - Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")), + background_process::InitialPidFile::Expect(self.pid_file()), + || async { + match self.check_status().await { + Ok(()) => Ok(true), + Err(SafekeeperHttpError::Transport(_)) => Ok(false), + Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")), + } }, ) + .await } /// @@ -216,7 +220,7 @@ impl SafekeeperNode { ) } - fn http_request(&self, method: Method, url: U) -> RequestBuilder { + fn http_request(&self, method: Method, url: U) -> reqwest::RequestBuilder { // TODO: authentication //if self.env.auth_type == AuthType::NeonJWT { // builder = builder.bearer_auth(&self.env.safekeeper_auth_token) @@ -224,10 +228,12 @@ impl SafekeeperNode { self.http_client.request(method, url) } - pub fn check_status(&self) -> Result<()> { + pub async fn check_status(&self) -> Result<()> { self.http_request(Method::GET, format!("{}/{}", self.http_base_url, "status")) - .send()? - .error_from_body()?; + .send() + .await? + .error_from_body() + .await?; Ok(()) } }