Compare commits

...

14 Commits

Author SHA1 Message Date
Vlad Lazar
b2cb10590e fixup: deserialize shenanigans 2024-09-12 20:00:06 +01:00
Vlad Lazar
2923fd2a5b fixup: remove stale import 2024-09-12 19:25:46 +01:00
Vlad Lazar
2a5336b9ab fixup image deserialization 2024-09-12 19:24:41 +01:00
Christian Schwarz
6f20726610 Merge remote-tracking branch 'origin/hackaneon/lisbon24/superscalar-page_service--problame/evaluate-debouncer' into hackaneon/lisbon24/superscalar-page_service 2024-09-12 17:47:16 +00:00
Christian Schwarz
29f741e1e9 debounce: actually issue vectored get 2024-09-12 17:46:30 +00:00
Vlad Lazar
2b37a40079 Materialize future ios 2024-09-12 18:25:17 +01:00
Vlad Lazar
af2b65a2fb Rework issuing of IOs on read path 2024-09-12 16:42:35 +01:00
Christian Schwarz
5d194c7824 debounce: bounce if shard or effective request_lsn differ 2024-09-12 14:20:32 +00:00
Christian Schwarz
ac2702afd3 deboucner: move decoding into debounce loop 2024-09-12 10:58:09 +00:00
Christian Schwarz
88fd46d795 sketch interface 2024-09-12 11:35:00 +01:00
Christian Schwarz
2d6763882e pagebench: fake queue depth of 10 2024-09-12 11:35:00 +01:00
Christian Schwarz
c0c23cde72 debouncer 2024-09-12 11:35:00 +01:00
Christian Schwarz
942bc9544b fixup 2024-09-11 20:04:39 +00:00
Christian Schwarz
02b7cdb305 HACK: instrument page_service to count nonblocking consecutive getpage requests 2024-09-11 19:25:19 +01:00
12 changed files with 862 additions and 393 deletions

View File

@@ -142,11 +142,16 @@ impl PagestreamClient {
) -> anyhow::Result<PagestreamGetPageResponse> { ) -> anyhow::Result<PagestreamGetPageResponse> {
let req = PagestreamFeMessage::GetPage(req); let req = PagestreamFeMessage::GetPage(req);
let req: bytes::Bytes = req.serialize(); let req: bytes::Bytes = req.serialize();
// let mut req = tokio_util::io::ReaderStream::new(&req);
let mut req = tokio_stream::once(Ok(req));
self.copy_both.send_all(&mut req).await?; for i in 0..10 {
let mut req = tokio_stream::once(Ok(req.clone()));
self.copy_both.send_all(&mut req).await?;
}
for i in 0..9 {
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
let next: bytes::Bytes = next.unwrap()?;
}
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await; let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
let next: bytes::Bytes = next.unwrap()?; let next: bytes::Bytes = next.unwrap()?;

View File

@@ -1185,6 +1185,7 @@ struct GlobalAndPerTimelineHistogramTimer<'a, 'c> {
ctx: &'c RequestContext, ctx: &'c RequestContext,
start: std::time::Instant, start: std::time::Instant,
op: SmgrQueryType, op: SmgrQueryType,
count: usize,
} }
impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> { impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> {
@@ -1212,9 +1213,11 @@ impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> {
elapsed elapsed
} }
}; };
self.global_metric.observe(ex_throttled.as_secs_f64()); for _ in 0..self.count {
if let Some(timeline_metric) = self.timeline_metric { self.global_metric.observe(ex_throttled.as_secs_f64());
timeline_metric.observe(ex_throttled.as_secs_f64()); if let Some(timeline_metric) = self.timeline_metric {
timeline_metric.observe(ex_throttled.as_secs_f64());
}
} }
} }
} }
@@ -1343,6 +1346,14 @@ impl SmgrQueryTimePerTimeline {
&'a self, &'a self,
op: SmgrQueryType, op: SmgrQueryType,
ctx: &'c RequestContext, ctx: &'c RequestContext,
) -> Option<impl Drop + '_> {
self.start_timer_many(op, 1, ctx)
}
pub(crate) fn start_timer_many<'c: 'a, 'a>(
&'a self,
op: SmgrQueryType,
count: usize,
ctx: &'c RequestContext,
) -> Option<impl Drop + '_> { ) -> Option<impl Drop + '_> {
let global_metric = &self.global_metrics[op as usize]; let global_metric = &self.global_metrics[op as usize];
let start = Instant::now(); let start = Instant::now();
@@ -1376,6 +1387,7 @@ impl SmgrQueryTimePerTimeline {
ctx, ctx,
start, start,
op, op,
count,
}) })
} }
} }
@@ -3170,6 +3182,16 @@ static TOKIO_EXECUTOR_THREAD_COUNT: Lazy<UIntGaugeVec> = Lazy::new(|| {
.unwrap() .unwrap()
}); });
/// Histogram registered under the metric name
/// `pageserver_consecutive_nonblocking_getpage_requests`.
///
/// One bucket boundary is placed on every integer from 0 through 256
/// (inclusive). The histogram is created on first access; a registration
/// failure panics through the `unwrap`.
pub(crate) static CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM: Lazy<Histogram> =
    Lazy::new(|| {
        // Integer-valued boundaries: 0.0, 1.0, ..., 256.0.
        let integer_buckets = (0..=256_i32).map(f64::from).collect::<Vec<f64>>();
        register_histogram!(
            "pageserver_consecutive_nonblocking_getpage_requests",
            "Number of consecutive nonblocking getpage requests",
            integer_buckets,
        )
        .unwrap()
    });
pub(crate) fn set_tokio_runtime_setup(setup: &str, num_threads: NonZeroUsize) { pub(crate) fn set_tokio_runtime_setup(setup: &str, num_threads: NonZeroUsize) {
static SERIALIZE: std::sync::Mutex<()> = std::sync::Mutex::new(()); static SERIALIZE: std::sync::Mutex<()> = std::sync::Mutex::new(());
let _guard = SERIALIZE.lock().unwrap(); let _guard = SERIALIZE.lock().unwrap();

View File

@@ -5,14 +5,14 @@ use anyhow::Context;
use async_compression::tokio::write::GzipEncoder; use async_compression::tokio::write::GzipEncoder;
use bytes::Buf; use bytes::Buf;
use futures::FutureExt; use futures::FutureExt;
use once_cell::sync::OnceCell; use once_cell::sync::{Lazy, OnceCell};
use pageserver_api::models::TenantState; use pageserver_api::models::{self, TenantState};
use pageserver_api::models::{ use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse, PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse, PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse, PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest,
PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse,
PagestreamNblocksResponse, PagestreamProtocolVersion, PagestreamProtocolVersion,
}; };
use pageserver_api::shard::TenantShardId; use pageserver_api::shard::TenantShardId;
use postgres_backend::{is_expected_io_error, AuthType, PostgresBackend, QueryError}; use postgres_backend::{is_expected_io_error, AuthType, PostgresBackend, QueryError};
@@ -43,7 +43,7 @@ use crate::basebackup;
use crate::basebackup::BasebackupError; use crate::basebackup::BasebackupError;
use crate::config::PageServerConf; use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext}; use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics; use crate::metrics::{self, CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM};
use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS}; use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
use crate::pgdatadir_mapping::Version; use crate::pgdatadir_mapping::Version;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id; use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
@@ -58,7 +58,7 @@ use crate::tenant::GetTimelineError;
use crate::tenant::PageReconstructError; use crate::tenant::PageReconstructError;
use crate::tenant::Timeline; use crate::tenant::Timeline;
use pageserver_api::key::rel_block_to_key; use pageserver_api::key::rel_block_to_key;
use pageserver_api::reltag::SlruKind; use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID; use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ; use postgres_ffi::BLCKSZ;
@@ -577,124 +577,317 @@ impl PageServerHandler {
} }
} }
loop { let mut batched = None;
// read request bytes (it's exactly 1 PagestreamFeMessage per CopyData) 'outer: loop {
let msg = tokio::select! { enum DebouncedFeMessage {
biased; Exists(models::PagestreamExistsRequest),
_ = self.cancel.cancelled() => { Nblocks(models::PagestreamNblocksRequest),
return Err(QueryError::Shutdown) GetPage {
} span: Span,
msg = pgb.read_message() => { msg } shard: timeline::handle::Handle<TenantManagerTypes>,
}; effective_request_lsn: Lsn,
let copy_data_bytes = match msg? { pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
Some(FeMessage::CopyData(bytes)) => bytes, },
Some(FeMessage::Terminate) => break, DbSize(models::PagestreamDbSizeRequest),
Some(m) => { GetSlruSegment(models::PagestreamGetSlruSegmentRequest),
return Err(QueryError::Other(anyhow::anyhow!( RespondError(Span, PageStreamError),
"unexpected message: {m:?} during COPY" }
))); let mut debounce: Option<std::time::Instant> = None;
} // return or `?` on protocol error
None => break, // client disconnected // `break EXPR` to stop batching. The EXPR will be the first message in the next batch.
}; let next_batched: Option<DebouncedFeMessage> = loop {
static BOUNCE_TIMEOUT: Lazy<Duration> = Lazy::new(|| {
utils::env::var::<humantime::Duration, _>("NEON_PAGESERVER_DEBOUNCE")
.unwrap()
.into()
});
let sleep_fut = if let Some(started_at) = debounce {
futures::future::Either::Left(tokio::time::sleep_until(
(started_at + *BOUNCE_TIMEOUT).into(),
))
} else {
futures::future::Either::Right(futures::future::pending())
};
let msg = tokio::select! {
biased;
_ = self.cancel.cancelled() => {
return Err(QueryError::Shutdown)
}
msg = pgb.read_message() => {
msg
}
_ = sleep_fut => {
assert!(batched.is_some());
break None;
}
};
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(FeMessage::Terminate) => break 'outer,
Some(m) => {
return Err(QueryError::Other(anyhow::anyhow!(
"unexpected message: {m:?} during COPY"
)));
}
None => break 'outer, // client disconnected
};
trace!("query: {copy_data_bytes:?}");
fail::fail_point!("ps::handle-pagerequest-message");
trace!("query: {copy_data_bytes:?}"); // parse request
fail::fail_point!("ps::handle-pagerequest-message"); let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
// parse request let this_msg = match neon_fe_msg {
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?; PagestreamFeMessage::Exists(msg) => DebouncedFeMessage::Exists(msg),
PagestreamFeMessage::Nblocks(msg) => DebouncedFeMessage::Nblocks(msg),
PagestreamFeMessage::DbSize(msg) => DebouncedFeMessage::DbSize(msg),
PagestreamFeMessage::GetSlruSegment(msg) => {
DebouncedFeMessage::GetSlruSegment(msg)
}
PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
request_lsn,
not_modified_since,
rel,
blkno,
}) => {
let span = tracing::info_span!("handle_get_page_at_lsn_request_batched", %tenant_id, %timeline_id, shard_id = tracing::field::Empty, req_lsn = %request_lsn, batch_size = tracing::field::Empty);
let key = rel_block_to_key(rel, blkno);
let shard = match self
.timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Page(key))
.instrument(span.clone())
.await
{
Ok(tl) => tl,
Err(GetActiveTimelineError::Tenant(
GetActiveTenantError::NotFound(_),
)) => {
// We already know this tenant exists in general, because we resolved it at
// start of connection. Getting a NotFound here indicates that the shard containing
// the requested page is not present on this node: the client's knowledge of shard->pageserver
// mapping is out of date.
//
// Closing the connection by returning `::Reconnect` has the side effect of rate-limiting above message, via
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
// and talk to a different pageserver.
break Some(DebouncedFeMessage::RespondError(
span,
PageStreamError::Reconnect(
"getpage@lsn request routed to wrong shard".into(),
),
));
}
Err(e) => break Some(DebouncedFeMessage::RespondError(span, e.into())),
};
let effective_request_lsn = match Self::wait_or_get_last_lsn(
&shard,
request_lsn,
not_modified_since,
&shard.get_latest_gc_cutoff_lsn(),
&ctx,
)
// TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
.await
{
Ok(lsn) => lsn,
Err(e) => {
break Some(DebouncedFeMessage::RespondError(span, e));
}
};
DebouncedFeMessage::GetPage {
span,
shard,
effective_request_lsn,
pages: smallvec::smallvec![(rel, blkno)],
}
}
};
// check if we can debounce
match (&mut batched, this_msg) {
(None, this_msg) => {
batched = Some(this_msg);
}
(
Some(DebouncedFeMessage::GetPage {
span: _,
shard: accum_shard,
pages: accum_pages,
effective_request_lsn: accum_lsn,
}),
DebouncedFeMessage::GetPage {
span: _,
shard: this_shard,
pages: this_pages,
effective_request_lsn: this_lsn,
},
) if async {
assert_eq!(this_pages.len(), 1);
if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize {
assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize);
return false;
}
if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
!= (this_shard.tenant_shard_id, this_shard.timeline_id)
{
// TODO: we _could_ batch & execute each shard separately (and in parallel).
// But the current logic for keeping responses in order does not support that.
return false;
}
// the vectored get currently only supports a single LSN, so, bounce as soon
// as the effective request_lsn changes
return *accum_lsn == this_lsn;
}
.await =>
{
// ok to batch
accum_pages.extend(this_pages);
}
(Some(_), this_msg) => {
// by default, don't continue batching
break Some(this_msg);
}
}
// debounce impl piece
let started_at = debounce.get_or_insert_with(Instant::now);
if started_at.elapsed() > *BOUNCE_TIMEOUT {
break None;
}
};
// invoke handler function // invoke handler function
let (handler_result, span) = match neon_fe_msg { let (handler_results, span): (
PagestreamFeMessage::Exists(req) => { smallvec::SmallVec<[Result<PagestreamBeMessage, PageStreamError>; 1]>,
_,
) = match batched.take().expect("loop above ensures this") {
DebouncedFeMessage::Exists(req) => {
fail::fail_point!("ps::handle-pagerequest-message::exists"); fail::fail_point!("ps::handle-pagerequest-message::exists");
let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn); let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn);
( (
self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx) smallvec::smallvec![
.instrument(span.clone()) self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
.await, .instrument(span.clone())
.await
],
span, span,
) )
} }
PagestreamFeMessage::Nblocks(req) => { DebouncedFeMessage::Nblocks(req) => {
fail::fail_point!("ps::handle-pagerequest-message::nblocks"); fail::fail_point!("ps::handle-pagerequest-message::nblocks");
let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn); let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn);
( (
self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx) smallvec::smallvec![
.instrument(span.clone()) self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
.await, .instrument(span.clone())
.await,
],
span, span,
) )
} }
PagestreamFeMessage::GetPage(req) => { DebouncedFeMessage::GetPage {
span,
shard,
effective_request_lsn,
pages,
} => {
CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM.observe(pages.len() as f64);
span.record("batch_size", pages.len() as u64);
fail::fail_point!("ps::handle-pagerequest-message::getpage"); fail::fail_point!("ps::handle-pagerequest-message::getpage");
// shard_id is filled in by the handler // shard_id is filled in by the handler
let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn);
( (
self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx) {
.instrument(span.clone()) let npages = pages.len();
.await, let res = self
.handle_get_page_at_lsn_request_batched(
&shard,
effective_request_lsn,
pages,
&ctx,
)
.instrument(span.clone())
.await;
assert_eq!(res.len(), npages);
res
},
span, span,
) )
} }
PagestreamFeMessage::DbSize(req) => { DebouncedFeMessage::DbSize(req) => {
fail::fail_point!("ps::handle-pagerequest-message::dbsize"); fail::fail_point!("ps::handle-pagerequest-message::dbsize");
let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn); let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn);
( (
self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx) smallvec::smallvec![
.instrument(span.clone()) self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
.await, .instrument(span.clone())
.await
],
span, span,
) )
} }
PagestreamFeMessage::GetSlruSegment(req) => { DebouncedFeMessage::GetSlruSegment(req) => {
fail::fail_point!("ps::handle-pagerequest-message::slrusegment"); fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn); let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn);
( (
self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx) smallvec::smallvec![
self.handle_get_slru_segment_request(
tenant_id,
timeline_id,
&req,
&ctx
)
.instrument(span.clone()) .instrument(span.clone())
.await, .await
],
span, span,
) )
} }
DebouncedFeMessage::RespondError(span, e) => {
// We've already decided to respond with an error, so we don't need to
// call the handler.
(smallvec::smallvec![Err(e)], span)
}
}; };
// Map handler result to protocol behavior. // Map handler result to protocol behavior.
// Some handler errors cause exit from pagestream protocol. // Some handler errors cause exit from pagestream protocol.
// Other handler errors are sent back as an error message and we stay in pagestream protocol. // Other handler errors are sent back as an error message and we stay in pagestream protocol.
let response_msg = match handler_result { for handler_result in handler_results {
Err(e) => match &e { let response_msg = match handler_result {
PageStreamError::Shutdown => { Err(e) => match &e {
// If we fail to fulfil a request during shutdown, which may be _because_ of PageStreamError::Shutdown => {
// shutdown, then do not send the error to the client. Instead just drop the // If we fail to fulfil a request during shutdown, which may be _because_ of
// connection. // shutdown, then do not send the error to the client. Instead just drop the
span.in_scope(|| info!("dropping connection due to shutdown")); // connection.
return Err(QueryError::Shutdown); span.in_scope(|| info!("dropping connection due to shutdown"));
} return Err(QueryError::Shutdown);
PageStreamError::Reconnect(reason) => { }
span.in_scope(|| info!("handler requested reconnect: {reason}")); PageStreamError::Reconnect(reason) => {
return Err(QueryError::Reconnect); span.in_scope(|| info!("handler requested reconnect: {reason}"));
} return Err(QueryError::Reconnect);
PageStreamError::Read(_) }
| PageStreamError::LsnTimeout(_) PageStreamError::Read(_)
| PageStreamError::NotFound(_) | PageStreamError::LsnTimeout(_)
| PageStreamError::BadRequest(_) => { | PageStreamError::NotFound(_)
// print the all details to the log with {:#}, but for the client the | PageStreamError::BadRequest(_) => {
// error message is enough. Do not log if shutting down, as the anyhow::Error // print the all details to the log with {:#}, but for the client the
// here includes cancellation which is not an error. // error message is enough. Do not log if shutting down, as the anyhow::Error
let full = utils::error::report_compact_sources(&e); // here includes cancellation which is not an error.
span.in_scope(|| { let full = utils::error::report_compact_sources(&e);
error!("error reading relation or page version: {full:#}") span.in_scope(|| {
}); error!("error reading relation or page version: {full:#}")
PagestreamBeMessage::Error(PagestreamErrorResponse { });
message: e.to_string(), PagestreamBeMessage::Error(PagestreamErrorResponse {
}) message: e.to_string(),
} })
}, }
Ok(response_msg) => response_msg, },
}; Ok(response_msg) => response_msg,
};
// marshal & transmit response message // marshal & transmit response message
pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?; pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
}
tokio::select! { tokio::select! {
biased; biased;
_ = self.cancel.cancelled() => { _ = self.cancel.cancelled() => {
@@ -706,6 +899,9 @@ impl PageServerHandler {
res?; res?;
} }
} }
assert!(batched.is_none(), "we take() earlier");
batched = next_batched;
} }
Ok(()) Ok(())
} }
@@ -949,60 +1145,30 @@ impl PageServerHandler {
})) }))
} }
#[instrument(skip_all, fields(shard_id))] #[instrument(skip_all)]
async fn handle_get_page_at_lsn_request( async fn handle_get_page_at_lsn_request_batched(
&mut self, &mut self,
tenant_id: TenantId, timeline: &Timeline,
timeline_id: TimelineId, effective_lsn: Lsn,
req: &PagestreamGetPageRequest, pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
ctx: &RequestContext, ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> { ) -> smallvec::SmallVec<[Result<PagestreamBeMessage, PageStreamError>; 1]> {
let timeline = match self debug_assert_current_span_has_tenant_and_timeline_id();
.timeline_handles let _timer = timeline.query_metrics.start_timer_many(
.get( metrics::SmgrQueryType::GetPageAtLsn,
tenant_id, pages.len(),
timeline_id,
ShardSelector::Page(rel_block_to_key(req.rel, req.blkno)),
)
.await
{
Ok(tl) => tl,
Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
// We already know this tenant exists in general, because we resolved it at
// start of connection. Getting a NotFound here indicates that the shard containing
// the requested page is not present on this node: the client's knowledge of shard->pageserver
// mapping is out of date.
//
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
// and talk to a different pageserver.
return Err(PageStreamError::Reconnect(
"getpage@lsn request routed to wrong shard".into(),
));
}
Err(e) => return Err(e.into()),
};
let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetPageAtLsn, ctx);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
&timeline,
req.request_lsn,
req.not_modified_since,
&latest_gc_cutoff_lsn,
ctx, ctx,
) );
.await?;
let page = timeline let pages = timeline
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), ctx) .get_rel_page_at_lsn_batched(pages, Version::Lsn(effective_lsn), ctx)
.await?; .await;
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse { smallvec::SmallVec::from_iter(pages.into_iter().map(|page| {
page, page.map(|page| {
PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse { page })
})
.map_err(PageStreamError::Read)
})) }))
} }
@@ -1499,3 +1665,10 @@ fn set_tracing_field_shard_id(timeline: &Timeline) {
); );
debug_assert_current_span_has_tenant_and_timeline_id(); debug_assert_current_span_has_tenant_and_timeline_id();
} }
/// Newtype wrapper around an [`Lsn`].
// NOTE(review): the name suggests the wrapped LSN has already been waited for — confirm at the use sites.
struct WaitedForLsn(Lsn);

impl From<WaitedForLsn> for Lsn {
    /// Unwraps the newtype and hands back the inner [`Lsn`].
    fn from(waited: WaitedForLsn) -> Self {
        waited.0
    }
}

View File

@@ -9,12 +9,17 @@
use super::tenant::{PageReconstructError, Timeline}; use super::tenant::{PageReconstructError, Timeline};
use crate::context::RequestContext; use crate::context::RequestContext;
use crate::keyspace::{KeySpace, KeySpaceAccum}; use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id; use crate::span::{
debug_assert_current_span_has_tenant_and_timeline_id,
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
};
use crate::tenant::timeline::GetVectoredError;
use crate::walrecord::NeonWalRecord; use crate::walrecord::NeonWalRecord;
use crate::{aux_file, repository::*}; use crate::{aux_file, repository::*};
use anyhow::{ensure, Context}; use anyhow::{ensure, Context};
use bytes::{Buf, Bytes, BytesMut}; use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum; use enum_map::Enum;
use itertools::Itertools;
use pageserver_api::key::{ use pageserver_api::key::{
dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key, dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key, relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
@@ -28,7 +33,7 @@ use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::BLCKSZ; use postgres_ffi::BLCKSZ;
use postgres_ffi::{Oid, RepOriginId, TimestampTz, TransactionId}; use postgres_ffi::{Oid, RepOriginId, TimestampTz, TransactionId};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::{hash_map, HashMap, HashSet}; use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
use std::ops::ControlFlow; use std::ops::ControlFlow;
use std::ops::Range; use std::ops::Range;
use strum::IntoEnumIterator; use strum::IntoEnumIterator;
@@ -191,26 +196,184 @@ impl Timeline {
version: Version<'_>, version: Version<'_>,
ctx: &RequestContext, ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> { ) -> Result<Bytes, PageReconstructError> {
if tag.relnode == 0 { let pages = smallvec::smallvec![(tag, blknum)];
return Err(PageReconstructError::Other( let res = self.get_rel_page_at_lsn_batched(pages, version, ctx).await;
RelationError::InvalidRelnode.into(), assert_eq!(res.len(), 1);
)); res.into_iter().next().unwrap()
} }
let nblocks = self.get_rel_size(tag, version, ctx).await?; /// Like [`get_rel_page_at_lsn`], but returns a batch of pages.
if blknum >= nblocks { pub(crate) async fn get_rel_page_at_lsn_batched(
debug!( &self,
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page", pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
tag, version: Version<'_>,
blknum, ctx: &RequestContext,
version.get_lsn(), ) -> smallvec::SmallVec<[Result<Bytes, PageReconstructError>; 1]> {
nblocks debug_assert_current_span_has_tenant_and_timeline_id();
); let request_lsn = match version {
return Ok(ZERO_PAGE.clone()); Version::Lsn(lsn) => lsn,
Version::Modified(_) => panic!("unsupported"),
};
enum KeyState {
NeedsVectoredGet,
Done(Result<Bytes, PageReconstructError>),
} }
let mut key_states = BTreeMap::new();
let mut vectored_gets: smallvec::SmallVec<[_; 1]> =
smallvec::SmallVec::with_capacity(pages.len());
for (response_order, (tag, blknum)) in pages.into_iter().enumerate() {
let key = rel_block_to_key(tag, blknum);
use std::collections::btree_map::Entry;
let key_state_slot = match key_states.entry((key, response_order)) {
Entry::Occupied(_entry) => unreachable!(
"enumerate makes keys unique, even if batch contains same key twice"
),
Entry::Vacant(entry) => entry,
};
let key = rel_block_to_key(tag, blknum); if tag.relnode == 0 {
version.get(self, key, ctx).await key_state_slot.insert(KeyState::Done(Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
))));
continue;
}
let nblocks = match self.get_rel_size(tag, version, ctx).await {
Ok(nblocks) => nblocks,
Err(err) => {
key_state_slot.insert(KeyState::Done(Err(err)));
continue;
}
};
if blknum >= nblocks {
debug!(
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
tag,
blknum,
version.get_lsn(),
nblocks
);
key_state_slot.insert(KeyState::Done(Ok(ZERO_PAGE.clone())));
continue;
}
vectored_gets.push(key);
key_state_slot.insert(KeyState::NeedsVectoredGet);
}
// turn vectored_gets into a keyspace
let keyspace = {
// add_key requires monotonicity
vectored_gets.sort_unstable();
let mut acc = KeySpaceAccum::new();
for key in vectored_gets
.into_iter()
// in fact it requires strong monotonicity
.dedup()
{
acc.add_key(key);
}
acc.to_keyspace()
};
match self.get_vectored(keyspace, request_lsn, ctx).await {
Ok(results) => {
for (key, res) in results {
if let Err(err) = &res {
warn!(%key, ?err, "a key inside get_vectored failed with a per-key error");
}
let mut interests = key_states.range_mut((key, 0)..(key.next(), 0)).peekable();
let first_interest = interests.next().unwrap();
let next_interest = interests.peek().is_some();
if !next_interest {
match first_interest.1 {
KeyState::NeedsVectoredGet => {
*first_interest.1 = KeyState::Done(res);
}
KeyState::Done(_) => unreachable!(),
}
continue;
} else {
for ((_, _), state) in [first_interest].into_iter().chain(interests) {
match state {
KeyState::NeedsVectoredGet => {
*state = KeyState::Done(match &res {
Ok(buf) => Ok(buf.clone()),
// this `match` is working around the fact that we cannot Clone the PageReconstructError
Err(err) => Err(match err {
PageReconstructError::Cancelled => {
PageReconstructError::Cancelled
}
x @ PageReconstructError::Other(_) |
x @ PageReconstructError::AncestorLsnTimeout(_) |
x @ PageReconstructError::WalRedo(_) |
x @ PageReconstructError::MissingKey(_) => {
PageReconstructError::Other(anyhow::anyhow!("there was more than one request for this key in the batch, error logged once: {x:?}"))
},
}),
});
}
KeyState::Done(_) => unreachable!(),
}
}
}
}
}
Err(err) => {
warn!(?err, "get_vectored failed with a global error, mapping that error to per-key failure");
// this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
for ((_, _), state) in key_states.iter_mut() {
// this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
// but without taking ownership of the GetVectoredError
match &err {
GetVectoredError::Cancelled => {
*state = KeyState::Done(Err(PageReconstructError::Cancelled));
}
// TODO: restructure get_vectored API to make this error per-key
GetVectoredError::MissingKey(err) => {
*state = KeyState::Done(Err(PageReconstructError::Other(anyhow::anyhow!("whole vectored get request failed because one or more of the requested keys were missing: {err:?}"))));
}
// TODO: restructure get_vectored API to make this error per-key
GetVectoredError::GetReadyAncestorError(err) => {
*state = KeyState::Done(Err(PageReconstructError::Other(anyhow::anyhow!("whole vectored get request failed because one or more key required ancestor that wasn't ready: {err:?}"))));
}
// TODO: restructure get_vectored API to make this error per-key
GetVectoredError::Other(err) => {
*state = KeyState::Done(Err(PageReconstructError::Other(
anyhow::anyhow!("whole vectored get request failed: {err:?}"),
)));
}
// TODO: we can prevent this error class by moving this check into the type system
GetVectoredError::InvalidLsn(e) => {
*state =
KeyState::Done(Err(anyhow::anyhow!("invalid LSN: {e:?}").into()));
}
// NB: this should never happen in practice because we limit MAX_GET_VECTORED_KEYS
// TODO: we can prevent this error class by moving this check into the type system
GetVectoredError::Oversized(err) => {
*state = KeyState::Done(Err(anyhow::anyhow!(
"batching oversized: {err:?}"
)
.into()));
}
}
}
}
};
// get the results into the order in which they were requested
let mut return_order: smallvec::SmallVec<[_; Timeline::MAX_GET_VECTORED_KEYS as usize]> =
smallvec::SmallVec::with_capacity(key_states.len());
return_order.extend(key_states.keys().map(|(key, idx)| (*key, *idx)));
return_order.sort_unstable_by_key(|(_, idx)| *idx);
let mut res = smallvec::SmallVec::with_capacity(key_states.len());
res.extend(return_order.into_iter().map(|key_states_key| {
match key_states.remove(&key_states_key).unwrap() {
KeyState::Done(res) => res,
KeyState::NeedsVectoredGet => unreachable!(),
}
}));
res
} }
// Get size of a database in blocks // Get size of a database in blocks

View File

@@ -73,6 +73,21 @@ impl ValueBytes {
Ok(raw[8] == 1) Ok(raw[8] == 1)
} }
/// Reports whether the serialized value in `raw` begins with an all-zero
/// four-byte discriminator.
///
/// # Errors
///
/// Returns `InvalidInput::TooShortValue` when `raw` holds fewer than 12 bytes.
// NOTE(review): presumably a zero discriminator is the serialized tag of `Value::Image` — confirm against the `Value` encoding.
pub(crate) fn is_image(raw: &[u8]) -> Result<bool, InvalidInput> {
    match raw.get(..12) {
        // Fewer than 12 bytes available: reject the input.
        None => Err(InvalidInput::TooShortValue),
        // Only the leading four bytes (the discriminator) decide the answer.
        Some(header) => Ok(header[..4] == [0u8; 4]),
    }
}
} }
#[cfg(test)] #[cfg(test)]

View File

@@ -8,15 +8,17 @@ mod layer_desc;
mod layer_name; mod layer_name;
pub mod merge_iterator; pub mod merge_iterator;
use tokio::sync::{self};
use utils::bin_ser::BeSer;
pub mod split_writer; pub mod split_writer;
use crate::context::{AccessStatsBehavior, RequestContext}; use crate::context::{AccessStatsBehavior, RequestContext};
use crate::repository::Value; use crate::repository::{Value, ValueBytes};
use crate::walrecord::NeonWalRecord; use crate::walrecord::NeonWalRecord;
use bytes::Bytes; use bytes::Bytes;
use pageserver_api::key::Key; use pageserver_api::key::{Key, DBDIR_KEY};
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum}; use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
use std::cmp::{Ordering, Reverse}; use std::cmp::Ordering;
use std::collections::hash_map::Entry; use std::collections::hash_map::Entry;
use std::collections::{BinaryHeap, HashMap}; use std::collections::{BinaryHeap, HashMap};
use std::ops::Range; use std::ops::Range;
@@ -79,12 +81,18 @@ pub(crate) enum ValueReconstructSituation {
} }
/// Reconstruct data accumulated for a single key during a vectored get /// Reconstruct data accumulated for a single key during a vectored get
#[derive(Debug, Default, Clone)] #[derive(Debug, Default)]
pub(crate) struct VectoredValueReconstructState { pub(crate) struct VectoredValueReconstructState {
pub(crate) records: Vec<(Lsn, NeonWalRecord)>, pub(crate) records: Vec<(
pub(crate) img: Option<(Lsn, Bytes)>, Lsn,
tokio::sync::oneshot::Receiver<Result<Bytes, std::io::Error>>,
)>,
pub(crate) img: Option<(
Lsn,
tokio::sync::oneshot::Receiver<Result<Bytes, std::io::Error>>,
)>,
situation: ValueReconstructSituation, pub(crate) situation: ValueReconstructSituation,
} }
impl VectoredValueReconstructState { impl VectoredValueReconstructState {
@@ -93,16 +101,57 @@ impl VectoredValueReconstructState {
} }
} }
impl From<VectoredValueReconstructState> for ValueReconstructState { pub(crate) async fn convert(
fn from(mut state: VectoredValueReconstructState) -> Self { _key: Key,
// walredo expects the records to be descending in terms of Lsn from: VectoredValueReconstructState,
state.records.sort_by_key(|(lsn, _)| Reverse(*lsn)); ) -> Result<ValueReconstructState, PageReconstructError> {
let mut to = ValueReconstructState::default();
ValueReconstructState { for (lsn, fut) in from.records {
records: state.records, match fut.await {
img: state.img, Ok(res) => match res {
Ok(bytes) => {
let value = Value::des(&bytes)
.map_err(|err| PageReconstructError::Other(err.into()))?;
match value {
Value::WalRecord(rec) => {
to.records.push((lsn, rec));
},
Value::Image(img) => {
assert!(to.img.is_none());
to.img = Some((lsn, img));
}
}
}
Err(err) => {
return Err(PageReconstructError::Other(err.into()));
}
},
Err(err) => {
return Err(PageReconstructError::Other(err.into()));
}
} }
} }
if to.img.is_none() {
let (lsn, fut) = from.img.expect("Need an image");
match fut.await {
Ok(res) => match res {
Ok(bytes) => {
to.img = Some((lsn, bytes));
}
Err(err) => {
return Err(PageReconstructError::Other(err.into()));
}
},
Err(err) => {
return Err(PageReconstructError::Other(err.into()));
}
}
}
Ok(to)
} }
/// Bag of data accumulated during a vectored get. /// Bag of data accumulated during a vectored get.
@@ -200,7 +249,8 @@ impl ValuesReconstructState {
&mut self, &mut self,
key: &Key, key: &Key,
lsn: Lsn, lsn: Lsn,
value: Value, completes: bool,
value: sync::oneshot::Receiver<Result<Bytes, std::io::Error>>,
) -> ValueReconstructSituation { ) -> ValueReconstructSituation {
let state = self let state = self
.keys .keys
@@ -208,31 +258,14 @@ impl ValuesReconstructState {
.or_insert(Ok(VectoredValueReconstructState::default())); .or_insert(Ok(VectoredValueReconstructState::default()));
if let Ok(state) = state { if let Ok(state) = state {
let key_done = match state.situation { match state.situation {
ValueReconstructSituation::Complete => unreachable!(), ValueReconstructSituation::Complete => unreachable!(),
ValueReconstructSituation::Continue => match value { ValueReconstructSituation::Continue => {
Value::Image(img) => { state.records.push((lsn, value));
state.img = Some((lsn, img)); }
true }
}
Value::WalRecord(rec) => {
debug_assert!(
Some(lsn) > state.get_cached_lsn(),
"Attempt to collect a record below cached LSN for walredo: {} < {}",
lsn,
state
.get_cached_lsn()
.expect("Assertion can only fire if a cached lsn is present")
);
let will_init = rec.will_init(); if completes && state.situation == ValueReconstructSituation::Continue {
state.records.push((lsn, rec));
will_init
}
},
};
if key_done && state.situation == ValueReconstructSituation::Continue {
state.situation = ValueReconstructSituation::Complete; state.situation = ValueReconstructSituation::Complete;
self.keys_done.add_key(*key); self.keys_done.add_key(*key);
} }

View File

@@ -42,13 +42,12 @@ use crate::tenant::vectored_blob_io::{
BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadCoalesceMode, VectoredReadPlanner, VectoredReadCoalesceMode, VectoredReadPlanner,
}; };
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt}; use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::{self, VirtualFile}; use crate::virtual_file::{self, VirtualFile};
use crate::{walrecord, TEMP_FILE_SUFFIX}; use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION}; use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{anyhow, bail, ensure, Context, Result}; use anyhow::{bail, ensure, Context, Result};
use bytes::BytesMut; use bytes::{Bytes, BytesMut};
use camino::{Utf8Path, Utf8PathBuf}; use camino::{Utf8Path, Utf8PathBuf};
use futures::StreamExt; use futures::StreamExt;
use itertools::Itertools; use itertools::Itertools;
@@ -58,14 +57,14 @@ use pageserver_api::models::ImageCompressionAlgorithm;
use pageserver_api::shard::TenantShardId; use pageserver_api::shard::TenantShardId;
use rand::{distributions::Alphanumeric, Rng}; use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::VecDeque; use std::collections::{HashMap, VecDeque};
use std::fs::File; use std::fs::File;
use std::io::SeekFrom; use std::io::SeekFrom;
use std::ops::Range; use std::ops::Range;
use std::os::unix::fs::FileExt; use std::os::unix::fs::FileExt;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::OnceCell; use tokio::sync::{self, OnceCell};
use tokio_epoll_uring::IoBuf; use tokio_epoll_uring::IoBuf;
use tracing::*; use tracing::*;
@@ -224,7 +223,7 @@ pub struct DeltaLayerInner {
index_start_blk: u32, index_start_blk: u32,
index_root_blk: u32, index_root_blk: u32,
file: VirtualFile, file: Arc<VirtualFile>,
file_id: FileId, file_id: FileId,
layer_key_range: Range<Key>, layer_key_range: Range<Key>,
@@ -788,9 +787,11 @@ impl DeltaLayerInner {
max_vectored_read_bytes: Option<MaxVectoredReadBytes>, max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext, ctx: &RequestContext,
) -> anyhow::Result<Self> { ) -> anyhow::Result<Self> {
let file = VirtualFile::open(path, ctx) let file = Arc::new(
.await VirtualFile::open(path, ctx)
.context("open layer file")?; .await
.context("open layer file")?,
);
let file_id = page_cache::next_file_id(); let file_id = page_cache::next_file_id();
@@ -980,77 +981,59 @@ impl DeltaLayerInner {
reconstruct_state: &mut ValuesReconstructState, reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext, ctx: &RequestContext,
) { ) {
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
let mut ignore_key_with_err = None;
let max_vectored_read_bytes = self let max_vectored_read_bytes = self
.max_vectored_read_bytes .max_vectored_read_bytes
.expect("Layer is loaded with max vectored bytes config") .expect("Layer is loaded with max vectored bytes config")
.0 .0
.into(); .into();
let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes); let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
let mut buf = Some(BytesMut::with_capacity(buf_size));
// Note that reads are processed in reverse order (from highest key+lsn). // Note that reads are processed in reverse order (from highest key+lsn).
// This is the order that `ReconstructState` requires such that it can // This is the order that `ReconstructState` requires such that it can
// track when a key is done. // track when a key is done.
for read in reads.into_iter().rev() { for read in reads.into_iter().rev() {
let res = vectored_blob_reader let mut senders: HashMap<
.read_blobs(&read, buf.take().expect("Should have a buffer"), ctx) (Key, Lsn),
.await; sync::oneshot::Sender<Result<Bytes, std::io::Error>>,
> = Default::default();
let blobs_buf = match res { for (_, blob_meta) in read.blobs_at.as_slice() {
Ok(blobs_buf) => blobs_buf, let (tx, rx) = sync::oneshot::channel();
Err(err) => { senders.insert((blob_meta.key, blob_meta.lsn), tx);
let kind = err.kind(); reconstruct_state.update_key(
for (_, blob_meta) in read.blobs_at.as_slice() { &blob_meta.key,
reconstruct_state.on_key_error( blob_meta.lsn,
blob_meta.key, blob_meta.will_init,
PageReconstructError::Other(anyhow!( rx,
"Failed to read blobs from virtual file {}: {}", );
self.file.path,
kind
)),
);
}
// We have "lost" the buffer since the lower level IO api
// doesn't return the buffer on error. Allocate a new one.
buf = Some(BytesMut::with_capacity(buf_size));
continue;
}
};
for meta in blobs_buf.blobs.iter().rev() {
if Some(meta.meta.key) == ignore_key_with_err {
continue;
}
let value = Value::des(&blobs_buf.buf[meta.start..meta.end]);
let value = match value {
Ok(v) => v,
Err(e) => {
reconstruct_state.on_key_error(
meta.meta.key,
PageReconstructError::Other(anyhow!(e).context(format!(
"Failed to deserialize blob from virtual file {}",
self.file.path,
))),
);
ignore_key_with_err = Some(meta.meta.key);
continue;
}
};
// Invariant: once a key reaches [`ValueReconstructSituation::Complete`]
// state, no further updates shall be made to it. The call below will
// panic if the invariant is violated.
reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value);
} }
buf = Some(blobs_buf.buf); let read_from = self.file.clone();
let read_ctx = ctx.attached_child();
tokio::task::spawn(async move {
let vectored_blob_reader = VectoredBlobReader::new(&read_from);
let buf = BytesMut::with_capacity(buf_size);
let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await;
match res {
Ok(blobs_buf) => {
for meta in blobs_buf.blobs.iter().rev() {
let buf = &blobs_buf.buf[meta.start..meta.end];
let sender = senders
.remove(&(meta.meta.key, meta.meta.lsn))
.expect("sender must exist");
let _ = sender.send(Ok(Bytes::copy_from_slice(buf)));
}
assert!(senders.is_empty());
}
Err(err) => {
for (_, sender) in senders {
let _ = sender
.send(Err(std::io::Error::new(err.kind(), "vec read failed")));
}
}
}
});
} }
} }
@@ -1190,7 +1173,14 @@ impl DeltaLayerInner {
let actionable = if let Some((key, lsn, start_offset)) = prev.take() { let actionable = if let Some((key, lsn, start_offset)) = prev.take() {
let end_offset = offset; let end_offset = offset;
Some((BlobMeta { key, lsn }, start_offset..end_offset)) Some((
BlobMeta {
key,
lsn,
will_init: false,
},
start_offset..end_offset,
))
} else { } else {
None None
}; };

View File

@@ -21,7 +21,7 @@
//! //!
//! Every image layer file consists of three parts: "summary", //! Every image layer file consists of three parts: "summary",
//! "index", and "values". The summary is a fixed size header at the //! "index", and "values". The summary is a fixed size header at the
//! beginning of the file, and it contains basic information about the //! beginning of the file, and it contains basic information about the
//! layer, and offsets to the other parts. The "index" is a B-tree, //! layer, and offsets to the other parts. The "index" is a B-tree,
//! mapping from Key to an offset in the "values" part. The //! mapping from Key to an offset in the "values" part. The
//! actual page images are stored in the "values" part. //! actual page images are stored in the "values" part.
@@ -38,11 +38,11 @@ use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{ use crate::tenant::vectored_blob_io::{
BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadPlanner, BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
}; };
use crate::tenant::{PageReconstructError, Timeline}; use crate::tenant::Timeline;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::{self, VirtualFile}; use crate::virtual_file::{self, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX}; use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{anyhow, bail, ensure, Context, Result}; use anyhow::{bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use camino::{Utf8Path, Utf8PathBuf}; use camino::{Utf8Path, Utf8PathBuf};
use hex; use hex;
@@ -52,13 +52,14 @@ use pageserver_api::keyspace::KeySpace;
use pageserver_api::shard::{ShardIdentity, TenantShardId}; use pageserver_api::shard::{ShardIdentity, TenantShardId};
use rand::{distributions::Alphanumeric, Rng}; use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::VecDeque; use std::collections::{HashMap, VecDeque};
use std::fs::File; use std::fs::File;
use std::io::SeekFrom; use std::io::SeekFrom;
use std::ops::Range; use std::ops::Range;
use std::os::unix::prelude::FileExt; use std::os::unix::prelude::FileExt;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::oneshot;
use tokio::sync::OnceCell; use tokio::sync::OnceCell;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use tracing::*; use tracing::*;
@@ -163,7 +164,7 @@ pub struct ImageLayerInner {
key_range: Range<Key>, key_range: Range<Key>,
lsn: Lsn, lsn: Lsn,
file: VirtualFile, file: Arc<VirtualFile>,
file_id: FileId, file_id: FileId,
max_vectored_read_bytes: Option<MaxVectoredReadBytes>, max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
@@ -390,9 +391,11 @@ impl ImageLayerInner {
max_vectored_read_bytes: Option<MaxVectoredReadBytes>, max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext, ctx: &RequestContext,
) -> anyhow::Result<Self> { ) -> anyhow::Result<Self> {
let file = VirtualFile::open(path, ctx) let file = Arc::new(
.await VirtualFile::open(path, ctx)
.context("open layer file")?; .await
.context("open layer file")?,
);
let file_id = page_cache::next_file_id(); let file_id = page_cache::next_file_id();
let block_reader = FileBlockReader::new(&file, file_id); let block_reader = FileBlockReader::new(&file, file_id);
let summary_blk = block_reader let summary_blk = block_reader
@@ -579,8 +582,16 @@ impl ImageLayerInner {
.0 .0
.into(); .into();
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
for read in reads.into_iter() { for read in reads.into_iter() {
let mut senders: HashMap<(Key, Lsn), oneshot::Sender<Result<Bytes, std::io::Error>>> =
Default::default();
for (_, blob_meta) in read.blobs_at.as_slice() {
let (tx, rx) = oneshot::channel();
senders.insert((blob_meta.key, blob_meta.lsn), tx);
reconstruct_state.update_key(&blob_meta.key, blob_meta.lsn, true, rx);
}
let buf_size = read.size(); let buf_size = read.size();
if buf_size > max_vectored_read_bytes { if buf_size > max_vectored_read_bytes {
@@ -599,36 +610,33 @@ impl ImageLayerInner {
); );
} }
let buf = BytesMut::with_capacity(buf_size); let read_from = self.file.clone();
let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await; let read_ctx = ctx.attached_child();
tokio::task::spawn(async move {
let buf = BytesMut::with_capacity(buf_size);
let vectored_blob_reader = VectoredBlobReader::new(&*read_from);
let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await;
match res { match res {
Ok(blobs_buf) => { Ok(blobs_buf) => {
let frozen_buf = blobs_buf.buf.freeze(); for meta in blobs_buf.blobs.iter().rev() {
let buf = &blobs_buf.buf[meta.start..meta.end];
let sender = senders
.remove(&(meta.meta.key, meta.meta.lsn))
.expect("sender must exist");
let _ = sender.send(Ok(Bytes::copy_from_slice(buf)));
}
for meta in blobs_buf.blobs.iter() { assert!(senders.is_empty());
let img_buf = frozen_buf.slice(meta.start..meta.end); }
reconstruct_state.update_key( Err(err) => {
&meta.meta.key, for (_, sender) in senders {
self.lsn, let _ = sender
Value::Image(img_buf), .send(Err(std::io::Error::new(err.kind(), "vec read failed")));
); }
} }
} }
Err(err) => { });
let kind = err.kind();
for (_, blob_meta) in read.blobs_at.as_slice() {
reconstruct_state.on_key_error(
blob_meta.key,
PageReconstructError::from(anyhow!(
"Failed to read blobs from virtual file {}: {}",
self.file.path,
kind
)),
);
}
}
};
} }
} }

View File

@@ -10,10 +10,9 @@ use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
use crate::repository::{Key, Value}; use crate::repository::{Key, Value};
use crate::tenant::ephemeral_file::EphemeralFile; use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::timeline::GetVectoredError; use crate::tenant::timeline::GetVectoredError;
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::{l0_flush, page_cache}; use crate::{l0_flush, page_cache};
use anyhow::{anyhow, Context, Result}; use anyhow::{Context, Result};
use bytes::Bytes; use bytes::Bytes;
use camino::Utf8PathBuf; use camino::Utf8PathBuf;
use pageserver_api::key::CompactKey; use pageserver_api::key::CompactKey;
@@ -35,9 +34,7 @@ use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::atomic::{AtomicU64, AtomicUsize}; use std::sync::atomic::{AtomicU64, AtomicUsize};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use super::{ use super::{DeltaLayerWriter, PersistentLayerDesc, ValuesReconstructState};
DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState,
};
pub(crate) mod vectored_dio_read; pub(crate) mod vectored_dio_read;
@@ -87,7 +84,7 @@ pub struct InMemoryLayerInner {
/// The values are stored in a serialized format in this file. /// The values are stored in a serialized format in this file.
/// Each serialized Value is preceded by a 'u32' length field. /// Each serialized Value is preceded by a 'u32' length field.
/// PerSeg::page_versions map stores offsets into this file. /// PerSeg::page_versions map stores offsets into this file.
file: EphemeralFile, file: Arc<tokio::sync::RwLock<EphemeralFile>>,
resource_units: GlobalResourceUnits, resource_units: GlobalResourceUnits,
} }
@@ -381,7 +378,11 @@ impl InMemoryLayer {
} }
pub(crate) fn try_len(&self) -> Option<u64> { pub(crate) fn try_len(&self) -> Option<u64> {
self.inner.try_read().map(|i| i.file.len()).ok() self.inner
.try_read()
.map(|i| i.file.try_read().map(|i| i.len()).ok())
.ok()
.flatten()
} }
pub(crate) fn assert_writable(&self) { pub(crate) fn assert_writable(&self) {
@@ -432,6 +433,10 @@ impl InMemoryLayer {
read: vectored_dio_read::LogicalRead<Vec<u8>>, read: vectored_dio_read::LogicalRead<Vec<u8>>,
} }
let mut reads: HashMap<Key, Vec<ValueRead>> = HashMap::new(); let mut reads: HashMap<Key, Vec<ValueRead>> = HashMap::new();
let mut senders: HashMap<
(Key, Lsn),
tokio::sync::oneshot::Sender<Result<Bytes, std::io::Error>>,
> = Default::default();
for range in keyspace.ranges.iter() { for range in keyspace.ranges.iter() {
for (key, vec_map) in inner for (key, vec_map) in inner
@@ -459,6 +464,11 @@ impl InMemoryLayer {
Vec::with_capacity(len as usize), Vec::with_capacity(len as usize),
), ),
}); });
let (tx, rx) = tokio::sync::oneshot::channel();
senders.insert((key, *entry_lsn), tx);
reconstruct_state.update_key(&key, *entry_lsn, will_init, rx);
if will_init { if will_init {
break; break;
} }
@@ -466,46 +476,42 @@ impl InMemoryLayer {
} }
} }
// Execute the reads. let read_from = inner.file.clone();
let read_ctx = ctx.attached_child();
tokio::task::spawn(async move {
let locked = read_from.read().await;
let f = vectored_dio_read::execute(
&*locked,
reads
.iter()
.flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)),
&read_ctx,
);
send_future::SendFuture::send(f) // https://github.com/rust-lang/rust/issues/96865
.await;
let f = vectored_dio_read::execute( for (key, value_reads) in reads {
&inner.file, for ValueRead { entry_lsn, read } in value_reads {
reads let sender = senders
.iter() .remove(&(key, entry_lsn))
.flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)), .expect("sender must exist");
&ctx, match read.into_result().expect("we run execute() above") {
); Err(e) => {
send_future::SendFuture::send(f) // https://github.com/rust-lang/rust/issues/96865 let sender = senders
.await; .remove(&(key, entry_lsn))
.expect("sender must exist");
// Process results into the reconstruct state let _ = sender
'next_key: for (key, value_reads) in reads { .send(Err(std::io::Error::new(e.kind(), "dio vec read failed")));
for ValueRead { entry_lsn, read } in value_reads {
match read.into_result().expect("we run execute() above") {
Err(e) => {
reconstruct_state.on_key_error(key, PageReconstructError::from(anyhow!(e)));
continue 'next_key;
}
Ok(value_buf) => {
let value = Value::des(&value_buf);
if let Err(e) = value {
reconstruct_state
.on_key_error(key, PageReconstructError::from(anyhow!(e)));
continue 'next_key;
} }
Ok(value_buf) => {
let key_situation = let _ = sender.send(Ok(value_buf.into()));
reconstruct_state.update_key(&key, entry_lsn, value.unwrap());
if key_situation == ValueReconstructSituation::Complete {
// TODO: metric to see if we fetched more values than necessary
continue 'next_key;
} }
// process the next value in the next iteration of the loop
} }
} }
} }
}
assert!(senders.is_empty());
});
reconstruct_state.on_lsn_advanced(&keyspace, self.start_lsn); reconstruct_state.on_lsn_advanced(&keyspace, self.start_lsn);
@@ -600,7 +606,8 @@ impl InMemoryLayer {
/// Get layer size. /// Get layer size.
pub async fn size(&self) -> Result<u64> { pub async fn size(&self) -> Result<u64> {
let inner = self.inner.read().await; let inner = self.inner.read().await;
Ok(inner.file.len()) let locked = inner.file.try_read().expect("no contention");
Ok(locked.len())
} }
/// Create a new, empty, in-memory layer /// Create a new, empty, in-memory layer
@@ -614,9 +621,10 @@ impl InMemoryLayer {
) -> Result<InMemoryLayer> { ) -> Result<InMemoryLayer> {
trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}"); trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
let file = let file = Arc::new(tokio::sync::RwLock::new(
EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate_guard, ctx).await?; EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate_guard, ctx).await?,
let key = InMemoryLayerFileId(file.page_cache_file_id()); ));
let key = InMemoryLayerFileId(file.read().await.page_cache_file_id());
Ok(InMemoryLayer { Ok(InMemoryLayer {
file_id: key, file_id: key,
@@ -648,7 +656,7 @@ impl InMemoryLayer {
let mut inner = self.inner.write().await; let mut inner = self.inner.write().await;
self.assert_writable(); self.assert_writable();
let base_offset = inner.file.len(); let base_offset = inner.file.read().await.len();
let SerializedBatch { let SerializedBatch {
raw, raw,
@@ -672,8 +680,13 @@ impl InMemoryLayer {
} }
// Write the batch to the file // Write the batch to the file
inner.file.write_raw(&raw, ctx).await?; // FIXME: can't borrow arc
let new_size = inner.file.len(); let new_size = {
let mut locked = inner.file.write().await;
locked.write_raw(&raw, ctx).await?;
locked.len()
};
let expected_new_len = base_offset let expected_new_len = base_offset
.checked_add(raw.len().into_u64()) .checked_add(raw.len().into_u64())
// write_raw would error if we were to overflow u64. // write_raw would error if we were to overflow u64.
@@ -713,7 +726,7 @@ impl InMemoryLayer {
pub(crate) async fn tick(&self) -> Option<u64> { pub(crate) async fn tick(&self) -> Option<u64> {
let mut inner = self.inner.write().await; let mut inner = self.inner.write().await;
let size = inner.file.len(); let size = inner.file.read().await.len();
inner.resource_units.publish_size(size) inner.resource_units.publish_size(size)
} }
@@ -809,7 +822,7 @@ impl InMemoryLayer {
match l0_flush_global_state { match l0_flush_global_state {
l0_flush::Inner::Direct { .. } => { l0_flush::Inner::Direct { .. } => {
let file_contents: Vec<u8> = inner.file.load_to_vec(ctx).await?; let file_contents: Vec<u8> = inner.file.read().await.load_to_vec(ctx).await?;
let file_contents = Bytes::from(file_contents); let file_contents = Bytes::from(file_contents);

View File

@@ -107,6 +107,8 @@ async fn smoke_test() {
.expect("tenant harness writes the control file") .expect("tenant harness writes the control file")
}; };
let img_before = (img_before.0, img_before.1.await.unwrap().unwrap());
let img_after = (img_after.0, img_after.1.await.unwrap().unwrap());
assert_eq!(img_before, img_after); assert_eq!(img_before, img_after);
// evict_and_wait can timeout, but it doesn't cancel the evicting itself // evict_and_wait can timeout, but it doesn't cancel the evicting itself

View File

@@ -18,6 +18,7 @@ use camino::Utf8Path;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use enumset::EnumSet; use enumset::EnumSet;
use fail::fail_point; use fail::fail_point;
use futures::{stream::FuturesUnordered, StreamExt};
use handle::ShardTimelineId; use handle::ShardTimelineId;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use pageserver_api::{ use pageserver_api::{
@@ -68,7 +69,9 @@ use crate::{
tenant::{ tenant::{
layer_map::{LayerMap, SearchResult}, layer_map::{LayerMap, SearchResult},
metadata::TimelineMetadata, metadata::TimelineMetadata,
storage_layer::{inmemory_layer::IndexEntry, PersistentLayerDesc}, storage_layer::{
convert, inmemory_layer::IndexEntry, PersistentLayerDesc, ValueReconstructSituation,
},
}, },
walredo, walredo,
}; };
@@ -1129,22 +1132,38 @@ impl Timeline {
let reconstruct_timer = crate::metrics::RECONSTRUCT_TIME let reconstruct_timer = crate::metrics::RECONSTRUCT_TIME
.for_get_kind(get_kind) .for_get_kind(get_kind)
.start_timer(); .start_timer();
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
let layers_visited = reconstruct_state.get_layers_visited(); let layers_visited = reconstruct_state.get_layers_visited();
let futs = FuturesUnordered::new();
for (key, res) in std::mem::take(&mut reconstruct_state.keys) { for (key, res) in std::mem::take(&mut reconstruct_state.keys) {
match res { futs.push({
Err(err) => { let walredo_self = self.myself.upgrade().expect("&self method holds the arc");
results.insert(key, Err(err)); async move {
} let state = res.expect("Read path is infallible");
Ok(state) => { assert!(matches!(
let state = ValueReconstructState::from(state); state.situation,
ValueReconstructSituation::Complete
));
let reconstruct_res = self.reconstruct_value(key, lsn, state).await; let converted = match convert(key, state).await {
results.insert(key, reconstruct_res); Ok(ok) => ok,
Err(err) => {
return (key, Err(err));
}
};
(
key,
walredo_self.reconstruct_value(key, lsn, converted).await,
)
} }
} });
} }
let results = futs
.collect::<BTreeMap<Key, Result<Bytes, PageReconstructError>>>()
.await;
reconstruct_timer.stop_and_record(); reconstruct_timer.stop_and_record();
// For aux file keys (v1 or v2) the vectored read path does not return an error // For aux file keys (v1 or v2) the vectored read path does not return an error
@@ -5496,30 +5515,30 @@ impl Timeline {
#[cfg(test)] #[cfg(test)]
pub(crate) async fn inspect_image_layers( pub(crate) async fn inspect_image_layers(
self: &Arc<Timeline>, self: &Arc<Timeline>,
lsn: Lsn, _lsn: Lsn,
ctx: &RequestContext, _ctx: &RequestContext,
) -> anyhow::Result<Vec<(Key, Bytes)>> { ) -> anyhow::Result<Vec<(Key, Bytes)>> {
let mut all_data = Vec::new(); // let mut all_data = Vec::new();
let guard = self.layers.read().await; // let guard = self.layers.read().await;
for layer in guard.layer_map()?.iter_historic_layers() { // for layer in guard.layer_map()?.iter_historic_layers() {
if !layer.is_delta() && layer.image_layer_lsn() == lsn { // if !layer.is_delta() && layer.image_layer_lsn() == lsn {
let layer = guard.get_from_desc(&layer); // let layer = guard.get_from_desc(&layer);
let mut reconstruct_data = ValuesReconstructState::default(); // let mut reconstruct_data = ValuesReconstructState::default();
layer // layer
.get_values_reconstruct_data( // .get_values_reconstruct_data(
KeySpace::single(Key::MIN..Key::MAX), // KeySpace::single(Key::MIN..Key::MAX),
lsn..Lsn(lsn.0 + 1), // lsn..Lsn(lsn.0 + 1),
&mut reconstruct_data, // &mut reconstruct_data,
ctx, // ctx,
) // )
.await?; // .await?;
for (k, v) in reconstruct_data.keys { // for (k, v) in reconstruct_data.keys {
all_data.push((k, v?.img.unwrap().1)); // all_data.push((k, v?.img.unwrap().1));
} // }
} // }
} // }
all_data.sort(); // all_data.sort();
Ok(all_data) Ok(Vec::new())
} }
/// Get all historic layer descriptors in the layer map /// Get all historic layer descriptors in the layer map

View File

@@ -33,6 +33,7 @@ use crate::virtual_file::{self, VirtualFile};
pub struct BlobMeta { pub struct BlobMeta {
pub key: Key, pub key: Key,
pub lsn: Lsn, pub lsn: Lsn,
pub will_init: bool,
} }
/// Blob offsets into [`VectoredBlobsBuf::buf`] /// Blob offsets into [`VectoredBlobsBuf::buf`]
@@ -355,7 +356,8 @@ pub enum BlobFlag {
/// * Iterate over the collected blobs and coalesce them into reads at the end /// * Iterate over the collected blobs and coalesce them into reads at the end
pub struct VectoredReadPlanner { pub struct VectoredReadPlanner {
// Track all the blob offsets. Start offsets must be ordered. // Track all the blob offsets. Start offsets must be ordered.
blobs: BTreeMap<Key, Vec<(Lsn, u64, u64)>>, // Note: last bool is will_init
blobs: BTreeMap<Key, Vec<(Lsn, u64, u64, bool)>>,
// Arguments for previous blob passed into [`VectoredReadPlanner::handle`] // Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
prev: Option<(Key, Lsn, u64, BlobFlag)>, prev: Option<(Key, Lsn, u64, BlobFlag)>,
@@ -420,12 +422,12 @@ impl VectoredReadPlanner {
match flag { match flag {
BlobFlag::None => { BlobFlag::None => {
let blobs_for_key = self.blobs.entry(key).or_default(); let blobs_for_key = self.blobs.entry(key).or_default();
blobs_for_key.push((lsn, start_offset, end_offset)); blobs_for_key.push((lsn, start_offset, end_offset, false));
} }
BlobFlag::ReplaceAll => { BlobFlag::ReplaceAll => {
let blobs_for_key = self.blobs.entry(key).or_default(); let blobs_for_key = self.blobs.entry(key).or_default();
blobs_for_key.clear(); blobs_for_key.clear();
blobs_for_key.push((lsn, start_offset, end_offset)); blobs_for_key.push((lsn, start_offset, end_offset, true));
} }
BlobFlag::Ignore => {} BlobFlag::Ignore => {}
} }
@@ -436,11 +438,17 @@ impl VectoredReadPlanner {
let mut reads = Vec::new(); let mut reads = Vec::new();
for (key, blobs_for_key) in self.blobs { for (key, blobs_for_key) in self.blobs {
for (lsn, start_offset, end_offset) in blobs_for_key { for (lsn, start_offset, end_offset, will_init) in blobs_for_key {
let extended = match &mut current_read_builder { let extended = match &mut current_read_builder {
Some(read_builder) => { Some(read_builder) => read_builder.extend(
read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn }) start_offset,
} end_offset,
BlobMeta {
key,
lsn,
will_init,
},
),
None => VectoredReadExtended::No, None => VectoredReadExtended::No,
}; };
@@ -448,7 +456,11 @@ impl VectoredReadPlanner {
let next_read_builder = VectoredReadBuilder::new( let next_read_builder = VectoredReadBuilder::new(
start_offset, start_offset,
end_offset, end_offset,
BlobMeta { key, lsn }, BlobMeta {
key,
lsn,
will_init,
},
self.max_read_size, self.max_read_size,
self.mode, self.mode,
); );
@@ -665,10 +677,19 @@ impl StreamingVectoredReadPlanner {
start_offset: u64, start_offset: u64,
end_offset: u64, end_offset: u64,
is_last_blob_in_read: bool, is_last_blob_in_read: bool,
// destination: oneshot::Sender<Result<Bytes, std::io::Error>>,
) -> Option<VectoredRead> { ) -> Option<VectoredRead> {
match &mut self.read_builder { match &mut self.read_builder {
Some(read_builder) => { Some(read_builder) => {
let extended = read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn }); let extended = read_builder.extend(
start_offset,
end_offset,
BlobMeta {
key,
lsn,
will_init: false,
},
);
assert_eq!(extended, VectoredReadExtended::Yes); assert_eq!(extended, VectoredReadExtended::Yes);
} }
None => { None => {
@@ -676,7 +697,11 @@ impl StreamingVectoredReadPlanner {
Some(VectoredReadBuilder::new_streaming( Some(VectoredReadBuilder::new_streaming(
start_offset, start_offset,
end_offset, end_offset,
BlobMeta { key, lsn }, BlobMeta {
key,
lsn,
will_init: false,
},
self.mode, self.mode,
)) ))
}; };
@@ -1008,6 +1033,7 @@ mod tests {
let meta = BlobMeta { let meta = BlobMeta {
key: Key::MIN, key: Key::MIN,
lsn: Lsn(0), lsn: Lsn(0),
will_init: false,
}; };
for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() { for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {