diff --git a/Cargo.lock b/Cargo.lock index 542a4528d3..588a63b6a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4237,6 +4237,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "bytes", "camino", "clap", "futures", diff --git a/pageserver/pagebench/Cargo.toml b/pageserver/pagebench/Cargo.toml index ceb1278eab..5e4af88e69 100644 --- a/pageserver/pagebench/Cargo.toml +++ b/pageserver/pagebench/Cargo.toml @@ -9,6 +9,7 @@ license.workspace = true [dependencies] anyhow.workspace = true async-trait.workspace = true +bytes.workspace = true camino.workspace = true clap.workspace = true futures.workspace = true diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index 395e9cac41..3f3b6e396e 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -1,4 +1,4 @@ -use std::collections::{HashSet, VecDeque}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::future::Future; use std::num::NonZeroUsize; use std::pin::Pin; @@ -8,12 +8,12 @@ use std::time::{Duration, Instant}; use anyhow::Context; use async_trait::async_trait; +use bytes::Bytes; use camino::Utf8PathBuf; use pageserver_api::key::Key; use pageserver_api::keyspace::KeySpaceAccum; -use pageserver_api::models::{ - PagestreamGetPageRequest, PagestreamGetPageResponse, PagestreamRequest, -}; +use pageserver_api::models::{PagestreamGetPageRequest, PagestreamRequest}; +use pageserver_api::reltag::RelTag; use pageserver_api::shard::TenantShardId; use pageserver_page_api::proto; use rand::prelude::*; @@ -77,6 +77,16 @@ pub(crate) struct Args { #[clap(long, default_value = "1")] queue_depth: NonZeroUsize, + /// Batch size of contiguous pages generated by each client. This is equivalent to how Postgres + /// will request page batches (e.g. prefetches or vectored reads). A batch counts as 1 RPS and + /// 1 queue depth. + /// + /// The libpq protocol does not support client-side batching, and will submit batches as many + /// individual requests, in the hope that the server will batch them. Each batch still counts as + /// 1 RPS and 1 queue depth. + #[clap(long, default_value = "1")] + batch_size: NonZeroUsize, + #[clap(long)] only_relnode: Option, @@ -392,7 +402,16 @@ async fn run_worker( shared_state.start_work_barrier.wait().await; let client_start = Instant::now(); let mut ticks_processed = 0; - let mut inflight = VecDeque::new(); + let mut req_id = 0; + let batch_size: usize = args.batch_size.into(); + + // Track inflight requests by request ID and start time. This times the request duration, and + // ensures responses match requests. We don't expect responses back in any particular order. + // + // NB: this does not check that all requests received a response, because we don't wait for the + // inflight requests to complete when the duration elapses. + let mut inflight: HashMap = HashMap::new(); + while !cancel.is_cancelled() { // Detect if a request took longer than the RPS rate if let Some(period) = &rps_period { @@ -408,36 +427,72 @@ async fn run_worker( } while inflight.len() < args.queue_depth.get() { + req_id += 1; let start = Instant::now(); - let req = { + let (req_lsn, mod_lsn, rel, blks) = { + /// Converts a compact i128 key to a relation tag and block number. + fn key_to_block(key: i128) -> (RelTag, u32) { + let key = Key::from_i128(key); + assert!(key.is_rel_block_key()); + key.to_rel_block() + .expect("we filter non-rel-block keys out above") + } + + // Pick a random page from a random relation. let mut rng = rand::thread_rng(); let r = &ranges[weights.sample(&mut rng)]; let key: i128 = rng.gen_range(r.start..r.end); - let key = Key::from_i128(key); - assert!(key.is_rel_block_key()); - let (rel_tag, block_no) = key - .to_rel_block() - .expect("we filter non-rel-block keys out above"); - PagestreamGetPageRequest { - hdr: PagestreamRequest { - reqid: 0, - request_lsn: if rng.gen_bool(args.req_latest_probability) { - Lsn::MAX - } else { - r.timeline_lsn - }, - not_modified_since: r.timeline_lsn, - }, - rel: rel_tag, - blkno: block_no, + let (rel_tag, block_no) = key_to_block(key); + + let mut blks = VecDeque::with_capacity(batch_size); + blks.push_back(block_no); + + // If requested, populate a batch of sequential pages. This is how Postgres will + // request page batches (e.g. prefetches). If we hit the end of the relation, we + // grow the batch towards the start too. + for i in 1..batch_size { + let (r, b) = key_to_block(key + i as i128); + if r != rel_tag { + break; // went outside relation + } + blks.push_back(b) } + + if blks.len() < batch_size { + // Grow batch backwards if needed. + for i in 1..batch_size { + let (r, b) = key_to_block(key - i as i128); + if r != rel_tag { + break; // went outside relation + } + blks.push_front(b) + } + } + + // We assume that the entire batch can fit within the relation. + assert_eq!(blks.len(), batch_size, "incomplete batch"); + + let req_lsn = if rng.gen_bool(args.req_latest_probability) { + Lsn::MAX + } else { + r.timeline_lsn + }; + (req_lsn, r.timeline_lsn, rel_tag, blks.into()) }; - client.send_get_page(req).await.unwrap(); - inflight.push_back(start); + client + .send_get_page(req_id, req_lsn, mod_lsn, rel, blks) + .await + .unwrap(); + let old = inflight.insert(req_id, start); + assert!(old.is_none(), "duplicate request ID {req_id}"); } - let start = inflight.pop_front().unwrap(); - client.recv_get_page().await.unwrap(); + let (req_id, pages) = client.recv_get_page().await.unwrap(); + assert_eq!(pages.len(), batch_size, "unexpected page count"); + assert!(pages.iter().all(|p| !p.is_empty()), "empty page"); + let start = inflight + .remove(&req_id) + .expect("response for unknown request ID"); let end = Instant::now(); shared_state.live_stats.request_done(); ticks_processed += 1; @@ -467,15 +522,24 @@ async fn run_worker( #[async_trait] trait Client: Send { /// Sends an asynchronous GetPage request to the pageserver. - async fn send_get_page(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()>; + async fn send_get_page( + &mut self, + req_id: u64, + req_lsn: Lsn, + mod_lsn: Lsn, + rel: RelTag, + blks: Vec, + ) -> anyhow::Result<()>; /// Receives the next GetPage response from the pageserver. - async fn recv_get_page(&mut self) -> anyhow::Result; + async fn recv_get_page(&mut self) -> anyhow::Result<(u64, Vec)>; } /// A libpq-based Pageserver client. struct LibpqClient { inner: pageserver_client::page_service::PagestreamClient, + // Track sent batches, so we know how many responses to expect. + batch_sizes: VecDeque, } impl LibpqClient { @@ -484,18 +548,55 @@ impl LibpqClient { .await? .pagestream(ttid.tenant_id, ttid.timeline_id) .await?; - Ok(Self { inner }) + Ok(Self { + inner, + batch_sizes: VecDeque::new(), + }) } } #[async_trait] impl Client for LibpqClient { - async fn send_get_page(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> { - self.inner.getpage_send(req).await + async fn send_get_page( + &mut self, + req_id: u64, + req_lsn: Lsn, + mod_lsn: Lsn, + rel: RelTag, + blks: Vec, + ) -> anyhow::Result<()> { + // libpq doesn't support client-side batches, so we send a bunch of individual requests + // instead in the hope that the server will batch them for us. We use the same request ID + // for all, because we'll return a single batch response. + self.batch_sizes.push_back(blks.len()); + for blkno in blks { + let req = PagestreamGetPageRequest { + hdr: PagestreamRequest { + reqid: req_id, + request_lsn: req_lsn, + not_modified_since: mod_lsn, + }, + rel, + blkno, + }; + self.inner.getpage_send(req).await?; + } + Ok(()) } - async fn recv_get_page(&mut self) -> anyhow::Result { - self.inner.getpage_recv().await + async fn recv_get_page(&mut self) -> anyhow::Result<(u64, Vec)> { + let batch_size = self.batch_sizes.pop_front().unwrap(); + let mut batch = Vec::with_capacity(batch_size); + let mut req_id = None; + for _ in 0..batch_size { + let resp = self.inner.getpage_recv().await?; + if req_id.is_none() { + req_id = Some(resp.req.hdr.reqid); + } + assert_eq!(req_id, Some(resp.req.hdr.reqid), "request ID mismatch"); + batch.push(resp.page); + } + Ok((req_id.unwrap(), batch)) } } @@ -532,31 +633,35 @@ impl GrpcClient { #[async_trait] impl Client for GrpcClient { - async fn send_get_page(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> { + async fn send_get_page( + &mut self, + req_id: u64, + req_lsn: Lsn, + mod_lsn: Lsn, + rel: RelTag, + blks: Vec, + ) -> anyhow::Result<()> { let req = proto::GetPageRequest { - request_id: 0, + request_id: req_id, request_class: proto::GetPageClass::Normal as i32, read_lsn: Some(proto::ReadLsn { - request_lsn: req.hdr.request_lsn.0, - not_modified_since_lsn: req.hdr.not_modified_since.0, + request_lsn: req_lsn.0, + not_modified_since_lsn: mod_lsn.0, }), - rel: Some(req.rel.into()), - block_number: vec![req.blkno], + rel: Some(rel.into()), + block_number: blks, }; self.req_tx.send(req).await?; Ok(()) } - async fn recv_get_page(&mut self) -> anyhow::Result { + async fn recv_get_page(&mut self) -> anyhow::Result<(u64, Vec)> { let resp = self.resp_rx.message().await?.unwrap(); anyhow::ensure!( resp.status_code == proto::GetPageStatusCode::Ok as i32, "unexpected status code: {}", resp.status_code ); - Ok(PagestreamGetPageResponse { - page: resp.page_image[0].clone(), - req: PagestreamGetPageRequest::default(), // dummy - }) + Ok((resp.request_id, resp.page_image)) } }