Compare commits

13 Commits

Author SHA1 Message Date
Christian Schwarz
4a7ea3f533 [PRE-MERGE] hlinnaka/walingest-simplify-vm-flag-clearing-2
Squashed commit of the following:

commit b1d701dc06
Author: Heikki Linnakangas <heikki@neon.tech>
Date:   Thu Jan 4 18:23:59 2024 +0200

    Refactor generation of ClearVisibilityMapFlags records.

    With fewer mutable variables, for sake of clarity.

commit aa95a07d27
Author: Heikki Linnakangas <heikki@neon.tech>
Date:   Thu Jan 4 18:20:03 2024 +0200

    Refactor code to apply ClearVisibilityMapFlags records a little.

    To reduce the repetition.

commit 18e9208158
Author: John Spray <john@neon.tech>
Date:   Thu Jan 4 10:40:03 2024 +0000

    pageserver: improved error handling for shard routing error, timeline not found (#6262)

    ## Problem

    - When a client requests a key that isn't found in any shard on the node
    (edge case that only happens if a compute's config is out of date), we
    should prompt them to reconnect (as this includes a backoff), since they
    will not be able to complete the request until they eventually get a
    correct pageserver connection string.
    - QueryError::Other is used excessively: this contains a type-ambiguous
    anyhow::Error and is logged very verbosely (including backtrace).

    ## Summary of changes

    - Introduce PageStreamError to replace use of anyhow::Error in request
    handlers for getpage, etc.
    - Introduce Reconnect and NotFound variants to QueryError
    - Map the "shard routing error" case to PageStreamError::Reconnect ->
    QueryError::Reconnect
    - Update type conversions for LSN timeouts and tenant/timeline not found
    errors to use PageStreamError::NotFound->QueryError::NotFound
2024-01-04 17:08:24 +00:00
Christian Schwarz
c029203d47 [PRE-MERGE] walingest/walredo: simplify Visibility Map flag clearing code #6271 + manual work
Squashed commit of the following:

commit 70f993331c
Author: Christian Schwarz <christian@neon.tech>
Date:   Thu Jan 4 13:07:22 2024 +0000

    clippy

commit c87c19a646
Author: Christian Schwarz <christian@neon.tech>
Date:   Thu Jan 4 13:03:39 2024 +0000

    move the logic of emitting the clear visibility wal records into a common function

commit 92280727df
Author: Christian Schwarz <christian@neon.tech>
Date:   Thu Jan 4 12:52:34 2024 +0000

    turns out ingest_neonrmgr_record is just copy-pasta, re-do copy-pasta

commit 31fc069482
Author: Christian Schwarz <christian@neon.tech>
Date:   Thu Jan 4 12:48:49 2024 +0000

    fixup

commit 16090c876d
Author: Christian Schwarz <christian@neon.tech>
Date:   Thu Jan 4 12:47:42 2024 +0000

    and now it's obvious that new_heap_blkno and old_heap_blkno really are independent

commit 02dc0db633
Author: Christian Schwarz <christian@neon.tech>
Date:   Thu Jan 4 12:36:45 2024 +0000

    comments

commit 8e04de6ef9
Author: Christian Schwarz <christian@neon.tech>
Date:   Thu Jan 4 12:36:35 2024 +0000

    fixup 'restructure match block to make the special case clear'

commit 0713f367d4
Author: Christian Schwarz <christian@neon.tech>
Date:   Thu Jan 4 12:23:23 2024 +0000

    restructure match block to make the special case clear

commit 93d0f5e93d
Author: Christian Schwarz <christian@neon.tech>
Date:   Thu Jan 4 12:14:31 2024 +0000

    lift up the vm_size checking logic

commit 20957d6c4e
Author: Christian Schwarz <christian@neon.tech>
Date:   Thu Jan 4 11:54:08 2024 +0000

    lift up HEAPBLK_TO_MAPBLOCK call

commit f4de9adb1d
Author: Christian Schwarz <christian@neon.tech>
Date:   Thu Jan 4 11:10:12 2024 +0000

    same for the Some,Some case

commit 98ee0d9012
Author: Christian Schwarz <christian@neon.tech>
Date:   Thu Jan 4 11:05:26 2024 +0000

    propagate Some()-ness

commit 6933f5d089
Author: Christian Schwarz <christian@neon.tech>
Date:   Thu Jan 4 10:57:19 2024 +0000

    transform the nested `if` into a flattened `match`

commit 853f77eb11
Author: Christian Schwarz <christian@neon.tech>
Date:   Thu Jan 4 10:52:21 2024 +0000

    some constant propagation

commit ccfc9741f6
Author: Christian Schwarz <christian@neon.tech>
Date:   Thu Jan 4 10:48:53 2024 +0000

    move vm_rel out of match

commit c6d09f8942
Author: Christian Schwarz <christian@neon.tech>
Date:   Thu Jan 4 10:47:05 2024 +0000

    transform outermost `if` to a `match`

commit c8d36dab59
Author: Christian Schwarz <christian@neon.tech>
Date:   Thu Jan 4 10:41:36 2024 +0000

    walredo: DRY ClearVisibilityMapFlags record handling
2024-01-04 16:46:52 +00:00
Christian Schwarz
925f365aa0 review pass: found two incorrect Noops 2024-01-04 13:15:48 +00:00
Christian Schwarz
c7e50a54da Merge remote-tracking branch 'origin/main' into problame/walingest-decoding-exhaustiveness-refs-iss-5962 2024-01-04 10:02:48 +00:00
Christian Schwarz
53c18ffe84 make clippy happy 2024-01-04 09:59:03 +00:00
Christian Schwarz
4c1c6f29cb ingest_heapam_record: preserve original bailing behavior 2024-01-04 09:48:32 +00:00
Heikki Linnakangas
f45c0be5ca make it compile 2024-01-04 00:14:08 +02:00
Christian Schwarz
3759a4898d more fixups, still doesn't compile 2024-01-03 18:15:30 +00:00
Christian Schwarz
1623918562 fixup ingest_heapam_record 2024-01-03 18:07:38 +00:00
Christian Schwarz
d1d3c0a6bb manually import more constants from postgres header files 2024-01-03 18:02:08 +00:00
Christian Schwarz
5a4765f1f2 rename types, fix stray ')', rustfmt 2024-01-03 17:46:08 +00:00
Heikki Linnakangas
7583e0a696 Fill in missing cases for WAL record types that we can safely ignore.
I did this by looking at all the possible record types in the Postgres
header files and adding cases for everything that wasn't already
handled. I didn't try to compile this.
2023-11-30 19:29:47 +02:00
Christian Schwarz
a6fb4f2687 walingest: prepare for exhaustive match and handling of no-ops
Use rust type system to force the "special treatment" block
to declare the expected outcome of special treatment.

The TODO comments will be addressed in future commits, for now
let's just encode our expectations.

This commit doesn't compile yet because a bunch of `else { ... }`
blocks are missing. I'll ask Postgres-savvy colleagues to fill
in here.
2023-11-30 16:37:46 +00:00
10 changed files with 800 additions and 337 deletions

View File

@@ -35,6 +35,12 @@ pub enum QueryError {
/// We were instructed to shutdown while processing the query
#[error("Shutting down")]
Shutdown,
/// Query handler indicated that client should reconnect
#[error("Server requested reconnect")]
Reconnect,
/// Query named an entity that was not found
#[error("Not found: {0}")]
NotFound(std::borrow::Cow<'static, str>),
/// Authentication failure
#[error("Unauthorized: {0}")]
Unauthorized(std::borrow::Cow<'static, str>),
@@ -54,9 +60,9 @@ impl From<io::Error> for QueryError {
impl QueryError {
pub fn pg_error_code(&self) -> &'static [u8; 5] {
match self {
Self::Disconnected(_) | Self::SimulatedConnectionError => b"08006", // connection failure
Self::Disconnected(_) | Self::SimulatedConnectionError | Self::Reconnect => b"08006", // connection failure
Self::Shutdown => SQLSTATE_ADMIN_SHUTDOWN,
Self::Unauthorized(_) => SQLSTATE_INTERNAL_ERROR,
Self::Unauthorized(_) | Self::NotFound(_) => SQLSTATE_INTERNAL_ERROR,
Self::Other(_) => SQLSTATE_INTERNAL_ERROR, // internal error
}
}
@@ -425,6 +431,11 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
info!("Stopped due to shutdown");
Ok(())
}
Err(QueryError::Reconnect) => {
// Dropping out of this loop implicitly disconnects
info!("Stopped due to handler reconnect request");
Ok(())
}
Err(QueryError::Disconnected(e)) => {
info!("Disconnected ({e:#})");
// Disconnection is not an error: we just use it that way internally to drop
@@ -974,7 +985,9 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> AsyncWrite for CopyDataWriter<'a, I
pub fn short_error(e: &QueryError) -> String {
match e {
QueryError::Disconnected(connection_error) => connection_error.to_string(),
QueryError::Reconnect => "reconnect".to_string(),
QueryError::Shutdown => "shutdown".to_string(),
QueryError::NotFound(_) => "not found".to_string(),
QueryError::Unauthorized(_e) => "JWT authentication error".to_string(),
QueryError::SimulatedConnectionError => "simulated connection error".to_string(),
QueryError::Other(e) => format!("{e:#}"),
@@ -996,9 +1009,15 @@ fn log_query_error(query: &str, e: &QueryError) {
QueryError::SimulatedConnectionError => {
error!("query handler for query '{query}' failed due to a simulated connection error")
}
QueryError::Reconnect => {
info!("query handler for '{query}' requested client to reconnect")
}
QueryError::Shutdown => {
info!("query handler for '{query}' cancelled during tenant shutdown")
}
QueryError::NotFound(reason) => {
info!("query handler for '{query}' entity not found: {reason}")
}
QueryError::Unauthorized(e) => {
warn!("query handler for '{query}' failed with authentication error: {e}");
}
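
The hunks above add `Reconnect` and `NotFound` to `QueryError` and assign them SQLSTATE codes. The following is a minimal, standalone sketch of that mapping, not the crate's actual code: the enum is trimmed to four variants, and the byte values of `SQLSTATE_ADMIN_SHUTDOWN` and `SQLSTATE_INTERNAL_ERROR` are assumed here to be the standard PostgreSQL codes `57P01` and `XX000` (the diff does not show their definitions).

```rust
use std::borrow::Cow;

/// Trimmed stand-in for the `QueryError` enum shown in the diff.
#[derive(Debug)]
enum QueryError {
    Shutdown,
    Reconnect,
    NotFound(Cow<'static, str>),
    Unauthorized(Cow<'static, str>),
}

// Assumed values (standard PostgreSQL SQLSTATE codes); the diff only shows the names.
const SQLSTATE_ADMIN_SHUTDOWN: &[u8; 5] = b"57P01";
const SQLSTATE_INTERNAL_ERROR: &[u8; 5] = b"XX000";

impl QueryError {
    /// Same shape as the `pg_error_code` match in the diff.
    fn pg_error_code(&self) -> &'static [u8; 5] {
        match self {
            // Reconnect is reported as a connection failure, like Disconnected.
            Self::Reconnect => b"08006",
            Self::Shutdown => SQLSTATE_ADMIN_SHUTDOWN,
            // NotFound shares the internal-error code with Unauthorized.
            Self::Unauthorized(_) | Self::NotFound(_) => SQLSTATE_INTERNAL_ERROR,
        }
    }
}
```

Because the match has no wildcard arm, adding a further variant forces a decision about its SQLSTATE at compile time.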

View File

@@ -79,6 +79,8 @@ pub const XLOG_XACT_PREPARE: u8 = 0x10;
pub const XLOG_XACT_ABORT: u8 = 0x20;
pub const XLOG_XACT_COMMIT_PREPARED: u8 = 0x30;
pub const XLOG_XACT_ABORT_PREPARED: u8 = 0x40;
pub const XLOG_XACT_ASSIGNMENT: u8 = 0x50;
pub const XLOG_XACT_INVALIDATIONS: u8 = 0x60;
// From slru.h
pub const SLRU_PAGES_PER_SEGMENT: u32 = 32;
@@ -103,12 +105,6 @@ pub const XACT_XINFO_HAS_TWOPHASE: u32 = 1u32 << 4;
// pub const XACT_XINFO_HAS_AE_LOCKS: u32 = 1u32 << 6;
// pub const XACT_XINFO_HAS_GID: u32 = 1u32 << 7;
// From pg_control.h and rmgrlist.h
pub const XLOG_NEXTOID: u8 = 0x30;
pub const XLOG_SWITCH: u8 = 0x40;
pub const XLOG_FPI_FOR_HINT: u8 = 0xA0;
pub const XLOG_FPI: u8 = 0xB0;
// From multixact.h
pub const FIRST_MULTIXACT_ID: u32 = 1;
pub const MAX_MULTIXACT_ID: u32 = 0xFFFFFFFF;
@@ -136,12 +132,20 @@ pub const MULTIXACT_MEMBERS_PER_PAGE: u16 =
pub const XLOG_HEAP_INSERT: u8 = 0x00;
pub const XLOG_HEAP_DELETE: u8 = 0x10;
pub const XLOG_HEAP_UPDATE: u8 = 0x20;
pub const XLOG_HEAP_TRUNCATE: u8 = 0x30;
pub const XLOG_HEAP_HOT_UPDATE: u8 = 0x40;
pub const XLOG_HEAP_CONFIRM: u8 = 0x50;
pub const XLOG_HEAP_LOCK: u8 = 0x60;
pub const XLOG_HEAP_INPLACE: u8 = 0x70;
pub const XLOG_HEAP_INIT_PAGE: u8 = 0x80;
pub const XLOG_HEAP2_REWRITE: u8 = 0x00;
pub const XLOG_HEAP2_PRUNE: u8 = 0x10;
pub const XLOG_HEAP2_VACUUM: u8 = 0x20;
pub const XLOG_HEAP2_FREEZE_PAGE: u8 = 0x30;
pub const XLOG_HEAP2_VISIBLE: u8 = 0x40;
pub const XLOG_HEAP2_MULTI_INSERT: u8 = 0x50;
pub const XLOG_HEAP2_LOCK_UPDATED: u8 = 0x60;
pub const XLOG_HEAP2_NEW_CID: u8 = 0x70;
pub const XLH_LOCK_ALL_FROZEN_CLEARED: u8 = 0x01;
pub const XLH_INSERT_ALL_FROZEN_SET: u8 = (1 << 5) as u8;
pub const XLH_INSERT_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8;
@@ -164,8 +168,21 @@ pub const RM_RELMAP_ID: u8 = 7;
pub const RM_STANDBY_ID: u8 = 8;
pub const RM_HEAP2_ID: u8 = 9;
pub const RM_HEAP_ID: u8 = 10;
pub const RM_BTREE_ID: u8 = 11;
pub const RM_HASH_ID: u8 = 12;
pub const RM_GIN_ID: u8 = 13;
pub const RM_GIST_ID: u8 = 14;
pub const RM_SEQ_ID: u8 = 15;
pub const RM_SPGIST_ID: u8 = 16;
pub const RM_BRIN_ID: u8 = 17;
pub const RM_COMMIT_TS_ID: u8 = 18;
pub const RM_REPLORIGIN_ID: u8 = 19;
pub const RM_GENERIC_ID: u8 = 20;
pub const RM_LOGICALMSG_ID: u8 = 21;
// from relmapper.h
pub const XLOG_RELMAP_UPDATE: u8 = 0x0;
// from neon_rmgr.h
pub const RM_NEON_ID: u8 = 134;
@@ -215,8 +232,22 @@ pub const INVALID_TRANSACTION_ID: u32 = 0;
pub const FIRST_BOOTSTRAP_OBJECT_ID: u32 = 12000;
pub const FIRST_NORMAL_OBJECT_ID: u32 = 16384;
/* pg_control.h */
pub const XLOG_CHECKPOINT_SHUTDOWN: u8 = 0x00;
pub const XLOG_CHECKPOINT_ONLINE: u8 = 0x10;
pub const XLOG_NOOP: u8 = 0x20;
pub const XLOG_NEXTOID: u8 = 0x30;
pub const XLOG_SWITCH: u8 = 0x40;
pub const XLOG_BACKUP_END: u8 = 0x50;
pub const XLOG_PARAMETER_CHANGE: u8 = 0x60;
pub const XLOG_RESTORE_POINT: u8 = 0x70;
pub const XLOG_FPW_CHANGE: u8 = 0x80;
pub const XLOG_END_OF_RECOVERY: u8 = 0x90;
pub const XLOG_FPI_FOR_HINT: u8 = 0xA0;
pub const XLOG_FPI: u8 = 0xB0;
/* 0xC0 is used in Postgres 9.5-11 */
pub const XLOG_OVERWRITE_CONTRECORD: u8 = 0xD0;
pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
pub const XLP_LONG_HEADER: u16 = 0x0002;

View File

@@ -31,6 +31,9 @@ pub enum ApiError {
#[error("Shutting down")]
ShuttingDown,
#[error("Timeout")]
Timeout(Cow<'static, str>),
#[error(transparent)]
InternalServerError(anyhow::Error),
}
@@ -67,6 +70,10 @@ impl ApiError {
err.to_string(),
StatusCode::SERVICE_UNAVAILABLE,
),
ApiError::Timeout(err) => HttpErrorBody::response_from_msg_and_status(
err.to_string(),
StatusCode::REQUEST_TIMEOUT,
),
ApiError::InternalServerError(err) => HttpErrorBody::response_from_msg_and_status(
err.to_string(),
StatusCode::INTERNAL_SERVER_ERROR,

View File

@@ -152,6 +152,7 @@ impl From<PageReconstructError> for ApiError {
PageReconstructError::AncestorStopping(_) => {
ApiError::ResourceUnavailable(format!("{pre}").into())
}
PageReconstructError::AncestorLsnTimeout(e) => ApiError::Timeout(format!("{e}").into()),
PageReconstructError::WalRedo(pre) => ApiError::InternalServerError(pre),
}
}

View File

@@ -25,6 +25,7 @@ use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, Qu
use pq_proto::framed::ConnectionError;
use pq_proto::FeStartupPacket;
use pq_proto::{BeMessage, FeMessage, RowDescriptor};
use std::borrow::Cow;
use std::io;
use std::net::TcpListener;
use std::pin::pin;
@@ -61,6 +62,9 @@ use crate::tenant::mgr;
use crate::tenant::mgr::get_active_tenant_with_timeout;
use crate::tenant::mgr::GetActiveTenantError;
use crate::tenant::mgr::ShardSelector;
use crate::tenant::timeline::WaitLsnError;
use crate::tenant::GetTimelineError;
use crate::tenant::PageReconstructError;
use crate::tenant::Timeline;
use crate::trace::Tracer;
@@ -283,6 +287,64 @@ struct PageServerHandler {
connection_ctx: RequestContext,
}
#[derive(thiserror::Error, Debug)]
enum PageStreamError {
/// We encountered an error that should prompt the client to reconnect:
/// in practice this means we drop the connection without sending a response.
#[error("Reconnect required: {0}")]
Reconnect(Cow<'static, str>),
/// We were instructed to shutdown while processing the query
#[error("Shutting down")]
Shutdown,
/// Something went wrong reading a page: this likely indicates a pageserver bug
#[error("Read error: {0}")]
Read(PageReconstructError),
/// Ran out of time waiting for an LSN
#[error("LSN timeout: {0}")]
LsnTimeout(WaitLsnError),
/// The entity required to serve the request (tenant or timeline) is not found,
/// or is not found in a suitable state to serve a request.
#[error("Not found: {0}")]
NotFound(std::borrow::Cow<'static, str>),
/// Request asked for something that doesn't make sense, like an invalid LSN
#[error("Bad request: {0}")]
BadRequest(std::borrow::Cow<'static, str>),
}
impl From<PageReconstructError> for PageStreamError {
fn from(value: PageReconstructError) -> Self {
match value {
PageReconstructError::Cancelled => Self::Shutdown,
e => Self::Read(e),
}
}
}
impl From<GetActiveTimelineError> for PageStreamError {
fn from(value: GetActiveTimelineError) -> Self {
match value {
GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => Self::Shutdown,
GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
}
}
}
impl From<WaitLsnError> for PageStreamError {
fn from(value: WaitLsnError) -> Self {
match value {
e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
WaitLsnError::Shutdown => Self::Shutdown,
WaitLsnError::BadState => Self::Reconnect("Timeline is not active".into()),
}
}
}
impl PageServerHandler {
pub fn new(
conf: &'static PageServerConf,
@@ -428,7 +490,7 @@ impl PageServerHandler {
// Check that the timeline exists
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(|e| anyhow::anyhow!(e))?;
.map_err(|e| QueryError::NotFound(format!("{e}").into()))?;
// Avoid starting new requests if the timeline has already started shutting down,
// and block timeline shutdown until this request is complete, or drops out due
@@ -520,32 +582,44 @@ impl PageServerHandler {
}
};
if let Err(e) = &response {
// Requests may fail as soon as we are Stopping, even if the Timeline's cancellation token wasn't fired yet,
// because wait_lsn etc will drop out
// is_stopping(): [`Timeline::flush_and_shutdown`] has entered
// is_canceled(): [`Timeline::shutdown`]` has entered
if timeline.cancel.is_cancelled() || timeline.is_stopping() {
match response {
Err(PageStreamError::Shutdown) => {
// If we fail to fulfil a request during shutdown, which may be _because_ of
// shutdown, then do not send the error to the client. Instead just drop the
// connection.
span.in_scope(|| info!("dropped response during shutdown: {e:#}"));
span.in_scope(|| info!("dropping connection due to shutdown"));
return Err(QueryError::Shutdown);
}
Err(PageStreamError::Reconnect(reason)) => {
span.in_scope(|| info!("handler requested reconnect: {reason}"));
return Err(QueryError::Reconnect);
}
Err(e) if timeline.cancel.is_cancelled() || timeline.is_stopping() => {
// This branch accommodates code within request handlers that returns an anyhow::Error instead of a clean
// shutdown error, this may be buried inside a PageReconstructError::Other for example.
//
// Requests may fail as soon as we are Stopping, even if the Timeline's cancellation token wasn't fired yet,
// because wait_lsn etc will drop out
// is_stopping(): [`Timeline::flush_and_shutdown`] has entered
// is_cancelled(): [`Timeline::shutdown`] has entered
span.in_scope(|| info!("dropped error response during shutdown: {e:#}"));
return Err(QueryError::Shutdown);
}
r => {
let response_msg = r.unwrap_or_else(|e| {
// print all the details to the log with {:#}, but for the client the
// error message is enough. Do not log if shutting down, as the anyhow::Error
// here includes cancellation which is not an error.
span.in_scope(|| error!("error reading relation or page version: {:#}", e));
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
});
pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
self.flush_cancellable(pgb, &timeline.cancel).await?;
}
}
let response = response.unwrap_or_else(|e| {
// print the all details to the log with {:#}, but for the client the
// error message is enough. Do not log if shutting down, as the anyhow::Error
// here includes cancellation which is not an error.
span.in_scope(|| error!("error reading relation or page version: {:#}", e));
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
});
pgb.write_message_noflush(&BeMessage::CopyData(&response.serialize()))?;
self.flush_cancellable(pgb, &timeline.cancel).await?;
}
Ok(())
}
@@ -692,7 +766,7 @@ impl PageServerHandler {
latest: bool,
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
ctx: &RequestContext,
) -> anyhow::Result<Lsn> {
) -> Result<Lsn, PageStreamError> {
if latest {
// Latest page version was requested. If LSN is given, it is a hint
// to the page server that there have been no modifications to the
@@ -723,15 +797,19 @@ impl PageServerHandler {
}
} else {
if lsn == Lsn(0) {
anyhow::bail!("invalid LSN(0) in request");
return Err(PageStreamError::BadRequest(
"invalid LSN(0) in request".into(),
));
}
timeline.wait_lsn(lsn, ctx).await?;
}
anyhow::ensure!(
lsn >= **latest_gc_cutoff_lsn,
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
lsn, **latest_gc_cutoff_lsn
);
if lsn < **latest_gc_cutoff_lsn {
return Err(PageStreamError::BadRequest(format!(
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
lsn, **latest_gc_cutoff_lsn
).into()));
}
Ok(lsn)
}
@@ -740,7 +818,7 @@ impl PageServerHandler {
timeline: &Timeline,
req: &PagestreamExistsRequest,
ctx: &RequestContext,
) -> anyhow::Result<PagestreamBeMessage> {
) -> Result<PagestreamBeMessage, PageStreamError> {
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
@@ -760,7 +838,7 @@ impl PageServerHandler {
timeline: &Timeline,
req: &PagestreamNblocksRequest,
ctx: &RequestContext,
) -> anyhow::Result<PagestreamBeMessage> {
) -> Result<PagestreamBeMessage, PageStreamError> {
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
@@ -780,7 +858,7 @@ impl PageServerHandler {
timeline: &Timeline,
req: &PagestreamDbSizeRequest,
ctx: &RequestContext,
) -> anyhow::Result<PagestreamBeMessage> {
) -> Result<PagestreamBeMessage, PageStreamError> {
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
@@ -807,7 +885,7 @@ impl PageServerHandler {
timeline: &Timeline,
req: &PagestreamGetPageRequest,
ctx: &RequestContext,
) -> anyhow::Result<PagestreamBeMessage> {
) -> Result<PagestreamBeMessage, PageStreamError> {
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
@@ -826,7 +904,7 @@ impl PageServerHandler {
timeline: &Timeline,
req: &PagestreamGetPageRequest,
ctx: &RequestContext,
) -> anyhow::Result<PagestreamBeMessage> {
) -> Result<PagestreamBeMessage, PageStreamError> {
let key = rel_block_to_key(req.rel, req.blkno);
if timeline.get_shard_identity().is_key_local(&key) {
self.do_handle_get_page_at_lsn_request(timeline, req, ctx)
@@ -849,24 +927,26 @@ impl PageServerHandler {
Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
// We already know this tenant exists in general, because we resolved it at
// start of connection. Getting a NotFound here indicates that the shard containing
// the requested page is not present on this node.
// TODO: this should be some kind of structured error that the client will understand,
// so that it can block until its config is updated: this error is expected in the case
// that the Tenant's shards' placements are being updated and the client hasn't been
// informed yet.
//
// https://github.com/neondatabase/neon/issues/6038
tracing::warn!("Page request routed to wrong shard: my identity {:?}, should go to shard {}, key {}",
// the requested page is not present on this node: the client's knowledge of shard->pageserver
// mapping is out of date.
tracing::info!("Page request routed to wrong shard: my identity {:?}, should go to shard {}, key {}",
timeline.get_shard_identity(), timeline.get_shard_identity().get_shard_number(&key).0, key);
return Err(anyhow::anyhow!("Request routed to wrong shard"));
// Closing the connection by returning `::Reconnect` has the side effect of rate-limiting the above message, via
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
// and talk to a different pageserver.
return Err(PageStreamError::Reconnect(
"getpage@lsn request routed to wrong shard".into(),
));
}
Err(e) => return Err(e.into()),
};
// Take a GateGuard for the duration of this request. If we were using our main Timeline object,
// the GateGuard was already held over the whole connection.
let _timeline_guard = timeline.gate.enter().map_err(|_| QueryError::Shutdown)?;
let _timeline_guard = timeline
.gate
.enter()
.map_err(|_| PageStreamError::Shutdown)?;
self.do_handle_get_page_at_lsn_request(&timeline, req, ctx)
.await
@@ -1011,9 +1091,7 @@ impl PageServerHandler {
)
.await
.map_err(GetActiveTimelineError::Tenant)?;
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(|e| GetActiveTimelineError::Timeline(anyhow::anyhow!(e)))?;
let timeline = tenant.get_timeline(timeline_id, true)?;
Ok(timeline)
}
}
@@ -1435,14 +1513,15 @@ enum GetActiveTimelineError {
#[error(transparent)]
Tenant(GetActiveTenantError),
#[error(transparent)]
Timeline(anyhow::Error),
Timeline(#[from] GetTimelineError),
}
impl From<GetActiveTimelineError> for QueryError {
fn from(e: GetActiveTimelineError) -> Self {
match e {
GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
GetActiveTimelineError::Tenant(e) => e.into(),
GetActiveTimelineError::Timeline(e) => QueryError::Other(e),
GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
}
}
}
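
The response-handling hunk above replaces an `if let Err(..)` check with a `match` whose arm order decides whether the connection is dropped or an error message is sent to the client. The sketch below models only that decision, under stated assumptions: `Outcome` and `dispatch` are hypothetical names introduced for illustration, the response payload is a plain `String`, and `timeline_stopping` stands in for `timeline.cancel.is_cancelled() || timeline.is_stopping()`.

```rust
use std::borrow::Cow;
use std::fmt;

/// Trimmed stand-in for the `PageStreamError` enum in the diff.
#[derive(Debug)]
enum PageStreamError {
    Reconnect(Cow<'static, str>),
    Shutdown,
    NotFound(Cow<'static, str>),
    BadRequest(Cow<'static, str>),
}

impl fmt::Display for PageStreamError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Reconnect(r) => write!(f, "Reconnect required: {r}"),
            Self::Shutdown => write!(f, "Shutting down"),
            Self::NotFound(r) => write!(f, "Not found: {r}"),
            Self::BadRequest(r) => write!(f, "Bad request: {r}"),
        }
    }
}

/// Hypothetical summary of what the handler does with a response.
#[derive(Debug, PartialEq)]
enum Outcome {
    DropConnection(&'static str),
    SendError(String),
    SendOk(String),
}

fn dispatch(response: Result<String, PageStreamError>, timeline_stopping: bool) -> Outcome {
    match response {
        // Explicit shutdown and reconnect errors never reach the client.
        Err(PageStreamError::Shutdown) => Outcome::DropConnection("shutdown"),
        Err(PageStreamError::Reconnect(_reason)) => Outcome::DropConnection("reconnect"),
        // Any other error during shutdown is treated as caused by the shutdown.
        Err(_) if timeline_stopping => Outcome::DropConnection("shutdown"),
        // Remaining errors are reported to the client as an error message.
        Err(e) => Outcome::SendError(e.to_string()),
        Ok(msg) => Outcome::SendOk(msg),
    }
}
```

The guarded arm has to come after the two explicit arms and before the catch-all, otherwise a `BadRequest` raised while the timeline is stopping would be sent to the client rather than dropped.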

View File

@@ -160,7 +160,7 @@ impl Timeline {
//------------------------------------------------------------------------------
/// Look up given page version.
pub async fn get_rel_page_at_lsn(
pub(crate) async fn get_rel_page_at_lsn(
&self,
tag: RelTag,
blknum: BlockNumber,
@@ -191,7 +191,7 @@ impl Timeline {
}
// Get size of a database in blocks
pub async fn get_db_size(
pub(crate) async fn get_db_size(
&self,
spcnode: Oid,
dbnode: Oid,
@@ -211,7 +211,7 @@ impl Timeline {
}
/// Get size of a relation file
pub async fn get_rel_size(
pub(crate) async fn get_rel_size(
&self,
tag: RelTag,
version: Version<'_>,
@@ -256,7 +256,7 @@ impl Timeline {
}
/// Does relation exist?
pub async fn get_rel_exists(
pub(crate) async fn get_rel_exists(
&self,
tag: RelTag,
version: Version<'_>,
@@ -291,7 +291,7 @@ impl Timeline {
/// # Cancel-Safety
///
/// This method is cancellation-safe.
pub async fn list_rels(
pub(crate) async fn list_rels(
&self,
spcnode: Oid,
dbnode: Oid,
@@ -319,7 +319,7 @@ impl Timeline {
}
/// Look up given SLRU page version.
pub async fn get_slru_page_at_lsn(
pub(crate) async fn get_slru_page_at_lsn(
&self,
kind: SlruKind,
segno: u32,
@@ -332,7 +332,7 @@ impl Timeline {
}
/// Get size of an SLRU segment
pub async fn get_slru_segment_size(
pub(crate) async fn get_slru_segment_size(
&self,
kind: SlruKind,
segno: u32,
@@ -345,7 +345,7 @@ impl Timeline {
}
/// Get size of an SLRU segment
pub async fn get_slru_segment_exists(
pub(crate) async fn get_slru_segment_exists(
&self,
kind: SlruKind,
segno: u32,
@@ -372,7 +372,7 @@ impl Timeline {
/// so it's not well defined which LSN you get if there were multiple commits
/// "in flight" at that point in time.
///
pub async fn find_lsn_for_timestamp(
pub(crate) async fn find_lsn_for_timestamp(
&self,
search_timestamp: TimestampTz,
cancel: &CancellationToken,
@@ -452,7 +452,7 @@ impl Timeline {
/// Additionally, sets 'found_smaller'/'found_larger' if it encounters any commits
/// with a smaller/larger timestamp.
///
pub async fn is_latest_commit_timestamp_ge_than(
pub(crate) async fn is_latest_commit_timestamp_ge_than(
&self,
search_timestamp: TimestampTz,
probe_lsn: Lsn,
@@ -475,7 +475,7 @@ impl Timeline {
/// Obtain the possible timestamp range for the given lsn.
///
/// If the lsn has no timestamps, returns None. returns `(min, max, median)` if it has timestamps.
pub async fn get_timestamp_for_lsn(
pub(crate) async fn get_timestamp_for_lsn(
&self,
probe_lsn: Lsn,
ctx: &RequestContext,
@@ -532,7 +532,7 @@ impl Timeline {
}
/// Get a list of SLRU segments
pub async fn list_slru_segments(
pub(crate) async fn list_slru_segments(
&self,
kind: SlruKind,
version: Version<'_>,
@@ -548,7 +548,7 @@ impl Timeline {
}
}
pub async fn get_relmap_file(
pub(crate) async fn get_relmap_file(
&self,
spcnode: Oid,
dbnode: Oid,
@@ -561,7 +561,7 @@ impl Timeline {
Ok(buf)
}
pub async fn list_dbdirs(
pub(crate) async fn list_dbdirs(
&self,
lsn: Lsn,
ctx: &RequestContext,
@@ -575,7 +575,7 @@ impl Timeline {
}
}
pub async fn get_twophase_file(
pub(crate) async fn get_twophase_file(
&self,
xid: TransactionId,
lsn: Lsn,
@@ -586,7 +586,7 @@ impl Timeline {
Ok(buf)
}
pub async fn list_twophase_files(
pub(crate) async fn list_twophase_files(
&self,
lsn: Lsn,
ctx: &RequestContext,
@@ -600,7 +600,7 @@ impl Timeline {
}
}
pub async fn get_control_file(
pub(crate) async fn get_control_file(
&self,
lsn: Lsn,
ctx: &RequestContext,
@@ -608,7 +608,7 @@ impl Timeline {
self.get(CONTROLFILE_KEY, lsn, ctx).await
}
pub async fn get_checkpoint(
pub(crate) async fn get_checkpoint(
&self,
lsn: Lsn,
ctx: &RequestContext,
@@ -616,7 +616,7 @@ impl Timeline {
self.get(CHECKPOINT_KEY, lsn, ctx).await
}
pub async fn list_aux_files(
pub(crate) async fn list_aux_files(
&self,
lsn: Lsn,
ctx: &RequestContext,

View File

@@ -56,6 +56,7 @@ use self::timeline::uninit::TimelineUninitMark;
use self::timeline::uninit::UninitializedTimeline;
use self::timeline::EvictionTaskTenantState;
use self::timeline::TimelineResources;
use self::timeline::WaitLsnError;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::DeletionQueueClient;
@@ -1758,7 +1759,15 @@ impl Tenant {
// decoding the new WAL might need to look up previous pages, relation
// sizes etc. and that would get confused if the previous page versions
// are not in the repository yet.
ancestor_timeline.wait_lsn(*lsn, ctx).await?;
ancestor_timeline
.wait_lsn(*lsn, ctx)
.await
.map_err(|e| match e {
e @ (WaitLsnError::Timeout(_) | WaitLsnError::BadState) => {
CreateTimelineError::AncestorLsn(anyhow::anyhow!(e))
}
WaitLsnError::Shutdown => CreateTimelineError::ShuttingDown,
})?;
}
self.branch_timeline(

View File

@@ -373,15 +373,20 @@ pub struct GcInfo {
}
/// An error happened in a get() operation.
#[derive(thiserror::Error)]
pub enum PageReconstructError {
#[derive(thiserror::Error, Debug)]
pub(crate) enum PageReconstructError {
#[error(transparent)]
Other(#[from] anyhow::Error),
#[error("Ancestor LSN wait error: {0}")]
AncestorLsnTimeout(#[from] WaitLsnError),
/// The operation was cancelled
#[error("Cancelled")]
Cancelled,
/// The ancestor of this is being stopped
#[error("ancestor timeline {0} is being stopped")]
AncestorStopping(TimelineId),
/// An error happened replaying WAL records
@@ -402,32 +407,6 @@ enum FlushLayerError {
Other(#[from] anyhow::Error),
}
impl std::fmt::Debug for PageReconstructError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
match self {
Self::Other(err) => err.fmt(f),
Self::Cancelled => write!(f, "cancelled"),
Self::AncestorStopping(timeline_id) => {
write!(f, "ancestor timeline {timeline_id} is being stopped")
}
Self::WalRedo(err) => err.fmt(f),
}
}
}
impl std::fmt::Display for PageReconstructError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
match self {
Self::Other(err) => err.fmt(f),
Self::Cancelled => write!(f, "cancelled"),
Self::AncestorStopping(timeline_id) => {
write!(f, "ancestor timeline {timeline_id} is being stopped")
}
Self::WalRedo(err) => err.fmt(f),
}
}
}
#[derive(Clone, Copy)]
pub enum LogicalSizeCalculationCause {
Initial,
@@ -452,6 +431,21 @@ impl std::fmt::Debug for Timeline {
}
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum WaitLsnError {
// Called on a timeline which is shutting down
#[error("Shutdown")]
Shutdown,
// Called on a timeline not in active state or shutting down
#[error("Bad state (not active)")]
BadState,
// Timeout expired while waiting for LSN to catch up with goal.
#[error("{0}")]
Timeout(String),
}
/// Public interface functions
impl Timeline {
/// Get the LSN where this branch was created
@@ -486,7 +480,7 @@ impl Timeline {
/// # Cancel-Safety
///
/// This method is cancellation-safe.
pub async fn get(
pub(crate) async fn get(
&self,
key: Key,
lsn: Lsn,
@@ -634,24 +628,28 @@ impl Timeline {
/// You should call this before any of the other get_* or list_* functions. Calling
/// those functions with an LSN that has been processed yet is an error.
///
pub async fn wait_lsn(
pub(crate) async fn wait_lsn(
&self,
lsn: Lsn,
_ctx: &RequestContext, /* Prepare for use by cancellation */
) -> anyhow::Result<()> {
anyhow::ensure!(self.is_active(), "Cannot wait for Lsn on inactive timeline");
) -> Result<(), WaitLsnError> {
if self.cancel.is_cancelled() {
return Err(WaitLsnError::Shutdown);
} else if !self.is_active() {
return Err(WaitLsnError::BadState);
}
// This should never be called from the WAL receiver, because that could lead
// to a deadlock.
anyhow::ensure!(
debug_assert!(
task_mgr::current_task_kind() != Some(TaskKind::WalReceiverManager),
"wait_lsn cannot be called in WAL receiver"
);
anyhow::ensure!(
debug_assert!(
task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionHandler),
"wait_lsn cannot be called in WAL receiver"
);
anyhow::ensure!(
debug_assert!(
task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionPoller),
"wait_lsn cannot be called in WAL receiver"
);
@@ -665,18 +663,22 @@ impl Timeline {
{
Ok(()) => Ok(()),
Err(e) => {
// don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
drop(_timer);
let walreceiver_status = self.walreceiver_status();
Err(anyhow::Error::new(e).context({
format!(
use utils::seqwait::SeqWaitError::*;
match e {
Shutdown => Err(WaitLsnError::Shutdown),
Timeout => {
// don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
drop(_timer);
let walreceiver_status = self.walreceiver_status();
Err(WaitLsnError::Timeout(format!(
"Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}, WalReceiver status: {}",
lsn,
self.get_last_record_lsn(),
self.get_disk_consistent_lsn(),
walreceiver_status,
)
}))
)))
}
}
}
}
}
@@ -2295,11 +2297,12 @@ impl Timeline {
ancestor
.wait_lsn(timeline.ancestor_lsn, ctx)
.await
.with_context(|| {
format!(
"wait for lsn {} on ancestor timeline_id={}",
timeline.ancestor_lsn, ancestor.timeline_id
)
.map_err(|e| match e {
e @ WaitLsnError::Timeout(_) => PageReconstructError::AncestorLsnTimeout(e),
WaitLsnError::Shutdown => PageReconstructError::Cancelled,
e @ WaitLsnError::BadState => {
PageReconstructError::Other(anyhow::anyhow!(e))
}
})?;
timeline_owned = ancestor;
@@ -4228,7 +4231,7 @@ impl Timeline {
.context("Failed to reconstruct a page image:")
{
Ok(img) => img,
Err(e) => return Err(PageReconstructError::from(e)),
Err(e) => return Err(PageReconstructError::WalRedo(e)),
};
if img.len() == page_cache::PAGE_SZ {
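
The `wait_lsn` hunks above replace `anyhow` errors with a typed `WaitLsnError`, which callers then map into their own error types. The sketch below illustrates the two pieces that matter for callers, as a simplified model rather than the real method: `wait_lsn_precheck` and `handle_request` are hypothetical helpers, the boolean parameters stand in for `self.cancel.is_cancelled()` and `self.is_active()`, and `PageStreamError` is trimmed to three variants.

```rust
/// Stand-in for the `WaitLsnError` enum added in the diff.
#[derive(Debug, PartialEq)]
enum WaitLsnError {
    Shutdown,
    BadState,
    Timeout(String),
}

/// Trimmed stand-in for the page service's `PageStreamError`.
#[derive(Debug, PartialEq)]
enum PageStreamError {
    Shutdown,
    Reconnect(&'static str),
    LsnTimeout(WaitLsnError),
}

impl From<WaitLsnError> for PageStreamError {
    fn from(value: WaitLsnError) -> Self {
        match value {
            e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
            WaitLsnError::Shutdown => Self::Shutdown,
            // An inactive timeline prompts the client to reconnect.
            WaitLsnError::BadState => Self::Reconnect("Timeline is not active"),
        }
    }
}

/// Hypothetical helper mirroring the state checks at the top of `wait_lsn`:
/// cancellation is checked first, so a stopping timeline reports Shutdown
/// rather than BadState.
fn wait_lsn_precheck(cancelled: bool, active: bool) -> Result<(), WaitLsnError> {
    if cancelled {
        Err(WaitLsnError::Shutdown)
    } else if !active {
        Err(WaitLsnError::BadState)
    } else {
        Ok(())
    }
}

/// Hypothetical caller: `?` applies the `From` conversion automatically.
fn handle_request(cancelled: bool, active: bool) -> Result<(), PageStreamError> {
    wait_lsn_precheck(cancelled, active)?;
    Ok(())
}
```

With the typed error, each call site chooses its own mapping: in the diff, the page service turns `BadState` into a reconnect, while timeline creation wraps it into `CreateTimelineError::AncestorLsn`.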

File diff suppressed because it is too large

View File

@@ -414,7 +414,11 @@ impl PostgresRedoManager {
"ClearVisibilityMapFlags record on unexpected rel {}",
rel
);
if let Some(heap_blkno) = *new_heap_blkno {
// Helper function to clear the VM bit corresponding to 'heap_blkno'.
// (The logic is similar to the guts of the visibilitymap_clear() function
// in PostgreSQL, after it has locked the right VM page.)
let mut visibilitymap_clear = |heap_blkno| {
// Calculate the VM block and offset that corresponds to the heap block.
let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
@@ -427,19 +431,12 @@ impl PostgresRedoManager {
let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
map[map_byte as usize] &= !(flags << map_offset);
};
if let Some(heap_blkno) = *new_heap_blkno {
visibilitymap_clear(heap_blkno);
}
// Repeat for 'old_heap_blkno', if any
if let Some(heap_blkno) = *old_heap_blkno {
let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
assert!(map_block == blknum);
let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
map[map_byte as usize] &= !(flags << map_offset);
visibilitymap_clear(heap_blkno);
}
}
// Non-relational WAL records are handled here, with custom code that has the
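
The walredo hunk above folds two copies of the bit-clearing logic into one `visibilitymap_clear` closure. The following standalone sketch shows the same pattern on a plain byte buffer. It assumes the default PostgreSQL layout (8192-byte pages, a 24-byte MAXALIGN'd page header, 2 visibility-map bits per heap block); the lowercase helper functions and `clear_vm_flags` are illustrative stand-ins, not the `pg_constants` items used in the diff.

```rust
// Assumed layout constants (PostgreSQL defaults).
const BLCKSZ: u32 = 8192;
const MAXALIGN_SIZE_OF_PAGE_HEADER_DATA: usize = 24;
const BITS_PER_HEAPBLOCK: u32 = 2;
const HEAPBLOCKS_PER_BYTE: u32 = 8 / BITS_PER_HEAPBLOCK;
const MAPSIZE: u32 = BLCKSZ - MAXALIGN_SIZE_OF_PAGE_HEADER_DATA as u32;
const HEAPBLOCKS_PER_PAGE: u32 = MAPSIZE * HEAPBLOCKS_PER_BYTE;

fn heapblk_to_mapblock(heap_blkno: u32) -> u32 {
    heap_blkno / HEAPBLOCKS_PER_PAGE
}
fn heapblk_to_mapbyte(heap_blkno: u32) -> u32 {
    (heap_blkno % HEAPBLOCKS_PER_PAGE) / HEAPBLOCKS_PER_BYTE
}
fn heapblk_to_offset(heap_blkno: u32) -> u32 {
    (heap_blkno % HEAPBLOCKS_PER_BYTE) * BITS_PER_HEAPBLOCK
}

/// Clear `flags` for the new and/or old heap block on VM page `blknum`.
fn clear_vm_flags(
    page: &mut [u8],
    blknum: u32,
    flags: u8,
    new_heap_blkno: Option<u32>,
    old_heap_blkno: Option<u32>,
) {
    // One closure replaces the two copies of the bit-clearing logic.
    let mut visibilitymap_clear = |heap_blkno: u32| {
        // The heap block must map to the VM page being modified.
        assert_eq!(heapblk_to_mapblock(heap_blkno), blknum);
        let map_byte = heapblk_to_mapbyte(heap_blkno) as usize;
        let map_offset = heapblk_to_offset(heap_blkno);
        // The bitmap starts right after the page header.
        let map = &mut page[MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
        map[map_byte] &= !(flags << map_offset);
    };
    if let Some(heap_blkno) = new_heap_blkno {
        visibilitymap_clear(heap_blkno);
    }
    if let Some(heap_blkno) = old_heap_blkno {
        visibilitymap_clear(heap_blkno);
    }
}
```

The closure is declared `mut` because it mutates the captured page, and the two `Option`s are handled independently, matching the observation in the commit log that `new_heap_blkno` and `old_heap_blkno` are unrelated.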