pageserver/blob_io: add vectored blob read
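In summary: this introduces the scaffolding for vectored blob reads. `VectoredRead` describes a single contiguous byte range covering several blobs (tracking per-blob metadata in a `VecMap`), and `VectoredBlobReader` issues one `read_exact_at_n` over that range and splits the resulting buffer back into per-blob `VectoredBlobMeta` entries.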
@@ -13,6 +13,7 @@
//!
use bytes::{BufMut, BytesMut};
use tokio_epoll_uring::{BoundedBuf, Slice};
use utils::vec_map::VecMap;

use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
@@ -21,6 +22,17 @@ use crate::virtual_file::VirtualFile;
use std::cmp::min;
use std::io::{Error, ErrorKind};

pub struct VectoredBlobMeta<Meta> {
    pub start: usize,
    pub end: usize,
    pub meta: Meta,
}

pub struct VectoredBlobs<Meta> {
    pub buf: BytesMut,
    pub metas: Vec<VectoredBlobMeta<Meta>>,
}

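The `start`/`end` offsets in each `VectoredBlobMeta` index into the shared `buf`, so callers can recover every blob without copying. A minimal sketch of that step (the helper `blob_slices` is hypothetical, not part of this commit):

```rust
// Hypothetical helper, not part of this commit: yield each blob's bytes
// together with its metadata, borrowing from the shared buffer.
fn blob_slices<'a, Meta>(
    blobs: &'a VectoredBlobs<Meta>,
) -> impl Iterator<Item = (&'a [u8], &'a Meta)> + 'a {
    blobs
        .metas
        .iter()
        .map(move |m| (&blobs.buf[m.start..m.end], &m.meta))
}
```
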
impl<'a> BlockCursor<'a> {
    /// Read a blob into a new buffer.
    pub async fn read_blob(
@@ -93,6 +105,146 @@ impl<'a> BlockCursor<'a> {
    }
}

#[derive(Debug)]
pub struct VectoredRead<Meta> {
    pub start: u64,
    pub end: u64,
    pub blobs_at: VecMap<u64, Meta>,

    max_read_size: usize,
}

#[derive(Eq, PartialEq)]
pub enum VectoredReadExtended {
    Yes,
    No,
}

impl<Meta> VectoredRead<Meta> {
    pub fn new(start_offset: u64, end_offset: u64, meta: Meta, max_read_size: usize) -> Self {
        let mut blobs_at = VecMap::default();
        blobs_at
            .append(start_offset, meta)
            .expect("First insertion always succeeds");

        Self {
            start: start_offset,
            end: end_offset,
            blobs_at,
            max_read_size,
        }
    }

    pub fn extend(&mut self, start: u64, end: u64, meta: Meta) -> VectoredReadExtended {
        let size = (end - start) as usize;
        if self.end == start && self.size() + size <= self.max_read_size {
            self.end = end;
            self.blobs_at
                .append(start, meta)
                .expect("LSNs are ordered within vectored reads");

            return VectoredReadExtended::Yes;
        }

        VectoredReadExtended::No
    }

    pub fn last_meta(&self) -> Option<&Meta> {
        self.blobs_at.last().map(|(_, meta)| meta)
    }

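    /// Remove the first blob whose metadata matches `pred`,
    /// together with all blobs after it.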
    pub fn truncate_at<Pred>(&mut self, mut pred: Pred)
    where
        Pred: FnMut(&Meta) -> bool,
    {
        if let Some(pos) = self
            .blobs_at
            .as_slice()
            .iter()
            .position(|(_, meta)| pred(meta))
        {
            self.blobs_at.truncate(pos);
        }
    }

    pub fn size(&self) -> usize {
        (self.end - self.start) as usize
    }
}

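Taken together, `new` and `extend` let a read planner coalesce adjacent blobs into one large read, and `truncate_at` cuts a planned read short again. A sketch of such a planner, assuming sorted, non-overlapping input ranges and an LSN-like `u64` metadata payload (the helper `plan_reads` is illustrative, not part of this commit):

```rust
// Illustrative planner, not part of this commit: coalesce sorted,
// non-overlapping (start, end, lsn) blob ranges into as few reads
// as the size budget allows.
fn plan_reads(blobs: &[(u64, u64, u64)], max_read_size: usize) -> Vec<VectoredRead<u64>> {
    let mut reads: Vec<VectoredRead<u64>> = Vec::new();
    for &(start, end, lsn) in blobs {
        // Try to grow the current read; start a new one when the blob is not
        // adjacent or the size budget would be exceeded.
        let extended = match reads.last_mut() {
            Some(last) => last.extend(start, end, lsn),
            None => VectoredReadExtended::No,
        };
        if extended == VectoredReadExtended::No {
            reads.push(VectoredRead::new(start, end, lsn, max_read_size));
        }
    }
    // A planner could also cut the last read short at a hypothetical cutoff:
    // reads.last_mut().map(|r| r.truncate_at(|&lsn| lsn >= cutoff));
    reads
}
```
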
pub struct VectoredBlobReader {
    file: VirtualFile,
    max_vectored_read_size: usize,
}

impl VectoredBlobReader {
    pub fn new(file: VirtualFile, max_vectored_read_size: usize) -> Self {
        Self {
            file,
            max_vectored_read_size,
        }
    }

    pub fn get_max_read_size(&self) -> usize {
        self.max_vectored_read_size
    }

    pub async fn read_blobs<Meta: Copy>(
        &self,
        read: &VectoredRead<Meta>,
        buf: BytesMut,
    ) -> Result<VectoredBlobs<Meta>, std::io::Error> {
        assert!(read.size() > 0);
        assert!(
            read.size() <= buf.capacity(),
            "{} > {}",
            read.size(),
            buf.capacity()
        );
        let buf = self
            .file
            .read_exact_at_n(buf, read.start, read.size())
            .await?;

        let blobs_at = read.blobs_at.as_slice();
        let start_offset = blobs_at.first().expect("VectoredRead is never empty").0;

        let mut metas = Vec::new();
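        // Pair each blob's start offset with the following blob's start
        // offset (None for the last blob), to delimit where each blob ends.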
        let pairs = blobs_at.iter().zip(
            blobs_at
                .iter()
                .map(Some)
                .skip(1)
                .chain(std::iter::once(None)),
        );
        for ((offset, meta), next) in pairs {
            let offset_in_buf = offset - start_offset;
            let first_len_byte = buf[offset_in_buf as usize];

            // Each blob is prefixed by a header containing its size.
            // Skip that header to find the start of the data.
            // TODO: validate against the stored size
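            // Assumed encoding, per blob_io's write path: sizes below 0x80
            // fit in a one-byte header; larger blobs use a four-byte
            // big-endian header with the high bit set.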
            let size_offset = if first_len_byte < 0x80 { 1 } else { 4 };

            let start = offset_in_buf + size_offset;
            let end = match next {
                Some((next_blob_start_offset, _)) => next_blob_start_offset - start_offset,
                None => read.end - start_offset,
            };

            metas.push(VectoredBlobMeta {
                start: start as usize,
                end: end as usize,
                meta: *meta,
            })
        }

        Ok(VectoredBlobs { buf, metas })
    }
}

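An end-to-end sketch of the new reader API, with assumed blob offsets and an already-open `VirtualFile` (the `demo` function and its offsets are illustrative only):

```rust
// Illustrative only. Assumes `file` holds two blobs written back to back,
// with headers at offsets 0 and 100 and the second blob ending at 250.
async fn demo(file: VirtualFile) -> Result<(), std::io::Error> {
    let reader = VectoredBlobReader::new(file, 128 * 1024);

    // Plan one read spanning both blobs.
    let mut read = VectoredRead::new(0, 100, (), reader.get_max_read_size());
    assert!(read.extend(100, 250, ()) == VectoredReadExtended::Yes);

    // One I/O for the whole range, then split it into per-blob slices.
    let buf = BytesMut::with_capacity(read.size());
    let blobs = reader.read_blobs(&read, buf).await?;
    for blob in &blobs.metas {
        let _payload = &blobs.buf[blob.start..blob.end]; // header already skipped
    }
    Ok(())
}
```
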
/// A wrapper of `VirtualFile` that allows users to write blobs.
///
/// If a `BlobWriter` is dropped, the internal buffer will be