From a58780ad0d219a73d7f08fee5404587f5b0963d1 Mon Sep 17 00:00:00 2001
From: Yuchen Liang
Date: Wed, 14 Aug 2024 05:17:46 +0000
Subject: [PATCH] WIP: refactor vectored read to do dio-aligned section
 coalescing

Store each blob's end offset alongside its metadata in `blobs_at`, so
that planned reads can be coalesced into direct IO-aligned sections
instead of inferring a blob's end from the start of its successor.

Signed-off-by: Yuchen Liang
---
 .../src/tenant/storage_layer/delta_layer.rs |  6 +-
 .../src/tenant/storage_layer/image_layer.rs |  4 +-
 pageserver/src/tenant/vectored_blob_io.rs   | 85 +++++++++++++++----
 3 files changed, 73 insertions(+), 22 deletions(-)

diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs
index f4e965b99a..eae54851c7 100644
--- a/pageserver/src/tenant/storage_layer/delta_layer.rs
+++ b/pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -975,7 +975,7 @@ impl DeltaLayerInner {
                 .blobs_at
                 .as_slice()
                 .iter()
-                .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
+                .map(|(_, (_, blob_meta))| format!("{}@{}", blob_meta.key, blob_meta.lsn))
                 .join(", ");
             tracing::warn!(
                 "Oversized vectored read ({} > {}) for keys {}",
@@ -1017,7 +1017,7 @@ impl DeltaLayerInner {
                 Ok(blobs_buf) => blobs_buf,
                 Err(err) => {
                     let kind = err.kind();
-                    for (_, blob_meta) in read.blobs_at.as_slice() {
+                    for (_, (_, blob_meta)) in read.blobs_at.as_slice() {
                         reconstruct_state.on_key_error(
                             blob_meta.key,
                             PageReconstructError::from(anyhow!(
@@ -1678,7 +1678,7 @@ pub(crate) mod test {
         let mut planned_blobs = Vec::new();
         for read in vectored_reads {
-            for (at, meta) in read.blobs_at.as_slice() {
+            for (at, (_, meta)) in read.blobs_at.as_slice() {
                 planned_blobs.push(BlobSpec {
                     key: meta.key,
                     lsn: meta.lsn,
diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs
index f9d3fdf186..008200b4ec 100644
--- a/pageserver/src/tenant/storage_layer/image_layer.rs
+++ b/pageserver/src/tenant/storage_layer/image_layer.rs
@@ -602,7 +602,7 @@ impl ImageLayerInner {
                 .blobs_at
                 .as_slice()
                 .iter()
-                .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
+                .map(|(_, (_, blob_meta))| format!("{}@{}", blob_meta.key, blob_meta.lsn))
                 .join(", ");
             tracing::warn!(
                 "Oversized vectored read ({} > {}) for keys {}",
@@ -630,7 +630,7 @@ impl ImageLayerInner {
                 }
                 Err(err) => {
                     let kind = err.kind();
-                    for (_, blob_meta) in read.blobs_at.as_slice() {
+                    for (_, (_, blob_meta)) in read.blobs_at.as_slice() {
                         reconstruct_state.on_key_error(
                             blob_meta.key,
                             PageReconstructError::from(anyhow!(
diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs
index 54a3ad789b..fb5be4412d 100644
--- a/pageserver/src/tenant/vectored_blob_io.rs
+++ b/pageserver/src/tenant/vectored_blob_io.rs
@@ -19,6 +19,7 @@ use std::collections::BTreeMap;
 use std::num::NonZeroUsize;
 
 use bytes::BytesMut;
+use itertools::Itertools;
 use pageserver_api::key::Key;
 use tokio::io::AsyncWriteExt;
 use tokio_epoll_uring::BoundedBuf;
@@ -61,7 +62,7 @@ pub struct VectoredRead {
     pub start: u64,
     pub end: u64,
-    /// Starting offsets and metadata for each blob in this read
-    pub blobs_at: VecMap<u64, BlobMeta>,
+    /// Start offset, end offset, and metadata for each blob in this read
+    pub blobs_at: VecMap<u64, (u64, BlobMeta)>,
 }
 
 impl VectoredRead {
@@ -79,7 +80,7 @@ pub(crate) enum VectoredReadExtended {
 pub(crate) struct VectoredReadBuilder {
     start: u64,
     end: u64,
-    blobs_at: VecMap<u64, BlobMeta>,
+    blobs_at: VecMap<u64, (u64, BlobMeta)>,
     max_read_size: Option<usize>,
 }
 
@@ -97,7 +98,7 @@ impl VectoredReadBuilder {
     ) -> Self {
         let mut blobs_at = VecMap::default();
         blobs_at
-            .append(start_offset, meta)
+            .append(start_offset, (end_offset, meta))
             .expect("First insertion always succeeds");
 
         Self {
@@ -122,7 +123,7 @@ impl VectoredReadBuilder {
         } {
             self.end = end;
             self.blobs_at
-                .append(start, meta)
+                .append(start, (end, meta))
                 .expect("LSNs are ordered within vectored reads");
 
             return VectoredReadExtended::Yes;
@@ -270,6 +271,57 @@ impl VectoredReadPlanner {
 
         reads
     }
+
+    pub fn finish_v2(self) -> Vec<VectoredRead> {
+        // Direct IO needs 4096-byte aligned offsets; aligning the coalesced
+        // sections to this boundary is still TODO in this WIP.
+        const STX_ALIGN: usize = 4096;
+
+        let max_read_size = self.max_read_size;
+
+        // Plan one read per blob, then merge physically adjacent reads as
+        // long as the combined read stays within `max_read_size`.
+        self.blobs
+            .into_iter()
+            .flat_map(|(key, blobs_for_key)| {
+                blobs_for_key
+                    .into_iter()
+                    .map(move |(lsn, start_offset, end_offset)| {
+                        VectoredReadBuilder::new(
+                            start_offset,
+                            end_offset,
+                            BlobMeta { key, lsn },
+                            max_read_size,
+                        )
+                    })
+            })
+            .coalesce(|mut x, mut y| {
+                if x.end == y.start && {
+                    if let Some(max_read_size) = x.max_read_size {
+                        x.size() + y.size() <= max_read_size
+                    } else {
+                        true
+                    }
+                } {
+                    if x.blobs_at.extend(&mut y.blobs_at).is_ok() {
+                        x.end = y.end;
+                        return Ok(x);
+                    }
+                }
+                Err((x, y))
+            })
+            // Materialize each coalesced builder into a `VectoredRead`.
+            .map(|builder| builder.build())
+            .collect()
+    }
 }
 
 /// Disk reader for vectored blob spans (does not go through the page cache)
@@ -317,18 +369,19 @@ impl<'a> VectoredBlobReader<'a> {
-        // Blobs in `read` only provide their starting offset. The end offset
-        // of a blob is implicit: the start of the next blob if one exists
-        // or the end of the read.
-        let pairs = blobs_at.iter().zip(
-            blobs_at
-                .iter()
-                .map(Some)
-                .skip(1)
-                .chain(std::iter::once(None)),
-        );
+        // Each blob in `read` now carries its end offset explicitly, so it
+        // no longer has to be inferred from the start of the next blob.
 
         // Some scratch space, put here for reusing the allocation
         let mut decompressed_vec = Vec::new();
 
-        for ((offset, meta), next) in pairs {
+        for (offset, (end_offset, meta)) in blobs_at.iter() {
             let offset_in_buf = offset - start_offset;
             let first_len_byte = buf[offset_in_buf as usize];
@@ -354,10 +407,8 @@ impl<'a> VectoredBlobReader<'a> {
             };
 
             let start_raw = offset_in_buf + size_length;
-            let end_raw = match next {
-                Some((next_blob_start_offset, _)) => next_blob_start_offset - start_offset,
-                None => start_raw + blob_size,
-            };
+            // The stored end offset is absolute; make it buffer-relative first.
+            let end_raw = *end_offset - start_offset;
+            assert_eq!(end_raw - start_raw, blob_size);
 
             let (start, end);
             if compression_bits == BYTE_UNCOMPRESSED {
@@ -469,7 +520,7 @@ impl StreamingVectoredReadPlanner {
             self.read_builder = {
                 let mut blobs_at = VecMap::default();
                 blobs_at
-                    .append(start_offset, BlobMeta { key, lsn })
+                    .append(start_offset, (end_offset, BlobMeta { key, lsn }))
                     .expect("First insertion always succeeds");
 
                 Some(VectoredReadBuilder {
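
Aside (not part of the patch): a minimal, standalone sketch of the
`itertools::coalesce` rule that `finish_v2` relies on. `Span`, the offsets,
and the 128 KiB cap are made-up stand-ins for `VectoredReadBuilder` and
`max_read_size`, chosen only for illustration:

    use itertools::Itertools;

    #[derive(Debug, PartialEq)]
    struct Span {
        start: u64,
        end: u64,
    }

    impl Span {
        fn size(&self) -> usize {
            (self.end - self.start) as usize
        }
    }

    fn main() {
        let max_read_size = 128 * 1024;
        let spans = vec![
            Span { start: 0, end: 512 },
            Span { start: 512, end: 1024 },  // adjacent: merges with the first
            Span { start: 4096, end: 8192 }, // gap before it: new read
        ];

        let coalesced: Vec<Span> = spans
            .into_iter()
            .coalesce(|a, b| {
                // Merge only physically adjacent spans that stay under the
                // size cap, mirroring the `x.end == y.start` and
                // `x.size() + y.size() <= max_read_size` checks above.
                if a.end == b.start && a.size() + b.size() <= max_read_size {
                    Ok(Span { start: a.start, end: b.end })
                } else {
                    Err((a, b))
                }
            })
            .collect();

        assert_eq!(
            coalesced,
            vec![Span { start: 0, end: 1024 }, Span { start: 4096, end: 8192 }]
        );
    }

Because `coalesce` only ever compares neighbouring items, the merge stays
O(n) over the planned blobs, which are already sorted by offset.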