mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-19 06:00:38 +00:00
Compare commits
13 Commits
conrad/jso
...
problame/w
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4a7ea3f533 | ||
|
|
c029203d47 | ||
|
|
925f365aa0 | ||
|
|
c7e50a54da | ||
|
|
53c18ffe84 | ||
|
|
4c1c6f29cb | ||
|
|
f45c0be5ca | ||
|
|
3759a4898d | ||
|
|
1623918562 | ||
|
|
d1d3c0a6bb | ||
|
|
5a4765f1f2 | ||
|
|
7583e0a696 | ||
|
|
a6fb4f2687 |
@@ -35,6 +35,12 @@ pub enum QueryError {
|
||||
/// We were instructed to shutdown while processing the query
|
||||
#[error("Shutting down")]
|
||||
Shutdown,
|
||||
/// Query handler indicated that client should reconnect
|
||||
#[error("Server requested reconnect")]
|
||||
Reconnect,
|
||||
/// Query named an entity that was not found
|
||||
#[error("Not found: {0}")]
|
||||
NotFound(std::borrow::Cow<'static, str>),
|
||||
/// Authentication failure
|
||||
#[error("Unauthorized: {0}")]
|
||||
Unauthorized(std::borrow::Cow<'static, str>),
|
||||
@@ -54,9 +60,9 @@ impl From<io::Error> for QueryError {
|
||||
impl QueryError {
|
||||
pub fn pg_error_code(&self) -> &'static [u8; 5] {
|
||||
match self {
|
||||
Self::Disconnected(_) | Self::SimulatedConnectionError => b"08006", // connection failure
|
||||
Self::Disconnected(_) | Self::SimulatedConnectionError | Self::Reconnect => b"08006", // connection failure
|
||||
Self::Shutdown => SQLSTATE_ADMIN_SHUTDOWN,
|
||||
Self::Unauthorized(_) => SQLSTATE_INTERNAL_ERROR,
|
||||
Self::Unauthorized(_) | Self::NotFound(_) => SQLSTATE_INTERNAL_ERROR,
|
||||
Self::Other(_) => SQLSTATE_INTERNAL_ERROR, // internal error
|
||||
}
|
||||
}
|
||||
@@ -425,6 +431,11 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
|
||||
info!("Stopped due to shutdown");
|
||||
Ok(())
|
||||
}
|
||||
Err(QueryError::Reconnect) => {
|
||||
// Dropping out of this loop implicitly disconnects
|
||||
info!("Stopped due to handler reconnect request");
|
||||
Ok(())
|
||||
}
|
||||
Err(QueryError::Disconnected(e)) => {
|
||||
info!("Disconnected ({e:#})");
|
||||
// Disconnection is not an error: we just use it that way internally to drop
|
||||
@@ -974,7 +985,9 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> AsyncWrite for CopyDataWriter<'a, I
|
||||
pub fn short_error(e: &QueryError) -> String {
|
||||
match e {
|
||||
QueryError::Disconnected(connection_error) => connection_error.to_string(),
|
||||
QueryError::Reconnect => "reconnect".to_string(),
|
||||
QueryError::Shutdown => "shutdown".to_string(),
|
||||
QueryError::NotFound(_) => "not found".to_string(),
|
||||
QueryError::Unauthorized(_e) => "JWT authentication error".to_string(),
|
||||
QueryError::SimulatedConnectionError => "simulated connection error".to_string(),
|
||||
QueryError::Other(e) => format!("{e:#}"),
|
||||
@@ -996,9 +1009,15 @@ fn log_query_error(query: &str, e: &QueryError) {
|
||||
QueryError::SimulatedConnectionError => {
|
||||
error!("query handler for query '{query}' failed due to a simulated connection error")
|
||||
}
|
||||
QueryError::Reconnect => {
|
||||
info!("query handler for '{query}' requested client to reconnect")
|
||||
}
|
||||
QueryError::Shutdown => {
|
||||
info!("query handler for '{query}' cancelled during tenant shutdown")
|
||||
}
|
||||
QueryError::NotFound(reason) => {
|
||||
info!("query handler for '{query}' entity not found: {reason}")
|
||||
}
|
||||
QueryError::Unauthorized(e) => {
|
||||
warn!("query handler for '{query}' failed with authentication error: {e}");
|
||||
}
|
||||
|
||||
@@ -79,6 +79,8 @@ pub const XLOG_XACT_PREPARE: u8 = 0x10;
|
||||
pub const XLOG_XACT_ABORT: u8 = 0x20;
|
||||
pub const XLOG_XACT_COMMIT_PREPARED: u8 = 0x30;
|
||||
pub const XLOG_XACT_ABORT_PREPARED: u8 = 0x40;
|
||||
pub const XLOG_XACT_ASSIGNMENT: u8 = 0x50;
|
||||
pub const XLOG_XACT_INVALIDATIONS: u8 = 0x60;
|
||||
|
||||
// From srlu.h
|
||||
pub const SLRU_PAGES_PER_SEGMENT: u32 = 32;
|
||||
@@ -103,12 +105,6 @@ pub const XACT_XINFO_HAS_TWOPHASE: u32 = 1u32 << 4;
|
||||
// pub const XACT_XINFO_HAS_AE_LOCKS: u32 = 1u32 << 6;
|
||||
// pub const XACT_XINFO_HAS_GID: u32 = 1u32 << 7;
|
||||
|
||||
// From pg_control.h and rmgrlist.h
|
||||
pub const XLOG_NEXTOID: u8 = 0x30;
|
||||
pub const XLOG_SWITCH: u8 = 0x40;
|
||||
pub const XLOG_FPI_FOR_HINT: u8 = 0xA0;
|
||||
pub const XLOG_FPI: u8 = 0xB0;
|
||||
|
||||
// From multixact.h
|
||||
pub const FIRST_MULTIXACT_ID: u32 = 1;
|
||||
pub const MAX_MULTIXACT_ID: u32 = 0xFFFFFFFF;
|
||||
@@ -136,12 +132,20 @@ pub const MULTIXACT_MEMBERS_PER_PAGE: u16 =
|
||||
pub const XLOG_HEAP_INSERT: u8 = 0x00;
|
||||
pub const XLOG_HEAP_DELETE: u8 = 0x10;
|
||||
pub const XLOG_HEAP_UPDATE: u8 = 0x20;
|
||||
pub const XLOG_HEAP_TRUNCATE: u8 = 0x30;
|
||||
pub const XLOG_HEAP_HOT_UPDATE: u8 = 0x40;
|
||||
pub const XLOG_HEAP_CONFIRM: u8 = 0x50;
|
||||
pub const XLOG_HEAP_LOCK: u8 = 0x60;
|
||||
pub const XLOG_HEAP_INPLACE: u8 = 0x70;
|
||||
pub const XLOG_HEAP_INIT_PAGE: u8 = 0x80;
|
||||
pub const XLOG_HEAP2_REWRITE: u8 = 0x00;
|
||||
pub const XLOG_HEAP2_PRUNE: u8 = 0x10;
|
||||
pub const XLOG_HEAP2_VACUUM: u8 = 0x20;
|
||||
pub const XLOG_HEAP2_FREEZE_PAGE: u8 = 0x30;
|
||||
pub const XLOG_HEAP2_VISIBLE: u8 = 0x40;
|
||||
pub const XLOG_HEAP2_MULTI_INSERT: u8 = 0x50;
|
||||
pub const XLOG_HEAP2_LOCK_UPDATED: u8 = 0x60;
|
||||
pub const XLOG_HEAP2_NEW_CID: u8 = 0x70;
|
||||
pub const XLH_LOCK_ALL_FROZEN_CLEARED: u8 = 0x01;
|
||||
pub const XLH_INSERT_ALL_FROZEN_SET: u8 = (1 << 5) as u8;
|
||||
pub const XLH_INSERT_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8;
|
||||
@@ -164,8 +168,21 @@ pub const RM_RELMAP_ID: u8 = 7;
|
||||
pub const RM_STANDBY_ID: u8 = 8;
|
||||
pub const RM_HEAP2_ID: u8 = 9;
|
||||
pub const RM_HEAP_ID: u8 = 10;
|
||||
pub const RM_BTREE_ID: u8 = 11;
|
||||
pub const RM_HASH_ID: u8 = 12;
|
||||
pub const RM_GIN_ID: u8 = 13;
|
||||
pub const RM_GIST_ID: u8 = 14;
|
||||
pub const RM_SEQ_ID: u8 = 15;
|
||||
pub const RM_SPGIST_ID: u8 = 16;
|
||||
pub const RM_BRIN_ID: u8 = 17;
|
||||
pub const RM_COMMIT_TS_ID: u8 = 18;
|
||||
pub const RM_REPLORIGIN_ID: u8 = 19;
|
||||
pub const RM_GENERIC_ID: u8 = 20;
|
||||
pub const RM_LOGICALMSG_ID: u8 = 21;
|
||||
|
||||
// from relmapper.h
|
||||
pub const XLOG_RELMAP_UPDATE: u8 = 0x0;
|
||||
|
||||
// from neon_rmgr.h
|
||||
pub const RM_NEON_ID: u8 = 134;
|
||||
|
||||
@@ -215,8 +232,22 @@ pub const INVALID_TRANSACTION_ID: u32 = 0;
|
||||
pub const FIRST_BOOTSTRAP_OBJECT_ID: u32 = 12000;
|
||||
pub const FIRST_NORMAL_OBJECT_ID: u32 = 16384;
|
||||
|
||||
/* pg_control.h */
|
||||
pub const XLOG_CHECKPOINT_SHUTDOWN: u8 = 0x00;
|
||||
pub const XLOG_CHECKPOINT_ONLINE: u8 = 0x10;
|
||||
pub const XLOG_NOOP: u8 = 0x20;
|
||||
pub const XLOG_NEXTOID: u8 = 0x30;
|
||||
pub const XLOG_SWITCH: u8 = 0x40;
|
||||
pub const XLOG_BACKUP_END: u8 = 0x50;
|
||||
pub const XLOG_PARAMETER_CHANGE: u8 = 0x60;
|
||||
pub const XLOG_RESTORE_POINT: u8 = 0x70;
|
||||
pub const XLOG_FPW_CHANGE: u8 = 0x80;
|
||||
pub const XLOG_END_OF_RECOVERY: u8 = 0x90;
|
||||
pub const XLOG_FPI_FOR_HINT: u8 = 0xA0;
|
||||
pub const XLOG_FPI: u8 = 0xB0;
|
||||
/* 0xC0 is used in Postgres 9.5-11 */
|
||||
pub const XLOG_OVERWRITE_CONTRECORD: u8 = 0xD0;
|
||||
|
||||
pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
|
||||
pub const XLP_LONG_HEADER: u16 = 0x0002;
|
||||
|
||||
|
||||
@@ -31,6 +31,9 @@ pub enum ApiError {
|
||||
#[error("Shutting down")]
|
||||
ShuttingDown,
|
||||
|
||||
#[error("Timeout")]
|
||||
Timeout(Cow<'static, str>),
|
||||
|
||||
#[error(transparent)]
|
||||
InternalServerError(anyhow::Error),
|
||||
}
|
||||
@@ -67,6 +70,10 @@ impl ApiError {
|
||||
err.to_string(),
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
),
|
||||
ApiError::Timeout(err) => HttpErrorBody::response_from_msg_and_status(
|
||||
err.to_string(),
|
||||
StatusCode::REQUEST_TIMEOUT,
|
||||
),
|
||||
ApiError::InternalServerError(err) => HttpErrorBody::response_from_msg_and_status(
|
||||
err.to_string(),
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
|
||||
@@ -152,6 +152,7 @@ impl From<PageReconstructError> for ApiError {
|
||||
PageReconstructError::AncestorStopping(_) => {
|
||||
ApiError::ResourceUnavailable(format!("{pre}").into())
|
||||
}
|
||||
PageReconstructError::AncestorLsnTimeout(e) => ApiError::Timeout(format!("{e}").into()),
|
||||
PageReconstructError::WalRedo(pre) => ApiError::InternalServerError(pre),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, Qu
|
||||
use pq_proto::framed::ConnectionError;
|
||||
use pq_proto::FeStartupPacket;
|
||||
use pq_proto::{BeMessage, FeMessage, RowDescriptor};
|
||||
use std::borrow::Cow;
|
||||
use std::io;
|
||||
use std::net::TcpListener;
|
||||
use std::pin::pin;
|
||||
@@ -61,6 +62,9 @@ use crate::tenant::mgr;
|
||||
use crate::tenant::mgr::get_active_tenant_with_timeout;
|
||||
use crate::tenant::mgr::GetActiveTenantError;
|
||||
use crate::tenant::mgr::ShardSelector;
|
||||
use crate::tenant::timeline::WaitLsnError;
|
||||
use crate::tenant::GetTimelineError;
|
||||
use crate::tenant::PageReconstructError;
|
||||
use crate::tenant::Timeline;
|
||||
use crate::trace::Tracer;
|
||||
|
||||
@@ -283,6 +287,64 @@ struct PageServerHandler {
|
||||
connection_ctx: RequestContext,
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
enum PageStreamError {
|
||||
/// We encountered an error that should prompt the client to reconnect:
|
||||
/// in practice this means we drop the connection without sending a response.
|
||||
#[error("Reconnect required: {0}")]
|
||||
Reconnect(Cow<'static, str>),
|
||||
|
||||
/// We were instructed to shutdown while processing the query
|
||||
#[error("Shutting down")]
|
||||
Shutdown,
|
||||
|
||||
/// Something went wrong reading a page: this likely indicates a pageserver bug
|
||||
#[error("Read error: {0}")]
|
||||
Read(PageReconstructError),
|
||||
|
||||
/// Ran out of time waiting for an LSN
|
||||
#[error("LSN timeout: {0}")]
|
||||
LsnTimeout(WaitLsnError),
|
||||
|
||||
/// The entity required to serve the request (tenant or timeline) is not found,
|
||||
/// or is not found in a suitable state to serve a request.
|
||||
#[error("Not found: {0}")]
|
||||
NotFound(std::borrow::Cow<'static, str>),
|
||||
|
||||
/// Request asked for something that doesn't make sense, like an invalid LSN
|
||||
#[error("Bad request: {0}")]
|
||||
BadRequest(std::borrow::Cow<'static, str>),
|
||||
}
|
||||
|
||||
impl From<PageReconstructError> for PageStreamError {
|
||||
fn from(value: PageReconstructError) -> Self {
|
||||
match value {
|
||||
PageReconstructError::Cancelled => Self::Shutdown,
|
||||
e => Self::Read(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetActiveTimelineError> for PageStreamError {
|
||||
fn from(value: GetActiveTimelineError) -> Self {
|
||||
match value {
|
||||
GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => Self::Shutdown,
|
||||
GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
|
||||
GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<WaitLsnError> for PageStreamError {
|
||||
fn from(value: WaitLsnError) -> Self {
|
||||
match value {
|
||||
e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
|
||||
WaitLsnError::Shutdown => Self::Shutdown,
|
||||
WaitLsnError::BadState => Self::Reconnect("Timeline is not active".into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PageServerHandler {
|
||||
pub fn new(
|
||||
conf: &'static PageServerConf,
|
||||
@@ -428,7 +490,7 @@ impl PageServerHandler {
|
||||
// Check that the timeline exists
|
||||
let timeline = tenant
|
||||
.get_timeline(timeline_id, true)
|
||||
.map_err(|e| anyhow::anyhow!(e))?;
|
||||
.map_err(|e| QueryError::NotFound(format!("{e}").into()))?;
|
||||
|
||||
// Avoid starting new requests if the timeline has already started shutting down,
|
||||
// and block timeline shutdown until this request is complete, or drops out due
|
||||
@@ -520,32 +582,44 @@ impl PageServerHandler {
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = &response {
|
||||
// Requests may fail as soon as we are Stopping, even if the Timeline's cancellation token wasn't fired yet,
|
||||
// because wait_lsn etc will drop out
|
||||
// is_stopping(): [`Timeline::flush_and_shutdown`] has entered
|
||||
// is_canceled(): [`Timeline::shutdown`]` has entered
|
||||
if timeline.cancel.is_cancelled() || timeline.is_stopping() {
|
||||
match response {
|
||||
Err(PageStreamError::Shutdown) => {
|
||||
// If we fail to fulfil a request during shutdown, which may be _because_ of
|
||||
// shutdown, then do not send the error to the client. Instead just drop the
|
||||
// connection.
|
||||
span.in_scope(|| info!("dropped response during shutdown: {e:#}"));
|
||||
span.in_scope(|| info!("dropping connection due to shutdown"));
|
||||
return Err(QueryError::Shutdown);
|
||||
}
|
||||
Err(PageStreamError::Reconnect(reason)) => {
|
||||
span.in_scope(|| info!("handler requested reconnect: {reason}"));
|
||||
return Err(QueryError::Reconnect);
|
||||
}
|
||||
Err(e) if timeline.cancel.is_cancelled() || timeline.is_stopping() => {
|
||||
// This branch accomodates code within request handlers that returns an anyhow::Error instead of a clean
|
||||
// shutdown error, this may be buried inside a PageReconstructError::Other for example.
|
||||
//
|
||||
// Requests may fail as soon as we are Stopping, even if the Timeline's cancellation token wasn't fired yet,
|
||||
// because wait_lsn etc will drop out
|
||||
// is_stopping(): [`Timeline::flush_and_shutdown`] has entered
|
||||
// is_canceled(): [`Timeline::shutdown`]` has entered
|
||||
span.in_scope(|| info!("dropped error response during shutdown: {e:#}"));
|
||||
return Err(QueryError::Shutdown);
|
||||
}
|
||||
r => {
|
||||
let response_msg = r.unwrap_or_else(|e| {
|
||||
// print the all details to the log with {:#}, but for the client the
|
||||
// error message is enough. Do not log if shutting down, as the anyhow::Error
|
||||
// here includes cancellation which is not an error.
|
||||
span.in_scope(|| error!("error reading relation or page version: {:#}", e));
|
||||
PagestreamBeMessage::Error(PagestreamErrorResponse {
|
||||
message: e.to_string(),
|
||||
})
|
||||
});
|
||||
|
||||
pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
|
||||
self.flush_cancellable(pgb, &timeline.cancel).await?;
|
||||
}
|
||||
}
|
||||
|
||||
let response = response.unwrap_or_else(|e| {
|
||||
// print the all details to the log with {:#}, but for the client the
|
||||
// error message is enough. Do not log if shutting down, as the anyhow::Error
|
||||
// here includes cancellation which is not an error.
|
||||
span.in_scope(|| error!("error reading relation or page version: {:#}", e));
|
||||
PagestreamBeMessage::Error(PagestreamErrorResponse {
|
||||
message: e.to_string(),
|
||||
})
|
||||
});
|
||||
|
||||
pgb.write_message_noflush(&BeMessage::CopyData(&response.serialize()))?;
|
||||
self.flush_cancellable(pgb, &timeline.cancel).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -692,7 +766,7 @@ impl PageServerHandler {
|
||||
latest: bool,
|
||||
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Lsn> {
|
||||
) -> Result<Lsn, PageStreamError> {
|
||||
if latest {
|
||||
// Latest page version was requested. If LSN is given, it is a hint
|
||||
// to the page server that there have been no modifications to the
|
||||
@@ -723,15 +797,19 @@ impl PageServerHandler {
|
||||
}
|
||||
} else {
|
||||
if lsn == Lsn(0) {
|
||||
anyhow::bail!("invalid LSN(0) in request");
|
||||
return Err(PageStreamError::BadRequest(
|
||||
"invalid LSN(0) in request".into(),
|
||||
));
|
||||
}
|
||||
timeline.wait_lsn(lsn, ctx).await?;
|
||||
}
|
||||
anyhow::ensure!(
|
||||
lsn >= **latest_gc_cutoff_lsn,
|
||||
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
|
||||
lsn, **latest_gc_cutoff_lsn
|
||||
);
|
||||
|
||||
if lsn < **latest_gc_cutoff_lsn {
|
||||
return Err(PageStreamError::BadRequest(format!(
|
||||
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
|
||||
lsn, **latest_gc_cutoff_lsn
|
||||
).into()));
|
||||
}
|
||||
Ok(lsn)
|
||||
}
|
||||
|
||||
@@ -740,7 +818,7 @@ impl PageServerHandler {
|
||||
timeline: &Timeline,
|
||||
req: &PagestreamExistsRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<PagestreamBeMessage> {
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn =
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
@@ -760,7 +838,7 @@ impl PageServerHandler {
|
||||
timeline: &Timeline,
|
||||
req: &PagestreamNblocksRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<PagestreamBeMessage> {
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn =
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
@@ -780,7 +858,7 @@ impl PageServerHandler {
|
||||
timeline: &Timeline,
|
||||
req: &PagestreamDbSizeRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<PagestreamBeMessage> {
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn =
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
@@ -807,7 +885,7 @@ impl PageServerHandler {
|
||||
timeline: &Timeline,
|
||||
req: &PagestreamGetPageRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<PagestreamBeMessage> {
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn =
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
@@ -826,7 +904,7 @@ impl PageServerHandler {
|
||||
timeline: &Timeline,
|
||||
req: &PagestreamGetPageRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<PagestreamBeMessage> {
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let key = rel_block_to_key(req.rel, req.blkno);
|
||||
if timeline.get_shard_identity().is_key_local(&key) {
|
||||
self.do_handle_get_page_at_lsn_request(timeline, req, ctx)
|
||||
@@ -849,24 +927,26 @@ impl PageServerHandler {
|
||||
Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
|
||||
// We already know this tenant exists in general, because we resolved it at
|
||||
// start of connection. Getting a NotFound here indicates that the shard containing
|
||||
// the requested page is not present on this node.
|
||||
|
||||
// TODO: this should be some kind of structured error that the client will understand,
|
||||
// so that it can block until its config is updated: this error is expected in the case
|
||||
// that the Tenant's shards' placements are being updated and the client hasn't been
|
||||
// informed yet.
|
||||
//
|
||||
// https://github.com/neondatabase/neon/issues/6038
|
||||
tracing::warn!("Page request routed to wrong shard: my identity {:?}, should go to shard {}, key {}",
|
||||
// the requested page is not present on this node: the client's knowledge of shard->pageserver
|
||||
// mapping is out of date.
|
||||
tracing::info!("Page request routed to wrong shard: my identity {:?}, should go to shard {}, key {}",
|
||||
timeline.get_shard_identity(), timeline.get_shard_identity().get_shard_number(&key).0, key);
|
||||
return Err(anyhow::anyhow!("Request routed to wrong shard"));
|
||||
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
|
||||
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
|
||||
// and talk to a different pageserver.
|
||||
return Err(PageStreamError::Reconnect(
|
||||
"getpage@lsn request routed to wrong shard".into(),
|
||||
));
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
|
||||
// Take a GateGuard for the duration of this request. If we were using our main Timeline object,
|
||||
// the GateGuard was already held over the whole connection.
|
||||
let _timeline_guard = timeline.gate.enter().map_err(|_| QueryError::Shutdown)?;
|
||||
let _timeline_guard = timeline
|
||||
.gate
|
||||
.enter()
|
||||
.map_err(|_| PageStreamError::Shutdown)?;
|
||||
|
||||
self.do_handle_get_page_at_lsn_request(&timeline, req, ctx)
|
||||
.await
|
||||
@@ -1011,9 +1091,7 @@ impl PageServerHandler {
|
||||
)
|
||||
.await
|
||||
.map_err(GetActiveTimelineError::Tenant)?;
|
||||
let timeline = tenant
|
||||
.get_timeline(timeline_id, true)
|
||||
.map_err(|e| GetActiveTimelineError::Timeline(anyhow::anyhow!(e)))?;
|
||||
let timeline = tenant.get_timeline(timeline_id, true)?;
|
||||
Ok(timeline)
|
||||
}
|
||||
}
|
||||
@@ -1435,14 +1513,15 @@ enum GetActiveTimelineError {
|
||||
#[error(transparent)]
|
||||
Tenant(GetActiveTenantError),
|
||||
#[error(transparent)]
|
||||
Timeline(anyhow::Error),
|
||||
Timeline(#[from] GetTimelineError),
|
||||
}
|
||||
|
||||
impl From<GetActiveTimelineError> for QueryError {
|
||||
fn from(e: GetActiveTimelineError) -> Self {
|
||||
match e {
|
||||
GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
|
||||
GetActiveTimelineError::Tenant(e) => e.into(),
|
||||
GetActiveTimelineError::Timeline(e) => QueryError::Other(e),
|
||||
GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -160,7 +160,7 @@ impl Timeline {
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
/// Look up given page version.
|
||||
pub async fn get_rel_page_at_lsn(
|
||||
pub(crate) async fn get_rel_page_at_lsn(
|
||||
&self,
|
||||
tag: RelTag,
|
||||
blknum: BlockNumber,
|
||||
@@ -191,7 +191,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// Get size of a database in blocks
|
||||
pub async fn get_db_size(
|
||||
pub(crate) async fn get_db_size(
|
||||
&self,
|
||||
spcnode: Oid,
|
||||
dbnode: Oid,
|
||||
@@ -211,7 +211,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Get size of a relation file
|
||||
pub async fn get_rel_size(
|
||||
pub(crate) async fn get_rel_size(
|
||||
&self,
|
||||
tag: RelTag,
|
||||
version: Version<'_>,
|
||||
@@ -256,7 +256,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Does relation exist?
|
||||
pub async fn get_rel_exists(
|
||||
pub(crate) async fn get_rel_exists(
|
||||
&self,
|
||||
tag: RelTag,
|
||||
version: Version<'_>,
|
||||
@@ -291,7 +291,7 @@ impl Timeline {
|
||||
/// # Cancel-Safety
|
||||
///
|
||||
/// This method is cancellation-safe.
|
||||
pub async fn list_rels(
|
||||
pub(crate) async fn list_rels(
|
||||
&self,
|
||||
spcnode: Oid,
|
||||
dbnode: Oid,
|
||||
@@ -319,7 +319,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Look up given SLRU page version.
|
||||
pub async fn get_slru_page_at_lsn(
|
||||
pub(crate) async fn get_slru_page_at_lsn(
|
||||
&self,
|
||||
kind: SlruKind,
|
||||
segno: u32,
|
||||
@@ -332,7 +332,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Get size of an SLRU segment
|
||||
pub async fn get_slru_segment_size(
|
||||
pub(crate) async fn get_slru_segment_size(
|
||||
&self,
|
||||
kind: SlruKind,
|
||||
segno: u32,
|
||||
@@ -345,7 +345,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Get size of an SLRU segment
|
||||
pub async fn get_slru_segment_exists(
|
||||
pub(crate) async fn get_slru_segment_exists(
|
||||
&self,
|
||||
kind: SlruKind,
|
||||
segno: u32,
|
||||
@@ -372,7 +372,7 @@ impl Timeline {
|
||||
/// so it's not well defined which LSN you get if there were multiple commits
|
||||
/// "in flight" at that point in time.
|
||||
///
|
||||
pub async fn find_lsn_for_timestamp(
|
||||
pub(crate) async fn find_lsn_for_timestamp(
|
||||
&self,
|
||||
search_timestamp: TimestampTz,
|
||||
cancel: &CancellationToken,
|
||||
@@ -452,7 +452,7 @@ impl Timeline {
|
||||
/// Additionally, sets 'found_smaller'/'found_Larger, if encounters any commits
|
||||
/// with a smaller/larger timestamp.
|
||||
///
|
||||
pub async fn is_latest_commit_timestamp_ge_than(
|
||||
pub(crate) async fn is_latest_commit_timestamp_ge_than(
|
||||
&self,
|
||||
search_timestamp: TimestampTz,
|
||||
probe_lsn: Lsn,
|
||||
@@ -475,7 +475,7 @@ impl Timeline {
|
||||
/// Obtain the possible timestamp range for the given lsn.
|
||||
///
|
||||
/// If the lsn has no timestamps, returns None. returns `(min, max, median)` if it has timestamps.
|
||||
pub async fn get_timestamp_for_lsn(
|
||||
pub(crate) async fn get_timestamp_for_lsn(
|
||||
&self,
|
||||
probe_lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
@@ -532,7 +532,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Get a list of SLRU segments
|
||||
pub async fn list_slru_segments(
|
||||
pub(crate) async fn list_slru_segments(
|
||||
&self,
|
||||
kind: SlruKind,
|
||||
version: Version<'_>,
|
||||
@@ -548,7 +548,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_relmap_file(
|
||||
pub(crate) async fn get_relmap_file(
|
||||
&self,
|
||||
spcnode: Oid,
|
||||
dbnode: Oid,
|
||||
@@ -561,7 +561,7 @@ impl Timeline {
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
pub async fn list_dbdirs(
|
||||
pub(crate) async fn list_dbdirs(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
@@ -575,7 +575,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_twophase_file(
|
||||
pub(crate) async fn get_twophase_file(
|
||||
&self,
|
||||
xid: TransactionId,
|
||||
lsn: Lsn,
|
||||
@@ -586,7 +586,7 @@ impl Timeline {
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
pub async fn list_twophase_files(
|
||||
pub(crate) async fn list_twophase_files(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
@@ -600,7 +600,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_control_file(
|
||||
pub(crate) async fn get_control_file(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
@@ -608,7 +608,7 @@ impl Timeline {
|
||||
self.get(CONTROLFILE_KEY, lsn, ctx).await
|
||||
}
|
||||
|
||||
pub async fn get_checkpoint(
|
||||
pub(crate) async fn get_checkpoint(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
@@ -616,7 +616,7 @@ impl Timeline {
|
||||
self.get(CHECKPOINT_KEY, lsn, ctx).await
|
||||
}
|
||||
|
||||
pub async fn list_aux_files(
|
||||
pub(crate) async fn list_aux_files(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
|
||||
@@ -56,6 +56,7 @@ use self::timeline::uninit::TimelineUninitMark;
|
||||
use self::timeline::uninit::UninitializedTimeline;
|
||||
use self::timeline::EvictionTaskTenantState;
|
||||
use self::timeline::TimelineResources;
|
||||
use self::timeline::WaitLsnError;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::deletion_queue::DeletionQueueClient;
|
||||
@@ -1758,7 +1759,15 @@ impl Tenant {
|
||||
// decoding the new WAL might need to look up previous pages, relation
|
||||
// sizes etc. and that would get confused if the previous page versions
|
||||
// are not in the repository yet.
|
||||
ancestor_timeline.wait_lsn(*lsn, ctx).await?;
|
||||
ancestor_timeline
|
||||
.wait_lsn(*lsn, ctx)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
e @ (WaitLsnError::Timeout(_) | WaitLsnError::BadState) => {
|
||||
CreateTimelineError::AncestorLsn(anyhow::anyhow!(e))
|
||||
}
|
||||
WaitLsnError::Shutdown => CreateTimelineError::ShuttingDown,
|
||||
})?;
|
||||
}
|
||||
|
||||
self.branch_timeline(
|
||||
|
||||
@@ -373,15 +373,20 @@ pub struct GcInfo {
|
||||
}
|
||||
|
||||
/// An error happened in a get() operation.
|
||||
#[derive(thiserror::Error)]
|
||||
pub enum PageReconstructError {
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum PageReconstructError {
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
|
||||
#[error("Ancestor LSN wait error: {0}")]
|
||||
AncestorLsnTimeout(#[from] WaitLsnError),
|
||||
|
||||
/// The operation was cancelled
|
||||
#[error("Cancelled")]
|
||||
Cancelled,
|
||||
|
||||
/// The ancestor of this is being stopped
|
||||
#[error("ancestor timeline {0} is being stopped")]
|
||||
AncestorStopping(TimelineId),
|
||||
|
||||
/// An error happened replaying WAL records
|
||||
@@ -402,32 +407,6 @@ enum FlushLayerError {
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for PageReconstructError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
|
||||
match self {
|
||||
Self::Other(err) => err.fmt(f),
|
||||
Self::Cancelled => write!(f, "cancelled"),
|
||||
Self::AncestorStopping(timeline_id) => {
|
||||
write!(f, "ancestor timeline {timeline_id} is being stopped")
|
||||
}
|
||||
Self::WalRedo(err) => err.fmt(f),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for PageReconstructError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
|
||||
match self {
|
||||
Self::Other(err) => err.fmt(f),
|
||||
Self::Cancelled => write!(f, "cancelled"),
|
||||
Self::AncestorStopping(timeline_id) => {
|
||||
write!(f, "ancestor timeline {timeline_id} is being stopped")
|
||||
}
|
||||
Self::WalRedo(err) => err.fmt(f),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub enum LogicalSizeCalculationCause {
|
||||
Initial,
|
||||
@@ -452,6 +431,21 @@ impl std::fmt::Debug for Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum WaitLsnError {
|
||||
// Called on a timeline which is shutting down
|
||||
#[error("Shutdown")]
|
||||
Shutdown,
|
||||
|
||||
// Called on an timeline not in active state or shutting down
|
||||
#[error("Bad state (not active)")]
|
||||
BadState,
|
||||
|
||||
// Timeout expired while waiting for LSN to catch up with goal.
|
||||
#[error("{0}")]
|
||||
Timeout(String),
|
||||
}
|
||||
|
||||
/// Public interface functions
|
||||
impl Timeline {
|
||||
/// Get the LSN where this branch was created
|
||||
@@ -486,7 +480,7 @@ impl Timeline {
|
||||
/// # Cancel-Safety
|
||||
///
|
||||
/// This method is cancellation-safe.
|
||||
pub async fn get(
|
||||
pub(crate) async fn get(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
@@ -634,24 +628,28 @@ impl Timeline {
|
||||
/// You should call this before any of the other get_* or list_* functions. Calling
|
||||
/// those functions with an LSN that has been processed yet is an error.
|
||||
///
|
||||
pub async fn wait_lsn(
|
||||
pub(crate) async fn wait_lsn(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
_ctx: &RequestContext, /* Prepare for use by cancellation */
|
||||
) -> anyhow::Result<()> {
|
||||
anyhow::ensure!(self.is_active(), "Cannot wait for Lsn on inactive timeline");
|
||||
) -> Result<(), WaitLsnError> {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(WaitLsnError::Shutdown);
|
||||
} else if !self.is_active() {
|
||||
return Err(WaitLsnError::BadState);
|
||||
}
|
||||
|
||||
// This should never be called from the WAL receiver, because that could lead
|
||||
// to a deadlock.
|
||||
anyhow::ensure!(
|
||||
debug_assert!(
|
||||
task_mgr::current_task_kind() != Some(TaskKind::WalReceiverManager),
|
||||
"wait_lsn cannot be called in WAL receiver"
|
||||
);
|
||||
anyhow::ensure!(
|
||||
debug_assert!(
|
||||
task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionHandler),
|
||||
"wait_lsn cannot be called in WAL receiver"
|
||||
);
|
||||
anyhow::ensure!(
|
||||
debug_assert!(
|
||||
task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionPoller),
|
||||
"wait_lsn cannot be called in WAL receiver"
|
||||
);
|
||||
@@ -665,18 +663,22 @@ impl Timeline {
|
||||
{
|
||||
Ok(()) => Ok(()),
|
||||
Err(e) => {
|
||||
// don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
|
||||
drop(_timer);
|
||||
let walreceiver_status = self.walreceiver_status();
|
||||
Err(anyhow::Error::new(e).context({
|
||||
format!(
|
||||
use utils::seqwait::SeqWaitError::*;
|
||||
match e {
|
||||
Shutdown => Err(WaitLsnError::Shutdown),
|
||||
Timeout => {
|
||||
// don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
|
||||
drop(_timer);
|
||||
let walreceiver_status = self.walreceiver_status();
|
||||
Err(WaitLsnError::Timeout(format!(
|
||||
"Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}, WalReceiver status: {}",
|
||||
lsn,
|
||||
self.get_last_record_lsn(),
|
||||
self.get_disk_consistent_lsn(),
|
||||
walreceiver_status,
|
||||
)
|
||||
}))
|
||||
)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2295,11 +2297,12 @@ impl Timeline {
|
||||
ancestor
|
||||
.wait_lsn(timeline.ancestor_lsn, ctx)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"wait for lsn {} on ancestor timeline_id={}",
|
||||
timeline.ancestor_lsn, ancestor.timeline_id
|
||||
)
|
||||
.map_err(|e| match e {
|
||||
e @ WaitLsnError::Timeout(_) => PageReconstructError::AncestorLsnTimeout(e),
|
||||
WaitLsnError::Shutdown => PageReconstructError::Cancelled,
|
||||
e @ WaitLsnError::BadState => {
|
||||
PageReconstructError::Other(anyhow::anyhow!(e))
|
||||
}
|
||||
})?;
|
||||
|
||||
timeline_owned = ancestor;
|
||||
@@ -4228,7 +4231,7 @@ impl Timeline {
|
||||
.context("Failed to reconstruct a page image:")
|
||||
{
|
||||
Ok(img) => img,
|
||||
Err(e) => return Err(PageReconstructError::from(e)),
|
||||
Err(e) => return Err(PageReconstructError::WalRedo(e)),
|
||||
};
|
||||
|
||||
if img.len() == page_cache::PAGE_SZ {
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -414,7 +414,11 @@ impl PostgresRedoManager {
|
||||
"ClearVisibilityMapFlags record on unexpected rel {}",
|
||||
rel
|
||||
);
|
||||
if let Some(heap_blkno) = *new_heap_blkno {
|
||||
|
||||
// Helper function to clear the VM bit corresponding to 'heap_blkno'.
|
||||
// (The logic is similar to the guts of the visibilitymap_clear() function
|
||||
// in PostgreSQL, after it has locked the right VM page.)
|
||||
let mut visibilitymap_clear = |heap_blkno| {
|
||||
// Calculate the VM block and offset that corresponds to the heap block.
|
||||
let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
|
||||
let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
|
||||
@@ -427,19 +431,12 @@ impl PostgresRedoManager {
|
||||
let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
|
||||
|
||||
map[map_byte as usize] &= !(flags << map_offset);
|
||||
};
|
||||
if let Some(heap_blkno) = *new_heap_blkno {
|
||||
visibilitymap_clear(heap_blkno);
|
||||
}
|
||||
|
||||
// Repeat for 'old_heap_blkno', if any
|
||||
if let Some(heap_blkno) = *old_heap_blkno {
|
||||
let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
|
||||
let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
|
||||
let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
|
||||
|
||||
assert!(map_block == blknum);
|
||||
|
||||
let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
|
||||
|
||||
map[map_byte as usize] &= !(flags << map_offset);
|
||||
visibilitymap_clear(heap_blkno);
|
||||
}
|
||||
}
|
||||
// Non-relational WAL records are handled here, with custom code that has the
|
||||
|
||||
Reference in New Issue
Block a user