diff --git a/.cargo/config.toml b/.cargo/config.toml index 741ba6f3d7..5e452974ad 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,5 +1,3 @@ -paths = [ "/home/cs/src/rust-postgres" ] - [build] # This is only present for local builds, as it will be overridden # by the RUSTDOCFLAGS env var in CI. @@ -8,4 +6,3 @@ rustdocflags = ["-Arustdoc::private_intra_doc_links"] [alias] build_testing = ["build", "--features", "testing"] neon = ["run", "--bin", "neon_local"] - diff --git a/pageserver/client/src/page_service.rs b/pageserver/client/src/page_service.rs index 0f88bd85d3..f9507fc47a 100644 --- a/pageserver/client/src/page_service.rs +++ b/pageserver/client/src/page_service.rs @@ -1,6 +1,5 @@ use std::pin::Pin; -use anyhow::Context; use futures::SinkExt; use pageserver_api::{ models::{ @@ -66,11 +65,10 @@ impl Client { let Client { cancel_on_client_drop, conn_task, - client, + client: _, } = self; Ok(PagestreamClient { copy_both: Box::pin(copy_both), - client, conn_task, cancel_on_client_drop, }) @@ -95,28 +93,11 @@ impl Client { } Ok(self.client.copy_out(&args.join(" ")).await?) } - - pub async fn shutdown(self) -> anyhow::Result<()> { - let Self { - client, - cancel_on_client_drop, - conn_task, - } = self; - - drop(client); // this sends Terminate message(?) - conn_task - .await - .context("wait for network communications to finish cleanly")?; - drop(cancel_on_client_drop); - Ok(()) - } } /// Create using [`Client::pagestream`]. pub struct PagestreamClient { copy_both: Pin>>, - /// Must not use this until copy_both has been shut down. - client: tokio_postgres::Client, cancel_on_client_drop: Option, conn_task: JoinHandle<()>, } @@ -127,11 +108,10 @@ pub struct RelTagBlockNo { } impl PagestreamClient { - pub async fn shutdown(self) -> anyhow::Result { + pub async fn shutdown(self) { let Self { - mut copy_both, - client, - cancel_on_client_drop, + copy_both, + cancel_on_client_drop: cancel_conn_task, conn_task, } = self; // The `copy_both` contains internal channel sender, the receiver of which is polled by `conn_task`. @@ -151,13 +131,9 @@ impl PagestreamClient { // // NB: page_service doesn't have a use case to exit the `pagestream` mode currently. // => https://github.com/neondatabase/neon/issues/6390 - - let _: () = copy_both.close().await.unwrap(); - Ok(Client { - client, - cancel_on_client_drop, - conn_task, - }) + let _ = cancel_conn_task.unwrap(); + conn_task.await.unwrap(); + drop(copy_both); } pub async fn getpage( diff --git a/pageserver/pagebench/examples/hang_pageserver.rs b/pageserver/pagebench/examples/hang_pageserver.rs deleted file mode 100644 index 78e7ada42f..0000000000 --- a/pageserver/pagebench/examples/hang_pageserver.rs +++ /dev/null @@ -1,40 +0,0 @@ -use std::str::FromStr; - -use pageserver_client::page_service::Client; -use utils::{ - id::{TenantId, TimelineId}, - logging::Output, -}; - -#[tokio::main] -async fn main() { - utils::logging::init( - utils::logging::LogFormat::Plain, - utils::logging::TracingErrorLayerEnablement::Disabled, - Output::Stderr, - ).unwrap(); - - let client = Client::new("postgresql://localhost:64000".to_owned()) - .await - .unwrap(); - - let mut client = Some(client); - for i in 1..10 { - println!("Iteration: {}", i); - let myclient = client.take().unwrap(); - - let pagestream_client = myclient - .pagestream( - TenantId::from_str("e0dfa97c9dc84f32ab423fe44f186283").unwrap(), - TimelineId::from_str("585d77e52a6e43a7099c6ebaea8730c2").unwrap(), - ) - .await - .unwrap(); - - let myclient = pagestream_client.shutdown().await.unwrap(); - client = Some(myclient); - } - - let client = client.take().unwrap(); - client.shutdown().await.unwrap(); -} diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 5388f3742b..6ea5f396d0 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -596,14 +596,7 @@ impl PageServerHandler { let copy_data_bytes = match msg? { Some(FeMessage::CopyData(bytes)) => bytes, - Some(FeMessage::Terminate) => { - info!("received Terminate message"); - break; // FIXME we want to not wait for new comment when we see this one (and generally, pgb.read_message() should probably return an error when it sees it) - } - Some(FeMessage::CopyDone) => { - info!("received CopyDone message"); - break; - } + Some(FeMessage::Terminate) => break, Some(m) => { return Err(QueryError::Other(anyhow::anyhow!( "unexpected message: {m:?} during COPY" @@ -722,9 +715,6 @@ impl PageServerHandler { } } } - - pgb.write_message_noflush(&BeMessage::CopyDone)?; - self.flush_cancellable(pgb, &tenant.cancel).await?; Ok(()) }