From 83bdebb4afa8da4707da5298afdf900c9f309822 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 15 Dec 2023 12:22:33 +0000 Subject: [PATCH 1/5] WIP --- Cargo.lock | 3 ++ control_plane/Cargo.toml | 3 ++ control_plane/src/bin/neon_local.rs | 2 +- control_plane/src/pageserver.rs | 51 +++++++++++++++++++++-------- libs/postgres_connection/src/lib.rs | 15 +++++++-- 5 files changed, 57 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a647568f28..6695b3cac9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1249,6 +1249,7 @@ dependencies = [ "clap", "comfy-table", "compute_api", + "futures", "git-version", "hex", "hyper", @@ -1269,6 +1270,8 @@ dependencies = [ "tar", "thiserror", "tokio", + "tokio-postgres", + "tokio-util", "toml", "tracing", "url", diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml index fb36ee8aa1..d1487d0c53 100644 --- a/control_plane/Cargo.toml +++ b/control_plane/Cargo.toml @@ -9,6 +9,7 @@ anyhow.workspace = true camino.workspace = true clap.workspace = true comfy-table.workspace = true +futures.workspace = true git-version.workspace = true nix.workspace = true once_cell.workspace = true @@ -24,6 +25,8 @@ tar.workspace = true thiserror.workspace = true toml.workspace = true tokio.workspace = true +tokio-postgres.workspace = true +tokio-util.workspace = true url.workspace = true pageserver = { path = "../pageserver" } pageserver_api.workspace = true diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index f639c6cdc6..5dd6370dac 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -560,7 +560,7 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local let mut cplane = ComputeControlPlane::load(env.clone())?; println!("Importing timeline into pageserver ..."); - pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version)?; + pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version).await?; env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?; println!("Creating endpoint for imported timeline ..."); diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index f230973cd0..d812ff12e3 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -16,6 +16,7 @@ use std::time::Duration; use anyhow::{bail, Context}; use camino::Utf8PathBuf; +use futures::{SinkExt, StreamExt}; use pageserver::client::mgmt_api; use pageserver_api::models::{self, LocationConfig, TenantInfo, TimelineInfo}; use pageserver_api::shard::TenantShardId; @@ -282,7 +283,12 @@ impl PageServerNode { background_process::stop_process(immediate, "pageserver", &self.pid_file()) } - pub fn page_server_psql_client(&self) -> anyhow::Result { + pub async fn page_server_psql_client( + &self, + ) -> anyhow::Result<( + tokio_postgres::Client, + tokio_postgres::Connection, + )> { let mut config = self.pg_connection_config.clone(); if self.conf.pg_auth_type == AuthType::NeonJWT { let token = self @@ -290,7 +296,7 @@ impl PageServerNode { .generate_auth_token(&Claims::new(None, Scope::PageServerApi))?; config = config.set_password(Some(token)); } - Ok(config.connect_no_tls()?) + Ok(config.connect_no_tls().await?) } pub async fn check_status(&self) -> mgmt_api::Result<()> { @@ -514,7 +520,7 @@ impl PageServerNode { /// * `timeline_id` - id to assign to imported timeline /// * `base` - (start lsn of basebackup, path to `base.tar` file) /// * `pg_wal` - if there's any wal to import: (end lsn, path to `pg_wal.tar`) - pub fn timeline_import( + pub async fn timeline_import( &self, tenant_id: TenantId, timeline_id: TimelineId, @@ -522,17 +528,25 @@ impl PageServerNode { pg_wal: Option<(Lsn, PathBuf)>, pg_version: u32, ) -> anyhow::Result<()> { - let mut client = self.page_server_psql_client()?; + let (client, conn) = self.page_server_psql_client().await?; + // The connection object performs the actual communication with the database, + // so spawn it off to run on its own. + tokio::spawn(async move { + if let Err(e) = conn.await { + eprintln!("connection error: {}", e); + } + }); + tokio::pin!(client); // Init base reader let (start_lsn, base_tarfile_path) = base; - let base_tarfile = File::open(base_tarfile_path)?; - let mut base_reader = BufReader::new(base_tarfile); + let base_tarfile = tokio::fs::File::open(base_tarfile_path).await?; + let mut base_tarfile = tokio_util::io::ReaderStream::new(base_tarfile); // Init wal reader if necessary let (end_lsn, wal_reader) = if let Some((end_lsn, wal_tarfile_path)) = pg_wal { - let wal_tarfile = File::open(wal_tarfile_path)?; - let wal_reader = BufReader::new(wal_tarfile); + let wal_tarfile = tokio::fs::File::open(wal_tarfile_path).await?; + let wal_reader = tokio_util::io::ReaderStream::new(wal_tarfile); (end_lsn, Some(wal_reader)) } else { (start_lsn, None) @@ -542,16 +556,25 @@ impl PageServerNode { let import_cmd = format!( "import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn} {pg_version}" ); - let mut writer = client.copy_in(&import_cmd)?; - io::copy(&mut base_reader, &mut writer)?; - writer.finish()?; + let writer = client.copy_in(&import_cmd).await?; + let mut writer = std::pin::pin!(writer); + let mut writer = + writer.sink_map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{e}"))); + let mut base_tarfile = std::pin::pin!(base_tarfile); + writer.send_all(&mut base_tarfile).await?; + writer.into_inner().finish().await?; // Import wal if necessary if let Some(mut wal_reader) = wal_reader { let import_cmd = format!("import wal {tenant_id} {timeline_id} {start_lsn} {end_lsn}"); - let mut writer = client.copy_in(&import_cmd)?; - io::copy(&mut wal_reader, &mut writer)?; - writer.finish()?; + + let writer = client.copy_in(&import_cmd).await?; + let mut writer = std::pin::pin!(writer); + let mut writer = writer + .sink_map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{e}"))); + let mut wal_reader = std::pin::pin!(wal_reader); + writer.send_all(&mut wal_reader).await?; + writer.into_inner().finish().await?; } Ok(()) diff --git a/libs/postgres_connection/src/lib.rs b/libs/postgres_connection/src/lib.rs index 35cb1a2691..d793abc8d0 100644 --- a/libs/postgres_connection/src/lib.rs +++ b/libs/postgres_connection/src/lib.rs @@ -4,6 +4,7 @@ use anyhow::{bail, Context}; use itertools::Itertools; use std::borrow::Cow; use std::fmt; +use tokio_postgres::tls::NoTlsStream; use url::Host; /// Parses a string of format either `host:port` or `host` into a corresponding pair. @@ -163,8 +164,18 @@ impl PgConnectionConfig { } /// Connect using postgres protocol with TLS disabled. - pub fn connect_no_tls(&self) -> Result { - postgres::Config::from(self.to_tokio_postgres_config()).connect(postgres::NoTls) + pub async fn connect_no_tls( + &self, + ) -> Result< + ( + tokio_postgres::Client, + tokio_postgres::Connection, + ), + postgres::Error, + > { + self.to_tokio_postgres_config() + .connect(postgres::NoTls) + .await } } From 2664e9b8348516715a8694dc5deb3cea41e744eb Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 15 Dec 2023 12:29:02 +0000 Subject: [PATCH 2/5] fix --- control_plane/src/pageserver.rs | 57 ++++++++++++++++------------- libs/postgres_connection/src/lib.rs | 1 - 2 files changed, 32 insertions(+), 26 deletions(-) diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index d812ff12e3..e01f28e07b 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -6,9 +6,9 @@ //! use std::borrow::Cow; use std::collections::HashMap; -use std::fs::File; + use std::io; -use std::io::{BufReader, Write}; +use std::io::Write; use std::num::NonZeroU64; use std::path::PathBuf; use std::process::{Child, Command}; @@ -16,7 +16,7 @@ use std::time::Duration; use anyhow::{bail, Context}; use camino::Utf8PathBuf; -use futures::{SinkExt, StreamExt}; +use futures::SinkExt; use pageserver::client::mgmt_api; use pageserver_api::models::{self, LocationConfig, TenantInfo, TimelineInfo}; use pageserver_api::shard::TenantShardId; @@ -541,7 +541,7 @@ impl PageServerNode { // Init base reader let (start_lsn, base_tarfile_path) = base; let base_tarfile = tokio::fs::File::open(base_tarfile_path).await?; - let mut base_tarfile = tokio_util::io::ReaderStream::new(base_tarfile); + let base_tarfile = tokio_util::io::ReaderStream::new(base_tarfile); // Init wal reader if necessary let (end_lsn, wal_reader) = if let Some((end_lsn, wal_tarfile_path)) = pg_wal { @@ -552,29 +552,36 @@ impl PageServerNode { (start_lsn, None) }; + let copy_in = |reader, cmd| { + let client = &client; + async move { + let writer = client.copy_in(&cmd).await?; + let writer = std::pin::pin!(writer); + let mut writer = writer.sink_map_err(|e| { + std::io::Error::new(std::io::ErrorKind::Other, format!("{e}")) + }); + let mut reader = std::pin::pin!(reader); + writer.send_all(&mut reader).await?; + writer.into_inner().finish().await?; + anyhow::Ok(()) + } + }; + // Import base - let import_cmd = format!( - "import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn} {pg_version}" - ); - let writer = client.copy_in(&import_cmd).await?; - let mut writer = std::pin::pin!(writer); - let mut writer = - writer.sink_map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{e}"))); - let mut base_tarfile = std::pin::pin!(base_tarfile); - writer.send_all(&mut base_tarfile).await?; - writer.into_inner().finish().await?; - + copy_in( + base_tarfile, + format!( + "import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn} {pg_version}" + ), + ) + .await?; // Import wal if necessary - if let Some(mut wal_reader) = wal_reader { - let import_cmd = format!("import wal {tenant_id} {timeline_id} {start_lsn} {end_lsn}"); - - let writer = client.copy_in(&import_cmd).await?; - let mut writer = std::pin::pin!(writer); - let mut writer = writer - .sink_map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{e}"))); - let mut wal_reader = std::pin::pin!(wal_reader); - writer.send_all(&mut wal_reader).await?; - writer.into_inner().finish().await?; + if let Some(wal_reader) = wal_reader { + copy_in( + wal_reader, + format!("import wal {tenant_id} {timeline_id} {start_lsn} {end_lsn}"), + ) + .await?; } Ok(()) diff --git a/libs/postgres_connection/src/lib.rs b/libs/postgres_connection/src/lib.rs index d793abc8d0..ccf9108895 100644 --- a/libs/postgres_connection/src/lib.rs +++ b/libs/postgres_connection/src/lib.rs @@ -4,7 +4,6 @@ use anyhow::{bail, Context}; use itertools::Itertools; use std::borrow::Cow; use std::fmt; -use tokio_postgres::tls::NoTlsStream; use url::Host; /// Parses a string of format either `host:port` or `host` into a corresponding pair. From d7a8e0b1aea44608e9075cdbfd8b8b5535d98348 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 15 Dec 2023 13:34:32 +0000 Subject: [PATCH 3/5] eliminate one workaround, convert sk stuff to async as well --- Cargo.lock | 1 + control_plane/Cargo.toml | 1 + control_plane/src/attachment_service.rs | 7 +- control_plane/src/background_process.rs | 35 +++++----- control_plane/src/bin/neon_local.rs | 47 ++++++++----- control_plane/src/broker.rs | 7 +- control_plane/src/pageserver.rs | 91 ++++++++++++------------- control_plane/src/safekeeper.rs | 40 ++++++----- 8 files changed, 126 insertions(+), 103 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6695b3cac9..c4b6ecdf5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1245,6 +1245,7 @@ name = "control_plane" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "camino", "clap", "comfy-table", diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml index d1487d0c53..face604241 100644 --- a/control_plane/Cargo.toml +++ b/control_plane/Cargo.toml @@ -6,6 +6,7 @@ license.workspace = true [dependencies] anyhow.workspace = true +async-trait.workspace = true camino.workspace = true clap.workspace = true comfy-table.workspace = true diff --git a/control_plane/src/attachment_service.rs b/control_plane/src/attachment_service.rs index 528d733793..731c05809e 100644 --- a/control_plane/src/attachment_service.rs +++ b/control_plane/src/attachment_service.rs @@ -64,7 +64,7 @@ impl AttachmentService { .expect("non-Unicode path") } - pub fn start(&self) -> anyhow::Result { + pub async fn start(&self) -> anyhow::Result { let path_str = self.path.to_string_lossy(); background_process::start_process( @@ -73,10 +73,11 @@ impl AttachmentService { &self.env.attachment_service_bin(), ["-l", &self.listen, "-p", &path_str], [], - background_process::InitialPidFile::Create(&self.pid_file()), + background_process::InitialPidFile::Create(self.pid_file()), // TODO: a real status check - || Ok(true), + || async move { anyhow::Ok(true) }, ) + .await } pub fn stop(&self, immediate: bool) -> anyhow::Result<()> { diff --git a/control_plane/src/background_process.rs b/control_plane/src/background_process.rs index 26fc08fc8f..f97722400d 100644 --- a/control_plane/src/background_process.rs +++ b/control_plane/src/background_process.rs @@ -44,15 +44,15 @@ const NOTICE_AFTER_RETRIES: u64 = 50; /// Argument to `start_process`, to indicate whether it should create pidfile or if the process creates /// it itself. -pub enum InitialPidFile<'t> { +pub enum InitialPidFile { /// Create a pidfile, to allow future CLI invocations to manipulate the process. - Create(&'t Utf8Path), + Create(Utf8PathBuf), /// The process will create the pidfile itself, need to wait for that event. - Expect(&'t Utf8Path), + Expect(Utf8PathBuf), } /// Start a background child process using the parameters given. -pub fn start_process( +pub async fn start_process( process_name: &str, datadir: &Path, command: &Path, @@ -62,7 +62,8 @@ pub fn start_process( process_status_check: F, ) -> anyhow::Result where - F: Fn() -> anyhow::Result, + F: Fn() -> Fut, + Fut: std::future::Future>, AI: IntoIterator, A: AsRef, // Not generic AsRef, otherwise empty `envs` prevents type inference @@ -89,7 +90,7 @@ where let filled_cmd = fill_remote_storage_secrets_vars(fill_rust_env_vars(background_command)); filled_cmd.envs(envs); - let pid_file_to_check = match initial_pid_file { + let pid_file_to_check = match &initial_pid_file { InitialPidFile::Create(path) => { pre_exec_create_pidfile(filled_cmd, path); path @@ -107,7 +108,7 @@ where ); for retries in 0..RETRIES { - match process_started(pid, Some(pid_file_to_check), &process_status_check) { + match process_started(pid, &pid_file_to_check, &process_status_check).await { Ok(true) => { println!("\n{process_name} started, pid: {pid}"); return Ok(spawned_process); @@ -316,22 +317,20 @@ where cmd } -fn process_started( +async fn process_started( pid: Pid, - pid_file_to_check: Option<&Utf8Path>, + pid_file_to_check: &Utf8Path, status_check: &F, ) -> anyhow::Result where - F: Fn() -> anyhow::Result, + F: Fn() -> Fut, + Fut: std::future::Future>, { - match status_check() { - Ok(true) => match pid_file_to_check { - Some(pid_file_path) => match pid_file::read(pid_file_path)? { - PidFileRead::NotExist => Ok(false), - PidFileRead::LockedByOtherProcess(pid_in_file) => Ok(pid_in_file == pid), - PidFileRead::NotHeldByAnyProcess(_) => Ok(false), - }, - None => Ok(true), + match status_check().await { + Ok(true) => match pid_file::read(pid_file_to_check)? { + PidFileRead::NotExist => Ok(false), + PidFileRead::LockedByOtherProcess(pid_in_file) => Ok(pid_in_file == pid), + PidFileRead::NotHeldByAnyProcess(_) => Ok(false), }, Ok(false) => Ok(false), Err(e) => anyhow::bail!("process failed to start: {e}"), diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 5dd6370dac..03e69010f7 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -128,11 +128,11 @@ fn main() -> Result<()> { let subcommand_result = match sub_name { "tenant" => rt.block_on(handle_tenant(sub_args, &mut env)), "timeline" => rt.block_on(handle_timeline(sub_args, &mut env)), - "start" => handle_start_all(sub_args, &env), + "start" => rt.block_on(handle_start_all(sub_args, &env)), "stop" => handle_stop_all(sub_args, &env), "pageserver" => rt.block_on(handle_pageserver(sub_args, &env)), - "attachment_service" => handle_attachment_service(sub_args, &env), - "safekeeper" => handle_safekeeper(sub_args, &env), + "attachment_service" => rt.block_on(handle_attachment_service(sub_args, &env)), + "safekeeper" => rt.block_on(handle_safekeeper(sub_args, &env)), "endpoint" => rt.block_on(handle_endpoint(sub_args, &env)), "mappings" => handle_mappings(sub_args, &mut env), "pg" => bail!("'pg' subcommand has been renamed to 'endpoint'"), @@ -560,7 +560,9 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local let mut cplane = ComputeControlPlane::load(env.clone())?; println!("Importing timeline into pageserver ..."); - pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version).await?; + pageserver + .timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version) + .await?; env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?; println!("Creating endpoint for imported timeline ..."); @@ -904,6 +906,7 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Some(("start", subcommand_args)) => { if let Err(e) = get_pageserver(env, subcommand_args)? .start(&pageserver_config_overrides(subcommand_args)) + .await { eprintln!("pageserver start failed: {e}"); exit(1); @@ -930,7 +933,10 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> exit(1); } - if let Err(e) = pageserver.start(&pageserver_config_overrides(subcommand_args)) { + if let Err(e) = pageserver + .start(&pageserver_config_overrides(subcommand_args)) + .await + { eprintln!("pageserver start failed: {e}"); exit(1); } @@ -944,7 +950,10 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> exit(1); } - if let Err(e) = pageserver.start(&pageserver_config_overrides(subcommand_args)) { + if let Err(e) = pageserver + .start(&pageserver_config_overrides(subcommand_args)) + .await + { eprintln!("pageserver start failed: {e}"); exit(1); } @@ -966,11 +975,14 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Ok(()) } -fn handle_attachment_service(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { +async fn handle_attachment_service( + sub_match: &ArgMatches, + env: &local_env::LocalEnv, +) -> Result<()> { let svc = AttachmentService::from_env(env); match sub_match.subcommand() { Some(("start", _start_match)) => { - if let Err(e) = svc.start() { + if let Err(e) = svc.start().await { eprintln!("start failed: {e}"); exit(1); } @@ -1011,7 +1023,7 @@ fn safekeeper_extra_opts(init_match: &ArgMatches) -> Vec { .collect() } -fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { +async fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { let (sub_name, sub_args) = match sub_match.subcommand() { Some(safekeeper_command_data) => safekeeper_command_data, None => bail!("no safekeeper subcommand provided"), @@ -1029,7 +1041,7 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul "start" => { let extra_opts = safekeeper_extra_opts(sub_args); - if let Err(e) = safekeeper.start(extra_opts) { + if let Err(e) = safekeeper.start(extra_opts).await { eprintln!("safekeeper start failed: {}", e); exit(1); } @@ -1055,7 +1067,7 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul } let extra_opts = safekeeper_extra_opts(sub_args); - if let Err(e) = safekeeper.start(extra_opts) { + if let Err(e) = safekeeper.start(extra_opts).await { eprintln!("safekeeper start failed: {}", e); exit(1); } @@ -1068,15 +1080,15 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul Ok(()) } -fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<()> { +async fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<()> { // Endpoints are not started automatically - broker::start_broker_process(env)?; + broker::start_broker_process(env).await?; // Only start the attachment service if the pageserver is configured to need it if env.control_plane_api.is_some() { let attachment_service = AttachmentService::from_env(env); - if let Err(e) = attachment_service.start() { + if let Err(e) = attachment_service.start().await { eprintln!("attachment_service start failed: {:#}", e); try_stop_all(env, true); exit(1); @@ -1085,7 +1097,10 @@ fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow for ps_conf in &env.pageservers { let pageserver = PageServerNode::from_env(env, ps_conf); - if let Err(e) = pageserver.start(&pageserver_config_overrides(sub_match)) { + if let Err(e) = pageserver + .start(&pageserver_config_overrides(sub_match)) + .await + { eprintln!("pageserver {} start failed: {:#}", ps_conf.id, e); try_stop_all(env, true); exit(1); @@ -1094,7 +1109,7 @@ fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow for node in env.safekeepers.iter() { let safekeeper = SafekeeperNode::from_env(env, node); - if let Err(e) = safekeeper.start(vec![]) { + if let Err(e) = safekeeper.start(vec![]).await { eprintln!("safekeeper {} start failed: {:#}", safekeeper.id, e); try_stop_all(env, false); exit(1); diff --git a/control_plane/src/broker.rs b/control_plane/src/broker.rs index 6be865cc2e..7b6943f9c1 100644 --- a/control_plane/src/broker.rs +++ b/control_plane/src/broker.rs @@ -11,7 +11,7 @@ use camino::Utf8PathBuf; use crate::{background_process, local_env}; -pub fn start_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> { +pub async fn start_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> { let broker = &env.broker; let listen_addr = &broker.listen_addr; @@ -26,8 +26,8 @@ pub fn start_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> { &env.storage_broker_bin(), args, [], - background_process::InitialPidFile::Create(&storage_broker_pid_file_path(env)), - || { + background_process::InitialPidFile::Create(storage_broker_pid_file_path(env)), + || async { let url = broker.client_url(); let status_url = url.join("status").with_context(|| { format!("Failed to append /status path to broker endpoint {url}") @@ -42,6 +42,7 @@ pub fn start_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> { } }, ) + .await .context("Failed to spawn storage_broker subprocess")?; Ok(()) } diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index e01f28e07b..3e7e335a68 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -149,8 +149,8 @@ impl PageServerNode { .expect("non-Unicode path") } - pub fn start(&self, config_overrides: &[&str]) -> anyhow::Result { - self.start_node(config_overrides, false) + pub async fn start(&self, config_overrides: &[&str]) -> anyhow::Result { + self.start_node(config_overrides, false).await } fn pageserver_init(&self, config_overrides: &[&str]) -> anyhow::Result<()> { @@ -191,53 +191,52 @@ impl PageServerNode { Ok(()) } - fn start_node(&self, config_overrides: &[&str], update_config: bool) -> anyhow::Result { + async fn start_node( + &self, + config_overrides: &[&str], + update_config: bool, + ) -> anyhow::Result { // TODO: using a thread here because start_process() is not async but we need to call check_status() - std::thread::scope(move |s| { - s.spawn(move || { - let datadir = self.repo_path(); - print!( - "Starting pageserver node {} at '{}' in {:?}", - self.conf.id, - self.pg_connection_config.raw_address(), - datadir - ); - io::stdout().flush().context("flush stdout")?; + let datadir = self.repo_path(); + print!( + "Starting pageserver node {} at '{}' in {:?}", + self.conf.id, + self.pg_connection_config.raw_address(), + datadir + ); + io::stdout().flush().context("flush stdout")?; - let datadir_path_str = datadir.to_str().with_context(|| { - format!( - "Cannot start pageserver node {} in path that has no string representation: {:?}", - self.conf.id, datadir, - ) - })?; - let mut args = self.pageserver_basic_args(config_overrides, datadir_path_str); - if update_config { - args.push(Cow::Borrowed("--update-config")); + let datadir_path_str = datadir.to_str().with_context(|| { + format!( + "Cannot start pageserver node {} in path that has no string representation: {:?}", + self.conf.id, datadir, + ) + })?; + let mut args = self.pageserver_basic_args(config_overrides, datadir_path_str); + if update_config { + args.push(Cow::Borrowed("--update-config")); + } + background_process::start_process( + "pageserver", + &datadir, + &self.env.pageserver_bin(), + args.iter().map(Cow::as_ref), + self.pageserver_env_variables()?, + background_process::InitialPidFile::Expect(self.pid_file()), + || async { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let st = rt.block_on(self.check_status()); + match st { + Ok(()) => Ok(true), + Err(mgmt_api::Error::ReceiveBody(_)) => Ok(false), + Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")), } - background_process::start_process( - "pageserver", - &datadir, - &self.env.pageserver_bin(), - args.iter().map(Cow::as_ref), - self.pageserver_env_variables()?, - background_process::InitialPidFile::Expect(&self.pid_file()), - || { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - let st = rt.block_on(self.check_status()); - match st { - Ok(()) => Ok(true), - Err(mgmt_api::Error::ReceiveBody(_)) => Ok(false), - Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")), - } - }, - ) - }) - .join() - .unwrap() - }) + }, + ) + .await } fn pageserver_basic_args<'a>( diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index a8baa0ac53..4026ef0eb9 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -13,7 +13,6 @@ use std::{io, result}; use anyhow::Context; use camino::Utf8PathBuf; use postgres_connection::PgConnectionConfig; -use reqwest::blocking::{Client, RequestBuilder, Response}; use reqwest::{IntoUrl, Method}; use thiserror::Error; use utils::{http::error::HttpErrorBody, id::NodeId}; @@ -34,12 +33,14 @@ pub enum SafekeeperHttpError { type Result = result::Result; +#[async_trait::async_trait] pub trait ResponseErrorMessageExt: Sized { - fn error_from_body(self) -> Result; + async fn error_from_body(self) -> Result; } -impl ResponseErrorMessageExt for Response { - fn error_from_body(self) -> Result { +#[async_trait::async_trait] +impl ResponseErrorMessageExt for reqwest::Response { + async fn error_from_body(self) -> Result { let status = self.status(); if !(status.is_client_error() || status.is_server_error()) { return Ok(self); @@ -48,7 +49,7 @@ impl ResponseErrorMessageExt for Response { // reqwest does not export its error construction utility functions, so let's craft the message ourselves let url = self.url().to_owned(); Err(SafekeeperHttpError::Response( - match self.json::() { + match self.json::().await { Ok(err_body) => format!("Error: {}", err_body.msg), Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url), }, @@ -69,7 +70,7 @@ pub struct SafekeeperNode { pub pg_connection_config: PgConnectionConfig, pub env: LocalEnv, - pub http_client: Client, + pub http_client: reqwest::Client, pub http_base_url: String, } @@ -80,7 +81,7 @@ impl SafekeeperNode { conf: conf.clone(), pg_connection_config: Self::safekeeper_connection_config(conf.pg_port), env: env.clone(), - http_client: Client::new(), + http_client: reqwest::Client::new(), http_base_url: format!("http://127.0.0.1:{}/v1", conf.http_port), } } @@ -103,7 +104,7 @@ impl SafekeeperNode { .expect("non-Unicode path") } - pub fn start(&self, extra_opts: Vec) -> anyhow::Result { + pub async fn start(&self, extra_opts: Vec) -> anyhow::Result { print!( "Starting safekeeper at '{}' in '{}'", self.pg_connection_config.raw_address(), @@ -191,13 +192,16 @@ impl SafekeeperNode { &self.env.safekeeper_bin(), &args, [], - background_process::InitialPidFile::Expect(&self.pid_file()), - || match self.check_status() { - Ok(()) => Ok(true), - Err(SafekeeperHttpError::Transport(_)) => Ok(false), - Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")), + background_process::InitialPidFile::Expect(self.pid_file()), + || async { + match self.check_status().await { + Ok(()) => Ok(true), + Err(SafekeeperHttpError::Transport(_)) => Ok(false), + Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")), + } }, ) + .await } /// @@ -216,7 +220,7 @@ impl SafekeeperNode { ) } - fn http_request(&self, method: Method, url: U) -> RequestBuilder { + fn http_request(&self, method: Method, url: U) -> reqwest::RequestBuilder { // TODO: authentication //if self.env.auth_type == AuthType::NeonJWT { // builder = builder.bearer_auth(&self.env.safekeeper_auth_token) @@ -224,10 +228,12 @@ impl SafekeeperNode { self.http_client.request(method, url) } - pub fn check_status(&self) -> Result<()> { + pub async fn check_status(&self) -> Result<()> { self.http_request(Method::GET, format!("{}/{}", self.http_base_url, "status")) - .send()? - .error_from_body()?; + .send() + .await? + .error_from_body() + .await?; Ok(()) } } From 7ac9ef8291ed5e3a876fcd33d18929fd36762c9a Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 15 Dec 2023 13:38:34 +0000 Subject: [PATCH 4/5] remove unused dep --- Cargo.lock | 1 - libs/pageserver_api/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c4b6ecdf5d..a435aeb7d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3167,7 +3167,6 @@ dependencies = [ "enum-map", "hex", "postgres_ffi", - "rand 0.8.5", "serde", "serde_json", "serde_with", diff --git a/libs/pageserver_api/Cargo.toml b/libs/pageserver_api/Cargo.toml index 09f85f6bfc..4d08d78e87 100644 --- a/libs/pageserver_api/Cargo.toml +++ b/libs/pageserver_api/Cargo.toml @@ -15,7 +15,6 @@ byteorder.workspace = true utils.workspace = true postgres_ffi.workspace = true enum-map.workspace = true -rand.workspace = true strum.workspace = true strum_macros.workspace = true hex.workspace = true From cab12c02a358bcb276c47e00e2fc31482903a792 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 15 Dec 2023 13:43:40 +0000 Subject: [PATCH 5/5] clippy --- control_plane/src/background_process.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/control_plane/src/background_process.rs b/control_plane/src/background_process.rs index f97722400d..20fa3af9b8 100644 --- a/control_plane/src/background_process.rs +++ b/control_plane/src/background_process.rs @@ -108,7 +108,7 @@ where ); for retries in 0..RETRIES { - match process_started(pid, &pid_file_to_check, &process_status_check).await { + match process_started(pid, pid_file_to_check, &process_status_check).await { Ok(true) => { println!("\n{process_name} started, pid: {pid}"); return Ok(spawned_process);