mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
[PRE-MERGE] hlinnaka/walingest-simplify-vm-flag-clearing-2
Squashed commit of the following: commitb1d701dc06Author: Heikki Linnakangas <heikki@neon.tech> Date: Thu Jan 4 18:23:59 2024 +0200 Refactor generation of ClearVisibilityMapFlags records. With fewer mutable variables, for sake of clarity. commitaa95a07d27Author: Heikki Linnakangas <heikki@neon.tech> Date: Thu Jan 4 18:20:03 2024 +0200 Refactor code to apply ClearVisibilityMapFlags records a little. To reduce the repetition. commit18e9208158Author: John Spray <john@neon.tech> Date: Thu Jan 4 10:40:03 2024 +0000 pageserver: improved error handling for shard routing error, timeline not found (#6262) ## Problem - When a client requests a key that isn't found in any shard on the node (edge case that only happens if a compute's config is out of date), we should prompt them to reconnect (as this includes a backoff), since they will not be able to complete the request until they eventually get a correct pageserver connection string. - QueryError::Other is used excessively: this contains a type-ambiguous anyhow::Error and is logged very verbosely (including backtrace). ## Summary of changes - Introduce PageStreamError to replace use of anyhow::Error in request handlers for getpage, etc. - Introduce Reconnect and NotFound variants to QueryError - Map the "shard routing error" case to PageStreamError::Reconnect -> QueryError::Reconnect - Update type conversions for LSN timeouts and tenant/timeline not found errors to use PageStreamError::NotFound->QueryError::NotFound
This commit is contained in:
@@ -35,6 +35,12 @@ pub enum QueryError {
|
||||
/// We were instructed to shutdown while processing the query
|
||||
#[error("Shutting down")]
|
||||
Shutdown,
|
||||
/// Query handler indicated that client should reconnect
|
||||
#[error("Server requested reconnect")]
|
||||
Reconnect,
|
||||
/// Query named an entity that was not found
|
||||
#[error("Not found: {0}")]
|
||||
NotFound(std::borrow::Cow<'static, str>),
|
||||
/// Authentication failure
|
||||
#[error("Unauthorized: {0}")]
|
||||
Unauthorized(std::borrow::Cow<'static, str>),
|
||||
@@ -54,9 +60,9 @@ impl From<io::Error> for QueryError {
|
||||
impl QueryError {
|
||||
pub fn pg_error_code(&self) -> &'static [u8; 5] {
|
||||
match self {
|
||||
Self::Disconnected(_) | Self::SimulatedConnectionError => b"08006", // connection failure
|
||||
Self::Disconnected(_) | Self::SimulatedConnectionError | Self::Reconnect => b"08006", // connection failure
|
||||
Self::Shutdown => SQLSTATE_ADMIN_SHUTDOWN,
|
||||
Self::Unauthorized(_) => SQLSTATE_INTERNAL_ERROR,
|
||||
Self::Unauthorized(_) | Self::NotFound(_) => SQLSTATE_INTERNAL_ERROR,
|
||||
Self::Other(_) => SQLSTATE_INTERNAL_ERROR, // internal error
|
||||
}
|
||||
}
|
||||
@@ -425,6 +431,11 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
|
||||
info!("Stopped due to shutdown");
|
||||
Ok(())
|
||||
}
|
||||
Err(QueryError::Reconnect) => {
|
||||
// Dropping out of this loop implicitly disconnects
|
||||
info!("Stopped due to handler reconnect request");
|
||||
Ok(())
|
||||
}
|
||||
Err(QueryError::Disconnected(e)) => {
|
||||
info!("Disconnected ({e:#})");
|
||||
// Disconnection is not an error: we just use it that way internally to drop
|
||||
@@ -974,7 +985,9 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> AsyncWrite for CopyDataWriter<'a, I
|
||||
pub fn short_error(e: &QueryError) -> String {
|
||||
match e {
|
||||
QueryError::Disconnected(connection_error) => connection_error.to_string(),
|
||||
QueryError::Reconnect => "reconnect".to_string(),
|
||||
QueryError::Shutdown => "shutdown".to_string(),
|
||||
QueryError::NotFound(_) => "not found".to_string(),
|
||||
QueryError::Unauthorized(_e) => "JWT authentication error".to_string(),
|
||||
QueryError::SimulatedConnectionError => "simulated connection error".to_string(),
|
||||
QueryError::Other(e) => format!("{e:#}"),
|
||||
@@ -996,9 +1009,15 @@ fn log_query_error(query: &str, e: &QueryError) {
|
||||
QueryError::SimulatedConnectionError => {
|
||||
error!("query handler for query '{query}' failed due to a simulated connection error")
|
||||
}
|
||||
QueryError::Reconnect => {
|
||||
info!("query handler for '{query}' requested client to reconnect")
|
||||
}
|
||||
QueryError::Shutdown => {
|
||||
info!("query handler for '{query}' cancelled during tenant shutdown")
|
||||
}
|
||||
QueryError::NotFound(reason) => {
|
||||
info!("query handler for '{query}' entity not found: {reason}")
|
||||
}
|
||||
QueryError::Unauthorized(e) => {
|
||||
warn!("query handler for '{query}' failed with authentication error: {e}");
|
||||
}
|
||||
|
||||
@@ -31,6 +31,9 @@ pub enum ApiError {
|
||||
#[error("Shutting down")]
|
||||
ShuttingDown,
|
||||
|
||||
#[error("Timeout")]
|
||||
Timeout(Cow<'static, str>),
|
||||
|
||||
#[error(transparent)]
|
||||
InternalServerError(anyhow::Error),
|
||||
}
|
||||
@@ -67,6 +70,10 @@ impl ApiError {
|
||||
err.to_string(),
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
),
|
||||
ApiError::Timeout(err) => HttpErrorBody::response_from_msg_and_status(
|
||||
err.to_string(),
|
||||
StatusCode::REQUEST_TIMEOUT,
|
||||
),
|
||||
ApiError::InternalServerError(err) => HttpErrorBody::response_from_msg_and_status(
|
||||
err.to_string(),
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
|
||||
@@ -152,6 +152,7 @@ impl From<PageReconstructError> for ApiError {
|
||||
PageReconstructError::AncestorStopping(_) => {
|
||||
ApiError::ResourceUnavailable(format!("{pre}").into())
|
||||
}
|
||||
PageReconstructError::AncestorLsnTimeout(e) => ApiError::Timeout(format!("{e}").into()),
|
||||
PageReconstructError::WalRedo(pre) => ApiError::InternalServerError(pre),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, Qu
|
||||
use pq_proto::framed::ConnectionError;
|
||||
use pq_proto::FeStartupPacket;
|
||||
use pq_proto::{BeMessage, FeMessage, RowDescriptor};
|
||||
use std::borrow::Cow;
|
||||
use std::io;
|
||||
use std::net::TcpListener;
|
||||
use std::pin::pin;
|
||||
@@ -61,6 +62,9 @@ use crate::tenant::mgr;
|
||||
use crate::tenant::mgr::get_active_tenant_with_timeout;
|
||||
use crate::tenant::mgr::GetActiveTenantError;
|
||||
use crate::tenant::mgr::ShardSelector;
|
||||
use crate::tenant::timeline::WaitLsnError;
|
||||
use crate::tenant::GetTimelineError;
|
||||
use crate::tenant::PageReconstructError;
|
||||
use crate::tenant::Timeline;
|
||||
use crate::trace::Tracer;
|
||||
|
||||
@@ -283,6 +287,64 @@ struct PageServerHandler {
|
||||
connection_ctx: RequestContext,
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
enum PageStreamError {
|
||||
/// We encountered an error that should prompt the client to reconnect:
|
||||
/// in practice this means we drop the connection without sending a response.
|
||||
#[error("Reconnect required: {0}")]
|
||||
Reconnect(Cow<'static, str>),
|
||||
|
||||
/// We were instructed to shutdown while processing the query
|
||||
#[error("Shutting down")]
|
||||
Shutdown,
|
||||
|
||||
/// Something went wrong reading a page: this likely indicates a pageserver bug
|
||||
#[error("Read error: {0}")]
|
||||
Read(PageReconstructError),
|
||||
|
||||
/// Ran out of time waiting for an LSN
|
||||
#[error("LSN timeout: {0}")]
|
||||
LsnTimeout(WaitLsnError),
|
||||
|
||||
/// The entity required to serve the request (tenant or timeline) is not found,
|
||||
/// or is not found in a suitable state to serve a request.
|
||||
#[error("Not found: {0}")]
|
||||
NotFound(std::borrow::Cow<'static, str>),
|
||||
|
||||
/// Request asked for something that doesn't make sense, like an invalid LSN
|
||||
#[error("Bad request: {0}")]
|
||||
BadRequest(std::borrow::Cow<'static, str>),
|
||||
}
|
||||
|
||||
impl From<PageReconstructError> for PageStreamError {
|
||||
fn from(value: PageReconstructError) -> Self {
|
||||
match value {
|
||||
PageReconstructError::Cancelled => Self::Shutdown,
|
||||
e => Self::Read(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetActiveTimelineError> for PageStreamError {
|
||||
fn from(value: GetActiveTimelineError) -> Self {
|
||||
match value {
|
||||
GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => Self::Shutdown,
|
||||
GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
|
||||
GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<WaitLsnError> for PageStreamError {
|
||||
fn from(value: WaitLsnError) -> Self {
|
||||
match value {
|
||||
e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
|
||||
WaitLsnError::Shutdown => Self::Shutdown,
|
||||
WaitLsnError::BadState => Self::Reconnect("Timeline is not active".into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PageServerHandler {
|
||||
pub fn new(
|
||||
conf: &'static PageServerConf,
|
||||
@@ -428,7 +490,7 @@ impl PageServerHandler {
|
||||
// Check that the timeline exists
|
||||
let timeline = tenant
|
||||
.get_timeline(timeline_id, true)
|
||||
.map_err(|e| anyhow::anyhow!(e))?;
|
||||
.map_err(|e| QueryError::NotFound(format!("{e}").into()))?;
|
||||
|
||||
// Avoid starting new requests if the timeline has already started shutting down,
|
||||
// and block timeline shutdown until this request is complete, or drops out due
|
||||
@@ -520,32 +582,44 @@ impl PageServerHandler {
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = &response {
|
||||
// Requests may fail as soon as we are Stopping, even if the Timeline's cancellation token wasn't fired yet,
|
||||
// because wait_lsn etc will drop out
|
||||
// is_stopping(): [`Timeline::flush_and_shutdown`] has entered
|
||||
// is_canceled(): [`Timeline::shutdown`]` has entered
|
||||
if timeline.cancel.is_cancelled() || timeline.is_stopping() {
|
||||
match response {
|
||||
Err(PageStreamError::Shutdown) => {
|
||||
// If we fail to fulfil a request during shutdown, which may be _because_ of
|
||||
// shutdown, then do not send the error to the client. Instead just drop the
|
||||
// connection.
|
||||
span.in_scope(|| info!("dropped response during shutdown: {e:#}"));
|
||||
span.in_scope(|| info!("dropping connection due to shutdown"));
|
||||
return Err(QueryError::Shutdown);
|
||||
}
|
||||
Err(PageStreamError::Reconnect(reason)) => {
|
||||
span.in_scope(|| info!("handler requested reconnect: {reason}"));
|
||||
return Err(QueryError::Reconnect);
|
||||
}
|
||||
Err(e) if timeline.cancel.is_cancelled() || timeline.is_stopping() => {
|
||||
// This branch accomodates code within request handlers that returns an anyhow::Error instead of a clean
|
||||
// shutdown error, this may be buried inside a PageReconstructError::Other for example.
|
||||
//
|
||||
// Requests may fail as soon as we are Stopping, even if the Timeline's cancellation token wasn't fired yet,
|
||||
// because wait_lsn etc will drop out
|
||||
// is_stopping(): [`Timeline::flush_and_shutdown`] has entered
|
||||
// is_canceled(): [`Timeline::shutdown`]` has entered
|
||||
span.in_scope(|| info!("dropped error response during shutdown: {e:#}"));
|
||||
return Err(QueryError::Shutdown);
|
||||
}
|
||||
r => {
|
||||
let response_msg = r.unwrap_or_else(|e| {
|
||||
// print the all details to the log with {:#}, but for the client the
|
||||
// error message is enough. Do not log if shutting down, as the anyhow::Error
|
||||
// here includes cancellation which is not an error.
|
||||
span.in_scope(|| error!("error reading relation or page version: {:#}", e));
|
||||
PagestreamBeMessage::Error(PagestreamErrorResponse {
|
||||
message: e.to_string(),
|
||||
})
|
||||
});
|
||||
|
||||
pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
|
||||
self.flush_cancellable(pgb, &timeline.cancel).await?;
|
||||
}
|
||||
}
|
||||
|
||||
let response = response.unwrap_or_else(|e| {
|
||||
// print the all details to the log with {:#}, but for the client the
|
||||
// error message is enough. Do not log if shutting down, as the anyhow::Error
|
||||
// here includes cancellation which is not an error.
|
||||
span.in_scope(|| error!("error reading relation or page version: {:#}", e));
|
||||
PagestreamBeMessage::Error(PagestreamErrorResponse {
|
||||
message: e.to_string(),
|
||||
})
|
||||
});
|
||||
|
||||
pgb.write_message_noflush(&BeMessage::CopyData(&response.serialize()))?;
|
||||
self.flush_cancellable(pgb, &timeline.cancel).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -692,7 +766,7 @@ impl PageServerHandler {
|
||||
latest: bool,
|
||||
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Lsn> {
|
||||
) -> Result<Lsn, PageStreamError> {
|
||||
if latest {
|
||||
// Latest page version was requested. If LSN is given, it is a hint
|
||||
// to the page server that there have been no modifications to the
|
||||
@@ -723,15 +797,19 @@ impl PageServerHandler {
|
||||
}
|
||||
} else {
|
||||
if lsn == Lsn(0) {
|
||||
anyhow::bail!("invalid LSN(0) in request");
|
||||
return Err(PageStreamError::BadRequest(
|
||||
"invalid LSN(0) in request".into(),
|
||||
));
|
||||
}
|
||||
timeline.wait_lsn(lsn, ctx).await?;
|
||||
}
|
||||
anyhow::ensure!(
|
||||
lsn >= **latest_gc_cutoff_lsn,
|
||||
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
|
||||
lsn, **latest_gc_cutoff_lsn
|
||||
);
|
||||
|
||||
if lsn < **latest_gc_cutoff_lsn {
|
||||
return Err(PageStreamError::BadRequest(format!(
|
||||
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
|
||||
lsn, **latest_gc_cutoff_lsn
|
||||
).into()));
|
||||
}
|
||||
Ok(lsn)
|
||||
}
|
||||
|
||||
@@ -740,7 +818,7 @@ impl PageServerHandler {
|
||||
timeline: &Timeline,
|
||||
req: &PagestreamExistsRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<PagestreamBeMessage> {
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn =
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
@@ -760,7 +838,7 @@ impl PageServerHandler {
|
||||
timeline: &Timeline,
|
||||
req: &PagestreamNblocksRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<PagestreamBeMessage> {
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn =
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
@@ -780,7 +858,7 @@ impl PageServerHandler {
|
||||
timeline: &Timeline,
|
||||
req: &PagestreamDbSizeRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<PagestreamBeMessage> {
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn =
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
@@ -807,7 +885,7 @@ impl PageServerHandler {
|
||||
timeline: &Timeline,
|
||||
req: &PagestreamGetPageRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<PagestreamBeMessage> {
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn =
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
@@ -826,7 +904,7 @@ impl PageServerHandler {
|
||||
timeline: &Timeline,
|
||||
req: &PagestreamGetPageRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<PagestreamBeMessage> {
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let key = rel_block_to_key(req.rel, req.blkno);
|
||||
if timeline.get_shard_identity().is_key_local(&key) {
|
||||
self.do_handle_get_page_at_lsn_request(timeline, req, ctx)
|
||||
@@ -849,24 +927,26 @@ impl PageServerHandler {
|
||||
Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
|
||||
// We already know this tenant exists in general, because we resolved it at
|
||||
// start of connection. Getting a NotFound here indicates that the shard containing
|
||||
// the requested page is not present on this node.
|
||||
|
||||
// TODO: this should be some kind of structured error that the client will understand,
|
||||
// so that it can block until its config is updated: this error is expected in the case
|
||||
// that the Tenant's shards' placements are being updated and the client hasn't been
|
||||
// informed yet.
|
||||
//
|
||||
// https://github.com/neondatabase/neon/issues/6038
|
||||
tracing::warn!("Page request routed to wrong shard: my identity {:?}, should go to shard {}, key {}",
|
||||
// the requested page is not present on this node: the client's knowledge of shard->pageserver
|
||||
// mapping is out of date.
|
||||
tracing::info!("Page request routed to wrong shard: my identity {:?}, should go to shard {}, key {}",
|
||||
timeline.get_shard_identity(), timeline.get_shard_identity().get_shard_number(&key).0, key);
|
||||
return Err(anyhow::anyhow!("Request routed to wrong shard"));
|
||||
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
|
||||
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
|
||||
// and talk to a different pageserver.
|
||||
return Err(PageStreamError::Reconnect(
|
||||
"getpage@lsn request routed to wrong shard".into(),
|
||||
));
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
|
||||
// Take a GateGuard for the duration of this request. If we were using our main Timeline object,
|
||||
// the GateGuard was already held over the whole connection.
|
||||
let _timeline_guard = timeline.gate.enter().map_err(|_| QueryError::Shutdown)?;
|
||||
let _timeline_guard = timeline
|
||||
.gate
|
||||
.enter()
|
||||
.map_err(|_| PageStreamError::Shutdown)?;
|
||||
|
||||
self.do_handle_get_page_at_lsn_request(&timeline, req, ctx)
|
||||
.await
|
||||
@@ -1011,9 +1091,7 @@ impl PageServerHandler {
|
||||
)
|
||||
.await
|
||||
.map_err(GetActiveTimelineError::Tenant)?;
|
||||
let timeline = tenant
|
||||
.get_timeline(timeline_id, true)
|
||||
.map_err(|e| GetActiveTimelineError::Timeline(anyhow::anyhow!(e)))?;
|
||||
let timeline = tenant.get_timeline(timeline_id, true)?;
|
||||
Ok(timeline)
|
||||
}
|
||||
}
|
||||
@@ -1435,14 +1513,15 @@ enum GetActiveTimelineError {
|
||||
#[error(transparent)]
|
||||
Tenant(GetActiveTenantError),
|
||||
#[error(transparent)]
|
||||
Timeline(anyhow::Error),
|
||||
Timeline(#[from] GetTimelineError),
|
||||
}
|
||||
|
||||
impl From<GetActiveTimelineError> for QueryError {
|
||||
fn from(e: GetActiveTimelineError) -> Self {
|
||||
match e {
|
||||
GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
|
||||
GetActiveTimelineError::Tenant(e) => e.into(),
|
||||
GetActiveTimelineError::Timeline(e) => QueryError::Other(e),
|
||||
GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -160,7 +160,7 @@ impl Timeline {
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
/// Look up given page version.
|
||||
pub async fn get_rel_page_at_lsn(
|
||||
pub(crate) async fn get_rel_page_at_lsn(
|
||||
&self,
|
||||
tag: RelTag,
|
||||
blknum: BlockNumber,
|
||||
@@ -191,7 +191,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// Get size of a database in blocks
|
||||
pub async fn get_db_size(
|
||||
pub(crate) async fn get_db_size(
|
||||
&self,
|
||||
spcnode: Oid,
|
||||
dbnode: Oid,
|
||||
@@ -211,7 +211,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Get size of a relation file
|
||||
pub async fn get_rel_size(
|
||||
pub(crate) async fn get_rel_size(
|
||||
&self,
|
||||
tag: RelTag,
|
||||
version: Version<'_>,
|
||||
@@ -256,7 +256,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Does relation exist?
|
||||
pub async fn get_rel_exists(
|
||||
pub(crate) async fn get_rel_exists(
|
||||
&self,
|
||||
tag: RelTag,
|
||||
version: Version<'_>,
|
||||
@@ -291,7 +291,7 @@ impl Timeline {
|
||||
/// # Cancel-Safety
|
||||
///
|
||||
/// This method is cancellation-safe.
|
||||
pub async fn list_rels(
|
||||
pub(crate) async fn list_rels(
|
||||
&self,
|
||||
spcnode: Oid,
|
||||
dbnode: Oid,
|
||||
@@ -319,7 +319,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Look up given SLRU page version.
|
||||
pub async fn get_slru_page_at_lsn(
|
||||
pub(crate) async fn get_slru_page_at_lsn(
|
||||
&self,
|
||||
kind: SlruKind,
|
||||
segno: u32,
|
||||
@@ -332,7 +332,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Get size of an SLRU segment
|
||||
pub async fn get_slru_segment_size(
|
||||
pub(crate) async fn get_slru_segment_size(
|
||||
&self,
|
||||
kind: SlruKind,
|
||||
segno: u32,
|
||||
@@ -345,7 +345,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Get size of an SLRU segment
|
||||
pub async fn get_slru_segment_exists(
|
||||
pub(crate) async fn get_slru_segment_exists(
|
||||
&self,
|
||||
kind: SlruKind,
|
||||
segno: u32,
|
||||
@@ -372,7 +372,7 @@ impl Timeline {
|
||||
/// so it's not well defined which LSN you get if there were multiple commits
|
||||
/// "in flight" at that point in time.
|
||||
///
|
||||
pub async fn find_lsn_for_timestamp(
|
||||
pub(crate) async fn find_lsn_for_timestamp(
|
||||
&self,
|
||||
search_timestamp: TimestampTz,
|
||||
cancel: &CancellationToken,
|
||||
@@ -452,7 +452,7 @@ impl Timeline {
|
||||
/// Additionally, sets 'found_smaller'/'found_Larger, if encounters any commits
|
||||
/// with a smaller/larger timestamp.
|
||||
///
|
||||
pub async fn is_latest_commit_timestamp_ge_than(
|
||||
pub(crate) async fn is_latest_commit_timestamp_ge_than(
|
||||
&self,
|
||||
search_timestamp: TimestampTz,
|
||||
probe_lsn: Lsn,
|
||||
@@ -475,7 +475,7 @@ impl Timeline {
|
||||
/// Obtain the possible timestamp range for the given lsn.
|
||||
///
|
||||
/// If the lsn has no timestamps, returns None. returns `(min, max, median)` if it has timestamps.
|
||||
pub async fn get_timestamp_for_lsn(
|
||||
pub(crate) async fn get_timestamp_for_lsn(
|
||||
&self,
|
||||
probe_lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
@@ -532,7 +532,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Get a list of SLRU segments
|
||||
pub async fn list_slru_segments(
|
||||
pub(crate) async fn list_slru_segments(
|
||||
&self,
|
||||
kind: SlruKind,
|
||||
version: Version<'_>,
|
||||
@@ -548,7 +548,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_relmap_file(
|
||||
pub(crate) async fn get_relmap_file(
|
||||
&self,
|
||||
spcnode: Oid,
|
||||
dbnode: Oid,
|
||||
@@ -561,7 +561,7 @@ impl Timeline {
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
pub async fn list_dbdirs(
|
||||
pub(crate) async fn list_dbdirs(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
@@ -575,7 +575,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_twophase_file(
|
||||
pub(crate) async fn get_twophase_file(
|
||||
&self,
|
||||
xid: TransactionId,
|
||||
lsn: Lsn,
|
||||
@@ -586,7 +586,7 @@ impl Timeline {
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
pub async fn list_twophase_files(
|
||||
pub(crate) async fn list_twophase_files(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
@@ -600,7 +600,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_control_file(
|
||||
pub(crate) async fn get_control_file(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
@@ -608,7 +608,7 @@ impl Timeline {
|
||||
self.get(CONTROLFILE_KEY, lsn, ctx).await
|
||||
}
|
||||
|
||||
pub async fn get_checkpoint(
|
||||
pub(crate) async fn get_checkpoint(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
@@ -616,7 +616,7 @@ impl Timeline {
|
||||
self.get(CHECKPOINT_KEY, lsn, ctx).await
|
||||
}
|
||||
|
||||
pub async fn list_aux_files(
|
||||
pub(crate) async fn list_aux_files(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
|
||||
@@ -56,6 +56,7 @@ use self::timeline::uninit::TimelineUninitMark;
|
||||
use self::timeline::uninit::UninitializedTimeline;
|
||||
use self::timeline::EvictionTaskTenantState;
|
||||
use self::timeline::TimelineResources;
|
||||
use self::timeline::WaitLsnError;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::deletion_queue::DeletionQueueClient;
|
||||
@@ -1758,7 +1759,15 @@ impl Tenant {
|
||||
// decoding the new WAL might need to look up previous pages, relation
|
||||
// sizes etc. and that would get confused if the previous page versions
|
||||
// are not in the repository yet.
|
||||
ancestor_timeline.wait_lsn(*lsn, ctx).await?;
|
||||
ancestor_timeline
|
||||
.wait_lsn(*lsn, ctx)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
e @ (WaitLsnError::Timeout(_) | WaitLsnError::BadState) => {
|
||||
CreateTimelineError::AncestorLsn(anyhow::anyhow!(e))
|
||||
}
|
||||
WaitLsnError::Shutdown => CreateTimelineError::ShuttingDown,
|
||||
})?;
|
||||
}
|
||||
|
||||
self.branch_timeline(
|
||||
|
||||
@@ -373,15 +373,20 @@ pub struct GcInfo {
|
||||
}
|
||||
|
||||
/// An error happened in a get() operation.
|
||||
#[derive(thiserror::Error)]
|
||||
pub enum PageReconstructError {
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum PageReconstructError {
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
|
||||
#[error("Ancestor LSN wait error: {0}")]
|
||||
AncestorLsnTimeout(#[from] WaitLsnError),
|
||||
|
||||
/// The operation was cancelled
|
||||
#[error("Cancelled")]
|
||||
Cancelled,
|
||||
|
||||
/// The ancestor of this is being stopped
|
||||
#[error("ancestor timeline {0} is being stopped")]
|
||||
AncestorStopping(TimelineId),
|
||||
|
||||
/// An error happened replaying WAL records
|
||||
@@ -402,32 +407,6 @@ enum FlushLayerError {
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for PageReconstructError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
|
||||
match self {
|
||||
Self::Other(err) => err.fmt(f),
|
||||
Self::Cancelled => write!(f, "cancelled"),
|
||||
Self::AncestorStopping(timeline_id) => {
|
||||
write!(f, "ancestor timeline {timeline_id} is being stopped")
|
||||
}
|
||||
Self::WalRedo(err) => err.fmt(f),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for PageReconstructError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
|
||||
match self {
|
||||
Self::Other(err) => err.fmt(f),
|
||||
Self::Cancelled => write!(f, "cancelled"),
|
||||
Self::AncestorStopping(timeline_id) => {
|
||||
write!(f, "ancestor timeline {timeline_id} is being stopped")
|
||||
}
|
||||
Self::WalRedo(err) => err.fmt(f),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub enum LogicalSizeCalculationCause {
|
||||
Initial,
|
||||
@@ -452,6 +431,21 @@ impl std::fmt::Debug for Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum WaitLsnError {
|
||||
// Called on a timeline which is shutting down
|
||||
#[error("Shutdown")]
|
||||
Shutdown,
|
||||
|
||||
// Called on an timeline not in active state or shutting down
|
||||
#[error("Bad state (not active)")]
|
||||
BadState,
|
||||
|
||||
// Timeout expired while waiting for LSN to catch up with goal.
|
||||
#[error("{0}")]
|
||||
Timeout(String),
|
||||
}
|
||||
|
||||
/// Public interface functions
|
||||
impl Timeline {
|
||||
/// Get the LSN where this branch was created
|
||||
@@ -486,7 +480,7 @@ impl Timeline {
|
||||
/// # Cancel-Safety
|
||||
///
|
||||
/// This method is cancellation-safe.
|
||||
pub async fn get(
|
||||
pub(crate) async fn get(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
@@ -634,24 +628,28 @@ impl Timeline {
|
||||
/// You should call this before any of the other get_* or list_* functions. Calling
|
||||
/// those functions with an LSN that has been processed yet is an error.
|
||||
///
|
||||
pub async fn wait_lsn(
|
||||
pub(crate) async fn wait_lsn(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
_ctx: &RequestContext, /* Prepare for use by cancellation */
|
||||
) -> anyhow::Result<()> {
|
||||
anyhow::ensure!(self.is_active(), "Cannot wait for Lsn on inactive timeline");
|
||||
) -> Result<(), WaitLsnError> {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(WaitLsnError::Shutdown);
|
||||
} else if !self.is_active() {
|
||||
return Err(WaitLsnError::BadState);
|
||||
}
|
||||
|
||||
// This should never be called from the WAL receiver, because that could lead
|
||||
// to a deadlock.
|
||||
anyhow::ensure!(
|
||||
debug_assert!(
|
||||
task_mgr::current_task_kind() != Some(TaskKind::WalReceiverManager),
|
||||
"wait_lsn cannot be called in WAL receiver"
|
||||
);
|
||||
anyhow::ensure!(
|
||||
debug_assert!(
|
||||
task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionHandler),
|
||||
"wait_lsn cannot be called in WAL receiver"
|
||||
);
|
||||
anyhow::ensure!(
|
||||
debug_assert!(
|
||||
task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionPoller),
|
||||
"wait_lsn cannot be called in WAL receiver"
|
||||
);
|
||||
@@ -665,18 +663,22 @@ impl Timeline {
|
||||
{
|
||||
Ok(()) => Ok(()),
|
||||
Err(e) => {
|
||||
// don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
|
||||
drop(_timer);
|
||||
let walreceiver_status = self.walreceiver_status();
|
||||
Err(anyhow::Error::new(e).context({
|
||||
format!(
|
||||
use utils::seqwait::SeqWaitError::*;
|
||||
match e {
|
||||
Shutdown => Err(WaitLsnError::Shutdown),
|
||||
Timeout => {
|
||||
// don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
|
||||
drop(_timer);
|
||||
let walreceiver_status = self.walreceiver_status();
|
||||
Err(WaitLsnError::Timeout(format!(
|
||||
"Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}, WalReceiver status: {}",
|
||||
lsn,
|
||||
self.get_last_record_lsn(),
|
||||
self.get_disk_consistent_lsn(),
|
||||
walreceiver_status,
|
||||
)
|
||||
}))
|
||||
)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2295,11 +2297,12 @@ impl Timeline {
|
||||
ancestor
|
||||
.wait_lsn(timeline.ancestor_lsn, ctx)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"wait for lsn {} on ancestor timeline_id={}",
|
||||
timeline.ancestor_lsn, ancestor.timeline_id
|
||||
)
|
||||
.map_err(|e| match e {
|
||||
e @ WaitLsnError::Timeout(_) => PageReconstructError::AncestorLsnTimeout(e),
|
||||
WaitLsnError::Shutdown => PageReconstructError::Cancelled,
|
||||
e @ WaitLsnError::BadState => {
|
||||
PageReconstructError::Other(anyhow::anyhow!(e))
|
||||
}
|
||||
})?;
|
||||
|
||||
timeline_owned = ancestor;
|
||||
@@ -4228,7 +4231,7 @@ impl Timeline {
|
||||
.context("Failed to reconstruct a page image:")
|
||||
{
|
||||
Ok(img) => img,
|
||||
Err(e) => return Err(PageReconstructError::from(e)),
|
||||
Err(e) => return Err(PageReconstructError::WalRedo(e)),
|
||||
};
|
||||
|
||||
if img.len() == page_cache::PAGE_SZ {
|
||||
|
||||
@@ -1053,26 +1053,34 @@ impl WalIngest {
|
||||
}
|
||||
};
|
||||
|
||||
let vm_rel = RelTag {
|
||||
forknum: VISIBILITYMAP_FORKNUM,
|
||||
spcnode: decoded.blocks[0].rnode_spcnode,
|
||||
dbnode: decoded.blocks[0].rnode_dbnode,
|
||||
relnode: decoded.blocks[0].rnode_relnode,
|
||||
};
|
||||
Ok(self
|
||||
.clear_visibility_map_bits_if_required(
|
||||
modification,
|
||||
vm_rel,
|
||||
new_heap_blkno,
|
||||
old_heap_blkno,
|
||||
flags,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(HeapamRecordSpecialTreatmentError::EmitWalRecord)?)
|
||||
// Clear the VM bits if required.
|
||||
if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
|
||||
let vm_rel = RelTag {
|
||||
forknum: VISIBILITYMAP_FORKNUM,
|
||||
spcnode: decoded.blocks[0].rnode_spcnode,
|
||||
dbnode: decoded.blocks[0].rnode_dbnode,
|
||||
relnode: decoded.blocks[0].rnode_relnode,
|
||||
};
|
||||
|
||||
Ok(self
|
||||
.clear_visibility_map_bits(
|
||||
modification,
|
||||
vm_rel,
|
||||
new_heap_blkno,
|
||||
old_heap_blkno,
|
||||
flags,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(HeapamRecordSpecialTreatmentError::EmitWalRecord)?)
|
||||
} else {
|
||||
Ok(ClearVisibilityMapFlagsOutcome::Noop)
|
||||
}
|
||||
}
|
||||
|
||||
async fn clear_visibility_map_bits_if_required(
|
||||
/// Write NeonWalRecord::ClearVisibilityMapFlags records for clearing the VM bits
|
||||
/// corresponding to new_heap_blkno and old_heap_blkno.
|
||||
async fn clear_visibility_map_bits(
|
||||
&mut self,
|
||||
modification: &mut DatadirModification<'_>,
|
||||
vm_rel: RelTag,
|
||||
@@ -1081,9 +1089,8 @@ impl WalIngest {
|
||||
flags: u8,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ClearVisibilityMapFlagsOutcome> {
|
||||
let new = new_heap_blkno.map(|x| (x, pg_constants::HEAPBLK_TO_MAPBLOCK(x)));
|
||||
let old = old_heap_blkno.map(|x| (x, pg_constants::HEAPBLK_TO_MAPBLOCK(x)));
|
||||
|
||||
// Determine the VM pages containing the old and the new heap block.
|
||||
//
|
||||
// Sometimes, Postgres seems to create heap WAL records with the
|
||||
// ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
|
||||
// not set. In fact, it's possible that the VM page does not exist at all.
|
||||
@@ -1091,54 +1098,62 @@ impl WalIngest {
|
||||
// replaying it would fail to find the previous image of the page, because
|
||||
// it doesn't exist. So check if the VM page(s) exist, and skip the WAL
|
||||
// record if it doesn't.
|
||||
let (new, old) = {
|
||||
let vm_size = if new.or(old).is_some() {
|
||||
Some(get_relsize(modification, vm_rel, ctx).await?)
|
||||
} else {
|
||||
let vm_size = get_relsize(modification, vm_rel, ctx).await?;
|
||||
let heap_to_vm_blk = |heap_blkno| {
|
||||
let vm_blk = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
|
||||
if vm_blk >= vm_size {
|
||||
None
|
||||
};
|
||||
let filter = |(heap_blk, vm_blk)| {
|
||||
let vm_size = vm_size.expect("we set it to Some() if new or old is Some()");
|
||||
if vm_blk >= vm_size {
|
||||
None
|
||||
} else {
|
||||
Some((heap_blk, vm_blk))
|
||||
}
|
||||
};
|
||||
(new.and_then(filter), old.and_then(filter))
|
||||
} else {
|
||||
Some(vm_blk)
|
||||
}
|
||||
};
|
||||
let new_vm_blk = new_heap_blkno.map(heap_to_vm_blk).flatten();
|
||||
let old_vm_blk = old_heap_blkno.map(heap_to_vm_blk).flatten();
|
||||
|
||||
let outcome = match (new, old) {
|
||||
(Some((new_heap_blkno, new_vm_blk)), Some((old_heap_blkno, old_vm_blk)))
|
||||
if new_vm_blk == old_vm_blk =>
|
||||
{
|
||||
let mut outcome = ClearVisibilityMapFlagsOutcome::Noop;
|
||||
if new_vm_blk.is_some() || old_vm_blk.is_some() {
|
||||
if new_vm_blk == old_vm_blk {
|
||||
// An UPDATE record that needs to clear the bits for both old and the
|
||||
// new page, both of which reside on the same VM page.
|
||||
self.put_rel_wal_record(
|
||||
modification,
|
||||
vm_rel,
|
||||
new_vm_blk, // could also be old_vm_blk, they're the same
|
||||
new_vm_blk.unwrap(),
|
||||
NeonWalRecord::ClearVisibilityMapFlags {
|
||||
heap_blkno_1: Some(new_heap_blkno),
|
||||
heap_blkno_2: Some(old_heap_blkno),
|
||||
new_heap_blkno,
|
||||
old_heap_blkno,
|
||||
flags,
|
||||
},
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
ClearVisibilityMapFlagsOutcome::Stored
|
||||
}
|
||||
(new, old) => {
|
||||
// Emit one record per VM block that needs updating.
|
||||
let mut outcome = ClearVisibilityMapFlagsOutcome::Noop;
|
||||
for (heap_blkno, vm_blk) in [new, old].into_iter().flatten() {
|
||||
outcome = ClearVisibilityMapFlagsOutcome::Stored;
|
||||
} else {
|
||||
// Clear VM bits for one heap page, or for two pages that reside on
|
||||
// different VM pages.
|
||||
if let Some(new_vm_blk) = new_vm_blk {
|
||||
self.put_rel_wal_record(
|
||||
modification,
|
||||
vm_rel,
|
||||
vm_blk,
|
||||
new_vm_blk,
|
||||
NeonWalRecord::ClearVisibilityMapFlags {
|
||||
heap_blkno_1: Some(heap_blkno),
|
||||
heap_blkno_2: None,
|
||||
new_heap_blkno,
|
||||
old_heap_blkno: None,
|
||||
flags,
|
||||
},
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
outcome = ClearVisibilityMapFlagsOutcome::Stored;
|
||||
}
|
||||
if let Some(old_vm_blk) = old_vm_blk {
|
||||
self.put_rel_wal_record(
|
||||
modification,
|
||||
vm_rel,
|
||||
old_vm_blk,
|
||||
NeonWalRecord::ClearVisibilityMapFlags {
|
||||
new_heap_blkno: None,
|
||||
old_heap_blkno,
|
||||
flags,
|
||||
},
|
||||
ctx,
|
||||
@@ -1146,7 +1161,6 @@ impl WalIngest {
|
||||
.await?;
|
||||
outcome = ClearVisibilityMapFlagsOutcome::Stored;
|
||||
}
|
||||
outcome
|
||||
}
|
||||
};
|
||||
anyhow::Ok(outcome)
|
||||
@@ -1244,7 +1258,7 @@ impl WalIngest {
|
||||
relnode: decoded.blocks[0].rnode_relnode,
|
||||
};
|
||||
Ok(self
|
||||
.clear_visibility_map_bits_if_required(
|
||||
.clear_visibility_map_bits(
|
||||
modification,
|
||||
vm_rel,
|
||||
new_heap_blkno,
|
||||
|
||||
@@ -21,13 +21,10 @@ pub enum NeonWalRecord {
|
||||
/// Native PostgreSQL WAL record
|
||||
Postgres { will_init: bool, rec: Bytes },
|
||||
|
||||
/// Clear the bits specified in `flags` in the heap visibility map for the given heap blocks.
|
||||
///
|
||||
/// For example, for `{ heap_blkno_1: None, heap_blkno_2: Some(23), flags: 0b0010_0000}`
|
||||
/// redo will apply `&=0b0010_0000` on heap block 23's visibility map bitmask.
|
||||
/// Clear bits in heap visibility map. ('flags' is bitmap of bits to clear)
|
||||
ClearVisibilityMapFlags {
|
||||
heap_blkno_1: Option<u32>,
|
||||
heap_blkno_2: Option<u32>,
|
||||
new_heap_blkno: Option<u32>,
|
||||
old_heap_blkno: Option<u32>,
|
||||
flags: u8,
|
||||
},
|
||||
/// Mark transaction IDs as committed on a CLOG page
|
||||
|
||||
@@ -403,8 +403,8 @@ impl PostgresRedoManager {
|
||||
anyhow::bail!("tried to pass postgres wal record to neon WAL redo");
|
||||
}
|
||||
NeonWalRecord::ClearVisibilityMapFlags {
|
||||
heap_blkno_1,
|
||||
heap_blkno_2,
|
||||
new_heap_blkno,
|
||||
old_heap_blkno,
|
||||
flags,
|
||||
} => {
|
||||
// sanity check that this is modifying the correct relation
|
||||
@@ -414,8 +414,11 @@ impl PostgresRedoManager {
|
||||
"ClearVisibilityMapFlags record on unexpected rel {}",
|
||||
rel
|
||||
);
|
||||
for heap_blkno in [heap_blkno_1, heap_blkno_2].into_iter().flatten() {
|
||||
let heap_blkno = *heap_blkno;
|
||||
|
||||
// Helper function to clear the VM bit corresponding to 'heap_blkno'.
|
||||
// (The logic is similar to the guts of the visibilitymap_clear() function
|
||||
// in PostgreSQL, after it has locked the right VM page.)
|
||||
let mut visibilitymap_clear = |heap_blkno| {
|
||||
// Calculate the VM block and offset that corresponds to the heap block.
|
||||
let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
|
||||
let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
|
||||
@@ -428,6 +431,12 @@ impl PostgresRedoManager {
|
||||
let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
|
||||
|
||||
map[map_byte as usize] &= !(flags << map_offset);
|
||||
};
|
||||
if let Some(heap_blkno) = *new_heap_blkno {
|
||||
visibilitymap_clear(heap_blkno);
|
||||
}
|
||||
if let Some(heap_blkno) = *old_heap_blkno {
|
||||
visibilitymap_clear(heap_blkno);
|
||||
}
|
||||
}
|
||||
// Non-relational WAL records are handled here, with custom code that has the
|
||||
|
||||
Reference in New Issue
Block a user