diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs
index ec70bdc679..49e0645952 100644
--- a/pageserver/src/tenant/blob_io.rs
+++ b/pageserver/src/tenant/blob_io.rs
@@ -13,6 +13,7 @@
 //!
 use bytes::{BufMut, BytesMut};
 use tokio_epoll_uring::{BoundedBuf, Slice};
+use utils::vec_map::VecMap;
 
 use crate::context::RequestContext;
 use crate::page_cache::PAGE_SZ;
@@ -21,6 +22,17 @@ use crate::virtual_file::VirtualFile;
 use std::cmp::min;
 use std::io::{Error, ErrorKind};
 
+pub struct VectoredBlobMeta<Meta> {
+    pub start: usize,
+    pub end: usize,
+    pub meta: Meta,
+}
+
+pub struct VectoredBlobs<Meta> {
+    pub buf: BytesMut,
+    pub metas: Vec<VectoredBlobMeta<Meta>>,
+}
+
 impl<'a> BlockCursor<'a> {
     /// Read a blob into a new buffer.
     pub async fn read_blob(
@@ -93,6 +105,146 @@ impl<'a> BlockCursor<'a> {
     }
 }
 
+#[derive(Debug)]
+pub struct VectoredRead<Meta> {
+    pub start: u64,
+    pub end: u64,
+    pub blobs_at: VecMap<u64, Meta>,
+
+    max_read_size: usize,
+}
+
+#[derive(Eq, PartialEq)]
+pub enum VectoredReadExtended {
+    Yes,
+    No,
+}
+
+impl<Meta> VectoredRead<Meta> {
+    pub fn new(start_offset: u64, end_offset: u64, meta: Meta, max_read_size: usize) -> Self {
+        let mut blobs_at = VecMap::default();
+        blobs_at
+            .append(start_offset, meta)
+            .expect("First insertion always succeeds");
+
+        Self {
+            start: start_offset,
+            end: end_offset,
+            blobs_at,
+            max_read_size,
+        }
+    }
+
+    pub fn extend(&mut self, start: u64, end: u64, meta: Meta) -> VectoredReadExtended {
+        let size = (end - start) as usize;
+        if self.end == start && self.size() + size <= self.max_read_size {
+            self.end = end;
+            self.blobs_at
+                .append(start, meta)
+                .expect("LSNs are ordered within vectored reads");
+
+            return VectoredReadExtended::Yes;
+        }
+
+        VectoredReadExtended::No
+    }
+
+    pub fn last_meta(&self) -> Option<&Meta> {
+        self.blobs_at.last().map(|(_, meta)| meta)
+    }
+
+    pub fn truncate_at<Pred>(&mut self, mut pred: Pred)
+    where
+        Pred: FnMut(&Meta) -> bool,
+    {
+        if let Some(pos) = self
+            .blobs_at
+            .as_slice()
+            .iter()
+            .position(|(_, meta)| pred(meta))
+        {
+            self.blobs_at.truncate(pos);
+        }
+    }
+
+    pub fn size(&self) -> usize {
+        (self.end - self.start) as usize
+    }
+}
+
+pub struct VectoredBlobReader {
+    file: VirtualFile,
+    max_vectored_read_size: usize,
+}
+
+impl VectoredBlobReader {
+    pub fn new(file: VirtualFile, max_vectored_read_size: usize) -> Self {
+        Self {
+            file,
+            max_vectored_read_size,
+        }
+    }
+
+    pub fn get_max_read_size(&self) -> usize {
+        self.max_vectored_read_size
+    }
+
+    pub async fn read_blobs<Meta: Copy>(
+        &self,
+        read: &VectoredRead<Meta>,
+        buf: BytesMut,
+    ) -> Result<VectoredBlobs<Meta>, std::io::Error> {
+        assert!(read.size() > 0);
+        assert!(
+            read.size() <= buf.capacity(),
+            "{} > {}",
+            read.size(),
+            buf.capacity()
+        );
+        let buf = self
+            .file
+            .read_exact_at_n(buf, read.start, read.size())
+            .await?;
+
+        let blobs_at = read.blobs_at.as_slice();
+        let start_offset = blobs_at.first().expect("VectoredRead is never empty").0;
+
+        let mut metas = Vec::new();
+        let pairs = blobs_at.iter().zip(
+            blobs_at
+                .iter()
+                .map(Some)
+                .skip(1)
+                .chain(std::iter::once(None)),
+        );
+        for ((offset, meta), next) in pairs {
+            let offset_in_buf = offset - start_offset;
+            let first_len_byte = buf[offset_in_buf as usize];
+
+            // Each blob is prefixed by a header containing its size.
+            // Skip that header to find the start of the data.
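+            // A first length byte below 0x80 means a 1-byte header; larger
+            // blobs use a 4-byte length header with the high bit set.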
+            // TODO: validate against the stored size
+            let size_offset = if first_len_byte < 0x80 { 1 } else { 4 };
+
+            let start = offset_in_buf + size_offset;
+            let end = match next {
+                Some((next_blob_start_offset, _)) => next_blob_start_offset - start_offset,
+                None => read.end - start_offset,
+            };
+
+            metas.push(VectoredBlobMeta {
+                start: start as usize,
+                end: end as usize,
+                meta: *meta,
+            })
+        }
+
+        Ok(VectoredBlobs { buf, metas })
+    }
+}
+
 /// A wrapper of `VirtualFile` that allows users to write blobs.
 ///
 /// If a `BlobWriter` is dropped, the internal buffer will be
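As a review aid, here is a minimal sketch of a call site for the new types; it is not part of the patch. It assumes code living inside `pageserver::tenant::blob_io` (so `VirtualFile` is in scope), `Lsn` from `utils::lsn` as the `Meta` type, and an illustrative 128 KiB maximum read size:

    use bytes::BytesMut;
    use utils::lsn::Lsn;

    async fn read_two_blobs(file: VirtualFile) -> Result<(), std::io::Error> {
        // Plan a single vectored read covering two adjacent blobs,
        // stored at file offsets [0, 100) and [100, 250).
        let max_read_size = 128 * 1024;
        let mut read = VectoredRead::new(0, 100, Lsn(0x10), max_read_size);
        assert!(read.extend(100, 250, Lsn(0x20)) == VectoredReadExtended::Yes);

        // Execute it as one contiguous read, then slice out each blob.
        let reader = VectoredBlobReader::new(file, max_read_size);
        let blobs = reader
            .read_blobs(&read, BytesMut::with_capacity(read.size()))
            .await?;
        for blob in blobs.metas {
            let _data = &blobs.buf[blob.start..blob.end];
            let _lsn = blob.meta; // the caller-supplied metadata rides along per blob
        }
        Ok(())
    }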