From 37bfa04996f3fd445c2251a877f09f4d0db517b6 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 14 Aug 2024 16:35:23 +0000 Subject: [PATCH] implement coalescing of multiple reads onto same page --- pageserver/benches/bench_ingest.rs | 7 +- pageserver/src/tenant/ephemeral_file.rs | 7 +- .../tenant/storage_layer/inmemory_layer.rs | 194 +++++++++++++----- pageserver/src/tenant/timeline.rs | 4 +- 4 files changed, 151 insertions(+), 61 deletions(-) diff --git a/pageserver/benches/bench_ingest.rs b/pageserver/benches/bench_ingest.rs index 0336302de0..f2e075ba1d 100644 --- a/pageserver/benches/bench_ingest.rs +++ b/pageserver/benches/bench_ingest.rs @@ -67,7 +67,8 @@ async fn ingest( let layer = InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, entered, &ctx).await?; - let data = Value::Image(Bytes::from(vec![0u8; put_size])).ser()?; + let value = Value::Image(Bytes::from(vec![0u8; put_size])); + let data = value.ser()?; let ctx = RequestContext::new( pageserver::task_mgr::TaskKind::WalReceiverConnectionHandler, pageserver::context::DownloadBehavior::Download, @@ -95,7 +96,9 @@ async fn ingest( } } - layer.put_value(key.to_compact(), lsn, &data, &ctx).await?; + layer + .put_value(key.to_compact(), lsn, &data, value.will_init(), &ctx) + .await?; } layer.freeze(lsn + 1).await; diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index e782e01859..893f883618 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -84,6 +84,7 @@ impl EphemeralFile { pub(crate) async fn write_blob( &mut self, buf: &[u8], + will_init: bool, ctx: &RequestContext, ) -> Result { let pos = self.rw.bytes_written(); @@ -105,7 +106,11 @@ impl EphemeralFile { self.rw.write_all_borrowed(buf, ctx).await?; - Ok(InMemoryLayerIndexValue { pos, len }) + Ok(InMemoryLayerIndexValue { + pos, + len, + will_init, + }) } } diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs 
b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index fa9d8e9aad..3b7aa37c47 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -21,8 +21,8 @@ use pageserver_api::key::CompactKey; use pageserver_api::keyspace::KeySpace; use pageserver_api::models::InMemoryLayerInfo; use pageserver_api::shard::TenantShardId; -use std::collections::BTreeMap; -use std::sync::{Arc, OnceLock}; +use std::collections::{BTreeMap, HashMap}; +use std::sync::{Arc, Mutex, OnceLock}; use std::time::Instant; use tracing::*; use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap}; @@ -93,6 +93,7 @@ pub struct InMemoryLayerInner { pub(crate) struct InMemoryLayerIndexValue { pub(crate) pos: u32, pub(crate) len: u32, + pub(crate) will_init: bool, // XXX this blows up the size, can we shrink down `len`? } impl std::fmt::Debug for InMemoryLayerInner { @@ -282,6 +283,14 @@ impl InMemoryLayer { let inner = self.inner.read().await; + // Determine ValueReads + let mut reads: HashMap> = HashMap::new(); + struct ValueRead { + entry_lsn: Lsn, + pos: u32, + len: u32, + value_buf: Mutex, Arc>>, + } for range in keyspace.ranges.iter() { for (key, vec_map) in inner .index @@ -295,62 +304,126 @@ impl InMemoryLayer { let slice = vec_map.slice_range(lsn_range); - 'foreach_value: for (entry_lsn, value) in slice.iter().rev() { - let InMemoryLayerIndexValue { pos, len } = value; - - // TODO: coalesce multiple reads that hit the same page into one page read - // Yuchen is working on a VectoredReadPlanner change to support this. - // In the meantime, we prepare the way for direct IO by doing full page reads. 
- let len = usize::try_from(*len).unwrap(); - let mut value_buf = Vec::with_capacity(len); - let mut page_buf_storage = Some(PageBuf::from(Box::new([0u8; PAGE_SZ]))); - let mut page_no = *pos / (PAGE_SZ as u32); - let mut offset_in_page = usize::try_from(*pos % (PAGE_SZ as u32)).unwrap(); - while value_buf.len() < len { - let read_result = match inner - .file - .read_page( - page_no, - page_buf_storage - .take() - .expect("we put it back each iteration"), - &ctx, - ) - .await - { - Ok(page) => page, - Err(e) => { - reconstruct_state - .on_key_error(key, PageReconstructError::from(anyhow!(e))); - break 'foreach_value; - } - }; - { - let page_contents = read_result.contents(); - let remaining_in_page = std::cmp::min( - len - value_buf.len(), - page_contents.len() - offset_in_page, - ); - value_buf.extend_from_slice( - &page_contents[offset_in_page..offset_in_page + remaining_in_page], - ); - } - offset_in_page = 0; - page_no += 1; - page_buf_storage = Some(read_result.into_page_buf()); + for (entry_lsn, index_value) in slice.iter().rev() { + reads.entry(key).or_default().push(ValueRead { + entry_lsn: *entry_lsn, + pos: index_value.pos, + len: index_value.len, + value_buf: Mutex::new(Ok(Vec::with_capacity(index_value.len as usize))), + }); + if index_value.will_init { + break; } - assert!(value_buf.len() == len); + } + } + } - let value = Value::des(&value_buf); - if let Err(e) = value { + // Plan which parts of which pages need to be appended to which value_buf + struct PageReadDestination<'a> { + value_read: &'a ValueRead, + offset_in_page: u32, + len: u32, + } + // use of BTreeMap's sorted iterator is critical to ensure value_buf is filled in order + let mut page_reads: BTreeMap> = BTreeMap::new(); + for value_read in reads.iter().flat_map(|(_, v)| v.iter()) { + let ValueRead { pos, len, ..
} = value_read; + let mut remaining = usize::try_from(*len).unwrap(); + let mut page_no = *pos / (PAGE_SZ as u32); + let mut offset_in_page = usize::try_from(*pos % (PAGE_SZ as u32)).unwrap(); + while remaining > 0 { + let remaining_in_page = std::cmp::min(remaining, PAGE_SZ - offset_in_page); + page_reads + .entry(page_no) + .or_default() + .push(PageReadDestination { + value_read, + offset_in_page: offset_in_page as u32, + len: remaining_in_page as u32, + }); + offset_in_page = 0; + page_no += 1; + remaining -= remaining_in_page; + } + } + + // Execute reads and fill the destination + // TODO: prefetch + let mut page_buf = PageBuf::from(Box::new([0u8; PAGE_SZ])); + for (page_no, dsts) in page_reads.into_iter() { + let all_done = dsts.iter().all(|PageReadDestination { value_read, .. }| { + let value_buf = value_read.value_buf.lock().unwrap(); + let Ok(buf) = &*value_buf else { + return true; // on Err() there's no need to read more + }; + buf.len() == value_read.len as usize + }); + if all_done { + continue; + } + let read_result = match inner.file.read_page(page_no, page_buf, &ctx).await { + Ok(read_result) => read_result, + Err(e) => { + let e = Arc::new(e); + for PageReadDestination { value_read, .. 
} in dsts { + *value_read.value_buf.lock().unwrap() = Err(Arc::clone(&e)); + // this will make later reads short-circuit, see top of loop body + } + page_buf = PageBuf::from(Box::new([0u8; PAGE_SZ])); // TODO: change read_page API to return the buffer + continue; + } + }; + let page_contents = read_result.contents(); + for PageReadDestination { + value_read, + offset_in_page, + len, + } in dsts + { + if let Ok(buf) = &mut *value_read.value_buf.lock().unwrap() { + buf.extend_from_slice( + &page_contents[offset_in_page as usize..(offset_in_page + len) as usize], + ); + } + } + page_buf = read_result.into_page_buf(); + } + drop(page_buf); + + // Process results into the reconstruct state + 'next_key: for (key, value_reads) in reads { + for ValueRead { + entry_lsn, + value_buf, + len, + .. + } in value_reads + { + let value_buf = value_buf.into_inner().unwrap(); + match value_buf { + Err(e) => { reconstruct_state.on_key_error(key, PageReconstructError::from(anyhow!(e))); - break; + continue 'next_key; } + Ok(value_buf) => { + assert_eq!( + value_buf.len(), + len as usize, + "bug in this function's planning logic" + ); + let value = Value::des(&value_buf); + if let Err(e) = value { + reconstruct_state + .on_key_error(key, PageReconstructError::from(anyhow!(e))); + continue 'next_key; + } - let key_situation = - reconstruct_state.update_key(&key, *entry_lsn, value.unwrap()); - if key_situation == ValueReconstructSituation::Complete { - break; + let key_situation = + reconstruct_state.update_key(&key, entry_lsn, value.unwrap()); + if key_situation == ValueReconstructSituation::Complete { + // TODO: metric to see if we fetched more values than necessary + continue 'next_key; + } } } } @@ -431,11 +504,13 @@ impl InMemoryLayer { key: CompactKey, lsn: Lsn, buf: &[u8], + will_init: bool, ctx: &RequestContext, ) -> Result<()> { let mut inner = self.inner.write().await; self.assert_writable(); - self.put_value_locked(&mut inner, key, lsn, buf, ctx).await + 
self.put_value_locked(&mut inner, key, lsn, buf, will_init, ctx) + .await } async fn put_value_locked( @@ -444,6 +519,7 @@ impl InMemoryLayer { key: CompactKey, lsn: Lsn, buf: &[u8], + will_init: bool, ctx: &RequestContext, ) -> Result<()> { trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn); @@ -452,6 +528,7 @@ impl InMemoryLayer { .file .write_blob( buf, + will_init, &RequestContextBuilder::extend(ctx) .page_content_kind(PageContentKind::InMemoryLayer) .build(), @@ -593,15 +670,18 @@ impl InMemoryLayer { for (key, vec_map) in inner.index.iter() { // Write all page versions for (lsn, entry) in vec_map.as_slice() { - let InMemoryLayerIndexValue { pos, len } = entry; + let InMemoryLayerIndexValue { + pos, + len, + will_init, + } = entry; let buf = file_contents.slice(*pos as usize..(*pos + *len) as usize); - let will_init = Value::des(&buf)?.will_init(); let (_buf, res) = delta_layer_writer .put_value_bytes( Key::from_compact(*key), *lsn, buf.slice_len(), - will_init, + *will_init, ctx, ) .await; diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index a73e864d11..6d920b9a8a 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -5577,7 +5577,9 @@ impl<'a> TimelineWriter<'a> { let action = self.get_open_layer_action(lsn, buf_size); let layer = self.handle_open_layer_action(lsn, action, ctx).await?; - let res = layer.put_value(key.to_compact(), lsn, &buf, ctx).await; + let res = layer + .put_value(key.to_compact(), lsn, &buf, value.will_init(), ctx) + .await; if res.is_ok() { // Update the current size only when the entire write was ok.