mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 08:52:56 +00:00
feat(pageserver): more info on read errors
Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
@@ -82,6 +82,49 @@ const LOG_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
/// Information about a GetPage@LSN request.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct GetPageRequestTraceInfo {
|
||||
request_lsn: Lsn,
|
||||
gc_cutoff_lsn: Lsn,
|
||||
start_time: SystemTime,
|
||||
original_start_time: Option<SystemTime>,
|
||||
gc_blocked_by_lsn_lease_deadline: bool,
|
||||
covered_by_lsn_lease: bool,
|
||||
merged: bool,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for GetPageRequestTraceInfo {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
use chrono::{DateTime, Utc};
|
||||
let start_time: DateTime<Utc> = self.start_time.into();
|
||||
let original_start_time: Option<DateTime<Utc>> = self.original_start_time.map(|t| t.into());
|
||||
write!(
|
||||
f,
|
||||
"request_lsn={}, start_time={}, gc_cutoff_lsn={}",
|
||||
self.request_lsn, start_time, self.gc_cutoff_lsn
|
||||
)?;
|
||||
if let Some(original_start_time) = original_start_time {
|
||||
write!(f, "original_start_time={},", original_start_time)?;
|
||||
}
|
||||
write!(
|
||||
f,
|
||||
"gc_blocked_by_lsn_lease_deadline={}, covered_by_lsn_lease={}, merged={}",
|
||||
self.gc_blocked_by_lsn_lease_deadline, self.covered_by_lsn_lease, self.merged
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl GetPageRequestTraceInfo {
    /// Folds the trace info of another request into this one.
    ///
    /// On the first merge, the current `start_time` is saved as `original_start_time`.
    /// `start_time` then becomes the earlier of the two start times and the entry is flagged
    /// as merged. No other field of `other` is consulted.
    pub fn merge(&mut self, other: &GetPageRequestTraceInfo) {
        let own_start = self.start_time;

        // Only the first merge records the pre-merge start time.
        if !self.merged {
            self.original_start_time = Some(own_start);
        }

        self.start_time = if other.start_time < own_start {
            other.start_time
        } else {
            own_start
        };
        self.merged = true;
    }
}
|
||||
|
||||
pub struct Listener {
|
||||
cancel: CancellationToken,
|
||||
/// Cancel the listener task through `listen_cancel` to shut down the listener
|
||||
@@ -507,8 +550,11 @@ enum PageStreamError {
|
||||
Shutdown,
|
||||
|
||||
/// Something went wrong reading a page: this likely indicates a pageserver bug
|
||||
#[error("Read error")]
|
||||
Read(#[source] PageReconstructError),
|
||||
#[error("Read error: {0}")]
|
||||
Read(
|
||||
#[source] PageReconstructError,
|
||||
Option<GetPageRequestTraceInfo>,
|
||||
),
|
||||
|
||||
/// Ran out of time waiting for an LSN
|
||||
#[error("LSN timeout: {0}")]
|
||||
@@ -528,7 +574,7 @@ impl From<PageReconstructError> for PageStreamError {
|
||||
fn from(value: PageReconstructError) -> Self {
|
||||
match value {
|
||||
PageReconstructError::Cancelled => Self::Shutdown,
|
||||
e => Self::Read(e),
|
||||
e => Self::Read(e, None),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -612,6 +658,7 @@ enum BatchedFeMessage {
|
||||
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
|
||||
effective_request_lsn: Lsn,
|
||||
pages: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
|
||||
trace_info: Option<GetPageRequestTraceInfo>,
|
||||
},
|
||||
DbSize {
|
||||
span: Span,
|
||||
@@ -876,6 +923,16 @@ impl PageServerHandler {
|
||||
}};
|
||||
}
|
||||
|
||||
let mut trace_info = GetPageRequestTraceInfo {
|
||||
request_lsn: req.hdr.request_lsn,
|
||||
gc_cutoff_lsn: Lsn::INVALID,
|
||||
start_time: SystemTime::now(),
|
||||
gc_blocked_by_lsn_lease_deadline: false,
|
||||
covered_by_lsn_lease: false,
|
||||
merged: false,
|
||||
original_start_time: None,
|
||||
};
|
||||
|
||||
let key = rel_block_to_key(req.rel, req.blkno);
|
||||
let shard = match timeline_handles
|
||||
.get(tenant_id, timeline_id, ShardSelector::Page(key))
|
||||
@@ -922,6 +979,7 @@ impl PageServerHandler {
|
||||
req.hdr.request_lsn,
|
||||
req.hdr.not_modified_since,
|
||||
&shard.get_applied_gc_cutoff_lsn(),
|
||||
Some(&mut trace_info),
|
||||
ctx,
|
||||
)
|
||||
// TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
|
||||
@@ -937,6 +995,7 @@ impl PageServerHandler {
|
||||
shard: shard.downgrade(),
|
||||
effective_request_lsn,
|
||||
pages: smallvec::smallvec![BatchedGetPageRequest { req, timer }],
|
||||
trace_info: Some(trace_info),
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
@@ -981,12 +1040,14 @@ impl PageServerHandler {
|
||||
shard: accum_shard,
|
||||
pages: accum_pages,
|
||||
effective_request_lsn: accum_lsn,
|
||||
trace_info: batched_trace_info,
|
||||
}),
|
||||
BatchedFeMessage::GetPage {
|
||||
span: _,
|
||||
shard: this_shard,
|
||||
pages: this_pages,
|
||||
effective_request_lsn: this_lsn,
|
||||
trace_info: trace_info_to_batch,
|
||||
},
|
||||
) if (|| {
|
||||
assert_eq!(this_pages.len(), 1);
|
||||
@@ -1011,6 +1072,11 @@ impl PageServerHandler {
|
||||
})() =>
|
||||
{
|
||||
// ok to batch
|
||||
if let Some(trace_info_to_batch) = trace_info_to_batch {
|
||||
if let Some(batched_trace_info) = batched_trace_info {
|
||||
batched_trace_info.merge(&trace_info_to_batch);
|
||||
}
|
||||
}
|
||||
accum_pages.extend(this_pages);
|
||||
Ok(())
|
||||
}
|
||||
@@ -1143,7 +1209,21 @@ impl PageServerHandler {
|
||||
span.in_scope(|| info!("handler requested reconnect: {reason}"));
|
||||
return Err(QueryError::Reconnect);
|
||||
}
|
||||
PageStreamError::Read(_)
|
||||
PageStreamError::Read(_, Some(trace_info)) => {
|
||||
// print all the details to the log with {:#}, but for the client the
|
||||
// error message is enough. Do not log if shutting down, as the anyhow::Error
|
||||
// here includes cancellation which is not an error.
|
||||
let full = utils::error::report_compact_sources(&e.err);
|
||||
span.in_scope(|| {
|
||||
error!("error reading relation or page version: {full:#}, {trace_info}")
|
||||
});
|
||||
|
||||
PagestreamBeMessage::Error(PagestreamErrorResponse {
|
||||
req: e.req,
|
||||
message: e.err.to_string(),
|
||||
})
|
||||
}
|
||||
PageStreamError::Read(_, None)
|
||||
| PageStreamError::LsnTimeout(_)
|
||||
| PageStreamError::NotFound(_)
|
||||
| PageStreamError::BadRequest(_) => {
|
||||
@@ -1262,6 +1342,7 @@ impl PageServerHandler {
|
||||
shard,
|
||||
effective_request_lsn,
|
||||
pages,
|
||||
trace_info,
|
||||
} => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::getpage");
|
||||
(
|
||||
@@ -1274,6 +1355,7 @@ impl PageServerHandler {
|
||||
effective_request_lsn,
|
||||
pages,
|
||||
io_concurrency,
|
||||
trace_info,
|
||||
ctx,
|
||||
)
|
||||
.instrument(span.clone())
|
||||
@@ -1726,6 +1808,7 @@ impl PageServerHandler {
|
||||
request_lsn: Lsn,
|
||||
not_modified_since: Lsn,
|
||||
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
|
||||
mut trace_info: Option<&mut GetPageRequestTraceInfo>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Lsn, PageStreamError> {
|
||||
let last_record_lsn = timeline.get_last_record_lsn();
|
||||
@@ -1752,16 +1835,33 @@ impl PageServerHandler {
|
||||
//
|
||||
// We may have older data available, but we make a best effort to detect this case and return an error,
|
||||
// to distinguish a misbehaving client (asking for old LSN) from a storage issue (data missing at a legitimate LSN).
|
||||
if request_lsn < **latest_gc_cutoff_lsn && !timeline.is_gc_blocked_by_lsn_lease_deadline() {
|
||||
let gc_info = &timeline.gc_info.read().unwrap();
|
||||
if !gc_info.lsn_covered_by_lease(request_lsn) {
|
||||
return Err(
|
||||
PageStreamError::BadRequest(format!(
|
||||
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
|
||||
request_lsn, **latest_gc_cutoff_lsn
|
||||
).into())
|
||||
);
|
||||
let gc_cutoff_lsn = **latest_gc_cutoff_lsn;
|
||||
if !timeline.is_gc_blocked_by_lsn_lease_deadline() {
|
||||
if request_lsn < gc_cutoff_lsn {
|
||||
let gc_info = &timeline.gc_info.read().unwrap();
|
||||
if !gc_info.lsn_covered_by_lease(request_lsn) {
|
||||
return Err(
|
||||
PageStreamError::BadRequest(format!(
|
||||
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
|
||||
request_lsn, gc_cutoff_lsn
|
||||
).into())
|
||||
);
|
||||
}
|
||||
// The request is below the gc_cutoff, but it is covered by the lsn lease.
|
||||
if let Some(trace_info) = trace_info.as_mut() {
|
||||
trace_info.covered_by_lsn_lease = true;
|
||||
}
|
||||
}
|
||||
// Otherwise, the request is above the gc_cutoff.
|
||||
} else {
|
||||
// gc is blocked by lsn lease deadline, we don't do any check against gc_cutoff.
|
||||
if let Some(trace_info) = trace_info.as_mut() {
|
||||
trace_info.gc_blocked_by_lsn_lease_deadline = true;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(trace_info) = trace_info.as_mut() {
|
||||
trace_info.gc_cutoff_lsn = gc_cutoff_lsn;
|
||||
}
|
||||
|
||||
// Wait for WAL up to 'not_modified_since' to arrive, if necessary
|
||||
@@ -1829,12 +1929,6 @@ impl PageServerHandler {
|
||||
.to_string()
|
||||
});
|
||||
|
||||
info!(
|
||||
"acquired lease for {} until {}",
|
||||
lsn,
|
||||
valid_until_str.as_deref().unwrap_or("<unknown>")
|
||||
);
|
||||
|
||||
let bytes = valid_until_str.as_ref().map(|x| x.as_bytes());
|
||||
|
||||
pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
|
||||
@@ -1858,6 +1952,7 @@ impl PageServerHandler {
|
||||
req.hdr.request_lsn,
|
||||
req.hdr.not_modified_since,
|
||||
&latest_gc_cutoff_lsn,
|
||||
None,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -1885,6 +1980,7 @@ impl PageServerHandler {
|
||||
req.hdr.request_lsn,
|
||||
req.hdr.not_modified_since,
|
||||
&latest_gc_cutoff_lsn,
|
||||
None,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -1912,6 +2008,7 @@ impl PageServerHandler {
|
||||
req.hdr.request_lsn,
|
||||
req.hdr.not_modified_since,
|
||||
&latest_gc_cutoff_lsn,
|
||||
None,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -1934,6 +2031,7 @@ impl PageServerHandler {
|
||||
effective_lsn: Lsn,
|
||||
requests: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
|
||||
io_concurrency: IoConcurrency,
|
||||
trace_info: Option<GetPageRequestTraceInfo>,
|
||||
ctx: &RequestContext,
|
||||
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
@@ -1982,7 +2080,13 @@ impl PageServerHandler {
|
||||
)
|
||||
})
|
||||
.map_err(|e| BatchedPageStreamError {
|
||||
err: PageStreamError::from(e),
|
||||
err: {
|
||||
let mut err = PageStreamError::from(e);
|
||||
if let PageStreamError::Read(_, err_trace_info) = &mut err {
|
||||
*err_trace_info = trace_info.clone();
|
||||
}
|
||||
err
|
||||
},
|
||||
req: req.req.hdr,
|
||||
})
|
||||
}),
|
||||
@@ -2002,6 +2106,7 @@ impl PageServerHandler {
|
||||
req.hdr.request_lsn,
|
||||
req.hdr.not_modified_since,
|
||||
&latest_gc_cutoff_lsn,
|
||||
None,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Reference in New Issue
Block a user