diff --git a/libs/postgres_backend/src/lib.rs b/libs/postgres_backend/src/lib.rs index 1dae008a4f..73d25619c3 100644 --- a/libs/postgres_backend/src/lib.rs +++ b/libs/postgres_backend/src/lib.rs @@ -35,6 +35,12 @@ pub enum QueryError { /// We were instructed to shutdown while processing the query #[error("Shutting down")] Shutdown, + /// Query handler indicated that client should reconnect + #[error("Server requested reconnect")] + Reconnect, + /// Query named an entity that was not found + #[error("Not found: {0}")] + NotFound(std::borrow::Cow<'static, str>), /// Authentication failure #[error("Unauthorized: {0}")] Unauthorized(std::borrow::Cow<'static, str>), @@ -54,9 +60,9 @@ impl From for QueryError { impl QueryError { pub fn pg_error_code(&self) -> &'static [u8; 5] { match self { - Self::Disconnected(_) | Self::SimulatedConnectionError => b"08006", // connection failure + Self::Disconnected(_) | Self::SimulatedConnectionError | Self::Reconnect => b"08006", // connection failure Self::Shutdown => SQLSTATE_ADMIN_SHUTDOWN, - Self::Unauthorized(_) => SQLSTATE_INTERNAL_ERROR, + Self::Unauthorized(_) | Self::NotFound(_) => SQLSTATE_INTERNAL_ERROR, Self::Other(_) => SQLSTATE_INTERNAL_ERROR, // internal error } } @@ -425,6 +431,11 @@ impl PostgresBackend { info!("Stopped due to shutdown"); Ok(()) } + Err(QueryError::Reconnect) => { + // Dropping out of this loop implicitly disconnects + info!("Stopped due to handler reconnect request"); + Ok(()) + } Err(QueryError::Disconnected(e)) => { info!("Disconnected ({e:#})"); // Disconnection is not an error: we just use it that way internally to drop @@ -974,7 +985,9 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> AsyncWrite for CopyDataWriter<'a, I pub fn short_error(e: &QueryError) -> String { match e { QueryError::Disconnected(connection_error) => connection_error.to_string(), + QueryError::Reconnect => "reconnect".to_string(), QueryError::Shutdown => "shutdown".to_string(), + QueryError::NotFound(_) => "not found".to_string(), QueryError::Unauthorized(_e) => "JWT authentication error".to_string(), QueryError::SimulatedConnectionError => "simulated connection error".to_string(), QueryError::Other(e) => format!("{e:#}"), @@ -996,9 +1009,15 @@ fn log_query_error(query: &str, e: &QueryError) { QueryError::SimulatedConnectionError => { error!("query handler for query '{query}' failed due to a simulated connection error") } + QueryError::Reconnect => { + info!("query handler for '{query}' requested client to reconnect") + } QueryError::Shutdown => { info!("query handler for '{query}' cancelled during tenant shutdown") } + QueryError::NotFound(reason) => { + info!("query handler for '{query}' entity not found: {reason}") + } QueryError::Unauthorized(e) => { warn!("query handler for '{query}' failed with authentication error: {e}"); } diff --git a/libs/utils/src/http/error.rs b/libs/utils/src/http/error.rs index ac68b04888..3e9281ac81 100644 --- a/libs/utils/src/http/error.rs +++ b/libs/utils/src/http/error.rs @@ -31,6 +31,9 @@ pub enum ApiError { #[error("Shutting down")] ShuttingDown, + #[error("Timeout")] + Timeout(Cow<'static, str>), + #[error(transparent)] InternalServerError(anyhow::Error), } @@ -67,6 +70,10 @@ impl ApiError { err.to_string(), StatusCode::SERVICE_UNAVAILABLE, ), + ApiError::Timeout(err) => HttpErrorBody::response_from_msg_and_status( + err.to_string(), + StatusCode::REQUEST_TIMEOUT, + ), ApiError::InternalServerError(err) => HttpErrorBody::response_from_msg_and_status( err.to_string(), StatusCode::INTERNAL_SERVER_ERROR, diff --git 
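(For context on the new QueryError variants: a minimal, self-contained sketch, not the crate's actual code, of how a thiserror enum can carry the Reconnect/NotFound cases and map each variant to a SQLSTATE the way pg_error_code() does. The literal SQLSTATE strings below stand in for the SQLSTATE_* constants used in the patch, and a thiserror dependency is assumed.)

use std::borrow::Cow;

#[derive(thiserror::Error, Debug)]
enum QueryError {
    /// Query handler asked the client to drop the connection and reconnect.
    #[error("Server requested reconnect")]
    Reconnect,
    /// Query named an entity (tenant/timeline) that was not found.
    #[error("Not found: {0}")]
    NotFound(Cow<'static, str>),
    /// Server is shutting down.
    #[error("Shutting down")]
    Shutdown,
}

impl QueryError {
    /// Per-variant SQLSTATE mapping, mirroring pg_error_code() in the patch:
    /// Reconnect reuses 08006 (connection failure) so clients treat it like a
    /// dropped connection, NotFound reports XX000 (internal error), and
    /// Shutdown reports 57P01 (admin shutdown).
    fn pg_error_code(&self) -> &'static [u8; 5] {
        match self {
            QueryError::Reconnect => b"08006",
            QueryError::NotFound(_) => b"XX000",
            QueryError::Shutdown => b"57P01",
        }
    }
}

fn main() {
    let err = QueryError::NotFound("timeline not found".into());
    println!("{err} -> {}", std::str::from_utf8(err.pg_error_code()).unwrap());
}

Reusing 08006 for Reconnect means an existing client treats it exactly like a lost connection and retries with its normal backoff.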
a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 157e6b4e3e..8265627cb5 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -152,6 +152,7 @@ impl From for ApiError { PageReconstructError::AncestorStopping(_) => { ApiError::ResourceUnavailable(format!("{pre}").into()) } + PageReconstructError::AncestorLsnTimeout(e) => ApiError::Timeout(format!("{e}").into()), PageReconstructError::WalRedo(pre) => ApiError::InternalServerError(pre), } } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index d478d375f8..291490d016 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -25,6 +25,7 @@ use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, Qu use pq_proto::framed::ConnectionError; use pq_proto::FeStartupPacket; use pq_proto::{BeMessage, FeMessage, RowDescriptor}; +use std::borrow::Cow; use std::io; use std::net::TcpListener; use std::pin::pin; @@ -61,6 +62,9 @@ use crate::tenant::mgr; use crate::tenant::mgr::get_active_tenant_with_timeout; use crate::tenant::mgr::GetActiveTenantError; use crate::tenant::mgr::ShardSelector; +use crate::tenant::timeline::WaitLsnError; +use crate::tenant::GetTimelineError; +use crate::tenant::PageReconstructError; use crate::tenant::Timeline; use crate::trace::Tracer; @@ -283,6 +287,64 @@ struct PageServerHandler { connection_ctx: RequestContext, } +#[derive(thiserror::Error, Debug)] +enum PageStreamError { + /// We encountered an error that should prompt the client to reconnect: + /// in practice this means we drop the connection without sending a response. + #[error("Reconnect required: {0}")] + Reconnect(Cow<'static, str>), + + /// We were instructed to shutdown while processing the query + #[error("Shutting down")] + Shutdown, + + /// Something went wrong reading a page: this likely indicates a pageserver bug + #[error("Read error: {0}")] + Read(PageReconstructError), + + /// Ran out of time waiting for an LSN + #[error("LSN timeout: {0}")] + LsnTimeout(WaitLsnError), + + /// The entity required to serve the request (tenant or timeline) is not found, + /// or is not found in a suitable state to serve a request. 
+ #[error("Not found: {0}")] + NotFound(std::borrow::Cow<'static, str>), + + /// Request asked for something that doesn't make sense, like an invalid LSN + #[error("Bad request: {0}")] + BadRequest(std::borrow::Cow<'static, str>), +} + +impl From for PageStreamError { + fn from(value: PageReconstructError) -> Self { + match value { + PageReconstructError::Cancelled => Self::Shutdown, + e => Self::Read(e), + } + } +} + +impl From for PageStreamError { + fn from(value: GetActiveTimelineError) -> Self { + match value { + GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => Self::Shutdown, + GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()), + GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()), + } + } +} + +impl From for PageStreamError { + fn from(value: WaitLsnError) -> Self { + match value { + e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e), + WaitLsnError::Shutdown => Self::Shutdown, + WaitLsnError::BadState => Self::Reconnect("Timeline is not active".into()), + } + } +} + impl PageServerHandler { pub fn new( conf: &'static PageServerConf, @@ -428,7 +490,7 @@ impl PageServerHandler { // Check that the timeline exists let timeline = tenant .get_timeline(timeline_id, true) - .map_err(|e| anyhow::anyhow!(e))?; + .map_err(|e| QueryError::NotFound(format!("{e}").into()))?; // Avoid starting new requests if the timeline has already started shutting down, // and block timeline shutdown until this request is complete, or drops out due @@ -520,32 +582,44 @@ impl PageServerHandler { } }; - if let Err(e) = &response { - // Requests may fail as soon as we are Stopping, even if the Timeline's cancellation token wasn't fired yet, - // because wait_lsn etc will drop out - // is_stopping(): [`Timeline::flush_and_shutdown`] has entered - // is_canceled(): [`Timeline::shutdown`]` has entered - if timeline.cancel.is_cancelled() || timeline.is_stopping() { + match response { + Err(PageStreamError::Shutdown) => { // If we fail to fulfil a request during shutdown, which may be _because_ of // shutdown, then do not send the error to the client. Instead just drop the // connection. - span.in_scope(|| info!("dropped response during shutdown: {e:#}")); + span.in_scope(|| info!("dropping connection due to shutdown")); return Err(QueryError::Shutdown); } + Err(PageStreamError::Reconnect(reason)) => { + span.in_scope(|| info!("handler requested reconnect: {reason}")); + return Err(QueryError::Reconnect); + } + Err(e) if timeline.cancel.is_cancelled() || timeline.is_stopping() => { + // This branch accomodates code within request handlers that returns an anyhow::Error instead of a clean + // shutdown error, this may be buried inside a PageReconstructError::Other for example. + // + // Requests may fail as soon as we are Stopping, even if the Timeline's cancellation token wasn't fired yet, + // because wait_lsn etc will drop out + // is_stopping(): [`Timeline::flush_and_shutdown`] has entered + // is_canceled(): [`Timeline::shutdown`]` has entered + span.in_scope(|| info!("dropped error response during shutdown: {e:#}")); + return Err(QueryError::Shutdown); + } + r => { + let response_msg = r.unwrap_or_else(|e| { + // print the all details to the log with {:#}, but for the client the + // error message is enough. Do not log if shutting down, as the anyhow::Error + // here includes cancellation which is not an error. 
+ span.in_scope(|| error!("error reading relation or page version: {:#}", e)); + PagestreamBeMessage::Error(PagestreamErrorResponse { + message: e.to_string(), + }) + }); + + pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?; + self.flush_cancellable(pgb, &timeline.cancel).await?; + } } - - let response = response.unwrap_or_else(|e| { - // print the all details to the log with {:#}, but for the client the - // error message is enough. Do not log if shutting down, as the anyhow::Error - // here includes cancellation which is not an error. - span.in_scope(|| error!("error reading relation or page version: {:#}", e)); - PagestreamBeMessage::Error(PagestreamErrorResponse { - message: e.to_string(), - }) - }); - - pgb.write_message_noflush(&BeMessage::CopyData(&response.serialize()))?; - self.flush_cancellable(pgb, &timeline.cancel).await?; } Ok(()) } @@ -692,7 +766,7 @@ impl PageServerHandler { latest: bool, latest_gc_cutoff_lsn: &RcuReadGuard, ctx: &RequestContext, - ) -> anyhow::Result { + ) -> Result { if latest { // Latest page version was requested. If LSN is given, it is a hint // to the page server that there have been no modifications to the @@ -723,15 +797,19 @@ impl PageServerHandler { } } else { if lsn == Lsn(0) { - anyhow::bail!("invalid LSN(0) in request"); + return Err(PageStreamError::BadRequest( + "invalid LSN(0) in request".into(), + )); } timeline.wait_lsn(lsn, ctx).await?; } - anyhow::ensure!( - lsn >= **latest_gc_cutoff_lsn, - "tried to request a page version that was garbage collected. requested at {} gc cutoff {}", - lsn, **latest_gc_cutoff_lsn - ); + + if lsn < **latest_gc_cutoff_lsn { + return Err(PageStreamError::BadRequest(format!( + "tried to request a page version that was garbage collected. requested at {} gc cutoff {}", + lsn, **latest_gc_cutoff_lsn + ).into())); + } Ok(lsn) } @@ -740,7 +818,7 @@ impl PageServerHandler { timeline: &Timeline, req: &PagestreamExistsRequest, ctx: &RequestContext, - ) -> anyhow::Result { + ) -> Result { let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx) @@ -760,7 +838,7 @@ impl PageServerHandler { timeline: &Timeline, req: &PagestreamNblocksRequest, ctx: &RequestContext, - ) -> anyhow::Result { + ) -> Result { let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx) @@ -780,7 +858,7 @@ impl PageServerHandler { timeline: &Timeline, req: &PagestreamDbSizeRequest, ctx: &RequestContext, - ) -> anyhow::Result { + ) -> Result { let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx) @@ -807,7 +885,7 @@ impl PageServerHandler { timeline: &Timeline, req: &PagestreamGetPageRequest, ctx: &RequestContext, - ) -> anyhow::Result { + ) -> Result { let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx) @@ -826,7 +904,7 @@ impl PageServerHandler { timeline: &Timeline, req: &PagestreamGetPageRequest, ctx: &RequestContext, - ) -> anyhow::Result { + ) -> Result { let key = rel_block_to_key(req.rel, req.blkno); if timeline.get_shard_identity().is_key_local(&key) { self.do_handle_get_page_at_lsn_request(timeline, req, ctx) @@ -849,24 +927,26 @@ impl PageServerHandler { 
Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => { // We already know this tenant exists in general, because we resolved it at // start of connection. Getting a NotFound here indicates that the shard containing - // the requested page is not present on this node. - - // TODO: this should be some kind of structured error that the client will understand, - // so that it can block until its config is updated: this error is expected in the case - // that the Tenant's shards' placements are being updated and the client hasn't been - // informed yet. - // - // https://github.com/neondatabase/neon/issues/6038 - tracing::warn!("Page request routed to wrong shard: my identity {:?}, should go to shard {}, key {}", + // the requested page is not present on this node: the client's knowledge of shard->pageserver + // mapping is out of date. + tracing::info!("Page request routed to wrong shard: my identity {:?}, should go to shard {}, key {}", timeline.get_shard_identity(), timeline.get_shard_identity().get_shard_number(&key).0, key); - return Err(anyhow::anyhow!("Request routed to wrong shard")); + // Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via + // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration + // and talk to a different pageserver. + return Err(PageStreamError::Reconnect( + "getpage@lsn request routed to wrong shard".into(), + )); } Err(e) => return Err(e.into()), }; // Take a GateGuard for the duration of this request. If we were using our main Timeline object, // the GateGuard was already held over the whole connection. - let _timeline_guard = timeline.gate.enter().map_err(|_| QueryError::Shutdown)?; + let _timeline_guard = timeline + .gate + .enter() + .map_err(|_| PageStreamError::Shutdown)?; self.do_handle_get_page_at_lsn_request(&timeline, req, ctx) .await @@ -1011,9 +1091,7 @@ impl PageServerHandler { ) .await .map_err(GetActiveTimelineError::Tenant)?; - let timeline = tenant - .get_timeline(timeline_id, true) - .map_err(|e| GetActiveTimelineError::Timeline(anyhow::anyhow!(e)))?; + let timeline = tenant.get_timeline(timeline_id, true)?; Ok(timeline) } } @@ -1435,14 +1513,15 @@ enum GetActiveTimelineError { #[error(transparent)] Tenant(GetActiveTenantError), #[error(transparent)] - Timeline(anyhow::Error), + Timeline(#[from] GetTimelineError), } impl From for QueryError { fn from(e: GetActiveTimelineError) -> Self { match e { + GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown, GetActiveTimelineError::Tenant(e) => e.into(), - GetActiveTimelineError::Timeline(e) => QueryError::Other(e), + GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()), } } } diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 9fe75e5baf..f11a72f2ab 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -160,7 +160,7 @@ impl Timeline { //------------------------------------------------------------------------------ /// Look up given page version. 
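(A rough sketch of the wrong-shard decision above; ShardIdentity, its toy modulo placement, and is_key_local are simplified stand-ins for the real key-striping logic.)

#[derive(Clone, Copy, PartialEq)]
struct ShardNumber(u8);

struct ShardIdentity {
    my_shard: ShardNumber,
    shard_count: u8,
}

impl ShardIdentity {
    /// Toy placement function for illustration only: hash the key to a shard.
    fn owning_shard(&self, key: u64) -> ShardNumber {
        ShardNumber((key % self.shard_count as u64) as u8)
    }

    fn is_key_local(&self, key: u64) -> bool {
        self.owning_shard(key) == self.my_shard
    }
}

enum PageStreamError {
    Reconnect(&'static str),
}

/// If the key belongs to a different shard, ask the client to reconnect so it
/// refreshes its shard->pageserver mapping; closing the connection also
/// rate-limits the log line via the client's reconnect backoff.
fn route(identity: &ShardIdentity, key: u64) -> Result<(), PageStreamError> {
    if identity.is_key_local(key) {
        Ok(())
    } else {
        Err(PageStreamError::Reconnect("getpage@lsn request routed to wrong shard"))
    }
}

fn main() {
    let id = ShardIdentity { my_shard: ShardNumber(0), shard_count: 4 };
    assert!(route(&id, 8).is_ok()); // 8 % 4 == 0, local shard
    match route(&id, 9) {
        Err(PageStreamError::Reconnect(reason)) => println!("drop connection: {reason}"),
        Ok(()) => println!("serve request"),
    }
}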
- pub async fn get_rel_page_at_lsn( + pub(crate) async fn get_rel_page_at_lsn( &self, tag: RelTag, blknum: BlockNumber, @@ -191,7 +191,7 @@ impl Timeline { } // Get size of a database in blocks - pub async fn get_db_size( + pub(crate) async fn get_db_size( &self, spcnode: Oid, dbnode: Oid, @@ -211,7 +211,7 @@ impl Timeline { } /// Get size of a relation file - pub async fn get_rel_size( + pub(crate) async fn get_rel_size( &self, tag: RelTag, version: Version<'_>, @@ -256,7 +256,7 @@ impl Timeline { } /// Does relation exist? - pub async fn get_rel_exists( + pub(crate) async fn get_rel_exists( &self, tag: RelTag, version: Version<'_>, @@ -291,7 +291,7 @@ impl Timeline { /// # Cancel-Safety /// /// This method is cancellation-safe. - pub async fn list_rels( + pub(crate) async fn list_rels( &self, spcnode: Oid, dbnode: Oid, @@ -319,7 +319,7 @@ impl Timeline { } /// Look up given SLRU page version. - pub async fn get_slru_page_at_lsn( + pub(crate) async fn get_slru_page_at_lsn( &self, kind: SlruKind, segno: u32, @@ -332,7 +332,7 @@ impl Timeline { } /// Get size of an SLRU segment - pub async fn get_slru_segment_size( + pub(crate) async fn get_slru_segment_size( &self, kind: SlruKind, segno: u32, @@ -345,7 +345,7 @@ impl Timeline { } /// Get size of an SLRU segment - pub async fn get_slru_segment_exists( + pub(crate) async fn get_slru_segment_exists( &self, kind: SlruKind, segno: u32, @@ -372,7 +372,7 @@ impl Timeline { /// so it's not well defined which LSN you get if there were multiple commits /// "in flight" at that point in time. /// - pub async fn find_lsn_for_timestamp( + pub(crate) async fn find_lsn_for_timestamp( &self, search_timestamp: TimestampTz, cancel: &CancellationToken, @@ -452,7 +452,7 @@ impl Timeline { /// Additionally, sets 'found_smaller'/'found_Larger, if encounters any commits /// with a smaller/larger timestamp. /// - pub async fn is_latest_commit_timestamp_ge_than( + pub(crate) async fn is_latest_commit_timestamp_ge_than( &self, search_timestamp: TimestampTz, probe_lsn: Lsn, @@ -475,7 +475,7 @@ impl Timeline { /// Obtain the possible timestamp range for the given lsn. /// /// If the lsn has no timestamps, returns None. returns `(min, max, median)` if it has timestamps. 
- pub async fn get_timestamp_for_lsn( + pub(crate) async fn get_timestamp_for_lsn( &self, probe_lsn: Lsn, ctx: &RequestContext, @@ -532,7 +532,7 @@ impl Timeline { } /// Get a list of SLRU segments - pub async fn list_slru_segments( + pub(crate) async fn list_slru_segments( &self, kind: SlruKind, version: Version<'_>, @@ -548,7 +548,7 @@ impl Timeline { } } - pub async fn get_relmap_file( + pub(crate) async fn get_relmap_file( &self, spcnode: Oid, dbnode: Oid, @@ -561,7 +561,7 @@ impl Timeline { Ok(buf) } - pub async fn list_dbdirs( + pub(crate) async fn list_dbdirs( &self, lsn: Lsn, ctx: &RequestContext, @@ -575,7 +575,7 @@ impl Timeline { } } - pub async fn get_twophase_file( + pub(crate) async fn get_twophase_file( &self, xid: TransactionId, lsn: Lsn, @@ -586,7 +586,7 @@ impl Timeline { Ok(buf) } - pub async fn list_twophase_files( + pub(crate) async fn list_twophase_files( &self, lsn: Lsn, ctx: &RequestContext, @@ -600,7 +600,7 @@ impl Timeline { } } - pub async fn get_control_file( + pub(crate) async fn get_control_file( &self, lsn: Lsn, ctx: &RequestContext, @@ -608,7 +608,7 @@ impl Timeline { self.get(CONTROLFILE_KEY, lsn, ctx).await } - pub async fn get_checkpoint( + pub(crate) async fn get_checkpoint( &self, lsn: Lsn, ctx: &RequestContext, @@ -616,7 +616,7 @@ impl Timeline { self.get(CHECKPOINT_KEY, lsn, ctx).await } - pub async fn list_aux_files( + pub(crate) async fn list_aux_files( &self, lsn: Lsn, ctx: &RequestContext, diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 1660de8923..7c609452e5 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -56,6 +56,7 @@ use self::timeline::uninit::TimelineUninitMark; use self::timeline::uninit::UninitializedTimeline; use self::timeline::EvictionTaskTenantState; use self::timeline::TimelineResources; +use self::timeline::WaitLsnError; use crate::config::PageServerConf; use crate::context::{DownloadBehavior, RequestContext}; use crate::deletion_queue::DeletionQueueClient; @@ -1758,7 +1759,15 @@ impl Tenant { // decoding the new WAL might need to look up previous pages, relation // sizes etc. and that would get confused if the previous page versions // are not in the repository yet. - ancestor_timeline.wait_lsn(*lsn, ctx).await?; + ancestor_timeline + .wait_lsn(*lsn, ctx) + .await + .map_err(|e| match e { + e @ (WaitLsnError::Timeout(_) | WaitLsnError::BadState) => { + CreateTimelineError::AncestorLsn(anyhow::anyhow!(e)) + } + WaitLsnError::Shutdown => CreateTimelineError::ShuttingDown, + })?; } self.branch_timeline( diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index e8340a74b2..24a92859b7 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -373,15 +373,20 @@ pub struct GcInfo { } /// An error happened in a get() operation. 
-#[derive(thiserror::Error)] -pub enum PageReconstructError { +#[derive(thiserror::Error, Debug)] +pub(crate) enum PageReconstructError { #[error(transparent)] Other(#[from] anyhow::Error), + #[error("Ancestor LSN wait error: {0}")] + AncestorLsnTimeout(#[from] WaitLsnError), + /// The operation was cancelled + #[error("Cancelled")] Cancelled, /// The ancestor of this is being stopped + #[error("ancestor timeline {0} is being stopped")] AncestorStopping(TimelineId), /// An error happened replaying WAL records @@ -402,32 +407,6 @@ enum FlushLayerError { Other(#[from] anyhow::Error), } -impl std::fmt::Debug for PageReconstructError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { - match self { - Self::Other(err) => err.fmt(f), - Self::Cancelled => write!(f, "cancelled"), - Self::AncestorStopping(timeline_id) => { - write!(f, "ancestor timeline {timeline_id} is being stopped") - } - Self::WalRedo(err) => err.fmt(f), - } - } -} - -impl std::fmt::Display for PageReconstructError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { - match self { - Self::Other(err) => err.fmt(f), - Self::Cancelled => write!(f, "cancelled"), - Self::AncestorStopping(timeline_id) => { - write!(f, "ancestor timeline {timeline_id} is being stopped") - } - Self::WalRedo(err) => err.fmt(f), - } - } -} - #[derive(Clone, Copy)] pub enum LogicalSizeCalculationCause { Initial, @@ -452,6 +431,21 @@ impl std::fmt::Debug for Timeline { } } +#[derive(thiserror::Error, Debug)] +pub(crate) enum WaitLsnError { + // Called on a timeline which is shutting down + #[error("Shutdown")] + Shutdown, + + // Called on an timeline not in active state or shutting down + #[error("Bad state (not active)")] + BadState, + + // Timeout expired while waiting for LSN to catch up with goal. + #[error("{0}")] + Timeout(String), +} + /// Public interface functions impl Timeline { /// Get the LSN where this branch was created @@ -486,7 +480,7 @@ impl Timeline { /// # Cancel-Safety /// /// This method is cancellation-safe. - pub async fn get( + pub(crate) async fn get( &self, key: Key, lsn: Lsn, @@ -634,24 +628,28 @@ impl Timeline { /// You should call this before any of the other get_* or list_* functions. Calling /// those functions with an LSN that has been processed yet is an error. /// - pub async fn wait_lsn( + pub(crate) async fn wait_lsn( &self, lsn: Lsn, _ctx: &RequestContext, /* Prepare for use by cancellation */ - ) -> anyhow::Result<()> { - anyhow::ensure!(self.is_active(), "Cannot wait for Lsn on inactive timeline"); + ) -> Result<(), WaitLsnError> { + if self.cancel.is_cancelled() { + return Err(WaitLsnError::Shutdown); + } else if !self.is_active() { + return Err(WaitLsnError::BadState); + } // This should never be called from the WAL receiver, because that could lead // to a deadlock. 
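(The manual Display and Debug impls removed above are what the per-variant #[error(...)] attributes replace; a small hypothetical sketch, assuming thiserror and anyhow dependencies and using a plain u64 as a stand-in for TimelineId.)

// With #[derive(thiserror::Error, Debug)] plus #[error(...)] attributes, the
// Display impl (and Debug via the derive) come for free, so the two hand-written
// fmt() impls are no longer needed.
#[derive(thiserror::Error, Debug)]
enum PageReconstructError {
    #[error(transparent)]
    Other(#[from] anyhow::Error),
    #[error("Cancelled")]
    Cancelled,
    #[error("ancestor timeline {0} is being stopped")]
    AncestorStopping(u64), // TimelineId in the real code
}

fn main() {
    let e = PageReconstructError::AncestorStopping(42);
    // Display comes from the #[error] attribute:
    assert_eq!(e.to_string(), "ancestor timeline 42 is being stopped");
    // Debug comes from the derive:
    println!("{e:?}");
}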
- anyhow::ensure!( + debug_assert!( task_mgr::current_task_kind() != Some(TaskKind::WalReceiverManager), "wait_lsn cannot be called in WAL receiver" ); - anyhow::ensure!( + debug_assert!( task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionHandler), "wait_lsn cannot be called in WAL receiver" ); - anyhow::ensure!( + debug_assert!( task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionPoller), "wait_lsn cannot be called in WAL receiver" ); @@ -665,18 +663,22 @@ impl Timeline { { Ok(()) => Ok(()), Err(e) => { - // don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo - drop(_timer); - let walreceiver_status = self.walreceiver_status(); - Err(anyhow::Error::new(e).context({ - format!( + use utils::seqwait::SeqWaitError::*; + match e { + Shutdown => Err(WaitLsnError::Shutdown), + Timeout => { + // don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo + drop(_timer); + let walreceiver_status = self.walreceiver_status(); + Err(WaitLsnError::Timeout(format!( "Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}, WalReceiver status: {}", lsn, self.get_last_record_lsn(), self.get_disk_consistent_lsn(), walreceiver_status, - ) - })) + ))) + } + } } } } @@ -2295,11 +2297,12 @@ impl Timeline { ancestor .wait_lsn(timeline.ancestor_lsn, ctx) .await - .with_context(|| { - format!( - "wait for lsn {} on ancestor timeline_id={}", - timeline.ancestor_lsn, ancestor.timeline_id - ) + .map_err(|e| match e { + e @ WaitLsnError::Timeout(_) => PageReconstructError::AncestorLsnTimeout(e), + WaitLsnError::Shutdown => PageReconstructError::Cancelled, + e @ WaitLsnError::BadState => { + PageReconstructError::Other(anyhow::anyhow!(e)) + } })?; timeline_owned = ancestor; @@ -4228,7 +4231,7 @@ impl Timeline { .context("Failed to reconstruct a page image:") { Ok(img) => img, - Err(e) => return Err(PageReconstructError::from(e)), + Err(e) => return Err(PageReconstructError::WalRedo(e)), }; if img.len() == page_cache::PAGE_SZ { diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 9ac9e36b61..898639ac98 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -1053,26 +1053,34 @@ impl WalIngest { } }; - let vm_rel = RelTag { - forknum: VISIBILITYMAP_FORKNUM, - spcnode: decoded.blocks[0].rnode_spcnode, - dbnode: decoded.blocks[0].rnode_dbnode, - relnode: decoded.blocks[0].rnode_relnode, - }; - Ok(self - .clear_visibility_map_bits_if_required( - modification, - vm_rel, - new_heap_blkno, - old_heap_blkno, - flags, - ctx, - ) - .await - .map_err(HeapamRecordSpecialTreatmentError::EmitWalRecord)?) + // Clear the VM bits if required. + if new_heap_blkno.is_some() || old_heap_blkno.is_some() { + let vm_rel = RelTag { + forknum: VISIBILITYMAP_FORKNUM, + spcnode: decoded.blocks[0].rnode_spcnode, + dbnode: decoded.blocks[0].rnode_dbnode, + relnode: decoded.blocks[0].rnode_relnode, + }; + + Ok(self + .clear_visibility_map_bits( + modification, + vm_rel, + new_heap_blkno, + old_heap_blkno, + flags, + ctx, + ) + .await + .map_err(HeapamRecordSpecialTreatmentError::EmitWalRecord)?) + } else { + Ok(ClearVisibilityMapFlagsOutcome::Noop) + } } - async fn clear_visibility_map_bits_if_required( + /// Write NeonWalRecord::ClearVisibilityMapFlags records for clearing the VM bits + /// corresponding to new_heap_blkno and old_heap_blkno. 
+ async fn clear_visibility_map_bits( &mut self, modification: &mut DatadirModification<'_>, vm_rel: RelTag, @@ -1081,9 +1089,8 @@ impl WalIngest { flags: u8, ctx: &RequestContext, ) -> anyhow::Result { - let new = new_heap_blkno.map(|x| (x, pg_constants::HEAPBLK_TO_MAPBLOCK(x))); - let old = old_heap_blkno.map(|x| (x, pg_constants::HEAPBLK_TO_MAPBLOCK(x))); - + // Determine the VM pages containing the old and the new heap block. + // // Sometimes, Postgres seems to create heap WAL records with the // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is // not set. In fact, it's possible that the VM page does not exist at all. @@ -1091,54 +1098,62 @@ impl WalIngest { // replaying it would fail to find the previous image of the page, because // it doesn't exist. So check if the VM page(s) exist, and skip the WAL // record if it doesn't. - let (new, old) = { - let vm_size = if new.or(old).is_some() { - Some(get_relsize(modification, vm_rel, ctx).await?) - } else { + let vm_size = get_relsize(modification, vm_rel, ctx).await?; + let heap_to_vm_blk = |heap_blkno| { + let vm_blk = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno); + if vm_blk >= vm_size { None - }; - let filter = |(heap_blk, vm_blk)| { - let vm_size = vm_size.expect("we set it to Some() if new or old is Some()"); - if vm_blk >= vm_size { - None - } else { - Some((heap_blk, vm_blk)) - } - }; - (new.and_then(filter), old.and_then(filter)) + } else { + Some(vm_blk) + } }; + let new_vm_blk = new_heap_blkno.map(heap_to_vm_blk).flatten(); + let old_vm_blk = old_heap_blkno.map(heap_to_vm_blk).flatten(); - let outcome = match (new, old) { - (Some((new_heap_blkno, new_vm_blk)), Some((old_heap_blkno, old_vm_blk))) - if new_vm_blk == old_vm_blk => - { + let mut outcome = ClearVisibilityMapFlagsOutcome::Noop; + if new_vm_blk.is_some() || old_vm_blk.is_some() { + if new_vm_blk == old_vm_blk { // An UPDATE record that needs to clear the bits for both old and the // new page, both of which reside on the same VM page. self.put_rel_wal_record( modification, vm_rel, - new_vm_blk, // could also be old_vm_blk, they're the same + new_vm_blk.unwrap(), NeonWalRecord::ClearVisibilityMapFlags { - heap_blkno_1: Some(new_heap_blkno), - heap_blkno_2: Some(old_heap_blkno), + new_heap_blkno, + old_heap_blkno, flags, }, ctx, ) .await?; - ClearVisibilityMapFlagsOutcome::Stored - } - (new, old) => { - // Emit one record per VM block that needs updating. - let mut outcome = ClearVisibilityMapFlagsOutcome::Noop; - for (heap_blkno, vm_blk) in [new, old].into_iter().flatten() { + outcome = ClearVisibilityMapFlagsOutcome::Stored; + } else { + // Clear VM bits for one heap page, or for two pages that reside on + // different VM pages. 
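(For the VM-block arithmetic used by the heap_to_vm_blk closure below: a hypothetical sketch; the HEAPBLOCKS_PER_PAGE value assumes 8 KB pages, a 24-byte page header, and 2 visibility bits per heap block, so treat the exact constant as an assumption.)

// Each visibility-map page tracks many heap blocks (2 bits per heap block).
// Assumed value for an 8 KB page with a 24-byte header: (8192 - 24) * 4 = 32672.
const HEAPBLOCKS_PER_PAGE: u32 = 32672;

/// Rough equivalent of pg_constants::HEAPBLK_TO_MAPBLOCK: which VM block holds
/// the visibility bits of a given heap block.
fn heapblk_to_mapblock(heap_blkno: u32) -> u32 {
    heap_blkno / HEAPBLOCKS_PER_PAGE
}

/// Mirrors the heap_to_vm_blk closure: map a heap block to its VM block, but
/// return None if that VM block lies beyond the current VM relation size, in
/// which case the WAL record is skipped for that block.
fn heap_to_vm_blk(heap_blkno: u32, vm_size: u32) -> Option<u32> {
    let vm_blk = heapblk_to_mapblock(heap_blkno);
    if vm_blk >= vm_size {
        None
    } else {
        Some(vm_blk)
    }
}

fn main() {
    // VM relation has 1 block: only heap blocks below 32672 have their bits present.
    assert_eq!(heap_to_vm_blk(10, 1), Some(0));
    assert_eq!(heap_to_vm_blk(40_000, 1), None);
}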
+ if let Some(new_vm_blk) = new_vm_blk { self.put_rel_wal_record( modification, vm_rel, - vm_blk, + new_vm_blk, NeonWalRecord::ClearVisibilityMapFlags { - heap_blkno_1: Some(heap_blkno), - heap_blkno_2: None, + new_heap_blkno, + old_heap_blkno: None, + flags, + }, + ctx, + ) + .await?; + outcome = ClearVisibilityMapFlagsOutcome::Stored; + } + if let Some(old_vm_blk) = old_vm_blk { + self.put_rel_wal_record( + modification, + vm_rel, + old_vm_blk, + NeonWalRecord::ClearVisibilityMapFlags { + new_heap_blkno: None, + old_heap_blkno, flags, }, ctx, @@ -1146,7 +1161,6 @@ impl WalIngest { .await?; outcome = ClearVisibilityMapFlagsOutcome::Stored; } - outcome } }; anyhow::Ok(outcome) @@ -1244,7 +1258,7 @@ impl WalIngest { relnode: decoded.blocks[0].rnode_relnode, }; Ok(self - .clear_visibility_map_bits_if_required( + .clear_visibility_map_bits( modification, vm_rel, new_heap_blkno, diff --git a/pageserver/src/walrecord.rs b/pageserver/src/walrecord.rs index 0bba6e2a51..ff6bc9194b 100644 --- a/pageserver/src/walrecord.rs +++ b/pageserver/src/walrecord.rs @@ -21,13 +21,10 @@ pub enum NeonWalRecord { /// Native PostgreSQL WAL record Postgres { will_init: bool, rec: Bytes }, - /// Clear the bits specified in `flags` in the heap visibility map for the given heap blocks. - /// - /// For example, for `{ heap_blkno_1: None, heap_blkno_2: Some(23), flags: 0b0010_0000}` - /// redo will apply `&=0b0010_0000` on heap block 23's visibility map bitmask. + /// Clear bits in heap visibility map. ('flags' is bitmap of bits to clear) ClearVisibilityMapFlags { - heap_blkno_1: Option, - heap_blkno_2: Option, + new_heap_blkno: Option, + old_heap_blkno: Option, flags: u8, }, /// Mark transaction IDs as committed on a CLOG page diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index a1df1b86e9..c49c350f9d 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -403,8 +403,8 @@ impl PostgresRedoManager { anyhow::bail!("tried to pass postgres wal record to neon WAL redo"); } NeonWalRecord::ClearVisibilityMapFlags { - heap_blkno_1, - heap_blkno_2, + new_heap_blkno, + old_heap_blkno, flags, } => { // sanity check that this is modifying the correct relation @@ -414,8 +414,11 @@ impl PostgresRedoManager { "ClearVisibilityMapFlags record on unexpected rel {}", rel ); - for heap_blkno in [heap_blkno_1, heap_blkno_2].into_iter().flatten() { - let heap_blkno = *heap_blkno; + + // Helper function to clear the VM bit corresponding to 'heap_blkno'. + // (The logic is similar to the guts of the visibilitymap_clear() function + // in PostgreSQL, after it has locked the right VM page.) + let mut visibilitymap_clear = |heap_blkno| { // Calculate the VM block and offset that corresponds to the heap block. let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno); let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno); @@ -428,6 +431,12 @@ impl PostgresRedoManager { let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..]; map[map_byte as usize] &= !(flags << map_offset); + }; + if let Some(heap_blkno) = *new_heap_blkno { + visibilitymap_clear(heap_blkno); + } + if let Some(heap_blkno) = *old_heap_blkno { + visibilitymap_clear(heap_blkno); } } // Non-relational WAL records are handled here, with custom code that has the
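(Finally, a self-contained sketch of the bit arithmetic inside the new visibilitymap_clear closure in walredo.rs; the layout constants mirror Postgres's visibility map, 2 bits per heap block and a 24-byte page header, and are assumptions here rather than values taken from this patch.)

const BITS_PER_HEAPBLOCK: u32 = 2;
const HEAPBLOCKS_PER_BYTE: u32 = 4; // 8 bits / 2 bits per heap block
const HEAPBLOCKS_PER_PAGE: u32 = 32672; // assumed: (8192 - 24) * HEAPBLOCKS_PER_BYTE
const PAGE_HEADER_SIZE: usize = 24; // assumed MAXALIGN(SizeOfPageHeaderData)

// Which byte within the VM page holds the bits for this heap block.
fn heapblk_to_mapbyte(heap_blkno: u32) -> usize {
    ((heap_blkno % HEAPBLOCKS_PER_PAGE) / HEAPBLOCKS_PER_BYTE) as usize
}

// Bit offset of this heap block's two visibility bits within that byte.
fn heapblk_to_offset(heap_blkno: u32) -> u32 {
    (heap_blkno % HEAPBLOCKS_PER_BYTE) * BITS_PER_HEAPBLOCK
}

/// Clear `flags` for one heap block inside a VM page image, like the
/// visibilitymap_clear closure in the diff (itself modelled on PostgreSQL's
/// visibilitymap_clear() once the right VM page is locked).
fn clear_vm_bits(page: &mut [u8], heap_blkno: u32, flags: u8) {
    let map = &mut page[PAGE_HEADER_SIZE..];
    let map_byte = heapblk_to_mapbyte(heap_blkno);
    let map_offset = heapblk_to_offset(heap_blkno);
    map[map_byte] &= !(flags << map_offset);
}

fn main() {
    let mut page = vec![0xFFu8; 8192];
    // Heap block 5 lives in VM byte 1 at bit offset 2; clearing both bits (0b11)
    // leaves that byte as 0b1111_0011.
    clear_vm_bits(&mut page, 5, 0b11);
    assert_eq!(page[PAGE_HEADER_SIZE + 1], 0b1111_0011);
}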