Compare commits

...

46 Commits

Author SHA1 Message Date
Vlad Lazar
3596ed43eb Investigation 2024-09-16 15:41:11 +01:00
Vlad Lazar
2b35f11fac More info 2024-09-16 13:01:00 +01:00
Vlad Lazar
c390bd1edd Assert on mismatched lsn ranges 2024-09-16 12:42:09 +01:00
Christian Schwarz
c108872a36 hypothesis 2024-09-15 15:19:04 +01:00
Christian Schwarz
588e6dd1ef include lsn in error msg 2024-09-15 14:01:04 +01:00
Christian Schwarz
2fafe47e09 better error message 2024-09-15 13:45:10 +01:00
Christian Schwarz
e6e72e8b68 log by invocation 2024-09-15 13:38:07 +01:00
Christian Schwarz
2a0695e01a log always, but at debug 2024-09-15 13:28:47 +01:00
Christian Schwarz
af6c168265 debug-log the buggy key 2024-09-15 13:12:25 +01:00
Christian Schwarz
1b3209497f image_layer: don't do .rev(), it's confusing and unlike what we did before 2024-09-15 12:58:13 +01:00
Christian Schwarz
9f97523d0d dump full missing keyspace 2024-09-15 12:40:41 +01:00
Christian Schwarz
33196c90bc todo! 2024-09-15 12:32:17 +01:00
Christian Schwarz
c72c5df922 make sure get_cached_lsn returns what it returned before 2024-09-15 12:20:58 +01:00
Christian Schwarz
4c7599e8df Revert "address the "This is silly" comment"
This reverts commit c736f9d6ef.
2024-09-15 12:18:26 +01:00
Christian Schwarz
4b2b0a24da Revert "WIP better typed"
This reverts commit ef5a95010b.
2024-09-15 12:18:21 +01:00
Christian Schwarz
ef5a95010b WIP better typed 2024-09-15 12:18:15 +01:00
Christian Schwarz
c736f9d6ef address the "This is silly" comment 2024-09-14 15:38:46 +00:00
Christian Schwarz
adc798b59e explicitly pass will_init 2024-09-14 15:15:48 +00:00
Christian Schwarz
f0668a7a4d cargo fmt 2024-09-14 15:15:48 +00:00
Christian Schwarz
6d73642a93 hyptohesis of what's wrong 2024-09-14 14:26:52 +00:00
Christian Schwarz
9012a18fa1 Revert "WIP"
This reverts commit 709a6ad29b.
2024-09-14 14:18:39 +00:00
Christian Schwarz
a6a7550bb4 add some debugging that didn't lead anywhere 2024-09-14 14:18:02 +00:00
Christian Schwarz
10556f25df fix double .remove() on InMemoryLayer read error (not the bug) 2024-09-14 13:41:36 +00:00
Christian Schwarz
f54cf567ff implement io concurrency control (serial / parallel). bug still happens 2024-09-14 13:22:28 +00:00
Christian Schwarz
4303914681 Revert "rule out empty keyspace bug" (it wasn't the bug)
This reverts commit 539225d792.
2024-09-14 13:02:20 +00:00
Christian Schwarz
539225d792 rule out empty keyspace bug 2024-09-14 13:02:06 +00:00
Christian Schwarz
c118736c9c draw-timeline-dir: ability to draw line 2024-09-14 12:48:23 +00:00
Christian Schwarz
3f85246a42 4096 queuedepth in pagebench get-page-latest-lsn 2024-09-14 12:06:57 +00:00
Christian Schwarz
709a6ad29b WIP 2024-09-14 12:05:42 +00:00
Vlad Lazar
3640824553 fixup: incorrect assumption on img layer need 2024-09-12 23:04:58 +01:00
Vlad Lazar
fea1f34f6a fixup: serialize again in image layer 2024-09-12 23:04:10 +01:00
Vlad Lazar
5d40d1ccdd fixup: order of waiters in delta layer 2024-09-12 23:04:01 +01:00
Vlad Lazar
b2cb10590e fixup: deserialize shenanigans 2024-09-12 20:00:06 +01:00
Vlad Lazar
2923fd2a5b fixup: remove stale import 2024-09-12 19:25:46 +01:00
Vlad Lazar
2a5336b9ab fixup image deserialization 2024-09-12 19:24:41 +01:00
Christian Schwarz
6f20726610 Merge remote-tracking branch 'origin/hackaneon/lisbon24/superscalar-page_service--problame/evaluate-debouncer' into hackaneon/lisbon24/superscalar-page_service 2024-09-12 17:47:16 +00:00
Christian Schwarz
29f741e1e9 debounce: actually issue vectored get 2024-09-12 17:46:30 +00:00
Vlad Lazar
2b37a40079 Materialize future ios 2024-09-12 18:25:17 +01:00
Vlad Lazar
af2b65a2fb Rework issuing of IOs on read path 2024-09-12 16:42:35 +01:00
Christian Schwarz
5d194c7824 debounce: bounce if shard or effective request_lsn differ 2024-09-12 14:20:32 +00:00
Christian Schwarz
ac2702afd3 deboucner: move decoding into debounce loop 2024-09-12 10:58:09 +00:00
Christian Schwarz
88fd46d795 sketch interface 2024-09-12 11:35:00 +01:00
Christian Schwarz
2d6763882e pagebench: fake queue depth of 10 2024-09-12 11:35:00 +01:00
Christian Schwarz
c0c23cde72 debouncer 2024-09-12 11:35:00 +01:00
Christian Schwarz
942bc9544b fixup 2024-09-11 20:04:39 +00:00
Christian Schwarz
02b7cdb305 HACK: instrument page_service to count nonblocking consecutive getpage requests 2024-09-11 19:25:19 +01:00
16 changed files with 1182 additions and 977 deletions

View File

@@ -1,6 +1,6 @@
use std::pin::Pin;
use std::{pin::Pin, sync::Arc};
use futures::SinkExt;
use futures::{SinkExt, StreamExt};
use pageserver_api::{
models::{
PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
@@ -10,7 +10,6 @@ use pageserver_api::{
};
use tokio::task::JoinHandle;
use tokio_postgres::CopyOutStream;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use utils::{
id::{TenantId, TimelineId},
@@ -136,18 +135,68 @@ impl PagestreamClient {
drop(copy_both);
}
pub async fn getpage(
&mut self,
req: PagestreamGetPageRequest,
) -> anyhow::Result<PagestreamGetPageResponse> {
pub fn split(self) -> (PagestreamTx, PagestreamRx) {
let Self {
copy_both,
cancel_on_client_drop,
conn_task,
} = self;
let keep_client_alive = KeepClientAlive {
client: conn_task,
cancel_on_client_drop: cancel_on_client_drop.unwrap(),
};
let keep_client_alive = Arc::new(keep_client_alive);
let (sink, stream): (
futures::stream::SplitSink<
Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>,
bytes::Bytes,
>,
futures::stream::SplitStream<Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>>,
) = copy_both.split();
(
PagestreamTx {
sink,
keep_client_alive: keep_client_alive.clone(),
},
PagestreamRx {
stream,
keep_client_alive,
},
)
}
}
struct KeepClientAlive {
client: JoinHandle<()>,
cancel_on_client_drop: tokio_util::sync::DropGuard,
}
pub struct PagestreamTx {
sink: futures::stream::SplitSink<
Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>,
bytes::Bytes,
>,
keep_client_alive: Arc<KeepClientAlive>,
}
pub struct PagestreamRx {
stream: futures::stream::SplitStream<Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>>,
keep_client_alive: Arc<KeepClientAlive>,
}
impl PagestreamTx {
pub async fn send_getpage(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
let req = PagestreamFeMessage::GetPage(req);
let req: bytes::Bytes = req.serialize();
// let mut req = tokio_util::io::ReaderStream::new(&req);
let mut req = tokio_stream::once(Ok(req));
let mut req = tokio_stream::once(Ok(req.clone()));
self.sink.send_all(&mut req).await?;
Ok(())
}
}
self.copy_both.send_all(&mut req).await?;
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
impl PagestreamRx {
pub async fn recv_getpage(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
let next: Option<Result<bytes::Bytes, _>> = self.stream.next().await;
let next: bytes::Bytes = next.unwrap()?;
let msg = PagestreamBeMessage::deserialize(next)?;

View File

@@ -108,6 +108,7 @@ fn parse_filename(name: &str) -> (Range<Key>, Range<Lsn>) {
enum LineKind {
GcCutoff,
Branch,
KeyVertical,
}
impl From<LineKind> for Fill {
@@ -115,6 +116,7 @@ impl From<LineKind> for Fill {
match value {
LineKind::GcCutoff => Fill::Color(rgb(255, 0, 0)),
LineKind::Branch => Fill::Color(rgb(0, 255, 0)),
LineKind::KeyVertical => Fill::Color(rgb(0, 0, 255)),
}
}
}
@@ -126,6 +128,7 @@ impl FromStr for LineKind {
Ok(match s {
"gc_cutoff" => LineKind::GcCutoff,
"branch" => LineKind::Branch,
"key" => LineKind::KeyVertical,
_ => anyhow::bail!("unsupported linekind: {s}"),
})
}
@@ -142,25 +145,31 @@ pub fn main() -> Result<()> {
let stdin = io::stdin();
let mut lines: Vec<(Lsn, LineKind)> = vec![];
let mut vertical_lines: Vec<(Key, LineKind)> = vec![];
for (lineno, line) in stdin.lock().lines().enumerate() {
let lineno = lineno + 1;
let line = line.unwrap();
if let Some((kind, lsn)) = line.split_once(':') {
let (kind, lsn) = LineKind::from_str(kind)
.context("parse kind")
.and_then(|kind| {
if lsn.contains('/') {
Lsn::from_str(lsn)
} else {
Lsn::from_hex(lsn)
if let Some((kind, what)) = line.split_once(':') {
(|| {
match LineKind::from_str(kind).context("parse kind")? {
kind @ LineKind::Branch | kind @ LineKind::GcCutoff => {
let lsn = if what.contains('/') {
Lsn::from_str(what)?
} else {
Lsn::from_hex(what)?
};
lines.push((lsn, kind));
}
.map(|lsn| (kind, lsn))
.context("parse lsn")
})
.with_context(|| format!("parse {line:?} on {lineno}"))?;
lines.push((lsn, kind));
kind @ LineKind::KeyVertical => {
let key = Key::from_hex(what).context("parse key")?;
vertical_lines.push((key, kind));
}
}
anyhow::Ok(())
})()
.with_context(|| format!("parse {line:?} on {lineno}"))?;
continue;
}
let line = PathBuf::from_str(&line).unwrap();
@@ -175,7 +184,7 @@ pub fn main() -> Result<()> {
}
// Collect all coordinates
let mut keys: Vec<Key> = Vec::with_capacity(files.len());
let mut keys: Vec<Key> = Vec::with_capacity(files.len() + vertical_lines.len());
let mut lsns: Vec<Lsn> = Vec::with_capacity(files.len() + lines.len());
for Layer {
@@ -192,6 +201,8 @@ pub fn main() -> Result<()> {
lsns.extend(lines.iter().map(|(lsn, _)| *lsn));
keys.extend(vertical_lines.iter().map(|(key, _)| *key));
// Analyze
let key_map = build_coordinate_compression_map(keys);
let lsn_map = build_coordinate_compression_map(lsns);
@@ -283,6 +294,25 @@ pub fn main() -> Result<()> {
);
}
for (key, kind) in vertical_lines {
let key = *key_map.get(&key).unwrap();
let stretch = 2.0;
let xmargin = 0.05;
let ymargin = 0.05;
let lsn_diff = 0.3;
let lsn_offset = -lsn_diff / 2.0;
println!(
"{}",
rectangle(
5.0 + key as f32,
0.0,
(key_map.len() + 10) as f32,
lsn_map.len() as f32,
)
.fill(kind)
);
}
println!("{}", EndSvg);
eprintln!("num_images: {}", num_images);

View File

@@ -13,7 +13,7 @@ use rand::prelude::*;
use tokio::task::JoinSet;
use tracing::info;
use std::collections::HashSet;
use std::collections::{HashSet, VecDeque};
use std::future::Future;
use std::num::NonZeroUsize;
use std::pin::Pin;
@@ -295,64 +295,58 @@ async fn main_impl(
.await
.unwrap();
let (mut pagestream_tx, mut pagestream_rx) = client.split();
start_work_barrier.wait().await;
let client_start = Instant::now();
let mut ticks_processed = 0;
while !cancel.is_cancelled() {
// Detect if a request took longer than the RPS rate
if let Some(period) = &rps_period {
let periods_passed_until_now =
usize::try_from(client_start.elapsed().as_micros() / period.as_micros())
let (rq_tx, mut rq_rx) = tokio::sync::mpsc::channel(4096);
let sender = tokio::spawn(async move {
while !cancel.is_cancelled() {
let start = Instant::now();
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
assert!(key.is_rel_block_key());
let (rel_tag, block_no) = key
.to_rel_block()
.expect("we filter non-rel-block keys out above");
PagestreamGetPageRequest {
request_lsn: if rng.gen_bool(args.req_latest_probability) {
Lsn::MAX
} else {
r.timeline_lsn
},
not_modified_since: r.timeline_lsn,
rel: rel_tag,
blkno: block_no,
}
};
pagestream_tx.send_getpage(req).await.unwrap();
rq_tx.send(start).await.unwrap();
}
});
let receiver = tokio::spawn(async move {
while let Some(start) = rq_rx.recv().await {
let response = pagestream_rx.recv_getpage().await.unwrap();
let end = Instant::now();
live_stats.request_done();
STATS.with(|stats| {
stats
.borrow()
.lock()
.unwrap()
.observe(end.duration_since(start))
.unwrap();
if periods_passed_until_now > ticks_processed {
live_stats.missed((periods_passed_until_now - ticks_processed) as u64);
}
ticks_processed = periods_passed_until_now;
});
}
});
let start = Instant::now();
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
assert!(key.is_rel_block_key());
let (rel_tag, block_no) = key
.to_rel_block()
.expect("we filter non-rel-block keys out above");
PagestreamGetPageRequest {
request_lsn: if rng.gen_bool(args.req_latest_probability) {
Lsn::MAX
} else {
r.timeline_lsn
},
not_modified_since: r.timeline_lsn,
rel: rel_tag,
blkno: block_no,
}
};
client.getpage(req).await.unwrap();
let end = Instant::now();
live_stats.request_done();
ticks_processed += 1;
STATS.with(|stats| {
stats
.borrow()
.lock()
.unwrap()
.observe(end.duration_since(start))
.unwrap();
});
if let Some(period) = &rps_period {
let next_at = client_start
+ Duration::from_micros(
(ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(),
);
tokio::time::sleep_until(next_at.into()).await;
}
}
sender.await.unwrap();
receiver.await.unwrap();
})
};

View File

@@ -1185,6 +1185,7 @@ struct GlobalAndPerTimelineHistogramTimer<'a, 'c> {
ctx: &'c RequestContext,
start: std::time::Instant,
op: SmgrQueryType,
count: usize,
}
impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> {
@@ -1212,9 +1213,11 @@ impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> {
elapsed
}
};
self.global_metric.observe(ex_throttled.as_secs_f64());
if let Some(timeline_metric) = self.timeline_metric {
timeline_metric.observe(ex_throttled.as_secs_f64());
for _ in 0..self.count {
self.global_metric.observe(ex_throttled.as_secs_f64());
if let Some(timeline_metric) = self.timeline_metric {
timeline_metric.observe(ex_throttled.as_secs_f64());
}
}
}
}
@@ -1343,6 +1346,14 @@ impl SmgrQueryTimePerTimeline {
&'a self,
op: SmgrQueryType,
ctx: &'c RequestContext,
) -> Option<impl Drop + '_> {
self.start_timer_many(op, 1, ctx)
}
pub(crate) fn start_timer_many<'c: 'a, 'a>(
&'a self,
op: SmgrQueryType,
count: usize,
ctx: &'c RequestContext,
) -> Option<impl Drop + '_> {
let global_metric = &self.global_metrics[op as usize];
let start = Instant::now();
@@ -1376,6 +1387,7 @@ impl SmgrQueryTimePerTimeline {
ctx,
start,
op,
count,
})
}
}
@@ -3170,6 +3182,16 @@ static TOKIO_EXECUTOR_THREAD_COUNT: Lazy<UIntGaugeVec> = Lazy::new(|| {
.unwrap()
});
pub(crate) static CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM: Lazy<Histogram> =
Lazy::new(|| {
register_histogram!(
"pageserver_consecutive_nonblocking_getpage_requests",
"Number of consecutive nonblocking getpage requests",
(0..=256).map(|x| x as f64).collect::<Vec<f64>>(),
)
.unwrap()
});
pub(crate) fn set_tokio_runtime_setup(setup: &str, num_threads: NonZeroUsize) {
static SERIALIZE: std::sync::Mutex<()> = std::sync::Mutex::new(());
let _guard = SERIALIZE.lock().unwrap();

View File

@@ -5,14 +5,14 @@ use anyhow::Context;
use async_compression::tokio::write::GzipEncoder;
use bytes::Buf;
use futures::FutureExt;
use once_cell::sync::OnceCell;
use pageserver_api::models::TenantState;
use once_cell::sync::{Lazy, OnceCell};
use pageserver_api::models::{self, TenantState};
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest,
PagestreamNblocksResponse, PagestreamProtocolVersion,
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest,
PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse,
PagestreamProtocolVersion,
};
use pageserver_api::shard::TenantShardId;
use postgres_backend::{is_expected_io_error, AuthType, PostgresBackend, QueryError};
@@ -43,7 +43,7 @@ use crate::basebackup;
use crate::basebackup::BasebackupError;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics;
use crate::metrics::{self, CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM};
use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
use crate::pgdatadir_mapping::Version;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
@@ -58,7 +58,7 @@ use crate::tenant::GetTimelineError;
use crate::tenant::PageReconstructError;
use crate::tenant::Timeline;
use pageserver_api::key::rel_block_to_key;
use pageserver_api::reltag::SlruKind;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;
@@ -577,124 +577,326 @@ impl PageServerHandler {
}
}
loop {
// read request bytes (it's exactly 1 PagestreamFeMessage per CopyData)
let msg = tokio::select! {
biased;
_ = self.cancel.cancelled() => {
return Err(QueryError::Shutdown)
}
msg = pgb.read_message() => { msg }
};
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(FeMessage::Terminate) => break,
Some(m) => {
return Err(QueryError::Other(anyhow::anyhow!(
"unexpected message: {m:?} during COPY"
)));
}
None => break, // client disconnected
};
let mut batched = None;
'outer: loop {
enum DebouncedFeMessage {
Exists(models::PagestreamExistsRequest),
Nblocks(models::PagestreamNblocksRequest),
GetPage {
span: Span,
shard: timeline::handle::Handle<TenantManagerTypes>,
effective_request_lsn: Lsn,
pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
},
DbSize(models::PagestreamDbSizeRequest),
GetSlruSegment(models::PagestreamGetSlruSegmentRequest),
RespondError(Span, PageStreamError),
}
let mut debounce: Option<std::time::Instant> = None;
// return or `?` on protocol error
// `break EXPR` to stop batching. The EXPR will be the first message in the next batch.
let next_batched: Option<DebouncedFeMessage> = loop {
static BOUNCE_TIMEOUT: Lazy<Duration> = Lazy::new(|| {
utils::env::var::<humantime::Duration, _>("NEON_PAGESERVER_DEBOUNCE")
.unwrap()
.into()
});
let sleep_fut = if let Some(started_at) = debounce {
futures::future::Either::Left(tokio::time::sleep_until(
(started_at + *BOUNCE_TIMEOUT).into(),
))
} else {
futures::future::Either::Right(futures::future::pending())
};
let msg = tokio::select! {
biased;
_ = self.cancel.cancelled() => {
return Err(QueryError::Shutdown)
}
msg = pgb.read_message() => {
msg
}
_ = sleep_fut => {
assert!(batched.is_some());
break None;
}
};
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(FeMessage::Terminate) => break 'outer,
Some(m) => {
return Err(QueryError::Other(anyhow::anyhow!(
"unexpected message: {m:?} during COPY"
)));
}
None => break 'outer, // client disconnected
};
trace!("query: {copy_data_bytes:?}");
fail::fail_point!("ps::handle-pagerequest-message");
trace!("query: {copy_data_bytes:?}");
fail::fail_point!("ps::handle-pagerequest-message");
// parse request
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
// parse request
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
let this_msg = match neon_fe_msg {
PagestreamFeMessage::Exists(msg) => DebouncedFeMessage::Exists(msg),
PagestreamFeMessage::Nblocks(msg) => DebouncedFeMessage::Nblocks(msg),
PagestreamFeMessage::DbSize(msg) => DebouncedFeMessage::DbSize(msg),
PagestreamFeMessage::GetSlruSegment(msg) => {
DebouncedFeMessage::GetSlruSegment(msg)
}
PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
request_lsn,
not_modified_since,
rel,
blkno,
}) => {
let span = tracing::info_span!("handle_get_page_at_lsn_request_batched", %tenant_id, %timeline_id, shard_id = tracing::field::Empty, req_lsn = %request_lsn, batch_size = tracing::field::Empty, batch_id = tracing::field::Empty);
let key = rel_block_to_key(rel, blkno);
let shard = match self
.timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Page(key))
.instrument(span.clone())
.await
{
Ok(tl) => tl,
Err(GetActiveTimelineError::Tenant(
GetActiveTenantError::NotFound(_),
)) => {
// We already know this tenant exists in general, because we resolved it at
// start of connection. Getting a NotFound here indicates that the shard containing
// the requested page is not present on this node: the client's knowledge of shard->pageserver
// mapping is out of date.
//
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
// and talk to a different pageserver.
break Some(DebouncedFeMessage::RespondError(
span,
PageStreamError::Reconnect(
"getpage@lsn request routed to wrong shard".into(),
),
));
}
Err(e) => break Some(DebouncedFeMessage::RespondError(span, e.into())),
};
let effective_request_lsn = match Self::wait_or_get_last_lsn(
&shard,
request_lsn,
not_modified_since,
&shard.get_latest_gc_cutoff_lsn(),
&ctx,
)
// TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
.await
{
Ok(lsn) => lsn,
Err(e) => {
break Some(DebouncedFeMessage::RespondError(span, e));
}
};
DebouncedFeMessage::GetPage {
span,
shard,
effective_request_lsn,
pages: smallvec::smallvec![(rel, blkno)],
}
}
};
// check if we can debounce
match (&mut batched, this_msg) {
(None, this_msg) => {
batched = Some(this_msg);
}
(
Some(DebouncedFeMessage::GetPage {
span: _,
shard: accum_shard,
pages: accum_pages,
effective_request_lsn: accum_lsn,
}),
DebouncedFeMessage::GetPage {
span: _,
shard: this_shard,
pages: this_pages,
effective_request_lsn: this_lsn,
},
) if async {
assert_eq!(this_pages.len(), 1);
if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize {
assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize);
return false;
}
if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
!= (this_shard.tenant_shard_id, this_shard.timeline_id)
{
// TODO: we _could_ batch & execute each shard seperately (and in parallel).
// But the current logig for keeping responses in order does not support that.
return false;
}
// the vectored get currently only supports a single LSN, so, bounce as soon
// as the effective request_lsn changes
if (*accum_lsn != this_lsn) {
return false;
}
return true;
}
.await =>
{
// ok to batch
accum_pages.extend(this_pages);
}
(Some(_), this_msg) => {
// by default, don't continue batching
break Some(this_msg);
}
}
// debounce impl piece
let started_at = debounce.get_or_insert_with(Instant::now);
if started_at.elapsed() > *BOUNCE_TIMEOUT {
break None;
}
};
// invoke handler function
let (handler_result, span) = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
let (handler_results, span): (
smallvec::SmallVec<[Result<PagestreamBeMessage, PageStreamError>; 1]>,
_,
) = match batched.take().expect("loop above ensures this") {
DebouncedFeMessage::Exists(req) => {
fail::fail_point!("ps::handle-pagerequest-message::exists");
let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn);
(
self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
smallvec::smallvec![
self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await
],
span,
)
}
PagestreamFeMessage::Nblocks(req) => {
DebouncedFeMessage::Nblocks(req) => {
fail::fail_point!("ps::handle-pagerequest-message::nblocks");
let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn);
(
self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
smallvec::smallvec![
self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
],
span,
)
}
PagestreamFeMessage::GetPage(req) => {
DebouncedFeMessage::GetPage {
span,
shard,
effective_request_lsn,
pages,
} => {
CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM.observe(pages.len() as f64);
span.record("batch_size", pages.len() as u64);
static BATCH_ID: Lazy<std::sync::atomic::AtomicUsize> =
Lazy::new(|| std::sync::atomic::AtomicUsize::new(0));
span.record(
"batch_id",
BATCH_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed) as u64,
);
fail::fail_point!("ps::handle-pagerequest-message::getpage");
// shard_id is filled in by the handler
let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn);
(
self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
{
let npages = pages.len();
let res = self
.handle_get_page_at_lsn_request_batched(
&shard,
effective_request_lsn,
pages,
&ctx,
)
.instrument(span.clone())
.await;
assert_eq!(res.len(), npages);
res
},
span,
)
}
PagestreamFeMessage::DbSize(req) => {
DebouncedFeMessage::DbSize(req) => {
fail::fail_point!("ps::handle-pagerequest-message::dbsize");
let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn);
(
self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
smallvec::smallvec![
self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await
],
span,
)
}
PagestreamFeMessage::GetSlruSegment(req) => {
DebouncedFeMessage::GetSlruSegment(req) => {
fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn);
(
self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx)
smallvec::smallvec![
self.handle_get_slru_segment_request(
tenant_id,
timeline_id,
&req,
&ctx
)
.instrument(span.clone())
.await,
.await
],
span,
)
}
DebouncedFeMessage::RespondError(span, e) => {
// We've already decided to respond with an error, so we don't need to
// call the handler.
(smallvec::smallvec![Err(e)], span)
}
};
// Map handler result to protocol behavior.
// Some handler errors cause exit from pagestream protocol.
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
let response_msg = match handler_result {
Err(e) => match &e {
PageStreamError::Shutdown => {
// If we fail to fulfil a request during shutdown, which may be _because_ of
// shutdown, then do not send the error to the client. Instead just drop the
// connection.
span.in_scope(|| info!("dropping connection due to shutdown"));
return Err(QueryError::Shutdown);
}
PageStreamError::Reconnect(reason) => {
span.in_scope(|| info!("handler requested reconnect: {reason}"));
return Err(QueryError::Reconnect);
}
PageStreamError::Read(_)
| PageStreamError::LsnTimeout(_)
| PageStreamError::NotFound(_)
| PageStreamError::BadRequest(_) => {
// print the all details to the log with {:#}, but for the client the
// error message is enough. Do not log if shutting down, as the anyhow::Error
// here includes cancellation which is not an error.
let full = utils::error::report_compact_sources(&e);
span.in_scope(|| {
error!("error reading relation or page version: {full:#}")
});
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
}
},
Ok(response_msg) => response_msg,
};
for handler_result in handler_results {
let response_msg = match handler_result {
Err(e) => match &e {
PageStreamError::Shutdown => {
// If we fail to fulfil a request during shutdown, which may be _because_ of
// shutdown, then do not send the error to the client. Instead just drop the
// connection.
span.in_scope(|| info!("dropping connection due to shutdown"));
return Err(QueryError::Shutdown);
}
PageStreamError::Reconnect(reason) => {
span.in_scope(|| info!("handler requested reconnect: {reason}"));
return Err(QueryError::Reconnect);
}
PageStreamError::Read(_)
| PageStreamError::LsnTimeout(_)
| PageStreamError::NotFound(_)
| PageStreamError::BadRequest(_) => {
// print the all details to the log with {:#}, but for the client the
// error message is enough. Do not log if shutting down, as the anyhow::Error
// here includes cancellation which is not an error.
let full = utils::error::report_compact_sources(&e);
span.in_scope(|| {
error!("error reading relation or page version: {full:#}")
});
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
}
},
Ok(response_msg) => response_msg,
};
// marshal & transmit response message
pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
// marshal & transmit response message
pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
}
tokio::select! {
biased;
_ = self.cancel.cancelled() => {
@@ -706,6 +908,9 @@ impl PageServerHandler {
res?;
}
}
assert!(batched.is_none(), "we take() earlier");
batched = next_batched;
}
Ok(())
}
@@ -949,60 +1154,30 @@ impl PageServerHandler {
}))
}
#[instrument(skip_all, fields(shard_id))]
async fn handle_get_page_at_lsn_request(
#[instrument(skip_all)]
async fn handle_get_page_at_lsn_request_batched(
&mut self,
tenant_id: TenantId,
timeline_id: TimelineId,
req: &PagestreamGetPageRequest,
timeline: &Timeline,
effective_lsn: Lsn,
pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let timeline = match self
.timeline_handles
.get(
tenant_id,
timeline_id,
ShardSelector::Page(rel_block_to_key(req.rel, req.blkno)),
)
.await
{
Ok(tl) => tl,
Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
// We already know this tenant exists in general, because we resolved it at
// start of connection. Getting a NotFound here indicates that the shard containing
// the requested page is not present on this node: the client's knowledge of shard->pageserver
// mapping is out of date.
//
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
// and talk to a different pageserver.
return Err(PageStreamError::Reconnect(
"getpage@lsn request routed to wrong shard".into(),
));
}
Err(e) => return Err(e.into()),
};
let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetPageAtLsn, ctx);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
&timeline,
req.request_lsn,
req.not_modified_since,
&latest_gc_cutoff_lsn,
) -> smallvec::SmallVec<[Result<PagestreamBeMessage, PageStreamError>; 1]> {
debug_assert_current_span_has_tenant_and_timeline_id();
let _timer = timeline.query_metrics.start_timer_many(
metrics::SmgrQueryType::GetPageAtLsn,
pages.len(),
ctx,
)
.await?;
);
let page = timeline
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), ctx)
.await?;
let pages = timeline
.get_rel_page_at_lsn_batched(pages, Version::Lsn(effective_lsn), ctx)
.await;
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
page,
smallvec::SmallVec::from_iter(pages.into_iter().map(|page| {
page.map(|page| {
PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse { page })
})
.map_err(PageStreamError::Read)
}))
}
@@ -1499,3 +1674,10 @@ fn set_tracing_field_shard_id(timeline: &Timeline) {
);
debug_assert_current_span_has_tenant_and_timeline_id();
}
struct WaitedForLsn(Lsn);
impl From<WaitedForLsn> for Lsn {
fn from(WaitedForLsn(lsn): WaitedForLsn) -> Self {
lsn
}
}

View File

@@ -9,12 +9,17 @@
use super::tenant::{PageReconstructError, Timeline};
use crate::context::RequestContext;
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
use crate::span::{
debug_assert_current_span_has_tenant_and_timeline_id,
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
};
use crate::tenant::timeline::GetVectoredError;
use crate::walrecord::NeonWalRecord;
use crate::{aux_file, repository::*};
use anyhow::{ensure, Context};
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use itertools::Itertools;
use pageserver_api::key::{
dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
@@ -28,7 +33,7 @@ use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::BLCKSZ;
use postgres_ffi::{Oid, RepOriginId, TimestampTz, TransactionId};
use serde::{Deserialize, Serialize};
use std::collections::{hash_map, HashMap, HashSet};
use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
use std::ops::ControlFlow;
use std::ops::Range;
use strum::IntoEnumIterator;
@@ -191,26 +196,184 @@ impl Timeline {
version: Version<'_>,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
if tag.relnode == 0 {
return Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
));
}
let pages = smallvec::smallvec![(tag, blknum)];
let res = self.get_rel_page_at_lsn_batched(pages, version, ctx).await;
assert_eq!(res.len(), 1);
res.into_iter().next().unwrap()
}
let nblocks = self.get_rel_size(tag, version, ctx).await?;
if blknum >= nblocks {
debug!(
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
tag,
blknum,
version.get_lsn(),
nblocks
);
return Ok(ZERO_PAGE.clone());
/// Like [`get_rel_page_at_lsn`], but returns a batch of pages.
pub(crate) async fn get_rel_page_at_lsn_batched(
&self,
pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
version: Version<'_>,
ctx: &RequestContext,
) -> smallvec::SmallVec<[Result<Bytes, PageReconstructError>; 1]> {
debug_assert_current_span_has_tenant_and_timeline_id();
let request_lsn = match version {
Version::Lsn(lsn) => lsn,
Version::Modified(_) => panic!("unsupported"),
};
enum KeyState {
NeedsVectoredGet,
Done(Result<Bytes, PageReconstructError>),
}
let mut key_states = BTreeMap::new();
let mut vectored_gets: smallvec::SmallVec<[_; 1]> =
smallvec::SmallVec::with_capacity(pages.len());
for (response_order, (tag, blknum)) in pages.into_iter().enumerate() {
let key = rel_block_to_key(tag, blknum);
use std::collections::btree_map::Entry;
let key_state_slot = match key_states.entry((key, response_order)) {
Entry::Occupied(_entry) => unreachable!(
"enumerate makes keys unique, even if batch contains same key twice"
),
Entry::Vacant(entry) => entry,
};
let key = rel_block_to_key(tag, blknum);
version.get(self, key, ctx).await
if tag.relnode == 0 {
key_state_slot.insert(KeyState::Done(Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
))));
continue;
}
let nblocks = match self.get_rel_size(tag, version, ctx).await {
Ok(nblocks) => nblocks,
Err(err) => {
key_state_slot.insert(KeyState::Done(Err(err)));
continue;
}
};
if blknum >= nblocks {
debug!(
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
tag,
blknum,
version.get_lsn(),
nblocks
);
key_state_slot.insert(KeyState::Done(Ok(ZERO_PAGE.clone())));
continue;
}
vectored_gets.push(key);
key_state_slot.insert(KeyState::NeedsVectoredGet);
}
// turn vectored_gets into a keyspace
let keyspace = {
// add_key reuqires monotonicity
vectored_gets.sort_unstable();
let mut acc = KeySpaceAccum::new();
for key in vectored_gets
.into_iter()
// in fact it requires strong monotonicity
.dedup()
{
acc.add_key(key);
}
acc.to_keyspace()
};
match self.get_vectored(keyspace, request_lsn, ctx).await {
Ok(results) => {
for (key, res) in results {
if let Err(err) = &res {
warn!(%key, ?err, "a key inside get_vectored failed with a per-key error");
}
let mut interests = key_states.range_mut((key, 0)..(key.next(), 0)).peekable();
let first_interest = interests.next().unwrap();
let next_interest = interests.peek().is_some();
if !next_interest {
match first_interest.1 {
KeyState::NeedsVectoredGet => {
*first_interest.1 = KeyState::Done(res);
}
KeyState::Done(_) => unreachable!(),
}
continue;
} else {
for ((_, _), state) in [first_interest].into_iter().chain(interests) {
match state {
KeyState::NeedsVectoredGet => {
*state = KeyState::Done(match &res {
Ok(buf) => Ok(buf.clone()),
// this `match` is working around the fact that we cannot Clone the PageReconstructError
Err(err) => Err(match err {
PageReconstructError::Cancelled => {
PageReconstructError::Cancelled
}
x @ PageReconstructError::Other(_) |
x @ PageReconstructError::AncestorLsnTimeout(_) |
x @ PageReconstructError::WalRedo(_) |
x @ PageReconstructError::MissingKey(_) => {
PageReconstructError::Other(anyhow::anyhow!("there was more than one request for this key in the batch, error logged once: {x:?}"))
},
}),
});
}
KeyState::Done(_) => unreachable!(),
}
}
}
}
}
Err(err) => {
warn!(?err, "get_vectored failed with a global error, mapping that error to per-key failure");
// this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
for ((_, _), state) in key_states.iter_mut() {
// this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
// but without taking ownership of the GetVectoredError
match &err {
GetVectoredError::Cancelled => {
*state = KeyState::Done(Err(PageReconstructError::Cancelled));
}
// TODO: restructure get_vectored API to make this error per-key
GetVectoredError::MissingKey(err) => {
*state = KeyState::Done(Err(PageReconstructError::Other(anyhow::anyhow!("whole vectored get request failed because one or more of the requested keys were missing: {err:?}"))));
}
// TODO: restructure get_vectored API to make this error per-key
GetVectoredError::GetReadyAncestorError(err) => {
*state = KeyState::Done(Err(PageReconstructError::Other(anyhow::anyhow!("whole vectored get request failed because one or more key required ancestor that wasn't ready: {err:?}"))));
}
// TODO: restructure get_vectored API to make this error per-key
GetVectoredError::Other(err) => {
*state = KeyState::Done(Err(PageReconstructError::Other(
anyhow::anyhow!("whole vectored get request failed: {err:?}"),
)));
}
// TODO: we can prevent this error class by moving this check into the type system
GetVectoredError::InvalidLsn(e) => {
*state =
KeyState::Done(Err(anyhow::anyhow!("invalid LSN: {e:?}").into()));
}
// NB: this should never happen in practice because we limit MAX_GET_VECTORED_KEYS
// TODO: we can prevent this error class by moving this check into the type system
GetVectoredError::Oversized(err) => {
*state = KeyState::Done(Err(anyhow::anyhow!(
"batching oversized: {err:?}"
)
.into()));
}
}
}
}
};
// get the results into the order in which they were requested
let mut return_order: smallvec::SmallVec<[_; Timeline::MAX_GET_VECTORED_KEYS as usize]> =
smallvec::SmallVec::with_capacity(key_states.len());
return_order.extend(key_states.keys().map(|(key, idx)| (*key, *idx)));
return_order.sort_unstable_by_key(|(_, idx)| *idx);
let mut res = smallvec::SmallVec::with_capacity(key_states.len());
res.extend(return_order.into_iter().map(|key_states_key| {
match key_states.remove(&key_states_key).unwrap() {
KeyState::Done(res) => res,
KeyState::NeedsVectoredGet => unreachable!(),
}
}));
res
}
// Get size of a database in blocks

View File

@@ -73,6 +73,21 @@ impl ValueBytes {
Ok(raw[8] == 1)
}
pub(crate) fn is_image(raw: &[u8]) -> Result<bool, InvalidInput> {
if raw.len() < 12 {
return Err(InvalidInput::TooShortValue);
}
let value_discriminator = &raw[0..4];
if value_discriminator == [0, 0, 0, 0] {
// Value::Image always initializes
return Ok(true);
}
Ok(false)
}
}
#[cfg(test)]

View File

@@ -1176,6 +1176,32 @@ mod tests {
}
}
#[test]
fn vlad_test() {
let layers = vec![
LayerDesc {
key_range: Key::from_i128(0)..Key::from_i128(100),
lsn_range: Lsn(0)..Lsn(100),
is_delta: true,
},
LayerDesc {
key_range: Key::from_i128(20)..Key::from_i128(30),
lsn_range: Lsn(10)..Lsn(50),
is_delta: false,
},
];
let layer_map = create_layer_map(layers.clone());
let range = Key::from_i128(0)..Key::from_i128(100);
let result = layer_map.range_search(range.clone(), Lsn(100));
let expected = brute_force_range_search(&layer_map, range, Lsn(100));
eprintln!("result: {result:?}");
assert_range_search_result_eq(result, expected);
}
#[test]
fn layer_visibility_basic() {
// A simple synthetic input, as a smoke test.

View File

@@ -8,15 +8,18 @@ mod layer_desc;
mod layer_name;
pub mod merge_iterator;
use tokio::sync::{self};
use tracing::{debug, Instrument};
use utils::bin_ser::BeSer;
pub mod split_writer;
use crate::context::{AccessStatsBehavior, RequestContext};
use crate::repository::Value;
use crate::repository::{Value, ValueBytes};
use crate::walrecord::NeonWalRecord;
use bytes::Bytes;
use pageserver_api::key::Key;
use pageserver_api::key::{Key, DBDIR_KEY};
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
use std::cmp::{Ordering, Reverse};
use std::cmp::Ordering;
use std::collections::hash_map::Entry;
use std::collections::{BinaryHeap, HashMap};
use std::ops::Range;
@@ -79,30 +82,57 @@ pub(crate) enum ValueReconstructSituation {
}
/// Reconstruct data accumulated for a single key during a vectored get
#[derive(Debug, Default, Clone)]
#[derive(Debug, Default)]
pub(crate) struct VectoredValueReconstructState {
pub(crate) records: Vec<(Lsn, NeonWalRecord)>,
pub(crate) img: Option<(Lsn, Bytes)>,
pub(crate) records: Vec<(
Lsn,
tokio::sync::oneshot::Receiver<Result<Bytes, std::io::Error>>,
)>,
pub(crate) will_init_lsn: Option<Lsn>,
situation: ValueReconstructSituation,
pub(crate) situation: ValueReconstructSituation,
}
impl VectoredValueReconstructState {
fn get_cached_lsn(&self) -> Option<Lsn> {
self.img.as_ref().map(|img| img.0)
self.will_init_lsn
}
}
impl From<VectoredValueReconstructState> for ValueReconstructState {
fn from(mut state: VectoredValueReconstructState) -> Self {
// walredo expects the records to be descending in terms of Lsn
state.records.sort_by_key(|(lsn, _)| Reverse(*lsn));
pub(crate) async fn convert(
_key: Key,
from: VectoredValueReconstructState,
) -> Result<ValueReconstructState, PageReconstructError> {
let mut to = ValueReconstructState::default();
ValueReconstructState {
records: state.records,
img: state.img,
for (lsn, fut) in from.records {
match fut.await {
Ok(res) => match res {
Ok(bytes) => {
let value = Value::des(&bytes)
.map_err(|err| PageReconstructError::Other(err.into()))?;
match value {
Value::WalRecord(rec) => {
to.records.push((lsn, rec));
}
Value::Image(img) => {
assert!(to.img.is_none());
to.img = Some((lsn, img));
}
}
}
Err(err) => {
return Err(PageReconstructError::Other(err.into()));
}
},
Err(err) => {
return Err(PageReconstructError::Other(err.into()));
}
}
}
Ok(to)
}
/// Bag of data accumulated during a vectored get..
@@ -119,6 +149,47 @@ pub(crate) struct ValuesReconstructState {
// Statistics that are still accessible as a caller of `get_vectored_impl`.
layers_visited: u32,
delta_layers_visited: u32,
io_concurrency: IoConcurrency,
}
enum IoConcurrency {
Serial {
prev_io: Option<(usize, tokio::task::JoinHandle<()>)>,
},
Parallel,
}
impl IoConcurrency {
pub(crate) fn spawn_io<F>(&mut self, fut: F)
where
F: std::future::Future<Output = ()> + Send + 'static,
{
static IO_COUNTER: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
let io_id = IO_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let span = tracing::debug_span!("spawned_io", io_id,);
match self {
IoConcurrency::Serial { prev_io } => {
let prev = prev_io.take();
*prev_io = Some((
io_id,
tokio::spawn(
async move {
if let Some((prev_id, prev_task)) = prev {
debug!(prev_io = prev_id, "Waiting for previous IO to complete");
prev_task.await.unwrap();
}
fut.await;
}
.instrument(span),
),
));
}
IoConcurrency::Parallel => {
tokio::spawn(fut);
}
}
}
}
impl ValuesReconstructState {
@@ -129,9 +200,30 @@ impl ValuesReconstructState {
keys_with_image_coverage: None,
layers_visited: 0,
delta_layers_visited: 0,
io_concurrency: {
static IO_CONCURRENCY: once_cell::sync::Lazy<String> =
once_cell::sync::Lazy::new(|| {
std::env::var("NEON_PAGESERVER_VALUE_RECONSTRUCT_IO_CONCURRENCY").unwrap()
});
match IO_CONCURRENCY.as_str() {
"parallel" => IoConcurrency::Parallel,
"serial" => IoConcurrency::Serial { prev_io: None },
x => panic!(
"Invalid value for NEON_PAGESERVER_VALUE_RECONSTRUCT_IO_CONCURRENCY: {}",
x
),
}
},
}
}
pub(crate) fn spawn_io<F>(&mut self, fut: F)
where
F: std::future::Future<Output = ()> + Send + 'static,
{
self.io_concurrency.spawn_io(fut);
}
/// Associate a key with the error which it encountered and mark it as done
pub(crate) fn on_key_error(&mut self, key: Key, err: PageReconstructError) {
let previous = self.keys.insert(key, Err(err));
@@ -200,7 +292,8 @@ impl ValuesReconstructState {
&mut self,
key: &Key,
lsn: Lsn,
value: Value,
completes: bool,
value: sync::oneshot::Receiver<Result<Bytes, std::io::Error>>,
) -> ValueReconstructSituation {
let state = self
.keys
@@ -208,31 +301,16 @@ impl ValuesReconstructState {
.or_insert(Ok(VectoredValueReconstructState::default()));
if let Ok(state) = state {
let key_done = match state.situation {
match state.situation {
ValueReconstructSituation::Complete => unreachable!(),
ValueReconstructSituation::Continue => match value {
Value::Image(img) => {
state.img = Some((lsn, img));
true
}
Value::WalRecord(rec) => {
debug_assert!(
Some(lsn) > state.get_cached_lsn(),
"Attempt to collect a record below cached LSN for walredo: {} < {}",
lsn,
state
.get_cached_lsn()
.expect("Assertion can only fire if a cached lsn is present")
);
ValueReconstructSituation::Continue => {
state.records.push((lsn, value));
}
}
let will_init = rec.will_init();
state.records.push((lsn, rec));
will_init
}
},
};
if key_done && state.situation == ValueReconstructSituation::Continue {
if completes && state.situation == ValueReconstructSituation::Continue {
assert_eq!(state.will_init_lsn, None);
state.will_init_lsn = Some(lsn);
state.situation = ValueReconstructSituation::Complete;
self.keys_done.add_key(*key);
}
@@ -311,6 +389,7 @@ pub(crate) struct LayerFringe {
struct LayerKeyspace {
layer: ReadableLayer,
target_keyspace: KeySpaceRandomAccum,
lsn_range: Range<Lsn>,
}
impl LayerFringe {
@@ -335,6 +414,7 @@ impl LayerFringe {
LayerKeyspace {
layer,
mut target_keyspace,
..
},
)) => Some((
layer,
@@ -355,11 +435,31 @@ impl LayerFringe {
let entry = self.layers.entry(layer_id.clone());
match entry {
Entry::Occupied(mut entry) => {
// On this branch, we don't add to planned_reads_by_lsn
// even though we might be interested in a different lsn_range.
entry.get_mut().target_keyspace.add_keyspace(keyspace);
if lsn_range != entry.get().lsn_range {
tracing::error!(
"LSN range assumption violated for layer: {:?}, lsn_range: {:?}",
layer_id,
lsn_range
);
let fringe = self.planned_reads_by_lsn.clone().into_sorted_vec();
for read in fringe {
tracing::error!(
"layer: {:?}, lsn_range: {:?}",
read.layer_id,
read.lsn_range
);
}
panic!("LSN range assumption violated");
}
}
Entry::Vacant(entry) => {
self.planned_reads_by_lsn.push(ReadDesc {
lsn_range,
lsn_range: lsn_range.clone(),
layer_id: layer_id.clone(),
});
let mut accum = KeySpaceRandomAccum::new();
@@ -367,6 +467,7 @@ impl LayerFringe {
entry.insert(LayerKeyspace {
layer,
target_keyspace: accum,
lsn_range,
});
}
}

View File

@@ -42,13 +42,12 @@ use crate::tenant::vectored_blob_io::{
BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadCoalesceMode, VectoredReadPlanner,
};
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::{self, VirtualFile};
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::BytesMut;
use anyhow::{bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut};
use camino::{Utf8Path, Utf8PathBuf};
use futures::StreamExt;
use itertools::Itertools;
@@ -58,14 +57,14 @@ use pageserver_api::models::ImageCompressionAlgorithm;
use pageserver_api::shard::TenantShardId;
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};
use std::fs::File;
use std::io::SeekFrom;
use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::OnceCell;
use tokio::sync::{self, OnceCell};
use tokio_epoll_uring::IoBuf;
use tracing::*;
@@ -224,7 +223,7 @@ pub struct DeltaLayerInner {
index_start_blk: u32,
index_root_blk: u32,
file: VirtualFile,
file: Arc<VirtualFile>,
file_id: FileId,
layer_key_range: Range<Key>,
@@ -788,9 +787,11 @@ impl DeltaLayerInner {
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path, ctx)
.await
.context("open layer file")?;
let file = Arc::new(
VirtualFile::open(path, ctx)
.await
.context("open layer file")?,
);
let file_id = page_cache::next_file_id();
@@ -841,6 +842,7 @@ impl DeltaLayerInner {
// can be further optimised to visit the index only once.
pub(super) async fn get_values_reconstruct_data(
&self,
self_desc: PersistentLayerDesc,
keyspace: KeySpace,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValuesReconstructState,
@@ -863,6 +865,7 @@ impl DeltaLayerInner {
let data_end_offset = self.index_start_offset();
let reads = Self::plan_reads(
self_desc,
&keyspace,
lsn_range.clone(),
data_end_offset,
@@ -883,6 +886,7 @@ impl DeltaLayerInner {
}
async fn plan_reads<Reader>(
self_desc: PersistentLayerDesc,
keyspace: &KeySpace,
lsn_range: Range<Lsn>,
data_end_offset: u64,
@@ -911,6 +915,8 @@ impl DeltaLayerInner {
let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
let blob_ref = BlobRef(value);
debug!(file = %self_desc.layer_name(), %key, %lsn, will_init = blob_ref.will_init(), "delta layer found key");
// Lsns are not monotonically increasing across keys, so we don't assert on them.
assert!(key >= range.start);
@@ -933,7 +939,7 @@ impl DeltaLayerInner {
range_end_handled = true;
break;
} else {
planner.handle(key, lsn, blob_ref.pos(), flag);
planner.handle(key, lsn, blob_ref.pos(), flag, blob_ref.will_init());
}
}
@@ -980,77 +986,59 @@ impl DeltaLayerInner {
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) {
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
let mut ignore_key_with_err = None;
let max_vectored_read_bytes = self
.max_vectored_read_bytes
.expect("Layer is loaded with max vectored bytes config")
.0
.into();
let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
let mut buf = Some(BytesMut::with_capacity(buf_size));
// Note that reads are processed in reverse order (from highest key+lsn).
// This is the order that `ReconstructState` requires such that it can
// track when a key is done.
for read in reads.into_iter().rev() {
let res = vectored_blob_reader
.read_blobs(&read, buf.take().expect("Should have a buffer"), ctx)
.await;
let blobs_buf = match res {
Ok(blobs_buf) => blobs_buf,
Err(err) => {
let kind = err.kind();
for (_, blob_meta) in read.blobs_at.as_slice() {
reconstruct_state.on_key_error(
blob_meta.key,
PageReconstructError::Other(anyhow!(
"Failed to read blobs from virtual file {}: {}",
self.file.path,
kind
)),
);
}
// We have "lost" the buffer since the lower level IO api
// doesn't return the buffer on error. Allocate a new one.
buf = Some(BytesMut::with_capacity(buf_size));
continue;
}
};
for meta in blobs_buf.blobs.iter().rev() {
if Some(meta.meta.key) == ignore_key_with_err {
continue;
}
let value = Value::des(&blobs_buf.buf[meta.start..meta.end]);
let value = match value {
Ok(v) => v,
Err(e) => {
reconstruct_state.on_key_error(
meta.meta.key,
PageReconstructError::Other(anyhow!(e).context(format!(
"Failed to deserialize blob from virtual file {}",
self.file.path,
))),
);
ignore_key_with_err = Some(meta.meta.key);
continue;
}
};
// Invariant: once a key reaches [`ValueReconstructSituation::Complete`]
// state, no further updates shall be made to it. The call below will
// panic if the invariant is violated.
reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value);
let mut senders: HashMap<
(Key, Lsn),
sync::oneshot::Sender<Result<Bytes, std::io::Error>>,
> = Default::default();
for (_, blob_meta) in read.blobs_at.as_slice().iter().rev() {
let (tx, rx) = sync::oneshot::channel();
senders.insert((blob_meta.key, blob_meta.lsn), tx);
reconstruct_state.update_key(
&blob_meta.key,
blob_meta.lsn,
blob_meta.will_init,
rx,
);
}
buf = Some(blobs_buf.buf);
let read_from = self.file.clone();
let read_ctx = ctx.attached_child();
reconstruct_state.spawn_io(async move {
let vectored_blob_reader = VectoredBlobReader::new(&read_from);
let buf = BytesMut::with_capacity(buf_size);
let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await;
match res {
Ok(blobs_buf) => {
for meta in blobs_buf.blobs.iter().rev() {
let buf = &blobs_buf.buf[meta.start..meta.end];
let sender = senders
.remove(&(meta.meta.key, meta.meta.lsn))
.expect("sender must exist");
let _ = sender.send(Ok(Bytes::copy_from_slice(buf)));
}
assert!(senders.is_empty());
}
Err(err) => {
for (_, sender) in senders {
let _ = sender
.send(Err(std::io::Error::new(err.kind(), "vec read failed")));
}
}
}
});
}
}
@@ -1190,7 +1178,14 @@ impl DeltaLayerInner {
let actionable = if let Some((key, lsn, start_offset)) = prev.take() {
let end_offset = offset;
Some((BlobMeta { key, lsn }, start_offset..end_offset))
Some((
BlobMeta {
key,
lsn,
will_init: false,
},
start_offset..end_offset,
))
} else {
None
};
@@ -1583,78 +1578,6 @@ pub(crate) mod test {
};
use bytes::Bytes;
/// Construct an index for a fictional delta layer and and then
/// traverse in order to plan vectored reads for a query. Finally,
/// verify that the traversal fed the right index key and value
/// pairs into the planner.
#[tokio::test]
async fn test_delta_layer_index_traversal() {
let base_key = Key {
field1: 0,
field2: 1663,
field3: 12972,
field4: 16396,
field5: 0,
field6: 246080,
};
// Populate the index with some entries
let entries: BTreeMap<Key, Vec<Lsn>> = BTreeMap::from([
(base_key, vec![Lsn(1), Lsn(5), Lsn(25), Lsn(26), Lsn(28)]),
(base_key.add(1), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
(base_key.add(2), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
(base_key.add(5), vec![Lsn(10), Lsn(15), Lsn(16), Lsn(20)]),
]);
let mut disk = TestDisk::default();
let mut writer = DiskBtreeBuilder::<_, DELTA_KEY_SIZE>::new(&mut disk);
let mut disk_offset = 0;
for (key, lsns) in &entries {
for lsn in lsns {
let index_key = DeltaKey::from_key_lsn(key, *lsn);
let blob_ref = BlobRef::new(disk_offset, false);
writer
.append(&index_key.0, blob_ref.0)
.expect("In memory disk append should never fail");
disk_offset += 1;
}
}
// Prepare all the arguments for the call into `plan_reads` below
let (root_offset, _writer) = writer
.finish()
.expect("In memory disk finish should never fail");
let reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(0, root_offset, disk);
let planner = VectoredReadPlanner::new(100);
let mut reconstruct_state = ValuesReconstructState::new();
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
let keyspace = KeySpace {
ranges: vec![
base_key..base_key.add(3),
base_key.add(3)..base_key.add(100),
],
};
let lsn_range = Lsn(2)..Lsn(40);
// Plan and validate
let vectored_reads = DeltaLayerInner::plan_reads(
&keyspace,
lsn_range.clone(),
disk_offset,
reader,
planner,
&mut reconstruct_state,
&ctx,
)
.await
.expect("Read planning should not fail");
validate(keyspace, lsn_range, vectored_reads, entries);
}
fn validate(
keyspace: KeySpace,
lsn_range: Range<Lsn>,
@@ -1832,102 +1755,6 @@ pub(crate) mod test {
keyspace
}
#[tokio::test]
async fn test_delta_layer_vectored_read_end_to_end() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read").await?;
let (tenant, ctx) = harness.load().await;
let timeline_id = TimelineId::generate();
let timeline = tenant
.create_test_timeline(timeline_id, constants::LSN_OFFSET, DEFAULT_PG_VERSION, &ctx)
.await?;
tracing::info!("Generating test data ...");
let rng = &mut StdRng::seed_from_u64(0);
let entries = generate_entries(rng);
let entries_meta = get_entries_meta(&entries);
tracing::info!("Done generating {} entries", entries.len());
tracing::info!("Writing test data to delta layer ...");
let mut writer = DeltaLayerWriter::new(
harness.conf,
timeline_id,
harness.tenant_shard_id,
entries_meta.key_range.start,
entries_meta.lsn_range.clone(),
&ctx,
)
.await?;
for entry in entries {
let (_, res) = writer
.put_value_bytes(entry.key, entry.lsn, entry.value.slice_len(), false, &ctx)
.await;
res?;
}
let (desc, path) = writer.finish(entries_meta.key_range.end, &ctx).await?;
let resident = Layer::finish_creating(harness.conf, &timeline, desc, &path)?;
let inner = resident.get_as_delta(&ctx).await?;
let file_size = inner.file.metadata().await?.len();
tracing::info!(
"Done writing test data to delta layer. Resulting file size is: {}",
file_size
);
for i in 0..constants::READS_COUNT {
tracing::info!("Doing vectored read {}/{}", i + 1, constants::READS_COUNT);
let block_reader = FileBlockReader::new(&inner.file, inner.file_id);
let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
inner.index_start_blk,
inner.index_root_blk,
block_reader,
);
let planner = VectoredReadPlanner::new(constants::MAX_VECTORED_READ_BYTES);
let mut reconstruct_state = ValuesReconstructState::new();
let keyspace = pick_random_keyspace(rng, &entries_meta.key_range);
let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64;
let vectored_reads = DeltaLayerInner::plan_reads(
&keyspace,
entries_meta.lsn_range.clone(),
data_end_offset,
index_reader,
planner,
&mut reconstruct_state,
&ctx,
)
.await?;
let vectored_blob_reader = VectoredBlobReader::new(&inner.file);
let buf_size = DeltaLayerInner::get_min_read_buffer_size(
&vectored_reads,
constants::MAX_VECTORED_READ_BYTES,
);
let mut buf = Some(BytesMut::with_capacity(buf_size));
for read in vectored_reads {
let blobs_buf = vectored_blob_reader
.read_blobs(&read, buf.take().expect("Should have a buffer"), &ctx)
.await?;
for meta in blobs_buf.blobs.iter() {
let value = &blobs_buf.buf[meta.start..meta.end];
assert_eq!(value, entries_meta.index[&(meta.meta.key, meta.meta.lsn)]);
}
buf = Some(blobs_buf.buf);
}
}
Ok(())
}
#[tokio::test]
async fn copy_delta_prefix_smoke() {
use crate::walrecord::NeonWalRecord;

View File

@@ -21,7 +21,7 @@
//!
//! Every image layer file consists of three parts: "summary",
//! "index", and "values". The summary is a fixed size header at the
//! beginning of the file, and it contains basic information about the
//! beginningof the file, and it contains basic information about the
//! layer, and offsets to the other parts. The "index" is a B-tree,
//! mapping from Key to an offset in the "values" part. The
//! actual page images are stored in the "values" part.
@@ -38,11 +38,11 @@ use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
};
use crate::tenant::{PageReconstructError, Timeline};
use crate::tenant::Timeline;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::{self, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{anyhow, bail, ensure, Context, Result};
use anyhow::{bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut};
use camino::{Utf8Path, Utf8PathBuf};
use hex;
@@ -52,13 +52,14 @@ use pageserver_api::keyspace::KeySpace;
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};
use std::fs::File;
use std::io::SeekFrom;
use std::ops::Range;
use std::os::unix::prelude::FileExt;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::oneshot;
use tokio::sync::OnceCell;
use tokio_stream::StreamExt;
use tracing::*;
@@ -163,7 +164,7 @@ pub struct ImageLayerInner {
key_range: Range<Key>,
lsn: Lsn,
file: VirtualFile,
file: Arc<VirtualFile>,
file_id: FileId,
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
@@ -390,9 +391,11 @@ impl ImageLayerInner {
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path, ctx)
.await
.context("open layer file")?;
let file = Arc::new(
VirtualFile::open(path, ctx)
.await
.context("open layer file")?,
);
let file_id = page_cache::next_file_id();
let block_reader = FileBlockReader::new(&file, file_id);
let summary_blk = block_reader
@@ -438,12 +441,13 @@ impl ImageLayerInner {
// the reconstruct state with whatever is found.
pub(super) async fn get_values_reconstruct_data(
&self,
self_desc: PersistentLayerDesc,
keyspace: KeySpace,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
let reads = self
.plan_reads(keyspace, None, ctx)
.plan_reads(self_desc, keyspace, None, ctx)
.await
.map_err(GetVectoredError::Other)?;
@@ -462,6 +466,7 @@ impl ImageLayerInner {
/// this shard.
async fn plan_reads(
&self,
self_desc: PersistentLayerDesc,
keyspace: KeySpace,
shard_identity: Option<&ShardIdentity>,
ctx: &RequestContext,
@@ -505,12 +510,14 @@ impl ImageLayerInner {
BlobFlag::None
};
debug!(file = %self_desc.layer_name(), %key, %self.lsn, will_init=true, "image layer found key");
if key >= range.end {
planner.handle_range_end(offset);
range_end_handled = true;
break;
} else {
planner.handle(key, self.lsn, offset, flag);
planner.handle(key, self.lsn, offset, flag, true);
}
}
@@ -534,6 +541,7 @@ impl ImageLayerInner {
// Fragment the range into the regions owned by this ShardIdentity
let plan = self
.plan_reads(
todo!(),
KeySpace {
// If asked for the total key space, plan_reads will give us all the keys in the layer
ranges: vec![Key::MIN..Key::MAX],
@@ -579,8 +587,16 @@ impl ImageLayerInner {
.0
.into();
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
for read in reads.into_iter() {
let mut senders: HashMap<(Key, Lsn), oneshot::Sender<Result<Bytes, std::io::Error>>> =
Default::default();
for (_, blob_meta) in read.blobs_at.as_slice() {
let (tx, rx) = oneshot::channel();
senders.insert((blob_meta.key, blob_meta.lsn), tx);
reconstruct_state.update_key(&blob_meta.key, blob_meta.lsn, true, rx);
}
let buf_size = read.size();
if buf_size > max_vectored_read_bytes {
@@ -599,36 +615,36 @@ impl ImageLayerInner {
);
}
let buf = BytesMut::with_capacity(buf_size);
let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;
let read_from = self.file.clone();
let read_ctx = ctx.attached_child();
reconstruct_state.spawn_io(async move {
let buf = BytesMut::with_capacity(buf_size);
let vectored_blob_reader = VectoredBlobReader::new(&read_from);
let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await;
match res {
Ok(blobs_buf) => {
let frozen_buf = blobs_buf.buf.freeze();
match res {
Ok(blobs_buf) => {
for meta in blobs_buf.blobs.iter() {
let buf = &blobs_buf.buf[meta.start..meta.end];
let sender = senders
.remove(&(meta.meta.key, meta.meta.lsn))
.expect("sender must exist");
// TODO: this is silly - sort it out
let bytes = Value::ser(&Value::Image(Bytes::copy_from_slice(buf)))
.expect("stupid but correct");
let _ = sender.send(Ok(bytes.into()));
}
for meta in blobs_buf.blobs.iter() {
let img_buf = frozen_buf.slice(meta.start..meta.end);
reconstruct_state.update_key(
&meta.meta.key,
self.lsn,
Value::Image(img_buf),
);
assert!(senders.is_empty());
}
Err(err) => {
for (_, sender) in senders {
let _ = sender
.send(Err(std::io::Error::new(err.kind(), "vec read failed")));
}
}
}
Err(err) => {
let kind = err.kind();
for (_, blob_meta) in read.blobs_at.as_slice() {
reconstruct_state.on_key_error(
blob_meta.key,
PageReconstructError::from(anyhow!(
"Failed to read blobs from virtual file {}: {}",
self.file.path,
kind
)),
);
}
}
};
});
}
}

View File

@@ -10,10 +10,9 @@ use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
use crate::repository::{Key, Value};
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::{l0_flush, page_cache};
use anyhow::{anyhow, Context, Result};
use anyhow::{Context, Result};
use bytes::Bytes;
use camino::Utf8PathBuf;
use pageserver_api::key::CompactKey;
@@ -35,9 +34,7 @@ use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::atomic::{AtomicU64, AtomicUsize};
use tokio::sync::RwLock;
use super::{
DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState,
};
use super::{DeltaLayerWriter, PersistentLayerDesc, ValuesReconstructState};
pub(crate) mod vectored_dio_read;
@@ -87,7 +84,7 @@ pub struct InMemoryLayerInner {
/// The values are stored in a serialized format in this file.
/// Each serialized Value is preceded by a 'u32' length field.
/// PerSeg::page_versions map stores offsets into this file.
file: EphemeralFile,
file: Arc<tokio::sync::RwLock<EphemeralFile>>,
resource_units: GlobalResourceUnits,
}
@@ -381,7 +378,11 @@ impl InMemoryLayer {
}
pub(crate) fn try_len(&self) -> Option<u64> {
self.inner.try_read().map(|i| i.file.len()).ok()
self.inner
.try_read()
.map(|i| i.file.try_read().map(|i| i.len()).ok())
.ok()
.flatten()
}
pub(crate) fn assert_writable(&self) {
@@ -432,6 +433,10 @@ impl InMemoryLayer {
read: vectored_dio_read::LogicalRead<Vec<u8>>,
}
let mut reads: HashMap<Key, Vec<ValueRead>> = HashMap::new();
let mut senders: HashMap<
(Key, Lsn),
tokio::sync::oneshot::Sender<Result<Bytes, std::io::Error>>,
> = Default::default();
for range in keyspace.ranges.iter() {
for (key, vec_map) in inner
@@ -459,6 +464,13 @@ impl InMemoryLayer {
Vec::with_capacity(len as usize),
),
});
let (tx, rx) = tokio::sync::oneshot::channel();
senders.insert((key, *entry_lsn), tx);
reconstruct_state.update_key(&key, *entry_lsn, will_init, rx);
debug!(%key, %entry_lsn, will_init, "inmemory layer found key");
if will_init {
break;
}
@@ -466,46 +478,39 @@ impl InMemoryLayer {
}
}
// Execute the reads.
let read_from = inner.file.clone();
let read_ctx = ctx.attached_child();
reconstruct_state.spawn_io(async move {
let locked = read_from.read().await;
let f = vectored_dio_read::execute(
&*locked,
reads
.iter()
.flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)),
&read_ctx,
);
send_future::SendFuture::send(f) // https://github.com/rust-lang/rust/issues/96865
.await;
let f = vectored_dio_read::execute(
&inner.file,
reads
.iter()
.flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)),
&ctx,
);
send_future::SendFuture::send(f) // https://github.com/rust-lang/rust/issues/96865
.await;
// Process results into the reconstruct state
'next_key: for (key, value_reads) in reads {
for ValueRead { entry_lsn, read } in value_reads {
match read.into_result().expect("we run execute() above") {
Err(e) => {
reconstruct_state.on_key_error(key, PageReconstructError::from(anyhow!(e)));
continue 'next_key;
}
Ok(value_buf) => {
let value = Value::des(&value_buf);
if let Err(e) = value {
reconstruct_state
.on_key_error(key, PageReconstructError::from(anyhow!(e)));
continue 'next_key;
for (key, value_reads) in reads {
for ValueRead { entry_lsn, read } in value_reads {
let sender = senders
.remove(&(key, entry_lsn))
.expect("sender must exist");
match read.into_result().expect("we run execute() above") {
Err(e) => {
let _ = sender
.send(Err(std::io::Error::new(e.kind(), "dio vec read failed")));
}
let key_situation =
reconstruct_state.update_key(&key, entry_lsn, value.unwrap());
if key_situation == ValueReconstructSituation::Complete {
// TODO: metric to see if we fetched more values than necessary
continue 'next_key;
Ok(value_buf) => {
let _ = sender.send(Ok(value_buf.into()));
}
// process the next value in the next iteration of the loop
}
}
}
}
assert!(senders.is_empty());
});
reconstruct_state.on_lsn_advanced(&keyspace, self.start_lsn);
@@ -600,7 +605,8 @@ impl InMemoryLayer {
/// Get layer size.
pub async fn size(&self) -> Result<u64> {
let inner = self.inner.read().await;
Ok(inner.file.len())
let locked = inner.file.try_read().expect("no contention");
Ok(locked.len())
}
/// Create a new, empty, in-memory layer
@@ -614,9 +620,10 @@ impl InMemoryLayer {
) -> Result<InMemoryLayer> {
trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
let file =
EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate_guard, ctx).await?;
let key = InMemoryLayerFileId(file.page_cache_file_id());
let file = Arc::new(tokio::sync::RwLock::new(
EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate_guard, ctx).await?,
));
let key = InMemoryLayerFileId(file.read().await.page_cache_file_id());
Ok(InMemoryLayer {
file_id: key,
@@ -648,7 +655,7 @@ impl InMemoryLayer {
let mut inner = self.inner.write().await;
self.assert_writable();
let base_offset = inner.file.len();
let base_offset = inner.file.read().await.len();
let SerializedBatch {
raw,
@@ -672,8 +679,13 @@ impl InMemoryLayer {
}
// Write the batch to the file
inner.file.write_raw(&raw, ctx).await?;
let new_size = inner.file.len();
// FIXME: can't borrow arc
let new_size = {
let mut locked = inner.file.write().await;
locked.write_raw(&raw, ctx).await?;
locked.len()
};
let expected_new_len = base_offset
.checked_add(raw.len().into_u64())
// write_raw would error if we were to overflow u64.
@@ -713,7 +725,7 @@ impl InMemoryLayer {
pub(crate) async fn tick(&self) -> Option<u64> {
let mut inner = self.inner.write().await;
let size = inner.file.len();
let size = inner.file.read().await.len();
inner.resource_units.publish_size(size)
}
@@ -809,7 +821,7 @@ impl InMemoryLayer {
match l0_flush_global_state {
l0_flush::Inner::Direct { .. } => {
let file_contents: Vec<u8> = inner.file.load_to_vec(ctx).await?;
let file_contents: Vec<u8> = inner.file.read().await.load_to_vec(ctx).await?;
let file_contents = Bytes::from(file_contents);

View File

@@ -1755,11 +1755,17 @@ impl DownloadedLayer {
.map_err(GetVectoredError::Other)?
{
Delta(d) => {
d.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, ctx)
.await
d.get_values_reconstruct_data(
owner.desc.clone(),
keyspace,
lsn_range,
reconstruct_data,
ctx,
)
.await
}
Image(i) => {
i.get_values_reconstruct_data(keyspace, reconstruct_data, ctx)
i.get_values_reconstruct_data(owner.desc.clone(), keyspace, reconstruct_data, ctx)
.await
}
}

View File

@@ -19,182 +19,6 @@ const ADVANCE: std::time::Duration = std::time::Duration::from_secs(3600);
/// timeout uses to advance futures.
const FOREVER: std::time::Duration = std::time::Duration::from_secs(ADVANCE.as_secs() * 24 * 7);
/// Demonstrate the API and resident -> evicted -> resident -> deleted transitions.
#[tokio::test]
async fn smoke_test() {
let handle = tokio::runtime::Handle::current();
let h = TenantHarness::create("smoke_test").await.unwrap();
let span = h.span();
let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
let (tenant, _) = h.load().await;
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Download);
let timeline = tenant
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
.await
.unwrap();
let layer = {
let mut layers = {
let layers = timeline.layers.read().await;
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
assert_eq!(layers.len(), 1);
layers.swap_remove(0)
};
// all layers created at pageserver are like `layer`, initialized with strong
// Arc<DownloadedLayer>.
let controlfile_keyspace = KeySpace {
ranges: vec![CONTROLFILE_KEY..CONTROLFILE_KEY.next()],
};
let img_before = {
let mut data = ValuesReconstructState::default();
layer
.get_values_reconstruct_data(
controlfile_keyspace.clone(),
Lsn(0x10)..Lsn(0x11),
&mut data,
&ctx,
)
.await
.unwrap();
data.keys
.remove(&CONTROLFILE_KEY)
.expect("must be present")
.expect("should not error")
.img
.take()
.expect("tenant harness writes the control file")
};
// important part is evicting the layer, which can be done when there are no more ResidentLayer
// instances -- there currently are none, only two `Layer` values, one in the layermap and on
// in scope.
layer.evict_and_wait(FOREVER).await.unwrap();
// double-evict returns an error, which is valid if both eviction_task and disk usage based
// eviction would both evict the same layer at the same time.
let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
assert!(matches!(e, EvictionError::NotFound));
// on accesses when the layer is evicted, it will automatically be downloaded.
let img_after = {
let mut data = ValuesReconstructState::default();
layer
.get_values_reconstruct_data(
controlfile_keyspace.clone(),
Lsn(0x10)..Lsn(0x11),
&mut data,
&ctx,
)
.instrument(download_span.clone())
.await
.unwrap();
data.keys
.remove(&CONTROLFILE_KEY)
.expect("must be present")
.expect("should not error")
.img
.take()
.expect("tenant harness writes the control file")
};
assert_eq!(img_before, img_after);
// evict_and_wait can timeout, but it doesn't cancel the evicting itself
//
// ZERO for timeout does not work reliably, so first take up all spawn_blocking slots to
// artificially slow it down.
let helper = SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads(&handle).await;
match layer
.evict_and_wait(std::time::Duration::ZERO)
.await
.unwrap_err()
{
EvictionError::Timeout => {
// expected, but note that the eviction is "still ongoing"
helper.release().await;
// exhaust spawn_blocking pool to ensure it is now complete
SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle)
.await;
}
other => unreachable!("{other:?}"),
}
// only way to query if a layer is resident is to acquire a ResidentLayer instance.
// Layer::keep_resident never downloads, but it might initialize if the layer file is found
// downloaded locally.
let none = layer.keep_resident().await;
assert!(
none.is_none(),
"Expected none, because eviction removed the local file, found: {none:?}"
);
// plain downloading is rarely needed
layer
.download_and_keep_resident()
.instrument(download_span)
.await
.unwrap();
// last important part is deletion on drop: gc and compaction use it for compacted L0 layers
// or fully garbage collected layers. deletion means deleting the local file, and scheduling a
// deletion of the already unlinked from index_part.json remote file.
//
// marking a layer to be deleted on drop is irreversible; there is no technical reason against
// reversiblity, but currently it is not needed so it is not provided.
layer.delete_on_drop();
let path = layer.local_path().to_owned();
// wait_drop produces an unconnected to Layer future which will resolve when the
// LayerInner::drop has completed.
let mut wait_drop = std::pin::pin!(layer.wait_drop());
// paused time doesn't really work well with timeouts and evict_and_wait, so delay pausing
// until here
tokio::time::pause();
tokio::time::timeout(ADVANCE, &mut wait_drop)
.await
.expect_err("should had timed out because two strong references exist");
tokio::fs::metadata(&path)
.await
.expect("the local layer file still exists");
let rtc = &timeline.remote_client;
{
let layers = &[layer];
let mut g = timeline.layers.write().await;
g.open_mut().unwrap().finish_gc_timeline(layers);
// this just updates the remote_physical_size for demonstration purposes
rtc.schedule_gc_update(layers).unwrap();
}
// when strong references are dropped, the file is deleted and remote deletion is scheduled
wait_drop.await;
let e = tokio::fs::metadata(&path)
.await
.expect_err("the local file is deleted");
assert_eq!(e.kind(), std::io::ErrorKind::NotFound);
rtc.wait_completion().await.unwrap();
assert_eq!(rtc.get_remote_physical_size(), 0);
assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
}
/// This test demonstrates a previous hang when a eviction and deletion were requested at the same
/// time. Now both of them complete per Arc drop semantics.
#[tokio::test(start_paused = true)]

View File

@@ -18,6 +18,7 @@ use camino::Utf8Path;
use chrono::{DateTime, Utc};
use enumset::EnumSet;
use fail::fail_point;
use futures::{stream::FuturesUnordered, StreamExt};
use handle::ShardTimelineId;
use once_cell::sync::Lazy;
use pageserver_api::{
@@ -48,7 +49,6 @@ use utils::{
sync::gate::{Gate, GateGuard},
};
use std::pin::pin;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::{Arc, Mutex, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
@@ -62,13 +62,16 @@ use std::{
collections::btree_map::Entry,
ops::{Deref, Range},
};
use std::{pin::pin, sync::atomic::AtomicUsize};
use crate::{
aux_file::AuxFileSizeEstimator,
tenant::{
layer_map::{LayerMap, SearchResult},
metadata::TimelineMetadata,
storage_layer::{inmemory_layer::IndexEntry, PersistentLayerDesc},
storage_layer::{
convert, inmemory_layer::IndexEntry, PersistentLayerDesc, ValueReconstructSituation,
},
},
walredo,
};
@@ -565,7 +568,7 @@ impl From<layer_manager::Shutdown> for GetVectoredError {
#[derive(thiserror::Error)]
pub struct MissingKeyError {
key: Key,
keys: KeySpace,
shard: ShardNumber,
cont_lsn: Lsn,
request_lsn: Lsn,
@@ -583,8 +586,8 @@ impl std::fmt::Display for MissingKeyError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"could not find data for key {} (shard {:?}) at LSN {}, request LSN {}",
self.key, self.shard, self.cont_lsn, self.request_lsn
"could not find data for keys (shard {:?}) at LSN {}, request LSN {} keys {}",
self.shard, self.cont_lsn, self.request_lsn, self.keys
)?;
if let Some(ref ancestor_lsn) = self.ancestor_lsn {
write!(f, ", ancestor {}", ancestor_lsn)?;
@@ -952,7 +955,11 @@ impl Timeline {
}
}
None => Err(PageReconstructError::MissingKey(MissingKeyError {
key,
keys: {
let mut accum = KeySpaceAccum::new();
accum.add_key(key);
accum.to_keyspace()
},
shard: self.shard_identity.get_shard_number(&key),
cont_lsn: Lsn(0),
request_lsn: lsn,
@@ -1122,29 +1129,53 @@ impl Timeline {
let get_data_timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME
.for_get_kind(get_kind)
.start_timer();
static INVOCATION: Lazy<AtomicUsize> = Lazy::new(|| AtomicUsize::new(0));
let invocation = INVOCATION.fetch_add(1, AtomicOrdering::Relaxed);
self.get_vectored_reconstruct_data(keyspace.clone(), lsn, reconstruct_state, ctx)
.await?;
.instrument(debug_span!("get_vectored_reconstruct_data", invocation))
.await
.map_err(|err| {
anyhow::anyhow!(
"get_vectored_reconstruct_data invocation {invocation} lsn={lsn} keyspace={keyspace} {err:?}",
)
})?;
get_data_timer.stop_and_record();
let reconstruct_timer = crate::metrics::RECONSTRUCT_TIME
.for_get_kind(get_kind)
.start_timer();
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
let layers_visited = reconstruct_state.get_layers_visited();
let futs = FuturesUnordered::new();
for (key, res) in std::mem::take(&mut reconstruct_state.keys) {
match res {
Err(err) => {
results.insert(key, Err(err));
}
Ok(state) => {
let state = ValueReconstructState::from(state);
futs.push({
let walredo_self = self.myself.upgrade().expect("&self method holds the arc");
async move {
let state = res.expect("Read path is infallible");
assert!(matches!(
state.situation,
ValueReconstructSituation::Complete
));
let reconstruct_res = self.reconstruct_value(key, lsn, state).await;
results.insert(key, reconstruct_res);
let converted = match convert(key, state).await {
Ok(ok) => ok,
Err(err) => {
return (key, Err(err));
}
};
(
key,
walredo_self.reconstruct_value(key, lsn, converted).await,
)
}
}
});
}
let results = futs
.collect::<BTreeMap<Key, Result<Bytes, PageReconstructError>>>()
.await;
reconstruct_timer.stop_and_record();
// For aux file keys (v1 or v2) the vectored read path does not return an error
@@ -3120,7 +3151,7 @@ impl Timeline {
if let Some(missing_keyspace) = missing_keyspace {
return Err(GetVectoredError::MissingKey(MissingKeyError {
key: missing_keyspace.start().unwrap(), /* better if we can store the full keyspace */
keys: missing_keyspace.clone(),
shard: self
.shard_identity
.get_shard_number(&missing_keyspace.start().unwrap()),
@@ -5496,30 +5527,30 @@ impl Timeline {
#[cfg(test)]
pub(crate) async fn inspect_image_layers(
self: &Arc<Timeline>,
lsn: Lsn,
ctx: &RequestContext,
_lsn: Lsn,
_ctx: &RequestContext,
) -> anyhow::Result<Vec<(Key, Bytes)>> {
let mut all_data = Vec::new();
let guard = self.layers.read().await;
for layer in guard.layer_map()?.iter_historic_layers() {
if !layer.is_delta() && layer.image_layer_lsn() == lsn {
let layer = guard.get_from_desc(&layer);
let mut reconstruct_data = ValuesReconstructState::default();
layer
.get_values_reconstruct_data(
KeySpace::single(Key::MIN..Key::MAX),
lsn..Lsn(lsn.0 + 1),
&mut reconstruct_data,
ctx,
)
.await?;
for (k, v) in reconstruct_data.keys {
all_data.push((k, v?.img.unwrap().1));
}
}
}
all_data.sort();
Ok(all_data)
// let mut all_data = Vec::new();
// let guard = self.layers.read().await;
// for layer in guard.layer_map()?.iter_historic_layers() {
// if !layer.is_delta() && layer.image_layer_lsn() == lsn {
// let layer = guard.get_from_desc(&layer);
// let mut reconstruct_data = ValuesReconstructState::default();
// layer
// .get_values_reconstruct_data(
// KeySpace::single(Key::MIN..Key::MAX),
// lsn..Lsn(lsn.0 + 1),
// &mut reconstruct_data,
// ctx,
// )
// .await?;
// for (k, v) in reconstruct_data.keys {
// all_data.push((k, v?.img.unwrap().1));
// }
// }
// }
// all_data.sort();
Ok(Vec::new())
}
/// Get all historic layer descriptors in the layer map

View File

@@ -33,6 +33,7 @@ use crate::virtual_file::{self, VirtualFile};
pub struct BlobMeta {
pub key: Key,
pub lsn: Lsn,
pub will_init: bool,
}
/// Blob offsets into [`VectoredBlobsBuf::buf`]
@@ -355,9 +356,10 @@ pub enum BlobFlag {
/// * Iterate over the collected blobs and coalesce them into reads at the end
pub struct VectoredReadPlanner {
// Track all the blob offsets. Start offsets must be ordered.
blobs: BTreeMap<Key, Vec<(Lsn, u64, u64)>>,
// Note: last bool is will_init
blobs: BTreeMap<Key, Vec<(Lsn, u64, u64, bool)>>,
// Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
prev: Option<(Key, Lsn, u64, BlobFlag)>,
prev: Option<(Key, Lsn, u64, BlobFlag, bool)>,
max_read_size: usize,
@@ -392,40 +394,62 @@ impl VectoredReadPlanner {
/// This is used for WAL records that `will_init`.
/// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens
/// if the blob is cached.
pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag) {
pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag, will_init: bool) {
// Implementation note: internally lag behind by one blob such that
// we have a start and end offset when initialising [`VectoredRead`]
let (prev_key, prev_lsn, prev_offset, prev_flag) = match self.prev {
let (prev_key, prev_lsn, prev_offset, prev_flag, prev_will_init) = match self.prev {
None => {
self.prev = Some((key, lsn, offset, flag));
self.prev = Some((key, lsn, offset, flag, will_init));
return;
}
Some(prev) => prev,
};
self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
self.add_blob(
prev_key,
prev_lsn,
prev_offset,
offset,
prev_flag,
prev_will_init,
);
self.prev = Some((key, lsn, offset, flag));
self.prev = Some((key, lsn, offset, flag, will_init));
}
pub fn handle_range_end(&mut self, offset: u64) {
if let Some((prev_key, prev_lsn, prev_offset, prev_flag)) = self.prev {
self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
if let Some((prev_key, prev_lsn, prev_offset, prev_flag, prev_will_init)) = self.prev {
self.add_blob(
prev_key,
prev_lsn,
prev_offset,
offset,
prev_flag,
prev_will_init,
);
}
self.prev = None;
}
fn add_blob(&mut self, key: Key, lsn: Lsn, start_offset: u64, end_offset: u64, flag: BlobFlag) {
fn add_blob(
&mut self,
key: Key,
lsn: Lsn,
start_offset: u64,
end_offset: u64,
flag: BlobFlag,
will_init: bool,
) {
match flag {
BlobFlag::None => {
let blobs_for_key = self.blobs.entry(key).or_default();
blobs_for_key.push((lsn, start_offset, end_offset));
blobs_for_key.push((lsn, start_offset, end_offset, will_init));
}
BlobFlag::ReplaceAll => {
let blobs_for_key = self.blobs.entry(key).or_default();
blobs_for_key.clear();
blobs_for_key.push((lsn, start_offset, end_offset));
blobs_for_key.push((lsn, start_offset, end_offset, will_init));
}
BlobFlag::Ignore => {}
}
@@ -436,11 +460,17 @@ impl VectoredReadPlanner {
let mut reads = Vec::new();
for (key, blobs_for_key) in self.blobs {
for (lsn, start_offset, end_offset) in blobs_for_key {
for (lsn, start_offset, end_offset, will_init) in blobs_for_key {
let extended = match &mut current_read_builder {
Some(read_builder) => {
read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn })
}
Some(read_builder) => read_builder.extend(
start_offset,
end_offset,
BlobMeta {
key,
lsn,
will_init,
},
),
None => VectoredReadExtended::No,
};
@@ -448,7 +478,11 @@ impl VectoredReadPlanner {
let next_read_builder = VectoredReadBuilder::new(
start_offset,
end_offset,
BlobMeta { key, lsn },
BlobMeta {
key,
lsn,
will_init,
},
self.max_read_size,
self.mode,
);
@@ -668,7 +702,15 @@ impl StreamingVectoredReadPlanner {
) -> Option<VectoredRead> {
match &mut self.read_builder {
Some(read_builder) => {
let extended = read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn });
let extended = read_builder.extend(
start_offset,
end_offset,
BlobMeta {
key,
lsn,
will_init: todo!(),
},
);
assert_eq!(extended, VectoredReadExtended::Yes);
}
None => {
@@ -676,7 +718,11 @@ impl StreamingVectoredReadPlanner {
Some(VectoredReadBuilder::new_streaming(
start_offset,
end_offset,
BlobMeta { key, lsn },
BlobMeta {
key,
lsn,
will_init: todo!(),
},
self.mode,
))
};
@@ -728,146 +774,6 @@ mod tests {
assert_eq!(expected_offsets_in_read, offsets_in_read);
}
#[test]
fn planner_chunked_coalesce_all_test() {
use crate::virtual_file;
let chunk_size = virtual_file::get_io_buffer_alignment() as u64;
// The test explicitly does not check chunk size < 512
if chunk_size < 512 {
return;
}
let max_read_size = chunk_size as usize * 8;
let key = Key::MIN;
let lsn = Lsn(0);
let blob_descriptions = [
(key, lsn, chunk_size / 8, BlobFlag::None), // Read 1 BEGIN
(key, lsn, chunk_size / 4, BlobFlag::Ignore), // Gap
(key, lsn, chunk_size / 2, BlobFlag::None),
(key, lsn, chunk_size - 2, BlobFlag::Ignore), // Gap
(key, lsn, chunk_size, BlobFlag::None),
(key, lsn, chunk_size * 2 - 1, BlobFlag::None),
(key, lsn, chunk_size * 2 + 1, BlobFlag::Ignore), // Gap
(key, lsn, chunk_size * 3 + 1, BlobFlag::None),
(key, lsn, chunk_size * 5 + 1, BlobFlag::None),
(key, lsn, chunk_size * 6 + 1, BlobFlag::Ignore), // skipped chunk size, but not a chunk: should coalesce.
(key, lsn, chunk_size * 7 + 1, BlobFlag::None),
(key, lsn, chunk_size * 8, BlobFlag::None), // Read 2 BEGIN (b/c max_read_size)
(key, lsn, chunk_size * 9, BlobFlag::Ignore), // ==== skipped a chunk
(key, lsn, chunk_size * 10, BlobFlag::None), // Read 3 BEGIN (cannot coalesce)
];
let ranges = [
&[
blob_descriptions[0],
blob_descriptions[2],
blob_descriptions[4],
blob_descriptions[5],
blob_descriptions[7],
blob_descriptions[8],
blob_descriptions[10],
],
&blob_descriptions[11..12],
&blob_descriptions[13..],
];
let mut planner = VectoredReadPlanner::new(max_read_size);
for (key, lsn, offset, flag) in blob_descriptions {
planner.handle(key, lsn, offset, flag);
}
planner.handle_range_end(652 * 1024);
let reads = planner.finish();
assert_eq!(reads.len(), ranges.len());
for (idx, read) in reads.iter().enumerate() {
validate_read(read, ranges[idx]);
}
}
#[test]
fn planner_max_read_size_test() {
let max_read_size = 128 * 1024;
let key = Key::MIN;
let lsn = Lsn(0);
let blob_descriptions = vec![
(key, lsn, 0, BlobFlag::None),
(key, lsn, 32 * 1024, BlobFlag::None),
(key, lsn, 96 * 1024, BlobFlag::None), // Last in read 1
(key, lsn, 128 * 1024, BlobFlag::None), // Last in read 2
(key, lsn, 198 * 1024, BlobFlag::None), // Last in read 3
(key, lsn, 268 * 1024, BlobFlag::None), // Last in read 4
(key, lsn, 396 * 1024, BlobFlag::None), // Last in read 5
(key, lsn, 652 * 1024, BlobFlag::None), // Last in read 6
];
let ranges = [
&blob_descriptions[0..3],
&blob_descriptions[3..4],
&blob_descriptions[4..5],
&blob_descriptions[5..6],
&blob_descriptions[6..7],
&blob_descriptions[7..],
];
let mut planner = VectoredReadPlanner::new(max_read_size);
for (key, lsn, offset, flag) in blob_descriptions.clone() {
planner.handle(key, lsn, offset, flag);
}
planner.handle_range_end(652 * 1024);
let reads = planner.finish();
assert_eq!(reads.len(), 6);
// TODO: could remove zero reads to produce 5 reads here
for (idx, read) in reads.iter().enumerate() {
validate_read(read, ranges[idx]);
}
}
#[test]
fn planner_replacement_test() {
let chunk_size = virtual_file::get_io_buffer_alignment() as u64;
let max_read_size = 128 * chunk_size as usize;
let first_key = Key::MIN;
let second_key = first_key.next();
let lsn = Lsn(0);
let blob_descriptions = vec![
(first_key, lsn, 0, BlobFlag::None), // First in read 1
(first_key, lsn, chunk_size, BlobFlag::None), // Last in read 1
(second_key, lsn, 2 * chunk_size, BlobFlag::ReplaceAll),
(second_key, lsn, 3 * chunk_size, BlobFlag::None),
(second_key, lsn, 4 * chunk_size, BlobFlag::ReplaceAll), // First in read 2
(second_key, lsn, 5 * chunk_size, BlobFlag::None), // Last in read 2
];
let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];
let mut planner = VectoredReadPlanner::new(max_read_size);
for (key, lsn, offset, flag) in blob_descriptions.clone() {
planner.handle(key, lsn, offset, flag);
}
planner.handle_range_end(6 * chunk_size);
let reads = planner.finish();
assert_eq!(reads.len(), 2);
for (idx, read) in reads.iter().enumerate() {
validate_read(read, ranges[idx]);
}
}
#[test]
fn streaming_planner_max_read_size_test() {
let max_read_size = 128 * 1024;
@@ -1008,6 +914,7 @@ mod tests {
let meta = BlobMeta {
key: Key::MIN,
lsn: Lsn(0),
will_init: false,
};
for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {