diff --git a/pageserver/client/src/page_service.rs b/pageserver/client/src/page_service.rs
index 231461267a..ff542670f1 100644
--- a/pageserver/client/src/page_service.rs
+++ b/pageserver/client/src/page_service.rs
@@ -108,9 +108,32 @@ pub struct RelTagBlockNo {
 }
 
 impl PagestreamClient {
-    pub async fn shutdown(mut self) {
-        let _ = self.cancel_on_client_drop.take();
-        self.conn_task.await.unwrap();
+    pub async fn shutdown(self) {
+        let Self {
+            copy_both,
+            cancel_on_client_drop: cancel_conn_task,
+            conn_task,
+        } = self;
+        // The `copy_both` contains internal channel sender, the receiver of which is polled by `conn_task`.
+        // When `conn_task` observes the sender has been dropped, it sends a `FeMessage::CopyFail` into the connection.
+        // (see https://github.com/neondatabase/rust-postgres/blob/2005bf79573b8add5cf205b52a2b208e356cc8b0/tokio-postgres/src/copy_both.rs#L56).
+        //
+        // If we drop(copy_both) first, but then immediately drop the `cancel_on_client_drop`,
+        // the CopyFail message only makes it to the socket sometimes (i.e., it's a race).
+        //
+        // Further, the pageserver makes a lot of noise when it receives CopyFail.
+        // Computes don't send it in practice, they just hard-close the connection.
+        //
+        // So, let's behave like the computes and suppress the CopyFail as follows:
+        // kill the socket first, then drop copy_both.
+        //
+        // See also: https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-COPY
+        //
+        // NB: page_service doesn't have a use case to exit the `pagestream` mode currently.
+        // => https://github.com/neondatabase/neon/issues/6390
+        let _ = cancel_conn_task.unwrap();
+        conn_task.await.unwrap();
+        drop(copy_both);
     }
 
     pub async fn getpage(
diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs
index d8957ddd6b..98f1852acd 100644
--- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs
+++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs
@@ -404,23 +404,27 @@ async fn client(
         .await
         .unwrap();
 
-    start_work_barrier.wait().await;
-
-    while let Some(req) =
-        tokio::select! { work = work.recv() => { work } , _ = cancel.cancelled() => { return; } }
-    {
-        let start = Instant::now();
-
-        let res = tokio::select! {
-            res = client.getpage(req) => { res },
-            _ = cancel.cancelled() => { return; }
-        };
-        res.with_context(|| format!("getpage for {timeline}"))
-            .unwrap();
-        let elapsed = start.elapsed();
-        live_stats.inc();
-        STATS.with(|stats| {
-            stats.borrow().lock().unwrap().observe(elapsed).unwrap();
-        });
+    let do_requests = async {
+        start_work_barrier.wait().await;
+        while let Some(req) = work.recv().await {
+            let start = Instant::now();
+            client
+                .getpage(req)
+                .await
+                .with_context(|| format!("getpage for {timeline}"))
+                .unwrap();
+            let elapsed = start.elapsed();
+            live_stats.inc();
+            STATS.with(|stats| {
+                stats.borrow().lock().unwrap().observe(elapsed).unwrap();
+            });
+        }
+    };
+    tokio::select! {
+        res = do_requests => { res },
+        _ = cancel.cancelled() => {
+            client.shutdown().await;
+            return;
+        }
+    }
     }
 }