pageserver: clean up page service Result handling for shutdown/disconnect (#5504)

## Problem

- QueryError always logged at error severity, even though disconnections
are not true errors.
- QueryError type is not expressive enough to distinguish actual errors
from shutdowns.
- In some functions we're returning Ok(()) on shutdown, in others we're
returning an error

## Summary of changes

- Add QueryError::Shutdown and use it in places we check for
cancellation
- Adopt consistent Result behavior: disconnects and shutdowns are always
QueryError, not ok
- Transform shutdown+disconnect errors to Ok(()) at the very top of the
task that runs query handler
- Use the postgres protocol error code for "admin shutdown" in responses
to clients when we are shutting down.

Closes: #5517
This commit is contained in:
John Spray
2023-10-18 13:28:38 +01:00
committed by GitHub
parent 1fa0478980
commit 607d19f0e0
6 changed files with 52 additions and 26 deletions

View File

@@ -19,8 +19,8 @@ use tracing::{debug, error, info, trace};
use pq_proto::framed::{ConnectionError, Framed, FramedReader, FramedWriter};
use pq_proto::{
BeMessage, FeMessage, FeStartupPacket, ProtocolError, SQLSTATE_INTERNAL_ERROR,
SQLSTATE_SUCCESSFUL_COMPLETION,
BeMessage, FeMessage, FeStartupPacket, ProtocolError, SQLSTATE_ADMIN_SHUTDOWN,
SQLSTATE_INTERNAL_ERROR, SQLSTATE_SUCCESSFUL_COMPLETION,
};
/// An error, occurred during query processing:
@@ -30,6 +30,9 @@ pub enum QueryError {
/// The connection was lost while processing the query.
#[error(transparent)]
Disconnected(#[from] ConnectionError),
/// We were instructed to shutdown while processing the query
#[error("Shutting down")]
Shutdown,
/// Some other error
#[error(transparent)]
Other(#[from] anyhow::Error),
@@ -44,7 +47,8 @@ impl From<io::Error> for QueryError {
impl QueryError {
pub fn pg_error_code(&self) -> &'static [u8; 5] {
match self {
Self::Disconnected(_) => b"08006", // connection failure
Self::Disconnected(_) => b"08006", // connection failure
Self::Shutdown => SQLSTATE_ADMIN_SHUTDOWN,
Self::Other(_) => SQLSTATE_INTERNAL_ERROR, // internal error
}
}
@@ -396,7 +400,20 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
// socket might be already closed, e.g. if previously received error,
// so ignore result.
self.framed.shutdown().await.ok();
ret
match ret {
Ok(()) => Ok(()),
Err(QueryError::Shutdown) => {
info!("Stopped due to shutdown");
Ok(())
}
Err(QueryError::Disconnected(e)) => {
info!("Disconnected ({e:#})");
// Disconnection is not an error: we just use it that way internally to drop
// out of loops.
Ok(())
}
e => e,
}
}
async fn run_message_loop<F, S>(
@@ -416,15 +433,11 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
_ = shutdown_watcher() => {
// We were requested to shut down.
tracing::info!("shutdown request received during handshake");
return Ok(())
return Err(QueryError::Shutdown)
},
result = self.handshake(handler) => {
// Handshake complete.
result?;
if self.state == ProtoState::Closed {
return Ok(()); // EOF during handshake
}
handshake_r = self.handshake(handler) => {
handshake_r?;
}
);
@@ -435,7 +448,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
_ = shutdown_watcher() => {
// We were requested to shut down.
tracing::info!("shutdown request received in run_message_loop");
Ok(None)
return Err(QueryError::Shutdown)
},
msg = self.read_message() => { msg },
)? {
@@ -447,7 +460,14 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
_ = shutdown_watcher() => {
// We were requested to shut down.
tracing::info!("shutdown request received during response flush");
return Ok(())
// If we exited process_message with a shutdown error, there may be
// some valid response content on in our transmit buffer: permit sending
// this within a short timeout. This is a best effort thing so we don't
// care about the result.
tokio::time::timeout(std::time::Duration::from_millis(500), self.flush()).await.ok();
return Err(QueryError::Shutdown)
},
flush_r = self.flush() => {
flush_r?;
@@ -560,7 +580,9 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
self.peer_addr
);
self.state = ProtoState::Closed;
return Ok(());
return Err(QueryError::Disconnected(ConnectionError::Protocol(
ProtocolError::Protocol("EOF during handshake".to_string()),
)));
}
}
}
@@ -599,7 +621,9 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
self.peer_addr
);
self.state = ProtoState::Closed;
return Ok(());
return Err(QueryError::Disconnected(ConnectionError::Protocol(
ProtocolError::Protocol("EOF during auth".to_string()),
)));
}
}
}
@@ -923,6 +947,7 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> AsyncWrite for CopyDataWriter<'a, I
pub fn short_error(e: &QueryError) -> String {
match e {
QueryError::Disconnected(connection_error) => connection_error.to_string(),
QueryError::Shutdown => "shutdown".to_string(),
QueryError::Other(e) => format!("{e:#}"),
}
}
@@ -939,6 +964,9 @@ fn log_query_error(query: &str, e: &QueryError) {
QueryError::Disconnected(other_connection_error) => {
error!("query handler for '{query}' failed with connection error: {other_connection_error:?}")
}
QueryError::Shutdown => {
info!("query handler for '{query}' cancelled during tenant shutdown")
}
QueryError::Other(e) => {
error!("query handler for '{query}' failed: {e:?}");
}