WIP: refactor vectored read to do dio-aligned section coalesce
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
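
This changes `blobs_at` from `VecMap<u64, BlobMeta>` to `VecMap<u64, (u64, BlobMeta)>`: each entry now carries the blob's end offset explicitly instead of leaving it implicit in the start of the next blob. Self-describing entries allow whole planned reads to be merged with itertools' `coalesce` (see the new `finish_v2` below), which is the groundwork for coalescing adjacent reads into direct-IO-aligned sections.

A minimal standalone sketch of the coalescing step, using plain `(start, end)` ranges and an assumed size cap rather than the pageserver types:

    use itertools::Itertools; // provides `coalesce`

    fn main() {
        // Planned reads as (start, end) byte ranges, sorted by start offset.
        let reads = vec![(0u64, 4096u64), (4096, 8192), (8192, 12288), (20480, 24576)];
        let max_read_size = 128 * 1024; // assumed cap, standing in for `max_read_size`

        let coalesced: Vec<(u64, u64)> = reads
            .into_iter()
            .coalesce(|a, b| {
                // Merge `b` into `a` only if it starts exactly where `a`
                // ends and the merged range stays under the cap; otherwise
                // emit `a` and continue from `b`.
                if a.1 == b.0 && b.1 - a.0 <= max_read_size {
                    Ok((a.0, b.1))
                } else {
                    Err((a, b))
                }
            })
            .collect();

        assert_eq!(coalesced, vec![(0, 12288), (20480, 24576)]);
    }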
@@ -975,7 +975,7 @@ impl DeltaLayerInner {
                 .blobs_at
                 .as_slice()
                 .iter()
-                .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
+                .map(|(_, (_, blob_meta))| format!("{}@{}", blob_meta.key, blob_meta.lsn))
                 .join(", ");
             tracing::warn!(
                 "Oversized vectored read ({} > {}) for keys {}",
@@ -1017,7 +1017,7 @@ impl DeltaLayerInner {
             Ok(blobs_buf) => blobs_buf,
             Err(err) => {
                 let kind = err.kind();
-                for (_, blob_meta) in read.blobs_at.as_slice() {
+                for (_, (_, blob_meta)) in read.blobs_at.as_slice() {
                     reconstruct_state.on_key_error(
                         blob_meta.key,
                         PageReconstructError::from(anyhow!(
@@ -1678,7 +1678,7 @@ pub(crate) mod test {

         let mut planned_blobs = Vec::new();
         for read in vectored_reads {
-            for (at, meta) in read.blobs_at.as_slice() {
+            for (at, (_, meta)) in read.blobs_at.as_slice() {
                 planned_blobs.push(BlobSpec {
                     key: meta.key,
                     lsn: meta.lsn,
@@ -602,7 +602,7 @@ impl ImageLayerInner {
                 .blobs_at
                 .as_slice()
                 .iter()
-                .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
+                .map(|(_, (_, blob_meta))| format!("{}@{}", blob_meta.key, blob_meta.lsn))
                 .join(", ");
             tracing::warn!(
                 "Oversized vectored read ({} > {}) for keys {}",
@@ -630,7 +630,7 @@ impl ImageLayerInner {
                 }
                 Err(err) => {
                     let kind = err.kind();
-                    for (_, blob_meta) in read.blobs_at.as_slice() {
+                    for (_, (_, blob_meta)) in read.blobs_at.as_slice() {
                         reconstruct_state.on_key_error(
                             blob_meta.key,
                             PageReconstructError::from(anyhow!(
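Note: the delta and image layer hunks above (presumably pageserver/src/tenant/storage_layer/delta_layer.rs and image_layer.rs) are mechanical call-site updates: every consumer of `blobs_at` now destructures `(start, (end, meta))` instead of `(start, meta)`. The hunks below are where the type itself changes, in the vectored blob IO module (presumably pageserver/src/tenant/vectored_blob_io.rs).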
@@ -19,6 +19,7 @@ use std::collections::BTreeMap;
 use std::num::NonZeroUsize;

 use bytes::BytesMut;
+use itertools::Itertools;
 use pageserver_api::key::Key;
 use tokio::io::AsyncWriteExt;
 use tokio_epoll_uring::BoundedBuf;
@@ -61,7 +62,7 @@ pub struct VectoredRead {
     pub start: u64,
     pub end: u64,
     /// Starting offsets and metadata for each blob in this read
-    pub blobs_at: VecMap<u64, BlobMeta>,
+    pub blobs_at: VecMap<u64, (u64, BlobMeta)>,
 }

 impl VectoredRead {
@@ -79,7 +80,7 @@ pub(crate) enum VectoredReadExtended {
 pub(crate) struct VectoredReadBuilder {
     start: u64,
     end: u64,
-    blobs_at: VecMap<u64, BlobMeta>,
+    blobs_at: VecMap<u64, (u64, BlobMeta)>,
     max_read_size: Option<usize>,
 }

@@ -97,7 +98,7 @@ impl VectoredReadBuilder {
     ) -> Self {
         let mut blobs_at = VecMap::default();
         blobs_at
-            .append(start_offset, meta)
+            .append(start_offset, (end_offset, meta))
             .expect("First insertion always succeeds");

         Self {
@@ -122,7 +123,7 @@ impl VectoredReadBuilder {
         } {
             self.end = end;
             self.blobs_at
-                .append(start, meta)
+                .append(start, (end, meta))
                 .expect("LSNs are ordered within vectored reads");

             return VectoredReadExtended::Yes;
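Note: storing `(end, meta)` per entry is what makes merging possible. A blob's extent no longer has to be inferred from its successor, so when two reads are glued together the second builder's `blobs_at` entries can simply be appended to the first (the `blobs_at.extend(..)` call in `finish_v2` below), and the reader no longer needs to pair each blob with the next one to find its end.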
@@ -270,6 +271,57 @@ impl VectoredReadPlanner {

         reads
     }
+
+    pub fn finish_v2(self) -> Vec<VectoredRead> {
+        const STX_ALIGN: usize = 4096;
+
+        let x = self
+            .blobs
+            .into_iter()
+            .flat_map(|(key, blobs_for_key)| {
+                blobs_for_key
+                    .into_iter()
+                    .map(move |(lsn, start_offset, end_offset)| {
+                        VectoredReadBuilder::new(
+                            start_offset,
+                            end_offset,
+                            BlobMeta { key, lsn },
+                            self.max_read_size,
+                        )
+                    })
+            })
+            .coalesce(|mut x, mut y| {
+                if x.end == y.start && {
+                    if let Some(max_read_size) = x.max_read_size {
+                        x.size() + y.size() <= max_read_size
+                    } else {
+                        true
+                    }
+                } {
+                    if x.blobs_at.extend(&mut y.blobs_at).is_ok() {
+                        x.end = y.end;
+                        return Ok(x);
+                    }
+                }
+                Err((x, y))
+            });
+
+        // for (key, blobs_for_key) in {
+
+        //     // blobs_for_key
+        //     //     .into_iter()
+        //     //     .map(|(lsn, start_offset, end_offset)| {
+        //     //         VectoredReadBuilder::new(
+        //     //             start_offset,
+        //     //             end_offset,
+        //     //             BlobMeta { key, lsn },
+        //     //             self.max_read_size,
+        //     //         )
+        //     //     }),
+        //     // );
+        // }
+        todo!()
+    }
 }

 /// Disk reader for vectored blob spans (does not go through the page cache)
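Note: `finish_v2` is visibly unfinished: the coalesced iterator is bound to `x` but never collected, `STX_ALIGN` is declared but unused, and the function ends in `todo!()`. Given the commit title, the missing piece is presumably rounding each coalesced section outward to the direct-IO alignment before issuing it. A hedged sketch of that step; the helper name and the use of 4096 (mirroring `STX_ALIGN`) are assumptions:

    /// Round a read section outward to `align`-byte boundaries so that the
    /// resulting offset and length are usable with O_DIRECT.
    /// `align` must be a power of two.
    fn align_section(start: u64, end: u64, align: u64) -> (u64, u64) {
        debug_assert!(align.is_power_of_two());
        let aligned_start = start & !(align - 1); // round start down
        let aligned_end = (end + align - 1) & !(align - 1); // round end up
        (aligned_start, aligned_end)
    }

    fn main() {
        assert_eq!(align_section(4100, 12000, 4096), (4096, 12288));
        assert_eq!(align_section(0, 4096, 4096), (0, 4096));
        assert_eq!(align_section(1, 1, 4096), (0, 4096));
    }

Once sections are aligned this way, the bytes at the front and back of the buffer may belong to no blob at all, which is exactly why each `blobs_at` entry now has to record its own end offset.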
@@ -317,18 +369,19 @@ impl<'a> VectoredBlobReader<'a> {
-        // Blobs in `read` only provide their starting offset. The end offset
-        // of a blob is implicit: the start of the next blob if one exists
-        // or the end of the read.
-        let pairs = blobs_at.iter().zip(
-            blobs_at
-                .iter()
-                .map(Some)
-                .skip(1)
-                .chain(std::iter::once(None)),
-        );
+        // let pairs = blobs_at.iter().zip(
+        //     blobs_at
+        //         .iter()
+        //         .map(Some)
+        //         .skip(1)
+        //         .chain(std::iter::once(None)),
+        // );

         // Some scratch space, put here for reusing the allocation
         let mut decompressed_vec = Vec::new();

-        for ((offset, meta), next) in pairs {
+        for (offset, (end_offset, meta)) in blobs_at.iter() {
             let offset_in_buf = offset - start_offset;
             let first_len_byte = buf[offset_in_buf as usize];
@@ -354,10 +407,8 @@ impl<'a> VectoredBlobReader<'a> {
             };

             let start_raw = offset_in_buf + size_length;
-            let end_raw = match next {
-                Some((next_blob_start_offset, _)) => next_blob_start_offset - start_offset,
-                None => start_raw + blob_size,
-            };
+            let end_raw = *end_offset;

             assert_eq!(end_raw - start_raw, blob_size);
             let (start, end);
             if compression_bits == BYTE_UNCOMPRESSED {
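Note: `start_raw` is relative to the read buffer (`offset_in_buf + size_length`), while the stored end offsets come from the planner in the same coordinate space as the `blobs_at` keys, i.e. absolute file offsets (the loop subtracts `start_offset` from the key to get `offset_in_buf`). As written, `let end_raw = *end_offset;` mixes the two spaces, and the `assert_eq!(end_raw - start_raw, blob_size)` that follows would only hold for reads starting at offset 0. Assuming the offsets are indeed absolute, the buffer-relative form would be:

    let end_raw = *end_offset - start_offset; // buffer-relative, like start_raw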
@@ -469,7 +520,7 @@ impl StreamingVectoredReadPlanner {
         self.read_builder = {
             let mut blobs_at = VecMap::default();
             blobs_at
-                .append(start_offset, BlobMeta { key, lsn })
+                .append(start_offset, (end_offset, BlobMeta { key, lsn }))
                 .expect("First insertion always succeeds");

             Some(VectoredReadBuilder {