Compare commits

...

2 Commits

Author SHA1 Message Date
Yuchen Liang
cc818ca7db remove comments
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-08-14 05:22:46 +00:00
Yuchen Liang
a58780ad0d WIP: refactor vectored read to do dio-aligned section coalesce
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-08-14 05:17:46 +00:00
3 changed files with 50 additions and 26 deletions

View File

@@ -975,7 +975,7 @@ impl DeltaLayerInner {
.blobs_at
.as_slice()
.iter()
.map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
.map(|(_, (_, blob_meta))| format!("{}@{}", blob_meta.key, blob_meta.lsn))
.join(", ");
tracing::warn!(
"Oversized vectored read ({} > {}) for keys {}",
@@ -1017,7 +1017,7 @@ impl DeltaLayerInner {
Ok(blobs_buf) => blobs_buf,
Err(err) => {
let kind = err.kind();
for (_, blob_meta) in read.blobs_at.as_slice() {
for (_, (_, blob_meta)) in read.blobs_at.as_slice() {
reconstruct_state.on_key_error(
blob_meta.key,
PageReconstructError::from(anyhow!(
@@ -1678,7 +1678,7 @@ pub(crate) mod test {
let mut planned_blobs = Vec::new();
for read in vectored_reads {
for (at, meta) in read.blobs_at.as_slice() {
for (at, (_, meta)) in read.blobs_at.as_slice() {
planned_blobs.push(BlobSpec {
key: meta.key,
lsn: meta.lsn,

View File

@@ -602,7 +602,7 @@ impl ImageLayerInner {
.blobs_at
.as_slice()
.iter()
.map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
.map(|(_, (_, blob_meta))| format!("{}@{}", blob_meta.key, blob_meta.lsn))
.join(", ");
tracing::warn!(
"Oversized vectored read ({} > {}) for keys {}",
@@ -630,7 +630,7 @@ impl ImageLayerInner {
}
Err(err) => {
let kind = err.kind();
for (_, blob_meta) in read.blobs_at.as_slice() {
for (_, (_, blob_meta)) in read.blobs_at.as_slice() {
reconstruct_state.on_key_error(
blob_meta.key,
PageReconstructError::from(anyhow!(

View File

@@ -19,6 +19,7 @@ use std::collections::BTreeMap;
use std::num::NonZeroUsize;
use bytes::BytesMut;
use itertools::Itertools;
use pageserver_api::key::Key;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::BoundedBuf;
@@ -61,7 +62,7 @@ pub struct VectoredRead {
pub start: u64,
pub end: u64,
/// Starting offsets and metadata for each blob in this read
pub blobs_at: VecMap<u64, BlobMeta>,
pub blobs_at: VecMap<u64, (u64, BlobMeta)>,
}
impl VectoredRead {
@@ -79,7 +80,7 @@ pub(crate) enum VectoredReadExtended {
pub(crate) struct VectoredReadBuilder {
start: u64,
end: u64,
blobs_at: VecMap<u64, BlobMeta>,
blobs_at: VecMap<u64, (u64, BlobMeta)>,
max_read_size: Option<usize>,
}
@@ -97,7 +98,7 @@ impl VectoredReadBuilder {
) -> Self {
let mut blobs_at = VecMap::default();
blobs_at
.append(start_offset, meta)
.append(start_offset, (end_offset, meta))
.expect("First insertion always succeeds");
Self {
@@ -122,7 +123,7 @@ impl VectoredReadBuilder {
} {
self.end = end;
self.blobs_at
.append(start, meta)
.append(start, (end, meta))
.expect("LSNs are ordered within vectored reads");
return VectoredReadExtended::Yes;
@@ -270,6 +271,42 @@ impl VectoredReadPlanner {
reads
}
pub fn finish_v2(self) -> Vec<VectoredRead> {
const STX_ALIGN: usize = 4096;
self.blobs
.into_iter()
.flat_map(|(key, blobs_for_key)| {
blobs_for_key
.into_iter()
.map(move |(lsn, start_offset, end_offset)| {
VectoredReadBuilder::new(
start_offset,
end_offset,
BlobMeta { key, lsn },
self.max_read_size,
)
})
})
.coalesce(|mut x, mut y| {
if x.end == y.start && {
if let Some(max_read_size) = x.max_read_size {
x.size() + y.size() <= max_read_size
} else {
true
}
} {
if x.blobs_at.extend(&mut y.blobs_at).is_ok() {
x.end = y.end;
return Ok(x);
}
}
Err((x, y))
})
.map(|x| x.build())
.collect()
}
}
/// Disk reader for vectored blob spans (does not go through the page cache)
@@ -314,21 +351,10 @@ impl<'a> VectoredBlobReader<'a> {
let mut metas = Vec::with_capacity(blobs_at.len());
// Blobs in `read` only provide their starting offset. The end offset
// of a blob is implicit: the start of the next blob if one exists
// or the end of the read.
let pairs = blobs_at.iter().zip(
blobs_at
.iter()
.map(Some)
.skip(1)
.chain(std::iter::once(None)),
);
// Some scratch space, put here for reusing the allocation
let mut decompressed_vec = Vec::new();
for ((offset, meta), next) in pairs {
for (offset, (end_offset, meta)) in blobs_at.iter() {
let offset_in_buf = offset - start_offset;
let first_len_byte = buf[offset_in_buf as usize];
@@ -354,10 +380,8 @@ impl<'a> VectoredBlobReader<'a> {
};
let start_raw = offset_in_buf + size_length;
let end_raw = match next {
Some((next_blob_start_offset, _)) => next_blob_start_offset - start_offset,
None => start_raw + blob_size,
};
let end_raw = *end_offset;
assert_eq!(end_raw - start_raw, blob_size);
let (start, end);
if compression_bits == BYTE_UNCOMPRESSED {
@@ -469,7 +493,7 @@ impl StreamingVectoredReadPlanner {
self.read_builder = {
let mut blobs_at = VecMap::default();
blobs_at
.append(start_offset, BlobMeta { key, lsn })
.append(start_offset, (end_offset, BlobMeta { key, lsn }))
.expect("First insertion always succeeds");
Some(VectoredReadBuilder {