From 607d19f0e0babcae60551af9bfd2236f927fa93b Mon Sep 17 00:00:00 2001 From: John Spray Date: Wed, 18 Oct 2023 13:28:38 +0100 Subject: [PATCH] pageserver: clean up page service Result handling for shutdown/disconnect (#5504) ## Problem - QueryError always logged at error severity, even though disconnections are not true errors. - QueryError type is not expressive enough to distinguish actual errors from shutdowns. - In some functions we're returning Ok(()) on shutdown, in others we're returning an error ## Summary of changes - Add QueryError::Shutdown and use it in places we check for cancellation - Adopt consistent Result behavior: disconnects and shutdowns are always QueryError, not ok - Transform shutdown+disconnect errors to Ok(()) at the very top of the task that runs query handler - Use the postgres protocol error code for "admin shutdown" in responses to clients when we are shutting down. Closes: #5517 --- libs/postgres_backend/src/lib.rs | 58 ++++++++++++++----- libs/pq_proto/src/lib.rs | 1 + pageserver/src/bin/pageserver.rs | 1 + pageserver/src/page_service.rs | 13 +++-- ...test_pageserver_restarts_under_workload.py | 2 - test_runner/regress/test_tenant_detach.py | 3 - 6 files changed, 52 insertions(+), 26 deletions(-) diff --git a/libs/postgres_backend/src/lib.rs b/libs/postgres_backend/src/lib.rs index 08c4e03d13..8ce7f85481 100644 --- a/libs/postgres_backend/src/lib.rs +++ b/libs/postgres_backend/src/lib.rs @@ -19,8 +19,8 @@ use tracing::{debug, error, info, trace}; use pq_proto::framed::{ConnectionError, Framed, FramedReader, FramedWriter}; use pq_proto::{ - BeMessage, FeMessage, FeStartupPacket, ProtocolError, SQLSTATE_INTERNAL_ERROR, - SQLSTATE_SUCCESSFUL_COMPLETION, + BeMessage, FeMessage, FeStartupPacket, ProtocolError, SQLSTATE_ADMIN_SHUTDOWN, + SQLSTATE_INTERNAL_ERROR, SQLSTATE_SUCCESSFUL_COMPLETION, }; /// An error, occurred during query processing: @@ -30,6 +30,9 @@ pub enum QueryError { /// The connection was lost while processing the query. 
#[error(transparent)] Disconnected(#[from] ConnectionError), + /// We were instructed to shutdown while processing the query + #[error("Shutting down")] + Shutdown, /// Some other error #[error(transparent)] Other(#[from] anyhow::Error), @@ -44,7 +47,8 @@ impl From for QueryError { impl QueryError { pub fn pg_error_code(&self) -> &'static [u8; 5] { match self { - Self::Disconnected(_) => b"08006", // connection failure + Self::Disconnected(_) => b"08006", // connection failure + Self::Shutdown => SQLSTATE_ADMIN_SHUTDOWN, Self::Other(_) => SQLSTATE_INTERNAL_ERROR, // internal error } } @@ -396,7 +400,20 @@ impl PostgresBackend { // socket might be already closed, e.g. if previously received error, // so ignore result. self.framed.shutdown().await.ok(); - ret + match ret { + Ok(()) => Ok(()), + Err(QueryError::Shutdown) => { + info!("Stopped due to shutdown"); + Ok(()) + } + Err(QueryError::Disconnected(e)) => { + info!("Disconnected ({e:#})"); + // Disconnection is not an error: we just use it that way internally to drop + // out of loops. + Ok(()) + } + e => e, + } } async fn run_message_loop( @@ -416,15 +433,11 @@ impl PostgresBackend { _ = shutdown_watcher() => { // We were requested to shut down. tracing::info!("shutdown request received during handshake"); - return Ok(()) + return Err(QueryError::Shutdown) }, - result = self.handshake(handler) => { - // Handshake complete. - result?; - if self.state == ProtoState::Closed { - return Ok(()); // EOF during handshake - } + handshake_r = self.handshake(handler) => { + handshake_r?; } ); @@ -435,7 +448,7 @@ impl PostgresBackend { _ = shutdown_watcher() => { // We were requested to shut down. tracing::info!("shutdown request received in run_message_loop"); - Ok(None) + return Err(QueryError::Shutdown) }, msg = self.read_message() => { msg }, )? { @@ -447,7 +460,14 @@ impl PostgresBackend { _ = shutdown_watcher() => { // We were requested to shut down. 
tracing::info!("shutdown request received during response flush"); - return Ok(()) + + // If we exited process_message with a shutdown error, there may be + // some valid response content in our transmit buffer: permit sending + // this within a short timeout. This is a best effort thing so we don't + // care about the result. + tokio::time::timeout(std::time::Duration::from_millis(500), self.flush()).await.ok(); + + return Err(QueryError::Shutdown) }, flush_r = self.flush() => { flush_r?; @@ -560,7 +580,9 @@ impl PostgresBackend { self.peer_addr ); self.state = ProtoState::Closed; - return Ok(()); + return Err(QueryError::Disconnected(ConnectionError::Protocol( + ProtocolError::Protocol("EOF during handshake".to_string()), + ))); } } } @@ -599,7 +621,9 @@ impl PostgresBackend { self.peer_addr ); self.state = ProtoState::Closed; - return Ok(()); + return Err(QueryError::Disconnected(ConnectionError::Protocol( + ProtocolError::Protocol("EOF during auth".to_string()), + ))); } } } @@ -923,6 +947,7 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> AsyncWrite for CopyDataWriter<'a, I pub fn short_error(e: &QueryError) -> String { match e { QueryError::Disconnected(connection_error) => connection_error.to_string(), + QueryError::Shutdown => "shutdown".to_string(), QueryError::Other(e) => format!("{e:#}"), } } @@ -939,6 +964,9 @@ fn log_query_error(query: &str, e: &QueryError) { QueryError::Disconnected(other_connection_error) => { error!("query handler for '{query}' failed with connection error: {other_connection_error:?}") } + QueryError::Shutdown => { + info!("query handler for '{query}' cancelled during tenant shutdown") + } QueryError::Other(e) => { error!("query handler for '{query}' failed: {e:?}"); } diff --git a/libs/pq_proto/src/lib.rs b/libs/pq_proto/src/lib.rs index 47faad363f..94bc7f60f8 100644 --- a/libs/pq_proto/src/lib.rs +++ b/libs/pq_proto/src/lib.rs @@ -670,6 +670,7 @@ pub fn read_cstr(buf: &mut Bytes) -> Result { } pub const SQLSTATE_INTERNAL_ERROR: 
&[u8; 5] = b"XX000"; +pub const SQLSTATE_ADMIN_SHUTDOWN: &[u8; 5] = b"57P01"; pub const SQLSTATE_SUCCESSFUL_COMPLETION: &[u8; 5] = b"00000"; impl<'a> BeMessage<'a> { diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 827f74fcce..76cb0e8ec6 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -579,6 +579,7 @@ fn start_pageserver( pageserver_listener, conf.pg_auth_type, libpq_ctx, + task_mgr::shutdown_token(), ) .await }, diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 5ab4fbbd4c..536334d051 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -122,6 +122,7 @@ pub async fn libpq_listener_main( listener: TcpListener, auth_type: AuthType, listener_ctx: RequestContext, + cancel: CancellationToken, ) -> anyhow::Result<()> { listener.set_nonblocking(true)?; let tokio_listener = tokio::net::TcpListener::from_std(listener)?; @@ -130,7 +131,7 @@ pub async fn libpq_listener_main( while let Some(res) = tokio::select! { biased; - _ = task_mgr::shutdown_watcher() => { + _ = cancel.cancelled() => { // We were requested to shut down. None } @@ -299,7 +300,7 @@ impl PageServerHandler { Ok(flush_r?) }, _ = self.cancel.cancelled() => { - Err(QueryError::Other(anyhow::anyhow!("Shutting down"))) + Err(QueryError::Shutdown) } ) } @@ -316,11 +317,11 @@ impl PageServerHandler { let msg = tokio::select! { biased; - _ = task_mgr::shutdown_watcher() => { + _ = self.cancel.cancelled() => { // We were requested to shut down. let msg = "pageserver is shutting down"; let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None)); - Err(QueryError::Other(anyhow::anyhow!(msg))) + Err(QueryError::Shutdown) } msg = pgb.read_message() => { msg.map_err(QueryError::from)} @@ -414,10 +415,10 @@ impl PageServerHandler { let msg = tokio::select! 
{ biased; - _ = task_mgr::shutdown_watcher() => { + _ = self.cancel.cancelled() => { // We were requested to shut down. info!("shutdown request received in page handler"); - break; + return Err(QueryError::Shutdown) } msg = pgb.read_message() => { msg } diff --git a/test_runner/regress/test_pageserver_restarts_under_workload.py b/test_runner/regress/test_pageserver_restarts_under_workload.py index 71058268a6..65569f3bac 100644 --- a/test_runner/regress/test_pageserver_restarts_under_workload.py +++ b/test_runner/regress/test_pageserver_restarts_under_workload.py @@ -17,8 +17,6 @@ def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgB n_restarts = 10 scale = 10 - env.pageserver.allowed_errors.append(".*query handler.*failed.*Shutting down") - def run_pgbench(connstr: str): log.info(f"Start a pgbench workload on pg {connstr}") pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr]) diff --git a/test_runner/regress/test_tenant_detach.py b/test_runner/regress/test_tenant_detach.py index ec6f1258e5..e92a906fab 100644 --- a/test_runner/regress/test_tenant_detach.py +++ b/test_runner/regress/test_tenant_detach.py @@ -744,9 +744,6 @@ def test_ignore_while_attaching( env.pageserver.allowed_errors.append( f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" ) - # An endpoint is starting up concurrently with our detach, it can - # experience RPC failure due to shutdown. - env.pageserver.allowed_errors.append(".*query handler.*failed.*Shutting down") data_id = 1 data_secret = "very secret secret"