mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-25 14:20:38 +00:00
Compare commits
14 Commits
hackathon/
...
vlad/test-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b2cb10590e | ||
|
|
2923fd2a5b | ||
|
|
2a5336b9ab | ||
|
|
6f20726610 | ||
|
|
29f741e1e9 | ||
|
|
2b37a40079 | ||
|
|
af2b65a2fb | ||
|
|
5d194c7824 | ||
|
|
ac2702afd3 | ||
|
|
88fd46d795 | ||
|
|
2d6763882e | ||
|
|
c0c23cde72 | ||
|
|
942bc9544b | ||
|
|
02b7cdb305 |
@@ -142,11 +142,16 @@ impl PagestreamClient {
|
||||
) -> anyhow::Result<PagestreamGetPageResponse> {
|
||||
let req = PagestreamFeMessage::GetPage(req);
|
||||
let req: bytes::Bytes = req.serialize();
|
||||
// let mut req = tokio_util::io::ReaderStream::new(&req);
|
||||
let mut req = tokio_stream::once(Ok(req));
|
||||
|
||||
self.copy_both.send_all(&mut req).await?;
|
||||
for i in 0..10 {
|
||||
let mut req = tokio_stream::once(Ok(req.clone()));
|
||||
self.copy_both.send_all(&mut req).await?;
|
||||
}
|
||||
|
||||
for i in 0..9 {
|
||||
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
|
||||
let next: bytes::Bytes = next.unwrap()?;
|
||||
}
|
||||
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
|
||||
let next: bytes::Bytes = next.unwrap()?;
|
||||
|
||||
|
||||
@@ -1185,6 +1185,7 @@ struct GlobalAndPerTimelineHistogramTimer<'a, 'c> {
|
||||
ctx: &'c RequestContext,
|
||||
start: std::time::Instant,
|
||||
op: SmgrQueryType,
|
||||
count: usize,
|
||||
}
|
||||
|
||||
impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> {
|
||||
@@ -1212,9 +1213,11 @@ impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> {
|
||||
elapsed
|
||||
}
|
||||
};
|
||||
self.global_metric.observe(ex_throttled.as_secs_f64());
|
||||
if let Some(timeline_metric) = self.timeline_metric {
|
||||
timeline_metric.observe(ex_throttled.as_secs_f64());
|
||||
for _ in 0..self.count {
|
||||
self.global_metric.observe(ex_throttled.as_secs_f64());
|
||||
if let Some(timeline_metric) = self.timeline_metric {
|
||||
timeline_metric.observe(ex_throttled.as_secs_f64());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1343,6 +1346,14 @@ impl SmgrQueryTimePerTimeline {
|
||||
&'a self,
|
||||
op: SmgrQueryType,
|
||||
ctx: &'c RequestContext,
|
||||
) -> Option<impl Drop + '_> {
|
||||
self.start_timer_many(op, 1, ctx)
|
||||
}
|
||||
pub(crate) fn start_timer_many<'c: 'a, 'a>(
|
||||
&'a self,
|
||||
op: SmgrQueryType,
|
||||
count: usize,
|
||||
ctx: &'c RequestContext,
|
||||
) -> Option<impl Drop + '_> {
|
||||
let global_metric = &self.global_metrics[op as usize];
|
||||
let start = Instant::now();
|
||||
@@ -1376,6 +1387,7 @@ impl SmgrQueryTimePerTimeline {
|
||||
ctx,
|
||||
start,
|
||||
op,
|
||||
count,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -3170,6 +3182,16 @@ static TOKIO_EXECUTOR_THREAD_COUNT: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub(crate) static CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM: Lazy<Histogram> =
|
||||
Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"pageserver_consecutive_nonblocking_getpage_requests",
|
||||
"Number of consecutive nonblocking getpage requests",
|
||||
(0..=256).map(|x| x as f64).collect::<Vec<f64>>(),
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub(crate) fn set_tokio_runtime_setup(setup: &str, num_threads: NonZeroUsize) {
|
||||
static SERIALIZE: std::sync::Mutex<()> = std::sync::Mutex::new(());
|
||||
let _guard = SERIALIZE.lock().unwrap();
|
||||
|
||||
@@ -5,14 +5,14 @@ use anyhow::Context;
|
||||
use async_compression::tokio::write::GzipEncoder;
|
||||
use bytes::Buf;
|
||||
use futures::FutureExt;
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::models::TenantState;
|
||||
use once_cell::sync::{Lazy, OnceCell};
|
||||
use pageserver_api::models::{self, TenantState};
|
||||
use pageserver_api::models::{
|
||||
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
|
||||
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
|
||||
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
|
||||
PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest,
|
||||
PagestreamNblocksResponse, PagestreamProtocolVersion,
|
||||
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest,
|
||||
PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse,
|
||||
PagestreamProtocolVersion,
|
||||
};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use postgres_backend::{is_expected_io_error, AuthType, PostgresBackend, QueryError};
|
||||
@@ -43,7 +43,7 @@ use crate::basebackup;
|
||||
use crate::basebackup::BasebackupError;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::metrics;
|
||||
use crate::metrics::{self, CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM};
|
||||
use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
|
||||
use crate::pgdatadir_mapping::Version;
|
||||
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
@@ -58,7 +58,7 @@ use crate::tenant::GetTimelineError;
|
||||
use crate::tenant::PageReconstructError;
|
||||
use crate::tenant::Timeline;
|
||||
use pageserver_api::key::rel_block_to_key;
|
||||
use pageserver_api::reltag::SlruKind;
|
||||
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
|
||||
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
|
||||
@@ -577,124 +577,317 @@ impl PageServerHandler {
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
// read request bytes (it's exactly 1 PagestreamFeMessage per CopyData)
|
||||
let msg = tokio::select! {
|
||||
biased;
|
||||
_ = self.cancel.cancelled() => {
|
||||
return Err(QueryError::Shutdown)
|
||||
}
|
||||
msg = pgb.read_message() => { msg }
|
||||
};
|
||||
let copy_data_bytes = match msg? {
|
||||
Some(FeMessage::CopyData(bytes)) => bytes,
|
||||
Some(FeMessage::Terminate) => break,
|
||||
Some(m) => {
|
||||
return Err(QueryError::Other(anyhow::anyhow!(
|
||||
"unexpected message: {m:?} during COPY"
|
||||
)));
|
||||
}
|
||||
None => break, // client disconnected
|
||||
};
|
||||
let mut batched = None;
|
||||
'outer: loop {
|
||||
enum DebouncedFeMessage {
|
||||
Exists(models::PagestreamExistsRequest),
|
||||
Nblocks(models::PagestreamNblocksRequest),
|
||||
GetPage {
|
||||
span: Span,
|
||||
shard: timeline::handle::Handle<TenantManagerTypes>,
|
||||
effective_request_lsn: Lsn,
|
||||
pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
|
||||
},
|
||||
DbSize(models::PagestreamDbSizeRequest),
|
||||
GetSlruSegment(models::PagestreamGetSlruSegmentRequest),
|
||||
RespondError(Span, PageStreamError),
|
||||
}
|
||||
let mut debounce: Option<std::time::Instant> = None;
|
||||
// return or `?` on protocol error
|
||||
// `break EXPR` to stop batching. The EXPR will be the first message in the next batch.
|
||||
let next_batched: Option<DebouncedFeMessage> = loop {
|
||||
static BOUNCE_TIMEOUT: Lazy<Duration> = Lazy::new(|| {
|
||||
utils::env::var::<humantime::Duration, _>("NEON_PAGESERVER_DEBOUNCE")
|
||||
.unwrap()
|
||||
.into()
|
||||
});
|
||||
let sleep_fut = if let Some(started_at) = debounce {
|
||||
futures::future::Either::Left(tokio::time::sleep_until(
|
||||
(started_at + *BOUNCE_TIMEOUT).into(),
|
||||
))
|
||||
} else {
|
||||
futures::future::Either::Right(futures::future::pending())
|
||||
};
|
||||
let msg = tokio::select! {
|
||||
biased;
|
||||
_ = self.cancel.cancelled() => {
|
||||
return Err(QueryError::Shutdown)
|
||||
}
|
||||
msg = pgb.read_message() => {
|
||||
msg
|
||||
}
|
||||
_ = sleep_fut => {
|
||||
assert!(batched.is_some());
|
||||
break None;
|
||||
}
|
||||
};
|
||||
let copy_data_bytes = match msg? {
|
||||
Some(FeMessage::CopyData(bytes)) => bytes,
|
||||
Some(FeMessage::Terminate) => break 'outer,
|
||||
Some(m) => {
|
||||
return Err(QueryError::Other(anyhow::anyhow!(
|
||||
"unexpected message: {m:?} during COPY"
|
||||
)));
|
||||
}
|
||||
None => break 'outer, // client disconnected
|
||||
};
|
||||
trace!("query: {copy_data_bytes:?}");
|
||||
fail::fail_point!("ps::handle-pagerequest-message");
|
||||
|
||||
trace!("query: {copy_data_bytes:?}");
|
||||
fail::fail_point!("ps::handle-pagerequest-message");
|
||||
// parse request
|
||||
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
|
||||
|
||||
// parse request
|
||||
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
|
||||
let this_msg = match neon_fe_msg {
|
||||
PagestreamFeMessage::Exists(msg) => DebouncedFeMessage::Exists(msg),
|
||||
PagestreamFeMessage::Nblocks(msg) => DebouncedFeMessage::Nblocks(msg),
|
||||
PagestreamFeMessage::DbSize(msg) => DebouncedFeMessage::DbSize(msg),
|
||||
PagestreamFeMessage::GetSlruSegment(msg) => {
|
||||
DebouncedFeMessage::GetSlruSegment(msg)
|
||||
}
|
||||
PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
rel,
|
||||
blkno,
|
||||
}) => {
|
||||
let span = tracing::info_span!("handle_get_page_at_lsn_request_batched", %tenant_id, %timeline_id, shard_id = tracing::field::Empty, req_lsn = %request_lsn, batch_size = tracing::field::Empty);
|
||||
let key = rel_block_to_key(rel, blkno);
|
||||
let shard = match self
|
||||
.timeline_handles
|
||||
.get(tenant_id, timeline_id, ShardSelector::Page(key))
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
{
|
||||
Ok(tl) => tl,
|
||||
Err(GetActiveTimelineError::Tenant(
|
||||
GetActiveTenantError::NotFound(_),
|
||||
)) => {
|
||||
// We already know this tenant exists in general, because we resolved it at
|
||||
// start of connection. Getting a NotFound here indicates that the shard containing
|
||||
// the requested page is not present on this node: the client's knowledge of shard->pageserver
|
||||
// mapping is out of date.
|
||||
//
|
||||
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
|
||||
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
|
||||
// and talk to a different pageserver.
|
||||
break Some(DebouncedFeMessage::RespondError(
|
||||
span,
|
||||
PageStreamError::Reconnect(
|
||||
"getpage@lsn request routed to wrong shard".into(),
|
||||
),
|
||||
));
|
||||
}
|
||||
Err(e) => break Some(DebouncedFeMessage::RespondError(span, e.into())),
|
||||
};
|
||||
let effective_request_lsn = match Self::wait_or_get_last_lsn(
|
||||
&shard,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
&shard.get_latest_gc_cutoff_lsn(),
|
||||
&ctx,
|
||||
)
|
||||
// TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
|
||||
.await
|
||||
{
|
||||
Ok(lsn) => lsn,
|
||||
Err(e) => {
|
||||
break Some(DebouncedFeMessage::RespondError(span, e));
|
||||
}
|
||||
};
|
||||
DebouncedFeMessage::GetPage {
|
||||
span,
|
||||
shard,
|
||||
effective_request_lsn,
|
||||
pages: smallvec::smallvec![(rel, blkno)],
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// check if we can debounce
|
||||
match (&mut batched, this_msg) {
|
||||
(None, this_msg) => {
|
||||
batched = Some(this_msg);
|
||||
}
|
||||
(
|
||||
Some(DebouncedFeMessage::GetPage {
|
||||
span: _,
|
||||
shard: accum_shard,
|
||||
pages: accum_pages,
|
||||
effective_request_lsn: accum_lsn,
|
||||
}),
|
||||
DebouncedFeMessage::GetPage {
|
||||
span: _,
|
||||
shard: this_shard,
|
||||
pages: this_pages,
|
||||
effective_request_lsn: this_lsn,
|
||||
},
|
||||
) if async {
|
||||
assert_eq!(this_pages.len(), 1);
|
||||
if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize {
|
||||
assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize);
|
||||
return false;
|
||||
}
|
||||
if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
|
||||
!= (this_shard.tenant_shard_id, this_shard.timeline_id)
|
||||
{
|
||||
// TODO: we _could_ batch & execute each shard seperately (and in parallel).
|
||||
// But the current logig for keeping responses in order does not support that.
|
||||
return false;
|
||||
}
|
||||
// the vectored get currently only supports a single LSN, so, bounce as soon
|
||||
// as the effective request_lsn changes
|
||||
return *accum_lsn == this_lsn;
|
||||
}
|
||||
.await =>
|
||||
{
|
||||
// ok to batch
|
||||
accum_pages.extend(this_pages);
|
||||
}
|
||||
(Some(_), this_msg) => {
|
||||
// by default, don't continue batching
|
||||
break Some(this_msg);
|
||||
}
|
||||
}
|
||||
|
||||
// debounce impl piece
|
||||
let started_at = debounce.get_or_insert_with(Instant::now);
|
||||
if started_at.elapsed() > *BOUNCE_TIMEOUT {
|
||||
break None;
|
||||
}
|
||||
};
|
||||
|
||||
// invoke handler function
|
||||
let (handler_result, span) = match neon_fe_msg {
|
||||
PagestreamFeMessage::Exists(req) => {
|
||||
let (handler_results, span): (
|
||||
smallvec::SmallVec<[Result<PagestreamBeMessage, PageStreamError>; 1]>,
|
||||
_,
|
||||
) = match batched.take().expect("loop above ensures this") {
|
||||
DebouncedFeMessage::Exists(req) => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::exists");
|
||||
let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn);
|
||||
(
|
||||
self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await,
|
||||
smallvec::smallvec![
|
||||
self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
],
|
||||
span,
|
||||
)
|
||||
}
|
||||
PagestreamFeMessage::Nblocks(req) => {
|
||||
DebouncedFeMessage::Nblocks(req) => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::nblocks");
|
||||
let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn);
|
||||
(
|
||||
self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await,
|
||||
smallvec::smallvec![
|
||||
self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await,
|
||||
],
|
||||
span,
|
||||
)
|
||||
}
|
||||
PagestreamFeMessage::GetPage(req) => {
|
||||
DebouncedFeMessage::GetPage {
|
||||
span,
|
||||
shard,
|
||||
effective_request_lsn,
|
||||
pages,
|
||||
} => {
|
||||
CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM.observe(pages.len() as f64);
|
||||
span.record("batch_size", pages.len() as u64);
|
||||
fail::fail_point!("ps::handle-pagerequest-message::getpage");
|
||||
// shard_id is filled in by the handler
|
||||
let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn);
|
||||
(
|
||||
self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await,
|
||||
{
|
||||
let npages = pages.len();
|
||||
let res = self
|
||||
.handle_get_page_at_lsn_request_batched(
|
||||
&shard,
|
||||
effective_request_lsn,
|
||||
pages,
|
||||
&ctx,
|
||||
)
|
||||
.instrument(span.clone())
|
||||
.await;
|
||||
assert_eq!(res.len(), npages);
|
||||
res
|
||||
},
|
||||
span,
|
||||
)
|
||||
}
|
||||
PagestreamFeMessage::DbSize(req) => {
|
||||
DebouncedFeMessage::DbSize(req) => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::dbsize");
|
||||
let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn);
|
||||
(
|
||||
self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await,
|
||||
smallvec::smallvec![
|
||||
self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
],
|
||||
span,
|
||||
)
|
||||
}
|
||||
PagestreamFeMessage::GetSlruSegment(req) => {
|
||||
DebouncedFeMessage::GetSlruSegment(req) => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
|
||||
let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn);
|
||||
(
|
||||
self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx)
|
||||
smallvec::smallvec![
|
||||
self.handle_get_slru_segment_request(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
&req,
|
||||
&ctx
|
||||
)
|
||||
.instrument(span.clone())
|
||||
.await,
|
||||
.await
|
||||
],
|
||||
span,
|
||||
)
|
||||
}
|
||||
DebouncedFeMessage::RespondError(span, e) => {
|
||||
// We've already decided to respond with an error, so we don't need to
|
||||
// call the handler.
|
||||
(smallvec::smallvec![Err(e)], span)
|
||||
}
|
||||
};
|
||||
|
||||
// Map handler result to protocol behavior.
|
||||
// Some handler errors cause exit from pagestream protocol.
|
||||
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
|
||||
let response_msg = match handler_result {
|
||||
Err(e) => match &e {
|
||||
PageStreamError::Shutdown => {
|
||||
// If we fail to fulfil a request during shutdown, which may be _because_ of
|
||||
// shutdown, then do not send the error to the client. Instead just drop the
|
||||
// connection.
|
||||
span.in_scope(|| info!("dropping connection due to shutdown"));
|
||||
return Err(QueryError::Shutdown);
|
||||
}
|
||||
PageStreamError::Reconnect(reason) => {
|
||||
span.in_scope(|| info!("handler requested reconnect: {reason}"));
|
||||
return Err(QueryError::Reconnect);
|
||||
}
|
||||
PageStreamError::Read(_)
|
||||
| PageStreamError::LsnTimeout(_)
|
||||
| PageStreamError::NotFound(_)
|
||||
| PageStreamError::BadRequest(_) => {
|
||||
// print the all details to the log with {:#}, but for the client the
|
||||
// error message is enough. Do not log if shutting down, as the anyhow::Error
|
||||
// here includes cancellation which is not an error.
|
||||
let full = utils::error::report_compact_sources(&e);
|
||||
span.in_scope(|| {
|
||||
error!("error reading relation or page version: {full:#}")
|
||||
});
|
||||
PagestreamBeMessage::Error(PagestreamErrorResponse {
|
||||
message: e.to_string(),
|
||||
})
|
||||
}
|
||||
},
|
||||
Ok(response_msg) => response_msg,
|
||||
};
|
||||
for handler_result in handler_results {
|
||||
let response_msg = match handler_result {
|
||||
Err(e) => match &e {
|
||||
PageStreamError::Shutdown => {
|
||||
// If we fail to fulfil a request during shutdown, which may be _because_ of
|
||||
// shutdown, then do not send the error to the client. Instead just drop the
|
||||
// connection.
|
||||
span.in_scope(|| info!("dropping connection due to shutdown"));
|
||||
return Err(QueryError::Shutdown);
|
||||
}
|
||||
PageStreamError::Reconnect(reason) => {
|
||||
span.in_scope(|| info!("handler requested reconnect: {reason}"));
|
||||
return Err(QueryError::Reconnect);
|
||||
}
|
||||
PageStreamError::Read(_)
|
||||
| PageStreamError::LsnTimeout(_)
|
||||
| PageStreamError::NotFound(_)
|
||||
| PageStreamError::BadRequest(_) => {
|
||||
// print the all details to the log with {:#}, but for the client the
|
||||
// error message is enough. Do not log if shutting down, as the anyhow::Error
|
||||
// here includes cancellation which is not an error.
|
||||
let full = utils::error::report_compact_sources(&e);
|
||||
span.in_scope(|| {
|
||||
error!("error reading relation or page version: {full:#}")
|
||||
});
|
||||
PagestreamBeMessage::Error(PagestreamErrorResponse {
|
||||
message: e.to_string(),
|
||||
})
|
||||
}
|
||||
},
|
||||
Ok(response_msg) => response_msg,
|
||||
};
|
||||
|
||||
// marshal & transmit response message
|
||||
pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
|
||||
// marshal & transmit response message
|
||||
pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
|
||||
}
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = self.cancel.cancelled() => {
|
||||
@@ -706,6 +899,9 @@ impl PageServerHandler {
|
||||
res?;
|
||||
}
|
||||
}
|
||||
|
||||
assert!(batched.is_none(), "we take() earlier");
|
||||
batched = next_batched;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -949,60 +1145,30 @@ impl PageServerHandler {
|
||||
}))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(shard_id))]
|
||||
async fn handle_get_page_at_lsn_request(
|
||||
#[instrument(skip_all)]
|
||||
async fn handle_get_page_at_lsn_request_batched(
|
||||
&mut self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
req: &PagestreamGetPageRequest,
|
||||
timeline: &Timeline,
|
||||
effective_lsn: Lsn,
|
||||
pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let timeline = match self
|
||||
.timeline_handles
|
||||
.get(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
ShardSelector::Page(rel_block_to_key(req.rel, req.blkno)),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(tl) => tl,
|
||||
Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
|
||||
// We already know this tenant exists in general, because we resolved it at
|
||||
// start of connection. Getting a NotFound here indicates that the shard containing
|
||||
// the requested page is not present on this node: the client's knowledge of shard->pageserver
|
||||
// mapping is out of date.
|
||||
//
|
||||
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
|
||||
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
|
||||
// and talk to a different pageserver.
|
||||
return Err(PageStreamError::Reconnect(
|
||||
"getpage@lsn request routed to wrong shard".into(),
|
||||
));
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
|
||||
let _timer = timeline
|
||||
.query_metrics
|
||||
.start_timer(metrics::SmgrQueryType::GetPageAtLsn, ctx);
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(
|
||||
&timeline,
|
||||
req.request_lsn,
|
||||
req.not_modified_since,
|
||||
&latest_gc_cutoff_lsn,
|
||||
) -> smallvec::SmallVec<[Result<PagestreamBeMessage, PageStreamError>; 1]> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
let _timer = timeline.query_metrics.start_timer_many(
|
||||
metrics::SmgrQueryType::GetPageAtLsn,
|
||||
pages.len(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
);
|
||||
|
||||
let page = timeline
|
||||
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), ctx)
|
||||
.await?;
|
||||
let pages = timeline
|
||||
.get_rel_page_at_lsn_batched(pages, Version::Lsn(effective_lsn), ctx)
|
||||
.await;
|
||||
|
||||
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
|
||||
page,
|
||||
smallvec::SmallVec::from_iter(pages.into_iter().map(|page| {
|
||||
page.map(|page| {
|
||||
PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse { page })
|
||||
})
|
||||
.map_err(PageStreamError::Read)
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -1499,3 +1665,10 @@ fn set_tracing_field_shard_id(timeline: &Timeline) {
|
||||
);
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
}
|
||||
|
||||
struct WaitedForLsn(Lsn);
|
||||
impl From<WaitedForLsn> for Lsn {
|
||||
fn from(WaitedForLsn(lsn): WaitedForLsn) -> Self {
|
||||
lsn
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,12 +9,17 @@
|
||||
use super::tenant::{PageReconstructError, Timeline};
|
||||
use crate::context::RequestContext;
|
||||
use crate::keyspace::{KeySpace, KeySpaceAccum};
|
||||
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
|
||||
use crate::span::{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id,
|
||||
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
|
||||
};
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::walrecord::NeonWalRecord;
|
||||
use crate::{aux_file, repository::*};
|
||||
use anyhow::{ensure, Context};
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use enum_map::Enum;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::key::{
|
||||
dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
|
||||
relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
|
||||
@@ -28,7 +33,7 @@ use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use postgres_ffi::{Oid, RepOriginId, TimestampTz, TransactionId};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::{hash_map, HashMap, HashSet};
|
||||
use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
|
||||
use std::ops::ControlFlow;
|
||||
use std::ops::Range;
|
||||
use strum::IntoEnumIterator;
|
||||
@@ -191,26 +196,184 @@ impl Timeline {
|
||||
version: Version<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
if tag.relnode == 0 {
|
||||
return Err(PageReconstructError::Other(
|
||||
RelationError::InvalidRelnode.into(),
|
||||
));
|
||||
}
|
||||
let pages = smallvec::smallvec![(tag, blknum)];
|
||||
let res = self.get_rel_page_at_lsn_batched(pages, version, ctx).await;
|
||||
assert_eq!(res.len(), 1);
|
||||
res.into_iter().next().unwrap()
|
||||
}
|
||||
|
||||
let nblocks = self.get_rel_size(tag, version, ctx).await?;
|
||||
if blknum >= nblocks {
|
||||
debug!(
|
||||
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
|
||||
tag,
|
||||
blknum,
|
||||
version.get_lsn(),
|
||||
nblocks
|
||||
);
|
||||
return Ok(ZERO_PAGE.clone());
|
||||
/// Like [`get_rel_page_at_lsn`], but returns a batch of pages.
|
||||
pub(crate) async fn get_rel_page_at_lsn_batched(
|
||||
&self,
|
||||
pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
|
||||
version: Version<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> smallvec::SmallVec<[Result<Bytes, PageReconstructError>; 1]> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
let request_lsn = match version {
|
||||
Version::Lsn(lsn) => lsn,
|
||||
Version::Modified(_) => panic!("unsupported"),
|
||||
};
|
||||
enum KeyState {
|
||||
NeedsVectoredGet,
|
||||
Done(Result<Bytes, PageReconstructError>),
|
||||
}
|
||||
let mut key_states = BTreeMap::new();
|
||||
let mut vectored_gets: smallvec::SmallVec<[_; 1]> =
|
||||
smallvec::SmallVec::with_capacity(pages.len());
|
||||
for (response_order, (tag, blknum)) in pages.into_iter().enumerate() {
|
||||
let key = rel_block_to_key(tag, blknum);
|
||||
use std::collections::btree_map::Entry;
|
||||
let key_state_slot = match key_states.entry((key, response_order)) {
|
||||
Entry::Occupied(_entry) => unreachable!(
|
||||
"enumerate makes keys unique, even if batch contains same key twice"
|
||||
),
|
||||
Entry::Vacant(entry) => entry,
|
||||
};
|
||||
|
||||
let key = rel_block_to_key(tag, blknum);
|
||||
version.get(self, key, ctx).await
|
||||
if tag.relnode == 0 {
|
||||
key_state_slot.insert(KeyState::Done(Err(PageReconstructError::Other(
|
||||
RelationError::InvalidRelnode.into(),
|
||||
))));
|
||||
continue;
|
||||
}
|
||||
|
||||
let nblocks = match self.get_rel_size(tag, version, ctx).await {
|
||||
Ok(nblocks) => nblocks,
|
||||
Err(err) => {
|
||||
key_state_slot.insert(KeyState::Done(Err(err)));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if blknum >= nblocks {
|
||||
debug!(
|
||||
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
|
||||
tag,
|
||||
blknum,
|
||||
version.get_lsn(),
|
||||
nblocks
|
||||
);
|
||||
key_state_slot.insert(KeyState::Done(Ok(ZERO_PAGE.clone())));
|
||||
continue;
|
||||
}
|
||||
|
||||
vectored_gets.push(key);
|
||||
key_state_slot.insert(KeyState::NeedsVectoredGet);
|
||||
}
|
||||
// turn vectored_gets into a keyspace
|
||||
let keyspace = {
|
||||
// add_key reuqires monotonicity
|
||||
vectored_gets.sort_unstable();
|
||||
let mut acc = KeySpaceAccum::new();
|
||||
for key in vectored_gets
|
||||
.into_iter()
|
||||
// in fact it requires strong monotonicity
|
||||
.dedup()
|
||||
{
|
||||
acc.add_key(key);
|
||||
}
|
||||
acc.to_keyspace()
|
||||
};
|
||||
|
||||
match self.get_vectored(keyspace, request_lsn, ctx).await {
|
||||
Ok(results) => {
|
||||
for (key, res) in results {
|
||||
if let Err(err) = &res {
|
||||
warn!(%key, ?err, "a key inside get_vectored failed with a per-key error");
|
||||
}
|
||||
let mut interests = key_states.range_mut((key, 0)..(key.next(), 0)).peekable();
|
||||
let first_interest = interests.next().unwrap();
|
||||
let next_interest = interests.peek().is_some();
|
||||
if !next_interest {
|
||||
match first_interest.1 {
|
||||
KeyState::NeedsVectoredGet => {
|
||||
*first_interest.1 = KeyState::Done(res);
|
||||
}
|
||||
KeyState::Done(_) => unreachable!(),
|
||||
}
|
||||
continue;
|
||||
} else {
|
||||
for ((_, _), state) in [first_interest].into_iter().chain(interests) {
|
||||
match state {
|
||||
KeyState::NeedsVectoredGet => {
|
||||
*state = KeyState::Done(match &res {
|
||||
Ok(buf) => Ok(buf.clone()),
|
||||
// this `match` is working around the fact that we cannot Clone the PageReconstructError
|
||||
Err(err) => Err(match err {
|
||||
PageReconstructError::Cancelled => {
|
||||
PageReconstructError::Cancelled
|
||||
}
|
||||
|
||||
x @ PageReconstructError::Other(_) |
|
||||
x @ PageReconstructError::AncestorLsnTimeout(_) |
|
||||
x @ PageReconstructError::WalRedo(_) |
|
||||
x @ PageReconstructError::MissingKey(_) => {
|
||||
PageReconstructError::Other(anyhow::anyhow!("there was more than one request for this key in the batch, error logged once: {x:?}"))
|
||||
},
|
||||
}),
|
||||
});
|
||||
}
|
||||
KeyState::Done(_) => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(?err, "get_vectored failed with a global error, mapping that error to per-key failure");
|
||||
// this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
|
||||
for ((_, _), state) in key_states.iter_mut() {
|
||||
// this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
|
||||
// but without taking ownership of the GetVectoredError
|
||||
match &err {
|
||||
GetVectoredError::Cancelled => {
|
||||
*state = KeyState::Done(Err(PageReconstructError::Cancelled));
|
||||
}
|
||||
// TODO: restructure get_vectored API to make this error per-key
|
||||
GetVectoredError::MissingKey(err) => {
|
||||
*state = KeyState::Done(Err(PageReconstructError::Other(anyhow::anyhow!("whole vectored get request failed because one or more of the requested keys were missing: {err:?}"))));
|
||||
}
|
||||
// TODO: restructure get_vectored API to make this error per-key
|
||||
GetVectoredError::GetReadyAncestorError(err) => {
|
||||
*state = KeyState::Done(Err(PageReconstructError::Other(anyhow::anyhow!("whole vectored get request failed because one or more key required ancestor that wasn't ready: {err:?}"))));
|
||||
}
|
||||
// TODO: restructure get_vectored API to make this error per-key
|
||||
GetVectoredError::Other(err) => {
|
||||
*state = KeyState::Done(Err(PageReconstructError::Other(
|
||||
anyhow::anyhow!("whole vectored get request failed: {err:?}"),
|
||||
)));
|
||||
}
|
||||
// TODO: we can prevent this error class by moving this check into the type system
|
||||
GetVectoredError::InvalidLsn(e) => {
|
||||
*state =
|
||||
KeyState::Done(Err(anyhow::anyhow!("invalid LSN: {e:?}").into()));
|
||||
}
|
||||
// NB: this should never happen in practice because we limit MAX_GET_VECTORED_KEYS
|
||||
// TODO: we can prevent this error class by moving this check into the type system
|
||||
GetVectoredError::Oversized(err) => {
|
||||
*state = KeyState::Done(Err(anyhow::anyhow!(
|
||||
"batching oversized: {err:?}"
|
||||
)
|
||||
.into()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// get the results into the order in which they were requested
|
||||
let mut return_order: smallvec::SmallVec<[_; Timeline::MAX_GET_VECTORED_KEYS as usize]> =
|
||||
smallvec::SmallVec::with_capacity(key_states.len());
|
||||
return_order.extend(key_states.keys().map(|(key, idx)| (*key, *idx)));
|
||||
return_order.sort_unstable_by_key(|(_, idx)| *idx);
|
||||
let mut res = smallvec::SmallVec::with_capacity(key_states.len());
|
||||
res.extend(return_order.into_iter().map(|key_states_key| {
|
||||
match key_states.remove(&key_states_key).unwrap() {
|
||||
KeyState::Done(res) => res,
|
||||
KeyState::NeedsVectoredGet => unreachable!(),
|
||||
}
|
||||
}));
|
||||
res
|
||||
}
|
||||
|
||||
// Get size of a database in blocks
|
||||
|
||||
@@ -73,6 +73,21 @@ impl ValueBytes {
|
||||
|
||||
Ok(raw[8] == 1)
|
||||
}
|
||||
|
||||
pub(crate) fn is_image(raw: &[u8]) -> Result<bool, InvalidInput> {
|
||||
if raw.len() < 12 {
|
||||
return Err(InvalidInput::TooShortValue);
|
||||
}
|
||||
|
||||
let value_discriminator = &raw[0..4];
|
||||
|
||||
if value_discriminator == [0, 0, 0, 0] {
|
||||
// Value::Image always initializes
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -8,15 +8,17 @@ mod layer_desc;
|
||||
mod layer_name;
|
||||
pub mod merge_iterator;
|
||||
|
||||
use tokio::sync::{self};
|
||||
use utils::bin_ser::BeSer;
|
||||
pub mod split_writer;
|
||||
|
||||
use crate::context::{AccessStatsBehavior, RequestContext};
|
||||
use crate::repository::Value;
|
||||
use crate::repository::{Value, ValueBytes};
|
||||
use crate::walrecord::NeonWalRecord;
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::key::{Key, DBDIR_KEY};
|
||||
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
|
||||
use std::cmp::{Ordering, Reverse};
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::{BinaryHeap, HashMap};
|
||||
use std::ops::Range;
|
||||
@@ -79,12 +81,18 @@ pub(crate) enum ValueReconstructSituation {
|
||||
}
|
||||
|
||||
/// Reconstruct data accumulated for a single key during a vectored get
|
||||
#[derive(Debug, Default, Clone)]
|
||||
#[derive(Debug, Default)]
|
||||
pub(crate) struct VectoredValueReconstructState {
|
||||
pub(crate) records: Vec<(Lsn, NeonWalRecord)>,
|
||||
pub(crate) img: Option<(Lsn, Bytes)>,
|
||||
pub(crate) records: Vec<(
|
||||
Lsn,
|
||||
tokio::sync::oneshot::Receiver<Result<Bytes, std::io::Error>>,
|
||||
)>,
|
||||
pub(crate) img: Option<(
|
||||
Lsn,
|
||||
tokio::sync::oneshot::Receiver<Result<Bytes, std::io::Error>>,
|
||||
)>,
|
||||
|
||||
situation: ValueReconstructSituation,
|
||||
pub(crate) situation: ValueReconstructSituation,
|
||||
}
|
||||
|
||||
impl VectoredValueReconstructState {
|
||||
@@ -93,16 +101,57 @@ impl VectoredValueReconstructState {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<VectoredValueReconstructState> for ValueReconstructState {
|
||||
fn from(mut state: VectoredValueReconstructState) -> Self {
|
||||
// walredo expects the records to be descending in terms of Lsn
|
||||
state.records.sort_by_key(|(lsn, _)| Reverse(*lsn));
|
||||
pub(crate) async fn convert(
|
||||
_key: Key,
|
||||
from: VectoredValueReconstructState,
|
||||
) -> Result<ValueReconstructState, PageReconstructError> {
|
||||
let mut to = ValueReconstructState::default();
|
||||
|
||||
ValueReconstructState {
|
||||
records: state.records,
|
||||
img: state.img,
|
||||
for (lsn, fut) in from.records {
|
||||
match fut.await {
|
||||
Ok(res) => match res {
|
||||
Ok(bytes) => {
|
||||
let value = Value::des(&bytes)
|
||||
.map_err(|err| PageReconstructError::Other(err.into()))?;
|
||||
|
||||
match value {
|
||||
Value::WalRecord(rec) => {
|
||||
to.records.push((lsn, rec));
|
||||
},
|
||||
Value::Image(img) => {
|
||||
assert!(to.img.is_none());
|
||||
to.img = Some((lsn, img));
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
return Err(PageReconstructError::Other(err.into()));
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
return Err(PageReconstructError::Other(err.into()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if to.img.is_none() {
|
||||
let (lsn, fut) = from.img.expect("Need an image");
|
||||
match fut.await {
|
||||
Ok(res) => match res {
|
||||
Ok(bytes) => {
|
||||
to.img = Some((lsn, bytes));
|
||||
}
|
||||
Err(err) => {
|
||||
return Err(PageReconstructError::Other(err.into()));
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
return Err(PageReconstructError::Other(err.into()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(to)
|
||||
}
|
||||
|
||||
/// Bag of data accumulated during a vectored get..
|
||||
@@ -200,7 +249,8 @@ impl ValuesReconstructState {
|
||||
&mut self,
|
||||
key: &Key,
|
||||
lsn: Lsn,
|
||||
value: Value,
|
||||
completes: bool,
|
||||
value: sync::oneshot::Receiver<Result<Bytes, std::io::Error>>,
|
||||
) -> ValueReconstructSituation {
|
||||
let state = self
|
||||
.keys
|
||||
@@ -208,31 +258,14 @@ impl ValuesReconstructState {
|
||||
.or_insert(Ok(VectoredValueReconstructState::default()));
|
||||
|
||||
if let Ok(state) = state {
|
||||
let key_done = match state.situation {
|
||||
match state.situation {
|
||||
ValueReconstructSituation::Complete => unreachable!(),
|
||||
ValueReconstructSituation::Continue => match value {
|
||||
Value::Image(img) => {
|
||||
state.img = Some((lsn, img));
|
||||
true
|
||||
}
|
||||
Value::WalRecord(rec) => {
|
||||
debug_assert!(
|
||||
Some(lsn) > state.get_cached_lsn(),
|
||||
"Attempt to collect a record below cached LSN for walredo: {} < {}",
|
||||
lsn,
|
||||
state
|
||||
.get_cached_lsn()
|
||||
.expect("Assertion can only fire if a cached lsn is present")
|
||||
);
|
||||
ValueReconstructSituation::Continue => {
|
||||
state.records.push((lsn, value));
|
||||
}
|
||||
}
|
||||
|
||||
let will_init = rec.will_init();
|
||||
state.records.push((lsn, rec));
|
||||
will_init
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
if key_done && state.situation == ValueReconstructSituation::Continue {
|
||||
if completes && state.situation == ValueReconstructSituation::Continue {
|
||||
state.situation = ValueReconstructSituation::Complete;
|
||||
self.keys_done.add_key(*key);
|
||||
}
|
||||
|
||||
@@ -42,13 +42,12 @@ use crate::tenant::vectored_blob_io::{
|
||||
BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
|
||||
VectoredReadCoalesceMode, VectoredReadPlanner,
|
||||
};
|
||||
use crate::tenant::PageReconstructError;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
|
||||
use crate::virtual_file::{self, VirtualFile};
|
||||
use crate::{walrecord, TEMP_FILE_SUFFIX};
|
||||
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::BytesMut;
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use futures::StreamExt;
|
||||
use itertools::Itertools;
|
||||
@@ -58,14 +57,14 @@ use pageserver_api::models::ImageCompressionAlgorithm;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::VecDeque;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::fs::File;
|
||||
use std::io::SeekFrom;
|
||||
use std::ops::Range;
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio::sync::{self, OnceCell};
|
||||
use tokio_epoll_uring::IoBuf;
|
||||
use tracing::*;
|
||||
|
||||
@@ -224,7 +223,7 @@ pub struct DeltaLayerInner {
|
||||
index_start_blk: u32,
|
||||
index_root_blk: u32,
|
||||
|
||||
file: VirtualFile,
|
||||
file: Arc<VirtualFile>,
|
||||
file_id: FileId,
|
||||
|
||||
layer_key_range: Range<Key>,
|
||||
@@ -788,9 +787,11 @@ impl DeltaLayerInner {
|
||||
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
let file = VirtualFile::open(path, ctx)
|
||||
.await
|
||||
.context("open layer file")?;
|
||||
let file = Arc::new(
|
||||
VirtualFile::open(path, ctx)
|
||||
.await
|
||||
.context("open layer file")?,
|
||||
);
|
||||
|
||||
let file_id = page_cache::next_file_id();
|
||||
|
||||
@@ -980,77 +981,59 @@ impl DeltaLayerInner {
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) {
|
||||
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
|
||||
let mut ignore_key_with_err = None;
|
||||
|
||||
let max_vectored_read_bytes = self
|
||||
.max_vectored_read_bytes
|
||||
.expect("Layer is loaded with max vectored bytes config")
|
||||
.0
|
||||
.into();
|
||||
let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
|
||||
let mut buf = Some(BytesMut::with_capacity(buf_size));
|
||||
|
||||
// Note that reads are processed in reverse order (from highest key+lsn).
|
||||
// This is the order that `ReconstructState` requires such that it can
|
||||
// track when a key is done.
|
||||
for read in reads.into_iter().rev() {
|
||||
let res = vectored_blob_reader
|
||||
.read_blobs(&read, buf.take().expect("Should have a buffer"), ctx)
|
||||
.await;
|
||||
|
||||
let blobs_buf = match res {
|
||||
Ok(blobs_buf) => blobs_buf,
|
||||
Err(err) => {
|
||||
let kind = err.kind();
|
||||
for (_, blob_meta) in read.blobs_at.as_slice() {
|
||||
reconstruct_state.on_key_error(
|
||||
blob_meta.key,
|
||||
PageReconstructError::Other(anyhow!(
|
||||
"Failed to read blobs from virtual file {}: {}",
|
||||
self.file.path,
|
||||
kind
|
||||
)),
|
||||
);
|
||||
}
|
||||
|
||||
// We have "lost" the buffer since the lower level IO api
|
||||
// doesn't return the buffer on error. Allocate a new one.
|
||||
buf = Some(BytesMut::with_capacity(buf_size));
|
||||
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
for meta in blobs_buf.blobs.iter().rev() {
|
||||
if Some(meta.meta.key) == ignore_key_with_err {
|
||||
continue;
|
||||
}
|
||||
|
||||
let value = Value::des(&blobs_buf.buf[meta.start..meta.end]);
|
||||
let value = match value {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
reconstruct_state.on_key_error(
|
||||
meta.meta.key,
|
||||
PageReconstructError::Other(anyhow!(e).context(format!(
|
||||
"Failed to deserialize blob from virtual file {}",
|
||||
self.file.path,
|
||||
))),
|
||||
);
|
||||
|
||||
ignore_key_with_err = Some(meta.meta.key);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// Invariant: once a key reaches [`ValueReconstructSituation::Complete`]
|
||||
// state, no further updates shall be made to it. The call below will
|
||||
// panic if the invariant is violated.
|
||||
reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value);
|
||||
let mut senders: HashMap<
|
||||
(Key, Lsn),
|
||||
sync::oneshot::Sender<Result<Bytes, std::io::Error>>,
|
||||
> = Default::default();
|
||||
for (_, blob_meta) in read.blobs_at.as_slice() {
|
||||
let (tx, rx) = sync::oneshot::channel();
|
||||
senders.insert((blob_meta.key, blob_meta.lsn), tx);
|
||||
reconstruct_state.update_key(
|
||||
&blob_meta.key,
|
||||
blob_meta.lsn,
|
||||
blob_meta.will_init,
|
||||
rx,
|
||||
);
|
||||
}
|
||||
|
||||
buf = Some(blobs_buf.buf);
|
||||
let read_from = self.file.clone();
|
||||
let read_ctx = ctx.attached_child();
|
||||
tokio::task::spawn(async move {
|
||||
let vectored_blob_reader = VectoredBlobReader::new(&read_from);
|
||||
let buf = BytesMut::with_capacity(buf_size);
|
||||
|
||||
let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await;
|
||||
match res {
|
||||
Ok(blobs_buf) => {
|
||||
for meta in blobs_buf.blobs.iter().rev() {
|
||||
let buf = &blobs_buf.buf[meta.start..meta.end];
|
||||
let sender = senders
|
||||
.remove(&(meta.meta.key, meta.meta.lsn))
|
||||
.expect("sender must exist");
|
||||
let _ = sender.send(Ok(Bytes::copy_from_slice(buf)));
|
||||
}
|
||||
|
||||
assert!(senders.is_empty());
|
||||
}
|
||||
Err(err) => {
|
||||
for (_, sender) in senders {
|
||||
let _ = sender
|
||||
.send(Err(std::io::Error::new(err.kind(), "vec read failed")));
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1190,7 +1173,14 @@ impl DeltaLayerInner {
|
||||
let actionable = if let Some((key, lsn, start_offset)) = prev.take() {
|
||||
let end_offset = offset;
|
||||
|
||||
Some((BlobMeta { key, lsn }, start_offset..end_offset))
|
||||
Some((
|
||||
BlobMeta {
|
||||
key,
|
||||
lsn,
|
||||
will_init: false,
|
||||
},
|
||||
start_offset..end_offset,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
@@ -21,7 +21,7 @@
|
||||
//!
|
||||
//! Every image layer file consists of three parts: "summary",
|
||||
//! "index", and "values". The summary is a fixed size header at the
|
||||
//! beginning of the file, and it contains basic information about the
|
||||
//! beginningof the file, and it contains basic information about the
|
||||
//! layer, and offsets to the other parts. The "index" is a B-tree,
|
||||
//! mapping from Key to an offset in the "values" part. The
|
||||
//! actual page images are stored in the "values" part.
|
||||
@@ -38,11 +38,11 @@ use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::vectored_blob_io::{
|
||||
BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
|
||||
};
|
||||
use crate::tenant::{PageReconstructError, Timeline};
|
||||
use crate::tenant::Timeline;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
|
||||
use crate::virtual_file::{self, VirtualFile};
|
||||
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use hex;
|
||||
@@ -52,13 +52,14 @@ use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::shard::{ShardIdentity, TenantShardId};
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::VecDeque;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::fs::File;
|
||||
use std::io::SeekFrom;
|
||||
use std::ops::Range;
|
||||
use std::os::unix::prelude::FileExt;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio_stream::StreamExt;
|
||||
use tracing::*;
|
||||
@@ -163,7 +164,7 @@ pub struct ImageLayerInner {
|
||||
key_range: Range<Key>,
|
||||
lsn: Lsn,
|
||||
|
||||
file: VirtualFile,
|
||||
file: Arc<VirtualFile>,
|
||||
file_id: FileId,
|
||||
|
||||
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
|
||||
@@ -390,9 +391,11 @@ impl ImageLayerInner {
|
||||
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
let file = VirtualFile::open(path, ctx)
|
||||
.await
|
||||
.context("open layer file")?;
|
||||
let file = Arc::new(
|
||||
VirtualFile::open(path, ctx)
|
||||
.await
|
||||
.context("open layer file")?,
|
||||
);
|
||||
let file_id = page_cache::next_file_id();
|
||||
let block_reader = FileBlockReader::new(&file, file_id);
|
||||
let summary_blk = block_reader
|
||||
@@ -579,8 +582,16 @@ impl ImageLayerInner {
|
||||
.0
|
||||
.into();
|
||||
|
||||
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
|
||||
for read in reads.into_iter() {
|
||||
let mut senders: HashMap<(Key, Lsn), oneshot::Sender<Result<Bytes, std::io::Error>>> =
|
||||
Default::default();
|
||||
for (_, blob_meta) in read.blobs_at.as_slice() {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
senders.insert((blob_meta.key, blob_meta.lsn), tx);
|
||||
|
||||
reconstruct_state.update_key(&blob_meta.key, blob_meta.lsn, true, rx);
|
||||
}
|
||||
|
||||
let buf_size = read.size();
|
||||
|
||||
if buf_size > max_vectored_read_bytes {
|
||||
@@ -599,36 +610,33 @@ impl ImageLayerInner {
|
||||
);
|
||||
}
|
||||
|
||||
let buf = BytesMut::with_capacity(buf_size);
|
||||
let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;
|
||||
let read_from = self.file.clone();
|
||||
let read_ctx = ctx.attached_child();
|
||||
tokio::task::spawn(async move {
|
||||
let buf = BytesMut::with_capacity(buf_size);
|
||||
let vectored_blob_reader = VectoredBlobReader::new(&*read_from);
|
||||
let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await;
|
||||
|
||||
match res {
|
||||
Ok(blobs_buf) => {
|
||||
let frozen_buf = blobs_buf.buf.freeze();
|
||||
match res {
|
||||
Ok(blobs_buf) => {
|
||||
for meta in blobs_buf.blobs.iter().rev() {
|
||||
let buf = &blobs_buf.buf[meta.start..meta.end];
|
||||
let sender = senders
|
||||
.remove(&(meta.meta.key, meta.meta.lsn))
|
||||
.expect("sender must exist");
|
||||
let _ = sender.send(Ok(Bytes::copy_from_slice(buf)));
|
||||
}
|
||||
|
||||
for meta in blobs_buf.blobs.iter() {
|
||||
let img_buf = frozen_buf.slice(meta.start..meta.end);
|
||||
reconstruct_state.update_key(
|
||||
&meta.meta.key,
|
||||
self.lsn,
|
||||
Value::Image(img_buf),
|
||||
);
|
||||
assert!(senders.is_empty());
|
||||
}
|
||||
Err(err) => {
|
||||
for (_, sender) in senders {
|
||||
let _ = sender
|
||||
.send(Err(std::io::Error::new(err.kind(), "vec read failed")));
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
let kind = err.kind();
|
||||
for (_, blob_meta) in read.blobs_at.as_slice() {
|
||||
reconstruct_state.on_key_error(
|
||||
blob_meta.key,
|
||||
PageReconstructError::from(anyhow!(
|
||||
"Failed to read blobs from virtual file {}: {}",
|
||||
self.file.path,
|
||||
kind
|
||||
)),
|
||||
);
|
||||
}
|
||||
}
|
||||
};
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -10,10 +10,9 @@ use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
|
||||
use crate::repository::{Key, Value};
|
||||
use crate::tenant::ephemeral_file::EphemeralFile;
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::PageReconstructError;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
|
||||
use crate::{l0_flush, page_cache};
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use anyhow::{Context, Result};
|
||||
use bytes::Bytes;
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver_api::key::CompactKey;
|
||||
@@ -35,9 +34,7 @@ use std::sync::atomic::Ordering as AtomicOrdering;
|
||||
use std::sync::atomic::{AtomicU64, AtomicUsize};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use super::{
|
||||
DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState,
|
||||
};
|
||||
use super::{DeltaLayerWriter, PersistentLayerDesc, ValuesReconstructState};
|
||||
|
||||
pub(crate) mod vectored_dio_read;
|
||||
|
||||
@@ -87,7 +84,7 @@ pub struct InMemoryLayerInner {
|
||||
/// The values are stored in a serialized format in this file.
|
||||
/// Each serialized Value is preceded by a 'u32' length field.
|
||||
/// PerSeg::page_versions map stores offsets into this file.
|
||||
file: EphemeralFile,
|
||||
file: Arc<tokio::sync::RwLock<EphemeralFile>>,
|
||||
|
||||
resource_units: GlobalResourceUnits,
|
||||
}
|
||||
@@ -381,7 +378,11 @@ impl InMemoryLayer {
|
||||
}
|
||||
|
||||
pub(crate) fn try_len(&self) -> Option<u64> {
|
||||
self.inner.try_read().map(|i| i.file.len()).ok()
|
||||
self.inner
|
||||
.try_read()
|
||||
.map(|i| i.file.try_read().map(|i| i.len()).ok())
|
||||
.ok()
|
||||
.flatten()
|
||||
}
|
||||
|
||||
pub(crate) fn assert_writable(&self) {
|
||||
@@ -432,6 +433,10 @@ impl InMemoryLayer {
|
||||
read: vectored_dio_read::LogicalRead<Vec<u8>>,
|
||||
}
|
||||
let mut reads: HashMap<Key, Vec<ValueRead>> = HashMap::new();
|
||||
let mut senders: HashMap<
|
||||
(Key, Lsn),
|
||||
tokio::sync::oneshot::Sender<Result<Bytes, std::io::Error>>,
|
||||
> = Default::default();
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
for (key, vec_map) in inner
|
||||
@@ -459,6 +464,11 @@ impl InMemoryLayer {
|
||||
Vec::with_capacity(len as usize),
|
||||
),
|
||||
});
|
||||
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
senders.insert((key, *entry_lsn), tx);
|
||||
reconstruct_state.update_key(&key, *entry_lsn, will_init, rx);
|
||||
|
||||
if will_init {
|
||||
break;
|
||||
}
|
||||
@@ -466,46 +476,42 @@ impl InMemoryLayer {
|
||||
}
|
||||
}
|
||||
|
||||
// Execute the reads.
|
||||
let read_from = inner.file.clone();
|
||||
let read_ctx = ctx.attached_child();
|
||||
tokio::task::spawn(async move {
|
||||
let locked = read_from.read().await;
|
||||
let f = vectored_dio_read::execute(
|
||||
&*locked,
|
||||
reads
|
||||
.iter()
|
||||
.flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)),
|
||||
&read_ctx,
|
||||
);
|
||||
send_future::SendFuture::send(f) // https://github.com/rust-lang/rust/issues/96865
|
||||
.await;
|
||||
|
||||
let f = vectored_dio_read::execute(
|
||||
&inner.file,
|
||||
reads
|
||||
.iter()
|
||||
.flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)),
|
||||
&ctx,
|
||||
);
|
||||
send_future::SendFuture::send(f) // https://github.com/rust-lang/rust/issues/96865
|
||||
.await;
|
||||
|
||||
// Process results into the reconstruct state
|
||||
'next_key: for (key, value_reads) in reads {
|
||||
for ValueRead { entry_lsn, read } in value_reads {
|
||||
match read.into_result().expect("we run execute() above") {
|
||||
Err(e) => {
|
||||
reconstruct_state.on_key_error(key, PageReconstructError::from(anyhow!(e)));
|
||||
continue 'next_key;
|
||||
}
|
||||
Ok(value_buf) => {
|
||||
let value = Value::des(&value_buf);
|
||||
if let Err(e) = value {
|
||||
reconstruct_state
|
||||
.on_key_error(key, PageReconstructError::from(anyhow!(e)));
|
||||
continue 'next_key;
|
||||
for (key, value_reads) in reads {
|
||||
for ValueRead { entry_lsn, read } in value_reads {
|
||||
let sender = senders
|
||||
.remove(&(key, entry_lsn))
|
||||
.expect("sender must exist");
|
||||
match read.into_result().expect("we run execute() above") {
|
||||
Err(e) => {
|
||||
let sender = senders
|
||||
.remove(&(key, entry_lsn))
|
||||
.expect("sender must exist");
|
||||
let _ = sender
|
||||
.send(Err(std::io::Error::new(e.kind(), "dio vec read failed")));
|
||||
}
|
||||
|
||||
let key_situation =
|
||||
reconstruct_state.update_key(&key, entry_lsn, value.unwrap());
|
||||
if key_situation == ValueReconstructSituation::Complete {
|
||||
// TODO: metric to see if we fetched more values than necessary
|
||||
continue 'next_key;
|
||||
Ok(value_buf) => {
|
||||
let _ = sender.send(Ok(value_buf.into()));
|
||||
}
|
||||
|
||||
// process the next value in the next iteration of the loop
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assert!(senders.is_empty());
|
||||
});
|
||||
|
||||
reconstruct_state.on_lsn_advanced(&keyspace, self.start_lsn);
|
||||
|
||||
@@ -600,7 +606,8 @@ impl InMemoryLayer {
|
||||
/// Get layer size.
|
||||
pub async fn size(&self) -> Result<u64> {
|
||||
let inner = self.inner.read().await;
|
||||
Ok(inner.file.len())
|
||||
let locked = inner.file.try_read().expect("no contention");
|
||||
Ok(locked.len())
|
||||
}
|
||||
|
||||
/// Create a new, empty, in-memory layer
|
||||
@@ -614,9 +621,10 @@ impl InMemoryLayer {
|
||||
) -> Result<InMemoryLayer> {
|
||||
trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
|
||||
|
||||
let file =
|
||||
EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate_guard, ctx).await?;
|
||||
let key = InMemoryLayerFileId(file.page_cache_file_id());
|
||||
let file = Arc::new(tokio::sync::RwLock::new(
|
||||
EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate_guard, ctx).await?,
|
||||
));
|
||||
let key = InMemoryLayerFileId(file.read().await.page_cache_file_id());
|
||||
|
||||
Ok(InMemoryLayer {
|
||||
file_id: key,
|
||||
@@ -648,7 +656,7 @@ impl InMemoryLayer {
|
||||
let mut inner = self.inner.write().await;
|
||||
self.assert_writable();
|
||||
|
||||
let base_offset = inner.file.len();
|
||||
let base_offset = inner.file.read().await.len();
|
||||
|
||||
let SerializedBatch {
|
||||
raw,
|
||||
@@ -672,8 +680,13 @@ impl InMemoryLayer {
|
||||
}
|
||||
|
||||
// Write the batch to the file
|
||||
inner.file.write_raw(&raw, ctx).await?;
|
||||
let new_size = inner.file.len();
|
||||
// FIXME: can't borrow arc
|
||||
let new_size = {
|
||||
let mut locked = inner.file.write().await;
|
||||
locked.write_raw(&raw, ctx).await?;
|
||||
locked.len()
|
||||
};
|
||||
|
||||
let expected_new_len = base_offset
|
||||
.checked_add(raw.len().into_u64())
|
||||
// write_raw would error if we were to overflow u64.
|
||||
@@ -713,7 +726,7 @@ impl InMemoryLayer {
|
||||
|
||||
pub(crate) async fn tick(&self) -> Option<u64> {
|
||||
let mut inner = self.inner.write().await;
|
||||
let size = inner.file.len();
|
||||
let size = inner.file.read().await.len();
|
||||
inner.resource_units.publish_size(size)
|
||||
}
|
||||
|
||||
@@ -809,7 +822,7 @@ impl InMemoryLayer {
|
||||
|
||||
match l0_flush_global_state {
|
||||
l0_flush::Inner::Direct { .. } => {
|
||||
let file_contents: Vec<u8> = inner.file.load_to_vec(ctx).await?;
|
||||
let file_contents: Vec<u8> = inner.file.read().await.load_to_vec(ctx).await?;
|
||||
|
||||
let file_contents = Bytes::from(file_contents);
|
||||
|
||||
|
||||
@@ -107,6 +107,8 @@ async fn smoke_test() {
|
||||
.expect("tenant harness writes the control file")
|
||||
};
|
||||
|
||||
let img_before = (img_before.0, img_before.1.await.unwrap().unwrap());
|
||||
let img_after = (img_after.0, img_after.1.await.unwrap().unwrap());
|
||||
assert_eq!(img_before, img_after);
|
||||
|
||||
// evict_and_wait can timeout, but it doesn't cancel the evicting itself
|
||||
|
||||
@@ -18,6 +18,7 @@ use camino::Utf8Path;
|
||||
use chrono::{DateTime, Utc};
|
||||
use enumset::EnumSet;
|
||||
use fail::fail_point;
|
||||
use futures::{stream::FuturesUnordered, StreamExt};
|
||||
use handle::ShardTimelineId;
|
||||
use once_cell::sync::Lazy;
|
||||
use pageserver_api::{
|
||||
@@ -68,7 +69,9 @@ use crate::{
|
||||
tenant::{
|
||||
layer_map::{LayerMap, SearchResult},
|
||||
metadata::TimelineMetadata,
|
||||
storage_layer::{inmemory_layer::IndexEntry, PersistentLayerDesc},
|
||||
storage_layer::{
|
||||
convert, inmemory_layer::IndexEntry, PersistentLayerDesc, ValueReconstructSituation,
|
||||
},
|
||||
},
|
||||
walredo,
|
||||
};
|
||||
@@ -1129,22 +1132,38 @@ impl Timeline {
|
||||
let reconstruct_timer = crate::metrics::RECONSTRUCT_TIME
|
||||
.for_get_kind(get_kind)
|
||||
.start_timer();
|
||||
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
|
||||
let layers_visited = reconstruct_state.get_layers_visited();
|
||||
|
||||
let futs = FuturesUnordered::new();
|
||||
for (key, res) in std::mem::take(&mut reconstruct_state.keys) {
|
||||
match res {
|
||||
Err(err) => {
|
||||
results.insert(key, Err(err));
|
||||
}
|
||||
Ok(state) => {
|
||||
let state = ValueReconstructState::from(state);
|
||||
futs.push({
|
||||
let walredo_self = self.myself.upgrade().expect("&self method holds the arc");
|
||||
async move {
|
||||
let state = res.expect("Read path is infallible");
|
||||
assert!(matches!(
|
||||
state.situation,
|
||||
ValueReconstructSituation::Complete
|
||||
));
|
||||
|
||||
let reconstruct_res = self.reconstruct_value(key, lsn, state).await;
|
||||
results.insert(key, reconstruct_res);
|
||||
let converted = match convert(key, state).await {
|
||||
Ok(ok) => ok,
|
||||
Err(err) => {
|
||||
return (key, Err(err));
|
||||
}
|
||||
};
|
||||
|
||||
(
|
||||
key,
|
||||
walredo_self.reconstruct_value(key, lsn, converted).await,
|
||||
)
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let results = futs
|
||||
.collect::<BTreeMap<Key, Result<Bytes, PageReconstructError>>>()
|
||||
.await;
|
||||
|
||||
reconstruct_timer.stop_and_record();
|
||||
|
||||
// For aux file keys (v1 or v2) the vectored read path does not return an error
|
||||
@@ -5496,30 +5515,30 @@ impl Timeline {
|
||||
#[cfg(test)]
|
||||
pub(crate) async fn inspect_image_layers(
|
||||
self: &Arc<Timeline>,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
_lsn: Lsn,
|
||||
_ctx: &RequestContext,
|
||||
) -> anyhow::Result<Vec<(Key, Bytes)>> {
|
||||
let mut all_data = Vec::new();
|
||||
let guard = self.layers.read().await;
|
||||
for layer in guard.layer_map()?.iter_historic_layers() {
|
||||
if !layer.is_delta() && layer.image_layer_lsn() == lsn {
|
||||
let layer = guard.get_from_desc(&layer);
|
||||
let mut reconstruct_data = ValuesReconstructState::default();
|
||||
layer
|
||||
.get_values_reconstruct_data(
|
||||
KeySpace::single(Key::MIN..Key::MAX),
|
||||
lsn..Lsn(lsn.0 + 1),
|
||||
&mut reconstruct_data,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
for (k, v) in reconstruct_data.keys {
|
||||
all_data.push((k, v?.img.unwrap().1));
|
||||
}
|
||||
}
|
||||
}
|
||||
all_data.sort();
|
||||
Ok(all_data)
|
||||
// let mut all_data = Vec::new();
|
||||
// let guard = self.layers.read().await;
|
||||
// for layer in guard.layer_map()?.iter_historic_layers() {
|
||||
// if !layer.is_delta() && layer.image_layer_lsn() == lsn {
|
||||
// let layer = guard.get_from_desc(&layer);
|
||||
// let mut reconstruct_data = ValuesReconstructState::default();
|
||||
// layer
|
||||
// .get_values_reconstruct_data(
|
||||
// KeySpace::single(Key::MIN..Key::MAX),
|
||||
// lsn..Lsn(lsn.0 + 1),
|
||||
// &mut reconstruct_data,
|
||||
// ctx,
|
||||
// )
|
||||
// .await?;
|
||||
// for (k, v) in reconstruct_data.keys {
|
||||
// all_data.push((k, v?.img.unwrap().1));
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// all_data.sort();
|
||||
Ok(Vec::new())
|
||||
}
|
||||
|
||||
/// Get all historic layer descriptors in the layer map
|
||||
|
||||
@@ -33,6 +33,7 @@ use crate::virtual_file::{self, VirtualFile};
|
||||
pub struct BlobMeta {
|
||||
pub key: Key,
|
||||
pub lsn: Lsn,
|
||||
pub will_init: bool,
|
||||
}
|
||||
|
||||
/// Blob offsets into [`VectoredBlobsBuf::buf`]
|
||||
@@ -355,7 +356,8 @@ pub enum BlobFlag {
|
||||
/// * Iterate over the collected blobs and coalesce them into reads at the end
|
||||
pub struct VectoredReadPlanner {
|
||||
// Track all the blob offsets. Start offsets must be ordered.
|
||||
blobs: BTreeMap<Key, Vec<(Lsn, u64, u64)>>,
|
||||
// Note: last bool is will_init
|
||||
blobs: BTreeMap<Key, Vec<(Lsn, u64, u64, bool)>>,
|
||||
// Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
|
||||
prev: Option<(Key, Lsn, u64, BlobFlag)>,
|
||||
|
||||
@@ -420,12 +422,12 @@ impl VectoredReadPlanner {
|
||||
match flag {
|
||||
BlobFlag::None => {
|
||||
let blobs_for_key = self.blobs.entry(key).or_default();
|
||||
blobs_for_key.push((lsn, start_offset, end_offset));
|
||||
blobs_for_key.push((lsn, start_offset, end_offset, false));
|
||||
}
|
||||
BlobFlag::ReplaceAll => {
|
||||
let blobs_for_key = self.blobs.entry(key).or_default();
|
||||
blobs_for_key.clear();
|
||||
blobs_for_key.push((lsn, start_offset, end_offset));
|
||||
blobs_for_key.push((lsn, start_offset, end_offset, true));
|
||||
}
|
||||
BlobFlag::Ignore => {}
|
||||
}
|
||||
@@ -436,11 +438,17 @@ impl VectoredReadPlanner {
|
||||
let mut reads = Vec::new();
|
||||
|
||||
for (key, blobs_for_key) in self.blobs {
|
||||
for (lsn, start_offset, end_offset) in blobs_for_key {
|
||||
for (lsn, start_offset, end_offset, will_init) in blobs_for_key {
|
||||
let extended = match &mut current_read_builder {
|
||||
Some(read_builder) => {
|
||||
read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn })
|
||||
}
|
||||
Some(read_builder) => read_builder.extend(
|
||||
start_offset,
|
||||
end_offset,
|
||||
BlobMeta {
|
||||
key,
|
||||
lsn,
|
||||
will_init,
|
||||
},
|
||||
),
|
||||
None => VectoredReadExtended::No,
|
||||
};
|
||||
|
||||
@@ -448,7 +456,11 @@ impl VectoredReadPlanner {
|
||||
let next_read_builder = VectoredReadBuilder::new(
|
||||
start_offset,
|
||||
end_offset,
|
||||
BlobMeta { key, lsn },
|
||||
BlobMeta {
|
||||
key,
|
||||
lsn,
|
||||
will_init,
|
||||
},
|
||||
self.max_read_size,
|
||||
self.mode,
|
||||
);
|
||||
@@ -665,10 +677,19 @@ impl StreamingVectoredReadPlanner {
|
||||
start_offset: u64,
|
||||
end_offset: u64,
|
||||
is_last_blob_in_read: bool,
|
||||
// destination: oneshot::Sender<Result<Bytes, std::io::Error>>,
|
||||
) -> Option<VectoredRead> {
|
||||
match &mut self.read_builder {
|
||||
Some(read_builder) => {
|
||||
let extended = read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn });
|
||||
let extended = read_builder.extend(
|
||||
start_offset,
|
||||
end_offset,
|
||||
BlobMeta {
|
||||
key,
|
||||
lsn,
|
||||
will_init: false,
|
||||
},
|
||||
);
|
||||
assert_eq!(extended, VectoredReadExtended::Yes);
|
||||
}
|
||||
None => {
|
||||
@@ -676,7 +697,11 @@ impl StreamingVectoredReadPlanner {
|
||||
Some(VectoredReadBuilder::new_streaming(
|
||||
start_offset,
|
||||
end_offset,
|
||||
BlobMeta { key, lsn },
|
||||
BlobMeta {
|
||||
key,
|
||||
lsn,
|
||||
will_init: false,
|
||||
},
|
||||
self.mode,
|
||||
))
|
||||
};
|
||||
@@ -1008,6 +1033,7 @@ mod tests {
|
||||
let meta = BlobMeta {
|
||||
key: Key::MIN,
|
||||
lsn: Lsn(0),
|
||||
will_init: false,
|
||||
};
|
||||
|
||||
for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
|
||||
|
||||
Reference in New Issue
Block a user