diff --git a/pageserver/src/bin/getpage_bench_libpq.rs b/pageserver/src/bin/getpage_bench_libpq.rs index 4a8900054a..a26e81c91b 100644 --- a/pageserver/src/bin/getpage_bench_libpq.rs +++ b/pageserver/src/bin/getpage_bench_libpq.rs @@ -56,6 +56,8 @@ struct RelTagBlockNo { struct Args { #[clap(long, default_value = "http://localhost:9898")] ps_endpoint: String, + #[clap(long, default_value = "postgres://postgres@localhost:64000")] + pq_client_connstring: String, // tenant_id: String, // timeline_id: String, num_tasks: usize, @@ -246,10 +248,13 @@ fn timeline( let task = tokio::spawn({ let stats = Arc::clone(&stats); async move { - let mut client = - getpage_client::Client::new(tenant_id.clone(), timeline_id.clone()) - .await - .unwrap(); + let mut client = getpage_client::Client::new( + args.pq_client_connstring.clone(), + tenant_id.clone(), + timeline_id.clone(), + ) + .await + .unwrap(); for i in 0..args.num_requests { match args.mode { Mode::GetPage => { @@ -319,12 +324,13 @@ mod getpage_client { impl Client { pub fn new( + connstring: String, tenant_id: String, timeline_id: String, ) -> impl std::future::Future> + Send { async move { let (client, connection) = - tokio_postgres::connect("host=localhost port=64000", postgres::NoTls).await?; + tokio_postgres::connect(&connstring, postgres::NoTls).await?; let conn_task_cancel = CancellationToken::new(); let conn_task = tokio::spawn({ diff --git a/pageserver/src/bin/noop_server.rs b/pageserver/src/bin/noop_server.rs new file mode 100644 index 0000000000..87d393f361 --- /dev/null +++ b/pageserver/src/bin/noop_server.rs @@ -0,0 +1,109 @@ +use anyhow::Context; +use bytes::Buf; +use clap::Parser; +use pageserver_api::models::{PagestreamBeMessage, PagestreamErrorResponse, PagestreamFeMessage}; +use postgres_backend::{AuthType, PostgresBackend, QueryError}; +use pq_proto::{BeMessage, FeMessage}; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_util::sync::CancellationToken; + +#[derive(clap::Parser)] +struct Args { + bind: String, +} + +#[tokio::main] +async fn main() { + let args = Args::parse(); + + let listener = tokio::net::TcpListener::bind(&args.bind).await.unwrap(); + loop { + let (socket, _) = listener.accept().await.unwrap(); + tokio::spawn(async move { + handle_connection(socket).await.unwrap(); + }); + } +} + +async fn handle_connection(socket: tokio::net::TcpStream) -> anyhow::Result<()> { + socket + .set_nodelay(true) + .context("could not set TCP_NODELAY")?; + + let peer_addr = socket.peer_addr().context("get peer address")?; + let socket = tokio_io_timeout::TimeoutReader::new(socket); + tokio::pin!(socket); + let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, AuthType::Trust, None)?; + let mut conn_handler = NoOpHandler; + let cancel = CancellationToken::new(); + pgbackend + .run(&mut conn_handler, || { + let cancel = cancel.clone(); + async move { cancel.cancelled().await } + }) + .await?; + anyhow::Ok(()) +} + +struct NoOpHandler; + +#[async_trait::async_trait] +impl postgres_backend::Handler for NoOpHandler +where + IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, +{ + fn startup( + &mut self, + _pgb: &mut PostgresBackend, + _sm: &pq_proto::FeStartupPacket, + ) -> Result<(), QueryError> { + Ok(()) + } + + async fn process_query( + &mut self, + pgb: &mut PostgresBackend, + query_string: &str, + ) -> Result<(), QueryError> { + if !query_string.starts_with("pagestream ") { + return Err(QueryError::Other(anyhow::anyhow!("not a pagestream query"))); + } + + // switch client to COPYBOTH + pgb.write_message_noflush(&BeMessage::CopyBothResponse)?; + pgb.flush().await?; + + loop { + let msg = pgb.read_message().await?; + + let copy_data_bytes = match msg { + Some(FeMessage::CopyData(bytes)) => bytes, + Some(FeMessage::Terminate) => return Ok(()), + Some(m) => { + return Err(QueryError::Other(anyhow::anyhow!( + "unexpected message: {m:?} during COPY" + ))); + } + None => return Ok(()), // client disconnected + }; + + let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?; + + let response = match neon_fe_msg { + PagestreamFeMessage::NoOp => Ok(PagestreamBeMessage::NoOp), + x => Err(QueryError::Other(anyhow::anyhow!( + "this server only supports no-op: {x:?}" + ))), + }; + + let response = response.unwrap_or_else(|e| { + PagestreamBeMessage::Error(PagestreamErrorResponse { + message: e.to_string(), + }) + }); + + pgb.write_message_noflush(&BeMessage::CopyData(&response.serialize()))?; + pgb.flush().await?; + } + } +}