diff --git a/Cargo.lock b/Cargo.lock
index 5e7fce3e8d..8eae4eb6e9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2927,6 +2927,16 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "nu-ansi-term"
+version = "0.46.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
+dependencies = [
+ "overload",
+ "winapi",
+]
+
 [[package]]
 name = "num-bigint"
 version = "0.4.3"
@@ -3193,6 +3203,12 @@ version = "0.5.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a"
 
+[[package]]
+name = "overload"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
+
 [[package]]
 name = "pagectl"
 version = "0.1.0"
@@ -3278,10 +3294,12 @@ dependencies = [
  "tokio",
  "tokio-io-timeout",
  "tokio-postgres",
+ "tokio-stream",
  "tokio-tar",
  "tokio-util",
  "toml_edit",
  "tracing",
+ "tracing-subscriber",
  "url",
  "utils",
  "walkdir",
@@ -3556,7 +3574,7 @@ dependencies = [
 [[package]]
 name = "postgres"
 version = "0.19.4"
-source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
+source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#ef8559b5f60f5c1d2b0184a62f49035600824518"
 dependencies = [
  "bytes",
  "fallible-iterator",
@@ -3569,7 +3587,7 @@ dependencies = [
 [[package]]
 name = "postgres-native-tls"
 version = "0.5.0"
-source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
+source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#ef8559b5f60f5c1d2b0184a62f49035600824518"
 dependencies = [
  "native-tls",
  "tokio",
@@ -3580,7 +3598,7 @@ dependencies = [
 [[package]]
 name = "postgres-protocol"
 version = "0.6.4"
-source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
+source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#ef8559b5f60f5c1d2b0184a62f49035600824518"
 dependencies = [
  "base64 0.20.0",
  "byteorder",
@@ -3598,7 +3616,7 @@ dependencies = [
 [[package]]
 name = "postgres-types"
 version = "0.2.4"
-source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
+source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#ef8559b5f60f5c1d2b0184a62f49035600824518"
 dependencies = [
  "bytes",
  "fallible-iterator",
@@ -5414,7 +5432,7 @@ dependencies = [
 [[package]]
 name = "tokio-postgres"
 version = "0.7.7"
-source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
+source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#ef8559b5f60f5c1d2b0184a62f49035600824518"
 dependencies = [
  "async-trait",
  "byteorder",
@@ -5771,6 +5789,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77"
 dependencies = [
  "matchers",
+ "nu-ansi-term",
  "once_cell",
  "regex",
  "serde",
diff --git a/Cargo.toml b/Cargo.toml
index 6be831a2b3..a74053a2c2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -163,11 +163,11 @@ env_logger = "0.10"
 log = "0.4"
 
 ## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
-postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
-postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
-postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
-tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
+postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
+postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
+postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
+postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
+tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
 
 ## Other git libraries
 heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending
@@ -204,7 +204,7 @@ tonic-build = "0.9"
 
 # This is only needed for proxy's tests.
 # TODO: we should probably fork `tokio-postgres-rustls` instead.
-tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
+tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
 
 ################# Binary contents sections
diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index cb99dc0a55..098cd4c4e9 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -18,7 +18,7 @@ use utils::{
 use crate::reltag::RelTag;
 
 use anyhow::bail;
-use bytes::{BufMut, Bytes, BytesMut};
+use bytes::{Buf, BufMut, Bytes, BytesMut};
 
 /// The state of a tenant in this pageserver.
 ///
@@ -767,6 +767,36 @@ impl PagestreamBeMessage {
 
         bytes.into()
     }
+
+    pub fn deserialize(buf: Bytes) -> anyhow::Result<Self> {
+        let mut buf = buf.reader();
+        let msg_tag = buf.read_u8()?;
+        match msg_tag {
+            100 => todo!(),
+            101 => todo!(),
+            102 => {
+                let buf = buf.get_ref();
+                /* TODO use constant */
+                if buf.len() == 8192 {
+                    Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
+                        page: buf.clone(),
+                    }))
+                } else {
+                    anyhow::bail!("invalid page size: {}", buf.len());
+                }
+            }
+            103 => {
+                let buf = buf.get_ref();
+                let cstr = std::ffi::CStr::from_bytes_until_nul(buf)?;
+                let rust_str = cstr.to_str()?;
+                Ok(PagestreamBeMessage::Error(PagestreamErrorResponse {
+                    message: rust_str.to_owned(),
+                }))
+            }
+            104 => todo!(),
+            _ => bail!("unknown tag: {:?}", msg_tag),
+        }
+    }
 }
 
 #[cfg(test)]
diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml
index 3eb01003df..2f2336a578 100644
--- a/pageserver/Cargo.toml
+++ b/pageserver/Cargo.toml
@@ -82,6 +82,8 @@ enum-map.workspace = true
 enumset.workspace = true
 strum.workspace = true
 strum_macros.workspace = true
+tokio-stream.workspace = true
+tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
 
 [dev-dependencies]
 criterion.workspace = true
diff --git a/pageserver/src/bin/getpage_bench_libpq.rs b/pageserver/src/bin/getpage_bench_libpq.rs
new file mode 100644
index 0000000000..e36de0c485
--- /dev/null
+++ b/pageserver/src/bin/getpage_bench_libpq.rs
@@ -0,0 +1,382 @@
+use anyhow::Context;
+use clap::Parser;
+
+use hyper::client::HttpConnector;
+use hyper::{Client, Uri};
+
+use pageserver::pgdatadir_mapping::{is_rel_block_key, key_to_rel_block};
+use pageserver::repository;
+
+use pageserver_api::reltag::RelTag;
+use rand::prelude::*;
+use tokio::sync::Barrier;
+use tracing::info;
+use utils::logging;
+
+use std::future::Future;
+use std::str::FromStr;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::Arc;
+
+use tokio::task::JoinHandle;
+
+use utils::lsn::Lsn;
+
+struct Key(repository::Key);
+
+impl std::str::FromStr for Key {
+    type Err = anyhow::Error;
+
+    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
+        repository::Key::from_hex(s).map(Key)
+    }
+}
+
+struct KeyRange {
+    start: i128,
+    end: i128,
+}
+
+impl KeyRange {
+    fn len(&self) -> i128 {
+        self.end - self.start
+    }
+}
+
+struct RelTagBlockNo {
+    rel_tag: RelTag,
+    block_no: u32,
+}
+
+#[derive(clap::Parser)]
+struct Args {
+    #[clap(long, default_value = "http://localhost:9898")]
+    mgmt_api_endpoint: String,
+    #[clap(long, default_value = "postgres://postgres@localhost:64000")]
+    page_service_connstring: String,
+    // tenant_id: String,
+    // timeline_id: String,
+    #[clap(long)]
+    num_tasks: usize,
+    #[clap(long)]
+    num_requests: usize,
+    #[clap(long)]
+    pick_n_tenants: Option<usize>,
+    tenants: Option<Vec<String>>,
+}
+
+#[derive(Debug, Default)]
+struct LiveStats {
+    completed_requests: AtomicU64,
+}
+
+impl LiveStats {
+    fn inc(&self) {
+        self.completed_requests.fetch_add(1, Ordering::Relaxed);
+    }
+}
+
+#[tokio::main]
+async fn main() {
+    logging::init(
+        logging::LogFormat::Plain,
+        logging::TracingErrorLayerEnablement::Disabled,
+        logging::Output::Stderr,
+    )
+    .unwrap();
+
+    let args: &'static Args = Box::leak(Box::new(Args::parse()));
+
+    let client = Client::new();
+
+    let tenants = if let Some(tenants) = &args.tenants {
+        tenants.clone()
+    } else {
+        let resp = client
+            .get(Uri::try_from(&format!("{}/v1/tenant", args.mgmt_api_endpoint)).unwrap())
+            .await
+            .unwrap();
+
+        let body = hyper::body::to_bytes(resp).await.unwrap();
+        let tenants: serde_json::Value = serde_json::from_slice(&body).unwrap();
+        let mut out = Vec::new();
+        for t in tenants.as_array().unwrap() {
+            if let Some(limit) = args.pick_n_tenants {
+                if out.len() >= limit {
+                    break;
+                }
+            }
+            out.push(t.get("id").unwrap().as_str().unwrap().to_owned());
+        }
+        if let Some(limit) = args.pick_n_tenants {
+            assert_eq!(out.len(), limit);
+        }
+        out
+    };
+
+    let mut tenant_timelines = Vec::new();
+    for tenant_id in tenants {
+        let resp = client
+            .get(
+                Uri::try_from(&format!(
+                    "{}/v1/tenant/{}/timeline",
+                    args.mgmt_api_endpoint, tenant_id
+                ))
+                .unwrap(),
+            )
+            .await
+            .unwrap();
+
+        let body = hyper::body::to_bytes(resp).await.unwrap();
+        let timelines: serde_json::Value = serde_json::from_slice(&body).unwrap();
+        for t in timelines.as_array().unwrap() {
+            let timeline_id = t.get("timeline_id").unwrap().as_str().unwrap().to_owned();
+            tenant_timelines.push((tenant_id.clone(), timeline_id));
+        }
+    }
+    info!("tenant_timelines:\n{:?}", tenant_timelines);
+
+    let stats = Arc::new(LiveStats::default());
+
+    let num_work_tasks = tenant_timelines.len() * args.num_tasks;
+
+    let start_work_barrier = Arc::new(tokio::sync::Barrier::new(num_work_tasks + 1));
+
+    tokio::spawn({
+        let stats = Arc::clone(&stats);
+        let start_work_barrier = Arc::clone(&start_work_barrier);
+        async move {
+            start_work_barrier.wait().await;
+            loop {
+                let start = std::time::Instant::now();
+                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+                let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
+                let elapsed = start.elapsed();
+                info!(
+                    "RPS: {:.0}",
+                    completed_requests as f64 / elapsed.as_secs_f64()
+                );
+            }
+        }
+    });
+
+    let mut tasks = Vec::new();
+    for (tenant_id, timeline_id) in tenant_timelines {
+        let stats = Arc::clone(&stats);
+        let t = tokio::spawn(timeline(
+            args,
+            client.clone(),
+            tenant_id,
+            timeline_id,
+            Arc::clone(&start_work_barrier),
+            stats,
+        ));
+        tasks.push(t);
+    }
+
+    for t in tasks {
+        t.await.unwrap();
+    }
+}
+
+fn timeline(
+    args: &'static Args,
+    http_client: Client<HttpConnector>,
+    tenant_id: String,
+    timeline_id: String,
+    start_work_barrier: Arc<Barrier>,
+    stats: Arc<LiveStats>,
+) -> impl Future<Output = ()> + Send + Sync {
+    async move {
+        let resp = http_client
+            .get(
+                Uri::try_from(&format!(
+                    "{}/v1/tenant/{}/timeline/{}/keyspace",
+                    args.mgmt_api_endpoint, tenant_id, timeline_id
+                ))
+                .unwrap(),
+            )
+            .await
+            .unwrap();
+        if !resp.status().is_success() {
+            panic!("Failed to get keyspace: {resp:?}");
+        }
+        let body = hyper::body::to_bytes(resp).await.unwrap();
+        let keyspace: serde_json::Value = serde_json::from_slice(&body).unwrap();
+        let lsn: Lsn = keyspace["at_lsn"].as_str().unwrap().parse().unwrap();
+
+        let ranges = keyspace["keys"]
+            .as_array()
+            .unwrap()
+            .iter()
+            .filter_map(|r| {
+                let r = r.as_array().unwrap();
+                assert_eq!(r.len(), 2);
+                let start = Key::from_str(r[0].as_str().unwrap()).unwrap();
+                let end = Key::from_str(r[1].as_str().unwrap()).unwrap();
+                // filter out non-relblock keys
+                match (is_rel_block_key(start.0), is_rel_block_key(end.0)) {
+                    (true, true) => Some(KeyRange {
+                        start: start.0.to_i128(),
+                        end: end.0.to_i128(),
+                    }),
+                    (true, false) | (false, true) => {
+                        unimplemented!("split up range")
+                    }
+                    (false, false) => None,
+                }
+            })
+            .collect::<Vec<_>>();
+
+        // weighted ranges
+        let weights = ranges.iter().map(|r| r.len()).collect::<Vec<_>>();
+
+        let ranges = Arc::new(ranges);
+        let weights = Arc::new(weights);
+
+        let mut tasks = Vec::<JoinHandle<()>>::new();
+
+        for _i in 0..args.num_tasks {
+            let ranges = ranges.clone();
+            let _weights = weights.clone();
+            let _client = http_client.clone();
+            let tenant_id = tenant_id.clone();
+            let timeline_id = timeline_id.clone();
+            let start_work_barrier = Arc::clone(&start_work_barrier);
+            let task = tokio::spawn({
+                let stats = Arc::clone(&stats);
+                async move {
+                    let mut client = getpage_client::Client::new(
+                        args.page_service_connstring.clone(),
+                        tenant_id.clone(),
+                        timeline_id.clone(),
+                    )
+                    .await
+                    .unwrap();
+                    start_work_barrier.wait().await;
+                    for _i in 0..args.num_requests {
+                        let key = {
+                            let mut rng = rand::thread_rng();
+                            let r = ranges.choose_weighted(&mut rng, |r| r.len()).unwrap();
+                            let key: i128 = rng.gen_range(r.start..r.end);
+                            let key = repository::Key::from_i128(key);
+                            // XXX filter these out when we iterate the keyspace
+                            assert!(
+                                is_rel_block_key(key),
+                                "we filter non-relblock keys out above"
+                            );
+                            let (rel_tag, block_no) =
+                                key_to_rel_block(key).expect("we just checked");
+                            RelTagBlockNo { rel_tag, block_no }
+                        };
+                        client
+                            .getpage(key, lsn)
+                            .await
+                            .with_context(|| {
+                                format!("getpage for tenant {} timeline {}", tenant_id, timeline_id)
+                            })
+                            .unwrap();
+                        stats.inc();
+                    }
+                    client.shutdown().await;
+                }
+            });
+            tasks.push(task);
+        }
+
+        for task in tasks {
+            task.await.unwrap();
+        }
+    }
+}
+
+mod getpage_client {
+    use std::pin::Pin;
+
+    use futures::SinkExt;
+    use pageserver_api::models::{
+        PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
+        PagestreamGetPageResponse,
+    };
+    use tokio::task::JoinHandle;
+    use tokio_stream::StreamExt;
+    use tokio_util::sync::CancellationToken;
+    use utils::lsn::Lsn;
+
+    use crate::RelTagBlockNo;
+
+    pub(crate) struct Client {
+        copy_both: Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>,
+        cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
+        conn_task: JoinHandle<()>,
+    }
+
+    impl Client {
+        pub async fn new(
+            connstring: String,
+            tenant_id: String,
+            timeline_id: String,
+        ) -> anyhow::Result<Self> {
+            let (client, connection) =
+                tokio_postgres::connect(&connstring, postgres::NoTls).await?;
+
+            let conn_task_cancel = CancellationToken::new();
+            let conn_task = tokio::spawn({
+                let conn_task_cancel = conn_task_cancel.clone();
+                async move {
+                    tokio::select! {
+                        _ = conn_task_cancel.cancelled() => { }
+                        res = connection => {
+                            res.unwrap();
+                        }
+                    }
+                }
+            });
+
+            let copy_both: tokio_postgres::CopyBothDuplex<bytes::Bytes> = client
+                .copy_both_simple(&format!("pagestream {tenant_id} {timeline_id}"))
+                .await?;
+
+            Ok(Self {
+                copy_both: Box::pin(copy_both),
+                conn_task,
+                cancel_on_client_drop: Some(conn_task_cancel.drop_guard()),
+            })
+        }
+
+        pub async fn shutdown(mut self) {
+            let _ = self.cancel_on_client_drop.take();
+            self.conn_task.await.unwrap();
+        }
+
+        pub async fn getpage(
+            &mut self,
+            key: RelTagBlockNo,
+            lsn: Lsn,
+        ) -> anyhow::Result<PagestreamGetPageResponse> {
+            let req = PagestreamGetPageRequest {
+                latest: false,
+                rel: key.rel_tag,
+                blkno: key.block_no,
+                lsn,
+            };
+            let req = PagestreamFeMessage::GetPage(req);
+            let req: bytes::Bytes = req.serialize();
+            // let mut req = tokio_util::io::ReaderStream::new(&req);
+            let mut req = tokio_stream::once(Ok(req));
+
+            self.copy_both.send_all(&mut req).await?;
+
+            let next: Option<Result<bytes::Bytes, tokio_postgres::Error>> =
+                self.copy_both.next().await;
+            let next = next.unwrap().unwrap();
+
+            match PagestreamBeMessage::deserialize(next)? {
+                PagestreamBeMessage::Exists(_) => todo!(),
+                PagestreamBeMessage::Nblocks(_) => todo!(),
+                PagestreamBeMessage::GetPage(p) => Ok(p),
+                PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
+                PagestreamBeMessage::DbSize(_) => todo!(),
+            }
+        }
+    }
+}
diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index aa4d155bcc..49225f30db 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -1700,6 +1700,7 @@ const AUX_FILES_KEY: Key = Key {
 
 // Reverse mappings for a few Keys.
 // These are needed by WAL redo manager.
+/// Guaranteed to return `Ok()` if [`is_rel_block_key`] returns `true` for `key`.
 pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
     Ok(match key.field1 {
         0x00 => (
@@ -1715,7 +1716,8 @@ pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
     })
 }
 
-fn is_rel_block_key(key: Key) -> bool {
+/// See [`key_to_rel_block`].
+pub fn is_rel_block_key(key: Key) -> bool {
     key.field1 == 0x00 && key.field4 != 0
 }
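
For reference, a minimal round-trip sketch (not part of the diff) that exercises the new PagestreamBeMessage::deserialize against the existing serializer in libs/pageserver_api/src/models.rs. It assumes the existing `PagestreamBeMessage::serialize(&self) -> Bytes` writes the same tag byte (102 for GetPage) and raw page payload that `deserialize` reads back, and that the `page` field of `PagestreamGetPageResponse` is constructible from the crate's test module; verify both against the actual code before relying on it. It could live in the `#[cfg(test)]` module that the hunk above touches.

// Round-trip sketch for the new decoder; illustrative only.
#[test]
fn pagestream_getpage_response_roundtrip() {
    use bytes::Bytes;

    // 8192 matches the hard-coded page-size check in `deserialize` (the "TODO use constant").
    let page = Bytes::from(vec![0u8; 8192]);
    let msg = PagestreamBeMessage::GetPage(PagestreamGetPageResponse { page: page.clone() });

    // Assumption: serialize() emits tag 102 followed by the page image.
    let wire = msg.serialize();
    match PagestreamBeMessage::deserialize(wire).expect("deserialize should succeed") {
        PagestreamBeMessage::GetPage(resp) => assert_eq!(resp.page, page),
        _ => panic!("expected a GetPage response"),
    }
}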