4096 queuedepth in pagebench get-page-latest-lsn

This commit is contained in:
Christian Schwarz
2024-09-14 12:06:57 +00:00
parent 709a6ad29b
commit 3f85246a42
2 changed files with 108 additions and 70 deletions

View File

@@ -1,6 +1,6 @@
use std::pin::Pin;
use std::{pin::Pin, sync::Arc};
use futures::SinkExt;
use futures::{SinkExt, StreamExt};
use pageserver_api::{
models::{
PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
@@ -10,7 +10,6 @@ use pageserver_api::{
};
use tokio::task::JoinHandle;
use tokio_postgres::CopyOutStream;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use utils::{
id::{TenantId, TimelineId},
@@ -136,23 +135,68 @@ impl PagestreamClient {
drop(copy_both);
}
pub async fn getpage(
&mut self,
req: PagestreamGetPageRequest,
) -> anyhow::Result<PagestreamGetPageResponse> {
pub fn split(self) -> (PagestreamTx, PagestreamRx) {
let Self {
copy_both,
cancel_on_client_drop,
conn_task,
} = self;
let keep_client_alive = KeepClientAlive {
client: conn_task,
cancel_on_client_drop: cancel_on_client_drop.unwrap(),
};
let keep_client_alive = Arc::new(keep_client_alive);
let (sink, stream): (
futures::stream::SplitSink<
Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>,
bytes::Bytes,
>,
futures::stream::SplitStream<Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>>,
) = copy_both.split();
(
PagestreamTx {
sink,
keep_client_alive: keep_client_alive.clone(),
},
PagestreamRx {
stream,
keep_client_alive,
},
)
}
}
struct KeepClientAlive {
client: JoinHandle<()>,
cancel_on_client_drop: tokio_util::sync::DropGuard,
}
pub struct PagestreamTx {
sink: futures::stream::SplitSink<
Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>,
bytes::Bytes,
>,
keep_client_alive: Arc<KeepClientAlive>,
}
pub struct PagestreamRx {
stream: futures::stream::SplitStream<Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>>,
keep_client_alive: Arc<KeepClientAlive>,
}
impl PagestreamTx {
pub async fn send_getpage(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
let req = PagestreamFeMessage::GetPage(req);
let req: bytes::Bytes = req.serialize();
let mut req = tokio_stream::once(Ok(req.clone()));
self.sink.send_all(&mut req).await?;
Ok(())
}
}
for i in 0..10 {
let mut req = tokio_stream::once(Ok(req.clone()));
self.copy_both.send_all(&mut req).await?;
}
for i in 0..9 {
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
let next: bytes::Bytes = next.unwrap()?;
}
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
impl PagestreamRx {
pub async fn recv_getpage(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
let next: Option<Result<bytes::Bytes, _>> = self.stream.next().await;
let next: bytes::Bytes = next.unwrap()?;
let msg = PagestreamBeMessage::deserialize(next)?;