From 3f85246a42c495bc8271b2c6fcd07359eb01f6de Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Sat, 14 Sep 2024 12:06:57 +0000 Subject: [PATCH] 4096 queuedepth in pagebench get-page-latest-lsn --- pageserver/client/src/page_service.rs | 78 +++++++++++--- .../pagebench/src/cmd/getpage_latest_lsn.rs | 100 ++++++++---------- 2 files changed, 108 insertions(+), 70 deletions(-) diff --git a/pageserver/client/src/page_service.rs b/pageserver/client/src/page_service.rs index 9701ca6ff9..79e943b54c 100644 --- a/pageserver/client/src/page_service.rs +++ b/pageserver/client/src/page_service.rs @@ -1,6 +1,6 @@ -use std::pin::Pin; +use std::{pin::Pin, sync::Arc}; -use futures::SinkExt; +use futures::{SinkExt, StreamExt}; use pageserver_api::{ models::{ PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest, @@ -10,7 +10,6 @@ use pageserver_api::{ }; use tokio::task::JoinHandle; use tokio_postgres::CopyOutStream; -use tokio_stream::StreamExt; use tokio_util::sync::CancellationToken; use utils::{ id::{TenantId, TimelineId}, @@ -136,23 +135,68 @@ impl PagestreamClient { drop(copy_both); } - pub async fn getpage( - &mut self, - req: PagestreamGetPageRequest, - ) -> anyhow::Result { + pub fn split(self) -> (PagestreamTx, PagestreamRx) { + let Self { + copy_both, + cancel_on_client_drop, + conn_task, + } = self; + let keep_client_alive = KeepClientAlive { + client: conn_task, + cancel_on_client_drop: cancel_on_client_drop.unwrap(), + }; + let keep_client_alive = Arc::new(keep_client_alive); + let (sink, stream): ( + futures::stream::SplitSink< + Pin>>, + bytes::Bytes, + >, + futures::stream::SplitStream>>>, + ) = copy_both.split(); + ( + PagestreamTx { + sink, + keep_client_alive: keep_client_alive.clone(), + }, + PagestreamRx { + stream, + keep_client_alive, + }, + ) + } +} + +struct KeepClientAlive { + client: JoinHandle<()>, + cancel_on_client_drop: tokio_util::sync::DropGuard, +} + +pub struct PagestreamTx { + sink: futures::stream::SplitSink< + Pin>>, + bytes::Bytes, + >, + keep_client_alive: Arc, +} + +pub struct PagestreamRx { + stream: futures::stream::SplitStream>>>, + keep_client_alive: Arc, +} + +impl PagestreamTx { + pub async fn send_getpage(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> { let req = PagestreamFeMessage::GetPage(req); let req: bytes::Bytes = req.serialize(); + let mut req = tokio_stream::once(Ok(req.clone())); + self.sink.send_all(&mut req).await?; + Ok(()) + } +} - for i in 0..10 { - let mut req = tokio_stream::once(Ok(req.clone())); - self.copy_both.send_all(&mut req).await?; - } - - for i in 0..9 { - let next: Option> = self.copy_both.next().await; - let next: bytes::Bytes = next.unwrap()?; - } - let next: Option> = self.copy_both.next().await; +impl PagestreamRx { + pub async fn recv_getpage(&mut self) -> anyhow::Result { + let next: Option> = self.stream.next().await; let next: bytes::Bytes = next.unwrap()?; let msg = PagestreamBeMessage::deserialize(next)?; diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index ac4a732377..6796d539a6 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -13,7 +13,7 @@ use rand::prelude::*; use tokio::task::JoinSet; use tracing::info; -use std::collections::HashSet; +use std::collections::{HashSet, VecDeque}; use std::future::Future; use std::num::NonZeroUsize; use std::pin::Pin; @@ -295,64 +295,58 @@ async fn main_impl( .await .unwrap(); + let (mut pagestream_tx, mut pagestream_rx) = client.split(); + start_work_barrier.wait().await; let client_start = Instant::now(); let mut ticks_processed = 0; - while !cancel.is_cancelled() { - // Detect if a request took longer than the RPS rate - if let Some(period) = &rps_period { - let periods_passed_until_now = - usize::try_from(client_start.elapsed().as_micros() / period.as_micros()) + let (rq_tx, mut rq_rx) = tokio::sync::mpsc::channel(4096); + let sender = tokio::spawn(async move { + while !cancel.is_cancelled() { + let start = Instant::now(); + let req = { + let mut rng = rand::thread_rng(); + let r = &ranges[weights.sample(&mut rng)]; + let key: i128 = rng.gen_range(r.start..r.end); + let key = Key::from_i128(key); + assert!(key.is_rel_block_key()); + let (rel_tag, block_no) = key + .to_rel_block() + .expect("we filter non-rel-block keys out above"); + PagestreamGetPageRequest { + request_lsn: if rng.gen_bool(args.req_latest_probability) { + Lsn::MAX + } else { + r.timeline_lsn + }, + not_modified_since: r.timeline_lsn, + rel: rel_tag, + blkno: block_no, + } + }; + pagestream_tx.send_getpage(req).await.unwrap(); + rq_tx.send(start).await.unwrap(); + } + }); + + let receiver = tokio::spawn(async move { + while let Some(start) = rq_rx.recv().await { + let response = pagestream_rx.recv_getpage().await.unwrap(); + let end = Instant::now(); + live_stats.request_done(); + STATS.with(|stats| { + stats + .borrow() + .lock() + .unwrap() + .observe(end.duration_since(start)) .unwrap(); - - if periods_passed_until_now > ticks_processed { - live_stats.missed((periods_passed_until_now - ticks_processed) as u64); - } - ticks_processed = periods_passed_until_now; + }); } + }); - let start = Instant::now(); - let req = { - let mut rng = rand::thread_rng(); - let r = &ranges[weights.sample(&mut rng)]; - let key: i128 = rng.gen_range(r.start..r.end); - let key = Key::from_i128(key); - assert!(key.is_rel_block_key()); - let (rel_tag, block_no) = key - .to_rel_block() - .expect("we filter non-rel-block keys out above"); - PagestreamGetPageRequest { - request_lsn: if rng.gen_bool(args.req_latest_probability) { - Lsn::MAX - } else { - r.timeline_lsn - }, - not_modified_since: r.timeline_lsn, - rel: rel_tag, - blkno: block_no, - } - }; - client.getpage(req).await.unwrap(); - let end = Instant::now(); - live_stats.request_done(); - ticks_processed += 1; - STATS.with(|stats| { - stats - .borrow() - .lock() - .unwrap() - .observe(end.duration_since(start)) - .unwrap(); - }); - - if let Some(period) = &rps_period { - let next_at = client_start - + Duration::from_micros( - (ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(), - ); - tokio::time::sleep_until(next_at.into()).await; - } - } + sender.await.unwrap(); + receiver.await.unwrap(); }) };