From fb78185074ae70f072ee77fe95e4c707503f6118 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 15 Aug 2024 10:29:20 +0000 Subject: [PATCH] merging of adjacent chunk reads, up to max batch size --- .../tenant/storage_layer/inmemory_layer.rs | 118 ++++++++++++++---- 1 file changed, 95 insertions(+), 23 deletions(-) diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 5b5c544caf..909674e89f 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -15,6 +15,7 @@ use crate::{l0_flush, page_cache}; use anyhow::{anyhow, Result}; use bytes::Bytes; use camino::Utf8PathBuf; +use itertools::Itertools; use pageserver_api::key::CompactKey; use pageserver_api::keyspace::KeySpace; use pageserver_api::models::InMemoryLayerInfo; @@ -319,7 +320,7 @@ impl InMemoryLayer { const DIO_CHUNK_SIZE: usize = 512; - // Plan which parts of which pages need to be appended to which value_buf + // Plan which parts of which chunks need to be appended to which value_buf struct ChunkReadDestination<'a> { value_read: &'a ValueRead, offset_in_chunk: u32, @@ -348,28 +349,99 @@ impl InMemoryLayer { } } - // TODO: merge adjacent chunk reads (merging pass on the BTreeMap iterator) + // Merge adjacent chunk reads (merging pass on the BTreeMap iterator) + const MAX_CHUNK_BATCH_SIZE: usize = { + let desired = 128 * 1024; // 128k + if desired % DIO_CHUNK_SIZE != 0 { + panic!("MAX_CHUNK_BATCH_SIZE must be a multiple of DIO_CHUNK_SIZE") + // compile-time error + } + desired / DIO_CHUNK_SIZE + }; + struct MergedRead<'a> { + start_chunk_no: u32, + nchunks: u32, + dsts: Vec>, + } + struct MergedChunkReadDestination<'a> { + value_read: &'a ValueRead, + offset_in_merged_read: u32, + len: u32, + } + let mut merged_reads: Vec = Vec::new(); + let mut chunk_reads = chunk_reads.into_iter().peekable(); + loop { + let mut last_chunk_no = None; + let to_merge: Vec<(u32, Vec)> = chunk_reads + .peeking_take_while(|(chunk_no, _)| { + if let Some(last_chunk_no) = last_chunk_no { + if *chunk_no != last_chunk_no + 1 { + return false; + } + } + last_chunk_no = Some(*chunk_no); + true + }) + .take(MAX_CHUNK_BATCH_SIZE) + .collect(); // TODO: avoid this .collect() + let Some(start_chunk_no) = to_merge.first().map(|(chunk_no, _)| *chunk_no) else { + break; + }; + let nchunks = to_merge.len() as u32; + let dsts = to_merge + .into_iter() + .enumerate() + .flat_map(|(i, (_, dsts))| { + dsts.into_iter().map( + move |ChunkReadDestination { + value_read, + offset_in_chunk, + len, + }| { + MergedChunkReadDestination { + value_read, + offset_in_merged_read: i as u32 * DIO_CHUNK_SIZE as u32 + + offset_in_chunk, + len, + } + }, + ) + }) + .collect(); + merged_reads.push(MergedRead { + start_chunk_no, + nchunks, + dsts, + }); + } + drop(chunk_reads); // Execute reads and fill the destination // TODO: prefetch - let get_chunk_buf = || Vec::with_capacity(DIO_CHUNK_SIZE); - let mut chunk_buf = get_chunk_buf(); - for (chunk_no, dsts) in chunk_reads.into_iter() { - let all_done = dsts.iter().all(|ChunkReadDestination { value_read, .. }| { - let value_buf = value_read.value_buf.lock().unwrap(); - let Ok(buf) = &*value_buf else { - return true; // on Err() there's no need to read more - }; - buf.len() == value_read.len as usize - }); + let get_chunk_buf = |nchunks| Vec::with_capacity(nchunks as usize * (DIO_CHUNK_SIZE)); + for MergedRead { + start_chunk_no, + nchunks, + dsts, + } in merged_reads + { + let all_done = dsts + .iter() + .all(|MergedChunkReadDestination { value_read, .. }| { + let value_buf = value_read.value_buf.lock().unwrap(); + let Ok(buf) = &*value_buf else { + return true; // on Err() there's no need to read more + }; + buf.len() == value_read.len as usize + }); if all_done { continue; } - let (tmp, nread) = match inner + let (merged_read_buf_slice, nread) = match inner .file .read_at_to_end( - chunk_no * DIO_CHUNK_SIZE as u32, - chunk_buf.slice_full(), + start_chunk_no * DIO_CHUNK_SIZE as u32, + get_chunk_buf(nchunks).slice_full(), &ctx, ) .await @@ -377,30 +449,30 @@ impl InMemoryLayer { Ok(t) => t, Err(e) => { let e = Arc::new(e); - for ChunkReadDestination { value_read, .. } in dsts { + for MergedChunkReadDestination { value_read, .. } in dsts { *value_read.value_buf.lock().unwrap() = Err(Arc::clone(&e)); // this will make later reads short-circuit, see top of loop body } - chunk_buf = get_chunk_buf(); // TODO: change API to return the buffer back on error continue; } }; - chunk_buf = tmp.into_inner(); - let contents = &chunk_buf[..nread]; - for ChunkReadDestination { + let merged_read_buf = merged_read_buf_slice.into_inner(); + assert_eq!(nread, merged_read_buf.len()); + let merged_read_buf = &merged_read_buf[..nread]; + for MergedChunkReadDestination { value_read, - offset_in_chunk, + offset_in_merged_read, len, } in dsts { if let Ok(buf) = &mut *value_read.value_buf.lock().unwrap() { buf.extend_from_slice( - &contents[offset_in_chunk as usize..(offset_in_chunk + len) as usize], + &merged_read_buf[offset_in_merged_read as usize + ..(offset_in_merged_read + len) as usize], ); } } } - drop(chunk_buf); // Process results into the reconstruct state 'next_key: for (key, value_reads) in reads {