toy around with different ways to shut down

This commit is contained in:
Christian Schwarz
2024-07-02 17:32:19 +00:00
parent 03613b0017
commit d359d88f51
4 changed files with 85 additions and 8 deletions

View File

@@ -1,3 +1,5 @@
paths = [ "/home/cs/src/rust-postgres" ]
[build]
# This is only present for local builds, as it will be overridden
# by the RUSTDOCFLAGS env var in CI.
@@ -6,3 +8,4 @@ rustdocflags = ["-Arustdoc::private_intra_doc_links"]
[alias]
build_testing = ["build", "--features", "testing"]
neon = ["run", "--bin", "neon_local"]

View File

@@ -1,5 +1,6 @@
use std::pin::Pin;
use anyhow::Context;
use futures::SinkExt;
use pageserver_api::{
models::{
@@ -65,10 +66,11 @@ impl Client {
let Client {
cancel_on_client_drop,
conn_task,
client: _,
client,
} = self;
Ok(PagestreamClient {
copy_both: Box::pin(copy_both),
client,
conn_task,
cancel_on_client_drop,
})
@@ -93,11 +95,28 @@ impl Client {
}
Ok(self.client.copy_out(&args.join(" ")).await?)
}
pub async fn shutdown(self) -> anyhow::Result<()> {
let Self {
client,
cancel_on_client_drop,
conn_task,
} = self;
drop(client); // this sends Terminate message(?)
conn_task
.await
.context("wait for network communications to finish cleanly")?;
drop(cancel_on_client_drop);
Ok(())
}
}
/// Create using [`Client::pagestream`].
pub struct PagestreamClient {
copy_both: Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>,
/// Must not use this until copy_both has been shut down.
client: tokio_postgres::Client,
cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
conn_task: JoinHandle<()>,
}
@@ -108,10 +127,11 @@ pub struct RelTagBlockNo {
}
impl PagestreamClient {
pub async fn shutdown(self) {
pub async fn shutdown(self) -> anyhow::Result<Client> {
let Self {
copy_both,
cancel_on_client_drop: cancel_conn_task,
mut copy_both,
client,
cancel_on_client_drop,
conn_task,
} = self;
// The `copy_both` contains internal channel sender, the receiver of which is polled by `conn_task`.
@@ -131,9 +151,13 @@ impl PagestreamClient {
//
// NB: page_service doesn't have a use case to exit the `pagestream` mode currently.
// => https://github.com/neondatabase/neon/issues/6390
let _ = cancel_conn_task.unwrap();
conn_task.await.unwrap();
drop(copy_both);
let _: () = copy_both.close().await.unwrap();
Ok(Client {
client,
cancel_on_client_drop,
conn_task,
})
}
pub async fn getpage(

View File

@@ -0,0 +1,40 @@
use std::str::FromStr;
use pageserver_client::page_service::Client;
use utils::{
id::{TenantId, TimelineId},
logging::Output,
};
#[tokio::main]
async fn main() {
utils::logging::init(
utils::logging::LogFormat::Plain,
utils::logging::TracingErrorLayerEnablement::Disabled,
Output::Stderr,
).unwrap();
let client = Client::new("postgresql://localhost:64000".to_owned())
.await
.unwrap();
let mut client = Some(client);
for i in 1..10 {
println!("Iteration: {}", i);
let myclient = client.take().unwrap();
let pagestream_client = myclient
.pagestream(
TenantId::from_str("e0dfa97c9dc84f32ab423fe44f186283").unwrap(),
TimelineId::from_str("585d77e52a6e43a7099c6ebaea8730c2").unwrap(),
)
.await
.unwrap();
let myclient = pagestream_client.shutdown().await.unwrap();
client = Some(myclient);
}
let client = client.take().unwrap();
client.shutdown().await.unwrap();
}

View File

@@ -596,7 +596,14 @@ impl PageServerHandler {
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(FeMessage::Terminate) => break,
Some(FeMessage::Terminate) => {
info!("received Terminate message");
break; // FIXME we want to not wait for new comment when we see this one (and generally, pgb.read_message() should probably return an error when it sees it)
}
Some(FeMessage::CopyDone) => {
info!("received CopyDone message");
break;
}
Some(m) => {
return Err(QueryError::Other(anyhow::anyhow!(
"unexpected message: {m:?} during COPY"
@@ -715,6 +722,9 @@ impl PageServerHandler {
}
}
}
pgb.write_message_noflush(&BeMessage::CopyDone)?;
self.flush_cancellable(pgb, &tenant.cancel).await?;
Ok(())
}