From d359d88f517fee7cf848acb6224d9033739f7cdf Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 2 Jul 2024 17:32:19 +0000 Subject: [PATCH] toy around with different ways to shut down --- .cargo/config.toml | 3 ++ pageserver/client/src/page_service.rs | 38 ++++++++++++++---- .../pagebench/examples/hang_pageserver.rs | 40 +++++++++++++++++++ pageserver/src/page_service.rs | 12 +++++- 4 files changed, 85 insertions(+), 8 deletions(-) create mode 100644 pageserver/pagebench/examples/hang_pageserver.rs diff --git a/.cargo/config.toml b/.cargo/config.toml index 5e452974ad..741ba6f3d7 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,3 +1,5 @@ +paths = [ "/home/cs/src/rust-postgres" ] + [build] # This is only present for local builds, as it will be overridden # by the RUSTDOCFLAGS env var in CI. @@ -6,3 +8,4 @@ rustdocflags = ["-Arustdoc::private_intra_doc_links"] [alias] build_testing = ["build", "--features", "testing"] neon = ["run", "--bin", "neon_local"] + diff --git a/pageserver/client/src/page_service.rs b/pageserver/client/src/page_service.rs index f9507fc47a..0f88bd85d3 100644 --- a/pageserver/client/src/page_service.rs +++ b/pageserver/client/src/page_service.rs @@ -1,5 +1,6 @@ use std::pin::Pin; +use anyhow::Context; use futures::SinkExt; use pageserver_api::{ models::{ @@ -65,10 +66,11 @@ impl Client { let Client { cancel_on_client_drop, conn_task, - client: _, + client, } = self; Ok(PagestreamClient { copy_both: Box::pin(copy_both), + client, conn_task, cancel_on_client_drop, }) @@ -93,11 +95,28 @@ impl Client { } Ok(self.client.copy_out(&args.join(" ")).await?) } + + pub async fn shutdown(self) -> anyhow::Result<()> { + let Self { + client, + cancel_on_client_drop, + conn_task, + } = self; + + drop(client); // this sends Terminate message(?) + conn_task + .await + .context("wait for network communications to finish cleanly")?; + drop(cancel_on_client_drop); + Ok(()) + } } /// Create using [`Client::pagestream`]. 
pub struct PagestreamClient { copy_both: Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>, + /// Must not use this until copy_both has been shut down. + client: tokio_postgres::Client, cancel_on_client_drop: Option<tokio_util::sync::DropGuard>, conn_task: JoinHandle<()>, } @@ -108,10 +127,11 @@ pub struct RelTagBlockNo { } impl PagestreamClient { - pub async fn shutdown(self) { + pub async fn shutdown(self) -> anyhow::Result<Client> { let Self { - copy_both, - cancel_on_client_drop: cancel_conn_task, + mut copy_both, + client, + cancel_on_client_drop, conn_task, } = self; // The `copy_both` contains internal channel sender, the receiver of which is polled by `conn_task`. @@ -131,9 +151,13 @@ impl PagestreamClient { // // NB: page_service doesn't have a use case to exit the `pagestream` mode currently. // => https://github.com/neondatabase/neon/issues/6390 - let _ = cancel_conn_task.unwrap(); - conn_task.await.unwrap(); - drop(copy_both); + + let _: () = copy_both.close().await.unwrap(); + Ok(Client { + client, + cancel_on_client_drop, + conn_task, + }) } pub async fn getpage( diff --git a/pageserver/pagebench/examples/hang_pageserver.rs b/pageserver/pagebench/examples/hang_pageserver.rs new file mode 100644 index 0000000000..78e7ada42f --- /dev/null +++ b/pageserver/pagebench/examples/hang_pageserver.rs @@ -0,0 +1,40 @@ +use std::str::FromStr; + +use pageserver_client::page_service::Client; +use utils::{ + id::{TenantId, TimelineId}, + logging::Output, +}; + +#[tokio::main] +async fn main() { + utils::logging::init( + utils::logging::LogFormat::Plain, + utils::logging::TracingErrorLayerEnablement::Disabled, + Output::Stderr, + ).unwrap(); + + let client = Client::new("postgresql://localhost:64000".to_owned()) + .await + .unwrap(); + + let mut client = Some(client); + for i in 1..10 { + println!("Iteration: {}", i); + let myclient = client.take().unwrap(); + + let pagestream_client = myclient + .pagestream( + TenantId::from_str("e0dfa97c9dc84f32ab423fe44f186283").unwrap(), + TimelineId::from_str("585d77e52a6e43a7099c6ebaea8730c2").unwrap(), + ) 
+ .await + .unwrap(); + + let myclient = pagestream_client.shutdown().await.unwrap(); + client = Some(myclient); + } + + let client = client.take().unwrap(); + client.shutdown().await.unwrap(); +} diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 6ea5f396d0..5388f3742b 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -596,7 +596,14 @@ impl PageServerHandler { let copy_data_bytes = match msg? { Some(FeMessage::CopyData(bytes)) => bytes, - Some(FeMessage::Terminate) => break, + Some(FeMessage::Terminate) => { + info!("received Terminate message"); + break; // FIXME we want to not wait for new comment when we see this one (and generally, pgb.read_message() should probably return an error when it sees it) + } + Some(FeMessage::CopyDone) => { + info!("received CopyDone message"); + break; + } Some(m) => { return Err(QueryError::Other(anyhow::anyhow!( "unexpected message: {m:?} during COPY" @@ -715,6 +722,9 @@ impl PageServerHandler { } } } + + pgb.write_message_noflush(&BeMessage::CopyDone)?; + self.flush_cancellable(pgb, &tenant.cancel).await?; Ok(()) }