Revert "toy around with different ways to shut down"

This reverts commit d359d88f51.
This commit is contained in:
Christian Schwarz
2024-07-02 17:33:22 +00:00
parent d359d88f51
commit 45035ca950
4 changed files with 8 additions and 85 deletions

View File

@@ -1,5 +1,3 @@
paths = [ "/home/cs/src/rust-postgres" ]
[build]
# This is only present for local builds, as it will be overridden
# by the RUSTDOCFLAGS env var in CI.
@@ -8,4 +6,3 @@ rustdocflags = ["-Arustdoc::private_intra_doc_links"]
[alias]
build_testing = ["build", "--features", "testing"]
neon = ["run", "--bin", "neon_local"]

View File

@@ -1,6 +1,5 @@
use std::pin::Pin;
use anyhow::Context;
use futures::SinkExt;
use pageserver_api::{
models::{
@@ -66,11 +65,10 @@ impl Client {
let Client {
cancel_on_client_drop,
conn_task,
client,
client: _,
} = self;
Ok(PagestreamClient {
copy_both: Box::pin(copy_both),
client,
conn_task,
cancel_on_client_drop,
})
@@ -95,28 +93,11 @@ impl Client {
}
Ok(self.client.copy_out(&args.join(" ")).await?)
}
pub async fn shutdown(self) -> anyhow::Result<()> {
let Self {
client,
cancel_on_client_drop,
conn_task,
} = self;
drop(client); // this sends Terminate message(?)
conn_task
.await
.context("wait for network communications to finish cleanly")?;
drop(cancel_on_client_drop);
Ok(())
}
}
/// Create using [`Client::pagestream`].
pub struct PagestreamClient {
copy_both: Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>,
/// Must not use this until copy_both has been shut down.
client: tokio_postgres::Client,
cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
conn_task: JoinHandle<()>,
}
@@ -127,11 +108,10 @@ pub struct RelTagBlockNo {
}
impl PagestreamClient {
pub async fn shutdown(self) -> anyhow::Result<Client> {
pub async fn shutdown(self) {
let Self {
mut copy_both,
client,
cancel_on_client_drop,
copy_both,
cancel_on_client_drop: cancel_conn_task,
conn_task,
} = self;
// The `copy_both` contains internal channel sender, the receiver of which is polled by `conn_task`.
@@ -151,13 +131,9 @@ impl PagestreamClient {
//
// NB: page_service doesn't have a use case to exit the `pagestream` mode currently.
// => https://github.com/neondatabase/neon/issues/6390
let _: () = copy_both.close().await.unwrap();
Ok(Client {
client,
cancel_on_client_drop,
conn_task,
})
let _ = cancel_conn_task.unwrap();
conn_task.await.unwrap();
drop(copy_both);
}
pub async fn getpage(

View File

@@ -1,40 +0,0 @@
use std::str::FromStr;
use pageserver_client::page_service::Client;
use utils::{
id::{TenantId, TimelineId},
logging::Output,
};
#[tokio::main]
async fn main() {
utils::logging::init(
utils::logging::LogFormat::Plain,
utils::logging::TracingErrorLayerEnablement::Disabled,
Output::Stderr,
).unwrap();
let client = Client::new("postgresql://localhost:64000".to_owned())
.await
.unwrap();
let mut client = Some(client);
for i in 1..10 {
println!("Iteration: {}", i);
let myclient = client.take().unwrap();
let pagestream_client = myclient
.pagestream(
TenantId::from_str("e0dfa97c9dc84f32ab423fe44f186283").unwrap(),
TimelineId::from_str("585d77e52a6e43a7099c6ebaea8730c2").unwrap(),
)
.await
.unwrap();
let myclient = pagestream_client.shutdown().await.unwrap();
client = Some(myclient);
}
let client = client.take().unwrap();
client.shutdown().await.unwrap();
}

View File

@@ -596,14 +596,7 @@ impl PageServerHandler {
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(FeMessage::Terminate) => {
info!("received Terminate message");
break; // FIXME we want to not wait for new comment when we see this one (and generally, pgb.read_message() should probably return an error when it sees it)
}
Some(FeMessage::CopyDone) => {
info!("received CopyDone message");
break;
}
Some(FeMessage::Terminate) => break,
Some(m) => {
return Err(QueryError::Other(anyhow::anyhow!(
"unexpected message: {m:?} during COPY"
@@ -722,9 +715,6 @@ impl PageServerHandler {
}
}
}
pgb.write_message_noflush(&BeMessage::CopyDone)?;
self.flush_cancellable(pgb, &tenant.cancel).await?;
Ok(())
}