From 83bdebb4afa8da4707da5298afdf900c9f309822 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 15 Dec 2023 12:22:33 +0000 Subject: [PATCH 01/12] WIP --- Cargo.lock | 3 ++ control_plane/Cargo.toml | 3 ++ control_plane/src/bin/neon_local.rs | 2 +- control_plane/src/pageserver.rs | 51 +++++++++++++++++++++-------- libs/postgres_connection/src/lib.rs | 15 +++++++-- 5 files changed, 57 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a647568f28..6695b3cac9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1249,6 +1249,7 @@ dependencies = [ "clap", "comfy-table", "compute_api", + "futures", "git-version", "hex", "hyper", @@ -1269,6 +1270,8 @@ dependencies = [ "tar", "thiserror", "tokio", + "tokio-postgres", + "tokio-util", "toml", "tracing", "url", diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml index fb36ee8aa1..d1487d0c53 100644 --- a/control_plane/Cargo.toml +++ b/control_plane/Cargo.toml @@ -9,6 +9,7 @@ anyhow.workspace = true camino.workspace = true clap.workspace = true comfy-table.workspace = true +futures.workspace = true git-version.workspace = true nix.workspace = true once_cell.workspace = true @@ -24,6 +25,8 @@ tar.workspace = true thiserror.workspace = true toml.workspace = true tokio.workspace = true +tokio-postgres.workspace = true +tokio-util.workspace = true url.workspace = true pageserver = { path = "../pageserver" } pageserver_api.workspace = true diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index f639c6cdc6..5dd6370dac 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -560,7 +560,7 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local let mut cplane = ComputeControlPlane::load(env.clone())?; println!("Importing timeline into pageserver ..."); - pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version)?; + pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version).await?; env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?; println!("Creating endpoint for imported timeline ..."); diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index f230973cd0..d812ff12e3 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -16,6 +16,7 @@ use std::time::Duration; use anyhow::{bail, Context}; use camino::Utf8PathBuf; +use futures::{SinkExt, StreamExt}; use pageserver::client::mgmt_api; use pageserver_api::models::{self, LocationConfig, TenantInfo, TimelineInfo}; use pageserver_api::shard::TenantShardId; @@ -282,7 +283,12 @@ impl PageServerNode { background_process::stop_process(immediate, "pageserver", &self.pid_file()) } - pub fn page_server_psql_client(&self) -> anyhow::Result { + pub async fn page_server_psql_client( + &self, + ) -> anyhow::Result<( + tokio_postgres::Client, + tokio_postgres::Connection, + )> { let mut config = self.pg_connection_config.clone(); if self.conf.pg_auth_type == AuthType::NeonJWT { let token = self @@ -290,7 +296,7 @@ impl PageServerNode { .generate_auth_token(&Claims::new(None, Scope::PageServerApi))?; config = config.set_password(Some(token)); } - Ok(config.connect_no_tls()?) + Ok(config.connect_no_tls().await?) } pub async fn check_status(&self) -> mgmt_api::Result<()> { @@ -514,7 +520,7 @@ impl PageServerNode { /// * `timeline_id` - id to assign to imported timeline /// * `base` - (start lsn of basebackup, path to `base.tar` file) /// * `pg_wal` - if there's any wal to import: (end lsn, path to `pg_wal.tar`) - pub fn timeline_import( + pub async fn timeline_import( &self, tenant_id: TenantId, timeline_id: TimelineId, @@ -522,17 +528,25 @@ impl PageServerNode { pg_wal: Option<(Lsn, PathBuf)>, pg_version: u32, ) -> anyhow::Result<()> { - let mut client = self.page_server_psql_client()?; + let (client, conn) = self.page_server_psql_client().await?; + // The connection object performs the actual communication with the database, + // so spawn it off to run on its own. + tokio::spawn(async move { + if let Err(e) = conn.await { + eprintln!("connection error: {}", e); + } + }); + tokio::pin!(client); // Init base reader let (start_lsn, base_tarfile_path) = base; - let base_tarfile = File::open(base_tarfile_path)?; - let mut base_reader = BufReader::new(base_tarfile); + let base_tarfile = tokio::fs::File::open(base_tarfile_path).await?; + let mut base_tarfile = tokio_util::io::ReaderStream::new(base_tarfile); // Init wal reader if necessary let (end_lsn, wal_reader) = if let Some((end_lsn, wal_tarfile_path)) = pg_wal { - let wal_tarfile = File::open(wal_tarfile_path)?; - let wal_reader = BufReader::new(wal_tarfile); + let wal_tarfile = tokio::fs::File::open(wal_tarfile_path).await?; + let wal_reader = tokio_util::io::ReaderStream::new(wal_tarfile); (end_lsn, Some(wal_reader)) } else { (start_lsn, None) @@ -542,16 +556,25 @@ impl PageServerNode { let import_cmd = format!( "import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn} {pg_version}" ); - let mut writer = client.copy_in(&import_cmd)?; - io::copy(&mut base_reader, &mut writer)?; - writer.finish()?; + let writer = client.copy_in(&import_cmd).await?; + let mut writer = std::pin::pin!(writer); + let mut writer = + writer.sink_map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{e}"))); + let mut base_tarfile = std::pin::pin!(base_tarfile); + writer.send_all(&mut base_tarfile).await?; + writer.into_inner().finish().await?; // Import wal if necessary if let Some(mut wal_reader) = wal_reader { let import_cmd = format!("import wal {tenant_id} {timeline_id} {start_lsn} {end_lsn}"); - let mut writer = client.copy_in(&import_cmd)?; - io::copy(&mut wal_reader, &mut writer)?; - writer.finish()?; + + let writer = client.copy_in(&import_cmd).await?; + let mut writer = std::pin::pin!(writer); + let mut writer = writer + .sink_map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{e}"))); + let mut wal_reader = std::pin::pin!(wal_reader); + writer.send_all(&mut wal_reader).await?; + writer.into_inner().finish().await?; } Ok(()) diff --git a/libs/postgres_connection/src/lib.rs b/libs/postgres_connection/src/lib.rs index 35cb1a2691..d793abc8d0 100644 --- a/libs/postgres_connection/src/lib.rs +++ b/libs/postgres_connection/src/lib.rs @@ -4,6 +4,7 @@ use anyhow::{bail, Context}; use itertools::Itertools; use std::borrow::Cow; use std::fmt; +use tokio_postgres::tls::NoTlsStream; use url::Host; /// Parses a string of format either `host:port` or `host` into a corresponding pair. @@ -163,8 +164,18 @@ impl PgConnectionConfig { } /// Connect using postgres protocol with TLS disabled. - pub fn connect_no_tls(&self) -> Result { - postgres::Config::from(self.to_tokio_postgres_config()).connect(postgres::NoTls) + pub async fn connect_no_tls( + &self, + ) -> Result< + ( + tokio_postgres::Client, + tokio_postgres::Connection, + ), + postgres::Error, + > { + self.to_tokio_postgres_config() + .connect(postgres::NoTls) + .await } } From 2664e9b8348516715a8694dc5deb3cea41e744eb Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 15 Dec 2023 12:29:02 +0000 Subject: [PATCH 02/12] fix --- control_plane/src/pageserver.rs | 57 ++++++++++++++++------------- libs/postgres_connection/src/lib.rs | 1 - 2 files changed, 32 insertions(+), 26 deletions(-) diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index d812ff12e3..e01f28e07b 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -6,9 +6,9 @@ //! use std::borrow::Cow; use std::collections::HashMap; -use std::fs::File; + use std::io; -use std::io::{BufReader, Write}; +use std::io::Write; use std::num::NonZeroU64; use std::path::PathBuf; use std::process::{Child, Command}; @@ -16,7 +16,7 @@ use std::time::Duration; use anyhow::{bail, Context}; use camino::Utf8PathBuf; -use futures::{SinkExt, StreamExt}; +use futures::SinkExt; use pageserver::client::mgmt_api; use pageserver_api::models::{self, LocationConfig, TenantInfo, TimelineInfo}; use pageserver_api::shard::TenantShardId; @@ -541,7 +541,7 @@ impl PageServerNode { // Init base reader let (start_lsn, base_tarfile_path) = base; let base_tarfile = tokio::fs::File::open(base_tarfile_path).await?; - let mut base_tarfile = tokio_util::io::ReaderStream::new(base_tarfile); + let base_tarfile = tokio_util::io::ReaderStream::new(base_tarfile); // Init wal reader if necessary let (end_lsn, wal_reader) = if let Some((end_lsn, wal_tarfile_path)) = pg_wal { @@ -552,29 +552,36 @@ impl PageServerNode { (start_lsn, None) }; + let copy_in = |reader, cmd| { + let client = &client; + async move { + let writer = client.copy_in(&cmd).await?; + let writer = std::pin::pin!(writer); + let mut writer = writer.sink_map_err(|e| { + std::io::Error::new(std::io::ErrorKind::Other, format!("{e}")) + }); + let mut reader = std::pin::pin!(reader); + writer.send_all(&mut reader).await?; + writer.into_inner().finish().await?; + anyhow::Ok(()) + } + }; + // Import base - let import_cmd = format!( - "import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn} {pg_version}" - ); - let writer = client.copy_in(&import_cmd).await?; - let mut writer = std::pin::pin!(writer); - let mut writer = - writer.sink_map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{e}"))); - let mut base_tarfile = std::pin::pin!(base_tarfile); - writer.send_all(&mut base_tarfile).await?; - writer.into_inner().finish().await?; - + copy_in( + base_tarfile, + format!( + "import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn} {pg_version}" + ), + ) + .await?; // Import wal if necessary - if let Some(mut wal_reader) = wal_reader { - let import_cmd = format!("import wal {tenant_id} {timeline_id} {start_lsn} {end_lsn}"); - - let writer = client.copy_in(&import_cmd).await?; - let mut writer = std::pin::pin!(writer); - let mut writer = writer - .sink_map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{e}"))); - let mut wal_reader = std::pin::pin!(wal_reader); - writer.send_all(&mut wal_reader).await?; - writer.into_inner().finish().await?; + if let Some(wal_reader) = wal_reader { + copy_in( + wal_reader, + format!("import wal {tenant_id} {timeline_id} {start_lsn} {end_lsn}"), + ) + .await?; } Ok(()) diff --git a/libs/postgres_connection/src/lib.rs b/libs/postgres_connection/src/lib.rs index d793abc8d0..ccf9108895 100644 --- a/libs/postgres_connection/src/lib.rs +++ b/libs/postgres_connection/src/lib.rs @@ -4,7 +4,6 @@ use anyhow::{bail, Context}; use itertools::Itertools; use std::borrow::Cow; use std::fmt; -use tokio_postgres::tls::NoTlsStream; use url::Host; /// Parses a string of format either `host:port` or `host` into a corresponding pair. From d7a8e0b1aea44608e9075cdbfd8b8b5535d98348 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 15 Dec 2023 13:34:32 +0000 Subject: [PATCH 03/12] eliminate one workaround, convert sk stuff to async as well --- Cargo.lock | 1 + control_plane/Cargo.toml | 1 + control_plane/src/attachment_service.rs | 7 +- control_plane/src/background_process.rs | 35 +++++----- control_plane/src/bin/neon_local.rs | 47 ++++++++----- control_plane/src/broker.rs | 7 +- control_plane/src/pageserver.rs | 91 ++++++++++++------------- control_plane/src/safekeeper.rs | 40 ++++++----- 8 files changed, 126 insertions(+), 103 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6695b3cac9..c4b6ecdf5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1245,6 +1245,7 @@ name = "control_plane" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "camino", "clap", "comfy-table", diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml index d1487d0c53..face604241 100644 --- a/control_plane/Cargo.toml +++ b/control_plane/Cargo.toml @@ -6,6 +6,7 @@ license.workspace = true [dependencies] anyhow.workspace = true +async-trait.workspace = true camino.workspace = true clap.workspace = true comfy-table.workspace = true diff --git a/control_plane/src/attachment_service.rs b/control_plane/src/attachment_service.rs index 528d733793..731c05809e 100644 --- a/control_plane/src/attachment_service.rs +++ b/control_plane/src/attachment_service.rs @@ -64,7 +64,7 @@ impl AttachmentService { .expect("non-Unicode path") } - pub fn start(&self) -> anyhow::Result { + pub async fn start(&self) -> anyhow::Result { let path_str = self.path.to_string_lossy(); background_process::start_process( @@ -73,10 +73,11 @@ impl AttachmentService { &self.env.attachment_service_bin(), ["-l", &self.listen, "-p", &path_str], [], - background_process::InitialPidFile::Create(&self.pid_file()), + background_process::InitialPidFile::Create(self.pid_file()), // TODO: a real status check - || Ok(true), + || async move { anyhow::Ok(true) }, ) + .await } pub fn stop(&self, immediate: bool) -> anyhow::Result<()> { diff --git a/control_plane/src/background_process.rs b/control_plane/src/background_process.rs index 26fc08fc8f..f97722400d 100644 --- a/control_plane/src/background_process.rs +++ b/control_plane/src/background_process.rs @@ -44,15 +44,15 @@ const NOTICE_AFTER_RETRIES: u64 = 50; /// Argument to `start_process`, to indicate whether it should create pidfile or if the process creates /// it itself. -pub enum InitialPidFile<'t> { +pub enum InitialPidFile { /// Create a pidfile, to allow future CLI invocations to manipulate the process. - Create(&'t Utf8Path), + Create(Utf8PathBuf), /// The process will create the pidfile itself, need to wait for that event. - Expect(&'t Utf8Path), + Expect(Utf8PathBuf), } /// Start a background child process using the parameters given. -pub fn start_process( +pub async fn start_process( process_name: &str, datadir: &Path, command: &Path, @@ -62,7 +62,8 @@ pub fn start_process( process_status_check: F, ) -> anyhow::Result where - F: Fn() -> anyhow::Result, + F: Fn() -> Fut, + Fut: std::future::Future>, AI: IntoIterator, A: AsRef, // Not generic AsRef, otherwise empty `envs` prevents type inference @@ -89,7 +90,7 @@ where let filled_cmd = fill_remote_storage_secrets_vars(fill_rust_env_vars(background_command)); filled_cmd.envs(envs); - let pid_file_to_check = match initial_pid_file { + let pid_file_to_check = match &initial_pid_file { InitialPidFile::Create(path) => { pre_exec_create_pidfile(filled_cmd, path); path @@ -107,7 +108,7 @@ where ); for retries in 0..RETRIES { - match process_started(pid, Some(pid_file_to_check), &process_status_check) { + match process_started(pid, &pid_file_to_check, &process_status_check).await { Ok(true) => { println!("\n{process_name} started, pid: {pid}"); return Ok(spawned_process); @@ -316,22 +317,20 @@ where cmd } -fn process_started( +async fn process_started( pid: Pid, - pid_file_to_check: Option<&Utf8Path>, + pid_file_to_check: &Utf8Path, status_check: &F, ) -> anyhow::Result where - F: Fn() -> anyhow::Result, + F: Fn() -> Fut, + Fut: std::future::Future>, { - match status_check() { - Ok(true) => match pid_file_to_check { - Some(pid_file_path) => match pid_file::read(pid_file_path)? { - PidFileRead::NotExist => Ok(false), - PidFileRead::LockedByOtherProcess(pid_in_file) => Ok(pid_in_file == pid), - PidFileRead::NotHeldByAnyProcess(_) => Ok(false), - }, - None => Ok(true), + match status_check().await { + Ok(true) => match pid_file::read(pid_file_to_check)? { + PidFileRead::NotExist => Ok(false), + PidFileRead::LockedByOtherProcess(pid_in_file) => Ok(pid_in_file == pid), + PidFileRead::NotHeldByAnyProcess(_) => Ok(false), }, Ok(false) => Ok(false), Err(e) => anyhow::bail!("process failed to start: {e}"), diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 5dd6370dac..03e69010f7 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -128,11 +128,11 @@ fn main() -> Result<()> { let subcommand_result = match sub_name { "tenant" => rt.block_on(handle_tenant(sub_args, &mut env)), "timeline" => rt.block_on(handle_timeline(sub_args, &mut env)), - "start" => handle_start_all(sub_args, &env), + "start" => rt.block_on(handle_start_all(sub_args, &env)), "stop" => handle_stop_all(sub_args, &env), "pageserver" => rt.block_on(handle_pageserver(sub_args, &env)), - "attachment_service" => handle_attachment_service(sub_args, &env), - "safekeeper" => handle_safekeeper(sub_args, &env), + "attachment_service" => rt.block_on(handle_attachment_service(sub_args, &env)), + "safekeeper" => rt.block_on(handle_safekeeper(sub_args, &env)), "endpoint" => rt.block_on(handle_endpoint(sub_args, &env)), "mappings" => handle_mappings(sub_args, &mut env), "pg" => bail!("'pg' subcommand has been renamed to 'endpoint'"), @@ -560,7 +560,9 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local let mut cplane = ComputeControlPlane::load(env.clone())?; println!("Importing timeline into pageserver ..."); - pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version).await?; + pageserver + .timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version) + .await?; env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?; println!("Creating endpoint for imported timeline ..."); @@ -904,6 +906,7 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Some(("start", subcommand_args)) => { if let Err(e) = get_pageserver(env, subcommand_args)? .start(&pageserver_config_overrides(subcommand_args)) + .await { eprintln!("pageserver start failed: {e}"); exit(1); @@ -930,7 +933,10 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> exit(1); } - if let Err(e) = pageserver.start(&pageserver_config_overrides(subcommand_args)) { + if let Err(e) = pageserver + .start(&pageserver_config_overrides(subcommand_args)) + .await + { eprintln!("pageserver start failed: {e}"); exit(1); } @@ -944,7 +950,10 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> exit(1); } - if let Err(e) = pageserver.start(&pageserver_config_overrides(subcommand_args)) { + if let Err(e) = pageserver + .start(&pageserver_config_overrides(subcommand_args)) + .await + { eprintln!("pageserver start failed: {e}"); exit(1); } @@ -966,11 +975,14 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Ok(()) } -fn handle_attachment_service(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { +async fn handle_attachment_service( + sub_match: &ArgMatches, + env: &local_env::LocalEnv, +) -> Result<()> { let svc = AttachmentService::from_env(env); match sub_match.subcommand() { Some(("start", _start_match)) => { - if let Err(e) = svc.start() { + if let Err(e) = svc.start().await { eprintln!("start failed: {e}"); exit(1); } @@ -1011,7 +1023,7 @@ fn safekeeper_extra_opts(init_match: &ArgMatches) -> Vec { .collect() } -fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { +async fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { let (sub_name, sub_args) = match sub_match.subcommand() { Some(safekeeper_command_data) => safekeeper_command_data, None => bail!("no safekeeper subcommand provided"), @@ -1029,7 +1041,7 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul "start" => { let extra_opts = safekeeper_extra_opts(sub_args); - if let Err(e) = safekeeper.start(extra_opts) { + if let Err(e) = safekeeper.start(extra_opts).await { eprintln!("safekeeper start failed: {}", e); exit(1); } @@ -1055,7 +1067,7 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul } let extra_opts = safekeeper_extra_opts(sub_args); - if let Err(e) = safekeeper.start(extra_opts) { + if let Err(e) = safekeeper.start(extra_opts).await { eprintln!("safekeeper start failed: {}", e); exit(1); } @@ -1068,15 +1080,15 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul Ok(()) } -fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<()> { +async fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<()> { // Endpoints are not started automatically - broker::start_broker_process(env)?; + broker::start_broker_process(env).await?; // Only start the attachment service if the pageserver is configured to need it if env.control_plane_api.is_some() { let attachment_service = AttachmentService::from_env(env); - if let Err(e) = attachment_service.start() { + if let Err(e) = attachment_service.start().await { eprintln!("attachment_service start failed: {:#}", e); try_stop_all(env, true); exit(1); @@ -1085,7 +1097,10 @@ fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow for ps_conf in &env.pageservers { let pageserver = PageServerNode::from_env(env, ps_conf); - if let Err(e) = pageserver.start(&pageserver_config_overrides(sub_match)) { + if let Err(e) = pageserver + .start(&pageserver_config_overrides(sub_match)) + .await + { eprintln!("pageserver {} start failed: {:#}", ps_conf.id, e); try_stop_all(env, true); exit(1); @@ -1094,7 +1109,7 @@ fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow for node in env.safekeepers.iter() { let safekeeper = SafekeeperNode::from_env(env, node); - if let Err(e) = safekeeper.start(vec![]) { + if let Err(e) = safekeeper.start(vec![]).await { eprintln!("safekeeper {} start failed: {:#}", safekeeper.id, e); try_stop_all(env, false); exit(1); diff --git a/control_plane/src/broker.rs b/control_plane/src/broker.rs index 6be865cc2e..7b6943f9c1 100644 --- a/control_plane/src/broker.rs +++ b/control_plane/src/broker.rs @@ -11,7 +11,7 @@ use camino::Utf8PathBuf; use crate::{background_process, local_env}; -pub fn start_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> { +pub async fn start_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> { let broker = &env.broker; let listen_addr = &broker.listen_addr; @@ -26,8 +26,8 @@ pub fn start_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> { &env.storage_broker_bin(), args, [], - background_process::InitialPidFile::Create(&storage_broker_pid_file_path(env)), - || { + background_process::InitialPidFile::Create(storage_broker_pid_file_path(env)), + || async { let url = broker.client_url(); let status_url = url.join("status").with_context(|| { format!("Failed to append /status path to broker endpoint {url}") @@ -42,6 +42,7 @@ pub fn start_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> { } }, ) + .await .context("Failed to spawn storage_broker subprocess")?; Ok(()) } diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index e01f28e07b..3e7e335a68 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -149,8 +149,8 @@ impl PageServerNode { .expect("non-Unicode path") } - pub fn start(&self, config_overrides: &[&str]) -> anyhow::Result { - self.start_node(config_overrides, false) + pub async fn start(&self, config_overrides: &[&str]) -> anyhow::Result { + self.start_node(config_overrides, false).await } fn pageserver_init(&self, config_overrides: &[&str]) -> anyhow::Result<()> { @@ -191,53 +191,52 @@ impl PageServerNode { Ok(()) } - fn start_node(&self, config_overrides: &[&str], update_config: bool) -> anyhow::Result { + async fn start_node( + &self, + config_overrides: &[&str], + update_config: bool, + ) -> anyhow::Result { // TODO: using a thread here because start_process() is not async but we need to call check_status() - std::thread::scope(move |s| { - s.spawn(move || { - let datadir = self.repo_path(); - print!( - "Starting pageserver node {} at '{}' in {:?}", - self.conf.id, - self.pg_connection_config.raw_address(), - datadir - ); - io::stdout().flush().context("flush stdout")?; + let datadir = self.repo_path(); + print!( + "Starting pageserver node {} at '{}' in {:?}", + self.conf.id, + self.pg_connection_config.raw_address(), + datadir + ); + io::stdout().flush().context("flush stdout")?; - let datadir_path_str = datadir.to_str().with_context(|| { - format!( - "Cannot start pageserver node {} in path that has no string representation: {:?}", - self.conf.id, datadir, - ) - })?; - let mut args = self.pageserver_basic_args(config_overrides, datadir_path_str); - if update_config { - args.push(Cow::Borrowed("--update-config")); + let datadir_path_str = datadir.to_str().with_context(|| { + format!( + "Cannot start pageserver node {} in path that has no string representation: {:?}", + self.conf.id, datadir, + ) + })?; + let mut args = self.pageserver_basic_args(config_overrides, datadir_path_str); + if update_config { + args.push(Cow::Borrowed("--update-config")); + } + background_process::start_process( + "pageserver", + &datadir, + &self.env.pageserver_bin(), + args.iter().map(Cow::as_ref), + self.pageserver_env_variables()?, + background_process::InitialPidFile::Expect(self.pid_file()), + || async { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let st = rt.block_on(self.check_status()); + match st { + Ok(()) => Ok(true), + Err(mgmt_api::Error::ReceiveBody(_)) => Ok(false), + Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")), } - background_process::start_process( - "pageserver", - &datadir, - &self.env.pageserver_bin(), - args.iter().map(Cow::as_ref), - self.pageserver_env_variables()?, - background_process::InitialPidFile::Expect(&self.pid_file()), - || { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - let st = rt.block_on(self.check_status()); - match st { - Ok(()) => Ok(true), - Err(mgmt_api::Error::ReceiveBody(_)) => Ok(false), - Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")), - } - }, - ) - }) - .join() - .unwrap() - }) + }, + ) + .await } fn pageserver_basic_args<'a>( diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index a8baa0ac53..4026ef0eb9 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -13,7 +13,6 @@ use std::{io, result}; use anyhow::Context; use camino::Utf8PathBuf; use postgres_connection::PgConnectionConfig; -use reqwest::blocking::{Client, RequestBuilder, Response}; use reqwest::{IntoUrl, Method}; use thiserror::Error; use utils::{http::error::HttpErrorBody, id::NodeId}; @@ -34,12 +33,14 @@ pub enum SafekeeperHttpError { type Result = result::Result; +#[async_trait::async_trait] pub trait ResponseErrorMessageExt: Sized { - fn error_from_body(self) -> Result; + async fn error_from_body(self) -> Result; } -impl ResponseErrorMessageExt for Response { - fn error_from_body(self) -> Result { +#[async_trait::async_trait] +impl ResponseErrorMessageExt for reqwest::Response { + async fn error_from_body(self) -> Result { let status = self.status(); if !(status.is_client_error() || status.is_server_error()) { return Ok(self); @@ -48,7 +49,7 @@ impl ResponseErrorMessageExt for Response { // reqwest does not export its error construction utility functions, so let's craft the message ourselves let url = self.url().to_owned(); Err(SafekeeperHttpError::Response( - match self.json::() { + match self.json::().await { Ok(err_body) => format!("Error: {}", err_body.msg), Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url), }, @@ -69,7 +70,7 @@ pub struct SafekeeperNode { pub pg_connection_config: PgConnectionConfig, pub env: LocalEnv, - pub http_client: Client, + pub http_client: reqwest::Client, pub http_base_url: String, } @@ -80,7 +81,7 @@ impl SafekeeperNode { conf: conf.clone(), pg_connection_config: Self::safekeeper_connection_config(conf.pg_port), env: env.clone(), - http_client: Client::new(), + http_client: reqwest::Client::new(), http_base_url: format!("http://127.0.0.1:{}/v1", conf.http_port), } } @@ -103,7 +104,7 @@ impl SafekeeperNode { .expect("non-Unicode path") } - pub fn start(&self, extra_opts: Vec) -> anyhow::Result { + pub async fn start(&self, extra_opts: Vec) -> anyhow::Result { print!( "Starting safekeeper at '{}' in '{}'", self.pg_connection_config.raw_address(), @@ -191,13 +192,16 @@ impl SafekeeperNode { &self.env.safekeeper_bin(), &args, [], - background_process::InitialPidFile::Expect(&self.pid_file()), - || match self.check_status() { - Ok(()) => Ok(true), - Err(SafekeeperHttpError::Transport(_)) => Ok(false), - Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")), + background_process::InitialPidFile::Expect(self.pid_file()), + || async { + match self.check_status().await { + Ok(()) => Ok(true), + Err(SafekeeperHttpError::Transport(_)) => Ok(false), + Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")), + } }, ) + .await } /// @@ -216,7 +220,7 @@ impl SafekeeperNode { ) } - fn http_request(&self, method: Method, url: U) -> RequestBuilder { + fn http_request(&self, method: Method, url: U) -> reqwest::RequestBuilder { // TODO: authentication //if self.env.auth_type == AuthType::NeonJWT { // builder = builder.bearer_auth(&self.env.safekeeper_auth_token) @@ -224,10 +228,12 @@ impl SafekeeperNode { self.http_client.request(method, url) } - pub fn check_status(&self) -> Result<()> { + pub async fn check_status(&self) -> Result<()> { self.http_request(Method::GET, format!("{}/{}", self.http_base_url, "status")) - .send()? - .error_from_body()?; + .send() + .await? + .error_from_body() + .await?; Ok(()) } } From 7ac9ef8291ed5e3a876fcd33d18929fd36762c9a Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 15 Dec 2023 13:38:34 +0000 Subject: [PATCH 04/12] remove unused dep --- Cargo.lock | 1 - libs/pageserver_api/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c4b6ecdf5d..a435aeb7d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3167,7 +3167,6 @@ dependencies = [ "enum-map", "hex", "postgres_ffi", - "rand 0.8.5", "serde", "serde_json", "serde_with", diff --git a/libs/pageserver_api/Cargo.toml b/libs/pageserver_api/Cargo.toml index 09f85f6bfc..4d08d78e87 100644 --- a/libs/pageserver_api/Cargo.toml +++ b/libs/pageserver_api/Cargo.toml @@ -15,7 +15,6 @@ byteorder.workspace = true utils.workspace = true postgres_ffi.workspace = true enum-map.workspace = true -rand.workspace = true strum.workspace = true strum_macros.workspace = true hex.workspace = true From cab12c02a358bcb276c47e00e2fc31482903a792 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 15 Dec 2023 13:43:40 +0000 Subject: [PATCH 05/12] clippy --- control_plane/src/background_process.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/control_plane/src/background_process.rs b/control_plane/src/background_process.rs index f97722400d..20fa3af9b8 100644 --- a/control_plane/src/background_process.rs +++ b/control_plane/src/background_process.rs @@ -108,7 +108,7 @@ where ); for retries in 0..RETRIES { - match process_started(pid, &pid_file_to_check, &process_status_check).await { + match process_started(pid, pid_file_to_check, &process_status_check).await { Ok(true) => { println!("\n{process_name} started, pid: {pid}"); return Ok(spawned_process); From f91625a552a9f04a91669b8254d22d3e6a05b6a9 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 15 Dec 2023 14:04:09 +0000 Subject: [PATCH 06/12] fixup --- control_plane/src/broker.rs | 4 ++-- control_plane/src/pageserver.rs | 6 +----- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/control_plane/src/broker.rs b/control_plane/src/broker.rs index 7b6943f9c1..f40705863b 100644 --- a/control_plane/src/broker.rs +++ b/control_plane/src/broker.rs @@ -19,7 +19,7 @@ pub async fn start_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<( let args = [format!("--listen-addr={listen_addr}")]; - let client = reqwest::blocking::Client::new(); + let client = reqwest::Client::new(); background_process::start_process( "storage_broker", &env.base_data_dir, @@ -36,7 +36,7 @@ pub async fn start_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<( .get(status_url) .build() .with_context(|| format!("Failed to construct request to broker endpoint {url}"))?; - match client.execute(request) { + match client.execute(request).await { Ok(resp) => Ok(resp.status().is_success()), Err(_) => Ok(false), } diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 3e7e335a68..a0256c1e52 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -224,11 +224,7 @@ impl PageServerNode { self.pageserver_env_variables()?, background_process::InitialPidFile::Expect(self.pid_file()), || async { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - let st = rt.block_on(self.check_status()); + let st = self.check_status().await; match st { Ok(()) => Ok(true), Err(mgmt_api::Error::ReceiveBody(_)) => Ok(false), From 1a71b72c391cd747f90099141c7946b5af52655b Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 15 Dec 2023 15:09:39 +0000 Subject: [PATCH 07/12] move serialization roundtrip to rust unit test --- pageserver/src/client/mgmt_api.rs | 2 +- pageserver/src/http/models/partitioning.rs | 39 ++++++++++++++++++++++ pageserver/src/http/routes.rs | 16 +-------- 3 files changed, 41 insertions(+), 16 deletions(-) diff --git a/pageserver/src/client/mgmt_api.rs b/pageserver/src/client/mgmt_api.rs index 66d7da9d0b..8e50741029 100644 --- a/pageserver/src/client/mgmt_api.rs +++ b/pageserver/src/client/mgmt_api.rs @@ -99,7 +99,7 @@ impl Client { timeline_id: TimelineId, ) -> Result { let uri = format!( - "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/keyspace?check_serialization_roundtrip=true", + "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/keyspace", self.mgmt_api_endpoint ); self.get(&uri) diff --git a/pageserver/src/http/models/partitioning.rs b/pageserver/src/http/models/partitioning.rs index bf0a62f3a7..d52794fd8c 100644 --- a/pageserver/src/http/models/partitioning.rs +++ b/pageserver/src/http/models/partitioning.rs @@ -110,3 +110,42 @@ impl<'a> serde::Deserialize<'a> for Partitioning { }) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_serialization_roundtrip() { + let reference = r#" + { + "keys": [ + [ + "000000000000000000000000000000000000", + "000000000000000000000000000000000001" + ], + [ + "000000067F00000001000000000000000000", + "000000067F00000001000000000000000002" + ], + [ + "030000000000000000000000000000000000", + "030000000000000000000000000000000003" + ] + ], + "at_lsn": "0/2240160" + } + "#; + + let de: Partitioning = serde_json::from_str(reference).unwrap(); + + let ser = serde_json::to_string(&de).unwrap(); + + let ser_de: serde_json::Value = serde_json::from_str(&ser).unwrap(); + + assert_eq!( + ser_de, + serde_json::from_str::<'_, serde_json::Value>(reference).unwrap() + ); + } +} diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index c63c691726..d5248fb996 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -1489,9 +1489,6 @@ async fn timeline_collect_keyspace( let at_lsn: Option = parse_query_param(&request, "at_lsn")?; - let check_serialization_roundtrip: bool = - parse_query_param(&request, "check_serialization_roundtrip")?.unwrap_or(false); - async { let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?; @@ -1502,18 +1499,7 @@ async fn timeline_collect_keyspace( .map_err(|e| ApiError::InternalServerError(e.into()))?; let res = crate::http::models::partitioning::Partitioning { keys, at_lsn }; - if check_serialization_roundtrip { - (|| { - let ser = serde_json::ser::to_vec(&res).context("serialize")?; - let de: crate::http::models::partitioning::Partitioning = - serde_json::from_slice(&ser).context("deserialize")?; - anyhow::ensure!(de == res, "not equal"); - info!("passed serialization rountrip check"); - Ok(()) - })() - .context("serialization rountrip") - .map_err(ApiError::InternalServerError)?; - } + json_response(StatusCode::OK, res) } .instrument(info_span!("timeline_collect_keyspace", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id)) From 46889d768ee2aa37e35a7c4003e1a924a8e92759 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 15 Dec 2023 15:21:05 +0000 Subject: [PATCH 08/12] move client to separate crate --- Cargo.lock | 14 +++++++++++++- Cargo.toml | 2 ++ control_plane/Cargo.toml | 2 +- control_plane/src/pageserver.rs | 2 +- pageserver/client/Cargo.toml | 13 +++++++++++++ pageserver/{src/client.rs => client/src/lib.rs} | 0 pageserver/{src/client => client/src}/mgmt_api.rs | 5 ++--- pageserver/src/lib.rs | 1 - 8 files changed, 32 insertions(+), 7 deletions(-) create mode 100644 pageserver/client/Cargo.toml rename pageserver/{src/client.rs => client/src/lib.rs} (100%) rename pageserver/{src/client => client/src}/mgmt_api.rs (97%) diff --git a/Cargo.lock b/Cargo.lock index a435aeb7d0..22fde5d2d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1256,8 +1256,8 @@ dependencies = [ "hyper", "nix 0.26.2", "once_cell", - "pageserver", "pageserver_api", + "pageserver_client", "postgres", "postgres_backend", "postgres_connection", @@ -3177,6 +3177,18 @@ dependencies = [ "workspace_hack", ] +[[package]] +name = "pageserver_client" +version = "0.1.0" +dependencies = [ + "async-trait", + "pageserver_api", + "reqwest", + "serde", + "thiserror", + "utils", +] + [[package]] name = "parking" version = "2.1.1" diff --git a/Cargo.toml b/Cargo.toml index 496a9d7839..b44544d626 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "control_plane", "pageserver", "pageserver/ctl", + "pageserver/client", "proxy", "safekeeper", "storage_broker", @@ -182,6 +183,7 @@ compute_api = { version = "0.1", path = "./libs/compute_api/" } consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" } metrics = { version = "0.1", path = "./libs/metrics/" } pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" } +pageserver_client = { path = "./pageserver/client" } postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" } postgres_connection = { version = "0.1", path = "./libs/postgres_connection/" } postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" } diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml index face604241..898ad05add 100644 --- a/control_plane/Cargo.toml +++ b/control_plane/Cargo.toml @@ -29,8 +29,8 @@ tokio.workspace = true tokio-postgres.workspace = true tokio-util.workspace = true url.workspace = true -pageserver = { path = "../pageserver" } pageserver_api.workspace = true +pageserver_client.workspace = true postgres_backend.workspace = true safekeeper_api.workspace = true postgres_connection.workspace = true diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index a0256c1e52..7d490016bf 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -17,9 +17,9 @@ use std::time::Duration; use anyhow::{bail, Context}; use camino::Utf8PathBuf; use futures::SinkExt; -use pageserver::client::mgmt_api; use pageserver_api::models::{self, LocationConfig, TenantInfo, TimelineInfo}; use pageserver_api::shard::TenantShardId; +use pageserver_client::mgmt_api; use postgres_backend::AuthType; use postgres_connection::{parse_host_port, PgConnectionConfig}; use utils::auth::{Claims, Scope}; diff --git a/pageserver/client/Cargo.toml b/pageserver/client/Cargo.toml new file mode 100644 index 0000000000..5c79bf0e59 --- /dev/null +++ b/pageserver/client/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "pageserver_client" +version = "0.1.0" +edition.workspace = true + + +[dependencies] +pageserver_api.workspace = true +thiserror.workspace = true +async-trait.workspace = true +reqwest.workspace = true +utils.workspace = true +serde.workspace = true diff --git a/pageserver/src/client.rs b/pageserver/client/src/lib.rs similarity index 100% rename from pageserver/src/client.rs rename to pageserver/client/src/lib.rs diff --git a/pageserver/src/client/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs similarity index 97% rename from pageserver/src/client/mgmt_api.rs rename to pageserver/client/src/mgmt_api.rs index 10d39ec9cd..821c238082 100644 --- a/pageserver/src/client/mgmt_api.rs +++ b/pageserver/client/src/mgmt_api.rs @@ -1,6 +1,5 @@ -use hyper::{header::AUTHORIZATION, http::method::Method}; use pageserver_api::models::*; -use reqwest::IntoUrl; +use reqwest::{IntoUrl, Method}; use utils::{ http::error::HttpErrorBody, id::{TenantId, TimelineId}, @@ -105,7 +104,7 @@ impl Client { ) -> Result { let req = self.client.request(method, uri); let req = if let Some(value) = &self.authorization_header { - req.header(AUTHORIZATION, value) + req.header(reqwest::header::AUTHORIZATION, value) } else { req }; diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 42f7afb865..797cb6f944 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -25,7 +25,6 @@ pub mod walingest; pub mod walrecord; pub mod walredo; -pub mod client; pub mod failpoint_support; use crate::task_mgr::TaskKind; From 9e238a34b480f2a2045537805ec4966b83a6b4e4 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 15 Dec 2023 15:31:29 +0000 Subject: [PATCH 09/12] make cargo deny happy --- pageserver/client/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pageserver/client/Cargo.toml b/pageserver/client/Cargo.toml index 5c79bf0e59..166dcb7b4c 100644 --- a/pageserver/client/Cargo.toml +++ b/pageserver/client/Cargo.toml @@ -2,7 +2,7 @@ name = "pageserver_client" version = "0.1.0" edition.workspace = true - +license.workspace = true [dependencies] pageserver_api.workspace = true From 28479529ae1fb38e3b9fdaacf87235e02a96628a Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 15 Dec 2023 15:40:42 +0000 Subject: [PATCH 10/12] fixup merge: move keyspace and models::partitioning into pageserver_api --- .../pageserver_api}/src/keyspace.rs | 30 ++++++++++++++++++- libs/pageserver_api/src/lib.rs | 1 + libs/pageserver_api/src/models.rs | 2 ++ .../src}/models/partitioning.rs | 4 +-- pageserver/client/src/mgmt_api.rs | 2 +- pageserver/src/http/mod.rs | 2 -- pageserver/src/http/models.rs | 3 -- pageserver/src/http/routes.rs | 4 +-- pageserver/src/lib.rs | 2 +- pageserver/src/repository.rs | 29 +----------------- 10 files changed, 39 insertions(+), 40 deletions(-) rename {pageserver => libs/pageserver_api}/src/keyspace.rs (94%) rename {pageserver/src/http => libs/pageserver_api/src}/models/partitioning.rs (97%) delete mode 100644 pageserver/src/http/models.rs diff --git a/pageserver/src/keyspace.rs b/libs/pageserver_api/src/keyspace.rs similarity index 94% rename from pageserver/src/keyspace.rs rename to libs/pageserver_api/src/keyspace.rs index 970c96589e..16651c322e 100644 --- a/pageserver/src/keyspace.rs +++ b/libs/pageserver_api/src/keyspace.rs @@ -1,7 +1,8 @@ -use crate::repository::{key_range_size, singleton_range, Key}; use postgres_ffi::BLCKSZ; use std::ops::Range; +use crate::key::Key; + /// /// Represents a set of Keys, in a compact form. /// @@ -186,6 +187,33 @@ impl KeySpaceRandomAccum { } } +pub fn key_range_size(key_range: &Range) -> u32 { + let start = key_range.start; + let end = key_range.end; + + if end.field1 != start.field1 + || end.field2 != start.field2 + || end.field3 != start.field3 + || end.field4 != start.field4 + { + return u32::MAX; + } + + let start = (start.field5 as u64) << 32 | start.field6 as u64; + let end = (end.field5 as u64) << 32 | end.field6 as u64; + + let diff = end - start; + if diff > u32::MAX as u64 { + u32::MAX + } else { + diff as u32 + } +} + +pub fn singleton_range(key: Key) -> Range { + key..key.next() +} + #[cfg(test)] mod tests { use super::*; diff --git a/libs/pageserver_api/src/lib.rs b/libs/pageserver_api/src/lib.rs index 511c5ed208..b236b93428 100644 --- a/libs/pageserver_api/src/lib.rs +++ b/libs/pageserver_api/src/lib.rs @@ -5,6 +5,7 @@ use const_format::formatcp; /// Public API types pub mod control_api; pub mod key; +pub mod keyspace; pub mod models; pub mod reltag; pub mod shard; diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index fbc7d73235..a78ba8ad94 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -1,3 +1,5 @@ +pub mod partitioning; + use std::{ collections::HashMap, num::{NonZeroU64, NonZeroUsize}, diff --git a/pageserver/src/http/models/partitioning.rs b/libs/pageserver_api/src/models/partitioning.rs similarity index 97% rename from pageserver/src/http/models/partitioning.rs rename to libs/pageserver_api/src/models/partitioning.rs index d52794fd8c..0d287f7be0 100644 --- a/pageserver/src/http/models/partitioning.rs +++ b/libs/pageserver_api/src/models/partitioning.rs @@ -49,7 +49,7 @@ impl<'a, T: std::fmt::Display> serde::Serialize for WithDisplay<'a, T> { } } -pub struct KeyRange<'a>(&'a std::ops::Range); +pub struct KeyRange<'a>(&'a std::ops::Range); impl<'a> serde::Serialize for KeyRange<'a> { fn serialize(&self, serializer: S) -> Result @@ -79,7 +79,7 @@ impl<'a> serde::Deserialize<'a> for Partitioning { #[serde_with::serde_as] #[derive(serde::Deserialize)] #[serde(transparent)] - struct Key(#[serde_as(as = "serde_with::DisplayFromStr")] crate::repository::Key); + struct Key(#[serde_as(as = "serde_with::DisplayFromStr")] crate::key::Key); #[serde_with::serde_as] #[derive(serde::Deserialize)] diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs index 7fb858b2f4..77eb1bb8e2 100644 --- a/pageserver/client/src/mgmt_api.rs +++ b/pageserver/client/src/mgmt_api.rs @@ -96,7 +96,7 @@ impl Client { &self, tenant_id: TenantId, timeline_id: TimelineId, - ) -> Result { + ) -> Result { let uri = format!( "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/keyspace", self.mgmt_api_endpoint diff --git a/pageserver/src/http/mod.rs b/pageserver/src/http/mod.rs index 220fa29b43..c82d1c0362 100644 --- a/pageserver/src/http/mod.rs +++ b/pageserver/src/http/mod.rs @@ -1,4 +1,2 @@ pub mod routes; pub use routes::make_router; - -pub mod models; diff --git a/pageserver/src/http/models.rs b/pageserver/src/http/models.rs deleted file mode 100644 index 1d8e76b7a7..0000000000 --- a/pageserver/src/http/models.rs +++ /dev/null @@ -1,3 +0,0 @@ -//! If possible, use `::pageserver_api::models` instead. - -pub mod partitioning; diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index d5248fb996..a7d1e79614 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -1498,8 +1498,8 @@ async fn timeline_collect_keyspace( .await .map_err(|e| ApiError::InternalServerError(e.into()))?; - let res = crate::http::models::partitioning::Partitioning { keys, at_lsn }; - + let res = pageserver_api::models::partitioning::Partitioning { keys, at_lsn }; + json_response(StatusCode::OK, res) } .instrument(info_span!("timeline_collect_keyspace", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id)) diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 797cb6f944..58adf6e8c4 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -10,7 +10,7 @@ pub mod deletion_queue; pub mod disk_usage_eviction_task; pub mod http; pub mod import_datadir; -pub mod keyspace; +pub use pageserver_api::keyspace; pub mod metrics; pub mod page_cache; pub mod page_service; diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 97d731bf49..c726139524 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -2,38 +2,11 @@ use crate::walrecord::NeonWalRecord; use anyhow::Result; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use std::ops::{AddAssign, Range}; +use std::ops::AddAssign; use std::time::Duration; pub use pageserver_api::key::{Key, KEY_SIZE}; -pub fn key_range_size(key_range: &Range) -> u32 { - let start = key_range.start; - let end = key_range.end; - - if end.field1 != start.field1 - || end.field2 != start.field2 - || end.field3 != start.field3 - || end.field4 != start.field4 - { - return u32::MAX; - } - - let start = (start.field5 as u64) << 32 | start.field6 as u64; - let end = (end.field5 as u64) << 32 | end.field6 as u64; - - let diff = end - start; - if diff > u32::MAX as u64 { - u32::MAX - } else { - diff as u32 - } -} - -pub fn singleton_range(key: Key) -> Range { - key..key.next() -} - /// A 'value' stored for a one Key. #[derive(Debug, Clone, Serialize, Deserialize)] #[cfg_attr(test, derive(PartialEq))] From 888a7311f4af30c9994f9df657b73d0c1e2a5f60 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 15 Dec 2023 15:44:09 +0000 Subject: [PATCH 11/12] preseed rng for display_fromstr_bijection test case --- Cargo.lock | 1 + libs/pageserver_api/Cargo.toml | 1 + libs/pageserver_api/src/key.rs | 6 ++++-- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 22fde5d2d0..daaf44ef2f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3167,6 +3167,7 @@ dependencies = [ "enum-map", "hex", "postgres_ffi", + "rand 0.8.5", "serde", "serde_json", "serde_with", diff --git a/libs/pageserver_api/Cargo.toml b/libs/pageserver_api/Cargo.toml index 4d08d78e87..4146597d8d 100644 --- a/libs/pageserver_api/Cargo.toml +++ b/libs/pageserver_api/Cargo.toml @@ -24,3 +24,4 @@ workspace_hack.workspace = true [dev-dependencies] bincode.workspace = true +rand.workspace = true diff --git a/libs/pageserver_api/src/key.rs b/libs/pageserver_api/src/key.rs index 151ed14627..d680a5600e 100644 --- a/libs/pageserver_api/src/key.rs +++ b/libs/pageserver_api/src/key.rs @@ -159,10 +159,12 @@ mod tests { use crate::key::Key; + use rand::Rng; + use rand::SeedableRng; + #[test] fn display_fromstr_bijection() { - let mut rng = rand::thread_rng(); - use rand::Rng; + let mut rng = rand::rngs::StdRng::seed_from_u64(42); let key = Key { field1: rng.gen(), From a6abcbe454d32fea42d46b0d236ccc18052d3d7e Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 15 Dec 2023 15:49:08 +0000 Subject: [PATCH 12/12] hakari manage-deps --- Cargo.lock | 1 + pageserver/client/Cargo.toml | 1 + 2 files changed, 2 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 22fde5d2d0..2fe599ce60 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3187,6 +3187,7 @@ dependencies = [ "serde", "thiserror", "utils", + "workspace_hack", ] [[package]] diff --git a/pageserver/client/Cargo.toml b/pageserver/client/Cargo.toml index 166dcb7b4c..4bd36185a6 100644 --- a/pageserver/client/Cargo.toml +++ b/pageserver/client/Cargo.toml @@ -11,3 +11,4 @@ async-trait.workspace = true reqwest.workspace = true utils.workspace = true serde.workspace = true +workspace_hack = { version = "0.1", path = "../../workspace_hack" }